Skip to content

Commit

Permalink
Refactor PubSub acks to independent requests (standard-ai#37)
Browse files Browse the repository at this point in the history
* Refactor PubSub acks to independent requests

Previously pubsub acknowledgements were sent over the client stream of
the streaming pull gRPC. This provided limited feedback for the
ack/nack/modify callers, who had no (good) way to know their request was
actually submitted. Furthermore there's evidence that some bug existed
around connection resets and long ack times, although a definitive cause
was not identified.

This change uses explicit `acknowledge` and `modify_ack_deadline` rpc
calls to submit acks instead of the client stream. These enable much
clearer feedback for ack callers, as well as better backpressure
regulation in general. This implementation was loosely inspired by the
approach in the golang pubsub library[1].

[1]: https://github.com/googleapis/google-cloud-go/blob/94d040898cc9e85fdac76560765b01cfd019d0b4/pubsub/iterator.go#L422-L446
  • Loading branch information
rnarubin authored Dec 7, 2023
1 parent b300c0a commit e257431
Show file tree
Hide file tree
Showing 13 changed files with 6,782 additions and 453 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,6 @@ jobs:
with:
toolchain: nightly-2023-04-18
command: doc
args: --all-features
env:
RUSTDOCFLAGS: "--cfg docsrs"
167 changes: 88 additions & 79 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ya-gcp"
version = "0.11.0"
version = "0.11.1"
authors = ["Renar Narubin <[email protected]>"]
edition = "2021"
description = "APIs for using Google Cloud Platform services"
Expand Down Expand Up @@ -39,7 +39,7 @@ webpki-roots = ["dep:webpki-roots", "tonic?/tls-webpki-roots"]
grpc = ["tonic", "prost", "prost-types", "tower", "derive_more"]

bigtable = ["async-stream", "grpc", "prost", "tower"]
pubsub = ["grpc", "uuid", "async-stream", "pin-project", "async-channel", "tokio/time"]
pubsub = ["grpc", "uuid", "async-stream", "pin-project", "tokio/sync"]
storage = ["tame-gcs", "tower"]

# whether to include service emulator implementations. useful for testing
Expand All @@ -57,12 +57,11 @@ rand = "0.8"
rustls = "0.21.8"
serde = { version = "1", features = ["derive"] }
thiserror = "1"
tokio = { version = "1", features = ["time"] }
tokio = { version = "1.34", features = ["time"] }
tracing = "0.1.37"
yup-oauth2 = "8.3.0"

async-stream = { version = "0.3", optional = true }
async-channel = { version = "1", optional = true }
derive_more = { version = "0.99", optional = true }
pin-project = { version = "1.0.11", optional = true }
prost = { version = "0.12.3", optional = true }
Expand All @@ -82,7 +81,7 @@ quickcheck = "1"
quickcheck_macros = "1"
serde_json = "1"
structopt = "0.3" # for examples
tokio = { version = "1.4.0", features = ["rt-multi-thread", "time", "test-util"] }
tokio = { version = "1.34.0", features = ["rt-multi-thread", "time", "test-util"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-tree = "0.2"

Expand Down
4 changes: 3 additions & 1 deletion generators/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ fn main() -> Result<(), Error> {

tonic_build::configure()
.build_client(true)
.build_server(false)
.build_server(true) // build servers for tests
.server_mod_attribute(".", "#[cfg(test)]")
.generate_default_stubs(true)
.out_dir(&args.output_dir)
.compile_with_config(
prost_config,
Expand Down
7 changes: 6 additions & 1 deletion src/bigtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ pub use mutation::{MutateRowRequest, MutateRowsError, MutateRowsRequest};
#[cfg_attr(docsrs, doc(cfg(feature = "emulators")))]
pub mod emulator;

#[allow(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, missing_docs)]
#[allow(
rustdoc::broken_intra_doc_links,
rustdoc::bare_urls,
missing_docs,
unreachable_pub
)]
pub mod api {
pub mod rpc {
include!("../generated/google.rpc.rs");
Expand Down
Loading

0 comments on commit e257431

Please sign in to comment.