Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish timetable snapshot in background #5974

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ public class SiriTimetableSnapshotSource implements TimetableSnapshotProvider {
*/
private final SiriTripPatternCache tripPatternCache;

private final TransitEditorService transitService;
/**
* Long-lived transit editor service that has access to the timetable snapshot buffer.
* This differs from the usual use case where the transit service refers to the latest published
* timetable snapshot.
*/
private final TransitEditorService transitEditorService;

private final TimetableSnapshotManager snapshotManager;

Expand All @@ -71,9 +76,10 @@ public SiriTimetableSnapshotSource(
parameters,
() -> LocalDate.now(transitModel.getTimeZone())
);
this.transitService = new DefaultTransitService(transitModel);
this.transitEditorService =
new DefaultTransitService(transitModel, getTimetableSnapshotBuffer());
this.tripPatternCache =
new SiriTripPatternCache(tripPatternIdGenerator, transitService::getPatternForTrip);
new SiriTripPatternCache(tripPatternIdGenerator, transitEditorService::getPatternForTrip);

transitModel.initTimetableSnapshotProvider(this);
}
Expand Down Expand Up @@ -102,26 +108,22 @@ public UpdateResult applyEstimatedTimetable(

List<Result<UpdateSuccess, UpdateError>> results = new ArrayList<>();

snapshotManager.withLock(() -> {
if (incrementality == FULL_DATASET) {
// Remove all updates from the buffer
snapshotManager.clearBuffer(feedId);
}
if (incrementality == FULL_DATASET) {
// Remove all updates from the buffer
snapshotManager.clearBuffer(feedId);
}

for (var etDelivery : updates) {
for (var estimatedJourneyVersion : etDelivery.getEstimatedJourneyVersionFrames()) {
var journeys = estimatedJourneyVersion.getEstimatedVehicleJourneies();
LOG.debug("Handling {} EstimatedVehicleJourneys.", journeys.size());
for (EstimatedVehicleJourney journey : journeys) {
results.add(apply(journey, transitService, fuzzyTripMatcher, entityResolver));
}
for (var etDelivery : updates) {
for (var estimatedJourneyVersion : etDelivery.getEstimatedJourneyVersionFrames()) {
var journeys = estimatedJourneyVersion.getEstimatedVehicleJourneies();
LOG.debug("Handling {} EstimatedVehicleJourneys.", journeys.size());
for (EstimatedVehicleJourney journey : journeys) {
results.add(apply(journey, transitEditorService, fuzzyTripMatcher, entityResolver));
}
}
}

LOG.debug("message contains {} trip updates", updates.size());

snapshotManager.purgeAndCommit();
});
LOG.debug("message contains {} trip updates", updates.size());

return UpdateResult.ofResults(results);
}
Expand All @@ -131,6 +133,10 @@ public TimetableSnapshot getTimetableSnapshot() {
return snapshotManager.getTimetableSnapshot();
}

private TimetableSnapshot getTimetableSnapshotBuffer() {
return snapshotManager.getTimetableSnapshotBuffer();
}

private Result<UpdateSuccess, UpdateError> apply(
EstimatedVehicleJourney journey,
TransitEditorService transitService,
Expand Down Expand Up @@ -195,11 +201,7 @@ private boolean shouldAddNewTrip(
* Snapshot timetable is used as source if initialised, trip patterns scheduled timetable if not.
*/
private Timetable getCurrentTimetable(TripPattern tripPattern, LocalDate serviceDate) {
TimetableSnapshot timetableSnapshot = getTimetableSnapshot();
if (timetableSnapshot != null) {
return timetableSnapshot.resolve(tripPattern, serviceDate);
}
return tripPattern.getScheduledTimetable();
return getTimetableSnapshotBuffer().resolve(tripPattern, serviceDate);
}

private Result<TripUpdate, UpdateError> handleModifiedTrip(
Expand Down Expand Up @@ -228,7 +230,7 @@ private Result<TripUpdate, UpdateError> handleModifiedTrip(

if (trip != null) {
// Found exact match
pattern = transitService.getPatternForTrip(trip);
pattern = transitEditorService.getPatternForTrip(trip);
} else if (fuzzyTripMatcher != null) {
// No exact match found - search for trips based on arrival-times/stop-patterns
TripAndPattern tripAndPattern = fuzzyTripMatcher.match(
Expand Down Expand Up @@ -263,7 +265,7 @@ private Result<TripUpdate, UpdateError> handleModifiedTrip(
pattern,
estimatedVehicleJourney,
serviceDate,
transitService.getTimeZone(),
transitEditorService.getTimeZone(),
entityResolver
)
.build();
Expand Down Expand Up @@ -310,7 +312,7 @@ private Result<UpdateSuccess, UpdateError> addTripToGraphAndBuffer(TripUpdate tr
private boolean markScheduledTripAsDeleted(Trip trip, final LocalDate serviceDate) {
boolean success = false;

final TripPattern pattern = transitService.getPatternForTrip(trip);
final TripPattern pattern = transitEditorService.getPatternForTrip(trip);

if (pattern != null) {
// Mark scheduled trip times for this trip in this pattern as deleted
Expand All @@ -329,4 +331,11 @@ private boolean markScheduledTripAsDeleted(Trip trip, final LocalDate serviceDat

return success;
}

/**
* Flush pending changes in the timetable snapshot buffer and publish a new snapshot.
*/
public void flushBuffer() {
snapshotManager.purgeAndCommit();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
* up timetables on this class could conceivably be replaced with snapshotting entire views of the
* transit network. It would also be possible to make the realtime version of Timetables or
* TripTimes the primary view, and include references back to their scheduled versions.
* <p>
* Implementation note: when a snapshot is committed, the mutable state of this class is stored
* in final fields and completely initialized in the constructor. This provides an additional
* guarantee of safe-publication without synchronization.
* (see <a href="https://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.5">final Field Semantics</a>)
*/
public class TimetableSnapshot {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ public DefaultTransitService(TransitModel transitModel) {
this.transitModelIndex = transitModel.getTransitModelIndex();
}

public DefaultTransitService(
TransitModel transitModel,
TimetableSnapshot timetableSnapshotBuffer
) {
this(transitModel);
this.timetableSnapshot = timetableSnapshotBuffer;
}

@Override
public Collection<String> getFeedIds() {
return this.transitModel.getFeedIds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource;
import org.opentripplanner.ext.siri.updater.SiriETUpdater;
import org.opentripplanner.ext.siri.updater.SiriSXUpdater;
Expand All @@ -20,6 +21,7 @@
import org.opentripplanner.updater.UpdatersParameters;
import org.opentripplanner.updater.alert.GtfsRealtimeAlertsUpdater;
import org.opentripplanner.updater.spi.GraphUpdater;
import org.opentripplanner.updater.spi.TimetableSnapshotFlush;
import org.opentripplanner.updater.trip.MqttGtfsRealtimeUpdater;
import org.opentripplanner.updater.trip.PollingTripUpdater;
import org.opentripplanner.updater.trip.TimetableSnapshotSource;
Expand Down Expand Up @@ -94,6 +96,9 @@ private void configure() {
);

GraphUpdaterManager updaterManager = new GraphUpdaterManager(graph, transitModel, updaters);

configureTimetableSnapshotFlush(updaterManager);

updaterManager.startUpdaters();

// Stop the updater manager if it contains nothing
Expand Down Expand Up @@ -223,4 +228,21 @@ private TimetableSnapshotSource provideGtfsTimetableSnapshot() {
}
return gtfsTimetableSnapshotSource;
}

/**
* If SIRI or GTFS real-time updaters are in use, configure a periodic flush of the timetable
* snapshot.
*/
private void configureTimetableSnapshotFlush(GraphUpdaterManager updaterManager) {
if (siriTimetableSnapshotSource != null || gtfsTimetableSnapshotSource != null) {
updaterManager
.getScheduler()
.scheduleWithFixedDelay(
new TimetableSnapshotFlush(siriTimetableSnapshotSource, gtfsTimetableSnapshotSource),
0,
updatersParameters.timetableSnapshotParameters().maxSnapshotFrequency().toSeconds(),
TimeUnit.SECONDS
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.opentripplanner.updater.spi;

import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource;
import org.opentripplanner.updater.trip.TimetableSnapshotSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Flush the timetable snapshot buffer by committing pending changes.
* Exceptions occurring during the flush are caught and ignored: the scheduler can then retry
* the task later.
*/
public class TimetableSnapshotFlush implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(TimetableSnapshotFlush.class);

private final SiriTimetableSnapshotSource siriTimetableSnapshotSource;
private final TimetableSnapshotSource gtfsTimetableSnapshotSource;

public TimetableSnapshotFlush(
SiriTimetableSnapshotSource siriTimetableSnapshotSource,
TimetableSnapshotSource gtfsTimetableSnapshotSource
) {
this.siriTimetableSnapshotSource = siriTimetableSnapshotSource;
this.gtfsTimetableSnapshotSource = gtfsTimetableSnapshotSource;
}

@Override
public void run() {
try {
LOG.debug("Flushing timetable snapshot buffer");
if (siriTimetableSnapshotSource != null) {
siriTimetableSnapshotSource.flushBuffer();
}
if (gtfsTimetableSnapshotSource != null) {
gtfsTimetableSnapshotSource.flushBuffer();
}
LOG.debug("Flushed timetable snapshot buffer");
} catch (Throwable t) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Throwable opens up for a potensial critical error state, where the OTP Server is in a corrupt state, but does not go down. But the pragmatic programmer in me tells me that we probably have bigger issues in the code, and that we should fokus on those. To fix it we would need to catch both RuntimeException and Throwable. Log both, but take down the application for Throwable. I think it is out-of-scope for this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree.

Is there ever a case where you can recover from an OutOfMemoryError? I think in such a case you really want to let the JVM die.

Copy link
Contributor Author

@vpaturet vpaturet Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a real case in a web server: a request thread uses up all memory, triggers an OOME which makes its thread dies, freeing up the memory again. We had this case in our production environment with some problematic GraphQL queries returning gigabytes of data. If the periodic task was running at the same time as such a request, the OOME could be triggered from the periodic task thread and makes it stop forever.

As a general rule, web servers do not stop because a worker thread dies.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See also Andrew's comment on that topic: #5974 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two separate concerns here: recovering from errors, and logging errors. Indeed you probably want to just let certain severe errors run their course even if that incapacitates the server. But you nonetheless want to make sure they are logged so you know what happened. If I recall correctly, an error in a Runnable on an Executor can cause the executor thread to die, but silently. Then the tasks just pile up in a queue for no apparent reason, which would be very confusing to debug.

An OOME within the executor might kill the executor thread. You definitely want to log that, as it will presumably be followed by tasks piling up and the JVM crashing and you'd want to know why.

LOG.error("Error flushing timetable snapshot buffer", t);
vpaturet marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

import java.time.LocalDate;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.opentripplanner.framework.time.CountdownTimer;
import org.opentripplanner.model.Timetable;
import org.opentripplanner.model.TimetableSnapshot;
import org.opentripplanner.routing.algorithm.raptoradapter.transit.mappers.TransitLayerUpdater;
import org.opentripplanner.routing.util.ConcurrentPublished;
import org.opentripplanner.transit.model.framework.FeedScopedId;
import org.opentripplanner.transit.model.framework.Result;
import org.opentripplanner.transit.model.network.TripPattern;
Expand All @@ -30,36 +29,18 @@ public final class TimetableSnapshotManager {

private static final Logger LOG = LoggerFactory.getLogger(TimetableSnapshotManager.class);
private final TransitLayerUpdater transitLayerUpdater;
/**
* Lock to indicate that buffer is in use
*/
private final ReentrantLock bufferLock = new ReentrantLock(true);

/**
* The working copy of the timetable snapshot. Should not be visible to routing threads. Should
* only be modified by a thread that holds a lock on {@link #bufferLock}. All public methods that
* might modify this buffer will correctly acquire the lock. By design, only one thread should
* ever be writing to this buffer.
* TODO RT_AB: research and document why this lock is needed since only one thread should ever be
* writing to this buffer. One possible reason may be a need to suspend writes while indexing
* and swapping out the buffer. But the original idea was to make a new copy of the buffer
* before re-indexing it. While refactoring or rewriting parts of this system, we could throw
* an exception if a writing section is entered by more than one thread.
* The working copy of the timetable snapshot. Should not be visible to routing threads.
* By design, only one thread should ever be writing to this buffer.
*/
private final TimetableSnapshot buffer = new TimetableSnapshot();

/**
* The last committed snapshot that was handed off to a routing thread. This snapshot may be given
* to more than one routing thread if the maximum snapshot frequency is exceeded.
* to more than one routing thread.
*/
private volatile TimetableSnapshot snapshot = null;

/**
* If a timetable snapshot is requested less than this number of milliseconds after the previous
* snapshot, just return the same one. Throttles the potentially resource-consuming task of
* duplicating a TripPattern -> Timetable map and indexing the new Timetables.
*/
private final CountdownTimer snapshotFrequencyThrottle;
private final ConcurrentPublished<TimetableSnapshot> snapshot = new ConcurrentPublished<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I already know the answer but want to ask anyway: the use of ConcurrentPublished adds another layer of safety even though it isn't strictly necessary as there are never two writer threads active at the same time. Correct?

Or is there a case where its synchronization features are useful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synchronization has 2 properties: mutual exclusion and safe publication.
Mutual exclusion is not needed here since, like you said, there is only one writer thread and publishing the snapshot is an atomic operation (updating a reference).
What we need here is safe publication, that is: making sure that the reader thread sees all the modifications done by the writer thread. In practice it means purging the CPU caches and registers so that the reader thread does not use stale data.
Using a volatile field, as done in the current implementation, provides also a guarantee of safe publication, but it is a bit more obscure than using the ConcurrentPublished mechanism.

More details there: https://shipilev.net/blog/2014/safe-public-construction/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ConcurrentPublished is simple enough, but would the Java AtomicReference be a better alternative. There are pros/cons here, and in the end I think we will revisit this, so maybe leave it for now.

Copy link
Member

@abyrd abyrd Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that AtomicReference is used when you need to do a compare-and-swap (check that the reference is the one you think it is before replacing it). Isn't the ConcurrentPublished synchronization doing something deeper here by imposing happens-before, and blocking reordering-based optimizations?

An example of the problem I think it's avoiding:
Code running on the single graph-writer thread changes some sub-object referenced by the snapshot, such as a TripTimes. It then updates the shared reference to the snapshot containing those TripTimes, but due to some reordering optimization (which would be harmless in a single threaded environment) the snapshot reference becomes visible before the new TripTimes sub-object becomes visible. Even though setting a reference to the snapshot would always be atomic (no word-tearing when seen from other threads), there's no guarantee that the reading thread sees the new snapshot reference after the changes to the TripTimes it contains are in place. Synchronization should establish a kind of reordering "fence" where everything appearing higher up in the source code is definitely visible to the reader.

We're talking about a place where there's likely to be read/write contention only once every 1-5 seconds so extremely rarely at the CPU-timescale. It's probably harmless to synchronize as ConcurrentPublished does, to get the benefit of strong ordering effects.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vpaturet checked the Javadoc in detail. For these purposes, volatile, AtomicReference (Javadoc on the atomic package rather than one class), and synchronization all work because they all establish happens-before relationships, and we don't have to deal with read/write contention here.


/**
* Should expired real-time data be purged from the graph.
Expand All @@ -85,7 +66,6 @@ public TimetableSnapshotManager(
Supplier<LocalDate> localDateNow
) {
this.transitLayerUpdater = transitLayerUpdater;
this.snapshotFrequencyThrottle = new CountdownTimer(parameters.maxSnapshotFrequency());
this.purgeExpiredData = parameters.purgeExpiredData();
this.localDateNow = Objects.requireNonNull(localDateNow);
// Force commit so that snapshot initializes
Expand All @@ -99,19 +79,11 @@ public TimetableSnapshotManager(
* to the snapshot to release resources.
*/
public TimetableSnapshot getTimetableSnapshot() {
// Try to get a lock on the buffer
if (bufferLock.tryLock()) {
// Make a new snapshot if necessary
try {
commitTimetableSnapshot(false);
return snapshot;
} finally {
bufferLock.unlock();
}
}
// No lock could be obtained because there is either a snapshot commit busy or updates
// are applied at this moment, just return the current snapshot
return snapshot;
return snapshot.get();
}

public TimetableSnapshot getTimetableSnapshotBuffer() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add Javadoc under what circumstances you want to have the buffer rather than the snapshot?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return buffer;
}

/**
Expand All @@ -122,21 +94,12 @@ public TimetableSnapshot getTimetableSnapshot() {
*
* @param force Force the committing of a new snapshot even if the above conditions are not met.
*/
public void commitTimetableSnapshot(final boolean force) {
if (force || snapshotFrequencyThrottle.timeIsUp()) {
if (force || buffer.isDirty()) {
LOG.debug("Committing {}", buffer);
snapshot = buffer.commit(transitLayerUpdater, force);

// We only reset the timer when the snapshot is updated. This will cause the first
// update to be committed after a silent period. This should not have any effect in
// a busy updater. It is however useful when manually testing the updater.
snapshotFrequencyThrottle.restart();
} else {
LOG.debug("Buffer was unchanged, keeping old snapshot.");
}
void commitTimetableSnapshot(final boolean force) {
if (force || buffer.isDirty()) {
LOG.debug("Committing {}", buffer);
snapshot.publish(buffer.commit(transitLayerUpdater, force));
} else {
LOG.debug("Snapshot frequency exceeded. Reusing snapshot {}", snapshot);
LOG.debug("Buffer was unchanged, keeping old snapshot.");
}
}

Expand Down Expand Up @@ -205,22 +168,6 @@ private boolean purgeExpiredData() {
return buffer.purgeExpiredData(previously);
}

/**
* Execute a {@code Runnable} with a locked snapshot buffer and release the lock afterwards. While
* the action of locking and unlocking is not complicated to do for calling code, this method
* exists so that the lock instance is a private field.
*/
public void withLock(Runnable action) {
bufferLock.lock();

try {
action.run();
} finally {
// Always release lock
bufferLock.unlock();
}
}

/**
* Clear all data of snapshot for the provided feed id
*/
Expand Down
Loading
Loading