Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix!(trace): simplify tracing and only write data in complete batches #1437

Draft
wants to merge 9 commits into
base: v0.34.x-celestia
Choose a base branch
from
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1194,18 +1194,19 @@ type InstrumentationConfig struct {
// Instrumentation namespace.
Namespace string `mapstructure:"namespace"`

// TracePushConfig is the relative path of the push config. This second
// Deprecated: TracePushConfig is the relative path of the push config. This second
// config contains credentials for where and how often to push trace data.
TracePushConfig string `mapstructure:"trace_push_config"`

// TracePullAddress is the address that the trace server will listen on for
// Deprecated: TracePullAddress is the address that the trace server will listen on for
// pulling data.
TracePullAddress string `mapstructure:"trace_pull_address"`

// TraceType is the type of tracer used. Options are "local" and "noop".
TraceType string `mapstructure:"trace_type"`

// TraceBufferSize is the number of traces to write in a single batch.
// TraceBufferSize is the size of the buffer in number of events that will
// be kept before dropping events of a single type.
TraceBufferSize int `mapstructure:"trace_push_batch_size"`

// TracingTables is the list of tables that will be traced. See the
Expand Down
7 changes: 6 additions & 1 deletion config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,16 +556,21 @@ namespace = "{{ .Instrumentation.Namespace }}"
# This second config contains credentials for where and how often to
# push trace data to. For example, if the config is next to this config,
# it would be "push_config.json".
#
# WARNING: deprecated
trace_push_config = "{{ .Instrumentation.TracePushConfig }}"

# The tracer pull address specifies which address will be used for pull based
# event collection. If empty, the pull based server will not be started.
#
# WARNING: deprecated
trace_pull_address = "{{ .Instrumentation.TracePullAddress }}"

# The tracer to use for collecting trace data.
trace_type = "{{ .Instrumentation.TraceType }}"

# The size of the batches that are sent to the database.
# The number of events of each type that will be buffered before writing.
# If this buffer is reached events will be dropped to avoid blocking.
trace_push_batch_size = {{ .Instrumentation.TraceBufferSize }}

# The list of tables that are updated when tracing. All available tables and
Expand Down
101 changes: 0 additions & 101 deletions pkg/trace/buffered_file.go

This file was deleted.

114 changes: 114 additions & 0 deletions pkg/trace/cached_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package trace

import (
"encoding/json"
"fmt"
"os"
"sync"

"github.com/tendermint/tendermint/libs/log"
)

// cachedFile wraps the os.File with a channel based cache that ensures only
// complete data is written to the file. Data is serialized to JSON before
// being written. The cache is flushed to the file each time chunkSize events
// have accumulated.
//
// WARNING: errors are only logged, and once the cache is full new events are
// silently dropped instead of blocking the caller.
type cachedFile struct {
	wg        *sync.WaitGroup   // waits for the background flushing goroutine to exit on Close
	cache     chan Event[Entry] // buffered channel acting as the event cache
	file      *os.File          // destination file; only complete JSON lines are written
	chunkSize int               // number of cached events that triggers a write
	logger    log.Logger        // sink for marshal/write errors
}

// newCachedFile creates a cachedFile which wraps a normal file to ensure that
// only complete data is ever written. cacheSize is the number of events that
// can be held before new events are dropped, and chunkSize is the number of
// events that triggers a write. cacheSize needs to be sufficiently larger
// (10x to be safe) than chunkSize in order to avoid dropping events.
func newCachedFile(file *os.File, logger log.Logger, cacheSize int, chunkSize int) *cachedFile {
	cf := &cachedFile{
		wg:        &sync.WaitGroup{},
		cache:     make(chan Event[Entry], cacheSize),
		file:      file,
		chunkSize: chunkSize,
		logger:    logger,
	}
	// Start the single background goroutine that drains the cache; Close
	// waits on wg for it to finish before syncing the file.
	cf.wg.Add(1)
	go cf.startFlushing()
	return cf
}

// Cache enqueues the given event for asynchronous writing to the file. If the
// cache is full, the event is dropped and an error is logged; Cache never
// blocks the caller.
func (f *cachedFile) Cache(ev Event[Entry]) {
	select {
	case f.cache <- ev:
		// accepted: the background flusher will serialize and write it
	default:
		// non-blocking by design: drop the event rather than stall tracing
		f.logger.Error(fmt.Sprintf("tracing cache full, dropping event: %T", ev))
	}
}

// startFlushing reads from the cache, serializes the event, and writes to the
// file.
func (f *cachedFile) startFlushing() {
buffer := make([][]byte, 0, f.chunkSize)
defer f.wg.Done()

for {
b, ok := <-f.cache
if !ok {
// Channel closed, flush remaining data and exit
if len(buffer) > 0 {
_, err := f.flush(buffer)
if err != nil {
f.logger.Error("failure to flush remaining events", "error", err)
}
}
return
}

bz, err := json.Marshal(b)
if err != nil {
f.logger.Error("failed to marshal event", "err", err)
close(f.cache)
return
}

// format the file to jsonl
bz = append(bz, '\n')

buffer = append(buffer, bz)
if len(buffer) >= f.chunkSize {
_, err := f.flush(buffer)
if err != nil {
f.logger.Error("tracer failed to write buffered files to file", "error", err)
}
buffer = buffer[:0] // reset buffer
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we're continuing to treat the trace data as expendable and instead optimizing for never blocking. If there's an error writing to the file, then we simply log it and throw away the data.

If this is occurring, we either see it in the logs or the data itself.

}
}
}

// flush writes each buffered line to the file. It returns the total number of
// bytes successfully written and the first write error encountered, if any.
func (f *cachedFile) flush(buffer [][]byte) (int, error) {
	written := 0
	for _, line := range buffer {
		n, err := f.file.Write(line)
		if err != nil {
			// bytes from the failed write are intentionally not counted
			return written, err
		}
		written += n
	}
	return written, nil
}

// Close stops the background flusher, waits for it to drain and write any
// remaining cached events, then syncs and closes the underlying file.
//
// NOTE(review): calling Cache concurrently with or after Close will panic on
// send to a closed channel — callers must stop producing events first.
func (f *cachedFile) Close() error {
	// Closing the channel signals startFlushing to flush what is left and
	// exit; wg.Wait blocks until it has done so.
	close(f.cache)
	f.wg.Wait()
	if err := f.file.Sync(); err != nil {
		return err
	}
	return f.file.Close()
}
82 changes: 82 additions & 0 deletions pkg/trace/cached_file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package trace

import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
cmtlog "github.com/tendermint/tendermint/libs/log"
)

