Skip to content

Commit

Permalink
Expose the user identities and devices streams in the main crate
Browse files Browse the repository at this point in the history
  • Loading branch information
poljar committed Sep 6, 2023
1 parent 8e48951 commit 4a01294
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 18 deletions.
45 changes: 43 additions & 2 deletions crates/matrix-sdk/src/encryption/identities/devices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Deref;
use std::{collections::BTreeMap, ops::Deref};

use matrix_sdk_base::crypto::{
store::CryptoStoreError, Device as BaseDevice, LocalTrust, ReadOnlyDevice,
UserDevices as BaseUserDevices,
};
use ruma::{events::key::verification::VerificationMethod, DeviceId};
use ruma::{events::key::verification::VerificationMethod, DeviceId, OwnedDeviceId, OwnedUserId};

use super::ManualVerifyError;
use crate::{
Expand All @@ -27,6 +27,47 @@ use crate::{
Client,
};

/// Updates about [`Device`]s which got received over the `/keys/query`
/// endpoint.
#[derive(Clone, Debug, Default)]
pub struct DeviceUpdates {
/// The list of newly discovered devices.
///
/// A device being in this list does not necessarily mean that the device
/// was just created, it just means that it's the first time we're
/// seeing this device.
pub new: BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, Device>>,
/// The list of changed devices.
pub changed: BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, Device>>,
}

impl DeviceUpdates {
pub(crate) fn new(
client: Client,
updates: matrix_sdk_base::crypto::store::DeviceUpdates,
) -> Self {
let map_devices = |(user_id, devices)| {
// For some reason we need to tell Rust the type of `devices`.
let devices: BTreeMap<_, _> = devices;

(
user_id,
devices
.into_iter()
.map(|(device_id, device)| {
(device_id, Device { inner: device, client: client.to_owned() })
})
.collect(),
)
};

let new = updates.new.into_iter().map(map_devices).collect();
let changed = updates.changed.into_iter().map(map_devices).collect();

DeviceUpdates { new, changed }
}
}

/// A device represents a E2EE capable client or device of an user.
///
/// A `Device` is backed by [device keys] that are uploaded to the server.
Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk/src/encryption/identities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@
mod devices;
mod users;

pub use devices::{Device, UserDevices};
pub use devices::{Device, DeviceUpdates, UserDevices};
pub use matrix_sdk_base::crypto::types::MasterPubkey;
pub use users::UserIdentity;
pub use users::{IdentityUpdates, UserIdentity};

/// Error for the manual verification step, when we manually sign users or
/// devices.
Expand Down
62 changes: 49 additions & 13 deletions crates/matrix-sdk/src/encryption/identities/users.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::collections::BTreeMap;

use matrix_sdk_base::{
crypto::{
types::MasterPubkey, OwnUserIdentity as InnerOwnUserIdentity,
UserIdentity as InnerUserIdentity,
UserIdentities as InnerUserIdentities, UserIdentity as InnerUserIdentity,
},
RoomMemberships,
};
Expand All @@ -26,12 +26,46 @@ use ruma::{
key::verification::VerificationMethod,
room::message::{MessageType, RoomMessageEventContent},
},
UserId,
OwnedUserId, UserId,
};
use tokio::sync::RwLock;

use super::{ManualVerifyError, RequestVerificationError};
use crate::{encryption::verification::VerificationRequest, Client, Room};
use crate::{encryption::verification::VerificationRequest, Client};

/// Updates about [`UserIdentity`]s which got received over the `/keys/query`
/// endpoint.
#[derive(Clone, Debug, Default)]
pub struct IdentityUpdates {
/// The list of newly discovered user identities .
///
/// A identity being in this list does not necessarily mean that the
/// identity was just created, it just means that it's the first time
/// we're seeing this identity.
pub new: BTreeMap<OwnedUserId, UserIdentity>,
/// The list of changed identities.
pub changed: BTreeMap<OwnedUserId, UserIdentity>,
}

impl IdentityUpdates {
pub(crate) fn new(
client: Client,
updates: matrix_sdk_base::crypto::store::IdentityUpdates,
) -> Self {
let new = updates
.new
.into_iter()
.map(|(user_id, identity)| (user_id, UserIdentity::new(client.to_owned(), identity)))
.collect();

let changed = updates
.changed
.into_iter()
.map(|(user_id, identity)| (user_id, UserIdentity::new(client.to_owned(), identity)))
.collect();

Self { new, changed }
}
}

/// A struct representing a E2EE capable identity of a user.
///
Expand Down Expand Up @@ -72,18 +106,21 @@ pub struct UserIdentity {
}

