-
Notifications
You must be signed in to change notification settings - Fork 503
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Event<T>
type that can be used to implement a WinRT event
#1705
Changes from all commits
56c247a
471e9f6
c480c83
a610e5b
7aa57e9
3048046
2d234d9
c43fb9c
b25c49a
cdcec35
0695807
54606ce
5103913
2732a31
9549151
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,254 @@ | ||
use super::*; | ||
use bindings::*; | ||
use std::sync::*; | ||
|
||
/// A type that you can use to declare and implement an event of a specified delegate type. | ||
/// | ||
/// The implementation is thread-safe and designed to avoid contention between events being | ||
/// raised and delegates being added or removed. | ||
pub struct Event<T: Interface + Clone> { | ||
swap: Mutex<()>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand the need for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because both locks govern the same memory and their lifetimes are overlapping. The one handles changes and the other replacements. I don't think a |
||
change: Mutex<()>, | ||
delegates: Array<T>, | ||
} | ||
|
||
impl<T: Interface + Clone> Default for Event<T> { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} | ||
|
||
impl<T: Interface + Clone> Event<T> { | ||
/// Creates a new, empty `Event<T>`. | ||
pub fn new() -> Self { | ||
Self { delegates: Array::new(), swap: Mutex::default(), change: Mutex::default() } | ||
} | ||
/// Registers a delegate with the event object. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: it would be nice to have a space between methods. That's not required by rustfmt, but is the more typical way things are formatted. |
||
pub fn add(&mut self, delegate: &T) -> Result<i64> { | ||
let mut _lock_free_drop = Array::new(); | ||
Ok({ | ||
let change_lock = self.change.lock().unwrap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm surprised you don't get a warning here for an unused variable. I think this should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If I remove it, compiler complains with |
||
let mut new_delegates = Array::with_capacity(self.delegates.len() + 1)?; | ||
for delegate in self.delegates.as_slice() { | ||
new_delegates.push(delegate.clone()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do the delegates have to be cloned instead of the new delegate just being pushed on to the existing delegates array? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To avoid disrupting a concurrent read of the delegates, a new array needs to be created every time. The tension is between a thread adding/removing a delegate and another thread firing the event and thus cycling through the existing array. The vast majority of event sources have either zero or one handler, and this was determined to be the most efficient implementation. |
||
} | ||
let delegate = Delegate::new(delegate); | ||
let token = delegate.to_token(); | ||
new_delegates.push(delegate); | ||
|
||
let swap_lock = self.swap.lock().unwrap(); | ||
_lock_free_drop = self.delegates.swap(new_delegates); | ||
token | ||
}) | ||
} | ||
/// Revokes a delegate's registration from the event object. | ||
pub fn remove(&mut self, token: i64) -> Result<()> { | ||
let mut _lock_free_drop = Array::new(); | ||
{ | ||
let change_lock = self.change.lock().unwrap(); | ||
if self.delegates.is_empty() { | ||
return Ok(()); | ||
} | ||
let mut capacity = self.delegates.len() - 1; | ||
let mut new_delegates = Array::new(); | ||
let mut removed = false; | ||
if capacity == 0 { | ||
if self.delegates.as_slice()[0].to_token() == token { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we might want to just add indexing directly to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed = self.delegates.as_slice()[0].to_token() == token; |
||
removed = true; | ||
} | ||
} else { | ||
new_delegates = Array::with_capacity(capacity)?; | ||
for delegate in self.delegates.as_slice() { | ||
if !removed && delegate.to_token() == token { | ||
removed = true; | ||
continue; | ||
} | ||
if capacity == 0 { | ||
debug_assert!(!removed); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: I would add a string message to the |
||
break; | ||
} | ||
new_delegates.push(delegate.clone()); | ||
capacity -= 1; | ||
} | ||
} | ||
if removed { | ||
let swap_lock = self.swap.lock().unwrap(); | ||
_lock_free_drop = self.delegates.swap(new_delegates); | ||
} | ||
} | ||
Ok(()) | ||
} | ||
/// Clears the event, removing all delegates. | ||
pub fn clear(&mut self) { | ||
let mut _lock_free_drop = Array::new(); | ||
{ | ||
let change_lock = self.change.lock().unwrap(); | ||
if self.delegates.is_empty() { | ||
return; | ||
} | ||
let swap_lock = self.swap.lock().unwrap(); | ||
_lock_free_drop = self.delegates.swap(Array::new()); | ||
} | ||
} | ||
/// Invokes all of the event object's registered delegates with the provided callback. | ||
pub fn call<F: FnMut(&T) -> Result<()>>(&mut self, mut callback: F) -> Result<()> { | ||
let lock_free_calls = { | ||
let swap_lock = self.swap.lock().unwrap(); | ||
self.delegates.clone() | ||
}; | ||
for delegate in lock_free_calls.as_slice() { | ||
if let Err(error) = delegate.call(&mut callback) { | ||
const RPC_E_SERVER_UNAVAILABLE: HRESULT = HRESULT(-2147023174); // HRESULT_FROM_WIN32(RPC_S_SERVER_UNAVAILABLE) | ||
if matches!(error.code(), RPC_E_DISCONNECTED | JSCRIPT_E_CANTEXECUTE | RPC_E_SERVER_UNAVAILABLE) { | ||
self.remove(delegate.to_token())?; | ||
} | ||
} | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
/// A thread-safe reference-counted array of delegates. | ||
struct Array<T: Interface + Clone> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: typically, the bounds on a generic param are only given where they're needed. So, here we would not constrain There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried that but Rust complains:
|
||
buffer: *mut Buffer, | ||
len: usize, | ||
_phantom: std::marker::PhantomData<T>, | ||
} | ||
|
||
impl<T: Interface + Clone> Default for Array<T> { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} | ||
|
||
impl<T: Interface + Clone> Array<T> { | ||
/// Creates a new, empty `Array<T>` with no capacity. | ||
fn new() -> Self { | ||
Self { buffer: std::ptr::null_mut(), len: 0, _phantom: std::marker::PhantomData } | ||
} | ||
/// Creates a new, empty `Array<T>` with the specified capacity. | ||
fn with_capacity(capacity: usize) -> Result<Self> { | ||
Ok(Self { buffer: Buffer::new(capacity * std::mem::size_of::<Delegate<T>>())?, len: 0, _phantom: std::marker::PhantomData }) | ||
} | ||
/// Swaps the contents of two `Array<T>` objects. | ||
fn swap(&mut self, mut other: Self) -> Self { | ||
unsafe { std::ptr::swap(&mut self.buffer, &mut other.buffer) }; | ||
std::mem::swap(&mut self.len, &mut other.len); | ||
other | ||
} | ||
/// Returns `true` if the array contains no delegates. | ||
fn is_empty(&self) -> bool { | ||
self.len == 0 | ||
} | ||
/// Returns the number of delegates in the array. | ||
fn len(&self) -> usize { | ||
self.len | ||
} | ||
/// Appends a delegate to the back of the array. | ||
fn push(&mut self, delegate: Delegate<T>) { | ||
unsafe { | ||
std::ptr::write((*self.buffer).as_mut_ptr::<Delegate<T>>().add(self.len) as _, delegate); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if we push beyond the capacity of the buffer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bad things. |
||
self.len += 1; | ||
} | ||
} | ||
/// Returns a slice containing of all delegates. | ||
fn as_slice(&self) -> &[Delegate<T>] { | ||
if self.is_empty() { | ||
&[] | ||
} else { | ||
unsafe { std::slice::from_raw_parts((*self.buffer).as_ptr::<Delegate<T>>() as _, self.len) } | ||
} | ||
} | ||
/// Returns a mutable slice of all delegates. | ||
fn as_mut_slice(&mut self) -> &mut [Delegate<T>] { | ||
if self.is_empty() { | ||
&mut [] | ||
} else { | ||
unsafe { std::slice::from_raw_parts_mut((*self.buffer).as_mut_ptr::<Delegate<T>>() as _, self.len) } | ||
} | ||
} | ||
} | ||
|
||
impl<T: Interface + Clone> Clone for Array<T> { | ||
fn clone(&self) -> Self { | ||
if !self.is_empty() { | ||
unsafe { (*self.buffer).0.add_ref() }; | ||
} | ||
Self { buffer: self.buffer, len: self.len, _phantom: std::marker::PhantomData } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I clone this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just bumping the ref count on the same array. Basically, there are now two smart pointers to the same array to support lock-free calling. The |
||
} | ||
} | ||
|
||
impl<T: Interface + Clone> Drop for Array<T> { | ||
fn drop(&mut self) { | ||
unsafe { | ||
if !self.is_empty() && (*self.buffer).0.release() == 0 { | ||
std::ptr::drop_in_place(self.as_mut_slice()); | ||
heap_free(self.buffer as _) | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// A reference-counted buffer. | ||
#[repr(C)] | ||
struct Buffer(RefCount); | ||
|
||
impl Buffer { | ||
/// Creates a new `Buffer` with the specified size in bytes. | ||
fn new(size: usize) -> Result<*mut Buffer> { | ||
if size == 0 { | ||
Ok(std::ptr::null_mut()) | ||
} else { | ||
let alloc_size = std::mem::size_of::<Buffer>() + size; | ||
let header = heap_alloc(alloc_size)? as *mut Buffer; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The other code makes assumptions that this pointer is properly aligned. Since There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, |
||
unsafe { | ||
(*header).0 = RefCount::new(1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should use |
||
} | ||
Ok(header) | ||
} | ||
} | ||
/// Returns a raw pointer to the buffer's contents. | ||
fn as_ptr<T>(&self) -> *const T { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it might be useful to note that |
||
unsafe { (self as *const Self).add(1) as *const _ } | ||
} | ||
/// Returns a raw mutable pointer to the buffer's contents. | ||
fn as_mut_ptr<T>(&mut self) -> *mut T { | ||
unsafe { (self as *mut Self).add(1) as *mut _ } | ||
} | ||
} | ||
|
||
/// Holds either a direct or indirect reference to a delegate. A direct reference is typically | ||
/// agile while an indirect reference is an agile wrapper. | ||
#[derive(Clone)] | ||
enum Delegate<T: Interface + Clone> { | ||
Direct(T), | ||
Indirect(AgileReference<T>), | ||
} | ||
|
||
impl<T: Interface + Clone> Delegate<T> { | ||
/// Creates a new `Delegate<T>`, containing a suitable reference to the specified delegate. | ||
fn new(delegate: &T) -> Self { | ||
if delegate.cast::<IAgileObject>().is_err() { | ||
if let Ok(delegate) = AgileReference::new(delegate) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a back door that C++ uses to handle Xaml's internal delegates which refuse to support agility or marshaling, but I agree it's a hack and Rust doesn't need to support it. |
||
return Self::Indirect(delegate); | ||
} | ||
} | ||
Self::Direct(delegate.clone()) | ||
} | ||
/// Returns an encoded token to identify the delegate. | ||
fn to_token(&self) -> i64 { | ||
unsafe { | ||
match self { | ||
Self::Direct(delegate) => EncodePointer(std::mem::transmute_copy(delegate)) as _, | ||
Self::Indirect(delegate) => EncodePointer(std::mem::transmute_copy(delegate)) as _, | ||
} | ||
} | ||
} | ||
/// Invokes the delegates with the provided callback. | ||
fn call<F: FnMut(&T) -> Result<()>>(&self, mut callback: F) -> Result<()> { | ||
match self { | ||
Self::Direct(delegate) => callback(delegate), | ||
Self::Indirect(delegate) => callback(&delegate.resolve()?), | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
[package] | ||
name = "test_event" | ||
version = "0.0.0" | ||
authors = ["Microsoft"] | ||
edition = "2021" | ||
|
||
[dependencies.windows] | ||
path = "../../libs/windows" | ||
features = [ | ||
"Foundation", | ||
"Win32_System_WinRT", | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This just removes the redundant constraint on the
AgileReference
constructor. TheInterface
trait doesn't "know" that it requiresIUnknown
. That's something I'll try to fix separately.