-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
116 lines (88 loc) · 3.48 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import time
from random import uniform, shuffle
import pyuseragents
import requests
from eth_account import Account
from eth_account.signers.local import LocalAccount
from loguru import logger
from web3 import Web3, types
from abi import CLAIM_ABI
from config import CLAIM_ADDRESS, RPC, SLEEP_BETWEEN_WALLETS, RANDOM_WALLETS_ORDER
BASE_URL = "https://www.zksyncpepe.com/resources/"
AMOUNTS_URL = f"{BASE_URL}amounts/"
PROOFS_URL = f"{BASE_URL}proofs/"
w3 = Web3(Web3.HTTPProvider(RPC))
claim_contract = w3.eth.contract(w3.to_checksum_address(CLAIM_ADDRESS), abi=CLAIM_ABI)
logger.add("log/debug.log")
HEADERS = {
"authority": "www.zksyncpepe.com",
"scheme": "https",
"cache-control": "no-cache",
"accept": "application/json, text/plain, */*",
"accept-encoding": "deflate, br",
"accept-language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7,uk-UA;q=0.6,uk;q=0.5",
"referer": "https://www.zksyncpepe.com/airdrop",
"sec-ch-ua": '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": pyuseragents.random(),
}
def get_amount_and_proof(address: str, session: requests.Session) -> tuple[int, list[str]] | None:
retries = 3
delay = 5
while retries:
try:
amount_response = session.get(f"{AMOUNTS_URL}{address.lower()}.json")
amount_response.raise_for_status()
amount = amount_response.json()
time.sleep(1)
proof_response = session.get(f"{PROOFS_URL}{address.lower()}.json")
proof_response.raise_for_status()
proof = proof_response.json()
return amount[0], proof
except requests.RequestException as ex:
logger.error(f"Request failed for {address}. Retrying in {delay} seconds. Error: {ex}")
time.sleep(delay)
retries -= 1
continue
def claim(wallet: LocalAccount, proof: list[str], amount: int) -> None:
tx_params: types.TxParams = {
"from": wallet.address,
"nonce": w3.eth.get_transaction_count(wallet.address),
}
tx = claim_contract.functions.claim(proof, w3.to_wei(amount, "ether")).build_transaction(tx_params)
signed_tx = wallet.sign_transaction(tx)
try:
tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
logger.success(f"Claim {amount} tokens from {wallet.address}. Tx: https://explorer.zksync.io/tx/{tx_hash.hex()}")
except Exception as ex:
logger.error(f"{wallet.address}. Error sending transaction: {ex}")
def sleep() -> None:
sleep_amount = uniform(*SLEEP_BETWEEN_WALLETS)
logger.info(f"Sleep {sleep_amount} s")
time.sleep(sleep_amount)
def main() -> None:
logger.info("Start.")
with open("wallets.txt", "r", encoding="utf8") as file:
wallets: list[LocalAccount] = [
Account.from_key(line.strip())
for line in file.read().split("\n")
if line != ""
]
if RANDOM_WALLETS_ORDER:
shuffle(wallets)
with requests.Session() as session:
session.headers.update(HEADERS)
for wallet in wallets:
res = get_amount_and_proof(wallet.address, session)
if res is None:
continue
amount, proof = res
claim(wallet, proof, amount)
sleep()
logger.info("Finish")
if __name__ == "__main__":
main()