Skip to content

Commit

Permalink
Fix missing events in the feed due to out of order commits
Browse files Browse the repository at this point in the history
Events may become available to read by the event-feed API controller
out of event_id order because they are (sometimes) created inside a
transaction. The old code was only looking for events with ID larger
than the last one emitted, which could lead to skipping events.
This happened at the WFs in Luxor especially with judging events for
a compiler error, as inside a transaction two events are logged and
then the scoreboard is recalculated, leaving plenty of time for
another event to get logged in the mean time.

Fix this by querying for *all* events and making sure that we're
not missing any sequential event_ids. If we do wait and retry for
one second, and only then skip these. This means that event
filtering must now be done in code rather than in the SQL query.
  • Loading branch information
DOMjudge team authored and meisterT committed Apr 21, 2024
1 parent 9f44e13 commit be30cbb
Showing 1 changed file with 70 additions and 22 deletions.
92 changes: 70 additions & 22 deletions webapp/src/Controller/API/ContestController.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
use Metadata\MetadataFactoryInterface;
use Nelmio\ApiDocBundle\Annotation\Model;
use OpenApi\Attributes as OA;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\Attribute\Autowire;
use Symfony\Component\ExpressionLanguage\Expression;
use Symfony\Component\HttpKernel\Attribute\MapQueryParameter;
Expand Down Expand Up @@ -63,6 +64,7 @@ public function __construct(
ConfigurationService $config,
EventLogService $eventLogService,
protected readonly ImportExportService $importExportService,
protected readonly LoggerInterface $logger,
protected readonly AssetUpdateService $assetUpdater
) {
parent::__construct($entityManager, $dj, $config, $eventLogService);
Expand Down Expand Up @@ -651,7 +653,7 @@ public function getEventFeedAction(
$response->headers->set('Content-Type', 'application/x-ndjson');
$response->setCallback(function () use ($format, $cid, $contest, $request, $since_id, $types, $strict, $stream, $metadataFactory, $kernel) {
$lastUpdate = 0;
$lastIdSent = $since_id;
$lastIdSent = max(0, $since_id); // Don't try to look for event_id=0
$typeFilter = false;
if ($types) {
$typeFilter = explode(',', $types);
Expand Down Expand Up @@ -714,39 +716,77 @@ public function getEventFeedAction(
// Reload the contest as the above method will clear the entity manager.
$contest = $this->getContestWithId($request, $cid);

$missingEventRetries = 0;
while (true) {
// Add missing state events that should have happened already.
$this->eventLogService->addMissingStateEvents($contest);

$qb = $this->em->createQueryBuilder()
// We fetch *all* events after the last seen to check that
// we don't skip events that are committed out of order.
$q = $this->em->createQueryBuilder()
->from(Event::class, 'e')
->select('e')
->andWhere('e.eventid > :lastIdSent')
->setParameter('lastIdSent', $lastIdSent)
->andWhere('e.contest = :cid')
->setParameter('cid', $contest->getCid())
->orderBy('e.eventid', 'ASC');

if ($typeFilter !== false) {
$qb = $qb
->andWhere('e.endpointtype IN (:types)')
->setParameter('types', $typeFilter);
}
if (!$canViewAll) {
$restricted_types = ['judgements', 'runs', 'clarifications'];
if ($contest->getStarttime() === null || Utils::now() < $contest->getStarttime()) {
$restricted_types[] = 'problems';
->orderBy('e.eventid', 'ASC')
->getQuery();

/** @var Event[] $events */
$events = $q->getResult();

// Look for any missing sequential events and wait for them to
// be committed if so.
$missingEvents = false;
$expectedId = $lastIdSent + 1;
$lastFoundId = null;
foreach ($events as $event) {
if ($event->getEventid() !== $expectedId) {
$missingEvents = true;
$lastFoundId = $event->getEventid();
break;
}
$qb = $qb
->andWhere('e.endpointtype NOT IN (:restricted_types)')
->setParameter('restricted_types', $restricted_types);
$expectedId++;
}
if ($missingEvents) {
if ($missingEventRetries==0) {
$this->logger->info(
'Detected missing events %d ... %d, waiting for these to appear',
[$expectedId, $lastFoundId-1]
);
}
if (++$missingEventRetries < 10) {
usleep(100 * 1000);
continue;
}

$q = $qb->getQuery();
// We've decided to permanently ignore these non-existing
// events for this connection. The wait for any
// non-committed events was long enough.
//
// There might be multiple non-existing events. Log the
// first consecutive gap of non-existing events. A consecutive
// gap is guaranteed since the events are ordered.
$this->logger->warning(
'Waited too long for missing events %d ... %d, skipping',
[$expectedId, $lastFoundId-1]
);
}
$missingEventRetries = 0;

/** @var Event[] $events */
$events = $q->getResult();
$numEventsSent = 0;
foreach ($events as $event) {
// Filter out unwanted events
if ($event->getContest()->getCid() !== $contest->getCid()) continue;
if ($typeFilter !== false &&
!in_array($event->getEndpointtype(), $typeFilter)) continue;
if (!$canViewAll) {
$restricted_types = ['judgements', 'runs', 'clarifications'];
if ($contest->getStarttime() === null || Utils::now() < $contest->getStarttime()) {
$restricted_types[] = 'problems';
}
if (in_array($event->getEndpointtype(), $restricted_types)) continue;
}

$data = $event->getContent();
// Filter fields with specific access restrictions.
if (!$canViewAll) {
Expand Down Expand Up @@ -814,9 +854,17 @@ public function getEventFeedAction(
flush();
$lastUpdate = Utils::now();
$lastIdSent = $event->getEventid();
$numEventsSent++;

if ($missingEvents && $event->getEventid() >= $lastFoundId) {
// The first event after the first gap has been emitted. Stop
// emitting events and restart the gap detection logic to find
// any potential gaps after this last emitted event.
break;
}
}

if (count($events) == 0) {
if ($numEventsSent == 0) {
if (!$stream) {
break;
}
Expand Down

0 comments on commit be30cbb

Please sign in to comment.