Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change WS gem #19

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@ PATH
remote: .
specs:
intrinio-realtime (5.0.0)
bigdecimal (~> 1.4.0)
bigdecimal (~> 2.0.0)
eventmachine (~> 1.2)
thread (~> 0.2.2)
websocket-client-simple (~> 0.3)
websocket-eventmachine-client (~> 1.3)

GEM
remote: https://rubygems.org/
specs:
bigdecimal (1.4.4)
event_emitter (0.2.6)
bigdecimal (2.0.3)
eventmachine (1.2.7)
eventmachine (1.2.7-x64-mingw32)
thread (0.2.2)
websocket (1.2.9)
websocket-client-simple (0.3.0)
event_emitter
websocket
websocket-eventmachine-base (1.2.0)
eventmachine (~> 1.0)
websocket (~> 1.0)
websocket-native (~> 1.0)
websocket-eventmachine-client (1.3.0)
websocket-eventmachine-base (~> 1.0)
websocket-native (1.0.0)

PLATFORMS
ruby
Expand Down
4 changes: 2 additions & 2 deletions intrinio-realtime.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]

spec.add_dependency "eventmachine", '~> 1.2'
spec.add_dependency "websocket-client-simple", '~> 0.3'
spec.add_dependency "websocket-eventmachine-client", '~> 1.3'
spec.add_dependency "thread", '~> 0.2.2'
spec.add_dependency "bigdecimal", '~> 1.4.0'
spec.add_dependency "bigdecimal", '~> 2.0.0'
end
79 changes: 39 additions & 40 deletions lib/intrinio-realtime.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#require 'http'
require 'net/http'
require 'eventmachine'
require 'websocket-client-simple'
require 'websocket-eventmachine-client'

module Intrinio
module Realtime
Expand Down Expand Up @@ -144,7 +144,7 @@ def initialize(options, on_trade, on_quote)

@api_key = options[:api_key]
raise "API Key was formatted invalidly." if @api_key && !valid_api_key?(@api_key)

unless @api_key
@username = options[:username]
@password = options[:password]
Expand Down Expand Up @@ -178,7 +178,7 @@ def initialize(options, on_trade, on_quote)
raise "Invalid channels to join: #{bad_channels}" unless bad_channels.empty?

if options[:logger] == false
@logger = nil
@logger = nil
elsif !options[:logger].nil?
@logger = options[:logger]
else
Expand All @@ -196,30 +196,30 @@ def initialize(options, on_trade, on_quote)
def provider
@provider
end

def join(*channels)
channels = parse_channels(channels)
nonconforming = channels.select{|x| !x.is_a?(String)}
return error("Invalid channels to join: #{nonconforming}") unless nonconforming.empty?

@channels.concat(channels)
@channels.uniq!
debug "Joining channels #{channels}"

refresh_channels()
end

def leave(*channels)
channels = parse_channels(channels)
nonconforming = channels.find{|x| !x.is_a?(String)}
return error("Invalid channels to leave: #{nonconforming}") unless nonconforming.empty?

channels.each{|c| @channels.delete(c)}
debug "Leaving channels #{channels}"

refresh_channels()
end

def leave_all
@channels = []
debug "Leaving all channels"
Expand All @@ -230,7 +230,7 @@ def connect
raise "Must be run from within an EventMachine run loop" unless EM.reactor_running?
return warn("Already connected!") if @ready
debug "Connecting..."

catch :fatal do
begin
@closing = false
Expand All @@ -243,7 +243,7 @@ def connect
end
end
end

def disconnect
EM.cancel_timer(@selfheal_timer) if @selfheal_timer
@ready = false
Expand Down Expand Up @@ -272,7 +272,7 @@ def on_trade(on_trade)
def on_quote(on_quote)
@on_quote = on_quote
end

private

def queue_message(message)
Expand Down Expand Up @@ -417,11 +417,11 @@ def refresh_token
@token = response.body
debug "Token refreshed"
end
def auth_url

def auth_url
url = ""

case @provider
case @provider
when REALTIME then url = "https://realtime-mx.intrinio.com/auth"
when DELAYED_SIP then url = "https://realtime-delayed-sip.intrinio.com/auth"
when NASDAQ_BASIC then url = "https://realtime-nasdaq-basic.intrinio.com/auth"
Expand All @@ -443,7 +443,7 @@ def api_auth_url(url)
"#{url}api_key=#{@api_key}"
end

