diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java index 0578f0f..b7a7f31 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java @@ -19,6 +19,7 @@ import com.michelin.kstreamplify.integration.container.KafkaIntegrationTest; import com.michelin.kstreamplify.serde.SerdesUtils; import com.michelin.kstreamplify.service.interactivequeries.KeyValueStoreService; +import com.michelin.kstreamplify.service.interactivequeries.WindowStoreService; import com.michelin.kstreamplify.store.StateStoreRecord; import com.michelin.kstreamplify.store.StreamsMetadata; import io.confluent.kafka.serializers.KafkaAvroSerializer; @@ -63,6 +64,7 @@ @Testcontainers class InteractiveQueriesIntegrationTest extends KafkaIntegrationTest { private final KeyValueStoreService keyValueStoreService = new KeyValueStoreService(initializer); + private final WindowStoreService windowStoreService = new WindowStoreService(initializer); @BeforeAll static void globalSetUp() throws ExecutionException, InterruptedException { @@ -280,6 +282,42 @@ void shouldGetByKeyInStringAvroTimestampedKeyValueStore() throws IOException, In assertNotNull(body.getTimestamp()); } + @Test + void shouldGetByKeyInStringAvroWindowStore() throws IOException, InterruptedException { + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create("http://localhost:8081/store/window/STRING_AVRO_WINDOW_STORE/person")) + .GET() + .build(); + + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + List body = objectMapper.readValue(response.body(), new TypeReference<>() {}); + + assertEquals(200, response.statusCode()); + assertEquals("person", body.get(0).getKey()); + assertEquals(1, ((Map) body.get(0).getValue()).get("id")); + assertEquals("John", ((Map) body.get(0).getValue()).get("firstName")); + assertEquals("Doe", ((Map) body.get(0).getValue()).get("lastName")); + assertEquals("2000-01-01T01:00:00Z", ((Map) body.get(0).getValue()).get("birthDate")); + assertNull(body.get(0).getTimestamp()); + } + + @Test + void shouldGetByKeyInStringAvroWindowStoreFromService() { + List stateStoreRecord = windowStoreService.getByKey( + "STRING_AVRO_WINDOW_STORE", + "person", + Instant.EPOCH, + Instant.now() + ); + + assertEquals("person", stateStoreRecord.get(0).getKey()); + assertEquals(1L, ((Map) stateStoreRecord.get(0).getValue()).get("id")); + assertEquals("John", ((Map) stateStoreRecord.get(0).getValue()).get("firstName")); + assertEquals("Doe", ((Map) stateStoreRecord.get(0).getValue()).get("lastName")); + assertEquals("2000-01-01T01:00:00Z", ((Map) stateStoreRecord.get(0).getValue()).get("birthDate")); + assertNull(stateStoreRecord.get(0).getTimestamp()); + } + @Test void shouldGetAllInStringStringKeyValueStore() throws IOException, InterruptedException { HttpRequest request = HttpRequest.newBuilder() @@ -296,10 +334,16 @@ void shouldGetAllInStringStringKeyValueStore() throws IOException, InterruptedEx assertNull(body.get(0).getTimestamp()); } - @Test - void shouldGetAllInStringAvroKeyValueStore() throws IOException, InterruptedException { + @ParameterizedTest + @CsvSource({ + "http://localhost:8081/store/key-value/STRING_AVRO_STORE", + "http://localhost:8081/store/key-value/local/STRING_AVRO_STORE", + "http://localhost:8081/store/window/STRING_AVRO_WINDOW_STORE", + "http://localhost:8081/store/window/local/STRING_AVRO_WINDOW_STORE" + }) + void shouldGetAllInStringAvroStores(String url) throws IOException, InterruptedException { HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create("http://localhost:8081/store/key-value/STRING_AVRO_STORE")) + .uri(URI.create(url)) .GET() .build(); @@ -315,22 +359,17 @@ void shouldGetAllInStringAvroKeyValueStore() throws IOException, InterruptedExce assertNull(body.get(0).getTimestamp()); } - @Test - void shouldGetAllInStringAvroKeyValueStoreFromService() { - List stateQueryData = keyValueStoreService.getAll("STRING_AVRO_STORE"); - - assertEquals("person", stateQueryData.get(0).getKey()); - assertEquals(1L, ((Map) stateQueryData.get(0).getValue()).get("id")); - assertEquals("John", ((Map) stateQueryData.get(0).getValue()).get("firstName")); - assertEquals("Doe", ((Map) stateQueryData.get(0).getValue()).get("lastName")); - assertEquals("2000-01-01T01:00:00Z", ((Map) stateQueryData.get(0).getValue()).get("birthDate")); - assertNull(stateQueryData.get(0).getTimestamp()); - } - - @Test - void shouldGetAllInStringAvroTimestampedKeyValueStore() throws IOException, InterruptedException { + @ParameterizedTest + @CsvSource({ + "http://localhost:8081/store/key-value/STRING_AVRO_TIMESTAMPED_STORE", + "http://localhost:8081/store/key-value/local/STRING_AVRO_TIMESTAMPED_STORE", + "http://localhost:8081/store/window/STRING_AVRO_TIMESTAMPED_WINDOW_STORE", + "http://localhost:8081/store/window/local/STRING_AVRO_TIMESTAMPED_WINDOW_STORE", + "http://localhost:8081/store/window/STRING_AVRO_TIMESTAMPED_WINDOW_STORE/person" + }) + void shouldGetWithTimestamp(String url) throws IOException, InterruptedException { HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create("http://localhost:8081/store/key-value/STRING_AVRO_TIMESTAMPED_STORE")) + .uri(URI.create(url)) .GET() .build(); @@ -347,19 +386,28 @@ void shouldGetAllInStringAvroTimestampedKeyValueStore() throws IOException, Inte } @Test - void shouldGetAllOnLocalHostInStringStringKeyValueStore() throws IOException, InterruptedException { - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create("http://localhost:8081/store/key-value/local/STRING_STRING_STORE")) - .GET() - .build(); + void shouldGetAllInStringAvroKeyValueStoreFromService() { + List stateQueryData = keyValueStoreService.getAll("STRING_AVRO_STORE"); - HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - List body = objectMapper.readValue(response.body(), new TypeReference<>() {}); + assertEquals("person", stateQueryData.get(0).getKey()); + assertEquals(1L, ((Map) stateQueryData.get(0).getValue()).get("id")); + assertEquals("John", ((Map) stateQueryData.get(0).getValue()).get("firstName")); + assertEquals("Doe", ((Map) stateQueryData.get(0).getValue()).get("lastName")); + assertEquals("2000-01-01T01:00:00Z", ((Map) stateQueryData.get(0).getValue()).get("birthDate")); + assertNull(stateQueryData.get(0).getTimestamp()); + } - assertEquals(200, response.statusCode()); - assertEquals("person", body.get(0).getKey()); - assertEquals("Doe", body.get(0).getValue()); - assertNull(body.get(0).getTimestamp()); + @Test + void shouldGetAllInStringAvroWindowStoreFromService() { + List stateQueryData = windowStoreService + .getAll("STRING_AVRO_WINDOW_STORE", Instant.EPOCH, Instant.now()); + + assertEquals("person", stateQueryData.get(0).getKey()); + assertEquals(1L, ((Map) stateQueryData.get(0).getValue()).get("id")); + assertEquals("John", ((Map) stateQueryData.get(0).getValue()).get("firstName")); + assertEquals("Doe", ((Map) stateQueryData.get(0).getValue()).get("lastName")); + assertEquals("2000-01-01T01:00:00Z", ((Map) stateQueryData.get(0).getValue()).get("birthDate")); + assertNull(stateQueryData.get(0).getTimestamp()); } /** diff --git a/kstreamplify-spring-boot/pom.xml b/kstreamplify-spring-boot/pom.xml index 7c771ab..2f3fb30 100644 --- a/kstreamplify-spring-boot/pom.xml +++ b/kstreamplify-spring-boot/pom.xml @@ -12,7 +12,6 @@ kstreamplify-spring-boot - 1.0.3-SNAPSHOT 2.6.0 @@ -52,7 +51,7 @@ com.michelin kstreamplify-core - ${kstreamplify-core.version} + 1.0.3-SNAPSHOT diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/controller/InteractiveQueriesControllerTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/controller/InteractiveQueriesControllerTest.java index 9ff0806..a4589e3 100644 --- a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/controller/InteractiveQueriesControllerTest.java +++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/controller/InteractiveQueriesControllerTest.java @@ -9,7 +9,6 @@ import com.michelin.kstreamplify.service.interactivequeries.KeyValueStoreService; import com.michelin.kstreamplify.service.interactivequeries.WindowStoreService; import com.michelin.kstreamplify.store.StateStoreRecord; -import java.time.Instant; import java.util.List; import java.util.Optional; import java.util.Set; diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java index a851ace..4bf4b67 100644 --- a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java +++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integration/InteractiveQueriesIntegrationTest.java @@ -20,6 +20,7 @@ import com.michelin.kstreamplify.store.StateStoreRecord; import com.michelin.kstreamplify.store.StreamsMetadata; import io.confluent.kafka.serializers.KafkaAvroSerializer; +import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.util.HashMap; @@ -252,66 +253,38 @@ void shouldGetByKeyInStringAvroTimestampedKeyValueStore() { } @Test - void shouldGetAllInStringStringKeyValueStore() { - ResponseEntity> response = restTemplate - .exchange("http://localhost:8085/store/key-value/STRING_STRING_STORE", GET, null, new ParameterizedTypeReference<>() { - }); - - assertEquals(200, response.getStatusCode().value()); - assertNotNull(response.getBody()); - assertEquals("person", response.getBody().get(0).getKey()); - assertEquals("Doe", response.getBody().get(0).getValue()); - assertNull(response.getBody().get(0).getTimestamp()); - } - - @Test - void shouldGetAllInStringAvroKeyValueStore() { + void shouldGetByKeyInStringAvroTimestampedWindowStore() { ResponseEntity> response = restTemplate - .exchange("http://localhost:8085/store/key-value/STRING_AVRO_STORE", GET, null, new ParameterizedTypeReference<>() { - }); + .exchange("http://localhost:8085/store/window/STRING_AVRO_TIMESTAMPED_WINDOW_STORE/person", GET, null, + new ParameterizedTypeReference<>() {}); assertEquals(200, response.getStatusCode().value()); assertNotNull(response.getBody()); assertEquals("person", response.getBody().get(0).getKey()); - assertEquals(1, ((Map) response.getBody().get(0).getValue()).get("id")); - assertEquals("John", ((Map) response.getBody().get(0).getValue()).get("firstName")); - assertEquals("Doe", ((Map) response.getBody().get(0).getValue()).get("lastName")); - assertEquals("2000-01-01T01:00:00Z", ((Map) response.getBody().get(0).getValue()).get("birthDate")); - assertNull(response.getBody().get(0).getTimestamp()); - } - - @Test - void shouldGetAllInStringAvroKeyValueStoreFromService() { - List stateQueryData = keyValueStoreService.getAll("STRING_AVRO_STORE"); - - assertEquals("person", stateQueryData.get(0).getKey()); - assertEquals(1L, ((Map) stateQueryData.get(0).getValue()).get("id")); - assertEquals("John", ((Map) stateQueryData.get(0).getValue()).get("firstName")); - assertEquals("Doe", ((Map) stateQueryData.get(0).getValue()).get("lastName")); - assertEquals("2000-01-01T01:00:00Z", ((Map) stateQueryData.get(0).getValue()).get("birthDate")); - assertNull(stateQueryData.get(0).getTimestamp()); + assertEquals(1, ((HashMap) response.getBody().get(0).getValue()).get("id")); + assertEquals("John", ((HashMap) response.getBody().get(0).getValue()).get("firstName")); + assertEquals("Doe", ((HashMap) response.getBody().get(0).getValue()).get("lastName")); + assertEquals("2000-01-01T01:00:00Z", ((HashMap) response.getBody().get(0).getValue()).get("birthDate")); + assertNotNull(response.getBody().get(0).getTimestamp()); } @Test - void shouldGetAllInStringAvroTimestampedKeyValueStore() { - ResponseEntity> response = restTemplate - .exchange("http://localhost:8085/store/key-value/STRING_AVRO_TIMESTAMPED_STORE", GET, null, new ParameterizedTypeReference<>() { - }); + void shouldGetByKeyInStringAvroWindowStoreFromService() { + List stateStoreRecord = windowStoreService + .getByKey("STRING_AVRO_WINDOW_STORE", "person", Instant.EPOCH, Instant.now()); - assertEquals(200, response.getStatusCode().value()); - assertNotNull(response.getBody()); - assertEquals("person", response.getBody().get(0).getKey()); - assertEquals(1, ((Map) response.getBody().get(0).getValue()).get("id")); - assertEquals("John", ((Map) response.getBody().get(0).getValue()).get("firstName")); - assertEquals("Doe", ((Map) response.getBody().get(0).getValue()).get("lastName")); - assertEquals("2000-01-01T01:00:00Z", ((Map) response.getBody().get(0).getValue()).get("birthDate")); - assertNotNull(response.getBody().get(0).getTimestamp()); + assertEquals("person", stateStoreRecord.get(0).getKey()); + assertEquals(1L, ((Map) stateStoreRecord.get(0).getValue()).get("id")); + assertEquals("John", ((Map) stateStoreRecord.get(0).getValue()).get("firstName")); + assertEquals("Doe", ((Map) stateStoreRecord.get(0).getValue()).get("lastName")); + assertEquals("2000-01-01T01:00:00Z", ((Map) stateStoreRecord.get(0).getValue()).get("birthDate")); + assertNull(stateStoreRecord.get(0).getTimestamp()); } @Test - void shouldGetAllOnLocalHostInStringStringKeyValueStore() { + void shouldGetAllInStringStringKeyValueStore() { ResponseEntity> response = restTemplate - .exchange("http://localhost:8085/store/key-value/local/STRING_STRING_STORE", GET, null, new ParameterizedTypeReference<>() { + .exchange("http://localhost:8085/store/key-value/STRING_STRING_STORE", GET, null, new ParameterizedTypeReference<>() { }); assertEquals(200, response.getStatusCode().value()); @@ -321,10 +294,16 @@ void shouldGetAllOnLocalHostInStringStringKeyValueStore() { assertNull(response.getBody().get(0).getTimestamp()); } - @Test - void shouldGetAllInStringAvroWindowStore() { + @ParameterizedTest + @CsvSource({ + "http://localhost:8085/store/key-value/STRING_AVRO_STORE", + "http://localhost:8085/store/key-value/local/STRING_AVRO_STORE", + "http://localhost:8085/store/window/STRING_AVRO_WINDOW_STORE", + "http://localhost:8085/store/window/local/STRING_AVRO_WINDOW_STORE" + }) + void shouldGetAllInStringAvroStores(String url) { ResponseEntity> response = restTemplate - .exchange("http://localhost:8085/store/window/STRING_AVRO_WINDOW_STORE", GET, null, new ParameterizedTypeReference<>() { + .exchange(url, GET, null, new ParameterizedTypeReference<>() { }); assertEquals(200, response.getStatusCode().value()); @@ -337,23 +316,17 @@ void shouldGetAllInStringAvroWindowStore() { assertNull(response.getBody().get(0).getTimestamp()); } - @Test - void shouldGetAllInStringAvroWindowStoreFromService() { - List stateQueryData = windowStoreService - .getAll("STRING_AVRO_WINDOW_STORE", Instant.EPOCH, Instant.now()); - - assertEquals("person", stateQueryData.get(0).getKey()); - assertEquals(1L, ((Map) stateQueryData.get(0).getValue()).get("id")); - assertEquals("John", ((Map) stateQueryData.get(0).getValue()).get("firstName")); - assertEquals("Doe", ((Map) stateQueryData.get(0).getValue()).get("lastName")); - assertEquals("2000-01-01T01:00:00Z", ((Map) stateQueryData.get(0).getValue()).get("birthDate")); - assertNull(stateQueryData.get(0).getTimestamp()); - } - - @Test - void shouldGetAllInStringAvroTimestampedWindowStore() { + @ParameterizedTest + @CsvSource({ + "http://localhost:8085/store/key-value/STRING_AVRO_TIMESTAMPED_STORE", + "http://localhost:8085/store/key-value/local/STRING_AVRO_TIMESTAMPED_STORE", + "http://localhost:8085/store/window/STRING_AVRO_TIMESTAMPED_WINDOW_STORE", + "http://localhost:8085/store/window/local/STRING_AVRO_TIMESTAMPED_WINDOW_STORE", + "http://localhost:8085/store/window/STRING_AVRO_TIMESTAMPED_WINDOW_STORE/person" + }) + void shouldGetWithTimestamp(String url) { ResponseEntity> response = restTemplate - .exchange("http://localhost:8085/store/window/STRING_AVRO_TIMESTAMPED_WINDOW_STORE", GET, null, new ParameterizedTypeReference<>() { + .exchange(url, GET, null, new ParameterizedTypeReference<>() { }); assertEquals(200, response.getStatusCode().value()); @@ -367,64 +340,28 @@ void shouldGetAllInStringAvroTimestampedWindowStore() { } @Test - void shouldGetAllOnLocalHostInStringAvroWindowStore() { - ResponseEntity> response = restTemplate - .exchange("http://localhost:8085/store/window/local/STRING_AVRO_WINDOW_STORE", GET, null, new ParameterizedTypeReference<>() { - }); - - assertEquals(200, response.getStatusCode().value()); - assertNotNull(response.getBody()); - assertEquals("person", response.getBody().get(0).getKey()); - assertEquals(1, ((Map) response.getBody().get(0).getValue()).get("id")); - assertEquals("John", ((Map) response.getBody().get(0).getValue()).get("firstName")); - assertEquals("Doe", ((Map) response.getBody().get(0).getValue()).get("lastName")); - assertEquals("2000-01-01T01:00:00Z", ((Map) response.getBody().get(0).getValue()).get("birthDate")); - assertNull(response.getBody().get(0).getTimestamp()); - } - - @Test - void shouldGetByKeyInStringAvroWindowStore() { - ResponseEntity> response = restTemplate - .exchange("http://localhost:8085/store/window/STRING_AVRO_WINDOW_STORE/person", GET, null, new ParameterizedTypeReference<>() { - }); - - assertEquals(200, response.getStatusCode().value()); - assertNotNull(response.getBody()); - assertEquals("person", response.getBody().get(0).getKey()); - assertEquals(1, ((HashMap) response.getBody().get(0).getValue()).get("id")); - assertEquals("John", ((HashMap) response.getBody().get(0).getValue()).get("firstName")); - assertEquals("Doe", ((HashMap) response.getBody().get(0).getValue()).get("lastName")); - assertEquals("2000-01-01T01:00:00Z", ((HashMap) response.getBody().get(0).getValue()).get("birthDate")); - assertNull(response.getBody().get(0).getTimestamp()); - } - - @Test - void shouldGetByKeyInStringAvroWindowStoreFromService() { - List stateStoreRecord = windowStoreService - .getByKey("STRING_AVRO_WINDOW_STORE", "person", Instant.EPOCH, Instant.now()); + void shouldGetAllInStringAvroKeyValueStoreFromService() { + List stateQueryData = keyValueStoreService.getAll("STRING_AVRO_STORE"); - assertEquals("person", stateStoreRecord.get(0).getKey()); - assertEquals(1L, ((Map) stateStoreRecord.get(0).getValue()).get("id")); - assertEquals("John", ((Map) stateStoreRecord.get(0).getValue()).get("firstName")); - assertEquals("Doe", ((Map) stateStoreRecord.get(0).getValue()).get("lastName")); - assertEquals("2000-01-01T01:00:00Z", ((Map) stateStoreRecord.get(0).getValue()).get("birthDate")); - assertNull(stateStoreRecord.get(0).getTimestamp()); + assertEquals("person", stateQueryData.get(0).getKey()); + assertEquals(1L, ((Map) stateQueryData.get(0).getValue()).get("id")); + assertEquals("John", ((Map) stateQueryData.get(0).getValue()).get("firstName")); + assertEquals("Doe", ((Map) stateQueryData.get(0).getValue()).get("lastName")); + assertEquals("2000-01-01T01:00:00Z", ((Map) stateQueryData.get(0).getValue()).get("birthDate")); + assertNull(stateQueryData.get(0).getTimestamp()); } @Test - void shouldGetByKeyInStringAvroTimestampedWindowStore() { - ResponseEntity> response = restTemplate - .exchange("http://localhost:8085/store/window/STRING_AVRO_TIMESTAMPED_WINDOW_STORE/person", GET, null, - new ParameterizedTypeReference<>() {}); + void shouldGetAllInStringAvroWindowStoreFromService() { + List stateQueryData = windowStoreService + .getAll("STRING_AVRO_WINDOW_STORE", Instant.EPOCH, Instant.now()); - assertEquals(200, response.getStatusCode().value()); - assertNotNull(response.getBody()); - assertEquals("person", response.getBody().get(0).getKey()); - assertEquals(1, ((HashMap) response.getBody().get(0).getValue()).get("id")); - assertEquals("John", ((HashMap) response.getBody().get(0).getValue()).get("firstName")); - assertEquals("Doe", ((HashMap) response.getBody().get(0).getValue()).get("lastName")); - assertEquals("2000-01-01T01:00:00Z", ((HashMap) response.getBody().get(0).getValue()).get("birthDate")); - assertNotNull(response.getBody().get(0).getTimestamp()); + assertEquals("person", stateQueryData.get(0).getKey()); + assertEquals(1L, ((Map) stateQueryData.get(0).getValue()).get("id")); + assertEquals("John", ((Map) stateQueryData.get(0).getValue()).get("firstName")); + assertEquals("Doe", ((Map) stateQueryData.get(0).getValue()).get("lastName")); + assertEquals("2000-01-01T01:00:00Z", ((Map) stateQueryData.get(0).getValue()).get("birthDate")); + assertNull(stateQueryData.get(0).getTimestamp()); } /**