From 052a5c9689b3ae1139520aa9ae4aef9386864ce3 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Mon, 8 Nov 2021 20:01:45 +0100 Subject: [PATCH] DPL: Storing actual message size in the DataRef When multiple payloads per header will be supported, the payload size in the DataHeader can only store one value and we need the actual payload message size. DataRefUtils::getPayloadSize should always be used. --- Framework/Core/include/Framework/DataRef.h | 3 ++ .../Core/include/Framework/DataRefUtils.h | 9 +++++- .../Core/include/Framework/InputRecord.h | 31 ++++++++----------- Framework/Core/src/DataProcessingDevice.cxx | 5 +-- Framework/Core/src/DataRelayer.cxx | 5 +-- 5 files changed, 30 insertions(+), 23 deletions(-) diff --git a/Framework/Core/include/Framework/DataRef.h b/Framework/Core/include/Framework/DataRef.h index 6aa2b8278def8..d4cba88b19333 100644 --- a/Framework/Core/include/Framework/DataRef.h +++ b/Framework/Core/include/Framework/DataRef.h @@ -11,6 +11,8 @@ #ifndef FRAMEWORK_DATAREF_H #define FRAMEWORK_DATAREF_H +#include // for size_t + namespace o2 { namespace framework @@ -24,6 +26,7 @@ struct DataRef { const InputSpec* spec = nullptr; const char* header = nullptr; const char* payload = nullptr; + size_t payloadSize = 0; }; } // namespace framework diff --git a/Framework/Core/include/Framework/DataRefUtils.h b/Framework/Core/include/Framework/DataRefUtils.h index 97ded9972bc2d..224328fbfc19e 100644 --- a/Framework/Core/include/Framework/DataRefUtils.h +++ b/Framework/Core/include/Framework/DataRefUtils.h @@ -168,7 +168,14 @@ struct DataRefUtils { if (!header) { return 0; } - return header->payloadSize; + // in case of an O2 message with multiple payloads, the size of the message stored + // in DataRef is returned, + // as a prototype solution we are using splitPayloadIndex == splitPayloadParts to + // indicate that there are splitPayloadParts payloads following the header + if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) { + return ref.payloadSize; + } + return header->payloadSize < ref.payloadSize || ref.payloadSize == 0 ? header->payloadSize : ref.payloadSize; } template diff --git a/Framework/Core/include/Framework/InputRecord.h b/Framework/Core/include/Framework/InputRecord.h index 62a0d19fccfac..281b930736e74 100644 --- a/Framework/Core/include/Framework/InputRecord.h +++ b/Framework/Core/include/Framework/InputRecord.h @@ -234,9 +234,7 @@ class InputRecord // the buffer to be deleted when it goes out of scope. The string is built // from the data and its lengh, null-termination is not necessary. // return std::string object - auto header = DataRefUtils::getHeader(ref); - assert(header); - return std::string(ref.payload, header->payloadSize); + return std::string(ref.payload, DataRefUtils::getPayloadSize(ref)); // implementation (c) } else if constexpr (std::is_same::value) { @@ -253,10 +251,8 @@ class InputRecord // substitution for TableConsumer // For the moment this is dummy, as it requires proper support to // create the RDataSource from the arrow buffer. - auto header = DataRefUtils::getHeader(ref); - assert(header); auto data = reinterpret_cast(ref.payload); - return std::make_unique(data, header->payloadSize); + return std::make_unique(data, DataRefUtils::getPayloadSize(ref)); // implementation (e) } else if constexpr (framework::is_boost_serializable::value || is_specialization::value) { @@ -264,10 +260,8 @@ class InputRecord // We have to deserialize the ostringstream. // FIXME: check that the string is null terminated. // @return deserialized copy of payload - auto header = DataRefUtils::getHeader(ref); - assert(header); - auto str = std::string(ref.payload, header->payloadSize); - assert(header->payloadSize == sizeof(T)); + auto str = std::string(ref.payload, DataRefUtils::getPayloadSize(ref)); + assert(DataRefUtils::getPayloadSize(ref) == sizeof(T)); if constexpr (is_specialization::value) { return o2::utils::BoostDeserialize(str); } else { @@ -285,25 +279,27 @@ class InputRecord throw runtime_error("Inconsistent serialization method for extracting span"); } using ValueT = typename T::value_type; - if (header->payloadSize % sizeof(ValueT)) { + auto payloadSize = DataRefUtils::getPayloadSize(ref); + if (payloadSize % sizeof(ValueT)) { throw runtime_error(("Inconsistent type and payload size at " + std::string(ref.spec->binding) + "(" + DataSpecUtils::describe(*ref.spec) + ")" + ": type size " + std::to_string(sizeof(ValueT)) + - " payload size " + std::to_string(header->payloadSize)) + " payload size " + std::to_string(payloadSize)) .c_str()); } - return gsl::span(reinterpret_cast(ref.payload), header->payloadSize / sizeof(ValueT)); + return gsl::span(reinterpret_cast(ref.payload), payloadSize / sizeof(ValueT)); // implementation (g) } else if constexpr (is_container::value) { // currently implemented only for vectors if constexpr (is_specialization::type, std::vector>::value) { auto header = DataRefUtils::getHeader(ref); + auto payloadSize = DataRefUtils::getPayloadSize(ref); auto method = header->payloadSerializationMethod; if (method == o2::header::gSerializationMethodNone) { // TODO: construct a vector spectator // this is a quick solution now which makes a copy of the plain vector data auto* start = reinterpret_cast(ref.payload); - auto* end = start + header->payloadSize / sizeof(typename T::value_type); + auto* end = start + payloadSize / sizeof(typename T::value_type); T result(start, end); return result; } else if (method == o2::header::gSerializationMethodROOT) { @@ -358,6 +354,7 @@ class InputRecord using ValueT = typename std::remove_pointer::type; auto header = DataRefUtils::getHeader(ref); + auto payloadSize = DataRefUtils::getPayloadSize(ref); auto method = header->payloadSerializationMethod; if (method == o2::header::gSerializationMethodNone) { if constexpr (is_messageable::value) { @@ -369,7 +366,7 @@ class InputRecord // TODO: construct a vector spectator // this is a quick solution now which makes a copy of the plain vector data auto* start = reinterpret_cast(ref.payload); - auto* end = start + header->payloadSize / sizeof(typename ValueT::value_type); + auto* end = start + payloadSize / sizeof(typename ValueT::value_type); auto container = std::make_unique(start, end); std::unique_ptr> result(container.release(), Deleter(true)); return result; @@ -425,9 +422,7 @@ class InputRecord T get_boost(char const* binding) const { DataRef ref = get(binding); - auto header = DataRefUtils::getHeader(ref); - assert(header); - auto str = std::string(ref.payload, header->payloadSize); + auto str = std::string(ref.payload, DataRefUtils::getPayloadSize(ref)); auto desData = o2::utils::BoostDeserialize(str); return std::move(desData); } diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 1431756398e2a..2471248c160b7 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -1168,9 +1168,10 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context, auto payload = currentSetOfInputs[i].payload(partindex).get(); return DataRef{nullptr, static_cast(header->GetData()), - static_cast(payload ? payload->GetData() : nullptr)}; + static_cast(payload ? payload->GetData() : nullptr), + payload ? payload->GetSize() : 0}; } - return DataRef{nullptr, nullptr, nullptr}; + return DataRef{}; }; auto nofPartsGetter = [¤tSetOfInputs](size_t i) -> size_t { return currentSetOfInputs[i].size(); diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index c6a5d627349f3..6ff98959387e6 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -512,9 +512,10 @@ void DataRelayer::getReadyToProcess(std::vector& comp auto payload = partial[idx].payload(part).get(); return DataRef{nullptr, reinterpret_cast(header->GetData()), - reinterpret_cast(payload ? payload->GetData() : nullptr)}; + reinterpret_cast(payload ? payload->GetData() : nullptr), + payload ? payload->GetSize() : 0}; } - return DataRef{nullptr, nullptr, nullptr}; + return DataRef{}; }; auto nPartsGetter = [&partial](size_t idx) { return partial[idx].size();