Skip to content

Commit

Permalink
Merge pull request #277 from curveresearch/curve-prices-volume
Browse files Browse the repository at this point in the history
Use Curve Prices API for pool volume
  • Loading branch information
nagakingg authored Nov 1, 2023
2 parents cc095b0 + ac070fe commit 71c5518
Show file tree
Hide file tree
Showing 29 changed files with 462 additions and 493 deletions.
27 changes: 27 additions & 0 deletions changelog.d/20231025_203748_nagakingg_curve_prices_volume.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
Removed
-------
- Removed volume multiplier modes
- Removed pipelines.utils
- Removed PriceVolume.total_volumes()
- Removed volume from PoolDataCache
- Removed convex subgraph volume query
- Removed PoolDataCache

Added
-----
- Added Curve Prices API volume query (network.curve_prices)
- Added pool_data.get_pool_volume() to fetch historical pool volume from
Curve Prices API

Changed
-------
- Volume multipliers now computed individually for each asset pair
- Replaced pool_data.queries with folder
- Pool volume and volume limiting are only supported for Ethereum pools
pending updates to Curve Prices API

Deprecated
----------
- RAI3CRV pool is currently unsupported by simulation pipelines. It will
be reimplemented along with Stableswap-NG pools.

4 changes: 4 additions & 0 deletions curvesim/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ class NetworkError(CurvesimException):
"""Error for network subpackage."""


class ApiResultError(NetworkError):
"""Raised when API results aren't as expected."""


class SimPoolError(CurvesimException):
"""Error in a SimPool operation."""

Expand Down
9 changes: 0 additions & 9 deletions curvesim/iterators/price_samplers/price_volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,3 @@ def __iter__(self) -> Iterator[PriceVolumeSample]:
volumes = volumes.to_dict()

yield PriceVolumeSample(price_timestamp, prices, volumes) # type:ignore

def total_volumes(self):
"""
Returns
-------
pandas.Series
Total volume for each pairwise coin combination, summed accross timestamps.
"""
return self.volumes.sum().to_dict()
129 changes: 129 additions & 0 deletions curvesim/network/curve_prices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""
Network connector for Curve Prices API.
"""

from typing import List

from eth_utils import to_checksum_address
from pandas import DataFrame, to_datetime

from curvesim.exceptions import ApiResultError, CurvesimValueError

from .http import HTTP
from .utils import sync

URL = "https://prices.curve.fi/v1/"

CHAIN_ALIASES = {"mainnet": "ethereum"}


async def _get_pool_pair_volume(
pool_address,
main_token_address,
reference_token_address,
start_ts,
end_ts,
*,
chain="ethereum",
interval="day",
):
chain = _chain_from_alias(chain)
pool_address = to_checksum_address(pool_address)
main_token_address = to_checksum_address(main_token_address)
reference_token_address = to_checksum_address(reference_token_address)

url = URL + f"volume/{chain}/{pool_address}"
params = {
"main_token": main_token_address,
"reference_token": reference_token_address,
"start": start_ts,
"end": end_ts,
"interval": interval,
}
r = await HTTP.get(url, params=params)

try:
data = r["data"]
except KeyError as e:
raise ApiResultError(
"No historical volume returned for\n"
f"Pool: '{pool_address}', Chain: '{chain}',\n"
f"Tokens: (main: {main_token_address}, "
f"reference: {reference_token_address}),\n"
f"Timestamps: (start: {start_ts}, end: {end_ts})"
) from e

return data


async def get_pool_pair_volume(
pool_address: str,
main_token_address: str,
reference_token_address: str,
start_ts: int,
end_ts: int,
*,
chain: str = "ethereum",
interval: str = "day",
) -> DataFrame:
"""
Gets historical daily volume for a pair of coins traded in a Curve pool.
Parameters
----------
pool_address: str
The Curve pool's address.
main_token_address: str
Address for the token volume will be denominated in.
reference_token_address: str
Address for the second token in the trading pair.
start_ts: int
Posix timestamp (UTC) for start of query period.
end_ts: int
Posix timestamp (UTC) for end of query period.
chain: str, default "ethereum"
The pool's blockchain (note: currently only "ethereum" supported)
interval: str, default "day"
The sampling interval for the data. Available values: week, day, hour
Returns
-------
DataFrame
Rows: DateTimeIndex; Columns: volume, fees
"""
data: List[dict] = await _get_pool_pair_volume(
pool_address,
main_token_address,
reference_token_address,
start_ts,
end_ts,
chain=chain,
interval=interval,
)

df = DataFrame(data, columns=["timestamp", "volume", "fees"])
df["timestamp"] = to_datetime(df["timestamp"], unit="s")
df.set_index("timestamp", inplace=True)
return df


def _chain_from_alias(chain):
if chain in CHAIN_ALIASES: # pylint: disable=consider-using-get
chain = CHAIN_ALIASES[chain]

if chain != "ethereum":
raise CurvesimValueError(
"Curve Prices API currently only supports Ethereum chain."
)

return chain


get_pool_pair_volume_sync = sync(get_pool_pair_volume)
90 changes: 0 additions & 90 deletions curvesim/network/subgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
Network connector for subgraphs
"""

