Skip to content

Commit 70a3ecf

Browse files
committed
Split events into two types, and prioritize packet events
Previously, when a host fetched its next event and multiple events were scheduled for the same time, it prioritized the event with the lower source host ID. As a result, packet events were sometimes prioritized over local events and sometimes the reverse, depending on how the source host's ID compared with the current host's ID. This PR removes that inconsistency: packet events are now always prioritized over local events. This PR also splits events into two separate types: packet events (which contain a packet object) and local events (which contain a task). This lets us take the packet object directly from a packet event, which is more flexible than running an arbitrary task closure that contains the packet object.
1 parent 3cf95e9 commit 70a3ecf

3 files changed

Lines changed: 146 additions & 54 deletions

File tree

src/main/core/work/event.rs

Lines changed: 114 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::host::host::Host;
2+
use crate::network::packet::Packet;
23
use crate::utility::{Magic, ObjectCounter};
34
use shadow_shim_helper_rs::emulated_time::EmulatedTime;
45
use shadow_shim_helper_rs::HostId;
@@ -8,51 +9,55 @@ use super::task::TaskRef;
89
#[derive(Debug)]
910
pub struct Event {
1011
magic: Magic<Self>,
11-
task: TaskRef,
1212
time: EmulatedTime,
13-
src_host_id: HostId,
14-
dst_host_id: HostId,
15-
src_host_event_id: u64,
13+
data: EventData,
1614
_counter: ObjectCounter,
1715
}
1816

