Skip to content

Commit f33c781

Browse files
committed
Refactor Default
1 parent 56a4ebc commit f33c781

File tree

1 file changed

+14
-36
lines changed
  • packages/sync-service/lib/electric/shapes/consumer/event_handler

1 file changed

+14
-36
lines changed

packages/sync-service/lib/electric/shapes/consumer/event_handler/default.ex

Lines changed: 14 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ defmodule Electric.Shapes.Consumer.EventHandler.Default do
66
alias Electric.Replication.Changes
77
alias Electric.Replication.Changes.Transaction
88
alias Electric.Shapes.Consumer.LogOp
9-
alias Electric.Shapes.Consumer.Materializer
109
alias Electric.Shapes.Consumer.Plan
1110
alias Electric.Shapes.Shape
1211
alias Electric.Utils.ResultStream
@@ -28,28 +27,16 @@ defmodule Electric.Shapes.Consumer.EventHandler.Default do
2827
last_log_offset: last_log_offset,
2928
changes: changes
3029
}) do
31-
extra_refs =
32-
if Shape.are_deps_filled(state.shape) do
33-
refs = Materializer.get_all_as_refs(state.shape, state.stack_id)
34-
{refs, refs}
35-
end
36-
37-
result =
38-
changes
39-
|> Stream.map(fn
40-
%Changes.TruncatedRelation{} -> {:error, :truncate}
41-
change -> {:ok, change}
42-
end)
43-
|> ResultStream.flat_map(
44-
&Shape.convert_change(state.shape, &1,
45-
stack_id: state.stack_id,
46-
shape_handle: state.shape_handle,
47-
extra_refs: extra_refs
48-
)
30+
changes
31+
|> Stream.map(&error_on_truncate/1)
32+
|> ResultStream.flat_map(
33+
&Shape.convert_change(state.shape, &1,
34+
stack_id: state.stack_id,
35+
shape_handle: state.shape_handle
4936
)
50-
|> ResultStream.to_list()
51-
52-
case result do
37+
)
38+
|> ResultStream.to_list()
39+
|> case do
5340
{:error, :truncate} ->
5441
{:stop, {:truncate, xid}}
5542

@@ -70,25 +57,16 @@ defmodule Electric.Shapes.Consumer.EventHandler.Default do
7057
{:ok, state, %Plan{}}
7158
end
7259

73-
def handle_event(_state, {:pg_snapshot_known, _snapshot}) do
74-
raise ArgumentError, "received {:pg_snapshot_known, snapshot} in Default handler"
75-
end
76-
77-
def handle_event(_state, {:query_move_in_complete, _rows, _move_in_lsn}) do
78-
raise ArgumentError, "received {:query_move_in_complete, ...} in Default handler"
79-
end
80-
81-
def handle_event(_state, {:materializer_changes, _dep_handle, _payload}) do
82-
raise ArgumentError, "received {:materializer_changes, ...} in Default handler"
83-
end
84-
8560
@impl true
8661
def routing_views(_state), do: %{}
8762

8863
defp mark_last_change([]), do: []
8964

9065
defp mark_last_change(changes) do
91-
{last, rest} = List.pop_at(changes, -1)
92-
rest ++ [%{last | last?: true}]
66+
[last | rest] = Enum.reverse(changes)
67+
Enum.reverse([%{last | last?: true} | rest])
9368
end
69+
70+
defp error_on_truncate(%Changes.TruncatedRelation{}), do: {:error, :truncate}
71+
defp error_on_truncate(change), do: {:ok, change}
9472
end

0 commit comments

Comments
 (0)