-
Notifications
You must be signed in to change notification settings - Fork 40
Expand file tree
/
Copy pathstreaming_sse.py
More file actions
87 lines (70 loc) · 3.44 KB
/
streaming_sse.py
File metadata and controls
87 lines (70 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import asyncio
from pytonapi.streaming import (
AccountStateNotification,
ActionsNotification,
Finality,
JettonsNotification,
TonapiSSE,
TransactionsNotification,
)
from pytonapi.types import Network
# TONAPI key — get one at https://tonconsole.com/
# The value below is a placeholder; replace it with a real key before running.
API_KEY: str = "YOUR_API_KEY"
# Target network — MAINNET or TESTNET (passed to TonapiSSE together with API_KEY)
NETWORK = Network.MAINNET
# Account address to track — any form (raw, bounceable, non-bounceable)
# Used as the only entry of the `addresses` list given to client.start().
ACCOUNT: str = "UQBAjaOyi2wGWlk-EDkSabqqnF-MrrwMadnwqrurKpkla4QB"
async def main() -> None:
    """Stream notifications for ``ACCOUNT`` over SSE and print them.

    Registers one handler per notification type, schedules an automatic
    stop after 60 seconds (demo only), then runs the client until it is
    stopped.
    """
    client = TonapiSSE(API_KEY, NETWORK)

    # Handlers are registered through decorators and must exist before
    # start() is called. Each decorator subscribes to one notification type.
    # min_finality controls when a handler fires:
    #   PENDING   — as soon as the transaction is seen (unconfirmed)
    #   CONFIRMED — included in a block but not yet finalized
    #   FINALIZED — irreversible (default)

    @client.on_transactions(min_finality=Finality.FINALIZED)
    async def handle_transactions(event: TransactionsNotification) -> None:
        # event.transactions — list of raw transaction dicts
        # event.trace_external_hash_norm — trace hash grouping related TXs
        # event.is_finalized / is_confirmed / is_pending — finality helpers
        for tx in event.transactions:
            print(f"TX: {tx.get('hash', '?')} | finality={event.finality}")

    @client.on_actions(min_finality=Finality.CONFIRMED)
    async def handle_actions(event: ActionsNotification) -> None:
        # event.actions — list of parsed action dicts
        # (ton_transfer, jetton_transfer, etc.).
        # Pass action_types= to the decorator to filter by action type.
        for action in event.actions:
            print(f"Action: {action.get('type', '?')} | finality={event.finality}")

    @client.on_account_states(min_finality=Finality.FINALIZED)
    async def handle_account_state(event: AccountStateNotification) -> None:
        # event.account — address that changed
        # event.state — AccountState with hash, balance, account_status, etc.
        # Note: account_state_change does not support PENDING finality
        if event.state:
            balance = event.state.balance
        else:
            # No state attached to the notification: print a placeholder.
            balance = "?"
        print(f"Account state: {event.account} | balance={balance}")

    @client.on_jettons(min_finality=Finality.FINALIZED)
    async def handle_jettons(event: JettonsNotification) -> None:
        # event.jetton — JettonWallet with address, balance, owner, jetton master
        # Note: jettons_change does not support PENDING finality
        if not event.jetton:
            return
        print(f"Jetton: {event.jetton.jetton} | balance={event.jetton.balance}")

    # Auto-stop after 60 seconds for demo purposes.
    async def _auto_stop() -> None:
        await asyncio.sleep(60)
        await client.stop()

    # Keep a strong reference to the timer task until it completes; the
    # done-callback removes it from the set again.
    stop_task = asyncio.create_task(_auto_stop())
    pending: set[asyncio.Task[None]] = {stop_task}
    stop_task.add_done_callback(pending.discard)

    print("Subscribing via SSE...")
    # start() creates a session, subscribes, and blocks until stop() is called.
    #   addresses — which accounts to monitor (required for all types except trace)
    #   include_address_book — resolve addresses to DNS names in notifications
    #   include_metadata — include token metadata in notifications (not passed here)
    await client.start(addresses=[ACCOUNT], include_address_book=True)
    print("Stopped.")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())