Skip to content

Commit

Permalink
Max's Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ben-Draeger committed Nov 14, 2024
1 parent ebe0e09 commit 60dc36b
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- support multiple mds for test case for BICEPS.R5042
- inconsistent messaging in SDCcc logs ("No problems were found" and "Test run was invalid" one after another.)
- incorrect behavior of the configuration option SDCcc.SummarizeMessageEncodingErrors
- SequenceIds are now ordered by the timestamp of the first message that used them

### Removed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@
import org.apache.commons.io.input.BOMInputStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hibernate.ScrollMode;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.query.spi.ScrollableResultsImplementor;
import org.hibernate.query.spi.StreamDecorator;
import org.somda.sdc.dpws.CommunicationLog;
import org.somda.sdc.dpws.soap.ApplicationInfo;
import org.somda.sdc.dpws.soap.CommunicationContext;
Expand Down Expand Up @@ -1079,6 +1082,7 @@ private void awaitFlushBarrier() {

/**
* Retrieves all SequenceId attribute values that have been seen.
* Orders them by the timestamp of the first message that used the respective SequenceId.
*
* @return stream of all SequenceId attribute values that have been seen
* @throws IOException if storage is closed
Expand All @@ -1090,17 +1094,20 @@ public Stream<String> getUniqueSequenceIds() throws IOException {
throw new IOException(GET_UNIQUE_SEQUENCE_IDS_CALLED_ON_CLOSED_STORAGE);
}

final CriteriaQuery<String> criteria;

final CriteriaQuery<String> messageContentQuery;
try (final Session session = sessionFactory.openSession()) {
session.beginTransaction();

final CriteriaBuilder criteriaBuilder = session.getCriteriaBuilder();
criteria = criteriaBuilder.createQuery(String.class);
final Root<MdibVersionGroupEntity> mdibVersionGroupEntityRoot = criteria.from(MdibVersionGroupEntity.class);
criteria.select(mdibVersionGroupEntityRoot.get(MdibVersionGroupEntity_.sequenceId));
criteria.distinct(true);
messageContentQuery = criteriaBuilder.createQuery(String.class);
final Root<MessageContent> messageContentRoot = messageContentQuery.from(MessageContent.class);
messageContentQuery.select(
messageContentRoot.join(MessageContent_.mdibVersionGroups).get(MdibVersionGroupEntity_.sequenceId));

messageContentQuery.orderBy(criteriaBuilder.asc(messageContentRoot.get(MessageContent_.nanoTimestamp)));
}

return this.getQueryResult(criteria);
return this.getOrderedQueryResult(messageContentQuery).distinct();
}

/**
Expand Down Expand Up @@ -1693,11 +1700,11 @@ public GetterResult<MessageContent> getInboundMessagesByTimeIntervalAndBodyType(
}

final boolean present;
try (final Stream<MessageContent> countingStream = this.getQueryResult(messageContentQuery)) {
try (final Stream<MessageContent> countingStream = this.getOrderedQueryResult(messageContentQuery)) {
present = countingStream.findAny().isPresent();
}

return new GetterResult<>(this.getQueryResult(messageContentQuery), present);
return new GetterResult<>(this.getOrderedQueryResult(messageContentQuery), present);
}

/**
Expand Down Expand Up @@ -1773,11 +1780,11 @@ public GetterResult<MessageContent> getInboundMessagesByTimestampAndBodyType(
}

final boolean present;
try (final Stream<MessageContent> countingStream = this.getQueryResult(messageContentQuery)) {
try (final Stream<MessageContent> countingStream = this.getOrderedQueryResult(messageContentQuery)) {
present = countingStream.findAny().isPresent();
}

return new GetterResult<>(this.getQueryResult(messageContentQuery), present);
return new GetterResult<>(this.getOrderedQueryResult(messageContentQuery), present);
}

/**
Expand Down Expand Up @@ -1817,11 +1824,11 @@ public GetterResult<ManipulationData> getManipulationDataByManipulation(final St
}

final boolean present;
try (final Stream<ManipulationData> countingStream = this.getQueryResult(criteria)) {
try (final Stream<ManipulationData> countingStream = this.getOrderedQueryResult(criteria)) {
present = countingStream.findAny().isPresent();
}

return new GetterResult<>(this.getQueryResult(criteria), present);
return new GetterResult<>(this.getOrderedQueryResult(criteria), present);
}

/**
Expand Down Expand Up @@ -1886,10 +1893,10 @@ public GetterResult<ManipulationData> getManipulationDataByParametersAndManipula
criteriaBuilder.and(parameterExistPredicates.toArray(new Predicate[0]))));
}
final boolean present;
try (final Stream<ManipulationData> countingStream = this.getQueryResult(criteria)) {
try (final Stream<ManipulationData> countingStream = this.getOrderedQueryResult(criteria)) {
present = countingStream.findAny().isPresent();
}
return new GetterResult<>(this.getQueryResult(criteria), present);
return new GetterResult<>(this.getOrderedQueryResult(criteria), present);
}

private <T> Stream<T> getQueryResult(final CriteriaQuery<T> criteriaQuery) {
Expand All @@ -1902,6 +1909,16 @@ private <T> Stream<T> getQueryResult(final CriteriaQuery<T> criteriaQuery) {
.onClose(resultIterator::close);
}

private <T> Stream<T> getOrderedQueryResult(final CriteriaQuery<T> criteriaQuery) {
final Session session = sessionFactory.openSession();
final Stream<T> results = getOrderedStreamForQuery(session, criteriaQuery);

final ResultIterator<T> resultIterator = new ResultIterator<>(session, results);

return StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false)
.onClose(resultIterator::close);
}

// be aware, that this does not use evict on cached objects
private <T> Stream<T> getStreamForQuery(final CriteriaQuery<T> criteriaQuery) {
final Session session = sessionFactory.openSession();
Expand All @@ -1926,6 +1943,23 @@ private <T> Stream<T> getStreamForQuery(final Session session, final CriteriaQue
.stream();
}

// be aware, that this does not use evict on cached objects
private <T> Stream<T> getOrderedStreamForQuery(final Session session, final CriteriaQuery<T> criteriaQuery) {
// The stream provided by Hibernate does not have the ORDERED characteristic.
// We hence build our own.
final ScrollableResultsImplementor scrollableResults =
(ScrollableResultsImplementor) session.createQuery(criteriaQuery)
.setReadOnly(true)
.setCacheable(false)
.setFetchSize(FETCH_SIZE)
.scroll(ScrollMode.FORWARD_ONLY);
final OrderedStreamIterator<T> iterator = new OrderedStreamIterator<>(scrollableResults);
final Spliterator<T> spliterator =
Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL | Spliterator.ORDERED);

return (Stream<T>) new StreamDecorator(StreamSupport.stream(spliterator, false), scrollableResults::close);
}

private void transmit(final List<DatabaseEntry> results) {
try (final Session session = sessionFactory.openSession()) {
final Transaction transaction = session.beginTransaction();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.draeger.medical.sdccc.messages

import org.hibernate.query.spi.CloseableIterator
import org.hibernate.query.spi.ScrollableResultsImplementor

/**
* Iterator to be used to create Streams with the ORDERED characteristic from ScrollableResults.
*/
class OrderedStreamIterator<T>(private val results: ScrollableResultsImplementor) : CloseableIterator<T> {

override fun close() {
results.close()
}

override fun remove() {
error("not yet implemented!")
}

override fun hasNext(): Boolean {
if (results.isClosed) {
return false
}
return results.next()
}

override fun next(): T {
val element = results.get()
@Suppress("UNCHECKED_CAST")
return if (element.size == 1) {
element[0]
} else {
element
} as T
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import javax.xml.namespace.QName;
import org.apache.commons.io.ByteOrderMark;
import org.apache.commons.lang3.tuple.Pair;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -155,15 +156,7 @@ public void testMdibVersionOverflow(@TempDir final File dir) throws IOException,
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
final ListMultimap<String, String> multimap = ArrayListMultimap.create();

final String transactionId = "transactionId";
final String requestUri = "requestUri";

final X509Certificate certificate = CertificateUtil.getDummyCert();
final CommunicationContext headerContext = new CommunicationContext(
new HttpApplicationInfo(multimap, transactionId, requestUri),
new TransportInfo(
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
null);
final CommunicationContext headerContext = getCommunicationContext(multimap);

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
Expand Down Expand Up @@ -196,15 +189,7 @@ public void testMdibVersionCloseToOverflow(@TempDir final File dir) throws IOExc
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
final ListMultimap<String, String> multimap = ArrayListMultimap.create();

final String transactionId = "transactionId";
final String requestUri = "requestUri";

final X509Certificate certificate = CertificateUtil.getDummyCert();
final CommunicationContext headerContext = new CommunicationContext(
new HttpApplicationInfo(multimap, transactionId, requestUri),
new TransportInfo(
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
null);
final CommunicationContext headerContext = getCommunicationContext(multimap);

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
Expand Down Expand Up @@ -248,15 +233,7 @@ public void testGetUniqueSequenceIds(@TempDir final File dir) throws IOException
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
final ListMultimap<String, String> multimap = ArrayListMultimap.create();

final String transactionId = "transactionId";
final String requestUri = "requestUri";

final X509Certificate certificate = CertificateUtil.getDummyCert();
final CommunicationContext headerContext = new CommunicationContext(
new HttpApplicationInfo(multimap, transactionId, requestUri),
new TransportInfo(
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
null);
final CommunicationContext headerContext = getCommunicationContext(multimap);

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
Expand Down Expand Up @@ -302,6 +279,78 @@ public void testGetUniqueSequenceIds(@TempDir final File dir) throws IOException
}
}

/**
* Tests whether SequenceId values are ordered by the timestamp of the first message they appear in.
*
* @param dir message storage directory
* @throws IOException on io exceptions
* @throws CertificateException on certificate exceptions
*/
@Test
public void testGetUniqueSequenceIdsOrdering(@TempDir final File dir) throws IOException, CertificateException {
try (final MessageStorage messageStorage = new MessageStorage(
1, false, true, mock(MessageFactory.class), new HibernateConfigImpl(dir), this.testRunObserver)) {
final ListMultimap<String, String> multimap = ArrayListMultimap.create();

final CommunicationContext headerContext = getCommunicationContext(multimap);

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
CommunicationLog.MessageType.REQUEST,
headerContext,
messageStorage)) {
message.write(String.format(
BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "3"))
.getBytes(StandardCharsets.UTF_8));
}
messageStorage.flush();

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
CommunicationLog.MessageType.REQUEST,
headerContext,
messageStorage)) {
message.write(String.format(
BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "2"))
.getBytes(StandardCharsets.UTF_8));
}
messageStorage.flush();

