forked from henne49/dbus-opendtu
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dbus-opendtu.py
157 lines (131 loc) · 5.2 KB
/
dbus-opendtu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python
'''module to read data from dtu/template and show in VenusOS'''
# File specific rules
# pylint: disable=broad-except
# system imports:
import logging
import logging.handlers
import os
import configparser
import sys
# our imports:
import constants
import tests
from helpers import *
# Victron imports:
from dbus_service import DbusService
if sys.version_info.major == 2:
import gobject # pylint: disable=E0401
else:
from gi.repository import GLib as gobject # pylint: disable=E0401
def main():
'''main loop'''
# configure logging
config = configparser.ConfigParser()
config.read(f"{(os.path.dirname(os.path.realpath(__file__)))}/config.ini")
logging_level = config["DEFAULT"]["Logging"].upper()
dtuvariant = config["DEFAULT"]["DTU"]
try:
number_of_templates = int(config["DEFAULT"]["NumberOfTemplates"])
except Exception:
number_of_templates = 0
log_rotate_handler = logging.handlers.RotatingFileHandler(
maxBytes=5*1024*1024*10,
backupCount=2,
encoding=None,
delay=0,
filename="%s/current.log" % (os.path.dirname(os.path.realpath(__file__)))
)
logging.basicConfig(
format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
level=logging_level,
handlers=[
logging.StreamHandler(),
log_rotate_handler
],
)
tests.run_tests()
try:
logging.info("Start")
from dbus.mainloop.glib import DBusGMainLoop # pylint: disable=E0401,C0415
# Have a mainloop, so we can send/receive asynchronous calls to and from dbus
DBusGMainLoop(set_as_default=True)
# region formatting
def _kwh(_p, value: float) -> str:
return f"{round(value, 2)}KWh"
def _a(_p, value: float) -> str:
return f"{round(value, 1)}A"
def _w(_p, value: float) -> str:
return f"{round(value, 1)}W"
def _v(_p, value: float) -> str:
return f"{round(value, 1)}V"
# endregion
paths = {
"/Ac/Energy/Forward": {
"initial": None,
"textformat": _kwh,
}, # energy produced by pv inverter
"/Ac/Power": {"initial": None, "textformat": _w},
"/Ac/L1/Voltage": {"initial": None, "textformat": _v},
"/Ac/L2/Voltage": {"initial": None, "textformat": _v},
"/Ac/L3/Voltage": {"initial": None, "textformat": _v},
"/Ac/L1/Current": {"initial": None, "textformat": _a},
"/Ac/L2/Current": {"initial": None, "textformat": _a},
"/Ac/L3/Current": {"initial": None, "textformat": _a},
"/Ac/L1/Power": {"initial": None, "textformat": _w},
"/Ac/L2/Power": {"initial": None, "textformat": _w},
"/Ac/L3/Power": {"initial": None, "textformat": _w},
"/Ac/L1/Energy/Forward": {"initial": None, "textformat": _kwh},
"/Ac/L2/Energy/Forward": {"initial": None, "textformat": _kwh},
"/Ac/L3/Energy/Forward": {"initial": None, "textformat": _kwh},
"/Ac/Out/L1/I": {"initial": None, "textformat": _a},
"/Ac/Out/L1/V": {"initial": None, "textformat": _v},
"/Dc/0/Voltage": {"initial": None, "textformat": _v},
}
if dtuvariant != constants.DTUVARIANT_TEMPLATE:
logging.info("Registering dtu devices")
servicename = get_config_value(config, "Servicename", "INVERTER", 0, "com.victronenergy.pvinverter")
service = DbusService(
servicename=servicename,
paths=paths,
actual_inverter=0,
)
number_of_inverters = service.get_number_of_inverters()
if number_of_inverters > 1:
# start our main-service if there are more than 1 inverter
for actual_inverter in range(number_of_inverters - 1):
servicename = get_config_value(
config,
"Servicename",
"INVERTER",
actual_inverter + 1,
"com.victronenergy.pvinverter"
)
DbusService(
servicename=servicename,
paths=paths,
actual_inverter=actual_inverter + 1,
)
for actual_template in range(number_of_templates):
logging.info("Registering Templates")
servicename = get_config_value(
config,
"Servicename",
"TEMPLATE",
actual_template,
"com.victronenergy.pvinverter"
)
service = DbusService(
servicename=servicename,
paths=paths,
actual_inverter=actual_template,
istemplate=True,
)
logging.info("Connected to dbus, and switching over to gobject.MainLoop() (= event based)")
mainloop = gobject.MainLoop()
mainloop.run()
except Exception as error:
logging.critical("Error at %s", "main", exc_info=error)
if __name__ == "__main__":
main()