diff --git a/curvesim/exceptions/__init__.py b/curvesim/exceptions/__init__.py index 6ad609ccb..a8609945b 100644 --- a/curvesim/exceptions/__init__.py +++ b/curvesim/exceptions/__init__.py @@ -77,3 +77,11 @@ class StateLogError(CurvesimException): class UnregisteredPoolError(StateLogError): """Error raised when a pool type is not recognized by the metrics framework.""" + + +class TimeSequenceError(CurvesimException): + """Error using a TimeSequence object.""" + + +class DataSourceError(CurvesimException): + """Error using a DataSource object.""" diff --git a/curvesim/iterators/price_samplers/price_volume.py b/curvesim/iterators/price_samplers/price_volume.py index b7728d4d5..b665dd9fe 100644 --- a/curvesim/iterators/price_samplers/price_volume.py +++ b/curvesim/iterators/price_samplers/price_volume.py @@ -1,7 +1,7 @@ from typing import Iterator from curvesim.logging import get_logger -from curvesim.price_data import get +from curvesim.price_data import get_price_data from curvesim.templates.price_samplers import PriceSample, PriceSampler from curvesim.utils import dataclass, override @@ -57,7 +57,7 @@ def __init__( Identifies pricing source: coingecko or local. 
""" - prices, volumes, _ = get( + prices, volumes, _ = get_price_data( assets.addresses, chain=assets.chain, days=days, diff --git a/curvesim/network/coingecko.py b/curvesim/network/coingecko.py index 554ac9530..615f35856 100644 --- a/curvesim/network/coingecko.py +++ b/curvesim/network/coingecko.py @@ -3,13 +3,9 @@ """ # pylint: disable=redefined-outer-name import asyncio -from datetime import datetime, timedelta, timezone -import numpy as np import pandas as pd -from curvesim.utils import get_pairs - from .http import HTTP from .utils import sync @@ -40,102 +36,14 @@ async def get_prices(coin_id, vs_currency, start, end): r = await _get_prices(coin_id, vs_currency, start, end) # Format data - data = pd.DataFrame(r["prices"], columns=["timestamp", "prices"]) - data = data.merge( - pd.DataFrame(r["total_volumes"], columns=["timestamp", "volumes"]) - ) + data = pd.DataFrame(r["prices"], columns=["timestamp", "price"]) + data = data.merge(pd.DataFrame(r["total_volumes"], columns=["timestamp", "volume"])) data["timestamp"] = pd.to_datetime(data["timestamp"], unit="ms", utc="True") data = data.set_index("timestamp") return data -async def _pool_prices(coins, vs_currency, days, end=None): - if end is not None: - # Times to reindex to: daily intervals - # Coingecko only allows daily data when more than 90 days in the past - # for the free REST endpoint - t_end = datetime.fromtimestamp(end, tz=timezone.utc) - t_start = t_end - timedelta(days=days + 1) - t_samples = pd.date_range(start=t_start, end=t_end, freq="1D", tz=timezone.utc) - else: - # Times to reindex to: hourly intervals starting on half hour mark - t_end = datetime.now(timezone.utc) - timedelta(days=1) - t_end = t_end.replace(hour=23, minute=30, second=0, microsecond=0) - t_start = t_end - timedelta(days=days + 1) - t_samples = pd.date_range(start=t_start, end=t_end, freq="60T", tz=timezone.utc) - end = t_end.timestamp() - - # Fetch data - tasks = [] - for coin in coins: - start = t_start.timestamp() - 86400 * 
3 - tasks.append(get_prices(coin, vs_currency, start, end)) - - data = await asyncio.gather(*tasks) - - # Format data - qprices = [] - qvolumes = [] - for d in data: - d.drop(d.tail(1).index, inplace=True) # remove last row - d = d.reindex(t_samples, method="ffill") - qprices.append(d["prices"]) - qvolumes.append(d["volumes"]) - - qprices = pd.concat(qprices, axis=1) - qvolumes = pd.concat(qvolumes, axis=1) - qvolumes = qvolumes / np.array(qprices) - - return qprices, qvolumes - - -def pool_prices(coins, vs_currency, days, chain="mainnet", end=None): - """ - Pull price and volume data for given coins, quoted in given - quote currency for given days. - - Parameters - ---------- - coins: list of str - List of coin addresses. - vs_currency: str - Symbol for quote currency. - days: int - Number of days to pull data for. - - Returns - ------- - pair of pandas.Series - prices Series and volumes Series - """ - # Get data - coins = coin_ids_from_addresses_sync(coins, chain) - qprices, qvolumes = _pool_prices_sync(coins, vs_currency, days, end) - - # Compute prices by coin pairs - combos = get_pairs(len(coins)) - prices = [] - volumes = [] - - for pair in combos: - base_price = qprices.iloc[:, pair[0]] - base_volume = qvolumes.iloc[:, pair[0]] - - quote_price = qprices.iloc[:, pair[1]] - quote_volume = qvolumes.iloc[:, pair[1]] - - # divide prices: (usd/base) / (usd/quote) = quote/base - prices.append(base_price / quote_price) - # sum volumes and convert to base: usd / (usd/base) = base - volumes.append((base_volume + quote_volume) / base_price) - - prices = pd.concat(prices, axis=1) - volumes = pd.concat(volumes, axis=1) - - return prices, volumes - - async def _coin_id_from_address(address, chain): address = address.lower() chain = PLATFORMS[chain.lower()] @@ -163,11 +71,12 @@ async def coin_ids_from_addresses(addresses, chain): # Sync -_pool_prices_sync = sync(_pool_prices) +get_prices_sync = sync(get_prices) coin_ids_from_addresses_sync = sync(coin_ids_from_addresses) 
if __name__ == "__main__": + # TODO: update coin_addresses = [ "0x6B175474E89094C44Da98b954EedeAC495271d0F", "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", @@ -179,6 +88,6 @@ async def coin_ids_from_addresses(addresses, chain): vs_ccy = "USD" days = 1 - prices, volumes = pool_prices(coin_addresses, vs_ccy, days, chain) + prices, volumes = get_prices_sync(coin_addresses, vs_ccy, days, chain) print(prices.head()) print(volumes.head()) diff --git a/curvesim/pool/sim_interface/cryptoswap.py b/curvesim/pool/sim_interface/cryptoswap.py index 45ebb2443..ef3bc68a0 100644 --- a/curvesim/pool/sim_interface/cryptoswap.py +++ b/curvesim/pool/sim_interface/cryptoswap.py @@ -2,7 +2,6 @@ from math import prod from curvesim.exceptions import SimPoolError -from curvesim.templates import SimAssets from curvesim.templates.sim_pool import SimPool from curvesim.utils import cache, override @@ -213,17 +212,3 @@ def prepare_for_run(self, prices): self.virtual_price = self.get_virtual_price() self.xcp_profit = 10**18 self.xcp_profit_a = 10**18 - - @property - @override - @cache - def assets(self): - """ - Return :class:`.SimAssets` object with the properties of the pool's assets. - - Returns - ------- - SimAssets - SimAssets object that stores the properties of the pool's assets. 
- """ - return SimAssets(self.coin_names, self.coin_addresses, self.chain) diff --git a/curvesim/pool/sim_interface/metapool.py b/curvesim/pool/sim_interface/metapool.py index a9be3161c..c35eff1a4 100644 --- a/curvesim/pool/sim_interface/metapool.py +++ b/curvesim/pool/sim_interface/metapool.py @@ -1,5 +1,4 @@ from curvesim.exceptions import CurvesimValueError, SimPoolError -from curvesim.templates import SimAssets from curvesim.templates.sim_pool import SimPool from curvesim.utils import cache, override @@ -214,20 +213,3 @@ def get_min_trade_size(self, coin_in): The minimal trade size """ return 0 - - @property - @override - @cache - def assets(self): - """ - Return :class:`.SimAssets` object with the properties of the pool's assets. - - Returns - ------- - SimAssets - SimAssets object that stores the properties of the pool's assets. - """ - symbols = self.coin_names[:-1] + self.basepool.coin_names - addresses = self.coin_addresses[:-1] + self.basepool.coin_addresses - - return SimAssets(symbols, addresses, self.chain) diff --git a/curvesim/pool/sim_interface/pool.py b/curvesim/pool/sim_interface/pool.py index b1e73f564..51bcac605 100644 --- a/curvesim/pool/sim_interface/pool.py +++ b/curvesim/pool/sim_interface/pool.py @@ -1,5 +1,4 @@ from curvesim.exceptions import SimPoolError -from curvesim.templates import SimAssets from curvesim.templates.sim_pool import SimPool from curvesim.utils import cache, override @@ -141,17 +140,3 @@ def get_min_trade_size(self, coin_in): The minimal trade size """ return 0 - - @property - @override - @cache - def assets(self): - """ - Return :class:`.SimAssets` object with the properties of the pool's assets. - - Returns - ------- - SimAssets - SimAssets object that stores the properties of the pool's assets. 
- """ - return SimAssets(self.coin_names, self.coin_addresses, self.chain) diff --git a/curvesim/pool_data/__init__.py b/curvesim/pool_data/__init__.py index a00749a80..b6acb0fff 100644 --- a/curvesim/pool_data/__init__.py +++ b/curvesim/pool_data/__init__.py @@ -5,8 +5,9 @@ and 2-token cryptopools. """ -__all__ = ["get_metadata", "get_pool_volume"] +__all__ = ["get_metadata", "get_pool_assets", "get_pool_volume"] from .queries.metadata import get_metadata +from .queries.pool_assets import get_pool_assets from .queries.pool_volume import get_pool_volume diff --git a/curvesim/pool_data/queries/pool_assets.py b/curvesim/pool_data/queries/pool_assets.py new file mode 100644 index 000000000..20627eafe --- /dev/null +++ b/curvesim/pool_data/queries/pool_assets.py @@ -0,0 +1,27 @@ +from typing import List, Union + +from curvesim.constants import Chain +from curvesim.pool_data.metadata import PoolMetaDataInterface +from curvesim.pool_data.queries.metadata import get_metadata +from curvesim.templates.sim_asset import OnChainAssetPair +from curvesim.utils import get_pairs + + +def get_pool_assets( + metadata_or_address, chain: Union[str, Chain] = Chain.MAINNET +) -> List[OnChainAssetPair]: + if isinstance(metadata_or_address, str): + pool_metadata: PoolMetaDataInterface = get_metadata(metadata_or_address, chain) + else: + pool_metadata = metadata_or_address + + symbol_pairs = get_pairs(pool_metadata.coin_names) + address_pairs = get_pairs(pool_metadata.coins) + + sim_assets = [] + for symbols, addresses in zip(symbol_pairs, address_pairs): + id = "-".join(symbols) + asset = OnChainAssetPair(id, *symbols, *addresses, pool_metadata.chain) + sim_assets.append(asset) + + return sim_assets diff --git a/curvesim/price_data/__init__.py b/curvesim/price_data/__init__.py index c66625c5c..cf551f4ba 100644 --- a/curvesim/price_data/__init__.py +++ b/curvesim/price_data/__init__.py @@ -7,38 +7,30 @@ Nomics data is deprecated. 
""" + +from typing import List + from curvesim.exceptions import NetworkError +from curvesim.templates.sim_asset import SimAsset +from curvesim.templates.time_sequence import TimeSequence -from .sources import coingecko +from .data_sources import CoinGeckoPriceVolumeDataSource -def get( - coins, - chain="mainnet", - *, - days=60, - data_dir="data", - src="coingecko", - end=None, +def get_price_data( + sim_assets: List[SimAsset], + time_sequence: TimeSequence, + data_source="coingecko", ): """ - Pull price and volume data for given coins. + Pull price and volume data for each sim_asset. - Data is returned for all pairwise combinations of the input coins. Parameters ---------- - coins : list of str - List of coin addresses. - - days : int, default=60 - Number of days to pull data for. - - data_dir : str, default="data" - Directory to load local data from. + sim_assets: List[SimAsset] - src : str, default="coingecko" - Data source ("coingecko", "nomics", or "local"). + time_sequence: TimeSequence Returns @@ -49,17 +41,24 @@ def get( volumes : pandas.DataFrame Timestamped volumes for each pair of coins. - pzero : int or pandas.Series - Proportion of timestamps with zero volume. 
- """ - if src == "coingecko": - prices, volumes, pzero = coingecko(coins, chain=chain, days=days, end=end) - elif src == "nomics": + # Todo: replace this logic with SimAssetSeriesFactory + + if data_source == "coingecko": + data_source = CoinGeckoPriceVolumeDataSource() + + elif data_source == "nomics": raise NetworkError("Nomics data is no longer supported.") - elif src == "local": + elif data_source == "local": raise NetworkError("Local data currently not supported.") - return prices, volumes, pzero + prices = [] + volumes = [] + for sim_asset in sim_assets: + price, volume = data_source.query(sim_asset, time_sequence) + prices.append(price) + volumes.append(volume) + + return prices, volumes diff --git a/curvesim/price_data/data_sources/__init__.py b/curvesim/price_data/data_sources/__init__.py new file mode 100644 index 000000000..4642b9eab --- /dev/null +++ b/curvesim/price_data/data_sources/__init__.py @@ -0,0 +1,5 @@ +__all__ = ["CoinGeckoPriceVolumeDataSource", "FileDataSource"] + +from curvesim.templates.data_source import FileDataSource + +from .coingecko import CoinGeckoPriceVolumeDataSource diff --git a/curvesim/price_data/data_sources/coingecko.py b/curvesim/price_data/data_sources/coingecko.py new file mode 100644 index 000000000..b663a1312 --- /dev/null +++ b/curvesim/price_data/data_sources/coingecko.py @@ -0,0 +1,77 @@ +""" +Helper functions for the different data sources we pull from. 
+""" + + +from pandas import DataFrame + +from curvesim.exceptions import DataSourceError +from curvesim.logging import get_logger +from curvesim.network.coingecko import coin_ids_from_addresses_sync, get_prices_sync +from curvesim.templates.data_source import ApiDataSource +from curvesim.templates.sim_asset import OnChainAssetPair +from curvesim.templates.time_sequence import DateTimeSequence + +logger = get_logger(__name__) + + +class CoinGeckoPriceVolumeDataSource(ApiDataSource): + def query( + self, sim_asset: OnChainAssetPair, time_sequence: DateTimeSequence + ) -> DataFrame: + logger.info(f"Fetching CoinGecko price data for {sim_asset.id}...") + + _validate_arguments(sim_asset, time_sequence) + + buffer = 3600 + t_start = time_sequence[0].timestamp() - buffer + t_end = time_sequence[-1].timestamp() + buffer + + # _validate_duration(t_start-t_end, timesequence.freq) + + coin_ids = coin_ids_from_addresses_sync(sim_asset.address_pair, sim_asset.chain) + + vs_currency = "USD" + base_data = get_prices_sync(coin_ids[0], vs_currency, t_start, t_end) + quote_data = get_prices_sync(coin_ids[1], vs_currency, t_start, t_end) + + base_data = _reindex_to_time_sequence(base_data, time_sequence) + quote_data = _reindex_to_time_sequence(quote_data, time_sequence) + + # divide prices: (usd/base) / (usd/quote) = quote/base + # sum volumes and convert to base: usd / (usd/base) = base + prices = base_data["price"] / quote_data["price"] + volumes = (base_data["volume"] + quote_data["volume"]) / base_data["price"] + + return prices, volumes + + +def _validate_arguments(sim_asset, time_sequence): + if not isinstance(sim_asset, OnChainAssetPair): + type_name = type(sim_asset).__name__ + raise DataSourceError( + f"For CoinGecko, sim_asset must be 'OnChainAssetPair', not '{type_name}'." 
+ ) + + if not isinstance(time_sequence, DateTimeSequence): + type_name = type(time_sequence).__name__ + raise DataSourceError( + f"For CoinGecko, time_sequence must be 'DateTimeSequence', not '{type_name}'." + ) + + +def _validate_duration(duration, freq): + """WIP""" + + +def _reindex_to_time_sequence(df, time_sequence): + # Use "nearest" because CoinGecko timestamps usually delayed 1-5 minutes + df_reindexed = df.reindex(time_sequence, method="nearest", tolerance="10min") + + nan_count = df_reindexed.isna().sum() + if any(nan_count > 0): + logger.info("Filling NaN values:\n" + nan_count.to_string()) + df_reindexed["price"] = df_reindexed["price"].ffill() + df_reindexed["volume"] = df_reindexed["volume"].fillna(0) + + return df_reindexed diff --git a/curvesim/price_data/sources.py b/curvesim/price_data/sources.py deleted file mode 100644 index ad14516e6..000000000 --- a/curvesim/price_data/sources.py +++ /dev/null @@ -1,36 +0,0 @@ -""" -Helper functions for the different data sources we pull from. -""" - -from curvesim.logging import get_logger -from curvesim.network import coingecko as _coingecko - -logger = get_logger(__name__) - - -def coingecko(coins, chain="mainnet", days=60, end=None): - """ - Fetch CoinGecko price data for specified coins. - - Parameters - ---------- - coins : list of str - List of coin symbols to fetch data for. - chain : str, optional - Blockchain network to consider. Default is "mainnet". - days : int, optional - Number of past days to fetch data for. Default is 60. - end : int, optional - End timestamp for the data in seconds since epoch. - If None, the end time will be the current time. Default is None. - - Returns - ------- - tuple of (dict, dict, int) - Tuple of prices, volumes, and pzero (fixed as 0 for this function).
- """ - logger.info("Fetching CoinGecko price data...") - prices, volumes = _coingecko.pool_prices(coins, "usd", days, chain=chain, end=end) - pzero = 0 - - return prices, volumes, pzero diff --git a/curvesim/templates/__init__.py b/curvesim/templates/__init__.py index 15cb2ad44..cb88938bd 100644 --- a/curvesim/templates/__init__.py +++ b/curvesim/templates/__init__.py @@ -6,7 +6,7 @@ "Log", "Trader", "Strategy", - "SimAssets", + "SimAsset", "SimPool", "Trade", "TradeResult", @@ -18,7 +18,7 @@ from .log import Log from .param_samplers import ParameterSampler from .price_samplers import PriceSample, PriceSampler -from .sim_assets import SimAssets +from .sim_asset import SimAsset from .sim_pool import SimPool from .strategy import Strategy from .trader import Trade, Trader, TradeResult diff --git a/curvesim/templates/data_source.py b/curvesim/templates/data_source.py new file mode 100644 index 000000000..22da973c0 --- /dev/null +++ b/curvesim/templates/data_source.py @@ -0,0 +1,34 @@ +from abc import ABC, abstractmethod +from os import extsep +from os.path import join + +from pandas import DataFrame, read_csv + +from .sim_asset import SimAsset +from .time_sequence import TimeSequence + + +class DataSource(ABC): + @abstractmethod + def query(self, sim_asset: SimAsset, time_sequence: TimeSequence) -> DataFrame: + raise NotImplementedError + + +class ApiDataSource(DataSource): + """WIP -- anything needed here?""" + + +class FileDataSource(DataSource): + def __init__( + self, + dir: str = "", + filename_format: str = "{base}-{quote}", + file_extension: str = "csv", + ): + self.path = join(dir, filename_format + extsep + file_extension) + + def query(self, sim_asset: SimAsset, time_sequence: TimeSequence) -> DataFrame: + """WIP -- add other file types; resample to TimeSequence""" + path = self.path.format(base=sim_asset.base_symbol, quote=sim_asset.quote_symbol) + df = read_csv(path) + return df diff --git a/curvesim/templates/sim_asset.py b/curvesim/templates/sim_asset.py
new file mode 100644 index 000000000..92c3d2bcf --- /dev/null +++ b/curvesim/templates/sim_asset.py @@ -0,0 +1,28 @@ +from curvesim.constants import Chain +from curvesim.utils import dataclass + + +@dataclass +class SimAsset: + id: str + + +@dataclass +class AssetPair(SimAsset): + base_symbol: str + quote_symbol: str + + @property + def symbol_pair(self): + return (self.base_symbol, self.quote_symbol) + + +@dataclass +class OnChainAssetPair(AssetPair): + base_address: str + quote_address: str + chain: Chain + + @property + def address_pair(self): + return (self.base_address, self.quote_address) diff --git a/curvesim/templates/sim_assets.py b/curvesim/templates/sim_assets.py deleted file mode 100644 index f701d58da..000000000 --- a/curvesim/templates/sim_assets.py +++ /dev/null @@ -1,18 +0,0 @@ -from curvesim.utils import get_pairs - - -# pylint: disable-next=too-few-public-methods -class SimAssets: - """ - Stores the properties of the assets to be used in a simulation. Currently, only - specific coins identified by their address/chain are supported. This will be - expanded to "abstract" assets (e.g., some function of multiple coins) in the near - future. - """ - - def __init__(self, symbols, addresses, chain): - self.symbols = symbols - self.addresses = addresses - self.chain = chain - self.symbol_pairs = get_pairs(symbols) - self.address_pairs = get_pairs(addresses) diff --git a/curvesim/templates/time_sequence.py b/curvesim/templates/time_sequence.py new file mode 100644 index 000000000..7d1ddc948 --- /dev/null +++ b/curvesim/templates/time_sequence.py @@ -0,0 +1,97 @@ +from collections.abc import Iterable +from datetime import datetime, timezone +from typing import Optional, Union + +from pandas import DateOffset, date_range +from pandas.tseries.frequencies import to_offset + +from curvesim.exceptions import TimeSequenceError + + +class TimeSequence: + """ + Time-like sequence generator. 
+ Abstraction to encompass different ways of tracking "time", + useful for trading strategies involving a blockchain. + This could be timestamps, block times, block numbers, etc. + """ + + def __init__(self, sequence: Iterable): + _validate_sequence(sequence) + self._sequence = tuple(sequence) + + def __getitem__(self, index): + return self._sequence[index] + + def __iter__(self): + for time in self._sequence: + yield time + + def __repr__(self): + return f"<{self.__class__.__name__} start={self[0]} end={self[-1]}>" + + +class DateTimeSequence(TimeSequence): + """ + TimeSequence composed of datetimes. + """ + + def __init__( + self, + sequence: Iterable[datetime], + freq: Optional[Union[str, DateOffset]] = None, + ): + _validate_datetime_sequence(sequence) + super().__init__(sequence) + self.freq = to_offset(freq) + + @classmethod + def from_range( + cls, + start=None, + end=None, + periods=None, + freq=None, + tz=timezone.utc, + inclusive="both", + unit=None, + ): + """ + Instantiates a DateTimeSequence from a pandas date range. + The function signature is analogous to pandas.date_range. + """ + + times = date_range( + start=start, + end=end, + periods=periods, + freq=freq, + tz=tz, + inclusive=inclusive, + unit=unit, + ) + + return cls(times, freq=times.freq) + + # resample? + + +def _validate_sequence(times): + if not isinstance(times, Iterable) or isinstance(times, str): + type_name = type(times).__name__ + raise TimeSequenceError( + f"Input time sequence must be a non-string iterable, not '{type_name}'." + ) + + if sorted(times) != list(times): + raise TimeSequenceError("Input time sequence must be in ascending order.") + + if len(set(times)) != len(times): + raise TimeSequenceError("Input time sequence must not contain duplicates.") + + +def _validate_datetime_sequence(times): + if not all([isinstance(t, datetime) for t in times]): + raise TimeSequenceError( + "DateTimeSequence may only contain iterables of datetime.datetime." + )