diff --git a/arknights_mower/solvers/base_schedule.py b/arknights_mower/solvers/base_schedule.py
index 00b18787a..bddd2b367 100644
--- a/arknights_mower/solvers/base_schedule.py
+++ b/arknights_mower/solvers/base_schedule.py
@@ -2259,8 +2259,8 @@ def maa_plan_solver(self, tasks='All', one_time=False):
time.sleep(remaining_time)
def skland_plan_solover(self):
- skland = SKLand(self.skland_config['skland_info'])
- skland.attendance()
+ return SKLand(self.skland_config['skland_info']).start()
+
def recruit_plan_solver(self):
if ('last_execution' not in self.recruit_config
diff --git a/arknights_mower/solvers/report.py b/arknights_mower/solvers/report.py
index 9ac9abaeb..eca3dbbb5 100644
--- a/arknights_mower/solvers/report.py
+++ b/arknights_mower/solvers/report.py
@@ -143,6 +143,8 @@ def has_record(self):
return False
except PermissionError:
logger.info("report.csv正在被占用")
+ except pd.errors.EmptyDataError:
+ return False
def get_report_data():
diff --git a/arknights_mower/solvers/skland.py b/arknights_mower/solvers/skland.py
index 52be2b9fb..ce13fc6cb 100644
--- a/arknights_mower/solvers/skland.py
+++ b/arknights_mower/solvers/skland.py
@@ -1,207 +1,232 @@
+import hashlib
+import hmac
import json
import csv
import datetime
+import os
+import time
+from urllib import parse
+
+import pandas as pd
import requests
from arknights_mower.utils.log import logger
from arknights_mower.utils.path import get_path
+app_code = '4ca99fa6b56cc2ba'
+
+# 签到url
+sign_url = "https://zonai.skland.com/api/v1/game/attendance"
+# 绑定的角色url
+binding_url = "https://zonai.skland.com/api/v1/game/player/binding"
+# 验证码url
+login_code_url = "https://as.hypergryph.com/general/v1/send_phone_code"
+# 验证码登录
+token_phone_code_url = "https://as.hypergryph.com/user/auth/v2/token_by_phone_code"
+# 密码登录
+token_password_url = "https://as.hypergryph.com/user/auth/v1/token_by_phone_password"
+# 使用token获得认证代码
+grant_code_url = "https://as.hypergryph.com/user/oauth2/v2/grant"
+# 使用认证代码获得cred
+cred_code_url = "https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code"
+
class SKLand:
+
def __init__(self, skland_info):
- self.account = []
self.record_path = get_path("@app/tmp/skland.csv")
- try:
- for item in skland_info:
- if item["isCheck"] is False:
- continue
- if item['account'] != "" and item['password'] != "":
- self.account.append({
- "name": "",
- "phone": item['account'],
- "password": item['password'],
- "uid": "",
- "cred": ""
- })
- except:
- raise RuntimeError("森空岛信息初始化失败")
-
- self.url = {
- "get_cred": "https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code",
- "token_by_phone_password": "https://as.hypergryph.com/user/auth/v1/token_by_phone_password",
- "check_cred_url": "https://zonai.skland.com/api/v1/user/check",
- "OAuth2": "https://as.hypergryph.com/user/oauth2/v2/grant",
- "get_cred_url": "https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code",
- "attendance": "https://zonai.skland.com/api/v1/game/attendance",
- "get_binding_player": "https://zonai.skland.com/api/v1/game/player/binding"
+ self.account_list = []
+ for item in skland_info:
+ self.account_list.append({
+ 'account': item['account'],
+ 'isCheck': item['isCheck'],
+ 'password': item['password'],
+ })
+
+ self.header = {
+ 'cred': '',
+ 'User-Agent': 'Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 31; ) Okhttp/4.11.0',
+ 'Accept-Encoding': 'gzip',
+ 'Connection': 'close'
}
-
- self.request_header = {
- "user-agent": "Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 33; ) Okhttp/4.11.0",
- "cred": '',
- "vName": "1.0.1",
- "vCode": "100001014",
+ self.header_login = {
+ 'User-Agent': 'Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 31; ) Okhttp/4.11.0',
'Accept-Encoding': 'gzip',
- 'Connection': 'close',
- "dId": "de9759a5afaa634f",
- "platform": "1"
+ 'Connection': 'close'
}
- self.get_award = {}
-
- def respone_to_json(self, responese):
- try:
- responese_json = json.loads(responese.text)
- return responese_json
- except:
- raise RuntimeError("返回信息获取失败")
-
- """登录获取cred"""
- def sign_by_phone(self, account):
- data = {"phone": account['phone'], "password": account['password']}
- response = requests.post(headers=self.request_header, url=self.url.get("token_by_phone_password"), data=data)
-
- response_json = self.respone_to_json(response)
- if response_json.get("status") == 0:
- return response_json.get("data").get("token")
- else:
- raise RuntimeError("token获取失败")
-
- def check_cred(self, account):
- if account['cred'] == "":
- return False
- headers = self.request_header
- headers["cred"] = account['cred']
- response = requests.get(headers=headers, url=self.url.get("check_cred_url"))
- response_json = self.respone_to_json(response)
-
- if response_json.get("code") == 0:
- logger.debug("验证cred未过期")
- else:
- raise RuntimeError("验证cred过期")
-
- def get_binding_player(self, account):
- if account['cred'] == "":
- raise RuntimeError("获取绑定信息失败")
- headers = self.request_header
- headers["cred"] = account['cred']
-
- response = requests.get(headers=headers, url=self.url.get("get_binding_player"))
- response_json = self.respone_to_json(response)
-
- if response_json.get("code") == 0:
- logger.info("获取玩家信息成功")
- player_info = response_json.get('data').get('list')[0].get('bindingList')[0]
- account['name'] = player_info.get('nickName')
- account['uid'] = player_info.get('uid')
- else:
- raise RuntimeError("验证cred过期")
-
- def get_OAuth2_token(self, account):
- token = self.sign_by_phone(account)
- if token == "":
- raise RuntimeError("token获取失败")
-
- data = {
- "token": token,
- "appCode": "4ca99fa6b56cc2ba",
- "type": 0
+ self.reward = []
+ # 签名请求头一定要这个顺序,否则失败
+ # timestamp是必填的,其它三个随便填,不要为none即可
+ self.header_for_sign = {
+ 'platform': '',
+ 'timestamp': '',
+ 'dId': '',
+ 'vName': ''
}
- response = requests.post(headers=self.request_header, url=self.url.get("OAuth2"), data=data)
- response_json = self.respone_to_json(response)
- if response_json.get("status") == 0:
- logger.debug("OAuth2授权代码获取成功")
- return response_json.get("data").get("code")
-
- return ""
-
- def get_cred(self, account):
- code = self.get_OAuth2_token(account)
-
- if code == "":
- raise RuntimeError("OAuth2授权代码获取失败")
- data = {
- "kind": 1,
- "code": code
- }
- response = requests.post(headers=self.request_header, url=self.url.get("get_cred_url"), data=data)
- response_json = self.respone_to_json(response)
-
- if response_json.get("code") == 0:
- account['cred'] = response_json.get("data").get("cred")
-
- def attendance(self):
- for account in self.account:
- if self.get_record(account['phone']):
- logger.info(f"{account['phone']} 今日已经签到过了")
- continue
+ self.sign_token = ''
- if self.check_cred(account) is False:
- self.get_cred(account)
-
- self.get_binding_player(account)
- logger.debug(account)
- data = {
- "uid": account["uid"],
- "gameId": 1
- }
- headers = self.request_header
- headers["cred"] = account['cred']
- response = requests.post(headers=headers, url=self.url.get("attendance"), data=data)
-
- response_json = self.respone_to_json(response)
- award = []
- if response_json["code"] == 0:
-
- for item in response_json.get("data").get("awards"):
- temp_str = str(item["count"]) + str(item["resource"]["name"])
- award.append(temp_str)
- elif response_json["code"] == 10001 and response_json["message"] == "请勿重复签到!":
- logger.info(f"{account['name']} 请勿重复签到!")
- award.append("请勿重复签到!")
- self.get_award[account['phone']] = award
- self.record_attendance(account['phone'], self.get_award[account['phone']])
- if self.get_award:
- logger.info(self.get_award)
- return self.get_award
-
- def record_attendance(self, account, data):
- data_row = {"account": account,
- "data": data,
- "date": datetime.date.today().strftime("%Y-%m-%d")
+ def start(self):
+ for item in self.account_list:
+ if item['isCheck']:
+ if self.has_record(item['account']):
+ continue
+ self.save_param(self.get_cred_by_token(self.log(item)))
+ for i in self.get_binding_list():
+ body = {
+ 'gameId': 1,
+ 'uid': i.get('uid')
}
- file = open(self.record_path, 'a+', newline='')
- csv_writer = csv.DictWriter(file, fieldnames=list(data_row.keys()))
- csv_writer.writerow(data_row)
- file.close()
+ # list_awards(1, i.get('uid'))
+ resp = requests.post(sign_url, headers=self.get_sign_header(sign_url, 'post', body, self.header),
+ json=body).json()
+ if resp['code'] != 0:
+ self.reward.append(
+ {
+ "nickName": item['account'],
+ "reward": resp.get("message")
+ }
+ )
+ logger.info(
+ f'{i.get("nickName")}:{resp.get("message")}')
+ continue
+ awards = resp['data']['awards']
+ for j in awards:
+ res = j['resource']
+ self.reward.append(
+ {
+ "nickName": item['account'],
+ "reward": "{}×{}".format(res["name"], j.get("count") or 1)
+ }
+ )
+ logger.info(
+ f'{i.get("nickName")}获得了{res["name"]}×{j.get("count") or 1}')
+ if len(self.reward) > 0:
+ self.record_log()
+
+ def save_param(self, cred_resp):
+ self.header['cred'] = cred_resp['cred']
+ self.sign_token = cred_resp['token']
+
+ def log(self, account):
+ r = requests.post(token_password_url, json={"phone": account['account'], "password": account['password']},
+ headers=self.header_login).json()
+ if r.get('status') != 0:
+ raise Exception(f'获得token失败:{r["msg"]}')
+ return r['data']['token']
+
+ def get_cred_by_token(self, token):
+ return self.get_cred(self.get_grant_code(token))
+
+ def get_grant_code(self, token):
+ response = requests.post(grant_code_url, json={
+ 'appCode': app_code,
+ 'token': token,
+ 'type': 0
+ }, headers=self.header_login)
+ resp = response.json()
+ if response.status_code != 200:
+ raise Exception(f'获得认证代码失败:{resp}')
+ if resp.get('status') != 0:
+ raise Exception(f'获得认证代码失败:{resp["msg"]}')
+ return resp['data']['code']
+
+ def get_cred(self, grant):
+ resp = requests.post(cred_code_url, json={
+ 'code': grant,
+ 'kind': 1
+ }, headers=self.header_login).json()
+ if resp['code'] != 0:
+ raise Exception(f'获得cred失败:{resp["message"]}')
+ return resp['data']
+
+ def get_binding_list(self):
+ v = []
+ resp = requests.get(binding_url, headers=self.get_sign_header(binding_url, 'get', None, self.header)).json()
+
+ if resp['code'] != 0:
+ print(f"请求角色列表出现问题:{resp['message']}")
+ if resp.get('message') == '用户未登录':
+ print(f'用户登录可能失效了,请重新运行此程序!')
+ return []
+ for i in resp['data']['list']:
+ if i.get('appCode') != 'arknights':
+ continue
+ v.extend(i.get('bindingList'))
+ return v
+
+ def get_sign_header(self, url: str, method, body, old_header):
+ h = json.loads(json.dumps(old_header))
+ p = parse.urlparse(url)
+ if method.lower() == 'get':
+ h['sign'], header_ca = self.generate_signature(self.sign_token, p.path, p.query)
+ else:
+ h['sign'], header_ca = self.generate_signature(self.sign_token, p.path, json.dumps(body))
+ for i in header_ca:
+ h[i] = header_ca[i]
+ return h
+
+ def generate_signature(self, token: str, path, body_or_query):
+ """
+ 获得签名头
+ 接口地址+方法为Get请求?用query否则用body+时间戳+ 请求头的四个重要参数(dId,platform,timestamp,vName).toJSON()
+ 将此字符串做HMAC加密,算法为SHA-256,密钥token为请求cred接口会返回的一个token值
+ 再将加密后的字符串做MD5即得到sign
+ :param token: 拿cred时候的token
+ :param path: 请求路径(不包括网址)
+ :param body_or_query: 如果是GET,则是它的query。POST则为它的body
+ :return: 计算完毕的sign
+ """
+ # 总是说请勿修改设备时间,怕不是yj你的服务器有问题吧,所以这里特地-2
+
+ t = str(int(time.time()) - 2)
+ token = token.encode('utf-8')
+ header_ca = json.loads(json.dumps(self.header_for_sign))
+ header_ca['timestamp'] = t
+ header_ca_str = json.dumps(header_ca, separators=(',', ':'))
+ s = path + body_or_query + t + header_ca_str
+ hex_s = hmac.new(token, s.encode('utf-8'), hashlib.sha256).hexdigest()
+ md5 = hashlib.md5(hex_s.encode('utf-8')).hexdigest().encode('utf-8').decode('utf-8')
+ return md5, header_ca
+
+ def record_log(self):
+ date_str = datetime.datetime.now().strftime("%Y/%m/%d")
+ logger.info(f"存入{date_str}的数据{self.reward}")
+ try:
+ for item in self.reward:
+ res_df = pd.DataFrame(item, index=[date_str])
+ res_df.to_csv(self.record_path, mode='a', header=False, encoding='gbk')
+ except:
+ pass
- def get_record(self, account):
+ def has_record(self, phone: str):
try:
- with open(self.record_path, 'r+') as f:
- csv_reader = csv.reader(f)
- for line in csv_reader:
- if line:
- if line[0] == account and line[2] == str(datetime.date.today()):
- return True
- except FileNotFoundError:
- logger.info("森空岛记录查询失败")
- except FileExistsError:
- logger.info("森空岛记录查询失败")
-
- return False
+ if os.path.exists(self.record_path) is False:
+ logger.debug("无森空岛记录")
+ return False
+ df = pd.read_csv(self.record_path, header=None, encoding='gbk')
+ for item in df.iloc:
+ if item[0] == datetime.datetime.now().strftime("%Y/%m/%d"):
+ if item[1].astype(str) == phone:
+ logger.info(f"{phone}今天签到过了")
+ return True
+ return False
+ except PermissionError:
+ logger.info("skland.csv正在被占用")
+ except pd.errors.EmptyDataError:
+ return False
def test_connect(self):
res = []
- for item in self.account:
- data = {"phone": item['phone'], "password": item['password']}
- response = requests.post(headers=self.request_header, url=self.url.get("token_by_phone_password"),
- data=data)
-
- response_json = self.respone_to_json(response)
- temp_res = {
- "account": item['phone'],
- 'msg': response_json['msg']
- }
- res.append(temp_res)
-
- logger.info(res)
+ for item in self.account_list:
+ if item['isCheck']:
+ try:
+ self.save_param(self.get_cred_by_token(self.log(item)))
+ for i in self.get_binding_list():
+ if i['uid']:
+ res.append("{}连接成功".format(i['nickName']+"({})".format(i['channelName'])))
+ except:
+ res.append("{}无法连接".format(item['account']))
+
+ return res
+
diff --git a/server.py b/server.py
index 3d8395ae9..559064e05 100755
--- a/server.py
+++ b/server.py
@@ -6,6 +6,7 @@
from arknights_mower.solvers import record
from arknights_mower.solvers.report import get_report_data
+from arknights_mower.solvers.skland import SKLand
from arknights_mower.utils.conf import load_conf, save_conf, load_plan, write_plan
from arknights_mower.utils import depot
from arknights_mower.utils.log import logger
@@ -452,29 +453,4 @@ def test_serverJang_push():
@app.route("/check-skland")
@require_token
def test_skland():
- skland_info = []
- skland_info = conf["skland_info"]
-
- request_header = {
- "user-agent": "Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 33; ) Okhttp/4.11.0",
- "cred": "",
- "vName": "1.0.1",
- "vCode": "100001014",
- "Accept-Encoding": "gzip",
- "Connection": "close",
- "dId": "de9759a5afaa634f",
- "platform": "1",
- }
- res = []
- for item in skland_info:
- data = {"phone": item["account"], "password": item["password"]}
- response = requests.post(
- headers=request_header,
- url="https://as.hypergryph.com/user/auth/v1/token_by_phone_password",
- data=data,
- )
- response_json = json.loads(response.text)
- temp_res = {"account": item["account"], "msg": response_json["msg"]}
- res.append(temp_res)
-
- return res
+ return SKLand(conf["skland_info"]).test_connect()
diff --git a/ui/src/components/SKLand.vue b/ui/src/components/SKLand.vue
index 9808d4527..f24d4616d 100644
--- a/ui/src/components/SKLand.vue
+++ b/ui/src/components/SKLand.vue
@@ -6,6 +6,7 @@ import { useConfigStore } from '@/stores/config'
const store = useConfigStore()
import { storeToRefs } from 'pinia'
+import {NTag} from "naive-ui";
const { skland_enable, skland_info } = storeToRefs(store)
function add_account() {
@@ -30,6 +31,9 @@ async function test_maa() {
若账号密码正确但是连接失败,用手机登录查看是否需要人机验证