From d276c5f9e95ecc89c10d2ba77f7178593072aede Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Wed, 15 Apr 2026 18:10:01 +0800 Subject: [PATCH 1/3] feat(rest): parse storage-credentials from LoadTableResponse --- src/iceberg/catalog/rest/json_serde.cc | 34 +++++++++++++++ src/iceberg/catalog/rest/types.cc | 3 +- src/iceberg/catalog/rest/types.h | 13 +++++- src/iceberg/test/rest_json_serde_test.cc | 53 ++++++++++++++++++++++-- 4 files changed, 98 insertions(+), 5 deletions(-) diff --git a/src/iceberg/catalog/rest/json_serde.cc b/src/iceberg/catalog/rest/json_serde.cc index ac80e9f08..c5927895f 100644 --- a/src/iceberg/catalog/rest/json_serde.cc +++ b/src/iceberg/catalog/rest/json_serde.cc @@ -71,6 +71,8 @@ constexpr std::string_view kSource = "source"; constexpr std::string_view kDestination = "destination"; constexpr std::string_view kMetadata = "metadata"; constexpr std::string_view kConfig = "config"; +constexpr std::string_view kStorageCredentials = "storage-credentials"; +constexpr std::string_view kPrefix = "prefix"; constexpr std::string_view kIdentifiers = "identifiers"; constexpr std::string_view kOverrides = "overrides"; constexpr std::string_view kDefaults = "defaults"; @@ -695,6 +697,17 @@ nlohmann::json ToJson(const LoadTableResult& result) { SetOptionalStringField(json, kMetadataLocation, result.metadata_location); json[kMetadata] = ToJson(*result.metadata); SetContainerField(json, kConfig, result.config); + if (!result.storage_credentials.empty()) { + nlohmann::json creds = nlohmann::json::array(); + for (const auto& cred : result.storage_credentials) { + nlohmann::json entry; + entry[kPrefix] = cred.prefix; + // config is required, so always write it (matches Java). + entry[kConfig] = cred.config; + creds.push_back(std::move(entry)); + } + json[kStorageCredentials] = std::move(creds); + } return json; } @@ -707,6 +720,27 @@ Result LoadTableResultFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(result.metadata, TableMetadataFromJson(metadata_json)); ICEBERG_ASSIGN_OR_RAISE(result.config, GetJsonValueOrDefault(json, kConfig)); + if (auto it = json.find(kStorageCredentials); it != json.end() && !it->is_null()) { + if (!it->is_array()) { + return JsonParseError("Cannot parse storage credentials from non-array: {}", + SafeDumpJson(*it)); + } + for (const auto& entry : *it) { + StorageCredential cred; + ICEBERG_ASSIGN_OR_RAISE(cred.prefix, GetJsonValue(entry, kPrefix)); + ICEBERG_ASSIGN_OR_RAISE(cred.config, + GetJsonValue(entry, kConfig)); + // prefix and config are required by the REST spec; non-empty matches the + // Java reference implementation (Credential.validate()). + if (cred.prefix.empty()) { + return JsonParseError("Invalid storage credential: prefix must be non-empty"); + } + if (cred.config.empty()) { + return JsonParseError("Invalid storage credential: config must be non-empty"); + } + result.storage_credentials.push_back(std::move(cred)); + } + } ICEBERG_RETURN_UNEXPECTED(result.Validate()); return result; } diff --git a/src/iceberg/catalog/rest/types.cc b/src/iceberg/catalog/rest/types.cc index 8d96bccb2..84fba9a7c 100644 --- a/src/iceberg/catalog/rest/types.cc +++ b/src/iceberg/catalog/rest/types.cc @@ -86,7 +86,8 @@ bool CreateTableRequest::operator==(const CreateTableRequest& other) const { } bool LoadTableResult::operator==(const LoadTableResult& other) const { - if (metadata_location != other.metadata_location || config != other.config) { + if (metadata_location != other.metadata_location || config != other.config || + storage_credentials != other.storage_credentials) { return false; } diff --git a/src/iceberg/catalog/rest/types.h b/src/iceberg/catalog/rest/types.h index 7849b366b..0403fc22c 100644 --- a/src/iceberg/catalog/rest/types.h +++ b/src/iceberg/catalog/rest/types.h @@ -180,12 +180,23 @@ struct ICEBERG_REST_EXPORT CreateTableRequest { /// \brief An opaque token that allows clients to make use of pagination for list APIs. using PageToken = std::string; +/// \brief A short-lived credential vended by a REST catalog for a storage +/// location ``prefix`` (clients pick the longest matching prefix); ``config`` +/// holds backend properties such as ``"s3.access-key-id"`` (Iceberg REST spec). +struct ICEBERG_REST_EXPORT StorageCredential { + std::string prefix; + std::unordered_map config; + + bool operator==(const StorageCredential& other) const = default; +}; + /// \brief Result body for table create/load/register APIs. struct ICEBERG_REST_EXPORT LoadTableResult { std::string metadata_location; std::shared_ptr metadata; // required std::unordered_map config; - // TODO(Li Feiyang): Add std::shared_ptr storage_credential; + /// \brief Vended storage credentials, one per URI prefix; empty if none. + std::vector storage_credentials; /// \brief Validates the LoadTableResult. Status Validate() const { diff --git a/src/iceberg/test/rest_json_serde_test.cc b/src/iceberg/test/rest_json_serde_test.cc index 7304831c6..51bce4f51 100644 --- a/src/iceberg/test/rest_json_serde_test.cc +++ b/src/iceberg/test/rest_json_serde_test.cc @@ -82,6 +82,11 @@ static std::shared_ptr MakeSimpleTableMetadata() { }); } +std::string LoadTableJsonWithCredentials(std::string_view storage_credentials) { + return std::string(R"({"storage-credentials":)") + std::string(storage_credentials) + + R"(,"metadata":{"format-version":2,"table-uuid":"test","location":"s3://test","last-sequence-number":0,"last-column-id":1,"last-updated-ms":0,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0}})"; +} + // Test parameter structure for roundtrip tests template struct JsonRoundTripParam { @@ -1116,7 +1121,17 @@ INSTANTIATE_TEST_SUITE_P( .model = {.metadata_location = "s3://bucket/metadata/v1.json", .metadata = MakeSimpleTableMetadata(), .config = {{"warehouse", "s3://bucket/warehouse"}, - {"foo", "bar"}}}}), + {"foo", "bar"}}}}, + LoadTableResultParam{ + .test_name = "WithStorageCredentials", + .expected_json_str = + R"({"metadata":{"current-schema-id":1,"current-snapshot-id":null,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":1,"last-partition-id":0,"last-sequence-number":0,"last-updated-ms":0,"location":"s3://bucket/test","metadata-log":[],"partition-specs":[{"fields":[],"spec-id":0}],"partition-statistics":[],"properties":{},"refs":{},"schemas":[{"fields":[{"id":1,"name":"id","required":true,"type":"int"}],"schema-id":1,"type":"struct"}],"snapshot-log":[],"snapshots":[],"sort-orders":[{"fields":[],"order-id":0}],"statistics":[],"table-uuid":"test-uuid-1234"},"storage-credentials":[{"config":{"s3.access-key-id":"AKIAtest","s3.region":"us-east-1","s3.secret-access-key":"secret"},"prefix":"s3"}]})", + .model = + {.metadata = MakeSimpleTableMetadata(), + .storage_credentials = {{.prefix = "s3", + .config = {{"s3.access-key-id", "AKIAtest"}, + {"s3.secret-access-key", "secret"}, + {"s3.region", "us-east-1"}}}}}}), [](const ::testing::TestParamInfo& info) { return info.param.test_name; }); @@ -1145,7 +1160,18 @@ INSTANTIATE_TEST_SUITE_P( .json_str = R"({"metadata":{"format-version":2,"table-uuid":"test-uuid-1234","location":"s3://bucket/test","last-sequence-number":0,"last-updated-ms":0,"last-column-id":1,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"properties":{}},"config":{"warehouse":"s3://bucket/warehouse"}})", .expected_model = {.metadata = MakeSimpleTableMetadata(), - .config = {{"warehouse", "s3://bucket/warehouse"}}}}), + .config = {{"warehouse", "s3://bucket/warehouse"}}}}, + LoadTableResultDeserializeParam{ + .test_name = "WithStorageCredentials", + .json_str = + R"({"metadata":{"format-version":2,"table-uuid":"test-uuid-1234","location":"s3://bucket/test","last-sequence-number":0,"last-updated-ms":0,"last-column-id":1,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"properties":{}},"storage-credentials":[{"prefix":"s3","config":{"s3.access-key-id":"AKIAtest","s3.secret-access-key":"secret","s3.session-token":"token","s3.region":"us-east-1"}}]})", + .expected_model = + {.metadata = MakeSimpleTableMetadata(), + .storage_credentials = {{.prefix = "s3", + .config = {{"s3.access-key-id", "AKIAtest"}, + {"s3.secret-access-key", "secret"}, + {"s3.session-token", "token"}, + {"s3.region", "us-east-1"}}}}}}), [](const ::testing::TestParamInfo& info) { return info.param.test_name; }); @@ -1184,7 +1210,28 @@ INSTANTIATE_TEST_SUITE_P( LoadTableResultInvalidParam{ .test_name = "InvalidMetadataContent", .invalid_json_str = R"({"metadata":{"format-version":"invalid"}})", - .expected_error_message = "type must be number, but is string"}), + .expected_error_message = "type must be number, but is string"}, + LoadTableResultInvalidParam{ + .test_name = "StorageCredentialsNotArray", + .invalid_json_str = LoadTableJsonWithCredentials(R"("oops")"), + .expected_error_message = "Cannot parse storage credentials from non-array"}, + LoadTableResultInvalidParam{ + .test_name = "StorageCredentialMissingPrefix", + .invalid_json_str = LoadTableJsonWithCredentials(R"([{"config":{"k":"v"}}])"), + .expected_error_message = "Missing 'prefix'"}, + LoadTableResultInvalidParam{ + .test_name = "StorageCredentialMissingConfig", + .invalid_json_str = LoadTableJsonWithCredentials(R"([{"prefix":"s3"}])"), + .expected_error_message = "Missing 'config'"}, + LoadTableResultInvalidParam{.test_name = "StorageCredentialEmptyPrefix", + .invalid_json_str = LoadTableJsonWithCredentials( + R"([{"prefix":"","config":{"k":"v"}}])"), + .expected_error_message = "prefix must be non-empty"}, + LoadTableResultInvalidParam{ + .test_name = "StorageCredentialEmptyConfig", + .invalid_json_str = + LoadTableJsonWithCredentials(R"([{"prefix":"s3","config":{}}])"), + .expected_error_message = "config must be non-empty"}), [](const ::testing::TestParamInfo& info) { return info.param.test_name; }); From 43838fd24bb1bd97032133ac6ccde46bc7be7cf5 Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Mon, 15 Jun 2026 18:30:33 +0800 Subject: [PATCH 2/3] feat(rest): bind a per-table FileIO from vended credentials --- src/iceberg/arrow/s3/arrow_s3_file_io.cc | 27 +++++++--- src/iceberg/arrow/s3/arrow_s3_internal.h | 44 ++++++++++++++++ src/iceberg/arrow/s3/s3_properties.h | 4 +- src/iceberg/catalog/rest/rest_catalog.cc | 33 ++++++++++-- src/iceberg/catalog/rest/rest_catalog.h | 3 +- src/iceberg/catalog/rest/rest_file_io.cc | 14 ++++++ src/iceberg/catalog/rest/rest_file_io.h | 7 +++ src/iceberg/catalog/rest/type_fwd.h | 1 + src/iceberg/test/arrow_s3_file_io_test.cc | 61 +++++++++++++++++++++++ src/iceberg/test/rest_file_io_test.cc | 32 ++++++++++++ 10 files changed, 214 insertions(+), 12 deletions(-) create mode 100644 src/iceberg/arrow/s3/arrow_s3_internal.h diff --git a/src/iceberg/arrow/s3/arrow_s3_file_io.cc b/src/iceberg/arrow/s3/arrow_s3_file_io.cc index cffd95840..36858f76d 100644 --- a/src/iceberg/arrow/s3/arrow_s3_file_io.cc +++ b/src/iceberg/arrow/s3/arrow_s3_file_io.cc @@ -30,6 +30,7 @@ #include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_io_util.h" #include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/s3/arrow_s3_internal.h" #include "iceberg/arrow/s3/s3_properties.h" #include "iceberg/util/macros.h" #include "iceberg/util/string_util.h" @@ -74,6 +75,12 @@ Status EnsureS3Initialized() { return {}; } +#endif + +} // namespace + +#if ICEBERG_S3_ENABLED + /// \brief Configure S3Options from a properties map. /// /// \param properties The configuration properties map. @@ -100,15 +107,25 @@ Result<::arrow::fs::S3Options> ConfigureS3Options( } // Configure region - if (const auto* region = FindProperty(properties, S3Properties::kRegion); - region != nullptr) { + // Prefer the standard `client.region`; fall back to legacy `s3.region`. + const auto* region = FindProperty(properties, S3Properties::kClientRegion); + if (region == nullptr) { + region = FindProperty(properties, S3Properties::kRegion); + } + if (region != nullptr) { options.region = *region; } - // Configure endpoint (for MinIO, LocalStack, etc.) + // Configure endpoint (for MinIO, LocalStack, OSS, etc.) if (const auto* endpoint = FindProperty(properties, S3Properties::kEndpoint); endpoint != nullptr) { - options.endpoint_override = *endpoint; + // `s3.endpoint` may be a full URI; Arrow wants host[:port], so strip the scheme. + std::string_view ep = *endpoint; + if (const auto pos = ep.find("://"); pos != std::string_view::npos) { + options.scheme = std::string(ep.substr(0, pos)); + ep = ep.substr(pos + 3); + } + options.endpoint_override = std::string(ep); } else { // Fall back to AWS standard environment variables for endpoint override const char* s3_endpoint_env = std::getenv("AWS_ENDPOINT_URL_S3"); @@ -154,8 +171,6 @@ Result<::arrow::fs::S3Options> ConfigureS3Options( } #endif -} // namespace - Result> MakeS3FileIO( const std::unordered_map& properties) { #if ICEBERG_S3_ENABLED diff --git a/src/iceberg/arrow/s3/arrow_s3_internal.h b/src/iceberg/arrow/s3/arrow_s3_internal.h new file mode 100644 index 000000000..c37663a62 --- /dev/null +++ b/src/iceberg/arrow/s3/arrow_s3_internal.h @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/iceberg_bundle_export.h" +#include "iceberg/result.h" + +#if ICEBERG_S3_ENABLED +# include +#endif + +namespace iceberg::arrow { + +#if ICEBERG_S3_ENABLED +/// \brief Build Arrow ``S3Options`` from an Iceberg properties map. +/// +/// Production code should use MakeS3FileIO(); this is exposed so the +/// property-to-option mapping (region resolution, endpoint scheme handling, +/// addressing style) can be unit tested without a live S3 endpoint. +ICEBERG_BUNDLE_EXPORT Result<::arrow::fs::S3Options> ConfigureS3Options( + const std::unordered_map& properties); +#endif + +} // namespace iceberg::arrow diff --git a/src/iceberg/arrow/s3/s3_properties.h b/src/iceberg/arrow/s3/s3_properties.h index 53657743d..61248948b 100644 --- a/src/iceberg/arrow/s3/s3_properties.h +++ b/src/iceberg/arrow/s3/s3_properties.h @@ -37,8 +37,10 @@ struct S3Properties { static constexpr std::string_view kSecretAccessKey = "s3.secret-access-key"; /// AWS session token (for temporary credentials) static constexpr std::string_view kSessionToken = "s3.session-token"; - /// AWS region + /// AWS region (legacy, non-standard key kept for compatibility) static constexpr std::string_view kRegion = "s3.region"; + /// AWS region, standard Iceberg client property (preferred over kRegion). + static constexpr std::string_view kClientRegion = "client.region"; /// Custom endpoint override (for MinIO, LocalStack, etc.) static constexpr std::string_view kEndpoint = "s3.endpoint"; /// Whether to use path-style access (needed for MinIO) diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index 455c4d744..bb34e62c7 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -41,6 +41,7 @@ #include "iceberg/catalog/rest/rest_file_io.h" #include "iceberg/catalog/rest/rest_util.h" #include "iceberg/catalog/rest/types.h" +#include "iceberg/file_io_registry.h" #include "iceberg/json_serde_internal.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" @@ -516,7 +517,26 @@ Result> RestCatalog::TableAuthSession( Result> RestCatalog::TableFileIO( const SessionContext& /*context*/, - const std::unordered_map& table_config) const { + const std::unordered_map& table_config, + const std::vector& storage_credentials) const { + // Longest-prefix S3-family vended credential, matching the Java client's + // VendedCredentialsProvider. When present, build a per-table S3 FileIO from + // catalog + table config + the credential; its ResolvePath resolves oss:// and + // other S3-compatible schemes. + if (const StorageCredential* s3_cred = SelectS3StorageCredential(storage_credentials); + s3_cred != nullptr) { + auto properties = config_.configs(); + for (const auto& [key, value] : table_config) { + properties[key] = value; + } + for (const auto& [key, value] : s3_cred->config) { + properties[key] = value; + } + ICEBERG_ASSIGN_OR_RAISE( + auto io, + FileIORegistry::Load(std::string(FileIORegistry::kArrowS3FileIO), properties)); + return std::shared_ptr(std::move(io)); + } ICEBERG_RETURN_UNEXPECTED(ValidateNoFileIOConfig(table_config)); return file_io_; } @@ -746,7 +766,9 @@ Result> RestCatalog::StageCreateTable( CreateTableInternal(identifier, schema, spec, order, location, properties, /*stage_create=*/true, *contextual_session)); auto table_config = std::move(result.config); - ICEBERG_ASSIGN_OR_RAISE(auto table_io, TableFileIO(context, table_config)); + auto storage_credentials = std::move(result.storage_credentials); + ICEBERG_ASSIGN_OR_RAISE(auto table_io, + TableFileIO(context, table_config, storage_credentials)); ICEBERG_ASSIGN_OR_RAISE( auto table_session, TableAuthSession(identifier, table_config, std::move(contextual_session))); @@ -859,7 +881,9 @@ Result> RestCatalog::MakeTableFromLoadResult( const SessionContext& context, std::shared_ptr contextual_session) { auto table_config = std::move(result.config); - ICEBERG_ASSIGN_OR_RAISE(auto table_io, TableFileIO(context, table_config)); + auto storage_credentials = std::move(result.storage_credentials); + ICEBERG_ASSIGN_OR_RAISE(auto table_io, + TableFileIO(context, table_config, storage_credentials)); ICEBERG_ASSIGN_OR_RAISE( auto table_session, TableAuthSession(identifier, table_config, std::move(contextual_session))); @@ -878,7 +902,8 @@ Result> RestCatalog::MakeTableFromCommitResponse( // TODO(gangwu): If the REST commit response grows table config or // storage credentials, derive a replacement table session/FileIO from that // response. The current table commit response does not define config. - ICEBERG_ASSIGN_OR_RAISE(auto table_io, TableFileIO(context, table_config)); + ICEBERG_ASSIGN_OR_RAISE(auto table_io, + TableFileIO(context, table_config, /*storage_credentials=*/{})); auto table_catalog = std::make_shared( shared_from_this(), context, identifier, table_config, table_session); return Table::Make(identifier, std::move(response.metadata), diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index e693ceba3..62d8c543f 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -79,7 +79,8 @@ class ICEBERG_REST_EXPORT RestCatalog final Result> TableFileIO( const SessionContext& context, - const std::unordered_map& table_config) const; + const std::unordered_map& table_config, + const std::vector& storage_credentials) const; Result> ListNamespaces(const Namespace& ns, auth::AuthSession& session) const; diff --git a/src/iceberg/catalog/rest/rest_file_io.cc b/src/iceberg/catalog/rest/rest_file_io.cc index 5fadca1ac..a12561feb 100644 --- a/src/iceberg/catalog/rest/rest_file_io.cc +++ b/src/iceberg/catalog/rest/rest_file_io.cc @@ -20,7 +20,9 @@ #include "iceberg/catalog/rest/rest_file_io.h" #include +#include +#include "iceberg/catalog/rest/types.h" #include "iceberg/file_io_registry.h" #include "iceberg/util/macros.h" @@ -92,4 +94,16 @@ Result> MakeCatalogFileIO(const RestCatalogProperties& c return FileIORegistry::Load(io_impl, config.configs()); } +const StorageCredential* SelectS3StorageCredential( + const std::vector& credentials) { + const StorageCredential* best = nullptr; + for (const auto& cred : credentials) { + if (cred.prefix.starts_with("s3") && + (best == nullptr || cred.prefix.size() > best->prefix.size())) { + best = &cred; + } + } + return best; +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_file_io.h b/src/iceberg/catalog/rest/rest_file_io.h index 68482521a..5f0fb9fc2 100644 --- a/src/iceberg/catalog/rest/rest_file_io.h +++ b/src/iceberg/catalog/rest/rest_file_io.h @@ -22,9 +22,11 @@ #include #include #include +#include #include "iceberg/catalog/rest/catalog_properties.h" #include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/catalog/rest/types.h" #include "iceberg/file_io.h" #include "iceberg/file_io_registry.h" #include "iceberg/result.h" @@ -44,4 +46,9 @@ ICEBERG_REST_EXPORT std::string_view BuiltinFileIOName(BuiltinFileIOKind kind); ICEBERG_REST_EXPORT Result> MakeCatalogFileIO( const RestCatalogProperties& config); +/// \brief Returns the longest-prefix S3-family vended credential (prefix +/// starting with "s3"), or nullptr if none. +ICEBERG_REST_EXPORT const StorageCredential* SelectS3StorageCredential( + const std::vector& credentials); + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/type_fwd.h b/src/iceberg/catalog/rest/type_fwd.h index ee684b245..783f22750 100644 --- a/src/iceberg/catalog/rest/type_fwd.h +++ b/src/iceberg/catalog/rest/type_fwd.h @@ -28,6 +28,7 @@ struct ErrorResponse; struct CommitTableResponse; struct LoadTableResult; struct OAuthTokenResponse; +struct StorageCredential; class Endpoint; class ErrorHandler; diff --git a/src/iceberg/test/arrow_s3_file_io_test.cc b/src/iceberg/test/arrow_s3_file_io_test.cc index b1caff1e8..e90325e0b 100644 --- a/src/iceberg/test/arrow_s3_file_io_test.cc +++ b/src/iceberg/test/arrow_s3_file_io_test.cc @@ -27,6 +27,7 @@ #include #include "iceberg/arrow/arrow_io_util.h" +#include "iceberg/arrow/s3/arrow_s3_internal.h" #include "iceberg/arrow/s3/s3_properties.h" #include "iceberg/test/matchers.h" @@ -76,6 +77,11 @@ namespace { class ArrowS3FileIOTest : public ::testing::Test { protected: + static void SetUpTestSuite() { + auto io = MakeS3FileIO({}); + ASSERT_THAT(io, IsOk()); + } + static void TearDownTestSuite() { auto status = FinalizeS3(); if (!status.has_value()) { @@ -181,4 +187,59 @@ TEST_F(ArrowS3FileIOTest, MakeS3FileIOWithTimeouts) { ASSERT_THAT(io_res, IsOk()); } +#if ICEBERG_S3_ENABLED +TEST_F(ArrowS3FileIOTest, ConfigureS3OptionsPrefersClientRegionOverS3Region) { + auto result = + ConfigureS3Options({{std::string(S3Properties::kClientRegion), "cn-hangzhou"}, + {std::string(S3Properties::kRegion), "us-east-1"}}); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->region, "cn-hangzhou"); +} + +TEST_F(ArrowS3FileIOTest, ConfigureS3OptionsFallsBackToS3Region) { + auto result = ConfigureS3Options({{std::string(S3Properties::kRegion), "us-east-1"}}); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->region, "us-east-1"); +} + +TEST_F(ArrowS3FileIOTest, ConfigureS3OptionsStripsHttpsEndpointScheme) { + auto result = ConfigureS3Options({{std::string(S3Properties::kEndpoint), + "https://oss-cn-hangzhou.aliyuncs.com:443"}}); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->endpoint_override, "oss-cn-hangzhou.aliyuncs.com:443"); + EXPECT_EQ(result->scheme, "https"); +} + +TEST_F(ArrowS3FileIOTest, ConfigureS3OptionsStripsHttpEndpointScheme) { + auto result = ConfigureS3Options( + {{std::string(S3Properties::kEndpoint), "http://localhost:9000"}}); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->endpoint_override, "localhost:9000"); + EXPECT_EQ(result->scheme, "http"); +} + +TEST_F(ArrowS3FileIOTest, ConfigureS3OptionsKeepsSchemelessEndpoint) { + auto result = + ConfigureS3Options({{std::string(S3Properties::kEndpoint), "localhost:9000"}}); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->endpoint_override, "localhost:9000"); +} + +TEST_F(ArrowS3FileIOTest, + ConfigureS3OptionsPathStyleAccessFalseEnablesVirtualAddressing) { + auto result = + ConfigureS3Options({{std::string(S3Properties::kPathStyleAccess), "false"}}); + ASSERT_THAT(result, IsOk()); + EXPECT_TRUE(result->force_virtual_addressing); +} + +TEST_F(ArrowS3FileIOTest, + ConfigureS3OptionsPathStyleAccessTrueDisablesVirtualAddressing) { + auto result = + ConfigureS3Options({{std::string(S3Properties::kPathStyleAccess), "true"}}); + ASSERT_THAT(result, IsOk()); + EXPECT_FALSE(result->force_virtual_addressing); +} +#endif + } // namespace iceberg::arrow diff --git a/src/iceberg/test/rest_file_io_test.cc b/src/iceberg/test/rest_file_io_test.cc index b1193d9f8..50d33e383 100644 --- a/src/iceberg/test/rest_file_io_test.cc +++ b/src/iceberg/test/rest_file_io_test.cc @@ -19,9 +19,12 @@ #include "iceberg/catalog/rest/rest_file_io.h" +#include + #include #include +#include "iceberg/catalog/rest/types.h" #include "iceberg/file_io_registry.h" #include "iceberg/test/matchers.h" @@ -147,4 +150,33 @@ TEST(RestFileIOTest, MakeCatalogFileIOSkipsCheckWhenWarehouseAbsent) { ASSERT_THAT(result, IsOk()); } +TEST(RestFileIOTest, SelectS3StorageCredentialPicksLongestS3Prefix) { + std::vector credentials = { + {.prefix = "s3", .config = {{"s3.access-key-id", "a"}}}, + {.prefix = "s3://bucket/data", .config = {{"s3.access-key-id", "b"}}}, + {.prefix = "s3://bucket", .config = {{"s3.access-key-id", "c"}}}, + }; + const auto* cred = SelectS3StorageCredential(credentials); + ASSERT_NE(cred, nullptr); + EXPECT_EQ(cred->prefix, "s3://bucket/data"); +} + +TEST(RestFileIOTest, SelectS3StorageCredentialIgnoresNonS3Prefixes) { + std::vector credentials = { + {.prefix = "gs://bucket", .config = {{"k", "v"}}}, + {.prefix = "s3", .config = {{"s3.access-key-id", "a"}}}, + }; + const auto* cred = SelectS3StorageCredential(credentials); + ASSERT_NE(cred, nullptr); + EXPECT_EQ(cred->prefix, "s3"); +} + +TEST(RestFileIOTest, SelectS3StorageCredentialReturnsNullWhenNoneMatch) { + std::vector credentials = { + {.prefix = "gs://bucket", .config = {{"k", "v"}}}, + }; + EXPECT_EQ(SelectS3StorageCredential(credentials), nullptr); + EXPECT_EQ(SelectS3StorageCredential({}), nullptr); +} + } // namespace iceberg::rest From 924d8ba49b56fc6a84cff69dedc5f5d0b6694b9f Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Tue, 16 Jun 2026 17:58:40 +0800 Subject: [PATCH 3/3] address review comments --- src/iceberg/arrow/s3/arrow_s3_file_io.cc | 3 ++- src/iceberg/catalog/rest/json_serde.cc | 4 ++-- src/iceberg/catalog/rest/rest_catalog.cc | 18 ++++-------------- src/iceberg/catalog/rest/rest_file_io.cc | 15 +++++++++++++++ src/iceberg/catalog/rest/rest_file_io.h | 8 ++++++++ src/iceberg/test/arrow_s3_file_io_test.cc | 2 ++ src/iceberg/test/rest_file_io_test.cc | 21 +++++++++++++++++++++ 7 files changed, 54 insertions(+), 17 deletions(-) diff --git a/src/iceberg/arrow/s3/arrow_s3_file_io.cc b/src/iceberg/arrow/s3/arrow_s3_file_io.cc index 36858f76d..5392f65fa 100644 --- a/src/iceberg/arrow/s3/arrow_s3_file_io.cc +++ b/src/iceberg/arrow/s3/arrow_s3_file_io.cc @@ -145,7 +145,8 @@ Result<::arrow::fs::S3Options> ConfigureS3Options( options.force_virtual_addressing = !*path_style_access; } - // Configure SSL + // Configure SSL. Explicit `s3.ssl.enabled` overrides any scheme derived from + // the endpoint above. ICEBERG_ASSIGN_OR_RAISE(const auto ssl_enabled, ParseOptionalBool(properties, S3Properties::kSslEnabled)); if (ssl_enabled.has_value() && !*ssl_enabled) { diff --git a/src/iceberg/catalog/rest/json_serde.cc b/src/iceberg/catalog/rest/json_serde.cc index c5927895f..30ab6de03 100644 --- a/src/iceberg/catalog/rest/json_serde.cc +++ b/src/iceberg/catalog/rest/json_serde.cc @@ -722,8 +722,8 @@ Result LoadTableResultFromJson(const nlohmann::json& json) { GetJsonValueOrDefault(json, kConfig)); if (auto it = json.find(kStorageCredentials); it != json.end() && !it->is_null()) { if (!it->is_array()) { - return JsonParseError("Cannot parse storage credentials from non-array: {}", - SafeDumpJson(*it)); + // Don't echo the value — it may carry credential material. + return JsonParseError("Cannot parse storage credentials from non-array"); } for (const auto& entry : *it) { StorageCredential cred; diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index bb34e62c7..e21aa048c 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -41,7 +41,6 @@ #include "iceberg/catalog/rest/rest_file_io.h" #include "iceberg/catalog/rest/rest_util.h" #include "iceberg/catalog/rest/types.h" -#include "iceberg/file_io_registry.h" #include "iceberg/json_serde_internal.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" @@ -519,22 +518,13 @@ Result> RestCatalog::TableFileIO( const SessionContext& /*context*/, const std::unordered_map& table_config, const std::vector& storage_credentials) const { - // Longest-prefix S3-family vended credential, matching the Java client's - // VendedCredentialsProvider. When present, build a per-table S3 FileIO from - // catalog + table config + the credential; its ResolvePath resolves oss:// and - // other S3-compatible schemes. + // Longest-prefix "s3" vended credential. Java's VendedCredentialsProvider + // resolves per path against the table location; we bind one at load time + // (fine for the common one-credential-per-table case). if (const StorageCredential* s3_cred = SelectS3StorageCredential(storage_credentials); s3_cred != nullptr) { - auto properties = config_.configs(); - for (const auto& [key, value] : table_config) { - properties[key] = value; - } - for (const auto& [key, value] : s3_cred->config) { - properties[key] = value; - } ICEBERG_ASSIGN_OR_RAISE( - auto io, - FileIORegistry::Load(std::string(FileIORegistry::kArrowS3FileIO), properties)); + auto io, MakeS3FileIOFromCredential(config_.configs(), table_config, *s3_cred)); return std::shared_ptr(std::move(io)); } ICEBERG_RETURN_UNEXPECTED(ValidateNoFileIOConfig(table_config)); diff --git a/src/iceberg/catalog/rest/rest_file_io.cc b/src/iceberg/catalog/rest/rest_file_io.cc index a12561feb..a1002c296 100644 --- a/src/iceberg/catalog/rest/rest_file_io.cc +++ b/src/iceberg/catalog/rest/rest_file_io.cc @@ -20,6 +20,7 @@ #include "iceberg/catalog/rest/rest_file_io.h" #include +#include #include #include "iceberg/catalog/rest/types.h" @@ -106,4 +107,18 @@ const StorageCredential* SelectS3StorageCredential( return best; } +Result> MakeS3FileIOFromCredential( + const std::unordered_map& catalog_config, + const std::unordered_map& table_config, + const StorageCredential& s3_cred) { + auto properties = catalog_config; + for (const auto& [key, value] : table_config) { + properties[key] = value; + } + for (const auto& [key, value] : s3_cred.config) { + properties[key] = value; + } + return FileIORegistry::Load(std::string(FileIORegistry::kArrowS3FileIO), properties); +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_file_io.h b/src/iceberg/catalog/rest/rest_file_io.h index 5f0fb9fc2..e4f52766e 100644 --- a/src/iceberg/catalog/rest/rest_file_io.h +++ b/src/iceberg/catalog/rest/rest_file_io.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "iceberg/catalog/rest/catalog_properties.h" @@ -51,4 +52,11 @@ ICEBERG_REST_EXPORT Result> MakeCatalogFileIO( ICEBERG_REST_EXPORT const StorageCredential* SelectS3StorageCredential( const std::vector& credentials); +/// \brief Builds an `arrow-fs-s3` FileIO, merging catalog, table, then +/// credential config (later wins). +ICEBERG_REST_EXPORT Result> MakeS3FileIOFromCredential( + const std::unordered_map& catalog_config, + const std::unordered_map& table_config, + const StorageCredential& s3_cred); + } // namespace iceberg::rest diff --git a/src/iceberg/test/arrow_s3_file_io_test.cc b/src/iceberg/test/arrow_s3_file_io_test.cc index e90325e0b..b4e64d505 100644 --- a/src/iceberg/test/arrow_s3_file_io_test.cc +++ b/src/iceberg/test/arrow_s3_file_io_test.cc @@ -77,10 +77,12 @@ namespace { class ArrowS3FileIOTest : public ::testing::Test { protected: +#if ICEBERG_S3_ENABLED static void SetUpTestSuite() { auto io = MakeS3FileIO({}); ASSERT_THAT(io, IsOk()); } +#endif static void TearDownTestSuite() { auto status = FinalizeS3(); diff --git a/src/iceberg/test/rest_file_io_test.cc b/src/iceberg/test/rest_file_io_test.cc index 50d33e383..ac70a934f 100644 --- a/src/iceberg/test/rest_file_io_test.cc +++ b/src/iceberg/test/rest_file_io_test.cc @@ -19,6 +19,8 @@ #include "iceberg/catalog/rest/rest_file_io.h" +#include +#include #include #include @@ -179,4 +181,23 @@ TEST(RestFileIOTest, SelectS3StorageCredentialReturnsNullWhenNoneMatch) { EXPECT_EQ(SelectS3StorageCredential({}), nullptr); } +TEST(RestFileIOTest, MakeS3FileIOFromCredentialMergesConfigWithPrecedence) { + auto captured = std::make_shared>(); + FileIORegistry::Register( + std::string(FileIORegistry::kArrowS3FileIO), + [captured](const std::unordered_map& properties) + -> Result> { + *captured = properties; + return std::make_unique(); + }); + auto result = MakeS3FileIOFromCredential( + {{"a", "catalog"}, {"shared", "catalog"}}, {{"b", "table"}, {"shared", "table"}}, + StorageCredential{.prefix = "s3", .config = {{"c", "cred"}, {"shared", "cred"}}}); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ((*captured)["a"], "catalog"); + EXPECT_EQ((*captured)["b"], "table"); + EXPECT_EQ((*captured)["c"], "cred"); + EXPECT_EQ((*captured)["shared"], "cred"); +} + } // namespace iceberg::rest