Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement order websockets #527

Merged
merged 6 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Dockerfile.streaming
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ COPY api api
WORKDIR api
RUN pip3 install pipenv --no-cache-dir --upgrade
RUN pipenv install --system --deploy --ignore-pipfile --clear
ENTRYPOINT ["python3", "-u", "market_updates.py"]
COPY entrypoint.sh entrypoint.sh
RUN chmod +x entrypoint.sh
ENTRYPOINT ["./entrypoint.sh"]

STOPSIGNAL SIGTERM
14 changes: 11 additions & 3 deletions api/account/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,35 +440,43 @@ def get_market_domination(self, size=7):
dict: A dictionary containing the market domination data, including gainers and losers counts, percentages, and dates.
"""
try:
data = list(self.db.market_domination.find({}))
data = list(self.db.market_domination.find(
{ "$query": {}, "$orderby": { "_id" : -1 } }
).limit(size))
market_domination_series = MarketDominationSeries()

for item in data:
gainers_percent = 0
losers_percent = 0
gainers_count = 0
losers_count = 0
total_volume = 0
if "data" in item:
for crypto in item["data"]:
if float(crypto['priceChangePercent']) > 0:
gainers_percent += float(crypto['priceChangePercent'])
gainers_percent += float(crypto['volume'])
gainers_count += 1

if float(crypto['priceChangePercent']) < 0:
losers_percent += abs(float(crypto['priceChangePercent']))
losers_percent += abs(float(crypto['volume']))
losers_count += 1

if float(crypto['volume']) > 0:
total_volume += float(crypto['volume'])

market_domination_series.dates.append(item["time"])
market_domination_series.gainers_percent.append(gainers_percent)
market_domination_series.losers_percent.append(losers_percent)
market_domination_series.gainers_count.append(gainers_count)
market_domination_series.losers_count.append(losers_count)
market_domination_series.total_volume.append(total_volume)

market_domination_series.dates = market_domination_series.dates[-size:]
market_domination_series.gainers_percent = market_domination_series.gainers_percent[-size:]
market_domination_series.losers_percent = market_domination_series.losers_percent[-size:]
market_domination_series.gainers_count = market_domination_series.gainers_count[-size:]
market_domination_series.losers_count = market_domination_series.losers_count[-size:]
market_domination_series.total_volume = market_domination_series.total_volume[-size:]

data = market_domination_series.dict()

Expand Down
2 changes: 1 addition & 1 deletion api/account/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def one_click_liquidation(asset):
return Assets().one_click_liquidation(asset)

@account_blueprint.get("/market-domination", tags=["assets"], response_model=MarketDominationResponse)
def market_domination(size: int):
def market_domination(size: int=7):
return Assets().get_market_domination(size)

@account_blueprint.get("/store-market-domination", tags=["assets"], response_model=GetMarketDominationResponse)
Expand Down
1 change: 1 addition & 0 deletions api/account/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class MarketDominationSeries(BaseModel):
losers_percent: list[float] = []
gainers_count: list[int] = []
losers_count: list[int] = []
total_volume: list[float] = []


class GetMarketDominationResponse(StandardResponse):
Expand Down
16 changes: 16 additions & 0 deletions api/apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ def request(self, method="GET", **args):
data = handle_binance_errors(res)
return data

def get_listen_key(self):
"""
Get user data websocket stream
"""
headers = {"Content-Type": "application/json", "X-MBX-APIKEY": self.key}
data = self.request("POST", url=self.user_data_stream, headers=headers)
return data["listenKey"]

def cancel_margin_order(self, symbol, order_id):
return self.signed_request(self.margin_order, method="DELETE", payload={"symbol": symbol, "orderId": order_id})

Expand Down Expand Up @@ -167,6 +175,14 @@ def transfer_dust(self, assets: List[str]):
response = self.signed_request(url=self.dust_transfer_url, method="POST", payload={"asset": list_assets})
return response

def get_all_orders(self, symbol, order_id):
"""
Get all orders given symbol and order_id

https://binance-docs.github.io/apidocs/spot/en/#current-open-orders-user_data
"""
return self.signed_request(self.all_orders_url, payload={"symbol": symbol, "orderId": order_id})

class BinbotApi(BinanceApi):
"""
API endpoints on this project itself
Expand Down
5 changes: 3 additions & 2 deletions api/deals/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from tools.exceptions import BinanceErrors, MarginLoanNotFound
from scipy.stats import linregress
from tools.round_numbers import round_numbers_ceiling
from tools.enum_definitions import Status
from tools.enum_definitions import Status, Strategy


