Skip to content

Commit 0ea655e

Browse files
committed
DPL: make sure DataRelayer benchmark works again
1 parent 056b5f4 commit 0ea655e

File tree

1 file changed

+52
-15
lines changed

1 file changed

+52
-15
lines changed

Framework/Core/test/benchmark_DataRelayer.cxx

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,59 @@
1515
#include "Framework/CompletionPolicyHelpers.h"
1616
#include "Framework/DataRelayer.h"
1717
#include "Framework/DataProcessingHeader.h"
18+
#include "Framework/DataProcessingStates.h"
19+
#include "Framework/DataProcessingStats.h"
20+
#include "Framework/DeviceState.h"
21+
#include "Framework/DriverConfig.h"
22+
#include "Framework/ServiceRegistryHelpers.h"
23+
#include "Framework/TimingHelpers.h"
1824
#include <Monitoring/Monitoring.h>
1925
#include <fairmq/TransportFactory.h>
2026
#include <cstring>
2127
#include <vector>
28+
#include <uv.h>
2229

2330
using Monitoring = o2::monitoring::Monitoring;
2431
using namespace o2::framework;
2532
using DataHeader = o2::header::DataHeader;
2633
using Stack = o2::header::Stack;
2734
using RecordAction = o2::framework::DataRelayer::RecordAction;
2835

