Skip to content

Commit

Permalink
Merge branch 'release/2020.16.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
sfeilmeier committed Jul 30, 2020
2 parents 9262b73 + bed8ec0 commit 2beb272
Show file tree
Hide file tree
Showing 80 changed files with 3,123 additions and 2,642 deletions.
10 changes: 6 additions & 4 deletions cnf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,11 @@
<version>3.6.1</version>
</dependency>
<dependency>
<!-- Apache Felix Configuration Admin Service -->
<!-- Changelog: https://github.com/apache/felix-dev/blob/master/configadmin/changelog.txt -->
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.configadmin</artifactId>
<version>1.9.16</version>
<version>1.9.18</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
Expand All @@ -136,7 +138,7 @@
<!-- Changelog: https://github.com/apache/felix-dev/commits/master/http -->
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.http.jetty</artifactId>
<version>4.0.18</version>
<version>4.0.20</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
Expand All @@ -155,7 +157,7 @@
<!-- Changelog: https://github.com/apache/felix-dev/blob/master/webconsole/changelog.txt -->
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.webconsole</artifactId>
<version>4.5.2</version>
<version>4.5.4</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
Expand All @@ -175,7 +177,7 @@
<dependency>
<groupId>org.dhatim</groupId>
<artifactId>fastexcel</artifactId>
<version>0.10.15</version>
<version>0.10.16</version>
</dependency>
<dependency>
<groupId>org.eclipse.platform</groupId>
Expand Down
2 changes: 1 addition & 1 deletion doc/modules/ROOT/pages/single_document.adoc
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
= OpenEMS - Open Energy Management System
ifndef::toc[]
(c) 2020 OpenEMS Association e.V.
Version 2020.15.0
Version 2020.16.0
:sectnums:
:sectnumlevels: 4
:toc:
Expand Down
4 changes: 2 additions & 2 deletions io.openems.backend.application/BackendApp.bndrun
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@
io.openems.wrapper.opczip;version=snapshot,\
org.apache.commons.commons-fileupload;version='[1.4.0,1.4.1)',\
org.apache.commons.io;version='[2.6.0,2.6.1)',\
org.apache.felix.configadmin;version='[1.9.16,1.9.17)',\
org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
org.apache.felix.http.jetty;version='[4.0.18,4.0.19)',\
org.apache.felix.http.servlet-api;version='[1.1.2,1.1.3)',\
org.apache.felix.inventory;version='[1.0.6,1.0.7)',\
org.apache.felix.metatype;version='[1.2.2,1.2.3)',\
org.apache.felix.scr;version='[2.1.20,2.1.21)',\
org.apache.felix.webconsole;version='[4.5.2,4.5.3)',\
org.apache.felix.webconsole;version='[4.5.4,4.5.5)',\
org.apache.felix.webconsole.plugins.ds;version='[2.1.0,2.1.1)',\
org.apache.servicemix.bundles.ws-commons-util;version='[1.0.2,1.0.3)',\
org.apache.servicemix.bundles.xmlrpc-client;version='[3.1.3,3.1.4)',\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ public String id() {
}

public int index() {
return queryIndex;
return this.queryIndex;
}

public boolean isQuery() {
return query;
return this.query;
}
}

Expand Down Expand Up @@ -124,11 +124,11 @@ public String id() {
}

public int index() {
return queryIndex;
return this.queryIndex;
}

public boolean isQuery() {
return query;
return this.query;
}
}

Expand Down Expand Up @@ -170,11 +170,11 @@ public String id() {
}

public int index() {
return queryIndex;
return this.queryIndex;
}

public boolean isQuery() {
return query;
return this.query;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.openems.backend.metadata.odoo.Field;
import io.openems.backend.metadata.odoo.Field.EdgeDevice;
import io.openems.backend.metadata.odoo.MyEdge;
import io.openems.backend.metadata.odoo.postgres.task.UpdateEdgeStatesSum;

public class InitializeEdgesWorker {

Expand All @@ -38,7 +37,7 @@ public InitializeEdgesWorker(PostgresHandler parent, HikariDataSource dataSource
}

public synchronized void start() {
this.executor.execute(() -> task.accept(this));
this.executor.execute(() -> this.task.accept(this));
}

public synchronized void stop() {
Expand Down Expand Up @@ -88,7 +87,7 @@ public synchronized void stop() {
MyEdge edge = self.parent.edgeCache.addOrUpate(rs);

// Trigger update to Edge States Sum
self.parent.getQueueWriteWorker().addTask(new UpdateEdgeStatesSum(edge.getOdooId()));
self.parent.getPeriodicWriteWorker().triggerUpdateEdgeStatesSum(edge);
} catch (Exception e) {
self.parent.logError(this.log,
"Unable to read Edge: " + e.getClass().getSimpleName() + ". " + e.getMessage());
Expand Down Expand Up @@ -119,7 +118,7 @@ private PreparedStatement psQueryAllEdges(Connection connection) throws SQLExcep
}

/**
* UPDATE {} SET openems_is_connected = FALSE;
* UPDATE {} SET openems_is_connected = FALSE;.
*
* @return the PreparedStatement
* @throws SQLException on error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
Expand All @@ -19,13 +21,19 @@
import io.openems.backend.metadata.odoo.Field;
import io.openems.backend.metadata.odoo.Field.EdgeDevice;
import io.openems.backend.metadata.odoo.MyEdge;
import io.openems.backend.metadata.odoo.postgres.task.UpdateEdgeStatesSum;

/**
* This worker combines writes to lastMessage and lastUpdate fields, to avoid
* DDOSing Odoo/Postgres by writing too often.
*/
public class PeriodicWriteWorker {

/**
* DEBUG_MODE activates printing of reqular statistics about queued tasks.
*/
private static final boolean DEBUG_MODE = true;

private static final int UPDATE_INTERVAL_IN_SECONDS = 60;

private final Logger log = LoggerFactory.getLogger(PeriodicWriteWorker.class);
Expand All @@ -49,7 +57,7 @@ public PeriodicWriteWorker(PostgresHandler parent, HikariDataSource dataSource)

public synchronized void start() {
this.future = this.executor.scheduleWithFixedDelay(//
() -> task.accept(this.dataSource), //
() -> this.task.accept(this.dataSource), //
UPDATE_INTERVAL_IN_SECONDS, UPDATE_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
}

Expand All @@ -61,25 +69,30 @@ public synchronized void stop() {
// Shutdown executor
if (this.executor != null) {
try {
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
this.executor.shutdown();
this.executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
this.parent.logWarn(this.log, "tasks interrupted");
} finally {
if (!executor.isTerminated()) {
if (!this.executor.isTerminated()) {
this.parent.logWarn(this.log, "cancel non-finished tasks");
}
executor.shutdownNow();
this.executor.shutdownNow();
}
}
}

private Consumer<HikariDataSource> task = (dataSource) -> {
try {
if (DEBUG_MODE) {
this.debugLog();
}

this.writeLastMessage(dataSource);
this.writeLastUpdate(dataSource);
this.writeIsOnline(dataSource);
this.writeIsOffline(dataSource);
this.updateEdgeStatesSum(dataSource);

} catch (SQLException e) {
this.log.error("Unable to execute WriteWorker task: " + e.getMessage());
Expand All @@ -90,13 +103,35 @@ public synchronized void stop() {
private final Set<Integer> lastUpdateOdooIds = new HashSet<>();
private final Set<Integer> isOnlineOdooIds = new HashSet<>();
private final Set<Integer> isOfflineOdooIds = new HashSet<>();
private final Set<Integer> updateEdgeStatesSum = new HashSet<>();

public void onLastMessage(MyEdge edge) {
synchronized (this.lastMessageOdooIds) {
this.lastMessageOdooIds.add(edge.getOdooId());
}
}

public void triggerUpdateEdgeStatesSum(MyEdge edge) {
synchronized (this.updateEdgeStatesSum) {
this.updateEdgeStatesSum.add(edge.getOdooId());
}
}

private void updateEdgeStatesSum(HikariDataSource dataSource) throws SQLException {
Set<Integer> edgeIds;
synchronized (this.updateEdgeStatesSum) {
edgeIds = new HashSet<>(this.updateEdgeStatesSum);
}

try {
new UpdateEdgeStatesSum(edgeIds).execute(dataSource);
} catch (SQLException e) {
this.parent.logWarn(this.log,
"Unable to execute Task. " + this.task.getClass().getSimpleName() + ": " + e.getMessage());
e.printStackTrace();
}
}

private void writeIsOffline(HikariDataSource dataSource) throws SQLException {
synchronized (this.isOfflineOdooIds) {
if (!this.isOfflineOdooIds.isEmpty()) {
Expand Down Expand Up @@ -202,4 +237,18 @@ public void isOffline(MyEdge edge) {
}
}

/*
* From here required for DEBUG_MODE
*/
private LocalDateTime lastExecute = null;

private synchronized void debugLog() {
LocalDateTime now = LocalDateTime.now();
if (this.lastExecute != null) {
this.parent.logInfo(this.log, "PeriodicWriteWorker. " //
+ "Time since last run: [" + ChronoUnit.SECONDS.between(this.lastExecute, now) + "s]" //
);
}
this.lastExecute = now;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
Expand All @@ -18,8 +20,7 @@
import io.openems.backend.metadata.odoo.EdgeCache;
import io.openems.backend.metadata.odoo.MetadataOdoo;
import io.openems.backend.metadata.odoo.MyEdge;
import io.openems.backend.metadata.odoo.postgres.task.InsertOrUpdateDeviceState;
import io.openems.backend.metadata.odoo.postgres.task.UpdateEdgeStatesSum;
import io.openems.backend.metadata.odoo.postgres.task.InsertOrUpdateDeviceStates;
import io.openems.common.channel.Level;
import io.openems.common.types.ChannelAddress;
import io.openems.common.types.EdgeConfig.Component.Channel;
Expand All @@ -41,13 +42,13 @@ public PostgresHandler(MetadataOdoo parent, EdgeCache edgeCache, Config config,
this.parent = parent;
this.edgeCache = edgeCache;
this.dataSource = this.getDataSource(config);
this.initializeEdgesWorker = new InitializeEdgesWorker(this, dataSource, () -> {
this.initializeEdgesWorker = new InitializeEdgesWorker(this, this.dataSource, () -> {
onInitialized.run();
});
this.initializeEdgesWorker.start();
this.periodicWriteWorker = new PeriodicWriteWorker(this, dataSource);
this.periodicWriteWorker = new PeriodicWriteWorker(this, this.dataSource);
this.periodicWriteWorker.start();
this.queueWriteWorker = new QueueWriteWorker(this, dataSource);
this.queueWriteWorker = new QueueWriteWorker(this, this.dataSource);
this.queueWriteWorker.start();
}

Expand Down Expand Up @@ -77,6 +78,7 @@ public synchronized void updateDeviceStates(MyEdge edge, Map<ChannelAddress, Cha
/*
* Update the EdgeDeviceState table
*/
List<InsertOrUpdateDeviceStates.DeviceState> deviceStates = new ArrayList<>();
for (Entry<ChannelAddress, Channel> entry : activeStateChannels.entrySet()) {
ChannelDetail detail = entry.getValue().getDetail();
if (!(detail instanceof ChannelDetailState)) {
Expand All @@ -91,15 +93,16 @@ public synchronized void updateDeviceStates(MyEdge edge, Map<ChannelAddress, Cha
} else {
stateChannelName = channel.getId();
}
InsertOrUpdateDeviceState task = new InsertOrUpdateDeviceState(edge.getOdooId(), channelAddress, level,
stateChannelName, Timestamp.valueOf(LocalDateTime.now(ZoneOffset.UTC)));

// Add this StateChannel to the write queue
this.queueWriteWorker.addTask(task);
deviceStates.add(new InsertOrUpdateDeviceStates.DeviceState(channelAddress, level, stateChannelName));
}

// Add "UpdateEdgeStates" task to write queue
this.queueWriteWorker.addTask(new UpdateEdgeStatesSum(edge.getOdooId()));
// Add this Task to the write queue
InsertOrUpdateDeviceStates task = new InsertOrUpdateDeviceStates(edge.getOdooId(),
Timestamp.valueOf(LocalDateTime.now(ZoneOffset.UTC)), deviceStates);
this.queueWriteWorker.addTask(task);

// Update Sum-State from time to time
this.periodicWriteWorker.triggerUpdateEdgeStatesSum(edge);
}

public PeriodicWriteWorker getPeriodicWriteWorker() {
Expand Down
Loading

0 comments on commit 2beb272

Please sign in to comment.