Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event processor reset with admin api #193

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions reset-handler/README.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
# Reset Handler
This demo is meant to show how to reset a tracking token.

There are 2 ways of doing that, using Axon Server API or just Axon Framework.
There are 3 ways of doing that, using Axon Server API, just Axon Framework or the Axon Server Connector.
A more detailed description can be found in the [reference-guide](https://docs.axoniq.io/reference-guide/axon-server/administration/reset-event-processor-token).

### Using Axon Server API
### Using Axon Server REST API

When in a distributed environment, one can have several applications connected to Axon Server while sharing the same token store.
To be able to reset a token in this scenario, we have to ask Axon Server to pause every known instance of a given Processor Name to be able to reset it and start it back again.

> We recommend checking the [ServerEventProcessorRestController.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/server/ServerEventProcessorRestController.java) for more information.
> We recommend checking the [RestEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/RestEventProcessorService.java) for more information.

### Using Axon Framework

Axon Framework provides another easy way to do it using the `StreamingEventProcessor` methods, namely `shutDown`, `resetTokens` and `start`. When doing it through Axon Framework, the application instance doing the operation should be the one having the claim of the token.

> We recommend checking the [FrameworkEventProcessorRestController.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/framework/FrameworkEventProcessorRestController.java) for more information.

> We recommend checking the [FrameworkEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/FrameworkEventProcessorService.java) for more information.

## Using the Axon Server Connector
The Axon Server Connector provides methods to pause and restart an event processor.
This functionality can be combined to reset the event processor as shown in the other examples.

> We recommend checking the [ServerConnectorEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java) for more information.

### Running the application
This is a Spring boot application, as such it can be ran as any other standard Spring Boot application. It has a simple `/event` endpoint where you can create new empty events. For resetting the token, it provides 2 reset endpoints:
- `/server/reset/{processorName}`
Expand All @@ -27,6 +34,9 @@ Since Axon Server is a requirement for this sample, a `docker-compose` file is p

Also, if you are on Intellij, a `requests.http` file is provided to make it easy to call the endpoints.

Most of the logic for the Axon Server reset is on the [EventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/server/EventProcessorService.java) class and the added javadoc should be enough to explain what it does.
Most of the logic for the Axon Server reset via REST is on the [RestEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/RestEventProcessorService.java) class and the added javadoc should be enough to explain what it does.
In the same way, details for the Axon Server reset via the Server Connector can be found in [ServerConnectorEventProcessorService.java](https://github.com/AxonIQ/code-samples/blob/master/reset-handler/src/main/java/io/axoniq/service/ServerConnectorEventProcessorService.java).

For the Axon Framework version, we recommend checking the official [StreamingEventProcessor.java](https://github.com/AxonFramework/AxonFramework/blob/master/messaging/src/main/java/org/axonframework/eventhandling/StreamingEventProcessor.java) documentation.

A general introduction, regardless of the method used, can be found in the [reference-guide](https://docs.axoniq.io/reference-guide/axon-server/administration/reset-event-processor-token).
2 changes: 1 addition & 1 deletion reset-handler/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3.3'
services:
axonserver:
image: axoniq/axonserver
image: 'axoniq/axonserver:4.6.0-dev'
hostname: axonserver
ports:
- '8024:8024'
Expand Down
32 changes: 32 additions & 0 deletions reset-handler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,47 @@
<version>0.0.1-SNAPSHOT</version>
</parent>



<dependencies>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.5.14</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not the 4.8 form the bom?

</dependency>
<dependency>
<groupId>io.axoniq</groupId>
<artifactId>axonserver-connector-java</artifactId>
<version>4.6.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.17.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.17.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.17</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.axoniq.config;

import io.axoniq.axonserver.connector.admin.AdminChannel;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;


/*
Creates an admin channel from the configuration. used to simplify testing in other components.
*/
@Component
public class ConfigBasedAdminChannel {
public ConfigBasedAdminChannel(AxonServerConnectionManager axonServerConnectionManager){
this.axonServerConnectionManager = axonServerConnectionManager;
}
private final AxonServerConnectionManager axonServerConnectionManager;

@Bean
public AdminChannel adminChannel() {
return axonServerConnectionManager.getConnection().adminChannel();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.axoniq.config;

import io.axoniq.service.FrameworkEventProcessorService;
import io.axoniq.service.ServerConnectorEventProcessorService;
import io.axoniq.service.EventProcessorService;
import io.axoniq.service.RestEventProcessorService;
import org.springframework.core.convert.converter.Converter;
import org.springframework.stereotype.Component;

/*
Extract the service using the specified method to reset the token.
This allows for cleaner signatures in the controllers and the services
*/
@Component
public class StringToEventProcessorServiceConverter implements Converter<String, EventProcessorService> {

final RestEventProcessorService restEventProcessorService;
final FrameworkEventProcessorService frameworkEventProcessorService;
final ServerConnectorEventProcessorService serverConnectorEventProcessorService;

public StringToEventProcessorServiceConverter(RestEventProcessorService restEventProcessorService, FrameworkEventProcessorService frameworkEventProcessorService, ServerConnectorEventProcessorService serverConnectorEventProcessorService) {
this.restEventProcessorService = restEventProcessorService;
this.frameworkEventProcessorService = frameworkEventProcessorService;
this.serverConnectorEventProcessorService = serverConnectorEventProcessorService;
}

/*
Match the passed string against a set of known constants.
This is not elegant but does get its job done for the sample.
*/
@Override
public EventProcessorService convert(String from) {
switch (from){
case "server": return serverConnectorEventProcessorService;
case "rest": return restEventProcessorService;
case "grpc": throw new IllegalArgumentException();
case "framework": return frameworkEventProcessorService;
default: throw new IllegalArgumentException();
}
}
}
24 changes: 24 additions & 0 deletions reset-handler/src/main/java/io/axoniq/config/WebConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.axoniq.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.format.FormatterRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;

/**
* Add a Converter from String to EventProcessorService.
* We can for example map `framework` to a FrameworkEventProcessorService
*/
@Configuration
public class WebConfig implements WebFluxConfigurer {

final StringToEventProcessorServiceConverter converter;

public WebConfig(StringToEventProcessorServiceConverter converter) {
this.converter = converter;
}

@Override
public void addFormatters(FormatterRegistry registry) {
registry.addConverter(converter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.axoniq.controller;

import io.axoniq.service.EventProcessorService;
import org.axonframework.config.Configuration;
import org.springframework.util.Assert;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

/**
* Uses the EventProcessorService provided to it based on the supplied method
*/
@RestController
public class EventProcessorRestController {

@GetMapping("{method}/start/{processorName}")
public Mono<Void> start(@PathVariable("method") EventProcessorService service, @PathVariable String processorName) {
Assert.hasText(processorName, "Processing Group is mandatory and can't be empty!");
return service.start(processorName);
}

@GetMapping("{method}/pause/{processorName}")
public Mono<Void> pause(@PathVariable("method") EventProcessorService service, @PathVariable String processorName) {
Assert.hasText(processorName, "Processing Group is mandatory and can't be empty!");
return service.pause(processorName);
}

@GetMapping("{method}/reset/{processorName}")
public Mono<Void> reset(@PathVariable("method") EventProcessorService service, @PathVariable String processorName) {
Assert.hasText(processorName, "Processing Group is mandatory and can't be empty!");
return service.reset(processorName);
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package io.axoniq;
package io.axoniq.controller;

import org.axonframework.eventhandling.gateway.EventGateway;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/*
Creates empty events to allow you to see the effects of a reset
*/
@RestController
public class EventRestController {

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.axoniq.service;

import reactor.core.publisher.Mono;

public interface EventProcessorService {
Mono<Void> pause(String processorName);

Mono<Void> start(String processorName);

Mono<Void> reset(String processorName);

}
Loading