Skip to content

Commit

Permalink
feature. change apply rule logic to reload
Browse files Browse the repository at this point in the history
  • Loading branch information
ktkfree committed Apr 26, 2024
1 parent ca72921 commit 4dfcd23
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cmd/server/appgroup_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func processAppGroupStatus() error {
if len(appGroups) == 0 {
return nil
}
log.Info(context.TODO(), "appGroups : ", appGroups)
log.Info(context.TODO(), "[processAppGroupStatus] appGroups : ", appGroups)

for i := range appGroups {
appGroup := appGroups[i]
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/cloud_account_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func processCloudAccountStatus() error {
if len(cloudAccounts) == 0 {
return nil
}
log.Info(context.TODO(), "cloudAccounts : ", cloudAccounts)
log.Info(context.TODO(), "[processCloudAccountStatus] cloudAccounts : ", cloudAccounts)

for i := range cloudAccounts {
cloudaccount := cloudAccounts[i]
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/cluster_byoh.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func processClusterByoh() error {
if len(clusters) == 0 {
return nil
}
log.Info(context.TODO(), "byoh clusters : ", clusters)
log.Info(context.TODO(), "[processClusterByoh] byoh clusters : ", clusters)

token = getTksApiToken()
if token != "" {
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/cluster_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func processClusterStatus() error {
if len(clusters) == 0 {
return nil
}
log.Info(context.TODO(), "clusters : ", clusters)
log.Info(context.TODO(), "[processClusterStatus] clusters : ", clusters)

for i := range clusters {
cluster := clusters[i]
Expand Down
5 changes: 4 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ func main() {
if err != nil {
log.Error(context.TODO(), err)
}

err = processReloadThanosRules()
if err != nil {
log.Error(context.TODO(), err)
}
time.Sleep(time.Second * INTERVAL_SEC)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/server/organization_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func processOrganizationStatus() error {
if len(organizations) == 0 {
return nil
}
log.Info(context.TODO(), "organizations : ", organizations)
log.Info(context.TODO(), "[processOrganizationStatus] organizations : ", organizations)

for i := range organizations {
organization := organizations[i]
Expand Down
21 changes: 12 additions & 9 deletions cmd/server/system_notification_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func processSystemNotificationRule() error {
if len(rules) == 0 {
return nil
}
log.Info(context.TODO(), "incompleted rules : ", len(rules))
log.Info(context.TODO(), "[processSystemNotificationRule] incompleted rules : ", len(rules))

incompletedOrganizations := []string{}

Expand Down Expand Up @@ -219,14 +219,17 @@ func applyRules(organizationId string, primaryClusterId string, rc RulerConfig)
}

// restart thanos-ruler
deletePolicy := metav1.DeletePropagationForeground
err = clientset.CoreV1().Pods("lma").Delete(context.TODO(), "thanos-ruler-0", metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
})
if err != nil {
log.Error(context.TODO(), err)
return err
}
// thanos-ruler reload 방식으로 변경했으나, 혹시 몰라 일단 코드는 주석처리해둠
/*
deletePolicy := metav1.DeletePropagationForeground
err = clientset.CoreV1().Pods("lma").Delete(context.TODO(), "thanos-ruler-0", metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
})
if err != nil {
log.Error(context.TODO(), err)
return err
}
*/

// update status
err = systemNotificationRuleAccessor.UpdateSystemNotificationRuleStatus(organizationId, domain.SystemNotificationRuleStatus_APPLIED)
Expand Down
102 changes: 102 additions & 0 deletions cmd/server/thanos_ruler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package main

import (
"context"
"fmt"
"io"
"net/http"
"strconv"

"github.com/openinfradev/tks-api/pkg/kubernetes"
"github.com/openinfradev/tks-api/pkg/log"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const LAST_UPDATED_MIN = 1

func processReloadThanosRules() error {
organizationIds, err := systemNotificationRuleAccessor.GetRecentlyUpdatedOrganizations(LAST_UPDATED_MIN)
if err != nil {
return err
}
if len(organizationIds) == 0 {
return nil
}
log.Info(context.TODO(), "[processReloadThanosRules] new updated organizationIds : ", organizationIds)

for _, organizationId := range organizationIds {
organization, err := organizationAccessor.Get(organizationId)
if err != nil {
log.Error(context.TODO(), err)
continue
}

url, err := GetThanosRulerUrl(organization.PrimaryClusterId)
if err != nil {
log.Error(context.TODO(), err)
continue
}

if err = Reload(url); err != nil {
log.Error(context.TODO(), err)
continue
}
}

return nil
}

func GetThanosRulerUrl(primaryClusterId string) (url string, err error) {
clientset_admin, err := kubernetes.GetClientAdminCluster(context.TODO())
if err != nil {
return url, errors.Wrap(err, "Failed to get client set for user cluster")
}

secrets, err := clientset_admin.CoreV1().Secrets(primaryClusterId).Get(context.TODO(), "tks-endpoint-secret", metav1.GetOptions{})
if err != nil {
log.Info(context.TODO(), "cannot found tks-endpoint-secret. so use LoadBalancer...")

clientset_user, err := kubernetes.GetClientFromClusterId(context.TODO(), primaryClusterId)
if err != nil {
return url, errors.Wrap(err, "Failed to get client set for user cluster")
}

service, err := clientset_user.CoreV1().Services("lma").Get(context.TODO(), "thanos-ruler", metav1.GetOptions{})
if err != nil {
return url, errors.Wrap(err, "Failed to get services.")
}

// LoadBalaner 일경우, aws address 형태의 경우만 가정한다.
if service.Spec.Type != "LoadBalancer" {
return url, fmt.Errorf("Service type is not LoadBalancer. [%s] ", service.Spec.Type)
}

lbs := service.Status.LoadBalancer.Ingress
ports := service.Spec.Ports
if len(lbs) > 0 && len(ports) > 0 {
url = ports[0].TargetPort.StrVal + "://" + lbs[0].Hostname + ":" + strconv.Itoa(int(ports[0].Port))
}
} else {
url = "http://" + string(secrets.Data["thanos-ruler"])
}
return url, nil
}

func Reload(thanosRulerUrl string) (err error) {
reqUrl := thanosRulerUrl + "/-/reload"

log.Info(context.TODO(), "url : ", reqUrl)
resp, err := http.Post(reqUrl, "text/plain", nil)
if err != nil {
return err
}

defer resp.Body.Close()

_, err = io.ReadAll(resp.Body)
if err != nil {
return err
}
return nil
}
18 changes: 14 additions & 4 deletions internal/organization/organization.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (

// Organization represents a kubernetes organization information.
type Organization struct {
ID string `gorm:"primarykey"`
WorkflowId string
Status domain.OrganizationStatus
StatusDesc string
ID string `gorm:"primarykey"`
WorkflowId string
Status domain.OrganizationStatus
StatusDesc string
PrimaryClusterId string
}

// Accessor accesses organization info in DB.
Expand Down Expand Up @@ -49,6 +50,15 @@ func (x *OrganizationAccessor) GetIncompleteOrganizations() ([]Organization, err
return organizations, nil
}

func (x *OrganizationAccessor) Get(id string) (organization Organization, err error) {
res := x.db.Where("id = ?", id).First(&organization)
if res.Error != nil {
return organization, res.Error
}

return
}

func (x *OrganizationAccessor) UpdateOrganizationStatus(organizationId string, status domain.OrganizationStatus, statusDesc string, workflowId string) error {
log.Info(context.TODO(), fmt.Sprintf("UpdateOrganizationStatus. organizationId[%s], status[%d], statusDesc[%s], workflowId[%s]", organizationId, status, statusDesc, workflowId))
res := x.db.Model(Organization{}).
Expand Down
22 changes: 20 additions & 2 deletions internal/system-notification-rule/syste-notification-rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ func (x *SystemNotificationAccessor) GetIncompletedRules() ([]SystemNotification
Joins("join clusters on clusters.id = organizations.primary_cluster_id AND clusters.status = ?", domain.ClusterStatus_RUNNING).
Joins("join app_groups on app_groups.cluster_id = clusters.id AND app_groups.status = ?", domain.AppGroupStatus_RUNNING).
Where("system_notification_rules.status = ?", domain.SystemNotificationRuleStatus_PENDING).
//Where("system_notification_rules.is_system = false").
Order("system_notification_rules.organization_id").
Unscoped().
Find(&rules)

Expand All @@ -111,6 +109,26 @@ func (x *SystemNotificationAccessor) GetIncompletedRules() ([]SystemNotification
return rules, nil
}

func (x *SystemNotificationAccessor) GetRecentlyUpdatedOrganizations(lastUpdateMin int) ([]string, error) {
var organizationIds []string

res := x.db.Model(&SystemNotificationRule{}).
Select("system_notification_rules.organization_id").
Joins("join organizations on organizations.id = system_notification_rules.organization_id").
Joins("join clusters on clusters.id = organizations.primary_cluster_id AND clusters.status = ?", domain.ClusterStatus_RUNNING).
Joins("join app_groups on app_groups.cluster_id = clusters.id AND app_groups.status = ?", domain.AppGroupStatus_RUNNING).
Where("system_notification_rules.status = ?", domain.SystemNotificationRuleStatus_APPLIED).
Where(fmt.Sprintf("system_notification_rules.updated_at between now()-interval '%d minutes' and now() OR system_notification_rules.deleted_at between now()-interval '%d minutes' and now()", lastUpdateMin, lastUpdateMin)).
Group("system_notification_rules.organization_id").
Unscoped().
Find(&organizationIds)

if res.Error != nil {
return nil, res.Error
}
return organizationIds, nil
}

func (x *SystemNotificationAccessor) GetRules(organizationId string) ([]SystemNotificationRule, error) {
var rules []SystemNotificationRule

Expand Down

0 comments on commit 4dfcd23

Please sign in to comment.