From 1b3ec2a82bdea803b4a8c8d772c03161d8246113 Mon Sep 17 00:00:00 2001 From: Raphael Vullriede Date: Mon, 29 Apr 2024 09:04:04 +0200 Subject: [PATCH] feat: add support for OHLC streaming on kraken.com --- .../kraken/KrakenStreamingAdapters.java | 61 ++++++++++---- .../KrakenStreamingMarketDataService.java | 21 ++++- .../kraken/KrakenStreamingService.java | 82 +++++++++++++------ .../kraken/dto/KrakenSubscriptionConfig.java | 6 +- .../service/netty/NettyStreamingService.java | 48 +++++++---- 5 files changed, 161 insertions(+), 57 deletions(-) diff --git a/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingAdapters.java b/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingAdapters.java index 83903c7bcb7..81ae27fb369 100644 --- a/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingAdapters.java +++ b/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingAdapters.java @@ -1,26 +1,37 @@ package info.bitrich.xchangestream.kraken; -import static info.bitrich.xchangestream.kraken.KrakenStreamingChecksum.*; +import static info.bitrich.xchangestream.kraken.KrakenStreamingChecksum.createCrcChecksum; -import com.fasterxml.jackson.databind.*; -import com.fasterxml.jackson.databind.node.*; -import com.google.common.collect.*; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import com.google.common.collect.Streams; -import java.math.*; -import java.util.*; -import java.util.concurrent.atomic.*; -import java.util.stream.*; -import org.knowm.xchange.dto.*; -import org.knowm.xchange.dto.marketdata.*; -import org.knowm.xchange.dto.trade.*; -import org.knowm.xchange.instrument.*; -import org.knowm.xchange.kraken.*; -import org.knowm.xchange.kraken.dto.trade.*; -import org.knowm.xchange.utils.*; -import org.slf4j.*; +import info.bitrich.xchangestream.kraken.dto.KrakenStreamingOhlc; +import java.math.BigDecimal; +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.knowm.xchange.dto.Order; +import org.knowm.xchange.dto.marketdata.OrderBook; +import org.knowm.xchange.dto.marketdata.Ticker; +import org.knowm.xchange.dto.marketdata.Trade; +import org.knowm.xchange.dto.trade.LimitOrder; +import org.knowm.xchange.instrument.Instrument; +import org.knowm.xchange.kraken.KrakenAdapters; +import org.knowm.xchange.kraken.dto.trade.KrakenType; +import org.knowm.xchange.utils.DateUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Kraken streaming adapters */ public class KrakenStreamingAdapters { + private static final Logger LOG = LoggerFactory.getLogger(KrakenStreamingAdapters.class); static final String ASK_SNAPSHOT = "as"; @@ -223,6 +234,22 @@ public static Trade adaptTrade(Instrument instrument, JsonNode arrayNode) { .build(); } + public static KrakenStreamingOhlc adaptOhlc(Instrument instrument, ArrayNode arrayNode) { + ArrayNode data = (ArrayNode) arrayNode.get(1); + + return new KrakenStreamingOhlc( + (long) (Double.parseDouble(data.get(0).textValue()) * 1000), // time in millis since epoch + (long) (Double.parseDouble(data.get(1).textValue()) * 1000), + // etime time in millis since epoch + arrayNodeItemAsDecimal(data, 2), // open + arrayNodeItemAsDecimal(data, 3), // high + arrayNodeItemAsDecimal(data, 4), // low + arrayNodeItemAsDecimal(data, 5), // close + arrayNodeItemAsDecimal(data, 6), // vwap + arrayNodeItemAsDecimal(data, 7), // volume + arrayNodeItemAsDecimal(data, 8).longValue()); // count + } + /** * Returns the element at index in arrayNode as a BigDecimal. Retuns null if the arrayNode is null * or index does not exist. @@ -270,4 +297,4 @@ private static Order.OrderType nextNodeAsOrderType(Iterator iterator) } return KrakenAdapters.adaptOrderType(KrakenType.fromString(iterator.next().textValue())); } -} +} \ No newline at end of file diff --git a/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingMarketDataService.java b/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingMarketDataService.java index 612f8869843..8975f2addb9 100644 --- a/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingMarketDataService.java +++ b/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingMarketDataService.java @@ -4,15 +4,20 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import info.bitrich.xchangestream.core.StreamingMarketDataService; +import info.bitrich.xchangestream.kraken.dto.KrakenStreamingOhlc; import info.bitrich.xchangestream.kraken.dto.enums.KrakenSubscriptionName; import io.reactivex.Observable; +import java.math.BigDecimal; +import java.util.List; import java.util.TreeSet; +import org.apache.commons.beanutils.ConvertUtils; import org.apache.commons.lang3.ObjectUtils; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.marketdata.OrderBook; import org.knowm.xchange.dto.marketdata.Ticker; import org.knowm.xchange.dto.marketdata.Trade; import org.knowm.xchange.dto.trade.LimitOrder; +import org.knowm.xchange.kraken.dto.marketdata.KrakenOHLC; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,6 +94,20 @@ public Observable getTrades(CurrencyPair currencyPair, Object... args) { KrakenStreamingAdapters.adaptTrades(currencyPair, arrayNode))); } + + public Observable getOHLC( + CurrencyPair currencyPair, Integer interval) { + String channelName = getChannelName(KrakenSubscriptionName.ohlc, currencyPair); + // args[0] is reserved for an optional order boo depth, we'll use args[1] for the interval + Object[] args = new Object[2]; + args[0] = null; + args[1] = interval; + + return subscribe(channelName, MIN_DATA_ARRAY_SIZE, args) + .map( arrayNode -> KrakenStreamingAdapters.adaptOhlc(currencyPair, arrayNode)); + } + + public Observable subscribe(String channelName, int maxItems, Object... args) { return service .subscribeChannel(channelName, args) @@ -112,4 +131,4 @@ public String getChannelName(KrakenSubscriptionName subscriptionName, CurrencyPa String pair = currencyPair.base.toString() + "/" + currencyPair.counter.toString(); return subscriptionName + KRAKEN_CHANNEL_DELIMITER + pair; } -} +} \ No newline at end of file diff --git a/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingService.java b/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingService.java index 01758e6893b..741dcb8a206 100644 --- a/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingService.java +++ b/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/KrakenStreamingService.java @@ -5,7 +5,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import info.bitrich.xchangestream.kraken.dto.*; +import info.bitrich.xchangestream.kraken.dto.KrakenEvent; +import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionConfig; +import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionMessage; +import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionStatusMessage; +import info.bitrich.xchangestream.kraken.dto.KrakenSystemStatus; import info.bitrich.xchangestream.kraken.dto.enums.KrakenEventType; import info.bitrich.xchangestream.kraken.dto.enums.KrakenSubscriptionName; import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; @@ -38,6 +42,7 @@ * @author makarid, pchertalev */ public class KrakenStreamingService extends JsonNettyStreamingService { + private static final Logger LOG = LoggerFactory.getLogger(KrakenStreamingService.class); private static final String EVENT = "event"; private static final String WEBSOCKET_REQUESTS_PER_SECOND = @@ -95,6 +100,7 @@ private static RateLimiter initRateLimiter(KrakenStreamingExchange exchange) { .limitForPeriod(requestsPerSecond) .limitRefreshPeriod(Duration.ofSeconds(1)) .build()); + } return rateLimiter; } @@ -125,6 +131,7 @@ public Completable disconnect() { @Override protected void handleMessage(JsonNode message) { + String channelName = getChannel(message); try { @@ -146,38 +153,48 @@ protected void handleMessage(JsonNode message) { LOG.info("System status: {}", systemStatus); // send to subscribers if any ObservableEmitter emitter = systemChannels.get(krakenEvent.name()); - if (emitter != null) emitter.onNext(systemStatus); + + if (emitter != null) { + emitter.onNext(systemStatus); + } break; case subscriptionStatus: LOG.debug("Received subscriptionStatus message {}", message); KrakenSubscriptionStatusMessage statusMessage = mapper.treeToValue(message, KrakenSubscriptionStatusMessage.class); Integer reqid = statusMessage.getReqid(); - if (!isPrivate && reqid != null) channelName = subscriptionRequestMap.remove(reqid); + if (!isPrivate && reqid != null) { + channelName = subscriptionRequestMap.remove(reqid); + } statusMessage.setChannelName(channelName); + Integer channelId = statusMessage.getChannelID(); switch (statusMessage.getStatus()) { case subscribed: - LOG.info("Channel {} has been subscribed", channelName); + LOG.info("Channel name={}, id={} has been subscribed", channelName, channelId); - if (statusMessage.getChannelID() != null) - channels.put(statusMessage.getChannelID(), channelName); + if (channelId != null) { + channels.put(channelId, channelName); + } break; case unsubscribed: - LOG.info("Channel {} has been unsubscribed", channelName); - channels.remove(statusMessage.getChannelID()); + LOG.info("Channel name={}, id={} has been unsubscribed", channelName, channelId); + channels.remove(channelId); break; case error: LOG.error( - "Channel {} has been failed: {}", channelName, statusMessage.getErrorMessage()); + "Channel name={}, id={} has been failed: {}", channelName, channelId, + statusMessage.getErrorMessage()); if ("ESession:Invalid session".equals(statusMessage.getErrorMessage())) { throw new ExchangeException("Issue with session validity"); } } // send to subscribers if any emitter = systemChannels.get(krakenEvent.name()); - if (emitter != null) emitter.onNext(statusMessage); + if (emitter != null) { + emitter.onNext(statusMessage); + } break; case error: LOG.error( @@ -196,7 +213,8 @@ protected void handleMessage(JsonNode message) { } if (!message.isArray() || channelName == null) { - LOG.error("Unknown message: {}", message.toString()); + LOG.error("Unknown message: isArray={}, name={}, message={}", message.isArray(), channelName, + message); return; } @@ -207,18 +225,23 @@ protected void handleMessage(JsonNode message) { protected String getChannelNameFromMessage(JsonNode message) throws IOException { String channelName = null; if (message.has("channelID")) { - channelName = channels.get(message.get("channelID").asInt()); + int channelId = message.get("channelID").asInt(); + return channels.getOrDefault(channelId, String.valueOf(channelId)); } if (message.has("channelName")) { channelName = message.get("channelName").asText(); + return channelName; } if (message.isArray()) { if (message.get(0).isInt()) { - channelName = channels.get(message.get(0).asInt()); + LOG.trace("Taking channelName from ID from first field INT)."); + int channelId = message.get(0).asInt(); + return channels.getOrDefault(channelId, String.valueOf(channelId)); } if (message.get(1).isTextual()) { channelName = message.get(1).asText(); + return channelName; } } @@ -231,6 +254,9 @@ protected String getChannelNameFromMessage(JsonNode message) throws IOException @Override public String getSubscribeMessage(String channelName, Object... args) throws IOException { int reqID = Math.abs(UUID.randomUUID().hashCode()); + + Integer interval = args != null && args.length > 1 ? (Integer) args[1] : null; + String[] channelData = channelName.split(KrakenStreamingMarketDataService.KRAKEN_CHANNEL_DELIMITER); KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]); @@ -240,27 +266,34 @@ public String getSubscribeMessage(String channelName, Object... args) throws IOE KrakenSubscriptionMessage subscriptionMessage = new KrakenSubscriptionMessage( - reqID, subscribe, null, new KrakenSubscriptionConfig(subscriptionName, null, token)); + reqID, subscribe, null, + new KrakenSubscriptionConfig(subscriptionName, null, interval, token)); - return objectMapper.writeValueAsString(subscriptionMessage); + String subscriptionMessageString = objectMapper.writeValueAsString(subscriptionMessage); + return subscriptionMessageString; } else { String pair = channelData[1]; - subscriptionRequestMap.put(reqID, channelName); + subscriptionRequestMap.put(reqID, getSubscriptionUniqueId(channelName, args)); KrakenSubscriptionMessage subscriptionMessage = new KrakenSubscriptionMessage( reqID, subscribe, Collections.singletonList(pair), - new KrakenSubscriptionConfig(subscriptionName, parseOrderBookSize(args), null)); - return objectMapper.writeValueAsString(subscriptionMessage); + new KrakenSubscriptionConfig(subscriptionName, parseOrderBookSize(args), interval, + null)); + String subscriptionMessageString = objectMapper.writeValueAsString(subscriptionMessage); + return subscriptionMessageString; } } @Override public String getUnsubscribeMessage(String channelName, Object... args) throws IOException { int reqID = Math.abs(UUID.randomUUID().hashCode()); + + Integer interval = args != null && args.length > 1 ? (Integer) args[1] : null; + String[] channelData = channelName.split(KrakenStreamingMarketDataService.KRAKEN_CHANNEL_DELIMITER); KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]); @@ -271,7 +304,7 @@ public String getUnsubscribeMessage(String channelName, Object... args) throws I reqID, KrakenEventType.unsubscribe, null, - new KrakenSubscriptionConfig(subscriptionName, null, null)); + new KrakenSubscriptionConfig(subscriptionName)); return objectMapper.writeValueAsString(subscriptionMessage); } else { String pair = channelData[1]; @@ -282,14 +315,17 @@ public String getUnsubscribeMessage(String channelName, Object... args) throws I reqID, KrakenEventType.unsubscribe, Collections.singletonList(pair), - new KrakenSubscriptionConfig(subscriptionName, parseOrderBookSize(args), null)); + new KrakenSubscriptionConfig(subscriptionName, parseOrderBookSize(args), interval, + null)); return objectMapper.writeValueAsString(subscriptionMessage); } } @Override public void sendMessage(String message) { - if (rateLimiter != null) RateLimiter.waitForPermission(rateLimiter); + if (rateLimiter != null) { + RateLimiter.waitForPermission(rateLimiter); + } super.sendMessage(message); } @@ -333,7 +369,7 @@ static Integer parseOrderBookSize(Object[] args) { if (args != null && args.length > 0) { Object obSizeParam = args[0]; LOG.debug("Specified Kraken order book size: {}", obSizeParam); - if (Number.class.isAssignableFrom(obSizeParam.getClass())) { + if (obSizeParam != null && Number.class.isAssignableFrom(obSizeParam.getClass())) { int obSize = ((Number) obSizeParam).intValue(); if (ArrayUtils.contains(KRAKEN_VALID_ORDER_BOOK_SIZES, obSize)) { return obSize; @@ -348,4 +384,4 @@ static Integer parseOrderBookSize(Object[] args) { } return null; } -} +} \ No newline at end of file diff --git a/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/dto/KrakenSubscriptionConfig.java b/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/dto/KrakenSubscriptionConfig.java index 226b60def64..013e0301388 100644 --- a/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/dto/KrakenSubscriptionConfig.java +++ b/xchange-stream-kraken/src/main/java/info/bitrich/xchangestream/kraken/dto/KrakenSubscriptionConfig.java @@ -33,16 +33,18 @@ public class KrakenSubscriptionConfig { private Integer interval; public KrakenSubscriptionConfig(KrakenSubscriptionName name) { - this(name, null, null); + this(name, null, null, null); } @JsonCreator public KrakenSubscriptionConfig( @JsonProperty("name") KrakenSubscriptionName name, @JsonProperty("depth") Integer depth, + @JsonProperty("interval") Integer interval, @JsonProperty("token") String token) { this.name = name; this.depth = depth; + this.interval = interval; this.token = token; } @@ -77,4 +79,4 @@ public Integer getInterval() { public void setInterval(Integer interval) { this.interval = interval; } -} +} \ No newline at end of file diff --git a/xchange-stream-service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java b/xchange-stream-service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java index 8170ab56b92..e9a08ff738a 100644 --- a/xchange-stream-service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java +++ b/xchange-stream-service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java @@ -45,15 +45,19 @@ import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class NettyStreamingService extends ConnectableService { + private final Logger LOG = LoggerFactory.getLogger(this.getClass()); protected static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(10); @@ -215,13 +219,16 @@ protected void initChannel(SocketChannel ch) { p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); } p.addLast(new HttpClientCodec()); - if (enableLoggingHandler) + if (enableLoggingHandler) { p.addLast(new LoggingHandler(loggingHandlerLevel)); - if (compressedMessages) + } + if (compressedMessages) { p.addLast(WebSocketClientCompressionHandler.INSTANCE); + } p.addLast(new HttpObjectAggregator(8192)); - if (idleTimeoutSeconds > 0) + if (idleTimeoutSeconds > 0) { p.addLast(new IdleStateHandler(idleTimeoutSeconds, 0, 0)); + } WebSocketClientExtensionHandler clientExtensionHandler = getWebSocketClientExtensionHandler(); if (clientExtensionHandler != null) { @@ -348,7 +355,15 @@ public abstract String getUnsubscribeMessage(String channelName, Object... args) throws IOException; public String getSubscriptionUniqueId(String channelName, Object... args) { - return channelName; + + if (args == null || args.length == 0) { + return channelName; + } + + List collect = Arrays.stream(args).map(String::valueOf).collect(Collectors.toList()); + String argsString = String.join("-", collect); + + return channelName + "_" + argsString; } /** @@ -392,8 +407,8 @@ public Observable subscribeConnectionState() { } public Observable subscribeChannel(String channelName, Object... args) { - final String channelId = getSubscriptionUniqueId(channelName, args); - LOG.info("Subscribing to channel {}", channelId); + final String subscriptionUniqueId = getSubscriptionUniqueId(channelName, args); + LOG.info("Subscribing to subscriptionUniqueId={}, args={}", subscriptionUniqueId, args); return Observable.create( e -> { @@ -401,7 +416,7 @@ public Observable subscribeChannel(String channelName, Object... args) { e.onError(new NotConnectedException()); } channels.computeIfAbsent( - channelId, + subscriptionUniqueId, cid -> { Subscription newSubscription = new Subscription(e, channelName, args); try { @@ -417,13 +432,13 @@ public Observable subscribeChannel(String channelName, Object... args) { }) .doOnDispose( () -> { - if (channels.remove(channelId) != null) { + if (channels.remove(subscriptionUniqueId) != null) { try { - sendMessage(getUnsubscribeMessage(channelId, args)); + sendMessage(getUnsubscribeMessage(subscriptionUniqueId, args)); } catch (IOException e) { - LOG.debug("Failed to unsubscribe channel: {} {}", channelId, e.toString()); + LOG.debug("Failed to unsubscribe channel: {} {}", subscriptionUniqueId, e.toString()); } catch (Exception e) { - LOG.warn("Failed to unsubscribe channel: {}", channelId, e); + LOG.warn("Failed to unsubscribe channel: {}", subscriptionUniqueId, e); } } }) @@ -454,13 +469,18 @@ protected String getChannel(T message) { protected void handleMessage(T message) { String channel = getChannel(message); - if (!StringUtil.isNullOrEmpty(channel)) handleChannelMessage(channel, message); + if (!StringUtil.isNullOrEmpty(channel)) { + handleChannelMessage(channel, message); + } } protected void handleError(T message, Throwable t) { String channel = getChannel(message); - if (!StringUtil.isNullOrEmpty(channel)) handleChannelError(channel, t); - else LOG.error("handleError cannot parse channel from message: {}", message); + if (!StringUtil.isNullOrEmpty(channel)) { + handleChannelError(channel, t); + } else { + LOG.error("handleError cannot parse channel from message: {}", message); + } } protected void handleIdle(ChannelHandlerContext ctx) {