Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,29 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, uint64_t startTime
/// multipart ensemble.
InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime, uint64_t step);

/// @struct DPLModelAdapterConfig
/// Configuration object for dplModelAdaptor
struct DPLModelAdapterConfig {
/// throw runtime error if an input message is not matched by filter rules
bool throwOnUnmatchedInputs = true;
/// do all kinds of consistency checks
bool paranoid = false;
/// blindly forward on one channel
bool blindForward = false;
};
Comment thread
matthiasrichter marked this conversation as resolved.
Outdated

/// This is to be used when the input data is already formatted like DPL
/// expects it, i.e. with the DataProcessingHeader in the header stack
/// The list of specs is used as a filter list, all incoming data matching an entry
/// in the list will be send through the corresponding channel
InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& specs = {{header::gDataOriginAny, header::gDataDescriptionAny}},
bool throwOnUnmatchedInputs = true);
DPLModelAdapterConfig config = DPLModelAdapterConfig{});

/// legacy function
inline InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& specs, bool throwOnUnmatchedInputs)
{
return dplModelAdaptor(specs, DPLModelAdapterConfig{throwOnUnmatchedInputs});
}

/// The default connection method for the custom source
static auto gDefaultConverter = incrementalConverter(OutputSpec{"TST", "TEST", 0}, 0, 1);
Expand Down
88 changes: 51 additions & 37 deletions Framework/Core/src/ExternalFairMQDeviceProxy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,10 @@ InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime,
};
}

InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, bool throwOnUnmatchedInputs)
InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPLModelAdapterConfig config)
{
// structure to hald information on the unmatch ed data and print a warning at cleanup
bool throwOnUnmatchedInputs = config.throwOnUnmatchedInputs;
// structure to hold information on the unmatched data and print a warning at cleanup
class DroppedDataSpecs
{
public:
Expand All @@ -209,7 +210,8 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, boo
void warning() const
{
if (not descriptions.empty()) {
LOG(warning) << "Some input data are not matched by filter rules " << descriptions << "\n"
LOG(warning) << "Some input data could not be matched by filter rules to output specs\n"
<< "Active rules: " << descriptions << "\n"
<< "DROPPING OF THESE MESSAGES HAS BEEN ENABLED BY CONFIGURATION";
}
}
Expand All @@ -221,11 +223,9 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, boo
return [filterSpecs = std::move(filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared<DroppedDataSpecs>()](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
std::unordered_map<std::string, FairMQParts> outputs;
std::vector<std::string> unmatchedDescriptions;
int lastSplitPartIndex = -1;
std::string channelNameForSplitParts;
static int64_t dplCounter = -1;
dplCounter++;
for (size_t msgidx = 0; msgidx < parts.Size() / 2; ++msgidx) {
for (size_t msgidx = 0; msgidx < parts.Size(); msgidx += 2) {
const auto dh = o2::header::get<DataHeader*>(parts.At(msgidx * 2)->GetData());
if (!dh) {
LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing";
Expand All @@ -241,49 +241,63 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, boo

OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification};
LOG(debug) << "processing " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " time slice " << dph->startTime << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts;
bool indexDone = false;
size_t finalBlockIndex = 0;
std::string channelName = "";

for (auto const& spec : filterSpecs) {
// filter on the specified OutputSpecs, the default value is a ConcreteDataTypeMatcher with origin and description 'any'
if (DataSpecUtils::match(spec, OutputSpec{{header::gDataOriginAny, header::gDataDescriptionAny}}) ||
DataSpecUtils::match(spec, query)) {
auto channelName = channelRetriever(query, dph->startTime);
channelName = channelRetriever(query, dph->startTime);
if (channelName.empty()) {
LOG(warning) << "can not find matching channel, not able to adopt " << DataSpecUtils::describe(query);
break;
}
// the checks for consistency of split payload parts are of informative nature
// forwarding happens independently
if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limits<decltype(dh->splitPayloadParts)>::max()) {
if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) {
LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)";
} else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) {
LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex
<< " of " << dh->splitPayloadParts;
} else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) {
LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex
<< ", matching " << channelName << ", expecting " << channelNameForSplitParts;
}
lastSplitPartIndex = dh->splitPayloadIndex;
channelNameForSplitParts = channelName;
if (lastSplitPartIndex + 1 == dh->splitPayloadParts) {
lastSplitPartIndex = -1;
channelNameForSplitParts = "";
}
} else if (lastSplitPartIndex != -1) {
LOG(warning) << "found incomplete or unordered split parts, expecting part " << lastSplitPartIndex + 1
<< " but got a new data block";
}
outputs[channelName].AddPart(std::move(parts.At(msgidx * 2)));
outputs[channelName].AddPart(std::move(parts.At(msgidx * 2 + 1)));
LOG(debug) << "associating part with index " << msgidx << " to channel " << channelName << " (" << outputs[channelName].Size() << ")";
indexDone = true;
break;
}
}
if (indexDone == false && !DataSpecUtils::match(query, "DPL", "EOS", 0)) {
if (!channelName.empty()) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would invert the logic and have a continue on channelName.empty() == true.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, yeah, can revise the logic. Actually I did not want to continue in the loop because the unmatched specs are logged. But I will combine this, there is room for cleanup in the logic.

if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) {
// this is indicating a sequence of payloads following the header
// FIXME: we will probably also set the DataHeader version
finalBlockIndex = msgidx + dh->splitPayloadParts + 1;
} else {
// We can consider the next splitPayloadParts as one block of messages pairs
// because we are guaranteed they are all the same.
// If splitPayloadParts = 0, we assume that means there is only one (header, payload)
// pair.
finalBlockIndex = msgidx + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2;
}
assert(finalBlockIndex >= msgidx + 2);
if (finalBlockIndex > parts.Size()) {
// TODO error handling
//LOGP(error, "DataHeader::splitPayloadParts invalid");
continue;
}

// the checks for consistency of split payload parts are of informative nature
// forwarding happens independently
//if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limits<decltype(dh->splitPayloadParts)>::max()) {
// if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) {
// LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)";
// } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) {
// LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex
// << " of " << dh->splitPayloadParts;
// } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) {
// LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex
// << ", matching " << channelName << ", expecting " << channelNameForSplitParts;
// }
//}
LOGP(debug, "associating {} part(s) at index {} to channel {} ({})", finalBlockIndex - msgidx, msgidx, channelName, outputs[channelName].Size());
for (; msgidx < finalBlockIndex; ++msgidx) {
outputs[channelName].AddPart(std::move(parts.At(msgidx)));
}
msgidx -= 2;
}
if (finalBlockIndex == 0 && !DataSpecUtils::match(query, "DPL", "EOS", 0)) {
unmatchedDescriptions.emplace_back(DataSpecUtils::describe(query));
}
}
} // end of loop over parts

for (auto& [channelName, channelParts] : outputs) {
if (channelParts.Size() == 0) {
continue;
Expand Down
Loading