From 2d7468bee1182f70b5cd45fef2f123223a480aa1 Mon Sep 17 00:00:00 2001 From: dsorokin Date: Mon, 23 Oct 2017 16:39:49 +0200 Subject: [PATCH 1/2] ARUHA-1203 Add delay (by default 2 seconds) between deletion of topics while timelines switch --- .../service/job/TimelineCleanupJob.java | 32 +++++++++++++++---- src/main/resources/application.yml | 4 ++- .../service/job/TimelineCleaningJobTest.java | 2 +- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/job/TimelineCleanupJob.java b/src/main/java/org/zalando/nakadi/service/job/TimelineCleanupJob.java index 4985a2b022..44444b0ea5 100644 --- a/src/main/java/org/zalando/nakadi/service/job/TimelineCleanupJob.java +++ b/src/main/java/org/zalando/nakadi/service/job/TimelineCleanupJob.java @@ -15,6 +15,8 @@ import org.zalando.nakadi.repository.db.TimelineDbRepository; import org.zalando.nakadi.service.timeline.TimelineService; +import java.util.List; + @Service public class TimelineCleanupJob { @@ -26,17 +28,20 @@ public class TimelineCleanupJob { private final TimelineDbRepository timelineDbRepository; private final TimelineService timelineService; private final ExclusiveJobWrapper jobWrapper; + private final long deletionDelayMs; @Autowired public TimelineCleanupJob(final EventTypeCache eventTypeCache, final TimelineDbRepository timelineDbRepository, final TimelineService timelineService, final JobWrapperFactory jobWrapperFactory, - @Value("${nakadi.jobs.timelineCleanup.runPeriodMs}") final int periodMs) { + @Value("${nakadi.jobs.timelineCleanup.runPeriodMs}") final int periodMs, + @Value("${nakadi.jobs.timelineCleanup.deletionDelayMs}") final long deletionDelayMs) { this.eventTypeCache = eventTypeCache; this.timelineDbRepository = timelineDbRepository; this.timelineService = timelineService; this.jobWrapper = jobWrapperFactory.createExclusiveJobWrapper(JOB_NAME, periodMs); + this.deletionDelayMs = deletionDelayMs; } @Scheduled( @@ -44,17 +49,30 @@ public 
TimelineCleanupJob(final EventTypeCache eventTypeCache, initialDelayString = "${random.int(${nakadi.jobs.checkRunMs})}") public void cleanupTimelines() { try { - jobWrapper.runJobLocked(() -> - timelineDbRepository.getExpiredTimelines().stream() - .forEach(timeline -> { - deleteTimelineTopic(timeline); - markTimelineDeleted(timeline); - })); + jobWrapper.runJobLocked(this::deleteTimelinesLocked); } catch (final RepositoryProblemException e) { LOG.error("DB error occurred when trying to get expired timelines", e); } } + private void deleteTimelinesLocked() { + final List<Timeline> expired = timelineDbRepository.getExpiredTimelines(); + for (int i = 0; i < expired.size(); ++i) { + if (i != 0 && deletionDelayMs > 0) { + try { + Thread.sleep(deletionDelayMs); + } catch (InterruptedException e) { + LOG.warn("Timeline deletion thread was interrupted", e); + Thread.currentThread().interrupt(); + return; + } + } + final Timeline timeline = expired.get(i); + deleteTimelineTopic(timeline); + markTimelineDeleted(timeline); + } + } + private void deleteTimelineTopic(final Timeline timeline) { try { final TopicRepository topicRepository = timelineService.getTopicRepository(timeline); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 795a28b585..fd60d30ef5 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -88,7 +88,9 @@ nakadi: maxPartitions: 100 jobs: checkRunMs: 600000 # 10 min - timelineCleanup.runPeriodMs: 3600000 # 1 hour + timelineCleanup: + runPeriodMs: 3600000 # 1 hour + deletionDelayMs: 2000 # 2 seconds, to be on the safe side consumerNodesCleanup.runPeriodMs: 21600000 # 6 hours twintip: diff --git a/src/test/java/org/zalando/nakadi/service/job/TimelineCleaningJobTest.java b/src/test/java/org/zalando/nakadi/service/job/TimelineCleaningJobTest.java index 9e880d8c69..e2b14bc402 100644 --- a/src/test/java/org/zalando/nakadi/service/job/TimelineCleaningJobTest.java +++ 
b/src/test/java/org/zalando/nakadi/service/job/TimelineCleaningJobTest.java @@ -43,7 +43,7 @@ public TimelineCleaningJobTest() { when(jobWrapperFactory.createExclusiveJobWrapper(any(), anyLong())).thenReturn(jobWrapper); timelineCleanupJob = new TimelineCleanupJob(eventTypeCache, timelineDbRepository, timelineService, - jobWrapperFactory, 0); + jobWrapperFactory, 0, 0L); } @Test From 427fe472d6b8a93047cc7128d1de7c8b1ecfc26c Mon Sep 17 00:00:00 2001 From: dsorokin Date: Mon, 23 Oct 2017 17:22:29 +0200 Subject: [PATCH 2/2] ARUHA-1203 Added changelog record --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35279c8b1f..ccc7e6c99b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed - Committing with empty X-Nakadi-StreamId causes 503 +- Mass deletion of topics during a timeline switch is now performed with a short delay between deletions ## [2.2.2] - 2017-09-28