Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ omit =
*__init__*

[report]
fail_under = 85
fail_under = 90
6 changes: 3 additions & 3 deletions dynamicio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""A package for wrapping your I/O operations."""
# pylint: disable=abstract-method

import os
from contextlib import suppress

Expand All @@ -8,7 +8,7 @@
try:
from importlib.metadata import PackageNotFoundError, version
except ImportError:
from importlib_metadata import PackageNotFoundError, version # type: ignore
from importlib_metadata import PackageNotFoundError, version

with suppress(Exception):
try:
Expand All @@ -23,7 +23,7 @@
os.environ["LC_CTYPE"] = "en_US.UTF" # Set your locale to a unicode-compatible one


class UnifiedIO(WithS3File, WithS3PathPrefix, WithLocalBatch, WithLocal, WithKafka, WithAthena, WithPostgres, DynamicDataIO): # type: ignore
class UnifiedIO(WithS3File, WithS3PathPrefix, WithLocalBatch, WithLocal, WithKafka, WithAthena, WithPostgres, DynamicDataIO):
"""A unified io composed of dynamicio.mixins."""


Expand Down
2 changes: 2 additions & 0 deletions dynamicio/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""Invokes dynamicio cli."""

# Application Imports
from dynamicio.cli import run

run()
2 changes: 2 additions & 0 deletions dynamicio/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Implements the dynamicio Command Line Interface (CLI)."""

import argparse
import glob
import os
Expand All @@ -8,6 +9,7 @@
import pandas as pd # type: ignore
import yaml

# Application Imports
from dynamicio.errors import InvalidDatasetTypeError


Expand Down
2 changes: 2 additions & 0 deletions dynamicio/config/io_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
}
}
"""

__all__ = ["IOConfig", "SafeDynamicResourceLoader", "SafeDynamicSchemaLoader"]

import re
Expand All @@ -62,6 +63,7 @@
import yaml
from magic_logger import logger

# Application Imports
from dynamicio.config.pydantic import BindingsYaml, IOEnvironment


Expand Down
3 changes: 2 additions & 1 deletion dynamicio/config/pydantic/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
"""Pydantic config models."""

# Application Imports
from dynamicio.config.pydantic.config import BindingsYaml
from dynamicio.config.pydantic.io_resources import (
AthenaDataEnvironment,
IOEnvironment,
KafkaDataEnvironment,
LocalBatchDataEnvironment,
LocalDataEnvironment,
PostgresDataEnvironment,
S3DataEnvironment,
S3PathPrefixEnvironment,
AthenaDataEnvironment
)
from dynamicio.config.pydantic.table_schema import DataframeSchema, SchemaColumn
10 changes: 7 additions & 3 deletions dynamicio/config/pydantic/config.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Pydantic schema for YAML files."""

# pylint: disable=no-member, no-self-argument, unused-argument
"""Pydantic schema for YAML files"""

from typing import Mapping, MutableMapping

import pydantic

# Application Imports
import dynamicio.config.pydantic.io_resources as env_spec


Expand All @@ -21,14 +23,16 @@ def _validate_bindings(cls, value: Mapping):
if not isinstance(value, Mapping):
raise ValueError(f"Bindings must be a mapping. (got {value!r} instead).")
# Tell each binding its name
for (name, sub_config) in value.items():
for name, sub_config in value.items():
if not isinstance(sub_config, MutableMapping):
raise ValueError(f"Each element for the name binding must be a dict. (got {sub_config!r} instead)")
sub_config["__binding_name__"] = name
return value

def update_config_refs(self) -> "BindingsYaml":
"""Updates dynamic parts of the config:
"""Updates dynamic parts of the config.

Specifically:
- Configure _parent for all `IOEnvironment`s
- Replace all IOSchemaRef with actual schema objects
"""
Expand Down
6 changes: 1 addition & 5 deletions dynamicio/config/pydantic/io_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,7 @@ class PostgresDataEnvironment(IOEnvironment):
class AthenaDataSubSection(BaseModel):
"""AWS Athena configuration section."""

s3_staging_dir: str
region_name: str

# Optional fields, one must be provided via YAML or mixin options
query: Optional[str] = None
s3_output: str


class AthenaDataEnvironment(IOEnvironment):
Expand Down
9 changes: 5 additions & 4 deletions dynamicio/config/pydantic/table_schema.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""This module defines Config schema for data source (pandas dataframe)."""

# pylint: disable=no-member, no-self-argument, unused-argument

"""This module defines Config schema for data source (pandas dataframe)"""

import enum
from typing import Mapping, Sequence
Expand Down Expand Up @@ -51,11 +52,11 @@ def is_valid_pandas_type(cls, info):

@pydantic.validator("validations", pre=True)
def remap_validations(cls, info):
"""Remap the yaml structure of {validation_type: <params>} to a list with validation_type as a key"""
"""Remap the yaml structure of {validation_type: <params>} to a list with validation_type as a key."""
if not isinstance(info, dict):
raise ValueError(f"{info!r} should be a dict")
out = []
for (key, params) in info.items():
for key, params in info.items():
new_el = params.copy()
new_el.update({"name": key})
out.append(new_el)
Expand All @@ -79,7 +80,7 @@ class DataframeSchema(pydantic.BaseModel):

@pydantic.validator("columns", pre=True)
def supply_column_names(cls, info):
"""Tell each column its name (the key it is listed under)"""
"""Tell each column its name (the key it is listed under)."""
if not isinstance(info, Mapping):
raise ValueError(f"{info!r} shoudl be a dict.")

Expand Down
5 changes: 3 additions & 2 deletions dynamicio/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Implements the DynamicDataIO class which provides functionality for data: loading; sinking, and; schema validation."""

