Skip to content

Commit c8cd9de

Browse files
fixup! Generalizing DataRelayer to relay an array of messages
1 parent 6823a83 commit c8cd9de

3 files changed

Lines changed: 15 additions & 17 deletions

File tree

Framework/Core/include/Framework/DataRelayer.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,15 @@ class DataRelayer
9191
/// @a rawHeader raw header pointer
9292
/// @a messages pointer to array of messages
9393
/// @a nMessages size of the array
94-
/// @a createPairs create PartRef objects with header-payload pairs if true
95-
/// or one PartRef object with multiple payloads
96-
/// is the header which is common across all subsequent elements.
94+
/// @a nPayloads number of payploads in the message sequence, default is 1
95+
/// which is the standard header-payload message pair, in this
96+
/// case nMessages / 2 pairs will be inserted and considered
97+
/// separate parts
9798
/// Notice that we expect that the header is an O2 Header Stack
9899
RelayChoice relay(void const* rawHeader,
99100
std::unique_ptr<FairMQMessage>* messages,
100101
size_t nMessages,
101-
bool createPairs = true);
102+
size_t nPayloads = 1);
102103

103104
/// @returns the actions ready to be performed.
104105
void getReadyToProcess(std::vector<RecordAction>& completed);

Framework/Core/src/DataRelayer.cxx

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ DataRelayer::RelayChoice
204204
DataRelayer::relay(void const* rawHeader,
205205
std::unique_ptr<FairMQMessage>* messages,
206206
size_t nMessages,
207-
bool createPairs)
207+
size_t nPayloads)
208208
{
209209
std::scoped_lock<LockableBase(std::recursive_mutex)> lock(mMutex);
210210
DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(rawHeader);
@@ -280,7 +280,7 @@ DataRelayer::RelayChoice
280280
auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
281281
&messages,
282282
&nMessages,
283-
&createPairs,
283+
&nPayloads,
284284
&cache,
285285
&numInputTypes,
286286
&metrics](TimesliceId timeslice, int input, TimesliceSlot slot) {
@@ -289,13 +289,11 @@ DataRelayer::RelayChoice
289289
cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING;
290290
// TODO: make sure that multiple parts can only be added within the same call of
291291
// DataRelayer::relay
292-
// depending on message model, add pairs or sequences with variable number of payloads
293-
if (createPairs) {
294-
for (size_t pi = 0; pi < nMessages / 2; ++pi) {
295-
target.add([&messages, &pi](size_t i) -> FairMQMessagePtr& {return messages[pi * 2 + i];}, 2);
296-
}
297-
} else {
298-
target.add([&messages](size_t i) -> FairMQMessagePtr& {return messages[i];}, nMessages);
292+
assert(nPayloads > 0);
293+
for (size_t mi = 0; mi < nMessages; ++mi) {
294+
assert(mi + nPayloads < nMessages);
295+
target.add([&messages, &mi](size_t i) -> FairMQMessagePtr& {return messages[mi + i];}, nPayloads + 1);
296+
mi += nPayloads;
299297
}
300298
};
301299

@@ -516,8 +514,6 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
516514
return DataRef{};
517515
};
518516
auto nPartsGetter = [&partial](size_t idx) {
519-
// with PartRef holding potentially multiple payload messages,
520-
// one would need to accumulate the result, think about caching
521517
return partial[idx].size();
522518
};
523519
InputSpan span{getter, nPartsGetter, static_cast<size_t>(partial.size())};

Framework/Core/test/test_DataRelayer.cxx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -698,16 +698,17 @@ BOOST_AUTO_TEST_CASE(SplitPayloadSequence)
698698

699699
for (size_t i = 0; i < nSplitParts; ++i) {
700700
splitParts.emplace_back(transport->CreateMessage(100));
701+
*(reinterpret_cast<size_t*>(splitParts.back()->GetData())) = i;
701702
}
702703
BOOST_REQUIRE_EQUAL(splitParts.size(), nSplitParts + 1);
703704

704-
relayer.relay(splitParts[0]->GetData(), splitParts.data(), splitParts.size(), false);
705+
relayer.relay(splitParts[0]->GetData(), splitParts.data(), splitParts.size(), nSplitParts);
705706
std::vector<RecordAction> ready;
706707
relayer.getReadyToProcess(ready);
707708
BOOST_REQUIRE_EQUAL(ready.size(), 1);
708709
BOOST_REQUIRE_EQUAL(ready[0].op, CompletionPolicy::CompletionOp::Consume);
709710
auto messageSet = relayer.getInputsForTimeslice(ready[0].slot);
710-
// we have one input route and thus one message set containing one PartRef
711+
// we have one input route and thus one message set containing one sequence of messages
711712
BOOST_REQUIRE_EQUAL(messageSet.size(), 1);
712713
BOOST_REQUIRE_EQUAL(messageSet[0].size(), 1);
713714
BOOST_CHECK_EQUAL(messageSet[0].getNumberOfPayloads(0), 100);

0 commit comments

Comments
 (0)