Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ omit =
*__init__*

[report]
fail_under = 85
fail_under = 90
5 changes: 2 additions & 3 deletions dynamicio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""A package for wrapping your I/O operations."""

# pylint: disable=abstract-method
import os
from contextlib import suppress

Expand All @@ -9,7 +8,7 @@
try:
from importlib.metadata import PackageNotFoundError, version
except ImportError:
from importlib_metadata import PackageNotFoundError, version # type: ignore
from importlib_metadata import PackageNotFoundError, version

with suppress(Exception):
try:
Expand All @@ -24,7 +23,7 @@
os.environ["LC_CTYPE"] = "en_US.UTF" # Set your locale to a unicode-compatible one


class UnifiedIO(WithS3File, WithS3PathPrefix, WithLocalBatch, WithLocal, WithKafka, WithAthena, WithPostgres, DynamicDataIO): # type: ignore
class UnifiedIO(WithS3File, WithS3PathPrefix, WithLocalBatch, WithLocal, WithKafka, WithAthena, WithPostgres, DynamicDataIO):
"""A unified io composed of dynamicio.mixins."""


Expand Down
1 change: 1 addition & 0 deletions dynamicio/config/pydantic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def _validate_bindings(cls, value: Mapping):
def update_config_refs(self) -> "BindingsYaml":
"""Updates dynamic parts of the config.

Specifically:
- Configure _parent for all `IOEnvironment`s
- Replace all IOSchemaRef with actual schema objects
"""
Expand Down
4 changes: 2 additions & 2 deletions dynamicio/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Implements the DynamicDataIO class which provides functionality for data: loading; sinking, and; schema validation."""

# pylint: disable=too-many-positional-arguments, disable=no-member
# pylint: disable=no-member
__all__ = ["DynamicDataIO", "SCHEMA_FROM_FILE", "CASTING_WARNING_MSG"]

import asyncio
Expand All @@ -9,7 +9,7 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple

import pandas as pd # type: ignore
import pandas as pd
import pydantic
from magic_logger import logger

Expand Down
1 change: 1 addition & 0 deletions dynamicio/errors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Hosts exception implementations for different errors."""

# pylint: disable=missing-module-docstring, missing-class-docstring, missing-function-docstring, super-init-not-called

__all__ = [
"DynamicIOError",
"DataSourceError",
Expand Down
64 changes: 34 additions & 30 deletions dynamicio/mixins/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,59 +7,58 @@
from contextlib import contextmanager
from enum import Enum
from functools import wraps
from types import FunctionType, MethodType
from typing import Any, Collection, Iterable, Mapping, MutableMapping, Optional, Union
from typing import Any, Callable, Collection, Iterable, Mapping, MutableMapping, Optional, Union

from magic_logger import logger


def allow_options(options: Union[Iterable, FunctionType, MethodType]):
"""Validate **options for a decorated reader function.
def allow_options(options: Union[Iterable[str], Callable]):
"""Decorator to filter **kwargs passed to a function, allowing only valid ones.

Args:
options: A set of valid options for a reader (e.g. `pandas.read_parquet` or `pandas.read_csv`)
options: A list of valid options or a callable that returns a list of valid options.

Returns:
read_with_valid_options: The input function called with modified options.
Callable: A decorator that filters **kwargs passed to the function.
"""

def _filter_out_irrelevant_options(kwargs: Mapping, valid_options: Iterable):
filtered_options = {}
invalid_options = {}
for key_arg in kwargs.keys():
if key_arg in valid_options:
filtered_options[key_arg] = kwargs[key_arg]
for key, val in kwargs.items():
if key in valid_options:
filtered_options[key] = val
else:
invalid_options[key_arg] = kwargs[key_arg]
if len(invalid_options) > 0:
logger.warning(
f"Options {invalid_options} were not used because they were not supported by the read or write method configured for this source. "
"Check if you expected any of those to have been used by the operation!"
)
invalid_options[key] = val
if invalid_options:
logger.warning(f"Options {invalid_options} were not used because they are not supported by this operation. " f"Review your kwargs!")
return filtered_options

def read_with_valid_options(func):
@wraps(func)
def _(*args, **kwargs):
if callable(options):
return func(*args, **_filter_out_irrelevant_options(kwargs, args_of(options)))
return func(*args, **_filter_out_irrelevant_options(kwargs, options))
def wrapper(*args, **kwargs):
valid = args_of(options) if callable(options) else set(options)
return func(*args, **_filter_out_irrelevant_options(kwargs, valid))

return _
return wrapper

return read_with_valid_options


