From 97863564b2dc1db26abffe73027d1e94e02bfe6d Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 21 Jul 2022 08:18:51 +0200 Subject: [PATCH] feat: expose low-level metadata to user Helpful for debugging and certain admin operations like topic re-assignment and leader election. --- src/client/mod.rs | 50 +++++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 +++ src/metadata.rs | 59 +++++++++++++++++++++++++++++++++++++++++++++++ tests/client.rs | 23 ++++++++++++++++++ 4 files changed, 135 insertions(+) create mode 100644 src/metadata.rs diff --git a/src/client/mod.rs b/src/client/mod.rs index 9bddeda8..40ff8e58 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -5,6 +5,7 @@ use thiserror::Error; use crate::{ client::partition::PartitionClient, connection::{BrokerConnector, TlsConfig}, + metadata::{Metadata, MetadataBroker, MetadataPartition, MetadataTopic}, protocol::primitives::Boolean, topic::Topic, }; @@ -145,4 +146,53 @@ impl Client { }) .collect()) } + + /// Return cluster-wide metadata. + pub async fn metadata(&self) -> Result { + let response = self.brokers.request_metadata(None, None).await?; + + Ok(Metadata { + brokers: response + .brokers + .into_iter() + .map(|response| MetadataBroker { + node_id: response.node_id.0, + host: response.host.0, + port: response.port.0, + rack: response.rack.and_then(|s| s.0), + }) + .collect(), + controller_id: response.controller_id.map(|id| id.0), + topics: response + .topics + .into_iter() + .map(|response| MetadataTopic { + name: response.name.0, + is_internal: response.is_internal.map(|b| b.0), + partitions: response + .partitions + .into_iter() + .map(|response| MetadataPartition { + partition_index: response.partition_index.0, + leader_id: response.leader_id.0, + replica_nodes: response + .replica_nodes + .0 + .unwrap_or_default() + .into_iter() + .map(|i| i.0) + .collect(), + isr_nodes: response + .isr_nodes + .0 + .unwrap_or_default() + .into_iter() + .map(|i| i.0) + .collect(), + }) + .collect(), + }) + .collect(), + }) + } } diff --git a/src/lib.rs b/src/lib.rs index b7543fdd..dbb0524c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,11 +22,14 @@ mod backoff; pub mod client; mod connection; + #[cfg(feature = "unstable-fuzzing")] pub mod messenger; #[cfg(not(feature = "unstable-fuzzing"))] mod messenger; +pub mod metadata; + #[cfg(feature = "unstable-fuzzing")] pub mod protocol; #[cfg(not(feature = "unstable-fuzzing"))] diff --git a/src/metadata.rs b/src/metadata.rs new file mode 100644 index 00000000..db653adb --- /dev/null +++ b/src/metadata.rs @@ -0,0 +1,59 @@ +//! Cluster-wide Kafka metadata. + +/// Metadata container for the entire cluster. +#[derive(Debug, PartialEq)] +pub struct Metadata { + /// Brokers. + pub brokers: Vec, + + /// The ID of the controller broker. + pub controller_id: Option, + + /// Topics. + pub topics: Vec, +} + +/// Metadata for a certain broker. +#[derive(Debug, PartialEq)] +pub struct MetadataBroker { + /// The broker ID + pub node_id: i32, + + /// The broker hostname + pub host: String, + + /// The broker port + pub port: i32, + + /// Rack. + pub rack: Option, +} + +/// Metadata for a certain topic. +#[derive(Debug, PartialEq)] +pub struct MetadataTopic { + /// The topic name + pub name: String, + + /// True if the topic is internal + pub is_internal: Option, + + /// Each partition in the topic + pub partitions: Vec, +} + +/// Metadata for a certain partition. +#[derive(Debug, PartialEq)] +pub struct MetadataPartition { + /// The partition index + pub partition_index: i32, + + /// The ID of the leader broker + pub leader_id: i32, + + /// The set of all nodes that host this partition + pub replica_nodes: Vec, + + /// The set of all nodes that are in sync with the leader for this partition + pub isr_nodes: Vec, +} diff --git a/tests/client.rs b/tests/client.rs index dfd4c86c..3892d926 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -541,6 +541,29 @@ async fn test_delete_records() { ); } +#[tokio::test] +async fn test_metadata() { + maybe_start_logging(); + + let connection = maybe_skip_kafka_integration!(); + let topic_name = random_topic_name(); + + let client = ClientBuilder::new(connection).build().await.unwrap(); + + let controller_client = client.controller_client().unwrap(); + controller_client + .create_topic(&topic_name, 1, 1, 5_000) + .await + .unwrap(); + + let md = client.metadata().await.unwrap(); + assert!(!md.brokers.is_empty()); + md.topics + .into_iter() + .find(|topic| topic.name == topic_name) + .unwrap(); +} + #[tokio::test] async fn test_reassign_partitions() { maybe_start_logging();