forked from LibraHp/GetQzonehistory
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fetch_all_message.py
213 lines (184 loc) · 7.51 KB
/
fetch_all_message.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import json
import math
import os
import re
import sys
import time
import requests
from util import LoginUtil
WORKDIR = "./resource/fetch-all/"
MESSAGE_SAMPLE = 'msg-one.json'
MESSAGE_ALL = 'msg-all.json'
cookies = None
# 获取所有可见的未删除的说说+高清图片(包含2014年之前)
def get_visible_msg_list():
global cookies
if cookies is None:
cookies = LoginUtil.cookie()
# 1. 获取说说总条数
try:
msgSample = read_txt_file(MESSAGE_SAMPLE)
except FileNotFoundError as e:
# 样本缓存未找到,开始请求获取样本
qqResponse = get_msg_list(1)
# 创建缓存文件并写入
write_txt_file(MESSAGE_SAMPLE, qqResponse)
msgSample = read_txt_file(MESSAGE_SAMPLE)
try:
json_dict = json.loads(msgSample)
totalCount = json_dict['total']
print(f'你的未删除说说总条数{totalCount}')
except json.JSONDecodeError as e:
print(f"JSON解析错误: {e}")
sys.exit(1)
# 2. 获取所有说说数据
try:
msgAll = read_txt_file(MESSAGE_ALL)
except FileNotFoundError as e:
# 缓存未找到,准备分页获取所有未删除说说"
# 一页20条
defaultPageSize = 30
# 总页数
totalPageNum = math.ceil(totalCount / defaultPageSize)
# 用于存储所有页的数据
allPageData = []
print(f"一共{totalPageNum}页")
for currentPageNum in range(0, totalPageNum):
# 数据偏移量
pos = currentPageNum * defaultPageSize
print(
f"一页{defaultPageSize}条, 获取第{currentPageNum + 1}页")
qqResponse = get_msg_list(defaultPageSize, pos)
currentPageData = json.loads(qqResponse)["msglist"]
allPageData.extend(currentPageData)
msgAll = json.dumps({"msglist": allPageData}, ensure_ascii=False, indent=2)
write_txt_file(MESSAGE_ALL, msgAll)
try:
json_dict = json.loads(msgAll)
msgList = json_dict['msglist']
print(f'已获取到数据的说说总条数{len(msgList)}')
except json.JSONDecodeError as e:
print(f"JSON解析错误: {e}")
sys.exit(1)
# 3. 解析原始JSON写成Markdown
markdown_content = ''
for item in msgList:
myWord = item['content'] if item['content'] else ""
myCurrentQQName = item['name']
myCreateTime = format_timestamp(item['created_time'])
myCurrentSourceName = '\n来自 ' + item['source_name'] if item['source_name'] else ""
# 如果有图片
markdown_pictures = ""
if 'pic' in item:
for index, myPic in enumerate(item['pic']):
myPicUrl = myPic['url1']
myPicFileName = f"{item['tid']}{index}.jpeg"
get_image(myPicUrl, myPicFileName)
markdown_pictures += f"![{myPicFileName}](./{myPicFileName})"
markdown_content += f"## {myCurrentQQName} {myCreateTime} \n{myWord} {markdown_pictures} \n{myCurrentSourceName}"
# 有转发的内容
if 'rt_tid' in item:
rt_tid = item['rt_tid']
rtContent = item['rt_con']['content']
rtQQName = item['rt_uinname']
rt_uin = item['rt_uin']
markdown_content += f"\n> {rtQQName} - {rt_uin} : {rtContent}"
# 有人评论
if 'commentlist' in item:
markdown_content += f"\n💬 **{len(item['commentlist'])}条评论回复**\n"
for index, commentToMe in enumerate(item['commentlist']):
commentContent = commentToMe['content']
commentCreateTime = commentToMe['createTime2']
commentQQName = commentToMe['name']
commentQQNumber = commentToMe['uin']
markdown_content += f"- {commentQQName}({commentQQNumber}) : {commentContent} - {commentCreateTime}\n"
# append write
markdown_content += "\n\n"
# write markdown to file
write_txt_file("所有可见说说.md", markdown_content)
def get_msg_list(pageSize, offset=0):
url = 'https://user.qzone.qq.com/proxy/domain/taotao.qq.com/cgi-bin/emotion_cgi_msglist_v6'
g_tk = LoginUtil.bkn(cookies.get('p_skey'))
qqNumber = re.sub(r'o0*', '', cookies.get('uin'))
skey = cookies.get('skey')
p_uin = cookies.get('p_uin')
pt4_token = cookies.get('pt4_token')
p_skey = cookies.get('p_skey')
headers = {
'accept': '*/*',
'accept-language': 'en-US,en;q=0.9',
'cookie': f'uin={p_uin};skey={skey};p_uin={p_uin};pt4_token={pt4_token};p_skey={p_skey}',
'priority': 'u=1, i',
'referer': f'https://user.qzone.qq.com/{qqNumber}/main',
'sec-ch-ua': '"Not;A=Brand";v="24", "Chromium";v="128"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Linux"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
}
params = {
'uin': f'{qqNumber}',
'ftype': '0',
'sort': '0',
'pos': f'{offset}',
'num': f'{pageSize}',
'replynum': '100',
'g_tk': f'{g_tk}',
'callback': '_preloadCallback',
'code_version': '1',
'format': 'jsonp',
'need_private_comment': '1'
}
try:
response = requests.get(url, headers=headers, params=params)
except Exception as e:
print(e)
rawResponse = response.text
# 使用正则表达式去掉 _preloadCallback(),并提取其中的 JSON 数据
raw_txt = re.sub(r'^_preloadCallback\((.*)\);?$', r'\1', rawResponse, flags=re.S)
# 再转一次是为了去掉响应值本身自带的转义符http:\/\/
json_dict = json.loads(raw_txt)
if json_dict['code'] != 0:
print(f"错误 {json_dict['message']}")
sys.exit(1)
return json.dumps(json_dict, indent=2, ensure_ascii=False)
def write_txt_file(file_name, data):
if not os.path.exists(WORKDIR):
os.makedirs(WORKDIR)
base_path_file_name = os.path.join(WORKDIR, file_name)
with open(base_path_file_name, 'w', encoding='utf-8') as file:
file.write(data)
def read_txt_file(file_name):
base_path_file_name = os.path.join(WORKDIR, file_name)
if os.path.exists(base_path_file_name):
with open(base_path_file_name, 'r', encoding='utf-8') as file:
return file.read()
else:
raise FileNotFoundError(f"文件 {base_path_file_name} 不存在")
def format_timestamp(timestamp):
time_struct = time.localtime(timestamp)
formatted_time = time.strftime("%Y年%m月%d日 %H:%M:%S", time_struct)
return formatted_time
def get_image(url, img_name):
headers = {
'sec-ch-ua': '"Not;A=Brand";v="24", "Chromium";v="128"',
'Referer': 'https://user.qzone.qq.com/',
'sec-ch-ua-mobile': '?0',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
'sec-ch-ua-platform': '"Linux"',
}
# 发起GET请求
response = requests.get(url, headers=headers)
# 检查请求是否成功
if response.status_code == 200:
# 保存图片到本地
file_path = os.path.join(WORKDIR, img_name)
with open(file_path, 'wb') as file:
file.write(response.content)
print('图片下载成功')
else:
print(f'请求失败,状态码:{response.status_code}')
if __name__ == '__main__':
get_visible_msg_list()