def args_of(func):
"""Retrieve allowed options for a given function.
def args_of(*funcs) -> set[str]:
"""Retrieve a set of accepted keyword arguments from one or more functions.

Args:
func: A function like, e.g., pd.read_csv
funcs: A list of functions to inspect.

Returns:
A set of allowed options
set[str]: A set of accepted keyword arguments.
"""
return set(inspect.signature(func).parameters.keys())
allowed_args = set()
for func in funcs:
sig = inspect.signature(func)
allowed_args.update(sig.parameters.keys())
return allowed_args


def get_string_template_field_names(s: str) -> Collection[str]: # pylint: disable=C0103
Expand All @@ -68,13 +67,12 @@ def get_string_template_field_names(s: str) -> Collection[str]: # pylint: disab
If `s` is not a string template, the returned `Collection` is empty.

Args:
s:
s: A string which is either a template, e.g. /path/to/file/{replace_me}.h5 or just a path /path/to/file/dont_replace_me.h5

Returns:
Collection[str]

Example:

>>> get_string_template_field_names("abc{def}{efg}")
["def", "efg"]
>>> get_string_template_field_names("{0}-{1}")
Expand Down Expand Up @@ -103,8 +101,7 @@ def resolve_template(path: str, options: MutableMapping[str, Any]) -> str: # py
str: Returns a static path replaced with the value in the options mapping.

Raises:
ValueError: if any template fields in s are not named using valid Python identifiers
ValueError: if a given template field cannot be resolved in `options`
ValueError: if any template fields in s are not named using valid Python identifiers or if a given template field cannot be resolved in `options`
"""
fields = get_string_template_field_names(path)

Expand Down Expand Up @@ -144,5 +141,12 @@ def pickle_protocol(protocol: Optional[int]):


def get_file_type_value(file_type: Union[str, Enum]) -> str:
"""Get the value of the file type."""
"""Get the value of the file type.

Args:
file_type: The file type, which can be a string or an Enum.

Returns:
str: The value of the file type.
"""
return file_type.value if isinstance(file_type, Enum) else file_type
10 changes: 9 additions & 1 deletion dynamicio/mixins/with_athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@


class WithAthena:
"""Handles I/O operations for AWS Athena."""
"""Handles I/O operations for AWS Athena.

Note:
The `__abstractmethods__ = frozenset()` is used to silence false positives from pylint,
which might wrongly assume this class has abstract methods due to NotImplementedError.
This class is *not* an abstract base class and does not use `abc.ABC`.
"""

__abstractmethods__ = frozenset()
Comment on lines +22 to +30

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes a pylint false positive on abstract methods not implemented...


sources_config: AthenaDataEnvironment
options: MutableMapping[str, Any]
Expand Down
7 changes: 7 additions & 0 deletions dynamicio/mixins/with_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,15 @@ class WithKafka:
>>> {"key": "key-02", "value": {"bar": 1000, "baz": "ABC", "foo": "id_2", "id": "cm_2", "new_field": "new_value"}},
>>> {"key": "key-03", "value": {"bar": 1000, "baz": "ABC", "foo": "id_3", "id": "cm_3", "new_field": "new_value"}},
>>> ]

Note:
The `__abstractmethods__ = frozenset()` is used to silence false positives from pylint,
which might wrongly assume this class has abstract methods due to NotImplementedError.
This class is *not* an abstract base class and does not use `abc.ABC`.
"""

__abstractmethods__ = frozenset()

sources_config: KafkaDataEnvironment
schema: DataframeSchema
options: MutableMapping[str, Any]
Expand Down
73 changes: 58 additions & 15 deletions dynamicio/mixins/with_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
from threading import Lock
from typing import Any, MutableMapping

import pandas as pd # type: ignore
from fastparquet import ParquetFile, write # type: ignore
from pyarrow.parquet import read_table, write_table # type: ignore # pylint: disable=no-name-in-module
import pandas as pd
from fastparquet import ParquetFile, write
from magic_logger import logger
from pyarrow.parquet import read_table, write_table

# Application Imports
from dynamicio.config.pydantic import DataframeSchema, LocalBatchDataEnvironment, LocalDataEnvironment
from dynamicio.mixins import utils
from dynamicio.mixins.utils import get_file_type_value
Expand All @@ -32,8 +34,10 @@ def _read_from_local(self) -> pd.DataFrame:
- `file_path`
- `file_type`

To actually read the file, a method is dynamically invoked by name, using
"_read_{file_type}_file".
To actually read the file, a method is dynamically invoked by name, using "_read_{file_type}_file".

Additional options:
- single_record: bool: used for json files. If True, treats the file as a single JSON object instead of a list of records.

Returns:
DataFrame
Expand Down Expand Up @@ -106,23 +110,49 @@ def _read_csv_file(file_path: str, schema: DataframeSchema, **options: Any) -> p
return pd.read_csv(file_path, **options)

@staticmethod
@utils.allow_options(pd.read_json)
@utils.allow_options([*utils.args_of(pd.read_json), *["single_record"]])
def _read_json_file(file_path: str, schema: DataframeSchema, **options: Any) -> pd.DataFrame:
"""Read a json file as a DataFrame using `pd.read_hdf`.

