Skip to content

Commit

Permalink
feat: add cri format parser for istio & kube parser
Browse files Browse the repository at this point in the history
  • Loading branch information
bentol committed Apr 14, 2022
1 parent 7ed9120 commit ad43c4e
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 24 deletions.
60 changes: 41 additions & 19 deletions parser/istioparser/istioparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,40 @@ import (
)

// IstioParser parse a JSON string.
type IstioParser struct{}
type IstioParser struct {
fnGetKubeLog func(string) string
}

// NewIstioParser returns a new json parser.
func NewIstioParser() *IstioParser {
return &IstioParser{}
}

func NewIstioCRIParser() *IstioParser {
return &IstioParser{
fnGetKubeLog: func(s string) string {
// timestamp stdout [FP] actualLog
return strings.SplitN(s, " ", 4)[3]
},
}
}

// kubernetes wrap the actual log line, need to unwrap it to get the actual
func (p *IstioParser) getKubeLogLine(line string) string {
if p.fnGetKubeLog == nil {
p.fnGetKubeLog = func(s string) string {
line := gjson.Get(s, "log").String()
return strings.TrimSpace(line)
}
}

return p.fnGetKubeLog(line)
}

// ParseString implements the Parser interface.
// The value in the map is not necessarily a string, so it needs to be converted.
func (j *IstioParser) ParseString(line string) (map[string]string, error) {
actualLogLine := gjson.Get(line, "log").String()
actualLogLine = strings.TrimSpace(actualLogLine)
func (p *IstioParser) ParseString(line string) (map[string]string, error) {
actualLogLine := p.getKubeLogLine(line)

var parsed map[string]interface{}
err := json.Unmarshal([]byte(actualLogLine), &parsed)
Expand All @@ -38,37 +60,37 @@ func (j *IstioParser) ParseString(line string) (map[string]string, error) {
}
}

requestTime, err := strconv.ParseFloat(j.GetValue(fields, "duration", ""), 64)
requestTime, err := strconv.ParseFloat(p.GetValue(fields, "duration", ""), 64)
if err != nil {
return nil, fmt.Errorf("Failed to parse request_time, value: %+v", j.GetValue(fields, "duration", ""))
return nil, fmt.Errorf("Failed to parse request_time, value: %+v", p.GetValue(fields, "duration", ""))
}

upstreamRequestTime, err := strconv.ParseFloat(j.GetValue(fields, "upstream_service_time", ""), 64)
upstreamRequestTime, err := strconv.ParseFloat(p.GetValue(fields, "upstream_service_time", ""), 64)
if err != nil {
upstreamRequestTime = 0
}

result := map[string]string{
"body_bytes_sent": j.GetValue(fields, "bytes_sent", ""),
"body_bytes_sent": p.GetValue(fields, "bytes_sent", ""),
"request": fmt.Sprintf(
"%s %s %s",
j.GetValue(fields, "method", ""),
j.GetValue(fields, "path", ""),
j.GetValue(fields, "protocol", ""),
p.GetValue(fields, "method", ""),
p.GetValue(fields, "path", ""),
p.GetValue(fields, "protocol", ""),
),
"request_length": j.GetValue(fields, "bytes_received", ""),
"request_method": j.GetValue(fields, "method", ""),
"request_length": p.GetValue(fields, "bytes_received", ""),
"request_method": p.GetValue(fields, "method", ""),
"request_time": strconv.FormatFloat(requestTime/1000, 'f', 3, 64), // divide by 1000, convert from ms to s
"upstream_response_time": strconv.FormatFloat(upstreamRequestTime/1000, 'f', 3, 64), // divide by 1000, convert from ms to s
"status": j.GetValue(fields, "response_code", ""),
"time_local": j.GetValue(fields, "start_time", ""),
"upstream_cluster": j.GetUpstreamCluster(fields),
"authority": j.GetValue(fields, "authority", ""),
"status": p.GetValue(fields, "response_code", ""),
"time_local": p.GetValue(fields, "start_time", ""),
"upstream_cluster": p.GetUpstreamCluster(fields),
"authority": p.GetValue(fields, "authority", ""),
}
return result, nil
}

