From b7241db0dd7171786576044a32240f30aa976d56 Mon Sep 17 00:00:00 2001 From: sunzhiwei Date: Sat, 29 Dec 2018 12:10:55 +0800 Subject: [PATCH 1/2] add flume plugin --- Dockerfile.flume | 48 ++++++++ assets/entrypoint | 8 +- assets/flume/flume.tpl | 20 ++++ build-image.sh | 3 + pilot/flume_piloter.go | 253 +++++++++++++++++++++++++++++++++++++++++ pilot/pilot.go | 4 +- pilot/piloter.go | 4 + 7 files changed, 336 insertions(+), 4 deletions(-) create mode 100644 Dockerfile.flume create mode 100644 assets/flume/flume.tpl create mode 100644 pilot/flume_piloter.go diff --git a/Dockerfile.flume b/Dockerfile.flume new file mode 100644 index 00000000..57edfdc9 --- /dev/null +++ b/Dockerfile.flume @@ -0,0 +1,48 @@ +FROM golang:1.9-alpine3.6 as builder + +ENV PILOT_DIR /go/src/github.com/AliyunContainerService/log-pilot +ARG GOOS=linux +ARG GOARCH=amd64 +RUN set -ex && apk add --no-cache make git +WORKDIR $PILOT_DIR +COPY . $PILOT_DIR +RUN go install + +FROM alpine:3.6 +COPY assets/glibc/glibc-2.26-r0.apk /tmp/ + +RUN apk update && \ + apk add tzdata && \ + ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \ + echo "Asia/Shanghai" > /etc/timezone && \ + apk add python && \ + apk add ca-certificates wget && \ + update-ca-certificates && \ + wget wget --no-cookie --header "Cookie: oraclelicense=accept-securebackup-cookie" https://download.oracle.com/otn-pub/java/jdk/8u191-b12/2787e4a523244c269598db4e85c51e0c/jdk-8u191-linux-x64.tar.gz && \ + tar -xvzf jdk-8u191-linux-x64.tar.gz -C /usr/local/ && ln -s /usr/local/jdk/jdk1.8.0_191 /usr/local/java && \ + rm -f jdk-8u191-linux-x64.tar.gz && \ + apk add curl openssl && \ + apk add --allow-untrusted /tmp/glibc-2.26-r0.apk && \ + rm -rf /tmp/glibc-2.26-r0.apk && \ + apk add --no-cache bash bash-doc bash-completion && \ + rm -rf /var/cache/apk/* + +ENV JAVA_HOME="/usr/local/java" \ + JRE_HOME="/usr/local/java/jre" \ + CLASSPATH=".:/usr/local/java/lib:/opt/soft/java/jre/lib:" \ + PATH="/usr/local/java/bin:/usr/local/java/jre/bin:$PATH" + +COPY --from=builder /go/bin/log-pilot /pilot/pilot +COPY assets/entrypoint assets/flume/ assets/healthz /pilot/ +RUN cd /pilot && \ + wget http://mirrors.hust.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz && \ + tar -zvxf apache-flume-1.8.0-bin.tar.gz && ln -s apache-flume-1.8.0-bin flume && mkdir -p /pilot/flume/conf/tmp && \ + chmod +x /pilot/pilot /pilot/entrypoint /pilot/healthz + +HEALTHCHECK CMD /pilot/healthz + +VOLUME /opt/log +VOLUME /flume/log_meta +WORKDIR /pilot/ +ENV PILOT_TYPE=flume +ENTRYPOINT ["/pilot/entrypoint"] diff --git a/assets/entrypoint b/assets/entrypoint index f0e306ba..2f666790 100755 --- a/assets/entrypoint +++ b/assets/entrypoint @@ -37,8 +37,10 @@ def run(): pilot_type = os.environ.get(ENV_PILOT_TYPE) if pilot_filebeat == pilot_type: tpl_config = "/pilot/filebeat.tpl" - else: + elif pilot_fluentd == pilot_type: tpl_config = "/pilot/fluentd.tpl" + else: + tpl_config = "/pilot/flume.tpl" os.execve('/pilot/pilot', ['/pilot/pilot', '-template', tpl_config, '-base', base, '-log-level', 'debug'], os.environ) @@ -49,8 +51,8 @@ def config(): if pilot_filebeat == pilot_type: print "start log-pilot:", pilot_filebeat subprocess.check_call(['/pilot/config.filebeat']) - else: - print "start log-pilot:", pilot_fluentd + elif pilot_fluentd == pilot_type: + print "enable pilot:", pilot_fluentd subprocess.check_call(['/pilot/config.fluentd']) diff --git a/assets/flume/flume.tpl b/assets/flume/flume.tpl new file mode 100644 index 00000000..03d29984 --- /dev/null +++ b/assets/flume/flume.tpl @@ -0,0 +1,20 @@ +{{range .configList}} + +a1.sources.{{ $.containerId }}_{{ .Name }}_source.type = TAILDIR +a1.sources.{{ $.containerId }}_{{ .Name }}_source.channels = {{ $.containerId }}_{{ .Name }}_channel +a1.sources.{{ $.containerId }}_{{ .Name }}_source.positionFile = /flume/log_meta/source/{{ $.containerId }}/{{ .Name }}/taildir_position.json +a1.sources.{{ $.containerId }}_{{ .Name }}_source.filegroups = f1 +a1.sources.{{ $.containerId }}_{{ .Name }}_source.filegroups.f1 = {{ .HostDir }}/{{ .File }} + +a1.channels.{{ $.containerId }}_{{ .Name }}_channel.type = file +a1.channels.{{ $.containerId }}_{{ .Name }}_channel.checkpointDir = /flume/log_meta/channel/{{ $.containerId }}/{{ .Name }}/checkpoint +a1.channels.{{ $.containerId }}_{{ .Name }}_channel.dataDirs = /flume/log_meta/channel/{{ $.containerId }}/{{ .Name }}/buffer + +a1.sinks.{{ $.containerId }}_{{ .Name }}_sink.type = file_roll +a1.sinks.{{ $.containerId }}_{{ .Name }}_sink.channel = {{ $.containerId }}_{{ .Name }}_channel +a1.sinks.{{ $.containerId }}_{{ .Name }}_sink.sink.directory = {{ $.output }}/{{ index $.container "docker_container" }} +a1.sinks.{{ $.containerId }}_{{ .Name }}_sink.sink.rollInterval = 3600000 +a1.sinks.{{ $.containerId }}_{{ .Name }}_sink.sink.pathManager.prefix = {{ .Name }} +a1.sinks.{{ $.containerId }}_{{ .Name }}_sink.sink.pathManager.extension = log + +{{end}} \ No newline at end of file diff --git a/build-image.sh b/build-image.sh index 3d999820..8ea79423 100755 --- a/build-image.sh +++ b/build-image.sh @@ -14,6 +14,9 @@ case $1 in fluentd) build fluentd ;; +flume) + build flume + ;; *) build filebeat ;; diff --git a/pilot/flume_piloter.go b/pilot/flume_piloter.go new file mode 100644 index 00000000..b0fabd50 --- /dev/null +++ b/pilot/flume_piloter.go @@ -0,0 +1,253 @@ +package pilot + +import ( + "fmt" + log "github.com/Sirupsen/logrus" + "os" + "os/exec" + "time" + "regexp" + "io/ioutil" + "bufio" + "syscall" + "io" +) + +const ( + FLUME_EXEC_BIN = "/pilot/flume/bin/flume-ng" // agent -n a1 -c conf -f conf/flume.properties + FLUME_CONF_HOME = "/pilot/flume/conf" + FLUME_CONF_DIR = FLUME_CONF_HOME + "/tmp" // 存放各个容器的日志采集配置文件 + FLUME_CONF_FILE = FLUME_CONF_HOME + "/flume-conf.properties" + + ENV_FLUME_OUTPUT = "FLUME_OUTPUT" +) + +var flume *exec.Cmd + +type FlumePiloter struct { + name string +} + +func NewFlumePiloter() (Piloter, error) { + return &FlumePiloter{ + name: PILOT_FLUME, + }, nil +} + +func (p *FlumePiloter) Start() error { + if flume != nil { + pid := flume.Process.Pid + log.Infof("flume started, pid: %v", pid) + return fmt.Errorf(ERR_ALREADY_STARTED) + } + + log.Info("start generate flume conf") + err := p.GenConf(FLUME_CONF_HOME) + if err != nil{ + log.Errorf("flume conf error in start : %v", err) + //return err + } else { + //agent -n a1 -c conf -f conf/flume.properties + log.Info("starting flume") + flume = exec.Command(FLUME_EXEC_BIN, "agent", + "-c", FLUME_CONF_HOME, + fmt.Sprintf("-Dlog4j.configuration=file:%s/log4j.properties", FLUME_CONF_HOME), + "-n", "a1", + "-f", FLUME_CONF_FILE) + flume.Stderr = os.Stderr + flume.Stdout = os.Stdout + err := flume.Start() + if err != nil { + log.Errorf("flume start fail: %v", err) + } + + go func() { + log.Infof("flume started: %v", flume.Process.Pid) + err := flume.Wait() + if err != nil { + log.Errorf("flume exited: %v", err) + if exitError, ok := err.(*exec.ExitError); ok { + processState := exitError.ProcessState + log.Errorf("flume exited pid: %v", processState.Pid()) + } + } + + // try to restart flume + log.Warningf("flume exited and try to restart") + flume = nil + //time.Sleep(5 * time.Second) + p.Start() + }() + } + + return err +} + +func (p *FlumePiloter) Stop() error { + pid := flume.Process.Pid + command := fmt.Sprintf("ps -ef | grep %d | grep flume | grep -v grep | head 1 | awk '{print $1}'", pid) + childId := flumeFindShell(command) + log.Infof("before stop flume childId : %s", childId) + flume.Process.Signal(syscall.SIGHUP) + time.Sleep(5 * time.Second) + afterChildId := shell(command) + log.Infof("after stop flume childId : %s", afterChildId) + if childId == afterChildId { + log.Infof("kill childId : %s", childId) + shell("kill -9 " + childId) + } + return nil +} + +func (p *FlumePiloter) Reload() error { + if flume == nil { + err := fmt.Errorf("flume have not started") + log.Error(err) + return err + } + + log.Info("reload flume") + ch := make(chan struct{}) + go func(pid int) { + // jps | grep Application | grep pid | grep -v grep | awk '{print $1}' 测试下哪个好用 + command := fmt.Sprintf("ps -ef | grep %d | grep flume | grep -v grep | head 1 | awk '{print $1}'", pid) + childId := flumeFindShell(command) + log.Infof("before reload flume childId : %s", childId) + flume.Process.Signal(syscall.SIGHUP) + time.Sleep(5 * time.Second) + afterChildId := flumeFindShell(command) + log.Infof("after reload flume childId : %s", afterChildId) + if childId == afterChildId { + log.Infof("kill childId : %s", childId) + flumeFindShell("kill -9 " + childId) + } + close(ch) + }(flume.Process.Pid) + <-ch + return nil +} + +func (p *FlumePiloter) GetConfPath(container string) string { + return fmt.Sprintf("%s/%s.properties", FLUME_CONF_DIR, container) +} + +func (p *FlumePiloter) GetConfHome() string { + return FLUME_CONF_DIR +} + +func (p *FlumePiloter) Name() string { + return p.name +} + +func flumeFindShell(command string) string { + cmd := exec.Command("/bin/sh", "-c", command) + out, err := cmd.Output() + if err != nil { + fmt.Printf("error %v", err) + } + return string(out) +} + +func (p *FlumePiloter) GenConf(path string) error { + tmpConf, err := ioutil.ReadDir(fmt.Sprintf("%s/tmp", path)) + if err != nil { + log.Error(err) + err := fmt.Errorf("flume conf read tmp path error") + //log.Error(err) + return err + } + + // 正则取出source channel sink name + sourceRep, _ := regexp.Compile("([0-9a-zA-Z]+_[0-9a-zA-Z]+_source)(.+=.+)") + channelRep, _ := regexp.Compile("([0-9a-zA-Z]+_[0-9a-zA-Z]+_channel)(.+=.+)") + sinkRep, _ := regexp.Compile("([0-9a-zA-Z]+_[0-9a-zA-Z]+_sink)(.+=.+)") + + var sources = map[string]string{} + var channels = map[string]string{} + var sinks = map[string]string{} + var buf []byte + for _, file := range tmpConf { + fi, err := os.Open(fmt.Sprintf("%s/tmp/%s", path, file.Name())) + if err != nil { + fmt.Printf("read tmp conf error: %s\n", err) + return nil + } + defer fi.Close() + + br := bufio.NewReader(fi) + for { + line, _, c := br.ReadLine() + if c == io.EOF { + break + } + buf = append(buf, line...) + buf = append(buf, []byte("\n")...) + + source := sourceRep.FindStringSubmatch(string(line)) + if source != nil { + if _, no := sources[source[1]]; !no { + sources[source[1]] = source[1] + } + //continue + } + + channel := channelRep.FindStringSubmatch(string(line)) + if channel != nil { + if _, no := channels[channel[1]]; !no { + channels[channel[1]] = channel[1] + } + //continue + } + + sink := sinkRep.FindStringSubmatch(string(line)) + if sink != nil { + if _, no := sinks[sink[1]]; !no { + sinks[sink[1]] = sink[1] + } + //continue + } + + } + } + + var resultBuf []byte + var sourceBuf []byte + var channelBuf []byte + var sinkBuf []byte + for k, _ := range sources { + sourceBuf = append(sourceBuf, []byte(" " + k)...) + } + for k, _ := range channels { + channelBuf = append(channelBuf, []byte(" " + k)...) + } + for k, _ := range sinks { + sinkBuf = append(sinkBuf, []byte(" " + k)...) + } + + if len(sourceBuf) > 0 { + resultBuf = append(resultBuf, []byte("a1.sources =")...) + resultBuf = append(resultBuf, sourceBuf...) + resultBuf = append(resultBuf, []byte("\n")...) + } + if len(channelBuf) > 0 { + resultBuf = append(resultBuf, []byte("a1.channels =")...) + resultBuf = append(resultBuf, channelBuf...) + resultBuf = append(resultBuf, []byte("\n")...) + } + if len(sinkBuf) > 0 { + resultBuf = append(resultBuf, []byte("a1.sinks =")...) + resultBuf = append(resultBuf, sinkBuf...) + resultBuf = append(resultBuf, []byte("\n")...) + } + resultBuf = append(resultBuf, buf...) + + if err = ioutil.WriteFile(fmt.Sprintf("%s/flume-conf.properties", path), resultBuf, os.FileMode(0644)); err != nil { + return err + } + return nil +} + +func (p *FlumePiloter) OnDestroyEvent(container string) error { + log.Info("refactor in the future!!!") + return nil +} \ No newline at end of file diff --git a/pilot/pilot.go b/pilot/pilot.go index 07a04962..c0b96c1b 100644 --- a/pilot/pilot.go +++ b/pilot/pilot.go @@ -410,7 +410,7 @@ func (p *Pilot) delContainer(id string) error { p.removeVolumeSymlink(id) //fixme refactor in the future - if p.piloter.Name() == PILOT_FLUENTD { + if p.piloter.Name() == PILOT_FLUENTD || p.piloter.Name() == PILOT_FLUME { clean := func() { log.Infof("Try removing log config %s", id) if err := os.Remove(p.piloter.GetConfPath(id)); err != nil { @@ -691,6 +691,8 @@ func (p *Pilot) render(containerId string, container map[string]string, configLi output := os.Getenv(ENV_FLUENTD_OUTPUT) if p.piloter.Name() == PILOT_FILEBEAT { output = os.Getenv(ENV_FILEBEAT_OUTPUT) + }else if p.piloter.Name() == PILOT_FLUME { + output = os.Getenv(ENV_FLUME_OUTPUT) } if output == "" { output = os.Getenv(ENV_LOGGING_OUTPUT) diff --git a/pilot/piloter.go b/pilot/piloter.go index 2f8d6755..720be46a 100644 --- a/pilot/piloter.go +++ b/pilot/piloter.go @@ -10,6 +10,7 @@ const ( PILOT_FILEBEAT = "filebeat" PILOT_FLUENTD = "fluentd" + PILOT_FLUME = "flume" ) type Piloter interface { @@ -32,5 +33,8 @@ func NewPiloter(baseDir string) (Piloter, error) { if os.Getenv(ENV_PILOT_TYPE) == PILOT_FLUENTD { return NewFluentdPiloter() } + if os.Getenv(ENV_PILOT_TYPE) == PILOT_FLUME { + return NewFlumePiloter() + } return nil, fmt.Errorf("InvalidPilotType") } From 40ad59df2df76f1ba49fa332c78cbb78ed6dc80e Mon Sep 17 00:00:00 2001 From: sunzhiwei Date: Sat, 29 Dec 2018 13:31:47 +0800 Subject: [PATCH 2/2] add flume yml --- examples/pilot-flume.yml | 60 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 examples/pilot-flume.yml diff --git a/examples/pilot-flume.yml b/examples/pilot-flume.yml new file mode 100644 index 00000000..ecbfeea4 --- /dev/null +++ b/examples/pilot-flume.yml @@ -0,0 +1,60 @@ +apiVersion: extensions/v1beta1 +kind: DaemonSet +metadata: + name: log-pilot + labels: + k8s-app: log-pilot +spec: + template: + metadata: + labels: + k8s-app: log-pilot + spec: + tolerations: + - key: node-role.kubernetes.io/master + effect: NoSchedule + containers: + - name: log-pilot + image: registry.cn-hangzhou.aliyuncs.com/acs/log-pilot:0.9.5-flume + imagePullPolicy: Always + env: + - name: "PILOT_TYPE" + value: "flume" + - name: "FLUME_OUTPUT" + value: "/opt/log/log-pilot-out" + - name: "NODE_NAME" + valueFrom: + fieldRef: + fieldPath: spec.nodeName + volumeMounts: + - name: sock + mountPath: /var/run/docker.sock + - name: root + mountPath: /host + - name: log-meta + mountPath: /flume/log_meta + - name: logout + mountPath: /opt/log/log-pilot-out + - name: flume-log + mountPath: /opt/log/flume + securityContext: + capabilities: + add: + - SYS_ADMIN + terminationGracePeriodSeconds: 30 + volumes: + - name: sock + hostPath: + path: /var/run/docker.sock + - name: root + hostPath: + path: / + - name: logout + hostPath: + path: /opt/log/log-pilot-out + - name: log-meta + hostPath: + path: /opt/log/log-pilot-out/log_meta + - name: flume-log + hostPath: + path: /opt/log/flume \ No newline at end of file