Skip to content

Commit

Permalink
Adds a deserialization target that retains the ValidatedMessage metad…
Browse files Browse the repository at this point in the history
…ata (#45)

Currently the consume method on the MessageStream drops all of the ValidatedMessage metadata, this PR adds a wrapper type that enables clients to opt into deserializing into a wrapper type that includes the ValidatedMessage metadata fields alongside the target decoded message
  • Loading branch information
blogle authored Nov 7, 2023
1 parent dc22fa4 commit 7a3204f
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 86 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/hedwig.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
strategy:
fail-fast: false
matrix:
rust_toolchain: [nightly, stable, 1.53.0]
rust_toolchain: [nightly, stable, 1.60.0]
os: [ubuntu-latest]
timeout-minutes: 20
steps:
Expand Down
35 changes: 31 additions & 4 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
//!
//! See the [`Consumer`] trait.
use crate::ValidatedMessage;
use crate::message::ValidatedMessage;
use async_trait::async_trait;
use bytes::Bytes;
use either::Either;
use futures_util::stream;
use pin_project::pin_project;
Expand Down Expand Up @@ -53,7 +54,7 @@ pub trait Consumer {
type Error;
/// The stream returned by [`stream`]
type Stream: stream::Stream<
Item = Result<AcknowledgeableMessage<Self::AckToken, ValidatedMessage>, Self::Error>,
Item = Result<AcknowledgeableMessage<Self::AckToken, ValidatedMessage<Bytes>>, Self::Error>,
>;

/// Begin pulling messages from the backing message service.
Expand Down Expand Up @@ -86,11 +87,37 @@ pub trait DecodableMessage {
type Decoder;

/// Decode the given message, using the given decoder, into its structured type
fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result<Self, Self::Error>
fn decode(msg: ValidatedMessage<Bytes>, decoder: &Self::Decoder) -> Result<Self, Self::Error>
where
Self: Sized;
}

impl<M> DecodableMessage for ValidatedMessage<M>
where
M: DecodableMessage,
{
/// The error returned when a message fails to decode
type Error = M::Error;

/// The decoder used to decode a validated message
type Decoder = M::Decoder;

/// Decode the given message, using the given decoder, into its structured type
fn decode(msg: ValidatedMessage<Bytes>, decoder: &Self::Decoder) -> Result<Self, Self::Error>
where
Self: Sized,
{
let message = M::decode(msg.clone(), decoder)?;
Ok(Self {
id: msg.id,
timestamp: msg.timestamp,
schema: msg.schema,
headers: msg.headers,
data: message,
})
}
}

/// A received message which can be acknowledged to prevent re-delivery by the backing message
/// service.
///
Expand Down Expand Up @@ -186,7 +213,7 @@ pub struct MessageStream<S, D, M> {
impl<S, D, M, AckToken, StreamError> stream::Stream for MessageStream<S, D, M>
where
S: stream::Stream<
Item = Result<AcknowledgeableMessage<AckToken, ValidatedMessage>, StreamError>,
Item = Result<AcknowledgeableMessage<AckToken, ValidatedMessage<Bytes>>, StreamError>,
>,
M: DecodableMessage<Decoder = D>,
{
Expand Down
83 changes: 3 additions & 80 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@
//! ```
#![cfg_attr(docsrs, feature(doc_cfg))]

use std::{borrow::Cow, collections::BTreeMap, time::SystemTime};
pub use topic::Topic;

use bytes::Bytes;
use uuid::Uuid;
use std::collections::BTreeMap;

mod backends;
mod consumer;
pub mod message;
mod publisher;
mod tests;
mod topic;
Expand Down Expand Up @@ -117,81 +117,4 @@ pub enum Error {
pub type Headers = BTreeMap<String, String>;

/// A validated message.
///
/// These are created by validators after encoding a user message, or when pulling messages from
/// the message service.
#[derive(Debug, Clone)]
// derive Eq only in tests so that users can't foot-shoot an expensive == over data
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct ValidatedMessage {
/// Unique message identifier.
id: Uuid,
/// The timestamp when message was created in the publishing service.
timestamp: SystemTime,
/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
schema: Cow<'static, str>,
/// Custom message headers.
///
/// This may be used to track request_id, for example.
headers: Headers,
/// The encoded message data.
data: Bytes,
}

impl ValidatedMessage {
/// Create a new validated message
pub fn new<S, D>(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self
where
S: Into<Cow<'static, str>>,
D: Into<Bytes>,
{
Self {
id,
timestamp,
schema: schema.into(),
headers,
data: data.into(),
}
}

/// Unique message identifier.
pub fn uuid(&self) -> &Uuid {
&self.id
}

/// The timestamp when message was created in the publishing service.
pub fn timestamp(&self) -> &SystemTime {
&self.timestamp
}

/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
pub fn schema(&self) -> &str {
&self.schema
}

/// Custom message headers.
///
/// This may be used to track request_id, for example.
pub fn headers(&self) -> &Headers {
&self.headers
}

/// Mutable access to the message headers
pub fn headers_mut(&mut self) -> &mut Headers {
&mut self.headers
}

/// The encoded message data.
pub fn data(&self) -> &[u8] {
&self.data
}

/// Destructure this message into just the contained data
pub fn into_data(self) -> Bytes {
self.data
}
}
pub type ValidatedMessage = message::ValidatedMessage<Bytes>;
87 changes: 87 additions & 0 deletions src/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use bytes::Bytes;
use std::{borrow::Cow, time::SystemTime};
use uuid::Uuid;

use crate::Headers;

/// A validated message.
///
/// These are created by validators after encoding a user message, or when pulling messages from
/// the message service.
#[derive(Debug, Clone)]
// derive Eq only in tests so that users can't foot-shoot an expensive == over data
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct ValidatedMessage<M> {
/// Unique message identifier.
pub(crate) id: Uuid,
/// The timestamp when message was created in the publishing service.
pub(crate) timestamp: SystemTime,
/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
pub(crate) schema: Cow<'static, str>,
/// Custom message headers.
///
/// This may be used to track request_id, for example.
pub(crate) headers: Headers,
/// The message data.
pub(crate) data: M,
}

impl ValidatedMessage<Bytes> {
/// Create a new validated message
pub fn new<S, D>(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self
where
S: Into<Cow<'static, str>>,
D: Into<Bytes>,
{
Self {
id,
timestamp,
schema: schema.into(),
headers,
data: data.into(),
}
}
}

impl<M> ValidatedMessage<M> {
/// Unique message identifier.
pub fn uuid(&self) -> &Uuid {
&self.id
}

/// The timestamp when message was created in the publishing service.
pub fn timestamp(&self) -> &SystemTime {
&self.timestamp
}

/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
pub fn schema(&self) -> &str {
&self.schema
}

/// Custom message headers.
///
/// This may be used to track request_id, for example.
pub fn headers(&self) -> &Headers {
&self.headers
}

/// Mutable access to the message headers
pub fn headers_mut(&mut self) -> &mut Headers {
&mut self.headers
}

/// The message data.
pub fn data(&self) -> &M {
&self.data
}

/// Destructure this message into just the contained data
pub fn into_data(self) -> M {
self.data
}
}
23 changes: 22 additions & 1 deletion src/tests/google.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig, PubSubError, PublishError,
StreamSubscriptionConfig, SubscriptionConfig, SubscriptionName, TopicConfig, TopicName,
},
message,
validators::{
prost::{ExactSchemaMatcher, SchemaMismatchError},
ProstDecodeError, ProstDecoder, ProstValidator, ProstValidatorError,
Expand Down Expand Up @@ -42,7 +43,7 @@ impl EncodableMessage for TestMessage {
uuid::Uuid::nil(),
std::time::SystemTime::UNIX_EPOCH,
SCHEMA,
Headers::default(),
Headers::from([(String::from("key"), String::from("value"))]),
self,
)
}
Expand All @@ -57,6 +58,26 @@ impl DecodableMessage for TestMessage {
}
}

#[test]
fn decode_with_headers() -> Result<(), BoxError> {
let orig_message = TestMessage {
payload: "foobar".into(),
};

let encoded = orig_message.encode(&ProstValidator::new())?;

let decoded = message::ValidatedMessage::<TestMessage>::decode(
encoded,
&ProstDecoder::new(ExactSchemaMatcher::new(SCHEMA)),
)?;

let headers = Headers::from([(String::from("key"), String::from("value"))]);

assert_eq!(decoded.headers(), &headers);

Ok(())
}

#[tokio::test]
#[ignore = "pubsub emulator is finicky, run this test manually"]
async fn roundtrip_protobuf() -> Result<(), BoxError> {
Expand Down

0 comments on commit 7a3204f

Please sign in to comment.