Skip to content

Commit

Permalink
fix/590 reconnect actor host call traces
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <[email protected]>

updated to grab context from handle, clippy

Signed-off-by: Brooks Townsend <[email protected]>

updated wasmcloud pin to main

Signed-off-by: Brooks Townsend <[email protected]>

correctly handled actor not running rpc case

Signed-off-by: Brooks Townsend <[email protected]>

refactored with statement for clarity

Signed-off-by: Brooks Townsend <[email protected]>
  • Loading branch information
brooksmtownsend committed Mar 29, 2023
1 parent f71ad2d commit 0e32e72
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 198 deletions.
12 changes: 11 additions & 1 deletion host_core/lib/host_core/actors/actor_module.ex
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,18 @@ defmodule HostCore.Actors.ActorModule do
runtime_pid = Agent.get(agent, fn a -> a.runtime_pid end)
aref = Agent.get(agent, fn a -> a.actor_reference end)

call_context =
Tracer.current_span_ctx()
|> :erlang.term_to_binary()

ir =
case HostCore.WasmCloud.Runtime.Server.invoke_actor(runtime_pid, aref, operation, payload) do
case HostCore.WasmCloud.Runtime.Server.invoke_actor(
runtime_pid,
aref,
operation,
payload,
call_context
) do
{:ok, msg} ->
chunk_inv_response(%{
msg: msg,
Expand Down
8 changes: 7 additions & 1 deletion host_core/lib/host_core/actors/actor_rpc_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ defmodule HostCore.Actors.ActorRpcServer do

case Registry.lookup(Registry.ActorRegistry, actor_pk) do
[] ->
{:error, "Actor #{actor_pk} is not running. RPC call skipped."}
{:reply,
%{
msg: [],
error: "Invocation received for actor #{actor_pk} that is not running"
}
|> Msgpax.pack!()
|> IO.iodata_to_binary()}

actors ->
eligible_actors =
Expand Down
2 changes: 1 addition & 1 deletion host_core/lib/host_core/wasmcloud/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defmodule HostCore.WasmCloud.Native do
def runtime_new(_config), do: error()
def start_actor(_runtime_resource, _bytes), do: error()
def version(_runtime_resource), do: error()
def call_actor(_actor_resource, _operation, _payload, _from), do: error()
def call_actor(_actor_resource, _operation, _payload, _call_context, _from), do: error()
def instance_receive_callback_result(_callback_token, _success, _result), do: error()

# When the NIF is loaded, it will override functions in this module.
Expand Down
4 changes: 3 additions & 1 deletion host_core/lib/host_core/wasmcloud/runtime.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,17 @@ defmodule HostCore.WasmCloud.Runtime do
HostCore.WasmCloud.Runtime.ActorReference.t(),
binary(),
binary(),
binary(),
GenServer.from()
) :: :ok
def call_actor(
%HostCore.WasmCloud.Runtime.ActorReference{resource: actor_resource},
operation,
payload,
call_context,
from
) do
HostCore.WasmCloud.Native.call_actor(actor_resource, operation, payload, from)
HostCore.WasmCloud.Native.call_actor(actor_resource, operation, payload, call_context, from)
end

defimpl Inspect, for: HostCore.WasmCloud.Runtime do
Expand Down
25 changes: 20 additions & 5 deletions host_core/lib/host_core/wasmcloud/runtime_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule HostCore.WasmCloud.Runtime.Server do
"""
use GenServer
require Logger
require OpenTelemetry.Tracer, as: Tracer

alias HostCore.WasmCloud.Runtime.Config, as: RuntimeConfig
alias HostCore.WasmCloud.Runtime.ActorReference
Expand Down Expand Up @@ -63,20 +64,22 @@ defmodule HostCore.WasmCloud.Runtime.Server do
pid :: pid(),
actor_reference :: ActorReference.t(),
operation :: binary(),
payload :: binary()
payload :: binary(),
call_context :: binary()
) :: {:ok, binary()} | {:error, binary()}
def invoke_actor(pid, actor_reference, operation, payload) do
GenServer.call(pid, {:invoke_actor, actor_reference, operation, payload})
def invoke_actor(pid, actor_reference, operation, payload, call_context) do
GenServer.call(pid, {:invoke_actor, actor_reference, operation, payload, call_context})
end

# calls into the NIF to invoke the given operation on the indicated actor instance
@impl true
def handle_call({:invoke_actor, actor_reference, operation, payload}, from, state) do
def handle_call({:invoke_actor, actor_reference, operation, payload, call_context}, from, state) do
:ok =
HostCore.WasmCloud.Runtime.call_actor(
actor_reference,
operation,
payload,
call_context,
from
)

Expand Down Expand Up @@ -122,13 +125,25 @@ defmodule HostCore.WasmCloud.Runtime.Server do

@impl true
def handle_info(
{:invoke_callback, claims, binding, namespace, operation, payload, token},
{:invoke_callback, claims, {binding, namespace, operation}, payload, call_context, token},
{_runtime, config} = state
) do
Task.Supervisor.start_child(RuntimeCallSupervisor, fn ->
# This callback is invoked by the wasmcloud::Runtime's host call handler
payload = payload |> IO.iodata_to_binary()

# Convert the `call_context` list of bytes to the `span_ctx` term
with true <- is_list(call_context),
ctx_binary <- :erlang.list_to_binary(call_context),
true <- is_binary(ctx_binary),
span_ctx <- :erlang.binary_to_term(ctx_binary) do
Tracer.set_current_span(span_ctx)
else
# If the call context didn't contain a span, that's ok and we silently ignore
_ ->
:ok
end

{success, return_value} =
try do
do_invocation(claims, binding, namespace, operation, payload, config)
Expand Down
Loading

0 comments on commit 0e32e72

Please sign in to comment.