Skip to content
This repository has been archived by the owner on Nov 18, 2017. It is now read-only.

Commit

Permalink
Add API to governor
Browse files Browse the repository at this point in the history
  • Loading branch information
Joshua Deare committed Sep 20, 2016
1 parent 426748f commit 982a5a2
Show file tree
Hide file tree
Showing 11 changed files with 283 additions and 15 deletions.
55 changes: 55 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package api

import (
"encoding/json"
"github.com/compose/governor/fsm"
"github.com/compose/governor/ha"
"github.com/compose/governor/service"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"net/http"
)

func Router(singleFSM fsm.SingleLeaderFSM, singleHA *ha.SingleLeaderHA, singleService service.SingleLeaderService) (*mux.Router, error) {
r := mux.NewRouter()
if err := registerFSMRouter(singleFSM, singleService, r.PathPrefix("/fsm").Subrouter()); err != nil {
return nil, errors.Wrap(err, "Error registering FSM subrouter for API")
}
if err := registerServiceRouter(singleService, r.PathPrefix("/service").Subrouter()); err != nil {
return nil, errors.Wrap(err, "Error registering Service subrouter for API")
}
if err := registerHARouter(singleHA, r.PathPrefix("/ha").Subrouter()); err != nil {
return nil, errors.Wrap(err, "Error registering HA subrouter for API")
}
return r, nil
}

type apiSuccessResponse struct {
Data interface{} `json:"data,omitempty"`
}
type apiErrorResponse struct {
Errors []error `json:"errors"`
}

func sendResponse(code int, jsonData interface{}, errorMsgs []error, w http.ResponseWriter) error {
var resp interface{}
if len(errorMsgs) > 0 {
resp = apiErrorResponse{Errors: errorMsgs}
} else if jsonData != nil {
resp = apiSuccessResponse{Data: jsonData}
} else {
return errors.New("Must supply either errorsMsgs or jsonDATA to response")
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)

if resp == nil {
return nil
}

if err := json.NewEncoder(w).Encode(resp); err != nil {
return errors.Wrap(err, "Error encoding JSON into response body")
}
return nil
}
117 changes: 117 additions & 0 deletions api/fsm_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package api

import (
log "github.com/Sirupsen/logrus"
"github.com/compose/governor/fsm"
"github.com/compose/governor/service"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"net/http"
)

// TODO: Add long listener for updates
func registerFSMRouter(singleFSM fsm.SingleLeaderFSM, singleService service.SingleLeaderService, r *mux.Router) error {
r.HandleFunc("/id", singleFSMIDHandler(singleFSM)).Methods("GET")
r.HandleFunc("/leader", singleFSMLeaderHandler(singleFSM, singleService)).Methods("GET")
r.HandleFunc("/member/{id}", singleFSMMemberHandler(singleFSM, singleService)).Methods("GET")
r.HandleFunc("/members", singleFSMMembersHandler(singleFSM, singleService)).Methods("GET")
return nil
}

func singleFSMIDHandler(singleFSM fsm.SingleLeaderFSM) http.HandlerFunc {
type idAPIResp struct {
ID uint64 `json:"id"`
}
return func(w http.ResponseWriter, req *http.Request) {
id := singleFSM.UniqueID()
if err := sendResponse(200, idAPIResp{ID: id}, []error{}, w); err != nil {
log.Error("Error sending response for ID request")
}
}
}

func singleFSMLeaderHandler(singleFSM fsm.SingleLeaderFSM, singleService service.SingleLeaderService) http.HandlerFunc {
type leaderAPIResp struct {
Leader fsm.Leader `json:"leader"`
Exists bool `json:"exists"`
}
return func(w http.ResponseWriter, req *http.Request) {
leaderData, exists, err := singleFSM.Leader()
if err != nil {
if err := sendResponse(500, nil, []error{err}, w); err != nil {
log.Error("Error sending error response")
}
}
leader, err := singleService.FSMLeaderFromBytes(leaderData)
if err != nil {
if err := sendResponse(500, nil, []error{err}, w); err != nil {
log.Error("Error sending error response")
}
}
if err := sendResponse(200, leaderAPIResp{Leader: leader, Exists: exists}, []error{}, w); err != nil {
log.Error("Error sending leader response")

}
}
}

