Exception When Running AzureServiceBusSinkConnector With Premium Azure Service Bus #1446

Open
dallinb opened this issue Oct 24, 2024 · 4 comments

dallinb commented Oct 24, 2024

Issue Guidelines

Please review these questions before submitting an issue.

What version of the Stream Reactor are you reporting this issue for?

8.1.11

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

AFAIK

Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?

Kafka is running in Docker Compose as the data source, as this is a proof of concept/prototype. The error also does not occur when using the same rig but deploying to a Standard Azure Service Bus.

Have you read the docs?

Yes

What is the expected behaviour?

Messages should be transferred from Kafka to Azure Service Bus (this works when the Service Bus tier is "Standard").

What was observed?

When the Azure Service Bus tier is "Premium", we get the following error status from Kafka Connect:

{
  "name": "AzureServiceBusSinkConnector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect-dev:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "connect-dev:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: io.lenses.streamreactor.connect.azure.servicebus.sink.ServiceBusSendingException: Number of retries exhausted. Cause:\n\tat io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.lambda$sendMessages$4(TaskToSenderBridge.java:153)\n\tat java.base/java.util.Optional.map(Optional.java:260)\n\tat io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.lambda$sendMessages$5(TaskToSenderBridge.java:153)\n\tat java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273)\n\tat java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1850)\n\tat java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)\n\tat java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)\n\tat java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)\n\tat java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)\n\tat java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)\n\tat io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.sendMessages(TaskToSenderBridge.java:156)\n\tat io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkTask.put(AzureServiceBusSinkTask.java:80)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 11 more\nCaused by: com.azure.messaging.servicebus.ServiceBusException: The link 'G6:19770353:vault.api.v1.accounts.account.createdvault.api.v1.accounts.account.created' is force detached by the broker because publisher(link163) received a batch message with no data in it. 
Detach origin: Publisher.\n\tat com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.mapError(ServiceBusSenderAsyncClient.java:953)\n\tat reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3811)\n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)\n\tat reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)\n\tat reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)\n\tat reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)\n\tat reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)\n\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:415)\n\tat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)\n\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)\n\tat reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)\n\tat reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)\n\tat reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)\n\tat reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)\n\tat reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)\n\tat reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)\n\tat reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onError(FluxTimeout.java:219)\n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)\n\tat reactor.core.publisher.Operators.error(Operators.java:198)\n\tat reactor.core.publisher.MonoError.subscribe(MonoError.java:53)\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:4491)\n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)\n\tat reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)\n\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)\n\tat reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)\n\tat com.azure.core.amqp.implementation.RetriableWorkItem.error(RetriableWorkItem.java:76)\n\tat com.azure.core.amqp.implementation.ReactorSender.cleanupFailedSend(ReactorSender.java:715)\n\tat com.azure.core.amqp.implementation.ReactorSender.processDeliveredMessage(ReactorSender.java:672)\n\tat reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)\n\tat reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)\n\tat reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)\n\tat reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)\n\tat reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)\n\tat com.azure.core.amqp.implementation.handler.SendLinkHandler.onDelivery(SendLinkHandler.java:205)\n\tat 
org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:185)\n\tat org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)\n\tat org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)\n\tat org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)\n\tat com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)\n\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\n\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\t... 3 more\n\tSuppressed: java.lang.Exception: #block terminated with an error\n\t\tat reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:141)\n\t\tat reactor.core.publisher.Mono.block(Mono.java:1766)\n\t\tat com.azure.messaging.servicebus.ServiceBusSenderClient.sendMessages(ServiceBusSenderClient.java:296)\n\t\tat io.lenses.streamreactor.connect.azure.servicebus.sink.ServiceBusSenderFacade.lambda$submitBatch$1(ServiceBusSenderFacade.java:99)\n\t\tat cyclops.control.Try.runWithCatch(Try.java:889)\n\t\tat io.lenses.streamreactor.connect.azure.servicebus.sink.ServiceBusSenderFacade.submitBatch(ServiceBusSenderFacade.java:99)\n\t\tat io.lenses.streamreactor.connect.azure.servicebus.sink.ServiceBusSenderFacade.sendMessages(ServiceBusSenderFacade.java:91)\n\t\tat io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.lambda$sendMessages$5(TaskToSenderBridge.java:145)\n\t\tat java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273)\n\t\tat java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1850)\n\t\tat java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)\n\t\tat java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)\n\t\tat java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)\n\t\tat java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)\n\t\tat java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)\n\t\tat io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.sendMessages(TaskToSenderBridge.java:156)\n\t\tat io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkTask.put(AzureServiceBusSinkTask.java:80)\n\t\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\t\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\t\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\t\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\t\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\t\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\t\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\t\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\t\t... 
3 more\nCaused by: java.lang.UnsupportedOperationException: The link 'G6:19770353:vault.api.v1.accounts.account.createdvault.api.v1.accounts.account.created' is force detached by the broker because publisher(link163) received a batch message with no data in it. Detach origin: Publisher.\n\tat com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:78)\n\tat com.azure.core.amqp.implementation.ReactorSender.processDeliveredMessage(ReactorSender.java:650)\n\t... 18 more\n"
    }
  ],
  "type": "sink"
}
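
The stack trace shows the failure surfacing from ServiceBusSenderClient.sendMessages inside the connector's ServiceBusSenderFacade.submitBatch, and the broker complains that it "received a batch message with no data in it". Below is a minimal sketch of the suspected trigger, assuming the standard azure-messaging-servicebus SDK (the topic name is taken from the logs above; this illustrates the failure mode, not the connector's actual code):

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessageBatch;
import com.azure.messaging.servicebus.ServiceBusSenderClient;

public class EmptyBatchRepro {
    public static void main(String[] args) {
        // Connection string redacted, as in the connector config below.
        ServiceBusSenderClient sender = new ServiceBusClientBuilder()
                .connectionString(System.getenv("SERVICEBUS_CONNECTION_STRING"))
                .sender()
                .topicName("vault.api.v1.accounts.account.created")
                .buildClient();

        // Create a batch but never add a message to it. Sending it as-is is
        // the suspected cause of the broker-side "batch message with no data
        // in it" link detach seen above on the Premium tier.
        ServiceBusMessageBatch emptyBatch = sender.createMessageBatch();
        sender.sendMessages(emptyBatch);

        sender.close();
    }
}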

What is your Connect cluster configuration (connect-avro-distributed.properties)?

services:
  connect-dev:
    build:
      args:
        - STREAM_REACTOR_VERSION=8.1.11
      context: kafka-connect
    depends_on:
      kafka-kc:
        condition: service_healthy
      kafka-tm-dev:
        condition: service_healthy
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka-kc:9092
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: -1
      CONNECT_CONFIG_STORAGE_TOPIC: dev.connect.config
      CONNECT_GROUP_ID: kc_dev
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: -1
      CONNECT_OFFSET_STORAGE_TOPIC: dev.connect.storage
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN"
      CONNECT_LOG4J_ROOT_LOGLEVEL: ERROR
      CONNECT_MAX_REQUEST_SIZE: 4194352
      CONNECT_PLUGIN_PATH: "/usr/share/java/,/usr/share/confluent-hub-components/,/usr/local/lib/connectors"
      CONNNECT_PRODUCER_BATCH_SIZE: 4194352
      CONNECT_PRODUCER_MAX_REQUEST_SIZE: 4194352
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-dev
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: -1
      CONNECT_STATUS_STORAGE_TOPIC: dev.connect.status
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
    healthcheck:
      test: curl --fail localhost:8083/connectors
      interval: 10s
      timeout: 30s
      retries: 6
      start_period: 20s
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - "8083:8083"
    volumes:
      - "./connectors:/mnt/connectors:ro"

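For reference, the Confluent cp-kafka-connect image derives each worker property from the corresponding CONNECT_ environment variable (CONNECT_FOO_BAR becomes foo.bar), so the equivalent connect-distributed.properties would look roughly like this subset (values copied from the compose file above; a sketch of the mapping, not a full worker config):

bootstrap.servers=kafka-kc:9092
group.id=kc_dev
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
config.storage.topic=dev.connect.config
config.storage.replication.factor=-1
offset.storage.topic=dev.connect.storage
offset.storage.replication.factor=-1
status.storage.topic=dev.connect.status
status.storage.replication.factor=-1
plugin.path=/usr/share/java/,/usr/share/confluent-hub-components/,/usr/local/lib/connectors
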
What is your connector properties configuration (my-connector.properties)?

{
    "name": "AzureServiceBusSinkConnector",
    "config": {
      "consumer.override.bootstrap.servers": "kafka-tm-dev:9093",
      "config.action.reload": "restart",
      "connect.servicebus.connection.string": "Endpoint=sb://sbns-sbus-poc-uks-001.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=REDACTED",
      "connect.servicebus.kcql": "INSERT INTO `vault.api.v1.accounts.account.created` SELECT * FROM `vault.api.v1.accounts.account.created` BATCH=1 PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.accounts.account.created.failures` SELECT * FROM `vault.api.v1.accounts.account.created.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.accounts.account.instance_param_vals.updated` SELECT * FROM `vault.api.v1.accounts.account.instance_param_vals.updated` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.accounts.account.instance_param_vals.updated.failures` SELECT * FROM `vault.api.v1.accounts.account.instance_param_vals.updated.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.accounts.account.status.updated` SELECT * FROM `vault.api.v1.accounts.account.status.updated` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.accounts.account.status.updated.failures` SELECT * FROM `vault.api.v1.accounts.account.status.updated.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.action_logs.action_log.created` SELECT * FROM `vault.api.v1.action_logs.action_log.created` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.action_logs.action_log.created.failures` SELECT * FROM `vault.api.v1.action_logs.action_log.created.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.audit_logs.audit_log.created` SELECT * FROM `vault.api.v1.audit_logs.audit_log.created` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.audit_logs.audit_log.created.failures` SELECT * FROM `vault.api.v1.audit_logs.audit_log.created.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.customers.customer.created` SELECT * FROM `vault.api.v1.customers.customer.created` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.customers.customer.created.failures` SELECT * FROM `vault.api.v1.customers.customer.created.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.customers.customer.customer_details.updated` SELECT * FROM `vault.api.v1.customers.customer.customer_details.updated` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.customers.customer.customer_details.updated.failures` SELECT * FROM `vault.api.v1.customers.customer.customer_details.updated.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.postings.posting_instruction_batch.created` SELECT * FROM `vault.api.v1.postings.posting_instruction_batch.created` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.postings.posting_instruction_batch.created.contracts_bridge.dlq` SELECT * FROM `vault.api.v1.postings.posting_instruction_batch.created.contracts_bridge.dlq` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.postings.posting_instruction_batch.created.failures` SELECT * FROM `vault.api.v1.postings.posting_instruction_batch.created.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.postings.posting_instruction_batch.created.migrated_postings_bridge.dlq` SELECT * FROM `vault.api.v1.postings.posting_instruction_batch.created.migrated_postings_bridge.dlq` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.products.product_version.created` SELECT * FROM `vault.api.v1.products.product_version.created` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.products.product_version.created.failures` SELECT * FROM 
`vault.api.v1.products.product_version.created.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.products.product_version.parameter.updated` SELECT * FROM `vault.api.v1.products.product_version.parameter.updated` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.products.product_version.parameter.updated.failures` SELECT * FROM `vault.api.v1.products.product_version.parameter.updated.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.workflows.workflow_instance.create.requests` SELECT * FROM `vault.api.v1.workflows.workflow_instance.create.requests` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.workflows.workflow_instance.create.requests.failures` SELECT * FROM `vault.api.v1.workflows.workflow_instance.create.requests.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.workflows.workflow_instance.external_operation.requests` SELECT * FROM `vault.api.v1.workflows.workflow_instance.external_operation.requests` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.workflows.workflow_instance.external_operation.requests.failures` SELECT * FROM `vault.api.v1.workflows.workflow_instance.external_operation.requests.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.workflows.workflow_instance.external_operation.responses` SELECT * FROM `vault.api.v1.workflows.workflow_instance.external_operation.responses` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.api.v1.workflows.workflow_instance.external_operation.responses.failures` SELECT * FROM `vault.api.v1.workflows.workflow_instance.external_operation.responses.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core.postings.async_creation_api.responses` SELECT * FROM `vault.core.postings.async_creation_api.responses` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core.postings.requests.dlq.v1` SELECT * FROM `vault.core.postings.requests.dlq.v1` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core.postings.requests.v1` SELECT * FROM `vault.core.postings.requests.v1` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.accounts.account.events` SELECT * FROM `vault.core_api.v1.accounts.account.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.accounts.account.events.failures` SELECT * FROM `vault.core_api.v1.accounts.account.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.accounts.account_post_posting_execution_failure.events` SELECT * FROM `vault.core_api.v1.accounts.account_post_posting_execution_failure.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.accounts.account_post_posting_execution_failure.events.failures` SELECT * FROM `vault.core_api.v1.accounts.account_post_posting_execution_failure.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.accounts.account_schedule_job_execution_failure.events` SELECT * FROM `vault.core_api.v1.accounts.account_schedule_job_execution_failure.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.accounts.account_schedule_job_execution_failure.events.failures` SELECT * FROM `vault.core_api.v1.accounts.account_schedule_job_execution_failure.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.accounts.account_update.events` SELECT * FROM `vault.core_api.v1.accounts.account_update.events` 
PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.accounts.account_update.events.failures` SELECT * FROM `vault.core_api.v1.accounts.account_update.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.accounts.account_update_batch.events` SELECT * FROM `vault.core_api.v1.accounts.account_update_batch.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.accounts.account_update_batch.events.failures` SELECT * FROM `vault.core_api.v1.accounts.account_update_batch.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.balances.account_balance.events` SELECT * FROM `vault.core_api.v1.balances.account_balance.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.balances.account_balance.events.failures` SELECT * FROM `vault.core_api.v1.balances.account_balance.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.balances.balance.events` SELECT * FROM `vault.core_api.v1.balances.balance.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.balances.balance.events.failures` SELECT * FROM `vault.core_api.v1.balances.balance.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.calendar.calendar.events` SELECT * FROM `vault.core_api.v1.calendar.calendar.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.calendar.calendar.events.failures` SELECT * FROM `vault.core_api.v1.calendar.calendar.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.calendar.calendar_event.events` SELECT * FROM `vault.core_api.v1.calendar.calendar_event.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.calendar.calendar_event.events.failures` SELECT * FROM `vault.core_api.v1.calendar.calendar_event.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.calendar.calendar_period.events` SELECT * FROM `vault.core_api.v1.calendar.calendar_period.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.calendar.calendar_period.events.failures` SELECT * FROM `vault.core_api.v1.calendar.calendar_period.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.contracts.contract_notification.events` SELECT * FROM `vault.core_api.v1.contracts.contract_notification.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.contracts.contract_notification.events.failures` SELECT * FROM `vault.core_api.v1.contracts.contract_notification.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.customers.customer_address.events` SELECT * FROM `vault.core_api.v1.customers.customer_address.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.customers.customer_address.events.failures` SELECT * FROM `vault.core_api.v1.customers.customer_address.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.flags.flag.events` SELECT * FROM `vault.core_api.v1.flags.flag.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.flags.flag.events.failures` SELECT * FROM `vault.core_api.v1.flags.flag.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.ledger_balances.bucket_reassignment.events` SELECT * FROM `vault.core_api.v1.ledger_balances.bucket_reassignment.events` PROPERTIES('servicebus.type'='TOPIC'); 
INSERT INTO `vault.core_api.v1.ledger_balances.bucket_reassignment.events.failures` SELECT * FROM `vault.core_api.v1.ledger_balances.bucket_reassignment.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.payment_devices.payment_device.events` SELECT * FROM `vault.core_api.v1.payment_devices.payment_device.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.payment_devices.payment_device.events.failures` SELECT * FROM `vault.core_api.v1.payment_devices.payment_device.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.payment_devices.payment_device_link.events` SELECT * FROM `vault.core_api.v1.payment_devices.payment_device_link.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.payment_devices.payment_device_link.events.failures` SELECT * FROM `vault.core_api.v1.payment_devices.payment_device_link.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.payment_orders.payment_order.events` SELECT * FROM `vault.core_api.v1.payment_orders.payment_order.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.payment_orders.payment_order.events.failures` SELECT * FROM `vault.core_api.v1.payment_orders.payment_order.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.payment_orders.payment_order_execution.events` SELECT * FROM `vault.core_api.v1.payment_orders.payment_order_execution.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.payment_orders.payment_order_execution.events.failures` SELECT * FROM `vault.core_api.v1.payment_orders.payment_order_execution.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.plans.account_plan_assoc.events` SELECT * FROM `vault.core_api.v1.plans.account_plan_assoc.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.plans.account_plan_assoc.events.failures` SELECT * FROM `vault.core_api.v1.plans.account_plan_assoc.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.plans.plan.events` SELECT * FROM `vault.core_api.v1.plans.plan.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.plans.plan.events.failures` SELECT * FROM `vault.core_api.v1.plans.plan.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.plans.plan_migration.events` SELECT * FROM `vault.core_api.v1.plans.plan_migration.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.plans.plan_migration.events.failures` SELECT * FROM `vault.core_api.v1.plans.plan_migration.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.plans.plan_update.events` SELECT * FROM `vault.core_api.v1.plans.plan_update.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.plans.plan_update.events.failures` SELECT * FROM `vault.core_api.v1.plans.plan_update.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.postings.enriched_posting_instruction_batch.events` SELECT * FROM `vault.core_api.v1.postings.enriched_posting_instruction_batch.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.postings.enriched_posting_instruction_batch.events.failures` SELECT * FROM `vault.core_api.v1.postings.enriched_posting_instruction_batch.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO 
`vault.core_api.v1.restrictions.restriction_set.events` SELECT * FROM `vault.core_api.v1.restrictions.restriction_set.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.restrictions.restriction_set.events.failures` SELECT * FROM `vault.core_api.v1.restrictions.restriction_set.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.scheduler.operation.events` SELECT * FROM `vault.core_api.v1.scheduler.operation.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v1.scheduler.operation.events.failures` SELECT * FROM `vault.core_api.v1.scheduler.operation.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v2.migrations.instruction.requests` SELECT * FROM `vault.core_api.v2.migrations.instruction.requests` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.core_api.v2.migrations.instruction.responses` SELECT * FROM `vault.core_api.v2.migrations.instruction.responses` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.data_loader_api.v1.data_loader.dependency_group.events` SELECT * FROM `vault.data_loader_api.v1.data_loader.dependency_group.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.data_loader_api.v1.data_loader.dependency_group.events.failures` SELECT * FROM `vault.data_loader_api.v1.data_loader.dependency_group.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.data_loader_api.v1.data_loader.resource.migrated.events` SELECT * FROM `vault.data_loader_api.v1.data_loader.resource.migrated.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.data_loader_api.v1.data_loader.resource.migrated.events.failures` SELECT * FROM `vault.data_loader_api.v1.data_loader.resource.migrated.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.data_loader_api.v1.data_loader.resource_batch.create.requests` SELECT * FROM `vault.data_loader_api.v1.data_loader.resource_batch.create.requests` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.data_loader_api.v1.data_loader.resource_batch.create.requests.failures` SELECT * FROM `vault.data_loader_api.v1.data_loader.resource_batch.create.requests.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.data_loader_api.v1.data_loader.resource_batch.create.responses` SELECT * FROM `vault.data_loader_api.v1.data_loader.resource_batch.create.responses` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.data_loader_api.v1.data_loader.resource_batch.create.responses.failures` SELECT * FROM `vault.data_loader_api.v1.data_loader.resource_batch.create.responses.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.data_loader_api.v1.data_loader.resource_batch.events` SELECT * FROM `vault.data_loader_api.v1.data_loader.resource_batch.events` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.data_loader_api.v1.data_loader.resource_batch.events.failures` SELECT * FROM `vault.data_loader_api.v1.data_loader.resource_batch.events.failures` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.migrations.postings.requests` SELECT * FROM `vault.migrations.postings.requests` PROPERTIES('servicebus.type'='TOPIC'); INSERT INTO `vault.migrations.postings.responses` SELECT * FROM `vault.migrations.postings.responses` PROPERTIES('servicebus.type'='TOPIC');",
      "connect.servicebus.sink.retries.timeout": "500",
      "connector.class": "io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector",
      "errors.deadletterqueue.context.headers.enable": "false",
      "errors.deadletterqueue.topic.replication.factor": "1",
      "errors.log.enable": "false",
      "errors.log.include.messages": "false",
      "errors.tolerance": "none",
      "name": "AzureServiceBusSinkConnector",
      "tasks.max": "1",
      "topics": "vault.api.v1.accounts.account.created,vault.api.v1.accounts.account.created.failures,vault.api.v1.accounts.account.instance_param_vals.updated,vault.api.v1.accounts.account.instance_param_vals.updated.failures,vault.api.v1.accounts.account.status.updated,vault.api.v1.accounts.account.status.updated.failures,vault.api.v1.action_logs.action_log.created,vault.api.v1.action_logs.action_log.created.failures,vault.api.v1.audit_logs.audit_log.created,vault.api.v1.audit_logs.audit_log.created.failures,vault.api.v1.customers.customer.created,vault.api.v1.customers.customer.created.failures,vault.api.v1.customers.customer.customer_details.updated,vault.api.v1.customers.customer.customer_details.updated.failures,vault.api.v1.postings.posting_instruction_batch.created,vault.api.v1.postings.posting_instruction_batch.created.contracts_bridge.dlq,vault.api.v1.postings.posting_instruction_batch.created.failures,vault.api.v1.postings.posting_instruction_batch.created.migrated_postings_bridge.dlq,vault.api.v1.products.product_version.created,vault.api.v1.products.product_version.created.failures,vault.api.v1.products.product_version.parameter.updated,vault.api.v1.products.product_version.parameter.updated.failures,vault.api.v1.workflows.workflow_instance.create.requests,vault.api.v1.workflows.workflow_instance.create.requests.failures,vault.api.v1.workflows.workflow_instance.external_operation.requests,vault.api.v1.workflows.workflow_instance.external_operation.requests.failures,vault.api.v1.workflows.workflow_instance.external_operation.responses,vault.api.v1.workflows.workflow_instance.external_operation.responses.failures,vault.core.postings.async_creation_api.responses,vault.core.postings.requests.dlq.v1,vault.core.postings.requests.v1,vault.core_api.v1.accounts.account.events,vault.core_api.v1.accounts.account.events.failures,vault.core_api.v1.accounts.account_post_posting_execution_failure.events,vault.core_api.v1.accounts.account_post_posting_execution_failure.events.failures,vault.core_api.v1.accounts.account_schedule_job_execution_failure.events,vault.core_api.v1.accounts.account_schedule_job_execution_failure.events.failures,vault.core_api.v1.accounts.account_update.events,vault.core_api.v1.accounts.account_update.events.failures,vault.core_api.v1.accounts.account_update_batch.events,vault.core_api.v1.accounts.account_update_batch.events.failures,vault.core_api.v1.balances.account_balance.events,vault.core_api.v1.balances.account_balance.events.failures,vault.core_api.v1.balances.balance.events,vault.core_api.v1.balances.balance.events.failures,vault.core_api.v1.calendar.calendar.events,vault.core_api.v1.calendar.calendar.events.failures,vault.core_api.v1.calendar.calendar_event.events,vault.core_api.v1.calendar.calendar_event.events.failures,vault.core_api.v1.calendar.calendar_period.events,vault.core_api.v1.calendar.calendar_period.events.failures,vault.core_api.v1.contracts.contract_notification.events,vault.core_api.v1.contracts.contract_notification.events.failures,vault.core_api.v1.customers.customer_address.events,vault.core_api.v1.customers.customer_address.events.failures,vault.core_api.v1.flags.flag.events,vault.core_api.v1.flags.flag.events.failures,vault.core_api.v1.ledger_balances.bucket_reassignment.events,vault.core_api.v1.ledger_balances.bucket_reassignment.events.failures,vault.core_api.v1.payment_devices.payment_device.events,vault.core_api.v1.payment_devices.payment_device.events.failures,vault.core_api.v1.payment_devices.payment_device_link.events,vault.core_api
.v1.payment_devices.payment_device_link.events.failures,vault.core_api.v1.payment_orders.payment_order.events,vault.core_api.v1.payment_orders.payment_order.events.failures,vault.core_api.v1.payment_orders.payment_order_execution.events,vault.core_api.v1.payment_orders.payment_order_execution.events.failures,vault.core_api.v1.plans.account_plan_assoc.events,vault.core_api.v1.plans.account_plan_assoc.events.failures,vault.core_api.v1.plans.plan.events,vault.core_api.v1.plans.plan.events.failures,vault.core_api.v1.plans.plan_migration.events,vault.core_api.v1.plans.plan_migration.events.failures,vault.core_api.v1.plans.plan_update.events,vault.core_api.v1.plans.plan_update.events.failures,vault.core_api.v1.postings.enriched_posting_instruction_batch.events,vault.core_api.v1.postings.enriched_posting_instruction_batch.events.failures,vault.core_api.v1.restrictions.restriction_set.events,vault.core_api.v1.restrictions.restriction_set.events.failures,vault.core_api.v1.scheduler.operation.events,vault.core_api.v1.scheduler.operation.events.failures,vault.core_api.v2.migrations.instruction.requests,vault.core_api.v2.migrations.instruction.responses,vault.data_loader_api.v1.data_loader.dependency_group.events,vault.data_loader_api.v1.data_loader.dependency_group.events.failures,vault.data_loader_api.v1.data_loader.resource.migrated.events,vault.data_loader_api.v1.data_loader.resource.migrated.events.failures,vault.data_loader_api.v1.data_loader.resource_batch.create.requests,vault.data_loader_api.v1.data_loader.resource_batch.create.requests.failures,vault.data_loader_api.v1.data_loader.resource_batch.create.responses,vault.data_loader_api.v1.data_loader.resource_batch.create.responses.failures,vault.data_loader_api.v1.data_loader.resource_batch.events,vault.data_loader_api.v1.data_loader.resource_batch.events.failures,vault.migrations.postings.requests,vault.migrations.postings.responses"
    }
  }
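
To check whether the Premium namespace rejects all sends or only empty batches, a direct, non-batched send with the SDK could serve as a diagnostic. A hedged sketch, reusing the hypothetical sender from the reproduction sketch above (sendMessage bypasses the batch path entirely):

// Hypothetical diagnostic: a plain single-message send, no batching involved.
// If this succeeds against the Premium topic while the connector still fails,
// that points at the empty-batch send as the trigger.
sender.sendMessage(new ServiceBusMessage("ping"));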

Please provide full log files (redact any sensitive information)

Logs from Kafka Connect:

connect-dev-1  | ===> User
connect-dev-1  | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
connect-dev-1  | ===> Configuring ...
connect-dev-1  | ===> Running preflight checks ... 
connect-dev-1  | ===> Check if Kafka is healthy ...
connect-dev-1  | Using log4j config /etc/cp-base-new/log4j.properties
connect-dev-1  | ===> Launching ... 
connect-dev-1  | ===> Launching kafka-connect ... 
connect-dev-1  | [2024-10-24 12:34:16,128] ERROR entityPath[vault.api.v1.accounts.account.created], messages-count[256]: Sending messages timed out.
connect-dev-1  | The link 'G6:19770353:vault.api.v1.accounts.account.createdvault.api.v1.accounts.account.created' is force detached by the broker because publisher(link163) received a batch message with no data in it. Detach origin: Publisher. (com.azure.core.amqp.implementation.RetryUtil)
connect-dev-1  | [2024-10-24 12:34:16,738] ERROR entityPath[vault.api.v1.accounts.account.created], messages-count[256]: Sending messages timed out.
connect-dev-1  | The link 'G6:19770353:vault.api.v1.accounts.account.createdvault.api.v1.accounts.account.created' is force detached by the broker because publisher(link163) received a batch message with no data in it. Detach origin: Publisher. (com.azure.core.amqp.implementation.RetryUtil)
connect-dev-1  | [2024-10-24 12:34:17,346] ERROR entityPath[vault.api.v1.accounts.account.created], messages-count[256]: Sending messages timed out.
connect-dev-1  | The link 'G6:19770353:vault.api.v1.accounts.account.createdvault.api.v1.accounts.account.created' is force detached by the broker because publisher(link163) received a batch message with no data in it. Detach origin: Publisher. (com.azure.core.amqp.implementation.RetryUtil)
connect-dev-1  | [2024-10-24 12:34:17,848] ERROR WorkerSinkTask{id=AzureServiceBusSinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Number of retries exhausted. Cause: (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect-dev-1  | io.lenses.streamreactor.connect.azure.servicebus.sink.ServiceBusSendingException: Number of retries exhausted. Cause:
connect-dev-1  |        at io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.lambda$sendMessages$4(TaskToSenderBridge.java:153)
connect-dev-1  |        at java.base/java.util.Optional.map(Optional.java:260)
connect-dev-1  |        at io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.lambda$sendMessages$5(TaskToSenderBridge.java:153)
connect-dev-1  |        at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273)
connect-dev-1  |        at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1850)
connect-dev-1  |        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
connect-dev-1  |        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
connect-dev-1  |        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
connect-dev-1  |        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
connect-dev-1  |        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
connect-dev-1  |        at io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.sendMessages(TaskToSenderBridge.java:156)
connect-dev-1  |        at io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkTask.put(AzureServiceBusSinkTask.java:80)
connect-dev-1  |        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
connect-dev-1  |        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
connect-dev-1  |        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
connect-dev-1  |        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
connect-dev-1  |        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
connect-dev-1  |        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
connect-dev-1  |        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
connect-dev-1  |        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
connect-dev-1  |        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect-dev-1  |        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
connect-dev-1  |        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
connect-dev-1  |        at java.base/java.lang.Thread.run(Thread.java:840)
connect-dev-1  | Caused by: com.azure.messaging.servicebus.ServiceBusException: The link 'G6:19770353:vault.api.v1.accounts.account.createdvault.api.v1.accounts.account.created' is force detached by the broker because publisher(link163) received a batch message with no data in it. Detach origin: Publisher.
connect-dev-1  |        at com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.mapError(ServiceBusSenderAsyncClient.java:953)
connect-dev-1  |        at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3811)
connect-dev-1  |        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
connect-dev-1  |        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
connect-dev-1  |        at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
connect-dev-1  |        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
connect-dev-1  |        at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
connect-dev-1  |        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
connect-dev-1  |        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:415)
connect-dev-1  |        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
connect-dev-1  |        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
connect-dev-1  |        at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
connect-dev-1  |        at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
connect-dev-1  |        at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
connect-dev-1  |        at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
connect-dev-1  |        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
connect-dev-1  |        at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
connect-dev-1  |        at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onError(FluxTimeout.java:219)
connect-dev-1  |        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
connect-dev-1  |        at reactor.core.publisher.Operators.error(Operators.java:198)
connect-dev-1  |        at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
connect-dev-1  |        at reactor.core.publisher.Mono.subscribe(Mono.java:4491)
connect-dev-1  |        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
connect-dev-1  |        at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)
connect-dev-1  |        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)
connect-dev-1  |        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
connect-dev-1  |        at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)
connect-dev-1  |        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)
connect-dev-1  |        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
connect-dev-1  |        at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
connect-dev-1  |        at com.azure.core.amqp.implementation.RetriableWorkItem.error(RetriableWorkItem.java:76)
connect-dev-1  |        at com.azure.core.amqp.implementation.ReactorSender.cleanupFailedSend(ReactorSender.java:715)
connect-dev-1  |        at com.azure.core.amqp.implementation.ReactorSender.processDeliveredMessage(ReactorSender.java:672)
connect-dev-1  |        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
connect-dev-1  |        at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
connect-dev-1  |        at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
connect-dev-1  |        at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
connect-dev-1  |        at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
connect-dev-1  |        at com.azure.core.amqp.implementation.handler.SendLinkHandler.onDelivery(SendLinkHandler.java:205)
connect-dev-1  |        at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:185)
connect-dev-1  |        at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
connect-dev-1  |        at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
connect-dev-1  |        at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
connect-dev-1  |        at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
connect-dev-1  |        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
connect-dev-1  |        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
connect-dev-1  |        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect-dev-1  |        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
connect-dev-1  |        ... 3 more
connect-dev-1  |        Suppressed: java.lang.Exception: #block terminated with an error
connect-dev-1  |                at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:141)
connect-dev-1  |                at reactor.core.publisher.Mono.block(Mono.java:1766)
connect-dev-1  |                at com.azure.messaging.servicebus.ServiceBusSenderClient.sendMessages(ServiceBusSenderClient.java:296)
connect-dev-1  |                at io.lenses.streamreactor.connect.azure.servicebus.sink.ServiceBusSenderFacade.lambda$submitBatch$1(ServiceBusSenderFacade.java:99)
connect-dev-1  |                at cyclops.control.Try.runWithCatch(Try.java:889)
connect-dev-1  |                at io.lenses.streamreactor.connect.azure.servicebus.sink.ServiceBusSenderFacade.submitBatch(ServiceBusSenderFacade.java:99)
connect-dev-1  |                at io.lenses.streamreactor.connect.azure.servicebus.sink.ServiceBusSenderFacade.sendMessages(ServiceBusSenderFacade.java:91)
connect-dev-1  |                at io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.lambda$sendMessages$5(TaskToSenderBridge.java:145)
connect-dev-1  |                at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273)
connect-dev-1  |                at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1850)
connect-dev-1  |                at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
connect-dev-1  |                at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
connect-dev-1  |                at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
connect-dev-1  |                at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
connect-dev-1  |                at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
connect-dev-1  |                at io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.sendMessages(TaskToSenderBridge.java:156)
connect-dev-1  |                at io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkTask.put(AzureServiceBusSinkTask.java:80)
connect-dev-1  |                at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
connect-dev-1  |                at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
connect-dev-1  |                at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
connect-dev-1  |                at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
connect-dev-1  |                at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
connect-dev-1  |                at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
connect-dev-1  |                at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
connect-dev-1  |                at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
connect-dev-1  |                at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect-dev-1  |                ... 3 more
connect-dev-1  | Caused by: java.lang.UnsupportedOperationException: The link 'G6:19770353:vault.api.v1.accounts.account.createdvault.api.v1.accounts.account.created' is force detached by the broker because publisher(link163) received a batch message with no data in it. Detach origin: Publisher.
connect-dev-1  |        at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:78)
connect-dev-1  |        at com.azure.core.amqp.implementation.ReactorSender.processDeliveredMessage(ReactorSender.java:650)
connect-dev-1  |        ... 18 more
connect-dev-1  | [2024-10-24 12:34:17,853] ERROR WorkerSinkTask{id=AzureServiceBusSinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect-dev-1  | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
connect-dev-1  |        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
connect-dev-1  |        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
connect-dev-1  |        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
connect-dev-1  |        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
connect-dev-1  |        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
connect-dev-1  |        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
connect-dev-1  |        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
connect-dev-1  |        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
connect-dev-1  |        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect-dev-1  |        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
connect-dev-1  |        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
connect-dev-1  |        at java.base/java.lang.Thread.run(Thread.java:840)
connect-dev-1  | Caused by: io.lenses.streamreactor.connect.azure.servicebus.sink.ServiceBusSendingException: Number of retries exhausted. Cause:
connect-dev-1  |        at io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.lambda$sendMessages$4(TaskToSenderBridge.java:153)
connect-dev-1  |        at java.base/java.util.Optional.map(Optional.java:260)
connect-dev-1  |        at io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.lambda$sendMessages$5(TaskToSenderBridge.java:153)
connect-dev-1  |        at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273)
connect-dev-1  |        at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1850)
connect-dev-1  |        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
connect-dev-1  |        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
connect-dev-1  |        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
connect-dev-1  |        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
connect-dev-1  |        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
connect-dev-1  |        at io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.sendMessages(TaskToSenderBridge.java:156)
connect-dev-1  |        at io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkTask.put(AzureServiceBusSinkTask.java:80)
connect-dev-1  |        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
connect-dev-1  |        ... 11 more
connect-dev-1  | Caused by: com.azure.messaging.servicebus.ServiceBusException: The link 'G6:19770353:vault.api.v1.accounts.account.createdvault.api.v1.accounts.account.created' is force detached by the broker because publisher(link163) received a batch message with no data in it. Detach origin: Publisher.
connect-dev-1  |        at com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.mapError(ServiceBusSenderAsyncClient.java:953)
connect-dev-1  |        at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3811)
connect-dev-1  |        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
connect-dev-1  |        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
connect-dev-1  |        at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
connect-dev-1  |        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
connect-dev-1  |        at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
connect-dev-1  |        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
connect-dev-1  |        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:415)
connect-dev-1  |        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
connect-dev-1  |        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
connect-dev-1  |        at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
connect-dev-1  |        at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
connect-dev-1  |        at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
connect-dev-1  |        at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
connect-dev-1  |        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
connect-dev-1  |        at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
connect-dev-1  |        at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onError(FluxTimeout.java:219)
connect-dev-1  |        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
connect-dev-1  |        at reactor.core.publisher.Operators.error(Operators.java:198)
connect-dev-1  |        at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
connect-dev-1  |        at reactor.core.publisher.Mono.subscribe(Mono.java:4491)
connect-dev-1  |        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
connect-dev-1  |        at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)
connect-dev-1  |        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)
connect-dev-1  |        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
connect-dev-1  |        at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)
connect-dev-1  |        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)
connect-dev-1  |        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
connect-dev-1  |        at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
connect-dev-1  |        at com.azure.core.amqp.implementation.RetriableWorkItem.error(RetriableWorkItem.java:76)
connect-dev-1  |        at com.azure.core.amqp.implementation.ReactorSender.cleanupFailedSend(ReactorSender.java:715)
connect-dev-1  |        at com.azure.core.amqp.implementation.ReactorSender.processDeliveredMessage(ReactorSender.java:672)
connect-dev-1  |        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
connect-dev-1  |        at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
connect-dev-1  |        at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
connect-dev-1  |        at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
connect-dev-1  |        at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
connect-dev-1  |        at com.azure.core.amqp.implementation.handler.SendLinkHandler.onDelivery(SendLinkHandler.java:205)
connect-dev-1  |        at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:185)
connect-dev-1  |        at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
connect-dev-1  |        at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
connect-dev-1  |        at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
connect-dev-1  |        at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
connect-dev-1  |        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
connect-dev-1  |        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
connect-dev-1  |        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect-dev-1  |        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
connect-dev-1  |        ... 3 more
connect-dev-1  |        Suppressed: java.lang.Exception: #block terminated with an error
connect-dev-1  |                at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:141)
connect-dev-1  |                at reactor.core.publisher.Mono.block(Mono.java:1766)
connect-dev-1  |                at com.azure.messaging.servicebus.ServiceBusSenderClient.sendMessages(ServiceBusSenderClient.java:296)
connect-dev-1  |                at io.lenses.streamreactor.connect.azure.servicebus.sink.ServiceBusSenderFacade.lambda$submitBatch$1(ServiceBusSenderFacade.java:99)
connect-dev-1  |                at cyclops.control.Try.runWithCatch(Try.java:889)
connect-dev-1  |                at io.lenses.streamreactor.connect.azure.servicebus.sink.ServiceBusSenderFacade.submitBatch(ServiceBusSenderFacade.java:99)
connect-dev-1  |                at io.lenses.streamreactor.connect.azure.servicebus.sink.ServiceBusSenderFacade.sendMessages(ServiceBusSenderFacade.java:91)
connect-dev-1  |                at io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.lambda$sendMessages$5(TaskToSenderBridge.java:145)
connect-dev-1  |                at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273)
connect-dev-1  |                at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1850)
connect-dev-1  |                at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
connect-dev-1  |                at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
connect-dev-1  |                at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
connect-dev-1  |                at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
connect-dev-1  |                at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
connect-dev-1  |                at io.lenses.streamreactor.connect.azure.servicebus.sink.TaskToSenderBridge.sendMessages(TaskToSenderBridge.java:156)
connect-dev-1  |                at io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkTask.put(AzureServiceBusSinkTask.java:80)
connect-dev-1  |                at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
connect-dev-1  |                at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
connect-dev-1  |                at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
connect-dev-1  |                at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
connect-dev-1  |                at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
connect-dev-1  |                at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
connect-dev-1  |                at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
connect-dev-1  |                at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
connect-dev-1  |                at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect-dev-1  |                ... 3 more
connect-dev-1  | Caused by: java.lang.UnsupportedOperationException: The link 'G6:19770353:vault.api.v1.accounts.account.createdvault.api.v1.accounts.account.created' is force detached by the broker because publisher(link163) received a batch message with no data in it. Detach origin: Publisher.
connect-dev-1  |        at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:78)
connect-dev-1  |        at com.azure.core.amqp.implementation.ReactorSender.processDeliveredMessage(ReactorSender.java:650)
connect-dev-1  |        ... 18 more
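For what it's worth, the broker is complaining that the publisher delivered a batch containing zero messages. Below is a minimal sketch (my assumption about the failure mode, not the connector's actual code; the connection string and queue name are placeholders) of how the Azure SDK can end up sending an empty batch, if an oversized payload is rejected by tryAddMessage and the return value is ignored:

```java
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusMessageBatch;
import com.azure.messaging.servicebus.ServiceBusSenderClient;

public class EmptyBatchSketch {
    public static void main(String[] args) {
        // Placeholders -- not real connection details.
        ServiceBusSenderClient sender = new ServiceBusClientBuilder()
                .connectionString("<connection-string>")
                .sender()
                .queueName("<queue-name>")
                .buildClient();

        ServiceBusMessageBatch batch = sender.createMessageBatch();

        // A payload bigger than the batch's maximum size is rejected:
        // tryAddMessage returns false and the batch stays empty.
        byte[] payload = new byte[2 * 1024 * 1024]; // ~2 MB, like our larger messages
        boolean added = batch.tryAddMessage(new ServiceBusMessage(payload));

        if (!added) {
            // If a sender ignores this and sends anyway, the broker sees
            // "a batch message with no data in it" and force-detaches the
            // link -- the same wording as the exception above.
            System.out.println("Rejected; batch count = " + batch.getCount());
        }

        sender.sendMessages(batch); // sending the empty batch would reproduce the detach
        sender.close();
    }
}
```

Given that a tail of our messages is over 1 MB, any per-batch size cap below the payload size would make every tryAddMessage call fail and leave the batch empty, which would match the detach message above.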
@GoMati-MU
Contributor

Hello, @dallinb!

Thanks for bringing this one to our attention! I recently started looking into it, but after some time I've been unable to replicate it, so I'd like to ask some additional questions, if possible.

  1. Do you guys use Shared access policies to manage your access? (It seems so from the config, but could I ask you to double-check that it has the Send permission, just to rule out the obvious?)
  2. What is the typical size of the message that you want to sink to the Service Bus?
  3. You mention that when the Azure Service Bus is "Premium" we get the following error. Did you also try this on the Standard tier, since you've mentioned it's a problem on the Premium tier?

I'd appreciate the answers if you happen to have a spare moment 😉
Have a great day!

@dallinb
Author

dallinb commented Nov 14, 2024

Hi @GoMati-MU,

Thanks for getting back. Casting my mind back:

  1. Yes, we used shared access policies and they would/do have write permissions.
  2. On average just over 500 KB, but the 99th percentile of messages is > 1 MB and <= 4 MB, which is why we need to be able to send to a Premium tier.
  3. Yes, we did initial testing on a Standard tier namespace (not with the larger message sizes) and it connected and transferred messages OK. The problem described above happened when it was redeployed as a Premium namespace.

In the meantime, we've developed our own connector, so I'm not sure I'll ever get the time to recreate this problem. Under those circumstances, feel free to close this issue if it can't be reproduced.

@GoMati-MU
Contributor

No worries at all, @dallinb! All the thanks go to you for helping me understand the issue :)

  1. On average just over 500 KB, but the 99th percentile of messages is > 1 MB and <= 4 MB, which is why we need to be able to send to a Premium tier.
  2. Yes, we did initial testing on a Standard tier namespace (not with the larger message sizes) and it connected and transferred messages OK. The problem described above happened when it was redeployed as a Premium namespace.

This may be the root cause of the problem for our connector, because most of the data sets I used for testing were pretty heavy in terms of number of messages (up to 100K) but not in terms of message size. Unfortunately I'm pretty limited when it comes to resources (I don't have access to a Premium Service Bus), but I'll talk to the team to see if one can be set up. The Premium tier doesn't change the API, so I'd say this is a valid road to take for debugging. A sketch of how such a data set could be produced follows below.
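In case it helps reproduce this, here is a sketch of pushing a few large (~2 MB) records into the source topic with the plain Java producer. The bootstrap address is a placeholder, and the topic name is taken from the link name in the trace above:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class LargeMessageProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Raise the client's default 1 MB cap so ~2 MB records are accepted.
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 4 * 1024 * 1024);

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            byte[] payload = new byte[2 * 1024 * 1024]; // ~2 MB, matching the sizes reported above
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("vault.api.v1.accounts.account.created",
                        "key-" + i, payload)).get(); // block so failures surface immediately
            }
        }
    }
}
```

Note that the topic's broker-side max.message.bytes would need a matching bump for records of this size.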

I'll see whether I can do anything about this one for now, but I'm really happy you guys managed to resolve it for your use case!

Have a great one!

@davidsloan
Collaborator

Please try the latest release along with the extra KCQL property batch.enable=false, and please let us know whether you are successful when disabling batching.
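For reference, a sketch of where that property could go. The connector name and topic are from this issue; the connect.servicebus.* config keys, the queue name, and the PROPERTIES(...) placement are assumptions based on the connector's KCQL conventions, so only batch.enable=false is the bit under test:

```json
{
  "name": "AzureServiceBusSinkConnector",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.azure.servicebus.sink.AzureServiceBusSinkConnector",
    "tasks.max": "1",
    "topics": "vault.api.v1.accounts.account.created",
    "connect.servicebus.connection.string": "<service-bus-connection-string>",
    "connect.servicebus.kcql": "INSERT INTO account-created SELECT * FROM vault.api.v1.accounts.account.created PROPERTIES('batch.enable'='false')"
  }
}
```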
