Skip to content

Commit

Permalink
Adding optional prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
patduin committed Apr 16, 2024
1 parent 070ce28 commit fec5715
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.hive.metastore.api.MetaException;

import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;

import com.hotels.bdp.waggledance.api.WaggleDanceException;
import com.hotels.bdp.waggledance.api.model.AbstractMetaStore;
import com.hotels.bdp.waggledance.client.adapter.MetastoreIfaceAdapter;
Expand Down
1 change: 1 addition & 0 deletions waggle-dance-extensions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ To enable and configure see the following table, you can add these properties to
| Property | Required | Description |
| --- | --- | --- |
| waggledance.extensions.ratelimit.enabled | no | Whether the rate limiting extension is enabled. Default is `false` |
| waggledance.extensions.ratelimit.keyPrefix | no | Optional prefix for the bucket keys. Default is (empty string) `` |
| waggledance.extensions.ratelimit.storage | yes (if `enabled: true`) | The storage backend for the rate limiter, possible values `MEMORY` or `REDIS` |
| waggledance.extensions.ratelimit.capacity | no | The capacity of the bucket. Default `2000` |
| waggledance.extensions.ratelimit.refillType | no | The refill type, possible values `GREEDY` or `INTERVALLY`. Default is `GREEDY` |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import com.hotels.bdp.waggledance.client.ThriftClientFactory;
import com.hotels.bdp.waggledance.extensions.client.ratelimit.BucketBandwidthProvider;
import com.hotels.bdp.waggledance.extensions.client.ratelimit.BucketKeyGenerator;
import com.hotels.bdp.waggledance.extensions.client.ratelimit.BucketService;
import com.hotels.bdp.waggledance.extensions.client.ratelimit.RateLimitingClientFactory;
import com.hotels.bdp.waggledance.extensions.client.ratelimit.RefillType;
Expand All @@ -51,8 +52,15 @@ public class ExtensionBeans {
@Bean
public ThriftClientFactory thriftClientFactory(
ThriftClientFactory defaultWaggleDanceClientFactory,
BucketService bucketService) {
return new RateLimitingClientFactory(defaultWaggleDanceClientFactory, bucketService);
BucketService bucketService,
BucketKeyGenerator bucketKeyGenerator) {
return new RateLimitingClientFactory(defaultWaggleDanceClientFactory, bucketService, bucketKeyGenerator);
}

@Bean
public BucketKeyGenerator bucketKeyGenerator(
@Value("${waggledance.extensions.ratelimit.keyPrefix:\"\"}") String keyPrefix) {
return new BucketKeyGenerator(keyPrefix);
}

@ConditionalOnProperty(name = "waggledance.extensions.ratelimit.storage", havingValue = STORAGE_MEMORY)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.hotels.bdp.waggledance.extensions.client.ratelimit;

public class BucketKeyGenerator {

private final String prefix;

public BucketKeyGenerator(String prefix) {
this.prefix = prefix;
}

public String generateKey(String key) {
if (prefix != null && !prefix.isEmpty()) {
return prefix + "_" + key;
}
return key;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,23 @@ public class RateLimitingClientFactory implements ThriftClientFactory {

private final ThriftClientFactory thriftClientFactory;
private final BucketService bucketService;
private final BucketKeyGenerator bucketKeyGenerator;

public RateLimitingClientFactory(
ThriftClientFactory thriftClientFactory, BucketService bucketService) {
ThriftClientFactory thriftClientFactory,
BucketService bucketService,
BucketKeyGenerator bucketKeyGenerator) {
this.thriftClientFactory = thriftClientFactory;
this.bucketService = bucketService;
this.bucketKeyGenerator = bucketKeyGenerator;
}

@Override
public CloseableThriftHiveMetastoreIface newInstance(AbstractMetaStore metaStore) {
CloseableThriftHiveMetastoreIface client = thriftClientFactory.newInstance(metaStore);
return (CloseableThriftHiveMetastoreIface) Proxy
.newProxyInstance(getClass().getClassLoader(), INTERFACES, new RateLimitingInvocationHandler(client, metaStore.getName(), bucketService));
.newProxyInstance(getClass().getClassLoader(), INTERFACES,
new RateLimitingInvocationHandler(client, metaStore.getName(), bucketService, bucketKeyGenerator));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,16 @@ class RateLimitingInvocationHandler implements InvocationHandler {
private String user = UNKNOWN_USER;

private final BucketService bucketService;
private final BucketKeyGenerator bucketKeyGenerator;

public RateLimitingInvocationHandler(
CloseableThriftHiveMetastoreIface client,
String metastoreName,
BucketService bucketService) {
BucketService bucketService, BucketKeyGenerator bucketKeyGenerator) {
this.client = client;
this.metastoreName = metastoreName;
this.bucketService = bucketService;
this.bucketKeyGenerator = bucketKeyGenerator;
}

@Override
Expand Down Expand Up @@ -78,7 +80,7 @@ private Object doRateLimitCall(CloseableThriftHiveMetastoreIface client, Method

private boolean proceedWithCall(Method method) {
try {
Bucket bucket = bucketService.getBucket(user);
Bucket bucket = bucketService.getBucket(bucketKeyGenerator.generateKey(user));
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
log
.info("RateLimitCall:[User:{}, method:{}, source_ip:{}, tokens_remaining:{}, metastoreName:{}]", user,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.hotels.bdp.waggledance.extensions.client.ratelimit;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

import org.junit.Test;

public class BucketKeyGeneratorTest {

@Test
public void testGenerateKey() {
BucketKeyGenerator bucketKeyGenerator = new BucketKeyGenerator("prefix");
String key = bucketKeyGenerator.generateKey("key");
assertThat(key, is("prefix_key"));
}

@Test
public void testGenerateKeyNullPrefix() {
BucketKeyGenerator bucketKeyGenerator = new BucketKeyGenerator(null);
String key = bucketKeyGenerator.generateKey("key");
assertThat(key, is("key"));
}

@Test
public void testGenerateKeyEmptyPrefix() {
BucketKeyGenerator bucketKeyGenerator = new BucketKeyGenerator("");
String key = bucketKeyGenerator.generateKey("key");
assertThat(key, is("key"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
Expand All @@ -41,17 +42,25 @@
@RunWith(MockitoJUnitRunner.class)
public class RateLimitingInvocationHandlerTest {

private static final String USER = "user";
private @Mock ThriftClientFactory thriftClientFactory;
private @Mock CloseableThriftHiveMetastoreIface client;
private @Mock BucketKeyGenerator bucketKeyGenerator;
private BucketService bucketService = new InMemoryBucketService(new IntervallyBandwidthProvider(2, 1));
private AbstractMetaStore metastore = AbstractMetaStore.newPrimaryInstance("name", "uri");
private static final String USER = "user";
private CloseableThriftHiveMetastoreIface handlerProxy;

@Test
public void testLimitDifferentUsers() throws Exception {
@Before
public void setUp() {
when(thriftClientFactory.newInstance(metastore)).thenReturn(client);
CloseableThriftHiveMetastoreIface handlerProxy = new RateLimitingClientFactory(thriftClientFactory, bucketService)
when(bucketKeyGenerator.generateKey(USER)).thenReturn(USER);
when(bucketKeyGenerator.generateKey(UNKNOWN_USER)).thenReturn(UNKNOWN_USER);
handlerProxy = new RateLimitingClientFactory(thriftClientFactory, bucketService, bucketKeyGenerator)
.newInstance(metastore);
}

@Test
public void testLimitDifferentUsers() throws Exception {

assertTokens(2, 2);
handlerProxy.get_table("db", "table");
Expand Down Expand Up @@ -79,27 +88,20 @@ public void testLimitDifferentUsers() throws Exception {

@Test
public void testBucketExceptionStillDoCall() throws Exception {
when(thriftClientFactory.newInstance(metastore)).thenReturn(client);
Table table = new Table();
when(client.get_table("db", "table")).thenReturn(table);
BucketService mockedBucketService = Mockito.mock(BucketService.class);
when(mockedBucketService.getBucket(anyString())).thenThrow(new RuntimeException("Bucket exception"));
CloseableThriftHiveMetastoreIface handlerProxy = new RateLimitingClientFactory(thriftClientFactory,
mockedBucketService).newInstance(metastore);
CloseableThriftHiveMetastoreIface proxy = new RateLimitingClientFactory(thriftClientFactory, mockedBucketService, bucketKeyGenerator)
.newInstance(metastore);

Table result = handlerProxy.get_table("db", "table");
Table result = proxy.get_table("db", "table");
assertThat(result, is(table));
}

@Test
public void testInvocationHandlerThrowsCause() throws Exception {
when(thriftClientFactory.newInstance(metastore)).thenReturn(client);
when(client.get_table("db", "table")).thenThrow(new NoSuchObjectException("No such table"));
BucketService mockedBucketService = Mockito.mock(BucketService.class);
when(mockedBucketService.getBucket(anyString())).thenThrow(new RuntimeException("Bucket exception"));
CloseableThriftHiveMetastoreIface handlerProxy = new RateLimitingClientFactory(thriftClientFactory,
mockedBucketService).newInstance(metastore);

try {
handlerProxy.get_table("db", "table");
fail("Should have thrown exception.");
Expand All @@ -110,10 +112,6 @@ public void testInvocationHandlerThrowsCause() throws Exception {

@Test
public void testIgnoredMethods() throws Exception {
when(thriftClientFactory.newInstance(metastore)).thenReturn(client);
CloseableThriftHiveMetastoreIface handlerProxy = new RateLimitingClientFactory(thriftClientFactory, bucketService)
.newInstance(metastore);

assertTokens(2, 2);
handlerProxy.set_ugi(USER, null);
handlerProxy.isOpen();
Expand Down

0 comments on commit fec5715

Please sign in to comment.