Skip to content

Commit

Permalink
vcf/reader: Add query iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
zaeleus committed Apr 28, 2021
1 parent 3ca4b73 commit 302033d
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 1 deletion.
2 changes: 2 additions & 0 deletions noodles-vcf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ edition = "2018"
indexmap = "1.4.0"
nom = "6.1.2"
noodles-bgzf = { path = "../noodles-bgzf" }
noodles-core = { path = "../noodles-core" }
noodles-tabix = { path = "../noodles-tabix" }
percent-encoding = "2.1.0"
89 changes: 88 additions & 1 deletion noodles-vcf/src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
//! VCF reader and iterators.
mod query;
mod records;

pub use self::records::Records;
pub use self::{query::Query, records::Records};

use std::io::{self, BufRead, Read, Seek};

use noodles_bgzf as bgzf;
use noodles_core::Region;
use noodles_tabix as tabix;

const LINE_FEED: char = '\n';
const CARRIAGE_RETURN: char = '\r';
Expand Down Expand Up @@ -241,6 +244,64 @@ where
pub fn seek(&mut self, pos: bgzf::VirtualPosition) -> io::Result<bgzf::VirtualPosition> {
    // Delegates to the underlying BGZF reader. `pos` is a BGZF virtual
    // position — presumably obtained from an index chunk or a prior
    // `virtual_position` call; NOTE(review): this method's doc comment is
    // collapsed out of view above, so only the delegation is documented here.
    self.inner.seek(pos)
}

/// Returns an iterator over records that intersect the given region.
///
/// The reader must be backed by a bgzip-compressed stream, and the given
/// tabix index must correspond to that stream.
///
/// # Errors
///
/// Returns an error if the region is not mapped or if its reference sequence
/// name cannot be found in the index.
///
/// # Examples
///
/// ```no_run
/// # use std::{fs::File, io};
/// use noodles_core::Region;
/// use noodles_bgzf as bgzf;
/// use noodles_tabix as tabix;
/// use noodles_vcf as vcf;
///
/// let mut reader = File::open("sample.vcf.gz")
///     .map(bgzf::Reader::new)
///     .map(vcf::Reader::new)?;
///
/// let index = tabix::read("sample.vcf.gz.tbi")?;
/// let region = Region::mapped("sq0", 8, 13);
/// let query = reader.query(&index, &region)?;
///
/// for result in query {
///     let record = result?;
///     println!("{:?}", record);
/// }
/// Ok::<(), io::Error>(())
/// ```
pub fn query(&mut self, index: &tabix::Index, region: &Region) -> io::Result<Query<'_, R>> {
    let (i, reference_sequence_name, start, end) = resolve_region(index, region)?;

    // `resolve_region` resolves `i` against the reference sequence *names*;
    // a malformed index may have fewer reference sequence records than names,
    // so the lookup is checked rather than indexed directly.
    let index_reference_sequence = index.reference_sequences().get(i).ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!(
                "could not find reference in index: {} >= {}",
                i,
                index.reference_sequences().len()
            ),
        )
    })?;

    let query_bins = index_reference_sequence.query(start, end);

    // Gather the chunks from every bin overlapping [start, end]. Chunks may
    // themselves overlap or be adjacent; they are not yet merged.
    let chunks: Vec<_> = query_bins
        .iter()
        .flat_map(|bin| bin.chunks())
        .cloned()
        .collect();

    // TODO: optimize chunks

    Ok(Query::new(
        self,
        chunks,
        reference_sequence_name,
        start,
        end,
    ))
}
}

// Reads all bytes until a line feed ('\n') or EOF is reached.
Expand All @@ -267,6 +328,32 @@ where
}
}

/// Resolves a region against a tabix index.
///
/// On success, returns the position of the reference sequence name in the
/// index, the name itself, and the region's start and end positions.
fn resolve_region(index: &tabix::Index, region: &Region) -> io::Result<(usize, String, i32, i32)> {
    // Only mapped regions (name + interval) can be queried.
    let (name, start, end) = match region {
        Region::Mapped { name, start, end } => (name, *start, *end),
        _ => {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "region is not mapped",
            ))
        }
    };

    let i = index
        .reference_sequence_names()
        .iter()
        .position(|n| n == name)
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "region reference sequence does not exist in reference sequences: {:?}",
                    region
                ),
            )
        })?;

    Ok((i, name.into(), start, end))
}

