From da3d425afb0d98b33390394fcab1b63694d069bb Mon Sep 17 00:00:00 2001 From: Arek Gil Date: Tue, 12 Mar 2024 10:39:27 +0100 Subject: [PATCH] Emit telemetry event on Phoenix broadcast (#8) --- lib/phoenix/channel/server.ex | 18 ++++++++++-- test/phoenix/endpoint/endpoint_test.exs | 38 +++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/lib/phoenix/channel/server.ex b/lib/phoenix/channel/server.ex index b5b4906d62..3953e53a11 100644 --- a/lib/phoenix/channel/server.ex +++ b/lib/phoenix/channel/server.ex @@ -18,9 +18,17 @@ defmodule Phoenix.Channel.Server do def join(socket, channel, message, opts) do %{topic: topic, payload: payload, ref: ref, join_ref: join_ref} = message - starter = opts[:starter] || &PoolSupervisor.start_child/3 + starter = opts[:starter] || (&PoolSupervisor.start_child/3) assigns = Map.merge(socket.assigns, Keyword.get(opts, :assigns, %{})) - socket = %{socket | topic: topic, channel: channel, join_ref: join_ref || ref, assigns: assigns} + + socket = %{ + socket + | topic: topic, + channel: channel, + join_ref: join_ref || ref, + assigns: assigns + } + ref = make_ref() from = {self(), ref} child_spec = channel.child_spec({socket.endpoint, from}) @@ -85,6 +93,12 @@ defmodule Phoenix.Channel.Server do Hook invoked by Phoenix.PubSub dispatch. """ def dispatch(subscribers, from, %Broadcast{event: event} = msg) do + :telemetry.execute( + [:phoenix, :endpoint, :broadcast], + %{}, + %{subscribers: subscribers, message: Map.from_struct(msg)} + ) + Enum.reduce(subscribers, %{}, fn {pid, _}, cache when pid == from -> cache diff --git a/test/phoenix/endpoint/endpoint_test.exs b/test/phoenix/endpoint/endpoint_test.exs index 720fb6467a..36e8ff68fc 100644 --- a/test/phoenix/endpoint/endpoint_test.exs +++ b/test/phoenix/endpoint/endpoint_test.exs @@ -272,6 +272,44 @@ defmodule Phoenix.Endpoint.EndpointTest do event: "event3", payload: %{key: :val}, topic: "sometopic"} end + test "emits telemetry event on pubsub broadcast", ctx do + me = self() + + :telemetry.attach( + ctx.test, + [:phoenix, :endpoint, :broadcast], + fn event, measurements, metadata, _config -> + send(me, {:telemetry, event, measurements, metadata}) + end, + nil + ) + + Endpoint.broadcast!("atopic", "event1", %{key: :val}) + + refute_receive {:telemetry, _, _, _} + + Endpoint.subscribe("atopic") + some = spawn fn -> :ok end + + Endpoint.broadcast_from(some, "atopic", "event1", %{key: :val}) + assert_receive {:telemetry, _, %{}, %{subscribers: [{^me, nil}], message: %{topic: "atopic", event: "event1", payload: %{key: :val}}}} + + Endpoint.broadcast_from!(some, "atopic", "event2", %{key: :val}) + assert_receive {:telemetry, _, %{}, %{subscribers: [{^me, nil}], message: %{topic: "atopic", event: "event2", payload: %{key: :val}}}} + + Endpoint.broadcast("atopic", "event3", %{key: :val}) + assert_receive {:telemetry, _, %{}, %{subscribers: [{^me, nil}], message: %{topic: "atopic", event: "event3", payload: %{key: :val}}}} + + Endpoint.broadcast!("atopic", "event4", %{key: :val}) + assert_receive {:telemetry, _, %{}, %{subscribers: [{^me, nil}], message: %{topic: "atopic", event: "event4", payload: %{key: :val}}}} + + Endpoint.local_broadcast_from(some, "atopic", "event5", %{key: :val}) + assert_receive {:telemetry, _, %{}, %{subscribers: [{^me, nil}], message: %{topic: "atopic", event: "event5", payload: %{key: :val}}}} + + Endpoint.local_broadcast("atopic", "event6", %{key: :val}) + assert_receive {:telemetry, _, %{}, %{subscribers: [{^me, nil}], message: %{topic: "atopic", event: "event6", payload: %{key: :val}}}} + end + test "loads cache manifest from specified application" do config = put_in(@config[:cache_static_manifest], {:phoenix, "../../../../test/fixtures/digest/compile/cache_manifest.json"})