From 03c02f23768d39044b0f0b1f604bc26b4f21e744 Mon Sep 17 00:00:00 2001
From: perrinsjason <59032519+perrinsjason@users.noreply.github.com>
Date: Thu, 19 Nov 2020 13:13:54 +0000
Subject: [PATCH 1/2] Added meta data send and extract example
---
.idea/jarRepositories.xml | 5 +
build.gradle | 2 +-
.../config/RSocketServerConfig.java | 14 +-
.../controller/GreetingController.java | 14 +-
.../controller/GreetingControllerITest.java | 138 ++++++++++++++++++
.../controller/GreetingControllerTest.java | 2 +-
6 files changed, 166 insertions(+), 9 deletions(-)
create mode 100644 rsocket-greeting/src/test/java/integration/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingControllerITest.java
diff --git a/.idea/jarRepositories.xml b/.idea/jarRepositories.xml
index 93af507..68a327b 100644
--- a/.idea/jarRepositories.xml
+++ b/.idea/jarRepositories.xml
@@ -21,5 +21,10 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index c03839d..2993db4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -49,7 +49,7 @@ subprojects {
dependencies{
annotationProcessor 'org.projectlombok:lombok'
- implementation "jpdemo:proto:1.5"
+ implementation "jpdemo:proto:1.8"
// https://mvnrepository.com/artifact/io.projectreactor/reactor-core
testCompile group: 'io.projectreactor', name: 'reactor-test', version: '3.3.10.RELEASE'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
diff --git a/rsocket-greeting/src/main/java/jpdemo/reactivegreeting/rsocketgreeting/config/RSocketServerConfig.java b/rsocket-greeting/src/main/java/jpdemo/reactivegreeting/rsocketgreeting/config/RSocketServerConfig.java
index fc45411..59b0677 100644
--- a/rsocket-greeting/src/main/java/jpdemo/reactivegreeting/rsocketgreeting/config/RSocketServerConfig.java
+++ b/rsocket-greeting/src/main/java/jpdemo/reactivegreeting/rsocketgreeting/config/RSocketServerConfig.java
@@ -4,7 +4,9 @@
import io.rsocket.RSocketFactory;
import io.rsocket.rpc.rsocket.RequestHandlingRSocket;
import io.rsocket.transport.netty.server.TcpServerTransport;
+import jpdemo.proto.context.v1.MessageContext;
import jpdemo.proto.greeting.v1.GreetingServiceServer;
+import jpdemo.proto.greeting.v1.GreetingSetup;
import jpdemo.reactivegreeting.service.greeting.DefaultGreetingService;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
@@ -15,9 +17,11 @@
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.http.codec.protobuf.ProtobufDecoder;
import org.springframework.http.codec.protobuf.ProtobufEncoder;
+import org.springframework.messaging.rsocket.DefaultMetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
+import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import reactor.core.publisher.Mono;
@@ -28,9 +32,14 @@ public class RSocketServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler(RSocketStrategies rSocketStrategies) {
+ var protoMimeType = MimeType.valueOf("application/x-protobuf");
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rSocketStrategies);
+ var metadataExtractor = new DefaultMetadataExtractor(new ProtobufDecoder());
+ metadataExtractor.metadataToExtract(protoMimeType, MessageContext.class,"messageContext");
+ // metadataExtractor.metadataToExtract(protoMimeType, GreetingSetup.class,"greetingSetup");
+ handler.setMetadataExtractor(metadataExtractor);
return handler;
}
@@ -40,13 +49,8 @@ public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoders(decoders -> {
decoders.add((new ProtobufDecoder()));
- decoders.add(new Jackson2JsonDecoder());
- decoders.add(new Jackson2CborDecoder());
}).encoders( encoders -> {
encoders.add(new ProtobufEncoder());
- encoders.add(new Jackson2CborEncoder());
- encoders.add(new Jackson2JsonEncoder());
- encoders.add(new ProtobufEncoder());
})
.build();
}
diff --git a/rsocket-greeting/src/main/java/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingController.java b/rsocket-greeting/src/main/java/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingController.java
index d16beeb..750a3b8 100644
--- a/rsocket-greeting/src/main/java/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingController.java
+++ b/rsocket-greeting/src/main/java/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingController.java
@@ -2,6 +2,7 @@
import com.google.protobuf.Empty;
+import jpdemo.proto.context.v1.MessageContext;
import jpdemo.proto.greeting.v1.GreetingRequest;
import jpdemo.proto.greeting.v1.GreetingResponse;
import jpdemo.proto.greeting.v1.GreetingSetup;
@@ -10,6 +11,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
+import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.stereotype.Controller;
@@ -20,6 +22,7 @@
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
+import java.util.Map;
@Controller
@@ -44,10 +47,17 @@ public void setup(GreetingSetup config){
.toLocalDateTime();
}
+ @MessageMapping({"greeting.request"})
+ public Mono greetingRequestResponse(@Headers Map metadata, GreetingRequest request){
- @MessageMapping("greeting.request")
- public Mono greetingRequestResponse(GreetingRequest request){
log.info("Connection start timestamp "+connectionInitiation);
+ var messageContext = (MessageContext)metadata.get("messageContext");
+ if(messageContext!=null){
+ log.info("Retrieved meta data "+messageContext);
+ }else{
+ log.info("No message context meta data supplied");
+ }
+
return greetingService.greeting(request,null);
}
diff --git a/rsocket-greeting/src/test/java/integration/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingControllerITest.java b/rsocket-greeting/src/test/java/integration/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingControllerITest.java
new file mode 100644
index 0000000..38db67b
--- /dev/null
+++ b/rsocket-greeting/src/test/java/integration/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingControllerITest.java
@@ -0,0 +1,138 @@
+package jpdemo.reactivegreeting.rsocketgreeting.controller;
+
+import com.google.protobuf.Timestamp;
+import io.rsocket.metadata.WellKnownMimeType;
+import jpdemo.proto.context.v1.MessageContext;
+import jpdemo.proto.greeting.v1.GreetingRequest;
+import jpdemo.proto.greeting.v1.GreetingResponse;
+import jpdemo.proto.greeting.v1.GreetingSetup;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.codec.cbor.Jackson2CborEncoder;
+import org.springframework.http.codec.json.Jackson2JsonEncoder;
+import org.springframework.http.codec.protobuf.ProtobufDecoder;
+import org.springframework.http.codec.protobuf.ProtobufEncoder;
+import org.springframework.messaging.rsocket.RSocketRequester;
+import org.springframework.util.MimeType;
+import org.springframework.util.MimeTypeUtils;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.*;
+
+@SpringBootTest
+class GreetingControllerITest {
+
+ private static Mono monoRequester;
+ private final static String NAME = "Jason";
+
+ @BeforeAll
+ public static void setupOnce(@Autowired RSocketRequester.Builder builder, @Value("${spring.rsocket.server.port}") Integer port) {
+ Instant time = Instant.now();
+ Timestamp timestamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond())
+ .setNanos(time.getNano()).build();
+
+ var config = GreetingSetup.newBuilder()
+ .setLocale(GreetingSetup.LocaleType.LOCALE_EN)
+ .setInitiated(timestamp)
+ .build();
+
+ monoRequester = builder
+ .setupRoute("greeting.setup")
+ .setupData(config)
+ .rsocketConnector(connector -> connector.reconnect(Retry.backoff(10, Duration.ofMillis(500))))
+ .dataMimeType(MimeTypeUtils.ALL)
+ .rsocketStrategies(rsBuilder -> {
+ rsBuilder.encoder(new ProtobufEncoder());
+ rsBuilder.decoder(new ProtobufDecoder());
+ })
+ .connectTcp("localhost", port);
+
+
+ }
+
+ @Test
+ void setup() {
+ }
+
+ @Test
+ void greetingRequestResponseMetaDataTest() {
+
+ var messageContext = MessageContext.newBuilder()
+ .setInitiated(Timestamp.newBuilder().build())
+ .setOrigin("Integration Test")
+ .setRequestId(UUID.randomUUID().toString())
+ .build();
+
+
+ /* var config = GreetingSetup.newBuilder()
+ .setLocale(GreetingSetup.LocaleType.LOCALE_DE)
+ .build();*/
+ var request = GreetingRequest.newBuilder().setName(NAME).build();
+ var response = monoRequester.flatMap(requester ->
+ requester
+ .route("greeting.request")
+ //.metadata(config, MimeType.valueOf("application/x-protobuf"))
+ .metadata(messageContext, MimeType.valueOf("application/x-protobuf"))
+ .data(request)
+ .retrieveMono(GreetingResponse.class));
+
+ // Verify that the response message contains the expected data (2)
+ StepVerifier
+ .create(response)
+ .consumeNextWith(greetingResponse -> {
+ assertThat(greetingResponse.getGreeting()).isEqualTo("Hi " + NAME);
+ })
+ .verifyComplete();
+
+ }
+
+ @Test
+ void greetingRequestResponseWithoutMetaData() {
+
+ var request = GreetingRequest.newBuilder().setName(NAME).build();
+ var response = monoRequester.flatMap(requester ->
+ requester
+ .route("greeting.request")
+ .data(request)
+ .retrieveMono(GreetingResponse.class));
+
+ // Verify that the response message contains the expected data (2)
+ StepVerifier
+ .create(response)
+ .consumeNextWith(greetingResponse -> {
+ assertThat(greetingResponse.getGreeting()).isEqualTo("Hi " + NAME);
+ })
+ .verifyComplete();
+
+ }
+
+ @Test
+ void testGreetingRequestResponse() {
+ }
+
+ @Test
+ void greetingChannel() {
+ }
+
+ @Test
+ void greetingLog() {
+ }
+
+ @Test
+ void randomGreetings() {
+ }
+
+ @Test
+ void randomGreeting() {
+ }
+}
\ No newline at end of file
diff --git a/rsocket-greeting/src/test/java/unit/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingControllerTest.java b/rsocket-greeting/src/test/java/unit/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingControllerTest.java
index fca7cf8..75b6fd5 100644
--- a/rsocket-greeting/src/test/java/unit/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingControllerTest.java
+++ b/rsocket-greeting/src/test/java/unit/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingControllerTest.java
@@ -75,7 +75,7 @@ void greetingRequestResponse() {
when(greetingService.greeting(request, null)).thenReturn(Mono.just(mockResponse));
// when
- var response = classUnderTest.greetingRequestResponse(request).block();
+ var response = classUnderTest.greetingRequestResponse(null,request).block();
//then
Mockito.verify(greetingService).greeting(request, null);
From 91f70a8e0223fe7d20f06fc89a428ba4df42414f Mon Sep 17 00:00:00 2001
From: perrinsjason <59032519+perrinsjason@users.noreply.github.com>
Date: Thu, 19 Nov 2020 13:20:37 +0000
Subject: [PATCH 2/2] Added meta data send and extract example
---
.../controller/GreetingController.java | 44 +++++++++----------
.../controller/GreetingControllerITest.java | 6 +--
2 files changed, 24 insertions(+), 26 deletions(-)
diff --git a/rsocket-greeting/src/main/java/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingController.java b/rsocket-greeting/src/main/java/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingController.java
index 750a3b8..bc91125 100644
--- a/rsocket-greeting/src/main/java/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingController.java
+++ b/rsocket-greeting/src/main/java/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingController.java
@@ -36,53 +36,53 @@ public class GreetingController {
private LocalDateTime connectionInitiation; // value is assigned during SETUP frame
@ConnectMapping("greeting.setup")
- public void setup(GreetingSetup config){
+ public void setup(GreetingSetup config) {
- Assert.notNull(config,"Config should not be null");
+ Assert.notNull(config, "Config should not be null");
greetingService.setup(config);
var tStamp = config.getInitiated();
connectionInitiation = Instant
- .ofEpochSecond(tStamp.getSeconds(),tStamp.getNanos())
+ .ofEpochSecond(tStamp.getSeconds(), tStamp.getNanos())
.atZone(ZoneId.systemDefault())
.toLocalDateTime();
}
@MessageMapping({"greeting.request"})
- public Mono greetingRequestResponse(@Headers Map metadata, GreetingRequest request){
+ public Mono greetingRequestResponse(@Headers Map metadata, GreetingRequest request) {
- log.info("Connection start timestamp "+connectionInitiation);
- var messageContext = (MessageContext)metadata.get("messageContext");
- if(messageContext!=null){
- log.info("Retrieved meta data "+messageContext);
- }else{
+ log.info("Connection start timestamp " + connectionInitiation);
+ var messageContext = metadata != null ? (MessageContext) metadata.get("messageContext") : null;
+ if (messageContext != null) {
+ log.info("Retrieved meta data " + messageContext);
+ } else {
log.info("No message context meta data supplied");
}
- return greetingService.greeting(request,null);
+ return greetingService.greeting(request, null);
}
@MessageMapping("greeting.channel")
- public Flux greetingChannel(Publisher requests){
- log.info("Connection start timestamp "+connectionInitiation);
- return greetingService.greetings(requests,null);
+ public Flux greetingChannel(Publisher requests) {
+ log.info("Connection start timestamp " + connectionInitiation);
+ return greetingService.greetings(requests, null);
}
@MessageMapping("greeting.fireforget")
- public Mono greetingLog(GreetingRequest request){
- log.info("Connection start timestamp "+connectionInitiation);
- return greetingService.logGreeting(request,null);
+ public Mono greetingLog(GreetingRequest request) {
+ log.info("Connection start timestamp " + connectionInitiation);
+ return greetingService.logGreeting(request, null);
}
@MessageMapping("greeting.stream")
- public Flux randomGreetings(){
- log.info("Connection start timestamp "+connectionInitiation);
- return randomGreetingService.randomGreetings(null,null);
+ public Flux randomGreetings() {
+ log.info("Connection start timestamp " + connectionInitiation);
+ return randomGreetingService.randomGreetings(null, null);
}
@MessageMapping("greeting.random")
- public Mono randomGreeting(){
- log.info("Connection start timestamp "+connectionInitiation);
- return randomGreetingService.randomGreeting(null,null);
+ public Mono randomGreeting() {
+ log.info("Connection start timestamp " + connectionInitiation);
+ return randomGreetingService.randomGreeting(null, null);
}
}
diff --git a/rsocket-greeting/src/test/java/integration/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingControllerITest.java b/rsocket-greeting/src/test/java/integration/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingControllerITest.java
index 38db67b..3745259 100644
--- a/rsocket-greeting/src/test/java/integration/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingControllerITest.java
+++ b/rsocket-greeting/src/test/java/integration/jpdemo/reactivegreeting/rsocketgreeting/controller/GreetingControllerITest.java
@@ -99,7 +99,7 @@ void greetingRequestResponseMetaDataTest() {
@Test
void greetingRequestResponseWithoutMetaData() {
- var request = GreetingRequest.newBuilder().setName(NAME).build();
+ var request = GreetingRequest.newBuilder().setName(NAME).build();
var response = monoRequester.flatMap(requester ->
requester
.route("greeting.request")
@@ -116,12 +116,10 @@ void greetingRequestResponseWithoutMetaData() {
}
- @Test
- void testGreetingRequestResponse() {
- }
@Test
void greetingChannel() {
+ var route = "greeting.channel";
}
@Test