diff --git a/docs/index.rst b/docs/index.rst
index b83b4b0..8da4b06 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -36,6 +36,7 @@ Getting started
    dataset.md
    dataframe.md
    field.md
+   view.md
 
 .. toctree::
    :maxdepth: 1
diff --git a/docs/view.md b/docs/view.md
new file mode 100644
index 0000000..6d8fe2b
--- /dev/null
+++ b/docs/view.md
@@ -0,0 +1,48 @@
+# View
+## What is a view
+A view is a special field that has a `source_field` entry in its HDF5 attributes. When this attribute is present, the field initializes its dataset from the destination named by `source_field` rather than from its own HDF5 storage.
+
+The benefit of using a view is to reduce disk IO during `apply_index` and `apply_filter` operations. The index or filter is stored in the dataframe, and the view reads from the source field and combines it with the index or filter, achieving the filtering operation without writing an extra copy of the data.
+
+Specifically, `dataframe._filters_grp` is the HDF5 group where the indexes or filters are stored. A boolean filter is converted to an integer index inside `apply_filter`; the index is then stored as a NumericField by `dataframe._add_view`.
+
+![View Structure](view_arch.png)
+
+## How to generate a view
+
+The user can either 1) call `apply_filter` or `apply_index` to generate views with index filters, or 2) simply call `dataframe.view()` to generate views without index filters, as sketched below. The internal function `dataframe._add_view()` takes care of the construction. Please note that a view can only be created from a field that co-exists in the same dataset/file; associating a view with a field from a different file would introduce too much uncertainty.
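+
+A minimal usage sketch (the dataset path, dataframe and field names here are hypothetical; the calls are the ones described above):
+
+```python
+import numpy as np
+from exetera.core.session import Session
+
+with Session() as s:
+    ds = s.open_dataset('example.hdf5', 'r+', 'ds')
+    df = ds['patients']                      # an existing dataframe
+
+    # 1) a plain view of every field, no filter attached
+    dfv = df.view()
+
+    # 2) a filtered view in another dataframe of the *same* dataset
+    df2 = ds.create_dataframe('df2')
+    mask = np.array([True, False, True])     # one entry per row of df
+    df.apply_filter(mask, ddf=df2)
+
+    assert df2['age'].is_view()              # backed by df['age'], no data copied
+```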
+
+The view is only a special instance of a field, so the construction is similar: 1) call `fields.base_view_constructor` to set up the h5group, and 2) call the specific field type constructor to initialize the field instance. There is also a third, special action: 3) attach the view to the source field so that the source field can notify the view if the underlying data changes. These three actions can be seen in the upper part of `dataframe._add_view()`.
+
+You can tell whether a field is a view via `field.is_view()`; this method checks for the `source_field` attribute in the field's HDF5 group. If the attribute is present, the field is a view.
+
+## Fetch data from a view
+As a view looks just like a field to users, you can still use `field.data[:]` to fetch the data from the view. In the field implementation, the member `data` is a FieldArray, so the difference between a view and a field lies in the initialization of the FieldArray. Normally, a FieldArray loads the HDF5 dataset of the current HDF5 group (where the field is stored); in the case of a view, however, the FieldArray loads data from the HDF5 group specified by the `source_field` attribute.
+
+Also, in the case where there is a filter/index for the view (`field.filter` is not None), the FieldArray fetches the filter/index first and masks the underlying data with it, as illustrated below. The details can be found in `FieldArray.__getitem__`, or in `ReadOnlyIndexedFieldArray.__getitem__` / `WriteableIndexedFieldArray.__getitem__` for indexed strings.
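+
+Conceptually, reading `view.data[item]` composes the stored index with the user's `item` (a toy NumPy model of the behavior, not the actual implementation):
+
+```python
+import numpy as np
+
+source = np.array([10, 20, 30, 40, 50])   # the source field's data
+index = np.array([0, 2, 3])               # the integer index stored in _filters_grp
+
+# view.data[:]   -> source[index]       -> [10, 30, 40]
+# view.data[1:3] -> source[index[1:3]]  -> [30, 40]
+item = slice(1, 3)
+mask = index[item]
+print(source[mask])                       # [30 40]
+```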
+
+## Life-cycle of a view
+### A view from a field
+Step 0: you have a field in the dataframe, and call `dataframe.apply_filter()`, `apply_index()` or `view()`.
+
+Step 1: the view is created and attached to the source field. The attach method belongs to the field, but is called in `dataframe._add_view`.
+
+Step 2: when `view.data` is accessed, the view initializes a FieldArray that points to the source field rather than to its own dataset.
+
+Step 3: when `field.data.write` or `field.data.clear` is called, meaning the data is about to be modified, `data.write` or `data.clear` calls `field.update()` to notify the field of the action. The field then passes the notification on to its views in `field.notify()`. Once the notification is received, `view.update()` can perform the appropriate actions.
+
+Step 4: at the moment, `view.update()` copies the original data into the view's own dataset, re-initializes the data interface, and deletes the `source_field` attribute (so that it is no longer a view). A short demonstration follows.
+
+![View Life-cycle](view_life.png)
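+
+The copy-on-write behavior can be observed directly (a sketch; `df` and the field name `age` are hypothetical):
+
+```python
+dfv = df.view()
+v = dfv['age']
+assert v.is_view()
+
+df['age'].data.write([99])   # Step 3: the source notifies its views...
+assert not v.is_view()       # Step 4: ...which materialize their own copy
+                             # and drop the 'source_field' attribute
+```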
+
+### An existing view
+As views are stored in HDF5, the view relationship can be persisted across sessions. Upon loading a dataset (in `dataset.__init__`), the dataset checks whether there are views and calls `dataframe._bind_view()` to attach each view to its source field during the initialization of the dataset/dataframe/fields. This is why a view can only be created from a field that co-exists in the same dataset (HDF5 file).
+
+## Future works
+### Data fetching performance
+Different ways of getting data out of HDF5 can vary a lot in performance. For example, it is generally better to get data out of HDF5 by chunk rather than by individual indexes. In the current implementation (`FieldArray.__getitem__`), we mask the index filter with `item` first, then fetch the data out of HDF5. As HDF5 does not support unordered data access, we sort the mask and restore the original order when returning the data, as sketched below. Further work can be done on how to order the application of the index filter and `item` (specified by the user through `__getitem__`). For example, with a large volume of data and a small index filter, it makes sense to apply the filter first; in the case of a large filter, however, it would be faster to load the data into memory first. Where the boundary lies is worth investigating.
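+
+The sort-and-restore trick looks roughly like this (a standalone NumPy/h5py sketch; the file and dataset paths are hypothetical):
+
+```python
+import numpy as np
+import h5py
+
+with h5py.File('example.hdf5', 'r') as f:
+    dset = f['df/age/values']          # any HDF5 dataset
+    mask = np.array([7, 2, 5])         # unordered index filter
+
+    order = np.argsort(mask)           # h5py fancy indexing needs
+    sorted_mask = mask[order]          #   increasing indices
+    data = dset[sorted_mask]           # one ordered HDF5 read
+
+    restored = np.empty_like(data)
+    restored[order] = data             # back to the caller's order
+```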
+
+### Dependency between views
+In the current implementation, the views all depend on the source field. When the data in the field changes, all the attached views copy the data over and write their own copies. This is not efficient when a number of views are attached. A better approach could be for only one of the views to copy the data over and become the source field of the remaining views. This needs a detailed design and implementation in `fields.update()`.
diff --git a/docs/view_arch.png b/docs/view_arch.png
new file mode 100644
index 0000000..7f36f47
Binary files /dev/null and b/docs/view_arch.png differ
diff --git a/docs/view_life.png b/docs/view_life.png
new file mode 100644
index 0000000..6183b03
Binary files /dev/null and b/docs/view_life.png differ
diff --git a/exetera/core/abstract_types.py b/exetera/core/abstract_types.py
index 48bede9..556ef91 100644
--- a/exetera/core/abstract_types.py
+++ b/exetera/core/abstract_types.py
@@ -65,6 +65,11 @@ def indexed(self):
     def data(self):
         raise NotImplementedError()
 
+    @property
+    @abstractmethod
+    def filter(self):
+        raise NotImplementedError()
+
     @abstractmethod
     def __bool__(self):
         raise NotImplementedError()
@@ -491,3 +496,39 @@ def ordered_merge_right(self, right_on, left_on,
                             left_field_sources=tuple(), right_field_sinks=None,
                             right_to_left_map=None, right_unique=False,
                             left_unique=False):
         raise NotImplementedError()
+
+
+class SubjectObserver(ABC):
+    def attach(self, observer):
+        """
+        Attach the observer (view) to the subject (field).
+        """
+        raise NotImplementedError()
+
+    def detach(self, observer):
+        """
+        Detach the observer (view) from the subject (field); this removes the association between observer and subject.
+        This method is called by the observer.
+        """
+        raise NotImplementedError()
+
+    def notify_deletion(self, observer=None):
+        """
+        Remove the observer from the subject, but called from the subject side.
+        """
+        raise NotImplementedError()
+
+    def notify(self, msg=None):
+        """
+        Called by the subject to notify the observers of an event.
+        """
+        raise NotImplementedError()
+
+    def update(self, subject, msg=None):
+        """
+        Called on the observer, to perform actions based on the subject and the message type.
+        """
+        raise NotImplementedError()
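The contract is the classic observer pattern; a toy illustration of the intended call flow (illustrative only, not ExeTera code):

```python
class Source:
    """Plays the role of a concrete field."""
    def __init__(self):
        self._observers = []
        self.data = None

    def attach(self, observer):
        self._observers.append(observer)

    def notify(self, msg=None):
        for ob in list(self._observers):
            ob.update(self, msg)

    def write(self, data):
        self.notify('write')          # warn views before mutating
        self.data = data


class View:
    """Plays the role of a view field."""
    def update(self, subject, msg=None):
        if msg in ('write', 'clear'):
            print('materializing my own copy before', msg)


src, v = Source(), View()
src.attach(v)
src.write([1, 2, 3])                  # -> materializing my own copy before write
```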
diff --git a/exetera/core/dataframe.py b/exetera/core/dataframe.py
index a99af88..f5e3af2 100644
--- a/exetera/core/dataframe.py
+++ b/exetera/core/dataframe.py
@@ -17,6 +17,7 @@
 from exetera.core import fields as fld
 from exetera.core import operations as ops
 from exetera.core import validation as val
+from exetera.core.utils import INT64_INDEX_LENGTH
 import h5py
 import csv as csvlib
@@ -58,10 +59,16 @@ def __init__(self,
         self.name = name
         self._columns = OrderedDict()
         self._dataset = dataset
-        self._h5group = h5group
+        self._h5group = h5group  # the HDF5 group that stores all fields
         for subg in h5group.keys():
-            self._columns[subg] = dataset.session.get(h5group[subg])
+            if subg[0] != '_':  # groups prefixed with '_' store metadata, for example filters
+                self._columns[subg] = dataset.session.get(h5group[subg])
+
+        if '_filters' not in h5group.keys():
+            self._filters_grp = self._h5group.create_group('_filters')
+        else:
+            self._filters_grp = h5group['_filters']
 
     @property
     def columns(self):
@@ -101,6 +108,57 @@ def add(self,
             nfield.data.write(field.data[:])
         self._columns[dname] = nfield
 
+    def _add_view(self, field: fld.Field, filter: np.ndarray = None):
+        """
+        Internal function called by apply_filter to add a field view to the dataframe.
+
+        :param field: The field to create the view of.
+        :param filter: The index filter to apply.
+        :return: The field view.
+        """
+        # add view
+        h5group = fld.base_view_constructor(field._session, self, field)
+        view = type(field)(field._session, h5group, self, write_enabled=True)
+        field.attach(view)
+        self._columns[view.name] = view
+
+        # add filter
+        if filter is not None:
+            nformat = 'int32'
+            if len(filter) > 0 and np.max(filter) >= INT64_INDEX_LENGTH:
+                nformat = 'int64'
+            filter_name = view.name
+            if filter_name not in self._filters_grp.keys():
+                fld.numeric_field_constructor(self._dataset.session, self._filters_grp, filter_name, nformat)
+                filter_field = fld.NumericField(self._dataset.session, self._filters_grp[filter_name], self,
+                                                write_enabled=True)
+                filter_field.data.write(filter)
+            else:
+                filter_field = fld.NumericField(self._dataset.session, self._filters_grp[filter_name], self,
+                                                write_enabled=True)
+                if nformat not in filter_field._fieldtype:
+                    filter_field = filter_field.astype(nformat)
+                filter_field.data.clear()
+                filter_field.data.write(filter)
+
+            view._filter_index_wrapper = fld.ReadOnlyFieldArray(filter_field, 'values')  # read-only
+
+        return self._columns[view.name]
+
+    def _bind_view(self, view: fld.Field, source_field: fld.Field):
+        """
+        Bind a view when the view (reference field) already exists but has not been attached to the source
+        field yet, for instance while initializing an existing dataset/dataframe.
+
+        :param view: The view field.
+        :param source_field: The source field.
+        """
+        source_field.attach(view)
+        if view.name in self._filters_grp.keys():
+            filter_field = fld.NumericField(self._dataset.session, self._filters_grp[view.name], self,
+                                            write_enabled=True)
+            view._filter_index_wrapper = fld.ReadOnlyFieldArray(filter_field, 'values')  # read-only
+
     def drop(self,
              name: str):
         """
@@ -108,8 +166,9 @@ def drop(self,
 
         :param name: name of field to be dropped
         """
-        del self._columns[name]
-        del self._h5group[name]
+        del self._columns[name]  # the column entry should always exist
+        if name in self._h5group.keys():  # views of fields in other dataframes have no local group
+            del self._h5group[name]
 
     def create_group(self,
                      name: str):
@@ -317,8 +376,10 @@ def __delitem__(self, name):
         if not self.__contains__(name=name):
             raise ValueError("There is no field named '{}' in this dataframe".format(name))
         else:
-            del self._h5group[name]
-            del self._columns[name]
+            del self._columns[name]  # the column entry should always exist
+            if name in self._h5group.keys():  # views of fields in other dataframes have no local group
+                del self._h5group[name]
 
     def delete_field(self, field):
         """
@@ -478,18 +539,23 @@ def apply_filter(self, filter_to_apply, ddf=None):
         :returns: a dataframe containing all the filtered fields; self if ddf is not set
         """
         filter_to_apply_ = val.validate_filter(filter_to_apply)
-
-        if ddf is not None:
-            if not isinstance(ddf, DataFrame):
-                raise TypeError("The destination object must be an instance of DataFrame.")
+        ddf = self if ddf is None else ddf
+        if not isinstance(ddf, DataFrame):
+            raise TypeError("The destination object must be an instance of DataFrame.")
+        if ddf == self:  # in place
+            for field in self._columns.values():
+                field.apply_filter(filter_to_apply_, in_place=True)
+        elif ddf.dataset == self.dataset:  # another df in the same ds, create views
+            filter_to_apply_ = filter_to_apply_.nonzero()[0]  # boolean mask -> integer index
+            for name, field in self._columns.items():
+                if name in ddf:
+                    del ddf[name]
+                ddf._add_view(field, filter_to_apply_)
+        else:  # another df in a different ds, do a hard copy
             for name, field in self._columns.items():
                 newfld = field.create_like(ddf, name)
                 field.apply_filter(filter_to_apply_, target=newfld)
-            return ddf
-        else:
-            for field in self._columns.values():
-                field.apply_filter(filter_to_apply_, in_place=True)
-            return self
+        return ddf
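The dispatch on the destination can be summarized with a small sketch (all names hypothetical; `ds` and `other_ds` are two open datasets, and `df` is a dataframe of `ds` whose fields have length 3):

```python
import numpy as np

mask = np.array([True, False, True])     # one entry per row of df

df2 = ds.create_dataframe('df2')
df.apply_filter(mask, ddf=df2)           # same dataset -> cheap views + stored index
assert df2['age'].is_view()

df3 = other_ds.create_dataframe('df3')
df.apply_filter(mask, ddf=df3)           # other dataset -> full filtered copies
assert not df3['age'].is_view()

df.apply_filter(mask)                    # no ddf -> filters df's own fields in place
```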
 
@@ -514,20 +580,23 @@ def apply_index(self, index_to_apply, ddf=None):
         :param ddf: optional - the destination dataframe
         :returns: a dataframe containing all the re-indexed fields; self if ddf is not set
         """
-        if ddf is not None:
-            if not isinstance(ddf, DataFrame):
-                raise TypeError("The destination object must be an instance of DataFrame.")
+        ddf = self if ddf is None else ddf
+        if not isinstance(ddf, DataFrame):
+            raise TypeError("The destination object must be an instance of DataFrame.")
+        if ddf == self:  # in place
+            val.validate_all_field_length_in_df(self)
+            for field in self._columns.values():
+                field.apply_index(index_to_apply, in_place=True)
+        elif ddf.dataset == self.dataset:  # view
+            for name, field in self._columns.items():
+                if name in ddf:
+                    del ddf[name]
+                ddf._add_view(field, index_to_apply)
+        else:  # hard copy
             for name, field in self._columns.items():
                 newfld = field.create_like(ddf, name)
                 field.apply_index(index_to_apply, target=newfld)
-            return ddf
-        else:
-            val.validate_all_field_length_in_df(self)
-
-            for field in self._columns.values():
-                field.apply_index(index_to_apply, in_place=True)
-            return self
-
+        return ddf
 
     def sort_values(self, by: Union[str, List[str]], ddf: DataFrame = None, axis=0, ascending=True,
                     kind='stable'):
         """
@@ -981,6 +1050,17 @@ def describe(self, include=None, exclude=None, output='terminal'):
             print('\n')
         return result
 
+    def view(self):
+        """
+        Create a view of this dataframe.
+        """
+        view_name = '_' + self.name + '_view'
+        if view_name in self.dataset:
+            self.dataset.drop(view_name)
+        dfv = self.dataset.create_dataframe(view_name)
+        for f in self.columns.values():
+            dfv._add_view(f)
+        return dfv
 
 class HDF5DataFrameGroupBy(DataFrameGroupBy):
@@ -1656,4 +1736,4 @@ def _ordered_merge(left: DataFrame,
             if right[k].indexed:
                 ops.ordered_map_valid_indexed_stream(right[k], right_map, dest_f, invalid)
             else:
-                ops.ordered_map_valid_stream(right[k], right_map, dest_f, invalid)
\ No newline at end of file
+                ops.ordered_map_valid_stream(right[k], right_map, dest_f, invalid)
diff --git a/exetera/core/dataset.py b/exetera/core/dataset.py
index b09d0fd..905373c 100644
--- a/exetera/core/dataset.py
+++ b/exetera/core/dataset.py
@@ -48,11 +48,20 @@ def __init__(self, session, dataset_path, mode, name):
         self._file = h5py.File(dataset_path, mode)
         self._dataframes = dict()
 
+        # initialize the dataframes and fields
         for group in self._file.keys():
             if group not in ('trash',):
                 h5group = self._file[group]
                 dataframe = edf.HDF5DataFrame(self, group, h5group=h5group)
                 self._dataframes[group] = dataframe
+        # bind the views to their source fields
+        for df in self._dataframes.values():
+            for field in df.columns.values():
+                if field.is_view():
+                    source_name = field._field.attrs['source_field']
+                    idx = source_name.rfind('/')
+                    source_field = self._dataframes[source_name[1:idx]][source_name[idx+1:]]
+                    df._bind_view(field, source_field)
 
     @property
     def session(self):
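The `source_field` attribute stores the absolute HDF5 path of the source, so splitting it at the last `'/'` recovers the dataframe and field names. A quick illustration (path value hypothetical):

```python
source_name = '/patients/age'        # attrs['source_field']
idx = source_name.rfind('/')
df_name = source_name[1:idx]         # 'patients'
field_name = source_name[idx + 1:]   # 'age'
```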
diff --git a/exetera/core/fields.py b/exetera/core/fields.py
index 0448091..3a94192 100644
--- a/exetera/core/fields.py
+++ b/exetera/core/fields.py
@@ -16,10 +16,11 @@
 import numpy as np
 import h5py
 
-from exetera.core.abstract_types import Field
+from exetera.core.abstract_types import Field, SubjectObserver
 from exetera.core.data_writer import DataWriter
 from exetera.core import operations as ops
 from exetera.core import validation as val
+from exetera.core import utils
 
 def isin(field:Field, test_elements:Union[list, set, np.ndarray]):
@@ -39,7 +40,7 @@ def isin(field:Field, test_elements:Union[list, set, np.ndarray]):
     return ret
 
-class HDF5Field(Field):
+class HDF5Field(Field, SubjectObserver):
     def __init__(self, session, group, dataframe, write_enabled=False):
         super().__init__()
@@ -55,6 +56,10 @@ def __init__(self, session, group, dataframe, write_enabled=False):
         self._value_wrapper = None
         self._valid_reference = True
 
+        self._filter_index_wrapper = None
+        self._view_refs = list()
+
     @property
     def valid(self):
         """
@@ -113,6 +118,13 @@ def indexed(self):
         self._ensure_valid()
         return False
 
+    @property
+    def filter(self):
+        if self._filter_index_wrapper is None:  # potential alternatives: raise an error, or return a full-index array
+            return None
+        else:
+            return self._filter_index_wrapper
+
     def __bool__(self):
         # this method is required to prevent __len__ being called on derived methods when fields are queried as
         #   if f:
@@ -143,6 +155,11 @@ def _ensure_valid(self):
         if not self._valid_reference:
             raise ValueError("This field no longer refers to a valid underlying field object")
 
+    def is_view(self):
+        """
+        Return True if this field is a view, i.e. a 'source_field' attribute is present in the field's HDF5 group.
+        """
+        return 'source_field' in self._field.attrs
 
 class MemoryField(Field):
@@ -203,6 +220,10 @@ def indexed(self):
         """
         return False
 
+    @property
+    def filter(self):
+        return None
+
     def __bool__(self):
         # this method is required to prevent __len__ being called on derived methods when fields are queried as
         #   if f:
@@ -225,9 +246,14 @@ def apply_index(self, index_to_apply, dstfld=None):
 
 class ReadOnlyFieldArray:
     def __init__(self, field, dataset_name):
-        self._field = field
+        self._field_instance = field
+        if 'source_field' in field._field.attrs:  # the field is a view
+            data_h5group = field._field.file.get(field._field.attrs['source_field'])
+        else:
+            data_h5group = field._field
+        self._field = data_h5group  # HDF5 group instance
         self._name = dataset_name
-        self._dataset = field[dataset_name]
+        self._dataset = self._field[dataset_name]
 
     def __len__(self):
         return len(self._dataset)
@@ -240,7 +266,14 @@ def dtype(self):
         return self._dataset.dtype
 
     def __getitem__(self, item):
-        return self._dataset[item]
+        if self._field_instance.filter is not None and not isinstance(self._field_instance, IndexedStringField):
+            mask = self._field_instance.filter[item]
+            if np.ndim(mask) == 0 or utils.is_sorted(mask):
+                return self._dataset[mask]
+            else:
+                # h5py fancy indexing requires increasing indices: read in sorted
+                # order, then restore the caller's order
+                order = np.argsort(mask)
+                return self._dataset[mask[order]][np.argsort(order)]
+        else:
+            return self._dataset[item]
 
     def __setitem__(self, key, value):
         raise PermissionError("This field was created read-only; call .writeable() "
@@ -280,9 +313,14 @@ def complete(self):
 
 class WriteableFieldArray:
     def __init__(self, field, dataset_name):
-        self._field = field
+        self._field_instance = field
+        if 'source_field' in field._field.attrs:  # the field is a view
+            data_h5group = field._field.file.get(field._field.attrs['source_field'])
+        else:
+            data_h5group = field._field
+        self._field = data_h5group  # HDF5 group instance
         self._name = dataset_name
-        self._dataset = field[dataset_name]
+        self._dataset = self._field[dataset_name]
 
     def __len__(self):
         """
         Length of field
         """
@@ -300,7 +338,14 @@ def dtype(self):
         return self._dataset.dtype
 
     def __getitem__(self, item):
-        return self._dataset[item]
+        if self._field_instance.filter is not None and not isinstance(self._field_instance, IndexedStringField):
+            mask = self._field_instance.filter[item]
+            if np.ndim(mask) == 0 or utils.is_sorted(mask):
+                return self._dataset[mask]
+            else:
+                # h5py fancy indexing requires increasing indices: read in sorted
+                # order, then restore the caller's order
+                order = np.argsort(mask)
+                return self._dataset[mask[order]][np.argsort(order)]
+        else:
+            return self._dataset[item]
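A quick check of the order-restoring read used above (pure NumPy stand-in for the HDF5 dataset):

```python
import numpy as np

dataset = np.array([10, 20, 30, 40, 50, 60])
mask = np.array([5, 0, 3])                 # unsorted index filter

order = np.argsort(mask)
data = dataset[mask[order]]                # what a sorted HDF5 read returns
print(data[np.argsort(order)])             # [60 10 40] -- the caller's order
```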
 
     def __setitem__(self, key, value):
         self._dataset[key] = value
 
@@ -310,6 +355,8 @@ def clear(self):
         """
         Replaces current dataset with empty dataset.
         :return: None
         """
+        self._field_instance.update(self, msg=WriteableFieldArray.clear.__name__)
+
         nformat = self._dataset.dtype
         DataWriter._clear_dataset(self._field, self._name)
         DataWriter.write(self._field, self._name, [], 0, nformat)
@@ -340,6 +387,8 @@ def write(self, part):
         :param part: numpy array to write to field
         :return: None
         """
+        self._field_instance.update(self, msg=WriteableFieldArray.write.__name__)
+
         if isinstance(part, Field):
             part = part.data[:]
         DataWriter.write(self._field, self._name, part, len(part), dtype=self._dataset.dtype)
@@ -456,16 +505,17 @@ def complete(self):
 
 class ReadOnlyIndexedFieldArray:
-    def __init__(self, field, indices, values):
+    def __init__(self, chunksize, indices, values, field):
         """
-        :param field: Field to use
+        :param chunksize: Size of each chunk
         :param indices: Indices for numpy array
         :param values: Values for numpy array
+        :param field: The field (or view) instance that owns this array
         :return: None
         """
-        self._field = field
+        self._chunksize = chunksize
         self._indices = indices
         self._values = values
+        self._field_instance = field
 
     def __len__(self):
         """
@@ -495,27 +545,55 @@ def __getitem__(self, item):
         :return: Item value from dataset
         """
         if isinstance(item, slice):
-            start = item.start if item.start is not None else 0
-            stop = item.stop if item.stop is not None else len(self._indices) - 1
-            step = item.step
-            # TODO: validate slice
-            index = self._indices[start:stop + 1]
-            bytestr = self._values[index[0]:index[-1]]
-            results = [None] * (len(index) - 1)
-            startindex = self._indices[start]
-            for ir in range(len(results)):
-                results[ir] = \
-                    bytestr[index[ir] - np.int64(startindex):
-                            index[ir + 1] - np.int64(startindex)].tobytes().decode()
-            return results
+            if self._field_instance.filter is None:  # not a filtered view, so no stored index to compose
+                start = item.start if item.start is not None else 0
+                stop = item.stop if item.stop is not None else len(self._indices) - 1
+                step = item.step
+                # TODO: validate slice
+                index = self._indices[start:stop + 1]
+                bytestr = self._values[index[0]:index[-1]]
+                results = [None] * (len(index) - 1)
+                startindex = self._indices[start]
+                for ir in range(len(results)):
+                    results[ir] = \
+                        bytestr[index[ir] - np.int64(startindex):
+                                index[ir + 1] - np.int64(startindex)].tobytes().decode()
+                return results
+            else:
+                mask = self._field_instance.filter[item]
+                if utils.is_sorted(mask):
+                    index_s = self._indices[mask]
+                    index_e = self._indices[mask + 1]
+                else:
+                    # h5py fancy indexing requires increasing indices: read in
+                    # sorted order, then restore the caller's order
+                    order = np.argsort(mask)
+                    restore = np.argsort(order)
+                    index_s = self._indices[mask[order]][restore]
+                    index_e = self._indices[mask[order] + 1][restore]
+                results = [None] * len(mask)
+                for ir in range(len(results)):
+                    results[ir] = self._values[index_s[ir]: index_e[ir]].tobytes().decode()
+                return results
+
         elif isinstance(item, int):
-            if item >= len(self._indices) - 1:
-                raise ValueError(f"Index is out of range, item ({item}) >= len(self._indices) - 1 ({len(self._indices) - 1})")
-            start, stop = self._indices[item:item + 2]
-            if start == stop:
-                return ''
-            value = self._values[start:stop].tobytes().decode()
-            return value
+            if self._field_instance.filter is None:
+                if item >= len(self._indices) - 1:
+                    raise ValueError("index is out of range")
+                start, stop = self._indices[item:item + 2]
+                if start == stop:
+                    return ''
+                value = self._values[start:stop].tobytes().decode()
+                return value
+            else:
+                if item >= len(self._field_instance.filter):
+                    raise ValueError("index is out of range")
+                mask = self._field_instance.filter[item]
+                index_s = self._indices[mask]
+                index_e = self._indices[mask + 1]
+                return self._values[index_s: index_e].tobytes().decode()
 
     def __setitem__(self, key, value):
         raise PermissionError("This field was created read-only; call .writeable() "
@@ -551,7 +629,7 @@ def complete(self):
 
 class WriteableIndexedFieldArray:
-    def __init__(self, chunksize, indices, values):
+    def __init__(self, chunksize, indices, values, field):
         """
         :param chunksize: Size of each chunk
        :param indices: Numpy array of indices
         :param values: Numpy array of values
+        :param field: The field (or view) instance that owns this array
         """
         self._index_index = 0
         self._value_index = 0
 
+        self._field_instance = field
+
     def __len__(self):
         """
         Length of field
         """
@@ -595,32 +677,57 @@ def __getitem__(self, item):
         :return: Item value from dataset
         """
         if isinstance(item, slice):
-            start = item.start if item.start is not None else 0
-            stop = item.stop if item.stop is not None else len(self._indices) - 1
-            step = item.step
-            # TODO: validate slice
-
-            index = self._indices[start:stop + 1]
-            if len(index) == 0:
-                return []
-            bytestr = self._values[index[0]:index[-1]]
-            results = [None] * (len(index) - 1)
-            startindex = self._indices[start]
-            rmax = min(len(results), stop - start)
-            for ir in range(rmax):
-                rbytes = bytestr[index[ir] - np.int64(startindex):
-                                 index[ir + 1] - np.int64(startindex)].tobytes()
-                rstr = rbytes.decode()
-                results[ir] = rstr
-            return results
+            if self._field_instance.filter is None:
+                start = item.start if item.start is not None else 0
+                stop = item.stop if item.stop is not None else len(self._indices) - 1
+                if stop <= 0:  # empty field
+                    return []
+                step = item.step
+                # TODO: validate slice
+                index = self._indices[start:stop + 1]
+                bytestr = self._values[index[0]:index[-1]]
+                results = [None] * (len(index) - 1)
+                startindex = self._indices[start]
+                for ir in range(len(results)):
+                    results[ir] = \
+                        bytestr[index[ir] - np.int64(startindex):
+                                index[ir + 1] - np.int64(startindex)].tobytes().decode()
+                return results
+            else:
+                mask = self._field_instance.filter[item]
+                if utils.is_sorted(mask):
+                    index_s = self._indices[mask]
+                    index_e = self._indices[mask + 1]
+                else:
+                    # h5py fancy indexing requires increasing indices: read in
+                    # sorted order, then restore the caller's order
+                    order = np.argsort(mask)
+                    restore = np.argsort(order)
+                    index_s = self._indices[mask[order]][restore]
+                    index_e = self._indices[mask[order] + 1][restore]
+                results = [None] * len(mask)
+                for ir in range(len(results)):
+                    results[ir] = self._values[index_s[ir]: index_e[ir]].tobytes().decode()
+                return results
+
         elif isinstance(item, int):
-            if item >= len(self._indices) - 1:
-                raise ValueError(f"Index is out of range, item ({item}) >= len(self._indices) - 1 ({len(self._indices) - 1})")
-            start, stop = self._indices[item:item + 2]
-            if start == stop:
-                return ''
-            value = self._values[start:stop].tobytes().decode()
-            return value
+            if self._field_instance.filter is None:
+                if item >= len(self._indices) - 1:
+                    raise ValueError("index is out of range")
+                start, stop = self._indices[item:item + 2]
+                if start == stop:
+                    return ''
+                value = self._values[start:stop].tobytes().decode()
+                return value
+            else:
+                if item >= len(self._field_instance.filter):
+                    raise ValueError("index is out of range")
+                mask = self._field_instance.filter[item]
+                index_s = self._indices[mask]
+                index_e = self._indices[mask + 1]
+                return self._values[index_s: index_e].tobytes().decode()
 
     def __setitem__(self, key, value):
         raise PermissionError("IndexedStringField instances cannot be edited via array syntax;"
@@ -747,7 +854,7 @@ def data(self):
         :return: WriteableIndexedFieldArray
         """
         if self._data_wrapper is None:
-            self._data_wrapper = WriteableIndexedFieldArray(self._chunksize, self.indices, self.values)
+            self._data_wrapper = WriteableIndexedFieldArray(self._chunksize, self.indices, self.values, self)
         return self._data_wrapper
 
     def is_sorted(self):
@@ -2050,6 +2157,22 @@ def timestamp_field_constructor(session, group, name, timestamp=None, chunksize=
     DataWriter.write(field, 'values', [], 0, 'float64')
 
 
+def base_view_constructor(session, group, source):
+    """
+    Set up the HDF5 group that is going to be a container for a view (rather than a field).
+
+    :param session: The ExeTera session.
+    :param group: The dataframe in which to create this view.
+    :param source: The source field from which to copy the attributes.
+    :return: The h5group in which this view is created.
+    """
+    if source.name in group:
+        msg = "Field '{}' already exists in group '{}'"
+        raise ValueError(msg.format(source.name, group))
+    field = source.create_like(group, source.name)  # copy the other attributes
+    field._field.attrs['source_field'] = source._field.name
+    return field._field
+
+
 # HDF5 fields
 # ===========
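After construction, the view's group differs from an ordinary field's only by this one attribute; inspecting it with plain h5py might look like this (file and paths hypothetical):

```python
import h5py

with h5py.File('example.hdf5', 'r') as f:
    grp = f['df2/age']                    # the view's own group
    print(grp.attrs['source_field'])      # e.g. '/df/age'
    print(f['df2/_filters/age/values'][:])  # the stored integer index, if any
```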
@@ -2058,7 +2181,7 @@ class IndexedStringField(HDF5Field):
     def __init__(self, session, group, dataframe, write_enabled=False):
         super().__init__(session, group, dataframe, write_enabled=write_enabled)
         self._session = session
-        self._dataframe = None
+        self._dataframe = dataframe
         self._data_wrapper = None
         self._index_wrapper = None
         self._value_wrapper = None
@@ -2104,9 +2227,53 @@ def data(self):
         if self._data_wrapper is None:
             wrapper = \
                 WriteableIndexedFieldArray if self._write_enabled else ReadOnlyIndexedFieldArray
-            self._data_wrapper = wrapper(self.chunksize, self.indices, self.values)
+            self._data_wrapper = wrapper(self.chunksize, self.indices, self.values, self)
         return self._data_wrapper
 
+    def attach(self, view):
+        self._view_refs.append(view)
+
+    def detach(self, view=None):
+        if view is None:  # detach all
+            self._view_refs.clear()
+        else:
+            self._view_refs.remove(view)
+
+    def notify_deletion(self, view=None):
+        if view is None:  # detach all
+            self._view_refs.clear()
+        else:
+            self._view_refs.remove(view)
+
+    def notify(self, msg=None):
+        for view in self._view_refs:
+            view.update(self, msg)
+
+    def update(self, subject, msg=None):
+        if isinstance(subject, (WriteableFieldArray, WriteableIndexedFieldArray)):
+            """
+            This field is being notified by its own field array.
+            It needs to notify other fields that it is about to change before the change goes ahead.
+            """
+            self.notify(msg)
+            self.notify_deletion()
+
+        if isinstance(subject, HDF5Field):
+            """
+            This field is being notified by the field that owns the data it has a view of.
+            At present, the behavior is that it copies the data and then detaches from the field that
+            notified it, as it no longer has an observation relationship with that field.
+            """
+            if msg == 'write' or msg == 'clear':
+                if self.is_view():
+                    field_data = self.data[:]
+                    del self._field.attrs['source_field']  # delete the view attribute
+                    self._index_wrapper = None
+                    self._value_wrapper = None
+                    self._data_wrapper = None  # re-initialize the field array
+                    self._filter_index_wrapper = None  # reset the filter
+                    self.data.write(field_data)
+
     def is_sorted(self):
         """
         Returns if data in field is sorted
@@ -2134,7 +2301,7 @@ def indices(self):
         self._ensure_valid()
         if self._index_wrapper is None:
             wrapper = WriteableFieldArray if self._write_enabled else ReadOnlyFieldArray
-            self._index_wrapper = wrapper(self._field, 'index')
+            self._index_wrapper = wrapper(self, 'index')
         return self._index_wrapper
 
     @property
@@ -2145,7 +2312,7 @@ def values(self):
         self._ensure_valid()
         if self._value_wrapper is None:
             wrapper = WriteableFieldArray if self._write_enabled else ReadOnlyFieldArray
-            self._value_wrapper = wrapper(self._field, 'values')
+            self._value_wrapper = wrapper(self, 'values')
         return self._value_wrapper
 
     def __len__(self):
@@ -2375,11 +2542,53 @@ def data(self):
         self._ensure_valid()
         if self._value_wrapper is None:
             if self._write_enabled:
-                self._value_wrapper = WriteableFieldArray(self._field, 'values')
+                self._value_wrapper = WriteableFieldArray(self, 'values')
             else:
-                self._value_wrapper = ReadOnlyFieldArray(self._field, 'values')
+                self._value_wrapper = ReadOnlyFieldArray(self, 'values')
         return self._value_wrapper
 
+    def attach(self, view):
+        self._view_refs.append(view)
+
+    def detach(self, view=None):
+        if view is None:  # detach all
+            self._view_refs.clear()
+        else:
+            self._view_refs.remove(view)
+
+    def notify_deletion(self, view=None):
+        if view is None:  # detach all
+            self._view_refs.clear()
+        else:
+            self._view_refs.remove(view)
+
+    def notify(self, msg=None):
+        for view in self._view_refs:
+            view.update(self, msg)
+
+    def update(self, subject, msg=None):
+        if isinstance(subject, (WriteableFieldArray, WriteableIndexedFieldArray)):
+            """
+            This field is being notified by its own field array.
+            It needs to notify other fields that it is about to change before the change goes ahead.
+            """
+            self.notify(msg)
+            self.notify_deletion()
+
+        if isinstance(subject, HDF5Field):
+            """
+            This field is being notified by the field that owns the data it has a view of.
+            At present, the behavior is that it copies the data and then detaches from the field that
+            notified it, as it no longer has an observation relationship with that field.
+            """
+            if msg == 'write' or msg == 'clear':
+                if self.is_view():
+                    field_data = self.data[:]
+                    del self._field.attrs['source_field']  # delete the view attribute
+                    self._value_wrapper = None  # re-initialize the field array
+                    self._filter_index_wrapper = None  # reset the filter
+                    self.data.write(field_data)
+
     def is_sorted(self):
         """
         Returns if data in field is sorted
@@ -2582,11 +2791,54 @@ def data(self):
         self._ensure_valid()
         if self._value_wrapper is None:
             if self._write_enabled:
-                self._value_wrapper = WriteableFieldArray(self._field, 'values')
+                self._value_wrapper = WriteableFieldArray(self, 'values')
             else:
-                self._value_wrapper = ReadOnlyFieldArray(self._field, 'values')
+                self._value_wrapper = ReadOnlyFieldArray(self, 'values')
         return self._value_wrapper
 
+    def attach(self, view):
+        self._view_refs.append(view)
+
+    def detach(self, view=None):
+        if view is None:  # detach all
+            self._view_refs.clear()
+        else:
+            self._view_refs.remove(view)
+
+    def notify_deletion(self, view=None):
+        if view is None:  # detach all
+            self._view_refs.clear()
+        else:
+            self._view_refs.remove(view)
+
+    def notify(self, msg=None):
+        for view in self._view_refs:
+            view.update(self, msg)
+
+    def update(self, subject, msg=None):
+        if isinstance(subject, (WriteableFieldArray, WriteableIndexedFieldArray)):
+            """
+            This field is being notified by its own field array.
+            It needs to notify other fields that it is about to change before the change goes ahead.
+            """
+            self.notify(msg)
+            self.notify_deletion()
+
+        if isinstance(subject, HDF5Field):
+            """
+            This field is being notified by the field that owns the data it has a view of.
+            At present, the behavior is that it copies the data and then detaches from the field that
+            notified it, as it no longer has an observation relationship with that field.
+            """
+            if msg == 'write' or msg == 'clear':
+                if self.is_view():
+                    field_data = self.data[:]
+                    del self._field.attrs['source_field']  # delete the view attribute
+                    self._value_wrapper = None  # re-initialize the field array
+                    self._filter_index_wrapper = None  # reset the filter
+                    self.data.write(field_data)
+
     def is_sorted(self):
         """
         Returns if data in field is sorted
@@ -2922,11 +3174,53 @@ def data(self):
         self._ensure_valid()
         if self._value_wrapper is None:
             if self._write_enabled:
-                self._value_wrapper = WriteableFieldArray(self._field, 'values')
+                self._value_wrapper = WriteableFieldArray(self, 'values')
             else:
-                self._value_wrapper = ReadOnlyFieldArray(self._field, 'values')
+                self._value_wrapper = ReadOnlyFieldArray(self, 'values')
         return self._value_wrapper
 
+    def attach(self, view):
+        self._view_refs.append(view)
+
+    def detach(self, view=None):
+        if view is None:  # detach all
+            self._view_refs.clear()
+        else:
+            self._view_refs.remove(view)
+
+    def notify_deletion(self, view=None):
+        if view is None:  # detach all
+            self._view_refs.clear()
+        else:
+            self._view_refs.remove(view)
+
+    def notify(self, msg=None):
+        for view in self._view_refs:
+            view.update(self, msg)
+
+    def update(self, subject, msg=None):
+        if isinstance(subject, (WriteableFieldArray, WriteableIndexedFieldArray)):
+            """
+            This field is being notified by its own field array.
+            It needs to notify other fields that it is about to change before the change goes ahead.
+            """
+            self.notify(msg)
+            self.notify_deletion()
+
+        if isinstance(subject, HDF5Field):
+            """
+            This field is being notified by the field that owns the data it has a view of.
+            At present, the behavior is that it copies the data and then detaches from the field that
+            notified it, as it no longer has an observation relationship with that field.
+            """
+            if msg == 'write' or msg == 'clear':
+                if self.is_view():
+                    field_data = self.data[:]
+                    del self._field.attrs['source_field']  # delete the view attribute
+                    self._value_wrapper = None  # re-initialize the field array
+                    self._filter_index_wrapper = None  # reset the filter
+                    self.data.write(field_data)
+
     def is_sorted(self):
         """
         Returns if data in field is sorted
@@ -3243,11 +3537,53 @@ def data(self):
         self._ensure_valid()
         if self._value_wrapper is None:
             if self._write_enabled:
-                self._value_wrapper = WriteableFieldArray(self._field, 'values')
+                self._value_wrapper = WriteableFieldArray(self, 'values')
             else:
-                self._value_wrapper = ReadOnlyFieldArray(self._field, 'values')
+                self._value_wrapper = ReadOnlyFieldArray(self, 'values')
         return self._value_wrapper
 
+    def attach(self, view):
+        self._view_refs.append(view)
+
+    def detach(self, view=None):
+        if view is None:  # detach all
+            self._view_refs.clear()
+        else:
+            self._view_refs.remove(view)
+
+    def notify_deletion(self, view=None):
+        if view is None:  # detach all
+            self._view_refs.clear()
+        else:
+            self._view_refs.remove(view)
+
+    def notify(self, msg=None):
+        for view in self._view_refs:
+            view.update(self, msg)
+
+    def update(self, subject, msg=None):
+        if isinstance(subject, (WriteableFieldArray, WriteableIndexedFieldArray)):
+            """
+            This field is being notified by its own field array.
+            It needs to notify other fields that it is about to change before the change goes ahead.
+            """
+            self.notify(msg)
+            self.notify_deletion()
+
+        if isinstance(subject, HDF5Field):
+            """
+            This field is being notified by the field that owns the data it has a view of.
+            At present, the behavior is that it copies the data and then detaches from the field that
+            notified it, as it no longer has an observation relationship with that field.
+            """
+            if msg == 'write' or msg == 'clear':
+                if self.is_view():
+                    field_data = self.data[:]
+                    del self._field.attrs['source_field']  # delete the view attribute
+                    self._value_wrapper = None  # re-initialize the field array
+                    self._filter_index_wrapper = None  # reset the filter
+                    self.data.write(field_data)
+
     def is_sorted(self):
         """
         Returns if data in field is sorted
@@ -4151,7 +4487,6 @@ def timestamp_field_create_like(source, group, name, timestamp):
     else:
         return group.create_timestamp(name, ts)
 
-
 @staticmethod
 def apply_isin(source: Field, test_elements: Union[list, set, np.ndarray]):
     """
diff --git a/exetera/core/utils.py b/exetera/core/utils.py
index 8fb0a13..79c84eb 100644
--- a/exetera/core/utils.py
+++ b/exetera/core/utils.py
@@ -214,3 +214,10 @@ def guess_encoding(filename):
     else:
         return "utf-8"
 
+def is_sorted(array):
+    """
+    Check whether an array is sorted in non-decreasing order.
+    """
+    if len(array) < 2:
+        return True
+    return np.all(array[:-1] <= array[1:])
\ No newline at end of file
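`is_sorted` accepts any array-like with NumPy comparison semantics; for instance:

```python
import numpy as np
from exetera.core.utils import is_sorted

print(is_sorted(np.array([1, 2, 2, 5])))   # True (duplicates allowed)
print(is_sorted(np.array([3, 1])))         # False
print(is_sorted(np.array([])))             # True (trivially sorted)
```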
diff --git a/tests/test_dataframe.py b/tests/test_dataframe.py
index 2e785a9..2fbfe54 100644
--- a/tests/test_dataframe.py
+++ b/tests/test_dataframe.py
@@ -1,6 +1,7 @@
 import pandas as pd
 from exetera.core.operations import INVALID_INDEX
 import unittest
+from parameterized import parameterized
 from io import BytesIO
 import numpy as np
 import tempfile
@@ -8,7 +9,7 @@
 from exetera.core import session
 from exetera.core import dataframe
-
+from .utils import SessionTestCase, DEFAULT_FIELD_DATA
 
 class TestDataFrameCreateFields(unittest.TestCase):
@@ -912,6 +913,7 @@ def test_sort_values_on_other_df(self):
         self.assertListEqual(list(val), df['val'].data[:].tolist())
         self.assertListEqual(list(val2), df['val2'].data[:])
 
+        self.assertListEqual([b'a', b'b', b'c', b'd', b'e'], ddf['idx'].data[:].tolist())
         self.assertListEqual([10, 30, 50, 40, 20], ddf['val'].data[:].tolist())
         self.assertListEqual(['a', 'bbb', 'ccccc', 'dddd', 'ee'], ddf['val2'].data[:])
@@ -1326,4 +1328,123 @@ def test_raise_errors(self):
         with self.assertRaises(Exception) as context:
             df.describe(exclude=['num', 'num2', 'ts1'])
-        self.assertTrue(isinstance(context.exception, ValueError))
\ No newline at end of file
+        self.assertTrue(isinstance(context.exception, ValueError))
+
+
+class TestDataFrameView(SessionTestCase):
+
+    @parameterized.expand(DEFAULT_FIELD_DATA)
+    def test_get_view(self, creator, name, kwargs, data):
+        """
+        Test dataframe.view, field.is_view, apply_filter, and apply_index
+        """
+        f = self.setup_field(self.df, creator, name, (), kwargs, data)
+        if "nformat" in kwargs:
+            data = np.asarray(data, dtype=kwargs["nformat"])
+        else:
+            data = np.asarray(data)
+
+        view = self.df.view()
+        self.assertTrue(view[name].is_view())
+        self.assertListEqual(data[:].tolist(), np.asarray(view[name].data[:]).tolist())
+
+        with self.subTest('All False:'):
+            df2 = self.ds.create_dataframe('df2')
+            d_filter = np.array([False])
+            self.df.apply_filter(d_filter, df2)
+            self.assertTrue(df2[name].is_view())
+            d_filter = np.nonzero(d_filter)[0]
+            self.assertListEqual(data[d_filter].tolist(), np.asarray(df2[name].data[:]).tolist())
+
+            d_filter = np.array([True]*len(data))
+            self.df.apply_filter(d_filter, df2)
+            d_filter = np.nonzero(d_filter)[0]
+            self.assertListEqual(data[d_filter].tolist(), np.asarray(df2[name].data[:]).tolist())
+            self.ds.drop('df2')
+
+        with self.subTest('All True:'):
+            df2 = self.ds.create_dataframe('df2')
+            d_filter = np.array([True]*len(data))
+            self.df.apply_filter(d_filter, df2)
+            self.assertTrue(df2[name].is_view())
+            d_filter = np.nonzero(d_filter)[0]
+            self.assertListEqual(data[d_filter].tolist(), np.asarray(df2[name].data[:]).tolist())
+
+            d_filter = np.array([np.random.random()>=0.5 for i in range(len(data))])
+            self.df.apply_filter(d_filter, df2)
+            d_filter = np.nonzero(d_filter)[0]
+            self.assertListEqual(data[d_filter].tolist(), np.asarray(df2[name].data[:]).tolist())
+            self.ds.drop('df2')
+
+        with self.subTest('Random T/F'):
+            df2 = self.ds.create_dataframe('df2')
+            d_filter = np.array([np.random.random()>=0.5 for i in range(len(data))])
+            self.df.apply_filter(d_filter, df2)
+            self.assertTrue(df2[name].is_view())
+            d_filter = np.nonzero(d_filter)[0]
+            self.assertListEqual(data[d_filter].tolist(), np.asarray(df2[name].data[:]).tolist())
+            self.ds.drop('df2')
+
+        with self.subTest('All Index:'):
+            df2 = self.ds.create_dataframe('df2')
+            d_filter = np.array([i for i in range(len(data))])
+            self.df.apply_index(d_filter, df2)
+            self.assertTrue(df2[name].is_view())
+            self.assertListEqual(data[d_filter].tolist(), np.asarray(df2[name].data[:]).tolist())
+            self.ds.drop('df2')
+
+        with self.subTest('Random Index:'):
+            df2 = self.ds.create_dataframe('df2')
+            d_filter = []
+            for i in range(len(data)):
+                if np.random.random() >= 0.5:
+                    d_filter.append(i)
+            d_filter = np.array(d_filter)
+            self.df.apply_index(d_filter, df2)
+            self.assertTrue(df2[name].is_view())
+            self.assertListEqual(data[d_filter].tolist(), np.asarray(df2[name].data[:]).tolist())
+
+            d_filter = np.array([i for i in range(len(data))])
+            self.df.apply_index(d_filter, df2)
+            self.assertTrue(df2[name].is_view())
+            self.assertListEqual(data[d_filter].tolist(), np.asarray(df2[name].data[:]).tolist())
+            self.ds.drop('df2')
+
+    @parameterized.expand(DEFAULT_FIELD_DATA)
+    def test_concrete_field(self, creator, name, kwargs, data):
+        """
+        Test field.attach, field.detach, field.notify, field.update
+        """
+        f = self.setup_field(self.df, creator, name, (), kwargs, data)
+        if "nformat" in kwargs:
+            data = np.asarray(data, dtype=kwargs["nformat"])
+        else:
+            data = np.asarray(data)
+        view = self.df.view()
+        self.assertTrue(view[name] in f._view_refs)  # attached
+        f.data.clear()
+        self.assertListEqual([], np.asarray(f.data[:]).tolist())
+        self.assertListEqual(data.tolist(), np.asarray(view[name].data[:]).tolist())  # notify and update
+        self.assertFalse(view[name] in f._view_refs)  # detached
+
+    @parameterized.expand(DEFAULT_FIELD_DATA)
+    def test_view_persistence(self, creator, name, kwargs, data):
+        """
+        The view should be persistent over sessions.
+        """
+        bio = BytesIO()
+        src = self.s.open_dataset(bio, 'w', 'src')
+        df = src.create_dataframe('df')
+        f = self.setup_field(df, creator, name, (), kwargs, data)
+        df2 = src.create_dataframe('df2')
+        d_filter = np.array([np.random.random()>=0.5 for i in range(len(data))])
+        df.apply_filter(d_filter, df2)
+        self.assertTrue(df2[name].is_view())
+        self.s.close()
+
+        src = self.s.open_dataset(bio, 'r+', 'src')
+        df = src['df']
+        df2 = src['df2']
+        self.assertTrue(df2[name].is_view())
+        self.assertTrue(df2[name] in df[name]._view_refs)