Skip to content

Commit

Permalink
Merge pull request #1 from protegeproject/first-releasable-fixes
Browse files Browse the repository at this point in the history
replaced pulsar with RabbitMQ. fixed tests
  • Loading branch information
matthewhorridge authored Mar 20, 2024
2 parents 0b672c6 + 329e591 commit 19361c9
Show file tree
Hide file tree
Showing 41 changed files with 919 additions and 1,771 deletions.
36 changes: 19 additions & 17 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,6 @@
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>3.1.0</version>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>3.1.0</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
Expand Down Expand Up @@ -109,18 +97,32 @@

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.18.1</version>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.0.8</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
<version>1.18.1</version>
<artifactId>rabbitmq</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>
<version>1.17.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<profiles>
<profile>
<id>release</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
*/
public interface CommandExecutor<Q extends Request<R>, R extends Response> {


CompletableFuture<R> execute(Q request, ExecutionContext executionContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Stanford Center for Biomedical Informatics Research
* 2021-08-06
*/
@WebProtegeHandler
public interface CommandHandler<Q extends Request<R>, R extends Response> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,5 @@
* 2022-01-31
*/
public interface EventDispatcher {

String WEBPROTEGE_EVENTS_CHANNEL_NAME = "webprotege.events";

void dispatchEvent(Event event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@

import edu.stanford.protege.webprotege.common.UserId;

import javax.annotation.Nonnull;
import java.util.Objects;

/**
* Matthew Horridge
* Stanford Center for Biomedical Informatics Research
* 2021-08-19
*/
public record ExecutionContext(UserId userId, String jwt) {
public record ExecutionContext(@Nonnull UserId userId, @Nonnull String jwt) {

public ExecutionContext() {
this(UserId.getGuest(), "");
}

public ExecutionContext {
Objects.requireNonNull(userId, "userId cannot be null");
Objects.requireNonNull(jwt, "jwt cannot be null");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ public class Headers {
public static final String PROJECT_ID = PREFIX + "projectId";

public static final String ACCESS_TOKEN = PREFIX + "accessToken";

public static final String METHOD = PREFIX + "methodName";

public static final String CHANNEL = PREFIX + "channel";
}
Original file line number Diff line number Diff line change
@@ -1,33 +1,19 @@
package edu.stanford.protege.webprotege.ipc;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Caffeine;
import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusRequest;
import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusResponse;
import edu.stanford.protege.webprotege.common.Request;
import edu.stanford.protege.webprotege.common.Response;
import edu.stanford.protege.webprotege.common.WebProtegeCommonConfiguration;
import edu.stanford.protege.webprotege.ipc.pulsar.*;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import edu.stanford.protege.webprotege.ipc.impl.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Scope;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
@Import(WebProtegeCommonConfiguration.class)
Expand All @@ -36,15 +22,6 @@ public class WebProtegeIpcApplication {

private static final Logger logger = LoggerFactory.getLogger(WebProtegeIpcApplication.class);

@Value("${webprotege.pulsar.tenant}")
private String tenant;

@Value("${webprotege.pulsar.serviceHttpUrl}")
private String serviceHttpUrl;

@Value("${webprotege.pulsar.serviceUrl}")
private String pulsarServiceUrl;

public static void main(String[] args) {
SpringApplication.run(WebProtegeIpcApplication.class, args);
}
Expand All @@ -55,109 +32,14 @@ MessageChannelMapper messageChannelMapper(@Value("${spring.application.name}") S
}

@Bean
EventDispatcher eventDispatcher(@Value("${spring.application.name}") String applicationName,
PulsarProducersManager pulsarProducersManager, ObjectMapper objectMapper) {
return new PulsarEventDispatcher(applicationName, pulsarProducersManager, objectMapper, tenant);
EventDispatcher eventDispatcher(ObjectMapper objectMapper, RabbitTemplate eventRabbitTemplate) {
return new RabbitMQEventDispatcher(objectMapper,eventRabbitTemplate);
}

@Bean
PulsarCommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> executorForGetAuthorizationStatusRequest() {
return new PulsarCommandExecutor<>(GetAuthorizationStatusResponse.class);
}

@Bean
PulsarAdmin pulsarAdmin() {
try {
var admin = new PulsarAdminBuilderImpl()
.serviceHttpUrl(serviceHttpUrl)
.build();
createTenantIfNecessary(admin);
createNamespaceIfNecessary(admin, PulsarNamespaces.COMMAND_REQUESTS);
createNamespaceIfNecessary(admin, PulsarNamespaces.COMMAND_REPLIES);
createNamespaceIfNecessary(admin, PulsarNamespaces.EVENTS);
return admin;
} catch (PulsarClientException | PulsarAdminException e) {
throw new RuntimeException(e);
}
}

private void createTenantIfNecessary(PulsarAdmin admin) throws PulsarAdminException {
if(!admin.tenants().getTenants().contains(tenant)) {
logger.info("Creating Pulsar tenant: {}", tenant);
admin.tenants().createTenant(tenant,
new TenantInfoImpl(Set.of("admin"), Set.of("standalone")));
}
CommandExecutorImpl<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> executorForGetAuthorizationStatusRequest() {
return new CommandExecutorImpl<>(GetAuthorizationStatusResponse.class);
}

private void createNamespaceIfNecessary(PulsarAdmin admin, String namespace) throws PulsarAdminException {
var namespaceName = tenant + "/" + namespace;
if(!admin.namespaces().getNamespaces(tenant).contains(namespaceName)) {
logger.info("Creating Pulsar namespace: {}", namespaceName);
admin.namespaces().createNamespace(namespaceName);
}
}

@Bean
PulsarClient pulsarClient() throws PulsarClientException {
return PulsarClient.builder()
.connectionTimeout(3, TimeUnit.MINUTES)
.serviceUrl(pulsarServiceUrl).build();
}

@Bean
PulsarProducersManager pulsarProducersManager(PulsarClient pulsarClient,
@Value("${spring.application.name}") String applicationName) {
return new PulsarProducersManager(pulsarClient, applicationName);
}

@Bean
PulsarCommandHandlerWrapperFactory pulsarCommandHandlerWrapperFactory(@Value("${spring.application.name}") String applicationName,
ObjectMapper objectMapper,
PulsarProducersManager producersManager,
CommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> authorizationStatusExecutor,
PulsarClient pulsarClient) {

return new PulsarCommandHandlerWrapperFactory() {
@Override
public <Q extends Request<R>, R extends Response> PulsarCommandHandlerWrapper<Q, R> create(CommandHandler<Q, R> handler) {
return pulsarCommandHandlerWrapper(handler,
applicationName,
pulsarClient,
objectMapper,
producersManager,
authorizationStatusExecutor);
}
};
}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public <Q extends Request<R>, R extends Response> PulsarCommandHandlerWrapper<Q, R> pulsarCommandHandlerWrapper(
CommandHandler<Q, R> handler,
String applicationName,
PulsarClient pulsarClient,
ObjectMapper objectMapper,
PulsarProducersManager producersManager,
CommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> authorizationStatusExecutor) {
return new PulsarCommandHandlerWrapper<>(applicationName,
tenant,
pulsarClient,
handler,
objectMapper,
producersManager,
authorizationStatusExecutor);
}

@Bean
Caffeine<Object, Object> pulsarProducerCaffeineConfig() {
return Caffeine.newBuilder()
.expireAfterAccess(10, TimeUnit.MINUTES);
}

@Bean
public CacheManager cacheManager(Caffeine<Object, Object> caffeine) {
var caffeineCacheManager = new CaffeineCacheManager();
caffeineCacheManager.setCaffeine(caffeine);
return caffeineCacheManager;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package edu.stanford.protege.webprotege.ipc.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.stanford.protege.webprotege.common.Request;
import edu.stanford.protege.webprotege.common.Response;
import edu.stanford.protege.webprotege.ipc.CommandExecutionException;
import edu.stanford.protege.webprotege.ipc.CommandExecutor;
import edu.stanford.protege.webprotege.ipc.ExecutionContext;
import edu.stanford.protege.webprotege.ipc.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;


/**
* Matthew Horridge
* Stanford Center for Biomedical Informatics Research
* 2022-01-31
* <p>
* A {@link CommandExecutorImpl} is used to execute a specific command that has a specific type of request and
* a specific type of response. That is, a given command executor instance only handles requests for single channel.
*/
public class CommandExecutorImpl<Q extends Request<R>, R extends Response> implements CommandExecutor<Q, R> {

private static final Logger logger = LoggerFactory.getLogger(CommandExecutorImpl.class);

private final Class<R> responseClass;


private ObjectMapper objectMapper;

private AsyncRabbitTemplate asyncRabbitTemplate;

public CommandExecutorImpl(Class<R> responseClass) {
this.responseClass = responseClass;
}

@Override
public CompletableFuture<R> execute(Q request, ExecutionContext executionContext) {
try {
var json = objectMapper.writeValueAsBytes(request);
org.springframework.amqp.core.Message rabbitRequest = new org.springframework.amqp.core.Message(json);
rabbitRequest.getMessageProperties().getHeaders().put(Headers.ACCESS_TOKEN, executionContext.jwt());
rabbitRequest.getMessageProperties().getHeaders().put(Headers.USER_ID, executionContext.userId());
rabbitRequest.getMessageProperties().getHeaders().put(Headers.METHOD, request.getChannel());

return asyncRabbitTemplate.sendAndReceive(request.getChannel(), rabbitRequest).thenApply(this::handleResponse);
} catch (Exception e) {
logger.error("Error ", e);
throw new RuntimeException(e);
}
}


private R handleResponse(Message rabbitResponse) {
var exception = (String) rabbitResponse.getMessageProperties().getHeaders().get(Headers.ERROR);

if(exception != null) {
try {
logger.error("Found error on response " + exception);
throw objectMapper.readValue(exception, CommandExecutionException.class);
} catch (JsonProcessingException e) {
logger.error("Error ", e);
throw new RuntimeException(e);
}
} else {
try {
return objectMapper.readValue(rabbitResponse.getBody(), responseClass);
} catch (IOException e) {
logger.error("Error ", e);

throw new RuntimeException(e);
}
}
}

@Autowired
@Qualifier("asyncRabbitTemplate")
@Lazy
public void setAsyncRabbitTemplate(AsyncRabbitTemplate asyncRabbitTemplate) {
this.asyncRabbitTemplate = asyncRabbitTemplate;
}

@Autowired
public void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
}
Loading

0 comments on commit 19361c9

Please sign in to comment.