Skip to content

Commit 3abec09

Browse files
activating handling of variable payload sequences
1 parent db01b62 commit 3abec09

1 file changed

Lines changed: 33 additions & 18 deletions

File tree

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -932,20 +932,28 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
932932
LOGP(error, "Header stack does not contain DataProcessingHeader");
933933
continue;
934934
}
935-
// We can set the type for the next splitPayloadParts
936-
// because we are guaranteed they are all the same.
937-
// If splitPayloadParts = 0, we assume that means there is only one (header, payload)
938-
// pair.
939-
size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
940-
if (finalSplitPayloadIndex > parts.Size()) {
941-
LOGP(error, "DataHeader::splitPayloadParts invalid");
942-
results.back().type = InputType::Invalid;
943-
continue;
944-
}
945-
results.back().type = InputType::Data;
946-
results.back().size = 2;
947-
for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
948-
results.emplace_back(pi + 2, 2, InputType::Data);
935+
if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
936+
// this is indicating a sequence of payloads following the header
937+
// FIXME: we will probably also set the DataHeader version
938+
results.back().type = InputType::Data;
939+
results.back().size = dh->splitPayloadParts;
940+
pi += dh->splitPayloadParts - 1;
941+
} else {
942+
// We can set the type for the next splitPayloadParts
943+
// because we are guaranteed they are all the same.
944+
// If splitPayloadParts = 0, we assume that means there is only one (header, payload)
945+
// pair.
946+
size_t finalSplitPayloadIndex = pi + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
947+
if (finalSplitPayloadIndex > parts.Size()) {
948+
LOGP(error, "DataHeader::splitPayloadParts invalid");
949+
results.back().type = InputType::Invalid;
950+
continue;
951+
}
952+
results.back().type = InputType::Data;
953+
results.back().size = 2;
954+
for (; pi + 2 < finalSplitPayloadIndex; pi += 2) {
955+
results.emplace_back(pi + 2, 2, InputType::Data);
956+
}
949957
}
950958
}
951959
// FIXME: this is an intermediate check, remove it for supporting O2 messages with
@@ -970,12 +978,19 @@ void DataProcessingDevice::handleData(DataProcessorContext& context, InputChanne
970978
switch (input.type) {
971979
case InputType::Data: {
972980
auto headerIndex = input.position;
973-
auto payloadIndex = input.position + 1;
974-
assert(payloadIndex < parts.Size());
975-
auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
981+
auto nMessages = 0;
982+
if (input.size > 1) {
983+
// header and multiple payload sequence
984+
nMessages = input.size + 1;
985+
} else {
986+
// multiple header-payload pairs
987+
auto dh = o2::header::get<DataHeader*>(parts.At(headerIndex)->GetData());
988+
nMessages = dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2;
989+
}
976990
auto relayed = relayer.relay(parts.At(headerIndex)->GetData(),
977991
&parts.At(headerIndex),
978-
dh->splitPayloadParts > 0 ? dh->splitPayloadParts * 2 : 2);
992+
nMessages,
993+
input.size);
979994
switch (relayed) {
980995
case DataRelayer::Backpressured:
981996
policy.backpressure(info);

0 commit comments

Comments
 (0)