Skip to content

Commit

Permalink
new: added option for raw source message output
Browse files Browse the repository at this point in the history
  • Loading branch information
pamburus committed Nov 26, 2023
1 parent adc0cb8 commit 482e9c3
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 38 deletions.
17 changes: 13 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ categories = ["command-line-utilities"]
description = "Utility for viewing json-formatted log files."
keywords = ["cli", "human", "log"]
name = "hl"
version = "0.20.1-alpha.3"
version = "0.21.0"
edition = "2021"
build = "build.rs"

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ Options:
--paging <PAGING> Output paging options [env: HL_PAGING=] [default: auto] [possible values: auto, always, never]
-P Handful alias for --paging=never, overrides --paging option
--theme <THEME> Color theme [env: HL_THEME=] [default: universal]
-r, --raw-fields Disable unescaping and prettifying of field values
-r, --raw Output raw JSON messages instead of formatter messages, it can be useful for applying filters and saving results in original format
--raw-fields Disable unescaping and prettifying of field values
--interrupt-ignore-count <INTERRUPT_IGNORE_COUNT> Number of interrupts to ignore, i.e. Ctrl-C (SIGINT) [env: HL_INTERRUPT_IGNORE_COUNT=] [default: 3]
--buffer-size <BUFFER_SIZE> Buffer size [env: HL_BUFFER_SIZE=] [default: "256 KiB"]
--max-message-size <MAX_MESSAGE_SIZE> Maximum message size [env: HL_MAX_MESSAGE_SIZE=] [default: "64 MiB"]
Expand Down
4 changes: 2 additions & 2 deletions benches/parse-and-format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn benchmark(c: &mut Criterion) {
c.bench_function(format!("{}/{}", name, theme), |b| {
let settings = Settings::default();
let parser = Parser::new(ParserSettings::new(&settings.fields.predefined, empty(), false));
let mut formatter = RecordFormatter::new(
let formatter = RecordFormatter::new(
Arc::new(Theme::embedded(theme).unwrap()),
DateTimeFormatter::new(
LinuxDateFormat::new("%b %d %T.%3N").compile(),
Expand All @@ -32,7 +32,7 @@ fn benchmark(c: &mut Criterion) {
settings::Formatting::default(),
);
let filter = Filter::default();
let mut processor = SegmentProcessor::new(&parser, &mut formatter, &filter);
let mut processor = SegmentProcessor::new(&parser, &formatter, &filter);
let mut buf = Vec::new();
b.iter(|| {
processor.run(record, &mut buf, "", &mut RecordIgnorer {});
Expand Down
57 changes: 32 additions & 25 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ use crate::datefmt::{DateTimeFormat, DateTimeFormatter};
use crate::error::*;
use crate::fmtx::aligned_left;
use crate::fsmon::{self, EventKind};
use crate::formatting::RecordFormatter;
use crate::formatting::{RecordFormatter, RecordWithSourceFormatter, RawRecordFormatter};
use crate::index::{Indexer, Timestamp};
use crate::input::{BlockLine, InputHolder, InputReference, Input};
use crate::model::{Filter, Parser, ParserSettings, RawRecord, Record};
use crate::model::{Filter, Parser, ParserSettings, RawRecord, Record, RecordWithSourceConstructor};
use crate::scanning::{BufFactory, Scanner, Segment, SegmentBufFactory};
use crate::serdex::StreamDeserializerWithOffsets;
use crate::settings::{Fields, Formatting};
use crate::theme::{Element, StylingPush, Theme};
use crate::timezone::Tz;
Expand All @@ -47,6 +48,7 @@ use crate::IncludeExcludeKeyFilter;
pub struct Options {
pub theme: Arc<Theme>,
pub time_format: DateTimeFormat,
pub raw: bool,
pub raw_fields: bool,
pub buffer_size: NonZeroUsize,
pub max_message_size: NonZeroUsize,
Expand Down Expand Up @@ -131,8 +133,8 @@ impl App {
// spawn processing threads
for (rxi, txo) in izip!(rxi, txo) {
scope.spawn(closure!(ref bfo, ref parser, ref sfi, ref input_badges, |_| {
let mut formatter = self.formatter();
let mut processor = SegmentProcessor::new(&parser, &mut formatter, &self.options.filter);
let formatter = self.formatter();
let mut processor = SegmentProcessor::new(&parser, &formatter, &self.options.filter);
for (i, segment) in rxi.iter() {
let prefix = input_badges.as_ref().map(|b|b[i].as_str()).unwrap_or("");
match segment {
Expand Down Expand Up @@ -264,7 +266,7 @@ impl App {
let mut workers = Vec::with_capacity(n);
for (rxp, txw) in izip!(rxp, txw) {
workers.push(scope.spawn(closure!(ref parser, |_| -> Result<()> {
let mut formatter = self.formatter();
let formatter = self.formatter();
for (block, ts_min, i, j) in rxp.iter() {
let mut buf = Vec::with_capacity(2 * usize::try_from(block.size())?);
let mut items = Vec::with_capacity(2 * usize::try_from(block.lines_valid())?);
Expand All @@ -276,7 +278,7 @@ impl App {
let record = parser.parse(record);
if record.matches(&self.options.filter) {
let offset = buf.len();
formatter.format_record(&mut buf, &record);
formatter.format_record(&mut buf, record.with_source(line.bytes()));
if let Some(ts) = record.ts {
if let Some(unix_ts) = ts.unix_utc() {
items.push((unix_ts.into(), offset..buf.len()));
Expand Down Expand Up @@ -445,8 +447,8 @@ impl App {
let mut workers = Vec::with_capacity(n);
for _ in 0..n {
let worker = scope.spawn(closure!(ref bfo, ref parser, ref sfi, ref input_badges, clone rxi, clone txo, |_| {
let mut formatter = self.formatter();
let mut processor = SegmentProcessor::new(&parser, &mut formatter, &self.options.filter);
let formatter = self.formatter();
let mut processor = SegmentProcessor::new(&parser, &formatter, &self.options.filter);
for (i, j, segment) in rxi.iter() {
let prefix = input_badges.as_ref().map(|b|b[i].as_str()).unwrap_or("");
match segment {
Expand Down Expand Up @@ -564,15 +566,19 @@ impl App {
))
}

fn formatter(&self) -> RecordFormatter {
RecordFormatter::new(
self.options.theme.clone(),
DateTimeFormatter::new(self.options.time_format.clone(), self.options.time_zone),
self.options.hide_empty_fields,
self.options.fields.filter.clone(),
self.options.formatting.clone(),
)
.with_field_unescaping(!self.options.raw_fields)
fn formatter(&self) -> Box<dyn RecordWithSourceFormatter> {
if self.options.raw {
Box::new(RawRecordFormatter{})
} else {
Box::new(RecordFormatter::new(
self.options.theme.clone(),
DateTimeFormatter::new(self.options.time_format.clone(), self.options.time_zone),
self.options.hide_empty_fields,
self.options.fields.filter.clone(),
self.options.formatting.clone(),
)
.with_field_unescaping(!self.options.raw_fields))
}
}

fn input_badges<'a, I: IntoIterator<Item = &'a InputReference>>(&self, inputs: I) -> Option<Vec<String>> {
Expand Down Expand Up @@ -676,14 +682,14 @@ impl App {

// ---

pub struct SegmentProcessor<'a> {
pub struct SegmentProcessor<'a, F: RecordWithSourceFormatter> {
parser: &'a Parser,
formatter: &'a mut RecordFormatter,
formatter: F,
filter: &'a Filter,
}

impl<'a> SegmentProcessor<'a> {
pub fn new(parser: &'a Parser, formatter: &'a mut RecordFormatter, filter: &'a Filter) -> Self {
impl<'a, F: RecordWithSourceFormatter> SegmentProcessor<'a, F> {
pub fn new(parser: &'a Parser, formatter: F, filter: &'a Filter) -> Self {
Self {
parser,
formatter,
Expand All @@ -699,20 +705,21 @@ impl<'a> SegmentProcessor<'a> {
if data.len() == 0 {
continue;
}
let mut stream = json::Deserializer::from_slice(data).into_iter::<RawRecord>();
let stream = json::Deserializer::from_slice(data).into_iter::<RawRecord>();
let mut stream = StreamDeserializerWithOffsets(stream);
let mut some = false;
while let Some(Ok(record)) = stream.next() {
while let Some(Ok((record, offsets))) = stream.next() {
some = true;
let record = self.parser.parse(record);
if record.matches(self.filter) {
let begin = buf.len();
buf.extend(prefix.as_bytes());
self.formatter.format_record(buf, &record);
self.formatter.format_record(buf, record.with_source(&data[offsets]));
let end = buf.len();
observer.observe_record(&record, begin..end);
}
}
let remainder = if some { &data[stream.byte_offset()..] } else { data };
let remainder = if some { &data[stream.0.byte_offset()..] } else { data };
if remainder.len() != 0 && self.filter.is_empty() {
buf.extend_from_slice(remainder);
buf.push(b'\n');
Expand Down
44 changes: 40 additions & 4 deletions src/formatting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,33 @@ type Buf = Vec<u8>;

// ---

pub trait RecordWithSourceFormatter {
fn format_record(&self, buf: &mut Buf, rec: model::RecordWithSource);
}

pub struct RawRecordFormatter {}

impl RecordWithSourceFormatter for RawRecordFormatter {
fn format_record(&self, buf: &mut Buf, rec: model::RecordWithSource) {
buf.extend_from_slice(rec.source);
buf.push(b'\n');
}
}

impl<T: RecordWithSourceFormatter> RecordWithSourceFormatter for &T {
fn format_record(&self, buf: &mut Buf, rec: model::RecordWithSource) {
(**self).format_record(buf, rec)
}
}

impl RecordWithSourceFormatter for Box<dyn RecordWithSourceFormatter> {
fn format_record(&self, buf: &mut Buf, rec: model::RecordWithSource) {
(**self).format_record(buf, rec)
}
}

// ---

pub struct RecordFormatter {
theme: Arc<Theme>,
unescape_fields: bool,
Expand Down Expand Up @@ -59,7 +86,7 @@ impl RecordFormatter {
self
}

pub fn format_record(&mut self, buf: &mut Buf, rec: &model::Record) {
pub fn format_record(&self, buf: &mut Buf, rec: &model::Record) {
self.theme.apply(buf, &rec.level, |s| {
//
// time
Expand Down Expand Up @@ -275,11 +302,21 @@ impl RecordFormatter {
}
}

impl RecordWithSourceFormatter for RecordFormatter {
fn format_record(&self, buf: &mut Buf, rec: model::RecordWithSource) {
RecordFormatter::format_record(self, buf, rec.record)
}
}

// ---

fn format_str_unescaped(buf: &mut Buf, s: &str) {
let mut reader = StrRead::new(&s[1..]);
reader.parse_str_raw(buf).unwrap();
}

// ---

struct FieldFormatter<'a> {
rf: &'a RecordFormatter,
}
Expand Down Expand Up @@ -429,7 +466,7 @@ mod tests {
use serde_json as json;

fn format(rec: &Record) -> Result<String, Error> {
let mut formatter = RecordFormatter::new(
let formatter = RecordFormatter::new(
Arc::new(Theme::from(testing::theme()?)),
DateTimeFormatter::new(
LinuxDateFormat::new("%y-%m-%d %T.%3N").compile(),
Expand Down Expand Up @@ -459,8 +496,7 @@ mod tests {
("ka", RawValue::from_string(r#"{"va":{"kb":42}}"#.into()).unwrap().as_ref()),
]).unwrap(),
extrax: Vec::default(),
})
.unwrap(),
}).unwrap(),
String::from("\u{1b}[0;2;3m00-01-02 03:04:05.123 \u{1b}[0;36m|\u{1b}[0;95mDBG\u{1b}[0;36m|\u{1b}[0;2;3m \u{1b}[0;2;4mtl:\u{1b}[0;2;3m \u{1b}[0;1;39mtm \u{1b}[0;32mka\u{1b}[0;2m:\u{1b}[0;33m{ \u{1b}[0;32mva\u{1b}[0;2m:\u{1b}[0;33m{ \u{1b}[0;32mkb\u{1b}[0;2m:\u{1b}[0;94m42\u{1b}[0;33m } }\u{1b}[0;2;3m @ tc\u{1b}[0m\n"),
);
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod model;
mod pool;
mod replay;
mod scanning;
mod serdex;
mod tee;

// conditional public modules
Expand Down
7 changes: 6 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,12 @@ struct Opt {
)]
theme: String,
//
/// Disable unescaping and prettifying of field values.
/// Output raw JSON messages instead of formatter messages, it can be useful for applying filters and saving results in original format.
#[arg(short, long)]
raw: bool,
//
/// Disable unescaping and prettifying of field values.
#[arg(long)]
raw_fields: bool,
//
/// Number of interrupts to ignore, i.e. Ctrl-C (SIGINT).
Expand Down Expand Up @@ -357,6 +361,7 @@ fn run() -> Result<()> {
// Create app.
let app = hl::App::new(hl::Options {
theme: Arc::new(theme),
raw: opt.raw,
raw_fields: opt.raw_fields,
time_format,
buffer_size,
Expand Down
25 changes: 25 additions & 0 deletions src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,31 @@ impl<'a> Record<'a> {

// ---

pub trait RecordWithSourceConstructor {
fn with_source<'a>(&'a self, source: &'a [u8]) -> RecordWithSource<'a>;
}

// ---

pub struct RecordWithSource<'a> {
pub record: &'a Record<'a>,
pub source: &'a [u8],
}

impl<'a> RecordWithSource<'a> {
pub fn new(record: &'a Record<'a>, source: &'a [u8]) -> Self {
Self { record, source }
}
}

impl RecordWithSourceConstructor for Record<'_> {
fn with_source<'a>(&'a self, source: &'a [u8]) -> RecordWithSource<'a> {
RecordWithSource::new(self, source)
}
}

// ---

pub trait RecordFilter {
fn apply<'a>(&self, record: &'a Record<'a>) -> bool;
}
Expand Down
18 changes: 18 additions & 0 deletions src/serdex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use serde_json::StreamDeserializer;
use std::ops::Range;

pub struct StreamDeserializerWithOffsets<'de, R, T>(pub StreamDeserializer<'de, R, T>);

impl<'de, R, T> Iterator for StreamDeserializerWithOffsets<'de, R, T>
where
R: serde_json::de::Read<'de>,
T: serde::de::Deserialize<'de>,
{
type Item = serde_json::Result<(T, Range<usize>)>;
fn next(&mut self) -> Option<Self::Item> {
let start_offset = self.0.byte_offset();
self.0
.next()
.map(|res| res.map(|v| (v, start_offset..self.0.byte_offset())))
}
}

0 comments on commit 482e9c3

Please sign in to comment.