Skip to content

Commit 74dc3e7

Browse files
DPL raw proxy: support for sequence of payloads/pruned headers
This is also simplifying the message processing loop to avoid repeated query matching for split payload message pairs. Extending configuration of DPL injector function, but new configuration features like `paranoid` checking of message consistency still need to be implemented. Time consuming consistency checks are will be optional.
1 parent d803180 commit 74dc3e7

2 files changed

Lines changed: 69 additions & 38 deletions

File tree

Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,29 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, uint64_t startTime
5656
/// multipart ensemble.
5757
InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, uint64_t step);
5858

59+
/// @struct DPLModelAdapterConfig
60+
/// Configuration object for dplModelAdaptor
61+
struct DPLModelAdapterConfig {
62+
/// throw runtime error if an input message is not matched by filter rules
63+
bool throwOnUnmatchedInputs = true;
64+
/// do all kinds of consistency checks
65+
bool paranoid = false;
66+
/// blindly forward on one channel
67+
bool blindForward = false;
68+
};
69+
5970
/// This is to be used when the input data is already formatted like DPL
6071
/// expects it, i.e. with the DataProcessingHeader in the header stack
6172
/// The list of specs is used as a filter list, all incoming data matching an entry
6273
/// in the list will be send through the corresponding channel
6374
InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& specs = {{header::gDataOriginAny, header::gDataDescriptionAny}},
64-
bool throwOnUnmatchedInputs = true);
75+
DPLModelAdapterConfig config = DPLModelAdapterConfig{});
76+
77+
/// legacy function
78+
inline InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& specs, bool throwOnUnmatchedInputs)
79+
{
80+
return dplModelAdaptor(specs, DPLModelAdapterConfig{throwOnUnmatchedInputs});
81+
}
6582

6683
/// The default connection method for the custom source
6784
static auto gDefaultConverter = incrementalConverter(OutputSpec{"TST", "TEST", 0}, 0, 1);

Framework/Core/src/ExternalFairMQDeviceProxy.cxx

Lines changed: 51 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,10 @@ InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime,
184184
};
185185
}
186186

