-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy path05-enable-parallel-trading.Rmd
534 lines (400 loc) · 19.8 KB
/
05-enable-parallel-trading.Rmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
# Enable parallel trading on multiple symbols
## Objectives
- design supervision tree that will allow trading using multiple traders in parallel per symbol
- update application supervisor
- implement `Naive.Server`
- implement `Naive.SymbolSupervisor`
## Introduction - architectural design
In the second chapter, we implemented a basic trader which goes through the trading cycle. Inside the iEx session, we were starting the `Naive.Trader` process using the `start_link/1` function:
```{r, fig.align="center", out.width="25%", echo=FALSE}
knitr::include_graphics("images/chapter_05_01_current_state.png")
```
The `GenServer.start_link/3` creates a link between IEx's process and new `Naive.Trader` process. Whenever a trader process terminates(either finishes the trading cycle or there was an error), a new one won't get started as there's no supervision at all.
We can do much better than that with a little bit of help from Elixir and OTP.
\newpage
Let's introduce a supervisor above our trader process. It will start a new trader process whenever the previous one finished/crashed:
```{r, fig.align="center", out.width="25%", echo=FALSE}
knitr::include_graphics("images/chapter_05_02_supervise_the_trader.png")
```
This looks much better but there are few problems with it. So, when the trader will start to place orders it will be in *some* state(it will hold buy/sell orders) that the supervisor won't be aware of. In case of trader crashing, the supervisor will start a new trader *without* any knowledge of possibly placed orders or any other information from the state(it will be started with a "fresh" state).
To fix that we need to keep a copy of the trader's state outside of the trader process - that's why we will introduce a new server called `Naive.Leader` that will keep track of traders' data:
```{r, fig.align="center", out.width="70%", echo=FALSE}
knitr::include_graphics("images/chapter_05_03_leader_added.png")
```
The `Naive.Leader` will become the interface to start new *traders*. It will call the `start_child/1` function of the Supervisor, then consequently `DynamicTraderSupervisor` will call the `start_link/1` function of our `Naive.Trader` module.
We can also see that our `Naive.Trader`'s are now started with the `temporary restart` option. Setting this option will disallow the Supervisor from restarting the traders on its own. The responsibility of restarting traders will now be shifted to the leader. The leader will monitor the traders and restart them to a correct state when any crashes.
As trader state will get updated, it will notify the leader about its new state to be stored. This way whenever a trader process would crash, the leader will be able to start a new trader process with the last known state.
This setup will also allow us to start and supervise multiple traders for a single symbol which our naive strategy will require in the future(next chapter).
\newpage
For each symbol that we will be trading on we need the above trio of services(Leader + DynamicTraderSupervisor + Trader), to effectively initialize(and supervise) them we will add an `Naive.SymbolSupervisor` that will start both `Naive.Leader` and `Naive.Dynamic`:
```{r, fig.align="center", out.width="40%", echo=FALSE}
knitr::include_graphics("images/chapter_05_04_symbol_sup.png")
```
We will need multiple symbol supervisors, one for each symbol that we would like to trade on. As with traders, they will be dynamically started on demand, this should give us a hint that we need another dynamic supervisor that will supervise symbol supervisors and will be the direct child of our `Naive.Supervisor`(`Naive.Application` module):
```{r, fig.align="center", out.width="70%", echo=FALSE}
knitr::include_graphics("images/chapter_05_05_full.png")
```
You could ask yourself why we don't need some additional server to track which symbols are traded at the moment (in the same way as `Naive.Leader` tracks `Naive.Trader`s). The answer is that we don't need to track them as we register all `Naive.SymbolSupervisor`s with a name containing a symbol that they trade on. This way we will always be able to refer to them by registered name instead of PIDs/refs.
Here's what happens starting from the top of the graph:
* the `Naive.Application` is our top-level application's supervisor for the `naive` app, it was auto-generated as a part of the `naive` app
* it has a single child `Naive.DynamicSymbolSupervisor`, which has strategy one_for_one and all of its children are `Naive.SymbolSupervisor`s
* `Naive.SymbolSupervisor` process will start two further children: the `Naive.Leader` and `DynamicTraderSupervisor`, both created on init
* the `Naive.Leader` will ask `DynamicTraderSupervisor` to start the `Naive.Trader` child process(es)
This can be a little bit confusing at the moment but it will get a lot easier
as we will write the code. Let's get to it!
### Update application supervisor
Let's start by adding a `Naive.DynamicSymbolSupervisor` and a server to the children list of the `Naive.Application` supervisor:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/application.ex
def start(_type, _args) do
children = [
{
DynamicSupervisor,
strategy: :one_for_one,
name: Naive.DynamicSymbolSupervisor
}
]
...
end
```
### Add interface method
We will now add an interface method to the `Naive` module that will instruct
`Naive.DynamicSymbolSupervisor` to start `Naive.SymbolSupervisor`(to be implemented next) as its child:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive.ex
def start_trading(symbol) do
symbol = String.upcase(symbol)
{:ok, _pid} =
DynamicSupervisor.start_child(
Naive.DynamicSymbolSupervisor,
{Naive.SymbolSupervisor, symbol}
)
end
```
## Implement `Naive.SymbolSupervisor`
Next, time for the `Naive.SymbolSupervisor`, the first step will be to create a file called `symbol_supervisor.ex` inside `apps/naive/lib/naive` directory. There's no point in using the [DynamicSupervisor](https://hexdocs.pm/elixir/master/DynamicSupervisor.html), as we know the children that we would like to start automatically on init. This is a full implementation of the supervisor and it's a simple as just listing child processes inside the init function:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/symbol_supervisor.ex
defmodule Naive.SymbolSupervisor do
use Supervisor
require Logger
def start_link(symbol) do
Supervisor.start_link(
__MODULE__,
symbol,
name: :"#{__MODULE__}-#{symbol}"
)
end
def init(symbol) do
Logger.info("Starting new supervision tree to trade on #{symbol}")
Supervisor.init(
[
{
DynamicSupervisor,
strategy: :one_for_one,
name: :"Naive.DynamicTraderSupervisor-#{symbol}"
},
{Naive.Leader, symbol}
],
strategy: :one_for_all
)
end
end
```
It's advised to keep supervisor processes slim.
We registered the `Naive.SymbolSupervisor` processes with names, which will help us understand the supervision tree inside the observer GUI(it will also allow us to stop those supervisors in the future).
\newpage
As mentioned previously whenever either the `Naive.Leader` or `Naive.DynamicSymbolSupervisor-#{symbol}` would crash we would like to kill the other child process as we won't be able to recover the state - it's just easier to init both again.
## Implement `Naive.Leader`
It's time for the `Naive.Leader` module, again, the first step will be to create a file called the `leader.ex` inside `apps/naive/lib/naive` directory. At this moment it will be a skeleton GenServer implementation just to get the code to compile:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/leader.ex
defmodule Naive.Leader do
use GenServer
def start_link(symbol) do
GenServer.start_link(
__MODULE__,
symbol,
name: :"#{__MODULE__}-#{symbol}"
)
end
def init(symbol) do
{:ok, %{symbol: symbol}}
end
end
```
At this moment we have half of the supervision tree working so we can give it
a spin in iex. Using the observer we will be able to see all processes created when the `start_trading/1` function gets called:
```{r, engine = 'bash', eval = FALSE}
$ iex -S mix
...
iex(1)> :observer.start()
```
\newpage
The above function will open a new window looking as follows:
```{r, fig.align="center", out.width="100%", echo=FALSE}
knitr::include_graphics("images/chapter_05_06_new_observer.png")
```
To clearly see the supervision tree we will click on the "Applications" tab at the top - the following tree of processes will be shown on the left:
```{r, fig.align="center", out.width="100%", echo=FALSE}
knitr::include_graphics("images/chapter_05_07_observer_app_list.png")
```
If any other process tree is visible, go to the list on the left and select the `naive` application.
The `Naive.Supervisor` is our `Naive.Application` module(you can confirm that by checking the `name` option send to the `start_link` function inside the module). It starts the `Naive.DynamicSymbolSupervisor`.
We can now call the `Naive.start_trading/1` function couple time to see how the tree will look like with additional processes(go back to the `iex` session):
```{r, engine = 'bash', eval = FALSE}
...
iex(2)> Naive.start_trading("adausdt")
23:14:40.974 [info] Starting new supervision tree to trade on ADAUSDT
{:ok, #PID<0.340.0>}
iex(3)> Naive.start_trading("xrpusdt")
23:15:12.117 [info] Starting new supervision tree to trade on XRPUSDT
{:ok, #PID<0.345.0>}
```
We can see that two new branches were created:
* `SymbolSupervisor-ADAUSDT`
* `SymbolSupervisor-XRPUSDT`
Each of them contains a `Naive.Leader` and `DynamicTraderSupervisor`.
### Updating the `leader` module
Let's jump back to extending a leader implementation to get those traders running.
We will introduce a leader's state that will consist of a symbol, setting, and a list of traders' data. Trader data will hold PID, ref, and state of the trader:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/leader.ex
...
alias Naive.Trader
require Logger
@binance_client Application.compile_env(:naive, :binance_client)
defmodule State do
defstruct symbol: nil,
settings: nil,
traders: []
end
defmodule TraderData do
defstruct pid: nil,
ref: nil,
state: nil
end
```
We will use a `handle_continue` callback which was introduced in Erlang 21 to
initialize the leader asynchronously. To do that we will return a tuple starting with a `:continue` atom from inside the init function:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/leader.ex
def init(symbol) do
{:ok,
%State{
symbol: symbol
}, {:continue, :start_traders}}
end
```
The `Naive.Leader` will fetch symbol settings and based on them, it will build the state for traders so they don't need to fetch the same settings again. It will also start as many traders there were set under chunks key in setting:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/leader.ex
# below init()
def handle_continue(:start_traders, %{symbol: symbol} = state) do
settings = fetch_symbol_settings(symbol)
trader_state = fresh_trader_state(settings)
traders = for _i <- 1..settings.chunks,
do: start_new_trader(trader_state)
{:noreply, %{state | settings: settings, traders: traders}}
end
```
Fetching symbol settings will be hardcoded for time being to keep this chapter focused. We will also move the code responsible for fetching tick
size from the `Naive.Trader` to the `Naive.Leader` and hardcode the rest of the values:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/leader.ex
defp fetch_symbol_settings(symbol) do
tick_size = fetch_tick_size(symbol)
%{
symbol: symbol,
chunks: 1,
# -0.12% for quick testing
profit_interval: "-0.0012",
tick_size: tick_size
}
end
defp fetch_tick_size(symbol) do
@binance_client.get_exchange_info()
|> elem(1)
|> Map.get(:symbols)
|> Enum.find(&(&1["symbol"] == symbol))
|> Map.get("filters")
|> Enum.find(&(&1["filterType"] == "PRICE_FILTER"))
|> Map.get("tickSize")
end
```
\newpage
Additionally, we need to create a helper method that we used inside the `handle_continue/2` callback called `fresh_trader_state/1`:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/leader.ex
# place this one above the `fetch_symbol_settings` function
defp fresh_trader_state(settings) do
struct(Trader.State, settings)
end
```
Starting a new trader isn't any different from the code that we already wrote to start a new `Naive.SymbolSupervisor`. We need to call the `DynamicSupervisor.start_child/2` function and start to monitor the process:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/leader.ex
defp start_new_trader(%Trader.State{} = state) do
{:ok, pid} =
DynamicSupervisor.start_child(
:"Naive.DynamicTraderSupervisor-#{state.symbol}",
{Naive.Trader, state}
)
ref = Process.monitor(pid)
%TraderData{pid: pid, ref: ref, state: state}
end
```
### Updating the `Naive.Trader` module
Now we can update the `Naive.Trader`, first, we will set restart to be temporary to avoid restarting it by the `Naive.DynamicTraderSupervisor`:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/trader.ex
defmodule Naive.Trader do
use GenServer, restart: :temporary
...
```
Next, we will update the `start_link/1` and `init/1` functions to take the state instead of building it from args:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/trader.ex
def start_link(%State{} = state) do
GenServer.start_link(__MODULE__, state)
end
def init(%State{symbol: symbol} = state) do
symbol = String.upcase(symbol)
Logger.info("Initializing new trader for symbol(#{symbol})")
Phoenix.PubSub.subscribe(
Streamer.PubSub,
"TRADE_EVENTS:#{symbol}"
)
{:ok, state}
end
```
Next, we need to update two `handle_info/2` callbacks that change the state of the `Naive.Trader` process(when placing buy order and when placing sell order). They will need to notify the `Naive.Leader` that the state is changed before returning it:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/trader.ex
...
def handle_info(
...
) do
Logger.info("Placing buy order (#{symbol}@#{price})")
...
new_state = %{state | buy_order: order}
Naive.Leader.notify(:trader_state_updated, new_state)
{:noreply, new_state}
end
def handle_info(
...
) do
...
Logger.info("Buy order filled, placing sell order ...")
...
new_state = %{state | sell_order: order}
Naive.Leader.notify(:trader_state_updated, new_state)
{:noreply, new_state}
end
...
```
### Finalizing `Naive.Leader` implementation
Now we need to get back to the `Naive.Leader` where we will implement the notifying logic. We will start with the notify function that will just call the `Naive.Leader` process:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/leader.ex
# below init
def notify(:trader_state_updated, trader_state) do
GenServer.call(
:"#{__MODULE__}-#{trader_state.symbol}",
{:update_trader_state, trader_state}
)
end
```
Now, it's time for a callback function that will handle the trader state update. As this is a `handle_call/3` callback we have access to the trader PID which sent the notification message. We will try to find that trader in the list of traders. If that's successful we will update the cached state for that
trader locally:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/leader.ex
# below handle_continue
def handle_call(
{:update_trader_state, new_trader_state},
{trader_pid, _},
%{traders: traders} = state
) do
case Enum.find_index(traders, &(&1.pid == trader_pid)) do
nil ->
Logger.warning(
"Tried to update the state of trader that leader is not aware of"
)
{:reply, :ok, state}
index ->
old_trader_data = Enum.at(traders, index)
new_trader_data = %{old_trader_data | :state => new_trader_state}
{:reply, :ok, %{state | :traders =>
List.replace_at(traders, index, new_trader_data)}}
end
end
```
Another callback functions that we will need to provide are two `handle_info/2` functions that will handle the trade finished scenario as well as crashed trader.
\newpage
First, trade finished scenario. As previously, we will try to find the trader data in the traders list. If that's successful, we will start a new trader with a fresh state. We will also overwrite existing trader data locally(as PID, ref, and state changed):
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/leader.ex
# below state updated handle_call callback
def handle_info(
{:DOWN, _ref, :process, trader_pid, :normal},
%{traders: traders, symbol: symbol, settings: settings} = state
) do
Logger.info("#{symbol} trader finished trade - restarting")
case Enum.find_index(traders, &(&1.pid == trader_pid)) do
nil ->
Logger.warning(
"Tried to restart finished #{symbol} " <>
"trader that leader is not aware of"
)
{:noreply, state}
index ->
new_trader_data = start_new_trader(fresh_trader_state(settings))
new_traders = List.replace_at(traders, index, new_trader_data)
{:noreply, %{state | traders: new_traders}}
end
end
```
Here we will assume that whenever the reason that the `Naive.Trader` process died is `:normal` that means that we stopped it after trade cycle finished.
The final callback that we need to provide will handle the scenario where the trader crashed. We would like to find the cached state of the crashed trader and start a new one with the same state and then update the local cache as PID and ref will change for that trader:
```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/leader.ex
# below trade finished handle_info callback
def handle_info(
{:DOWN, _ref, :process, trader_pid, reason},
%{traders: traders, symbol: symbol} = state
) do
Logger.error("#{symbol} trader died - reason #{reason} - trying to restart")
case Enum.find_index(traders, &(&1.pid == trader_pid)) do
nil ->
Logger.warning(
"Tried to restart #{symbol} trader " <>
"but failed to find its cached state"
)
{:noreply, state}
index ->
trader_data = Enum.at(traders, index)
new_trader_data = start_new_trader(trader_data.state)
new_traders = List.replace_at(traders, index, new_trader_data)
{:noreply, %{state | traders: new_traders}}
end
end
```
### IEx testing
That finishes the implementation part, let's jump into the IEx session to see how it works.
We will start the observer first, then we will start trading on any valid symbol.
When our trader will start, you should be able to right-click and select "Kill process"(leave the reason as kill) and click "OK". At that moment you should see that the PID of the trader changed and we can also see a log message from the leader.
```{r, engine = 'bash', eval = FALSE}
$ iex -S mix
...
iex(1)> :observer.start()
:ok
iex(2)> Naive.start_trading("xrpusdt")
00:04:35.041 [info] Starting new supervision tree to trade on XRPUSDT
{:ok, #PID<0.455.0>}
00:04:37.697 [info] Initializing new trader for XRPUSDT
iex(3)>
00:08:01.476 [error] XRPUSDT trader died - trying to restart
00:08:01.476 [info] Initializing new trader for XRPUSDT
```
[Note] Please remember to run the `mix format` to keep things nice and tidy.
The source code for this chapter can be found on [GitHub](https://github.com/Cinderella-Man/hands-on-elixir-and-otp-cryptocurrency-trading-bot-source-code/tree/chapter_05)