Real-Time Bluesky Post Deletion Watcher with Web UI
Ever wondered what posts are being deleted on Bluesky in real time? The deleted_web.py script is a powerful tool that connects to the Bluesky Jetstream, watches for post deletions, and displays them instantly in a web interface. Here’s how it works and how you can run it yourself.
What Does deleted_web.py Do?
- Subscribes to the Bluesky Jetstream for app.bsky.feed.post events.
- Caches new posts (text, images, counts, etc.) for a short window (default: 60 seconds).
- Detects deletions: when a post is deleted, it checks whether the post is in the cache (i.e., was created recently).
- Prints deleted post info to the console and publishes it to a live web UI.
- Web UI: a simple, modern web page that updates in real time as posts are deleted.
How Does It Work?
The script connects to one of several Jetstream websocket endpoints. It listens for events related to post creation and deletion.
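To make the event shape concrete, here is a minimal, self-contained sketch of that subscription. The endpoint host is one of the script's defaults, and the field names (did, commit, operation, rkey) match what the script parses; this is a sketch, not the script itself:

import asyncio
import json

import websockets

# One of the script's default public Jetstream hosts; wantedCollections
# restricts the stream to post records.
URL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post"

async def main() -> None:
    async with websockets.connect(URL) as ws:
        async for raw in ws:
            evt = json.loads(raw)
            commit = evt.get("commit") or {}
            # Commit events carry an operation ("create"/"delete"),
            # a collection, and an rkey identifying the record.
            print(evt.get("did"), commit.get("operation"), commit.get("rkey"))

asyncio.run(main())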
When a new post is created, its details (text, images, like/repost/quote counts) are cached with a timestamp. The cache only keeps posts for a configurable time window (default: 60 seconds).
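The cache is nothing exotic: a dict keyed by (did, rkey) that stores the post info plus an insert timestamp. Condensed from the script's MinuteCache class:

import time
from typing import Dict, Optional, Tuple

CacheKey = Tuple[str, str]     # (did, rkey)
CacheVal = Tuple[dict, float]  # (post info, insert timestamp)

class MinuteCache:
    def __init__(self, ttl_seconds: int) -> None:
        self.ttl = ttl_seconds
        self._data: Dict[CacheKey, CacheVal] = {}

    def put(self, did: str, rkey: str, info: dict) -> None:
        self._data[(did, rkey)] = (info, time.time())

    def get(self, did: str, rkey: str) -> Optional[CacheVal]:
        return self._data.get((did, rkey))

    def purge(self) -> None:
        # Drop every entry older than the TTL window.
        cutoff = time.time() - self.ttl
        for k in [k for k, (_, ts) in self._data.items() if ts < cutoff]:
            del self._data[k]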
When a delete event is received (see the sketch after this list):
- The cache is purged of any entries older than the buffer window.
- If the deleted post is still in the cache, its details are printed and sent to the web UI.
- The cache entry is then removed.
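Condensed from the script's watch loop, the delete branch looks roughly like this (CACHE and HUB are the module-level cache and SSE broadcast hub from the full script below):

import time

# Handle one delete op for a post identified by (did, rkey).
async def on_delete(did: str, rkey: str) -> None:
    CACHE.purge_older_than_now(time.time())   # 1. drop entries older than the window
    val = CACHE.get(did, rkey)
    if val is None:
        return                                # post is older than the window: ignore
    info, ts = val
    url = f"https://bsky.app/profile/{did}/post/{rkey}"
    print(f"[DELETED] {url} age={time.time() - ts:.1f}s", flush=True)
    await HUB.publish({"url": url, **info})   # 2. push to the web UI over SSE
    CACHE._data.pop((did, rkey), None)        # 3. remove the cache entry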
A minimal web server (using aiohttp) serves a single-page app at /. The UI connects to /events via Server-Sent Events (SSE) and updates instantly as new deletions are detected; a trimmed version of the handler appears after this list. Each deleted post shows:
- A link to the original post (now deleted)
- The post text
- Any images
- Like, repost, and quote counts
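The /events endpoint is only a few lines. This is the script's handler, lightly trimmed; HUB hands each connected client its own asyncio.Queue of deletion payloads:

import json

from aiohttp import web

async def events_handler(request: web.Request) -> web.StreamResponse:
    q = await HUB.subscribe()  # per-client queue (from the script)
    resp = web.StreamResponse(
        headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"}
    )
    await resp.prepare(request)
    try:
        while True:
            item = await q.get()
            # One SSE frame per deletion; the browser-side EventSource
            # fires onmessage for each "data: ...\n\n" frame.
            await resp.write(f"data: {json.dumps(item)}\n\n".encode("utf-8"))
    finally:
        await HUB.unsubscribe(q)
    return resp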
The script handles SIGINT/SIGTERM for clean shutdowns.
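That wiring follows the standard asyncio signal pattern; a minimal sketch (install_shutdown is a hypothetical helper name here; the script inlines this logic in main()):

import asyncio
import signal

def install_shutdown(loop: asyncio.AbstractEventLoop, tasks: list) -> None:
    # Cancel the watcher task(s) and stop the loop on Ctrl-C or SIGTERM.
    def stop(*_):
        for t in tasks:
            t.cancel()
        loop.stop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop)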
How to Run It
Make sure you have Python 3.8+ and install the required packages:
pip install aiohttp websockets
Then start the watcher:
python deleted_web.py
By default, the web UI will be available at http://localhost:8080.
You can customize behavior with environment variables:
- JETSTREAM_URL: comma-separated list of Jetstream websocket URLs (optional; overrides the built-in host list)
- SELF_BUFFER_SECONDS: how long to keep posts in the cache, in seconds (default: 60)
- SELF_LOG_LEVEL: logging level (default: INFO)
- SELF_LIKE_HTTP_HOST: web server host (default: 0.0.0.0)
- SELF_LIKE_HTTP_PORT: web server port (default: 8080)
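Internally, each variable is a plain os.environ lookup with a default, read once at startup (from the script's config section):

import os

BUFFER_SECONDS = int(os.environ.get("SELF_BUFFER_SECONDS", "60") or "60")
LOG_LEVEL = os.environ.get("SELF_LOG_LEVEL", "INFO").upper()
HTTP_HOST = os.environ.get("SELF_LIKE_HTTP_HOST", "0.0.0.0")
HTTP_PORT = int(os.environ.get("SELF_LIKE_HTTP_PORT", "8080"))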
Example invocation:
SELF_BUFFER_SECONDS=90 SELF_LIKE_HTTP_PORT=9000 python deleted_web.py
Open your browser to the configured host/port (e.g., http://localhost:8080). As posts are deleted, they’ll appear instantly in the feed.
Summary
deleted_web.py is a handy tool for monitoring Bluesky post deletions in real time, complete with a live-updating web interface. It’s easy to run, requires minimal setup, and is highly configurable. Perfect for moderation, research, or just curiosity!
Full Source: deleted_web.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Bluesky Jetstream 1-minute delete watcher with enriched Web UI

- Subscribes to Jetstream for app.bsky.feed.post events
- Caches post creates (text + did + images + counts) for 60 seconds
- On delete:
    * purge cache older than 60s
    * prints to console
    * publishes event to web UI
"""

import asyncio
import json
import logging
import os
import random
import signal
import sys
import time
from typing import Dict, Tuple, Optional, List

import websockets
from aiohttp import web

# -------- Config --------

DEFAULT_JETSTREAM_HOSTS = [
    "jetstream2.us-east.bsky.network",
    "jetstream1.us-east.bsky.network",
    "jetstream1.us-west.bsky.network",
    "jetstream2.us-west.bsky.network",
]
WANTED_COLLECTION = "app.bsky.feed.post"

JETSTREAM_URL_ENV = os.environ.get("JETSTREAM_URL", "").strip()
BUFFER_SECONDS = int(os.environ.get("SELF_BUFFER_SECONDS", "60") or "60")
LOG_LEVEL = os.environ.get("SELF_LOG_LEVEL", "INFO").upper()
HTTP_HOST = os.environ.get("SELF_LIKE_HTTP_HOST", "0.0.0.0")
HTTP_PORT = int(os.environ.get("SELF_LIKE_HTTP_PORT", "8080"))

OPEN_TIMEOUT = 20
PING_INTERVAL = 30
PING_TIMEOUT = 20
MAX_QUEUE = 1024

# -------- Helpers --------

def _build_default_endpoints() -> List[str]:
    return [
        f"wss://{host}/subscribe?wantedCollections={WANTED_COLLECTION}"
        for host in DEFAULT_JETSTREAM_HOSTS
    ]

def _get_endpoints() -> List[str]:
    if JETSTREAM_URL_ENV:
        urls = [u.strip() for u in JETSTREAM_URL_ENV.split(",") if u.strip()]
        if urls:
            return urls
    return _build_default_endpoints()

ENDPOINTS = _get_endpoints()

def _iter_ops(evt: dict) -> List[dict]:
    ops: List[dict] = []
    top = evt.get("ops")
    if isinstance(top, list):
        ops.extend([o for o in top if isinstance(o, dict)])
    commit = evt.get("commit")
    if isinstance(commit, dict):
        c_ops = commit.get("ops")
        if isinstance(c_ops, list):
            ops.extend([o for o in c_ops if isinstance(o, dict)])
        operation = (commit.get("operation") or commit.get("op") or "").lower()
        collection = commit.get("collection") or ""
        rkey = commit.get("rkey")
        record = commit.get("record")
        path = None
        if collection and rkey:
            path = f"{collection}/{rkey}"
        if operation in ("create", "delete") and collection and rkey:
            pseudo = {"action": operation, "operation": operation, "path": path}
            if isinstance(record, dict):
                pseudo["record"] = record
            ops.append(pseudo)
    return ops

def _op_action(op: dict) -> str:
    return (op.get("action") or op.get("operation") or "").lower()

def _op_path(op: dict) -> str:
    return str(op.get("path") or "")

def _extract_rkey_from_path(path: str, expected_collection: str) -> Optional[str]:
    if not path or not path.startswith(expected_collection + "/"):
        return None
    parts = path.split("/", 1)
    if len(parts) != 2:
        return None
    return parts[1].strip() or None

def _event_repo_did(evt: dict) -> Optional[str]:
    for k in ("did", "repo"):
        v = evt.get(k)
        if isinstance(v, str) and v:
            return v
    commit = evt.get("commit")
    if isinstance(commit, dict):
        v = commit.get("repo")
        if isinstance(v, str) and v:
            return v
    return None

def _extract_post_info(op: dict) -> dict:
    rec = op.get("record") or op.get("value") or op.get("rec")
    result = {
        "text": "",
        "images": [],
        "like_count": 0,
        "repost_count": 0,
        "quote_count": 0,
    }
    if not isinstance(rec, dict):
        return result
    result["text"] = rec.get("text", "")
    # images if present
    images = rec.get("images") or rec.get("media") or []
    if isinstance(images, list):
        result["images"] = [
            img.get("thumb") or img.get("url")
            for img in images
            if isinstance(img, dict)
        ]
    counts = rec.get("counts") or {}
    if isinstance(counts, dict):
        result["like_count"] = counts.get("likes", counts.get("likeCount", 0)) or 0
        result["repost_count"] = counts.get("reposts", counts.get("repostCount", 0)) or 0
        result["quote_count"] = counts.get("quotes", counts.get("quoteCount", 0)) or 0
    return result

# -------- Cache --------

CacheKey = Tuple[str, str]
CacheVal = Tuple[dict, float]

class MinuteCache:
    def __init__(self, ttl_seconds: int = 120) -> None:
        self.ttl = ttl_seconds
        self._data: Dict[CacheKey, CacheVal] = {}

    def put(self, did: str, rkey: str, info: dict, now_s: Optional[float] = None) -> None:
        if not now_s:
            now_s = time.time()
        self._data[(did, rkey)] = (info, now_s)

    def get(self, did: str, rkey: str) -> Optional[CacheVal]:
        return self._data.get((did, rkey))

    def purge_older_than_now(self, now_s: Optional[float] = None) -> int:
        if not now_s:
            now_s = time.time()
        cutoff = now_s - self.ttl
        to_del: List[CacheKey] = [k for k, (_, ts) in self._data.items() if ts < cutoff]
        for k in to_del:
            self._data.pop(k, None)
        return len(to_del)

CACHE = MinuteCache(BUFFER_SECONDS)

# -------- Web UI --------

class BroadcastHub:
    def __init__(self):
        self._clients: set[asyncio.Queue] = set()
        self._lock = asyncio.Lock()

    async def subscribe(self) -> asyncio.Queue:
        q = asyncio.Queue(maxsize=200)
        async with self._lock:
            self._clients.add(q)
        return q

    async def unsubscribe(self, q: asyncio.Queue):
        async with self._lock:
            self._clients.discard(q)

    async def publish(self, item: dict):
        async with self._lock:
            clients = list(self._clients)
        for q in clients:
            try:
                q.put_nowait(item)
            except asyncio.QueueFull:
                try:
                    _ = q.get_nowait()
                except Exception:
                    pass
                try:
                    q.put_nowait(item)
                except Exception:
                    pass

HUB = BroadcastHub()

INDEX_HTML = """<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<title>Deleted posts watcher</title>
<style>
  body { font-family: sans-serif; margin: 1em; background: #f9f9f9; }
  .item { margin-bottom: 1.5em; padding: 0.8em; background: #fff; border-radius: 8px; box-shadow: 0 2px 5px rgba(0,0,0,0.1); }
  .item img { max-width: 200px; margin: 0.2em; border-radius: 4px; }
  .stats { font-size: 0.9em; color: #555; }
</style>
</head>
<body>
<h1>Deleted posts</h1>
<div id="feed"></div>
<script>
const feed = document.getElementById("feed");
function addItem(evt) {
  const div = document.createElement("div");
  div.className = "item";
  let html = `<a href="${evt.url}" target="_blank"><b>${evt.url}</b></a><br/>`;
  html += `<div>${evt.text ? evt.text.replace(/\\n/g, "<br>") : "(no text)"}</div>`;
  if (evt.images && evt.images.length > 0) {
    html += '<div>' + evt.images.map(i => `<img src="${i}">`).join('') + '</div>';
  }
  html += `<div class="stats">Likes: ${evt.like_count} | Reposts: ${evt.repost_count} | Quotes: ${evt.quote_count}</div>`;
  div.innerHTML = html;
  feed.prepend(div);
}
const es = new EventSource("/events");
es.onmessage = e => { try { addItem(JSON.parse(e.data)); } catch {} };
</script>
</body>
</html>
"""

async def index_handler(_request: web.Request):
    return web.Response(text=INDEX_HTML, content_type="text/html")

async def events_handler(request: web.Request):
    q = await HUB.subscribe()
    resp = web.StreamResponse(
        status=200,
        reason="OK",
        headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"},
    )
    await resp.prepare(request)
    try:
        while True:
            item = await q.get()
            await resp.write(f"data: {json.dumps(item)}\n\n".encode("utf-8"))
    except asyncio.CancelledError:
        pass
    finally:
        await HUB.unsubscribe(q)
    return resp

async def start_web_server() -> web.AppRunner:
    app = web.Application()
    app.add_routes([web.get("/", index_handler), web.get("/events", events_handler)])
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, HTTP_HOST, HTTP_PORT)
    await site.start()
    logging.info("Web UI running at http://%s:%d", HTTP_HOST, HTTP_PORT)
    return runner

# -------- Watcher --------

async def watch() -> None:
    backoff = 1.0
    idx = 0
    while True:
        url = ENDPOINTS[idx % len(ENDPOINTS)]
        try:
            logging.info("Connecting to Jetstream: %s", url)
            async with websockets.connect(
                url,
                open_timeout=OPEN_TIMEOUT,
                ping_interval=PING_INTERVAL,
                ping_timeout=PING_TIMEOUT,
                max_queue=MAX_QUEUE,
                compression="deflate",
            ) as ws:
                logging.info("Connected.")
                backoff = 1.0
                async for raw in ws:
                    try:
                        evt = json.loads(raw)
                    except json.JSONDecodeError:
                        continue
                    kind = str(evt.get("kind") or "").lower()
                    if kind and kind != "commit":
                        continue
                    did = _event_repo_did(evt)
                    if not isinstance(did, str):
                        continue
                    for op in _iter_ops(evt):
                        action = _op_action(op)
                        path = _op_path(op)
                        rkey = _extract_rkey_from_path(path, WANTED_COLLECTION)
                        if not rkey:
                            continue
                        if action == "create":
                            info = _extract_post_info(op)
                            CACHE.put(did, rkey, info, time.time())
                        elif action == "delete":
                            CACHE.purge_older_than_now(time.time())
                            val = CACHE.get(did, rkey)
                            if val:
                                info, ts = val
                                age = time.time() - ts
                                bsky_url = f"https://bsky.app/profile/{did}/post/{rkey}"
                                print(f"[DELETED] did={did} rkey={rkey} age={age:.1f}s", flush=True)
                                print(f"URL: {bsky_url}", flush=True)
                                if info.get("text"):
                                    print("Text:", info["text"], flush=True)
                                print("----------", flush=True)
                                payload = {
                                    "did": did,
                                    "rkey": rkey,
                                    "url": bsky_url,
                                    "text": info.get("text", ""),
                                    "images": info.get("images", []),
                                    "like_count": info.get("like_count", 0),
                                    "repost_count": info.get("repost_count", 0),
                                    "quote_count": info.get("quote_count", 0),
                                    "age": age,
                                }
                                await HUB.publish(payload)
                                CACHE._data.pop((did, rkey), None)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            delay = backoff + random.uniform(0, backoff * 0.5)
            logging.warning("Disconnected/error: %s. Reconnecting in %.1fs", e, delay)
            await asyncio.sleep(delay)
            backoff = min(backoff * 2, 60)
            idx += 1

# -------- Main --------

def main() -> None:
    logging.basicConfig(
        level=getattr(logging, LOG_LEVEL, logging.INFO),
        format="%(asctime)s %(levelname)s %(message)s",
    )
    loop = asyncio.get_event_loop()
    runner = loop.run_until_complete(start_web_server())
    tasks = [loop.create_task(watch())]

    # Graceful shutdown
    def stop(*_):
        logging.info("Stopping...")
        for t in tasks:
            t.cancel()
        loop.stop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop)

    try:
        loop.run_forever()
    finally:
        loop.run_until_complete(runner.cleanup())
        loop.close()

if __name__ == "__main__":
    main()