Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

森空岛登录修复 #697

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions arknights_mower/solvers/base_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,6 @@ def plan_metadata(self):
_name = dorm.name
if _name == "":
continue
if self.op_data.operators[dorm.name].operator_type != "high":
continue
# 如果是rest in full,则新增单独任务..
if (
_name in self.op_data.operators.keys()
Expand Down Expand Up @@ -592,14 +590,14 @@ def plan_metadata(self):
def should_keep(task):
if task.type != TaskTypes.RELEASE_DORM:
return True
elif len(task.plan) == 0:
return False
name = task.meta_data
for key, value in task.plan.items():
dorm_name = key
dorm_op = value
break
free_room = list(task.plan.keys())[0]
free_op = task.plan[free_room]
if (
self.op_data.operators[name].current_room != dorm_name
or dorm_op[self.op_data.operators[name].current_index] != "Free"
self.op_data.operators[name].current_room != free_room
or free_op[self.op_data.operators[name].current_index] != "Free"
):
logger.info(f"检测到{task.meta_data}不在对应位置,移除相关任务")
return False
Expand Down
150 changes: 20 additions & 130 deletions arknights_mower/solvers/skland.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,30 @@
import datetime
import hashlib
import hmac
import json
import os
import time
from urllib import parse

import pandas as pd
import requests

from arknights_mower.utils import config
from arknights_mower.utils.log import logger
from arknights_mower.utils.path import get_path

app_code = "4ca99fa6b56cc2ba"

# 签到url
sign_url = "https://zonai.skland.com/api/v1/game/attendance"
# 绑定的角色url
binding_url = "https://zonai.skland.com/api/v1/game/player/binding"
# 验证码url
login_code_url = "https://as.hypergryph.com/general/v1/send_phone_code"
# 验证码登录
token_phone_code_url = "https://as.hypergryph.com/user/auth/v2/token_by_phone_code"
# 密码登录
token_password_url = "https://as.hypergryph.com/user/auth/v1/token_by_phone_password"
# 使用token获得认证代码
grant_code_url = "https://as.hypergryph.com/user/oauth2/v2/grant"
# 使用认证代码获得cred
cred_code_url = "https://zonai.skland.com/api/v1/user/auth/generate_cred_by_code"
from arknights_mower.utils.skland import (
get_binding_list,
get_cred_by_token,
get_sign_header,
header,
header_login,
log,
sign_url,
token_password_url,
)


class SKLand:
def __init__(self):
self.record_path = get_path("@app/tmp/skland.csv")

self.header = {
"cred": "",
"User-Agent": "Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 31; ) Okhttp/4.11.0",
"Accept-Encoding": "gzip",
"Connection": "close",
}
self.header_login = {
"User-Agent": "Skland/1.0.1 (com.hypergryph.skland; build:100001014; Android 31; ) Okhttp/4.11.0",
"Accept-Encoding": "gzip",
"Connection": "close",
}

self.reward = []
# 签名请求头一定要这个顺序,否则失败
# timestamp是必填的,其它三个随便填,不要为none即可
self.header_for_sign = {"platform": "", "timestamp": "", "dId": "", "vName": ""}

self.sign_token = ""
self.all_recorded = True

Expand All @@ -59,13 +33,14 @@ def start(self):
if self.has_record(item.account):
continue
self.all_recorded = False
self.save_param(self.get_cred_by_token(self.log(item)))
for i in self.get_binding_list():
self.save_param(get_cred_by_token(log(item)))
for i in get_binding_list(self.sign_token):
body = {"gameId": 1, "uid": i.get("uid")}
# list_awards(1, i.get('uid'))
resp = requests.post(
sign_url,
headers=self.get_sign_header(sign_url, "post", body, self.header),
headers=get_sign_header(
sign_url, "post", body, self.sign_token, header
),
json=body,
).json()
if resp["code"] != 0:
Expand Down Expand Up @@ -93,104 +68,19 @@ def start(self):
return False

def save_param(self, cred_resp):
self.header["cred"] = cred_resp["cred"]
header["cred"] = cred_resp["cred"]
self.sign_token = cred_resp["token"]

def log(self, account):
r = requests.post(
token_password_url,
json={"phone": account.account, "password": account.password},
headers=self.header_login,
headers=header_login,
).json()
if r.get("status") != 0:
raise Exception(f'获得token失败:{r["msg"]}')
return r["data"]["token"]

def get_cred_by_token(self, token):
return self.get_cred(self.get_grant_code(token))

def get_grant_code(self, token):
response = requests.post(
grant_code_url,
json={"appCode": app_code, "token": token, "type": 0},
headers=self.header_login,
)
resp = response.json()
if response.status_code != 200:
raise Exception(f"获得认证代码失败:{resp}")
if resp.get("status") != 0:
raise Exception(f'获得认证代码失败:{resp["msg"]}')
return resp["data"]["code"]

def get_cred(self, grant):
resp = requests.post(
cred_code_url, json={"code": grant, "kind": 1}, headers=self.header_login
).json()
if resp["code"] != 0:
raise Exception(f'获得cred失败:{resp["message"]}')
return resp["data"]

def get_binding_list(self):
v = []
resp = requests.get(
binding_url,
headers=self.get_sign_header(binding_url, "get", None, self.header),
).json()

if resp["code"] != 0:
print(f"请求角色列表出现问题:{resp['message']}")
if resp.get("message") == "用户未登录":
print("用户登录可能失效了,请重新运行此程序!")
return []
for i in resp["data"]["list"]:
if i.get("appCode") != "arknights":
continue
v.extend(i.get("bindingList"))
return v

def get_sign_header(self, url: str, method, body, old_header):
h = json.loads(json.dumps(old_header))
p = parse.urlparse(url)
if method.lower() == "get":
h["sign"], header_ca = self.generate_signature(
self.sign_token, p.path, p.query
)
else:
h["sign"], header_ca = self.generate_signature(
self.sign_token, p.path, json.dumps(body)
)
for i in header_ca:
h[i] = header_ca[i]
return h

def generate_signature(self, token: str, path, body_or_query):
"""
获得签名头
接口地址+方法为Get请求?用query否则用body+时间戳+ 请求头的四个重要参数(dId,platform,timestamp,vName).toJSON()
将此字符串做HMAC加密,算法为SHA-256,密钥token为请求cred接口会返回的一个token值
再将加密后的字符串做MD5即得到sign
:param token: 拿cred时候的token
:param path: 请求路径(不包括网址)
:param body_or_query: 如果是GET,则是它的query。POST则为它的body
:return: 计算完毕的sign
"""
# 总是说请勿修改设备时间,怕不是yj你的服务器有问题吧,所以这里特地-2

t = str(int(time.time()) - 2)
token = token.encode("utf-8")
header_ca = json.loads(json.dumps(self.header_for_sign))
header_ca["timestamp"] = t
header_ca_str = json.dumps(header_ca, separators=(",", ":"))
s = path + body_or_query + t + header_ca_str
hex_s = hmac.new(token, s.encode("utf-8"), hashlib.sha256).hexdigest()
md5 = (
hashlib.md5(hex_s.encode("utf-8"))
.hexdigest()
.encode("utf-8")
.decode("utf-8")
)
return md5, header_ca

def record_log(self):
date_str = datetime.datetime.now().strftime("%Y/%m/%d")
logger.info(f"存入{date_str}的数据{self.reward}")
Expand Down Expand Up @@ -227,8 +117,8 @@ def test_connect(self):
for item in config.conf.skland_info:
if item.isCheck:
try:
self.save_param(self.get_cred_by_token(self.log(item)))
for i in self.get_binding_list():
self.save_param(get_cred_by_token(log(item)))
for i in get_binding_list(self.sign_token):
if i["uid"]:
res.append(
"{}连接成功".format(
Expand Down
Loading
Loading