[question] close real-time stream intentionally #45

Open
thangng48 opened this issue Jul 13, 2022 · 7 comments

Comments

@thangng48

I've checked #30, but it doesn't seem to work properly.

Consider the following code snippet. It re-subscribes to some symbols on the Deribit exchange every minute. At the end of each one-minute interval it breaks out of the combined stream, which I expect to terminate the underlying WebSockets to Deribit, so that spinning up a new process does not flood the Deribit server.
However, on the next iterations Deribit returns a 429 error code, which indicates a rate limit error.

@thaaddeus is there any way to shut down the stream, or at least terminate the running WebSockets?

const {
  combine,
  normalizeDerivativeTickers,
  normalizeLiquidations,
  normalizeOptionsSummary,
  normalizeTrades,
  streamNormalized
} = require("tardis-dev");


async function* breakTick(end) {
  for (;!end.__break__;) {
    await new Promise((resolve) => setTimeout(resolve, 60000));
    yield {__break__: true}
  }
}

async function deribitStream() {
  const symbols = [
    "BTC-29JUL22-35000-C",
    "BTC-29JUL22-70000-C",
    "BTC-29JUL22-34000-P",
    "BTC-29JUL22-30000-C"
  ]

  const channels = ["trade", "derivative_ticker", "liquidation", "option_summary"]
  const end = {__break__: false}
  const streams = []

  for (const symbol of symbols){
    for (const channel of channels){
      const options = {
        exchange: "deribit",
        symbols: [symbol],
        withDisconnectMessages: true,
        onError: (err) => {
          console.log("Error subscribing to channel", {
            channel,
            exchange: "deribit",
            error: err,
          })
        }
      }
      let stream;
      switch (channel) {
        case "trade": {
          stream = streamNormalized(options, normalizeTrades)
          break
        }

        case "derivative_ticker": {
          stream = streamNormalized(options, normalizeDerivativeTickers)
          break
        }

        case "liquidation": {
          // Liquidations are infrequent, so disable the feed timeout
          options.timeoutIntervalMS = 0

          stream = streamNormalized(options, normalizeLiquidations)
          break
        }

        case "option_summary": {
          stream = streamNormalized(options, normalizeOptionsSummary)
          break
        }
      }
      streams.push(stream)
    }
  }
  streams.push(breakTick(end))
  const stream = combine(...streams)

  for await (const message of stream){
    if (message.__break__){
      end.__break__ = true
      console.log("end stream")
      break
    }
    console.log(message)
  }
}

(async () => {
  for (;;){
    await deribitStream();
  }
})()
@thaaddeus
Member

Best to add:

process.env.DEBUG = 'tardis-dev*'
at the beginning of the script, so you'll be able to see whether connections get closed or not.

@thangng48
Author

@thaaddeus

Only some of the connections were closed. Here are the detailed logs:

  • established connection logs:
    image

  • after break:
    image

So a lot of connections are kept open, which leads to the rate limit error.

@thaaddeus
Member

I can see the issue, but unfortunately I don't see an easy way to fix it. The code assumes that the individual streams push messages frequently, and it requires the next message to be pushed on a given connection before that connection can be closed.
This is unfortunate, but with the current design based on async generators it's unsolvable right now, see tc39/proposal-async-iteration#126
In practice, what I'd suggest is to combine those low-frequency streams into one stream/WS connection.
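
To make the limitation concrete, here is a minimal, dependency-free sketch of the language behavior described above. It is not tardis-dev code: `makeFeed`, `demo`, and `state.closed` are hypothetical names, and the `finally` block only stands in for closing a WebSocket. A `return()` call on an async generator that is suspended on an `await` is queued behind the pending `next()`, so the cleanup does not run until a message arrives.

```javascript
// Simulates a low-frequency feed: the generator stays suspended on `await`
// until someone calls push(). All names here are hypothetical.
function makeFeed() {
  let deliver;
  const state = { closed: false };

  async function* feed() {
    try {
      for (;;) {
        const message = await new Promise((resolve) => {
          deliver = resolve;
        });
        yield message;
      }
    } finally {
      // Stands in for closing the underlying WebSocket.
      state.closed = true;
    }
  }

  return { iterator: feed(), state, push: (message) => deliver(message) };
}

async function demo() {
  const { iterator, state, push } = makeFeed();

  const pending = iterator.next(); // consumer waits for the next message
  const returned = iterator.return(); // close request, queued behind next()

  // Give the event loop time; no message has arrived yet.
  await new Promise((resolve) => setTimeout(resolve, 50));
  const closedBeforeMessage = state.closed;

  push({ type: "trade" }); // a message finally arrives
  await pending;
  await returned;
  const closedAfterMessage = state.closed;

  return { closedBeforeMessage, closedAfterMessage };
}

demo().then((result) => console.log(result));
// prints { closedBeforeMessage: false, closedAfterMessage: true }
```

On a quiet channel such as liquidations, that next message may take a long time to arrive, which would be consistent with the connections staying open after the `break` in the snippet from the original post.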

@thangng48
Author

@thaaddeus Can you add methods that allow disconnecting the underlying WebSocket directly?

@kovetskiy

@thaaddeus any updates on methods for intentional disconnects? Because tardis-node has no disconnect methods, an application that uses it can't shut down gracefully when it receives a shutdown signal, since it can't be sure that the underlying ws connection is properly disconnected.

@thaaddeus
Member

I don't have any updates yet, pull requests welcome.

@lostless13

Agreed with @kovetskiy. The ability to end streams gracefully would be useful. Looking through the code, wrapping the _stream generator methods in a class is probably the correct way to expose close commands for the underlying WebSockets on the RealTimeStream classes. However, that would cause a fair number of breaking changes to the library interface.

A hack might be to keep a list of the open sockets in an array or object that is exposed to the library client, allowing the client to kill a socket. That situation would still need to be handled by the generators, though.
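
As a rough illustration of that hack, the sketch below keeps open sockets in a registry that a client could use to force them closed. Everything in it is an assumption for illustration: `SocketRegistry`, `register`, `unregister`, and `closeAll` are hypothetical names that do not exist in tardis-dev, and a "socket" is any object with a `close()` method.

```javascript
// Hypothetical helper, not part of tardis-dev: tracks open sockets so that
// a client can close them all on demand.
class SocketRegistry {
  constructor() {
    this.sockets = new Set();
  }

  // Called when a connection is established. Returns the socket for chaining.
  register(socket) {
    this.sockets.add(socket);
    return socket;
  }

  // Called when a connection closes on its own.
  unregister(socket) {
    this.sockets.delete(socket);
  }

  // Closes every tracked socket and returns how many close() calls succeeded.
  closeAll() {
    let closed = 0;
    for (const socket of this.sockets) {
      try {
        socket.close();
        closed += 1;
      } catch (err) {
        // The socket may already be closed or closing; ignore and continue.
      }
    }
    this.sockets.clear();
    return closed;
  }
}

// Usage with stand-in sockets (plain objects with a close() method).
const registry = new SocketRegistry();
const fakeSocket = { open: true, close() { this.open = false; } };
registry.register(fakeSocket);
registry.closeAll();
console.log(fakeSocket.open); // prints false
```

As noted above, this alone would not be enough. The generators reading from a socket would still have to treat a client-initiated close as an intentional end of the stream rather than as a disconnect to recover from.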

Thanks!
