
feat: HTTP SenderPool with asyncio support #66

Draft · wants to merge 6 commits into main
Conversation

@amunra (Collaborator) commented Mar 20, 2024

Overview

A new API that makes it easier to work with the sender asynchronously, with true parallelism.

import asyncio
from questdb.ingress.pool import SenderPool

async def main():
    with SenderPool('http::addr=localhost:9000;') as pool:
        # Buffers can be safely constructed independently,
        # also on separate threads.
        buf1 = pool.transaction('tbl1')
        buf1.row(...)

        buf2 = pool.transaction('tbl2')
        buf2.dataframe(...)

        # parallelism: both flushes are in flight concurrently
        fut1 = buf1.commit()
        fut2 = buf2.commit()

        await fut1
        await fut2

Details

  • The buffer can only accumulate rows for a given table.
  • Each flush represents an atomic database transaction.
  • Flush operations can happen in parallel (network operations release the GIL).
  • The ownership of the buffer is "moved" to the pool.
  • Introducing parallelism alleviates the main performance penalty of ILP over HTTP: network round-trip times.
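To illustrate why parallel flushes help, here is a minimal stdlib-only sketch. The `blocking_flush` function is a stand-in I made up for a network round trip; the point is that two blocking operations run on separate threads (as the pool would do) overlap in time rather than adding up.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_flush(name: str) -> str:
    # Stand-in for a network round trip: a real HTTP flush releases
    # the GIL while waiting on the socket, so threads can overlap.
    time.sleep(0.2)
    return name

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as executor:
        start = time.monotonic()
        # Launch both "flushes" before awaiting either one.
        futs = [loop.run_in_executor(executor, blocking_flush, name)
                for name in ("tbl1", "tbl2")]
        results = await asyncio.gather(*futs)
        elapsed = time.monotonic() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
# The two 0.2 s waits overlap, so the total is well under 0.4 s.
```

This is the same mechanism the pool would rely on, just without the QuestDB-specific buffer handling.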

API downsides

  • This is a new "parallel" API for more advanced use cases. Creates an API split:
    • Server-style applications written in Python would use this new API.
    • Simpler "jupyter notebook" style stuff would continue using the existing API.
    • Both APIs would continue being supported (since this new one is just a wrapper around the other anyway).
  • Shoe-horning these features into the regular API would be a struggle.
  • This API drops auto-flushing completely, since auto-flushing
    creates silent network-blocking operations in the API.

Thread safety and Parallelism

  • Once a pool object is created, it can be shared between threads.
  • The pool.next_buffer() and pool.flush() methods are thread safe.
  • This allows for N:M concurrency
    • N buffer writer threads
    • M threads responsible for concurrently writing to the database (inside the pool).
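The N:M model above can be sketched with stdlib primitives. Everything here (the queue, the row format, the function names) is illustrative scaffolding, not the proposed API: N writer threads each build a buffer independently, and M flusher threads drain them concurrently, as the pool's internal workers would.

```python
import queue
import threading

flush_queue: "queue.Queue[list[str] | None]" = queue.Queue()
flushed: list[list[str]] = []
flushed_lock = threading.Lock()

def writer(table: str, n_rows: int) -> None:
    # One of N writer threads: builds a buffer for a single table.
    buf = [f"{table},row={i}" for i in range(n_rows)]
    flush_queue.put(buf)

def flusher() -> None:
    # One of M flusher threads: drains buffers until told to stop.
    while True:
        buf = flush_queue.get()
        if buf is None:
            break
        with flushed_lock:
            flushed.append(buf)  # stand-in for the network flush

writers = [threading.Thread(target=writer, args=(t, 3))
           for t in ("tbl1", "tbl2", "tbl3")]   # N = 3
flushers = [threading.Thread(target=flusher) for _ in range(2)]  # M = 2

for t in writers + flushers:
    t.start()
for t in writers:
    t.join()
for _ in flushers:
    flush_queue.put(None)  # one sentinel per flusher
for t in flushers:
    t.join()
```

The queue gives the same property the pool promises: writers never block on the network, and flushes proceed concurrently on the M worker threads.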

Tasks

  • Review the API. Is this even a good idea?
  • Split out SenderPool into new questdb.ingress.pool module.
  • Improve test coverage, including TransactionalBuffer.dataframe(..).
  • Triple-check thread safety of the pool.next_buffer() and pool.flush() implementations.
  • Skip the indirection and call asyncio.wrap_future directly in def flush() (since that's how the event loop implements run_in_executor anyway: https://github.com/python/cpython/blob/8ad88984200b2ccddc0a08229dd2f4c14d1a71fc/Lib/asyncio/base_events.py#L896). This allows implementing .flush() in terms of .flush_to_future() and cuts code duplication.
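The asyncio.wrap_future idea in the last task can be sketched like this. The `flush_to_future` and `blocking_flush` bodies are hypothetical stand-ins; what is real is the bridge itself: executor.submit returns a concurrent.futures.Future, and asyncio.wrap_future turns it into an awaitable asyncio.Future without going through run_in_executor.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_flush() -> str:
    # Stand-in for the blocking network flush done on a worker thread.
    time.sleep(0.05)
    return "ok"

def flush_to_future(executor: ThreadPoolExecutor) -> asyncio.Future:
    # Submit the blocking work directly; wrap_future bridges the
    # concurrent.futures.Future to an asyncio.Future on the running loop.
    cf = executor.submit(blocking_flush)
    return asyncio.wrap_future(cf)

async def main() -> str:
    with ThreadPoolExecutor(max_workers=1) as executor:
        # An async flush() can then simply await the bridged future.
        return await flush_to_future(executor)

result = asyncio.run(main())
```

Note that asyncio.wrap_future must be called while an event loop is running (here, from inside the coroutine), which matches how a pool method invoked from async code would behave.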

Closes #64
