Skip to content

Commit

Permalink
feature. add cluster_byoh batch job.
Browse files Browse the repository at this point in the history
  • Loading branch information
ktkfree committed Oct 10, 2023
1 parent f624ab0 commit 35dd33a
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 12 deletions.
107 changes: 107 additions & 0 deletions cmd/server/cluster_byoh.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package main

import (
"bytes"
"encoding/json"
"fmt"

_apiClient "github.com/openinfradev/tks-api/pkg/api-client"

Check failure on line 8 in cmd/server/cluster_byoh.go

View workflow job for this annotation

GitHub Actions / unittest

missing go.sum entry for module providing package github.com/openinfradev/tks-api/pkg/api-client (imported by github.com/openinfradev/tks-batch/cmd/server); to add:

Check failure on line 8 in cmd/server/cluster_byoh.go

View workflow job for this annotation

GitHub Actions / unittest

missing go.sum entry for module providing package github.com/openinfradev/tks-api/pkg/api-client (imported by github.com/openinfradev/tks-batch/cmd/server); to add:
"github.com/openinfradev/tks-api/pkg/domain"
"github.com/openinfradev/tks-api/pkg/log"
"github.com/spf13/viper"
)

var token string

func processClusterByoh() error {
// get clusters
clusters, err := clusterAccessor.GetBootstrappedByohClusters()
if err != nil {
return err
}
if len(clusters) == 0 {
return nil
}
log.Info("byoh clusters : ", clusters)

token = getTksApiToken()
for _, cluster := range clusters {
clusterId := cluster.ID

// check agent node
apiClient, err := _apiClient.New(fmt.Sprintf("%s:%d", viper.GetString("tks-api-address"), viper.GetInt("tks-api-port")), token)
if err != nil {
log.Error(err)
continue
}

url := fmt.Sprintf("organizations/%s/stacks/%s/nodes", cluster.OrganizationId, clusterId)
body, err := apiClient.Get(url)
if err != nil {
return err
}

var out domain.GetStackNodesResponse
transcode(body, &out)

completed := true
for _, node := range out.Nodes {
log.Info(node.Status)
if node.Status != "Completed" {
completed = false
}
}
if completed {
log.Debug(fmt.Sprintf("all agent registered! starting stack creation. clusterId %s", clusterId))
err := clusterAccessor.UpdateClusterStatus(clusterId, domain.ClusterStatus_PENDING)
if err != nil {
log.Error("Failed to update cluster status err : ", err)
continue
}
}
}
return nil
}

func transcode(in, out interface{}) {
buf := new(bytes.Buffer)
err := json.NewEncoder(buf).Encode(in)
if err != nil {
fmt.Println(err)
}
err = json.NewDecoder(buf).Decode(out)
if err != nil {
fmt.Println(err)
}
}

