Skip to content

Commit

Permalink
feature. change apply rule logic to reload
Browse files Browse the repository at this point in the history
  • Loading branch information
ktkfree committed Apr 25, 2024
1 parent ca72921 commit e1912de
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cmd/server/appgroup_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func processAppGroupStatus() error {
if len(appGroups) == 0 {
return nil
}
log.Info(context.TODO(), "appGroups : ", appGroups)
log.Info(context.TODO(), "[processAppGroupStatus] appGroups : ", appGroups)

for i := range appGroups {
appGroup := appGroups[i]
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/cloud_account_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func processCloudAccountStatus() error {
if len(cloudAccounts) == 0 {
return nil
}
log.Info(context.TODO(), "cloudAccounts : ", cloudAccounts)
log.Info(context.TODO(), "[processCloudAccountStatus] cloudAccounts : ", cloudAccounts)

for i := range cloudAccounts {
cloudaccount := cloudAccounts[i]
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/cluster_byoh.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func processClusterByoh() error {
if len(clusters) == 0 {
return nil
}
log.Info(context.TODO(), "byoh clusters : ", clusters)
log.Info(context.TODO(), "[processClusterByoh] byoh clusters : ", clusters)

token = getTksApiToken()
if token != "" {
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/cluster_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func processClusterStatus() error {
if len(clusters) == 0 {
return nil
}
log.Info(context.TODO(), "clusters : ", clusters)
log.Info(context.TODO(), "[processClusterStatus] clusters : ", clusters)

for i := range clusters {
cluster := clusters[i]
Expand Down
5 changes: 4 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ func main() {
if err != nil {
log.Error(context.TODO(), err)
}

err = processReloadThanosRules()
if err != nil {
log.Error(context.TODO(), err)
}
time.Sleep(time.Second * INTERVAL_SEC)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/server/organization_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func processOrganizationStatus() error {
if len(organizations) == 0 {
return nil
}
log.Info(context.TODO(), "organizations : ", organizations)
log.Info(context.TODO(), "[processOrganizationStatus] organizations : ", organizations)

for i := range organizations {
organization := organizations[i]
Expand Down
21 changes: 12 additions & 9 deletions cmd/server/system_notification_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func processSystemNotificationRule() error {
if len(rules) == 0 {
return nil
}
log.Info(context.TODO(), "incompleted rules : ", len(rules))
log.Info(context.TODO(), "[processSystemNotificationRule] incompleted rules : ", len(rules))

incompletedOrganizations := []string{}

Expand Down Expand Up @@ -219,14 +219,17 @@ func applyRules(organizationId string, primaryClusterId string, rc RulerConfig)
}

// restart thanos-ruler
deletePolicy := metav1.DeletePropagationForeground
err = clientset.CoreV1().Pods("lma").Delete(context.TODO(), "thanos-ruler-0", metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
})
if err != nil {
log.Error(context.TODO(), err)
return err
}
// thanos-ruler reload 방식으로 변경했으나, 혹시 몰라 일단 코드는 주석처리해둠
/*
deletePolicy := metav1.DeletePropagationForeground
err = clientset.CoreV1().Pods("lma").Delete(context.TODO(), "thanos-ruler-0", metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
})
if err != nil {
log.Error(context.TODO(), err)
return err
}
*/

// update status
err = systemNotificationRuleAccessor.UpdateSystemNotificationRuleStatus(organizationId, domain.SystemNotificationRuleStatus_APPLIED)
Expand Down
102 changes: 102 additions & 0 deletions cmd/server/thanos_ruler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package main

import (
"context"
"fmt"
"io"
"net/http"
"strconv"

"github.com/openinfradev/tks-api/pkg/kubernetes"
"github.com/openinfradev/tks-api/pkg/log"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const LAST_UPDATED_MIN = 1

func processReloadThanosRules() error {
organizationIds, err := systemNotificationRuleAccessor.GetRecentlyUpdatedOrganizations(LAST_UPDATED_MIN)

Check failure on line 19 in cmd/server/thanos_ruler.go

View workflow job for this annotation

GitHub Actions / unittest

systemNotificationRuleAccessor.GetRecentlyUpdatedOrganizations undefined (type *systemNotification.SystemNotificationAccessor has no field or method GetRecentlyUpdatedOrganizations)
if err != nil {
return err
}
if len(organizationIds) == 0 {
return nil
}
log.Info(context.TODO(), "[processReloadThanosRules] new updated organizationIds : ", organizationIds)

for _, organizationId := range organizationIds {
organization, err := organizationAccessor.Get(organizationId)

Check failure on line 29 in cmd/server/thanos_ruler.go

View workflow job for this annotation

GitHub Actions / unittest

organizationAccessor.Get undefined (type *organization.OrganizationAccessor has no field or method Get)
if err != nil {
log.Error(context.TODO(), err)
continue
}

url, err := GetThanosRulerUrl(organization.PrimaryClusterId)
if err != nil {
log.Error(context.TODO(), err)
continue
}

if err = Reload(url); err != nil {
log.Error(context.TODO(), err)
continue
}
}

return nil
}

func GetThanosRulerUrl(primaryClusterId string) (url string, err error) {
clientset_admin, err := kubernetes.GetClientAdminCluster(context.TODO())
if err != nil {
return url, errors.Wrap(err, "Failed to get client set for user cluster")
}

secrets, err := clientset_admin.CoreV1().Secrets(primaryClusterId).Get(context.TODO(), "tks-endpoint-secret", metav1.GetOptions{})
if err != nil {
log.Info(context.TODO(), "cannot found tks-endpoint-secret. so use LoadBalancer...")

clientset_user, err := kubernetes.GetClientFromClusterId(context.TODO(), primaryClusterId)
if err != nil {
return url, errors.Wrap(err, "Failed to get client set for user cluster")
}

service, err := clientset_user.CoreV1().Services("lma").Get(context.TODO(), "thanos-ruler", metav1.GetOptions{})
if err != nil {
return url, errors.Wrap(err, "Failed to get services.")
}

// LoadBalaner 일경우, aws address 형태의 경우만 가정한다.
if service.Spec.Type != "LoadBalancer" {
return url, fmt.Errorf("Service type is not LoadBalancer. [%s] ", service.Spec.Type)
}

lbs := service.Status.LoadBalancer.Ingress
ports := service.Spec.Ports
if len(lbs) > 0 && len(ports) > 0 {
url = ports[0].TargetPort.StrVal + "://" + lbs[0].Hostname + ":" + strconv.Itoa(int(ports[0].Port))
}
} else {
url = "http://" + string(secrets.Data["thanos"])
}
return url, nil
}

func Reload(thanosRulerUrl string) (err error) {
reqUrl := thanosRulerUrl + "/-/reload"

log.Info(context.TODO(), "url : ", reqUrl)
resp, err := http.Post(reqUrl, "text/plain", nil)
if err != nil {
return err
}

defer resp.Body.Close()

_, err = io.ReadAll(resp.Body)
if err != nil {
return err
}
return nil
}

0 comments on commit e1912de

Please sign in to comment.