Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StreamFetcher POC #6459

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions openbb_platform/core/openbb_core/api/router/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import inspect
from functools import partial, wraps
from inspect import Parameter, Signature, signature
from typing import Any, Callable, Dict, Optional, Tuple, TypeVar
from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, Union

from fastapi import APIRouter, Depends, Header
from fastapi.responses import StreamingResponse
from fastapi.routing import APIRoute
from openbb_core.app.command_runner import CommandRunner
from openbb_core.app.model.command_context import CommandContext
Expand Down Expand Up @@ -188,17 +189,21 @@ def build_api_wrapper(
func.__annotations__ = new_annotations_map

@wraps(wrapped=func)
async def wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> OBBject:
async def wrapper(
*args: Tuple[Any], **kwargs: Dict[str, Any]
) -> Union[OBBject, StreamingResponse]:
user_settings: UserSettings = UserSettings.model_validate(
kwargs.pop(
"__authenticated_user_settings",
UserService.read_default_user_settings(),
)
)
execute = partial(command_runner.run, path, user_settings)
output: OBBject = await execute(*args, **kwargs)
output = await execute(*args, **kwargs)

return validate_output(output)
if isinstance(output, OBBject):
return validate_output(output)
return output

return wrapper

Expand Down
32 changes: 17 additions & 15 deletions openbb_platform/core/openbb_core/app/command_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
from inspect import Parameter, signature
from sys import exc_info
from time import perf_counter_ns
from typing import Any, Callable, Dict, List, Optional, Tuple, Type
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from warnings import catch_warnings, showwarning, warn

from fastapi.responses import StreamingResponse
from pydantic import BaseModel, ConfigDict, create_model

from openbb_core.app.logs.logging_service import LoggingService
Expand Down Expand Up @@ -420,7 +421,7 @@ async def run(
/,
*args,
**kwargs,
) -> OBBject:
) -> Union[OBBject, StreamingResponse]:
"""Run a command and return the OBBject as output."""
timestamp = datetime.now()
start_ns = perf_counter_ns()
Expand All @@ -429,7 +430,7 @@ async def run(
route = execution_context.route

if func := command_map.get_command(route=route):
obbject = await cls._execute_func(
result = await cls._execute_func(
route=route,
args=args, # type: ignore
execution_context=execution_context,
Expand All @@ -442,19 +443,20 @@ async def run(
duration = perf_counter_ns() - start_ns

if execution_context.user_settings.preferences.metadata:
try:
obbject.extra["metadata"] = Metadata(
arguments=kwargs,
duration=duration,
route=route,
timestamp=timestamp,
)
except Exception as e:
if Env().DEBUG_MODE:
raise OpenBBError(e) from e
warn(str(e), OpenBBWarning)
if isinstance(result, OBBject):
try:
result.extra["metadata"] = Metadata(
arguments=kwargs,
duration=duration,
route=route,
timestamp=timestamp,
)
except Exception as e:
if Env().DEBUG_MODE:
raise OpenBBError(e) from e
warn(str(e), OpenBBWarning)

return obbject
return result


class CommandRunner:
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be a blocker:

Screenshot 2024-05-29 at 3 31 05 PM

Original file line number Diff line number Diff line change
Expand Up @@ -1479,7 +1479,6 @@ def _get_provider_field_params(
expanded_types[field], is_required, "website"
)
field_type = f"Union[{field_type}, {expanded_type}]"

cleaned_description = (
str(field_info.description)
.strip().replace("\n", " ").replace(" ", " ").replace('"', "'")
Expand All @@ -1506,7 +1505,6 @@ def _get_provider_field_params(
# Manually setting to List[<field_type>] for multiple items
# Should be removed if TYPE_EXPANSION is updated to include this
field_type = f"Union[{field_type}, List[{field_type}]]"

default_value = "" if field_info.default is PydanticUndefined else field_info.default # fmt: skip

provider_field_params.append(
Expand Down
33 changes: 32 additions & 1 deletion openbb_platform/core/openbb_core/provider/abstract/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from typing import (
Any,
AsyncIterator,
Dict,
Generic,
Optional,
Expand Down Expand Up @@ -53,14 +54,19 @@ def transform_query(params: Dict[str, Any]) -> Q:
async def aextract_data(query: Q, credentials: Optional[Dict[str, str]]) -> Any:
"""Asynchronously extract the data from the provider."""

@staticmethod
async def atransform_data(
query: Q, data: Any, **kwargs
) -> Union[R, AnnotatedResult[R]]:
"""Asynchronously transform the provider-specific data."""

@staticmethod
def extract_data(query: Q, credentials: Optional[Dict[str, str]]) -> Any:
"""Extract the data from the provider."""

@staticmethod
def transform_data(query: Q, data: Any, **kwargs) -> Union[R, AnnotatedResult[R]]:
"""Transform the provider-specific data."""
raise NotImplementedError

def __init_subclass__(cls, *args, **kwargs):
"""Initialize the subclass."""
Expand All @@ -75,6 +81,15 @@ def __init_subclass__(cls, *args, **kwargs):
" default."
)

if cls.atransform_data != Fetcher.atransform_data:
cls.transform_data = cls.atransform_data
elif cls.transform_data == Fetcher.transform_data:
raise NotImplementedError(
"Fetcher subclass must implement either transform_data or atransform_data"
" method. If both are implemented, atransform_data will be used as the"
" default."
)

@classmethod
async def fetch_data(
cls,
Expand All @@ -89,6 +104,22 @@ async def fetch_data(
)
return cls.transform_data(query=query, data=data, **kwargs)

@classmethod
async def stream_data(
cls,
params: Dict[str, Any],
credentials: Optional[Dict[str, str]] = None,
**kwargs,
) -> Union[AsyncIterator[R], AsyncIterator[AnnotatedResult[R]]]:
"""Fetch data from a provider."""
query = cls.transform_query(params=params)
data = await maybe_coroutine(
cls.aextract_data, query=query, credentials=credentials, **kwargs
)
transformed_data = cls.atransform_data(query=query, data=data, **kwargs)
async for d in transformed_data:
yield d

@classproperty
def query_params_type(self) -> Q:
"""Get the type of query."""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Crypto Router."""

# import asyncio

from openbb_core.app.model.command_context import CommandContext
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# import asyncio
from openbb_core.app.model.command_context import CommandContext
from openbb_core.app.model.command_context import CommandContext

from openbb_core.app.model.example import APIEx
from openbb_core.app.model.obbject import OBBject
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# pylint: disable=W0613:unused-argument
"""Crypto Price Router."""

from fastapi.responses import StreamingResponse
from openbb_core.app.model.command_context import CommandContext
from openbb_core.app.model.example import APIEx
from openbb_core.app.model.obbject import OBBject
Expand All @@ -11,6 +12,10 @@
)
from openbb_core.app.query import Query
from openbb_core.app.router import Router
from providers.binance.openbb_binance.models.crypto_historical import (
BinanceCryptoHistoricalData,
BinanceCryptoHistoricalFetcher,
)

router = Router(prefix="/price")

Expand Down Expand Up @@ -56,3 +61,15 @@ async def historical(
) -> OBBject:
"""Get historical price data for cryptocurrency pair(s) within a provider."""
return await OBBject.from_query(Query(**locals()))


@router.command(methods=["GET"])
async def live(
symbol: str = "ethbtc", lifetime: int = 10
) -> BinanceCryptoHistoricalData:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we make this a Provider Interface method? There could be any number of providers that have a WS connection, like here - https://site.financialmodelingprep.com/developer/docs#crypto-websocket

"""Connect to Binance WebSocket Crypto Price data feed."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This endpoint will not generate any documentation or descriptions because it is not using the ProviderInterface.

generator = BinanceCryptoHistoricalFetcher().stream_data(
params={"symbol": symbol, "lifetime": lifetime},
credentials=None,
)
return StreamingResponse(generator, media_type="application/x-ndjson")
14 changes: 14 additions & 0 deletions openbb_platform/providers/binance/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# OpenBB Biztoc Provider

This extension integrates the Biztoc data provider
into the OpenBB Platform.

## Installation

To install the extension, run the following command in this folder:

```bash
pip install openbb-biztoc
```

Documentation available [here](https://docs.openbb.co/platform/development/contributing).
1 change: 1 addition & 0 deletions openbb_platform/providers/binance/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Biztoc provider."""
26 changes: 26 additions & 0 deletions openbb_platform/providers/binance/openbb_binance/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Biztoc provider module."""

# from openbb_binance.models.crypto_historical import (
# BinanceCryptoHistoricalFetcher,
# )
from openbb_core.provider.abstract.provider import Provider

binance_provider = Provider(
name="binance",
website="https://api.binance.com",
description="""BizToc uses Rapid API for its REST API.
You may sign up for your free account at https://rapidapi.com/thma/api/binance.

The Base URL for all requests is:

https://binance.p.rapidapi.com/

If you're not a developer but would still like to use Biztoc outside of the main website,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description is half BizToc and half maybe Binance.

we've partnered with OpenBB, allowing you to pull in BizToc's news stream in their Terminal.""",
# credentials=["api_key"],
fetcher_dict={
# "bcrypto_historical": BinanceCryptoHistoricalFetcher,
},
repr_name="Binance",
instructions="The BizToc API is hosted on RapidAPI. To set up, go to: https://rapidapi.com/thma/api/binance.\n\n![binance0](https://github.com/marban/OpenBBTerminal/assets/18151143/04cdd423-f65e-4ad8-ad5a-4a59b0f5ddda)\n\nIn the top right, select 'Sign Up'. After answering some questions, you will be prompted to select one of their plans.\n\n![binance1](https://github.com/marban/OpenBBTerminal/assets/18151143/9f3b72ea-ded7-48c5-aa33-bec5c0de8422)\n\nAfter signing up, navigate back to https://rapidapi.com/thma/api/binance. If you are logged in, you will see a header called X-RapidAPI-Key.\n\n![binance2](https://github.com/marban/OpenBBTerminal/assets/18151143/0f3b6c91-07e0-447a-90cd-a9e23522929f)", # noqa: E501 pylint: disable=line-too-long
)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Biztoc Provider models."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Binance Crypto Historical WS Data."""

import json
import logging
from datetime import datetime, timedelta
from typing import Any, AsyncGenerator, AsyncIterator, Dict, Optional

import websockets
from openbb_core.provider.standard_models.crypto_historical import (
CryptoHistoricalData,
CryptoHistoricalQueryParams,
)
from pydantic import Field

from openbb_platform.core.openbb_core.provider.abstract.fetcher import Fetcher

# pylint: disable=unused-argument, arguments-differ


class BinanceCryptoHistoricalQueryParams(CryptoHistoricalQueryParams):
"""Binance Crypto Historical Query Params."""

lifetime: Optional[int] = Field(
default=60, description="Lifetime of WebSocket in seconds."
)


class BinanceCryptoHistoricalData(CryptoHistoricalData):
"""Binance Crypto Historical Data."""

__alias_dict__ = {
"symbol": "s",
"close": "c",
"open": "o",
"high": "h",
"low": "l",
"volume": "v",
}
event_type: Optional[str] = Field(
default=None,
description="Event type",
alias="e",
)
quote_asset_volume: Optional[str] = Field(
default=None,
description="Total traded quote asset volume",
alias="q",
)


class BinanceCryptoHistoricalFetcher(Fetcher):
Copy link
Contributor

@deeleeramone deeleeramone May 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this model name and Fetcher class is deceiving. It is not at all like the other provider/standard models with this name. This is not historical data, and the pattern itself is significantly different. It needs to be differentiated.
Like how you have added an additional atransform method, there should be something like wsconnect that accepts a URL and **kwargs where the core logic can be easily reused, like a standard model.

"""Define Binance Crypto Historical Fetcher."""

@staticmethod
def transform_query(params: Dict[str, Any]) -> BinanceCryptoHistoricalQueryParams:
"""Transform the query params."""
return BinanceCryptoHistoricalQueryParams(**params)

@staticmethod
async def aextract_data(
query: BinanceCryptoHistoricalQueryParams,
credentials: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> AsyncGenerator[dict, None]:
"""Return the raw data from the Binance endpoint."""
async with websockets.connect(
f"wss://stream.binance.com:9443/ws/{query.symbol.lower()}@miniTicker"
) as websocket:
logging.info("Connected to WebSocket server.")
end_time = datetime.now() + timedelta(seconds=query.lifetime)
try:
while datetime.now() < end_time:
chunk = await websocket.recv()
yield json.loads(chunk)
except websockets.exceptions.ConnectionClosed as e:
logging.error("WebSocket connection closed.")
raise e
finally:
logging.info("WebSocket connection closed.")

@staticmethod
async def atransform_data(
query: BinanceCryptoHistoricalQueryParams,
data: Dict[str, Any],
) -> AsyncIterator[str]:
"""Return the transformed data."""
async for chunk in data:
chunk["date"] = (
datetime.now().isoformat() if "date" not in chunk else chunk["date"]
)
result = BinanceCryptoHistoricalData(**chunk)
yield result.model_dump_json() + "\n"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Biztoc utils."""
Loading
Loading