Skip to content

Commit

Permalink
refactor: Move chat-related code from sync to chat module
Browse files Browse the repository at this point in the history
- Reduce cross-module dependencies.
- Stop bloating the `sync` module while implementing synchronisation of more entities.
- Now there's the only `ChatId` :)
  • Loading branch information
iequidoo committed Nov 11, 2023
1 parent 48f900e commit 0f3036f
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 228 deletions.
240 changes: 208 additions & 32 deletions src/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::receive_imf::ReceivedMsg;
use crate::smtp::send_msg_to_smtp;
use crate::sql;
use crate::stock_str;
use crate::sync::{self, ChatAction, Sync::*, SyncData};
use crate::sync::{self, Sync::*, SyncData};
use crate::tools::{
buf_compress, create_id, create_outgoing_rfc724_mid, create_smeared_timestamp,
create_smeared_timestamps, get_abs_path, gm2local_offset, improve_single_line_input,
Expand Down Expand Up @@ -399,7 +399,7 @@ impl ChatId {

if sync.into() {
// NB: For a 1:1 chat this currently triggers `Contact::block()` on other devices.
chat.sync(context, ChatAction::Block).await.ok();
chat.sync(context, SyncAction::Block).await.ok();
}
Ok(())
}
Expand All @@ -417,7 +417,7 @@ impl ChatId {
// TODO: For a 1:1 chat this currently triggers `Contact::unblock()` on other devices.
// Maybe we should unblock the contact locally too, this would also resolve discrepancy
// with `block()` which also blocks the contact.
chat.sync(context, ChatAction::Unblock).await.ok();
chat.sync(context, SyncAction::Unblock).await.ok();
}
Ok(())
}
Expand Down Expand Up @@ -465,7 +465,7 @@ impl ChatId {
}

