From b53256b3922302cf737e6fafc904cd7cad26186c Mon Sep 17 00:00:00 2001 From: Marco Amann Date: Thu, 2 Jun 2022 10:04:31 +0200 Subject: [PATCH 1/5] added more tests and reworked rest --- reset-handler/pom.xml | 37 +++++ ...tringToEventProcessorServiceConverter.java | 33 +++++ .../main/java/io/axoniq/config/WebConfig.java | 24 ++++ .../EventProcessorRestController.java | 35 +++++ .../{ => controller}/EventRestController.java | 2 +- ...FrameworkEventProcessorRestController.java | 39 ------ .../ServerEventProcessorRestController.java | 50 ------- .../axoniq/service/EventProcessorService.java | 13 ++ .../FrameworkEventProcessorService.java | 54 ++++++++ .../RestEventProcessorService.java} | 36 ++++- .../ServerConnectorEventProcessorService.java | 82 ++++++++++++ ...E_ME_EventProcessorRestControllerTest.java | 32 +++++ .../EventProcessorRestControllerTest.java | 90 +++++++++++++ .../controller/ResetIntegrationTest.java | 126 ++++++++++++++++++ .../src/test/resources/compose-test.yml | 8 ++ 15 files changed, 564 insertions(+), 97 deletions(-) create mode 100644 reset-handler/src/main/java/io/axoniq/config/StringToEventProcessorServiceConverter.java create mode 100644 reset-handler/src/main/java/io/axoniq/config/WebConfig.java create mode 100644 reset-handler/src/main/java/io/axoniq/controller/EventProcessorRestController.java rename reset-handler/src/main/java/io/axoniq/{ => controller}/EventRestController.java (94%) delete mode 100644 reset-handler/src/main/java/io/axoniq/framework/FrameworkEventProcessorRestController.java delete mode 100644 reset-handler/src/main/java/io/axoniq/server/ServerEventProcessorRestController.java create mode 100644 reset-handler/src/main/java/io/axoniq/service/EventProcessorService.java create mode 100644 reset-handler/src/main/java/io/axoniq/service/FrameworkEventProcessorService.java rename reset-handler/src/main/java/io/axoniq/{server/EventProcessorService.java => service/RestEventProcessorService.java} (73%) create mode 100644 reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java create mode 100644 reset-handler/src/test/java/io/axoniq/controller/DELETE_ME_EventProcessorRestControllerTest.java create mode 100644 reset-handler/src/test/java/io/axoniq/controller/EventProcessorRestControllerTest.java create mode 100644 reset-handler/src/test/java/io/axoniq/controller/ResetIntegrationTest.java create mode 100644 reset-handler/src/test/resources/compose-test.yml diff --git a/reset-handler/pom.xml b/reset-handler/pom.xml index d8049611..3dd142b2 100644 --- a/reset-handler/pom.xml +++ b/reset-handler/pom.xml @@ -12,15 +12,52 @@ 0.0.1-SNAPSHOT + + org.axonframework axon-spring-boot-starter + 4.6.0-SNAPSHOT + + + io.axoniq + axonserver-connector-java + 4.6.0-SNAPSHOT org.springframework.boot spring-boot-starter-webflux + + + + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.testcontainers + testcontainers + 1.17.2 + test + + + org.testcontainers + junit-jupiter + 1.17.2 + test + + + + io.projectreactor + reactor-test + 3.4.17 + test + diff --git a/reset-handler/src/main/java/io/axoniq/config/StringToEventProcessorServiceConverter.java b/reset-handler/src/main/java/io/axoniq/config/StringToEventProcessorServiceConverter.java new file mode 100644 index 00000000..fe26fc4f --- /dev/null +++ b/reset-handler/src/main/java/io/axoniq/config/StringToEventProcessorServiceConverter.java @@ -0,0 +1,33 @@ +package io.axoniq.config; + +import io.axoniq.service.FrameworkEventProcessorService; +import io.axoniq.service.ServerConnectorEventProcessorService; +import io.axoniq.service.EventProcessorService; +import io.axoniq.service.RestEventProcessorService; +import org.springframework.core.convert.converter.Converter; +import org.springframework.stereotype.Component; + +@Component +public class StringToEventProcessorServiceConverter implements Converter { + + final RestEventProcessorService restEventProcessorService; + final FrameworkEventProcessorService frameworkEventProcessorRestController; + final ServerConnectorEventProcessorService serverConnectorRestController; + + public StringToEventProcessorServiceConverter(RestEventProcessorService restEventProcessorService, FrameworkEventProcessorService frameworkEventProcessorRestController, ServerConnectorEventProcessorService serverConnectorRestController) { + this.restEventProcessorService = restEventProcessorService; + this.frameworkEventProcessorRestController = frameworkEventProcessorRestController; + this.serverConnectorRestController = serverConnectorRestController; + } + + @Override + public EventProcessorService convert(String from) { + switch (from){ + case "server": return serverConnectorRestController; + case "rest": return restEventProcessorService; + case "grpc": throw new IllegalArgumentException(); + case "framework": return frameworkEventProcessorRestController; + default: throw new IllegalArgumentException(); + } + } +} \ No newline at end of file diff --git a/reset-handler/src/main/java/io/axoniq/config/WebConfig.java b/reset-handler/src/main/java/io/axoniq/config/WebConfig.java new file mode 100644 index 00000000..ded423ba --- /dev/null +++ b/reset-handler/src/main/java/io/axoniq/config/WebConfig.java @@ -0,0 +1,24 @@ +package io.axoniq.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.format.FormatterRegistry; +import org.springframework.web.reactive.config.WebFluxConfigurer; + +/** + * Add a Converter from String to EventProcessorService. + * We can for example map `framework` to a FrameworkEventProcessorService + */ +@Configuration +public class WebConfig implements WebFluxConfigurer { + + final StringToEventProcessorServiceConverter converter; + + public WebConfig(StringToEventProcessorServiceConverter converter) { + this.converter = converter; + } + + @Override + public void addFormatters(FormatterRegistry registry) { + registry.addConverter(converter); + } +} \ No newline at end of file diff --git a/reset-handler/src/main/java/io/axoniq/controller/EventProcessorRestController.java b/reset-handler/src/main/java/io/axoniq/controller/EventProcessorRestController.java new file mode 100644 index 00000000..e045185d --- /dev/null +++ b/reset-handler/src/main/java/io/axoniq/controller/EventProcessorRestController.java @@ -0,0 +1,35 @@ +package io.axoniq.controller; + +import io.axoniq.service.EventProcessorService; +import org.axonframework.config.Configuration; +import org.springframework.util.Assert; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +/** + * Uses the EventProcessorService provided to it based oin the supplied method + */ +@RestController +public class EventProcessorRestController { + + @GetMapping("{method}/start/{processorName}") + public Mono start(@PathVariable("method") EventProcessorService service, @PathVariable String processorName) { + Assert.hasText(processorName, "Processing Group is mandatory and can't be empty!"); + return service.start(processorName); + } + + @GetMapping("{method}/pause/{processorName}") + public Mono pause(@PathVariable("method") EventProcessorService service, @PathVariable String processorName) { + Assert.hasText(processorName, "Processing Group is mandatory and can't be empty!"); + return service.pause(processorName); + } + + @GetMapping("{method}/reset/{processorName}") + public Mono reset(@PathVariable("method") EventProcessorService service, @PathVariable String processorName) { + Assert.hasText(processorName, "Processing Group is mandatory and can't be empty!"); + return service.reset(processorName); + } + +} diff --git a/reset-handler/src/main/java/io/axoniq/EventRestController.java b/reset-handler/src/main/java/io/axoniq/controller/EventRestController.java similarity index 94% rename from reset-handler/src/main/java/io/axoniq/EventRestController.java rename to reset-handler/src/main/java/io/axoniq/controller/EventRestController.java index fbafd642..1571073c 100644 --- a/reset-handler/src/main/java/io/axoniq/EventRestController.java +++ b/reset-handler/src/main/java/io/axoniq/controller/EventRestController.java @@ -1,4 +1,4 @@ -package io.axoniq; +package io.axoniq.controller; import org.axonframework.eventhandling.gateway.EventGateway; import org.springframework.web.bind.annotation.GetMapping; diff --git a/reset-handler/src/main/java/io/axoniq/framework/FrameworkEventProcessorRestController.java b/reset-handler/src/main/java/io/axoniq/framework/FrameworkEventProcessorRestController.java deleted file mode 100644 index bcdb5a49..00000000 --- a/reset-handler/src/main/java/io/axoniq/framework/FrameworkEventProcessorRestController.java +++ /dev/null @@ -1,39 +0,0 @@ -package io.axoniq.framework; - -import org.axonframework.config.Configuration; -import org.axonframework.eventhandling.StreamingEventProcessor; -import org.springframework.http.ResponseEntity; -import org.springframework.util.Assert; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -@RestController -@RequestMapping("framework") -public class FrameworkEventProcessorRestController { - - private final Configuration configuration; - - public FrameworkEventProcessorRestController(Configuration configuration) { - this.configuration = configuration; - } - - @GetMapping("reset/{processorName}") - public ResponseEntity reset(@PathVariable String processorName) { - Assert.hasLength(processorName, "Processing Group is mandatory and can't be empty!"); - - configuration.eventProcessingConfiguration() - .eventProcessorByProcessingGroup(processorName, - StreamingEventProcessor.class) - .ifPresent(streamingEventProcessor -> { - if (streamingEventProcessor.supportsReset()) { - streamingEventProcessor.shutDown(); - streamingEventProcessor.resetTokens(); - streamingEventProcessor.start(); - } - }); - - return ResponseEntity.ok().build(); - } -} diff --git a/reset-handler/src/main/java/io/axoniq/server/ServerEventProcessorRestController.java b/reset-handler/src/main/java/io/axoniq/server/ServerEventProcessorRestController.java deleted file mode 100644 index a50b7e33..00000000 --- a/reset-handler/src/main/java/io/axoniq/server/ServerEventProcessorRestController.java +++ /dev/null @@ -1,50 +0,0 @@ -package io.axoniq.server; - -import org.axonframework.config.Configuration; -import org.axonframework.eventhandling.StreamingEventProcessor; -import org.springframework.util.Assert; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; -import reactor.core.publisher.Mono; - -@RestController -@RequestMapping("server") -public class ServerEventProcessorRestController { - - private final EventProcessorService eventProcessorService; - private final Configuration configuration; - - - public ServerEventProcessorRestController(EventProcessorService eventProcessorService, - Configuration configuration) { - this.eventProcessorService = eventProcessorService; - this.configuration = configuration; - } - - @GetMapping("start/{processorName}") - public Mono start(@PathVariable String processorName) { - return eventProcessorService.start(processorName); - } - - @GetMapping("pause/{processorName}") - public Mono pause(@PathVariable String processorName) { - return eventProcessorService.pause(processorName); - } - - @GetMapping("reset/{processorName}") - public Mono reset(@PathVariable String processorName) { - Assert.hasLength(processorName, "Processor Name is mandatory and can't be empty!"); - StreamingEventProcessor eventProcessor = configuration.eventProcessingConfiguration() - .eventProcessorByProcessingGroup( - processorName, - StreamingEventProcessor.class) - .orElseThrow(IllegalArgumentException::new); - - return eventProcessorService.pause(processorName) - .then(eventProcessorService.awaitTermination(processorName)) - .then(Mono.fromRunnable(eventProcessor::resetTokens)) - .then(eventProcessorService.start(processorName)); - } -} diff --git a/reset-handler/src/main/java/io/axoniq/service/EventProcessorService.java b/reset-handler/src/main/java/io/axoniq/service/EventProcessorService.java new file mode 100644 index 00000000..7536f8cd --- /dev/null +++ b/reset-handler/src/main/java/io/axoniq/service/EventProcessorService.java @@ -0,0 +1,13 @@ +package io.axoniq.service; + +import reactor.core.publisher.Mono; + +public interface EventProcessorService { + Mono pause(String processorName); + + Mono start(String processorName); + + Mono reset(String processorName); + +// Mono awaitTermination(String processorName); +} diff --git a/reset-handler/src/main/java/io/axoniq/service/FrameworkEventProcessorService.java b/reset-handler/src/main/java/io/axoniq/service/FrameworkEventProcessorService.java new file mode 100644 index 00000000..87c774d2 --- /dev/null +++ b/reset-handler/src/main/java/io/axoniq/service/FrameworkEventProcessorService.java @@ -0,0 +1,54 @@ +package io.axoniq.service; + +import org.axonframework.config.Configuration; +import org.axonframework.eventhandling.EventProcessor; +import org.axonframework.eventhandling.StreamingEventProcessor; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +import java.util.concurrent.CompletableFuture; + +@Service +public class FrameworkEventProcessorService implements EventProcessorService { + + private final Configuration configuration; + + public FrameworkEventProcessorService(Configuration configuration) { + this.configuration = configuration; + } + + + @Override + public Mono pause(String processorName) { + return Mono.fromFuture( + configuration.eventProcessingConfiguration() + .eventProcessorByProcessingGroup(processorName,StreamingEventProcessor.class) + .map(EventProcessor::shutdownAsync) + .orElse(CompletableFuture.completedFuture(null)) + ); + } + + @Override + public Mono start(String processorName) { + configuration.eventProcessingConfiguration() + .eventProcessorByProcessingGroup(processorName,StreamingEventProcessor.class) + .ifPresent(EventProcessor::start); + return Mono.empty(); + } + + @Override + public Mono reset(String processorName) { + configuration.eventProcessingConfiguration() + .eventProcessorByProcessingGroup(processorName, + StreamingEventProcessor.class) + .ifPresent(streamingEventProcessor -> { + if (streamingEventProcessor.supportsReset()) { + streamingEventProcessor.shutDown(); + streamingEventProcessor.resetTokens(); + streamingEventProcessor.start(); + } + }); + return Mono.empty(); + } + +} diff --git a/reset-handler/src/main/java/io/axoniq/server/EventProcessorService.java b/reset-handler/src/main/java/io/axoniq/service/RestEventProcessorService.java similarity index 73% rename from reset-handler/src/main/java/io/axoniq/server/EventProcessorService.java rename to reset-handler/src/main/java/io/axoniq/service/RestEventProcessorService.java index 0badccae..0549afbe 100644 --- a/reset-handler/src/main/java/io/axoniq/server/EventProcessorService.java +++ b/reset-handler/src/main/java/io/axoniq/service/RestEventProcessorService.java @@ -1,10 +1,12 @@ -package io.axoniq.server; +package io.axoniq.service; +import org.axonframework.config.Configuration; +import org.axonframework.eventhandling.StreamingEventProcessor; import org.axonframework.eventhandling.tokenstore.TokenStore; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.MediaType; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @@ -16,18 +18,21 @@ import java.util.function.Function; import java.util.function.Supplier; -@Component -public class EventProcessorService { +@Service +public class RestEventProcessorService implements EventProcessorService { private final WebClient webClient; private final Supplier contextSupplier; private final Supplier componentSupplier; private final Supplier tokenStoreIdSupplier; + private final Configuration configuration; - public EventProcessorService(@Value("${axon.axonserver.context}") String context, - @Value("${axon.axonserver.component-name}") String component, - TokenStore tokenStore) { + + public RestEventProcessorService(@Value("${axon.axonserver.context}") String context, + @Value("${axon.axonserver.component-name}") String component, + TokenStore tokenStore, Configuration configuration) { + this.configuration = configuration; this.webClient = WebClient.create("http://localhost:8024"); this.contextSupplier = () -> context; this.componentSupplier = () -> component; @@ -40,6 +45,7 @@ public EventProcessorService(@Value("${axon.axonserver.context}") String context * @param processorName Name of the processor to be paused. * @return Returns a Mono that completes when the request has been accepted by Axon Server. */ + @Override public Mono pause(String processorName) { return webClient.patch() .uri("/v1/components/{component}/processors/{processor}/pause?context={context}&tokenStoreIdentifier={tokenStoreId}", @@ -59,6 +65,7 @@ public Mono pause(String processorName) { * @param processorName Name of the processor to be started. * @return Returns a Mono that completes when the request has been accepted by Axon Server. */ + @Override public Mono start(String processorName) { return webClient.patch() .uri("/v1/components/{component}/processors/{processor}/start?context={context}&tokenStoreIdentifier={tokenStoreId}", @@ -72,6 +79,20 @@ public Mono start(String processorName) { .then(); } + @Override + public Mono reset(String processorName) { + StreamingEventProcessor eventProcessor = configuration.eventProcessingConfiguration() + .eventProcessorByProcessingGroup( + processorName, + StreamingEventProcessor.class) + .orElseThrow(IllegalArgumentException::new); + + return this.pause(processorName) + .then(this.awaitTermination(processorName)) + .then(Mono.fromRunnable(eventProcessor::resetTokens)) + .then(this.start(processorName)); + } + /** * Check if the given Processor Name has already terminated. It will retry 10 times, one every second and fail if it * is not terminated yet. To check if it is terminated, the method looks into the activeThreads number. @@ -79,6 +100,7 @@ public Mono start(String processorName) { * @param processorName Name of the processor to wait for termination. * @return Returns a Mono that completes when processor is terminated. */ +// @terminatedOverride public Mono awaitTermination(String processorName) { return webClient.get() .uri("/v1/components/{component}/processors?context={context}", diff --git a/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java b/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java new file mode 100644 index 00000000..adbda865 --- /dev/null +++ b/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java @@ -0,0 +1,82 @@ +package io.axoniq.service; + +import io.axoniq.axonserver.connector.AxonServerConnection; +import io.axoniq.axonserver.connector.AxonServerConnectionFactory; +import io.axoniq.axonserver.connector.admin.AdminChannel; +import io.axoniq.axonserver.connector.control.ControlChannel; +import io.axoniq.axonserver.connector.impl.ContextConnection; +import io.axoniq.axonserver.grpc.admin.EventProcessor; +//import org.axonframework.axonserver.connector.AxonServerConnectionManager; +//import org.axonframework.config.Configuration; +//import org.axonframework.eventhandling.StreamingEventProcessor; +import io.axoniq.axonserver.grpc.admin.Result; +import org.axonframework.config.Configuration; +import org.axonframework.eventhandling.StreamingEventProcessor; +import org.axonframework.eventhandling.tokenstore.TokenStore; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +import java.util.function.Supplier; + +/** + * @author Sara Pellegrini + * @since 1.1 + */ +@Service +public class ServerConnectorEventProcessorService implements EventProcessorService { + private final Supplier contextSupplier; + private final Supplier componentSupplier; + private final Configuration configuration; + + public ServerConnectorEventProcessorService(@Value("${axon.axonserver.context}") String context, + @Value("${axon.axonserver.component-name}") String component,Configuration configuration) { + this.configuration = configuration; + this.contextSupplier = () -> context; + this.componentSupplier = () -> component; + } + + @Override + public Mono pause(String processorName) { + System.out.println("Pause via server"); + return null; + } + + @Override + public Mono start(String processorName) { + System.out.println("start via server"); + return null; + } + + @Override + public Mono reset(String processorName) { + + AxonServerConnectionFactory connectionFactory = AxonServerConnectionFactory.forClient(componentSupplier.get()).build(); + ContextConnection contextConnection = (ContextConnection) connectionFactory.connect(contextSupplier.get()); + + AdminChannel adminChannel = contextConnection.adminChannel(); + + StreamingEventProcessor eventProcessor = configuration.eventProcessingConfiguration() + .eventProcessorByProcessingGroup( + processorName, + StreamingEventProcessor.class) + .orElseThrow(IllegalArgumentException::new); + + String tokenStoreIdentifier = eventProcessor.getTokenStoreIdentifier(); + + adminChannel + .pauseEventProcessor(processorName, tokenStoreIdentifier) + .thenApply(res -> { + if( res == Result.ACCEPTED){ + throw new RuntimeException("We currently do not support the old asynchronous stop mechanism"); + } + return true; //TODO do + }) + .thenRun(eventProcessor::resetTokens) + .thenRun(() -> adminChannel.startEventProcessor(processorName, tokenStoreIdentifier)); + return null; + } + + +} \ No newline at end of file diff --git a/reset-handler/src/test/java/io/axoniq/controller/DELETE_ME_EventProcessorRestControllerTest.java b/reset-handler/src/test/java/io/axoniq/controller/DELETE_ME_EventProcessorRestControllerTest.java new file mode 100644 index 00000000..ca5f0008 --- /dev/null +++ b/reset-handler/src/test/java/io/axoniq/controller/DELETE_ME_EventProcessorRestControllerTest.java @@ -0,0 +1,32 @@ +package io.axoniq.controller; + +import io.axoniq.service.EventProcessorService; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import static org.mockito.Mockito.*; + + +@ExtendWith(MockitoExtension.class) +public class DELETE_ME_EventProcessorRestControllerTest { + + @Mock + private EventProcessorService eventProcessorService; + + @InjectMocks + private EventProcessorRestController eventProcessorRestController; + + @Test + void testControllerCallsServiceAndReturns(){ + when(eventProcessorService.reset("someProcessorName")).thenReturn(Mono.empty()); + Mono result = eventProcessorRestController.reset(eventProcessorService,"someProcessorName"); + verify(eventProcessorService, times(1)).reset("someProcessorName"); + StepVerifier.create(result).expectNext(); + + } +} diff --git a/reset-handler/src/test/java/io/axoniq/controller/EventProcessorRestControllerTest.java b/reset-handler/src/test/java/io/axoniq/controller/EventProcessorRestControllerTest.java new file mode 100644 index 00000000..72dcc251 --- /dev/null +++ b/reset-handler/src/test/java/io/axoniq/controller/EventProcessorRestControllerTest.java @@ -0,0 +1,90 @@ +package io.axoniq.controller; + +import io.axoniq.config.StringToEventProcessorServiceConverter; +import io.axoniq.service.FrameworkEventProcessorService; +import io.axoniq.service.RestEventProcessorService; +import io.axoniq.service.ServerConnectorEventProcessorService; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.test.web.reactive.server.WebTestClient; +import reactor.core.publisher.Mono; + +import static org.mockito.Mockito.*; + + +@ExtendWith(SpringExtension.class) +@WebFluxTest(controllers = EventProcessorRestController.class) +@Import(StringToEventProcessorServiceConverter.class) +public class EventProcessorRestControllerTest { + + + @Autowired + private WebTestClient webClient; + + @MockBean + private FrameworkEventProcessorService frameworkEventProcessorService; + @MockBean + private RestEventProcessorService restEventProcessorService; + @MockBean + private ServerConnectorEventProcessorService serverConnectorEventProcessorService; + + @InjectMocks + private EventProcessorRestController eventProcessorRestController; + + @Test + void testframeworkEventProcessorServiceIsCalled() throws Exception { + String requestedProcessorName = "someProcessorName"; + + when(frameworkEventProcessorService.reset(requestedProcessorName)).thenReturn(Mono.empty()); + + this.webClient.get() + .uri("/framework/reset/"+requestedProcessorName) + .exchange() + .expectStatus() + .isOk(); + + verify(frameworkEventProcessorService, times(1)).reset(requestedProcessorName); + verify(restEventProcessorService, times(0)).reset(requestedProcessorName); + verify(serverConnectorEventProcessorService, times(0)).reset(requestedProcessorName); + + } + @Test + void testRestEventProcessorServiceIsCalled() throws Exception { + String requestedProcessorName = "someProcessorName"; + + when(restEventProcessorService.reset(requestedProcessorName)).thenReturn(Mono.empty()); + + this.webClient.get() + .uri("/rest/reset/"+requestedProcessorName) + .exchange() + .expectStatus() + .isOk(); + + verify(frameworkEventProcessorService, times(0)).reset(requestedProcessorName); + verify(restEventProcessorService, times(1)).reset(requestedProcessorName); + verify(serverConnectorEventProcessorService, times(0)).reset(requestedProcessorName); + } + @Test + void testServerConnectorEventProcessorServiceIsCalled() throws Exception { + + String requestedProcessorName = "someProcessorName"; + + when(serverConnectorEventProcessorService.reset(requestedProcessorName)).thenReturn(Mono.empty()); + + this.webClient.get() + .uri("/server/reset/"+requestedProcessorName) + .exchange() + .expectStatus() + .isOk(); + + verify(frameworkEventProcessorService, times(0)).reset(requestedProcessorName); + verify(restEventProcessorService, times(0)).reset(requestedProcessorName); + verify(serverConnectorEventProcessorService, times(1)).reset(requestedProcessorName); + } +} diff --git a/reset-handler/src/test/java/io/axoniq/controller/ResetIntegrationTest.java b/reset-handler/src/test/java/io/axoniq/controller/ResetIntegrationTest.java new file mode 100644 index 00000000..b4f1ae3f --- /dev/null +++ b/reset-handler/src/test/java/io/axoniq/controller/ResetIntegrationTest.java @@ -0,0 +1,126 @@ +package io.axoniq.controller; + +import io.axoniq.MyFakeProjection; +import org.junit.ClassRule; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.test.web.reactive.server.WebTestClient; +import org.springframework.web.reactive.function.client.WebClient; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.io.File; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@Testcontainers +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +public class ResetIntegrationTest { + + final static int PORT_A = 8024; + final static int PORT_B = 8124; + + @LocalServerPort + private int port; + + @Autowired + private WebTestClient webClient; + + @MockBean + private MyFakeProjection projection; + + private static final String EVENT_PROCESSOR_NAME="io.axoniq"; + + @ClassRule + @Container + public static DockerComposeContainer environment = + new DockerComposeContainer(new File("src/test/resources/compose-test.yml")) + .withExposedService("axonserver", PORT_A, Wait.forListeningPort()) + .withExposedService("axonserver", PORT_B, Wait.forListeningPort()) + .waitingFor("axonserver", Wait.forLogMessage(".*Started AxonServer in .*",1)); + + @Test + void testEventsAreCreated(){ + prepareIntegrationTest(); + Mockito.clearInvocations(projection); + + this.webClient.get().uri("/event/") + .exchange() + .expectStatus() + .isOk(); + + verify(projection, times(1)).on(any()); + } + + + @Test + void testResetFrameworkWorks(){ + prepareIntegrationTest(); + + createEvents(); + + verifyResetEventProcessorByMethod("framework"); + verifyResetEventProcessorByMethod("rest"); +// verifyResetEventProcessorByMethod("server"); + + } + + + void createEvents(){ + Mockito.clearInvocations(projection); + + int amountOfCreatedEvetns = 10; + // create a few events so we have something to reset + for(int i = 0 ; i < amountOfCreatedEvetns; i++){ + this.webClient.get().uri("/event/") + .exchange() + .expectStatus() + .isOk(); + } + verify(projection, times(10)).on(any()); + } + + void verifyResetEventProcessorByMethod(String method){ + Mockito.clearInvocations(projection); + String path = String.format("/%s/reset/%s", method, EVENT_PROCESSOR_NAME); + + this.webClient.get().uri(path) + .exchange() + .expectStatus() + .isOk(); + + // this might be asynchronous, so we have to wait a bit or hook something + waitForAS(); + // there might be events of older tests in the context, therefore use at least instead of times + verify(projection, atLeast(10)).on(any()); + + } + + private void prepareIntegrationTest() { + + // TODO hook internal method? + waitForAS(); + } + + private void waitForAS() { + + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + +} diff --git a/reset-handler/src/test/resources/compose-test.yml b/reset-handler/src/test/resources/compose-test.yml new file mode 100644 index 00000000..3681bff7 --- /dev/null +++ b/reset-handler/src/test/resources/compose-test.yml @@ -0,0 +1,8 @@ +version: '3.3' +services: + axonserver: + image: axoniq/axonserver + hostname: axonserver + ports: + - '8024:8024' + - '8124:8124' \ No newline at end of file From 9280991a27ac149562ec5a8acb47906595c5a244 Mon Sep 17 00:00:00 2001 From: Sara Pellegrini Date: Thu, 2 Jun 2022 11:53:16 +0200 Subject: [PATCH 2/5] Reset event processor using Admin API - Work in progress. --- .../ServerConnectorEventProcessorService.java | 117 +++++++++++------- 1 file changed, 75 insertions(+), 42 deletions(-) diff --git a/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java b/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java index adbda865..97e3f64f 100644 --- a/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java +++ b/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java @@ -1,37 +1,34 @@ package io.axoniq.service; - -import io.axoniq.axonserver.connector.AxonServerConnection; + import io.axoniq.axonserver.connector.AxonServerConnectionFactory; +import io.axoniq.axonserver.connector.ResultStreamPublisher; import io.axoniq.axonserver.connector.admin.AdminChannel; -import io.axoniq.axonserver.connector.control.ControlChannel; -import io.axoniq.axonserver.connector.impl.ContextConnection; -import io.axoniq.axonserver.grpc.admin.EventProcessor; -//import org.axonframework.axonserver.connector.AxonServerConnectionManager; -//import org.axonframework.config.Configuration; -//import org.axonframework.eventhandling.StreamingEventProcessor; import io.axoniq.axonserver.grpc.admin.Result; import org.axonframework.config.Configuration; import org.axonframework.eventhandling.StreamingEventProcessor; -import org.axonframework.eventhandling.tokenstore.TokenStore; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; +import java.time.Duration; import java.util.function.Supplier; /** - * @author Sara Pellegrini - * @since 1.1 - */ + * @author Sara Pellegrini + * @since 1.1 + */ @Service public class ServerConnectorEventProcessorService implements EventProcessorService { + private final Supplier contextSupplier; private final Supplier componentSupplier; private final Configuration configuration; public ServerConnectorEventProcessorService(@Value("${axon.axonserver.context}") String context, - @Value("${axon.axonserver.component-name}") String component,Configuration configuration) { + @Value("${axon.axonserver.component-name}") String component, + Configuration configuration) { this.configuration = configuration; this.contextSupplier = () -> context; this.componentSupplier = () -> component; @@ -39,44 +36,80 @@ public ServerConnectorEventProcessorService(@Value("${axon.axonserver.context}") @Override public Mono pause(String processorName) { - System.out.println("Pause via server"); - return null; + return pause(processorName, tokenStoreId(processorName)); } @Override public Mono start(String processorName) { - System.out.println("start via server"); - return null; + return start(processorName, tokenStoreId(processorName)); } @Override public Mono reset(String processorName) { + StreamingEventProcessor eventProcessor = eventProcessor(processorName); + String tokenStoreIdentifier = tokenStoreId(processorName); + + return pause(processorName, tokenStoreIdentifier) + .then(resetTokens(eventProcessor)) + .then(start(processorName, tokenStoreIdentifier)); + } + + private Mono resetTokens(StreamingEventProcessor eventProcessor) { + return Mono.fromRunnable(eventProcessor::resetTokens); + } + + private Mono start(String eventProcessorName, String tokenStoreIdentifier) { + return Mono.fromFuture(() -> adminChannel().startEventProcessor(eventProcessorName, tokenStoreIdentifier)) + .filter(Result.SUCCESS::equals) + .switchIfEmpty(awaitForStatus(eventProcessorName, tokenStoreIdentifier, true)) + .then(); + } + + private Mono pause(String eventProcessorName, String tokenStoreIdentifier) { + return Mono.fromFuture(() -> adminChannel().pauseEventProcessor(eventProcessorName, tokenStoreIdentifier)) + .filter(Result.SUCCESS::equals) + .switchIfEmpty(awaitForStatus(eventProcessorName, tokenStoreIdentifier, false)) + .then(); + } - AxonServerConnectionFactory connectionFactory = AxonServerConnectionFactory.forClient(componentSupplier.get()).build(); - ContextConnection contextConnection = (ContextConnection) connectionFactory.connect(contextSupplier.get()); - - AdminChannel adminChannel = contextConnection.adminChannel(); - - StreamingEventProcessor eventProcessor = configuration.eventProcessingConfiguration() - .eventProcessorByProcessingGroup( - processorName, - StreamingEventProcessor.class) - .orElseThrow(IllegalArgumentException::new); - - String tokenStoreIdentifier = eventProcessor.getTokenStoreIdentifier(); - - adminChannel - .pauseEventProcessor(processorName, tokenStoreIdentifier) - .thenApply(res -> { - if( res == Result.ACCEPTED){ - throw new RuntimeException("We currently do not support the old asynchronous stop mechanism"); - } - return true; //TODO do - }) - .thenRun(eventProcessor::resetTokens) - .thenRun(() -> adminChannel.startEventProcessor(processorName, tokenStoreIdentifier)); - return null; + /* + Older versions of the Axon Framework return execution ACK immediately, without waiting for EventProcessors to + start or stop. Newer versions return execution ACK after the EventProcessor has been actually started or stopped. + For older clients, we use this method to poll the status of the event processors, to ensure they have started + or stopped. In case you are using a client >= 4.6, this is no longer needed. + */ + private Mono awaitForStatus(String eventProcessorName, String tokenStoreIdentifier, boolean running) { + return Flux.from(new ResultStreamPublisher<>(() -> adminChannel().eventProcessors())) + .filter(eventProcessor -> eventProcessor.getIdentifier().getProcessorName() + .equals(eventProcessorName)) + .filter(eventProcessor -> eventProcessor.getIdentifier().getTokenStoreIdentifier() + .equals(tokenStoreIdentifier)) + .flatMap(eventProcessor -> Flux.fromIterable(eventProcessor.getClientInstanceList())) + .map(clientInstance -> clientInstance.getIsRunning() == running) + .reduce(Boolean::logicalAnd) + .filter(result -> result.equals(true)) + .switchIfEmpty(Mono.error(new RuntimeException(""))) + .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2))) + .thenReturn(Result.SUCCESS); } + private AdminChannel adminChannel() { + AxonServerConnectionFactory connectionFactory = AxonServerConnectionFactory.forClient(componentSupplier.get()) + .build(); + return connectionFactory.connect(contextSupplier.get()) + .adminChannel(); + } + private String tokenStoreId(String processorName) { + StreamingEventProcessor eventProcessor = eventProcessor(processorName); + return eventProcessor.getTokenStoreIdentifier(); + } + + private StreamingEventProcessor eventProcessor(String processorName) { + return configuration.eventProcessingConfiguration() + .eventProcessorByProcessingGroup( + processorName, + StreamingEventProcessor.class) + .orElseThrow(IllegalArgumentException::new); + } } \ No newline at end of file From 2d54d51d4164b6750e627e62680983754ac85bcf Mon Sep 17 00:00:00 2001 From: Marco Amann Date: Wed, 8 Jun 2022 18:49:44 +0200 Subject: [PATCH 3/5] Simplified examples and added tests --- reset-handler/README.md | 22 +- reset-handler/pom.xml | 5 - .../config/ConfigBasedAdminChannel.java | 28 +++ .../ServerConnectorEventProcessorService.java | 32 +-- .../axoniq/ResetServiceIntegrationTest.java | 95 ++++++++ ...E_ME_EventProcessorRestControllerTest.java | 32 --- ...IntegrationTest.java => ResetE2ETest.java} | 13 +- ...verConnectorEventProcessorServiceTest.java | 218 ++++++++++++++++++ .../src/test/resources/compose-test.yml | 2 +- 9 files changed, 372 insertions(+), 75 deletions(-) create mode 100644 reset-handler/src/main/java/io/axoniq/config/ConfigBasedAdminChannel.java create mode 100644 reset-handler/src/test/java/io/axoniq/ResetServiceIntegrationTest.java delete mode 100644 reset-handler/src/test/java/io/axoniq/controller/DELETE_ME_EventProcessorRestControllerTest.java rename reset-handler/src/test/java/io/axoniq/controller/{ResetIntegrationTest.java => ResetE2ETest.java} (91%) create mode 100644 reset-handler/src/test/java/io/axoniq/service/ServerConnectorEventProcessorServiceTest.java diff --git a/reset-handler/README.md b/reset-handler/README.md index 7f06b706..25831321 100644 --- a/reset-handler/README.md +++ b/reset-handler/README.md @@ -1,21 +1,28 @@ # Reset Handler This demo is meant to show how to reset a tracking token. -There are 2 ways of doing that, using Axon Server API or just Axon Framework. +There are 3 ways of doing that, using Axon Server API, just Axon Framework or the Axon Server Connector. +A more detailed description can be found in the [reference-guide](https://docs.axoniq.io/reference-guide/axon-server/administration/reset-event-processor-token). -### Using Axon Server API +### Using Axon Server REST API When in a distributed environment, one can have several applications connected to Axon Server while sharing the same token store. To be able to reset a token in this scenario, we have to ask Axon Server to pause every known instance of a given Processor Name to be able to reset it and start it back again. -> We recommend checking the [ServerEventProcessorRestController.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/server/ServerEventProcessorRestController.java) for more information. +> We recommend checking the [RestEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/RestEventProcessorService.java) for more information. ### Using Axon Framework Axon Framework provides another easy way to do it using the `StreamingEventProcessor` methods, namely `shutDown`, `resetTokens` and `start`. When doing it through Axon Framework, the application instance doing the operation should be the one having the claim of the token. -> We recommend checking the [FrameworkEventProcessorRestController.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/framework/FrameworkEventProcessorRestController.java) for more information. - +> We recommend checking the [FrameworkEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/FrameworkEventProcessorService.java) for more information. + +## Using the Axon Server Connector +The Axon Server Connector provides methods to pause and restart an event processor. +This functionality can be combined to reset the event processor as shown in the other examples. + +> We recommend checking the [ServerConnectorEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java) for more information. + ### Running the application This is a Spring boot application, as such it can be ran as any other standard Spring Boot application. It has a simple `/event` endpoint where you can create new empty events. For resetting the token, it provides 2 reset endpoints: - `/server/reset/{processorName}` @@ -27,6 +34,9 @@ Since Axon Server is a requirement for this sample, a `docker-compose` file is p Also, if you are on Intellij, a `requests.http` file is provided to make it easy to call the endpoints. -Most of the logic for the Axon Server reset is on the [EventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/server/EventProcessorService.java) class and the added javadoc should be enough to explain what it does. +Most of the logic for the Axon Server reset via REST is on the [RestEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/RestEventProcessorService.java) class and the added javadoc should be enough to explain what it does. +In the same way, details for the Axon Server reset via the Server Connector can be found in [ServerConnectorEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java). For the Axon Framework version, we recommend checking the official [StreamingEventProcessor.java](https://github.com/AxonFramework/AxonFramework/blob/master/messaging/src/main/java/org/axonframework/eventhandling/StreamingEventProcessor.java) documentation. + +A general introduction, regardless of the method used, can be found in the [reference-guide](https://docs.axoniq.io/reference-guide/axon-server/administration/reset-event-processor-token). \ No newline at end of file diff --git a/reset-handler/pom.xml b/reset-handler/pom.xml index 3dd142b2..5a4f2ef6 100644 --- a/reset-handler/pom.xml +++ b/reset-handler/pom.xml @@ -29,11 +29,6 @@ org.springframework.boot spring-boot-starter-webflux - - - - - org.springframework.boot spring-boot-starter-test diff --git a/reset-handler/src/main/java/io/axoniq/config/ConfigBasedAdminChannel.java b/reset-handler/src/main/java/io/axoniq/config/ConfigBasedAdminChannel.java new file mode 100644 index 00000000..e73f5cde --- /dev/null +++ b/reset-handler/src/main/java/io/axoniq/config/ConfigBasedAdminChannel.java @@ -0,0 +1,28 @@ +package io.axoniq.config; + +import io.axoniq.axonserver.connector.AxonServerConnectionFactory; +import io.axoniq.axonserver.connector.admin.AdminChannel; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +@Component +public class ConfigBasedAdminChannel { + + private final String contextName; + private final String componentName; + + public ConfigBasedAdminChannel(@Value("${axon.axonserver.context}") String contextName, + @Value("${axon.axonserver.component-name}") String componentName){ + this.contextName = contextName; + this.componentName = componentName; + } + + @Bean + public AdminChannel adminChannel() { + return AxonServerConnectionFactory.forClient(componentName) + .build() + .connect(contextName) + .adminChannel(); + } +} diff --git a/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java b/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java index 97e3f64f..d09dfe08 100644 --- a/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java +++ b/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java @@ -1,19 +1,16 @@ package io.axoniq.service; -import io.axoniq.axonserver.connector.AxonServerConnectionFactory; import io.axoniq.axonserver.connector.ResultStreamPublisher; import io.axoniq.axonserver.connector.admin.AdminChannel; import io.axoniq.axonserver.grpc.admin.Result; import org.axonframework.config.Configuration; import org.axonframework.eventhandling.StreamingEventProcessor; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; import java.time.Duration; -import java.util.function.Supplier; /** * @author Sara Pellegrini @@ -22,16 +19,12 @@ @Service public class ServerConnectorEventProcessorService implements EventProcessorService { - private final Supplier contextSupplier; - private final Supplier componentSupplier; private final Configuration configuration; + private final AdminChannel adminChannel; - public ServerConnectorEventProcessorService(@Value("${axon.axonserver.context}") String context, - @Value("${axon.axonserver.component-name}") String component, - Configuration configuration) { + public ServerConnectorEventProcessorService(Configuration configuration, AdminChannel adminChannel) { this.configuration = configuration; - this.contextSupplier = () -> context; - this.componentSupplier = () -> component; + this.adminChannel = adminChannel; } @Override @@ -58,15 +51,15 @@ private Mono resetTokens(StreamingEventProcessor eventProcessor) { return Mono.fromRunnable(eventProcessor::resetTokens); } - private Mono start(String eventProcessorName, String tokenStoreIdentifier) { - return Mono.fromFuture(() -> adminChannel().startEventProcessor(eventProcessorName, tokenStoreIdentifier)) + protected Mono start(String eventProcessorName, String tokenStoreIdentifier) { + return Mono.fromFuture(() -> adminChannel.startEventProcessor(eventProcessorName, tokenStoreIdentifier)) .filter(Result.SUCCESS::equals) .switchIfEmpty(awaitForStatus(eventProcessorName, tokenStoreIdentifier, true)) .then(); } - private Mono pause(String eventProcessorName, String tokenStoreIdentifier) { - return Mono.fromFuture(() -> adminChannel().pauseEventProcessor(eventProcessorName, tokenStoreIdentifier)) + protected Mono pause(String eventProcessorName, String tokenStoreIdentifier) { + return Mono.fromFuture(() -> adminChannel.pauseEventProcessor(eventProcessorName, tokenStoreIdentifier)) .filter(Result.SUCCESS::equals) .switchIfEmpty(awaitForStatus(eventProcessorName, tokenStoreIdentifier, false)) .then(); @@ -78,8 +71,8 @@ private Mono pause(String eventProcessorName, String tokenStoreIdentifier) For older clients, we use this method to poll the status of the event processors, to ensure they have started or stopped. In case you are using a client >= 4.6, this is no longer needed. */ - private Mono awaitForStatus(String eventProcessorName, String tokenStoreIdentifier, boolean running) { - return Flux.from(new ResultStreamPublisher<>(() -> adminChannel().eventProcessors())) + protected Mono awaitForStatus(String eventProcessorName, String tokenStoreIdentifier, boolean running) { + return Flux.from(new ResultStreamPublisher<>(adminChannel::eventProcessors)) .filter(eventProcessor -> eventProcessor.getIdentifier().getProcessorName() .equals(eventProcessorName)) .filter(eventProcessor -> eventProcessor.getIdentifier().getTokenStoreIdentifier() @@ -93,12 +86,7 @@ private Mono awaitForStatus(String eventProcessorName, String tokenStore .thenReturn(Result.SUCCESS); } - private AdminChannel adminChannel() { - AxonServerConnectionFactory connectionFactory = AxonServerConnectionFactory.forClient(componentSupplier.get()) - .build(); - return connectionFactory.connect(contextSupplier.get()) - .adminChannel(); - } + private String tokenStoreId(String processorName) { StreamingEventProcessor eventProcessor = eventProcessor(processorName); diff --git a/reset-handler/src/test/java/io/axoniq/ResetServiceIntegrationTest.java b/reset-handler/src/test/java/io/axoniq/ResetServiceIntegrationTest.java new file mode 100644 index 00000000..74787ee9 --- /dev/null +++ b/reset-handler/src/test/java/io/axoniq/ResetServiceIntegrationTest.java @@ -0,0 +1,95 @@ +package io.axoniq; + +import io.axoniq.MyFakeProjection; +import io.axoniq.service.FrameworkEventProcessorService; +import io.axoniq.service.RestEventProcessorService; +import io.axoniq.service.ServerConnectorEventProcessorService; +import org.axonframework.eventhandling.gateway.EventGateway; +import org.junit.ClassRule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@Testcontainers +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +public class ResetServiceIntegrationTest { + + @Autowired + RestEventProcessorService restEventProcessorService; + @Autowired + FrameworkEventProcessorService frameworkEventProcessorService; + @Autowired + ServerConnectorEventProcessorService serverConnectorEventProcessorService; + @Autowired + EventGateway eventGateway; + @MockBean + private MyFakeProjection projection; + + private static final String EVENT_PROCESSOR_NAME="io.axoniq"; + + @ClassRule + @Container + public static DockerComposeContainer environment = + new DockerComposeContainer(new File("src/test/resources/compose-test.yml")) + .withExposedService("axonserver", 8024, Wait.forListeningPort()) + .withExposedService("axonserver", 8124, Wait.forListeningPort()) + .waitingFor("axonserver", Wait.forLogMessage(".*Started AxonServer in .*",1)); + + + @BeforeEach + void prepare(){ + createEvents(); + Mockito.clearInvocations(projection); + } + + @Test + void verifyResetEventProcessorByFramework(){ + frameworkEventProcessorService.reset(EVENT_PROCESSOR_NAME).block(); + waitForAS(); + verify(projection, atLeast(10)).on(any()); + } + + @Test + void verifyResetEventProcessorByRest(){ + restEventProcessorService.reset(EVENT_PROCESSOR_NAME).block(); + waitForAS(); + verify(projection, atLeast(10)).on(any()); + } + + @Test + void verifyResetEventProcessorByServer(){ + serverConnectorEventProcessorService.reset(EVENT_PROCESSOR_NAME).block(); + waitForAS(); + verify(projection, atLeast(10)).on(any()); + } + + + void createEvents(){ + for(int i = 0 ; i < 10; i++){ + eventGateway.publish(new Object()); + } + waitForAS(); + } + + private void waitForAS() { + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + +} diff --git a/reset-handler/src/test/java/io/axoniq/controller/DELETE_ME_EventProcessorRestControllerTest.java b/reset-handler/src/test/java/io/axoniq/controller/DELETE_ME_EventProcessorRestControllerTest.java deleted file mode 100644 index ca5f0008..00000000 --- a/reset-handler/src/test/java/io/axoniq/controller/DELETE_ME_EventProcessorRestControllerTest.java +++ /dev/null @@ -1,32 +0,0 @@ -package io.axoniq.controller; - -import io.axoniq.service.EventProcessorService; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -import static org.mockito.Mockito.*; - - -@ExtendWith(MockitoExtension.class) -public class DELETE_ME_EventProcessorRestControllerTest { - - @Mock - private EventProcessorService eventProcessorService; - - @InjectMocks - private EventProcessorRestController eventProcessorRestController; - - @Test - void testControllerCallsServiceAndReturns(){ - when(eventProcessorService.reset("someProcessorName")).thenReturn(Mono.empty()); - Mono result = eventProcessorRestController.reset(eventProcessorService,"someProcessorName"); - verify(eventProcessorService, times(1)).reset("someProcessorName"); - StepVerifier.create(result).expectNext(); - - } -} diff --git a/reset-handler/src/test/java/io/axoniq/controller/ResetIntegrationTest.java b/reset-handler/src/test/java/io/axoniq/controller/ResetE2ETest.java similarity index 91% rename from reset-handler/src/test/java/io/axoniq/controller/ResetIntegrationTest.java rename to reset-handler/src/test/java/io/axoniq/controller/ResetE2ETest.java index b4f1ae3f..ba9b3353 100644 --- a/reset-handler/src/test/java/io/axoniq/controller/ResetIntegrationTest.java +++ b/reset-handler/src/test/java/io/axoniq/controller/ResetE2ETest.java @@ -2,7 +2,6 @@ import io.axoniq.MyFakeProjection; import org.junit.ClassRule; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; @@ -10,16 +9,12 @@ import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.boot.web.server.LocalServerPort; import org.springframework.test.web.reactive.server.WebTestClient; -import org.springframework.web.reactive.function.client.WebClient; import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; import java.io.File; -import java.time.Duration; import java.util.concurrent.TimeUnit; import static org.mockito.ArgumentMatchers.any; @@ -27,7 +22,7 @@ @Testcontainers @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) -public class ResetIntegrationTest { +public class ResetE2ETest { final static int PORT_A = 8024; final static int PORT_B = 8124; @@ -60,20 +55,20 @@ void testEventsAreCreated(){ .exchange() .expectStatus() .isOk(); - + waitForAS(); verify(projection, times(1)).on(any()); } @Test - void testResetFrameworkWorks(){ + void testResetWorks(){ prepareIntegrationTest(); createEvents(); verifyResetEventProcessorByMethod("framework"); verifyResetEventProcessorByMethod("rest"); -// verifyResetEventProcessorByMethod("server"); + verifyResetEventProcessorByMethod("server"); } diff --git a/reset-handler/src/test/java/io/axoniq/service/ServerConnectorEventProcessorServiceTest.java b/reset-handler/src/test/java/io/axoniq/service/ServerConnectorEventProcessorServiceTest.java new file mode 100644 index 00000000..6772c0cc --- /dev/null +++ b/reset-handler/src/test/java/io/axoniq/service/ServerConnectorEventProcessorServiceTest.java @@ -0,0 +1,218 @@ +package io.axoniq.service; + +import io.axoniq.axonserver.connector.ResultStream; +import io.axoniq.axonserver.connector.admin.AdminChannel; +import io.axoniq.axonserver.grpc.admin.EventProcessor; +import io.axoniq.axonserver.grpc.admin.EventProcessorIdentifier; +import io.axoniq.axonserver.grpc.admin.EventProcessorInstance; +import io.axoniq.axonserver.grpc.admin.Result; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.*; + + +@ExtendWith(MockitoExtension.class) +class ServerConnectorEventProcessorServiceTest{ + + private final String PROCESSOR_NAME = "someProcessorName"; + private final String STORE_ID = "someTokenStoreIdentifier"; + + private ServerConnectorEventProcessorService eventProcessorService; + + @Test + void pauseDoesNotWaitIfSuccess() { + AdminChannel adminChannel = Mockito.mock(AdminChannel.class); + when(adminChannel.pauseEventProcessor(PROCESSOR_NAME, STORE_ID)) + .thenReturn(CompletableFuture.completedFuture(Result.SUCCESS)); + + eventProcessorService = new ServerConnectorEventProcessorService(null,adminChannel); + + Mono result = eventProcessorService.pause(PROCESSOR_NAME, STORE_ID); + StepVerifier.create(result) + .expectNext() + .verifyComplete(); + + verify(adminChannel, never()).eventProcessors(); + } + + @Test + void waitDoesNotRetryIfReady() { + AdminChannel adminChannel = Mockito.mock(AdminChannel.class); + when(adminChannel.eventProcessors()) + .thenReturn(new DummyResultStream(processors_all_stopped())); + eventProcessorService = new ServerConnectorEventProcessorService(null,adminChannel); + + Mono result = eventProcessorService.awaitForStatus(PROCESSOR_NAME,STORE_ID,false); + StepVerifier.create(result) + .expectNext(Result.SUCCESS) + .verifyComplete(); + + verify(adminChannel, times(1)).eventProcessors(); + } + + @Test + void waitDoesRetryUntilReady() { + AdminChannel adminChannel = Mockito.mock(AdminChannel.class); + when(adminChannel.eventProcessors()) + .thenReturn(new DummyResultStream(processors_all_running())) + .thenReturn(new DummyResultStream(processors_some_running())) + .thenReturn(new DummyResultStream(processors_all_stopped())); + + eventProcessorService = new ServerConnectorEventProcessorService(null,adminChannel); + + Mono result = eventProcessorService.awaitForStatus(PROCESSOR_NAME,STORE_ID,false); + StepVerifier.create(result) + .expectNext(Result.SUCCESS) + .verifyComplete(); + + verify(adminChannel, times(3)).eventProcessors(); + } + + @Test + void waitDoesErrorAfterRetriesExhausted() { + AdminChannel adminChannel = Mockito.mock(AdminChannel.class); + when(adminChannel.eventProcessors()) + .thenReturn(new DummyResultStream(processors_all_running())); + + eventProcessorService = new ServerConnectorEventProcessorService(null,adminChannel); + + Mono result = eventProcessorService.awaitForStatus(PROCESSOR_NAME,STORE_ID,false); + StepVerifier.create(result) + .expectError() + .verify(); + + verify(adminChannel, times(4)).eventProcessors(); + } + + + private List processors_all_stopped() { + EventProcessorInstance epi0 = EventProcessorInstance.newBuilder().setIsRunning(false).build(); + EventProcessorInstance epi1 = EventProcessorInstance.newBuilder().setIsRunning(false).build(); + EventProcessor ep0 = getEventProcessorRelevant(epi0, epi1); + EventProcessor ep1 = getEventProcessorIrrelevant(); + return Arrays.asList(ep0, ep1); + } + + + + private List processors_some_running() { + EventProcessorInstance epi0 = EventProcessorInstance.newBuilder().setIsRunning(false).build(); + EventProcessorInstance epi1 = EventProcessorInstance.newBuilder().setIsRunning(true).build(); + EventProcessor ep0 = getEventProcessorRelevant(epi0, epi1); + EventProcessor ep1 = getEventProcessorIrrelevant(); + return Arrays.asList(ep0, ep1); + } + + private List processors_all_running() { + EventProcessorInstance epi0 = EventProcessorInstance.newBuilder().setIsRunning(true).build(); + EventProcessorInstance epi1 = EventProcessorInstance.newBuilder().setIsRunning(true).build(); + EventProcessor ep0 = getEventProcessorRelevant(epi0, epi1); + EventProcessor ep1 = getEventProcessorIrrelevant(); + return Arrays.asList(ep0, ep1); + } + + @NotNull + private EventProcessor getEventProcessorRelevant(EventProcessorInstance epi0, EventProcessorInstance epi1) { + return EventProcessor.newBuilder().setIdentifier( + EventProcessorIdentifier.newBuilder() + .setProcessorName(PROCESSOR_NAME) + .setTokenStoreIdentifier(STORE_ID) + .build() + ).addAllClientInstance(Arrays.asList( + epi0, epi1 + )).build(); + } + + @NotNull + private EventProcessor getEventProcessorIrrelevant() { + EventProcessorInstance epi2 = EventProcessorInstance.newBuilder().setIsRunning(true).build(); + EventProcessorInstance epi3 = EventProcessorInstance.newBuilder().setIsRunning(false).build(); + return EventProcessor.newBuilder().setIdentifier( + EventProcessorIdentifier.newBuilder() + .setProcessorName("whatever.dont.care") + .setTokenStoreIdentifier("some.other.id") + .build() + ).addAllClientInstance(Arrays.asList( + epi2, epi3 + )).build(); + } + + + static class DummyResultStream implements ResultStream { + + private int index = 0; + private boolean closed=false; + + public DummyResultStream(List procs) { + this.procs = procs; + } + + private final List procs; + + @Override + public EventProcessor peek() { + if(!closed && procs.size()>index){ + return procs.get(index); + } + else{ + closed=true; + return null; + } + } + + @Override + public EventProcessor nextIfAvailable() { + if(!closed && procs.size()>index){ + EventProcessor ep = procs.get(index); + index+=1; + return ep; + } + else{ + closed=true; + return null; + } + } + + @Override + public EventProcessor nextIfAvailable(long timeout, TimeUnit unit) { + return nextIfAvailable(); + } + + @Override + public EventProcessor next() { + return nextIfAvailable(); + } + + @Override + public void onAvailable(Runnable callback) { + callback.run(); + } + + @Override + public void close() { + this.closed=true; + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public Optional getError() { + return Optional.empty(); + } + } +} diff --git a/reset-handler/src/test/resources/compose-test.yml b/reset-handler/src/test/resources/compose-test.yml index 3681bff7..d94b229a 100644 --- a/reset-handler/src/test/resources/compose-test.yml +++ b/reset-handler/src/test/resources/compose-test.yml @@ -1,7 +1,7 @@ version: '3.3' services: axonserver: - image: axoniq/axonserver + image: hackyaxonserver hostname: axonserver ports: - '8024:8024' From c6140dd5b3b30de1ab6b352ea27bd2fc564bbcf0 Mon Sep 17 00:00:00 2001 From: Marco Amann Date: Fri, 10 Jun 2022 15:17:17 +0200 Subject: [PATCH 4/5] Adjusted variable names and added comments to make sample code easier to understand --- .../config/ConfigBasedAdminChannel.java | 4 ++++ ...tringToEventProcessorServiceConverter.java | 22 +++++++++++++------ .../EventProcessorRestController.java | 2 +- .../controller/EventRestController.java | 3 +++ .../ServerConnectorEventProcessorService.java | 12 +++++++++- 5 files changed, 34 insertions(+), 9 deletions(-) diff --git a/reset-handler/src/main/java/io/axoniq/config/ConfigBasedAdminChannel.java b/reset-handler/src/main/java/io/axoniq/config/ConfigBasedAdminChannel.java index e73f5cde..db3f8228 100644 --- a/reset-handler/src/main/java/io/axoniq/config/ConfigBasedAdminChannel.java +++ b/reset-handler/src/main/java/io/axoniq/config/ConfigBasedAdminChannel.java @@ -6,6 +6,10 @@ import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; + +/* + Creates an admin channel from the configuration. used to simplify testing in other components. + */ @Component public class ConfigBasedAdminChannel { diff --git a/reset-handler/src/main/java/io/axoniq/config/StringToEventProcessorServiceConverter.java b/reset-handler/src/main/java/io/axoniq/config/StringToEventProcessorServiceConverter.java index fe26fc4f..028270f6 100644 --- a/reset-handler/src/main/java/io/axoniq/config/StringToEventProcessorServiceConverter.java +++ b/reset-handler/src/main/java/io/axoniq/config/StringToEventProcessorServiceConverter.java @@ -7,26 +7,34 @@ import org.springframework.core.convert.converter.Converter; import org.springframework.stereotype.Component; +/* + Extract the service using the specified method to reset the token. + This allows for cleaner signatures in the controllers and the services + */ @Component public class StringToEventProcessorServiceConverter implements Converter { final RestEventProcessorService restEventProcessorService; - final FrameworkEventProcessorService frameworkEventProcessorRestController; - final ServerConnectorEventProcessorService serverConnectorRestController; + final FrameworkEventProcessorService frameworkEventProcessorService; + final ServerConnectorEventProcessorService serverConnectorEventProcessorService; - public StringToEventProcessorServiceConverter(RestEventProcessorService restEventProcessorService, FrameworkEventProcessorService frameworkEventProcessorRestController, ServerConnectorEventProcessorService serverConnectorRestController) { + public StringToEventProcessorServiceConverter(RestEventProcessorService restEventProcessorService, FrameworkEventProcessorService frameworkEventProcessorService, ServerConnectorEventProcessorService serverConnectorEventProcessorService) { this.restEventProcessorService = restEventProcessorService; - this.frameworkEventProcessorRestController = frameworkEventProcessorRestController; - this.serverConnectorRestController = serverConnectorRestController; + this.frameworkEventProcessorService = frameworkEventProcessorService; + this.serverConnectorEventProcessorService = serverConnectorEventProcessorService; } + /* + Match the passed string against a set of known constants. + This is not elegant but does get its job done for the sample. + */ @Override public EventProcessorService convert(String from) { switch (from){ - case "server": return serverConnectorRestController; + case "server": return serverConnectorEventProcessorService; case "rest": return restEventProcessorService; case "grpc": throw new IllegalArgumentException(); - case "framework": return frameworkEventProcessorRestController; + case "framework": return frameworkEventProcessorService; default: throw new IllegalArgumentException(); } } diff --git a/reset-handler/src/main/java/io/axoniq/controller/EventProcessorRestController.java b/reset-handler/src/main/java/io/axoniq/controller/EventProcessorRestController.java index e045185d..5af3ed32 100644 --- a/reset-handler/src/main/java/io/axoniq/controller/EventProcessorRestController.java +++ b/reset-handler/src/main/java/io/axoniq/controller/EventProcessorRestController.java @@ -9,7 +9,7 @@ import reactor.core.publisher.Mono; /** - * Uses the EventProcessorService provided to it based oin the supplied method + * Uses the EventProcessorService provided to it based on the supplied method */ @RestController public class EventProcessorRestController { diff --git a/reset-handler/src/main/java/io/axoniq/controller/EventRestController.java b/reset-handler/src/main/java/io/axoniq/controller/EventRestController.java index 1571073c..3a43d992 100644 --- a/reset-handler/src/main/java/io/axoniq/controller/EventRestController.java +++ b/reset-handler/src/main/java/io/axoniq/controller/EventRestController.java @@ -4,6 +4,9 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; +/* + Creates empty events to allow you to see the effects of a reset + */ @RestController public class EventRestController { diff --git a/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java b/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java index d09dfe08..295b2e52 100644 --- a/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java +++ b/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java @@ -47,10 +47,17 @@ public Mono reset(String processorName) { .then(start(processorName, tokenStoreIdentifier)); } + /* + * Resets the token of an event processor. The event processor needs to be stopped for this to work + */ private Mono resetTokens(StreamingEventProcessor eventProcessor) { return Mono.fromRunnable(eventProcessor::resetTokens); } + /* + * Starts event processors and ensures that either the axon server waits for them to have started or we wait locally + * for them to reach the desired state (pre 4.6) + */ protected Mono start(String eventProcessorName, String tokenStoreIdentifier) { return Mono.fromFuture(() -> adminChannel.startEventProcessor(eventProcessorName, tokenStoreIdentifier)) .filter(Result.SUCCESS::equals) @@ -58,6 +65,10 @@ protected Mono start(String eventProcessorName, String tokenStoreIdentifie .then(); } + /* + * Stops event processors and ensures that either the axon server waits for them to have stopped or we wait locally + * for them to reach the desired state (pre 4.6) + */ protected Mono pause(String eventProcessorName, String tokenStoreIdentifier) { return Mono.fromFuture(() -> adminChannel.pauseEventProcessor(eventProcessorName, tokenStoreIdentifier)) .filter(Result.SUCCESS::equals) @@ -87,7 +98,6 @@ protected Mono awaitForStatus(String eventProcessorName, String tokenSto } - private String tokenStoreId(String processorName) { StreamingEventProcessor eventProcessor = eventProcessor(processorName); return eventProcessor.getTokenStoreIdentifier(); From b51f8a9547e892ed0568e1de0c46011fe9ac9436 Mon Sep 17 00:00:00 2001 From: Marco Amann Date: Mon, 18 Jul 2022 16:41:20 +0200 Subject: [PATCH 5/5] Using upstream axon server 4.6 --- reset-handler/docker-compose.yml | 2 +- reset-handler/pom.xml | 4 ++-- .../config/ConfigBasedAdminChannel.java | 19 +++++------------- .../axoniq/service/EventProcessorService.java | 1 - .../service/RestEventProcessorService.java | 7 ++++--- .../ServerConnectorEventProcessorService.java | 3 ++- .../src/main/resources/application.yml | 1 + .../axoniq/ResetServiceIntegrationTest.java | 18 +++++++++++++++-- .../io/axoniq/controller/ResetE2ETest.java | 20 +++++++++++++++---- .../src/test/resources/application.yml | 5 +++++ .../src/test/resources/compose-test.yml | 7 ++----- 11 files changed, 54 insertions(+), 33 deletions(-) create mode 100644 reset-handler/src/test/resources/application.yml diff --git a/reset-handler/docker-compose.yml b/reset-handler/docker-compose.yml index 2c9a3859..d41a4cdd 100644 --- a/reset-handler/docker-compose.yml +++ b/reset-handler/docker-compose.yml @@ -1,7 +1,7 @@ version: '3.3' services: axonserver: - image: axoniq/axonserver + image: 'axoniq/axonserver:4.6.0-dev' hostname: axonserver ports: - '8024:8024' diff --git a/reset-handler/pom.xml b/reset-handler/pom.xml index 5a4f2ef6..63e5a855 100644 --- a/reset-handler/pom.xml +++ b/reset-handler/pom.xml @@ -18,12 +18,12 @@ org.axonframework axon-spring-boot-starter - 4.6.0-SNAPSHOT + 4.5.14 io.axoniq axonserver-connector-java - 4.6.0-SNAPSHOT + 4.6.1 org.springframework.boot diff --git a/reset-handler/src/main/java/io/axoniq/config/ConfigBasedAdminChannel.java b/reset-handler/src/main/java/io/axoniq/config/ConfigBasedAdminChannel.java index db3f8228..039aefa2 100644 --- a/reset-handler/src/main/java/io/axoniq/config/ConfigBasedAdminChannel.java +++ b/reset-handler/src/main/java/io/axoniq/config/ConfigBasedAdminChannel.java @@ -1,8 +1,7 @@ package io.axoniq.config; -import io.axoniq.axonserver.connector.AxonServerConnectionFactory; import io.axoniq.axonserver.connector.admin.AdminChannel; -import org.springframework.beans.factory.annotation.Value; +import org.axonframework.axonserver.connector.AxonServerConnectionManager; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @@ -12,21 +11,13 @@ */ @Component public class ConfigBasedAdminChannel { - - private final String contextName; - private final String componentName; - - public ConfigBasedAdminChannel(@Value("${axon.axonserver.context}") String contextName, - @Value("${axon.axonserver.component-name}") String componentName){ - this.contextName = contextName; - this.componentName = componentName; + public ConfigBasedAdminChannel(AxonServerConnectionManager axonServerConnectionManager){ + this.axonServerConnectionManager = axonServerConnectionManager; } + private final AxonServerConnectionManager axonServerConnectionManager; @Bean public AdminChannel adminChannel() { - return AxonServerConnectionFactory.forClient(componentName) - .build() - .connect(contextName) - .adminChannel(); + return axonServerConnectionManager.getConnection().adminChannel(); } } diff --git a/reset-handler/src/main/java/io/axoniq/service/EventProcessorService.java b/reset-handler/src/main/java/io/axoniq/service/EventProcessorService.java index 7536f8cd..145dc694 100644 --- a/reset-handler/src/main/java/io/axoniq/service/EventProcessorService.java +++ b/reset-handler/src/main/java/io/axoniq/service/EventProcessorService.java @@ -9,5 +9,4 @@ public interface EventProcessorService { Mono reset(String processorName); -// Mono awaitTermination(String processorName); } diff --git a/reset-handler/src/main/java/io/axoniq/service/RestEventProcessorService.java b/reset-handler/src/main/java/io/axoniq/service/RestEventProcessorService.java index 0549afbe..9deb72d5 100644 --- a/reset-handler/src/main/java/io/axoniq/service/RestEventProcessorService.java +++ b/reset-handler/src/main/java/io/axoniq/service/RestEventProcessorService.java @@ -31,9 +31,11 @@ public class RestEventProcessorService implements EventProcessorService { public RestEventProcessorService(@Value("${axon.axonserver.context}") String context, @Value("${axon.axonserver.component-name}") String component, - TokenStore tokenStore, Configuration configuration) { + @Value("${axon.axonserver.http-url}") String httpUrl, + TokenStore tokenStore, + Configuration configuration) { this.configuration = configuration; - this.webClient = WebClient.create("http://localhost:8024"); + this.webClient = WebClient.create(httpUrl!=null?httpUrl:"http://localhost:8024"); this.contextSupplier = () -> context; this.componentSupplier = () -> component; this.tokenStoreIdSupplier = () -> tokenStore.retrieveStorageIdentifier().get(); @@ -100,7 +102,6 @@ public Mono reset(String processorName) { * @param processorName Name of the processor to wait for termination. * @return Returns a Mono that completes when processor is terminated. */ -// @terminatedOverride public Mono awaitTermination(String processorName) { return webClient.get() .uri("/v1/components/{component}/processors?context={context}", diff --git a/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java b/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java index 295b2e52..487202a3 100644 --- a/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java +++ b/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java @@ -22,7 +22,8 @@ public class ServerConnectorEventProcessorService implements EventProcessorServi private final Configuration configuration; private final AdminChannel adminChannel; - public ServerConnectorEventProcessorService(Configuration configuration, AdminChannel adminChannel) { + public ServerConnectorEventProcessorService(Configuration configuration, + AdminChannel adminChannel) { this.configuration = configuration; this.adminChannel = adminChannel; } diff --git a/reset-handler/src/main/resources/application.yml b/reset-handler/src/main/resources/application.yml index 03b21bbf..9f213495 100644 --- a/reset-handler/src/main/resources/application.yml +++ b/reset-handler/src/main/resources/application.yml @@ -2,4 +2,5 @@ axon: axonserver: context: "default" component-name: "component" + http-url: "http://127.0.0.1:8024" diff --git a/reset-handler/src/test/java/io/axoniq/ResetServiceIntegrationTest.java b/reset-handler/src/test/java/io/axoniq/ResetServiceIntegrationTest.java index 74787ee9..fcf0500b 100644 --- a/reset-handler/src/test/java/io/axoniq/ResetServiceIntegrationTest.java +++ b/reset-handler/src/test/java/io/axoniq/ResetServiceIntegrationTest.java @@ -12,6 +12,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.junit.jupiter.Container; @@ -40,15 +42,27 @@ public class ResetServiceIntegrationTest { private static final String EVENT_PROCESSOR_NAME="io.axoniq"; + private static final int HTTP_PORT = 8024; + private static final int GRPC_PORT = 8124; + @ClassRule @Container public static DockerComposeContainer environment = new DockerComposeContainer(new File("src/test/resources/compose-test.yml")) - .withExposedService("axonserver", 8024, Wait.forListeningPort()) - .withExposedService("axonserver", 8124, Wait.forListeningPort()) + .withExposedService("axonserver", HTTP_PORT, Wait.forListeningPort()) + .withExposedService("axonserver", GRPC_PORT, Wait.forListeningPort()) .waitingFor("axonserver", Wait.forLogMessage(".*Started AxonServer in .*",1)); + @DynamicPropertySource + public static void properties(DynamicPropertyRegistry registry) { + + int grpcPort = environment.getServicePort("axonserver", GRPC_PORT); + int httpPort = environment.getServicePort("axonserver", HTTP_PORT); + registry.add("axon.axonserver.servers", () -> "localhost:"+grpcPort); + registry.add("axon.axonserver.http-url", () -> "http://localhost:"+httpPort); + + } @BeforeEach void prepare(){ createEvents(); diff --git a/reset-handler/src/test/java/io/axoniq/controller/ResetE2ETest.java b/reset-handler/src/test/java/io/axoniq/controller/ResetE2ETest.java index ba9b3353..31e17287 100644 --- a/reset-handler/src/test/java/io/axoniq/controller/ResetE2ETest.java +++ b/reset-handler/src/test/java/io/axoniq/controller/ResetE2ETest.java @@ -8,6 +8,8 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; import org.springframework.test.web.reactive.server.WebTestClient; import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.containers.wait.strategy.Wait; @@ -24,8 +26,8 @@ @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class ResetE2ETest { - final static int PORT_A = 8024; - final static int PORT_B = 8124; + private static final int HTTP_PORT = 8024; + private static final int GRPC_PORT = 8124; @LocalServerPort private int port; @@ -42,10 +44,19 @@ public class ResetE2ETest { @Container public static DockerComposeContainer environment = new DockerComposeContainer(new File("src/test/resources/compose-test.yml")) - .withExposedService("axonserver", PORT_A, Wait.forListeningPort()) - .withExposedService("axonserver", PORT_B, Wait.forListeningPort()) + .withExposedService("axonserver", HTTP_PORT, Wait.forListeningPort()) + .withExposedService("axonserver", GRPC_PORT, Wait.forListeningPort()) .waitingFor("axonserver", Wait.forLogMessage(".*Started AxonServer in .*",1)); + @DynamicPropertySource + public static void properties(DynamicPropertyRegistry registry) { + int grpcPort = environment.getServicePort("axonserver", GRPC_PORT); + int httpPort = environment.getServicePort("axonserver", HTTP_PORT); + + registry.add("axon.axonserver.servers", () -> "localhost:"+grpcPort); + registry.add("axon.axonserver.http-url", () -> "http://localhost:"+httpPort); + + } @Test void testEventsAreCreated(){ prepareIntegrationTest(); @@ -84,6 +95,7 @@ void createEvents(){ .expectStatus() .isOk(); } + waitForAS(); verify(projection, times(10)).on(any()); } diff --git a/reset-handler/src/test/resources/application.yml b/reset-handler/src/test/resources/application.yml new file mode 100644 index 00000000..699177e6 --- /dev/null +++ b/reset-handler/src/test/resources/application.yml @@ -0,0 +1,5 @@ +axon: + axonserver: + context: "default" + component-name: "component" + http-url: "http://127.0.0.1:8024" diff --git a/reset-handler/src/test/resources/compose-test.yml b/reset-handler/src/test/resources/compose-test.yml index d94b229a..a172bbc6 100644 --- a/reset-handler/src/test/resources/compose-test.yml +++ b/reset-handler/src/test/resources/compose-test.yml @@ -1,8 +1,5 @@ version: '3.3' services: axonserver: - image: hackyaxonserver - hostname: axonserver - ports: - - '8024:8024' - - '8124:8124' \ No newline at end of file + image: 'axoniq/axonserver:4.6.0-dev' + hostname: axonserver \ No newline at end of file