-
Notifications
You must be signed in to change notification settings - Fork 269
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix!(trace): simplify tracing and only write data in complete batches #1437
base: v0.34.x-celestia
Are you sure you want to change the base?
Changes from 5 commits
ab118aa
3f7e302
99dd93e
2b66d3f
a862881
c8dd1d5
18ce0ea
9a476ab
1d9d9bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
package trace | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"os" | ||
"sync" | ||
|
||
"github.com/tendermint/tendermint/libs/log" | ||
) | ||
|
||
// cachedFile wraps the os.File with a channel based cache that ensures only | ||
// complete data is written to the file. Data is serialized to JSON before being | ||
// written. The cache is flushed when the chunk size is reached. WARNING: Errors | ||
// are only logged and if the cache is filled writes are ignored! | ||
type cachedFile struct { | ||
wg *sync.WaitGroup | ||
cache chan Event[Entry] | ||
file *os.File | ||
chunkSize int | ||
logger log.Logger | ||
} | ||
|
||
// newcachedFile creates a cachedFile which wraps a normal file to ensure that | ||
// only complete data is ever written. cacheSize is the number of events that | ||
// will be cached and chunkSize is the number of events that will trigger a | ||
// write. cacheSize needs to be sufficiently larger (10x to be safe) than chunkSize in order to | ||
// avoid blocking. | ||
func newCachedFile(file *os.File, logger log.Logger, cacheSize int, chunkSize int) *cachedFile { | ||
cf := &cachedFile{ | ||
file: file, | ||
cache: make(chan Event[Entry], cacheSize), | ||
chunkSize: chunkSize, | ||
logger: logger, | ||
wg: &sync.WaitGroup{}, | ||
} | ||
cf.wg.Add(1) | ||
go cf.startFlushing() | ||
return cf | ||
} | ||
|
||
// Cache caches the given bytes to be written to the file. | ||
func (f *cachedFile) Cache(b Event[Entry]) { | ||
select { | ||
case f.cache <- b: | ||
default: | ||
f.logger.Error(fmt.Sprintf("tracing cache full, dropping event: %T", b)) | ||
} | ||
} | ||
|
||
// startFlushing reads from the cache, serializes the event, and writes to the | ||
// file. | ||
func (f *cachedFile) startFlushing() { | ||
buffer := make([][]byte, 0, f.chunkSize) | ||
defer f.wg.Done() | ||
|
||
for { | ||
b, ok := <-f.cache | ||
if !ok { | ||
// Channel closed, flush remaining data and exit | ||
if len(buffer) > 0 { | ||
_, err := f.flush(buffer) | ||
if err != nil { | ||
f.logger.Error("failure to flush remaining events", "error", err) | ||
} | ||
} | ||
return | ||
} | ||
|
||
bz, err := json.Marshal(b) | ||
if err != nil { | ||
f.logger.Error("failed to marshal event", "err", err) | ||
close(f.cache) | ||
return | ||
} | ||
|
||
// format the file to jsonl | ||
bz = append(bz, '\n') | ||
|
||
buffer = append(buffer, bz) | ||
if len(buffer) >= f.chunkSize { | ||
_, err := f.flush(buffer) | ||
if err != nil { | ||
f.logger.Error("tracer failed to write buffered files to file", "error", err) | ||
} | ||
buffer = buffer[:0] // reset buffer | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here we're continuing to treat the trace data as expendable and instead optimizing for never blocking. If there's an error writing to the file, then we simply log it and throw away the data. If this is occurring, we either see it in the logs or the data itself. |
||
} | ||
} | ||
} | ||
|
||
// flush writes the given bytes to the file. | ||
func (f *cachedFile) flush(buffer [][]byte) (int, error) { | ||
total := 0 | ||
for _, b := range buffer { | ||
i, err := f.file.Write(b) | ||
if err != nil { | ||
return total, err | ||
} | ||
total += i | ||
} | ||
return total, nil | ||
} | ||
|
||
// Close closes the file. | ||
func (f *cachedFile) Close() error { | ||
// set reading to true to prevent writes while closing the file. | ||
close(f.cache) | ||
f.wg.Wait() | ||
err := f.file.Sync() | ||
if err != nil { | ||
return err | ||
} | ||
return f.file.Close() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.