Publish timetable snapshot in background #5974
Changes from 10 commits
2c34a4e
892449a
78dd01f
2050550
79802f0
104ee66
0f77374
850a2be
7fe6999
d1d7f43
cd8e29e
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package org.opentripplanner.updater.spi; | ||
|
||
import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource; | ||
import org.opentripplanner.updater.trip.TimetableSnapshotSource; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* Flush the timetable snapshot buffer by committing pending changes. | ||
* Exceptions occurring during the flush are caught and ignored: the scheduler can then retry | ||
* the task later. | ||
*/ | ||
public class TimetableSnapshotFlush implements Runnable { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(TimetableSnapshotFlush.class); | ||
|
||
private final SiriTimetableSnapshotSource siriTimetableSnapshotSource; | ||
private final TimetableSnapshotSource gtfsTimetableSnapshotSource; | ||
|
||
public TimetableSnapshotFlush( | ||
SiriTimetableSnapshotSource siriTimetableSnapshotSource, | ||
TimetableSnapshotSource gtfsTimetableSnapshotSource | ||
) { | ||
this.siriTimetableSnapshotSource = siriTimetableSnapshotSource; | ||
this.gtfsTimetableSnapshotSource = gtfsTimetableSnapshotSource; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
try { | ||
LOG.debug("Flushing timetable snapshot buffer"); | ||
if (siriTimetableSnapshotSource != null) { | ||
siriTimetableSnapshotSource.flushBuffer(); | ||
} | ||
if (gtfsTimetableSnapshotSource != null) { | ||
gtfsTimetableSnapshotSource.flushBuffer(); | ||
} | ||
LOG.debug("Flushed timetable snapshot buffer"); | ||
} catch (Throwable t) { | ||
LOG.error("Error flushing timetable snapshot buffer", t); | ||
|
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,13 +2,12 @@ | |
|
||
import java.time.LocalDate; | ||
import java.util.Objects; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
import java.util.function.Supplier; | ||
import javax.annotation.Nullable; | ||
import org.opentripplanner.framework.time.CountdownTimer; | ||
import org.opentripplanner.model.Timetable; | ||
import org.opentripplanner.model.TimetableSnapshot; | ||
import org.opentripplanner.routing.algorithm.raptoradapter.transit.mappers.TransitLayerUpdater; | ||
import org.opentripplanner.routing.util.ConcurrentPublished; | ||
import org.opentripplanner.transit.model.framework.FeedScopedId; | ||
import org.opentripplanner.transit.model.framework.Result; | ||
import org.opentripplanner.transit.model.network.TripPattern; | ||
|
@@ -30,36 +29,18 @@ public final class TimetableSnapshotManager { | |
|
||
private static final Logger LOG = LoggerFactory.getLogger(TimetableSnapshotManager.class); | ||
private final TransitLayerUpdater transitLayerUpdater; | ||
/** | ||
* Lock to indicate that buffer is in use | ||
*/ | ||
private final ReentrantLock bufferLock = new ReentrantLock(true); | ||
|
||
/** | ||
* The working copy of the timetable snapshot. Should not be visible to routing threads. Should | ||
* only be modified by a thread that holds a lock on {@link #bufferLock}. All public methods that | ||
* might modify this buffer will correctly acquire the lock. By design, only one thread should | ||
* ever be writing to this buffer. | ||
* TODO RT_AB: research and document why this lock is needed since only one thread should ever be | ||
* writing to this buffer. One possible reason may be a need to suspend writes while indexing | ||
* and swapping out the buffer. But the original idea was to make a new copy of the buffer | ||
* before re-indexing it. While refactoring or rewriting parts of this system, we could throw | ||
* an exception if a writing section is entered by more than one thread. | ||
* The working copy of the timetable snapshot. Should not be visible to routing threads. | ||
* By design, only one thread should ever be writing to this buffer. | ||
*/ | ||
private final TimetableSnapshot buffer = new TimetableSnapshot(); | ||
|
||
/** | ||
* The last committed snapshot that was handed off to a routing thread. This snapshot may be given | ||
* to more than one routing thread if the maximum snapshot frequency is exceeded. | ||
* to more than one routing thread. | ||
*/ | ||
private volatile TimetableSnapshot snapshot = null; | ||
|
||
/** | ||
* If a timetable snapshot is requested less than this number of milliseconds after the previous | ||
* snapshot, just return the same one. Throttles the potentially resource-consuming task of | ||
* duplicating a TripPattern -> Timetable map and indexing the new Timetables. | ||
*/ | ||
private final CountdownTimer snapshotFrequencyThrottle; | ||
private final ConcurrentPublished<TimetableSnapshot> snapshot = new ConcurrentPublished<>(); | ||
I think I already know the answer but want to ask anyway: the use of Or is there a case where its synchronization features are useful?

Synchronization has two properties: mutual exclusion and safe publication. More details here: https://shipilev.net/blog/2014/safe-public-construction/

The

My understanding is that AtomicReference is used when you need to do a compare-and-swap (check that the reference is the one you think it is before replacing it). Isn't the ConcurrentPublished synchronization doing something deeper here by imposing happens-before, and blocking reordering-based optimizations? An example of the problem I think it's avoiding: We're talking about a place where there's likely to be read/write contention only once every 1-5 seconds, so extremely rarely on a CPU timescale. It's probably harmless to synchronize as ConcurrentPublished does, to get the benefit of strong ordering effects.

@vpaturet checked the Javadoc in detail. For these purposes, volatile, AtomicReference (Javadoc on the |
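The thread above distinguishes mutual exclusion from safe publication. The following is a minimal sketch of the safe-publication side only. It assumes a hypothetical holder class named `Published`, written for illustration; it is not OTP's actual `ConcurrentPublished` implementation. Because both methods lock the same monitor, a reader that obtains the new reference through `get()` also sees every write the writer made before calling `publish()`.

```java
import java.util.HashMap;
import java.util.Map;

public class SafePublicationSketch {

  /** Hypothetical holder for illustration only; not OTP's ConcurrentPublished. */
  public static final class Published<T> {
    private T value;

    // Releasing the monitor at the end of publish() happens-before any later
    // acquisition of the same monitor in get().
    public synchronized void publish(T newValue) {
      this.value = newValue;
    }

    public synchronized T get() {
      return value;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Published<Map<String, Integer>> snapshot = new Published<>();

    // Single writer thread: builds the data, then publishes an immutable copy.
    Thread writer = new Thread(() -> {
      Map<String, Integer> delays = new HashMap<>();
      delays.put("trip-1", 120);
      snapshot.publish(Map.copyOf(delays));
    });
    writer.start();
    writer.join();

    // Reader side: sees the fully built map through the published reference.
    System.out.println(snapshot.get().get("trip-1"));
  }
}
```

For a single reference like this one, a `volatile` field or an `AtomicReference` would give the same visibility guarantee under the Java memory model; the synchronized variant is simply one way to obtain it.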
||
|
||
/** | ||
* Should expired real-time data be purged from the graph. | ||
|
@@ -85,7 +66,6 @@ public TimetableSnapshotManager( | |
Supplier<LocalDate> localDateNow | ||
) { | ||
this.transitLayerUpdater = transitLayerUpdater; | ||
this.snapshotFrequencyThrottle = new CountdownTimer(parameters.maxSnapshotFrequency()); | ||
this.purgeExpiredData = parameters.purgeExpiredData(); | ||
this.localDateNow = Objects.requireNonNull(localDateNow); | ||
// Force commit so that snapshot initializes | ||
|
@@ -99,19 +79,11 @@ public TimetableSnapshotManager( | |
* to the snapshot to release resources. | ||
*/ | ||
public TimetableSnapshot getTimetableSnapshot() { | ||
// Try to get a lock on the buffer | ||
if (bufferLock.tryLock()) { | ||
// Make a new snapshot if necessary | ||
try { | ||
commitTimetableSnapshot(false); | ||
return snapshot; | ||
} finally { | ||
bufferLock.unlock(); | ||
} | ||
} | ||
// No lock could be obtained because there is either a snapshot commit busy or updates | ||
// are applied at this moment, just return the current snapshot | ||
return snapshot; | ||
return snapshot.get(); | ||
} | ||
|
||
public TimetableSnapshot getTimetableSnapshotBuffer() { | ||
Can you add Javadoc explaining under what circumstances you want the buffer rather than the snapshot?

Done |
||
return buffer; | ||
} | ||
|
||
/** | ||
|
@@ -122,21 +94,12 @@ public TimetableSnapshot getTimetableSnapshot() { | |
* | ||
* @param force Force the committing of a new snapshot even if the above conditions are not met. | ||
*/ | ||
public void commitTimetableSnapshot(final boolean force) { | ||
if (force || snapshotFrequencyThrottle.timeIsUp()) { | ||
if (force || buffer.isDirty()) { | ||
LOG.debug("Committing {}", buffer); | ||
snapshot = buffer.commit(transitLayerUpdater, force); | ||
|
||
// We only reset the timer when the snapshot is updated. This will cause the first | ||
// update to be committed after a silent period. This should not have any effect in | ||
// a busy updater. It is however useful when manually testing the updater. | ||
snapshotFrequencyThrottle.restart(); | ||
} else { | ||
LOG.debug("Buffer was unchanged, keeping old snapshot."); | ||
} | ||
void commitTimetableSnapshot(final boolean force) { | ||
if (force || buffer.isDirty()) { | ||
LOG.debug("Committing {}", buffer); | ||
snapshot.publish(buffer.commit(transitLayerUpdater, force)); | ||
} else { | ||
LOG.debug("Snapshot frequency exceeded. Reusing snapshot {}", snapshot); | ||
LOG.debug("Buffer was unchanged, keeping old snapshot."); | ||
} | ||
} | ||
|
||
|
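The commit logic in this hunk publishes a new snapshot only when forced or when the buffer has changed. A small sketch of that pattern follows, assuming a hypothetical `Buffer` class with a dirty flag as a stand-in for `TimetableSnapshot` (whose internals are not shown in this diff) and a plain map of delays as the payload.

```java
import java.util.HashMap;
import java.util.Map;

public class CommitIfDirtySketch {

  /** Hypothetical stand-in for the mutable buffer; written by one thread only. */
  public static final class Buffer {
    private final Map<String, Integer> delays = new HashMap<>();
    private boolean dirty = false;

    public void update(String tripId, int delaySeconds) {
      delays.put(tripId, delaySeconds);
      dirty = true;
    }

    public boolean isDirty() {
      return dirty;
    }

    /** Returns an immutable copy of the current state and clears the dirty flag. */
    public Map<String, Integer> commit() {
      dirty = false;
      return Map.copyOf(delays);
    }
  }

  private final Buffer buffer = new Buffer();
  // Readers only ever see immutable maps through this volatile reference.
  private volatile Map<String, Integer> snapshot = Map.of();
  private int commits = 0;

  public Buffer buffer() {
    return buffer;
  }

  public Map<String, Integer> snapshot() {
    return snapshot;
  }

  public int commitCount() {
    return commits;
  }

  public void commit(boolean force) {
    if (force || buffer.isDirty()) {
      snapshot = buffer.commit();
      commits++;
    }
    // Otherwise the buffer is unchanged and the old snapshot stays published.
  }

  public static void main(String[] args) {
    CommitIfDirtySketch manager = new CommitIfDirtySketch();
    manager.commit(false); // nothing changed: no new snapshot
    manager.buffer().update("trip-1", 60);
    manager.commit(false); // dirty: publishes a new snapshot
    manager.commit(false); // clean again: skipped
    System.out.println(manager.commitCount() + " " + manager.snapshot());
  }
}
```

The sketch relies on the same single-writer assumption stated in the Javadoc above: only one thread calls `update` and `commit`, so the buffer itself needs no lock.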
@@ -205,22 +168,6 @@ private boolean purgeExpiredData() { | |
return buffer.purgeExpiredData(previously); | ||
} | ||
|
||
/** | ||
* Execute a {@code Runnable} with a locked snapshot buffer and release the lock afterwards. While | ||
* the action of locking and unlocking is not complicated to do for calling code, this method | ||
* exists so that the lock instance is a private field. | ||
*/ | ||
public void withLock(Runnable action) { | ||
bufferLock.lock(); | ||
|
||
try { | ||
action.run(); | ||
} finally { | ||
// Always release lock | ||
bufferLock.unlock(); | ||
} | ||
} | ||
|
||
/** | ||
* Clear all data of snapshot for the provided feed id | ||
*/ | ||
|
Catching Throwable opens the door to a potentially critical error state, where the OTP server is corrupt but does not go down. But the pragmatic programmer in me tells me that we probably have bigger issues in the code, and that we should focus on those. To fix it we would need to catch both RuntimeException and Throwable, log both, but take down the application for Throwable. I think it is out of scope for this PR.

I agree. Is there ever a case where you can recover from an OutOfMemoryError? I think in such a case you really want to let the JVM die.

This is a real case in a web server: a request thread uses up all memory and triggers an OOME, which makes its thread die and frees up the memory again. We had this case in our production environment with some problematic GraphQL queries returning gigabytes of data. If the periodic task was running at the same time as such a request, the OOME could be triggered from the periodic task thread and make it stop forever.
As a general rule, web servers do not stop because a worker thread dies.

See also Andrew's comment on that topic: #5974 (comment)

There are two separate concerns here: recovering from errors, and logging errors. Indeed you probably want to just let certain severe errors run their course even if that incapacitates the server. But you nonetheless want to make sure they are logged so you know what happened. If I recall correctly, an error in a Runnable on an Executor can cause the executor thread to die, but silently. Then the tasks just pile up in a queue for no apparent reason, which would be very confusing to debug.
An OOME within the executor might kill the executor thread. You definitely want to log that, as it will presumably be followed by tasks piling up and the JVM crashing and you'd want to know why.
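
The silent-failure concern can be illustrated with the JDK's `ScheduledExecutorService`. According to its Javadoc, if any execution of a periodic task throws, subsequent executions are suppressed; the exception is stored in the returned future and nothing is logged unless some code inspects that future. The sketch below is not OTP code: the class name, the 10 ms delay and the simulated failure are assumptions chosen for illustration. It compares an unguarded task with one wrapped in a catch-and-log block, as `TimetableSnapshotFlush.run()` does.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicTaskFailureSketch {

  /**
   * Schedules an always-failing task with a 10 ms delay between runs, waits
   * about 300 ms, and returns how many times the task was started.
   */
  public static int countRuns(boolean catchAndLog) throws InterruptedException {
    AtomicInteger runs = new AtomicInteger();

    Runnable failing = () -> {
      runs.incrementAndGet();
      throw new IllegalStateException("simulated flush failure");
    };

    // Same task, but the failure is logged and swallowed so the scheduler
    // keeps invoking it.
    Runnable guarded = () -> {
      try {
        failing.run();
      } catch (Throwable t) {
        System.err.println("Error in periodic task: " + t.getMessage());
      }
    };

    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    try {
      Runnable task = catchAndLog ? guarded : failing;
      scheduler.scheduleWithFixedDelay(task, 0, 10, TimeUnit.MILLISECONDS);
      Thread.sleep(300);
    } finally {
      scheduler.shutdownNow();
    }
    return runs.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("unguarded runs: " + countRuns(false));
    System.out.println("guarded runs:   " + countRuns(true));
  }
}
```

The unguarded task stops after its first failure without any log output, which matches the "confusing to debug" scenario described above; the guarded task keeps being rescheduled. Whether the application should additionally shut down on a severe error is a separate decision, as the thread notes.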