diff --git a/CHANGELOG.md b/CHANGELOG.md index 59f20e76..f5d475ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - support multiple mds for test case for BICEPS.R5042 - inconsistent messaging in SDCcc logs ("No problems were found" and "Test run was invalid" one after another.) - incorrect behavior of the configuration option SDCcc.SummarizeMessageEncodingErrors +- SequenceIds are now ordered by the timestamp of the first message that used them ### Removed diff --git a/sdccc/src/main/java/com/draeger/medical/sdccc/messages/MessageStorage.java b/sdccc/src/main/java/com/draeger/medical/sdccc/messages/MessageStorage.java index 6584d989..13762fad 100644 --- a/sdccc/src/main/java/com/draeger/medical/sdccc/messages/MessageStorage.java +++ b/sdccc/src/main/java/com/draeger/medical/sdccc/messages/MessageStorage.java @@ -77,9 +77,12 @@ import org.apache.commons.io.input.BOMInputStream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.hibernate.ScrollMode; import org.hibernate.Session; import org.hibernate.SessionFactory; import org.hibernate.Transaction; +import org.hibernate.query.spi.ScrollableResultsImplementor; +import org.hibernate.query.spi.StreamDecorator; import org.somda.sdc.dpws.CommunicationLog; import org.somda.sdc.dpws.soap.ApplicationInfo; import org.somda.sdc.dpws.soap.CommunicationContext; @@ -1079,6 +1082,7 @@ private void awaitFlushBarrier() { /** * Retrieves all SequenceId attribute values that have been seen. + * Orders them by the timestamp of the first message that used the respective SequenceId. * * @return stream of all SequenceId attribute values that have been seen * @throws IOException if storage is closed @@ -1090,17 +1094,20 @@ public Stream getUniqueSequenceIds() throws IOException { throw new IOException(GET_UNIQUE_SEQUENCE_IDS_CALLED_ON_CLOSED_STORAGE); } - final CriteriaQuery criteria; - + final CriteriaQuery messageContentQuery; try (final Session session = sessionFactory.openSession()) { + session.beginTransaction(); + final CriteriaBuilder criteriaBuilder = session.getCriteriaBuilder(); - criteria = criteriaBuilder.createQuery(String.class); - final Root mdibVersionGroupEntityRoot = criteria.from(MdibVersionGroupEntity.class); - criteria.select(mdibVersionGroupEntityRoot.get(MdibVersionGroupEntity_.sequenceId)); - criteria.distinct(true); + messageContentQuery = criteriaBuilder.createQuery(String.class); + final Root messageContentRoot = messageContentQuery.from(MessageContent.class); + messageContentQuery.select( + messageContentRoot.join(MessageContent_.mdibVersionGroups).get(MdibVersionGroupEntity_.sequenceId)); + + messageContentQuery.orderBy(criteriaBuilder.asc(messageContentRoot.get(MessageContent_.nanoTimestamp))); } - return this.getQueryResult(criteria); + return this.getOrderedQueryResult(messageContentQuery).distinct(); } /** @@ -1693,11 +1700,11 @@ public GetterResult getInboundMessagesByTimeIntervalAndBodyType( } final boolean present; - try (final Stream countingStream = this.getQueryResult(messageContentQuery)) { + try (final Stream countingStream = this.getOrderedQueryResult(messageContentQuery)) { present = countingStream.findAny().isPresent(); } - return new GetterResult<>(this.getQueryResult(messageContentQuery), present); + return new GetterResult<>(this.getOrderedQueryResult(messageContentQuery), present); } /** @@ -1773,11 +1780,11 @@ public GetterResult getInboundMessagesByTimestampAndBodyType( } final boolean present; - try (final Stream countingStream = this.getQueryResult(messageContentQuery)) { + try (final Stream countingStream = this.getOrderedQueryResult(messageContentQuery)) { present = countingStream.findAny().isPresent(); } - return new GetterResult<>(this.getQueryResult(messageContentQuery), present); + return new GetterResult<>(this.getOrderedQueryResult(messageContentQuery), present); } /** @@ -1817,11 +1824,11 @@ public GetterResult getManipulationDataByManipulation(final St } final boolean present; - try (final Stream countingStream = this.getQueryResult(criteria)) { + try (final Stream countingStream = this.getOrderedQueryResult(criteria)) { present = countingStream.findAny().isPresent(); } - return new GetterResult<>(this.getQueryResult(criteria), present); + return new GetterResult<>(this.getOrderedQueryResult(criteria), present); } /** @@ -1886,10 +1893,10 @@ public GetterResult getManipulationDataByParametersAndManipula criteriaBuilder.and(parameterExistPredicates.toArray(new Predicate[0])))); } final boolean present; - try (final Stream countingStream = this.getQueryResult(criteria)) { + try (final Stream countingStream = this.getOrderedQueryResult(criteria)) { present = countingStream.findAny().isPresent(); } - return new GetterResult<>(this.getQueryResult(criteria), present); + return new GetterResult<>(this.getOrderedQueryResult(criteria), present); } private Stream getQueryResult(final CriteriaQuery criteriaQuery) { @@ -1902,6 +1909,16 @@ private Stream getQueryResult(final CriteriaQuery criteriaQuery) { .onClose(resultIterator::close); } + private Stream getOrderedQueryResult(final CriteriaQuery criteriaQuery) { + final Session session = sessionFactory.openSession(); + final Stream results = getOrderedStreamForQuery(session, criteriaQuery); + + final ResultIterator resultIterator = new ResultIterator<>(session, results); + + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false) + .onClose(resultIterator::close); + } + // be aware, that this does not use evict on cached objects private Stream getStreamForQuery(final CriteriaQuery criteriaQuery) { final Session session = sessionFactory.openSession(); @@ -1926,6 +1943,23 @@ private Stream getStreamForQuery(final Session session, final CriteriaQue .stream(); } + // be aware, that this does not use evict on cached objects + private Stream getOrderedStreamForQuery(final Session session, final CriteriaQuery criteriaQuery) { + // The stream provided by Hibernate does not have the ORDERED characteristic. + // We hence build our own. + final ScrollableResultsImplementor scrollableResults = + (ScrollableResultsImplementor) session.createQuery(criteriaQuery) + .setReadOnly(true) + .setCacheable(false) + .setFetchSize(FETCH_SIZE) + .scroll(ScrollMode.FORWARD_ONLY); + final OrderedStreamIterator iterator = new OrderedStreamIterator<>(scrollableResults); + final Spliterator spliterator = + Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL | Spliterator.ORDERED); + + return (Stream) new StreamDecorator(StreamSupport.stream(spliterator, false), scrollableResults::close); + } + private void transmit(final List results) { try (final Session session = sessionFactory.openSession()) { final Transaction transaction = session.beginTransaction(); diff --git a/sdccc/src/main/java/com/draeger/medical/sdccc/messages/OrderedStreamIterator.kt b/sdccc/src/main/java/com/draeger/medical/sdccc/messages/OrderedStreamIterator.kt new file mode 100644 index 00000000..5a97116c --- /dev/null +++ b/sdccc/src/main/java/com/draeger/medical/sdccc/messages/OrderedStreamIterator.kt @@ -0,0 +1,35 @@ +package com.draeger.medical.sdccc.messages + +import org.hibernate.query.spi.CloseableIterator +import org.hibernate.query.spi.ScrollableResultsImplementor + +/** + * Iterator to be used to create Streams with the ORDERED characteristic from ScrollableResults. + */ +class OrderedStreamIterator(private val results: ScrollableResultsImplementor) : CloseableIterator { + + override fun close() { + results.close() + } + + override fun remove() { + error("not yet implemented!") + } + + override fun hasNext(): Boolean { + if (results.isClosed) { + return false + } + return results.next() + } + + override fun next(): T { + val element = results.get() + @Suppress("UNCHECKED_CAST") + return if (element.size == 1) { + element[0] + } else { + element + } as T + } +} \ No newline at end of file diff --git a/sdccc/src/test/java/com/draeger/medical/sdccc/messages/TestMessageStorage.java b/sdccc/src/test/java/com/draeger/medical/sdccc/messages/TestMessageStorage.java index 03ea3526..38ca798a 100644 --- a/sdccc/src/test/java/com/draeger/medical/sdccc/messages/TestMessageStorage.java +++ b/sdccc/src/test/java/com/draeger/medical/sdccc/messages/TestMessageStorage.java @@ -56,6 +56,7 @@ import javax.xml.namespace.QName; import org.apache.commons.io.ByteOrderMark; import org.apache.commons.lang3.tuple.Pair; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -155,15 +156,7 @@ public void testMdibVersionOverflow(@TempDir final File dir) throws IOException, 1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) { final ListMultimap multimap = ArrayListMultimap.create(); - final String transactionId = "transactionId"; - final String requestUri = "requestUri"; - - final X509Certificate certificate = CertificateUtil.getDummyCert(); - final CommunicationContext headerContext = new CommunicationContext( - new HttpApplicationInfo(multimap, transactionId, requestUri), - new TransportInfo( - Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)), - null); + final CommunicationContext headerContext = getCommunicationContext(multimap); try (final Message message = new Message( CommunicationLog.Direction.INBOUND, @@ -196,15 +189,7 @@ public void testMdibVersionCloseToOverflow(@TempDir final File dir) throws IOExc 1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) { final ListMultimap multimap = ArrayListMultimap.create(); - final String transactionId = "transactionId"; - final String requestUri = "requestUri"; - - final X509Certificate certificate = CertificateUtil.getDummyCert(); - final CommunicationContext headerContext = new CommunicationContext( - new HttpApplicationInfo(multimap, transactionId, requestUri), - new TransportInfo( - Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)), - null); + final CommunicationContext headerContext = getCommunicationContext(multimap); try (final Message message = new Message( CommunicationLog.Direction.INBOUND, @@ -248,15 +233,7 @@ public void testGetUniqueSequenceIds(@TempDir final File dir) throws IOException 1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) { final ListMultimap multimap = ArrayListMultimap.create(); - final String transactionId = "transactionId"; - final String requestUri = "requestUri"; - - final X509Certificate certificate = CertificateUtil.getDummyCert(); - final CommunicationContext headerContext = new CommunicationContext( - new HttpApplicationInfo(multimap, transactionId, requestUri), - new TransportInfo( - Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)), - null); + final CommunicationContext headerContext = getCommunicationContext(multimap); try (final Message message = new Message( CommunicationLog.Direction.INBOUND, @@ -302,6 +279,78 @@ public void testGetUniqueSequenceIds(@TempDir final File dir) throws IOException } } + /** + * Tests whether SequenceId values are ordered by the timestamp of the first message they appear in. + * + * @param dir message storage directory + * @throws IOException on io exceptions + * @throws CertificateException on certificate exceptions + */ + @Test + public void testGetUniqueSequenceIdsOrdering(@TempDir final File dir) throws IOException, CertificateException { + try (final MessageStorage messageStorage = new MessageStorage( + 1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) { + final ListMultimap multimap = ArrayListMultimap.create(); + + final CommunicationContext headerContext = getCommunicationContext(multimap); + + try (final Message message = new Message( + CommunicationLog.Direction.INBOUND, + CommunicationLog.MessageType.REQUEST, + headerContext, + messageStorage)) { + message.write(String.format( + BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "3")) + .getBytes(StandardCharsets.UTF_8)); + } + messageStorage.flush(); + + try (final Message message = new Message( + CommunicationLog.Direction.INBOUND, + CommunicationLog.MessageType.REQUEST, + headerContext, + messageStorage)) { + message.write(String.format( + BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "2")) + .getBytes(StandardCharsets.UTF_8)); + } + messageStorage.flush(); + + try (final Message message = new Message( + CommunicationLog.Direction.INBOUND, + CommunicationLog.MessageType.REQUEST, + headerContext, + messageStorage)) { + message.write(String.format( + BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "1")) + .getBytes(StandardCharsets.UTF_8)); + } + messageStorage.flush(); + + try (final Stream sequenceIdStream = messageStorage.getUniqueSequenceIds()) { + assertEquals(List.of("urn:uuid:3", "urn:uuid:2", "urn:uuid:1"), sequenceIdStream.toList()); + } + + try (final MessageStorage.GetterResult inboundMessages = + messageStorage.getInboundMessages()) { + assertEquals(3, inboundMessages.getStream().count()); + } + } + } + + private static @NotNull CommunicationContext getCommunicationContext(final ListMultimap multimap) + throws CertificateException, IOException { + final String transactionId = "transactionId"; + final String requestUri = "requestUri"; + + final X509Certificate certificate = CertificateUtil.getDummyCert(); + return new CommunicationContext( + new HttpApplicationInfo(multimap, transactionId, requestUri), + new TransportInfo( + Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)), + null); + } + /** * Tests whether headers and the transaction id are stored properly. *