From 55507f913c34d05f7a69dda9e27c859f91f15e0e Mon Sep 17 00:00:00 2001 From: dtuchs Date: Sat, 2 Mar 2024 22:16:31 +0600 Subject: [PATCH 1/2] Improve AllureGrpc for non-blocking (streaming) stubs --- README.md | 45 ++- allure-grpc/build.gradle.kts | 5 +- .../io/qameta/allure/grpc/AllureGrpc.java | 348 ++++++++++++++---- .../allure/grpc/GrpcRequestAttachment.java | 3 + .../allure/grpc/GrpcResponseAttachment.java | 3 + .../io/qameta/allure/grpc/AllureGrpcTest.java | 233 ++++++++++-- allure-grpc/src/test/proto/api.proto | 2 + settings.gradle.kts | 2 +- 8 files changed, 523 insertions(+), 118 deletions(-) diff --git a/README.md b/README.md index e452787b3..a87375802 100644 --- a/README.md +++ b/README.md @@ -126,7 +126,7 @@ You can specify custom templates, which should be placed in src/main/resources/t ## gRPC -Interceptor for gRPC stubs, that generates attachment for allure. +Interceptor for gRPC stubs (blocking and non-blocking), that generates attachment for allure. ```xml @@ -136,20 +136,51 @@ Interceptor for gRPC stubs, that generates attachment for allure. ``` -Usage example: +Usage example with blocking stub: +``` +ServiceGrpc.newBlockingStub(channel) + .withInterceptors(new AllureGrpc()); + +@Test +void test() { + blockingStub.unaryCall(...); +} ``` -.newBlockingStub(channel).withInterceptors(new AllureGrpc()); +Similarly with non-blocking stub: +``` +ServiceGrpc.newStub(channel) + .withInterceptors(new AllureGrpc()); + +@Test +void test() { + stub.streamingCall(...); +} + +@AfterEach +void awaitAllureForNonBlockingCalls() { + AllureGrpc.await(); +} +``` +With non-blocking Stubs you have to use after-each hook for tests because non-blocking stubs +are async, and allure-report might not be created after non-blocking Stubs call. +You can use `@AfterEach` annotation for JUnit 5, `@AfterTest` for TestNG and `@After` for JUnit 4. +Of course, you can implement Extension or Rule to achieve that. + +Also, you can register interceptor for `Channel` and use it for all kinds of gRPC stubs, +but you still need to use `AllureGrpc.await()` after non-blocking Stubs calls. +``` +final Channel channel = ManagedChannelBuilder.forAddress("localhost", 8092) + .intercept(new AllureGrpc()) + .build(); ``` You can enable interception of response metadata (disabled by default) ``` -.withInterceptors(new AllureGrpc() - .interceptResponseMetadata(true)) +.withInterceptors(new AllureGrpc().interceptResponseMetadata(true)) ``` By default, a step will be marked as failed in case that response contains any statuses except 0(OK). You can change this behavior, for example, for negative scenarios ``` -.withInterceptors(new AllureGrpc() - .markStepFailedOnNonZeroCode(false)) +.withInterceptors(new AllureGrpc().markStepFailedOnNonZeroCode(false)) ``` You can specify custom templates, which should be placed in src/main/resources/tpl folder: ``` diff --git a/allure-grpc/build.gradle.kts b/allure-grpc/build.gradle.kts index 1205a6caf..38fc15e11 100644 --- a/allure-grpc/build.gradle.kts +++ b/allure-grpc/build.gradle.kts @@ -8,14 +8,15 @@ description = "Allure gRPC Integration" val agent: Configuration by configurations.creating -val grpcVersion = "1.57.2" -val protobufVersion = "3.22.4" +val grpcVersion = "1.62.2" +val protobufVersion = "3.25.3" dependencies { agent("org.aspectj:aspectjweaver") api(project(":allure-attachments")) implementation("io.grpc:grpc-core:$grpcVersion") implementation("com.google.protobuf:protobuf-java-util:$protobufVersion") + internal("com.fasterxml.jackson.core:jackson-databind") testImplementation("io.grpc:grpc-stub:$grpcVersion") testImplementation("io.grpc:grpc-protobuf:$grpcVersion") diff --git a/allure-grpc/src/main/java/io/qameta/allure/grpc/AllureGrpc.java b/allure-grpc/src/main/java/io/qameta/allure/grpc/AllureGrpc.java index 1217dcf92..1cd8adf38 100644 --- a/allure-grpc/src/main/java/io/qameta/allure/grpc/AllureGrpc.java +++ b/allure-grpc/src/main/java/io/qameta/allure/grpc/AllureGrpc.java @@ -15,6 +15,11 @@ */ package io.qameta.allure.grpc; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.util.DefaultIndenter; +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageOrBuilder; import com.google.protobuf.util.JsonFormat; @@ -27,6 +32,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.qameta.allure.Allure; +import io.qameta.allure.AllureLifecycle; import io.qameta.allure.attachment.AttachmentData; import io.qameta.allure.attachment.AttachmentProcessor; import io.qameta.allure.attachment.DefaultAttachmentProcessor; @@ -37,16 +43,66 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; +import java.util.Map; +import java.util.Queue; import java.util.UUID; - -import static java.util.Objects.requireNonNull; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** - * Allure interceptor logger for gRPC. + * Allure interceptor logger for gRPC that is used to create allure steps and attach gRPC messages + * for the gRPC Channel, blocking stubs, or non-blocking stubs with after-each hook (see examples). + * Channels, stubs may be either static or non-static. + * + *

