From 5b444763ecfd0f8561ee5dcd4eb4bb9f78d9e6e1 Mon Sep 17 00:00:00 2001 From: Brandon Ogle Date: Mon, 6 Nov 2023 13:39:42 -0800 Subject: [PATCH 1/3] Adds a deserialization target that retains the ValidatedMessage metadata --- src/consumer.rs | 91 ++++++++++++++++++++++++++++++++++++++++++++- src/tests/google.rs | 22 ++++++++++- 2 files changed, 111 insertions(+), 2 deletions(-) diff --git a/src/consumer.rs b/src/consumer.rs index 575946d..caafbaf 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -2,15 +2,18 @@ //! //! See the [`Consumer`] trait. -use crate::ValidatedMessage; +use crate::{Headers, ValidatedMessage}; use async_trait::async_trait; use either::Either; use futures_util::stream; use pin_project::pin_project; use std::{ + borrow::Cow, pin::Pin, task::{Context, Poll}, + time::SystemTime, }; +use uuid::Uuid; /// Message consumers ingest messages from a queue service and present them to the user application /// as a [`Stream`](futures_util::stream::Stream). @@ -91,6 +94,92 @@ pub trait DecodableMessage { Self: Sized; } +#[derive(Debug, Clone)] +pub struct DecodedMessage { + /// Unique message identifier. + id: Uuid, + /// The timestamp when message was created in the publishing service. + timestamp: SystemTime, + /// URI of the schema validating this message. + /// + /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` + schema: Cow<'static, str>, + /// Custom message headers. + /// + /// This may be used to track request_id, for example. + headers: Headers, + + /// The contents of the decoded message. + message: M, +} + +impl DecodedMessage { + /// Unique message identifier. + pub fn uuid(&self) -> &Uuid { + &self.id + } + + /// The timestamp when message was created in the publishing service. + pub fn timestamp(&self) -> &SystemTime { + &self.timestamp + } + + /// URI of the schema validating this message. + /// + /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` + pub fn schema(&self) -> &str { + &self.schema + } + + /// Custom message headers. + /// + /// This may be used to track request_id, for example. + pub fn headers(&self) -> &Headers { + &self.headers + } + + /// Mutable access to the message headers + pub fn headers_mut(&mut self) -> &mut Headers { + &mut self.headers + } + + /// The encoded message data. + pub fn message(&self) -> &M { + &self.message + } + + /// Destructure this message into just the contained data + pub fn into_message(self) -> M { + self.message + } +} + +impl DecodableMessage for DecodedMessage +where + M: DecodableMessage, +{ + /// The error returned when a message fails to decode + type Error = M::Error; + + /// The decoder used to decode a validated message + type Decoder = M::Decoder; + + /// Decode the given message, using the given decoder, into its structured type + fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result + where + Self: Sized, + { + let message = M::decode(msg.clone(), decoder)?; + Ok(DecodedMessage { + id: msg.id, + timestamp: msg.timestamp, + schema: msg.schema, + headers: msg.headers, + message: message, + }) + } +} + /// A received message which can be acknowledged to prevent re-delivery by the backing message /// service. /// diff --git a/src/tests/google.rs b/src/tests/google.rs index 444ff84..9def63d 100644 --- a/src/tests/google.rs +++ b/src/tests/google.rs @@ -42,7 +42,7 @@ impl EncodableMessage for TestMessage { uuid::Uuid::nil(), std::time::SystemTime::UNIX_EPOCH, SCHEMA, - Headers::default(), + Headers::from([(String::from("key"), String::from("value"))]), self, ) } @@ -57,6 +57,26 @@ impl DecodableMessage for TestMessage { } } +#[test] +fn decode_with_headers() -> Result<(), BoxError> { + let orig_message = TestMessage { + payload: "foobar".into(), + }; + + let encoded = orig_message.encode(&ProstValidator::new())?; + + let decoded = crate::DecodedMessage::::decode( + encoded, + &ProstDecoder::new(ExactSchemaMatcher::new(SCHEMA)), + )?; + + let headers = Headers::from([(String::from("key"), String::from("value"))]); + + assert_eq!(decoded.headers(), &headers); + + Ok(()) +} + #[tokio::test] #[ignore = "pubsub emulator is finicky, run this test manually"] async fn roundtrip_protobuf() -> Result<(), BoxError> { From c35cfce5ed46160bed573336de06adef61988f64 Mon Sep 17 00:00:00 2001 From: Brandon Ogle Date: Mon, 6 Nov 2023 15:51:03 -0800 Subject: [PATCH 2/3] Recycle the ValidatedMessage type to allow for decoding messages while retaining metadata --- src/consumer.rs | 80 +++++------------------------------------ src/lib.rs | 83 ++---------------------------------------- src/message.rs | 87 +++++++++++++++++++++++++++++++++++++++++++++ src/tests/google.rs | 3 +- 4 files changed, 101 insertions(+), 152 deletions(-) create mode 100644 src/message.rs diff --git a/src/consumer.rs b/src/consumer.rs index caafbaf..74b15a5 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -2,18 +2,16 @@ //! //! See the [`Consumer`] trait. -use crate::{Headers, ValidatedMessage}; +use crate::message::ValidatedMessage; use async_trait::async_trait; +use bytes::Bytes; use either::Either; use futures_util::stream; use pin_project::pin_project; use std::{ - borrow::Cow, pin::Pin, task::{Context, Poll}, - time::SystemTime, }; -use uuid::Uuid; /// Message consumers ingest messages from a queue service and present them to the user application /// as a [`Stream`](futures_util::stream::Stream). @@ -56,7 +54,7 @@ pub trait Consumer { type Error; /// The stream returned by [`stream`] type Stream: stream::Stream< - Item = Result, Self::Error>, + Item = Result>, Self::Error>, >; /// Begin pulling messages from the backing message service. @@ -89,72 +87,12 @@ pub trait DecodableMessage { type Decoder; /// Decode the given message, using the given decoder, into its structured type - fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result + fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result where Self: Sized; } -#[derive(Debug, Clone)] -pub struct DecodedMessage { - /// Unique message identifier. - id: Uuid, - /// The timestamp when message was created in the publishing service. - timestamp: SystemTime, - /// URI of the schema validating this message. - /// - /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` - schema: Cow<'static, str>, - /// Custom message headers. - /// - /// This may be used to track request_id, for example. - headers: Headers, - - /// The contents of the decoded message. - message: M, -} - -impl DecodedMessage { - /// Unique message identifier. - pub fn uuid(&self) -> &Uuid { - &self.id - } - - /// The timestamp when message was created in the publishing service. - pub fn timestamp(&self) -> &SystemTime { - &self.timestamp - } - - /// URI of the schema validating this message. - /// - /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` - pub fn schema(&self) -> &str { - &self.schema - } - - /// Custom message headers. - /// - /// This may be used to track request_id, for example. - pub fn headers(&self) -> &Headers { - &self.headers - } - - /// Mutable access to the message headers - pub fn headers_mut(&mut self) -> &mut Headers { - &mut self.headers - } - - /// The encoded message data. - pub fn message(&self) -> &M { - &self.message - } - - /// Destructure this message into just the contained data - pub fn into_message(self) -> M { - self.message - } -} - -impl DecodableMessage for DecodedMessage +impl DecodableMessage for ValidatedMessage where M: DecodableMessage, { @@ -165,17 +103,17 @@ where type Decoder = M::Decoder; /// Decode the given message, using the given decoder, into its structured type - fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result + fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result where Self: Sized, { let message = M::decode(msg.clone(), decoder)?; - Ok(DecodedMessage { + Ok(Self { id: msg.id, timestamp: msg.timestamp, schema: msg.schema, headers: msg.headers, - message: message, + data: message, }) } } @@ -275,7 +213,7 @@ pub struct MessageStream { impl stream::Stream for MessageStream where S: stream::Stream< - Item = Result, StreamError>, + Item = Result>, StreamError>, >, M: DecodableMessage, { diff --git a/src/lib.rs b/src/lib.rs index f5bd990..0aa5a93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,14 +75,14 @@ //! ``` #![cfg_attr(docsrs, feature(doc_cfg))] -use std::{borrow::Cow, collections::BTreeMap, time::SystemTime}; pub use topic::Topic; use bytes::Bytes; -use uuid::Uuid; +use std::collections::BTreeMap; mod backends; mod consumer; +pub mod message; mod publisher; mod tests; mod topic; @@ -117,81 +117,4 @@ pub enum Error { pub type Headers = BTreeMap; /// A validated message. -/// -/// These are created by validators after encoding a user message, or when pulling messages from -/// the message service. -#[derive(Debug, Clone)] -// derive Eq only in tests so that users can't foot-shoot an expensive == over data -#[cfg_attr(test, derive(PartialEq, Eq))] -pub struct ValidatedMessage { - /// Unique message identifier. - id: Uuid, - /// The timestamp when message was created in the publishing service. - timestamp: SystemTime, - /// URI of the schema validating this message. - /// - /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` - schema: Cow<'static, str>, - /// Custom message headers. - /// - /// This may be used to track request_id, for example. - headers: Headers, - /// The encoded message data. - data: Bytes, -} - -impl ValidatedMessage { - /// Create a new validated message - pub fn new(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self - where - S: Into>, - D: Into, - { - Self { - id, - timestamp, - schema: schema.into(), - headers, - data: data.into(), - } - } - - /// Unique message identifier. - pub fn uuid(&self) -> &Uuid { - &self.id - } - - /// The timestamp when message was created in the publishing service. - pub fn timestamp(&self) -> &SystemTime { - &self.timestamp - } - - /// URI of the schema validating this message. - /// - /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` - pub fn schema(&self) -> &str { - &self.schema - } - - /// Custom message headers. - /// - /// This may be used to track request_id, for example. - pub fn headers(&self) -> &Headers { - &self.headers - } - - /// Mutable access to the message headers - pub fn headers_mut(&mut self) -> &mut Headers { - &mut self.headers - } - - /// The encoded message data. - pub fn data(&self) -> &[u8] { - &self.data - } - - /// Destructure this message into just the contained data - pub fn into_data(self) -> Bytes { - self.data - } -} +pub type ValidatedMessage = message::ValidatedMessage; diff --git a/src/message.rs b/src/message.rs new file mode 100644 index 0000000..332a874 --- /dev/null +++ b/src/message.rs @@ -0,0 +1,87 @@ +use bytes::Bytes; +use std::{borrow::Cow, time::SystemTime}; +use uuid::Uuid; + +use crate::Headers; + +/// A validated message. +/// +/// These are created by validators after encoding a user message, or when pulling messages from +/// the message service. +#[derive(Debug, Clone)] +// derive Eq only in tests so that users can't foot-shoot an expensive == over data +#[cfg_attr(test, derive(PartialEq, Eq))] +pub struct ValidatedMessage { + /// Unique message identifier. + pub(crate) id: Uuid, + /// The timestamp when message was created in the publishing service. + pub(crate) timestamp: SystemTime, + /// URI of the schema validating this message. + /// + /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` + pub(crate) schema: Cow<'static, str>, + /// Custom message headers. + /// + /// This may be used to track request_id, for example. + pub(crate) headers: Headers, + /// The message data. + pub(crate) data: M, +} + +impl ValidatedMessage { + /// Create a new validated message + pub fn new(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self + where + S: Into>, + D: Into, + { + Self { + id, + timestamp, + schema: schema.into(), + headers, + data: data.into(), + } + } +} + +impl ValidatedMessage { + /// Unique message identifier. + pub fn uuid(&self) -> &Uuid { + &self.id + } + + /// The timestamp when message was created in the publishing service. + pub fn timestamp(&self) -> &SystemTime { + &self.timestamp + } + + /// URI of the schema validating this message. + /// + /// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0` + pub fn schema(&self) -> &str { + &self.schema + } + + /// Custom message headers. + /// + /// This may be used to track request_id, for example. + pub fn headers(&self) -> &Headers { + &self.headers + } + + /// Mutable access to the message headers + pub fn headers_mut(&mut self) -> &mut Headers { + &mut self.headers + } + + /// The message data. + pub fn data(&self) -> &M { + &self.data + } + + /// Destructure this message into just the contained data + pub fn into_data(self) -> M { + self.data + } +} diff --git a/src/tests/google.rs b/src/tests/google.rs index 9def63d..e00c8f3 100644 --- a/src/tests/google.rs +++ b/src/tests/google.rs @@ -7,6 +7,7 @@ use crate::{ AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig, PubSubError, PublishError, StreamSubscriptionConfig, SubscriptionConfig, SubscriptionName, TopicConfig, TopicName, }, + message, validators::{ prost::{ExactSchemaMatcher, SchemaMismatchError}, ProstDecodeError, ProstDecoder, ProstValidator, ProstValidatorError, @@ -65,7 +66,7 @@ fn decode_with_headers() -> Result<(), BoxError> { let encoded = orig_message.encode(&ProstValidator::new())?; - let decoded = crate::DecodedMessage::::decode( + let decoded = message::ValidatedMessage::::decode( encoded, &ProstDecoder::new(ExactSchemaMatcher::new(SCHEMA)), )?; From 8660c4d95fcaeedb58104e4014a49dc8fae805d8 Mon Sep 17 00:00:00 2001 From: Brandon Ogle Date: Mon, 6 Nov 2023 15:59:43 -0800 Subject: [PATCH 3/3] Bump minimum rust version in test matrix to 1.60 --- .github/workflows/hedwig.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/hedwig.yml b/.github/workflows/hedwig.yml index 0bbb7c0..16f59b7 100644 --- a/.github/workflows/hedwig.yml +++ b/.github/workflows/hedwig.yml @@ -55,7 +55,7 @@ jobs: strategy: fail-fast: false matrix: - rust_toolchain: [nightly, stable, 1.53.0] + rust_toolchain: [nightly, stable, 1.60.0] os: [ubuntu-latest] timeout-minutes: 20 steps: