Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport PR #16204 to 8.14: Avoid to log file not found errors when DLQ segments are removed concurrently between writer and reader. #16249

Merged
merged 1 commit into from
Jun 20, 2024

Conversation

github-actions[bot]
Copy link
Contributor

Backport PR #16204 to 8.14 branch, original message:


Release notes

Bugfix, avoid to log file not found errors when DLQ segments are removed concurrently between writer and reader.

What does this PR do?

Rework the logic to delete DLQ eldest segments to be more resilient on file not found errors and avoid to log warn messages that hasn't any actionable job for the user to solve.
This commit reimplement the comparator used on DLQ reader side to identify the fully consumed segments when clean_consumed is enabled to avoid logging warn messages of file not found exception. That condition could manifest when also the writer side deletes segments to satisfy the drop_older storage policy.

It also updates the deleteSegmentmethod so that in case of removal of not existing file, no warning logs are emitted, being a condition that could happen during the execution.

Why is it important/What is the impact to the user?

This PR avoid to warn the user with message logs that in reality could happen in normal execution flow. If reader and writer are deleting on same segments set, it's a possible condition that one of the two experiment a ghost file, a file that during listing is present but is not yet present on actual file operation, because another pipeline already eliminated it.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding change to the default configuration files (and/or docker env variables)
  • [ ] I have added tests that prove my fix is effective or that my feature works

Author's Checklist

  • run locally

How to test this PR locally

Create an index in an Elasticsearch cluster that's closed, so that it generates events in DLQ. Then configure one upstream pipeline with DLQ enabled and storage policy set to drop_older, and another downstream pipeline with clean_consumed true. In this way the two pipelines conquer the access to DLQ's tail segments, generating the reported error.

Use the following pipeline definitions in your config/pipelines.yml:

- pipeline.id: dlq_upstream
  dead_letter_queue.enable: true
  dead_letter_queue.storage_policy: drop_older
  dead_letter_queue.max_bytes: 20mb
  config.string: |
    input {
      generator {
        message => '{"name": "Andrea"}'
        codec => json
      }
    }
    output {
      elasticsearch {
        cloud_id => "<es_cloud_id>"
        api_key => "s3cr3t"
        index => "test_index"
      }
    }

- pipeline.id: dlq_reader
  config.string: |
    input {
      dead_letter_queue {
        path => "/tmp/logstash/data/dead_letter_queue/"
        pipeline_id => "dlq_upstream"
        clean_consumed => true
      }
    }
    output {
      sink {}
    }   
dead_letter_queue.max_bytes: 20mb

is useful to generate the error, because it' asks for a DLQ that's not bigger than 2 segments (10 MB per segment by default configuration) and is more probable that the error manifests.

Related issues

Use cases

Screenshots

Logs

Example of the error that this PR resolves.

