Skip to content

Commit

Permalink
feature/#42 Added exists options for BucketSettings (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rumpelshtinskiy authored Oct 28, 2024
1 parent 7b00b77 commit 1a99abd
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- EntryClient.getRecord(Bucket bucket, Entry<?> body) method to get a record from a
bucket [PR-27](https://github.com/reductstore/reduct-java/issues/27)
- Improve ReductClient initialization. [PR-41](https://github.com/reductstore/reduct-java/issues/41)
- Add exists option to BucketSettings. [PR-42](https://github.com/reductstore/reduct-java/issues/42)
### Infrastructure:

- Added GitHub Actions for CI/CD [PR-35](https://github.com/reductstore/reduct-java/pull/35)
61 changes: 42 additions & 19 deletions src/main/java/store/reduct/client/ReductClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,26 @@
@Getter
public class ReductClient {
private static final String REDUCT_ERROR_HEADER = "x-reduct-error";
public static final String S_S = "%s/%s";
private final ServerProperties serverProperties;
private final HttpClient httpClient;
private final ObjectMapper objectMapper = new ObjectMapper();

/**
* Send http query and return response. Throw a ReductException if getting an
* IOException or an InterruptedException
*
* @param builder
* @param bodyHandler
* @return
* @param <T>
*/
public <T> HttpResponse<T> send(HttpRequest.Builder builder, HttpResponse.BodyHandler<T> bodyHandler) {
try {
if (isNotBlank(serverProperties.apiToken())) {
builder.headers("Authorization", "Bearer %s".formatted(serverProperties.apiToken()));
}
HttpResponse<T> httpResponse = httpClient.send(builder.build(), bodyHandler);
if (httpResponse.statusCode() == 200) {
return httpResponse;
}
throw new ReductException(
httpResponse.headers().firstValue(REDUCT_ERROR_HEADER).orElse("Unsuccessful request"),
httpResponse.statusCode());
return httpClient.send(builder.build(), bodyHandler);
} catch (IOException e) {
throw new ReductException("An error occurred while processing the request", e);
} catch (InterruptedException e) {
Expand All @@ -56,6 +60,24 @@ public <T> HttpResponse<T> send(HttpRequest.Builder builder, HttpResponse.BodyHa
}
}

/**
* Send http query and return response with 200 status only
*
* @param builder
* @param bodyHandler
* @return
* @param <T>
*/
public <T> HttpResponse<T> sendAndGetOnlySuccess(HttpRequest.Builder builder,
HttpResponse.BodyHandler<T> bodyHandler) {
HttpResponse<T> httpResponse = this.send(builder, bodyHandler);
if (httpResponse.statusCode() == 200) {
return httpResponse;
}
throw new ReductException(httpResponse.headers().firstValue(REDUCT_ERROR_HEADER).orElse("Unsuccessful request"),
httpResponse.statusCode());
}

/**
* Create a new bucket with the name and the settings. NOTE: If, authentication
* is enabled on the server, an access token with full access must be provided
Expand All @@ -76,7 +98,7 @@ public <T> HttpResponse<T> send(HttpRequest.Builder builder, HttpResponse.BodyHa
public Bucket createBucket(String name, BucketSettings bucketSettings)
throws ReductException, IllegalArgumentException {
String createBucketPath = BucketURL.CREATE_BUCKET.getUrl().formatted(name);
URI uri = URI.create("%s/%s".formatted(serverProperties.url(), createBucketPath));
URI uri = URI.create(S_S.formatted(serverProperties.url(), createBucketPath));
HttpRequest.Builder httpRequest = HttpRequest.newBuilder().uri(uri)
.POST(HttpRequest.BodyPublishers.ofString(JsonUtils.serialize(bucketSettings)));
HttpResponse<Void> httpResponse = send(httpRequest, HttpResponse.BodyHandlers.discarding());// TODO ask about
Expand All @@ -92,7 +114,8 @@ public Bucket createBucket(String name, BucketSettings bucketSettings)
// settings as null
// until invoke
// read.
if (httpResponse.statusCode() == 200) {
if (httpResponse.statusCode() == 200
|| (Boolean.TRUE.equals(bucketSettings.getExists() && httpResponse.statusCode() == 409))) {
Bucket bucket = new Bucket(name, this);
bucket.setBucketSettings(bucketSettings);
return bucket;
Expand All @@ -116,9 +139,9 @@ public Bucket createBucket(String name, BucketSettings bucketSettings)
* status code to indicate the failure.
*/
public ServerInfo info() throws ReductException {
URI uri = URI.create("%s/%s".formatted(getServerProperties().url(), ServerURL.SERVER_INFO.getUrl()));
URI uri = URI.create(S_S.formatted(getServerProperties().url(), ServerURL.SERVER_INFO.getUrl()));
HttpRequest.Builder builder = HttpRequest.newBuilder().uri(uri).GET();
HttpResponse<String> httpResponse = send(builder, HttpResponse.BodyHandlers.ofString());
HttpResponse<String> httpResponse = sendAndGetOnlySuccess(builder, HttpResponse.BodyHandlers.ofString());
return JsonUtils.parseObject(httpResponse.body(), ServerInfo.class);
}

Expand All @@ -132,9 +155,9 @@ public ServerInfo info() throws ReductException {
* status code to indicate the failure.
*/
public Buckets list() throws ReductException {
URI uri = URI.create("%s/%s".formatted(getServerProperties().url(), ServerURL.LIST.getUrl()));
URI uri = URI.create(S_S.formatted(getServerProperties().url(), ServerURL.LIST.getUrl()));
HttpRequest.Builder builder = HttpRequest.newBuilder().uri(uri).GET();
HttpResponse<String> httpResponse = send(builder, HttpResponse.BodyHandlers.ofString());
HttpResponse<String> httpResponse = sendAndGetOnlySuccess(builder, HttpResponse.BodyHandlers.ofString());
return JsonUtils.parseObject(httpResponse.body(), Buckets.class);
}

Expand All @@ -148,10 +171,10 @@ public Buckets list() throws ReductException {
* status code to indicate the failure.
*/
public Boolean isAlive() throws ReductException {
URI uri = URI.create("%s/%s".formatted(getServerProperties().url(), ServerURL.ALIVE.getUrl()));
URI uri = URI.create(S_S.formatted(getServerProperties().url(), ServerURL.ALIVE.getUrl()));
HttpRequest.Builder builder = HttpRequest.newBuilder().uri(uri).method("HEAD",
HttpRequest.BodyPublishers.noBody());
HttpResponse<String> httpResponse = send(builder, HttpResponse.BodyHandlers.ofString());
HttpResponse<String> httpResponse = sendAndGetOnlySuccess(builder, HttpResponse.BodyHandlers.ofString());
return HttpStatus.OK.getCode().equals(httpResponse.statusCode());
}

Expand All @@ -162,9 +185,9 @@ public Boolean isAlive() throws ReductException {
* @return AccessTokens
*/
public AccessTokens tokens() throws ReductException {
URI uri = URI.create("%s/%s".formatted(getServerProperties().url(), TokenURL.GET_TOKENS.getUrl()));
URI uri = URI.create(S_S.formatted(getServerProperties().url(), TokenURL.GET_TOKENS.getUrl()));
HttpRequest.Builder builder = HttpRequest.newBuilder().uri(uri).GET();
HttpResponse<String> httpResponse = send(builder, HttpResponse.BodyHandlers.ofString());
HttpResponse<String> httpResponse = sendAndGetOnlySuccess(builder, HttpResponse.BodyHandlers.ofString());
return JsonUtils.parseObject(httpResponse.body(), AccessTokens.class);
}

Expand Down Expand Up @@ -201,9 +224,9 @@ public AccessToken createToken(String tokenName, TokenPermissions permissions)
String createTokenBody = JsonUtils.serialize(permissions);

HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(URI.create("%s/%s".formatted(serverProperties.url(), createTokenPath)))
.uri(URI.create(S_S.formatted(serverProperties.url(), createTokenPath)))
.POST(HttpRequest.BodyPublishers.ofString(createTokenBody));
HttpResponse<String> response = send(builder, HttpResponse.BodyHandlers.ofString());
HttpResponse<String> response = sendAndGetOnlySuccess(builder, HttpResponse.BodyHandlers.ofString());
return JsonUtils.parseObject(response.body(), AccessToken.class);
}
}
34 changes: 21 additions & 13 deletions src/main/java/store/reduct/model/bucket/Bucket.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public Bucket read() throws ReductException, IllegalArgumentException {
String createBucketPath = BucketURL.GET_BUCKET.getUrl().formatted(name);
HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(URI.create("%s/%s".formatted(reductClient.getServerProperties().url(), createBucketPath))).GET();
HttpResponse<String> httpResponse = reductClient.send(builder, HttpResponse.BodyHandlers.ofString());
HttpResponse<String> httpResponse = reductClient.sendAndGetOnlySuccess(builder,
HttpResponse.BodyHandlers.ofString());
BucketMapper.INSTANCE.copy(this, JsonUtils.parseObject(httpResponse.body(), Bucket.class));
return this;
}
Expand Down Expand Up @@ -134,11 +135,13 @@ public void setSettings(BucketSettings bucketSettings) throws ReductException, I
URI uri = URI.create("%s/%s".formatted(reductClient.getServerProperties().url(), createBucketPath));
HttpRequest.Builder httpRequest = HttpRequest.newBuilder().uri(uri)
.PUT(HttpRequest.BodyPublishers.ofString(JsonUtils.serialize(bucketSettings)));
reductClient.send(httpRequest, HttpResponse.BodyHandlers.discarding()); // TODO ask about default settings. The
// answer from DB always is empty for
// success, but settings sets as
// default. This bucket will always have
// settings as null until invoke read.
reductClient.sendAndGetOnlySuccess(httpRequest, HttpResponse.BodyHandlers.discarding()); // TODO ask about
// default settings.
// The
// answer from DB always is empty for
// success, but settings sets as
// default. This bucket will always have
// settings as null until invoke read.
}

/**
Expand Down Expand Up @@ -174,7 +177,7 @@ public void writeRecord(@NonNull Record record) throws ReductException, IllegalA
HttpRequest.Builder builder = HttpRequest.newBuilder().uri(uri).header(getContentTypeHeader(), record.getType())
.POST(HttpRequest.BodyPublishers.ofByteArray(record.getBody()));

reductClient.send(builder, HttpResponse.BodyHandlers.ofString()).body();
reductClient.sendAndGetOnlySuccess(builder, HttpResponse.BodyHandlers.ofString()).body();
}

/**
Expand Down Expand Up @@ -205,7 +208,7 @@ public void writeRecords(String entryName, Iterator<Record> records)
}
if (Objects.nonNull(body)) {
builder.POST(HttpRequest.BodyPublishers.ofByteArray(body));
reductClient.send(builder, HttpResponse.BodyHandlers.ofString()).body();
reductClient.sendAndGetOnlySuccess(builder, HttpResponse.BodyHandlers.ofString()).body();
}
}

Expand All @@ -226,7 +229,8 @@ public Record readRecord(String entryName, Long timestamp) throws ReductExceptio
URI uri = URI.create(reductClient.getServerProperties().url()
+ String.format(RecordURL.GET_ENTRY.getUrl(), name, entryName) + timeStampQuery);
HttpRequest.Builder builder = HttpRequest.newBuilder().uri(uri).GET();
HttpResponse<byte[]> httpResponse = reductClient.send(builder, HttpResponse.BodyHandlers.ofByteArray());
HttpResponse<byte[]> httpResponse = reductClient.sendAndGetOnlySuccess(builder,
HttpResponse.BodyHandlers.ofByteArray());
return Record.builder().body(httpResponse.body()).entryName(entryName)
.timestamp(httpResponse.headers().firstValue(getXReductTimeHeader()).map(Long::parseLong)
.orElseThrow(() -> new ReductException(X_REDUCT_TIME_IS_NOT_SUCH_LONG_FORMAT)))
Expand Down Expand Up @@ -255,7 +259,8 @@ public Record getMetaInfo(String entryName, Long timestamp) throws ReductExcepti
+ String.format(RecordURL.GET_ENTRY.getUrl(), name, entryName) + timeStampQuery);
HttpRequest.Builder builder = HttpRequest.newBuilder().uri(uri).method("HEAD",
HttpRequest.BodyPublishers.noBody());
HttpResponse<byte[]> httpResponse = reductClient.send(builder, HttpResponse.BodyHandlers.ofByteArray());
HttpResponse<byte[]> httpResponse = reductClient.sendAndGetOnlySuccess(builder,
HttpResponse.BodyHandlers.ofByteArray());
return Record.builder().entryName(entryName)
.timestamp(httpResponse.headers().firstValue(getXReductTimeHeader()).map(Long::parseLong)
.orElseThrow(() -> new ReductException(X_REDUCT_TIME_IS_NOT_SUCH_LONG_FORMAT)))
Expand Down Expand Up @@ -285,7 +290,8 @@ public Iterator<Record> query(String entryName, Long start, Long stop, Long ttl)
reductClient.getServerProperties().url() + String.format(RecordURL.QUERY.getUrl(), name, entryName)
+ new Queries("start", start).add("stop", stop).add("ttl", ttl));
HttpRequest.Builder builder = HttpRequest.newBuilder().uri(uri).GET();
HttpResponse<String> response = reductClient.send(builder, HttpResponse.BodyHandlers.ofString());
HttpResponse<String> response = reductClient.sendAndGetOnlySuccess(builder,
HttpResponse.BodyHandlers.ofString());
QueryId queryId = JsonUtils.parseObject(response.body(), QueryId.class);

return new RecordIterator(name, entryName, queryId.getId(), reductClient.getServerProperties().url());
Expand All @@ -301,7 +307,8 @@ public Iterator<Record> getMetaInfos(String entryName, Long start, Long stop, Lo
reductClient.getServerProperties().url() + String.format(RecordURL.QUERY.getUrl(), name, entryName)
+ new Queries("start", start).add("stop", stop).add("ttl", ttl));
HttpRequest.Builder builder = HttpRequest.newBuilder().uri(uri).GET();
HttpResponse<String> response = reductClient.send(builder, HttpResponse.BodyHandlers.ofString());
HttpResponse<String> response = reductClient.sendAndGetOnlySuccess(builder,
HttpResponse.BodyHandlers.ofString());
QueryId queryId = JsonUtils.parseObject(response.body(), QueryId.class);

return new MetaInfoIterator(name, entryName, queryId.getId(), reductClient.getServerProperties().url());
Expand Down Expand Up @@ -388,7 +395,8 @@ public Record next() {
HeaderInstance instance = headerInstances.poll();
ByteBuffer byteBuffer = ByteBuffer.wrap(body);
byte[] nextBody = new byte[instance.length];
byteBuffer.get(nextBody, instance.getOffset(), instance.getLength());
byteBuffer.position(instance.getOffset());
byteBuffer.get(nextBody, 0, instance.getLength());

return Record.builder().body(nextBody).entryName(recordEntryName).timestamp(instance.getTs())
.type(instance.getType()).length(instance.getLength()).build();
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/store/reduct/model/bucket/BucketSettings.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package store.reduct.model.bucket;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.*;

/**
* Configuration for a bucket
*/
@Getter
@Setter
@NoArgsConstructor
Expand All @@ -13,17 +17,34 @@
@AllArgsConstructor
public class BucketSettings {

/**
* Max block size in bytes
*/
@JsonProperty("max_block_size")
private Integer maxBlockSize;

/**
* Max number of records in a block
*/
@JsonProperty("max_block_records")
private Integer maxBlockRecords;

/**
* Quota type
*/
@JsonProperty("quota_type")
@JsonFormat(shape = JsonFormat.Shape.STRING)
private QuotaType quotaType;

/**
* Quota size in bytes
*/
@JsonProperty("quota_size")
private Integer quotaSize;

/**
* The client raises no exception if the bucket already exists and returns it
*/
@JsonIgnore
private Boolean exists;
}
92 changes: 92 additions & 0 deletions src/test/java/store/reduct/client/ReductClientTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package store.reduct.client;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpResponse;
import java.util.Map;
import org.assertj.core.api.AbstractThrowableAssert;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import store.reduct.client.config.ServerProperties;
import store.reduct.common.exception.ReductException;
import store.reduct.model.bucket.Bucket;
import store.reduct.model.bucket.BucketSettings;
import store.reduct.utils.http.HttpStatus;

@ExtendWith(MockitoExtension.class)
class ReductClientTest {

@Nested
class BucketSettingsTest {
@Mock
private HttpClient httpClient;
@Mock
HttpResponse httpResponse;

@ParameterizedTest(name = "Should return bucket with an expected name, client and settings if exists = {0}")
@CsvSource(value = {"200, null", "200, false"}, nullValues = "null")
void test1(Integer statusCode, Boolean exists) throws IOException, InterruptedException {
// Init
String expectedBucketName = "testBucket";
BucketSettings expectedBucketSettings = BucketSettings.builder().maxBlockSize(Integer.MAX_VALUE)
.exists(exists).build();
ReductClient expectedClient = new ReductClient(ServerProperties.builder().url("http://testUrl").build(),
httpClient);
when(httpClient.send(any(), any())).thenReturn(httpResponse);
when(httpResponse.statusCode()).thenReturn(statusCode);
// Act
Bucket testBucket = expectedClient.createBucket(expectedBucketName, expectedBucketSettings);
// Assert
assertThat(testBucket.getBucketSettings()).isEqualTo(expectedBucketSettings);
assertThat(testBucket.getName()).isEqualTo(expectedBucketName);
assertThat(testBucket.getReductClient()).isEqualTo(expectedClient);
}
@ParameterizedTest(name = "Should throw an exception if status = {0}")
@EnumSource(value = HttpStatus.class, mode = EnumSource.Mode.EXCLUDE, names = {"OK", "CONFLICT"})
void test2(HttpStatus status) throws IOException, InterruptedException {
// Init
String expectedBucketName = "testBucket";
BucketSettings expectedBucketSettings = BucketSettings.builder().maxBlockSize(Integer.MAX_VALUE)
.exists(true).build();
ReductClient expectedClient = new ReductClient(ServerProperties.builder().url("http://testUrl").build(),
httpClient);
when(httpClient.send(any(), any())).thenReturn(httpResponse);
when(httpResponse.statusCode()).thenReturn(status.getCode());
when(httpResponse.headers()).thenReturn(HttpHeaders.of(Map.of(), (l, r) -> true));
// Act
AbstractThrowableAssert<?, ? extends Throwable> anAssert = assertThatThrownBy(
() -> expectedClient.createBucket(expectedBucketName, expectedBucketSettings));
// Assert
anAssert.isInstanceOf(ReductException.class);
}
@ParameterizedTest(name = "Should return bucket with an expected name, client and settings if status = {0}")
@EnumSource(value = HttpStatus.class, mode = EnumSource.Mode.INCLUDE, names = {"OK", "CONFLICT"})
void test3(HttpStatus status) throws IOException, InterruptedException {
// Init
String expectedBucketName = "testBucket";
BucketSettings expectedBucketSettings = BucketSettings.builder().maxBlockSize(Integer.MAX_VALUE)
.exists(true).build();
ReductClient expectedClient = new ReductClient(ServerProperties.builder().url("http://testUrl").build(),
httpClient);
when(httpClient.send(any(), any())).thenReturn(httpResponse);
when(httpResponse.statusCode()).thenReturn(status.getCode());
// Act
Bucket testBucket = expectedClient.createBucket(expectedBucketName, expectedBucketSettings);
// Assert
assertThat(testBucket.getBucketSettings()).isEqualTo(expectedBucketSettings);
assertThat(testBucket.getName()).isEqualTo(expectedBucketName);
assertThat(testBucket.getReductClient()).isEqualTo(expectedClient);
}
}
}

0 comments on commit 1a99abd

Please sign in to comment.