forked from flobz/psa_car_controller
-
Notifications
You must be signed in to change notification settings - Fork 0
/
charge_control.py
151 lines (132 loc) · 5.99 KB
/
charge_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import json
import threading
from copy import copy
from datetime import datetime, timedelta
from hashlib import md5
from time import sleep
import pytz
from my_psacc import MyPSACC
from mylogger import logger
# Charging status strings, compared against the `charging.status` value
# read from the vehicle's 'Electric' energy status.
# Status that means a charge is currently running.
INPROGRESS = "InProgress"
# NOTE(review): STOPPED and FAILURE are not referenced in this file's
# visible code; presumably used by importers of this module - confirm.
STOPPED = "Stopped"
FAILURE = "Failure"
# Statuses logged as "not compatible with charging" when a charge command
# is acknowledged.
FINISHED = "Finished"
DISCONNECTED = "Disconnected"
class ChargeControl:
    """Stop the charge of one vehicle at a battery level or at a daily time.

    An instance watches a single vehicle (identified by its VIN) through the
    shared ``psacc`` client. ``process`` is meant to be called each time new
    vehicle information is available; it stops the charge when the battery
    level reaches ``percentage_threshold`` or when the configured stop hour
    has passed.
    """

    # Seconds to wait after sending a charge command before reading the
    # charging status back to check the command was applied.
    MQTT_TIMEOUT = 60

    def __init__(self, psacc: "MyPSACC", vin, percentage_threshold, stop_hour):
        """Create the controller.

        :param psacc: client used to read the vehicle status and send commands
        :param vin: identifier of the vehicle to control
        :param percentage_threshold: battery level at which the charge is
            stopped; a value of 100 or more disables charge control entirely
        :param stop_hour: ``[hour, minute]`` at which to stop the charge every
            day, or ``None`` / ``[0, 0]`` to disable the stop hour
        """
        self.vin = vin
        self.percentage_threshold = percentage_threshold
        self.set_stop_hour(stop_hour)
        self.psacc = psacc
        # Number of charge commands sent since the last confirmed state.
        self.retry_count = 0
        # Minutes without fresh data before the car is woken up.
        self.wakeup_timeout = 10

    def set_stop_hour(self, stop_hour):
        """Set the daily stop time and compute its next occurrence.

        :param stop_hour: ``[hour, minute]``; ``None`` or ``[0, 0]`` disables
            the stop hour (so midnight cannot be used as a stop time)
        :raises ValueError: if hour or minute is out of range; the previous
            configuration is left untouched in that case
        """
        if stop_hour is None or stop_hour == [0, 0]:
            self._stop_hour = None
            self._next_stop_hour = None
            return
        now = datetime.now()
        # Zero the microseconds too so the stop time lands on the exact minute.
        next_stop = now.replace(hour=stop_hour[0], minute=stop_hour[1],
                                second=0, microsecond=0)
        if next_stop < now:
            # Today's occurrence is already in the past: aim for tomorrow.
            next_stop += timedelta(days=1)
        # Only commit once the value is known to be valid.
        self._stop_hour = stop_hour
        self._next_stop_hour = next_stop

    def get_stop_hour(self):
        """Return the configured ``[hour, minute]`` stop time, or ``None``."""
        return self._stop_hour

    def control_charge_with_ack(self, charge: bool):
        """Start or stop the charge and check that the car obeyed.

        The command is sent, then the method blocks for ``MQTT_TIMEOUT``
        seconds before reading the charging status back. If the status does
        not match the request, the command is sent a second time.

        :param charge: ``True`` to start charging, ``False`` to stop
        :return: ``True`` if the car reports the requested state (the retry
            counter is reset), ``False`` if the command had to be resent
        """
        self.psacc.charge_now(self.vin, charge)
        self.retry_count += 1
        sleep(ChargeControl.MQTT_TIMEOUT)
        vehicle_status = self.psacc.get_vehicle_info(self.vin)
        status = vehicle_status.get_energy('Electric').charging.status
        if status in (FINISHED, DISCONNECTED):
            logger.warning("Car state isn't compatible with charging %s", status)
        if (status == INPROGRESS) != charge:
            logger.warning("retry to control the charge of %s", self.vin)
            self.psacc.charge_now(self.vin, charge)
            # Counted again: after one failed acknowledgement retry_count is
            # 2, which makes process() stop resending the threshold stop.
            self.retry_count += 1
            return False
        self.retry_count = 0
        return True

    def force_update(self, quick_refresh):
        """Wake the car up if its energy data is older than the wakeup timeout.

        :param quick_refresh: when true, only half of ``wakeup_timeout`` is
            tolerated before waking the car up
        """
        car = self.psacc.vehicles_list.get_car_by_vin(self.vin)
        last_update = car.get_status().get_energy('Electric').updated_at
        if quick_refresh:
            wakeup_timeout = self.wakeup_timeout / 2
        else:
            wakeup_timeout = self.wakeup_timeout
        # Timezone-aware "now" in UTC; last_update must be aware as well for
        # the subtraction to work.
        elapsed = (datetime.now(pytz.UTC) - last_update).total_seconds()
        # wakeup_timeout is expressed in minutes.
        if elapsed > 60 * wakeup_timeout:
            self.psacc.wakeup(self.vin)

    def process(self):
        """Check the vehicle state and stop the charge when required.

        While a charge is in progress and the threshold is below 100:

        * the car is woken up if its data is stale;
        * the charge is stopped once the battery level reaches the threshold
          (unless too many commands were already sent);
        * otherwise the charge is stopped once the stop hour has passed, and
          if the stop hour falls before the next information refresh a timer
          is armed to run this method again at that time.

        In every other state the stop hour is rolled to its next occurrence
        and the retry counter is reset. Errors are logged, never raised.
        """
        now = datetime.now()
        try:
            vehicle_status = self.psacc.vehicles_list.get_car_by_vin(self.vin).get_status()
            status = vehicle_status.get_energy('Electric').charging.status
            level = vehicle_status.get_energy('Electric').level
            logger.info("charging status of %s is %s, battery level: %d", self.vin, status, level)
            if status == INPROGRESS and self.percentage_threshold < 100:
                charging_mode = vehicle_status.get_energy('Electric').charging.charging_mode
                quick_refresh = isinstance(charging_mode, str) and charging_mode == "Quick"
                self.force_update(quick_refresh)
                if level >= self.percentage_threshold and self.retry_count < 2:
                    logger.info("Charge threshold is reached, stop the charge")
                    self.control_charge_with_ack(False)
                elif self._next_stop_hour is not None:
                    if self._next_stop_hour < now:
                        self._next_stop_hour += timedelta(days=1)
                        logger.info("it's time to stop the charge")
                        self.control_charge_with_ack(False)
                    else:
                        next_in_second = (self._next_stop_hour - now).total_seconds()
                        # NOTE(review): info_refresh_rate is presumably in
                        # seconds, like next_in_second - confirm.
                        if next_in_second < self.psacc.info_refresh_rate:
                            # The stop hour comes before the next refresh:
                            # re-run the check exactly at the stop hour.
                            thread = threading.Timer(next_in_second, self.process)
                            thread.daemon = True
                            thread.start()
            else:
                if self._next_stop_hour is not None and self._next_stop_hour < now:
                    self._next_stop_hour += timedelta(days=1)
                self.retry_count = 0
        except (AttributeError, ValueError):
            logger.exception("Probably can't retrieve all information from API:")
        except Exception:  # pylint: disable=broad-except
            # Best-effort callback: log anything unexpected, but let
            # KeyboardInterrupt / SystemExit propagate.
            logger.exception("Charge control:")

    def get_dict(self):
        """Return a shallow copy of the instance attributes without ``psacc``."""
        chd = copy(self.__dict__)
        chd.pop("psacc")
        return chd
