From ad43c4ebf088689be6c70c512b01da8d62dc72c7 Mon Sep 17 00:00:00 2001 From: Beni Budiharto Date: Thu, 14 Apr 2022 10:43:40 +0700 Subject: [PATCH] feat: add cri format parser for istio & kube parser --- parser/istioparser/istioparser.go | 60 ++++++++++++++++++-------- parser/istioparser/istioparser_test.go | 25 +++++++++++ parser/kubeparser/kubeparser.go | 34 ++++++++++++--- parser/kubeparser/kubeparser_test.go | 22 ++++++++++ parser/parser.go | 4 ++ 5 files changed, 121 insertions(+), 24 deletions(-) diff --git a/parser/istioparser/istioparser.go b/parser/istioparser/istioparser.go index 66cf160..58edaa6 100644 --- a/parser/istioparser/istioparser.go +++ b/parser/istioparser/istioparser.go @@ -10,18 +10,40 @@ import ( ) // IstioParser parse a JSON string. -type IstioParser struct{} +type IstioParser struct { + fnGetKubeLog func(string) string +} // NewIstioParser returns a new json parser. func NewIstioParser() *IstioParser { return &IstioParser{} } +func NewIstioCRIParser() *IstioParser { + return &IstioParser{ + fnGetKubeLog: func(s string) string { + // timestamp stdout [FP] actualLog + return strings.SplitN(s, " ", 4)[3] + }, + } +} + +// kubernetes wrap the actual log line, need to unwrap it to get the actual +func (p *IstioParser) getKubeLogLine(line string) string { + if p.fnGetKubeLog == nil { + p.fnGetKubeLog = func(s string) string { + line := gjson.Get(s, "log").String() + return strings.TrimSpace(line) + } + } + + return p.fnGetKubeLog(line) +} + // ParseString implements the Parser interface. // The value in the map is not necessarily a string, so it needs to be converted. -func (j *IstioParser) ParseString(line string) (map[string]string, error) { - actualLogLine := gjson.Get(line, "log").String() - actualLogLine = strings.TrimSpace(actualLogLine) +func (p *IstioParser) ParseString(line string) (map[string]string, error) { + actualLogLine := p.getKubeLogLine(line) var parsed map[string]interface{} err := json.Unmarshal([]byte(actualLogLine), &parsed) @@ -38,37 +60,37 @@ func (j *IstioParser) ParseString(line string) (map[string]string, error) { } } - requestTime, err := strconv.ParseFloat(j.GetValue(fields, "duration", ""), 64) + requestTime, err := strconv.ParseFloat(p.GetValue(fields, "duration", ""), 64) if err != nil { - return nil, fmt.Errorf("Failed to parse request_time, value: %+v", j.GetValue(fields, "duration", "")) + return nil, fmt.Errorf("Failed to parse request_time, value: %+v", p.GetValue(fields, "duration", "")) } - upstreamRequestTime, err := strconv.ParseFloat(j.GetValue(fields, "upstream_service_time", ""), 64) + upstreamRequestTime, err := strconv.ParseFloat(p.GetValue(fields, "upstream_service_time", ""), 64) if err != nil { upstreamRequestTime = 0 } result := map[string]string{ - "body_bytes_sent": j.GetValue(fields, "bytes_sent", ""), + "body_bytes_sent": p.GetValue(fields, "bytes_sent", ""), "request": fmt.Sprintf( "%s %s %s", - j.GetValue(fields, "method", ""), - j.GetValue(fields, "path", ""), - j.GetValue(fields, "protocol", ""), + p.GetValue(fields, "method", ""), + p.GetValue(fields, "path", ""), + p.GetValue(fields, "protocol", ""), ), - "request_length": j.GetValue(fields, "bytes_received", ""), - "request_method": j.GetValue(fields, "method", ""), + "request_length": p.GetValue(fields, "bytes_received", ""), + "request_method": p.GetValue(fields, "method", ""), "request_time": strconv.FormatFloat(requestTime/1000, 'f', 3, 64), // divide by 1000, convert from ms to s "upstream_response_time": strconv.FormatFloat(upstreamRequestTime/1000, 'f', 3, 64), // divide by 1000, convert from ms to s - "status": j.GetValue(fields, "response_code", ""), - "time_local": j.GetValue(fields, "start_time", ""), - "upstream_cluster": j.GetUpstreamCluster(fields), - "authority": j.GetValue(fields, "authority", ""), + "status": p.GetValue(fields, "response_code", ""), + "time_local": p.GetValue(fields, "start_time", ""), + "upstream_cluster": p.GetUpstreamCluster(fields), + "authority": p.GetValue(fields, "authority", ""), } return result, nil } -func (j *IstioParser) GetUpstreamCluster(fields map[string]string) string { +func (p *IstioParser) GetUpstreamCluster(fields map[string]string) string { if v, ok := fields["upstream_cluster"]; ok { s := strings.Split(v, "|") if len(s) > 0 { @@ -81,7 +103,7 @@ func (j *IstioParser) GetUpstreamCluster(fields map[string]string) string { } } -func (j *IstioParser) GetValue(fields map[string]string, key, defaultValue string) string { +func (p *IstioParser) GetValue(fields map[string]string, key, defaultValue string) string { if v, ok := fields[key]; ok { return v } else { diff --git a/parser/istioparser/istioparser_test.go b/parser/istioparser/istioparser_test.go index 70fed25..9ed5f31 100644 --- a/parser/istioparser/istioparser_test.go +++ b/parser/istioparser/istioparser_test.go @@ -35,6 +35,31 @@ func TestIstioParse(t *testing.T) { } } +func TestIstioCriParse(t *testing.T) { + parser := NewIstioCRIParser() + + line := `2022-04-14T02:37:40.432312202Z stdout F {"bytes_sent":"37","upstream_cluster":"outbound|80||payment-service","downstream_remote_address":"127.0.0.1:40421","authority":"payment.example.com","path":"/v1/order/2145","protocol":"HTTP/1.1","upstream_service_time":"26","upstream_local_address":"127.0.0.1:53232","duration":"27","upstream_transport_failure_reason":"-","route_name":"-","downstream_local_address":"127.0.0.1:8443","user_agent":"my-user-agent","response_code":"203","response_flags":"-","start_time":"2021-02-03T11:22:33.033Z","method":"PUT","request_id":"046CD4F5-7E0F-4EB1-B0C7-7BF404B08F10","upstream_host":"127.0.0.1:80","x_forwarded_for":"127.0.0.1","requested_server_name":"payment.example.com","bytes_received":"123","istio_policy_status":"-"}` + + got, err := parser.ParseString(line) + require.NoError(t, err) + + want := map[string]string{ + "time_local": "2021-02-03T11:22:33.033Z", + "request_time": "0.027", + "request_length": "123", + "upstream_response_time": "0.026", + "status": "203", + "body_bytes_sent": "37", + "request": "PUT /v1/order/2145 HTTP/1.1", + "request_method": "PUT", + "upstream_cluster": "payment-service", + "authority": "payment.example.com", + } + if !reflect.DeepEqual(got, want) { + t.Errorf("IstioParser.Parse(), got:\n%v\nwant\n%v", got, want) + } +} + func BenchmarkParseIstio(b *testing.B) { parser := NewIstioParser() jsonLine := `{"bytes_sent":"37","upstream_cluster":"outbound|80||payment-service","downstream_remote_address":"127.0.0.1:40421","authority":"payment.example.com","path":"/v1/order/2145","protocol":"HTTP/1.1","upstream_service_time":"26","upstream_local_address":"127.0.0.1:53232","duration":"27","upstream_transport_failure_reason":"-","route_name":"-","downstream_local_address":"127.0.0.1:8443","user_agent":"my-user-agent","response_code":"203","response_flags":"-","start_time":"2021-02-03T11:22:33.033Z","method":"PUT","request_id":"046CD4F5-7E0F-4EB1-B0C7-7BF404B08F10","upstream_host":"127.0.0.1:80","x_forwarded_for":"127.0.0.1","requested_server_name":"payment.example.com","bytes_received":"123","istio_policy_status":"-"}` diff --git a/parser/kubeparser/kubeparser.go b/parser/kubeparser/kubeparser.go index 8b3922c..efd633d 100644 --- a/parser/kubeparser/kubeparser.go +++ b/parser/kubeparser/kubeparser.go @@ -10,7 +10,8 @@ import ( // KubeParser parses variables patterns using config.NamespaceConfig.Format. type KubeParser struct { - parser *gonx.Parser + parser *gonx.Parser + fnGetKubeLog func(string) string } // NewKubeParser returns a new text parser. @@ -20,12 +21,35 @@ func NewKubeParser(format string) *KubeParser { } } +func NewKubeCRIParser(format string) *KubeParser { + return &KubeParser{ + parser: gonx.NewParser(format), + fnGetKubeLog: func(s string) string { + // timestamp stdout [FP] actualLog + line := strings.SplitN(s, " ", 4)[3] + line = gjson.Get(line, "log").String() + return strings.TrimSpace(line) + }, + } +} + +// kubernetes wrap the actual log line, need to unwrap it to get the actual +func (p *KubeParser) getKubeLogLine(line string) string { + if p.fnGetKubeLog == nil { + p.fnGetKubeLog = func(s string) string { + line := gjson.Get(s, "log").String() + return strings.TrimSpace(line) + } + } + + return p.fnGetKubeLog(line) +} + // ParseString implements the Parser interface. -func (t *KubeParser) ParseString(line string) (map[string]string, error) { - actualLogLine := gjson.Get(line, "log").String() - actualLogLine = strings.TrimSpace(actualLogLine) +func (p *KubeParser) ParseString(line string) (map[string]string, error) { + actualLogLine := p.getKubeLogLine(line) - entry, err := t.parser.ParseString(actualLogLine) + entry, err := p.parser.ParseString(actualLogLine) if err != nil { return nil, fmt.Errorf("text log parsing err: %w", err) } diff --git a/parser/kubeparser/kubeparser_test.go b/parser/kubeparser/kubeparser_test.go index d6341ba..8c51f88 100644 --- a/parser/kubeparser/kubeparser_test.go +++ b/parser/kubeparser/kubeparser_test.go @@ -30,6 +30,28 @@ func TestKubeParse(t *testing.T) { } } +func TestKubeCRIParse(t *testing.T) { + parser := NewKubeCRIParser(`[$time_local] $request_method "$request" $request_length $body_bytes_sent $status $request_time $upstream_response_time`) + line := `2022-04-14T02:37:40.432312202Z stdout F {"log": "[03/Feb/2021:11:22:33 +0800] GET \"GET /order/2145 HTTP/1.1\" 123 518 200 0.544 0.543\n", "stream":"stdout","time":"2021-07-21T07:26:39.102952491Z"}` + + got, err := parser.ParseString(line) + require.NoError(t, err) + + want := map[string]string{ + "time_local": "03/Feb/2021:11:22:33 +0800", + "request_time": "0.544", + "request_length": "123", + "upstream_response_time": "0.543", + "status": "200", + "body_bytes_sent": "518", + "request": "GET /order/2145 HTTP/1.1", + "request_method": "GET", + } + if !reflect.DeepEqual(got, want) { + t.Errorf("KubeParser.Parse() = \n%v\n%v", got, want) + } +} + func BenchmarkParseKube(b *testing.B) { parser := NewKubeParser(`[$time_local] $request_method "$request" $request_length $body_bytes_sent $status $request_time $upstream_response_time`) line := `{"log": "[03/Feb/2021:11:22:33 +0800] GET \"GET /order/2145 HTTP/1.1\" 123 518 200 0.544 0.543", "stream":"stdout","time":"2021-07-21T07:26:39.102952491Z"}` diff --git a/parser/parser.go b/parser/parser.go index 35e2a9f..f0efa7b 100644 --- a/parser/parser.go +++ b/parser/parser.go @@ -25,8 +25,12 @@ func NewParser(nsCfg config.NamespaceConfig) Parser { return jsonparser.NewJsonParser() case "kube": return kubeparser.NewKubeParser(nsCfg.Format) + case "kube-cri": + return kubeparser.NewKubeCRIParser(nsCfg.Format) case "istio": return istioparser.NewIstioParser() + case "istio-cri": + return istioparser.NewIstioCRIParser() default: return textparser.NewTextParser(nsCfg.Format) }