feat: cluster admin operations #157

Open · wants to merge 3 commits into base: main
4 changes: 4 additions & 0 deletions .circleci/config.yml
@@ -267,6 +267,10 @@ jobs:
TEST_INTEGRATION: 1
# Kafka supports DeleteRecords
TEST_DELETE_RECORDS: 1
# Kafka supports ElectLeaders
TEST_ELECT_LEADERS: 1
# Kafka supports ReassignPartitions
TEST_REASSIGN_PARTITIONS: 1
TEST_JAVA_INTEROPT: 1
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
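These flags let the integration suite skip admin operations that the broker under test does not support. A minimal sketch of how a test could gate on them; the helper function is hypothetical and the actual suite may use a macro instead:

```rust
use std::env;

/// Hypothetical helper: true when the named feature flag is set to "1".
fn feature_enabled(flag: &str) -> bool {
    env::var(flag).map_or(false, |v| v == "1")
}

#[tokio::test]
async fn elect_leaders_roundtrip() {
    if !feature_enabled("TEST_ELECT_LEADERS") {
        return; // broker under test does not support ElectLeaders
    }
    // ... exercise ControllerClient::elect_leaders against the test cluster ...
}
```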
131 changes: 129 additions & 2 deletions src/client/controller.rs
@@ -11,12 +11,32 @@ use crate::{
messenger::RequestError,
protocol::{
error::Error as ProtocolError,
messages::{CreateTopicRequest, CreateTopicsRequest},
primitives::{Int16, Int32, NullableString, String_},
messages::{
AlterPartitionReassignmentsPartitionRequest, AlterPartitionReassignmentsRequest,
AlterPartitionReassignmentsTopicRequest, CreateTopicRequest, CreateTopicsRequest,
ElectLeadersRequest, ElectLeadersTopicRequest,
},
primitives::{
Array, CompactArray, CompactString, Int16, Int32, Int8, NullableString, String_,
TaggedFields,
},
},
validation::ExactlyOne,
};

/// Election type for [`ControllerClient::elect_leaders`].
///
/// The names in this enum are borrowed from the
/// [Kafka source code](https://github.com/a0x8o/kafka/blob/5383311a5cfbdaf147411004106449e3ad8081fb/core/src/main/scala/kafka/controller/KafkaController.scala#L2186-L2194).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ElectionType {
/// Elects the preferred replica.
Preferred,

/// Elects the first live replica if there are no in-sync replicas.
Unclean,
}

#[derive(Debug)]
pub struct ControllerClient {
brokers: Arc<BrokerConnector>,
@@ -78,6 +98,113 @@ impl ControllerClient {
.await
}

/// Re-assign the replicas of a partition.
pub async fn reassign_partitions(
&self,
topic: impl Into<String> + Send,
partition: i32,
replicas: Vec<i32>,
timeout_ms: i32,
) -> Result<()> {
let request = &AlterPartitionReassignmentsRequest {
topics: vec![AlterPartitionReassignmentsTopicRequest {
name: CompactString(topic.into()),
partitions: vec![AlterPartitionReassignmentsPartitionRequest {
partition_index: Int32(partition),
replicas: CompactArray(Some(replicas.into_iter().map(Int32).collect())),
tagged_fields: TaggedFields::default(),
}],
tagged_fields: TaggedFields::default(),
}],
timeout_ms: Int32(timeout_ms),
tagged_fields: TaggedFields::default(),
};

maybe_retry(
&self.backoff_config,
self,
"reassign_partitions",
|| async move {
let broker = self.get().await?;
let response = broker.request(request).await?;

if let Some(protocol_error) = response.error {
return Err(Error::ServerError(protocol_error, Default::default()));
}

let topic = response
.responses
.exactly_one()
.map_err(Error::exactly_one_topic)?;

let partition = topic
.partitions
.exactly_one()
.map_err(Error::exactly_one_partition)?;

match partition.error {
None => Ok(()),
Some(protocol_error) => Err(Error::ServerError(
protocol_error,
partition.error_message.0.unwrap_or_default(),
)),
}
},
)
.await
}

/// Elect leaders for the given topic and partition.
pub async fn elect_leaders(
&self,
topic: impl Into<String> + Send,
partition: i32,
election_type: ElectionType,
timeout_ms: i32,
) -> Result<()> {
let request = &ElectLeadersRequest {
election_type: Int8(match election_type {
ElectionType::Preferred => 0,
ElectionType::Unclean => 1,
}),
topic_partitions: vec![ElectLeadersTopicRequest {
topic: String_(topic.into()),
partitions: Array(Some(vec![Int32(partition)])),
tagged_fields: None,
}],
timeout_ms: Int32(timeout_ms),
tagged_fields: None,
};

maybe_retry(&self.backoff_config, self, "elect_leaders", || async move {
let broker = self.get().await?;
let response = broker.request(request).await?;

if let Some(protocol_error) = response.error {
return Err(Error::ServerError(protocol_error, Default::default()));
}

let topic = response
.replica_election_results
.exactly_one()
.map_err(Error::exactly_one_topic)?;

let partition = topic
.partition_results
.exactly_one()
.map_err(Error::exactly_one_partition)?;

match partition.error {
None => Ok(()),
Some(protocol_error) => Err(Error::ServerError(
protocol_error,
partition.error_message.0.unwrap_or_default(),
)),
}
})
.await
}

/// Retrieve the broker ID of the controller.
async fn get_controller_id(&self) -> Result<i32> {
let metadata = self.brokers.request_metadata(None, Some(vec![])).await?;
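For reference, a usage sketch of the two new admin calls, assuming the crate's existing `ClientBuilder` entry point and a `controller_client` accessor (both assumptions; adjust to the actual API surface):

```rust
use rskafka::client::{controller::ElectionType, ClientBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed builder API from the crate's existing surface.
    let client = ClientBuilder::new(vec!["localhost:9093".to_owned()])
        .build()
        .await?;
    let controller = client.controller_client().await?;

    // Move partition 0 of "my_topic" onto brokers 1 and 2; the timeout is
    // how long the server may take to accept the reassignment.
    controller
        .reassign_partitions("my_topic", 0, vec![1, 2], 5_000)
        .await?;

    // Trigger a preferred-leader election for the same partition.
    controller
        .elect_leaders("my_topic", 0, ElectionType::Preferred, 5_000)
        .await?;

    Ok(())
}
```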
50 changes: 50 additions & 0 deletions src/client/mod.rs
@@ -5,6 +5,7 @@ use thiserror::Error;
use crate::{
client::partition::PartitionClient,
connection::{BrokerConnector, TlsConfig},
metadata::{Metadata, MetadataBroker, MetadataPartition, MetadataTopic},
protocol::primitives::Boolean,
topic::Topic,
};
@@ -145,4 +146,53 @@ impl Client {
})
.collect())
}

/// Return cluster-wide metadata.
pub async fn metadata(&self) -> Result<Metadata> {
let response = self.brokers.request_metadata(None, None).await?;

Ok(Metadata {
brokers: response
.brokers
.into_iter()
.map(|response| MetadataBroker {
node_id: response.node_id.0,
host: response.host.0,
port: response.port.0,
rack: response.rack.and_then(|s| s.0),
})
.collect(),
controller_id: response.controller_id.map(|id| id.0),
topics: response
.topics
.into_iter()
.map(|response| MetadataTopic {
name: response.name.0,
is_internal: response.is_internal.map(|b| b.0),
partitions: response
.partitions
.into_iter()
.map(|response| MetadataPartition {
partition_index: response.partition_index.0,
leader_id: response.leader_id.0,
replica_nodes: response
.replica_nodes
.0
.unwrap_or_default()
.into_iter()
.map(|i| i.0)
.collect(),
isr_nodes: response
.isr_nodes
.0
.unwrap_or_default()
.into_iter()
.map(|i| i.0)
.collect(),
})
.collect(),
})
.collect(),
})
}
}
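A short sketch of how the new cluster-wide `metadata` call could be consumed, assuming a `Client` built as in the sketch above:

```rust
use rskafka::client::Client;

/// Print a one-line summary per broker and per partition.
/// Sketch only; assumes `Client::metadata` as added in this PR.
async fn print_cluster_summary(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
    let metadata = client.metadata().await?;

    println!("controller: {:?}", metadata.controller_id);
    for broker in &metadata.brokers {
        println!("broker {} at {}:{}", broker.node_id, broker.host, broker.port);
    }
    for topic in &metadata.topics {
        for partition in &topic.partitions {
            println!(
                "{}/{}: leader={} replicas={:?} isr={:?}",
                topic.name,
                partition.partition_index,
                partition.leader_id,
                partition.replica_nodes,
                partition.isr_nodes,
            );
        }
    }
    Ok(())
}
```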
3 changes: 3 additions & 0 deletions src/lib.rs
@@ -22,11 +22,14 @@ mod backoff;
pub mod client;

mod connection;

#[cfg(feature = "unstable-fuzzing")]
pub mod messenger;
#[cfg(not(feature = "unstable-fuzzing"))]
mod messenger;

pub mod metadata;

#[cfg(feature = "unstable-fuzzing")]
pub mod protocol;
#[cfg(not(feature = "unstable-fuzzing"))]
59 changes: 59 additions & 0 deletions src/metadata.rs
@@ -0,0 +1,59 @@
//! Cluster-wide Kafka metadata.

/// Metadata container for the entire cluster.
#[derive(Debug, PartialEq)]
pub struct Metadata {
/// Brokers.
pub brokers: Vec<MetadataBroker>,

/// The ID of the controller broker.
pub controller_id: Option<i32>,

/// Topics.
pub topics: Vec<MetadataTopic>,
}

/// Metadata for a certain broker.
#[derive(Debug, PartialEq)]
pub struct MetadataBroker {
/// The broker ID
pub node_id: i32,

/// The broker hostname
pub host: String,

/// The broker port
pub port: i32,

/// Rack.
pub rack: Option<String>,
}

/// Metadata for a certain topic.
#[derive(Debug, PartialEq)]
pub struct MetadataTopic {
/// The topic name
pub name: String,

/// True if the topic is internal
pub is_internal: Option<bool>,

/// Each partition in the topic
pub partitions: Vec<MetadataPartition>,
}

/// Metadata for a certain partition.
#[derive(Debug, PartialEq)]
pub struct MetadataPartition {
/// The partition index
pub partition_index: i32,

/// The ID of the leader broker
pub leader_id: i32,

/// The set of all nodes that host this partition
pub replica_nodes: Vec<i32>,

/// The set of all nodes that are in sync with the leader for this partition
pub isr_nodes: Vec<i32>,
}
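To illustrate how these structs compose, a small consumer-side helper (a sketch, not part of the PR) that flags under-replicated partitions, i.e. those whose ISR is smaller than their replica set:

```rust
use rskafka::metadata::Metadata;

/// Returns (topic, partition) for every partition whose in-sync replica
/// set is smaller than its full replica set.
fn under_replicated(metadata: &Metadata) -> Vec<(&str, i32)> {
    metadata
        .topics
        .iter()
        .flat_map(|topic| {
            topic
                .partitions
                .iter()
                .filter(|p| p.isr_nodes.len() < p.replica_nodes.len())
                .map(move |p| (topic.name.as_str(), p.partition_index))
        })
        .collect()
}
```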