diff --git a/Cargo.toml b/Cargo.toml index bdb63b2..e3f5c6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bgpkit-parser" -version = "0.3.0" +version = "0.4.0" authors = ["Mingwei Zhang "] edition = "2021" readme = "README.md" @@ -25,7 +25,9 @@ log="0.4" env_logger="0.9" itertools = "0.10.1" +bzip2="0.4.3" +flate2="1.0.22" +reqwest = { version = "0.11", features = ["json", "blocking", "stream"]} + [dev-dependencies] -bzip2="0.4" -reqwest = { version = "0.11", features = ["json", "blocking", "stream"] } bgpkit-broker = "0.3.0" diff --git a/README.md b/README.md index 8eff526..1498ff7 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,104 @@ # BGPKIT Parser -BGPKIT Parser provides MRT/BGP message parsing functionalities written in Rust. - -## Features +BGPKIT Parser aims to provide the most ergonomic MRT/BGP message parsing Rust API. BGPKIT Parser has the following features: -- performance comparable to C-based implementations like `bgpdump` or `bgpreader`. -- supporting most of the relevant BGP RFCs. -- simple API serves as building block for more complex workflows. +- **performant**: comparable to C-based implementations like `bgpdump` or `bgpreader`. +- **actively maintained**: we consistently introduce feature updates and bug fixes, and support most of the relevant BGP RFCs. +- **ergonomic API**: a three-line for loop can already get you started. +- **battery-included**: ready to handle remote or local, bzip2 or gz data files out of the box + +## Examples + +For complete examples, check out the [examples folder](examples). + +### Parsing single MRT file + +Let's say we want to print out all the BGP announcements/withdrawals from a single MRT file, either located remotely or locally. +Here is an example that does so. 
+ +```rust +use bgpkit_parser::BgpkitParser; +fn main() { + let parser = BgpkitParser::new("http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2"); + for elem in parser { + println!("{}", elem) + } +} +``` + +Yes, it is this simple! + +You can even do some more interesting iterator operations that are even shorter. +For example, counting the number of announcements/withdrawals in that file: +```rust +use bgpkit_parser::BgpkitParser; +fn main() { + let url = "http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2"; + let count = BgpkitParser::new(url).into_iter().count(); + println!("total: {}", count); +} +``` + +and it prints out +``` +total: 255849 +``` + +### Parsing multiple MRT files with BGPKIT Broker + +[BGPKIT Broker][broker-repo] library provides search API for all RouteViews and RIPE RIS MRT data files. Using the +broker's Rust API ([`bgpkit-broker`][broker-crates-io]), we can easily compile a list of MRT files that we are interested +in for any time period and any data type (`update` or `rib`). This allows users to gather information without needing to +know about locations of specific data files. 
+ +[broker-repo]: https://github.com/bgpkit/bgpkit-broker +[broker-crates-io]: https://crates.io/crates/bgpkit-broker + +The example below shows a relatively more interesting example that does the following: +- find all BGP archive data created on time 1634693400 +- filter to only BGP updates files +- find all announcements originated from AS13335 +- print out the total count of the announcements + +```rust +fn main(){ + // set broker query parameters + let mut params = bgpkit_broker::QueryParams::new(); + params = params.start_ts(1634693400); + params = params.end_ts(1634693400); + params = params.data_type("update"); + let mut broker = bgpkit_broker::BgpkitBroker::new("https://api.broker.bgpkit.com/v1"); + broker.set_params(&params); + + // loop through data files found by broker + for item in broker { + + // create a parser that takes a URL and automatically determines + // the file location and file type, and handles data download and + // decompression streaming intelligently + let parser = BgpkitParser::new(item.url.as_str()); + + // iterating through the parser. the iterator returns `BgpElem` one at a time. + let elems = parser.into_elem_iter().map(|elem|{ + if let Some(origins) = &elem.origin_asns { + if origins.contains(&13335) { + Some(elem) + } else { + None + } + } else { + None + } + }).filter_map(|x|x).collect::<Vec<BgpElem>>(); + log::info!("{} elems matches", elems.len()); + } +} +``` -## Key Data Structures +## Data Representation -There are two key data structure to understand the parsing results:`MrtRecord` and `BgpElem`. +There are two key data structures to understand for the parsing results: `MrtRecord` and `BgpElem`. ### `MrtRecord`: unmodified MRT information representation @@ -32,16 +119,16 @@ pub enum MrtMessage { ``` `MrtRecord` record representation is concise, storage efficient, but often less convenient to use. 
For example, when -trying to find out specific BGP announcements for certain IP prefix, we often needs to go through nested layers of +trying to find out specific BGP announcements for certain IP prefix, we often needs to go through nested layers of internal data structure (NLRI, announced, prefix, or even looking up peer index table for Table Dump V2 format), which could be irrelevant to what users really want to do. ### `BgpElem`: per-prefix BGP information, MRT-format-agnostic -To facilitate simpler data analysis of BGP data, we defined a new data structure called `BgpElem` in this crate. Each +To facilitate simpler data analysis of BGP data, we defined a new data structure called `BgpElem` in this crate. Each `BgpElem` contains a piece of self-containing BGP information about one single IP prefix. For example, when a bundled announcement of three prefixes P1, P2, P3 that shares the same AS path is processed, we break -the single record into three different `BgpElem` objects, each presenting a prefix. +the single record into three different `BgpElem` objects, each presenting a prefix. ```rust pub struct BgpElem { @@ -63,88 +150,12 @@ pub struct BgpElem { } ``` -The main benefit of using `BgpElem` is that the analysis can be executed on a per-prefix basis, generic to what the +The main benefit of using `BgpElem` is that the analysis can be executed on a per-prefix basis, generic to what the backend MRT data format (bgp4mp, tabledumpv1, tabledumpv2, etc.). The obvious drawback is that we will have to duplicate -information to save at each elem, that consuming more memory. +information to save at each elem, that consuming more memory. [mrt-record-doc]: https://docs.rs/bgp-models/0.3.4/bgp_models/mrt/struct.MrtRecord.html -## Examples - -For complete examples, check out the [examples folder](examples) - -### Parsing single MRT file - -If having a file location already known, a user can directly read the data into memory and parse the bytes to creat -a parser object. 
The BGPKIT Parser provides convenient iterator over either `BgpElem` (the default iterator, or `.into_iter()`), -or `MrtRecord` (use `.into_record_iter()`). The example below iterates through all the elements in a single MRT updates file, -and logging all the announcements that were originated from a specific ASN. - -```rust -// read updates data into bytes -let data_bytes = reqwest::blocking::get("http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2") - .unwrap().bytes().unwrap().to_vec(); -// create a buffered reader that wraps around a bzip2 decoder -let reader = BufReader::new(BzDecoder::new(&*data_bytes)); -// create a parser that takes the buffered reader -let parser = BgpkitParser::new(reader); - -// iterating through the parser. the iterator returns `BgpElem` one at a time. -for elem in parser { - // each BGP announcement contains one AS path, which depending on the path segment's type - // there could be multiple origin ASNs (e.g. AS-Set as the origin) - if let Some(origins) = &elem.origin_asns { - if origins.contains(&13335) { - log::info!("{}", &elem); - } - } -} -``` - -### Parsing multiple MRT files with BGPKIT Broker - -[BGPKIT Broker][broker-repo] library provides search API for all RouteViews and RIPE RIS MRT data files. Using the -broker's Rust API ([`bgpkit-broker`][broker-crates-io]), we can easily compile a list of MRT files that we are interested -in for any time period and any data type (`update` or `rib`). This allows users to gather information without needing to -know about locations of specific data files. 
- -[broker-repo]: https://github.com/bgpkit/bgpkit-broker -[broker-crates-io]: https://crates.io/crates/bgpkit-broker - -```rust -let mut params = bgpkit_broker::QueryParams::new(); -params = params.start_ts(1634693400); -params = params.end_ts(1634693400); -params = params.data_type("update"); -let mut broker = bgpkit_broker::BgpkitBroker::new("https://api.broker.bgpkit.com/v1"); -broker.set_params(¶ms); - -for item in broker { - log::info!("downloading updates file: {}", &item.url); - // read updates data into bytes - let data_bytes = reqwest::blocking::get(item.url) - .unwrap().bytes().unwrap().to_vec(); - // create a buffered reader that wraps around a bzip2 decoder - let reader = BufReader::new(BzDecoder::new(&*data_bytes)); - // create a parser that takes the buffered reader - let parser = BgpkitParser::new(reader); - - log::info!("parsing updates file"); - // iterating through the parser. the iterator returns `BgpElem` one at a time. - let elems = parser.into_elem_iter().map(|elem|{ - if let Some(origins) = &elem.origin_asns { - if origins.contains(&13335) { - Some(elem) - } else { - None - } - } else { - None - } - }).filter_map(|x|x).collect::>(); - log::info!("{} elems matches", elems.len()); -} -``` ## Contribution diff --git a/examples/count_elems.rs b/examples/count_elems.rs new file mode 100644 index 0000000..11f141f --- /dev/null +++ b/examples/count_elems.rs @@ -0,0 +1,8 @@ +use bgpkit_parser::BgpkitParser; + +/// an very simple example that reads a remote BGP data file and print out the message count. 
+fn main() { + let url = "http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2"; + let count = BgpkitParser::new(url).into_iter().count(); + println!("{}", count); +} \ No newline at end of file diff --git a/examples/parse-files-from-broker.rs b/examples/parse-files-from-broker.rs index 8cfc28d..d9a7b8d 100644 --- a/examples/parse-files-from-broker.rs +++ b/examples/parse-files-from-broker.rs @@ -1,16 +1,7 @@ -use std::io::BufReader; -use bzip2::read::BzDecoder; use bgpkit_parser::{BgpElem, BgpkitParser}; /// This example shows how use BGPKIT Broker to retrieve a number of data file pointers that matches /// the time range criteria, and then parse the data files for each one. -/// -/// The dependency needed for this example are: -/// ``` -/// bzip2="0.4" -/// reqwest = { version = "0.11", features = ["json", "blocking", "stream"] } -/// bgpkit-broker = "0.3.0" -/// ``` fn main() { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); @@ -23,13 +14,7 @@ fn main() { for item in broker { log::info!("downloading updates file: {}", &item.url); - // read updates data into bytes - let data_bytes = reqwest::blocking::get(item.url) - .unwrap().bytes().unwrap().to_vec(); - // create a buffered reader that wraps around a bzip2 decoder - let reader = BufReader::new(BzDecoder::new(&*data_bytes)); - // create a parser that takes the buffered reader - let parser = BgpkitParser::new(reader); + let parser = BgpkitParser::new(item.url.as_str()); log::info!("parsing updates file"); // iterating through the parser. the iterator returns `BgpElem` one at a time. 
diff --git a/examples/parse-single-file.rs b/examples/parse-single-file.rs index b19c4c9..2f586f4 100644 --- a/examples/parse-single-file.rs +++ b/examples/parse-single-file.rs @@ -1,36 +1,18 @@ -use std::io::BufReader; -use bzip2::read::BzDecoder; use bgpkit_parser::BgpkitParser; /// This example shows how to download and process a single BGP archive file with BGPKIT Parser. -/// -/// The dependency needed for this example are: -/// ``` -/// bzip2="0.4" -/// reqwest = { version = "0.11", features = ["json", "blocking", "stream"] } -/// ``` fn main() { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); log::info!("downloading updates file"); - // read updates data into bytes - let data_bytes = reqwest::blocking::get("http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2") - .unwrap().bytes().unwrap().to_vec(); - // create a buffered reader that wraps around a bzip2 decoder - let reader = BufReader::new(BzDecoder::new(&*data_bytes)); + // create a parser that takes the buffered reader - let parser = BgpkitParser::new(reader); + let parser = BgpkitParser::new("http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2"); log::info!("parsing updates file"); // iterating through the parser. the iterator returns `BgpElem` one at a time. for elem in parser { - // each BGP announcement contains one AS path, which depending on the path segment's type - // there could be multiple origin ASNs (e.g. 
AS-Set as the origin) - if let Some(origins) = &elem.origin_asns { - if origins.contains(&13335) { - log::info!("{}", &elem); - } - } + log::info!("{}", &elem); } log::info!("done"); } \ No newline at end of file diff --git a/src/formats/bgpdump.rs b/src/formats/bgpdump.rs deleted file mode 100644 index 7880126..0000000 --- a/src/formats/bgpdump.rs +++ /dev/null @@ -1,452 +0,0 @@ -use bgp_models::bgp::attributes::{AsPathSegment, AtomicAggregate, AttrType, Attribute, Community, Origin, AsPath}; -use bgp_models::bgp::BgpMessage; -use bgp_models::mrt::bgp4mp::{Bgp4Mp, Bgp4MpMessage, Bgp4MpStateChange}; -use bgp_models::mrt::{MrtMessage, MrtRecord}; -use std::collections::HashMap; -use bgp_models::mrt::tabledump::{Peer, RibEntry, TableDumpMessage, TableDumpV2Message}; -use bgp_models::network::{Asn, NextHopAddress}; -use log::warn; - -pub struct BgpdumpFormatter { - peer_table: Option>, -} - -impl BgpdumpFormatter { - pub fn new() -> BgpdumpFormatter { - BgpdumpFormatter { peer_table: None } - } - - fn format_state_change(&self, _msg: &Bgp4MpStateChange) -> Vec { - todo!("state change display not implemented") - } - - fn format_message(&self, msg: &Bgp4MpMessage) -> Vec { - match &msg.bgp_message { - BgpMessage::Open(_) => { - vec![format!("U|O")] - } - BgpMessage::Update(m) => { - let mut elems = vec![]; - let mp = attr_map_to_str_map(&m.attributes); - let mut path_str = "".to_string(); - // if let Some(p) = mp.get(&AttrType::AS_PATH) { - // path_str = p.clone(); - // } - // if let Some(p) = mp.get(&AttrType::AS4_PATH) { - // path_str = p.clone(); - // } - let aspath = merge_aspath_as4path(m.attributes.get(&AttrType::AS_PATH), m.attributes.get(&AttrType::AS4_PATH)); - if let Some(p) = aspath { - path_str = aspath_to_string(&p) - } - - let mut aggr_str = "".to_string(); - if let Some(a) = mp.get(&AttrType::AGGREGATOR) { - aggr_str = a.clone(); - } - if let Some(a) = mp.get(&AttrType::AS4_AGGREGATOR) { - aggr_str = a.clone(); - } - - let origin = 
mp.get(&AttrType::ORIGIN).unwrap_or(&"".to_string()).clone(); - let nexthop = mp.get(&AttrType::NEXT_HOP) - .unwrap_or(&"".to_string()) - .clone(); - let local_pref = mp.get(&AttrType::LOCAL_PREFERENCE) - .unwrap_or(&"0".to_string()) - .clone(); - let med = mp.get(&AttrType::MULTI_EXIT_DISCRIMINATOR) - .unwrap_or(&"0".to_string()) - .clone(); - let communities = mp.get(&AttrType::COMMUNITIES) - .unwrap_or(&"".to_string()) - .clone(); - let atomic = mp.get(&AttrType::ATOMIC_AGGREGATE) - .unwrap_or(&"NAG".to_string()) - .clone(); - - - if let Some(Attribute::MpReachableNlri(nlri)) = m.attributes.get(&AttrType::MP_REACHABLE_NLRI) { - elems.extend(nlri.prefixes.iter().map(|p| { - format!( - "A|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|", - msg.peer_ip.to_string(), - msg.peer_asn.to_string(), - p, - &path_str, - &origin, - &nexthop, - &local_pref, - &med, - &communities, - &atomic, - &aggr_str - ) - })); - } - - if let Some(Attribute::MpUnreachableNlri(nlri)) = m.attributes.get(&AttrType::MP_UNREACHABLE_NLRI) { - elems.extend(nlri.prefixes.iter().map(|p| { - format!( - "W|{}|{}|{}", - msg.peer_ip.to_string(), - msg.peer_asn.to_string(), - p, - ) - })); - } - - elems.extend(m.announced_prefixes.iter().map(|p| { - format!( - "A|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|", - msg.peer_ip.to_string(), - msg.peer_asn.to_string(), - p, - &path_str, - &origin, - &nexthop, - &local_pref, - &med, - &communities, - &atomic, - &aggr_str - ) - })); - - elems.extend(m.withdrawn_prefixes.iter().map(|p|{ - format!( - "W|{}|{}|{}", - msg.peer_ip.to_string(), - msg.peer_asn.to_string(), - p, - ) - })); - - - elems - } - BgpMessage::Notification(_) => { - vec![format!("U|N")] - } - BgpMessage::KeepAlive(_) => { - vec![format!("U|K")] - } - } - } - - pub fn to_elems(&mut self, mrt_record: &MrtRecord) -> Vec { - let mrt = mrt_record; - let timestamp = &mrt.common_header.timestamp; - let timestamp_micro = &mrt.common_header.microsecond_timestamp; - match &mrt.message { - MrtMessage::TableDumpMessage(msg) => { 
- let header = format!("TABLE_DUMP|{}|B", timestamp); - let mut entries: Vec = vec![]; - - let mp = attr_map_to_str_map(&msg.attributes); - let mut path_str = "".to_string(); - if let Some(p) = mp.get(&AttrType::AS_PATH) { - path_str = p.clone(); - } - - let origin = mp.get(&AttrType::ORIGIN).unwrap_or(&"".to_string()).clone(); - let mut nexthop_str = mp.get(&AttrType::NEXT_HOP) - .unwrap_or(&"".to_string()) - .clone(); - if nexthop_str ==""{ - if let Some(Attribute::MpReachableNlri(nlri)) = msg.attributes.get(&AttrType::MP_REACHABLE_NLRI) { - if let Some(next_hop) = &nlri.next_hop { - nexthop_str = match next_hop{ - NextHopAddress::Ipv4(v) => { v.to_string()} - NextHopAddress::Ipv6(v) => { v.to_string()} - NextHopAddress::Ipv6LinkLocal(v1, v2) => { v1.to_string()} - }; - } - } - } - let local_pref = mp.get(&AttrType::LOCAL_PREFERENCE) - .unwrap_or(&"0".to_string()) - .clone(); - let med = mp.get(&AttrType::MULTI_EXIT_DISCRIMINATOR) - .unwrap_or(&"0".to_string()) - .clone(); - let communities = mp.get(&AttrType::COMMUNITIES) - .unwrap_or(&"".to_string()) - .clone(); - let atomic = mp.get(&AttrType::ATOMIC_AGGREGATE) - .unwrap_or(&"NAG".to_string()) - .clone(); - - let mut aggr_str = "".to_string(); - if let Some(a) = mp.get(&AttrType::AGGREGATOR) { - aggr_str = a.clone(); - } - if let Some(a) = mp.get(&AttrType::AS4_AGGREGATOR) { - aggr_str = a.clone(); - } - match msg { - &_ => {} - } - vec![ - format!("{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|", - header, - msg.peer_address, msg.peer_asn, msg.prefix, - path_str, - &origin, - &nexthop_str, - &local_pref, - &med, - &communities, - &atomic, - &aggr_str - ) - ] - } - - MrtMessage::TableDumpV2Message(msg) => { - let header = format!("TABLE_DUMP2|{}|B", timestamp); - let mut entries: Vec = vec![]; - match msg { - TableDumpV2Message::PeerIndexTable(p) => { - self.peer_table = Some(p.peers_map.clone()); - } - TableDumpV2Message::RibAfiEntries(t) => { - for e in &t.rib_entries { - let pid = e.peer_index; - let peer = 
self.peer_table.as_ref().unwrap().get(&(pid as u32)).unwrap(); - - let prefix_str = format!("{}",t.prefix); - - let mp = attr_map_to_str_map(&e.attributes); - let mut path_str = "".to_string(); - let aspath = merge_aspath_as4path(e.attributes.get(&AttrType::AS_PATH), e.attributes.get(&AttrType::AS4_PATH)); - if let Some(p) = aspath { - path_str = aspath_to_string(&p) - } - - - let origin = mp.get(&AttrType::ORIGIN).unwrap_or(&"".to_string()).clone(); - let mut nexthop_str = mp.get(&AttrType::NEXT_HOP) - .unwrap_or(&"".to_string()) - .clone(); - if nexthop_str ==""{ - if let Some(Attribute::MpReachableNlri(nlri)) = e.attributes.get(&AttrType::MP_REACHABLE_NLRI) { - if let Some(next_hop) = &nlri.next_hop { - nexthop_str = match next_hop{ - NextHopAddress::Ipv4(v) => { v.to_string()} - NextHopAddress::Ipv6(v) => { v.to_string()} - NextHopAddress::Ipv6LinkLocal(v1, v2) => { v1.to_string()} - }; - } - } - } - let local_pref = mp.get(&AttrType::LOCAL_PREFERENCE) - .unwrap_or(&"0".to_string()) - .clone(); - let med = mp.get(&AttrType::MULTI_EXIT_DISCRIMINATOR) - .unwrap_or(&"0".to_string()) - .clone(); - let communities = mp.get(&AttrType::COMMUNITIES) - .unwrap_or(&"".to_string()) - .clone(); - let atomic = mp.get(&AttrType::ATOMIC_AGGREGATE) - .unwrap_or(&"NAG".to_string()) - .clone(); - - let mut aggr_str = "".to_string(); - if let Some(a) = mp.get(&AttrType::AGGREGATOR) { - aggr_str = a.clone(); - } - if let Some(a) = mp.get(&AttrType::AS4_AGGREGATOR) { - aggr_str = a.clone(); - } - - entries.push(format!("{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|", - peer.peer_address, peer.peer_asn, prefix_str, - path_str, - &origin, - &nexthop_str, - &local_pref, - &med, - &communities, - &atomic, - &aggr_str - )); - } - } - TableDumpV2Message::RibGenericEntries(t) => {} - } - - entries.iter().map(|e| format!("{}|{}", header, e)).collect::>() - } - - MrtMessage::Bgp4Mp(msg) => { - let header = if let Some(micro) = timestamp_micro { - let m = (micro.clone() as f64)/1000000.0; - let t_micro: 
f64 = timestamp.clone() as f64 + m; - format!("BGP4MP_ET|{:.6}", t_micro) - } else { - format!("BGP4MP|{}", timestamp) - }; - - match msg { - Bgp4Mp::Bgp4MpStateChange(v) | Bgp4Mp::Bgp4MpStateChangeAs4(v) => { - self.format_state_change(v) - } - Bgp4Mp::Bgp4MpMessage(v) - | Bgp4Mp::Bgp4MpMessageLocal(v) - | Bgp4Mp::Bgp4MpMessageAs4(v) - | Bgp4Mp::Bgp4MpMessageAs4Local(v) => self.format_message(v), - } - .iter() - .map(|s| format!("{}|{}", &header, s)) - .collect::>() - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::parser::Parser; - use bzip2::read::BzDecoder; - use std::io::{BufRead, BufReader}; - use std::fs::File; - use env_logger::Env; - - #[test] - fn test_full_file_comparison_old() { - use log::info; - env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); - - let file = File::open("examples/updates.20011026.1352.bz2").unwrap(); - let reader = BzDecoder::new(&file); - let parser = Parser::new(reader); - - info!("reading via prism"); - let mut formatter = BgpdumpFormatter::new(); - let mut lines1: Vec = parser.into_iter().map(|record|{ - formatter.to_elems(&record) - }).flat_map(|x|x).collect::>(); - - info!("reading via bgpdump"); - let file2 = File::open("examples/updates.20011026.1352.bgpdump.txt.bz2").unwrap(); - let reader2 = BzDecoder::new(file2); - let mut lines2 = BufReader::new(reader2).lines() - .filter_map(|x|x.ok()) - .collect::>(); - info!("sorting bgpdump results"); - lines1.sort(); - lines2.sort(); - info!("comapring results"); - - let mut iter1 = lines1.iter(); - for line1 in lines2 { - let line2 = iter1.next().unwrap().clone(); - assert_eq!(line1, line2); - } - } - - #[test] - fn test_full_file_comparison_new() { - use log::info; - env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); - - info!("reading via bgpdump"); - let file2 = File::open("examples/updates.20210101.0000.bgpdump.txt").unwrap(); - let mut lines2 = BufReader::new(file2).lines() - .filter_map(|x|x.ok()) 
- .collect::>(); - lines2.sort(); - let mut iter2 = lines2.iter(); - - let file = File::open("examples/updates.20210101.0000.bz2").unwrap(); - let reader = BzDecoder::new(&file); - let parser = Parser::new(reader); - - info!("reading via prism"); - let mut formatter = BgpdumpFormatter::new(); - let mut lines1: Vec = parser.into_iter().map(|record|{ - formatter.to_elems(&record) - }).flat_map(|x|x).collect::>(); - - lines1.sort(); - - for prism_line in lines1 { - let bgpdump_line = iter2.next().unwrap().clone(); - if prism_line != bgpdump_line { - // dbg!(prism_line, bgpdump_line); - } - assert_eq!(prism_line, bgpdump_line) - } - } - - #[test] - fn test_full_file_comparison_ribs_new() { - use log::info; - env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); - - info!("reading via bgpdump"); - let file2 = File::open("examples/table-dump-v2-rib.20140604.1600.bgpdump.txt").unwrap(); - let mut lines2 = BufReader::new(file2).lines() - .filter_map(|x|x.ok()) - .collect::>(); - lines2.sort(); - let mut iter2 = lines2.iter(); - - let file = File::open("examples/table-dump-v2-rib.20140604.1600.bz2").unwrap(); - let reader = BzDecoder::new(&file); - let parser = Parser::new(reader); - - info!("reading via prism"); - let mut formatter = BgpdumpFormatter::new(); - let mut lines1: Vec = parser.into_iter().map(|record|{ - formatter.to_elems(&record) - }).flat_map(|x|x).collect::>(); - - lines1.sort(); - - for prism_line in lines1 { - let bgpdump_line = iter2.next().unwrap().clone(); - if prism_line != bgpdump_line { - // dbg!(prism_line, bgpdump_line); - } - assert_eq!(prism_line, bgpdump_line) - } - } - - #[test] - fn test_full_file_comparison_ribs_old() { - use log::info; - env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); - - info!("reading via bgpdump"); - let file2 = File::open("examples/table-dump-rib.20011026.1648.bgpdump.txt").unwrap(); - let mut lines2 = BufReader::new(file2).lines() - .filter_map(|x|x.ok()) - 
.collect::>(); - lines2.sort(); - let mut iter2 = lines2.iter(); - - let file = File::open("examples/table-dump-rib.20011026.1648.bz2").unwrap(); - let reader = BzDecoder::new(&file); - let parser = Parser::new(reader); - - info!("reading via prism"); - let mut formatter = BgpdumpFormatter::new(); - let mut lines1: Vec = parser.into_iter().map(|record|{ - formatter.to_elems(&record) - }).flat_map(|x|x).collect::>(); - - lines1.sort(); - - for prism_line in lines1 { - let bgpdump_line = iter2.next().unwrap().clone(); - if prism_line != bgpdump_line { - // dbg!(prism_line, bgpdump_line); - } - assert_eq!(prism_line, bgpdump_line) - } - } -} diff --git a/src/formats/mod.rs b/src/formats/mod.rs deleted file mode 100644 index 139597f..0000000 --- a/src/formats/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ - - diff --git a/src/io.rs b/src/io.rs new file mode 100644 index 0000000..486f28c --- /dev/null +++ b/src/io.rs @@ -0,0 +1,64 @@ +use std::fs::File; +use std::io::{BufReader, Cursor, Read}; +use bzip2::bufread::BzDecoder; +use flate2::bufread::GzDecoder; + +pub(crate) fn get_reader(path: &str) -> Box { + let file_type = path.split(".").collect::>().last().unwrap().clone(); + assert!(file_type == "gz" || file_type== "bz2"); + + let bytes = Cursor::new( + match path.starts_with("http") { + true => { + reqwest::blocking::get(path) + .unwrap().bytes().unwrap().to_vec() + } + false => { + let mut bytes: Vec = vec![]; + let f = File::open(path).unwrap(); + let mut reader = BufReader::new(f); + + // Read file into vector. 
+ reader.read_to_end(&mut bytes).unwrap(); + bytes + } + } + ); + match file_type { + "gz" => { + let reader = Box::new(GzDecoder::new(bytes)); + Box::new(BufReader::new(reader)) + } + "bz2" => { + let reader = Box::new(BzDecoder::new(bytes)); + Box::new(BufReader::new(reader)) + } + t => { + panic!("unknown file type: {}", t) + } + } +} + +#[cfg(test)] +mod tests { + use crate::BgpkitParser; + + #[test] + fn test_open_any() { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + let url = "http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2"; + let parser = BgpkitParser::new(url); + + log::info!("parsing updates file"); + // iterating through the parser. the iterator returns `BgpElem` one at a time. + for elem in parser { + // each BGP announcement contains one AS path, which depending on the path segment's type + // there could be multiple origin ASNs (e.g. AS-Set as the origin) + if let Some(origins) = &elem.origin_asns { + if origins.contains(&13335) { + log::info!("{}", &elem); + } + } + } + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 9601fe8..20b27e6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,159 @@ +/*! +BGPKIT Parser aims to provide the most ergonomic MRT/BGP message parsing Rust API. + +Features: +- **performant**: comparable to C-based implementations like `bgpdump` or `bgpreader`. +- **actively maintained**: we consistently introduce feature updates and bug fixes, and support most of the relevant BGP RFCs. +- **ergonomic API**: a three-line for loop can already get you started. +- **battery-included**: ready to handle remote or local, bzip2 or gz data files out of the box + +## Examples + +### Parsing single MRT file + +Let's say we want to print out all the BGP announcements/withdrawals from a single MRT file, either located remotely or locally. +Here is an example that does so. 
+ +```rust +use bgpkit_parser::BgpkitParser; +fn main() { + let parser = BgpkitParser::new("http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2"); + for elem in parser { + println!("{}", elem) + } +} +``` + +Yes, it is this simple! + +You can even do some more interesting iterator operations that are even shorter. +For example, counting the number of announcements/withdrawals in that file: +```rust +use bgpkit_parser::BgpkitParser; +fn main() { + let url = "http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2"; + let count = BgpkitParser::new(url).into_iter().count(); + println!("total: {}", count); +} +``` + +and it prints out +```text +total: 255849 +``` + +### Parsing multiple MRT files with BGPKIT Broker + +[BGPKIT Broker][broker-repo] library provides search API for all RouteViews and RIPE RIS MRT data files. Using the +broker's Rust API ([`bgpkit-broker`][broker-crates-io]), we can easily compile a list of MRT files that we are interested +in for any time period and any data type (`update` or `rib`). This allows users to gather information without needing to +know about locations of specific data files. 
+ +[broker-repo]: https://github.com/bgpkit/bgpkit-broker +[broker-crates-io]: https://crates.io/crates/bgpkit-broker + +The example below shows a relatively more interesting example that does the following: +- find all BGP archive data created on time 1634693400 +- filter to only BGP updates files +- find all announcements originated from AS13335 +- print out the total count of the announcements + +```rust +use bgpkit_parser::{BgpkitParser, BgpElem}; +fn main(){ + // set broker query parameters + let mut params = bgpkit_broker::QueryParams::new(); + params = params.start_ts(1634693400); + params = params.end_ts(1634693400); + params = params.data_type("update"); + let mut broker = bgpkit_broker::BgpkitBroker::new("https://api.broker.bgpkit.com/v1"); + broker.set_params(&params); + + // loop through data files found by broker + for item in broker { + + // create a parser that takes a URL and automatically determines + // the file location and file type, and handles data download and + // decompression streaming intelligently + let parser = BgpkitParser::new(item.url.as_str()); + + // iterating through the parser. the iterator returns `BgpElem` one at a time. + let elems = parser.into_elem_iter().map(|elem|{ + if let Some(origins) = &elem.origin_asns { + if origins.contains(&13335) { + Some(elem) + } else { + None + } + } else { + None + } + }).filter_map(|x|x).collect::<Vec<BgpElem>>(); + log::info!("{} elems matches", elems.len()); + } +} +``` + +## Data Representation + +There are two key data structures to understand for the parsing results: [MrtRecord][bgp_models::mrt::MrtRecord] and [BgpElem]. + +### `MrtRecord`: unmodified MRT information representation + +The MrtRecord is the data structure that holds the unmodified, complete information parsed +from the MRT data file. The code definition of the `MrtRecord` is defined in the crate `bgp-models` ([documentation][mrt-record-doc]). 
+ +```ignore +pub struct MrtRecord { + pub common_header: CommonHeader, + pub message: MrtMessage, +} + +pub enum MrtMessage { + TableDumpMessage(TableDumpMessage), + TableDumpV2Message(TableDumpV2Message), + Bgp4Mp(Bgp4Mp), +} +``` + +MrtRecord record representation is concise, storage efficient, but often less convenient to use. For example, when +trying to find out specific BGP announcements for a certain IP prefix, we often need to go through nested layers of +internal data structure (NLRI, announced, prefix, or even looking up peer index table for Table Dump V2 format), which +could be irrelevant to what users really want to do. + +### [BgpElem]: per-prefix BGP information, MRT-format-agnostic + +To facilitate simpler data analysis of BGP data, we defined a new data structure called [BgpElem] in this crate. Each +[BgpElem] contains a piece of self-contained BGP information about one single IP prefix. +For example, when a bundled announcement of three prefixes P1, P2, P3 that share the same AS path is processed, we break +the single record into three different [BgpElem] objects, each representing a prefix. + +```ignore +pub struct BgpElem { + pub timestamp: f64, + pub elem_type: ElemType, + pub peer_ip: IpAddr, + pub peer_asn: Asn, + pub prefix: NetworkPrefix, + pub next_hop: Option, + pub as_path: Option, + pub origin_asns: Option>, + pub origin: Option, + pub local_pref: Option, + pub med: Option, + pub communities: Option>, + pub atomic: Option, + pub aggr_asn: Option, + pub aggr_ip: Option, +} +``` + +The main benefit of using [BgpElem] is that the analysis can be executed on a per-prefix basis, regardless of the +backend MRT data format (bgp4mp, tabledumpv1, tabledumpv2, etc.). The obvious drawback is that we will have to duplicate +information to save at each elem, thus consuming more memory. 
+ +[mrt-record-doc]: https://docs.rs/bgp-models/0.3.4/bgp_models/mrt/struct.MrtRecord.html +*/ extern crate byteorder; extern crate chrono; extern crate ipnetwork; @@ -5,7 +161,8 @@ extern crate num_traits; pub mod error; pub mod parser; -pub mod formats; + +mod io; pub use parser::BgpkitParser; pub use parser::BgpElem; diff --git a/src/parser/iters.rs b/src/parser/iters.rs index 6b1670b..fe770da 100644 --- a/src/parser/iters.rs +++ b/src/parser/iters.rs @@ -1,25 +1,24 @@ use log::{error, warn}; use bgp_models::mrt::MrtRecord; -use std::io::Read; use crate::{BgpElem, Elementor}; use crate::error::ParserError; use crate::parser::BgpkitParser; /// Use [BgpElemIterator] as the default iterator to return [BgpElem]s instead of [MrtRecord]s. -impl IntoIterator for BgpkitParser { +impl IntoIterator for BgpkitParser { type Item = BgpElem; - type IntoIter = ElemIterator; + type IntoIter = ElemIterator; fn into_iter(self) -> Self::IntoIter { ElemIterator::new(self) } } -impl BgpkitParser { - pub fn into_record_iter(self) -> RecordIterator { +impl BgpkitParser { + pub fn into_record_iter(self) -> RecordIterator { RecordIterator::new(self) } - pub fn into_elem_iter(self) -> ElemIterator { + pub fn into_elem_iter(self) -> ElemIterator { ElemIterator::new(self) } } @@ -28,18 +27,20 @@ impl BgpkitParser { MrtRecord Iterator **********/ -pub struct RecordIterator { - parser: BgpkitParser, +pub struct RecordIterator { + parser: BgpkitParser, count: u64, } -impl RecordIterator { - fn new(parser: BgpkitParser) -> RecordIterator { - RecordIterator { parser , count: 0 } +impl RecordIterator { + fn new(parser: BgpkitParser) -> RecordIterator { + RecordIterator { + parser , count: 0, + } } } -impl Iterator for RecordIterator { +impl Iterator for RecordIterator { type Item = MrtRecord; fn next(&mut self) -> Option { @@ -78,20 +79,20 @@ impl Iterator for RecordIterator { BgpElem Iterator **********/ -pub struct ElemIterator { +pub struct ElemIterator { cache_elems: Vec, - record_iter: 
RecordIterator, + record_iter: RecordIterator, elementor: Elementor, count: u64, } -impl ElemIterator { - fn new(parser: BgpkitParser) -> ElemIterator { +impl ElemIterator { + fn new(parser: BgpkitParser) -> ElemIterator { ElemIterator { record_iter: RecordIterator::new(parser) , count: 0 , cache_elems: vec![], elementor: Elementor::new()} } } -impl Iterator for ElemIterator { +impl Iterator for ElemIterator { type Item = BgpElem; fn next(&mut self) -> Option { diff --git a/src/parser/mod.rs b/src/parser/mod.rs index a85d744..7298eff 100644 --- a/src/parser/mod.rs +++ b/src/parser/mod.rs @@ -13,22 +13,24 @@ pub(crate) use mrt::{parse_bgp4mp, parse_table_dump_message, parse_table_dump_v2 pub use crate::error::ParserError; pub use mrt::mrt_elem::{BgpElem, Elementor, ElemType}; +use crate::io::get_reader; -pub struct BgpkitParser { - input: T, +pub struct BgpkitParser { + reader: Box } -impl BgpkitParser { - pub fn new(input: T) -> BgpkitParser { - BgpkitParser { - // input: Some(BufReader::new(input)), - input: input, +impl BgpkitParser { + /// Creating a new parser from a object that implements [Read] trait. + pub fn new(path: &str) -> BgpkitParser{ + BgpkitParser{ + reader: get_reader(path) } } /// This is used in for loop `for item in parser{}` pub fn next(&mut self) -> Result { - parse_mrt_record(&mut self.input) + parse_mrt_record(&mut self.reader) } } + diff --git a/src/parser/mrt/mrt_elem.rs b/src/parser/mrt/mrt_elem.rs index 2751248..c7ac8d1 100644 --- a/src/parser/mrt/mrt_elem.rs +++ b/src/parser/mrt/mrt_elem.rs @@ -486,7 +486,7 @@ impl Display for BgpElem { ElemType::WITHDRAW => "W", }; let format = format!( - "{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|", + "|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|", t, &self.timestamp, &self.peer_ip, &self.peer_asn,