Commit

Resolves issue #134 (#136)
* Resolves issue #134

k1o0 authored Oct 10, 2024
1 parent 0804835 commit b2cff8a
Showing 11 changed files with 160 additions and 109 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ This version improves behaviour of loading revisions and loading datasets from l
- bugfix of spurious error raised when loading dataset with a revision provided
- default_revisions_only parameter in One.list_datasets filters non-default datasets
- permit data frame input to One.load_datasets and load the precise relative paths provided (instead of default revisions)
- redundant session_path column has been dropped from the datasets cache table

### Added

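As a usage sketch of the list/load behaviour changes noted in the CHANGELOG above (the experiment ID is hypothetical, and the parameter names follow the CHANGELOG entries rather than a verified API reference):

```python
from one.api import ONE

one = ONE()
eid = 'aaaaaaaa-bbbb-4ccc-8ddd-eeeeeeeeeeee'  # hypothetical experiment ID

# List only datasets whose revision is marked as default
# (default_revisions_only parameter described in the CHANGELOG entry above)
dsets = one.list_datasets(eid, details=True, default_revisions_only=True)

# Pass the resulting data frame straight back to load_datasets so the exact
# relative paths listed are loaded, rather than re-resolving default revisions
data, records = one.load_datasets(eid, dsets)
```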
2 changes: 1 addition & 1 deletion docs/one_installation.md
@@ -61,7 +61,7 @@ one = ONE()
To change your default database, or re-run the setup for a given database, you can use the following

```python
ONE._setup(base_url='https://test.alyx.internationalbrainlab.org', make_default=True)
ONE.setup(base_url='https://test.alyx.internationalbrainlab.org', make_default=True)
```

## 4. Update
30 changes: 20 additions & 10 deletions one/alf/cache.py
@@ -30,7 +30,7 @@
from one.alf.io import iter_sessions, iter_datasets
from one.alf.files import session_path_parts, get_alf_path
from one.converters import session_record2path
from one.util import QC_TYPE
from one.util import QC_TYPE, patch_cache

__all__ = ['make_parquet_db', 'remove_missing_datasets', 'DATASETS_COLUMNS', 'SESSIONS_COLUMNS']
_logger = logging.getLogger(__name__)
@@ -52,7 +52,6 @@
DATASETS_COLUMNS = (
'id', # int64
'eid', # int64
'session_path', # relative to the root
'rel_path', # relative to the session path, includes the filename
'file_size', # file size in bytes
'hash', # sha1/md5, computed in load function
@@ -89,7 +88,6 @@ def _get_dataset_info(full_ses_path, rel_dset_path, ses_eid=None, compute_hash=F
return {
'id': Path(rel_ses_path, rel_dset_path).as_posix(),
'eid': str(ses_eid),
'session_path': str(rel_ses_path),
'rel_path': Path(rel_dset_path).as_posix(),
'file_size': file_size,
'hash': md5(full_dset_path) if compute_hash else None,
@@ -297,18 +295,30 @@ def remove_missing_datasets(cache_dir, tables=None, remove_empty_sessions=True,
if tables is None:
tables = {}
for name in ('datasets', 'sessions'):
tables[name], _ = parquet.load(cache_dir / f'{name}.pqt')
to_delete = []
table, m = parquet.load(cache_dir / f'{name}.pqt')
tables[name] = patch_cache(table, m.get('min_api_version'), name)

INDEX_KEY = '.?id'
for name in tables:
# Set the appropriate index if none already set
if isinstance(tables[name].index, pd.RangeIndex):
idx_columns = sorted(tables[name].filter(regex=INDEX_KEY).columns)
tables[name].set_index(idx_columns, inplace=True)

to_delete = set()
gen_path = partial(session_record2path, root_dir=cache_dir)
sessions = sorted(map(lambda x: gen_path(x[1]), tables['sessions'].iterrows()))
# map of session path to eid
sessions = {gen_path(rec): eid for eid, rec in tables['sessions'].iterrows()}
for session_path in iter_sessions(cache_dir):
rel_session_path = session_path.relative_to(cache_dir).as_posix()
datasets = tables['datasets'][tables['datasets']['session_path'] == rel_session_path]
try:
datasets = tables['datasets'].loc[sessions[session_path]]
except KeyError:
datasets = tables['datasets'].iloc[0:0, :]
for dataset in iter_datasets(session_path):
if dataset.as_posix() not in datasets['rel_path']:
to_delete.append(session_path.joinpath(dataset))
to_delete.add(session_path.joinpath(dataset))
if session_path not in sessions and remove_empty_sessions:
to_delete.append(session_path)
to_delete.add(session_path)

if dry:
print('The following session and datasets would be removed:', end='\n\t')
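With the session_path column removed, remove_missing_datasets above resolves each session's datasets by slicing the (eid, id) MultiIndex rather than filtering on a path-string column. A standalone sketch of that indexing pattern, using made-up identifiers:

```python
import pandas as pd

# Hypothetical datasets table indexed by (eid, id), mirroring the cache tables
datasets = pd.DataFrame(
    {'rel_path': ['alf/trials.table.pqt', 'alf/wheel.position.npy']},
    index=pd.MultiIndex.from_tuples(
        [('eid-1', 'did-1'), ('eid-1', 'did-2')], names=['eid', 'id']),
)

session_datasets = datasets.loc['eid-1']  # all datasets belonging to one session
empty = datasets.iloc[0:0, :]             # same columns, no rows (unknown session)
```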
62 changes: 40 additions & 22 deletions one/api.py
@@ -131,7 +131,7 @@ def load_cache(self, tables_dir=None, **kwargs):

# Set the appropriate index if none already set
if isinstance(cache.index, pd.RangeIndex):
idx_columns = cache.filter(regex=INDEX_KEY).columns.tolist()
idx_columns = sorted(cache.filter(regex=INDEX_KEY).columns)
if len(idx_columns) == 0:
raise KeyError('Failed to set index')
cache.set_index(idx_columns, inplace=True)
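Both this load_cache hunk and remove_missing_datasets in one/alf/cache.py sort the columns matched by the INDEX_KEY pattern so the MultiIndex is always built in (eid, id) order; an illustrative sketch with a made-up frame:

```python
import pandas as pd

df = pd.DataFrame({'id': ['did-1'], 'eid': ['eid-1'],
                   'rel_path': ['alf/trials.table.pqt']})
# Columns matching the '.?id' pattern are sorted, giving ['eid', 'id']
# regardless of the original column order
idx_columns = sorted(df.filter(regex='.?id').columns)
df.set_index(idx_columns, inplace=True)
```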
@@ -552,9 +552,8 @@ def _check_filesystem(self, datasets, offline=None, update_exists=True, check_ha
Given a set of datasets, check whether records correctly reflect the filesystem.
Called by load methods, this returns a list of file paths to load and return.
TODO This needs changing; overload for downloading?
This changes datasets frame, calls _update_cache(sessions=None, datasets=None) to
update and save tables. Download_datasets can also call this function.
This changes datasets frame, calls _update_cache(sessions=None, datasets=None) to
update and save tables. Download_datasets may also call this function.
Parameters
----------
@@ -575,12 +574,18 @@ def _check_filesystem(self, datasets, offline=None, update_exists=True, check_ha
"""
if isinstance(datasets, pd.Series):
datasets = pd.DataFrame([datasets])
assert datasets.index.nlevels <= 2
idx_names = ['eid', 'id'] if datasets.index.nlevels == 2 else ['id']
datasets.index.set_names(idx_names, inplace=True)
elif not isinstance(datasets, pd.DataFrame):
# Cast set of dicts (i.e. from REST datasets endpoint)
datasets = util.datasets2records(list(datasets))
else:
datasets = datasets.copy()
indices_to_download = [] # indices of datasets that need (re)downloading
files = [] # file path list to return
# If the session_path field is missing from the datasets table, fetch from sessions table
# Typically only aggregate frames contain this column
if 'session_path' not in datasets.columns:
if 'eid' not in datasets.index.names:
# Get slice of full frame with eid in index
@@ -647,12 +652,12 @@ def _check_filesystem(self, datasets, offline=None, update_exists=True, check_ha

if self.record_loaded:
loaded = np.fromiter(map(bool, files), bool)
loaded_ids = np.array(datasets.index.to_list())[loaded]
loaded_ids = datasets.index.get_level_values('id')[loaded].to_numpy()
if '_loaded_datasets' not in self._cache:
self._cache['_loaded_datasets'] = np.unique(loaded_ids)
else:
loaded_set = np.hstack([self._cache['_loaded_datasets'], loaded_ids])
self._cache['_loaded_datasets'] = np.unique(loaded_set, axis=0)
self._cache['_loaded_datasets'] = np.unique(loaded_set)

# Return full list of file paths
return files
@@ -1013,6 +1018,9 @@ def load_object(self,

# For those that don't exist, download them
offline = None if query_type == 'auto' else self.mode == 'local'
if datasets.index.nlevels == 1:
# Reinstate eid index
datasets = pd.concat({str(eid): datasets}, names=['eid'])
files = self._check_filesystem(datasets, offline=offline, check_hash=check_hash)
files = [x for x in files if x]
if not files:
@@ -1117,6 +1125,9 @@ def load_dataset(self,
wildcards=self.wildcards, assert_unique=assert_unique)
if len(datasets) == 0:
raise alferr.ALFObjectNotFound(f'Dataset "{dataset}" not found')
if datasets.index.nlevels == 1:
# Reinstate eid index
datasets = pd.concat({str(eid): datasets}, names=['eid'])

# Check files exist / download remote files
offline = None if query_type == 'auto' else self.mode == 'local'
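Several load methods in this file (load_object and load_dataset above, load_datasets and load_collection below) now reinstate the eid index level before calling _check_filesystem; a minimal standalone sketch of that pandas pattern with placeholder identifiers:

```python
import pandas as pd

# Hypothetical single-session datasets frame indexed only by dataset id
datasets = pd.DataFrame({'rel_path': ['alf/trials.table.pqt']},
                        index=pd.Index(['did-1'], name='id'))
eid = 'eid-1'  # hypothetical experiment ID

if datasets.index.nlevels == 1:
    # Prepend the session eid as an outer index level -> (eid, id) MultiIndex
    datasets = pd.concat({str(eid): datasets}, names=['eid'])
# datasets.index.names is now ['eid', 'id']
```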
@@ -1288,6 +1299,9 @@ def _verify_specifiers(specifiers):
for x, y, z in zip(datasets, collections, revisions)]
present = [len(x) == 1 for x in slices]
present_datasets = pd.concat(slices)
if present_datasets.index.nlevels == 1:
# Reinstate eid index
present_datasets = pd.concat({str(eid): present_datasets}, names=['eid'])

# Check if user is blindly downloading all data and warn of non-default revisions
if 'default_revision' in present_datasets and \
@@ -1326,7 +1340,7 @@

# Make list of metadata Bunches out of the table
records = (present_datasets
.reset_index(names='id')
.reset_index(names=['eid', 'id'])
.to_dict('records', into=Bunch))

# Ensure result same length as input datasets list
@@ -1459,6 +1473,9 @@ def load_collection(self,
if len(datasets) == 0:
raise alferr.ALFObjectNotFound(object or '')
parts = [alfiles.rel_path_parts(x) for x in datasets.rel_path]
if datasets.index.nlevels == 1:
# Reinstate eid index
datasets = pd.concat({str(eid): datasets}, names=['eid'])

# For those that don't exist, download them
offline = None if query_type == 'auto' else self.mode == 'local'
@@ -1868,8 +1885,7 @@ def list_aggregates(self, relation: str, identifier: str = None,
all_aggregates = self.alyx.rest('datasets', 'list', django=query)
records = (util.datasets2records(all_aggregates)
.reset_index(level=0)
.drop('eid', axis=1)
.rename_axis(index={'id': 'did'}))
.drop('eid', axis=1))
# Since rel_path for public FI file records starts with 'public/aggregates' instead of just
# 'aggregates', we should discard the file path parts before 'aggregates' (if present)
records['rel_path'] = records['rel_path'].str.replace(
@@ -1890,11 +1906,6 @@ def path2id(p) -> str:
# NB: We avoid exact matches as most users will only include subject, not lab/subject
records = records[records['identifier'].str.contains(identifier)]

# Add exists_aws field for download method
for i, rec in records.iterrows():
fr = next(x['file_records'] for x in all_aggregates if x['url'].endswith(i))
records.loc[i, 'exists_aws'] = any(
x['data_repository'].startswith('aws') and x['exists'] for x in fr)
return util.filter_datasets(records, filename=dataset, revision=revision,
wildcards=True, assert_unique=assert_unique)

@@ -1950,6 +1961,7 @@ def load_aggregate(self, relation: str, identifier: str,
raise alferr.ALFObjectNotFound(
f'{dataset or "dataset"} not found for {relation}/{identifier}')
# update_exists=False because these datasets are not in the cache table
records['session_path'] = '' # explicitly add session path column
file, = self._check_filesystem(records, update_exists=False)
if not file:
raise alferr.ALFObjectNotFound('Dataset file not found on disk')
@@ -2286,9 +2298,12 @@ def _download_datasets(self, dsets, **kwargs) -> List[Path]:
try:
if not isinstance(dsets, pd.DataFrame):
raise TypeError('Input datasets must be a pandas data frame for AWS download.')
if 'exists_aws' in dsets and np.all(np.equal(dsets['exists_aws'].values, True)):
_logger.info('Downloading from AWS')
return self._download_aws(map(lambda x: x[1], dsets.iterrows()), **kwargs)
assert 'exists_aws' not in dsets or np.all(np.equal(dsets['exists_aws'].values, True))
_logger.debug('Downloading from AWS')
files = self._download_aws(map(lambda x: x[1], dsets.iterrows()), **kwargs)
# Trigger fallback download of any files missing on AWS
assert all(files), f'{sum(map(bool, files))} datasets not found on AWS'
return files
except Exception as ex:
_logger.debug(ex)
return self._download_dataset(dsets, **kwargs)
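The reworked _download_datasets above attempts AWS first and falls back to the regular HTTP dataset download when an assertion fails or any file is missing; an illustrative sketch of that control flow (the function names here are placeholders, not the library's API):

```python
from typing import Callable, List, Optional

Downloader = Callable[[list], List[Optional[str]]]

def download_with_fallback(download_aws: Downloader,
                           download_http: Downloader,
                           datasets: list) -> List[Optional[str]]:
    """Try the AWS downloader first; any error or missing file falls back to HTTP."""
    try:
        files = download_aws(datasets)
        # A single missing file (None entry) triggers the fallback for the batch
        assert all(files), 'some datasets not found on AWS'
        return files
    except Exception as ex:
        print(f'AWS download failed ({ex}); falling back to HTTP')
        return download_http(datasets)
```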
@@ -2338,15 +2353,18 @@ def _download_aws(self, dsets, update_exists=True, keep_uuid=None, **_) -> List[
# Fetch file record path
record = next((x for x in record['file_records']
if x['data_repository'].startswith('aws') and x['exists']), None)
if not record and update_exists and 'exists_aws' in self._cache['datasets']:
_logger.debug('Updating exists field')
self._cache['datasets'].loc[(slice(None), uuid), 'exists_aws'] = False
self._cache['_meta']['modified_time'] = datetime.now()
if not record:
if update_exists and 'exists_aws' in self._cache['datasets']:
_logger.debug('Updating exists field')
self._cache['datasets'].loc[(slice(None), uuid), 'exists_aws'] = False
self._cache['_meta']['modified_time'] = datetime.now()
out_files.append(None)
continue
assert record['relative_path'].endswith(dset['rel_path']), \
f'Relative path for dataset {uuid} does not match Alyx record'
source_path = PurePosixPath(record['data_repository_path'], record['relative_path'])
source_path = alfiles.add_uuid_string(source_path, uuid)
local_path = self.cache_dir.joinpath(dset['session_path'], dset['rel_path'])
local_path = self.cache_dir.joinpath(record['relative_path'])
if keep_uuid is True or (keep_uuid is None and self.uuid_filenames is True):
local_path = alfiles.add_uuid_string(local_path, uuid)
local_path.parent.mkdir(exist_ok=True, parents=True)
86 changes: 47 additions & 39 deletions one/converters.py
@@ -13,15 +13,14 @@
from uuid import UUID
from inspect import unwrap
from pathlib import Path, PurePosixPath
from urllib.parse import urlsplit
from typing import Optional, Union, Mapping, List, Iterable as Iter

import pandas as pd
from iblutil.util import Bunch

from one.alf.spec import is_session_path, is_uuid_string
from one.alf.files import get_session_path, add_uuid_string, session_path_parts, remove_uuid_string
from .util import Listable, ensure_list
from one.alf.files import get_session_path, add_uuid_string, session_path_parts, get_alf_path
from .util import Listable


def recurse(func):
@@ -232,39 +231,43 @@ def path2record(self, path) -> pd.Series:
A cache file record
"""
is_session = is_session_path(path)
rec = self._cache['sessions' if is_session else 'datasets']
if rec.empty:
return
# if (rec := self._cache['datasets']).empty: # py 3.8
# return
if self._cache['sessions' if is_session else 'datasets'].empty:
return # short circuit: no records in the cache

if is_session_path(path):
lab, subject, date, number = session_path_parts(path)
rec = rec[
(rec['lab'] == lab) & (rec['subject'] == subject) &
(rec['number'] == int(number)) &
(rec['date'] == datetime.date.fromisoformat(date))
df = self._cache['sessions']
rec = df[
(df['lab'] == lab) & (df['subject'] == subject) &
(df['number'] == int(number)) &
(df['date'] == datetime.date.fromisoformat(date))
]
return None if rec.empty else rec.squeeze()

# Deal with file path
if isinstance(path, str) and path.startswith('http'):
# Remove the UUID from path
path = urlsplit(path).path.strip('/')
path = remove_uuid_string(PurePosixPath(path))
session_path = get_session_path(path).as_posix()
else:
# No way of knowing root session path parts without cache tables
eid = self.path2eid(path)
session_series = self.list_datasets(eid, details=True).session_path
if not eid or session_series.empty:
# Deal with dataset path
if isinstance(path, str):
path = Path(path)
# If there's a UUID in the path, use that to fetch the record
name_parts = path.stem.split('.')
if is_uuid_string(uuid := name_parts[-1]):
try:
return self._cache['datasets'].loc[pd.IndexSlice[:, uuid], :].squeeze()
except KeyError:
return
session_path, *_ = session_series

rec = rec[rec['session_path'] == session_path]
rec = rec[rec['rel_path'].apply(lambda x: path.as_posix().endswith(x))]
# Fetch via session record
eid = self.path2eid(path)
df = self.list_datasets(eid, details=True)
if not eid or df.empty:
return

# Find row where relative path matches
rec = df[df['rel_path'] == path.relative_to(get_session_path(path)).as_posix()]
assert len(rec) < 2, 'Multiple records found'
return None if rec.empty else rec.squeeze()
if rec.empty:
return None
# Convert slice to series and reinstate eid index if dropped
return rec.squeeze().rename(index=(eid, rec.index.get_level_values('id')[0]))

@recurse
def path2url(self, filepath):
@@ -313,19 +316,18 @@ def record2url(self, record):
session_spec = '{lab}/Subjects/{subject}/{date}/{number:03d}'
url = record.get('session_path') or session_spec.format(**record)
return webclient.rel_path2url(url)
uuid = ensure_list(record.name)[-1] # may be (eid, did) or simply did
else:
raise TypeError(
f'record must be pandas.DataFrame or pandas.Series, got {type(record)} instead')

session_path, rel_path = record[['session_path', 'rel_path']].to_numpy().flatten()
url = PurePosixPath(session_path, rel_path)
assert isinstance(record.name, tuple) and len(record.name) == 2
eid, uuid = record.name # must be (eid, did)
session_path = self.eid2path(eid)
url = PurePosixPath(get_alf_path(session_path), record['rel_path'])
return webclient.rel_path2url(add_uuid_string(url, uuid).as_posix())

def record2path(self, dataset) -> Optional[Path]:
"""
Given a set of dataset records, checks the corresponding exists flag in the cache
correctly reflects the files system.
Given a set of dataset records, returns the corresponding paths
Parameters
----------
@@ -337,13 +339,19 @@ def record2path(self, dataset) -> Optional[Path]:
pathlib.Path
File path for the record
"""
assert isinstance(dataset, pd.Series) or len(dataset) == 1
session_path, rel_path = dataset[['session_path', 'rel_path']].to_numpy().flatten()
file = Path(self.cache_dir, session_path, rel_path)
if isinstance(dataset, pd.DataFrame):
return [self.record2path(r) for _, r in dataset.iterrows()]
elif not isinstance(dataset, pd.Series):
raise TypeError(
f'record must be pandas.DataFrame or pandas.Series, got {type(dataset)} instead')
assert isinstance(dataset.name, tuple) and len(dataset.name) == 2
eid, uuid = dataset.name # must be (eid, did)
if not (session_path := self.eid2path(eid)):
raise ValueError(f'Failed to determine session path for eid "{eid}"')
file = session_path / dataset['rel_path']
if self.uuid_filenames:
i = dataset.name if isinstance(dataset, pd.Series) else dataset.index[0]
file = add_uuid_string(file, i[1] if isinstance(i, tuple) else i)
return file # files[0] if len(datasets) == 1 else files
file = add_uuid_string(file, uuid)
return file

@recurse
def eid2ref(self, eid: Union[str, Iter], as_dict=True, parse=True) \
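In record2url and record2path above, a dataset's location is now derived from the record's (eid, id) index plus the session-path resolver rather than from a stored session_path column; a simplified sketch (the resolver argument and record layout are illustrative assumptions):

```python
from pathlib import Path
import pandas as pd

def record_to_path(record: pd.Series, eid2path) -> Path:
    """Build a dataset path from a record whose Series name is the (eid, did) key."""
    eid, did = record.name            # MultiIndex key carried on the Series
    session_path = eid2path(eid)      # e.g. One.eid2path resolves eid -> session dir
    if session_path is None:
        raise ValueError(f'Failed to determine session path for eid "{eid}"')
    # did could additionally be appended to the filename when UUID filenames are enabled
    return Path(session_path) / record['rel_path']
```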