Skip to content

Commit

Permalink
Merge pull request #35 from dqops/1.6.2
Browse files Browse the repository at this point in the history
1.6.2
  • Loading branch information
dqops authored Jul 28, 2024
2 parents d6314e2 + 770d02f commit 0995a9d
Show file tree
Hide file tree
Showing 14 changed files with 40 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .run/dqo run.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/dqops/target/dqo-dqops-1.6.1.jar" />
<option name="JAR_PATH" value="$PROJECT_DIR$/dqops/target/dqo-dqops-1.6.2.jar" />
<option name="VM_PARAMETERS" value="-XX:MaxRAMPercentage=60.0 --add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/java.util.concurrent=ALL-UNNAMED" />
<option name="PROGRAM_PARAMETERS" value="--server.port=8888" />
<option name="WORKING_DIRECTORY" value="C:\dev\dqoado" />
Expand Down
7 changes: 2 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
# 1.6.1
* Incident notification supports emails
* Small bug fixes in the check editor
* Additional check to detect empty columns
* Copying data quality check patterns
# 1.6.2
* Race condition in the table status cache fixed.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.6.1
1.6.2
2 changes: 1 addition & 1 deletion distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<groupId>com.dqops</groupId>
<artifactId>dqo-distribution</artifactId>
<version>1.6.1</version> <!-- DQOps Version, do not touch (changed automatically) -->
<version>1.6.2</version> <!-- DQOps Version, do not touch (changed automatically) -->
<name>dqo-distribution</name>
<description>DQOps Data Quality Operations Center final assembly</description>
<packaging>pom</packaging>
Expand Down
4 changes: 2 additions & 2 deletions distribution/python/dqops/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
# limit

# WARNING: the next two lines with the version numbers (VERSION =, PIP_VERSION =) should not be modified manually. They are changed by a maven profile at compile time.
VERSION = "1.6.1"
PIP_VERSION = "1.6.1"
VERSION = "1.6.2"
PIP_VERSION = "1.6.2"
GITHUB_RELEASE = "v" + VERSION + ""
JAVA_VERSION = "17"

Expand Down
2 changes: 1 addition & 1 deletion dqo
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.
#

export DQO_VERSION=1.6.1
export DQO_VERSION=1.6.2

# Configure local development environment overrides
if [ -f $(dirname $0)/set-dqo-envs.sh ]; then
Expand Down
2 changes: 1 addition & 1 deletion dqo.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
@REM limitations under the License.
@REM

set DQO_VERSION=1.6.1
set DQO_VERSION=1.6.2

rem Configure local development environment overrides
if exist "%~dp0set-dqo-envs.cmd" (
Expand Down
2 changes: 1 addition & 1 deletion dqops/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
</parent>
<groupId>com.dqops</groupId>
<artifactId>dqo-dqops</artifactId>
<version>1.6.1</version> <!-- DQOps Version, do not touch (changed automatically) -->
<version>1.6.2</version> <!-- DQOps Version, do not touch (changed automatically) -->
<packaging>jar</packaging>
<name>dqo-dqops</name>
<description>DQOps Data Quality Operations Center</description>
Expand Down
2 changes: 1 addition & 1 deletion dqops/src/main/frontend/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "DQOps",
"version": "1.6.1",
"version": "1.6.2",
"private": true,
"dependencies": {
"@codemirror/lang-python": "6.1.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public class TableStatusCacheImpl implements TableStatusCache {
private boolean started;
private Sinks.Many<CurrentTableStatusKey> loadTableStatusRequestSink;
private Disposable subscription;
private Sinks.EmitFailureHandler emitFailureHandlerPublisher;
private int queuedOperationsCount;
private final Object lock = new Object();
private CompletableFuture<Integer> queueEmptyFuture;
Expand All @@ -86,12 +85,19 @@ public TableStatusCacheImpl(DqoCacheConfigurationProperties dqoCacheConfiguratio
this.dqoQueueConfigurationProperties = dqoQueueConfigurationProperties;
this.checkResultsDataService = checkResultsDataService;
this.userDomainIdentityFactory = userDomainIdentityFactory;
this.emitFailureHandlerPublisher = Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(
this.dqoQueueConfigurationProperties.getPublishBusyLoopingDurationSeconds()));
this.queueEmptyFuture = new CompletableFuture<>();
this.queueEmptyFuture.complete(0);
}

/**
* Creates a failure handler with a new duration.
* @return Failure handler.
*/
protected Sinks.EmitFailureHandler createFailureHandler() {
return Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(
this.dqoQueueConfigurationProperties.getPublishBusyLoopingDurationSeconds()));
}

