Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 85 additions & 4 deletions go/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,21 @@ func (c *connectionImpl) toArrowField(columnInfo driverbase.ColumnInfo) arrow.Fi
field.Type = arrow.FixedWidthTypes.Timestamp_ns
}
case "GEOGRAPHY":
fallthrough
// With GEOGRAPHY_OUTPUT_FORMAT=WKB, data arrives as binary WKB.
// GEOGRAPHY is always WGS84 (SRID 4326).
field.Type = arrow.BinaryTypes.Binary
field.Metadata = arrow.MetadataFrom(map[string]string{
"ARROW:extension:name": "geoarrow.wkb",
"ARROW:extension:metadata": `{"crs":"EPSG:4326"}`,
})
case "GEOMETRY":
field.Type = arrow.BinaryTypes.String
// With GEOMETRY_OUTPUT_FORMAT=WKB, data arrives as binary WKB.
// TODO: SRID for GEOMETRY requires inspecting data or a separate query.
// Same cross-driver issue as adbc-drivers/redshift#2 and adbc-drivers/databricks#350.
field.Type = arrow.BinaryTypes.Binary
field.Metadata = arrow.MetadataFrom(map[string]string{
"ARROW:extension:name": "geoarrow.wkb",
})
case "VECTOR":
// despite the fact that Snowflake *does* support returning data
// for VECTOR typed columns as Arrow FixedSizeLists, there's no way
Expand Down Expand Up @@ -559,9 +571,16 @@ func descToField(name, typ, isnull, primary string, comment sql.NullString, maxT
case "VARIANT":
field.Type = arrow.BinaryTypes.String
case "GEOGRAPHY":
fallthrough
field.Type = arrow.BinaryTypes.Binary
field.Metadata = arrow.MetadataFrom(map[string]string{
"ARROW:extension:name": "geoarrow.wkb",
"ARROW:extension:metadata": `{"crs":"EPSG:4326"}`,
})
case "GEOMETRY":
field.Type = arrow.BinaryTypes.String
field.Type = arrow.BinaryTypes.Binary
field.Metadata = arrow.MetadataFrom(map[string]string{
"ARROW:extension:name": "geoarrow.wkb",
})
case "BOOLEAN":
field.Type = arrow.FixedWidthTypes.Boolean
default:
Expand Down Expand Up @@ -623,6 +642,68 @@ func descToField(name, typ, isnull, primary string, comment sql.NullString, maxT
return
}

// detectGeoColumnsFromQuery attempts to extract a table name from a SQL query
// and runs DESC TABLE on it to identify GEOGRAPHY/GEOMETRY columns.
//
// It returns a map from column name (as reported by DESC TABLE) to its geo
// column type, or nil when no table reference can be extracted, the DESC TABLE
// statement fails, or the table has no geo columns.
//
// Detection is deliberately best-effort: every failure is swallowed and
// reported as nil, so the caller still gets correct data, just without
// geoarrow metadata. This works for simple table scans
// (SELECT ... FROM schema.table), which is the common case for adbc_scan.
// Aliased tables, subqueries, joins, CTEs and other arbitrary queries
// generally produce a reference that DESC TABLE rejects and therefore return
// nil. TODO: Support arbitrary queries.
func (c *connectionImpl) detectGeoColumnsFromQuery(ctx context.Context, query string) map[string]geoColumnType {
	tableName := geoScanTableName(query)
	if tableName == "" {
		return nil
	}

	// Run DESC TABLE to get the declared column types. The reference is taken
	// verbatim from the caller's own query text.
	rows, err := c.cn.QueryContext(ctx, "DESC TABLE "+tableName, nil)
	if err != nil {
		return nil
	}
	defer func() { _ = rows.Close() }()

	// The loop below reads the first two result columns as (name, type);
	// without both there is nothing to inspect.
	dest := make([]driver.Value, len(rows.Columns()))
	if len(dest) < 2 {
		return nil
	}

	geoCols := make(map[string]geoColumnType)
	for {
		// Any error from Next, including the end-of-rows signal, stops the
		// scan; whatever was collected so far is kept (best-effort).
		if err := rows.Next(dest); err != nil {
			break
		}
		name, _ := dest[0].(string)
		typ, _ := dest[1].(string)
		typ = strings.ToUpper(typ)

		// Prefix match so parameterized spellings of the type still count.
		switch {
		case strings.HasPrefix(typ, "GEOGRAPHY"):
			geoCols[name] = geoColumnGeography
		case strings.HasPrefix(typ, "GEOMETRY"):
			geoCols[name] = geoColumnGeometry
		}
	}

	if len(geoCols) == 0 {
		return nil
	}
	return geoCols
}

// geoScanTableName returns the text that follows the first standalone FROM
// keyword in query, cut at the next clause keyword (WHERE, ORDER, JOIN, ...)
// or at the end of the query. It returns "" when no FROM keyword is found or
// nothing follows it. The result is not validated as an identifier.
func geoScanTableName(query string) string {
	// All offsets below are computed on, and applied to, the same trimmed
	// string. The upper-cased copy only folds ASCII letters, so it has exactly
	// the same byte length and its offsets are valid for the original text.
	trimmed := strings.TrimSpace(query)
	upper := geoASCIIUpper(trimmed)

	const fromKw = "FROM "
	fromIdx := -1
	for off := 0; off < len(upper); {
		rel := strings.Index(upper[off:], fromKw)
		if rel == -1 {
			break
		}
		pos := off + rel
		// Skip matches that are the tail of a longer identifier, such as a
		// column named valid_from.
		if pos == 0 || !geoIsIdentByte(upper[pos-1]) {
			fromIdx = pos
			break
		}
		off = pos + 1
	}
	if fromIdx == -1 {
		return ""
	}

	rest := strings.TrimSpace(trimmed[fromIdx+len(fromKw):])
	upperRest := geoASCIIUpper(rest)

	// Cut at the earliest following clause keyword, if any.
	end := len(rest)
	for _, kw := range []string{" WHERE ", " ORDER ", " GROUP ", " HAVING ", " LIMIT ", " UNION ", " JOIN ", " LEFT ", " RIGHT ", " INNER ", " OUTER ", " CROSS "} {
		if idx := strings.Index(upperRest, kw); idx != -1 && idx < end {
			end = idx
		}
	}
	return strings.TrimSpace(rest[:end])
}

// geoASCIIUpper upper-cases only the ASCII letters of s, byte by byte, so the
// result always has the same length as s and byte offsets stay interchangeable
// between the two strings.
func geoASCIIUpper(s string) string {
	b := []byte(s)
	for i, ch := range b {
		if 'a' <= ch && ch <= 'z' {
			b[i] = ch - ('a' - 'A')
		}
	}
	return string(b)
}

// geoIsIdentByte reports whether ch can be part of an unquoted identifier
// immediately preceding a keyword: ASCII letters, digits, '_', '$', or any
// non-ASCII byte (treated conservatively as part of a name).
func geoIsIdentByte(ch byte) bool {
	switch {
	case 'A' <= ch && ch <= 'Z', 'a' <= ch && ch <= 'z', '0' <= ch && ch <= '9':
		return true
	case ch == '_', ch == '$', ch >= 0x80:
		return true
	}
	return false
}

func (c *connectionImpl) getStringQuery(query string) (value string, err error) {
result, err := c.cn.QueryContext(context.Background(), query, nil)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions go/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,22 @@ func (d *databaseImpl) Open(ctx context.Context) (adbcConnection adbc.Connection
ctx, span := driverbase.StartSpan(ctx, "databaseImpl.Open", d)
defer driverbase.EndSpan(span, err)

// Set WKB output for geospatial columns so they arrive as binary WKB
// instead of GeoJSON strings. Geo column detection is done separately
// via DESCRIBE TABLE (catalog metadata is unaffected by output format).
// Note: Snowflake's REST API rowtype metadata reports "binary" instead of
// "geography"/"geometry" when WKB format is set — we've reported this to Snowflake.
if d.cfg.Params == nil {
d.cfg.Params = make(map[string]*string)
}
wkb := "WKB"
if _, ok := d.cfg.Params["GEOGRAPHY_OUTPUT_FORMAT"]; !ok {
d.cfg.Params["GEOGRAPHY_OUTPUT_FORMAT"] = &wkb
}
if _, ok := d.cfg.Params["GEOMETRY_OUTPUT_FORMAT"]; !ok {
d.cfg.Params["GEOMETRY_OUTPUT_FORMAT"] = &wkb
}
Comment on lines +543 to +549
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
wkb := "WKB"
if _, ok := d.cfg.Params["GEOGRAPHY_OUTPUT_FORMAT"]; !ok {
d.cfg.Params["GEOGRAPHY_OUTPUT_FORMAT"] = &wkb
}
if _, ok := d.cfg.Params["GEOMETRY_OUTPUT_FORMAT"]; !ok {
d.cfg.Params["GEOMETRY_OUTPUT_FORMAT"] = &wkb
}
if _, ok := d.cfg.Params["GEOGRAPHY_OUTPUT_FORMAT"]; !ok {
d.cfg.Params["GEOGRAPHY_OUTPUT_FORMAT"] = new("WKB")
}
if _, ok := d.cfg.Params["GEOMETRY_OUTPUT_FORMAT"]; !ok {
d.cfg.Params["GEOMETRY_OUTPUT_FORMAT"] = new("WKB")
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if a user explicitly sets this to something that is not WKB, we should avoid adding the GeoArrow type and metadata, right?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is an interesting point. Snowflake I believe is the only engine that allows you to configure how the output of geo data types are encoded.

My feeling is that if the query or table is returning a native geometry or geography type, we should always encode it as WKB and mark it as geoarrow. It is not as if you can have geoarrow encoded in different formats (well, you can, with WKB vs. native Arrow objects, but that’s another thing). The goal is to preserve the type, so I don’t think it makes sense to change that.

But if the user does st_astext(geom), then it becomes text and it should be returned as that. So essentially we behave like everybody else: you use functions to specify output formats (as_text, as_wkb, as_geojson, as_gml…).

So since this is adbc I would say no.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other words, we ignore it because we override whatever output format the user has specified.


connector := gosnowflake.NewConnector(drv, *d.cfg)

ctx = gosnowflake.WithArrowAllocator(
Expand Down
42 changes: 38 additions & 4 deletions go/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ import (

const MetadataKeySnowflakeType = "SNOWFLAKE_TYPE"

// geoColumnType identifies the Snowflake geospatial type of a column.
// Its zero value is geoColumnNone, so looking up a column that is absent from
// a map[string]geoColumnType yields geoColumnNone.
type geoColumnType int

const (
// geoColumnNone is the zero value: the column is not tagged as geospatial.
geoColumnNone geoColumnType = iota
geoColumnGeography // GEOGRAPHY — always WGS84/SRID 4326
geoColumnGeometry // GEOMETRY — SRID unknown without data inspection
)

func identCol(_ context.Context, a arrow.Array) (arrow.Array, error) {
a.Retain()
return a, nil
Expand Down Expand Up @@ -80,14 +89,39 @@ func getRecTransformer(sc *arrow.Schema, tr []colTransformer) recordTransformer
}
}

func getTransformer(sc *arrow.Schema, ld gosnowflake.ArrowStreamLoader, useHighPrecision bool, maxTimestampPrecision MaxTimestampPrecision) (*arrow.Schema, recordTransformer) {
func getTransformer(sc *arrow.Schema, ld gosnowflake.ArrowStreamLoader, useHighPrecision bool, maxTimestampPrecision MaxTimestampPrecision, geoCols map[string]geoColumnType) (*arrow.Schema, recordTransformer) {
loc, types := ld.Location(), ld.RowTypes()

fields := make([]arrow.Field, len(sc.Fields()))
transformers := make([]func(context.Context, arrow.Array) (arrow.Array, error), len(sc.Fields()))
for i, f := range sc.Fields() {
srcMeta := types[i]
originalArrowUnit := arrow.TimeUnit(srcMeta.Scale / 3)

// With GEOGRAPHY/GEOMETRY_OUTPUT_FORMAT=WKB, geo columns arrive as binary WKB
// but srcMeta.Type is "binary" (Snowflake REST API limitation). Use the geoCols
// map (from DESCRIBE TABLE) to identify them and tag with geoarrow.wkb metadata.
// Data is already WKB binary — no conversion needed, just pass through.
if geoType, ok := geoCols[f.Name]; ok && geoType != geoColumnNone {
f.Type = arrow.BinaryTypes.Binary
if geoType == geoColumnGeography {
f.Metadata = arrow.MetadataFrom(map[string]string{
"ARROW:extension:name": "geoarrow.wkb",
"ARROW:extension:metadata": `{"crs":"EPSG:4326"}`,
})
} else {
// TODO: GEOMETRY SRID requires inspecting data or a separate query.
// Same cross-driver issue as adbc-drivers/redshift#2 and
// adbc-drivers/databricks#350.
f.Metadata = arrow.MetadataFrom(map[string]string{
"ARROW:extension:name": "geoarrow.wkb",
})
}
transformers[i] = identCol
fields[i] = f
continue
}

switch strings.ToUpper(srcMeta.Type) {
case "FIXED":
switch f.Type.ID() {
Expand Down Expand Up @@ -551,7 +585,7 @@ type reader struct {
done chan struct{} // signals all producer goroutines have finished
}

func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake.ArrowStreamLoader, bufferSize, prefetchConcurrency int, useHighPrecision bool, maxTimestampPrecision MaxTimestampPrecision) (array.RecordReader, error) {
func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake.ArrowStreamLoader, bufferSize, prefetchConcurrency int, useHighPrecision bool, maxTimestampPrecision MaxTimestampPrecision, geoCols map[string]geoColumnType) (array.RecordReader, error) {
batches, err := ld.GetBatches()
if err != nil {
return nil, errToAdbcErr(adbc.StatusInternal, err)
Expand Down Expand Up @@ -671,7 +705,7 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
done: make(chan struct{}),
}
close(rdr.done) // No goroutines to wait for
rdr.schema, _ = getTransformer(schema, ld, useHighPrecision, maxTimestampPrecision)
rdr.schema, _ = getTransformer(schema, ld, useHighPrecision, maxTimestampPrecision, nil)
return rdr, nil
}

Expand Down Expand Up @@ -710,7 +744,7 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
}

var recTransform recordTransformer
rdr.schema, recTransform = getTransformer(rr.Schema(), ld, useHighPrecision, maxTimestampPrecision)
rdr.schema, recTransform = getTransformer(rr.Schema(), ld, useHighPrecision, maxTimestampPrecision, geoCols)

group.Go(func() (err error) {
defer rr.Release()
Expand Down
10 changes: 8 additions & 2 deletions go/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func (st *statement) ExecuteQuery(ctx context.Context) (reader array.RecordReade
return nil, err
}

reader, err = newRecordReader(ctx, st.alloc, loader, st.queueSize, st.prefetchConcurrency, st.useHighPrecision, st.maxTimestampPrecision)
reader, err = newRecordReader(ctx, st.alloc, loader, st.queueSize, st.prefetchConcurrency, st.useHighPrecision, st.maxTimestampPrecision, nil)
return reader, err
},
currentBatch: st.bound,
Expand All @@ -566,14 +566,20 @@ func (st *statement) ExecuteQuery(ctx context.Context) (reader array.RecordReade
return
}

// Detect geo columns before executing the query. For table scans,
// try to extract the table name and run DESCRIBE TABLE to identify
// GEOGRAPHY/GEOMETRY columns (catalog metadata is unaffected by WKB output format).
// TODO: Support arbitrary queries — currently only table scans get geoarrow metadata.
geoCols := st.cnxn.detectGeoColumnsFromQuery(ctx, st.query)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make this opt-in. It would also be good to document it in snowflake.md, and add validation cases for geometry/geography.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don’t know. It really feels like this should be fixed upstream, and I agree this is too much… affecting the entire driver for this issue is too much. Maybe this whole thing should not be fixed until Snowflake fixes it upstream. The workaround is too bad.


var loader gosnowflake.ArrowStreamLoader
loader, err = st.cnxn.cn.QueryArrowStream(ctx, st.query)
if err != nil {
err = errToAdbcErr(adbc.StatusInternal, err)
return
}

reader, err = newRecordReader(ctx, st.alloc, loader, st.queueSize, st.prefetchConcurrency, st.useHighPrecision, st.maxTimestampPrecision)
reader, err = newRecordReader(ctx, st.alloc, loader, st.queueSize, st.prefetchConcurrency, st.useHighPrecision, st.maxTimestampPrecision, geoCols)
nRows = loader.TotalRows()
return
}
Expand Down
Loading