3 changes: 2 additions & 1 deletion .github/workflows/java.yml
@@ -109,8 +109,9 @@ jobs:
SPARK_COMPAT_VERSION=${SPARK_VERSION:0:3}

if [[ "${SPARK_VERSION}" == "3.5"* ]] || [[ "${SPARK_VERSION}" == "4."* ]]; then
pip install pyspark==$SPARK_VERSION pandas shapely apache-sedona pyarrow
pip install pyspark==$SPARK_VERSION pandas shapely apache-sedona pyarrow geoarrow-pyarrow sedonadb
export SPARK_HOME=$(python -c "import pyspark; print(pyspark.__path__[0])")
(cd python; pip install --force-reinstall --no-deps -e .)
fi

mvn -q clean install -Dspark=${SPARK_COMPAT_VERSION} -Dscala=${SCALA_VERSION:0:4} -Dspark.version=${SPARK_VERSION} ${SKIP_TESTS}
2 changes: 1 addition & 1 deletion .github/workflows/pyflink.yml
@@ -70,7 +70,7 @@ jobs:
run: |
cd python
uv add apache-flink==1.20.1
uv sync
# uv sync --extra flink
Member Author: to remove

- name: Run PyFlink tests
run: |
wget -q https://repo1.maven.org/maven2/org/datasyslab/geotools-wrapper/1.8.0-33.1-rc1/geotools-wrapper-1.8.0-33.1-rc1.jar
2 changes: 1 addition & 1 deletion .github/workflows/python-extension.yml
@@ -51,7 +51,7 @@ jobs:
strategy:
matrix:
os: ['ubuntu-latest', 'windows-latest', 'macos-15']
python: ['3.11', '3.10', '3.9', '3.8']
python: ['3.11', '3.10', '3.9']
Member Author: I had trouble integrating with Python 3.8; it has been a year since it reached EOL. What would you think about removing it, and maybe starting to support Python 3.12 and 3.13?

runs-on: ${{ matrix.os }}
defaults:
run:
26 changes: 12 additions & 14 deletions .github/workflows/python.yml
@@ -86,10 +86,6 @@ jobs:
java: '11'
python: '3.9'
- spark: '3.5.0'
scala: '2.12.8'
java: '11'
python: '3.8'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
python: '3.11'
@@ -101,15 +97,6 @@
scala: '2.12.8'
java: '11'
python: '3.9'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
python: '3.8'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
python: '3.8'
shapely: '1'

steps:
- uses: actions/checkout@v6
@@ -181,7 +168,18 @@ jobs:
run: |
cd python
export SPARK_HOME=$(uv run python -c "import site; print(site.getsitepackages()[0]+'/pyspark')")
uv run pytest -v tests
uv run pytest -m "not vectorized" -v tests
- name: Run vectorized udf tests
if: ${{ matrix.spark < '4.0.0' }}
run: |
cd python
export SPARK_HOME=$(uv run python -c "import site; print(site.getsitepackages()[0]+'/pyspark')")
uv pip install --force-reinstall --no-deps -e .
uv remove apache-flink --optional flink
uv add "pyarrow>=16.0.0"
uv add "shapely>=2.0.0"
uv add sedonadb geopandas geoarrow-pyarrow
uv run pytest -m vectorized tests
- name: Run basic tests without rasterio
run: |
cd python
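A short, hedged sketch of how a test could opt into the "vectorized" marker that the workflow steps above filter on (pytest -m vectorized vs. pytest -m "not vectorized"); the marker registration location and the test body are assumptions for illustration, not part of this diff:

# Marker registration would typically live in pytest.ini or pyproject.toml, e.g.
#   [tool.pytest.ini_options]
#   markers = ["vectorized: tests requiring sedonadb/geoarrow vectorized UDF support"]
import pytest

@pytest.mark.vectorized
def test_vectorized_udf_roundtrip():
    # Collected only by the "Run vectorized udf tests" step (pytest -m vectorized);
    # the main test step excludes it via pytest -m "not vectorized".
    ...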
1 change: 1 addition & 0 deletions pom.xml
@@ -631,6 +631,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<!-- <version>3.12.0</version>-->
Member Author: to remove

<version>2.10.4</version>
<executions>
<execution>
4 changes: 2 additions & 2 deletions python/pyproject.toml
@@ -16,7 +16,7 @@
# under the License.

[build-system]
requires = ["setuptools>=69", "wheel"]
requires = ["setuptools", "wheel", "numpy"]
build-backend = "setuptools.build_meta"

[project]
@@ -26,7 +26,7 @@ description = "Apache Sedona is a cluster computing system for processing large-
readme = "README.md"
license = { text = "Apache-2.0" }
authors = [ { name = "Apache Sedona", email = "dev@sedona.apache.org" } ]
requires-python = ">=3.8"
requires-python = ">=3.9"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
87 changes: 84 additions & 3 deletions python/sedona/spark/sql/functions.py
@@ -21,12 +21,20 @@

import pandas as pd

from sedona.spark.sql.types import GeometryType
from sedona.spark.utils import geometry_serde
from pyspark.sql.udf import UserDefinedFunction
from pyspark.sql.types import DataType
from shapely.geometry.base import BaseGeometry
from pyspark.sql.udf import UserDefinedFunction
from sedona.spark.sql.types import GeometryType
from pyspark.sql.types import (
DataType,
FloatType,
DoubleType,
IntegerType,
StringType,
ByteType,
)

from sedona.spark.utils.geometry_serde import sedona_db_speedup_enabled

SEDONA_SCALAR_EVAL_TYPE = 5200
SEDONA_PANDAS_ARROW_NAME = "SedonaPandasArrowUDF"
@@ -142,3 +150,76 @@ def serialize_to_geometry_if_geom(data, return_type: DataType):
return geometry_serde.serialize(data)

return data


def infer_pa_type(spark_type: DataType):
import pyarrow as pa
import geoarrow.pyarrow as ga

if isinstance(spark_type, GeometryType):
return ga.wkb()
elif isinstance(spark_type, FloatType):
return pa.float32()
elif isinstance(spark_type, DoubleType):
return pa.float64()
elif isinstance(spark_type, IntegerType):
return pa.int32()
elif isinstance(spark_type, StringType):
return pa.string()
else:
raise NotImplementedError(f"Type {spark_type} is not supported yet.")


def infer_input_type(spark_type: DataType):
from sedonadb import udf as sedona_udf_module

if isinstance(spark_type, GeometryType):
return sedona_udf_module.GEOMETRY
elif (
isinstance(spark_type, FloatType)
or isinstance(spark_type, DoubleType)
or isinstance(spark_type, IntegerType)
):
return sedona_udf_module.NUMERIC
elif isinstance(spark_type, StringType):
return sedona_udf_module.STRING
elif isinstance(spark_type, ByteType):
return sedona_udf_module.BINARY
else:
raise NotImplementedError(f"Type {spark_type} is not supported yet.")


def infer_input_types(spark_types: list[DataType]):
pa_types = []
for spark_type in spark_types:
pa_type = infer_input_type(spark_type)
pa_types.append(pa_type)

return pa_types


def sedona_db_vectorized_udf(
return_type: DataType,
input_types: list[DataType],
):
from sedonadb import udf as sedona_udf_module

eval_type = 6200
if sedona_db_speedup_enabled:
eval_type = 6201
Comment on lines +207 to +209
Member: Define these eval types as constants, such as SQL_SCALAR_SEDONA_DB_UDF. I believe we should follow the same pattern as SEDONA_SCALAR_EVAL_TYPE.
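A minimal sketch of the suggestion above, assuming constant names that mirror the existing SEDONA_SCALAR_EVAL_TYPE pattern (the names are illustrative, not a final API; 6200/6201 are the values already used in this diff):

# Hypothetical module-level constants following the SEDONA_SCALAR_EVAL_TYPE pattern.
SQL_SCALAR_SEDONA_DB_UDF = 6200            # plain WKB-based path
SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF = 6201    # path taken when sedona_db_speedup_enabled is True

eval_type = (
    SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF
    if sedona_db_speedup_enabled
    else SQL_SCALAR_SEDONA_DB_UDF
)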


def apply_fn(fn):
out_type = infer_pa_type(return_type)
input_types_sedona_db = infer_input_types(input_types)

@sedona_udf_module.arrow_udf(out_type, input_types=input_types_sedona_db)
def shapely_udf(*args, **kwargs):
return fn(*args, **kwargs)

udf = UserDefinedFunction(
lambda: shapely_udf, return_type, "SedonaPandasArrowUDF", evalType=eval_type
)

return udf

return apply_fn
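A hedged usage sketch of the new decorator, assuming shapely 2.x, sedonadb, and geoarrow-pyarrow are installed and that arguments reach the wrapped function as array-like batches (the column names and function body are illustrative, not taken from this diff):

import shapely
from pyspark.sql.types import DoubleType
from sedona.spark.sql.types import GeometryType
from sedona.spark.sql.functions import sedona_db_vectorized_udf

@sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[GeometryType(), DoubleType()])
def buffered(geom, distance):
    # shapely 2.x ufuncs operate on whole arrays, so one call handles the batch.
    return shapely.buffer(geom, distance)

# df.select(buffered("geometry", "dist")) would then evaluate the UDF over Arrow batches.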
38 changes: 37 additions & 1 deletion python/sedona/spark/utils/geometry_serde.py
@@ -24,7 +24,7 @@
from shapely.geometry.base import BaseGeometry

speedup_enabled = False

sedona_db_speedup_enabled = False

# Use geomserde_speedup when available, otherwise fallback to general pure
# python implementation.
@@ -62,7 +62,14 @@ def deserialize(buf: bytearray) -> Optional[BaseGeometry]:
return None
return geomserde_speedup.deserialize(buf)

def to_sedona(arr):
return geomserde_speedup.to_sedona_func(arr)

def from_sedona(arr):
return geomserde_speedup.from_sedona_func(arr)

speedup_enabled = True
sedona_db_speedup_enabled = True

elif shapely.__version__.startswith("1."):
# Shapely 1.x uses ctypes.CDLL to load geos_c library. We can obtain the
@@ -123,14 +130,43 @@ def deserialize(buf: bytearray) -> Optional[BaseGeometry]:
ob.__dict__["_is_empty"] = False
return ob, bytes_read

warn(
"Optimized sedonadb vectorized functions are only available for shapely 2.x; using the fallback implementation."
)

def to_sedona(arr):
return shapely.to_wkb(arr)

def from_sedona(arr):
return shapely.from_wkb(arr)

speedup_enabled = True

else:

def to_sedona(arr):
return shapely.to_wkb(arr)

def from_sedona(arr):
return shapely.from_wkb(arr)

# fallback to our general pure python implementation
from .geometry_serde_general import deserialize, serialize


except Exception as e:
warn(
f"Cannot load geomserde_speedup, fallback to general python implementation. Reason: {e}"
)

warn(
f"Cannot load optimized version of sedonadb vectorized function, using fallback implementation. Reason: {e}"
)

def to_sedona(arr):
return shapely.to_wkb(arr)

def from_sedona(arr):
return shapely.from_wkb(arr)

from .geometry_serde_general import deserialize, serialize
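When the compiled geomserde_speedup module cannot be loaded, to_sedona and from_sedona fall back to WKB round-tripping via shapely. A small illustrative sketch of what that fallback amounts to, assuming shapely 2.x and numpy are available (the geometries are placeholders):

import numpy as np
import shapely
from shapely.geometry import Point

geoms = np.array([Point(0, 0), Point(1, 1)], dtype=object)
wkb = shapely.to_wkb(geoms)          # what the fallback to_sedona(arr) returns
restored = shapely.from_wkb(wkb)     # what the fallback from_sedona(arr) returns
assert all(a.equals(b) for a, b in zip(geoms, restored))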
16 changes: 16 additions & 0 deletions python/sedona/spark/worker/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.