Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Framework/Core/include/Framework/DataRef.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#ifndef FRAMEWORK_DATAREF_H
#define FRAMEWORK_DATAREF_H

#include <cstddef> // for size_t

namespace o2
{
namespace framework
Expand All @@ -24,6 +26,7 @@ struct DataRef {
const InputSpec* spec = nullptr;
const char* header = nullptr;
const char* payload = nullptr;
size_t payloadSize = 0;
};

} // namespace framework
Expand Down
9 changes: 8 additions & 1 deletion Framework/Core/include/Framework/DataRefUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,14 @@ struct DataRefUtils {
if (!header) {
return 0;
}
return header->payloadSize;
// in case of an O2 message with multiple payloads, the size of the message stored
// in DataRef is returned,
// as a prototype solution we are using splitPayloadIndex == splitPayloadParts to
// indicate that there are splitPayloadParts payloads following the header
if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
Comment thread
matthiasrichter marked this conversation as resolved.
Outdated
return ref.payloadSize;
}
return header->payloadSize < ref.payloadSize || ref.payloadSize == 0 ? header->payloadSize : ref.payloadSize;
}

template <typename T>
Expand Down
31 changes: 13 additions & 18 deletions Framework/Core/include/Framework/InputRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,7 @@ class InputRecord
// the buffer to be deleted when it goes out of scope. The string is built
// from the data and its lengh, null-termination is not necessary.
// return std::string object
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
assert(header);
return std::string(ref.payload, header->payloadSize);
return std::string(ref.payload, DataRefUtils::getPayloadSize(ref));

// implementation (c)
} else if constexpr (std::is_same<T, char const*>::value) {
Expand All @@ -253,21 +251,17 @@ class InputRecord
// substitution for TableConsumer
// For the moment this is dummy, as it requires proper support to
// create the RDataSource from the arrow buffer.
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
assert(header);
auto data = reinterpret_cast<uint8_t const*>(ref.payload);
return std::make_unique<TableConsumer>(data, header->payloadSize);
return std::make_unique<TableConsumer>(data, DataRefUtils::getPayloadSize(ref));

// implementation (e)
} else if constexpr (framework::is_boost_serializable<T>::value || is_specialization<T, BoostSerialized>::value) {
// substitution for boost-serialized entities
// We have to deserialize the ostringstream.
// FIXME: check that the string is null terminated.
// @return deserialized copy of payload
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
assert(header);
auto str = std::string(ref.payload, header->payloadSize);
assert(header->payloadSize == sizeof(T));
auto str = std::string(ref.payload, DataRefUtils::getPayloadSize(ref));
assert(DataRefUtils::getPayloadSize(ref) == sizeof(T));
if constexpr (is_specialization<T, BoostSerialized>::value) {
return o2::utils::BoostDeserialize<typename T::wrapped_type>(str);
} else {
Expand All @@ -285,25 +279,27 @@ class InputRecord
throw runtime_error("Inconsistent serialization method for extracting span");
}
using ValueT = typename T::value_type;
if (header->payloadSize % sizeof(ValueT)) {
auto payloadSize = DataRefUtils::getPayloadSize(ref);
if (payloadSize % sizeof(ValueT)) {
throw runtime_error(("Inconsistent type and payload size at " + std::string(ref.spec->binding) + "(" + DataSpecUtils::describe(*ref.spec) + ")" +
Comment thread
matthiasrichter marked this conversation as resolved.
Outdated
": type size " + std::to_string(sizeof(ValueT)) +
" payload size " + std::to_string(header->payloadSize))
" payload size " + std::to_string(payloadSize))
.c_str());
}
return gsl::span<ValueT const>(reinterpret_cast<ValueT const*>(ref.payload), header->payloadSize / sizeof(ValueT));
return gsl::span<ValueT const>(reinterpret_cast<ValueT const*>(ref.payload), payloadSize / sizeof(ValueT));

// implementation (g)
} else if constexpr (is_container<T>::value) {
// currently implemented only for vectors
if constexpr (is_specialization<typename std::remove_const<T>::type, std::vector>::value) {
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
auto payloadSize = DataRefUtils::getPayloadSize(ref);
auto method = header->payloadSerializationMethod;
if (method == o2::header::gSerializationMethodNone) {
// TODO: construct a vector spectator
// this is a quick solution now which makes a copy of the plain vector data
auto* start = reinterpret_cast<typename T::value_type const*>(ref.payload);
auto* end = start + header->payloadSize / sizeof(typename T::value_type);
auto* end = start + payloadSize / sizeof(typename T::value_type);
T result(start, end);
return result;
} else if (method == o2::header::gSerializationMethodROOT) {
Expand Down Expand Up @@ -358,6 +354,7 @@ class InputRecord
using ValueT = typename std::remove_pointer<T>::type;

auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
auto payloadSize = DataRefUtils::getPayloadSize(ref);
auto method = header->payloadSerializationMethod;
if (method == o2::header::gSerializationMethodNone) {
if constexpr (is_messageable<ValueT>::value) {
Expand All @@ -369,7 +366,7 @@ class InputRecord
// TODO: construct a vector spectator
// this is a quick solution now which makes a copy of the plain vector data
auto* start = reinterpret_cast<typename ValueT::value_type const*>(ref.payload);
auto* end = start + header->payloadSize / sizeof(typename ValueT::value_type);
auto* end = start + payloadSize / sizeof(typename ValueT::value_type);
auto container = std::make_unique<ValueT>(start, end);
std::unique_ptr<ValueT const, Deleter<ValueT const>> result(container.release(), Deleter<ValueT const>(true));
return result;
Expand Down Expand Up @@ -425,9 +422,7 @@ class InputRecord
T get_boost(char const* binding) const
{
DataRef ref = get<DataRef>(binding);
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
assert(header);
auto str = std::string(ref.payload, header->payloadSize);
auto str = std::string(ref.payload, DataRefUtils::getPayloadSize(ref));
auto desData = o2::utils::BoostDeserialize<T>(str);
return std::move(desData);
}
Expand Down
5 changes: 3 additions & 2 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1168,9 +1168,10 @@ bool DataProcessingDevice::tryDispatchComputation(DataProcessorContext& context,
auto payload = currentSetOfInputs[i].payload(partindex).get();
return DataRef{nullptr,
static_cast<char const*>(header->GetData()),
static_cast<char const*>(payload ? payload->GetData() : nullptr)};
static_cast<char const*>(payload ? payload->GetData() : nullptr),
payload ? payload->GetSize() : 0};
}
return DataRef{nullptr, nullptr, nullptr};
return DataRef{};
};
auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
return currentSetOfInputs[i].size();
Expand Down
5 changes: 3 additions & 2 deletions Framework/Core/src/DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -512,9 +512,10 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
auto payload = partial[idx].payload(part).get();
return DataRef{nullptr,
reinterpret_cast<const char*>(header->GetData()),
reinterpret_cast<const char*>(payload ? payload->GetData() : nullptr)};
reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
payload ? payload->GetSize() : 0};
}
return DataRef{nullptr, nullptr, nullptr};
return DataRef{};
};
auto nPartsGetter = [&partial](size_t idx) {
return partial[idx].size();
Expand Down