diff --git a/packages/elixir-client/lib/electric/client/message.ex b/packages/elixir-client/lib/electric/client/message.ex index ed992d6f13..8d323f959a 100644 --- a/packages/elixir-client/lib/electric/client/message.ex +++ b/packages/elixir-client/lib/electric/client/message.ex @@ -54,18 +54,32 @@ defmodule Electric.Client.Message do offset: Offset.t() } + require Logger + def from_message(msg, value_mapping_fun) do %{ "headers" => headers, "offset" => offset, - "value" => value + "value" => raw_value } = msg + value = + try do + value_mapping_fun.(raw_value) + rescue + exception -> + Logger.error( + "Unable to cast field values: #{Exception.format(:error, exception, __STACKTRACE__)}" + ) + + reraise exception, __STACKTRACE__ + end + %__MODULE__{ key: msg["key"], offset: Client.Offset.from_string!(offset), headers: Headers.from_message(headers), - value: value_mapping_fun.(value) + value: value } end end @@ -73,7 +87,7 @@ defmodule Electric.Client.Message do defmodule ResumeMessage do @moduledoc """ Emitted by the synchronisation stream before terminating early. If passed - as an option to [`Client.stream`](`Electric.Client.stream/3`) allows for + as an option to [`Client.stream/3`](`Electric.Client.stream/3`) allows for resuming a shape stream at the given point. E.g.