Skip to content

Commit

Permalink
feat:根据 namespace 导入导出
Browse files Browse the repository at this point in the history
  • Loading branch information
bestK committed Jun 19, 2024
1 parent 7378ad1 commit 714e224
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 18 deletions.
2 changes: 2 additions & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub mod utils;
pub struct ConfigUtils;

pub const DEFAULT_TENANT: &str = "public";
pub const __INNER_SYSTEM__TENANT: &str = "__INNER_SYSTEM__";
pub const MANIFEST: &str = "manifest";

impl ConfigUtils {
pub fn default_tenant(val: String) -> String {
Expand Down
164 changes: 146 additions & 18 deletions src/console/config_api.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,45 @@
#![allow(unused_imports)]

use std::default;
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::sync::Arc;

use actix::prelude::Addr;
use actix_multipart::form::MultipartForm;
use actix_multipart::form::tempfile::TempFile;
use actix_multipart::form::text::Text;
use actix_multipart::form::MultipartForm;
use actix_multipart::Multipart;
use actix_web::{http::header, web, Error, HttpRequest, HttpResponse, Responder};
use actix_web::{Error, http::header, HttpRequest, HttpResponse, Responder, web};
use tempfile::NamedTempFile;
use zip::write::FileOptions;
use zip::{ZipArchive, ZipWriter};

use crate::common::appdata::AppShareData;
use crate::common::model::ApiResult;
use crate::config::{__INNER_SYSTEM__TENANT, ConfigUtils, DEFAULT_TENANT, MANIFEST};
use crate::config::core::{
ConfigActor, ConfigAsyncCmd, ConfigCmd, ConfigInfoDto, ConfigKey, ConfigResult,
ConfigActor, ConfigCmd, ConfigInfoDto, ConfigKey, ConfigResult,
};
use crate::config::ConfigUtils;
use crate::console::model::config_model::{
OpsConfigOptQueryListResponse, OpsConfigQueryListRequest,
};
use crate::console::{DEFAULT_NAMESPACE_INFO, NamespaceUtils};
use crate::now_millis;
use crate::raft::cluster::model::SetConfigReq;
use actix::prelude::Addr;
use tokio_stream::StreamExt;
use uuid::Uuid;
use zip::{ZipArchive, ZipWriter};

use super::model::config_model::OpsConfigImportInfo;
use super::model::PageResult;
use super::model::{NamespaceInfo, PageResult};

pub async fn query_config_list(
request: web::Query<OpsConfigQueryListRequest>,
config_addr: web::Data<Addr<ConfigActor>>,
) -> impl Responder {
let cmd = ConfigCmd::QueryPageInfo(Box::new(request.0.to_param().unwrap()));
let mut param = request.0.to_param().unwrap();
if let Some(tenant) = param.tenant.as_ref() {
if tenant.as_str() == "all" {
param.tenant = None;
}
}
let cmd = ConfigCmd::QueryPageInfo(Box::from(param));
match config_addr.send(cmd).await {
Ok(res) => {
let r: ConfigResult = res.unwrap();
Expand Down Expand Up @@ -107,7 +113,23 @@ pub async fn import_config(
},
));
//let tenant = Arc::new(ConfigUtils::default_tenant(config_info.0.tenant.unwrap_or_default()));

let mut folder_depth = 2;
let mut import_by_tenant = false;
let mut data_id_index = 1;
let mut group_index = 0;
let mut namespaces: Vec<NamespaceInfo> = vec![];

for f in form.files {
let namespace_str = get_contains_file_content(f.file.as_file(), MANIFEST);
if !namespace_str.is_empty() {
folder_depth = 3;
import_by_tenant = true;
data_id_index = 2;
group_index = 1;
namespaces = serde_json::from_str(&*namespace_str)?;
}

match zip::ZipArchive::new(f.file) {
Ok(mut archive) => {
for i in 0..archive.len() {
Expand All @@ -121,15 +143,37 @@ pub async fn import_config(
let filename = file.name();
if !(*filename).ends_with('/') {
let parts = filename.split('/').collect::<Vec<_>>();
if parts.len() != 2 {

if parts.len() != folder_depth {
continue;
}
assert!(parts.len() == 2);
let config_key = ConfigKey::new_by_arc(
Arc::new(parts[1].to_owned()),
Arc::new(parts[0].to_owned()),

assert_eq!(parts.len(), folder_depth);

if MANIFEST.eq(filename) {
continue;
}


let mut config_key = ConfigKey::new_by_arc(
Arc::new(parts[data_id_index].to_owned()),
Arc::new(parts[group_index].to_owned()),
tenant.clone(),
);

if import_by_tenant {
config_key.tenant = if DEFAULT_TENANT.eq(&parts[0].to_owned()) {
Arc::from(String::new())
} else {
Arc::new(parts[0].to_owned())
};
let namespace_info = get_namespace_by_id(config_key.tenant.to_string(), namespaces.clone());
match NamespaceUtils::add_namespace(&app, namespace_info.clone()).await {
Ok(_) => log::info!("add namespace {:?} success", namespace_info.clone().namespace_name),
Err(e) => log::error!("add namespace {:?} error: {}", namespace_info.clone().namespace_name, e.to_string()),
};
}

let value = match io::read_to_string(&mut file) {
Ok(v) => v,
Err(_) => continue,
Expand Down Expand Up @@ -174,6 +218,77 @@ fn zip_file(mut zip: ZipWriter<&mut File>, list: Vec<ConfigInfoDto>) -> anyhow::
Ok(())
}

async fn zip_file_for_tenant(mut zip: ZipWriter<&mut File>, list: Vec<ConfigInfoDto>, config_addr: web::Data<Addr<ConfigActor>>) -> anyhow::Result<()> {
let options = FileOptions::default()
.compression_method(zip::CompressionMethod::Stored)
.unix_permissions(0o755);

if list.is_empty() {
zip.start_file(".ignore", options)?;
zip.write_all("empty config".as_bytes())?;
}


for item in &list {
if __INNER_SYSTEM__TENANT.eq(&(*item.tenant.as_str())) {
continue;
}
let tenant = if item.tenant.as_str().is_empty() {
DEFAULT_TENANT
} else {
item.tenant.as_str()
};

zip.add_directory(&format!("{}/{}", tenant, &item.group.as_str()), Default::default())
.ok();
zip.start_file(
&format!("{}/{}/{}", tenant, &item.group.as_str(), &item.data_id.as_str()),
options,
)?;
zip.write_all(item.content.as_ref().unwrap().as_bytes())?;
}

let namespaces = NamespaceUtils::get_namespaces(&config_addr).await;
let v = serde_json::to_string(&namespaces).unwrap();

zip.start_file("manifest", options)?;
zip.write_all(v.as_bytes())?;

zip.finish()?;
Ok(())
}

fn get_contains_file_content<R: Read + Seek>(reader: R, filename: &str) -> String {
let mut archive = ZipArchive::new(reader).expect("Failed to read ZIP archive");

for i in 0..archive.len() {
let mut file = archive.by_index(i).expect("Failed to access file in ZIP archive");
if file.name() == filename {
let mut content = String::new();
file.read_to_string(&mut content).expect("Failed to read file content");
return content;
}
}

String::new()
}

fn get_namespace_by_id(namespace_id: String, namespaces: Vec<NamespaceInfo>) -> NamespaceInfo {
if namespace_id.is_empty() {
let mut default = NamespaceInfo::default();
default.namespace_name = Some(DEFAULT_TENANT.to_owned());
default.r#type = Some("0".parse().unwrap());
return default;
}

for namespace in namespaces {
if namespace.clone().namespace_id.unwrap_or_default().eq(&namespace_id) {
return namespace.clone();
}
}
NamespaceInfo::default()
}

///
/// 按查询条件导出配置
pub async fn download_config(
Expand All @@ -183,6 +298,13 @@ pub async fn download_config(
let mut param = request.0.to_param().unwrap();
param.limit = 0xffff_ffff;
param.query_context = true;
let mut download_all = false;
if let Some(tenant) = param.tenant.as_ref() {
if tenant.as_str() == "all" {
param.tenant = None;
download_all = true
}
}
let cmd = ConfigCmd::QueryPageInfo(Box::new(param));
match config_addr.send(cmd).await {
Ok(res) => {
Expand All @@ -193,7 +315,13 @@ pub async fn download_config(
{
let write = std::io::Write::by_ref(&mut tmpfile);
let zip = ZipWriter::new(write);
zip_file(zip, list).ok();
let result = if download_all {
zip_file_for_tenant(zip, list, config_addr).await
} else {
zip_file(zip, list)
};

result.ok();
}
// Seek to start
tmpfile.seek(SeekFrom::Start(0)).unwrap();
Expand Down

0 comments on commit 714e224

Please sign in to comment.