@@ -184,9 +184,10 @@ InjectorFunction o2DataModelAdaptor(OutputSpec const& spec, uint64_t startTime,
184184 };
185185}
186186
187- InjectorFunction dplModelAdaptor (std::vector<OutputSpec> const & filterSpecs, bool throwOnUnmatchedInputs )
187+ InjectorFunction dplModelAdaptor (std::vector<OutputSpec> const & filterSpecs, DPLModelAdapterConfig config )
188188{
189- // structure to hald information on the unmatch ed data and print a warning at cleanup
189+ bool throwOnUnmatchedInputs = config.throwOnUnmatchedInputs ;
190+ // structure to hold information on the unmatched data and print a warning at cleanup
190191 class DroppedDataSpecs
191192 {
192193 public:
@@ -209,7 +210,8 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, boo
209210 void warning () const
210211 {
211212 if (not descriptions.empty ()) {
212- LOG (warning) << " Some input data are not matched by filter rules " << descriptions << " \n "
213+ LOG (warning) << " Some input data could not be matched by filter rules to output specs\n "
214+ << " Active rules: " << descriptions << " \n "
213215 << " DROPPING OF THESE MESSAGES HAS BEEN ENABLED BY CONFIGURATION" ;
214216 }
215217 }
@@ -221,11 +223,9 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, boo
221223 return [filterSpecs = std::move (filterSpecs), throwOnUnmatchedInputs, droppedDataSpecs = std::make_shared<DroppedDataSpecs>()](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
222224 std::unordered_map<std::string, FairMQParts> outputs;
223225 std::vector<std::string> unmatchedDescriptions;
224- int lastSplitPartIndex = -1 ;
225- std::string channelNameForSplitParts;
226226 static int64_t dplCounter = -1 ;
227227 dplCounter++;
228- for (size_t msgidx = 0 ; msgidx < parts.Size () / 2 ; ++ msgidx) {
228+ for (size_t msgidx = 0 ; msgidx < parts.Size (); msgidx += 2 ) {
229229 const auto dh = o2::header::get<DataHeader*>(parts.At (msgidx * 2 )->GetData ());
230230 if (!dh) {
231231 LOG (error) << " data on input " << msgidx << " does not follow the O2 data model, DataHeader missing" ;
@@ -241,49 +241,63 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, boo
241241
242242 OutputSpec query{dh->dataOrigin , dh->dataDescription , dh->subSpecification };
243243 LOG (debug) << " processing " << DataSpecUtils::describe (OutputSpec{dh->dataOrigin , dh->dataDescription , dh->subSpecification }) << " time slice " << dph->startTime << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts ;
244- bool indexDone = false ;
244+ size_t finalBlockIndex = 0 ;
245+ std::string channelName = " " ;
246+
245247 for (auto const & spec : filterSpecs) {
246248 // filter on the specified OutputSpecs, the default value is a ConcreteDataTypeMatcher with origin and description 'any'
247249 if (DataSpecUtils::match (spec, OutputSpec{{header::gDataOriginAny , header::gDataDescriptionAny }}) ||
248250 DataSpecUtils::match (spec, query)) {
249- auto channelName = channelRetriever (query, dph->startTime );
251+ channelName = channelRetriever (query, dph->startTime );
250252 if (channelName.empty ()) {
251253 LOG (warning) << " can not find matching channel, not able to adopt " << DataSpecUtils::describe (query);
252- break ;
253- }
254- // the checks for consistency of split payload parts are of informative nature
255- // forwarding happens independently
256- if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limits<decltype (dh->splitPayloadParts )>::max ()) {
257- if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0 ) {
258- LOG (warning) << " wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)" ;
259- } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1 ) {
260- LOG (warning) << " unordered split parts, expecting part " << lastSplitPartIndex + 1 << " , got " << dh->splitPayloadIndex
261- << " of " << dh->splitPayloadParts ;
262- } else if (channelNameForSplitParts.empty () == false && channelName != channelNameForSplitParts) {
263- LOG (error) << " inconsistent channel for split part " << dh->splitPayloadIndex
264- << " , matching " << channelName << " , expecting " << channelNameForSplitParts;
265- }
266- lastSplitPartIndex = dh->splitPayloadIndex ;
267- channelNameForSplitParts = channelName;
268- if (lastSplitPartIndex + 1 == dh->splitPayloadParts ) {
269- lastSplitPartIndex = -1 ;
270- channelNameForSplitParts = " " ;
271- }
272- } else if (lastSplitPartIndex != -1 ) {
273- LOG (warning) << " found incomplete or unordered split parts, expecting part " << lastSplitPartIndex + 1
274- << " but got a new data block" ;
275254 }
276- outputs[channelName].AddPart (std::move (parts.At (msgidx * 2 )));
277- outputs[channelName].AddPart (std::move (parts.At (msgidx * 2 + 1 )));
278- LOG (debug) << " associating part with index " << msgidx << " to channel " << channelName << " (" << outputs[channelName].Size () << " )" ;
279- indexDone = true ;
280255 break ;
281256 }
282257 }
283- if (indexDone == false && !DataSpecUtils::match (query, " DPL" , " EOS" , 0 )) {
258+ if (!channelName.empty ()) {
259+ if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex ) {
260+ // this is indicating a sequence of payloads following the header
261+ // FIXME: we will probably also set the DataHeader version
262+ finalBlockIndex = msgidx + dh->splitPayloadParts + 1 ;
263+ } else {
264+ // We can consider the next splitPayloadParts as one block of messages pairs
265+ // because we are guaranteed they are all the same.
266+ // If splitPayloadParts = 0, we assume that means there is only one (header, payload)
267+ // pair.
268+ finalBlockIndex = msgidx + (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1 ) * 2 ;
269+ }
270+ assert (finalBlockIndex >= msgidx + 2 );
271+ if (finalBlockIndex > parts.Size ()) {
272+ // TODO error handling
273+ // LOGP(error, "DataHeader::splitPayloadParts invalid");
274+ continue ;
275+ }
276+
277+ // the checks for consistency of split payload parts are of informative nature
278+ // forwarding happens independently
279+ // if (dh->splitPayloadParts > 1 && dh->splitPayloadParts != std::numeric_limits<decltype(dh->splitPayloadParts)>::max()) {
280+ // if (lastSplitPartIndex == -1 && dh->splitPayloadIndex != 0) {
281+ // LOG(warning) << "wrong split part index, expecting the first of " << dh->splitPayloadParts << " part(s)";
282+ // } else if (dh->splitPayloadIndex != lastSplitPartIndex + 1) {
283+ // LOG(warning) << "unordered split parts, expecting part " << lastSplitPartIndex + 1 << ", got " << dh->splitPayloadIndex
284+ // << " of " << dh->splitPayloadParts;
285+ // } else if (channelNameForSplitParts.empty() == false && channelName != channelNameForSplitParts) {
286+ // LOG(error) << "inconsistent channel for split part " << dh->splitPayloadIndex
287+ // << ", matching " << channelName << ", expecting " << channelNameForSplitParts;
288+ // }
289+ // }
290+ LOGP (debug, " associating {} part(s) at index {} to channel {} ({})" , finalBlockIndex - msgidx, msgidx, channelName, outputs[channelName].Size ());
291+ for (; msgidx < finalBlockIndex; ++msgidx) {
292+ outputs[channelName].AddPart (std::move (parts.At (msgidx)));
293+ }
294+ msgidx -= 2 ;
295+ }
296+ if (finalBlockIndex == 0 && !DataSpecUtils::match (query, " DPL" , " EOS" , 0 )) {
284297 unmatchedDescriptions.emplace_back (DataSpecUtils::describe (query));
285298 }
286- }
299+ } // end of loop over parts
300+
287301 for (auto & [channelName, channelParts] : outputs) {
288302 if (channelParts.Size () == 0 ) {
289303 continue ;
0 commit comments