Usage with Channel

+ *
+ *   static Channel channel = ManagedChannelBuilder.forAddress("localhost", 8092)
+ *       .intercept(new AllureGrpc())
+ *       .build();
+ * 
+ * When {@code AllureGrpc} object is present in a Channel, + * all stubs (both blocking and non-blocking) that using this Channel will intercept messages + * to Allure-report. + *

NOTE! + * With non-blocking Stubs you have to use after-each hook for tests because non-blocking stubs + * are async, and allure-report might not be created after non-blocking Stubs call. See examples below + *

+ * + *

Usage with blocking Stubs

+ *
+ *   ServiceGrpc.ServiceBlockingStub blockingStub = ServiceGrpc
+ *      .newBlockingStub(channel)
+ *      .withInterceptors(new AllureGrpc());
+ * 
+ * You can just add {@code AllureGrpc} object to blocking Stub object, all unary calls from + * this stub will be logged to Allure-report. + * + *

Usage with non-blocking Stubs (example for JUnit 5)

+ *
+ *   ServiceGrpc.ServiceStub stub = ServiceGrpc
+ *      .newStub(channel)
+ *      .withInterceptors(new AllureGrpc());
  *
- * @author dtuchs (Dmitry Tuchs).
+ *   {@code @Test}
+ *   void test() {
+ *       stub.streamingCall(...);
+ *   }
+ *
+ *   {@code @AfterEach}
+ *   void awaitAllureForNonBlockingCalls() {
+ *     AllureGrpc.await();
+ *   }
+ * 
+ * You can use {@code @AfterTest} annotation for TestNG and {@code @After} for JUnit 4. + * Of course, you can implement Extension or Rule to achieve that. + * + * @author dtuchs (Dmitrii Tuchs). */ @SuppressWarnings({ "PMD.AvoidFieldNameMatchingMethodName", @@ -57,7 +113,12 @@ public class AllureGrpc implements ClientInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(AllureGrpc.class); - private static final JsonFormat.Printer JSON_PRINTER = JsonFormat.printer(); + private static final JsonFormat.Printer GRPC_TO_JSON_PRINTER = JsonFormat.printer(); + private static final Map TEST_CASE_HOLDER = new ConcurrentHashMap<>(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final DefaultPrettyPrinter JSON_PRETTY_PRINTER = new DefaultPrettyPrinter(); + private static final String STREAMING_MESSAGE + = "gRPC messages (collection of elements from %s stream)"; private String requestTemplatePath = "grpc-request.ftl"; private String responseTemplatePath = "grpc-response.ftl"; @@ -85,44 +146,122 @@ public AllureGrpc interceptResponseMetadata(final boolean value) { return this; } - @SuppressWarnings({"PMD.MethodArgumentCouldBeFinal", "PMD.NPathComplexity"}) + /** + *

Should be called only after tests that use non-blocking Stubs.

+ * Generally, you can do it in the {@code @AfterEach} method in JUnit 5, + * {@code @AfterTest} in TestNG, or {@code @After} in JUnit 4 tests + */ + public static void await() { + Allure.getLifecycle().getCurrentTestCase().ifPresent(caseId -> { + try { + final CountDownLatch latch = TEST_CASE_HOLDER.get(caseId); + if (latch != null) { + latch.await(); + TEST_CASE_HOLDER.remove(caseId); + } + } catch (InterruptedException e) { + throw new IllegalStateException("Thread interrupted", e); + } + }); + } + + @SuppressWarnings({"checkstyle:Methodlength", "PMD.MethodArgumentCouldBeFinal", "PMD.NPathComplexity"}) @Override public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { - final AttachmentProcessor processor = new DefaultAttachmentProcessor(); - return new ForwardingClientCall.SimpleForwardingClientCall( next.newCall(method, callOptions.withoutWaitForReady())) { - private String stepUuid; - private List parsedResponses = new ArrayList<>(); + private final Queue parsedResponses = new ConcurrentLinkedQueue<>(); + private final Queue parsedRequests = new ConcurrentLinkedQueue<>(); + + private final AtomicBoolean allureThreadStarted = new AtomicBoolean(false); + private final CountDownLatch interceptorLatch = new CountDownLatch(1); + + private final AtomicReference exceptionHolder = new AtomicReference<>(); + private final AtomicReference statusHolder = new AtomicReference<>(); + private final AtomicReference metadataHolder = new AtomicReference<>(); + + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private String caseUuid; @SuppressWarnings("PMD.MethodArgumentCouldBeFinal") @Override public void sendMessage(T message) { - stepUuid = UUID.randomUUID().toString(); - Allure.getLifecycle().startStep(stepUuid, (new StepResult()).setName( - "Send gRPC request to " - + next.authority() - + trimGrpcMethodName(method.getFullMethodName()) - )); + final AllureLifecycle lifecycle = Allure.getLifecycle(); + caseUuid = lifecycle.getCurrentTestCase().orElseThrow( + () -> new IllegalStateException("No test case started") + ); + TEST_CASE_HOLDER.put(caseUuid, new CountDownLatch(1)); + + if (!allureThreadStarted.get()) { + executor.submit(new Runnable() { + private final AtomicBoolean stepCreated = new AtomicBoolean(false); + private final AttachmentProcessor processor + = new DefaultAttachmentProcessor(lifecycle); + private String stepUuid; + + /** + * Daemon thread that will be used to create steps and attachments. + * This approach should be used because non-blocking stubs use several threads + * to send and receive messages, + * while {@code AllureThreadContext} uses {@code ThreadLocal} internally. + */ + @Override + public void run() { + if (!stepCreated.get()) { + stepUuid = startStep(lifecycle); + } + + try { + interceptorLatch.await(); + if (exceptionHolder.get() != null) { + markStepBrokenByException(lifecycle); + } else { + final io.grpc.Status status = statusHolder.get(); + final Metadata metadata = metadataHolder.get(); + final GrpcRequestAttachment.Builder requestAttachment + = requestAttachment(method); + final GrpcResponseAttachment.Builder responseAttachment + = responseAttachment(method, status, metadata); + + processor.addAttachment( + requestAttachment.build(), + new FreemarkerAttachmentRenderer(requestTemplatePath) + ); + processor.addAttachment( + responseAttachment.build(), + new FreemarkerAttachmentRenderer(responseTemplatePath) + ); + + if (status.isOk() || !markStepFailedOnNonZeroCode) { + lifecycle.updateStep(stepUuid, step -> step.setStatus(Status.PASSED)); + } else { + lifecycle.updateStep(stepUuid, step -> step.setStatus(Status.FAILED)); + } + } + lifecycle.stopStep(stepUuid); + TEST_CASE_HOLDER.get(caseUuid).countDown(); + } catch (InterruptedException e) { + throw new IllegalStateException("Thread interrupted ", e); + } finally { + stepUuid = null; + stepCreated.set(false); + } + } + }); + allureThreadStarted.set(true); + } + try { - final GrpcRequestAttachment rpcRequestAttach = GrpcRequestAttachment.Builder - .create("gRPC request", method.getFullMethodName()) - .setBody(JSON_PRINTER.print((MessageOrBuilder) message)) - .build(); - processor.addAttachment(rpcRequestAttach, new FreemarkerAttachmentRenderer(requestTemplatePath)); + parsedRequests.add(GRPC_TO_JSON_PRINTER.print((MessageOrBuilder) message)); super.sendMessage(message); } catch (InvalidProtocolBufferException e) { LOGGER.warn("Can`t parse gRPC request", e); - } catch (Throwable e) { - Allure.getLifecycle().updateStep(stepResult -> - stepResult.setStatus(ResultsUtils.getStatus(e).orElse(Status.BROKEN)) - .setStatusDetails(ResultsUtils.getStatusDetails(e).orElse(null)) - ); - Allure.getLifecycle().stopStep(stepUuid); - stepUuid = null; + } catch (Exception e) { + exceptionHolder.set(e); + shutdownAllureThread(); } } @@ -138,48 +277,9 @@ protected Listener delegate() { @SuppressWarnings({"PMD.MethodArgumentCouldBeFinal", "PMD.AvoidLiteralsInIfCondition"}) @Override public void onClose(io.grpc.Status status, Metadata trailers) { - GrpcResponseAttachment.Builder responseAttachmentBuilder = null; - - if (parsedResponses.size() == 1) { - responseAttachmentBuilder = GrpcResponseAttachment.Builder - .create("gRPC response") - .setBody(parsedResponses.iterator().next()); - } else if (parsedResponses.size() > 1) { - responseAttachmentBuilder = GrpcResponseAttachment.Builder - .create("gRPC response (collection of elements from Server stream)") - .setBody("[" + String.join(",\n", parsedResponses) + "]"); - } - if (!status.isOk()) { - String description = status.getDescription(); - if (description == null) { - description = "No description provided"; - } - responseAttachmentBuilder = GrpcResponseAttachment.Builder - .create(status.getCode().name()) - .setStatus(description); - } - - requireNonNull(responseAttachmentBuilder).setStatus(status.toString()); - if (interceptResponseMetadata) { - for (String key : headers.keys()) { - requireNonNull(responseAttachmentBuilder).setMetadata( - key, - headers.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)) - ); - } - } - processor.addAttachment( - requireNonNull(responseAttachmentBuilder).build(), - new FreemarkerAttachmentRenderer(responseTemplatePath) - ); - - if (status.isOk() || !markStepFailedOnNonZeroCode) { - Allure.getLifecycle().updateStep(stepUuid, step -> step.setStatus(Status.PASSED)); - } else { - Allure.getLifecycle().updateStep(stepUuid, step -> step.setStatus(Status.FAILED)); - } - Allure.getLifecycle().stopStep(stepUuid); - stepUuid = null; + statusHolder.set(status); + metadataHolder.set(trailers); + shutdownAllureThread(); super.onClose(status, trailers); } @@ -187,26 +287,112 @@ public void onClose(io.grpc.Status status, Metadata trailers) { @Override public void onMessage(A message) { try { - parsedResponses.add(JSON_PRINTER.print((MessageOrBuilder) message)); + parsedResponses.add(GRPC_TO_JSON_PRINTER.print((MessageOrBuilder) message)); super.onMessage(message); } catch (InvalidProtocolBufferException e) { LOGGER.warn("Can`t parse gRPC response", e); - } catch (Throwable e) { - Allure.getLifecycle().updateStep(step -> - step.setStatus(ResultsUtils.getStatus(e).orElse(Status.BROKEN)) - .setStatusDetails(ResultsUtils.getStatusDetails(e).orElse(null)) - ); - Allure.getLifecycle().stopStep(stepUuid); - stepUuid = null; + } catch (Exception e) { + exceptionHolder.set(e); + shutdownAllureThread(); } } }; super.start(listener, headers); } - private String trimGrpcMethodName(final String source) { + private String stepBody(final Queue messages) { + JSON_PRETTY_PRINTER.indentArraysWith(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE); + try { + return OBJECT_MAPPER.writer(JSON_PRETTY_PRINTER) + .writeValueAsString( + OBJECT_MAPPER.readValue( + "[" + String.join(",", messages) + "]", + ArrayNode.class + ) + ); + } catch (JsonProcessingException e) { + LOGGER.warn("Can`t parse collected gRPC messages"); + return ""; + } + } + + private String grpcMethodName(final String source) { return source.substring(source.lastIndexOf('/')); } + + private void shutdownAllureThread() { + interceptorLatch.countDown(); + executor.shutdown(); + try { + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ex) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + private String startStep(final AllureLifecycle lifecycle) { + final String stepUuid = UUID.randomUUID().toString(); + lifecycle.startStep(stepUuid, (new StepResult()).setName( + "Send " + method.getType().toString().toLowerCase() + " gRPC request to " + + next.authority() + + grpcMethodName(method.getFullMethodName()) + )); + return stepUuid; + } + + private void markStepBrokenByException(final AllureLifecycle lifecycle) { + final Throwable e = exceptionHolder.get(); + lifecycle.updateStep(stepResult -> + stepResult.setStatus(ResultsUtils.getStatus(e).orElse(Status.BROKEN)) + .setStatusDetails( + ResultsUtils.getStatusDetails(e).orElse(null) + ) + ); + } + + private GrpcRequestAttachment.Builder requestAttachment(final MethodDescriptor method) { + if (method.getType().clientSendsOneMessage()) { + return GrpcRequestAttachment.Builder + .create("gRPC request", method.getFullMethodName()) + .setBody(parsedRequests.element()); + } else { + return GrpcRequestAttachment.Builder + .create(String.format(STREAMING_MESSAGE, "Client"), method.getFullMethodName()) + .setBody(stepBody(parsedRequests)); + } + } + + private GrpcResponseAttachment.Builder responseAttachment(final MethodDescriptor method, + final io.grpc.Status status, + final Metadata metadata) { + final GrpcResponseAttachment.Builder result; + if (!status.isOk()) { + final String description = status.getDescription() == null + ? "No description provided" + : status.getDescription(); + result = GrpcResponseAttachment.Builder.create(status.getCode().name()) + .setStatus(status + " " + description); + } else { + if (method.getType().serverSendsOneMessage()) { + result = GrpcResponseAttachment.Builder.create("gRPC response") + .setBody(parsedResponses.element()) + .setStatus(status.toString()); + } else { + result = GrpcResponseAttachment.Builder.create(String.format(STREAMING_MESSAGE, "Server")) + .setBody(stepBody(parsedResponses)) + .setStatus(status.toString()); + } + } + if (interceptResponseMetadata) { + for (String key : metadata.keys()) { + result.setMetadata(key, metadata.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER))); + } + } + return result; + } }; } } diff --git a/allure-grpc/src/main/java/io/qameta/allure/grpc/GrpcRequestAttachment.java b/allure-grpc/src/main/java/io/qameta/allure/grpc/GrpcRequestAttachment.java index 39eeb7213..85d7a2936 100644 --- a/allure-grpc/src/main/java/io/qameta/allure/grpc/GrpcRequestAttachment.java +++ b/allure-grpc/src/main/java/io/qameta/allure/grpc/GrpcRequestAttachment.java @@ -19,6 +19,9 @@ import java.util.Objects; +/** + * @author dtuchs (Dmitrii Tuchs). + */ public class GrpcRequestAttachment implements AttachmentData { private final String name; diff --git a/allure-grpc/src/main/java/io/qameta/allure/grpc/GrpcResponseAttachment.java b/allure-grpc/src/main/java/io/qameta/allure/grpc/GrpcResponseAttachment.java index ffd2a9b33..6a79d0623 100644 --- a/allure-grpc/src/main/java/io/qameta/allure/grpc/GrpcResponseAttachment.java +++ b/allure-grpc/src/main/java/io/qameta/allure/grpc/GrpcResponseAttachment.java @@ -21,6 +21,9 @@ import java.util.Map; import java.util.Objects; +/** + * @author dtuchs (Dmitrii Tuchs). + */ public class GrpcResponseAttachment implements AttachmentData { private final String name; diff --git a/allure-grpc/src/test/java/io/qameta/allure/grpc/AllureGrpcTest.java b/allure-grpc/src/test/java/io/qameta/allure/grpc/AllureGrpcTest.java index 6c83b9f03..881c34b90 100644 --- a/allure-grpc/src/test/java/io/qameta/allure/grpc/AllureGrpcTest.java +++ b/allure-grpc/src/test/java/io/qameta/allure/grpc/AllureGrpcTest.java @@ -19,57 +19,86 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; import io.qameta.allure.model.Attachment; import io.qameta.allure.model.StepResult; import io.qameta.allure.test.AllureResults; import org.grpcmock.GrpcMock; import org.grpcmock.junit5.GrpcMockExtension; -import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import java.util.Iterator; +import java.util.LinkedList; import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import static io.qameta.allure.test.RunUtils.runWithinTestContext; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.grpcmock.GrpcMock.serverStreamingMethod; -import static org.grpcmock.GrpcMock.unaryMethod; +import static org.grpcmock.GrpcMock.*; /** - * @author dtuchs (Dmitry Tuchs). + * @author dtuchs (Dmitrii Tuchs). */ @ExtendWith(GrpcMockExtension.class) class AllureGrpcTest { - private static final String RESPONSE_MESSAGE = "Hello world!"; + private static final Response RESPONSE_MESSAGE = Response.newBuilder().setMessage("Hello world!").build(); + private static final ManagedChannel CHANNEL = ManagedChannelBuilder + .forAddress("localhost", GrpcMock.getGlobalPort()) + .intercept(new AllureGrpc()) + .usePlaintext() + .build(); - private ManagedChannel channel; private TestServiceGrpc.TestServiceBlockingStub blockingStub; + private TestServiceGrpc.TestServiceStub nonBlockingStub; @BeforeEach void configureMock() { - channel = ManagedChannelBuilder.forAddress("localhost", GrpcMock.getGlobalPort()) - .usePlaintext() - .build(); - blockingStub = TestServiceGrpc.newBlockingStub(channel) - .withInterceptors(new AllureGrpc()); + blockingStub = TestServiceGrpc.newBlockingStub(CHANNEL); + nonBlockingStub = TestServiceGrpc.newStub(CHANNEL); GrpcMock.stubFor(unaryMethod(TestServiceGrpc.getCalculateMethod()) - .willReturn(Response.newBuilder().setMessage(RESPONSE_MESSAGE).build())); + .willReturn(RESPONSE_MESSAGE)); + GrpcMock.stubFor(serverStreamingMethod(TestServiceGrpc.getCalculateServerStreamMethod()) .willReturn(asList( - Response.newBuilder().setMessage(RESPONSE_MESSAGE).build(), - Response.newBuilder().setMessage(RESPONSE_MESSAGE).build() + RESPONSE_MESSAGE, + RESPONSE_MESSAGE ))); + + GrpcMock.stubFor(clientStreamingMethod(TestServiceGrpc.getCalculateClientStreamMethod()) + .withFirstRequest(request -> "client".equals(request.getTopic())) + .willReturn(RESPONSE_MESSAGE)); + + GrpcMock.stubFor(bidiStreamingMethod(TestServiceGrpc.getCalculateBidirectionalStreamMethod()) + .withFirstRequest(request -> "bidirectional".equals(request.getTopic())) + .willProxyTo(responseObserver -> new StreamObserver() { + @Override + public void onNext(Request request) { + responseObserver.onNext(RESPONSE_MESSAGE); + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + })); } - @AfterEach - void shutdownChannel() { - Optional.ofNullable(channel).ifPresent(ManagedChannel::shutdownNow); + @AfterAll + static void shutdownChannel() { + Optional.ofNullable(CHANNEL).ifPresent(ManagedChannel::shutdownNow); } @Test @@ -80,6 +109,8 @@ void shouldCreateRequestAttachment() { final AllureResults results = execute(request); + assertThat(results.getTestResults()).hasSize(1); + assertThat(results.getTestResults().get(0).getSteps()).hasSize(1); assertThat(results.getTestResults().get(0).getSteps()) .flatExtracting(StepResult::getAttachments) .extracting(Attachment::getName) @@ -94,6 +125,8 @@ void shouldCreateResponseAttachment() { final AllureResults results = execute(request); + assertThat(results.getTestResults()).hasSize(1); + assertThat(results.getTestResults().get(0).getSteps()).hasSize(1); assertThat(results.getTestResults().get(0).getSteps()) .flatExtracting(StepResult::getAttachments) .extracting(Attachment::getName) @@ -106,12 +139,59 @@ void shouldCreateResponseAttachmentForServerStreamingResponse() { .setTopic("1") .build(); - final AllureResults results = executeStreaming(request); + final AllureResults results = executeServerStreaming(request); + + assertThat(results.getTestResults()).hasSize(1); + assertThat(results.getTestResults().get(0).getSteps()).hasSize(1); + assertThat(results.getTestResults().get(0).getSteps()) + .flatExtracting(StepResult::getAttachments) + .extracting(Attachment::getName) + .contains( + "gRPC request", + "gRPC messages (collection of elements from Server stream)" + ); + } + + @Test + void shouldCreateResponseAttachmentForClientStreamingResponse() { + final Request request = Request.newBuilder() + .setTopic("client") + .build(); + final Queue requests = new LinkedList<>(); + requests.add(request); + + final AllureResults results = executeClientStreaming(requests); + assertThat(results.getTestResults()).hasSize(1); + assertThat(results.getTestResults().get(0).getSteps()).hasSize(1); assertThat(results.getTestResults().get(0).getSteps()) .flatExtracting(StepResult::getAttachments) .extracting(Attachment::getName) - .contains("gRPC response (collection of elements from Server stream)"); + .contains( + "gRPC messages (collection of elements from Client stream)", + "gRPC response" + ); + } + + @Test + void shouldCreateResponseAttachmentForBidirectionalStreaming() { + final Request request = Request.newBuilder() + .setTopic("bidirectional") + .build(); + final Queue requests = new LinkedList<>(); + requests.add(request); + + final AllureResults results = executeBidirectionalStreaming(requests); + + assertThat(results.getTestResults()).hasSize(1); + assertThat(results.getTestResults().get(0).getSteps()).hasSize(1); + assertThat(results.getTestResults().get(0).getSteps()) + .flatExtracting(StepResult::getAttachments) + .extracting(Attachment::getName) + .contains( + "gRPC messages (collection of elements from Client stream)", + "gRPC messages (collection of elements from Server stream)" + ); } @Test @@ -126,6 +206,8 @@ void shouldCreateResponseAttachmentOnStatusException() { final AllureResults results = executeException(request); + assertThat(results.getTestResults()).hasSize(1); + assertThat(results.getTestResults().get(0).getSteps()).hasSize(1); assertThat(results.getTestResults().get(0).getSteps()) .flatExtracting(StepResult::getAttachments) .extracting(Attachment::getName) @@ -136,28 +218,125 @@ protected final AllureResults execute(final Request request) { return runWithinTestContext(() -> { try { final Response response = blockingStub.calculate(request); - assertThat(response.getMessage()).isEqualTo(RESPONSE_MESSAGE); + assertThat(response).isEqualTo(RESPONSE_MESSAGE); } catch (Exception e) { throw new RuntimeException("Could not execute request " + request, e); } }); } - protected final AllureResults executeStreaming(final Request request) { + + protected final AllureResults executeServerStreaming(final Request request) { return runWithinTestContext(() -> { try { - Iterator responseIterator = blockingStub.calculateServerStream(request); - while (responseIterator.hasNext()) { - assertThat(responseIterator.next().getMessage()).isEqualTo(RESPONSE_MESSAGE); - } + final Queue responses = new ConcurrentLinkedQueue<>(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + StreamObserver responseObserver = new StreamObserver() { + @Override + public void onNext(Response response) { + responses.add(response); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + countDownLatch.countDown(); + } + }; + + nonBlockingStub.calculateServerStream(request, responseObserver); + countDownLatch.await(); + + // Can not execute AllureGrpc.await() in common place (@AfterEach method) + // Because runWithinTestContext() destroys AllureLifecycle after Runnable execution. + AllureGrpc.await(); + + assertThat(responses).allMatch(response -> response.equals(RESPONSE_MESSAGE)); } catch (Exception e) { throw new RuntimeException("Could not execute request " + request, e); } }); } + protected final AllureResults executeClientStreaming(final Queue requests) { + return runWithinTestContext(() -> { + try { + final Queue responses = new ConcurrentLinkedQueue<>(); + StreamObserver responseObserver = new StreamObserver() { + @Override + public void onNext(Response response) { + responses.add(response); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }; + + StreamObserver requestObserver = nonBlockingStub.calculateClientStream(responseObserver); + while (!requests.isEmpty()) { + requestObserver.onNext(requests.poll()); + } + requestObserver.onCompleted(); + responseObserver.onCompleted(); + + // Can not execute AllureGrpc.await() in common place (@AfterEach method) + // Because runWithinTestContext() destroys AllureLifecycle after Runnable execution. + AllureGrpc.await(); + + assertThat(responses).allMatch(response -> response.equals(RESPONSE_MESSAGE)); + } catch (Exception e) { + throw new RuntimeException("Could not execute bidirectional request " + requests, e); + } + }); + } + + protected final AllureResults executeBidirectionalStreaming(final Queue requests) { + return runWithinTestContext(() -> { + try { + final Queue responses = new ConcurrentLinkedQueue<>(); + StreamObserver responseObserver = new StreamObserver() { + @Override + public void onNext(Response response) { + responses.add(response); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }; + + StreamObserver requestObserver = nonBlockingStub.calculateBidirectionalStream(responseObserver); + while (!requests.isEmpty()) { + requestObserver.onNext(requests.poll()); + } + requestObserver.onCompleted(); + responseObserver.onCompleted(); + + // Can not execute AllureGrpc.await() in common place (@AfterEach method) + // Because runWithinTestContext() destroys AllureLifecycle after Runnable execution. + AllureGrpc.await(); + + assertThat(responses).allMatch(response -> response.equals(RESPONSE_MESSAGE)); + } catch (Exception e) { + throw new RuntimeException("Could not execute bidirectional request " + requests, e); + } + }); + } + protected final AllureResults executeException(final Request request) { return runWithinTestContext(() -> { - assertThatExceptionOfType(StatusRuntimeException.class).isThrownBy(() -> blockingStub.calculate(request)); + assertThatExceptionOfType(StatusRuntimeException.class).isThrownBy(() -> blockingStub.calculate(request)); }); } } diff --git a/allure-grpc/src/test/proto/api.proto b/allure-grpc/src/test/proto/api.proto index 552e76f6c..01ea01a74 100644 --- a/allure-grpc/src/test/proto/api.proto +++ b/allure-grpc/src/test/proto/api.proto @@ -6,6 +6,8 @@ option java_package = "io.qameta.allure.grpc"; service TestService { rpc Calculate (Request) returns (Response); rpc CalculateServerStream (Request) returns (stream Response); + rpc CalculateClientStream (stream Request) returns (Response); + rpc CalculateBidirectionalStream (stream Request) returns (stream Response); } message Request { diff --git a/settings.gradle.kts b/settings.gradle.kts index 487d5b0ad..6ad3ae343 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -56,7 +56,7 @@ pluginManagement { id("io.qameta.allure-download") version "2.11.2" id("io.qameta.allure-report") version "2.11.2" id("io.spring.dependency-management") version "1.1.0" - id("com.google.protobuf") version "0.9.1" + id("com.google.protobuf") version "0.9.4" id("com.github.spotbugs") version "6.0.6" kotlin("jvm") version "1.7.10" } From 35388be43bbe78709599103743644076cf119bb0 Mon Sep 17 00:00:00 2001 From: dtuchs Date: Mon, 4 Mar 2024 16:57:26 +0600 Subject: [PATCH 2/2] small fix - replace put to putIfAbsent --- allure-grpc/src/main/java/io/qameta/allure/grpc/AllureGrpc.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/allure-grpc/src/main/java/io/qameta/allure/grpc/AllureGrpc.java b/allure-grpc/src/main/java/io/qameta/allure/grpc/AllureGrpc.java index 1cd8adf38..c5f6bea69 100644 --- a/allure-grpc/src/main/java/io/qameta/allure/grpc/AllureGrpc.java +++ b/allure-grpc/src/main/java/io/qameta/allure/grpc/AllureGrpc.java @@ -193,7 +193,7 @@ public void sendMessage(T message) { caseUuid = lifecycle.getCurrentTestCase().orElseThrow( () -> new IllegalStateException("No test case started") ); - TEST_CASE_HOLDER.put(caseUuid, new CountDownLatch(1)); + TEST_CASE_HOLDER.putIfAbsent(caseUuid, new CountDownLatch(1)); if (!allureThreadStarted.get()) { executor.submit(new Runnable() {