forked from hormones/submission_telebot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.py
188 lines (135 loc) · 6.06 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import logging
import sqlite3
import typing
from base import util
from base.types import Chat, Submission
_conn: sqlite3.Connection = None
def init():
global _conn
_conn = sqlite3.connect('appdata/submission_telebot.db')
table_exists = _conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='chat';").fetchone()
if table_exists is None:
logging.info('create table --> submission chat')
_conn.execute('''CREATE TABLE submission
(id INTEGER PRIMARY KEY AUTOINCREMENT,
sender_id INTEGER NOT NULL,
message_id INTEGER NOT NULL,
offset INTEGER DEFAULT 1,
anonymous INTEGER DEFAULT 1,
approval_id INTEGER,
approver_id INTEGER,
approval_result INTEGER,
approval_reply_id INTEGER,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP);''')
# status 1/0 normal/banned
_conn.execute('''CREATE TABLE chat
(chat_id INTEGER PRIMARY KEY,
status INTEGER DEFAULT 1,
lang_code TEXT,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP);''')
logging.info(f'=== database initialized ===')
def close():
_conn.close()
def _insert(table: str, obj: dict, type: str = None):
logging.debug(f'INSERT {type} --> table: {table} obj: {util.dumps(obj)}')
type = 'OR ' + type.upper() if type else ''
data = {k: v for k, v in obj.items() if k != 'create_time' and v != None}
fields = ", ".join(data.keys())
placeholder = ", ".join(['?' for k in data.keys()])
params = list(data.values())
cursor = _conn.cursor()
sql = f"INSERT {type} INTO {table} ({fields}) VALUES ({placeholder})"
id = cursor.execute(sql, params).lastrowid
_conn.commit()
cursor.close()
return id
def _update(table: str, obj: dict, key='id'):
# filter parameters: id and create_time and None-value
data = {k: v for k, v in obj.items() if (k != key and k != 'create_time' and v != None)}
set_stmts = [f'{k} = ?' for k in data.keys()]
params = list(data.values())
params.append(obj[key])
sql = f"UPDATE {table} SET {', '.join(set_stmts)} WHERE {key} = ?"
logging.debug(f'update {table} --> sql: {sql}, data: {util.dumps(obj)}')
cursor = _conn.cursor()
rowcount = cursor.execute(sql, params).rowcount
_conn.commit()
if not rowcount:
logging.warning(f'update failed, table: {table}, {key}: {obj[key]}, number of rows affected: {rowcount}')
cursor.close()
def _query_one_by_field(table: str, field='id', value=None) -> tuple:
logging.debug(f'query data: {table}.{field} = {value}')
cursor = _conn.cursor()
result = cursor.execute(f"SELECT * FROM {table} WHERE {field}=?", (value,)).fetchone()
cursor.close()
return result
def _query_all(table: str) -> tuple:
logging.debug(f'query all: {table}')
cursor = _conn.cursor()
result = cursor.execute(f"SELECT * FROM {table} ORDER BY create_time").fetchall()
cursor.close()
return result
def _exists(table: str, field='id', value=None) -> bool:
logging.debug(f'query data exists: {table}.{field} = {value}')
cursor = _conn.cursor()
row = cursor.execute(f"SELECT count(1) FROM {table} WHERE {field}=?", (value,)).fetchone()
cursor.close()
return True if row[0] else False
def submission_insert_replace(obj: object):
return _insert('submission', obj.__dict__, 'replace')
def submission_insert_ignore(obj: object):
return _insert('submission', obj.__dict__, 'ignore')
def submission_update(obj: object):
_update('submission', obj.__dict__, 'id')
def submission_exists_by_approval_id(approval_id):
return _exists('submission', 'approval_id', approval_id)
def submission_update_by_approval_id(dic: dict):
_update('submission', dic, 'approval_id')
def submission_query(id) -> Submission:
row = _query_one_by_field('submission', 'id', id)
return Submission(*row) if row else None
def submission_query_by_approval_id(approval_id) -> Submission:
row = _query_one_by_field('submission', 'approval_id', approval_id)
return Submission(*row) if row else None
def submission_query_all() -> typing.List[Submission]:
rows = _query_all('submission')
return [Submission(*row) for row in rows]
def chat_code_query(chat_id=None) -> str:
chat = chat_query(chat_id)
return chat.lang_code if chat else None
def chat_banned(chat_id=None) -> str:
chat = chat_query(chat_id)
if not chat:
return False
return False if chat.status else True
def chats_count_banned() -> int:
logging.debug(f'count chats banned')
cursor = _conn.cursor()
row = cursor.execute(f"SELECT count(1) FROM chat WHERE status=0").fetchone()
cursor.close()
return row[0]
def chats_banned(page_number=1, page_size=10) -> typing.List[Chat]:
data = []
offset = (page_number - 1) * page_size
logging.debug(f'query chats banned')
cursor = _conn.cursor()
total = cursor.execute(f"SELECT count(1) FROM chat WHERE status=0").fetchone()[0]
if total and offset < total:
rows = cursor.execute(f"SELECT * FROM chat WHERE status=0 ORDER BY create_time LIMIT {page_size} OFFSET {offset}").fetchall()
data = [Chat(*row) for row in rows]
cursor.close()
return (total, data)
def chat_query(chat_id=None) -> Chat:
row = _query_one_by_field('chat', 'chat_id', chat_id)
return Chat(*row) if row else None
def chat_insert_replace(obj: Chat) -> int:
return _insert('chat', obj.__dict__, 'replace')
def chat_insert_ignore(obj: Chat) -> int:
return _insert('chat', obj.__dict__, 'ignore')
def chat_update(obj: Chat):
_update('chat', obj.__dict__, 'chat_id')
def chat_query_all() -> typing.List[Chat]:
rows = _query_all('chat')
return [Chat(*row) for row in rows]
def chat_exists(chat_id) -> bool:
return _exists('chat', 'chat_id', chat_id)