1515#include " Framework/CompletionPolicyHelpers.h"
1616#include " Framework/DataRelayer.h"
1717#include " Framework/DataProcessingHeader.h"
18+ #include " Framework/DataProcessingStates.h"
19+ #include " Framework/DataProcessingStats.h"
20+ #include " Framework/DeviceState.h"
21+ #include " Framework/DriverConfig.h"
22+ #include " Framework/ServiceRegistryHelpers.h"
23+ #include " Framework/TimingHelpers.h"
1824#include < Monitoring/Monitoring.h>
1925#include < fairmq/TransportFactory.h>
2026#include < cstring>
2127#include < vector>
28+ #include < uv.h>
2229
2330using Monitoring = o2::monitoring::Monitoring;
2431using namespace o2 ::framework;
2532using DataHeader = o2::header::DataHeader;
2633using Stack = o2::header::Stack;
2734using RecordAction = o2::framework::DataRelayer::RecordAction;
2835
36+ struct BenchmarkServices {
37+ Monitoring monitoring;
38+ const DriverConfig driverConfig{.batch = false };
39+ DataProcessingStates states{
40+ TimingHelpers::defaultRealtimeBaseConfigurator (0 , uv_default_loop ()),
41+ TimingHelpers::defaultCPUTimeConfigurator (uv_default_loop ())};
42+ DataProcessingStats stats{
43+ TimingHelpers::defaultRealtimeBaseConfigurator (0 , uv_default_loop ()),
44+ TimingHelpers::defaultCPUTimeConfigurator (uv_default_loop ()), {}};
45+ DeviceState deviceState;
46+ ServiceRegistry registry;
47+
48+ ServiceRegistryRef ref ()
49+ {
50+ using MetricSpec = DataProcessingStats::MetricSpec;
51+ int quickUpdateInterval = 1 ;
52+ std::vector<MetricSpec> specs{
53+ MetricSpec{.name = " malformed_inputs" , .metricId = static_cast <short >(ProcessingStatsId::MALFORMED_INPUTS), .minPublishInterval = quickUpdateInterval},
54+ MetricSpec{.name = " dropped_computations" , .metricId = static_cast <short >(ProcessingStatsId::DROPPED_COMPUTATIONS), .minPublishInterval = quickUpdateInterval},
55+ MetricSpec{.name = " dropped_incoming_messages" , .metricId = static_cast <short >(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .minPublishInterval = quickUpdateInterval},
56+ MetricSpec{.name = " relayed_messages" , .metricId = static_cast <short >(ProcessingStatsId::RELAYED_MESSAGES), .minPublishInterval = quickUpdateInterval}};
57+ for (auto & spec : specs) {
58+ stats.registerMetric (spec);
59+ }
60+
61+ ServiceRegistryRef r{registry};
62+ r.registerService (ServiceRegistryHelpers::handleForService<Monitoring>(&monitoring));
63+ r.registerService (ServiceRegistryHelpers::handleForService<DataProcessingStates>(&states));
64+ r.registerService (ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
65+ r.registerService (ServiceRegistryHelpers::handleForService<DriverConfig const >(&driverConfig));
66+ r.registerService (ServiceRegistryHelpers::handleForService<DeviceState>(&deviceState));
67+ return r;
68+ }
69+ };
70+
2971// a simple benchmark of the contribution of the pure message creation
3072// this was important when the benchmarks below included the message
3173// creation inside the benchmark loop, its somewhat obsolete now but
@@ -54,7 +96,7 @@ BENCHMARK(BM_RelayMessageCreation);
5496// and the subsequent InputRecord is immediately requested.
5597static void BM_RelaySingleSlot (benchmark::State& state)
5698{
57- Monitoring metrics ;
99+ BenchmarkServices services ;
58100 InputSpec spec{" clusters" , " TPC" , " CLUSTERS" };
59101
60102 std::vector<InputRoute> inputs = {
@@ -64,8 +106,7 @@ static void BM_RelaySingleSlot(benchmark::State& state)
64106 std::vector<InputChannelInfo> infos{1 };
65107 TimesliceIndex index{1 , infos};
66108 auto policy = CompletionPolicyHelpers::consumeWhenAny ();
67- ServiceRegistry registry;
68- DataRelayer relayer (policy, inputs, index, {registry}, -1 );
109+ DataRelayer relayer (policy, inputs, index, services.ref (), -1 );
69110 relayer.setPipelineLength (4 );
70111
71112 // Let's create a dummy O2 Message with two headers in the stack:
@@ -106,7 +147,7 @@ BENCHMARK(BM_RelaySingleSlot);
106147// This one will simulate a single input.
107148static void BM_RelayMultipleSlots (benchmark::State& state)
108149{
109- Monitoring metrics ;
150+ BenchmarkServices services ;
110151 InputSpec spec{" clusters" , " TPC" , " CLUSTERS" };
111152
112153 std::vector<InputRoute> inputs = {
@@ -117,8 +158,7 @@ static void BM_RelayMultipleSlots(benchmark::State& state)
117158 TimesliceIndex index{1 , infos};
118159
119160 auto policy = CompletionPolicyHelpers::consumeWhenAny ();
120- ServiceRegistry registry;
121- DataRelayer relayer (policy, inputs, index, {registry}, -1 );
161+ DataRelayer relayer (policy, inputs, index, services.ref (), -1 );
122162 relayer.setPipelineLength (4 );
123163
124164 // Let's create a dummy O2 Message with two headers in the stack:
@@ -163,7 +203,7 @@ BENCHMARK(BM_RelayMultipleSlots);
163203// / In this case we have a record with two entries
164204static void BM_RelayMultipleRoutes (benchmark::State& state)
165205{
166- Monitoring metrics ;
206+ BenchmarkServices services ;
167207 InputSpec spec1{" clusters" , " TPC" , " CLUSTERS" };
168208 InputSpec spec2{" tracks" , " TPC" , " TRACKS" };
169209
@@ -176,8 +216,7 @@ static void BM_RelayMultipleRoutes(benchmark::State& state)
176216 TimesliceIndex index{1 , infos};
177217
178218 auto policy = CompletionPolicyHelpers::consumeWhenAny ();
179- ServiceRegistry registry;
180- DataRelayer relayer (policy, inputs, index, {registry}, -1 );
219+ DataRelayer relayer (policy, inputs, index, services.ref (), -1 );
181220 relayer.setPipelineLength (4 );
182221
183222 // Let's create a dummy O2 Message with two headers in the stack:
@@ -241,7 +280,7 @@ BENCHMARK(BM_RelayMultipleRoutes);
241280// / In this case we have a record with two entries
242281static void BM_RelaySplitParts (benchmark::State& state)
243282{
244- Monitoring metrics ;
283+ BenchmarkServices services ;
245284 InputSpec spec1{" clusters" , " TPC" , " CLUSTERS" };
246285
247286 std::vector<InputRoute> inputs = {
@@ -253,8 +292,7 @@ static void BM_RelaySplitParts(benchmark::State& state)
253292 TimesliceIndex index{1 , infos};
254293
255294 auto policy = CompletionPolicyHelpers::consumeWhenAny ();
256- ServiceRegistry registry;
257- DataRelayer relayer (policy, inputs, index, {registry}, -1 );
295+ DataRelayer relayer (policy, inputs, index, services.ref (), -1 );
258296 relayer.setPipelineLength (4 );
259297
260298 // Let's create a dummy O2 Message with two headers in the stack:
@@ -301,7 +339,7 @@ BENCHMARK(BM_RelaySplitParts)->Arg(10)->Arg(100)->Arg(1000);
301339
302340static void BM_RelayMultiplePayloads (benchmark::State& state)
303341{
304- Monitoring metrics ;
342+ BenchmarkServices services ;
305343 InputSpec spec1{" clusters" , " TPC" , " CLUSTERS" };
306344
307345 std::vector<InputRoute> inputs = {
@@ -313,8 +351,7 @@ static void BM_RelayMultiplePayloads(benchmark::State& state)
313351 TimesliceIndex index{1 , infos};
314352
315353 auto policy = CompletionPolicyHelpers::consumeWhenAny ();
316- ServiceRegistry registry;
317- DataRelayer relayer (policy, inputs, index, {registry}, -1 );
354+ DataRelayer relayer (policy, inputs, index, services.ref (), -1 );
318355 relayer.setPipelineLength (4 );
319356
320357 // DataHeader matching the one provided in the input
0 commit comments