Skip to content

Commit

Permalink
Introduce snapshot flush updater
Browse files Browse the repository at this point in the history
  • Loading branch information
vpaturet committed Jul 17, 2024
1 parent 634375a commit 944b615
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,4 +327,8 @@ private boolean markScheduledTripAsDeleted(Trip trip, final LocalDate serviceDat

return success;
}

public void flushBuffer() {
snapshotManager.purgeAndCommit();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opentripplanner.updater.UpdatersParameters;
import org.opentripplanner.updater.alert.GtfsRealtimeAlertsUpdater;
import org.opentripplanner.updater.spi.GraphUpdater;
import org.opentripplanner.updater.spi.TimetableSnapshotFlushUpdater;
import org.opentripplanner.updater.trip.MqttGtfsRealtimeUpdater;
import org.opentripplanner.updater.trip.PollingTripUpdater;
import org.opentripplanner.updater.trip.TimetableSnapshotSource;
Expand Down Expand Up @@ -201,6 +202,18 @@ private List<GraphUpdater> createUpdatersFromConfig() {
updaters.add(new SiriAzureSXUpdater(configItem, transitModel));
}

// If SIRI or GTFS real-time updaters are in use, configure a periodic flush of the timetable
// snapshot
if (siriTimetableSnapshotSource != null || gtfsTimetableSnapshotSource != null) {
updaters.add(
new TimetableSnapshotFlushUpdater(
updatersParameters.timetableSnapshotParameters(),
siriTimetableSnapshotSource,
gtfsTimetableSnapshotSource
)
);
}

return updaters;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.opentripplanner.updater.spi;

import java.time.Duration;
import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource;
import org.opentripplanner.updater.TimetableSnapshotSourceParameters;
import org.opentripplanner.updater.trip.TimetableSnapshotSource;

/**
* Updater that flushes the snapshot buffer by committing pending changes.
* The updater runs at the same frequency as the timetable snapshot throttle frequency. See
* {@link org.opentripplanner.updater.TimetableSnapshotSourceParameters#withMaxSnapshotFrequency(Duration)}
* It means that pending changes are committed at least once per throttling period.
* This is useful for a system where real-time updates are sent in batches followed by silent periods.
* In this case flushing the buffer periodically guarantees that a throttled update will be applied timely.
* In a system that receives a steady flow of updates, this updater is not required but does not
* hurt performance either.
*/
public class TimetableSnapshotFlushUpdater extends PollingGraphUpdater {

private final SiriTimetableSnapshotSource siriTimetableSnapshotSource;
private final TimetableSnapshotSource gtfsTimetableSnapshotSource;
protected WriteToGraphCallback saveResultOnGraph;

public TimetableSnapshotFlushUpdater(
TimetableSnapshotSourceParameters config,
SiriTimetableSnapshotSource siriTimetableSnapshotSource,
TimetableSnapshotSource gtfsTimetableSnapshotSource
) {
super(buildConfig(config));
this.siriTimetableSnapshotSource = siriTimetableSnapshotSource;
this.gtfsTimetableSnapshotSource = gtfsTimetableSnapshotSource;
}

private static PollingGraphUpdaterParameters buildConfig(
TimetableSnapshotSourceParameters config
) {
return new PollingGraphUpdaterParameters() {
@Override
public Duration frequency() {
return config.maxSnapshotFrequency();
}

@Override
public String configRef() {
return "TIMETABLE_SNAPSHOT_FLUSH_UPDATER";
}
};
}

@Override
protected void runPolling() {
saveResultOnGraph.execute((graph, transitModel) -> {
if (siriTimetableSnapshotSource != null) {
siriTimetableSnapshotSource.flushBuffer();
}
if (gtfsTimetableSnapshotSource != null) {
gtfsTimetableSnapshotSource.flushBuffer();
}
});
}

@Override
public void setup(WriteToGraphCallback writeToGraphCallback) {
saveResultOnGraph = writeToGraphCallback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1097,4 +1097,8 @@ private enum CancelationType {
CANCEL,
DELETE,
}

public void flushBuffer() {
snapshotManager.purgeAndCommit();
}
}

0 comments on commit 944b615

Please sign in to comment.