Kafka trigger timeout #93

Open
nikolicdragoslav opened this issue Sep 18, 2024 · 32 comments
Labels
area/plugin (Plugin-related issue or feature request) · bug (Something isn't working)

Comments

@nikolicdragoslav

Describe the issue

Hi,

I am having a weird issue when using the Kafka realtime plugin as a trigger for my workflow.

This is my trigger:

triggers:
  - id: kafkaTrigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    keyDeserializer: JSON
    groupId: "kestraConsumer"
    properties:
      auto.offset.reset: earliest
      bootstrap.servers: "{{envs.kafka_bootstrap_servers}}"
      sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
      sasl.jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required;
      sasl.mechanism: AWS_MSK_IAM
      security.protocol: SASL_SSL
    topic: test-topic
    valueDeserializer: JSON

The trigger works great for a couple of days, but at some point it stops polling the Kafka topic. The messages are there and can be read by other consumers; it just seems that Kestra somehow stops consuming them and consequently does not kick off any executions.

The workaround at the moment is to edit the flow code by simply adding or removing a blank line and saving it. Once saved, all missed messages are read at once and a bunch of executions start at the same time.

As of now, there doesn't seem to be any log that shows an error, either in Kafka or in Kestra; it just looks as if the trigger were disabled, while it is in fact enabled.

Is there some setting in Kafka or in Kestra that causes this timeout and needs to be altered? If not, is this simply a bug in the plugin?

Best,
Dragoslav

Environment

  • Kestra Version: 0.18.2
  • Operating System (OS/Docker/Kubernetes): Kubernetes
@nikolicdragoslav nikolicdragoslav added the bug Something isn't working label Sep 18, 2024
@github-project-automation github-project-automation bot moved this to Backlog in Issues Sep 18, 2024
@tchiotludo
Member

Can you provide the log from the scheduler, please?

@Ben8t Ben8t added the area/plugin Plugin-related issue or feature request label Sep 18, 2024
@Ben8t Ben8t transferred this issue from kestra-io/kestra Sep 18, 2024
@nikolicdragoslav
Author

Sure, I will provide a log when I encounter the issue, which should be in the next couple of days.

@nikolicdragoslav
Author

Hi @tchiotludo,

The issue happened again, here is the scheduler log:

2024-09-23 11:43:52,827 INFO  main         org.flywaydb.core.FlywayExecutor Database: jdbc:postgresql://url:5432/postgres (PostgreSQL 14.10)
2024-09-23 11:43:53,252 INFO  main         o.f.core.internal.command.DbValidate Successfully validated 20 migrations (execution time 00:00.172s)
2024-09-23 11:43:53,394 INFO  main         o.f.core.internal.command.DbMigrate Current version of schema "public": 1.21
2024-09-23 11:43:53,442 INFO  main         o.f.core.internal.command.DbMigrate Schema "public" is up to date. No migration necessary.
2024-09-23 11:44:04,945 INFO  scheduler    io.kestra.cli.AbstractCommand Starting Kestra with environments [k8s, cloud, cli]
2024-09-23 11:44:05,881 INFO  scheduler    i.kestra.core.plugins.PluginScanner Registered 78 core plugins (scan done in 795ms)
2024-09-23 11:44:14,092 INFO  scheduler    i.kestra.core.plugins.PluginScanner Registered 466 plugins from 95 groups (scan done in 8150ms)
2024-09-23 11:44:16,896 INFO  scheduler    io.kestra.cli.AbstractCommand Server Running: http://kestra-scheduler-56b9994b77-gm2xk:8080, Management server on port http://kestra-scheduler-56b9994b77-gm2xk:8081/health
2024-09-23 11:44:34,638 INFO  scheduler    i.k.c.c.servers.SchedulerCommand Scheduler started

@nikolicdragoslav
Author

I also took a scheduler log snapshot from when it was working, and it looks like this:

2024-09-12 09:59:37,766 INFO  main         org.flywaydb.core.FlywayExecutor Database: jdbc:postgresql://url:5432/postgres (PostgreSQL 14.10)
2024-09-12 09:59:38,478 INFO  main         o.f.core.internal.command.DbValidate Successfully validated 20 migrations (execution time 00:00.562s)
2024-09-12 09:59:38,508 INFO  main         o.f.core.internal.command.DbMigrate Current version of schema "public": 1.21
2024-09-12 09:59:38,639 INFO  main         o.f.core.internal.command.DbMigrate Schema "public" is up to date. No migration necessary.
2024-09-12 09:59:50,018 INFO  scheduler    io.kestra.cli.AbstractCommand Starting Kestra with environments [k8s, cloud, cli]
2024-09-12 09:59:51,197 INFO  scheduler    i.kestra.core.plugins.PluginScanner Registered 78 core plugins (scan done in 997ms)
2024-09-12 09:59:59,217 INFO  scheduler    i.kestra.core.plugins.PluginScanner Registered 466 plugins from 95 groups (scan done in 8007ms)
2024-09-12 10:00:01,959 INFO  scheduler    io.kestra.cli.AbstractCommand Server Running: http://kestra-scheduler-8555dd45d9-9x7xb:8080, Management server on port http://kestra-scheduler-8555dd45d9-9x7xb:8081/health
2024-09-12 10:00:16,224 INFO  scheduler    i.k.c.c.servers.SchedulerCommand Scheduler started
2024-09-13 00:00:02,166 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution 4kqbBIIdenwKP3rM5RpLhZ at '2024-09-13T00:00Z' started at '2024-09-13T00:00:02Z[Etc/UTC]'
2024-09-14 00:00:01,954 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution 7OJ9pJQIYOpCIil9yKzFK1 at '2024-09-14T00:00Z' started at '2024-09-14T00:00:01Z[Etc/UTC]'
2024-09-15 00:00:01,935 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution 77lD4hRQ8C8R15pwYNvflu at '2024-09-15T00:00Z' started at '2024-09-15T00:00:01Z[Etc/UTC]'
2024-09-16 00:00:01,935 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution K3CYv9o9FHWFnE3sX4h9O at '2024-09-16T00:00Z' started at '2024-09-16T00:00:01Z[Etc/UTC]'
2024-09-17 00:00:01,928 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution 1cFqwzYMtJisnPC3hvgRH at '2024-09-17T00:00Z' started at '2024-09-17T00:00:01Z[Etc/UTC]'
2024-09-18 00:00:01,923 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution 5CYcYKIcFbefmK1TbtZZqz at '2024-09-18T00:00Z' started at '2024-09-18T00:00:01Z[Etc/UTC]'
2024-09-18 00:00:01,955 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespace_sst] [trigger: daily_trigger] Scheduled execution 7N0rCt8emqKTfWZdIr7OPn at '2024-09-18T00:00Z' started at '2024-09-18T00:00:01Z[Etc/UTC]'
2024-09-19 00:00:01,924 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespaces] [trigger: daily_trigger] Scheduled execution 5ULFFwfGuHVIlKkue592J3 at '2024-09-19T00:00Z' started at '2024-09-19T00:00:01Z[Etc/UTC]'
2024-09-19 00:00:01,979 INFO  pool-4-thread-1 i.k.c.schedulers.AbstractScheduler [namespace: system] [flow: sync_namespace_sst] [trigger: daily_trigger] Scheduled execution 2kEo25r8LGsMrh9SEl1DWd at '2024-09-19T00:00Z' started at '2024-09-19T00:00:01Z[Etc/UTC]'

@tchiotludo
Member

Do you have the log from the executor, please?
The one from the scheduler didn't give any clue 😢

@nikolicdragoslav
Author

executor.txt
@tchiotludo here is the executor log; it seems like the triggers never switched on.

@nikolicdragoslav
Author

Maybe a side note: it seems that when the scheduler Kubernetes pod is recreated, it does not pick up the existing processes and does not start the triggers polling from Kafka; then, when the flows are manually edited, it starts them again.

@tchiotludo
Member

OK, so now I will need the worker log 😅
I was wondering if there is any error that blocked the scheduler.

@nikolicdragoslav
Author

nikolicdragoslav commented Sep 23, 2024

@tchiotludo We have three workers and the logs are quite extensive; they also contain some sensitive information. What should I be looking for?

@tchiotludo
Member

Since it's a bug, I don't know what the main issue could be.
Probably search for a stack trace if you don't have any other choice.

@nikolicdragoslav
Author

Hi @tchiotludo,

I found lines like:

kestra-worker-docker-dind
time="2024-09-24T08:15:28.897147549Z" level=error msg="loading cgroup for 1606" error="cgroups: cgroup deleted"
kestra-worker-docker-dind
time="2024-09-24T08:15:28.936165515Z" level=error msg="loading cgroup for 1606" error="cgroups: cgroup deleted"

As a result, I have an empty execution with 0s of runtime and no trigger details.

In the worker logs I also see:

kestra-worker
2024-09-24 08:08:44,809 ERROR worker_318  f.cdmOrchestratorKafka.kafkaTrigger [namespace: orchestrators] [flow: cdmOrchestratorKafka] [execution: 6mQ4zhZ8Itsv2r2eya78ol] [date: 2024-09-24T00:00:43.448Z] Realtime trigger failed to be created in the worker with error: unknown

Now, these log lines appear when I save the flow with an added blank line; after that everything works properly.

Otherwise, while the executions were halted, there is no error, no warning, or in fact any log at all for the affected flows.

To me it looks like the realtime triggers are not restarted after a pod failure, but I have one more idea that could potentially play into this. We are also using io.kestra.plugin.git.SyncFlows, scheduled every day at midnight, which copies flow files from Git to Kestra (a rough sketch follows below). Maybe that interferes with the realtime triggers and causes some mismatch somewhere. I noticed the 0s executions happening after the sync as well.
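
For context, here is a rough sketch of what such a daily sync flow looks like. The flow and trigger ids match the sync_namespaces / daily_trigger entries visible in the scheduler log above, but the repository URL, credentials, and the exact SyncFlows properties shown here are placeholders for illustration, not our real configuration:

id: sync_namespaces
namespace: system

tasks:
  - id: sync
    type: io.kestra.plugin.git.SyncFlows
    # placeholder repository and credentials, for illustration only
    url: https://github.com/example-org/kestra-flows
    branch: main
    username: git_user
    password: "{{ secret('GIT_TOKEN') }}"
    targetNamespace: orchestrators
    includeChildNamespaces: true

triggers:
  - id: daily_trigger
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 * * *"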

Looking forward to hearing from you; let me know if you need any additional information.

Thanks

@nikolicdragoslav
Author

Hi @tchiotludo,

any ideas how to proceed?

Best,
Dragoslav

@romangoldberg

Hi @tchiotludo, we are planning to go to production soon; could you please assist us with this bug?

@tchiotludo
Member

As I see it, we need a full stack trace to understand this; an unknown exception doesn't help. The trace should be on the Flows > Logs page.

@nikolicdragoslav
Author

Hi @tchiotludo ,

as I explained earlier, there is no error or stack trace anywhere, nor any kind of log, not even an info log, related to the affected flows.

It seems that the Kafka realtime trigger gets disabled at some point and Kestra stops polling for new messages. It looks like some kind of timeout somewhere that is affecting Kestra as a consumer.

@tchiotludo
Member

Realtime trigger failed to be created in the worker with error: unknown

you are here: https://github.com/kestra-io/kestra/blob/7b73eed06830d7d13cc7e6c6ca57d9fe8eee2369/core/src/main/java/io/kestra/core/runners/Worker.java#L419-L430

You could increase the log level to capture it at TRACE level.
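
For what it's worth, a minimal sketch of how the log level could be raised in the Kestra configuration (Kestra is Micronaut-based, so this assumes the usual logger.levels overrides apply); the package names below are illustrative guesses covering the worker runner and the Kafka plugin:

logger:
  levels:
    # raise worker/trigger logging to TRACE (assumed package names)
    io.kestra.core.runners: TRACE
    io.kestra.plugin.kafka: TRACE
    # optionally surface the Kafka client internals as well
    org.apache.kafka: DEBUG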

@nikolicdragoslav
Author

@tchiotludo I will try it and get back to you, thanks

@Ben8t Ben8t added the kind/pending-feedback Idea waiting for user feedback label Nov 13, 2024
@anna-geller
Member

@nikolicdragoslav does the issue persist?

@cedrelek

cedrelek commented Nov 28, 2024

@nikolicdragoslav
Here is a hint (it needs more investigation to be a solution): have a look at the Kafka property max.poll.interval.ms, which sometimes has an Int.MAX value by default, and try setting it to 5 minutes in the Kafka trigger's "properties" (see the sketch below).
Since it is what decides, on the client side, that a consumer is out of service (unlike session.timeout.ms, which is server-side), your Kafka consumer pool might take ages (Int.MAX milliseconds) before considering your client consumer out of service, and therefore before the pool initiates a new consumer. The question is, if my intuition is right: why does a Git sync seem to affect this (as seems to be the case in my Kestra too)?
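
A minimal sketch of what that would look like on the trigger from this issue, assuming max.poll.interval.ms is passed through the properties map like the other consumer settings (300000 ms being the suggested 5 minutes):

triggers:
  - id: kafkaTrigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    # ... keep the existing deserializers, groupId, topic and SASL settings ...
    properties:
      auto.offset.reset: earliest
      # assumed passthrough consumer property; 300000 ms = 5 minutes
      max.poll.interval.ms: "300000"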

@nikolicdragoslav
Author

@cedrelek @anna-geller

Hi guys, sorry for the late reply.

We ended up disabling the sync workflow and the issue did not show up at first, but recently it is back again.

Here are some points:

  1. Kestra completely stops consuming messages from Kafka, while the trigger does not error out.
  2. When the flow is altered with a blank line and saved again, the first thing that shows up is an execution of the workflow with 0s duration (in my opinion that is the bugged execution, though I'm not exactly sure why).
  3. Then consuming resumes from the same point where it stopped and triggers a bunch of executions at the same time (one execution for each missed message).
  4. It works again for some time, but then randomly stops polling.

I can try playing with max.poll.interval.ms and see what that gives, but since this has been an ongoing issue for quite some time and it is preventing us from going to production, would it be possible to have a huddle on Slack and try to debug the issue?

Thanks,
Dragoslav

@anna-geller anna-geller removed the kind/pending-feedback Idea waiting for user feedback label Dec 10, 2024
@anna-geller
Member

@nikolicdragoslav can you share the flow, your kestra version and any extra details that could help us reproduce? thx a lot!

@nikolicdragoslav
Author

@anna-geller of course, here is one of the workflows for which Kestra stops polling the Kafka topic.

id: OrchestratorKafka
namespace: orchestrators

triggers:
  - id: kafkaTrigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    keyDeserializer: JSON
    groupId: "kestraConsumer"
    properties:
      auto.offset.reset: earliest
      bootstrap.servers: "{{envs.kafka_bootstrap_servers}}"
      sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
      sasl.jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required;
      sasl.mechanism: AWS_MSK_IAM
      security.protocol: SASL_SSL 
      max.request.size: "2097152"
    topic: test-topic
    valueDeserializer: JSON

tasks:
  - id: printEvent
    type: io.kestra.plugin.core.log.Log
    message: "{{trigger.value}}" 
 
  - id: get_event_type
    type: io.kestra.plugin.transform.jsonata.TransformValue
    from: "{{ trigger.value }}"
    expression: |
        {
            "event_type": '' & event_type
        }.event_type

  - id: check_action_type
    type: io.kestra.plugin.core.flow.If
    condition: "{{ outputs.get_event_type.value == 'delete'}}"
    then:
      - id: parallelDelete
        type: io.kestra.plugin.core.flow.Parallel
        tasks:
        - id: startStatusCheckerDelete
          type: io.kestra.plugin.core.flow.Subflow
          namespace: utility
          flowId: statusCheckerPy
          inputs:
            execution_id: "{{execution.id}}"
            payload: "{{trigger.value}}"
          wait: false
          transmitFailed: true
        - id: startDelete
          type: io.kestra.plugin.core.flow.Subflow
          namespace: delete
          flowId: Delete
          inputs:
            payload: "{{trigger.value}}"
          wait: true
          transmitFailed: true
    else:
      - id: parallel
        type: io.kestra.plugin.core.flow.Parallel
        tasks:
          - id: startStatusChecker
            type: io.kestra.plugin.core.flow.Subflow
            namespace: utility
            flowId: statusCheckerPy
            inputs:
              execution_id: "{{execution.id}}"
              payload: "{{trigger.value}}"
            wait: false
            transmitFailed: true
          - id: Generator
            type: io.kestra.plugin.core.flow.Subflow
            namespace: generators
            flowId: GeneratorKafka
            inputs:
              payload: "{{trigger.value}}"
            wait: true
            transmitFailed: true
            

Regarding the version, it happened with:
Kestra Version: 0.18.2
Operating System (OS/Docker/Kubernetes): Kubernetes

but we are using a newer one at the moment, with the same issues:
Kestra Version: 0.19.11
Operating System (OS/Docker/Kubernetes): Kubernetes

Here is a screenshot of the 0s execution, taken when I create a new revision of the flow in order for the trigger to start running again:
(screenshot attached)

The last working execution before the stoppage has revision 225, the faulty one also has revision 225, and when I created a new revision, 226, it started working properly.

The faulty execution with 0s doesn't have any logs in the Gantt or Logs tabs; also, what is weird is that the Trigger section is empty.
(screenshot attached)

Normal executions have the Trigger section populated with the proper event from the Kafka topic.

@tchiotludo
Member

tchiotludo commented Dec 11, 2024

Can you look at the logs on the server at the time the execution failed, please?
That's where you will find the main information that will allow us to track down the issue.

@nikolicdragoslav
Author

Hi @tchiotludo,

I looked into the logs at the time when I had the faulty execution and this is what I see shortly before the failure. I have masked the broker and IP information from the stack trace.

Just an FYI, we are using an MSK cluster on AWS.

2024-12-09 11:13:58 | 2024-12-09T10:13:57.952132621Z stdout F 2024-12-09 10:13:57,951 INFO  WorkerThread o.a.kafka.common.metrics.Metrics Metrics scheduler closed
2024-12-09 11:13:58 | 2024-12-09T10:13:57.952186838Z stdout F 2024-12-09 10:13:57,952 INFO  WorkerThread o.a.kafka.common.metrics.Metrics Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-12-09 11:13:58 | 2024-12-09T10:13:57.952733136Z stdout F 2024-12-09 10:13:57,952 INFO  WorkerThread o.a.kafka.common.metrics.Metrics Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
2024-12-09 11:13:58 | 2024-12-09T10:13:57.952762055Z stdout F 2024-12-09 10:13:57,952 DEBUG WorkerThread o.a.k.c.t.i.ClientTelemetryReporter Stopping ClientTelemetryReporter
2024-12-09 11:13:58 | 2024-12-09T10:13:57.952768394Z stdout F 2024-12-09 10:13:57,952 DEBUG WorkerThread o.a.k.c.t.i.ClientTelemetryReporter close telemetry sender for client telemetry reporter instance
2024-12-09 11:13:58 | 2024-12-09T10:13:57.953084955Z stdout F 2024-12-09 10:13:57,952 DEBUG WorkerThread o.a.k.c.t.i.ClientTelemetryReporter Setting telemetry state from SUBSCRIPTION_NEEDED to TERMINATED
2024-12-09 11:13:58 | 2024-12-09T10:13:57.953103469Z stdout F 2024-12-09 10:13:57,952 INFO  WorkerThread o.a.kafka.common.metrics.Metrics Metrics reporters closed
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955531644Z stdout F 2024-12-09 10:13:57,955 DEBUG WorkerThread o.a.k.c.network.SslTransportLayer [SslTransportLayer channelId=1 key=channel=java.nio.channels.SocketChannel[connection-pending remote=xxxx/x.x.x.x:xxxx], selector=sun.nio.ch.EPollSelectorImpl@2b4c61cb, interestOps=8, readyOps=0] SSLEngine.closeInBound() raised an exception.
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955560769Z stdout F javax.net.ssl.SSLException: closing inbound before receiving peer's close_notify
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955576277Z stdout F 	at java.base/sun.security.ssl.SSLEngineImpl.closeInbound(Unknown Source)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955581776Z stdout F 	at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:203)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955586224Z stdout F 	at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:1047)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955592411Z stdout F 	at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:155)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955597048Z stdout F 	at org.apache.kafka.common.network.Selector.doClose(Selector.java:956)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955602171Z stdout F 	at org.apache.kafka.common.network.Selector.close(Selector.java:940)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955606913Z stdout F 	at org.apache.kafka.common.network.Selector.close(Selector.java:886)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955611861Z stdout F 	at org.apache.kafka.common.network.Selector.lambda$null$0(Selector.java:368)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955616221Z stdout F 	at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:1171)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955620464Z stdout F 	at org.apache.kafka.common.utils.Utils.closeAllQuietly(Utils.java:1186)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955625211Z stdout F 	at org.apache.kafka.common.network.Selector.close(Selector.java:367)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955629557Z stdout F 	at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:678)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955635122Z stdout F 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.close(ConsumerNetworkClient.java:548)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955639608Z stdout F 	at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:1171)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955644261Z stdout F 	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1153)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955648924Z stdout F 	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1103)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955654179Z stdout F 	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1091)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955658765Z stdout F 	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1757)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955663317Z stdout F 	at io.kestra.plugin.kafka.RealtimeTrigger.lambda$publisher$1(RealtimeTrigger.java:131)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955667725Z stdout F 	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955672963Z stdout F 	at reactor.core.publisher.Flux.subscribe(Flux.java:8848)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955677387Z stdout F 	at reactor.core.publisher.Flux.blockLast(Flux.java:2816)
2024-12-09 11:13:58 | 2024-12-09T10:13:57.955690295Z stdout F 	at io.kestra.core.runners.WorkerTriggerRealtimeThread.doRun(WorkerTriggerRealtimeThread.java:55)

Do you have any idea what could be causing this and how to resolve it?

Thanks,
Dragoslav

@tchiotludo
Member

The error is expected, and it doesn't give me any clue; it seems to be a transient one. The fact is that it should not prevent future executions.

Are these logs coming from Kestra? The format doesn't seem to be ours.

@nikolicdragoslav
Author

nikolicdragoslav commented Dec 12, 2024

@tchiotludo the logs are from Grafana, but the Kestra pods are being scraped; the only thing I can see in the Kestra logs in the UI is:

[namespace: orchestrators] [flow: OrchestratorKafka] [execution: 3P0nFIANtf5O6IznM1HEjn] [date: 2024-12-10T09:33:54.318Z] Realtime trigger failed to be created in the worker with error: unknown

If you disregard this part of the lines, for example:
2024-12-09 11:13:58 | 2024-12-09T10:13:57.952132621Z stdout F

you will see the format is exactly that of the Kestra logs.

@nikolicdragoslav
Author

To help you piece it together, this is what happens shortly before the logs I already shared:

2024-12-09T10:13:57.971079459Z stdout F 2024-12-09 10:13:57,970 DEBUG WorkerThread o.a.k.c.s.a.SaslClientAuthenticator [Consumer clientId=consumer-kestraConsumer-21, groupId=kestraConsumer] Set SASL client state to INTERMEDIATE
2024-12-09T10:13:58.009064132Z stdout F 2024-12-09 10:13:58,008 DEBUG WorkerThread o.a.k.c.s.a.SaslClientAuthenticator [Consumer clientId=consumer-kestraConsumer-21, groupId=kestraConsumer] Set SASL client state to COMPLETE
2024-12-09T10:13:58.018679013Z stdout F 2024-12-09 10:13:58,017 DEBUG WorkerThread o.a.k.c.s.a.SaslClientAuthenticator [Consumer clientId=consumer-kestraConsumer-21, groupId=kestraConsumer] Finished authentication with session expiration in 3598995 ms and session re-authentication on or after 3300056 ms
2024-12-09T10:13:58.024323882Z stdout F 2024-12-09 10:13:58,020 DEBUG WorkerThread o.a.kafka.common.network.Selector [Consumer clientId=consumer-kestraConsumer-21, groupId=kestraConsumer] Successfully authenticated with xxxx/x.x.x.x
2024-12-09T10:13:58.025009151Z stdout F 2024-12-09 10:13:58,024 DEBUG WorkerThread o.apache.kafka.clients.NetworkClient [Consumer clientId=consumer-kestraConsumer-21, groupId=kestraConsumer] Initiating API versions fetch from node -2.
2024-12-09T10:13:58.025242641Z stdout F 2024-12-09 10:13:58,025 DEBUG WorkerThread o.apache.kafka.clients.NetworkClient [Consumer clientId=consumer-kestraConsumer-21, groupId=kestraConsumer] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=consumer-kestraConsumer-21, correlationId=1, headerVersion=2) and timeout 30000 to node -2: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.7.1')
2024-12-09T10:13:57.943821589Z stdout F 2024-12-09 10:13:57,943 DEBUG WorkerThread o.apache.kafka.clients.NetworkClient [Consumer clientId=consumer-kestraConsumer-1, groupId=kestraConsumer] Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-kestraConsumer-1, correlationId=1009210, headerVersion=2): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1751902476, responses=[], nodeEndpoints=[])
2024-12-09T10:13:57.945043027Z stdout F 2024-12-09 10:13:57,944 DEBUG WorkerThread o.a.k.clients.FetchSessionHandler [Consumer clientId=consumer-kestraConsumer-1, groupId=kestraConsumer] Node 2 sent a full fetch response that created a new incremental fetch session 1751902476 with 0 response partition(s)
2024-12-09T10:13:57.945125719Z stdout F 2024-12-09 10:13:57,944 DEBUG WorkerThread o.a.k.c.c.internals.AbstractFetch [Consumer clientId=consumer-kestraConsumer-1, groupId=kestraConsumer] Removing pending request for fetch session: 1751902476 for node:  xxxx:xxxx (id: 2 rack: euc1-az2)
2024-12-09T10:13:57.946823718Z stdout F 2024-12-09 10:13:57,946 DEBUG WorkerThread o.apache.kafka.clients.NetworkClient [Consumer clientId=consumer-kestraConsumer-1, groupId=kestraConsumer] Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-kestraConsumer-1, correlationId=1009212, headerVersion=2): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=0, responses=[], nodeEndpoints=[])
2024-12-09T10:13:57.946905932Z stdout F 2024-12-09 10:13:57,946 DEBUG WorkerThread o.a.k.c.c.internals.AbstractFetch [Consumer clientId=consumer-kestraConsumer-1, groupId=kestraConsumer] Removing pending request for fetch session: 1751902476 for node: xxxx:xxxx (id: 2 rack: euc1-az2)
2024-12-09T10:13:57.947679178Z stdout F 2024-12-09 10:13:57,947 DEBUG WorkerThread o.a.k.c.c.internals.AbstractFetch [Consumer clientId=consumer-kestraConsumer-1, groupId=kestraConsumer] Successfully sent a close message for fetch session: 1751902476 to node:  xxxx/xxxx (id: 2 rack: euc1-az2)
2024-12-09T10:13:57.948254496Z stdout F 2024-12-09 10:13:57,948 DEBUG WorkerThread o.a.k.c.c.internals.FetchBuffer [Consumer clientId=consumer-kestraConsumer-1, groupId=kestraConsumer] Removing sdp-generate-event-0 from buffered fetch data as it is not in the set of partitions to retain ([])
2024-12-09T10:13:57.949387749Z stdout F 2024-12-09 10:13:57,949 DEBUG WorkerThread o.a.k.c.t.i.KafkaMetricsCollector removing kafka metric : MetricName [name=last-poll-seconds-ago, group=consumer-metrics, description=The number of seconds since the last poll() invocation., tags={client-id=consumer-kestraConsumer-1}]
2024-12-09T10:13:57.950200988Z stdout F 2024-12-09 10:13:57,950 DEBUG WorkerThread o.a.k.c.t.i.KafkaMetricsCollector removing kafka metric : MetricName [name=time-between-poll-avg, group=consumer-metrics, description=The average delay between invocations of poll() in milliseconds., tags={client-id=consumer-kestraConsumer-1}]
2024-12-09T10:13:57.950513609Z stdout F 2024-12-09 10:13:57,950 DEBUG WorkerThread o.a.k.c.t.i.KafkaMetricsCollector removing kafka metric : MetricName [name=time-between-poll-max, group=consumer-metrics, description=The max delay between invocations of poll() in milliseconds., tags={client-id=consumer-kestraConsumer-1}]
2024-12-09T10:13:57.950895231Z stdout F 2024-12-09 10:13:57,950 DEBUG WorkerThread o.a.k.c.t.i.KafkaMetricsCollector removing kafka metric : MetricName [name=poll-idle-ratio-avg, group=consumer-metrics, description=The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records., tags={client-id=consumer-kestraConsumer-1}]
2024-12-09T10:13:57.951508174Z stdout F 2024-12-09 10:13:57,951 DEBUG WorkerThread o.a.k.c.t.i.KafkaMetricsCollector removing kafka metric : MetricName [name=commit-sync-time-ns-total, group=consumer-metrics, description=The total time the consumer has spent in commitSync in nanoseconds, tags={client-id=consumer-kestraConsumer-1}]
2024-12-09T10:13:57.951864383Z stdout F 2024-12-09 10:13:57,951 DEBUG WorkerThread o.a.k.c.t.i.KafkaMetricsCollector removing kafka metric : MetricName [name=committed-time-ns-total, group=consumer-metrics, description=The total time the consumer has spent in committed in nanoseconds, tags={client-id=consumer-kestraConsumer-1}]
2024-12-09T10:13:57.952132621Z stdout F 2024-12-09 10:13:57,951 INFO  WorkerThread o.a.kafka.common.metrics.Metrics Metrics scheduler closed
2024-12-09T10:13:57.952186838Z stdout F 2024-12-09 10:13:57,952 INFO  WorkerThread o.a.kafka.common.metrics.Metrics Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-12-09T10:13:57.952733136Z stdout F 2024-12-09 10:13:57,952 INFO  WorkerThread o.a.kafka.common.metrics.Metrics Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
2024-12-09T10:13:57.952762055Z stdout F 2024-12-09 10:13:57,952 DEBUG WorkerThread o.a.k.c.t.i.ClientTelemetryReporter Stopping ClientTelemetryReporter
2024-12-09T10:13:57.952768394Z stdout F 2024-12-09 10:13:57,952 DEBUG WorkerThread o.a.k.c.t.i.ClientTelemetryReporter close telemetry sender for client telemetry reporter instance
2024-12-09T10:13:57.953084955Z stdout F 2024-12-09 10:13:57,952 DEBUG WorkerThread o.a.k.c.t.i.ClientTelemetryReporter Setting telemetry state from SUBSCRIPTION_NEEDED to TERMINATED
2024-12-09T10:13:57.953103469Z stdout F 2024-12-09 10:13:57,952 INFO  WorkerThread o.a.kafka.common.metrics.Metrics Metrics reporters closed
2024-12-09T10:13:57.955531644Z stdout F 2024-12-09 10:13:57,955 DEBUG WorkerThread o.a.k.c.network.SslTransportLayer [SslTransportLayer channelId=1 key=channel=java.nio.channels.SocketChannel[connection-pending remote= xxxx/x.x.x.x:xxxx], selector=sun.nio.ch.EPollSelectorImpl@2b4c61cb, interestOps=8, readyOps=0] SSLEngine.closeInBound() raised an exception.
2024-12-09T10:13:57.955560769Z stdout F javax.net.ssl.SSLException: closing inbound before receiving peer's close_notify

@tchiotludo
Member

So it's an invalid SSL connection; could you try again with the latest version, please?
I see some improvements in resiliency for realtime triggers.

@nikolicdragoslav
Author

@tchiotludo would you mind providing the tag from
https://hub.docker.com/r/kestra/kestra/tags

I can try doing it.

Thanks

@tchiotludo
Member

latest is the perfect one

@nikolicdragoslav
Author

nikolicdragoslav commented Dec 13, 2024

@tchiotludo I will get back to you after some testing with the new version, thank you

@nikolicdragoslav
Author

@tchiotludo it seems to behave better; I will wait for some time and monitor it closely before closing the ticket. Just for the record, the latest tag is running v0.20.6, as I can see in the UI.
