From 729356b6eb639ad978e14ae9283cf2284e2f76cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn?= Date: Fri, 29 Nov 2024 12:56:53 +0100 Subject: [PATCH] Fix for the SequenceId Ordering Problem (#221) SequenceIds are currently ordered lexicographically, which is wrong as the SDC standards define no order on them. We hence order them by the timestamp of the first message that they were used in. # Checklist The following aspects have been respected by the author of this pull request, confirmed by both pull request assignee **and** reviewer: * Adherence to coding conventions * [x] Pull Request Assignee * [x] Reviewer (midtuna) * Adherence to javadoc conventions * [x] Pull Request Assignee * [x] Reviewer (midtuna) * Changelog update (necessity checked and entry added or not added respectively) * [x] Pull Request Assignee * [x] Reviewer (midtuna) * README update (necessity checked and entry added or not added respectively) * [x] Pull Request Assignee * [x] Reviewer (midtuna) * config update (necessity checked and entry added or not added respectively) * [x] Pull Request Assignee * [x] Reviewer (midtuna) * SDCcc executable ran against a test device (if necessary) * [x] Pull Request Assignee * [x] Reviewer (midtuna) --------- Co-authored-by: midttuna <162321757+midttuna@users.noreply.github.com> --- CHANGELOG.md | 1 + .../sdccc/messages/MessageStorage.java | 64 ++++++++--- .../sdccc/messages/OrderedStreamIterator.kt | 45 ++++++++ .../sdccc/messages/TestMessageStorage.java | 103 +++++++++++++----- 4 files changed, 171 insertions(+), 42 deletions(-) create mode 100644 sdccc/src/main/java/com/draeger/medical/sdccc/messages/OrderedStreamIterator.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index b513d737..9778074f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - support multiple mds for test case for BICEPS.R5042 - inconsistent messaging in SDCcc logs ("No problems were found" and "Test run was invalid" one after another.) - incorrect behavior of the configuration option SDCcc.SummarizeMessageEncodingErrors +- SequenceIds are now ordered by the timestamp of the first message that used them ### Removed diff --git a/sdccc/src/main/java/com/draeger/medical/sdccc/messages/MessageStorage.java b/sdccc/src/main/java/com/draeger/medical/sdccc/messages/MessageStorage.java index 6584d989..13762fad 100644 --- a/sdccc/src/main/java/com/draeger/medical/sdccc/messages/MessageStorage.java +++ b/sdccc/src/main/java/com/draeger/medical/sdccc/messages/MessageStorage.java @@ -77,9 +77,12 @@ import org.apache.commons.io.input.BOMInputStream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.hibernate.ScrollMode; import org.hibernate.Session; import org.hibernate.SessionFactory; import org.hibernate.Transaction; +import org.hibernate.query.spi.ScrollableResultsImplementor; +import org.hibernate.query.spi.StreamDecorator; import org.somda.sdc.dpws.CommunicationLog; import org.somda.sdc.dpws.soap.ApplicationInfo; import org.somda.sdc.dpws.soap.CommunicationContext; @@ -1079,6 +1082,7 @@ private void awaitFlushBarrier() { /** * Retrieves all SequenceId attribute values that have been seen. + * Orders them by the timestamp of the first message that used the respective SequenceId. * * @return stream of all SequenceId attribute values that have been seen * @throws IOException if storage is closed @@ -1090,17 +1094,20 @@ public Stream getUniqueSequenceIds() throws IOException { throw new IOException(GET_UNIQUE_SEQUENCE_IDS_CALLED_ON_CLOSED_STORAGE); } - final CriteriaQuery criteria; - + final CriteriaQuery messageContentQuery; try (final Session session = sessionFactory.openSession()) { + session.beginTransaction(); + final CriteriaBuilder criteriaBuilder = session.getCriteriaBuilder(); - criteria = criteriaBuilder.createQuery(String.class); - final Root mdibVersionGroupEntityRoot = criteria.from(MdibVersionGroupEntity.class); - criteria.select(mdibVersionGroupEntityRoot.get(MdibVersionGroupEntity_.sequenceId)); - criteria.distinct(true); + messageContentQuery = criteriaBuilder.createQuery(String.class); + final Root messageContentRoot = messageContentQuery.from(MessageContent.class); + messageContentQuery.select( + messageContentRoot.join(MessageContent_.mdibVersionGroups).get(MdibVersionGroupEntity_.sequenceId)); + + messageContentQuery.orderBy(criteriaBuilder.asc(messageContentRoot.get(MessageContent_.nanoTimestamp))); } - return this.getQueryResult(criteria); + return this.getOrderedQueryResult(messageContentQuery).distinct(); } /** @@ -1693,11 +1700,11 @@ public GetterResult getInboundMessagesByTimeIntervalAndBodyType( } final boolean present; - try (final Stream countingStream = this.getQueryResult(messageContentQuery)) { + try (final Stream countingStream = this.getOrderedQueryResult(messageContentQuery)) { present = countingStream.findAny().isPresent(); } - return new GetterResult<>(this.getQueryResult(messageContentQuery), present); + return new GetterResult<>(this.getOrderedQueryResult(messageContentQuery), present); } /** @@ -1773,11 +1780,11 @@ public GetterResult getInboundMessagesByTimestampAndBodyType( } final boolean present; - try (final Stream countingStream = this.getQueryResult(messageContentQuery)) { + try (final Stream countingStream = this.getOrderedQueryResult(messageContentQuery)) { present = countingStream.findAny().isPresent(); } - return new GetterResult<>(this.getQueryResult(messageContentQuery), present); + return new GetterResult<>(this.getOrderedQueryResult(messageContentQuery), present); } /** @@ -1817,11 +1824,11 @@ public GetterResult getManipulationDataByManipulation(final St } final boolean present; - try (final Stream countingStream = this.getQueryResult(criteria)) { + try (final Stream countingStream = this.getOrderedQueryResult(criteria)) { present = countingStream.findAny().isPresent(); } - return new GetterResult<>(this.getQueryResult(criteria), present); + return new GetterResult<>(this.getOrderedQueryResult(criteria), present); } /** @@ -1886,10 +1893,10 @@ public GetterResult getManipulationDataByParametersAndManipula criteriaBuilder.and(parameterExistPredicates.toArray(new Predicate[0])))); } final boolean present; - try (final Stream countingStream = this.getQueryResult(criteria)) { + try (final Stream countingStream = this.getOrderedQueryResult(criteria)) { present = countingStream.findAny().isPresent(); } - return new GetterResult<>(this.getQueryResult(criteria), present); + return new GetterResult<>(this.getOrderedQueryResult(criteria), present); } private Stream getQueryResult(final CriteriaQuery criteriaQuery) { @@ -1902,6 +1909,16 @@ private Stream getQueryResult(final CriteriaQuery criteriaQuery) { .onClose(resultIterator::close); } + private Stream getOrderedQueryResult(final CriteriaQuery criteriaQuery) { + final Session session = sessionFactory.openSession(); + final Stream results = getOrderedStreamForQuery(session, criteriaQuery); + + final ResultIterator resultIterator = new ResultIterator<>(session, results); + + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false) + .onClose(resultIterator::close); + } + // be aware, that this does not use evict on cached objects private Stream getStreamForQuery(final CriteriaQuery criteriaQuery) { final Session session = sessionFactory.openSession(); @@ -1926,6 +1943,23 @@ private Stream getStreamForQuery(final Session session, final CriteriaQue .stream(); } + // be aware, that this does not use evict on cached objects + private Stream getOrderedStreamForQuery(final Session session, final CriteriaQuery criteriaQuery) { + // The stream provided by Hibernate does not have the ORDERED characteristic. + // We hence build our own. + final ScrollableResultsImplementor scrollableResults = + (ScrollableResultsImplementor) session.createQuery(criteriaQuery) + .setReadOnly(true) + .setCacheable(false) + .setFetchSize(FETCH_SIZE) + .scroll(ScrollMode.FORWARD_ONLY); + final OrderedStreamIterator iterator = new OrderedStreamIterator<>(scrollableResults); + final Spliterator spliterator = + Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL | Spliterator.ORDERED); + + return (Stream) new StreamDecorator(StreamSupport.stream(spliterator, false), scrollableResults::close); + } + private void transmit(final List results) { try (final Session session = sessionFactory.openSession()) { final Transaction transaction = session.beginTransaction(); diff --git a/sdccc/src/main/java/com/draeger/medical/sdccc/messages/OrderedStreamIterator.kt b/sdccc/src/main/java/com/draeger/medical/sdccc/messages/OrderedStreamIterator.kt new file mode 100644 index 00000000..4270c826 --- /dev/null +++ b/sdccc/src/main/java/com/draeger/medical/sdccc/messages/OrderedStreamIterator.kt @@ -0,0 +1,45 @@ +/* + * This Source Code Form is subject to the terms of the MIT License. + * Copyright (c) 2023-2024 Draegerwerk AG & Co. KGaA. + * + * SPDX-License-Identifier: MIT + */ + +package com.draeger.medical.sdccc.messages + +import org.hibernate.query.spi.CloseableIterator +import org.hibernate.query.spi.ScrollableResultsImplementor + +/** + * Iterator to be used to create Streams with the ORDERED characteristic from ScrollableResults. + */ +class OrderedStreamIterator(private val results: ScrollableResultsImplementor) : CloseableIterator { + + override fun close() { + results.close() + } + + override fun remove() { + throw UnsupportedOperationException( + "this stream does not support the" + + " remove operation" + ) + } + + override fun hasNext(): Boolean { + if (results.isClosed) { + return false + } + return results.next() + } + + override fun next(): T { + val element = results.get() + @Suppress("UNCHECKED_CAST") + return if (element.size == 1) { + element[0] + } else { + element + } as T + } +} diff --git a/sdccc/src/test/java/com/draeger/medical/sdccc/messages/TestMessageStorage.java b/sdccc/src/test/java/com/draeger/medical/sdccc/messages/TestMessageStorage.java index 03ea3526..38ca798a 100644 --- a/sdccc/src/test/java/com/draeger/medical/sdccc/messages/TestMessageStorage.java +++ b/sdccc/src/test/java/com/draeger/medical/sdccc/messages/TestMessageStorage.java @@ -56,6 +56,7 @@ import javax.xml.namespace.QName; import org.apache.commons.io.ByteOrderMark; import org.apache.commons.lang3.tuple.Pair; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -155,15 +156,7 @@ public void testMdibVersionOverflow(@TempDir final File dir) throws IOException, 1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) { final ListMultimap multimap = ArrayListMultimap.create(); - final String transactionId = "transactionId"; - final String requestUri = "requestUri"; - - final X509Certificate certificate = CertificateUtil.getDummyCert(); - final CommunicationContext headerContext = new CommunicationContext( - new HttpApplicationInfo(multimap, transactionId, requestUri), - new TransportInfo( - Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)), - null); + final CommunicationContext headerContext = getCommunicationContext(multimap); try (final Message message = new Message( CommunicationLog.Direction.INBOUND, @@ -196,15 +189,7 @@ public void testMdibVersionCloseToOverflow(@TempDir final File dir) throws IOExc 1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) { final ListMultimap multimap = ArrayListMultimap.create(); - final String transactionId = "transactionId"; - final String requestUri = "requestUri"; - - final X509Certificate certificate = CertificateUtil.getDummyCert(); - final CommunicationContext headerContext = new CommunicationContext( - new HttpApplicationInfo(multimap, transactionId, requestUri), - new TransportInfo( - Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)), - null); + final CommunicationContext headerContext = getCommunicationContext(multimap); try (final Message message = new Message( CommunicationLog.Direction.INBOUND, @@ -248,15 +233,7 @@ public void testGetUniqueSequenceIds(@TempDir final File dir) throws IOException 1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) { final ListMultimap multimap = ArrayListMultimap.create(); - final String transactionId = "transactionId"; - final String requestUri = "requestUri"; - - final X509Certificate certificate = CertificateUtil.getDummyCert(); - final CommunicationContext headerContext = new CommunicationContext( - new HttpApplicationInfo(multimap, transactionId, requestUri), - new TransportInfo( - Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)), - null); + final CommunicationContext headerContext = getCommunicationContext(multimap); try (final Message message = new Message( CommunicationLog.Direction.INBOUND, @@ -302,6 +279,78 @@ public void testGetUniqueSequenceIds(@TempDir final File dir) throws IOException } } + /** + * Tests whether SequenceId values are ordered by the timestamp of the first message they appear in. + * + * @param dir message storage directory + * @throws IOException on io exceptions + * @throws CertificateException on certificate exceptions + */ + @Test + public void testGetUniqueSequenceIdsOrdering(@TempDir final File dir) throws IOException, CertificateException { + try (final MessageStorage messageStorage = new MessageStorage( + 1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) { + final ListMultimap multimap = ArrayListMultimap.create(); + + final CommunicationContext headerContext = getCommunicationContext(multimap); + + try (final Message message = new Message( + CommunicationLog.Direction.INBOUND, + CommunicationLog.MessageType.REQUEST, + headerContext, + messageStorage)) { + message.write(String.format( + BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "3")) + .getBytes(StandardCharsets.UTF_8)); + } + messageStorage.flush(); + + try (final Message message = new Message( + CommunicationLog.Direction.INBOUND, + CommunicationLog.MessageType.REQUEST, + headerContext, + messageStorage)) { + message.write(String.format( + BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "2")) + .getBytes(StandardCharsets.UTF_8)); + } + messageStorage.flush(); + + try (final Message message = new Message( + CommunicationLog.Direction.INBOUND, + CommunicationLog.MessageType.REQUEST, + headerContext, + messageStorage)) { + message.write(String.format( + BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "1")) + .getBytes(StandardCharsets.UTF_8)); + } + messageStorage.flush(); + + try (final Stream sequenceIdStream = messageStorage.getUniqueSequenceIds()) { + assertEquals(List.of("urn:uuid:3", "urn:uuid:2", "urn:uuid:1"), sequenceIdStream.toList()); + } + + try (final MessageStorage.GetterResult inboundMessages = + messageStorage.getInboundMessages()) { + assertEquals(3, inboundMessages.getStream().count()); + } + } + } + + private static @NotNull CommunicationContext getCommunicationContext(final ListMultimap multimap) + throws CertificateException, IOException { + final String transactionId = "transactionId"; + final String requestUri = "requestUri"; + + final X509Certificate certificate = CertificateUtil.getDummyCert(); + return new CommunicationContext( + new HttpApplicationInfo(multimap, transactionId, requestUri), + new TransportInfo( + Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)), + null); + } + /** * Tests whether headers and the transaction id are stored properly. *