/**
* The operation that is called to create a new table entry for the cache and queue a table load operation.
* @param tableStatusKey Table status key.
Expand All @@ -100,7 +106,7 @@ public TableStatusCacheImpl(DqoCacheConfigurationProperties dqoCacheConfiguratio
protected CurrentTableStatusCacheEntry loadEntryCore(CurrentTableStatusKey tableStatusKey) {
CurrentTableStatusCacheEntry currentTableStatusCacheEntry = new CurrentTableStatusCacheEntry(tableStatusKey, CurrentTableStatusEntryStatus.LOADING_QUEUED);
if (this.loadTableStatusRequestSink != null) {
this.loadTableStatusRequestSink.emitNext(tableStatusKey, this.emitFailureHandlerPublisher);
this.loadTableStatusRequestSink.emitNext(tableStatusKey, createFailureHandler());
incrementAwaitingOperationsCount();
}
return currentTableStatusCacheEntry;
Expand Down Expand Up @@ -165,7 +171,7 @@ public void invalidateTableStatus(CurrentTableStatusKey tableStatusKey, boolean

currentTableStatusCacheEntry.setStatus(CurrentTableStatusEntryStatus.REFRESH_QUEUED);
if (this.loadTableStatusRequestSink != null) {
this.loadTableStatusRequestSink.emitNext(tableStatusKey, this.emitFailureHandlerPublisher);
this.loadTableStatusRequestSink.emitNext(tableStatusKey, createFailureHandler());
incrementAwaitingOperationsCount();
}
}
Expand Down Expand Up @@ -281,10 +287,11 @@ public void start() {
int concurrency = Runtime.getRuntime().availableProcessors();
this.subscription = requestLoadFlux.subscribeOn(Schedulers.boundedElastic())
.flatMap(list -> Flux.fromIterable(list)) // single thread forwarder
.parallel(concurrency)
.flatMap(tableKey -> {
onRequestLoadTableStatus(tableKey);
return Mono.empty();
}, concurrency, concurrency * 2)
})
.subscribe();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public class LabelsIndexerImpl implements LabelsIndexer {
private boolean started;
private Sinks.Many<LabelRefreshKey> loadObjectRequestSink;
private Disposable subscription;
private Sinks.EmitFailureHandler emitFailureHandlerPublisher;
private int queuedOperationsCount;
private final Object lock = new Object();
private CompletableFuture<Integer> queueEmptyFuture;
Expand Down Expand Up @@ -97,12 +96,19 @@ public LabelsIndexerImpl(DqoCacheConfigurationProperties dqoCacheConfigurationPr
this.userHomeContextFactory = userHomeContextFactory;
this.userDomainIdentityFactory = userDomainIdentityFactory;
this.globalLabelsContainer = globalLabelsContainer;
this.emitFailureHandlerPublisher = Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(
this.dqoQueueConfigurationProperties.getPublishBusyLoopingDurationSeconds()));
this.queueEmptyFuture = new CompletableFuture<>();
this.queueEmptyFuture.complete(0);
}

/**
* Creates a failure handler with a new duration.
* @return Failure handler.
*/
protected Sinks.EmitFailureHandler createFailureHandler() {
return Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(
this.dqoQueueConfigurationProperties.getPublishBusyLoopingDurationSeconds()));
}

/**
* The operation that is called to request loading labels for a new entry (connection or table).
* @param targetKey Entry key.
Expand All @@ -111,7 +117,7 @@ public LabelsIndexerImpl(DqoCacheConfigurationProperties dqoCacheConfigurationPr
protected LabelsLoadEntry loadEntryCore(LabelRefreshKey targetKey) {
LabelsLoadEntry labelsLoadEntry = new LabelsLoadEntry(targetKey, LabelRefreshStatus.LOADING_QUEUED);
if (this.loadObjectRequestSink != null) {
this.loadObjectRequestSink.emitNext(targetKey, this.emitFailureHandlerPublisher);
this.loadObjectRequestSink.emitNext(targetKey, createFailureHandler());
incrementAwaitingOperationsCount();
}
return labelsLoadEntry;
Expand All @@ -137,7 +143,7 @@ public void invalidateObject(LabelRefreshKey targetLabelsKey, boolean replacingC
}

currentTableStatusCacheEntry.setStatus(LabelRefreshStatus.REFRESH_QUEUED);
this.loadObjectRequestSink.emitNext(targetLabelsKey, this.emitFailureHandlerPublisher);
this.loadObjectRequestSink.emitNext(targetLabelsKey, createFailureHandler());
incrementAwaitingOperationsCount();
}

Expand Down Expand Up @@ -342,10 +348,11 @@ public void start() {
int concurrency = Runtime.getRuntime().availableProcessors();
this.subscription = requestLoadFlux
.flatMap((List<LabelRefreshKey> targetKeys) -> Mono.just(targetKeys)) // single thread forwarder
.parallel(concurrency)
.flatMap((List<LabelRefreshKey> targetKeys) -> {
onRequestLoadLabelsForObjects(targetKeys);
return Mono.empty();
}, concurrency, concurrency * 2)
})
.subscribe();
}

Expand Down
2 changes: 1 addition & 1 deletion dqops/src/main/resources/banner.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
| |) | | (_) | | (_) | | '_ \ (_-<
|___/ \__\_\ \___/ | .__/ /__/
|_|
:: DQOps Data Quality Operations Center :: (v1.6.1)
:: DQOps Data Quality Operations Center :: (v1.6.2)
2 changes: 1 addition & 1 deletion lib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<groupId>com.dqops</groupId>
<artifactId>dqo-lib</artifactId>
<version>1.6.1</version> <!-- DQOps Version, do not touch (changed automatically) -->
<version>1.6.2</version> <!-- DQOps Version, do not touch (changed automatically) -->
<name>lib</name>
<description>POM for a list of dependencies to libraries that should be distributed in the "lib" folder, especially all JDBC drivers.</description>
<packaging>jar</packaging>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.dqops</groupId>
<artifactId>dqo-data-quality-observer</artifactId>
<version>1.6.1</version> <!-- DQOps Version, do not touch (changed automatically) -->
<version>1.6.2</version> <!-- DQOps Version, do not touch (changed automatically) -->
<packaging>pom</packaging>
<description>DQOps Data Quality Operations Center</description>

Expand Down

0 comments on commit 0995a9d

Please sign in to comment.