diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index c1adf09788..c66b5e1537 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -375,7 +375,14 @@ defmodule Electric.ShapeCacheTest do end test "correctly propagates the error", %{shape_cache_opts: opts} do - shape = %Shape{root_table: {"public", "nonexistent"}, root_table_id: 2} + shape = %Shape{ + @shape + | root_table: {"public", "nonexistent"}, + root_table_id: 2, + table_info: %{ + {"public", "nonexistent"} => Map.fetch!(@shape.table_info, @shape.root_table) + } + } {shape_handle, log} = with_log(fn -> diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 863c2d603e..2ffa4108a3 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -312,12 +312,16 @@ defmodule Electric.Shapes.ConsumerTest do last_log_offset = LogOffset.new(lsn, 0) Mock.ShapeStatus - |> expect(:remove_shape, fn _, @shape_handle1 -> :ok end) - |> allow( - self(), - Shapes.Consumer.name(ctx.stack_id, @shape_handle1) + |> expect(:remove_shape, 1, fn _, @shape_handle1 -> :ok end) + |> allow(self(),Consumer.name(ctx.stack_id, @shape_handle1) ) + shape1 = ctx.shapes[@shape_handle1] + Mock.PublicationManager + |> expect(:remove_shape, 1, fn ^shape1, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + + txn = %Transaction{xid: xid, lsn: lsn, last_log_offset: last_log_offset} |> Transaction.prepend_change(%Changes.TruncatedRelation{ @@ -364,6 +368,7 @@ defmodule Electric.Shapes.ConsumerTest do lsn = Lsn.from_string("0/10") last_log_offset = LogOffset.new(lsn, 0) + Mock.ShapeStatus |> expect(:remove_shape, fn _, @shape_handle1 -> :ok end) |> allow( @@ -371,6 +376,14 @@ defmodule Electric.Shapes.ConsumerTest do Shapes.Consumer.name(ctx.stack_id, @shape_handle1) ) + shape = ctx.shapes[@shape_handle1] + Mock.PublicationManager + |> expect(:remove_shape, 1, fn ^shape, _ -> :ok end) + |> allow( + self(), + Shapes.Consumer.name(ctx.stack_id, @shape_handle1) + ) + txn = %Transaction{ xid: xid, @@ -510,6 +523,14 @@ defmodule Electric.Shapes.ConsumerTest do |> expect(:remove_shape, 0, fn _, _ -> :ok end) |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + shape1 = ctx.shapes[@shape_handle1] + shape2 = ctx.shapes[@shape_handle2] + Mock.PublicationManager + |> expect(:remove_shape, 1, fn ^shape1, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> expect(:remove_shape, 0, fn ^shape2, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + assert :ok = ShapeLogCollector.handle_relation_msg(rel, ctx.producer) assert_receive {:DOWN, ^ref1, :process, _, _} @@ -551,6 +572,14 @@ defmodule Electric.Shapes.ConsumerTest do |> expect(:remove_shape, 0, fn _, _ -> :ok end) |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + shape1 = ctx.shapes[@shape_handle1] + shape2 = ctx.shapes[@shape_handle2] + Mock.PublicationManager + |> expect(:remove_shape, 1, fn ^shape1, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> expect(:remove_shape, 0, fn ^shape2, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + :ok = ShapeLogCollector.store_transaction(txn, ctx.producer) assert_receive {Support.TestStorage, :cleanup!, @shape_handle1} @@ -573,6 +602,14 @@ defmodule Electric.Shapes.ConsumerTest do |> expect(:remove_shape, 0, fn _, _ -> :ok end) |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + shape1 = ctx.shapes[@shape_handle1] + shape2 = ctx.shapes[@shape_handle2] + Mock.PublicationManager + |> expect(:remove_shape, 1, fn ^shape1, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> expect(:remove_shape, 0, fn ^shape2, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + GenServer.cast(Consumer.name(ctx.stack_id, @shape_handle1), :unexpected_cast) assert_receive {Support.TestStorage, :cleanup!, @shape_handle1} @@ -590,6 +627,12 @@ defmodule Electric.Shapes.ConsumerTest do |> expect(:remove_shape, 0, fn _, _ -> :ok end) |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + shape1 = ctx.shapes[@shape_handle1] + Mock.PublicationManager + |> expect(:remove_shape, 0, fn ^shape1, _ -> :ok end) + |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + + GenServer.stop(Consumer.name(ctx.stack_id, @shape_handle1)) refute_receive {Support.TestStorage, :cleanup!, @shape_handle1}