diff --git a/curvesim/exceptions/__init__.py b/curvesim/exceptions/__init__.py index a8609945b..eff3c7503 100644 --- a/curvesim/exceptions/__init__.py +++ b/curvesim/exceptions/__init__.py @@ -27,6 +27,10 @@ def __repr__(self): return f"HttpClientError({self.status}, {self.message}, url={self.url})" +class CurvesimTypeError(CurvesimException, TypeError): + """Raised when an argument is the wrong type.""" + + class CurvesimValueError(CurvesimException, ValueError): """Raised when an argument has an inappropriate value (but the right type).""" diff --git a/curvesim/iterators/price_samplers/price_volume.py b/curvesim/iterators/price_samplers/price_volume.py index b665dd9fe..88e66edff 100644 --- a/curvesim/iterators/price_samplers/price_volume.py +++ b/curvesim/iterators/price_samplers/price_volume.py @@ -1,7 +1,6 @@ from typing import Iterator from curvesim.logging import get_logger -from curvesim.price_data import get_price_data from curvesim.templates.price_samplers import PriceSample, PriceSampler from curvesim.utils import dataclass, override @@ -29,15 +28,7 @@ class PriceVolume(PriceSampler): An iterator that retrieves price/volume and iterates over timepoints in the data. """ - def __init__( - self, - assets, - *, - days=60, - data_dir="data", - src="coingecko", - end=None, - ): + def __init__(self, data): """ Retrieves price/volume data and prepares it for iteration. @@ -57,17 +48,7 @@ def __init__( Identifies pricing source: coingecko or local. """ - prices, volumes, _ = get_price_data( - assets.addresses, - chain=assets.chain, - days=days, - data_dir=data_dir, - src=src, - end=end, - ) - - self.prices = prices.set_axis(assets.symbol_pairs, axis="columns") - self.volumes = volumes.set_axis(assets.symbol_pairs, axis="columns") + self.data = data @override def __iter__(self) -> Iterator[PriceVolumeSample]: @@ -76,16 +57,18 @@ def __iter__(self) -> Iterator[PriceVolumeSample]: ------- :class:`PriceVolumeSample` """ - for price_row, volume_row in zip( - self.prices.iterrows(), self.volumes.iterrows() - ): - price_timestamp, prices = price_row - volume_timestamp, volumes = volume_row - assert ( - price_timestamp == volume_timestamp - ), "Price/volume timestamps don't match" - - prices = prices.to_dict() - volumes = volumes.to_dict() - - yield PriceVolumeSample(price_timestamp, prices, volumes) # type:ignore + for row in self.data.iterrows(): + timestamp, row_data = row + + prices = row_data["price"].to_dict() + volumes = row_data["volume"].to_dict() + + yield PriceVolumeSample(timestamp, prices, volumes) # type:ignore + + @property + def prices(self): + return self.data["price"] + + @property + def volumes(self): + return self.data["volume"] diff --git a/curvesim/metrics/base.py b/curvesim/metrics/base.py index f61e734aa..47d18848b 100644 --- a/curvesim/metrics/base.py +++ b/curvesim/metrics/base.py @@ -312,8 +312,6 @@ def __init__(self, coin_names, **kwargs): Symbols for the coins used in a simulation. A numeraire is selected from the specified coins. """ - - self.coin_names = coin_names self.numeraire = get_numeraire(coin_names) super().__init__(**kwargs) @@ -348,9 +346,6 @@ def get_market_price(self, base, quote, prices): return prices[(base, quote)] -pandas_coin_pair_attr = {DataFrame: "columns", Series: "index"} - - def get_coin_pairs(prices): """ Returns the coin pairs available in the price data. @@ -418,4 +413,4 @@ def __init__(self, pool, **kwargs): :func:`pool_config` and stored as :python:`self._pool` for access during metric computations. Number and names of coins derived from pool metadata. """ - super().__init__(pool.assets.symbols, pool=pool) + super().__init__(pool.asset_names, pool=pool) diff --git a/curvesim/metrics/metrics.py b/curvesim/metrics/metrics.py index 9e421f9f7..e12fb8e42 100644 --- a/curvesim/metrics/metrics.py +++ b/curvesim/metrics/metrics.py @@ -88,7 +88,7 @@ def config(self): } def __init__(self, pool, **kwargs): - super().__init__(pool.assets.symbols) + super().__init__(pool.asset_names) def compute_arb_metrics(self, **kwargs): """Computes all metrics for each timestamp in an individual run.""" @@ -218,7 +218,7 @@ def get_stableswap_metapool_volume(self, **kwargs): """ trade_data = kwargs["trade_data"] - meta_asset = self._pool.assets.symbols[0] + meta_asset = self._pool.asset_names[0] def per_timestamp_function(trade_data): volume = 0 diff --git a/curvesim/pipelines/common/__init__.py b/curvesim/pipelines/common/__init__.py index ce0110541..33b1329c0 100644 --- a/curvesim/pipelines/common/__init__.py +++ b/curvesim/pipelines/common/__init__.py @@ -1,6 +1,7 @@ """ Contains variables and functions common to the arbitrage pipelines. """ +__all__ = ["DEFAULT_METRICS", "get_arb_trades", "get_asset_data", "get_pool_data"] from scipy.optimize import root_scalar @@ -8,6 +9,9 @@ from curvesim.metrics import metrics as Metrics from curvesim.templates.trader import ArbTrade +from .get_asset_data import get_asset_data +from .get_pool_data import get_pool_data + logger = get_logger(__name__) DEFAULT_METRICS = [ Metrics.Timestamp, diff --git a/curvesim/pipelines/common/get_asset_data.py b/curvesim/pipelines/common/get_asset_data.py new file mode 100644 index 000000000..bf5eda755 --- /dev/null +++ b/curvesim/pipelines/common/get_asset_data.py @@ -0,0 +1,22 @@ +from datetime import datetime, timedelta, timezone + +from curvesim.pool_data import get_pool_assets +from curvesim.price_data import get_price_data +from curvesim.templates import DateTimeSequence + + +def get_asset_data(pool_metadata, time_sequence, data_source): + sim_assets = get_pool_assets(pool_metadata) + time_sequence = time_sequence or _make_default_time_sequence() + asset_data = get_price_data( + sim_assets, time_sequence, data_source=data_source + ) # allow DataSource to be passed + return asset_data, time_sequence + + +def _make_default_time_sequence(): + t_end = datetime.now(timezone.utc) - timedelta(days=1) + t_end = t_end.replace(hour=23, minute=0, second=0, microsecond=0) + t_start = t_end - timedelta(days=60) + timedelta(hours=1) + time_sequence = DateTimeSequence.from_range(start=t_start, end=t_end, freq="1h") + return time_sequence diff --git a/curvesim/pipelines/common/get_pool_data.py b/curvesim/pipelines/common/get_pool_data.py new file mode 100644 index 000000000..ef7854b80 --- /dev/null +++ b/curvesim/pipelines/common/get_pool_data.py @@ -0,0 +1,44 @@ +from datetime import datetime + +from curvesim.exceptions import CurvesimTypeError +from curvesim.pool import get_sim_pool +from curvesim.pool_data import get_metadata +from curvesim.pool_data.metadata import PoolMetaDataInterface + + +def get_pool_data(metadata_or_address, chain, env, pool_ts): + pool_metadata = _parse_metadata_or_address(metadata_or_address, chain) + end_ts = _parse_timestamp(pool_ts) + pool = get_sim_pool(pool_metadata, env=env, end_ts=end_ts) + + return pool, pool_metadata + + +def _parse_timestamp(timestamp) -> int: + if not timestamp: + return timestamp + + if isinstance(timestamp, datetime): + timestamp = int(timestamp.timestamp()) + + if not isinstance(timestamp, int): + _type = type(timestamp).__name__ + raise CurvesimTypeError(f"'Pool_ts' must be 'int' or 'timestamp', not {_type}.") + + return timestamp + + +def _parse_metadata_or_address(metadata_or_address, chain): + if isinstance(metadata_or_address, str): + pool_metadata: PoolMetaDataInterface = get_metadata(metadata_or_address, chain) + + elif isinstance(metadata_or_address, PoolMetaDataInterface): + pool_metadata = metadata_or_address + + else: + _type = type(metadata_or_address).__name__ + raise CurvesimTypeError( + f"'Metadata_or_address' must be 'PoolMetaDataInterface' or 'str', not {_type}." + ) + + return pool_metadata diff --git a/curvesim/pipelines/simple/__init__.py b/curvesim/pipelines/simple/__init__.py index 64307c4e8..a4fcaad9b 100644 --- a/curvesim/pipelines/simple/__init__.py +++ b/curvesim/pipelines/simple/__init__.py @@ -11,21 +11,19 @@ from curvesim.metrics.results import make_results from curvesim.pipelines import run_pipeline from curvesim.pipelines.simple.strategy import SimpleStrategy -from curvesim.pool import get_sim_pool -from ..common import DEFAULT_METRICS +from ..common import DEFAULT_METRICS, get_asset_data, get_pool_data def pipeline( # pylint: disable=too-many-locals - pool_address, - chain, + metadata_or_address, *, + chain="mainnet", variable_params=None, fixed_params=None, - end_ts=None, - days=60, src="coingecko", - data_dir="data", + time_sequence=None, + pool_ts=None, ncpu=None, env="prod", ): @@ -84,15 +82,12 @@ def pipeline( # pylint: disable=too-many-locals """ ncpu = ncpu or os.cpu_count() - pool = get_sim_pool(pool_address, chain, env=env, end_ts=end_ts) - - sim_assets = pool.assets - price_sampler = PriceVolume( - sim_assets, days=days, end=end_ts, data_dir=data_dir, src=src - ) + pool, pool_metadata = get_pool_data(metadata_or_address, chain, env, pool_ts) + asset_data, _ = get_asset_data(pool_metadata, time_sequence, src) # pylint: disable-next=abstract-class-instantiated param_sampler = ParameterizedPoolIterator(pool, variable_params, fixed_params) + price_sampler = PriceVolume(asset_data) _metrics = init_metrics(DEFAULT_METRICS, pool=pool) strategy = SimpleStrategy(_metrics) diff --git a/curvesim/pipelines/simple/__main__.py b/curvesim/pipelines/simple/__main__.py index 7de769d56..e2d233d99 100644 --- a/curvesim/pipelines/simple/__main__.py +++ b/curvesim/pipelines/simple/__main__.py @@ -3,4 +3,4 @@ if __name__ == "__main__": pool_address = "0xbebc44782c7db0a1a60cb6fe97d0b483032ff1c7" chain = "mainnet" - results = pipeline(pool_address, chain, ncpu=1) + results = pipeline(pool_address, chain=chain, ncpu=1) diff --git a/curvesim/pipelines/vol_limited_arb/__init__.py b/curvesim/pipelines/vol_limited_arb/__init__.py index e7b03b09e..99a5fdfdc 100644 --- a/curvesim/pipelines/vol_limited_arb/__init__.py +++ b/curvesim/pipelines/vol_limited_arb/__init__.py @@ -8,11 +8,10 @@ from curvesim.iterators.price_samplers import PriceVolume from curvesim.logging import get_logger from curvesim.metrics import init_metrics, make_results -from curvesim.pool import get_sim_pool from curvesim.pool_data import get_pool_volume from .. import run_pipeline -from ..common import DEFAULT_METRICS +from ..common import DEFAULT_METRICS, get_asset_data, get_pool_data from .strategy import VolumeLimitedStrategy logger = get_logger(__name__) @@ -20,17 +19,18 @@ # pylint: disable-next=too-many-locals def pipeline( - pool_metadata, + metadata_or_address, *, + chain="mainnet", variable_params=None, fixed_params=None, metrics=None, - days=60, src="coingecko", - data_dir="data", + time_sequence=None, vol_mult=None, + pool_ts=None, ncpu=None, - end=None, + env="prod", ): """ Implements the volume-limited arbitrage pipeline. @@ -91,16 +91,17 @@ def pipeline( cpu_count = os.cpu_count() ncpu = cpu_count if cpu_count is not None else 1 - pool = get_sim_pool(pool_metadata) + pool, pool_metadata = get_pool_data(metadata_or_address, chain, env, pool_ts) + asset_data, time_sequence = get_asset_data(pool_metadata, time_sequence, src) # pylint: disable-next=abstract-class-instantiated param_sampler = ParameterizedPoolIterator(pool, variable_params, fixed_params) - price_sampler = PriceVolume( - pool.assets, days=days, data_dir=data_dir, src=src, end=end - ) + price_sampler = PriceVolume(asset_data) if vol_mult is None: - pool_volume = get_pool_volume(pool_metadata, days=days, end=end) + pool_volume = get_pool_volume( + pool_metadata, time_sequence[0], time_sequence[-1] + ) vol_mult = pool_volume.sum() / price_sampler.volumes.sum() logger.info("Volume Multipliers:\n%s", vol_mult.to_string()) vol_mult = vol_mult.to_dict() diff --git a/curvesim/pool_data/queries/pool_assets.py b/curvesim/pool_data/queries/pool_assets.py index 0465645cf..085b68f1e 100644 --- a/curvesim/pool_data/queries/pool_assets.py +++ b/curvesim/pool_data/queries/pool_assets.py @@ -3,7 +3,7 @@ from curvesim.constants import Chain from curvesim.pool_data.metadata import PoolMetaDataInterface from curvesim.pool_data.queries.metadata import get_metadata -from curvesim.templates.sim_asset import OnChainAsset, OnChainAssetPair +from curvesim.templates import OnChainAsset, OnChainAssetPair from curvesim.utils import get_pairs diff --git a/curvesim/pool_data/queries/pool_volume.py b/curvesim/pool_data/queries/pool_volume.py index ae2a5aa7e..103ad61e2 100644 --- a/curvesim/pool_data/queries/pool_volume.py +++ b/curvesim/pool_data/queries/pool_volume.py @@ -2,9 +2,9 @@ Functions to get historical volume for Curve pools. """ -from datetime import datetime, timezone +from datetime import datetime from math import comb -from typing import List, Optional, Tuple, Union +from typing import List, Tuple, Union from pandas import DataFrame, Series @@ -20,8 +20,8 @@ def get_pool_volume( metadata_or_address: Union[PoolMetaDataInterface, str], - days: int = 60, - end: Optional[int] = None, + start: Union[int, datetime], + end: Union[int, datetime], chain: Union[str, Chain] = "mainnet", ) -> DataFrame: """ @@ -56,7 +56,7 @@ def get_pool_volume( pool_metadata = metadata_or_address pair_data = _get_pair_data(pool_metadata) - start_ts, end_ts = _process_timestamps(days, end) + start_ts, end_ts = _process_timestamps(start, end) loop = get_event_loop() volumes: dict[Tuple[str, str], Series] = {} @@ -71,7 +71,7 @@ def get_pool_volume( ) volumes[pair_symbols] = data["volume"] - volume_df = _make_volume_df(volumes, days) + volume_df = _make_volume_df(volumes) return volume_df @@ -108,21 +108,19 @@ def _get_metapool_addresses(pool_metadata) -> List[str]: return [address_meta] * n_pairs_meta + [address_base] * n_pairs_base -def _process_timestamps(days, end) -> Tuple[int, int]: - end = end or int( - datetime.now(timezone.utc) - .replace(hour=0, minute=0, second=0, microsecond=0) - .timestamp() - ) - start = end - days * 86400 +def _process_timestamps(start, end) -> Tuple[int, int]: + if isinstance(start, datetime): + start = int(start.timestamp()) + + if isinstance(end, datetime): + end = int(end.timestamp()) + return start, end -def _make_volume_df(volumes, days) -> DataFrame: +def _make_volume_df(volumes) -> DataFrame: df = DataFrame(volumes) df.columns = df.columns.to_flat_index() - if len(df) > days: - df = df[-days:] logger.info("Days of volume returned:\n%s", df.count().to_string()) df.fillna(0, inplace=True) return df diff --git a/curvesim/price_data/data_sources/coingecko.py b/curvesim/price_data/data_sources/coingecko.py index e28351540..3a7d018d1 100644 --- a/curvesim/price_data/data_sources/coingecko.py +++ b/curvesim/price_data/data_sources/coingecko.py @@ -10,9 +10,12 @@ from curvesim.exceptions import DataSourceError from curvesim.logging import get_logger from curvesim.network.coingecko import coin_id_from_address_sync, get_prices_sync -from curvesim.templates.data_source import ApiDataSource -from curvesim.templates.sim_asset import OnChainAssetPair -from curvesim.templates.time_sequence import DateTimeSequence, TimeSequence +from curvesim.templates import ( + ApiDataSource, + DateTimeSequence, + OnChainAssetPair, + TimeSequence, +) logger = get_logger(__name__) @@ -33,7 +36,7 @@ def query( for asset in sim_asset: coingecko_id = coin_id_from_address_sync(asset.address, asset.chain) _data = get_prices_sync(coingecko_id, "USD", t_start, t_end) - _data = _reindex_to_time_sequence(_data, time_sequence) + _data = _reindex_to_time_sequence(_data, time_sequence, asset.id) data.append(_data) # divide prices: (usd/base) / (usd/quote) = quote/base @@ -76,13 +79,13 @@ def _validate_duration(duration, freq): """WIP""" -def _reindex_to_time_sequence(df, time_sequence): - # Use "nearest" because CoinGecko timestamps usually delayed 1-5 minutes +def _reindex_to_time_sequence(df, time_sequence, asset_id): + # Use "nearest" because CoinGecko timestamps usually slightly delayed df_reindexed = df.reindex(time_sequence, method="nearest", tolerance="10T") nan_count = df_reindexed.isna().sum() if any(nan_count > 0): - logger.info("Filling NaN values:\n%s", nan_count.to_string()) + logger.info("Filling %s NaN values:\n%s", asset_id, nan_count.to_string()) df_reindexed["price"].ffill(inplace=True) df_reindexed["volume"].fillna(0, inplace=True) diff --git a/curvesim/templates/__init__.py b/curvesim/templates/__init__.py index f86bd545c..1b6b4d106 100644 --- a/curvesim/templates/__init__.py +++ b/curvesim/templates/__init__.py @@ -6,22 +6,27 @@ "ApiDataSource", "FileDataSource", "Log", - "Trader", - "Strategy", + "ParameterSampler", + "PriceSample", + "PriceSampler", + "OnChainAsset", + "OnChainAssetPair", "SimAsset", "SimPool", + "Strategy", + "DateTimeSequence", + "TimeSequence", "Trade", + "Trader", "TradeResult", - "ParameterSampler", - "PriceSample", - "PriceSampler", ] from .data_source import ApiDataSource, FileDataSource from .log import Log from .param_samplers import ParameterSampler from .price_samplers import PriceSample, PriceSampler -from .sim_asset import SimAsset +from .sim_asset import OnChainAsset, OnChainAssetPair, SimAsset from .sim_pool import SimPool from .strategy import Strategy +from .time_sequence import DateTimeSequence, TimeSequence from .trader import Trade, Trader, TradeResult diff --git a/curvesim/templates/sim_asset.py b/curvesim/templates/sim_asset.py index a028b1711..8adefb3d8 100644 --- a/curvesim/templates/sim_asset.py +++ b/curvesim/templates/sim_asset.py @@ -12,8 +12,6 @@ class SimAsset: @dataclass class OnChainAsset(SimAsset): - id: str - symbol: str address: str chain: Chain diff --git a/curvesim/templates/sim_pool.py b/curvesim/templates/sim_pool.py index 205b88df0..026e45063 100644 --- a/curvesim/templates/sim_pool.py +++ b/curvesim/templates/sim_pool.py @@ -136,16 +136,3 @@ def get_min_trade_size(self, coin_in): The minimal trade size """ raise NotImplementedError - - @property - @abstractmethod - def assets(self): - """ - Return :class:`.SimAssets` object with the properties of the pool's assets. - - Returns - ------- - SimAssets - SimAssets object that stores the properties of the pool's assets. - """ - raise NotImplementedError