Skip to content

Commit

Permalink
refact: make custom config also defined within env blocks.
Browse files Browse the repository at this point in the history
  • Loading branch information
Colstuwjx committed Dec 13, 2019
1 parent af6da63 commit 1e1938d
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 92 deletions.
3 changes: 0 additions & 3 deletions assets/filebeat/filebeat.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
json.keys_under_root: true
{{end}}
fields:
{{range $key, $value := .CustomFields}}
{{ $key }}: {{ $value }}
{{end}}
{{range $key, $value := .Tags}}
{{ $key }}: {{ $value }}
{{end}}
Expand Down
3 changes: 2 additions & 1 deletion docs/filebeat/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ There are many labels you can use to describe the log info.
- `aliyun.logs.$name=$path`
- Name is an identify, can be any string you want. The valid characters in name are `0-9a-zA-Z_-`
- Path is the log file path, can contains wildcard. `stdout` is a special value which means stdout of the container.
- `aliyun.logs.$name.tags="k1=v1,k2=v2"`: tags will be appended to log.
- `aliyun.logs.$name.tags="k1=v1,k2=v2"`: tags will be appended to log.
- `aliyun.logs.$name.configs="k1=v1,k2=v2"`: customized configs for filebeat, it will be part of the filebeat configs which corresponding the container.
- `aliyun.logs.$name.target=target-for-log-storage`: target is used by the output plugins, instruct the plugins to store
logs in appropriate place. For elasticsearch output, target means the log index in elasticsearch. For aliyun_sls output,
target means the logstore in aliyun sls. The default value of target is the log name.
7 changes: 4 additions & 3 deletions pilot/filebeat_piloter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package pilot
import (
"encoding/json"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/elastic/go-ucfg"
"github.com/elastic/go-ucfg/yaml"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"time"

log "github.com/Sirupsen/logrus"
"github.com/elastic/go-ucfg"
"github.com/elastic/go-ucfg/yaml"
)

// Global variables for FilebeatPiloter
Expand Down
106 changes: 45 additions & 61 deletions pilot/pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,16 @@ const (
ENV_PILOT_CREATE_SYMLINK = "PILOT_CREATE_SYMLINK"
ENV_LOGGING_OUTPUT = "LOGGING_OUTPUT"

ENV_SERVICE_LOGS_TEMPL = "%s_logs_"
ENV_SERVICE_LOGS_CUSTOME_CONFIG_TEMPL = "%s_logs_custom_config"
LABEL_SERVICE_LOGS_TEMPL = "%s.logs."
LABEL_SERVICE_LOGS_CUSTOME_CONFIG_TEMPL = "%s.logs.custom.config"
LABEL_PROJECT_SWARM_MODE = "com.docker.stack.namespace"
LABEL_PROJECT = "com.docker.compose.project"
LABEL_SERVICE = "com.docker.compose.service"
LABEL_SERVICE_SWARM_MODE = "com.docker.swarm.service.name"
LABEL_K8S_POD_NAMESPACE = "io.kubernetes.pod.namespace"
LABEL_K8S_CONTAINER_NAME = "io.kubernetes.container.name"
LABEL_POD = "io.kubernetes.pod.name"
SYMLINK_LOGS_BASE = "/acs/log/"
ENV_SERVICE_LOGS_TEMPL = "%s_logs_"
LABEL_SERVICE_LOGS_TEMPL = "%s.logs."
LABEL_PROJECT_SWARM_MODE = "com.docker.stack.namespace"
LABEL_PROJECT = "com.docker.compose.project"
LABEL_SERVICE = "com.docker.compose.service"
LABEL_SERVICE_SWARM_MODE = "com.docker.swarm.service.name"
LABEL_K8S_POD_NAMESPACE = "io.kubernetes.pod.namespace"
LABEL_K8S_CONTAINER_NAME = "io.kubernetes.container.name"
LABEL_POD = "io.kubernetes.pod.name"
SYMLINK_LOGS_BASE = "/acs/log/"

ERR_ALREADY_STARTED = "already started"
)
Expand Down Expand Up @@ -183,7 +181,6 @@ type LogConfig struct {
EstimateTime bool
Stdout bool

CustomFields map[string]string
CustomConfigs map[string]string
}

