Skip to content

Commit

Permalink
Support runtime Topic designation (#44)
Browse files Browse the repository at this point in the history
Previously the Topic type was only constructible from static strings.
This works for code-gen messages, but doesn't work if the topic needs to
be determined at runtime. This change permits constructing Topics from
arbitrary strings
  • Loading branch information
rnarubin authored Nov 8, 2023
1 parent 74174bb commit e96fb3c
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 17 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hedwig"
version = "6.3.0"
version = "6.4.0"
authors = [
"Aniruddha Maru <[email protected]>",
"Simonas Kazlauskas <[email protected]>",
Expand Down Expand Up @@ -39,6 +39,7 @@ bytes = "1"
either = { version = "1", features = ["use_std"], default-features = false }
futures-util = { version = "0.3.17", features = ["std", "sink"], default-features = false }
pin-project = "1"
smallstr = { version = "0.3.0", features = ["union"] }
thiserror = { version = "1", default-features = false }
url = { version = "2", default-features = false }
uuid = { version = "^0.8", features = ["v4"], default-features = false }
Expand Down
23 changes: 14 additions & 9 deletions src/backends/googlepubsub/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,16 +374,21 @@ where
let sink = {
let retry_policy = this.retry_policy;
let response_sink = this.response_sink;
this.topic_sinks.entry(topic.clone()).or_insert_with(|| {
Box::pin(TopicSink::new(
client.client.publish_topic_sink(
TopicName::new(topic.as_ref())
.into_project_topic_name(client.project()),

// avoid cloning the topic if the key exists
match this.topic_sinks.get_mut(&topic) {
Some(existing) => existing,
None => this.topic_sinks.entry(topic.clone()).or_insert(Box::pin(
TopicSink::new(
client.client.publish_topic_sink(
TopicName::new(topic.as_ref())
.into_project_topic_name(client.project()),
),
retry_policy.clone(),
Shared::clone(response_sink),
),
retry_policy.clone(),
Shared::clone(response_sink),
))
})
)),
}
};

// poll the sink to see if it's ready
Expand Down
29 changes: 22 additions & 7 deletions src/topic.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
use smallstr::SmallString;

/// A message queue topic name to which messages can be published
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct Topic(&'static str);
// A survey of common topics found lengths between 16 and 35 bytes
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Topic(SmallString<[u8; 36]>);

impl Default for Topic {
fn default() -> Self {
Topic(SmallString::new())
}
}

impl std::fmt::Display for Topic {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
std::fmt::Display::fmt(self.0, f)
std::fmt::Display::fmt(self.0.as_str(), f)
}
}

impl<'a> From<&'a str> for Topic {
fn from(s: &'a str) -> Topic {
Topic(s.into())
}
}

impl From<&'static str> for Topic {
fn from(s: &'static str) -> Topic {
Topic(s)
impl From<String> for Topic {
fn from(s: String) -> Topic {
Topic(s.into())
}
}

impl AsRef<str> for Topic {
fn as_ref(&self) -> &str {
self.0
self.0.as_ref()
}
}

0 comments on commit e96fb3c

Please sign in to comment.