-
Notifications
You must be signed in to change notification settings - Fork 192
feat(c/driver/postgresql): implement copy writer for time types #4057
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -425,6 +425,81 @@ class PostgresCopyNumericFieldReader : public PostgresCopyFieldReader { | |
| static const uint16_t kNumericNinf = 0xF000; | ||
| }; | ||
|
|
||
| // Microseconds in one 24-hour day (86400 s * 1e6); used below as the inclusive upper bound when validating TIME payloads. | ||
| constexpr int64_t kUsecsPerDay = 86400LL * 1000000LL; | ||
| // Nanoseconds in one 24-hour day (86400 s * 1e9). NOTE(review): not referenced in the visible part of this file; confirm it is needed. | ||
| constexpr int64_t kNsecsPerDay = 86400LL * 1000000000LL; | ||
|
|
||
| // Reads a PostgreSQL TIME field from a COPY binary stream and appends it to an | ||
| // Arrow time column. | ||
| // | ||
| // TU is the Arrow time unit to convert to and OutT is the Arrow storage type: | ||
| // int32_t for TIME32 (second/milli) and int64_t for TIME64 (micro/nano). | ||
| // Conversions to second/milli truncate any sub-unit remainder. | ||
| template <enum ArrowTimeUnit TU, typename OutT> | ||
| class PostgresCopyTimeOfDayFieldReader : public PostgresCopyFieldReader { | ||
| public: | ||
| // Decodes one field of `field_size_bytes` bytes from `data`. | ||
| // A size <= 0 appends a null to `array`. Otherwise the payload must be exactly | ||
| // 8 bytes and lie in [0, kUsecsPerDay]; violations set `error` and return EINVAL. | ||
| // On success the converted value is appended to the data buffer and the slot is | ||
| // marked valid via AppendValid(). | ||
| ArrowErrorCode Read(ArrowBufferView* data, int32_t field_size_bytes, ArrowArray* array, | ||
| ArrowError* error) override { | ||
| if (field_size_bytes <= 0) { | ||
| return ArrowArrayAppendNull(array, 1); | ||
| } | ||
|
|
||
| // PostgreSQL TIME binary payload is int64 microseconds since midnight. https://www.postgresql.org/docs/current/datatype-datetime.html | ||
| if (field_size_bytes != static_cast<int32_t>(sizeof(int64_t))) { | ||
| ArrowErrorSet(error, "Expected field with %d bytes but found field with %d bytes", | ||
| static_cast<int>(sizeof(int64_t)), | ||
| static_cast<int>(field_size_bytes)); // NOLINT(runtime/int) | ||
| return EINVAL; | ||
| } | ||
|
|
||
| const int64_t time_usec = ReadUnsafe<int64_t>(data); | ||
|
|
||
| // PostgreSQL time_recv validates microseconds since midnight (0..USECS_PER_DAY). | ||
| // The upper bound is inclusive because 24:00:00 is a valid TIME value. | ||
| // Every conversion below relies on this check to stay within range. | ||
| if (time_usec < 0 || time_usec > kUsecsPerDay) { | ||
| ArrowErrorSet(error, | ||
| "[libpq] TIME value %" PRId64 | ||
| " usec is out of range [0, %" PRId64 "]", | ||
| time_usec, kUsecsPerDay); | ||
| return EINVAL; | ||
| } | ||
|
|
||
| // Convert to the Arrow unit requested by the schema. | ||
| // Arrow TIME32 uses int32 in seconds or milliseconds; TIME64 uses int64 in microseconds or nanoseconds. | ||
| int64_t out64 = 0; | ||
| switch (TU) { | ||
| case NANOARROW_TIME_UNIT_SECOND: | ||
| out64 = time_usec / 1000000LL; | ||
| break; | ||
| case NANOARROW_TIME_UNIT_MILLI: | ||
| out64 = time_usec / 1000LL; | ||
| break; | ||
| case NANOARROW_TIME_UNIT_MICRO: | ||
| out64 = time_usec; | ||
| break; | ||
| case NANOARROW_TIME_UNIT_NANO: | ||
| // At most kUsecsPerDay * 1000 (8.64e13), far below the int64 limit. | ||
| out64 = time_usec * 1000LL; | ||
| break; | ||
| } | ||
|
|
||
| if constexpr (std::is_same<OutT, int32_t>::value) { | ||
| // TIME32 storage. After the range check the converted value is at most | ||
| // 86,400,000 (milliseconds per day), so narrowing to int32 cannot overflow. | ||
| // The assertions prove this at compile time instead of testing every row. | ||
| static_assert(TU == NANOARROW_TIME_UNIT_SECOND || TU == NANOARROW_TIME_UNIT_MILLI, | ||
| "int32_t output is only valid for second/millisecond units"); | ||
| static_assert(kUsecsPerDay / 1000LL <= (std::numeric_limits<int32_t>::max)(), | ||
| "milliseconds per day must fit in int32_t"); | ||
| const int32_t out32 = static_cast<int32_t>(out64); | ||
| NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_, &out32, sizeof(out32))); | ||
| } else { | ||
| // out64 already has the TIME64 storage type, so it is appended as-is. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: redundant cast?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I removed the symmetry and avoided the redundant cast. |
||
| NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_, &out64, sizeof(out64))); | ||
| } | ||
|
|
||
| return AppendValid(array); | ||
| } | ||
| }; | ||
|
|
||
| // Reader for Pg->Arrow conversions whose Arrow representation is simply the | ||
| // bytes of the field representation. This can be used with binary and string | ||
| // Arrow types and any Postgres type. | ||
|
|
@@ -935,11 +1010,43 @@ static inline ArrowErrorCode MakeCopyFieldReader( | |
| return NANOARROW_OK; | ||
| } | ||
|
|
||
| case NANOARROW_TYPE_TIME32: { | ||
| switch (pg_type.type_id()) { | ||
| case PostgresTypeId::kTime: | ||
| switch (schema_view.time_unit) { | ||
| case NANOARROW_TIME_UNIT_SECOND: | ||
| *out = std::make_unique< | ||
| PostgresCopyTimeOfDayFieldReader<NANOARROW_TIME_UNIT_SECOND, int32_t>>(); | ||
| return NANOARROW_OK; | ||
| case NANOARROW_TIME_UNIT_MILLI: | ||
| *out = std::make_unique< | ||
| PostgresCopyTimeOfDayFieldReader<NANOARROW_TIME_UNIT_MILLI, int32_t>>(); | ||
| return NANOARROW_OK; | ||
| default: | ||
| // TIME32 only supports second/milli in Arrow. [3](https://arrow.apache.org/docs/cpp/api/datatype.html) | ||
| return ErrorCantConvert(error, pg_type, schema_view); | ||
| } | ||
| default: | ||
| return ErrorCantConvert(error, pg_type, schema_view); | ||
| } | ||
| } | ||
|
|
||
| case NANOARROW_TYPE_TIME64: { | ||
| switch (pg_type.type_id()) { | ||
| case PostgresTypeId::kTime: | ||
| *out = std::make_unique<PostgresCopyNetworkEndianFieldReader<int64_t>>(); | ||
| return NANOARROW_OK; | ||
| switch (schema_view.time_unit) { | ||
| case NANOARROW_TIME_UNIT_MICRO: | ||
| *out = std::make_unique< | ||
| PostgresCopyTimeOfDayFieldReader<NANOARROW_TIME_UNIT_MICRO, int64_t>>(); | ||
| return NANOARROW_OK; | ||
| case NANOARROW_TIME_UNIT_NANO: | ||
| *out = std::make_unique< | ||
| PostgresCopyTimeOfDayFieldReader<NANOARROW_TIME_UNIT_NANO, int64_t>>(); | ||
| return NANOARROW_OK; | ||
| default: | ||
| // TIME64 only supports micro/nano in Arrow. [3](https://arrow.apache.org/docs/cpp/api/datatype.html) | ||
| return ErrorCantConvert(error, pg_type, schema_view); | ||
| } | ||
| default: | ||
| return ErrorCantConvert(error, pg_type, schema_view); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -735,6 +735,50 @@ class PostgresCopyTimestampFieldWriter : public PostgresCopyFieldWriter { | |
| } | ||
| }; | ||
|
|
||
| // Microseconds in one 24-hour day (86400 s * 1e6); inclusive upper bound for TIME values accepted by the writer below. NOTE(review): the COPY reader declares a constant with the same name; confirm both headers can be included in one translation unit. | ||
| constexpr int64_t kUsecsPerDay = 86400LL * 1000000LL; | ||
|
|
||
| // Writes one Arrow time value as a PostgreSQL TIME field in COPY binary format. | ||
| // | ||
| // TU is the Arrow time unit of the source column. Values are converted to | ||
| // microseconds since midnight; nanosecond inputs are truncated to microseconds. | ||
| template <enum ArrowTimeUnit TU> | ||
| class PostgresCopyTimeFieldWriter : public PostgresCopyFieldWriter { | ||
| public: | ||
| // Appends the 4-byte field length and the 8-byte microsecond value for row | ||
| // `index` to `buffer`. Values outside [0, kUsecsPerDay] set `error` and return | ||
| // ADBC_STATUS_INVALID_ARGUMENT without writing anything. | ||
| ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override { | ||
| const int64_t raw_value = ArrowArrayViewGetIntUnsafe(array_view_, index); | ||
| int64_t micros = 0; | ||
|
|
||
| // NOTE(review): second/milli inputs are expected to come from 32-bit TIME32 | ||
| // storage, where scaling up to microseconds cannot overflow int64_t. | ||
| switch (TU) { | ||
| case NANOARROW_TIME_UNIT_SECOND: | ||
| micros = raw_value * 1000000LL; | ||
| break; | ||
| case NANOARROW_TIME_UNIT_MILLI: | ||
| micros = raw_value * 1000LL; | ||
| break; | ||
| case NANOARROW_TIME_UNIT_MICRO: | ||
| micros = raw_value; | ||
| break; | ||
| case NANOARROW_TIME_UNIT_NANO: | ||
| micros = raw_value / 1000LL; | ||
| break; | ||
| } | ||
|
|
||
| // Validate before emitting any bytes so a rejected row leaves `buffer` untouched. | ||
| // The sign is checked on raw_value, not micros: the nanosecond division truncates | ||
| // toward zero, so an input in (-1000, 0) would otherwise become 0 and be accepted. | ||
| if (raw_value < 0 || micros > kUsecsPerDay) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm. If we assume the Arrow data isn't necessarily valid, don't we have to watch for overflow when we do the multiplication above? Or if we do assume the data is valid, then this can't happen, right?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I reused the overflow validation logic of Duration or Timestamp |
||
| ArrowErrorSet(error, | ||
| "[libpq] Row %" PRId64 | ||
| " time value %" PRId64 " (unit %d) -> %" PRId64 | ||
| " microseconds is out of range [0, %" PRId64 "]", | ||
| index, raw_value, static_cast<int>(TU), micros, kUsecsPerDay); | ||
| return ADBC_STATUS_INVALID_ARGUMENT; | ||
| } | ||
|
|
||
| // PostgreSQL TIME binary format is an int64 microseconds-since-midnight | ||
| // and the COPY binary field length must be 8 bytes. https://www.postgresql.org/docs/current/datatype-datetime.html | ||
| constexpr int32_t field_size_bytes = sizeof(int64_t); | ||
| NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error)); | ||
| NANOARROW_RETURN_NOT_OK(WriteChecked<int64_t>(buffer, micros, error)); | ||
| return ADBC_STATUS_OK; | ||
| } | ||
| }; | ||
|
|
||
| static inline ArrowErrorCode MakeCopyFieldWriter( | ||
| struct ArrowSchema* schema, struct ArrowArrayView* array_view, | ||
| const PostgresTypeResolver& type_resolver, | ||
|
|
@@ -773,12 +817,34 @@ static inline ArrowErrorCode MakeCopyFieldWriter( | |
| *out = T::Create<T>(array_view); | ||
| return NANOARROW_OK; | ||
| } | ||
| case NANOARROW_TYPE_TIME32: { | ||
| switch (schema_view.time_unit) { | ||
| case NANOARROW_TIME_UNIT_SECOND: { | ||
| using T = PostgresCopyTimeFieldWriter<NANOARROW_TIME_UNIT_SECOND>; | ||
| *out = T::Create<T>(array_view); | ||
| return NANOARROW_OK; | ||
| } | ||
| case NANOARROW_TIME_UNIT_MILLI: { | ||
| using T = PostgresCopyTimeFieldWriter<NANOARROW_TIME_UNIT_MILLI>; | ||
| *out = T::Create<T>(array_view); | ||
| return NANOARROW_OK; | ||
| } | ||
| default: | ||
| return ADBC_STATUS_NOT_IMPLEMENTED; | ||
| } | ||
| } | ||
| case NANOARROW_TYPE_TIME64: { | ||
| switch (schema_view.time_unit) { | ||
| case NANOARROW_TIME_UNIT_MICRO: | ||
| using T = PostgresCopyNetworkEndianFieldWriter<int64_t>; | ||
| case NANOARROW_TIME_UNIT_MICRO: { | ||
| using T = PostgresCopyTimeFieldWriter<NANOARROW_TIME_UNIT_MICRO>; | ||
| *out = T::Create<T>(array_view); | ||
| return NANOARROW_OK; | ||
| } | ||
| case NANOARROW_TIME_UNIT_NANO: { | ||
| using T = PostgresCopyTimeFieldWriter<NANOARROW_TIME_UNIT_NANO>; | ||
| *out = T::Create<T>(array_view); | ||
| return NANOARROW_OK; | ||
| } | ||
| default: | ||
| return ADBC_STATUS_NOT_IMPLEMENTED; | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -16,6 +16,29 @@ | |||||
| // under the License. | ||||||
|
|
||||||
|
|
||||||
| // part: metadata | ||||||
| // part: expected_schema | ||||||
|
|
||||||
| skip = "COPY Writer not implemented" | ||||||
| { | ||||||
| "format": "+s", | ||||||
| "children": [ | ||||||
| { | ||||||
| "name": "idx", | ||||||
| "format": "l", | ||||||
| "flags": ["nullable"] | ||||||
| }, | ||||||
| { | ||||||
| "name": "value", | ||||||
| "format": "ttm", | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
We always read microseconds, so let's expect microseconds (also the values below need to be adjusted)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
| "flags": ["nullable"] | ||||||
| } | ||||||
| ] | ||||||
| } | ||||||
|
|
||||||
| // part: expected | ||||||
|
|
||||||
| {"idx": 0, "value": null} | ||||||
| {"idx": 1, "value": 0} | ||||||
| {"idx": 2, "value": 1} | ||||||
| {"idx": 3, "value": 3723123} | ||||||
| {"idx": 4, "value": 86399999} | ||||||
| {"idx": 5, "value": 86400000} | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
|
|
||
| // part: expected_schema | ||
|
|
||
| { | ||
| "format": "+s", | ||
| "children": [ | ||
| { | ||
| "name": "idx", | ||
| "format": "l", | ||
| "flags": ["nullable"] | ||
| }, | ||
| { | ||
| "name": "value", | ||
| "format": "ttu", | ||
| "flags": ["nullable"] | ||
| } | ||
| ] | ||
| } | ||
|
|
||
| // part: expected | ||
|
|
||
| {"idx": 0, "value": null} | ||
| {"idx": 1, "value": 0} | ||
| {"idx": 2, "value": 1} | ||
| {"idx": 3, "value": 3723123456} | ||
| {"idx": 4, "value": 86399999999} | ||
| {"idx": 5, "value": 86400000000} | ||
|
Comment on lines
+37
to
+44
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It appears these values do not line up with what's expected (https://github.com/adbc-drivers/validation/blob/main/adbc_drivers_validation/queries/ingest/time_us.txtcase). Frankly, there should be no need to override this case? |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,24 @@ | |
| // under the License. | ||
|
|
||
|
|
||
| // part: metadata | ||
| // part: expected_schema | ||
|
|
||
| skip = "COPY Writer not implemented" | ||
| { | ||
| "format": "+s", | ||
| "children": [ | ||
| { | ||
| "name": "value", | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Field name should be "res". |
||
| "format": "ttm", | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I expect you will need microseconds here and below. |
||
| "flags": ["nullable"] | ||
| } | ||
| ] | ||
| } | ||
|
|
||
| // part: expected | ||
|
|
||
| {"value": null} | ||
| {"value": 0} | ||
| {"value": 0} | ||
| {"value": 3723123} | ||
| {"value": 86399999} | ||
| {"value": 86400000} | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this possible given we're already validating that the value fits in 24 hours?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, this should never happen.