# To be removed one day when commission endpoint found that provides this value
Expand All @@ -36,7 +36,8 @@ def __init__(self, bot, db_collection_name):
self.symbol = self.active_bot.pair
super().__init__(symbol=self.active_bot.pair)
self.db_collection = self.db[db_collection_name]
self.isolated_balance: float = self.get_isolated_balance(self.symbol)
if self.active_bot.strategy == Strategy.margin_short:
self.isolated_balance: float = self.get_isolated_balance(self.symbol)

def __repr__(self) -> str:
"""
Expand Down
6 changes: 3 additions & 3 deletions api/deals/margin.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,9 +772,9 @@ def update_trailling_profit(self, close_price):

# Reset stop_loss_price to avoid confusion in front-end
self.active_bot.deal.stop_loss_price = 0
logging.info(
f"{self.active_bot.pair} Updating after broken first trailling_profit (short)"
)
milestone_msg = f"{self.active_bot.pair} Updating after broken first trailling_profit (short)"
logging.info(milestone_msg)
self.update_deal_logs(milestone_msg)

# Direction 1 (downward): breaking the current trailling
if float(close_price) <= float(self.active_bot.deal.trailling_profit_price):
Expand Down
22 changes: 22 additions & 0 deletions api/deals/spot.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,28 @@ def streaming_updates(self, close_price, open_price):
logging.error(error)
return

# Update unfilled orders
unupdated_order = next(
(
deal
for deal in self.active_bot.orders
if deal.deal_type == "NEW" or deal.price == 0
),
None,
)
if unupdated_order:
order_response = self.get_all_orders(self.active_bot.pair, unupdated_order.order_id)
logging.info(f"Unfilled orders response{order_response}")
if order_response[0]["status"] == "FILLED":
for i, order in enumerate(self.active_bot.orders):
if order.order_id == order_response["orderId"]:
self.active_bot.orders[i].price = order_response["price"]
self.active_bot.orders[i].qty = order_response["origQty"]
self.active_bot.orders[i].fills = order_response["fills"]
self.active_bot.orders[i].status = order_response["status"]

self.save_bot_streaming()

# Open safety orders
# When bot = None, when bot doesn't exist (unclosed websocket)
if (
Expand Down
14 changes: 1 addition & 13 deletions api/market_updates.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
import atexit
import os
import logging
import time

from apscheduler.schedulers.background import BackgroundScheduler
from streaming.streaming_controller import StreamingController
from account.assets import Assets
from websocket import WebSocketConnectionClosedException


logging.Formatter.converter = time.gmtime # date time in GMT/UTC
logging.basicConfig(
Expand All @@ -17,16 +11,10 @@
datefmt="%Y-%m-%d %H:%M:%S",
)

try:

try:
mu = StreamingController()
mu.get_klines()

except WebSocketConnectionClosedException as e:
logging.error("Lost websocket connection")
mu = StreamingController()
mu.get_klines()

except Exception as error:
logging.error(f"Streaming controller error: {error}")
mu = StreamingController()
Expand Down
20 changes: 20 additions & 0 deletions api/order_updates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import logging
import time

from streaming.streaming_controller import StreamingController

logging.Formatter.converter = time.gmtime # date time in GMT/UTC
logging.basicConfig(
level=logging.INFO,
filename=None,
format="%(asctime)s.%(msecs)03d UTC %(levelname)s %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)

try:
mu = StreamingController()
mu.get_user_data()
except Exception as error:
logging.error(f"User data streaming error: {error}")
mu = StreamingController()
mu.get_user_data()
25 changes: 24 additions & 1 deletion api/orders/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,20 @@ def sell_order(self, symbol, qty, price=None):
"type": OrderType.market,
"quantity": supress_notation(qty, self.qty_precision),
}
# Because market orders don't have price
# get it from fills

data = self.signed_request(url=self.order_url, method="POST", payload=payload)
logging.info(f'Sell transaction: {data["price"]}')

if data["price"] == 0:
total_qty = 0
weighted_avg = 0
for item in data["fills"]:
weighted_avg += float(item["price"]) * float(item["qty"])
total_qty += float(item["qty"])

weighted_avg_price = weighted_avg / total_qty
data["price"] = weighted_avg_price

return data

Expand Down Expand Up @@ -106,6 +118,17 @@ def buy_order(self, symbol, qty, price=None):
}

data = self.signed_request(url=self.order_url, method="POST", payload=payload)

if data["price"] == 0:
total_qty = 0
weighted_avg = 0
for item in data["fills"]:
weighted_avg += float(item["price"]) * float(item["qty"])
total_qty += float(item["qty"])

weighted_avg_price = weighted_avg / total_qty
data["price"] = weighted_avg_price

return data

def get_open_orders(self):
Expand Down
4 changes: 4 additions & 0 deletions api/streaming/socket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@ def klines(self, markets: list, interval: str, id=None, action=None):
params.append(f"{market.lower()}@kline_{interval}")

self.send_message_to_server(params, action=action, id=id)

def user_data(self, listen_key: str, id=None, action=None, **kwargs):
"""Listen to user data by using the provided listen_key"""
self.send_message_to_server(listen_key, action=action, id=id)
64 changes: 35 additions & 29 deletions api/streaming/streaming_controller.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import json
import logging
from apis import BinanceApi
from db import setup_db
from deals.margin import MarginDeal
from deals.spot import SpotLongDeal
from time import time
from streaming.socket_client import SpotWebsocketStreamClient
from tools.exceptions import TerminateStreaming

class StreamingController:
class StreamingController(BinanceApi):
def __init__(self):
# For some reason, db connections internally only work with
# db:27017 instead of localhost=:2018
Expand Down Expand Up @@ -44,7 +46,6 @@ def execute_strategies(
current_bot,
close_price: str,
open_price: str,
symbol: str,
db_collection_name,
):
"""
Expand Down Expand Up @@ -82,7 +83,7 @@ def execute_strategies(
pass

def on_error(self, socket, msg):
logging.error(msg)
logging.error(f'Streaming_Controller error:{msg}')
self.get_klines()

def on_message(self, socket, message):
Expand Down Expand Up @@ -124,15 +125,13 @@ def process_klines(self, result):
current_bot,
close_price,
open_price,
symbol,
"bots",
)
if current_test_bot:
self.execute_strategies(
current_test_bot,
close_price,
open_price,
symbol,
"paper_trading",
)

