diff --git a/endpoints-control/src/main/java/com/google/api/control/Client.java b/endpoints-control/src/main/java/com/google/api/control/Client.java index 328708e..d53703d 100644 --- a/endpoints-control/src/main/java/com/google/api/control/Client.java +++ b/endpoints-control/src/main/java/com/google/api/control/Client.java @@ -91,7 +91,7 @@ public Client(String serviceName, CheckAggregationOptions checkOptions, ServiceControl transport, ThreadFactory threads, SchedulerFactory schedulers, int statsLogFrequency, @Nullable Ticker ticker) { ticker = ticker == null ? Ticker.systemTicker() : ticker; - this.checkAggregator = new CheckRequestAggregator(serviceName, checkOptions, null, ticker); + this.checkAggregator = new CheckRequestAggregator(serviceName, checkOptions, ticker); this.reportAggregator = new ReportRequestAggregator(serviceName, reportOptions, null, ticker); this.quotaAggregator = new QuotaRequestAggregator(serviceName, quotaOptions, ticker); this.serviceName = serviceName; @@ -134,6 +134,7 @@ public void run() { scheduleFlushes(); } }); + // Note: this is not supported on App Engine Standard. schedulerThread.start(); } catch (RuntimeException e) { log.atInfo().log(BACKGROUND_THREAD_ERROR); @@ -305,7 +306,6 @@ private synchronized void initializeFlushing() { this.scheduler = schedulers.create(ticker); this.scheduler.setStatistics(statistics); log.atInfo().log("scheduling the initial check, report, and quota"); - flushAndScheduleChecks(); flushAndScheduleReports(); flushAndScheduleQuota(); } @@ -323,51 +323,6 @@ private synchronized boolean resetIfStopped() { return true; } - private void flushAndScheduleChecks() { - if (resetIfStopped()) { - log.atFine().log("did not schedule check flush: client is stopped"); - return; - } - int interval = checkAggregator.getFlushIntervalMillis(); - if (interval < 0) { - log.atFine().log("did not schedule check flush: caching is disabled"); - return; // cache is disabled, so no flushing it - } - - if (isRunningSchedulerDirectly()) { - log.atFine().log("did not schedule check flush: no scheduler thread is running"); - return; - } - - log.atFine().log("flushing the check aggregator"); - Stopwatch w = Stopwatch.createUnstarted(ticker); - for (CheckRequest req : checkAggregator.flush()) { - try { - statistics.recachedChecks.incrementAndGet(); - w.reset().start(); - CheckResponse resp = transport.services().check(serviceName, req).execute(); - statistics.totalCheckTransportTimeMillis.addAndGet(w.elapsed(TimeUnit.MILLISECONDS)); - w.reset().start(); - checkAggregator.addResponse(req, resp); - statistics.totalCheckCacheUpdateTimeMillis.addAndGet(w.elapsed(TimeUnit.MILLISECONDS)); - } catch (IOException e) { - log.atSevere().withCause(e).log("direct send of a check request %s failed", req); - } - } - // copy scheduler into a local variable to avoid data races beween this method and stop() - Scheduler currentScheduler = scheduler; - if (resetIfStopped()) { - log.atFine().log("did not schedule succeeding check flush: client is stopped"); - return; - } - currentScheduler.enter(new Runnable() { - @Override - public void run() { - flushAndScheduleChecks(); // Do this again after the interval - } - }, interval, 0 /* high priority */); - } - private void flushAndScheduleReports() { if (resetIfStopped()) { log.atFine().log("did not schedule report flush: client is stopped"); diff --git a/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckAggregationOptions.java b/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckAggregationOptions.java index 6e929fe..fab34e4 100644 --- a/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckAggregationOptions.java +++ b/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckAggregationOptions.java @@ -20,10 +20,7 @@ import com.google.common.base.Ticker; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -42,13 +39,7 @@ public class CheckAggregationOptions { */ public static final int DEFAULT_RESPONSE_EXPIRATION_MILLIS = 4000; - /** - * The default flush cache entry interval. - */ - public static final int DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS = 2000; - private final int numEntries; - private final int flushCacheEntryIntervalMillis; private final int expirationMillis; /** @@ -58,21 +49,13 @@ public class CheckAggregationOptions { * is the maximum number of cache entries that can be kept in the * aggregation cache. The cache is disabled if this value is * negative. - * @param flushCacheEntryIntervalMillis - * the maximum interval before an aggregated check request is - * flushed to the server. The cache entry is deleted after the - * flush * @param expirationMillis * is the maximum interval in milliseconds before a cached check - * response is invalidated. This value should be greater than - * {@code flushCacheEntryIntervalMillis}. If not, it is ignored, - * and a value of {@code flushCacheEntryIntervalMillis} is used - * instead. + * response is invalidated. */ - public CheckAggregationOptions(int numEntries, int flushCacheEntryIntervalMillis, int expirationMillis) { + public CheckAggregationOptions(int numEntries, int expirationMillis) { this.numEntries = numEntries; - this.flushCacheEntryIntervalMillis = flushCacheEntryIntervalMillis; - this.expirationMillis = Math.max(expirationMillis, flushCacheEntryIntervalMillis + 1); + this.expirationMillis = expirationMillis; } /** @@ -81,7 +64,7 @@ public CheckAggregationOptions(int numEntries, int flushCacheEntryIntervalMillis * Creates an instance initialized with the default values. */ public CheckAggregationOptions() { - this(DEFAULT_NUM_ENTRIES, DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS, DEFAULT_RESPONSE_EXPIRATION_MILLIS); + this(DEFAULT_NUM_ENTRIES, DEFAULT_RESPONSE_EXPIRATION_MILLIS); } /** @@ -92,18 +75,9 @@ public int getNumEntries() { return numEntries; } - /** - * @return the maximum interval before aggregated report requests are - * flushed to the server - */ - public int getFlushCacheEntryIntervalMillis() { - return flushCacheEntryIntervalMillis; - } - /** * @return the maximum interval before a cached check response should be - * deleted. This value will not be greater than - * {@link #getFlushCacheEntryIntervalMillis()} + * deleted. */ public int getExpirationMillis() { return expirationMillis; @@ -115,45 +89,29 @@ public int getExpirationMillis() { * @param * the type of the instance being cached * - * @param out - * a concurrent {@code Deque} to which previously cached items - * are added as they expire * @return a {@link Cache} corresponding to this instance's values or * {@code null} unless {@link #numEntries} is positive. */ @Nullable - public Cache createCache(ConcurrentLinkedDeque out) { - return createCache(out, Ticker.systemTicker()); + public Cache createCache() { + return createCache(Ticker.systemTicker()); } /** * Creates a {@link Cache} configured by this instance. * - * @param - * the type of the value stored in the Cache - * @param out - * a concurrent {@code Deque} to which the cached values are - * added as they are removed from the cache - * @param ticker - * the time source used to determine expiration + * @param the type of the value stored in the Cache + * @param ticker the time source used to determine expiration * @return a {@link Cache} corresponding to this instance's values or - * {@code null} unless {@code #numEntries} is positive. + * {@code null} unless {@code #numEntries} is positive. */ @Nullable - public Cache createCache(final ConcurrentLinkedDeque out, Ticker ticker) { - Preconditions.checkNotNull(out, "The out deque cannot be null"); + public Cache createCache(Ticker ticker) { Preconditions.checkNotNull(ticker, "The ticker cannot be null"); if (numEntries <= 0) { return null; } - final RemovalListener listener = new RemovalListener() { - @Override - public void onRemoval(RemovalNotification notification) { - out.addFirst(notification.getValue()); - } - }; - CacheBuilder b = CacheBuilder.newBuilder().maximumSize(numEntries).ticker(ticker) - .removalListener(listener); + CacheBuilder b = CacheBuilder.newBuilder().maximumSize(numEntries).ticker(ticker); if (expirationMillis >= 0) { b.expireAfterWrite(expirationMillis, TimeUnit.MILLISECONDS); } diff --git a/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckRequestAggregator.java b/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckRequestAggregator.java index be6cf81..a02354a 100644 --- a/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckRequestAggregator.java +++ b/endpoints-control/src/main/java/com/google/api/control/aggregator/CheckRequestAggregator.java @@ -16,7 +16,6 @@ package com.google.api.control.aggregator; -import com.google.api.MetricDescriptor.MetricKind; import com.google.api.servicecontrol.v1.CheckRequest; import com.google.api.servicecontrol.v1.CheckResponse; import com.google.api.servicecontrol.v1.MetricValue; @@ -27,35 +26,25 @@ import com.google.common.base.Strings; import com.google.common.base.Ticker; import com.google.common.cache.Cache; -import com.google.common.collect.Lists; -import com.google.common.flogger.FluentLogger; import com.google.common.hash.HashCode; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Map; -import java.util.concurrent.ConcurrentLinkedDeque; + import javax.annotation.Nullable; +import java.nio.charset.StandardCharsets; /** - * Caches and aggregates {@link CheckRequest}s. + * Caches {@link CheckRequest}s. */ public class CheckRequestAggregator { /** - * The flush interval returned by {@link #getFlushIntervalMillis() } when an instance is + * The flush interval returned by {@link #getExpirationMillis() } when an instance is * configured to be non-caching. */ public static final int NON_CACHING = -1; - private static final int NANOS_PER_MILLI = 1000000; - private static final CheckRequest[] NO_REQUESTS = new CheckRequest[] {}; - private static final FluentLogger log = FluentLogger.forEnclosingClass(); - private final String serviceName; private final CheckAggregationOptions options; - private final Map kinds; - private final ConcurrentLinkedDeque out; private final Cache cache; private final Ticker ticker; @@ -64,33 +53,18 @@ public class CheckRequestAggregator { * * @param serviceName the service whose {@code CheckRequest}s are being aggregated * @param options configures this instance's caching behavior - * @param kinds specifies the {@link MetricKind} for specific metric names * @param ticker the time source used to determine expiration. When not specified, this defaults * to {@link Ticker#systemTicker()} */ public CheckRequestAggregator(String serviceName, CheckAggregationOptions options, - @Nullable Map kinds, @Nullable Ticker ticker) { + @Nullable Ticker ticker) { Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceName), "service name cannot be empty"); Preconditions.checkNotNull(options, "options must be non-null"); - this.out = new ConcurrentLinkedDeque(); this.ticker = ticker == null ? Ticker.systemTicker() : ticker; - this.cache = options.createCache(out, this.ticker); + this.cache = options.createCache(this.ticker); this.serviceName = serviceName; this.options = options; - this.kinds = kinds; - } - - /** - * Constructor. - * - * @param serviceName the service whose {@code CheckRequest}s are being aggregated - * @param options configures this instances caching behavior - * @param kinds specifies the {@link MetricKind} for specific metric names - */ - public CheckRequestAggregator(String serviceName, CheckAggregationOptions options, - @Nullable Map kinds) { - this(serviceName, options, kinds, Ticker.systemTicker()); } /** @@ -100,13 +74,13 @@ public CheckRequestAggregator(String serviceName, CheckAggregationOptions option * @param options configures this instances caching behavior */ public CheckRequestAggregator(String serviceName, CheckAggregationOptions options) { - this(serviceName, options, null); + this(serviceName, options, Ticker.systemTicker()); } /** - * @return the interval in milliseconds between calls to {@link #flush} + * @return See {@link CheckAggregationOptions#getExpirationMillis()}. */ - public int getFlushIntervalMillis() { + public int getExpirationMillis() { if (cache == null) { return NON_CACHING; } else { @@ -132,36 +106,6 @@ public void clear() { } synchronized (cache) { cache.invalidateAll(); - out.clear(); - } - } - - /** - * Flushes this instance's cache. - * - * The instance's driver should call the this method every {@link #getFlushIntervalMillis()} - * milliseconds, and send the results to the check service. - * - * @return CheckRequest[] containing the CheckRequests that were pending - */ - public CheckRequest[] flush() { - if (cache == null) { - return NO_REQUESTS; - } - - // Thread safety - the current thread cleans up the cache, which may add multiple cached - // aggregated operations to the output deque. - synchronized (cache) { - cache.cleanUp(); - ArrayList reqs = Lists.newArrayList(); - for (CachedItem item : out) { - CheckRequest req = item.extractRequest(); - if (req != null) { - reqs.add(req); - } - } - out.clear(); - return reqs.toArray(new CheckRequest[reqs.size()]); } } @@ -181,7 +125,7 @@ public void addResponse(CheckRequest req, CheckResponse resp) { synchronized (cache) { CachedItem item = cache.getIfPresent(signature); if (item == null) { - cache.put(signature, new CachedItem(resp, req, kinds, now, quotaScale)); + cache.put(signature, new CachedItem(resp, now, quotaScale)); } else { item.lastCheckTimestamp = now; item.response = resp; @@ -230,37 +174,7 @@ public void addResponse(CheckRequest req, CheckResponse resp) { if (item == null) { return null; // signal caller to send the response } else { - return handleCachedResponse(req, item); - } - } - - private boolean isCurrent(CachedItem item) { - long age = ticker.read() - item.lastCheckTimestamp; - return age < (options.getFlushCacheEntryIntervalMillis() * NANOS_PER_MILLI); - } - - private CheckResponse handleCachedResponse(CheckRequest req, CachedItem item) { - if (item.response.getCheckErrorsCount() > 0) { - if (isCurrent(item)) { - return item.response; - } - - // Not current - item.lastCheckTimestamp = ticker.read(); - return null; // signal the caller to make a new check request - } else { - if (isCurrent(item)) { - return item.response; - } - item.updateRequest(req, kinds); - if (item.isFlushing) { - log.atWarning().log("latest check request has not completed"); - } - - // Not current - item.isFlushing = true; - item.lastCheckTimestamp = ticker.read(); - return null; // signal the caller to make a new check request + return item.response; } } @@ -302,42 +216,18 @@ private static class CachedItem { long lastCheckTimestamp; int quotaScale; CheckResponse response; - private final String serviceName; - - private OperationAggregator aggregator; /** * @param response the cached {@code CheckResponse} - * @param request caused {@code response} - * @param kinds the kinds of metrics * @param lastCheckTimestamp the last time the {@code CheckRequest} for tracked by this item was * checked * @param quotaScale WIP, used to track quota */ - CachedItem(CheckResponse response, CheckRequest request, Map kinds, - long lastCheckTimestamp, int quotaScale) { + CachedItem(CheckResponse response, long lastCheckTimestamp, int quotaScale) { this.response = response; - this.serviceName = request.getServiceName(); this.lastCheckTimestamp = lastCheckTimestamp; this.quotaScale = quotaScale; - this.aggregator = new OperationAggregator(request.getOperation(), kinds); - } - - public synchronized void updateRequest(CheckRequest req, Map kinds) { - if (this.aggregator == null) { - this.aggregator = new OperationAggregator(req.getOperation(), kinds); - } else { - aggregator.add(req.getOperation()); - } } - public synchronized CheckRequest extractRequest() { - if (this.aggregator == null) { - return null; - } - Operation op = this.aggregator.asOperation(); - this.aggregator = null; - return CheckRequest.newBuilder().setServiceName(this.serviceName).setOperation(op).build(); - } } } diff --git a/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckAggregationOptionsTest.java b/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckAggregationOptionsTest.java index b5359eb..c5f9e38 100644 --- a/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckAggregationOptionsTest.java +++ b/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckAggregationOptionsTest.java @@ -40,21 +40,10 @@ public class CheckAggregationOptionsTest { public void defaultConstructorShouldSpecifyTheDefaultValues() { CheckAggregationOptions options = new CheckAggregationOptions(); assertEquals(CheckAggregationOptions.DEFAULT_NUM_ENTRIES, options.getNumEntries()); - assertEquals(CheckAggregationOptions.DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS, - options.getFlushCacheEntryIntervalMillis()); assertEquals(CheckAggregationOptions.DEFAULT_RESPONSE_EXPIRATION_MILLIS, options.getExpirationMillis()); } - @Test - public void constructorShouldIgnoreLowExpirationMillis() { - CheckAggregationOptions options = - new CheckAggregationOptions(-1, 1, 0 /* this is low and will be ignored */); - assertEquals(-1, options.getNumEntries()); - assertEquals(1, options.getFlushCacheEntryIntervalMillis()); - assertEquals(2 /* cache interval + 1 */, options.getExpirationMillis()); - } - @Test public void shouldFailToCreateCacheWithANullOutputDeque() { try { @@ -70,7 +59,7 @@ public void shouldFailToCreateCacheWithANullOutputDeque() { public void shouldFailToCreateACacheWithANullTicker() { try { CheckAggregationOptions options = new CheckAggregationOptions(); - options.createCache(testDeque(), null); + options.createCache(null); fail("should have raised NullPointerException"); } catch (NullPointerException e) { // expected @@ -81,12 +70,11 @@ public void shouldFailToCreateACacheWithANullTicker() { public void shouldNotCreateACacheUnlessMaxSizeIsPositive() { for (int i : new int[] {-1, 0, 1}) { CheckAggregationOptions options = new CheckAggregationOptions(i, - CheckAggregationOptions.DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS, - CheckAggregationOptions.DEFAULT_RESPONSE_EXPIRATION_MILLIS); + CheckAggregationOptions.DEFAULT_RESPONSE_EXPIRATION_MILLIS); if (i > 0) { - assertNotNull(options.createCache(testDeque())); + assertNotNull(options.createCache()); } else { - assertNull(options.createCache(testDeque())); + assertNull(options.createCache()); } } } @@ -95,45 +83,23 @@ public void shouldNotCreateACacheUnlessMaxSizeIsPositive() { public void shouldCreateACacheEvenIfExpirationIsNotPositive() { for (int i : new int[] {-1, 0, 1}) { CheckAggregationOptions options = - new CheckAggregationOptions(CheckAggregationOptions.DEFAULT_NUM_ENTRIES, i - 1, i); - assertNotNull(options.createCache(testDeque())); + new CheckAggregationOptions(CheckAggregationOptions.DEFAULT_NUM_ENTRIES, i); + assertNotNull(options.createCache()); } } @Test - public void shouldCreateACacheThatFlushesToTheOutputDeque() { - CheckAggregationOptions options = new CheckAggregationOptions(1, - CheckAggregationOptions.DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS, - CheckAggregationOptions.DEFAULT_RESPONSE_EXPIRATION_MILLIS); - - ConcurrentLinkedDeque deque = testDeque(); - Cache cache = options.createCache(deque); - cache.put("one", 1L); - assertEquals(cache.size(), 1); - assertEquals(deque.size(), 0); - cache.put("two", 2L); - assertEquals(cache.size(), 1); - assertEquals(deque.size(), 1); - cache.put("three", 3L); - assertEquals(cache.size(), 1); - assertEquals(deque.size(), 2); - } - - @Test - public void shouldCreateACacheThatFlushesToTheOutputDequeAfterExpiration() { + public void shouldCreateACacheThatDeletesEntryAfterExpiration() { CheckAggregationOptions options = - new CheckAggregationOptions(CheckAggregationOptions.DEFAULT_NUM_ENTRIES, 0, 1); + new CheckAggregationOptions(CheckAggregationOptions.DEFAULT_NUM_ENTRIES, 1); - ConcurrentLinkedDeque deque = testDeque(); FakeTicker ticker = new FakeTicker(); - Cache cache = options.createCache(deque, ticker); + Cache cache = options.createCache(ticker); cache.put("one", 1L); assertEquals(1, cache.size()); - assertEquals(0, deque.size()); ticker.tick(1 /* expires the entry */, TimeUnit.MILLISECONDS); cache.cleanUp(); assertEquals(0, cache.size()); - assertEquals(1, deque.size()); } private static ConcurrentLinkedDeque testDeque() { diff --git a/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckRequestAggregatorTest.java b/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckRequestAggregatorTest.java index df87529..b2cfeac 100644 --- a/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckRequestAggregatorTest.java +++ b/endpoints-control/src/test/java/com/google/api/control/aggregator/CheckRequestAggregatorTest.java @@ -53,7 +53,7 @@ public class CheckRequestAggregatorTest { private static final int TEST_EXPIRATION = TEST_FLUSH_INTERVAL + 1; private static final Timestamp EARLY = Timestamp.newBuilder().setNanos(1).setSeconds(100).build(); private CheckRequestAggregator NO_CACHE = new CheckRequestAggregator(NO_CACHE_NAME, - new CheckAggregationOptions(-1 /* disables cache */, 2, 1)); + new CheckAggregationOptions(-1 /* disables cache */, 1)); private CheckRequestAggregator DEFAULT = new CheckRequestAggregator(DEFAULT_NAME, new CheckAggregationOptions()); private FakeTicker ticker; @@ -102,20 +102,14 @@ public void signShouldChangeAsImportFieldsChange() { assertNotEquals(withMetrics, withLabels); } - @Test - public void whenNonCachingShouldHaveEmptyFlush() { - assertEquals(0, NO_CACHE.flush().length); - } - @Test public void whenNonCachingShouldHaveWellKnownFlushInterval() { - assertEquals(CheckRequestAggregator.NON_CACHING, NO_CACHE.getFlushIntervalMillis()); + assertEquals(CheckRequestAggregator.NON_CACHING, NO_CACHE.getExpirationMillis()); } @Test public void whenNonCachingShouldNotCacheResponse() { CheckRequest req = newTestRequest("service.no_cache"); - assertEquals(0, NO_CACHE.flush().length); assertEquals(null, NO_CACHE.check(req)); CheckResponse fakeResponse = fakeResponse(); @@ -152,9 +146,9 @@ public void whenCachingShouldReturnNullInitiallyAsRequestIsNotCached() { } @Test - public void whenCachingShouldHaveExpirationAsFlushInterval() { + public void whenCachingShouldHaveExpiration() { CheckRequestAggregator agg = newCachingInstance(); - assertEquals(TEST_EXPIRATION, agg.getFlushIntervalMillis()); + assertEquals(TEST_EXPIRATION, agg.getExpirationMillis()); } @Test @@ -255,26 +249,23 @@ public void shouldExtendExpirationOnReceiptOfAResponse() { assertEquals(fakeResponse, agg.check(req)); // not expired yet } - public void shouldNotFlushRequestThatHaveNotBeenUpdated() { + @Test + public void shouldExpireRequestThatHasNotBeenUpdated() { CheckRequest req = newTestRequest(CACHING_NAME); CheckRequestAggregator agg = newCachingInstance(); CheckResponse fakeResponse = fakeResponse(); assertEquals(null, agg.check(req)); agg.addResponse(req, fakeResponse); assertEquals(fakeResponse, agg.check(req)); - ticker.tick(1, TimeUnit.MILLISECONDS); - - // now past the flush interval, nothing to expire - assertEquals(0, agg.flush().length); + ticker.tick(TEST_EXPIRATION, TimeUnit.MILLISECONDS); - // now expired, confirm nothing in the cache, and nothing flushed - ticker.tick(1, TimeUnit.MILLISECONDS); + // now expired, confirm nothing in the cache assertEquals(null, agg.check(req)); assertEquals(null, agg.check(req)); - assertEquals(0, agg.flush().length); } - public void shouldFlushRequestsThatHaveBeenUpdated() { + @Test + public void shouldNotExpireRequestThatHasBeenUpdated() { CheckRequest req = newTestRequest(CACHING_NAME); CheckRequestAggregator agg = newCachingInstance(); CheckResponse fakeResponse = fakeResponse(); @@ -283,18 +274,19 @@ public void shouldFlushRequestsThatHaveBeenUpdated() { assertEquals(fakeResponse, agg.check(req)); ticker.tick(1, TimeUnit.MILLISECONDS); - // now past the flush interval, nothing to expire - assertEquals(0, agg.flush().length); + // update request + agg.addResponse(req, fakeResponse); - // now expired, flush without checking the cache gives - // the cached request + // would be expired if not updated ticker.tick(1, TimeUnit.MILLISECONDS); - assertEquals(1, agg.flush().length); + assertEquals(fakeResponse, agg.check(req)); - // flushing again immediately should result in 0 entries - assertEquals(0, agg.flush().length); + // now expired, confirm nothing in the cache + ticker.tick(TEST_EXPIRATION, TimeUnit.MILLISECONDS); + assertEquals(null, agg.check(req)); } + @Test public void shouldClearRequests() { CheckRequest req = newTestRequest(CACHING_NAME); CheckRequestAggregator agg = newCachingInstance(); @@ -304,12 +296,11 @@ public void shouldClearRequests() { assertEquals(fakeResponse, agg.check(req)); agg.clear(); assertEquals(null, agg.check(req)); - assertEquals(0, agg.flush().length); } private CheckRequestAggregator newCachingInstance() { return new CheckRequestAggregator(CACHING_NAME, - new CheckAggregationOptions(1, TEST_FLUSH_INTERVAL, TEST_EXPIRATION), null, ticker); + new CheckAggregationOptions(1, TEST_EXPIRATION), ticker); } private static CheckResponse fakeResponse() {