Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for the SequenceId Ordering Problem #221

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- support multiple mds for test case for BICEPS.R5042
- inconsistent messaging in SDCcc logs ("No problems were found" and "Test run was invalid" one after another.)
- incorrect behavior of the configuration option SDCcc.SummarizeMessageEncodingErrors
- SequenceIds are now ordered by the timestamp of the first message that used them

### Removed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@
import org.apache.commons.io.input.BOMInputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hibernate.ScrollMode;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.query.spi.ScrollableResultsImplementor;
import org.hibernate.query.spi.StreamDecorator;
import org.somda.sdc.dpws.CommunicationLog;
import org.somda.sdc.dpws.soap.ApplicationInfo;
import org.somda.sdc.dpws.soap.CommunicationContext;
Expand Down Expand Up @@ -1079,6 +1082,7 @@ private void awaitFlushBarrier() {

/**
* Retrieves all SequenceId attribute values that have been seen.
* Orders them by the timestamp of the first message that used the respective SequenceId.
*
* @return stream of all SequenceId attribute values that have been seen
* @throws IOException if storage is closed
Expand All @@ -1090,17 +1094,20 @@ public Stream<String> getUniqueSequenceIds() throws IOException {
throw new IOException(GET_UNIQUE_SEQUENCE_IDS_CALLED_ON_CLOSED_STORAGE);
}

final CriteriaQuery<String> criteria;

final CriteriaQuery<String> messageContentQuery;
try (final Session session = sessionFactory.openSession()) {
session.beginTransaction();

final CriteriaBuilder criteriaBuilder = session.getCriteriaBuilder();
criteria = criteriaBuilder.createQuery(String.class);
final Root<MdibVersionGroupEntity> mdibVersionGroupEntityRoot = criteria.from(MdibVersionGroupEntity.class);
criteria.select(mdibVersionGroupEntityRoot.get(MdibVersionGroupEntity_.sequenceId));
criteria.distinct(true);
messageContentQuery = criteriaBuilder.createQuery(String.class);
final Root<MessageContent> messageContentRoot = messageContentQuery.from(MessageContent.class);
messageContentQuery.select(
messageContentRoot.join(MessageContent_.mdibVersionGroups).get(MdibVersionGroupEntity_.sequenceId));

messageContentQuery.orderBy(criteriaBuilder.asc(messageContentRoot.get(MessageContent_.nanoTimestamp)));
}

return this.getQueryResult(criteria);
return this.getOrderedQueryResult(messageContentQuery).distinct();
}

/**
Expand Down Expand Up @@ -1693,11 +1700,11 @@ public GetterResult<MessageContent> getInboundMessagesByTimeIntervalAndBodyType(
}

final boolean present;
try (final Stream<MessageContent> countingStream = this.getQueryResult(messageContentQuery)) {
try (final Stream<MessageContent> countingStream = this.getOrderedQueryResult(messageContentQuery)) {
present = countingStream.findAny().isPresent();
}

return new GetterResult<>(this.getQueryResult(messageContentQuery), present);
return new GetterResult<>(this.getOrderedQueryResult(messageContentQuery), present);
}

/**
Expand Down Expand Up @@ -1773,11 +1780,11 @@ public GetterResult<MessageContent> getInboundMessagesByTimestampAndBodyType(
}

final boolean present;
try (final Stream<MessageContent> countingStream = this.getQueryResult(messageContentQuery)) {
try (final Stream<MessageContent> countingStream = this.getOrderedQueryResult(messageContentQuery)) {
present = countingStream.findAny().isPresent();
}

return new GetterResult<>(this.getQueryResult(messageContentQuery), present);
return new GetterResult<>(this.getOrderedQueryResult(messageContentQuery), present);
}

/**
Expand Down Expand Up @@ -1817,11 +1824,11 @@ public GetterResult<ManipulationData> getManipulationDataByManipulation(final St
}

final boolean present;
try (final Stream<ManipulationData> countingStream = this.getQueryResult(criteria)) {
try (final Stream<ManipulationData> countingStream = this.getOrderedQueryResult(criteria)) {
present = countingStream.findAny().isPresent();
}

return new GetterResult<>(this.getQueryResult(criteria), present);
return new GetterResult<>(this.getOrderedQueryResult(criteria), present);
}

/**
Expand Down Expand Up @@ -1886,10 +1893,10 @@ public GetterResult<ManipulationData> getManipulationDataByParametersAndManipula
criteriaBuilder.and(parameterExistPredicates.toArray(new Predicate[0]))));
}
final boolean present;
try (final Stream<ManipulationData> countingStream = this.getQueryResult(criteria)) {
try (final Stream<ManipulationData> countingStream = this.getOrderedQueryResult(criteria)) {
present = countingStream.findAny().isPresent();
}
return new GetterResult<>(this.getQueryResult(criteria), present);
return new GetterResult<>(this.getOrderedQueryResult(criteria), present);
}

private <T> Stream<T> getQueryResult(final CriteriaQuery<T> criteriaQuery) {
Expand All @@ -1902,6 +1909,16 @@ private <T> Stream<T> getQueryResult(final CriteriaQuery<T> criteriaQuery) {
.onClose(resultIterator::close);
}

private <T> Stream<T> getOrderedQueryResult(final CriteriaQuery<T> criteriaQuery) {
final Session session = sessionFactory.openSession();
final Stream<T> results = getOrderedStreamForQuery(session, criteriaQuery);

final ResultIterator<T> resultIterator = new ResultIterator<>(session, results);

return StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
.onClose(resultIterator::close);
}

// be aware, that this does not use evict on cached objects
private <T> Stream<T> getStreamForQuery(final CriteriaQuery<T> criteriaQuery) {
final Session session = sessionFactory.openSession();
Expand All @@ -1926,6 +1943,23 @@ private <T> Stream<T> getStreamForQuery(final Session session, final CriteriaQue
.stream();
}

// be aware, that this does not use evict on cached objects
private <T> Stream<T> getOrderedStreamForQuery(final Session session, final CriteriaQuery<T> criteriaQuery) {
// The stream provided by Hibernate does not have the ORDERED characteristic.
// We hence build our own.
final ScrollableResultsImplementor scrollableResults =
(ScrollableResultsImplementor) session.createQuery(criteriaQuery)
.setReadOnly(true)
.setCacheable(false)
.setFetchSize(FETCH_SIZE)
.scroll(ScrollMode.FORWARD_ONLY);
final OrderedStreamIterator<T> iterator = new OrderedStreamIterator<>(scrollableResults);
final Spliterator<T> spliterator =
Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL | Spliterator.ORDERED);

return (Stream<T>) new StreamDecorator(StreamSupport.stream(spliterator, false), scrollableResults::close);
}

private void transmit(final List<DatabaseEntry> results) {
try (final Session session = sessionFactory.openSession()) {
final Transaction transaction = session.beginTransaction();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* This Source Code Form is subject to the terms of the MIT License.
* Copyright (c) 2023-2024 Draegerwerk AG & Co. KGaA.
*
* SPDX-License-Identifier: MIT
*/

package com.draeger.medical.sdccc.messages

import org.hibernate.query.spi.CloseableIterator
import org.hibernate.query.spi.ScrollableResultsImplementor

/**
* Iterator to be used to create Streams with the ORDERED characteristic from ScrollableResults.
*/
class OrderedStreamIterator<T>(private val results: ScrollableResultsImplementor) : CloseableIterator<T> {

override fun close() {
results.close()
}

override fun remove() {
error("not yet implemented!")
}

override fun hasNext(): Boolean {
if (results.isClosed) {
return false
}
return results.next()
midttuna marked this conversation as resolved.
Show resolved Hide resolved
}

override fun next(): T {
val element = results.get()
@Suppress("UNCHECKED_CAST")
return if (element.size == 1) {
element[0]
} else {
element
} as T
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import javax.xml.namespace.QName;
import org.apache.commons.io.ByteOrderMark;
import org.apache.commons.lang3.tuple.Pair;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -155,15 +156,7 @@ public void testMdibVersionOverflow(@TempDir final File dir) throws IOException,
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
final ListMultimap<String, String> multimap = ArrayListMultimap.create();

final String transactionId = "transactionId";
final String requestUri = "requestUri";

final X509Certificate certificate = CertificateUtil.getDummyCert();
final CommunicationContext headerContext = new CommunicationContext(
new HttpApplicationInfo(multimap, transactionId, requestUri),
new TransportInfo(
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
null);
final CommunicationContext headerContext = getCommunicationContext(multimap);

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
Expand Down Expand Up @@ -196,15 +189,7 @@ public void testMdibVersionCloseToOverflow(@TempDir final File dir) throws IOExc
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
final ListMultimap<String, String> multimap = ArrayListMultimap.create();

final String transactionId = "transactionId";
final String requestUri = "requestUri";

final X509Certificate certificate = CertificateUtil.getDummyCert();
final CommunicationContext headerContext = new CommunicationContext(
new HttpApplicationInfo(multimap, transactionId, requestUri),
new TransportInfo(
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
null);
final CommunicationContext headerContext = getCommunicationContext(multimap);

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
Expand Down Expand Up @@ -248,15 +233,7 @@ public void testGetUniqueSequenceIds(@TempDir final File dir) throws IOException
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
final ListMultimap<String, String> multimap = ArrayListMultimap.create();

final String transactionId = "transactionId";
final String requestUri = "requestUri";

final X509Certificate certificate = CertificateUtil.getDummyCert();
final CommunicationContext headerContext = new CommunicationContext(
new HttpApplicationInfo(multimap, transactionId, requestUri),
new TransportInfo(
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
null);
final CommunicationContext headerContext = getCommunicationContext(multimap);

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
Expand Down Expand Up @@ -302,6 +279,78 @@ public void testGetUniqueSequenceIds(@TempDir final File dir) throws IOException
}
}

/**
* Tests whether SequenceId values are ordered by the timestamp of the first message they appear in.
*
* @param dir message storage directory
* @throws IOException on io exceptions
* @throws CertificateException on certificate exceptions
*/
@Test
public void testGetUniqueSequenceIdsOrdering(@TempDir final File dir) throws IOException, CertificateException {
try (final MessageStorage messageStorage = new MessageStorage(
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
final ListMultimap<String, String> multimap = ArrayListMultimap.create();

final CommunicationContext headerContext = getCommunicationContext(multimap);

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
CommunicationLog.MessageType.REQUEST,
headerContext,
messageStorage)) {
message.write(String.format(
BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "3"))
.getBytes(StandardCharsets.UTF_8));
}
messageStorage.flush();

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
CommunicationLog.MessageType.REQUEST,
headerContext,
messageStorage)) {
message.write(String.format(
BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "2"))
.getBytes(StandardCharsets.UTF_8));
}
messageStorage.flush();

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
CommunicationLog.MessageType.REQUEST,
headerContext,
messageStorage)) {
message.write(String.format(
BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "1"))
.getBytes(StandardCharsets.UTF_8));
}
messageStorage.flush();

try (final Stream<String> sequenceIdStream = messageStorage.getUniqueSequenceIds()) {
assertEquals(List.of("urn:uuid:3", "urn:uuid:2", "urn:uuid:1"), sequenceIdStream.toList());
}

try (final MessageStorage.GetterResult<MessageContent> inboundMessages =
messageStorage.getInboundMessages()) {
assertEquals(3, inboundMessages.getStream().count());
}
}
}

private static @NotNull CommunicationContext getCommunicationContext(final ListMultimap<String, String> multimap)
throws CertificateException, IOException {
final String transactionId = "transactionId";
final String requestUri = "requestUri";

final X509Certificate certificate = CertificateUtil.getDummyCert();
return new CommunicationContext(
new HttpApplicationInfo(multimap, transactionId, requestUri),
new TransportInfo(
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
null);
}

/**
* Tests whether headers and the transaction id are stored properly.
*
Expand Down
Loading