Skip to content

Commit 94ef6f6

Browse files
DPL: Storing actual message size in the DataRef
When multiple payloads per header will be supported, the payload size in the DataHeader can only store one value and we need the actual payload message size. DataRefUtils::getPayloadSize should always be used.
1 parent 86dbc3f commit 94ef6f6

5 files changed

Lines changed: 30 additions & 23 deletions

File tree

Framework/Core/include/Framework/DataRef.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
#ifndef FRAMEWORK_DATAREF_H
1212
#define FRAMEWORK_DATAREF_H
1313

14+
#include <cstddef> // for size_t
15+
1416
namespace o2
1517
{
1618
namespace framework
@@ -24,6 +26,7 @@ struct DataRef {
2426
const InputSpec* spec = nullptr;
2527
const char* header = nullptr;
2628
const char* payload = nullptr;
29+
size_t payloadSize = 0;
2730
};
2831

2932
} // namespace framework

Framework/Core/include/Framework/DataRefUtils.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,14 @@ struct DataRefUtils {
168168
if (!header) {
169169
return 0;
170170
}
171-
return header->payloadSize;
171+
// in case of an O2 message with multiple payloads, the size of the message stored
172+
// in DataRef is returned,
173+
// as a prototype solution we are using splitPayloadIndex == splitPayloadParts to
174+
// indicate that there are splitPayloadParts payloads following the header
175+
if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
176+
return ref.payloadSize;
177+
}
178+
return header->payloadSize < ref.payloadSize || ref.payloadSize == 0 ? header->payloadSize : ref.payloadSize;
172179
}
173180

174181
template <typename T>

Framework/Core/include/Framework/InputRecord.h

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -234,9 +234,7 @@ class InputRecord
234234
// the buffer to be deleted when it goes out of scope. The string is built
235235
// from the data and its lengh, null-termination is not necessary.
236236
// return std::string object
237-
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
238-
assert(header);
239-
return std::string(ref.payload, header->payloadSize);
237+
return std::string(ref.payload, DataRefUtils::getPayloadSize(ref));
240238

