Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Curve Prices API for pool volume #277

Merged
merged 8 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions changelog.d/20231025_203748_nagakingg_curve_prices_volume.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
Removed
-------
- Removed volume multiplier modes
- Removed pipelines.utils
- Removed PriceVolume.total_volumes()
- Removed volume from PoolDataCache
- Removed convex subgraph volume query
- Removed PoolDataCache

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should mention we've removed volume support for chains other Mainnet (pending future support from the curve-prices API) and sims for the RAI pool are no longer supported.

Added
-----
- Added Curve Prices API volume query (network.curve_prices)
- Added pool_data.get_pool_volume() to fetch historical pool volume from
Curve Prices API

Changed
-------
- Volume multipliers now computed individually for each asset pair
- Replaced pool_data.queries with folder
- Pool volume and volume limiting are only supported for Ethereum pools
pending updates to Curve Prices API

Deprecated
----------
- RAI3CRV pool is currently unsupported by simulation pipelines. It will
be reimplemented along with Stableswap-NG pools.

4 changes: 4 additions & 0 deletions curvesim/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ class NetworkError(CurvesimException):
"""Error for network subpackage."""


class ApiResultError(NetworkError):
"""Raised when API results aren't as expected."""


class SimPoolError(CurvesimException):
"""Error in a SimPool operation."""

Expand Down
9 changes: 0 additions & 9 deletions curvesim/iterators/price_samplers/price_volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,3 @@ def __iter__(self) -> Iterator[PriceVolumeSample]:
volumes = volumes.to_dict()

yield PriceVolumeSample(price_timestamp, prices, volumes) # type:ignore

def total_volumes(self):
"""
Returns
-------
pandas.Series
Total volume for each pairwise coin combination, summed accross timestamps.
"""
return self.volumes.sum().to_dict()
129 changes: 129 additions & 0 deletions curvesim/network/curve_prices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""
Network connector for Curve Prices API.
"""

from typing import List

from eth_utils import to_checksum_address
from pandas import DataFrame, to_datetime

from curvesim.exceptions import ApiResultError, CurvesimValueError

from .http import HTTP
from .utils import sync

URL = "https://prices.curve.fi/v1/"

CHAIN_ALIASES = {"mainnet": "ethereum"}


async def _get_pool_pair_volume(
pool_address,
main_token_address,
reference_token_address,
start_ts,
end_ts,
*,
chain="ethereum",
interval="day",
):
chain = _chain_from_alias(chain)
pool_address = to_checksum_address(pool_address)
main_token_address = to_checksum_address(main_token_address)
reference_token_address = to_checksum_address(reference_token_address)

url = URL + f"volume/{chain}/{pool_address}"
params = {
"main_token": main_token_address,
"reference_token": reference_token_address,
"start": start_ts,
"end": end_ts,
"interval": interval,
}
r = await HTTP.get(url, params=params)

try:
data = r["data"]
except KeyError as e:
raise ApiResultError(
"No historical volume returned for\n"
f"Pool: '{pool_address}', Chain: '{chain}',\n"
f"Tokens: (main: {main_token_address}, "
f"reference: {reference_token_address}),\n"
f"Timestamps: (start: {start_ts}, end: {end_ts})"
) from e

return data


async def get_pool_pair_volume(
pool_address: str,
main_token_address: str,
reference_token_address: str,
start_ts: int,
end_ts: int,
*,
chain: str = "ethereum",
interval: str = "day",
) -> DataFrame:
"""
Gets historical daily volume for a pair of coins traded in a Curve pool.

Parameters
----------
pool_address: str
The Curve pool's address.

main_token_address: str
Address for the token volume will be denominated in.

reference_token_address: str
Address for the second token in the trading pair.

start_ts: int
Posix timestamp (UTC) for start of query period.

end_ts: int
Posix timestamp (UTC) for end of query period.

chain: str, default "ethereum"
The pool's blockchain (note: currently only "ethereum" supported)

interval: str, default "day"
The sampling interval for the data. Available values: week, day, hour

Returns
-------
DataFrame
Rows: DateTimeIndex; Columns: volume, fees

"""
data: List[dict] = await _get_pool_pair_volume(
pool_address,
main_token_address,
reference_token_address,
start_ts,
end_ts,
chain=chain,
interval=interval,
)

df = DataFrame(data, columns=["timestamp", "volume", "fees"])
df["timestamp"] = to_datetime(df["timestamp"], unit="s")
df.set_index("timestamp", inplace=True)
return df


def _chain_from_alias(chain):
if chain in CHAIN_ALIASES: # pylint: disable=consider-using-get
chain = CHAIN_ALIASES[chain]

if chain != "ethereum":
raise CurvesimValueError(
"Curve Prices API currently only supports Ethereum chain."
)

return chain


get_pool_pair_volume_sync = sync(get_pool_pair_volume)
90 changes: 0 additions & 90 deletions curvesim/network/subgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
Network connector for subgraphs
"""

from asyncio import gather
from datetime import datetime, timedelta, timezone
from decimal import Decimal

Expand Down Expand Up @@ -161,91 +160,6 @@ async def symbol_address(symbol, chain, env="prod"):
return addr


async def _volume(address, chain, env, days=60, end=None):
if end is None:
t_end = datetime.now(timezone.utc).replace(
hour=0, minute=0, second=0, microsecond=0
)
else:
t_end = datetime.fromtimestamp(end, tz=timezone.utc)
logger.info("Volume end date: %s", t_end)
t_start = t_end - timedelta(days=days)

# pylint: disable=consider-using-f-string
q = """
{
swapVolumeSnapshots(
orderBy: timestamp,
orderDirection: desc,
where:
{
pool: "%s"
period: "86400"
timestamp_gte: %d
timestamp_lt: %d
}
)
{
volume
timestamp
}
}
""" % (
address.lower(),
int(t_start.timestamp()),
int(t_end.timestamp()),
)

data = await convex(chain, q, env)
snapshots = data["swapVolumeSnapshots"]
num_snapshots = len(snapshots)

if num_snapshots < days:
logger.warning("Only %s/%s days of pool volume returned.", num_snapshots, days)

return snapshots


async def volume(addresses, chain, env="prod", days=60, end=None):
"""
Retrieves historical volume for a pool or multiple pools.

Parameters
----------
addresses : str or iterable of str
The pool address(es).

chain : str
The blockchain the pool or pools are on.

days : int, default=60
Number of days to fetch data for.

Returns
-------
list of float
A list of total volumes for each day.

"""
if isinstance(addresses, str):
r = await _volume(addresses, chain, env, days=days, end=end)
vol = [float(e["volume"]) for e in r]

else:
tasks = []
for addr in addresses:
tasks.append(_volume(addr, chain, env, days=days, end=end))

r = await gather(*tasks)

vol = []
for _r in r:
_vol = [float(e["volume"]) for e in _r]
vol.append(_vol)

return vol


async def _pool_snapshot(address, chain, env, end_ts=None):
if not end_ts:
end_date = datetime.now(timezone.utc)
Expand Down Expand Up @@ -453,7 +367,6 @@ async def pool_snapshot(address, chain, env="prod", end_ts=None):

convex_sync = sync(convex)
symbol_address_sync = sync(symbol_address)
volume_sync = sync(volume)
pool_snapshot_sync = sync(pool_snapshot)


Expand Down Expand Up @@ -564,6 +477,3 @@ async def redemption_prices(
print("Symbol:", symbol)
address = symbol_address_sync(symbol, chain)
print("Address:", address)
_volume_sync = sync(_volume)
volumes = _volume_sync(address, chain, days=2)
print("Volumes:", volumes)
Loading