Skip to content

Commit

Permalink
simplify ReadBlocks to a subclass of UserList
Browse files Browse the repository at this point in the history
  • Loading branch information
braingram committed May 15, 2023
1 parent b32a8be commit 2f32403
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 34 deletions.
144 changes: 124 additions & 20 deletions asdf/_block/manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import collections
import contextlib
import copy

Expand All @@ -10,34 +11,61 @@
from .options import Options


class ReadBlocks(store.LinearStore):
"""
{obj: block_index} : where obj is NDArrayType or other high level object
[block_0, block_1, ...]
"""

def set_blocks(self, blocks):
self._items = blocks

def append_block(self, block):
self._items.append(block)
class ReadBlocks(collections.UserList):
# Work around the inability to create a weak reference to a built-in
# list: this thin UserList subclass behaves like a list of read blocks
# while still being weak-referenceable.
pass


class OptionsStore(store.Store):
"""
{array_base: options}
read_blocks (instance of ReadBlocks)
A Store of Options that can be accessed by Key
(see ``asdf._block.store.Store``).
"""

def __init__(self, read_blocks=None):
super().__init__()
self._read_blocks = read_blocks

def has_options(self, array):
"""
Check if Options have been defined for this array
without falling back to generating Options from
a ReadBlock.

Parameters
----------
array : ndarray
The base of this array (see `asdf.util.get_array_base`) will
be used to look up any Options in the Store.

Returns
-------
has_options : bool
True if Options were previously defined for this array.
"""
# Options are looked up by the base of the array, not the array itself.
base = util.get_array_base(array)
return self.lookup_by_object(base) is not None

def get_options_from_block(self, array):
"""
Get Options for some array using only settings read from a
corresponding ReadBlock (one that shares the same base array).
Any Options defined using previous calls to set_options will
be ignored (use ``get_options`` if you would like these previously
set options to be considered).
Parameters
----------
array : ndarray
The base of this array (see `asdf.util.get_array_base`) will
be used to lookup a corresponding ReadBlock.
Returns
-------
options : Options or None
Options initialized from settings read from a ReadBlock
or None if no corresponding block was found.
"""
base = util.get_array_base(array)
# look up by block with matching _data
for block in self._read_blocks:
Expand All @@ -52,6 +80,27 @@ def get_options_from_block(self, array):
return None

def get_options(self, array):
"""
Get Options for some array using either previously defined
options (as set by ``set_options``) or settings read from a
corresponding ReadBlock (one that shares the same base array).
Note that if no options are found in the Store and options
are found from a ReadBlock the resulting Options will be added
to the Store.
Parameters
----------
array : ndarray
The base of this array (see `asdf.util.get_array_base`) will
be used to lookup any Options in the Store.
Returns
-------
options : Options or None
Options read from the Store or ReadBlocks or None if
no options were found.
"""
base = util.get_array_base(array)
options = self.lookup_by_object(base)
if options is None:
Expand All @@ -64,6 +113,23 @@ def get_options(self, array):
return options

def set_options(self, array, options):
"""
Set Options for an array.
Parameters
----------
array : ndarray
The base of this array (see `asdf.util.get_array_base`) will
be used to add options to the Store.
options : Options
The Options to add to the Store for this array.
Raises
------
ValueError
If more than one block is set as a streamed block.
"""
if options.storage_type == "streamed":
for oid, by_key in self._by_id.items():
for key, opt in by_key.items():
Expand All @@ -78,6 +144,15 @@ def set_options(self, array, options):
self.assign_object(base, options)

def get_output_compressions(self):
"""
Get all output compression types used for this Store of
Options.
Returns
-------
compressions : list of bytes
List of 4 byte compression labels used for this OptionsStore.
"""
compressions = set()
cfg = config.get_config()
if cfg.all_array_compression == "input":
Expand All @@ -96,26 +171,57 @@ def get_output_compressions(self):


class Manager:
"""
Manager for reading, writing and storing options for ASDF blocks.
"""

def __init__(self, read_blocks=None, uri=None, lazy_load=False, memmap=False, validate_checksums=False):
if read_blocks is None:
read_blocks = ReadBlocks([])
self.options = OptionsStore(read_blocks)
self.blocks = read_blocks

