Skip to content

Commit db01b62

Browse files
Extended test of payload sequences
1 parent bd6b250 commit db01b62

1 file changed

Lines changed: 40 additions & 22 deletions

File tree

Framework/Core/test/test_DataRelayer.cxx

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ BOOST_AUTO_TEST_CASE(SplitPayloadPairs)
666666
BOOST_AUTO_TEST_CASE(SplitPayloadSequence)
667667
{
668668
Monitoring metrics;
669-
InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
669+
InputSpec spec1{"clusters", "TST", "COUNTER"};
670670

671671
std::vector<InputRoute> inputs = {
672672
InputRoute{spec1, 0, "Fake1", 0},
@@ -679,37 +679,55 @@ BOOST_AUTO_TEST_CASE(SplitPayloadSequence)
679679
DataRelayer relayer(policy, inputs, metrics, index);
680680
relayer.setPipelineLength(4);
681681

682-
DataHeader dh{"CLUSTERS", "TPC", 0};
683-
684682
auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq");
685-
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
686683
size_t timeslice = 0;
687684

688-
const int nSplitParts = 100;
689-
std::vector<std::unique_ptr<FairMQMessage>> splitParts;
690-
splitParts.reserve(nSplitParts + 1);
685+
std::vector<size_t> sequenceSize;
686+
size_t nTotalPayloads = 0;
691687

692-
// one header with index set to the number of split parts indicates sequence
693-
// of payloads without additional headers
694-
dh.splitPayloadIndex = nSplitParts;
695-
dh.splitPayloadParts = nSplitParts;
696-
FairMQMessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}});
697-
splitParts.emplace_back(std::move(header));
688+
auto createSequence = [&nTotalPayloads, &timeslice, &sequenceSize, &transport, &relayer](size_t nPayloads) -> void {
689+
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
690+
std::vector<std::unique_ptr<FairMQMessage>> messages;
691+
messages.reserve(nPayloads + 1);
692+
DataHeader dh{"COUNTER", "TST", 0};
698693

699-
for (size_t i = 0; i < nSplitParts; ++i) {
700-
splitParts.emplace_back(transport->CreateMessage(100));
701-
*(reinterpret_cast<size_t*>(splitParts.back()->GetData())) = i;
702-
}
703-
BOOST_REQUIRE_EQUAL(splitParts.size(), nSplitParts + 1);
694+
// one header with index set to the number of split parts indicates sequence
695+
// of payloads without additional headers
696+
dh.splitPayloadIndex = nPayloads;
697+
dh.splitPayloadParts = nPayloads;
698+
FairMQMessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}});
699+
messages.emplace_back(std::move(header));
700+
701+
for (size_t i = 0; i < nPayloads; ++i) {
702+
messages.emplace_back(transport->CreateMessage(100));
703+
*(reinterpret_cast<size_t*>(messages.back()->GetData())) = nTotalPayloads;
704+
++nTotalPayloads;
705+
}
706+
BOOST_CHECK_EQUAL(messages.size(), nPayloads + 1);
707+
relayer.relay(messages[0]->GetData(), messages.data(), messages.size(), nPayloads);
708+
sequenceSize.emplace_back(nPayloads);
709+
};
710+
createSequence(100);
711+
createSequence(1);
712+
createSequence(42);
704713

705-
relayer.relay(splitParts[0]->GetData(), splitParts.data(), splitParts.size(), nSplitParts);
706714
std::vector<RecordAction> ready;
707715
relayer.getReadyToProcess(ready);
708716
BOOST_REQUIRE_EQUAL(ready.size(), 1);
709717
BOOST_REQUIRE_EQUAL(ready[0].op, CompletionPolicy::CompletionOp::Consume);
710718
auto messageSet = relayer.getInputsForTimeslice(ready[0].slot);
711-
// we have one input route and thus one message set containing one sequence of messages
719+
// we have one input route
712720
BOOST_REQUIRE_EQUAL(messageSet.size(), 1);
713-
BOOST_REQUIRE_EQUAL(messageSet[0].size(), 1);
714-
BOOST_CHECK_EQUAL(messageSet[0].getNumberOfPayloads(0), 100);
721+
// one message set containing number of added sequences of messages
722+
BOOST_REQUIRE_EQUAL(messageSet[0].size(), sequenceSize.size());
723+
size_t counter = 0;
724+
for (auto seqid = 0; seqid < sequenceSize.size(); ++seqid) {
725+
BOOST_CHECK_EQUAL(messageSet[0].getNumberOfPayloads(seqid), sequenceSize[seqid]);
726+
for (auto pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) {
727+
BOOST_REQUIRE(messageSet[0].payload(seqid, pi));
728+
auto const* data = messageSet[0].payload(seqid, pi)->GetData();
729+
BOOST_CHECK_EQUAL(*(reinterpret_cast<size_t const*>(data)), counter);
730+
++counter;
731+
}
732+
}
715733
}

0 commit comments

Comments
 (0)