diff --git a/Framework/Core/src/TableTreeHelpers.cxx b/Framework/Core/src/TableTreeHelpers.cxx index bbfcadaa31065..187d80b0e2e06 100644 --- a/Framework/Core/src/TableTreeHelpers.cxx +++ b/Framework/Core/src/TableTreeHelpers.cxx @@ -196,7 +196,7 @@ std::pair, std::shared_ptr> B while (readEntries < totalEntries) { auto readLast = mBranch->GetBulkRead().GetBulkEntries(readEntries, *buffer); readEntries += readLast; - status &= static_cast(mValueBuilder)->AppendValues(reinterpret_cast(buffer->GetCurrent()), readLast * mListSize); + status &= static_cast(mValueBuilder)->AppendValues(reinterpret_cast(buffer->GetCurrent()), (int64_t)readLast * (int64_t)mListSize); } if (mListSize > 1) { status &= static_cast(mListBuilder.get())->AppendValues(readEntries); @@ -214,73 +214,209 @@ std::pair, std::shared_ptr> B } } else { // other types: use serialized read to build arrays directly - auto&& result = arrow::AllocateResizableBuffer(mBranch->GetTotBytes(), mPool); - if (!result.ok()) { - throw runtime_error("Cannot allocate values buffer"); - } - std::shared_ptr arrowValuesBuffer = std::move(result).ValueUnsafe(); - auto ptr = arrowValuesBuffer->mutable_data(); - if (ptr == nullptr) { - throw runtime_error("Invalid buffer"); - } + if (mVLA && totalEntries > 616) { + // special case workaround + auto status = arrow::MakeBuilder(mPool, mArrowType->field(0)->type(), &mBuilder); + if (!status.ok()) { + throw runtime_error("Failed to create value builder"); + } + mListBuilder = std::make_unique(mPool, std::move(mBuilder)); + mValueBuilder = static_cast(mListBuilder.get())->value_builder(); + void* ptr = nullptr; + + switch (mType) { + case EDataType::kUChar_t: + ptr = new uint8_t[255]; + break; + case EDataType::kUShort_t: + ptr = new uint16_t[255]; + break; + case EDataType::kUInt_t: + ptr = new uint32_t[255]; + break; + case EDataType::kULong64_t: + ptr = new uint64_t[255]; + break; + case EDataType::kChar_t: + ptr = new int8_t[255]; + break; + case EDataType::kShort_t: + ptr = new int16_t[255]; + break; + case EDataType::kInt_t: + ptr = new int32_t[255]; + break; + case EDataType::kLong64_t: + ptr = new int64_t[255]; + break; + case EDataType::kFloat_t: + ptr = new float[255]; + break; + case EDataType::kDouble_t: + ptr = new double[255]; + break; + default: + throw runtime_error("Unsupported branch type"); + } - auto typeSize = TDataType::GetDataType(mType)->Size(); - std::unique_ptr offsetBuffer; - - uint32_t offset = 0; - uint32_t lastOffset; - int count = 0; - std::shared_ptr arrowOffsetBuffer; - gsl::span offsets; - int size = 0; - uint32_t totalSize = 0; - if (mVLA) { - offsetBuffer.reset(new TBufferFile{TBuffer::EMode::kWrite, 4 * 1024 * 1024}); - result = arrow::AllocateResizableBuffer((totalEntries + 1) * sizeof(int), mPool); + int sz; + auto* mSizeBranch = mBranch->GetTree()->GetBranch((std::string{mBranch->GetName()} + TableTreeHelpers::sizeBranchSuffix).c_str()); + mSizeBranch->SetAddress(&sz); + mBranch->SetAddress(ptr); + std::vector offsets; + + offsets.push_back(0); + for (auto entry = 0; entry < totalEntries; ++entry) { + mBranch->GetEntry(entry); + mSizeBranch->GetEntry(entry); + offsets.push_back(sz + offsets.back()); + arrow::Status status; + switch (mType) { + case EDataType::kUChar_t: + status = static_cast(mValueBuilder)->AppendValues(reinterpret_cast(ptr), sz); + break; + case EDataType::kUShort_t: + status = static_cast(mValueBuilder)->AppendValues(reinterpret_cast(ptr), sz); + break; + case EDataType::kUInt_t: + status = static_cast(mValueBuilder)->AppendValues(reinterpret_cast(ptr), sz); + break; + case EDataType::kULong64_t: + status = static_cast(mValueBuilder)->AppendValues(reinterpret_cast(ptr), sz); + break; + case EDataType::kChar_t: + status = static_cast(mValueBuilder)->AppendValues(reinterpret_cast(ptr), sz); + break; + case EDataType::kShort_t: + status = static_cast(mValueBuilder)->AppendValues(reinterpret_cast(ptr), sz); + break; + case EDataType::kInt_t: + status = static_cast(mValueBuilder)->AppendValues(reinterpret_cast(ptr), sz); + break; + case EDataType::kLong64_t: + status = static_cast(mValueBuilder)->AppendValues(reinterpret_cast(ptr), sz); + break; + case EDataType::kFloat_t: + status = static_cast(mValueBuilder)->AppendValues(reinterpret_cast(ptr), sz); + break; + case EDataType::kDouble_t: + status = static_cast(mValueBuilder)->AppendValues(reinterpret_cast(ptr), sz); + break; + default: + throw runtime_error("Unsupported branch type"); + } + } + status &= static_cast(mListBuilder.get())->AppendValues(offsets.data(), totalEntries); + status &= static_cast(mListBuilder.get())->Finish(&array); + + mSizeBranch->SetStatus(false); + mSizeBranch->DropBaskets("all"); + mSizeBranch->Reset(); + mSizeBranch->GetTransientBuffer(0)->Expand(0); + + switch (mType) { + case EDataType::kUChar_t: + delete[] static_cast(ptr); + break; + case EDataType::kUShort_t: + delete[] static_cast(ptr); + break; + case EDataType::kUInt_t: + delete[] static_cast(ptr); + break; + case EDataType::kULong64_t: + delete[] static_cast(ptr); + break; + case EDataType::kChar_t: + delete[] static_cast(ptr); + break; + case EDataType::kShort_t: + delete[] static_cast(ptr); + break; + case EDataType::kInt_t: + delete[] static_cast(ptr); + break; + case EDataType::kLong64_t: + delete[] static_cast(ptr); + break; + case EDataType::kFloat_t: + delete[] static_cast(ptr); + break; + case EDataType::kDouble_t: + delete[] static_cast(ptr); + break; + default: + throw runtime_error("Unsupported branch type"); + } + } else { + auto&& result = arrow::AllocateResizableBuffer(mBranch->GetTotBytes(), mPool); if (!result.ok()) { - throw runtime_error("Cannot allocate offset buffer"); + throw runtime_error("Cannot allocate values buffer"); + } + std::shared_ptr arrowValuesBuffer = std::move(result).ValueUnsafe(); + auto ptr = arrowValuesBuffer->mutable_data(); + if (ptr == nullptr) { + throw runtime_error("Invalid buffer"); } - arrowOffsetBuffer = std::move(result).ValueUnsafe(); - unsigned char* ptrOffset = arrowOffsetBuffer->mutable_data(); - auto* tPtrOffset = reinterpret_cast(ptrOffset); - offsets = gsl::span{tPtrOffset, tPtrOffset + totalEntries + 1}; - } - while (readEntries < totalEntries) { - auto readLast = mBranch->GetBulkRead().GetEntriesSerialized(readEntries, *buffer, offsetBuffer.get()); - readEntries += readLast; + auto typeSize = TDataType::GetDataType(mType)->Size(); + std::unique_ptr offsetBuffer; + uint32_t offset = 0; + uint32_t lastOffset; + int count = 0; + std::shared_ptr arrowOffsetBuffer; + gsl::span offsets; + int size = 0; + uint32_t totalSize = 0; if (mVLA) { - lastOffset = offset; - for (auto i = 0; i < readLast; ++i) { - offsets[count++] = (int)offset; - offset += swap32_(reinterpret_cast(offsetBuffer->GetCurrent())[i]); + offsetBuffer = std::make_unique(TBuffer::EMode::kWrite, 4 * 1024 * 1024); + result = arrow::AllocateResizableBuffer((int64_t)(sizeof(int) * (totalEntries + 1)), mPool); + if (!result.ok()) { + throw runtime_error("Cannot allocate offset buffer"); + } + arrowOffsetBuffer = std::move(result).ValueUnsafe(); + unsigned char* ptrOffset = arrowOffsetBuffer->mutable_data(); + auto* tPtrOffset = reinterpret_cast(ptrOffset); + offsets = gsl::span{tPtrOffset, tPtrOffset + totalEntries + 1}; + } + + while (readEntries < totalEntries) { + auto readLast = mBranch->GetBulkRead().GetEntriesSerialized(readEntries, *buffer, offsetBuffer.get()); + readEntries += readLast; + + if (mVLA) { + lastOffset = offset; + for (auto i = 0; i < readLast; ++i) { + offsets[count++] = (int)offset; + offset += swap32_(reinterpret_cast(offsetBuffer->GetCurrent())[i]); + } + size = (int)(offset - lastOffset); + } else { + size = readLast * mListSize; } - size = offset - lastOffset; + swapCopy(ptr, buffer->GetCurrent(), size, typeSize); + ptr += (ptrdiff_t)(size * typeSize); + } + if (mVLA) { + offsets[count] = (int)offset; + totalSize = offset; } else { - size = readLast * mListSize; + totalSize = readEntries * mListSize; + } + std::shared_ptr varray; + switch (mListSize) { + case -1: + varray = std::make_shared(mArrowType->field(0)->type(), totalSize, arrowValuesBuffer); + array = std::make_shared(mArrowType, readEntries, arrowOffsetBuffer, varray); + break; + case 1: + array = std::make_shared(mArrowType, readEntries, arrowValuesBuffer); + break; + default: + varray = std::make_shared(mArrowType->field(0)->type(), totalSize, arrowValuesBuffer); + array = std::make_shared(mArrowType, readEntries, varray); } - swapCopy(ptr, buffer->GetCurrent(), size, typeSize); - ptr += size * typeSize; - } - if (mVLA) { - offsets[count] = offset; - totalSize = offset; - } else { - totalSize = readEntries * mListSize; - } - std::shared_ptr varray; - switch (mListSize) { - case -1: - varray = std::make_shared(mArrowType->field(0)->type(), totalSize, arrowValuesBuffer); - array = std::make_shared(mArrowType, readEntries, arrowOffsetBuffer, varray); - break; - case 1: - array = std::make_shared(mArrowType, readEntries, arrowValuesBuffer); - break; - default: - varray = std::make_shared(mArrowType->field(0)->type(), totalSize, arrowValuesBuffer); - array = std::make_shared(mArrowType, readEntries, varray); } }