#[cfg(test)]
mod tests {
use std::io::BufReader;
Expand Down
137 changes: 137 additions & 0 deletions noodles-vcf/src/reader/query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use std::io::{self, Read, Seek};

use noodles_bgzf as bgzf;
use noodles_tabix::index::reference_sequence::bin::Chunk;

use crate::{record::Chromosome, Record};

use super::Reader;

/// The state of the query iterator's chunk-walking state machine.
enum State {
    // Seek to the start of the next chunk.
    Seek,
    // Read records until the reader passes the given chunk end position.
    Read(bgzf::VirtualPosition),
    // All chunks are exhausted; iteration is finished.
    End,
}

/// An iterator over records of a VCF reader that intersects a given region.
///
/// This is created by calling [`Reader::query`].
pub struct Query<'a, R>
where
R: Read + Seek,
{
reader: &'a mut Reader<bgzf::Reader<R>>,
chunks: Vec<Chunk>,
reference_sequence_name: String,
start: i32,

This comment has been minimized.

Copy link
@brainstorm

brainstorm May 11, 2021

@zaeleus @chris-zen @mmalenic @victorskl, would it make sense to have start/end on u64 instead of i32 here in noodles Query struct? I guess the signedness comes from some edge case from the specs (as in encoding some metadata with -1 or similar?). In my opinion start/end should be u64 because genomic positions should be inherently positive and not 32-bit "limited"?

Reason I'm asking this is because I'm trying to get the start/end fields returned from query and putting them back into our htsget-rs Vec<BytesRange>... here's some pseudocode coming from the /variants branch:

          // TODO: Turn the result from vcf_reader.query into Vec<ByteRanges>`
          let ranges: Vec<BytesRange> = vec!();
          let mut br: BytesRange;
          for range in byte_ranges {
            br = BytesRange::new(range.start.try_into().unwrap(), range.end.try_into().unwrap());
            ranges.push(br);
          }

And while getting the i32 -> u64 compiler errors, I thought about this.

Also wondering to which extent we want to expose Query's start/end struct fields:

--- a/noodles-vcf/src/reader/query.rs
+++ b/noodles-vcf/src/reader/query.rs
@@ -21,10 +21,10 @@ where
     R: Read + Seek,
 {
     reader: &'a mut Reader<bgzf::Reader<R>>,
-    chunks: Vec<Chunk>,
-    reference_sequence_name: String,
-    start: i32,
-    end: i32,
+    pub chunks: Vec<Chunk>,
+    pub reference_sequence_name: String,
+    pub start: i32,
+    pub end: i32,
     i: usize,
     state: State,
     line_buf: String,

to accommodate the use case above or whether there are better ways to do this that don't break encapsulation.

This comment has been minimized.

Copy link
@zaeleus

zaeleus May 11, 2021

Author Owner

It is indeed unintuitive to have signed positions, and, as you guessed, that's how they're all defined in the specs:

Format Field name Type/range
SAM 1.6 POS [0, 2³¹−1]
BAM 1.6 pos int32_t
CRAM 3.0 alignment start itf8 (int32)
VCF 4.3 POS Integer (32-bit, signed)
BCF 2.2 POS int32_t

I think most (all?) publicly accessible types or conversions in noodles return what's defined in each spec. It creates a good mapping to the reference specs, but I can see this being confusing to an end user when types don't semantically make sense. I'm not sure if diverging from the spec descriptions is preferable in this case; regardless of type, there will always be a required bounds check, whether it's i32 > 0 or u32 < 2³¹.

Since this is in the context of querying, one interesting limitation of the index formats is that, with the default minimum interval size (~16 Kbp), the highest supported genomic position is actually 2²⁹−1, not 2³¹−1!

Also wondering to which extent we want to expose Query's start/end struct fields:

I would recommend copying how the inputs are built for Query rather than relying on the fields of the iterator.

This comment has been minimized.

Copy link
@jkbonfield

jkbonfield May 12, 2021

Note using unsigned doesn't buy you much when you have fields like template length in BAM which by design is +/- (it's an addition to alignment start/end to get to the other end). So you gain double the range with POS by making it unsigned, but then another part of the spec promptly breaks. :-(

I suspect also many of these POS fields are signed so they can use -1 as an out of bounds value for error or unknown, which isn't so unreasonable.

Really these should have been (signed) int64, but when the formats were designed people only cared about human chromosomes. Internally htslib uses int64 now, so that SAM at least can work on large chromosomes, even if the binary formats cannot. I don't know if we fixed VCF though. That may be a consideration for noodles too perhaps.

This comment has been minimized.

Copy link
@brainstorm

brainstorm May 18, 2021

@jkbonfield , thanks for chiming in, always good to have you watching :)... also totally forgot our discussion on samtools/hts-specs#460, thanks for the refresher ;)

This comment has been minimized.

Copy link
@brainstorm

brainstorm May 21, 2021

I would recommend copying how the inputs are built for Query rather than relying on the fields of the iterator.

@zaeleus Do you mean using resolve_region() instead of query()? I'm not sure I grok what you mean... in my mind I would just go through the iterator returned by query, fetching Result<Query, E>'s start & end for each element and then building a Vec<ByteRanges> and call build_response()?

This comment has been minimized.

Copy link
@zaeleus

zaeleus May 21, 2021

Author Owner

Perhaps you're confusing the start and end fields in Query? Those are the start and end positions in the region, not a byte range in the file. To get the byte positions, you're interested in querying the index and iterating through all the chunks. This is why I think you don't actually need to call vcf::Reader::query and build a Query iterator.

end: i32,
i: usize,
state: State,
line_buf: String,
}

impl<'a, R> Query<'a, R>
where
    R: Read + Seek,
{
    /// Builds a query iterator over the given chunks of the underlying reader.
    ///
    /// The reference sequence name and [start, end] interval are used to
    /// filter the records read from the chunks.
    pub(crate) fn new(
        reader: &'a mut Reader<bgzf::Reader<R>>,
        chunks: Vec<Chunk>,
        reference_sequence_name: String,
        start: i32,
        end: i32,
    ) -> Self {
        Self {
            reader,
            chunks,
            reference_sequence_name,
            start,
            end,
            i: 0,
            state: State::Seek,
            line_buf: String::new(),
        }
    }

    /// Seeks to the start of the next chunk.
    ///
    /// Returns the end virtual position of that chunk, or `None` when all
    /// chunks are exhausted.
    fn next_chunk(&mut self) -> io::Result<Option<bgzf::VirtualPosition>> {
        let chunk = match self.chunks.get(self.i) {
            Some(chunk) => *chunk,
            None => return Ok(None),
        };

        self.reader.seek(chunk.start())?;
        self.i += 1;

        Ok(Some(chunk.end()))
    }

    /// Reads the next line into the internal buffer and parses it as a record.
    ///
    /// Returns `None` at end of stream.
    fn read_and_parse_record(&mut self) -> Option<io::Result<Record>> {
        self.line_buf.clear();

        match self.reader.read_record(&mut self.line_buf) {
            Ok(0) => None,
            Ok(_) => {
                let result = self
                    .line_buf
                    .parse()
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e));

                Some(result)
            }
            Err(e) => Some(Err(e)),
        }
    }
}

