Skip to content

Commit

Permalink
feat(sse): support last-event-id
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Sep 7, 2023
1 parent ca55eb6 commit 9bf745e
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 1 deletion.
131 changes: 131 additions & 0 deletions features/sse.testfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
launch :anycable,
"./dist/anycable-go --sse --turbo_rails_cleartext --jwt_id_key=qwerty --broadcast_adapter=http --presets=broker"

wait_tcp 8080

payload = {ext: {}.to_json, exp: (Time.now.to_i + 60)}

token = ::JWT.encode(payload, "qwerty", "HS256")
stream_name = "chat/2023"

require "uri"
require "net/http"
require "fiber"

identifier = URI.encode_www_form_component({channel: "Turbo::StreamsChannel", signed_stream_name: stream_name}.to_json)

url = "http://localhost:8080/events?jid=#{token}&identifier=#{identifier}"

Event = Struct.new(:type, :data, :id, :retry)

def broadcast(stream, data)
uri = URI.parse("http://localhost:8090/_broadcast")
header = {"Content-Type": "application/json"}
data = {stream:, data: data.to_json}
http = Net::HTTP.new(uri.host, uri.port)
request = Net::HTTP::Post.new(uri.request_uri, header)
request.body = data.to_json
response = http.request(request)

if response.code != "201"
fail "Broadcast returned unexpected status: #{response.code}"
end
end

def parse_sse_chunk(chunk)
event = Event.new
chunk.split("\n").each do |line|
field, value = line.split(":", 2).map(&:strip)

case field
when "data"
event.data = JSON.parse(value)
when "event"
event.type = value
when "id"
event.id = value
when "retry"
event.retry = value.to_i
end
end
event
end

def streaming_request(uri, headers: {})
begin
fiber = Fiber.new do
Net::HTTP.start(uri.host, uri.port, read_timeout: 2) do |http|
request = Net::HTTP::Get.new(uri)
headers.each do |key, value|
request[key] = value
end
catch :stop do
http.request(request) do |response|
response.read_body do |chunk|
chunk.split("\n\n").each do |raw_event|
event = parse_sse_chunk(raw_event)
# ignore pings
next if event.type == "ping"

cmd = Fiber.yield(event)
if cmd == :stop
throw :stop
end
end
end
end
end
end
end
yield fiber
rescue => e
fiber.resume(:stop)
raise
end
end

last_id = nil

streaming_request(URI(url)) do |stream|
first_event = stream.resume

if first_event.type != "welcome"
fail "Expected welcome, got: #{first_event}"
end

second_event = stream.resume

if second_event.type != "confirm_subscription"
fail "Expected confirm_subscription, got: #{second_event}"
end

# Broadcast a message
broadcast stream_name, {"text" => "Hello, stream!"}

broadcast_event = stream.resume

if broadcast_event.data.fetch("message") != {"text" => "Hello, stream!"}
fail "Expected broadcast data, got: #{broadcast_event.data}"
end

last_id = broadcast_event.id

# Stop first session
stream.resume(:stop)
end

# Broadcast another message
broadcast stream_name, {"text" => "Where are you, stream?"}

# Start new session with last ID
streaming_request(URI(url), headers: {"Last-Event-ID" => last_id}) do |stream|
fail "Expected welcome" unless stream.resume.type == "welcome"
fail "Expected confirmation" unless stream.resume.type == "confirm_subscription"

# And now we should receive the missed message
missed_message = stream.resume

if missed_message.data.fetch("message") != {"text" => "Where are you, stream?"}
fail "Expected missed message, got: #{missed_message.data}"
end
end
4 changes: 3 additions & 1 deletion sse/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const sseEncoderID = "sse"
// Tell the client to reconnect in a year in case we don't really want it to re-connect
const retryNoReconnect = 31536000000

const lastIdDelimeter = "/"

// Encoder is responsible for converting messages to SSE format (event:, data:, etc.)
// NOTE: It's only used to encode messages from server to client.
type Encoder struct {
Expand All @@ -39,7 +41,7 @@ func (Encoder) Encode(msg encoders.EncodedMessage) (*ws.SentFrame, error) {

if reply, ok := msg.(*common.Reply); ok {
if reply.Offset > 0 && reply.Epoch != "" && reply.StreamID != "" {
payload += "\nid: " + fmt.Sprintf("%d/%s/%s", reply.Offset, reply.Epoch, reply.StreamID)
payload += "\nid: " + fmt.Sprintf("%d%s%s%s%s", reply.Offset, lastIdDelimeter, reply.Epoch, lastIdDelimeter, reply.StreamID)
}
}

Expand Down
25 changes: 25 additions & 0 deletions sse/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"errors"
"io"
"net/http"
"strconv"
"strings"

"github.com/anycable/anycable-go/common"
"github.com/anycable/anycable-go/node"
Expand Down Expand Up @@ -73,6 +75,29 @@ func subscribeCommandFromGetRequest(r *http.Request) (*common.Message, error) {

msg.Identifier = identifier

if lastId := r.Header.Get("last-event-id"); lastId != "" {
offsetParts := strings.SplitN(lastId, lastIdDelimeter, 3)

if len(offsetParts) == 3 {
offset, err := strconv.ParseUint(offsetParts[0], 10, 64)

if err != nil {
return nil, errorx.Decorate(err, "failed to parse last event id: %s", lastId)
}

epoch := offsetParts[1]
stream := offsetParts[2]

streams := make(map[string]common.HistoryPosition)

streams[stream] = common.HistoryPosition{Offset: offset, Epoch: epoch}

msg.History = common.HistoryRequest{
Streams: streams,
}
}
}

return msg, nil
}

Expand Down

0 comments on commit 9bf745e

Please sign in to comment.