Merge pull request #141 from rhpvorderman/chunk_ubam

Allow chunking uBAM

rhpvorderman authored Nov 4, 2024
2 parents 059a66e + 497310f commit ec56277
Showing 10 changed files with 159 additions and 51 deletions.
43 changes: 43 additions & 0 deletions src/dnaio/_bam.py
@@ -0,0 +1,43 @@
import struct
from io import BufferedIOBase


def read_bam_header(fileobj: BufferedIOBase) -> bytes:
    magic = fileobj.read(4)
    if not isinstance(magic, bytes):
        raise TypeError(
            f"fileobj {fileobj} (type: {type(fileobj)}) was not opened in binary mode."
        )
    if len(magic) < 4:
        raise EOFError("Truncated BAM file")
    if magic[:4] != b"BAM\1":
        raise ValueError(
            f"fileobj: {fileobj}, is not a BAM file. No BAM file signature, instead "
            f"found {magic!r}"
        )
    return read_bam_header_after_magic(fileobj)


def read_bam_header_after_magic(fileobj: BufferedIOBase) -> bytes:
    header_size = fileobj.read(4)
    if len(header_size) < 4:
        raise EOFError("Truncated BAM file")
    (l_text,) = struct.unpack("<I", header_size)
    header = fileobj.read(l_text)
    if len(header) < l_text:
        raise EOFError("Truncated BAM file")
    n_ref_obj = fileobj.read(4)
    if len(n_ref_obj) < 4:
        raise EOFError("Truncated BAM file")
    (n_ref,) = struct.unpack("<I", n_ref_obj)
    for i in range(n_ref):
        l_name_obj = fileobj.read(4)
        if len(l_name_obj) < 4:
            raise EOFError("Truncated BAM file")
        (l_name,) = struct.unpack("<I", l_name_obj)
        reference_chunk_size = l_name + 4  # Include name and uint32_t of size
        reference_chunk = fileobj.read(reference_chunk_size)
        if len(reference_chunk) < reference_chunk_size:
            raise EOFError("Truncated BAM file")
    # Fileobj is now skipped ahead and at the start of the BAM records
    return header
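
As a sanity check of the layout this helper expects (magic, l_text, SAM header text, n_ref, then one length-prefixed entry per reference), here is a minimal sketch, not part of the diff, that builds a reference-free uBAM header in memory and parses it back. A real BAM stream is BGZF-compressed and would need decompression (e.g. via gzip.open) first.

import struct
from io import BytesIO

from dnaio._bam import read_bam_header

header_text = b"@HD\tVN:1.6\tSO:unsorted\n"
payload = (
    b"BAM\x01"                             # BAM magic
    + struct.pack("<I", len(header_text))  # l_text
    + header_text                          # SAM header text
    + struct.pack("<I", 0)                 # n_ref = 0: uBAM has no references
)
assert read_bam_header(BytesIO(payload)) == header_text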
4 changes: 3 additions & 1 deletion src/dnaio/_core.pyi
@@ -1,3 +1,4 @@
import sys
from typing import (
    Generic,
    Optional,
@@ -36,6 +37,7 @@ class SequenceRecord:
def paired_fastq_heads(
    buf1: ByteString, buf2: ByteString, end1: int, end2: int
) -> Tuple[int, int]: ...
def bam_head(buf: bytes, end: Optional[int] = None) -> int: ...
def records_are_mates(
    __first_record: SequenceRecord,
    __second_record: SequenceRecord,
@@ -54,7 +56,7 @@ class FastqIter(Generic[T]):
    def number_of_records(self) -> int: ...

class BamIter:
    def __init__(self, file: BinaryIO, buffer_size: int): ...
    def __init__(self, file: BinaryIO, read_in_size: int, with_header: bool = True): ...
    def __iter__(self) -> Iterator[SequenceRecord]: ...
    def __next__(self) -> SequenceRecord: ...
    @property
86 changes: 51 additions & 35 deletions src/dnaio/_core.pyx
@@ -5,13 +5,16 @@ from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AS_STRING, PyBytes
from cpython.mem cimport PyMem_Free, PyMem_Malloc, PyMem_Realloc
from cpython.unicode cimport PyUnicode_CheckExact, PyUnicode_GET_LENGTH, PyUnicode_DecodeASCII
from cpython.object cimport Py_TYPE, PyTypeObject
from cpython.pyport cimport PY_SSIZE_T_MAX
from cpython.ref cimport PyObject
from cpython.tuple cimport PyTuple_GET_ITEM
from libc.string cimport memcmp, memcpy, memchr, strcspn, strspn, memmove
from libc.stdint cimport uint8_t, uint16_t, uint32_t, int32_t

cimport cython

from ._bam import read_bam_header

cdef extern from "Python.h":
    void *PyUnicode_DATA(object o)
    bint PyUnicode_IS_COMPACT_ASCII(object o)
@@ -431,6 +434,29 @@ def paired_fastq_heads(buf1, buf2, Py_ssize_t end1, Py_ssize_t end2):
    return record_start1 - data1, record_start2 - data2


def bam_head(buf, end = None):
    """Return the end of the last complete BAM record in the buf."""
    cdef Py_ssize_t c_end = PY_SSIZE_T_MAX
    if end is not None:
        c_end = end
    cdef Py_buffer buffer
    PyObject_GetBuffer(buf, &buffer, PyBUF_SIMPLE)
    cdef:
        uint8_t *buffer_start = <uint8_t *>buffer.buf
        uint8_t *record_start = buffer_start
        uint8_t *buffer_end = buffer_start + min(c_end, buffer.len)
        uint32_t block_size
        size_t record_size

    while (record_start + 4) < buffer_end:
        record_size = (<uint32_t *>record_start)[0] + 4
        if (record_start + record_size) > buffer_end:
            break
        record_start += record_size
    cdef Py_ssize_t head = <Py_ssize_t>(record_start - buffer_start)
    PyBuffer_Release(&buffer)
    return head

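For orientation, the same boundary scan in plain Python: every BAM record is framed as a little-endian uint32 block_size followed by block_size bytes, so the loop advances over whole records and returns the offset just past the last complete one. bam_head_py is a hypothetical illustration, not part of this commit.

import struct

def bam_head_py(buf: bytes, end=None) -> int:
    limit = len(buf) if end is None else min(end, len(buf))
    pos = 0
    while pos + 4 < limit:
        # Peek at the next record's block_size field.
        (block_size,) = struct.unpack_from("<I", buf, pos)
        if pos + 4 + block_size > limit:
            break  # last record is truncated; stop before it
        pos += 4 + block_size
    return pos
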
cdef class FastqIter:
    """
    Parse a FASTQ file and yield SequenceRecord objects
@@ -688,6 +714,22 @@ cdef struct BamRecordHeader:
    int32_t tlen

cdef class BamIter:
    """
    Parse a uBAM file and yield SequenceRecord objects

    Arguments:
        file: a file-like object, opened in binary mode (it must have a readinto
            method)
        read_in_size: size of the initial buffer. This is automatically grown
            if a BAM record is encountered that does not fit.
        with_header: whether the BAM file has a header that needs parsing.
            Default is True. False can be used in circumstances where chunks
            of BAM records are read.

    Yields:
        SequenceRecord objects
    """
    cdef:
        uint8_t *record_start
        uint8_t *buffer_end
@@ -701,42 +743,16 @@ cdef class BamIter:
    def __dealloc__(self):
        PyMem_Free(self.read_in_buffer)

    def __cinit__(self, fileobj, read_in_size = 48 * 1024):
    def __cinit__(self, fileobj, read_in_size = 48 * 1024, with_header = True):
        if read_in_size < 4:
            raise ValueError(f"read_in_size must be at least 4 got "
                             f"{read_in_size}")


        # Skip ahead and save the BAM header for later inspection
        magic_and_header_size = fileobj.read(8)
        if not isinstance(magic_and_header_size, bytes):
            raise TypeError(f"fileobj {fileobj} is not a binary IO type, "
                            f"got {type(fileobj)}")
        if len(magic_and_header_size) < 8:
            raise EOFError("Truncated BAM file")
        if magic_and_header_size[:4] != b"BAM\1":
            raise ValueError(
                f"fileobj: {fileobj}, is not a BAM file. No BAM magic, instead "
                f"found {magic_and_header_size[:4]}")
        l_text = int.from_bytes(magic_and_header_size[4:], "little", signed=False)
        header = fileobj.read(l_text)
        if len(header) < l_text:
            raise EOFError("Truncated BAM file")
        n_ref_obj = fileobj.read(4)
        if len(n_ref_obj) < 4:
            raise EOFError("Truncated BAM file")
        n_ref = int.from_bytes(n_ref_obj, "little", signed=False)
        for i in range(n_ref):
            l_name_obj = fileobj.read(4)
            if len(l_name_obj) < 4:
                raise EOFError("Truncated BAM file")
            l_name = int.from_bytes(l_name_obj, "little", signed=False)
            reference_chunk_size = l_name + 4  # Include name and uint32_t of size
            reference_chunk = fileobj.read(reference_chunk_size)
            if len(reference_chunk) < reference_chunk_size:
                raise EOFError("Truncated BAM file")
        # Fileobj is now skipped ahead and at the start of the BAM records

        self.header = header
        if with_header:
            # Skip ahead and save the BAM header for later inspection
            self.header = read_bam_header(fileobj)
        else:
            self.header = b""
        self.read_in_size = read_in_size
        self.file = fileobj
        self.read_in_buffer = NULL
@@ -746,9 +762,9 @@ cdef class BamIter:

    def __iter__(self):
        return self

    cdef _read_into_buffer(self):
        cdef size_t read_in_size
        cdef size_t leftover_size = self.buffer_end - self.record_start
        cdef uint32_t block_size
        memmove(self.read_in_buffer, self.record_start, leftover_size)
@@ -769,7 +785,7 @@ cdef class BamIter:
            raise StopIteration()
        elif new_bytes_size == 0:
            raise EOFError("Incomplete record at the end of file")
        cdef uint8_t *tmp
        if new_buffer_size > self.read_in_buffer_size:
            tmp = <uint8_t *>PyMem_Realloc(self.read_in_buffer, new_buffer_size)
            if tmp == NULL:
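
A sketch of how the new with_header flag is intended to be used: a buffer holding only raw BAM records (for example, one chunk produced by read_chunks below) can be parsed directly. chunk_bytes is a placeholder name; any binary file-like object with a readinto method should work.

from io import BytesIO

from dnaio._core import BamIter

# chunk_bytes: complete BAM records, without the BAM header
for record in BamIter(BytesIO(chunk_bytes), read_in_size=48 * 1024, with_header=False):
    print(record.name, record.sequence)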
30 changes: 19 additions & 11 deletions src/dnaio/chunks.py
@@ -6,10 +6,12 @@
or subprocess and be parsed and processed there.
"""

from io import RawIOBase
from io import BufferedIOBase
from typing import Optional, Iterator, Tuple

from ._bam import read_bam_header_after_magic
from ._core import paired_fastq_heads as _paired_fastq_heads
from ._core import bam_head as _bam_head
from .exceptions import FileFormatError, FastaFormatError, UnknownFileFormat


@@ -79,7 +81,9 @@ def _fastq_head(buf: bytes, end: Optional[int] = None) -> int:
    return right + 1  # type: ignore


def read_chunks(f: RawIOBase, buffer_size: int = 4 * 1024**2) -> Iterator[memoryview]:
def read_chunks(
    f: BufferedIOBase, buffer_size: int = 4 * 1024**2
) -> Iterator[memoryview]:
    """
    Read chunks of complete FASTA or FASTQ records from a file.
    If the format is detected to be FASTQ, all chunks except possibly the last contain
@@ -104,19 +108,23 @@ def read_chunks(f: RawIOBase, buffer_size: int = 4 * 1024**2) -> Iterator[memoryview]:

    # Read one byte to determine file format.
    # If there is a comment char, we assume FASTA!
    start = f.readinto(memoryview(buf)[0:1])
    start = f.readinto(memoryview(buf)[0:4])
    if start == 0:
        # Empty file
        return
    assert start == 1
    assert start == 4
    if buf[0:1] == b"@":
        head = _fastq_head
    elif buf[0:1] == b"#" or buf[0:1] == b">":
        head = _fasta_head
    elif buf[0:4] == b"BAM\x01":
        head = _bam_head
        _ = read_bam_header_after_magic(f)
        start = 0  # Skip header and start at the records.
    else:
        raise UnknownFileFormat(
            f"Cannot determine input file format: First character expected to be '>' or '@', "
            f"but found {repr(chr(buf[0]))}"
            f"Cannot determine input file format: First characters expected "
            f"to be '>', '@', or 'BAM\1', but found {repr(buf[0:4])}"
        )

    # Layout of buf
@@ -135,7 +143,7 @@ def read_chunks(f: RawIOBase, buffer_size: int = 4 * 1024**2) -> Iterator[memoryview]:
    while True:
        if start == len(buf):
            raise OverflowError("FASTA/FASTQ record does not fit into buffer")
        bufend = f.readinto(memoryview(buf)[start:]) + start  # type: ignore
        bufend = f.readinto(memoryview(buf)[start:]) + start
        if start == bufend:
            # End of file
            break


def read_paired_chunks(
    f: RawIOBase,
    f2: RawIOBase,
    f: BufferedIOBase,
    f2: BufferedIOBase,
    buffer_size: int = 4 * 1024**2,
) -> Iterator[Tuple[memoryview, memoryview]]:
    """
@@ -222,8 +230,8 @@ def read_paired_chunks(
            raise ValueError(
                f"FASTA/FASTQ records do not fit into buffer of size {buffer_size}"
            )
        bufend1 = f.readinto(memoryview(buf1)[start1:]) + start1  # type: ignore
        bufend2 = f2.readinto(memoryview(buf2)[start2:]) + start2  # type: ignore
        bufend1 = f.readinto(memoryview(buf1)[start1:]) + start1
        bufend2 = f2.readinto(memoryview(buf2)[start2:]) + start2
        if start1 == bufend1 and start2 == bufend2:
            break
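
Putting the pieces together, a sketch of the workflow this PR enables, assuming (as in the test below) a gzip-readable uBAM: each chunk ends on a record boundary, so it can be handed to a worker and parsed there without the header. It also assumes BamReader accepts an open binary stream, as dnaio readers generally do.

import gzip
from io import BytesIO

from dnaio.chunks import read_chunks
from dnaio.readers import BamReader

with gzip.open("tests/data/simple.unaligned.bam", "rb") as f:
    for chunk in read_chunks(f, buffer_size=4 * 1024**2):
        # Each chunk contains whole records only, so it can be
        # parsed on its own with header parsing disabled.
        for record in BamReader(BytesIO(chunk.tobytes()), with_header=False):
            ...  # process each SequenceRecord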
5 changes: 4 additions & 1 deletion src/dnaio/readers.py
@@ -212,13 +212,16 @@ def __init__(
        buffer_size: int = 128 * 1024,  # Buffer size used by cat, pigz etc.
        opener=xopen,
        _close_file: Optional[bool] = None,
        with_header: bool = True,
    ):
        super().__init__(file, opener=opener, _close_file=_close_file)
        self.sequence_class = sequence_class
        self.delivers_qualities = True
        self.buffer_size = buffer_size
        try:
            self._iter: Iterator[SequenceRecord] = BamIter(self._file, self.buffer_size)
            self._iter: Iterator[SequenceRecord] = BamIter(
                self._file, read_in_size=self.buffer_size, with_header=with_header
            )
        except Exception:
            self.close()
            raise
7 changes: 6 additions & 1 deletion src/dnaio/singleend.py
@@ -6,7 +6,7 @@
from .writers import FastaWriter, FastqWriter


def _open_single(
def _open_single(  # noqa: C901
    file_or_path: Union[str, os.PathLike, BinaryIO],
    opener,
    *,
@@ -64,6 +64,11 @@ def _open_single(
            return BamReader(file, _close_file=close_file)
        # This should not be reached
        raise NotImplementedError("Only reading is supported for BAM files")
    elif fileformat == "bam_no_header":
        if "r" in mode:
            return BamReader(file, _close_file=close_file, with_header=False)
        # This should not be reached
        raise NotImplementedError("Only reading is supported for headerless BAM files")
    if close_file:
        file.close()
    raise UnknownFileFormat(
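
The new "bam_no_header" format string is what read-side callers pass to get headerless parsing. A hypothetical sketch via dnaio.open, assuming open() forwards fileformat to _open_single as it does for the other formats; records_only.bam is a made-up file name for a file of raw BAM records without a header:

import dnaio

with dnaio.open("records_only.bam", fileformat="bam_no_header") as reader:
    for record in reader:
        print(record.name)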
Binary file added tests/data/missing_header_no_bgzip_raw_bam_bytes
22 changes: 22 additions & 0 deletions tests/test_chunks.py
@@ -1,3 +1,5 @@
import gzip
import struct
import textwrap

from pytest import raises
@@ -7,6 +9,7 @@
from dnaio import UnknownFileFormat, FileFormatError
from dnaio._core import paired_fastq_heads
from dnaio.chunks import (
    _bam_head,
    _fastq_head,
    _fasta_head,
    read_chunks,
@@ -41,6 +44,13 @@ def test_fasta_head_with_comment():
    assert _fasta_head(b"#\n>3\n5\n>") == 7


def test_bam_head():
    assert _bam_head(struct.pack("<Ix", 32)) == 0
    assert _bam_head(struct.pack("<I32x", 32)) == 36
    assert _bam_head(struct.pack("<I32xI32x", 32, 33)) == 36
    assert _bam_head(struct.pack("<I32xI33x", 32, 33)) == 73


def test_paired_fasta_heads():
    def pheads(buf1, buf2):
        return _paired_fasta_heads(buf1, buf2, len(buf1), len(buf2))
@@ -179,3 +189,15 @@ def test_read_chunks_empty():
def test_invalid_file_format():
    with raises(UnknownFileFormat):
        list(read_chunks(BytesIO(b"invalid format")))


def test_read_chunks_bam():
    with gzip.open("tests/data/simple.unaligned.bam", "rb") as f:
        for i, chunk in enumerate(read_chunks(f, 70)):
            if i == 0:
                assert b"Myheader" in chunk.tobytes()
            if i == 1:
                assert b"AnotherHeader" in chunk.tobytes()
            if i == 2:
                assert b"YetAnotherHeader" in chunk.tobytes()
            assert b"RGZA\x00" in chunk.tobytes()
3 changes: 1 addition & 2 deletions tests/test_internal.py
@@ -33,7 +33,6 @@
from dnaio.readers import BinaryFileReader
from dnaio._core import bytes_ascii_check


TEST_DATA = Path(__file__).parent / "data"
SIMPLE_FASTQ = str(TEST_DATA / "simple.fastq")
# files tests/data/simple.fast{q,a}
@@ -786,7 +785,7 @@ def test_bam_parser_not_binary_error(self):
        )
        with pytest.raises(TypeError) as error:
            BamReader(file)
        error.match("binary IO")
        error.match("binary mode")

    @pytest.mark.parametrize("buffersize", [4, 8, 10, 20, 40])
    def test_small_buffersize(self, buffersize):