36+
struct BenchmarkServices {
37+
Monitoring monitoring;
38+
const DriverConfig driverConfig{.batch = false};
39+
DataProcessingStates states{
40+
TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()),
41+
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop())};
42+
DataProcessingStats stats{
43+
TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()),
44+
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {}};
45+
DeviceState deviceState;
46+
ServiceRegistry registry;
47+
48+
ServiceRegistryRef ref()
49+
{
50+
using MetricSpec = DataProcessingStats::MetricSpec;
51+
int quickUpdateInterval = 1;
52+
std::vector<MetricSpec> specs{
53+
MetricSpec{.name = "malformed_inputs", .metricId = static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .minPublishInterval = quickUpdateInterval},
54+
MetricSpec{.name = "dropped_computations", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .minPublishInterval = quickUpdateInterval},
55+
MetricSpec{.name = "dropped_incoming_messages", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .minPublishInterval = quickUpdateInterval},
56+
MetricSpec{.name = "relayed_messages", .metricId = static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .minPublishInterval = quickUpdateInterval}};
57+
for (auto& spec : specs) {
58+
stats.registerMetric(spec);
59+
}
60+
61+
ServiceRegistryRef r{registry};
62+
r.registerService(ServiceRegistryHelpers::handleForService<Monitoring>(&monitoring));
63+
r.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStates>(&states));
64+
r.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
65+
r.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
66+
r.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(&deviceState));
67+
return r;
68+
}
69+
};
70+
2971
// a simple benchmark of the contribution of the pure message creation
3072
// this was important when the benchmarks below included the message
3173
// creation inside the benchmark loop, its somewhat obsolete now but
@@ -54,7 +96,7 @@ BENCHMARK(BM_RelayMessageCreation);
5496
// and the subsequent InputRecord is immediately requested.
5597
static void BM_RelaySingleSlot(benchmark::State& state)
5698
{
57-
Monitoring metrics;
99+
BenchmarkServices services;
58100
InputSpec spec{"clusters", "TPC", "CLUSTERS"};
59101

60102
std::vector<InputRoute> inputs = {
@@ -64,8 +106,7 @@ static void BM_RelaySingleSlot(benchmark::State& state)
64106
std::vector<InputChannelInfo> infos{1};
65107
TimesliceIndex index{1, infos};
66108
auto policy = CompletionPolicyHelpers::consumeWhenAny();
67-
ServiceRegistry registry;
68-
DataRelayer relayer(policy, inputs, index, {registry}, -1);
109+
DataRelayer relayer(policy, inputs, index, services.ref(), -1);
69110
relayer.setPipelineLength(4);
70111

71112
// Let's create a dummy O2 Message with two headers in the stack:
@@ -106,7 +147,7 @@ BENCHMARK(BM_RelaySingleSlot);
106147
// This one will simulate a single input.
107148
static void BM_RelayMultipleSlots(benchmark::State& state)
108149
{
109-
Monitoring metrics;
150+
BenchmarkServices services;
110151
InputSpec spec{"clusters", "TPC", "CLUSTERS"};
111152

112153
std::vector<InputRoute> inputs = {
@@ -117,8 +158,7 @@ static void BM_RelayMultipleSlots(benchmark::State& state)
117158
TimesliceIndex index{1, infos};
118159

119160
auto policy = CompletionPolicyHelpers::consumeWhenAny();
120-
ServiceRegistry registry;
121-
DataRelayer relayer(policy, inputs, index, {registry}, -1);
161+
DataRelayer relayer(policy, inputs, index, services.ref(), -1);
122162
relayer.setPipelineLength(4);
123163

124164
// Let's create a dummy O2 Message with two headers in the stack:
@@ -163,7 +203,7 @@ BENCHMARK(BM_RelayMultipleSlots);
163203
/// In this case we have a record with two entries
164204
static void BM_RelayMultipleRoutes(benchmark::State& state)
165205
{
166-
Monitoring metrics;
206+
BenchmarkServices services;
167207
InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
168208
InputSpec spec2{"tracks", "TPC", "TRACKS"};
169209

@@ -176,8 +216,7 @@ static void BM_RelayMultipleRoutes(benchmark::State& state)
176216
TimesliceIndex index{1, infos};
177217

178218
auto policy = CompletionPolicyHelpers::consumeWhenAny();
179-
ServiceRegistry registry;
180-
DataRelayer relayer(policy, inputs, index, {registry}, -1);
219+
DataRelayer relayer(policy, inputs, index, services.ref(), -1);
181220
relayer.setPipelineLength(4);
182221

183222
// Let's create a dummy O2 Message with two headers in the stack:
@@ -241,7 +280,7 @@ BENCHMARK(BM_RelayMultipleRoutes);
241280
/// In this case we have a record with two entries
242281
static void BM_RelaySplitParts(benchmark::State& state)
243282
{
244-
Monitoring metrics;
283+
BenchmarkServices services;
245284
InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
246285

247286
std::vector<InputRoute> inputs = {
@@ -253,8 +292,7 @@ static void BM_RelaySplitParts(benchmark::State& state)
253292
TimesliceIndex index{1, infos};
254293

255294
auto policy = CompletionPolicyHelpers::consumeWhenAny();
256-
ServiceRegistry registry;
257-
DataRelayer relayer(policy, inputs, index, {registry}, -1);
295+
DataRelayer relayer(policy, inputs, index, services.ref(), -1);
258296
relayer.setPipelineLength(4);
259297

260298
// Let's create a dummy O2 Message with two headers in the stack:
@@ -301,7 +339,7 @@ BENCHMARK(BM_RelaySplitParts)->Arg(10)->Arg(100)->Arg(1000);
301339

302340
static void BM_RelayMultiplePayloads(benchmark::State& state)
303341
{
304-
Monitoring metrics;
342+
BenchmarkServices services;
305343
InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
306344

307345
std::vector<InputRoute> inputs = {
@@ -313,8 +351,7 @@ static void BM_RelayMultiplePayloads(benchmark::State& state)
313351
TimesliceIndex index{1, infos};
314352

315353
auto policy = CompletionPolicyHelpers::consumeWhenAny();
316-
ServiceRegistry registry;
317-
DataRelayer relayer(policy, inputs, index, {registry}, -1);
354+
DataRelayer relayer(policy, inputs, index, services.ref(), -1);
318355
relayer.setPipelineLength(4);
319356

320357
// DataHeader matching the one provided in the input

0 commit comments

Comments
 (0)