Example D · SSE Streaming¶
Subscribe to real-time events from Privitty Edge using curl, shell, or Python.
Raw stream (curl)¶
Sample output:
data: {"type":"IncomingMsg","account_id":1,"chat_id":7,"msg_id":42}
data: {"type":"MsgDelivered","account_id":1,"chat_id":7,"msg_id":42}
: ping
Press Ctrl+C to stop.
Shell — filter incoming messages¶
#!/usr/bin/env bash
# watch-messages.sh — print new incoming messages
curl -N -s http://127.0.0.1:7200/events | while read -r line; do
case "$line" in
data:*)
type=$(echo "$line" | sed 's/^data: //' | jq -r .type 2>/dev/null)
case "$type" in
IncomingMsg)
echo "$line" | sed 's/^data: //' | jq '{type, chat_id, msg_id}'
;;
MsgDelivered|MsgRead)
echo "$line" | sed 's/^data: //' | jq .
;;
Error|Warning)
echo "⚠ $line" >&2
;;
esac
;;
esac
done
Run:
Python — structured event handler¶
#!/usr/bin/env python3
"""Listen for Privitty Edge events and dispatch to handlers."""
import json
import urllib.request
ENDPOINT = "http://127.0.0.1:7200/events"
HANDLERS = {
"IncomingMsg": lambda e: print(f"📨 chat={e['chat_id']} msg={e['msg_id']}"),
"MsgDelivered": lambda e: print(f"✓✓ delivered msg={e['msg_id']}"),
"MsgRead": lambda e: print(f"👁 read msg={e['msg_id']}"),
"SecurejoinInviterProgress": lambda e: print(f"🤝 join progress={e.get('progress')}"),
"PrivittyFileEncrypted": lambda e: print(f"🔒 file encrypted msg={e['msg_id']}"),
"Error": lambda e: print(f"✗ {e.get('msg')}"),
}
def main():
req = urllib.request.Request(ENDPOINT, headers={"Accept": "text/event-stream"})
with urllib.request.urlopen(req, timeout=None) as resp:
for raw_line in resp:
line = raw_line.decode("utf-8").strip()
if not line.startswith("data:"):
continue
payload = json.loads(line.removeprefix("data:").strip())
handler = HANDLERS.get(payload.get("type"))
if handler:
handler(payload)
else:
print(f"· {payload.get('type')}", payload)
if __name__ == "__main__":
main()
Install: Python 3.9+ (stdlib only — no pip packages required).
Run:
Python — fetch message body on incoming event¶
Extend the handler to load the message text when IncomingMsg arrives:
import json
import urllib.request
RPC = "http://127.0.0.1:7200/rpc"
def rpc(method, params):
body = json.dumps({"jsonrpc": "2.0", "method": method, "params": params, "id": 1}).encode()
req = urllib.request.Request(RPC, data=body, headers={"Content-Type": "application/json"})
with urllib.request.urlopen(req) as resp:
return json.loads(resp.read())["result"]
def on_incoming(chat_id, msg_id):
msgs = rpc("get_messages", [1, [msg_id]])
msg = msgs[str(msg_id)]
print(f"From chat {chat_id}: {msg.get('text', '(file/message)')}")
# … wire into the SSE loop above
Integration patterns¶
| Pattern | Approach |
|---|---|
| Polling alternative | Long-lived SSE connection — lower latency than polling /rpc |
| Systemd service | Run the Python script as a Type=simple unit alongside the container |
| Docker sidecar | Second container on the same network reading http://privitty-edged:7200/events |
| Reconnect on failure | Wrap the stream in a retry loop with exponential backoff |
Full event reference: SSE Events
Next¶
- Example E · License — activation and error handling