[FEATURE] maxTriggerDelay feature in Pulsar-Spark Connector #117
Labels
type/feature
Indicates new functionality
Comments
@syhily please have a look.
@keenborder786 This config request translates into implementing the requested trigger option. We will work on that during Q2.
@nlu90 Should I close the issue then?
@keenborder786 We can leave it open and close it when the implementation is done.
Okay, great.
@nlu90 Has there been any update regarding this, since Q2 has elapsed?
Is your feature request related to a problem? Please describe.
I am consuming data from Pulsar through Spark Structured Streaming in micro-batches.
Right now, Spark consumes messages as soon as they arrive in the Pulsar broker queue, i.e. a micro-batch gets executed as soon as messages arrive.
Describe the solution you'd like
However, I want to trigger a single micro-batch only if a certain number of messages have arrived in the queue, or if a specific time period has passed. This is possible in the Spark-Kafka connector with the following configurations:
Basically, it would be ideal if the pulsar-spark connector supported an equivalent config.
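For reference, the Kafka source options being described are likely Spark's `minOffsetsPerTrigger` and `maxTriggerDelay` (available for the Kafka source since Spark 3.2). A minimal sketch, assuming a local broker and a hypothetical topic name:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KafkaTriggerSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("kafka-trigger-sketch")
                .getOrCreate();

        // minOffsetsPerTrigger: wait until at least this many new offsets exist
        // maxTriggerDelay: but never wait longer than this before firing a batch
        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092") // assumption: local broker
                .option("subscribe", "my-topic")                     // hypothetical topic
                .option("minOffsetsPerTrigger", "10000")
                .option("maxTriggerDelay", "5m")
                .load();
    }
}
```

Together these two options give exactly the requested behavior on the Kafka side: a batch fires when the message threshold is reached, or when the delay expires, whichever comes first.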
Describe alternatives you've considered
I know that there is an option,
pulsar.reader.receiverQueueSize
which we can pass as follows. However, this only resolves one side of the problem, i.e. configuring the maximum size of the message queue. Even if we set this, the connector currently triggers a micro-batch as soon as new messages arrive. Ideally, the micro-batch execution should wait for a configured amount of time, provided the receiverQueueSize threshold has not been reached.
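For context, this option is passed through the connector's read options like any other reader config. A hedged sketch (the service URL and topic name are placeholders):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PulsarReceiverQueueSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("pulsar-receiver-queue-sketch")
                .getOrCreate();

        // pulsar.reader.receiverQueueSize caps how many messages the reader
        // buffers client-side; it does NOT delay when a micro-batch fires.
        Dataset<Row> df = spark.readStream()
                .format("pulsar")
                .option("service.url", "pulsar://localhost:6650") // assumption: local broker
                .option("topic", "my-topic")                      // hypothetical topic
                .option("pulsar.reader.receiverQueueSize", "5000")
                .load();
    }
}
```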
Further, I also know that this is possible in the Java client for Pulsar, on top of which the Pulsar-Spark connector is built, through its Batch Receive Policy.
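The Java client feature referred to here is `BatchReceivePolicy`, which blocks a `batchReceive()` call until a message count, a byte size, or a timeout is reached, whichever comes first. A sketch against a local broker with hypothetical topic and subscription names:

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClient;

public class BatchReceiveSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // assumption: local broker
                .build();

        // batchReceive() returns when any limit is hit: message count,
        // total bytes, or the timeout -- the same semantics requested
        // for the connector's micro-batch trigger.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("my-topic")            // hypothetical topic
                .subscriptionName("my-sub")   // hypothetical subscription
                .batchReceivePolicy(BatchReceivePolicy.builder()
                        .maxNumMessages(1000)
                        .maxNumBytes(10 * 1024 * 1024)
                        .timeout(30, TimeUnit.SECONDS)
                        .build())
                .subscribe();

        Messages<byte[]> batch = consumer.batchReceive();
        consumer.acknowledge(batch);
        consumer.close();
        client.close();
    }
}
```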
Additional context
I am using the following environment: