Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

invoke NettyBody callback in a separate fiber to avoid deadlocks #2402

Conversation

myazinn
Copy link
Contributor

@myazinn myazinn commented Aug 25, 2023

When reading a request from a super-fast producer, it is possible that zio.http.netty.AsyncBodyReader accumulates more data in buffer than can fit into Stream buffer in zio.http.netty.NettyBody.AsyncBody#asStream. In that case, the request will hang because we can't consume messages until callback pushes all the data, which it can't do until we start consuming messages.
Minimized example of an issue can be found here
https://scastie.scala-lang.org/sZ8DrRKxR2qAKxrAOV52hg
For a reproduction step on a full server, you can use steps from this ticket
#2297
Sometimes server is able to read data from yes hello | http --chunked POST localhost:8080/forever, sometimes it's just silent.
I did try to write a test for, but couldn't find a way to reproduce it in tests. So I'd really appreciate if someone gave me a hint on how to do it. Also, callbacks sometimes blow my mind, so I hope nothing goes wrong if we move callback invocation to a separate fiber 🤔

@CLAassistant
Copy link

CLAassistant commented Aug 25, 2023

CLA assistant check
All committers have signed the CLA.

@myazinn myazinn force-pushed the invoke-callback-in-a-separate-fiber-to-avoid-deadlocks branch from 7768ff7 to c99c6b8 Compare August 25, 2023 20:15
@codecov-commenter
Copy link

codecov-commenter commented Aug 25, 2023

Codecov Report

Patch coverage: 100.00% and project coverage change: -0.09% ⚠️

Comparison is base (ca6513f) 63.41% compared to head (c99c6b8) 63.33%.

❗ Your organization is not using the GitHub App Integration. As a result you may experience degraded service beginning May 15th. Please install the Github App Integration for your organization. Read more.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2402      +/-   ##
==========================================
- Coverage   63.41%   63.33%   -0.09%     
==========================================
  Files         137      137              
  Lines        7113     7113              
  Branches     1259     1259              
==========================================
- Hits         4511     4505       -6     
- Misses       2602     2608       +6     
Files Changed Coverage Δ
...http/src/main/scala/zio/http/netty/NettyBody.scala 70.83% <100.00%> (ø)

... and 1 file with indirect coverage changes

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@lackhoa
Copy link
Contributor

lackhoa commented Sep 23, 2023

@myazinn Does this fix #2297 ?

@myazinn
Copy link
Contributor Author

myazinn commented Sep 24, 2023

@lackhoa nope 🙁

@jdegoes
Copy link
Member

jdegoes commented Sep 24, 2023

Rather than doing ZIO stuff inside a separate fiber, we should consider another mechanism that could be higher-performance. Will take a look at the code and suggest something.

@myazinn myazinn force-pushed the invoke-callback-in-a-separate-fiber-to-avoid-deadlocks branch from c99c6b8 to ffca507 Compare September 24, 2023 22:13
@myazinn
Copy link
Contributor Author

myazinn commented Sep 24, 2023

@jdegoes another easy option would be to break abstraction a little bit and handle it in zio.http.netty.AsyncBodyReader#connect. Instead of submitting chunks one by one

          buffer.result().foreach { case (chunk, isLast) =>
            callback(chunk, isLast)
          }

We can merge these chunks in place and submit one big chunk

          val (buffered, isLast) =
            buffer
              .result()
              .foldLeft((Chunk.empty[Byte], false)) { case ((acc, _), (chunk, isLast)) =>
                (acc ++ chunk, isLast)
              }
          callback(buffered, isLast)

I haven't done any benchmarks yet, but I guess it should be faster than both current version and one proposed in the MR.
Alternatively, we can go even further and just build one large chunk in a builder. E.g. replace

private val buffer: ChunkBuilder[(Chunk[Byte], Boolean)] = ChunkBuilder.make[(Chunk[Byte], Boolean)]()

