Skip to content

Commit

Permalink
Fix/washboard events (#600)
Browse files Browse the repository at this point in the history
  • Loading branch information
connorsmith256 authored Mar 31, 2023
1 parent 69b7889 commit 1a8ec38
Showing 1 changed file with 140 additions and 16 deletions.
156 changes: 140 additions & 16 deletions wasmcloud_host/lib/wasmcloud_host/lattice/state_monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ defmodule WasmcloudHost.Lattice.StateMonitor do
}
}

conn = HostCore.Nats.control_connection(prefix)
{:ok, _sub} = Gnat.sub(conn, self(), "wasmbus.evt.#{prefix}")

PubSub.subscribe(:hostcore_pubsub, "latticeevents:#{prefix}")
PubSub.subscribe(:hostcore_pubsub, "cacheloader:#{prefix}")

Expand Down Expand Up @@ -113,9 +116,47 @@ defmodule WasmcloudHost.Lattice.StateMonitor do
GenServer.call(:state_monitor, :refmaps_query)
end

# handle cloud events from HostCore PubSub
@impl true
def handle_info({:lattice_event, event}, state) do
state = handle_event(state, event)
def handle_info({:lattice_event, raw_event}, state) do
case Cloudevents.from_json(raw_event) do
{:ok, event} ->
process_event(state, event)

# No-op
_ ->
Logger.warning("Received event that couldn't be parsed as a Cloudevent, ignoring")
state
end

{:noreply, state}
end

# handle cloud events from NATS
@impl true
def handle_info(
{:msg, %{body: body, topic: topic}},
state
) do
state =
with true <- String.starts_with?(topic, "wasmbus.evt."),
{:ok, event = %Cloudevents.Format.V_1_0.Event{source: source}} <-
Cloudevents.from_json(body),
{pk, _pid, _prefix} <- WasmcloudHost.Application.first_host(),
true <- source != pk do
process_event(state, event)
else
# ignoring bad topic or event from this host
false ->
state

{:error, _} ->
Logger.warning("Received event that couldn't be parsed as a Cloudevent, ignoring")
state

_ ->
state
end

{:noreply, state}
end
Expand Down Expand Up @@ -158,18 +199,6 @@ defmodule WasmcloudHost.Lattice.StateMonitor do
{:noreply, %State{state | refmaps: ocirefs}}
end

defp handle_event(state, body) do
case Cloudevents.from_json(body) do
{:ok, evt} ->
process_event(state, evt)

# No-op
_ ->
Logger.warning("Received event that couldn't be parsed as a Cloudevent, ignoring")
state
end
end

defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
Expand All @@ -186,6 +215,16 @@ defmodule WasmcloudHost.Lattice.StateMonitor do
%State{state | hosts: hosts}
end

# No-op. Ignore start failure events
defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
type: "com.wasmcloud.lattice.actor_start_failed"
}
) do
state
end

defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
Expand All @@ -200,6 +239,26 @@ defmodule WasmcloudHost.Lattice.StateMonitor do
%State{state | hosts: hosts}
end

# No-op. Ignore actor updated events
defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
type: "com.wasmcloud.lattice.actor_updated"
}
) do
state
end

# No-op. Ignore actor update failure events
defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
type: "com.wasmcloud.lattice.actor_update_failed"
}
) do
state
end

defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
Expand All @@ -218,6 +277,16 @@ defmodule WasmcloudHost.Lattice.StateMonitor do
%State{state | hosts: hosts}
end

# No-op. Ignore start failure events
defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
type: "com.wasmcloud.lattice.provider_start_failed"
}
) do
state
end

defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
Expand All @@ -238,6 +307,33 @@ defmodule WasmcloudHost.Lattice.StateMonitor do
%State{state | hosts: hosts}
end

defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
data: %{"labels" => labels},
datacontenttype: "application/json",
source: source_host,
type: "com.wasmcloud.lattice.host_started"
}
) do
new_host = %{actors: %{}, providers: %{}, labels: labels}
hosts = Map.put(state.hosts, source_host, new_host)
%State{state | hosts: hosts}
end

defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
data: _data,
datacontenttype: "application/json",
source: source_host,
type: "com.wasmcloud.lattice.host_stopped"
}
) do
hosts = Map.delete(state.hosts, source_host)
%State{state | hosts: hosts}
end

defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
Expand All @@ -247,8 +343,6 @@ defmodule WasmcloudHost.Lattice.StateMonitor do
type: "com.wasmcloud.lattice.host_heartbeat"
}
) do
Logger.debug("Handling host heartbeat")

current_host = Map.get(state.hosts, source_host, %{})

# TODO: Also ensure that actors don't exist in the dashboard that aren't in the health check
Expand Down Expand Up @@ -365,6 +459,16 @@ defmodule WasmcloudHost.Lattice.StateMonitor do
end
end

# No-op. Ignore periodic statuses where provider health hasn't changed
defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
type: "com.wasmcloud.lattice.health_check_status"
}
) do
state
end

defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
Expand Down Expand Up @@ -423,6 +527,26 @@ defmodule WasmcloudHost.Lattice.StateMonitor do
%State{state | linkdefs: linkdefs}
end

# No-op. Ignore invocation result events
defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
type: "com.wasmcloud.lattice.invocation_failed"
}
) do
state
end

# No-op. Ignore invocation result events
defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
type: "com.wasmcloud.lattice.invocation_succeeded"
}
) do
state
end

# Fallthrough event handler to prevent errors for new events
defp process_event(
state,
Expand Down

0 comments on commit 1a8ec38

Please sign in to comment.