impl<'a, R> Iterator for Query<'a, R>
where
    R: Read + Seek,
{
    type Item = io::Result<Record>;

    /// Drives a small state machine:
    ///
    ///   * `Seek`: jump to the start of the next chunk (or finish),
    ///   * `Read`: read records until the end of the current chunk,
    ///   * `End`: iteration is complete.
    ///
    /// Chunks may contain records outside the requested region, so each
    /// parsed record is filtered against the query interval before being
    /// yielded.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.state {
                State::Seek => {
                    self.state = match self.next_chunk() {
                        Ok(Some(chunk_end)) => State::Read(chunk_end),
                        Ok(None) => State::End,
                        Err(e) => return Some(Err(e)),
                    }
                }
                State::Read(chunk_end) => match self.read_and_parse_record() {
                    Some(result) => {
                        // Once the reader passes the end of the current chunk,
                        // the next iteration seeks to the following chunk.
                        if self.reader.virtual_position() >= chunk_end {
                            self.state = State::Seek;
                        }

                        match result {
                            Ok(record) => {
                                let reference_sequence_name = match record.chromosome() {
                                    Chromosome::Name(n) => n.into(),
                                    Chromosome::Symbol(n) => n.to_string(),
                                };

                                // Without an END annotation, a record covers a
                                // single position, i.e., the closed interval
                                // [start, start]. (Using `start + 1` here would
                                // wrongly include records one base before the
                                // query start under the closed-interval test.)
                                // TODO: records with END
                                let start = i32::from(record.position());
                                let end = start;

                                if reference_sequence_name == self.reference_sequence_name
                                    && in_interval(start, end, self.start, self.end)
                                {
                                    return Some(Ok(record));
                                }
                            }
                            Err(e) => return Some(Err(e)),
                        }
                    }
                    None => {
                        self.state = State::Seek;
                    }
                },
                State::End => return None,
            }
        }
    }
}

/// Returns whether the closed interval `[a_start, a_end]` intersects the
/// closed interval `[b_start, b_end]`.
fn in_interval(a_start: i32, a_end: i32, b_start: i32, b_end: i32) -> bool {
    // Two closed intervals are disjoint iff one ends before the other starts.
    !(a_end < b_start || b_end < a_start)
}

0 comments on commit 302033d

Please sign in to comment.