Expand Down Expand Up @@ -357,13 +354,6 @@ func (p *Pilot) newContainer(containerJSON *types.ContainerJSON) error {

for _, e := range env {
for _, prefix := range p.logPrefix {
customConfig := fmt.Sprintf(ENV_SERVICE_LOGS_CUSTOME_CONFIG_TEMPL, prefix)
if strings.HasPrefix(e, customConfig) {
labels[customConfig] = e[len(customConfig)+1:]
log.Infof("Get customConfig key = %s, value = %s", customConfig, labels[customConfig])
continue
}

serviceLogs := fmt.Sprintf(ENV_SERVICE_LOGS_TEMPL, prefix)
if !strings.HasPrefix(e, serviceLogs) {
continue
Expand Down Expand Up @@ -496,13 +486,13 @@ func (p *Pilot) hostDirOf(path string, mounts map[string]types.MountPoint) strin
return ""
}

func (p *Pilot) parseTags(tags string) (map[string]string, error) {
tagMap := make(map[string]string)
if tags == "" {
return tagMap, nil
func (p *Pilot) parseBlocks(blocks string) (map[string]string, error) {
blockMap := make(map[string]string)
if blocks == "" {
return blockMap, nil
}

kvArray := strings.Split(tags, ",")
kvArray := strings.Split(blocks, ",")
for _, kv := range kvArray {
arr := strings.Split(kv, "=")
if len(arr) != 2 {
Expand All @@ -513,9 +503,9 @@ func (p *Pilot) parseTags(tags string) (map[string]string, error) {
if key == "" || value == "" {
return nil, fmt.Errorf("%s is not a valid k=v format", kv)
}
tagMap[key] = value
blockMap[key] = value
}
return tagMap, nil
return blockMap, nil
}

func (p *Pilot) tryCheckKafkaTopic(topic string) error {
Expand Down Expand Up @@ -551,11 +541,19 @@ func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath strin
}

tags := info.get("tags")
tagMap, err := p.parseTags(tags)
tagMap, err := p.parseBlocks(tags)
if err != nil {
return nil, fmt.Errorf("parse tags for %s error: %v", name, err)
}

customConfigs := info.get("configs")
customConfigMap, err := p.parseBlocks(customConfigs)
if err != nil {
return nil, fmt.Errorf("parse custom configs for %s error: %v", name, err)
}

log.Infof("got custom configs %v", customConfigMap)

target := info.get("target")
// add default index or topic
if _, ok := tagMap["index"]; !ok {
Expand Down Expand Up @@ -602,15 +600,16 @@ func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath strin
}

return &LogConfig{
Name: name,
HostDir: filepath.Join(p.baseDir, filepath.Dir(jsonLogPath)),
File: logFile,
Format: format.value,
Tags: tagMap,
FormatConfig: map[string]string{"time_format": "%Y-%m-%dT%H:%M:%S.%NZ"},
Target: target,
EstimateTime: false,
Stdout: true,
Name: name,
HostDir: filepath.Join(p.baseDir, filepath.Dir(jsonLogPath)),
File: logFile,
Format: format.value,
Tags: tagMap,
CustomConfigs: customConfigMap,
FormatConfig: map[string]string{"time_format": "%Y-%m-%dT%H:%M:%S.%NZ"},
Target: target,
EstimateTime: false,
Stdout: true,
}, nil
}

Expand All @@ -630,14 +629,15 @@ func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath strin
}

cfg := &LogConfig{
Name: name,
ContainerDir: containerDir,
Format: format.value,
File: file,
Tags: tagMap,
HostDir: filepath.Join(p.baseDir, hostDir),
FormatConfig: formatConfig,
Target: target,
Name: name,
ContainerDir: containerDir,
Format: format.value,
File: file,
Tags: tagMap,
CustomConfigs: customConfigMap,
HostDir: filepath.Join(p.baseDir, hostDir),
FormatConfig: formatConfig,
Target: target,
}

if formatConfig["time_key"] == "" {
Expand Down Expand Up @@ -699,25 +699,10 @@ func (p *Pilot) getLogConfigs(jsonLogPath string, mounts []types.MountPoint, lab
labelNames = append(labelNames, k)
}

customConfigs := make(map[string]string)

sort.Strings(labelNames)
root := newLogInfoNode("")
for _, k := range labelNames {
for _, prefix := range p.logPrefix {
customConfig := fmt.Sprintf(ENV_SERVICE_LOGS_CUSTOME_CONFIG_TEMPL, prefix)
if customConfig == k {
configs := strings.Split(labels[k], "\n")
for _, c := range configs {
if c == "" {
continue
}
customLabel := strings.SplitN(c, "=", 2)
customConfigs[customLabel[0]] = customLabel[1]
}
continue
}

serviceLogs := fmt.Sprintf(LABEL_SERVICE_LOGS_TEMPL, prefix)
if !strings.HasPrefix(k, serviceLogs) || strings.Count(k, ".") == 1 {
continue
Expand All @@ -735,7 +720,6 @@ func (p *Pilot) getLogConfigs(jsonLogPath string, mounts []types.MountPoint, lab
if err != nil {
return nil, err
}
CustomConfig(name, customConfigs, logConfig)
ret = append(ret, logConfig)
}
return ret, nil
Expand Down
8 changes: 6 additions & 2 deletions pilot/pilot_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package pilot

import (
"os"
"testing"

log "github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types"
"gopkg.in/check.v1"
"os"
"testing"
)

func Test(t *testing.T) {
Expand All @@ -32,6 +33,7 @@ func (p *PilotSuite) TestGetLogConfigs(c *check.C) {
"aliyun.logs.hello": "/var/log/hello.log",
"aliyun.logs.hello.format": "json",
"aliyun.logs.hello.tags": "name=hello,stage=test",
"aliyun.logs.hello.configs": "multiline.pattern='^\\[',multiline.negate=true,multiline.match=after",
"aliyun.logs.hello.format.time_format": "%Y-%m-%d",
}

Expand All @@ -52,13 +54,15 @@ func (p *PilotSuite) TestGetLogConfigs(c *check.C) {
c.Assert(configs[0].ContainerDir, check.Equals, "/var/log")
c.Assert(configs[0].File, check.Equals, "hello.log")
c.Assert(configs[0].Tags, check.HasLen, 4)
c.Assert(configs[0].CustomConfigs, check.HasLen, 3)
c.Assert(configs[0].FormatConfig, check.HasLen, 2)

//Test regex format
labels = map[string]string{
"aliyun.logs.hello": "/var/log/hello.log",
"aliyun.logs.hello.format": "regexp",
"aliyun.logs.hello.tags": "name=hello,stage=test",
"aliyun.logs.hello.configs": "multiline.pattern='^\\[',multiline.negate=true,multiline.match=after",
"aliyun.logs.hello.format.pattern": "(?=name:hello).*",
}
configs, err = pilot.getLogConfigs("/path/to/json.log", mounts, labels)
Expand Down
22 changes: 0 additions & 22 deletions pilot/piloter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pilot
import (
"fmt"
"os"
"strings"
)

// Global variables for piloter
Expand Down Expand Up @@ -39,24 +38,3 @@ func NewPiloter(baseDir string) (Piloter, error) {
}
return nil, fmt.Errorf("InvalidPilotType")
}

// CustomConfig custom config
func CustomConfig(name string, customConfigs map[string]string, logConfig *LogConfig) {
if os.Getenv(ENV_PILOT_TYPE) == PILOT_FILEBEAT {
fields := make(map[string]string)
configs := make(map[string]string)
for k, v := range customConfigs {
if strings.HasPrefix(k, name) {
key := strings.TrimPrefix(k, name+".")
if strings.HasPrefix(key, "fields") {
key2 := strings.TrimPrefix(key, "fields.")
fields[key2] = v
} else {
configs[key] = v
}
}
}
logConfig.CustomFields = fields
logConfig.CustomConfigs = configs
}
}

0 comments on commit 1e1938d

Please sign in to comment.