Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refact: make custom config also defined within env blocks. #264

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions assets/filebeat/filebeat.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
json.keys_under_root: true
{{end}}
fields:
{{range $key, $value := .CustomFields}}
{{ $key }}: {{ $value }}
{{end}}
{{range $key, $value := .Tags}}
{{ $key }}: {{ $value }}
{{end}}
Expand Down
3 changes: 2 additions & 1 deletion docs/filebeat/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ There are many labels you can use to describe the log info.
- `aliyun.logs.$name=$path`
- Name is an identify, can be any string you want. The valid characters in name are `0-9a-zA-Z_-`
- Path is the log file path, can contains wildcard. `stdout` is a special value which means stdout of the container.
- `aliyun.logs.$name.tags="k1=v1,k2=v2"`: tags will be appended to log.
- `aliyun.logs.$name.tags="k1=v1,k2=v2"`: tags will be appended to log.
- `aliyun.logs.$name.configs="k1=v1,k2=v2"`: customized configs for filebeat, it will be part of the filebeat configs which corresponding the container.
- `aliyun.logs.$name.target=target-for-log-storage`: target is used by the output plugins, instruct the plugins to store
logs in appropriate place. For elasticsearch output, target means the log index in elasticsearch. For aliyun_sls output,
target means the logstore in aliyun sls. The default value of target is the log name.
7 changes: 4 additions & 3 deletions pilot/filebeat_piloter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package pilot
import (
"encoding/json"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/elastic/go-ucfg"
"github.com/elastic/go-ucfg/yaml"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"time"

log "github.com/Sirupsen/logrus"
"github.com/elastic/go-ucfg"
"github.com/elastic/go-ucfg/yaml"
)

// Global variables for FilebeatPiloter
Expand Down
106 changes: 45 additions & 61 deletions pilot/pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,16 @@ const (
ENV_PILOT_CREATE_SYMLINK = "PILOT_CREATE_SYMLINK"
ENV_LOGGING_OUTPUT = "LOGGING_OUTPUT"

ENV_SERVICE_LOGS_TEMPL = "%s_logs_"
ENV_SERVICE_LOGS_CUSTOME_CONFIG_TEMPL = "%s_logs_custom_config"
LABEL_SERVICE_LOGS_TEMPL = "%s.logs."
LABEL_SERVICE_LOGS_CUSTOME_CONFIG_TEMPL = "%s.logs.custom.config"
LABEL_PROJECT_SWARM_MODE = "com.docker.stack.namespace"
LABEL_PROJECT = "com.docker.compose.project"
LABEL_SERVICE = "com.docker.compose.service"
LABEL_SERVICE_SWARM_MODE = "com.docker.swarm.service.name"
LABEL_K8S_POD_NAMESPACE = "io.kubernetes.pod.namespace"
LABEL_K8S_CONTAINER_NAME = "io.kubernetes.container.name"
LABEL_POD = "io.kubernetes.pod.name"
SYMLINK_LOGS_BASE = "/acs/log/"
ENV_SERVICE_LOGS_TEMPL = "%s_logs_"
LABEL_SERVICE_LOGS_TEMPL = "%s.logs."
LABEL_PROJECT_SWARM_MODE = "com.docker.stack.namespace"
LABEL_PROJECT = "com.docker.compose.project"
LABEL_SERVICE = "com.docker.compose.service"
LABEL_SERVICE_SWARM_MODE = "com.docker.swarm.service.name"
LABEL_K8S_POD_NAMESPACE = "io.kubernetes.pod.namespace"
LABEL_K8S_CONTAINER_NAME = "io.kubernetes.container.name"
LABEL_POD = "io.kubernetes.pod.name"
SYMLINK_LOGS_BASE = "/acs/log/"

ERR_ALREADY_STARTED = "already started"
)
Expand Down Expand Up @@ -183,7 +181,6 @@ type LogConfig struct {
EstimateTime bool
Stdout bool

CustomFields map[string]string
CustomConfigs map[string]string
}