Expand All @@ -142,11 +141,12 @@ def process_klines(self, result):
logging.debug(
f'Time elapsed for update_required: {time() - local_settings["update_required"]}'
)
if (time() - local_settings["update_required"]) > 20:
if (time() - local_settings["update_required"]) > 40:
self.streaming_db.research_controller.update_one(
{"_id": "settings"}, {"$set": {"update_required": time()}}
)
logging.info("Restarting streaming_controller")
logging.info(f"Restarting streaming_controller {self.list_bots}")
# raise TerminateStreaming()
self.get_klines()
return

Expand Down Expand Up @@ -235,28 +235,34 @@ def process_user_data(self, result):
return
return

async def get_user_data(self):
logging.info("Streaming user data")
socket, client = await self.setup_client()
user_data = socket.user_socket()
async with user_data as ud:
while True:
try:
res = await ud.recv()
def get_user_data(self):
listen_key = self.get_listen_key()
self.user_data_client = SpotWebsocketStreamClient(
on_message=self.on_user_data_message, on_error=self.on_error
)
self.user_data_client.user_data(listen_key=listen_key, action=SpotWebsocketStreamClient.subscribe)

if "e" in res:
if "executionReport" in res["e"]:
self.process_user_data(res)
elif "outboundAccountPosition" in res["e"]:
logging.info(f'Assets changed {res["e"]}')
elif "balanceUpdate" in res["e"]:
logging.info(f'Funds transferred {res["e"]}')
else:
logging.info(f"Unrecognized user data: {res}")
def on_user_data_message(self, socket, message):
logging.info("Streaming user data")
res = json.loads(message)

pass
except Exception as error:
logging.info(f"get_user_data sockets error: {error}")
pass
if "result" in res and res["result"]:
logging.info(f'Subscriptions: {res["result"]}')

await client.close_connection()
# if "data" in res:
# if "e" in res["data"] and res["data"]["e"] == "kline":
# self.process_klines(res["data"])
# self.process_user_data(res["data"])
# else:
# logging.error(f'Error: {res["data"]}')
# self.client.stop()
logging.info(f'User data {res}')
if "data" in res:
if "e" in res:
if "executionReport" in res["e"]:
logging.info(f'executionReport {res}')
self.process_user_data(res)
elif "outboundAccountPosition" in res["e"]:
logging.info(f'Assets changed {res["e"]}')
elif "balanceUpdate" in res["e"]:
logging.info(f'Funds transferred {res["e"]}')
4 changes: 4 additions & 0 deletions entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash

exec python3 -u market_updates.py &
exec python3 -u order_updates.py
Loading
Loading