Goroutine leak #1094

Open
showsmall opened this issue Nov 23, 2024 · 1 comment

@showsmall

Relevant config.toml

[global]
print_configs = false
hostname = ""
omit_hostname = false
interval = 15
providers = ["local"]
concurrency = -1
[global.labels]
[log]
file_name = "/Data/Categraf/categraf-v0.3.82-linux-amd64/run.log"
max_size = 100
max_age = 1
max_backups = 7
local_time = true
compress = false
[writer_opt]
batch = 1000
chan_size = 1000000
[[writers]]
url = "http://10.10.2.19:17000/prometheus/v1/write"
basic_auth_user = ""
basic_auth_pass = ""
timeout = 5000
dial_timeout = 2500
max_idle_conns_per_host = 100
[http]
enable = false
address = ":9100"
print_access = false
run_mode = "release"
ignore_hostname = false
agent_host_tag = ""
ignore_global_labels = false
[ibex]
enable = false
interval = "1000ms"
servers = ["127.0.0.1:20090"]
meta_dir = "./meta"
[heartbeat]
enable = true
url = "http://10.10.2.19:17000/v1/n9e/heartbeat"
interval = 10
basic_auth_user = ""
basic_auth_pass = ""
timeout = 5000
dial_timeout = 2500
max_idle_conns_per_host = 100
[prometheus]
enable = false
scrape_config_file = "/path/to/in_cluster_scrape.yaml"
log_level = "info"

Logs from categraf

[logs]
api_key = "ef4ahfbwzwwtlwfpbertgq1i6mq0ab1q"
enable = true
send_to = "xxx.xxx.xxx.xxx"
send_type = "http"
topic = "flashcatcloud"
use_compress = false
send_with_tls = false
batch_wait = 5
run_path = "/Data/Categraf/categraf-v0.3.82-linux-amd64/run"
open_files_limit = 500
scan_period = 10
frame_size = 9000
chan_size = 1000
pipeline=4
kafka_version="3.3.2"
batch_max_concurrence = 0
batch_max_size=100
batch_max_content_size=1000000
producer_timeout= 10
sasl_enable = false
sasl_user = "admin"
sasl_password = "admin"
sasl_mechanism= "PLAIN"
sasl_version=1
sasl_handshake = true
enable_collect_container=false
collect_container_all = false
# There are 16 similar collection rules in total
  [[logs.items]]
  type = "file"
  path = "/xxx/xxx/xxx/log/error/2024-11-*.log"
  source = "xxx-error"
  service = "xxx-error"
    [[logs.items.log_processing_rules]]
      type = "multi_line"
      name = "new_line_with_date"
      pattern="\\d{4}-\\d{2}-\\d{2}"

System info

v0.3.82

Docker

No response

Steps to reproduce

Monitoring showed a sudden rise in the thread count:
image
After enabling pprof and investigating, I strongly suspect that the log collection path is the cause.
(pprof) top
Showing nodes accounting for 1468, 99.86% of 1470 total
Dropped 49 nodes (cum <= 7)
Showing top 10 nodes out of 19
flat flat% sum% cum cum%
1468 99.86% 99.86% 1468 99.86% runtime.gopark
0 0% 99.86% 19 1.29% flashcat.cloud/categraf/agent.(*InputReader).startInput
0 0% 99.86% 283 19.25% flashcat.cloud/categraf/logs/decoder.(*Decoder).run
0 0% 99.86% 281 19.12% flashcat.cloud/categraf/logs/decoder.(*MultiLineHandler).run
0 0% 99.86% 283 19.25% flashcat.cloud/categraf/logs/decoder.(*SingleLineParser).run
0 0% 99.86% 283 19.25% flashcat.cloud/categraf/logs/input/file.(*Tailer).forwardMessages
0 0% 99.86% 283 19.25% flashcat.cloud/categraf/logs/input/file.(*Tailer).readForever
0 0% 99.86% 283 19.25% flashcat.cloud/categraf/logs/input/file.(*Tailer).wait (inline)
0 0% 99.86% 7 0.48% internal/poll.(*FD).Read
0 0% 99.86% 8 0.54% internal/poll.(*pollDesc).wait
(pprof)

(pprof) list Tailer
Total: 1470
ROUTINE ======================== flashcat.cloud/categraf/logs/input/file.(*Tailer).forwardMessages in /root/go/src/sre/categraf/logs/input/file/tailer.go
0 283 (flat, cum) 19.25% of Total
. . 244:func (t *Tailer) forwardMessages() {
. . 245: defer func() {
. . 246: // the decoder has successfully been flushed
. . 247: atomic.StoreInt32(&t.shouldStop, 1)
. . 248: close(t.done)
. . 249: }()
. 283 250: for output := range t.decoder.OutputChan {
. . 251: offset := t.decodedOffset + int64(output.RawDataLen)
. . 252: identifier := t.Identifier()
. . 253: if !t.shouldTrackOffset() {
. . 254: offset = 0
. . 255: identifier = ""
ROUTINE ======================== flashcat.cloud/categraf/logs/input/file.(*Tailer).readForever in /root/go/src/sre/categraf/logs/input/file/tailer.go
0 283 (flat, cum) 19.25% of Total
. . 171:func (t *Tailer) readForever() {
. . 172: defer t.onStop()
. . 173: for {
. . 174: n, err := t.read()
. . 175: if err != nil {
. . 176: return
. . 177: }
. . 178: t.recordBytes(int64(n))
. . 179:
. . 180: select {
. . 181: case <-t.stop:
. . 182: if n != 0 && atomic.LoadInt32(&t.didFileRotate) == 1 {
. . 183: log.Println("W! Tailer stopped after rotation close timeout with remaining unread data")
. . 184: }
. . 185: // stop reading data from file
. . 186: return
. . 187: default:
. . 188: if n == 0 {
. . 189: // wait for new data to come
. 283 190: t.wait()
. . 191: }
. . 192: }
. . 193: }
. . 194:}
. . 195:
ROUTINE ======================== flashcat.cloud/categraf/logs/input/file.(*Tailer).wait in /root/go/src/sre/categraf/logs/input/file/tailer.go
0 283 (flat, cum) 19.25% of Total
. . 312:func (t *Tailer) wait() {
. 283 313: time.Sleep(t.sleepDuration)
. . 314:}
. . 315:
. . 316:func (t *Tailer) recordBytes(n int64) {
. . 317: t.bytesRead += n
. . 318: t.file.Source.BytesRead.Add(n)
(pprof)

(pprof) list Decoder
Total: 1470
ROUTINE ======================== flashcat.cloud/categraf/logs/decoder.(*Decoder).run in /root/go/src/sre/categraf/logs/decoder/decoder.go
0 283 (flat, cum) 19.25% of Total
. . 225:func (d *Decoder) run() {
. 283 226: for data := range d.InputChan {
. . 227: d.decodeIncomingData(data.content)
. . 228: }
. . 229: // finish to stop decoder
. . 230: d.lineParser.Stop()
. . 231:}
(pprof)
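
The listings above show each stage parked on a channel receive or a sleep, so a stage can only exit once the stage before it stops and closes its channel. The following is a minimal sketch of that shutdown chain, not categraf's code: the `tailer` type, its fields, and `runDemo` are hypothetical names chosen for illustration, and the pipeline is reduced to three stages.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// tailer is a hypothetical stand-in for a file tailer with a reader,
// a decoder and a forwarder connected by channels.
type tailer struct {
	stop   chan struct{}
	input  chan []byte // reader -> decoder
	output chan []byte // decoder -> forwarder
	live   *int64      // number of stage goroutines still running
	wg     sync.WaitGroup
}

func newTailer(live *int64) *tailer {
	return &tailer{
		stop:   make(chan struct{}),
		input:  make(chan []byte),
		output: make(chan []byte),
		live:   live,
	}
}

// start launches one goroutine per stage and counts it as live.
func (t *tailer) start() {
	for _, stage := range []func(){t.readForever, t.decode, t.forward} {
		t.wg.Add(1)
		atomic.AddInt64(t.live, 1)
		go func(run func()) {
			defer t.wg.Done()
			defer atomic.AddInt64(t.live, -1)
			run()
		}(stage)
	}
}

// readForever polls until stop is closed, then closes input so that
// the decoder's range loop can end.
func (t *tailer) readForever() {
	defer close(t.input)
	for {
		select {
		case <-t.stop:
			return
		default:
			time.Sleep(time.Millisecond) // no new data: wait and poll again
		}
	}
}

// decode blocks on input until it is closed, then closes output.
func (t *tailer) decode() {
	defer close(t.output)
	for data := range t.input {
		t.output <- data
	}
}

// forward blocks on output until it is closed.
func (t *tailer) forward() {
	for range t.output {
	}
}

// runDemo starts n tailers and reports the live stage goroutines
// before and after every tailer has been stopped.
func runDemo(n int) (running, stopped int64) {
	var live int64
	tailers := make([]*tailer, n)
	for i := range tailers {
		tailers[i] = newTailer(&live)
		tailers[i].start()
	}
	running = atomic.LoadInt64(&live)
	for _, t := range tailers {
		close(t.stop) // without this, all three stages stay parked forever
		t.wg.Wait()
	}
	stopped = atomic.LoadInt64(&live)
	return running, stopped
}

func main() {
	running, stopped := runDemo(4)
	fmt.Printf("running=%d stopped=%d\n", running, stopped)
}
```

In this sketch, a tailer whose `stop` channel is never closed keeps all of its stage goroutines alive indefinitely, which is the general shape a growing count of parked `readForever`, `Decoder.run` and `forwardMessages` goroutines would take. Whether that is what happens in categraf here is not established by the profile alone.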

(pprof)
[root@devsystem categraf]# go tool pprof --text ./categraf goroutine.pprof
File: categraf
Type: goroutine
Time: Nov 24, 2024 at 12:07am (CST)
Showing nodes accounting for 1468, 99.86% of 1470 total
Dropped 49 nodes (cum <= 7)
flat flat% sum% cum cum%
1468 99.86% 99.86% 1468 99.86% runtime.gopark
0 0% 99.86% 19 1.29% flashcat.cloud/categraf/agent.(*InputReader).startInput
0 0% 99.86% 283 19.25% flashcat.cloud/categraf/logs/decoder.(*Decoder).run
0 0% 99.86% 281 19.12% flashcat.cloud/categraf/logs/decoder.(*MultiLineHandler).run
0 0% 99.86% 283 19.25% flashcat.cloud/categraf/logs/decoder.(*SingleLineParser).run
0 0% 99.86% 283 19.25% flashcat.cloud/categraf/logs/input/file.(*Tailer).forwardMessages
0 0% 99.86% 283 19.25% flashcat.cloud/categraf/logs/input/file.(*Tailer).readForever
0 0% 99.86% 283 19.25% flashcat.cloud/categraf/logs/input/file.(*Tailer).wait (inline)
0 0% 99.86% 7 0.48% internal/poll.(*FD).Read
0 0% 99.86% 8 0.54% internal/poll.(*pollDesc).wait
0 0% 99.86% 8 0.54% internal/poll.(*pollDesc).waitRead (inline)
0 0% 99.86% 8 0.54% internal/poll.runtime_pollWait
0 0% 99.86% 7 0.48% net.(*conn).Read
0 0% 99.86% 7 0.48% net.(*netFD).Read
0 0% 99.86% 863 58.71% runtime.chanrecv
0 0% 99.86% 861 58.57% runtime.chanrecv2
0 0% 99.86% 8 0.54% runtime.netpollblock
0 0% 99.86% 311 21.16% runtime.selectgo
0 0% 99.86% 286 19.46% time.Sleep
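
To put the numbers above in proportion, the sketch below adds up the cumulative counts of the log pipeline functions. It assumes each listed function is the entry point of its own goroutine, so the counts do not overlap; that assumption is consistent with the `time.Sleep`, `runtime.chanrecv2` and `runtime.selectgo` rows but is not confirmed by the profile itself. The `stage` type and helper names are hypothetical.

```go
package main

import "fmt"

// stage pairs a function name with its cumulative goroutine count.
type stage struct {
	name    string
	blocked int
}

// pipelineStages returns the counts copied from the pprof output above.
func pipelineStages() []stage {
	return []stage{
		{"file.(*Tailer).readForever", 283},
		{"file.(*Tailer).forwardMessages", 283},
		{"decoder.(*Decoder).run", 283},
		{"decoder.(*SingleLineParser).run", 283},
		{"decoder.(*MultiLineHandler).run", 281},
	}
}

// totalBlocked sums the goroutine counts of all stages.
func totalBlocked(stages []stage) int {
	sum := 0
	for _, s := range stages {
		sum += s.blocked
	}
	return sum
}

func main() {
	const total = 1470 // total goroutines reported by pprof
	sum := totalBlocked(pipelineStages())
	fmt.Printf("log pipeline goroutines: %d of %d (%.1f%%)\n",
		sum, total, 100*float64(sum)/float64(total))
}
```

Under that assumption the log pipeline accounts for 1413 of the 1470 goroutines, about 96.1%, spread over roughly 283 tailers.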

Expected behavior

Goroutine leak

Actual behavior

Goroutine leak

Additional info

No response

@showsmall
Author

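The blue line drops because I restarted the process, which brought the thread count back down. After every restart, the problem reappears once the process has been running for a few hours, a day or more, or several days.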
image
image
image
