Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming IAsyncEnumerable<FluxRecord> and IAsyncEnumerable<string> #452

Open
adambajguz opened this issue Jan 11, 2023 · 1 comment
Open
Labels
enhancement New feature or request

Comments

@adambajguz
Copy link

Proposal/Current behavior:
Current IQueryApi doesn't allow for streaming raw results, i.e., results of type FluxRecord or string.

Desired behavior:

IAsyncEnumerable<T> QueryAsyncEnumerable<T>(string query, string org = null, CancellationToken cancellationToken = default(CancellationToken));

IAsyncEnumerable<T> QueryAsyncEnumerable<T>(Query query, string org = null, CancellationToken cancellationToken = default(CancellationToken));

When T is string or FluxRecord don't map the result to POCO.

Alternatives considered:
Add new methods that return IAsyncEnumerable<string> and IAsyncEnumerable<FluxRecord>.

Use case:
Current methods with delegates are not flexible enought (e.g. callbacks cannot use async code). Also its not always desired to map to POCO from performance reasons.

@powersj powersj added the enhancement New feature or request label Jan 25, 2023
@Kratheon
Copy link

Kratheon commented Sep 3, 2024

Hi,

You can try to create your own extension method using as a workaround Channels for buffers.
I was inspired by reading this stackOverflow ticket.

public static class QueryApiExtensions
{
    public static IAsyncEnumerable<FluxRecord> QueryAsyncEnumerable(this QueryApi queryApi,
        Query query, string? org = null, CancellationToken cancellationToken = default)
    {
       
       //Set up these options to be sure to not keep all previous data in the Channel. This might takes a lot of RAM.
       //The channel will only one record at a time.
       //The channel will recieve the next FluxRecord when a reader has read the previous one (and only one). 
        var options = new BoundedChannelOptions(1)
        {
            SingleWriter = true,
            SingleReader = true,
            FullMode = BoundedChannelFullMode.Wait
        };

        var buffer = Channel.CreateBounded<FluxRecord>(options);

        queryApi.QueryAsync(query,
            onNext: async (record) => await buffer.Writer.WriteAsync(record),
            onComplete: () => buffer.Writer.Complete(),
            onError: (e) => buffer.Writer.Complete(e),
            org: org,
            cancellationToken: cancellationToken);

        return buffer.Reader.ReadAllAsync(cancellationToken);
    }
}

Of course the best long-term solution will be to create a new pull request and create another QueryAsyncEnumerable function that takes not generic type, will not convert the record and send it back instead.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

3 participants