Fix duplicated webhook calls (safe-global#1354)
* Add processed cache to ERC20/721 events indexer (see the sketch after this list)
* Add processed cache to Safe events indexer (not strictly needed, as it doesn't use `bulk_create`, but it still saves reprocessing an event and some database calls)
* Remove unneeded methods from the ERC20 indexer
* Add processed cache to InternalTxIndexer
* Refactor tests
* When there's a reorg, reset the indexers' status
* Remove deprecated docs
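For context: `FixedSizeDict`, the structure backing each `_processed_element_cache` below, is imported from `safe_transaction_service.utils.utils`, but its implementation is not part of this diff. A minimal sketch of such a bounded, insertion-ordered dict (the eviction detail is an assumption, not necessarily the repository's exact code):

    from collections import OrderedDict


    class FixedSizeDict(OrderedDict):
        """Sketch: dict that evicts its oldest entry once `maxlen` is exceeded."""

        def __init__(self, *args, maxlen: int = 0, **kwargs):
            self._maxlen = maxlen
            super().__init__(*args, **kwargs)

        def __setitem__(self, key, value):
            super().__setitem__(key, value)
            if self._maxlen and len(self) > self._maxlen:
                self.popitem(last=False)  # FIFO eviction: drop the oldest key

Storing `None` as the value, as the indexers below do, keeps memory cost close to the keys alone, which is consistent with the `maxlen=40_000  # Around 3MiB` comments in the diff.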
Uxio0 authored Mar 8, 2023
1 parent d0ad9a9 commit 7f0bd55
Showing 11 changed files with 940 additions and 109 deletions.
69 changes: 29 additions & 40 deletions safe_transaction_service/history/indexers/erc20_events_indexer.py
@@ -1,22 +1,16 @@
-import operator
 from collections import OrderedDict
 from logging import getLogger
 from typing import Iterator, List, Optional, Sequence

 from django.db.models import QuerySet

-from cache_memoize import cache_memoize
-from cachetools import cachedmethod
-from eth_abi.exceptions import DecodingError
 from eth_typing import ChecksumAddress
 from web3.contract import ContractEvent
-from web3.exceptions import BadFunctionCallOutput
 from web3.types import EventData, LogReceipt

 from gnosis.eth import EthereumClient

-from safe_transaction_service.tokens.models import Token
-
+from ...utils.utils import FixedSizeDict
 from ..models import (
     ERC20Transfer,
     ERC721Transfer,
@@ -47,12 +41,22 @@ def del_singleton(cls):


 class Erc20EventsIndexer(EventsIndexer):
-    _cache_is_erc20 = {}
-
     """
-    Indexes ERC20 and ERC721 `Transfer` Event (as ERC721 has the same topic)
+    Indexes `ERC20` and `ERC721` `Transfer` events.
     ERC20 Transfer Event: `Transfer(address indexed from, address indexed to, uint256 value)`
     ERC721 Transfer Event: `Transfer(address indexed from, address indexed to, uint256 indexed tokenId)`
+    As the `event topic` is the same, both events can be indexed together and then told
+    apart based on the `indexed` part, as `indexed` elements are stored in a different way in the
+    `ethereum tx receipt`.
     """

+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self._processed_element_cache = FixedSizeDict(maxlen=40_000)  # Around 3MiB
+
     @property
     def contract_events(self) -> List[ContractEvent]:
         """
@@ -107,36 +111,14 @@ def _do_node_query(
             or transfer_event["args"]["from"] in addresses_set
         ]

-    @cachedmethod(cache=operator.attrgetter("_cache_is_erc20"))
-    @cache_memoize(60 * 60 * 24, prefix="erc20-events-indexer-is-erc20")  # 1 day
-    def _is_erc20(self, token_address: str) -> bool:
-        try:
-            token = Token.objects.get(address=token_address)
-            return token.is_erc20()
-        except Token.DoesNotExist:
-            try:
-                decimals = self.ethereum_client.erc20.get_decimals(token_address)
-                return decimals is not None
-            except (ValueError, BadFunctionCallOutput, DecodingError):
-                return False
-
-    def _process_decoded_element(self, event: EventData) -> EventData:
-        """
-        :param event: Be careful, it will be modified instead of copied
-        :return: The same event if it's a ERC20/ERC721. Tries to tell apart if it's not defined (`unknown` instead
-        of `value` or `tokenId`)
-        """
-        event_args = event["args"]
-        if "unknown" in event_args:  # Not standard event
-            event_args["value"] = event_args.pop("unknown")
-
-        if self._is_erc20(event["address"]):
-            if "tokenId" in event_args:
-                event_args["value"] = event_args.pop("tokenId")
-        else:
-            if "value" in event_args:
-                event_args["tokenId"] = event_args.pop("value")
-        return event
+    def _process_decoded_element(self, decoded_element: EventData) -> None:
+        """
+        Not used as `process_elements` is redefined using custom processors
+
+        :param decoded_element:
+        :return:
+        """
+        pass

     def events_to_erc20_transfer(
         self, log_receipts: Sequence[EventData]
@@ -176,11 +158,18 @@ def process_elements(
         logger.debug("End prefetching and storing of ethereum txs")

         logger.debug("Storing TokenTransfer objects")
+        not_processed_log_receipts = [
+            log_receipt
+            for log_receipt in log_receipts
+            if self.mark_as_processed(log_receipt)
+        ]
         result_erc20 = ERC20Transfer.objects.bulk_create_from_generator(
-            self.events_to_erc20_transfer(log_receipts), ignore_conflicts=True
+            self.events_to_erc20_transfer(not_processed_log_receipts),
+            ignore_conflicts=True,
         )
         result_erc721 = ERC721Transfer.objects.bulk_create_from_generator(
-            self.events_to_erc721_transfer(log_receipts), ignore_conflicts=True
+            self.events_to_erc721_transfer(not_processed_log_receipts),
+            ignore_conflicts=True,
         )
         logger.debug("Stored TokenTransfer objects")
         return range(
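The reworked docstring above leans on the fact that ERC20 and ERC721 share the `Transfer` topic but index different parameters. A standalone illustration of that disambiguation (not the service's actual decoding path): an ERC721 `Transfer` log carries four topics because `tokenId` is indexed, while an ERC20 one carries three and keeps `value` in the data field.

    from hexbytes import HexBytes

    # keccak256("Transfer(address,address,uint256)"), shared by ERC20 and ERC721
    TRANSFER_TOPIC = HexBytes(
        "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
    )


    def classify_transfer(log_receipt: dict) -> str:
        """Tell ERC20 and ERC721 `Transfer` logs apart by topic count."""
        topics = log_receipt["topics"]
        if not topics or HexBytes(topics[0]) != TRANSFER_TOPIC:
            return "not-a-transfer"
        # ERC721 also indexes `tokenId`, adding a fourth topic
        return "erc721" if len(topics) == 4 else "erc20"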
51 changes: 46 additions & 5 deletions safe_transaction_service/history/indexers/events_indexer.py
@@ -14,7 +14,7 @@
 from web3.exceptions import LogTopicError
 from web3.types import EventData, FilterParams, LogReceipt

-from safe_transaction_service.utils.utils import chunks
+from safe_transaction_service.utils.utils import FixedSizeDict, chunks

 from .ethereum_indexer import EthereumIndexer, FindRelevantElementsException

@@ -51,9 +51,43 @@ def __init__(self, *args, **kwargs):

         # Number of concurrent requests to `getLogs`
         self.get_logs_concurrency = settings.ETH_EVENTS_GET_LOGS_CONCURRENCY
+        self._processed_element_cache = FixedSizeDict(maxlen=40_000)  # Around 3MiB

         super().__init__(*args, **kwargs)

+    def mark_as_processed(self, log_receipt: LogReceipt) -> bool:
+        """
+        Mark event as processed by the indexer
+
+        :param log_receipt:
+        :return: `True` if `event` was marked as processed, `False` if it was already processed
+        """
+
+        # Calculate id, collision should be almost impossible
+        # Add blockHash to prevent reorg issues
+        block_hash = HexBytes(log_receipt["blockHash"])
+        tx_hash = HexBytes(log_receipt["transactionHash"])
+        log_index = log_receipt["logIndex"]
+        event_id = block_hash + tx_hash + HexBytes(log_index)
+
+        if event_id in self._processed_element_cache:
+            logger.debug(
+                "Event with tx-hash=%s log-index=%d on block=%s was already processed",
+                tx_hash.hex(),
+                log_index,
+                block_hash.hex(),
+            )
+            return False
+        else:
+            logger.debug(
+                "Marking event with tx-hash=%s log-index=%d on block=%s as processed",
+                tx_hash.hex(),
+                log_index,
+                block_hash.hex(),
+            )
+            self._processed_element_cache[event_id] = None
+            return True
+
     @property
     @abstractmethod
     def contract_events(self) -> List[ContractEvent]:
@@ -229,18 +263,25 @@ def process_elements(self, log_receipts: Sequence[LogReceipt]) -> List[Any]:
         if not log_receipts:
             return []

-        decoded_elements: List[EventData] = self.decode_elements(log_receipts)
+        # Ignore already processed events
+        not_processed_log_receipts = [
+            log_receipt
+            for log_receipt in log_receipts
+            if self.mark_as_processed(log_receipt)
+        ]
+        decoded_elements: List[EventData] = self.decode_elements(
+            not_processed_log_receipts
+        )
         tx_hashes = OrderedDict.fromkeys(
-            [event["transactionHash"] for event in log_receipts]
+            [event["transactionHash"] for event in not_processed_log_receipts]
         ).keys()
         logger.debug("Prefetching and storing %d ethereum txs", len(tx_hashes))
         self.index_service.txs_create_or_update_from_tx_hashes(tx_hashes)
         logger.debug("End prefetching and storing of ethereum txs")
         logger.debug("Processing %d decoded events", len(decoded_elements))
         processed_elements = []
         for decoded_element in decoded_elements:
-            processed_element = self._process_decoded_element(decoded_element)
-            if processed_element:
+            if processed_element := self._process_decoded_element(decoded_element):
                 processed_elements.append(processed_element)
         logger.debug("End processing %d decoded events", len(decoded_elements))
         return processed_elements
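A note on the cache key used by `mark_as_processed` above: concatenating `blockHash`, `transactionHash` and `logIndex` means the same log re-emitted in a reorged block produces a different id and is processed again, which is what the reorg handling further below relies on. A small self-contained check of that property (example values only):

    from hexbytes import HexBytes


    def event_id(log_receipt: dict) -> bytes:
        # Same scheme as `mark_as_processed` above
        return (
            HexBytes(log_receipt["blockHash"])
            + HexBytes(log_receipt["transactionHash"])
            + HexBytes(log_receipt["logIndex"])
        )


    log = {
        "blockHash": "0x" + "aa" * 32,
        "transactionHash": "0x" + "bb" * 32,
        "logIndex": 5,
    }
    reorged = dict(log, blockHash="0x" + "cc" * 32)
    assert event_id(log) != event_id(reorged)  # Same log, new block, new id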
66 changes: 61 additions & 5 deletions safe_transaction_service/history/indexers/internal_tx_indexer.py
@@ -6,6 +6,7 @@
 from django.db import transaction

 from eth_typing import HexStr
+from hexbytes import HexBytes
 from web3.types import ParityBlockTrace, ParityFilterTrace

 from gnosis.eth import EthereumClient
@@ -14,7 +15,7 @@
     CannotDecode,
     get_safe_tx_decoder,
 )
-from safe_transaction_service.utils.utils import chunks
+from safe_transaction_service.utils.utils import FixedSizeDict, chunks

 from ..models import InternalTx, InternalTxDecoded, MonitoredAddress, SafeMasterCopy
 from .ethereum_indexer import EthereumIndexer, FindRelevantElementsException
@@ -56,6 +57,38 @@ def __init__(self, *args, **kwargs):
             10  # Use `trace_block` for last `number_trace_blocks` blocks indexing
         )
         self.tx_decoder = get_safe_tx_decoder()
+        self._processed_element_cache = FixedSizeDict(maxlen=40_000)  # Around 3MiB
+
+    def mark_as_processed(
+        self, tx_hash: HexBytes, block_hash: Optional[HexBytes]
+    ) -> bool:
+        """
+        Mark a `tx_hash` as processed by the indexer
+        :param tx_hash:
+        :param block_hash:
+        :return: `True` if `tx_hash` was marked as processed, `False` if it was already processed
+        """
+
+        tx_hash = HexBytes(tx_hash)
+        block_hash = HexBytes(block_hash or 0)
+        tx_id = tx_hash + block_hash
+
+        if tx_id in self._processed_element_cache:
+            logger.debug(
+                "Tx with tx-hash=%s on block=%s was already processed",
+                tx_hash.hex(),
+                block_hash.hex(),
+            )
+            return False
+        else:
+            logger.debug(
+                "Marking tx with tx-hash=%s on block=%s as processed",
+                tx_hash.hex(),
+                block_hash.hex(),
+            )
+            self._processed_element_cache[tx_id] = None
+            return True
+
     @property
     def database_field(self):
@@ -251,21 +284,44 @@ def trace_transactions(
     def process_elements(
         self, tx_hash_with_traces: OrderedDict[HexStr, Optional[ParityFilterTrace]]
     ) -> List[InternalTx]:
+        """
+        :param tx_hash_with_traces:
+        :return: Inserted `InternalTx` objects
+        """
         # Prefetch ethereum txs
         if not tx_hash_with_traces:
             return []

+        # Copy as we might modify it
+        tx_hash_with_traces = dict(tx_hash_with_traces)
+
         logger.debug(
             "Prefetching and storing %d ethereum txs", len(tx_hash_with_traces)
         )
-        tx_hashes = list(tx_hash_with_traces.keys())
+
+        tx_hashes = []
+        tx_hashes_missing_traces = []
+        for tx_hash in list(tx_hash_with_traces.keys()):
+            # Check if transactions have already been processed
+            # Provide block_hash if available as a means to prevent reorg issues
+            block_hash = (
+                tx_hash_with_traces[tx_hash][0]["blockHash"]
+                if tx_hash_with_traces[tx_hash]
+                else None
+            )
+            if self.mark_as_processed(tx_hash, block_hash):
+                tx_hashes.append(tx_hash)
+                # Traces can be already populated if using `trace_block`, but with `trace_filter`
+                # some traces will be missing and `trace_transaction` needs to be called
+                if not tx_hash_with_traces[tx_hash]:
+                    tx_hashes_missing_traces.append(tx_hash)
+            else:
+                del tx_hash_with_traces[tx_hash]
+
         ethereum_txs = self.index_service.txs_create_or_update_from_tx_hashes(tx_hashes)
         logger.debug("End prefetching and storing of ethereum txs")

         logger.debug("Prefetching of traces(internal txs)")
-        tx_hashes_missing_traces = [
-            tx_hash for tx_hash, trace in tx_hash_with_traces.items() if not trace
-        ]
         missing_traces_lists = self.trace_transactions(
             tx_hashes_missing_traces, batch_size=self.trace_txs_batch_size
         )
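The internal-tx variant keys its cache on `tx_hash + block_hash`, using `HexBytes(block_hash or 0)` so a transaction whose trace (and therefore block hash) is not yet known gets a distinct placeholder id. A small sketch of that key scheme (hypothetical helper, example values only):

    from typing import Optional

    from hexbytes import HexBytes


    def internal_tx_id(tx_hash: str, block_hash: Optional[str]) -> bytes:
        # Key scheme from `mark_as_processed` above; `block_hash or 0`
        # maps a missing block hash to a zero placeholder
        return HexBytes(tx_hash) + HexBytes(block_hash or 0)


    tx = "0x" + "cc" * 32
    assert internal_tx_id(tx, None) != internal_tx_id(tx, "0x" + "dd" * 32)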
10 changes: 7 additions & 3 deletions safe_transaction_service/history/models.py
@@ -7,7 +7,7 @@
 from typing import (
     Any,
     Dict,
-    Iterable,
+    Iterator,
     List,
     Optional,
     Sequence,
@@ -135,19 +135,23 @@ def bulk_create(
         return result

     def bulk_create_from_generator(
-        self, objs: Iterable[Any], batch_size: int = 100, ignore_conflicts: bool = False
+        self, objs: Iterator[Any], batch_size: int = 100, ignore_conflicts: bool = False
     ) -> int:
         """
         Implementation in Django is not ok, as it will do `objs = list(objs)`. If objects come from a generator
         they will be brought to RAM. This approach is more friendly

         :return: Count of inserted elements
         """
         assert batch_size is not None and batch_size > 0
+        iterator = iter(
+            objs
+        )  # Make sure we are not slicing the same elements if a sequence is provided
         total = 0
         while True:
             if inserted := len(
                 self.bulk_create(
-                    islice(objs, batch_size), ignore_conflicts=ignore_conflicts
+                    islice(iterator, batch_size), ignore_conflicts=ignore_conflicts
                 )
             ):
                 total += inserted
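The `iterator = iter(objs)` line added above is the heart of this fix: `itertools.islice` on a plain sequence restarts from index 0 on every call, so each batch would re-insert the same first rows, while slicing a single shared iterator consumes it. A self-contained demonstration of the difference:

    from itertools import islice

    objs = list(range(5))

    # Slicing the sequence directly restarts at index 0 every time
    wrong = [list(islice(objs, 2)) for _ in range(3)]
    assert wrong == [[0, 1], [0, 1], [0, 1]]

    # Slicing one shared iterator consumes it, yielding successive batches
    iterator = iter(objs)
    right = [list(islice(iterator, 2)) for _ in range(3)]
    assert right == [[0, 1], [2, 3], [4]]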
18 changes: 18 additions & 0 deletions safe_transaction_service/history/services/reorg_service.py
@@ -7,6 +7,12 @@

 from gnosis.eth import EthereumClient, EthereumClientProvider

+from ..indexers import (
+    Erc20EventsIndexerProvider,
+    InternalTxIndexerProvider,
+    ProxyFactoryIndexerProvider,
+    SafeEventsIndexerProvider,
+)
 from ..models import EthereumBlock, IndexingStatus, ProxyFactory, SafeMasterCopy

 logger = logging.getLogger(__name__)
@@ -59,6 +65,14 @@ def __init__(
             ),
         ]

+        # Indexers to reset
+        self.indexer_providers = [
+            Erc20EventsIndexerProvider,
+            InternalTxIndexerProvider,
+            ProxyFactoryIndexerProvider,
+            SafeEventsIndexerProvider,
+        ]
+
     def check_reorgs(self) -> Optional[int]:
         """
         :return: Number of the oldest block with reorg detected. `None` if no reorg found
@@ -95,6 +109,10 @@ def reset_all_to_block(self, block_number: int) -> int:
         for reorg_function in self.reorg_functions:
             updated += reorg_function(block_number)

+        # Reset indexer status and caches
+        for indexer_provider in self.indexer_providers:
+            indexer_provider.del_singleton()
+
         return updated

     @transaction.atomic
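Each `*IndexerProvider` above is a singleton factory (note the `def del_singleton(cls):` context line in the first hunk of this commit). Dropping the singleton after a reorg discards the indexer instance along with its `_processed_element_cache`, so blocks re-indexed after the reorg are not mistaken for already-processed duplicates. A minimal sketch of that provider pattern, with hypothetical names (the real providers build fully configured indexers):

    class IndexerProvider:
        """Sketch of the singleton-provider pattern; names are illustrative."""

        instance = None

        def __new__(cls):
            if cls.instance is None:
                # Stand-in: the real providers construct a configured indexer here
                cls.instance = super().__new__(cls)
            return cls.instance

        @classmethod
        def del_singleton(cls):
            # Forget the cached instance; the next call builds a fresh indexer
            # with an empty `_processed_element_cache`
            cls.instance = None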