self._blocks = read_blocks
self._external_block_cache = external.ExternalBlockCache()
self._data_callbacks = store.Store()

self._write_blocks = store.LinearStore()
self._external_write_blocks = []
self._streamed_write_block = None
self._streamed_obj_keys = set()
self._write_fd = None

self._uri = uri
self._external_block_cache = external.ExternalBlockCache()

# general block settings
self._lazy_load = lazy_load
self._memmap = memmap
self._validate_checksums = validate_checksums

@property
def blocks(self):
"""
Get any ReadBlocks that were read from an ASDF file.

Returns
-------
read_blocks : list of ReadBlock
List of ReadBlock instances created during a call to read
or update.
"""
return self._blocks

@blocks.setter
def blocks(self, new_blocks):
"""
Replace the read blocks tracked by this Manager.

Parameters
----------
new_blocks : ReadBlocks or other value accepted by ``ReadBlocks(...)``
The new read blocks. Anything that is not already a ReadBlocks
instance is wrapped in one before being stored.
"""
# normalize to ReadBlocks so the stored container is always the same type
if not isinstance(new_blocks, ReadBlocks):
new_blocks = ReadBlocks(new_blocks)
self._blocks = new_blocks
# we propagate these blocks to options so that
# options lookups can fall back to the new read blocks
self.options._read_blocks = new_blocks

def read(self, fd, after_magic=False):
self.blocks.set_blocks(
reader.read_blocks(fd, self._memmap, self._lazy_load, self._validate_checksums, after_magic=after_magic)
self.blocks = reader.read_blocks(
fd, self._memmap, self._lazy_load, self._validate_checksums, after_magic=after_magic
)

def _load_external(self, uri):
Expand Down Expand Up @@ -325,7 +431,7 @@ def update(self, new_tree_size, pad_blocks, include_block_index):

# we have to be lazy here as any current memmap is invalid
new_read_block = reader.ReadBlock(offset + 4, self._write_fd, self._memmap, True, False, header=header)
new_read_blocks.append_block(new_read_block)
new_read_blocks.append(new_read_block)
new_index = len(new_read_blocks) - 1

# update all callbacks
Expand All @@ -334,7 +440,6 @@ def update(self, new_tree_size, pad_blocks, include_block_index):
if obj is None:
# this object no longer exists so don't bother assigning it
continue
new_read_blocks.assign_object(obj, new_read_block)

# update data callbacks to point to new block
cb = self._data_callbacks.lookup_by_object(obj)
Expand All @@ -343,4 +448,3 @@ def update(self, new_tree_size, pad_blocks, include_block_index):

# update read blocks to reflect new state
self.blocks = new_read_blocks
self.options._read_blocks = new_read_blocks
20 changes: 6 additions & 14 deletions asdf/_serialization_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,13 @@ class _Deserialization(_Operation):
def __init__(self, ctx):
super().__init__(ctx)
self._obj = None
self._blk = None
self._cb = None
self._keys_to_assign = {}

def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
return
if self._blk is not None:
self._blocks.blocks.assign_object(self._obj, self._blk)
if self._cb is not None:
self._blocks._data_callbacks.assign_object(self._obj, self._cb)
for key, cb in self._keys_to_assign.items():
if cb is None:
Expand All @@ -227,21 +225,15 @@ def __exit__(self, exc_type, exc_value, traceback):
# assign the key to the callback
self._blocks._data_callbacks.assign_object(key, cb)

# and the block
blk = self._blocks.blocks[cb._index]
self._blocks.blocks.assign_object(key, blk)

def get_block_data_callback(self, index, key=None):
blk = self._blocks.blocks[index]
if key is None:
if blk is self._blk:
# return callback for a previously accessed block
return self._cb
if self._blk is not None:
# for attempts to access a second block without a key
if self._cb is not None:
# this operation has already accessed a block without using
# a key so check if the same index was accessed
if self._cb._index == index:
return self._cb
msg = "Converters accessing >1 block must provide a key for each block"
raise OSError(msg)
self._blk = blk
self._cb = self._blocks._get_data_callback(index)
return self._cb

Expand Down

0 comments on commit 2f32403

Please sign in to comment.