From 37238e4490c8fe7701451977a18fd279581f4ad4 Mon Sep 17 00:00:00 2001 From: Carlos Wu Date: Sun, 21 Apr 2024 08:39:13 +0200 Subject: [PATCH] Bot streaming kafka (#556) * WIP setup Kafka for bot streaming updates This new setup with the Binquant Kafka server will: - No longer require the "update_required" to restart websockets, a signal can be produced to send the order to restart - Deduplicate the use of websockets in two places (hopefully), by resuing the klines producer * Refactor raw balances for performance improvements * Break down balance estimate function into simpler endpoints This improves testing abilities (as demonstrated in the new tests), modularity of reusage and improved efficiency (each call takes much less time) * Extend get_one bot function to also use a symbol. Add tests * Solve str -> float model casting issues * Remove dynamic take profit dynamic_take_profit uses standard deviation, and following the signals using telegram this value turns out to be highly inconsistent, therefore unreliable. --- api/account/account.py | 21 --- api/account/assets.py | 102 +++++++---- api/account/routes.py | 20 ++- api/apis.py | 92 +++++++--- api/autotrade/controller.py | 18 +- api/bots/controllers.py | 58 +++---- api/bots/routes.py | 53 +++++- api/bots/schemas.py | 16 +- api/db.py | 60 ++++++- api/deals/base.py | 79 +-------- api/deals/schema.py | 16 +- api/deals/spot.py | 27 +-- api/market_updates.py | 43 +++-- api/research/controller.py | 2 +- api/streaming/streaming_controller.py | 236 ++++++-------------------- api/streaming/user_data_streaming.py | 74 ++++++++ api/tests/test_assets.py | 114 ++++++++++++- api/tests/test_bots.py | 89 ++++++++++ api/tools/enum_definitions.py | 41 ++++- api/tools/exceptions.py | 2 +- api/tools/handle_error.py | 22 +-- api/tools/helpers.py | 24 +++ binquant | 2 +- docker-compose.yml | 3 +- 24 files changed, 761 insertions(+), 453 deletions(-) create mode 100644 api/streaming/user_data_streaming.py create mode 100644 api/tests/test_bots.py create mode 100644 api/tools/helpers.py diff --git a/api/account/account.py b/api/account/account.py index 3c8cb46d9..45c5ec332 100644 --- a/api/account/account.py +++ b/api/account/account.py @@ -6,7 +6,6 @@ json_response_message, json_response_error, ) -from decimal import Decimal from db import setup_db from requests_cache import CachedSession, MongoCache from pymongo import MongoClient @@ -73,26 +72,6 @@ def get_ticker_price(self, symbol: str): data = handle_binance_errors(res) return data["price"] - def ticker_24(self, type: str = "FULL", symbol: str | None = None): - """ - Weight 40 without symbol - https://github.com/carkod/binbot/issues/438 - - Using cache - """ - url = self.ticker24_url - params = { - "type": type - } - if symbol: - params["symbol"] = symbol - - # mongo_cache = self.setup_mongocache() - # expire_after = 15m because candlesticks are 15m - # session = CachedSession('ticker_24_cache', backend=mongo_cache, expire_after=15) - res = requests.get(url=url, params=params) - data = handle_binance_errors(res) - return data def find_quoteAsset(self, symbol): """ diff --git a/api/account/assets.py b/api/account/assets.py index a2c7d8c33..be7d3568e 100644 --- a/api/account/assets.py +++ b/api/account/assets.py @@ -1,4 +1,3 @@ -import json from datetime import datetime, timedelta import pandas as pd @@ -6,7 +5,7 @@ from bson.objectid import ObjectId from charts.models import CandlestickParams from charts.models import Candlestick -from db import Database, setup_db +from db import setup_db from tools.handle_error import json_response, json_response_error, json_response_message from tools.round_numbers import round_numbers from tools.exceptions import BinanceErrors, InvalidSymbol, MarginLoanNotFound @@ -22,29 +21,23 @@ def get_raw_balance(self, asset=None): """ Unrestricted balance """ - data = self.signed_request(url=self.account_url) - df = pd.DataFrame(data["balances"]) - df["free"] = pd.to_numeric(df["free"]) - df["locked"] = pd.to_numeric(df["locked"]) - df["asset"] = df["asset"].astype(str) - # Get table with > 0 - balances = df[(df["free"] > 0) | (df["locked"] > 0)].to_dict("records") - - if asset: - balances = df[ - ((df["free"] > 0) | (df["locked"] > 0)) & (df["asset"] == asset) - ].to_dict("records") - # filter out empty - # Return response - resp = json_response({"data": balances}) - return resp + data = self.get_account_balance() + balances = [] + for item in data["balances"]: + if float(item["free"]) > 0 or float(item["locked"]) > 0: + if asset: + if item["asset"] == asset: + balances.append(item) + else: + balances.append(item) + return balances def get_pnl(self, days=7): current_time = datetime.now() start = current_time - timedelta(days=days) dummy_id = ObjectId.from_datetime(start) data = list( - self.db.balances.find( + self._db.balances.find( { "_id": { "$gte": dummy_id, @@ -58,9 +51,9 @@ def get_pnl(self, days=7): def _check_locked(self, b): qty = 0 if "locked" in b: - qty = b["free"] + b["locked"] + qty = float(b["free"]) + float(b["locked"]) else: - qty = b["free"] + qty = float(b["free"]) return qty def store_balance(self) -> dict: @@ -69,9 +62,8 @@ def store_balance(self) -> dict: Store current balance in Db """ # Store balance works outside of context as cronjob - balances_response = self.get_raw_balance() - bin_balance = json.loads(balances_response.body) - current_time = datetime.utcnow() + bin_balance = self.get_raw_balance() + current_time = datetime.now() total_usdt: float = 0 rate: float = 0 for b in bin_balance["data"]: @@ -104,7 +96,7 @@ def store_balance(self) -> dict: try: balance_schema = BalanceSchema(**total_balance) balances = balance_schema.dict() - self.db.balances.update_one( + self._db.balances.update_one( {"time": current_time.strftime("%Y-%m-%d")}, {"$set": balances}, upsert=True, @@ -119,7 +111,7 @@ def balance_estimate(self, fiat="USDT"): """ Estimated balance in given fiat coin """ - balances_response = self.get_raw_balance() + balances = self.get_raw_balance() # Isolated m isolated_margin = self.signed_request(url=self.isolated_account_url) get_usdt_btc_rate = self.ticker(symbol=f"BTC{fiat}", json=False) @@ -127,11 +119,10 @@ def balance_estimate(self, fiat="USDT"): get_usdt_btc_rate["price"] ) - balances = json.loads(balances_response.body) total_fiat = 0 rate = 0 left_to_allocate = 0 - for b in balances["data"]: + for b in balances: # Transform tethers/stablecoins if "USD" in b["asset"] or fiat == b["asset"]: if fiat == b["asset"]: @@ -149,7 +140,7 @@ def balance_estimate(self, fiat="USDT"): total_fiat += float(qty) * float(rate) balance = { - "balances": balances["data"], + "balances": balances, "total_fiat": total_fiat + total_isolated_margin, "total_isolated_margin": total_isolated_margin, "fiat_left": left_to_allocate, @@ -199,7 +190,7 @@ def store_balance_snapshot(self): """ db = setup_db() print("Store account snapshot starting...") - current_time = datetime.utcnow() + current_time = datetime.now() data = self.signed_request(self.account_snapshot_url, payload={"type": "SPOT"}) spot_data = next( (item for item in data["snapshotVos"] if item["type"] == "spot"), None @@ -299,7 +290,7 @@ async def get_balance_series(self, end_date, start_date): lte_tp_id = ObjectId.from_datetime(obj_end_date) params["_id"]["$lte"] = lte_tp_id - balance_series = list(self.db.balances.find(params).sort([("time", -1)])) + balance_series = list(self._db.balances.find(params).sort([("time", -1)])) # btc candlestick data series params = CandlestickParams( @@ -351,7 +342,7 @@ def clean_balance_assets(self): if len(self.exception_list) == 0: self.exception_list = ["USDT", "NFT", "BNB"] - active_bots = list(self.db.bots.find({"status": Status.active})) + active_bots = list(self._db.bots.find({"status": Status.active})) for bot in active_bots: quote_asset = bot["pair"].replace(bot["balance_to_use"], "") self.exception_list.append(quote_asset) @@ -369,6 +360,49 @@ def clean_balance_assets(self): return resp + def get_total_fiat(self, fiat="USDT"): + """ + Simplified version of balance_estimate + + Returns: + float: total BTC estimated in the SPOT wallet + then converted into USDT + """ + wallet_balance = self.get_wallet_balance() + get_usdt_btc_rate = self.ticker(symbol=f"BTC{fiat}", json=False) + total_balance = 0 + rate = float(get_usdt_btc_rate["price"]) + for item in wallet_balance: + if item["activate"]: + total_balance += float(item["balance"]) + + total_fiat = total_balance * rate + return total_fiat + + def get_available_fiat(self, fiat="USDT"): + """ + Simplified version of balance_estimate + to get free/avaliable USDT. + + Getting the total USDT directly + from the balances because if it were e.g. + Margin trading, it would not be available for use. + The only available fiat is the unused USDT in the SPOT wallet. + + Balance not used in Margin trading should be + transferred back to the SPOT wallet. + + Returns: + str: total USDT available to + """ + total_balance = self.get_raw_balance() + for item in total_balance: + if item["asset"] == fiat: + return float(item["free"]) + else: + return 0 + + def disable_isolated_accounts(self, symbol=None): """ Check and disable isolated accounts @@ -423,7 +457,7 @@ def store_market_domination(self): all_coins = sorted(all_coins, key=lambda item: float(item["priceChangePercent"]), reverse=True) try: current_time = datetime.now() - self.db.market_domination.insert_one( + self._db.market_domination.insert_one( { "time": current_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], "data": all_coins @@ -439,11 +473,9 @@ def get_market_domination(self, size=7): Args: size (int, optional): Number of data points to retrieve. Defaults to 7 (1 week). - Returns: dict: A dictionary containing the market domination data, including gainers and losers counts, percentages, and dates. """ query = {"$query": {}, "$orderby": {"_id": -1}} result = self._db.market_domination.find(query).limit(size) return list(result) - diff --git a/api/account/routes.py b/api/account/routes.py index 440f5ebd2..e14f68cda 100644 --- a/api/account/routes.py +++ b/api/account/routes.py @@ -17,7 +17,8 @@ @account_blueprint.get("/balance/raw", response_model=BalanceResponse, tags=["account"]) def raw_balance(): - return Assets().get_raw_balance() + data = Assets().get_raw_balance() + return json_response({"data": data}) @account_blueprint.get("/symbols", tags=["account"]) @@ -100,6 +101,23 @@ async def get_balance_series(): def clean_balance(): return Assets().clean_balance_assets() +@account_blueprint.get("/fiat/available", response_model=BalanceSeriesResponse, tags=["assets"]) +def total_balance(): + """ + Total USDT in balance + Calculated by Binance + """ + total_fiat = Assets().get_available_fiat() + return json_response({"data": total_fiat}) + +@account_blueprint.get("/fiat", response_model=BalanceSeriesResponse, tags=["assets"]) +def total_balance(): + """ + Total USDT in balance + Calculated by Binance + """ + total_fiat = Assets().get_total_fiat() + return json_response({"data": total_fiat}) @account_blueprint.get( "/disable-isolated", response_model=BalanceSeriesResponse, tags=["assets"] diff --git a/api/apis.py b/api/apis.py index 7f242c8b5..4d1748706 100644 --- a/api/apis.py +++ b/api/apis.py @@ -1,10 +1,10 @@ +from random import randrange from typing import List import hashlib import hmac import os from urllib.parse import urlencode -from time import time -from requests import request +from requests import Session, request from tools.handle_error import handle_binance_errors, json_response, json_response_error from tools.exceptions import IsolateBalanceError from py3cw.request import Py3CW @@ -15,19 +15,21 @@ class BinanceApi: https://binance.github.io/binance-api-swagger/ """ - BASE = "https://api.binance.com" + api_servers = ["https://api.binance.com", "https://api3.binance.com", "https://api-gcp.binance.com"] + BASE = api_servers[randrange(3) - 1] + MARKET_DATA_BASE = "https://data-api.binance.vision" WAPI = f"{BASE}/api/v3/depth" WS_BASE = "wss://stream.binance.com:9443/stream?streams=" recvWindow = 9000 secret = os.getenv("BINANCE_SECRET") key = os.getenv("BINANCE_KEY") - server_time_url = f"{BASE}/api/v3/time" + server_time_url = f"{MARKET_DATA_BASE}/api/v3/time" account_url = f"{BASE}/api/v3/account" - exchangeinfo_url = f"{BASE}/api/v3/exchangeInfo" - ticker_price_url = f"{BASE}/api/v3/ticker/price" - ticker24_url = f"{BASE}/api/v3/ticker/24hr" - candlestick_url = f"{BASE}/api/v3/uiKlines" + exchangeinfo_url = f"{MARKET_DATA_BASE}/api/v3/exchangeInfo" + ticker_price_url = f"{MARKET_DATA_BASE}/api/v3/ticker/price" + ticker24_url = f"{MARKET_DATA_BASE}/api/v3/ticker/24hr" + candlestick_url = f"{MARKET_DATA_BASE}/api/v3/uiKlines" order_url = f"{BASE}/api/v3/order" order_book_url = f"{BASE}/api/v3/depth" avg_price = f"{BASE}/api/v3/avgPrice" @@ -36,6 +38,7 @@ class BinanceApi: cancel_replace_url = f"{BASE}/api/v3/order/cancelReplace" user_data_stream = f"{BASE}/api/v3/userDataStream" trade_fee = f"{BASE}/sapi/v1/asset/tradeFee" + wallet_balance_url = f"{BASE}/sapi/v1/asset/wallet/balance" withdraw_url = f"{BASE}/wapi/v3/withdraw.html" withdraw_history_url = f"{BASE}/wapi/v3/withdrawHistory.html" @@ -55,13 +58,33 @@ class BinanceApi: margin_order = f"{BASE}/sapi/v1/margin/order" max_borrow_url = f"{BASE}/sapi/v1/margin/maxBorrowable" - def signed_request(self, url, method="GET", payload={}, params={}): + def request(self, url, method="GET", session: Session=None, payload={}, **kwargs): + """ + Standard request + - No signed + - No authorization + """ + if session: + res = session.request(method=method, url=url, **kwargs) + else: + res = request(method=method, url=url, json=payload, **kwargs) + data = handle_binance_errors(res) + return data + + def get_server_time(self): + data = self.request(url=self.server_time_url) + return data["serverTime"] + + def signed_request(self, url, method="GET", payload={}): """ USER_DATA, TRADE signed requests """ + session = Session() query_string = urlencode(payload, True) - timestamp = round(time() * 1000) - headers = {"Content-Type": "application/json", "X-MBX-APIKEY": self.key} + timestamp = self.get_server_time() + session.headers.update( + {"Content-Type": "application/json", "X-MBX-APIKEY": self.key} + ) if query_string: query_string = ( @@ -76,17 +99,7 @@ def signed_request(self, url, method="GET", payload={}, params={}): hashlib.sha256, ).hexdigest() url = f"{url}?{query_string}&signature={signature}" - data = self.request(method, url=url, headers=headers, params=params) - return data - - def request(self, method="GET", **args): - """ - Standard request - - No signed - - No authorization - """ - res = request(method, **args) - data = handle_binance_errors(res) + data = self.request(url, method, session) return data def get_listen_key(self): @@ -97,6 +110,41 @@ def get_listen_key(self): data = self.request("POST", url=self.user_data_stream, headers=headers) return data["listenKey"] + + def ticker_24(self, type: str = "FULL", symbol: str | None = None): + """ + Weight 40 without symbol + https://github.com/carkod/binbot/issues/438 + + Using cache + """ + params = { + "type": type + } + if symbol: + params["symbol"] = symbol + + # mongo_cache = self.setup_mongocache() + # expire_after = 15m because candlesticks are 15m + # session = CachedSession('ticker_24_cache', backend=mongo_cache, expire_after=15) + data = self.request(url=self.ticker24_url, params=params) + return data + + def get_account_balance(self): + """ + Get account balance + """ + data = self.signed_request(self.account_url) + return data + + def get_wallet_balance(self): + """ + Balance by wallet (SPOT, FUNDING, CROSS MARGIN...) + https://binance-docs.github.io/apidocs/spot/en/#query-user-wallet-balance-user_data + """ + data = self.signed_request(self.wallet_balance_url) + return data + def cancel_margin_order(self, symbol, order_id): return self.signed_request(self.margin_order, method="DELETE", payload={"symbol": symbol, "orderId": order_id}) diff --git a/api/autotrade/controller.py b/api/autotrade/controller.py index d7b5de55d..29a1234a0 100644 --- a/api/autotrade/controller.py +++ b/api/autotrade/controller.py @@ -2,7 +2,7 @@ from pydantic import ValidationError -from db import setup_db +from db import Database, setup_db from tools.handle_error import ( json_response, json_response_error, @@ -10,7 +10,7 @@ ) from time import time -class AutotradeSettingsController: +class AutotradeSettingsController(Database): """ Autotrade settings """ @@ -19,12 +19,11 @@ def __init__( self, document_id: Literal["test_autotrade_settings", "settings"] = "settings" ): self.document_id = document_id - self.db = setup_db() - self.db_collection = self.db.research_controller + self.db = self._db def get_settings(self): try: - settings = self.db_collection.find_one({"_id": self.document_id}) + settings = self.db.research_controller.find_one({"_id": self.document_id}) resp = json_response( {"message": "Successfully retrieved settings", "data": settings} ) @@ -41,7 +40,7 @@ def edit_settings(self, data): if "update_required" in settings: settings["update_required"] = time() - self.db_collection.update_one({"_id": self.document_id}, {"$set": settings}) + self.research_controller.update_one({"_id": self.document_id}, {"$set": settings}) resp = json_response_message("Successfully updated settings") except TypeError as e: @@ -52,3 +51,10 @@ def edit_settings(self, data): msg += field + desc[0] resp = json_response_error(f"{msg}") return resp + + def get_autotrade_settings(self): + return self._db.research_controller.find_one({"_id": "settings"}) + + def get_test_autotrade_settings(self): + return self._db.research_controller.find_one({"_id": "test_autotrade_settings"}) + diff --git a/api/bots/controllers.py b/api/bots/controllers.py index 4b07a98c4..c012484d4 100644 --- a/api/bots/controllers.py +++ b/api/bots/controllers.py @@ -7,9 +7,8 @@ from fastapi.exceptions import RequestValidationError from account.account import Account -from deals.controllers import CreateDealController -from tools.enum_definitions import BinbotEnums -from tools.exceptions import BinanceErrors, BinbotErrors, DealCreationError, QuantityTooLow +from tools.enum_definitions import BinbotEnums, Status +from tools.exceptions import QuantityTooLow from tools.handle_error import ( handle_binance_errors, json_response, @@ -33,7 +32,18 @@ def _update_required(self): self.db.research_controller.update_one({"_id": "settings"}, {"$set": {"update_required": time()}}) return - def get(self, status, start_date, end_date, no_cooldown): + def get_active_pairs(self, symbol: str = None): + """ + Get distinct (non-repeating) bots by status active + """ + params = {"status": Status.active} + if symbol: + params["pair"] = symbol + + bots = list(self.db_collection.distinct("pair", params)) + return bots + + def get(self, status, start_date=None, end_date=None, no_cooldown=False): """ Get all bots in the db except archived Args: @@ -104,13 +114,16 @@ def get(self, status, start_date, end_date, no_cooldown): return resp - def get_one(self, findId): - bot = self.db_collection.find_one({"id": findId}) - if bot: - resp = json_response({"message": "Bot found", "data": bot}) + def get_one(self, bot_id=None, symbol=None): + if bot_id: + params = {"id": bot_id} + elif symbol: + params = {"pair": symbol} else: - resp = json_response({"message": "Bots not found"}, 404) - return resp + raise ValueError("id or symbol is required to find bot") + + bot = self.db_collection.find_one(params) + return bot def create(self, data): """ @@ -179,30 +192,11 @@ def delete(self, bot_ids: List[str] = Query(...)): def activate(self, botId: str): bot = self.db_collection.find_one({"id": botId}) - if bot: - - try: - CreateDealController( - bot, db_collection=self.db_collection.name - ).open_deal() - return json_response_message("Successfully activated bot!") - except BinanceErrors as error: - logging.info(error) - self.post_errors_by_id(botId, error.message) - return json_response_error(error.message) - except BinbotErrors as error: - logging.info(error) - self.post_errors_by_id(botId, error.message) - return json_response_error(error.message) - except Exception as error: - self.post_errors_by_id(botId, error) - resp = json_response_error(f"Unable to activate bot: {error}") - return resp - else: - return json_response_error("Bot not found.") + return bot def deactivate(self, findId): """ + DO NOT USE, LEGACY CODE NEEDS TO BE REVAMPED Close all deals, sell pair and deactivate 1. Close all deals 2. Sell Coins @@ -233,7 +227,7 @@ def deactivate(self, findId): # Sell everything pair = bot["pair"] base_asset = self.find_baseAsset(pair) - bot = BotSchema.parse_obj(bot) + bot = BotSchema(**bot) precision = self.price_precision qty_precision = self.qty_precision balance = self.get_one_balance(base_asset) diff --git a/api/bots/routes.py b/api/bots/routes.py index 8496d4170..cb441e976 100644 --- a/api/bots/routes.py +++ b/api/bots/routes.py @@ -1,7 +1,12 @@ +import logging +import uuid from fastapi import APIRouter, Query +from deals.controllers import CreateDealController +from tools.handle_error import json_response, json_response_error, json_response_message from bots.controllers import Bot from bots.schemas import BotSchema, BotListResponse, ErrorsRequestBody from typing import List +from tools.exceptions import BinanceErrors, BinbotErrors bot_blueprint = APIRouter() @@ -17,9 +22,23 @@ def get( @bot_blueprint.get("/bot/{id}", tags=["bots"]) -def get_one(id: str): - return Bot(collection_name="bots").get_one(id) - +def get_one_by_id(id: str): + try: + bot = Bot(collection_name="bots").get_one(bot_id=id, symbol=None) + if not bot: + return json_response_error("Bot not found.") + else: + return json_response({"message": "Bot found", "data": bot}) + except ValueError as error: + return json_response_error(error) + +@bot_blueprint.get("/bot/{symbol}", tags=["bots"]) +def get_one_by_symbol(symbol: str): + try: + bot = Bot(collection_name="bots").get_one(bot_id=None, symbol=symbol) + return json_response({"message": "Bot found", "data": bot}) + except ValueError as error: + return json_response_error(error) @bot_blueprint.post("/bot", tags=["bots"]) def create(bot_item: BotSchema): @@ -48,7 +67,27 @@ def activate(id: str): - If changes were made, it will override DB data - Because botId is received from endpoint, it will be a str not a PyObjectId """ - return Bot(collection_name="bots").activate(id) + bot_instance = Bot(collection_name="bots") + bot = bot_instance.activate(id) + if bot: + + try: + CreateDealController(bot, db_collection="bots").open_deal() + return json_response_message("Successfully activated bot!") + except BinanceErrors as error: + logging.info(error) + bot_instance.post_errors_by_id(id, error.message) + return json_response_error(error.message) + except BinbotErrors as error: + logging.info(error) + bot_instance.post_errors_by_id(id, error.message) + return json_response_error(error.message) + except Exception as error: + bot_instance.post_errors_by_id(id, error) + resp = json_response_error(f"Unable to activate bot: {error}") + return resp + else: + return json_response_error("Bot not found.") @bot_blueprint.delete("/bot/deactivate/{id}", tags=["bots"]) @@ -75,4 +114,8 @@ def bot_errors(bot_id: str, bot_errors: ErrorsRequestBody): """ request_body = bot_errors.model_dump(mode="python") bot_errors = request_body.get("errors", None) - return Bot(collection_name="bots").post_errors_by_id(bot_id, bot_errors) + try: + Bot(collection_name="bots").post_errors_by_id(bot_id, bot_errors) + except Exception as error: + return json_response_error(f"Error posting errors: {error}") + return json_response_message("Errors posted successfully.") diff --git a/api/bots/schemas.py b/api/bots/schemas.py index 737e2f16c..f5ef37076 100644 --- a/api/bots/schemas.py +++ b/api/bots/schemas.py @@ -66,6 +66,16 @@ def check_names_not_empty(cls, v): assert v != "", "Empty pair field." return v + @field_validator("balance_size_to_use", "base_order_size") + @classmethod + def validate_str_numbers(cls, v): + if isinstance(v, float): + return str(v) + elif isinstance(v, int): + return str(v) + else: + return v + @field_validator("stop_loss", "take_profit", "trailling_deviation", "trailling_profit") @classmethod def check_percentage(cls, v): @@ -110,9 +120,9 @@ class Config: "description": "Most fields are optional. Deal field is generated internally, orders are filled up by Binance", "example": { "pair": "BNBUSDT", - "balance_size_to_use": 0, - "balance_to_use": 0, - "base_order_size": 15, + "balance_size_to_use": "0", + "balance_to_use": "USDT", + "base_order_size": "15", "candlestick_interval": "15m", "cooldown": 0, "errors": [], diff --git a/api/db.py b/api/db.py index 45a87f714..00f71baba 100644 --- a/api/db.py +++ b/api/db.py @@ -1,5 +1,7 @@ import os -from pymongo import MongoClient +from pymongo import MongoClient, ReturnDocument +from bots.schemas import BotSchema +from tools.enum_definitions import Status def setup_db(): @@ -16,8 +18,61 @@ def setup_db(): class Database: + """ + The whole objective of having this class is to provide + a single point of entry to the database. + Therefore, always initialize database instance + + Methods in this should only be helpers + that can be independently triggered, where + only dependency is the _db instance + """ _db = setup_db() - + + def save_bot_streaming(self, active_bot: BotSchema, db_collection_name: str="bots"): + """ + MongoDB query to save bot using Pydantic + + This function differs from usual save query in that + it returns the saved bot, thus called streaming, it's + specifically for streaming saves + + Returns: + dict: The saved bot + """ + + bot = BotSchema.model_dump(active_bot) + if "_id" in bot: + bot.pop("_id") + + response = self._db[db_collection_name].find_one_and_update( + {"id": active_bot.id}, + { + "$set": bot, + }, + return_document=ReturnDocument.AFTER, + ) + + return response + + def update_deal_logs(self, msg, active_bot: BotSchema, db_collection_name: str="bots"): + """ + Use this function if independently updating Event logs (deal.errors list) + especially useful if a certain operation might fail in an exception + and the error needs to be stored in the logs + + However, if save_bot_streaming is used later, + and there is almost no chance of failure, + best to this.active_bot.errors.append(str(msg)) so we can save + some DB calls. + """ + result = self._db[db_collection_name].find_one_and_update( + {"id": active_bot.id}, + {"$push": {"errors": str(msg)}}, + return_document=ReturnDocument.AFTER, + ) + active_bot.errors = result["errors"] + return result # def get_autotrade_settings(self): # return self._db.research_controller.find_one({"_id": "settings"}) @@ -32,4 +87,3 @@ class Database: # def get_active_paper_trading_bots(self): # bots = list(self._db.paper_trading.distinct("pair", {"status": "active"})) # return bots - diff --git a/api/deals/base.py b/api/deals/base.py index 301986cfc..6bffd6e0e 100644 --- a/api/deals/base.py +++ b/api/deals/base.py @@ -27,7 +27,10 @@ class BaseDeal(OrderController): """ def __init__(self, bot, db_collection_name): - self.active_bot = BotSchema(**bot) + if not isinstance(bot, BotSchema): + self.active_bot = BotSchema(**bot) + else: + self.active_bot = bot self.symbol = self.active_bot.pair super().__init__(symbol=self.active_bot.pair) self.db_collection = self._db[db_collection_name] @@ -217,9 +220,9 @@ def create_new_bot_streaming(self): bot = encode_json(self.active_bot) self.db_collection.insert_one(bot) new_bot = self.db_collection.find_one({"id": bot["id"]}) - bot_class = BotSchema(**new_bot) + new_bot_class = BotSchema(**new_bot) - return bot_class + return new_bot_class def base_order(self): """ @@ -297,76 +300,6 @@ def base_order(self): return document - def dynamic_take_profit(self, current_bot, close_price): - self.active_bot = BotSchema(**current_bot) - - params = { - "symbol": self.active_bot.pair, - "interval": self.active_bot.candlestick_interval, - } - res = requests.get(url=self.bb_candlestick_url, params=params) - data = handle_binance_errors(res) - list_prices = numpy.array(data["trace"][0]["close"]).astype(numpy.single) - series_sd = numpy.std(list_prices.astype(numpy.single)) - sd = series_sd / float(close_price) - dates = numpy.array(data["trace"][0]["x"]) - - # Calculate linear regression to get trend - slope, intercept, rvalue, pvalue, stderr = linregress(dates, list_prices) - - if sd >= 0: - logging.debug( - f"dynamic profit for {self.active_bot.pair} sd: {sd}", - f'slope is {"positive" if slope > 0 else "negative"}', - ) - if ( - self.active_bot.deal.trailling_stop_loss_price > 0 - and self.active_bot.deal.trailling_stop_loss_price - > self.active_bot.deal.base_order_price - and float(close_price) > self.active_bot.deal.trailling_stop_loss_price - and ( - (self.active_bot.strategy == "long" and slope > 0) - or (self.active_bot.strategy == "margin_short" and slope < 0) - ) - and self.active_bot.deal.sd > sd - ): - # Only do dynamic trailling if regression line confirms it - volatility = float(sd) / float(close_price) - if volatility < 0.018: - volatility = 0.018 - elif volatility > 0.088: - volatility = 0.088 - - # sd is multiplied by 2 to increase the difference between take_profit and trailling_stop_loss - # this avoids closing too early - new_trailling_stop_loss_price = float(close_price) - ( - float(close_price) * volatility - ) - # deal.sd comparison will prevent it from making trailling_stop_loss too big - # and thus losing all the gains - if ( - new_trailling_stop_loss_price - > float(self.active_bot.deal.buy_price) - and sd < self.active_bot.deal.sd - ): - self.active_bot.trailling_deviation = volatility * 100 - self.active_bot.deal.trailling_stop_loss_price = float( - close_price - ) - (float(close_price) * volatility) - # Update tralling_profit price - logging.info( - f"Updated trailling_deviation and take_profit {self.active_bot.deal.trailling_stop_loss_price}" - ) - self.active_bot.deal.take_profit_price = float(close_price) + ( - float(close_price) * volatility - ) - self.active_bot.deal.trailling_profit_price = float(close_price) + ( - float(close_price) * volatility - ) - - self.active_bot.deal.sd = sd - bot = self.save_bot_streaming() - return bot def margin_liquidation(self, pair: str, qty_precision=None): """ diff --git a/api/deals/schema.py b/api/deals/schema.py index f4dee9c92..d326d4ade 100644 --- a/api/deals/schema.py +++ b/api/deals/schema.py @@ -4,18 +4,26 @@ class OrderSchema(BaseModel): + """ + No need for validation, + order coming from Binance + + """ order_type: str | None = None time_in_force: str | None = None timestamp: float = 0 pair: str | None = None - qty: str | None = None + qty: str | float | int | None = None order_side: str | None = None - order_id: str | None = None + order_id: int | None = None fills: Any = None price: float | None = None status: str | None = None deal_type: str | None = None # [base_order, take_profit, so_{x}, short_sell, short_buy, margin_short] + class Config: + use_enum_values = True + class DealSchema(BaseModel): buy_price: float = 0 # base currency quantity e.g. 3000 USDT in BTCUSDT @@ -31,7 +39,6 @@ class DealSchema(BaseModel): sell_timestamp: float = 0 sell_price: float = 0 sell_qty: float = 0 - post_closure_current_price: float = 0 trailling_stop_loss_price: float = 0 trailling_profit_price: float = 0 short_sell_price: float = 0 @@ -63,9 +70,10 @@ class DealSchema(BaseModel): def check_prices(cls, v): if float(v) < 0: raise ValueError("Price must be a positive number") + if isinstance(v, str): + return float(v) return v - class MarginOrderSchema(OrderSchema): margin_buy_borrow_amount: int = 0 margin_buy_borrow_asset: str = "USDT" diff --git a/api/deals/spot.py b/api/deals/spot.py index d3fc8805e..ae7f91c19 100644 --- a/api/deals/spot.py +++ b/api/deals/spot.py @@ -350,7 +350,7 @@ def so_update_deal(self, so_index): pass def streaming_updates(self, close_price, open_price): - + close_price = float(close_price) self.close_conditions(float(close_price)) self.active_bot.deal.current_price = close_price @@ -363,7 +363,7 @@ def streaming_updates(self, close_price, open_price): if self.active_bot.margin_short_reversal: self.switch_margin_short() - self.update_required() + # self.update_required() return # Take profit trailling @@ -466,29 +466,6 @@ def streaming_updates(self, close_price, open_price): self.save_bot_streaming() - # Open safety orders - # When bot = None, when bot doesn't exist (unclosed websocket) - if ( - hasattr(self.active_bot, "safety_orders") - and len(self.active_bot.safety_orders) > 0 - ): - for key, so in enumerate(self.active_bot.safety_orders): - # Index is the ID of the safety order price that matches safety_orders list - if ( - hasattr(self.active_bot, "status") - and self.active_bot.status == 0 - and so.buy_price >= float(close_price) - ): - self.so_update_deal(key) - - # Execute dynamic_take_profit at the end, - # so that trailling_take_profit and trailling_stop_loss can execute before - # else trailling_stop_loss could be hit but then changed because of dynamic_tp - if self.active_bot.trailling and self.active_bot.dynamic_trailling: - # Returns bot, to keep modifying in subsequent checks - bot = self.dynamic_take_profit(self.active_bot, close_price) - - def close_conditions(self, current_price): """ diff --git a/api/market_updates.py b/api/market_updates.py index a0bb03af1..24e81e6da 100644 --- a/api/market_updates.py +++ b/api/market_updates.py @@ -1,21 +1,34 @@ +from enum import auto +import json import logging -import time +import os +from click import group +from kafka import KafkaConsumer from streaming.streaming_controller import StreamingController +from tools.enum_definitions import KafkaTopics -logging.Formatter.converter = time.gmtime # date time in GMT/UTC -logging.basicConfig( - level=logging.INFO, - filename=None, - format="%(asctime)s.%(msecs)03d UTC %(levelname)s %(name)s: %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", -) +def main(): + try: + consumer = KafkaConsumer( + KafkaTopics.klines_store_topic.value, + KafkaTopics.restart_streaming.value, + bootstrap_servers=f'{os.environ["KAFKA_HOST"]}:{os.environ["KAFKA_PORT"]}', + value_deserializer=lambda m: json.loads(m), + auto_offset_reset="latest", + autocommit_enable=True, + ) + mu = StreamingController(consumer) + for message in consumer: + if message.topic == KafkaTopics.restart_streaming.value: + mu.load_data_on_start() + if message.topic == KafkaTopics.klines_store_topic.value: + mu.process_klines(message.value) + except Exception as error: + logging.error(f"Streaming controller error: {error}") + main() -try: - mu = StreamingController() - mu.get_klines() -except Exception as error: - logging.error(f"Streaming controller error: {error}") - mu = StreamingController() - mu.get_klines() +if __name__ == "__main__": + while True: + main() \ No newline at end of file diff --git a/api/research/controller.py b/api/research/controller.py index ee5d08e19..1176ccfbc 100644 --- a/api/research/controller.py +++ b/api/research/controller.py @@ -179,4 +179,4 @@ def edit_subscribed_symbol(self, symbol): ) return json_response_message("Successfully update symbol in the subscribed list") except Exception as error: - return json_response_error(f"Failed to update symbol in the subscribed list {error}") \ No newline at end of file + return json_response_error(f"Failed to update symbol in the subscribed list {error}") diff --git a/api/streaming/streaming_controller.py b/api/streaming/streaming_controller.py index 8f85eaa60..97ce67063 100644 --- a/api/streaming/streaming_controller.py +++ b/api/streaming/streaming_controller.py @@ -1,46 +1,39 @@ import json import logging -from deals.base import BaseDeal -from tools.enum_definitions import Status -from db import setup_db + +from bots.schemas import BotSchema +from autotrade.controller import AutotradeSettingsController +from bots.controllers import Bot +from tools.enum_definitions import Status, Strategy +from db import Database from deals.margin import MarginDeal from deals.spot import SpotLongDeal -from time import time -from streaming.socket_client import SpotWebsocketStreamClient -from tools.exceptions import BinanceErrors, TerminateStreaming +from tools.exceptions import BinanceErrors -class StreamingController(BaseDeal): - def __init__(self): - # For some reason, db connections internally only work with - # db:27017 instead of localhost=:2018 - self.streaming_db = setup_db() - self.socket = None - # test wss://data-stream.binance.com - self.client = SpotWebsocketStreamClient( - on_message=self.on_message, on_error=self.on_error, is_combined=True - ) - self.conn_key = None - self.list_bots = [] - self.list_paper_trading_bots = [] - self.settings = self.streaming_db.research_controller.find_one( - {"_id": "settings"} - ) - self.test_settings = self.streaming_db.research_controller.find_one( - {"_id": "test_autotrade_settings"} - ) - def _update_required(self): - """ - Terminate streaming and restart list of bots required +class StreamingController(Database): + def __init__(self, consumer): + super().__init__() + self.streaming_db = self._db + # Gets any signal to restart streaming + self.consumer = consumer + self.autotrade_controller = AutotradeSettingsController() + self.load_data_on_start() - This will count and store number of times update is required, - so that we can add a condition to restart when it hits certain threshold + def load_data_on_start(self): + """ + New function to replace get_klines without websockets - This is to avoid excess memory consumption + After each "update_required" restart, this function will reload bots and settings """ - self.streaming_db.research_controller.update_one( - {"_id": "settings"}, {"$set": {"update_required": time()}} - ) + self.settings = self.autotrade_controller.get_autotrade_settings() + self.test_settings = self.autotrade_controller.get_test_autotrade_settings() + # Load real bot settings + bot_controller = Bot(collection_name="bots") + self.list_bots = bot_controller.get_active_pairs() + # Load paper trading bot settings + paper_trading_controller_paper = Bot(collection_name="paper_trading") + self.list_paper_trading_bots = paper_trading_controller_paper.get_active_pairs() def execute_strategies( self, @@ -55,9 +48,10 @@ def execute_strategies( It updates the bots deals, safety orders, trailling orders, stop loss for both paper trading test bots and real bots """ + active_bot = BotSchema(**current_bot) # Margin short - if current_bot["strategy"] == "margin_short": - margin_deal = MarginDeal(current_bot, db_collection_name) + if active_bot.strategy == Strategy.margin_short: + margin_deal = MarginDeal(active_bot, db_collection_name) try: margin_deal.streaming_updates(close_price) except BinanceErrors as error: @@ -66,175 +60,51 @@ def execute_strategies( except Exception as error: logging.info(error) margin_deal.update_deal_logs(error) - # Go to _update_required - self._update_required() pass else: # Long strategy starts - if current_bot["strategy"] == "long": - spot_long_deal = SpotLongDeal( - current_bot, db_collection_name - ) + if active_bot.strategy == Strategy.long: + spot_long_deal = SpotLongDeal(active_bot, db_collection_name) try: spot_long_deal.streaming_updates(close_price, open_price) except BinanceErrors as error: if error.code in (-2010, -1013): spot_long_deal.update_deal_logs(error.message) - current_bot["status"] = Status.error + active_bot["status"] = Status.error self.save_bot_streaming() except Exception as error: logging.info(error) spot_long_deal.update_deal_logs(error) - # Go to _update_required - self._update_required() pass pass - def on_error(self, socket, msg): - logging.error(f'Streaming_Controller error:{msg}') - self.get_klines() - - def on_message(self, socket, message): - # If fails to connect, this will cancel loop - res = json.loads(message) - - if "result" in res and res["result"]: - logging.info(f'Subscriptions: {res["result"]}') - - if "data" in res: - if "e" in res["data"] and res["data"]["e"] == "kline": - self.process_klines(res["data"]) - else: - logging.error(f'Error: {res["data"]}') - self.client.stop() - - def process_klines(self, result): + def process_klines(self, message): """ Updates deals with klines websockets, when price and symbol match existent deal """ - # Re-retrieve settings in the middle of streaming - local_settings = self.streaming_db.research_controller.find_one( - {"_id": "settings"} - ) - - if "k" in result: - close_price = result["k"]["c"] - open_price = result["k"]["o"] - symbol = result["k"]["s"] - current_bot = self.streaming_db.bots.find_one( - {"pair": symbol, "status": Status.active} + data = json.loads(message) + close_price = data["close_price"] + open_price = data["open_price"] + symbol = data["symbol"] + current_bot = Bot(collection_name="bots").get_one(symbol=symbol) + current_test_bot = Bot(collection_name="paper_trading").get_one(symbol=symbol) + + if current_bot: + self.execute_strategies( + current_bot, + close_price, + open_price, + "bots", ) - current_test_bot = self.streaming_db.paper_trading.find_one( - {"pair": symbol, "status": Status.active} + if current_test_bot: + self.execute_strategies( + current_test_bot, + close_price, + open_price, + "paper_trading", ) - if current_bot: - self.execute_strategies( - current_bot, - close_price, - open_price, - "bots", - ) - if current_test_bot: - self.execute_strategies( - current_test_bot, - close_price, - open_price, - "paper_trading", - ) - # Add margin time to update_required signal to avoid restarting constantly - # About 1000 seconds (16.6 minutes) - similar to candlestick ticks of 15m - if local_settings["update_required"]: - logging.debug( - f'Time elapsed for update_required: {time() - local_settings["update_required"]}' - ) - if (time() - local_settings["update_required"]) > 40: - self.streaming_db.research_controller.update_one( - {"_id": "settings"}, {"$set": {"update_required": time()}} - ) - logging.info(f"Restarting streaming_controller {self.list_bots}") - # raise TerminateStreaming() - self.get_klines() return - - def get_klines(self): - interval = self.settings["candlestick_interval"] - self.list_bots = list( - self.streaming_db.bots.distinct("pair", {"status": "active"}) - ) - self.list_paper_trading_bots = list( - self.streaming_db.paper_trading.distinct("pair", {"status": "active"}) - ) - # Reset to new update_required system - self.streaming_db.research_controller.update_one( - {"_id": "settings"}, {"$set": {"update_required": time()}} - ) - - markets = self.list_bots + self.list_paper_trading_bots - logging.info(f"Streaming updates: {markets}") - self.client.klines(markets=markets, interval=interval) - - def update_order_data(self, result, db_collection: str = "bots"): - """ - Keep order data up to date - - When buy_order or sell_order is executed, they are often in - status NEW, and it takes time to update to FILLED. - This keeps order data up to date as they are executed - throught the executionReport websocket - - Args: - result (dict): executionReport websocket result - db_collection (str, optional): Defaults to "bots". - - """ - order_id = result["i"] - update = { - "$set": { - "orders.$.status": result["X"], - "orders.$.qty": result["q"], - "orders.$.order_side": result["S"], - "orders.$.order_type": result["o"], - "orders.$.timestamp": result["T"], - }, - "$inc": {"total_commission": float(result["n"])}, - "$push": {"errors": "Order status updated"}, - } - if float(result["p"]) > 0: - update["$set"]["orders.$.price"] = float(result["p"]) - else: - update["$set"]["orders.$.price"] = float(result["L"]) - - query = self.streaming_db[db_collection].update_one( - {"orders": {"$elemMatch": {"order_id": order_id}}}, - update, - ) - return query - - def get_user_data(self): - listen_key = self.get_listen_key() - self.user_data_client = SpotWebsocketStreamClient( - on_message=self.on_user_data_message, on_error=self.on_error - ) - self.user_data_client.user_data(listen_key=listen_key, action=SpotWebsocketStreamClient.subscribe) - - def on_user_data_message(self, socket, message): - logging.info("Streaming user data") - res = json.loads(message) - - if "e" in res: - if "executionReport" in res["e"]: - query = self.update_order_data(res) - if query.raw_result["nModified"] == 0: - logging.debug( - f'No bot found with order client order id: {res["i"]}. Order status: {res["X"]}' - ) - return - - elif "outboundAccountPosition" in res["e"]: - logging.info(f'Assets changed {res["e"]}') - elif "balanceUpdate" in res["e"]: - logging.info(f'Funds transferred {res["e"]}') diff --git a/api/streaming/user_data_streaming.py b/api/streaming/user_data_streaming.py new file mode 100644 index 000000000..ea9faed4a --- /dev/null +++ b/api/streaming/user_data_streaming.py @@ -0,0 +1,74 @@ +import logging +from api.streaming.socket_client import SpotWebsocketStreamClient + + +class UserDataStreaming: + def __init__(self) -> None: + pass + + def update_order_data(self, result, db_collection: str = "bots"): + """ + Keep order data up to date + + When buy_order or sell_order is executed, they are often in + status NEW, and it takes time to update to FILLED. + This keeps order data up to date as they are executed + throught the executionReport websocket + + Args: + result (dict): executionReport websocket result + db_collection (str, optional): Defaults to "bots". + + """ + order_id = result["i"] + update = { + "$set": { + "orders.$.status": result["X"], + "orders.$.qty": result["q"], + "orders.$.order_side": result["S"], + "orders.$.order_type": result["o"], + "orders.$.timestamp": result["T"], + }, + "$inc": {"total_commission": float(result["n"])}, + "$push": {"errors": "Order status updated"}, + } + if float(result["p"]) > 0: + update["$set"]["orders.$.price"] = float(result["p"]) + else: + update["$set"]["orders.$.price"] = float(result["L"]) + + query = self.streaming_db[db_collection].update_one( + {"orders": {"$elemMatch": {"order_id": order_id}}}, + update, + ) + return query + + def get_user_data(self): + listen_key = self.get_listen_key() + self.user_data_client = SpotWebsocketStreamClient( + on_message=self.on_user_data_message, on_error=self.on_error + ) + self.user_data_client.user_data( + listen_key=listen_key, action=SpotWebsocketStreamClient.subscribe + ) + + def on_user_data_message(self, socket, message): + """ + Legacy, needs improvement + """ + logging.info("Streaming user data") + res = json.loads(message) + + if "e" in res: + if "executionReport" in res["e"]: + query = self.update_order_data(res) + if query.raw_result["nModified"] == 0: + logging.debug( + f'No bot found with order client order id: {res["i"]}. Order status: {res["X"]}' + ) + return + + elif "outboundAccountPosition" in res["e"]: + logging.info(f'Assets changed {res["e"]}') + elif "balanceUpdate" in res["e"]: + logging.info(f'Funds transferred {res["e"]}') diff --git a/api/tests/test_assets.py b/api/tests/test_assets.py index 595d16642..41160dfde 100644 --- a/api/tests/test_assets.py +++ b/api/tests/test_assets.py @@ -1,8 +1,6 @@ -from fastapi.encoders import jsonable_encoder from fastapi.testclient import TestClient import pytest from account.assets import Assets -from db import Database, setup_db from main import app import mongomock @@ -15,7 +13,20 @@ @pytest.fixture() def patch_database(monkeypatch): - data = [{'_id': '65d39b44a7ff2a5e843b1e0b', 'time': '2024-02-19 18:17:40.479', 'data': [{'symbol': 'PIXELUSDT', 'priceChangePercent': '15.000', 'volume': '1235816253.30000000', 'price': '0.64880000'}]}] + data = [ + { + "_id": "65d39b44a7ff2a5e843b1e0b", + "time": "2024-02-19 18:17:40.479", + "data": [ + { + "symbol": "PIXELUSDT", + "priceChangePercent": "15.000", + "volume": "1235816253.30000000", + "price": "0.64880000", + } + ], + } + ] def new_init(self): self._db = db @@ -24,6 +35,57 @@ def new_init(self): monkeypatch.setattr(Assets, "get_market_domination", lambda a, size, *arg: data) +@pytest.fixture() +def patch_raw_balances(monkeypatch): + """ + Input data from API + Confidential data has been removed + """ + account_data = { + "makerCommission": 10, + "commissionRates": { + "maker": "0.00100000", + }, + "updateTime": 1713558823589, + "accountType": "SPOT", + "balances": [ + {"asset": "BTC", "free": 6.51e-06, "locked": 0.0}, + {"asset": "BNB", "free": 9.341e-05, "locked": 0.0}, + {"asset": "USDT", "free": 5.2, "locked": 0.0}, + {"asset": "NFT", "free": 1, "locked": 0.0}, + ], + "permissions": ["LEVERAGED", "TRD_GRP_002", "TRD_GRP_009"], + } + + monkeypatch.setattr( + Assets, "get_account_balance", lambda a, asset=None: account_data + ) + + +@pytest.fixture() +def patch_total_fiat(monkeypatch): + """ + Input data from API + Confidential data has been removed + """ + account_data = [ + {"activate": True, "balance": "0.001", "walletName": "Spot"}, + {"activate": True, "balance": "0", "walletName": "Funding"}, + {"activate": True, "balance": "0", "walletName": "Cross Margin"}, + {"activate": True, "balance": "0", "walletName": "Isolated Margin"}, + {"activate": True, "balance": "0", "walletName": "USDⓈ-M Futures"}, + {"activate": True, "balance": "0", "walletName": "COIN-M Futures"}, + {"activate": True, "balance": "0", "walletName": "Earn"}, + {"activate": False, "balance": "0", "walletName": "Options"}, + {"activate": False, "balance": "0", "walletName": "Trading Bots"}, + ] + + monkeypatch.setattr(Assets, "get_wallet_balance", lambda self: account_data) + monkeypatch.setattr( + Assets, "ticker", lambda self, symbol="BTCUSDT", json=False: {"price": "20000"} + ) + + def test_get_market_domination(monkeypatch, patch_database): response = app_client.get("/account/market-domination") @@ -31,15 +93,57 @@ def test_get_market_domination(monkeypatch, patch_database): # Assert the expected result expected_result = { "data": { - "dates": ['2024-02-19 18:17:40.479'], + "dates": ["2024-02-19 18:17:40.479"], "gainers_percent": [1235816253.3], "losers_percent": [0.0], "gainers_count": [1], "losers_count": [0], - "total_volume": [801797585.14104] + "total_volume": [801797585.14104], }, "message": "Successfully retrieved market domination data.", "error": 0, } assert response.status_code == 200 assert response.json() == expected_result + + +def test_get_raw_balance(patch_raw_balances): + """ + Test get all raw_balances + """ + + response = app_client.get("/account/balance/raw") + expected_result = [ + {"asset": "BTC", "free": 6.51e-06, "locked": 0.0}, + {"asset": "BNB", "free": 9.341e-05, "locked": 0.0}, + {"asset": "USDT", "free": 5.2, "locked": 0.0}, + {"asset": "NFT", "free": 1, "locked": 0.0}, + ] + + assert response.status_code == 200 + content = response.json() + assert content["data"] == expected_result + + +def test_total_fiat(patch_total_fiat): + """ + Test get balance estimates + """ + + response = app_client.get("/account/fiat") + + assert response.status_code == 200 + content = response.json() + assert content["data"] == 20 + + +def test_available_fiat(patch_raw_balances): + """ + Test available fiat + """ + + response = app_client.get("/account/fiat/available") + + assert response.status_code == 200 + content = response.json() + assert content["data"] == 5.2 diff --git a/api/tests/test_bots.py b/api/tests/test_bots.py new file mode 100644 index 000000000..52006ad5b --- /dev/null +++ b/api/tests/test_bots.py @@ -0,0 +1,89 @@ +from cgi import test +from fastapi.testclient import TestClient +from httpx import patch +import pytest +from bots.controllers import Bot +from main import app +import mongomock + + +@pytest.fixture() +def patch_bot(monkeypatch): + bot = { + "id": "6624255433c3cf9806d0a70e", + "pair": "ADXUSDT", + "balance_size_to_use": "0.0", + "balance_to_use": "USDT", + "base_order_size": "50", + "candlestick_interval": "15m", + "close_condition": "dynamic_trailling", + "cooldown": 360, + "created_at": 1713643305854.368, + "deal": { + "buy_price": 0.2257, + "base_order_price": 0.0, + "buy_timestamp": 1713644889534.0, + "buy_total_qty": 221.0, + }, + "errors": [], + "mode": "autotrade", + "name": "coinrule_fast_and_slow_macd_2024-04-20T22:28", + "orders": [ + { + "order_type": "LIMIT", + "time_in_force": "GTC", + "timestamp": 1713644889534.0, + "pair": "ADXUSDT", + "qty": "221.00000000", + "order_side": "BUY", + "order_id": 1, + "fills": [], + "price": 0.2257, + "status": "NEW", + "deal_type": "base_order", + } + ], + "status": "active", + "stop_loss": 3.0, + "take_profit": 2.3, + "trailling": "true", + "trailling_deviation": 3.0, + "trailling_profit": 0.0, + "strategy": "long", + "updated_at": 1713643305854.435, + } + + def new_init(self, collection_name="bots"): + mongo_client = mongomock.MongoClient() + self.db = mongo_client.db + self.db_collection = self.db[collection_name] + + monkeypatch.setattr(Bot, "__init__", new_init) + monkeypatch.setattr(Bot, "get_one", lambda self, bot_id, symbol: bot) + + return bot + + +def test_get_one_by_id(patch_bot): + client = TestClient(app) + test_id = "6624255433c3cf9806d0a70e" + response = client.get(f"/bot/{test_id}") + + # Assert the expected result + expected_result = patch_bot + + assert response.status_code == 200 + content = response.json() + assert content["data"] == expected_result + +def test_get_one_by_symbol(patch_bot): + client = TestClient(app) + symbol = "ADXUSDT" + response = client.get(f"/bot/{symbol}") + + # Assert the expected result + expected_result = patch_bot + + assert response.status_code == 200 + content = response.json() + assert content["data"] == expected_result diff --git a/api/tools/enum_definitions.py b/api/tools/enum_definitions.py index 98e375ea1..6313c9891 100644 --- a/api/tools/enum_definitions.py +++ b/api/tools/enum_definitions.py @@ -1,9 +1,11 @@ from enum import Enum + class EnumDefinitions: """ Enums established by Binance API """ + symbol_status = ( "PRE_TRADING", "TRADING", @@ -48,6 +50,7 @@ class BinbotEnums: mode = ("manual", "autotrade") strategy = ("long", "short", "margin_long", "margin_short") + class Status(str, Enum): inactive = "inactive" active = "active" @@ -55,13 +58,15 @@ class Status(str, Enum): error = "error" archived = "archived" + class Strategy(str, Enum): long = "long" margin_short = "margin_short" + class OrderType(str, Enum): limit = "LIMIT" - market= "MARKET" + market = "MARKET" stop_loss = "STOP_LOSS" stop_loss_limit = "STOP_LOSS_LIMIT" take_profit = "TAKE_PROFIT" @@ -80,6 +85,7 @@ class TimeInForce(str, Enum): def __str__(self): return str(self.str) + class OrderSide(str, Enum): buy = "BUY" sell = "SELL" @@ -87,16 +93,41 @@ class OrderSide(str, Enum): def __str__(self): return str(self.str) + class CloseConditions(str, Enum): dynamic_trailling = "dynamic_trailling" - timestamp = "timestamp" # No trailling, standard stop loss - market_reversal = "market_reversal" # binbot-research param (self.market_trend_reversal) + timestamp = "timestamp" # No trailling, standard stop loss + market_reversal = ( + "market_reversal" # binbot-research param (self.market_trend_reversal) + ) def __str__(self): return str(self.str) - + class TrendEnum(str, Enum): up_trend = "uptrend" down_trend = "downtrend" - neutral = None \ No newline at end of file + neutral = None + + +class KafkaTopics(str, Enum): + klines_store_topic = "klines_store_topic" + processed_klines_topic = "processed_klines_topic" + technical_indicators = "technical_indicators" + signals = "signals" + restart_streaming = "restart_streaming" + + def __str__(self): + return str(self.str) + + +class DealType(str, Enum): + base_order = "base_order" + take_profit = "take_profit" + short_sell = "short_sell" + short_buy = "short_buy" + margin_short = "margin_short" + + def __str__(self): + return str(self.str) diff --git a/api/tools/exceptions.py b/api/tools/exceptions.py index b69b528b5..c9b829821 100644 --- a/api/tools/exceptions.py +++ b/api/tools/exceptions.py @@ -93,4 +93,4 @@ class TerminateStreaming(Exception): pass class MaxBorrowLimit(BinbotErrors): - pass \ No newline at end of file + pass diff --git a/api/tools/handle_error.py b/api/tools/handle_error.py index cf2b026f2..fd17138df 100644 --- a/api/tools/handle_error.py +++ b/api/tools/handle_error.py @@ -54,8 +54,6 @@ def handle_binance_errors(response: Response) -> Response: - Binbot internal errors - bot errors, returns "errored" """ - content = response.json() - # Binance doesn't seem to reach 418 or 429 even after 2000 weight requests if ( response.headers.get("x-mbx-used-weight-1m") @@ -68,17 +66,21 @@ def handle_binance_errors(response: Response) -> Response: print("Request weight limit hit, ban will come soon, waiting 1 hour") sleep(3600) + # Cloudfront 403 error + if response.status_code == 403 and response.reason: + raise HTTPError(response.reason) + + content = response.json() + if response.status_code == 404: - error = response.json() - raise HTTPError(error) + raise HTTPError(content) - # Show error message for bad requests + # Show error messsage for bad requests if response.status_code >= 400: - error = response.json() - if "msg" in error: - raise BinanceErrors(error["msg"], error["code"]) - if "error" in error: - raise BinbotErrors(error["message"]) + if "msg" in content: + raise BinanceErrors(content["msg"], content["code"]) + if "error" in content: + raise BinbotErrors(content["message"]) # Binbot errors if content and "error" in content and content["error"] == 1: diff --git a/api/tools/helpers.py b/api/tools/helpers.py new file mode 100644 index 000000000..cdaf5e740 --- /dev/null +++ b/api/tools/helpers.py @@ -0,0 +1,24 @@ +from pymongo import ReturnDocument +from db import Database + + +class MongoDbHelpers(Database): + + def update_deal_logs(self, msg): + """ + Use this function if independently updating Event logs (deal.errors list) + especially useful if a certain operation might fail in an exception + and the error needs to be stored in the logs + + However, if save_bot_streaming is used later, + and there is almost no chance of failure, + best to this.active_bot.errors.append(str(msg)) so we can save + some DB calls. + """ + result = self.db_collection.find_one_and_update( + {"id": self.active_bot.id}, + {"$push": {"errors": str(msg)}}, + return_document=ReturnDocument.AFTER, + ) + self.active_bot.errors = result["errors"] + return result \ No newline at end of file diff --git a/binquant b/binquant index f73ee607c..1c2f1753f 160000 --- a/binquant +++ b/binquant @@ -1 +1 @@ -Subproject commit f73ee607ce8f72e57b082590adafe61c2c7788c2 +Subproject commit 1c2f1753f561fc4443c9bd1d2dbdf79dd28a2e37 diff --git a/docker-compose.yml b/docker-compose.yml index 6d0151056..9d58f7e51 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,9 +35,9 @@ services: - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,EXTERNAL://kafka:9094 - - KAFKA_BROKER_ID=1 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093 - ALLOW_PLAINTEXT_LISTENER=yes + - KAFKA_BROKER_ID=1 - KAFKA_CFG_NODE_ID=1 - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true - KAFKA_CFG_NUM_PARTITIONS=950 @@ -45,7 +45,6 @@ services: healthcheck: test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server kafka:9092 --topic klines_store_topic --create --if-not-exists", "kafka-topics.sh --bootstrap-server kafka:9092 --topic klines_store_topic --describe"] start_period: 10s - interval: 5s timeout: 10s retries: 5