-
Notifications
You must be signed in to change notification settings - Fork 0
/
telegram_bot.py
1842 lines (1525 loc) · 94.3 KB
/
telegram_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# dependencies: [telegram_bot]
"""
Основная функция телеграмного чат-бота
todo: нужен ли group_is_donator?
"""
import math
import os
import subprocess
import time
import traceback
import telebot
from telebot import custom_filters
from telebot import types
import sqlite3
from datetime import datetime, timedelta
import pytz
from keyboards_telegram.create_keyboards import payload_to_callback, kb_prepod_schedule
from fuzzywuzzy import process
import logging
import toml
import sys
import pickle
from requests.exceptions import ReadTimeout, ConnectionError
from bot_functions.bots_common_funcs import read_calendar, read_table, get_day, \
compile_group_stats, add_user_to_table, get_exams, get_prepods, group_is_donator, \
add_user_to_anekdot, set_table_mode, get_tables_settings, get_donators, create_link_to_telegram
from bot_functions import attendance
from fun.anekdot import get_random_anekdot, get_random_toast
from fun.minigames import get_coin_flip_result, start_classical_rock_paper_scissors, \
stop_classical_rock_paper_scissors, classical_rock_paper_scissors
from shiza.databases_shiza_helper import change_user_group, create_database, change_user_additional_group, \
check_group_exists, add_donator_group
# init
logger = logging.getLogger(__name__)
console_handler = logging.StreamHandler()
logger.setLevel(logging.INFO)
logger.addHandler(console_handler)
try:
config = toml.load('configuration.toml')
except FileNotFoundError:
logger.critical('configuration.toml не найден!')
sys.exit()
path = config.get('Kiberded').get('path') # необходимо для корректной работы на сервере
token = config.get('Kiberded').get('token_telegram')
group_token = config.get('Kiberded').get('group_token')
tg_deeplink_token = config.get('Kiberded').get('deeplink_token_key')
days = [['Понедельник', 'Вторник', 'Среда', 'Четверг', 'Пятница', 'Суббота', 'Воскресенье'], [' (чёт)', ' (нечёт)']]
timetable = config.get('Kiberded').get('timetable')
admin_chat = config.get('Kiberded').get('telegram_admin_chat')
books_chat = config.get('Kiberded').get('telegram_books_chat')
dayofdaypics_chat = config.get('Kiberded').get('telegram_dayofdaypics_chat')
bot = telebot.TeleBot(token)
# /init
now_date = datetime.now().strftime('%Y-%m-%d') # необходимо для бэкапов сообщений
today = datetime.now(pytz.timezone('Europe/Moscow')).date() # необходимо для всяких расписаний,
# с учетом перезагрузки в 00:00
list_registered_users = set() # список зарегистрированных chat.id из group_ids.db для допуска к боту
list_unauthorized_users = set() # список зарегистрированных ТОЛЬКО в ТГ пользователей/групп
list_prepods = [] # список преподов из базы, нужно для поиска
# list_registered_groups = [] # список зарегистрированных chat.id из group_ids.db для допуска к боту
moderators = set() # лист админов и модераторов для добавления книжек и редактирования баз
admins = set() # лист только админов
groups = {}
class IsRegistered(custom_filters.SimpleCustomFilter): # фильтр для проверки регистрации юзера
key = 'is_registered'
@staticmethod
def check(message: telebot.types.Message, **kwargs):
return message.chat.id in list_registered_users
class IsModerator(custom_filters.SimpleCustomFilter): # фильтр для проверки модератора
key = 'is_moderator'
@staticmethod
def check(message: telebot.types.Message, **kwargs):
return message.chat.id in moderators
class IsAdmin(custom_filters.SimpleCustomFilter): # фильтр для проверки админа
key = 'is_admin'
@staticmethod
def check(message: telebot.types.Message, **kwargs):
return message.chat.id in admins
def create_backup_dir() -> int: # написана с учетом ежедневной перезагрузки в 00:00
"""
Создает папку path/messages_backups/date
:return: 0 если все ок
"""
folder = f'{path}messages_backup/{now_date}'
if not os.path.isdir(f'{folder}'):
os.mkdir(f'{folder}')
return 0
def dump_message(message, callback=False) -> int: # BIG Brother Is Watching You.
"""
Функция для записи всех сообщений в pickle-файлы, дабы они потом собирались в одну гигантскую БД и хранились
где-нибудь у админов. Не очень гуманно, но лучше так, чем никак.
:param message: непосредственно сообщение, которое записывается в pickle
:param callback: if True, то это callback_query, и формат файла будет call_.....
:return: 0 если все ок
"""
date = now_date
date_mes = message.date
chat_id = message.chat.id
message_id = message.message_id # чтобы точно задампились все сообщения, потому что могут быть удаленные и
# отправленные несколько раз в секунду
callback_str = 'call_output_' if callback else ''
with open(f'{path}messages_backup/{date}/{callback_str}{date_mes}_{chat_id}_{message_id}.pickle', 'wb') as f:
pickle.dump(message, f)
return 0
def dump_callback(callback) -> int: # VERY BIG Brother Is Watching You.
"""
Фукция для дампа всех callback-ов, аналогично dump_message.
:param callback:
:return: 0 если все ок
"""
date = now_date
date_mes = callback.message.date
chat_id = callback.message.chat.id
message_id = callback.message.message_id # чтобы точно задампились все сообщения, потому что могут быть удаленные и
# отправленные несколько раз в секунду
with open(f'{path}messages_backup/{date}/call_input_{date_mes}_{chat_id}_{message_id}.pickle', 'wb') as f:
pickle.dump(callback, f)
return 0
def send_message(chat_id, text, **kwargs) -> telebot.types.Message:
"""
Функция отправки сообщений. Создана одновременно и для обхода ограничения на максимальную длину текста, и для
автодампа сообщения. Возвращается API-reply отправленного сообщения; если текст больше 4096 символов - то оно
делится и возвращается api-peply последнего отправленного сообщения
Telegram documentation: https://core.telegram.org/bots/api#sendmessage
:param chat_id: Unique identifier for the target chat or username of the target channel (in the format
@channelusername)
:param text: Text of the message to be sent
:param parse_mode: Send Markdown or HTML, if you want Telegram apps to show bold, italic, fixed-width text or inline
URLs in your bot's message.
:param entities: List of special entities that appear in message text, which can be specified instead of parse_mode
:param disable_web_page_preview: Disables link previews for links in this message
:param disable_notification: Sends the message silently. Users will receive a notification with no sound.
:param protect_content: If True, the message content will be hidden for all users except for the target user
:param reply_to_message_id: If the message is a reply, ID of the original message
:param allow_sending_without_reply: Pass True, if the message should be sent even if the specified replied-to
message is not found
:param reply_markup: Additional interface options. A JSON-serialized object for an inline keyboard, custom reply
keyboard, instructions to remove reply keyboard or to force a reply from the user.
:param timeout:
:return: API reply (JSON-serialized message object)
"""
msg = 0 # дабы IDE не ругалась, далее эта переменная так и так перезапишется
if len(text) > 4096: # обход ограничения
splitted_text = telebot.util.smart_split(text, chars_per_string=3000)
for text in splitted_text:
msg = bot.send_message(chat_id, text, **kwargs)
dump_message(msg)
else:
msg = bot.send_message(chat_id, text, **kwargs)
dump_message(msg)
return msg
def update_list_registered_users(): # ее нужно вызывать каждый раз при запуске и добавлении юзеров
"""
Обновляет переменную list_registered_users, загружая в нее все chat.id, которые есть в group_ids.db
:return: 0 если все ок, иначе ошибка RuntimeError (для отправки в админскую беседу)
"""
with sqlite3.connect(f'{path}admindb/databases/group_ids.db') as con:
con.row_factory = lambda cur, row: int(row[0])
cur = con.cursor()
cur.execute(f'SELECT tg_id FROM user_ids WHERE tg_id IS NOT NULL')
users = set(cur.fetchall())
cur.execute(f'SELECT tg_chat_id FROM group_gcals WHERE tg_chat_id IS NOT NULL') # беседы
groups = set(cur.fetchall())
cur.execute(f'SELECT tg_id FROM user_ids WHERE vk_id IS NULL AND tg_id IS NOT NULL')
unauth_users = set(cur.fetchall())
list_unauthorized_users.update(unauth_users)
cur.execute(f'SELECT tg_id FROM user_ids WHERE vk_id IS NOT NULL AND tg_id IS NOT NULL')
auth_users = set(cur.fetchall())
new_auth_users = auth_users & list_unauthorized_users # убираем авторизовавшихся
con.close()
list_registered_users.update(users)
list_registered_users.update(groups)
for el in new_auth_users:
list_unauthorized_users.remove(el)
def update_moderators(): # ее нужно вызывать каждый раз при запуске и добавлении модераторов
"""
Обновляет переменную moderators, загружая в нее все chat.id, которые есть в admins.db.
И админы, и модераторы.
:return: 0 если все ок, иначе ошибка RuntimeError (для отправки в админскую беседу)
"""
with sqlite3.connect(f'{path}admindb/databases/admins.db') as con:
cur = con.cursor()
cur.execute(f'SELECT tg_id FROM users WHERE tg_id IS NOT NULL')
users = cur.fetchall()
con.close()
for user in users:
moderators.add(int(user[0])) # теоретически тут уязвимое место для ошибки, но либо тут добавлять
# int, либо же в классе с фильтром добавлять str, т.к. cursor возвращает строки
return 0
def update_admins(): # ее нужно вызывать каждый раз при запуске и добавлении админов
"""
Обновляет переменную admins, загружая в нее все chat.id, которые есть в admins.db с freedom=admin
И админы, и модераторы.
:return: 0 если все ок, иначе ошибка RuntimeError (для отправки в админскую беседу)
"""
with sqlite3.connect(f'{path}admindb/databases/admins.db') as con:
cur = con.cursor()
cur.execute(f'SELECT tg_id FROM users WHERE freedom=?', ['admin'])
users = cur.fetchall()
con.close()
for user in users:
admins.add(int(user[0])) # теоретически тут уязвимое место для ошибки, но либо тут добавлять
# int, либо же в классе с фильтром добавлять str, т.к. cursor возвращает строки
return 0
def update_prepods(): # ее нужно вызывать каждый раз при запуске и добавлении преподов (то есть никогда лол)
"""
Обновляет переменную list_prepods для поиска преподов
:return: 0 если все ок
"""
global list_prepods
with sqlite3.connect(f'{path}admindb/databases/prepods.db') as con:
con.row_factory = lambda cursor, row: row[0] # чтобы возвращать list, а не list of tuples
cur = con.cursor()
query = 'SELECT surname FROM prepods'
list_prepods = cur.execute(query).fetchall()
return 0
def update_groups_data(): # ее нужно вызывать каждый раз при запуске и добавлении преподов (то есть никогда лол)
"""
Обновляет dict с данными групп
:return: 0 если все ок
"""
with sqlite3.connect(f'{path}admindb/databases/group_ids.db') as con:
cur = con.cursor()
# достаем StudyStatus
cur.execute("SELECT group_id, isStudy, isExam, gcal_link FROM group_gcals")
status_data = {v[0]: {'isStudy':v[1], 'isExam':v[2], 'gcal': v[3]} for v in cur.fetchall()}
for k, v in status_data.items():
isStudy, isExam, gcal = v.values()
study_status = ""
if isExam and isStudy:
study_status = 'mixed'
elif isStudy:
study_status = 'study'
elif isExam:
study_status = 'exam'
groups[k] = {'calendar': True if gcal else False,
'status': study_status}
groups[None] = {'calendar': None,
'status': None}
return 0
def callback_to_json(callback_data) -> dict:
"""
Переводит callback_data в payload как в вк; обратная функция callback_to_json(). Сделано для удобства и красоты :)
Подробнее см. keyboards_telegram.create_keyboards
Пример:
callback_data='t:action,a_t:message,c:table_empty'
payload={"type": "action", "command": "table_empty"}
:param callback_data: входная строка
:return: json-dict payload
"""
eng_to_rus_days = {
'Monday (even)': 'Понедельник (чёт)',
'Monday (odd)': 'Понедельник (нечёт)',
'Tuesday (even)': 'Вторник (чёт)',
'Tuesday (odd)': 'Вторник (нечёт)',
'Wednesday (even)': 'Среда (чёт)',
'Wednesday (odd)': 'Среда (нечёт)',
'Thursday (even)': 'Четверг (чёт)',
'Thursday (odd)': 'Четверг (нечёт)',
'Friday (even)': 'Пятница (чёт)',
'Friday (odd)': 'Пятница (нечёт)',
'Saturday (even)': 'Суббота (чёт)',
'Saturday (odd)': 'Суббота (нечёт)',
'Sunday (even)': 'Воскресенье (чёт)',
'Sunday (odd)': 'Воскресенье (нечёт)',
'week (even)': 'full (чёт)',
'week (odd)': 'full (нечёт)'
}
payload_item_list = ['type', 'action_type', 'command', 'place', 'weekday', 'subject', 'department_id', 'list_id']
callback_item_list = ['t', 'a_t', 'c', 'p', 'wd', 'sj', 'did', 'lid']
payload = {}
for i in callback_data.split(','):
key, value = i.split(':')
if key in callback_item_list:
key = payload_item_list[callback_item_list.index(key)]
payload[key] = value
if payload['type'] == 'action':
if payload['command'] == 'table_weekday' \
or payload['command'] == 'table_weekday_2' \
or payload['command'] == 'table_prepod':
payload['weekday'] = eng_to_rus_days[payload['weekday']]
return payload
def get_subject_from_id(id, group):
"""
Принимает subject_id (кусок md5-хэша) и возвращает нормальный предмет из таблицы subject_ids
:param id: subject_id
:param group: группа
:return: subject
"""
with sqlite3.connect(f'{path}databases/{group}.db') as con:
cur = con.cursor()
cur.execute(f'SELECT subject FROM subject_ids WHERE id=?', [id])
subject = cur.fetchone()
con.close()
return subject[0]
def get_group(user_id):
"""
Принимает user_id и возвращает группу этого пользователя.
:param user_id: id пользователя (в таблице записан как tg_id для предотвращения путаницы с вк)
:return: номер группы
"""
with sqlite3.connect(f'{path}admindb/databases/group_ids.db') as con:
cur = con.cursor()
cur.execute("SELECT group_id FROM user_ids WHERE tg_id=?", [user_id])
group = cur.fetchone()
if group:
group = group[0]
return group
def get_additional_group(user_id):
"""
Принимает user_id и возвращает дополнительную группу пользователя, при наличии.
Оставил это внутри бота пока - возможно чуть быстрее?
:param user_id: id пользователя (в таблице записан как tg_id для предотвращения путаницы с вк)
:return: номер доп. группы
"""
with sqlite3.connect(f'{path}admindb/databases/group_ids.db') as con:
cur = con.cursor()
cur.execute("SELECT additional_group_id FROM user_ids WHERE tg_id=?", [user_id])
extra_group = cur.fetchone()
if extra_group and extra_group[0] != '':
extra_group = extra_group[0]
else:
extra_group = None
return extra_group
def group_study_status(group) -> str:
"""
Принимает номер группы и смотрит их учебный статус, нужно для выбора открываемой клавиатуры
Оставил это внутри бота пока - возможно чуть быстрее?
:param group: номер группы
:return: статус группы: 'mixed' (есть оба расписания) / 'study' (учебный) / 'exam' (сессия) / '' (нет расписания)
"""
return_message = ''
with sqlite3.connect(f'{path}admindb/databases/group_ids.db') as con:
cur = con.cursor()
isStudy, isExam = cur.execute("SELECT isStudy, isExam FROM group_gcals WHERE group_id=?", [group]).fetchall()[0]
if isExam and isStudy:
return_message = 'mixed'
elif isStudy:
return_message = 'study'
elif isExam:
return_message = 'exam'
return return_message # в результате получится либо kb_table либо kb_table_exam либо kb_table_
def open_keyboard(name):
"""
Чтение клавиатуры из .json-файла
:param str name: название клавиатуры
:return: markup клавиатуры
"""
with open(f'{path}keyboards_telegram/{name}.json', 'r', encoding='utf-8') as f:
markup: telebot.types.InlineKeyboardMarkup = f.read() # тут все нормально с типами, не верь IDE и глазам
return markup
def get_books(subject, group, callback_object):
"""
Отправляет пользователю вложения, указанные в файлах в папке books
:param str subject: предмет
:param group: группа
:param callback_object: telebot-объект callback для изменения сообщения и для распаковки chat.id
:return: 0 или ошибка
"""
with sqlite3.connect(f'{path}databases/{group}.db') as con:
cur = con.cursor()
query = f"SELECT name, file_link_tg FROM books WHERE subject=? AND file_link_tg IS NOT NULL"
all_books = cur.execute(query, [subject]).fetchall()
query_links = f"SELECT name FROM books WHERE subject=? AND file_link_tg IS NULL AND doc_link IS NULL"
all_links = cur.execute(query_links, [subject]).fetchall()
# на всякий случай, проверка на то, не попали ли ссылки в all_books (такое мб, если not null)
if all_books:
i = 0
while i < len(all_books):
if not all_books[i][1]:
all_links.append(all_books[i])
all_books.remove(all_books[i])
else:
i += 1
# Проверка, зарегистрирован ли пользователь ВК (только если есть методички)
if all_books or all_links:
if callback_object.from_user.id in list_unauthorized_users:
err_str = f'Ошибка доступа - методички могут смотреть только авторизованные через ВК участники.' \
f'Это сделано для контроля доступа к файлам группы - в методички группы можно добавлять ' \
f'ссылки на файлы в облаке, закрытые плейлисты и пр.\n' \
f'Чтобы получить доступ, зарегистрируйся в сообществе ВКонтакте: https://vk.com/kiberded_bot ' \
f'\nВ случае возникновения сложностей можешь писать админам: ' \
f'https://t.me/evgeniy_setrov или https://t.me/TSheyd'
send_message(callback_object.from_user.id, text=err_str)
return 0
# отдельно книжки:
if all_books:
for i in range(math.ceil(len(all_books)/5)):
media = []
for k in range(5):
if (i*5)+k in range(len(all_books)):
media.append(telebot.types.InputMediaDocument(all_books[(i * 5) + k][1],
caption=f'{(i*5)+k+1}. ' + all_books[(i*5)+k][0]))
msg_grp = bot.send_media_group(callback_object.from_user.id, media)
for msg in msg_grp:
dump_message(msg)
# и отдельно ссылки самым последним сообщением:
if all_links:
text = 'Ссылки:\n'
for i in range(len(all_links)):
text += f'{i+1}. {all_links[i][0]}\n\n'
send_message(callback_object.from_user.id, text)
if not all_books and not all_links:
cl = bot.edit_message_text(f'{subject}: пусто.\nДобавлять сюда файлы и ссылки может модератор группы. '
f'Изменение БД группы пока доступно только ВКонтакте.',
callback_object.from_user.id,
message_id=callback_object.message.id,
reply_markup=open_keyboard(f'{group}_books'))
dump_message(cl, callback=True)
return 0
def add_prepod_to_history(prepod_id, user_id):
"""
Добавляет препода в историю преподов, если его там нет, или же перемещает на первое место.
Если преподов больше 7, то удаляет последнего
:param int prepod_id: id препода
:param int user_id: id пользователя
:return: 0 или ошибка
"""
with sqlite3.connect(f'{path}admindb/databases/group_ids.db') as con:
cur = con.cursor()
query = f"SELECT prepod_history FROM user_ids WHERE tg_id=?"
prepod_history = cur.execute(query, [user_id]).fetchone()[0]
if prepod_history:
prepod_history = prepod_history.split(',')
if prepod_id in prepod_history:
prepod_history.remove(prepod_id)
prepod_history.insert(0, prepod_id)
if len(prepod_history) > 7:
prepod_history.pop()
prepod_history = ','.join(prepod_history)
else:
prepod_history = str(prepod_id)
query = f"UPDATE user_ids SET prepod_history=? WHERE tg_id=?"
cur.execute(query, [prepod_history, user_id])
con.commit()
return 0
def get_prepod_info(prepod_id):
"""
Получает информацию о преподе из БД
:param int prepod_id: id препода
:return: кортеж с информацией [id, department_id, initials, name, surname, midname, roles], название кафедры
"""
with sqlite3.connect(f'{path}admindb/databases/prepods.db') as con:
cur = con.cursor()
query = f"SELECT * FROM prepods WHERE id=?"
prepod_info = cur.execute(query, [prepod_id]).fetchall()
readable_department = ""
# todo убрать это вообще - все норм в базе будет.
if prepod_info:
departments = [str(e[1]) for e in prepod_info]
prepod_info = prepod_info[0]
query = "SELECT title FROM departments WHERE id IN ({})".format(','.join(departments))
readable_department = ', '.join([e[0] for e in cur.execute(query).fetchall()])
return prepod_info, readable_department
def get_prepods_history(call):
"""
Получает историю преподов пользователя и отправляет ему кнопки с ними
:param call: callback-объект для изменения сообщения
:return: список преподов
"""
with sqlite3.connect(f'{path}admindb/databases/group_ids.db') as con:
cur = con.cursor()
query = f"SELECT prepod_history FROM user_ids WHERE tg_id=?"
prepod_history = cur.execute(query, [call.from_user.id]).fetchone()[0]
if prepod_history:
prepod_history = prepod_history.split(',')
prepod_history = [int(i) for i in prepod_history]
else:
prepod_history = []
if prepod_history:
markup = telebot.types.InlineKeyboardMarkup()
for prepod_id in prepod_history:
prepod, department = get_prepod_info(prepod_id)
payload = {"type": "action",
"command": f"choose_prepod",
"id": str(prepod[0]),
"department_id": str(prepod[1])
}
callback = payload_to_callback(payload)
markup.add(telebot.types.InlineKeyboardButton(text=f'{prepod[2]} ({department})',
callback_data=callback))
payload = {"type": "navigation",
"place": "table_prepods"}
callback = payload_to_callback(payload)
markup.add(telebot.types.InlineKeyboardButton(text=f'Назад', callback_data=callback))
cl = bot.edit_message_text(text='Выбери препода из истории:',
chat_id=call.from_user.id,
message_id=call.message.id,
reply_markup=markup)
dump_message(cl, callback=True)
else:
markup = telebot.types.InlineKeyboardMarkup()
payload = {"type": "navigation",
"place": "table_prepods"}
callback = payload_to_callback(payload)
markup.add(telebot.types.InlineKeyboardButton(text=f'Назад', callback_data=callback))
cl = bot.edit_message_text(chat_id=call.from_user.id,
text='История пуста.',
message_id=call.message.id,
reply_markup=markup)
dump_message(cl, callback=True)
return prepod_history
def get_prepod_schedule(prepod_id, weekday):
"""
Получает расписание препода из БД
:param int prepod_id: id препода
:param int weekday: номер дня недели
:return: расписание препода
"""
with sqlite3.connect(f'{path}admindb/databases/prepods.db') as con:
cur = con.cursor()
query = f"SELECT day_table FROM table_cache WHERE id=? AND day=?"
schedule = cur.execute(query, [prepod_id, weekday]).fetchall()
if schedule:
schedule = schedule[0]
else:
schedule = 'У преподавателя пока нет расписания на этот семестр.'
return schedule
def set_tables_time(message):
"""
Настройка подписки на расписания (время рассылки)
"""
time_ = str(message.text)
if len(time_) != 5: # Дополняем нулями, если необходимо
time_ = time_.zfill(5)
try: # Проверка формата времени
time_check = time.strptime(time_, '%H:%M')
except ValueError:
msg = 'Ошибка - проверь формат сообщения (ЧЧ:ММ), нажми кнопку и попробуй еще раз'
send_message(message.chat.id, msg)
return False
with sqlite3.connect(f'{path}admindb/databases/table_ids.db') as con:
cur = con.cursor()
upd_query = f'UPDATE `tg_users` SET time=? WHERE id=?'
cur.execute(upd_query, (time_, message.chat.id))
con.commit()
msg = f'Время рассылки расписания установлено: {time_}. Изменения вступят в силу со следующего дня'
send_message(message.chat.id, msg)
return True
# Команды в ЛС:
@bot.message_handler(commands=['main'], is_registered=True)
def main_reply(message):
dump_message(message)
if message.chat.type == 'private':
group = get_group(message.chat.id)
markup = open_keyboard(f'{group}_main')
send_message(message.chat.id, "Дед на связи", reply_markup=markup)
else:
send_message(message.chat.id, "Данная команда доступна только в личных сообщениях.")
@bot.message_handler(commands=['change_group'], is_registered=True)
def change_group(message):
dump_message(message)
if message.chat.type == 'private':
group = get_group(message.chat.id)
additional_group = get_additional_group(message.chat.id)
if message.chat.id in moderators:
kb = open_keyboard('kb_change_additional_group')
send_message(message.chat.id,
f'Текущая группа: {group}.\n Текущая доп.группа: {additional_group}.\n'
'Для изменения дополнительной группы нажми на кнопку ниже.\n'
'Изменение основной группы недоступно модераторам. При необходимости напиши админам: '
'https://t.me/evgeniy_setrov или https://t.me/TSheyd', reply_markup=kb,
disable_web_page_preview=True)
else:
kb = open_keyboard('kb_change_groups')
send_message(message.chat.id,
f'Текущая группа: {group}.\n Текущая доп.группа: {additional_group}.\n'
'Для изменения нажми на одну из кнопок ниже:', reply_markup=kb)
else:
send_message(message.chat.id, "Данная команда доступна только в личных сообщениях.")
def change_group_step(message):
dump_message(message)
group = get_group(message.chat.id)
if len(message.text) == 4 and message.text.isdecimal():
if not check_group_exists(message.text):
send_message(message.chat.id, f'Ошибка - группа {message.text} не найдена. '
f'Проверь правильность номера или обратись к администраторам')
return False
group_exists, user_existed, msg = change_user_group(message.text, message.chat.id, source='telegram')
if not group_exists:
add_db_response, admin_add_db_response = create_database(message.text)
send_message(admin_chat, text=admin_add_db_response)
send_message(message.chat.id, text=add_db_response)
markup = open_keyboard(f'{message.text}_main')
send_message(message.chat.id, text=msg, reply_markup=markup)
if group is None:
admin_msg = f'К нам пришел дикий {message.chat.id} (@{message.from_user.username}) из {message.text}'
else:
admin_msg = f'Юзер {message.chat.id} (@{message.from_user.username}) изменил группу: ' \
f'{group} -> {message.text}'
send_message(admin_chat, admin_msg)
else:
msg = 'Ошибка - неверный формат номера группы'
send_message(message.chat.id, msg)
update_list_registered_users()
def change_additional_group_step(message):
dump_message(message)
additional_group = get_additional_group(message.chat.id)
if len(message.text) == 4 and message.text.isdecimal():
if str(message.text) != '0000':
if not check_group_exists(message.text):
send_message(message.chat.id, f'Ошибка - группа {message.text} не найдена. '
f'Проверь правильность номера или обратись к администраторам')
return False
user_existed, msg = change_user_additional_group(message.text, message.chat.id, source='telegram')
send_message(message.chat.id, text=msg)
if not additional_group:
send_message(admin_chat, f'Юзер {message.chat.id} (@{message.from_user.username}) добавил доп. группу: '
f'{message.text}')
elif message.text == '0000':
send_message(admin_chat, f'Юзер {message.chat.id} (@{message.from_user.username}) удалил доп. группу.')
else:
send_message(admin_chat, f'Юзер {message.chat.id} (@{message.from_user.username}) изменил доп. группу: '
f'{additional_group if additional_group else "None"} -> {message.text}')
else:
msg = 'Ошибка - неверный формат номера группы'
send_message(message.chat.id, msg)
def add_new_chat_step(message):
group = str(message.text)
tg_id = message.chat.id
if len(group) != 4 or not group.isdecimal():
send_message(tg_id, f'Ошибка - неверный формат номера группы: {group}. Попробуй еще раз')
return 0
return_str = ''
with sqlite3.connect(f'{path}admindb/databases/group_ids.db') as con:
cur = con.cursor()
group_check = cur.execute('SELECT group_id FROM group_gcals WHERE group_id=?', [group]).fetchone()
if not group_check:
send_message(tg_id, f'Ошибка - нет такой группы: {group}. Проверь данные и попробуй еще раз')
return 0
old_chat_id = cur.execute('SELECT vk_chat_id FROM group_gcals WHERE group_id=?', [group]).fetchone()[0]
if old_chat_id:
return_str = f'Беседа группы {group} уже есть ВКонтакте - vk_chat_id={old_chat_id}\n' \
f'Чтобы добавить беседу и в Телеграм, напиши в беседе ВК команду "@kiberded_bot телеграм"'
else:
old_chat_id = cur.execute('SELECT tg_chat_id '
'FROM group_gcals '
'WHERE group_id=?', [group]).fetchone()[0]
if old_chat_id:
if tg_id == old_chat_id:
send_message(tg_id, f'Беседа уже привязана к группе {group}')
send_message(tg_id, f'Беседа {group} уже существует, создание нескольких бесед на группу пока '
f'не поддерживается')
return 0
cur.execute('UPDATE group_gcals SET tg_chat_id=NULL WHERE tg_chat_id=?', [tg_id])
cur.execute('UPDATE group_gcals SET tg_chat_id=? WHERE group_id=?', [tg_id, group])
con.commit()
return_str = f'Группа {group} успешно добавлена.\n' \
f'Теперь бот будет ежедневно утром присылать сюда расписание на день. Расписание ' \
f'создается на основе расписания на сайта ЛЭТИ и может изменяться модератором ' \
f'группы.\n' \
f'Также Кибердед может присылать ' \
f'уведомления о новых письмах на почте группы (если подключить почту). ' \
f'Вместо расписания бот может присылать ивенты на день с ' \
f'гугл-календаря (при наличии).' \
f'\nВсе настройки бота под группу доступны пока только ВКонтакте: https://vk.com/kiberded_bot'
con.close()
update_list_registered_users()
send_message(tg_id, return_str)
send_message(admin_chat, f'К нам пришла дикая конфа {group}: {message.chat.id}, '
f'username: @{message.from_user.username}')
return 0
def search_prepods_by_surname(surname):
"""
Поиск id преподавателей по фамилии
:param surname: фамилия преподавателя
:return: [prepodId, departmentId] - список id преподавателей/кафедр
"""
with sqlite3.connect(f'{path}admindb/databases/prepods.db') as con:
con.row_factory = lambda cursor, row: row[0] # чтобы возвращать list, а не list of tuples
cur = con.cursor()
prepods = cur.execute('SELECT id, department_id FROM prepods WHERE surname=?', [surname]).fetchall()
con.close()
return prepods
def search_prepod_text_step(message):
"""
Поиск преподов в списке list_prepods и отправка юзеру кнопочек
:param message:
:return: 0
"""
dump_message(message)
result = process.extract(message.text, list_prepods, limit=4)
if result[0][1] == 100:
answer = 'Преподаватель найден'
markup = telebot.types.InlineKeyboardMarkup()
prepods = search_prepods_by_surname(result[0][0])
for prepod_id in prepods:
prepod, department = get_prepod_info(prepod_id)
payload = {"type": "action",
"command": f"choose_prepod",
"id": str(prepod[0]),
"department_id": str(prepod[1])
}
callback = payload_to_callback(payload)
markup.add(telebot.types.InlineKeyboardButton(text=f'{prepod[2]} ({department})',
callback_data=callback))
payload = {"type": "navigation",
"place": "table_prepods"}
callback = payload_to_callback(payload)
markup.add(telebot.types.InlineKeyboardButton(text=f'Назад', callback_data=callback))
else:
answer = 'Точных совпадений не найдено. Похожие фамилии:'
markup = telebot.types.InlineKeyboardMarkup()
for element in result:
prepods = search_prepods_by_surname(element[0])
for prepod_id in prepods:
prepod, department = get_prepod_info(prepod_id)
payload = {"type": "action",
"command": f"choose_prepod",
"id": str(prepod[0]),
"department_id": str(prepod[1])
}
callback = payload_to_callback(payload)
markup.add(telebot.types.InlineKeyboardButton(text=f'{prepod[2]} ({department})',
callback_data=callback))
payload = {"type": "navigation",
"place": "table_prepods"}
callback = payload_to_callback(payload)
markup.add(telebot.types.InlineKeyboardButton(text=f'Назад', callback_data=callback))
send_message(chat_id=message.chat.id, text=answer, reply_markup=markup)
def add_telegram_user_id(vk_id, tg_id, id_type='user'):
"""
Добавление аккаунта пользователя в Телеграме к строке с его аккаунтом в ВК. Здесь нет обработки каких-то ошибок,
т.к. наличие строки с айди ВК предусмотрено авторизацией по хэшу
:param vk_id: айди пользователя/беседы ВК
:param tg_id: айди пользователя/беседы ТГ
:param id_type: тип айди - "user" или "group"
:return: Сообщение о привязке аккаунта, группа пользователя
"""
if id_type == 'user':
del_q = f'DELETE FROM user_ids WHERE tg_id=? AND vk_id IS NULL'
old_q = f'SELECT tg_id FROM user_ids WHERE vk_id=?'
upd_q = f'UPDATE user_ids SET tg_id=? WHERE vk_id=?'
grp_q = f'SELECT group_id FROM user_ids WHERE vk_id=?'
grp_alt_q = f'SELECT group_id FROM user_ids WHERE tg_id=?'
elif id_type == 'group':
del_q = f'DELETE FROM group_gcals WHERE tg_chat_id=? AND vk_chat_id IS NULL'
old_q = f'SELECT tg_chat_id FROM group_gcals WHERE vk_chat_id=?'
upd_q = f'UPDATE group_gcals SET tg_chat_id=? WHERE vk_chat_id=?'
grp_q = f'SELECT group_id FROM group_gcals WHERE vk_chat_id=?'
grp_alt_q = f'SELECT group_id FROM group_gcals WHERE tg_chat_id=?' # вот этот запрос пока не нужен, но все же
else:
raise ValueError('id_type must be "user" or "group"')
with sqlite3.connect(f'{path}admindb/databases/group_ids.db') as con:
cur = con.cursor()
group = cur.execute(grp_q, [vk_id]).fetchone()
if not group: # неавторизованный пользователь регистрируется
group = cur.execute(grp_alt_q, [tg_id]).fetchone()
group = group[0]
old_id = cur.execute(old_q, [vk_id]).fetchone() # Проверяем, не привязан ли аккаунт уже.
if old_id:
if old_id[0] == tg_id:
msg = f'Аккаунт уже привязан к https://vk.com/id{vk_id} ({group})' if id_type == 'user' \
else f'Беседа уже привязана к {group}'
return msg, group
# Если что-то было зарегистрировано в ТГ, но не в ВК - удаляем строку и перезаписываем в строке в ВК
cur.execute(del_q, [tg_id])
# Случай регистрации пользователя без ВК обрабатывается change_group_step
cur.execute(upd_q, (tg_id, vk_id))
con.commit()
con.close()
update_list_registered_users()
if id_type == 'user':
return f"Аккаунт в Телеграме успешно привязан к https://vk.com/id{vk_id}, группа {group}.", group
elif id_type == 'group':
return f"Группа в Телеграме успешно привязана к vk_chat_id={vk_id}, группа {group}.", group
def get_attendance_statistics_today(checkin):
"""
Формирует адекватный строковый ответ о статистике посещаемости за сегодня из ответа с
https://digital.etu.ru/attendance/api/schedule/check-in
:param checkin: json ответ
:return str: ответ в виде строки
"""
answer = 'Статистика за сегодня: \n\n'
for lesson_elem in checkin:
time_start = time.strptime(lesson_elem['start'], '%Y-%m-%dT%H:%M:%S.000%z')
time_end = time.strptime(lesson_elem['end'], '%Y-%m-%dT%H:%M:%S.000%z')
day_class = time_start.tm_yday
day_now = time.gmtime(time.time()).tm_yday
if day_now == day_class:
lesson_name = lesson_elem['lesson']['shortTitle']
subject_type = lesson_elem['lesson']['subjectType']
self_reported = lesson_elem['selfReported']
if self_reported:
self_reported_ans = '✅'
elif self_reported == False: # не надо делать elif not self_reported, т.к. в случае отсутствия отметки
# сработает это условие (тип Nonetype), а по моей логике должно сработать условие else
self_reported_ans = '❌'
else:
self_reported_ans = '🟢'
answer += f'{time_start.tm_hour:02}:{time_start.tm_min:02} - {time_end.tm_hour:02}:{time_end.tm_min:02}: ' \
f'{lesson_name} ({subject_type}): {self_reported_ans}\n'
return answer
def check_in_at_lesson(chat_id, lesson_id):
"""
Функция для отмечания на паре
"""
with sqlite3.connect(f'{path}admindb/databases/group_ids.db') as con:
cur = con.cursor()
data = cur.execute("SELECT lk_email, lk_password FROM user_ids WHERE tg_id=?", [chat_id]).fetchall()
data = list(data[0])
msg = send_message(chat_id, 'Отмечаемся на паре... Логинюсь в ЛК...')
session = attendance.start_new_session()
code, session = attendance.auth_in_lk(session, data[0], data[1])
if code == 200:
msg = bot.edit_message_text(msg.text + '✅\nЛогинюсь в ИС Посещаемость...', msg.chat.id, msg.id)
else:
bot.edit_message_text(f'Аутентификация в ЛК не удалась. Возможно в базе хранятся неправильные данные для входа.'
f'\nТекущие данные:\n\nemail: {data[0]}\nПароль: ***{data[1][:-3]}. \n\nЕсли данные'
f'верны, попробуй еще раз.', msg.chat.id, msg.id)
return 0
code, session = attendance.auth_in_attendance(session)
if code == 200:
msg = bot.edit_message_text(msg.text + '✅\nОтмечаюсь на паре...', msg.chat.id, msg.id)
else:
bot.edit_message_text(f'Аутентификация в ИС Посещаемость не удалась.', msg.chat.id, msg.id)
return 0
code, session = attendance.check_in_at_lesson(session, lesson_id)
if code == 201:
msg = bot.edit_message_text(msg.text + '✅\nТы успешно отметился на паре.', msg.chat.id, msg.id)
else:
bot.edit_message_text(f'Отметиться на паре не удалось. Возможно, время уже вышло. Код ошибки: {code}',
msg.chat.id, msg.id)
return 0
code, time_data, user, checkin, alldata = attendance.get_info_from_attendance(session)
answer = get_attendance_statistics_today(checkin)
if code == 200:
msg = bot.edit_message_text(msg.text + f"\n\n" + answer, msg.chat.id, msg.id)
else:
msg = bot.edit_message_text(msg.text + 'Не удалось загрузить статистику о сегодняшних отметках. Попробуй еще раз '
'через /attendance_stat или вручную.', msg.chat.id, msg.id)
return 0