[2024-06-07T11:34:12,431][WARN ][org.logstash.common.io.DeadLetterQueueReader][dlq_reader][054c2e0bd67fa71b6e51a5547c9848571a078328074d5d1a298b5e48d4866580] Error reading file's timestamp for /Users/andrea/workspace/logstash_andsel/data/dead_letter_queue/dlq_upstream/9.log
java.nio.file.NoSuchFileException: /Users/andrea/workspace/logstash_andsel/data/dead_letter_queue/dlq_upstream/9.log
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:92) ~[?:?]
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106) ~[?:?]
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) ~[?:?]
	at sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55) ~[?:?]
	at sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:148) ~[?:?]
	at java.nio.file.Files.readAttributes(Files.java:1851) ~[?:?]
	at java.nio.file.Files.getLastModifiedTime(Files.java:2402) ~[?:?]
	at org.logstash.common.io.DeadLetterQueueReader.compareByFileTimestamp(DeadLetterQueueReader.java:389) ~[logstash-core.jar:?]
	at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:220) ~[?:?]
	at org.logstash.common.io.DeadLetterQueueReader.lambda$removeSegmentsBefore$2(DeadLetterQueueReader.java:364) ~[logstash-core.jar:?]
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:178) ~[?:?]
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[?:?]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[?:?]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[?:?]
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) ~[?:?]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
	at java.util.stream.LongPipeline.collect(LongPipeline.java:515) ~[?:?]
	at java.util.stream.LongPipeline.summaryStatistics(LongPipeline.java:492) ~[?:?]
	at org.logstash.common.io.DeadLetterQueueReader.removeSegmentsBefore(DeadLetterQueueReader.java:368) ~[logstash-core.jar:?]
	at org.logstash.common.io.DeadLetterQueueReader.markForDelete(DeadLetterQueueReader.java:287) ~[logstash-core.jar:?]
	at org.logstash.input.DeadLetterQueueInputPlugin.run(DeadLetterQueueInputPlugin.java:145) ~[logstash-input-dead_letter_queue-2.0.0.jar:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
	at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:315) ~[jruby.jar:?]
	at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:176) ~[jruby.jar:?]
	at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:42) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:466) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:244) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:314) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:82) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:201) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:188) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:220) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:466) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:244) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:314) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:82) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:201) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:188) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:220) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:466) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:244) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:314) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:118) ~[jruby.jar:?]
	at org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136) ~[jruby.jar:?]
	at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66) ~[jruby.jar:?]
	at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58) ~[jruby.jar:?]
	at org.jruby.runtime.Block.call(Block.java:144) ~[jruby.jar:?]
	at org.jruby.RubyProc.call(RubyProc.java:354) ~[jruby.jar:?]
	at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:111) ~[jruby.jar:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
[2024-06-07T11:34:12,432][WARN ][org.logstash.common.io.DeadLetterQueueReader][dlq_reader][054c2e0bd67fa71b6e51a5547c9848571a078328074d5d1a298b5e48d4866580] Problem occurred in cleaning the segment /Users/andrea/workspace/logstash_andsel/data/dead_letter_queue/dlq_upstream/9.log after a repositioning
java.nio.file.NoSuchFileException: /Users/andrea/workspace/logstash_andsel/data/dead_letter_queue/dlq_upstream/9.log
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:92) ~[?:?]
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106) ~[?:?]
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) ~[?:?]
	at sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:181) ~[?:?]
	at java.nio.channels.FileChannel.open(FileChannel.java:298) ~[?:?]
	at java.nio.channels.FileChannel.open(FileChannel.java:357) ~[?:?]
	at org.logstash.common.io.DeadLetterQueueUtils.countEventsInSegment(DeadLetterQueueUtils.java:67) ~[logstash-core.jar:?]
	at org.logstash.common.io.DeadLetterQueueReader.deleteSegment(DeadLetterQueueReader.java:406) ~[logstash-core.jar:?]
	at org.logstash.common.io.DeadLetterQueueReader.markForDelete(DeadLetterQueueReader.java:294) ~[logstash-core.jar:?]
	at org.logstash.input.DeadLetterQueueInputPlugin.run(DeadLetterQueueInputPlugin.java:145) ~[logstash-input-dead_letter_queue-2.0.0.jar:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
	at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:315) ~[jruby.jar:?]
	at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:176) ~[jruby.jar:?]
	at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:42) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:466) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:244) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:314) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:82) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:201) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:188) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:220) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:466) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:244) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:314) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:82) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD(MixedModeIRMethod.java:201) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:188) ~[jruby.jar:?]
	at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:220) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:466) ~[jruby.jar:?]
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:244) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:314) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:66) ~[jruby.jar:?]
	at org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:118) ~[jruby.jar:?]
	at org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:136) ~[jruby.jar:?]
	at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:66) ~[jruby.jar:?]
	at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58) ~[jruby.jar:?]
	at org.jruby.runtime.Block.call(Block.java:144) ~[jruby.jar:?]
	at org.jruby.RubyProc.call(RubyProc.java:354) ~[jruby.jar:?]
	at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:111) ~[jruby.jar:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]

…urrently between writer and reader. (#16204)

* Rework the logic to delete DLQ eldest segments to be more resilient on file not found errors and avoid to log warn messages that there isn't any action the user can do to solve.

* Fixed test case, when path point to a file that doesn't exist, rely always on path name comparator. Reworked the code to simplify, not needing anymore the tri-state variable

(cherry picked from commit 321e407)
Copy link

Quality Gate passed Quality Gate passed

Issues
0 New issues
0 Fixed issues
0 Accepted issues

Measures
0 Security Hotspots
No data about Coverage
No data about Duplication

See analysis details on SonarQube

@elasticmachine
Copy link
Collaborator

💚 Build Succeeded

@yaauie yaauie merged commit a42c71a into 8.14 Jun 20, 2024
5 checks passed
@yaauie yaauie deleted the backport_16204_8.14 branch June 20, 2024 18:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants