Skip to content

Commit

Permalink
fix(sync): force full sync when the server reports QRESYNC
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Steinmetz <[email protected]>
  • Loading branch information
st3iny committed Oct 26, 2023
1 parent c5f993d commit 624c826
Showing 1 changed file with 156 additions and 143 deletions.
299 changes: 156 additions & 143 deletions lib/Service/Sync/ImapToDbSynchronizer.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
namespace OCA\Mail\Service\Sync;

use Horde_Imap_Client;
use Horde_Imap_Client_Base;
use Horde_Imap_Client_Exception;
use OCA\Mail\Account;
use OCA\Mail\Contracts\IMailManager;
Expand Down Expand Up @@ -217,6 +218,22 @@ public function sync(Account $account,
return $rebuildThreads;
}

$client = $this->clientFactory->getClient($account);
$client->login(); // Need to login before fetching capabilities.

// There is no partial sync when using QRESYNC. As per RFC the client will always pull
// all changes. This is a cheap operation when using QRESYNC as the server keeps track
// of a client's state through the sync token. We could just update the sync tokens and
// call it a day because Horde caches unrelated/unrequested changes until the next
// operation. However, our cache is not reliable as some instance might use APCu which
// isn't shared between cron and web requests.
if ($client->capability->isEnabled('QRESYNC')) {
$this->logger->debug("Forcing full sync due to QRESYNC");
$criteria |= Horde_Imap_Client::SYNC_NEWMSGSUIDS
| Horde_Imap_Client::SYNC_FLAGSUIDS
| Horde_Imap_Client::SYNC_VANISHEDUIDS;
}

if ($force || ($criteria & Horde_Imap_Client::SYNC_NEWMSGSUIDS)) {
$logger->debug("Locking mailbox " . $mailbox->getId() . " for new messages sync");
$this->mailboxMapper->lockForNewSync($mailbox);
Expand All @@ -236,24 +253,24 @@ public function sync(Account $account,
|| $mailbox->getSyncChangedToken() === null
|| $mailbox->getSyncVanishedToken() === null) {
$logger->debug("Running initial sync for " . $mailbox->getId());
$this->runInitialSync($account, $mailbox, $logger);
$this->runInitialSync($client, $account, $mailbox, $logger);
} else {
try {
$logger->debug("Running partial sync for " . $mailbox->getId());
// Only rebuild threads if there were new or vanished messages
$rebuildThreads = $this->runPartialSync($account, $mailbox, $logger, $criteria, $knownUids);
$rebuildThreads = $this->runPartialSync($client, $account, $mailbox, $logger, $criteria, $knownUids);
} catch (UidValidityChangedException $e) {
$logger->warning('Mailbox UID validity changed. Wiping cache and performing full sync for ' . $mailbox->getId());
$this->resetCache($account, $mailbox);
$logger->debug("Running initial sync for " . $mailbox->getId() . " after cache reset");
$this->runInitialSync($account, $mailbox, $logger);
$this->runInitialSync($client, $account, $mailbox, $logger);
} catch (MailboxDoesNotSupportModSequencesException $e) {
$logger->warning('Mailbox does not support mod-sequences error occured. Wiping cache and performing full sync for ' . $mailbox->getId(), [
'exception' => $e,
]);
$this->resetCache($account, $mailbox);
$logger->debug("Running initial sync for " . $mailbox->getId() . " after cache reset - no mod-sequences error");
$this->runInitialSync($account, $mailbox, $logger);
$this->runInitialSync($client, $account, $mailbox, $logger);
}
}
} catch (ServiceException $e) {
Expand All @@ -274,6 +291,8 @@ public function sync(Account $account,
$logger->debug("Unlocking mailbox " . $mailbox->getId() . " from new messages sync");
$this->mailboxMapper->unlockFromNewSync($mailbox);
}

$client->logout();
}

if (!$batchSync) {
Expand All @@ -293,7 +312,9 @@ public function sync(Account $account,
* @throws ServiceException
* @throws IncompleteSyncException
*/
private function runInitialSync(Account $account,
private function runInitialSync(
Horde_Imap_Client_Base $client,
Account $account,
Mailbox $mailbox,
LoggerInterface $logger): void {
$perf = $this->performanceLogger->startWithLogger(
Expand All @@ -302,50 +323,45 @@ private function runInitialSync(Account $account,
);

$highestKnownUid = $this->dbMapper->findHighestUid($mailbox);
$client = $this->clientFactory->getClient($account, false);
try {
try {
$imapMessages = $this->imapMapper->findAll(
$client,
$mailbox->getName(),
self::MAX_NEW_MESSAGES,
$highestKnownUid ?? 0,
$logger,
$perf,
$account->getUserId(),
);
$perf->step(sprintf('fetch %d messages from IMAP', count($imapMessages)));
} catch (Horde_Imap_Client_Exception $e) {
throw new ServiceException('Can not get messages from mailbox ' . $mailbox->getName() . ': ' . $e->getMessage(), 0, $e);
}

foreach (array_chunk($imapMessages['messages'], 500) as $chunk) {
$messages = array_map(static function (IMAPMessage $imapMessage) use ($mailbox, $account) {
return $imapMessage->toDbMessage($mailbox->getId(), $account->getMailAccount());
}, $chunk);
$this->dbMapper->insertBulk($account, ...$messages);
$perf->step(sprintf('persist %d messages in database', count($chunk)));
// Free the memory
unset($messages);
}
$imapMessages = $this->imapMapper->findAll(
$client,
$mailbox->getName(),
self::MAX_NEW_MESSAGES,
$highestKnownUid ?? 0,
$logger,
$perf,
$account->getUserId(),
);
$perf->step(sprintf('fetch %d messages from IMAP', count($imapMessages)));
} catch (Horde_Imap_Client_Exception $e) {
throw new ServiceException('Can not get messages from mailbox ' . $mailbox->getName() . ': ' . $e->getMessage(), 0, $e);
}

if (!$imapMessages['all']) {
// We might need more attempts to fill the cache
$loggingMailboxId = $account->getId() . ':' . $mailbox->getName();
$total = $imapMessages['total'];
$cached = count($this->messageMapper->findAllUids($mailbox));
$perf->step('find number of cached UIDs');
foreach (array_chunk($imapMessages['messages'], 500) as $chunk) {
$messages = array_map(static function (IMAPMessage $imapMessage) use ($mailbox, $account) {
return $imapMessage->toDbMessage($mailbox->getId(), $account->getMailAccount());
}, $chunk);
$this->dbMapper->insertBulk($account, ...$messages);
$perf->step(sprintf('persist %d messages in database', count($chunk)));
// Free the memory
unset($messages);
}

$perf->end();
throw new IncompleteSyncException("Initial sync is not complete for $loggingMailboxId ($cached of $total messages cached).");
}
if (!$imapMessages['all']) {
// We might need more attempts to fill the cache
$loggingMailboxId = $account->getId() . ':' . $mailbox->getName();
$total = $imapMessages['total'];
$cached = count($this->messageMapper->findAllUids($mailbox));
$perf->step('find number of cached UIDs');

$mailbox->setSyncNewToken($client->getSyncToken($mailbox->getName()));
$mailbox->setSyncChangedToken($client->getSyncToken($mailbox->getName()));
$mailbox->setSyncVanishedToken($client->getSyncToken($mailbox->getName()));
} finally {
$client->logout();
$perf->end();
throw new IncompleteSyncException("Initial sync is not complete for $loggingMailboxId ($cached of $total messages cached).");
}

$mailbox->setSyncNewToken($client->getSyncToken($mailbox->getName()));
$mailbox->setSyncChangedToken($client->getSyncToken($mailbox->getName()));
$mailbox->setSyncVanishedToken($client->getSyncToken($mailbox->getName()));
$this->mailboxMapper->update($mailbox);

$perf->end();
Expand All @@ -358,7 +374,9 @@ private function runInitialSync(Account $account,
* @throws UidValidityChangedException
* @return bool whether there are new or vanished messages
*/
private function runPartialSync(Account $account,
private function runPartialSync(
Horde_Imap_Client_Base $client,
Account $account,
Mailbox $mailbox,
LoggerInterface $logger,
int $criteria,
Expand All @@ -369,114 +387,109 @@ private function runPartialSync(Account $account,
$logger
);

$client = $this->clientFactory->getClient($account);
try {
$uids = $knownUids ?? $this->dbMapper->findAllUids($mailbox);
$perf->step('get all known UIDs');

if ($criteria & Horde_Imap_Client::SYNC_NEWMSGSUIDS) {
$response = $this->synchronizer->sync(
$client,
new Request(
$mailbox->getName(),
$mailbox->getSyncNewToken(),
$uids
),
$account->getUserId(),
Horde_Imap_Client::SYNC_NEWMSGSUIDS
);
$perf->step('get new messages via Horde');

$highestKnownUid = $this->dbMapper->findHighestUid($mailbox);
if ($highestKnownUid === null) {
// Everything is relevant
$newMessages = $response->getNewMessages();
} else {
// Filter out anything that is already in the DB. Ideally this never happens, but if there is an error
// during a consecutive chunk INSERT, the sync token won't be updated. In that case the same message(s)
// will be seen as *new* and therefore cause conflicts.
$newMessages = array_filter($response->getNewMessages(), static function (IMAPMessage $imapMessage) use ($highestKnownUid) {
return $imapMessage->getUid() > $highestKnownUid;
});
}
$uids = $knownUids ?? $this->dbMapper->findAllUids($mailbox);
$perf->step('get all known UIDs');

foreach (array_chunk($newMessages, 500) as $chunk) {
$dbMessages = array_map(static function (IMAPMessage $imapMessage) use ($mailbox, $account) {
return $imapMessage->toDbMessage($mailbox->getId(), $account->getMailAccount());
}, $chunk);
if ($criteria & Horde_Imap_Client::SYNC_NEWMSGSUIDS) {
$response = $this->synchronizer->sync(
$client,
new Request(
$mailbox->getName(),
$mailbox->getSyncNewToken(),
$uids
),
$account->getUserId(),
Horde_Imap_Client::SYNC_NEWMSGSUIDS
);
$perf->step('get new messages via Horde');

$this->dbMapper->insertBulk($account, ...$dbMessages);
$highestKnownUid = $this->dbMapper->findHighestUid($mailbox);
if ($highestKnownUid === null) {
// Everything is relevant
$newMessages = $response->getNewMessages();
} else {
// Filter out anything that is already in the DB. Ideally this never happens, but if there is an error
// during a consecutive chunk INSERT, the sync token won't be updated. In that case the same message(s)
// will be seen as *new* and therefore cause conflicts.
$newMessages = array_filter($response->getNewMessages(), static function (IMAPMessage $imapMessage) use ($highestKnownUid) {
return $imapMessage->getUid() > $highestKnownUid;
});
}

$this->dispatcher->dispatch(
NewMessagesSynchronized::class,
new NewMessagesSynchronized($account, $mailbox, $dbMessages)
);
$perf->step('classified a chunk of new messages');
}
$perf->step('persist new messages');
foreach (array_chunk($newMessages, 500) as $chunk) {
$dbMessages = array_map(static function (IMAPMessage $imapMessage) use ($mailbox, $account) {
return $imapMessage->toDbMessage($mailbox->getId(), $account->getMailAccount());
}, $chunk);

$mailbox->setSyncNewToken($client->getSyncToken($mailbox->getName()));
$newOrVanished = $newMessages !== [];
}
if ($criteria & Horde_Imap_Client::SYNC_FLAGSUIDS) {
$response = $this->synchronizer->sync(
$client,
new Request(
$mailbox->getName(),
$mailbox->getSyncChangedToken(),
$uids
),
$account->getUserId(),
Horde_Imap_Client::SYNC_FLAGSUIDS
$this->dbMapper->insertBulk($account, ...$dbMessages);

$this->dispatcher->dispatch(
NewMessagesSynchronized::class,
new NewMessagesSynchronized($account, $mailbox, $dbMessages)
);
$perf->step('get changed messages via Horde');
$perf->step('classified a chunk of new messages');
}
$perf->step('persist new messages');

$permflagsEnabled = $this->mailManager->isPermflagsEnabled($client, $account, $mailbox->getName());
$mailbox->setSyncNewToken($client->getSyncToken($mailbox->getName()));
$newOrVanished = $newMessages !== [];
}
if ($criteria & Horde_Imap_Client::SYNC_FLAGSUIDS) {
$response = $this->synchronizer->sync(
$client,
new Request(
$mailbox->getName(),
$mailbox->getSyncChangedToken(),
$uids
),
$account->getUserId(),
Horde_Imap_Client::SYNC_FLAGSUIDS
);
$perf->step('get changed messages via Horde');

foreach (array_chunk($response->getChangedMessages(), 500) as $chunk) {
$this->dbMapper->updateBulk($account, $permflagsEnabled, ...array_map(static function (IMAPMessage $imapMessage) use ($mailbox, $account) {
return $imapMessage->toDbMessage($mailbox->getId(), $account->getMailAccount());
}, $chunk));
}
$perf->step('persist changed messages');

// If a list of UIDs was *provided* (as opposed to loaded from the DB,
// we can not assume that all changes were detected, hence this is kinda
// a silent sync and we don't update the change token until the next full
// mailbox sync
if ($knownUids === null) {
$mailbox->setSyncChangedToken($client->getSyncToken($mailbox->getName()));
}
$permflagsEnabled = $this->mailManager->isPermflagsEnabled($client, $account, $mailbox->getName());

foreach (array_chunk($response->getChangedMessages(), 500) as $chunk) {
$this->dbMapper->updateBulk($account, $permflagsEnabled, ...array_map(static function (IMAPMessage $imapMessage) use ($mailbox, $account) {
return $imapMessage->toDbMessage($mailbox->getId(), $account->getMailAccount());
}, $chunk));
}
if ($criteria & Horde_Imap_Client::SYNC_VANISHEDUIDS) {
$response = $this->synchronizer->sync(
$client,
new Request(
$mailbox->getName(),
$mailbox->getSyncVanishedToken(),
$uids
),
$account->getUserId(),
Horde_Imap_Client::SYNC_VANISHEDUIDS
);
$perf->step('get vanished messages via Horde');
$perf->step('persist changed messages');

// If a list of UIDs was *provided* (as opposed to loaded from the DB,
// we can not assume that all changes were detected, hence this is kinda
// a silent sync and we don't update the change token until the next full
// mailbox sync
if ($knownUids === null) {
$mailbox->setSyncChangedToken($client->getSyncToken($mailbox->getName()));
}
}
if ($criteria & Horde_Imap_Client::SYNC_VANISHEDUIDS) {
$response = $this->synchronizer->sync(
$client,
new Request(
$mailbox->getName(),
$mailbox->getSyncVanishedToken(),
$uids
),
$account->getUserId(),
Horde_Imap_Client::SYNC_VANISHEDUIDS
);
$perf->step('get vanished messages via Horde');

foreach (array_chunk($response->getVanishedMessageUids(), 500) as $chunk) {
$this->dbMapper->deleteByUid($mailbox, ...$chunk);
}
$perf->step('delete vanished messages');

// If a list of UIDs was *provided* (as opposed to loaded from the DB,
// we can not assume that all changes were detected, hence this is kinda
// a silent sync and we don't update the vanish token until the next full
// mailbox sync
if ($knownUids === null) {
$mailbox->setSyncVanishedToken($client->getSyncToken($mailbox->getName()));
}
$newOrVanished = $newOrVanished || !empty($response->getVanishedMessageUids());
foreach (array_chunk($response->getVanishedMessageUids(), 500) as $chunk) {
$this->dbMapper->deleteByUid($mailbox, ...$chunk);
}
} finally {
$client->logout();
$perf->step('delete vanished messages');

// If a list of UIDs was *provided* (as opposed to loaded from the DB,
// we can not assume that all changes were detected, hence this is kinda
// a silent sync and we don't update the vanish token until the next full
// mailbox sync
if ($knownUids === null) {
$mailbox->setSyncVanishedToken($client->getSyncToken($mailbox->getName()));
}
$newOrVanished = $newOrVanished || !empty($response->getVanishedMessageUids());
}
$this->mailboxMapper->update($mailbox);
$perf->end();
Expand Down

0 comments on commit 624c826

Please sign in to comment.