All `options` are passed directly to `pd.read_hdf`.

Args:
file_path:
options:
file_path: The path to the json file to be read.
options: The pandas `read_json` options.

Returns:
DataFrame
DataFrame: The dataframe read from the json file.
"""
df = pd.read_json(file_path, **options)
columns = [column for column in df.columns.to_list() if column in schema.column_names]
df = df[columns]
return df
user_orient = options.pop("orient", None)
user_lines = options.pop("lines", None)

if user_orient is not None and user_orient != "records":
raise ValueError("[local-json] Unsupported orient='{user_orient}'. Only 'records' orientation is supported.")

if user_lines is not None and user_lines is not False:
logger.warning("[local-json-read] Overriding lines=%s with lines=False for consistency with aws-wrangler expectations.", user_lines)

if options.get("convert_dates") is True:
logger.warning("[local-json-read] Ignoring 'convert_dates=True'. Handle datetime parsing post-read.")
options.pop("convert_dates", None)

is_single_record = options.pop("single_record", False)
df = pd.read_json(file_path, orient="records", convert_dates=False, lines=False, **options)

# 🧼 Check if this is a single-record json file
if is_single_record:
# Re-wrap as single dict row β€” i.e., rehydrate the record
df = pd.DataFrame([{df.columns[0]: dict(zip(df.index, df.iloc[:, 0]))}])
elif (
df.shape[1] == 1
and df.columns.dtype == "object"
and df.index.dtype == "object"
and all(isinstance(i, str) for i in df.index)
and all(isinstance(v, (str, int, float, bool, type(None))) for v in df.iloc[:, 0])
):
logger.warning("[local-json-read] File appears to be a single-record JSON object. Pass 'single_record=True' in options to handle this case.")
Comment on lines +142 to +153

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Necessary for alignment with wrangler.read_json() behaviour.


return df[[col for col in df.columns if col in schema.column_names]]

@staticmethod
def _read_parquet_file(file_path: str, schema: DataframeSchema, **options: Any) -> pd.DataFrame:
Expand Down Expand Up @@ -192,7 +222,10 @@ def _write_csv_file(df: pd.DataFrame, file_path: str, **options: Any):
@staticmethod
@utils.allow_options(pd.DataFrame.to_json)
def _write_json_file(df: pd.DataFrame, file_path: str, **options: Any):
"""Write a dataframe as a json file using `df.to_json`.
"""Writes a JSON file using 'records' orientation with lines=True.

If the user provides an unsupported `orient`, raise an error.
This mirrors wr.s3.to_json and guarantees tabular consistency.

All `options` are passed directly to `df.to_json`.

Expand All @@ -201,6 +234,16 @@ def _write_json_file(df: pd.DataFrame, file_path: str, **options: Any):
file_path: The location where the file needs to be written.
options: Options relative to writing a json file.
"""
user_orient = options.pop("orient", None)
user_lines = options.pop("lines", None)

if user_orient is not None and user_orient != "records":
raise ValueError(
f"[local-json] Unsupported orient='{user_orient}'. Only 'records' orientation is supported for tabular output (imposed for aws-wrangler consistency reasons)."
)
if user_lines is not None and user_lines is not True:
logger.warning("[local-json-write] Overriding lines=%s with lines=True for consistency.", user_lines)

df.to_json(file_path, **options)

@staticmethod
Expand Down Expand Up @@ -283,6 +326,6 @@ def _read_from_local_batch(self) -> pd.DataFrame:
dfs_to_concatenate = []
for file in files:
file_to_load = os.path.join(file_path, file)
dfs_to_concatenate.append(getattr(self, f"_read_{file_type}_file")(file_to_load, self.schema, **self.options)) # type: ignore
dfs_to_concatenate.append(getattr(self, f"_read_{file_type}_file")(file_to_load, self.schema, **self.options))

return pd.concat(dfs_to_concatenate).reset_index(drop=True)
Loading
Loading