Skip to content

Python: Add drain method to get all events within the dora queue#1177

Merged
haixuanTao merged 11 commits intomainfrom
add-drain-method
Nov 20, 2025
Merged

Python: Add drain method to get all events within the dora queue#1177
haixuanTao merged 11 commits intomainfrom
add-drain-method

Conversation

@haixuanTao
Copy link
Copy Markdown
Collaborator

@haixuanTao haixuanTao commented Nov 2, 2025

In order to reduce having too many filled queues., node.drain() enable to get all event from all input as a vec/list.

In order to make this fast and manageable the order of draining is going to be first ordered by topic and then by time of arrival

@haixuanTao haixuanTao changed the base branch from main to simplifying-example-with-build-and-run-function-from-cli November 2, 2025 12:50
Base automatically changed from simplifying-example-with-build-and-run-function-from-cli to main November 4, 2025 12:32
@haixuanTao haixuanTao force-pushed the add-drain-method branch 2 times, most recently from 9acd776 to 2b4239d Compare November 5, 2025 04:56
Comment on lines +302 to +314
if self.scheduler.is_empty() {
if let Some(event) = self.receiver.next().await {
self.scheduler.add_event(event);
} else {
break;
}
} else {
match select(Delay::new(Duration::from_micros(300)), self.receiver.next()).await {
Either::Left((_elapsed, _)) => break,
Either::Right((Some(event), _)) => self.scheduler.add_event(event),
Either::Right((None, _)) => break,
};
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clarify what you're trying to do here? It looks like this code will just block until the event channel is closed?

@phil-opp
Copy link
Copy Markdown
Collaborator

phil-opp commented Nov 5, 2025

I'm not sure if this is the best approach to empty the queues. How about we instead add a try_recv method that doesn't block or awaits anything? So it behaves like recv, but returns None instead of waiting for the next event if there is nothing in the queue.

Such a function would allow node authors to empty the queue themselves by just calling try_recv in a loop. For convenience, we could also provide a drain method based on it:

fn drain(&mut self) -> Vec<Event> {
    let mut events = Vec::new();
    while let Some(event) = self.try_recv() {
        events.push(event);
    }
    events
}

This pattern is common in the Rust ecosystem.

In order to make this fast and manageable the order of draining is going to be first ordered by topic and then by time of arrival

I'm not sure if I understand this part. How is a different drain order faster?

@haixuanTao haixuanTao force-pushed the add-drain-method branch 2 times, most recently from a890ef2 to 54ef17b Compare November 7, 2025 05:59
@phil-opp
Copy link
Copy Markdown
Collaborator

phil-opp commented Nov 7, 2025

I just opened #1185 to provied a simpler drain method based on try_recv.

haixuanTao added a commit that referenced this pull request Nov 9, 2025
Alternative to #1177.

Based on #1183, should be merged after.
@phil-opp phil-opp changed the title Add drain method to get all events within the dora queue Python: Add drain method to get all events within the dora queue Nov 11, 2025
@haixuanTao haixuanTao merged commit 76c3503 into main Nov 20, 2025
50 checks passed
@haixuanTao haixuanTao deleted the add-drain-method branch November 20, 2025 08:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants