Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kraken: add support for OHLC Streaming #4826

Merged
merged 2 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,26 +1,37 @@
package info.bitrich.xchangestream.kraken;

import static info.bitrich.xchangestream.kraken.KrakenStreamingChecksum.*;
import static info.bitrich.xchangestream.kraken.KrakenStreamingChecksum.createCrcChecksum;

import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.node.*;
import com.google.common.collect.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import java.math.*;
import java.util.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;
import org.knowm.xchange.dto.*;
import org.knowm.xchange.dto.marketdata.*;
import org.knowm.xchange.dto.trade.*;
import org.knowm.xchange.instrument.*;
import org.knowm.xchange.kraken.*;
import org.knowm.xchange.kraken.dto.trade.*;
import org.knowm.xchange.utils.*;
import org.slf4j.*;
import info.bitrich.xchangestream.kraken.dto.KrakenStreamingOhlc;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.knowm.xchange.instrument.Instrument;
import org.knowm.xchange.kraken.KrakenAdapters;
import org.knowm.xchange.kraken.dto.trade.KrakenType;
import org.knowm.xchange.utils.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Kraken streaming adapters */
public class KrakenStreamingAdapters {

private static final Logger LOG = LoggerFactory.getLogger(KrakenStreamingAdapters.class);

static final String ASK_SNAPSHOT = "as";
Expand Down Expand Up @@ -223,6 +234,22 @@ public static Trade adaptTrade(Instrument instrument, JsonNode arrayNode) {
.build();
}

public static KrakenStreamingOhlc adaptOhlc(Instrument instrument, ArrayNode arrayNode) {
ArrayNode data = (ArrayNode) arrayNode.get(1);

return new KrakenStreamingOhlc(
(long) (Double.parseDouble(data.get(0).textValue()) * 1000), // time in millis since epoch
(long) (Double.parseDouble(data.get(1).textValue()) * 1000),
// etime time in millis since epoch
arrayNodeItemAsDecimal(data, 2), // open
arrayNodeItemAsDecimal(data, 3), // high
arrayNodeItemAsDecimal(data, 4), // low
arrayNodeItemAsDecimal(data, 5), // close
arrayNodeItemAsDecimal(data, 6), // vwap
arrayNodeItemAsDecimal(data, 7), // volume
arrayNodeItemAsDecimal(data, 8).longValue()); // count
}

/**
* Returns the element at index in arrayNode as a BigDecimal. Retuns null if the arrayNode is null
* or index does not exist.
Expand Down Expand Up @@ -270,4 +297,4 @@ private static Order.OrderType nextNodeAsOrderType(Iterator<JsonNode> iterator)
}
return KrakenAdapters.adaptOrderType(KrakenType.fromString(iterator.next().textValue()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.kraken.dto.KrakenStreamingOhlc;
import info.bitrich.xchangestream.kraken.dto.enums.KrakenSubscriptionName;
import io.reactivex.Observable;
import java.math.BigDecimal;
import java.util.List;
import java.util.TreeSet;
import org.apache.commons.beanutils.ConvertUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.knowm.xchange.kraken.dto.marketdata.KrakenOHLC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -89,6 +94,20 @@ public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
KrakenStreamingAdapters.adaptTrades(currencyPair, arrayNode)));
}


public Observable<KrakenStreamingOhlc> getOHLC(
CurrencyPair currencyPair, Integer interval) {
String channelName = getChannelName(KrakenSubscriptionName.ohlc, currencyPair);
// args[0] is reserved for an optional order boo depth, we'll use args[1] for the interval
Object[] args = new Object[2];
args[0] = null;
args[1] = interval;

return subscribe(channelName, MIN_DATA_ARRAY_SIZE, args)
.map( arrayNode -> KrakenStreamingAdapters.adaptOhlc(currencyPair, arrayNode));
}


public Observable<ArrayNode> subscribe(String channelName, int maxItems, Object... args) {
return service
.subscribeChannel(channelName, args)
Expand All @@ -112,4 +131,4 @@ public String getChannelName(KrakenSubscriptionName subscriptionName, CurrencyPa
String pair = currencyPair.base.toString() + "/" + currencyPair.counter.toString();
return subscriptionName + KRAKEN_CHANNEL_DELIMITER + pair;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.kraken.dto.*;
import info.bitrich.xchangestream.kraken.dto.KrakenEvent;
import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionConfig;
import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionMessage;
import info.bitrich.xchangestream.kraken.dto.KrakenSubscriptionStatusMessage;
import info.bitrich.xchangestream.kraken.dto.KrakenSystemStatus;
import info.bitrich.xchangestream.kraken.dto.enums.KrakenEventType;
import info.bitrich.xchangestream.kraken.dto.enums.KrakenSubscriptionName;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
Expand Down Expand Up @@ -38,6 +42,7 @@
* @author makarid, pchertalev
*/
public class KrakenStreamingService extends JsonNettyStreamingService {

private static final Logger LOG = LoggerFactory.getLogger(KrakenStreamingService.class);
private static final String EVENT = "event";
private static final String WEBSOCKET_REQUESTS_PER_SECOND =
Expand Down Expand Up @@ -95,6 +100,7 @@ private static RateLimiter initRateLimiter(KrakenStreamingExchange exchange) {
.limitForPeriod(requestsPerSecond)
.limitRefreshPeriod(Duration.ofSeconds(1))
.build());

}
return rateLimiter;
}
Expand Down Expand Up @@ -125,6 +131,7 @@ public Completable disconnect() {

@Override
protected void handleMessage(JsonNode message) {

String channelName = getChannel(message);

try {
Expand All @@ -146,38 +153,48 @@ protected void handleMessage(JsonNode message) {
LOG.info("System status: {}", systemStatus);
// send to subscribers if any
ObservableEmitter<KrakenEvent> emitter = systemChannels.get(krakenEvent.name());
if (emitter != null) emitter.onNext(systemStatus);

if (emitter != null) {
emitter.onNext(systemStatus);
}
break;
case subscriptionStatus:
LOG.debug("Received subscriptionStatus message {}", message);
KrakenSubscriptionStatusMessage statusMessage =
mapper.treeToValue(message, KrakenSubscriptionStatusMessage.class);
Integer reqid = statusMessage.getReqid();
if (!isPrivate && reqid != null) channelName = subscriptionRequestMap.remove(reqid);
if (!isPrivate && reqid != null) {
channelName = subscriptionRequestMap.remove(reqid);
}
statusMessage.setChannelName(channelName);

Integer channelId = statusMessage.getChannelID();
switch (statusMessage.getStatus()) {
case subscribed:
LOG.info("Channel {} has been subscribed", channelName);
LOG.info("Channel name={}, id={} has been subscribed", channelName, channelId);

if (statusMessage.getChannelID() != null)
channels.put(statusMessage.getChannelID(), channelName);
if (channelId != null) {
channels.put(channelId, channelName);
}

break;
case unsubscribed:
LOG.info("Channel {} has been unsubscribed", channelName);
channels.remove(statusMessage.getChannelID());
LOG.info("Channel name={}, id={} has been unsubscribed", channelName, channelId);
channels.remove(channelId);
break;
case error:
LOG.error(
"Channel {} has been failed: {}", channelName, statusMessage.getErrorMessage());
"Channel name={}, id={} has been failed: {}", channelName, channelId,
statusMessage.getErrorMessage());
if ("ESession:Invalid session".equals(statusMessage.getErrorMessage())) {
throw new ExchangeException("Issue with session validity");
}
}
// send to subscribers if any
emitter = systemChannels.get(krakenEvent.name());
if (emitter != null) emitter.onNext(statusMessage);
if (emitter != null) {
emitter.onNext(statusMessage);
}
break;
case error:
LOG.error(
Expand All @@ -196,7 +213,8 @@ protected void handleMessage(JsonNode message) {
}

if (!message.isArray() || channelName == null) {
LOG.error("Unknown message: {}", message.toString());
LOG.error("Unknown message: isArray={}, name={}, message={}", message.isArray(), channelName,
message);
return;
}

Expand All @@ -207,18 +225,23 @@ protected void handleMessage(JsonNode message) {
protected String getChannelNameFromMessage(JsonNode message) throws IOException {
String channelName = null;
if (message.has("channelID")) {
channelName = channels.get(message.get("channelID").asInt());
int channelId = message.get("channelID").asInt();
return channels.getOrDefault(channelId, String.valueOf(channelId));
}
if (message.has("channelName")) {
channelName = message.get("channelName").asText();
return channelName;
}

if (message.isArray()) {
if (message.get(0).isInt()) {
channelName = channels.get(message.get(0).asInt());
LOG.trace("Taking channelName from ID from first field INT).");
int channelId = message.get(0).asInt();
return channels.getOrDefault(channelId, String.valueOf(channelId));
}
if (message.get(1).isTextual()) {
channelName = message.get(1).asText();
return channelName;
}
}

Expand All @@ -231,6 +254,9 @@ protected String getChannelNameFromMessage(JsonNode message) throws IOException
@Override
public String getSubscribeMessage(String channelName, Object... args) throws IOException {
int reqID = Math.abs(UUID.randomUUID().hashCode());

Integer interval = args != null && args.length > 1 ? (Integer) args[1] : null;

String[] channelData =
channelName.split(KrakenStreamingMarketDataService.KRAKEN_CHANNEL_DELIMITER);
KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]);
Expand All @@ -240,27 +266,34 @@ public String getSubscribeMessage(String channelName, Object... args) throws IOE

KrakenSubscriptionMessage subscriptionMessage =
new KrakenSubscriptionMessage(
reqID, subscribe, null, new KrakenSubscriptionConfig(subscriptionName, null, token));
reqID, subscribe, null,
new KrakenSubscriptionConfig(subscriptionName, null, interval, token));

return objectMapper.writeValueAsString(subscriptionMessage);
String subscriptionMessageString = objectMapper.writeValueAsString(subscriptionMessage);
return subscriptionMessageString;
} else {
String pair = channelData[1];

subscriptionRequestMap.put(reqID, channelName);
subscriptionRequestMap.put(reqID, getSubscriptionUniqueId(channelName, args));

KrakenSubscriptionMessage subscriptionMessage =
new KrakenSubscriptionMessage(
reqID,
subscribe,
Collections.singletonList(pair),
new KrakenSubscriptionConfig(subscriptionName, parseOrderBookSize(args), null));
return objectMapper.writeValueAsString(subscriptionMessage);
new KrakenSubscriptionConfig(subscriptionName, parseOrderBookSize(args), interval,
null));
String subscriptionMessageString = objectMapper.writeValueAsString(subscriptionMessage);
return subscriptionMessageString;
}
}

@Override
public String getUnsubscribeMessage(String channelName, Object... args) throws IOException {
int reqID = Math.abs(UUID.randomUUID().hashCode());

Integer interval = args != null && args.length > 1 ? (Integer) args[1] : null;

String[] channelData =
channelName.split(KrakenStreamingMarketDataService.KRAKEN_CHANNEL_DELIMITER);
KrakenSubscriptionName subscriptionName = KrakenSubscriptionName.valueOf(channelData[0]);
Expand All @@ -271,7 +304,7 @@ public String getUnsubscribeMessage(String channelName, Object... args) throws I
reqID,
KrakenEventType.unsubscribe,
null,
new KrakenSubscriptionConfig(subscriptionName, null, null));
new KrakenSubscriptionConfig(subscriptionName));
return objectMapper.writeValueAsString(subscriptionMessage);
} else {
String pair = channelData[1];
Expand All @@ -282,14 +315,17 @@ public String getUnsubscribeMessage(String channelName, Object... args) throws I
reqID,
KrakenEventType.unsubscribe,
Collections.singletonList(pair),
new KrakenSubscriptionConfig(subscriptionName, parseOrderBookSize(args), null));
new KrakenSubscriptionConfig(subscriptionName, parseOrderBookSize(args), interval,
null));
return objectMapper.writeValueAsString(subscriptionMessage);
}
}

