From 668aaa495c9ffe85903ad6c5294ba0e632d399be Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Thu, 11 Jun 2026 11:56:13 -0700 Subject: [PATCH] feat(rest): vend storage credentials into per-table FileIO The REST catalog declared the `.../credentials` endpoint path but `LoadTableResult` discarded the `storage-credentials` the server returns, so credential-vending catalogs could not access table data end to end. - Add a `StorageCredential` type and parse the `storage-credentials` list on `LoadTableResult` (load/create/register/stage responses), with JSON round-trip support. - Add `MakeTableFileIO`, which builds a table-scoped FileIO by layering the most-specific (longest-prefix) vended credential and the table's `config` on top of the catalog configuration, reusing the shared catalog FileIO when there are no overrides. This also resolves the per-table FileIO FIXME in `RestCatalog::LoadTable`. - Wire it through LoadTable / CreateTable / RegisterTable / StageCreateTable. - Unit tests for serde and for credential selection / FileIO layering. --- src/iceberg/catalog/rest/json_serde.cc | 25 +++++++++ src/iceberg/catalog/rest/rest_catalog.cc | 24 ++++++-- src/iceberg/catalog/rest/rest_file_io.cc | 60 ++++++++++++++++++++ src/iceberg/catalog/rest/rest_file_io.h | 26 +++++++++ src/iceberg/catalog/rest/types.cc | 3 +- src/iceberg/catalog/rest/types.h | 19 ++++++- src/iceberg/test/rest_file_io_test.cc | 70 ++++++++++++++++++++++++ src/iceberg/test/rest_json_serde_test.cc | 27 ++++++++- 8 files changed, 244 insertions(+), 10 deletions(-) diff --git a/src/iceberg/catalog/rest/json_serde.cc b/src/iceberg/catalog/rest/json_serde.cc index 2eb0d19de..fafea47b3 100644 --- a/src/iceberg/catalog/rest/json_serde.cc +++ b/src/iceberg/catalog/rest/json_serde.cc @@ -71,6 +71,8 @@ constexpr std::string_view kSource = "source"; constexpr std::string_view kDestination = "destination"; constexpr std::string_view kMetadata = "metadata"; constexpr std::string_view kConfig = "config"; +constexpr std::string_view kStorageCredentials = "storage-credentials"; +constexpr std::string_view kPrefix = "prefix"; constexpr std::string_view kIdentifiers = "identifiers"; constexpr std::string_view kOverrides = "overrides"; constexpr std::string_view kDefaults = "defaults"; @@ -689,12 +691,32 @@ Result RenameTableRequestFromJson(const nlohmann::json& json return request; } +// StorageCredential (used by LoadTableResult) +nlohmann::json ToJson(const StorageCredential& credential) { + nlohmann::json json; + json[kPrefix] = credential.prefix; + SetContainerField(json, kConfig, credential.config); + return json; +} + +Result StorageCredentialFromJson(const nlohmann::json& json) { + StorageCredential credential; + ICEBERG_ASSIGN_OR_RAISE(credential.prefix, GetJsonValue(json, kPrefix)); + ICEBERG_ASSIGN_OR_RAISE( + credential.config, + GetJsonValueOrDefault(json, kConfig)); + return credential; +} + // LoadTableResult (used by CreateTableResponse, LoadTableResponse) nlohmann::json ToJson(const LoadTableResult& result) { nlohmann::json json; SetOptionalStringField(json, kMetadataLocation, result.metadata_location); json[kMetadata] = ToJson(*result.metadata); SetContainerField(json, kConfig, result.config); + for (const auto& credential : result.storage_credentials) { + json[kStorageCredentials].emplace_back(ToJson(credential)); + } return json; } @@ -707,6 +729,9 @@ Result LoadTableResultFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(result.metadata, TableMetadataFromJson(metadata_json)); ICEBERG_ASSIGN_OR_RAISE(result.config, GetJsonValueOrDefault(json, kConfig)); + ICEBERG_ASSIGN_OR_RAISE(result.storage_credentials, + FromJsonList(json, kStorageCredentials, + StorageCredentialFromJson)); ICEBERG_RETURN_UNEXPECTED(result.Validate()); return result; } diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index f04f5fb55..bb5f709cb 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -366,8 +366,12 @@ Result> RestCatalog::CreateTable( ICEBERG_ASSIGN_OR_RAISE(auto result, CreateTableInternal(identifier, schema, spec, order, location, properties, /*stage_create=*/false)); + ICEBERG_ASSIGN_OR_RAISE(auto file_io, + MakeTableFileIO(config_, file_io_, result.metadata->location, + result.config, result.storage_credentials)); return Table::Make(identifier, std::move(result.metadata), - std::move(result.metadata_location), file_io_, shared_from_this()); + std::move(result.metadata_location), std::move(file_io), + shared_from_this()); } Result> RestCatalog::UpdateTable( @@ -409,10 +413,13 @@ Result> RestCatalog::StageCreateTable( ICEBERG_ASSIGN_OR_RAISE(auto result, CreateTableInternal(identifier, schema, spec, order, location, properties, /*stage_create=*/true)); + ICEBERG_ASSIGN_OR_RAISE(auto file_io, + MakeTableFileIO(config_, file_io_, result.metadata->location, + result.config, result.storage_credentials)); ICEBERG_ASSIGN_OR_RAISE(auto staged_table, StagedTable::Make(identifier, std::move(result.metadata), - std::move(result.metadata_location), file_io_, - shared_from_this())); + std::move(result.metadata_location), + std::move(file_io), shared_from_this())); return Transaction::Make(std::move(staged_table), TransactionKind::kCreate); } @@ -479,9 +486,11 @@ Result> RestCatalog::LoadTable(const TableIdentifier& ide ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier)); ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body)); ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); - /// FIXME: support per-table FileIO creation + ICEBERG_ASSIGN_OR_RAISE( + auto file_io, MakeTableFileIO(config_, file_io_, load_result.metadata->location, + load_result.config, load_result.storage_credentials)); return Table::Make(identifier, std::move(load_result.metadata), - std::move(load_result.metadata_location), file_io_, + std::move(load_result.metadata_location), std::move(file_io), shared_from_this()); } @@ -503,8 +512,11 @@ Result> RestCatalog::RegisterTable( ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body())); ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); + ICEBERG_ASSIGN_OR_RAISE( + auto file_io, MakeTableFileIO(config_, file_io_, load_result.metadata->location, + load_result.config, load_result.storage_credentials)); return Table::Make(identifier, std::move(load_result.metadata), - std::move(load_result.metadata_location), file_io_, + std::move(load_result.metadata_location), std::move(file_io), shared_from_this()); } diff --git a/src/iceberg/catalog/rest/rest_file_io.cc b/src/iceberg/catalog/rest/rest_file_io.cc index f08a03353..addc99710 100644 --- a/src/iceberg/catalog/rest/rest_file_io.cc +++ b/src/iceberg/catalog/rest/rest_file_io.cc @@ -19,8 +19,13 @@ #include "iceberg/catalog/rest/rest_file_io.h" +#include #include +#include +#include +#include +#include "iceberg/catalog/rest/rest_util.h" #include "iceberg/file_io_registry.h" #include "iceberg/util/macros.h" @@ -92,4 +97,59 @@ Result> MakeCatalogFileIO(const RestCatalogProperties& c return FileIORegistry::Load(io_impl, config.configs()); } +const StorageCredential* MatchStorageCredential( + std::string_view location, const std::vector& credentials) { + const StorageCredential* best = nullptr; + for (const auto& credential : credentials) { + if (!location.starts_with(credential.prefix)) { + continue; + } + if (best == nullptr || credential.prefix.size() > best->prefix.size()) { + best = &credential; + } + } + return best; +} + +Result> MakeTableFileIO( + const RestCatalogProperties& catalog_config, + const std::shared_ptr& catalog_file_io, std::string_view location, + const std::unordered_map& table_config, + const std::vector& storage_credentials) { + const StorageCredential* credential = + MatchStorageCredential(location, storage_credentials); + + // Without table-specific overrides, reuse the shared catalog FileIO. + if (table_config.empty() && credential == nullptr) { + return catalog_file_io; + } + + // Layer table config, then vended credentials, on top of the catalog properties. + // Vended credentials are the most specific and therefore take precedence. + static const std::unordered_map kEmptyConfig; + auto properties = + MergeConfigs(catalog_config.configs(), table_config, + credential != nullptr ? credential->config : kEmptyConfig); + + std::string io_impl; + if (auto it = properties.find(RestCatalogProperties::kIOImpl.key()); + it != properties.end()) { + io_impl = it->second; + } + if (io_impl.empty()) { + ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(location)); + io_impl = std::string(BuiltinFileIOName(detected_kind)); + } else if (!location.empty() && IsBuiltinImpl(io_impl)) { + ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(location)); + if (io_impl != BuiltinFileIOName(detected_kind)) { + return InvalidArgument( + R"("io-impl" value '{}' is incompatible with table location '{}')", io_impl, + location); + } + } + + ICEBERG_ASSIGN_OR_RAISE(auto file_io, FileIORegistry::Load(io_impl, properties)); + return std::shared_ptr(std::move(file_io)); +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_file_io.h b/src/iceberg/catalog/rest/rest_file_io.h index 68482521a..e539120b4 100644 --- a/src/iceberg/catalog/rest/rest_file_io.h +++ b/src/iceberg/catalog/rest/rest_file_io.h @@ -22,9 +22,12 @@ #include #include #include +#include +#include #include "iceberg/catalog/rest/catalog_properties.h" #include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/catalog/rest/types.h" #include "iceberg/file_io.h" #include "iceberg/file_io_registry.h" #include "iceberg/result.h" @@ -44,4 +47,27 @@ ICEBERG_REST_EXPORT std::string_view BuiltinFileIOName(BuiltinFileIOKind kind); ICEBERG_REST_EXPORT Result> MakeCatalogFileIO( const RestCatalogProperties& config); +/// \brief Select the storage credential whose prefix is the most specific (longest) +/// match for `location`, per the REST spec guidance. +/// +/// \return A pointer into `credentials`, or nullptr if none match. The pointer is +/// valid only as long as `credentials` is alive and unmodified. +ICEBERG_REST_EXPORT const StorageCredential* MatchStorageCredential( + std::string_view location, const std::vector& credentials); + +/// \brief Build a FileIO for a single table from a load/create response. +/// +/// Layers the table's vended storage credentials (the most specific prefix match for +/// `location`) and table-specific `table_config` on top of the catalog configuration, +/// then resolves a FileIO implementation. Vended credentials take precedence over +/// `table_config`, which takes precedence over the catalog configuration. +/// +/// When the table supplies neither vended credentials nor overriding config, +/// `catalog_file_io` is returned unchanged so the shared instance is reused. +ICEBERG_REST_EXPORT Result> MakeTableFileIO( + const RestCatalogProperties& catalog_config, + const std::shared_ptr& catalog_file_io, std::string_view location, + const std::unordered_map& table_config, + const std::vector& storage_credentials); + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/types.cc b/src/iceberg/catalog/rest/types.cc index 8d96bccb2..84fba9a7c 100644 --- a/src/iceberg/catalog/rest/types.cc +++ b/src/iceberg/catalog/rest/types.cc @@ -86,7 +86,8 @@ bool CreateTableRequest::operator==(const CreateTableRequest& other) const { } bool LoadTableResult::operator==(const LoadTableResult& other) const { - if (metadata_location != other.metadata_location || config != other.config) { + if (metadata_location != other.metadata_location || config != other.config || + storage_credentials != other.storage_credentials) { return false; } diff --git a/src/iceberg/catalog/rest/types.h b/src/iceberg/catalog/rest/types.h index 7849b366b..a27391032 100644 --- a/src/iceberg/catalog/rest/types.h +++ b/src/iceberg/catalog/rest/types.h @@ -180,12 +180,29 @@ struct ICEBERG_REST_EXPORT CreateTableRequest { /// \brief An opaque token that allows clients to make use of pagination for list APIs. using PageToken = std::string; +/// \brief A storage credential vended by the REST catalog, scoped to a location prefix. +/// +/// The REST catalog returns these so clients can access table data without holding +/// long-lived storage credentials. When several credentials are available, clients +/// should pick the one with the most specific (longest) matching prefix. +struct ICEBERG_REST_EXPORT StorageCredential { + /// Storage location prefix this credential applies to (e.g. "s3://bucket/db/table"). + std::string prefix; // required + /// Credential properties to layer onto the FileIO configuration (e.g. access key, + /// secret key, session token). + std::unordered_map config; + + bool operator==(const StorageCredential&) const = default; +}; + /// \brief Result body for table create/load/register APIs. struct ICEBERG_REST_EXPORT LoadTableResult { std::string metadata_location; std::shared_ptr metadata; // required std::unordered_map config; - // TODO(Li Feiyang): Add std::shared_ptr storage_credential; + /// Storage credentials for accessing the table's data. Clients should prefer these + /// over any credentials embedded in `config`. + std::vector storage_credentials; /// \brief Validates the LoadTableResult. Status Validate() const { diff --git a/src/iceberg/test/rest_file_io_test.cc b/src/iceberg/test/rest_file_io_test.cc index c86df2753..3f1abfbae 100644 --- a/src/iceberg/test/rest_file_io_test.cc +++ b/src/iceberg/test/rest_file_io_test.cc @@ -19,9 +19,15 @@ #include "iceberg/catalog/rest/rest_file_io.h" +#include +#include +#include +#include + #include #include +#include "iceberg/catalog/rest/types.h" #include "iceberg/file_io_registry.h" #include "iceberg/test/matchers.h" @@ -143,4 +149,68 @@ TEST(RestFileIOTest, MakeCatalogFileIOSkipsCheckWhenWarehouseAbsent) { ASSERT_THAT(result, IsOk()); } +TEST(RestFileIOTest, MatchStorageCredentialPicksLongestPrefix) { + std::vector credentials = { + {.prefix = "s3://bucket", .config = {{"k", "broad"}}}, + {.prefix = "s3://bucket/db/table", .config = {{"k", "specific"}}}, + {.prefix = "s3://other", .config = {{"k", "other"}}}, + }; + + const auto* match = + MatchStorageCredential("s3://bucket/db/table/data/f.parquet", credentials); + ASSERT_NE(match, nullptr); + EXPECT_EQ(match->prefix, "s3://bucket/db/table"); + + EXPECT_EQ(MatchStorageCredential("gs://nope/x", credentials), nullptr); +} + +TEST(RestFileIOTest, MatchStorageCredentialEmptyReturnsNull) { + EXPECT_EQ(MatchStorageCredential("s3://bucket/x", {}), nullptr); +} + +TEST(RestFileIOTest, MakeTableFileIOReusesCatalogIOWhenNoOverrides) { + auto catalog_io = std::make_shared(); + auto config = RestCatalogProperties::FromMap( + {{"io-impl", std::string(FileIORegistry::kArrowS3FileIO)}}); + + auto result = MakeTableFileIO(config, catalog_io, "s3://bucket/test", + /*table_config=*/{}, /*storage_credentials=*/{}); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value(), catalog_io); // shared catalog instance reused +} + +TEST(RestFileIOTest, MakeTableFileIOAppliesVendedCredentials) { + auto captured = std::make_shared>(); + FileIORegistry::Register( + std::string(FileIORegistry::kArrowS3FileIO), + [captured](const std::unordered_map& properties) + -> Result> { + *captured = properties; + return std::make_unique(); + }); + + auto catalog_io = std::make_shared(); + auto config = RestCatalogProperties::FromMap( + {{"io-impl", std::string(FileIORegistry::kArrowS3FileIO)}, + {"s3.access-key-id", "catalog-key"}}); + + std::vector credentials = { + {.prefix = "s3://bucket", .config = {{"s3.access-key-id", "broad"}}}, + {.prefix = "s3://bucket/test", + .config = {{"s3.access-key-id", "vended"}, {"s3.session-token", "tok"}}}, + }; + + auto result = MakeTableFileIO(config, catalog_io, "s3://bucket/test/data/f.parquet", + /*table_config=*/{{"write.parquet.compression", "zstd"}}, + credentials); + ASSERT_THAT(result, IsOk()); + EXPECT_NE(result.value(), catalog_io); // a new, table-scoped FileIO + + // The most specific vended credential wins over the catalog value. + EXPECT_EQ((*captured)["s3.access-key-id"], "vended"); + EXPECT_EQ((*captured)["s3.session-token"], "tok"); + // Table-specific config is layered in as well. + EXPECT_EQ((*captured)["write.parquet.compression"], "zstd"); +} + } // namespace iceberg::rest diff --git a/src/iceberg/test/rest_json_serde_test.cc b/src/iceberg/test/rest_json_serde_test.cc index 86c852a1a..c6b8f60a9 100644 --- a/src/iceberg/test/rest_json_serde_test.cc +++ b/src/iceberg/test/rest_json_serde_test.cc @@ -1116,7 +1116,17 @@ INSTANTIATE_TEST_SUITE_P( .model = {.metadata_location = "s3://bucket/metadata/v1.json", .metadata = MakeSimpleTableMetadata(), .config = {{"warehouse", "s3://bucket/warehouse"}, - {"foo", "bar"}}}}), + {"foo", "bar"}}}}, + // With vended storage credentials + LoadTableResultParam{ + .test_name = "WithStorageCredentials", + .expected_json_str = + R"({"metadata":{"current-schema-id":1,"current-snapshot-id":null,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":1,"last-partition-id":0,"last-sequence-number":0,"last-updated-ms":0,"location":"s3://bucket/test","metadata-log":[],"partition-specs":[{"fields":[],"spec-id":0}],"partition-statistics":[],"properties":{},"refs":{},"schemas":[{"fields":[{"id":1,"name":"id","required":true,"type":"int"}],"schema-id":1,"type":"struct"}],"snapshot-log":[],"snapshots":[],"sort-orders":[{"fields":[],"order-id":0}],"statistics":[],"table-uuid":"test-uuid-1234"},"storage-credentials":[{"config":{"s3.access-key-id":"AKIA","s3.secret-access-key":"secret"},"prefix":"s3://bucket/test"}]})", + .model = {.metadata = MakeSimpleTableMetadata(), + .storage_credentials = {StorageCredential{ + .prefix = "s3://bucket/test", + .config = {{"s3.access-key-id", "AKIA"}, + {"s3.secret-access-key", "secret"}}}}}}), [](const ::testing::TestParamInfo& info) { return info.param.test_name; }); @@ -1145,7 +1155,20 @@ INSTANTIATE_TEST_SUITE_P( .json_str = R"({"metadata":{"format-version":2,"table-uuid":"test-uuid-1234","location":"s3://bucket/test","last-sequence-number":0,"last-updated-ms":0,"last-column-id":1,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"properties":{}},"config":{"warehouse":"s3://bucket/warehouse"}})", .expected_model = {.metadata = MakeSimpleTableMetadata(), - .config = {{"warehouse", "s3://bucket/warehouse"}}}}), + .config = {{"warehouse", "s3://bucket/warehouse"}}}}, + // With multiple vended storage credentials + LoadTableResultDeserializeParam{ + .test_name = "WithStorageCredentials", + .json_str = + R"({"metadata":{"format-version":2,"table-uuid":"test-uuid-1234","location":"s3://bucket/test","last-sequence-number":0,"last-updated-ms":0,"last-column-id":1,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"properties":{}},"storage-credentials":[{"prefix":"s3://bucket","config":{"s3.access-key-id":"BROAD"}},{"prefix":"s3://bucket/test","config":{"s3.access-key-id":"AKIA","s3.session-token":"tok"}}]})", + .expected_model = + {.metadata = MakeSimpleTableMetadata(), + .storage_credentials = + {StorageCredential{.prefix = "s3://bucket", + .config = {{"s3.access-key-id", "BROAD"}}}, + StorageCredential{.prefix = "s3://bucket/test", + .config = {{"s3.access-key-id", "AKIA"}, + {"s3.session-token", "tok"}}}}}}), [](const ::testing::TestParamInfo& info) { return info.param.test_name; });