fix!(trace): simplify tracing and only write data in complete batches #1437

Draft · wants to merge 9 commits into base: v0.34.x-celestia
4 changes: 2 additions & 2 deletions cmd/cometbft/commands/run_node.go
@@ -97,13 +97,13 @@ func AddNodeFlags(cmd *cobra.Command) {

	cmd.PersistentFlags().String(
		trace.FlagTracePushConfig,
		config.Instrumentation.TracePushConfig,
		config.Instrumentation.TracePushConfig, //nolint:staticcheck
		trace.FlagTracePushConfigDescription,
	)

	cmd.PersistentFlags().String(
		trace.FlagTracePullAddress,
		config.Instrumentation.TracePullAddress,
		config.Instrumentation.TracePullAddress, //nolint:staticcheck
		trace.FlagTracePullAddressDescription,
	)

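The `//nolint:staticcheck` directives above exist because reading a field whose doc comment starts with `// Deprecated:` trips staticcheck's SA1019 check. A minimal sketch of the pattern, with hypothetical flag and field names (not the node's real wiring):

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

// instrumentation mimics the shape of the deprecated config fields; the
// names here are hypothetical and for illustration only.
type instrumentation struct {
	// Deprecated: kept only for backwards compatibility.
	TracePushConfig string
}

func addFlags(cmd *cobra.Command, cfg *instrumentation) {
	// Referencing the deprecated field is intentional here, so the
	// SA1019 warning is suppressed at the call site.
	cmd.PersistentFlags().String(
		"trace-push-config",
		cfg.TracePushConfig, //nolint:staticcheck
		"relative path of the (deprecated) push config file",
	)
}

func main() {
	cmd := &cobra.Command{Use: "demo"}
	addFlags(cmd, &instrumentation{})
	fmt.Println(cmd.PersistentFlags().Lookup("trace-push-config").DefValue)
}
```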
7 changes: 4 additions & 3 deletions config/config.go
@@ -1194,18 +1194,19 @@ type InstrumentationConfig struct {
	// Instrumentation namespace.
	Namespace string `mapstructure:"namespace"`

	// TracePushConfig is the relative path of the push config. This second
	// Deprecated: TracePushConfig is the relative path of the push config. This second
	// config contains credentials for where and how often to push trace data.
	TracePushConfig string `mapstructure:"trace_push_config"`

	// TracePullAddress is the address that the trace server will listen on for
	// Deprecated: TracePullAddress is the address that the trace server will listen on for
	// pulling data.
	TracePullAddress string `mapstructure:"trace_pull_address"`

	// TraceType is the type of tracer used. Options are "local" and "noop".
	TraceType string `mapstructure:"trace_type"`

	// TraceBufferSize is the number of traces to write in a single batch.
	// TraceBufferSize is the size of the buffer, in number of events, that is
	// kept for each event type before events of that type are dropped.
	TraceBufferSize int `mapstructure:"trace_push_batch_size"`

	// TracingTables is the list of tables that will be traced. See the
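The drop-on-full behavior that `TraceBufferSize` describes boils down to a non-blocking send on a bounded channel. A self-contained sketch of that semantics (the names are illustrative, not this package's API):

```go
package main

import "fmt"

// event stands in for a single trace entry; illustrative only.
type event struct{ msg string }

// boundedCache drops events instead of blocking once the buffer is full,
// mirroring the semantics described for trace_push_batch_size.
type boundedCache struct {
	ch chan event
}

func newBoundedCache(size int) *boundedCache {
	return &boundedCache{ch: make(chan event, size)}
}

func (c *boundedCache) add(e event) bool {
	select {
	case c.ch <- e:
		return true
	default:
		return false // buffer full: drop rather than block the caller
	}
}

func main() {
	c := newBoundedCache(2)
	for i := 0; i < 4; i++ {
		ok := c.add(event{msg: fmt.Sprintf("event %d", i)})
		fmt.Printf("event %d kept=%v\n", i, ok)
	}
	// With no reader draining the channel, events 0 and 1 are kept
	// and events 2 and 3 are dropped.
}
```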
7 changes: 6 additions & 1 deletion config/toml.go
@@ -556,16 +556,21 @@ namespace = "{{ .Instrumentation.Namespace }}"
# This second config contains credentials for where and how often to
# push trace data. For example, if the config is next to this config,
# it would be "push_config.json".
#
# WARNING: deprecated
trace_push_config = "{{ .Instrumentation.TracePushConfig }}"

# The tracer pull address specifies which address will be used for pull based
# event collection. If empty, the pull based server will not be started.
#
# WARNING: deprecated
trace_pull_address = "{{ .Instrumentation.TracePullAddress }}"

# The tracer to use for collecting trace data.
trace_type = "{{ .Instrumentation.TraceType }}"

# The size of the batches that are sent to the database.
# The number of events of each type that will be buffered before writing.
# If this buffer fills up, events will be dropped to avoid blocking.
trace_push_batch_size = {{ .Instrumentation.TraceBufferSize }}

# The list of tables that are updated when tracing. All available tables and
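Note that, per the `mapstructure` tags in `config.go`, `TraceBufferSize` is still read from the `trace_push_batch_size` key shown above. A rough, standalone sketch of that binding (not the node's actual config loading path):

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// instrumentation mirrors the relevant fields of InstrumentationConfig.
type instrumentation struct {
	TraceType       string `mapstructure:"trace_type"`
	TraceBufferSize int    `mapstructure:"trace_push_batch_size"`
}

func main() {
	// Values as they would appear after parsing the TOML above.
	raw := map[string]interface{}{
		"trace_type":            "local",
		"trace_push_batch_size": 1000,
	}
	var cfg instrumentation
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("type=%s buffer=%d\n", cfg.TraceType, cfg.TraceBufferSize)
}
```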
71 changes: 9 additions & 62 deletions pkg/trace/README.md
@@ -10,7 +10,8 @@ To enable the local tracer, add the following to the config.toml file:
# The tracer to use for collecting trace data.
trace_type = "local"

# The size of the batches that are sent to the database.
# The size of the cache for each table. Data is constantly written to disk,
# but if this limit is hit, any data past it is dropped.
trace_push_batch_size = 1000

# The list of tables that are updated when tracing. All available tables and
@@ -32,71 +33,17 @@ if err != nil {
}
```

### Pull Based Event Collection
### Event Collection

Pull based event collection is where external servers connect to and pull trace
data from the consensus node.
Collect the events after data collection is complete by simply transferring
the files however you see fit. For example, using the `scp` command:

To use this, change the config.toml to store traces in the
.celestia-app/data/traces directory.

```toml
# The tracer pull address specifies which address will be used for pull based
# event collection. If empty, the pull based server will not be started.
trace_pull_address = ":26661"
```

To retrieve a table remotely using the pull based server, call the following
function:

```go
err := GetTable("http://1.2.3.4:26661", "mempool_tx", "directory to store the file")
if err != nil {
	return err
}
```

This stores the data locally in the specified directory.


### Push Based Event Collection

Push based event collection is where the consensus node pushes trace data to an
external server. At the moment, this is just an S3 bucket. To use this, two options are available:
#### Using push config file

Add the following to the config.toml file:

```toml
# TracePushConfig is the relative path of the push config.
# This second config contains credentials for where and how often to
# push trace data to. For example, if the config is next to this config,
# it would be "push_config.json".
trace_push_config = "{{ .Instrumentation.TracePushConfig }}"
```

The push config file is a JSON file that should look like this:

```json
{
"bucket": "bucket-name",
"region": "region",
"access_key": "",
"secret_key": "",
"push_delay": 60 // number of seconds to wait between intervals of pushing all files
}
```

```bash
scp -r user@host:/path/to/.celestia-app/data/traces /path/to/local/directory
```

#### Using environment variables for s3 bucket

Alternatively, you can set the following environment variables:
or using aws s3 (after setting up the aws cli ofc):

Collaborator suggested change:
or using aws s3 (after [setting up the aws cli](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-quickstart.html)):
```bash
export TRACE_PUSH_BUCKET_NAME=bucket-name
export TRACE_PUSH_REGION=region
export TRACE_PUSH_ACCESS_KEY=access-key
export TRACE_PUSH_SECRET_KEY=secret-key
export TRACE_PUSH_DELAY=push-delay
aws s3 cp /path/to/.celestia-app/data/traces s3://<bucket-name>/<prefix> --recursive
```

`bucket_name`, `region`, `access_key`, `secret_key` and `push_delay` are the s3 bucket name, region, access key, secret key, and the delay between pushes, respectively.
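Once the files are local, each table is a jsonl file: one JSON-encoded event per line, as written by `cachedFile` below. A minimal sketch of post-processing such a file; the path and extension are assumptions based on this README, and `row` is just a generic container:

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
)

// row holds whatever fields a given table's events carry; the shape is
// illustrative, not a schema from this package.
type row map[string]interface{}

func main() {
	f, err := os.Open("traces/mempool_tx.jsonl") // path assumed from the README
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	for sc.Scan() {
		var r row
		if err := json.Unmarshal(sc.Bytes(), &r); err != nil {
			fmt.Fprintln(os.Stderr, "skipping malformed line:", err)
			continue
		}
		fmt.Println(r)
	}
	if err := sc.Err(); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```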
101 changes: 0 additions & 101 deletions pkg/trace/buffered_file.go

This file was deleted.

114 changes: 114 additions & 0 deletions pkg/trace/cached_file.go
@@ -0,0 +1,114 @@
package trace

import (
	"encoding/json"
	"fmt"
	"os"
	"sync"

	"github.com/tendermint/tendermint/libs/log"
)

// cachedFile wraps an os.File with a channel-based cache that ensures only
// complete data is written to the file. Data is serialized to JSON before
// being written. The cache is flushed once chunkSize events have accumulated.
// WARNING: errors are only logged, and if the cache is full, writes are
// dropped!
type cachedFile struct {
	wg        *sync.WaitGroup
	cache     chan Event[Entry]
	file      *os.File
	chunkSize int
	logger    log.Logger
}

// newCachedFile creates a cachedFile, which wraps a normal file to ensure
// that only complete data is ever written. cacheSize is the number of events
// that will be cached, and chunkSize is the number of events that triggers a
// write. cacheSize needs to be sufficiently larger than chunkSize (10x to be
// safe) in order to avoid blocking.
func newCachedFile(file *os.File, logger log.Logger, cacheSize int, chunkSize int) *cachedFile {
	cf := &cachedFile{
		file:      file,
		cache:     make(chan Event[Entry], cacheSize),
		chunkSize: chunkSize,
		logger:    logger,
		wg:        &sync.WaitGroup{},
	}
	cf.wg.Add(1)
	go cf.startFlushing()
	return cf
}

// Cache buffers the given event to be written to the file. If the cache is
// full, the event is dropped and an error is logged.
func (f *cachedFile) Cache(b Event[Entry]) {
	select {
	case f.cache <- b:
	default:
		f.logger.Error(fmt.Sprintf("tracing cache full, dropping event: %T", b))
	}
}

// startFlushing reads from the cache, serializes the event, and writes it to
// the file in chunks of chunkSize.
func (f *cachedFile) startFlushing() {
	buffer := make([][]byte, 0, f.chunkSize)
	defer f.wg.Done()

	for {
		b, ok := <-f.cache
		if !ok {
			// Channel closed: flush remaining data and exit.
			if len(buffer) > 0 {
				_, err := f.flush(buffer)
				if err != nil {
					f.logger.Error("failure to flush remaining events", "error", err)
				}
			}
			return
		}

		bz, err := json.Marshal(b)
		if err != nil {
			f.logger.Error("failed to marshal event", "err", err)
			// Skip the event instead of closing the cache: closing the
			// channel here would panic any later Cache call that sends to
			// it, and Close would then close it a second time.
			continue
		}

		// format the file as jsonl
		bz = append(bz, '\n')

		buffer = append(buffer, bz)
		if len(buffer) >= f.chunkSize {
			_, err := f.flush(buffer)
			if err != nil {
				f.logger.Error("tracer failed to write buffered events to file", "error", err)
			}
			buffer = buffer[:0] // reset buffer
Member Author commented:

here we're continuing to treat the trace data as expendable and instead optimizing for never blocking. If there's an error writing to the file, then we simply log it and throw away the data. If this occurs, we will see it either in the logs or in the data itself.

		}
	}
}

// flush writes the given bytes to the file.
func (f *cachedFile) flush(buffer [][]byte) (int, error) {
	total := 0
	for _, b := range buffer {
		i, err := f.file.Write(b)
		if err != nil {
			return total, err
		}
		total += i
	}
	return total, nil
}

// Close closes the file after flushing any buffered events.
func (f *cachedFile) Close() error {
	// Close the cache to stop the flushing goroutine, then wait for it to
	// drain any remaining events before syncing and closing the file.
	close(f.cache)
	f.wg.Wait()
	err := f.file.Sync()
	if err != nil {
		return err
	}
	return f.file.Close()
}
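For reference, a hedged sketch of the intended lifecycle of a `cachedFile`, written as if inside this package; the caller supplies an already-built `Event[Entry]`, since the event constructors aren't part of this diff, and the file path is an assumption:

```go
package trace

import (
	"os"

	"github.com/tendermint/tendermint/libs/log"
)

// exampleUsage sketches the lifecycle: open a file, wrap it with a cache
// roughly 10x the chunk size (per newCachedFile's guidance), cache events
// from any goroutine, and Close to flush whatever remains.
func exampleUsage(ev Event[Entry]) error {
	file, err := os.OpenFile("traces/mempool_tx.jsonl", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return err
	}
	cf := newCachedFile(file, log.NewNopLogger(), 1000, 100)
	cf.Cache(ev)      // non-blocking; dropped and logged if the cache is full
	return cf.Close() // flushes buffered events, syncs, and closes the file
}
```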