Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment with custom service for bigtable. #30

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ tempdir = { version = "0.3", optional = true }
tonic = { version = "0.6.2", optional = true }
tower = { version = "0.4", features = ["make"], optional = true }
uuid = { version = "0.8.1", features = ["v4"], optional = true }
http-body = "0.4.5"

[dev-dependencies]
approx = "0.5"
Expand Down
12 changes: 9 additions & 3 deletions src/bigtable/client_builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
auth::grpc,
auth::grpc::{self, AuthGrpcService},
bigtable::{api, BigtableClient},
builder,
retry_policy::{exponential_backoff, ExponentialBackoff},
Expand All @@ -25,7 +25,13 @@ config_default! {
}

impl BigtableConfig {
fn auth_scopes(&self) -> Vec<String> {
/// Returns the oauth scopes required for this bigtable configuration.
///
/// These are handled automatically when using [`build_bigtable_client`],
/// but you will need them if you intend to provide your own authentication.
///
/// [`build_bigtable_client`]: crate::ClientBuilder::build_bigtable_client
pub fn auth_scopes(&self) -> Vec<String> {
if self.readonly {
vec![BIGTABLE_DATA_READONLY_SCOPE.to_owned()]
} else {
Expand Down Expand Up @@ -60,7 +66,7 @@ where
config: BigtableConfig,
project: &str,
instance_name: &str,
) -> Result<BigtableClient<C>, BuildError> {
) -> Result<BigtableClient<AuthGrpcService<tonic::transport::Channel, C>>, BuildError> {
let scopes = config.auth_scopes();
let endpoint = tonic::transport::Endpoint::new(config.endpoint)?;

Expand Down
89 changes: 72 additions & 17 deletions src/bigtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use hyper::body::Bytes;
use prost::bytes::BytesMut;
use std::ops::{Bound, RangeBounds};

use crate::{
auth::grpc::AuthGrpcService,
retry_policy::{ExponentialBackoff, RetryOperation, RetryPolicy, RetryPredicate},
use crate::auth::grpc::AuthGrpcService;
use crate::retry_policy::{
exponential_backoff, ExponentialBackoff, RetryOperation, RetryPolicy, RetryPredicate,
};

pub use http::Uri;
Expand Down Expand Up @@ -368,31 +368,86 @@ impl From<ReadInProgressError> for ReadRowsError {
/// function.
#[derive(Clone)]
pub struct BigtableClient<
C = crate::DefaultConnector,
Service = AuthGrpcService<tonic::transport::Channel, crate::DefaultConnector>,
Retry = ExponentialBackoff<BigtableRetryCheck>,
> {
inner: api::bigtable::v2::bigtable_client::BigtableClient<
AuthGrpcService<tonic::transport::Channel, C>,
>,
inner: api::bigtable::v2::bigtable_client::BigtableClient<Service>,
retry: Retry,
table_prefix: String,
}

impl<C, Retry> BigtableClient<C, Retry>
impl<Service> BigtableClient<Service>
where
C: tower::Service<http::Uri> + Clone + Send + Sync + 'static,
C::Response: hyper::client::connect::Connection
+ tokio::io::AsyncRead
+ tokio::io::AsyncWrite
+ Send
+ Unpin
+ 'static,
C::Future: Send + Unpin + 'static,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Service: tonic::client::GrpcService<tonic::body::BoxBody>,
Service::ResponseBody: http_body::Body + Send + 'static,
Service::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
<Service::ResponseBody as http_body::Body>::Error:
Into<Box<dyn std::error::Error + Send + Sync + 'static>> + Send,
{
/// Create a `BigtableClient` using a provided [`GrpcService`] for communicating
/// to bigtable.
///
/// To connect to a non-emulated bigtable, the service will need to provide authentication;
/// [`AuthGrpcService`] is a wrapper that adds authentication to an unauthenticated service.
///
/// # Example
/// ```
/// # use ya_gcp::bigtable::{BigtableConfig, BigtableClient};
/// # use ya_gcp::auth::grpc::AuthGrpcService;
/// # use hyper::body::Bytes;
/// # let _ = || async move {
/// // Set up auth using yup_oauth2. In this example, we grab it from the user credentials created
/// // by `gloud auth application-default login`.
/// let secret = yup_oauth2::read_authorized_user_secret("/home/me/.config/gcloud/application_default_credentials.json").await?;
/// let auth = yup_oauth2::AuthorizedUserAuthenticator::builder(secret).build().await?;
///
/// let config = BigtableConfig::default();
/// let channel = tonic::transport::Endpoint::from_shared(config.endpoint.clone())?.connect().await?;
/// let auth_channel = AuthGrpcService::new(channel, Some(auth), config.auth_scopes());
/// let client = BigtableClient::new(auth_channel, "my-project", "my-instance");
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// # };
/// ```
///
/// [`GrpcService`]: tonic::client::GrpcService
pub fn new(service: Service, project: &str, instance_name: &str) -> Self {
let retry = ExponentialBackoff::new(
BigtableRetryCheck::default(),
exponential_backoff::Config::default(),
);
Self::with_retry(service, retry, project, instance_name)
}
}

impl<Service, Retry> BigtableClient<Service, Retry>
where
Service: tonic::client::GrpcService<tonic::body::BoxBody>,
Service::ResponseBody: http_body::Body + Send + 'static,
Service::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
<Service::ResponseBody as http_body::Body>::Error:
Into<Box<dyn std::error::Error + Send + Sync + 'static>> + Send,
Retry: RetryPolicy<(), tonic::Status> + 'static,
Retry::RetryOp: Send + 'static,
<Retry::RetryOp as RetryOperation<(), tonic::Status>>::Sleep: Send + 'static,
{
/// Create a `BigtableClient` with a custom retry policy and using a
/// provided [`GrpcService`] for communicating to bigtable.
///
/// To connect to a non-emulated bigtable, the service will need to provide authentication;
/// [`AuthGrpcService`] is a wrapper that adds authentication to an unauthenticated service.
///
/// [`GrpcService`]: tonic::client::GrpcService
pub fn with_retry(service: Service, retry: Retry, project: &str, instance_name: &str) -> Self {
let table_prefix = format!("projects/{}/instances/{}/tables/", project, instance_name);
let inner = api::bigtable::v2::bigtable_client::BigtableClient::new(service);

BigtableClient {
inner,
table_prefix,
retry,
}
}

/// Request some rows from bigtable.
///
/// This is the most general read request; various other convenience methods are
Expand Down