# pylint: disable=no-member
__all__ = ["DynamicDataIO", "SCHEMA_FROM_FILE", "CASTING_WARNING_MSG"]

Expand All @@ -8,7 +9,7 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple

import pandas as pd # type: ignore
import pandas as pd
import pydantic
from magic_logger import logger

Expand Down Expand Up @@ -100,7 +101,7 @@ def _schema_from_obj(target) -> DataframeSchema:
- CAN have `schema_validations` and `schema_metrics` attributes
"""
col_info = {}
for (col_name, dtype) in target.schema.items():
for col_name, dtype in target.schema.items():
col_validations = {}
col_metrics = []
try:
Expand Down
2 changes: 2 additions & 0 deletions dynamicio/errors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Hosts exception implementations for different errors."""

# pylint: disable=missing-module-docstring, missing-class-docstring, missing-function-docstring, super-init-not-called

__all__ = [
"DynamicIOError",
"DataSourceError",
Expand Down
1 change: 1 addition & 0 deletions dynamicio/metrics.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""A module responsible for metrics generation and logging."""

# pylint: disable=missing-function-docstring,missing-class-docstring
import json
import logging
Expand Down
65 changes: 34 additions & 31 deletions dynamicio/mixins/utils.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,62 @@
"""Mixin utility functions."""
# pylint: disable=no-member, protected-access, too-few-public-methods

import inspect
import string
from contextlib import contextmanager
from enum import Enum
from functools import wraps
from types import FunctionType, MethodType
from typing import Any, Callable, Collection, Iterable, Mapping, MutableMapping, Optional, Set, Union

from magic_logger import logger


def allow_options(options: Union[Iterable[str], Callable]):
    """Build a decorator that drops unsupported ``**kwargs`` before calling the wrapped function.

    Args:
        options: Either an iterable of accepted option names, or a callable whose signature
            parameters (as reported by `args_of`) define the accepted option names.

    Returns:
        Callable: A decorator. The decorated function receives its positional arguments unchanged
        and only those keyword arguments whose names are accepted; rejected ones are reported
        through a single `logger.warning` call.
    """
    # An iterable of names is materialised once, at decoration time. A one-shot iterator
    # (e.g. a generator) would otherwise be consumed by the first call, after which every
    # option would be rejected. Callables are still resolved on each call through `args_of`.
    static_options = None if callable(options) else frozenset(options)

    def _filter_out_irrelevant_options(kwargs: Mapping, valid_options: Collection[str]):
        filtered_options = {key: val for key, val in kwargs.items() if key in valid_options}
        invalid_options = {key: val for key, val in kwargs.items() if key not in valid_options}
        if invalid_options:
            logger.warning(f"Options {invalid_options} were not used because they are not supported by this operation. Review your kwargs!")
        return filtered_options

    def read_with_valid_options(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            valid = args_of(options) if static_options is None else static_options
            return func(*args, **_filter_out_irrelevant_options(kwargs, valid))

        return wrapper

    return read_with_valid_options


def args_of(*funcs) -> Set[str]:
    """Collect the parameter names declared by one or more callables.

    Args:
        *funcs: Callables to inspect.

    Returns:
        Set[str]: The union of every parameter name found in the callables' signatures.
        Every entry of `inspect.signature(func).parameters` is included, so the names of
        ``*args`` / ``**kwargs`` style parameters are part of the result. Empty when no
        callable is given.
    """
    allowed_args: Set[str] = set()
    for func in funcs:
        # Iterating the `parameters` mapping yields the parameter names.
        allowed_args.update(inspect.signature(func).parameters)
    return allowed_args


def get_string_template_field_names(s: str) -> Collection[str]: # pylint: disable=C0103
Expand All @@ -67,13 +65,12 @@ def get_string_template_field_names(s: str) -> Collection[str]: # pylint: disab
If `s` is not a string template, the returned `Collection` is empty.

Args:
s:
s: A string which is either a template, e.g. /path/to/file/{replace_me}.h5 or just a path /path/to/file/dont_replace_me.h5

Returns:
Collection[str]

Example:

>>> get_string_template_field_names("abc{def}{efg}")
["def", "efg"]
>>> get_string_template_field_names("{0}-{1}")
Expand Down Expand Up @@ -102,8 +99,7 @@ def resolve_template(path: str, options: MutableMapping[str, Any]) -> str: # py
str: Returns a static path replaced with the value in the options mapping.

Raises:
ValueError: if any template fields in s are not named using valid Python identifiers
ValueError: if a given template field cannot be resolved in `options`
ValueError: if any template fields in s are not named using valid Python identifiers or if a given template field cannot be resolved in `options`
"""
fields = get_string_template_field_names(path)

Expand Down Expand Up @@ -143,5 +139,12 @@ def pickle_protocol(protocol: Optional[int]):


def get_file_type_value(file_type: Union[str, Enum]) -> str:
    """Get the value of the file type.

    Args:
        file_type: The file type, which can be a string or an Enum.

    Returns:
        str: The member's `value` when `file_type` is an Enum member, otherwise `file_type` itself.
    """
    return file_type.value if isinstance(file_type, Enum) else file_type
Loading