diff --git a/src/iceberg/catalog/rest/endpoint.h b/src/iceberg/catalog/rest/endpoint.h index 7382955ce..fdcd2108e 100644 --- a/src/iceberg/catalog/rest/endpoint.h +++ b/src/iceberg/catalog/rest/endpoint.h @@ -128,6 +128,26 @@ class ICEBERG_REST_EXPORT Endpoint { return {HttpMethod::kPost, "/v1/{prefix}/transactions/commit"}; } + // Scan planning endpoints + static Endpoint PlanTableScan() { + return {HttpMethod::kPost, "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"}; + } + + static Endpoint FetchPlanningResult() { + return {HttpMethod::kGet, + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}"}; + } + + static Endpoint CancelPlanning() { + return {HttpMethod::kDelete, + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}"}; + } + + static Endpoint FetchScanTasks() { + return {HttpMethod::kPost, + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks"}; + } + private: Endpoint(HttpMethod method, std::string_view path) : method_(method), path_(path) {} diff --git a/src/iceberg/catalog/rest/error_handlers.cc b/src/iceberg/catalog/rest/error_handlers.cc index f3e5b8fb3..67146e745 100644 --- a/src/iceberg/catalog/rest/error_handlers.cc +++ b/src/iceberg/catalog/rest/error_handlers.cc @@ -30,6 +30,9 @@ namespace { constexpr std::string_view kIllegalArgumentException = "IllegalArgumentException"; constexpr std::string_view kNoSuchNamespaceException = "NoSuchNamespaceException"; constexpr std::string_view kNamespaceNotEmptyException = "NamespaceNotEmptyException"; +constexpr std::string_view kNoSuchTableException = "NoSuchTableException"; +constexpr std::string_view kNoSuchPlanIdException = "NoSuchPlanIdException"; +constexpr std::string_view kNoSuchPlanTaskException = "NoSuchPlanTaskException"; } // namespace @@ -183,4 +186,52 @@ Status ViewCommitErrorHandler::Accept(const ErrorResponse& error) const { return DefaultErrorHandler::Accept(error); } +const std::shared_ptr& PlanErrorHandler::Instance() { + static const std::shared_ptr instance{new PlanErrorHandler()}; + return instance; +} + +Status PlanErrorHandler::Accept(const ErrorResponse& error) const { + switch (error.code) { + case 404: + if (error.type == kNoSuchNamespaceException) { + return NoSuchNamespace(error.message); + } + if (error.type == kNoSuchTableException) { + return NoSuchTable(error.message); + } + if (error.type == kNoSuchPlanIdException) { + return NoSuchPlanId(error.message); + } + return NotFound(error.message); + case 406: + return NotSupported(error.message); + } + + return DefaultErrorHandler::Accept(error); +} + +const std::shared_ptr& PlanTaskErrorHandler::Instance() { + static const std::shared_ptr instance{new PlanTaskErrorHandler()}; + return instance; +} + +Status PlanTaskErrorHandler::Accept(const ErrorResponse& error) const { + switch (error.code) { + case 404: + if (error.type == kNoSuchNamespaceException) { + return NoSuchNamespace(error.message); + } + if (error.type == kNoSuchTableException) { + return NoSuchTable(error.message); + } + if (error.type == kNoSuchPlanTaskException) { + return NoSuchPlanTask(error.message); + } + return NotFound(error.message); + } + + return DefaultErrorHandler::Accept(error); +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/error_handlers.h b/src/iceberg/catalog/rest/error_handlers.h index eae2c9b7f..ee338fb3e 100644 --- a/src/iceberg/catalog/rest/error_handlers.h +++ b/src/iceberg/catalog/rest/error_handlers.h @@ -127,4 +127,26 @@ class ICEBERG_REST_EXPORT ViewCommitErrorHandler final : public DefaultErrorHand constexpr ViewCommitErrorHandler() = default; }; +/// \brief Plan operation error handler. +class ICEBERG_REST_EXPORT PlanErrorHandler final : public DefaultErrorHandler { + public: + static const std::shared_ptr& Instance(); + + Status Accept(const ErrorResponse& error) const override; + + private: + constexpr PlanErrorHandler() = default; +}; + +/// \brief Fetch scan tasks operation error handler. +class ICEBERG_REST_EXPORT PlanTaskErrorHandler final : public DefaultErrorHandler { + public: + static const std::shared_ptr& Instance(); + + Status Accept(const ErrorResponse& error) const override; + + private: + constexpr PlanTaskErrorHandler() = default; +}; + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/json_serde.cc b/src/iceberg/catalog/rest/json_serde.cc index eebdc1969..2eb0d19de 100644 --- a/src/iceberg/catalog/rest/json_serde.cc +++ b/src/iceberg/catalog/rest/json_serde.cc @@ -17,8 +17,12 @@ * under the License. */ +#include +#include #include +#include #include +#include #include #include @@ -26,12 +30,18 @@ #include "iceberg/catalog/rest/json_serde_internal.h" #include "iceberg/catalog/rest/types.h" +#include "iceberg/expression/json_serde_internal.h" +#include "iceberg/file_format.h" #include "iceberg/json_serde_internal.h" +#include "iceberg/manifest/manifest_entry.h" #include "iceberg/partition_spec.h" +#include "iceberg/schema.h" #include "iceberg/sort_order.h" #include "iceberg/table_identifier.h" #include "iceberg/table_requirement.h" +#include "iceberg/table_scan.h" #include "iceberg/table_update.h" +#include "iceberg/util/data_file_set.h" #include "iceberg/util/json_util_internal.h" #include "iceberg/util/macros.h" @@ -78,6 +88,468 @@ constexpr std::string_view kExpiresIn = "expires_in"; constexpr std::string_view kIssuedTokenType = "issued_token_type"; constexpr std::string_view kRefreshToken = "refresh_token"; constexpr std::string_view kOAuthScope = "scope"; +constexpr std::string_view kPlanStatus = "status"; +constexpr std::string_view kPlanId = "plan-id"; +constexpr std::string_view kPlanTasks = "plan-tasks"; +constexpr std::string_view kFileScanTasks = "file-scan-tasks"; +constexpr std::string_view kDeleteFiles = "delete-files"; +constexpr std::string_view kSnapshotId = "snapshot-id"; +constexpr std::string_view kSelect = "select"; +constexpr std::string_view kFilter = "filter"; +constexpr std::string_view kCaseSensitive = "case-sensitive"; +constexpr std::string_view kUseSnapshotSchema = "use-snapshot-schema"; +constexpr std::string_view kStartSnapshotId = "start-snapshot-id"; +constexpr std::string_view kEndSnapshotId = "end-snapshot-id"; +constexpr std::string_view kStatsFields = "stats-fields"; +constexpr std::string_view kMinRowsRequested = "min-rows-requested"; +constexpr std::string_view kPlanTask = "plan-task"; +constexpr std::string_view kContent = "content"; +constexpr std::string_view kContentData = "data"; +constexpr std::string_view kContentPositionDeletes = "position-deletes"; +constexpr std::string_view kContentEqualityDeletes = "equality-deletes"; +constexpr std::string_view kFilePath = "file-path"; +constexpr std::string_view kFileFormat = "file-format"; +constexpr std::string_view kSpecId = "spec-id"; +constexpr std::string_view kPartition = "partition"; +constexpr std::string_view kRecordCount = "record-count"; +constexpr std::string_view kFileSizeInBytes = "file-size-in-bytes"; +constexpr std::string_view kColumnSizes = "column-sizes"; +constexpr std::string_view kValueCounts = "value-counts"; +constexpr std::string_view kNullValueCounts = "null-value-counts"; +constexpr std::string_view kNanValueCounts = "nan-value-counts"; +constexpr std::string_view kLowerBounds = "lower-bounds"; +constexpr std::string_view kUpperBounds = "upper-bounds"; +constexpr std::string_view kKeyMetadata = "key-metadata"; +constexpr std::string_view kSplitOffsets = "split-offsets"; +constexpr std::string_view kEqualityIds = "equality-ids"; +constexpr std::string_view kSortOrderId = "sort-order-id"; +constexpr std::string_view kFirstRowId = "first-row-id"; +constexpr std::string_view kReferencedDataFile = "referenced-data-file"; +constexpr std::string_view kContentOffset = "content-offset"; +constexpr std::string_view kContentSizeInBytes = "content-size-in-bytes"; +constexpr std::string_view kDataFile = "data-file"; +constexpr std::string_view kDeleteFileReferences = "delete-file-references"; +constexpr std::string_view kResidualFilter = "residual-filter"; +constexpr std::string_view kMapKeys = "keys"; +constexpr std::string_view kMapValues = "values"; + +template +Result> KeyValueMapFromJson(const nlohmann::json& json, + std::string_view key) { + std::map result; + if (!json.contains(key) || json.at(key).is_null()) { + return result; + } + + ICEBERG_ASSIGN_OR_RAISE(auto map_json, GetJsonValue(json, key)); + ICEBERG_ASSIGN_OR_RAISE(auto keys, + GetJsonValue>(map_json, kMapKeys)); + ICEBERG_ASSIGN_OR_RAISE(auto values, + GetJsonValue>(map_json, kMapValues)); + if (keys.size() != values.size()) { + return JsonParseError("'{}' map keys and values have different lengths", key); + } + + for (size_t i = 0; i < keys.size(); ++i) { + result[keys[i]] = std::move(values[i]); + } + return result; +} + +template +void SetKeyValueMap(nlohmann::json& json, std::string_view key, + const std::map& map) { + if (map.empty()) { + return; + } + + std::vector keys; + std::vector values; + keys.reserve(map.size()); + values.reserve(map.size()); + for (const auto& [field_id, value] : map) { + keys.push_back(field_id); + values.push_back(value); + } + json[key] = {{kMapKeys, std::move(keys)}, {kMapValues, std::move(values)}}; +} + +} // namespace + +Result DataFileFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_spec_by_id, + const Schema& schema) { + if (!json.is_object()) { + return JsonParseError("DataFile must be a JSON object: {}", SafeDumpJson(json)); + } + DataFile data_file; + + ICEBERG_ASSIGN_OR_RAISE(auto content_str, GetJsonValue(json, kContent)); + if (content_str == kContentData) { + data_file.content = DataFile::Content::kData; + } else if (content_str == kContentPositionDeletes) { + data_file.content = DataFile::Content::kPositionDeletes; + } else if (content_str == kContentEqualityDeletes) { + data_file.content = DataFile::Content::kEqualityDeletes; + } else { + return JsonParseError("Unknown data file content: {}", content_str); + } + + ICEBERG_ASSIGN_OR_RAISE(data_file.file_path, + GetJsonValue(json, kFilePath)); + ICEBERG_ASSIGN_OR_RAISE(auto format_str, GetJsonValue(json, kFileFormat)); + ICEBERG_ASSIGN_OR_RAISE(data_file.file_format, FileFormatTypeFromString(format_str)); + + ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue(json, kSpecId)); + data_file.partition_spec_id = spec_id; + + ICEBERG_ASSIGN_OR_RAISE(auto partition_vals, + GetJsonValue(json, kPartition)); + if (!partition_vals.is_array()) { + return JsonParseError("PartitionValues must be a JSON array: {}", + SafeDumpJson(partition_vals)); + } + std::vector literals; + auto it = partition_spec_by_id.find(spec_id); + if (it == partition_spec_by_id.end()) { + return JsonParseError("Invalid partition spec id: {}", spec_id); + } + ICEBERG_ASSIGN_OR_RAISE(auto struct_type, it->second->PartitionType(schema)); + auto fields = struct_type->fields(); + if (partition_vals.size() != fields.size()) { + return JsonParseError("Invalid partition data size: expected = {}, actual = {}", + fields.size(), partition_vals.size()); + } + for (size_t pos = 0; pos < fields.size(); ++pos) { + ICEBERG_ASSIGN_OR_RAISE( + auto literal, LiteralFromJson(partition_vals[pos], fields[pos].type().get())); + literals.push_back(std::move(literal)); + } + data_file.partition = PartitionValues(std::move(literals)); + + ICEBERG_ASSIGN_OR_RAISE(data_file.record_count, + GetJsonValue(json, kRecordCount)); + ICEBERG_ASSIGN_OR_RAISE(data_file.file_size_in_bytes, + GetJsonValue(json, kFileSizeInBytes)); + + ICEBERG_ASSIGN_OR_RAISE(data_file.column_sizes, + KeyValueMapFromJson(json, kColumnSizes)); + ICEBERG_ASSIGN_OR_RAISE(data_file.value_counts, + KeyValueMapFromJson(json, kValueCounts)); + ICEBERG_ASSIGN_OR_RAISE(data_file.null_value_counts, + KeyValueMapFromJson(json, kNullValueCounts)); + ICEBERG_ASSIGN_OR_RAISE(data_file.nan_value_counts, + KeyValueMapFromJson(json, kNanValueCounts)); + ICEBERG_ASSIGN_OR_RAISE(data_file.lower_bounds, + KeyValueMapFromJson>(json, kLowerBounds)); + ICEBERG_ASSIGN_OR_RAISE(data_file.upper_bounds, + KeyValueMapFromJson>(json, kUpperBounds)); + + if (json.contains(kKeyMetadata) && !json.at(kKeyMetadata).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(data_file.key_metadata, + GetJsonValue>(json, kKeyMetadata)); + } + if (json.contains(kSplitOffsets) && !json.at(kSplitOffsets).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(data_file.split_offsets, + GetJsonValue>(json, kSplitOffsets)); + } + if (json.contains(kEqualityIds) && !json.at(kEqualityIds).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(data_file.equality_ids, + GetJsonValue>(json, kEqualityIds)); + } + if (json.contains(kSortOrderId) && !json.at(kSortOrderId).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(data_file.sort_order_id, + GetJsonValue(json, kSortOrderId)); + } + if (json.contains(kFirstRowId) && !json.at(kFirstRowId).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(data_file.first_row_id, + GetJsonValue(json, kFirstRowId)); + } + if (json.contains(kReferencedDataFile) && !json.at(kReferencedDataFile).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(data_file.referenced_data_file, + GetJsonValue(json, kReferencedDataFile)); + } + if (json.contains(kContentOffset) && !json.at(kContentOffset).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(data_file.content_offset, + GetJsonValue(json, kContentOffset)); + } + if (json.contains(kContentSizeInBytes) && !json.at(kContentSizeInBytes).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(data_file.content_size_in_bytes, + GetJsonValue(json, kContentSizeInBytes)); + } + + return data_file; +} + +Result>> FileScanTasksFromJson( + const nlohmann::json& json, + const std::vector>& delete_files, + const std::unordered_map>& + partition_spec_by_id, + const Schema& schema) { + if (!json.is_array()) { + return JsonParseError("Cannot parse file scan tasks from non-array: {}", + SafeDumpJson(json)); + } + std::vector> file_scan_tasks; + for (const auto& task_json : json) { + if (!task_json.is_object()) { + return JsonParseError("Cannot parse file scan task from a non-object: {}", + SafeDumpJson(task_json)); + } + + ICEBERG_ASSIGN_OR_RAISE(auto data_file_json, + GetJsonValue(task_json, kDataFile)); + ICEBERG_ASSIGN_OR_RAISE( + auto data_file, DataFileFromJson(data_file_json, partition_spec_by_id, schema)); + + std::vector> task_delete_files; + if (task_json.contains(kDeleteFileReferences) && + !task_json.at(kDeleteFileReferences).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(auto refs, GetJsonValue>( + task_json, kDeleteFileReferences)); + for (int32_t ref : refs) { + if (ref < 0 || static_cast(ref) >= delete_files.size()) { + return JsonParseError( + "delete-file-references index {} is out of range (delete_files size: {})", + ref, delete_files.size()); + } + task_delete_files.push_back(delete_files[ref]); + } + } + + std::shared_ptr residual_filter; + if (task_json.contains(kResidualFilter) && !task_json.at(kResidualFilter).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(auto filter_json, + GetJsonValue(task_json, kResidualFilter)); + ICEBERG_ASSIGN_OR_RAISE(residual_filter, ExpressionFromJson(filter_json)); + } + + file_scan_tasks.push_back(std::make_shared( + std::make_shared(std::move(data_file)), std::move(task_delete_files), + std::move(residual_filter))); + } + return file_scan_tasks; +} + +Result DataFileToJsonUnchecked(const DataFile& data_file) { + nlohmann::json json; + switch (data_file.content) { + case DataFile::Content::kData: + json[kContent] = kContentData; + break; + case DataFile::Content::kPositionDeletes: + json[kContent] = kContentPositionDeletes; + break; + case DataFile::Content::kEqualityDeletes: + json[kContent] = kContentEqualityDeletes; + break; + } + json[kFilePath] = data_file.file_path; + json[kFileFormat] = ToString(data_file.file_format); + + if (!data_file.partition_spec_id.has_value()) { + return ValidationFailed("Cannot serialize REST content file without 'spec-id'"); + } + json[kSpecId] = data_file.partition_spec_id.value(); + + nlohmann::json partition_json = nlohmann::json::array(); + for (const auto& literal : data_file.partition.values()) { + ICEBERG_ASSIGN_OR_RAISE(auto lit_json, iceberg::ToJson(literal)); + partition_json.push_back(std::move(lit_json)); + } + json[kPartition] = std::move(partition_json); + + json[kRecordCount] = data_file.record_count; + json[kFileSizeInBytes] = data_file.file_size_in_bytes; + + SetKeyValueMap(json, kColumnSizes, data_file.column_sizes); + SetKeyValueMap(json, kValueCounts, data_file.value_counts); + SetKeyValueMap(json, kNullValueCounts, data_file.null_value_counts); + SetKeyValueMap(json, kNanValueCounts, data_file.nan_value_counts); + SetKeyValueMap(json, kLowerBounds, data_file.lower_bounds); + SetKeyValueMap(json, kUpperBounds, data_file.upper_bounds); + + if (!data_file.key_metadata.empty()) { + json[kKeyMetadata] = data_file.key_metadata; + } + if (!data_file.split_offsets.empty()) { + json[kSplitOffsets] = data_file.split_offsets; + } + if (!data_file.equality_ids.empty()) { + json[kEqualityIds] = data_file.equality_ids; + } + if (data_file.sort_order_id.has_value()) { + json[kSortOrderId] = data_file.sort_order_id.value(); + } + if (data_file.first_row_id.has_value()) { + json[kFirstRowId] = data_file.first_row_id.value(); + } + if (data_file.referenced_data_file.has_value()) { + json[kReferencedDataFile] = data_file.referenced_data_file.value(); + } + if (data_file.content_offset.has_value()) { + json[kContentOffset] = data_file.content_offset.value(); + } + if (data_file.content_size_in_bytes.has_value()) { + json[kContentSizeInBytes] = data_file.content_size_in_bytes.value(); + } + + return json; +} + +Result ToJson( + const DataFile& data_file, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + if (!data_file.partition_spec_id.has_value()) { + return ValidationFailed("Invalid partition spec id from content file: null"); + } + auto it = partition_specs_by_id.find(data_file.partition_spec_id.value()); + if (it == partition_specs_by_id.end() || !it->second) { + return ValidationFailed("Invalid partition spec: null"); + } + if (data_file.partition_spec_id.value() != it->second->spec_id()) { + return ValidationFailed( + "Invalid partition spec id from content file: expected = {}, actual = {}", + it->second->spec_id(), data_file.partition_spec_id.value()); + } + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, it->second->PartitionType(schema)); + if (data_file.partition.num_fields() != partition_type->fields().size()) { + return ValidationFailed( + "Invalid partition data from content file: expected = {}, actual = {}", + partition_type->fields().empty() ? "unpartitioned" : "partitioned", + data_file.partition.num_fields() == 0 ? "unpartitioned" : "partitioned"); + } + return DataFileToJsonUnchecked(data_file); +} + +namespace { + +template +Result ScanTaskFieldsToJson( + const Response& response, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + nlohmann::json json; + + if (response.plan_tasks.has_value()) { + json[kPlanTasks] = *response.plan_tasks; + } + + DeleteFileSet response_delete_files; + for (const auto& file : response.delete_files) { + response_delete_files.insert(file); + } + + DeleteFileSet delete_files; + auto add_delete_file = [&](const std::shared_ptr& task_file) { + if (!task_file) { + return; + } + auto [response_file, inserted_response_file] = + response_delete_files.insert(task_file); + delete_files.insert(inserted_response_file ? task_file : *response_file); + }; + if (response.file_scan_tasks.has_value()) { + for (const auto& task : *response.file_scan_tasks) { + if (!task) continue; + for (const auto& file : task->delete_files()) { + add_delete_file(file); + } + } + } + + nlohmann::json tasks_json = nlohmann::json::array(); + if (response.file_scan_tasks.has_value()) { + for (const auto& task : *response.file_scan_tasks) { + if (!task) continue; + nlohmann::json task_json; + if (task->data_file()) { + ICEBERG_ASSIGN_OR_RAISE( + auto data_file_json, + ToJson(*task->data_file(), partition_specs_by_id, schema)); + task_json[kDataFile] = std::move(data_file_json); + } + if (!task->delete_files().empty()) { + std::vector refs; + for (const auto& delete_file : task->delete_files()) { + if (delete_file) { + auto [file, _] = delete_files.insert(delete_file); + refs.push_back( + static_cast(std::distance(delete_files.begin(), file))); + } + } + if (!refs.empty()) { + task_json[kDeleteFileReferences] = std::move(refs); + } + } + if (task->residual_filter()) { + ICEBERG_ASSIGN_OR_RAISE(auto residual_json, + iceberg::ToJson(*task->residual_filter())); + task_json[kResidualFilter] = std::move(residual_json); + } + tasks_json.push_back(std::move(task_json)); + } + } + nlohmann::json delete_files_json = nlohmann::json::array(); + for (const auto& file : delete_files) { + ICEBERG_ASSIGN_OR_RAISE(auto df_json, ToJson(*file, partition_specs_by_id, schema)); + delete_files_json.push_back(std::move(df_json)); + } + if (!delete_files_json.empty()) { + json[kDeleteFiles] = std::move(delete_files_json); + } + if (response.file_scan_tasks.has_value()) { + json[kFileScanTasks] = std::move(tasks_json); + } + + return json; +} + +template +Status ScanTaskFieldsFromJson( + const nlohmann::json& json, Response& response, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + // 1. plan_tasks + if (json.contains(kPlanTasks)) { + ICEBERG_ASSIGN_OR_RAISE(response.plan_tasks, + GetJsonValue>(json, kPlanTasks)); + } + + // 2. delete_files + nlohmann::json delete_files_json = nlohmann::json::array(); + if (json.contains(kDeleteFiles)) { + ICEBERG_ASSIGN_OR_RAISE(delete_files_json, + GetJsonValue(json, kDeleteFiles)); + } + if (!delete_files_json.is_array()) { + return JsonParseError("Cannot parse delete files from non-array: {}", + SafeDumpJson(delete_files_json)); + } + for (const auto& entry_json : delete_files_json) { + ICEBERG_ASSIGN_OR_RAISE(auto delete_file, + DataFileFromJson(entry_json, partition_specs_by_id, schema)); + response.delete_files.push_back(std::make_shared(std::move(delete_file))); + } + + // 3. file_scan_tasks + if (json.contains(kFileScanTasks)) { + ICEBERG_ASSIGN_OR_RAISE(auto file_scan_tasks_json, + GetJsonValue(json, kFileScanTasks)); + ICEBERG_ASSIGN_OR_RAISE( + response.file_scan_tasks, + FileScanTasksFromJson(file_scan_tasks_json, response.delete_files, + partition_specs_by_id, schema)); + } + return {}; +} } // namespace @@ -506,6 +978,167 @@ Result OAuthTokenResponseFromJson(const nlohmann::json& json return response; } +Result PlanTableScanRequestFromJson(const nlohmann::json& json) { + PlanTableScanRequest request; + ICEBERG_ASSIGN_OR_RAISE(request.snapshot_id, + GetJsonValueOptional(json, kSnapshotId)); + ICEBERG_ASSIGN_OR_RAISE(request.select, + GetJsonValueOrDefault>(json, kSelect)); + if (json.contains(kFilter)) { + ICEBERG_ASSIGN_OR_RAISE(request.filter, ExpressionFromJson(json.at(kFilter))); + } + ICEBERG_ASSIGN_OR_RAISE(request.case_sensitive, + GetJsonValueOrDefault(json, kCaseSensitive, true)); + ICEBERG_ASSIGN_OR_RAISE(request.use_snapshot_schema, + GetJsonValueOrDefault(json, kUseSnapshotSchema, false)); + ICEBERG_ASSIGN_OR_RAISE(request.start_snapshot_id, + GetJsonValueOptional(json, kStartSnapshotId)); + ICEBERG_ASSIGN_OR_RAISE(request.end_snapshot_id, + GetJsonValueOptional(json, kEndSnapshotId)); + ICEBERG_ASSIGN_OR_RAISE( + request.stats_fields, + GetJsonValueOrDefault>(json, kStatsFields)); + ICEBERG_ASSIGN_OR_RAISE(request.min_rows_requested, + GetJsonValueOptional(json, kMinRowsRequested)); + ICEBERG_RETURN_UNEXPECTED(request.Validate()); + return request; +} + +Result ToJson(const PlanTableScanRequest& request) { + nlohmann::json json; + if (request.snapshot_id.has_value()) { + json[kSnapshotId] = request.snapshot_id.value(); + } + if (!request.select.empty()) { + json[kSelect] = request.select; + } + if (request.filter) { + ICEBERG_ASSIGN_OR_RAISE(auto filter_json, iceberg::ToJson(*request.filter)); + json[kFilter] = std::move(filter_json); + } + json[kCaseSensitive] = request.case_sensitive; + json[kUseSnapshotSchema] = request.use_snapshot_schema; + if (request.start_snapshot_id.has_value()) { + json[kStartSnapshotId] = request.start_snapshot_id.value(); + } + if (request.end_snapshot_id.has_value()) { + json[kEndSnapshotId] = request.end_snapshot_id.value(); + } + if (!request.stats_fields.empty()) { + json[kStatsFields] = request.stats_fields; + } + if (request.min_rows_requested.has_value()) { + json[kMinRowsRequested] = request.min_rows_requested.value(); + } + return json; +} + +Result FetchScanTasksRequestFromJson(const nlohmann::json& json) { + FetchScanTasksRequest request; + ICEBERG_ASSIGN_OR_RAISE(request.planTask, GetJsonValue(json, kPlanTask)); + ICEBERG_RETURN_UNEXPECTED(request.Validate()); + return request; +} + +nlohmann::json ToJson(const FetchScanTasksRequest& request) { + nlohmann::json json; + json[kPlanTask] = request.planTask; + return json; +} + +Result PlanTableScanResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + PlanTableScanResponse response; + ICEBERG_ASSIGN_OR_RAISE(auto plan_status_str, + GetJsonValue(json, kPlanStatus)); + ICEBERG_ASSIGN_OR_RAISE(response.plan_status, PlanStatusFromString(plan_status_str)); + ICEBERG_ASSIGN_OR_RAISE(response.plan_id, + GetJsonValueOrDefault(json, kPlanId)); + if (response.plan_status == PlanStatus::kFailed) { + ICEBERG_ASSIGN_OR_RAISE(response.error, ErrorResponseFromJson(json)); + } else if (json.contains(kError)) { + return ValidationFailed("error can only be present when status is 'failed'"); + } + ICEBERG_RETURN_UNEXPECTED( + ScanTaskFieldsFromJson(json, response, partition_specs_by_id, schema)); + ICEBERG_RETURN_UNEXPECTED(response.Validate()); + return response; +} + +Result ToJson( + const PlanTableScanResponse& response, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + ICEBERG_ASSIGN_OR_RAISE(nlohmann::json json, + ScanTaskFieldsToJson(response, partition_specs_by_id, schema)); + json[kPlanStatus] = ToString(response.plan_status); + if (!response.plan_id.empty()) { + json[kPlanId] = response.plan_id; + } + if (response.error.has_value()) { + json[kError] = ToJson(*response.error)[kError]; + } + return json; +} + +Result FetchPlanningResultResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + FetchPlanningResultResponse response; + ICEBERG_ASSIGN_OR_RAISE(auto plan_status_str, + GetJsonValue(json, kPlanStatus)); + ICEBERG_ASSIGN_OR_RAISE(response.plan_status, PlanStatusFromString(plan_status_str)); + if (response.plan_status == PlanStatus::kFailed) { + ICEBERG_ASSIGN_OR_RAISE(response.error, ErrorResponseFromJson(json)); + } else if (json.contains(kError)) { + return ValidationFailed("error can only be present when status is 'failed'"); + } + ICEBERG_RETURN_UNEXPECTED( + ScanTaskFieldsFromJson(json, response, partition_specs_by_id, schema)); + ICEBERG_RETURN_UNEXPECTED(response.Validate()); + return response; +} + +Result ToJson( + const FetchPlanningResultResponse& response, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + ICEBERG_ASSIGN_OR_RAISE(nlohmann::json json, + ScanTaskFieldsToJson(response, partition_specs_by_id, schema)); + json[kPlanStatus] = ToString(response.plan_status); + if (response.error.has_value()) { + json[kError] = ToJson(*response.error)[kError]; + } + return json; +} + +Result FetchScanTasksResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + FetchScanTasksResponse response; + ICEBERG_RETURN_UNEXPECTED( + ScanTaskFieldsFromJson(json, response, partition_specs_by_id, schema)); + ICEBERG_RETURN_UNEXPECTED(response.Validate()); + return response; +} + +Result ToJson( + const FetchScanTasksResponse& response, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + return ScanTaskFieldsToJson(response, partition_specs_by_id, schema); +} + #define ICEBERG_DEFINE_FROM_JSON(Model) \ template <> \ Result FromJson(const nlohmann::json& json) { \ @@ -528,5 +1161,7 @@ ICEBERG_DEFINE_FROM_JSON(CreateTableRequest) ICEBERG_DEFINE_FROM_JSON(CommitTableRequest) ICEBERG_DEFINE_FROM_JSON(CommitTableResponse) ICEBERG_DEFINE_FROM_JSON(OAuthTokenResponse) +ICEBERG_DEFINE_FROM_JSON(PlanTableScanRequest) +ICEBERG_DEFINE_FROM_JSON(FetchScanTasksRequest) } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/json_serde_internal.h b/src/iceberg/catalog/rest/json_serde_internal.h index 820e077d7..61d2eb6e0 100644 --- a/src/iceberg/catalog/rest/json_serde_internal.h +++ b/src/iceberg/catalog/rest/json_serde_internal.h @@ -19,11 +19,16 @@ #pragma once +#include +#include +#include + #include #include "iceberg/catalog/rest/iceberg_rest_export.h" #include "iceberg/catalog/rest/types.h" #include "iceberg/result.h" +#include "iceberg/type_fwd.h" /// \file iceberg/catalog/rest/json_serde_internal.h /// JSON serialization and deserialization for Iceberg REST Catalog API types. @@ -62,4 +67,69 @@ ICEBERG_DECLARE_JSON_SERDE(OAuthTokenResponse) #undef ICEBERG_DECLARE_JSON_SERDE +ICEBERG_REST_EXPORT Result PlanTableScanResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema); +ICEBERG_REST_EXPORT Result ToJson( + const PlanTableScanResponse& response, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema); + +ICEBERG_REST_EXPORT Result +FetchPlanningResultResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema); +ICEBERG_REST_EXPORT Result ToJson( + const FetchPlanningResultResponse& response, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema); + +ICEBERG_REST_EXPORT Result FetchScanTasksResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema); +ICEBERG_REST_EXPORT Result ToJson( + const FetchScanTasksResponse& response, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema); + +ICEBERG_REST_EXPORT Result PlanTableScanRequestFromJson( + const nlohmann::json& json); +template <> +ICEBERG_REST_EXPORT Result FromJson(const nlohmann::json& json); +ICEBERG_REST_EXPORT Result ToJson(const PlanTableScanRequest& request); + +ICEBERG_REST_EXPORT Result FetchScanTasksRequestFromJson( + const nlohmann::json& json); +template <> +ICEBERG_REST_EXPORT Result FromJson(const nlohmann::json& json); +ICEBERG_REST_EXPORT nlohmann::json ToJson(const FetchScanTasksRequest& request); + +ICEBERG_REST_EXPORT Result ToJson( + const DataFile& df, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema); + +ICEBERG_REST_EXPORT Result DataFileFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_spec_by_id, + const Schema& schema); + +ICEBERG_REST_EXPORT Result>> +FileScanTasksFromJson(const nlohmann::json& json, + const std::vector>& delete_files, + const std::unordered_map>& + partition_spec_by_id, + const Schema& schema); + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/resource_paths.cc b/src/iceberg/catalog/rest/resource_paths.cc index 7bdde7f04..354400c4d 100644 --- a/src/iceberg/catalog/rest/resource_paths.cc +++ b/src/iceberg/catalog/rest/resource_paths.cc @@ -20,6 +20,7 @@ #include "iceberg/catalog/rest/resource_paths.h" #include +#include #include "iceberg/catalog/rest/rest_util.h" #include "iceberg/table_identifier.h" @@ -114,4 +115,26 @@ Result ResourcePaths::CommitTransaction() const { return std::format("{}/v1/{}transactions/commit", base_uri_, prefix_); } +Result ResourcePaths::Plan(const TableIdentifier& ident, + std::optional plan_id) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, + EncodeNamespace(ident.ns, namespace_separator_)); + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); + if (plan_id.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_plan_id, EncodeString(plan_id.value())); + return std::format("{}/v1/{}namespaces/{}/tables/{}/plan/{}", base_uri_, prefix_, + encoded_namespace, encoded_table_name, encoded_plan_id); + } + return std::format("{}/v1/{}namespaces/{}/tables/{}/plan", base_uri_, prefix_, + encoded_namespace, encoded_table_name); +} + +Result ResourcePaths::FetchScanTasks(const TableIdentifier& ident) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, + EncodeNamespace(ident.ns, namespace_separator_)); + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); + return std::format("{}/v1/{}namespaces/{}/tables/{}/tasks", base_uri_, prefix_, + encoded_namespace, encoded_table_name); +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/resource_paths.h b/src/iceberg/catalog/rest/resource_paths.h index db326b133..27135bb22 100644 --- a/src/iceberg/catalog/rest/resource_paths.h +++ b/src/iceberg/catalog/rest/resource_paths.h @@ -20,6 +20,7 @@ #pragma once #include +#include #include #include "iceberg/catalog/rest/iceberg_rest_export.h" @@ -83,6 +84,15 @@ class ICEBERG_REST_EXPORT ResourcePaths { /// \brief Get the /v1/{prefix}/transactions/commit endpoint path. Result CommitTransaction() const; + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan endpoint + /// path, or /plan/{plan_id} if plan_id is provided. + Result Plan(const TableIdentifier& ident, + std::optional plan_id = std::nullopt) const; + + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks endpoint + /// path. + Result FetchScanTasks(const TableIdentifier& ident) const; + private: ResourcePaths(std::string base_uri, const std::string& prefix, std::string namespace_separator); diff --git a/src/iceberg/catalog/rest/types.cc b/src/iceberg/catalog/rest/types.cc index 3abfb1406..8d96bccb2 100644 --- a/src/iceberg/catalog/rest/types.cc +++ b/src/iceberg/catalog/rest/types.cc @@ -20,16 +20,42 @@ #include "iceberg/catalog/rest/types.h" #include +#include +#include "iceberg/expression/expression.h" +#include "iceberg/manifest/manifest_entry.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/sort_order.h" #include "iceberg/table_metadata.h" #include "iceberg/table_requirement.h" +#include "iceberg/table_scan.h" #include "iceberg/table_update.h" namespace iceberg::rest { +std::string_view ToString(PlanStatus status) { + switch (status) { + case PlanStatus::kSubmitted: + return "submitted"; + case PlanStatus::kCompleted: + return "completed"; + case PlanStatus::kCancelled: + return "cancelled"; + case PlanStatus::kFailed: + return "failed"; + } + return "unknown"; +} + +Result PlanStatusFromString(std::string_view status_str) { + if (status_str == "submitted") return PlanStatus::kSubmitted; + if (status_str == "completed") return PlanStatus::kCompleted; + if (status_str == "cancelled") return PlanStatus::kCancelled; + if (status_str == "failed") return PlanStatus::kFailed; + return JsonParseError("Unknown plan status: {}", status_str); +} + bool CreateTableRequest::operator==(const CreateTableRequest& other) const { if (name != other.name || location != other.location || stage_create != other.stage_create || properties != other.properties) { @@ -118,6 +144,107 @@ bool CommitTableResponse::operator==(const CommitTableResponse& other) const { return true; } +namespace { + +bool ExpressionPtrEqual(const std::shared_ptr& lhs, + const std::shared_ptr& rhs) { + if (lhs == rhs) { + return true; + } + return lhs && rhs && lhs->Equals(*rhs); +} + +} // namespace + +bool PlanTableScanRequest::operator==(const PlanTableScanRequest& other) const { + return snapshot_id == other.snapshot_id && select == other.select && + ExpressionPtrEqual(filter, other.filter) && + case_sensitive == other.case_sensitive && + use_snapshot_schema == other.use_snapshot_schema && + start_snapshot_id == other.start_snapshot_id && + end_snapshot_id == other.end_snapshot_id && stats_fields == other.stats_fields && + min_rows_requested == other.min_rows_requested; +} + +namespace { + +template +bool SharedPtrEqual(const std::shared_ptr& lhs, const std::shared_ptr& rhs) { + if (lhs == rhs) { + return true; + } + return lhs && rhs && *lhs == *rhs; +} + +template +bool SharedPtrVectorEqual(const std::vector>& lhs, + const std::vector>& rhs) { + return std::ranges::equal(lhs, rhs, SharedPtrEqual); +} + +bool FileScanTaskEqual(const std::shared_ptr& lhs, + const std::shared_ptr& rhs) { + if (lhs == rhs) { + return true; + } + if (!lhs || !rhs) { + return false; + } + return SharedPtrEqual(lhs->data_file(), rhs->data_file()) && + SharedPtrVectorEqual(lhs->delete_files(), rhs->delete_files()) && + ExpressionPtrEqual(lhs->residual_filter(), rhs->residual_filter()); +} + +template +bool OptionalSharedPtrVectorEqual( + const std::optional>>& lhs, + const std::optional>>& rhs, + bool (*eq)(const std::shared_ptr&, const std::shared_ptr&)) { + if (lhs.has_value() != rhs.has_value()) { + return false; + } + return !lhs.has_value() || std::ranges::equal(*lhs, *rhs, eq); +} + +template +bool ScanTaskFieldsEqual(const Response& lhs, const Response& rhs) { + return lhs.plan_tasks == rhs.plan_tasks && + SharedPtrVectorEqual(lhs.delete_files, rhs.delete_files) && + OptionalSharedPtrVectorEqual(lhs.file_scan_tasks, rhs.file_scan_tasks, + FileScanTaskEqual); +} + +template +bool HasTaskFields(const Response& response) { + return response.plan_tasks.has_value() || response.file_scan_tasks.has_value(); +} + +template +bool HasNonEmptyFileScanTasks(const Response& response) { + return response.file_scan_tasks.has_value() && !response.file_scan_tasks->empty(); +} + +} // namespace + +bool PlanTableScanResponse::operator==(const PlanTableScanResponse& other) const { + return ScanTaskFieldsEqual(*this, other) && plan_status == other.plan_status && + plan_id == other.plan_id && error == other.error; +} + +bool FetchPlanningResultResponse::operator==( + const FetchPlanningResultResponse& other) const { + return ScanTaskFieldsEqual(*this, other) && plan_status == other.plan_status && + error == other.error; +} + +bool FetchScanTasksRequest::operator==(const FetchScanTasksRequest& other) const { + return planTask == other.planTask; +} + +bool FetchScanTasksResponse::operator==(const FetchScanTasksResponse& other) const { + return ScanTaskFieldsEqual(*this, other); +} + Status OAuthTokenResponse::Validate() const { if (access_token.empty()) { return ValidationFailed("OAuth2 token response missing required 'access_token'"); @@ -135,4 +262,100 @@ Status OAuthTokenResponse::Validate() const { return {}; } +Status PlanTableScanRequest::Validate() const { + if (snapshot_id.has_value()) { + if (start_snapshot_id.has_value() || end_snapshot_id.has_value()) { + return ValidationFailed( + "Invalid scan: cannot provide both snapshotId and " + "startSnapshotId/endSnapshotId"); + } + } + if (start_snapshot_id.has_value() || end_snapshot_id.has_value()) { + if (!start_snapshot_id.has_value() || !end_snapshot_id.has_value()) { + return ValidationFailed( + "Invalid incremental scan: startSnapshotId and endSnapshotId is required"); + } + } + if (min_rows_requested.has_value() && min_rows_requested.value() < 0) { + return ValidationFailed("Invalid scan: minRowsRequested is negative"); + } + return {}; +} + +Status PlanTableScanResponse::Validate() const { + if (plan_status == PlanStatus::kSubmitted && plan_id.empty()) { + return ValidationFailed( + "Invalid response: plan id should be defined when status is 'submitted'"); + } + if (plan_status == PlanStatus::kCancelled) { + return ValidationFailed( + "Invalid response: 'cancelled' is not a valid status for planTableScan"); + } + if (plan_status != PlanStatus::kCompleted && HasTaskFields(*this)) { + return ValidationFailed( + "Invalid response: tasks can only be defined when status is 'completed'"); + } + if (!plan_id.empty() && plan_status != PlanStatus::kSubmitted && + plan_status != PlanStatus::kCompleted) { + return ValidationFailed( + "Invalid response: plan id can only be defined when status is 'submitted' or " + "'completed'"); + } + if (!HasNonEmptyFileScanTasks(*this) && !delete_files.empty()) { + return ValidationFailed( + "Invalid response: deleteFiles should only be returned with fileScanTasks that " + "reference them"); + } + if (plan_status == PlanStatus::kFailed && !error.has_value()) { + return ValidationFailed( + "Invalid response: error must be present when status is 'failed'"); + } + if (plan_status != PlanStatus::kFailed && error.has_value()) { + return ValidationFailed( + "Invalid response: error can only be present when status is 'failed'"); + } + return {}; +} + +Status FetchPlanningResultResponse::Validate() const { + if (plan_status != PlanStatus::kCompleted && HasTaskFields(*this)) { + return ValidationFailed( + "Invalid response: tasks can only be returned in a 'completed' status"); + } + if (!HasNonEmptyFileScanTasks(*this) && !delete_files.empty()) { + return ValidationFailed( + "Invalid response: deleteFiles should only be returned with fileScanTasks that " + "reference them"); + } + if (plan_status == PlanStatus::kFailed && !error.has_value()) { + return ValidationFailed( + "Invalid response: error must be present when status is 'failed'"); + } + if (plan_status != PlanStatus::kFailed && error.has_value()) { + return ValidationFailed( + "Invalid response: error can only be present when status is 'failed'"); + } + return {}; +} + +Status FetchScanTasksRequest::Validate() const { + if (planTask.empty()) { + return ValidationFailed("Invalid planTask: null"); + } + return {}; +} + +Status FetchScanTasksResponse::Validate() const { + if (!HasNonEmptyFileScanTasks(*this) && !delete_files.empty()) { + return ValidationFailed( + "Invalid response: deleteFiles should only be returned with fileScanTasks that " + "reference them"); + } + if (!HasTaskFields(*this)) { + return ValidationFailed( + "Invalid response: planTasks and fileScanTask cannot both be null"); + } + return {}; +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/types.h b/src/iceberg/catalog/rest/types.h index 6495a6517..7849b366b 100644 --- a/src/iceberg/catalog/rest/types.h +++ b/src/iceberg/catalog/rest/types.h @@ -23,13 +23,13 @@ #include #include #include +#include #include #include #include "iceberg/catalog/rest/endpoint.h" #include "iceberg/catalog/rest/iceberg_rest_export.h" #include "iceberg/result.h" -#include "iceberg/schema.h" #include "iceberg/table_identifier.h" #include "iceberg/type_fwd.h" #include "iceberg/util/macros.h" @@ -39,6 +39,17 @@ namespace iceberg::rest { +/// \brief Status of a REST server-side scan planning operation. +enum class PlanStatus { + kSubmitted, + kCompleted, + kCancelled, + kFailed, +}; + +ICEBERG_REST_EXPORT std::string_view ToString(PlanStatus status); +ICEBERG_REST_EXPORT Result PlanStatusFromString(std::string_view status_str); + /// \brief Server-provided configuration for the catalog. struct ICEBERG_REST_EXPORT CatalogConfig { std::unordered_map defaults; // required @@ -295,4 +306,72 @@ struct ICEBERG_REST_EXPORT OAuthTokenResponse { bool operator==(const OAuthTokenResponse&) const = default; }; +/// \brief Request to initiate a server-side scan planning operation. +struct ICEBERG_REST_EXPORT PlanTableScanRequest { + std::optional snapshot_id; + std::vector select; + std::shared_ptr filter; + bool case_sensitive = true; + bool use_snapshot_schema = false; + std::optional start_snapshot_id; + std::optional end_snapshot_id; + std::vector stats_fields; + std::optional min_rows_requested; + + Status Validate() const; + + bool operator==(const PlanTableScanRequest&) const; +}; + +/// \brief Response from initiating a scan planning operation, including plan status and +/// initial scan tasks. +struct ICEBERG_REST_EXPORT PlanTableScanResponse { + std::optional> plan_tasks; + std::optional>> file_scan_tasks; + std::vector> delete_files; + PlanStatus plan_status = PlanStatus::kCompleted; + std::string plan_id; + std::optional error; + // TODO(sandeepg): Add credentials. + + Status Validate() const; + + bool operator==(const PlanTableScanResponse&) const; +}; + +/// \brief Response from polling an asynchronous scan plan, including current status and +/// available scan tasks. +struct ICEBERG_REST_EXPORT FetchPlanningResultResponse { + std::optional> plan_tasks; + std::optional>> file_scan_tasks; + std::vector> delete_files; + PlanStatus plan_status = PlanStatus::kCompleted; + std::optional error; + // TODO(sandeepg): Add credentials. + + Status Validate() const; + + bool operator==(const FetchPlanningResultResponse&) const; +}; + +/// \brief Request to fetch the scan tasks for a given plan task token. +struct ICEBERG_REST_EXPORT FetchScanTasksRequest { + std::string planTask; + + Status Validate() const; + + bool operator==(const FetchScanTasksRequest&) const; +}; + +/// \brief Response containing the file scan tasks for a given plan task token. +struct ICEBERG_REST_EXPORT FetchScanTasksResponse { + std::optional> plan_tasks; + std::optional>> file_scan_tasks; + std::vector> delete_files; + + Status Validate() const; + + bool operator==(const FetchScanTasksResponse&) const; +}; + } // namespace iceberg::rest diff --git a/src/iceberg/expression/aggregate.h b/src/iceberg/expression/aggregate.h index 6cf659d6f..bbb01a4ab 100644 --- a/src/iceberg/expression/aggregate.h +++ b/src/iceberg/expression/aggregate.h @@ -80,7 +80,7 @@ class ICEBERG_EXPORT UnboundAggregateImpl : public UnboundAggregate, static Result>> Make( Expression::Operation op, std::shared_ptr> term); - std::shared_ptr reference() override { + std::shared_ptr reference() const override { return BASE::term() ? BASE::term()->reference() : nullptr; } diff --git a/src/iceberg/expression/expression.h b/src/iceberg/expression/expression.h index 35ffbfdfe..1663c69f8 100644 --- a/src/iceberg/expression/expression.h +++ b/src/iceberg/expression/expression.h @@ -350,7 +350,7 @@ class ICEBERG_EXPORT Unbound { Result> Bind(const Schema& schema) const; /// \brief Returns the underlying named reference for this unbound term. - virtual std::shared_ptr reference() = 0; + virtual std::shared_ptr reference() const = 0; }; /// \brief Interface for bound expressions that can be evaluated. diff --git a/src/iceberg/expression/predicate.cc b/src/iceberg/expression/predicate.cc index 307c3c609..44c4a3a70 100644 --- a/src/iceberg/expression/predicate.cc +++ b/src/iceberg/expression/predicate.cc @@ -25,6 +25,7 @@ #include "iceberg/expression/expressions.h" #include "iceberg/expression/literal.h" #include "iceberg/result.h" +#include "iceberg/transform.h" #include "iceberg/type.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/formatter_internal.h" @@ -121,7 +122,32 @@ UnboundPredicateImpl::UnboundPredicateImpl(Expression::Operation op, template UnboundPredicateImpl::~UnboundPredicateImpl() = default; -namespace {} +namespace { + +bool UnboundTermEqual(const Term& lhs, const Term& rhs) { + if (!lhs.is_unbound() || !rhs.is_unbound() || lhs.kind() != rhs.kind()) { + return false; + } + + switch (lhs.kind()) { + case Term::Kind::kReference: { + const auto& lhs_ref = internal::checked_cast(lhs); + const auto& rhs_ref = internal::checked_cast(rhs); + return lhs_ref.name() == rhs_ref.name(); + } + case Term::Kind::kTransform: { + const auto& lhs_transform = internal::checked_cast(lhs); + const auto& rhs_transform = internal::checked_cast(rhs); + return lhs_transform.reference()->name() == rhs_transform.reference()->name() && + *lhs_transform.transform() == *rhs_transform.transform(); + } + case Term::Kind::kExtract: + return false; + } + std::unreachable(); +} + +} // namespace template std::string UnboundPredicateImpl::ToString() const { @@ -174,6 +200,24 @@ std::string UnboundPredicateImpl::ToString() const { } } +template +bool UnboundPredicateImpl::Equals(const Expression& other) const { + if (!other.is_unbound_predicate()) { + return false; + } + + if (BASE::op() != other.op()) { + return false; + } + + const auto* other_pred = dynamic_cast(&other); + if (other_pred == nullptr) { + return false; + } + return UnboundTermEqual(unbound_term(), other_pred->unbound_term()) && + std::ranges::equal(literals(), other_pred->literals()); +} + template Result> UnboundPredicateImpl::Negate() const { ICEBERG_ASSIGN_OR_RAISE(auto negated_op, ::iceberg::Negate(BASE::op())); @@ -626,7 +670,7 @@ bool BoundSetPredicate::Equals(const Expression& other) const { if (const auto* other_pred = dynamic_cast(&other); other_pred) { - return value_set_ == other_pred->value_set_; + return term_->Equals(*other_pred->term()) && value_set_ == other_pred->value_set_; } return false; diff --git a/src/iceberg/expression/predicate.h b/src/iceberg/expression/predicate.h index 6df0de1a6..af883fe19 100644 --- a/src/iceberg/expression/predicate.h +++ b/src/iceberg/expression/predicate.h @@ -65,7 +65,7 @@ class ICEBERG_EXPORT UnboundPredicate : public virtual Expression, ~UnboundPredicate() override = default; /// \brief Returns the reference of this UnboundPredicate. - std::shared_ptr reference() override = 0; + std::shared_ptr reference() const override = 0; /// \brief Bind this UnboundPredicate. Result> Bind(const Schema& schema, @@ -125,12 +125,14 @@ class ICEBERG_EXPORT UnboundPredicateImpl : public UnboundPredicate, ~UnboundPredicateImpl() override; - std::shared_ptr reference() override { + std::shared_ptr reference() const override { return BASE::term()->reference(); } std::string ToString() const override; + bool Equals(const Expression& other) const override; + Result> Bind(const Schema& schema, bool case_sensitive) const override; diff --git a/src/iceberg/expression/term.h b/src/iceberg/expression/term.h index b50ea9242..4d5f09795 100644 --- a/src/iceberg/expression/term.h +++ b/src/iceberg/expression/term.h @@ -113,7 +113,9 @@ class ICEBERG_EXPORT NamedReference Result> Bind(const Schema& schema, bool case_sensitive) const override; - std::shared_ptr reference() override { return shared_from_this(); } + std::shared_ptr reference() const override { + return shared_from_this(); + } std::string ToString() const override; @@ -185,7 +187,7 @@ class ICEBERG_EXPORT UnboundTransform : public UnboundTerm Result> Bind(const Schema& schema, bool case_sensitive) const override; - std::shared_ptr reference() override { return ref_; } + std::shared_ptr reference() const override { return ref_; } const std::shared_ptr& transform() const { return transform_; } diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 01d17b299..70155db6a 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -50,6 +50,8 @@ enum class ErrorKind { kJsonParseError, kNamespaceNotEmpty, kNoSuchNamespace, + kNoSuchPlanId, + kNoSuchPlanTask, kNoSuchTable, kNoSuchView, kNotAllowed, @@ -113,6 +115,8 @@ DEFINE_ERROR_FUNCTION(IOError) DEFINE_ERROR_FUNCTION(JsonParseError) DEFINE_ERROR_FUNCTION(NamespaceNotEmpty) DEFINE_ERROR_FUNCTION(NoSuchNamespace) +DEFINE_ERROR_FUNCTION(NoSuchPlanId) +DEFINE_ERROR_FUNCTION(NoSuchPlanTask) DEFINE_ERROR_FUNCTION(NoSuchTable) DEFINE_ERROR_FUNCTION(NoSuchView) DEFINE_ERROR_FUNCTION(NotAllowed) diff --git a/src/iceberg/test/predicate_test.cc b/src/iceberg/test/predicate_test.cc index fab0b5617..6b990462a 100644 --- a/src/iceberg/test/predicate_test.cc +++ b/src/iceberg/test/predicate_test.cc @@ -38,7 +38,8 @@ class PredicateTest : public ::testing::Test { SchemaField::MakeOptional(2, "name", string()), SchemaField::MakeRequired(3, "age", int32()), SchemaField::MakeOptional(4, "salary", float64()), - SchemaField::MakeRequired(5, "active", boolean())}, + SchemaField::MakeRequired(5, "active", boolean()), + SchemaField::MakeRequired(6, "age2", int32())}, /*schema_id=*/0); } @@ -288,6 +289,34 @@ TEST_F(PredicateTest, UnboundPredicateToString) { EXPECT_EQ(starts_with_pred->ToString(), "ref(name=\"name\") startsWith \"John\""); } +TEST_F(PredicateTest, UnboundPredicateEquality) { + auto pred = Expressions::GreaterThan("age", Literal::Int(25)); + auto same = Expressions::GreaterThan("age", Literal::Int(25)); + auto different_op = Expressions::GreaterThanOrEqual("age", Literal::Int(25)); + auto different_ref = Expressions::GreaterThan("id", Literal::Int(25)); + auto different_literal = Expressions::GreaterThan("age", Literal::Int(26)); + + EXPECT_TRUE(pred->Equals(*same)); + EXPECT_TRUE(same->Equals(*pred)); + EXPECT_FALSE(pred->Equals(*different_op)); + EXPECT_FALSE(pred->Equals(*different_ref)); + EXPECT_FALSE(pred->Equals(*different_literal)); + + auto bucket = [](std::string name, + int32_t num_buckets) -> std::shared_ptr> { + return Expressions::Bucket(std::move(name), num_buckets); + }; + + auto transformed = Expressions::Equal(bucket("id", 16), Literal::Int(3)); + auto same_transformed = Expressions::Equal(bucket("id", 16), Literal::Int(3)); + auto different_transform = Expressions::Equal(bucket("id", 32), Literal::Int(3)); + auto different_transform_ref = Expressions::Equal(bucket("age", 16), Literal::Int(3)); + + EXPECT_TRUE(transformed->Equals(*same_transformed)); + EXPECT_FALSE(transformed->Equals(*different_transform)); + EXPECT_FALSE(transformed->Equals(*different_transform_ref)); +} + TEST_F(PredicateTest, UnboundPredicateNegate) { auto equal_pred = Expressions::Equal("age", Literal::Int(25)); auto negated_result = equal_pred->Negate(); @@ -581,10 +610,12 @@ TEST_F(PredicateTest, BoundSetPredicateEquals) { Expressions::In("age", {Literal::Int(20), Literal::Int(10)}); // Different order auto in3 = Expressions::In("age", {Literal::Int(10), Literal::Int(30)}); // Different values + auto in4 = Expressions::In("age2", {Literal::Int(10), Literal::Int(20)}); auto bound_in1 = in1->Bind(*schema_, /*case_sensitive=*/true).value(); auto bound_in2 = in2->Bind(*schema_, /*case_sensitive=*/true).value(); auto bound_in3 = in3->Bind(*schema_, /*case_sensitive=*/true).value(); + auto bound_in4 = in4->Bind(*schema_, /*case_sensitive=*/true).value(); // Same values in different order should be equal (unordered_set) EXPECT_TRUE(bound_in1->Equals(*bound_in2)); @@ -592,6 +623,9 @@ TEST_F(PredicateTest, BoundSetPredicateEquals) { // Different values should not be equal EXPECT_FALSE(bound_in1->Equals(*bound_in3)); + + // Same values on a different term should not be equal + EXPECT_FALSE(bound_in1->Equals(*bound_in4)); } namespace { diff --git a/src/iceberg/test/rest_json_serde_test.cc b/src/iceberg/test/rest_json_serde_test.cc index 9da052e6a..86c852a1a 100644 --- a/src/iceberg/test/rest_json_serde_test.cc +++ b/src/iceberg/test/rest_json_serde_test.cc @@ -17,7 +17,10 @@ * under the License. */ +#include #include +#include +#include #include #include @@ -25,14 +28,20 @@ #include "iceberg/catalog/rest/json_serde_internal.h" #include "iceberg/catalog/rest/types.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/file_format.h" +#include "iceberg/manifest/manifest_entry.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" +#include "iceberg/schema.h" #include "iceberg/sort_order.h" #include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" #include "iceberg/table_requirement.h" +#include "iceberg/table_scan.h" #include "iceberg/table_update.h" #include "iceberg/test/matchers.h" +#include "iceberg/transform.h" namespace iceberg::rest { @@ -1380,4 +1389,1161 @@ INSTANTIATE_TEST_SUITE_P( return info.param.test_name; }); +// Helper: empty schema and specs for scan response tests that don't need content-file +// partition parsing. +static Schema EmptySchema() { return Schema({}, 0); } +static std::unordered_map> EmptySpecs() { + return {}; +} +static std::unordered_map> UnpartitionedSpecs() { + return {{PartitionSpec::kInitialSpecId, PartitionSpec::Unpartitioned()}}; +} +static Schema PartitionedSchema() { + return Schema({SchemaField::MakeRequired(1, "id", int32())}, 0); +} +static std::unordered_map> PartitionedSpecs( + const Schema& schema) { + auto spec_result = PartitionSpec::Make( + schema, /*spec_id=*/1, {PartitionField(1, 1000, "id", Transform::Identity())}, + /*allow_missing_fields=*/false); + if (!spec_result.has_value()) { + ADD_FAILURE() << spec_result.error().message; + return {}; + } + auto spec = std::move(spec_result.value()); + return {{spec->spec_id(), std::shared_ptr(std::move(spec))}}; +} + +// --- PlanTableScanResponse --- + +TEST(PlanTableScanResponseFromJsonTest, SubmittedStatusMissingOptionalFields) { + auto json = nlohmann::json::parse(R"({"status":"submitted","plan-id":"abc-123"})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, PlanStatus::kSubmitted); + EXPECT_EQ(result->plan_id, "abc-123"); + EXPECT_FALSE(result->plan_tasks.has_value()); + EXPECT_FALSE(result->file_scan_tasks.has_value()); + EXPECT_TRUE(result->delete_files.empty()); +} + +TEST(PlanTableScanResponseFromJsonTest, CompletedStatusWithPlanTasks) { + auto json = nlohmann::json::parse( + R"({"status":"completed","plan-id":"abc-123","plan-tasks":["task-1","task-2"],"delete-files":[],"file-scan-tasks":[]})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, PlanStatus::kCompleted); + EXPECT_EQ(result->plan_id, "abc-123"); + ASSERT_TRUE(result->plan_tasks.has_value()); + ASSERT_EQ(result->plan_tasks->size(), 2); + EXPECT_EQ(result->plan_tasks->at(0), "task-1"); + EXPECT_EQ(result->plan_tasks->at(1), "task-2"); + ASSERT_TRUE(result->file_scan_tasks.has_value()); + EXPECT_TRUE(result->file_scan_tasks->empty()); +} + +TEST(PlanTableScanResponseFromJsonTest, FailedStatusWithError) { + auto json = nlohmann::json::parse( + R"({"status":"failed","error":{"message":"Planning failed","type":"PlanningException","code":500}})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, PlanStatus::kFailed); + EXPECT_TRUE(result->plan_id.empty()); + ASSERT_TRUE(result->error.has_value()); + EXPECT_EQ(result->error->message, "Planning failed"); + EXPECT_EQ(result->error->type, "PlanningException"); + EXPECT_EQ(result->error->code, 500); +} + +struct PlanTableScanResponseInvalidParam { + std::string test_name; + std::string json_str; + ErrorKind expected_error_kind; + std::string expected_error_msg; +}; + +class PlanTableScanResponseInvalidTest + : public ::testing::TestWithParam {}; + +TEST_P(PlanTableScanResponseInvalidTest, InvalidInput) { + const auto& param = GetParam(); + auto result = PlanTableScanResponseFromJson(nlohmann::json::parse(param.json_str), + UnpartitionedSpecs(), EmptySchema()); + ASSERT_THAT(result, IsError(param.expected_error_kind)); + if (!param.expected_error_msg.empty()) { + EXPECT_THAT(result, HasErrorMessage(param.expected_error_msg)); + } +} + +INSTANTIATE_TEST_SUITE_P( + PlanTableScanResponseInvalidCases, PlanTableScanResponseInvalidTest, + ::testing::Values( + PlanTableScanResponseInvalidParam{ + .test_name = "EmptyJson", + .json_str = R"({})", + .expected_error_kind = ErrorKind::kJsonParseError, + .expected_error_msg = "Missing 'status'"}, + PlanTableScanResponseInvalidParam{ + .test_name = "UnknownStatus", + .json_str = R"({"status":"someStatus"})", + .expected_error_kind = ErrorKind::kJsonParseError, + .expected_error_msg = "Unknown plan status"}, + PlanTableScanResponseInvalidParam{ + .test_name = "SubmittedWithoutPlanId", + .json_str = R"({"status":"submitted"})", + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = "plan id should be defined when status is 'submitted'"}, + PlanTableScanResponseInvalidParam{ + .test_name = "CancelledStatus", + .json_str = R"({"status":"cancelled"})", + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = "'cancelled' is not a valid status for planTableScan"}, + PlanTableScanResponseInvalidParam{ + .test_name = "SubmittedWithTasks", + .json_str = + R"({"status":"submitted","plan-id":"somePlanId","plan-tasks":["task1","task2"]})", + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = "tasks can only be defined when status is 'completed'"}, + PlanTableScanResponseInvalidParam{ + .test_name = "SubmittedWithEmptyTasks", + .json_str = + R"({"status":"submitted","plan-id":"somePlanId","plan-tasks":[]})", + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = "tasks can only be defined when status is 'completed'"}, + PlanTableScanResponseInvalidParam{ + .test_name = "FailedWithPlanId", + .json_str = + R"({"status":"failed","plan-id":"somePlanId","error":{"message":"x","type":"y","code":500}})", + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = + "plan id can only be defined when status is 'submitted' or 'completed'"}, + PlanTableScanResponseInvalidParam{ + .test_name = "FailedWithoutError", + .json_str = R"({"status":"failed"})", + .expected_error_kind = ErrorKind::kJsonParseError, + .expected_error_msg = "Missing 'error'"}, + PlanTableScanResponseInvalidParam{ + .test_name = "CompletedWithError", + .json_str = + R"({"status":"completed","error":{"message":"x","type":"y","code":500}})", + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = "error can only be present when status is 'failed'"}, + PlanTableScanResponseInvalidParam{ + .test_name = "DeleteFilesWithoutFileScanTasks", + .json_str = + R"({"status":"completed","delete-files":[{"content":"position-deletes","file-path":"s3://bucket/d.parquet","file-format":"PARQUET","spec-id":0,"partition":[],"file-size-in-bytes":512,"record-count":5}],"file-scan-tasks":[]})", + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = + "deleteFiles should only be returned with fileScanTasks"}), + [](const ::testing::TestParamInfo& info) { + return info.param.test_name; + }); + +// --- FetchPlanningResultResponse --- + +TEST(FetchPlanningResultResponseFromJsonTest, SubmittedStatusNoTasks) { + auto json = nlohmann::json::parse(R"({"status":"submitted"})"); + auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, PlanStatus::kSubmitted); + EXPECT_FALSE(result->plan_tasks.has_value()); + EXPECT_FALSE(result->file_scan_tasks.has_value()); + EXPECT_TRUE(result->delete_files.empty()); +} + +TEST(FetchPlanningResultResponseFromJsonTest, CompletedStatusWithPlanTasks) { + auto json = nlohmann::json::parse( + R"({"status":"completed","plan-tasks":["task-1"],"delete-files":[],"file-scan-tasks":[]})"); + auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, PlanStatus::kCompleted); + ASSERT_TRUE(result->plan_tasks.has_value()); + ASSERT_EQ(result->plan_tasks->size(), 1); + EXPECT_EQ(result->plan_tasks->at(0), "task-1"); + ASSERT_TRUE(result->file_scan_tasks.has_value()); + EXPECT_TRUE(result->file_scan_tasks->empty()); +} + +TEST(FetchPlanningResultResponseFromJsonTest, FailedStatusWithError) { + auto json = nlohmann::json::parse( + R"({"status":"failed","error":{"message":"Planning failed","type":"PlanningException","code":500}})"); + auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, PlanStatus::kFailed); + ASSERT_TRUE(result->error.has_value()); + EXPECT_EQ(result->error->message, "Planning failed"); + EXPECT_EQ(result->error->type, "PlanningException"); + EXPECT_EQ(result->error->code, 500); +} + +struct FetchPlanningResultResponseInvalidParam { + std::string test_name; + std::string json_str; + ErrorKind expected_error_kind; + std::string expected_error_msg; +}; + +class FetchPlanningResultResponseInvalidTest + : public ::testing::TestWithParam {}; + +TEST_P(FetchPlanningResultResponseInvalidTest, InvalidInput) { + const auto& param = GetParam(); + auto result = FetchPlanningResultResponseFromJson(nlohmann::json::parse(param.json_str), + UnpartitionedSpecs(), EmptySchema()); + ASSERT_THAT(result, IsError(param.expected_error_kind)); + if (!param.expected_error_msg.empty()) { + EXPECT_THAT(result, HasErrorMessage(param.expected_error_msg)); + } +} + +INSTANTIATE_TEST_SUITE_P( + FetchPlanningResultResponseInvalidCases, FetchPlanningResultResponseInvalidTest, + ::testing::Values( + FetchPlanningResultResponseInvalidParam{ + .test_name = "EmptyJson", + .json_str = R"({})", + .expected_error_kind = ErrorKind::kJsonParseError, + .expected_error_msg = "Missing 'status'"}, + FetchPlanningResultResponseInvalidParam{ + .test_name = "UnknownStatus", + .json_str = R"({"status":"someStatus"})", + .expected_error_kind = ErrorKind::kJsonParseError, + .expected_error_msg = "Unknown plan status"}, + FetchPlanningResultResponseInvalidParam{ + .test_name = "SubmittedWithTasks", + .json_str = R"({"status":"submitted","plan-tasks":["task1","task2"]})", + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = "tasks can only be returned in a 'completed' status"}, + FetchPlanningResultResponseInvalidParam{ + .test_name = "SubmittedWithEmptyTasks", + .json_str = R"({"status":"submitted","plan-tasks":[]})", + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = "tasks can only be returned in a 'completed' status"}, + FetchPlanningResultResponseInvalidParam{ + .test_name = "DeleteFilesWithoutFileScanTasks", + .json_str = + R"({"status":"submitted","delete-files":[{"content":"position-deletes","file-path":"s3://bucket/d.parquet","file-format":"PARQUET","spec-id":0,"partition":[],"file-size-in-bytes":512,"record-count":5}]})", + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = + "deleteFiles should only be returned with fileScanTasks"}, + FetchPlanningResultResponseInvalidParam{ + .test_name = "FailedWithoutError", + .json_str = R"({"status":"failed"})", + .expected_error_kind = ErrorKind::kJsonParseError, + .expected_error_msg = "Missing 'error'"}, + FetchPlanningResultResponseInvalidParam{ + .test_name = "CompletedWithError", + .json_str = + R"({"status":"completed","error":{"message":"x","type":"y","code":500}})", + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = "error can only be present when status is 'failed'"}), + [](const ::testing::TestParamInfo& info) { + return info.param.test_name; + }); + +// --- FetchScanTasksResponse --- + +TEST(FetchScanTasksResponseFromJsonTest, WithFileScanTasks) { + auto json = nlohmann::json::parse(R"({ + "plan-tasks": [], + "delete-files": [ + { + "content": "position-deletes", + "file-path": "s3://bucket/deletes/delete.parquet", + "file-format": "PARQUET", + "spec-id": 0, + "partition": [], + "file-size-in-bytes": 512, + "record-count": 5 + } + ], + "file-scan-tasks": [ + { + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "spec-id": 0, + "partition": [], + "file-size-in-bytes": 12345, + "record-count": 100 + }, + "delete-file-references": [0] + } + ] + })"); + auto result = FetchScanTasksResponseFromJson(json, UnpartitionedSpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + ASSERT_TRUE(result->plan_tasks.has_value()); + EXPECT_TRUE(result->plan_tasks->empty()); + ASSERT_EQ(result->delete_files.size(), 1); + ASSERT_TRUE(result->file_scan_tasks.has_value()); + ASSERT_EQ(result->file_scan_tasks->size(), 1); + EXPECT_EQ(result->file_scan_tasks->at(0)->data_file()->file_path, + "s3://bucket/data/file.parquet"); + ASSERT_EQ(result->file_scan_tasks->at(0)->delete_files().size(), 1); + EXPECT_EQ(result->file_scan_tasks->at(0)->delete_files()[0]->file_path, + "s3://bucket/deletes/delete.parquet"); +} + +TEST(FetchScanTasksResponseFromJsonTest, WithPlanTasksOnly) { + auto json = nlohmann::json::parse( + R"({"plan-tasks":["task-1","task-2"],"delete-files":[],"file-scan-tasks":[]})"); + auto result = FetchScanTasksResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + ASSERT_TRUE(result->plan_tasks.has_value()); + ASSERT_EQ(result->plan_tasks->size(), 2); + EXPECT_EQ(result->plan_tasks->at(0), "task-1"); + ASSERT_TRUE(result->file_scan_tasks.has_value()); + EXPECT_TRUE(result->file_scan_tasks->empty()); +} + +TEST(FetchScanTasksResponseFromJsonTest, AllowsPresentEmptyTaskFields) { + auto json = nlohmann::json::parse( + R"({"plan-tasks":[],"delete-files":[],"file-scan-tasks":[]})"); + auto result = FetchScanTasksResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + ASSERT_TRUE(result->plan_tasks.has_value()); + EXPECT_TRUE(result->plan_tasks->empty()); + ASSERT_TRUE(result->file_scan_tasks.has_value()); + EXPECT_TRUE(result->file_scan_tasks->empty()); +} + +struct FetchScanTasksResponseInvalidParam { + std::string test_name; + std::string json_str; + ErrorKind expected_error_kind; + std::string expected_error_msg; +}; + +class FetchScanTasksResponseInvalidTest + : public ::testing::TestWithParam {}; + +TEST_P(FetchScanTasksResponseInvalidTest, InvalidInput) { + const auto& param = GetParam(); + auto result = FetchScanTasksResponseFromJson(nlohmann::json::parse(param.json_str), + UnpartitionedSpecs(), EmptySchema()); + ASSERT_THAT(result, IsError(param.expected_error_kind)); + if (!param.expected_error_msg.empty()) { + EXPECT_THAT(result, HasErrorMessage(param.expected_error_msg)); + } +} + +INSTANTIATE_TEST_SUITE_P( + FetchScanTasksResponseInvalidCases, FetchScanTasksResponseInvalidTest, + ::testing::Values( + FetchScanTasksResponseInvalidParam{ + .test_name = "EmptyJson", + .json_str = R"({})", + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = "planTasks and fileScanTask cannot both be null"}, + FetchScanTasksResponseInvalidParam{ + .test_name = "NullPlanTasks", + .json_str = R"({"plan-tasks":null})", + .expected_error_kind = ErrorKind::kJsonParseError, + .expected_error_msg = "Missing 'plan-tasks'"}, + FetchScanTasksResponseInvalidParam{ + .test_name = "NullFileScanTasks", + .json_str = R"({"file-scan-tasks":null})", + .expected_error_kind = ErrorKind::kJsonParseError, + .expected_error_msg = "Missing 'file-scan-tasks'"}, + FetchScanTasksResponseInvalidParam{ + .test_name = "NullDeleteFiles", + .json_str = R"({"plan-tasks":[],"delete-files":null})", + .expected_error_kind = ErrorKind::kJsonParseError, + .expected_error_msg = "Missing 'delete-files'"}, + FetchScanTasksResponseInvalidParam{ + .test_name = "DeleteFilesWithoutFileScanTasks", + .json_str = + R"({"plan-tasks":["task1","task2"],"delete-files":[{"content":"position-deletes","file-path":"s3://bucket/d.parquet","file-format":"PARQUET","spec-id":0,"partition":[],"file-size-in-bytes":512,"record-count":5}],"file-scan-tasks":[]})", + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = + "deleteFiles should only be returned with fileScanTasks"}), + [](const ::testing::TestParamInfo& info) { + return info.param.test_name; + }); + +// --- PlanTableScanRequest validation --- + +struct PlanTableScanRequestValidationParam { + std::string test_name; + PlanTableScanRequest request; + std::string expected_error_msg; +}; + +class PlanTableScanRequestValidationTest + : public ::testing::TestWithParam {}; + +TEST_P(PlanTableScanRequestValidationTest, ValidationFailed) { + const auto& param = GetParam(); + auto status = param.request.Validate(); + ASSERT_THAT(status, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(status, HasErrorMessage(param.expected_error_msg)); +} + +INSTANTIATE_TEST_SUITE_P( + PlanTableScanRequestValidationCases, PlanTableScanRequestValidationTest, + ::testing::Values( + PlanTableScanRequestValidationParam{ + .test_name = "SnapshotIdWithStartSnapshotId", + .request = {.snapshot_id = 1, .start_snapshot_id = 2, .end_snapshot_id = 5}, + .expected_error_msg = + "cannot provide both snapshotId and startSnapshotId/endSnapshotId"}, + PlanTableScanRequestValidationParam{ + .test_name = "SnapshotIdWithEndSnapshotId", + .request = {.snapshot_id = 1, .end_snapshot_id = 5}, + .expected_error_msg = + "cannot provide both snapshotId and startSnapshotId/endSnapshotId"}, + PlanTableScanRequestValidationParam{ + .test_name = "StartSnapshotIdWithoutEnd", + .request = {.start_snapshot_id = 1}, + .expected_error_msg = "startSnapshotId and endSnapshotId is required"}, + PlanTableScanRequestValidationParam{ + .test_name = "EndSnapshotIdWithoutStart", + .request = {.end_snapshot_id = 5}, + .expected_error_msg = "startSnapshotId and endSnapshotId is required"}, + PlanTableScanRequestValidationParam{ + .test_name = "NegativeMinRowsRequested", + .request = {.min_rows_requested = -1}, + .expected_error_msg = "minRowsRequested is negative"}), + [](const ::testing::TestParamInfo& info) { + return info.param.test_name; + }); + +TEST(PlanTableScanRequestEqualityTest, ComparesFilterByExpressionContent) { + PlanTableScanRequest lhs; + lhs.filter = Expressions::GreaterThan("id", Literal::Int(10)); + + PlanTableScanRequest rhs; + rhs.filter = Expressions::GreaterThan("id", Literal::Int(10)); + + ASSERT_NE(lhs.filter, rhs.filter); + EXPECT_EQ(lhs, rhs); +} + +TEST(PlanTableScanRequestJsonSerdeTest, RoundtripAndDefaults) { + PlanTableScanRequest request; + request.snapshot_id = 123; + request.select = {"id", "data"}; + request.filter = Expressions::GreaterThan("id", Literal::Long(10)); + request.case_sensitive = false; + request.use_snapshot_schema = true; + request.stats_fields = {"id"}; + request.min_rows_requested = 100; + + ICEBERG_UNWRAP_OR_FAIL(auto json, ToJson(request)); + auto result = PlanTableScanRequestFromJson(json); + + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(*result, request); + + auto defaults_result = PlanTableScanRequestFromJson(R"({"select":["id"]})"_json); + + ASSERT_THAT(defaults_result, IsOk()); + EXPECT_EQ(defaults_result->select, std::vector{"id"}); + EXPECT_TRUE(defaults_result->case_sensitive); + EXPECT_FALSE(defaults_result->use_snapshot_schema); +} + +TEST(FetchScanTasksRequestJsonSerdeTest, Roundtrip) { + FetchScanTasksRequest request{.planTask = "task-token"}; + + auto json = ToJson(request); + auto result = FetchScanTasksRequestFromJson(json); + + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(*result, request); +} + +struct ScanRequestFromJsonInvalidParam { + std::string test_name; + nlohmann::json json; + std::function(const nlohmann::json&)> parse; + ErrorKind expected_error_kind; + std::string expected_error_msg; +}; + +class ScanRequestFromJsonInvalidTest + : public ::testing::TestWithParam {}; + +TEST_P(ScanRequestFromJsonInvalidTest, InvalidInput) { + const auto& param = GetParam(); + auto result = param.parse(param.json); + ASSERT_THAT(result, IsError(param.expected_error_kind)); + EXPECT_THAT(result, HasErrorMessage(param.expected_error_msg)); +} + +INSTANTIATE_TEST_SUITE_P( + ScanRequestFromJsonInvalidCases, ScanRequestFromJsonInvalidTest, + ::testing::Values( + ScanRequestFromJsonInvalidParam{ + .test_name = "PlanTableScanMissingEndSnapshotId", + .json = R"({"start-snapshot-id":1})"_json, + .parse = [](const nlohmann::json& json) -> Result { + ICEBERG_ASSIGN_OR_RAISE(auto request, PlanTableScanRequestFromJson(json)); + return {}; + }, + .expected_error_kind = ErrorKind::kValidationFailed, + .expected_error_msg = "startSnapshotId and endSnapshotId is required"}, + ScanRequestFromJsonInvalidParam{ + .test_name = "FetchScanTasksMissingPlanTask", + .json = nlohmann::json::object(), + .parse = [](const nlohmann::json& json) -> Result { + ICEBERG_ASSIGN_OR_RAISE(auto request, FetchScanTasksRequestFromJson(json)); + return {}; + }, + .expected_error_kind = ErrorKind::kJsonParseError, + .expected_error_msg = "plan-task"}), + [](const ::testing::TestParamInfo& info) { + return info.param.test_name; + }); + +// --- DataFileFromJson --- + +TEST(DataFileFromJsonTest, RequiredFieldsOnly) { + auto json = R"({ + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "spec-id": 0, + "partition": [], + "file-size-in-bytes": 12345, + "record-count": 100 + })"_json; + + auto result = DataFileFromJson(json, UnpartitionedSpecs(), Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + const auto& df = result.value(); + EXPECT_EQ(df.content, DataFile::Content::kData); + EXPECT_EQ(df.file_path, "s3://bucket/data/file.parquet"); + EXPECT_EQ(df.file_format, FileFormatType::kParquet); + EXPECT_EQ(df.file_size_in_bytes, 12345); + EXPECT_EQ(df.record_count, 100); + EXPECT_TRUE(df.column_sizes.empty()); + EXPECT_FALSE(df.sort_order_id.has_value()); + EXPECT_EQ(df.partition_spec_id, PartitionSpec::kInitialSpecId); + EXPECT_EQ(df.partition.num_fields(), 0); +} + +TEST(DataFileFromJsonTest, LowercaseFormat) { + auto json = R"({ + "content": "data", + "file-path": "s3://bucket/data/file.avro", + "file-format": "avro", + "spec-id": 0, + "partition": [], + "file-size-in-bytes": 500, + "record-count": 10 + })"_json; + + auto result = DataFileFromJson(json, UnpartitionedSpecs(), Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value().content, DataFile::Content::kData); + EXPECT_EQ(result.value().file_format, FileFormatType::kAvro); +} + +TEST(DataFileFromJsonTest, WithOptionalFields) { + auto json = R"({ + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "spec-id": 0, + "partition": [], + "file-size-in-bytes": 12345, + "record-count": 100, + "column-sizes": {"keys": [1, 2], "values": [1000, 2000]}, + "value-counts": {"keys": [1, 2], "values": [100, 100]}, + "null-value-counts": {"keys": [1], "values": [0]}, + "nan-value-counts": {"keys": [2], "values": [5]}, + "split-offsets": [0, 4096], + "sort-order-id": 0 + })"_json; + + auto result = DataFileFromJson(json, UnpartitionedSpecs(), Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + const auto& df = result.value(); + EXPECT_EQ(df.partition_spec_id, PartitionSpec::kInitialSpecId); + ASSERT_EQ(df.column_sizes.size(), 2U); + EXPECT_EQ(df.column_sizes.at(1), 1000); + EXPECT_EQ(df.column_sizes.at(2), 2000); + ASSERT_EQ(df.value_counts.size(), 2U); + EXPECT_EQ(df.value_counts.at(1), 100); + ASSERT_EQ(df.null_value_counts.size(), 1U); + EXPECT_EQ(df.null_value_counts.at(1), 0); + ASSERT_EQ(df.nan_value_counts.size(), 1U); + EXPECT_EQ(df.nan_value_counts.at(2), 5); + ASSERT_EQ(df.split_offsets.size(), 2U); + EXPECT_EQ(df.split_offsets[0], 0); + EXPECT_EQ(df.split_offsets[1], 4096); + EXPECT_EQ(df.sort_order_id, 0); +} + +TEST(DataFileFromJsonTest, EqualityDeleteFile) { + auto json = R"({ + "content": "equality-deletes", + "file-path": "s3://bucket/deletes/eq_delete.parquet", + "file-format": "PARQUET", + "spec-id": 0, + "partition": [], + "file-size-in-bytes": 5000, + "record-count": 50, + "equality-ids": [1, 2] + })"_json; + + auto result = DataFileFromJson(json, UnpartitionedSpecs(), Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + const auto& df = result.value(); + EXPECT_EQ(df.content, DataFile::Content::kEqualityDeletes); + ASSERT_EQ(df.equality_ids.size(), 2U); + EXPECT_EQ(df.equality_ids[0], 1); + EXPECT_EQ(df.equality_ids[1], 2); +} + +TEST(DataFileFromJsonTest, PositionDeleteFileWithReferencedDataFile) { + auto json = R"({ + "content": "position-deletes", + "file-path": "s3://bucket/deletes/pos_delete.parquet", + "file-format": "PARQUET", + "spec-id": 0, + "partition": [], + "file-size-in-bytes": 3000, + "record-count": 20, + "referenced-data-file": "s3://bucket/data/file.parquet" + })"_json; + + auto result = DataFileFromJson(json, UnpartitionedSpecs(), Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + const auto& df = result.value(); + EXPECT_EQ(df.content, DataFile::Content::kPositionDeletes); + ASSERT_TRUE(df.referenced_data_file.has_value()); + EXPECT_EQ(df.referenced_data_file.value(), "s3://bucket/data/file.parquet"); +} + +TEST(DataFileFromJsonTest, InvalidContentType) { + auto json = R"({ + "content": "UNKNOWN", + "file-path": "s3://bucket/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 100, + "record-count": 10 + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("Unknown data file content")); +} + +TEST(DataFileFromJsonTest, MissingRequiredField) { + auto json = R"({ + "content": "data", + "file-format": "PARQUET", + "file-size-in-bytes": 100, + "record-count": 10 + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); +} + +TEST(DataFileFromJsonTest, MissingSpecId) { + auto json = R"({ + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "partition": [], + "file-size-in-bytes": 100, + "record-count": 10 + })"_json; + + auto result = DataFileFromJson(json, UnpartitionedSpecs(), Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("Missing 'spec-id'")); +} + +TEST(DataFileFromJsonTest, MissingPartition) { + auto json = R"({ + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "spec-id": 0, + "file-size-in-bytes": 100, + "record-count": 10 + })"_json; + + auto result = DataFileFromJson(json, UnpartitionedSpecs(), Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("Missing 'partition'")); +} + +TEST(DataFileFromJsonTest, NotAnObject) { + auto result = DataFileFromJson(nlohmann::json::array(), {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("DataFile must be a JSON object")); +} + +TEST(DataFileRoundtripTest, PartitionedFileRequiresMatchingSpecForSerialization) { + auto schema = PartitionedSchema(); + auto specs = PartitionedSpecs(schema); + + DataFile df; + df.content = DataFile::Content::kData; + df.file_path = "s3://bucket/data/file.parquet"; + df.file_format = FileFormatType::kParquet; + df.partition_spec_id = 1; + df.partition = PartitionValues({Literal::Int(34)}); + df.file_size_in_bytes = 12345; + df.record_count = 100; + + ICEBERG_UNWRAP_OR_FAIL(auto json, ToJson(df, specs, schema)); + EXPECT_EQ(json["partition"], nlohmann::json::array({34})); + + auto parsed = DataFileFromJson(json, specs, schema); + ASSERT_THAT(parsed, IsOk()); + EXPECT_EQ(parsed->partition_spec_id, 1); + ASSERT_EQ(parsed->partition.num_fields(), 1); + EXPECT_EQ(parsed->partition.values()[0], Literal::Int(34)); +} + +DataFile MakeDataFileForSerializationTest(std::optional spec_id, + PartitionValues partition) { + DataFile df; + df.content = DataFile::Content::kData; + df.file_path = "s3://bucket/data/file.parquet"; + df.file_format = FileFormatType::kParquet; + df.partition_spec_id = spec_id; + df.partition = std::move(partition); + df.file_size_in_bytes = 12345; + df.record_count = 100; + return df; +} + +enum class DataFileToJsonInvalidCase { + kPartitionedSpecWithoutPartitionData, + kUnknownSpec, + kMismatchedSpecId, + kMissingSpecId, +}; + +struct DataFileToJsonInvalidParam { + std::string test_name; + DataFileToJsonInvalidCase test_case; + std::string expected_error_msg; +}; + +class DataFileToJsonInvalidTest + : public ::testing::TestWithParam {}; + +TEST_P(DataFileToJsonInvalidTest, InvalidInput) { + const auto& param = GetParam(); + Result result = JsonParseError("uninitialized"); + switch (param.test_case) { + case DataFileToJsonInvalidCase::kPartitionedSpecWithoutPartitionData: { + auto schema = PartitionedSchema(); + result = ToJson(MakeDataFileForSerializationTest(1, PartitionValues{}), + PartitionedSpecs(schema), schema); + break; + } + case DataFileToJsonInvalidCase::kUnknownSpec: { + auto schema = PartitionedSchema(); + result = + ToJson(MakeDataFileForSerializationTest(2, PartitionValues({Literal::Int(34)})), + PartitionedSpecs(schema), schema); + break; + } + case DataFileToJsonInvalidCase::kMismatchedSpecId: { + auto schema = PartitionedSchema(); + auto partitioned_specs = PartitionedSpecs(schema); + auto specs = std::unordered_map>{ + {2, partitioned_specs.at(1)}}; + result = + ToJson(MakeDataFileForSerializationTest(2, PartitionValues({Literal::Int(34)})), + specs, schema); + break; + } + case DataFileToJsonInvalidCase::kMissingSpecId: { + auto schema = PartitionedSchema(); + result = ToJson(MakeDataFileForSerializationTest(std::nullopt, PartitionValues{}), + PartitionedSpecs(schema), schema); + break; + } + } + + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage(param.expected_error_msg)); +} + +INSTANTIATE_TEST_SUITE_P( + DataFileToJsonInvalidCases, DataFileToJsonInvalidTest, + ::testing::Values( + DataFileToJsonInvalidParam{ + .test_name = "PartitionedSpecWithoutPartitionData", + .test_case = DataFileToJsonInvalidCase::kPartitionedSpecWithoutPartitionData, + .expected_error_msg = "Invalid partition data from content file"}, + DataFileToJsonInvalidParam{.test_name = "UnknownSpec", + .test_case = DataFileToJsonInvalidCase::kUnknownSpec, + .expected_error_msg = "Invalid partition spec: null"}, + DataFileToJsonInvalidParam{ + .test_name = "MismatchedSpecId", + .test_case = DataFileToJsonInvalidCase::kMismatchedSpecId, + .expected_error_msg = "Invalid partition spec id from content file"}, + DataFileToJsonInvalidParam{ + .test_name = "MissingSpecId", + .test_case = DataFileToJsonInvalidCase::kMissingSpecId, + .expected_error_msg = "Invalid partition spec id from content file"}), + [](const ::testing::TestParamInfo& info) { + return info.param.test_name; + }); + +// --- FileScanTasksFromJson --- + +TEST(FileScanTasksFromJsonTest, EmptyArray) { + auto result = FileScanTasksFromJson(nlohmann::json::array(), {}, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + EXPECT_TRUE(result.value().empty()); +} + +TEST(FileScanTasksFromJsonTest, SingleTaskNoDeleteFiles) { + auto json = R"([{ + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "spec-id": 0, + "partition": [], + "file-size-in-bytes": 12345, + "record-count": 100 + } + }])"_json; + + auto result = FileScanTasksFromJson(json, {}, UnpartitionedSpecs(), Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + ASSERT_EQ(result.value().size(), 1U); + const auto& task = result.value()[0]; + ASSERT_NE(task->data_file(), nullptr); + EXPECT_EQ(task->data_file()->file_path, "s3://bucket/data/file.parquet"); + EXPECT_TRUE(task->delete_files().empty()); + EXPECT_EQ(task->residual_filter(), nullptr); +} + +TEST(FileScanTasksFromJsonTest, TaskWithDeleteFileReferences) { + DataFile delete_file; + delete_file.content = DataFile::Content::kPositionDeletes; + delete_file.file_path = "s3://bucket/deletes/pos_delete.parquet"; + delete_file.file_format = FileFormatType::kParquet; + delete_file.partition_spec_id = PartitionSpec::kInitialSpecId; + delete_file.partition = PartitionValues{}; + delete_file.file_size_in_bytes = 1000; + delete_file.record_count = 5; + + auto json = R"([{ + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "spec-id": 0, + "partition": [], + "file-size-in-bytes": 12345, + "record-count": 100 + }, + "delete-file-references": [0] + }])"_json; + + auto result = FileScanTasksFromJson(json, {std::make_shared(delete_file)}, + UnpartitionedSpecs(), Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + ASSERT_EQ(result.value().size(), 1U); + const auto& task = result.value()[0]; + ASSERT_EQ(task->delete_files().size(), 1U); + EXPECT_EQ(task->delete_files()[0]->file_path, "s3://bucket/deletes/pos_delete.parquet"); +} + +TEST(FileScanTasksFromJsonTest, DeleteFileReferenceOutOfRange) { + auto json = R"([{ + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "spec-id": 0, + "partition": [], + "file-size-in-bytes": 100, + "record-count": 10 + }, + "delete-file-references": [5] + }])"_json; + + auto result = FileScanTasksFromJson(json, {}, UnpartitionedSpecs(), Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("out of range")); +} + +TEST(FileScanTasksFromJsonTest, NotAnArray) { + auto result = FileScanTasksFromJson(nlohmann::json::object(), {}, {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("non-array")); +} + +// --- Roundtrip tests --- + +TEST(DataFileRoundtripTest, RequiredFieldsOnly) { + DataFile df; + df.content = DataFile::Content::kData; + df.file_path = "s3://bucket/data/file.parquet"; + df.file_format = FileFormatType::kParquet; + df.partition_spec_id = PartitionSpec::kInitialSpecId; + df.partition = PartitionValues{}; + df.file_size_in_bytes = 12345; + df.record_count = 100; + + ICEBERG_UNWRAP_OR_FAIL(auto json, ToJson(df, UnpartitionedSpecs(), Schema({}, 0))); + EXPECT_TRUE(json.contains("partition")); + EXPECT_EQ(json["partition"], nlohmann::json::array()); + auto result = DataFileFromJson(json, UnpartitionedSpecs(), Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value(), df); +} + +TEST(DataFileRoundtripTest, WithOptionalFields) { + DataFile df; + df.content = DataFile::Content::kPositionDeletes; + df.file_path = "s3://bucket/deletes/pos.parquet"; + df.file_format = FileFormatType::kParquet; + df.file_size_in_bytes = 5000; + df.record_count = 50; + df.partition_spec_id = PartitionSpec::kInitialSpecId; + df.partition = PartitionValues{}; + df.column_sizes = {{1, 1000}, {2, 2000}}; + df.value_counts = {{1, 100}, {2, 100}}; + df.null_value_counts = {{1, 0}}; + df.nan_value_counts = {{2, 5}}; + df.split_offsets = {0, 4096}; + df.sort_order_id = 0; + df.referenced_data_file = "s3://bucket/data/file.parquet"; + + ICEBERG_UNWRAP_OR_FAIL(auto json, ToJson(df, UnpartitionedSpecs(), Schema({}, 0))); + auto result = DataFileFromJson(json, UnpartitionedSpecs(), Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value(), df); +} + +TEST(FetchScanTasksResponseRoundtripTest, WithFileScanTasksAndDeleteFiles) { + auto json = nlohmann::json::parse(R"({ + "plan-tasks": [], + "delete-files": [ + { + "content": "position-deletes", + "file-path": "s3://bucket/deletes/delete.parquet", + "file-format": "PARQUET", + "spec-id": 0, + "partition": [], + "file-size-in-bytes": 512, + "record-count": 5 + } + ], + "file-scan-tasks": [ + { + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "spec-id": 0, + "partition": [], + "file-size-in-bytes": 12345, + "record-count": 100 + }, + "delete-file-references": [0] + } + ] + })"); + + auto result = FetchScanTasksResponseFromJson(json, UnpartitionedSpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto roundtrip_json, + ToJson(*result, UnpartitionedSpecs(), EmptySchema())); + auto result2 = + FetchScanTasksResponseFromJson(roundtrip_json, UnpartitionedSpecs(), EmptySchema()); + ASSERT_THAT(result2, IsOk()); + EXPECT_EQ(*result, *result2); +} + +TEST(FetchScanTasksResponseRoundtripTest, PreservesResidualFilter) { + auto json = nlohmann::json::parse(R"({ + "file-scan-tasks": [ + { + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "spec-id": 0, + "partition": [], + "file-size-in-bytes": 12345, + "record-count": 100 + }, + "residual-filter": {"type": "gt", "term": "id", "value": 21} + } + ] + })"); + + auto result = FetchScanTasksResponseFromJson(json, UnpartitionedSpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + ASSERT_TRUE(result->file_scan_tasks.has_value()); + ASSERT_EQ(result->file_scan_tasks->size(), 1); + ASSERT_NE(result->file_scan_tasks->at(0)->residual_filter(), nullptr); + + ICEBERG_UNWRAP_OR_FAIL(auto roundtrip_json, + ToJson(*result, UnpartitionedSpecs(), EmptySchema())); + ASSERT_EQ(roundtrip_json["file-scan-tasks"].size(), 1); + ASSERT_TRUE(roundtrip_json["file-scan-tasks"][0].contains("residual-filter")); + EXPECT_EQ(roundtrip_json["file-scan-tasks"][0]["residual-filter"], + json["file-scan-tasks"][0]["residual-filter"]); + + auto result2 = + FetchScanTasksResponseFromJson(roundtrip_json, UnpartitionedSpecs(), EmptySchema()); + ASSERT_THAT(result2, IsOk()); + EXPECT_EQ(*result, *result2); +} + +TEST(FetchScanTasksResponseRoundtripTest, ToJsonDerivesDeleteFilesFromTasks) { + auto data_file = std::make_shared(); + data_file->content = DataFile::Content::kData; + data_file->file_path = "s3://bucket/data/file.parquet"; + data_file->file_format = FileFormatType::kParquet; + data_file->partition_spec_id = PartitionSpec::kInitialSpecId; + data_file->partition = PartitionValues{}; + data_file->file_size_in_bytes = 12345; + data_file->record_count = 100; + + auto delete_file = std::make_shared(); + delete_file->content = DataFile::Content::kPositionDeletes; + delete_file->file_path = "s3://bucket/deletes/delete.parquet"; + delete_file->file_format = FileFormatType::kParquet; + delete_file->partition_spec_id = PartitionSpec::kInitialSpecId; + delete_file->partition = PartitionValues{}; + delete_file->file_size_in_bytes = 512; + delete_file->record_count = 5; + + FetchScanTasksResponse response; + response.file_scan_tasks = std::vector>{}; + response.file_scan_tasks->push_back( + std::make_shared(data_file, std::vector{delete_file})); + + ICEBERG_UNWRAP_OR_FAIL(auto json, + ToJson(response, UnpartitionedSpecs(), EmptySchema())); + ASSERT_TRUE(json.contains("delete-files")); + ASSERT_TRUE(json.contains("file-scan-tasks")); + ASSERT_EQ(json["delete-files"].size(), 1); + EXPECT_EQ(json["delete-files"][0]["file-path"], delete_file->file_path); + ASSERT_EQ(json["file-scan-tasks"].size(), 1); + ASSERT_TRUE(json["file-scan-tasks"][0].contains("delete-file-references")); + EXPECT_EQ(json["file-scan-tasks"][0]["delete-file-references"], + nlohmann::json::array({0})); + + auto result = FetchScanTasksResponseFromJson(json, UnpartitionedSpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + ASSERT_TRUE(result->file_scan_tasks.has_value()); + ASSERT_EQ(result->file_scan_tasks->size(), 1); + ASSERT_EQ(result->file_scan_tasks->at(0)->delete_files().size(), 1); + EXPECT_EQ(result->file_scan_tasks->at(0)->delete_files()[0]->file_path, + delete_file->file_path); +} + +TEST(FetchScanTasksResponseRoundtripTest, ToJsonKeepsDeleteFilesWithSamePath) { + auto data_file = std::make_shared(); + data_file->content = DataFile::Content::kData; + data_file->file_path = "s3://bucket/data/file.parquet"; + data_file->file_format = FileFormatType::kParquet; + data_file->partition_spec_id = PartitionSpec::kInitialSpecId; + data_file->partition = PartitionValues{}; + data_file->file_size_in_bytes = 12345; + data_file->record_count = 100; + + auto delete_file_a = std::make_shared(); + delete_file_a->content = DataFile::Content::kPositionDeletes; + delete_file_a->file_path = "s3://bucket/deletes/dv.puffin"; + delete_file_a->file_format = FileFormatType::kPuffin; + delete_file_a->partition_spec_id = PartitionSpec::kInitialSpecId; + delete_file_a->partition = PartitionValues{}; + delete_file_a->file_size_in_bytes = 1024; + delete_file_a->record_count = 5; + delete_file_a->content_offset = 10; + delete_file_a->content_size_in_bytes = 100; + + auto delete_file_b = std::make_shared(*delete_file_a); + delete_file_b->content_offset = 110; + delete_file_b->content_size_in_bytes = 120; + + FetchScanTasksResponse response; + response.file_scan_tasks = std::vector>{}; + response.file_scan_tasks->push_back(std::make_shared( + data_file, std::vector{delete_file_a, delete_file_b})); + + ICEBERG_UNWRAP_OR_FAIL(auto json, + ToJson(response, UnpartitionedSpecs(), EmptySchema())); + ASSERT_EQ(json["delete-files"].size(), 2); + EXPECT_EQ(json["file-scan-tasks"][0]["delete-file-references"], + nlohmann::json::array({0, 1})); + + auto result = FetchScanTasksResponseFromJson(json, UnpartitionedSpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + ASSERT_TRUE(result->file_scan_tasks.has_value()); + ASSERT_EQ(result->file_scan_tasks->at(0)->delete_files().size(), 2); + EXPECT_EQ(result->file_scan_tasks->at(0)->delete_files()[0]->content_offset, 10); + EXPECT_EQ(result->file_scan_tasks->at(0)->delete_files()[1]->content_offset, 110); +} + +TEST(PlanTableScanResponseRoundtripTest, SubmittedStatus) { + auto json = nlohmann::json::parse(R"({"status": "submitted", "plan-id": "abc-123"})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto roundtrip_json, + ToJson(*result, EmptySpecs(), EmptySchema())); + auto result2 = + PlanTableScanResponseFromJson(roundtrip_json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result2, IsOk()); + EXPECT_EQ(*result, *result2); +} + +TEST(PlanTableScanResponseRoundtripTest, FailedWithError) { + auto json = nlohmann::json::parse( + R"({"status":"failed","error":{"message":"Planning failed","type":"PlanningException","code":500}})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto roundtrip_json, + ToJson(*result, EmptySpecs(), EmptySchema())); + auto result2 = + PlanTableScanResponseFromJson(roundtrip_json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result2, IsOk()); + EXPECT_EQ(*result, *result2); +} + +TEST(FetchPlanningResultResponseRoundtripTest, CompletedWithPlanTasks) { + auto json = nlohmann::json::parse( + R"({"status": "completed", "plan-tasks": ["task-1", "task-2"]})"); + auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto roundtrip_json, + ToJson(*result, EmptySpecs(), EmptySchema())); + auto result2 = + FetchPlanningResultResponseFromJson(roundtrip_json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result2, IsOk()); + EXPECT_EQ(*result, *result2); +} + +TEST(FetchPlanningResultResponseRoundtripTest, FailedWithError) { + auto json = nlohmann::json::parse( + R"({"status":"failed","error":{"message":"Planning failed","type":"PlanningException","code":500}})"); + auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto roundtrip_json, + ToJson(*result, EmptySpecs(), EmptySchema())); + auto result2 = + FetchPlanningResultResponseFromJson(roundtrip_json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result2, IsOk()); + EXPECT_EQ(*result, *result2); +} + } // namespace iceberg::rest