diff --git a/Cargo.lock b/Cargo.lock index f2e156a21..08fc134e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -593,6 +593,7 @@ dependencies = [ "once_cell", "parquet", "prost 0.13.3", + "rabbitmq-stream-client", "rand 0.8.5", "rdkafka 0.36.2", "rdkafka-sys", @@ -2517,6 +2518,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + [[package]] name = "copy-artifacts" version = "0.13.0-dev" @@ -3519,6 +3526,19 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "derive_more" +version = "0.99.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f33878137e4dafd7fa914ad4e259e18a4e8e532b9617a2d0150262bf53abfce" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn 2.0.87", +] + [[package]] name = "digest" version = "0.10.7" @@ -5946,6 +5966,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "murmur3" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" + [[package]] name = "nanoid" version = "0.4.0" @@ -6389,6 +6415,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "4.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c65ee1f9701bf938026630b455d5315f490640234259037edb259798b3bcf85e" +dependencies = [ + "num-traits", +] + [[package]] name = "os_pipe" version = "1.2.1" @@ -7476,6 +7511,44 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rabbitmq-stream-client" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45321d9731c5121e3d769f05e3d06fe03649b04b649b8054a77dcf149277da3d" +dependencies = [ + "async-trait", + "bytes", + "dashmap 6.1.0", + "futures", + "murmur3", + "pin-project", + "rabbitmq-stream-protocol", + "rand 0.8.5", + "rustls-pemfile 1.0.4", + "thiserror 2.0.3", + "tokio", + "tokio-rustls 0.24.1", + "tokio-stream", + "tokio-util", + "tracing", + "url", +] + +[[package]] +name = "rabbitmq-stream-protocol" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca4acb05c1d51f5836f7358281890ac36d19092dc2095d71f4f577dbbfabcf13" +dependencies = [ + "byteorder", + "chrono", + "derive_more", + "num_enum 0.7.3", + "ordered-float 4.5.0", + "uuid", +] + [[package]] name = "radium" version = "0.7.0" diff --git a/crates/arroyo-connectors/Cargo.toml b/crates/arroyo-connectors/Cargo.toml index 4c9082651..2763ce002 100644 --- a/crates/arroyo-connectors/Cargo.toml +++ b/crates/arroyo-connectors/Cargo.toml @@ -65,6 +65,9 @@ tokio-tungstenite = { version = "0.24", features = ["native-tls"] } # Webhook reqwest = { workspace = true, features = ["stream"] } +# RabbitMQ Stream +rabbitmq-stream-client = "0.7" + # Redis redis = { version = "0.27", features = ["default", "tokio-rustls-comp", "cluster-async", "connection-manager"] } diff --git a/crates/arroyo-connectors/src/lib.rs b/crates/arroyo-connectors/src/lib.rs index 6765a06d3..ad42df538 100644 --- a/crates/arroyo-connectors/src/lib.rs +++ b/crates/arroyo-connectors/src/lib.rs @@ -22,6 +22,7 @@ use fluvio::FluvioConnector; use impulse::ImpulseConnector; use nats::NatsConnector; use nexmark::NexmarkConnector; +use rabbitmq_stream::RabbitmqStreamConnector; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use reqwest::Client; use serde::{Deserialize, Serialize}; @@ -46,6 +47,7 @@ pub mod nats; pub mod nexmark; pub mod polling_http; pub mod preview; +pub mod rabbitmq_stream; pub mod redis; pub mod single_file; pub mod sse; @@ -68,6 +70,7 @@ pub fn connectors() -> HashMap<&'static str, Box> { Box::new(NexmarkConnector {}), Box::new(PollingHTTPConnector {}), Box::new(PreviewConnector {}), + Box::new(RabbitmqStreamConnector {}), Box::new(RedisConnector {}), Box::new(SingleFileConnector {}), Box::new(SSEConnector {}), diff --git a/crates/arroyo-connectors/src/rabbitmq_stream/mod.rs b/crates/arroyo-connectors/src/rabbitmq_stream/mod.rs new file mode 100644 index 000000000..05d3ae334 --- /dev/null +++ b/crates/arroyo-connectors/src/rabbitmq_stream/mod.rs @@ -0,0 +1,276 @@ +use anyhow::{anyhow, bail}; +use arroyo_operator::connector::{Connection, Connector}; +use arroyo_operator::operator::OperatorNode; +use arroyo_rpc::{api_types::connections::TestSourceMessage, OperatorConfig}; +use rabbitmq_stream_client::types::OffsetSpecification; +use rabbitmq_stream_client::{Environment, TlsConfiguration}; +use serde::{Deserialize, Serialize}; +use typify::import_types; + +use crate::rabbitmq_stream::source::RabbitmqStreamSourceFunc; +use crate::{pull_opt, ConnectionType}; + +mod source; + +pub struct RabbitmqStreamConnector {} + +const CONFIG_SCHEMA: &str = include_str!("./profile.json"); +const TABLE_SCHEMA: &str = include_str!("./table.json"); +const ICON: &str = include_str!("./rabbitmq_stream.svg"); + +import_types!(schema = "src/rabbitmq_stream/profile.json"); +import_types!(schema = "src/rabbitmq_stream/table.json"); + +impl Connector for RabbitmqStreamConnector { + type ProfileT = RabbitmqStreamConfig; + type TableT = RabbitmqStreamTable; + + fn name(&self) -> &'static str { + "rabbitmq_stream" + } + + fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector { + arroyo_rpc::api_types::connections::Connector { + id: self.name().to_string(), + name: "RabbitMQ Stream".to_string(), + icon: ICON.to_string(), + description: "RabbitMQ stream source".to_string(), + enabled: true, + source: true, + sink: false, + testing: false, + hidden: false, + custom_schemas: true, + connection_config: Some(CONFIG_SCHEMA.to_string()), + table_config: TABLE_SCHEMA.to_string(), + } + } + + fn table_type( + &self, + _: Self::ProfileT, + _: Self::TableT, + ) -> arroyo_rpc::api_types::connections::ConnectionType { + ConnectionType::Source + } + + fn test( + &self, + _: &str, + _: Self::ProfileT, + _: Self::TableT, + _: Option<&arroyo_rpc::api_types::connections::ConnectionSchema>, + tx: tokio::sync::mpsc::Sender, + ) { + // TODO + tokio::task::spawn(async move { + let message = TestSourceMessage { + error: false, + done: true, + message: "Successfully validated connection".to_string(), + }; + tx.send(message).await.unwrap(); + }); + } + + fn from_options( + &self, + name: &str, + options: &mut std::collections::HashMap, + schema: Option<&arroyo_rpc::api_types::connections::ConnectionSchema>, + profile: Option<&arroyo_rpc::api_types::connections::ConnectionProfile>, + ) -> anyhow::Result { + let connection_config = match profile { + Some(connection_profile) => { + serde_json::from_value(connection_profile.config.clone()) + .map_err(|e| anyhow!("Failed to parse connection config: {:?}", e))? + } + None => { + let host = options.remove("host"); + let username = options.remove("username"); + let password = options.remove("password"); + let virtual_host = options.remove("virtual_host"); + let port = match options.remove("port") { + Some(v) => Some(v.parse::()?), + None => None, + }; + + let tls_config = options + .remove("tls_config.enabled") + .map(|enabled| TlsConfig { + enabled: Some(enabled == "true"), + trust_certificates: options + .remove("tls_config.trust_certificates") + .map(|trust_certificates| trust_certificates == "true"), + root_certificates_path: options.remove("tls_config.root_certificates_path"), + client_certificates_path: options + .remove("tls_config.client_certificates_path"), + client_keys_path: options.remove("tls_config.client_keys_path"), + }); + + let load_balancer_mode = options.remove("json.unstructured").map(|t| t == "true"); + + RabbitmqStreamConfig { + host, + username, + password, + virtual_host, + port, + load_balancer_mode, + tls_config, + } + } + }; + + let stream = pull_opt("stream", options)?; + let table_type = pull_opt("type", options)?; + + let table_type = match table_type.as_str() { + "source" => { + let offset = options.remove("source.offset"); + TableType::Source { + offset: match offset.as_deref() { + Some("first") => SourceOffset::First, + Some("next") => SourceOffset::Next, + None | Some("last") => SourceOffset::Last, + Some(other) => bail!("invalid value for source.offset '{}'", other), + }, + } + } + _ => { + bail!("type must 'source'"); + } + }; + + let table = RabbitmqStreamTable { + stream, + type_: table_type, + }; + + Self::from_config(self, None, name, connection_config, table, schema) + } + + fn from_config( + &self, + id: Option, + name: &str, + config: Self::ProfileT, + table: Self::TableT, + schema: Option<&arroyo_rpc::api_types::connections::ConnectionSchema>, + ) -> anyhow::Result { + let (typ, desc) = match table.type_ { + TableType::Source { .. } => ( + ConnectionType::Source, + format!("RabbitmqStreamSource<{}>", table.stream), + ), + TableType::Sink { .. } => ( + ConnectionType::Sink, + format!("RabbitmqStreamSink<{}>", table.stream), + ), + }; + + let schema = schema + .map(|s| s.to_owned()) + .ok_or_else(|| anyhow!("no schema defined for RabbitMQ Stream connection"))?; + + let format = schema + .format + .as_ref() + .map(|t| t.to_owned()) + .ok_or_else(|| anyhow!("'format' must be set for RabbitMQ Stream connection"))?; + + let config = OperatorConfig { + connection: serde_json::to_value(config).unwrap(), + table: serde_json::to_value(table).unwrap(), + rate_limit: None, + format: Some(format), + bad_data: schema.bad_data.clone(), + framing: schema.framing.clone(), + metadata_fields: schema.metadata_fields(), + }; + + Ok(Connection { + id, + connector: self.name(), + name: name.to_string(), + connection_type: typ, + schema, + config: serde_json::to_string(&config).unwrap(), + description: desc, + }) + } + + fn make_operator( + &self, + profile: Self::ProfileT, + table: Self::TableT, + config: arroyo_rpc::OperatorConfig, + ) -> anyhow::Result { + match table.type_ { + TableType::Source { offset } => Ok(OperatorNode::from_source(Box::new( + RabbitmqStreamSourceFunc { + config: profile, + stream: table.stream, + offset_mode: offset, + format: config + .format + .ok_or_else(|| anyhow!("format required for rabbitmq stream source"))?, + framing: config.framing, + bad_data: config.bad_data, + }, + ))), + TableType::Sink { .. } => { + todo!() + } + } + } +} + +impl RabbitmqStreamConfig { + async fn get_environment(&mut self) -> anyhow::Result { + let builder = Environment::builder() + .host(&self.host.clone().unwrap_or("localhost".to_owned())) + .username(&self.username.clone().unwrap_or("guest".to_owned())) + .password(&self.password.clone().unwrap_or("guest".to_owned())) + .virtual_host(&self.virtual_host.clone().unwrap_or("/".to_owned())) + .port(self.port.unwrap_or(5552)); + + let builder = if let Some(tls_config) = self.tls_config.clone() { + builder.tls(tls_config.into()) + } else { + builder + }; + let builder = if let Some(load_balancer_mode) = self.load_balancer_mode { + builder.load_balancer_mode(load_balancer_mode) + } else { + builder + }; + + let environment = builder.build().await?; + Ok(environment) + } +} + +impl SourceOffset { + pub fn offset(&self) -> OffsetSpecification { + match self { + SourceOffset::First => OffsetSpecification::First, + SourceOffset::Last => OffsetSpecification::Last, + SourceOffset::Next => OffsetSpecification::Next, + } + } +} + +impl From for TlsConfiguration { + fn from(val: TlsConfig) -> Self { + let mut config = TlsConfiguration::default(); + config.enable(val.enabled.unwrap_or(false)); + config.trust_certificates(val.trust_certificates.unwrap_or(false)); + config.add_root_certificates_path(val.root_certificates_path.unwrap_or(String::from(""))); + config.add_client_certificates_keys( + val.client_certificates_path.unwrap_or(String::from("")), + val.client_keys_path.clone().unwrap_or(String::from("")), + ); + config + } +} diff --git a/crates/arroyo-connectors/src/rabbitmq_stream/profile.json b/crates/arroyo-connectors/src/rabbitmq_stream/profile.json new file mode 100644 index 000000000..e4231e1a0 --- /dev/null +++ b/crates/arroyo-connectors/src/rabbitmq_stream/profile.json @@ -0,0 +1,66 @@ +{ + "type": "object", + "title": "RabbitmqStreamConfig", + "properties": { + "host": { + "title": "Host", + "type": "string", + "description": "The RabbitMQ stream host to connect to; leave blank to use default (localhost)" + }, + "username": { + "title": "Username", + "type": "string", + "description": "The RabbitMQ username to connect to; leave blank to use default (guest)" + }, + "password": { + "title": "Password", + "type": "string", + "description": "The RabbitMQ password to connect to; leave blank to use default (guest)" + }, + "virtual_host": { + "title": "Virtual Host", + "type": "string", + "description": "The RabbitMQ virtual host to connect to; leave blank to use default (/)" + }, + "port": { + "title": "Port", + "type": "integer", + "minimum": 0, + "maximum": 65535, + "description": "The RabbitMQ port to connect to; leave blank to use default (5552)" + }, + "tls_config": { + "title": "TLS config", + "type": "object", + "properties": { + "enabled": { + "type": "boolean", + "description": "Enable TLS" + }, + "trust_certificates": { + "type": "boolean", + "description": "Trust certificate" + }, + "root_certificates_path": { + "type": "string", + "description": "Root certificates path" + }, + "client_certificates_path": { + "type": "string", + "description": "Client certificates path" + }, + "client_keys_path": { + "type": "string", + "description": "Client keys path" + } + }, + "additionalProperties": false + }, + "load_balancer_mode": { + "title": "Load balancer mode", + "type": "boolean", + "description": "Enable load balancer mode" + } + }, + "sensitive": ["password"] +} diff --git a/crates/arroyo-connectors/src/rabbitmq_stream/rabbitmq_stream.svg b/crates/arroyo-connectors/src/rabbitmq_stream/rabbitmq_stream.svg new file mode 100644 index 000000000..f64ddc71f --- /dev/null +++ b/crates/arroyo-connectors/src/rabbitmq_stream/rabbitmq_stream.svg @@ -0,0 +1 @@ + diff --git a/crates/arroyo-connectors/src/rabbitmq_stream/source.rs b/crates/arroyo-connectors/src/rabbitmq_stream/source.rs new file mode 100644 index 000000000..6664211c7 --- /dev/null +++ b/crates/arroyo-connectors/src/rabbitmq_stream/source.rs @@ -0,0 +1,178 @@ +use std::collections::HashMap; +use std::time::{Duration, SystemTime}; + +use arroyo_operator::{context::ArrowContext, operator::SourceOperator, SourceFinishType}; +use arroyo_rpc::formats::{BadData, Format, Framing}; +use arroyo_rpc::grpc::rpc::TableConfig; +use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage}; +use arroyo_state::tables::global_keyed_map::GlobalKeyedView; +use arroyo_types::UserError; +use async_trait::async_trait; +use bincode::{Decode, Encode}; +use rabbitmq_stream_client::types::OffsetSpecification; +use rabbitmq_stream_client::Consumer; +use tokio::{select, time::MissedTickBehavior}; +use tokio_stream::StreamExt; +use tracing::{debug, error, info}; + +use super::{RabbitmqStreamConfig, SourceOffset}; + +pub struct RabbitmqStreamSourceFunc { + pub config: RabbitmqStreamConfig, + pub stream: String, + pub offset_mode: SourceOffset, + pub format: Format, + pub framing: Option, + pub bad_data: Option, +} + +#[derive(Copy, Clone, Debug, Encode, Decode, PartialEq, PartialOrd)] +pub struct RabbitmqStreamState { + offset: u64, +} + +#[async_trait] +impl SourceOperator for RabbitmqStreamSourceFunc { + fn name(&self) -> String { + format!("rabbitmq-stream-{}", self.stream) + } + + fn tables(&self) -> HashMap { + arroyo_state::global_table_config("s", "rabbitmq stream source state") + } + + async fn on_start(&mut self, ctx: &mut ArrowContext) { + ctx.initialize_deserializer( + self.format.clone(), + self.framing.clone(), + self.bad_data.clone(), + ); + } + + async fn run(&mut self, ctx: &mut ArrowContext) -> SourceFinishType { + match self.run_int(ctx).await { + Ok(r) => r, + Err(e) => { + ctx.report_error(e.name.clone(), e.details.clone()).await; + + panic!("{}: {}", e.name, e.details); + } + } + } +} + +impl RabbitmqStreamSourceFunc { + async fn get_consumer(&mut self, ctx: &mut ArrowContext) -> anyhow::Result { + info!( + "Creating rabbitmq stream consumer for {}", + self.config.host.clone().unwrap_or("localhost".to_string()) + ); + let environment = self.config.get_environment().await?; + + let s: &mut GlobalKeyedView = ctx + .table_manager + .get_global_keyed_state("s") + .await + .expect("should be able to get rabbitmq stream state"); + let state: HashMap = s.get_all().clone(); + + let offset = state + .get(&self.stream) + .map(|s| OffsetSpecification::Offset(s.offset)) + .unwrap_or_else(|| self.offset_mode.offset()); + + let consumer = environment + .consumer() + .offset(offset) + .client_provided_name(&ctx.task_info.operator_name) + .build(&self.stream) + .await?; + + Ok(consumer) + } + + async fn run_int(&mut self, ctx: &mut ArrowContext) -> Result { + let mut consumer = self.get_consumer(ctx).await.map_err(|e| { + UserError::new( + "Could not create RabbitMQ Stream consumer", + format!("{:?}", e), + ) + })?; + + let mut flush_ticker = tokio::time::interval(Duration::from_millis(50)); + flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + + let mut offset: u64 = 0; + + loop { + select! { + delivery = consumer.next() => { + match delivery { + Some(Ok(delivery)) => { + let message = delivery.message(); + + if let Some(data) = message.data() { + let timestamp = SystemTime::now(); + ctx.deserialize_slice(data, timestamp, None).await?; + } + + if ctx.should_flush() { + ctx.flush_buffer().await?; + } + + offset = delivery.offset(); + }, + Some(Err(e)) => { + error!("encountered error {:?} while reading stream {}", e, self.stream); + }, + None => { + panic!("Stream closed"); + } + } + + }, + _ = flush_ticker.tick() => { + if ctx.should_flush() { + ctx.flush_buffer().await?; + } + }, + control_message = ctx.control_rx.recv() => { + info!("control_message {:?}", control_message); + match control_message { + Some(ControlMessage::Checkpoint(c)) => { + debug!("starting checkpointing {}", ctx.task_info.task_index); + + let s = ctx.table_manager.get_global_keyed_state("s") + .await + .expect("should be able to get rabbitmq stream state"); + s.insert(self.stream.clone(), offset).await; + + if self.start_checkpoint(c, ctx).await { + return Ok(SourceFinishType::Immediate); + } + }, + Some(ControlMessage::Stop { mode }) => { + info!("Stopping RabbitMQ Stream source: {:?}", mode); + match mode { + StopMode::Graceful => { + return Ok(SourceFinishType::Graceful); + } + StopMode::Immediate => { + return Ok(SourceFinishType::Immediate); + } + } + } + Some(ControlMessage::Commit{..}) => { + return Err(UserError::new("RabbitMQ Stream source does not support committing", "")); + } + Some(ControlMessage::LoadCompacted {compacted}) => { + ctx.load_compacted(compacted).await; + } + Some(ControlMessage::NoOp ) => {} + None => {} + } + } + } + } + } +} diff --git a/crates/arroyo-connectors/src/rabbitmq_stream/table.json b/crates/arroyo-connectors/src/rabbitmq_stream/table.json new file mode 100644 index 000000000..725c15ac5 --- /dev/null +++ b/crates/arroyo-connectors/src/rabbitmq_stream/table.json @@ -0,0 +1,37 @@ +{ + "type": "object", + "title": "RabbitmqStreamTable", + "properties": { + "stream": { + "title": "Stream", + "type": "string", + "description": "The RabbitMQ stream to use for this table" + }, + "type": { + "type": "object", + "title": "Table Type", + "oneOf": [ + { + "type": "object", + "title": "Source", + "properties": { + "offset": { + "type": "string", + "description": "The offset to start reading from", + "enum": ["first", "last", "next"] + } + }, + "required": ["offset"], + "additionalProperties": false + }, + { + "type": "object", + "title": "Sink", + "properties": {}, + "additionalProperties": false + } + ] + } + }, + "required": ["stream", "type"] +}