From 73fae2bba35a81e2e409a0378a109422a562741d Mon Sep 17 00:00:00 2001 From: Renar Narubin Date: Tue, 19 Dec 2023 15:46:06 -0800 Subject: [PATCH] Separate core types into stable crate This change moves the fundamental message-related traits and types into their own hedwig_core crate. This crate will have more version stability than the primary crate, which should allow less dependency churn when a set of message types are used across many crates. Further divisions of the library were considered. For example, it could be sensible to separate out hedwig-gcp, hedwig-prost, hedwig-json, etc. This would enable even better upgrade granularity, but at a maintenance cost. Both this repo and its users would then have to specify/coordinate/upgrade a plethora of crates to get a working system. Ultimately a smaller split of core+rest seems like a better compromise. --- Cargo.lock | 12 +++++- Cargo.toml | 6 ++- hedwig_core/Cargo.toml | 9 +++++ hedwig_core/src/lib.rs | 16 ++++++++ {src => hedwig_core/src}/message.rs | 59 ++++++++++++++++++++++++++++- {src => hedwig_core/src}/topic.rs | 0 src/consumer.rs | 42 +------------------- src/lib.rs | 13 +------ src/publisher.rs | 20 +--------- src/validators/prost.rs | 4 +- 10 files changed, 106 insertions(+), 75 deletions(-) create mode 100644 hedwig_core/Cargo.toml create mode 100644 hedwig_core/src/lib.rs rename {src => hedwig_core/src}/message.rs (56%) rename {src => hedwig_core/src}/topic.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index df22fcd..1fa84a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -485,7 +485,7 @@ dependencies = [ [[package]] name = "hedwig" -version = "7.0.0" +version = "7.1.0" dependencies = [ "async-channel", "async-trait", @@ -493,6 +493,7 @@ dependencies = [ "either", "futures-channel", "futures-util", + "hedwig_core", "parking_lot", "pin-project", "prost", @@ -510,6 +511,15 @@ dependencies = [ "ya-gcp", ] +[[package]] +name = "hedwig_core" +version = "0.1.0" +dependencies = [ + "bytes", + "smallstr", + "uuid 1.6.1", +] + [[package]] name = "hermit-abi" version = "0.1.19" diff --git a/Cargo.toml b/Cargo.toml index cd3d10e..5f73cb4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,9 @@ +[workspace] +members = ["hedwig_core"] + [package] name = "hedwig" -version = "7.0.0" +version = "7.1.0" authors = [ "Aniruddha Maru ", "Simonas Kazlauskas ", @@ -38,6 +41,7 @@ async-trait = { version = "0.1" } bytes = "1" either = { version = "1", features = ["use_std"], default-features = false } futures-util = { version = "0.3.17", features = ["std", "sink"], default-features = false } +hedwig_core = { version = "0.1", path = "./hedwig_core" } pin-project = "1" smallstr = { version = "0.3.0", features = ["union"] } thiserror = { version = "1", default-features = false } diff --git a/hedwig_core/Cargo.toml b/hedwig_core/Cargo.toml new file mode 100644 index 0000000..aa89516 --- /dev/null +++ b/hedwig_core/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "hedwig_core" +version = "0.1.0" +edition = "2021" + +[dependencies] +bytes = "1" +smallstr = { version = "0.3.0", features = ["union"] } +uuid = { version = "1.6", features = ["v4"], default-features = false } diff --git a/hedwig_core/src/lib.rs b/hedwig_core/src/lib.rs new file mode 100644 index 0000000..6f7d4ed --- /dev/null +++ b/hedwig_core/src/lib.rs @@ -0,0 +1,16 @@ +//! The core set of traits and types used in the hedwig format. +//! +//! This crate aims to provide better version stability over the primary batteries-included +//! `hedwig` crate. Top-level applications should typically use `hedwig`, while crates that define +//! message types should use `hedwig_core` + +mod topic; +pub use topic::Topic; +pub mod message; + +/// Custom headers associated with a message. +pub type Headers = std::collections::BTreeMap; + +/// A validated message. +pub type ValidatedMessage = message::ValidatedMessage; + diff --git a/src/message.rs b/hedwig_core/src/message.rs similarity index 56% rename from src/message.rs rename to hedwig_core/src/message.rs index 2b8899a..ebff868 100644 --- a/src/message.rs +++ b/hedwig_core/src/message.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use std::{borrow::Cow, time::SystemTime}; use uuid::Uuid; -use crate::Headers; +use crate::{Headers, Topic}; /// A validated message. /// @@ -87,3 +87,60 @@ impl ValidatedMessage { self.data } } + +/// Messages which can be decoded from a [`ValidatedMessage`] stream. +pub trait DecodableMessage { + /// The error returned when a message fails to decode + type Error; + + /// The decoder used to decode a validated message + type Decoder; + + /// Decode the given message, using the given decoder, into its structured type + fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result + where + Self: Sized; +} + +impl DecodableMessage for ValidatedMessage +where + M: DecodableMessage, +{ + /// The error returned when a message fails to decode + type Error = M::Error; + + /// The decoder used to decode a validated message + type Decoder = M::Decoder; + + /// Decode the given message, using the given decoder, into its structured type + fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result + where + Self: Sized, + { + let message = M::decode(msg.clone(), decoder)?; + Ok(Self { + id: msg.id, + timestamp: msg.timestamp, + schema: msg.schema, + headers: msg.headers, + data: message, + }) + } +} + +/// Types that can be encoded and published. +pub trait EncodableMessage { + /// The errors that can occur when calling the [`EncodableMessage::encode`] method. + /// + /// Will typically match the errors returned by the [`EncodableMessage::Validator`]. + type Error; + + /// The validator to use for this message. + type Validator; + + /// Topic into which this message shall be published. + fn topic(&self) -> Topic; + + /// Encode the message payload. + fn encode(&self, validator: &Self::Validator) -> Result, Self::Error>; +} diff --git a/src/topic.rs b/hedwig_core/src/topic.rs similarity index 100% rename from src/topic.rs rename to hedwig_core/src/topic.rs diff --git a/src/consumer.rs b/src/consumer.rs index 74b15a5..df25e3d 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -13,6 +13,8 @@ use std::{ task::{Context, Poll}, }; +pub use hedwig_core::message::DecodableMessage; + /// Message consumers ingest messages from a queue service and present them to the user application /// as a [`Stream`](futures_util::stream::Stream). /// @@ -78,46 +80,6 @@ pub trait Consumer { } } -/// Messages which can be decoded from a [`ValidatedMessage`] stream. -pub trait DecodableMessage { - /// The error returned when a message fails to decode - type Error; - - /// The decoder used to decode a validated message - type Decoder; - - /// Decode the given message, using the given decoder, into its structured type - fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result - where - Self: Sized; -} - -impl DecodableMessage for ValidatedMessage -where - M: DecodableMessage, -{ - /// The error returned when a message fails to decode - type Error = M::Error; - - /// The decoder used to decode a validated message - type Decoder = M::Decoder; - - /// Decode the given message, using the given decoder, into its structured type - fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result - where - Self: Sized, - { - let message = M::decode(msg.clone(), decoder)?; - Ok(Self { - id: msg.id, - timestamp: msg.timestamp, - schema: msg.schema, - headers: msg.headers, - data: message, - }) - } -} - /// A received message which can be acknowledged to prevent re-delivery by the backing message /// service. /// diff --git a/src/lib.rs b/src/lib.rs index 0aa5a93..04802db 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,17 +75,12 @@ //! ``` #![cfg_attr(docsrs, feature(doc_cfg))] -pub use topic::Topic; - -use bytes::Bytes; -use std::collections::BTreeMap; +pub use hedwig_core::{message, Headers, Topic, ValidatedMessage}; mod backends; mod consumer; -pub mod message; mod publisher; mod tests; -mod topic; pub mod validators; pub use backends::*; @@ -112,9 +107,3 @@ pub enum Error { #[error("Unable to encode message payload")] EncodeMessage(#[source] Box), } - -/// Custom headers associated with a message. -pub type Headers = BTreeMap; - -/// A validated message. -pub type ValidatedMessage = message::ValidatedMessage; diff --git a/src/publisher.rs b/src/publisher.rs index 72ff6e0..5295fbd 100644 --- a/src/publisher.rs +++ b/src/publisher.rs @@ -1,12 +1,13 @@ //! Types, traits, and functions necessary to publish messages using hedwig -use crate::{Topic, ValidatedMessage}; use futures_util::sink; use std::{ pin::Pin, task::{Context, Poll}, }; +pub use hedwig_core::message::EncodableMessage; + /// Message publishers. /// /// Message publishers validate, encode, and deliver messages to an endpoint, possibly a remote @@ -45,23 +46,6 @@ pub trait Publisher = Drain> { ) -> Self::PublishSink; } -/// Types that can be encoded and published. -pub trait EncodableMessage { - /// The errors that can occur when calling the [`EncodableMessage::encode`] method. - /// - /// Will typically match the errors returned by the [`EncodableMessage::Validator`]. - type Error; - - /// The validator to use for this message. - type Validator; - - /// Topic into which this message shall be published. - fn topic(&self) -> Topic; - - /// Encode the message payload. - fn encode(&self, validator: &Self::Validator) -> Result; -} - /// Like [`futures_util::sink::Drain`] but implements `Default` #[derive(Debug)] pub struct Drain(std::marker::PhantomData); diff --git a/src/validators/prost.rs b/src/validators/prost.rs index 3ae0e5b..bd51ffa 100644 --- a/src/validators/prost.rs +++ b/src/validators/prost.rs @@ -1,4 +1,4 @@ -//! Validation and decoding for messages encoded with protobuf using [`prost`] +//! Validation and decoding for messages encoded with protobuf using [`prost`](::prost) //! //! ``` //! use hedwig::validators::prost::{ProstValidator, ProstDecoder, ExactSchemaMatcher}; @@ -120,7 +120,7 @@ impl ProstDecoder { .try_match_schema(msg.schema()) .map_err(ProstDecodeError::InvalidSchema)?; - Ok(M::decode(msg.data)?) + Ok(M::decode(msg.into_data())?) } }