241239
// implementation (c)
242240
} else if constexpr (std::is_same<T, char const*>::value) {
@@ -253,21 +251,17 @@ class InputRecord
253251
// substitution for TableConsumer
254252
// For the moment this is dummy, as it requires proper support to
255253
// create the RDataSource from the arrow buffer.
256-
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
257-
assert(header);
258254
auto data = reinterpret_cast<uint8_t const*>(ref.payload);
259-
return std::make_unique<TableConsumer>(data, header->payloadSize);
255+
return std::make_unique<TableConsumer>(data, DataRefUtils::getPayloadSize(ref));
260256

261257
// implementation (e)
262258
} else if constexpr (framework::is_boost_serializable<T>::value || is_specialization<T, BoostSerialized>::value) {
263259
// substitution for boost-serialized entities
264260
// We have to deserialize the ostringstream.
265261
// FIXME: check that the string is null terminated.
266262
// @return deserialized copy of payload
267-
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
268-
assert(header);
269-
auto str = std::string(ref.payload, header->payloadSize);
270-
assert(header->payloadSize == sizeof(T));
263+
auto str = std::string(ref.payload, DataRefUtils::getPayloadSize(ref));
264+
assert(DataRefUtils::getPayloadSize(ref) == sizeof(T));
271265
if constexpr (is_specialization<T, BoostSerialized>::value) {
272266
return o2::utils::BoostDeserialize<typename T::wrapped_type>(str);
273267
} else {
@@ -285,25 +279,27 @@ class InputRecord
285279
throw runtime_error("Inconsistent serialization method for extracting span");
286280
}
287281
using ValueT = typename T::value_type;
288-
if (header->payloadSize % sizeof(ValueT)) {
282+
auto payloadSize = DataRefUtils::getPayloadSize(ref);
283+
if (payloadSize % sizeof(ValueT)) {
289284
throw runtime_error(("Inconsistent type and payload size at " + std::string(ref.spec->binding) + "(" + DataSpecUtils::describe(*ref.spec) + ")" +
290285
": type size " + std::to_string(sizeof(ValueT)) +
291-
" payload size " + std::to_string(header->payloadSize))
286+
" payload size " + std::to_string(payloadSize))
292287
.c_str());
293288
}
294-
return gsl::span<ValueT const>(reinterpret_cast<ValueT const*>(ref.payload), header->payloadSize / sizeof(ValueT));
289+
return gsl::span<ValueT const>(reinterpret_cast<ValueT const*>(ref.payload), payloadSize / sizeof(ValueT));
295290

296291
// implementation (g)
297292
} else if constexpr (is_container<T>::value) {
298293
// currently implemented only for vectors
299294
if constexpr (is_specialization<typename std::remove_const<T>::type, std::vector>::value) {
300295
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
296+
auto payloadSize = DataRefUtils::getPayloadSize(ref);
301297
auto method = header->payloadSerializationMethod;
302298
if (method == o2::header::gSerializationMethodNone) {
303299
// TODO: construct a vector spectator
304300
// this is a quick solution now which makes a copy of the plain vector data
305301
auto* start = reinterpret_cast<typename T::value_type const*>(ref.payload);
306-
auto* end = start + header->payloadSize / sizeof(typename T::value_type);
302+
auto* end = start + payloadSize / sizeof(typename T::value_type);
307303
T result(start, end);
308304
return result;
309305
} else if (method == o2::header::gSerializationMethodROOT) {
@@ -358,6 +354,7 @@ class InputRecord
358354
using ValueT = typename std::remove_pointer<T>::type;
359355

360356
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
357+
auto payloadSize = DataRefUtils::getPayloadSize(ref);
361358
auto method = header->payloadSerializationMethod;
362359
if (method == o2::header::gSerializationMethodNone) {
363360
if constexpr (is_messageable<ValueT>::value) {
@@ -369,7 +366,7 @@ class InputRecord
369366
// TODO: construct a vector spectator
370367
// this is a quick solution now which makes a copy of the plain vector data
371368
auto* start = reinterpret_cast<typename ValueT::value_type const*>(ref.payload);
372-
auto* end = start + header->payloadSize / sizeof(typename ValueT::value_type);
369+
auto* end = start + payloadSize / sizeof(typename ValueT::value_type);
373370
auto container = std::make_unique<ValueT>(start, end);
374371
std::unique_ptr<ValueT const, Deleter<ValueT const>> result(container.release(), Deleter<ValueT const>(true));
375372
return result;
@@ -425,9 +422,7 @@ class InputRecord
425422
T get_boost(char const* binding) const
426423
{
427424
DataRef ref = get<DataRef>(binding);
428-
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
429-
assert(header);
430-
auto str = std::string(ref.payload, header->payloadSize);
425+
auto str = std::string(ref.payload, DataRefUtils::getPayloadSize(ref));
431426
auto desData = o2::utils::BoostDeserialize<T>(str);
432427
return std::move(desData);
433428
}

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1168,9 +1168,10 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
11681168
auto payload = currentSetOfInputs[i].payload(partindex).get();
11691169
return DataRef{nullptr,
11701170
static_cast<char const*>(header->GetData()),
1171-
static_cast<char const*>(payload ? payload->GetData() : nullptr)};
1171+
static_cast<char const*>(payload ? payload->GetData() : nullptr),
1172+
payload ? payload->GetSize() : 0};
11721173
}
1173-
return DataRef{nullptr, nullptr, nullptr};
1174+
return DataRef{};
11741175
};
11751176
auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
11761177
return currentSetOfInputs[i].size();

Framework/Core/src/DataRelayer.cxx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -515,9 +515,10 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
515515
auto payload = partial[idx].payload(part).get();
516516
return DataRef{nullptr,
517517
reinterpret_cast<const char*>(header->GetData()),
518-
reinterpret_cast<const char*>(payload ? payload->GetData() : nullptr)};
518+
reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
519+
payload ? payload->GetSize() : 0};
519520
}
520-
return DataRef{nullptr, nullptr, nullptr};
521+
return DataRef{};
521522
};
522523
auto nPartsGetter = [&partial](size_t idx) {
523524
return partial[idx].size();

0 commit comments

Comments
 (0)