class ChargeControls(dict):
    """Mapping of VIN to ``ChargeControl``, persisted as a JSON file."""

    def __init__(self, file_name="charge_config.json"):
        """Create an empty collection saved to ``file_name``."""
        self.file_name = file_name
        super().__init__()
        # MD5 hex digest of the last configuration written by save_config().
        self._config_hash = None

    def save_config(self, force=False):
        """Write the configuration file if it changed since the last save.

        Only ``percentage_threshold`` and the stop hour of each controller
        are stored, keyed by VIN.

        :param force: write the file even when the content is unchanged
        """
        chd = {
            charge_control.vin: {
                "percentage_threshold": charge_control.percentage_threshold,
                "stop_hour": charge_control.get_stop_hour(),
            }
            for charge_control in self.values()
        }
        config_str = json.dumps(chd, sort_keys=True, indent=4).encode('utf-8')
        # The digest is only used to detect changes, not for security.
        new_hash = md5(config_str).hexdigest()
        if force or self._config_hash != new_hash:
            with open(self.file_name, "wb") as f:
                f.write(config_str)
            self._config_hash = new_hash
            logger.info("save config change")

    @staticmethod
    def load_config(psacc: "MyPSACC", name="charge_config.json"):
        """Build a collection from the JSON file ``name``.

        Each top-level key is a VIN and its value holds the keyword arguments
        passed to ``ChargeControl`` after ``psacc`` and the VIN.

        :param psacc: client handed to every created ``ChargeControl``
        :param name: path of the configuration file
        :return: the populated ``ChargeControls``
        """
        # save_config() writes UTF-8 bytes, so read with the same encoding
        # instead of the platform default.
        with open(name, "r", encoding="utf-8") as file:
            chd = json.load(file)
        charge_control_list = ChargeControls(name)
        for vin, params in chd.items():
            charge_control_list[vin] = ChargeControl(psacc, vin, **params)
        return charge_control_list

    def get(self, vin, default=None) -> "ChargeControl":
        """Return the controller for ``vin``, or ``default`` when unknown.

        Accepts the same optional ``default`` as ``dict.get`` so the class
        remains usable wherever a plain dict is expected.
        """
        return super().get(vin, default)

    def init(self):
        """Register every controller's ``process`` as an info callback."""
        for charge_control in self.values():
            charge_control.psacc.info_callback.append(charge_control.process)