diff --git a/wasmcloud_host/lib/wasmcloud_host/lattice/state_monitor.ex b/wasmcloud_host/lib/wasmcloud_host/lattice/state_monitor.ex index c0fae217..375ac123 100644 --- a/wasmcloud_host/lib/wasmcloud_host/lattice/state_monitor.ex +++ b/wasmcloud_host/lib/wasmcloud_host/lattice/state_monitor.ex @@ -36,6 +36,9 @@ defmodule WasmcloudHost.Lattice.StateMonitor do } } + conn = HostCore.Nats.control_connection(prefix) + {:ok, _sub} = Gnat.sub(conn, self(), "wasmbus.evt.#{prefix}") + PubSub.subscribe(:hostcore_pubsub, "latticeevents:#{prefix}") PubSub.subscribe(:hostcore_pubsub, "cacheloader:#{prefix}") @@ -113,9 +116,47 @@ defmodule WasmcloudHost.Lattice.StateMonitor do GenServer.call(:state_monitor, :refmaps_query) end + # handle cloud events from HostCore PubSub @impl true - def handle_info({:lattice_event, event}, state) do - state = handle_event(state, event) + def handle_info({:lattice_event, raw_event}, state) do + case Cloudevents.from_json(raw_event) do + {:ok, event} -> + process_event(state, event) + + # No-op + _ -> + Logger.warning("Received event that couldn't be parsed as a Cloudevent, ignoring") + state + end + + {:noreply, state} + end + + # handle cloud events from NATS + @impl true + def handle_info( + {:msg, %{body: body, topic: topic}}, + state + ) do + state = + with true <- String.starts_with?(topic, "wasmbus.evt."), + {:ok, event = %Cloudevents.Format.V_1_0.Event{source: source}} <- + Cloudevents.from_json(body), + {pk, _pid, _prefix} <- WasmcloudHost.Application.first_host(), + true <- source != pk do + process_event(state, event) + else + # ignoring bad topic or event from this host + false -> + state + + {:error, _} -> + Logger.warning("Received event that couldn't be parsed as a Cloudevent, ignoring") + state + + _ -> + state + end {:noreply, state} end @@ -158,18 +199,6 @@ defmodule WasmcloudHost.Lattice.StateMonitor do {:noreply, %State{state | refmaps: ocirefs}} end - defp handle_event(state, body) do - case Cloudevents.from_json(body) do - {:ok, evt} -> - process_event(state, evt) - - # No-op - _ -> - Logger.warning("Received event that couldn't be parsed as a Cloudevent, ignoring") - state - end - end - defp process_event( state, %Cloudevents.Format.V_1_0.Event{ @@ -186,6 +215,16 @@ defmodule WasmcloudHost.Lattice.StateMonitor do %State{state | hosts: hosts} end + # No-op. Ignore start failure events + defp process_event( + state, + %Cloudevents.Format.V_1_0.Event{ + type: "com.wasmcloud.lattice.actor_start_failed" + } + ) do + state + end + defp process_event( state, %Cloudevents.Format.V_1_0.Event{ @@ -200,6 +239,26 @@ defmodule WasmcloudHost.Lattice.StateMonitor do %State{state | hosts: hosts} end + # No-op. Ignore actor updated events + defp process_event( + state, + %Cloudevents.Format.V_1_0.Event{ + type: "com.wasmcloud.lattice.actor_updated" + } + ) do + state + end + + # No-op. Ignore actor update failure events + defp process_event( + state, + %Cloudevents.Format.V_1_0.Event{ + type: "com.wasmcloud.lattice.actor_update_failed" + } + ) do + state + end + defp process_event( state, %Cloudevents.Format.V_1_0.Event{ @@ -218,6 +277,16 @@ defmodule WasmcloudHost.Lattice.StateMonitor do %State{state | hosts: hosts} end + # No-op. Ignore start failure events + defp process_event( + state, + %Cloudevents.Format.V_1_0.Event{ + type: "com.wasmcloud.lattice.provider_start_failed" + } + ) do + state + end + defp process_event( state, %Cloudevents.Format.V_1_0.Event{ @@ -238,6 +307,33 @@ defmodule WasmcloudHost.Lattice.StateMonitor do %State{state | hosts: hosts} end + defp process_event( + state, + %Cloudevents.Format.V_1_0.Event{ + data: %{"labels" => labels}, + datacontenttype: "application/json", + source: source_host, + type: "com.wasmcloud.lattice.host_started" + } + ) do + new_host = %{actors: %{}, providers: %{}, labels: labels} + hosts = Map.put(state.hosts, source_host, new_host) + %State{state | hosts: hosts} + end + + defp process_event( + state, + %Cloudevents.Format.V_1_0.Event{ + data: _data, + datacontenttype: "application/json", + source: source_host, + type: "com.wasmcloud.lattice.host_stopped" + } + ) do + hosts = Map.delete(state.hosts, source_host) + %State{state | hosts: hosts} + end + defp process_event( state, %Cloudevents.Format.V_1_0.Event{ @@ -247,8 +343,6 @@ defmodule WasmcloudHost.Lattice.StateMonitor do type: "com.wasmcloud.lattice.host_heartbeat" } ) do - Logger.debug("Handling host heartbeat") - current_host = Map.get(state.hosts, source_host, %{}) # TODO: Also ensure that actors don't exist in the dashboard that aren't in the health check @@ -365,6 +459,16 @@ defmodule WasmcloudHost.Lattice.StateMonitor do end end + # No-op. Ignore periodic statuses where provider health hasn't changed + defp process_event( + state, + %Cloudevents.Format.V_1_0.Event{ + type: "com.wasmcloud.lattice.health_check_status" + } + ) do + state + end + defp process_event( state, %Cloudevents.Format.V_1_0.Event{ @@ -423,6 +527,26 @@ defmodule WasmcloudHost.Lattice.StateMonitor do %State{state | linkdefs: linkdefs} end + # No-op. Ignore invocation result events + defp process_event( + state, + %Cloudevents.Format.V_1_0.Event{ + type: "com.wasmcloud.lattice.invocation_failed" + } + ) do + state + end + + # No-op. Ignore invocation result events + defp process_event( + state, + %Cloudevents.Format.V_1_0.Event{ + type: "com.wasmcloud.lattice.invocation_succeeded" + } + ) do + state + end + # Fallthrough event handler to prevent errors for new events defp process_event( state,