def socket_url
def socket_url
case @provider
when REALTIME then "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}&#{CLIENT_INFO_HEADER_KEY}=#{CLIENT_INFO_HEADER_VALUE}&#{MESSAGE_VERSION_HEADER_KEY}=#{MESSAGE_VERSION_HEADER_VALUE}"
when DELAYED_SIP then "wss://realtime-delayed-sip.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}&#{CLIENT_INFO_HEADER_KEY}=#{CLIENT_INFO_HEADER_VALUE}&#{MESSAGE_VERSION_HEADER_KEY}=#{MESSAGE_VERSION_HEADER_VALUE}"
Expand Down Expand Up @@ -476,11 +476,11 @@ def refresh_websocket
headers[:headers] = {}
headers[CLIENT_INFO_HEADER_KEY] = CLIENT_INFO_HEADER_VALUE
headers[MESSAGE_VERSION_HEADER_KEY] = MESSAGE_VERSION_HEADER_VALUE
@ws = ws = WebSocket::Client::Simple.connect(socket_url, headers)
@ws = ws = WebSocket::EventMachine::Client.connect(uri: socket_url, headers: headers)

me.send :info, "Connection opening"

ws.on :open do
ws.onopen do
me.send :info, "Connection established"
me.send :ready, true
if PROVIDERS.include?(me.send(:provider))
Expand All @@ -489,8 +489,7 @@ def refresh_websocket
me.send :stop_self_heal
end

ws.on :message do |frame|
data_message = frame.data
ws.onmessage do |data_message|
#me.send :debug, "Message: #{data_message}"
begin
unless data_message.nil?
Expand All @@ -500,24 +499,24 @@ def refresh_websocket
me.send :error, "Error adding message to queue: #{data_message} #{e}"
end
end
ws.on :close do |e|

ws.onclose do |code, reason|
me.send :ready, false
me.send :info, "Connection closing...: #{e}"
me.send :info, "Connection closing...: #{reason}"
me.send :try_self_heal
end

ws.on :error do |e|
ws.onerror do |e|
me.send :ready, false
me.send :error, "Connection error: #{e}"
me.send :try_self_heal
end
end

def refresh_channels
return unless @ready
debug "Refreshing channels"

# Join new channels
new_channels = @channels - @joined_channels
new_channels.each do |channel|
Expand All @@ -527,7 +526,7 @@ def refresh_channels
@ws.send(msg)
info "Joined #{channel}"
end

# Leave old channels
old_channels = @joined_channels - @channels
old_channels.each do |channel|
Expand All @@ -537,31 +536,31 @@ def refresh_channels
@ws.send(msg)
info "Left #{channel}"
end

@channels.uniq!
@joined_channels = Array.new(@channels)
debug "Current channels: #{@channels}"
end

def try_self_heal
return if @closing
debug "Attempting to self-heal"

time = @selfheal_backoffs.first
@selfheal_backoffs.delete_at(0) if @selfheal_backoffs.count > 1

EM.cancel_timer(@selfheal_timer) if @selfheal_timer

@selfheal_timer = EM.add_timer(time/1000) do
connect()
end
end

def stop_self_heal
EM.cancel_timer(@selfheal_timer) if @selfheal_timer
@selfheal_backoffs = Array.new(SELF_HEAL_BACKOFFS)
end

def ready(val)
@ready = val
end
Expand All @@ -571,34 +570,34 @@ def debug(message)
@logger.debug(message) rescue
nil
end

def info(message)
message = "IntrinioRealtime | #{message}"
@logger.info(message) rescue
nil
end

def error(message)
message = "IntrinioRealtime | #{message}"
@logger.error(message) rescue
nil
end

def fatal(message)
message = "IntrinioRealtime | #{message}"
@logger.fatal(message) rescue
EM.stop_event_loop
throw :fatal
nil
end

def parse_channels(channels)
channels.flatten!
channels.uniq!
channels.compact!
channels
end

def join_binary_message(channel)
if (channel == "lobby") && (@trades_only == false)
return [74, 0, 36, 70, 73, 82, 69, 72, 79, 83, 69].pack('C*') #74, not trades only, "$FIREHOSE"
Expand Down