From 6fa676360379bd1937eda36af67c699635d8db02 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 25 Jan 2022 15:01:05 +0100 Subject: [PATCH 1/3] DPL Analysis: disentangle slicing kernels * Remove unnecessary templates * Move code to .cxx * Consolidate usage of arrow value_counts kernel * Remove unnecessary includes --- Framework/Core/CMakeLists.txt | 1 + Framework/Core/include/Framework/ASoA.h | 17 +- .../Core/include/Framework/ASoAHelpers.h | 1 - .../Core/include/Framework/AnalysisManagers.h | 1 - Framework/Core/include/Framework/Kernels.h | 171 +++----------- Framework/Core/src/ASoA.cxx | 23 +- Framework/Core/src/Kernels.cxx | 208 ++++++++++++++++++ Framework/Core/test/test_Kernels.cxx | 4 +- 8 files changed, 240 insertions(+), 186 deletions(-) create mode 100644 Framework/Core/src/Kernels.cxx diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index e8e1bfef66079..322e0bb20e997 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -80,6 +80,7 @@ o2_add_library(Framework src/InputSpan.cxx src/InputSpec.cxx src/OutputSpec.cxx + src/Kernels.cxx src/LifetimeHelpers.cxx src/LocalRootFileService.cxx src/RootConfigParamHelpers.cxx diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index 1f9b2eab62c42..bee91a96c9af0 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -20,11 +20,10 @@ #include "Framework/Expressions.h" #include "Framework/ArrowTypes.h" #include "Framework/RuntimeError.h" +#include "Framework/Kernels.h" #include #include #include -#include -#include #include #include #include @@ -936,14 +935,12 @@ auto select(T const& t, framework::expressions::Filter const& f) return Filtered({t.asArrowTable()}, selectionToVector(framework::expressions::createSelection(t.asArrowTable(), f))); } -arrow::Status getSliceFor(int value, char const* key, std::shared_ptr const& input, std::shared_ptr& output, uint64_t& offset); - template auto sliceBy(T const& t, framework::expressions::BindingNode const& node, int value) { uint64_t offset = 0; std::shared_ptr result = nullptr; - auto status = getSliceFor(value, node.name.c_str(), t.asArrowTable(), result, offset); + auto status = o2::framework::getSliceFor(value, node.name.c_str(), t.asArrowTable(), result, offset); if (status.ok()) { return T({result}, offset); } @@ -1260,15 +1257,7 @@ class Table arrow::Status initializeSliceCaches(char const* key) { mCurrentKey = key; - arrow::Datum value_counts; - auto options = arrow::compute::ScalarAggregateOptions::Defaults(); - ARROW_ASSIGN_OR_RAISE(value_counts, - arrow::compute::CallFunction("value_counts", {mTable->GetColumnByName(key)}, - &options)); - auto pair = static_cast(value_counts.array()); - mValues = std::make_shared>(pair.field(0)->data()); - mCounts = std::make_shared>(pair.field(1)->data()); - return arrow::Status::OK(); + return o2::framework::getSlices(key, mTable, mValues, mCounts); } public: diff --git a/Framework/Core/include/Framework/ASoAHelpers.h b/Framework/Core/include/Framework/ASoAHelpers.h index acf8ed6269b3b..dbdabcffb74f5 100644 --- a/Framework/Core/include/Framework/ASoAHelpers.h +++ b/Framework/Core/include/Framework/ASoAHelpers.h @@ -13,7 +13,6 @@ #define O2_FRAMEWORK_ASOAHELPERS_H_ #include "Framework/ASoA.h" -#include "Framework/Kernels.h" #include "Framework/RuntimeError.h" #include diff --git a/Framework/Core/include/Framework/AnalysisManagers.h b/Framework/Core/include/Framework/AnalysisManagers.h index d9daf7ab89147..9e4d8d3bb7c4f 100644 --- a/Framework/Core/include/Framework/AnalysisManagers.h +++ b/Framework/Core/include/Framework/AnalysisManagers.h @@ -13,7 +13,6 @@ #define FRAMEWORK_ANALYSISMANAGERS_H #include "Framework/AnalysisHelpers.h" #include "Framework/GroupedCombinations.h" -#include "Framework/Kernels.h" #include "Framework/ASoA.h" #include "Framework/ProcessingContext.h" #include "Framework/EndOfStreamContext.h" diff --git a/Framework/Core/include/Framework/Kernels.h b/Framework/Core/include/Framework/Kernels.h index 1355968559868..57c4c5b1c008c 100644 --- a/Framework/Core/include/Framework/Kernels.h +++ b/Framework/Core/include/Framework/Kernels.h @@ -12,168 +12,47 @@ #ifndef O2_FRAMEWORK_KERNELS_H_ #define O2_FRAMEWORK_KERNELS_H_ -#include "Framework/BasicOps.h" -#include "Framework/TableBuilder.h" - -#include -#include -#include -#include -#include - -#include +#include namespace o2::framework { using ListVector = std::vector>; -template -auto sliceByColumnGeneric( +/// Slice a given table uncheked, filling slice caches +arrow::Status getSlices( + const char* key, + std::shared_ptr const& input, + std::shared_ptr>& values, + std::shared_ptr>& counts); + +/// Slice a given table unchecked +arrow::Status getSliceFor( + int value, + char const* key, + std::shared_ptr const& input, + std::shared_ptr& output, + uint64_t& offset); + +/// Slice a given table checked, for grouping association +void sliceByColumnGeneric( char const* key, char const* target, std::shared_ptr const& input, - T fullSize, + int32_t fullSize, ListVector* groups, - ListVector* unassigned = nullptr) -{ - groups->resize(fullSize); - auto column = input->GetColumnByName(key); - int64_t row = 0; - for (auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) { - auto chunk = static_cast::ArrowType>>(column->chunk(iChunk)->data()); - for (auto iElement = 0; iElement < chunk.length(); ++iElement) { - auto v = chunk.Value(iElement); - if (v >= 0) { - if (v >= groups->size()) { - throw runtime_error_f("Table %s has an entry with index (%d) that is larger than the grouping table size (%d)", target, v, fullSize); - } - (*groups)[v].push_back(row); - } else if (unassigned != nullptr) { - auto av = std::abs(v); - if (unassigned->size() < av + 1) { - unassigned->resize(av + 1); - } - (*unassigned)[av].push_back(row); - } - ++row; - } - } -} + ListVector* unassigned = nullptr); -/// Slice a given table in a vector of tables each containing a slice. -/// @a slices the arrow tables in which the original @a input -/// is split into. -/// @a offset the offset in the original table at which the corresponding -/// slice was split. -template -auto sliceByColumn( +/// Slice a given table checked, fast, for grouping association assuming +/// the index is properly sorted +arrow::Status sliceByColumn( char const* key, char const* target, std::shared_ptr const& input, - T fullSize, + int32_t fullSize, std::vector* slices, std::vector* offsets = nullptr, std::vector* sizes = nullptr, std::vector* unassignedSlices = nullptr, - std::vector* unassignedOffsets = nullptr) -{ - arrow::Datum value_counts; - auto column = input->GetColumnByName(key); - for (auto i = 0; i < column->num_chunks(); ++i) { - T prev = 0; - T cur = 0; - T lastNeg = 0; - T lastPos = 0; - - auto array = static_cast::ArrowType>>(column->chunk(i)->data()); - for (auto e = 0; e < array.length(); ++e) { - prev = cur; - if (prev >= 0) { - lastPos = prev; - } else { - lastNeg = prev; - } - cur = array.Value(e); - if (cur >= 0) { - if (lastPos > cur) { - throw runtime_error_f("Table %s index %s is not sorted: next value %d < previous value %d!", target, key, cur, lastPos); - } else if (lastPos == cur && prev < 0) { - throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target, key, cur, prev); - } - } else { - if (lastNeg < cur) { - throw runtime_error_f("Table %s index %s is not sorted: next negative value %d > previous negative value %d!", target, key, cur, lastNeg); - } else if (lastNeg == cur && prev >= 0) { - throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target, key, cur, prev); - } - } - } - } - auto options = arrow::compute::ScalarAggregateOptions::Defaults(); - ARROW_ASSIGN_OR_RAISE(value_counts, - arrow::compute::CallFunction("value_counts", {column}, - &options)); - auto pair = static_cast(value_counts.array()); - auto values = static_cast::ArrowType>>(pair.field(0)->data()); - auto counts = static_cast>(pair.field(1)->data()); - - // create slices and offsets - uint64_t offset = 0; - auto count = 0; - auto size = values.length(); - - auto makeSlice = [&](uint64_t offset_, T count_) { - slices->emplace_back(arrow::Datum{input->Slice(offset_, count_)}); - if (offsets) { - offsets->emplace_back(offset_); - } - if (sizes) { - sizes->emplace_back(count_); - } - }; - - auto makeUnassignedSlice = [&](uint64_t offset_, T count_) { - if (unassignedSlices) { - unassignedSlices->emplace_back(arrow::Datum{input->Slice(offset_, count_)}); - } - if (unassignedOffsets) { - unassignedOffsets->emplace_back(offset_); - } - }; - - auto v = 0; - auto vprev = v; - auto nzeros = 0; - - for (auto i = 0; i < size; ++i) { - count = counts.Value(i); - if (v >= 0) { - vprev = v; - } - v = values.Value(i); - if (v < 0) { - makeUnassignedSlice(offset, count); - offset += count; - continue; - } - nzeros = v - vprev - ((i == 0 || slices->empty() == true) ? 0 : 1); - for (auto z = 0; z < nzeros; ++z) { - makeSlice(offset, 0); - } - makeSlice(offset, count); - offset += count; - } - v = values.Value(size - 1); - if (v >= 0) { - vprev = v; - } - if (vprev < fullSize - 1) { - for (auto v = vprev + 1; v < fullSize; ++v) { - makeSlice(offset, 0); - } - } - - return arrow::Status::OK(); -} + std::vector* unassignedOffsets = nullptr); } // namespace o2::framework #endif // O2_FRAMEWORK_KERNELS_H_ diff --git a/Framework/Core/src/ASoA.cxx b/Framework/Core/src/ASoA.cxx index b15697f80349c..e751daf0853a6 100644 --- a/Framework/Core/src/ASoA.cxx +++ b/Framework/Core/src/ASoA.cxx @@ -10,6 +10,7 @@ // or submit itself to any jurisdiction. #include "Framework/ASoA.h" +#include "Framework/Kernels.h" #include "ArrowDebugHelpers.h" #include "Framework/RuntimeError.h" #include @@ -109,26 +110,4 @@ arrow::ChunkedArray* getIndexFromLabel(arrow::Table* table, const char* label) return table->column(index[0]).get(); } -arrow::Status getSliceFor(int value, char const* key, std::shared_ptr const& input, std::shared_ptr& output, uint64_t& offset) -{ - arrow::Datum value_counts; - auto options = arrow::compute::ScalarAggregateOptions::Defaults(); - ARROW_ASSIGN_OR_RAISE(value_counts, - arrow::compute::CallFunction("value_counts", {input->GetColumnByName(key)}, - &options)); - auto pair = static_cast(value_counts.array()); - auto values = static_cast>(pair.field(0)->data()); - auto counts = static_cast>(pair.field(1)->data()); - - for (auto slice = 0; slice < values.length(); ++slice) { - if (values.Value(slice) == value) { - output = input->Slice(offset, counts.Value(slice)); - return arrow::Status::OK(); - } - offset += counts.Value(slice); - } - output = input->Slice(offset, 0); - return arrow::Status::OK(); -} - } // namespace o2::soa diff --git a/Framework/Core/src/Kernels.cxx b/Framework/Core/src/Kernels.cxx new file mode 100644 index 0000000000000..1e49228429fde --- /dev/null +++ b/Framework/Core/src/Kernels.cxx @@ -0,0 +1,208 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include "Framework/Kernels.h" +#include "Framework/BasicOps.h" +#include "Framework/RuntimeError.h" +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace o2::framework +{ +arrow::Status getSlices( + const char* key, + std::shared_ptr const& input, + std::shared_ptr>& values, + std::shared_ptr>& counts) +{ + arrow::Datum value_counts; + auto options = arrow::compute::ScalarAggregateOptions::Defaults(); + ARROW_ASSIGN_OR_RAISE(value_counts, + arrow::compute::CallFunction("value_counts", {input->GetColumnByName(key)}, + &options)); + auto pair = static_cast(value_counts.array()); + values = std::make_shared>(pair.field(0)->data()); + counts = std::make_shared>(pair.field(1)->data()); + return arrow::Status::OK(); +} + +arrow::Status getSliceFor( + int value, + char const* key, + std::shared_ptr const& input, + std::shared_ptr& output, + uint64_t& offset) +{ + std::shared_ptr> values; + std::shared_ptr> counts; + + auto status = getSlices(key, input, values, counts); + + for (auto slice = 0; slice < values->length(); ++slice) { + if (values->Value(slice) == value) { + output = input->Slice(offset, counts->Value(slice)); + return arrow::Status::OK(); + } + offset += counts->Value(slice); + } + output = input->Slice(offset, 0); + return arrow::Status::OK(); +} + +void sliceByColumnGeneric( + char const* key, + char const* target, + std::shared_ptr const& input, + int32_t fullSize, + ListVector* groups, + ListVector* unassigned) +{ + groups->resize(fullSize); + auto column = input->GetColumnByName(key); + int32_t row = 0; + for (auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) { + auto chunk = static_cast>(column->chunk(iChunk)->data()); + for (auto iElement = 0; iElement < chunk.length(); ++iElement) { + auto v = chunk.Value(iElement); + if (v >= 0) { + if (v >= groups->size()) { + throw runtime_error_f("Table %s has an entry with index (%d) that is larger than the grouping table size (%d)", target, v, fullSize); + } + (*groups)[v].push_back(row); + } else if (unassigned != nullptr) { + auto av = std::abs(v); + if (unassigned->size() < av + 1) { + unassigned->resize(av + 1); + } + (*unassigned)[av].push_back(row); + } + ++row; + } + } +} + +arrow::Status sliceByColumn( + char const* key, + char const* target, + std::shared_ptr const& input, + int32_t fullSize, + std::vector* slices, + std::vector* offsets, + std::vector* sizes, + std::vector* unassignedSlices, + std::vector* unassignedOffsets) +{ + arrow::Datum value_counts; + auto column = input->GetColumnByName(key); + for (auto i = 0; i < column->num_chunks(); ++i) { + int32_t prev = 0; + int32_t cur = 0; + int32_t lastNeg = 0; + int32_t lastPos = 0; + + auto array = static_cast>(column->chunk(i)->data()); + for (auto e = 0; e < array.length(); ++e) { + prev = cur; + if (prev >= 0) { + lastPos = prev; + } else { + lastNeg = prev; + } + cur = array.Value(e); + if (cur >= 0) { + if (lastPos > cur) { + throw runtime_error_f("Table %s index %s is not sorted: next value %d < previous value %d!", target, key, cur, lastPos); + } + if (lastPos == cur && prev < 0) { + throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target, key, cur, prev); + } + } else { + if (lastNeg < cur) { + throw runtime_error_f("Table %s index %s is not sorted: next negative value %d > previous negative value %d!", target, key, cur, lastNeg); + } + if (lastNeg == cur && prev >= 0) { + throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target, key, cur, prev); + } + } + } + } + std::shared_ptr> values; + std::shared_ptr> counts; + + auto status = getSlices(key, input, values, counts); + + // create slices and offsets + uint64_t offset = 0; + int64_t count = 0; + auto size = values->length(); + + auto makeSlice = [&](uint64_t offset_, int64_t count_) { + slices->emplace_back(arrow::Datum{input->Slice(offset_, count_)}); + if (offsets) { + offsets->emplace_back(offset_); + } + if (sizes) { + sizes->emplace_back(count_); + } + }; + + auto makeUnassignedSlice = [&](uint64_t offset_, int64_t count_) { + if (unassignedSlices) { + unassignedSlices->emplace_back(arrow::Datum{input->Slice(offset_, count_)}); + } + if (unassignedOffsets) { + unassignedOffsets->emplace_back(offset_); + } + }; + + auto v = 0; + auto vprev = v; + auto nzeros = 0; + + for (auto i = 0; i < size; ++i) { + count = counts->Value(i); + if (v >= 0) { + vprev = v; + } + v = values->Value(i); + if (v < 0) { + makeUnassignedSlice(offset, count); + offset += count; + continue; + } + nzeros = v - vprev - ((i == 0 || slices->empty() == true) ? 0 : 1); + for (auto z = 0; z < nzeros; ++z) { + makeSlice(offset, 0); + } + makeSlice(offset, count); + offset += count; + } + v = values->Value(size - 1); + if (v >= 0) { + vprev = v; + } + if (vprev < fullSize - 1) { + for (auto v = vprev + 1; v < fullSize; ++v) { + makeSlice(offset, 0); + } + } + + return arrow::Status::OK(); +} +} // namespace o2::framework diff --git a/Framework/Core/test/test_Kernels.cxx b/Framework/Core/test/test_Kernels.cxx index ae75fae29b6d3..689888770f8a7 100644 --- a/Framework/Core/test/test_Kernels.cxx +++ b/Framework/Core/test/test_Kernels.cxx @@ -71,7 +71,7 @@ BOOST_AUTO_TEST_CASE(TestSlicingFramework) std::vector offsets; std::vector slices; - auto status = sliceByColumn("x", "xy", table, 12, &slices, &offsets); + auto status = sliceByColumn("x", "xy", table, 12, &slices, &offsets); BOOST_REQUIRE(status.ok()); BOOST_REQUIRE_EQUAL(slices.size(), 12); std::array sizes{0, 4, 1, 0, 1, 2, 0, 0, 0, 0, 0, 0}; @@ -98,7 +98,7 @@ BOOST_AUTO_TEST_CASE(TestSlicingException) std::vector offsets; std::vector slices; try { - auto status = sliceByColumn("x", "xy", table, 12, &slices, &offsets); + auto status = sliceByColumn("x", "xy", table, 12, &slices, &offsets); } catch (RuntimeErrorRef re) { BOOST_REQUIRE_EQUAL(std::string{error_from_ref(re).what}, "Table xy index x is not sorted: next value 4 < previous value 5!"); return; From 063e799bfc4bd0986b2c38eaec676b7f9e124378 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Wed, 26 Jan 2022 10:30:09 +0100 Subject: [PATCH 2/3] fix erroneous unassigned sortedness check --- Framework/Core/src/Kernels.cxx | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/Framework/Core/src/Kernels.cxx b/Framework/Core/src/Kernels.cxx index 1e49228429fde..c80631821003f 100644 --- a/Framework/Core/src/Kernels.cxx +++ b/Framework/Core/src/Kernels.cxx @@ -115,7 +115,7 @@ arrow::Status sliceByColumn( int32_t cur = 0; int32_t lastNeg = 0; int32_t lastPos = 0; - + bool switchedGroup = false; auto array = static_cast>(column->chunk(i)->data()); for (auto e = 0; e < array.length(); ++e) { prev = cur; @@ -129,17 +129,22 @@ arrow::Status sliceByColumn( if (lastPos > cur) { throw runtime_error_f("Table %s index %s is not sorted: next value %d < previous value %d!", target, key, cur, lastPos); } - if (lastPos == cur && prev < 0) { + if (switchedGroup && lastPos == cur && prev < 0) { throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target, key, cur, prev); } } else { if (lastNeg < cur) { throw runtime_error_f("Table %s index %s is not sorted: next negative value %d > previous negative value %d!", target, key, cur, lastNeg); } - if (lastNeg == cur && prev >= 0) { + if (switchedGroup && lastNeg == cur && prev >= 0) { throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target, key, cur, prev); } } + if (cur != prev) { + switchedGroup = true; + } else { + switchedGroup = false; + } } } std::shared_ptr> values; From cecef3381e86cd7761466992dc33700393e787bc Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Wed, 26 Jan 2022 12:53:57 +0100 Subject: [PATCH 3/3] fixup! fix erroneous unassigned sortedness check --- Framework/Core/src/Kernels.cxx | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/Framework/Core/src/Kernels.cxx b/Framework/Core/src/Kernels.cxx index c80631821003f..526c04a318789 100644 --- a/Framework/Core/src/Kernels.cxx +++ b/Framework/Core/src/Kernels.cxx @@ -110,12 +110,12 @@ arrow::Status sliceByColumn( { arrow::Datum value_counts; auto column = input->GetColumnByName(key); + auto array0 = static_cast>(column->chunk(0)->data()); + int32_t prev = 0; + int32_t cur = array0.Value(0); + int32_t lastNeg = cur < 0 ? cur : 0; + int32_t lastPos = cur < 0 ? -1 : cur; for (auto i = 0; i < column->num_chunks(); ++i) { - int32_t prev = 0; - int32_t cur = 0; - int32_t lastNeg = 0; - int32_t lastPos = 0; - bool switchedGroup = false; auto array = static_cast>(column->chunk(i)->data()); for (auto e = 0; e < array.length(); ++e) { prev = cur; @@ -129,22 +129,17 @@ arrow::Status sliceByColumn( if (lastPos > cur) { throw runtime_error_f("Table %s index %s is not sorted: next value %d < previous value %d!", target, key, cur, lastPos); } - if (switchedGroup && lastPos == cur && prev < 0) { + if (lastPos == cur && prev < 0) { throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target, key, cur, prev); } } else { if (lastNeg < cur) { throw runtime_error_f("Table %s index %s is not sorted: next negative value %d > previous negative value %d!", target, key, cur, lastNeg); } - if (switchedGroup && lastNeg == cur && prev >= 0) { + if (lastNeg == cur && prev >= 0) { throw runtime_error_f("Table %s index %s has a group with index %d that is split by %d", target, key, cur, prev); } } - if (cur != prev) { - switchedGroup = true; - } else { - switchedGroup = false; - } } } std::shared_ptr> values;