Skip to content

Commit

Permalink
Bot streaming kafka (#556)
Browse files Browse the repository at this point in the history
* WIP setup Kafka for bot streaming updates

This new setup with the Binquant Kafka server will:
- No longer require the "update_required" to restart websockets, a signal can be produced to send the order to restart
- Deduplicate the use of websockets in two places (hopefully), by resuing the klines producer

* Refactor raw balances for performance improvements

* Break down balance estimate function into simpler endpoints

This improves testing abilities (as demonstrated in the new tests), modularity of reusage and improved efficiency (each call takes much less time)

* Extend get_one bot function to also use a symbol. Add tests

* Solve str -> float model casting issues

* Remove dynamic take profit

dynamic_take_profit uses standard deviation, and following the signals using telegram this value turns out to be highly inconsistent, therefore unreliable.
  • Loading branch information
carkod authored Apr 21, 2024
1 parent a24a8c3 commit 37238e4
Show file tree
Hide file tree
Showing 24 changed files with 761 additions and 453 deletions.
21 changes: 0 additions & 21 deletions api/account/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
json_response_message,
json_response_error,
)
from decimal import Decimal
from db import setup_db
from requests_cache import CachedSession, MongoCache
from pymongo import MongoClient
Expand Down Expand Up @@ -73,26 +72,6 @@ def get_ticker_price(self, symbol: str):
data = handle_binance_errors(res)
return data["price"]

def ticker_24(self, type: str = "FULL", symbol: str | None = None):
"""
Weight 40 without symbol
https://github.com/carkod/binbot/issues/438
Using cache
"""
url = self.ticker24_url
params = {
"type": type
}
if symbol:
params["symbol"] = symbol

# mongo_cache = self.setup_mongocache()
# expire_after = 15m because candlesticks are 15m
# session = CachedSession('ticker_24_cache', backend=mongo_cache, expire_after=15)
res = requests.get(url=url, params=params)
data = handle_binance_errors(res)
return data

