This repository has been archived by the owner on Dec 17, 2024. It is now read-only.
forked from lizan/service-control-client-cxx
-
Notifications
You must be signed in to change notification settings - Fork 10
/
check_aggregator_impl.h
247 lines (206 loc) · 9.73 KB
/
check_aggregator_impl.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
/* Copyright 2016 Google Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
// Caches and aggregates check requests.
#ifndef GOOGLE_SERVICE_CONTROL_CLIENT_CHECK_AGGREGATOR_IMPL_H_
#define GOOGLE_SERVICE_CONTROL_CLIENT_CHECK_AGGREGATOR_IMPL_H_
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include "absl/status/status.h"
#include "google/api/metric.pb.h"
#include "google/api/servicecontrol/v1/operation.pb.h"
#include "google/api/servicecontrol/v1/service_controller.pb.h"
#include "src/aggregator_interface.h"
#include "src/cache_removed_items_handler.h"
#include "src/operation_aggregator.h"
#include "utils/simple_lru_cache.h"
#include "utils/simple_lru_cache_inl.h"
#include "utils/thread.h"
namespace google {
namespace service_control_client {
// Caches/Batches/aggregates check requests and sends them to the server.
// Thread safe.
//
// Some typical data flows:
//
// Creates a new cache entry and use cached response:
// 1) Calls Check(), and it returns NOT_FOUND,
// 2) Callers send the request to server and get its response.
// 3) Calls CacheResponse() to set the response to cache.
// 4) The new Check() calls after will find the cached response, use it.
// 5) If the request has quota info, it will be aggregated to the cached entry.
//
// Refreshes a cached entry after refresh interval:
// 1) Calls Check(), found a cached response,
// 2) If it passes refresh_interval, Check() returns NOT_FOUND, callers
// will send the request to server.
// 3) The new Check() calls after will use old response before the new response
// arrives.
// 4) Callers will set the new response by calling CacheResponse().
// 5) The new Check() calls after will use the new response.
//
// After a response is expired:
// 1) During Flush() call, if a cached response is expired, it will be flushed
// out. If it has aggregated quota info, flush_callback will be called to
// send the request to server.
// 2) Callers need to send the request to server, and get its new response.
// 3) The new response will be added to the cache by calling CacheResponse().
// 4) If Check() is not called for that entry before it expires again,
// it will NOT have aggregated data when that entry is removed. It will not
// be sent to flush_callback(); the item is simply deleted.
//
// Object life management:
// The callers of this object need to make sure the object is still valid
// before calling its methods. Specifically, callers may use async
// transport to send request to server and pass an on_done() callback to be
// called when response is received. If the on_done() function calls
// CheckAggregator->CacheResponse(), the caller MUST make sure the
// CheckAggregator object is still valid.
// Alias for the CacheRemovedItemsHandler specialization on CheckRequest.
// CheckAggregatorImpl derives from it.
using CheckCacheRemovedItemsHandler = CacheRemovedItemsHandler<
    ::google::api::servicecontrol::v1::CheckRequest>;
class CheckAggregatorImpl : public CheckAggregator,
public CheckCacheRemovedItemsHandler {
public:
// Constructor.
// Does not take ownership of metric_kinds and controller, which must outlive
// this instance.
CheckAggregatorImpl(const std::string &service_name,
const std::string &service_config_id,
const CheckAggregationOptions &options,
std::shared_ptr<MetricKindMap> metric_kind);
virtual ~CheckAggregatorImpl();
// Sets the flush callback function.
// It is called when a cache entry is expired and it has aggregated quota
// in the request. The callback function needs to send the request to server,
// calls CacheResponse() to set its response.
virtual void SetFlushCallback(FlushCallback callback);
// If the check could not be handled by the cache, returns NOT_FOUND,
// caller has to send the request to service control server and call
// CacheResponse() to set the response to the cache.
// Otherwise, returns OK and cached response.
virtual absl::Status
Check(const ::google::api::servicecontrol::v1::CheckRequest &request,
::google::api::servicecontrol::v1::CheckResponse *response);
// Caches a response from a remote Service Controller Check call.
virtual absl::Status CacheResponse(
const std::string &request_signature,
const ::google::api::servicecontrol::v1::CheckResponse &response);
// When the next Flush() should be called.
// Returns in ms from now, or -1 for never
virtual int GetNextFlushInterval();
// Flushes expired cache response entries.
// Called at time specified by GetNextFlushInterval().
virtual absl::Status Flush();
// Flushes out all cache items. Usually called at destructor.
virtual absl::Status FlushAll();
private:
// Cache entry for aggregated check requests and previous check response.
class CacheElem {
public:
CacheElem(const ::google::api::servicecontrol::v1::CheckResponse &response,
const int64_t time, const int quota_scale)
: check_response_(response), last_check_time_(time),
quota_scale_(quota_scale), is_flushing_(false) {}
// Aggregates the given request to this cache entry.
void
Aggregate(const ::google::api::servicecontrol::v1::CheckRequest &request,
const MetricKindMap *metric_kinds);
// Returns the aggregated CheckRequest and reset the cache entry.
::google::api::servicecontrol::v1::CheckRequest
ReturnCheckRequestAndClear(const std::string &service_name,
const std::string &service_config_id);
bool HasPendingCheckRequest() const {
return operation_aggregator_ != NULL;
}
// Setter for check response.
inline void
set_check_response(const ::google::api::servicecontrol::v1::CheckResponse
&check_response) {
check_response_ = check_response;
}
// Getter for check response.
inline const ::google::api::servicecontrol::v1::CheckResponse &
check_response() const {
return check_response_;
}
// Setter for last check time.
inline void set_last_check_time(const int64_t last_check_time) {
last_check_time_ = last_check_time;
}
// Getter for last check time.
inline const int64_t last_check_time() const { return last_check_time_; }
// Setter for check response.
inline void set_quota_scale(const int quota_scale) {
quota_scale_ = quota_scale;
}
// Getter for check response.
inline int quota_scale() const { return quota_scale_; }
// Getter and Setter of is_flushing_;
inline bool is_flushing() const { return is_flushing_; }
inline void set_is_flushing(bool v) { is_flushing_ = v; }
private:
// Internal operation.
std::unique_ptr<OperationAggregator> operation_aggregator_;
// The check response for the last check request.
::google::api::servicecontrol::v1::CheckResponse check_response_;
// In general, this is the last time a check response is updated.
//
// During flush, we set it to be the request start time to prevent a next
// check request from triggering another flush. Note that this prevention
// works only during the flush interval, which means for long RPC, there
// could be up to RPC_time/flush_interval ongoing check requests.
int64_t last_check_time_;
// Scale used to predict how much quota are charged. It is calculated
// as the tokens charged in the last check response / requested tokens.
// The predicated amount tokens consumed is then request tokens * scale.
// This field is valid only when check_response has no check errors.
int quota_scale_;
// If true, is sending the request to server to get new response.
bool is_flushing_;
};
using CacheDeleter = std::function<void(CacheElem *)>;
// Key is the signature of the check request. Value is the CacheElem.
// It is a LRU cache with MaxIdelTime as response_expiration_time.
using CheckCache =
SimpleLRUCacheWithDeleter<std::string, CacheElem, CacheDeleter>;
// Returns whether we should flush a cache entry.
// If the aggregated check request is less than flush interval, no need to
// flush.
bool ShouldFlush(const CacheElem &elem);
// Flushes the internal operation in the elem and delete the elem. The
// response from the server is NOT cached.
// Takes ownership of the elem.
void OnCacheEntryDelete(CacheElem *elem);
// The service name for this cache.
const std::string service_name_;
// The service config id for this cache.
const std::string service_config_id_;
// The check aggregation options.
CheckAggregationOptions options_;
// Metric kinds. Key is the metric name and value is the metric kind.
// Defaults to DELTA if not specified. Not owned.
std::shared_ptr<MetricKindMap> metric_kinds_;
// Mutex guarding the access of cache_;
Mutex cache_mutex_;
// The cache that maps from operation signature to an operation.
// We don't calculate fine grained cost for cache entries, assign each
// entry 1 cost unit.
// Guarded by mutex_, except when compare against NULL.
std::unique_ptr<CheckCache> cache_;
// flush interval in cycles.
int64_t flush_interval_in_cycle_;
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(CheckAggregatorImpl);
};
} // namespace service_control_client
} // namespace google
#endif // GOOGLE_SERVICE_CONTROL_CLIENT_CHECK_AGGREGATOR_IMPL_H_