From ffdc27aa784bdf54c2af28e0cd2af540140046e4 Mon Sep 17 00:00:00 2001 From: fpagliughi Date: Sat, 27 Mar 2021 15:45:15 -0400 Subject: [PATCH] Added initial implementation of ThreadedActor. Bumped version to 0.1.1 --- Cargo.toml | 4 +- examples/shared_keyval.rs | 4 +- examples/threaded_keyval.rs | 71 +++++++++++++++++++ examples/unique_id.rs | 2 +- src/actor.rs | 136 ++++++++++++++++++++++++++++++++++++ src/lib.rs | 127 ++------------------------------- src/threaded_actor.rs | 130 ++++++++++++++++++++++++++++++++++ 7 files changed, 348 insertions(+), 126 deletions(-) create mode 100644 examples/threaded_keyval.rs create mode 100644 src/actor.rs create mode 100644 src/threaded_actor.rs diff --git a/Cargo.toml b/Cargo.toml index 0172e73..e033872 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cooper" -version = "0.1.0" +version = "0.1.1" authors = ["Frank Pagliughi "] repository = "https://github.com/fpagliughi/cooper-rs" homepage = "https://github.com/fpagliughi/cooper-rs" @@ -16,5 +16,5 @@ A simple, in-process, async Actor library for Rust. [dependencies] futures = "0.3" smol = "1.2" - +crossbeam-channel = "0.5" diff --git a/examples/shared_keyval.rs b/examples/shared_keyval.rs index b0b5a1a..0c6e236 100644 --- a/examples/shared_keyval.rs +++ b/examples/shared_keyval.rs @@ -38,7 +38,7 @@ impl SharedMap { let key = key.into(); let val = val.into(); - self.actor.cast(|state: &mut State| Box::pin(async move { + self.actor.cast(|state| Box::pin(async move { state.insert(key, val); })).await } @@ -52,7 +52,7 @@ impl SharedMap { { let key = key.into(); - self.actor.call(|state: &mut State| Box::pin(async move { + self.actor.call(|state| Box::pin(async move { state.get(&key).map(|v| v.to_string()) })).await } diff --git a/examples/threaded_keyval.rs b/examples/threaded_keyval.rs new file mode 100644 index 0000000..b7f4052 --- /dev/null +++ b/examples/threaded_keyval.rs @@ -0,0 +1,71 @@ +// cooper-rs/examples/threaded_keyval.rs +// +// This is an example app for the Rust "cooper" library. +// +// Copyright (c) 2021, Frank Pagliughi +// All Rights Reserved +// +// Licensed under the MIT license: +// +// This file may not be copied, modified, or distributed except according +// to those terms. +// + +use std::collections::HashMap; +use cooper::ThreadedActor; + +/// The internal state type for the Actor +type State = HashMap; + +/// An actor that can act as a shared key/value store of strings. +#[derive(Clone)] +pub struct SharedMap { + actor: ThreadedActor, +} + +impl SharedMap { + /// Create a new actor to share a key/value map of string. + pub fn new() -> Self { + Self { actor: ThreadedActor::new() } + } + + /// Insert a value into the shared map. + pub fn insert(&self, key: K, val: V) + where + K: Into, + V: Into, + { + let key = key.into(); + let val = val.into(); + + self.actor.cast(move |state| { + state.insert(key, val); + }); + } + + + /// Gets the value, if any, from the shared map that is + /// associated with the key. + pub fn get>(&self, key: K) -> Option { + let key = key.into(); + self.actor.call(move |state| { + state.get(&key).map(|v| v.to_string()) + }) + } +} + +// -------------------------------------------------------------------------- + +fn main() { + let map = SharedMap::new(); + + println!("Inserting entry 'city'..."); + map.insert("city", "Boston"); + + println!("Retrieving entry..."); + match map.get("city") { + Some(s) => println!("Got: {}", s), + None => println!("Error: No entry found"), + } +} + diff --git a/examples/unique_id.rs b/examples/unique_id.rs index 11e2999..3b8d2ae 100644 --- a/examples/unique_id.rs +++ b/examples/unique_id.rs @@ -27,7 +27,7 @@ impl UniqueId { /// Gets a unique ID as the next integer value in the sequence. pub async fn get_unique_id(&self) -> u32 { - self.actor.call(|state: &mut u32| Box::pin(async move { + self.actor.call(|state| Box::pin(async move { *state += 1; *state })).await diff --git a/src/actor.rs b/src/actor.rs new file mode 100644 index 0000000..6849d21 --- /dev/null +++ b/src/actor.rs @@ -0,0 +1,136 @@ +// cooper/src/actor.rs +// +// Copyright (c) 2021, Frank Pagliughi +// All Rights Reserved +// +// Licensed under the MIT license: +// +// This file may not be copied, modified, or distributed except according +// to those terms. +// +//! cooper + +use std::fmt::Debug; +use futures::future::BoxFuture; +use smol::{ + channel::{ + self, + Sender, + Receiver, + }, +}; + +/// Message type for the Actor. +/// This wraps an async function type that takes a mutable reference to a +/// state object. Implementations of actor objects can queue functions and +/// closures to process the state. +/// `S` is the internal state type for the actor to manage +struct Message { + func: Box FnOnce(&'a mut S) -> BoxFuture<'a, ()> + Send>, +} + +/// The Actor. +/// +/// This is an async command processor that serializes requests around an +/// internal state. Each request runs to completion, atomically, in the +/// order received, and thus tasks do not need to lock or protect the state +/// for access. +#[derive(Clone)] +pub struct Actor +where + S: Send + 'static +{ + /// The channel to send requests to the actor's processor task. + tx: Sender>, +} + +impl Actor +where + S: Default + Send + 'static +{ + /// Create a new actor with a default state + pub fn new() -> Self { + Self::from_state(S::default()) + } +} + +impl Actor +where + S: Send + 'static +{ + /// Creates a new actor from an initial state + pub fn from_state(state: S) -> Self { + let (tx, rx) = channel::unbounded(); + + // TODO: Stash the handle somewhere? + // Perhaps make a registry of running actors? + smol::spawn(async move { + Self::run(state, rx).await + }).detach(); + + Self { tx } + } + + /// The actor's command processor. + /// + /// This runs each request for the actor to completion before + /// running the next one. + async fn run(mut state: S, rx: Receiver>) { + while let Ok(msg) = rx.recv().await { + (msg.func)(&mut state).await; + } + } + + /// This is a totally asynchronous opertion. Awaiting the returned + /// future only waits for the operation to be placed in the queue. + /// It does not wait for the operation to be executed. + pub async fn cast(&self, f: F) + where + F: for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, ()>, + F: 'static + Send, + { + let msg = Message { + func: Box::new(move |state| Box::pin(async move { + f(state).await; + })) + }; + + // TODO: Should we at least log the error? + let _ = self.tx.send(msg).await; + } + + /// A call is a synchronous opertion within the async task. + /// It will queue the request, wait for it to execute, and + /// return the result. + pub async fn call(&self, f: F) -> R + where + F: for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, R>, + F: 'static + Send, + R: 'static + Send + Debug, + { + let (tx, rx) = channel::bounded(1); + let msg = Message { + func: Box::new(move |state| Box::pin(async move { + let res = f(state).await; + let _ = tx.send(res).await; + })) + }; + + let _ = self.tx.send(msg).await; + // TODO: Return an error instead of panicking + rx.recv().await + .expect("Actor is gone") + } + + /// Blocks the calling task until all requests up to this point have + /// been processed. + /// + /// Note that if there are clones of the actor, additional requests + /// may get queued after this one, so the queue is not guaranteed to be + /// empty when this returns; just that all the requests prior to this one + /// have completed. + pub async fn flush(&self) { + self.call(|_| Box::pin(async move {})).await + } +} + diff --git a/src/lib.rs b/src/lib.rs index 6d2b8f4..974b276 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,7 @@ // cooper/src/lib/rs // +// This file is part of the `cooper-rs` library. +// // Copyright (c) 2021, Frank Pagliughi // All Rights Reserved // @@ -10,126 +12,9 @@ // //! cooper -use std::fmt::Debug; -use futures::future::BoxFuture; -use smol::{ - channel::{ - self, - Sender, - Receiver, - }, -}; - -/// Message type for the Actor. -/// This wraps an async function type that takes a mutable reference to a -/// state object. Implementations of actor objects can queue functions and -/// closures to process the state. -/// `S` is the internal state type for the actor to manage -struct Message { - func: Box FnOnce(&'a mut S) -> BoxFuture<'a, ()> + Send>, -} - -/// The Actor. -/// -/// This is an async command processor that serializes requests around an -/// internal state. Each request runs to completion, atomically, in the -/// order received, and thus tasks do not need to lock or protect the state -/// for access. -#[derive(Clone)] -pub struct Actor -where - S: Send + 'static -{ - /// The channel to send requests to the actor's processor task. - tx: Sender>, -} - -impl Actor -where - S: Default + Send + 'static -{ - /// Create a new actor with a default state - pub fn new() -> Self { - Self::from_state(S::default()) - } -} - -impl Actor -where - S: Send + 'static -{ - /// Creates a new actor from an initial state - pub fn from_state(state: S) -> Self { - let (tx, rx) = channel::unbounded(); - - // TODO: Stash the handle somewhere? - // Perhaps make a registry of running actors? - smol::spawn(async move { - Self::run(state, rx).await - }).detach(); - - Self { tx } - } - - /// The actor's command processor. - /// This runs each request for the actor to completion before - /// running the next one. - async fn run(mut state: S, rx: Receiver>) { - while let Ok(msg) = rx.recv().await { - (msg.func)(&mut state).await; - } - } - - /// This is a totally asynchronous opertion. Awaiting the returned - /// future only waits for the operation to be placed in the queue. - /// It does not wait for the operation to be executed. - pub async fn cast(&self, f: F) - where - F: for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, ()>, - F: 'static + Send, - { - let msg = Message { - func: Box::new(move |state| Box::pin(async move { - f(state).await; - })) - }; - - // TODO: Should we at least log the error? - let _ = self.tx.send(msg).await; - } - - /// A call is a synchronous opertion within the async task. - /// It will queue the request, wait for it to execute, and - /// return the result. - pub async fn call(&self, f: F) -> R - where - F: for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, R>, - F: 'static + Send, - R: 'static + Send + Debug, - { - let (tx, rx) = channel::bounded(1); - let msg = Message { - func: Box::new(move |state| Box::pin(async move { - let res = f(state).await; - let _ = tx.send(res).await; - })) - }; - - let _ = self.tx.send(msg).await; - // TODO: Return an error instead of panicking - rx.recv().await - .expect("Actor is gone") - } +mod actor; +mod threaded_actor; - /// Blocks the calling task until all requests up to this point have - /// been processed. - /// - /// Note that if there are clones of the actor, additional requests - /// may get queued after this one, so the queue is not guaranteed to be - /// empty when this returns; just that all the requests prior to this one - /// have completed. - pub async fn flush(&self) { - self.call(|_| Box::pin(async move {})).await - } -} +pub use actor::*; +pub use threaded_actor::*; diff --git a/src/threaded_actor.rs b/src/threaded_actor.rs new file mode 100644 index 0000000..22923c2 --- /dev/null +++ b/src/threaded_actor.rs @@ -0,0 +1,130 @@ +// cooper/src/threaded_actor.rs +// +// This file is part of the `cooper-rs` library. +// +// Copyright (c) 2021, Frank Pagliughi +// All Rights Reserved +// +// Licensed under the MIT license: +// +// This file may not be copied, modified, or distributed except according +// to those terms. + +use std::thread; +use crossbeam_channel::{ + self as channel, + Receiver, + Sender, +}; + +/// The type of function that can be sent to a `ThreadedActor`. +type Task = dyn FnOnce(&mut T) -> R + Send; + +/// The boxed verion of the function for a `ThreadedActor`. +type BoxedTask = Box>; + +/// The type of task that can be queued to the `ThreadedActor`. +/// This erases any return value from the user's function. A call() to the +/// actor must wrap the user's function and send the return value back to +/// the caller through a channel. +type QueueTask = BoxedTask; + +// -------------------------------------------------------------------------- + +/// An actor that uses an OS thread-per-instance. +/// +/// This may be useful if the application only needs a few actors and doesn' +/// otherwise use an async runtime. Or it can be used in an async context if +/// an actor needs to block or is compute intensive and requires its own thread. +#[derive(Clone)] +pub struct ThreadedActor { + /// A transmit channel to send requests to the actor thread. + tx: Sender>, +} + +impl ThreadedActor +where + T: Default + Send + 'static, +{ + /// Creates an actor with a default initial state. + /// + /// This requires the state type to implement the Default trait. + pub fn new() -> Self { + Self::from_state(T::default()) + } +} + +impl ThreadedActor +where + T: Send + 'static, +{ + /// Creates a threaded actor with the specified initial state. + pub fn from_state(state: T) -> Self { + let (tx, rx) = channel::unbounded(); + + thread::spawn(move || { + Self::thr_func(state, rx); + }); + + Self { tx } + } + + /// The thread function for the actor. + /// + /// This runs in its own OS thread. + fn thr_func(mut val: T, rx: Receiver>) { + for f in rx { + f(&mut val); + } + } + + /// Send an asynchronous request to the actor. + /// + /// This queues the request and returns immediately. + pub fn cast(&self, f: F) + where + F: FnOnce(&mut T) -> () + Send + 'static, + { + self.tx.send(Box::new(f)).unwrap(); + } + + pub fn call(&self, f: F) -> R + where + F: FnOnce(&mut T) -> R + Send + 'static, + R: Send + 'static + { + let (tx, rx) = channel::unbounded(); + self.tx.send(Box::new(move |val: &mut T| { + let res = f(val); + tx.send(res).unwrap(); + })).unwrap(); + + rx.recv().unwrap() + } +} + +// -------------------------------------------------------------------------- + +/* +fn main() { + println!("Initializing..."); + + println!(); + println!("size_of(ptr): {}", mem::size_of::<&u32>()); + println!("size_of(Task): {}", mem::size_of::>()); + println!(); + + let actor = ThreadedActor::::new(); + + actor.cast(|val| { *val += 1; }); + actor.cast(|val| { *val += 2; }); + + let v = actor.call(|val| { *val }); + println!("Value: {}", v); + + println!("\nCleaning up..."); + drop(actor); + + println!("Done"); +} +*/