Skip to content

Commit

Permalink
Recycle the ValidatedMessage type to allow for decoding messages whil…
Browse files Browse the repository at this point in the history
…e retaining metadata
  • Loading branch information
Brandon Ogle committed Nov 6, 2023
1 parent 5b44476 commit cd9bf6b
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 152 deletions.
80 changes: 9 additions & 71 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@
//!
//! See the [`Consumer`] trait.
use crate::{Headers, ValidatedMessage};
use crate::message::ValidatedMessage;
use async_trait::async_trait;
use bytes::Bytes;
use either::Either;
use futures_util::stream;
use pin_project::pin_project;
use std::{
borrow::Cow,
pin::Pin,
task::{Context, Poll},
time::SystemTime,
};
use uuid::Uuid;

/// Message consumers ingest messages from a queue service and present them to the user application
/// as a [`Stream`](futures_util::stream::Stream).
Expand Down Expand Up @@ -56,7 +54,7 @@ pub trait Consumer {
type Error;
/// The stream returned by [`stream`]
type Stream: stream::Stream<
Item = Result<AcknowledgeableMessage<Self::AckToken, ValidatedMessage>, Self::Error>,
Item = Result<AcknowledgeableMessage<Self::AckToken, ValidatedMessage<Bytes>>, Self::Error>,
>;

/// Begin pulling messages from the backing message service.
Expand Down Expand Up @@ -89,72 +87,12 @@ pub trait DecodableMessage {
type Decoder;

/// Decode the given message, using the given decoder, into its structured type
fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result<Self, Self::Error>
fn decode(msg: ValidatedMessage<Bytes>, decoder: &Self::Decoder) -> Result<Self, Self::Error>
where
Self: Sized;
}

#[derive(Debug, Clone)]
pub struct DecodedMessage<M> {
/// Unique message identifier.
id: Uuid,
/// The timestamp when message was created in the publishing service.
timestamp: SystemTime,
/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
schema: Cow<'static, str>,
/// Custom message headers.
///
/// This may be used to track request_id, for example.
headers: Headers,

/// The contents of the decoded message.
message: M,
}

impl<M> DecodedMessage<M> {
/// Unique message identifier.
pub fn uuid(&self) -> &Uuid {
&self.id
}

/// The timestamp when message was created in the publishing service.
pub fn timestamp(&self) -> &SystemTime {
&self.timestamp
}

/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
pub fn schema(&self) -> &str {
&self.schema
}

/// Custom message headers.
///
/// This may be used to track request_id, for example.
pub fn headers(&self) -> &Headers {
&self.headers
}

/// Mutable access to the message headers
pub fn headers_mut(&mut self) -> &mut Headers {
&mut self.headers
}

/// The encoded message data.
pub fn message(&self) -> &M {
&self.message
}

/// Destructure this message into just the contained data
pub fn into_message(self) -> M {
self.message
}
}

impl<M> DecodableMessage for DecodedMessage<M>
impl<M> DecodableMessage for ValidatedMessage<M>
where
M: DecodableMessage,
{
Expand All @@ -165,17 +103,17 @@ where
type Decoder = M::Decoder;

/// Decode the given message, using the given decoder, into its structured type
fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result<Self, Self::Error>
fn decode(msg: ValidatedMessage<Bytes>, decoder: &Self::Decoder) -> Result<Self, Self::Error>
where
Self: Sized,
{
let message = M::decode(msg.clone(), decoder)?;
Ok(DecodedMessage {
Ok(Self {
id: msg.id,
timestamp: msg.timestamp,
schema: msg.schema,
headers: msg.headers,
message: message,
data: message,
})
}
}
Expand Down Expand Up @@ -275,7 +213,7 @@ pub struct MessageStream<S, D, M> {
impl<S, D, M, AckToken, StreamError> stream::Stream for MessageStream<S, D, M>
where
S: stream::Stream<
Item = Result<AcknowledgeableMessage<AckToken, ValidatedMessage>, StreamError>,
Item = Result<AcknowledgeableMessage<AckToken, ValidatedMessage<Bytes>>, StreamError>,
>,
M: DecodableMessage<Decoder = D>,
{
Expand Down
83 changes: 3 additions & 80 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@
//! ```
#![cfg_attr(docsrs, feature(doc_cfg))]

use std::{borrow::Cow, collections::BTreeMap, time::SystemTime};
pub use topic::Topic;

use bytes::Bytes;
use uuid::Uuid;
use std::collections::BTreeMap;

mod backends;
mod consumer;
pub mod message;
mod publisher;
mod tests;
mod topic;
Expand Down Expand Up @@ -117,81 +117,4 @@ pub enum Error {
pub type Headers = BTreeMap<String, String>;

/// A validated message.
///
/// These are created by validators after encoding a user message, or when pulling messages from
/// the message service.
#[derive(Debug, Clone)]
// derive Eq only in tests so that users can't foot-shoot an expensive == over data
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct ValidatedMessage {
/// Unique message identifier.
id: Uuid,
/// The timestamp when message was created in the publishing service.
timestamp: SystemTime,
/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
schema: Cow<'static, str>,
/// Custom message headers.
///
/// This may be used to track request_id, for example.
headers: Headers,
/// The encoded message data.
data: Bytes,
}

impl ValidatedMessage {
/// Create a new validated message
pub fn new<S, D>(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self
where
S: Into<Cow<'static, str>>,
D: Into<Bytes>,
{
Self {
id,
timestamp,
schema: schema.into(),
headers,
data: data.into(),
}
}

/// Unique message identifier.
pub fn uuid(&self) -> &Uuid {
&self.id
}

/// The timestamp when message was created in the publishing service.
pub fn timestamp(&self) -> &SystemTime {
&self.timestamp
}

/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
pub fn schema(&self) -> &str {
&self.schema
}

/// Custom message headers.
///
/// This may be used to track request_id, for example.
pub fn headers(&self) -> &Headers {
&self.headers
}

/// Mutable access to the message headers
pub fn headers_mut(&mut self) -> &mut Headers {
&mut self.headers
}

/// The encoded message data.
pub fn data(&self) -> &[u8] {
&self.data
}

/// Destructure this message into just the contained data
pub fn into_data(self) -> Bytes {
self.data
}
}
pub type ValidatedMessage = message::ValidatedMessage<Bytes>;
87 changes: 87 additions & 0 deletions src/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use bytes::Bytes;
use std::{borrow::Cow, time::SystemTime};
use uuid::Uuid;

use crate::Headers;

/// A validated message.
///
/// These are created by validators after encoding a user message, or when pulling messages from
/// the message service.
#[derive(Debug, Clone)]
// derive Eq only in tests so that users can't foot-shoot an expensive == over data
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct ValidatedMessage<M> {
/// Unique message identifier.
pub(crate) id: Uuid,
/// The timestamp when message was created in the publishing service.
pub(crate) timestamp: SystemTime,
/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
pub(crate) schema: Cow<'static, str>,
/// Custom message headers.
///
/// This may be used to track request_id, for example.
pub(crate) headers: Headers,
/// The message data.
pub(crate) data: M,
}

impl ValidatedMessage<Bytes> {
/// Create a new validated message
pub fn new<S, D>(id: Uuid, timestamp: SystemTime, schema: S, headers: Headers, data: D) -> Self
where
S: Into<Cow<'static, str>>,
D: Into<Bytes>,
{
Self {
id,
timestamp,
schema: schema.into(),
headers,
data: data.into(),
}
}
}

impl<M> ValidatedMessage<M> {
/// Unique message identifier.
pub fn uuid(&self) -> &Uuid {
&self.id
}

/// The timestamp when message was created in the publishing service.
pub fn timestamp(&self) -> &SystemTime {
&self.timestamp
}

/// URI of the schema validating this message.
///
/// E.g. `https://hedwig.domain.xyz/schemas#/schemas/user.created/1.0`
pub fn schema(&self) -> &str {
&self.schema
}

/// Custom message headers.
///
/// This may be used to track request_id, for example.
pub fn headers(&self) -> &Headers {
&self.headers
}

/// Mutable access to the message headers
pub fn headers_mut(&mut self) -> &mut Headers {
&mut self.headers
}

/// The encoded message data.
pub fn data(&self) -> &M {
&self.data
}

/// Destructure this message into just the contained data
pub fn into_data(self) -> M {
self.data
}
}
3 changes: 2 additions & 1 deletion src/tests/google.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig, PubSubError, PublishError,
StreamSubscriptionConfig, SubscriptionConfig, SubscriptionName, TopicConfig, TopicName,
},
message,
validators::{
prost::{ExactSchemaMatcher, SchemaMismatchError},
ProstDecodeError, ProstDecoder, ProstValidator, ProstValidatorError,
Expand Down Expand Up @@ -65,7 +66,7 @@ fn decode_with_headers() -> Result<(), BoxError> {

let encoded = orig_message.encode(&ProstValidator::new())?;

let decoded = crate::DecodedMessage::<TestMessage>::decode(
let decoded = message::ValidatedMessage::<TestMessage>::decode(
encoded,
&ProstDecoder::new(ExactSchemaMatcher::new(SCHEMA)),
)?;
Expand Down

0 comments on commit cd9bf6b

Please sign in to comment.