-
Notifications
You must be signed in to change notification settings - Fork 20
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
10 changed files
with
457 additions
and
57 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
from . import info, manage, search, help, command # noqa | ||
from . import info, manage, search, help, command, statistics # noqa |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,218 @@ | ||
from datetime import datetime, timedelta | ||
from typing import Any, Optional, Union | ||
|
||
from dateutil.relativedelta import relativedelta | ||
from nonebot.matcher import Matcher | ||
from nonebot_plugin_alconna import ( | ||
Alconna, | ||
AlconnaQuery, | ||
Args, | ||
Option, | ||
Query, | ||
UniMessage, | ||
on_alconna, | ||
store_true, | ||
) | ||
from nonebot_plugin_session import EventSession, SessionIdType | ||
|
||
from ..plot import plot_duration_counts, plot_key_and_duration_counts | ||
from ..recorder import get_meme_generation_records, get_meme_generation_times | ||
from ..utils import add_timezone | ||
from .utils import find_meme | ||
|
||
statistics_matcher = on_alconna( | ||
Alconna( | ||
"表情调用统计", | ||
Args["meme_name?", str], | ||
Option("-g|--global", default=False, action=store_true, help_text="全局统计"), | ||
Option("--my", default=False, action=store_true, help_text="我的"), | ||
Option( | ||
"-t|--type", | ||
Args["type", ["day", "week", "month", "year", "24h", "7d", "30d", "1y"]], | ||
help_text="统计类型", | ||
), | ||
), | ||
aliases={"表情使用统计"}, | ||
block=True, | ||
priority=11, | ||
use_cmd_start=True, | ||
) | ||
|
||
|
||
def wrapper( | ||
slot: Union[int, str], content: Optional[str], context: dict[str, Any] | ||
) -> str: | ||
if slot == "my" and content: | ||
return "--my" | ||
elif slot == "global" and content: | ||
return "--global" | ||
elif slot == "type" and content: | ||
if content in ["日", "24小时", "1天"]: | ||
return "--type 24h" | ||
elif content in ["本日", "今日"]: | ||
return "--type day" | ||
elif content in ["周", "一周", "7天"]: | ||
return "--type 7d" | ||
elif content in ["本周"]: | ||
return "--type week" | ||
elif content in ["月", "30天"]: | ||
return "--type 30d" | ||
elif content in ["本月", "月度"]: | ||
return "--type month" | ||
elif content in ["年", "一年"]: | ||
return "--type 1y" | ||
elif content in ["本年", "年度"]: | ||
return "--type year" | ||
return "" | ||
|
||
|
||
pattern_my = r"(?P<my>我的)" | ||
pattern_type = r"(?P<type>日|24小时|1天|本日|今日|周|一周|7天|本周|月|30天|本月|月度|年|一年|本年|年度)" # noqa E501 | ||
pattern_global = r"(?P<global>全局)" | ||
pattern_cmd = r"表情(?:调用|使用)统计" | ||
|
||
statistics_matcher.shortcut( | ||
rf"{pattern_my}{pattern_cmd}", | ||
prefix=True, | ||
wrapper=wrapper, | ||
arguments=["{my}"], | ||
).shortcut( | ||
rf"{pattern_global}{pattern_cmd}", | ||
prefix=True, | ||
wrapper=wrapper, | ||
arguments=["{global}"], | ||
).shortcut( | ||
rf"{pattern_my}{pattern_global}{pattern_cmd}", | ||
prefix=True, | ||
wrapper=wrapper, | ||
arguments=["{my}", "{global}"], | ||
).shortcut( | ||
rf"{pattern_my}?{pattern_global}?{pattern_type}{pattern_cmd}", | ||
prefix=True, | ||
wrapper=wrapper, | ||
arguments=["{my}", "{global}", "{type}"], | ||
) | ||
|
||
|
||
@statistics_matcher.handle() | ||
async def _( | ||
matcher: Matcher, | ||
session: EventSession, | ||
meme_name: Optional[str] = None, | ||
query_global: Query[bool] = AlconnaQuery("global.value", False), | ||
query_my: Query[bool] = AlconnaQuery("my.value", False), | ||
query_type: Query[str] = AlconnaQuery("type", "24h"), | ||
): | ||
meme = await find_meme(matcher, meme_name) if meme_name else None | ||
|
||
is_my = query_my.result | ||
is_global = query_global.result | ||
type = query_type.result | ||
|
||
if is_my and is_global: | ||
id_type = SessionIdType.USER | ||
elif is_my: | ||
id_type = SessionIdType.GROUP_USER | ||
elif is_global: | ||
id_type = SessionIdType.GLOBAL | ||
else: | ||
id_type = SessionIdType.GROUP | ||
|
||
now = datetime.now().astimezone() | ||
if type == "24h": | ||
start = now - timedelta(days=1) | ||
td = timedelta(hours=1) | ||
fmt = "%H:%M" | ||
humanized = "24小时" | ||
elif type == "day": | ||
start = now.replace(hour=0, minute=0, second=0, microsecond=0) | ||
td = timedelta(hours=1) | ||
fmt = "%H:%M" | ||
humanized = "本日" | ||
elif type == "7d": | ||
start = now - timedelta(days=7) | ||
td = timedelta(days=1) | ||
fmt = "%m/%d" | ||
humanized = "7天" | ||
elif type == "week": | ||
start = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta( | ||
days=now.weekday() | ||
) | ||
td = timedelta(days=1) | ||
fmt = "%a" | ||
humanized = "本周" | ||
elif type == "30d": | ||
start = now - timedelta(days=30) | ||
td = timedelta(days=1) | ||
fmt = "%m/%d" | ||
humanized = "30天" | ||
elif type == "month": | ||
start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) | ||
td = timedelta(days=1) | ||
fmt = "%m/%d" | ||
humanized = "本月" | ||
elif type == "1y": | ||
start = now - relativedelta(years=1) | ||
td = relativedelta(months=1) | ||
fmt = "%y/%m" | ||
humanized = "一年" | ||
else: | ||
start = now.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0) | ||
td = relativedelta(months=1) | ||
fmt = "%b" | ||
humanized = "本年" | ||
|
||
if meme: | ||
meme_times = await get_meme_generation_times( | ||
session, id_type, meme_key=meme.key, time_start=start | ||
) | ||
meme_keys = [meme.key] * len(meme_times) | ||
else: | ||
meme_records = await get_meme_generation_records( | ||
session, id_type, time_start=start | ||
) | ||
meme_times = [record.time for record in meme_records] | ||
meme_keys = [record.meme_key for record in meme_records] | ||
|
||
if not meme_times: | ||
await matcher.finish("暂时没有表情调用记录") | ||
|
||
meme_times = [add_timezone(time) for time in meme_times] | ||
meme_times.sort() | ||
|
||
def fmt_time(time: datetime) -> str: | ||
if type in ["24h", "7d", "30d", "1y"]: | ||
return (time + td).strftime(fmt) | ||
return time.strftime(fmt) | ||
|
||
duration_counts: dict[str, int] = {} | ||
stop = start + td | ||
count = 0 | ||
key = fmt_time(start) | ||
for time in meme_times: | ||
while time >= stop: | ||
duration_counts[key] = count | ||
key = fmt_time(stop) | ||
stop += td | ||
count = 0 | ||
count += 1 | ||
duration_counts[key] = count | ||
while stop <= now: | ||
key = fmt_time(stop) | ||
stop += td | ||
duration_counts[key] = 0 | ||
|
||
key_counts: dict[str, int] = {} | ||
for key in meme_keys: | ||
key_counts[key] = key_counts.get(key, 0) + 1 | ||
|
||
if meme: | ||
title = ( | ||
f"表情“{meme.key}”{humanized}调用统计" | ||
f"(总调用次数为 {key_counts.get(meme.key, 0)})" | ||
) | ||
output = await plot_duration_counts(duration_counts, title) | ||
else: | ||
title = f"{humanized}表情调用统计" | ||
output = await plot_key_and_duration_counts(key_counts, duration_counts, title) | ||
await UniMessage.image(raw=output).send() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
from io import BytesIO | ||
|
||
import matplotlib | ||
from matplotlib import pyplot as plt | ||
from matplotlib.axes import Axes | ||
from matplotlib.font_manager import fontManager | ||
from matplotlib.ticker import MaxNLocator | ||
from nonebot.utils import run_sync | ||
|
||
matplotlib.use("agg") | ||
fallback_fonts = [ | ||
"PingFang SC", | ||
"Hiragino Sans GB", | ||
"Microsoft YaHei", | ||
"Source Han Sans SC", | ||
"Noto Sans SC", | ||
"Noto Sans CJK SC", | ||
"WenQuanYi Micro Hei", | ||
] | ||
for fontfamily in fallback_fonts.copy(): | ||
try: | ||
fontManager.findfont(fontfamily, fallback_to_default=False) | ||
except ValueError: | ||
fallback_fonts.remove(fontfamily) | ||
matplotlib.rcParams["font.family"] = fallback_fonts | ||
|
||
|
||
@run_sync | ||
def plot_key_and_duration_counts( | ||
key_counts: dict[str, int], duration_counts: dict[str, int], title: str | ||
) -> BytesIO: | ||
up_x = list(key_counts.keys()) | ||
up_y = list(key_counts.values()) | ||
low_x = list(duration_counts.keys()) | ||
low_y = list(duration_counts.values()) | ||
up_height = len(up_x) * 0.3 | ||
low_height = 3 | ||
fig_width = 8 | ||
fig, axs = plt.subplots( | ||
nrows=2, | ||
figsize=(fig_width, up_height + low_height), | ||
height_ratios=[up_height, low_height], | ||
) | ||
up: Axes = axs[0] | ||
up.barh(up_x, up_y, height=0.6) | ||
up.xaxis.set_major_locator(MaxNLocator(integer=True)) | ||
low: Axes = axs[1] | ||
low.plot(low_x, low_y, marker="o") | ||
if len(low_x) > 24: | ||
low.set_xticks(low_x[::3]) | ||
elif len(low_x) > 12: | ||
low.set_xticks(low_x[::2]) | ||
low.yaxis.set_major_locator(MaxNLocator(integer=True)) | ||
fig.suptitle(title) | ||
fig.tight_layout() | ||
output = BytesIO() | ||
fig.savefig(output) | ||
return output | ||
|
||
|
||
@run_sync | ||
def plot_duration_counts(duration_counts: dict[str, int], title: str) -> BytesIO: | ||
x = list(duration_counts.keys()) | ||
y = list(duration_counts.values()) | ||
fig, ax = plt.subplots(figsize=(6, 4)) | ||
ax.plot(x, y, marker="o") | ||
if len(x) > 24: | ||
ax.set_xticks(x[::3]) | ||
elif len(x) > 12: | ||
ax.set_xticks(x[::2]) | ||
ax.yaxis.set_major_locator(MaxNLocator(integer=True)) | ||
fig.suptitle(title) | ||
fig.tight_layout() | ||
output = BytesIO() | ||
fig.savefig(output) | ||
return output |
Oops, something went wrong.