From 7d589281a3b1aa7d78f4bc31a048a8dfdc6d7fd6 Mon Sep 17 00:00:00 2001 From: Miles Wells Date: Thu, 10 Oct 2024 21:29:21 +0300 Subject: [PATCH 1/5] Remote database querying by default --- CHANGELOG.md | 8 +- docs/notebooks/one_modes.ipynb | 221 +++++++++++++++++---------------- one/api.py | 8 +- one/tests/test_one.py | 6 +- 4 files changed, 128 insertions(+), 115 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c317aeb..c7a601f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,9 +5,15 @@ This version drops support for python 3.9 and below, and ONE is now in remote mo ### Modified - Supports python >= 3.10 only +- OneAlyx uses remote mode by default, instead of auto +- OneAlyx.search now updates the cache tables in remote mode as paginated sessions are accessed + +### Added + +- one.alf.cache.remove_cache_table_files and One.\_remove_cache_table_files for deleting cache table files ## [2.10.0] -This version improves behaviour of loading revisions and loading datasets from list_datasets output. +This version fixes issues with Alyx authentication in silent mode, and improves behaviour of loading revisions. ### Modified diff --git a/docs/notebooks/one_modes.ipynb b/docs/notebooks/one_modes.ipynb index a129c7d6..2830fbd8 100644 --- a/docs/notebooks/one_modes.ipynb +++ b/docs/notebooks/one_modes.ipynb @@ -2,6 +2,9 @@ "cells": [ { "cell_type": "markdown", + "metadata": { + "collapsed": false + }, "source": [ "# ONE API modes\n", "## Online vs Offline\n", @@ -11,16 +14,19 @@ "made [via REST](./one_advanced/one_advanced.html). Other online methods include `pid2eid` and\n", "`describe_revision`.\n", "\n", - "When the mode is not specified, it is usually set to 'auto', unless a cache_dir is specified for\n", + "When the mode is not specified, it is usually set to 'remote', unless a cache_dir is specified for\n", "which no database has been configured." - ], - "metadata": { - "collapsed": false - } + ] }, { "cell_type": "code", "execution_count": 2, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, "outputs": [ { "name": "stdout", @@ -36,31 +42,32 @@ "from one.api import ONE\n", "\n", "one = ONE()\n", - "print(one, one.mode) # online, 'auto' mode\n", + "print(one, one.mode) # online, 'remote' mode\n", "assert not one.offline\n", "\n", "one_offline = ONE(cache_dir=r'C:\\data')\n", "print(one_offline, one_offline.mode) # offline, 'local' mode\n", "assert one_offline.offline" - ], + ] + }, + { + "cell_type": "markdown", "metadata": { "collapsed": false, "pycharm": { - "name": "#%%\n" + "name": "#%% md\n" } - } - }, - { - "cell_type": "markdown", + }, "source": [ "## Query modes\n", "In 'auto' mode, the list, search and load methods will use the local cache tables and not\n", - "connect to Alyx, however the option to use Alyx is always there. If the cache tables can't be\n", - "downloaded from Alyx, or authentication fails, the mode will fall back to 'local'.\n", + "connect to Alyx, however the option to use Alyx is always there. In this mode, an up-to-date\n", + "copy of the database session and dataset tables is automatically downloaded. If these cache\n", + "tables can't be downloaded from Alyx, or authentication fails, the mode will fall back to 'local'.\n", "\n", "If 'remote' mode is specified, ONE will only query the remote database and will not use the\n", "local cache tables. 
Avoiding the database whenever possible is recommended as it doesn't rely\n", - "on a stable internet connection and reduces the load on the remote Alyx.\n", + "on a stable internet connection and reduces the load on the remote database.\n", "\n", "While in 'remote' mode, the local cache may be used by providing the query_type='local' keyword\n", "argument to any method. Likewise, in 'auto'/'local' mode, a remote query can be made by\n", @@ -70,165 +77,171 @@ "NB: The 'remote' query type is not valid in offline mode as there is no database associated to\n", "the local cache directory.\n", "" - ], - "metadata": { - "collapsed": false, - "pycharm": { - "name": "#%% md\n" - } - } + ] }, { "cell_type": "code", "execution_count": 3, - "outputs": [], - "source": [ - "eids = one.search(lab='cortexlab', query_type='remote') # Search Alyx instead of the local cache" - ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } - } + }, + "outputs": [], + "source": [ + "eids = one.search(lab='cortexlab', query_type='remote') # Search Alyx instead of the local cache" + ] }, { "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, "source": [ "## REST caching\n", "In remote mode ONE makes a REST query instead of using the local cache tables. The results of\n", "the remote REST queries are also cached for 24 hours. This means that making the same remote\n", "REST query twice in a row will only hit the database once. The default cache expiry can be set\n", "by changing the relevant AlyxClient property:" - ], - "metadata": { - "collapsed": false, - "pycharm": { - "name": "#%% md\n" - } - } + ] }, { "cell_type": "code", "execution_count": null, - "outputs": [], - "source": [ - "from datetime import timedelta\n", - "one.alyx.default_expiry = timedelta(days=20) # Cache results for up to 20 days" - ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } - } + }, + "outputs": [], + "source": [ + "from datetime import timedelta\n", + "one.alyx.default_expiry = timedelta(days=20) # Cache results for up to 20 days" + ] }, { "cell_type": "markdown", - "source": [ - "You can temporarily deactivate the REST cache using the `no_cache` function in `one.webclient`.\n", - "When in this context no REST responses are cached and any existing cache files are not used.\n", - "Use this when the most up-to-date information is required:" - ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } - } + }, + "source": [ + "You can temporarily deactivate the REST cache using the `no_cache` function in `one.webclient`.\n", + "When in this context no REST responses are cached and any existing cache files are not used.\n", + "Use this when the most up-to-date information is required:" + ] }, { "cell_type": "code", "execution_count": null, - "outputs": [], - "source": [ - "with webclient.no_cache(one.alyx):\n", - " eids, det = one.search(lab='cortexlab', query_type='remote')" - ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } - } + }, + "outputs": [], + "source": [ + "with webclient.no_cache(one.alyx):\n", + " eids, det = one.search(lab='cortexlab', query_type='remote')" + ] }, { "cell_type": "markdown", - "source": [ - "When calling the alyx `rest` method directly you can deactivate the cache with the `no_cache`\n", - "keyword argument:\n" - ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } - } + }, + "source": [ + "When calling the alyx `rest` method directly you can deactivate the cache with the 
`no_cache`\n", + "keyword argument:\n" + ] }, { "cell_type": "code", "execution_count": null, - "outputs": [], - "source": [ - "ses = one.alyx.rest('sessions', 'list', lab='cortexlab', no_cache=True)" - ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } - } + }, + "outputs": [], + "source": [ + "ses = one.alyx.rest('sessions', 'list', lab='cortexlab', no_cache=True)" + ] }, { "cell_type": "markdown", - "source": [ - "Caching greatly improves performance and should be used whenever possible.\n", - "For more information on ONE REST queries, see [this guide](./one_advanced/one_advanced.html).\n", - "\n", - "You can turn off REST caching when instantiating ONE with the `cache_rest` keyword argument:" - ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } - } + }, + "source": [ + "Caching greatly improves performance and should be used whenever possible.\n", + "For more information on ONE REST queries, see [this guide](./one_advanced/one_advanced.html).\n", + "\n", + "You can turn off REST caching when instantiating ONE with the `cache_rest` keyword argument:" + ] }, { "cell_type": "code", "execution_count": null, - "outputs": [], - "source": [ - "one = ONE(cache_rest=None, mode='remote')" - ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } - } + }, + "outputs": [], + "source": [ + "one = ONE(cache_rest=None, mode='remote')" + ] }, { "cell_type": "markdown", - "source": [ - "## Refreshing the cache tables\n", - "By default ONE will try to update the cache tables once every 24 hours. This can be set by\n", - "changing the 'cache_expiry' property:" - ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } - } + }, + "source": [ + "## Refreshing the cache tables\n", + "While in 'auto' mode, ONE will try to update the cache tables once every 24 hours.\n", + "This can be changed by changing the 'cache_expiry' property:" + ] }, { "cell_type": "code", "execution_count": 4, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, "outputs": [ { "data": { - "text/plain": "{'expired': False,\n 'created_time': datetime.datetime(2021, 9, 14, 13, 0),\n 'loaded_time': datetime.datetime(2021, 9, 14, 18, 15, 54, 384591),\n 'raw': {'datasets': {'date_created': '2021-09-14 13:00', 'origin': 'alyx'},\n 'sessions': {'date_created': '2021-09-14 13:00', 'origin': 'alyx'}}}" + "text/plain": [ + "{'expired': False,\n", + " 'created_time': datetime.datetime(2021, 9, 14, 13, 0),\n", + " 'loaded_time': datetime.datetime(2021, 9, 14, 18, 15, 54, 384591),\n", + " 'raw': {'datasets': {'date_created': '2021-09-14 13:00', 'origin': 'alyx'},\n", + " 'sessions': {'date_created': '2021-09-14 13:00', 'origin': 'alyx'}}}" + ] }, "execution_count": 4, "metadata": {}, @@ -241,44 +254,44 @@ "\n", "# The time when the cache was generated can be found in the cache metadata:\n", "one._cache._meta" - ], - "metadata": { - "collapsed": false, - "pycharm": { - "name": "#%%\n" - } - } + ] }, { "cell_type": "markdown", - "source": [ - "Note that the cache won't be downloaded if the remote cache hasn't been updated since the last\n", - "download. The cache can be explicitly refreshed in two ways:" - ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } - } + }, + "source": [ + "Note that the cache won't be downloaded if the remote cache hasn't been updated since the last\n", + "download. 
The cache can be explicitly refreshed in two ways:" + ] }, { "cell_type": "code", "execution_count": 5, - "outputs": [], - "source": [ - "loaded_time = one.refresh_cache('refresh') # Explicitly refresh the cache\n", - "eids = one.search(lab='cortexlab', query_type='refresh') # Calls `refresh_cache` before searching" - ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } - } + }, + "outputs": [], + "source": [ + "loaded_time = one.refresh_cache('refresh') # Explicitly refresh the cache\n", + "eids = one.search(lab='cortexlab', query_type='refresh') # Calls `refresh_cache` before searching" + ] }, { "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, "source": [ "## Summary\n", "Mode overview:\n", @@ -297,13 +310,7 @@ "\n", "**I want to use a remote ONE query with up-to-the-minute information**\n", "Call the ONE method from within the `webclient.no_cache` context." - ], - "metadata": { - "collapsed": false, - "pycharm": { - "name": "#%% md\n" - } - } + ] } ], "metadata": { @@ -327,4 +334,4 @@ }, "nbformat": 4, "nbformat_minor": 0 -} \ No newline at end of file +} diff --git a/one/api.py b/one/api.py index c375f735..ac1416f0 100644 --- a/one/api.py +++ b/one/api.py @@ -49,7 +49,7 @@ class One(ConversionMixin): uuid_filenames = None """bool: whether datasets on disk have a UUID in their filename.""" - def __init__(self, cache_dir=None, mode='auto', wildcards=True, tables_dir=None): + def __init__(self, cache_dir=None, mode='local', wildcards=True, tables_dir=None): """An API for searching and loading data on a local filesystem Parameters @@ -1541,7 +1541,7 @@ def setup(cache_dir=None, silent=False, **kwargs): @lru_cache(maxsize=1) -def ONE(*, mode='auto', wildcards=True, **kwargs): +def ONE(*, mode='remote', wildcards=True, **kwargs): """ONE API factory. Determine which class to instantiate depending on parameters passed. @@ -1579,7 +1579,7 @@ def ONE(*, mode='auto', wildcards=True, **kwargs): if kwargs.pop('offline', False): _logger.warning('the offline kwarg will probably be removed. ' 'ONE is now offline by default anyway') - warnings.warn('"offline" param will be removed; use mode="local"', DeprecationWarning) + warnings.warn('"offline" param will be removed; use mode="local"', FutureWarning) mode = 'local' if (any(x in kwargs for x in ('base_url', 'username', 'password')) or @@ -1598,7 +1598,7 @@ def ONE(*, mode='auto', wildcards=True, **kwargs): class OneAlyx(One): """An API for searching and loading data through the Alyx database.""" def __init__(self, username=None, password=None, base_url=None, cache_dir=None, - mode='auto', wildcards=True, tables_dir=None, **kwargs): + mode='remote', wildcards=True, tables_dir=None, **kwargs): """An API for searching and loading data through the Alyx database. 
Parameters diff --git a/one/tests/test_one.py b/one/tests/test_one.py index c88351be..8bd01797 100644 --- a/one/tests/test_one.py +++ b/one/tests/test_one.py @@ -1419,7 +1419,7 @@ def tearDownClass(cls) -> None: class TestOneRemote(unittest.TestCase): """Test remote queries using OpenAlyx""" def setUp(self) -> None: - self.one = OneAlyx(**TEST_DB_2) + self.one = OneAlyx(**TEST_DB_2, mode='auto') self.eid = '4ecb5d24-f5cc-402c-be28-9d0f7cb14b3a' self.pid = 'da8dfec1-d265-44e8-84ce-6ae9c109b8bd' # Set cache directory to a temp dir to ensure that we re-download files @@ -1634,7 +1634,7 @@ def setUp(self) -> None: self.patch = mock.patch('one.params.iopar.getfile', new=partial(util.get_file, self.tempdir.name)) self.patch.start() - self.one = OneAlyx(**TEST_DB_2, cache_dir=self.tempdir.name) + self.one = OneAlyx(**TEST_DB_2, cache_dir=self.tempdir.name, mode='auto') self.fid = '17ab5b57-aaf6-4016-9251-66daadc200c7' # File record of channels.brainLocation self.eid = 'aad23144-0e52-4eac-80c5-c4ee2decb198' @@ -1976,7 +1976,7 @@ def test_one_factory(self): self.assertIsInstance(one_obj, One) # The offline param was given, raise deprecation warning (via log) - with self.assertWarns(DeprecationWarning): + with self.assertWarns(FutureWarning): ONE(offline=True, cache_dir=self.tempdir.name) # Test setup with virtual ONE method From 770d2089d3a87b517245ad17b0677ebdcd0cc52b Mon Sep 17 00:00:00 2001 From: Miles Wells Date: Fri, 11 Oct 2024 12:06:51 +0300 Subject: [PATCH 2/5] Handle loaded_time == None in refresh_cache --- one/api.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/one/api.py b/one/api.py index ac1416f0..3276c210 100644 --- a/one/api.py +++ b/one/api.py @@ -236,7 +236,8 @@ def refresh_cache(self, mode='auto'): if mode in {'local', 'remote'}: pass elif mode == 'auto': - if datetime.now() - self._cache['_meta']['loaded_time'] >= self.cache_expiry: + loaded_time = self._cache['_meta']['loaded_time'] + if not loaded_time or (datetime.now() - loaded_time >= self.cache_expiry): _logger.info('Cache expired, refreshing') self.load_cache() elif mode == 'refresh': From 46472037bf994f386d5786c692da63e66fa492d8 Mon Sep 17 00:00:00 2001 From: Miles Wells Date: Fri, 11 Oct 2024 14:01:00 +0300 Subject: [PATCH 3/5] Update cache from records in OneAlyx.search --- one/api.py | 50 +++++++++++++++++++++++++++++------- one/tests/test_alyxclient.py | 22 ++++++++++++++++ one/tests/test_one.py | 21 ++++++++++++++- one/webclient.py | 43 +++++++++++++++++++++++++++++++ 4 files changed, 126 insertions(+), 10 deletions(-) diff --git a/one/api.py b/one/api.py index 3276c210..7791e8ac 100644 --- a/one/api.py +++ b/one/api.py @@ -3,6 +3,7 @@ import urllib.parse import warnings import logging +from weakref import WeakMethod from datetime import datetime, timedelta from functools import lru_cache, partial from inspect import unwrap @@ -98,14 +99,17 @@ def search_terms(self, query_type=None) -> tuple: def _reset_cache(self): """Replace the cache object with a Bunch that contains the right fields.""" - self._cache = Bunch({'_meta': { - 'expired': False, - 'created_time': None, - 'loaded_time': None, - 'modified_time': None, - 'saved_time': None, - 'raw': {} # map of original table metadata - }}) + self._cache = Bunch({ + 'datasets': pd.DataFrame(columns=DATASETS_COLUMNS).set_index(['eid', 'id']), + 'sessions': pd.DataFrame(columns=SESSIONS_COLUMNS).set_index('id'), + '_meta': { + 'expired': False, + 'created_time': None, + 'loaded_time': None, + 'modified_time': None, + 'saved_time': None, + 'raw': {}} # 
map of original table metadata + }) def load_cache(self, tables_dir=None, **kwargs): """ @@ -187,7 +191,7 @@ def _save_cache(self, save_dir=None, force=False): If True, the cache is saved regardless of modification time. """ TIMEOUT = 5 # Delete lock file this many seconds after creation/modification or waiting - lock_file = Path(self.cache_dir).joinpath('.cache.lock') + lock_file = Path(self.cache_dir).joinpath('.cache.lock') # TODO use iblutil method here save_dir = Path(save_dir or self.cache_dir) meta = self._cache['_meta'] modified = meta.get('modified_time') or datetime.min @@ -2265,6 +2269,18 @@ def search(self, details=False, query_type=None, **kwargs): params.pop('django') # Make GET request ses = self.alyx.rest(self._search_endpoint, 'list', **params) + + # Update cache table with results + if len(ses) == 0: + pass # no need to update cache here + elif isinstance(ses, list): # not a paginated response + self._update_sessions_table(ses) + else: + # populate first page + self._update_sessions_table(ses._cache[:ses.limit]) + # Add callback for updating cache on future fetches + ses.add_callback(WeakMethod(self._update_sessions_table)) + # LazyId only transforms records when indexed eids = util.LazyId(ses) if not details: @@ -2278,6 +2294,22 @@ def _add_date(records): return eids, util.LazyId(ses, func=_add_date) + def _update_sessions_table(self, session_records): + """Update the sessions tables with a list of session records. + + Parameters + ---------- + session_records : list of dict + A list of session records from the /sessions list endpoint. + + Returns + ------- + datetime.datetime: + A timestamp of when the cache was updated. + """ + df = pd.DataFrame(next(zip(*map(util.ses2records, session_records)))) + return self._update_cache_from_records(sessions=df) + def _download_datasets(self, dsets, **kwargs) -> List[Path]: """ Download a single or multitude of datasets if stored on AWS, otherwise calls diff --git a/one/tests/test_alyxclient.py b/one/tests/test_alyxclient.py index 09615e6e..1b352b89 100644 --- a/one/tests/test_alyxclient.py +++ b/one/tests/test_alyxclient.py @@ -3,6 +3,7 @@ from unittest import mock import urllib.parse import random +import weakref import os import one.webclient as wc import one.params @@ -498,12 +499,24 @@ def test_paginated_response(self): self.assertTrue(not any(pg._cache[lim:])) self.assertIs(pg.alyx, alyx) + # Check adding callbacks + self.assertRaises(TypeError, pg.add_callback, None) + wf = mock.Mock(spec_set=weakref.ref) + cb1, cb2 = mock.MagicMock(), wf() + pg.add_callback(cb1) + pg.add_callback(wf) + self.assertEqual(2, len(pg._callbacks)) + # Check fetching cached item with +ve int self.assertEqual({'id': 1}, pg[1]) alyx._generic_request.assert_not_called() + for cb in [cb1, cb2]: + cb.assert_not_called() # Check fetching cached item with +ve slice self.assertEqual([{'id': 1}, {'id': 2}], pg[1:3]) alyx._generic_request.assert_not_called() + for cb in [cb1, cb2]: + cb.assert_not_called() # Check fetching cached item with -ve int self.assertEqual({'id': 100}, pg[-1900]) alyx._generic_request.assert_not_called() @@ -518,6 +531,10 @@ def test_paginated_response(self): self.assertEqual(res['results'], pg._cache[offset:offset + lim]) alyx._generic_request.assert_called_once_with(requests.get, mock.ANY, clobber=True) self._check_get_query(alyx._generic_request.call_args, lim, offset) + for cb in [cb1, cb2]: + cb.assert_called_once_with(res['results']) + # Check that dead weakreaf will be removed from the list on next call + wf.return_value = 
None # Check fetching uncached item with -ve int offset = lim * 3 res['results'] = [{'id': i} for i in range(offset, offset + lim)] @@ -527,6 +544,7 @@ def test_paginated_response(self): self.assertEqual(res['results'], pg._cache[offset:offset + lim]) alyx._generic_request.assert_called_with(requests.get, mock.ANY, clobber=True) self._check_get_query(alyx._generic_request.call_args, lim, offset) + self.assertEqual(1, len(pg._callbacks), 'failed to remove weakref callback') # Check fetching uncached item with +ve slice offset = lim * 5 res['results'] = [{'id': i} for i in range(offset, offset + lim)] @@ -548,6 +566,10 @@ def test_paginated_response(self): self.assertEqual(expected_calls := 4, alyx._generic_request.call_count) self.assertEqual((expected_calls + 1) * lim, sum(list(map(bool, pg._cache)))) + # Check callbacks cleared when cache fully populated + self.assertTrue(all(map(bool, pg))) + self.assertEqual(0, len(pg._callbacks)) + def _check_get_query(self, call_args, limit, offset): """Check URL get query contains the expected limit and offset params.""" (_, url), _ = call_args diff --git a/one/tests/test_one.py b/one/tests/test_one.py index 8bd01797..62f8cddc 100644 --- a/one/tests/test_one.py +++ b/one/tests/test_one.py @@ -1470,8 +1470,17 @@ def test_list_datasets(self): def test_search(self): """Test OneAlyx.search method in remote mode.""" + # Modify sessions dataframe so we can check that the records get updated + records = self.one._cache.sessions[self.one._cache.sessions.subject == 'SWC_043'] + self.one._cache.sessions.loc[records.index, 'lab'] = 'foolab' # change a field + self.one._cache.sessions.drop(self.eid, inplace=True) # remove a row + + # Check remote seach of subject eids = self.one.search(subject='SWC_043', query_type='remote') self.assertIn(self.eid, list(eids)) + updated = self.one._cache.sessions[self.one._cache.sessions.subject == 'SWC_043'] + self.assertCountEqual(eids, updated.index) + self.assertFalse('foolab' in updated['lab']) eids, det = self.one.search(subject='SWC_043', query_type='remote', details=True) correct = len(det) == len(eids) and 'url' in det[0] and det[0]['url'].endswith(eids[0]) @@ -1496,10 +1505,20 @@ def test_search(self): dates = set(map(lambda x: self.one.get_details(x)['date'], eids)) self.assertTrue(dates <= set(date_range)) - # Test limit arg and LazyId + # Test limit arg, LazyId, and update with paginated response callback + self.one._reset_cache() # Remove sessions table + assert self.one._cache.sessions.empty eids = self.one.search(date='2020-03-23', limit=2, query_type='remote') + self.assertEqual(2, len(self.one._cache.sessions), + 'failed to update cache with first page of search results') self.assertIsInstance(eids, LazyId) + assert len(eids) > 5, 'in order to check paginated response callback we need several pages' + e = eids[-3] # access an uncached value + self.assertEqual( + 4, len(self.one._cache.sessions), 'failed to update cache after page access') + self.assertTrue(e in self.one._cache.sessions.index) self.assertTrue(all(len(x) == 36 for x in eids)) + self.assertEqual(len(eids), len(self.one._cache.sessions)) # Test laboratory kwarg eids = self.one.search(laboratory='hoferlab', query_type='remote') diff --git a/one/webclient.py b/one/webclient.py index d6b749d7..64b83f68 100644 --- a/one/webclient.py +++ b/one/webclient.py @@ -40,6 +40,7 @@ from typing import Optional from datetime import datetime, timedelta from pathlib import Path +from weakref import ReferenceType import warnings import hashlib import zipfile @@ 
-206,6 +207,23 @@ def __init__(self, alyx, rep, cache_args=None): # fill the cache with results of the query for i in range(self.limit): self._cache[i] = rep['results'][i] + self._callbacks = set() + + def add_callback(self, cb): + """Add a callback function to use each time a new page is fetched. + + The callback function will be called with the page results each time :meth:`populate` + is called. + + Parameters + ---------- + cb : callable + A callable that takes the results of each paginated resonse. + """ + if not callable(cb): + raise TypeError(f'Expected type "callable", got "{type(cb)}" instead') + else: + self._callbacks.add(cb) def __len__(self): return self.count @@ -222,6 +240,16 @@ def __getitem__(self, item): return self._cache[item] def populate(self, idx): + """Populate response cache with new page of results. + + Fetches the specific page of results containing the index passed and populates + stores the results in the :prop:`_cache` property. + + Parameters + ---------- + idx : int + The index of a given record to fetch. + """ offset = self.limit * math.floor(idx / self.limit) query = update_url_params(self.query, {'limit': self.limit, 'offset': offset}) res = self.alyx._generic_request(requests.get, query, **self._cache_args) @@ -231,6 +259,21 @@ def populate(self, idx): f'results may be inconsistent', RuntimeWarning) for i, r in enumerate(res['results'][:self.count - offset]): self._cache[i + offset] = res['results'][i] + # Notify callbacks + pending_removal = [] + for callback in self._callbacks: + # Handle weak reference callbacks first + if isinstance(callback, ReferenceType): + wf = callback + if (callback := wf()) is None: + pending_removal.append(wf) + continue + callback(res['results']) + for wf in pending_removal: + self._callbacks.discard(wf) + # When cache is complete, clear our callbacks + if all(reversed(self._cache)): + self._callbacks.clear() def __iter__(self): for i in range(self.count): From 76211110b5542f5747a9283fa6c5e3e23d799d2c Mon Sep 17 00:00:00 2001 From: Miles Wells Date: Tue, 15 Oct 2024 11:54:33 +0300 Subject: [PATCH 4/5] Minor fixes --- one/api.py | 6 +----- one/tests/test_one.py | 23 ++++++++++++----------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/one/api.py b/one/api.py index 7791e8ac..ea94c13b 100644 --- a/one/api.py +++ b/one/api.py @@ -150,13 +150,10 @@ def load_cache(self, tables_dir=None, **kwargs): self._cache[table] = cache - if len(self._cache) == 1: + if meta['loaded_time'] is None: # No tables present meta['expired'] = True meta['raw'] = {} - self._cache.update({ - 'datasets': pd.DataFrame(columns=DATASETS_COLUMNS).set_index(['eid', 'id']), - 'sessions': pd.DataFrame(columns=SESSIONS_COLUMNS).set_index('id')}) if self.offline: # In online mode, the cache tables should be downloaded later warnings.warn(f'No cache tables found in {self._tables_dir}') created = [datetime.fromisoformat(x['date_created']) @@ -502,7 +499,6 @@ def sort_fcn(itm): search_terms = self.search_terms(query_type='local') queries = {util.autocomplete(k, search_terms): v for k, v in kwargs.items()} for key, value in sorted(queries.items(), key=sort_fcn): - # key = util.autocomplete(key) # Validate and get full name # No matches; short circuit if sessions.size == 0: return ([], None) if details else [] diff --git a/one/tests/test_one.py b/one/tests/test_one.py index 62f8cddc..2350a310 100644 --- a/one/tests/test_one.py +++ b/one/tests/test_one.py @@ -53,6 +53,7 @@ import one.params import one.alf.exceptions as alferr from one.alf import spec 
+from one.alf.files import get_alf_path from . import util from . import OFFLINE_ONLY, TEST_DB_1, TEST_DB_2 # 1 = TestAlyx; 2 = OpenAlyx @@ -1654,18 +1655,16 @@ def setUp(self) -> None: new=partial(util.get_file, self.tempdir.name)) self.patch.start() self.one = OneAlyx(**TEST_DB_2, cache_dir=self.tempdir.name, mode='auto') - self.fid = '17ab5b57-aaf6-4016-9251-66daadc200c7' # File record of channels.brainLocation + self.fid = '6f175a7a-e20b-4622-81fc-08947a4fd1d3' # File record of wiring.json self.eid = 'aad23144-0e52-4eac-80c5-c4ee2decb198' + self.did = 'd693fbf9-2f90-4123-839e-41474c44742d' def test_download_datasets(self): """Test OneAlyx._download_dataset, _download_file and _dset2url.""" det = self.one.get_details(self.eid, True) - rec = next(x for x in det['data_dataset_session_related'] - if 'channels.brainLocation' in x['dataset_type']) + rec = next(x for x in det['data_dataset_session_related'] if x['id'] == self.did) # FIXME hack because data_url may be AWS - from one.alf.files import get_alf_path rec['data_url'] = self.one.alyx.rel_path2url(get_alf_path(rec['data_url'])) - # FIXME order may not be stable, this only works file = self.one._download_dataset(rec) self.assertIsInstance(file, Path) self.assertTrue(file.exists()) @@ -1704,10 +1703,10 @@ def test_download_datasets(self): if fi != 0: rec['file_records'] = [rec['file_records'].pop(fi), *rec['file_records']] file = self.one._download_dataset(rec, keep_uuid=True) - self.assertEqual(str(file).split('.')[2], rec['url'].split('/')[-1]) + self.assertEqual(file.stem.split('.')[-1], rec['url'].split('/')[-1]) # Check list input - recs = [rec, sorted(det['data_dataset_session_related'], key=lambda x: x['file_size'])[0]] + recs = [rec, sorted(det['data_dataset_session_related'], key=lambda x: x['file_size'])[1]] files = self.one._download_dataset(recs) self.assertIsInstance(files, list) self.assertTrue(all(isinstance(x, Path) for x in files)) @@ -1715,7 +1714,7 @@ def test_download_datasets(self): # Check Series input r_ = datasets2records(rec).squeeze() file = self.one._download_dataset(r_) - self.assertIn('channels.brainLocation', file.as_posix()) + self.assertIn('imec0.wiring.json', file.name) # Check behaviour when URL invalid did = rec['url'].split('/')[-1] @@ -1737,10 +1736,11 @@ def test_download_datasets(self): file = self.one._download_dataset(path) self.assertIsNone(file) + # Check data frame record rec = self.one.list_datasets(self.eid, details=True) - rec = rec[rec.rel_path.str.contains('00/pykilosort/channels.brainLocation')] - rec['exists_aws'] = False # Ensure we use FlatIron for this - rec = pd.concat({str(self.eid): rec}, names=['eid']) + rec = rec[rec.rel_path.str.contains('00/_spikeglx_ephysData_g0_t0.imec0.wiring')] + rec.loc[self.did, 'exist_aws'] = False # Ensure we use FlatIron for this + rec = pd.concat({str(self.eid): rec}, names=['eid']) # Add back eid index files = self.one._download_datasets(rec) self.assertFalse(None in files) @@ -1755,6 +1755,7 @@ def test_download_aws(self): dsets = self.one.list_datasets( self.eid, filename='*wiring.json', collection='raw_ephys_data/probe??', details=True) dsets = pd.concat({str(self.eid): dsets}, names=['eid']) + assert len(dsets) == 2 file = self.one.eid2path(self.eid) / dsets['rel_path'].values[0] with mock.patch('one.remote.aws.get_s3_from_alyx', return_value=(None, None)), \ From 5c0d0000687bf7799e356ab67016a3ed8f116c10 Mon Sep 17 00:00:00 2001 From: Miles Wells Date: Tue, 15 Oct 2024 12:22:27 +0300 Subject: [PATCH 5/5] added remove_cache_table_files function 
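
Adds one.alf.cache.remove_cache_table_files and the One._remove_cache_table_files
method for deleting cache table files (sessions.pqt, datasets.pqt and
cache_info.json) from disk. A rough usage sketch (the ONE instance and the
cache folder below are placeholders, not part of this change):

    from pathlib import Path
    from one.api import ONE
    from one.alf.cache import remove_cache_table_files

    one = ONE()
    # Delete the files of the currently loaded tables, plus cache_info.json
    removed = one._remove_cache_table_files()

    # Delete specific tables from an arbitrary cache directory
    removed = remove_cache_table_files(Path('/path/to/cache'), tables=('sessions',))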
--- CHANGELOG.md | 1 + one/alf/cache.py | 31 ++++++++++++++++++++++++++++++- one/api.py | 21 ++++++++++++++++++++- one/tests/test_one.py | 14 ++++++++++++++ 4 files changed, 65 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7a601f2..0a1e0b35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ This version fixes issues with Alyx authentication in silent mode, and improves ### Added - one.alf.exceptions.ALFWarning category allows users to filter warnings relating to mixed revisions +- one.alf.cache.remove_cache_table_files and One._remove_cache_table_files for deleting cache table files ## [2.9.1] diff --git a/one/alf/cache.py b/one/alf/cache.py index 8fccaa95..572ba210 100644 --- a/one/alf/cache.py +++ b/one/alf/cache.py @@ -32,7 +32,9 @@ from one.converters import session_record2path from one.util import QC_TYPE, patch_cache -__all__ = ['make_parquet_db', 'remove_missing_datasets', 'DATASETS_COLUMNS', 'SESSIONS_COLUMNS'] +__all__ = [ + 'make_parquet_db', 'remove_missing_datasets', 'remove_cache_table_files', + 'DATASETS_COLUMNS', 'SESSIONS_COLUMNS'] _logger = logging.getLogger(__name__) # ------------------------------------------------------------------------------------------------- @@ -338,3 +340,30 @@ def remove_missing_datasets(cache_dir, tables=None, remove_empty_sessions=True, path = path.parent return sorted(to_delete) + + +def remove_cache_table_files(folder, tables=('sessions', 'datasets')): + """Delete cache tables on disk. + + Parameters + ---------- + folder : pathlib.Path + The directory path containing cache tables to remove. + tables : list of str + A list of table names to remove, e.g. ['sessions', 'datasets']. + NB: This will also delete the cache_info.json metadata file. + + Returns + ------- + list of pathlib.Path + A list of the removed files. + """ + filenames = ('cache_info.json', *(f'{t}.pqt' for t in tables)) + removed = [] + for file in map(folder.joinpath, filenames): + if file.exists(): + file.unlink() + removed.append(file) + else: + _logger.warning('%s not found', file) + return removed diff --git a/one/api.py b/one/api.py index ea94c13b..1d6503fc 100644 --- a/one/api.py +++ b/one/api.py @@ -28,7 +28,8 @@ import one.alf.io as alfio import one.alf.files as alfiles import one.alf.exceptions as alferr -from .alf.cache import make_parquet_db, DATASETS_COLUMNS, SESSIONS_COLUMNS +from .alf.cache import ( + make_parquet_db, remove_cache_table_files, DATASETS_COLUMNS, SESSIONS_COLUMNS) from .alf.spec import is_uuid_string, QC, to_alf from . import __version__ from one.converters import ConversionMixin, session_record2path @@ -111,6 +112,24 @@ def _reset_cache(self): 'raw': {}} # map of original table metadata }) + def _remove_cache_table_files(self, tables=None): + """Delete cache tables on disk. + + Parameters + ---------- + tables : list of str + A list of table names to removes, e.g. ['sessions', 'datasets']. + If None, the currently loaded table names are removed. NB: This + will also delete the cache_info.json metadata file. + + Returns + ------- + list of pathlib.Path + A list of the removed files. + """ + tables = tables or filter(lambda x: x[0] != '_', self._cache) + return remove_cache_table_files(self._tables_dir, tables) + def load_cache(self, tables_dir=None, **kwargs): """ Load parquet cache files from a local directory. 
diff --git a/one/tests/test_one.py b/one/tests/test_one.py index 2350a310..26b3543b 100644 --- a/one/tests/test_one.py +++ b/one/tests/test_one.py @@ -60,6 +60,7 @@ class TestONECache(unittest.TestCase): """Test methods that use sessions and datasets tables. + This class loads the parquet tables from the fixtures and builds a file tree in a temp folder """ tempdir = None @@ -1020,6 +1021,19 @@ def test_save_loaded_ids(self): self.assertEqual(ids, []) self.assertIsNone(filename) + def test_remove_cache_table_files(self): + """Test One._remove_cache_table_files method.""" + root = self.one._tables_dir + for name in ('cache_info.json', 'foo.pqt'): + root.joinpath(name).touch() + removed = self.one._remove_cache_table_files() + expected = ['sessions.pqt', 'datasets.pqt', 'cache_info.json'] + self.assertCountEqual(expected, [str(x.relative_to(root)) for x in removed]) + with self.assertLogs('one.alf.cache', 30) as cm: + removed = self.one._remove_cache_table_files(tables=('foo',)) + self.assertEqual([root / 'foo.pqt'], removed) + self.assertIn('cache_info.json not found', cm.records[0].message) + @unittest.skipIf(OFFLINE_ONLY, 'online only test') class TestOneAlyx(unittest.TestCase):