This repository has been archived by the owner on Feb 6, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
notion_wrapper.py
220 lines (188 loc) · 7.8 KB
/
notion_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
from notion.client import NotionClient
from notion.block import CodeBlock
from datetime import datetime
import traceback
import json
import re
import subprocess
import os
import signal
import time
import threading
class NotionWrapper:
    """A wrapper object for easy interaction with the remote Notion control page.

    The wrapper keeps a local config dict (seeded from a JSON file and then
    extended with rows of the remote "global_configs" table), writes log
    entries to the console and to the remote "log_table", and launches,
    tracks and terminates task scripts stored under ``task/``.
    """

    # Maps task file name -> PID of the child process running it. Declared on
    # the class, so the mapping is shared by every instance.
    child_process = {}
    # Set to True (on the instance) once end_service() has run.
    kill_now = False

    def __init__(self, local_config_file_path):
        """Load the local JSON config, connect to Notion and pull remote config.

        Args:
            local_config_file_path: Path of a JSON file; it must at least
                contain the "token" key used to build the Notion client.
        """
        # Use a context manager so the config file handle is always closed.
        with open(local_config_file_path) as config_file:
            self.__config = json.load(config_file)
        self.__client = NotionClient(
            token_v2=self.get_config("token"), monitor=True, start_monitoring=True
        )
        self.load_global_configs()
        # Run end_service() when the process receives SIGTERM.
        signal.signal(signal.SIGTERM, self.end_service)

    def get_client(self):
        """Return the underlying NotionClient instance."""
        return self.__client

    def load_global_configs(self):
        """Retrieve remote config and subscribe to changes of every row."""
        cv = self.get_table("global_configs")
        for row in cv.get_rows():
            self.set_config(row.name, row.data_type, row.value)
            row.add_callback(self.config_callback, callback_id="config_callback")

    def config_callback(self, record, changes):
        """Re-read a config row after one of its properties changed.

        Args:
            record: The changed row; its name, data_type and value are read.
            changes: Sequence of change entries whose first element is the
                change kind (only "prop_changed" is acted upon).
        """
        # any() copes with an empty change list and with a "prop_changed"
        # entry that is not the first one.
        if any(change[0] == "prop_changed" for change in changes):
            self.warn(
                "Config changed: {}({})={}".format(
                    str(record.name), str(record.data_type), str(record.value)
                )
            )
            self.set_config(record.name, record.data_type, record.value)
            # NOTE(review): presumably throttles rapid successive callbacks;
            # nesting reconstructed from unindented source - confirm.
            time.sleep(10)

    def set_config(self, key: str, data_type: str, value: str):
        """Store a remote config value in local memory.

        Args:
            key: Config name.
            data_type: "int" and "bool" trigger conversion; anything else is
                stored as text.
            value: Raw string value. For "bool", only a case-insensitive
                "true" yields True. A text value starting with "[" is treated
                as a ``[label](target)`` link and only the part between the
                parentheses is stored.

        Raises:
            ValueError: If data_type is "int" and value is not an integer.
        """
        if data_type == "int":
            self.__config[key] = int(value)
        elif data_type == "bool":
            self.__config[key] = value.lower() == "true"
        elif value.startswith("["):
            link_target = re.search(r"(?<=\().*(?=\))", value)
            # Fall back to the raw value when there is no "(...)" part
            # instead of failing with IndexError.
            self.__config[key] = (
                link_target.group(0) if link_target is not None else value
            )
        else:
            self.__config[key] = value

    def get_config(self, key: str):
        """Retrieve a locally stored config value.

        Raises:
            KeyError: If the key has not been loaded.
        """
        return self.__config[key]

    def get_table(self, table_name: str):
        """Retrieve the Notion collection behind the configured table."""
        return self.get_table_ref(table_name).collection

    def get_table_ref(self, table_name: str):
        """Return the collection view registered in the config under table_name.

        Raises:
            KeyError: If table_name is not a config key.
            AssertionError: If the configured value is None.
        """
        table_ref = self.get_config(table_name)
        # Kept as an assert because __log() handles AssertionError explicitly.
        assert table_ref is not None, "Table name not found in config."
        return self.get_client().get_collection_view(table_ref)

    def clear_table(self, table_name: str):
        """Remove every row of the given table, pausing 1s after each removal."""
        cv_rows = self.get_table(table_name).get_rows()
        for row in cv_rows:
            row.remove()
            # NOTE(review): presumably rate limiting; nesting reconstructed
            # from unindented source - confirm.
            time.sleep(1)
        self.warn("{} rows removed".format(str(len(cv_rows))))

    def debug(self, message: str, host: str = "Main", level: str = "Debug"):
        """Log a message with level "Debug" (by default)."""
        self.__log(message, host, level)

    def print(self, message: str, host: str = "Main", level: str = "Info"):
        """Log a message with level "Info" (by default)."""
        self.__log(message, host, level)

    def warn(self, message: str, host: str = "Main", level: str = "Warning"):
        """Log a message with level "Warning" (by default)."""
        self.__log(message, host, level)

    def error(self, message: str, host: str = "Main", level: str = "Error"):
        """Log a message with level "Error" (by default)."""
        self.__log(message, host, level)

    def __log(self, message: str, host: str, level: str):
        """Print the log to the local console and the remote Notion log table."""
        try:
            if self.get_config("debug"):
                print(
                    "[{}][{}][{}][{}]: {}".format(
                        str(threading.get_ident()), host, datetime.now(), level, message
                    )
                )
            # NOTE(review): nesting reconstructed from unindented source -
            # confirm the remote write is meant to run regardless of "debug".
            # The remote write happens on its own thread.
            threading.Thread(
                target=self.__write_to_notion, args=(host, level, message)
            ).start()
        except KeyError:
            # Raised when the "debug" config has not been loaded yet; the
            # message is dropped on purpose.
            pass
        except AssertionError as ae:
            print(str(ae))

    def __write_to_notion(self, host, level, message):
        """Append one row (name, level, log_on, message) to "log_table"."""
        cv = self.get_table("log_table")
        row = cv.add_row()
        row.name = host
        row.level = level
        row.log_on = str(datetime.now())
        row.message = message

    def write_script(self, file_name: str, script_content):
        """Write the code blocks of a task page to ``task/<file_name>.py``.

        Args:
            file_name: Task name, used as the script's base name.
            script_content: Iterable of blocks; only CodeBlock instances are
                written (their ``title`` attribute is the text written).

        Returns:
            "Activated" when the file was written, "Error" otherwise.
        """
        status = "Error"
        try:
            first_block = True
            with open("task/" + file_name + ".py", "w") as f:
                for block in script_content:
                    if not isinstance(block, CodeBlock):
                        continue
                    if not first_block:
                        # Separator goes between blocks so consecutive blocks
                        # never run together on one line.
                        f.write("\n\n#Second Code Block\n\n")
                    f.write(block.title)
                    first_block = False
            status = "Activated"
            self.print(file_name + " task activated.")
        except Exception:
            self.error(traceback.format_exc())
        return status

    def run_script(self, file_name: str):
        """Run ``task/<file_name>.py`` and relay its output to the log.

        The script is started with ``python3`` and receives the current config
        as a JSON string argument. The call blocks until the child's output
        stream ends.

        Returns:
            "Running" if the task is already registered as running, otherwise
            "Completed" (also after a failure, which is logged).
        """
        pid = None
        try:
            if file_name in self.child_process:
                self.error("Task already running.")
                return "Running"
            script_path = os.getcwd() + "/task/" + file_name + ".py"
            if not os.path.isfile(script_path):
                # Popen would start python3 successfully even when the script
                # is missing, so the check has to be explicit.
                self.error("Task script not found, Reactivate the script again.")
                return "Completed"
            # The context manager closes the stdout pipe and reaps the child.
            with subprocess.Popen(
                [
                    "python3",
                    script_path,
                    json.dumps(self.__config),
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                shell=False,
                encoding="utf-8",
                errors="replace",
            ) as process:
                pid = process.pid
                self.child_process[file_name] = pid
                host = "[" + str(pid) + "] " + file_name
                self.print("Running: " + host)
                # Iteration ends at EOF, i.e. once the child closed its output.
                for raw_line in process.stdout:
                    self.__relay_task_output(raw_line.strip(), host)
        except Exception:
            self.error(traceback.format_exc())
        # Drop the registry entry only if it is still ours: kill_script() may
        # already have removed it, and a newer run may have re-registered.
        if pid is not None and self.child_process.get(file_name) == pid:
            del self.child_process[file_name]
        return "Completed"

    def __relay_task_output(self, line: str, host: str):
        """Log one output line of a task, honoring a JSON "level" if present."""
        try:
            output = json.loads(line)
            level = output["level"]
            if level == "Debug":
                self.debug(output["message"], host)
            elif level == "Warning":
                self.warn(output["message"], host)
            elif level == "Error":
                self.error(output["message"], host)
            else:
                self.print(output["message"], host)
        except Exception:
            # Anything that is not a {"level", "message"} JSON object is
            # logged verbatim as an error.
            self.error(line, host)

    def kill_script(self, file_name: str):
        """Send SIGTERM to the child process registered for file_name.

        Returns:
            "Running" when no process is registered for file_name, otherwise
            "Completed" (also when signalling failed, which is logged).
        """
        try:
            pid = self.child_process.get(file_name)
            if pid is None:
                self.warn("Process Id not found")
                # NOTE(review): nesting reconstructed from unindented source -
                # confirm "Running" belongs to the not-found branch.
                return "Running"
            self.warn("Terminating: " + file_name + " -> " + str(pid))
            try:
                os.kill(int(pid), signal.SIGTERM)
            finally:
                # Forget the entry even if the signal could not be delivered
                # (e.g. the process already exited) so no stale PID remains.
                self.child_process.pop(file_name, None)
        except Exception:
            self.error(traceback.format_exc())
        return "Completed"

    def end_service(self, signum, frame):
        """Signal handler: terminate all children and tidy the task table.

        Args:
            signum: Signal number (unused).
            frame: Current stack frame (unused).
        """
        if self.child_process:
            self.print("Ending service, terminating all child processes..")
            # Iterate over a snapshot; other threads may edit the registry.
            for pid in list(self.child_process.values()):
                try:
                    os.kill(int(pid), signal.SIGTERM)
                except ProcessLookupError:
                    # Child already exited; keep terminating the others.
                    self.debug("Process already exited: " + str(pid))
        cv = self.get_table("task_table")
        for row in cv.get_rows():
            if row.status == "Running":
                row.status = "Completed"
            # NOTE(review): nesting reconstructed from unindented source -
            # confirm the "Main" row is removed regardless of its status.
            if row.name == "Main":
                row.remove()
        self.warn("End of the program.")
        self.kill_now = True