def find_quoteAsset(self, symbol):
"""
Expand Down
102 changes: 67 additions & 35 deletions api/account/assets.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import json
from datetime import datetime, timedelta

import pandas as pd
from account.schemas import BalanceSchema, MarketDominationSeries
from bson.objectid import ObjectId
from charts.models import CandlestickParams
from charts.models import Candlestick
from db import Database, setup_db
from db import setup_db
from tools.handle_error import json_response, json_response_error, json_response_message
from tools.round_numbers import round_numbers
from tools.exceptions import BinanceErrors, InvalidSymbol, MarginLoanNotFound
Expand All @@ -22,29 +21,23 @@ def get_raw_balance(self, asset=None):
"""
Unrestricted balance
"""
data = self.signed_request(url=self.account_url)
df = pd.DataFrame(data["balances"])
df["free"] = pd.to_numeric(df["free"])
df["locked"] = pd.to_numeric(df["locked"])
df["asset"] = df["asset"].astype(str)
# Get table with > 0
balances = df[(df["free"] > 0) | (df["locked"] > 0)].to_dict("records")

if asset:
balances = df[
((df["free"] > 0) | (df["locked"] > 0)) & (df["asset"] == asset)
].to_dict("records")
# filter out empty
# Return response
resp = json_response({"data": balances})
return resp
data = self.get_account_balance()
balances = []
for item in data["balances"]:
if float(item["free"]) > 0 or float(item["locked"]) > 0:
if asset:
if item["asset"] == asset:
balances.append(item)
else:
balances.append(item)
return balances

def get_pnl(self, days=7):
current_time = datetime.now()
start = current_time - timedelta(days=days)
dummy_id = ObjectId.from_datetime(start)
data = list(
self.db.balances.find(
self._db.balances.find(
{
"_id": {
"$gte": dummy_id,
Expand All @@ -58,9 +51,9 @@ def get_pnl(self, days=7):
def _check_locked(self, b):
qty = 0
if "locked" in b:
qty = b["free"] + b["locked"]
qty = float(b["free"]) + float(b["locked"])
else:
qty = b["free"]
qty = float(b["free"])
return qty

def store_balance(self) -> dict:
Expand All @@ -69,9 +62,8 @@ def store_balance(self) -> dict:
Store current balance in Db
"""
# Store balance works outside of context as cronjob
balances_response = self.get_raw_balance()
bin_balance = json.loads(balances_response.body)
current_time = datetime.utcnow()
bin_balance = self.get_raw_balance()
current_time = datetime.now()
total_usdt: float = 0
rate: float = 0
for b in bin_balance["data"]:
Expand Down Expand Up @@ -104,7 +96,7 @@ def store_balance(self) -> dict:
try:
balance_schema = BalanceSchema(**total_balance)
balances = balance_schema.dict()
self.db.balances.update_one(
self._db.balances.update_one(
{"time": current_time.strftime("%Y-%m-%d")},
{"$set": balances},
upsert=True,
Expand All @@ -119,19 +111,18 @@ def balance_estimate(self, fiat="USDT"):
"""
Estimated balance in given fiat coin
"""
balances_response = self.get_raw_balance()
balances = self.get_raw_balance()
# Isolated m
isolated_margin = self.signed_request(url=self.isolated_account_url)
get_usdt_btc_rate = self.ticker(symbol=f"BTC{fiat}", json=False)
total_isolated_margin = float(isolated_margin["totalNetAssetOfBtc"]) * float(
get_usdt_btc_rate["price"]
)

balances = json.loads(balances_response.body)
total_fiat = 0
rate = 0
left_to_allocate = 0
for b in balances["data"]:
for b in balances:
# Transform tethers/stablecoins
if "USD" in b["asset"] or fiat == b["asset"]:
if fiat == b["asset"]:
Expand All @@ -149,7 +140,7 @@ def balance_estimate(self, fiat="USDT"):
total_fiat += float(qty) * float(rate)

balance = {
"balances": balances["data"],
"balances": balances,
"total_fiat": total_fiat + total_isolated_margin,
"total_isolated_margin": total_isolated_margin,
"fiat_left": left_to_allocate,
Expand Down Expand Up @@ -199,7 +190,7 @@ def store_balance_snapshot(self):
"""
db = setup_db()
print("Store account snapshot starting...")
current_time = datetime.utcnow()
current_time = datetime.now()
data = self.signed_request(self.account_snapshot_url, payload={"type": "SPOT"})
spot_data = next(
(item for item in data["snapshotVos"] if item["type"] == "spot"), None
Expand Down Expand Up @@ -299,7 +290,7 @@ async def get_balance_series(self, end_date, start_date):
lte_tp_id = ObjectId.from_datetime(obj_end_date)
params["_id"]["$lte"] = lte_tp_id

balance_series = list(self.db.balances.find(params).sort([("time", -1)]))
balance_series = list(self._db.balances.find(params).sort([("time", -1)]))

# btc candlestick data series
params = CandlestickParams(
Expand Down Expand Up @@ -351,7 +342,7 @@ def clean_balance_assets(self):
if len(self.exception_list) == 0:
self.exception_list = ["USDT", "NFT", "BNB"]

active_bots = list(self.db.bots.find({"status": Status.active}))
active_bots = list(self._db.bots.find({"status": Status.active}))
for bot in active_bots:
quote_asset = bot["pair"].replace(bot["balance_to_use"], "")
self.exception_list.append(quote_asset)
Expand All @@ -369,6 +360,49 @@ def clean_balance_assets(self):

return resp

def get_total_fiat(self, fiat="USDT"):
"""
Simplified version of balance_estimate
Returns:
float: total BTC estimated in the SPOT wallet
then converted into USDT
"""
wallet_balance = self.get_wallet_balance()
get_usdt_btc_rate = self.ticker(symbol=f"BTC{fiat}", json=False)
total_balance = 0
rate = float(get_usdt_btc_rate["price"])
for item in wallet_balance:
if item["activate"]:
total_balance += float(item["balance"])

total_fiat = total_balance * rate
return total_fiat

def get_available_fiat(self, fiat="USDT"):
"""
Simplified version of balance_estimate
to get free/avaliable USDT.
Getting the total USDT directly
from the balances because if it were e.g.
Margin trading, it would not be available for use.
The only available fiat is the unused USDT in the SPOT wallet.
Balance not used in Margin trading should be
transferred back to the SPOT wallet.
Returns:
str: total USDT available to
"""
total_balance = self.get_raw_balance()
for item in total_balance:
if item["asset"] == fiat:
return float(item["free"])
else:
return 0


def disable_isolated_accounts(self, symbol=None):
"""
Check and disable isolated accounts
Expand Down Expand Up @@ -423,7 +457,7 @@ def store_market_domination(self):
all_coins = sorted(all_coins, key=lambda item: float(item["priceChangePercent"]), reverse=True)
try:
current_time = datetime.now()
self.db.market_domination.insert_one(
self._db.market_domination.insert_one(
{
"time": current_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
"data": all_coins
Expand All @@ -439,11 +473,9 @@ def get_market_domination(self, size=7):
Args:
size (int, optional): Number of data points to retrieve. Defaults to 7 (1 week).
Returns:
dict: A dictionary containing the market domination data, including gainers and losers counts, percentages, and dates.
"""
query = {"$query": {}, "$orderby": {"_id": -1}}
result = self._db.market_domination.find(query).limit(size)
return list(result)

20 changes: 19 additions & 1 deletion api/account/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

@account_blueprint.get("/balance/raw", response_model=BalanceResponse, tags=["account"])
def raw_balance():
return Assets().get_raw_balance()
data = Assets().get_raw_balance()
return json_response({"data": data})


@account_blueprint.get("/symbols", tags=["account"])
Expand Down Expand Up @@ -100,6 +101,23 @@ async def get_balance_series():
def clean_balance():
return Assets().clean_balance_assets()

@account_blueprint.get("/fiat/available", response_model=BalanceSeriesResponse, tags=["assets"])
def total_balance():
"""
Total USDT in balance
Calculated by Binance
"""
total_fiat = Assets().get_available_fiat()
return json_response({"data": total_fiat})

@account_blueprint.get("/fiat", response_model=BalanceSeriesResponse, tags=["assets"])
def total_balance():
"""
Total USDT in balance
Calculated by Binance
"""
total_fiat = Assets().get_total_fiat()
return json_response({"data": total_fiat})

@account_blueprint.get(
"/disable-isolated", response_model=BalanceSeriesResponse, tags=["assets"]
Expand Down
Loading

0 comments on commit 37238e4

Please sign in to comment.