From 718898802cab6bea1551a1954fa503b40a39a499 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Augusto=20C=C3=A9sar?= Date: Sun, 22 Oct 2023 14:52:29 +0200 Subject: [PATCH 1/6] refactor: move session allocator to a more functional approach This also removes the need to have a wrapper struct that holds an Arc to the StringStore. --- linkup-cli/src/local_server.rs | 20 ++- linkup/src/lib.rs | 78 ++++++----- linkup/src/session_allocator.rs | 238 ++++++++++++++++---------------- worker/src/lib.rs | 31 ++--- worker/src/ws.rs | 4 +- 5 files changed, 186 insertions(+), 185 deletions(-) diff --git a/linkup-cli/src/local_server.rs b/linkup-cli/src/local_server.rs index 865729b..0d25c06 100644 --- a/linkup-cli/src/local_server.rs +++ b/linkup-cli/src/local_server.rs @@ -22,8 +22,6 @@ async fn linkup_config_handler( string_store: web::Data, req_body: web::Bytes, ) -> impl Responder { - let sessions = SessionAllocator::new(string_store.into_inner()); - let input_json_conf = match String::from_utf8(req_body.to_vec()) { Ok(input_json_conf) => input_json_conf, Err(_) => { @@ -35,9 +33,13 @@ async fn linkup_config_handler( match update_session_req_from_json(input_json_conf) { Ok((desired_name, server_conf)) => { - let session_name = sessions - .store_session(server_conf, NameKind::Animal, desired_name) - .await; + let session_name = store_session( + string_store.as_ref(), + server_conf, + NameKind::Animal, + desired_name, + ) + .await; match session_name { Ok(session_name) => HttpResponse::Ok().body(session_name), Err(e) => HttpResponse::InternalServerError() @@ -59,12 +61,10 @@ async fn linkup_ws_request_handler( req: HttpRequest, req_stream: web::Payload, ) -> impl Responder { - let sessions = SessionAllocator::new(string_store.into_inner()); - let url = format!("http://localhost:{}{}", LINKUP_LOCALSERVER_PORT, req.uri()); let mut headers = LinkupHeaderMap::from_actix_request(&req); - let session_result = sessions.get_request_session(&url, &headers).await; + let session_result = get_request_session(string_store.as_ref(), &url, &headers).await; if session_result.is_err() { println!("Failed to get session: {:?}", session_result); @@ -161,12 +161,10 @@ async fn linkup_request_handler( req: HttpRequest, req_body: web::Bytes, ) -> impl Responder { - let sessions = SessionAllocator::new(string_store.into_inner()); - let url = format!("http://localhost:{}{}", LINKUP_LOCALSERVER_PORT, req.uri()); let mut headers = LinkupHeaderMap::from_actix_request(&req); - let session_result = sessions.get_request_session(&url, &headers).await; + let session_result = get_request_session(string_store.as_ref(), &url, &headers).await; if session_result.is_err() { println!("Failed to get session: {:?}", session_result); diff --git a/linkup/src/lib.rs b/linkup/src/lib.rs index e21b436..07f4e59 100644 --- a/linkup/src/lib.rs +++ b/linkup/src/lib.rs @@ -325,36 +325,36 @@ mod tests { #[tokio::test] async fn test_get_request_session_by_subdomain() { - let sessions = SessionAllocator::new(Arc::new(MemoryStringStore::new())); + let string_store = MemoryStringStore::new(); let config_value: serde_json::Value = serde_json::from_str(CONF_STR).unwrap(); let config: Session = config_value.try_into().unwrap(); - let name = sessions - .store_session(config, NameKind::Animal, "".to_string()) + let name = store_session(&string_store, config, NameKind::Animal, "".to_string()) .await .unwrap(); // Normal subdomain - sessions - .get_request_session(&format!("{}.example.com", name), &HeaderMap::new()) - .await - .unwrap(); + get_request_session( + &string_store, + &format!("{}.example.com", name), + &HeaderMap::new(), + ) + .await + .unwrap(); // Referer let mut referer_headers = HeaderMap::new(); // TODO check header capitalization referer_headers.insert("referer", format!("http://{}.example.com", name)); - sessions - .get_request_session("example.com", &referer_headers) + get_request_session(&string_store, "example.com", &referer_headers) .await .unwrap(); // Origin let mut origin_headers = HeaderMap::new(); origin_headers.insert("origin", format!("http://{}.example.com", name)); - sessions - .get_request_session("example.com", &origin_headers) + get_request_session(&string_store, "example.com", &origin_headers) .await .unwrap(); @@ -364,15 +364,13 @@ mod tests { HeaderName::TraceState, format!("some-other=xyz,linkup-session={}", name), ); - sessions - .get_request_session("example.com", &trace_headers) + get_request_session(&string_store, "example.com", &trace_headers) .await .unwrap(); let mut trace_headers_two = HeaderMap::new(); trace_headers_two.insert(HeaderName::TraceState, format!("linkup-session={}", name)); - sessions - .get_request_session("example.com", &trace_headers_two) + get_request_session(&string_store, "example.com", &trace_headers_two) .await .unwrap(); } @@ -455,20 +453,27 @@ mod tests { #[tokio::test] async fn test_get_target_url() { - let sessions = SessionAllocator::new(Arc::new(MemoryStringStore::new())); + let string_store = MemoryStringStore::new(); let input_config_value: serde_json::Value = serde_json::from_str(CONF_STR).unwrap(); let input_config: Session = input_config_value.try_into().unwrap(); - let name = sessions - .store_session(input_config, NameKind::Animal, "".to_string()) - .await - .unwrap(); + let name = store_session( + &string_store, + input_config, + NameKind::Animal, + "".to_string(), + ) + .await + .unwrap(); - let (name, config) = sessions - .get_request_session(&format!("{}.example.com", name), &HeaderMap::new()) - .await - .unwrap(); + let (name, config) = get_request_session( + &string_store, + &format!("{}.example.com", name), + &HeaderMap::new(), + ) + .await + .unwrap(); // Standard named subdomain assert_eq!( @@ -544,20 +549,27 @@ mod tests { #[tokio::test] async fn test_repeatable_rewritten_routes() { - let sessions = SessionAllocator::new(Arc::new(MemoryStringStore::new())); + let string_store = MemoryStringStore::new(); let input_config_value: serde_json::Value = serde_json::from_str(CONF_STR).unwrap(); let input_config: Session = input_config_value.try_into().unwrap(); - let name = sessions - .store_session(input_config, NameKind::Animal, "".to_string()) - .await - .unwrap(); + let name = store_session( + &string_store, + input_config, + NameKind::Animal, + "".to_string(), + ) + .await + .unwrap(); - let (name, config) = sessions - .get_request_session(&format!("{}.example.com", name), &HeaderMap::new()) - .await - .unwrap(); + let (name, config) = get_request_session( + &string_store, + &format!("{}.example.com", name), + &HeaderMap::new(), + ) + .await + .unwrap(); // Case is, target service on the remote side is a tunnel. // If the path gets rewritten once remotely, it can throw off finding diff --git a/linkup/src/session_allocator.rs b/linkup/src/session_allocator.rs index 1cb3afa..fcec4e0 100644 --- a/linkup/src/session_allocator.rs +++ b/linkup/src/session_allocator.rs @@ -1,159 +1,153 @@ -use std::sync::Arc; - use crate::{ extract_tracestate_session, first_subdomain, headers::HeaderName, random_animal, random_six_char, session_to_json, ConfigError, HeaderMap, NameKind, Session, SessionError, StringStore, }; -pub struct SessionAllocator { - store: Arc, -} - -impl SessionAllocator { - pub fn new(store: Arc) -> Self { - Self { store } +pub async fn get_request_session( + string_store: &impl StringStore, + url: &str, + headers: &HeaderMap, +) -> Result<(String, Session), SessionError> { + let url_name = first_subdomain(url); + if let Some(config) = get_session_config(string_store, url_name.to_string()).await? { + return Ok((url_name, config)); } - pub async fn get_request_session( - &self, - url: &str, - headers: &HeaderMap, - ) -> Result<(String, Session), SessionError> { - let url_name = first_subdomain(url); - if let Some(config) = self.get_session_config(url_name.to_string()).await? { - return Ok((url_name, config)); - } - - if let Some(forwarded_host) = headers.get(HeaderName::ForwardedHost) { - let forwarded_host_name = first_subdomain(forwarded_host); - if let Some(config) = self - .get_session_config(forwarded_host_name.to_string()) - .await? - { - return Ok((forwarded_host_name, config)); - } + if let Some(forwarded_host) = headers.get(HeaderName::ForwardedHost) { + let forwarded_host_name = first_subdomain(forwarded_host); + if let Some(config) = + get_session_config(string_store, forwarded_host_name.to_string()).await? + { + return Ok((forwarded_host_name, config)); } + } - if let Some(referer) = headers.get("referer") { - let referer_name = first_subdomain(referer); - if let Some(config) = self.get_session_config(referer_name.to_string()).await? { - return Ok((referer_name, config)); - } + if let Some(referer) = headers.get("referer") { + let referer_name = first_subdomain(referer); + if let Some(config) = get_session_config(string_store, referer_name.to_string()).await? { + return Ok((referer_name, config)); } + } - if let Some(origin) = headers.get("origin") { - let origin_name = first_subdomain(origin); - if let Some(config) = self.get_session_config(origin_name.to_string()).await? { - return Ok((origin_name, config)); - } + if let Some(origin) = headers.get("origin") { + let origin_name = first_subdomain(origin); + if let Some(config) = get_session_config(string_store, origin_name.to_string()).await? { + return Ok((origin_name, config)); } + } - if let Some(tracestate) = headers.get("tracestate") { - let trace_name = extract_tracestate_session(tracestate); - if let Some(config) = self.get_session_config(trace_name.to_string()).await? { - return Ok((trace_name, config)); - } + if let Some(tracestate) = headers.get("tracestate") { + let trace_name = extract_tracestate_session(tracestate); + if let Some(config) = get_session_config(string_store, trace_name.to_string()).await? { + return Ok((trace_name, config)); } - - Err(SessionError::NoSuchSession(url.to_string())) } - pub async fn store_session( - &self, - config: Session, - name_kind: NameKind, - desired_name: String, - ) -> Result { - let name = self - .choose_name(desired_name, config.session_token.clone(), name_kind) - .await?; + Err(SessionError::NoSuchSession(url.to_string())) +} - let config_str = session_to_json(config); +async fn get_session_config( + string_store: &impl StringStore, + name: String, +) -> Result, SessionError> { + let value = match string_store.get(name).await { + Ok(Some(v)) => v, + Ok(None) => return Ok(None), + Err(e) => return Err(e), + }; - self.store.put(name.clone(), config_str).await?; + let config_value: serde_json::Value = + serde_json::from_str(&value).map_err(|e| SessionError::ConfigErr(e.to_string()))?; - Ok(name) - } + let session_config = config_value + .try_into() + .map_err(|e: ConfigError| SessionError::ConfigErr(e.to_string()))?; - async fn choose_name( - &self, - desired_name: String, - session_token: String, - name_kind: NameKind, - ) -> Result { - if desired_name.is_empty() { - return self.new_session_name(name_kind, desired_name).await; - } + Ok(Some(session_config)) +} - if let Some(session) = self.get_session_config(desired_name.clone()).await? { - if session.session_token == session_token { - return Ok(desired_name); - } - } +pub async fn store_session( + string_store: &impl StringStore, + config: Session, + name_kind: NameKind, + desired_name: String, +) -> Result { + let name = choose_name( + string_store, + desired_name, + config.session_token.clone(), + name_kind, + ) + .await?; + + let config_str = session_to_json(config); + + string_store.put(name.clone(), config_str).await?; + + Ok(name) +} - self.new_session_name(name_kind, desired_name).await +async fn choose_name( + string_store: &impl StringStore, + desired_name: String, + session_token: String, + name_kind: NameKind, +) -> Result { + if desired_name.is_empty() { + return new_session_name(string_store, name_kind, desired_name).await; } - async fn get_session_config(&self, name: String) -> Result, SessionError> { - let value = match self.store.get(name).await { - Ok(Some(v)) => v, - Ok(None) => return Ok(None), - Err(e) => return Err(e), - }; + if let Some(session) = get_session_config(string_store, desired_name.clone()).await? { + if session.session_token == session_token { + return Ok(desired_name); + } + } - let config_value: serde_json::Value = - serde_json::from_str(&value).map_err(|e| SessionError::ConfigErr(e.to_string()))?; + new_session_name(string_store, name_kind, desired_name).await +} - let session_config = config_value - .try_into() - .map_err(|e: ConfigError| SessionError::ConfigErr(e.to_string()))?; +async fn new_session_name( + string_store: &impl StringStore, + name_kind: NameKind, + desired_name: String, +) -> Result { + let mut key = String::new(); - Ok(Some(session_config)) + if !desired_name.is_empty() && !string_store.exists(desired_name.clone()).await? { + key = desired_name; } - async fn new_session_name( - &self, - name_kind: NameKind, - desired_name: String, - ) -> Result { - let mut key = String::new(); - - if !desired_name.is_empty() && !self.store.exists(desired_name.clone()).await? { - key = desired_name; - } - - if key.is_empty() { - let mut tried_animal_key = false; - loop { - let generated_key = if !tried_animal_key && name_kind == NameKind::Animal { - tried_animal_key = true; - self.generate_unique_animal_key(20).await? - } else { - random_six_char() - }; - - if !self.store.exists(generated_key.clone()).await? { - key = generated_key; - break; - } + if key.is_empty() { + let mut tried_animal_key = false; + loop { + let generated_key = if !tried_animal_key && name_kind == NameKind::Animal { + tried_animal_key = true; + generate_unique_animal_key(string_store, 20).await? + } else { + random_six_char() + }; + + if !string_store.exists(generated_key.clone()).await? { + key = generated_key; + break; } } - - Ok(key) } - async fn generate_unique_animal_key( - &self, - max_attempts: usize, - ) -> Result { - for _ in 0..max_attempts { - let generated_key = random_animal(); - if !self.store.exists(generated_key.clone()).await? { - return Ok(generated_key); - } + Ok(key) +} + +async fn generate_unique_animal_key( + string_store: &impl StringStore, + max_attempts: usize, +) -> Result { + for _ in 0..max_attempts { + let generated_key = random_animal(); + if !string_store.exists(generated_key.clone()).await? { + return Ok(generated_key); } - // Fallback to SixChar logic - Ok(random_six_char()) } + // Fallback to SixChar logic + Ok(random_six_char()) } diff --git a/worker/src/lib.rs b/worker/src/lib.rs index d7d75eb..923cdbf 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -1,13 +1,7 @@ -// TODO(augustoccesar)[2023-10-16]: We need to revisit the session allocator and how we are using Arc. -// It seems that `CfWorkerStringStore` is not really Send or Sync, which is making so that clippy warn about this. -// For more info check: https://rust-lang.github.io/rust-clippy/master/index.html#arc_with_non_send_sync -#![allow(clippy::arc_with_non_send_sync)] - use http_util::*; use kv_store::CfWorkerStringStore; use linkup::{HeaderMap as LinkupHeaderMap, *}; use regex::Regex; -use std::sync::Arc; use worker::*; use ws::linkup_ws_handler; @@ -16,7 +10,10 @@ mod kv_store; mod utils; mod ws; -async fn linkup_session_handler(mut req: Request, sessions: SessionAllocator) -> Result { +async fn linkup_session_handler( + mut req: Request, + string_store: &impl StringStore, +) -> Result { let body_bytes = match req.bytes().await { Ok(bytes) => bytes, Err(_) => return plaintext_error("Bad or missing request body", 400), @@ -29,9 +26,8 @@ async fn linkup_session_handler(mut req: Request, sessions: SessionAllocator) -> match update_session_req_from_json(input_yaml_conf) { Ok((desired_name, server_conf)) => { - let session_name = sessions - .store_session(server_conf, NameKind::Animal, desired_name) - .await; + let session_name = + store_session(string_store, server_conf, NameKind::Animal, desired_name).await; match session_name { Ok(session_name) => Response::ok(session_name), @@ -84,7 +80,10 @@ async fn set_cached_req( Ok(resp) } -async fn linkup_request_handler(mut req: Request, sessions: SessionAllocator) -> Result { +async fn linkup_request_handler( + mut req: Request, + string_store: &impl StringStore, +) -> Result { let url = match req.url() { Ok(url) => url.to_string(), Err(_) => return plaintext_error("Bad or missing request url", 400), @@ -93,7 +92,7 @@ async fn linkup_request_handler(mut req: Request, sessions: SessionAllocator) -> let mut headers = LinkupHeaderMap::from_worker_request(&req); let (session_name, config) = - match sessions.get_request_session(&url, &headers).await { + match get_request_session(string_store, &url, &headers).await { Ok(result) => result, Err(_) => return plaintext_error("Could not find a linkup session for this request. Use a linkup subdomain or context headers like Referer/tracestate", 422), }; @@ -154,17 +153,15 @@ pub async fn main(req: Request, env: Env, _ctx: worker::Context) -> Result Result { +pub async fn linkup_ws_handler(req: Request, string_store: &impl StringStore) -> Result { let url = match req.url() { Ok(url) => url.to_string(), Err(_) => return plaintext_error("Bad or missing request url", 400), @@ -17,7 +17,7 @@ pub async fn linkup_ws_handler(req: Request, sessions: SessionAllocator) -> Resu let mut headers = LinkupHeaderMap::from_worker_request(&req); let (session_name, config) = - match sessions.get_request_session(&url, &headers).await { + match get_request_session(string_store, &url, &headers).await { Ok(result) => result, Err(_) => return plaintext_error("Could not find a linkup session for this request. Use a linkup subdomain or context headers like Referer/tracestate", 422), }; From 193ffb530b6ecd1bc6a2afa71810b67776be1008 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Augusto=20C=C3=A9sar?= Date: Sun, 22 Oct 2023 15:21:13 +0200 Subject: [PATCH 2/6] refactor: move back to SessionAllocator as a Struct Will keep the same structure to avoid changing the ergonomics around the feature. --- linkup-cli/src/local_server.rs | 17 ++- linkup/src/lib.rs | 77 +++++------ linkup/src/session_allocator.rs | 236 ++++++++++++++++---------------- worker/src/lib.rs | 22 +-- worker/src/ws.rs | 7 +- 5 files changed, 178 insertions(+), 181 deletions(-) diff --git a/linkup-cli/src/local_server.rs b/linkup-cli/src/local_server.rs index 0d25c06..1de23ee 100644 --- a/linkup-cli/src/local_server.rs +++ b/linkup-cli/src/local_server.rs @@ -33,13 +33,10 @@ async fn linkup_config_handler( match update_session_req_from_json(input_json_conf) { Ok((desired_name, server_conf)) => { - let session_name = store_session( - string_store.as_ref(), - server_conf, - NameKind::Animal, - desired_name, - ) - .await; + let session_allocator = SessionAllocator::new(string_store.as_ref()); + let session_name = session_allocator + .store_session(server_conf, NameKind::Animal, desired_name) + .await; match session_name { Ok(session_name) => HttpResponse::Ok().body(session_name), Err(e) => HttpResponse::InternalServerError() @@ -64,7 +61,8 @@ async fn linkup_ws_request_handler( let url = format!("http://localhost:{}{}", LINKUP_LOCALSERVER_PORT, req.uri()); let mut headers = LinkupHeaderMap::from_actix_request(&req); - let session_result = get_request_session(string_store.as_ref(), &url, &headers).await; + let session_allocator = SessionAllocator::new(string_store.as_ref()); + let session_result = session_allocator.get_request_session(&url, &headers).await; if session_result.is_err() { println!("Failed to get session: {:?}", session_result); @@ -164,7 +162,8 @@ async fn linkup_request_handler( let url = format!("http://localhost:{}{}", LINKUP_LOCALSERVER_PORT, req.uri()); let mut headers = LinkupHeaderMap::from_actix_request(&req); - let session_result = get_request_session(string_store.as_ref(), &url, &headers).await; + let session_allocator = SessionAllocator::new(string_store.as_ref()); + let session_result = session_allocator.get_request_session(&url, &headers).await; if session_result.is_err() { println!("Failed to get session: {:?}", session_result); diff --git a/linkup/src/lib.rs b/linkup/src/lib.rs index 07f4e59..40fac61 100644 --- a/linkup/src/lib.rs +++ b/linkup/src/lib.rs @@ -271,8 +271,6 @@ fn extrace_tracestate(tracestate: &str, linkup_key: String) -> String { #[cfg(test)] mod tests { - use std::sync::Arc; - use super::*; const CONF_STR: &str = r#" @@ -326,35 +324,36 @@ mod tests { #[tokio::test] async fn test_get_request_session_by_subdomain() { let string_store = MemoryStringStore::new(); + let session_allocator = SessionAllocator::new(&string_store); let config_value: serde_json::Value = serde_json::from_str(CONF_STR).unwrap(); let config: Session = config_value.try_into().unwrap(); - let name = store_session(&string_store, config, NameKind::Animal, "".to_string()) + let name = session_allocator + .store_session(config, NameKind::Animal, "".to_string()) .await .unwrap(); // Normal subdomain - get_request_session( - &string_store, - &format!("{}.example.com", name), - &HeaderMap::new(), - ) - .await - .unwrap(); + session_allocator + .get_request_session(&format!("{}.example.com", name), &HeaderMap::new()) + .await + .unwrap(); // Referer let mut referer_headers = HeaderMap::new(); // TODO check header capitalization referer_headers.insert("referer", format!("http://{}.example.com", name)); - get_request_session(&string_store, "example.com", &referer_headers) + session_allocator + .get_request_session("example.com", &referer_headers) .await .unwrap(); // Origin let mut origin_headers = HeaderMap::new(); origin_headers.insert("origin", format!("http://{}.example.com", name)); - get_request_session(&string_store, "example.com", &origin_headers) + session_allocator + .get_request_session("example.com", &origin_headers) .await .unwrap(); @@ -364,13 +363,15 @@ mod tests { HeaderName::TraceState, format!("some-other=xyz,linkup-session={}", name), ); - get_request_session(&string_store, "example.com", &trace_headers) + session_allocator + .get_request_session("example.com", &trace_headers) .await .unwrap(); let mut trace_headers_two = HeaderMap::new(); trace_headers_two.insert(HeaderName::TraceState, format!("linkup-session={}", name)); - get_request_session(&string_store, "example.com", &trace_headers_two) + session_allocator + .get_request_session("example.com", &trace_headers_two) .await .unwrap(); } @@ -454,26 +455,20 @@ mod tests { #[tokio::test] async fn test_get_target_url() { let string_store = MemoryStringStore::new(); + let session_allocator = SessionAllocator::new(&string_store); let input_config_value: serde_json::Value = serde_json::from_str(CONF_STR).unwrap(); let input_config: Session = input_config_value.try_into().unwrap(); - let name = store_session( - &string_store, - input_config, - NameKind::Animal, - "".to_string(), - ) - .await - .unwrap(); + let name = session_allocator + .store_session(input_config, NameKind::Animal, "".to_string()) + .await + .unwrap(); - let (name, config) = get_request_session( - &string_store, - &format!("{}.example.com", name), - &HeaderMap::new(), - ) - .await - .unwrap(); + let (name, config) = session_allocator + .get_request_session(&format!("{}.example.com", name), &HeaderMap::new()) + .await + .unwrap(); // Standard named subdomain assert_eq!( @@ -550,26 +545,20 @@ mod tests { #[tokio::test] async fn test_repeatable_rewritten_routes() { let string_store = MemoryStringStore::new(); + let session_allocator = SessionAllocator::new(&string_store); let input_config_value: serde_json::Value = serde_json::from_str(CONF_STR).unwrap(); let input_config: Session = input_config_value.try_into().unwrap(); - let name = store_session( - &string_store, - input_config, - NameKind::Animal, - "".to_string(), - ) - .await - .unwrap(); + let name = session_allocator + .store_session(input_config, NameKind::Animal, "".to_string()) + .await + .unwrap(); - let (name, config) = get_request_session( - &string_store, - &format!("{}.example.com", name), - &HeaderMap::new(), - ) - .await - .unwrap(); + let (name, config) = session_allocator + .get_request_session(&format!("{}.example.com", name), &HeaderMap::new()) + .await + .unwrap(); // Case is, target service on the remote side is a tunnel. // If the path gets rewritten once remotely, it can throw off finding diff --git a/linkup/src/session_allocator.rs b/linkup/src/session_allocator.rs index fcec4e0..17e2ece 100644 --- a/linkup/src/session_allocator.rs +++ b/linkup/src/session_allocator.rs @@ -4,150 +4,154 @@ use crate::{ StringStore, }; -pub async fn get_request_session( - string_store: &impl StringStore, - url: &str, - headers: &HeaderMap, -) -> Result<(String, Session), SessionError> { - let url_name = first_subdomain(url); - if let Some(config) = get_session_config(string_store, url_name.to_string()).await? { - return Ok((url_name, config)); +pub struct SessionAllocator<'a> { + store: &'a dyn StringStore, +} + +impl<'a> SessionAllocator<'a> { + pub fn new(store: &'a dyn StringStore) -> Self { + Self { store } } - if let Some(forwarded_host) = headers.get(HeaderName::ForwardedHost) { - let forwarded_host_name = first_subdomain(forwarded_host); - if let Some(config) = - get_session_config(string_store, forwarded_host_name.to_string()).await? - { - return Ok((forwarded_host_name, config)); + pub async fn get_request_session( + &self, + url: &str, + headers: &HeaderMap, + ) -> Result<(String, Session), SessionError> { + let url_name = first_subdomain(url); + if let Some(config) = self.get_session_config(url_name.to_string()).await? { + return Ok((url_name, config)); } - } - if let Some(referer) = headers.get("referer") { - let referer_name = first_subdomain(referer); - if let Some(config) = get_session_config(string_store, referer_name.to_string()).await? { - return Ok((referer_name, config)); + if let Some(forwarded_host) = headers.get(HeaderName::ForwardedHost) { + let forwarded_host_name = first_subdomain(forwarded_host); + if let Some(config) = self + .get_session_config(forwarded_host_name.to_string()) + .await? + { + return Ok((forwarded_host_name, config)); + } } - } - if let Some(origin) = headers.get("origin") { - let origin_name = first_subdomain(origin); - if let Some(config) = get_session_config(string_store, origin_name.to_string()).await? { - return Ok((origin_name, config)); + if let Some(referer) = headers.get("referer") { + let referer_name = first_subdomain(referer); + if let Some(config) = self.get_session_config(referer_name.to_string()).await? { + return Ok((referer_name, config)); + } } - } - if let Some(tracestate) = headers.get("tracestate") { - let trace_name = extract_tracestate_session(tracestate); - if let Some(config) = get_session_config(string_store, trace_name.to_string()).await? { - return Ok((trace_name, config)); + if let Some(origin) = headers.get("origin") { + let origin_name = first_subdomain(origin); + if let Some(config) = self.get_session_config(origin_name.to_string()).await? { + return Ok((origin_name, config)); + } } + + if let Some(tracestate) = headers.get("tracestate") { + let trace_name = extract_tracestate_session(tracestate); + if let Some(config) = self.get_session_config(trace_name.to_string()).await? { + return Ok((trace_name, config)); + } + } + + Err(SessionError::NoSuchSession(url.to_string())) } - Err(SessionError::NoSuchSession(url.to_string())) -} + async fn get_session_config(&self, name: String) -> Result, SessionError> { + let value = match self.store.get(name).await { + Ok(Some(v)) => v, + Ok(None) => return Ok(None), + Err(e) => return Err(e), + }; -async fn get_session_config( - string_store: &impl StringStore, - name: String, -) -> Result, SessionError> { - let value = match string_store.get(name).await { - Ok(Some(v)) => v, - Ok(None) => return Ok(None), - Err(e) => return Err(e), - }; + let config_value: serde_json::Value = + serde_json::from_str(&value).map_err(|e| SessionError::ConfigErr(e.to_string()))?; - let config_value: serde_json::Value = - serde_json::from_str(&value).map_err(|e| SessionError::ConfigErr(e.to_string()))?; + let session_config = config_value + .try_into() + .map_err(|e: ConfigError| SessionError::ConfigErr(e.to_string()))?; - let session_config = config_value - .try_into() - .map_err(|e: ConfigError| SessionError::ConfigErr(e.to_string()))?; + Ok(Some(session_config)) + } - Ok(Some(session_config)) -} + pub async fn store_session( + &self, + config: Session, + name_kind: NameKind, + desired_name: String, + ) -> Result { + let name = self + .choose_name(desired_name, config.session_token.clone(), name_kind) + .await?; -pub async fn store_session( - string_store: &impl StringStore, - config: Session, - name_kind: NameKind, - desired_name: String, -) -> Result { - let name = choose_name( - string_store, - desired_name, - config.session_token.clone(), - name_kind, - ) - .await?; - - let config_str = session_to_json(config); - - string_store.put(name.clone(), config_str).await?; - - Ok(name) -} + let config_str = session_to_json(config); -async fn choose_name( - string_store: &impl StringStore, - desired_name: String, - session_token: String, - name_kind: NameKind, -) -> Result { - if desired_name.is_empty() { - return new_session_name(string_store, name_kind, desired_name).await; - } + self.store.put(name.clone(), config_str).await?; - if let Some(session) = get_session_config(string_store, desired_name.clone()).await? { - if session.session_token == session_token { - return Ok(desired_name); - } + Ok(name) } - new_session_name(string_store, name_kind, desired_name).await -} + async fn choose_name( + &self, + desired_name: String, + session_token: String, + name_kind: NameKind, + ) -> Result { + if desired_name.is_empty() { + return self.new_session_name(name_kind, desired_name).await; + } -async fn new_session_name( - string_store: &impl StringStore, - name_kind: NameKind, - desired_name: String, -) -> Result { - let mut key = String::new(); + if let Some(session) = self.get_session_config(desired_name.clone()).await? { + if session.session_token == session_token { + return Ok(desired_name); + } + } - if !desired_name.is_empty() && !string_store.exists(desired_name.clone()).await? { - key = desired_name; + self.new_session_name(name_kind, desired_name).await } - if key.is_empty() { - let mut tried_animal_key = false; - loop { - let generated_key = if !tried_animal_key && name_kind == NameKind::Animal { - tried_animal_key = true; - generate_unique_animal_key(string_store, 20).await? - } else { - random_six_char() - }; - - if !string_store.exists(generated_key.clone()).await? { - key = generated_key; - break; + async fn new_session_name( + &self, + name_kind: NameKind, + desired_name: String, + ) -> Result { + let mut key = String::new(); + + if !desired_name.is_empty() && !self.store.exists(desired_name.clone()).await? { + key = desired_name; + } + + if key.is_empty() { + let mut tried_animal_key = false; + loop { + let generated_key = if !tried_animal_key && name_kind == NameKind::Animal { + tried_animal_key = true; + self.generate_unique_animal_key(20).await? + } else { + random_six_char() + }; + + if !self.store.exists(generated_key.clone()).await? { + key = generated_key; + break; + } } } - } - Ok(key) -} + Ok(key) + } -async fn generate_unique_animal_key( - string_store: &impl StringStore, - max_attempts: usize, -) -> Result { - for _ in 0..max_attempts { - let generated_key = random_animal(); - if !string_store.exists(generated_key.clone()).await? { - return Ok(generated_key); + async fn generate_unique_animal_key( + &self, + max_attempts: usize, + ) -> Result { + for _ in 0..max_attempts { + let generated_key = random_animal(); + if !self.store.exists(generated_key.clone()).await? { + return Ok(generated_key); + } } + // Fallback to SixChar logic + Ok(random_six_char()) } - // Fallback to SixChar logic - Ok(random_six_char()) } diff --git a/worker/src/lib.rs b/worker/src/lib.rs index 923cdbf..f82727b 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -10,9 +10,9 @@ mod kv_store; mod utils; mod ws; -async fn linkup_session_handler( +async fn linkup_session_handler<'a>( mut req: Request, - string_store: &impl StringStore, + session_allocator: &'a SessionAllocator<'a>, ) -> Result { let body_bytes = match req.bytes().await { Ok(bytes) => bytes, @@ -26,8 +26,9 @@ async fn linkup_session_handler( match update_session_req_from_json(input_yaml_conf) { Ok((desired_name, server_conf)) => { - let session_name = - store_session(string_store, server_conf, NameKind::Animal, desired_name).await; + let session_name = session_allocator + .store_session(server_conf, NameKind::Animal, desired_name) + .await; match session_name { Ok(session_name) => Response::ok(session_name), @@ -80,9 +81,9 @@ async fn set_cached_req( Ok(resp) } -async fn linkup_request_handler( +async fn linkup_request_handler<'a>( mut req: Request, - string_store: &impl StringStore, + session_allocator: &'a SessionAllocator<'a>, ) -> Result { let url = match req.url() { Ok(url) => url.to_string(), @@ -92,7 +93,7 @@ async fn linkup_request_handler( let mut headers = LinkupHeaderMap::from_worker_request(&req); let (session_name, config) = - match get_request_session(string_store, &url, &headers).await { + match session_allocator.get_request_session(&url, &headers).await { Ok(result) => result, Err(_) => return plaintext_error("Could not find a linkup session for this request. Use a linkup subdomain or context headers like Referer/tracestate", 422), }; @@ -152,16 +153,17 @@ pub async fn main(req: Request, env: Env, _ctx: worker::Context) -> Result Result { +pub async fn linkup_ws_handler<'a>( + req: Request, + allocator: &SessionAllocator<'a>, +) -> Result { let url = match req.url() { Ok(url) => url.to_string(), Err(_) => return plaintext_error("Bad or missing request url", 400), @@ -17,7 +20,7 @@ pub async fn linkup_ws_handler(req: Request, string_store: &impl StringStore) -> let mut headers = LinkupHeaderMap::from_worker_request(&req); let (session_name, config) = - match get_request_session(string_store, &url, &headers).await { + match allocator.get_request_session(&url, &headers).await { Ok(result) => result, Err(_) => return plaintext_error("Could not find a linkup session for this request. Use a linkup subdomain or context headers like Referer/tracestate", 422), }; From d190d99183d8f7f611c3fa0803fe71fbbee0bd53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Augusto=20C=C3=A9sar?= Date: Sun, 22 Oct 2023 15:28:30 +0200 Subject: [PATCH 3/6] refactor: remove unnecessary Arc on SessionAllocator --- linkup-cli/src/local_server.rs | 15 +++++++++------ linkup/src/lib.rs | 6 +++--- worker/src/lib.rs | 17 +++++++++-------- worker/src/ws.rs | 4 ++-- 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/linkup-cli/src/local_server.rs b/linkup-cli/src/local_server.rs index 1de23ee..e1d4739 100644 --- a/linkup-cli/src/local_server.rs +++ b/linkup-cli/src/local_server.rs @@ -22,6 +22,8 @@ async fn linkup_config_handler( string_store: web::Data, req_body: web::Bytes, ) -> impl Responder { + let sessions = SessionAllocator::new(string_store.as_ref()); + let input_json_conf = match String::from_utf8(req_body.to_vec()) { Ok(input_json_conf) => input_json_conf, Err(_) => { @@ -33,8 +35,7 @@ async fn linkup_config_handler( match update_session_req_from_json(input_json_conf) { Ok((desired_name, server_conf)) => { - let session_allocator = SessionAllocator::new(string_store.as_ref()); - let session_name = session_allocator + let session_name = sessions .store_session(server_conf, NameKind::Animal, desired_name) .await; match session_name { @@ -58,11 +59,12 @@ async fn linkup_ws_request_handler( req: HttpRequest, req_stream: web::Payload, ) -> impl Responder { + let sessions = SessionAllocator::new(string_store.as_ref()); + let url = format!("http://localhost:{}{}", LINKUP_LOCALSERVER_PORT, req.uri()); let mut headers = LinkupHeaderMap::from_actix_request(&req); - let session_allocator = SessionAllocator::new(string_store.as_ref()); - let session_result = session_allocator.get_request_session(&url, &headers).await; + let session_result = sessions.get_request_session(&url, &headers).await; if session_result.is_err() { println!("Failed to get session: {:?}", session_result); @@ -159,11 +161,12 @@ async fn linkup_request_handler( req: HttpRequest, req_body: web::Bytes, ) -> impl Responder { + let sessions = SessionAllocator::new(string_store.as_ref()); + let url = format!("http://localhost:{}{}", LINKUP_LOCALSERVER_PORT, req.uri()); let mut headers = LinkupHeaderMap::from_actix_request(&req); - let session_allocator = SessionAllocator::new(string_store.as_ref()); - let session_result = session_allocator.get_request_session(&url, &headers).await; + let session_result = sessions.get_request_session(&url, &headers).await; if session_result.is_err() { println!("Failed to get session: {:?}", session_result); diff --git a/linkup/src/lib.rs b/linkup/src/lib.rs index 40fac61..3e51b74 100644 --- a/linkup/src/lib.rs +++ b/linkup/src/lib.rs @@ -324,7 +324,7 @@ mod tests { #[tokio::test] async fn test_get_request_session_by_subdomain() { let string_store = MemoryStringStore::new(); - let session_allocator = SessionAllocator::new(&string_store); + let sessions = SessionAllocator::new(&string_store); let config_value: serde_json::Value = serde_json::from_str(CONF_STR).unwrap(); let config: Session = config_value.try_into().unwrap(); @@ -455,7 +455,7 @@ mod tests { #[tokio::test] async fn test_get_target_url() { let string_store = MemoryStringStore::new(); - let session_allocator = SessionAllocator::new(&string_store); + let sessions = SessionAllocator::new(&string_store); let input_config_value: serde_json::Value = serde_json::from_str(CONF_STR).unwrap(); let input_config: Session = input_config_value.try_into().unwrap(); @@ -545,7 +545,7 @@ mod tests { #[tokio::test] async fn test_repeatable_rewritten_routes() { let string_store = MemoryStringStore::new(); - let session_allocator = SessionAllocator::new(&string_store); + let sessions = SessionAllocator::new(&string_store); let input_config_value: serde_json::Value = serde_json::from_str(CONF_STR).unwrap(); let input_config: Session = input_config_value.try_into().unwrap(); diff --git a/worker/src/lib.rs b/worker/src/lib.rs index f82727b..cea6b75 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -12,7 +12,7 @@ mod ws; async fn linkup_session_handler<'a>( mut req: Request, - session_allocator: &'a SessionAllocator<'a>, + sessions: &'a SessionAllocator<'a>, ) -> Result { let body_bytes = match req.bytes().await { Ok(bytes) => bytes, @@ -26,7 +26,7 @@ async fn linkup_session_handler<'a>( match update_session_req_from_json(input_yaml_conf) { Ok((desired_name, server_conf)) => { - let session_name = session_allocator + let session_name = sessions .store_session(server_conf, NameKind::Animal, desired_name) .await; @@ -83,7 +83,7 @@ async fn set_cached_req( async fn linkup_request_handler<'a>( mut req: Request, - session_allocator: &'a SessionAllocator<'a>, + sessions: &'a SessionAllocator<'a>, ) -> Result { let url = match req.url() { Ok(url) => url.to_string(), @@ -93,7 +93,7 @@ async fn linkup_request_handler<'a>( let mut headers = LinkupHeaderMap::from_worker_request(&req); let (session_name, config) = - match session_allocator.get_request_session(&url, &headers).await { + match sessions.get_request_session(&url, &headers).await { Ok(result) => result, Err(_) => return plaintext_error("Could not find a linkup session for this request. Use a linkup subdomain or context headers like Referer/tracestate", 422), }; @@ -153,17 +153,18 @@ pub async fn main(req: Request, env: Env, _ctx: worker::Context) -> Result( req: Request, - allocator: &SessionAllocator<'a>, + sessions: &'a SessionAllocator<'a>, ) -> Result { let url = match req.url() { Ok(url) => url.to_string(), @@ -20,7 +20,7 @@ pub async fn linkup_ws_handler<'a>( let mut headers = LinkupHeaderMap::from_worker_request(&req); let (session_name, config) = - match allocator.get_request_session(&url, &headers).await { + match sessions.get_request_session(&url, &headers).await { Ok(result) => result, Err(_) => return plaintext_error("Could not find a linkup session for this request. Use a linkup subdomain or context headers like Referer/tracestate", 422), }; From 9cea70e757d399cf6eb5f46246c37c494ae92b90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Augusto=20C=C3=A9sar?= Date: Sun, 22 Oct 2023 15:30:57 +0200 Subject: [PATCH 4/6] fix: undo unnecessary method move --- linkup/src/session_allocator.rs | 34 ++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/linkup/src/session_allocator.rs b/linkup/src/session_allocator.rs index 17e2ece..4609beb 100644 --- a/linkup/src/session_allocator.rs +++ b/linkup/src/session_allocator.rs @@ -57,23 +57,6 @@ impl<'a> SessionAllocator<'a> { Err(SessionError::NoSuchSession(url.to_string())) } - async fn get_session_config(&self, name: String) -> Result, SessionError> { - let value = match self.store.get(name).await { - Ok(Some(v)) => v, - Ok(None) => return Ok(None), - Err(e) => return Err(e), - }; - - let config_value: serde_json::Value = - serde_json::from_str(&value).map_err(|e| SessionError::ConfigErr(e.to_string()))?; - - let session_config = config_value - .try_into() - .map_err(|e: ConfigError| SessionError::ConfigErr(e.to_string()))?; - - Ok(Some(session_config)) - } - pub async fn store_session( &self, config: Session, @@ -110,6 +93,23 @@ impl<'a> SessionAllocator<'a> { self.new_session_name(name_kind, desired_name).await } + async fn get_session_config(&self, name: String) -> Result, SessionError> { + let value = match self.store.get(name).await { + Ok(Some(v)) => v, + Ok(None) => return Ok(None), + Err(e) => return Err(e), + }; + + let config_value: serde_json::Value = + serde_json::from_str(&value).map_err(|e| SessionError::ConfigErr(e.to_string()))?; + + let session_config = config_value + .try_into() + .map_err(|e: ConfigError| SessionError::ConfigErr(e.to_string()))?; + + Ok(Some(session_config)) + } + async fn new_session_name( &self, name_kind: NameKind, From d2b47d96ce03cf0cb9cb60ec07fed6036b65aefc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Augusto=20C=C3=A9sar?= Date: Sun, 22 Oct 2023 15:31:50 +0200 Subject: [PATCH 5/6] fix: correct variable name --- linkup/src/lib.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/linkup/src/lib.rs b/linkup/src/lib.rs index 3e51b74..781ecea 100644 --- a/linkup/src/lib.rs +++ b/linkup/src/lib.rs @@ -329,13 +329,13 @@ mod tests { let config_value: serde_json::Value = serde_json::from_str(CONF_STR).unwrap(); let config: Session = config_value.try_into().unwrap(); - let name = session_allocator + let name = sessions .store_session(config, NameKind::Animal, "".to_string()) .await .unwrap(); // Normal subdomain - session_allocator + sessions .get_request_session(&format!("{}.example.com", name), &HeaderMap::new()) .await .unwrap(); @@ -344,7 +344,7 @@ mod tests { let mut referer_headers = HeaderMap::new(); // TODO check header capitalization referer_headers.insert("referer", format!("http://{}.example.com", name)); - session_allocator + sessions .get_request_session("example.com", &referer_headers) .await .unwrap(); @@ -352,7 +352,7 @@ mod tests { // Origin let mut origin_headers = HeaderMap::new(); origin_headers.insert("origin", format!("http://{}.example.com", name)); - session_allocator + sessions .get_request_session("example.com", &origin_headers) .await .unwrap(); @@ -363,14 +363,14 @@ mod tests { HeaderName::TraceState, format!("some-other=xyz,linkup-session={}", name), ); - session_allocator + sessions .get_request_session("example.com", &trace_headers) .await .unwrap(); let mut trace_headers_two = HeaderMap::new(); trace_headers_two.insert(HeaderName::TraceState, format!("linkup-session={}", name)); - session_allocator + sessions .get_request_session("example.com", &trace_headers_two) .await .unwrap(); @@ -460,12 +460,12 @@ mod tests { let input_config_value: serde_json::Value = serde_json::from_str(CONF_STR).unwrap(); let input_config: Session = input_config_value.try_into().unwrap(); - let name = session_allocator + let name = sessions .store_session(input_config, NameKind::Animal, "".to_string()) .await .unwrap(); - let (name, config) = session_allocator + let (name, config) = sessions .get_request_session(&format!("{}.example.com", name), &HeaderMap::new()) .await .unwrap(); @@ -550,12 +550,12 @@ mod tests { let input_config_value: serde_json::Value = serde_json::from_str(CONF_STR).unwrap(); let input_config: Session = input_config_value.try_into().unwrap(); - let name = session_allocator + let name = sessions .store_session(input_config, NameKind::Animal, "".to_string()) .await .unwrap(); - let (name, config) = session_allocator + let (name, config) = sessions .get_request_session(&format!("{}.example.com", name), &HeaderMap::new()) .await .unwrap(); From f0cf5fab004df99910fc54ceca7d88c3670ff8ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Augusto=20C=C3=A9sar?= Date: Sun, 22 Oct 2023 15:39:05 +0200 Subject: [PATCH 6/6] refactor: move session allocator to the branch where is used --- linkup-cli/src/local_server.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/linkup-cli/src/local_server.rs b/linkup-cli/src/local_server.rs index e1d4739..d162632 100644 --- a/linkup-cli/src/local_server.rs +++ b/linkup-cli/src/local_server.rs @@ -22,8 +22,6 @@ async fn linkup_config_handler( string_store: web::Data, req_body: web::Bytes, ) -> impl Responder { - let sessions = SessionAllocator::new(string_store.as_ref()); - let input_json_conf = match String::from_utf8(req_body.to_vec()) { Ok(input_json_conf) => input_json_conf, Err(_) => { @@ -35,6 +33,7 @@ async fn linkup_config_handler( match update_session_req_from_json(input_json_conf) { Ok((desired_name, server_conf)) => { + let sessions = SessionAllocator::new(string_store.as_ref()); let session_name = sessions .store_session(server_conf, NameKind::Animal, desired_name) .await;