Skip to content

Commit

Permalink
Chunk large satellite payloads (#1399)
Browse files Browse the repository at this point in the history
To prevent the 100MB payload limit:
```
[Symbol(kError)]: RangeError: Max payload size exceeded
      at Receiver.haveLength (/Users/rob/src/electric-sql/electric/node_modules/.pnpm/[email protected]/node_modules/ws/lib/receiver.js:419:28)
      at Receiver.getPayloadLength64 (/Users/rob/src/electric-sql/electric/node_modules/.pnpm/[email protected]/node_modules/ws/lib/receiver.js:406:10)
      at Receiver.startLoop (/Users/rob/src/electric-sql/electric/node_modules/.pnpm/[email protected]/node_modules/ws/lib/receiver.js:161:16)
      at Receiver._write (/Users/rob/src/electric-sql/electric/node_modules/.pnpm/[email protected]/node_modules/ws/lib/receiver.js:94:10)
      at writeOrBuffer (node:internal/streams/writable:564:12)
      at _write (node:internal/streams/writable:493:10)
      at Writable.write (node:internal/streams/writable:502:10)
      at Socket.socketOnData (/Users/rob/src/electric-sql/electric/node_modules/.pnpm/[email protected]/node_modules/ws/lib/websocket.js:1303:35)
      at Socket.emit (node:events:519:28)
      at addChunk (node:internal/streams/readable:559:12) {
    code: 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH',
    [Symbol(status-code)]: 1009
    ```
  • Loading branch information
robacourt authored Jul 1, 2024
1 parent ab3e89a commit 9c16bb5
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
5 changes: 5 additions & 0 deletions .changeset/rare-dancers-explain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/electric": patch
---

Limit the number of changes in a websocket frame to 100 changes to reduce the chance of frame exceeding 100MB limit in the case where there are lots of changes
13 changes: 10 additions & 3 deletions components/electric/lib/electric/satellite/serialization.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ defmodule Electric.Satellite.Serialization do
commit_op = %SatTransOp{op: {:commit, tx_commit}}

{
[%SatOpLog{ops: [begin_op | Enum.reverse([commit_op | state.ops])]}],
messages_from_ops([begin_op | Enum.reverse([commit_op | state.ops])]),
state.new_relations,
state.known_relations
}
Expand All @@ -93,7 +93,7 @@ defmodule Electric.Satellite.Serialization do
# The changes cannot be migration relations, so our "state" is limited
state = Enum.reduce(changes, state, &serialize_change/2)

{[%SatOpLog{ops: [begin_op | state.ops]}], state.new_relations, state.known_relations}
{messages_from_ops([begin_op | state.ops]), state.new_relations, state.known_relations}
end

def serialize_shape_data_as_tx(changes, known_relations) do
Expand All @@ -106,7 +106,14 @@ defmodule Electric.Satellite.Serialization do
# The changes cannot be migration relations, so our "state" is limited
state = Enum.reduce(changes, state, &serialize_change/2)

{[%SatOpLog{ops: state.ops}], state.new_relations, state.known_relations}
{messages_from_ops(state.ops), state.new_relations, state.known_relations}
end

@max_ops_per_message 100
defp messages_from_ops(ops) do
ops
|> Enum.chunk_every(@max_ops_per_message)
|> Enum.map(&%SatOpLog{ops: &1})
end

defp serialize_change(record, state) when is_migration_relation(record.relation) do
Expand Down

0 comments on commit 9c16bb5

Please sign in to comment.