Skip to content

Commit

Permalink
更新
Browse files Browse the repository at this point in the history
  • Loading branch information
ajdgg committed Jun 30, 2024
1 parent 7d19f9c commit 87a5240
Show file tree
Hide file tree
Showing 10 changed files with 649 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[flake8]
ignore = F401 E402
max-line-length = 1000
exclude = tests/*
31 changes: 31 additions & 0 deletions .github/workflows/lint-and-format.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: Lint Code

on:
push:
branches:
- main

jobs:
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- name: Check out repository
uses: actions/checkout@v2

- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: 3.x

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install black flake8
- name: Format with black
run: black .

- name: Lint with flake8
run: flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
continue-on-error: false
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/dist
/__pycache__
/cs
bot.py
cs.py
/log
.env.bot
25 changes: 25 additions & 0 deletions cmd_color.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# ! /usr/bin/env python
# coding=utf-8
import time
from colorama import init, Fore, Back, Style

init(autoreset=True)


class CmdColor:
def __init__(self):
pass

def ccolor(self, state: str, text: str, color: str):
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
if color == 'red':
print(Fore.RED + f'[{state}]' + Fore.RESET + " " + Fore.RED + t + Fore.RESET + " " + Fore.RED + text + Fore.RESET)
elif color == 'green':
print(Fore.GREEN + f'[{state}]' + Fore.RESET + " " + Fore.GREEN + t + Fore.RESET + " " + Fore.GREEN + text + Fore.RESET)
elif color == 'blue':
print(Fore.BLUE + f'[{state}]' + Fore.RESET + " " + Fore.BLUE + t + Fore.RESET + " " + Fore.BLUE + text + Fore.RESET)
elif color == 'yellow':
print(Fore.YELLOW + f'[{state}]' + Fore.RESET + " " + Fore.YELLOW + t + Fore.RESET + " " + Fore.YELLOW + text + Fore.RESET)

def h_time(self, state, text):
print(Fore.BLUE + f'[{state}]' + Fore.RESET + " " + Fore.BLUE + text + Fore.RESET)
42 changes: 42 additions & 0 deletions data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"1": "责,理,机,管,监,界,巡,模,优,发,确",
"data": [
"responsibleoperator",
"steward",
"bot",
"sysop",
"templateeditor",
"interface-admin",
"patroller",
"suppress",
"autopatrolled",
"massmessage-sender",
"confirmed"
],
"to": {
"责": "responsibleoperator",
"理": "steward",
"机": "bot",
"管": "sysop",
"监": "templateeditor",
"界": "interface-admin",
"巡": "patroller",
"模": "suppress",
"优": "autopatrolled",
"发": "massmessage-sender",
"确": "confirmed"
},
"form": {
"responsibleoperator": "",
"steward": "",
"bot": "",
"sysop": "",
"templateeditor": "",
"interface-admin": "",
"patroller": "",
"suppress": "",
"autopatrolled": "",
"massmessage-sender": "",
"confirmed": ""
}
}
85 changes: 85 additions & 0 deletions file_handle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import json
from pathlib import Path


"""
xjiebot 文件操作
懒的写直接复制
获取自身路径下的文件
"""


def file_path(file):
return Path(__file__).resolve().parent / file


class xj_file_handle:
def __init__(self):
pass

def xj_file_reading(self, file_name: str, file_content: str = None):
json_file_path_reading = file_path(file_name)
try:
with json_file_path_reading.open("r", encoding="utf-8") as json_file:
loaded_data = json.load(json_file)
if file_content is None:
return loaded_data
return loaded_data.get(file_content, None)
except FileNotFoundError:
print(f"File not found: {file_name}")
except json.JSONDecodeError:
print(f"Error decoding JSON from the file: {file_name}")
except Exception as e:
print(f"An error occurred: {e}")

def xj_file_change(self, file_name: str, file_key: str, file_content: str):
json_file_path_change = file_path(file_name)
try:
with json_file_path_change.open("r", encoding="utf-8") as json_file:
loaded_data = json.load(json_file)
except FileNotFoundError:
print(f"文件 {file_name} 未找到。")
return
except json.JSONDecodeError:
print(f"{file_name} 文件内容不是有效的JSON格式。")
return
if file_key not in loaded_data:
print(f"键 '{file_key}' 在文件中不存在。")
return
loaded_data[file_key] = file_content
try:
with json_file_path_change.open("w", encoding="utf-8") as json_file:
json.dump(loaded_data, json_file, indent=4)
except IOError as e:
print(f"写入文件时发生错误: {e}")

def get_keys_ending_with_key(self, json_data, key_suffix='_KEY'):
try:
json_file_path_reading = file_path(json_data)

with open(json_file_path_reading, "r", encoding="utf-8") as json_file:
loaded_data = json.load(json_file)

except FileNotFoundError:
print(f"Error: The file {json_file_path_reading} was not found.")
return None

except json.JSONDecodeError:
print(f"Error: Failed to decode JSON from file {json_file_path_reading}.")
return None

except Exception as e:
print(f"An unexpected error occurred: {e}")
return None

result = {}
for key in loaded_data.keys():
if key.endswith(key_suffix) and loaded_data[key]:
result[key] = loaded_data[key]

return result

def read_filenames_with_pathlib(directory):
path_obj = Path(directory)
filenames = [file.name for file in path_obj.iterdir() if file.is_file()]
return filenames
97 changes: 97 additions & 0 deletions httpx_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import httpx
from typing import Dict, Any

"""
xjbot 请求框架
"""


class AsyncHttpClient:
def __init__(self, max_connections: int = 20):
self.client = httpx.AsyncClient(limits=httpx.Limits(max_connections=max_connections))

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_value, traceback):
await self.close()

async def get(self, url: str, params: Dict[str, Any] = None, headers: Dict[str, str] = None) -> httpx.Response:
"""
异步发送GET请求。
"""
try:
response = await self.client.get(url, params=params, headers=headers)
response.raise_for_status()
return response
except httpx.TimeoutException as e:
raise Exception("请求超时") from e
except httpx.RequestError as e:
raise Exception(f"网络请求错误: {e}") from e
except httpx.HTTPStatusError as e:
raise Exception(f"服务器返回错误: {e.response.status_code} - {e.response.text}") from e

async def post(self, url: str, json: Dict[str, Any] = None, data: Dict[str, Any] = None, params: Dict[str, Any] = None, headers: Dict[str, str] = None) -> httpx.Response:
"""
异步发送POST请求。
"""
try:
response = await self.client.post(url, json=json, data=data, params=params, headers=headers)
response.raise_for_status()
return response
except httpx.TimeoutException as e:
raise Exception("请求超时") from e
except (httpx.RequestError, httpx.HTTPStatusError) as e:
raise Exception(f"请求错误: {str(e)}") from e

async def close(self):
await self.client.aclose()


class HttpClient:
def __init__(self):
"""
初始化HttpClient实例。
"""
self.client = httpx.Client()

def get(self, url, params=None, headers=None):
"""
发送GET请求。
:param url: 完整的请求URL
:param params: 查询参数字典
:param headers: 请求头字典
:return: httpx.Response对象或None(如果发生错误)
"""
try:
response = self.client.get(url, params=params, headers=headers)
response.raise_for_status()
return response
except httpx.RequestError as e:
print(f"网络请求错误: {e}")
except httpx.HTTPStatusError as e:
print(f"服务器返回错误: {e.response.text}")
return None

def post(self, url, json=None, data=None, params=None, headers=None):
"""
发送POST请求。
:param url: 完整的请求URL
:param json: JSON格式的数据
:param data: 表单数据
:param params: 查询参数字典
:param headers: 请求头字典
:return: httpx.Response对象或None(如果发生错误)
"""
try:
response = self.client.post(url, json=json, data=data, params=params, headers=headers)
response.raise_for_status()
return response
except httpx.RequestError as e:
print(f"网络请求错误: {e}")
except httpx.HTTPStatusError as e:
print(f"服务器返回错误: {e.response.text}")
return None

def close(self):
self.client.close()
49 changes: 49 additions & 0 deletions log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import time
import logging
from pathlib import Path
from file_handle import xj_file_handle

dayt = time.strftime('%Y-%m-%d', time.localtime())


def x_log(value: str, logtype: str = 'info'):
logging.basicConfig(filename=Path(__file__).resolve().parent / f'log/{dayt}.log', encoding='utf-8', level=logging.DEBUG, format='%(asctime)s:%(levelname)s:%(message)s')
if logtype == 'info':
logging.info(value)
elif logtype == 'debug':
logging.debug(value)
elif logtype == 'warning':
logging.warning(value)
elif logtype == 'error':
logging.error(value)
elif logtype == 'critical':
logging.critical(value)

# logging.debug('这是一条debug信息')
# logging.info('这是一条info信息')
# logging.warning('这是一条warning信息')
# logging.error('这是一条error信息')
# logging.critical('这是一条critical信息')


class XJ_Log:
def __init__(self):
pass

def w_log(self, conten: str, logtype: str = 'info'):
try:
log_data = xj_file_handle.read_filenames_with_pathlib('log')
except FileNotFoundError:
path_obj = Path('log')
path_obj.mkdir(parents=True, exist_ok=True)
log_data = []
if log_data == []:
with open(Path(__file__).resolve().parent / f'log/{dayt}.log', 'w', encoding='utf-8'):
pass
x_log(conten, logtype)
return
x_log(conten, logtype)


if __name__ == '__main__':
XJ_Log().w_log('test')
Loading

0 comments on commit 87a5240

Please sign in to comment.