Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate core types into stable crate #49

Merged
merged 1 commit into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
[workspace]
members = ["hedwig_core"]

[package]
name = "hedwig"
version = "7.0.0"
version = "7.1.0"
authors = [
"Aniruddha Maru <[email protected]>",
"Simonas Kazlauskas <[email protected]>",
Expand Down Expand Up @@ -38,6 +41,7 @@ async-trait = { version = "0.1" }
bytes = "1"
either = { version = "1", features = ["use_std"], default-features = false }
futures-util = { version = "0.3.17", features = ["std", "sink"], default-features = false }
hedwig_core = { version = "0.1", path = "./hedwig_core" }
pin-project = "1"
smallstr = { version = "0.3.0", features = ["union"] }
thiserror = { version = "1", default-features = false }
Expand Down
9 changes: 9 additions & 0 deletions hedwig_core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "hedwig_core"
version = "0.1.0"
edition = "2021"

[dependencies]
bytes = "1"
smallstr = { version = "0.3.0", features = ["union"] }
uuid = { version = "1.6", features = ["v4"], default-features = false }
16 changes: 16 additions & 0 deletions hedwig_core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//! The core set of traits and types used in the hedwig format.
//!
//! This crate aims to provide better version stability over the primary batteries-included
//! `hedwig` crate. Top-level applications should typically use `hedwig`, while crates that define
//! message types should use `hedwig_core`

mod topic;
pub use topic::Topic;
pub mod message;

/// Custom headers associated with a message.
pub type Headers = std::collections::BTreeMap<String, String>;

/// A validated message.
pub type ValidatedMessage = message::ValidatedMessage<bytes::Bytes>;

59 changes: 58 additions & 1 deletion src/message.rs → hedwig_core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bytes::Bytes;
use std::{borrow::Cow, time::SystemTime};
use uuid::Uuid;

use crate::Headers;
use crate::{Headers, Topic};

/// A validated message.
///
Expand Down Expand Up @@ -87,3 +87,60 @@ impl<M> ValidatedMessage<M> {
self.data
}
}

/// Messages which can be decoded from a [`ValidatedMessage`] stream.
pub trait DecodableMessage {
/// The error returned when a message fails to decode
type Error;

/// The decoder used to decode a validated message
type Decoder;

/// Decode the given message, using the given decoder, into its structured type
fn decode(msg: ValidatedMessage<Bytes>, decoder: &Self::Decoder) -> Result<Self, Self::Error>
where
Self: Sized;
}

impl<M> DecodableMessage for ValidatedMessage<M>
where
M: DecodableMessage,
{
/// The error returned when a message fails to decode
type Error = M::Error;

/// The decoder used to decode a validated message
type Decoder = M::Decoder;

/// Decode the given message, using the given decoder, into its structured type
fn decode(msg: ValidatedMessage<Bytes>, decoder: &Self::Decoder) -> Result<Self, Self::Error>
where
Self: Sized,
{
let message = M::decode(msg.clone(), decoder)?;
Ok(Self {
id: msg.id,
timestamp: msg.timestamp,
schema: msg.schema,
headers: msg.headers,
data: message,
})
}
}

/// Types that can be encoded and published.
pub trait EncodableMessage {
/// The errors that can occur when calling the [`EncodableMessage::encode`] method.
///
/// Will typically match the errors returned by the [`EncodableMessage::Validator`].
type Error;

/// The validator to use for this message.
type Validator;

/// Topic into which this message shall be published.
fn topic(&self) -> Topic;

/// Encode the message payload.
fn encode(&self, validator: &Self::Validator) -> Result<ValidatedMessage<Bytes>, Self::Error>;
}
File renamed without changes.
42 changes: 2 additions & 40 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use std::{
task::{Context, Poll},
};

pub use hedwig_core::message::DecodableMessage;

/// Message consumers ingest messages from a queue service and present them to the user application
/// as a [`Stream`](futures_util::stream::Stream).
///
Expand Down Expand Up @@ -78,46 +80,6 @@ pub trait Consumer {
}
}

