Skip to content

Commit

Permalink
fix: Delete messages from SMTP queue only on user demand (#4579)
Browse files Browse the repository at this point in the history
I.e. from delete_msgs(). Otherwise messages must not be deleted from there, e.g. if a message is
ephemeral, but a network outage lasts longer than the ephemeral message timer, the message still
must be sent upon a successful reconnection.
  • Loading branch information
iequidoo committed Aug 24, 2023
1 parent 95b2a15 commit a66f8bd
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 44 deletions.
33 changes: 30 additions & 3 deletions src/ephemeral.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1062,14 +1062,14 @@ mod tests {
delete_expired_messages(t, not_deleted_at).await?;

let loaded = Message::load_from_db(t, msg_id).await?;
assert_eq!(loaded.text, "Message text");
assert!(!loaded.text.is_empty());
assert_eq!(loaded.chat_id, chat.id);

assert!(next_expiration < deleted_at);
delete_expired_messages(t, deleted_at).await?;
t.evtracker
.get_matching(|evt| {
if let EventType::MsgsChanged {
if let EventType::MsgDeleted {
msg_id: event_msg_id,
..
} = evt
Expand All @@ -1082,7 +1082,6 @@ mod tests {
.await;

let loaded = Message::load_from_db(t, msg_id).await?;
assert_eq!(loaded.text, "");
assert_eq!(loaded.chat_id, DC_CHAT_ID_TRASH);

// Check that the msg was deleted locally.
Expand Down Expand Up @@ -1300,4 +1299,32 @@ mod tests {

Ok(())
}

// Tests that if we are offline for a time longer than the ephemeral timer duration, the message
// is deleted from the chat but is still in the "smtp" table, i.e. will be sent upon a
// successful reconnection.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_ephemeral_msg_offline() -> Result<()> {
let alice = TestContext::new_alice().await;
let chat = alice
.create_chat_with_contact("Bob", "[email protected]")
.await;
let duration = 60;
chat.id
.set_ephemeral_timer(&alice, Timer::Enabled { duration })
.await?;
let mut msg = Message::new(Viewtype::Text);
msg.set_text("hi".to_string());
assert!(chat::send_msg_sync(&alice, chat.id, &mut msg)
.await
.is_err());
let stmt = "SELECT COUNT(*) FROM smtp WHERE msg_id=?";
assert!(alice.sql.exists(stmt, (msg.id,)).await?);
let now = time();
check_msg_will_be_deleted(&alice, msg.id, &chat, now, now + i64::from(duration) + 1)
.await?;
assert!(alice.sql.exists(stmt, (msg.id,)).await?);

Ok(())
}
}
55 changes: 32 additions & 23 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,7 @@ pub async fn get_mime_headers(context: &Context, msg_id: MsgId) -> Result<Vec<u8
/// and scheduling for deletion on IMAP.
pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
let mut modified_chat_ids = BTreeSet::new();
let mut res = Ok(());

for &msg_id in msg_ids {
let msg = Message::load_from_db(context, msg_id).await?;
Expand All @@ -1444,13 +1445,19 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
modified_chat_ids.insert(msg.chat_id);

let target = context.get_delete_msgs_target().await?;
context
.sql
.execute(
let update_db = |conn: &mut rusqlite::Connection| {
conn.execute(
"UPDATE imap SET target=? WHERE rfc724_mid=?",
(target, msg.rfc724_mid),
)
.await?;
)?;
conn.execute("DELETE FROM smtp WHERE msg_id=?", (msg_id,))?;
Ok(())
};
if let Err(e) = context.sql.call_write(update_db).await {
error!(context, "delete_msgs: failed to update db: {e:#}.");
res = Err(e);
continue;
}

let logging_xdc_id = context
.debug_logging
Expand All @@ -1465,6 +1472,7 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
}
}
}
res?;

for modified_chat_id in modified_chat_ids {
context.emit_msgs_changed(modified_chat_id, MsgId::new(0));
Expand Down Expand Up @@ -1633,24 +1641,6 @@ pub(crate) async fn update_msg_state(

// Context functions to work with messages

/// Returns true if given message ID exists in the database and is not trashed.
pub(crate) async fn exists(context: &Context, msg_id: MsgId) -> Result<bool> {
if msg_id.is_special() {
return Ok(false);
}

let chat_id: Option<ChatId> = context
.sql
.query_get_value("SELECT chat_id FROM msgs WHERE id=?;", (msg_id,))
.await?;

if let Some(chat_id) = chat_id {
Ok(!chat_id.is_trash())
} else {
Ok(false)
}
}

pub(crate) async fn set_msg_failed(context: &Context, msg_id: MsgId, error: &str) {
if let Ok(mut msg) = Message::load_from_db(context, msg_id).await {
if msg.state.can_fail() {
Expand Down Expand Up @@ -2422,4 +2412,23 @@ def hello():

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_delete_msgs_offline() -> Result<()> {
let alice = TestContext::new_alice().await;
let chat = alice
.create_chat_with_contact("Bob", "[email protected]")
.await;
let mut msg = Message::new(Viewtype::Text);
msg.set_text("hi".to_string());
assert!(chat::send_msg_sync(&alice, chat.id, &mut msg)
.await
.is_err());
let stmt = "SELECT COUNT(*) FROM smtp WHERE msg_id=?";
assert!(alice.sql.exists(stmt, (msg.id,)).await?);
delete_msgs(&alice, &[msg.id]).await?;
assert!(!alice.sql.exists(stmt, (msg.id,)).await?);

Ok(())
}
}
18 changes: 0 additions & 18 deletions src/smtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,24 +566,6 @@ pub(crate) async fn send_msg_to_smtp(
)
.collect::<Vec<_>>();

// If there is a msg-id and it does not exist in the db, cancel sending. this happens if
// delete_msgs() was called before the generated mime was sent out.
if !message::exists(context, msg_id)
.await
.with_context(|| format!("failed to check message {msg_id} existence"))?
{
info!(
context,
"Sending of message {msg_id} (entry {rowid}) was cancelled by the user."
);
context
.sql
.execute("DELETE FROM smtp WHERE id=?", (rowid,))
.await
.context("failed to remove cancelled message from smtp table")?;
return Ok(());
}

let status = smtp_send(context, &recipients_list, body.as_str(), smtp, msg_id).await;

match status {
Expand Down

0 comments on commit a66f8bd

Please sign in to comment.