Skip to content

Commit

Permalink
implement message queue for WebChannels (#703)
Browse files Browse the repository at this point in the history
* implement message queue for WebChannels

* Removed println debug

---------

Co-authored-by: Adrian Salceanu <[email protected]>
  • Loading branch information
hhaensel and essenciary authored Feb 7, 2024
1 parent 1e2f3cd commit 786326e
Showing 1 changed file with 51 additions and 7 deletions.
58 changes: 51 additions & 7 deletions src/WebChannels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import Genie, Genie.Renderer

const ClientId = UInt # web socket hash
const ChannelName = String
const MESSAGE_QUEUE = Dict{UInt, Tuple{
Channel{Tuple{String, Channel{Nothing}}},
Task}
}()

struct ChannelNotFoundException <: Exception
name::ChannelName
Expand Down Expand Up @@ -105,10 +109,14 @@ end
Unsubscribes a web socket client `ws` from `channel`.
"""
function unsubscribe(ws::HTTP.WebSockets.WebSocket, channel::ChannelName) :: ChannelClientsCollection
haskey(CLIENTS, id(ws)) && deleteat!(CLIENTS[id(ws)].channels, CLIENTS[id(ws)].channels .== channel)
pop_subscription(id(ws), channel)
client = id(ws)

haskey(CLIENTS, client) && deleteat!(CLIENTS[client].channels, CLIENTS[client].channels .== channel)
pop_subscription(client, channel)
delete_queue!(MESSAGE_QUEUE, client)

@debug "Unsubscribed: $(id(ws)) ($(Dates.now()))"

@debug "Unsubscribed: $(client) ($(Dates.now()))"
CLIENTS
end
function unsubscribe(channel_client::ChannelClient, channel::ChannelName) :: ChannelClientsCollection
Expand Down Expand Up @@ -285,16 +293,52 @@ end
"""
Writes `msg` to web socket for `client`.
"""
function message(client::ClientId, msg::String)
ws = Genie.WebChannels.CLIENTS[client].client
# setup a reply channel
myfuture = Channel{Nothing}(1)

# retrieve the message queue or set it up if not present
q, _ = get!(MESSAGE_QUEUE, client) do
queue = Channel{Tuple{String, Channel{Nothing}}}(10)
handler = @async while true
message, future = take!(queue)
try
Sockets.send(ws, message)
finally
put!(future, nothing)
end
end
queue, handler
end

put!(q, (msg, myfuture))

take!(myfuture) # Wait until the message is processed
end
function message(client::ChannelClient, msg::String) :: Int
message(client.client, msg)
end
function message(ws::HTTP.WebSockets.WebSocket, msg::String) :: Int
message(id(ws), msg)
end

function message_unsafe(ws::HTTP.WebSockets.WebSocket, msg::String) :: Int
Sockets.send(ws, msg)
end
function message(client::ClientId, msg::String) :: Int
message(CLIENTS[client].client, msg)
function message_unsafe(client::ClientId, msg::String) :: Int
message_unsafe(CLIENTS[client].client, msg)
end
function message(client::ChannelClient, msg::String) :: Int
message(client.client, msg)
function message_unsafe(client::ChannelClient, msg::String) :: Int
message_unsafe(client.client, msg)
end

function delete_queue!(d::Dict, client::UInt)
queue, handler = pop!(MESSAGE_QUEUE, client, (nothing, nothing))
if queue !== nothing
@async Base.throwto(handler, InterruptException())
end
end

"""
Encodes `msg` in Base64 and tags it with `Genie.config.webchannels_base64_marker`.
Expand Down

2 comments on commit 786326e

@essenciary
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/100389

Tip: Release Notes

Did you know you can add release notes too? Just add markdown formatted text underneath the comment after the text
"Release notes:" and it will be added to the registry PR, and if TagBot is installed it will also be added to the
release that TagBot creates. i.e.

@JuliaRegistrator register

Release notes:

## Breaking changes

- blah

To add them here just re-invoke and the PR will be updated.

Tagging

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v5.24.2 -m "<description of version>" 786326e597c3dd104b70fab1fb272c334dfd66d5
git push origin v5.24.2

Please sign in to comment.