Skip to content

Commit

Permalink
Merge pull request #229 from UnitapApp/refactor/mv-tasks-logic-to-cel…
Browse files Browse the repository at this point in the history
…ery-tasks

Mv tasks.py logic to celery_tasks.py
  • Loading branch information
PooyaFekri authored Dec 19, 2023
2 parents 3203e2a + 3991c86 commit 4c71f1f
Show file tree
Hide file tree
Showing 5 changed files with 397 additions and 341 deletions.
21 changes: 21 additions & 0 deletions core/utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,34 @@
import datetime
import time
from contextlib import contextmanager

import pytz
from django.core.cache import cache
from web3 import Web3
from web3.contract.contract import Contract, ContractFunction
from web3.logs import DISCARD, IGNORE, STRICT, WARN
from web3.middleware import geth_poa_middleware
from web3.types import TxParams, Type


@contextmanager
def memcache_lock(lock_id, oid, lock_expire=60):
timeout_at = time.monotonic() + lock_expire
# cache.add fails if the key already exists
status = cache.add(lock_id, oid, lock_expire)
try:
yield status
finally:
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
if time.monotonic() < timeout_at and status:
# don't release the lock if we exceeded the timeout
# to lessen the chance of releasing an expired lock
# owned by someone else
# also don't release the lock if we didn't acquire it
cache.delete(lock_id)


class TimeUtils:
# @staticmethod
# def get_last_monday():
Expand Down
297 changes: 297 additions & 0 deletions faucet/celery_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
import decimal
import logging

import requests
import web3.exceptions
from django.db import transaction
from django.db.models import F, Func
from django.utils import timezone
from sentry_sdk import capture_exception

from authentication.models import NetworkTypes, Wallet
from core.models import TokenPrice
from core.utils import Web3Utils
from tokenTap.models import TokenDistributionClaim

from .faucet_manager.fund_manager import FundMangerException, get_fund_manager
from .models import Chain, ClaimReceipt, DonationReceipt, TransactionBatch


def has_pending_batch(chain):
return TransactionBatch.objects.filter(
chain=chain, _status=ClaimReceipt.PENDING
).exists()


class CeleryTasks:
@staticmethod
def process_batch(batch_pk):
"""
Process a batch of claims and send the funds to the users
creates an on-chain transaction
"""

try:
logging.info(f"Processing Batch {batch_pk}")

batch = TransactionBatch.objects.get(pk=batch_pk)
if not batch.should_be_processed:
return
if batch.is_expired:
batch._status = ClaimReceipt.REJECTED
batch.save()
batch.claims.update(_status=batch._status)
return

data = [
{
"to": receipt.passive_address
if receipt.passive_address is not None
else Wallet.objects.get(
user_profile=receipt.user_profile,
wallet_type=batch.chain.chain_type,
).address,
"amount": int(receipt.amount),
}
for receipt in batch.claims.all()
]
#####
logging.info(data)

try:
manager = get_fund_manager(batch.chain)
tx_hash = manager.multi_transfer(data)
batch.tx_hash = tx_hash
batch.save()
except FundMangerException.GasPriceTooHigh as e:
logging.exception(e)
except FundMangerException.RPCError as e:
logging.exception(e)
except Exception as e:
capture_exception()
logging.exception(str(e))
except TransactionBatch.DoesNotExist:
pass

@staticmethod
def update_pending_batch_with_tx_hash(batch_pk):
# only one ongoing update per batch
logging.info("Updating Batch")
try:
batch = TransactionBatch.objects.get(pk=batch_pk)
except TransactionBatch.DoesNotExist:
return
try:
if not batch.status_should_be_updated:
return
manager = get_fund_manager(batch.chain)

if manager.is_tx_verified(batch.tx_hash):
batch._status = ClaimReceipt.VERIFIED
elif batch.is_expired:
batch._status = ClaimReceipt.REJECTED
except Exception as e:
if batch.is_expired:
batch._status = ClaimReceipt.REJECTED
capture_exception()
logging.exception(str(e))
finally:
batch.save()
batch.claims.update(_status=batch._status)

@staticmethod
def reject_expired_pending_claims():
ClaimReceipt.objects.filter(
batch=None,
_status=ClaimReceipt.PENDING,
datetime__lte=timezone.now()
- timezone.timedelta(minutes=ClaimReceipt.MAX_PENDING_DURATION),
).update(_status=ClaimReceipt.REJECTED)

@staticmethod
def process_chain_pending_claims(chain_id):
with transaction.atomic():
chain = Chain.objects.select_for_update().get(
pk=chain_id
) # lock based on chain

# all pending batches must be resolved before new transactions can be made
if has_pending_batch(chain):
return

# get all pending receipts for this chain
# pending receipts are receipts that have not been batched yet
receipts = ClaimReceipt.objects.filter(
chain=chain, _status=ClaimReceipt.PENDING, batch=None
)

if receipts.count() == 0:
return

if chain.chain_type == NetworkTypes.LIGHTNING:
receipts = receipts.order_by("pk")[:1]
else:
receipts = receipts.order_by("pk")[:32]