@Override
public void sendMessage(String message) {
if (rateLimiter != null) RateLimiter.waitForPermission(rateLimiter);
if (rateLimiter != null) {
RateLimiter.waitForPermission(rateLimiter);
}

super.sendMessage(message);
}
Expand Down Expand Up @@ -333,7 +369,7 @@ static Integer parseOrderBookSize(Object[] args) {
if (args != null && args.length > 0) {
Object obSizeParam = args[0];
LOG.debug("Specified Kraken order book size: {}", obSizeParam);
if (Number.class.isAssignableFrom(obSizeParam.getClass())) {
if (obSizeParam != null && Number.class.isAssignableFrom(obSizeParam.getClass())) {
int obSize = ((Number) obSizeParam).intValue();
if (ArrayUtils.contains(KRAKEN_VALID_ORDER_BOOK_SIZES, obSize)) {
return obSize;
Expand All @@ -348,4 +384,4 @@ static Integer parseOrderBookSize(Object[] args) {
}
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@ public class KrakenSubscriptionConfig {
private Integer interval;

public KrakenSubscriptionConfig(KrakenSubscriptionName name) {
this(name, null, null);
this(name, null, null, null);
}

@JsonCreator
public KrakenSubscriptionConfig(
@JsonProperty("name") KrakenSubscriptionName name,
@JsonProperty("depth") Integer depth,
@JsonProperty("interval") Integer interval,
@JsonProperty("token") String token) {
this.name = name;
this.depth = depth;
this.interval = interval;
this.token = token;
}

Expand Down Expand Up @@ -77,4 +79,4 @@ public Integer getInterval() {
public void setInterval(Integer interval) {
this.interval = interval;
}
}
}
Loading
Loading