Skip to content

Commit

Permalink
Added initial implementation of ThreadedActor. Bumped version to 0.1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
fpagliughi committed Mar 27, 2021
1 parent f8dd1ff commit ffdc27a
Show file tree
Hide file tree
Showing 7 changed files with 348 additions and 126 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cooper"
version = "0.1.0"
version = "0.1.1"
authors = ["Frank Pagliughi <[email protected]>"]
repository = "https://github.com/fpagliughi/cooper-rs"
homepage = "https://github.com/fpagliughi/cooper-rs"
Expand All @@ -16,5 +16,5 @@ A simple, in-process, async Actor library for Rust.
[dependencies]
futures = "0.3"
smol = "1.2"

crossbeam-channel = "0.5"

4 changes: 2 additions & 2 deletions examples/shared_keyval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl SharedMap {
let key = key.into();
let val = val.into();

self.actor.cast(|state: &mut State| Box::pin(async move {
self.actor.cast(|state| Box::pin(async move {
state.insert(key, val);
})).await
}
Expand All @@ -52,7 +52,7 @@ impl SharedMap {
{
let key = key.into();

self.actor.call(|state: &mut State| Box::pin(async move {
self.actor.call(|state| Box::pin(async move {
state.get(&key).map(|v| v.to_string())
})).await
}
Expand Down
71 changes: 71 additions & 0 deletions examples/threaded_keyval.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// cooper-rs/examples/threaded_keyval.rs
//
// This is an example app for the Rust "cooper" library.
//
// Copyright (c) 2021, Frank Pagliughi <[email protected]>
// All Rights Reserved
//
// Licensed under the MIT license:
// <LICENSE or http://opensource.org/licenses/MIT>
// This file may not be copied, modified, or distributed except according
// to those terms.
//

use std::collections::HashMap;
use cooper::ThreadedActor;

/// The internal state type for the Actor
type State = HashMap<String, String>;

/// An actor that can act as a shared key/value store of strings.
#[derive(Clone)]
pub struct SharedMap {
actor: ThreadedActor<State>,
}

impl SharedMap {
/// Create a new actor to share a key/value map of string.
pub fn new() -> Self {
Self { actor: ThreadedActor::new() }
}

/// Insert a value into the shared map.
pub fn insert<K,V>(&self, key: K, val: V)
where
K: Into<String>,
V: Into<String>,
{
let key = key.into();
let val = val.into();

self.actor.cast(move |state| {
state.insert(key, val);
});
}


/// Gets the value, if any, from the shared map that is
/// associated with the key.
pub fn get<K: Into<String>>(&self, key: K) -> Option<String> {
let key = key.into();
self.actor.call(move |state| {
state.get(&key).map(|v| v.to_string())
})
}
}

// --------------------------------------------------------------------------

fn main() {
let map = SharedMap::new();

println!("Inserting entry 'city'...");
map.insert("city", "Boston");

println!("Retrieving entry...");
match map.get("city") {
Some(s) => println!("Got: {}", s),
None => println!("Error: No entry found"),
}
}

2 changes: 1 addition & 1 deletion examples/unique_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl UniqueId {

/// Gets a unique ID as the next integer value in the sequence.
pub async fn get_unique_id(&self) -> u32 {
self.actor.call(|state: &mut u32| Box::pin(async move {
self.actor.call(|state| Box::pin(async move {
*state += 1;
*state
})).await
Expand Down
136 changes: 136 additions & 0 deletions src/actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// cooper/src/actor.rs
//
// Copyright (c) 2021, Frank Pagliughi <[email protected]>
// All Rights Reserved
//
// Licensed under the MIT license:
// <LICENSE or http://opensource.org/licenses/MIT>
// This file may not be copied, modified, or distributed except according
// to those terms.
//
//! cooper

use std::fmt::Debug;
use futures::future::BoxFuture;
use smol::{
channel::{
self,
Sender,
Receiver,
},
};

/// Message type for the Actor.
/// This wraps an async function type that takes a mutable reference to a
/// state object. Implementations of actor objects can queue functions and
/// closures to process the state.
/// `S` is the internal state type for the actor to manage
struct Message<S> {
func: Box<dyn for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, ()> + Send>,
}

/// The Actor.
///
/// This is an async command processor that serializes requests around an
/// internal state. Each request runs to completion, atomically, in the
/// order received, and thus tasks do not need to lock or protect the state
/// for access.
#[derive(Clone)]
pub struct Actor<S>
where
S: Send + 'static
{
/// The channel to send requests to the actor's processor task.
tx: Sender<Message<S>>,
}

impl<S> Actor<S>
where
S: Default + Send + 'static
{
/// Create a new actor with a default state
pub fn new() -> Self {
Self::from_state(S::default())
}
}

impl<S> Actor<S>
where
S: Send + 'static
{
/// Creates a new actor from an initial state
pub fn from_state(state: S) -> Self {
let (tx, rx) = channel::unbounded();

// TODO: Stash the handle somewhere?
// Perhaps make a registry of running actors?
smol::spawn(async move {
Self::run(state, rx).await
}).detach();

Self { tx }
}

/// The actor's command processor.
///
/// This runs each request for the actor to completion before
/// running the next one.
async fn run(mut state: S, rx: Receiver<Message<S>>) {
while let Ok(msg) = rx.recv().await {
(msg.func)(&mut state).await;
}
}

/// This is a totally asynchronous opertion. Awaiting the returned
/// future only waits for the operation to be placed in the queue.
/// It does not wait for the operation to be executed.
pub async fn cast<F>(&self, f: F)
where
F: for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, ()>,
F: 'static + Send,
{
let msg = Message {
func: Box::new(move |state| Box::pin(async move {
f(state).await;
}))
};

// TODO: Should we at least log the error?
let _ = self.tx.send(msg).await;
}

/// A call is a synchronous opertion within the async task.
/// It will queue the request, wait for it to execute, and
/// return the result.
pub async fn call<F, R>(&self, f: F) -> R
where
F: for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, R>,
F: 'static + Send,
R: 'static + Send + Debug,
{
let (tx, rx) = channel::bounded(1);
let msg = Message {
func: Box::new(move |state| Box::pin(async move {
let res = f(state).await;
let _ = tx.send(res).await;
}))
};

let _ = self.tx.send(msg).await;
// TODO: Return an error instead of panicking
rx.recv().await
.expect("Actor is gone")
}

/// Blocks the calling task until all requests up to this point have
/// been processed.
///
/// Note that if there are clones of the actor, additional requests
/// may get queued after this one, so the queue is not guaranteed to be
/// empty when this returns; just that all the requests prior to this one
/// have completed.
pub async fn flush(&self) {
self.call(|_| Box::pin(async move {})).await
}
}

127 changes: 6 additions & 121 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// cooper/src/lib/rs
//
// This file is part of the `cooper-rs` library.
//
// Copyright (c) 2021, Frank Pagliughi <[email protected]>
// All Rights Reserved
//
Expand All @@ -10,126 +12,9 @@
//
//! cooper

use std::fmt::Debug;
use futures::future::BoxFuture;
use smol::{
channel::{
self,
Sender,
Receiver,
},
};

/// Message type for the Actor.
/// This wraps an async function type that takes a mutable reference to a
/// state object. Implementations of actor objects can queue functions and
/// closures to process the state.
/// `S` is the internal state type for the actor to manage
struct Message<S> {
func: Box<dyn for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, ()> + Send>,
}

/// The Actor.
///
/// This is an async command processor that serializes requests around an
/// internal state. Each request runs to completion, atomically, in the
/// order received, and thus tasks do not need to lock or protect the state
/// for access.
#[derive(Clone)]
pub struct Actor<S>
where
S: Send + 'static
{
/// The channel to send requests to the actor's processor task.
tx: Sender<Message<S>>,
}

impl<S> Actor<S>
where
S: Default + Send + 'static
{
/// Create a new actor with a default state
pub fn new() -> Self {
Self::from_state(S::default())
}
}

impl<S> Actor<S>
where
S: Send + 'static
{
/// Creates a new actor from an initial state
pub fn from_state(state: S) -> Self {
let (tx, rx) = channel::unbounded();

// TODO: Stash the handle somewhere?
// Perhaps make a registry of running actors?
smol::spawn(async move {
Self::run(state, rx).await
}).detach();

Self { tx }
}

/// The actor's command processor.
/// This runs each request for the actor to completion before
/// running the next one.
async fn run(mut state: S, rx: Receiver<Message<S>>) {
while let Ok(msg) = rx.recv().await {
(msg.func)(&mut state).await;
}
}

/// This is a totally asynchronous opertion. Awaiting the returned
/// future only waits for the operation to be placed in the queue.
/// It does not wait for the operation to be executed.
pub async fn cast<F>(&self, f: F)
where
F: for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, ()>,
F: 'static + Send,
{
let msg = Message {
func: Box::new(move |state| Box::pin(async move {
f(state).await;
}))
};

// TODO: Should we at least log the error?
let _ = self.tx.send(msg).await;
}

/// A call is a synchronous opertion within the async task.
/// It will queue the request, wait for it to execute, and
/// return the result.
pub async fn call<F, R>(&self, f: F) -> R
where
F: for<'a> FnOnce(&'a mut S) -> BoxFuture<'a, R>,
F: 'static + Send,
R: 'static + Send + Debug,
{
let (tx, rx) = channel::bounded(1);
let msg = Message {
func: Box::new(move |state| Box::pin(async move {
let res = f(state).await;
let _ = tx.send(res).await;
}))
};

let _ = self.tx.send(msg).await;
// TODO: Return an error instead of panicking
rx.recv().await
.expect("Actor is gone")
}
mod actor;
mod threaded_actor;

/// Blocks the calling task until all requests up to this point have
/// been processed.
///
/// Note that if there are clones of the actor, additional requests
/// may get queued after this one, so the queue is not guaranteed to be
/// empty when this returns; just that all the requests prior to this one
/// have completed.
pub async fn flush(&self) {
self.call(|_| Box::pin(async move {})).await
}
}
pub use actor::*;
pub use threaded_actor::*;

Loading

0 comments on commit ffdc27a

Please sign in to comment.