// Test_cachedFile exercises the cachedFile write path: events below the chunk
// threshold stay buffered, Close flushes all cached events, and a full cache
// drops events rather than blocking the caller.
func Test_cachedFile(t *testing.T) {
	type test struct {
		name      string
		cacheSize int
		events    int
		close     bool
		// readEventsFunc validates the events decoded back from the file.
		readEventsFunc func([]Event[TestEntry]) bool
	}
	tests := []test{
		{"don't exceed write threshold", 10, 5, false, func(e []Event[TestEntry]) bool { return len(e) == 0 }},
		{"close writes all cached events", 10, 5, true, func(e []Event[TestEntry]) bool { return len(e) == 5 }},
		{"exceed write threshold", 10, 10, true, func(e []Event[TestEntry]) bool { return len(e) == 10 }},
		// NOTE(review): the previous bound len(e) < 1000 was vacuous since
		// only 100 events are cached. Dropping is timing dependent (the
		// flusher drains concurrently), so assert only that the run completed
		// without blocking and at least one event reached disk.
		{"doesn't block when buffer is full", 10, 100, true, func(e []Event[TestEntry]) bool {
			return len(e) > 0 && len(e) <= 100
		},
		},
	}

	logger := cmtlog.TestingLogger()
	for _, tt := range tests {
		t.Run(
			tt.name, func(t *testing.T) {
				tmp := t.TempDir()
				fdir := filepath.Join(tmp, "test.jsonl")
				f, err := os.OpenFile(fdir, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0777)
				require.NoError(t, err)
				// Release the write handle even when cf.Close is not called;
				// a second close after cf.Close is harmless here.
				t.Cleanup(func() { _ = f.Close() })

				cf := newCachedFile(f, logger, tt.cacheSize, 10)

				for _, event := range generateEvents(tt.events) {
					cf.Cache(event)
				}

				if tt.close {
					require.NoError(t, cf.Close())
				}

				file, err := os.OpenFile(fdir, os.O_RDONLY, 0777)
				require.NoError(t, err)
				defer file.Close()

				entries, err := DecodeFile[TestEntry](file)
				require.NoError(t, err)

				require.True(t, tt.readEventsFunc(entries))
			},
		)
	}
}

// Compile-time check that *TestEntry satisfies the Entry interface.
var _ Entry = &TestEntry{}

// TestEntry is a minimal Entry implementation used only by the tracing tests.
type TestEntry struct {
	table string // table name reported by Table
}

// Table returns the table this entry belongs to.
func (te *TestEntry) Table() string {
	return te.table
}

// generateEvents builds count identical events for the "test" table, typed as
// Event[Entry] so they match cachedFile's cache element type.
func generateEvents(count int) []Event[Entry] {
	events := make([]Event[Entry], 0, count)
	for i := 0; i < count; i++ {
		// Merged declaration and assignment (staticcheck S1021) while keeping
		// the explicit Entry interface type: it makes NewEvent instantiate as
		// Event[Entry] rather than inferring Event[*TestEntry].
		var entry Entry = &TestEntry{"test"}
		events = append(events, NewEvent("test", "test", "test", entry))
	}
	return events
}
4 changes: 2 additions & 2 deletions pkg/trace/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ func DecodeFile[T any](f *os.File) ([]Event[T], error) {
var out []Event[T]
r := bufio.NewReader(f)
for {
line, err := r.ReadString('\n')
line, err := r.ReadBytes('\n')
if err == io.EOF {
break
} else if err != nil {
return nil, err
}

var e Event[T]
if err := json.Unmarshal([]byte(line), &e); err != nil {
if err := json.Unmarshal(line, &e); err != nil {
return nil, err
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/trace/doc.go

This file was deleted.

Loading
Loading