Skip to content

Commit

Permalink
CheckRequestAggregator: remove unused, leaking, out queue.
Browse files Browse the repository at this point in the history
Check requests are already sent when doing a request. Even if it would
work on App Engine (no background threads allowed), there is no point in
sending check requests again at some later point.

Check requests are still cached (and expired).
  • Loading branch information
UweTrottmann committed Aug 24, 2023
1 parent ff4fe8a commit 907cf7e
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 294 deletions.
50 changes: 3 additions & 47 deletions endpoints-control/src/main/java/com/google/api/control/Client.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
* Copyright 2023 Uwe Trottmann
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,7 +92,7 @@ public Client(String serviceName, CheckAggregationOptions checkOptions,
ServiceControl transport, ThreadFactory threads,
SchedulerFactory schedulers, int statsLogFrequency, @Nullable Ticker ticker) {
ticker = ticker == null ? Ticker.systemTicker() : ticker;
this.checkAggregator = new CheckRequestAggregator(serviceName, checkOptions, null, ticker);
this.checkAggregator = new CheckRequestAggregator(serviceName, checkOptions, ticker);
this.reportAggregator = new ReportRequestAggregator(serviceName, reportOptions, null, ticker);
this.quotaAggregator = new QuotaRequestAggregator(serviceName, quotaOptions, ticker);
this.serviceName = serviceName;
Expand Down Expand Up @@ -134,6 +135,7 @@ public void run() {
scheduleFlushes();
}
});
// Note: this is not supported on App Engine Standard.
schedulerThread.start();
} catch (RuntimeException e) {
log.atInfo().log(BACKGROUND_THREAD_ERROR);
Expand Down Expand Up @@ -305,7 +307,6 @@ private synchronized void initializeFlushing() {
this.scheduler = schedulers.create(ticker);
this.scheduler.setStatistics(statistics);
log.atInfo().log("scheduling the initial check, report, and quota");
flushAndScheduleChecks();
flushAndScheduleReports();
flushAndScheduleQuota();
}
Expand All @@ -323,51 +324,6 @@ private synchronized boolean resetIfStopped() {
return true;
}

private void flushAndScheduleChecks() {
if (resetIfStopped()) {
log.atFine().log("did not schedule check flush: client is stopped");
return;
}
int interval = checkAggregator.getFlushIntervalMillis();
if (interval < 0) {
log.atFine().log("did not schedule check flush: caching is disabled");
return; // cache is disabled, so no flushing it
}

if (isRunningSchedulerDirectly()) {
log.atFine().log("did not schedule check flush: no scheduler thread is running");
return;
}

log.atFine().log("flushing the check aggregator");
Stopwatch w = Stopwatch.createUnstarted(ticker);
for (CheckRequest req : checkAggregator.flush()) {
try {
statistics.recachedChecks.incrementAndGet();
w.reset().start();
CheckResponse resp = transport.services().check(serviceName, req).execute();
statistics.totalCheckTransportTimeMillis.addAndGet(w.elapsed(TimeUnit.MILLISECONDS));
w.reset().start();
checkAggregator.addResponse(req, resp);
statistics.totalCheckCacheUpdateTimeMillis.addAndGet(w.elapsed(TimeUnit.MILLISECONDS));
} catch (IOException e) {
log.atSevere().withCause(e).log("direct send of a check request %s failed", req);
}
}
// copy scheduler into a local variable to avoid data races beween this method and stop()
Scheduler currentScheduler = scheduler;
if (resetIfStopped()) {
log.atFine().log("did not schedule succeeding check flush: client is stopped");
return;
}
currentScheduler.enter(new Runnable() {
@Override
public void run() {
flushAndScheduleChecks(); // Do this again after the interval
}
}, interval, 0 /* high priority */);
}

private void flushAndScheduleReports() {
if (resetIfStopped()) {
log.atFine().log("did not schedule report flush: client is stopped");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
* Copyright 2023 Uwe Trottmann
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,10 +21,7 @@
import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;
Expand All @@ -42,13 +40,7 @@ public class CheckAggregationOptions {
*/
public static final int DEFAULT_RESPONSE_EXPIRATION_MILLIS = 4000;

/**
* The default flush cache entry interval.
*/
public static final int DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS = 2000;

private final int numEntries;
private final int flushCacheEntryIntervalMillis;
private final int expirationMillis;

/**
Expand All @@ -58,21 +50,13 @@ public class CheckAggregationOptions {
* is the maximum number of cache entries that can be kept in the
* aggregation cache. The cache is disabled if this value is
* negative.
* @param flushCacheEntryIntervalMillis
* the maximum interval before an aggregated check request is
* flushed to the server. The cache entry is deleted after the
* flush
* @param expirationMillis
* is the maximum interval in milliseconds before a cached check
* response is invalidated. This value should be greater than
* {@code flushCacheEntryIntervalMillis}. If not, it is ignored,
* and a value of {@code flushCacheEntryIntervalMillis} is used
* instead.
* response is invalidated.
*/
public CheckAggregationOptions(int numEntries, int flushCacheEntryIntervalMillis, int expirationMillis) {
public CheckAggregationOptions(int numEntries, int expirationMillis) {
this.numEntries = numEntries;
this.flushCacheEntryIntervalMillis = flushCacheEntryIntervalMillis;
this.expirationMillis = Math.max(expirationMillis, flushCacheEntryIntervalMillis + 1);
this.expirationMillis = expirationMillis;
}

/**
Expand All @@ -81,7 +65,7 @@ public CheckAggregationOptions(int numEntries, int flushCacheEntryIntervalMillis
* Creates an instance initialized with the default values.
*/
public CheckAggregationOptions() {
this(DEFAULT_NUM_ENTRIES, DEFAULT_FLUSH_CACHE_ENTRY_INTERVAL_MILLIS, DEFAULT_RESPONSE_EXPIRATION_MILLIS);
this(DEFAULT_NUM_ENTRIES, DEFAULT_RESPONSE_EXPIRATION_MILLIS);
}

/**
Expand All @@ -92,18 +76,9 @@ public int getNumEntries() {
return numEntries;
}

/**
* @return the maximum interval before aggregated report requests are
* flushed to the server
*/
public int getFlushCacheEntryIntervalMillis() {
return flushCacheEntryIntervalMillis;
}

/**
* @return the maximum interval before a cached check response should be
* deleted. This value will not be greater than
* {@link #getFlushCacheEntryIntervalMillis()}
* deleted.
*/
public int getExpirationMillis() {
return expirationMillis;
Expand All @@ -115,45 +90,29 @@ public int getExpirationMillis() {
* @param <T>
* the type of the instance being cached
*
* @param out
* a concurrent {@code Deque} to which previously cached items
* are added as they expire
* @return a {@link Cache} corresponding to this instance's values or
* {@code null} unless {@link #numEntries} is positive.
*/
@Nullable
public <T> Cache<String, T> createCache(ConcurrentLinkedDeque<T> out) {
return createCache(out, Ticker.systemTicker());
public <T> Cache<String, T> createCache() {
return createCache(Ticker.systemTicker());
}

/**
* Creates a {@link Cache} configured by this instance.
*
* @param <T>
* the type of the value stored in the Cache
* @param out
* a concurrent {@code Deque} to which the cached values are
* added as they are removed from the cache
* @param ticker
* the time source used to determine expiration
* @param <T> the type of the value stored in the Cache
* @param ticker the time source used to determine expiration
* @return a {@link Cache} corresponding to this instance's values or
* {@code null} unless {@code #numEntries} is positive.
* {@code null} unless {@code #numEntries} is positive.
*/
@Nullable
public <T> Cache<String, T> createCache(final ConcurrentLinkedDeque<T> out, Ticker ticker) {
Preconditions.checkNotNull(out, "The out deque cannot be null");
public <T> Cache<String, T> createCache(Ticker ticker) {
Preconditions.checkNotNull(ticker, "The ticker cannot be null");
if (numEntries <= 0) {
return null;
}
final RemovalListener<String, T> listener = new RemovalListener<String, T>() {
@Override
public void onRemoval(RemovalNotification<String, T> notification) {
out.addFirst(notification.getValue());
}
};
CacheBuilder<String, T> b = CacheBuilder.newBuilder().maximumSize(numEntries).ticker(ticker)
.removalListener(listener);
CacheBuilder<Object, Object> b = CacheBuilder.newBuilder().maximumSize(numEntries).ticker(ticker);
if (expirationMillis >= 0) {
b.expireAfterWrite(expirationMillis, TimeUnit.MILLISECONDS);
}
Expand Down
Loading

0 comments on commit 907cf7e

Please sign in to comment.