diff --git a/src/ext/java/org/opentripplanner/ext/siri/SiriTimetableSnapshotSource.java b/src/ext/java/org/opentripplanner/ext/siri/SiriTimetableSnapshotSource.java index 7ab316bfc55..9e2dfaff76b 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/SiriTimetableSnapshotSource.java +++ b/src/ext/java/org/opentripplanner/ext/siri/SiriTimetableSnapshotSource.java @@ -57,7 +57,12 @@ public class SiriTimetableSnapshotSource implements TimetableSnapshotProvider { */ private final SiriTripPatternCache tripPatternCache; - private final TransitEditorService transitService; + /** + * Long-lived transit editor service that has access to the timetable snapshot buffer. + * This differs from the usual use case where the transit service refers to the latest published + * timetable snapshot. + */ + private final TransitEditorService transitEditorService; private final TimetableSnapshotManager snapshotManager; @@ -71,9 +76,10 @@ public SiriTimetableSnapshotSource( parameters, () -> LocalDate.now(transitModel.getTimeZone()) ); - this.transitService = new DefaultTransitService(transitModel); + this.transitEditorService = + new DefaultTransitService(transitModel, getTimetableSnapshotBuffer()); this.tripPatternCache = - new SiriTripPatternCache(tripPatternIdGenerator, transitService::getPatternForTrip); + new SiriTripPatternCache(tripPatternIdGenerator, transitEditorService::getPatternForTrip); transitModel.initTimetableSnapshotProvider(this); } @@ -102,26 +108,22 @@ public UpdateResult applyEstimatedTimetable( List> results = new ArrayList<>(); - snapshotManager.withLock(() -> { - if (incrementality == FULL_DATASET) { - // Remove all updates from the buffer - snapshotManager.clearBuffer(feedId); - } + if (incrementality == FULL_DATASET) { + // Remove all updates from the buffer + snapshotManager.clearBuffer(feedId); + } - for (var etDelivery : updates) { - for (var estimatedJourneyVersion : etDelivery.getEstimatedJourneyVersionFrames()) { - var journeys = estimatedJourneyVersion.getEstimatedVehicleJourneies(); - LOG.debug("Handling {} EstimatedVehicleJourneys.", journeys.size()); - for (EstimatedVehicleJourney journey : journeys) { - results.add(apply(journey, transitService, fuzzyTripMatcher, entityResolver)); - } + for (var etDelivery : updates) { + for (var estimatedJourneyVersion : etDelivery.getEstimatedJourneyVersionFrames()) { + var journeys = estimatedJourneyVersion.getEstimatedVehicleJourneies(); + LOG.debug("Handling {} EstimatedVehicleJourneys.", journeys.size()); + for (EstimatedVehicleJourney journey : journeys) { + results.add(apply(journey, transitEditorService, fuzzyTripMatcher, entityResolver)); } } + } - LOG.debug("message contains {} trip updates", updates.size()); - - snapshotManager.purgeAndCommit(); - }); + LOG.debug("message contains {} trip updates", updates.size()); return UpdateResult.ofResults(results); } @@ -131,6 +133,10 @@ public TimetableSnapshot getTimetableSnapshot() { return snapshotManager.getTimetableSnapshot(); } + private TimetableSnapshot getTimetableSnapshotBuffer() { + return snapshotManager.getTimetableSnapshotBuffer(); + } + private Result apply( EstimatedVehicleJourney journey, TransitEditorService transitService, @@ -195,11 +201,7 @@ private boolean shouldAddNewTrip( * Snapshot timetable is used as source if initialised, trip patterns scheduled timetable if not. */ private Timetable getCurrentTimetable(TripPattern tripPattern, LocalDate serviceDate) { - TimetableSnapshot timetableSnapshot = getTimetableSnapshot(); - if (timetableSnapshot != null) { - return timetableSnapshot.resolve(tripPattern, serviceDate); - } - return tripPattern.getScheduledTimetable(); + return getTimetableSnapshotBuffer().resolve(tripPattern, serviceDate); } private Result handleModifiedTrip( @@ -228,7 +230,7 @@ private Result handleModifiedTrip( if (trip != null) { // Found exact match - pattern = transitService.getPatternForTrip(trip); + pattern = transitEditorService.getPatternForTrip(trip); } else if (fuzzyTripMatcher != null) { // No exact match found - search for trips based on arrival-times/stop-patterns TripAndPattern tripAndPattern = fuzzyTripMatcher.match( @@ -263,7 +265,7 @@ private Result handleModifiedTrip( pattern, estimatedVehicleJourney, serviceDate, - transitService.getTimeZone(), + transitEditorService.getTimeZone(), entityResolver ) .build(); @@ -310,7 +312,7 @@ private Result addTripToGraphAndBuffer(TripUpdate tr private boolean markScheduledTripAsDeleted(Trip trip, final LocalDate serviceDate) { boolean success = false; - final TripPattern pattern = transitService.getPatternForTrip(trip); + final TripPattern pattern = transitEditorService.getPatternForTrip(trip); if (pattern != null) { // Mark scheduled trip times for this trip in this pattern as deleted @@ -329,4 +331,11 @@ private boolean markScheduledTripAsDeleted(Trip trip, final LocalDate serviceDat return success; } + + /** + * Flush pending changes in the timetable snapshot buffer and publish a new snapshot. + */ + public void flushBuffer() { + snapshotManager.purgeAndCommit(); + } } diff --git a/src/main/java/org/opentripplanner/model/TimetableSnapshot.java b/src/main/java/org/opentripplanner/model/TimetableSnapshot.java index 7d3f6181d44..c0a7737abce 100644 --- a/src/main/java/org/opentripplanner/model/TimetableSnapshot.java +++ b/src/main/java/org/opentripplanner/model/TimetableSnapshot.java @@ -65,6 +65,11 @@ * up timetables on this class could conceivably be replaced with snapshotting entire views of the * transit network. It would also be possible to make the realtime version of Timetables or * TripTimes the primary view, and include references back to their scheduled versions. + *

+ * Implementation note: when a snapshot is committed, the mutable state of this class is stored + * in final fields and completely initialized in the constructor. This provides an additional + * guarantee of safe-publication without synchronization. + * (see final Field Semantics) */ public class TimetableSnapshot { diff --git a/src/main/java/org/opentripplanner/transit/service/DefaultTransitService.java b/src/main/java/org/opentripplanner/transit/service/DefaultTransitService.java index 398cb806524..b3d68b6ffa5 100644 --- a/src/main/java/org/opentripplanner/transit/service/DefaultTransitService.java +++ b/src/main/java/org/opentripplanner/transit/service/DefaultTransitService.java @@ -79,6 +79,14 @@ public DefaultTransitService(TransitModel transitModel) { this.transitModelIndex = transitModel.getTransitModelIndex(); } + public DefaultTransitService( + TransitModel transitModel, + TimetableSnapshot timetableSnapshotBuffer + ) { + this(transitModel); + this.timetableSnapshot = timetableSnapshotBuffer; + } + @Override public Collection getFeedIds() { return this.transitModel.getFeedIds(); diff --git a/src/main/java/org/opentripplanner/updater/configure/UpdaterConfigurator.java b/src/main/java/org/opentripplanner/updater/configure/UpdaterConfigurator.java index 1e9b730ee3a..103120b7ecb 100644 --- a/src/main/java/org/opentripplanner/updater/configure/UpdaterConfigurator.java +++ b/src/main/java/org/opentripplanner/updater/configure/UpdaterConfigurator.java @@ -2,6 +2,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource; import org.opentripplanner.ext.siri.updater.SiriETUpdater; import org.opentripplanner.ext.siri.updater.SiriSXUpdater; @@ -20,6 +21,7 @@ import org.opentripplanner.updater.UpdatersParameters; import org.opentripplanner.updater.alert.GtfsRealtimeAlertsUpdater; import org.opentripplanner.updater.spi.GraphUpdater; +import org.opentripplanner.updater.spi.TimetableSnapshotFlush; import org.opentripplanner.updater.trip.MqttGtfsRealtimeUpdater; import org.opentripplanner.updater.trip.PollingTripUpdater; import org.opentripplanner.updater.trip.TimetableSnapshotSource; @@ -94,6 +96,9 @@ private void configure() { ); GraphUpdaterManager updaterManager = new GraphUpdaterManager(graph, transitModel, updaters); + + configureTimetableSnapshotFlush(updaterManager); + updaterManager.startUpdaters(); // Stop the updater manager if it contains nothing @@ -223,4 +228,21 @@ private TimetableSnapshotSource provideGtfsTimetableSnapshot() { } return gtfsTimetableSnapshotSource; } + + /** + * If SIRI or GTFS real-time updaters are in use, configure a periodic flush of the timetable + * snapshot. + */ + private void configureTimetableSnapshotFlush(GraphUpdaterManager updaterManager) { + if (siriTimetableSnapshotSource != null || gtfsTimetableSnapshotSource != null) { + updaterManager + .getScheduler() + .scheduleWithFixedDelay( + new TimetableSnapshotFlush(siriTimetableSnapshotSource, gtfsTimetableSnapshotSource), + 0, + updatersParameters.timetableSnapshotParameters().maxSnapshotFrequency().toSeconds(), + TimeUnit.SECONDS + ); + } + } } diff --git a/src/main/java/org/opentripplanner/updater/spi/TimetableSnapshotFlush.java b/src/main/java/org/opentripplanner/updater/spi/TimetableSnapshotFlush.java new file mode 100644 index 00000000000..3f5c8f4d23d --- /dev/null +++ b/src/main/java/org/opentripplanner/updater/spi/TimetableSnapshotFlush.java @@ -0,0 +1,43 @@ +package org.opentripplanner.updater.spi; + +import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource; +import org.opentripplanner.updater.trip.TimetableSnapshotSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Flush the timetable snapshot buffer by committing pending changes. + * Exceptions occurring during the flush are caught and ignored: the scheduler can then retry + * the task later. + */ +public class TimetableSnapshotFlush implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(TimetableSnapshotFlush.class); + + private final SiriTimetableSnapshotSource siriTimetableSnapshotSource; + private final TimetableSnapshotSource gtfsTimetableSnapshotSource; + + public TimetableSnapshotFlush( + SiriTimetableSnapshotSource siriTimetableSnapshotSource, + TimetableSnapshotSource gtfsTimetableSnapshotSource + ) { + this.siriTimetableSnapshotSource = siriTimetableSnapshotSource; + this.gtfsTimetableSnapshotSource = gtfsTimetableSnapshotSource; + } + + @Override + public void run() { + try { + LOG.debug("Flushing timetable snapshot buffer"); + if (siriTimetableSnapshotSource != null) { + siriTimetableSnapshotSource.flushBuffer(); + } + if (gtfsTimetableSnapshotSource != null) { + gtfsTimetableSnapshotSource.flushBuffer(); + } + LOG.debug("Flushed timetable snapshot buffer"); + } catch (Throwable t) { + LOG.error("Error flushing timetable snapshot buffer", t); + } + } +} diff --git a/src/main/java/org/opentripplanner/updater/trip/TimetableSnapshotManager.java b/src/main/java/org/opentripplanner/updater/trip/TimetableSnapshotManager.java index bf7c8c98919..b5683a9a62e 100644 --- a/src/main/java/org/opentripplanner/updater/trip/TimetableSnapshotManager.java +++ b/src/main/java/org/opentripplanner/updater/trip/TimetableSnapshotManager.java @@ -2,13 +2,12 @@ import java.time.LocalDate; import java.util.Objects; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import javax.annotation.Nullable; -import org.opentripplanner.framework.time.CountdownTimer; import org.opentripplanner.model.Timetable; import org.opentripplanner.model.TimetableSnapshot; import org.opentripplanner.routing.algorithm.raptoradapter.transit.mappers.TransitLayerUpdater; +import org.opentripplanner.routing.util.ConcurrentPublished; import org.opentripplanner.transit.model.framework.FeedScopedId; import org.opentripplanner.transit.model.framework.Result; import org.opentripplanner.transit.model.network.TripPattern; @@ -30,36 +29,18 @@ public final class TimetableSnapshotManager { private static final Logger LOG = LoggerFactory.getLogger(TimetableSnapshotManager.class); private final TransitLayerUpdater transitLayerUpdater; - /** - * Lock to indicate that buffer is in use - */ - private final ReentrantLock bufferLock = new ReentrantLock(true); /** - * The working copy of the timetable snapshot. Should not be visible to routing threads. Should - * only be modified by a thread that holds a lock on {@link #bufferLock}. All public methods that - * might modify this buffer will correctly acquire the lock. By design, only one thread should - * ever be writing to this buffer. - * TODO RT_AB: research and document why this lock is needed since only one thread should ever be - * writing to this buffer. One possible reason may be a need to suspend writes while indexing - * and swapping out the buffer. But the original idea was to make a new copy of the buffer - * before re-indexing it. While refactoring or rewriting parts of this system, we could throw - * an exception if a writing section is entered by more than one thread. + * The working copy of the timetable snapshot. Should not be visible to routing threads. + * By design, only one thread should ever be writing to this buffer. */ private final TimetableSnapshot buffer = new TimetableSnapshot(); /** * The last committed snapshot that was handed off to a routing thread. This snapshot may be given - * to more than one routing thread if the maximum snapshot frequency is exceeded. - */ - private volatile TimetableSnapshot snapshot = null; - - /** - * If a timetable snapshot is requested less than this number of milliseconds after the previous - * snapshot, just return the same one. Throttles the potentially resource-consuming task of - * duplicating a TripPattern -> Timetable map and indexing the new Timetables. + * to more than one routing thread. */ - private final CountdownTimer snapshotFrequencyThrottle; + private final ConcurrentPublished snapshot = new ConcurrentPublished<>(); /** * Should expired real-time data be purged from the graph. @@ -85,7 +66,6 @@ public TimetableSnapshotManager( Supplier localDateNow ) { this.transitLayerUpdater = transitLayerUpdater; - this.snapshotFrequencyThrottle = new CountdownTimer(parameters.maxSnapshotFrequency()); this.purgeExpiredData = parameters.purgeExpiredData(); this.localDateNow = Objects.requireNonNull(localDateNow); // Force commit so that snapshot initializes @@ -99,19 +79,17 @@ public TimetableSnapshotManager( * to the snapshot to release resources. */ public TimetableSnapshot getTimetableSnapshot() { - // Try to get a lock on the buffer - if (bufferLock.tryLock()) { - // Make a new snapshot if necessary - try { - commitTimetableSnapshot(false); - return snapshot; - } finally { - bufferLock.unlock(); - } - } - // No lock could be obtained because there is either a snapshot commit busy or updates - // are applied at this moment, just return the current snapshot - return snapshot; + return snapshot.get(); + } + + /** + * @return the current timetable snapshot buffer that contains pending changes (not yet published + * in a snapshot). + * This should be used in the context of an updater to build a TransitEditorService that sees all + * the changes applied so far by real-time updates. + */ + public TimetableSnapshot getTimetableSnapshotBuffer() { + return buffer; } /** @@ -122,21 +100,12 @@ public TimetableSnapshot getTimetableSnapshot() { * * @param force Force the committing of a new snapshot even if the above conditions are not met. */ - public void commitTimetableSnapshot(final boolean force) { - if (force || snapshotFrequencyThrottle.timeIsUp()) { - if (force || buffer.isDirty()) { - LOG.debug("Committing {}", buffer); - snapshot = buffer.commit(transitLayerUpdater, force); - - // We only reset the timer when the snapshot is updated. This will cause the first - // update to be committed after a silent period. This should not have any effect in - // a busy updater. It is however useful when manually testing the updater. - snapshotFrequencyThrottle.restart(); - } else { - LOG.debug("Buffer was unchanged, keeping old snapshot."); - } + void commitTimetableSnapshot(final boolean force) { + if (force || buffer.isDirty()) { + LOG.debug("Committing {}", buffer); + snapshot.publish(buffer.commit(transitLayerUpdater, force)); } else { - LOG.debug("Snapshot frequency exceeded. Reusing snapshot {}", snapshot); + LOG.debug("Buffer was unchanged, keeping old snapshot."); } } @@ -205,22 +174,6 @@ private boolean purgeExpiredData() { return buffer.purgeExpiredData(previously); } - /** - * Execute a {@code Runnable} with a locked snapshot buffer and release the lock afterwards. While - * the action of locking and unlocking is not complicated to do for calling code, this method - * exists so that the lock instance is a private field. - */ - public void withLock(Runnable action) { - bufferLock.lock(); - - try { - action.run(); - } finally { - // Always release lock - bufferLock.unlock(); - } - } - /** * Clear all data of snapshot for the provided feed id */ diff --git a/src/main/java/org/opentripplanner/updater/trip/TimetableSnapshotSource.java b/src/main/java/org/opentripplanner/updater/trip/TimetableSnapshotSource.java index c452d5f58f8..56e3c380b67 100644 --- a/src/main/java/org/opentripplanner/updater/trip/TimetableSnapshotSource.java +++ b/src/main/java/org/opentripplanner/updater/trip/TimetableSnapshotSource.java @@ -91,7 +91,13 @@ public class TimetableSnapshotSource implements TimetableSnapshotProvider { private final TripPatternCache tripPatternCache = new TripPatternCache(); private final ZoneId timeZone; - private final TransitEditorService transitService; + + /** + * Long-lived transit editor service that has access to the timetable snapshot buffer. + * This differs from the usual use case where the transit service refers to the latest published + * timetable snapshot. + */ + private final TransitEditorService transitEditorService; private final Deduplicator deduplicator; @@ -119,7 +125,8 @@ public TimetableSnapshotSource( this.snapshotManager = new TimetableSnapshotManager(transitModel.getTransitLayerUpdater(), parameters, localDateNow); this.timeZone = transitModel.getTimeZone(); - this.transitService = new DefaultTransitService(transitModel); + this.transitEditorService = + new DefaultTransitService(transitModel, snapshotManager.getTimetableSnapshotBuffer()); this.deduplicator = transitModel.getDeduplicator(); this.serviceCodes = transitModel.getServiceCodes(); this.localDateNow = localDateNow; @@ -156,121 +163,117 @@ public UpdateResult applyTripUpdates( Map failuresByRelationship = new HashMap<>(); List> results = new ArrayList<>(); - snapshotManager.withLock(() -> { - if (updateIncrementality == FULL_DATASET) { - // Remove all updates from the buffer - snapshotManager.clearBuffer(feedId); + if (updateIncrementality == FULL_DATASET) { + // Remove all updates from the buffer + snapshotManager.clearBuffer(feedId); + } + + LOG.debug("message contains {} trip updates", updates.size()); + int uIndex = 0; + for (TripUpdate tripUpdate : updates) { + if (!tripUpdate.hasTrip()) { + debug(feedId, "", "Missing TripDescriptor in gtfs-rt trip update: \n{}", tripUpdate); + continue; } - LOG.debug("message contains {} trip updates", updates.size()); - int uIndex = 0; - for (TripUpdate tripUpdate : updates) { - if (!tripUpdate.hasTrip()) { - debug(feedId, "", "Missing TripDescriptor in gtfs-rt trip update: \n{}", tripUpdate); - continue; - } + if (fuzzyTripMatcher != null) { + final TripDescriptor trip = fuzzyTripMatcher.match(feedId, tripUpdate.getTrip()); + tripUpdate = tripUpdate.toBuilder().setTrip(trip).build(); + } - if (fuzzyTripMatcher != null) { - final TripDescriptor trip = fuzzyTripMatcher.match(feedId, tripUpdate.getTrip()); - tripUpdate = tripUpdate.toBuilder().setTrip(trip).build(); - } + final TripDescriptor tripDescriptor = tripUpdate.getTrip(); - final TripDescriptor tripDescriptor = tripUpdate.getTrip(); + if (!tripDescriptor.hasTripId() || tripDescriptor.getTripId().isBlank()) { + debug(feedId, "", "No trip id found for gtfs-rt trip update: \n{}", tripUpdate); + results.add(Result.failure(UpdateError.noTripId(INVALID_INPUT_STRUCTURE))); + continue; + } - if (!tripDescriptor.hasTripId() || tripDescriptor.getTripId().isBlank()) { - debug(feedId, "", "No trip id found for gtfs-rt trip update: \n{}", tripUpdate); - results.add(Result.failure(UpdateError.noTripId(INVALID_INPUT_STRUCTURE))); + FeedScopedId tripId = new FeedScopedId(feedId, tripUpdate.getTrip().getTripId()); + + LocalDate serviceDate; + if (tripDescriptor.hasStartDate()) { + try { + serviceDate = ServiceDateUtils.parseString(tripDescriptor.getStartDate()); + } catch (final ParseException e) { + debug( + tripId, + "Failed to parse start date in gtfs-rt trip update: {}", + tripDescriptor.getStartDate() + ); continue; } + } else { + // TODO: figure out the correct service date. For the special case that a trip + // starts for example at 40:00, yesterday would probably be a better guess. + serviceDate = localDateNow.get(); + } + // Determine what kind of trip update this is + var scheduleRelationship = Objects.requireNonNullElse( + tripDescriptor.getScheduleRelationship(), + SCHEDULED + ); + if (updateIncrementality == DIFFERENTIAL) { + purgePatternModifications(scheduleRelationship, tripId, serviceDate); + } - FeedScopedId tripId = new FeedScopedId(feedId, tripUpdate.getTrip().getTripId()); + uIndex += 1; + LOG.debug("trip update #{} ({} updates) :", uIndex, tripUpdate.getStopTimeUpdateCount()); + LOG.trace("{}", tripUpdate); - LocalDate serviceDate; - if (tripDescriptor.hasStartDate()) { - try { - serviceDate = ServiceDateUtils.parseString(tripDescriptor.getStartDate()); - } catch (final ParseException e) { - debug( + Result result; + try { + result = + switch (scheduleRelationship) { + case SCHEDULED -> handleScheduledTrip( + tripUpdate, tripId, - "Failed to parse start date in gtfs-rt trip update: {}", - tripDescriptor.getStartDate() + serviceDate, + backwardsDelayPropagationType ); - continue; - } - } else { - // TODO: figure out the correct service date. For the special case that a trip - // starts for example at 40:00, yesterday would probably be a better guess. - serviceDate = localDateNow.get(); - } - // Determine what kind of trip update this is - var scheduleRelationship = Objects.requireNonNullElse( - tripDescriptor.getScheduleRelationship(), - SCHEDULED - ); - if (updateIncrementality == DIFFERENTIAL) { - purgePatternModifications(scheduleRelationship, tripId, serviceDate); - } - - uIndex += 1; - LOG.debug("trip update #{} ({} updates) :", uIndex, tripUpdate.getStopTimeUpdateCount()); - LOG.trace("{}", tripUpdate); - - Result result; - try { - result = - switch (scheduleRelationship) { - case SCHEDULED -> handleScheduledTrip( - tripUpdate, - tripId, - serviceDate, - backwardsDelayPropagationType - ); - case ADDED -> validateAndHandleAddedTrip( - tripUpdate, - tripDescriptor, - tripId, - serviceDate - ); - case CANCELED -> handleCanceledTrip( - tripId, - serviceDate, - CancelationType.CANCEL, - updateIncrementality - ); - case DELETED -> handleCanceledTrip( - tripId, - serviceDate, - CancelationType.DELETE, - updateIncrementality - ); - case REPLACEMENT -> validateAndHandleModifiedTrip( - tripUpdate, - tripDescriptor, - tripId, - serviceDate - ); - case UNSCHEDULED -> UpdateError.result(tripId, NOT_IMPLEMENTED_UNSCHEDULED); - case DUPLICATED -> UpdateError.result(tripId, NOT_IMPLEMENTED_DUPLICATED); - }; - } catch (DataValidationException e) { - result = DataValidationExceptionMapper.toResult(e); - } + case ADDED -> validateAndHandleAddedTrip( + tripUpdate, + tripDescriptor, + tripId, + serviceDate + ); + case CANCELED -> handleCanceledTrip( + tripId, + serviceDate, + CancelationType.CANCEL, + updateIncrementality + ); + case DELETED -> handleCanceledTrip( + tripId, + serviceDate, + CancelationType.DELETE, + updateIncrementality + ); + case REPLACEMENT -> validateAndHandleModifiedTrip( + tripUpdate, + tripDescriptor, + tripId, + serviceDate + ); + case UNSCHEDULED -> UpdateError.result(tripId, NOT_IMPLEMENTED_UNSCHEDULED); + case DUPLICATED -> UpdateError.result(tripId, NOT_IMPLEMENTED_DUPLICATED); + }; + } catch (DataValidationException e) { + result = DataValidationExceptionMapper.toResult(e); + } - results.add(result); - if (result.isFailure()) { - debug(tripId, "Failed to apply TripUpdate."); - LOG.trace(" Contents: {}", tripUpdate); - if (failuresByRelationship.containsKey(scheduleRelationship)) { - var c = failuresByRelationship.get(scheduleRelationship); - failuresByRelationship.put(scheduleRelationship, ++c); - } else { - failuresByRelationship.put(scheduleRelationship, 1); - } + results.add(result); + if (result.isFailure()) { + debug(tripId, "Failed to apply TripUpdate."); + LOG.trace(" Contents: {}", tripUpdate); + if (failuresByRelationship.containsKey(scheduleRelationship)) { + var c = failuresByRelationship.get(scheduleRelationship); + failuresByRelationship.put(scheduleRelationship, ++c); + } else { + failuresByRelationship.put(scheduleRelationship, 1); } } - - snapshotManager.purgeAndCommit(); - }); + } var updateResult = UpdateResult.ofResults(results); @@ -368,8 +371,8 @@ private Result handleScheduledTrip( return UpdateError.result(tripId, NO_UPDATES); } - final FeedScopedId serviceId = transitService.getTripForId(tripId).getServiceId(); - final Set serviceDates = transitService + final FeedScopedId serviceId = transitEditorService.getTripForId(tripId).getServiceId(); + final Set serviceDates = transitEditorService .getCalendarService() .getServiceDatesForServiceId(serviceId); if (!serviceDates.contains(serviceDate)) { @@ -412,7 +415,7 @@ private Result handleScheduledTrip( .cancelStops(skippedStopIndices) .build(); - final Trip trip = transitService.getTripForId(tripId); + final Trip trip = transitEditorService.getTripForId(tripId); // Get cached trip pattern or create one if it doesn't exist yet final TripPattern newPattern = tripPatternCache.getOrCreateTripPattern( newStopPattern, @@ -450,7 +453,7 @@ private Result validateAndHandleAddedTrip( // // Check whether trip id already exists in graph - final Trip trip = transitService.getTripForId(tripId); + final Trip trip = transitEditorService.getTripForId(tripId); if (trip != null) { // TODO: should we support this and add a new instantiation of this trip (making it @@ -504,7 +507,7 @@ private List removeUnknownStops(TripUpdate tripUpdate, FeedScope .filter(StopTimeUpdate::hasStopId) .filter(st -> { var stopId = new FeedScopedId(tripId.getFeedId(), st.getStopId()); - var stopFound = transitService.getRegularStop(stopId) != null; + var stopFound = transitEditorService.getRegularStop(stopId) != null; if (!stopFound) { debug(tripId, "Stop '{}' not found in graph. Removing from ADDED trip.", st.getStopId()); } @@ -553,7 +556,7 @@ private List checkNewStopTimeUpdatesAndFindStops( // Find stops if (stopTimeUpdate.hasStopId()) { // Find stop - final var stop = transitService.getRegularStop( + final var stop = transitEditorService.getRegularStop( new FeedScopedId(tripId.getFeedId(), stopTimeUpdate.getStopId()) ); if (stop != null) { @@ -636,7 +639,7 @@ private Result handleAddedTrip( tripBuilder.withRoute(route); // Find service ID running on this service date - final Set serviceIds = transitService + final Set serviceIds = transitEditorService .getCalendarService() .getServiceIdsOnDate(serviceDate); if (serviceIds.isEmpty()) { @@ -664,7 +667,7 @@ private Result handleAddedTrip( private Route getOrCreateRoute(TripDescriptor tripDescriptor, FeedScopedId tripId) { if (routeExists(tripId.getFeedId(), tripDescriptor)) { // Try to find route - return transitService.getRouteForId( + return transitEditorService.getRouteForId( new FeedScopedId(tripId.getFeedId(), tripDescriptor.getRouteId()) ); } @@ -679,7 +682,7 @@ else if ( var addedRouteExtension = AddedRoute.ofTripDescriptor(tripDescriptor); - var agency = transitService + var agency = transitEditorService .findAgencyById(new FeedScopedId(tripId.getFeedId(), addedRouteExtension.agencyId())) .orElseGet(() -> fallbackAgency(tripId.getFeedId())); @@ -695,7 +698,7 @@ else if ( builder.withUrl(addedRouteExtension.routeUrl()); var route = builder.build(); - transitService.addRoutes(route); + transitEditorService.addRoutes(route); return route; } // no information about the rout is given, so we create a dummy one @@ -711,7 +714,7 @@ else if ( I18NString longName = NonLocalizedString.ofNullable(tripDescriptor.getTripId()); builder.withLongName(longName); var route = builder.build(); - transitService.addRoutes(route); + transitEditorService.addRoutes(route); return route; } } @@ -723,14 +726,14 @@ private Agency fallbackAgency(String feedId) { return Agency .of(new FeedScopedId(feedId, "autogenerated-gtfs-rt-added-route")) .withName("Agency automatically added by GTFS-RT update") - .withTimezone(transitService.getTimeZone().toString()) + .withTimezone(transitEditorService.getTimeZone().toString()) .build(); } private boolean routeExists(String feedId, TripDescriptor tripDescriptor) { if (tripDescriptor.hasRouteId() && StringUtils.hasValue(tripDescriptor.getRouteId())) { var routeId = new FeedScopedId(feedId, tripDescriptor.getRouteId()); - return Objects.nonNull(transitService.getRouteForId(routeId)); + return Objects.nonNull(transitEditorService.getRouteForId(routeId)); } else { return false; } @@ -819,7 +822,7 @@ private Result addTripToGraphAndBuffer( // Create StopPattern final StopPattern stopPattern = new StopPattern(stopTimes); - final TripPattern originalTripPattern = transitService.getPatternForTrip(trip); + final TripPattern originalTripPattern = transitEditorService.getPatternForTrip(trip); // Get cached trip pattern or create one if it doesn't exist yet final TripPattern pattern = tripPatternCache.getOrCreateTripPattern( stopPattern, @@ -965,7 +968,7 @@ private Result validateAndHandleModifiedTrip( // // Check whether trip id already exists in graph - Trip trip = transitService.getTripForId(tripId); + Trip trip = transitEditorService.getTripForId(tripId); if (trip == null) { // TODO: should we support this and consider it an ADDED trip? @@ -980,7 +983,7 @@ private Result validateAndHandleModifiedTrip( return UpdateError.result(tripId, NO_START_DATE); } else { // Check whether service date is served by trip - final Set serviceIds = transitService + final Set serviceIds = transitEditorService .getCalendarService() .getServiceIdsOnDate(serviceDate); if (!serviceIds.contains(trip.getServiceId())) { @@ -1081,8 +1084,8 @@ private Result handleCanceledTrip( * @return trip pattern or null if no trip pattern was found */ private TripPattern getPatternForTripId(FeedScopedId tripId) { - Trip trip = transitService.getTripForId(tripId); - return transitService.getPatternForTrip(trip); + Trip trip = transitEditorService.getTripForId(tripId); + return transitEditorService.getPatternForTrip(trip); } private static void debug(FeedScopedId id, String message, Object... params) { @@ -1098,4 +1101,8 @@ private enum CancelationType { CANCEL, DELETE, } + + public void flushBuffer() { + snapshotManager.purgeAndCommit(); + } } diff --git a/src/test/java/org/opentripplanner/GtfsTest.java b/src/test/java/org/opentripplanner/GtfsTest.java index baf12225c9a..074e31147a3 100644 --- a/src/test/java/org/opentripplanner/GtfsTest.java +++ b/src/test/java/org/opentripplanner/GtfsTest.java @@ -233,6 +233,7 @@ protected void setUp() throws Exception { updates, feedId.getId() ); + timetableSnapshotSource.flushBuffer(); alertsUpdateHandler.update(feedMessage); } catch (Exception exception) {} } diff --git a/src/test/java/org/opentripplanner/updater/trip/RealtimeTestEnvironment.java b/src/test/java/org/opentripplanner/updater/trip/RealtimeTestEnvironment.java index 6adce735fb0..bf6f743eac7 100644 --- a/src/test/java/org/opentripplanner/updater/trip/RealtimeTestEnvironment.java +++ b/src/test/java/org/opentripplanner/updater/trip/RealtimeTestEnvironment.java @@ -284,13 +284,15 @@ public UpdateResult applyTripUpdates( UpdateIncrementality incrementality ) { Objects.requireNonNull(gtfsSource, "Test environment is configured for SIRI only"); - return gtfsSource.applyTripUpdates( + UpdateResult updateResult = gtfsSource.applyTripUpdates( null, BackwardsDelayPropagationType.REQUIRED_NO_DATA, incrementality, updates, getFeedId() ); + commitTimetableSnapshot(); + return updateResult; } // private methods @@ -300,7 +302,19 @@ private UpdateResult applyEstimatedTimetable( boolean fuzzyMatching ) { Objects.requireNonNull(siriSource, "Test environment is configured for GTFS-RT only"); - return getEstimatedTimetableHandler(fuzzyMatching).applyUpdate(updates, DIFFERENTIAL); + UpdateResult updateResult = getEstimatedTimetableHandler(fuzzyMatching) + .applyUpdate(updates, DIFFERENTIAL); + commitTimetableSnapshot(); + return updateResult; + } + + private void commitTimetableSnapshot() { + if (siriSource != null) { + siriSource.flushBuffer(); + } + if (gtfsSource != null) { + gtfsSource.flushBuffer(); + } } private Trip createTrip(String id, Route route, List stops) { diff --git a/src/test/java/org/opentripplanner/updater/trip/TimetableSnapshotManagerTest.java b/src/test/java/org/opentripplanner/updater/trip/TimetableSnapshotManagerTest.java index 8f5c278b586..384e8a41fd1 100644 --- a/src/test/java/org/opentripplanner/updater/trip/TimetableSnapshotManagerTest.java +++ b/src/test/java/org/opentripplanner/updater/trip/TimetableSnapshotManagerTest.java @@ -63,19 +63,16 @@ SameAssert not() { static Stream purgeExpiredDataTestCases() { return Stream.of( - // purgeExpiredData maxSnapshotFrequency || snapshots PatternSnapshotA PatternSnapshotB - Arguments.of(Boolean.TRUE, -1, NotSame, NotSame), - Arguments.of(Boolean.FALSE, -1, NotSame, Same), - Arguments.of(Boolean.TRUE, 1000, NotSame, NotSame), - Arguments.of(Boolean.FALSE, 1000, Same, Same) + // purgeExpiredData || snapshots PatternSnapshotA PatternSnapshotB + Arguments.of(Boolean.TRUE, NotSame, NotSame), + Arguments.of(Boolean.FALSE, NotSame, Same) ); } - @ParameterizedTest(name = "purgeExpired: {0}, maxFrequency: {1} || {2} {3}") + @ParameterizedTest(name = "purgeExpired: {0} || {1} {2}") @MethodSource("purgeExpiredDataTestCases") public void testPurgeExpiredData( boolean purgeExpiredData, - int maxSnapshotFrequency, SameAssert expSnapshots, SameAssert expPatternAeqB ) { @@ -85,9 +82,7 @@ public void testPurgeExpiredData( var snapshotManager = new TimetableSnapshotManager( null, - TimetableSnapshotSourceParameters.DEFAULT - .withPurgeExpiredData(purgeExpiredData) - .withMaxSnapshotFrequency(Duration.ofMillis(maxSnapshotFrequency)), + TimetableSnapshotSourceParameters.DEFAULT.withPurgeExpiredData(purgeExpiredData), clock::get ); diff --git a/src/test/java/org/opentripplanner/updater/trip/TimetableSnapshotSourceTest.java b/src/test/java/org/opentripplanner/updater/trip/TimetableSnapshotSourceTest.java index 84b53c95f9c..e53ba07cbcf 100644 --- a/src/test/java/org/opentripplanner/updater/trip/TimetableSnapshotSourceTest.java +++ b/src/test/java/org/opentripplanner/updater/trip/TimetableSnapshotSourceTest.java @@ -82,28 +82,6 @@ public void testGetSnapshot() { assertSame(snapshot, updater.getTimetableSnapshot()); } - @Test - public void testGetSnapshotWithMaxSnapshotFrequencyCleared() { - var updater = new TimetableSnapshotSource( - TimetableSnapshotSourceParameters.DEFAULT.withMaxSnapshotFrequency(Duration.ofMillis(-1)), - transitModel - ); - - final TimetableSnapshot snapshot = updater.getTimetableSnapshot(); - - updater.applyTripUpdates( - TRIP_MATCHER_NOOP, - REQUIRED_NO_DATA, - DIFFERENTIAL, - List.of(CANCELLATION), - feedId - ); - - final TimetableSnapshot newSnapshot = updater.getTimetableSnapshot(); - assertNotNull(newSnapshot); - assertNotSame(snapshot, newSnapshot); - } - @Test public void testHandleModifiedTrip() { // GIVEN @@ -221,6 +199,7 @@ public void testHandleModifiedTrip() { List.of(tripUpdate), feedId ); + updater.flushBuffer(); // THEN final TimetableSnapshot snapshot = updater.getTimetableSnapshot();