diff --git a/docker_router/docker_router.go b/docker_router/docker_router.go deleted file mode 100644 index f22d0b1..0000000 --- a/docker_router/docker_router.go +++ /dev/null @@ -1,114 +0,0 @@ -package docker_router - -import ( - log "github.com/Sirupsen/logrus" - "github.com/fsouza/go-dockerclient" - "time" -) - -const workerTimeout = 180 * time.Second - -type handler interface { - Handle(*docker.APIEvents) error -} - -type DockerRouter struct { - handlers map[string][]handler - dockerClient *docker.Client - listener chan *docker.APIEvents - workers chan *worker - workerTimeout time.Duration -} - -func dockerEventsRouter(bufferSize int, workerPoolSize int, dockerClient *docker.Client, - handlers map[string][]handler) (*DockerRouter, error) { - workers := make(chan *worker, workerPoolSize) - for i := 0; i < workerPoolSize; i++ { - workers <- &worker{} - } - - DockerRouter := &DockerRouter{ - handlers: handlers, - dockerClient: dockerClient, - listener: make(chan *docker.APIEvents, bufferSize), - workers: workers, - workerTimeout: workerTimeout, - } - - return DockerRouter, nil -} - -func (e *DockerRouter) Start() error { - go e.manageEvents() - return e.dockerClient.AddEventListener(e.listener) -} - -func (e *DockerRouter) Stop() error { - if e.listener == nil { - return nil - } - return e.dockerClient.RemoveEventListener(e.listener) -} - -func (e *DockerRouter) manageEvents() { - for { - event := <-e.listener - timer := time.NewTimer(e.workerTimeout) - gotWorker := false - // Wait until we get a free worker or a timeout - // there is a limit in the number of concurrent events managed by workers to avoid resource exhaustion - // so we wait until we have a free worker or a timeout occurs - for !gotWorker { - select { - case w := <-e.workers: - if !timer.Stop() { - <-timer.C - } - go w.doWork(event, e) - gotWorker = true - case <-timer.C: - log.Infof("Timed out waiting.") - } - } - } -} - -type worker struct{} - -func (w *worker) doWork(event *docker.APIEvents, e *DockerRouter) { - defer func() { e.workers <- w }() - if handlers, ok := e.handlers[event.Status]; ok { - log.Infof("Processing event: %#v", event) - for _, handler := range handlers { - if err := handler.Handle(event); err != nil { - log.Errorf("Error processing event %#v. Error: %v", event, err) - } - } - } -} - -type dockerHandler struct { - handlerFunc func(event *docker.APIEvents) error -} - -func (th *dockerHandler) Handle(event *docker.APIEvents) error { - return th.handlerFunc(event) -} - -func (th *dockerHandler) SetHandler(handlerFunc func(event *docker.APIEvents) error) { - th.handlerFunc = handlerFunc -} - -func AddStartStopHandlers(startFn func(event *docker.APIEvents) error, stopFn func(event *docker.APIEvents) error, dockerClient *docker.Client) (*DockerRouter, error) { - startHandler := &dockerHandler{ - handlerFunc: startFn, - } - - stopHandler := &dockerHandler{ - handlerFunc: stopFn, - } - - handlers := map[string][]handler{"start": []handler{startHandler}, "die": []handler{stopHandler}} - router, err := dockerEventsRouter(5, 5, dockerClient, handlers) - return router, err -} diff --git a/redecs_agent/redecs_agent.go b/redecs_agent/redecs_agent.go index 899bb68..51f08c7 100644 --- a/redecs_agent/redecs_agent.go +++ b/redecs_agent/redecs_agent.go @@ -7,21 +7,17 @@ package main // ZSET and write changes to AWS Route53. import ( - docker_router "redecs/docker_router" "errors" "fmt" log "github.com/Sirupsen/logrus" "github.com/aws/aws-sdk-go/aws/ec2metadata" "github.com/aws/aws-sdk-go/aws/session" - "github.com/fsouza/go-dockerclient" "gopkg.in/redis.v5" "os" - "strconv" - "strings" "time" ) -const checkInterval = 30 * time.Second // how often to check Docker +const reportInterval = 30 * time.Second // how often to report type config struct { LocalIp string @@ -42,12 +38,11 @@ func logErrorNoFatal(err error) { } } -var dockerClient *docker.Client var redisClient *redis.Client // This reports an instance as active. It adds it to the redecs:service_pings set // and publishes a redecs:service_channel event. -func reportServiceActive(serviceName string) error { +func reportActiveService(serviceName string) error { log.Debugf("Reporting %s as active with IP %s", serviceName, configuration.LocalIp) value := serviceName + "_" + configuration.LocalIp // write {timestamp => SERVICENAME_IP} to redecs:service_pings @@ -60,65 +55,16 @@ func reportServiceActive(serviceName string) error { return err } -// This reports an instance as inactive. It removes it from the redecs:service_pings set -// and publishes a redecs:service_channel event. -func reportServiceInactive(serviceName string) error { - log.Debugf("Reporting %s as inactive with IP %s", serviceName, configuration.LocalIp) - value := serviceName + "_" + configuration.LocalIp - // remove SERVICENAME_IP from redecs:service_pings - err := redisClient.ZRem("redecs:service_pings", value).Err() - if err != nil { - return err - } - // notify the channel that this service has been reported active - err = redisClient.Publish("redecs:service_channel", "-"+value).Err() - return err -} - -// Gets the service name from a Docker container environment variable, if present. -func getServiceName(container *docker.Container) string { - // One of the environment variables should be SERVICE__NAME = - // We look for this environment variable doing a split in the "=" and another one in the "_" - // So envEval = [SERVICE__NAME, ] - // nameEval = [SERVICE, , NAME] - for _, env := range container.Config.Env { - envEval := strings.Split(env, "=") - nameEval := strings.Split(envEval[0], "_") - if len(envEval) == 2 && len(nameEval) == 3 && nameEval[0] == "SERVICE" && nameEval[2] == "NAME" { - if _, err := strconv.Atoi(nameEval[1]); err == nil { - return envEval[1] - } - } - } - return "" -} - -func processContainers() { - log.Debug("Listing active containers...") - - // only get running ones - containers, err := dockerClient.ListContainers(docker.ListContainersOptions{All: false}) - // bail out completely if we can't list containers. - logErrorAndFail(err) - for _, containerEntry := range containers { - // have to inspect to get the environment variable - container, err := dockerClient.InspectContainer(containerEntry.ID) - // retry logic? - logErrorAndFail(err) - serviceName := getServiceName(container) - if serviceName != "" { - reportServiceActive(serviceName) - } - } - - log.Debug("Done checking active containers") +// Gets the service name from the SERVICE_NAME environment variable. +func getServiceName() string { + return os.Getenv("SERVICE_NAME") } func main() { var err error var sum int if len(os.Args) < 2 || len(os.Args) > 3 { - err = errors.New(fmt.Sprintf("Usage: %s [Redis host] {host IP (for testing)}\n", os.Args[0])) + err = errors.New(fmt.Sprintf("Usage: %s [Redis host] {IP for testing purposes}\n", os.Args[0])) logErrorAndFail(err) } if len(os.Args) == 3 { @@ -130,11 +76,12 @@ func main() { configuration.LocalIp = localIp } configuration.RedisHost = os.Args[1] - endpoint := "unix:///var/run/docker.sock" - dockerClient, err = docker.NewClient(endpoint) - // bail out if we can't talk to Docker - logErrorAndFail(err) + serviceName := getServiceName() + if serviceName == "" { + err = errors.New("'SERVICE_NAME' environment variable is not defined! Exiting.") + logErrorAndFail(err) + } sum = 1 for { @@ -158,64 +105,13 @@ func main() { sum += 2 } - startFn := func(event *docker.APIEvents) error { - var err error - container, err := dockerClient.InspectContainer(event.ID) - logErrorAndFail(err) - service := getServiceName(container) - if service != "" { - sum = 1 - for { - if err = reportServiceActive(service); err == nil { - break - } - if sum > 8 { - log.Errorf("Error reporting service %s active", service) - break - } - time.Sleep(time.Duration(sum) * time.Second) - sum += 2 - } - } - log.Debugf("Docker %s started", event.ID) - return nil - } - - stopFn := func(event *docker.APIEvents) error { - var err error - container, err := dockerClient.InspectContainer(event.ID) - logErrorAndFail(err) - service := getServiceName(container) - if service != "" { - sum = 1 - for { - if err = reportServiceInactive(service); err == nil { - break - } - if sum > 8 { - log.Errorf("Error reporting service %s inactive", service) - break - } - time.Sleep(time.Duration(sum) * time.Second) - sum += 2 - } - } - log.Debugf("Docker %s stopped", event.ID) - return nil - } - - dockerRouter, err := docker_router.AddStartStopHandlers(startFn, stopFn, dockerClient) - defer dockerRouter.Stop() - logErrorAndFail(err) - dockerRouter.Start() - - log.Debug("Waiting for Docker events") + log.Debug("redecs_agent started") // continue processing once per minute - ticker := time.NewTicker(checkInterval) + ticker := time.NewTicker(reportInterval) for { - processContainers() + reportActiveService(serviceName) <-ticker.C } }