diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..f1f11a5 --- /dev/null +++ b/.flake8 @@ -0,0 +1,4 @@ +[flake8] +ignore = F401 E402 +max-line-length = 1000 +exclude = tests/* \ No newline at end of file diff --git a/.github/workflows/lint-and-format.yml b/.github/workflows/lint-and-format.yml new file mode 100644 index 0000000..ba5e198 --- /dev/null +++ b/.github/workflows/lint-and-format.yml @@ -0,0 +1,31 @@ +name: Lint Code + +on: + push: + branches: + - main + +jobs: + lint: + name: Lint + runs-on: ubuntu-latest + steps: + - name: Check out repository + uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: 3.x + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install black flake8 + + - name: Format with black + run: black . + + - name: Lint with flake8 + run: flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + continue-on-error: false \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..018fc19 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +/dist +/__pycache__ +/cs +bot.py +cs.py +/log +.env.bot \ No newline at end of file diff --git a/cmd_color.py b/cmd_color.py new file mode 100644 index 0000000..ccc1a00 --- /dev/null +++ b/cmd_color.py @@ -0,0 +1,25 @@ +# ! /usr/bin/env python +# coding=utf-8 +import time +from colorama import init, Fore, Back, Style + +init(autoreset=True) + + +class CmdColor: + def __init__(self): + pass + + def ccolor(self, state: str, text: str, color: str): + t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) + if color == 'red': + print(Fore.RED + f'[{state}]' + Fore.RESET + " " + Fore.RED + t + Fore.RESET + " " + Fore.RED + text + Fore.RESET) + elif color == 'green': + print(Fore.GREEN + f'[{state}]' + Fore.RESET + " " + Fore.GREEN + t + Fore.RESET + " " + Fore.GREEN + text + Fore.RESET) + elif color == 'blue': + print(Fore.BLUE + f'[{state}]' + Fore.RESET + " " + Fore.BLUE + t + Fore.RESET + " " + Fore.BLUE + text + Fore.RESET) + elif color == 'yellow': + print(Fore.YELLOW + f'[{state}]' + Fore.RESET + " " + Fore.YELLOW + t + Fore.RESET + " " + Fore.YELLOW + text + Fore.RESET) + + def h_time(self, state, text): + print(Fore.BLUE + f'[{state}]' + Fore.RESET + " " + Fore.BLUE + text + Fore.RESET) diff --git a/data.json b/data.json new file mode 100644 index 0000000..0c07069 --- /dev/null +++ b/data.json @@ -0,0 +1,42 @@ +{ + "1": "责,理,机,管,监,界,巡,模,优,发,确", + "data": [ + "responsibleoperator", + "steward", + "bot", + "sysop", + "templateeditor", + "interface-admin", + "patroller", + "suppress", + "autopatrolled", + "massmessage-sender", + "confirmed" + ], + "to": { + "责": "responsibleoperator", + "理": "steward", + "机": "bot", + "管": "sysop", + "监": "templateeditor", + "界": "interface-admin", + "巡": "patroller", + "模": "suppress", + "优": "autopatrolled", + "发": "massmessage-sender", + "确": "confirmed" + }, + "form": { + "responsibleoperator": "责", + "steward": "理", + "bot": "机", + "sysop": "管", + "templateeditor": "监", + "interface-admin": "界", + "patroller": "巡", + "suppress": "模", + "autopatrolled": "优", + "massmessage-sender": "发", + "confirmed": "确" + } +} \ No newline at end of file diff --git a/file_handle.py b/file_handle.py new file mode 100644 index 0000000..10ec293 --- /dev/null +++ b/file_handle.py @@ -0,0 +1,85 @@ +import json +from pathlib import Path + + +""" +xjiebot 文件操作 +懒的写直接复制 +获取自身路径下的文件 +""" + + +def file_path(file): + return Path(__file__).resolve().parent / file + + +class xj_file_handle: + def __init__(self): + pass + + def xj_file_reading(self, file_name: str, file_content: str = None): + json_file_path_reading = file_path(file_name) + try: + with json_file_path_reading.open("r", encoding="utf-8") as json_file: + loaded_data = json.load(json_file) + if file_content is None: + return loaded_data + return loaded_data.get(file_content, None) + except FileNotFoundError: + print(f"File not found: {file_name}") + except json.JSONDecodeError: + print(f"Error decoding JSON from the file: {file_name}") + except Exception as e: + print(f"An error occurred: {e}") + + def xj_file_change(self, file_name: str, file_key: str, file_content: str): + json_file_path_change = file_path(file_name) + try: + with json_file_path_change.open("r", encoding="utf-8") as json_file: + loaded_data = json.load(json_file) + except FileNotFoundError: + print(f"文件 {file_name} 未找到。") + return + except json.JSONDecodeError: + print(f"{file_name} 文件内容不是有效的JSON格式。") + return + if file_key not in loaded_data: + print(f"键 '{file_key}' 在文件中不存在。") + return + loaded_data[file_key] = file_content + try: + with json_file_path_change.open("w", encoding="utf-8") as json_file: + json.dump(loaded_data, json_file, indent=4) + except IOError as e: + print(f"写入文件时发生错误: {e}") + + def get_keys_ending_with_key(self, json_data, key_suffix='_KEY'): + try: + json_file_path_reading = file_path(json_data) + + with open(json_file_path_reading, "r", encoding="utf-8") as json_file: + loaded_data = json.load(json_file) + + except FileNotFoundError: + print(f"Error: The file {json_file_path_reading} was not found.") + return None + + except json.JSONDecodeError: + print(f"Error: Failed to decode JSON from file {json_file_path_reading}.") + return None + + except Exception as e: + print(f"An unexpected error occurred: {e}") + return None + + result = {} + for key in loaded_data.keys(): + if key.endswith(key_suffix) and loaded_data[key]: + result[key] = loaded_data[key] + + return result + + def read_filenames_with_pathlib(directory): + path_obj = Path(directory) + filenames = [file.name for file in path_obj.iterdir() if file.is_file()] + return filenames diff --git a/httpx_request.py b/httpx_request.py new file mode 100644 index 0000000..066fb2a --- /dev/null +++ b/httpx_request.py @@ -0,0 +1,97 @@ +import httpx +from typing import Dict, Any + +""" +xjbot 请求框架 +""" + + +class AsyncHttpClient: + def __init__(self, max_connections: int = 20): + self.client = httpx.AsyncClient(limits=httpx.Limits(max_connections=max_connections)) + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self.close() + + async def get(self, url: str, params: Dict[str, Any] = None, headers: Dict[str, str] = None) -> httpx.Response: + """ + 异步发送GET请求。 + """ + try: + response = await self.client.get(url, params=params, headers=headers) + response.raise_for_status() + return response + except httpx.TimeoutException as e: + raise Exception("请求超时") from e + except httpx.RequestError as e: + raise Exception(f"网络请求错误: {e}") from e + except httpx.HTTPStatusError as e: + raise Exception(f"服务器返回错误: {e.response.status_code} - {e.response.text}") from e + + async def post(self, url: str, json: Dict[str, Any] = None, data: Dict[str, Any] = None, params: Dict[str, Any] = None, headers: Dict[str, str] = None) -> httpx.Response: + """ + 异步发送POST请求。 + """ + try: + response = await self.client.post(url, json=json, data=data, params=params, headers=headers) + response.raise_for_status() + return response + except httpx.TimeoutException as e: + raise Exception("请求超时") from e + except (httpx.RequestError, httpx.HTTPStatusError) as e: + raise Exception(f"请求错误: {str(e)}") from e + + async def close(self): + await self.client.aclose() + + +class HttpClient: + def __init__(self): + """ + 初始化HttpClient实例。 + """ + self.client = httpx.Client() + + def get(self, url, params=None, headers=None): + """ + 发送GET请求。 + :param url: 完整的请求URL + :param params: 查询参数字典 + :param headers: 请求头字典 + :return: httpx.Response对象或None(如果发生错误) + """ + try: + response = self.client.get(url, params=params, headers=headers) + response.raise_for_status() + return response + except httpx.RequestError as e: + print(f"网络请求错误: {e}") + except httpx.HTTPStatusError as e: + print(f"服务器返回错误: {e.response.text}") + return None + + def post(self, url, json=None, data=None, params=None, headers=None): + """ + 发送POST请求。 + :param url: 完整的请求URL + :param json: JSON格式的数据 + :param data: 表单数据 + :param params: 查询参数字典 + :param headers: 请求头字典 + :return: httpx.Response对象或None(如果发生错误) + """ + try: + response = self.client.post(url, json=json, data=data, params=params, headers=headers) + response.raise_for_status() + return response + except httpx.RequestError as e: + print(f"网络请求错误: {e}") + except httpx.HTTPStatusError as e: + print(f"服务器返回错误: {e.response.text}") + return None + + def close(self): + self.client.close() diff --git a/log.py b/log.py new file mode 100644 index 0000000..f1f8c8d --- /dev/null +++ b/log.py @@ -0,0 +1,49 @@ +import time +import logging +from pathlib import Path +from file_handle import xj_file_handle + +dayt = time.strftime('%Y-%m-%d', time.localtime()) + + +def x_log(value: str, logtype: str = 'info'): + logging.basicConfig(filename=Path(__file__).resolve().parent / f'log/{dayt}.log', encoding='utf-8', level=logging.DEBUG, format='%(asctime)s:%(levelname)s:%(message)s') + if logtype == 'info': + logging.info(value) + elif logtype == 'debug': + logging.debug(value) + elif logtype == 'warning': + logging.warning(value) + elif logtype == 'error': + logging.error(value) + elif logtype == 'critical': + logging.critical(value) + + # logging.debug('这是一条debug信息') + # logging.info('这是一条info信息') + # logging.warning('这是一条warning信息') + # logging.error('这是一条error信息') + # logging.critical('这是一条critical信息') + + +class XJ_Log: + def __init__(self): + pass + + def w_log(self, conten: str, logtype: str = 'info'): + try: + log_data = xj_file_handle.read_filenames_with_pathlib('log') + except FileNotFoundError: + path_obj = Path('log') + path_obj.mkdir(parents=True, exist_ok=True) + log_data = [] + if log_data == []: + with open(Path(__file__).resolve().parent / f'log/{dayt}.log', 'w', encoding='utf-8'): + pass + x_log(conten, logtype) + return + x_log(conten, logtype) + + +if __name__ == '__main__': + XJ_Log().w_log('test') diff --git a/mail.py b/mail.py new file mode 100644 index 0000000..053a057 --- /dev/null +++ b/mail.py @@ -0,0 +1,53 @@ +from dotenv import load_dotenv +from log import XJ_Log +import os +import smtplib +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart + + +load_dotenv('.env.bot') + + +EMAIL = os.getenv('EMAIL') +SMTP_SERVER = os.getenv('SMTP_SERVER') +AC = os.getenv('AC') +SMTP_PORT = os.getenv('SMTP_PORT') +SYSOP_EMAIL = os.getenv('SYSOP_EMAIL') + + +def self_inspection(): + required_vars = ['EMAIL', 'SMTP_SERVER', 'AC', 'SMTP_PORT', 'SYSOP_EMAIL'] + for var in required_vars: + if os.getenv(var) is None: + print(f"缺少环境变量: {var}") + XJ_Log.w_log(f"缺少环境变量: {var}", 'error') + return False + return True + + +def mail(data): + sender_email = EMAIL + password = AC + message = MIMEMultipart() + message["From"] = sender_email + message["To"] = SYSOP_EMAIL + message["Subject"] = "运行报告" + message.attach(MIMEText(data, "plain")) + + try: + with smtplib.SMTP(SMTP_SERVER, int(SMTP_PORT)) as server: + server.starttls() + server.login(sender_email, password) + server.sendmail(sender_email, SYSOP_EMAIL, message.as_string()) + except Exception as e: + print(f"邮件发送失败: {e}") + XJ_Log.w_log(f"邮件发送失败: {e}", 'error') + + +def f_email(data): + if self_inspection(): + mail(data) + return True + else: + return False diff --git a/main.py b/main.py new file mode 100644 index 0000000..6dd7ad6 --- /dev/null +++ b/main.py @@ -0,0 +1,256 @@ +import re +import os +import time +from typing import List +import asyncio +from log import XJ_Log +from dotenv import load_dotenv +from cmd_color import CmdColor +from mail import f_email +from httpx_request import AsyncHttpClient +from file_handle import xj_file_handle + + +CColor = CmdColor() +client = AsyncHttpClient() +handle = xj_file_handle() + +pattern = r"{{Right topicons\|用户=([^}]*)}}" + + +def extract_and_remove_pattern(s: str): + matches = re.findall(pattern, s) + if matches: + new_str = re.sub(pattern, "", s) + filtered_matches = list(filter(lambda x: x != '', matches)) + return filtered_matches, new_str, True + else: + return [], s, False + + +def find_nearest_index(list1: List[str], list2: List[str]): + min_index_diff = len(list1) + nearest_element = None + + for element in list2: + try: + index = list1.index(element) + if index < min_index_diff: + min_index_diff = index + nearest_element = element + except ValueError: + pass + + return nearest_element + + +order_list = handle.xj_file_reading("data.json", "data") +form_order_list = handle.xj_file_reading("data.json", "form") + + +def Return_to(Arry: List[str], permission: List[str]): + P_Data = find_nearest_index(order_list, permission) + if P_Data is None: + return "DAP" + + """ + 用于修改执行错误 + """ + # if P_Data == "autoconfirmed": + # return "DA" + + T_Data = form_order_list.get(P_Data) + if T_Data in Arry: + return "DAP" + return "{{Right topicons|用户=" + T_Data + "}}" + + +def x_self_inspection(): + required_vars = ['USERNAME', 'PASSWORD', 'wiki_url'] + for var in required_vars: + if os.getenv(var) is None: + CColor.ccolor("BUG", f"缺少环境变量: {var}", "red") + XJ_Log.w_log(f"缺少环境变量: {var}", 'error') + return False + return True + + +async def main(): + win = 0 + daps = 0 + mistake = 0 + mistake_list = [] + if os.path.exists('.env.bot'): + try: + load_dotenv('.env.bot') + except Exception as e: + XJ_Log.w_log(f"加载文件时发生错误:{e}", "error") + return CColor.ccolor("BUG", f"加载文件时发生错误:{e}", "red") + else: + XJ_Log.w_log("无.env.bot文件", "error") + return CColor.ccolor("BUG", '无.env.bot文件', "red") + + if not x_self_inspection(): + return + + USERNAME = os.getenv("WIKI_USERNAME") + PASSWORD = os.getenv("WIKI_PASSWORD") + EXCLUDE = ["Zorua Fox", "New user message", "New user page"] + wiki_url = os.getenv("MEDIAWIKI_URL") + + """ + 登录 + """ + PARAMS_0 = { + 'action': "query", + 'meta': "tokens", + 'type': "login", + 'format': "json" + } + + LIB_TOKEN = await client.get(wiki_url, params=PARAMS_0) + LIB_TOKEN = LIB_TOKEN.json() + LOGIN_TOKEN = LIB_TOKEN['query']['tokens']['logintoken'] + + CColor.ccolor("LOGIN_TOKEN", LOGIN_TOKEN, "green") + + PARAMS_1 = { + 'action': "login", + 'lgname': USERNAME, + 'lgpassword': PASSWORD, + 'lgtoken': LOGIN_TOKEN, + 'format': "json" + } + headers = { + "Content-Type": "application/x-www-form-urlencoded" + } + BJ_TOKEN = await client.post(f"{wiki_url}", data=PARAMS_1, headers=headers) + + CColor.ccolor("LOGIN", "登录成功", "green") + + PARAMS_2 = { + "action": "query", + "meta": "tokens", + "format": "json" + } + + BJ_TOKEN = await client.get(wiki_url, params=PARAMS_2) + BJ_TOKEN = BJ_TOKEN.json() + CSRF_TOKEN = BJ_TOKEN['query']['tokens']['csrftoken'] + CColor.ccolor("CSRF_TOKEN", CSRF_TOKEN, "green") + + """ + 获取注册用户列表 + """ + user_lit = f'{wiki_url}?action=query&format=json&list=allusers&aulimit=5000' + + user_lit_data = await client.get(user_lit) + users_lit = user_lit_data.json()['query'].get('allusers', []) + + CColor.ccolor("USERS_LIST", "获取注册用户列表完成", "green") + + """ + 更改用户页 + """ + async def change(CSRF_TOKEN, NAME, QX_TEXT, DATA_TEXT): + PARAMS_3 = { + "action": "edit", + "title": f'User:{NAME}', + "token": CSRF_TOKEN, + "format": "json", + "bot": "true", + "summary": f'将您的用户页权限标识更改为{re.findall(pattern, QX_TEXT)}', + "text": f'{QX_TEXT}\n{DATA_TEXT}', + } + await client.post(wiki_url, data=PARAMS_3) + # DATA = R.json() + # print(DATA) + CColor.ccolor('green', f'{NAME}修改成功', 'green') + + """ + 获取用户权限 + """ + async def get_user_limits(name: str): + LIBRARY_URL = f'{wiki_url}?action=query&list=users&usprop=groups&format=json&ususers=User:{name}' + permission_list = await client.get(LIBRARY_URL) + return permission_list.json()['query']['users'][0].get('groups', []) + + """ + 获取页面信息 + """ + async def sser_page_information(name: str): + user_page = f'{wiki_url}?action=query&format=json&formatversion=2&prop=revisions&rvprop=content&titles=User:{name}' + response = await client.get(user_page) + page_data = response.json()["query"]["pages"][0].get("missing") + if page_data == "" or page_data: + CColor.ccolor("NO-PAGE", user_name, "red") + mistake_list.append(user_name) + return None, None, False + else: + example_text = response.json()["query"]["pages"][0]["revisions"][0]["content"] + matches, modified_str, found = extract_and_remove_pattern(example_text) + return matches, modified_str, found + + # users_lit = [ + # {'userid': 52, 'name': '顶呱呱的阿杰'}, + # {'userid': 1, 'name': 'Zorua Fox'}, + # {'userid': 2, 'name': 'Qiu'} + # ] + + for lit in users_lit: + user_name = lit.get("name", "") + if user_name == "" or user_name in EXCLUDE: + CColor.ccolor("DAP", user_name, "yellow") + daps += 1 + continue + permissionlist = await get_user_limits(user_name) + matches, modified_str, found = await sser_page_information(user_name) + + if permissionlist == []: + CColor.ccolor("DAP", user_name, "yellow") + daps += 1 + continue + + if found or matches == []: + template_data = Return_to(matches, permissionlist) + if template_data == "DAP": + CColor.ccolor("DAP", user_name, "yellow") + daps += 1 + continue + + """ + 用于修改执行错误 + """ + # if template_data == "DA": + # text = f'{modified_str}' + # print(user_name, "DA") + # await change(CSRF_TOKEN, user_name, text) + # continue + + win += 1 + await change(CSRF_TOKEN, user_name, template_data, modified_str) + else: + mistake += 1 + continue + + await client.close() + return daps, mistake, mistake_list, win + +if __name__ == '__main__': + t = time.perf_counter() + + a, b, c, f = asyncio.run(main()) + + th = f'{time.perf_counter() - t:.8f}s' + + CColor.h_time("耗时", th) + + email_data = f'{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}\n本次运行耗时{th}\n 成功修改{f}个用户\n 跳过{a}个用户\n 错误用户数: {b}\n 错误用户列表:\n{c}' + XJ_Log.w_log(email_data, 'info') + d = f_email(email_data) + if d: + print("邮件发送成功") + XJ_Log.w_log("邮件发送成功", 'info') + else: + print("未配置发送邮件配置或配置错误") + XJ_Log.w_log("未配置发送邮件配置或配置错误", 'info')