Skip to content

Commit

Permalink
[Binance] Support rolling window
Browse files Browse the repository at this point in the history
  • Loading branch information
pipixi authored and bb committed Nov 28, 2024
1 parent c20907c commit fe002a1
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer
LoggerFactory.getLogger(BinanceStreamingMarketDataService.class);

private static final JavaType TICKER_TYPE = getTickerType();
private static final JavaType WINDOW_TICKER_TYPE = getWindowTickerType();
private static final JavaType BOOK_TICKER_TYPE = getBookTickerType();
private static final JavaType TRADE_TYPE = getTradeType();
private static final JavaType DEPTH_TYPE = getDepthType();
Expand All @@ -84,13 +85,16 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer
private final int oderBookFetchLimitParameter;

private final Map<Instrument, Observable<BinanceTicker24h>> tickerSubscriptions;
private final Map<Instrument, Observable<BinanceTicker24h>> tickerWindowSubscriptions;
private final Map<Instrument, Observable<BinanceBookTicker>> bookTickerSubscriptions;
private final Map<Instrument, Observable<OrderBook>> orderbookSubscriptions;
private final Map<Instrument, Observable<BinanceRawTrade>> tradeSubscriptions;
private final Map<Instrument, Observable<List<OrderBookUpdate>>> orderBookUpdatesSubscriptions;
private final Map<Instrument, Map<KlineInterval, Observable<BinanceKline>>> klineSubscriptions;
private final Map<Instrument, Observable<DepthBinanceWebSocketTransaction>>
orderBookRawUpdatesSubscriptions;
private Observable<List<BinanceTicker24h>> allRollingWindowTickerSubscriptions;


/**
* A scheduler for initialisation of binance order book snapshots, which is delegated to a
Expand Down Expand Up @@ -125,6 +129,7 @@ public BinanceStreamingMarketDataService(
this.marketDataService = marketDataService;
this.onApiCall = onApiCall;
this.tickerSubscriptions = new ConcurrentHashMap<>();
this.tickerWindowSubscriptions = new ConcurrentHashMap<>();
this.bookTickerSubscriptions = new ConcurrentHashMap<>();
this.orderbookSubscriptions = new ConcurrentHashMap<>();
this.tradeSubscriptions = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -211,6 +216,33 @@ public Observable<BinanceTicker24h> getRawTicker(Instrument instrument) {
instrument, s -> triggerObservableBody(rawTickerStream(instrument)).share());
}

public Observable<BinanceTicker24h> rollingWindow(Instrument instrument, KlineInterval windowSize) {
if (!service.isLiveSubscriptionEnabled()
&& !service.getProductSubscription().getTicker().contains(instrument)) {
throw new UpFrontSubscriptionRequiredException();
}
if(windowSize.equals(KlineInterval.h1) || windowSize.equals(KlineInterval.h4) || windowSize.equals(KlineInterval.d1)) {
return tickerWindowSubscriptions.computeIfAbsent(
instrument, s -> triggerObservableBody(rollingWindowStream(instrument, windowSize)).share());
}else {
throw new UnsupportedOperationException("RollingWindow not supported for other window size!");
}
}

public Observable<List<BinanceTicker24h>> allRollingWindow(KlineInterval windowSize) {
if (!service.isLiveSubscriptionEnabled()) {
throw new UpFrontSubscriptionRequiredException();
}
if(windowSize.equals(KlineInterval.h1) || windowSize.equals(KlineInterval.h4) || windowSize.equals(KlineInterval.d1)) {
Observable<List<BinanceTicker24h>> observable = triggerObservableBody(
allRollingWindowStream(windowSize)).share();
allRollingWindowTickerSubscriptions = observable;
return observable;
}else {
throw new UnsupportedOperationException("RollingWindow not supported for other window size!");
}
}

public Observable<BinanceBookTicker> getRawBookTicker(Instrument instrument) {
if (!service.isLiveSubscriptionEnabled()
&& !service.getProductSubscription().getTicker().contains(instrument)) {
Expand Down Expand Up @@ -396,6 +428,10 @@ private void unsubscribe(
case TICKER:
tickerSubscriptions.remove(instrument);
break;
case TICKER_WINDOW:
tickerWindowSubscriptions.remove(instrument);
allRollingWindowTickerSubscriptions = null;
break;
case BOOK_TICKER:
bookTickerSubscriptions.remove(instrument);
break;
Expand Down Expand Up @@ -455,6 +491,27 @@ private Observable<BinanceTicker24h> rawTickerStream(Instrument instrument) {
.map(transaction -> transaction.getData().getTicker());
}

private Observable<BinanceTicker24h> rollingWindowStream(Instrument instrument, KlineInterval windowSize) {
return this.service
.subscribeChannel(this.getChannelPrefix(instrument) + "@" + BinanceSubscriptionType.TICKER_WINDOW.getType() + windowSize.code(), new Object[0])
.map(
(it) -> this.<TickerBinanceWebsocketTransaction>readTransaction(it, TICKER_TYPE, "ticker"))
.filter(
transaction ->
BinanceAdapters.adaptSymbol(
transaction.getData().getSymbol(), instrument instanceof FuturesContract)
.equals(instrument))
.map(transaction -> transaction.getData().getTicker());
}

private Observable<List<BinanceTicker24h>> allRollingWindowStream(KlineInterval windowSize) {
return this.service
.subscribeChannel("!" + BinanceSubscriptionType.TICKER_WINDOW.getType() + windowSize.code() + "@arr", new Object[0])
.map(
(it) -> this.<List<TickerBinanceWebsocketTransaction>>readTransaction(it, WINDOW_TICKER_TYPE , "ticker"))
.map( transaction -> transaction.getData().stream().map(TickerBinanceWebsocketTransaction::getTicker).collect(Collectors.toList()));
}

private Observable<BinanceBookTicker> rawBookTickerStream(Instrument instrument) {
return service
.subscribeChannel(
Expand Down Expand Up @@ -694,6 +751,13 @@ private static JavaType getTickerType() {
new TypeReference<BinanceWebsocketTransaction<TickerBinanceWebsocketTransaction>>() {});
}

private static JavaType getWindowTickerType() {
return getObjectMapper()
.getTypeFactory()
.constructType(
new TypeReference<BinanceWebsocketTransaction<List<TickerBinanceWebsocketTransaction>>>() {});
}

private static JavaType getBookTickerType() {
return getObjectMapper()
.getTypeFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public enum BinanceSubscriptionType {
FUNDING_RATES("markPrice"),
TRADE("trade"),
TICKER("ticker"),
TICKER_WINDOW("ticker_"),
BOOK_TICKER("bookTicker"),
KLINE("kline");

Expand Down

0 comments on commit fe002a1

Please sign in to comment.