1917
impl Event {
20-
pub fn new(task: TaskRef, time: EmulatedTime, src_host: &Host, dst_host_id: HostId) -> Self {
18+
/// A new packet event, which is an event for packets arriving from the Internet. Packet events
19+
/// do not include packets on localhost.
20+
pub fn new_packet(packet: Packet, time: EmulatedTime, src_host: &Host) -> Self {
2121
Self {
2222
magic: Magic::new(),
23-
task,
2423
time,
25-
src_host_id: src_host.id(),
26-
dst_host_id,
27-
src_host_event_id: src_host.get_new_event_id(),
24+
data: EventData::Packet(PacketEventData {
25+
packet,
26+
src_host_id: src_host.id(),
27+
src_host_event_id: src_host.get_new_event_id(),
28+
}),
2829
_counter: ObjectCounter::new("Event"),
2930
}
3031
}
3132

32-
pub fn execute(self, host: &Host) {
33-
self.magic.debug_check();
34-
35-
// make sure we're executing on the correct host
36-
assert_eq!(self.host_id(), host.id());
37-
38-
host.continue_execution_timer();
39-
self.task.execute(host);
40-
host.stop_execution_timer();
33+
/// A new local event, which is an event that was generated locally by the host itself (timers,
34+
/// localhost packets, etc).
35+
pub fn new_local(task: TaskRef, time: EmulatedTime, host: &Host) -> Self {
36+
Self {
37+
magic: Magic::new(),
38+
time,
39+
data: EventData::Local(LocalEventData {
40+
task,
41+
event_id: host.get_new_event_id(),
42+
}),
43+
_counter: ObjectCounter::new("Event"),
44+
}
4145
}
4246

4347
pub fn time(&self) -> EmulatedTime {
4448
self.magic.debug_check();
4549
self.time
4650
}
4751

48-
pub fn host_id(&self) -> HostId {
52+
pub fn set_time(&mut self, time: EmulatedTime) {
4953
self.magic.debug_check();
50-
self.dst_host_id
54+
self.time = time;
5155
}
5256

53-
pub fn set_time(&mut self, time: EmulatedTime) {
57+
/// The event data.
58+
pub fn data(self) -> EventData {
5459
self.magic.debug_check();
55-
self.time = time;
60+
self.data
5661
}
5762
}
5863

@@ -61,30 +66,107 @@ impl PartialEq for Event {
6166
self.magic.debug_check();
6267
other.magic.debug_check();
6368

69+
fn check_impl_eq(_: impl Eq) {}
70+
71+
// below we impl Eq for Event, so we should make sure that all of our comparisons below are
72+
// also Eq
73+
check_impl_eq(self.time);
74+
check_impl_eq(&self.data);
75+
6476
// check every field except '_counter'
65-
self.task == other.task
66-
&& self.time == other.time
67-
&& self.src_host_id == other.src_host_id
68-
&& self.dst_host_id == other.dst_host_id
69-
&& self.src_host_event_id == other.src_host_event_id
77+
self.time == other.time && self.data == other.data
7078
}
7179
}
7280

81+
// we checked above that Event's `PartialEq` impl is also `Eq`
7382
impl Eq for Event {}
7483

7584
impl PartialOrd for Event {
7685
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
7786
self.magic.debug_check();
7887
other.magic.debug_check();
7988

80-
// sort by event time first, then use other fields we're able to compare
89+
// sort by event time, then by the event data
90+
let cmp = self.time.cmp(&other.time);
91+
92+
if cmp.is_ne() {
93+
Some(cmp)
94+
} else {
95+
// event times were equal
96+
self.data.partial_cmp(&other.data)
97+
}
98+
}
99+
}
100+
101+
/// Data for an event. Different event types will contain different data.
102+
#[derive(Debug, PartialEq, Eq, PartialOrd)]
103+
pub enum EventData {
104+
// IMPORTANT: The order of these enum variants is important and deliberate. The `PartialOrd`
105+
// derive affects the order of events in the event queue, and therefore which events are
106+
// processed before others (packet events will be processed before local events), and changing
107+
// this could significantly affect the simulation, possibly leading to incorrect behaviour.
108+
Packet(PacketEventData),
109+
Local(LocalEventData),
110+
}
111+
112+
#[derive(Debug, PartialEq, Eq)]
113+
pub struct PacketEventData {
114+
packet: Packet,
115+
src_host_id: HostId,
116+
src_host_event_id: u64,
117+
}
118+
119+
#[derive(Debug, PartialEq, Eq)]
120+
pub struct LocalEventData {
121+
task: TaskRef,
122+
event_id: u64,
123+
}
124+
125+
impl PacketEventData {
126+
/// Get the packet contained within this event.
127+
pub fn packet(self) -> Packet {
128+
self.packet
129+
}
130+
}
131+
132+
impl PartialOrd for PacketEventData {
133+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
134+
// sort by src host ID, then by event ID
81135
let cmp = self
82-
.time
83-
.cmp(&other.time)
84-
.then_with(|| self.dst_host_id.cmp(&other.dst_host_id))
85-
.then_with(|| self.src_host_id.cmp(&other.src_host_id))
136+
.src_host_id
137+
.cmp(&other.src_host_id)
86138
.then_with(|| self.src_host_event_id.cmp(&other.src_host_event_id));
87139

140+
// if the above fields were all equal (this should ideally not occur in practice since it
141+
// leads to non-determinism, but we handle it anyways)
142+
if cmp == std::cmp::Ordering::Equal {
143+
if self.packet != other.packet {
144+
// packets are not equal, so the events must not be equal
145+
assert_ne!(self, other);
146+
// we have nothing left to order them by
147+
return None;
148+
}
149+
150+
// packets are equal, so the events must be equal
151+
assert_eq!(self, other);
152+
}
153+
154+
Some(cmp)
155+
}
156+
}
157+
158+
impl LocalEventData {
159+
/// Get the task contained within this event.
160+
pub fn task(self) -> TaskRef {
161+
self.task
162+
}
163+
}
164+
165+
impl PartialOrd for LocalEventData {
166+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
167+
// they are local events and should be on the same host, so we can just sort by event ID
168+
let cmp = self.event_id.cmp(&other.event_id);
169+
88170
// if the above fields were all equal (this should ideally not occur in practice since it
89171
// leads to non-determinism, but we handle it anyways)
90172
if cmp == std::cmp::Ordering::Equal {

src/main/core/worker.rs

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use atomic_refcell::{AtomicRef, AtomicRefCell};
2-
use crossbeam::atomic::AtomicCell;
32
use once_cell::sync::Lazy;
43
use rand::Rng;
54
use shadow_shim_helper_rs::rootedcell::rc::RootedRc;
@@ -10,7 +9,6 @@ use crate::core::scheduler::runahead::Runahead;
109
use crate::core::sim_config::Bandwidth;
1110
use crate::core::sim_stats::{LocalSimStats, SharedSimStats};
1211
use crate::core::work::event::Event;
13-
use crate::core::work::task::TaskRef;
1412
use crate::cshadow;
1513
use crate::host::host::Host;
1614
use crate::host::process::{Process, ProcessId};
@@ -395,7 +393,6 @@ impl Worker {
395393
}
396394

397395
let delay = Worker::with(|w| w.shared.latency(src_ip, dst_ip).unwrap()).unwrap();
398-
let deliver_time = current_time + delay;
399396

400397
Worker::update_lowest_used_latency(delay);
401398
Worker::with(|w| w.shared.increment_packet_count(src_ip, dst_ip)).unwrap();
@@ -412,28 +409,22 @@ impl Worker {
412409

413410
// copy the packet
414411
let packet = Packet::from_raw(unsafe { cshadow::packet_copy(packet) });
415-
let packet = Arc::new(AtomicCell::new(Some(packet)));
416-
417-
let packet_task = TaskRef::new(move |host| {
418-
let packet = packet.take().expect("Packet task ran twice");
419-
host.upstream_router_borrow_mut()
420-
.route_incoming_packet(packet);
421-
host.notify_router_has_packets();
422-
});
423-
424-
let mut packet_event = Event::new(packet_task, deliver_time, src_host, dst_host_id);
425412

426413
// delay the packet until the next round
414+
let mut deliver_time = current_time + delay;
427415
if deliver_time < round_end_time {
428-
packet_event.set_time(round_end_time);
416+
deliver_time = round_end_time;
429417
}
430418

431419
// we may have sent this packet after the destination host finished running the current
432420
// round and calculated its min event time, so we put this in our min event time instead
433-
Worker::update_next_event_time(packet_event.time());
421+
Worker::update_next_event_time(deliver_time);
434422

435-
debug_assert!(packet_event.time() >= round_end_time);
436-
Worker::with(|w| w.shared.push_to_host(dst_host_id, packet_event)).unwrap();
423+
Worker::with(|w| {
424+
w.shared
425+
.push_packet_to_host(packet, dst_host_id, deliver_time, src_host)
426+
})
427+
.unwrap();
437428
}
438429

439430
// Runs `f` with a shared reference to the current thread's Worker. Returns
@@ -527,6 +518,7 @@ pub struct WorkerShared {
527518
// calculates the runahead for the next simulation round
528519
pub runahead: Runahead,
529520
pub child_pid_watcher: ChildPidWatcher,
521+
/// Event queues for each host. This should only be used to push packet events.
530522
pub event_queues: HashMap<HostId, Arc<Mutex<EventQueue>>>,
531523
pub bootstrap_end_time: EmulatedTime,
532524
pub sim_end_time: EmulatedTime,
@@ -633,8 +625,17 @@ impl WorkerShared {
633625
&self.child_pid_watcher
634626
}
635627

636-
pub fn push_to_host(&self, host: HostId, event: Event) {
637-
let event_queue = self.event_queues.get(&host).unwrap();
628+
/// Push a packet to the destination host's event queue. Does not check that the time is valid
629+
/// (is outside of the current scheduling round, etc).
630+
pub fn push_packet_to_host(
631+
&self,
632+
packet: Packet,
633+
dst_host_id: HostId,
634+
time: EmulatedTime,
635+
src_host: &Host,
636+
) {
637+
let event = Event::new_packet(packet, time, src_host);
638+
let event_queue = self.event_queues.get(&dst_host_id).unwrap();
638639
event_queue.lock().unwrap().push(event);
639640
}
640641
}

src/main/host/host.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::core::support::configuration::QDiscMode;
2-
use crate::core::work::event::Event;
2+
use crate::core::work::event::{Event, EventData};
33
use crate::core::work::event_queue::EventQueue;
44
use crate::core::work::task::TaskRef;
55
use crate::core::worker::Worker;
@@ -581,7 +581,7 @@ impl Host {
581581
}
582582

583583
pub fn schedule_task_at_emulated_time(&self, task: TaskRef, t: EmulatedTime) -> bool {
584-
let event = Event::new(task, t, self, self.id());
584+
let event = Event::new_local(task, t, self);
585585
self.push_local_event(event)
586586
}
587587

@@ -693,7 +693,16 @@ impl Host {
693693

694694
// run the event
695695
Worker::set_current_time(event.time());
696-
event.execute(self);
696+
self.continue_execution_timer();
697+
match event.data() {
698+
EventData::Packet(data) => {
699+
self.upstream_router_borrow_mut()
700+
.route_incoming_packet(data.packet());
701+
self.notify_router_has_packets();
702+
}
703+
EventData::Local(data) => data.task().execute(self),
704+
}
705+
self.stop_execution_timer();
697706
Worker::clear_current_time();
698707
}
699708
}

0 commit comments

Comments
 (0)