Skip to content

Commit

Permalink
[OSS-118] Support for pagination (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
stefandanaita authored Oct 31, 2024
1 parent a3d496a commit 33886a5
Show file tree
Hide file tree
Showing 7 changed files with 389 additions and 48 deletions.
37 changes: 0 additions & 37 deletions src/api/connection.rs

This file was deleted.

27 changes: 23 additions & 4 deletions src/api/exchange.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::api::_generic::{handle_empty_response, handle_response};
use crate::api::binding::RabbitMqBinding;
use crate::api::pagination::RabbitMqPaginationRequest;
use crate::api::{RabbitMqPaginatedResponse, RabbitMqPagination, RabbitMqPaginationFilter};
use crate::errors::RabbitMqClientError;
use crate::RabbitMqClient;
use async_trait::async_trait;
Expand All @@ -10,7 +12,8 @@ pub trait ExchangeApi {
async fn list_exchanges(
&self,
vhost: Option<String>,
) -> Result<Vec<RabbitMqExchange>, RabbitMqClientError>;
pagination: Option<RabbitMqPagination>,
) -> Result<RabbitMqPaginatedResponse<RabbitMqExchange>, RabbitMqClientError>;

async fn get_exchange(
&self,
Expand Down Expand Up @@ -56,7 +59,10 @@ impl ExchangeApi for RabbitMqClient {
async fn list_exchanges(
&self,
vhost: Option<String>,
) -> Result<Vec<RabbitMqExchange>, RabbitMqClientError> {
pagination: Option<RabbitMqPagination>,
) -> Result<RabbitMqPaginatedResponse<RabbitMqExchange>, RabbitMqClientError> {
let pagination: RabbitMqPaginationRequest = pagination.unwrap_or_default().into();

let response = self
.client
.request(
Expand All @@ -67,6 +73,7 @@ impl ExchangeApi for RabbitMqClient {
vhost.unwrap_or_default()
),
)
.query(&pagination)
.send()
.await?;

Expand Down Expand Up @@ -96,8 +103,20 @@ impl ExchangeApi for RabbitMqClient {
exchange: String,
request: RabbitMqExchangeRequest,
) -> Result<(), RabbitMqClientError> {
let exchanges = self.list_exchanges(Some(vhost.clone())).await?;
if let Some(existing) = exchanges.iter().find(|e| e.name == exchange) {
let exchanges = self
.list_exchanges(
Some(vhost.clone()),
Some(RabbitMqPagination {
page: 1,
page_size: None,
filter: Some(RabbitMqPaginationFilter::RegexFilter(format!(
"({exchange}$)"
))),
}),
)
.await?;

if let Some(existing) = exchanges.items.iter().find(|e| e.name == exchange) {
return Err(RabbitMqClientError::AlreadyExists(format!(
"{} exchange",
existing.name
Expand Down
4 changes: 3 additions & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
mod _generic;
pub mod binding;
pub mod connection;
pub mod exchange;
pub mod message;
pub mod node;
pub mod overview;
mod pagination;
pub mod permission;
pub mod policy;
pub mod queue;
pub mod user;
pub mod vhost;

pub use pagination::{RabbitMqPaginatedResponse, RabbitMqPagination, RabbitMqPaginationFilter};
66 changes: 66 additions & 0 deletions src/api/pagination.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use serde::{Deserialize, Serialize};

const PAGINATION_PAGE_SIZE: u32 = 50;

#[derive(Debug, Clone)]
pub struct RabbitMqPagination {
pub page: u32,
pub page_size: Option<u32>,
pub filter: Option<RabbitMqPaginationFilter>,
}

#[derive(Debug, Clone)]
pub enum RabbitMqPaginationFilter {
StringFilter(String),
RegexFilter(String),
}

#[derive(Debug, Clone, Serialize)]
pub struct RabbitMqPaginationRequest {
pub page: u32,
pub page_size: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub use_regex: Option<bool>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct RabbitMqPaginatedResponse<T> {
pub filtered_count: u32,
pub item_count: u32,
pub items: Vec<T>,
pub page: u32,
pub page_count: u32,
pub page_size: u32,
pub total_count: u32,
}

impl Default for RabbitMqPagination {
fn default() -> Self {
Self {
page: 1,
page_size: None,
filter: None,
}
}
}

impl From<RabbitMqPagination> for RabbitMqPaginationRequest {
fn from(value: RabbitMqPagination) -> Self {
let (name, use_regex) = match value.filter {
None => (None, None),
Some(f) => match f {
RabbitMqPaginationFilter::StringFilter(s) => (Some(s), Some(false)),
RabbitMqPaginationFilter::RegexFilter(r) => (Some(r), Some(true)),
},
};

Self {
page: value.page,
page_size: value.page_size.unwrap_or(PAGINATION_PAGE_SIZE),
name,
use_regex,
}
}
}
11 changes: 9 additions & 2 deletions src/api/queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::api::_generic::{handle_empty_response, handle_response};
use crate::api::binding::RabbitMqBinding;
use crate::api::pagination::RabbitMqPaginationRequest;
use crate::api::{RabbitMqPaginatedResponse, RabbitMqPagination};
use crate::errors::RabbitMqClientError;
use crate::RabbitMqClient;
use async_trait::async_trait;
Expand All @@ -12,7 +14,8 @@ pub trait QueueApi {
async fn list_queues(
&self,
vhost: Option<String>,
) -> Result<Vec<RabbitMqQueue>, RabbitMqClientError>;
pagination: Option<RabbitMqPagination>,
) -> Result<RabbitMqPaginatedResponse<RabbitMqQueue>, RabbitMqClientError>;

async fn get_queue(
&self,
Expand Down Expand Up @@ -58,13 +61,17 @@ impl QueueApi for RabbitMqClient {
async fn list_queues(
&self,
vhost: Option<String>,
) -> Result<Vec<RabbitMqQueue>, RabbitMqClientError> {
pagination: Option<RabbitMqPagination>,
) -> Result<RabbitMqPaginatedResponse<RabbitMqQueue>, RabbitMqClientError> {
let pagination: RabbitMqPaginationRequest = pagination.unwrap_or_default().into();

let response = self
.client
.request(
reqwest::Method::GET,
format!("{}/api/queues/{}", self.api_url, vhost.unwrap_or_default()),
)
.query(&pagination)
.send()
.await?;

Expand Down
144 changes: 142 additions & 2 deletions tests/all/exchanges.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::context::TestContext;
use rabbitmq_management_client::api::exchange::{ExchangeApi, RabbitMqExchangeRequest};
use rabbitmq_management_client::api::{RabbitMqPagination, RabbitMqPaginationFilter};
use rabbitmq_management_client::errors::RabbitMqClientError;

#[tokio::test]
Expand All @@ -8,11 +9,150 @@ async fn can_list_exchanges() {

let exchanges = ctx
.rabbitmq
.list_exchanges(None)
.list_exchanges(None, None)
.await
.expect("failed to list exchanges");

assert!(!exchanges.is_empty());
assert!(!exchanges.items.is_empty());
}

#[tokio::test]
async fn can_list_exchanges_paginated() {
let ctx = TestContext::new();

let vhost = ctx
.create_random_vhost()
.await
.expect("failed to create vhost");

for i in 0..20 {
ctx.rabbitmq
.create_exchange(
vhost.name.clone(),
format!("test-exchange_{i}"),
RabbitMqExchangeRequest {
kind: "direct".to_string(),
auto_delete: true,
durable: false,
internal: false,
},
)
.await
.expect("failed to create exchange");
}

let exchanges = ctx
.rabbitmq
.list_exchanges(
Some(vhost.name.clone()),
Some(RabbitMqPagination {
page: 1,
page_size: Some(5),
filter: None,
}),
)
.await
.expect("failed to list exchanges");

assert_eq!(exchanges.items.len(), 5);

ctx.delete_vhost(vhost.name)
.await
.expect("failed to delete vhost");
}

#[tokio::test]
async fn can_filter_exchanges() {
let ctx = TestContext::new();

let vhost = ctx
.create_random_vhost()
.await
.expect("failed to create vhost");

for i in 0..5 {
ctx.rabbitmq
.create_exchange(
vhost.name.clone(),
format!("test-exchange_{i}"),
RabbitMqExchangeRequest {
kind: "direct".to_string(),
auto_delete: true,
durable: false,
internal: false,
},
)
.await
.expect("failed to create exchange");
}

let exchanges = ctx
.rabbitmq
.list_exchanges(
Some(vhost.name.clone()),
Some(RabbitMqPagination {
page: 1,
page_size: None,
filter: Some(RabbitMqPaginationFilter::StringFilter(
"test-exchange_3".to_string(),
)),
}),
)
.await
.expect("failed to list exchanges");

assert_eq!(exchanges.items.len(), 1);

ctx.delete_vhost(vhost.name)
.await
.expect("failed to delete vhost");
}

#[tokio::test]
async fn can_regex_filter_exchanges() {
let ctx = TestContext::new();

let vhost = ctx
.create_random_vhost()
.await
.expect("failed to create vhost");

for i in 0..5 {
ctx.rabbitmq
.create_exchange(
vhost.name.clone(),
format!("test-exchange_{i}"),
RabbitMqExchangeRequest {
kind: "direct".to_string(),
auto_delete: true,
durable: false,
internal: false,
},
)
.await
.expect("failed to create exchange");
}

let exchanges = ctx
.rabbitmq
.list_exchanges(
Some(vhost.name.clone()),
Some(RabbitMqPagination {
page: 1,
page_size: None,
filter: Some(RabbitMqPaginationFilter::RegexFilter(
"(test-exchange_3$|test-exchange_0$)".to_string(),
)),
}),
)
.await
.expect("failed to list exchanges");

assert_eq!(exchanges.items.len(), 2);

ctx.delete_vhost(vhost.name)
.await
.expect("failed to delete vhost");
}

#[tokio::test]
Expand Down
Loading

0 comments on commit 33886a5

Please sign in to comment.