Skip to content

Commit

Permalink
Fix for the SequenceId Ordering Problem (#221)
Browse files Browse the repository at this point in the history
SequenceIds are currently ordered lexicographically, which is wrong as
the SDC standards define no order on them. We hence order them by the
timestamp of the first message that they were used in.

# Checklist

The following aspects have been respected by the author of this pull
request, confirmed by both pull request assignee **and** reviewer:

* Adherence to coding conventions
  * [x] Pull Request Assignee
  * [x] Reviewer (midtuna)
* Adherence to javadoc conventions
  * [x] Pull Request Assignee
  * [x] Reviewer (midtuna)
* Changelog update (necessity checked and entry added or not added
respectively)
  * [x] Pull Request Assignee
  * [x] Reviewer (midtuna)
* README update (necessity checked and entry added or not added
respectively)
  * [x] Pull Request Assignee
  * [x] Reviewer (midtuna)
* config update (necessity checked and entry added or not added
respectively)
  * [x] Pull Request Assignee
  * [x] Reviewer (midtuna)
* SDCcc executable ran against a test device (if necessary)
  * [x] Pull Request Assignee
  * [x] Reviewer (midtuna)

---------

Co-authored-by: midttuna <[email protected]>
  • Loading branch information
ben-Draeger and midttuna authored Nov 29, 2024
1 parent c7ff37e commit 729356b
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- support multiple mds for test case for BICEPS.R5042
- inconsistent messaging in SDCcc logs ("No problems were found" and "Test run was invalid" one after another.)
- incorrect behavior of the configuration option SDCcc.SummarizeMessageEncodingErrors
- SequenceIds are now ordered by the timestamp of the first message that used them

### Removed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@
import org.apache.commons.io.input.BOMInputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hibernate.ScrollMode;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.query.spi.ScrollableResultsImplementor;
import org.hibernate.query.spi.StreamDecorator;
import org.somda.sdc.dpws.CommunicationLog;
import org.somda.sdc.dpws.soap.ApplicationInfo;
import org.somda.sdc.dpws.soap.CommunicationContext;
Expand Down Expand Up @@ -1079,6 +1082,7 @@ private void awaitFlushBarrier() {

/**
* Retrieves all SequenceId attribute values that have been seen.
* Orders them by the timestamp of the first message that used the respective SequenceId.
*
* @return stream of all SequenceId attribute values that have been seen
* @throws IOException if storage is closed
Expand All @@ -1090,17 +1094,20 @@ public Stream<String> getUniqueSequenceIds() throws IOException {
throw new IOException(GET_UNIQUE_SEQUENCE_IDS_CALLED_ON_CLOSED_STORAGE);
}

final CriteriaQuery<String> criteria;

final CriteriaQuery<String> messageContentQuery;
try (final Session session = sessionFactory.openSession()) {
session.beginTransaction();

final CriteriaBuilder criteriaBuilder = session.getCriteriaBuilder();
criteria = criteriaBuilder.createQuery(String.class);
final Root<MdibVersionGroupEntity> mdibVersionGroupEntityRoot = criteria.from(MdibVersionGroupEntity.class);
criteria.select(mdibVersionGroupEntityRoot.get(MdibVersionGroupEntity_.sequenceId));
criteria.distinct(true);
messageContentQuery = criteriaBuilder.createQuery(String.class);
final Root<MessageContent> messageContentRoot = messageContentQuery.from(MessageContent.class);
messageContentQuery.select(
messageContentRoot.join(MessageContent_.mdibVersionGroups).get(MdibVersionGroupEntity_.sequenceId));

messageContentQuery.orderBy(criteriaBuilder.asc(messageContentRoot.get(MessageContent_.nanoTimestamp)));
}

return this.getQueryResult(criteria);
return this.getOrderedQueryResult(messageContentQuery).distinct();
}

/**
Expand Down Expand Up @@ -1693,11 +1700,11 @@ public GetterResult<MessageContent> getInboundMessagesByTimeIntervalAndBodyType(
}

final boolean present;
try (final Stream<MessageContent> countingStream = this.getQueryResult(messageContentQuery)) {
try (final Stream<MessageContent> countingStream = this.getOrderedQueryResult(messageContentQuery)) {
present = countingStream.findAny().isPresent();
}

return new GetterResult<>(this.getQueryResult(messageContentQuery), present);
return new GetterResult<>(this.getOrderedQueryResult(messageContentQuery), present);
}

/**
Expand Down Expand Up @@ -1773,11 +1780,11 @@ public GetterResult<MessageContent> getInboundMessagesByTimestampAndBodyType(
}

final boolean present;
try (final Stream<MessageContent> countingStream = this.getQueryResult(messageContentQuery)) {
try (final Stream<MessageContent> countingStream = this.getOrderedQueryResult(messageContentQuery)) {
present = countingStream.findAny().isPresent();
}

return new GetterResult<>(this.getQueryResult(messageContentQuery), present);
return new GetterResult<>(this.getOrderedQueryResult(messageContentQuery), present);
}

/**
Expand Down Expand Up @@ -1817,11 +1824,11 @@ public GetterResult<ManipulationData> getManipulationDataByManipulation(final St
}

final boolean present;
try (final Stream<ManipulationData> countingStream = this.getQueryResult(criteria)) {
try (final Stream<ManipulationData> countingStream = this.getOrderedQueryResult(criteria)) {
present = countingStream.findAny().isPresent();
}

return new GetterResult<>(this.getQueryResult(criteria), present);
return new GetterResult<>(this.getOrderedQueryResult(criteria), present);
}

/**
Expand Down Expand Up @@ -1886,10 +1893,10 @@ public GetterResult<ManipulationData> getManipulationDataByParametersAndManipula
criteriaBuilder.and(parameterExistPredicates.toArray(new Predicate[0]))));
}
final boolean present;
try (final Stream<ManipulationData> countingStream = this.getQueryResult(criteria)) {
try (final Stream<ManipulationData> countingStream = this.getOrderedQueryResult(criteria)) {
present = countingStream.findAny().isPresent();
}
return new GetterResult<>(this.getQueryResult(criteria), present);
return new GetterResult<>(this.getOrderedQueryResult(criteria), present);
}

private <T> Stream<T> getQueryResult(final CriteriaQuery<T> criteriaQuery) {
Expand All @@ -1902,6 +1909,16 @@ private <T> Stream<T> getQueryResult(final CriteriaQuery<T> criteriaQuery) {
.onClose(resultIterator::close);
}

private <T> Stream<T> getOrderedQueryResult(final CriteriaQuery<T> criteriaQuery) {
final Session session = sessionFactory.openSession();
final Stream<T> results = getOrderedStreamForQuery(session, criteriaQuery);

final ResultIterator<T> resultIterator = new ResultIterator<>(session, results);

return StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
.onClose(resultIterator::close);
}

// be aware, that this does not use evict on cached objects
private <T> Stream<T> getStreamForQuery(final CriteriaQuery<T> criteriaQuery) {
final Session session = sessionFactory.openSession();
Expand All @@ -1926,6 +1943,23 @@ private <T> Stream<T> getStreamForQuery(final Session session, final CriteriaQue
.stream();
}

// be aware, that this does not use evict on cached objects
private <T> Stream<T> getOrderedStreamForQuery(final Session session, final CriteriaQuery<T> criteriaQuery) {
// The stream provided by Hibernate does not have the ORDERED characteristic.
// We hence build our own.
final ScrollableResultsImplementor scrollableResults =
(ScrollableResultsImplementor) session.createQuery(criteriaQuery)
.setReadOnly(true)
.setCacheable(false)
.setFetchSize(FETCH_SIZE)
.scroll(ScrollMode.FORWARD_ONLY);
final OrderedStreamIterator<T> iterator = new OrderedStreamIterator<>(scrollableResults);
final Spliterator<T> spliterator =
Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL | Spliterator.ORDERED);

return (Stream<T>) new StreamDecorator(StreamSupport.stream(spliterator, false), scrollableResults::close);
}

private void transmit(final List<DatabaseEntry> results) {
try (final Session session = sessionFactory.openSession()) {
final Transaction transaction = session.beginTransaction();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* This Source Code Form is subject to the terms of the MIT License.
* Copyright (c) 2023-2024 Draegerwerk AG & Co. KGaA.
*
* SPDX-License-Identifier: MIT
*/

package com.draeger.medical.sdccc.messages

import org.hibernate.query.spi.CloseableIterator
import org.hibernate.query.spi.ScrollableResultsImplementor

/**
* Iterator to be used to create Streams with the ORDERED characteristic from ScrollableResults.
*/
class OrderedStreamIterator<T>(private val results: ScrollableResultsImplementor) : CloseableIterator<T> {

override fun close() {
results.close()
}

override fun remove() {
throw UnsupportedOperationException(
"this stream does not support the" +
" remove operation"
)
}

override fun hasNext(): Boolean {
if (results.isClosed) {
return false
}
return results.next()
}

override fun next(): T {
val element = results.get()
@Suppress("UNCHECKED_CAST")
return if (element.size == 1) {
element[0]
} else {
element
} as T
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import javax.xml.namespace.QName;
import org.apache.commons.io.ByteOrderMark;
import org.apache.commons.lang3.tuple.Pair;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -155,15 +156,7 @@ public void testMdibVersionOverflow(@TempDir final File dir) throws IOException,
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
final ListMultimap<String, String> multimap = ArrayListMultimap.create();

final String transactionId = "transactionId";
final String requestUri = "requestUri";

final X509Certificate certificate = CertificateUtil.getDummyCert();
final CommunicationContext headerContext = new CommunicationContext(
new HttpApplicationInfo(multimap, transactionId, requestUri),
new TransportInfo(
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
null);
final CommunicationContext headerContext = getCommunicationContext(multimap);

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
Expand Down Expand Up @@ -196,15 +189,7 @@ public void testMdibVersionCloseToOverflow(@TempDir final File dir) throws IOExc
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
final ListMultimap<String, String> multimap = ArrayListMultimap.create();

final String transactionId = "transactionId";
final String requestUri = "requestUri";

final X509Certificate certificate = CertificateUtil.getDummyCert();
final CommunicationContext headerContext = new CommunicationContext(
new HttpApplicationInfo(multimap, transactionId, requestUri),
new TransportInfo(
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
null);
final CommunicationContext headerContext = getCommunicationContext(multimap);

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
Expand Down Expand Up @@ -248,15 +233,7 @@ public void testGetUniqueSequenceIds(@TempDir final File dir) throws IOException
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
final ListMultimap<String, String> multimap = ArrayListMultimap.create();

final String transactionId = "transactionId";
final String requestUri = "requestUri";

final X509Certificate certificate = CertificateUtil.getDummyCert();
final CommunicationContext headerContext = new CommunicationContext(
new HttpApplicationInfo(multimap, transactionId, requestUri),
new TransportInfo(
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
null);
final CommunicationContext headerContext = getCommunicationContext(multimap);

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
Expand Down Expand Up @@ -302,6 +279,78 @@ public void testGetUniqueSequenceIds(@TempDir final File dir) throws IOException
}
}

/**
* Tests whether SequenceId values are ordered by the timestamp of the first message they appear in.
*
* @param dir message storage directory
* @throws IOException on io exceptions
* @throws CertificateException on certificate exceptions
*/
@Test
public void testGetUniqueSequenceIdsOrdering(@TempDir final File dir) throws IOException, CertificateException {
try (final MessageStorage messageStorage = new MessageStorage(
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
final ListMultimap<String, String> multimap = ArrayListMultimap.create();

final CommunicationContext headerContext = getCommunicationContext(multimap);

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
CommunicationLog.MessageType.REQUEST,
headerContext,
messageStorage)) {
message.write(String.format(
BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "3"))
.getBytes(StandardCharsets.UTF_8));
}
messageStorage.flush();

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
CommunicationLog.MessageType.REQUEST,
headerContext,
messageStorage)) {
message.write(String.format(
BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "2"))
.getBytes(StandardCharsets.UTF_8));
}
messageStorage.flush();

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
CommunicationLog.MessageType.REQUEST,
headerContext,
messageStorage)) {
message.write(String.format(
BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "1"))
.getBytes(StandardCharsets.UTF_8));
}
messageStorage.flush();

try (final Stream<String> sequenceIdStream = messageStorage.getUniqueSequenceIds()) {
assertEquals(List.of("urn:uuid:3", "urn:uuid:2", "urn:uuid:1"), sequenceIdStream.toList());
}

try (final MessageStorage.GetterResult<MessageContent> inboundMessages =
messageStorage.getInboundMessages()) {
assertEquals(3, inboundMessages.getStream().count());
}
}
}

private static @NotNull CommunicationContext getCommunicationContext(final ListMultimap<String, String> multimap)
throws CertificateException, IOException {
final String transactionId = "transactionId";
final String requestUri = "requestUri";

final X509Certificate certificate = CertificateUtil.getDummyCert();
return new CommunicationContext(
new HttpApplicationInfo(multimap, transactionId, requestUri),
new TransportInfo(
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
null);
}

/**
* Tests whether headers and the transaction id are stored properly.
*
Expand Down

0 comments on commit 729356b

Please sign in to comment.