diff --git a/c/driver/postgresql/CMakeLists.txt b/c/driver/postgresql/CMakeLists.txt index 35f56d1d79..a36a397f1d 100644 --- a/c/driver/postgresql/CMakeLists.txt +++ b/c/driver/postgresql/CMakeLists.txt @@ -31,6 +31,7 @@ add_arrow_lib(adbc_driver_postgresql connection.cc error.cc database.cc + ingest_partition.cc postgresql.cc result_helper.cc result_reader.cc @@ -81,6 +82,7 @@ if(ADBC_BUILD_TESTS) EXTRA_LABELS driver-postgresql SOURCES + partitioned_ingest_test.cc postgres_type_test.cc postgresql_test.cc EXTRA_LINK_LIBS diff --git a/c/driver/postgresql/connection.h b/c/driver/postgresql/connection.h index 02e0c4f1bc..1a868c6426 100644 --- a/c/driver/postgresql/connection.h +++ b/c/driver/postgresql/connection.h @@ -66,6 +66,25 @@ class PostgresConnection { AdbcStatusCode Init(struct AdbcDatabase* database, struct AdbcError* error); AdbcStatusCode Release(struct AdbcError* error); AdbcStatusCode Rollback(struct AdbcError* error); + AdbcStatusCode BeginIngestPartitions(const char* target_catalog, + const char* target_db_schema, + const char* target_table, const char* mode, + struct ArrowSchema* schema, + struct AdbcIngestHandle* out_handle, + struct AdbcError* error); + AdbcStatusCode WriteIngestPartition(const uint8_t* handle, size_t handle_len, + struct ArrowArrayStream* data, + struct AdbcIngestReceipt* out_receipt, + struct AdbcError* error); + AdbcStatusCode CommitIngestPartitions(const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, + const size_t* receipt_lens, + int64_t* rows_affected, struct AdbcError* error); + AdbcStatusCode AbortIngestPartitions(const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, + const size_t* receipt_lens, + struct AdbcError* error); + AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error); AdbcStatusCode SetOptionBytes(const char* key, const uint8_t* value, size_t length, struct AdbcError* error); diff --git 
a/c/driver/postgresql/ingest_partition.cc b/c/driver/postgresql/ingest_partition.cc new file mode 100644 index 0000000000..2ce6f9ea1d --- /dev/null +++ b/c/driver/postgresql/ingest_partition.cc @@ -0,0 +1,622 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +#include "ingest_partition.h" + +#include +#include +#include +#include + +#include + +#include "bind_stream.h" +#include "connection.h" +#include "driver/common/utils.h" +#include "error.h" +#include "postgres_type.h" +#include "result_helper.h" + +namespace adbcpq { + +namespace { + +void WriteU32(uint8_t** p, uint32_t v) { + std::memcpy(*p, &v, sizeof(v)); + *p += sizeof(v); +} + +void WriteI64(uint8_t** p, int64_t v) { + std::memcpy(*p, &v, sizeof(v)); + *p += sizeof(v); +} + +void WriteString(uint8_t** p, const std::string& s) { + WriteU32(p, static_cast(s.size())); + std::memcpy(*p, s.data(), s.size()); + *p += s.size(); +} + +bool ReadU32(const uint8_t** p, const uint8_t* end, uint32_t* out) { + if (end - *p < static_cast(sizeof(uint32_t))) return false; + std::memcpy(out, *p, sizeof(uint32_t)); + *p += sizeof(uint32_t); + return true; +} + +bool ReadI64(const uint8_t** p, const uint8_t* end, int64_t* out) { + if (end - *p < static_cast(sizeof(int64_t))) return false; + std::memcpy(out, *p, sizeof(int64_t)); + *p += sizeof(int64_t); + return true; +} + +bool ReadString(const uint8_t** p, const uint8_t* end, std::string* out) { + uint32_t n; + if (!ReadU32(p, end, &n)) return false; + if (end - *p < static_cast(n)) return false; + out->assign(reinterpret_cast(*p), n); + *p += n; + 
return true; +} + +} // namespace + +void IngestHandle::GenerateId(std::array* out) { + std::random_device rd; + std::mt19937_64 gen(rd()); + uint64_t a = gen(); + uint64_t b = gen(); + std::memcpy(out->data(), &a, 8); + std::memcpy(out->data() + 8, &b, 8); +} + +namespace { +// Staging table name is "adbc_stg_" (9) + 32-hex handle id + "_" (1) + 16-hex +// suffix. Postgres' default NAMEDATALEN is 64, giving a 63-char identifier +// limit before silent truncation — which would cause name collisions and miss +// staging tables during Abort. +constexpr size_t kStagingPrefixLen = 9 + 32 + 1; +constexpr size_t kStagingSuffixLen = 16; +constexpr size_t kIdentMaxLen = 63; +static_assert(kStagingPrefixLen + kStagingSuffixLen <= kIdentMaxLen, + "staging table name would exceed PostgreSQL NAMEDATALEN-1 and be " + "silently truncated"); + +// Commit savepoint name is IngestHandle::kCommitSavepointPrefix + a hex- +// encoded ingest_id. Guard against truncation for the same reason: a truncated +// name would alias across concurrent ingest handles on the same connection. +// The assert references the same constants CommitSavepointName() builds from, +// so a future rename or width change cannot drift past the assert. 
+constexpr size_t kHexIdLen = 2 * std::tuple_size_v; +static_assert(IngestHandle::kCommitSavepointPrefix.size() + kHexIdLen <= kIdentMaxLen, + "ingest commit savepoint name would exceed PostgreSQL " + "NAMEDATALEN-1 and be silently truncated"); +} // namespace + +std::string IngestHandle::StagingPrefix() const { + return "adbc_stg_" + internal::HexId16(ingest_id) + "_"; +} + +size_t IngestHandle::SerializedSize() const { + return kMagic.size() + ingest_id.size() + sizeof(uint32_t) * 3 + catalog.size() + + db_schema.size() + table.size(); +} + +void IngestHandle::Serialize(uint8_t* out) const { + uint8_t* p = out; + std::memcpy(p, kMagic.data(), kMagic.size()); + p += kMagic.size(); + std::memcpy(p, ingest_id.data(), ingest_id.size()); + p += ingest_id.size(); + WriteString(&p, catalog); + WriteString(&p, db_schema); + WriteString(&p, table); +} + +AdbcStatusCode IngestHandle::Parse(const uint8_t* bytes, size_t len, IngestHandle* out, + struct AdbcError* error) { + const uint8_t* p = bytes; + const uint8_t* end = bytes + len; + if (end - p < static_cast(kMagic.size() + 16)) { + InternalAdbcSetError(error, "[libpq] ingest handle truncated"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + if (std::memcmp(p, kMagic.data(), kMagic.size()) != 0) { + InternalAdbcSetError(error, "[libpq] ingest handle bad magic"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + p += kMagic.size(); + std::memcpy(out->ingest_id.data(), p, 16); + p += 16; + if (!ReadString(&p, end, &out->catalog) || + !ReadString(&p, end, &out->db_schema) || + !ReadString(&p, end, &out->table)) { + InternalAdbcSetError(error, "[libpq] ingest handle truncated"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + return ADBC_STATUS_OK; +} + +size_t IngestReceipt::SerializedSize() const { + return kMagic.size() + sizeof(uint32_t) * 3 + staging_schema.size() + + staging_table.size() + escaped_columns.size() + sizeof(int64_t); +} + +void IngestReceipt::Serialize(uint8_t* out) const { + uint8_t* p = out; + std::memcpy(p, 
kMagic.data(), kMagic.size()); + p += kMagic.size(); + WriteString(&p, staging_schema); + WriteString(&p, staging_table); + WriteString(&p, escaped_columns); + WriteI64(&p, row_count); +} + +AdbcStatusCode IngestReceipt::Parse(const uint8_t* bytes, size_t len, IngestReceipt* out, + struct AdbcError* error) { + const uint8_t* p = bytes; + const uint8_t* end = bytes + len; + if (end - p < static_cast(kMagic.size())) { + InternalAdbcSetError(error, "[libpq] ingest receipt truncated"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + if (std::memcmp(p, kMagic.data(), kMagic.size()) != 0) { + InternalAdbcSetError(error, "[libpq] ingest receipt bad magic"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + p += kMagic.size(); + if (!ReadString(&p, end, &out->staging_schema) || + !ReadString(&p, end, &out->staging_table) || + !ReadString(&p, end, &out->escaped_columns) || + !ReadI64(&p, end, &out->row_count)) { + InternalAdbcSetError(error, "[libpq] ingest receipt truncated"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + return ADBC_STATUS_OK; +} + +namespace { + +enum class IngestMode { kCreate, kAppend, kReplace, kCreateAppend }; + +AdbcStatusCode ParseMode(const char* mode, IngestMode* out, struct AdbcError* error) { + if (mode == nullptr) { + InternalAdbcSetError(error, "[libpq] ingest mode is required"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + if (std::strcmp(mode, ADBC_INGEST_OPTION_MODE_CREATE) == 0) { + *out = IngestMode::kCreate; + } else if (std::strcmp(mode, ADBC_INGEST_OPTION_MODE_APPEND) == 0) { + *out = IngestMode::kAppend; + } else if (std::strcmp(mode, ADBC_INGEST_OPTION_MODE_REPLACE) == 0) { + *out = IngestMode::kReplace; + } else if (std::strcmp(mode, ADBC_INGEST_OPTION_MODE_CREATE_APPEND) == 0) { + *out = IngestMode::kCreateAppend; + } else { + InternalAdbcSetError(error, "[libpq] unknown ingest mode: %s", mode); + return ADBC_STATUS_INVALID_ARGUMENT; + } + return ADBC_STATUS_OK; +} + +std::string EscapeIdent(PGconn* conn, const std::string& s, struct 
AdbcError* error, + AdbcStatusCode* status) { + char* esc = PQescapeIdentifier(conn, s.data(), s.size()); + if (esc == nullptr) { + InternalAdbcSetError(error, "[libpq] failed to escape identifier %s: %s", s.c_str(), + PQerrorMessage(conn)); + *status = ADBC_STATUS_INTERNAL; + return {}; + } + std::string out = esc; + PQfreemem(esc); + return out; +} + +AdbcStatusCode ResolveCurrentSchema(PGconn* conn, std::string* out, + struct AdbcError* error) { + PqResultHelper r(conn, "SELECT CURRENT_SCHEMA()"); + Status st = r.Execute(); + if (!st.ok()) { + return st.ToAdbc(error); + } + auto it = r.begin(); + if (it == r.end()) { + InternalAdbcSetError(error, "[libpq] CURRENT_SCHEMA returned no rows"); + return ADBC_STATUS_INTERNAL; + } + *out = (*it)[0].data; + return ADBC_STATUS_OK; +} + +AdbcStatusCode ExecSimple(PGconn* conn, const std::string& sql, struct AdbcError* error) { + PGresult* result = PQexec(conn, sql.c_str()); + if (PQresultStatus(result) != PGRES_COMMAND_OK) { + AdbcStatusCode code = + SetError(error, result, "[libpq] %s\nQuery was: %s", PQerrorMessage(conn), + sql.c_str()); + PQclear(result); + return code; + } + PQclear(result); + return ADBC_STATUS_OK; +} + +// Build CREATE TABLE statement from an Arrow schema. `escaped_qualified_table` is +// the already-escaped, schema-qualified target (e.g. `"public"."t"`). 
+AdbcStatusCode BuildCreateTable(PGconn* conn, const PostgresTypeResolver& resolver, + const std::string& escaped_qualified_table, + const struct ArrowSchema& schema, std::string* sql_out, + struct AdbcError* error) { + std::string sql = "CREATE TABLE " + escaped_qualified_table + " ("; + for (int64_t i = 0; i < schema.n_children; i++) { + if (i > 0) sql += ", "; + AdbcStatusCode code = ADBC_STATUS_OK; + std::string col = EscapeIdent(conn, schema.children[i]->name, error, &code); + if (code != ADBC_STATUS_OK) return code; + sql += col; + sql += " "; + + PostgresType pg_type; + struct ArrowError na_error; + int rc = + PostgresType::FromSchema(resolver, schema.children[i], &pg_type, &na_error); + if (rc != NANOARROW_OK) { + InternalAdbcSetError(error, "[libpq] cannot map column %s: %s", + schema.children[i]->name, na_error.message); + return ADBC_STATUS_INTERNAL; + } + sql += pg_type.sql_type_name(); + } + sql += ")"; + *sql_out = std::move(sql); + return ADBC_STATUS_OK; +} + +} // namespace + +AdbcStatusCode PostgresConnection::BeginIngestPartitions( + const char* target_catalog, const char* target_db_schema, const char* target_table, + const char* mode, struct ArrowSchema* schema, struct AdbcIngestHandle* out_handle, + struct AdbcError* error) { + if (target_catalog != nullptr && *target_catalog != '\0') { + InternalAdbcSetError(error, + "[libpq] target_catalog is not supported for partitioned ingest"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + if (target_table == nullptr || *target_table == '\0') { + InternalAdbcSetError(error, "[libpq] target_table is required"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + + IngestMode parsed_mode; + AdbcStatusCode code = ParseMode(mode, &parsed_mode, error); + if (code != ADBC_STATUS_OK) return code; + + bool needs_create = parsed_mode != IngestMode::kAppend; + if (needs_create && (schema == nullptr || schema->release == nullptr)) { + InternalAdbcSetError( + error, "[libpq] schema is required for create/replace/create_append 
modes"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + + IngestHandle handle; + IngestHandle::GenerateId(&handle.ingest_id); + handle.table = target_table; + if (target_db_schema != nullptr && *target_db_schema != '\0') { + handle.db_schema = target_db_schema; + } else { + code = ResolveCurrentSchema(conn_, &handle.db_schema, error); + if (code != ADBC_STATUS_OK) return code; + } + + // Build the escaped, schema-qualified target name. + std::string escaped_schema = EscapeIdent(conn_, handle.db_schema, error, &code); + if (code != ADBC_STATUS_OK) return code; + std::string escaped_table = EscapeIdent(conn_, handle.table, error, &code); + if (code != ADBC_STATUS_OK) return code; + std::string qualified = escaped_schema + "." + escaped_table; + + if (parsed_mode == IngestMode::kReplace) { + code = ExecSimple(conn_, "DROP TABLE IF EXISTS " + qualified, error); + if (code != ADBC_STATUS_OK) return code; + } + + if (needs_create) { + std::string create_sql; + code = BuildCreateTable(conn_, *type_resolver_, qualified, *schema, &create_sql, + error); + if (code != ADBC_STATUS_OK) return code; + if (parsed_mode == IngestMode::kCreateAppend) { + // Replace "CREATE TABLE " with "CREATE TABLE IF NOT EXISTS " + create_sql.insert(std::strlen("CREATE TABLE "), "IF NOT EXISTS "); + } + code = ExecSimple(conn_, create_sql, error); + if (code != ADBC_STATUS_OK) return code; + } + + auto* buf = new std::vector(handle.SerializedSize()); + handle.Serialize(buf->data()); + out_handle->length = buf->size(); + out_handle->bytes = buf->data(); + out_handle->private_data = buf; + out_handle->release = [](struct AdbcIngestHandle* self) { + delete reinterpret_cast*>(self->private_data); + self->private_data = nullptr; + self->bytes = nullptr; + self->length = 0; + self->release = nullptr; + }; + return ADBC_STATUS_OK; +} + +AdbcStatusCode PostgresConnection::WriteIngestPartition( + const uint8_t* handle_bytes, size_t handle_len, struct ArrowArrayStream* data, + struct AdbcIngestReceipt* 
out_receipt, struct AdbcError* error) { + IngestHandle handle; + AdbcStatusCode code = IngestHandle::Parse(handle_bytes, handle_len, &handle, error); + if (code != ADBC_STATUS_OK) return code; + + if (data == nullptr || data->release == nullptr) { + InternalAdbcSetError(error, "[libpq] data stream is required"); + return ADBC_STATUS_INVALID_ARGUMENT; + } + + // Generate a unique staging table name scoped to the handle. + std::array suffix_bytes; + { + std::random_device rd; + std::mt19937_64 gen(rd()); + uint64_t v = gen(); + std::memcpy(suffix_bytes.data(), &v, 8); + } + static const char kHex[] = "0123456789abcdef"; + std::string suffix(16, '0'); + for (size_t i = 0; i < 8; i++) { + suffix[2 * i] = kHex[suffix_bytes[i] >> 4]; + suffix[2 * i + 1] = kHex[suffix_bytes[i] & 0x0F]; + } + std::string staging_table = handle.StagingPrefix() + suffix; + + std::string escaped_schema = EscapeIdent(conn_, handle.db_schema, error, &code); + if (code != ADBC_STATUS_OK) return code; + std::string escaped_target = EscapeIdent(conn_, handle.table, error, &code); + if (code != ADBC_STATUS_OK) return code; + std::string escaped_staging = EscapeIdent(conn_, staging_table, error, &code); + if (code != ADBC_STATUS_OK) return code; + + std::string qualified_target = escaped_schema + "." + escaped_target; + std::string qualified_staging = escaped_schema + "." + escaped_staging; + + // Initialize the bind stream so we can pull a schema for the column list. 
+ BindStream bind_stream; + bind_stream.SetBind(data); + + std::string escaped_columns; + Status begin_st = bind_stream.Begin([&]() -> Status { + AdbcStatusCode inner = ADBC_STATUS_OK; + for (int64_t i = 0; i < bind_stream.bind_schema->n_children; i++) { + if (i > 0) escaped_columns += ", "; + std::string col = EscapeIdent(conn_, bind_stream.bind_schema->children[i]->name, + error, &inner); + if (inner != ADBC_STATUS_OK) { + return Status::Internal("[libpq] failed to escape column name"); + } + escaped_columns += col; + } + return Status::Ok(); + }); + if (!begin_st.ok()) return begin_st.ToAdbc(error); + + // CREATE UNLOGGED TABLE staging (LIKE target). Constraints/defaults are + // intentionally not copied — staging holds raw rows; the commit INSERT applies + // target's defaults via the explicit column list. + code = ExecSimple( + conn_, "CREATE UNLOGGED TABLE " + qualified_staging + " (LIKE " + qualified_target + + ")", + error); + if (code != ADBC_STATUS_OK) return code; + + // Issue COPY ... FROM STDIN, then stream data via ExecuteCopy. 
+ std::string copy_sql = "COPY " + qualified_staging + " (" + escaped_columns + + ") FROM STDIN WITH (FORMAT binary)"; + PGresult* result = PQexec(conn_, copy_sql.c_str()); + if (PQresultStatus(result) != PGRES_COPY_IN) { + AdbcStatusCode err = SetError(error, result, "[libpq] COPY failed: %s\nQuery: %s", + PQerrorMessage(conn_), copy_sql.c_str()); + PQclear(result); + ExecSimple(conn_, "DROP TABLE IF EXISTS " + qualified_staging, nullptr); + return err; + } + PQclear(result); + + int64_t rows_written = 0; + Status copy_st = bind_stream.ExecuteCopy(conn_, *type_resolver_, &rows_written); + if (!copy_st.ok()) { + ExecSimple(conn_, "DROP TABLE IF EXISTS " + qualified_staging, nullptr); + return copy_st.ToAdbc(error); + } + + IngestReceipt receipt; + receipt.staging_schema = handle.db_schema; + receipt.staging_table = staging_table; + receipt.escaped_columns = escaped_columns; + receipt.row_count = rows_written; + + // Write does irrecoverable work; unlike Begin we do not support two-phase + // sizing. Caller must provide a reasonable buffer (a few KB is plenty). 
+ auto* buf = new std::vector(receipt.SerializedSize()); + receipt.Serialize(buf->data()); + out_receipt->length = buf->size(); + out_receipt->bytes = buf->data(); + out_receipt->private_data = buf; + out_receipt->release = [](struct AdbcIngestReceipt* self) { + delete reinterpret_cast*>(self->private_data); + self->private_data = nullptr; + self->bytes = nullptr; + self->length = 0; + self->release = nullptr; + }; + return ADBC_STATUS_OK; +} + +AdbcStatusCode PostgresConnection::CommitIngestPartitions( + const uint8_t* handle_bytes, size_t handle_len, size_t num_receipts, + const uint8_t** receipts, const size_t* receipt_lens, int64_t* rows_affected, + struct AdbcError* error) { + IngestHandle handle; + AdbcStatusCode code = IngestHandle::Parse(handle_bytes, handle_len, &handle, error); + if (code != ADBC_STATUS_OK) return code; + + std::vector parsed(num_receipts); + for (size_t i = 0; i < num_receipts; i++) { + code = IngestReceipt::Parse(receipts[i], receipt_lens[i], &parsed[i], error); + if (code != ADBC_STATUS_OK) return code; + } + + std::string escaped_target_schema = EscapeIdent(conn_, handle.db_schema, error, &code); + if (code != ADBC_STATUS_OK) return code; + std::string escaped_target_table = EscapeIdent(conn_, handle.table, error, &code); + if (code != ADBC_STATUS_OK) return code; + std::string qualified_target = escaped_target_schema + "." + escaped_target_table; + + // Decide how to scope the commit to avoid silently mutating caller transaction + // state: when no outer transaction is open, use BEGIN/COMMIT; when one is + // already active, use a SAVEPOINT so we only release ingest-local work and + // leave the caller's outer transaction intact. Reject error/unknown states. 
+ PGTransactionStatusType txn_status = PQtransactionStatus(conn_); + bool use_savepoint; + switch (txn_status) { + case PQTRANS_IDLE: + use_savepoint = false; + break; + case PQTRANS_INTRANS: + use_savepoint = true; + break; + default: + InternalAdbcSetError( + error, + "[libpq] cannot commit partitioned ingest: connection transaction state " + "is not idle or in-transaction (status=%d)", + static_cast(txn_status)); + return ADBC_STATUS_INVALID_STATE; + } + + // Derive a unique savepoint name from the handle's ingest_id so the driver + // cannot collide with a caller-managed savepoint of the same name. Only used + // when use_savepoint is true, but cheap to compute unconditionally. + const std::string savepoint_name = handle.CommitSavepointName(); + const std::string open_sql = + use_savepoint ? std::string("SAVEPOINT ") + savepoint_name : "BEGIN"; + const std::string commit_sql = + use_savepoint ? std::string("RELEASE SAVEPOINT ") + savepoint_name : "COMMIT"; + const std::string rollback_sql = use_savepoint ? std::string("ROLLBACK TO SAVEPOINT ") + + savepoint_name + : "ROLLBACK"; + + // ROLLBACK TO SAVEPOINT leaves the savepoint on the stack in PG, so also + // RELEASE it after rollback to restore the caller's savepoint stack to its + // pre-call shape. Errors from these cleanup statements are intentionally + // discarded (nullptr error sink): the caller's `error` already holds the + // first-cause message from the failing step, and surfacing a secondary + // cleanup failure would overwrite it with a less useful diagnostic. 
+ auto abort_ingest = [&]() { + ExecSimple(conn_, rollback_sql, nullptr); + if (use_savepoint) { + ExecSimple(conn_, std::string("RELEASE SAVEPOINT ") + savepoint_name, nullptr); + } + }; + + code = ExecSimple(conn_, open_sql, error); + if (code != ADBC_STATUS_OK) return code; + + int64_t total_rows = 0; + for (const auto& r : parsed) { + std::string esc_sch = EscapeIdent(conn_, r.staging_schema, error, &code); + if (code != ADBC_STATUS_OK) { + abort_ingest(); + return code; + } + std::string esc_tbl = EscapeIdent(conn_, r.staging_table, error, &code); + if (code != ADBC_STATUS_OK) { + abort_ingest(); + return code; + } + std::string qualified_staging = esc_sch + "." + esc_tbl; + + std::string insert = "INSERT INTO " + qualified_target + " (" + r.escaped_columns + + ") SELECT " + r.escaped_columns + " FROM " + qualified_staging; + code = ExecSimple(conn_, insert, error); + if (code != ADBC_STATUS_OK) { + abort_ingest(); + return code; + } + code = ExecSimple(conn_, "DROP TABLE " + qualified_staging, error); + if (code != ADBC_STATUS_OK) { + abort_ingest(); + return code; + } + total_rows += r.row_count; + } + + code = ExecSimple(conn_, commit_sql, error); + if (code != ADBC_STATUS_OK) { + abort_ingest(); + return code; + } + + if (rows_affected != nullptr) *rows_affected = total_rows; + return ADBC_STATUS_OK; +} + +AdbcStatusCode PostgresConnection::AbortIngestPartitions( + const uint8_t* handle_bytes, size_t handle_len, size_t num_receipts, + const uint8_t** receipts, const size_t* receipt_lens, struct AdbcError* error) { + (void)num_receipts; + (void)receipts; + (void)receipt_lens; + // Receipts are a hint. The handle is the authority for cleanup scope: + // enumerate every staging table matching the handle's prefix and drop it. 
+ IngestHandle handle; + AdbcStatusCode code = IngestHandle::Parse(handle_bytes, handle_len, &handle, error); + if (code != ADBC_STATUS_OK) return code; + + std::string prefix = handle.StagingPrefix(); + // pg LIKE pattern: escape '%' and '_' in the prefix (not present in our prefix + // by construction, but defensive). + std::string like_pattern; + for (char c : prefix) { + if (c == '%' || c == '_' || c == '\\') like_pattern += '\\'; + like_pattern += c; + } + like_pattern += '%'; + + PqResultHelper q( + conn_, + "SELECT table_schema, table_name FROM information_schema.tables " + "WHERE table_schema = $1 AND table_name LIKE $2"); + Status st = q.Execute({handle.db_schema, like_pattern}); + if (!st.ok()) return st.ToAdbc(error); + + for (auto row : q) { + AdbcStatusCode inner = ADBC_STATUS_OK; + std::string esc_sch = EscapeIdent(conn_, row[0].data, error, &inner); + if (inner != ADBC_STATUS_OK) return inner; + std::string esc_tbl = EscapeIdent(conn_, row[1].data, error, &inner); + if (inner != ADBC_STATUS_OK) return inner; + // Best-effort: ignore individual drop failures so one orphan doesn't block others. + ExecSimple(conn_, "DROP TABLE IF EXISTS " + esc_sch + "." + esc_tbl, nullptr); + } + + return ADBC_STATUS_OK; +} + +} // namespace adbcpq diff --git a/c/driver/postgresql/ingest_partition.h b/c/driver/postgresql/ingest_partition.h new file mode 100644 index 0000000000..9a26d0ab5b --- /dev/null +++ b/c/driver/postgresql/ingest_partition.h @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace adbcpq { + +namespace internal { +// Hex-encode a 16-byte ingest id. Header-inline so tests linking against the +// shared driver library (which hides internal symbols) can reuse it. +inline std::string HexId16(const std::array& id) { + static constexpr char kHex[] = "0123456789abcdef"; + std::string s(32, '0'); + for (size_t i = 0; i < 16; i++) { + s[2 * i] = kHex[id[i] >> 4]; + s[2 * i + 1] = kHex[id[i] & 0x0F]; + } + return s; +} +} // namespace internal + +// Wire format for the partitioned-ingest handle. Opaque to callers; symmetric +// across coordinator and workers. +struct IngestHandle { + static constexpr std::array kMagic = {'P', 'I', 'H', '1'}; + // Prefix for the savepoint Commit installs around its INSERT loop when the + // caller is already inside a transaction. Exposed so tests can reproduce the + // exact name without duplicating the literal. + static constexpr std::string_view kCommitSavepointPrefix = "adbc_ingest_commit_"; + + std::array ingest_id; + std::string catalog; + std::string db_schema; + std::string table; + + // staging table prefix shared by all writes scoped to this handle. + // Driver-internal — used by Abort to enumerate orphans. + std::string StagingPrefix() const; + + // Savepoint name used by Commit when wrapping the INSERT loop in a + // caller-provided transaction. Single source of truth shared between the + // driver and tests; defined inline so tests don't need to link against the + // driver-internal symbol. 
+ std::string CommitSavepointName() const { + return std::string(kCommitSavepointPrefix) + internal::HexId16(ingest_id); + } + + size_t SerializedSize() const; + void Serialize(uint8_t* out) const; + static AdbcStatusCode Parse(const uint8_t* bytes, size_t len, IngestHandle* out, + struct AdbcError* error); + + static void GenerateId(std::array* out); +}; + +// Wire format for a per-partition receipt. +struct IngestReceipt { + static constexpr std::array kMagic = {'P', 'I', 'R', '1'}; + + std::string staging_schema; // empty -> default schema + std::string staging_table; + // Already-escaped, comma-separated column list (e.g. `"a", "b"`). Used by + // Commit to construct INSERT INTO target (cols) SELECT cols FROM staging. + std::string escaped_columns; + int64_t row_count = 0; + + size_t SerializedSize() const; + void Serialize(uint8_t* out) const; + static AdbcStatusCode Parse(const uint8_t* bytes, size_t len, IngestReceipt* out, + struct AdbcError* error); +}; + +} // namespace adbcpq diff --git a/c/driver/postgresql/meson.build b/c/driver/postgresql/meson.build index de9a119afa..aa4a7c8425 100644 --- a/c/driver/postgresql/meson.build +++ b/c/driver/postgresql/meson.build @@ -23,6 +23,7 @@ adbc_postgres_driver_lib = library( 'connection.cc', 'error.cc', 'database.cc', + 'ingest_partition.cc', 'postgresql.cc', 'result_helper.cc', 'result_reader.cc', diff --git a/c/driver/postgresql/partitioned_ingest_test.cc b/c/driver/postgresql/partitioned_ingest_test.cc new file mode 100644 index 0000000000..a44e6e6ef4 --- /dev/null +++ b/c/driver/postgresql/partitioned_ingest_test.cc @@ -0,0 +1,654 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "postgresql/ingest_partition.h" +#include "validation/adbc_validation_util.h" + +namespace { + +const char* RequireUri() { + const char* uri = std::getenv("ADBC_POSTGRESQL_TEST_URI"); + if (!uri) { + ADD_FAILURE() << "ADBC_POSTGRESQL_TEST_URI must be set"; + } + return uri; +} + +struct ConnPair { + AdbcDatabase db{}; + AdbcConnection conn{}; +}; + +void OpenConn(ConnPair* p, AdbcError* error, const char* uri) { + ASSERT_EQ(AdbcDatabaseNew(&p->db, error), ADBC_STATUS_OK) << error->message; + ASSERT_EQ(AdbcDatabaseSetOption(&p->db, "uri", uri, error), ADBC_STATUS_OK) + << error->message; + ASSERT_EQ(AdbcDatabaseInit(&p->db, error), ADBC_STATUS_OK) << error->message; + ASSERT_EQ(AdbcConnectionNew(&p->conn, error), ADBC_STATUS_OK) << error->message; + ASSERT_EQ(AdbcConnectionInit(&p->conn, &p->db, error), ADBC_STATUS_OK) + << error->message; +} + +void CloseConn(ConnPair* p, AdbcError* error) { + AdbcConnectionRelease(&p->conn, error); + AdbcDatabaseRelease(&p->db, error); +} + +// Convenience: drop the test target table and any leftover staging tables. +void Cleanup(ConnPair* c, const std::string& table, AdbcError* error) { + AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&c->conn, &stmt, error), ADBC_STATUS_OK); + std::string sql = "DROP TABLE IF EXISTS " + table; + ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, sql.c_str(), error), ADBC_STATUS_OK); + AdbcStatementExecuteQuery(&stmt, nullptr, nullptr, error); + AdbcStatementRelease(&stmt, error); +} + +// Read a count from the target table. 
+int64_t SelectCount(ConnPair* c, const std::string& table, AdbcError* error) { + AdbcStatement stmt{}; + EXPECT_EQ(AdbcStatementNew(&c->conn, &stmt, error), ADBC_STATUS_OK); + std::string sql = "SELECT COUNT(*) FROM " + table; + EXPECT_EQ(AdbcStatementSetSqlQuery(&stmt, sql.c_str(), error), ADBC_STATUS_OK); + + ArrowArrayStream stream{}; + int64_t rows_affected = 0; + EXPECT_EQ(AdbcStatementExecuteQuery(&stmt, &stream, &rows_affected, error), + ADBC_STATUS_OK) + << error->message; + + ArrowSchema schema{}; + ArrowArray batch{}; + stream.get_schema(&stream, &schema); + stream.get_next(&stream, &batch); + + int64_t count = 0; + if (batch.length > 0) { + nanoarrow::UniqueArrayView view; + ArrowArrayViewInitFromSchema(view.get(), &schema, nullptr); + ArrowArrayViewSetArray(view.get(), &batch, nullptr); + count = ArrowArrayViewGetIntUnsafe(view->children[0], 0); + } + + if (batch.release) batch.release(&batch); + if (schema.release) schema.release(&schema); + stream.release(&stream); + AdbcStatementRelease(&stmt, error); + return count; +} + +// Helper: build a struct schema { id int32, label utf8 }. +void MakeIngestSchema(ArrowSchema* out) { + ASSERT_EQ(adbc_validation::MakeSchema( + out, {{"id", NANOARROW_TYPE_INT32}, {"label", NANOARROW_TYPE_STRING}}), + 0); +} + +// Helper: build a one-batch stream of N rows starting at `start_id`. 
+// Fills `stream` with a single batch of `n` rows using the ingest schema:
+// ids are start_id, start_id + 1, ... and each label is "row-<id>".
+void MakeBatchStream(ArrowArrayStream* stream, int32_t start_id, int32_t n) {
+  ArrowSchema schema{};
+  MakeIngestSchema(&schema);
+
+  ArrowArray batch{};
+  std::vector<std::optional<int32_t>> ids;
+  std::vector<std::optional<std::string>> labels;
+  ids.reserve(n);
+  labels.reserve(n);
+  for (int32_t i = 0; i < n; i++) {
+    ids.push_back(start_id + i);
+    labels.push_back("row-" + std::to_string(start_id + i));
+  }
+  ArrowError na_error{};
+  ASSERT_EQ(adbc_validation::MakeBatch(&schema, &batch, &na_error, ids, labels), 0);
+
+  std::vector<ArrowArray> batches;
+  batches.push_back(batch);
+  adbc_validation::MakeStream(stream, &schema, std::move(batches));
+}
+
+}  // namespace
+
+class PostgresPartitionedIngestTest : public ::testing::Test {};
+
+// Happy path: the coordinator begins an ingest, several workers each write one
+// partition on their own connection, and the coordinator commits all receipts.
+TEST_F(PostgresPartitionedIngestTest, CreateThenWriteThenCommit) {
+  const char* uri = RequireUri();
+  if (!uri) return;
+  const std::string table = "adbc_partitioned_ingest_test";
+
+  AdbcError error = ADBC_ERROR_INIT;
+  ConnPair coordinator;
+  OpenConn(&coordinator, &error, uri);
+  Cleanup(&coordinator, table, &error);
+
+  ArrowSchema ingest_schema{};
+  MakeIngestSchema(&ingest_schema);
+
+  AdbcIngestHandle handle{};
+  ASSERT_EQ(AdbcConnectionBeginIngestPartitions(
+                &coordinator.conn, /*catalog=*/nullptr, /*db_schema=*/nullptr,
+                table.c_str(), ADBC_INGEST_OPTION_MODE_CREATE, &ingest_schema,
+                &handle, &error),
+            ADBC_STATUS_OK)
+      << error.message;
+  ingest_schema.release(&ingest_schema);
+
+  // Spawn N workers, each on its own connection, each writing a partition.
+  constexpr int kNumWorkers = 4;
+  constexpr int kRowsPerWorker = 1000;
+  std::vector<std::vector<uint8_t>> receipts(kNumWorkers);
+  std::vector<std::thread> workers;
+  std::mutex err_mu;
+  std::vector<std::string> worker_errors;
+
+  for (int w = 0; w < kNumWorkers; w++) {
+    workers.emplace_back([&, w]() {
+      AdbcError werr = ADBC_ERROR_INIT;
+      ConnPair wp;
+      OpenConn(&wp, &werr, uri);
+
+      ArrowArrayStream stream{};
+      MakeBatchStream(&stream, w * kRowsPerWorker, kRowsPerWorker);
+
+      AdbcIngestReceipt rec{};
+      AdbcStatusCode rc = AdbcConnectionWriteIngestPartition(
+          &wp.conn, handle.bytes, handle.length, &stream, &rec, &werr);
+      if (rc != ADBC_STATUS_OK) {
+        // Copy the message out under the lock; `werr` is thread-local and is
+        // released below.
+        std::lock_guard<std::mutex> g(err_mu);
+        worker_errors.push_back(std::string("write: ") +
+                                (werr.message ? werr.message : ""));
+      } else {
+        // Copy receipt bytes out so we can release the driver-owned struct
+        // immediately. Mirrors the cross-process flow: caller serializes the
+        // bytes and ships them to the coordinator.
+        receipts[w].assign(rec.bytes, rec.bytes + rec.length);
+        rec.release(&rec);
+      }
+      CloseConn(&wp, &werr);
+      // Release any error the write or the close left behind.
+      if (werr.release) werr.release(&werr);
+    });
+  }
+  for (auto& t : workers) t.join();
+  ASSERT_TRUE(worker_errors.empty()) << worker_errors[0];
+
+  // Commit.
+  std::vector<const uint8_t*> rec_ptrs(kNumWorkers);
+  std::vector<size_t> rec_lens(kNumWorkers);
+  for (int i = 0; i < kNumWorkers; i++) {
+    rec_ptrs[i] = receipts[i].data();
+    rec_lens[i] = receipts[i].size();
+  }
+  int64_t rows_committed = 0;
+  ASSERT_EQ(AdbcConnectionCommitIngestPartitions(&coordinator.conn, handle.bytes,
+                                                 handle.length, kNumWorkers,
+                                                 rec_ptrs.data(), rec_lens.data(),
+                                                 &rows_committed, &error),
+            ADBC_STATUS_OK)
+      << error.message;
+  EXPECT_EQ(rows_committed, kNumWorkers * kRowsPerWorker);
+
+  // Verify target row count.
+  EXPECT_EQ(SelectCount(&coordinator, table, &error), kNumWorkers * kRowsPerWorker);
+
+  handle.release(&handle);
+  Cleanup(&coordinator, table, &error);
+  CloseConn(&coordinator, &error);
+}
+
+// Abort must remove every staging table created under the handle, including
+// one whose receipt never reached the coordinator.
+TEST_F(PostgresPartitionedIngestTest, AbortDropsAllStagingIncludingOrphans) {
+  const char* uri = RequireUri();
+  if (!uri) return;
+  const std::string table = "adbc_partitioned_ingest_abort_test";
+
+  AdbcError error = ADBC_ERROR_INIT;
+  ConnPair c;
+  OpenConn(&c, &error, uri);
+  Cleanup(&c, table, &error);
+
+  ArrowSchema ingest_schema{};
+  MakeIngestSchema(&ingest_schema);
+
+  AdbcIngestHandle handle{};
+  ASSERT_EQ(AdbcConnectionBeginIngestPartitions(
+                &c.conn, nullptr, nullptr, table.c_str(),
+                ADBC_INGEST_OPTION_MODE_CREATE, &ingest_schema, &handle, &error),
+            ADBC_STATUS_OK)
+      << error.message;
+  ingest_schema.release(&ingest_schema);
+
+  // Write three partitions but only collect two receipts (simulate one lost).
+  std::vector<std::vector<uint8_t>> receipts;
+  for (int w = 0; w < 3; w++) {
+    ArrowArrayStream stream{};
+    MakeBatchStream(&stream, w * 10, 10);
+    AdbcIngestReceipt rec{};
+    ASSERT_EQ(AdbcConnectionWriteIngestPartition(&c.conn, handle.bytes, handle.length,
+                                                 &stream, &rec, &error),
+              ADBC_STATUS_OK)
+        << error.message;
+    if (w < 2) receipts.emplace_back(rec.bytes, rec.bytes + rec.length);
+    rec.release(&rec);
+  }
+
+  // Abort with only the two known receipts. Driver must clean up the orphan too.
+  std::vector<const uint8_t*> rec_ptrs;
+  std::vector<size_t> rec_lens;
+  for (auto& r : receipts) {
+    rec_ptrs.push_back(r.data());
+    rec_lens.push_back(r.size());
+  }
+  ASSERT_EQ(AdbcConnectionAbortIngestPartitions(&c.conn, handle.bytes, handle.length,
+                                                rec_ptrs.size(), rec_ptrs.data(),
+                                                rec_lens.data(), &error),
+            ADBC_STATUS_OK)
+      << error.message;
+
+  // Derive the handle's staging prefix from the wire format (4-byte magic
+  // "PIH1" + 16-byte id) so the verification query is scoped to this test's
+  // handle and not to every ingest ever run against the database.
+  ASSERT_GE(handle.length, static_cast<size_t>(4 + 16));
+  std::string handle_prefix = "adbc_stg_";
+  static const char kHex[] = "0123456789abcdef";
+  for (size_t i = 0; i < 16; i++) {
+    uint8_t b = handle.bytes[4 + i];
+    handle_prefix += kHex[b >> 4];
+    handle_prefix += kHex[b & 0x0F];
+  }
+  handle_prefix += '_';
+
+  // Verify no staging tables left under the handle's prefix.
+  AdbcStatement stmt{};
+  ASSERT_EQ(AdbcStatementNew(&c.conn, &stmt, &error), ADBC_STATUS_OK);
+  std::string count_sql =
+      "SELECT COUNT(*) FROM information_schema.tables "
+      "WHERE table_schema = current_schema() "
+      "AND table_name LIKE '" +
+      handle_prefix + "%'";
+  ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, count_sql.c_str(), &error), ADBC_STATUS_OK);
+  ArrowArrayStream stream{};
+  ASSERT_EQ(AdbcStatementExecuteQuery(&stmt, &stream, nullptr, &error), ADBC_STATUS_OK)
+      << error.message;
+  ArrowSchema schema{};
+  ArrowArray batch{};
+  // Check every step of reading the single COUNT(*) row; `leftover` stays -1
+  // (and the expectation below fails) if the row could not be read.
+  int64_t leftover = -1;
+  int rc = stream.get_schema(&stream, &schema);
+  EXPECT_EQ(rc, NANOARROW_OK);
+  if (rc == NANOARROW_OK) {
+    rc = stream.get_next(&stream, &batch);
+    EXPECT_EQ(rc, NANOARROW_OK);
+  }
+  if (rc == NANOARROW_OK && batch.release != nullptr && batch.length > 0) {
+    nanoarrow::UniqueArrayView view;
+    rc = ArrowArrayViewInitFromSchema(view.get(), &schema, nullptr);
+    if (rc == NANOARROW_OK) rc = ArrowArrayViewSetArray(view.get(), &batch, nullptr);
+    EXPECT_EQ(rc, NANOARROW_OK);
+    if (rc == NANOARROW_OK) {
+      leftover = ArrowArrayViewGetIntUnsafe(view->children[0], 0);
+    }
+  }
+  EXPECT_EQ(leftover, 0);
+
+  if (batch.release) batch.release(&batch);
+  if (schema.release) schema.release(&schema);
+  stream.release(&stream);
+  AdbcStatementRelease(&stmt, &error);
+
+  handle.release(&handle);
+  Cleanup(&c, table, &error);
+  CloseConn(&c, &error);
+}
+
+// With the coordinator connection already inside an outer transaction,
+// CommitIngestPartitions must take the SAVEPOINT path: the ingest rows become
+// visible to in-transaction SELECTs, and the lifetime of the ingest is tied to
+// the outer transaction (persists on COMMIT, rolls back on ROLLBACK).
+TEST_F(PostgresPartitionedIngestTest, CommitInsideOuterTransactionUsesSavepoint) {
+  const char* uri = RequireUri();
+  if (!uri) return;
+  const std::string table = "adbc_partitioned_ingest_savepoint_test";
+
+  AdbcError error = ADBC_ERROR_INIT;
+  ConnPair coordinator;
+  OpenConn(&coordinator, &error, uri);
+  Cleanup(&coordinator, table, &error);
+
+  ArrowSchema ingest_schema{};
+  MakeIngestSchema(&ingest_schema);
+
+  AdbcIngestHandle handle{};
+  ASSERT_EQ(AdbcConnectionBeginIngestPartitions(
+                &coordinator.conn, nullptr, nullptr, table.c_str(),
+                ADBC_INGEST_OPTION_MODE_CREATE, &ingest_schema, &handle, &error),
+            ADBC_STATUS_OK)
+      << error.message;
+  ingest_schema.release(&ingest_schema);
+
+  // One worker writes one partition on a separate connection.
+  constexpr int32_t kRows = 25;
+  ConnPair worker;
+  OpenConn(&worker, &error, uri);
+  ArrowArrayStream stream{};
+  MakeBatchStream(&stream, /*start_id=*/0, kRows);
+  AdbcIngestReceipt rec{};
+  ASSERT_EQ(AdbcConnectionWriteIngestPartition(&worker.conn, handle.bytes, handle.length,
+                                               &stream, &rec, &error),
+            ADBC_STATUS_OK)
+      << error.message;
+  std::vector<uint8_t> receipt_bytes(rec.bytes, rec.bytes + rec.length);
+  rec.release(&rec);
+  CloseConn(&worker, &error);
+
+  // Put the coordinator connection into an outer transaction so Commit must
+  // take the SAVEPOINT branch. Some drivers defer BEGIN to the next statement,
+  // which would leave libpq in PQTRANS_IDLE and silently take the BEGIN/COMMIT
+  // branch instead; issue a trivial no-result statement to force the
+  // transaction open.
+  ASSERT_EQ(AdbcConnectionSetOption(&coordinator.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+                                    ADBC_OPTION_VALUE_DISABLED, &error),
+            ADBC_STATUS_OK)
+      << error.message;
+  {
+    // Run a no-result statement so libpq ends in PQTRANS_INTRANS rather than
+    // PQTRANS_ACTIVE (which a streaming SELECT would leave behind until the
+    // cursor is drained). DO $$ BEGIN END $$ executes and returns no rows.
+    AdbcStatement stmt{};
+    ASSERT_EQ(AdbcStatementNew(&coordinator.conn, &stmt, &error), ADBC_STATUS_OK);
+    ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, "DO $$ BEGIN END $$", &error),
+              ADBC_STATUS_OK);
+    ASSERT_EQ(AdbcStatementExecuteQuery(&stmt, nullptr, nullptr, &error),
+              ADBC_STATUS_OK)
+        << error.message;
+    AdbcStatementRelease(&stmt, &error);
+  }
+
+  const uint8_t* rec_ptr = receipt_bytes.data();
+  size_t rec_len = receipt_bytes.size();
+  int64_t rows_committed = 0;
+  ASSERT_EQ(AdbcConnectionCommitIngestPartitions(&coordinator.conn, handle.bytes,
+                                                 handle.length, 1, &rec_ptr, &rec_len,
+                                                 &rows_committed, &error),
+            ADBC_STATUS_OK)
+      << error.message;
+  EXPECT_EQ(rows_committed, kRows);
+
+  // (a) Rows visible to in-transaction SELECT on the same connection.
+  EXPECT_EQ(SelectCount(&coordinator, table, &error), kRows);
+
+  // (b) A caller-driven ROLLBACK undoes the ingest: RELEASE SAVEPOINT merges
+  // the ingest work into the outer transaction, and rolling back the outer
+  // transaction rolls back everything in it.
+  ASSERT_EQ(AdbcConnectionRollback(&coordinator.conn, &error), ADBC_STATUS_OK)
+      << error.message;
+  // Re-enable autocommit with the named constant, mirroring the
+  // ADBC_OPTION_VALUE_DISABLED used to turn it off above.
+  ASSERT_EQ(AdbcConnectionSetOption(&coordinator.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+                                    ADBC_OPTION_VALUE_ENABLED, &error),
+            ADBC_STATUS_OK)
+      << error.message;
+  EXPECT_EQ(SelectCount(&coordinator, table, &error), 0);
+
+  handle.release(&handle);
+  Cleanup(&coordinator, table, &error);
+  CloseConn(&coordinator, &error);
+}
+
+// When CommitIngestPartitions fails mid-loop while inside an outer
+// transaction, the savepoint must be fully unwound: ROLLBACK TO SAVEPOINT
+// followed by RELEASE SAVEPOINT. If the RELEASE is missed, the caller's
+// savepoint stack leaks an entry, and if the rollback is skipped the outer
+// transaction is left in PQTRANS_INERROR.
Force a failure by dropping a +// staging table out-of-band before Commit runs — the INSERT FROM that +// staging table will fail — then verify the outer transaction is still +// usable (not in error state) and that the next caller SAVEPOINT of the +// same name succeeds (proving the driver's savepoint was released). +TEST_F(PostgresPartitionedIngestTest, CommitFailureInOuterTxnReleasesSavepoint) { + const char* uri = RequireUri(); + if (!uri) return; + const std::string table = "adbc_partitioned_ingest_savepoint_abort_test"; + + AdbcError error = ADBC_ERROR_INIT; + ConnPair coordinator; + OpenConn(&coordinator, &error, uri); + Cleanup(&coordinator, table, &error); + + ArrowSchema ingest_schema{}; + MakeIngestSchema(&ingest_schema); + + AdbcIngestHandle handle{}; + ASSERT_EQ(AdbcConnectionBeginIngestPartitions( + &coordinator.conn, nullptr, nullptr, table.c_str(), + ADBC_INGEST_OPTION_MODE_CREATE, &ingest_schema, &handle, &error), + ADBC_STATUS_OK) + << error.message; + ingest_schema.release(&ingest_schema); + + // Write one partition on a separate worker connection. + constexpr int32_t kRows = 10; + ConnPair worker; + OpenConn(&worker, &error, uri); + ArrowArrayStream stream{}; + MakeBatchStream(&stream, /*start_id=*/0, kRows); + AdbcIngestReceipt rec{}; + ASSERT_EQ(AdbcConnectionWriteIngestPartition(&worker.conn, handle.bytes, handle.length, + &stream, &rec, &error), + ADBC_STATUS_OK) + << error.message; + std::vector receipt_bytes(rec.bytes, rec.bytes + rec.length); + rec.release(&rec); + CloseConn(&worker, &error); + + // Derive the staging table name from the receipt wire format (4-byte + // "PIR1" magic + u32 schema len + schema + u32 table len + table + ...). + // The production wire format uses host-endian u32 (see WriteU32/ReadU32 in + // ingest_partition.cc) — match that here, with strict bounds checks so a + // malformed length cannot run rp past the end of the buffer. 
+ auto receipt_remaining = [&](const uint8_t* rp) -> size_t { + return receipt_bytes.size() - static_cast(rp - receipt_bytes.data()); + }; + ASSERT_GE(receipt_bytes.size(), static_cast(4 + 4)); + const uint8_t* rp = receipt_bytes.data() + 4; + uint32_t schema_len = 0; + std::memcpy(&schema_len, rp, sizeof(schema_len)); + rp += sizeof(schema_len); + ASSERT_LE(schema_len, receipt_remaining(rp)); + std::string staging_schema(reinterpret_cast(rp), schema_len); + rp += schema_len; + ASSERT_GE(receipt_remaining(rp), sizeof(uint32_t)); + uint32_t tbl_len = 0; + std::memcpy(&tbl_len, rp, sizeof(tbl_len)); + rp += sizeof(tbl_len); + ASSERT_LE(tbl_len, receipt_remaining(rp)); + std::string staging_table(reinterpret_cast(rp), tbl_len); + std::string qualified_staging = + "\"" + staging_schema + "\".\"" + staging_table + "\""; + + // Drop the staging table out-of-band so the Commit loop's INSERT will fail. + { + ConnPair saboteur; + OpenConn(&saboteur, &error, uri); + AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&saboteur.conn, &stmt, &error), ADBC_STATUS_OK); + std::string drop_sql = "DROP TABLE " + qualified_staging; + ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, drop_sql.c_str(), &error), + ADBC_STATUS_OK); + ASSERT_EQ(AdbcStatementExecuteQuery(&stmt, nullptr, nullptr, &error), + ADBC_STATUS_OK) + << error.message; + AdbcStatementRelease(&stmt, &error); + CloseConn(&saboteur, &error); + } + + // Put the coordinator into an outer transaction and force libpq to + // PQTRANS_INTRANS before calling Commit. + ASSERT_EQ(AdbcConnectionSetOption(&coordinator.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT, + ADBC_OPTION_VALUE_DISABLED, &error), + ADBC_STATUS_OK) + << error.message; + { + // Run a no-result statement so libpq ends in PQTRANS_INTRANS rather than + // PQTRANS_ACTIVE (which a streaming SELECT would leave behind until the + // cursor is drained). DO $$ BEGIN END $$ executes and returns no rows. 
+ AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&coordinator.conn, &stmt, &error), ADBC_STATUS_OK); + ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, "DO $$ BEGIN END $$", &error), + ADBC_STATUS_OK); + ASSERT_EQ(AdbcStatementExecuteQuery(&stmt, nullptr, nullptr, &error), + ADBC_STATUS_OK) + << error.message; + AdbcStatementRelease(&stmt, &error); + } + + // Commit must fail, but the outer transaction must remain usable. + const uint8_t* rec_ptr = receipt_bytes.data(); + size_t rec_len = receipt_bytes.size(); + AdbcError commit_err = ADBC_ERROR_INIT; + AdbcStatusCode rc = AdbcConnectionCommitIngestPartitions( + &coordinator.conn, handle.bytes, handle.length, 1, &rec_ptr, &rec_len, + /*rows_affected=*/nullptr, &commit_err); + EXPECT_NE(rc, ADBC_STATUS_OK); + if (commit_err.release) commit_err.release(&commit_err); + + // Post-failure, the outer transaction must still be usable: a SELECT must + // succeed (would be rejected with "current transaction is aborted" if the + // savepoint rollback was skipped). + { + AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&coordinator.conn, &stmt, &error), ADBC_STATUS_OK); + ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, "SELECT 2", &error), ADBC_STATUS_OK); + ArrowArrayStream post{}; + ASSERT_EQ(AdbcStatementExecuteQuery(&stmt, &post, nullptr, &error), ADBC_STATUS_OK) + << error.message; + post.release(&post); + AdbcStatementRelease(&stmt, &error); + } + + // The driver's savepoint must have been RELEASEd (not left on the stack): + // if it were still live, a caller SAVEPOINT of the same name would still + // work, so instead prove release by issuing a ROLLBACK TO on the driver's + // savepoint name and expecting it to fail ("savepoint does not exist"). + // That asserts the savepoint is no longer on the stack after abort. 
Build + // the name from IngestHandle::kCommitSavepointPrefix and the inline HexId16 + // helper so the probe shares the literal and the encoding with the driver + // — a future rename forces this test to update too. + ASSERT_GE(handle.length, static_cast(4 + 16)); + std::array ingest_id{}; + std::memcpy(ingest_id.data(), handle.bytes + 4, ingest_id.size()); + const std::string driver_savepoint = + std::string(adbcpq::IngestHandle::kCommitSavepointPrefix) + + adbcpq::internal::HexId16(ingest_id); + { + AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&coordinator.conn, &stmt, &error), ADBC_STATUS_OK); + std::string sql = "ROLLBACK TO SAVEPOINT " + driver_savepoint; + ASSERT_EQ(AdbcStatementSetSqlQuery(&stmt, sql.c_str(), &error), ADBC_STATUS_OK); + AdbcError probe_err = ADBC_ERROR_INIT; + AdbcStatusCode probe_rc = + AdbcStatementExecuteQuery(&stmt, nullptr, nullptr, &probe_err); + EXPECT_NE(probe_rc, ADBC_STATUS_OK) + << "driver savepoint was not released after commit failure"; + if (probe_err.release) probe_err.release(&probe_err); + AdbcStatementRelease(&stmt, &error); + } + + // The failed ROLLBACK TO above pushed libpq into PQTRANS_INERROR. Recover by + // explicitly rolling back the outer transaction; the rollback must succeed + // for AbortIngestPartitions below to operate on a clean connection. + ASSERT_EQ(AdbcConnectionRollback(&coordinator.conn, &error), ADBC_STATUS_OK) + << error.message; + ASSERT_EQ(AdbcConnectionSetOption(&coordinator.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT, + "true", &error), + ADBC_STATUS_OK) + << error.message; + + // Best-effort cleanup of any remaining staging tables. The saboteur dropped + // the only staging table out-of-band, so this should find nothing — but call + // it anyway to exercise the abort path on a connection that just unwound an + // error. 
+ ASSERT_EQ(AdbcConnectionAbortIngestPartitions(&coordinator.conn, handle.bytes, + handle.length, 0, nullptr, nullptr, + &error), + ADBC_STATUS_OK) + << error.message; + + handle.release(&handle); + Cleanup(&coordinator, table, &error); + CloseConn(&coordinator, &error); +} + +// Calling CommitIngestPartitions while the connection is in an aborted +// transaction must fail cleanly with ADBC_STATUS_INVALID_STATE instead of +// issuing SQL that would further mutate caller transaction state. +TEST_F(PostgresPartitionedIngestTest, CommitRejectsAbortedTransaction) { + const char* uri = RequireUri(); + if (!uri) return; + const std::string table = "adbc_partitioned_ingest_inerror_test"; + + AdbcError error = ADBC_ERROR_INIT; + ConnPair c; + OpenConn(&c, &error, uri); + Cleanup(&c, table, &error); + + ArrowSchema ingest_schema{}; + MakeIngestSchema(&ingest_schema); + + AdbcIngestHandle handle{}; + ASSERT_EQ(AdbcConnectionBeginIngestPartitions( + &c.conn, nullptr, nullptr, table.c_str(), + ADBC_INGEST_OPTION_MODE_CREATE, &ingest_schema, &handle, &error), + ADBC_STATUS_OK) + << error.message; + ingest_schema.release(&ingest_schema); + + ArrowArrayStream stream{}; + MakeBatchStream(&stream, 0, 5); + AdbcIngestReceipt rec{}; + ASSERT_EQ(AdbcConnectionWriteIngestPartition(&c.conn, handle.bytes, handle.length, + &stream, &rec, &error), + ADBC_STATUS_OK) + << error.message; + std::vector receipt_bytes(rec.bytes, rec.bytes + rec.length); + rec.release(&rec); + + // Put the connection in an aborted-transaction state: disable autocommit, + // then issue a statement that errors (SELECT from a missing relation). 
+ ASSERT_EQ(AdbcConnectionSetOption(&c.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT, + ADBC_OPTION_VALUE_DISABLED, &error), + ADBC_STATUS_OK) + << error.message; + { + AdbcStatement stmt{}; + ASSERT_EQ(AdbcStatementNew(&c.conn, &stmt, &error), ADBC_STATUS_OK); + ASSERT_EQ(AdbcStatementSetSqlQuery( + &stmt, "SELECT * FROM nonexistent_relation_adbc_x", &error), + ADBC_STATUS_OK); + AdbcStatementExecuteQuery(&stmt, nullptr, nullptr, &error); + AdbcStatementRelease(&stmt, &error); + } + + const uint8_t* rec_ptr = receipt_bytes.data(); + size_t rec_len = receipt_bytes.size(); + AdbcError commit_err = ADBC_ERROR_INIT; + AdbcStatusCode rc = AdbcConnectionCommitIngestPartitions( + &c.conn, handle.bytes, handle.length, 1, &rec_ptr, &rec_len, + /*rows_affected=*/nullptr, &commit_err); + EXPECT_EQ(rc, ADBC_STATUS_INVALID_STATE) << (commit_err.message ? commit_err.message : ""); + if (commit_err.release) commit_err.release(&commit_err); + + // Restore autocommit so Cleanup can drop the table. + AdbcConnectionRollback(&c.conn, &error); + AdbcConnectionSetOption(&c.conn, ADBC_CONNECTION_OPTION_AUTOCOMMIT, "true", &error); + + // Best-effort cleanup of staging tables left behind by the failed commit. 
+ AdbcConnectionAbortIngestPartitions(&c.conn, handle.bytes, handle.length, 0, nullptr, + nullptr, &error); + + handle.release(&handle); + Cleanup(&c, table, &error); + CloseConn(&c, &error); +} diff --git a/c/driver/postgresql/postgresql.cc b/c/driver/postgresql/postgresql.cc index 16645d0678..a88f47cc82 100644 --- a/c/driver/postgresql/postgresql.cc +++ b/c/driver/postgresql/postgresql.cc @@ -421,6 +421,50 @@ AdbcStatusCode PostgresConnectionRollback(struct AdbcConnection* connection, return (*ptr)->Rollback(error); } +AdbcStatusCode PostgresConnectionBeginIngestPartitions( + struct AdbcConnection* connection, const char* target_catalog, + const char* target_db_schema, const char* target_table, const char* mode, + struct ArrowSchema* schema, struct AdbcIngestHandle* out_handle, + struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->BeginIngestPartitions(target_catalog, target_db_schema, target_table, + mode, schema, out_handle, error); +} + +AdbcStatusCode PostgresConnectionWriteIngestPartition( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + struct ArrowArrayStream* data, struct AdbcIngestReceipt* out_receipt, + struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->WriteIngestPartition(handle, handle_len, data, out_receipt, error); +} + +AdbcStatusCode PostgresConnectionCommitIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + int64_t* rows_affected, struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->CommitIngestPartitions(handle, handle_len, num_receipts, 
receipts, + receipt_lens, rows_affected, error); +} + +AdbcStatusCode PostgresConnectionAbortIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + struct AdbcError* error) { + if (!connection->private_data) return ADBC_STATUS_INVALID_STATE; + auto ptr = + reinterpret_cast*>(connection->private_data); + return (*ptr)->AbortIngestPartitions(handle, handle_len, num_receipts, receipts, + receipt_lens, error); +} + AdbcStatusCode PostgresConnectionSetOption(struct AdbcConnection* connection, const char* key, const char* value, struct AdbcError* error) { @@ -562,6 +606,42 @@ AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection, serialized_length, out, error); } +AdbcStatusCode AdbcConnectionBeginIngestPartitions( + struct AdbcConnection* connection, const char* target_catalog, + const char* target_db_schema, const char* target_table, const char* mode, + struct ArrowSchema* schema, struct AdbcIngestHandle* out_handle, + struct AdbcError* error) { + return PostgresConnectionBeginIngestPartitions(connection, target_catalog, + target_db_schema, target_table, mode, + schema, out_handle, error); +} + +AdbcStatusCode AdbcConnectionWriteIngestPartition( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + struct ArrowArrayStream* data, struct AdbcIngestReceipt* out_receipt, + struct AdbcError* error) { + return PostgresConnectionWriteIngestPartition(connection, handle, handle_len, data, + out_receipt, error); +} + +AdbcStatusCode AdbcConnectionCommitIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + int64_t* rows_affected, struct AdbcError* error) { + return PostgresConnectionCommitIngestPartitions(connection, handle, handle_len, + num_receipts, receipts, receipt_lens, + rows_affected, 
error); +} + +AdbcStatusCode AdbcConnectionAbortIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + struct AdbcError* error) { + return PostgresConnectionAbortIngestPartitions(connection, handle, handle_len, + num_receipts, receipts, receipt_lens, + error); +} + AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection, struct AdbcError* error) { return PostgresConnectionRelease(connection, error); @@ -888,15 +968,23 @@ extern "C" { ADBC_EXPORT AdbcStatusCode AdbcDriverPostgresqlInit(int version, void* raw_driver, struct AdbcError* error) { - if (version != ADBC_VERSION_1_0_0 && version != ADBC_VERSION_1_1_0) { + if (version != ADBC_VERSION_1_0_0 && version != ADBC_VERSION_1_1_0 && + version != ADBC_VERSION_1_2_0) { return ADBC_STATUS_NOT_IMPLEMENTED; } if (!raw_driver) return ADBC_STATUS_INVALID_ARGUMENT; auto* driver = reinterpret_cast(raw_driver); - if (version >= ADBC_VERSION_1_1_0) { + if (version >= ADBC_VERSION_1_2_0) { + std::memset(driver, 0, ADBC_DRIVER_1_2_0_SIZE); + driver->ConnectionBeginIngestPartitions = PostgresConnectionBeginIngestPartitions; + driver->ConnectionWriteIngestPartition = PostgresConnectionWriteIngestPartition; + driver->ConnectionCommitIngestPartitions = PostgresConnectionCommitIngestPartitions; + driver->ConnectionAbortIngestPartitions = PostgresConnectionAbortIngestPartitions; + } else if (version >= ADBC_VERSION_1_1_0) { std::memset(driver, 0, ADBC_DRIVER_1_1_0_SIZE); - + } + if (version >= ADBC_VERSION_1_1_0) { driver->ErrorGetDetailCount = PostgresErrorGetDetailCount; driver->ErrorGetDetail = PostgresErrorGetDetail; driver->ErrorFromArrayStream = PostgresErrorFromArrayStream; diff --git a/c/driver_manager/adbc_driver_manager_api.cc b/c/driver_manager/adbc_driver_manager_api.cc index 19f8e7b4c5..45cc63c159 100644 --- a/c/driver_manager/adbc_driver_manager_api.cc +++ 
b/c/driver_manager/adbc_driver_manager_api.cc
@@ -17,6 +17,7 @@
 
 // ADBC API implementations
 
+#include
 #include
 #include
 #include
@@ -415,6 +416,36 @@ AdbcStatusCode StatementSetSubstraitPlan(struct AdbcStatement*, const uint8_t*,
   SetError(error, "AdbcStatementSetSubstraitPlan not implemented");
   return ADBC_STATUS_NOT_IMPLEMENTED;
 }
+
+AdbcStatusCode ConnectionBeginIngestPartitions(  // default stub: ignores all arguments
+    struct AdbcConnection*, const char*, const char*, const char*, const char*,
+    struct ArrowSchema*, struct AdbcIngestHandle*, struct AdbcError* error) {
+  SetError(error, "AdbcConnectionBeginIngestPartitions not implemented");
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode ConnectionWriteIngestPartition(struct AdbcConnection*, const uint8_t*,
+                                              size_t, struct ArrowArrayStream*,
+                                              struct AdbcIngestReceipt*,
+                                              struct AdbcError* error) {
+  SetError(error, "AdbcConnectionWriteIngestPartition not implemented");
+  return ADBC_STATUS_NOT_IMPLEMENTED;  // default stub: never touches the stream
+}
+
+AdbcStatusCode ConnectionCommitIngestPartitions(struct AdbcConnection*, const uint8_t*,
+                                                size_t, size_t, const uint8_t**,
+                                                const size_t*, int64_t*,
+                                                struct AdbcError* error) {
+  SetError(error, "AdbcConnectionCommitIngestPartitions not implemented");
+  return ADBC_STATUS_NOT_IMPLEMENTED;  // default stub: rows_affected is not written
+}
+
+AdbcStatusCode ConnectionAbortIngestPartitions(struct AdbcConnection*, const uint8_t*,
+                                               size_t, size_t, const uint8_t**,
+                                               const size_t*, struct AdbcError* error) {
+  SetError(error, "AdbcConnectionAbortIngestPartitions not implemented");
+  return ADBC_STATUS_NOT_IMPLEMENTED;  // default stub: only reports the error
+}
 }  // namespace
 
 // =============================================================================
@@ -1063,6 +1094,115 @@ AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,
                                   out, connection);
 }
 
+AdbcStatusCode AdbcConnectionBeginIngestPartitions(
+    struct AdbcConnection* connection, const char* target_catalog,
+    const char* target_db_schema, const char* target_table, const char* mode,
+    struct ArrowSchema* schema, struct AdbcIngestHandle* out_handle,
+    struct AdbcError* error) {
+  if (!connection) {  // reject NULL before `connection` is dereferenced below
+    SetError(error, "AdbcConnectionBeginIngestPartitions: connection is NULL");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  if (!out_handle) {
+    SetError(error, "AdbcConnectionBeginIngestPartitions: out_handle is NULL");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  if (!connection->private_driver) {  // no driver has been attached yet
+    SetError(error,
+             "AdbcConnectionBeginIngestPartitions: must call AdbcConnectionNew first");
+    return ADBC_STATUS_INVALID_STATE;
+  }
+  INIT_ERROR(error, connection);
+  return connection->private_driver->ConnectionBeginIngestPartitions(  // forward to driver
+      connection, target_catalog, target_db_schema, target_table, mode, schema, out_handle,
+      error);
+}
+
+AdbcStatusCode AdbcConnectionWriteIngestPartition(
+    struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len,
+    struct ArrowArrayStream* data, struct AdbcIngestReceipt* out_receipt,
+    struct AdbcError* error) {
+  if (!connection) {  // reject NULL before `connection` is dereferenced below
+    SetError(error, "AdbcConnectionWriteIngestPartition: connection is NULL");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  if (!handle) {
+    SetError(error, "AdbcConnectionWriteIngestPartition: handle is NULL");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  if (!data) {
+    SetError(error, "AdbcConnectionWriteIngestPartition: data is NULL");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  if (!out_receipt) {
+    SetError(error, "AdbcConnectionWriteIngestPartition: out_receipt is NULL");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  if (!connection->private_driver) {  // no driver has been attached yet
+    SetError(error,
+             "AdbcConnectionWriteIngestPartition: must call AdbcConnectionNew first");
+    return ADBC_STATUS_INVALID_STATE;
+  }
+  INIT_ERROR(error, connection);
+  return connection->private_driver->ConnectionWriteIngestPartition(  // forward to driver
+      connection, handle, handle_len, data, out_receipt, error);
+}
+
+AdbcStatusCode AdbcConnectionCommitIngestPartitions(
+    struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len,
+    size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens,
+    int64_t* rows_affected, struct AdbcError* error) {
+  if (!connection) {  // reject NULL before `connection` is dereferenced below
+    SetError(error, "AdbcConnectionCommitIngestPartitions: connection is NULL");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  if (!handle) {
+    SetError(error, "AdbcConnectionCommitIngestPartitions: handle is NULL");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  if (num_receipts > 0 && (!receipts || !receipt_lens)) {  // NULL arrays allowed only when empty
+    SetError(error,
+             "AdbcConnectionCommitIngestPartitions: receipts/receipt_lens are NULL");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  if (!connection->private_driver) {  // no driver has been attached yet
+    SetError(error,
+             "AdbcConnectionCommitIngestPartitions: must call AdbcConnectionNew first");
+    return ADBC_STATUS_INVALID_STATE;
+  }
+  INIT_ERROR(error, connection);
+  return connection->private_driver->ConnectionCommitIngestPartitions(  // forward to driver
+      connection, handle, handle_len, num_receipts, receipts, receipt_lens, rows_affected,
+      error);
+}
+
+AdbcStatusCode AdbcConnectionAbortIngestPartitions(
+    struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len,
+    size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens,
+    struct AdbcError* error) {
+  if (!connection) {  // reject NULL before `connection` is dereferenced below
+    SetError(error, "AdbcConnectionAbortIngestPartitions: connection is NULL");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  if (!handle) {
+    SetError(error, "AdbcConnectionAbortIngestPartitions: handle is NULL");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  if (num_receipts > 0 && (!receipts || !receipt_lens)) {  // NULL arrays allowed only when empty
+    SetError(error,
+             "AdbcConnectionAbortIngestPartitions: receipts/receipt_lens are NULL");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  if (!connection->private_driver) {  // no driver has been attached yet
+    SetError(error,
+             "AdbcConnectionAbortIngestPartitions: must call AdbcConnectionNew first");
+    return ADBC_STATUS_INVALID_STATE;
+  }
+  INIT_ERROR(error, connection);
+  return connection->private_driver->ConnectionAbortIngestPartitions(  // forward to driver
+      connection, handle, handle_len, num_receipts, receipts, receipt_lens, error);
+}
+
 AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
                                      struct AdbcError* error) {
   if (!connection->private_driver) {
@@ -1442,7 +1582,8 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,
 
 AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int version,
                                           void* raw_driver, struct AdbcError* error) {
-  constexpr std::array kSupportedVersions = {
+  constexpr std::array kSupportedVersions = {
+      ADBC_VERSION_1_2_0,
       ADBC_VERSION_1_1_0,
       ADBC_VERSION_1_0_0,
   };
@@ -1452,13 +1593,10 @@ AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int vers
     return ADBC_STATUS_INVALID_ARGUMENT;
   }
 
-  switch (version) {
-    case ADBC_VERSION_1_0_0:
-    case ADBC_VERSION_1_1_0:
-      break;
-    default:
-      SetError(error, "Only ADBC 1.0.0 and 1.1.0 are supported");
-      return ADBC_STATUS_NOT_IMPLEMENTED;
+  if (std::find(kSupportedVersions.begin(), kSupportedVersions.end(), version) ==
+      kSupportedVersions.end()) {  // requested version is not in the supported list
+    SetError(error, "Only ADBC 1.0.0, 1.1.0, and 1.2.0 are supported");
+    return ADBC_STATUS_NOT_IMPLEMENTED;
   }
 
 #define FILL_DEFAULT(DRIVER, STUB) \
@@ -1550,6 +1688,13 @@ AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, int vers
     FILL_DEFAULT(driver, StatementSetOptionDouble);
     FILL_DEFAULT(driver, StatementSetOptionInt);
   }
+  if (version >= ADBC_VERSION_1_2_0) {  // partitioned-ingest entry points, handled at 1.2.0+
+    auto* driver = reinterpret_cast(raw_driver);
+    FILL_DEFAULT(driver, ConnectionBeginIngestPartitions);
+    FILL_DEFAULT(driver, ConnectionWriteIngestPartition);
+    FILL_DEFAULT(driver, ConnectionCommitIngestPartitions);
+    FILL_DEFAULT(driver, ConnectionAbortIngestPartitions);
+  }
 
   return ADBC_STATUS_OK;
 
diff --git a/c/driver_manager/adbc_version_100_compatibility_test.cc b/c/driver_manager/adbc_version_100_compatibility_test.cc
index 43079ecb3e..62b48f04e6 100644
--- a/c/driver_manager/adbc_version_100_compatibility_test.cc
+++ b/c/driver_manager/adbc_version_100_compatibility_test.cc
@@ -59,7 +59,7 @@ TEST_F(AdbcVersion, StructSize) {
   ASSERT_EQ(sizeof(AdbcError), ADBC_ERROR_1_1_0_SIZE);
 
   ASSERT_EQ(sizeof(AdbcDriverVersion100), ADBC_DRIVER_1_0_0_SIZE);
-  ASSERT_EQ(sizeof(AdbcDriver), ADBC_DRIVER_1_1_0_SIZE);
+  ASSERT_EQ(sizeof(AdbcDriver), ADBC_DRIVER_1_2_0_SIZE);
 }
 
 // Initialize a version 1.0.0 driver with the version 1.1.0 driver struct.
@@ -105,6 +105,209 @@ TEST_F(AdbcVersion, OldDriverNewManager) {
   EXPECT_NE(driver.StatementSetOptionDouble, nullptr);
 }
 
+// When a pre-1.2.0 driver is loaded at ADBC_VERSION_1_2_0, the new
+// partitioned-ingest entry points must be populated with default stubs that
+// return ADBC_STATUS_NOT_IMPLEMENTED, so the driver-manager wrappers do not
+// dereference null function pointers.
+TEST_F(AdbcVersion, OldDriverNewManagerPartitionedIngest) {
+  ASSERT_THAT(AdbcLoadDriverFromInitFunc(&Version100DriverInit, ADBC_VERSION_1_2_0,
+                                         &driver, &error),
+              IsOkStatus(&error));
+
+  ASSERT_NE(driver.ConnectionBeginIngestPartitions, nullptr);
+  ASSERT_NE(driver.ConnectionWriteIngestPartition, nullptr);
+  ASSERT_NE(driver.ConnectionCommitIngestPartitions, nullptr);
+  ASSERT_NE(driver.ConnectionAbortIngestPartitions, nullptr);
+
+  struct AdbcError stub_error = {};  // reused below; released after every call
+
+  EXPECT_THAT(driver.ConnectionBeginIngestPartitions(nullptr, nullptr, nullptr, nullptr,
+                                                     nullptr, nullptr, nullptr,
+                                                     &stub_error),
+              IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &stub_error));
+  if (stub_error.release) stub_error.release(&stub_error);
+
+  EXPECT_THAT(driver.ConnectionWriteIngestPartition(nullptr, nullptr, 0, nullptr, nullptr,
+                                                    &stub_error),
+              IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &stub_error));
+  if (stub_error.release) stub_error.release(&stub_error);
+
+  EXPECT_THAT(driver.ConnectionCommitIngestPartitions(nullptr, nullptr, 0, 0, nullptr,
+                                                      nullptr, nullptr, &stub_error),
+              IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &stub_error));
+  if (stub_error.release) stub_error.release(&stub_error);
+
+  EXPECT_THAT(driver.ConnectionAbortIngestPartitions(nullptr, nullptr, 0, 0, nullptr,
+                                                     nullptr, &stub_error),
+              IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &stub_error));
+  if (stub_error.release) stub_error.release(&stub_error);
+}
+
+// The public AdbcConnection*IngestPartitions wrappers must reject a NULL
+// connection with ADBC_STATUS_INVALID_ARGUMENT instead of dereferencing it.
+TEST_F(AdbcVersion, IngestPartitionsWrappersRejectNullConnection) {
+  uint8_t handle_bytes[1] = {0};  // placeholder non-NULL handle pointer
+  struct ArrowArrayStream stream = {};
+  struct ArrowSchema schema = {};
+  struct AdbcIngestHandle out_handle = {};
+  struct AdbcIngestReceipt out_receipt = {};
+  int64_t rows_affected = 0;
+  struct AdbcError local_error = {};  // reused below; released after every call
+
+  EXPECT_THAT(
+      AdbcConnectionBeginIngestPartitions(nullptr, nullptr, nullptr, "t",
+                                          ADBC_INGEST_OPTION_MODE_CREATE, &schema,
+                                          &out_handle, &local_error),
+      IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error));
+  if (local_error.release) local_error.release(&local_error);
+
+  EXPECT_THAT(AdbcConnectionWriteIngestPartition(nullptr, handle_bytes, 1, &stream,
+                                                 &out_receipt, &local_error),
+              IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error));
+  if (local_error.release) local_error.release(&local_error);
+
+  EXPECT_THAT(AdbcConnectionCommitIngestPartitions(nullptr, handle_bytes, 1, 0, nullptr,
+                                                   nullptr, &rows_affected, &local_error),
+              IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error));
+  if (local_error.release) local_error.release(&local_error);
+
+  EXPECT_THAT(AdbcConnectionAbortIngestPartitions(nullptr, handle_bytes, 1, 0, nullptr,
+                                                  nullptr, &local_error),
+              IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error));
+  if (local_error.release) local_error.release(&local_error);
+}
+
+// With a real connection wired to a 1.2.0-loaded driver, the public wrappers
+// must dispatch to the FILL_DEFAULT stubs and surface NOT_IMPLEMENTED.
+TEST_F(AdbcVersion, IngestPartitionsWrappersDispatchToStubs) { + ASSERT_THAT(AdbcLoadDriverFromInitFunc(&Version100DriverInit, ADBC_VERSION_1_2_0, + &driver, &error), + IsOkStatus(&error)); + + struct AdbcConnection connection = {}; + connection.private_driver = &driver; + + uint8_t handle_bytes[1] = {0}; + struct ArrowArrayStream stream = {}; + struct ArrowSchema schema = {}; + struct AdbcIngestHandle out_handle = {}; + struct AdbcIngestReceipt out_receipt = {}; + int64_t rows_affected = 0; + struct AdbcError local_error = {}; + + EXPECT_THAT(AdbcConnectionBeginIngestPartitions(&connection, nullptr, nullptr, "t", + ADBC_INGEST_OPTION_MODE_CREATE, &schema, + &out_handle, &local_error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &local_error)); + if (local_error.release) local_error.release(&local_error); + + EXPECT_THAT(AdbcConnectionWriteIngestPartition(&connection, handle_bytes, 1, &stream, + &out_receipt, &local_error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &local_error)); + if (local_error.release) local_error.release(&local_error); + + EXPECT_THAT( + AdbcConnectionCommitIngestPartitions(&connection, handle_bytes, 1, 0, nullptr, + nullptr, &rows_affected, &local_error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &local_error)); + if (local_error.release) local_error.release(&local_error); + + EXPECT_THAT(AdbcConnectionAbortIngestPartitions(&connection, handle_bytes, 1, 0, + nullptr, nullptr, &local_error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Don't let TearDown observe a borrowed driver pointer. + connection.private_driver = nullptr; +} + +// With a valid connection, the public wrappers must still reject NULL +// handle/data/out_handle/out_receipt and (when num_receipts > 0) NULL +// receipts/receipt_lens with ADBC_STATUS_INVALID_ARGUMENT, so a future reorder +// that hides these guards behind the connection check is caught here. 
+TEST_F(AdbcVersion, IngestPartitionsWrappersGuardNullArgsWithValidConnection) { + ASSERT_THAT(AdbcLoadDriverFromInitFunc(&Version100DriverInit, ADBC_VERSION_1_2_0, + &driver, &error), + IsOkStatus(&error)); + + struct AdbcConnection connection = {}; + connection.private_driver = &driver; + + uint8_t handle_bytes[1] = {0}; + struct ArrowArrayStream stream = {}; + struct ArrowSchema schema = {}; + struct AdbcIngestReceipt out_receipt = {}; + int64_t rows_affected = 0; + struct AdbcError local_error = {}; + + // Begin: NULL out_handle. + EXPECT_THAT(AdbcConnectionBeginIngestPartitions(&connection, nullptr, nullptr, "t", + ADBC_INGEST_OPTION_MODE_CREATE, &schema, + /*out_handle=*/nullptr, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Write: NULL handle. + EXPECT_THAT(AdbcConnectionWriteIngestPartition(&connection, /*handle=*/nullptr, 0, + &stream, &out_receipt, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Write: NULL data. + EXPECT_THAT(AdbcConnectionWriteIngestPartition(&connection, handle_bytes, 1, + /*data=*/nullptr, &out_receipt, + &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Write: NULL out_receipt. + EXPECT_THAT(AdbcConnectionWriteIngestPartition(&connection, handle_bytes, 1, &stream, + /*out_receipt=*/nullptr, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Commit: NULL handle. 
+ EXPECT_THAT( + AdbcConnectionCommitIngestPartitions(&connection, /*handle=*/nullptr, 0, 0, nullptr, + nullptr, &rows_affected, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Commit: num_receipts > 0 with NULL receipts/receipt_lens. + EXPECT_THAT( + AdbcConnectionCommitIngestPartitions(&connection, handle_bytes, 1, /*num=*/1, + /*receipts=*/nullptr, /*lens=*/nullptr, + &rows_affected, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Abort: NULL handle. + EXPECT_THAT(AdbcConnectionAbortIngestPartitions(&connection, /*handle=*/nullptr, 0, 0, + nullptr, nullptr, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Abort: num_receipts > 0 with NULL receipts/receipt_lens. + EXPECT_THAT(AdbcConnectionAbortIngestPartitions(&connection, handle_bytes, 1, /*num=*/1, + /*receipts=*/nullptr, + /*lens=*/nullptr, &local_error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &local_error)); + if (local_error.release) local_error.release(&local_error); + + // Don't let TearDown observe a borrowed driver pointer. + connection.private_driver = nullptr; +} + +// AdbcLoadDriverFromInitFunc must reject unrecognized version constants with +// ADBC_STATUS_NOT_IMPLEMENTED so a typo in kSupportedVersions cannot silently +// regress version gating. +TEST_F(AdbcVersion, LoadDriverFromInitFuncRejectsUnknownVersion) { + ASSERT_THAT(AdbcLoadDriverFromInitFunc(&Version100DriverInit, /*bogus*/ 0x12345678, + &driver, &error), + IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &error)); + ASSERT_EQ(driver.release, nullptr); +} + // N.B. see postgresql_test.cc for backwards compatibility test of AdbcError // N.B. 
see postgresql_test.cc for backwards compatibility test of AdbcDriver diff --git a/c/include/arrow-adbc/adbc.h b/c/include/arrow-adbc/adbc.h index 57e665f84a..48ec4fc82c 100644 --- a/c/include/arrow-adbc/adbc.h +++ b/c/include/arrow-adbc/adbc.h @@ -423,6 +423,14 @@ const struct AdbcError* AdbcErrorFromArrayStream(struct ArrowArrayStream* stream /// \since ADBC API revision 1.1.0 #define ADBC_VERSION_1_1_0 1001000 +/// \brief ADBC revision 1.2.0. +/// +/// When passed to an AdbcDriverInitFunc(), the driver parameter must +/// point to an AdbcDriver. +/// +/// \since ADBC API revision 1.2.0 +#define ADBC_VERSION_1_2_0 1002000 + /// \brief Canonical option value for enabling an option. /// /// For use as the value in SetOption calls. @@ -979,6 +987,9 @@ struct AdbcPartitions { /// driver and the driver manager. /// @{ +struct AdbcIngestHandle; +struct AdbcIngestReceipt; + /// \brief An instance of an initialized database driver. /// /// This provides a common interface for vendor-specific driver @@ -1135,6 +1146,30 @@ struct ADBC_EXPORT AdbcDriver { struct AdbcError*); /// @} + + /// \defgroup adbc-1.2.0 ADBC API Revision 1.2.0 + /// + /// Functions added in ADBC 1.2.0. For backwards compatibility, + /// these members must not be accessed unless the version passed to + /// the AdbcDriverInitFunc is greater than or equal to + /// ADBC_VERSION_1_2_0. 
+ /// + /// @{ + + AdbcStatusCode (*ConnectionBeginIngestPartitions)( + struct AdbcConnection*, const char*, const char*, const char*, const char*, + struct ArrowSchema*, struct AdbcIngestHandle*, struct AdbcError*); + AdbcStatusCode (*ConnectionWriteIngestPartition)( + struct AdbcConnection*, const uint8_t*, size_t, struct ArrowArrayStream*, + struct AdbcIngestReceipt*, struct AdbcError*); + AdbcStatusCode (*ConnectionCommitIngestPartitions)( + struct AdbcConnection*, const uint8_t*, size_t, size_t, const uint8_t**, + const size_t*, int64_t*, struct AdbcError*); + AdbcStatusCode (*ConnectionAbortIngestPartitions)( + struct AdbcConnection*, const uint8_t*, size_t, size_t, const uint8_t**, + const size_t*, struct AdbcError*); + + /// @} }; /// \brief The size of the AdbcDriver structure in ADBC 1.0.0. @@ -1151,7 +1186,15 @@ struct ADBC_EXPORT AdbcDriver { /// ADBC_VERSION_1_1_0. /// /// \since ADBC API revision 1.1.0 -#define ADBC_DRIVER_1_1_0_SIZE (sizeof(struct AdbcDriver)) +#define ADBC_DRIVER_1_1_0_SIZE (offsetof(struct AdbcDriver, ConnectionBeginIngestPartitions)) + +/// \brief The size of the AdbcDriver structure in ADBC 1.2.0. +/// Drivers written for ADBC 1.2.0 and later should never touch more +/// than this portion of an AdbcDriver struct when given +/// ADBC_VERSION_1_2_0. +/// +/// \since ADBC API revision 1.2.0 +#define ADBC_DRIVER_1_2_0_SIZE (sizeof(struct AdbcDriver)) /// @} @@ -1946,6 +1989,235 @@ AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection, /// @} +/// \defgroup adbc-connection-ingest-partition Partitioned Bulk Ingest +/// @{ + +/// \brief Driver-owned bytes returned by +/// AdbcConnectionBeginIngestPartitions. +/// +/// The bytes are opaque and serializable: the caller may copy +/// `bytes[0..length)` and ship that copy to workers (other processes +/// or hosts) which can pass it to AdbcConnectionWriteIngestPartition +/// directly. +/// +/// The struct itself is owned by the driver. 
Call `release` exactly +/// once to free it. Releasing the handle does NOT roll back the +/// ingest — call AdbcConnectionAbortIngestPartitions for that. +/// +/// \since ADBC API revision 1.2.0 +struct AdbcIngestHandle { + /// \brief The length of `bytes`. + size_t length; + + /// \brief The serialized handle bytes (driver-owned). + const uint8_t* bytes; + + /// \brief Private driver state. + void* private_data; + + /// \brief Release the handle's memory. Sets `release` to NULL. + void (*release)(struct AdbcIngestHandle* self); +}; + +/// \brief Driver-owned bytes returned by +/// AdbcConnectionWriteIngestPartition. +/// +/// Mirror of AdbcIngestHandle: opaque, serializable, single-use +/// `release`. Releasing a receipt does NOT discard the underlying +/// write; that happens at Commit (commit it) or Abort (drop it). +/// +/// \since ADBC API revision 1.2.0 +struct AdbcIngestReceipt { + /// \brief The length of `bytes`. + size_t length; + + /// \brief The serialized receipt bytes (driver-owned). + const uint8_t* bytes; + + /// \brief Private driver state. + void* private_data; + + /// \brief Release the receipt's memory. Sets `release` to NULL. + void (*release)(struct AdbcIngestReceipt* self); +}; +/// @} + +/// \addtogroup adbc-connection-ingest-partition +/// Some drivers can accept bulk writes from a distributed writer: a +/// coordinator configures an ingest, many workers write partitions in +/// parallel (possibly from different processes or hosts), and the +/// coordinator commits or aborts atomically. +/// +/// This mirrors the read-side partitioned execution model. The +/// coordinator calls AdbcConnectionBeginIngestPartitions to obtain an +/// opaque, serializable handle. The handle is shipped to workers by +/// the caller (e.g. a Spark driver sending it to executors). Workers +/// call AdbcConnectionWriteIngestPartition on their own connections — +/// the connection does not have to be the same one that created the +/// handle. 
Each write returns an opaque receipt. The coordinator +/// collects receipts and calls AdbcConnectionCommitIngestPartitions +/// (or AdbcConnectionAbortIngestPartitions on failure). +/// +/// Handles and receipts are driver-defined opaque byte strings. They +/// are safe to transmit between processes and to use concurrently +/// from multiple connections. +/// +/// Drivers are not required to support partitioned ingest. +/// +/// \since ADBC API revision 1.2.0 +/// +/// @{ + +/// \brief Begin a partitioned bulk ingest. +/// +/// Uses the same semantics as the ADBC_INGEST_OPTION_* options on +/// AdbcStatement. For ADBC_INGEST_OPTION_MODE_CREATE, +/// ADBC_INGEST_OPTION_MODE_CREATE_APPEND, and +/// ADBC_INGEST_OPTION_MODE_REPLACE, `schema` is required and the +/// driver creates (or recreates) the target table at this call. For +/// ADBC_INGEST_OPTION_MODE_APPEND, `schema` is optional; if provided, +/// the driver validates it against the target and returns +/// ADBC_STATUS_ALREADY_EXISTS on mismatch. +/// +/// The returned handle is opaque, serializable, and usable from any +/// connection that can open the same database. The caller releases +/// it via `out_handle->release`; the bytes can be copied and shipped +/// to workers before release. +/// +/// \since ADBC API revision 1.2.0 +/// \param[in] connection The coordinator's connection. +/// \param[in] target_catalog Catalog of the target table, or NULL. +/// \param[in] target_db_schema Schema of the target table, or NULL. +/// \param[in] target_table Name of the target table. Required. +/// \param[in] mode One of ADBC_INGEST_OPTION_MODE_*. Required. +/// \param[in] schema Arrow schema of the data to be written. +/// Required for create/replace/create_append modes; optional for +/// append. +/// \param[out] out_handle Driver-owned handle. Must be released by +/// the caller via `out_handle->release`. +/// \param[out] error Error details, if any. 
+/// \return ADBC_STATUS_INVALID_ARGUMENT if mode requires a schema +/// but none was provided. +/// \return ADBC_STATUS_ALREADY_EXISTS if append mode is requested +/// and the target schema disagrees with the provided schema. +/// \return ADBC_STATUS_NOT_IMPLEMENTED if the driver does not +/// support partitioned ingest. +ADBC_EXPORT +AdbcStatusCode AdbcConnectionBeginIngestPartitions( + struct AdbcConnection* connection, const char* target_catalog, + const char* target_db_schema, const char* target_table, const char* mode, + struct ArrowSchema* schema, struct AdbcIngestHandle* out_handle, + struct AdbcError* error); + +/// \brief Write one partition of a partitioned bulk ingest. +/// +/// Called by a worker, typically on a different connection than the +/// one that created the handle. The driver reads the bound stream +/// to completion, writes its contents to driver-specific staging +/// (per-call: a unique staging table, unique object-store path, etc. +/// — never shared across concurrent writes), and returns an opaque +/// receipt. +/// +/// The stream's schema should be compatible with the target table's +/// schema. Drivers may validate this at any point during the write; +/// on mismatch the call fails and produces no receipt. The exact +/// validation mechanism is driver-specific (e.g., RDBMS drivers may +/// rely on the staging table DDL to enforce compatibility). +/// +/// On error of any kind, `out_receipt` is left with `release == +/// NULL` and the caller should retry the whole partition. Partial +/// receipts are never produced. The driver may, however, leave +/// partial server-side state (for example, a per-call staging +/// table); the caller must still call `AbortIngestPartitions` for +/// the handle (with no receipt for this failed write) to release +/// any staging resources, or rely on driver housekeeping. +/// +/// This call is safe to invoke concurrently from many connections +/// using the same handle. 
+/// +/// \since ADBC API revision 1.2.0 +/// \param[in] connection The worker's connection. +/// \param[in] handle The handle bytes from Begin. +/// \param[in] handle_len Length of handle. +/// \param[in] data Arrow stream of partition data. The driver +/// consumes the stream and releases it. +/// \param[out] out_receipt Driver-owned receipt. Must be released +/// by the caller via `out_receipt->release`. +/// \param[out] error Error details, if any. +ADBC_EXPORT +AdbcStatusCode AdbcConnectionWriteIngestPartition( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + struct ArrowArrayStream* data, struct AdbcIngestReceipt* out_receipt, + struct AdbcError* error); + +/// \brief Commit a partitioned bulk ingest. +/// +/// Atomically promotes all writes named by `receipts` into the +/// target table. Semantics of "atomic" are driver-specific: RDBMS +/// drivers typically swap staging into the target in a transaction; +/// table-format drivers (Iceberg, Delta) write a catalog or +/// transaction-log entry referencing the data files in the +/// receipts. +/// +/// After Commit returns successfully, the handle is consumed and +/// must not be used again. +/// +/// Receipts from failed writes, or writes whose receipts were never +/// observed by the coordinator, are not included in the commit. +/// Their staging data is orphaned and is cleaned up as described in +/// AdbcConnectionAbortIngestPartitions. +/// +/// \since ADBC API revision 1.2.0 +/// \param[in] connection A connection — typically the coordinator's, +/// but any connection that can open the same database works. +/// \param[in] handle The handle from Begin. +/// \param[in] handle_len Length of handle. +/// \param[in] num_receipts Number of receipts in the batch. +/// \param[in] receipts Array of receipt byte-pointers. +/// \param[in] receipt_lens Array of receipt lengths. +/// \param[out] rows_affected Number of rows committed, or -1 if +/// unknown. Pass NULL if not wanted. 
+/// \param[out] error Error details, if any. +ADBC_EXPORT +AdbcStatusCode AdbcConnectionCommitIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + int64_t* rows_affected, struct AdbcError* error); + +/// \brief Abort a partitioned bulk ingest. +/// +/// Discards all writes scoped to the handle and releases any +/// driver-side resources. The handle is consumed. +/// +/// Drivers must clean up every write scoped to the handle, including +/// writes whose receipts were lost or never observed — not only +/// those named in `receipts`. The handle is the authority for +/// cleanup scope; `receipts`, when provided, are a hint that allows +/// the driver to fast-path deletion of known writes. +/// +/// Abort is best-effort. If cleanup is incomplete, the driver +/// returns a warning status and orphaned storage may remain; it is +/// the driver's responsibility to provide housekeeping (e.g. TTL, +/// background GC, or documented manual cleanup). Callers may also +/// call Abort if the coordinator crashed and was restarted without +/// the original receipts. +/// +/// \since ADBC API revision 1.2.0 +/// \param[in] connection A connection. +/// \param[in] handle The handle from Begin. +/// \param[in] handle_len Length of handle. +/// \param[in] num_receipts Number of receipts, or 0. +/// \param[in] receipts Array of receipt byte-pointers, or NULL. +/// \param[in] receipt_lens Array of receipt lengths, or NULL. +/// \param[out] error Error details, if any. 
+ADBC_EXPORT +AdbcStatusCode AdbcConnectionAbortIngestPartitions( + struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len, + size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens, + struct AdbcError* error); + +/// @} + /// \defgroup adbc-connection-transaction Transaction Semantics /// /// Connections start out in auto-commit mode by default (if diff --git a/docs/source/format/partitioned_bulk_ingest.rst b/docs/source/format/partitioned_bulk_ingest.rst new file mode 100644 index 0000000000..b8a90aee44 --- /dev/null +++ b/docs/source/format/partitioned_bulk_ingest.rst @@ -0,0 +1,293 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at +.. +.. http://www.apache.org/licenses/LICENSE-2.0 + +================================= +Proposal: Partitioned Bulk Ingest +================================= + +.. note:: + + Status: draft. Targets ADBC API revision 1.2.0. + +Motivation +========== + +Today ADBC supports two ingest shapes: + +- **Single-writer bulk ingest** — one connection, one statement, one + ``ArrowArrayStream``, one transaction. Good for loading from a single + process; useless for distributed writers. +- **Per-row binding** — slower, also single-connection. + +Two real workloads do not fit: + +1. **Distributed-writer to RDBMS.** A Spark/Flink/Beam job runs N + executors, each producing a partition of the output. Today each + executor opens its own ADBC connection and runs its own bulk + ingest, but the result is *not atomic*: there is no commit point at + which all N partitions become visible together.
Workarounds + (per-job staging tables, ad-hoc swap SQL) are database-specific and + leak into application code. + +2. **Distributed-writer to table-format catalogs (Apache Iceberg, + Delta Lake).** These formats are *designed* for distributed + writes: many workers write data files in parallel, and a single + commit step writes a snapshot/manifest in the catalog or + transaction log. ADBC currently has no way to expose this shape. + A driver author who wants to write to Iceberg today has to pick + between (a) routing all writes through one process (defeats the + point) or (b) inventing a private API. + +The unifying observation is that both workloads need the same shape: +**coordinator decides what to ingest, workers write partitions in +parallel, coordinator commits or aborts atomically**. That is the +mirror image of partitioned read (``ExecutePartitions`` / +``ReadPartition``), which ADBC already supports. + +Goals +----- + +- Allow a coordinator to start an ingest, ship an opaque token to N + workers (possibly in different processes or hosts), have each + worker independently write a partition over its own connection, and + finally commit (or abort) atomically from the coordinator. +- Be implementable by both RDBMS drivers (via per-worker staging + tables) and table-format drivers (via per-worker data files + + catalog commit) without forcing either model on the other. +- Survive lost worker writes, dropped receipts, and coordinator + restarts without leaving silent data corruption. +- Keep the per-driver cost low: most of the ingest plumbing + (CREATE TABLE, COPY, schema mapping) is reused from existing + single-writer ingest. + +Non-goals +--------- + +- Schema evolution mid-ingest. Schema is fixed when ``Begin`` is + called; changing it requires starting a new ingest. +- Cross-driver atomicity (writing to two databases in one commit). +- Defining how a distributed engine (Spark, Flink) ships handles and + receipts between processes. 
That is the application's problem; + the API guarantees only that handles and receipts are opaque, + serializable byte strings. +- Idempotency on the coordinator side. If the coordinator + double-commits (calls ``Commit`` twice on the same handle) the + second call is undefined. + +Design overview +=============== + +Three new operations on ``AdbcConnection``, plus an ``Abort``: + +:: + + coordinator: Begin(table, mode, schema) → handle + workers: Write(handle, stream) → receipt + Write(handle, stream) → receipt + ... + coordinator: Commit(handle, [receipt, receipt, …]) → rows_affected + (or) Abort(handle, [receipts...]) + +The handle and each receipt are **opaque, serializable byte +strings**. This is the same shape as the existing partitioned-read +side, where ``AdbcStatementExecutePartitions`` returns opaque +``AdbcPartitions`` byte strings that can be shipped to workers and +passed to ``AdbcConnectionReadPartition`` over a different +connection. + +API surface +----------- + +C declarations (see ``adbc.h`` for full doc comments): + +.. 
code-block:: c + + struct AdbcIngestHandle { + size_t length; + const uint8_t* bytes; + void* private_data; + void (*release)(struct AdbcIngestHandle*); + }; + + struct AdbcIngestReceipt { + size_t length; + const uint8_t* bytes; + void* private_data; + void (*release)(struct AdbcIngestReceipt*); + }; + + AdbcConnectionBeginIngestPartitions( + conn, target_catalog, target_db_schema, target_table, mode, + schema, *out_handle, *error); + + AdbcConnectionWriteIngestPartition( + conn, handle_bytes, handle_len, *data_stream, + *out_receipt, *error); + + AdbcConnectionCommitIngestPartitions( + conn, handle_bytes, handle_len, num_receipts, receipts, + receipt_lens, *rows_affected, *error); + + AdbcConnectionAbortIngestPartitions( + conn, handle_bytes, handle_len, num_receipts, receipts, + receipt_lens, *error); + +The asymmetry — outputs are driver-owned structs, inputs are raw +``bytes + len`` — is deliberate and matches the read side: the bytes +are the part the caller serializes for transport, while the structs +hold driver-owned memory that callers release locally. + +Driver-side semantics +--------------------- + +- **Begin** validates options, performs whatever setup the driver + requires for writes to proceed (e.g., creating the target table for + ``create``/``replace``/``create_append`` modes, reserving a + transaction snapshot, allocating an object-store prefix), and returns + a handle that encodes the state needed to scope subsequent writes. +- **Write** takes a handle and a stream, writes the partition into + driver-private staging (a per-write staging table, a per-write + object-store path), and returns a receipt encoding what was + written (staging name, file paths, row count, statistics, ...). + Each ``Write`` call must produce output that can be committed or + discarded *independently* — no shared state across concurrent + writes that would cause duplicate rows on retry. +- **Commit** atomically promotes the union of the supplied receipts + into the target. 
Atomic semantics are driver-specific: RDBMS + drivers swap staging into target in a transaction; table-format + drivers write a catalog or transaction-log entry referencing the + data files in the receipts. After successful commit the handle is + consumed. +- **Abort** discards all writes scoped to the handle. The driver + must clean up *every* write under the handle, not just the ones + named in the supplied receipts (see "Lost receipts" below). + +Cross-process flow +------------------ + +:: + + ┌──────────────┐ + │ coordinator │ Begin(...) ─→ handle + └──────┬───────┘ + │ copy handle.bytes; ship to workers + ▼ + ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ + │ worker 1 │ │ worker 2 │ … │ worker N │ + │ Write(...) →│ │ Write(...) →│ │ Write(...) →│ + │ receipt₁ │ │ receipt₂ │ │ receipt_N │ + └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ + │ copy receipt.bytes; ship back │ + ▼ ▼ ▼ + ┌──────────────────────────────────────────────────────┐ + │ coordinator: Commit(handle, [r₁, r₂, ..., r_N]) │ + └──────────────────────────────────────────────────────┘ + +Workers may use *different* connections than the coordinator — the +handle is self-contained. + +Key design decisions +==================== + +The decisions below were the ones with non-obvious tradeoffs. + +1. Opaque handles and receipts +------------------------------ + +Driver-defined byte strings, no schema imposed by ADBC. This lets a +Postgres driver encode "staging table prefix + UUID" while an +Iceberg driver encodes "snapshot id + data file paths + column +stats" — without ADBC having to model both. The cost is that +applications cannot inspect handles or receipts. Worth it: the only +party that ever needs to interpret them is the driver. + +2. 
Schema is fixed at ``Begin``, not per-``Write`` +-------------------------------------------------- + +For ``create``/``replace``/``create_append`` modes, the driver +issues ``CREATE TABLE`` (or the catalog equivalent) at ``Begin`` +time, before any worker writes. Workers cannot race to "create on +first write" because they are on different machines. Iceberg/Delta +also need the schema pinned into the transaction snapshot at start. + +For ``append`` mode, the schema parameter is optional; if supplied +it is validated against the target so a thousand workers don't all +fail independently with the same schema-mismatch error. + +3. Driver-owned output structs (handle, receipt) +------------------------------------------------- + +An earlier draft used the ``GetOptionBytes`` two-phase sizing +pattern: caller passes a buffer + capacity, driver reports required +length, caller retries with a larger buffer. This is correct only +for *idempotent* operations. ``Begin`` and ``Write`` produce +irrecoverable side effects (``CREATE TABLE``, ``COPY``); a +buffer-too-small failure left the side effects in place but gave the +caller no handle/receipt to pass to ``Abort`` — an unrecoverable +orphan. + +The chosen pattern (driver-owned struct with a release callback) +mirrors ``AdbcPartitions`` on the read side, eliminates the orphan +window, and gives drivers a clean place to free internal state. + +4. ``Commit`` and ``Abort`` take raw bytes, not structs +------------------------------------------------------- + +Symmetric with ``AdbcConnectionReadPartition``, which takes the raw +bytes from a ``partitions[i]`` entry rather than the +``AdbcPartitions`` struct. Receipts that traveled across processes +arrive as raw bytes; forcing the caller to wrap them in +``AdbcIngestReceipt`` structs (with bogus ``release`` callbacks) +would be friction without benefit. + +5. 
Lost receipts are handled by handle-scoped sweep, not by receipts +-------------------------------------------------------------------- + +If a worker writes data but its receipt is lost in transit, the +coordinator's receipt list is incomplete. ``Commit`` will not +include the orphan (correct: only acknowledged writes are +committed). ``Abort``, however, must clean it up — and ``Abort`` +cannot rely on the supplied receipts alone, because the orphan +isn't in them. + +The handle therefore must encode enough scope (UUID prefix, +transaction id, object-store path) for the driver to enumerate +*everything* written under it. Receipts passed to ``Abort`` are an +optimization (fast-path deletion of known writes); the handle is the +authority for cleanup scope. Drivers that cannot enumerate from +the handle alone cannot correctly implement partitioned ingest. + +6. Coordinator may die without calling ``Commit`` or ``Abort`` +-------------------------------------------------------------- + +The driver sees the handle only when a caller passes it to +``Write``, ``Commit``, or ``Abort``, so the driver has no built-in +signal that the coordinator is still alive. Recommended (not required) +behaviors: + +- Drivers may TTL or background-GC handle-scoped writes. +- Callers may persist the handle bytes and call ``Abort`` after + restart to recover. +- Iceberg/Delta drivers can rely on existing orphan-file cleanup + tooling. + +The spec does not mandate any of these; it documents the failure +mode and leaves the policy to drivers. + +Reference implementation +======================== + +A prototype lives in the PostgreSQL driver +(``c/driver/postgresql/ingest_partition.{h,cc}``). It uses +per-worker ``UNLOGGED`` staging tables of the form +``adbc_stg_<handle-id>_<worker-id>``, a single ``BEGIN``/``COMMIT`` +wrapping ``INSERT INTO target SELECT cols FROM staging`` for each +receipt, and an ``Abort`` that scans +``information_schema.tables`` for the handle's prefix. Test +coverage is in ``c/driver/postgresql/partitioned_ingest_test.cc``.
diff --git a/docs/source/index.rst b/docs/source/index.rst index 5d4b8966ff..5f01e11078 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -253,6 +253,7 @@ Why ADBC? :hidden: format/specification + format/partitioned_bulk_ingest format/versioning format/comparison format/how_manager