
Example D · SSE Streaming

Subscribe to real-time events from Privitty Edge using curl, shell, or Python.


Raw stream (curl)

curl -N http://127.0.0.1:7200/events

Sample output:

data: {"type":"IncomingMsg","account_id":1,"chat_id":7,"msg_id":42}

data: {"type":"MsgDelivered","account_id":1,"chat_id":7,"msg_id":42}

: ping

Press Ctrl+C to stop.
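
The sample output mixes two kinds of lines: `data:` lines that carry one JSON event each, and lines starting with `:` (such as `: ping`), which are SSE comments commonly used as keep-alives. A minimal sketch of how a client might tell them apart, assuming every event fits on a single `data:` line as in the sample above. The helper name `parse_sse_line` is hypothetical and not part of Privitty Edge.

```python
import json


def parse_sse_line(line):
    """Return the decoded event dict for a data line, or None for anything else."""
    line = line.strip()
    if not line or line.startswith(":"):
        # Blank lines separate events; lines starting with ":" are comments (keep-alives).
        return None
    if line.startswith("data:"):
        return json.loads(line[len("data:"):].strip())
    # Other SSE fields (event:, id:, retry:) are ignored in this sketch.
    return None


# Lines copied from the sample output above.
sample = [
    'data: {"type":"IncomingMsg","account_id":1,"chat_id":7,"msg_id":42}',
    "",
    'data: {"type":"MsgDelivered","account_id":1,"chat_id":7,"msg_id":42}',
    "",
    ": ping",
]

events = [e for e in map(parse_sse_line, sample) if e is not None]
for event in events:
    print(event["type"], event["msg_id"])
```

Only the two `data:` lines produce events; the ping and the blank separators are dropped.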


Shell — filter incoming messages

#!/usr/bin/env bash
# watch-messages.sh — print new incoming messages

curl -N -s http://127.0.0.1:7200/events | while IFS= read -r line; do
  case "$line" in
    data:*)
      # Strip the SSE field name once; jq tolerates the leading space.
      payload=${line#data:}
      type=$(printf '%s\n' "$payload" | jq -r .type 2>/dev/null)
      case "$type" in
        IncomingMsg)
          printf '%s\n' "$payload" | jq '{type, chat_id, msg_id}'
          ;;
        MsgDelivered|MsgRead)
          printf '%s\n' "$payload" | jq .
          ;;
        Error|Warning)
          echo "⚠ $line" >&2
          ;;
      esac
      ;;
  esac
done

Run:

chmod +x watch-messages.sh
./watch-messages.sh

Python — structured event handler

#!/usr/bin/env python3
"""Listen for Privitty Edge events and dispatch to handlers."""

import json
import urllib.request

ENDPOINT = "http://127.0.0.1:7200/events"

HANDLERS = {
    "IncomingMsg": lambda e: print(f"📨 chat={e['chat_id']} msg={e['msg_id']}"),
    "MsgDelivered": lambda e: print(f"✓✓ delivered msg={e['msg_id']}"),
    "MsgRead": lambda e: print(f"👁 read msg={e['msg_id']}"),
    "SecurejoinInviterProgress": lambda e: print(f"🤝 join progress={e.get('progress')}"),
    "PrivittyFileEncrypted": lambda e: print(f"🔒 file encrypted msg={e['msg_id']}"),
    "Error": lambda e: print(f"✗ {e.get('msg')}"),
}


def main():
    req = urllib.request.Request(ENDPOINT, headers={"Accept": "text/event-stream"})
    with urllib.request.urlopen(req, timeout=None) as resp:
        for raw_line in resp:
            line = raw_line.decode("utf-8").strip()
            if not line.startswith("data:"):
                continue
            payload = json.loads(line.removeprefix("data:").strip())
            handler = HANDLERS.get(payload.get("type"))
            if handler:
                handler(payload)
            else:
                print(f"unhandled type={payload.get('type')}", payload)


if __name__ == "__main__":
    main()

Install: Python 3.9+ (needed for str.removeprefix). Standard library only; no pip packages required.

Run:

python3 watch_events.py

Python — fetch message body on incoming event

Extend the handler to load the message text when IncomingMsg arrives:

import json
import urllib.request

RPC = "http://127.0.0.1:7200/rpc"


def rpc(method, params):
    body = json.dumps({"jsonrpc": "2.0", "method": method, "params": params, "id": 1}).encode()
    req = urllib.request.Request(RPC, data=body, headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())["result"]


def on_incoming(chat_id, msg_id):
    msgs = rpc("get_messages", [1, [msg_id]])
    msg = msgs[str(msg_id)]
    print(f"From chat {chat_id}: {msg.get('text', '(file/message)')}")


# … wire into the SSE loop above
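
One way the wiring might look, as a sketch rather than the definitive integration: the handler table maps `IncomingMsg` to a small adapter that unpacks the event into the `(chat_id, msg_id)` arguments `on_incoming` expects. The `on_incoming` below is a stand-in that only prints, so the example runs without a daemon; in a real script it would be the RPC-backed version above. The `dispatch` helper is a hypothetical name.

```python
def on_incoming(chat_id, msg_id):
    # Stand-in for the RPC-backed on_incoming above, so this sketch runs offline.
    print(f"would fetch msg {msg_id} for chat {chat_id}")
    return (chat_id, msg_id)


HANDLERS = {
    # Adapt the event dict to on_incoming's (chat_id, msg_id) signature.
    "IncomingMsg": lambda e: on_incoming(e["chat_id"], e["msg_id"]),
}


def dispatch(payload):
    """Route one decoded event to its handler; return None for unhandled types."""
    handler = HANDLERS.get(payload.get("type"))
    return handler(payload) if handler else None


# Event shape taken from the sample stream output.
result = dispatch({"type": "IncomingMsg", "account_id": 1, "chat_id": 7, "msg_id": 42})
```

Inside the SSE loop, the per-line handling would then call `dispatch(payload)` after decoding each `data:` line.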

Integration patterns

Pattern               Approach
Polling alternative   Long-lived SSE connection; lower latency than polling /rpc
Systemd service       Run the Python script as a Type=simple unit alongside the container
Docker sidecar        Second container on the same network reading http://privitty-edged:7200/events
Reconnect on failure  Wrap the stream in a retry loop with exponential backoff
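
The reconnect pattern can be sketched as follows, using only the standard library. The function names (`backoff_delays`, `stream_forever`), the 1-second base, and the 30-second cap are illustrative assumptions, not values taken from Privitty Edge.

```python
import http.client
import time
import urllib.request


def backoff_delays(base=1.0, cap=30.0):
    """Yield base, 2*base, 4*base, ... and stay at cap once it is reached."""
    delay = base
    while True:
        yield delay
        delay = min(delay * 2, cap)


def stream_forever(url, handle_line, sleep=time.sleep):
    """Read the SSE stream, reconnecting with exponential backoff on failure."""
    delays = backoff_delays()
    while True:
        try:
            req = urllib.request.Request(url, headers={"Accept": "text/event-stream"})
            with urllib.request.urlopen(req) as resp:
                delays = backoff_delays()  # connected: start the backoff over
                for raw_line in resp:
                    handle_line(raw_line.decode("utf-8").strip())
        except (OSError, http.client.HTTPException) as exc:
            # URLError and connection resets are OSError subclasses.
            print(f"stream error: {exc}")
        # Reached after an error or a clean server-side close.
        sleep(next(delays))


# Usage (runs until interrupted):
#   stream_forever("http://127.0.0.1:7200/events", print)
```

Resetting the delay sequence after each successful connection keeps a brief outage from leaving the client stuck at the maximum wait.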

Full event reference: SSE Events

