Skip to content

Commit

Permalink
Update OkexStreamingService.java
Browse files Browse the repository at this point in the history
  • Loading branch information
lvxiao1 authored Nov 22, 2024
1 parent b22c017 commit d8d5db8
Showing 1 changed file with 25 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,18 @@ public class OkexStreamingService extends JsonNettyStreamingService {

private final ExchangeSpecification xSpec;

private volatile boolean loginDone = false;

private volatile boolean needToResubscribeChannels = false;

public OkexStreamingService(String apiUrl, ExchangeSpecification exchangeSpecification) {
super(apiUrl);
this.xSpec = exchangeSpecification;
}

@Override
public Completable connect() {
loginDone = xSpec.getApiKey() == null;
Completable conn = super.connect();
return conn.andThen(
(CompletableSource)
Expand All @@ -82,6 +87,14 @@ public Completable connect() {
});
}

@Override
public void resubscribeChannels() {
needToResubscribeChannels = true;
if (loginDone) {
super.resubscribeChannels();
}
}

public void login() throws JsonProcessingException {
Mac mac;
try {
Expand Down Expand Up @@ -123,6 +136,18 @@ public void messageHandler(String message) {
LOG.error("Error parsing incoming message to JSON: {}", message);
return;
}
// Retry after a successful login
if (jsonNode.has("event")) {
String event = jsonNode.get("event").asText();
if ("login".equals(event)) {
loginDone = true;
if (needToResubscribeChannels) {
this.resubscribeChannels();
needToResubscribeChannels = false;
}
return;
}
}

if (processArrayMessageSeparately() && jsonNode.isArray()) {
// In case of array - handle every message separately.
Expand Down

0 comments on commit d8d5db8

Please sign in to comment.