From 9bf745e4d4be33b176768155c16a62d60ae4fd35 Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Wed, 6 Sep 2023 20:59:17 -0700 Subject: [PATCH] feat(sse): support last-event-id --- features/sse.testfile | 131 ++++++++++++++++++++++++++++++++++++++++++ sse/encoder.go | 4 +- sse/sse.go | 25 ++++++++ 3 files changed, 159 insertions(+), 1 deletion(-) create mode 100644 features/sse.testfile diff --git a/features/sse.testfile b/features/sse.testfile new file mode 100644 index 00000000..ae6752c5 --- /dev/null +++ b/features/sse.testfile @@ -0,0 +1,131 @@ +launch :anycable, + "./dist/anycable-go --sse --turbo_rails_cleartext --jwt_id_key=qwerty --broadcast_adapter=http --presets=broker" + +wait_tcp 8080 + +payload = {ext: {}.to_json, exp: (Time.now.to_i + 60)} + +token = ::JWT.encode(payload, "qwerty", "HS256") +stream_name = "chat/2023" + +require "uri" +require "net/http" +require "fiber" + +identifier = URI.encode_www_form_component({channel: "Turbo::StreamsChannel", signed_stream_name: stream_name}.to_json) + +url = "http://localhost:8080/events?jid=#{token}&identifier=#{identifier}" + +Event = Struct.new(:type, :data, :id, :retry) + +def broadcast(stream, data) + uri = URI.parse("http://localhost:8090/_broadcast") + header = {"Content-Type": "application/json"} + data = {stream:, data: data.to_json} + http = Net::HTTP.new(uri.host, uri.port) + request = Net::HTTP::Post.new(uri.request_uri, header) + request.body = data.to_json + response = http.request(request) + + if response.code != "201" + fail "Broadcast returned unexpected status: #{response.code}" + end +end + +def parse_sse_chunk(chunk) + event = Event.new + chunk.split("\n").each do |line| + field, value = line.split(":", 2).map(&:strip) + + case field + when "data" + event.data = JSON.parse(value) + when "event" + event.type = value + when "id" + event.id = value + when "retry" + event.retry = value.to_i + end + end + event +end + +def streaming_request(uri, headers: {}) + begin + fiber = Fiber.new do + Net::HTTP.start(uri.host, uri.port, read_timeout: 2) do |http| + request = Net::HTTP::Get.new(uri) + headers.each do |key, value| + request[key] = value + end + catch :stop do + http.request(request) do |response| + response.read_body do |chunk| + chunk.split("\n\n").each do |raw_event| + event = parse_sse_chunk(raw_event) + # ignore pings + next if event.type == "ping" + + cmd = Fiber.yield(event) + if cmd == :stop + throw :stop + end + end + end + end + end + end + end + yield fiber + rescue => e + fiber.resume(:stop) + raise + end +end + +last_id = nil + +streaming_request(URI(url)) do |stream| + first_event = stream.resume + + if first_event.type != "welcome" + fail "Expected welcome, got: #{first_event}" + end + + second_event = stream.resume + + if second_event.type != "confirm_subscription" + fail "Expected confirm_subscription, got: #{second_event}" + end + + # Broadcast a message + broadcast stream_name, {"text" => "Hello, stream!"} + + broadcast_event = stream.resume + + if broadcast_event.data.fetch("message") != {"text" => "Hello, stream!"} + fail "Expected broadcast data, got: #{broadcast_event.data}" + end + + last_id = broadcast_event.id + + # Stop first session + stream.resume(:stop) +end + +# Broadcast another message +broadcast stream_name, {"text" => "Where are you, stream?"} + +# Start new session with last ID +streaming_request(URI(url), headers: {"Last-Event-ID" => last_id}) do |stream| + fail "Expected welcome" unless stream.resume.type == "welcome" + fail "Expected confirmation" unless stream.resume.type == "confirm_subscription" + + # And now we should receive the missed message + missed_message = stream.resume + + if missed_message.data.fetch("message") != {"text" => "Where are you, stream?"} + fail "Expected missed message, got: #{missed_message.data}" + end +end diff --git a/sse/encoder.go b/sse/encoder.go index 9181ff1f..b8f8a459 100644 --- a/sse/encoder.go +++ b/sse/encoder.go @@ -15,6 +15,8 @@ const sseEncoderID = "sse" // Tell the client to reconnect in a year in case we don't really want it to re-connect const retryNoReconnect = 31536000000 +const lastIdDelimeter = "/" + // Encoder is responsible for converting messages to SSE format (event:, data:, etc.) // NOTE: It's only used to encode messages from server to client. type Encoder struct { @@ -39,7 +41,7 @@ func (Encoder) Encode(msg encoders.EncodedMessage) (*ws.SentFrame, error) { if reply, ok := msg.(*common.Reply); ok { if reply.Offset > 0 && reply.Epoch != "" && reply.StreamID != "" { - payload += "\nid: " + fmt.Sprintf("%d/%s/%s", reply.Offset, reply.Epoch, reply.StreamID) + payload += "\nid: " + fmt.Sprintf("%d%s%s%s%s", reply.Offset, lastIdDelimeter, reply.Epoch, lastIdDelimeter, reply.StreamID) } } diff --git a/sse/sse.go b/sse/sse.go index 52ae4c52..b64508c2 100644 --- a/sse/sse.go +++ b/sse/sse.go @@ -6,6 +6,8 @@ import ( "errors" "io" "net/http" + "strconv" + "strings" "github.com/anycable/anycable-go/common" "github.com/anycable/anycable-go/node" @@ -73,6 +75,29 @@ func subscribeCommandFromGetRequest(r *http.Request) (*common.Message, error) { msg.Identifier = identifier + if lastId := r.Header.Get("last-event-id"); lastId != "" { + offsetParts := strings.SplitN(lastId, lastIdDelimeter, 3) + + if len(offsetParts) == 3 { + offset, err := strconv.ParseUint(offsetParts[0], 10, 64) + + if err != nil { + return nil, errorx.Decorate(err, "failed to parse last event id: %s", lastId) + } + + epoch := offsetParts[1] + stream := offsetParts[2] + + streams := make(map[string]common.HistoryPosition) + + streams[stream] = common.HistoryPosition{Offset: offset, Epoch: epoch} + + msg.History = common.HistoryRequest{ + Streams: streams, + } + } + } + return msg, nil }