From b2cff8a8f6de2550b37523cdb94745147b480b63 Mon Sep 17 00:00:00 2001 From: k1o0 Date: Thu, 10 Oct 2024 14:24:54 +0300 Subject: [PATCH] Resolves issue #134 (#136) * Resolves issue #134 --- CHANGELOG.md | 1 + docs/one_installation.md | 2 +- one/alf/cache.py | 30 ++++++++---- one/api.py | 62 +++++++++++++++--------- one/converters.py | 86 ++++++++++++++++++--------------- one/tests/alf/test_cache.py | 9 +--- one/tests/remote/test_globus.py | 2 +- one/tests/test_converters.py | 20 ++++++-- one/tests/test_one.py | 34 +++++++------ one/tests/util.py | 9 ++-- one/util.py | 14 +++--- 11 files changed, 160 insertions(+), 109 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a57eb883..d5ff3a9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ This version improves behaviour of loading revisions and loading datasets from l - bugfix of spurious error raised when loading dataset with a revision provided - default_revisions_only parameter in One.list_datasets filters non-default datasets - permit data frame input to One.load_datasets and load precise relative paths provided (instead of default revisions) +- redundant session_path column has been dropped from the datasets cache table ### Added diff --git a/docs/one_installation.md b/docs/one_installation.md index b4f8ba81..6296523e 100644 --- a/docs/one_installation.md +++ b/docs/one_installation.md @@ -61,7 +61,7 @@ one = ONE() To change your default database, or re-run the setup for a given database, you can use the following ```python -ONE._setup(base_url='https://test.alyx.internationalbrainlab.org', make_default=True) +ONE.setup(base_url='https://test.alyx.internationalbrainlab.org', make_default=True) ``` ## 4. 
Update diff --git a/one/alf/cache.py b/one/alf/cache.py index 4cfde509..8fccaa95 100644 --- a/one/alf/cache.py +++ b/one/alf/cache.py @@ -30,7 +30,7 @@ from one.alf.io import iter_sessions, iter_datasets from one.alf.files import session_path_parts, get_alf_path from one.converters import session_record2path -from one.util import QC_TYPE +from one.util import QC_TYPE, patch_cache __all__ = ['make_parquet_db', 'remove_missing_datasets', 'DATASETS_COLUMNS', 'SESSIONS_COLUMNS'] _logger = logging.getLogger(__name__) @@ -52,7 +52,6 @@ DATASETS_COLUMNS = ( 'id', # int64 'eid', # int64 - 'session_path', # relative to the root 'rel_path', # relative to the session path, includes the filename 'file_size', # file size in bytes 'hash', # sha1/md5, computed in load function @@ -89,7 +88,6 @@ def _get_dataset_info(full_ses_path, rel_dset_path, ses_eid=None, compute_hash=F return { 'id': Path(rel_ses_path, rel_dset_path).as_posix(), 'eid': str(ses_eid), - 'session_path': str(rel_ses_path), 'rel_path': Path(rel_dset_path).as_posix(), 'file_size': file_size, 'hash': md5(full_dset_path) if compute_hash else None, @@ -297,18 +295,30 @@ def remove_missing_datasets(cache_dir, tables=None, remove_empty_sessions=True, if tables is None: tables = {} for name in ('datasets', 'sessions'): - tables[name], _ = parquet.load(cache_dir / f'{name}.pqt') - to_delete = [] + table, m = parquet.load(cache_dir / f'{name}.pqt') + tables[name] = patch_cache(table, m.get('min_api_version'), name) + + INDEX_KEY = '.?id' + for name in tables: + # Set the appropriate index if none already set + if isinstance(tables[name].index, pd.RangeIndex): + idx_columns = sorted(tables[name].filter(regex=INDEX_KEY).columns) + tables[name].set_index(idx_columns, inplace=True) + + to_delete = set() gen_path = partial(session_record2path, root_dir=cache_dir) - sessions = sorted(map(lambda x: gen_path(x[1]), tables['sessions'].iterrows())) + # map of session path to eid + sessions = {gen_path(rec): eid for eid, rec in 
tables['sessions'].iterrows()} for session_path in iter_sessions(cache_dir): - rel_session_path = session_path.relative_to(cache_dir).as_posix() - datasets = tables['datasets'][tables['datasets']['session_path'] == rel_session_path] + try: + datasets = tables['datasets'].loc[sessions[session_path]] + except KeyError: + datasets = tables['datasets'].iloc[0:0, :] for dataset in iter_datasets(session_path): if dataset.as_posix() not in datasets['rel_path']: - to_delete.append(session_path.joinpath(dataset)) + to_delete.add(session_path.joinpath(dataset)) if session_path not in sessions and remove_empty_sessions: - to_delete.append(session_path) + to_delete.add(session_path) if dry: print('The following session and datasets would be removed:', end='\n\t') diff --git a/one/api.py b/one/api.py index 6663f6b5..63e1ec0c 100644 --- a/one/api.py +++ b/one/api.py @@ -131,7 +131,7 @@ def load_cache(self, tables_dir=None, **kwargs): # Set the appropriate index if none already set if isinstance(cache.index, pd.RangeIndex): - idx_columns = cache.filter(regex=INDEX_KEY).columns.tolist() + idx_columns = sorted(cache.filter(regex=INDEX_KEY).columns) if len(idx_columns) == 0: raise KeyError('Failed to set index') cache.set_index(idx_columns, inplace=True) @@ -552,9 +552,8 @@ def _check_filesystem(self, datasets, offline=None, update_exists=True, check_ha Given a set of datasets, check whether records correctly reflect the filesystem. Called by load methods, this returns a list of file paths to load and return. - TODO This needs changing; overload for downloading? - This changes datasets frame, calls _update_cache(sessions=None, datasets=None) to - update and save tables. Download_datasets can also call this function. + This changes datasets frame, calls _update_cache(sessions=None, datasets=None) to + update and save tables. Download_datasets may also call this function. 
Parameters ---------- @@ -575,12 +574,18 @@ def _check_filesystem(self, datasets, offline=None, update_exists=True, check_ha """ if isinstance(datasets, pd.Series): datasets = pd.DataFrame([datasets]) + assert datasets.index.nlevels <= 2 + idx_names = ['eid', 'id'] if datasets.index.nlevels == 2 else ['id'] + datasets.index.set_names(idx_names, inplace=True) elif not isinstance(datasets, pd.DataFrame): # Cast set of dicts (i.e. from REST datasets endpoint) datasets = util.datasets2records(list(datasets)) + else: + datasets = datasets.copy() indices_to_download = [] # indices of datasets that need (re)downloading files = [] # file path list to return # If the session_path field is missing from the datasets table, fetch from sessions table + # Typically only aggregate frames contain this column if 'session_path' not in datasets.columns: if 'eid' not in datasets.index.names: # Get slice of full frame with eid in index @@ -647,12 +652,12 @@ def _check_filesystem(self, datasets, offline=None, update_exists=True, check_ha if self.record_loaded: loaded = np.fromiter(map(bool, files), bool) - loaded_ids = np.array(datasets.index.to_list())[loaded] + loaded_ids = datasets.index.get_level_values('id')[loaded].to_numpy() if '_loaded_datasets' not in self._cache: self._cache['_loaded_datasets'] = np.unique(loaded_ids) else: loaded_set = np.hstack([self._cache['_loaded_datasets'], loaded_ids]) - self._cache['_loaded_datasets'] = np.unique(loaded_set, axis=0) + self._cache['_loaded_datasets'] = np.unique(loaded_set) # Return full list of file paths return files @@ -1013,6 +1018,9 @@ def load_object(self, # For those that don't exist, download them offline = None if query_type == 'auto' else self.mode == 'local' + if datasets.index.nlevels == 1: + # Reinstate eid index + datasets = pd.concat({str(eid): datasets}, names=['eid']) files = self._check_filesystem(datasets, offline=offline, check_hash=check_hash) files = [x for x in files if x] if not files: @@ -1117,6 +1125,9 @@ def 
load_dataset(self, wildcards=self.wildcards, assert_unique=assert_unique) if len(datasets) == 0: raise alferr.ALFObjectNotFound(f'Dataset "{dataset}" not found') + if datasets.index.nlevels == 1: + # Reinstate eid index + datasets = pd.concat({str(eid): datasets}, names=['eid']) # Check files exist / download remote files offline = None if query_type == 'auto' else self.mode == 'local' @@ -1288,6 +1299,9 @@ def _verify_specifiers(specifiers): for x, y, z in zip(datasets, collections, revisions)] present = [len(x) == 1 for x in slices] present_datasets = pd.concat(slices) + if present_datasets.index.nlevels == 1: + # Reinstate eid index + present_datasets = pd.concat({str(eid): present_datasets}, names=['eid']) # Check if user is blindly downloading all data and warn of non-default revisions if 'default_revision' in present_datasets and \ @@ -1326,7 +1340,7 @@ def _verify_specifiers(specifiers): # Make list of metadata Bunches out of the table records = (present_datasets - .reset_index(names='id') + .reset_index(names=['eid', 'id']) .to_dict('records', into=Bunch)) # Ensure result same length as input datasets list @@ -1459,6 +1473,9 @@ def load_collection(self, if len(datasets) == 0: raise alferr.ALFObjectNotFound(object or '') parts = [alfiles.rel_path_parts(x) for x in datasets.rel_path] + if datasets.index.nlevels == 1: + # Reinstate eid index + datasets = pd.concat({str(eid): datasets}, names=['eid']) # For those that don't exist, download them offline = None if query_type == 'auto' else self.mode == 'local' @@ -1868,8 +1885,7 @@ def list_aggregates(self, relation: str, identifier: str = None, all_aggregates = self.alyx.rest('datasets', 'list', django=query) records = (util.datasets2records(all_aggregates) .reset_index(level=0) - .drop('eid', axis=1) - .rename_axis(index={'id': 'did'})) + .drop('eid', axis=1)) # Since rel_path for public FI file records starts with 'public/aggregates' instead of just # 'aggregates', we should discard the file path parts before 
'aggregates' (if present) records['rel_path'] = records['rel_path'].str.replace( @@ -1890,11 +1906,6 @@ def path2id(p) -> str: # NB: We avoid exact matches as most users will only include subject, not lab/subject records = records[records['identifier'].str.contains(identifier)] - # Add exists_aws field for download method - for i, rec in records.iterrows(): - fr = next(x['file_records'] for x in all_aggregates if x['url'].endswith(i)) - records.loc[i, 'exists_aws'] = any( - x['data_repository'].startswith('aws') and x['exists'] for x in fr) return util.filter_datasets(records, filename=dataset, revision=revision, wildcards=True, assert_unique=assert_unique) @@ -1950,6 +1961,7 @@ def load_aggregate(self, relation: str, identifier: str, raise alferr.ALFObjectNotFound( f'{dataset or "dataset"} not found for {relation}/{identifier}') # update_exists=False because these datasets are not in the cache table + records['session_path'] = '' # explicitly add session path column file, = self._check_filesystem(records, update_exists=False) if not file: raise alferr.ALFObjectNotFound('Dataset file not found on disk') @@ -2286,9 +2298,12 @@ def _download_datasets(self, dsets, **kwargs) -> List[Path]: try: if not isinstance(dsets, pd.DataFrame): raise TypeError('Input datasets must be a pandas data frame for AWS download.') - if 'exists_aws' in dsets and np.all(np.equal(dsets['exists_aws'].values, True)): - _logger.info('Downloading from AWS') - return self._download_aws(map(lambda x: x[1], dsets.iterrows()), **kwargs) + assert 'exists_aws' not in dsets or np.all(np.equal(dsets['exists_aws'].values, True)) + _logger.debug('Downloading from AWS') + files = self._download_aws(map(lambda x: x[1], dsets.iterrows()), **kwargs) + # Trigger fallback download of any files missing on AWS + assert all(files), f'{sum(map(bool, files))} datasets not found on AWS' + return files except Exception as ex: _logger.debug(ex) return self._download_dataset(dsets, **kwargs) @@ -2338,15 +2353,18 @@ def 
_download_aws(self, dsets, update_exists=True, keep_uuid=None, **_) -> List[ # Fetch file record path record = next((x for x in record['file_records'] if x['data_repository'].startswith('aws') and x['exists']), None) - if not record and update_exists and 'exists_aws' in self._cache['datasets']: - _logger.debug('Updating exists field') - self._cache['datasets'].loc[(slice(None), uuid), 'exists_aws'] = False - self._cache['_meta']['modified_time'] = datetime.now() + if not record: + if update_exists and 'exists_aws' in self._cache['datasets']: + _logger.debug('Updating exists field') + self._cache['datasets'].loc[(slice(None), uuid), 'exists_aws'] = False + self._cache['_meta']['modified_time'] = datetime.now() out_files.append(None) continue + assert record['relative_path'].endswith(dset['rel_path']), \ + f'Relative path for dataset {uuid} does not match Alyx record' source_path = PurePosixPath(record['data_repository_path'], record['relative_path']) source_path = alfiles.add_uuid_string(source_path, uuid) - local_path = self.cache_dir.joinpath(dset['session_path'], dset['rel_path']) + local_path = self.cache_dir.joinpath(record['relative_path']) if keep_uuid is True or (keep_uuid is None and self.uuid_filenames is True): local_path = alfiles.add_uuid_string(local_path, uuid) local_path.parent.mkdir(exist_ok=True, parents=True) diff --git a/one/converters.py b/one/converters.py index dbfc8c43..20b80ea7 100644 --- a/one/converters.py +++ b/one/converters.py @@ -13,15 +13,14 @@ from uuid import UUID from inspect import unwrap from pathlib import Path, PurePosixPath -from urllib.parse import urlsplit from typing import Optional, Union, Mapping, List, Iterable as Iter import pandas as pd from iblutil.util import Bunch from one.alf.spec import is_session_path, is_uuid_string -from one.alf.files import get_session_path, add_uuid_string, session_path_parts, remove_uuid_string -from .util import Listable, ensure_list +from one.alf.files import get_session_path, 
add_uuid_string, session_path_parts, get_alf_path +from .util import Listable def recurse(func): @@ -232,39 +231,43 @@ def path2record(self, path) -> pd.Series: A cache file record """ is_session = is_session_path(path) - rec = self._cache['sessions' if is_session else 'datasets'] - if rec.empty: - return - # if (rec := self._cache['datasets']).empty: # py 3.8 - # return + if self._cache['sessions' if is_session else 'datasets'].empty: + return # short circuit: no records in the cache if is_session_path(path): lab, subject, date, number = session_path_parts(path) - rec = rec[ - (rec['lab'] == lab) & (rec['subject'] == subject) & - (rec['number'] == int(number)) & - (rec['date'] == datetime.date.fromisoformat(date)) + df = self._cache['sessions'] + rec = df[ + (df['lab'] == lab) & (df['subject'] == subject) & + (df['number'] == int(number)) & + (df['date'] == datetime.date.fromisoformat(date)) ] return None if rec.empty else rec.squeeze() - # Deal with file path - if isinstance(path, str) and path.startswith('http'): - # Remove the UUID from path - path = urlsplit(path).path.strip('/') - path = remove_uuid_string(PurePosixPath(path)) - session_path = get_session_path(path).as_posix() - else: - # No way of knowing root session path parts without cache tables - eid = self.path2eid(path) - session_series = self.list_datasets(eid, details=True).session_path - if not eid or session_series.empty: + # Deal with dataset path + if isinstance(path, str): + path = Path(path) + # If there's a UUID in the path, use that to fetch the record + name_parts = path.stem.split('.') + if is_uuid_string(uuid := name_parts[-1]): + try: + return self._cache['datasets'].loc[pd.IndexSlice[:, uuid], :].squeeze() + except KeyError: return - session_path, *_ = session_series - rec = rec[rec['session_path'] == session_path] - rec = rec[rec['rel_path'].apply(lambda x: path.as_posix().endswith(x))] + # Fetch via session record + eid = self.path2eid(path) + df = self.list_datasets(eid, 
details=True) + if not eid or df.empty: + return + + # Find row where relative path matches + rec = df[df['rel_path'] == path.relative_to(get_session_path(path)).as_posix()] assert len(rec) < 2, 'Multiple records found' - return None if rec.empty else rec.squeeze() + if rec.empty: + return None + # Convert slice to series and reinstate eid index if dropped + return rec.squeeze().rename(index=(eid, rec.index.get_level_values('id')[0])) @recurse def path2url(self, filepath): @@ -313,19 +316,18 @@ def record2url(self, record): session_spec = '{lab}/Subjects/{subject}/{date}/{number:03d}' url = record.get('session_path') or session_spec.format(**record) return webclient.rel_path2url(url) - uuid = ensure_list(record.name)[-1] # may be (eid, did) or simply did else: raise TypeError( f'record must be pandas.DataFrame or pandas.Series, got {type(record)} instead') - - session_path, rel_path = record[['session_path', 'rel_path']].to_numpy().flatten() - url = PurePosixPath(session_path, rel_path) + assert isinstance(record.name, tuple) and len(record.name) == 2 + eid, uuid = record.name # must be (eid, did) + session_path = self.eid2path(eid) + url = PurePosixPath(get_alf_path(session_path), record['rel_path']) return webclient.rel_path2url(add_uuid_string(url, uuid).as_posix()) def record2path(self, dataset) -> Optional[Path]: """ - Given a set of dataset records, checks the corresponding exists flag in the cache - correctly reflects the files system. 
+ Given a set of dataset records, returns the corresponding paths Parameters ---------- @@ -337,13 +339,19 @@ def record2path(self, dataset) -> Optional[Path]: pathlib.Path File path for the record """ - assert isinstance(dataset, pd.Series) or len(dataset) == 1 - session_path, rel_path = dataset[['session_path', 'rel_path']].to_numpy().flatten() - file = Path(self.cache_dir, session_path, rel_path) + if isinstance(dataset, pd.DataFrame): + return [self.record2path(r) for _, r in dataset.iterrows()] + elif not isinstance(dataset, pd.Series): + raise TypeError( + f'record must be pandas.DataFrame or pandas.Series, got {type(dataset)} instead') + assert isinstance(dataset.name, tuple) and len(dataset.name) == 2 + eid, uuid = dataset.name # must be (eid, did) + if not (session_path := self.eid2path(eid)): + raise ValueError(f'Failed to determine session path for eid "{eid}"') + file = session_path / dataset['rel_path'] if self.uuid_filenames: - i = dataset.name if isinstance(dataset, pd.Series) else dataset.index[0] - file = add_uuid_string(file, i[1] if isinstance(i, tuple) else i) - return file # files[0] if len(datasets) == 1 else files + file = add_uuid_string(file, uuid) + return file @recurse def eid2ref(self, eid: Union[str, Iter], as_dict=True, parse=True) \ diff --git a/one/tests/alf/test_cache.py b/one/tests/alf/test_cache.py index f729a06a..05c894c3 100644 --- a/one/tests/alf/test_cache.py +++ b/one/tests/alf/test_cache.py @@ -81,7 +81,6 @@ def test_datasets_df(self): print('Datasets dataframe') print(df) dset_info = df.loc[0].to_dict() - self.assertEqual(dset_info['session_path'], self.rel_ses_path[:-1]) self.assertEqual(dset_info['rel_path'], self.rel_ses_files[0].as_posix()) self.assertTrue(dset_info['file_size'] > 0) self.assertFalse(df.rel_path.str.contains('invalid').any()) @@ -100,7 +99,6 @@ def tests_db(self): df_dsets, metadata2 = parquet.load(fn_dsets) self.assertEqual(metadata2, metadata_exp) dset_info = df_dsets.loc[0].to_dict() - 
self.assertEqual(dset_info['session_path'], self.rel_ses_path[:-1]) self.assertEqual(dset_info['rel_path'], self.rel_ses_files[0].as_posix()) # Check behaviour when no files found @@ -115,12 +113,9 @@ def tests_db(self): apt.make_parquet_db(self.tmpdir, hash_ids=False, lab='another') # Create some more datasets in a session folder outside of a lab directory - dsets = revisions_datasets_table() with tempfile.TemporaryDirectory() as tdir: - for session_path, rel_path in dsets[['session_path', 'rel_path']].values: - filepath = Path(tdir).joinpath(session_path, rel_path) - filepath.parent.mkdir(exist_ok=True, parents=True) - filepath.touch() + session_path = Path(tdir).joinpath('subject', '1900-01-01', '001') + _ = revisions_datasets_table(touch_path=session_path) # create some files fn_ses, _ = apt.make_parquet_db(tdir, hash_ids=False, lab='another') df_ses, _ = parquet.load(fn_ses) self.assertTrue((df_ses['lab'] == 'another').all()) diff --git a/one/tests/remote/test_globus.py b/one/tests/remote/test_globus.py index 47943d9b..6fa99a76 100644 --- a/one/tests/remote/test_globus.py +++ b/one/tests/remote/test_globus.py @@ -107,7 +107,7 @@ def test_as_globus_path(self): # Only test this on windows if sys.platform == 'win32': actual = globus.as_globus_path('/foo/bar') - self.assertEqual(actual, f'/{Path.cwd().drive[0]}/foo/bar') + self.assertEqual(actual, f'/{Path.cwd().drive[0].upper()}/foo/bar') # On all systems an explicit Windows path should be converted to a POSIX one actual = globus.as_globus_path(PureWindowsPath('E:\\FlatIron\\integration')) diff --git a/one/tests/test_converters.py b/one/tests/test_converters.py index 27360417..79aa0b78 100644 --- a/one/tests/test_converters.py +++ b/one/tests/test_converters.py @@ -2,7 +2,7 @@ import unittest from unittest import mock from pathlib import Path, PurePosixPath, PureWindowsPath -from uuid import UUID +from uuid import UUID, uuid4 import datetime import pandas as pd @@ -116,11 +116,17 @@ def test_path2record(self): 
self.assertTrue(file.as_posix().endswith(rec['rel_path'])) # Test URL - parts = add_uuid_string(file, '94285bfd-7500-4583-83b1-906c420cc667').parts[-7:] + uuid = '6cbb724e-c7ec-4eab-b24b-555001502d10' + parts = add_uuid_string(file, uuid).parts[-7:] url = TEST_DB_2['base_url'] + '/'.join(('', *parts)) rec = self.one.path2record(url) self.assertIsInstance(rec, pd.Series) self.assertTrue(file.as_posix().endswith(rec['rel_path'])) + # With a UUID missing from cache, should return None + uuid = '94285bfd-7500-4583-83b1-906c420cc667' + parts = add_uuid_string(file, uuid).parts[-7:] + url = TEST_DB_2['base_url'] + '/'.join(('', *parts)) + self.assertIsNone(self.one.path2record(url)) file = file.parent / '_fake_obj.attr.npy' self.assertIsNone(self.one.path2record(file)) @@ -277,14 +283,18 @@ def test_record2path(self): # As pd.DataFrame idx = rec.rel_path == 'alf/probe00/_phy_spikes_subset.channels.npy' path = self.one.record2path(rec[idx]) - self.assertEqual(expected, path) + self.assertEqual([expected], path) + # Test validation + self.assertRaises(AssertionError, self.one.record2path, rec[idx].droplevel(0)) # no eid + self.assertRaises(TypeError, self.one.record2path, rec[idx].to_dict()) + unknown = rec[idx].squeeze().rename(index=(str(uuid4()), data_id)) + self.assertRaises(ValueError, self.one.record2path, unknown) # unknown eid # With UUID in file name try: self.one.uuid_filenames = True expected = expected.with_suffix(f'.{data_id}.npy') - self.assertEqual(expected, self.one.record2path(rec[idx])) # as pd.DataFrame + self.assertEqual([expected], self.one.record2path(rec[idx])) # as pd.DataFrame self.assertEqual(expected, self.one.record2path(rec[idx].squeeze())) # as pd.Series - self.assertEqual(expected, self.one.record2path(rec[idx].droplevel(0))) # no eid finally: self.one.uuid_filenames = False diff --git a/one/tests/test_one.py b/one/tests/test_one.py index 013542a5..19ae98f8 100644 --- a/one/tests/test_one.py +++ b/one/tests/test_one.py @@ -525,12 +525,13 @@ 
def test_get_details(self): def test_check_filesystem(self): """Test for One._check_filesystem. + Most is already covered by other tests, this checks that it can deal with dataset frame without eid index and without a session_path column. """ # Get two eids to test eids = self.one._cache['datasets'].index.get_level_values(0)[[0, -1]] - datasets = self.one._cache['datasets'].loc[eids].drop('session_path', axis=1) + datasets = self.one._cache['datasets'].loc[eids] files = self.one._check_filesystem(datasets) self.assertEqual(53, len(files)) # Expect same number of unique session paths as eids @@ -540,7 +541,7 @@ def test_check_filesystem(self): session_parts = self.one._cache['sessions'].loc[eids, ['subject', 'date', 'number']].values self.assertCountEqual(expected, map(lambda x: f'{x[0]}/{x[1]}/{x[2]:03}', session_parts)) # Attempt the same with the eid index missing - datasets = datasets.droplevel(0).drop('session_path', axis=1) + datasets = datasets.droplevel(0) self.assertEqual(files, self.one._check_filesystem(datasets)) # Test with uuid_filenames as True self.one.uuid_filenames = True @@ -675,7 +676,7 @@ def test_load_datasets(self): # With relative paths provided as input, dataset uniqueness validation is suppressed. 
eid = self.one._cache.sessions.iloc[0].name datasets = util.revisions_datasets_table( - revisions=('', '2020-01-08'), attributes=('times',), touch_path=self.one.cache_dir) + revisions=('', '2020-01-08'), attributes=('times',), touch_path=self.one.eid2path(eid)) datasets['default_revision'] = [False, True] * 3 datasets.index = datasets.index.set_levels([eid], level=0) self.one._cache.datasets = datasets @@ -958,7 +959,8 @@ def test_save_loaded_ids(self): old_cache = self.one._cache['datasets'] try: datasets = [self.one._cache.datasets, dset.to_frame().T] - datasets = pd.concat(datasets).astype(old_cache.dtypes) + datasets = pd.concat(datasets).astype(old_cache.dtypes).sort_index() + datasets.index.set_names(('eid', 'id'), inplace=True) self.one._cache['datasets'] = datasets dsets = [dset['rel_path'], '_ibl_trials.feedback_times.npy'] new_files, rec = self.one.load_datasets(eid, dsets, assert_present=False) @@ -1330,9 +1332,6 @@ def test_list_aggregates(self): # Test listing by relation datasets = self.one.list_aggregates('subjects') self.assertTrue(all(datasets['rel_path'].str.startswith('aggregates/Subjects'))) - self.assertIn('exists_aws', datasets.columns) - self.assertIn('session_path', datasets.columns) - self.assertTrue(all(datasets['session_path'] == '')) self.assertTrue(self.one.list_aggregates('foobar').empty) # Test filtering with an identifier datasets = self.one.list_aggregates('subjects', 'ZM_1085') @@ -1700,6 +1699,8 @@ def test_download_datasets(self): rec = self.one.list_datasets(self.eid, details=True) rec = rec[rec.rel_path.str.contains('00/pykilosort/channels.brainLocation')] rec['exists_aws'] = False # Ensure we use FlatIron for this + rec = pd.concat({str(self.eid): rec}, names=['eid']) + files = self.one._download_datasets(rec) self.assertFalse(None in files) @@ -1710,8 +1711,11 @@ def test_download_datasets(self): def test_download_aws(self): """Test for OneAlyx._download_aws method.""" # Test download datasets via AWS - dsets = 
self.one.list_datasets(self.eid, details=True) - file = self.one.cache_dir / dsets['rel_path'].values[0] + dsets = self.one.list_datasets( + self.eid, filename='*wiring.json', collection='raw_ephys_data/probe??', details=True) + dsets = pd.concat({str(self.eid): dsets}, names=['eid']) + + file = self.one.eid2path(self.eid) / dsets['rel_path'].values[0] with mock.patch('one.remote.aws.get_s3_from_alyx', return_value=(None, None)), \ mock.patch('one.remote.aws.s3_download_file', return_value=file) as method: self.one._download_datasets(dsets) @@ -1723,23 +1727,25 @@ def test_download_aws(self): # Check keep_uuid = True self.one._download_datasets(dsets, keep_uuid=True) _, local = method.call_args.args - self.assertIn(dsets.iloc[-1].name, local.name) + self.assertIn(dsets.iloc[-1].name[1], local.name) # Test behaviour when dataset not remotely accessible dsets = dsets[:1].copy() - rec = self.one.alyx.rest('datasets', 'read', id=dsets.index[0]) + rec = self.one.alyx.rest('datasets', 'read', id=dsets.index[0][1]) # need to find the index of matching aws repo, this is not constant across releases iaws = list(map(lambda x: x['data_repository'].startswith('aws'), rec['file_records'])).index(True) rec['file_records'][iaws]['exists'] = False # Set AWS file record to non-existent + self.one._cache.datasets['exists_aws'] = True # Only changes column if exists with mock.patch('one.remote.aws.get_s3_from_alyx', return_value=(None, None)), \ mock.patch.object(self.one.alyx, 'rest', return_value=[rec]), \ self.assertLogs('one.api', logging.DEBUG) as log: - self.assertEqual([None], self.one._download_datasets(dsets)) - self.assertRegex(log.output[-1], 'Updating exists field') + # should still download file via fallback method + self.assertEqual([file], self.one._download_datasets(dsets)) + self.assertRegex(log.output[1], 'Updating exists field') datasets = self.one._cache['datasets'] self.assertFalse( - datasets.loc[pd.IndexSlice[:, dsets.index[0]], 'exists_aws'].any() + 
datasets.loc[pd.IndexSlice[:, dsets.index[0][1]], 'exists_aws'].any() ) # Check falls back to HTTP when error raised diff --git a/one/tests/util.py b/one/tests/util.py index b215ea09..98286c8d 100644 --- a/one/tests/util.py +++ b/one/tests/util.py @@ -11,6 +11,7 @@ import one.params from one.util import QC_TYPE +from one.converters import session_record2path def set_up_env() -> tempfile.TemporaryDirectory: @@ -68,7 +69,8 @@ def create_file_tree(one): """ # Create dset files from cache - for session_path, rel_path in one._cache.datasets[['session_path', 'rel_path']].values: + for (eid, _), rel_path in one._cache.datasets['rel_path'].items(): + session_path = session_record2path(one._cache.sessions.loc[eid]) filepath = Path(one.cache_dir).joinpath(session_path, rel_path) filepath.parent.mkdir(exist_ok=True, parents=True) filepath.touch() @@ -139,7 +141,7 @@ def revisions_datasets_table(collections=('', 'alf/probe00', 'alf/probe01'), Returns ------- pd.DataFrame - A datasets cache table containing datasets made from the input names + A datasets cache table containing datasets made from the input names. 
""" rel_path = [] @@ -149,7 +151,6 @@ def revisions_datasets_table(collections=('', 'alf/probe00', 'alf/probe01'), rel_path.append('/'.join(x for x in (collec, rev, f'{object}.{attr}.npy') if x)) d = { 'rel_path': rel_path, - 'session_path': 'subject/1900-01-01/001', 'file_size': 0, 'hash': None, 'exists': True, @@ -160,7 +161,7 @@ def revisions_datasets_table(collections=('', 'alf/probe00', 'alf/probe01'), if touch_path: for p in rel_path: - path = Path(touch_path).joinpath(d['session_path'] + '/' + p) + path = Path(touch_path).joinpath(p) path.parent.mkdir(parents=True, exist_ok=True) path.touch() diff --git a/one/util.py b/one/util.py index e9766a1b..8c5a3719 100644 --- a/one/util.py +++ b/one/util.py @@ -60,8 +60,8 @@ def _to_record(d): rec['eid'] = session.name file_path = urllib.parse.urlsplit(d['data_url'], allow_fragments=False).path.strip('/') file_path = get_alf_path(remove_uuid_string(file_path)) - rec['session_path'] = get_session_path(file_path).as_posix() - rec['rel_path'] = file_path[len(rec['session_path']):].strip('/') + session_path = get_session_path(file_path).as_posix() + rec['rel_path'] = file_path[len(session_path):].strip('/') rec['default_revision'] = d['default_revision'] == 'True' rec['qc'] = d.get('qc', 'NOT_SET') return rec @@ -106,10 +106,10 @@ def datasets2records(datasets, additional=None) -> pd.DataFrame: data_url = urllib.parse.urlsplit(file_record['data_url'], allow_fragments=False) file_path = get_alf_path(data_url.path.strip('/')) file_path = remove_uuid_string(file_path).as_posix() - rec['session_path'] = get_session_path(file_path) or '' - if rec['session_path']: - rec['session_path'] = rec['session_path'].as_posix() - rec['rel_path'] = file_path[len(rec['session_path']):].strip('/') + session_path = get_session_path(file_path) or '' + if session_path: + session_path = session_path.as_posix() + rec['rel_path'] = file_path[len(session_path):].strip('/') rec['default_revision'] = d['default_dataset'] rec['qc'] = d.get('qc') for 
field in additional or []: @@ -677,4 +677,6 @@ def patch_cache(table: pd.DataFrame, min_api_version=None, name=None) -> pd.Data if name == 'datasets' and min_version < version.Version('2.7.0') and 'qc' not in table.columns: qc = pd.Categorical.from_codes(np.zeros(len(table.index), dtype=int), dtype=QC_TYPE) table = table.assign(qc=qc) + if name == 'datasets' and 'session_path' in table.columns: + table = table.drop('session_path', axis=1) return table