-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathevent_producing.py
More file actions
143 lines (108 loc) · 4.72 KB
/
event_producing.py
File metadata and controls
143 lines (108 loc) · 4.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""
Example: Event Producing with Message Broker
This example demonstrates how to produce and publish events to a message broker
using NotificationEvent. The system shows how to publish events to message brokers
for asynchronous processing in event-driven architectures.
Use case: Decoupling event producers from consumers. Events are published to message
brokers (like Kafka, RabbitMQ) where they can be consumed by multiple subscribers
asynchronously. This enables scalable, loosely-coupled microservices.
================================================================================
HOW TO RUN THIS EXAMPLE
================================================================================
Run the example:
python examples/event_producing.py
The example will:
- Execute JoinMeetingCommand commands
- Emit multiple NotificationEvent instances with different topics
- Publish events to DevnullMessageBroker (in-memory for testing)
- Verify that events are sent to the message broker
Note: This example uses DevnullMessageBroker for testing. In production, you would
use a real message broker like Kafka or RabbitMQ.
================================================================================
WHAT THIS EXAMPLE DEMONSTRATES
================================================================================
1. NotificationEvent Creation:
- Create NotificationEvent instances with typed payloads (Pydantic models)
- Specify event names and topics for routing
- Events can have different payload types for different purposes
2. Multiple Events from Single Command:
- A single command can emit multiple events
- Events can be published to different topics
- Each event type serves a different purpose (ECST events, notifications)
3. Message Broker Integration:
- Configure message broker in bootstrap.bootstrap()
- Events are automatically published to the broker after command execution
- Use DevnullMessageBroker for testing without external dependencies
4. Event Publishing:
- Events are serialized and sent to message broker
- Broker routes events to appropriate topics
- Consumers can subscribe to topics to receive events
================================================================================
REQUIREMENTS
================================================================================
Make sure you have installed:
- cqrs (this package)
- di (dependency injection)
- pydantic (for typed payloads)
================================================================================
"""
import asyncio
import logging
import di
import pydantic
import cqrs
from cqrs.message_brokers import devnull
from cqrs.requests import bootstrap
logging.basicConfig(level=logging.DEBUG)
class UserJoinedNotificationPayload(pydantic.BaseModel, frozen=True):
user_id: str
meeting_id: str
class UserJoinedECSTPayload(pydantic.BaseModel, frozen=True):
user_id: str
meeting_id: str
class JoinMeetingCommand(cqrs.Request):
user_id: str
meeting_id: str
class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
def __init__(self):
self._events = []
@property
def events(self):
return self._events
async def handle(self, request: JoinMeetingCommand) -> None:
print(f"User {request.user_id} joined meeting {request.meeting_id}")
self._events.append(
cqrs.NotificationEvent[UserJoinedECSTPayload](
event_name="user_joined_ecst",
topic="user_ecst_events",
payload=UserJoinedECSTPayload(
user_id=request.user_id,
meeting_id=request.meeting_id,
),
),
)
self._events.append(
cqrs.NotificationEvent[UserJoinedNotificationPayload](
event_name="user_joined_notification",
topic="user_notification_events",
payload=UserJoinedNotificationPayload(
user_id=request.user_id,
meeting_id=request.meeting_id,
),
),
)
def command_mapper(mapper: cqrs.RequestMap) -> None:
mapper.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
async def main():
mediator = bootstrap.bootstrap(
di_container=di.Container(),
commands_mapper=command_mapper,
message_broker=devnull.DevnullMessageBroker(),
)
await mediator.send(JoinMeetingCommand(user_id="1", meeting_id="1"))
await mediator.send(JoinMeetingCommand(user_id="1", meeting_id="2"))
await mediator.send(JoinMeetingCommand(user_id="1", meeting_id="3"))
assert len(devnull.MESSAGE_BUS) == 6 # type: ignore
if __name__ == "__main__":
loop = asyncio.new_event_loop()
loop.run_until_complete(main())