func (j *IstioParser) GetUpstreamCluster(fields map[string]string) string {
func (p *IstioParser) GetUpstreamCluster(fields map[string]string) string {
if v, ok := fields["upstream_cluster"]; ok {
s := strings.Split(v, "|")
if len(s) > 0 {
Expand All @@ -81,7 +103,7 @@ func (j *IstioParser) GetUpstreamCluster(fields map[string]string) string {
}
}

func (j *IstioParser) GetValue(fields map[string]string, key, defaultValue string) string {
func (p *IstioParser) GetValue(fields map[string]string, key, defaultValue string) string {
if v, ok := fields[key]; ok {
return v
} else {
Expand Down
25 changes: 25 additions & 0 deletions parser/istioparser/istioparser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,31 @@ func TestIstioParse(t *testing.T) {
}
}

func TestIstioCriParse(t *testing.T) {
parser := NewIstioCRIParser()

line := `2022-04-14T02:37:40.432312202Z stdout F {"bytes_sent":"37","upstream_cluster":"outbound|80||payment-service","downstream_remote_address":"127.0.0.1:40421","authority":"payment.example.com","path":"/v1/order/2145","protocol":"HTTP/1.1","upstream_service_time":"26","upstream_local_address":"127.0.0.1:53232","duration":"27","upstream_transport_failure_reason":"-","route_name":"-","downstream_local_address":"127.0.0.1:8443","user_agent":"my-user-agent","response_code":"203","response_flags":"-","start_time":"2021-02-03T11:22:33.033Z","method":"PUT","request_id":"046CD4F5-7E0F-4EB1-B0C7-7BF404B08F10","upstream_host":"127.0.0.1:80","x_forwarded_for":"127.0.0.1","requested_server_name":"payment.example.com","bytes_received":"123","istio_policy_status":"-"}`

got, err := parser.ParseString(line)
require.NoError(t, err)

want := map[string]string{
"time_local": "2021-02-03T11:22:33.033Z",
"request_time": "0.027",
"request_length": "123",
"upstream_response_time": "0.026",
"status": "203",
"body_bytes_sent": "37",
"request": "PUT /v1/order/2145 HTTP/1.1",
"request_method": "PUT",
"upstream_cluster": "payment-service",
"authority": "payment.example.com",
}
if !reflect.DeepEqual(got, want) {
t.Errorf("IstioParser.Parse(), got:\n%v\nwant\n%v", got, want)
}
}

func BenchmarkParseIstio(b *testing.B) {
parser := NewIstioParser()
jsonLine := `{"bytes_sent":"37","upstream_cluster":"outbound|80||payment-service","downstream_remote_address":"127.0.0.1:40421","authority":"payment.example.com","path":"/v1/order/2145","protocol":"HTTP/1.1","upstream_service_time":"26","upstream_local_address":"127.0.0.1:53232","duration":"27","upstream_transport_failure_reason":"-","route_name":"-","downstream_local_address":"127.0.0.1:8443","user_agent":"my-user-agent","response_code":"203","response_flags":"-","start_time":"2021-02-03T11:22:33.033Z","method":"PUT","request_id":"046CD4F5-7E0F-4EB1-B0C7-7BF404B08F10","upstream_host":"127.0.0.1:80","x_forwarded_for":"127.0.0.1","requested_server_name":"payment.example.com","bytes_received":"123","istio_policy_status":"-"}`
Expand Down
34 changes: 29 additions & 5 deletions parser/kubeparser/kubeparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (

// KubeParser parses variables patterns using config.NamespaceConfig.Format.
type KubeParser struct {
parser *gonx.Parser
parser *gonx.Parser
fnGetKubeLog func(string) string
}

// NewKubeParser returns a new text parser.
Expand All @@ -20,12 +21,35 @@ func NewKubeParser(format string) *KubeParser {
}
}

func NewKubeCRIParser(format string) *KubeParser {
return &KubeParser{
parser: gonx.NewParser(format),
fnGetKubeLog: func(s string) string {
// timestamp stdout [FP] actualLog
line := strings.SplitN(s, " ", 4)[3]
line = gjson.Get(line, "log").String()
return strings.TrimSpace(line)
},
}
}

// kubernetes wrap the actual log line, need to unwrap it to get the actual
func (p *KubeParser) getKubeLogLine(line string) string {
if p.fnGetKubeLog == nil {
p.fnGetKubeLog = func(s string) string {
line := gjson.Get(s, "log").String()
return strings.TrimSpace(line)
}
}

return p.fnGetKubeLog(line)
}

// ParseString implements the Parser interface.
func (t *KubeParser) ParseString(line string) (map[string]string, error) {
actualLogLine := gjson.Get(line, "log").String()
actualLogLine = strings.TrimSpace(actualLogLine)
func (p *KubeParser) ParseString(line string) (map[string]string, error) {
actualLogLine := p.getKubeLogLine(line)

entry, err := t.parser.ParseString(actualLogLine)
entry, err := p.parser.ParseString(actualLogLine)
if err != nil {
return nil, fmt.Errorf("text log parsing err: %w", err)
}
Expand Down
22 changes: 22 additions & 0 deletions parser/kubeparser/kubeparser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,28 @@ func TestKubeParse(t *testing.T) {
}
}

func TestKubeCRIParse(t *testing.T) {
parser := NewKubeCRIParser(`[$time_local] $request_method "$request" $request_length $body_bytes_sent $status $request_time $upstream_response_time`)
line := `2022-04-14T02:37:40.432312202Z stdout F {"log": "[03/Feb/2021:11:22:33 +0800] GET \"GET /order/2145 HTTP/1.1\" 123 518 200 0.544 0.543\n", "stream":"stdout","time":"2021-07-21T07:26:39.102952491Z"}`

got, err := parser.ParseString(line)
require.NoError(t, err)

want := map[string]string{
"time_local": "03/Feb/2021:11:22:33 +0800",
"request_time": "0.544",
"request_length": "123",
"upstream_response_time": "0.543",
"status": "200",
"body_bytes_sent": "518",
"request": "GET /order/2145 HTTP/1.1",
"request_method": "GET",
}
if !reflect.DeepEqual(got, want) {
t.Errorf("KubeParser.Parse() = \n%v\n%v", got, want)
}
}

func BenchmarkParseKube(b *testing.B) {
parser := NewKubeParser(`[$time_local] $request_method "$request" $request_length $body_bytes_sent $status $request_time $upstream_response_time`)
line := `{"log": "[03/Feb/2021:11:22:33 +0800] GET \"GET /order/2145 HTTP/1.1\" 123 518 200 0.544 0.543", "stream":"stdout","time":"2021-07-21T07:26:39.102952491Z"}`
Expand Down
4 changes: 4 additions & 0 deletions parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ func NewParser(nsCfg config.NamespaceConfig) Parser {
return jsonparser.NewJsonParser()
case "kube":
return kubeparser.NewKubeParser(nsCfg.Format)
case "kube-cri":
return kubeparser.NewKubeCRIParser(nsCfg.Format)
case "istio":
return istioparser.NewIstioParser()
case "istio-cri":
return istioparser.NewIstioCRIParser()
default:
return textparser.NewTextParser(nsCfg.Format)
}
Expand Down

0 comments on commit ad43c4e

Please sign in to comment.