diff --git a/src/consumer.rs b/src/consumer.rs index caafbaf..74b15a5 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -2,18 +2,16 @@ //! //! See the [`Consumer`] trait. -use crate::{Headers, ValidatedMessage}; +use crate::message::ValidatedMessage; use async_trait::async_trait; +use bytes::Bytes; use either::Either; use futures_util::stream; use pin_project::pin_project; use std::{ - borrow::Cow, pin::Pin, task::{Context, Poll}, - time::SystemTime, }; -use uuid::Uuid; /// Message consumers ingest messages from a queue service and present them to the user application /// as a [`Stream`](futures_util::stream::Stream). @@ -56,7 +54,7 @@ pub trait Consumer { type Error; /// The stream returned by [`stream`] type Stream: stream::Stream< - Item = Result, Self::Error>, + Item = Result>, Self::Error>, >; /// Begin pulling messages from the backing message service. @@ -89,72 +87,12 @@ pub trait DecodableMessage { type Decoder; /// Decode the given message, using the given decoder, into its structured type - fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result + fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result where Self: Sized; } -#[derive(Debug, Clone)] -pub struct DecodedMessage { - /// Unique message identifier. - id: Uuid, - /// The timestamp when message was created in the publishing service. - timestamp: SystemTime, - /// URI of the schema validating this message. - /// - /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` - schema: Cow<'static, str>, - /// Custom message headers. - /// - /// This may be used to track request_id, for example. - headers: Headers, - - /// The contents of the decoded message. - message: M, -} - -impl DecodedMessage { - /// Unique message identifier. - pub fn uuid(&self) -> &Uuid { - &self.id - } - - /// The timestamp when message was created in the publishing service. - pub fn timestamp(&self) -> &SystemTime { - &self.timestamp - } - - /// URI of the schema validating this message. - /// - /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` - pub fn schema(&self) -> &str { - &self.schema - } - - /// Custom message headers. - /// - /// This may be used to track request_id, for example. - pub fn headers(&self) -> &Headers { - &self.headers - } - - /// Mutable access to the message headers - pub fn headers_mut(&mut self) -> &mut Headers { - &mut self.headers - } - - /// The encoded message data. - pub fn message(&self) -> &M { - &self.message - } - - /// Destructure this message into just the contained data - pub fn into_message(self) -> M { - self.message - } -} - -impl DecodableMessage for DecodedMessage +impl DecodableMessage for ValidatedMessage where M: DecodableMessage, { @@ -165,17 +103,17 @@ where type Decoder = M::Decoder; /// Decode the given message, using the given decoder, into its structured type - fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result + fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result where Self: Sized, { let message = M::decode(msg.clone(), decoder)?; - Ok(DecodedMessage { + Ok(Self { id: msg.id, timestamp: msg.timestamp, schema: msg.schema, headers: msg.headers, - message: message, + data: message, }) } } @@ -275,7 +213,7 @@ pub struct MessageStream { impl stream::Stream for MessageStream where S: stream::Stream< - Item = Result, StreamError>, + Item = Result>, StreamError>, >, M: DecodableMessage, { diff --git a/src/lib.rs b/src/lib.rs index f5bd990..0aa5a93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,14 +75,14 @@ //! ``` #![cfg_attr(docsrs, feature(doc_cfg))] -use std::{borrow::Cow, collections::BTreeMap, time::SystemTime}; pub use topic::Topic; use bytes::Bytes; -use uuid::Uuid; +use std::collections::BTreeMap; mod backends; mod consumer; +pub mod message; mod publisher; mod tests; mod topic; @@ -117,81 +117,4 @@ pub enum Error { pub type Headers = BTreeMap; /// A validated message. -/// -/// These are created by validators after encoding a user message, or when pulling messages from -/// the message service. -#[derive(Debug, Clone)] -// derive Eq only in tests so that users can't foot-shoot an expensive == over data -#[cfg_attr(test, derive(PartialEq, Eq))] -pub struct ValidatedMessage { - /// Unique message identifier. - id: Uuid, - /// The timestamp when message was created in the publishing service. - timestamp: SystemTime, - /// URI of the schema validating this message. - /// - /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` - schema: Cow<'static, str>, - /// Custom message headers. - /// - /// This may be used to track request_id, for example. - headers: Headers, - /// The encoded message data. - data: Bytes, -} - -impl ValidatedMessage { - /// Create a new validated message - pub fn new(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self - where - S: Into>, - D: Into, - { - Self { - id, - timestamp, - schema: schema.into(), - headers, - data: data.into(), - } - } - - /// Unique message identifier. - pub fn uuid(&self) -> &Uuid { - &self.id - } - - /// The timestamp when message was created in the publishing service. - pub fn timestamp(&self) -> &SystemTime { - &self.timestamp - } - - /// URI of the schema validating this message. - /// - /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` - pub fn schema(&self) -> &str { - &self.schema - } - - /// Custom message headers. - /// - /// This may be used to track request_id, for example. - pub fn headers(&self) -> &Headers { - &self.headers - } - - /// Mutable access to the message headers - pub fn headers_mut(&mut self) -> &mut Headers { - &mut self.headers - } - - /// The encoded message data. - pub fn data(&self) -> &[u8] { - &self.data - } - - /// Destructure this message into just the contained data - pub fn into_data(self) -> Bytes { - self.data - } -} +pub type ValidatedMessage = message::ValidatedMessage; diff --git a/src/message.rs b/src/message.rs new file mode 100644 index 0000000..aaf6e2e --- /dev/null +++ b/src/message.rs @@ -0,0 +1,87 @@ +use bytes::Bytes; +use std::{borrow::Cow, time::SystemTime}; +use uuid::Uuid; + +use crate::Headers; + +/// A validated message. +/// +/// These are created by validators after encoding a user message, or when pulling messages from +/// the message service. +#[derive(Debug, Clone)] +// derive Eq only in tests so that users can't foot-shoot an expensive == over data +#[cfg_attr(test, derive(PartialEq, Eq))] +pub struct ValidatedMessage { + /// Unique message identifier. + pub(crate) id: Uuid, + /// The timestamp when message was created in the publishing service. + pub(crate) timestamp: SystemTime, + /// URI of the schema validating this message. + /// + /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` + pub(crate) schema: Cow<'static, str>, + /// Custom message headers. + /// + /// This may be used to track request_id, for example. + pub(crate) headers: Headers, + /// The message data. + pub(crate) data: M, +} + +impl ValidatedMessage { + /// Create a new validated message + pub fn new(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self + where + S: Into>, + D: Into, + { + Self { + id, + timestamp, + schema: schema.into(), + headers, + data: data.into(), + } + } +} + +impl ValidatedMessage { + /// Unique message identifier. + pub fn uuid(&self) -> &Uuid { + &self.id + } + + /// The timestamp when message was created in the publishing service. + pub fn timestamp(&self) -> &SystemTime { + &self.timestamp + } + + /// URI of the schema validating this message. + /// + /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` + pub fn schema(&self) -> &str { + &self.schema + } + + /// Custom message headers. + /// + /// This may be used to track request_id, for example. + pub fn headers(&self) -> &Headers { + &self.headers + } + + /// Mutable access to the message headers + pub fn headers_mut(&mut self) -> &mut Headers { + &mut self.headers + } + + /// The encoded message data. + pub fn data(&self) -> &M { + &self.data + } + + /// Destructure this message into just the contained data + pub fn into_data(self) -> M { + self.data + } +} diff --git a/src/tests/google.rs b/src/tests/google.rs index 9def63d..e00c8f3 100644 --- a/src/tests/google.rs +++ b/src/tests/google.rs @@ -7,6 +7,7 @@ use crate::{ AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig, PubSubError, PublishError, StreamSubscriptionConfig, SubscriptionConfig, SubscriptionName, TopicConfig, TopicName, }, + message, validators::{ prost::{ExactSchemaMatcher, SchemaMismatchError}, ProstDecodeError, ProstDecoder, ProstValidator, ProstValidatorError, @@ -65,7 +66,7 @@ fn decode_with_headers() -> Result<(), BoxError> { let encoded = orig_message.encode(&ProstValidator::new())?; - let decoded = crate::DecodedMessage::::decode( + let decoded = message::ValidatedMessage::::decode( encoded, &ProstDecoder::new(ExactSchemaMatcher::new(SCHEMA)), )?;