From 33886a5b31587ae1173cd31093753152827ce2df Mon Sep 17 00:00:00 2001 From: Stefan Danaita Date: Thu, 31 Oct 2024 15:48:46 +0000 Subject: [PATCH] [OSS-118] Support for pagination (#53) --- src/api/connection.rs | 37 ----------- src/api/exchange.rs | 27 ++++++-- src/api/mod.rs | 4 +- src/api/pagination.rs | 66 ++++++++++++++++++ src/api/queue.rs | 11 ++- tests/all/exchanges.rs | 144 ++++++++++++++++++++++++++++++++++++++- tests/all/queues.rs | 148 ++++++++++++++++++++++++++++++++++++++++- 7 files changed, 389 insertions(+), 48 deletions(-) delete mode 100644 src/api/connection.rs create mode 100644 src/api/pagination.rs diff --git a/src/api/connection.rs b/src/api/connection.rs deleted file mode 100644 index 2a37990..0000000 --- a/src/api/connection.rs +++ /dev/null @@ -1,37 +0,0 @@ -use crate::api::_generic::handle_response; -use crate::errors::RabbitMqClientError; -use crate::RabbitMqClient; -use async_trait::async_trait; -use serde::Deserialize; - -#[async_trait] -pub trait ConnectionApi { - async fn list_connections( - &self, - vhost: Option, - ) -> Result, RabbitMqClientError>; -} - -#[async_trait] -impl ConnectionApi for RabbitMqClient { - async fn list_connections( - &self, - vhost: Option, - ) -> Result, RabbitMqClientError> { - let endpoint = match vhost { - None => format!("{}/api/connections", self.api_url), - Some(vhost) => format!("{}/api/vhosts/{}/connections", self.api_url, vhost), - }; - - let response = self - .client - .request(reqwest::Method::GET, &endpoint) - .send() - .await?; - - handle_response(response).await - } -} - -#[derive(Debug, Deserialize)] -pub struct RabbitMqConnection {} diff --git a/src/api/exchange.rs b/src/api/exchange.rs index eedd97d..b79e774 100644 --- a/src/api/exchange.rs +++ b/src/api/exchange.rs @@ -1,5 +1,7 @@ use crate::api::_generic::{handle_empty_response, handle_response}; use crate::api::binding::RabbitMqBinding; +use crate::api::pagination::RabbitMqPaginationRequest; +use crate::api::{RabbitMqPaginatedResponse, RabbitMqPagination, RabbitMqPaginationFilter}; use crate::errors::RabbitMqClientError; use crate::RabbitMqClient; use async_trait::async_trait; @@ -10,7 +12,8 @@ pub trait ExchangeApi { async fn list_exchanges( &self, vhost: Option, - ) -> Result, RabbitMqClientError>; + pagination: Option, + ) -> Result, RabbitMqClientError>; async fn get_exchange( &self, @@ -56,7 +59,10 @@ impl ExchangeApi for RabbitMqClient { async fn list_exchanges( &self, vhost: Option, - ) -> Result, RabbitMqClientError> { + pagination: Option, + ) -> Result, RabbitMqClientError> { + let pagination: RabbitMqPaginationRequest = pagination.unwrap_or_default().into(); + let response = self .client .request( @@ -67,6 +73,7 @@ impl ExchangeApi for RabbitMqClient { vhost.unwrap_or_default() ), ) + .query(&pagination) .send() .await?; @@ -96,8 +103,20 @@ impl ExchangeApi for RabbitMqClient { exchange: String, request: RabbitMqExchangeRequest, ) -> Result<(), RabbitMqClientError> { - let exchanges = self.list_exchanges(Some(vhost.clone())).await?; - if let Some(existing) = exchanges.iter().find(|e| e.name == exchange) { + let exchanges = self + .list_exchanges( + Some(vhost.clone()), + Some(RabbitMqPagination { + page: 1, + page_size: None, + filter: Some(RabbitMqPaginationFilter::RegexFilter(format!( + "({exchange}$)" + ))), + }), + ) + .await?; + + if let Some(existing) = exchanges.items.iter().find(|e| e.name == exchange) { return Err(RabbitMqClientError::AlreadyExists(format!( "{} exchange", existing.name diff --git a/src/api/mod.rs b/src/api/mod.rs index ed8b33d..7b3ec1f 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,12 +1,14 @@ mod _generic; pub mod binding; -pub mod connection; pub mod exchange; pub mod message; pub mod node; pub mod overview; +mod pagination; pub mod permission; pub mod policy; pub mod queue; pub mod user; pub mod vhost; + +pub use pagination::{RabbitMqPaginatedResponse, RabbitMqPagination, RabbitMqPaginationFilter}; diff --git a/src/api/pagination.rs b/src/api/pagination.rs new file mode 100644 index 0000000..947b012 --- /dev/null +++ b/src/api/pagination.rs @@ -0,0 +1,66 @@ +use serde::{Deserialize, Serialize}; + +const PAGINATION_PAGE_SIZE: u32 = 50; + +#[derive(Debug, Clone)] +pub struct RabbitMqPagination { + pub page: u32, + pub page_size: Option, + pub filter: Option, +} + +#[derive(Debug, Clone)] +pub enum RabbitMqPaginationFilter { + StringFilter(String), + RegexFilter(String), +} + +#[derive(Debug, Clone, Serialize)] +pub struct RabbitMqPaginationRequest { + pub page: u32, + pub page_size: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub use_regex: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct RabbitMqPaginatedResponse { + pub filtered_count: u32, + pub item_count: u32, + pub items: Vec, + pub page: u32, + pub page_count: u32, + pub page_size: u32, + pub total_count: u32, +} + +impl Default for RabbitMqPagination { + fn default() -> Self { + Self { + page: 1, + page_size: None, + filter: None, + } + } +} + +impl From for RabbitMqPaginationRequest { + fn from(value: RabbitMqPagination) -> Self { + let (name, use_regex) = match value.filter { + None => (None, None), + Some(f) => match f { + RabbitMqPaginationFilter::StringFilter(s) => (Some(s), Some(false)), + RabbitMqPaginationFilter::RegexFilter(r) => (Some(r), Some(true)), + }, + }; + + Self { + page: value.page, + page_size: value.page_size.unwrap_or(PAGINATION_PAGE_SIZE), + name, + use_regex, + } + } +} diff --git a/src/api/queue.rs b/src/api/queue.rs index bd11f31..da44607 100644 --- a/src/api/queue.rs +++ b/src/api/queue.rs @@ -1,5 +1,7 @@ use crate::api::_generic::{handle_empty_response, handle_response}; use crate::api::binding::RabbitMqBinding; +use crate::api::pagination::RabbitMqPaginationRequest; +use crate::api::{RabbitMqPaginatedResponse, RabbitMqPagination}; use crate::errors::RabbitMqClientError; use crate::RabbitMqClient; use async_trait::async_trait; @@ -12,7 +14,8 @@ pub trait QueueApi { async fn list_queues( &self, vhost: Option, - ) -> Result, RabbitMqClientError>; + pagination: Option, + ) -> Result, RabbitMqClientError>; async fn get_queue( &self, @@ -58,13 +61,17 @@ impl QueueApi for RabbitMqClient { async fn list_queues( &self, vhost: Option, - ) -> Result, RabbitMqClientError> { + pagination: Option, + ) -> Result, RabbitMqClientError> { + let pagination: RabbitMqPaginationRequest = pagination.unwrap_or_default().into(); + let response = self .client .request( reqwest::Method::GET, format!("{}/api/queues/{}", self.api_url, vhost.unwrap_or_default()), ) + .query(&pagination) .send() .await?; diff --git a/tests/all/exchanges.rs b/tests/all/exchanges.rs index d464c90..c693c0b 100644 --- a/tests/all/exchanges.rs +++ b/tests/all/exchanges.rs @@ -1,5 +1,6 @@ use crate::context::TestContext; use rabbitmq_management_client::api::exchange::{ExchangeApi, RabbitMqExchangeRequest}; +use rabbitmq_management_client::api::{RabbitMqPagination, RabbitMqPaginationFilter}; use rabbitmq_management_client::errors::RabbitMqClientError; #[tokio::test] @@ -8,11 +9,150 @@ async fn can_list_exchanges() { let exchanges = ctx .rabbitmq - .list_exchanges(None) + .list_exchanges(None, None) .await .expect("failed to list exchanges"); - assert!(!exchanges.is_empty()); + assert!(!exchanges.items.is_empty()); +} + +#[tokio::test] +async fn can_list_exchanges_paginated() { + let ctx = TestContext::new(); + + let vhost = ctx + .create_random_vhost() + .await + .expect("failed to create vhost"); + + for i in 0..20 { + ctx.rabbitmq + .create_exchange( + vhost.name.clone(), + format!("test-exchange_{i}"), + RabbitMqExchangeRequest { + kind: "direct".to_string(), + auto_delete: true, + durable: false, + internal: false, + }, + ) + .await + .expect("failed to create exchange"); + } + + let exchanges = ctx + .rabbitmq + .list_exchanges( + Some(vhost.name.clone()), + Some(RabbitMqPagination { + page: 1, + page_size: Some(5), + filter: None, + }), + ) + .await + .expect("failed to list exchanges"); + + assert_eq!(exchanges.items.len(), 5); + + ctx.delete_vhost(vhost.name) + .await + .expect("failed to delete vhost"); +} + +#[tokio::test] +async fn can_filter_exchanges() { + let ctx = TestContext::new(); + + let vhost = ctx + .create_random_vhost() + .await + .expect("failed to create vhost"); + + for i in 0..5 { + ctx.rabbitmq + .create_exchange( + vhost.name.clone(), + format!("test-exchange_{i}"), + RabbitMqExchangeRequest { + kind: "direct".to_string(), + auto_delete: true, + durable: false, + internal: false, + }, + ) + .await + .expect("failed to create exchange"); + } + + let exchanges = ctx + .rabbitmq + .list_exchanges( + Some(vhost.name.clone()), + Some(RabbitMqPagination { + page: 1, + page_size: None, + filter: Some(RabbitMqPaginationFilter::StringFilter( + "test-exchange_3".to_string(), + )), + }), + ) + .await + .expect("failed to list exchanges"); + + assert_eq!(exchanges.items.len(), 1); + + ctx.delete_vhost(vhost.name) + .await + .expect("failed to delete vhost"); +} + +#[tokio::test] +async fn can_regex_filter_exchanges() { + let ctx = TestContext::new(); + + let vhost = ctx + .create_random_vhost() + .await + .expect("failed to create vhost"); + + for i in 0..5 { + ctx.rabbitmq + .create_exchange( + vhost.name.clone(), + format!("test-exchange_{i}"), + RabbitMqExchangeRequest { + kind: "direct".to_string(), + auto_delete: true, + durable: false, + internal: false, + }, + ) + .await + .expect("failed to create exchange"); + } + + let exchanges = ctx + .rabbitmq + .list_exchanges( + Some(vhost.name.clone()), + Some(RabbitMqPagination { + page: 1, + page_size: None, + filter: Some(RabbitMqPaginationFilter::RegexFilter( + "(test-exchange_3$|test-exchange_0$)".to_string(), + )), + }), + ) + .await + .expect("failed to list exchanges"); + + assert_eq!(exchanges.items.len(), 2); + + ctx.delete_vhost(vhost.name) + .await + .expect("failed to delete vhost"); } #[tokio::test] diff --git a/tests/all/queues.rs b/tests/all/queues.rs index 6e75492..b0cd205 100644 --- a/tests/all/queues.rs +++ b/tests/all/queues.rs @@ -9,6 +9,7 @@ use rabbitmq_management_client::api::message::{ RabbitMqPublishMessageRequest, }; use rabbitmq_management_client::api::queue::{QueueApi, RabbitMqQueueAction, RabbitMqQueueRequest}; +use rabbitmq_management_client::api::{RabbitMqPagination, RabbitMqPaginationFilter}; use rabbitmq_management_client::errors::RabbitMqClientError; use std::collections::HashMap; @@ -52,11 +53,154 @@ async fn can_list_queues() { let queues = ctx .rabbitmq - .list_queues(Some(vhost.name.clone())) + .list_queues(Some(vhost.name.clone()), None) .await .expect("failed to list queues"); - assert_eq!(queues.len(), 2); + assert_eq!(queues.items.len(), 2); + + ctx.delete_vhost(vhost.name) + .await + .expect("failed to delete vhost"); +} + +#[tokio::test] +async fn can_list_queues_paginated() { + let ctx = TestContext::new(); + + let vhost = ctx + .create_random_vhost() + .await + .expect("failed to create vhost"); + + // Create a couple of queues + for i in 0..25 { + ctx.rabbitmq + .create_queue( + vhost.name.clone(), + format!("test-pagination_{}", i), + RabbitMqQueueRequest { + auto_delete: false, + durable: false, + arguments: None, + node: None, + }, + ) + .await + .expect("failed to create queue"); + } + + let queues = ctx + .rabbitmq + .list_queues( + Some(vhost.name.clone()), + Some(RabbitMqPagination { + page: 1, + page_size: Some(5), + filter: None, + }), + ) + .await + .expect("failed to list queues"); + + assert_eq!(queues.items.len(), 5); + + ctx.delete_vhost(vhost.name) + .await + .expect("failed to delete vhost"); +} + +#[tokio::test] +async fn can_filter_queues() { + let ctx = TestContext::new(); + + let vhost = ctx + .create_random_vhost() + .await + .expect("failed to create vhost"); + + // Create a couple of queues + for i in 0..5 { + ctx.rabbitmq + .create_queue( + vhost.name.clone(), + format!("test-pagination_{}", i), + RabbitMqQueueRequest { + auto_delete: false, + durable: false, + arguments: None, + node: None, + }, + ) + .await + .expect("failed to create queue"); + } + + let queues = ctx + .rabbitmq + .list_queues( + Some(vhost.name.clone()), + Some(RabbitMqPagination { + page: 1, + page_size: None, + filter: Some(RabbitMqPaginationFilter::StringFilter( + "test-pagination_3".to_string(), + )), + }), + ) + .await + .expect("failed to list queues"); + + assert_eq!(queues.items.len(), 1); + assert_eq!(queues.items[0].name, "test-pagination_3"); + + ctx.delete_vhost(vhost.name) + .await + .expect("failed to delete vhost"); +} + +#[tokio::test] +async fn can_regex_filter_queues() { + let ctx = TestContext::new(); + + let vhost = ctx + .create_random_vhost() + .await + .expect("failed to create vhost"); + + // Create a couple of queues + for i in 0..5 { + ctx.rabbitmq + .create_queue( + vhost.name.clone(), + format!("test-pagination_{}", i), + RabbitMqQueueRequest { + auto_delete: false, + durable: false, + arguments: None, + node: None, + }, + ) + .await + .expect("failed to create queue"); + } + + let queues = ctx + .rabbitmq + .list_queues( + Some(vhost.name.clone()), + Some(RabbitMqPagination { + page: 1, + page_size: None, + filter: Some(RabbitMqPaginationFilter::RegexFilter( + "(test-pagination_3|test-pagination_0)".to_string(), + )), + }), + ) + .await + .expect("failed to list queues"); + + assert_eq!(queues.items.len(), 2); ctx.delete_vhost(vhost.name) .await