diff --git a/scrunch/__init__.py b/scrunch/__init__.py
index 1bebb302..eeebb3bc 100644
--- a/scrunch/__init__.py
+++ b/scrunch/__init__.py
@@ -1,8 +1,14 @@
 from .session import connect
 from .datasets import (
-    get_user, get_project, get_dataset, get_team, create_team)
+    get_user,
+    get_project,
+    get_dataset,
+    get_team,
+    create_team,
+    create_dataset
+)
 from .streaming_dataset import get_streaming_dataset
-from .mutable_dataset import get_mutable_dataset, create_dataset
+from .mutable_dataset import get_mutable_dataset
 from .version import __version__
diff --git a/scrunch/datasets.py b/scrunch/datasets.py
index 1164f989..ae36bf3e 100644
--- a/scrunch/datasets.py
+++ b/scrunch/datasets.py
@@ -20,6 +20,7 @@ import pycrunch
 from pycrunch.exporting import export_dataset
+from pycrunch.importing import Importer
 from pycrunch.shoji import Entity, TaskProgressTimeoutError
 from scrunch.session import connect
 from scrunch.categories import CategoryList
@@ -140,7 +141,7 @@ def _get_dataset(dataset, connection=None, editor=False, project=None):
     Returns a Dataset Entity record if the dataset exists.
     Raises a KeyError if no such dataset exists.

-    To get a.BaseDataset from a Project we are building a url and
+    To get a Dataset from a Project we are building a url and
     making a request through pycrunch.session object, we instead
     should use the /search endpoint from crunch, but currently
     it's not working by id's.
@@ -180,8 +181,6 @@ def _get_dataset(dataset, connection=None, editor=False, project=None):
     return shoji_ds, root


-# FIXME: to be deprecated in favor of get_streaming_dataset and
-# get_mutable_dataset
 def get_dataset(dataset, connection=None, editor=False, project=None):
     """
     A simple wrapper of _get_dataset with streaming=False
@@ -193,6 +192,27 @@ def get_dataset(dataset, connection=None, editor=False, project=None):
     return ds


+def create_dataset(name, variables, connection=None, **kwargs):
+    """
+    Creates a new dataset with the given name and variable metadata
+    (a `crunch:table` payload) and returns it as a Dataset instance.
+    """
+    if connection is None:
+        connection = _get_connection()
+        if not connection:
+            raise AttributeError(
+                "Authenticate first with scrunch.connect() or by providing "
+                "config/environment variables")
+
+    dataset_doc = {
+        'name': name,
+        'table': {
+            'element': 'crunch:table',
+            'metadata': variables
+        }
+    }
+    dataset_doc.update(**kwargs)
+
+    shoji_ds = connection.datasets.create(shoji_entity_wrapper(dataset_doc)).refresh()
+    return Dataset(shoji_ds)
+
+
 def get_project(project, connection=None):
     """
     :param project: Crunch project ID or Name
@@ -398,12 +418,15 @@ def edit(self, member, edit):
             'permissions': {self._EDIT_ATTRIBUTE: edit}}
         })

+
 class ProjectMembers(Members):
     _EDIT_ATTRIBUTE = 'edit'

+
 class TeamMembers(Members):
     _EDIT_ATTRIBUTE = 'team_admin'

+
 class Team:
     _MUTABLE_ATTRIBUTES = {'name'}
     _IMMUTABLE_ATTRIBUTES = {'id'}
@@ -431,6 +454,7 @@ def members(self):
     def delete(self):
         return self.resource.delete()

+
 class Project:
     _MUTABLE_ATTRIBUTES = {'name', 'description', 'icon'}
     _IMMUTABLE_ATTRIBUTES = {'id'}
@@ -517,7 +541,7 @@ def get_dataset(self, dataset):
         except KeyError:
             raise KeyError(
                 "Dataset (name or id: %s) not found in project." % dataset)
-        ds = BaseDataset(shoji_ds)
+        ds = Dataset(shoji_ds)
         return ds

     def create_project(self, name):
@@ -695,7 +719,7 @@ class CrunchBox(object):
         index with the updated metadata.

         :param shoji_tuple: pycrunch.shoji.Tuple of boxdata
-        :param dataset: scrunch.datasets.BaseDataset instance
+        :param dataset: scrunch.datasets.Dataset instance

         NOTE: since the boxdata entity is different regarding the
         mapping of body and metadata fields, methods etc...
         it is made `readonly`.
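Review note: with `create_dataset` moved into `scrunch.datasets` and re-exported from the package root, dataset creation no longer goes through `mutable_dataset`. A minimal usage sketch under this change; the credentials, dataset name, and metadata below are illustrative, and the metadata dict follows the `crunch:table` shape built in the function above:

```python
import scrunch

# credentials are hypothetical; connect() may also pick up config/env vars
scrunch.connect("me@example.com", "secret")

# alias -> variable definition, passed through as table metadata
metadata = {
    "age": {"name": "Age", "alias": "age", "type": "numeric"},
}
ds = scrunch.create_dataset("My new dataset", metadata)
print(ds.name)  # a scrunch.datasets.Dataset, not a MutableDataset
```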
@@ -931,7 +955,7 @@ class DefaultWeight:
     pass


-class BaseDataset(ReadOnly, DatasetVariablesMixin):
+class Dataset(ReadOnly, DatasetVariablesMixin):
     """
     A pycrunch.shoji.Entity wrapper that provides basic dataset methods.
     """
@@ -949,7 +973,7 @@ def __init__(self, resource):
         """
         :param resource: Points to a pycrunch Shoji Entity for a dataset.
         """
-        super(BaseDataset, self).__init__(resource)
+        super(Dataset, self).__init__(resource)
         self._settings = None
         # since we no longer have an __init__ on DatasetVariablesMixin because
         # of the multiple inheritance, we just initiate self._vars here
@@ -994,15 +1018,6 @@ def change_editor(self, user):
         self.resource.patch({'current_editor': user.url})
         self.resource.refresh()

-    def make_mutable(self):
-        from scrunch.mutable_dataset import MutableDataset
-        return MutableDataset(self.resource)
-
-    def make_streaming(self):
-        from scrunch.streaming_dataset import StreamingDataset
-        self.edit(streaming='streaming')
-        return StreamingDataset(self.resource)
-
     @property
     def project(self):
         return Project(self.resource.project)
@@ -2328,9 +2343,8 @@ def fork(self, description=None, name=None, is_published=False,
         dataset otherwise the owner will be the current user in the
         session and the Dataset will be set under `Persona Project`

-        :returns _fork: scrunch.datasets.BaseDataset
+        :returns _fork: scrunch.datasets.Dataset
         """
-        from scrunch.mutable_dataset import MutableDataset
         nforks = len(self.resource.forks.index)
         if name is None:
             if six.PY2:
@@ -2356,9 +2370,8 @@ def fork(self, description=None, name=None, is_published=False,
         # not returning a dataset
         payload = shoji_entity_wrapper(body)
         _fork = self.resource.forks.create(payload).refresh()
-        # return a MutableDataset always
         user = get_user(self.resource.session.email)
-        fork_ds = MutableDataset(_fork)
+        fork_ds = Dataset(_fork)
         fork_ds.change_editor(user)
         return fork_ds

@@ -2409,7 +2422,7 @@ def replace_from_csv(self, filename, chunksize=1000):
         streaming_state = self.resource.body.get('streaming', 'no')
         ds = self
         if streaming_state != 'streaming':
-            ds = self.make_streaming()
+            ds.edit(streaming='streaming')
         importer = pycrunch.importing.Importer()
         df_chunks = pd.read_csv(
             filename,
@@ -2697,18 +2710,334 @@ def derive_weight(self, targets, alias, name, description=''):
         })
         return self._var_create_reload_return(payload)

+    # --------------------------------------------------------------------------------
+    # Methods moved in from MutableDataset; they require self.streaming == 'no'
+    # --------------------------------------------------------------------------------
+    def delete(self):
+        """
+        Delete a non-streaming dataset.
+        """
+        if self.resource.body.get('streaming', 'no') != 'no':
+            LOG.warning('Method not allowed in Datasets of type "streaming"')
+            return
+        self.resource.delete()

-class Dataset(BaseDataset):
+    def join(self, left_var, right_ds, right_var, columns=None,
+             filter=None, wait=True):
+        """
+        Joins a given variable. In crunch joins are left joins, where
+        left is the dataset variable and right is the other dataset's variable.
+        For more information see:
+        http://docs.crunch.io/?http#merging-and-joining-datasets

-    _BASE_MUTABLE_ATTRIBUTES = {'streaming'}
+        :param: columns: Specify a list of variables from right dataset
+        to bring in the merge:
+        http://docs.crunch.io/?http#joining-a-subset-of-variables

-    def __init__(self, resource):
-        LOG.warning("""Dataset is deprecated, instead use now
-            mutable_datasets.MutableDataset or streaming_dataset.StreamingDataset
-            with it's corresponding get_mutable_dataset and get_streaming_dataset
-            methods""")  # noqa: E501
-        super(Dataset, self).__init__(resource)
-        self._MUTABLE_ATTRIBUTES = self._BASE_MUTABLE_ATTRIBUTES | self._BASE_MUTABLE_ATTRIBUTES
+        :param: wait: Wait for the join progress to finish by polling
+        or simply return a url to the progress resource
+
+        :param: filter: Filters out rows based on the given expression,
+        or on a given url for an existing filter. TODO: for the moment
+        we only allow expressions
+        """
+        if self.resource.body.get('streaming', 'no') == 'streaming':
+            LOG.warning('Method not allowed in Datasets of type "streaming"')
+            return
+        right_var_url = right_ds[right_var].url
+        left_var_url = self[left_var].url
+        # this dictionary sets the main part of the join
+        adapter = {
+            'function': 'adapt',
+            'args': [
+                {'dataset': right_ds.url},
+                {'variable': right_var_url},
+                {'variable': left_var_url}
+            ]
+        }
+
+        # wrap the adapter method on a shoji and body entity
+        payload = shoji_entity_wrapper(adapter)
+
+        if columns and isinstance(columns, list):
+            # overwrite body to new format
+            payload['body'] = {
+                'frame': adapter,
+                'function': 'select',
+                'args': [
+                    {'map': {}}
+                ]
+            }
+            # add the individual variable columns to the payload
+            for var in columns:
+                var_url = right_ds[var].url
+                payload['body']['args'][0]['map'][var_url] = {'variable': var_url}
+
+        if filter:
+            # in the case of a filter, convert it to crunch
+            # and attach the filter to the payload
+            expr = process_expr(parse_expr(filter), right_ds)
+            payload['body']['filter'] = {'expression': expr}
+
+        progress = self.resource.variables.post(payload)
+        # poll for progress to finish or return the url to progress
+        if wait:
+            return wait_progress(r=progress, session=self.resource.session, entity=self)
+        return progress.json()['value']
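Review note: `join` is now reached directly on `Dataset`, guarded by the streaming check above. A sketch of the new call path, assuming two existing datasets that share a key variable (all dataset names and aliases below are hypothetical):

```python
import scrunch

left = scrunch.get_dataset("Main survey")
right = scrunch.get_dataset("Demographics")

# left join on a shared key; bring only two variables across.
# wait=True polls the progress resource until the join completes.
left.join(
    left_var="case_id",
    right_ds=right,
    right_var="case_id",
    columns=["age", "gender"],
    wait=True,
)
```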
+    def compare_dataset(self, dataset, use_crunch=False):
+        """
+        Compare the difference in structure between datasets. The
+        criteria are the following:
+
+        (1) variables that, when matched across datasets by alias, have different types.
+        (2) variables that have the same name but don't match on alias.
+        (3) for variables that match and have categories, any categories that have the
+        same id but don't match on name.
+        (4) for array variables that match, any subvariables that have the same name but
+        don't match on alias.
+        (5) array variables that, after assembling the union of their subvariables,
+        point to subvariables that belong to other ds (Not implemented)
+        (6) missing rules of the variable.
+
+        :param: dataset: Dataset instance to compare against
+        :param: use_crunch: Use the Crunch comparison to compare
+        :return: a dictionary of differences
+
+        NOTE: this should be done via: http://docs.crunch.io/#post217
+        but doesn't seem to be a working feature of Crunch
+        """
+        if self.resource.body.get('streaming', 'no') != 'no':
+            LOG.warning('Method not allowed in Datasets of type "streaming"')
+            return
+        if use_crunch:
+            resp = self.resource.batches.follow(
+                'compare', 'dataset={}'.format(dataset.url))
+            return resp
+
+        diff = {
+            'variables': {
+                'by_type': [],
+                'by_alias': [],
+                'by_missing_rules': [],
+            },
+            'categories': {},
+            'subvariables': {}
+        }
+
+        array_types = ['multiple_response', 'categorical_array']
+
+        vars_a = {v.alias: v.type for v in self.values()}
+        vars_b = {v.alias: v.type for v in dataset.values()}
+
+        # 1. match variables by alias and compare types
+        common_aliases = frozenset(vars_a.keys()) & frozenset(vars_b.keys())
+        for alias in common_aliases:
+            if vars_a[alias] != vars_b[alias]:
+                diff['variables']['by_type'].append(dataset[alias].name)
+
+            # 3. match variable alias and distinct category names for same ids
+            if vars_b[alias] == 'categorical' and vars_a[alias] == 'categorical':
+                a_ids = frozenset([v.id for v in self[alias].categories.values()])
+                b_ids = frozenset([v.id for v in dataset[alias].categories.values()])
+                common_ids = a_ids & b_ids
+
+                for id in common_ids:
+                    a_name = self[alias].categories[id].name
+                    b_name = dataset[alias].categories[id].name
+                    if a_name != b_name:
+                        if diff['categories'].get(dataset[alias].name):
+                            diff['categories'][dataset[alias].name].append(id)
+                        else:
+                            diff['categories'][dataset[alias].name] = []
+                            diff['categories'][dataset[alias].name].append(id)
+
+        # 2. match variables by names and compare aliases
+        common_names = frozenset(self.variable_names()) & frozenset(dataset.variable_names())
+        for name in common_names:
+            if self[name].alias != dataset[name].alias:
+                diff['variables']['by_alias'].append(name)
+
+            # 4. array types that match, subvars with same name and != alias
+            if dataset[name].type == self[name].type and \
+                    self[name].type in array_types and \
+                    dataset[name].type in array_types:
+
+                a_names = frozenset(self[name].variable_names())
+                b_names = frozenset(dataset[name].variable_names())
+                common_subnames = a_names & b_names
+
+                for sv_name in common_subnames:
+                    if self[name][sv_name].alias != dataset[name][sv_name].alias:
+                        if diff['subvariables'].get(name):
+                            diff['subvariables'][name].append(dataset[name][sv_name].alias)
+                        else:
+                            diff['subvariables'][name] = []
+                            diff['subvariables'][name].append(dataset[name][sv_name].alias)
+
+            # 6. missing rules mismatch
+            if self[name].type not in CATEGORICAL_TYPES and dataset[name].type not in CATEGORICAL_TYPES:
+                if self[name].missing_rules != dataset[name].missing_rules:
+                    rules1 = self[name].missing_rules
+                    rules2 = dataset[name].missing_rules
+                    if len(rules1.keys()) == len(rules2.keys()):
+                        for key, value in rules1.items():
+                            if key not in rules2 or rules2[key] != value:
+                                diff['variables']['by_missing_rules'].append(name)
+                    else:
+                        diff['variables']['by_missing_rules'].append(name)
+        return diff
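Review note: the `diff` dictionary keeps the same shape it had on `MutableDataset`. A short sketch of inspecting it (dataset names hypothetical):

```python
import scrunch

ds_a = scrunch.get_dataset("Wave 1")
ds_b = scrunch.get_dataset("Wave 2")

diff = ds_a.compare_dataset(ds_b)
print(diff["variables"]["by_type"])   # aliases whose types differ
print(diff["categories"])             # category ids whose names differ
print(diff["subvariables"])           # subvariable alias mismatches per array
```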
+    def append_dataset(self, dataset, filter=None, variables=None,
+                       autorollback=True, delete_pk=True):
+        """ Append dataset into self. If this operation fails, the
+        append is rolled back. Dataset variables and subvariables
+        are matched on their aliases and categories are matched by name.
+
+        :param: dataset: Dataset instance to append from
+        :param: filter: An expression to filter dataset rows. Cannot be a Filter
+        according to: http://docs.crunch.io/#get211
+        :param: variables: A list of variable names to include from dataset
+        """
+        if self.resource.body.get('streaming', 'no') != 'no':
+            LOG.warning('Method not allowed in Datasets of type "streaming"')
+            return
+        if self.url == dataset.url:
+            raise ValueError("Cannot append dataset to self")
+
+        if variables and not isinstance(variables, list):
+            raise AttributeError("'variables' must be a list of variable names")
+
+        if delete_pk:
+            LOG.info("Any pk's found will be deleted; to avoid this pass delete_pk=False")
+            self.resource.pk.delete()
+            dataset.resource.pk.delete()
+
+        payload = shoji_entity_wrapper({'dataset': dataset.url})
+        payload['autorollback'] = autorollback
+
+        if variables:
+            id_vars = []
+            for var in variables:
+                id_vars.append(dataset[var].url)
+            # build the payload with selected variables
+            payload['body']['where'] = {
+                'function': 'select',
+                'args': [{
+                    'map': {
+                        x: {'variable': x} for x in id_vars
+                    }
+                }]
+            }
+
+        if filter:
+            # parse the filter expression
+            payload['body']['filter'] = process_expr(parse_expr(filter), dataset.resource)
+
+        return self.resource.batches.create(payload)
+
+    def move_to_categorical_array(
+            self, name, alias, subvariables, description='', notes=''):
+        """
+        This is a dangerous method: it moves existing Dataset variables
+        into the newly created categorical_array as its subvariables.
+
+        :param: name: Name of the new variable.
+        :param: alias: Alias of the new variable
+        :param: subvariables: A list of existing Dataset variable aliases
+        to move into the new variable as subvariables, i.e.:
+        subvariables = ['var1_alias', 'var2_alias']
+        :param: description: A description of the new variable
+        :param: notes: Notes to attach to the new variable
+        """
+        if self.resource.body.get('streaming', 'no') != 'no':
+            LOG.warning('Method not allowed in Datasets of type "streaming"')
+            return
+        payload = {
+            'name': name,
+            'alias': alias,
+            'description': description,
+            'notes': notes,
+            'type': 'categorical_array',
+            'subvariables': [self[v].url for v in subvariables]
+        }
+        self.resource.variables.create(shoji_entity_wrapper(payload))
+        self._reload_variables()
+        return self[alias]
+
+    def move_to_multiple_response(
+            self, name, alias, subvariables, description='', notes=''):
+        """
+        This method is a replication of the method move_to_categorical_array,
+        only this time we are creating a multiple_response variable.
+        Note: the subvariables need to have at least 1 selected category.
+        """
+        if self.resource.body.get('streaming', 'no') != 'no':
+            LOG.warning('Method not allowed in Datasets of type "streaming"')
+            return
+        payload = {
+            'name': name,
+            'alias': alias,
+            'description': description,
+            'notes': notes,
+            'type': 'multiple_response',
+            'subvariables': [self[v].url for v in subvariables]
+        }
+        self.resource.variables.create(shoji_entity_wrapper(payload))
+        self._reload_variables()
+        return self[alias]
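Review note: the array-building helpers now live on `Dataset` and return the freshly created variable. A sketch, assuming `brand_1` and `brand_2` are existing categorical variables (dataset name and aliases hypothetical):

```python
import scrunch

ds = scrunch.get_dataset("Brand tracker")
grid = ds.move_to_categorical_array(
    name="Brand grid",
    alias="brand_grid",
    subvariables=["brand_1", "brand_2"],  # moved in as subvariables
    description="Brand ratings moved into one array",
)
print(grid.alias)  # "brand_grid"
```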
+    def move_as_subvariable(self, destination, source):
+        """
+        Moves a variable as a subvariable of an existing array
+        type variable.
+
+        :param: destination: The alias of the variable that will receive the subvariable
+        :param: source: Alias of the variable to move into destination as subvariable
+        """
+        if self.resource.body.get('streaming', 'no') != 'no':
+            LOG.warning('Method not allowed in Datasets of type "streaming"')
+            return
+        payload = json.dumps({"element": "shoji:catalog", "index": {self[source].url: {}}})
+        self[destination].resource.subvariables.patch(payload)
+
+    # --------------------------------------------------------------------------------
+    # Methods moved in from StreamingDataset; they require self.streaming == 'streaming'
+    # --------------------------------------------------------------------------------
+    def stream_rows(self, columns):
+        """
+        Receives a dict with columns of values to add and streams them
+        into the dataset. Client must call .push_rows(n) later or wait until
+        Crunch automatically processes the batch.
+
+        Returns the total number of rows streamed
+        """
+        if self.resource.body.get('streaming', 'no') != 'streaming':
+            LOG.warning('Method not allowed in non-streaming Datasets')
+            return
+        importer = Importer()
+        count = len(list(columns.values())[0])
+        for x in range(count):
+            importer.stream_rows(self.resource,
+                                 {a: columns[a][x] for a in columns})
+        return count
+
+    def push_rows(self, count=None):
+        """
+        Batches in the rows that have been recently streamed. This forces
+        the rows to appear in the dataset instead of waiting for Crunch's
+        automatic batcher process.
+        """
+        if self.resource.body.get('streaming', 'no') != 'streaming':
+            LOG.warning('Method not allowed in non-streaming Datasets')
+            return
+        if bool(self.resource.stream.body.pending_messages):
+            self.resource.batches.create(
+                shoji_entity_wrapper({
+                    'stream': count,
+                    'type': 'ldjson'}
+                ))


 class DatasetSubvariablesMixin(DatasetVariablesMixin):
diff --git a/scrunch/exceptions.py b/scrunch/exceptions.py
index f53e08fb..7c5f9c66 100644
--- a/scrunch/exceptions.py
+++ b/scrunch/exceptions.py
@@ -18,10 +18,6 @@ class InvalidReferenceError(ValueError):
     pass


-class InvalidDatasetTypeError(Exception):
-    pass
-
-
 class InvalidVariableTypeError(Exception):
     pass
diff --git a/scrunch/expressions.py b/scrunch/expressions.py
index 9e4a98ee..cc4ef994 100644
--- a/scrunch/expressions.py
+++ b/scrunch/expressions.py
@@ -707,7 +707,7 @@ def _resolve_variable(var):
         if not is_url:
             return var
-        elif not isinstance(ds, scrunch.datasets.BaseDataset):
+        elif not isinstance(ds, scrunch.datasets.Dataset):
             raise Exception(
                 'Valid Dataset instance is required to resolve variable urls '
                 'in the expression'
diff --git a/scrunch/mutable_dataset.py b/scrunch/mutable_dataset.py
index 86e6e0f9..32bae146 100644
--- a/scrunch/mutable_dataset.py
+++ b/scrunch/mutable_dataset.py
@@ -1,320 +1,26 @@
-import json
-
-from pycrunch.shoji import wait_progress
-from scrunch.datasets import (LOG, BaseDataset, _get_connection, _get_dataset,
-                              CATEGORICAL_TYPES)
-from scrunch.exceptions import InvalidDatasetTypeError
-from scrunch.expressions import parse_expr, process_expr
-from scrunch.helpers import shoji_entity_wrapper
+from scrunch.datasets import LOG, Dataset, _get_dataset


 def get_mutable_dataset(dataset, connection=None, editor=False, project=None):
     """
     A simple wrapper of _get_dataset with streaming=False
     """
+    LOG.warning("""MutableDataset is deprecated; use Dataset and its
+        corresponding get_dataset() method instead""")  # noqa: E501
     shoji_ds, root = _get_dataset(dataset, connection, editor, project)
-    # make sure the Dataset is of type streaming != 
"streaming" - if shoji_ds['body'].get('streaming') == 'streaming': - raise InvalidDatasetTypeError("Dataset %s is of type 'streaming',\ - use get_streaming_dataset method instead" % dataset) - ds = MutableDataset(shoji_ds) + ds = Dataset(shoji_ds) if editor is True: ds.change_editor(root.session.email) return ds -def create_dataset(name, variables, connection=None, **kwargs): - if connection is None: - connection = _get_connection() - if not connection: - raise AttributeError( - "Authenticate first with scrunch.connect() or by providing " - "config/environment variables") - - dataset_doc = { - 'name': name, - 'table': { - 'element': 'crunch:table', - 'metadata': variables - } - } - dataset_doc.update(**kwargs) - - shoji_ds = connection.datasets.create(shoji_entity_wrapper(dataset_doc)).refresh() - return MutableDataset(shoji_ds) - - -class MutableDataset(BaseDataset): +class MutableDataset(Dataset): """ Class that enclose mutable dataset methods or any method that varies the state of the dataset and/or it's data. """ - def delete(self): - """ - Delete a dataset. - """ - self.resource.delete() - - def join(self, left_var, right_ds, right_var, columns=None, - filter=None, wait=True): - """ - Joins a given variable. In crunch joins are left joins, where - left is the dataset variable and right is other dataset variable. - For more information see: - http://docs.crunch.io/?http#merging-and-joining-datasets - - :param: columns: Specify a list of variables from right dataset - to bring in the merge: - http://docs.crunch.io/?http#joining-a-subset-of-variables - - :param: wait: Wait for the join progress to finish by polling - or simply return a url to the progress resource - - :param: filter: Filters out rows based on the given expression, - or on a given url for an existing filter. TODO: for the moment - we only allow expressions - """ - right_var_url = right_ds[right_var].url - left_var_url = self[left_var].url - # this dictionary sets the main part of the join - adapter = { - 'function': 'adapt', - 'args': [ - {'dataset': right_ds.url}, - {'variable': right_var_url}, - {'variable': left_var_url} - ] - } - - # wrap the adapter method on a shoji and body entity - payload = shoji_entity_wrapper(adapter) - - if columns and isinstance(columns, list): - # overwrite body to new format - payload['body'] = { - 'frame': adapter, - 'function': 'select', - 'args': [ - {'map': {}} - ] - } - # add the individual variable columns to the payload - for var in columns: - var_url = right_ds[var].url - payload['body']['args'][0]['map'][var_url] = {'variable': var_url} - - if filter: - # in the case of a filter, convert it to crunch - # and attach the filter to the payload - expr = process_expr(parse_expr(filter), right_ds) - payload['body']['filter'] = {'expression': expr} - - progress = self.resource.variables.post(payload) - # poll for progress to finish or return the url to progress - if wait: - return wait_progress(r=progress, session=self.resource.session, entity=self) - return progress.json()['value'] - - def compare_dataset(self, dataset, use_crunch=False): - """ - compare the difference in structure between datasets. The - criterion is the following: - - (1) variables that, when matched across datasets by alias, have different types. - (2) variables that have the same name but don't match on alias. - (3) for variables that match and have categories, any categories that have the - same id but don't match on name. 
- (4) for array variables that match, any subvariables that have the same name but - don't match on alias. - (5) array variables that, after assembling the union of their subvariables, - point to subvariables that belong to other ds (Not implemented) - (6) missing rules of the variable. - - :param: dataset: Daatset instance to append from - :param: use_crunch: Use the Crunch comparison to compare - :return: a dictionary of differences - - NOTE: this sould be done via: http://docs.crunch.io/#post217 - but doesn't seem to be a working feature of Crunch - """ - - if use_crunch: - resp = self.resource.batches.follow( - 'compare', 'dataset={}'.format(dataset.url)) - return resp - - diff = { - 'variables': { - 'by_type': [], - 'by_alias': [], - 'by_missing_rules': [], - }, - 'categories': {}, - 'subvariables': {} - } - - array_types = ['multiple_response', 'categorical_array'] - - vars_a = {v.alias: v.type for v in self.values()} - vars_b = {v.alias: v.type for v in dataset.values()} - - # 1. match variables by alias and compare types - common_aliases = frozenset(vars_a.keys()) & frozenset(vars_b.keys()) - for alias in common_aliases: - if vars_a[alias] != vars_b[alias]: - diff['variables']['by_type'].append(dataset[alias].name) - - # 3. match variable alias and distcint categories names for same id's - if vars_b[alias] == 'categorical' and vars_a[alias] == 'categorical': - a_ids = frozenset([v.id for v in self[alias].categories.values()]) - b_ids = frozenset([v.id for v in dataset[alias].categories.values()]) - common_ids = a_ids & b_ids - - for id in common_ids: - a_name = self[alias].categories[id].name - b_name = dataset[alias].categories[id].name - if a_name != b_name: - if diff['categories'].get(dataset[alias].name): - diff['categories'][dataset[alias].name].append(id) - else: - diff['categories'][dataset[alias].name] = [] - diff['categories'][dataset[alias].name].append(id) - - # 2. match variables by names and compare aliases - common_names = frozenset(self.variable_names()) & frozenset(dataset.variable_names()) - for name in common_names: - if self[name].alias != dataset[name].alias: - diff['variables']['by_alias'].append(name) - - # 4. array types that match, subvars with same name and != alias - if dataset[name].type == self[name].type and \ - self[name].type in array_types and \ - self[name].type in array_types: - - a_names = frozenset(self[name].variable_names()) - b_names = frozenset(dataset[name].variable_names()) - common_subnames = a_names & b_names - - for sv_name in common_subnames: - if self[name][sv_name].alias != dataset[name][sv_name].alias: - if diff['subvariables'].get(name): - diff['subvariables'][name].append(dataset[name][sv_name].alias) - else: - diff['subvariables'][name] = [] - diff['subvariables'][name].append(dataset[name][sv_name].alias) - - # 6. missing rules mismatch - if self[name].type not in CATEGORICAL_TYPES and dataset[name].type not in CATEGORICAL_TYPES: - if self[name].missing_rules != dataset[name].missing_rules: - rules1 = self[name].missing_rules - rules2 = dataset[name].missing_rules - if len(rules1.keys()) == len(rules2.keys()): - for key, value in rules1.items(): - if key not in rules2 or rules2[key] != value: - diff['variables']['by_missing_rules'].append(name) - else: - diff['variables']['by_missing_rules'].append(name) - return diff - - def append_dataset(self, dataset, filter=None, variables=None, - autorollback=True, delete_pk=True): - """ Append dataset into self. If this operation fails, the - append is rolledback. 
Dataset variables and subvariables - are matched on their aliases and categories are matched by name. - - :param: dataset: Daatset instance to append from - :param: filter: An expression to filter dataset rows. cannot be a Filter - according to: http://docs.crunch.io/#get211 - :param: variables: A list of variable names to include from dataset - """ - if self.url == dataset.url: - raise ValueError("Cannot append dataset to self") - - if variables and not isinstance(variables, list): - raise AttributeError("'variables' must be a list of variable names") - - if delete_pk: - LOG.info("Any pk's found will be deleted, to avoid these pass delete_pk=False") - self.resource.pk.delete() - dataset.resource.pk.delete() - - payload = shoji_entity_wrapper({'dataset': dataset.url}) - payload['autorollback'] = autorollback - - if variables: - id_vars = [] - for var in variables: - id_vars.append(dataset[var].url) - # build the payload with selected variables - payload['body']['where'] = { - 'function': 'select', - 'args': [{ - 'map': { - x: {'variable': x} for x in id_vars - } - }] - } - - if filter: - # parse the filter expression - payload['body']['filter'] = process_expr(parse_expr(filter), dataset.resource) - - return self.resource.batches.create(payload) - - def move_to_categorical_array( - self, name, alias, subvariables, description='', notes=''): - """ - This is a dangerous method that allows moving variables (effectively - translating them as variables in a dataset) as subvariables in the - newly created categorical_array created. - - :param: name: Name of the new variable. - :param: alias: Alias of the new variable - :param: subvariables: A list of existing Dataset variables aliases - to move into the new variable as subvariables .i.e; - subvariables = ['var1_alias', 'var2_alias'] - :param: description: A description of the new variable - :param: notes: Notes to attach to the new variable - """ - payload = { - 'name': name, - 'alias': alias, - 'description': description, - 'notes': notes, - 'type': 'categorical_array', - 'subvariables': [self[v].url for v in subvariables] - } - self.resource.variables.create(shoji_entity_wrapper(payload)) - self._reload_variables() - return self[alias] - - def move_to_multiple_response( - self, name, alias, subvariables, description='', notes=''): - """ - This method is a replication of the method move_to_categorical_array, - only this time we are creting a multiple_response variable. - Note: the subvariables need to have at least 1 selected catagory. - """ - payload = { - 'name': name, - 'alias': alias, - 'description': description, - 'notes': notes, - 'type': 'multiple_response', - 'subvariables': [self[v].url for v in subvariables] - } - self.resource.variables.create(shoji_entity_wrapper(payload)) - self._reload_variables() - return self[alias] - - def move_as_subvariable(self, destination, source): - """ - Moves a variable as a subvariable of an existing array - type variable. 
-
-        :param: destination: The alias of the variable that will receive the subvariable
-        :param: source: Alias of the variable to move into destination as subvariable
-        """
-        payload = json.dumps({"element": "shoji:catalog", "index": {self[source].url: {}}})
-        self[destination].resource.subvariables.patch(payload)
+    def __init__(self, resource):
+        LOG.warning("""MutableDataset is deprecated; use Dataset and its
+            corresponding get_dataset() method instead""")  # noqa: E501
+        super(MutableDataset, self).__init__(resource)
diff --git a/scrunch/order.py b/scrunch/order.py
index 77d49ddc..e00b795d 100644
--- a/scrunch/order.py
+++ b/scrunch/order.py
@@ -93,7 +93,7 @@ def __init__(self, obj, order, parent=None):
         # TODO unreached code
         elif isinstance(element, scrunch.datasets.Variable):
             self.elements[element.alias] = element
-        elif isinstance(element, scrunch.datasets.BaseDataset):
+        elif isinstance(element, scrunch.datasets.Dataset):
             self.elements[element.id] = element
         else:
             raise TypeError('Invalid OrderObject %s' % element)
@@ -106,7 +106,7 @@ def _get_elements(group):
             elements.append({key: _get_elements(obj)})
         # TODO unreached code
         elif isinstance(obj, (scrunch.datasets.Variable,
-                              scrunch.datasets.BaseDataset)):
+                              scrunch.datasets.Dataset)):
             elements.append(obj.name)
         else:
             elements.append(obj.name)
@@ -479,7 +479,7 @@ def place(self, entity, path, position=-1, before=None, after=None):
         target_group = self.group[str(path)]
         if isinstance(entity, scrunch.datasets.Variable):
             element = entity.alias
-        elif isinstance(entity, scrunch.datasets.BaseDataset):
+        elif isinstance(entity, scrunch.datasets.Dataset):
             element = entity.id
         else:
             raise TypeError('entity must be a `Variable` or `Dataset`')
diff --git a/scrunch/streaming_dataset.py b/scrunch/streaming_dataset.py
index 60533827..7b688487 100644
--- a/scrunch/streaming_dataset.py
+++ b/scrunch/streaming_dataset.py
@@ -1,54 +1,26 @@
-from pycrunch.importing import Importer
-from scrunch.datasets import BaseDataset, _get_dataset
-from scrunch.exceptions import InvalidDatasetTypeError
-from scrunch.helpers import shoji_entity_wrapper
+from scrunch.datasets import LOG, Dataset, _get_dataset


 def get_streaming_dataset(dataset, connection=None, editor=False, project=None):
     """
     A simple wrapper of _get_dataset with streaming=True
     """
+    LOG.warning("""StreamingDataset is deprecated; use Dataset and its
+        corresponding get_dataset() method instead""")  # noqa: E501
     shoji_ds, root = _get_dataset(dataset, connection, editor, project)
-    # make sure the Dataset is of type streaming != "streaming"
-    if shoji_ds['body'].get('streaming') != 'streaming':
-        raise InvalidDatasetTypeError("Dataset %s is of type 'mutable',\
-            use get_mutable_dataset method instead" % dataset)
-    ds = StreamingDataset(shoji_ds)
+    ds = Dataset(shoji_ds)
     if editor is True:
         ds.change_editor(root.session.email)
     return ds


-class StreamingDataset(BaseDataset):
+class StreamingDataset(Dataset):
     """
     A Crunch entity that represents Datasets that are currently
     of the "streaming" class
     """
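Review note: both entry points survive only as deprecation shims, and the per-method streaming guards replace the removed `InvalidDatasetTypeError`. A migration sketch under this change (dataset name and columns hypothetical):

```python
import scrunch

# before: from scrunch import get_streaming_dataset
ds = scrunch.get_dataset("Tracker wave 12")

# only has an effect when ds.resource.body['streaming'] == 'streaming';
# otherwise the merged Dataset methods log a warning and return None
ds.stream_rows({"age": [25, 34], "gender": [1, 2]})
ds.push_rows(2)  # batch the pending rows immediately
```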
-    def stream_rows(self, columns):
-        """
-        Receives a dict with columns of values to add and streams them
-        into the dataset. Client must call .push_rows(n) later or wait until
-        Crunch automatically processes the batch.
-
-        Returns the total of rows streamed
-        """
-        importer = Importer()
-        count = len(list(columns.values())[0])
-        for x in range(count):
-            importer.stream_rows(self.resource,
-                                 {a: columns[a][x] for a in columns})
-        return count
-
-    def push_rows(self, count=None):
-        """
-        Batches in the rows that have been recently streamed. This forces
-        the rows to appear in the dataset instead of waiting for crunch
-        automatic batcher process.
-        """
-        if bool(self.resource.stream.body.pending_messages):
-            self.resource.batches.create(
-                shoji_entity_wrapper({
-                    'stream': count,
-                    'type': 'ldjson'}
-                ))
+    def __init__(self, resource):
+        LOG.warning("""StreamingDataset is deprecated; use Dataset and its
+            corresponding get_dataset() method instead""")  # noqa: E501
+        super(StreamingDataset, self).__init__(resource)
diff --git a/scrunch/tests/test_cubes.py b/scrunch/tests/test_cubes.py
index 791eadb2..028ad9c0 100644
--- a/scrunch/tests/test_cubes.py
+++ b/scrunch/tests/test_cubes.py
@@ -3,10 +3,9 @@
 from unittest import TestCase

 from pycrunch.cubes import fetch_cube, count
-from scrunch.streaming_dataset import StreamingDataset
 from scrunch.tests.test_datasets import TestDatasetBase
 from scrunch.cubes import crtabs, variable_to_url
-from scrunch.datasets import Variable
+from scrunch.datasets import Variable, Dataset


 class TestCubes(TestDatasetBase, TestCase):
@@ -17,7 +16,7 @@ def test_crtabs_passes_string_arguments(self, mock_fetch_cube):
         Test url aliases are converted to urls
         """
         ds_mock = self._dataset_mock()
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)
         variables = ['var1_alias', 'var2_alias']
         urls = [variable_to_url(var, ds) for var in variables]
         crtabs(dataset=ds, variables=variables)
@@ -30,7 +29,7 @@ def test_weight_to_url(self, mock_fetch_cube):
         Test weight alias is converted to url
         """
         ds_mock = self._dataset_mock()
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)
         variables = ['var1_alias', 'var2_alias']
         weight_url = variable_to_url('var3_alias', ds)
         urls = [variable_to_url(var, ds) for var in variables]
@@ -41,7 +40,7 @@ def test_weight_to_url(self, mock_fetch_cube):
     @patch('scrunch.cubes.fetch_cube')
     def test_pass_filter_expression(self, mock_fetch_cube):
         ds_mock = self._dataset_mock()
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)
         variables = ['var1_alias', 'var2_alias']
         urls = [variable_to_url(var, ds) for var in variables]
         crtabs(dataset=ds, variables=variables, filter='var1_alias > 1')
diff --git a/scrunch/tests/test_datasets.py b/scrunch/tests/test_datasets.py
index ce58a573..790e9524 100644
--- a/scrunch/tests/test_datasets.py
+++ b/scrunch/tests/test_datasets.py
@@ -27,10 +27,8 @@
 from pycrunch.variables import cast

 import scrunch
-from scrunch.datasets import Variable, BaseDataset, Project
+from scrunch.datasets import Variable, Dataset, Project
 from scrunch.subentity import Filter, Multitable, Deck
-from scrunch.mutable_dataset import MutableDataset
-from scrunch.streaming_dataset import StreamingDataset
 from scrunch.tests.test_categories import EditableMock, TEST_CATEGORIES
 from .mock_session import MockSession

@@ -86,7 +84,7 @@ class TestDatasetBase(object):
         'notes': '',
         'description': '',
         'is_published': False,
-        'streaming': '',
+        'streaming': 'no',
         'archived': False,
         'end_date': None,
         'start_date': None,
@@ -214,7 +212,7 @@ class TestDatasets(TestDatasetBase, TestCase):

     def test_edit_dataset(self):
         ds_mock = self._dataset_mock()
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)
         assert ds.name == 'test_dataset_name'

         changes = dict(name='changed')
@@ 
-262,13 +260,13 @@ def process_expr_side_effect(self, expr, ds): return expr @pytest.mark.skipif(pandas is None, reason='pandas is not installed') - @mock.patch('scrunch.streaming_dataset.StreamingDataset.push_rows') + @mock.patch('scrunch.streaming_dataset.Dataset.push_rows') @mock.patch('pycrunch.importing.Importer.stream_rows') def test_replace_from_csv(self, mocked_stream_rows, mocked_push_rows): ds_shoji = copy.deepcopy(self.ds_shoji) ds_shoji['body']['streaming'] = 'negative' ds_mock = self._dataset_mock(ds_shoji=ds_shoji) - ds = MutableDataset(ds_mock) + ds = Dataset(ds_mock) assert ds.resource.body.get('streaming') == 'negative' file = StringIO() file.write("id, age\n1, 15") @@ -296,7 +294,7 @@ def test_replace_values(self, mocked_process): } } ds_mock = self._dataset_mock(variables=variables) - ds = MutableDataset(ds_mock) + ds = Dataset(ds_mock) ds.resource = mock.MagicMock() ds.replace_values({'birthyr': 9, 'level': 8}) call = json.loads(ds.resource.table.post.call_args[0][0]) @@ -324,7 +322,7 @@ def test_create_numeric(self, mocked_process): } ds_mock = self._dataset_mock(variables=variables) - ds = MutableDataset(ds_mock) + ds = Dataset(ds_mock) ds.resource = mock.MagicMock() ds.create_numeric( alias='monthly_rent', @@ -371,7 +369,7 @@ def test_rollup(self, mocked_process): }, } ds_mock = self._dataset_mock(variables=variables) - ds = MutableDataset(ds_mock) + ds = Dataset(ds_mock) ds.resource = mock.MagicMock() ds.rollup('datetime_var', 'new_rolledup_var', 'new_rolledup_var', 'Y') ds.resource.variables.create.assert_called_with( @@ -395,7 +393,7 @@ def test_rollup(self, mocked_process): def test_create_crunchbox_full(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) call_params = dict( title='my title', @@ -455,7 +453,7 @@ def _session_get(*args): ds_mock = self._dataset_mock() # we need to include weight in preferences to test defaults mock_ds_preferences(ds_mock) - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) expected_payload = { 'element': 'shoji:entity', @@ -481,10 +479,11 @@ def test_derive_weight(self): 'alias': 'foo', 'name': 'bar', 'type': 'numeric', + 'derived': False, } } ds_mock = self._dataset_mock(variables=variables) - ds = MutableDataset(ds_mock) + ds = Dataset(ds_mock) targets = [ { @@ -560,7 +559,7 @@ def test_apply_exclusion(self): apply an exclusion filter to a dataset. """ ds_res = self._dataset_mock() - ds = StreamingDataset(ds_res) + ds = Dataset(ds_res) var = ds['var1_alias'] # Action! @@ -590,7 +589,7 @@ def test_remove_exclusion(self): clear (i.e. remove) the exclusion filter from a dataset. 
""" ds_res = MagicMock() - ds = StreamingDataset(ds_res) + ds = Dataset(ds_res) ds.exclude() ds.resource.session.patch.assert_called_once_with( @@ -605,7 +604,7 @@ def _exclude_payload(self, ds, expr): def test_gt(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'var1_alias > 5') @@ -622,7 +621,7 @@ def test_gt(self): def test_in(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'var1_alias in [32766]') @@ -640,7 +639,7 @@ def test_in(self): def test_in_multiple(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'var1_alias in (32766, 32767)') @@ -662,18 +661,20 @@ def test_not_and(self): id='0001', alias='disposition', name='Disposition', - type='numeric' + type='numeric', + derived=False, ), '0002': dict( id='0002', alias='exit_status', name='Exit', - type='numeric' + type='numeric', + derived=False, ) } ds_mock = self._dataset_mock(variables=variables) - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var1 = ds['disposition'] var2 = ds['exit_status'] @@ -720,7 +721,7 @@ def test_not_and(self): def test_any(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'var1_alias.any([32766])') @@ -744,7 +745,7 @@ def test_any(self): def test_not_any(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'not var1_alias.any([32766])') @@ -773,7 +774,7 @@ def test_not_any(self): def test_any_multiple(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'var1_alias.any([32766, 32767])') @@ -798,7 +799,7 @@ def test_any_multiple(self): def test_all(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'var1_alias.all([32767])') @@ -820,7 +821,7 @@ def test_all(self): def test_not_all(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'not var1_alias.all([32767])') @@ -849,7 +850,7 @@ def test_not_all(self): def test_all_or_all(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'var1_alias.all([1]) or var1_alias.all([2])') @@ -891,7 +892,7 @@ def test_all_or_all(self): def test_not_all_or_all(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'not(var1_alias.all([1]) or var1_alias.all([2]))') @@ -938,7 +939,7 @@ def test_not_all_or_all(self): def test_duplicates(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'var1_alias.duplicates()') @@ -957,7 +958,7 @@ def test_duplicates(self): def test_valid(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'valid(var1_alias)') @@ -976,7 +977,7 @@ def test_valid(self): def 
test_not_valid(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'not valid(var1_alias)') @@ -1000,7 +1001,7 @@ def test_not_valid(self): def test_missing(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'missing(var1_alias)') @@ -1019,7 +1020,7 @@ def test_missing(self): def test_not_missing(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'not missing(var1_alias)') @@ -1043,7 +1044,7 @@ def test_not_missing(self): def test_equal(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] data = self._exclude_payload(ds, 'var1_alias == 1') @@ -1069,17 +1070,19 @@ def test_nested(self): id='0001', alias='disposition', name='Disposition', - type='numeric' + type='numeric', + derived=False, ), '0002': dict( id='0002', alias='exit_status', name='Exit', - type='numeric' + type='numeric', + derived=False, ) } ds_mock = self._dataset_mock(variables=variables) - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var1 = ds['disposition'] var2 = ds['exit_status'] @@ -1200,7 +1203,7 @@ def test_nested(self): def test_dict_expr(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) expr = { "args": [ @@ -1223,7 +1226,7 @@ class TestProtectAttributes(TestDatasetBase, TestCase): def test_Dataset_attribute_writes(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) assert ds.name == 'test_dataset_name' with pytest.raises(AttributeError) as excinfo: @@ -1263,7 +1266,7 @@ def test_Dataset_attribute_writes(self): def test_Variable_attribute_writes(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] with pytest.raises(AttributeError) as excinfo: @@ -1295,7 +1298,7 @@ def test_Variable_attribute_writes(self): class TestVariables(TestDatasetBase, TestCase): def test_variable_as_member(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) assert ds.name == self.ds_shoji['body']['name'] assert ds.id == self.ds_shoji['body']['id'] @@ -1309,7 +1312,7 @@ def test_variable_as_member(self): with pytest.raises(AttributeError) as err: ds.some_variable assert str(err.value) == \ - "'StreamingDataset' object has no attribute 'some_variable'" + "'Dataset' object has no attribute 'some_variable'" def test_variable_cast(self): variable = MagicMock() @@ -1331,7 +1334,7 @@ def test_variable_cast(self): def test_edit_Variables(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] assert var.name == 'var1_name' @@ -1366,10 +1369,7 @@ def test_edit_Variables(self): def test_edit_alias(self): ds_mock = self._dataset_mock() - ds = BaseDataset(ds_mock) - var = ds['var1_alias'] - with pytest.raises(AttributeError) as e: - var.edit(alias='test1') + ds = Dataset(ds_mock) ds.resource.body['streaming'] = 'no' var = ds['var1_alias'] var.edit(alias='test1') @@ -1390,7 +1390,7 @@ def test_edit_resolution(self): }, } ds_mock = self._dataset_mock(variables=variables) - ds = MutableDataset(ds_mock) + ds = Dataset(ds_mock) ds.resource = mock.MagicMock() var = ds['datetime_var'] updated_var = var.edit_resolution('M') @@ -1405,7 +1405,7 
@@ def test_edit_resolution(self): def test_add_category(self): ds_mock = self._dataset_mock() - ds = BaseDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var4_alias'] var.resource.body['type'] = 'categorical' var.resource.body['categories'] = [ @@ -1441,7 +1441,7 @@ def getitem(key): def test_update_missing_rules(self): ds_mock = self._dataset_mock() - ds = BaseDataset(ds_mock) + ds = Dataset(ds_mock) var = ds['var1_alias'] assert var.name == 'var1_name' @@ -1491,7 +1491,7 @@ def _get(*args, **kwargs): ds_res = MagicMock(session=sess) ds_res.self = self.ds_url ds_res.patch = MagicMock() - ds = StreamingDataset(ds_res) + ds = Dataset(ds_res) ds.change_editor('jane.doe@crunch.io') ds_res.patch.assert_called_with({ @@ -1506,7 +1506,7 @@ class TestCurrentOwner(TestDatasetBase, TestCase): def test_change_owner_exception(self): ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) with pytest.raises(AttributeError) as e: ds.change_owner(user=self.user_url, project=self.project_url) assert e.message == "Must provide user or project. Not both" @@ -1518,7 +1518,7 @@ def test_change_owner(self, mocked_get_user): user.url = self.user_url mocked_get_user.return_value = user ds_mock = self._dataset_mock() - ds = StreamingDataset(ds_mock) + ds = Dataset(ds_mock) ds.change_owner(user=user) ds_mock.patch.assert_called_with({ 'owner': self.user_url @@ -1558,7 +1558,7 @@ def test_move_to_project(self): self._load_dataset(dataset_resource) project = self._project(session, project_url) - dataset = StreamingDataset(dataset_resource) + dataset = Dataset(dataset_resource) dataset.move(project) project.move_here.assert_called_once_with([dataset]) @@ -1579,7 +1579,7 @@ def test_owner_to_project(self): self._load_dataset(dataset_resource) project = self._project(session, project_url) - dataset = StreamingDataset(dataset_resource) + dataset = Dataset(dataset_resource) with mock.patch("scrunch.datasets.warn") as mock_warn: dataset.change_owner(project=project) @@ -1613,7 +1613,7 @@ def test_dataset_project(self): } } session.add_fixture(project_url, project_payload) - dataset = StreamingDataset(dataset_resource) + dataset = Dataset(dataset_resource) project = dataset.project assert project.name == "My Project" assert project.url == project_url @@ -1627,7 +1627,7 @@ def test_cast_not_allowed(self): sess = MagicMock() ds_res = MagicMock(session=sess) ds_res.views.cast = MagicMock() - ds = StreamingDataset(ds_res) + ds = Dataset(ds_res) with pytest.raises(AssertionError) as excinfo: ds.cast('var_a', 'not_allowed') ds_res.resource.session.post.assert_called_with( @@ -1643,7 +1643,7 @@ def test_create_savepoint(self): sess = MagicMock() ds_res = MagicMock(session=sess) ds_res.savepoints = MagicMock() - ds = StreamingDataset(ds_res) + ds = Dataset(ds_res) ds.create_savepoint('savepoint description') ds_res.savepoints.create.assert_called_with({ 'element': 'shoji:entity', @@ -1661,7 +1661,7 @@ def test_create_savepoint_keyerror(self): 'description': 'savepoint description' } } - ds = StreamingDataset(ds_res) + ds = Dataset(ds_res) with pytest.raises(KeyError): ds.create_savepoint('savepoint description') @@ -1674,7 +1674,7 @@ def test_load_initial_savepoint(self): 'description': 'savepoint description' } } - ds = StreamingDataset(ds_res) + ds = Dataset(ds_res) with pytest.raises(KeyError): ds.create_savepoint('savepoint description') @@ -1683,7 +1683,7 @@ def test_load_empty_savepoint(self): ds_res = MagicMock(session=sess) ds_res.savepoints = MagicMock() ds_res.savepoints.index = {} - 
ds = StreamingDataset(ds_res)
+        ds = Dataset(ds_res)
         with pytest.raises(KeyError):
             ds.load_savepoint('savepoint')

@@ -1722,9 +1722,9 @@ def _get(*args, **kwargs):
         ds_res = MagicMock(session=sess, body=body)
         ds_res.forks = MagicMock()
         ds_res.forks.index = {}
-        ds = StreamingDataset(ds_res)
+        ds = Dataset(ds_res)
         forked_ds = ds.fork(preserve_owner=False)
-        assert isinstance(forked_ds, MutableDataset)
+        assert isinstance(forked_ds, Dataset)
         ds_res.forks.create.assert_called_with({
             'element': 'shoji:entity',
             'body': {
@@ -1763,7 +1763,7 @@ def _get(*args, **kwargs):
         ds_res = MagicMock(session=sess, body=body)
         ds_res.forks = MagicMock()
         ds_res.forks.index = {}
-        ds = BaseDataset(ds_res)
+        ds = Dataset(ds_res)
         ds.fork(preserve_owner=True)
         ds_res.forks.create.assert_called_with({
             'element': 'shoji:entity',
@@ -1788,7 +1788,7 @@ def test_delete_forks(self):
             'abc3': f3
         }

-        ds = BaseDataset(ds_res)
+        ds = Dataset(ds_res)
         ds.delete_forks()

         assert f1.entity.delete.call_count == 1
@@ -1815,7 +1815,7 @@ def test_forks_dataframe(self):
             'abc1': f1
         }

-        ds = BaseDataset(ds_res)
+        ds = Dataset(ds_res)
         df = ds.forks_dataframe()
         assert isinstance(df, DataFrame)
         keys = [k for k in df.keys()]
@@ -1832,7 +1832,7 @@ def test_forks_dataframe_empty(self):
         ds_res.forks = MagicMock()
         ds_res.forks.index = {}

-        ds = BaseDataset(ds_res)
+        ds = Dataset(ds_res)
         df = ds.forks_dataframe()

         assert df is None
@@ -1845,7 +1845,7 @@ def test_forks_no_pandas(self):
         ds_res.forks = MagicMock()
         ds_res.forks.index = {}

-        ds = BaseDataset(ds_res)
+        ds = Dataset(ds_res)

         with pytest.raises(ImportError) as err:
             ds.forks_dataframe()
@@ -1877,7 +1877,7 @@ def test_merge_fork(self):
         }
         fork_url = 'http://test.crunch.io/api/datasets/123/actions/'
         ds_res.actions.self = fork_url
-        ds = BaseDataset(ds_res)
+        ds = Dataset(ds_res)

         expected_call = {
             'element': 'shoji:entity',
@@ -1984,7 +1984,7 @@ def test_recode_single_categorical(self):
             },
         }
         ds_mock = self._dataset_mock(variables=variables)
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)

         responses = [
             {'id': 1, 'name': 'Facebook', 'case': 'var_a > 5'},
@@ -2106,7 +2106,7 @@ def test_recode_multiple_response(self):
             },
         }
         ds_mock = self._dataset_mock(variables=variables)
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)

         responses = [
             {'id': 1, 'name': 'Facebook', 'case': 'var_a > 5'},
@@ -2240,7 +2240,7 @@ def test_recode_multiple_response(self):

     def test_create_categorical_missing_case_dups(self):
         ds_mock = self._dataset_mock()
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)
         kwargs = {
             'name': 'my mr',
             'alias': 'mr',
@@ -2277,7 +2277,7 @@ def test_create_categorical_else_case(self):
             }
         }
         ds_mock = self._dataset_mock(variables=variables)
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)
         kwargs = {
             'name': 'Age Range',
             'alias': 'agerange',
@@ -2410,7 +2410,7 @@ def test_create_2_multiple_response_else_case(self):
             }
         }
         ds_mock = self._dataset_mock(variables=variables)
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)
         with pytest.raises(ValueError) as err:
             ds.create_categorical(
                 categories=[
@@ -2574,7 +2574,7 @@ def test_create_3_multiple_response_else_case(self):
             }
         }
         ds_mock = self._dataset_mock(variables=variables)
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)
         with pytest.raises(ValueError) as err:
             ds.create_categorical(
                 categories = [
@@ -2961,7 +2961,7 @@ def test_create_categorical_missing_case(self):
             }
         }
         ds_mock = self._dataset_mock(variables=variables)
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)
         kwargs = {
             'name': 'age2',
             'alias': 'age2',
@@ -3229,7 +3229,7 @@ def test_derive_multiple_response(self):
             }
         }
         ds_mock = self._dataset_mock(variables=variables)
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)
         kwargs = {
             'name': 'my mr',
             'alias': 'mr',
@@ -3366,7 +3366,7 @@ def getitem(key):
             return False
         var_res.__getitem__.side_effect = getitem
         var_res.entity.self = '/variable/url/'
-        ds = StreamingDataset(ds_res)
+        ds = Dataset(ds_res)
         var = Variable(var_res, ds_res)
         ds.copy_variable(var, name='copy', alias='copy')
         ds_res.variables.create.assert_called_with({
@@ -3408,7 +3408,7 @@ def getitem(key):
             }]
         }}
         var_res.entity.self = '/variable/url/'
-        ds = StreamingDataset(ds_res)
+        ds = Dataset(ds_res)
         var = Variable(var_res, ds_res)
         ds.copy_variable(var, name='copy', alias='copy')
         ds_res.variables.create.assert_called_with({
@@ -3509,7 +3509,7 @@ def setUp(self):
             _datasets[ds.name] = ds

         # we only need one Dataset to move around
-        self.ds = StreamingDataset(_datasets['12345'])
+        self.ds = Dataset(_datasets['12345'])

         datasets = AttributeDict()
         datasets.by = MagicMock(return_value=_datasets)
@@ -4041,7 +4041,7 @@ def setUp(self):
         ds_resource = MagicMock()
         ds_resource.self = self.ds_url
         ds_resource.variables = variables
-        self.ds = BaseDataset(ds_resource)
+        self.ds = Dataset(ds_resource)

     def test_order_property_is_loaded_correctly(self):
         ds = self.ds
@@ -5625,7 +5625,7 @@ def _session_get(*args):
         ds_resource.self = self.ds_url
         ds_resource.fragments.settings = '%ssettings/' % self.ds_url
         ds_resource.session.get.side_effect = _session_get
-        self.ds = StreamingDataset(ds_resource)
+        self.ds = Dataset(ds_resource)

     def test_settings_are_displayed_as_dict_obj(self):
         ds = self.ds
@@ -5732,7 +5732,7 @@ def setUp(self):
         left_ds_res = MagicMock()
         left_ds_res.self = self.left_ds_url
         left_ds_res.variables.by.return_value = left_variable
-        self.left_ds = MutableDataset(left_ds_res)
+        self.left_ds = Dataset(left_ds_res)

         # setup for right dataset
         _right_var_mock = self._variable_mock(self.right_ds_url, var)
@@ -5741,7 +5741,7 @@ def setUp(self):
         right_ds_res = MagicMock()
         right_ds_res.self = self.right_ds_url
         right_ds_res.variables.by.return_value = right_variable
-        self.right_ds = MutableDataset(right_ds_res)
+        self.right_ds = Dataset(right_ds_res)

     def test_dataset_joins(self):
         left_ds = self.left_ds
@@ -5779,7 +5779,7 @@ class TestDatasetExport(TestCase):
     def setUp(self):
         ds_resource = mock.MagicMock()
         ds_resource.self = self.ds_url
-        self.ds = StreamingDataset(ds_resource)
+        self.ds = Dataset(ds_resource)

     def test_basic_csv_export(self, export_ds_mock, dl_file_mock):
         ds = self.ds
@@ -5916,12 +5916,12 @@ class TestVariableIterator(TestDatasetBase):

     def test_ds_keys(self):
         ds_mock = self._dataset_mock(variables=self.variables)
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)
         assert isinstance(ds.keys(), list)

     def test_ds_values(self):
         ds_mock = self._dataset_mock(variables=self.variables)
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)
         assert isinstance(ds.values(), list)

     def test_subvar_order(self):
@@ -5994,10 +5994,10 @@ class TestFilter(TestDatasetBase, TestCase):
         }
     }

-    @mock.patch('scrunch.streaming_dataset.StreamingDataset.filters')
+    @mock.patch('scrunch.streaming_dataset.Dataset.filters')
     def test_add_filter(self, filters):
         ds_res = self._dataset_mock()
-        ds = StreamingDataset(ds_res)
+        ds = Dataset(ds_res)
         var = ds['var1_alias']

         ds.add_filter(name='filter', expr='var1_alias != 0')
@@ -6045,10 +6045,10 @@ class TestDeck(TestDatasetBase, TestCase):
         }
     }

-    @mock.patch('scrunch.streaming_dataset.StreamingDataset.decks')
+    @mock.patch('scrunch.streaming_dataset.Dataset.decks')
     def test_add_deck(self, decks):
         ds_res = self._dataset_mock()
-        ds = StreamingDataset(ds_res)
+        ds = Dataset(ds_res)

         ds.add_deck(name='mydeck', description='description')

@@ -6064,7 +6064,7 @@ def test_add_deck(self, decks):

     def test_deck_accessor(self):
         ds_res = self._dataset_mock()
-        ds = StreamingDataset(ds_res)
+        ds = Dataset(ds_res)
         deck = EditableMock(entity=self._deck)
         Deck(deck)

@@ -6105,10 +6105,10 @@ class TestMultitable(TestDatasetBase, TestCase):
         }
     }

-    @mock.patch('scrunch.streaming_dataset.StreamingDataset.multitables')
+    @mock.patch('scrunch.streaming_dataset.Dataset.multitables')
     def test_add_multitable(self, multitables):
         ds_res = self._dataset_mock()
-        ds = StreamingDataset(ds_res)
+        ds = Dataset(ds_res)
         ds.create_multitable(name='mymulti', template=['var1_alias'])
         expected_payload = {
             'element': 'shoji:entity',
@@ -6128,14 +6128,14 @@ def test_add_multitable(self, multitables):

     def test_multitable_accessor(self):
         ds_res = self._dataset_mock()
-        ds = StreamingDataset(ds_res)
+        ds = Dataset(ds_res)
         mt = EditableMock(entity=self._multitable)
         Multitable(mt, ds)
         assert type(ds.multitables) == dict

     def test_edit_multitable(self):
         ds_res = self._dataset_mock()
-        ds = StreamingDataset(ds_res)
+        ds = Dataset(ds_res)
         mt = EditableMock(entity=self._multitable)
         mockmulti = Multitable(mt, ds)
         with pytest.raises(AttributeError):
@@ -6144,17 +6144,17 @@

     def test_multitable_class(self):
         ds_res = self._dataset_mock()
-        ds = StreamingDataset(ds_res)
+        ds = Dataset(ds_res)
         mt = MagicMock(entity=self._multitable)
         mockmulti = Multitable(mt, ds)
         assert mockmulti

     def test_multitable_import(self):
         ds_res = self._dataset_mock()
-        ds = StreamingDataset(ds_res)
+        ds = Dataset(ds_res)
         mt = EditableMock(entity=self._multitable)
         mockmulti = Multitable(mt, ds)
-        ds_2 = StreamingDataset(ds_res)
+        ds_2 = Dataset(ds_res)
         expected_payload = {
             'element': 'shoji:entity',
             'body': {
@@ -6168,7 +6168,7 @@

     def test_export_tabbook(self):
         ds_res = self._dataset_mock()
-        ds = StreamingDataset(ds_res)
+        ds = Dataset(ds_res)
         mt = EditableMock(entity=self._multitable)
         mockmulti = Multitable(mt, ds)
         expected_payload = {
@@ -6231,9 +6231,9 @@ class TestMutableMixin(TestDatasetBase):

     def test_compare_datasets(self):
         ds_a_mock = self._dataset_mock(variables=self.variables)
-        ds_a = MutableDataset(ds_a_mock)
+        ds_a = Dataset(ds_a_mock)
         ds_b_mock = self._dataset_mock(variables=self.variables_b)
-        ds_b = MutableDataset(ds_b_mock)
+        ds_b = Dataset(ds_b_mock)
         diff = ds_b.compare_dataset(ds_a)
         expected_diff = {
             'variables': {
@@ -6248,9 +6248,9 @@

     def test_append_dataset(self):
         ds_a_mock = self._dataset_mock(variables=self.variables)
-        ds_a = MutableDataset(ds_a_mock)
+        ds_a = Dataset(ds_a_mock)
         ds_b_mock = self._dataset_mock(variables=self.variables_b)
-        ds_b = MutableDataset(ds_b_mock)
+        ds_b = Dataset(ds_b_mock)

         with pytest.raises(ValueError) as e:
             ds_b.append_dataset(ds_a)
@@ -6285,7 +6285,7 @@ class TestHeadingSubtotals(TestDatasetBase):

     def test_categories_as_int(self):
         ds_mock = self._dataset_mock(variables=self.variables)
-        ds = StreamingDataset(ds_mock)
+        ds = Dataset(ds_mock)
         var = ds['var_a']

         expected_payload = {
diff --git a/scrunch/tests/test_folders.py b/scrunch/tests/test_folders.py
index 09a14e33..f23915dc 100644
--- a/scrunch/tests/test_folders.py
+++ b/scrunch/tests/test_folders.py
@@ -1,6 +1,6 @@
 from mock import MagicMock
 from scrunch.folders import Folder
-from scrunch.mutable_dataset import MutableDataset
+from scrunch.datasets import Dataset
 from pycrunch.shoji import Entity, Catalog

 from .test_datasets import AttributeDict
@@ -87,7 +87,7 @@ def test_unique_folders():
     session.add_fixture(hidden_url, hidden_resource)
     session.add_fixture(secure_url, secure_resource)
     session.add_fixture(trash_url, trash_resource)
-    dataset = MutableDataset(dataset_resource)
+    dataset = Dataset(dataset_resource)

     assert dataset.folders.root.name == "Root"
     assert dataset.folders.hidden.name == "Hidden"
@@ -146,7 +146,7 @@ def test_unique_folders_no_secure():
     session.add_fixture(folders_url, folders_resource)
     session.add_fixture(hidden_url, hidden_resource)
     session.add_fixture(trash_url, trash_resource)
-    dataset = MutableDataset(dataset_resource)
+    dataset = Dataset(dataset_resource)

     assert dataset.folders.root.name == "Root"
     assert dataset.folders.hidden.name == "Hidden"
@@ -190,7 +190,7 @@ def test_unique_folders_no_hidden():
         }
     })
     session.add_fixture(folders_url, folders_resource)
-    dataset = MutableDataset(dataset_resource)
+    dataset = Dataset(dataset_resource)

     assert dataset.folders.root.name == "Root"
     assert not hasattr(dataset.folders, "secure")
diff --git a/scrunch/tests/test_recodes.py b/scrunch/tests/test_recodes.py
index 90455e7f..3176cdbd 100644
--- a/scrunch/tests/test_recodes.py
+++ b/scrunch/tests/test_recodes.py
@@ -2,8 +2,7 @@
 from unittest import TestCase

 import pytest

-from scrunch.datasets import Variable
-from scrunch.mutable_dataset import MutableDataset
+from scrunch.datasets import Variable, Dataset
 from scrunch.variables import responses_from_map
 from scrunch.helpers import subvar_alias
@@ -141,7+140,7 @@ def test_combine_categories_unknown_alias(self):
             'test': entity_mock
         }
         resource.variables.index = {}  # Var not present
-        ds = MutableDataset(resource)
+        ds = Dataset(resource)
         with pytest.raises(ValueError) as err:
             ds.combine_categorical('unknown', CATEGORY_MAP, CATEGORY_NAMES, name='name', alias='alias')

@@ -156,7+155,7 @@ def test_combine_categories_from_alias(self):
             'test': entity_mock,
         }
         resource.variables.index = {}
-        ds = MutableDataset(resource)
+        ds = Dataset(resource)
         with pytest.raises(ValueError) as err:
             ds.combine_categorical('test', CATEGORY_MAP, CATEGORY_NAMES, name='name', alias='alias')
         ds.resource.variables.create.assert_called_with(RECODES_PAYLOAD)
@@ -177,7 +176,7 @@ def test_combine_categories_from_entity(self):
         tuple_mock.entity.self = var_url
         entity = Variable(tuple_mock, resource)

-        ds = MutableDataset(resource)
+        ds = Dataset(resource)
         with pytest.raises(ValueError) as err:
             ds.combine_categorical(entity, CATEGORY_MAP, CATEGORY_NAMES, name='name', alias='alias')
         ds.resource.variables.create.assert_called_with(RECODES_PAYLOAD)
@@ -207,7 +206,7 @@ def test_combine_responses_unknown_alias(self):
             'test': entity_mock
         }

-        ds = MutableDataset(resource)
+        ds = Dataset(resource)
         with pytest.raises(ValueError) as err:
             ds.combine_multiple_response('test', RESPONSE_MAP, RESPONSE_NAMES, name='name', alias='alias')

@@ -237,7 +236,7 @@ def test_combine_responses_by_alias(self):
         }

         # make the actual response call
-        ds = MutableDataset(resource)
+        ds = Dataset(resource)
         with pytest.raises(ValueError) as err:
             ds.combine_multiple_response('test', RESPONSE_MAP, RESPONSE_NAMES, name='name', alias='alias')
         resource.variables.create.assert_called_with(COMBINE_RESPONSES_PAYLOAD)
@@ -270,7 +269,7 @@ def test_combine_responses_by_entity(self):
             'test': entity_mock
         }

-        ds = MutableDataset(resource)
+        ds = Dataset(resource)
         with pytest.raises(ValueError) as err:
             ds.combine_multiple_response(entity_mock, RESPONSE_MAP, RESPONSE_NAMES,
                                          name='name', alias='alias')
@@ -338,7 +337,7 @@ def test_recode_categoricals(self, get_dataset_mock):
         ds_res = mock.MagicMock()
         ds_res.self = dataset_url
         ds_res.follow.return_value = table_mock
-        dataset = MutableDataset(ds_res)
+        dataset = Dataset(ds_res)
         dataset.create_categorical([
             {'id': 1, 'name': 'Straight', 'case': 'sexuality.any([1])'},
             {'id': 2, 'name': 'LGBTQ+', 'case': 'sexuality.any([2, 3, 4, 5])'}
@@ -468,7 +467,7 @@ def test_recode_multiple_responses(self, get_dataset_mock):
         ds_res = mock.MagicMock()
         ds_res.self = dataset_url
         ds_res.follow.return_value = table_mock
-        dataset = MutableDataset(ds_res)
+        dataset = Dataset(ds_res)
         subvar_mock = mock.MagicMock()
         subvar_mock.self = var_url
         subvar_mock.id = 'subvar'
diff --git a/scrunch/tests/test_utilities.py b/scrunch/tests/test_utilities.py
index 918cc57f..06a82f7a 100644
--- a/scrunch/tests/test_utilities.py
+++ b/scrunch/tests/test_utilities.py
@@ -7,8 +7,7 @@
 import scrunch

 from scrunch.variables import validate_variable_url
 from scrunch import get_project, get_mutable_dataset, get_user
-from scrunch.datasets import Project, User
-from scrunch.mutable_dataset import MutableDataset
+from scrunch.datasets import Project, User, Dataset
 from scrunch.order import Path
 from scrunch.exceptions import InvalidPathError
@@ -163,7 +162,7 @@ def _get(url, *args, **kwargs):
             "name": "dataset_name"
         })

-        assert isinstance(ds, MutableDataset)
+        assert isinstance(ds, Dataset)
         assert ds.name == 'dataset_name'
         assert ds.url == dataset_url

@@ -195,7 +194,7 @@ def _get(*args, **kwargs):
         ds = get_mutable_dataset(dataset_id)
         session.session.get.assert_called_with('https://test.crunch.io/api/b2c4c6b7d3a94e58937b23c1fed1b65e/')

-        assert isinstance(ds, MutableDataset)
+        assert isinstance(ds, Dataset)
         assert ds.name == 'dataset_name'
         assert ds.id == dataset_id