...

          buffer.result().foreach { case (chunk, isLast) =>
            callback(chunk, isLast)
          }

with

  private val buffer: ChunkBuilder[Byte] = ChunkBuilder.make[Byte]()
  private var isLast: Boolean            = false
  
  ...
           callback(buffer.result(), isLast)

What do you think?

@myazinn
Copy link
Contributor Author

myazinn commented Sep 24, 2023

Looks like main branch doesn't compile now 🤔

@vigoo
Copy link
Contributor

vigoo commented Sep 25, 2023

@jdegoes another easy option would be to break abstraction a little bit and handle it in zio.http.netty.AsyncBodyReader#connect. Instead of submitting chunks one by one

          buffer.result().foreach { case (chunk, isLast) =>
            callback(chunk, isLast)
          }

We can merge these chunks in place and submit one big chunk

          val (buffered, isLast) =
            buffer
              .result()
              .foldLeft((Chunk.empty[Byte], false)) { case ((acc, _), (chunk, isLast)) =>
                (acc ++ chunk, isLast)
              }
          callback(buffered, isLast)

I haven't done any benchmarks yet, but I guess it should be faster than both current version and one proposed in the MR. Alternatively, we can go even further and just build one large chunk in a builder. E.g. replace

private val buffer: ChunkBuilder[(Chunk[Byte], Boolean)] = ChunkBuilder.make[(Chunk[Byte], Boolean)]()

...

          buffer.result().foreach { case (chunk, isLast) =>
            callback(chunk, isLast)
          }

with

  private val buffer: ChunkBuilder[Byte] = ChunkBuilder.make[Byte]()
  private var isLast: Boolean            = false
  
  ...
           callback(buffer.result(), isLast)

What do you think?

The purpose of the async body reader is to be able to process the body in a streaming way - when you don't want to buffer it in memory. So changing that would basically disable the streaming feature.

@myazinn
Copy link
Contributor Author

myazinn commented Sep 25, 2023

@vigoo not sure how it disables streaming feature. This buffer is used only until we connect some consumer to a channel. Once consumer is connected, this buffer will not be used and chunks received from netty will be passed directly to it.
The only thing that changes is what happens when a consumer connects to a body reader, that has already buffered a lot of data. Currently it'll feed chunk of chunks to a consumer, while I propose to flatten them and pass as one large chunk

@vigoo
Copy link
Contributor

vigoo commented Sep 25, 2023

Ah ok sorry, misunderstood.

@jdegoes
Copy link
Member

jdegoes commented Sep 28, 2023

@myazinn Let's do this in the most performant way possible, please! 🙏

@jdegoes
Copy link
Member

jdegoes commented Sep 28, 2023

@myazinn Master is fixed, btw. 👍

@987Nabil
Copy link
Contributor

987Nabil commented Mar 6, 2024

@myazinn could you update your branch?

@jdegoes
Copy link
Member

jdegoes commented Aug 27, 2024

@kyri-petrou Do you have any thoughts on how to do this better?

@kyri-petrou
Copy link
Collaborator

kyri-petrou commented Aug 28, 2024

@kyri-petrou Do you have any thoughts on how to do this better?

@jdegoes I've given this some thought; I think the main issue here is that we're using a bounded queue in ZStream.async. To be honest I don't think using a bounded queue makes much sense because in the case where we have an ultra-fast producer and a slow consumer, the messages will still end up piling in Netty's queues, so backpressure doesn't really help with anything (unless we were to limit netty's queue sizes, which we're not doing).

AFAICT there are 2 possible solutions:

  1. Limit Netty's inbound queue size to the same size as the ZStream buffer size. Frankly, I'm not even sure how would this behave in cases the queue becomes full.
  2. Use an unbounded queue as the source of the ZStream. I think this makes the most sense in this situation. In case we do want to add backpressure at some point, it can be done on Netty's side

Let me know if you're happy with (2) and I'll start working on it; should be relatively easy to implement

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants