Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
#include "ReconstructionDataFormats/VtxTrackRef.h"
#include "ReconstructionDataFormats/TrackCosmics.h"
#include "DataFormatsITSMFT/TrkClusRef.h"
// FIXME: ideally, the data formats definition should be independent of the framework
// collectData is using the input of ProcessingContext to extract the first valid
// header and the TF orbit from it
#include "Framework/ProcessingContext.h"
#include "Framework/DataRefUtils.h"

using namespace o2::globaltracking;
using namespace o2::framework;
Expand Down Expand Up @@ -306,7 +311,7 @@ void RecoContainer::collectData(ProcessingContext& pc, const DataRequest& reques
{
auto& reqMap = requests.requestMap;

const auto* dh = o2::header::get<o2::header::DataHeader*>(pc.inputs().getFirstValid(true).header);
const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().getFirstValid(true));
startIR = {0, dh->firstTForbit};

auto req = reqMap.find("trackITS");
Expand Down
1 change: 0 additions & 1 deletion Detectors/GlobalTrackingWorkflow/src/TOFMatcherSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "DetectorsBase/Propagator.h"
#include "DetectorsCommonDataFormats/NameConf.h"
#include "DataFormatsParameters/GRPObject.h"
#include "Headers/DataHeader.h"
#include "CommonDataFormat/InteractionRecord.h"
#include "DataFormatsGlobalTracking/RecoContainer.h"
#include "Framework/Task.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "DataFormatsTPC/Constants.h"
#include "Framework/DataProcessorSpec.h"
#include "Framework/Task.h"
#include "Framework/DataRefUtils.h"
#include <string>
#include "TStopwatch.h"
#include "Framework/ConfigParamRegistry.h"
Expand Down Expand Up @@ -131,7 +132,7 @@ void TPCITSMatchingDPL::init(InitContext& ic)

void TPCITSMatchingDPL::run(ProcessingContext& pc)
{
const auto* dh = o2::header::get<o2::header::DataHeader*>(pc.inputs().getFirstValid(true).header);
const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().getFirstValid(true));
LOG(INFO) << " startOrbit: " << dh->firstTForbit;
mTimer.Start(false);
RecoContainer recoData;
Expand Down
9 changes: 3 additions & 6 deletions Detectors/HMPID/workflow/src/DataDecoderSpec2.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "Framework/WorkflowSpec.h"
#include "Framework/Logger.h"
#include "Framework/InputRecordWalker.h"
#include "Framework/DataRefUtils.h"

#include "Headers/RAWDataHeader.h"
#include "DetectorsRaw/RDHUtils.h"
Expand Down Expand Up @@ -251,21 +252,17 @@ void DataDecoderTask2::decodeRawFile(framework::ProcessingContext& pc)

for (auto&& input : pc.inputs()) {
if (input.spec->binding == "file") {
const header::DataHeader* header = o2::header::get<header::DataHeader*>(input.header);
if (!header) {
return;
}

auto const* raw = input.payload;
size_t payloadSize = header->payloadSize;
size_t payloadSize = DataRefUtils::getPayloadSize(input);

LOG(INFO) << " payloadSize=" << payloadSize;
if (payloadSize == 0) {
return;
}

uint32_t* theBuffer = (uint32_t*)input.payload;
int pagesize = header->payloadSize;
int pagesize = payloadSize;
mDeco->setUpStream(theBuffer, pagesize);
try {
if (mFastAlgorithm) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "Framework/WorkflowSpec.h"
#include "Framework/ConfigParamSpec.h"
#include "Framework/CompletionPolicyHelpers.h"
#include "Framework/DataRefUtils.h"

#include "Headers/RDHAny.h"
#include "MCHRawDecoder/PageDecoder.h"
Expand Down Expand Up @@ -283,13 +284,10 @@ class PedestalsTask
// the decodeReadout() function processes the messages generated by o2-mch-cru-page-reader-workflow
void decodeReadout(const o2::framework::DataRef& input)
{
const auto* header = o2::header::get<header::DataHeader*>(input.header);
if (!header) {
return;
}

// Note: DPL allows to extract the san directly from the input
// this would make this function obsolete
auto const* raw = input.payload;
size_t payloadSize = header->payloadSize;
size_t payloadSize = DataRefUtils::getPayloadSize(input);

gsl::span<const std::byte> buffer(reinterpret_cast<const std::byte*>(raw), payloadSize);
decodeBuffer(buffer);
Expand Down
10 changes: 3 additions & 7 deletions Detectors/MUON/MCH/Workflow/src/DataDecoderSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "Framework/Output.h"
#include "Framework/Task.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/DataRefUtils.h"

#include "Headers/RAWDataHeader.h"
#include "DetectorsRaw/RDHUtils.h"
Expand Down Expand Up @@ -98,7 +99,7 @@ class DataDecoderTask
// the decodeTF() function processes the messages generated by the (sub)TimeFrame builder
void decodeTF(framework::ProcessingContext& pc)
{
const auto* dh = o2::header::get<o2::header::DataHeader*>(pc.inputs().getFirstValid(true).header);
const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().getFirstValid(true));
mFirstTForbit = dh->firstTForbit;

if (!mDecoder->getFirstOrbitInRun()) {
Expand Down Expand Up @@ -137,14 +138,9 @@ class DataDecoderTask
return;
}

const auto* header = o2::header::get<header::DataHeader*>(input.header);
if (!header) {
return;
}

auto const* raw = input.payload;
// size of payload
size_t payloadSize = header->payloadSize;
size_t payloadSize = DataRefUtils::getPayloadSize(input);

if (mDebug) {
std::cout << nFrame << " payloadSize=" << payloadSize << std::endl;
Expand Down
3 changes: 2 additions & 1 deletion Detectors/TOF/workflow/src/CompressedDecodingTask.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "Framework/Logger.h"
#include "DetectorsRaw/RDHUtils.h"
#include "Framework/InputRecordWalker.h"
#include "Framework/DataRefUtils.h"

using namespace o2::framework;

Expand Down Expand Up @@ -123,7 +124,7 @@ void CompressedDecodingTask::run(ProcessingContext& pc)
mTimer.Start(false);

//RS set the 1st orbit of the TF from the O2 header, relying on rdhHandler is not good (in fact, the RDH might be eliminated in the derived data)
const auto* dh = o2::header::get<o2::header::DataHeader*>(pc.inputs().getFirstValid(true).header);
const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().getFirstValid(true));
mInitOrbit = dh->firstTForbit;
if (!mConetMode) {
mDecoder.setFirstIR({0, mInitOrbit});
Expand Down
3 changes: 2 additions & 1 deletion Detectors/TOF/workflow/src/TOFClusterizerSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "TOFCalibration/CalibTOFapi.h"
#include "TStopwatch.h"
#include "Framework/ConfigParamRegistry.h"
#include "Framework/DataRefUtils.h"

#include <memory> // for make_shared, make_unique, unique_ptr
#include <vector>
Expand Down Expand Up @@ -78,7 +79,7 @@ class TOFDPLClustererTask
auto digits = pc.inputs().get<gsl::span<o2::tof::Digit>>("tofdigits");
auto row = pc.inputs().get<gsl::span<o2::tof::ReadoutWindowData>>("readoutwin");

const auto* dh = o2::header::get<o2::header::DataHeader*>(pc.inputs().getFirstValid(true).header);
const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().getFirstValid(true));
mClusterer.setFirstOrbit(dh->firstTForbit);

auto labelvector = std::make_shared<std::vector<o2::dataformats::MCTruthContainer<o2::MCCompLabel>>>();
Expand Down
3 changes: 2 additions & 1 deletion Detectors/TPC/workflow/src/ZSSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "Framework/ControlService.h"
#include "Framework/ConfigParamRegistry.h"
#include "Framework/InputRecordWalker.h"
#include "Framework/DataRefUtils.h"
#include "DataFormatsTPC/TPCSectorHeader.h"
#include "DataFormatsTPC/ZeroSuppression.h"
#include "DataFormatsTPC/Helpers.h"
Expand Down Expand Up @@ -99,7 +100,7 @@ DataProcessorSpec getZSEncoderSpec(std::vector<int> const& tpcSectors, bool outR

const auto& inputs = getWorkflowTPCInput(pc, 0, false, false, tpcSectorMask, true);
sizes.resize(NSectors * NEndpoints);
const auto* dh = o2::header::get<o2::header::DataHeader*>(pc.inputs().getFirstValid(true).header);
const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().getFirstValid(true));
o2::InteractionRecord ir{0, dh->firstTForbit};
o2::gpu::GPUReconstructionConvert::RunZSEncoder<o2::tpc::Digit, DigitArray>(inputs->inputDigits, &zsoutput, sizes.data(), nullptr, &ir, _GPUParam, true, verify, config.configReconstruction.tpc.zsThreshold);
ZeroSuppressedContainer8kb* page = reinterpret_cast<ZeroSuppressedContainer8kb*>(zsoutput.get());
Expand Down
18 changes: 9 additions & 9 deletions Framework/Core/include/Framework/InputRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ class InputRecord
// the buffer to be deleted when it goes out of scope. The string is built
// from the data and its lengh, null-termination is not necessary.
// return std::string object
auto header = header::get<const header::DataHeader*>(ref.header);
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
assert(header);
return std::string(ref.payload, header->payloadSize);

Expand All @@ -253,7 +253,7 @@ class InputRecord
// substitution for TableConsumer
// For the moment this is dummy, as it requires proper support to
// create the RDataSource from the arrow buffer.
auto header = header::get<const header::DataHeader*>(ref.header);
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
assert(header);
auto data = reinterpret_cast<uint8_t const*>(ref.payload);
return std::make_unique<TableConsumer>(data, header->payloadSize);
Expand All @@ -264,7 +264,7 @@ class InputRecord
// We have to deserialize the ostringstream.
// FIXME: check that the string is null terminated.
// @return deserialized copy of payload
auto header = header::get<const header::DataHeader*>(ref.header);
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
assert(header);
auto str = std::string(ref.payload, header->payloadSize);
assert(header->payloadSize == sizeof(T));
Expand All @@ -279,7 +279,7 @@ class InputRecord
// substitution for span of messageable objects
// FIXME: there will be std::span in C++20
static_assert(is_messageable<typename T::value_type>::value, "span can only be created for messageable types");
auto header = header::get<const header::DataHeader*>(ref.header);
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
assert(header);
if (sizeof(typename T::value_type) > 1 && header->payloadSerializationMethod != o2::header::gSerializationMethodNone) {
throw runtime_error("Inconsistent serialization method for extracting span");
Expand All @@ -297,7 +297,7 @@ class InputRecord
} else if constexpr (is_container<T>::value) {
// currently implemented only for vectors
if constexpr (is_specialization<typename std::remove_const<T>::type, std::vector>::value) {
auto header = o2::header::get<const DataHeader*>(ref.header);
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
auto method = header->payloadSerializationMethod;
if (method == o2::header::gSerializationMethodNone) {
// TODO: construct a vector spectator
Expand Down Expand Up @@ -340,7 +340,7 @@ class InputRecord
// unserialized objects
using DataHeader = o2::header::DataHeader;

auto header = o2::header::get<const DataHeader*>(ref.header);
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
auto method = header->payloadSerializationMethod;
if (method != o2::header::gSerializationMethodNone) {
// FIXME: we could in principle support serialized content here as well if we
Expand All @@ -357,7 +357,7 @@ class InputRecord
using DataHeader = o2::header::DataHeader;
using ValueT = typename std::remove_pointer<T>::type;

auto header = o2::header::get<const DataHeader*>(ref.header);
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
auto method = header->payloadSerializationMethod;
if (method == o2::header::gSerializationMethodNone) {
if constexpr (is_messageable<ValueT>::value) {
Expand Down Expand Up @@ -402,7 +402,7 @@ class InputRecord
// the operation depends on the transmitted serialization method
using DataHeader = o2::header::DataHeader;

auto header = o2::header::get<const DataHeader*>(ref.header);
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
auto method = header->payloadSerializationMethod;
if (method == o2::header::gSerializationMethodNone) {
// this code path is only selected if the type is non-messageable
Expand All @@ -425,7 +425,7 @@ class InputRecord
T get_boost(char const* binding) const
{
DataRef ref = get<DataRef>(binding);
auto header = header::get<const header::DataHeader*>(ref.header);
auto header = DataRefUtils::getHeader<header::DataHeader*>(ref);
assert(header);
auto str = std::string(ref.payload, header->payloadSize);
auto desData = o2::utils::BoostDeserialize<T>(str);
Expand Down
8 changes: 4 additions & 4 deletions Framework/Core/test/test_CCDBFetcher.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
AlgorithmSpec{

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ktf, looks like this test is not compiled anymore, should we remove it? There is almost identical code in Framework/TestWorkflows/src/test_CCDBFetcher.cxx

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

adaptStateless([](DataAllocator& outputs, InputRecord& inputs, ControlService& control) {
DataRef condition = inputs.get("somecondition");
auto* header = o2::header::get<const DataHeader*>(condition.header);
if (header->payloadSize != 2048) {
LOGP(ERROR, "Wrong size for condition payload (expected {}, found {}", 2048, header->payloadSize);
auto payloadSize = DataRefUtils::getPayloadSize(condition);
if (payloadSize != 2048) {
LOGP(ERROR, "Wrong size for condition payload (expected {}, found {}", 2048, payloadSize);
}
header->payloadSize;
payloadSize;
control.readyToQuit(QuitRequest::All);
})},
Options{
Expand Down
7 changes: 4 additions & 3 deletions Framework/Core/test/test_DataAllocator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "Framework/SerializationMethods.h"
#include "Framework/OutputRoute.h"
#include "Framework/ConcreteDataMatcher.h"
#include "Framework/DataRefUtils.h"
#include "Headers/DataHeader.h"
#include "TestClasses.h"
#include "Framework/Logger.h"
Expand Down Expand Up @@ -192,7 +193,7 @@ DataProcessorSpec getSinkSpec()
if (iit.isValid() == false) {
continue;
}
auto* dh = o2::header::get<const DataHeader*>(input.header);
auto* dh = DataRefUtils::getHeader<const DataHeader*>(input);
LOG(INFO) << "{" << dh->dataOrigin.str << ":" << dh->dataDescription.str << ":" << dh->subSpecification << "}"
<< " payload size " << dh->payloadSize;

Expand Down Expand Up @@ -322,7 +323,7 @@ DataProcessorSpec getSinkSpec()
auto pmrspan = pc.inputs().get<gsl::span<o2::test::TriviallyCopyable>>("inputPMR");
ASSERT_ERROR((pmrspan[0] == o2::test::TriviallyCopyable{1, 2, 3}));
auto dataref = pc.inputs().get<DataRef>("inputPMR");
auto header = o2::header::get<const o2::header::DataHeader*>(dataref.header);
auto header = DataRefUtils::getHeader<const o2::header::DataHeader*>(dataref);
ASSERT_ERROR((header->payloadSize == sizeof(o2::test::TriviallyCopyable)));

LOG(INFO) << "extracting POD vector";
Expand Down Expand Up @@ -371,7 +372,7 @@ DataProcessorSpec getSpectatorSinkSpec()
if (iit.isValid() == false) {
continue;
}
auto* dh = o2::header::get<const DataHeader*>(input.header);
auto* dh = DataRefUtils::getHeader<const DataHeader*>(input);
LOG(INFO) << "{" << dh->dataOrigin.str << ":" << dh->dataDescription.str << ":" << dh->subSpecification << "}"
<< " payload size " << dh->payloadSize;

Expand Down
3 changes: 2 additions & 1 deletion Framework/Core/test/test_Parallel.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "Framework/ControlService.h"
#include "Framework/DataProcessorSpec.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/DataRefUtils.h"
#include "Framework/ParallelContext.h"
#include "Framework/runDataProcessing.h"

Expand Down Expand Up @@ -111,7 +112,7 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const&)

LOG(DEBUG) << "DataSampler sends data from subSpec: " << matcher.subSpec;

const auto* inputHeader = o2::header::get<o2::header::DataHeader*>(input.header);
const auto* inputHeader = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
auto& output = ctx.outputs().make<char>(description, inputHeader->size());

//todo: use some std function or adopt(), when it is available for POD data
Expand Down
1 change: 0 additions & 1 deletion Framework/Core/test/test_SimpleRDataFrameProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <memory>

using namespace o2::framework;
using DataHeader = o2::header::DataHeader;

/// Example of how to use ROOT::RDataFrame using DPL.
WorkflowSpec defineDataProcessing(ConfigContext const&)
Expand Down
3 changes: 2 additions & 1 deletion Framework/Core/test/test_SimpleWildcard02.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "Framework/ControlService.h"
#include "Framework/EndOfStreamContext.h"
#include "Framework/Logger.h"
#include "Framework/DataRefUtils.h"
#include <iostream>
#include <algorithm>
#include <memory>
Expand Down Expand Up @@ -59,7 +60,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
}
for (size_t i = 0; i < n; ++i) {
auto ref = inputs.getByPos(0, i);
auto dh = o2::header::get<o2::header::DataHeader*>(ref.header);
auto dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
LOG(INFO) << "String is " << s->GetString().Data() << " " << dh->subSpecification;
}
}); })}}};
Expand Down
6 changes: 3 additions & 3 deletions Framework/TestWorkflows/src/o2DummyWorkflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const&)
AlgorithmSpec{
[](ProcessingContext& ctx) {
// We verify we got inputs in the correct order
auto h0 = o2::header::get<DataHeader*>(ctx.inputs().get("clusters").header);
auto h1 = o2::header::get<DataHeader*>(ctx.inputs().get("summary").header);
auto h2 = o2::header::get<DataHeader*>(ctx.inputs().get("other_summary").header);
auto h0 = DataRefUtils::getHeader<DataHeader*>(ctx.inputs().get("clusters"));
auto h1 = DataRefUtils::getHeader<DataHeader*>(ctx.inputs().get("summary"));
auto h2 = DataRefUtils::getHeader<DataHeader*>(ctx.inputs().get("other_summary"));
// This should always be the case, since the
// test for an actual DataHeader should happen in the device itself.
assert(h0 && h1 && h2);
Expand Down
2 changes: 0 additions & 2 deletions Framework/TestWorkflows/src/o2_sim_its_ALP3.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@

using namespace o2::framework;

using DataHeader = o2::header::DataHeader;

double radii2Turbo(double rMin, double rMid, double rMax, double sensW)
{
// compute turbo angle from radii and sensor width
Expand Down
2 changes: 0 additions & 2 deletions Framework/TestWorkflows/src/o2_sim_tpc.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@

using namespace o2::framework;

using DataHeader = o2::header::DataHeader;

#define BOX_GENERATOR 1

namespace o2
Expand Down
Loading