Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prioritized route proof of concept #224

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions serve-test/Project.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[deps]
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
Infiltrator = "5903a43b-9cc3-4c30-8d17-598619ec4e9b"
Oxygen = "df9a0d86-3283-4920-82dc-4555fc0d1d8b"
Revise = "295af30f-e4ad-537b-8983-00126c2a3abe"
46 changes: 46 additions & 0 deletions serve-test/blocked-serve.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using Base.Threads
using Infiltrator
using Revise
using Oxygen
using HTTP
ENV["JULIA_DEBUG"] = "Oxygen"

# Run this with -t 1,1

println("$(nthreadpools()) threadpools. default=$(nthreads(:default)) interactive=$(nthreads(:interactive))")

function block(n::Int)
start_time = time()
while (time() - start_time) < n
end
return "Blocking complete after $n seconds"
end


@get "/health" function()
@info "entered /health threadid=$(Threads.threadid())"
json(Dict(:status => "healthy"))
end

@get "/block" function ()
block_time = 10
@info "blocking for $block_time seconds. threadid=$(Threads.threadid())"
# using block because sleep() would yield
block(block_time)
@info "done blocking threadid=$(Threads.threadid())"
text("done")
end

@get "/throw" function()
throw(
ErrorException("error from route impl")
)
end

function start()
serveparallel(
is_prioritized = (req::HTTP.Request) -> req.target == "/health"
)
end

start()
33 changes: 27 additions & 6 deletions src/core.jl
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ function serve(ctx::Context;
docs_path = "/docs",
schema_path = "/schema",
external_url = nothing,
is_prioritized = nothing,
kwargs...) :: Server

# set the external url if it's passed
Expand Down Expand Up @@ -121,7 +122,7 @@ function serve(ctx::Context;
end

# wrap top level handler with parallel handler
handle_stream = parallel_stream_handler(handle_stream)
handle_stream = parallel_stream_handler(handle_stream, is_prioritized)
end

# The cleanup of resources are put at the topmost level in `methods.jl`
Expand Down Expand Up @@ -197,6 +198,8 @@ function stream_handler(middleware::Function)
# extract the caller's ip address
ip, _ = Sockets.getpeername(stream)
# build up a streamhandler to handle our incoming requests
@debug "decorating with $ip"

handle_stream = HTTP.streamhandler(middleware |> decorate_request(ip, stream))
# handle the incoming request
return handle_stream(stream)
Expand All @@ -210,13 +213,29 @@ end
This function uses `Threads.@spawn` to schedule a new task on any available thread.
Inside this task, `@async` is used for cooperative multitasking, allowing the task to yield during I/O operations.
"""
function parallel_stream_handler(handle_stream::Function)
function parallel_stream_handler(handle_stream::Function, is_prioritized::Function)
function (stream::HTTP.Stream)
task = Threads.@spawn begin
handle = @async handle_stream(stream)
wait(handle)
# parse request before spawning handler
@debug "parallel_stream_handler threadid=$(Threads.threadid())"

req::HTTP.Request = stream.message

# if prioritized path
if (is_prioritized(req))
@debug "prioritized request to $(req.target), running in Main interactive thread"
handle_stream(stream)
else
task = Threads.@spawn begin
@debug "within @spawn begin threadid=$(Threads.threadid())"
handle = @async handle_stream(stream)
@debug "wait on @async handle_stream threadid=$(Threads.threadid())"
wait(handle)
@debug "after wait threadid=$(Threads.threadid())"
end
@debug "root wait threadid=$(Threads.threadid())"
wait(task)
@debug "after root wait threadid=$(Threads.threadid())"
end
wait(task)
end
end

Expand All @@ -227,6 +246,7 @@ users to 'chain' middleware functions like `serve(handler1, handler2, handler3)`
application and have them execute in the order they were passed (left to right) for each incoming request
"""
function setupmiddleware(ctx::Context; middleware::Vector=[], docs::Bool=true, metrics::Bool=true, serialize::Bool=true, catch_errors::Bool=true, show_errors=true)::Function
@debug "setupmiddleware"

# determine if we have any special router or route-specific middleware
custom_middleware = !isempty(ctx.service.custommiddleware) ? [compose(ctx.service.router, middleware, ctx.service.custommiddleware, ctx.service.middleware_cache)] : reverse(middleware)
Expand Down Expand Up @@ -337,6 +357,7 @@ Create a default serializer function that handles HTTP requests and formats the
function DefaultSerializer(catch_errors::Bool; show_errors::Bool)
return function (handle)
return function (req::HTTP.Request)
@debug "default serializer impl $(string(req.target)) tid=$(Threads.threadid())"
return handlerequest(catch_errors; show_errors) do
response = handle(req)
format_response!(req, response)
Expand Down