diff --git a/serve-test/Project.toml b/serve-test/Project.toml new file mode 100644 index 00000000..e17812af --- /dev/null +++ b/serve-test/Project.toml @@ -0,0 +1,5 @@ +[deps] +HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" +Infiltrator = "5903a43b-9cc3-4c30-8d17-598619ec4e9b" +Oxygen = "df9a0d86-3283-4920-82dc-4555fc0d1d8b" +Revise = "295af30f-e4ad-537b-8983-00126c2a3abe" diff --git a/serve-test/blocked-serve.jl b/serve-test/blocked-serve.jl new file mode 100644 index 00000000..b7dfc41e --- /dev/null +++ b/serve-test/blocked-serve.jl @@ -0,0 +1,46 @@ +using Base.Threads +using Infiltrator +using Revise +using Oxygen +using HTTP +ENV["JULIA_DEBUG"] = "Oxygen" + +# Run this with -t 1,1 + +println("$(nthreadpools()) threadpools. default=$(nthreads(:default)) interactive=$(nthreads(:interactive))") + +function block(n::Int) + start_time = time() + while (time() - start_time) < n + end + return "Blocking complete after $n seconds" +end + + +@get "/health" function() + @info "entered /health threadid=$(Threads.threadid())" + json(Dict(:status => "healthy")) +end + +@get "/block" function () + block_time = 10 + @info "blocking for $block_time seconds. threadid=$(Threads.threadid())" + # using block because sleep() would yield + block(block_time) + @info "done blocking threadid=$(Threads.threadid())" + text("done") +end + +@get "/throw" function() + throw( + ErrorException("error from route impl") + ) +end + +function start() + serveparallel( + is_prioritized = (req::HTTP.Request) -> req.target == "/health" + ) +end + +start() diff --git a/src/core.jl b/src/core.jl index 1cc0195d..df87650a 100644 --- a/src/core.jl +++ b/src/core.jl @@ -87,6 +87,7 @@ function serve(ctx::Context; docs_path = "/docs", schema_path = "/schema", external_url = nothing, + is_prioritized = nothing, kwargs...) :: Server # set the external url if it's passed @@ -121,7 +122,7 @@ function serve(ctx::Context; end # wrap top level handler with parallel handler - handle_stream = parallel_stream_handler(handle_stream) + handle_stream = parallel_stream_handler(handle_stream, is_prioritized) end # The cleanup of resources are put at the topmost level in `methods.jl` @@ -197,6 +198,8 @@ function stream_handler(middleware::Function) # extract the caller's ip address ip, _ = Sockets.getpeername(stream) # build up a streamhandler to handle our incoming requests + @debug "decorating with $ip" + handle_stream = HTTP.streamhandler(middleware |> decorate_request(ip, stream)) # handle the incoming request return handle_stream(stream) @@ -210,13 +213,29 @@ end This function uses `Threads.@spawn` to schedule a new task on any available thread. Inside this task, `@async` is used for cooperative multitasking, allowing the task to yield during I/O operations. """ -function parallel_stream_handler(handle_stream::Function) +function parallel_stream_handler(handle_stream::Function, is_prioritized::Function) function (stream::HTTP.Stream) - task = Threads.@spawn begin - handle = @async handle_stream(stream) - wait(handle) + # parse request before spawning handler + @debug "parallel_stream_handler threadid=$(Threads.threadid())" + + req::HTTP.Request = stream.message + + # if prioritized path + if (is_prioritized(req)) + @debug "prioritized request to $(req.target), running in Main interactive thread" + handle_stream(stream) + else + task = Threads.@spawn begin + @debug "within @spawn begin threadid=$(Threads.threadid())" + handle = @async handle_stream(stream) + @debug "wait on @async handle_stream threadid=$(Threads.threadid())" + wait(handle) + @debug "after wait threadid=$(Threads.threadid())" + end + @debug "root wait threadid=$(Threads.threadid())" + wait(task) + @debug "after root wait threadid=$(Threads.threadid())" end - wait(task) end end @@ -227,6 +246,7 @@ users to 'chain' middleware functions like `serve(handler1, handler2, handler3)` application and have them execute in the order they were passed (left to right) for each incoming request """ function setupmiddleware(ctx::Context; middleware::Vector=[], docs::Bool=true, metrics::Bool=true, serialize::Bool=true, catch_errors::Bool=true, show_errors=true)::Function + @debug "setupmiddleware" # determine if we have any special router or route-specific middleware custom_middleware = !isempty(ctx.service.custommiddleware) ? [compose(ctx.service.router, middleware, ctx.service.custommiddleware, ctx.service.middleware_cache)] : reverse(middleware) @@ -337,6 +357,7 @@ Create a default serializer function that handles HTTP requests and formats the function DefaultSerializer(catch_errors::Bool; show_errors::Bool) return function (handle) return function (req::HTTP.Request) + @debug "default serializer impl $(string(req.target)) tid=$(Threads.threadid())" return handlerequest(catch_errors; show_errors) do response = handle(req) format_response!(req, response)