diff --git a/Cargo.lock b/Cargo.lock index 56917b6e..76a7077a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,7 +424,7 @@ dependencies = [ "clap", "criterion-plot", "is-terminal", - "itertools", + "itertools 0.10.5", "num-traits", "once_cell", "oorandom", @@ -445,7 +445,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" dependencies = [ "cast", - "itertools", + "itertools 0.10.5", ] [[package]] @@ -719,7 +719,7 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hl" -version = "0.20.1-alpha.3" +version = "0.21.0" dependencies = [ "atoi", "bincode", @@ -746,7 +746,7 @@ dependencies = [ "hex", "htp", "humantime", - "itertools", + "itertools 0.12.0", "itoa", "kqueue", "notify", @@ -859,6 +859,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" diff --git a/Cargo.toml b/Cargo.toml index f3a84ff1..0ab5fa76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ categories = ["command-line-utilities"] description = "Utility for viewing json-formatted log files." keywords = ["cli", "human", "log"] name = "hl" -version = "0.20.1-alpha.3" +version = "0.21.0" edition = "2021" build = "build.rs" diff --git a/README.md b/README.md index 90ee7846..7dc20306 100644 --- a/README.md +++ b/README.md @@ -354,7 +354,8 @@ Options: --paging Output paging options [env: HL_PAGING=] [default: auto] [possible values: auto, always, never] -P Handful alias for --paging=never, overrides --paging option --theme Color theme [env: HL_THEME=] [default: universal] - -r, --raw-fields Disable unescaping and prettifying of field values + -r, --raw Output raw JSON messages instead of formatter messages, it can be useful for applying filters and saving results in original format + --raw-fields Disable unescaping and prettifying of field values --interrupt-ignore-count Number of interrupts to ignore, i.e. Ctrl-C (SIGINT) [env: HL_INTERRUPT_IGNORE_COUNT=] [default: 3] --buffer-size Buffer size [env: HL_BUFFER_SIZE=] [default: "256 KiB"] --max-message-size Maximum message size [env: HL_MAX_MESSAGE_SIZE=] [default: "64 MiB"] diff --git a/benches/parse-and-format.rs b/benches/parse-and-format.rs index 109ff9b2..6e7209dd 100644 --- a/benches/parse-and-format.rs +++ b/benches/parse-and-format.rs @@ -21,7 +21,7 @@ fn benchmark(c: &mut Criterion) { c.bench_function(format!("{}/{}", name, theme), |b| { let settings = Settings::default(); let parser = Parser::new(ParserSettings::new(&settings.fields.predefined, empty(), false)); - let mut formatter = RecordFormatter::new( + let formatter = RecordFormatter::new( Arc::new(Theme::embedded(theme).unwrap()), DateTimeFormatter::new( LinuxDateFormat::new("%b %d %T.%3N").compile(), @@ -32,7 +32,7 @@ fn benchmark(c: &mut Criterion) { settings::Formatting::default(), ); let filter = Filter::default(); - let mut processor = SegmentProcessor::new(&parser, &mut formatter, &filter); + let mut processor = SegmentProcessor::new(&parser, &formatter, &filter); let mut buf = Vec::new(); b.iter(|| { processor.run(record, &mut buf, "", &mut RecordIgnorer {}); diff --git a/src/app.rs b/src/app.rs index b73022a3..a95a83b1 100644 --- a/src/app.rs +++ b/src/app.rs @@ -30,11 +30,12 @@ use crate::datefmt::{DateTimeFormat, DateTimeFormatter}; use crate::error::*; use crate::fmtx::aligned_left; use crate::fsmon::{self, EventKind}; -use crate::formatting::RecordFormatter; +use crate::formatting::{RecordFormatter, RecordWithSourceFormatter, RawRecordFormatter}; use crate::index::{Indexer, Timestamp}; use crate::input::{BlockLine, InputHolder, InputReference, Input}; -use crate::model::{Filter, Parser, ParserSettings, RawRecord, Record}; +use crate::model::{Filter, Parser, ParserSettings, RawRecord, Record, RecordWithSourceConstructor}; use crate::scanning::{BufFactory, Scanner, Segment, SegmentBufFactory}; +use crate::serdex::StreamDeserializerWithOffsets; use crate::settings::{Fields, Formatting}; use crate::theme::{Element, StylingPush, Theme}; use crate::timezone::Tz; @@ -47,6 +48,7 @@ use crate::IncludeExcludeKeyFilter; pub struct Options { pub theme: Arc, pub time_format: DateTimeFormat, + pub raw: bool, pub raw_fields: bool, pub buffer_size: NonZeroUsize, pub max_message_size: NonZeroUsize, @@ -131,8 +133,8 @@ impl App { // spawn processing threads for (rxi, txo) in izip!(rxi, txo) { scope.spawn(closure!(ref bfo, ref parser, ref sfi, ref input_badges, |_| { - let mut formatter = self.formatter(); - let mut processor = SegmentProcessor::new(&parser, &mut formatter, &self.options.filter); + let formatter = self.formatter(); + let mut processor = SegmentProcessor::new(&parser, &formatter, &self.options.filter); for (i, segment) in rxi.iter() { let prefix = input_badges.as_ref().map(|b|b[i].as_str()).unwrap_or(""); match segment { @@ -264,7 +266,7 @@ impl App { let mut workers = Vec::with_capacity(n); for (rxp, txw) in izip!(rxp, txw) { workers.push(scope.spawn(closure!(ref parser, |_| -> Result<()> { - let mut formatter = self.formatter(); + let formatter = self.formatter(); for (block, ts_min, i, j) in rxp.iter() { let mut buf = Vec::with_capacity(2 * usize::try_from(block.size())?); let mut items = Vec::with_capacity(2 * usize::try_from(block.lines_valid())?); @@ -276,7 +278,7 @@ impl App { let record = parser.parse(record); if record.matches(&self.options.filter) { let offset = buf.len(); - formatter.format_record(&mut buf, &record); + formatter.format_record(&mut buf, record.with_source(line.bytes())); if let Some(ts) = record.ts { if let Some(unix_ts) = ts.unix_utc() { items.push((unix_ts.into(), offset..buf.len())); @@ -445,8 +447,8 @@ impl App { let mut workers = Vec::with_capacity(n); for _ in 0..n { let worker = scope.spawn(closure!(ref bfo, ref parser, ref sfi, ref input_badges, clone rxi, clone txo, |_| { - let mut formatter = self.formatter(); - let mut processor = SegmentProcessor::new(&parser, &mut formatter, &self.options.filter); + let formatter = self.formatter(); + let mut processor = SegmentProcessor::new(&parser, &formatter, &self.options.filter); for (i, j, segment) in rxi.iter() { let prefix = input_badges.as_ref().map(|b|b[i].as_str()).unwrap_or(""); match segment { @@ -564,15 +566,19 @@ impl App { )) } - fn formatter(&self) -> RecordFormatter { - RecordFormatter::new( - self.options.theme.clone(), - DateTimeFormatter::new(self.options.time_format.clone(), self.options.time_zone), - self.options.hide_empty_fields, - self.options.fields.filter.clone(), - self.options.formatting.clone(), - ) - .with_field_unescaping(!self.options.raw_fields) + fn formatter(&self) -> Box { + if self.options.raw { + Box::new(RawRecordFormatter{}) + } else { + Box::new(RecordFormatter::new( + self.options.theme.clone(), + DateTimeFormatter::new(self.options.time_format.clone(), self.options.time_zone), + self.options.hide_empty_fields, + self.options.fields.filter.clone(), + self.options.formatting.clone(), + ) + .with_field_unescaping(!self.options.raw_fields)) + } } fn input_badges<'a, I: IntoIterator>(&self, inputs: I) -> Option> { @@ -676,14 +682,14 @@ impl App { // --- -pub struct SegmentProcessor<'a> { +pub struct SegmentProcessor<'a, F: RecordWithSourceFormatter> { parser: &'a Parser, - formatter: &'a mut RecordFormatter, + formatter: F, filter: &'a Filter, } -impl<'a> SegmentProcessor<'a> { - pub fn new(parser: &'a Parser, formatter: &'a mut RecordFormatter, filter: &'a Filter) -> Self { +impl<'a, F: RecordWithSourceFormatter> SegmentProcessor<'a, F> { + pub fn new(parser: &'a Parser, formatter: F, filter: &'a Filter) -> Self { Self { parser, formatter, @@ -699,20 +705,21 @@ impl<'a> SegmentProcessor<'a> { if data.len() == 0 { continue; } - let mut stream = json::Deserializer::from_slice(data).into_iter::(); + let stream = json::Deserializer::from_slice(data).into_iter::(); + let mut stream = StreamDeserializerWithOffsets(stream); let mut some = false; - while let Some(Ok(record)) = stream.next() { + while let Some(Ok((record, offsets))) = stream.next() { some = true; let record = self.parser.parse(record); if record.matches(self.filter) { let begin = buf.len(); buf.extend(prefix.as_bytes()); - self.formatter.format_record(buf, &record); + self.formatter.format_record(buf, record.with_source(&data[offsets])); let end = buf.len(); observer.observe_record(&record, begin..end); } } - let remainder = if some { &data[stream.byte_offset()..] } else { data }; + let remainder = if some { &data[stream.0.byte_offset()..] } else { data }; if remainder.len() != 0 && self.filter.is_empty() { buf.extend_from_slice(remainder); buf.push(b'\n'); diff --git a/src/formatting.rs b/src/formatting.rs index 0bffdc1e..2cbd3b04 100644 --- a/src/formatting.rs +++ b/src/formatting.rs @@ -24,6 +24,33 @@ type Buf = Vec; // --- +pub trait RecordWithSourceFormatter { + fn format_record(&self, buf: &mut Buf, rec: model::RecordWithSource); +} + +pub struct RawRecordFormatter {} + +impl RecordWithSourceFormatter for RawRecordFormatter { + fn format_record(&self, buf: &mut Buf, rec: model::RecordWithSource) { + buf.extend_from_slice(rec.source); + buf.push(b'\n'); + } +} + +impl RecordWithSourceFormatter for &T { + fn format_record(&self, buf: &mut Buf, rec: model::RecordWithSource) { + (**self).format_record(buf, rec) + } +} + +impl RecordWithSourceFormatter for Box { + fn format_record(&self, buf: &mut Buf, rec: model::RecordWithSource) { + (**self).format_record(buf, rec) + } +} + +// --- + pub struct RecordFormatter { theme: Arc, unescape_fields: bool, @@ -59,7 +86,7 @@ impl RecordFormatter { self } - pub fn format_record(&mut self, buf: &mut Buf, rec: &model::Record) { + pub fn format_record(&self, buf: &mut Buf, rec: &model::Record) { self.theme.apply(buf, &rec.level, |s| { // // time @@ -275,11 +302,21 @@ impl RecordFormatter { } } +impl RecordWithSourceFormatter for RecordFormatter { + fn format_record(&self, buf: &mut Buf, rec: model::RecordWithSource) { + RecordFormatter::format_record(self, buf, rec.record) + } +} + +// --- + fn format_str_unescaped(buf: &mut Buf, s: &str) { let mut reader = StrRead::new(&s[1..]); reader.parse_str_raw(buf).unwrap(); } +// --- + struct FieldFormatter<'a> { rf: &'a RecordFormatter, } @@ -429,7 +466,7 @@ mod tests { use serde_json as json; fn format(rec: &Record) -> Result { - let mut formatter = RecordFormatter::new( + let formatter = RecordFormatter::new( Arc::new(Theme::from(testing::theme()?)), DateTimeFormatter::new( LinuxDateFormat::new("%y-%m-%d %T.%3N").compile(), @@ -459,8 +496,7 @@ mod tests { ("ka", RawValue::from_string(r#"{"va":{"kb":42}}"#.into()).unwrap().as_ref()), ]).unwrap(), extrax: Vec::default(), - }) - .unwrap(), + }).unwrap(), String::from("\u{1b}[0;2;3m00-01-02 03:04:05.123 \u{1b}[0;36m|\u{1b}[0;95mDBG\u{1b}[0;36m|\u{1b}[0;2;3m \u{1b}[0;2;4mtl:\u{1b}[0;2;3m \u{1b}[0;1;39mtm \u{1b}[0;32mka\u{1b}[0;2m:\u{1b}[0;33m{ \u{1b}[0;32mva\u{1b}[0;2m:\u{1b}[0;33m{ \u{1b}[0;32mkb\u{1b}[0;2m:\u{1b}[0;94m42\u{1b}[0;33m } }\u{1b}[0;2;3m @ tc\u{1b}[0m\n"), ); } diff --git a/src/lib.rs b/src/lib.rs index 04bd2f8e..a64953ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ mod model; mod pool; mod replay; mod scanning; +mod serdex; mod tee; // conditional public modules diff --git a/src/main.rs b/src/main.rs index 7a4f12ec..c9934e06 100644 --- a/src/main.rs +++ b/src/main.rs @@ -67,8 +67,12 @@ struct Opt { )] theme: String, // - /// Disable unescaping and prettifying of field values. + /// Output raw JSON messages instead of formatter messages, it can be useful for applying filters and saving results in original format. #[arg(short, long)] + raw: bool, + // + /// Disable unescaping and prettifying of field values. + #[arg(long)] raw_fields: bool, // /// Number of interrupts to ignore, i.e. Ctrl-C (SIGINT). @@ -357,6 +361,7 @@ fn run() -> Result<()> { // Create app. let app = hl::App::new(hl::Options { theme: Arc::new(theme), + raw: opt.raw, raw_fields: opt.raw_fields, time_format, buffer_size, diff --git a/src/model.rs b/src/model.rs index 4a85b680..0d3ea495 100644 --- a/src/model.rs +++ b/src/model.rs @@ -63,6 +63,31 @@ impl<'a> Record<'a> { // --- +pub trait RecordWithSourceConstructor { + fn with_source<'a>(&'a self, source: &'a [u8]) -> RecordWithSource<'a>; +} + +// --- + +pub struct RecordWithSource<'a> { + pub record: &'a Record<'a>, + pub source: &'a [u8], +} + +impl<'a> RecordWithSource<'a> { + pub fn new(record: &'a Record<'a>, source: &'a [u8]) -> Self { + Self { record, source } + } +} + +impl RecordWithSourceConstructor for Record<'_> { + fn with_source<'a>(&'a self, source: &'a [u8]) -> RecordWithSource<'a> { + RecordWithSource::new(self, source) + } +} + +// --- + pub trait RecordFilter { fn apply<'a>(&self, record: &'a Record<'a>) -> bool; } diff --git a/src/serdex.rs b/src/serdex.rs new file mode 100644 index 00000000..e1d7fb2b --- /dev/null +++ b/src/serdex.rs @@ -0,0 +1,18 @@ +use serde_json::StreamDeserializer; +use std::ops::Range; + +pub struct StreamDeserializerWithOffsets<'de, R, T>(pub StreamDeserializer<'de, R, T>); + +impl<'de, R, T> Iterator for StreamDeserializerWithOffsets<'de, R, T> +where + R: serde_json::de::Read<'de>, + T: serde::de::Deserialize<'de>, +{ + type Item = serde_json::Result<(T, Range)>; + fn next(&mut self) -> Option { + let start_offset = self.0.byte_offset(); + self.0 + .next() + .map(|res| res.map(|v| (v, start_offset..self.0.byte_offset()))) + } +}