/// Messages which can be decoded from a [`ValidatedMessage`] stream.
pub trait DecodableMessage {
/// The error returned when a message fails to decode
type Error;

/// The decoder used to decode a validated message
type Decoder;

/// Decode the given message, using the given decoder, into its structured type
fn decode(msg: ValidatedMessage<Bytes>, decoder: &Self::Decoder) -> Result<Self, Self::Error>
where
Self: Sized;
}

impl<M> DecodableMessage for ValidatedMessage<M>
where
M: DecodableMessage,
{
/// The error returned when a message fails to decode
type Error = M::Error;

/// The decoder used to decode a validated message
type Decoder = M::Decoder;

/// Decode the given message, using the given decoder, into its structured type
fn decode(msg: ValidatedMessage<Bytes>, decoder: &Self::Decoder) -> Result<Self, Self::Error>
where
Self: Sized,
{
let message = M::decode(msg.clone(), decoder)?;
Ok(Self {
id: msg.id,
timestamp: msg.timestamp,
schema: msg.schema,
headers: msg.headers,
data: message,
})
}
}

/// A received message which can be acknowledged to prevent re-delivery by the backing message
/// service.
///
Expand Down
13 changes: 1 addition & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,15 @@
//! ```
#![cfg_attr(docsrs, feature(doc_cfg))]

pub use topic::Topic;

use bytes::Bytes;
use std::collections::BTreeMap;
pub use hedwig_core::{message, Headers, Topic, ValidatedMessage};

mod backends;
mod consumer;
pub mod message;
mod publisher;
mod tests;
mod topic;
pub mod validators;

pub use backends::*;

Check warning on line 86 in src/lib.rs

View workflow job for this annotation

GitHub Actions / test (nightly, ubuntu-latest)

unused import: `backends::*`
pub use consumer::*;
pub use publisher::*;

Expand All @@ -112,9 +107,3 @@
#[error("Unable to encode message payload")]
EncodeMessage(#[source] Box<dyn std::error::Error + Send + Sync>),
}

/// Custom headers associated with a message.
pub type Headers = BTreeMap<String, String>;

/// A validated message.
pub type ValidatedMessage = message::ValidatedMessage<Bytes>;
20 changes: 2 additions & 18 deletions src/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Types, traits, and functions necessary to publish messages using hedwig

use crate::{Topic, ValidatedMessage};
use futures_util::sink;
use std::{
pin::Pin,
task::{Context, Poll},
};

pub use hedwig_core::message::EncodableMessage;

/// Message publishers.
///
/// Message publishers validate, encode, and deliver messages to an endpoint, possibly a remote
Expand Down Expand Up @@ -45,23 +46,6 @@ pub trait Publisher<M: EncodableMessage, S: sink::Sink<M> = Drain<M>> {
) -> Self::PublishSink;
}

/// Types that can be encoded and published.
pub trait EncodableMessage {
/// The errors that can occur when calling the [`EncodableMessage::encode`] method.
///
/// Will typically match the errors returned by the [`EncodableMessage::Validator`].
type Error;

/// The validator to use for this message.
type Validator;

/// Topic into which this message shall be published.
fn topic(&self) -> Topic;

/// Encode the message payload.
fn encode(&self, validator: &Self::Validator) -> Result<ValidatedMessage, Self::Error>;
}

/// Like [`futures_util::sink::Drain`] but implements `Default`
#[derive(Debug)]
pub struct Drain<T>(std::marker::PhantomData<T>);
Expand Down
4 changes: 2 additions & 2 deletions src/validators/prost.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Validation and decoding for messages encoded with protobuf using [`prost`]
//! Validation and decoding for messages encoded with protobuf using [`prost`](::prost)

Check warning on line 1 in src/validators/prost.rs

View workflow job for this annotation

GitHub Actions / doc

redundant explicit link target
//!
//! ```
//! use hedwig::validators::prost::{ProstValidator, ProstDecoder, ExactSchemaMatcher};
Expand Down Expand Up @@ -120,7 +120,7 @@
.try_match_schema(msg.schema())
.map_err(ProstDecodeError::InvalidSchema)?;

Ok(M::decode(msg.data)?)
Ok(M::decode(msg.into_data())?)
}
}

Expand Down
Loading