impl UserIdentity {
fn new(client: Client, identity: InnerUserIdentities) -> Self {
match identity {
InnerUserIdentities::Own(i) => Self::new_own(client, i),
InnerUserIdentities::Other(i) => Self::new_other(client, i),
}
}

pub(crate) fn new_own(client: Client, identity: InnerOwnUserIdentity) -> Self {
let identity = OwnUserIdentity { inner: identity, client };

Self { inner: identity.into() }
}

pub(crate) fn new(client: Client, identity: InnerUserIdentity, room: Option<Room>) -> Self {
let identity = OtherUserIdentity {
inner: identity,
client,
direct_message_room: RwLock::new(room).into(),
};
pub(crate) fn new_other(client: Client, identity: InnerUserIdentity) -> Self {
let identity = OtherUserIdentity { inner: identity, client };

Self { inner: identity.into() }
}
Expand Down Expand Up @@ -425,7 +462,6 @@ struct OwnUserIdentity {
struct OtherUserIdentity {
pub(crate) inner: InnerUserIdentity,
pub(crate) client: Client,
pub(crate) direct_message_room: Arc<RwLock<Option<Room>>>,
}

impl OwnUserIdentity {
Expand Down Expand Up @@ -462,7 +498,7 @@ impl OtherUserIdentity {
) -> Result<VerificationRequest, RequestVerificationError> {
let content = self.inner.verification_request_content(methods.clone()).await;

let room = if let Some(room) = self.direct_message_room.read().await.as_ref() {
let room = if let Some(room) = self.client.get_dm_room(self.inner.user_id()) {
// Make sure that the user, to be verified, is still in the room
if !room
.members(RoomMemberships::ACTIVE)
Expand Down
83 changes: 82 additions & 1 deletion crates/matrix-sdk/src/encryption/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{
};

use eyeball::SharedObservable;
use futures_core::Stream;
use futures_util::{
future::try_join,
stream::{self, StreamExt},
Expand Down Expand Up @@ -82,6 +83,7 @@ pub use matrix_sdk_base::crypto::{
};

pub use self::futures::PrepareEncryptedFile;
use self::identities::{DeviceUpdates, IdentityUpdates};
pub use crate::error::RoomKeyImportError;

impl Client {
Expand Down Expand Up @@ -659,11 +661,90 @@ impl Encryption {
UserIdentity::new_own(self.client.clone(), i)
}
matrix_sdk_base::crypto::UserIdentities::Other(i) => {
UserIdentity::new(self.client.clone(), i, self.client.get_dm_room(user_id))
UserIdentity::new_other(self.client.clone(), i)
}
}))
}

/// Returns a stream of device updates, allowing users to listen for
/// notifications about new or changed devices.
///
/// The stream produced by this method emits updates whenever a new device
/// is discovered or when an existing device's information is changed. Users
/// can subscribe to this stream and receive updates in real-time.
///
/// # Examples
///
/// ```no_run
/// # use matrix_sdk::Client;
/// # use ruma::{device_id, user_id};
/// # use futures_util::{pin_mut, StreamExt};
/// # let client: Client = unimplemented!();
/// # async {
/// let devices_stream = client.encryption().devices_stream().await?;
/// let user_id = client
/// .user_id()
/// .expect("We should know our user id afte we have logged in");
/// pin_mut!(devices_stream);
///
/// for device_updates in devices_stream.next().await {
/// if let Some(user_devices) = device_updates.new.get(user_id) {
/// for device in user_devices.values() {
/// println!("A new device has been added {}", device.device_id());
/// }
/// }
/// }
/// # anyhow::Ok(()) };
/// ```
pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates>> {
let olm = self.client.olm_machine().await;
let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
let client = self.client.to_owned();

Ok(olm
.store()
.devices_stream()
.map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
}

/// Returns a stream of user identity updates, allowing users to listen for
/// notifications about new or changed user identities.
///
/// The stream produced by this method emits updates whenever a new user
/// identity is discovered or when an existing identities information is
/// changed. Users can subscribe to this stream and receive updates in
/// real-time.
///
/// # Examples
///
/// ```no_run
/// # use matrix_sdk::Client;
/// # use ruma::{device_id, user_id};
/// # use futures_util::{pin_mut, StreamExt};
/// # let client: Client = unimplemented!();
/// # async {
/// let identities_stream =
/// client.encryption().user_identities_stream().await?;
/// pin_mut!(identities_stream);
///
/// for identity_updates in identities_stream.next().await {
/// for (_, identity) in identity_updates.new {
/// println!("A new identity has been added {}", identity.user_id());
/// }
/// }
/// # anyhow::Ok(()) };
/// ```
pub async fn user_identities_stream(&self) -> Result<impl Stream<Item = IdentityUpdates>> {
let olm = self.client.olm_machine().await;
let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
let client = self.client.to_owned();

Ok(olm
.store()
.user_identities_stream()
.map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
}

/// Create and upload a new cross signing identity.
///
/// # Arguments
Expand Down

0 comments on commit 4a01294

Please sign in to comment.