if sync.into() {
chat.sync(context, ChatAction::Accept).await.ok();
chat.sync(context, SyncAction::Accept).await.ok();
}
Ok(())
}
Expand Down Expand Up @@ -605,7 +605,7 @@ impl ChatId {

if sync.into() {
let chat = Chat::load_from_db(context, self).await?;
chat.sync(context, ChatAction::SetVisibility(visibility))
chat.sync(context, SyncAction::SetVisibility(visibility))
.await
.ok();
}
Expand Down Expand Up @@ -1895,18 +1895,18 @@ impl Chat {
Ok(msg.id)
}

/// Sends a `ChatAction` synchronising chat contacts to other devices.
/// Sends a `SyncAction` synchronising chat contacts to other devices.
pub(crate) async fn sync_contacts(&self, context: &Context) -> Result<()> {
let mut addrs = Vec::new();
for contact_id in get_chat_contacts(context, self.id).await? {
let contact = Contact::get_by_id(context, contact_id).await?;
addrs.push(contact.get_addr().to_string());
}
self.sync(context, ChatAction::SetContacts(addrs)).await
self.sync(context, SyncAction::SetContacts(addrs)).await
}

/// Returns chat id for the purpose of synchronisation across devices.
async fn get_sync_id(&self, context: &Context) -> Result<Option<sync::ChatId>> {
async fn get_sync_id(&self, context: &Context) -> Result<Option<SyncId>> {
match self.typ {
Chattype::Single => {
let mut r = None;
Expand All @@ -1918,29 +1918,29 @@ impl Chat {
return Ok(None);
}
let contact = Contact::get_by_id(context, contact_id).await?;
r = Some(sync::ChatId::ContactAddr(contact.get_addr().to_string()));
r = Some(SyncId::ContactAddr(contact.get_addr().to_string()));
}
Ok(r)
}
Chattype::Broadcast | Chattype::Group | Chattype::Mailinglist => {
if self.grpid.is_empty() {
return Ok(None);
}
Ok(Some(sync::ChatId::Grpid(self.grpid.clone())))
Ok(Some(SyncId::Grpid(self.grpid.clone())))
}
}
}

/// Synchronises a chat action to other devices.
pub(crate) async fn sync(&self, context: &Context, action: ChatAction) -> Result<()> {
pub(crate) async fn sync(&self, context: &Context, action: SyncAction) -> Result<()> {
if let Some(id) = self.get_sync_id(context).await? {
sync(context, id, action).await?;
}
Ok(())
}
}

async fn sync(context: &Context, id: sync::ChatId, action: ChatAction) -> Result<()> {
async fn sync(context: &Context, id: SyncId, action: SyncAction) -> Result<()> {
context
.add_sync_item(SyncData::AlterChat { id, action })
.await?;
Expand Down Expand Up @@ -3253,8 +3253,8 @@ async fn create_broadcast_list_ex(

context.emit_msgs_changed_without_ids();
if sync.into() {
let id = sync::ChatId::Grpid(grpid);
let action = ChatAction::CreateBroadcast(chat_name);
let id = SyncId::Grpid(grpid);
let action = SyncAction::CreateBroadcast(chat_name);
self::sync(context, id, action).await.ok();
}
Ok(chat_id)
Expand Down Expand Up @@ -3527,7 +3527,7 @@ pub(crate) async fn set_muted_ex(
context.emit_event(EventType::ChatModified(chat_id));
if sync.into() {
let chat = Chat::load_from_db(context, chat_id).await?;
chat.sync(context, ChatAction::SetMuted(duration))
chat.sync(context, SyncAction::SetMuted(duration))
.await
.ok();
}
Expand Down Expand Up @@ -4180,26 +4180,45 @@ async fn set_contacts_by_addrs(context: &Context, id: ChatId, addrs: &[String])
Ok(())
}

/// A cross-device chat id used for synchronisation.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub(crate) enum SyncId {
ContactAddr(String),
Grpid(String),
// NOTE: Ad-hoc groups lack an identifier that can be used across devices so
// block/mute/etc. actions on them are not synchronized to other devices.
}

/// An action synchronised to other devices.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub(crate) enum SyncAction {
Block,
Unblock,
Accept,
SetVisibility(ChatVisibility),
SetMuted(MuteDuration),
/// Create broadcast list with the given name.
CreateBroadcast(String),
/// Set chat contacts by their addresses.
SetContacts(Vec<String>),
}

impl Context {
/// Executes [`SyncData::AlterChat`] item sent by other device.
pub(crate) async fn sync_alter_chat(
&self,
id: &sync::ChatId,
action: &ChatAction,
) -> Result<()> {
pub(crate) async fn sync_alter_chat(&self, id: &SyncId, action: &SyncAction) -> Result<()> {
let chat_id = match id {
sync::ChatId::ContactAddr(addr) => {
SyncId::ContactAddr(addr) => {
let Some(contact_id) =
Contact::lookup_id_by_addr_ex(self, addr, Origin::Unknown, None).await?
else {
warn!(self, "sync_alter_chat: No contact for addr '{addr}'.");
return Ok(());
};
match action {
ChatAction::Block => {
SyncAction::Block => {
return contact::set_blocked(self, Nosync, contact_id, true).await
}
ChatAction::Unblock => {
SyncAction::Unblock => {
return contact::set_blocked(self, Nosync, contact_id, false).await
}
_ => (),
Expand All @@ -4210,8 +4229,8 @@ impl Context {
};
chat_id
}
sync::ChatId::Grpid(grpid) => {
if let ChatAction::CreateBroadcast(name) = action {
SyncId::Grpid(grpid) => {
if let SyncAction::CreateBroadcast(name) = action {
create_broadcast_list_ex(self, Nosync, grpid.clone(), name.clone()).await?;
return Ok(());
}
Expand All @@ -4223,15 +4242,15 @@ impl Context {
}
};
match action {
ChatAction::Block => chat_id.block_ex(self, Nosync).await,
ChatAction::Unblock => chat_id.unblock_ex(self, Nosync).await,
ChatAction::Accept => chat_id.accept_ex(self, Nosync).await,
ChatAction::SetVisibility(v) => chat_id.set_visibility_ex(self, Nosync, *v).await,
ChatAction::SetMuted(duration) => set_muted_ex(self, Nosync, chat_id, *duration).await,
ChatAction::CreateBroadcast(_) => {
SyncAction::Block => chat_id.block_ex(self, Nosync).await,
SyncAction::Unblock => chat_id.unblock_ex(self, Nosync).await,
SyncAction::Accept => chat_id.accept_ex(self, Nosync).await,
SyncAction::SetVisibility(v) => chat_id.set_visibility_ex(self, Nosync, *v).await,
SyncAction::SetMuted(duration) => set_muted_ex(self, Nosync, chat_id, *duration).await,
SyncAction::CreateBroadcast(_) => {
bail!("sync_alter_chat({id:?}, {action:?}): Bad request.")
}
ChatAction::SetContacts(addrs) => set_contacts_by_addrs(self, chat_id, addrs).await,
SyncAction::SetContacts(addrs) => set_contacts_by_addrs(self, chat_id, addrs).await,
}
}
}
Expand All @@ -4245,6 +4264,7 @@ mod tests {
use crate::message::delete_msgs;
use crate::receive_imf::receive_imf;
use crate::test_utils::{TestContext, TestContextManager};
use strum::IntoEnumIterator;
use tokio::fs;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down Expand Up @@ -6723,4 +6743,160 @@ mod tests {
);
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_sync_alter_chat() -> Result<()> {
let alices = [
TestContext::new_alice().await,
TestContext::new_alice().await,
];
for a in &alices {
a.set_config_bool(Config::SyncMsgs, true).await?;
}
let bob = TestContext::new_bob().await;

let ba_chat = bob.create_chat(&alices[0]).await;
let sent_msg = bob.send_text(ba_chat.id, "hi").await;
let a0b_chat_id = alices[0].recv_msg(&sent_msg).await.chat_id;
alices[1].recv_msg(&sent_msg).await;
let ab_contact_ids = [
alices[0].add_or_lookup_contact(&bob).await.id,
alices[1].add_or_lookup_contact(&bob).await.id,
];

async fn sync(alices: &[TestContext]) -> Result<()> {
let sync_msg = alices.get(0).unwrap().pop_sent_msg().await;
alices.get(1).unwrap().recv_msg(&sync_msg).await;
Ok(())
}

assert_eq!(alices[1].get_chat(&bob).await.blocked, Blocked::Request);
a0b_chat_id.accept(&alices[0]).await?;
sync(&alices).await?;
assert_eq!(alices[1].get_chat(&bob).await.blocked, Blocked::Not);
a0b_chat_id.block(&alices[0]).await?;
sync(&alices).await?;
assert_eq!(alices[1].get_chat(&bob).await.blocked, Blocked::Yes);
a0b_chat_id.unblock(&alices[0]).await?;
sync(&alices).await?;
assert_eq!(alices[1].get_chat(&bob).await.blocked, Blocked::Not);

// Unblocking a 1:1 chat doesn't unblock the contact currently.
Contact::unblock(&alices[0], ab_contact_ids[0]).await?;

assert!(!alices[1].add_or_lookup_contact(&bob).await.is_blocked());
Contact::block(&alices[0], ab_contact_ids[0]).await?;
sync(&alices).await?;
assert!(alices[1].add_or_lookup_contact(&bob).await.is_blocked());
Contact::unblock(&alices[0], ab_contact_ids[0]).await?;
sync(&alices).await?;
assert!(!alices[1].add_or_lookup_contact(&bob).await.is_blocked());

// Test accepting and blocking groups. This way we test:
// - Group chats synchronisation.
// - That blocking a group deletes it on other devices.
let fiona = TestContext::new_fiona().await;
let fiona_grp_chat_id = fiona
.create_group_with_members(ProtectionStatus::Unprotected, "grp", &[&alices[0]])
.await;
let sent_msg = fiona.send_text(fiona_grp_chat_id, "hi").await;
let a0_grp_chat_id = alices[0].recv_msg(&sent_msg).await.chat_id;
let a1_grp_chat_id = alices[1].recv_msg(&sent_msg).await.chat_id;
let a1_grp_chat = Chat::load_from_db(&alices[1], a1_grp_chat_id).await?;
assert_eq!(a1_grp_chat.blocked, Blocked::Request);
a0_grp_chat_id.accept(&alices[0]).await?;
sync(&alices).await?;
let a1_grp_chat = Chat::load_from_db(&alices[1], a1_grp_chat_id).await?;
assert_eq!(a1_grp_chat.blocked, Blocked::Not);
a0_grp_chat_id.block(&alices[0]).await?;
sync(&alices).await?;
assert!(Chat::load_from_db(&alices[1], a1_grp_chat_id)
.await
.is_err());
assert!(
!alices[1]
.sql
.exists("SELECT COUNT(*) FROM chats WHERE id=?", (a1_grp_chat_id,))
.await?
);

// Test syncing of chat visibility on a self-chat. This way we test:
// - Self-chat synchronisation.
// - That sync messages don't unarchive the self-chat.
let a0self_chat_id = alices[0].get_self_chat().await.id;
assert_eq!(
alices[1].get_self_chat().await.get_visibility(),
ChatVisibility::Normal
);
let mut visibilities =
ChatVisibility::iter().chain(std::iter::once(ChatVisibility::Normal));
visibilities.next();
for v in visibilities {
a0self_chat_id.set_visibility(&alices[0], v).await?;
sync(&alices).await?;
for a in &alices {
assert_eq!(a.get_self_chat().await.get_visibility(), v);
}
}

assert_eq!(
alices[1].get_chat(&bob).await.mute_duration,
MuteDuration::NotMuted
);
let mute_durations = [
MuteDuration::Forever,
MuteDuration::Until(SystemTime::now() + Duration::from_secs(42)),
MuteDuration::NotMuted,
];
for m in mute_durations {
set_muted(&alices[0], a0b_chat_id, m).await?;
sync(&alices).await?;
let m = match m {
MuteDuration::Until(time) => MuteDuration::Until(
SystemTime::UNIX_EPOCH
+ Duration::from_secs(
time.duration_since(SystemTime::UNIX_EPOCH)?.as_secs(),
),
),
_ => m,
};
assert_eq!(alices[1].get_chat(&bob).await.mute_duration, m);
}

let a0_broadcast_id = create_broadcast_list(&alices[0]).await?;
let a0_broadcast_chat = Chat::load_from_db(&alices[0], a0_broadcast_id).await?;
set_chat_name(&alices[0], a0_broadcast_id, "Broadcast list 42").await?;
sync(&alices).await?;
let a1_broadcast_id = get_chat_id_by_grpid(&alices[1], &a0_broadcast_chat.grpid)
.await?
.unwrap()
.0;
let a1_broadcast_chat = Chat::load_from_db(&alices[1], a1_broadcast_id).await?;
assert_eq!(a1_broadcast_chat.get_type(), Chattype::Broadcast);
// TODO: Implement synchronisation of `set_chat_name()`.
// assert_eq!(a1_broadcast_chat.get_name(), "Broadcast list 42");
assert!(get_chat_contacts(&alices[1], a1_broadcast_id)
.await?
.is_empty());
add_contact_to_chat(&alices[0], a0_broadcast_id, ab_contact_ids[0]).await?;
sync(&alices).await?;
assert_eq!(
get_chat_contacts(&alices[1], a1_broadcast_id).await?,
vec![ab_contact_ids[1]]
);
let sent_msg = alices[1].send_text(a1_broadcast_id, "hi").await;
let msg = bob.recv_msg(&sent_msg).await;
let chat = Chat::load_from_db(&bob, msg.chat_id).await?;
assert_eq!(chat.get_type(), Chattype::Mailinglist);
// TODO: It doesn't work now for some reason, `msg.chat_id == DC_CHAT_ID_TRASH`.
// let msg = alices[0].recv_msg(&sent_msg).await;
// assert_eq!(msg.chat_id, a0_broadcast_id);
remove_contact_from_chat(&alices[0], a0_broadcast_id, ab_contact_ids[0]).await?;
sync(&alices).await?;
assert!(get_chat_contacts(&alices[1], a1_broadcast_id)
.await?
.is_empty());

Ok(())
}
}
6 changes: 3 additions & 3 deletions src/contact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1480,12 +1480,12 @@ WHERE type=? AND id IN (

if sync.into() {
let action = match new_blocking {
true => sync::ChatAction::Block,
false => sync::ChatAction::Unblock,
true => chat::SyncAction::Block,
false => chat::SyncAction::Unblock,
};
context
.add_sync_item(SyncData::AlterChat {
id: sync::ChatId::ContactAddr(contact.addr.clone()),
id: chat::SyncId::ContactAddr(contact.addr.clone()),
action,
})
.await?;
Expand Down
Loading

0 comments on commit 0f3036f

Please sign in to comment.