docs: add SDS protocol for scalable e2e reliability #108

Merged 5 commits on Nov 28, 2024

206 changes: 206 additions & 0 deletions vac/raw/sds.md
@@ -0,0 +1,206 @@
---
title: SDS
name: Scalable Data Sync protocol for distributed logs
status: raw
editor: Hanno Cornelius <[email protected]>
contributors:
- Akhil Peddireddy <[email protected]>
---

## Abstract

This specification introduces the Scalable Data Sync (SDS) protocol to achieve end-to-end reliability
when consolidating distributed logs in a decentralized manner.
The protocol is designed for a peer-to-peer (p2p) topology
in which each member of a group of nodes maintains an append-only log.
Each node may append new entries to its local log at any time and
is interested in merging new entries from other nodes in real-time or close to real-time
while maintaining a consistent order.
The outcome of the log consolidation procedure is
that all nodes in the group eventually reflect in their own logs the same entries in the same order.
The protocol aims to scale to very large groups.

## Motivation

A common application that fits this model is a p2p group chat (or group communication),
where the participants act as log nodes
and the group conversation is modelled as the consolidated logs maintained on each node.
The problem of end-to-end reliability can then be stated as
ensuring that all participants eventually see the same sequence of messages
in the same causal order,
despite the challenges of network latency, message loss, and scalability present in any communications transport layer.
The rest of this document will assume the terminology of a group communication:
log nodes being the _participants_ in the group chat
and the logged entries being the _messages_ exchanged between participants.

## Design Assumptions

We make the following simplifying assumptions for a proposed reliability protocol:

* **Broadcast routing:**
Messages are disseminated by broadcast over the underlying transport.
The selected transport takes care of routing messages to all participants of the communication.
* **Store nodes:**
There are high-availability caches (a.k.a. Store nodes) from which missed messages can be retrieved.
These caches maintain the full history of all messages that have been broadcast.
This is an optional element in the protocol design,
but improves scalability by reducing direct interactions between participants.
* **Message ID:**
Each message has a globally unique, immutable ID (or hash).
Messages can be requested from the high-availability caches or other participants using the corresponding message ID.

## Wire protocol

The keywords “MUST”, “MUST NOT”, “REQUIRED”, “SHALL”, “SHALL NOT”, “SHOULD”,
“SHOULD NOT”, “RECOMMENDED”, “MAY”, and
“OPTIONAL” in this document are to be interpreted as described in [RFC 2119](https://www.ietf.org/rfc/rfc2119.txt).

### Message

Messages MUST adhere to the following meta structure:

```protobuf
syntax = "proto3";

message Message {
  // 1 Reserved for sender/participant id
  string message_id = 2;                 // Unique identifier of the message
  string channel_id = 3;                 // Identifier of the channel to which the message belongs
  optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
  repeated string causal_history = 11;   // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included.
  optional bytes bloom_filter = 12;      // Bloom filter representing received message IDs in channel
  optional bytes content = 20;           // Actual content of the message
}
```

Review comment (Contributor) on `message Message`: It may actually make sense to have a LogEntry or Entry terminology in this spec because the term message is very generic, and actually already a bit confusing in the status-go codebase. Or maybe doing like WakuMessage and prefix/postfix to help with setting the context.

Reply (Contributor Author, jm-clius, Nov 28, 2024): This is a good point, although I'm struggling to find a term that wouldn't create confusion elsewhere. "Log" and "log entries" are already overloaded terms too and the mental model of "messages = log entries" may not be immediately apparent to someone approaching the code. Perhaps just calling it a SyncedMessage? Wdyt?

Review comment (Contributor) on `message_id`: Concretely, is that a Waku message id or a Status envelope id? I assume Waku message id because one needs to be able to retrieve it from a store node? But then if you cannot find the message from a Waku store, you can't have a re-transmission of it because it would change the hash of the message (timestamp is included).

Reply (Contributor Author): This would be a Status-level ID as the Waku message ID would (well, should) be inaccessible at this layer. A mapping could be done from SDS message ID to Waku message hash that gets updated with each retransmission.

Each message MUST include its globally unique identifier in the `message_id` field, likely based on a message hash.
The `channel_id` field MUST be set to the identifier of the channel of group communication that is being synchronized.
For simple group communications without individual channels,
the `channel_id` SHOULD be set to `0`.
The `lamport_timestamp`, `causal_history` and `bloom_filter` fields MUST be set according to the [protocol steps](#protocol-steps) set out below.
These fields MAY be left unset in the case of [ephemeral messages](#ephemeral-messages).
The message `content` MAY be left empty for [periodic sync messages](#periodic-sync-message),
otherwise it MUST contain the application-level content.
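
As a rough illustration of these field rules, the sketch below mirrors the protobuf `Message` as a Python dataclass. The class and the helper methods `is_ephemeral` and `is_sync` are hypothetical names chosen for this example and are not part of the specification; it assumes that "unset" maps to `None` (or an empty list for `causal_history`).

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Message:
    """In-memory mirror of the protobuf `Message` (hypothetical helper type)."""
    message_id: str
    channel_id: str
    lamport_timestamp: Optional[int] = None
    causal_history: List[str] = field(default_factory=list)
    bloom_filter: Optional[bytes] = None
    content: Optional[bytes] = None

    def is_ephemeral(self) -> bool:
        # Ephemeral messages leave all three reliability fields unset.
        return (
            self.lamport_timestamp is None
            and not self.causal_history
            and self.bloom_filter is None
        )

    def is_sync(self) -> bool:
        # Periodic sync messages carry reliability fields but no content.
        return not self.is_ephemeral() and not self.content
```

A regular message is then one for which both helpers return `False`.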

### Participant state

Each participant MUST maintain:

- A Lamport timestamp for each channel of communication,
initialized to current epoch time in nanosecond resolution.
- A bloom filter for received message IDs per channel.
The bloom filter SHOULD be rolled over and recomputed once it reaches a predefined capacity of message IDs.
Furthermore, it SHOULD be designed to minimize false positives through an optimal selection of size and hash functions.
- A buffer for unacknowledged outgoing messages
- A buffer for incoming messages with unmet causal dependencies
- A local log (or history) for each channel,
containing all message IDs in the communication channel,
ordered by Lamport timestamp.

Messages in the unacknowledged outgoing buffer can be in one of three states:

1. **Unacknowledged** - there has been no acknowledgement of message receipt by any participant in the channel
2. **Possibly acknowledged** - there has been ambiguous indication that the message has been _possibly_ received by at least one participant in the channel
3. **Acknowledged** - there has been sufficient indication that the message has been received by at least some of the participants in the channel.
This state will also remove the message from the outgoing buffer.
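
The participant state described above could be held in a structure along the following lines. This is a minimal sketch, not a prescribed layout: `AckState`, `ChannelState`, and all field names are assumptions made for illustration, the bloom filter is replaced by an exact set for brevity, and both buffers are kept per channel as a simplification.

```python
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Set, Tuple


class AckState(Enum):
    UNACKNOWLEDGED = "unacknowledged"
    POSSIBLY_ACKNOWLEDGED = "possibly acknowledged"
    ACKNOWLEDGED = "acknowledged"  # never stored: reaching it removes the message


@dataclass
class ChannelState:
    # Lamport timestamp, initialized to the current epoch time in nanoseconds.
    lamport_timestamp: int = field(default_factory=time.time_ns)
    # Stand-in for the bloom filter: an exact set of received message IDs.
    received_ids: Set[str] = field(default_factory=set)
    # Outgoing buffer: message ID -> acknowledgement state.
    outgoing: Dict[str, AckState] = field(default_factory=dict)
    # Incoming buffer: message ID -> causal history still to be satisfied.
    incoming: Dict[str, List[str]] = field(default_factory=dict)
    # Local log of (lamport_timestamp, message_id) entries, kept sorted.
    log: List[Tuple[int, str]] = field(default_factory=list)

    def acknowledge(self, message_id: str) -> None:
        # Moving to "acknowledged" removes the message from the outgoing buffer.
        self.outgoing.pop(message_id, None)
```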

### Protocol Steps

For each channel of communication,
participants MUST follow these protocol steps to populate and interpret
the `lamport_timestamp`, `causal_history` and `bloom_filter` fields.

#### Send Message

Before broadcasting a message:
- the participant MUST increase its local Lamport timestamp by `1` and include this in the `lamport_timestamp` field.
- the participant MUST determine the preceding few message IDs in the local history and include these in an ordered list in the `causal_history` field.
The number of message IDs to include in the `causal_history` depends on the application.
We recommend a causal history of two message IDs.
- the participant MUST include the current `bloom_filter` state in the broadcast message.

After broadcasting a message, the message MUST be added to the participant’s buffer of unacknowledged outgoing messages.
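
The send steps above can be sketched as follows. The `Sender` class, the dictionary-based message, and the `broadcast` callback are hypothetical stand-ins for whatever transport and encoding an implementation uses; the default `history_len` of 2 follows the recommendation above.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Sender:
    """Hypothetical per-channel sender state (names are illustrative)."""
    lamport_timestamp: int = 0
    log: List[str] = field(default_factory=list)   # message IDs, in local order
    bloom_filter: bytes = b""                      # opaque serialized filter state
    outgoing: Dict[str, dict] = field(default_factory=dict)

    def send(self, message_id: str, channel_id: str, content: bytes,
             broadcast: Callable[[dict], None], history_len: int = 2) -> dict:
        # Step 1: increase the local Lamport timestamp by 1.
        self.lamport_timestamp += 1
        # Step 2: take the preceding few message IDs from the local history.
        start = max(0, len(self.log) - history_len)
        message = {
            "message_id": message_id,
            "channel_id": channel_id,
            "lamport_timestamp": self.lamport_timestamp,
            "causal_history": list(self.log[start:]),
            # Step 3: attach the current bloom filter state.
            "bloom_filter": self.bloom_filter,
            "content": content,
        }
        broadcast(message)
        # After broadcasting, track the message until it is acknowledged.
        self.outgoing[message_id] = message
        return message
```

Whether the sender also appends its own message to its local log at this point is not stated in this section, so the sketch leaves the log untouched.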

#### Receive Message

Upon receiving a message,
- the participant MUST [review the ACK status](#review-ack-status) of messages in its unacknowledged outgoing buffer
using the received message's causal history and bloom filter.
- the participant MUST include the received message ID in its local bloom filter.
- the participant MUST verify that all causal dependencies are met for the received message.
Dependencies are met if the message IDs in the `causal_history` of the received message
appear in the local history of the receiving participant.

If all dependencies are met,
the participant MUST [deliver the message](#deliver-message).
If dependencies are unmet,
the participant MUST add the message to the incoming buffer of messages with unmet causal dependencies.
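
A minimal sketch of the receive path, assuming messages are plain dictionaries and the local history is available as a set of message IDs. The function names and the returned strings are illustrative only; the ACK review step is omitted here, and an exact set stands in for the local bloom filter.

```python
from typing import Dict, List, Set


def missing_dependencies(causal_history: List[str], local_history: Set[str]) -> List[str]:
    # A dependency is met when its message ID already appears in the local history.
    return [mid for mid in causal_history if mid not in local_history]


def on_receive(message: dict, local_history: Set[str],
               received_ids: Set[str], incoming_buffer: Dict[str, dict]) -> str:
    # Record the received message ID (an exact set stands in for the bloom filter).
    received_ids.add(message["message_id"])
    if missing_dependencies(message.get("causal_history", []), local_history):
        # Unmet dependencies: park the message until they arrive.
        incoming_buffer[message["message_id"]] = message
        return "buffered"
    # All dependencies met: the caller proceeds to deliver the message.
    return "deliver"
```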

#### Deliver Message

Review comment (Contributor): Could it be "Record Message" instead? I understand this step is about inscribing the message in the local log to hand over to the application. Or "Transcribe".

Reply (Contributor Author): In this case we chose "deliver" as it matches terminology in related literature (specifically here: https://arxiv.org/pdf/2012.00472)

Triggered by the [Receive Message](#receive-message) procedure.

If the received message’s Lamport timestamp is greater than the participant's local Lamport timestamp,
the participant MUST update its local Lamport timestamp to match the received message.
The participant MUST insert the message ID into its local log,
based on Lamport timestamp.
If one or more message IDs with the same Lamport timestamp already exist,
the participant MUST follow the [Resolve Conflicts](#resolve-conflicts) procedure.

#### Resolve Conflicts

Triggered by the [Deliver Message](#deliver-message) procedure.

The participant MUST order messages with the same Lamport timestamp in ascending order of message ID.
If the message ID is implemented as a hash of the message,
this means the message with the lowest hash would precede
other messages with the same Lamport timestamp in the local log.
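
The Deliver Message and Resolve Conflicts procedures can be sketched together by keeping the local log sorted on the pair `(lamport_timestamp, message_id)`, so that ties on the timestamp fall back to ascending message ID. The `deliver` function and its signature are assumptions for this example, and message IDs are assumed to be comparable strings.

```python
import bisect
from typing import List, Tuple


def deliver(log: List[Tuple[int, str]], local_ts: int,
            msg_ts: int, message_id: str) -> int:
    """Insert the message into the sorted log and return the updated local timestamp."""
    # Adopt the received timestamp if it is ahead of the local one.
    new_ts = max(local_ts, msg_ts)
    entry = (msg_ts, message_id)
    # Tuples compare by timestamp first, then by message ID, which matches
    # the conflict resolution rule for equal Lamport timestamps.
    idx = bisect.bisect_left(log, entry)
    if idx == len(log) or log[idx] != entry:
        log.insert(idx, entry)  # skip exact duplicates
    return new_ts
```

For example, delivering `"bb"` and then `"aa"` with the same timestamp leaves `"aa"` ahead of `"bb"` in the log.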

#### Review ACK Status

Triggered by the [Receive Message](#receive-message) procedure.

For each message in the unacknowledged outgoing buffer,
based on the received `bloom_filter` and `causal_history`:
- the participant MUST mark all messages in the received `causal_history` as **acknowledged**.
- the participant MUST mark all messages included in the `bloom_filter` as **possibly acknowledged**.
If a message appears as **possibly acknowledged** in multiple received bloom filters,
the participant MAY mark it as acknowledged based on probabilistic grounds,
taking into account the bloom filter size and number of hash functions.
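
The review step can be sketched with a small bloom filter. Everything here is illustrative: the `BloomFilter` class, its SHA-256 based hashing, and `review_ack_status` are assumptions rather than the specified encoding. The sizing uses the standard formulas for `n` expected items and target false positive rate `p`: `m = -n ln p / (ln 2)^2` bits and `k = (m / n) ln 2` hash functions.

```python
import hashlib
import math
from typing import Dict, Iterator, List


class BloomFilter:
    def __init__(self, capacity: int, error_rate: float):
        # Standard sizing for `capacity` items at the given false positive rate.
        self.num_bits = max(1, math.ceil(-capacity * math.log(error_rate) / math.log(2) ** 2))
        self.num_hashes = max(1, round(self.num_bits / capacity * math.log(2)))
        self.bits = bytearray((self.num_bits + 7) // 8)

    def _positions(self, item: str) -> Iterator[int]:
        # Derive one bit position per hash function from a salted SHA-256 digest.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))


def review_ack_status(outgoing: Dict[str, str], causal_history: List[str],
                      bloom: BloomFilter) -> None:
    for message_id in list(outgoing):
        if message_id in causal_history:
            # Acknowledged: the message leaves the outgoing buffer.
            del outgoing[message_id]
        elif message_id in bloom:
            # A bloom filter hit may be a false positive, so this is only "possible".
            outgoing[message_id] = "possibly acknowledged"
```

The optional promotion from **possibly acknowledged** to **acknowledged** after repeated bloom filter hits is not covered by this sketch.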

#### Periodic Incoming Buffer Sweep

The participant MUST periodically check causal dependencies for each message in the incoming buffer.
For each message in the incoming buffer:
- the participant MAY attempt to retrieve missing dependencies from the Store node (high-availability cache) or other peers.
- if all dependencies of a message are met,
the participant MUST proceed to [deliver the message](#deliver-message).

If a message's causal dependencies have failed to be met after a predetermined amount of time,
the participant MAY mark them as **irretrievably lost**.
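
One possible shape for the incoming buffer sweep is shown below. The function name, the `(dependencies, received_at)` tuple layout, and the `timeout` parameter are assumptions for this example. Retrieval from a Store node is left out, and what happens to a buffered message once its dependencies are marked lost is not specified here, so the sketch simply leaves it in the buffer.

```python
from typing import Dict, List, Set, Tuple


def sweep_incoming(incoming: Dict[str, Tuple[List[str], float]],
                   local_history: Set[str], now: float,
                   timeout: float) -> Tuple[List[str], List[str]]:
    """Return (message IDs ready for delivery, dependency IDs considered lost)."""
    ready: List[str] = []
    lost: List[str] = []
    for message_id, (deps, received_at) in list(incoming.items()):
        missing = [dep for dep in deps if dep not in local_history]
        if not missing:
            # All dependencies are met: hand the message over for delivery.
            ready.append(message_id)
            del incoming[message_id]
        elif now - received_at > timeout:
            # Dependencies stayed unmet for too long: report them as lost.
            lost.extend(missing)
    return ready, lost
```

Delivering a ready message can satisfy the dependencies of others still in the buffer, so a caller would typically deliver the returned messages and sweep again.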

#### Periodic Outgoing Buffer Sweep

The participant MUST rebroadcast **unacknowledged** outgoing messages after a set period.
The participant SHOULD use distinct resend periods for **unacknowledged** and **possibly acknowledged** messages,
prioritizing **unacknowledged** messages.

Review comment (Contributor): Waku message hash changes when re-broadcast. How is this handled?

Reply (Contributor Author): Out of scope of this spec, as it focuses on the SDS protocol in isolation.
In practice, the various Waku "transport level" hashes should be mapped to the SDS-level message ID. When a message gets rebroadcast, the Waku hash may be updated for the same message ID entry.
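
A sketch of the outgoing sweep with distinct resend periods follows. The buffer entry layout (`state`, `last_sent`, `message`), the `broadcast` callback, and the default periods of 5 and 30 seconds are all hypothetical; the specification does not fix any values.

```python
from typing import Callable, Dict, List


def sweep_outgoing(outgoing: Dict[str, dict], now: float,
                   broadcast: Callable[[object], None],
                   unacked_period: float = 5.0,
                   possibly_acked_period: float = 30.0) -> List[str]:
    """Rebroadcast due messages and return their IDs."""
    resent: List[str] = []
    for message_id, entry in outgoing.items():
        # Unacknowledged messages get the shorter period, so they are retried first.
        if entry["state"] == "unacknowledged":
            period = unacked_period
        else:
            period = possibly_acked_period
        if now - entry["last_sent"] >= period:
            broadcast(entry["message"])
            entry["last_sent"] = now
            resent.append(message_id)
    return resent
```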

#### Periodic Sync Message

For each channel of communication,
participants SHOULD periodically send an empty-content message to maintain sync state,
without incrementing the Lamport timestamp.
To avoid network activity bursts in large groups,
a participant MAY choose to only send periodic sync messages if no other messages have been broadcast in the channel after a random backoff period.

Participants MUST process these sync messages following the same steps as regular messages.
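
The periodic sync behaviour might look like the sketch below. The helper names, the jittered backoff scheme, and the dictionary message layout are assumptions; the key points taken from the text are that the content is empty, the Lamport timestamp is reused without incrementing, and a sync is skipped if the channel has been active within the backoff window.

```python
import random
from typing import List


def sync_backoff(base_period: float, jitter: float) -> float:
    # Randomize the wait so that large groups do not all sync at once.
    return base_period + random.uniform(0, jitter)


def should_send_sync(last_channel_activity: float, now: float, backoff: float) -> bool:
    # Only sync if nothing else was broadcast in the channel during the backoff.
    return now - last_channel_activity >= backoff


def build_sync_message(message_id: str, channel_id: str, lamport_timestamp: int,
                       log: List[str], bloom_filter: bytes,
                       history_len: int = 2) -> dict:
    start = max(0, len(log) - history_len)
    return {
        "message_id": message_id,
        "channel_id": channel_id,
        # The current timestamp is reused as-is, not incremented.
        "lamport_timestamp": lamport_timestamp,
        "causal_history": list(log[start:]),
        "bloom_filter": bloom_filter,
        "content": b"",  # empty content marks this as a sync message
    }
```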

#### Ephemeral Messages

Participants MAY choose to send short-lived messages for which no synchronization or reliability is required.
These messages are termed _ephemeral_.

Ephemeral messages SHOULD be sent with `lamport_timestamp`, `causal_history`, and `bloom_filter` unset.
Ephemeral messages SHOULD NOT be added to the unacknowledged outgoing buffer after broadcast.
Upon reception,
ephemeral messages SHOULD be delivered immediately without buffering for causal dependencies or including in the local log.
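
On the receiving side, ephemeral handling could be a simple branch ahead of the reliable path. In this sketch, `handle_incoming`, `deliver_to_app`, and `reliable_path` are hypothetical names, and a message is treated as ephemeral when all three reliability fields are unset.

```python
from typing import Callable


def handle_incoming(message: dict,
                    deliver_to_app: Callable[[dict], None],
                    reliable_path: Callable[[dict], None]) -> str:
    is_ephemeral = (
        message.get("lamport_timestamp") is None
        and not message.get("causal_history")
        and message.get("bloom_filter") is None
    )
    if is_ephemeral:
        # Deliver immediately: no dependency buffering, no local log entry.
        deliver_to_app(message)
        return "ephemeral"
    # Everything else goes through the regular receive procedure.
    reliable_path(message)
    return "reliable"
```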

## Copyright

Copyright and related rights waived via [CC0](https://creativecommons.org/publicdomain/zero/1.0/).