diff --git a/.github/workflows/hedwig.yml b/.github/workflows/hedwig.yml index 0bbb7c0..16f59b7 100644 --- a/.github/workflows/hedwig.yml +++ b/.github/workflows/hedwig.yml @@ -55,7 +55,7 @@ jobs: strategy: fail-fast: false matrix: - rust_toolchain: [nightly, stable, 1.53.0] + rust_toolchain: [nightly, stable, 1.60.0] os: [ubuntu-latest] timeout-minutes: 20 steps: diff --git a/src/consumer.rs b/src/consumer.rs index 575946d..74b15a5 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -2,8 +2,9 @@ //! //! See the [`Consumer`] trait. -use crate::ValidatedMessage; +use crate::message::ValidatedMessage; use async_trait::async_trait; +use bytes::Bytes; use either::Either; use futures_util::stream; use pin_project::pin_project; @@ -53,7 +54,7 @@ pub trait Consumer { type Error; /// The stream returned by [`stream`] type Stream: stream::Stream< - Item = Result, Self::Error>, + Item = Result>, Self::Error>, >; /// Begin pulling messages from the backing message service. @@ -86,11 +87,37 @@ pub trait DecodableMessage { type Decoder; /// Decode the given message, using the given decoder, into its structured type - fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result + fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result where Self: Sized; } +impl DecodableMessage for ValidatedMessage +where + M: DecodableMessage, +{ + /// The error returned when a message fails to decode + type Error = M::Error; + + /// The decoder used to decode a validated message + type Decoder = M::Decoder; + + /// Decode the given message, using the given decoder, into its structured type + fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result + where + Self: Sized, + { + let message = M::decode(msg.clone(), decoder)?; + Ok(Self { + id: msg.id, + timestamp: msg.timestamp, + schema: msg.schema, + headers: msg.headers, + data: message, + }) + } +} + /// A received message which can be acknowledged to prevent re-delivery by the backing message /// service. /// @@ -186,7 +213,7 @@ pub struct MessageStream { impl stream::Stream for MessageStream where S: stream::Stream< - Item = Result, StreamError>, + Item = Result>, StreamError>, >, M: DecodableMessage, { diff --git a/src/lib.rs b/src/lib.rs index f5bd990..0aa5a93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,14 +75,14 @@ //! ``` #![cfg_attr(docsrs, feature(doc_cfg))] -use std::{borrow::Cow, collections::BTreeMap, time::SystemTime}; pub use topic::Topic; use bytes::Bytes; -use uuid::Uuid; +use std::collections::BTreeMap; mod backends; mod consumer; +pub mod message; mod publisher; mod tests; mod topic; @@ -117,81 +117,4 @@ pub enum Error { pub type Headers = BTreeMap; /// A validated message. -/// -/// These are created by validators after encoding a user message, or when pulling messages from -/// the message service. -#[derive(Debug, Clone)] -// derive Eq only in tests so that users can't foot-shoot an expensive == over data -#[cfg_attr(test, derive(PartialEq, Eq))] -pub struct ValidatedMessage { - /// Unique message identifier. - id: Uuid, - /// The timestamp when message was created in the publishing service. - timestamp: SystemTime, - /// URI of the schema validating this message. - /// - /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` - schema: Cow<'static, str>, - /// Custom message headers. - /// - /// This may be used to track request_id, for example. - headers: Headers, - /// The encoded message data. - data: Bytes, -} - -impl ValidatedMessage { - /// Create a new validated message - pub fn new(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self - where - S: Into>, - D: Into, - { - Self { - id, - timestamp, - schema: schema.into(), - headers, - data: data.into(), - } - } - - /// Unique message identifier. - pub fn uuid(&self) -> &Uuid { - &self.id - } - - /// The timestamp when message was created in the publishing service. - pub fn timestamp(&self) -> &SystemTime { - &self.timestamp - } - - /// URI of the schema validating this message. - /// - /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` - pub fn schema(&self) -> &str { - &self.schema - } - - /// Custom message headers. - /// - /// This may be used to track request_id, for example. - pub fn headers(&self) -> &Headers { - &self.headers - } - - /// Mutable access to the message headers - pub fn headers_mut(&mut self) -> &mut Headers { - &mut self.headers - } - - /// The encoded message data. - pub fn data(&self) -> &[u8] { - &self.data - } - - /// Destructure this message into just the contained data - pub fn into_data(self) -> Bytes { - self.data - } -} +pub type ValidatedMessage = message::ValidatedMessage; diff --git a/src/message.rs b/src/message.rs new file mode 100644 index 0000000..332a874 --- /dev/null +++ b/src/message.rs @@ -0,0 +1,87 @@ +use bytes::Bytes; +use std::{borrow::Cow, time::SystemTime}; +use uuid::Uuid; + +use crate::Headers; + +/// A validated message. +/// +/// These are created by validators after encoding a user message, or when pulling messages from +/// the message service. +#[derive(Debug, Clone)] +// derive Eq only in tests so that users can't foot-shoot an expensive == over data +#[cfg_attr(test, derive(PartialEq, Eq))] +pub struct ValidatedMessage { + /// Unique message identifier. + pub(crate) id: Uuid, + /// The timestamp when message was created in the publishing service. + pub(crate) timestamp: SystemTime, + /// URI of the schema validating this message. + /// + /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` + pub(crate) schema: Cow<'static, str>, + /// Custom message headers. + /// + /// This may be used to track request_id, for example. + pub(crate) headers: Headers, + /// The message data. + pub(crate) data: M, +} + +impl ValidatedMessage { + /// Create a new validated message + pub fn new(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self + where + S: Into>, + D: Into, + { + Self { + id, + timestamp, + schema: schema.into(), + headers, + data: data.into(), + } + } +} + +impl ValidatedMessage { + /// Unique message identifier. + pub fn uuid(&self) -> &Uuid { + &self.id + } + + /// The timestamp when message was created in the publishing service. + pub fn timestamp(&self) -> &SystemTime { + &self.timestamp + } + + /// URI of the schema validating this message. + /// + /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` + pub fn schema(&self) -> &str { + &self.schema + } + + /// Custom message headers. + /// + /// This may be used to track request_id, for example. + pub fn headers(&self) -> &Headers { + &self.headers + } + + /// Mutable access to the message headers + pub fn headers_mut(&mut self) -> &mut Headers { + &mut self.headers + } + + /// The message data. + pub fn data(&self) -> &M { + &self.data + } + + /// Destructure this message into just the contained data + pub fn into_data(self) -> M { + self.data + } +} diff --git a/src/tests/google.rs b/src/tests/google.rs index 444ff84..e00c8f3 100644 --- a/src/tests/google.rs +++ b/src/tests/google.rs @@ -7,6 +7,7 @@ use crate::{ AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig, PubSubError, PublishError, StreamSubscriptionConfig, SubscriptionConfig, SubscriptionName, TopicConfig, TopicName, }, + message, validators::{ prost::{ExactSchemaMatcher, SchemaMismatchError}, ProstDecodeError, ProstDecoder, ProstValidator, ProstValidatorError, @@ -42,7 +43,7 @@ impl EncodableMessage for TestMessage { uuid::Uuid::nil(), std::time::SystemTime::UNIX_EPOCH, SCHEMA, - Headers::default(), + Headers::from([(String::from("key"), String::from("value"))]), self, ) } @@ -57,6 +58,26 @@ impl DecodableMessage for TestMessage { } } +#[test] +fn decode_with_headers() -> Result<(), BoxError> { + let orig_message = TestMessage { + payload: "foobar".into(), + }; + + let encoded = orig_message.encode(&ProstValidator::new())?; + + let decoded = message::ValidatedMessage::::decode( + encoded, + &ProstDecoder::new(ExactSchemaMatcher::new(SCHEMA)), + )?; + + let headers = Headers::from([(String::from("key"), String::from("value"))]); + + assert_eq!(decoded.headers(), &headers); + + Ok(()) +} + #[tokio::test] #[ignore = "pubsub emulator is finicky, run this test manually"] async fn roundtrip_protobuf() -> Result<(), BoxError> {