Skip to content

Commit

Permalink
Add interactive queries for Window stores (#243)
Browse files Browse the repository at this point in the history
* Init window store

* Add interactive queries for Window stores

* Add interactive queries for Window stores

* Add timeFrom and timeTo parameters

* Add tests

* Use constants

* Unit tests

* sonar
  • Loading branch information
loicgreffier authored Oct 6, 2024
1 parent ac806e8 commit e39288d
Show file tree
Hide file tree
Showing 13 changed files with 1,802 additions and 567 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.michelin.kstreamplify.server;

import static com.michelin.kstreamplify.service.InteractiveQueriesService.DEFAULT_STORE_PATH;
import static com.michelin.kstreamplify.service.KubernetesService.DEFAULT_LIVENESS_PATH;
import static com.michelin.kstreamplify.service.KubernetesService.DEFAULT_READINESS_PATH;
import static com.michelin.kstreamplify.service.KubernetesService.LIVENESS_PATH_PROPERTY_NAME;
Expand All @@ -14,29 +13,42 @@
import com.michelin.kstreamplify.exception.HttpServerException;
import com.michelin.kstreamplify.exception.UnknownKeyException;
import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
import com.michelin.kstreamplify.service.InteractiveQueriesService;
import com.michelin.kstreamplify.service.KubernetesService;
import com.michelin.kstreamplify.service.TopologyService;
import com.michelin.kstreamplify.service.interactivequeries.KeyValueStoreService;
import com.michelin.kstreamplify.service.interactivequeries.WindowStoreService;
import com.michelin.kstreamplify.store.StreamsMetadata;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;

/**
* Kafka Streams HTTP server.
*/
public class KafkaStreamsHttpServer {
private static final String DEFAULT_STORE_PATH = "store";
private static final String DEFAULT_KEY_VALUE_STORE_PATH = "key-value";
private static final String DEFAULT_WINDOW_STORE_PATH = "window";
private static final String TIME_FROM_REQUEST_PARAM = "timeFrom";
private static final String TIME_TO_REQUEST_PARAM = "timeTo";
private final KafkaStreamsInitializer kafkaStreamsInitializer;
private final ObjectMapper objectMapper;
private final KubernetesService kubernetesService;
private final TopologyService topologyService;
private final InteractiveQueriesService interactiveQueriesService;
private final KeyValueStoreService keyValueStoreService;
private final WindowStoreService windowStoreService;

/**
* The HTTP server.
Expand All @@ -53,7 +65,8 @@ public KafkaStreamsHttpServer(KafkaStreamsInitializer kafkaStreamsInitializer) {
this.objectMapper = new ObjectMapper();
this.kubernetesService = new KubernetesService(kafkaStreamsInitializer);
this.topologyService = new TopologyService(kafkaStreamsInitializer);
this.interactiveQueriesService = new InteractiveQueriesService(kafkaStreamsInitializer);
this.keyValueStoreService = new KeyValueStoreService(kafkaStreamsInitializer);
this.windowStoreService = new WindowStoreService(kafkaStreamsInitializer);
}

/**
Expand Down Expand Up @@ -141,13 +154,13 @@ private void createStoreEndpoints() {

private Object getResponseForStoreEndpoints(HttpExchange exchange) {
if (exchange.getRequestURI().toString().equals("/" + DEFAULT_STORE_PATH)) {
return interactiveQueriesService.getStateStores();
return keyValueStoreService.getStateStores();
}

String store;
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH + "/metadata/.*")) {
store = parsePathParam(exchange, 3);
return interactiveQueriesService.getStreamsMetadataForStore(store)
return keyValueStoreService.getStreamsMetadataForStore(store)
.stream()
.map(streamsMetadata -> new StreamsMetadata(
streamsMetadata.stateStoreNames(),
Expand All @@ -156,19 +169,60 @@ private Object getResponseForStoreEndpoints(HttpExchange exchange) {
.toList();
}

if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH + "/local/.*")) {
store = parsePathParam(exchange, 3);
return interactiveQueriesService.getAllOnLocalhost(store);
if (exchange.getRequestURI().toString()
.matches("/" + DEFAULT_STORE_PATH + "/" + DEFAULT_KEY_VALUE_STORE_PATH + "/local/.*")) {
store = parsePathParam(exchange, 4);
return keyValueStoreService.getAllOnLocalHost(store);
}

store = parsePathParam(exchange, 2);
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH + "/.*/.*")) {
String key = parsePathParam(exchange, 3);
return interactiveQueriesService.getByKey(store, key);
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_WINDOW_STORE_PATH + "/local/.*")) {
store = parsePathParam(exchange, 4);
Instant instantFrom = parseRequestParam(exchange, TIME_FROM_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.EPOCH);

Instant instantTo = parseRequestParam(exchange, TIME_TO_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.now());
return windowStoreService.getAllOnLocalHost(store, instantFrom, instantTo);
}

if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH + "/.*")) {
return interactiveQueriesService.getAll(store);
store = parsePathParam(exchange, 3);
if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_KEY_VALUE_STORE_PATH + "/.*/.*")) {
String key = parsePathParam(exchange, 4);
return keyValueStoreService.getByKey(store, key);
}

if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_KEY_VALUE_STORE_PATH + "/.*")) {
return keyValueStoreService.getAll(store);
}

if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_WINDOW_STORE_PATH + "/.*/.*")) {
String key = parsePathParam(exchange, 4);
Instant instantFrom = parseRequestParam(exchange, TIME_FROM_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.EPOCH);

Instant instantTo = parseRequestParam(exchange, TIME_TO_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.now());
return windowStoreService.getByKey(store, key, instantFrom, instantTo);
}

if (exchange.getRequestURI().toString().matches("/" + DEFAULT_STORE_PATH
+ "/" + DEFAULT_WINDOW_STORE_PATH + "/.*")) {
Instant instantFrom = parseRequestParam(exchange, TIME_FROM_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.EPOCH);

Instant instantTo = parseRequestParam(exchange, TIME_TO_REQUEST_PARAM)
.map(Instant::parse)
.orElse(Instant.now());
return windowStoreService.getAll(store, instantFrom, instantTo);
}

return null;
Expand All @@ -181,6 +235,26 @@ private String parsePathParam(HttpExchange exchange, int index) {
.split("/")[index];
}

private Optional<String> parseRequestParam(HttpExchange exchange, String key) {
String[] uriAndParams = exchange.getRequestURI()
.toString()
.split("\\?");

if (uriAndParams.length == 1) {
return Optional.empty();
}

List<String> params = Arrays.asList(uriAndParams[1]
.split("&"));

Map<String, String> keyValue = params
.stream()
.map(param -> param.split("="))
.collect(Collectors.toMap(param -> param[0], param -> param[1]));

return Optional.ofNullable(keyValue.get(key));
}

/**
* Callback to override in case of adding endpoints.
*
Expand Down
Loading

0 comments on commit e39288d

Please sign in to comment.