From 3991c86861303a2dff587231a4d210bedc8a798f Mon Sep 17 00:00:00 2001 From: Pooya Fekri Date: Tue, 19 Dec 2023 10:19:12 +0330 Subject: [PATCH] Mv tasks.py logic to celery_tasks.py --- core/utils.py | 21 ++ faucet/celery_tasks.py | 297 +++++++++++++++++++++ faucet/faucet_manager/fund_manager.py | 52 +++- faucet/tasks.py | 365 +++----------------------- requirements.txt | 3 + 5 files changed, 397 insertions(+), 341 deletions(-) create mode 100644 faucet/celery_tasks.py diff --git a/core/utils.py b/core/utils.py index 6a31543a..a51f9c07 100644 --- a/core/utils.py +++ b/core/utils.py @@ -1,6 +1,9 @@ import datetime +import time +from contextlib import contextmanager import pytz +from django.core.cache import cache from web3 import Web3 from web3.contract.contract import Contract, ContractFunction from web3.logs import DISCARD, IGNORE, STRICT, WARN @@ -8,6 +11,24 @@ from web3.types import TxParams, Type +@contextmanager +def memcache_lock(lock_id, oid, lock_expire=60): + timeout_at = time.monotonic() + lock_expire + # cache.add fails if the key already exists + status = cache.add(lock_id, oid, lock_expire) + try: + yield status + finally: + # memcache delete is very slow, but we have to use it to take + # advantage of using add() for atomic locking + if time.monotonic() < timeout_at and status: + # don't release the lock if we exceeded the timeout + # to lessen the chance of releasing an expired lock + # owned by someone else + # also don't release the lock if we didn't acquire it + cache.delete(lock_id) + + class TimeUtils: # @staticmethod # def get_last_monday(): diff --git a/faucet/celery_tasks.py b/faucet/celery_tasks.py new file mode 100644 index 00000000..3f52427e --- /dev/null +++ b/faucet/celery_tasks.py @@ -0,0 +1,297 @@ +import decimal +import logging + +import requests +import web3.exceptions +from django.db import transaction +from django.db.models import F, Func +from django.utils import timezone +from sentry_sdk import capture_exception + +from authentication.models import NetworkTypes, Wallet +from core.models import TokenPrice +from core.utils import Web3Utils +from tokenTap.models import TokenDistributionClaim + +from .faucet_manager.fund_manager import FundMangerException, get_fund_manager +from .models import Chain, ClaimReceipt, DonationReceipt, TransactionBatch + + +def has_pending_batch(chain): + return TransactionBatch.objects.filter( + chain=chain, _status=ClaimReceipt.PENDING + ).exists() + + +class CeleryTasks: + @staticmethod + def process_batch(batch_pk): + """ + Process a batch of claims and send the funds to the users + creates an on-chain transaction + """ + + try: + logging.info(f"Processing Batch {batch_pk}") + + batch = TransactionBatch.objects.get(pk=batch_pk) + if not batch.should_be_processed: + return + if batch.is_expired: + batch._status = ClaimReceipt.REJECTED + batch.save() + batch.claims.update(_status=batch._status) + return + + data = [ + { + "to": receipt.passive_address + if receipt.passive_address is not None + else Wallet.objects.get( + user_profile=receipt.user_profile, + wallet_type=batch.chain.chain_type, + ).address, + "amount": int(receipt.amount), + } + for receipt in batch.claims.all() + ] + ##### + logging.info(data) + + try: + manager = get_fund_manager(batch.chain) + tx_hash = manager.multi_transfer(data) + batch.tx_hash = tx_hash + batch.save() + except FundMangerException.GasPriceTooHigh as e: + logging.exception(e) + except FundMangerException.RPCError as e: + logging.exception(e) + except Exception as e: + capture_exception() + logging.exception(str(e)) + except TransactionBatch.DoesNotExist: + pass + + @staticmethod + def update_pending_batch_with_tx_hash(batch_pk): + # only one ongoing update per batch + logging.info("Updating Batch") + try: + batch = TransactionBatch.objects.get(pk=batch_pk) + except TransactionBatch.DoesNotExist: + return + try: + if not batch.status_should_be_updated: + return + manager = get_fund_manager(batch.chain) + + if manager.is_tx_verified(batch.tx_hash): + batch._status = ClaimReceipt.VERIFIED + elif batch.is_expired: + batch._status = ClaimReceipt.REJECTED + except Exception as e: + if batch.is_expired: + batch._status = ClaimReceipt.REJECTED + capture_exception() + logging.exception(str(e)) + finally: + batch.save() + batch.claims.update(_status=batch._status) + + @staticmethod + def reject_expired_pending_claims(): + ClaimReceipt.objects.filter( + batch=None, + _status=ClaimReceipt.PENDING, + datetime__lte=timezone.now() + - timezone.timedelta(minutes=ClaimReceipt.MAX_PENDING_DURATION), + ).update(_status=ClaimReceipt.REJECTED) + + @staticmethod + def process_chain_pending_claims(chain_id): + with transaction.atomic(): + chain = Chain.objects.select_for_update().get( + pk=chain_id + ) # lock based on chain + + # all pending batches must be resolved before new transactions can be made + if has_pending_batch(chain): + return + + # get all pending receipts for this chain + # pending receipts are receipts that have not been batched yet + receipts = ClaimReceipt.objects.filter( + chain=chain, _status=ClaimReceipt.PENDING, batch=None + ) + + if receipts.count() == 0: + return + + if chain.chain_type == NetworkTypes.LIGHTNING: + receipts = receipts.order_by("pk")[:1] + else: + receipts = receipts.order_by("pk")[:32] + + # if there are no pending batches, create a new batch + batch = TransactionBatch.objects.create(chain=chain) + + # assign the batch to the receipts + for receipt in receipts: + receipt.batch = batch + receipt.save() + + @staticmethod + def update_needs_funding_status_chain(chain_id): + try: + chain = Chain.objects.get(pk=chain_id) + # if has enough funds and enough fees, needs_funding is False + + chain.needs_funding = True + + if chain.has_enough_funds and chain.has_enough_fees: + chain.needs_funding = False + + chain.save() + except Exception as e: + logging.exception(str(e)) + capture_exception() + + @staticmethod + def process_verified_lightning_claim(gas_tap_claim_id): + try: + claim = ClaimReceipt.objects.get(pk=gas_tap_claim_id) + user_profile = claim.user_profile + tokentap_lightning_claim = ( + TokenDistributionClaim.objects.filter( + user_profile=user_profile, + token_distribution__chain__chain_type=NetworkTypes.LIGHTNING, + ) + .order_by("-created_at") + .first() + ) + + if not tokentap_lightning_claim: + logging.info("No tokentap claim found for user") + return + + tokentap_lightning_claim.status = ClaimReceipt.VERIFIED + tokentap_lightning_claim.tx_hash = claim.tx_hash + tokentap_lightning_claim.save() + + claim._status = ClaimReceipt.PROCESSED_FOR_TOKENTAP + claim.save() + + except Exception as e: + capture_exception() + logging.exception(f"error in processing lightning claims: {str(e)}") + + @staticmethod + def process_rejected_lightning_claim(gas_tap_claim_id): + try: + claim = ClaimReceipt.objects.get(pk=gas_tap_claim_id) + user_profile = claim.user_profile + tokentap_lightning_claim = ( + TokenDistributionClaim.objects.filter( + user_profile=user_profile, + token_distribution__chain__chain_type=NetworkTypes.LIGHTNING, + ) + .order_by("-created_at") + .first() + ) + + if not tokentap_lightning_claim: + logging.info("No tokentap claim found for user") + return + + tokentap_lightning_claim.delete() + + claim._status = ClaimReceipt.PROCESSED_FOR_TOKENTAP_REJECT + claim.save() + + except Exception as e: + capture_exception() + logging.exception(f"error in processing lightning claims: {str(e)}") + + @staticmethod + def update_token_price(token_pk): + with transaction.atomic(): + try: + token = TokenPrice.objects.select_for_update().get(pk=token_pk) + except TokenPrice.DoesNotExist: + logging.error(f"TokenPrice with pk {token_pk} does not exist.") + return + + def parse_request(token: TokenPrice, request_res: requests.Response): + try: + request_res.raise_for_status() + json_data = request_res.json() + token.usd_price = json_data["data"]["rates"]["USD"] + token.save() + except requests.HTTPError as e: + logging.exception( + f"requests for url: {request_res.url} can not fetched" + f" with status_code: {request_res.status_code}. " + f"{str(e)}" + ) + + except KeyError as e: + logging.exception( + f"requests for url: {request_res.url} data do not have" + f" property keys for loading data. {str(e)}" + ) + + except Exception as e: + logging.exception( + f"requests for url: {request_res.url} got error " + f"{type(e).__name__}. {str(e)}" + ) + + parse_request(token, requests.get(token.price_url, timeout=5)) + + @staticmethod + def process_donation_receipt(donation_receipt_pk): + donation_receipt = DonationReceipt.objects.get(pk=donation_receipt_pk) + evm_fund_manager = get_fund_manager(donation_receipt.chain) + try: + if not evm_fund_manager.is_tx_verified(donation_receipt.tx_hash): + donation_receipt.delete() + return + user = donation_receipt.user_profile + tx = evm_fund_manager.get_tx(donation_receipt.tx_hash) + if tx.get("from").lower() not in user.wallets.annotate( + lower_address=Func(F("address"), function="LOWER") + ).values_list("lower_address", flat=True): + donation_receipt.delete() + return + if ( + Web3Utils.to_checksum_address(tx.get("to")) + != evm_fund_manager.get_fund_manager_checksum_address() + ): + donation_receipt.delete() + return + donation_receipt.value = str(evm_fund_manager.from_wei(tx.get("value"))) + if not donation_receipt.chain.is_testnet: + try: + token_price = TokenPrice.objects.get( + symbol=donation_receipt.chain.symbol + ) + donation_receipt.total_price = str( + decimal.Decimal(donation_receipt.value) + * decimal.Decimal(token_price.usd_price) + ) + except TokenPrice.DoesNotExist: + logging.error( + f"TokenPrice for Chain: {donation_receipt.chain.chain_name}" + f" did not defined" + ) + donation_receipt.status = ClaimReceipt.REJECTED + donation_receipt.save() + return + else: + donation_receipt.total_price = str(0) + donation_receipt.status = ClaimReceipt.VERIFIED + donation_receipt.save() + except (web3.exceptions.TransactionNotFound, web3.exceptions.TimeExhausted): + donation_receipt.delete() + return diff --git a/faucet/faucet_manager/fund_manager.py b/faucet/faucet_manager/fund_manager.py index dc761336..03ca52cf 100644 --- a/faucet/faucet_manager/fund_manager.py +++ b/faucet/faucet_manager/fund_manager.py @@ -12,6 +12,7 @@ from solders.signature import Signature from solders.transaction_status import TransactionConfirmationStatus +from authentication.models import NetworkTypes from core.utils import Web3Utils from faucet.constants import MEMCACHE_LIGHTNING_LOCK_KEY from faucet.faucet_manager.fund_manager_abi import manager_abi @@ -32,12 +33,29 @@ class RPCError(Exception): pass +def get_fund_manager(chain: Chain): + if chain.chain_type == NetworkTypes.SOLANA: + manager_cls = SolanaFundManager + elif chain.chain_type == NetworkTypes.LIGHTNING: + manager_cls = LightningFundManager + elif ( + chain.chain_type == NetworkTypes.EVM + or chain.chain_type == NetworkTypes.NONEVMXDC + ): + manager_cls = EVMFundManager + else: + raise Exception(f"Invalid chain type {chain.chain_type}") + return manager_cls(chain) + + class EVMFundManager: def __init__(self, chain: Chain): self.chain = chain self.web3_utils = Web3Utils(self.chain.rpc_url_private) self.web3_utils.set_account(self.chain.wallet.main_key) - self.web3_utils.set_contract(self.get_fund_manager_checksum_address(), abi=manager_abi) + self.web3_utils.set_contract( + self.get_fund_manager_checksum_address(), abi=manager_abi + ) def get_gas_price(self): return self.web3_utils.get_gas_price() @@ -85,7 +103,9 @@ def prepare_tx_for_broadcast(self, tx_function_str, *args): tx_params = { "gas": gas_estimation, - "gasPrice": int(self.web3_utils.get_gas_price() * self.chain.gas_multiplier), + "gasPrice": int( + self.web3_utils.get_gas_price() * self.chain.gas_multiplier + ), } signed_tx = self.web3_utils.build_contract_txn(tx_function, **tx_params) @@ -119,7 +139,9 @@ def w3(self) -> Client: return _w3 except Exception as e: logging.error(e) - raise FundMangerException.RPCError(f"Could not connect to rpc {self.chain.rpc_url_private}") + raise FundMangerException.RPCError( + f"Could not connect to rpc {self.chain.rpc_url_private}" + ) @property def account(self) -> Keypair: @@ -135,7 +157,9 @@ def lock_account_seed(self) -> bytes: @property def lock_account_address(self) -> Pubkey: - lock_account_address, nonce = Pubkey.find_program_address([self.lock_account_seed], self.program_id) + lock_account_address, nonce = Pubkey.find_program_address( + [self.lock_account_seed], self.program_id + ) return lock_account_address @property @@ -209,17 +233,23 @@ def multi_transfer(self, data): def is_tx_verified(self, tx_hash): try: confirmation_status = ( - self.w3.get_signature_statuses([Signature.from_string(tx_hash)]).value[0].confirmation_status + self.w3.get_signature_statuses([Signature.from_string(tx_hash)]) + .value[0] + .confirmation_status ) return confirmation_status in [ TransactionConfirmationStatus.Confirmed, TransactionConfirmationStatus.Finalized, ] except RPCException: - logging.warning("Solana raised the RPCException at get_signature_statuses()") + logging.warning( + "Solana raised the RPCException at get_signature_statuses()" + ) return False except RPCNoResultException: - logging.warning("Solana raised the RPCNoResultException at get_signature_statuses()") + logging.warning( + "Solana raised the RPCNoResultException at get_signature_statuses()" + ) return False except Exception: raise @@ -241,7 +271,9 @@ def api_key(self): @property def lnpay_client(self): - return LNPayClient(self.chain.rpc_url_private, self.api_key, self.chain.fund_manager_address) + return LNPayClient( + self.chain.rpc_url_private, self.api_key, self.chain.fund_manager_address + ) def __check_max_cap_exceeds(self, amount) -> bool: try: @@ -263,7 +295,9 @@ def multi_transfer(self, data): assert acquired, "Could not acquire Lightning multi-transfer lock" item = data[0] - assert not self.__check_max_cap_exceeds(item["amount"]), "Lightning periodic max cap exceeded" + assert not self.__check_max_cap_exceeds( + item["amount"] + ), "Lightning periodic max cap exceeded" try: pay_result = client.pay_invoice(item["to"]) diff --git a/faucet/tasks.py b/faucet/tasks.py index 04f29d1c..2d3e1eb6 100644 --- a/faucet/tasks.py +++ b/faucet/tasks.py @@ -1,54 +1,14 @@ -import decimal import logging -import time -from contextlib import contextmanager -import requests -import web3.exceptions from celery import shared_task from django.conf import settings as django_settings from django.core.cache import cache -from django.db import transaction -from django.db.models import F, Func -from django.utils import timezone -from sentry_sdk import capture_exception -from authentication.models import Wallet from core.models import NetworkTypes, TokenPrice -from core.utils import Web3Utils -from tokenTap.models import TokenDistributionClaim - -from .faucet_manager.fund_manager import ( - EVMFundManager, - FundMangerException, - LightningFundManager, - SolanaFundManager, -) -from .models import Chain, ClaimReceipt, DonationReceipt, TransactionBatch - - -@contextmanager -def memcache_lock(lock_id, oid, lock_expire=60): - timeout_at = time.monotonic() + lock_expire - # cache.add fails if the key already exists - status = cache.add(lock_id, oid, lock_expire) - try: - yield status - finally: - # memcache delete is very slow, but we have to use it to take - # advantage of using add() for atomic locking - if time.monotonic() < timeout_at and status: - # don't release the lock if we exceeded the timeout - # to lessen the chance of releasing an expired lock - # owned by someone else - # also don't release the lock if we didn't acquire it - cache.delete(lock_id) +from core.utils import memcache_lock - -def has_pending_batch(chain): - return TransactionBatch.objects.filter( - chain=chain, _status=ClaimReceipt.PENDING - ).exists() +from .celery_tasks import CeleryTasks +from .models import Chain, ClaimReceipt, DonationReceipt, TransactionBatch def passive_address_is_not_none(address): @@ -59,74 +19,13 @@ def passive_address_is_not_none(address): @shared_task(bind=True) def process_batch(self, batch_pk): - """ - Process a batch of claims and send the funds to the users - creates an on-chain transaction - """ - - id = f"{self.name}-LOCK-{batch_pk}" - - try: - with memcache_lock(id, self.app.oid) as acquired: - if not acquired: - print("Could not acquire process lock") - return - - print(f"Processing Batch {batch_pk}") - - batch = TransactionBatch.objects.get(pk=batch_pk) - - if batch.should_be_processed: - if batch.is_expired: - batch._status = ClaimReceipt.REJECTED - batch.save() - batch.claims.update(_status=batch._status) - return - - data = [ - { - "to": receipt.passive_address - if receipt.passive_address is not None - else Wallet.objects.get( - user_profile=receipt.user_profile, - wallet_type=batch.chain.chain_type, - ).address, - "amount": int(receipt.amount), - } - for receipt in batch.claims.all() - ] - ##### - print(data) - - try: - if batch.chain.chain_type == NetworkTypes.SOLANA: - manager = SolanaFundManager(batch.chain) - elif batch.chain.chain_type == NetworkTypes.LIGHTNING: - manager = LightningFundManager(batch.chain) - elif ( - batch.chain.chain_type == NetworkTypes.EVM - or batch.chain.chain_type == NetworkTypes.NONEVMXDC - ): - manager = EVMFundManager(batch.chain) - else: - raise Exception( - "Invalid chain type to process batch, chain type " - f"{batch.chain.chain_type}" - ) - tx_hash = manager.multi_transfer(data) - batch.tx_hash = tx_hash - batch.save() - except FundMangerException.GasPriceTooHigh as e: - logging.error(e) - except FundMangerException.RPCError as e: - logging.error(e) - except Exception as e: - capture_exception() - print(str(e)) - - cache.delete(id) - except TransactionBatch.DoesNotExist: - pass + id_ = f"{self.name}-LOCK-{batch_pk}" + with memcache_lock(id_, self.app.oid) as acquired: + if not acquired: + logging.info("Could not acquire process lock") + return + CeleryTasks.process_batch(batch_pk) + cache.delete(id_) @shared_task @@ -142,57 +41,21 @@ def process_pending_batches(): def update_pending_batch_with_tx_hash(self, batch_pk): # only one ongoing update per batch - id = f"{self.name}-LOCK-{batch_pk}" + id_ = f"{self.name}-LOCK-{batch_pk}" - with memcache_lock(id, self.app.oid) as acquired: + with memcache_lock(id_, self.app.oid) as acquired: if not acquired: - print("Could not acquire update lock") + logging.info("Could not acquire update lock") return - print("Updating Batch") - - batch = TransactionBatch.objects.get(pk=batch_pk) - try: - if batch.status_should_be_updated: - if batch.chain.chain_type == NetworkTypes.SOLANA: - manager = SolanaFundManager(batch.chain) - elif batch.chain.chain_type == NetworkTypes.LIGHTNING: - manager = LightningFundManager(batch.chain) - elif ( - batch.chain.chain_type == NetworkTypes.EVM - or batch.chain.chain_type == NetworkTypes.NONEVMXDC - ): - manager = EVMFundManager(batch.chain) - else: - raise Exception( - "Invalid chain type to update pending batch, chain type " - f"{batch.chain.chain_type}" - ) - - if manager.is_tx_verified(batch.tx_hash): - batch._status = ClaimReceipt.VERIFIED - elif batch.is_expired: - batch._status = ClaimReceipt.REJECTED - except Exception as e: - if batch.is_expired: - batch._status = ClaimReceipt.REJECTED - capture_exception() - print(str(e)) - finally: - batch.save() - batch.claims.update(_status=batch._status) - - cache.delete(id) + CeleryTasks.update_pending_batch_with_tx_hash(batch_pk) + + cache.delete(id_) @shared_task def reject_expired_pending_claims(): - ClaimReceipt.objects.filter( - batch=None, - _status=ClaimReceipt.PENDING, - datetime__lte=timezone.now() - - timezone.timedelta(minutes=ClaimReceipt.MAX_PENDING_DURATION), - ).update(_status=ClaimReceipt.REJECTED) + CeleryTasks.reject_expired_pending_claims() @shared_task @@ -208,36 +71,7 @@ def update_pending_batches_with_tx_hash_status(): @shared_task def process_chain_pending_claims(chain_id): # locks chain - with transaction.atomic(): - chain = Chain.objects.select_for_update().get( - pk=chain_id - ) # lock based on chain - - # all pending batches must be resolved before new transactions can be made - if has_pending_batch(chain): - return - - # get all pending receipts for this chain - # pending receipts are receipts that have not been batched yet - receipts = ClaimReceipt.objects.filter( - chain=chain, _status=ClaimReceipt.PENDING, batch=None - ) - - if receipts.count() == 0: - return - - if chain.chain_type == NetworkTypes.LIGHTNING: - receipts = receipts.order_by("pk")[:1] - else: - receipts = receipts.order_by("pk")[:32] - - # if there are no pending batches, create a new batch - batch = TransactionBatch.objects.create(chain=chain) - - # assign the batch to the receipts - for receipt in receipts: - receipt.batch = batch - receipt.save() + CeleryTasks.process_chain_pending_claims(chain_id) @shared_task @@ -249,18 +83,7 @@ def process_pending_claims(): # periodic task @shared_task def update_needs_funding_status_chain(chain_id): - try: - chain = Chain.objects.get(pk=chain_id) - # if has enough funds and enough fees, needs_funding is False - - chain.needs_funding = True - - if chain.has_enough_funds and chain.has_enough_fees: - chain.needs_funding = False - - chain.save() - except Exception: - capture_exception() + CeleryTasks.update_needs_funding_status_chain(chain_id) @shared_task @@ -272,60 +95,12 @@ def update_needs_funding_status(): # periodic task @shared_task def process_verified_lighning_claim(gas_tap_claim_id): - try: - claim = ClaimReceipt.objects.get(pk=gas_tap_claim_id) - user_profile = claim.user_profile - tokentap_lightning_claim = ( - TokenDistributionClaim.objects.filter( - user_profile=user_profile, - token_distribution__chain__chain_type=NetworkTypes.LIGHTNING, - ) - .order_by("-created_at") - .first() - ) - - if not tokentap_lightning_claim: - print("No tokentap claim found for user") - return - - tokentap_lightning_claim.status = ClaimReceipt.VERIFIED - tokentap_lightning_claim.tx_hash = claim.tx_hash - tokentap_lightning_claim.save() - - claim._status = ClaimReceipt.PROCESSED_FOR_TOKENTAP - claim.save() - - except Exception as e: - capture_exception() - print(f"error in processing lightning claims: {str(e)}") + CeleryTasks.process_verified_lightning_claim(gas_tap_claim_id) @shared_task def process_rejected_lighning_claim(gas_tap_claim_id): - try: - claim = ClaimReceipt.objects.get(pk=gas_tap_claim_id) - user_profile = claim.user_profile - tokentap_lightning_claim = ( - TokenDistributionClaim.objects.filter( - user_profile=user_profile, - token_distribution__chain__chain_type=NetworkTypes.LIGHTNING, - ) - .order_by("-created_at") - .first() - ) - - if not tokentap_lightning_claim: - print("No tokentap claim found for user") - return - - tokentap_lightning_claim.delete() - - claim._status = ClaimReceipt.PROCESSED_FOR_TOKENTAP_REJECT - claim.save() - - except Exception as e: - capture_exception() - print(f"error in processing lightning claims: {str(e)}") + CeleryTasks.process_rejected_lightning_claim(gas_tap_claim_id) @shared_task @@ -352,100 +127,26 @@ def update_tokentap_claim_for_verified_lightning_claims(): @shared_task -def update_tokens_price(): - """ - update token.usd_price for all TokenPrice records in DB - """ +def update_token_price(token_pk): + CeleryTasks.update_token_price(token_pk) - # TODO: we can make this function performance better - # by using aiohttp and asyncio or Threads - tokens = TokenPrice.objects.exclude(price_url__isnull=True).exclude(price_url="") - res_gen = map( - lambda token: (token, requests.get(token.price_url, timeout=5)), tokens - ) - def parse_request(token: TokenPrice, request_res: requests.Response): - try: - request_res.raise_for_status() - json_data = request_res.json() - token.usd_price = json_data["data"]["rates"]["USD"] - # TODO: save all change when this function ended for - # all url done for better performance - token.save() - except requests.HTTPError as e: - logging.exception( - f"requests for url: {request_res.url} can not fetched with " - f"status_code: {request_res.status_code}. \ - {str(e)}" - ) - - except KeyError as e: - logging.exception( - f"requests for url: {request_res.url} data do not have property " - f"keys for loading data. {str(e)}" - ) - - except Exception as e: - logging.exception( - f"requests for url: {request_res.url} got error " - f"{type(e).__name__}. {str(e)}" - ) - - [parse_request(*res) for res in res_gen] +@shared_task +def update_tokens_price(): + tokens = TokenPrice.objects.exclude(price_url__isnull=True).exclude(price_url="") + for token in tokens: + update_token_price.delay(token.pk) @shared_task(bind=True) def process_donation_receipt(self, donation_receipt_pk): - lock_name = f"{self.name}-LOCK-{donation_receipt_pk}" - logging.info(f"lock name is: {lock_name}") - with memcache_lock(lock_name, self.app.oid) as acquired: - donation_receipt = DonationReceipt.objects.get(pk=donation_receipt_pk) + id_ = f"{self.name}-LOCK-{donation_receipt_pk}" + with memcache_lock(id_, self.app.oid) as acquired: if not acquired: - logging.debug("Could not acquire update lock") - return - evm_fund_manager = EVMFundManager(donation_receipt.chain) - try: - if evm_fund_manager.is_tx_verified(donation_receipt.tx_hash) is False: - donation_receipt.delete() - return - user = donation_receipt.user_profile - tx = evm_fund_manager.get_tx(donation_receipt.tx_hash) - if tx.get("from").lower() not in user.wallets.annotate( - lower_address=Func(F("address"), function="LOWER") - ).values_list("lower_address", flat=True): - donation_receipt.delete() - return - if ( - Web3Utils.to_checksum_address(tx.get("to")) - != evm_fund_manager.get_fund_manager_checksum_address() - ): - donation_receipt.delete() - return - donation_receipt.value = str(evm_fund_manager.from_wei(tx.get("value"))) - if donation_receipt.chain.is_testnet is False: - try: - token_price = TokenPrice.objects.get( - symbol=donation_receipt.chain.symbol - ) - donation_receipt.total_price = str( - decimal.Decimal(donation_receipt.value) - * decimal.Decimal(token_price.usd_price) - ) - except TokenPrice.DoesNotExist: - logging.error( - f"TokenPrice for Chain: {donation_receipt.chain.chain_name} " - "did not defined" - ) - donation_receipt.status = ClaimReceipt.REJECTED - donation_receipt.save() - return - else: - donation_receipt.total_price = str(0) - donation_receipt.status = ClaimReceipt.VERIFIED - donation_receipt.save() - except (web3.exceptions.TransactionNotFound, web3.exceptions.TimeExhausted): - donation_receipt.delete() + logging.info("Could not acquire update lock") return + CeleryTasks.process_donation_receipt(donation_receipt_pk) + cache.delete(id_) @shared_task diff --git a/requirements.txt b/requirements.txt index 66e61ce1..4c3fa533 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,3 +27,6 @@ whitenoise==6.1.0 solana==0.29.1 solders==0.14.3 coverage~=7.3.2 +pytz~=2023.3.post1 +requests~=2.31.0 +celery~=5.3.4