Skip to content

Commit

Permalink
Kraken: add support for OHLC Streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
rvullriede committed Feb 28, 2024
1 parent 9d9b0b4 commit 8016740
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 60 deletions.
Original file line number Diff line number Diff line change
@@ -1,26 +1,37 @@
package info.bitrich.xchangestream.kraken;

import static info.bitrich.xchangestream.kraken.KrakenStreamingChecksum.*;
import static info.bitrich.xchangestream.kraken.KrakenStreamingChecksum.createCrcChecksum;

import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.node.*;
import com.google.common.collect.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import java.math.*;
import java.util.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;
import org.knowm.xchange.dto.*;
import org.knowm.xchange.dto.marketdata.*;
import org.knowm.xchange.dto.trade.*;
import org.knowm.xchange.instrument.*;
import org.knowm.xchange.kraken.*;
import org.knowm.xchange.kraken.dto.trade.*;
import org.knowm.xchange.utils.*;
import org.slf4j.*;
import info.bitrich.xchangestream.kraken.dto.KrakenStreamingOhlc;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.knowm.xchange.instrument.Instrument;
import org.knowm.xchange.kraken.KrakenAdapters;
import org.knowm.xchange.kraken.dto.trade.KrakenType;
import org.knowm.xchange.utils.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Kraken streaming adapters */
public class KrakenStreamingAdapters {

private static final Logger LOG = LoggerFactory.getLogger(KrakenStreamingAdapters.class);

static final String ASK_SNAPSHOT = "as";
Expand Down Expand Up @@ -223,6 +234,22 @@ public static Trade adaptTrade(Instrument instrument, JsonNode arrayNode) {
.build();
}

public static KrakenStreamingOhlc adaptOhlc(Instrument instrument, ArrayNode arrayNode) {
ArrayNode data = (ArrayNode) arrayNode.get(1);

return new KrakenStreamingOhlc(
(long) (Double.parseDouble(data.get(0).textValue()) * 1000), // time in millis since epoch
(long) (Double.parseDouble(data.get(1).textValue()) * 1000),
// etime time in millis since epoch
arrayNodeItemAsDecimal(data, 2), // open
arrayNodeItemAsDecimal(data, 3), // high
arrayNodeItemAsDecimal(data, 4), // low
arrayNodeItemAsDecimal(data, 5), // close
arrayNodeItemAsDecimal(data, 6), // vwap
arrayNodeItemAsDecimal(data, 7), // volume
arrayNodeItemAsDecimal(data, 8).longValue()); // count
}

/**
* Returns the element at index in arrayNode as a BigDecimal. Retuns null if the arrayNode is null
* or index does not exist.
Expand Down Expand Up @@ -270,4 +297,4 @@ private static Order.OrderType nextNodeAsOrderType(Iterator<JsonNode> iterator)
}
return KrakenAdapters.adaptOrderType(KrakenType.fromString(iterator.next().textValue()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.kraken.dto.KrakenStreamingOhlc;
import info.bitrich.xchangestream.kraken.dto.enums.KrakenSubscriptionName;
import io.reactivex.Observable;
import java.math.BigDecimal;
import java.util.List;
import java.util.TreeSet;
import org.apache.commons.beanutils.ConvertUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.knowm.xchange.kraken.dto.marketdata.KrakenOHLC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -89,6 +94,20 @@ public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
KrakenStreamingAdapters.adaptTrades(currencyPair, arrayNode)));
}


public Observable<KrakenStreamingOhlc> getOHLC(
CurrencyPair currencyPair, Integer interval) {
String channelName = getChannelName(KrakenSubscriptionName.ohlc, currencyPair);
// args[0] is reserved for an optional order boo depth, we'll use args[1] for the interval
Object[] args = new Object[2];
args[0] = null;
args[1] = interval;

return subscribe(channelName, MIN_DATA_ARRAY_SIZE, args)
.map( arrayNode -> KrakenStreamingAdapters.adaptOhlc(currencyPair, arrayNode));
}


public Observable<ArrayNode> subscribe(String channelName, int maxItems, Object... args) {
return service
.subscribeChannel(channelName, args)
Expand All @@ -112,4 +131,4 @@ public String getChannelName(KrakenSubscriptionName subscriptionName, CurrencyPa
String pair = currencyPair.base.toString() + "/" + currencyPair.counter.toString();
return subscriptionName + KRAKEN_CHANNEL_DELIMITER + pair;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.kraken.dto.*;
import info.bitrich.xchangestream.kraken.dto.KrakenEvent;
import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionConfig;
import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionMessage;
import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionStatusMessage;
import info.bitrich.xchangestream.kraken.dto.KrakenSystemStatus;
import info.bitrich.xchangestream.kraken.dto.enums.KrakenEventType;
import info.bitrich.xchangestream.kraken.dto.enums.KrakenSubscriptionName;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
Expand Down Expand Up @@ -38,6 +42,7 @@
* @author makarid, pchertalev
*/
public class KrakenStreamingService extends JsonNettyStreamingService {

private static final Logger LOG = LoggerFactory.getLogger(KrakenStreamingService.class);
private static final String EVENT = "event";
private static final String WEBSOCKET_REQUESTS_PER_SECOND =
Expand Down Expand Up @@ -95,6 +100,7 @@ private static RateLimiter initRateLimiter(KrakenStreamingExchange exchange) {
.limitForPeriod(requestsPerSecond)
.limitRefreshPeriod(Duration.ofSeconds(1))
.build());

}
return rateLimiter;
}
Expand Down Expand Up @@ -125,6 +131,7 @@ public Completable disconnect() {

@Override
protected void handleMessage(JsonNode message) {

String channelName = getChannel(message);

try {
Expand All @@ -146,38 +153,48 @@ protected void handleMessage(JsonNode message) {
LOG.info("System status: {}", systemStatus);
// send to subscribers if any
ObservableEmitter<KrakenEvent> emitter = systemChannels.get(krakenEvent.name());
if (emitter != null) emitter.onNext(systemStatus);

if (emitter != null) {
emitter.onNext(systemStatus);
}
break;
case subscriptionStatus:
LOG.debug("Received subscriptionStatus message {}", message);
KrakenSubscriptionStatusMessage statusMessage =
mapper.treeToValue(message, KrakenSubscriptionStatusMessage.class);
Integer reqid = statusMessage.getReqid();
if (!isPrivate && reqid != null) channelName = subscriptionRequestMap.remove(reqid);
if (!isPrivate && reqid != null) {
channelName = subscriptionRequestMap.remove(reqid);
}
statusMessage.setChannelName(channelName);

Integer channelId = statusMessage.getChannelID();
switch (statusMessage.getStatus()) {
case subscribed:
LOG.info("Channel {} has been subscribed", channelName);
LOG.info("Channel name={}, id={} has been subscribed", channelName, channelId);

if (statusMessage.getChannelID() != null)
channels.put(statusMessage.getChannelID(), channelName);
if (channelId != null) {
channels.put(channelId, channelName);
}

break;
case unsubscribed:
LOG.info("Channel {} has been unsubscribed", channelName);
channels.remove(statusMessage.getChannelID());
LOG.info("Channel name={}, id={} has been unsubscribed", channelName, channelId);
channels.remove(channelId);
break;
case error:
LOG.error(
"Channel {} has been failed: {}", channelName, statusMessage.getErrorMessage());
"Channel name={}, id={} has been failed: {}", channelName, channelId,
statusMessage.getErrorMessage());
if ("ESession:Invalid session".equals(statusMessage.getErrorMessage())) {
throw new ExchangeException("Issue with session validity");
}
}
// send to subscribers if any
emitter = systemChannels.get(krakenEvent.name());
if (emitter != null) emitter.onNext(statusMessage);
if (emitter != null) {
emitter.onNext(statusMessage);
}
break;
case error:
LOG.error(
Expand All @@ -196,7 +213,8 @@ protected void handleMessage(JsonNode message) {
}

if (!message.isArray() || channelName == null) {
LOG.error("Unknown message: {}", message.toString());
LOG.error("Unknown message: isArray={}, name={}, message={}", message.isArray(), channelName,
message);
return;
}

Expand All @@ -207,18 +225,23 @@ protected void handleMessage(JsonNode message) {
protected String getChannelNameFromMessage(JsonNode message) throws IOException {
String channelName = null;
if (message.has("channelID")) {
channelName = channels.get(message.get("channelID").asInt());
int channelId = message.get("channelID").asInt();
return channels.getOrDefault(channelId, String.valueOf(channelId));
}
if (message.has("channelName")) {
channelName = message.get("channelName").asText();
return channelName;
}

if (message.isArray()) {
if (message.get(0).isInt()) {
channelName = channels.get(message.get(0).asInt());
LOG.trace("Taking channelName from ID from first field INT).");
int channelId = message.get(0).asInt();
return channels.getOrDefault(channelId, String.valueOf(channelId));
}
if (message.get(1).isTextual()) {
channelName = message.get(1).asText();
return channelName;
}
}

Expand All @@ -231,6 +254,9 @@ protected String getChannelNameFromMessage(JsonNode message) throws IOException
@Override
public String getSubscribeMessage(String channelName, Object... args) throws IOException {
int reqID = Math.abs(UUID.randomUUID().hashCode());

Integer interval = args != null && args.length > 1 ? (Integer) args[1] : null;

String[] channelData =
channelName.split(KrakenStreamingMarketDataService.KRAKEN_CHANNEL_DELIMITER);
KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]);
Expand All @@ -240,27 +266,34 @@ public String getSubscribeMessage(String channelName, Object... args) throws IOE

KrakenSubscriptionMessage subscriptionMessage =
new KrakenSubscriptionMessage(
reqID, subscribe, null, new KrakenSubscriptionConfig(subscriptionName, null, token));
reqID, subscribe, null,
new KrakenSubscriptionConfig(subscriptionName, null, interval, token));

return objectMapper.writeValueAsString(subscriptionMessage);
String subscriptionMessageString = objectMapper.writeValueAsString(subscriptionMessage);
return subscriptionMessageString;
} else {
String pair = channelData[1];

subscriptionRequestMap.put(reqID, channelName);
subscriptionRequestMap.put(reqID, getSubscriptionUniqueId(channelName, args));

KrakenSubscriptionMessage subscriptionMessage =
new KrakenSubscriptionMessage(
reqID,
subscribe,
Collections.singletonList(pair),
new KrakenSubscriptionConfig(subscriptionName, parseOrderBookSize(args), null));
return objectMapper.writeValueAsString(subscriptionMessage);
new KrakenSubscriptionConfig(subscriptionName, parseOrderBookSize(args), interval,
null));
String subscriptionMessageString = objectMapper.writeValueAsString(subscriptionMessage);
return subscriptionMessageString;
}
}

@Override
public String getUnsubscribeMessage(String channelName, Object... args) throws IOException {
int reqID = Math.abs(UUID.randomUUID().hashCode());

Integer interval = args != null && args.length > 1 ? (Integer) args[1] : null;

String[] channelData =
channelName.split(KrakenStreamingMarketDataService.KRAKEN_CHANNEL_DELIMITER);
KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]);
Expand All @@ -271,7 +304,7 @@ public String getUnsubscribeMessage(String channelName, Object... args) throws I
reqID,
KrakenEventType.unsubscribe,
null,
new KrakenSubscriptionConfig(subscriptionName, null, null));
new KrakenSubscriptionConfig(subscriptionName));
return objectMapper.writeValueAsString(subscriptionMessage);
} else {
String pair = channelData[1];
Expand All @@ -282,14 +315,17 @@ public String getUnsubscribeMessage(String channelName, Object... args) throws I
reqID,
KrakenEventType.unsubscribe,
Collections.singletonList(pair),
new KrakenSubscriptionConfig(subscriptionName, parseOrderBookSize(args), null));
new KrakenSubscriptionConfig(subscriptionName, parseOrderBookSize(args), interval,
null));
return objectMapper.writeValueAsString(subscriptionMessage);
}
}

@Override
public void sendMessage(String message) {
if (rateLimiter != null) RateLimiter.waitForPermission(rateLimiter);
if (rateLimiter != null) {
RateLimiter.waitForPermission(rateLimiter);
}

super.sendMessage(message);
}
Expand Down Expand Up @@ -333,7 +369,7 @@ static Integer parseOrderBookSize(Object[] args) {
if (args != null && args.length > 0) {
Object obSizeParam = args[0];
LOG.debug("Specified Kraken order book size: {}", obSizeParam);
if (Number.class.isAssignableFrom(obSizeParam.getClass())) {
if (obSizeParam != null && Number.class.isAssignableFrom(obSizeParam.getClass())) {
int obSize = ((Number) obSizeParam).intValue();
if (ArrayUtils.contains(KRAKEN_VALID_ORDER_BOOK_SIZES, obSize)) {
return obSize;
Expand All @@ -348,4 +384,4 @@ static Integer parseOrderBookSize(Object[] args) {
}
return null;
}
}
}
Loading

0 comments on commit 8016740

Please sign in to comment.