diff --git a/asdf/_block/manager.py b/asdf/_block/manager.py index 99c4bf957..dca8f94c5 100644 --- a/asdf/_block/manager.py +++ b/asdf/_block/manager.py @@ -1,3 +1,4 @@ +import collections import contextlib import copy @@ -10,23 +11,15 @@ from .options import Options -class ReadBlocks(store.LinearStore): - """ - {obj: block_index} : where obj is NDArrayType or other high level object - [block_0, block_1, ...] - """ - - def set_blocks(self, blocks): - self._items = blocks - - def append_block(self, block): - self._items.append(block) +class ReadBlocks(collections.UserList): + # workaround inability to weakref a list + pass class OptionsStore(store.Store): """ - {array_base: options} - read_blocks (instance of ReadBlocks) + A Store of Options that can be accessed by Key + (see ``asdf._block.store.Store``). """ def __init__(self, read_blocks=None): @@ -34,10 +27,45 @@ def __init__(self, read_blocks=None): self._read_blocks = read_blocks def has_options(self, array): + """ + Check of Options have been defined for this array + without falling back to generating Options from + a ReadBlock. + + Parameters + ---------- + array : ndarray + The base of this array (see `asdf.util.get_array_base`) will + be used to lookup any Options in the Store. + + Returns + ------- + has_options : bool + True if Options were previously defined for this array. + """ base = util.get_array_base(array) return self.lookup_by_object(base) is not None def get_options_from_block(self, array): + """ + Get Options for some array using only settings read from a + corresponding ReadBlock (one that shares the same base array). + Any Options defined using previous calls to set_options will + be ignored (use ``get_options`` if you would like these previously + set options to be considered). + + Parameters + ---------- + array : ndarray + The base of this array (see `asdf.util.get_array_base`) will + be used to lookup a corresponding ReadBlock. + + Returns + ------- + options : Options or None + Options initialized from settings read from a ReadBlock + or None if no corresponding block was found. + """ base = util.get_array_base(array) # look up by block with matching _data for block in self._read_blocks: @@ -52,6 +80,27 @@ def get_options_from_block(self, array): return None def get_options(self, array): + """ + Get Options for some array using either previously defined + options (as set by ``set_options``) or settings read from a + corresponding ReadBlock (one that shares the same base array). + + Note that if no options are found in the Store and options + are found from a ReadBlock the resulting Options will be added + to the Store. + + Parameters + ---------- + array : ndarray + The base of this array (see `asdf.util.get_array_base`) will + be used to lookup any Options in the Store. + + Returns + ------- + options : Options or None + Options read from the Store or ReadBlocks or None if + no options were found. + """ base = util.get_array_base(array) options = self.lookup_by_object(base) if options is None: @@ -64,6 +113,23 @@ def get_options(self, array): return options def set_options(self, array, options): + """ + Set Options for an array. + + Parameters + ---------- + array : ndarray + The base of this array (see `asdf.util.get_array_base`) will + be used to add options to the Store. + + options : Options + The Options to add to the Store for this array. + + Raises + ------ + ValueError + If more than one block is set as a streamed block. + """ if options.storage_type == "streamed": for oid, by_key in self._by_id.items(): for key, opt in by_key.items(): @@ -78,6 +144,15 @@ def set_options(self, array, options): self.assign_object(base, options) def get_output_compressions(self): + """ + Get all output compression types used for this Store of + Options. + + Returns + ------- + compressions : list of bytes + List of 4 byte compression labels used for this OptionsStore. + """ compressions = set() cfg = config.get_config() if cfg.all_array_compression == "input": @@ -96,26 +171,57 @@ def get_output_compressions(self): class Manager: + """ + Manager for reading, writing and storing options for ASDF blocks. + """ + def __init__(self, read_blocks=None, uri=None, lazy_load=False, memmap=False, validate_checksums=False): if read_blocks is None: read_blocks = ReadBlocks([]) self.options = OptionsStore(read_blocks) - self.blocks = read_blocks + + self._blocks = read_blocks + self._external_block_cache = external.ExternalBlockCache() self._data_callbacks = store.Store() + self._write_blocks = store.LinearStore() self._external_write_blocks = [] self._streamed_write_block = None self._streamed_obj_keys = set() self._write_fd = None + self._uri = uri - self._external_block_cache = external.ExternalBlockCache() + + # general block settings self._lazy_load = lazy_load self._memmap = memmap self._validate_checksums = validate_checksums + @property + def blocks(self): + """ + Get any ReadBlocks that were read from an ASDF file + + Returns + ------- + read_blocks : list of ReadBlock + List of ReadBlock instances created during a call to read + or update. + """ + return self._blocks + + @blocks.setter + def blocks(self, new_blocks): + if not isinstance(new_blocks, ReadBlocks): + new_blocks = ReadBlocks(new_blocks) + self._blocks = new_blocks + # we propagate these blocks to options so that + # options lookups can fallback to the new read blocks + self.options._read_blocks = new_blocks + def read(self, fd, after_magic=False): - self.blocks.set_blocks( - reader.read_blocks(fd, self._memmap, self._lazy_load, self._validate_checksums, after_magic=after_magic) + self.blocks = reader.read_blocks( + fd, self._memmap, self._lazy_load, self._validate_checksums, after_magic=after_magic ) def _load_external(self, uri): @@ -325,7 +431,7 @@ def update(self, new_tree_size, pad_blocks, include_block_index): # we have to be lazy here as any current memmap is invalid new_read_block = reader.ReadBlock(offset + 4, self._write_fd, self._memmap, True, False, header=header) - new_read_blocks.append_block(new_read_block) + new_read_blocks.append(new_read_block) new_index = len(new_read_blocks) - 1 # update all callbacks @@ -334,7 +440,6 @@ def update(self, new_tree_size, pad_blocks, include_block_index): if obj is None: # this object no longer exists so don't both assigning it continue - new_read_blocks.assign_object(obj, new_read_block) # update data callbacks to point to new block cb = self._data_callbacks.lookup_by_object(obj) @@ -343,4 +448,3 @@ def update(self, new_tree_size, pad_blocks, include_block_index): # update read blocks to reflect new state self.blocks = new_read_blocks - self.options._read_blocks = new_read_blocks diff --git a/asdf/_serialization_context.py b/asdf/_serialization_context.py index a31d3c59f..0925d04be 100644 --- a/asdf/_serialization_context.py +++ b/asdf/_serialization_context.py @@ -207,15 +207,13 @@ class _Deserialization(_Operation): def __init__(self, ctx): super().__init__(ctx) self._obj = None - self._blk = None self._cb = None self._keys_to_assign = {} def __exit__(self, exc_type, exc_value, traceback): if exc_type is not None: return - if self._blk is not None: - self._blocks.blocks.assign_object(self._obj, self._blk) + if self._cb is not None: self._blocks._data_callbacks.assign_object(self._obj, self._cb) for key, cb in self._keys_to_assign.items(): if cb is None: @@ -227,21 +225,15 @@ def __exit__(self, exc_type, exc_value, traceback): # assign the key to the callback self._blocks._data_callbacks.assign_object(key, cb) - # and the block - blk = self._blocks.blocks[cb._index] - self._blocks.blocks.assign_object(key, blk) - def get_block_data_callback(self, index, key=None): - blk = self._blocks.blocks[index] if key is None: - if blk is self._blk: - # return callback for a previously access block - return self._cb - if self._blk is not None: - # for attempts to access a second block without a key + if self._cb is not None: + # this operation has already accessed a block without using + # a key so check if the same index was accessed + if self._cb._index == index: + return self._cb msg = "Converters accessing >1 block must provide a key for each block" raise OSError(msg) - self._blk = blk self._cb = self._blocks._get_data_callback(index) return self._cb