Skip to content

Commit

Permalink
feat: Add 'skip_lines' for CSV (#20301)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Dec 15, 2024
1 parent 0c2eb01 commit 2d65cc9
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 36 deletions.
16 changes: 15 additions & 1 deletion crates/polars-io/src/csv/read/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ pub struct CsvReadOptions {
pub parse_options: Arc<CsvParseOptions>,
pub has_header: bool,
pub chunk_size: usize,
/// Skip rows according to the CSV spec.
pub skip_rows: usize,
/// Skip lines according to the newline char (i.e. escaping will be ignored)
pub skip_lines: usize,
pub skip_rows_after_header: usize,
pub infer_schema_length: Option<usize>,
pub raise_if_empty: bool,
Expand Down Expand Up @@ -76,6 +79,7 @@ impl Default for CsvReadOptions {
has_header: true,
chunk_size: 1 << 18,
skip_rows: 0,
skip_lines: 0,
skip_rows_after_header: 0,
infer_schema_length: Some(100),
raise_if_empty: true,
Expand Down Expand Up @@ -197,12 +201,22 @@ impl CsvReadOptions {
self
}

/// Number of rows to skip before the header row.
/// Start reading after ``skip_rows`` rows. The header will be parsed at this
/// offset. Note that we respect CSV escaping/comments when skipping rows.
/// If you want to skip by newline char only, use `skip_lines`.
pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
self.skip_rows = skip_rows;
self
}

/// Start reading after `skip_lines` lines. The header will be parsed at this
/// offset. Note that CSV escaping will not be respected when skipping lines.
/// If you want to skip valid CSV rows, use ``skip_rows``.
pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
self.skip_lines = skip_lines;
self
}

/// Number of rows to skip after the header row.
pub fn with_skip_rows_after_header(mut self, skip_rows_after_header: usize) -> Self {
self.skip_rows_after_header = skip_rows_after_header;
Expand Down
11 changes: 11 additions & 0 deletions crates/polars-io/src/csv/read/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usi
Some(pos)
}

/// Advance `input` past `skip` lines, where a line boundary is found naively
/// by `eol_char` alone (CSV quoting/escaping is NOT considered).
/// If fewer than `skip` line endings remain, returns the input advanced as far
/// as possible (the remainder after the last newline found).
pub(super) fn skip_lines_naive(mut input: &[u8], eol_char: u8, skip: usize) -> &[u8] {
let mut remaining = skip;
while remaining > 0 {
let Some(pos) = next_line_position_naive(input, eol_char) else {
break;
};
input = &input[pos..];
remaining -= 1;
}
input
}

/// Find the nearest next line position that is not embedded in a String field.
pub(super) fn next_line_position(
mut input: &[u8],
Expand Down
30 changes: 21 additions & 9 deletions crates/polars-io/src/csv/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use rayon::prelude::*;
use super::buffer::init_buffers;
use super::options::{CommentPrefix, CsvEncoding, NullValues, NullValuesCompiled};
use super::parser::{
is_comment_line, parse_lines, skip_bom, skip_line_ending, skip_this_line, CountLines,
SplitLines,
is_comment_line, parse_lines, skip_bom, skip_line_ending, skip_lines_naive, skip_this_line,
CountLines, SplitLines,
};
use super::reader::prepare_csv_schema;
use super::schema_inference::{check_decimal_comma, infer_file_schema};
Expand Down Expand Up @@ -105,6 +105,7 @@ pub(crate) struct CoreReader<'a> {
/// Current line number, used in error reporting
current_line: usize,
ignore_errors: bool,
skip_lines: usize,
skip_rows_before_header: usize,
// after the header, we need to take embedded lines into account
skip_rows_after_header: usize,
Expand Down Expand Up @@ -144,6 +145,7 @@ impl<'a> CoreReader<'a> {
reader_bytes: ReaderBytes<'a>,
n_rows: Option<usize>,
skip_rows: usize,
skip_lines: usize,
mut projection: Option<Vec<usize>>,
max_records: Option<usize>,
separator: Option<u8>,
Expand Down Expand Up @@ -207,6 +209,7 @@ impl<'a> CoreReader<'a> {
has_header,
schema_overwrite.as_deref(),
skip_rows,
skip_lines,
skip_rows_after_header,
comment_prefix.as_ref(),
quote_char,
Expand Down Expand Up @@ -247,6 +250,7 @@ impl<'a> CoreReader<'a> {
projection,
current_line: usize::from(has_header),
ignore_errors,
skip_lines,
skip_rows_before_header: skip_rows,
skip_rows_after_header,
n_rows,
Expand Down Expand Up @@ -280,6 +284,7 @@ impl<'a> CoreReader<'a> {
quote_char,
eol_char,
self.schema.len(),
self.skip_lines,
self.skip_rows_before_header,
self.skip_rows_after_header,
self.comment_prefix.as_ref(),
Expand Down Expand Up @@ -608,6 +613,7 @@ pub fn find_starting_point(
quote_char: Option<u8>,
eol_char: u8,
schema_len: usize,
skip_lines: usize,
skip_rows_before_header: usize,
skip_rows_after_header: usize,
comment_prefix: Option<&CommentPrefix>,
Expand All @@ -616,14 +622,20 @@ pub fn find_starting_point(
let full_len = bytes.len();
let starting_point_offset = bytes.as_ptr() as usize;

// Skip utf8 byte-order-mark (BOM)
bytes = skip_bom(bytes);
bytes = if skip_lines > 0 {
polars_ensure!(skip_rows_before_header == 0, InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set");
skip_lines_naive(bytes, eol_char, skip_lines)
} else {
// Skip utf8 byte-order-mark (BOM)
bytes = skip_bom(bytes);

// \n\n can be a empty string row of a single column
// in other cases we skip it.
if schema_len > 1 {
bytes = skip_line_ending(bytes, eol_char)
}
// \n\n can be an empty string row of a single column
// in other cases we skip it.
if schema_len > 1 {
bytes = skip_line_ending(bytes, eol_char)
}
bytes
};

// skip 'n' leading rows
if skip_rows_before_header > 0 {
Expand Down
1 change: 1 addition & 0 deletions crates/polars-io/src/csv/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl<R: MmapBytesReader> CsvReader<R> {
reader_bytes,
self.options.n_rows,
self.options.skip_rows,
self.options.skip_lines,
self.options.projection.clone().map(|x| x.as_ref().clone()),
self.options.infer_schema_length,
Some(parse_options.separator),
Expand Down
67 changes: 47 additions & 20 deletions crates/polars-io/src/csv/read/schema_inference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use super::options::{CommentPrefix, CsvEncoding, NullValues};
use super::parser::{is_comment_line, skip_bom, skip_line_ending, SplitLines};
use super::splitfields::SplitFields;
use super::CsvReadOptions;
use crate::csv::read::parser::skip_lines_naive;
use crate::mmap::ReaderBytes;
use crate::utils::{BOOLEAN_RE, FLOAT_RE, FLOAT_RE_DECIMAL, INTEGER_RE};

Expand All @@ -37,6 +38,7 @@ impl SchemaInferenceResult {
let schema_overwrite_arc = options.schema_overwrite.clone();
let schema_overwrite = schema_overwrite_arc.as_ref().map(|x| x.as_ref());
let skip_rows = options.skip_rows;
let skip_lines = options.skip_lines;
let skip_rows_after_header = options.skip_rows_after_header;
let comment_prefix = parse_options.comment_prefix.as_ref();
let quote_char = parse_options.quote_char;
Expand All @@ -56,6 +58,7 @@ impl SchemaInferenceResult {
has_header,
schema_overwrite,
skip_rows,
skip_lines,
skip_rows_after_header,
comment_prefix,
quote_char,
Expand Down Expand Up @@ -527,9 +530,8 @@ pub fn infer_file_schema(
max_read_rows: Option<usize>,
has_header: bool,
schema_overwrite: Option<&Schema>,
// we take &mut because we maybe need to skip more rows dependent
// on the schema inference
skip_rows: usize,
skip_lines: usize,
skip_rows_after_header: usize,
comment_prefix: Option<&CommentPrefix>,
quote_char: Option<u8>,
Expand All @@ -541,22 +543,47 @@ pub fn infer_file_schema(
decimal_comma: bool,
) -> PolarsResult<(Schema, usize, usize)> {
check_decimal_comma(decimal_comma, separator)?;
infer_file_schema_inner(
reader_bytes,
separator,
max_read_rows,
has_header,
schema_overwrite,
skip_rows,
skip_rows_after_header,
comment_prefix,
quote_char,
eol_char,
null_values,
try_parse_dates,
0,
raise_if_empty,
n_threads,
decimal_comma,
)

if skip_lines > 0 {
polars_ensure!(skip_rows == 0, InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set");
let bytes = skip_lines_naive(reader_bytes, eol_char, skip_lines);
let reader_bytes = ReaderBytes::Borrowed(bytes);
infer_file_schema_inner(
&reader_bytes,
separator,
max_read_rows,
has_header,
schema_overwrite,
skip_rows,
skip_rows_after_header,
comment_prefix,
quote_char,
eol_char,
null_values,
try_parse_dates,
0,
raise_if_empty,
n_threads,
decimal_comma,
)
} else {
infer_file_schema_inner(
reader_bytes,
separator,
max_read_rows,
has_header,
schema_overwrite,
skip_rows,
skip_rows_after_header,
comment_prefix,
quote_char,
eol_char,
null_values,
try_parse_dates,
0,
raise_if_empty,
n_threads,
decimal_comma,
)
}
}
11 changes: 11 additions & 0 deletions crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,21 @@ impl LazyCsvReader {
}

/// Skip the first `n` rows during parsing. The header will be parsed at row `n`.
/// Note that "row" means a valid CSV row: quoting/escaping and comment lines
/// are respected while skipping. To skip by newline char only, use
/// [`Self::with_skip_lines`].
#[must_use]
pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
self.read_options.skip_rows = skip_rows;
self
}

/// Skip the first `n` lines during parsing. The header will be parsed at line `n`.
/// Lines are delimited by the newline char only; CSV quoting/escaping is not
/// respected while skipping. To skip valid CSV rows instead, use
/// [`Self::with_skip_rows`].
#[must_use]
pub fn with_skip_lines(mut self, skip_lines: usize) -> Self {
self.read_options.skip_lines = skip_lines;
self
}

/// Overwrite the schema with the dtypes in this given Schema. The given schema may be a subset
/// of the total schema.
#[must_use]
Expand Down Expand Up @@ -235,6 +244,7 @@ impl LazyCsvReader {

let mut infer_schema = |reader_bytes: ReaderBytes| {
let skip_rows = self.read_options.skip_rows;
let skip_lines = self.read_options.skip_lines;
let parse_options = self.read_options.get_parse_options();

PolarsResult::Ok(
Expand All @@ -246,6 +256,7 @@ impl LazyCsvReader {
// we set it to None and modify them after the schema is updated
None,
skip_rows,
skip_lines,
self.read_options.skip_rows_after_header,
parse_options.comment_prefix.as_ref(),
parse_options.quote_char,
Expand Down
4 changes: 3 additions & 1 deletion crates/polars-python/src/batched_csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct PyBatchedCsv {
impl PyBatchedCsv {
#[staticmethod]
#[pyo3(signature = (
infer_schema_length, chunk_size, has_header, ignore_errors, n_rows, skip_rows,
infer_schema_length, chunk_size, has_header, ignore_errors, n_rows, skip_rows, skip_lines,
projection, separator, rechunk, columns, encoding, n_threads, path, schema_overrides,
overwrite_dtype_slice, low_memory, comment_prefix, quote_char, null_values,
missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header, row_index,
Expand All @@ -35,6 +35,7 @@ impl PyBatchedCsv {
ignore_errors: bool,
n_rows: Option<usize>,
skip_rows: usize,
skip_lines: usize,
projection: Option<Vec<usize>>,
separator: &str,
rechunk: bool,
Expand Down Expand Up @@ -97,6 +98,7 @@ impl PyBatchedCsv {
.with_has_header(has_header)
.with_n_rows(n_rows)
.with_skip_rows(skip_rows)
.with_skip_lines(skip_lines)
.with_ignore_errors(ignore_errors)
.with_projection(projection.map(Arc::new))
.with_rechunk(rechunk)
Expand Down
4 changes: 3 additions & 1 deletion crates/polars-python/src/dataframe/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl PyDataFrame {
#[cfg(feature = "csv")]
#[pyo3(signature = (
py_f, infer_schema_length, chunk_size, has_header, ignore_errors, n_rows,
skip_rows, projection, separator, rechunk, columns, encoding, n_threads, path,
skip_rows, skip_lines, projection, separator, rechunk, columns, encoding, n_threads, path,
overwrite_dtype, overwrite_dtype_slice, low_memory, comment_prefix, quote_char,
null_values, missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header,
row_index, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, schema)
Expand All @@ -44,6 +44,7 @@ impl PyDataFrame {
ignore_errors: bool,
n_rows: Option<usize>,
skip_rows: usize,
skip_lines: usize,
projection: Option<Vec<usize>>,
separator: &str,
rechunk: bool,
Expand Down Expand Up @@ -100,6 +101,7 @@ impl PyDataFrame {
.with_has_header(has_header)
.with_n_rows(n_rows)
.with_skip_rows(skip_rows)
.with_skip_lines(skip_lines)
.with_ignore_errors(ignore_errors)
.with_projection(projection.map(Arc::new))
.with_rechunk(rechunk)
Expand Down
4 changes: 3 additions & 1 deletion crates/polars-python/src/lazyframe/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl PyLazyFrame {

#[staticmethod]
#[cfg(feature = "csv")]
#[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, n_rows, cache, overwrite_dtype,
#[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
Expand All @@ -127,6 +127,7 @@ impl PyLazyFrame {
has_header: bool,
ignore_errors: bool,
skip_rows: usize,
skip_lines: usize,
n_rows: Option<usize>,
cache: bool,
overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
Expand Down Expand Up @@ -213,6 +214,7 @@ impl PyLazyFrame {
.with_has_header(has_header)
.with_ignore_errors(ignore_errors)
.with_skip_rows(skip_rows)
.with_skip_lines(skip_lines)
.with_n_rows(n_rows)
.with_cache(cache)
.with_dtype_overwrite(overwrite_dtype.map(Arc::new))
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-stream/src/nodes/io_sources/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ impl CsvSourceNode {
let quote_char = parse_options.quote_char;
let eol_char = parse_options.eol_char;

let skip_lines = options.skip_lines;
let skip_rows_before_header = options.skip_rows;
let skip_rows_after_header = options.skip_rows_after_header;
let comment_prefix = parse_options.comment_prefix.clone();
Expand Down Expand Up @@ -353,6 +354,7 @@ impl CsvSourceNode {
quote_char,
eol_char,
schema_len,
skip_lines,
skip_rows_before_header,
skip_rows_after_header,
comment_prefix,
Expand Down
Loading

0 comments on commit 2d65cc9

Please sign in to comment.