Skip to content

Commit

Permalink
Add non rx based caching attribute client (#200)
Browse files Browse the repository at this point in the history
* Update gradle locks

* trivy

* Add non rx based caching attribute client

* remove

* update

* update

* fix

---------

Co-authored-by: kishansairam9 <[email protected]>
  • Loading branch information
Kishan Sairam Adapa and kishansairam9 authored Nov 21, 2023
1 parent f13783d commit 32a2977
Show file tree
Hide file tree
Showing 14 changed files with 437 additions and 13 deletions.
2 changes: 2 additions & 0 deletions .trivyignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# libssl3
CVE-2023-5678 exp:2023-12-31
2 changes: 1 addition & 1 deletion attribute-projection-registry/gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ io.grpc:grpc-protobuf:1.57.2=compileClasspath,runtimeClasspath
io.grpc:grpc-stub:1.57.2=compileClasspath,runtimeClasspath
javax.annotation:javax.annotation-api:1.3.2=runtimeClasspath
org.checkerframework:checker-qual:3.33.0=compileClasspath,runtimeClasspath
org.hypertrace.bom:hypertrace-bom:0.2.11=compileClasspath,runtimeClasspath
org.hypertrace.bom:hypertrace-bom:0.3.0=compileClasspath,runtimeClasspath
org.hypertrace.core.kafkastreams.framework:kafka-bom:0.3.9=compileClasspath,runtimeClasspath
empty=annotationProcessor
2 changes: 1 addition & 1 deletion attribute-service-api/gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ io.grpc:grpc-protobuf:1.57.2=compileClasspath,runtimeClasspath
io.grpc:grpc-stub:1.57.2=compileClasspath,runtimeClasspath
javax.annotation:javax.annotation-api:1.3.2=compileClasspath,runtimeClasspath
org.checkerframework:checker-qual:3.33.0=compileClasspath,runtimeClasspath
org.hypertrace.bom:hypertrace-bom:0.2.11=compileClasspath,runtimeClasspath
org.hypertrace.bom:hypertrace-bom:0.3.0=compileClasspath,runtimeClasspath
org.hypertrace.core.kafkastreams.framework:kafka-bom:0.3.9=compileClasspath,runtimeClasspath
empty=annotationProcessor
19 changes: 18 additions & 1 deletion attribute-service-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,24 @@ plugins {

dependencies {
api(projects.attributeServiceApi)
api(commonLibs.typesafe.config)

implementation(commonLibs.hypertrace.grpcutils.client)
implementation(commonLibs.hypertrace.grpcutils.context)
implementation(commonLibs.hypertrace.framework.metrics)
implementation(commonLibs.slf4j2.api)
implementation(commonLibs.typesafe.config)
implementation(commonLibs.guava)

annotationProcessor(commonLibs.lombok)
compileOnly(commonLibs.lombok)

testImplementation(commonLibs.junit.jupiter)
testImplementation(commonLibs.mockito.core)
testImplementation(commonLibs.mockito.junit)
testImplementation(commonLibs.grpc.core)
testImplementation(commonLibs.log4j.slf4j2.impl)
}

tasks.test {
useJUnitPlatform()
}
40 changes: 36 additions & 4 deletions attribute-service-client/gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,53 @@ com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava=comp
com.google.j2objc:j2objc-annotations:2.8=compileClasspath
com.google.protobuf:protobuf-java:3.24.1=compileClasspath,runtimeClasspath
com.typesafe:config:1.4.2=compileClasspath,runtimeClasspath
io.dropwizard.metrics:metrics-core:4.2.16=compileClasspath,runtimeClasspath
io.dropwizard.metrics:metrics-jvm:4.2.16=runtimeClasspath
io.github.mweirauch:micrometer-jvm-extras:0.2.2=runtimeClasspath
io.grpc:grpc-api:1.57.2=compileClasspath,runtimeClasspath
io.grpc:grpc-bom:1.57.2=compileClasspath,runtimeClasspath
io.grpc:grpc-context:1.57.2=compileClasspath,runtimeClasspath
io.grpc:grpc-core:1.57.2=runtimeClasspath
io.grpc:grpc-protobuf-lite:1.57.2=compileClasspath,runtimeClasspath
io.grpc:grpc-protobuf:1.57.2=compileClasspath,runtimeClasspath
io.grpc:grpc-stub:1.57.2=compileClasspath,runtimeClasspath
io.micrometer:micrometer-commons:1.10.2=compileClasspath,runtimeClasspath
io.micrometer:micrometer-core:1.10.2=compileClasspath,runtimeClasspath
io.micrometer:micrometer-observation:1.10.2=compileClasspath,runtimeClasspath
io.micrometer:micrometer-registry-prometheus:1.10.2=runtimeClasspath
io.netty:netty-bom:4.1.100.Final=compileClasspath,runtimeClasspath
io.perfmark:perfmark-api:0.26.0=runtimeClasspath
io.prometheus:simpleclient:0.16.0=runtimeClasspath
io.prometheus:simpleclient_common:0.16.0=runtimeClasspath
io.prometheus:simpleclient_dropwizard:0.12.0=runtimeClasspath
io.prometheus:simpleclient_pushgateway:0.12.0=runtimeClasspath
io.prometheus:simpleclient_servlet:0.12.0=runtimeClasspath
io.prometheus:simpleclient_servlet_common:0.12.0=runtimeClasspath
io.prometheus:simpleclient_tracer_common:0.16.0=runtimeClasspath
io.prometheus:simpleclient_tracer_otel:0.16.0=runtimeClasspath
io.prometheus:simpleclient_tracer_otel_agent:0.16.0=runtimeClasspath
javax.annotation:javax.annotation-api:1.3.2=runtimeClasspath
javax.servlet:javax.servlet-api:3.1.0=compileClasspath,runtimeClasspath
javax.xml.bind:jaxb-api:2.3.0=runtimeClasspath
org.apache.logging.log4j:log4j-api:2.19.0=runtimeClasspath
org.apache.logging.log4j:log4j-core:2.19.0=runtimeClasspath
org.apache.logging.log4j:log4j-slf4j-impl:2.19.0=runtimeClasspath
org.checkerframework:checker-qual:3.33.0=compileClasspath,runtimeClasspath
org.codehaus.mojo:animal-sniffer-annotations:1.23=runtimeClasspath
org.hypertrace.bom:hypertrace-bom:0.2.11=compileClasspath,runtimeClasspath
org.eclipse.jetty:jetty-http:9.4.53.v20231009=runtimeClasspath
org.eclipse.jetty:jetty-io:9.4.53.v20231009=runtimeClasspath
org.eclipse.jetty:jetty-security:9.4.53.v20231009=runtimeClasspath
org.eclipse.jetty:jetty-server:9.4.53.v20231009=runtimeClasspath
org.eclipse.jetty:jetty-servlet:9.4.53.v20231009=runtimeClasspath
org.eclipse.jetty:jetty-util-ajax:9.4.53.v20231009=runtimeClasspath
org.eclipse.jetty:jetty-util:9.4.53.v20231009=runtimeClasspath
org.hdrhistogram:HdrHistogram:2.1.12=runtimeClasspath
org.hypertrace.bom:hypertrace-bom:0.3.0=compileClasspath,runtimeClasspath
org.hypertrace.core.grpcutils:grpc-client-utils:0.12.6=compileClasspath,runtimeClasspath
org.hypertrace.core.grpcutils:grpc-context-utils:0.12.6=runtimeClasspath
org.hypertrace.core.grpcutils:grpc-context-utils:0.12.6=compileClasspath,runtimeClasspath
org.hypertrace.core.kafkastreams.framework:kafka-bom:0.3.9=compileClasspath,runtimeClasspath
org.slf4j:slf4j-api:2.0.7=runtimeClasspath
empty=annotationProcessor
org.hypertrace.core.serviceframework:platform-metrics:0.1.62=compileClasspath,runtimeClasspath
org.latencyutils:LatencyUtils:2.0.3=runtimeClasspath
org.projectlombok:lombok:1.18.28=annotationProcessor,compileClasspath
org.slf4j:slf4j-api:2.0.7=compileClasspath,runtimeClasspath
empty=
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package org.hypertrace.core.attribute.service.client;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.Channel;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nonnull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.hypertrace.core.attribute.service.client.config.AttributeServiceCachedClientConfig;
import org.hypertrace.core.attribute.service.v1.AttributeMetadata;
import org.hypertrace.core.attribute.service.v1.AttributeServiceGrpc;
import org.hypertrace.core.attribute.service.v1.AttributeServiceGrpc.AttributeServiceBlockingStub;
import org.hypertrace.core.attribute.service.v1.GetAttributesRequest;
import org.hypertrace.core.grpcutils.client.ClientCallCredentialsProvider;
import org.hypertrace.core.grpcutils.client.RequestContextClientCallCredsProviderFactory;
import org.hypertrace.core.grpcutils.context.ContextualKey;
import org.hypertrace.core.grpcutils.context.RequestContext;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;

@Slf4j
public class AttributeServiceCachedClient {
private final LoadingCache<ContextualKey<Void>, Table<String, String, AttributeMetadata>> cache;
private final Cache<String, AttributeScopeAndKey> scopeAndKeyLookup;
private final AttributeServiceBlockingStub attributeServiceBlockingStub;
private final long deadlineMs;
private final ClientCallCredentialsProvider callCredentialsProvider;

AttributeServiceCachedClient(Channel channel, AttributeServiceCachedClientConfig clientConfig) {
this(
channel,
clientConfig,
RequestContextClientCallCredsProviderFactory.getClientCallCredsProvider());
}

AttributeServiceCachedClient(
Channel channel,
AttributeServiceCachedClientConfig clientConfig,
ClientCallCredentialsProvider callCredentialsProvider) {
this.callCredentialsProvider = callCredentialsProvider;
this.attributeServiceBlockingStub = AttributeServiceGrpc.newBlockingStub(channel);
deadlineMs = clientConfig.getDeadline().toMillis();
cache =
CacheBuilder.newBuilder()
.maximumSize(clientConfig.getMaxSize())
.refreshAfterWrite(clientConfig.getRefreshAfterWrite())
.expireAfterAccess(clientConfig.getExpireAfterAccess())
.build(
CacheLoader.asyncReloading(
CacheLoader.from(this::loadTable),
Executors.newFixedThreadPool(
clientConfig.getExecutorThreads(), this.buildThreadFactory())));
PlatformMetricsRegistry.registerCache(
clientConfig.getCacheMetricsName(), cache, Collections.emptyMap());
scopeAndKeyLookup =
CacheBuilder.newBuilder().expireAfterWrite(clientConfig.getExpireAfterAccess()).build();
}

public Optional<AttributeMetadata> get(
@Nonnull RequestContext requestContext,
@Nonnull String attributeScope,
@Nonnull String attributeKey) {
return Optional.ofNullable(getTable(requestContext).get(attributeScope, attributeKey));
}

public Optional<AttributeMetadata> getById(
@Nonnull RequestContext requestContext, @Nonnull String attributeId) {
Table<String, String, AttributeMetadata> table = getTable(requestContext);
return Optional.ofNullable(scopeAndKeyLookup.getIfPresent(attributeId))
.map(scopeAndKey -> table.get(scopeAndKey.getScope(), scopeAndKey.getKey()));
}

public List<AttributeMetadata> getAllInScope(
@Nonnull RequestContext requestContext, @Nonnull String attributeScope) {
return List.copyOf(getTable(requestContext).row(attributeScope).values());
}

private Table<String, String, AttributeMetadata> getTable(RequestContext requestContext) {
return cache.getUnchecked(requestContext.buildInternalContextualKey());
}

private Table<String, String, AttributeMetadata> loadTable(ContextualKey<Void> contextualKey) {
List<AttributeMetadata> attributeMetadataList =
contextualKey
.getContext()
.call(
() ->
attributeServiceBlockingStub
.withDeadlineAfter(deadlineMs, TimeUnit.MILLISECONDS)
.withCallCredentials(callCredentialsProvider.get())
.getAttributes(GetAttributesRequest.getDefaultInstance()))
.getAttributesList();
attributeMetadataList.forEach(
attributeMetadata ->
scopeAndKeyLookup.put(
attributeMetadata.getId(),
new AttributeScopeAndKey(
attributeMetadata.getScopeString(), attributeMetadata.getKey())));
return attributeMetadataList.stream()
.collect(
ImmutableTable.toImmutableTable(
AttributeMetadata::getScopeString, AttributeMetadata::getKey, Function.identity()));
}

private ThreadFactory buildThreadFactory() {
return new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("attribute-service-cached-client-%d")
.build();
}

@Value
private static class AttributeScopeAndKey {
String scope;
String key;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.hypertrace.core.attribute.service.client.config;

import com.typesafe.config.Config;
import java.time.Duration;
import lombok.Value;
import org.hypertrace.core.attribute.service.client.AttributeServiceCachedClient;

@Value
public class AttributeServiceCachedClientConfig {
private static final String DEADLINE_CONFIG_KEY = "deadline";
private static final String CACHE_MAX_SIZE_CONFIG_KEY = "maxSize";
private static final String CACHE_REFRESH_AFTER_WRITE_CONFIG_KEY = "refreshAfterWriteDuration";
private static final String CACHE_EXPIRE_AFTER_ACCESS_CONFIG_KEY = "expireAfterAccessDuration";
private static final String CACHE_EXECUTOR_THREADS_CONFIG_KEY = "executorThreads";

Duration deadline;
long maxSize;
Duration refreshAfterWrite;
Duration expireAfterAccess;
int executorThreads;
String cacheMetricsName;

public static AttributeServiceCachedClientConfig from(Config attributeServiceConfig) {
return from(attributeServiceConfig, AttributeServiceCachedClient.class.getName());
}

public static AttributeServiceCachedClientConfig from(
Config attributeServiceConfig, String cacheMetricsName) {
Duration deadline =
attributeServiceConfig.hasPath(DEADLINE_CONFIG_KEY)
? attributeServiceConfig.getDuration(DEADLINE_CONFIG_KEY)
: Duration.ofSeconds(30);
long maxSize =
attributeServiceConfig.hasPath(CACHE_MAX_SIZE_CONFIG_KEY)
? attributeServiceConfig.getLong(CACHE_MAX_SIZE_CONFIG_KEY)
: 1000;
Duration refreshAfterWrite =
attributeServiceConfig.hasPath(CACHE_REFRESH_AFTER_WRITE_CONFIG_KEY)
? attributeServiceConfig.getDuration(CACHE_REFRESH_AFTER_WRITE_CONFIG_KEY)
: Duration.ofMinutes(15);
Duration expireAfterWrite =
attributeServiceConfig.hasPath(CACHE_EXPIRE_AFTER_ACCESS_CONFIG_KEY)
? attributeServiceConfig.getDuration(CACHE_EXPIRE_AFTER_ACCESS_CONFIG_KEY)
: Duration.ofHours(1);
int executorThreads =
attributeServiceConfig.hasPath(CACHE_EXECUTOR_THREADS_CONFIG_KEY)
? attributeServiceConfig.getInt(CACHE_EXECUTOR_THREADS_CONFIG_KEY)
: 1;
return new AttributeServiceCachedClientConfig(
deadline, maxSize, refreshAfterWrite, expireAfterWrite, executorThreads, cacheMetricsName);
}
}
Loading

0 comments on commit 32a2977

Please sign in to comment.