Expand Down Expand Up @@ -357,13 +354,6 @@ func (p *Pilot) newContainer(containerJSON *types.ContainerJSON) error {

for _, e := range env {
for _, prefix := range p.logPrefix {
customConfig := fmt.Sprintf(ENV_SERVICE_LOGS_CUSTOME_CONFIG_TEMPL, prefix)
if strings.HasPrefix(e, customConfig) {
labels[customConfig] = e[len(customConfig)+1:]
log.Infof("Get customConfig key = %s, value = %s", customConfig, labels[customConfig])
continue
}

serviceLogs := fmt.Sprintf(ENV_SERVICE_LOGS_TEMPL, prefix)
if !strings.HasPrefix(e, serviceLogs) {
continue
Expand Down Expand Up @@ -496,13 +486,13 @@ func (p *Pilot) hostDirOf(path string, mounts map[string]types.MountPoint) strin
return ""
}

func (p *Pilot) parseTags(tags string) (map[string]string, error) {
tagMap := make(map[string]string)
if tags == "" {
return tagMap, nil
func (p *Pilot) parseBlocks(blocks string) (map[string]string, error) {
blockMap := make(map[string]string)
if blocks == "" {
return blockMap, nil
}

kvArray := strings.Split(tags, ",")
kvArray := strings.Split(blocks, ",")
for _, kv := range kvArray {
arr := strings.Split(kv, "=")
if len(arr) != 2 {
Expand All @@ -513,9 +503,9 @@ func (p *Pilot) parseTags(tags string) (map[string]string, error) {
if key == "" || value == "" {
return nil, fmt.Errorf("%s is not a valid k=v format", kv)
}
tagMap[key] = value
blockMap[key] = value
}
return tagMap, nil
return blockMap, nil
}

func (p *Pilot) tryCheckKafkaTopic(topic string) error {
Expand Down Expand Up @@ -551,11 +541,19 @@ func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath strin
}

tags := info.get("tags")
tagMap, err := p.parseTags(tags)
tagMap, err := p.parseBlocks(tags)
if err != nil {
return nil, fmt.Errorf("parse tags for %s error: %v", name, err)
}

customConfigs := info.get("configs")
customConfigMap, err := p.parseBlocks(customConfigs)
if err != nil {
return nil, fmt.Errorf("parse custom configs for %s error: %v", name, err)
}

log.Infof("got custom configs %v", customConfigMap)

target := info.get("target")
// add default index or topic
if _, ok := tagMap["index"]; !ok {
Expand Down Expand Up @@ -602,15 +600,16 @@ func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath strin
}

return &LogConfig{
Name: name,
HostDir: filepath.Join(p.baseDir, filepath.Dir(jsonLogPath)),
File: logFile,
Format: format.value,
Tags: tagMap,
FormatConfig: map[string]string{"time_format": "%Y-%m-%dT%H:%M:%S.%NZ"},
Target: target,
EstimateTime: false,
Stdout: true,
Name: name,
HostDir: filepath.Join(p.baseDir, filepath.Dir(jsonLogPath)),
File: logFile,
Format: format.value,
Tags: tagMap,
CustomConfigs: customConfigMap,
FormatConfig: map[string]string{"time_format": "%Y-%m-%dT%H:%M:%S.%NZ"},
Target: target,
EstimateTime: false,
Stdout: true,
}, nil
}

Expand All @@ -630,14 +629,15 @@ func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath strin
}

cfg := &LogConfig{
Name: name,
ContainerDir: containerDir,
Format: format.value,
File: file,
Tags: tagMap,
HostDir: filepath.Join(p.baseDir, hostDir),
FormatConfig: formatConfig,
Target: target,
Name: name,
ContainerDir: containerDir,
Format: format.value,
File: file,
Tags: tagMap,
CustomConfigs: customConfigMap,
HostDir: filepath.Join(p.baseDir, hostDir),
FormatConfig: formatConfig,
Target: target,
}

if formatConfig["time_key"] == "" {
Expand Down Expand Up @@ -699,25 +699,10 @@ func (p *Pilot) getLogConfigs(jsonLogPath string, mounts []types.MountPoint, lab
labelNames = append(labelNames, k)
}

customConfigs := make(map[string]string)

sort.Strings(labelNames)
root := newLogInfoNode("")
for _, k := range labelNames {
for _, prefix := range p.logPrefix {
customConfig := fmt.Sprintf(ENV_SERVICE_LOGS_CUSTOME_CONFIG_TEMPL, prefix)
if customConfig == k {
configs := strings.Split(labels[k], "\n")
for _, c := range configs {
if c == "" {
continue
}
customLabel := strings.SplitN(c, "=", 2)
customConfigs[customLabel[0]] = customLabel[1]
}
continue
}

serviceLogs := fmt.Sprintf(LABEL_SERVICE_LOGS_TEMPL, prefix)
if !strings.HasPrefix(k, serviceLogs) || strings.Count(k, ".") == 1 {
continue
Expand All @@ -735,7 +720,6 @@ func (p *Pilot) getLogConfigs(jsonLogPath string, mounts []types.MountPoint, lab
if err != nil {
return nil, err
}
CustomConfig(name, customConfigs, logConfig)
ret = append(ret, logConfig)
}
return ret, nil
Expand Down
8 changes: 6 additions & 2 deletions pilot/pilot_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package pilot

import (
"os"
"testing"

log "github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types"
"gopkg.in/check.v1"
"os"
"testing"
)

func Test(t *testing.T) {
Expand All @@ -32,6 +33,7 @@ func (p *PilotSuite) TestGetLogConfigs(c *check.C) {
"aliyun.logs.hello": "/var/log/hello.log",
"aliyun.logs.hello.format": "json",
"aliyun.logs.hello.tags": "name=hello,stage=test",
"aliyun.logs.hello.configs": "multiline.pattern='^\\[',multiline.negate=true,multiline.match=after",
"aliyun.logs.hello.format.time_format": "%Y-%m-%d",
}

Expand All @@ -52,13 +54,15 @@ func (p *PilotSuite) TestGetLogConfigs(c *check.C) {
c.Assert(configs[0].ContainerDir, check.Equals, "/var/log")
c.Assert(configs[0].File, check.Equals, "hello.log")
c.Assert(configs[0].Tags, check.HasLen, 4)
c.Assert(configs[0].CustomConfigs, check.HasLen, 3)
c.Assert(configs[0].FormatConfig, check.HasLen, 2)

//Test regex format
labels = map[string]string{
"aliyun.logs.hello": "/var/log/hello.log",
"aliyun.logs.hello.format": "regexp",
"aliyun.logs.hello.tags": "name=hello,stage=test",
"aliyun.logs.hello.configs": "multiline.pattern='^\\[',multiline.negate=true,multiline.match=after",
"aliyun.logs.hello.format.pattern": "(?=name:hello).*",
}
configs, err = pilot.getLogConfigs("/path/to/json.log", mounts, labels)
Expand Down
22 changes: 0 additions & 22 deletions pilot/piloter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pilot
import (
"fmt"
"os"
"strings"
)

// Global variables for piloter
Expand Down Expand Up @@ -39,24 +38,3 @@ func NewPiloter(baseDir string) (Piloter, error) {
}
return nil, fmt.Errorf("InvalidPilotType")
}

// CustomConfig custom config
func CustomConfig(name string, customConfigs map[string]string, logConfig *LogConfig) {
if os.Getenv(ENV_PILOT_TYPE) == PILOT_FILEBEAT {
fields := make(map[string]string)
configs := make(map[string]string)
for k, v := range customConfigs {
if strings.HasPrefix(k, name) {
key := strings.TrimPrefix(k, name+".")
if strings.HasPrefix(key, "fields") {
key2 := strings.TrimPrefix(key, "fields.")
fields[key2] = v
} else {
configs[key] = v
}
}
}
logConfig.CustomFields = fields
logConfig.CustomConfigs = configs
}
}