187-
InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, bool throwOnUnmatchedInputs)
187+
InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPLModelAdapterConfig config)
188188
{
189-
// structure to hald information on the unmatch ed data and print a warning at cleanup
189+
bool throwOnUnmatchedInputs = config.throwOnUnmatchedInputs;
190+
// structure to hold information on the unmatched data and print a warning at cleanup
190191
class DroppedDataSpecs
191192
{
192193
public:
@@ -209,7 +210,8 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, boo
209210
void warning() const
210211
{
211212
if (not descriptions.empty()) {
212-
LOG(warning) << "Some input data are not matched by filter rules " << descriptions << "\n"
213+
LOG(warning) << "Some input data could not be matched by filter rules to output specs\n"
214+
<< "Active rules: " << descriptions << "\n"
213215
<< "DROPPING OF THESE MESSAGES HAS BEEN ENABLED BY CONFIGURATION";
214216
}
215217
}
@@ -221,11 +223,9 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, boo
221223
return [filterSpecs = std::move(filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared<DroppedDataSpecs>()](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
222224
std::unordered_map<std::string, FairMQParts> outputs;
223225
std::vector<std::string> unmatchedDescriptions;
224-
int lastSplitPartIndex = -1;
225-
std::string channelNameForSplitParts;
226226
static int64_t dplCounter = -1;
227227
dplCounter++;
228-
for (size_t msgidx = 0; msgidx < parts.Size() / 2; ++msgidx) {
228+
for (size_t msgidx = 0; msgidx < parts.Size(); msgidx += 2) {
229229
const auto dh = o2::header::get<DataHeader*>(parts.At(msgidx * 2)->GetData());
230230
if (!dh) {
231231
LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing";
@@ -241,49 +241,63 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, boo
241241

242242
OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification};
243243
LOG(debug) << "processing " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " time slice " << dph->startTime << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts;
244-
bool indexDone = false;
244+
size_t finalBlockIndex = 0;
245+
std::string channelName = "";
246+
245247
for (auto const& spec : filterSpecs) {
246248
// filter on the specified OutputSpecs, the default value is a ConcreteDataTypeMatcher with origin and description 'any'
247249
if (DataSpecUtils::match(spec, OutputSpec{{header::gDataOriginAny, header::gDataDescriptionAny}}) ||
248250
DataSpecUtils::match(spec, query)) {
249-
auto channelName = channelRetriever(query, dph->startTime);
251+
channelName = channelRetriever(query, dph->startTime);
250252
if (channelName.empty()) {
251253
LOG(warning) << "can not find matching channel, not able to adopt " << DataSpecUtils::describe(query);
252-
break;
253-
}
254-
// the checks for consistency of split payload parts are of informative nature
255-
// forwarding happens independently
256-
if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limits<decltype(dh->splitPayloadParts)>::max()) {
257-
if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) {
258-
LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)";
259-
} else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) {
260-
LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex
261-
<< " of " << dh->splitPayloadParts;
262-
} else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) {
263-
LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex
264-
<< ", matching " << channelName << ", expecting " << channelNameForSplitParts;
265-
}
266-
lastSplitPartIndex = dh->splitPayloadIndex;
267-
channelNameForSplitParts = channelName;
268-
if (lastSplitPartIndex + 1 == dh->splitPayloadParts) {
269-
lastSplitPartIndex = -1;
270-
channelNameForSplitParts = "";
271-
}
272-
} else if (lastSplitPartIndex != -1) {
273-
LOG(warning) << "found incomplete or unordered split parts, expecting part " << lastSplitPartIndex + 1
274-
<< " but got a new data block";
275254
}
276-
outputs[channelName].AddPart(std::move(parts.At(msgidx * 2)));
277-
outputs[channelName].AddPart(std::move(parts.At(msgidx * 2 + 1)));
278-
LOG(debug) << "associating part with index " << msgidx << " to channel " << channelName << " (" << outputs[channelName].Size() << ")";
279-
indexDone = true;
280255
break;
281256
}
282257
}
283-
if (indexDone == false && !DataSpecUtils::match(query, "DPL", "EOS", 0)) {
258+
if (!channelName.empty()) {
259+
if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
260+
// this is indicating a sequence of payloads following the header
261+
// FIXME: we will probably also set the DataHeader version
262+
finalBlockIndex = msgidx + dh->splitPayloadParts + 1;
263+
} else {
264+
// We can consider the next splitPayloadParts as one block of messages pairs
265+
// because we are guaranteed they are all the same.
266+
// If splitPayloadParts = 0, we assume that means there is only one (header, payload)
267+
// pair.
268+
finalBlockIndex = msgidx + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
269+
}
270+
assert(finalBlockIndex >= msgidx + 2);
271+
if (finalBlockIndex > parts.Size()) {
272+
// TODO error handling
273+
//LOGP(error, "DataHeader::splitPayloadParts invalid");
274+
continue;
275+
}
276+
277+
// the checks for consistency of split payload parts are of informative nature
278+
// forwarding happens independently
279+
//if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limits<decltype(dh->splitPayloadParts)>::max()) {
280+
// if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) {
281+
// LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)";
282+
// } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) {
283+
// LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex
284+
// << " of " << dh->splitPayloadParts;
285+
// } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) {
286+
// LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex
287+
// << ", matching " << channelName << ", expecting " << channelNameForSplitParts;
288+
// }
289+
//}
290+
LOGP(debug, "associating {} part(s) at index {} to channel {} ({})", finalBlockIndex - msgidx, msgidx, channelName, outputs[channelName].Size());
291+
for (; msgidx < finalBlockIndex; ++msgidx) {
292+
outputs[channelName].AddPart(std::move(parts.At(msgidx)));
293+
}
294+
msgidx -= 2;
295+
}
296+
if (finalBlockIndex == 0 && !DataSpecUtils::match(query, "DPL", "EOS", 0)) {
284297
unmatchedDescriptions.emplace_back(DataSpecUtils::describe(query));
285298
}
286-
}
299+
} // end of loop over parts
300+
287301
for (auto& [channelName, channelParts] : outputs) {
288302
if (channelParts.Size() == 0) {
289303
continue;

0 commit comments

Comments
 (0)