Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions Framework/Core/include/Framework/MessageSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,19 @@ struct MessageSet {
};
// linear storage of messages
std::vector<FairMQMessagePtr> messages;
// message map descibes O2 messages consisting of a header message and
// payload message(s), index descibes position in the linear storage
// message map describes O2 messages consisting of a header message and
// payload message(s), index describes position in the linear storage
std::vector<Index> messageMap;
// pair map describes all messages in header-payload pairs and where
// in the message index the an associated header and payload can be found
std::vector<std::pair<size_t, size_t>> pairMap;
// pair map describes all messages in one sequence of header-payload pairs and
// where in the message index the associated header and payload can be found
struct PairMapping {
PairMapping(size_t partId, size_t payloadId) : partIndex(partId), payloadIndex(payloadId) {}
// O2 message where the pair is located in
size_t partIndex = 0;
// payload index within the O2 message
size_t payloadIndex = 0;
};
std::vector<PairMapping> pairMap;

MessageSet()
: messages(), messageMap(), pairMap()
Expand Down Expand Up @@ -161,13 +168,13 @@ struct MessageSet {

FairMQMessagePtr const& associatedHeader(size_t pos) const
{
return messages[messageMap[pairMap[pos].first].position];
return messages[messageMap[pairMap[pos].partIndex].position];
}

FairMQMessagePtr const& associatedPayload(size_t pos) const
{
auto partIndex = pairMap[pos].first;
auto payloadIndex = pairMap[pos].second;
auto partIndex = pairMap[pos].partIndex;
auto payloadIndex = pairMap[pos].payloadIndex;
return messages[messageMap[partIndex].position + payloadIndex + 1];
}
};
Expand Down
21 changes: 17 additions & 4 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
LOGP(error, "Header is not a DataHeader?");
continue;
}
if (dh->payloadSize != parts.At(pi + 1)->GetSize()) {
if (dh->payloadSize > parts.At(pi + 1)->GetSize()) {
insertInputInfo(pi, 0, InputType::Invalid);
LOGP(error, "DataHeader payloadSize mismatch");
continue;
Expand All @@ -960,7 +960,12 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
LOGP(error, "Header stack does not contain DataProcessingHeader");
continue;
}
{
if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
// this is indicating a sequence of payloads following the header
// FIXME: we will probably also set the DataHeader version
insertInputInfo(pi, dh->splitPayloadParts + 1, InputType::Data);
pi += dh->splitPayloadParts - 1;
} else {
// We can set the type for the next splitPayloadParts
// because we are guaranteed they are all the same.
// If splitPayloadParts = 0, we assume that means there is only one (header, payload)
Expand Down Expand Up @@ -1001,7 +1006,11 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
auto headerIndex = input.position;
auto nMessages = 0;
auto nPayloadsPerHeader = 0;
{
if (input.size > 2) {
// header and multiple payload sequence
nMessages = input.size;
nPayloadsPerHeader = nMessages - 1;
} else {
// multiple header-payload pairs
auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
Expand Down Expand Up @@ -1213,6 +1222,9 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
assert(record.size() == currentSetOfInputs.size());
for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
// assuming that for timer inputs we do have exactly one PartRef object
// in the MessageSet, multiple PartRef Objects are only possible for either
// split payload messages of wildcard matchers, both for data inputs
DataRef input = record.getByPos(ii);
if (input.spec->lifetime != Lifetime::Timer) {
continue;
Expand Down Expand Up @@ -1327,7 +1339,8 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
}
// We need to find the forward route only for the first
// part of a split payload. All the others will use the same.
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1) {
// but always check if we have a sequence of multiple payloads
if (fdh->splitPayloadIndex == 0 || fdh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 1) {
cachedForwardingChoice = -1;
for (size_t fi = 0; fi < spec->forwards.size(); fi++) {
auto& forward = spec->forwards[fi];
Expand Down
3 changes: 1 addition & 2 deletions Framework/Core/src/DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ DataRelayer::RelayChoice
cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING;
// TODO: make sure that multiple parts can only be added within the same call of
// DataRelayer::relay
assert(nPayloads == 1);
assert(nMessages % 2 == 0);
assert(nPayloads > 0);
for (size_t mi = 0; mi < nMessages; ++mi) {
assert(mi + nPayloads < nMessages);
target.add([&messages, &mi](size_t i) -> FairMQMessagePtr& { return messages[mi + i]; }, nPayloads + 1);
Expand Down
120 changes: 120 additions & 0 deletions Framework/Core/test/test_DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -611,3 +611,123 @@ BOOST_AUTO_TEST_CASE(SplitParts)
BOOST_CHECK_NE(header2.get(), nullptr);
BOOST_CHECK_NE(payload2.get(), nullptr);
}

BOOST_AUTO_TEST_CASE(SplitPayloadPairs)
{
Monitoring metrics;
InputSpec spec1{"clusters", "TPC", "CLUSTERS"};

std::vector<InputRoute> inputs = {
InputRoute{spec1, 0, "Fake1", 0},
};

std::vector<ForwardRoute> forwards;
TimesliceIndex index{1};

auto policy = CompletionPolicyHelpers::consumeWhenAny();
DataRelayer relayer(policy, inputs, metrics, index);
relayer.setPipelineLength(4);

DataHeader dh{"CLUSTERS", "TPC", 0};

auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq");
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
size_t timeslice = 0;

const int nSplitParts = 100;
std::vector<std::unique_ptr<FairMQMessage>> splitParts;
splitParts.reserve(2 * nSplitParts);

for (size_t i = 0; i < nSplitParts; ++i) {
dh.splitPayloadIndex = i;
dh.splitPayloadParts = nSplitParts;

FairMQMessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}});
FairMQMessagePtr payload = transport->CreateMessage(100);

splitParts.emplace_back(std::move(header));
splitParts.emplace_back(std::move(payload));
}
BOOST_REQUIRE_EQUAL(splitParts.size(), 2 * nSplitParts);

relayer.relay(splitParts[0]->GetData(), splitParts.data(), splitParts.size());
std::vector<RecordAction> ready;
relayer.getReadyToProcess(ready);
BOOST_REQUIRE_EQUAL(ready.size(), 1);
BOOST_REQUIRE_EQUAL(ready[0].op, CompletionPolicy::CompletionOp::Consume);
auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
// we have one input route and thus one message set containing pairs for all
// payloads
BOOST_REQUIRE_EQUAL(messageSet.size(), 1);
BOOST_CHECK_EQUAL(messageSet[0].size(), nSplitParts);
BOOST_CHECK_EQUAL(messageSet[0].getNumberOfPayloads(0), 1);
}

BOOST_AUTO_TEST_CASE(SplitPayloadSequence)
{
Monitoring metrics;
InputSpec spec1{"clusters", "TST", "COUNTER"};

std::vector<InputRoute> inputs = {
InputRoute{spec1, 0, "Fake1", 0},
};

std::vector<ForwardRoute> forwards;
TimesliceIndex index{1};

auto policy = CompletionPolicyHelpers::consumeWhenAny();
DataRelayer relayer(policy, inputs, metrics, index);
relayer.setPipelineLength(4);

auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq");
size_t timeslice = 0;

std::vector<size_t> sequenceSize;
size_t nTotalPayloads = 0;

auto createSequence = [&nTotalPayloads, &timeslice, &sequenceSize, &transport, &relayer](size_t nPayloads) -> void {
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
std::vector<std::unique_ptr<FairMQMessage>> messages;
messages.reserve(nPayloads + 1);
DataHeader dh{"COUNTER", "TST", 0};

// one header with index set to the number of split parts indicates sequence
// of payloads without additional headers
dh.splitPayloadIndex = nPayloads;
dh.splitPayloadParts = nPayloads;
FairMQMessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}});
messages.emplace_back(std::move(header));

for (size_t i = 0; i < nPayloads; ++i) {
messages.emplace_back(transport->CreateMessage(100));
*(reinterpret_cast<size_t*>(messages.back()->GetData())) = nTotalPayloads;
++nTotalPayloads;
}
BOOST_CHECK_EQUAL(messages.size(), nPayloads + 1);
relayer.relay(messages[0]->GetData(), messages.data(), messages.size(), nPayloads);
sequenceSize.emplace_back(nPayloads);
};
createSequence(100);
createSequence(1);
createSequence(42);

std::vector<RecordAction> ready;
relayer.getReadyToProcess(ready);
BOOST_REQUIRE_EQUAL(ready.size(), 1);
BOOST_REQUIRE_EQUAL(ready[0].op, CompletionPolicy::CompletionOp::Consume);
auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
// we have one input route
BOOST_REQUIRE_EQUAL(messageSet.size(), 1);
// one message set containing number of added sequences of messages
BOOST_REQUIRE_EQUAL(messageSet[0].size(), sequenceSize.size());
size_t counter = 0;
for (auto seqid = 0; seqid < sequenceSize.size(); ++seqid) {
BOOST_CHECK_EQUAL(messageSet[0].getNumberOfPayloads(seqid), sequenceSize[seqid]);
for (auto pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) {
BOOST_REQUIRE(messageSet[0].payload(seqid, pi));
auto const* data = messageSet[0].payload(seqid, pi)->GetData();
BOOST_CHECK_EQUAL(*(reinterpret_cast<size_t const*>(data)), counter);
++counter;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
outputs.make<int>(OutputRef{"allocator", 0}) = counter;

if (channelName.empty()) {
OutputSpec const query{"TST", "PAIR", 0};
OutputSpec const query{"TST", "SEQUENCE", 0};
auto outputRoutes = rds.spec().outputs;
for (auto& route : outputRoutes) {
if (DataSpecUtils::match(route.matcher, query)) {
Expand Down Expand Up @@ -161,7 +161,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
}
};

//createSequence(attributes->distrib(attributes->gen), DataHeader{"SEQUENCE", "TST", 0});
createSequence(attributes->distrib(attributes->gen), DataHeader{"SEQUENCE", "TST", 0});
createPairs(counter + 1, DataHeader{"PAIR", "TST", 0});

// using utility from ExternalFairMQDeviceProxy
Expand All @@ -178,6 +178,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
workflow.emplace_back(DataProcessorSpec{"producer",
{InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}},
{OutputSpec{{"pair"}, "TST", "PAIR", 0, Lifetime::Timeframe},
OutputSpec{{"sequence"}, "TST", "SEQUENCE", 0, Lifetime::Timeframe},
OutputSpec{{"allocator"}, "TST", "ALLOCATOR", 0, Lifetime::Timeframe}},
AlgorithmSpec{adaptStateless(producerCallback)},
{ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"period of timer"}}}});
Expand Down Expand Up @@ -267,6 +268,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)

workflow.emplace_back(DataProcessorSpec{"consumer",
{InputSpec{"pairin", "TST", "PAIR", 0, Lifetime::Timeframe},
InputSpec{"sequencein", "TST", "SEQUENCE", 0, Lifetime::Timeframe},
InputSpec{"dpldefault", "TST", "ALLOCATOR", 0, Lifetime::Timeframe}},
{},
AlgorithmSpec{adaptStateful(consumerInit)}});
Expand All @@ -276,6 +278,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
//
workflow.emplace_back(DataProcessorSpec{"spectator",
{InputSpec{"pairin", "TST", "PAIR", 0, Lifetime::Timeframe},
InputSpec{"sequencein", "TST", "SEQUENCE", 0, Lifetime::Timeframe},
InputSpec{"dpldefault", "TST", "ALLOCATOR", 0, Lifetime::Timeframe}},
{},
AlgorithmSpec{adaptStateful(consumerInit)}});
Expand Down