func getTksApiToken() string {
apiClient, err := _apiClient.New(fmt.Sprintf("%s:%d", viper.GetString("tks-api-address"), viper.GetInt("tks-api-port")), "")
if err != nil {
log.Error(err)
return ""
}

_, err = apiClient.Post("auth/ping", domain.PingTokenRequest{
Token: token,
OrganizationId: "master",
})
if err != nil {
body, err := apiClient.Post("auth/login", domain.LoginRequest{
AccountId: viper.GetString("tks-api-account"),
Password: viper.GetString("tks-api-password"),
OrganizationId: "master",
})
if err != nil {
return ""
}

var out domain.LoginResponse
transcode(body, &out)

log.Info(out.User.Token)
token = out.User.Token
}

return token
}
13 changes: 12 additions & 1 deletion cmd/server/cluster_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ func processClusterStatus() error {
case "Error":
newStatus = domain.ClusterStatus_DELETE_ERROR
}
} else if status == domain.ClusterStatus_BOOTSTRAPPING {
switch workflow.Status.Phase {
case "Running":
newStatus = domain.ClusterStatus_BOOTSTRAPPING
case "Succeeded":
newStatus = domain.ClusterStatus_BOOTSTRAPPED
case "Failed":
newStatus = domain.ClusterStatus_BOOTSTRAP_ERROR
case "Error":
newStatus = domain.ClusterStatus_BOOTSTRAP_ERROR
}
}
if newStatus == domain.ClusterStatus_PENDING {
continue
Expand All @@ -72,7 +83,7 @@ func processClusterStatus() error {

if status != newStatus || statusDesc != newMessage {
log.Debug(fmt.Sprintf("update status!! clusterId [%s], newStatus [%s], newMessage [%s]", clusterId, newStatus, newMessage))
err := clusterAccessor.UpdateClusterStatus(clusterId, newStatus, newMessage, workflowId)
err := clusterAccessor.UpdateClusterStatusWithWorkflow(clusterId, newStatus, newMessage, workflowId)
if err != nil {
log.Error("Failed to update cluster status err : ", err)
continue
Expand Down
8 changes: 8 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func init() {
flag.Int("port", 9112, "service port")
flag.String("argo-address", "localhost", "server address for argo-workflow-server")
flag.Int("argo-port", 2746, "server port for argo-workflow-server")
flag.String("tks-api-address", "http://localhost", "server address for tks-api")
flag.Int("tks-api-port", 8080, "server port number for tks-api")
flag.String("tks-api-account", "", "account name for tks-api")
flag.String("tks-api-password", "", "the password for tks-api account")

flag.String("dbhost", "localhost", "host of postgreSQL")
flag.String("dbport", "5432", "port of postgreSQL")
Expand Down Expand Up @@ -86,6 +90,10 @@ func main() {
if err != nil {
log.Error(err)
}
err = processClusterByoh()
if err != nil {
log.Error(err)
}

time.Sleep(time.Second * INTERVAL_SEC)
}
Expand Down
Binary file added cmd/server/server
Binary file not shown.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/openinfradev/tks-batch
go 1.18

require (
github.com/openinfradev/tks-api v0.0.0-20230621070855-6fc105a25e72
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
gorm.io/driver/postgres v1.4.5
Expand All @@ -28,8 +29,8 @@ require (
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/openinfradev/tks-api v0.0.0-20230621070855-6fc105a25e72 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,6 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/openinfradev/tks-api v0.0.0-20230524092045-07d78d318640 h1:g+vESHF0vnB5ObeTbnb/U3Dwx7kDpAuRc2ftxu/o3x4=
github.com/openinfradev/tks-api v0.0.0-20230524092045-07d78d318640/go.mod h1:FfhP5GE5TpRaHGUgTlMyxSjRsr6szNy+KhzXWBkMQ4g=
github.com/openinfradev/tks-api v0.0.0-20230621070855-6fc105a25e72 h1:07S05WaRiIXS+i4Avbavr/vAr+97IKC62/J5YcXDH+c=
github.com/openinfradev/tks-api v0.0.0-20230621070855-6fc105a25e72/go.mod h1:FfhP5GE5TpRaHGUgTlMyxSjRsr6szNy+KhzXWBkMQ4g=
github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU=
github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
39 changes: 33 additions & 6 deletions internal/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import (

// Cluster represents a kubernetes cluster information.
type Cluster struct {
ID string `gorm:"primarykey"`
WorkflowId string
Status domain.ClusterStatus
StatusDesc string
ID string `gorm:"primarykey"`
OrganizationId string
WorkflowId string
Status domain.ClusterStatus
StatusDesc string
}

// Accessor accesses cluster info in DB.
Expand All @@ -38,7 +39,7 @@ func (x *ClusterAccessor) GetIncompleteClusters() ([]Cluster, error) {
var clusters []Cluster

res := x.db.
Where("status IN ?", []domain.ClusterStatus{domain.ClusterStatus_INSTALLING, domain.ClusterStatus_DELETING}).
Where("status IN ?", []domain.ClusterStatus{domain.ClusterStatus_BOOTSTRAPPING, domain.ClusterStatus_INSTALLING, domain.ClusterStatus_DELETING}).
Find(&clusters)

if res.Error != nil {
Expand All @@ -48,7 +49,21 @@ func (x *ClusterAccessor) GetIncompleteClusters() ([]Cluster, error) {
return clusters, nil
}

func (x *ClusterAccessor) UpdateClusterStatus(clusterId string, status domain.ClusterStatus, statusDesc string, workflowId string) error {
func (x *ClusterAccessor) GetBootstrappedByohClusters() ([]Cluster, error) {
var clusters []Cluster

res := x.db.
Where("cloud_service = 'BYOH' AND status IN ?", []domain.ClusterStatus{domain.ClusterStatus_BOOTSTRAPPED}).
Find(&clusters)

if res.Error != nil {
return nil, res.Error
}

return clusters, nil
}

func (x *ClusterAccessor) UpdateClusterStatusWithWorkflow(clusterId string, status domain.ClusterStatus, statusDesc string, workflowId string) error {
log.Info(fmt.Sprintf("UpdateClusterStatus. clusterId[%s], status[%d], statusDesc[%s], workflowId[%s]", clusterId, status, statusDesc, workflowId))
res := x.db.Model(Cluster{}).
Where("ID = ?", clusterId).
Expand All @@ -59,3 +74,15 @@ func (x *ClusterAccessor) UpdateClusterStatus(clusterId string, status domain.Cl
}
return nil
}

func (x *ClusterAccessor) UpdateClusterStatus(clusterId string, status domain.ClusterStatus) error {
log.Info(fmt.Sprintf("UpdateClusterStatus. clusterId[%s], status[%d]", clusterId, status))
res := x.db.Model(Cluster{}).
Where("ID = ?", clusterId).
Updates(map[string]interface{}{"Status": status})

if res.Error != nil || res.RowsAffected == 0 {
return fmt.Errorf("nothing updated in cluster with id %s", clusterId)
}
return nil
}

0 comments on commit 35dd33a

Please sign in to comment.