# if there are no pending batches, create a new batch
batch = TransactionBatch.objects.create(chain=chain)

# assign the batch to the receipts
for receipt in receipts:
receipt.batch = batch
receipt.save()

@staticmethod
def update_needs_funding_status_chain(chain_id):
try:
chain = Chain.objects.get(pk=chain_id)
# if has enough funds and enough fees, needs_funding is False

chain.needs_funding = True

if chain.has_enough_funds and chain.has_enough_fees:
chain.needs_funding = False

chain.save()
except Exception as e:
logging.exception(str(e))
capture_exception()

@staticmethod
def process_verified_lightning_claim(gas_tap_claim_id):
try:
claim = ClaimReceipt.objects.get(pk=gas_tap_claim_id)
user_profile = claim.user_profile
tokentap_lightning_claim = (
TokenDistributionClaim.objects.filter(
user_profile=user_profile,
token_distribution__chain__chain_type=NetworkTypes.LIGHTNING,
)
.order_by("-created_at")
.first()
)

if not tokentap_lightning_claim:
logging.info("No tokentap claim found for user")
return

tokentap_lightning_claim.status = ClaimReceipt.VERIFIED
tokentap_lightning_claim.tx_hash = claim.tx_hash
tokentap_lightning_claim.save()

claim._status = ClaimReceipt.PROCESSED_FOR_TOKENTAP
claim.save()

except Exception as e:
capture_exception()
logging.exception(f"error in processing lightning claims: {str(e)}")

@staticmethod
def process_rejected_lightning_claim(gas_tap_claim_id):
try:
claim = ClaimReceipt.objects.get(pk=gas_tap_claim_id)
user_profile = claim.user_profile
tokentap_lightning_claim = (
TokenDistributionClaim.objects.filter(
user_profile=user_profile,
token_distribution__chain__chain_type=NetworkTypes.LIGHTNING,
)
.order_by("-created_at")
.first()
)

if not tokentap_lightning_claim:
logging.info("No tokentap claim found for user")
return

tokentap_lightning_claim.delete()

claim._status = ClaimReceipt.PROCESSED_FOR_TOKENTAP_REJECT
claim.save()

except Exception as e:
capture_exception()
logging.exception(f"error in processing lightning claims: {str(e)}")

@staticmethod
def update_token_price(token_pk):
with transaction.atomic():
try:
token = TokenPrice.objects.select_for_update().get(pk=token_pk)
except TokenPrice.DoesNotExist:
logging.error(f"TokenPrice with pk {token_pk} does not exist.")
return

def parse_request(token: TokenPrice, request_res: requests.Response):
try:
request_res.raise_for_status()
json_data = request_res.json()
token.usd_price = json_data["data"]["rates"]["USD"]
token.save()
except requests.HTTPError as e:
logging.exception(
f"requests for url: {request_res.url} can not fetched"
f" with status_code: {request_res.status_code}. "
f"{str(e)}"
)

except KeyError as e:
logging.exception(
f"requests for url: {request_res.url} data do not have"
f" property keys for loading data. {str(e)}"
)

except Exception as e:
logging.exception(
f"requests for url: {request_res.url} got error "
f"{type(e).__name__}. {str(e)}"
)

parse_request(token, requests.get(token.price_url, timeout=5))

@staticmethod
def process_donation_receipt(donation_receipt_pk):
donation_receipt = DonationReceipt.objects.get(pk=donation_receipt_pk)
evm_fund_manager = get_fund_manager(donation_receipt.chain)
try:
if not evm_fund_manager.is_tx_verified(donation_receipt.tx_hash):
donation_receipt.delete()
return
user = donation_receipt.user_profile
tx = evm_fund_manager.get_tx(donation_receipt.tx_hash)
if tx.get("from").lower() not in user.wallets.annotate(
lower_address=Func(F("address"), function="LOWER")
).values_list("lower_address", flat=True):
donation_receipt.delete()
return
if (
Web3Utils.to_checksum_address(tx.get("to"))
!= evm_fund_manager.get_fund_manager_checksum_address()
):
donation_receipt.delete()
return
donation_receipt.value = str(evm_fund_manager.from_wei(tx.get("value")))
if not donation_receipt.chain.is_testnet:
try:
token_price = TokenPrice.objects.get(
symbol=donation_receipt.chain.symbol
)
donation_receipt.total_price = str(
decimal.Decimal(donation_receipt.value)
* decimal.Decimal(token_price.usd_price)
)
except TokenPrice.DoesNotExist:
logging.error(
f"TokenPrice for Chain: {donation_receipt.chain.chain_name}"
f" did not defined"
)
donation_receipt.status = ClaimReceipt.REJECTED
donation_receipt.save()
return
else:
donation_receipt.total_price = str(0)
donation_receipt.status = ClaimReceipt.VERIFIED
donation_receipt.save()
except (web3.exceptions.TransactionNotFound, web3.exceptions.TimeExhausted):
donation_receipt.delete()
return
Loading

0 comments on commit 4c71f1f

Please sign in to comment.