Skip to content

Commit

Permalink
Merge pull request #29 from Cinderella-Man/final-stretch
Browse files Browse the repository at this point in the history
Bug fixes/tidy up/updates up to chapter 22
  • Loading branch information
Cinderella-Man authored Nov 2, 2023
2 parents 0b99dc1 + 0f84921 commit 90cc96f
Show file tree
Hide file tree
Showing 38 changed files with 1,735 additions and 1,622 deletions.
6 changes: 3 additions & 3 deletions 05-enable-parallel-trading.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ trader locally:
) do
case Enum.find_index(traders, &(&1.pid == trader_pid)) do
nil ->
Logger.warn(
Logger.warning(
"Tried to update the state of trader that leader is not aware of"
)
{:reply, :ok, state}
Expand Down Expand Up @@ -458,7 +458,7 @@ First, trade finished scenario. As previously, we will try to find the trader da
case Enum.find_index(traders, &(&1.pid == trader_pid)) do
nil ->
Logger.warn(
Logger.warning(
"Tried to restart finished #{symbol} " <>
"trader that leader is not aware of"
)
Expand Down Expand Up @@ -489,7 +489,7 @@ The final callback that we need to provide will handle the scenario where the tr
case Enum.find_index(traders, &(&1.pid == trader_pid)) do
nil ->
Logger.warn(
Logger.warning(
"Tried to restart #{symbol} trader " <>
"but failed to find its cached state"
)
Expand Down
2 changes: 1 addition & 1 deletion 09-run-multiple-parallel-traders.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ Moving on to the body of our callback. As with other ones, we will check can we
# body of our callback
case Enum.find_index(traders, &(&1.pid == trader_pid)) do
nil ->
Logger.warn("Rebuy triggered by trader that leader is not aware of")
Logger.warning("Rebuy triggered by trader that leader is not aware of")
{:reply, :ok, state}
index ->
Expand Down
4 changes: 2 additions & 2 deletions 11-supervise-and-autostart-streaming.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ Next, we will add the `start_streaming/1` function at the bottom of the
{:ok, _pid} = start_streamer(symbol)
pid ->
Logger.warn("Streaming on #{symbol} already started")
Logger.warning("Streaming on #{symbol} already started")
{:ok, _settings} = update_streaming_status(symbol, "on")
{:ok, pid}
end
Expand Down Expand Up @@ -378,7 +378,7 @@ Let's write a `stop_streaming/1` logic inside the `Streamer.DynamicStreamerSuper
def stop_streaming(symbol) when is_binary(symbol) do
case get_pid(symbol) do
nil ->
Logger.warn("Streaming on #{symbol} already stopped")
Logger.warning("Streaming on #{symbol} already stopped")
{:ok, _settings} = update_streaming_status(symbol, "off")
pid ->
Expand Down
14 changes: 7 additions & 7 deletions 12-start-stop-and-autostart-trading.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ Our `start_trading/1` implementation is almost the same as one for the `streamer
{:ok, _pid} = start_symbol_supervisor(symbol)
pid ->
Logger.warn("Trading on #{symbol} already started")
Logger.warning("Trading on #{symbol} already started")
{:ok, _settings} = update_trading_status(symbol, "on")
{:ok, pid}
end
Expand Down Expand Up @@ -142,7 +142,7 @@ Stop trading will require a change in two places, first inside the `Naive.Dynami
case get_pid(symbol) do
nil ->
Logger.warn("Trading on #{symbol} already stopped")
Logger.warning("Trading on #{symbol} already stopped")
{:ok, _settings} = update_trading_status(symbol, "off")
pid ->
Expand Down Expand Up @@ -309,7 +309,7 @@ Next, we will create a `shutdown_trading/1` function inside the `Naive.DynamicSy
case get_pid(symbol) do
nil ->
Logger.warn("Trading on #{symbol} already stopped")
Logger.warning("Trading on #{symbol} already stopped")
{:ok, _settings} = update_trading_status(symbol, "off")
_pid ->
Expand Down Expand Up @@ -367,7 +367,7 @@ Let's look at the updated implementation of the "end of trade" handler:
case Enum.find_index(traders, &(&1.pid == trader_pid)) do
nil ->
Logger.warn(
Logger.warning(
"Tried to restart finished #{symbol} " <>
"trader that leader is not aware of"
)
Expand All @@ -381,7 +381,7 @@ Let's look at the updated implementation of the "end of trade" handler:
index ->
new_traders =
if settings.status == "shutdown" do # <= refactored code
Logger.warn(
Logger.warning(
"The leader won't start a new trader on #{symbol} " <>
"as symbol is in the 'shutdown' state"
)
Expand Down Expand Up @@ -416,7 +416,7 @@ The second callback that we need to modify is the `rebuy` triggered:
) do
case Enum.find_index(traders, &(&1.pid == trader_pid)) do
nil ->
Logger.warn("Rebuy triggered by trader that leader is not aware of")
Logger.warning("Rebuy triggered by trader that leader is not aware of")
{:reply, :ok, state}
index ->
Expand All @@ -430,7 +430,7 @@ The second callback that we need to modify is the `rebuy` triggered:
updated_traders
else
if settings.status == "shutdown" do
Logger.warn(
Logger.warning(
"The leader won't start a new trader on #{symbol} " <>
"as symbol is in the 'shutdown' state"
)
Expand Down
14 changes: 7 additions & 7 deletions 13-abstract-duplicated-code-using-macros.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ The `fetch_symbols_to_trade/0` will get updated to `fetch_symbols_to_start/0`:
) # ^^^^^^ inlined `start_symbol_supervisor/1`
pid ->
Logger.warn("Trading on #{symbol} already started")
Logger.warning("Trading on #{symbol} already started")
{:ok, _settings} = update_status(symbol, "on") # <= updated name
{:ok, pid}
end
Expand Down Expand Up @@ -150,7 +150,7 @@ Last function to rename in this module will be the `stop_trading/1` to `stop_wor
def stop_worker(symbol) when is_binary(symbol) do # <= updated name
case get_pid(symbol) do
nil ->
Logger.warn("Trading on #{symbol} already stopped")
Logger.warning("Trading on #{symbol} already stopped")
{:ok, _settings} = update_status(symbol, "off") # <= updated name
pid ->
Expand Down Expand Up @@ -213,7 +213,7 @@ We also need to update the `shutdown_trading/1` function as we removed all the p
def shutdown_worker(symbol) when is_binary(symbol) do # <= updated name
case Core.ServiceSupervisor.get_pid(symbol) do # <= module added
nil ->
Logger.warn("Trading on #{symbol} already stopped")
Logger.warning("Trading on #{symbol} already stopped")
{:ok, _settings} = Core.ServiceSupervisor.update_status(symbol, "off")
# ^^^ updated name + module
Expand Down Expand Up @@ -502,7 +502,7 @@ Let's look at updated functions:
# ^^^ args used
pid ->
Logger.warn("#{worker_module} worker for #{symbol} already started")
Logger.warning("#{worker_module} worker for #{symbol} already started")
# ^^^ dynamic text
{:ok, _settings} = update_status(symbol, "on", repo, schema)
{:ok, pid}
Expand All @@ -514,7 +514,7 @@ Let's look at updated functions:
when is_binary(symbol) do
case get_pid(worker_module, symbol) do # <= worker_module passed
nil ->
Logger.warn("#{worker_module} worker for #{symbol} already stopped")
Logger.warning("#{worker_module} worker for #{symbol} already stopped")
# ^^^ dynamic text
{:ok, _settings} = update_status(symbol, "off", repo, schema)
Expand Down Expand Up @@ -615,7 +615,7 @@ That finishes the 3rd round of updates inside the `Core.ServiceSupervisor` modul
def shutdown_worker(symbol) when is_binary(symbol) do
case Core.ServiceSupervisor.get_pid(Naive.SymbolSupervisor, symbol) do # <= arg added
nil ->
Logger.warn("#{Naive.SymbolSupervisor} worker for #{symbol} already stopped")
Logger.warning("#{Naive.SymbolSupervisor} worker for #{symbol} already stopped")
# ^^^ updated
{:ok, _settings} =
Expand Down Expand Up @@ -862,7 +862,7 @@ As those will get compiled and "pasted" into the `Naive.DynamicSymbolSupervisor`
def shutdown_worker(symbol) when is_binary(symbol) do
case get_pid(symbol) do # <= macro provided function
nil ->
Logger.warn("#{Naive.SymbolSupervisor} worker for #{symbol} already stopped")
Logger.warning("#{Naive.SymbolSupervisor} worker for #{symbol} already stopped")
{:ok, _settings} = update_status(symbol, "off") # <= macro provided function
_pid ->
Expand Down
5 changes: 3 additions & 2 deletions 14-store-trade-events-and-orders.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ We can now follow similar steps as previously and add required dependencies (lik
# /apps/data_warehouse/mix.exs
defp deps do
[
{:binance, "~> 1.0"},
{:ecto_sql, "~> 3.0"},
{:ecto_enum, "~> 1.4"},
{:phoenix_pubsub, "~> 2.0"},
Expand All @@ -59,7 +60,7 @@ We can now follow similar steps as previously and add required dependencies (lik
end
```

Additionally, we added the `phoenix_pubsub` module to be able to subscribe to the PubSub topic and the `streamer` application to be able to use its `Streamer.Binance.TradeEvent` struct.
Additionally, we added the `phoenix_pubsub`(to subscribe to the PubSub topic), the `streamer` application(to use its `Streamer.Binance.TradeEvent` struct) and the `binance` package(to pattern match it's structs).

We can now jump back to the terminal to install added dependencies and generate a new `Ecto.Repo` module:

Expand Down Expand Up @@ -800,7 +801,7 @@ Last function in this module will be `stop_worker/1` which uses private `stop_ch
defp stop_child(args) do
case Registry.lookup(@registry, args) do
[{pid, _}] -> DynamicSupervisor.terminate_child(__MODULE__, pid)
_ -> Logger.warn("Unable to locate process assigned to #{inspect(args)}")
_ -> Logger.warning("Unable to locate process assigned to #{inspect(args)}")
end
end
```
Expand Down
4 changes: 2 additions & 2 deletions 16-end-to-end-testing.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,8 @@ defmodule Core.Struct.TradeEvent do
As we moved the `TradeEvent` struct over to the `Core` application, we need to:

* update all places that reference the `Streamer.Binance.TradeEvent` to `Core.Struct.TradeEvent`
* add the `core` to the dependencies list of the `streamer` application
* remove the `streamer` from the dependencies list of all apps in the umbrella
* add the `core` to the dependencies lists of all apps in the umbrella
* remove the `streamer` from the dependencies lists of all apps in the umbrella



Expand Down
43 changes: 23 additions & 20 deletions 18-functional-elixir.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -295,53 +295,56 @@ The second function deals with the race condition when multiple transactions fil
},
sell_order: %Binance.OrderResponse{}
}
) do
)
when is_number(order_id) do
:skip
end
```

### Place a sell order rules
### Fetch the buy order rules

We will follow the same logic for the 3rd clause of the `generate_decision/2` function. We will leave only the sell price calculation as it's pure and return a tuple together with the decision:
For the 3th clause, we will return only an atom as there's no pure logic besides the pattern-match in the header itself:

```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/strategy.ex
# the third clause
def generate_decision(
%TradeEvent{},
%TradeEvent{
buyer_order_id: order_id
},
%State{
buy_order: %Binance.OrderResponse{
status: "FILLED",
price: buy_price
order_id: order_id
},
sell_order: nil,
profit_interval: profit_interval,
tick_size: tick_size
sell_order: nil
}
) do
sell_price = calculate_sell_price(buy_price, profit_interval, tick_size)
{:place_sell_order, sell_price}
)
when is_number(order_id) do
:fetch_buy_order
end
```

### Fetch the buy order rules
### Place a sell order rules

For the 4th clause, we will return only an atom as there's no pure logic besides the pattern-match in the header itself:
We will follow the same logic for the 4th clause of the `generate_decision/2` function. We will leave only the sell price calculation as it's pure and return a tuple together with the decision:

```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/strategy.ex
# the fourth clause
def generate_decision(
%TradeEvent{
buyer_order_id: order_id
},
%TradeEvent{},
%State{
buy_order: %Binance.OrderResponse{
order_id: order_id
}
status: "FILLED",
price: buy_price
},
sell_order: nil,
profit_interval: profit_interval,
tick_size: tick_size
}
) do
:fetch_buy_order
sell_price = calculate_sell_price(buy_price, profit_interval, tick_size)
{:place_sell_order, sell_price}
end
```

Expand Down
14 changes: 11 additions & 3 deletions 19-idiomatic-elixir.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ As each worker will need to subscribe to the PubSub's `"TRADE_EVENTS:#{symbol}"`

Following the pattern established by the `Naive.Trader`, we use the module's attributes(with values based on the configuration) instead of hardcoded module names.

Additionally, we used the `Core.PubSub` module(and we will use other `core` module's structs down below) so we need to add the `core` application to the dependencies list of the `indicator` application:

```{r, engine = 'elixir', eval = FALSE}
# /apps/indicator/mix.exs
defp deps do
[
{:core, in_umbrella: true} # <= added
...
```

As we subscribed to the PubSub, we need to provide a callback that will handle the incoming trade events:

```{r, engine = 'elixir', eval = FALSE}
Expand All @@ -93,8 +103,6 @@ As we subscribed to the PubSub, we need to provide a callback that will handle t
end
```

\newpage

To avoid mixing our business logic with the GenServer boilerplate(as discussed in the last chapter), we will place it in a new module. First, we need to create a new file `/apps/indicator/lib/indicator/ohlc.ex` and the `Indicator.Ohlc` module inside it:

```{r, engine = 'elixir', eval = FALSE}
Expand Down Expand Up @@ -301,7 +309,7 @@ We could continue with this exercise, add a Registry to be able to stop the indi

## Idiomatic solution

What we will focus on is the **usage** of processes(in our case the GenServers) in our solution. We've split our logic between multiple processes, each aggregating a single timeframe. All of those processes work in the same way. They subscribe to the PubSub topic, merge the incoming data into OHLC structs, and potentially broadcast them. The only difference is the timeframe that they use for merging data.
What we will focus on is the **usage of processes**(in our case the GenServers) in our solution. We've split our logic between multiple processes, each aggregating a single timeframe. All of those processes work in the same way. They subscribe to the PubSub topic, merge the incoming data into OHLC structs, and potentially broadcast them. The only difference is the timeframe that they use for merging data.

The solution feels very clean, but also we are using multiple processes to aggregate data for each symbol. In a situation like this, we should always ask ourselves:

Expand Down
8 changes: 4 additions & 4 deletions 20-idiomatic-trading-strategy.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ At this moment, the `generate_decisions/4` can look like overengineered `Enum.ma

\newpage

It's important to note that we are now passing four arguments into the `generate_decision` function - we added `current_positions` and `settings` - those will be required in the further updates as it was mentioned above. At this moment though, we will update the `generate_decision/2` clauses to include two additional arguments to **all** clauses:
It's important to note that we are now passing four arguments into the `generate_decision` function - we added `current_positions` and `settings` - those will be required in the further updates as it was mentioned above. At this moment though, we will update **all** the `generate_decision/2` clauses to include two additional arguments:

```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/strategy.ex
Expand Down Expand Up @@ -592,7 +592,7 @@ Now, the Trader will handle updating the settings, which we will add next, but b
# /apps/naive/lib/naive/strategy.ex
def update_status(symbol, status) # <= updated to public
when is_binary(symbol) and is_binary(status) do
@repo.get_by(Settings, symbol: symbol) <= updated to use @repo
@repo.get_by(Settings, symbol: symbol) # <= updated to use @repo
|> Ecto.Changeset.change(%{status: status})
|> @repo.update() # <= updated to use @repo
end
Expand Down Expand Up @@ -639,7 +639,7 @@ We can now move on to the `Naive.Trader` module, where we need to add a new `no
)
_ ->
Logger.warn("Unable to locate trader process assigned to #{symbol}")
Logger.warning("Unable to locate trader process assigned to #{symbol}")
{:error, :unable_to_locate_trader}
end
end
Expand Down Expand Up @@ -727,7 +727,7 @@ As we added a new `:exit` decision that we need to handle inside the `generate_d
...
```

Inside the recursive function, we are skipping all the positions that ended up with the `:exit` decisions. This will slowly cause the list of positions to drain to an empty list, which will cause the `parse_results/1` function to fail(as it expects non-empty list). We will add a new first clause to match the empty list of positions and return the `:exit` atom.:
Inside the recursive function, we are skipping all the positions that ended up with the `:exit` decisions. This will slowly cause the list of positions to drain to an empty list, which will cause the `parse_results/1` function to fail(as it expects non-empty list). We will add a new first clause to match the empty list of positions and return the `:exit` atom:

```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/lib/naive/strategy.ex
Expand Down
2 changes: 1 addition & 1 deletion 21-layers-of-abstraction.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ The other places we use the exchange are seed scripts that we need to update. Fi

```{r, engine = 'elixir', eval = FALSE}
# /apps/naive/priv/seed_settings.exs
exchange_client = Application.get_env(:naive, :exchange_client)
exchange_client = Application.compile_env(:naive, :exchange_client)
...
{:ok, symbols} = exchange_client.fetch_symbols()
...
Expand Down
Loading

0 comments on commit 90cc96f

Please sign in to comment.