Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ACL Support for nomad_cluster #241

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 114 additions & 60 deletions pkg/clients/nomad/nomad.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
//go:generate mockery --name Nomad --filename nomad.go
type Nomad interface {
// SetConfig sets the API address, port, expected node count and optional ACL token for the client
SetConfig(address string, port, nodes int) error
SetConfig(address string, port, nodes int, acl_token string) error
// Create jobs in the provided files
Create(files []string) error
// Stop jobs in the provided files
Expand All @@ -30,9 +30,11 @@ type Nomad interface {
// HealthCheckAPI uses the Nomad API to check that all servers and nodes
// are ready. The function will block until either all nodes are healthy or the
// timeout period elapses.
HealthCheckAPI(time.Duration) error
HealthCheckAPI(time.Duration, bool) error
// Endpoints returns a list of endpoints for a cluster
Endpoints(job, group, task string) ([]map[string]string, error)
// BootstrapACLs bootstraps the cluster's ACL system and returns the SecretID of the created token
BootstrapACLs() (string, error)
}

// NomadImpl is an implementation of the Nomad interface
Expand All @@ -43,6 +45,7 @@ type NomadImpl struct {
address string
port int
clientNodes int
aclToken string
}

// NewNomad creates a new Nomad client
Expand All @@ -59,17 +62,51 @@ type createRequest struct {
Job string
}

// setAuthHeaders adds the X-Nomad-Token header to the request when an ACL
// token has been configured on the client.
//
// The function is a no-op when rq is nil: callers in this file invoke it
// directly after http.NewRequest and only check the returned error afterwards,
// so a failed request construction must not cause a nil pointer dereference
// here.
func (n *NomadImpl) setAuthHeaders(rq *http.Request) {
	if rq == nil || n.aclToken == "" {
		return
	}

	rq.Header.Set("X-Nomad-Token", n.aclToken)
}

// SetConfig stores the connection settings used for subsequent Nomad API calls
func (n *NomadImpl) SetConfig(address string, port, nodes int) error {
// SetConfig stores the settings used by the other client methods.
//
// address and port are combined as "<address>:<port>" when building API URLs,
// nodes is the number of ready nodes HealthCheckAPI waits for, and aclToken is
// the ACL token sent with requests; an empty token disables the auth header.
// The returned error is always nil.
func (n *NomadImpl) SetConfig(address string, port, nodes int, aclToken string) error {
	n.address = address
	n.port = port
	n.clientNodes = nodes
	n.aclToken = aclToken

	return nil
}

// HealthCheckAPI executes a HTTP heath check for a Nomad cluster
func (n *NomadImpl) HealthCheckAPI(timeout time.Duration) error {
// BootstrapACLs bootstraps the ACL system of the configured Nomad cluster by
// sending a POST request to /v1/acl/bootstrap.
//
// On success the SecretID from the response is stored on the client, so that
// subsequent requests are authenticated with it, and is returned to the
// caller. An error is returned when the request can not be created or sent,
// when the API responds with a status other than 200, or when the response
// body can not be decoded or does not contain a SecretID.
func (n *NomadImpl) BootstrapACLs() (string, error) {
	n.l.Debug("Bootstrapping ACLs", "address", n.address)

	rq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s:%d/v1/acl/bootstrap", n.address, n.port), nil)
	if err != nil {
		return "", xerrors.Errorf("Unable to create http request: %w", err)
	}

	resp, err := n.httpClient.Do(rq)
	if err != nil {
		return "", xerrors.Errorf("Unable to bootstrap ACLs: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// best effort: the body is only used to enrich the error message, so a
		// failure to read it is deliberately ignored
		d, _ := ioutil.ReadAll(resp.Body)
		return "", xerrors.Errorf("Error bootstrapping ACLs, got status code %d, error: %s", resp.StatusCode, string(d))
	}

	result := map[string]interface{}{}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", xerrors.Errorf("Unable to decode ACL bootstrap response: %w", err)
	}

	// use the checked form of the type assertion, a missing or non string
	// SecretID must result in an error rather than a panic
	token, ok := result["SecretID"].(string)
	if !ok || token == "" {
		return "", xerrors.Errorf("ACL bootstrap response did not contain a SecretID")
	}

	n.aclToken = token

	return token, nil
}

// HealthCheckAPI executes a HTTP health check for a Nomad cluster
func (n *NomadImpl) HealthCheckAPI(timeout time.Duration, simple bool) error {
n.l.Debug("Performing Nomad health check", "address", n.address)
st := time.Now()
for {
Expand All @@ -79,75 +116,87 @@ func (n *NomadImpl) HealthCheckAPI(timeout time.Duration) error {
return fmt.Errorf("Timeout waiting for Nomad healthcheck %s", n.address)
}

rq, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s:%d/v1/nodes", n.address, n.port), nil)
if err != nil {
return err
}

resp, err := n.httpClient.Do(rq)
if err == nil && resp.StatusCode == 200 {
nodes := []map[string]interface{}{}
// check number of nodes
json.NewDecoder(resp.Body).Decode(&nodes)

// loop nodes and check ready
readyCount := 0
for _, node := range nodes {
// get the node status
nodeStatus := node["Status"].(string)
nodeName := node["Name"].(string)
nodeEligable := node["SchedulingEligibility"].(string)

n.l.Debug("Node status", "node", nodeName, "status", nodeStatus, "eligible", nodeEligable)
// get the driver status
drivers, ok := node["Drivers"].(map[string]interface{})
if !ok {
continue
}

var driversHealthy = true
var dockerDetected = false
for k, v := range drivers {
driver, ok := v.(map[string]interface{})
if !ok {
continue
}
if simple {
rq, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s:%d/v1/status/leader", n.address, n.port), nil)
if err != nil {
return err
}
resp, err := n.httpClient.Do(rq)
if err == nil && resp.StatusCode == http.StatusOK {
return nil
}
} else {
rq, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s:%d/v1/nodes", n.address, n.port), nil)
n.setAuthHeaders(rq)
if err != nil {
return err
}

healthy, ok := driver["Healthy"].(bool)
resp, err := n.httpClient.Do(rq)
if err == nil && resp.StatusCode == http.StatusOK {
nodes := []map[string]interface{}{}
// check number of nodes
json.NewDecoder(resp.Body).Decode(&nodes)

// loop nodes and check ready
readyCount := 0
for _, node := range nodes {
// get the node status
nodeStatus := node["Status"].(string)
nodeName := node["Name"].(string)
nodeEligable := node["SchedulingEligibility"].(string)

n.l.Debug("Node status", "node", nodeName, "status", nodeStatus, "eligible", nodeEligable)
// get the driver status
drivers, ok := node["Drivers"].(map[string]interface{})
if !ok {
continue
}

detected, ok := driver["Detected"].(bool)
if !ok || !detected {
continue
}
var driversHealthy = true
var dockerDetected = false
for k, v := range drivers {
driver, ok := v.(map[string]interface{})
if !ok {
continue
}

healthy, ok := driver["Healthy"].(bool)
if !ok {
continue
}

detected, ok := driver["Detected"].(bool)
if !ok || !detected {
continue
}

// we need to make a special case to check the docker driver is
// present as if the nomad server starts before docker then the
// presence of docker will not be detected
if k == "docker" {
dockerDetected = true
}

n.l.Debug("Driver status", "node", nodeName, "driver", k, "healthy", healthy)
if !healthy {
driversHealthy = false
}

// we need to make a special case to check the docker driver is
// present as if the nomad server starts before docker then the
// presence of docker will not be detected
if k == "docker" {
dockerDetected = true
}

n.l.Debug("Driver status", "node", nodeName, "driver", k, "healthy", healthy)
if !healthy {
driversHealthy = false
if nodeStatus == "ready" && nodeEligable == "eligible" && driversHealthy && dockerDetected {
readyCount++
}

}

if nodeStatus == "ready" && nodeEligable == "eligible" && driversHealthy && dockerDetected {
readyCount++
if readyCount == n.clientNodes {
n.l.Debug("Nomad check complete", "address", n.address)
return nil
}
}

if readyCount == n.clientNodes {
n.l.Debug("Nomad check complete", "address", n.address)
return nil
n.l.Debug("Nodes not ready", "ready", readyCount, "nodes", n.clientNodes)
}

n.l.Debug("Nodes not ready", "ready", readyCount, "nodes", n.clientNodes)
}

// backoff
Expand All @@ -171,6 +220,7 @@ func (n *NomadImpl) Create(files []string) error {
cr := fmt.Sprintf(`{"Job": %s}`, string(jsonJob))

r, err := http.NewRequest(http.MethodPost, addr, bytes.NewReader([]byte(cr)))
n.setAuthHeaders(r)
if err != nil {
return xerrors.Errorf("Unable to create http request: %w", err)
}
Expand Down Expand Up @@ -201,6 +251,7 @@ func (n *NomadImpl) Stop(files []string) error {

// stop the job
r, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s:%d/v1/job/%s", n.address, n.port, id), nil)
n.setAuthHeaders(r)
if err != nil {
return xerrors.Errorf("Unable to create http request: %w", err)
}
Expand Down Expand Up @@ -235,6 +286,7 @@ func (n *NomadImpl) ParseJob(file string) ([]byte, error) {

// validate the config with the Nomad API
r, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s:%d/v1/jobs/parse", n.address, n.port), bytes.NewReader(jobData))
n.setAuthHeaders(r)
if err != nil {
return nil, xerrors.Errorf("Unable to create http request: %w", err)
}
Expand Down Expand Up @@ -313,6 +365,7 @@ func (n *NomadImpl) Endpoints(job, group, task string) ([]map[string]string, err
}

r, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s:%d/v1/allocation/%s", n.address, n.port, j["ID"]), nil)
n.setAuthHeaders(r)
if err != nil {
return nil, xerrors.Errorf("Unable to create http request: %w", err)
}
Expand Down Expand Up @@ -393,6 +446,7 @@ func (n *NomadImpl) Endpoints(job, group, task string) ([]map[string]string, err
func (n *NomadImpl) getJobAllocations(job string) ([]map[string]interface{}, error) {
// get the allocations for the job
r, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s:%d/v1/job/%s/allocations", n.address, n.port, job), nil)
n.setAuthHeaders(r)
if err != nil {
return nil, xerrors.Errorf("Unable to create http request: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/clients/nomad/nomad_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func setupNomadTests(t *testing.T) (Nomad, string, *mocks.HTTP) {
)

c := NewNomad(mh, 1*time.Millisecond, logger.NewTestLogger(t))
c.SetConfig("local", 4646, 1)
c.SetConfig("local", 4646, 1, "")

return c, tmpDir, mh
}
Expand Down
35 changes: 27 additions & 8 deletions pkg/config/resources/nomad/provider_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func (p *ClusterProvider) Refresh() error {

wg.Wait()

p.nomadClient.SetConfig(fmt.Sprintf("http://%s", p.config.ExternalIP), p.config.APIPort, p.config.ClientNodes+1)
err := p.nomadClient.HealthCheckAPI(startTimeout)
p.nomadClient.SetConfig(fmt.Sprintf("http://%s", p.config.ExternalIP), p.config.APIPort, p.config.ClientNodes+1, p.config.ACLToken)
err := p.nomadClient.HealthCheckAPI(startTimeout, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -174,8 +174,8 @@ func (p *ClusterProvider) Refresh() error {
p.log.Debug("Successfully created client node", "ref", p.config.ID, "client", fqdn)
}

p.nomadClient.SetConfig(fmt.Sprintf("http://%s", p.config.ExternalIP), p.config.APIPort, p.config.ClientNodes+1)
err := p.nomadClient.HealthCheckAPI(startTimeout)
p.nomadClient.SetConfig(fmt.Sprintf("http://%s", p.config.ExternalIP), p.config.APIPort, p.config.ClientNodes+1, p.config.ACLToken)
err := p.nomadClient.HealthCheckAPI(startTimeout, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -391,8 +391,19 @@ func (p *ClusterProvider) createNomad() error {
}

// ensure all client nodes are up
p.nomadClient.SetConfig(fmt.Sprintf("http://%s", p.config.ExternalIP), p.config.APIPort, clientNodes)
err = p.nomadClient.HealthCheckAPI(startTimeout)
p.nomadClient.SetConfig(fmt.Sprintf("http://%s", p.config.ExternalIP), p.config.APIPort, clientNodes, p.config.ACLToken)
err = p.nomadClient.HealthCheckAPI(startTimeout, true)
if err != nil {
return err
}
if p.config.ACLEnabled {
acl_token, err := p.nomadClient.BootstrapACLs()
if err != nil {
return err
}
p.config.ACLToken = acl_token
}
err = p.nomadClient.HealthCheckAPI(startTimeout, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -423,7 +434,7 @@ func (p *ClusterProvider) createServerNode(img ctypes.Image, volumeID string, is
}

// generate the server config
sc := dataDir + "\n" + fmt.Sprintf(serverConfig, p.config.Datacenter, cpu)
sc := dataDir + "\n" + fmt.Sprintf(serverConfig, p.config.Datacenter, cpu, p.config.ACLEnabled)

// write the nomad config to a file
os.MkdirAll(p.config.ConfigDir, os.ModePerm)
Expand Down Expand Up @@ -540,7 +551,7 @@ func (p *ClusterProvider) createServerNode(img ctypes.Image, volumeID string, is
// returns the fqdn, docker id, and an error if unsuccessful
func (p *ClusterProvider) createClientNode(id string, image, volumeID, serverID string) (string, string, error) {
// generate the client config
sc := dataDir + "\n" + fmt.Sprintf(clientConfig, p.config.Datacenter, serverID)
sc := dataDir + "\n" + fmt.Sprintf(clientConfig, p.config.Datacenter, serverID, p.config.ACLEnabled)

// write the default config to a file
clientConfigPath := path.Join(p.config.ConfigDir, "client_config.hcl")
Expand Down Expand Up @@ -1002,6 +1013,10 @@ client {
%s
}

acl {
enabled = %t
}

plugin "raw_exec" {
config {
enabled = true
Expand All @@ -1020,6 +1035,10 @@ client {
}
}

acl {
enabled = %t
}

plugin "raw_exec" {
config {
enabled = true
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/resources/nomad/provider_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (p *JobProvider) Create() error {
nomadCluster := p.config.Cluster

// load the config
p.client.SetConfig(fmt.Sprintf("http://%s", nomadCluster.ExternalIP), nomadCluster.APIPort, nomadCluster.ClientNodes)
p.client.SetConfig(fmt.Sprintf("http://%s", nomadCluster.ExternalIP), nomadCluster.APIPort, nomadCluster.ClientNodes, nomadCluster.ACLToken)

err := p.client.Create(p.config.Paths)
if err != nil {
Expand Down Expand Up @@ -99,7 +99,7 @@ func (p *JobProvider) Destroy() error {
nomadCluster := p.config.Cluster

// load the config
p.client.SetConfig(fmt.Sprintf("http://%s", nomadCluster.ExternalIP), nomadCluster.APIPort, nomadCluster.ClientNodes)
p.client.SetConfig(fmt.Sprintf("http://%s", nomadCluster.ExternalIP), nomadCluster.APIPort, nomadCluster.ClientNodes, nomadCluster.ACLToken)

err := p.client.Stop(p.config.Paths)
if err != nil {
Expand Down
Loading
Loading