Skip to content

Commit

Permalink
Merge pull request #6 from ifwe/make-redecs-containerized
Browse files Browse the repository at this point in the history
Simplify redecs_agent to run inside a container
  • Loading branch information
mishan authored Mar 31, 2017
2 parents e27bedb + 7ed48cf commit c970b0d
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 232 deletions.
114 changes: 0 additions & 114 deletions docker_router/docker_router.go

This file was deleted.

132 changes: 14 additions & 118 deletions redecs_agent/redecs_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,17 @@ package main
// ZSET and write changes to AWS Route53.

import (
docker_router "redecs/docker_router"
"errors"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/fsouza/go-dockerclient"
"gopkg.in/redis.v5"
"os"
"strconv"
"strings"
"time"
)

const checkInterval = 30 * time.Second // how often to check Docker
const reportInterval = 30 * time.Second // how often to report

type config struct {
LocalIp string
Expand All @@ -42,12 +38,11 @@ func logErrorNoFatal(err error) {
}
}

var dockerClient *docker.Client
var redisClient *redis.Client

// This reports an instance as active. It adds it to the redecs:service_pings set
// and publishes a redecs:service_channel event.
func reportServiceActive(serviceName string) error {
func reportActiveService(serviceName string) error {
log.Debugf("Reporting %s as active with IP %s", serviceName, configuration.LocalIp)
value := serviceName + "_" + configuration.LocalIp
// write {timestamp => SERVICENAME_IP} to redecs:service_pings
Expand All @@ -60,65 +55,16 @@ func reportServiceActive(serviceName string) error {
return err
}

// This reports an instance as inactive. It removes it from the redecs:service_pings set
// and publishes a redecs:service_channel event.
func reportServiceInactive(serviceName string) error {
log.Debugf("Reporting %s as inactive with IP %s", serviceName, configuration.LocalIp)
value := serviceName + "_" + configuration.LocalIp
// remove SERVICENAME_IP from redecs:service_pings
err := redisClient.ZRem("redecs:service_pings", value).Err()
if err != nil {
return err
}
// notify the channel that this service has been reported active
err = redisClient.Publish("redecs:service_channel", "-"+value).Err()
return err
}

// Gets the service name from a Docker container environment variable, if present.
func getServiceName(container *docker.Container) string {
// One of the environment variables should be SERVICE_<port>_NAME = <name of the service>
// We look for this environment variable doing a split in the "=" and another one in the "_"
// So envEval = [SERVICE_<port>_NAME, <name>]
// nameEval = [SERVICE, <port>, NAME]
for _, env := range container.Config.Env {
envEval := strings.Split(env, "=")
nameEval := strings.Split(envEval[0], "_")
if len(envEval) == 2 && len(nameEval) == 3 && nameEval[0] == "SERVICE" && nameEval[2] == "NAME" {
if _, err := strconv.Atoi(nameEval[1]); err == nil {
return envEval[1]
}
}
}
return ""
}

func processContainers() {
log.Debug("Listing active containers...")

// only get running ones
containers, err := dockerClient.ListContainers(docker.ListContainersOptions{All: false})
// bail out completely if we can't list containers.
logErrorAndFail(err)
for _, containerEntry := range containers {
// have to inspect to get the environment variable
container, err := dockerClient.InspectContainer(containerEntry.ID)
// retry logic?
logErrorAndFail(err)
serviceName := getServiceName(container)
if serviceName != "" {
reportServiceActive(serviceName)
}
}

log.Debug("Done checking active containers")
// Gets the service name from the SERVICE_NAME environment variable.
func getServiceName() string {
return os.Getenv("SERVICE_NAME")
}

func main() {
var err error
var sum int
if len(os.Args) < 2 || len(os.Args) > 3 {
err = errors.New(fmt.Sprintf("Usage: %s [Redis host] {host IP (for testing)}\n", os.Args[0]))
err = errors.New(fmt.Sprintf("Usage: %s [Redis host] {IP for testing purposes}\n", os.Args[0]))
logErrorAndFail(err)
}
if len(os.Args) == 3 {
Expand All @@ -130,11 +76,12 @@ func main() {
configuration.LocalIp = localIp
}
configuration.RedisHost = os.Args[1]
endpoint := "unix:///var/run/docker.sock"
dockerClient, err = docker.NewClient(endpoint)

// bail out if we can't talk to Docker
logErrorAndFail(err)
serviceName := getServiceName()
if serviceName == "" {
err = errors.New("'SERVICE_NAME' environment variable is not defined! Exiting.")
logErrorAndFail(err)
}

sum = 1
for {
Expand All @@ -158,64 +105,13 @@ func main() {
sum += 2
}

startFn := func(event *docker.APIEvents) error {
var err error
container, err := dockerClient.InspectContainer(event.ID)
logErrorAndFail(err)
service := getServiceName(container)
if service != "" {
sum = 1
for {
if err = reportServiceActive(service); err == nil {
break
}
if sum > 8 {
log.Errorf("Error reporting service %s active", service)
break
}
time.Sleep(time.Duration(sum) * time.Second)
sum += 2
}
}
log.Debugf("Docker %s started", event.ID)
return nil
}

stopFn := func(event *docker.APIEvents) error {
var err error
container, err := dockerClient.InspectContainer(event.ID)
logErrorAndFail(err)
service := getServiceName(container)
if service != "" {
sum = 1
for {
if err = reportServiceInactive(service); err == nil {
break
}
if sum > 8 {
log.Errorf("Error reporting service %s inactive", service)
break
}
time.Sleep(time.Duration(sum) * time.Second)
sum += 2
}
}
log.Debugf("Docker %s stopped", event.ID)
return nil
}

dockerRouter, err := docker_router.AddStartStopHandlers(startFn, stopFn, dockerClient)
defer dockerRouter.Stop()
logErrorAndFail(err)
dockerRouter.Start()

log.Debug("Waiting for Docker events")
log.Debug("redecs_agent started")

// continue processing once per minute
ticker := time.NewTicker(checkInterval)
ticker := time.NewTicker(reportInterval)

for {
processContainers()
reportActiveService(serviceName)
<-ticker.C
}
}

0 comments on commit c970b0d

Please sign in to comment.