From bf0dbaf343023e2ec6a303f3fe1f6baca6c1e67c Mon Sep 17 00:00:00 2001 From: Dervex Date: Thu, 11 Apr 2024 22:48:46 +0200 Subject: [PATCH] Add `write` API endpoint --- src/core/changes.rs | 4 ++-- src/core/meta.rs | 14 +++++++++++++- src/core/mod.rs | 8 ++++++-- src/core/processor.rs | 17 ++++++++++++++--- src/core/snapshot.rs | 8 ++++---- src/server/mod.rs | 2 ++ src/server/write.rs | 28 ++++++++++++++++++++++++++++ 7 files changed, 69 insertions(+), 12 deletions(-) create mode 100644 src/server/write.rs diff --git a/src/core/changes.rs b/src/core/changes.rs index 6bbd8f1..2335507 100644 --- a/src/core/changes.rs +++ b/src/core/changes.rs @@ -1,9 +1,9 @@ use rbx_dom_weak::types::Ref; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use super::snapshot::{AddedSnapshot, Snapshot, UpdatedSnapshot}; -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Changes { pub additions: Vec, pub updates: Vec, diff --git a/src/core/meta.rs b/src/core/meta.rs index c85b064..1cbef57 100644 --- a/src/core/meta.rs +++ b/src/core/meta.rs @@ -117,6 +117,12 @@ impl Source { } } +impl Default for Source { + fn default() -> Self { + Self::new() + } +} + #[derive(Debug, Clone)] pub struct ResolvedSyncRule { pub file_type: FileType, @@ -278,7 +284,13 @@ impl Context { } } -#[derive(Debug, Clone, PartialEq, Serialize)] +impl Default for Context { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Meta { /// Instance source that is guaranteed to exist diff --git a/src/core/mod.rs b/src/core/mod.rs index 5750256..c338ea3 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -28,7 +28,7 @@ pub struct Core { project: Arc>, tree: Arc>, queue: Arc, - _processor: Arc, + processor: Arc, _vfs: Arc, } @@ -68,7 +68,7 @@ impl Core { project, tree, queue, - _processor: processor, + processor, _vfs: vfs, }) } @@ -97,6 +97,10 @@ impl Core { self.queue.clone() } + pub fn processor(&self) -> Arc { + self.processor.clone() + } + /// Create snapshot of the tree pub fn snapshot(&self) -> Snapshot { let tree = lock!(self.tree); diff --git a/src/core/processor.rs b/src/core/processor.rs index affa1e5..0605f83 100644 --- a/src/core/processor.rs +++ b/src/core/processor.rs @@ -1,4 +1,4 @@ -use crossbeam_channel::select; +use crossbeam_channel::{select, Sender}; use log::{debug, error, info, trace, warn}; use rbx_dom_weak::types::Ref; use std::{ @@ -22,7 +22,9 @@ use crate::{ BLACKLISTED_PATHS, }; -pub struct Processor {} +pub struct Processor { + writer: Sender, +} impl Processor { pub fn new(queue: Arc, tree: Arc>, vfs: Arc, project: Arc>) -> Self { @@ -34,23 +36,32 @@ impl Processor { }); let handler = handler.clone(); + let (sender, receiver) = crossbeam_channel::unbounded(); Builder::new() .name("processor".to_owned()) .spawn(move || { let vfs_receiver = vfs.receiver(); + let client_receiver = receiver; loop { select! { recv(vfs_receiver) -> event => { handler.on_vfs_event(event.unwrap()); } + recv(client_receiver) -> changes => { + println!("{:#?}", changes); + } } } }) .unwrap(); - Self {} + Self { writer: sender } + } + + pub fn write(&self, changes: Changes) { + self.writer.send(changes).unwrap(); } } diff --git a/src/core/snapshot.rs b/src/core/snapshot.rs index cc08c2d..2d141bb 100644 --- a/src/core/snapshot.rs +++ b/src/core/snapshot.rs @@ -2,7 +2,7 @@ use rbx_dom_weak::{ types::{Ref, Variant}, Instance, WeakDom, }; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, fmt::{self, Debug, Formatter}, @@ -12,7 +12,7 @@ use crate::middleware::data::DataSnapshot; use super::meta::Meta; -#[derive(Clone, Serialize)] +#[derive(Clone, Serialize, Deserialize)] pub struct Snapshot { pub id: Ref, pub meta: Meta, @@ -191,7 +191,7 @@ impl Debug for Snapshot { } } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct AddedSnapshot { pub id: Ref, pub meta: Meta, @@ -202,7 +202,7 @@ pub struct AddedSnapshot { pub children: Vec, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct UpdatedSnapshot { pub id: Ref, pub meta: Option, diff --git a/src/server/mod.rs b/src/server/mod.rs index ff04742..b906116 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -15,6 +15,7 @@ mod snapshot; mod stop; mod subscribe; mod unsubscribe; +mod write; async fn default_redirect() -> impl Responder { web::Redirect::to("/") @@ -48,6 +49,7 @@ impl Server { .service(home::main) .service(stop::main) .service(read::main) + .service(write::main) .service(snapshot::main) .service(exec::main) .service(open::main) diff --git a/src/server/write.rs b/src/server/write.rs new file mode 100644 index 0000000..5495d93 --- /dev/null +++ b/src/server/write.rs @@ -0,0 +1,28 @@ +use actix_msgpack::MsgPack; +use actix_web::{post, web::Data, HttpResponse, Responder}; +use serde::Deserialize; +use std::sync::Arc; + +use crate::core::{changes::Changes, Core}; + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +struct Request { + client_id: u32, + changes: Changes, +} + +#[post("/write")] +async fn main(request: MsgPack, core: Data>) -> impl Responder { + println!("{:#?}", 1); + let id = request.client_id; + let queue = core.queue(); + + if !queue.is_subscribed(id) { + return HttpResponse::Unauthorized().body("Not subscribed"); + } + + core.processor().write(request.changes.clone()); + + HttpResponse::Ok().body("Written changes successfully") +}