from asyncio import gather
from datetime import datetime, timedelta, timezone
from decimal import Decimal

Expand Down Expand Up @@ -161,91 +160,6 @@ async def symbol_address(symbol, chain, env="prod"):
return addr


async def _volume(address, chain, env, days=60, end=None):
if end is None:
t_end = datetime.now(timezone.utc).replace(
hour=0, minute=0, second=0, microsecond=0
)
else:
t_end = datetime.fromtimestamp(end, tz=timezone.utc)
logger.info("Volume end date: %s", t_end)
t_start = t_end - timedelta(days=days)

# pylint: disable=consider-using-f-string
q = """
{
swapVolumeSnapshots(
orderBy: timestamp,
orderDirection: desc,
where:
{
pool: "%s"
period: "86400"
timestamp_gte: %d
timestamp_lt: %d
}
)
{
volume
timestamp
}
}
""" % (
address.lower(),
int(t_start.timestamp()),
int(t_end.timestamp()),
)

data = await convex(chain, q, env)
snapshots = data["swapVolumeSnapshots"]
num_snapshots = len(snapshots)

if num_snapshots < days:
logger.warning("Only %s/%s days of pool volume returned.", num_snapshots, days)

return snapshots


async def volume(addresses, chain, env="prod", days=60, end=None):
"""
Retrieves historical volume for a pool or multiple pools.
Parameters
----------
addresses : str or iterable of str
The pool address(es).
chain : str
The blockchain the pool or pools are on.
days : int, default=60
Number of days to fetch data for.
Returns
-------
list of float
A list of total volumes for each day.
"""
if isinstance(addresses, str):
r = await _volume(addresses, chain, env, days=days, end=end)
vol = [float(e["volume"]) for e in r]

else:
tasks = []
for addr in addresses:
tasks.append(_volume(addr, chain, env, days=days, end=end))

r = await gather(*tasks)

vol = []
for _r in r:
_vol = [float(e["volume"]) for e in _r]
vol.append(_vol)

return vol


async def _pool_snapshot(address, chain, env, end_ts=None):
if not end_ts:
end_date = datetime.now(timezone.utc)
Expand Down Expand Up @@ -453,7 +367,6 @@ async def pool_snapshot(address, chain, env="prod", end_ts=None):

convex_sync = sync(convex)
symbol_address_sync = sync(symbol_address)
volume_sync = sync(volume)
pool_snapshot_sync = sync(pool_snapshot)


Expand Down Expand Up @@ -564,6 +477,3 @@ async def redemption_prices(
print("Symbol:", symbol)
address = symbol_address_sync(symbol, chain)
print("Address:", address)
_volume_sync = sync(_volume)
volumes = _volume_sync(address, chain, days=2)
print("Volumes:", volumes)
Loading

0 comments on commit 71c5518

Please sign in to comment.