Skip to content

Commit

Permalink
Emit telemetry event on Phoenix broadcast (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
arkgil authored Mar 12, 2024
1 parent 4f50c79 commit da3d425
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 2 deletions.
18 changes: 16 additions & 2 deletions lib/phoenix/channel/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,17 @@ defmodule Phoenix.Channel.Server do
def join(socket, channel, message, opts) do
%{topic: topic, payload: payload, ref: ref, join_ref: join_ref} = message

starter = opts[:starter] || &PoolSupervisor.start_child/3
starter = opts[:starter] || (&PoolSupervisor.start_child/3)
assigns = Map.merge(socket.assigns, Keyword.get(opts, :assigns, %{}))
socket = %{socket | topic: topic, channel: channel, join_ref: join_ref || ref, assigns: assigns}

socket = %{
socket
| topic: topic,
channel: channel,
join_ref: join_ref || ref,
assigns: assigns
}

ref = make_ref()
from = {self(), ref}
child_spec = channel.child_spec({socket.endpoint, from})
Expand Down Expand Up @@ -85,6 +93,12 @@ defmodule Phoenix.Channel.Server do
Hook invoked by Phoenix.PubSub dispatch.
"""
def dispatch(subscribers, from, %Broadcast{event: event} = msg) do
:telemetry.execute(
[:phoenix, :endpoint, :broadcast],
%{},
%{subscribers: subscribers, message: Map.from_struct(msg)}
)

Enum.reduce(subscribers, %{}, fn
{pid, _}, cache when pid == from ->
cache
Expand Down
38 changes: 38 additions & 0 deletions test/phoenix/endpoint/endpoint_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,44 @@ defmodule Phoenix.Endpoint.EndpointTest do
event: "event3", payload: %{key: :val}, topic: "sometopic"}
end

test "emits telemetry event on pubsub broadcast", ctx do
me = self()

:telemetry.attach(
ctx.test,
[:phoenix, :endpoint, :broadcast],
fn event, measurements, metadata, _config ->
send(me, {:telemetry, event, measurements, metadata})
end,
nil
)

Endpoint.broadcast!("atopic", "event1", %{key: :val})

refute_receive {:telemetry, _, _, _}

Endpoint.subscribe("atopic")
some = spawn fn -> :ok end

Endpoint.broadcast_from(some, "atopic", "event1", %{key: :val})
assert_receive {:telemetry, _, %{}, %{subscribers: [{^me, nil}], message: %{topic: "atopic", event: "event1", payload: %{key: :val}}}}

Endpoint.broadcast_from!(some, "atopic", "event2", %{key: :val})
assert_receive {:telemetry, _, %{}, %{subscribers: [{^me, nil}], message: %{topic: "atopic", event: "event2", payload: %{key: :val}}}}

Endpoint.broadcast("atopic", "event3", %{key: :val})
assert_receive {:telemetry, _, %{}, %{subscribers: [{^me, nil}], message: %{topic: "atopic", event: "event3", payload: %{key: :val}}}}

Endpoint.broadcast!("atopic", "event4", %{key: :val})
assert_receive {:telemetry, _, %{}, %{subscribers: [{^me, nil}], message: %{topic: "atopic", event: "event4", payload: %{key: :val}}}}

Endpoint.local_broadcast_from(some, "atopic", "event5", %{key: :val})
assert_receive {:telemetry, _, %{}, %{subscribers: [{^me, nil}], message: %{topic: "atopic", event: "event5", payload: %{key: :val}}}}

Endpoint.local_broadcast("atopic", "event6", %{key: :val})
assert_receive {:telemetry, _, %{}, %{subscribers: [{^me, nil}], message: %{topic: "atopic", event: "event6", payload: %{key: :val}}}}
end

test "loads cache manifest from specified application" do
config = put_in(@config[:cache_static_manifest], {:phoenix, "../../../../test/fixtures/digest/compile/cache_manifest.json"})

Expand Down

0 comments on commit da3d425

Please sign in to comment.