From b8eadce0e02bf27d1d1bb8922ae947c2106c931d Mon Sep 17 00:00:00 2001 From: pipixi Date: Wed, 27 Nov 2024 22:22:31 +0800 Subject: [PATCH] [Binance] Support rolling window --- .../BinanceStreamingMarketDataService.java | 67 +++++++++++++++++++ .../binance/BinanceSubscriptionType.java | 1 + 2 files changed, 68 insertions(+) diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java index 61cf2b7f3e..f7d74623bc 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java @@ -71,6 +71,7 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer LoggerFactory.getLogger(BinanceStreamingMarketDataService.class); private static final JavaType TICKER_TYPE = getTickerType(); + private static final JavaType WINDOW_TICKER_TYPE = getWindowTickerType(); private static final JavaType BOOK_TICKER_TYPE = getBookTickerType(); private static final JavaType TRADE_TYPE = getTradeType(); private static final JavaType DEPTH_TYPE = getDepthType(); @@ -84,6 +85,7 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer private final int oderBookFetchLimitParameter; private final Map> tickerSubscriptions; + private final Map> rollingWindowTickerSubscriptions; private final Map> bookTickerSubscriptions; private final Map> orderbookSubscriptions; private final Map> tradeSubscriptions; @@ -91,6 +93,8 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer private final Map>> klineSubscriptions; private final Map> orderBookRawUpdatesSubscriptions; + private Observable> allRollingWindowTickerSubscriptions; + /** * A scheduler for initialisation of binance order book snapshots, which is delegated to a @@ -125,6 +129,7 @@ public BinanceStreamingMarketDataService( this.marketDataService = marketDataService; this.onApiCall = onApiCall; this.tickerSubscriptions = new ConcurrentHashMap<>(); + this.rollingWindowTickerSubscriptions = new ConcurrentHashMap<>(); this.bookTickerSubscriptions = new ConcurrentHashMap<>(); this.orderbookSubscriptions = new ConcurrentHashMap<>(); this.tradeSubscriptions = new ConcurrentHashMap<>(); @@ -211,6 +216,33 @@ public Observable getRawTicker(Instrument instrument) { instrument, s -> triggerObservableBody(rawTickerStream(instrument)).share()); } + public Observable rollingWindow(Instrument instrument, KlineInterval windowSize) { + if (!service.isLiveSubscriptionEnabled() + && !service.getProductSubscription().getTicker().contains(instrument)) { + throw new UpFrontSubscriptionRequiredException(); + } + if(windowSize.equals(KlineInterval.h1) || windowSize.equals(KlineInterval.h4) || windowSize.equals(KlineInterval.d1)) { + return rollingWindowTickerSubscriptions.computeIfAbsent( + instrument, s -> triggerObservableBody(rollingWindowStream(instrument, windowSize)).share()); + }else { + throw new UnsupportedOperationException("RollingWindow not supported for other window size!"); + } + } + + public Observable> allRollingWindow(KlineInterval windowSize) { + if (!service.isLiveSubscriptionEnabled()) { + throw new UpFrontSubscriptionRequiredException(); + } + if(windowSize.equals(KlineInterval.h1) || windowSize.equals(KlineInterval.h4) || windowSize.equals(KlineInterval.d1)) { + Observable> observable = triggerObservableBody( + allRollingWindowStream(windowSize)).share(); + allRollingWindowTickerSubscriptions = observable; + return observable; + }else { + throw new UnsupportedOperationException("RollingWindow not supported for other window size!"); + } + } + public Observable getRawBookTicker(Instrument instrument) { if (!service.isLiveSubscriptionEnabled() && !service.getProductSubscription().getTicker().contains(instrument)) { @@ -396,6 +428,13 @@ private void unsubscribe( case TICKER: tickerSubscriptions.remove(instrument); break; + case TICKER_WINDOW: + if(null == instrument){ + allRollingWindowTickerSubscriptions = null; + }else { + rollingWindowTickerSubscriptions.remove(instrument); + } + break; case BOOK_TICKER: bookTickerSubscriptions.remove(instrument); break; @@ -455,6 +494,27 @@ private Observable rawTickerStream(Instrument instrument) { .map(transaction -> transaction.getData().getTicker()); } + private Observable rollingWindowStream(Instrument instrument, KlineInterval windowSize) { + return this.service + .subscribeChannel(this.getChannelPrefix(instrument) + "@" + BinanceSubscriptionType.TICKER_WINDOW.getType() + windowSize.code(), new Object[0]) + .map( + (it) -> this.readTransaction(it, TICKER_TYPE, "ticker")) + .filter( + transaction -> + BinanceAdapters.adaptSymbol( + transaction.getData().getSymbol(), instrument instanceof FuturesContract) + .equals(instrument)) + .map(transaction -> transaction.getData().getTicker()); + } + + private Observable> allRollingWindowStream(KlineInterval windowSize) { + return this.service + .subscribeChannel("!" + BinanceSubscriptionType.TICKER_WINDOW.getType() + windowSize.code() + "@arr", new Object[0]) + .map( + (it) -> this.>readTransaction(it, WINDOW_TICKER_TYPE , "ticker")) + .map( transaction -> transaction.getData().stream().map(TickerBinanceWebsocketTransaction::getTicker).collect(Collectors.toList())); + } + private Observable rawBookTickerStream(Instrument instrument) { return service .subscribeChannel( @@ -694,6 +754,13 @@ private static JavaType getTickerType() { new TypeReference>() {}); } + private static JavaType getWindowTickerType() { + return getObjectMapper() + .getTypeFactory() + .constructType( + new TypeReference>>() {}); + } + private static JavaType getBookTickerType() { return getObjectMapper() .getTypeFactory() diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceSubscriptionType.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceSubscriptionType.java index 5a4b91d9b8..2b09105d33 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceSubscriptionType.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceSubscriptionType.java @@ -6,6 +6,7 @@ public enum BinanceSubscriptionType { FUNDING_RATES("markPrice"), TRADE("trade"), TICKER("ticker"), + TICKER_WINDOW("ticker_"), BOOK_TICKER("bookTicker"), KLINE("kline");