func singleFSMMemberHandler(singleFSM fsm.SingleLeaderFSM, singleService service.SingleLeaderService) http.HandlerFunc {
type memberAPIResp struct {
Member fsm.Member `json:"member"`
Exists bool `json:"exists"`
}
return func(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
id, ok := vars["id"]
if !ok {
if err := sendResponse(400, nil, []error{errors.New("ID Not provided in request for member")}, w); err != nil {
log.Error("Error sending error response")
}
}

memberData, exists, err := singleFSM.Member(id)
if err != nil {
if err := sendResponse(500, nil, []error{err}, w); err != nil {
log.Error("Error sending error response")
}
}
member, err := singleService.FSMMemberFromBytes(memberData)
if err != nil {
if err := sendResponse(500, nil, []error{err}, w); err != nil {
log.Error("Error sending error response")
}
}
if err := sendResponse(200, memberAPIResp{Member: member, Exists: exists}, []error{}, w); err != nil {
log.Error("Error sending leader response")

}
}
}

func singleFSMMembersHandler(singleFSM fsm.SingleLeaderFSM, singleService service.SingleLeaderService) http.HandlerFunc {
type membersAPIResp struct {
Members []fsm.Member `json:"members"`
}
return func(w http.ResponseWriter, req *http.Request) {
membersData, err := singleFSM.Members()
members := []fsm.Member{}
for _, memberData := range membersData {
member, err := singleService.FSMMemberFromBytes(memberData)
if err != nil {
if err := sendResponse(500, nil, []error{err}, w); err != nil {
log.Error("Error sending error response")
}
}
members = append(members, member)
}
if err != nil {
if err := sendResponse(500, nil, []error{err}, w); err != nil {
log.Error("Error sending error response")
}
}
if err := sendResponse(200, membersAPIResp{Members: members}, []error{}, w); err != nil {
log.Error("Error sending leader response")

}
}
}
28 changes: 28 additions & 0 deletions api/ha_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package api

import (
log "github.com/Sirupsen/logrus"
"github.com/compose/governor/ha"
"github.com/gorilla/mux"
"net/http"
)

func registerHARouter(singleHA *ha.SingleLeaderHA, r *mux.Router) error {
r.HandleFunc("/is_leader", singleHAIsLeaderHandler(singleHA)).Methods("GET")
return nil
}

func singleHAIsLeaderHandler(singleHA *ha.SingleLeaderHA) http.HandlerFunc {
type isLeaderAPIResp struct {
IsLeader bool `json:"is_leader"`
}
return func(w http.ResponseWriter, req *http.Request) {
isLeader, err := singleHA.IsLeader()
if err != nil {
sendResponse(500, nil, []error{err}, w)
}
if err := sendResponse(200, isLeaderAPIResp{IsLeader: isLeader}, []error{}, w); err != nil {
log.Error("Error sending response for request")
}
}
}
51 changes: 51 additions & 0 deletions api/service_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package api

import (
log "github.com/Sirupsen/logrus"
"github.com/compose/governor/service"
"github.com/gorilla/mux"
"net/http"
)

func registerServiceRouter(singleService service.SingleLeaderService, r *mux.Router) error {
r.HandleFunc("/running_as_leader", singleServiceRunningAsLeaderHandler(singleService)).Methods("GET")
r.HandleFunc("/is_running", singleServiceIsRunningHandler(singleService)).Methods("GET")
r.HandleFunc("/is_healthy", singleServiceIsHealthyHandler(singleService)).Methods("GET")
return nil
}

func singleServiceRunningAsLeaderHandler(singleService service.SingleLeaderService) http.HandlerFunc {
type isLeaderAPIResp struct {
RunningAsLeader bool `json:"running_as_leader"`
}
return func(w http.ResponseWriter, req *http.Request) {
runningAsLeader := singleService.RunningAsLeader()
if err := sendResponse(200, isLeaderAPIResp{RunningAsLeader: runningAsLeader}, []error{}, w); err != nil {
log.Error("Error sending response for ID request")
}
}
}

func singleServiceIsRunningHandler(singleService service.SingleLeaderService) http.HandlerFunc {
type isRunningAPIResp struct {
IsRunning bool `json:"is_running"`
}
return func(w http.ResponseWriter, req *http.Request) {
running := singleService.IsRunning()
if err := sendResponse(200, isRunningAPIResp{IsRunning: running}, []error{}, w); err != nil {
log.Error("Error sending response for ID request")
}
}
}

func singleServiceIsHealthyHandler(singleService service.SingleLeaderService) http.HandlerFunc {
type isHealthyAPIResp struct {
IsHealthy bool `json:"is_healthy"`
}
return func(w http.ResponseWriter, req *http.Request) {
healthy := singleService.IsHealthy()
if err := sendResponse(200, isHealthyAPIResp{IsHealthy: healthy}, []error{}, w); err != nil {
log.Error("Error sending response for ID request")
}
}
}
2 changes: 1 addition & 1 deletion configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Configuration struct {
DataDir string `yaml:"data_dir"`
FSM *fsm.Config `yaml:"fsm"`
Postgresql *service.PostgresqlConfig `yaml:"postgresql"`
HAHealth string `yaml:"haproxy_health_endpoint"`
APIPort int `yaml:"api_port"`
}

