-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy path01-stream-crypto-prices.Rmd
394 lines (296 loc) · 15.6 KB
/
01-stream-crypto-prices.Rmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
# Stream live cryptocurrency prices from the Binance WSS
## Objectives
- create a new umbrella app
- create a supervised application inside an umbrella
- connect to Binance's WebSocket Stream using the WebSockex module
- define a TradeEvent struct that will hold incoming data
- decode incoming events using the Jason module
## Create a new umbrella app
As we are starting from scratch, we need to create a new umbrella project:
```{r, engine = 'bash', eval = FALSE}
mix new hedgehog --umbrella
```
## Create a supervised application inside an umbrella
We can now proceed with creating a new supervised application called `streamer` inside our umbrella:
```{r, engine = 'bash', eval = FALSE}
cd hedgehog/apps
mix new streamer --sup
```
## Connect to Binance's WebSocket Stream using the WebSockex module
To establish a connection to Binance API's stream, we will need to use a WebSocket client. The module that we will use is called [WebSockex](https://github.com/Azolo/websockex). Scrolling down to the `Installation` section inside the module's readme on Github, we are instructed what dependency we need to add to our project.
We will append `:websockex` to the `deps` function inside the `mix.exs` file of the `streamer` application:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/mix.exs
defp deps do
[
{:websockex, "~> 0.4"}
]
end
```
As we added a dependency to our project, we need to fetch it using `mix deps.get`.
We can now progress with creating a module that will be responsible for streaming. We will create a new file called `binance.ex` inside the `apps/streamer/lib/streamer` directory.
From the readme of [WebSockex](https://github.com/Azolo/websockex) module, we can see that to use it we need to create a module that will implement the `WebSockex` behavior:
```{r, engine = 'elixir', eval = FALSE}
# WebSockex's readme
defmodule WebSocketExample do
use WebSockex
def start_link(url, state) do
WebSockex.start_link(url, __MODULE__, state)
end
def handle_frame({type, msg}, state) do
IO.puts "Received Message - Type: #{inspect type} -- Message: #{inspect msg}"
{:ok, state}
end
def handle_cast({:send, {type, msg} = frame}, state) do
IO.puts "Sending #{type} frame with payload: #{msg}"
{:reply, frame, state}
end
end
```
We will copy the whole code above across to our new `binance.ex` file.
\newpage
The first step will be to update the module name to match our file name:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/lib/streamer/binance.ex
defmodule Streamer.Binance do
```
In the spirit of keeping things tidy - we will now remove the `handle_cast/2` function (the last function in our module) as we won't be sending any messages back to Binance via WebSocket (to place orders etc - Binance provides a REST API which we will use in the next chapter).
Next, let's look up what URL should we use to connect to Binance's API. Binance has a separate WSS (Web Socket Streams) documentation at [Github](https://github.com/binance/binance-spot-api-docs/blob/master/web-socket-streams.md).
Scrolling down we can see the `General WSS information` section where 3 important pieces of information are listed:
* The base endpoint is: `wss://stream.binance.com:9443`
* Raw streams are accessed at `/ws/<streamName>`
* All symbols for streams are *lowercase*
We can see that the full endpoint for raw streams(we will be using a "raw" stream) will be `wss://stream.binance.com:9443/ws/` with stream name at the end (together with lowercased symbol).
Note: In the context of Binance API, "raw" means that no aggregation was performed before broadcasting the data on WebSocket.
Let's introduce a module attribute that will hold the full raw stream endpoint which will be used across the module:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/lib/streamer/binance.ex
@stream_endpoint "wss://stream.binance.com:9443/ws/"
```
Now back in [Binance's WSS documentation](https://github.com/binance/binance-spot-api-docs/blob/master/web-socket-streams.md) we need to search for "Trade Streams". "trade" in the context of this documentation means an exchange of assets(coins/tokens) by two sides (buyer and seller). Our future trading strategy will be interested in the "latest price" which is simply the last trade event's price.
We can see that docs are pointing to the following stream name:
```
Stream Name: <symbol>@trade
```
Together, our full URL looks like: "wss://stream.binance.com:9443/ws/<symbol>@trade".
To give a concrete example: the raw trade events stream URL for symbol XRPUSDT is:
"wss://stream.binance.com:9443/ws/xrpusdt@trade" (remember that symbols need to be lowercased, otherwise no trade events will get streamed - there's *no* error).
\newpage
Back to the IDE, we will now modify the `start_link/2` function to use Binance API's URL:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/lib/streamer/binance.ex
def start_link(symbol) do
symbol = String.downcase(symbol)
WebSockex.start_link(
"#{@stream_endpoint}#{symbol}@trade",
__MODULE__,
nil
)
end
```
Instead of passing an URL, we modified the function to accept a `symbol`, downcase it and use it together with the module's `@stream_endpoint` attribute to build a full URL.
At this moment streaming of trade events already works which we can test using `iex`:
```{r, engine = 'bash', eval = FALSE}
$ iex -S mix
...
iex(1)> Streamer.Binance.start_link("xrpusdt")
{:ok, #PID<0.335.0>}
Received Message - Type: :text -- Message: "{\"e\":\"trade\", \"E\":1603226394741,
\"s\":\"XRPUSDT\",\"t\":74608889,\"p\":\"0.24373000\",\"q\":\"200.00000000\",
\"b\":948244411,\"a\":948244502,\"T\":1603226394739,\"m\":true,\"M\":true}"
```
We can see the messages logged above because we copied the sample implementation from [WebSockex's readme](https://github.com/Azolo/websockex) where `handle_frame/2` function uses `IO.puts/1` to print out all incoming data. The lesson here is that every incoming message from Binance will cause the `handle_frame/2` callback to be called with the message and the process' state.
Just for reference, our module should look currently as follows:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/lib/streamer/binance.ex
defmodule Streamer.Binance do
use WebSockex
@stream_endpoint "wss://stream.binance.com:9443/ws/"
def start_link(symbol) do
symbol = String.downcase(symbol)
WebSockex.start_link(
"#{@stream_endpoint}#{symbol}@trade",
__MODULE__,
nil
)
end
def handle_frame({type, msg}, state) do
IO.puts "Received Message - Type: #{inspect type} -- Message: #{inspect msg}"
{:ok, state}
end
end
```
## Decode incoming events using the Jason module
Currently, all incoming data from WebSocket is encoded as a JSON. To decode JSON we will use the [jason](https://github.com/michalmuskala/jason) module.
Scrolling down to the `Installation` section inside the module's readme, we can see that we need to add it to the dependencies and we can start to use it right away.
Let's open the `mix.exs` file of the `streamer` application and append the `:jason` dependency to the list inside `deps` function:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/mix.exs
defp deps do
[
{:jason, "~> 1.2"},
{:websockex, "~> 0.4"}
]
end
```
As previously, don't forget to run `mix deps.get` to fetch the new dependency.
Looking through the documentation of the Jason module we can see `encode!/2` and `decode!/2` functions, both of them have exclamation marks which indicate that they will throw an error whenever they will be unable to successfully encode or decode the passed value.
This is less than perfect for our use case as we would like to handle those errors in our own way(technically we could just use try/rescue but as we will find out both `encode/2` and `decode/2` are available).
We will go a little bit off-topic but I would highly recommend those sorts of journeys around somebody's code. Let's look inside the [Jason](https://github.com/michalmuskala/jason/blob/master/lib/jason.ex) module. Scrolling down in search of `decode/2` (without the exclamation mark) we can see it about line 54:
```{r, engine = 'elixir', eval = FALSE}
# /lib/jason.ex
def decode(input, opts \\ []) do
input = IO.iodata_to_binary(input)
Decoder.parse(input, format_decode_opts(opts))
end
```
\newpage
It looks like it uses the `parse/2` function of a `Decoder` module, let's scroll back up and check where it's coming from. At line 6:
```{r, engine = 'elixir', eval = FALSE}
# /lib/jason.ex
alias Jason.{Encode, Decoder, DecodeError, EncodeError, Formatter}
```
we can see that `Decoder` is an alias of the [`Jason.Decoder`](https://github.com/michalmuskala/jason/blob/master/lib/decoder.ex). Scrolling down to the `Jason.Decoder` module we will find a `parse/2` function about line 43:
```{r, engine = 'elixir', eval = FALSE}
# /lib/decoder.ex
def parse(data, opts) when is_binary(data) do
key_decode = key_decode_function(opts)
string_decode = string_decode_function(opts)
try do
value(data, data, 0, [@terminate], key_decode, string_decode)
catch
{:position, position} ->
{:error, %DecodeError{position: position, data: data}}
{:token, token, position} ->
{:error, %DecodeError{token: token, position: position, data: data}}
else
value ->
{:ok, value}
end
end
```
Based on the result of decoding it will either return `{:ok, value}` or `{:error, %Decode.Error{...}}` we can confirm that by digging through documentation of the module on the [hexdocs](https://hexdocs.pm/jason/Jason.html#decode/2).
Once again, the point of this lengthy investigation was to show that Elixir code is readable and easy to understand so don't be thrown off when documentation is a little bit light, quite opposite, contribute to docs and code as you gain a better understanding of the codebase.
We can now get back to our `Streamer.Binance` module and modify the `handle_frame/2` function to decode the incoming JSON message. Based on the result of `Jason.decode/2` we will either call the `process_event/2` function or log an error. Here's the new version of the `handle_frame/2` function:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/lib/streamer/binance.ex
def handle_frame({_type, msg}, state) do
case Jason.decode(msg) do
{:ok, event} -> process_event(event)
{:error, _} -> Logger.error("Unable to parse msg: #{msg}")
end
{:ok, state}
end
```
Please make note that `type` is now prefixed with an underscore as we aren't using it at the moment.
The second important thing to note is that we are using `Logger` so it needs to be `require`d at the beginning of the module:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/lib/streamer/binance.ex
require Logger
```
Before implementing the `process_event/2` function we need to create a structure that will hold the incoming trade event's data.
Let's create a new directory called `binance` inside the `apps/streamer/lib/streamer/` and a new file called `trade_event.ex` inside it.
Our new module will hold all the trade event's information but we will also use readable field names(you will see the incoming data below). We can start by writing a skeleton module code:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/lib/streamer/binance/trade_event.ex
defmodule Streamer.Binance.TradeEvent do
defstruct []
end
```
We can refer to [Binance's docs](https://github.com/binance/binance-spot-api-docs/blob/master/web-socket-streams.md#trade-streams) to get a list of fields:
```
{
"e": "trade", // Event type
"E": 123456789, // Event time
"s": "BNBUSDT", // Symbol
"t": 12345, // Trade ID
"p": "0.001", // Price
"q": "100", // Quantity
"b": 88, // Buyer order ID
"a": 50, // Seller order ID
"T": 123456785, // Trade time
"m": true, // Is the buyer the market maker?
"M": true // Ignore
}
```
Let's copy them across and convert the comments to update the `defstruct` inside the
`Streamer.Binance.TradeEvent` module's struct to following:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/lib/streamer/binance/trade_event.ex
defstruct [
:event_type,
:event_time,
:symbol,
:trade_id,
:price,
:quantity,
:buyer_order_id,
:seller_order_id,
:trade_time,
:buyer_market_maker
]
```
That's all for this struct, we can now get back to implementing the `process_event/2` function inside the `Streamer.Binance` module. We will map every field of the response map to the `%Streamer.Binance.TradeEvent` struct. A useful trick here would be to copy the list of fields once again from the struct and assign the incoming fields one by one.
Inside the header of the function, we will pattern match on event type(a field called "e" in the message) to confirm that indeed we received a trade event). In the end, the `process_event/2` function should look as follows:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/lib/streamer/binance.ex
defp process_event(%{"e" => "trade"} = event) do
trade_event = %Streamer.Binance.TradeEvent{
:event_type => event["e"],
:event_time => event["E"],
:symbol => event["s"],
:trade_id => event["t"],
:price => event["p"],
:quantity => event["q"],
:buyer_order_id => event["b"],
:seller_order_id => event["a"],
:trade_time => event["T"],
:buyer_market_maker => event["m"]
}
Logger.debug(
"Trade event received " <>
"#{trade_event.symbol}@#{trade_event.price}"
)
end
```
We added the `Logger.debug/2` function to be able to see logs of incoming trade events.
\newpage
Lastly, before testing our implementation, let's add a nice interface to our `streamer` application that allows starting streaming:
```{r, engine = 'elixir', eval = FALSE}
# /apps/streamer/lib/streamer.ex
defmodule Streamer do
@moduledoc """
Documentation for `Streamer`.
"""
def start_streaming(symbol) do
Streamer.Binance.start_link(symbol)
end
end
```
The final version of the `Streamer.Binance` module should look like [this](https://github.com/Cinderella-Man/hands-on-elixir-and-otp-cryptocurrency-trading-bot-source-code/blob/chapter_01/apps/streamer/lib/streamer/binance.ex).
The last step will be to add the `Logger` configuration into the main `config/config.exs` file. We will set the `Logger` level to `:debug` for a moment to be able to see incoming trade events:
```{r, engine = 'elixir', eval = FALSE}
# /config/config.exs
config :logger,
level: :debug
```
This finishes the implementation part of this chapter, we can now give our implementation a whirl using `iex`:
```{r, engine = 'bash', eval = FALSE}
$ iex -S mix
...
iex(1)> Streamer.start_streaming("xrpusdt")
{:ok, #PID<0.251.0>}
23:14:32.217 [debug] Trade event received [email protected]
23:14:33.381 [debug] Trade event received [email protected]
23:14:35.380 [debug] Trade event received [email protected]
23:14:36.386 [debug] Trade event received [email protected]
```
As we can see, the streamer is establishing a WebSocket connection with Binance's API and its receiving trade events. It decodes them from JSON to `%Streamer.Binance.TradeEvent` struct and logs a compiled message. Also, our interface hides implementation details from the "user" of our application.
We will now flip the `Logger` level back to `info` so the output won't every incoming trade event:
```{r, engine = 'elixir', eval = FALSE}
# /config/config.exs
config :logger,
level: :info
```
[Note] Please remember to run the `mix format` to keep things nice and tidy.
The source code for this chapter can be found on [GitHub](https://github.com/Cinderella-Man/hands-on-elixir-and-otp-cryptocurrency-trading-bot-source-code/tree/chapter_01)