Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update private block api usage #64

Merged
merged 2 commits into from
Sep 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 72 additions & 40 deletions sphinx_asdf/asdf2rst.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,50 @@ def run(self):
return []


def _block_to_string(block):
if hasattr(block, "header"):
header = block.header
data = block.data
else:
data = block.data
header = {
"flags": block._flags,
"allocated_size": block._allocated,
"used_size": block._size,
"data_size": block._data_size,
"compression": block.input_compression,
}
if header["flags"] & BLOCK_FLAG_STREAMED:
header["allocated"] = header["used_size"] = header["data_size"] = 0

# convert data to hex representation
data = codecs.encode(data.tobytes(), "hex")
if len(data) > 40:
data = data[:40] + b"..."

lines = []

# convert header to string, add to lines
human_flags = []
for key, val in FLAGS.items():
if header["flags"] & key:
human_flags.append(val)
if len(human_flags):
lines.append(" flags: {}".format(" | ".join(human_flags)))
if header["compression"] and header["compression"] != b"\0\0\0\0":
lines.append(f" compression: {header['compression']}")
lines.append(f" allocated_size: {header['allocated_size']}")
lines.append(f" used_size: {header['used_size']}")
lines.append(f" data_size: {header['data_size']}")

# add data as string to lines
lines.append(f" data: {data}")

# add lines to code block
code = "\n".join(lines)
return code


class AsdfDirective(Directive):
required_arguments = 1
optional_arguments = 1
Expand Down Expand Up @@ -91,52 +135,40 @@ def run(self):

if show_bocks:
with asdf.open(filename, **kwargs) as ff:
for i, block in enumerate(ff._blocks.internal_blocks):
data = codecs.encode(block.data.tobytes(), "hex")
if len(data) > 40:
data = data[:40] + b"..."
allocated = block._allocated
size = block._size
data_size = block._data_size
flags = block._flags

if flags & BLOCK_FLAG_STREAMED:
allocated = size = data_size = 0

lines = []
lines.append(f"BLOCK {i}:")

human_flags = []
for key, val in FLAGS.items():
if flags & key:
human_flags.append(val)
if len(human_flags):
lines.append(" flags: {}".format(" | ".join(human_flags)))
if block.input_compression:
lines.append(f" compression: {block.input_compression}")
lines.append(f" allocated_size: {allocated}")
lines.append(f" used_size: {size}")
lines.append(f" data_size: {data_size}")
lines.append(f" data: {data}")

code = "\n".join(lines)
code += "\n"
if hasattr(ff._blocks, "internal_blocks"):
blocks = list(ff._blocks.internal_blocks)
else:
blocks = ff._blocks.blocks

for i, block in enumerate(blocks):
code = "\n".join([f"BLOCK {i}:", _block_to_string(block), ""])
literal = nodes.literal_block(code, code)
literal["language"] = "yaml"
set_source_info(self, literal)
parts.append(literal)

internal_blocks = list(ff._blocks.internal_blocks)
if len(internal_blocks) and internal_blocks[-1].array_storage != "streamed":
buff = io.BytesIO()
ff._blocks.write_block_index(buff, ff)
block_index = buff.getvalue().decode("utf-8")
literal = nodes.literal_block(block_index, block_index)
literal["language"] = "yaml"
set_source_info(self, literal)
parts.append(literal)

# re-write out the block index
if len(blocks):
if hasattr(blocks[-1], "header"):
streamed = blocks[-1].header["flags"] & BLOCK_FLAG_STREAMED
else:
streamed = blocks[-1].array_storage == "streamed"
if not streamed:
# write out the block index
if hasattr(blocks[0], "header"):
block_index_offset = asdf._block.io.find_block_index(ff._fd)
buff = io.BytesIO()
ff._fd.seek(block_index_offset)
buff = io.BytesIO(ff._fd.read())
else:
buff = io.BytesIO()
ff._blocks.write_block_index(buff, ff)

block_index = buff.getvalue().decode("utf-8")
literal = nodes.literal_block(block_index, block_index)
literal["language"] = "yaml"
set_source_info(self, literal)
parts.append(literal)
finally:
os.chdir(cwd)

Expand Down