diff --git a/cli/openbb_cli/controllers/base_platform_controller.py b/cli/openbb_cli/controllers/base_platform_controller.py index ed8f0087516a..116746e98445 100644 --- a/cli/openbb_cli/controllers/base_platform_controller.py +++ b/cli/openbb_cli/controllers/base_platform_controller.py @@ -6,6 +6,8 @@ from typing import Dict, List, Optional import pandas as pd +from anyio.from_thread import start_blocking_portal +from fastapi.responses import StreamingResponse from openbb import obb from openbb_charting.core.openbb_figure import OpenBBFigure from openbb_cli.argparse_translator.argparse_class_processor import ( @@ -201,6 +203,23 @@ def method(self, other_args: List[str], translator=translator): df = pd.DataFrame.from_dict(obbject, orient="columns") print_rich_table(df=df, show_index=True, title=title) + elif isinstance(obbject, StreamingResponse): + received_data = [] + + async def stream_data(obbject): + async for data in obbject.body_iterator: + received_data.append(data) + session.console.print(data) + + with start_blocking_portal() as portal: + try: + portal.start_task_soon(stream_data, obbject) + finally: + portal.call(portal.stop) + + df = pd.DataFrame(received_data) + print_rich_table(df=df, show_index=True, title=title) + elif not isinstance(obbject, OBBject): session.console.print(obbject) diff --git a/openbb_platform/core/openbb_core/api/router/commands.py b/openbb_platform/core/openbb_core/api/router/commands.py index ce1cab484a8c..41094ddb3ab7 100644 --- a/openbb_platform/core/openbb_core/api/router/commands.py +++ b/openbb_platform/core/openbb_core/api/router/commands.py @@ -3,9 +3,10 @@ import inspect from functools import partial, wraps from inspect import Parameter, Signature, signature -from typing import Any, Callable, Dict, Optional, Tuple, TypeVar +from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, Union from fastapi import APIRouter, Depends, Header +from fastapi.responses import StreamingResponse from fastapi.routing import APIRoute from openbb_core.app.command_runner import CommandRunner from openbb_core.app.model.command_context import CommandContext @@ -188,7 +189,9 @@ def build_api_wrapper( func.__annotations__ = new_annotations_map @wraps(wrapped=func) - async def wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> OBBject: + async def wrapper( + *args: Tuple[Any], **kwargs: Dict[str, Any] + ) -> Union[OBBject, StreamingResponse]: user_settings: UserSettings = UserSettings.model_validate( kwargs.pop( "__authenticated_user_settings", @@ -196,9 +199,15 @@ async def wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> OBBject: ) ) execute = partial(command_runner.run, path, user_settings) - output: OBBject = await execute(*args, **kwargs) + output = await execute(*args, **kwargs) - return validate_output(output) + if route.openapi_extra.get("is_stream", False): + return output.results + + if isinstance(output, OBBject): + return validate_output(output) + + return output return wrapper diff --git a/openbb_platform/core/openbb_core/app/command_runner.py b/openbb_platform/core/openbb_core/app/command_runner.py index 89433e45156f..7a5df24dea0f 100644 --- a/openbb_platform/core/openbb_core/app/command_runner.py +++ b/openbb_platform/core/openbb_core/app/command_runner.py @@ -8,9 +8,10 @@ from inspect import Parameter, signature from sys import exc_info from time import perf_counter_ns -from typing import Any, Callable, Dict, List, Optional, Tuple, Type +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union from warnings import catch_warnings, showwarning, warn +from fastapi.responses import StreamingResponse from pydantic import BaseModel, ConfigDict, create_model from openbb_core.app.logs.logging_service import LoggingService @@ -420,7 +421,7 @@ async def run( /, *args, **kwargs, - ) -> OBBject: + ) -> Union[OBBject, StreamingResponse]: """Run a command and return the OBBject as output.""" timestamp = datetime.now() start_ns = perf_counter_ns() @@ -429,7 +430,7 @@ async def run( route = execution_context.route if func := command_map.get_command(route=route): - obbject = await cls._execute_func( + result = await cls._execute_func( route=route, args=args, # type: ignore execution_context=execution_context, @@ -442,19 +443,20 @@ async def run( duration = perf_counter_ns() - start_ns if execution_context.user_settings.preferences.metadata: - try: - obbject.extra["metadata"] = Metadata( - arguments=kwargs, - duration=duration, - route=route, - timestamp=timestamp, - ) - except Exception as e: - if Env().DEBUG_MODE: - raise OpenBBError(e) from e - warn(str(e), OpenBBWarning) + if isinstance(result, OBBject): + try: + result.extra["metadata"] = Metadata( + arguments=kwargs, + duration=duration, + route=route, + timestamp=timestamp, + ) + except Exception as e: + if Env().DEBUG_MODE: + raise OpenBBError(e) from e + warn(str(e), OpenBBWarning) - return obbject + return result class CommandRunner: diff --git a/openbb_platform/core/openbb_core/app/logs/handlers_manager.py b/openbb_platform/core/openbb_core/app/logs/handlers_manager.py index 2d9a0efb927d..4ad9f929d2e1 100644 --- a/openbb_platform/core/openbb_core/app/logs/handlers_manager.py +++ b/openbb_platform/core/openbb_core/app/logs/handlers_manager.py @@ -45,8 +45,11 @@ def _add_posthog_handler(self): def _add_stdout_handler(self): """Add a stdout handler.""" handler = logging.StreamHandler(sys.stdout) - formatter = FormatterWithExceptions(settings=self._settings) + formatter = logging.Formatter( + "%(asctime)s - %(levelname)s - %(name)s - %(message)s" + ) handler.setFormatter(formatter) + handler.setLevel(logging.DEBUG) logging.getLogger().addHandler(handler) def _add_stderr_handler(self): @@ -68,6 +71,7 @@ def _add_file_handler(self): handler = PathTrackingFileHandler(settings=self._settings) formatter = FormatterWithExceptions(settings=self._settings) handler.setFormatter(formatter) + handler.setLevel(logging.INFO) logging.getLogger().addHandler(handler) def update_handlers(self, settings: LoggingSettings): diff --git a/openbb_platform/core/openbb_core/app/router.py b/openbb_platform/core/openbb_core/app/router.py index 6185eea74b68..80232e2bbe55 100644 --- a/openbb_platform/core/openbb_core/app/router.py +++ b/openbb_platform/core/openbb_core/app/router.py @@ -19,6 +19,7 @@ ) from fastapi import APIRouter, Depends +from fastapi.responses import StreamingResponse from pydantic import BaseModel from pydantic.v1.validators import find_validators from typing_extensions import Annotated, ParamSpec, _AnnotatedAlias @@ -230,6 +231,26 @@ def __init__( self._description = description self._routers: Dict[str, Router] = {} + @overload + def stream(self, func: Callable[P, OBBject]) -> Callable[P, StreamingResponse]: + pass + + @overload + def stream(self, **kwargs) -> Callable: + pass + + def stream( + self, + func: Optional[Callable[P, OBBject]] = None, + **kwargs, + ) -> Optional[Callable]: + """Stream decorator for routes.""" + if func is None: + return lambda f: self.stream(f, **kwargs) + + kwargs["is_stream"] = True + return self.command(func, **kwargs) + @overload def command(self, func: Optional[Callable[P, OBBject]]) -> Callable[P, OBBject]: pass @@ -260,6 +281,8 @@ def command( examples=kwargs.pop("examples", []), providers=ProviderInterface().available_providers, ) + kwargs["openapi_extra"]["is_stream"] = kwargs.pop("is_stream", False) + kwargs["operation_id"] = kwargs.get( "operation_id", SignatureInspector.get_operation_id(func) ) @@ -349,7 +372,7 @@ class SignatureInspector: @classmethod def complete( - cls, func: Callable[P, OBBject], model: str + cls, func: Callable[P, OBBject], model: str, is_stream: bool = False ) -> Optional[Callable[P, OBBject]]: """Complete function signature.""" if isclass(return_type := func.__annotations__["return"]) and not issubclass( diff --git a/openbb_platform/core/openbb_core/app/static/package_builder.py b/openbb_platform/core/openbb_core/app/static/package_builder.py index 758f2be6f9ae..1f3e20f2920a 100644 --- a/openbb_platform/core/openbb_core/app/static/package_builder.py +++ b/openbb_platform/core/openbb_core/app/static/package_builder.py @@ -342,6 +342,7 @@ def get_path_hint_type_list(cls, path: str) -> List[Type]: if route: if route.deprecated: hint_type_list.append(type(route.summary.metadata)) + function_hint_type_list = cls.get_function_hint_type_list(func=route.endpoint) # type: ignore hint_type_list.extend(function_hint_type_list) @@ -1479,7 +1480,6 @@ def _get_provider_field_params( expanded_types[field], is_required, "website" ) field_type = f"Union[{field_type}, {expanded_type}]" - cleaned_description = ( str(field_info.description) .strip().replace("\n", " ").replace(" ", " ").replace('"', "'") @@ -1506,7 +1506,6 @@ def _get_provider_field_params( # Manually setting to List[] for multiple items # Should be removed if TYPE_EXPANSION is updated to include this field_type = f"Union[{field_type}, List[{field_type}]]" - default_value = "" if field_info.default is PydanticUndefined else field_info.default # fmt: skip provider_field_params.append( diff --git a/openbb_platform/core/openbb_core/provider/abstract/fetcher.py b/openbb_platform/core/openbb_core/provider/abstract/fetcher.py index f2cc759821ab..386ea0420f87 100644 --- a/openbb_platform/core/openbb_core/provider/abstract/fetcher.py +++ b/openbb_platform/core/openbb_core/provider/abstract/fetcher.py @@ -5,6 +5,7 @@ from typing import ( Any, + AsyncIterator, Dict, Generic, Optional, @@ -53,6 +54,12 @@ def transform_query(params: Dict[str, Any]) -> Q: async def aextract_data(query: Q, credentials: Optional[Dict[str, str]]) -> Any: """Asynchronously extract the data from the provider.""" + @staticmethod + async def atransform_data( + query: Q, data: Any, **kwargs + ) -> Union[R, AnnotatedResult[R]]: + """Asynchronously transform the provider-specific data.""" + @staticmethod def extract_data(query: Q, credentials: Optional[Dict[str, str]]) -> Any: """Extract the data from the provider.""" @@ -60,7 +67,6 @@ def extract_data(query: Q, credentials: Optional[Dict[str, str]]) -> Any: @staticmethod def transform_data(query: Q, data: Any, **kwargs) -> Union[R, AnnotatedResult[R]]: """Transform the provider-specific data.""" - raise NotImplementedError def __init_subclass__(cls, *args, **kwargs): """Initialize the subclass.""" @@ -75,6 +81,15 @@ def __init_subclass__(cls, *args, **kwargs): " default." ) + if cls.atransform_data != Fetcher.atransform_data: + cls.transform_data = cls.atransform_data + elif cls.transform_data == Fetcher.transform_data: + raise NotImplementedError( + "Fetcher subclass must implement either transform_data or atransform_data" + " method. If both are implemented, atransform_data will be used as the" + " default." + ) + @classmethod async def fetch_data( cls, @@ -89,6 +104,22 @@ async def fetch_data( ) return cls.transform_data(query=query, data=data, **kwargs) + @classmethod + async def stream_data( + cls, + params: Dict[str, Any], + credentials: Optional[Dict[str, str]] = None, + **kwargs, + ) -> Union[AsyncIterator[R], AsyncIterator[AnnotatedResult[R]]]: + """Fetch data from a provider.""" + query = cls.transform_query(params=params) + data = await maybe_coroutine( + cls.aextract_data, query=query, credentials=credentials, **kwargs + ) + transformed_data = cls.atransform_data(query=query, data=data, **kwargs) + async for d in transformed_data: + yield d + @classproperty def query_params_type(self) -> Q: """Get the type of query.""" diff --git a/openbb_platform/extensions/crypto/openbb_crypto/price/price_router.py b/openbb_platform/extensions/crypto/openbb_crypto/price/price_router.py index efce03361499..a64b2db89362 100644 --- a/openbb_platform/extensions/crypto/openbb_crypto/price/price_router.py +++ b/openbb_platform/extensions/crypto/openbb_crypto/price/price_router.py @@ -1,6 +1,7 @@ # pylint: disable=W0613:unused-argument """Crypto Price Router.""" +from fastapi.responses import StreamingResponse from openbb_core.app.model.command_context import CommandContext from openbb_core.app.model.example import APIEx from openbb_core.app.model.obbject import OBBject @@ -11,6 +12,9 @@ ) from openbb_core.app.query import Query from openbb_core.app.router import Router +from providers.binance.openbb_binance.models.crypto_historical import ( + BinanceCryptoHistoricalFetcher, +) router = Router(prefix="/price") @@ -56,3 +60,16 @@ async def historical( ) -> OBBject: """Get historical price data for cryptocurrency pair(s) within a provider.""" return await OBBject.from_query(Query(**locals())) + + +@router.stream(methods=["GET"]) +async def live(symbol: str = "ethbtc", lifetime: int = 10, tld: str = "us") -> OBBject: + """Connect to Binance WebSocket Crypto Price data feed.""" + generator = BinanceCryptoHistoricalFetcher().stream_data( + params={"symbol": symbol, "lifetime": lifetime, "tld": tld}, + credentials=None, + ) + return OBBject( + results=StreamingResponse(generator, media_type="application/x-ndjson"), + provider="binance", + ) diff --git a/openbb_platform/providers/binance/README.md b/openbb_platform/providers/binance/README.md new file mode 100644 index 000000000000..d451a17211a3 --- /dev/null +++ b/openbb_platform/providers/binance/README.md @@ -0,0 +1,14 @@ +# OpenBB Binance Provider + +This extension integrates the Binance data provider +into the OpenBB Platform. + +## Installation + +To install the extension, run the following command in this folder: + +```bash +pip install openbb-binance +``` + +Documentation available [here](https://docs.openbb.co/platform/development/contributing). diff --git a/openbb_platform/providers/binance/__init__.py b/openbb_platform/providers/binance/__init__.py new file mode 100644 index 000000000000..0415926b7318 --- /dev/null +++ b/openbb_platform/providers/binance/__init__.py @@ -0,0 +1 @@ +"""Binance provider.""" diff --git a/openbb_platform/providers/binance/openbb_binance/__init__.py b/openbb_platform/providers/binance/openbb_binance/__init__.py new file mode 100644 index 000000000000..573a044abdde --- /dev/null +++ b/openbb_platform/providers/binance/openbb_binance/__init__.py @@ -0,0 +1,22 @@ +"""Binance provider module.""" + +from openbb_core.provider.abstract.provider import Provider + +from providers.binance.openbb_binance.models.crypto_historical import ( + BinanceCryptoHistoricalFetcher, +) + +binance_provider = Provider( + name="binance", + website="https://api.binance.com", + description="""Binance is a cryptocurrency exchange that provides a platform for trading various cryptocurrencies. + + The Binance API features both REST and WebSocket endpoints for accessing historical and real-time data. + """, + # credentials=["api_key"], + fetcher_dict={ + "CryptoLive": BinanceCryptoHistoricalFetcher, + }, + repr_name="Binance", + instructions="", +) diff --git a/openbb_platform/providers/binance/openbb_binance/models/__init__.py b/openbb_platform/providers/binance/openbb_binance/models/__init__.py new file mode 100644 index 000000000000..1736531a79d6 --- /dev/null +++ b/openbb_platform/providers/binance/openbb_binance/models/__init__.py @@ -0,0 +1 @@ +"""Binance Provider models.""" diff --git a/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py new file mode 100644 index 000000000000..655c08a15c3a --- /dev/null +++ b/openbb_platform/providers/binance/openbb_binance/models/crypto_historical.py @@ -0,0 +1,101 @@ +"""Binance Crypto Historical WS Data.""" + +import json +import logging +from datetime import datetime, timedelta +from typing import Any, AsyncGenerator, AsyncIterator, Dict, Literal, Optional + +import websockets +from openbb_core.provider.abstract.fetcher import Fetcher +from openbb_core.provider.standard_models.crypto_historical import ( + CryptoHistoricalData, + CryptoHistoricalQueryParams, +) +from pydantic import Field + +# pylint: disable=unused-argument + +logger = logging.getLogger(__name__) + + +class BinanceCryptoHistoricalQueryParams(CryptoHistoricalQueryParams): + """Binance Crypto Historical Query Params.""" + + tld: Optional[Literal["us", "com"]] = Field( + default="us", description="Top-level domain of the Binance endpoint." + ) + lifetime: Optional[int] = Field( + default=60, description="Lifetime of WebSocket in seconds." + ) + + +class BinanceCryptoHistoricalData(CryptoHistoricalData): + """Binance Crypto Historical Data.""" + + __alias_dict__ = { + "symbol": "s", + "close": "c", + "open": "o", + "high": "h", + "low": "l", + "volume": "v", + } + event_type: Optional[str] = Field( + default=None, + description="Event type", + alias="e", + ) + quote_asset_volume: Optional[str] = Field( + default=None, + description="Total traded quote asset volume", + alias="q", + ) + + +class BinanceCryptoHistoricalFetcher( + Fetcher[BinanceCryptoHistoricalQueryParams, BinanceCryptoHistoricalData] +): + """Define Binance Crypto Historical Fetcher.""" + + @staticmethod + def transform_query(params: Dict[str, Any]) -> BinanceCryptoHistoricalQueryParams: + """Transform the query params.""" + return BinanceCryptoHistoricalQueryParams(**params) + + @staticmethod + async def aextract_data( + query: BinanceCryptoHistoricalQueryParams, + credentials: Optional[Dict[str, str]] = None, + **kwargs: Any, + ) -> AsyncGenerator[dict, None]: + """Return the raw data from the Binance endpoint.""" + async with websockets.connect( + f"wss://stream.binance.{query.tld}:9443/ws/{query.symbol.lower()}@miniTicker" + ) as websocket: + logger.info("Connected to WebSocket server.") + end_time = datetime.now() + timedelta(seconds=query.lifetime) # type: ignore + try: + async for message in websocket: + data = json.loads(message) + yield data + if datetime.now() >= end_time: + break + except websockets.exceptions.ConnectionClosed as e: + logger.error("WebSocket connection closed.") + raise e + finally: + logger.info("WebSocket connection closed.") + + @staticmethod + async def atransform_data( + query: BinanceCryptoHistoricalQueryParams, + data: Dict[str, Any], + **kwargs: Any, + ) -> AsyncIterator[str]: + """Return the transformed data.""" + async for chunk in data: # type: ignore + chunk["date"] = ( + datetime.now().isoformat() if "date" not in chunk else chunk["date"] + ) + result = BinanceCryptoHistoricalData(**chunk) + yield result.model_dump_json() + "\n" diff --git a/openbb_platform/providers/binance/openbb_binance/utils/__init__.py b/openbb_platform/providers/binance/openbb_binance/utils/__init__.py new file mode 100644 index 000000000000..e46fb65077d6 --- /dev/null +++ b/openbb_platform/providers/binance/openbb_binance/utils/__init__.py @@ -0,0 +1 @@ +"""Biztoc utils.""" diff --git a/openbb_platform/providers/binance/pyproject.toml b/openbb_platform/providers/binance/pyproject.toml new file mode 100644 index 000000000000..18958b3ffbf7 --- /dev/null +++ b/openbb_platform/providers/binance/pyproject.toml @@ -0,0 +1,19 @@ +[tool.poetry] +name = "openbb-binance" +version = "1.2.1" +description = "" +authors = ["OpenBB Team "] +license = "AGPL-3.0-only" +readme = "README.md" +packages = [{ include = "openbb_binance" }] + +[tool.poetry.dependencies] +python = "^3.8" +openbb-core = "^1.2.3" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry.plugins."openbb_provider_extension"] +binance = "openbb_binance:binance_provider" diff --git a/openbb_platform/providers/binance/tests/__init__.py b/openbb_platform/providers/binance/tests/__init__.py new file mode 100644 index 000000000000..f9f64a7d750b --- /dev/null +++ b/openbb_platform/providers/binance/tests/__init__.py @@ -0,0 +1 @@ +"""Biztoc Provider tests."""