Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added meta data send and extract example #29

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .idea/jarRepositories.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ subprojects {

dependencies{
annotationProcessor 'org.projectlombok:lombok'
implementation "jpdemo:proto:1.5"
implementation "jpdemo:proto:1.8"
// https://mvnrepository.com/artifact/io.projectreactor/reactor-core
testCompile group: 'io.projectreactor', name: 'reactor-test', version: '3.3.10.RELEASE'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import io.rsocket.RSocketFactory;
import io.rsocket.rpc.rsocket.RequestHandlingRSocket;
import io.rsocket.transport.netty.server.TcpServerTransport;
import jpdemo.proto.context.v1.MessageContext;
import jpdemo.proto.greeting.v1.GreetingServiceServer;
import jpdemo.proto.greeting.v1.GreetingSetup;
import jpdemo.reactivegreeting.service.greeting.DefaultGreetingService;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
Expand All @@ -15,9 +17,11 @@
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.http.codec.protobuf.ProtobufDecoder;
import org.springframework.http.codec.protobuf.ProtobufEncoder;
import org.springframework.messaging.rsocket.DefaultMetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import reactor.core.publisher.Mono;

Expand All @@ -28,9 +32,14 @@ public class RSocketServerConfig {

@Bean
public RSocketMessageHandler rsocketMessageHandler(RSocketStrategies rSocketStrategies) {
var protoMimeType = MimeType.valueOf("application/x-protobuf");
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rSocketStrategies);
var metadataExtractor = new DefaultMetadataExtractor(new ProtobufDecoder());
metadataExtractor.metadataToExtract(protoMimeType, MessageContext.class,"messageContext");
// metadataExtractor.metadataToExtract(protoMimeType, GreetingSetup.class,"greetingSetup");

handler.setMetadataExtractor(metadataExtractor);
return handler;
}

Expand All @@ -40,13 +49,8 @@ public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoders(decoders -> {
decoders.add((new ProtobufDecoder()));
decoders.add(new Jackson2JsonDecoder());
decoders.add(new Jackson2CborDecoder());
}).encoders( encoders -> {
encoders.add(new ProtobufEncoder());
encoders.add(new Jackson2CborEncoder());
encoders.add(new Jackson2JsonEncoder());
encoders.add(new ProtobufEncoder());
})
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import com.google.protobuf.Empty;
import jpdemo.proto.context.v1.MessageContext;
import jpdemo.proto.greeting.v1.GreetingRequest;
import jpdemo.proto.greeting.v1.GreetingResponse;
import jpdemo.proto.greeting.v1.GreetingSetup;
Expand All @@ -10,6 +11,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.stereotype.Controller;
Expand All @@ -20,6 +22,7 @@
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;


@Controller
Expand All @@ -33,46 +36,53 @@ public class GreetingController {
private LocalDateTime connectionInitiation; // value is assigned during SETUP frame

@ConnectMapping("greeting.setup")
public void setup(GreetingSetup config){
public void setup(GreetingSetup config) {

Assert.notNull(config,"Config should not be null");
Assert.notNull(config, "Config should not be null");
greetingService.setup(config);
var tStamp = config.getInitiated();
connectionInitiation = Instant
.ofEpochSecond(tStamp.getSeconds(),tStamp.getNanos())
.ofEpochSecond(tStamp.getSeconds(), tStamp.getNanos())
.atZone(ZoneId.systemDefault())
.toLocalDateTime();
}

@MessageMapping({"greeting.request"})
public Mono<GreetingResponse> greetingRequestResponse(@Headers Map<String, Object> metadata, GreetingRequest request) {

@MessageMapping("greeting.request")
public Mono<GreetingResponse> greetingRequestResponse(GreetingRequest request){
log.info("Connection start timestamp "+connectionInitiation);
return greetingService.greeting(request,null);
log.info("Connection start timestamp " + connectionInitiation);
var messageContext = metadata != null ? (MessageContext) metadata.get("messageContext") : null;
if (messageContext != null) {
log.info("Retrieved meta data " + messageContext);
} else {
log.info("No message context meta data supplied");
}

return greetingService.greeting(request, null);
}

@MessageMapping("greeting.channel")
public Flux<GreetingResponse> greetingChannel(Publisher<GreetingRequest> requests){
log.info("Connection start timestamp "+connectionInitiation);
return greetingService.greetings(requests,null);
public Flux<GreetingResponse> greetingChannel(Publisher<GreetingRequest> requests) {
log.info("Connection start timestamp " + connectionInitiation);
return greetingService.greetings(requests, null);
}

@MessageMapping("greeting.fireforget")
public Mono<Empty> greetingLog(GreetingRequest request){
log.info("Connection start timestamp "+connectionInitiation);
return greetingService.logGreeting(request,null);
public Mono<Empty> greetingLog(GreetingRequest request) {
log.info("Connection start timestamp " + connectionInitiation);
return greetingService.logGreeting(request, null);
}

@MessageMapping("greeting.stream")
public Flux<GreetingResponse> randomGreetings(){
log.info("Connection start timestamp "+connectionInitiation);
return randomGreetingService.randomGreetings(null,null);
public Flux<GreetingResponse> randomGreetings() {
log.info("Connection start timestamp " + connectionInitiation);
return randomGreetingService.randomGreetings(null, null);
}

@MessageMapping("greeting.random")
public Mono<GreetingResponse> randomGreeting(){
log.info("Connection start timestamp "+connectionInitiation);
return randomGreetingService.randomGreeting(null,null);
public Mono<GreetingResponse> randomGreeting() {
log.info("Connection start timestamp " + connectionInitiation);
return randomGreetingService.randomGreeting(null, null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package jpdemo.reactivegreeting.rsocketgreeting.controller;

import com.google.protobuf.Timestamp;
import io.rsocket.metadata.WellKnownMimeType;
import jpdemo.proto.context.v1.MessageContext;
import jpdemo.proto.greeting.v1.GreetingRequest;
import jpdemo.proto.greeting.v1.GreetingResponse;
import jpdemo.proto.greeting.v1.GreetingSetup;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.http.codec.protobuf.ProtobufDecoder;
import org.springframework.http.codec.protobuf.ProtobufEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;

@SpringBootTest
class GreetingControllerITest {

private static Mono<RSocketRequester> monoRequester;
private final static String NAME = "Jason";

@BeforeAll
public static void setupOnce(@Autowired RSocketRequester.Builder builder, @Value("${spring.rsocket.server.port}") Integer port) {
Instant time = Instant.now();
Timestamp timestamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond())
.setNanos(time.getNano()).build();

var config = GreetingSetup.newBuilder()
.setLocale(GreetingSetup.LocaleType.LOCALE_EN)
.setInitiated(timestamp)
.build();

monoRequester = builder
.setupRoute("greeting.setup")
.setupData(config)
.rsocketConnector(connector -> connector.reconnect(Retry.backoff(10, Duration.ofMillis(500))))
.dataMimeType(MimeTypeUtils.ALL)
.rsocketStrategies(rsBuilder -> {
rsBuilder.encoder(new ProtobufEncoder());
rsBuilder.decoder(new ProtobufDecoder());
})
.connectTcp("localhost", port);


}

@Test
void setup() {
}

@Test
void greetingRequestResponseMetaDataTest() {

var messageContext = MessageContext.newBuilder()
.setInitiated(Timestamp.newBuilder().build())
.setOrigin("Integration Test")
.setRequestId(UUID.randomUUID().toString())
.build();


/* var config = GreetingSetup.newBuilder()
.setLocale(GreetingSetup.LocaleType.LOCALE_DE)
.build();*/
var request = GreetingRequest.newBuilder().setName(NAME).build();
var response = monoRequester.flatMap(requester ->
requester
.route("greeting.request")
//.metadata(config, MimeType.valueOf("application/x-protobuf"))
.metadata(messageContext, MimeType.valueOf("application/x-protobuf"))
.data(request)
.retrieveMono(GreetingResponse.class));

// Verify that the response message contains the expected data (2)
StepVerifier
.create(response)
.consumeNextWith(greetingResponse -> {
assertThat(greetingResponse.getGreeting()).isEqualTo("Hi " + NAME);
})
.verifyComplete();

}

@Test
void greetingRequestResponseWithoutMetaData() {

var request = GreetingRequest.newBuilder().setName(NAME).build();
var response = monoRequester.flatMap(requester ->
requester
.route("greeting.request")
.data(request)
.retrieveMono(GreetingResponse.class));

// Verify that the response message contains the expected data (2)
StepVerifier
.create(response)
.consumeNextWith(greetingResponse -> {
assertThat(greetingResponse.getGreeting()).isEqualTo("Hi " + NAME);
})
.verifyComplete();

}


@Test
void greetingChannel() {
var route = "greeting.channel";
}

@Test
void greetingLog() {
}

@Test
void randomGreetings() {
}

@Test
void randomGreeting() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void greetingRequestResponse() {
when(greetingService.greeting(request, null)).thenReturn(Mono.just(mockResponse));

// when
var response = classUnderTest.greetingRequestResponse(request).block();
var response = classUnderTest.greetingRequestResponse(null,request).block();

//then
Mockito.verify(greetingService).greeting(request, null);
Expand Down