try (final Message message = new Message(
CommunicationLog.Direction.INBOUND,
CommunicationLog.MessageType.REQUEST,
headerContext,
messageStorage)) {
message.write(String.format(
BASE_MESSAGE_STRING, "action", String.format(SEQUENCE_ID_METRIC_BODY_STRING, "3", "1"))
.getBytes(StandardCharsets.UTF_8));
}
messageStorage.flush();

try (final Stream<String> sequenceIdStream = messageStorage.getUniqueSequenceIds()) {
assertEquals(List.of("urn:uuid:3", "urn:uuid:2", "urn:uuid:1"), sequenceIdStream.toList());
}

try (final MessageStorage.GetterResult<MessageContent> inboundMessages =
messageStorage.getInboundMessages()) {
assertEquals(3, inboundMessages.getStream().count());
}
}
}

private static @NotNull CommunicationContext getCommunicationContext(final ListMultimap<String, String> multimap)
throws CertificateException, IOException {
final String transactionId = "transactionId";
final String requestUri = "requestUri";

final X509Certificate certificate = CertificateUtil.getDummyCert();
return new CommunicationContext(
new HttpApplicationInfo(multimap, transactionId, requestUri),
new TransportInfo(
Constants.HTTPS_SCHEME, null, null, null, null, Collections.singletonList(certificate)),
null);
}

/**
* Tests whether headers and the transaction id are stored properly.
*
Expand Down

0 comments on commit 60dc36b

Please sign in to comment.