diff --git a/CHANGELOG.md b/CHANGELOG.md
index 58a00e7bac..b070dd91da 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 
 ### Fixed
 - Committing with empty X-Nakadi-StreamId causes 503
+- Massive topic deletion while switching timelines is now made with small interval between deletions
 
 ### Added
 - Create event type with SAFE rack-awareness
diff --git a/src/main/java/org/zalando/nakadi/service/job/TimelineCleanupJob.java b/src/main/java/org/zalando/nakadi/service/job/TimelineCleanupJob.java
index 4985a2b022..44444b0ea5 100644
--- a/src/main/java/org/zalando/nakadi/service/job/TimelineCleanupJob.java
+++ b/src/main/java/org/zalando/nakadi/service/job/TimelineCleanupJob.java
@@ -15,6 +15,8 @@
 import org.zalando.nakadi.repository.db.TimelineDbRepository;
 import org.zalando.nakadi.service.timeline.TimelineService;
 
+import java.util.List;
+
 @Service
 public class TimelineCleanupJob {
 
@@ -26,17 +28,20 @@ public class TimelineCleanupJob {
     private final TimelineDbRepository timelineDbRepository;
     private final TimelineService timelineService;
     private final ExclusiveJobWrapper jobWrapper;
+    private final long deletionDelayMs;
 
     @Autowired
     public TimelineCleanupJob(final EventTypeCache eventTypeCache,
                               final TimelineDbRepository timelineDbRepository,
                               final TimelineService timelineService,
                               final JobWrapperFactory jobWrapperFactory,
-                              @Value("${nakadi.jobs.timelineCleanup.runPeriodMs}") final int periodMs) {
+                              @Value("${nakadi.jobs.timelineCleanup.runPeriodMs}") final int periodMs,
+                              @Value("${nakadi.jobs.timelineCleanup.deletionDelayMs}") final long deletionDelayMs) {
         this.eventTypeCache = eventTypeCache;
         this.timelineDbRepository = timelineDbRepository;
         this.timelineService = timelineService;
         this.jobWrapper = jobWrapperFactory.createExclusiveJobWrapper(JOB_NAME, periodMs);
+        this.deletionDelayMs = deletionDelayMs;
     }
 
     @Scheduled(
@@ -44,17 +49,36 @@ public TimelineCleanupJob(final EventTypeCache eventTypeCache,
             initialDelayString = "${random.int(${nakadi.jobs.checkRunMs})}")
     public void cleanupTimelines() {
         try {
-            jobWrapper.runJobLocked(() ->
-                    timelineDbRepository.getExpiredTimelines().stream()
-                            .forEach(timeline -> {
-                                deleteTimelineTopic(timeline);
-                                markTimelineDeleted(timeline);
-                            }));
+            jobWrapper.runJobLocked(this::deleteTimelinesLocked);
         } catch (final RepositoryProblemException e) {
             LOG.error("DB error occurred when trying to get expired timelines", e);
         }
     }
 
+    /**
+     * Deletes the topic of every expired timeline and marks the timeline as deleted, sleeping
+     * {@code deletionDelayMs} between consecutive timelines. There is no pause before the first
+     * timeline or when the delay is not positive. If the sleep is interrupted, the interrupt flag
+     * is restored and the method returns without processing the remaining timelines.
+     */
+    private void deleteTimelinesLocked() {
+        final List<Timeline> expired = timelineDbRepository.getExpiredTimelines();
+        for (int i = 0; i < expired.size(); ++i) {
+            if (i != 0 && deletionDelayMs > 0) {
+                try {
+                    Thread.sleep(deletionDelayMs);
+                } catch (InterruptedException e) {
+                    LOG.warn("Timeline deletion thread was interrupted", e);
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+            final Timeline timeline = expired.get(i);
+            deleteTimelineTopic(timeline);
+            markTimelineDeleted(timeline);
+        }
+    }
+
     private void deleteTimelineTopic(final Timeline timeline) {
         try {
             final TopicRepository topicRepository = timelineService.getTopicRepository(timeline);
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 795a28b585..fd60d30ef5 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -88,7 +88,9 @@ nakadi:
     maxPartitions: 100
   jobs:
     checkRunMs: 600000 # 10 min
-    timelineCleanup.runPeriodMs: 3600000 # 1 hour
+    timelineCleanup:
+      runPeriodMs: 3600000 # 1 hour
+      deletionDelayMs: 2000 # 2 seconds, to be on the safe side
     consumerNodesCleanup.runPeriodMs: 21600000 # 6 hours
 
 twintip:
diff --git a/src/test/java/org/zalando/nakadi/service/job/TimelineCleaningJobTest.java b/src/test/java/org/zalando/nakadi/service/job/TimelineCleaningJobTest.java
index 9e880d8c69..e2b14bc402 100644
--- a/src/test/java/org/zalando/nakadi/service/job/TimelineCleaningJobTest.java
+++ b/src/test/java/org/zalando/nakadi/service/job/TimelineCleaningJobTest.java
@@ -43,7 +43,7 @@ public TimelineCleaningJobTest() {
         when(jobWrapperFactory.createExclusiveJobWrapper(any(), anyLong())).thenReturn(jobWrapper);
 
         timelineCleanupJob = new TimelineCleanupJob(eventTypeCache, timelineDbRepository, timelineService,
-                jobWrapperFactory, 0);
+                jobWrapperFactory, 0, 0L);
     }
 
     @Test