func LoadConfiguration(path string) (Configuration, error) {
Expand Down
14 changes: 7 additions & 7 deletions fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ func (f *fsm) RaceForInit(timeout time.Duration) (bool, error) {

// TODO: allow custom logger to be passed in
type Config struct {
RaftPort int `yaml:"raft_port"`
APIPort int `yaml:"api_port"`
BootstrapPeers []string `yaml:"bootstrap_peers"`
BootstrapNode bool `yaml:"is_bootstrap"`
DataDir string `yaml:"data_dir"`
ClusterID uint64 `yaml:"cluster_id"`
RaftPort int `yaml:"raft_port"`
ClusterConfigPort int `yaml:"cluster_config_port"`
BootstrapPeers []string `yaml:"bootstrap_peers"`
BootstrapNode bool `yaml:"is_bootstrap"`
DataDir string `yaml:"data_dir"`
ClusterID uint64 `yaml:"cluster_id"`
// LeaderTTL in milliseconds
LeaderTTL int `yaml:"leader_ttl"`
// MemberTTL in milliseconds
Expand All @@ -138,7 +138,7 @@ func NewGovernorFSM(config *Config) (SingleLeaderFSM, error) {
FSM: newFSM,
ClusterID: config.ClusterID,
RaftPort: config.RaftPort,
APIPort: config.APIPort,
APIPort: config.ClusterConfigPort,
BootstrapPeers: config.BootstrapPeers,
BootstrapNode: config.BootstrapNode,
DataDir: config.DataDir,
Expand Down
11 changes: 11 additions & 0 deletions governor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
//"os/exec"
"flag"
log "github.com/Sirupsen/logrus"
"github.com/compose/governor/api"
"github.com/compose/governor/fsm"
"github.com/compose/governor/ha"
"github.com/compose/governor/service"
"net/http"
"os"
"os/signal"
"path/filepath"
Expand Down Expand Up @@ -102,6 +104,15 @@ func main() {
}).Info("Clean Shutdown Finished")
}
}(singleHA, singleLeaderState, pg)

go func() {
router, err := api.Router(singleLeaderState, singleHA, pg)
if err != nil {
log.Error("Could not start API")
}
http.ListenAndServe(fmt.Sprintf(":%d", configuration.APIPort), router)
}()

if err := singleHA.Run(); err != nil {
log.Fatalf("Error Running HA, %+v", err)
}
Expand Down
11 changes: 7 additions & 4 deletions ha/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (ha *SingleLeaderHA) RunCycle() error {
}
}

isLeader, err := ha.isLeader()
isLeader, err := ha.IsLeader()
if err != nil {
return err
}
Expand Down Expand Up @@ -469,14 +469,17 @@ func (ha *SingleLeaderHA) leaderIsMe(leader fsm.Leader) (bool, error) {

}

func (ha *SingleLeaderHA) isLeader() (bool, error) {
curLeader := ha.service.FSMLeaderTemplate()
exists, err := ha.fsm.Leader(curLeader)
func (ha *SingleLeaderHA) IsLeader() (bool, error) {
leaderData, exists, err := ha.fsm.Leader()
if err != nil {
return false, err
} else if !exists {
return false, nil
}
curLeader, err := ha.service.FSMLeaderFromBytes(leaderData)
if err != nil {
return false, errors.Wrap(err, "Error getting leader from bytes")
}

meAsLeader, err := ha.service.AsFSMLeader()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion postgres0.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
loop_wait: 1000 #milliseconds
data_dir: "data/postgres0" #canoe and pg data will go here
api_port: 5000
fsm:
raft_port: 1234
api_port: 1244
cluster_config_port: 1244
#bootstrap_peers:
#- http://localhost:1245
#TODO: Alter canoe to allow set-list of bootstrap peers
Expand Down
3 changes: 2 additions & 1 deletion postgres1.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
loop_wait: 1000 #milliseconds
data_dir: "data/postgres1" #canoe and pg data will go here
api_port: 5001
fsm:
raft_port: 1235
api_port: 1245
cluster_config_port: 1245
bootstrap_peers:
- http://localhost:1244
#TODO: Alter canoe to allow set-list of bootstrap peers
Expand Down
3 changes: 2 additions & 1 deletion postgres2.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
loop_wait: 1000 #milliseconds
data_dir: "data/postgres2" #canoe and pg data will go here
api_port: 5002
fsm:
raft_port: 1236
api_port: 1246
cluster_config_port: 1246
bootstrap_peers:
- http://localhost:1244
- http://localhost:1245
Expand Down

0 comments on commit 982a5a2

Please sign in to comment.