-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
237 lines (212 loc) · 7.72 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
package main
import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"regexp"
	"syscall"
	"time"

	"github.com/XSAM/otelsql"
	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
	oteltrace "go.opentelemetry.io/otel/trace"
)
// scrapeInterval is the delay between successive polls of performance_schema
// in main's scrape loop.
const scrapeInterval = 5 * time.Second
// traceparentRegex extracts the W3C traceparent value that the SQL commenter
// (enabled in main via otelsql.WithSQLCommenter) embeds in statement text as a
// traceparent='<value>' pair inside a SQL comment. The value is matched up to
// the closing quote, not up to the next whitespace. This keeps any following
// comment fields (e.g. ,tracestate='...') and the comment terminator out of
// the captured value.
var traceparentRegex = regexp.MustCompile(`traceparent='([^']+)'`)
// Process-wide handles assigned once in main and used by the scraping helpers.
var (
	// tracer creates the scraper's own spans (record_spans, extract_events, ...).
	tracer oteltrace.Tracer
	// db is the otelsql-instrumented connection to MySQL's performance_schema.
	db *sqlx.DB
)
// main wires up OpenTelemetry tracing and opens a traced connection to the local
// MySQL performance_schema. It enables the instruments and consumers the
// scraper relies on. Then, every scrapeInterval, it converts newly completed
// events into spans until the process receives SIGINT or SIGTERM.
//
// Stopping on a signal (rather than looping forever) is what lets the deferred
// cleanups run. The tracer provider is shut down, flushing the batched spans,
// and the database handle is closed.
func main() {
	ctx := context.Background()

	tp := trace.NewTracerProvider(
		trace.WithBatcher(newGRPCExporter(ctx)),
		trace.WithResource(newResource()),
	)
	defer func() {
		// Bound the final flush so a stuck exporter cannot hang process exit.
		shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()
		if err := tp.Shutdown(shutdownCtx); err != nil {
			fmt.Fprintf(os.Stderr, "shutting down tracer provider: %v\n", err)
		}
	}()
	otel.SetTracerProvider(tp)
	tracer = tp.Tracer("mysql")
	otel.SetTextMapPropagator(propagation.TraceContext{})

	tracedDB, err := otelsql.Open("mysql", "root:@tcp(localhost:3306)/performance_schema?parseTime=true&loc=UTC",
		otelsql.WithAttributes(
			semconv.DBSystemMySQL,
		),
		// NB the SQL commenter injects trace propagation headers into SQL statements as comments
		// this format is technically up in the air as per https://github.com/open-telemetry/opentelemetry-specification/issues/2279
		otelsql.WithSQLCommenter(true),
	)
	// Check the error before registering any cleanup that touches tracedDB.
	if err != nil {
		panic(err)
	}
	// sqlx.DB embeds the *sql.DB, so closing db also closes tracedDB; a single
	// Close is enough.
	db = sqlx.NewDb(tracedDB, "mysql")
	defer func() {
		if err := db.Close(); err != nil {
			fmt.Fprintf(os.Stderr, "closing database: %v\n", err)
		}
	}()

	enablePerfMonitoring(ctx)

	runCtx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
	defer stop()

	ticker := time.NewTicker(scrapeInterval)
	defer ticker.Stop()

	var cursor uint64
	for {
		select {
		case <-runCtx.Done():
			return
		case <-ticker.C:
			fmt.Printf("scraping events ending after %d\n", cursor)
			cursor = recordSpans(cursor)
		}
	}
}
// recordSpans scrapes the statements returned by extractEvents for cursor. It
// rebuilds the statement -> stage -> wait hierarchy (plus any enclosing
// transaction) from the joined rows and emits a span for each event. A
// statement whose SQL text carries a traceparent comment is parented to that
// remote trace; otherwise its spans start a new trace.
//
// It returns the cursor for the next scrape: the TIMER_END of the first
// (latest-ending) row, or the unchanged input cursor when nothing new was found.
// Query failures panic, as elsewhere in this program.
func recordSpans(cursor uint64) (newCursor uint64) {
	ctx, recordSpansSpan := tracer.Start(context.Background(), "record_spans")
	defer recordSpansSpan.End()
	recordSpansSpan.AddEvent("issuing query", oteltrace.WithAttributes(attribute.Int64("cursor", int64(cursor))))

	timeOrigin, err := getTimeOrigin(ctx)
	if err != nil {
		panic(err)
	}

	// extract events from the performance schema
	events, err := extractEvents(ctx, cursor)
	if err != nil {
		panic(err)
	}
	if len(events) == 0 {
		// Nothing new: keep the current cursor. Returning the zero value here
		// would rewind the next scrape to the start of the history tables and
		// re-emit every event still retained there.
		return cursor
	}
	// extractEvents orders rows by TIMER_END descending, so the first row has
	// the latest end time seen in this scrape.
	newCursor = uint64(events[0].StatementEvent.TimerEnd)

	// Transform the flat joined rows into a tree. Each statement may appear in
	// many rows (one per stage/wait combination), so every event is
	// de-duplicated by its unique id before being attached to its parent.
	statementsById := map[uniqueEventId]StatementEvent{}
	stagesById := map[uniqueEventId]StageEvent{}
	waitsById := map[uniqueEventId]WaitEvent{}
	transactionsByStatementId := map[uniqueEventId]TransactionEvent{}
	stagesByStatementId := map[uniqueEventId][]StageEvent{}
	waitsByStageId := map[uniqueEventId][]WaitEvent{}
	for _, event := range events {
		statementId := event.StatementEvent.uniqueId()
		stageId := event.StageEvent.uniqueId()
		waitId := event.WaitEvent.uniqueId()
		// EventId.Valid is false when the corresponding LEFT JOIN found no row.
		if _, ok := statementsById[statementId]; event.StatementEvent.EventId.Valid && !ok {
			statementsById[statementId] = event.StatementEvent
		}
		if _, ok := transactionsByStatementId[statementId]; event.TransactionEvent.EventId.Valid && !ok {
			transactionsByStatementId[statementId] = event.TransactionEvent
		}
		if _, ok := stagesById[stageId]; event.StageEvent.EventId.Valid && !ok {
			stagesById[stageId] = event.StageEvent
			stagesByStatementId[statementId] = append(stagesByStatementId[statementId], event.StageEvent)
		}
		if _, ok := waitsById[waitId]; event.WaitEvent.EventId.Valid && !ok {
			waitsById[waitId] = event.WaitEvent
			waitsByStageId[stageId] = append(waitsByStageId[stageId], event.WaitEvent)
		}
	}

	// Load the events as spans.
	for statementId, statement := range statementsById {
		if !statement.SqlText.Valid {
			continue
		}
		// Each statement starts from its own fresh context. Reusing one
		// variable across iterations let a statement whose traceparent failed
		// to parse inherit whatever trace the previous iteration left behind.
		stmtCtx := context.Background()
		if m := traceparentRegex.FindStringSubmatch(statement.SqlText.String); len(m) == 2 {
			carrier := propagation.MapCarrier{"traceparent": m[1]}
			stmtCtx = propagation.TraceContext{}.Extract(stmtCtx, carrier)
		}

		var txnSpan oteltrace.Span
		if transaction, ok := transactionsByStatementId[statementId]; ok {
			// NOTE(review): statements sharing a transaction each call toSpan
			// on it here; confirm whether duplicate transaction spans result.
			txnSpan = transaction.toSpan(stmtCtx, timeOrigin)
		}
		statementSpan := statement.toSpan(stmtCtx, timeOrigin, txnSpan)
		for _, stage := range stagesByStatementId[statementId] {
			stageSpan := stage.toSpan(stmtCtx, timeOrigin, statementSpan)
			for _, wait := range waitsByStageId[stage.uniqueId()] {
				// todo link to the span holding the lock??
				wait.toSpan(stmtCtx, timeOrigin, stageSpan)
			}
		}
	}
	return newCursor
}
// getTimeOrigin estimates the wall-clock instant corresponding to a
// performance_schema timer value of zero. The query takes the timer reading of
// a still-running statement (TIMER_START+TIMER_WAIT, divided by 1e6 to get
// microseconds). It subtracts that from the server's current time in
// microseconds, so event timer values can later be turned into absolute
// timestamps.
//
// It returns an error if the query fails or does not yield exactly one row.
func getTimeOrigin(ctx context.Context) (time.Time, error) {
	const query = "SELECT " +
		"FROM_UNIXTIME((((UNIX_TIMESTAMP(CURTIME()) * 1000000) + MICROSECOND(CURTIME(6))) - (TIMER_START+TIMER_WAIT)/1000000)/1000000) as STARTED " +
		"FROM performance_schema.events_statements_current WHERE END_EVENT_ID IS NULL " +
		"LIMIT 1"

	ctx, span := tracer.Start(ctx, "get_time_origin")
	defer span.End()

	rows := []TimeOrigin{}
	err := db.SelectContext(ctx, &rows, query)
	if err == nil && len(rows) != 1 {
		err = fmt.Errorf("unexpected timeorigin %v", rows)
	}
	if err != nil {
		span.RecordError(err)
		return time.Time{}, err
	}

	origin := rows[0].Started
	span.AddEvent("got time origin", oteltrace.WithAttributes(attribute.String("timeOrigin", origin.String())))
	return origin, nil
}
// extractEvents returns the joined statement/transaction/stage/wait rows for
// every statement in events_statements_history_long whose TIMER_END is later
// than cursor. Rows are ordered by TIMER_END descending (latest-ending first).
// Transactions, stages and waits are LEFT JOINed, so their columns may be NULL.
//
// The filter is on TIMER_END because that is how the cursor is produced: the
// caller advances it to the largest TIMER_END seen. Filtering on TIMER_START
// would drop statements that began before the previous cursor but had not yet
// completed when that scrape ran.
func extractEvents(ctx context.Context, cursor uint64) ([]Event, error) {
	ctx, sp := tracer.Start(ctx, "extract_events")
	defer sp.End()

	// cursor is a uint64 rendered with %d, so interpolating it cannot inject SQL.
	query := fmt.Sprintf(
		"SELECT %s,%s,%s,%s FROM events_statements_history_long se "+
			"LEFT JOIN events_transactions_history_long te ON se.NESTING_EVENT_ID=te.EVENT_ID AND se.THREAD_ID=te.THREAD_ID "+
			"LEFT JOIN events_stages_history_long stge ON stge.NESTING_EVENT_ID=se.EVENT_ID AND stge.THREAD_ID=se.THREAD_ID "+
			"LEFT JOIN events_waits_history_long we ON we.NESTING_EVENT_ID=stge.EVENT_ID AND we.THREAD_ID=stge.THREAD_ID "+
			"WHERE se.TIMER_END > %d "+
			"ORDER BY se.TIMER_END DESC",
		Columns(TransactionEvent{}),
		Columns(StatementEvent{}),
		Columns(StageEvent{}),
		Columns(WaitEvent{}),
		cursor,
	)

	events := []Event{}
	if err := db.SelectContext(ctx, &events, query); err != nil {
		sp.RecordError(err)
		return nil, err
	}
	sp.AddEvent("scraped_events", oteltrace.WithAttributes(attribute.Int("num_events", len(events))))
	return events, nil
}
// enablePerfMonitoring turns on collection and timing in performance_schema for
// the wait, stage, statement and transaction instruments and their consumers,
// which the scraper reads from. It does this by updating the setup_instruments
// and setup_consumers tables. The first failing statement causes a panic.
//
// fixme we should probably make sure the user is cool with us doing this for them
func enablePerfMonitoring(ctx context.Context) {
	// fixme raise performance_schema_max_sql_text_length somehow
	// (presumably so the traceparent comment at the end of long statements is
	// not truncated away; confirm)
	// https://dev.mysql.com/doc/mysql-perfschema-excerpt/8.0/en/performance-schema-system-variables.html#sysvar_performance_schema_max_sql_text_length
	for _, stmt := range []string{
		"UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES' WHERE NAME LIKE 'wait/%'",
		"UPDATE performance_schema.setup_consumers SET ENABLED = 'YES' WHERE NAME LIKE 'events_waits%'",
		"UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES' WHERE NAME LIKE 'stage/%'",
		"UPDATE performance_schema.setup_consumers SET ENABLED = 'YES' WHERE NAME LIKE 'events_stages%'",
		"UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES' WHERE NAME LIKE 'statement/%'",
		"UPDATE performance_schema.setup_consumers SET ENABLED = 'YES' WHERE NAME LIKE '%statements%'",
		"UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES' WHERE NAME = 'transaction'",
		"UPDATE performance_schema.setup_consumers SET ENABLED = 'YES' WHERE NAME LIKE 'events_transactions%'",
	} {
		if _, err := db.ExecContext(ctx, stmt); err != nil {
			panic(err)
		}
	}
}