
Merge pull request #491 from signal18/dynamic_config
Dynamic config
svaroqui authored May 12, 2023
2 parents 72b25fe + f60fca0 commit 40d1c0e
Showing 56 changed files with 1,777 additions and 551 deletions.
570 changes: 510 additions & 60 deletions cluster/cluster.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions cluster/cluster_acl.go
@@ -598,6 +598,11 @@ func (cluster *Cluster) IsURLPassACL(strUser string, URL string) bool {
             return true
         }
     }
+    if cluster.APIUsers[strUser].Grants[config.GrantClusterDelete] {
+        if strings.Contains(URL, "/api/clusters/actions/delete") {
+            return true
+        }
+    }
     /* case cluster.APIUsers[strUser].Grants[config.GrantClusterGrant] == true:
     return false
     case cluster.APIUsers[strUser].Grants[config.GrantClusterDropMonitor] == true:
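
The added hunk extends IsURLPassACL with one more grant-to-route mapping: a user holding GrantClusterDelete may reach the cluster delete endpoint. A minimal self-contained sketch of this grant-to-URL pattern in Go; the map contents and function names below are illustrative, not the replication-manager API:

package main

import (
    "fmt"
    "strings"
)

// grantRoutes maps a grant to the URL fragment it unlocks, a hypothetical
// reduction of the GrantClusterDelete check added above.
var grantRoutes = map[string]string{
    "cluster-delete": "/api/clusters/actions/delete",
    "cluster-create": "/api/clusters/actions/add",
}

// isURLPassACL returns true when one of the user's grants matches the URL.
func isURLPassACL(grants map[string]bool, url string) bool {
    for grant, route := range grantRoutes {
        if grants[grant] && strings.Contains(url, route) {
            return true
        }
    }
    return false
}

func main() {
    user := map[string]bool{"cluster-delete": true}
    fmt.Println(isURLPassACL(user, "/api/clusters/actions/delete/mycluster")) // true
    fmt.Println(isURLPassACL(user, "/api/clusters/actions/add/mycluster"))    // false
}
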
13 changes: 8 additions & 5 deletions cluster/cluster_add.go
@@ -7,6 +7,7 @@
 package cluster

 import (
+    "fmt"
     "strings"
     "sync"

@@ -16,18 +17,19 @@ import (
 )

 func (cluster *Cluster) AddSeededServer(srv string) error {
+    fmt.Printf("ADD SEEDED SERVER\n")
     if cluster.Conf.Hosts != "" {
         cluster.Conf.Hosts = cluster.Conf.Hosts + "," + srv
     } else {
         cluster.Conf.Hosts = srv
     }
-    cluster.sme.SetFailoverState()
+    cluster.StateMachine.SetFailoverState()
     cluster.newServerList()
     wg := new(sync.WaitGroup)
     wg.Add(1)
     go cluster.TopologyDiscover(wg)
     wg.Wait()
-    cluster.sme.RemoveFailoverState()
+    cluster.StateMachine.RemoveFailoverState()
     return nil
 }

@@ -94,7 +96,7 @@ func (cluster *Cluster) AddSeededProxy(prx string, srv string, port string, user
         }
     case config.ConstProxySqlproxy:
         cluster.Conf.ProxysqlOn = true
-        cluster.Conf.ProxysqlAdminPort = port
+        cluster.Conf.ProxysqlPort = port
         if user != "" || password != "" {
             cluster.Conf.ProxysqlUser = user
             cluster.Conf.ProxysqlPassword = password
@@ -116,11 +118,12 @@ func (cluster *Cluster) AddSeededProxy(prx string, srv string, port string, user
             cluster.Conf.MdbsProxyHosts = srv + ":" + port
         }
     }
-    cluster.sme.SetFailoverState()
+    cluster.SetClusterProxySqlCredentialsFromConfig()
+    cluster.StateMachine.SetFailoverState()
     cluster.Lock()
     cluster.newProxyList()
     cluster.Unlock()
-    cluster.sme.RemoveFailoverState()
+    cluster.StateMachine.RemoveFailoverState()
     return nil
 }

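
Both AddSeededServer and AddSeededProxy bracket the topology rebuild with SetFailoverState and RemoveFailoverState so that checks such as CheckFailed (see cluster_chk.go below) skip failover decisions while the server or proxy list is half-built. A toy sketch of that guard pattern; every type here is invented for illustration:

package main

import (
    "fmt"
    "strings"
    "sync"
)

// StateMachine is a toy stand-in for the cluster state machine: while
// inFailover is set, periodic checks are expected to bail out early.
type StateMachine struct {
    mu         sync.Mutex
    inFailover bool
}

func (s *StateMachine) SetFailoverState()    { s.mu.Lock(); s.inFailover = true; s.mu.Unlock() }
func (s *StateMachine) RemoveFailoverState() { s.mu.Lock(); s.inFailover = false; s.mu.Unlock() }
func (s *StateMachine) IsInFailover() bool   { s.mu.Lock(); defer s.mu.Unlock(); return s.inFailover }

type Cluster struct {
    Hosts string // comma-separated host list, as in cluster.Conf.Hosts
    SM    StateMachine
}

// AddSeededServer mirrors the flow above: extend the host list, freeze
// failover decisions, rebuild, then unfreeze.
func (c *Cluster) AddSeededServer(srv string) {
    if c.Hosts != "" {
        c.Hosts = c.Hosts + "," + srv
    } else {
        c.Hosts = srv
    }
    c.SM.SetFailoverState()
    fmt.Println("rebuilding server list:", strings.Split(c.Hosts, ","))
    c.SM.RemoveFailoverState()
}

func main() {
    c := &Cluster{Hosts: "db1:3306"}
    c.AddSeededServer("db2:3306")
    fmt.Println("in failover:", c.SM.IsInFailover()) // false again after the rebuild
}
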
6 changes: 3 additions & 3 deletions cluster/cluster_bck.go
@@ -64,7 +64,7 @@ func (cluster *Cluster) ResticPurgeRepo() error {

     err := resticcmd.Wait()
     if err != nil {
-        cluster.sme.AddState("WARN0094", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0094"], err, string(stdoutBuf.Bytes()), string(stderrBuf.Bytes())), ErrFrom: "CHECK"})
+        cluster.StateMachine.AddState("WARN0094", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0094"], err, string(stdoutBuf.Bytes()), string(stderrBuf.Bytes())), ErrFrom: "CHECK"})
         return err
     }
     if errStdout != nil || errStderr != nil {
@@ -122,7 +122,7 @@ func (cluster *Cluster) ResticInitRepo() error {

     err := resticcmd.Wait()
     if err != nil {
-        cluster.sme.AddState("WARN0095", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0095"], err, string(stdoutBuf.Bytes()), string(stderrBuf.Bytes())), ErrFrom: "CHECK"})
+        cluster.StateMachine.AddState("WARN0095", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0095"], err, string(stdoutBuf.Bytes()), string(stderrBuf.Bytes())), ErrFrom: "CHECK"})
     }
     if errStdout != nil || errStderr != nil {
         return errors.New("failed to capture stdout or stderr\n")
@@ -159,7 +159,7 @@ func (cluster *Cluster) ResticFetchRepo() error {

     err := resticcmd.Wait()
     if err != nil {
-        cluster.sme.AddState("WARN0093", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0093"], err, string(stdoutBuf.Bytes()), string(stderrBuf.Bytes())), ErrFrom: "CHECK"})
+        cluster.StateMachine.AddState("WARN0093", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0093"], err, string(stdoutBuf.Bytes()), string(stderrBuf.Bytes())), ErrFrom: "CHECK"})
         cluster.ResticInitRepo()
         return err
     }
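
All three Restic helpers share one shape: wait for the restic child process and, on failure, record a WARNING state that carries the captured stdout and stderr instead of aborting the monitoring loop. A simplified standard-library sketch of that shape; the printed warning stands in for StateMachine.AddState, and cmd.Run replaces the diff's Wait plus copy goroutines:

package main

import (
    "bytes"
    "fmt"
    "os/exec"
)

// runAndWarn runs a command, captures both output streams, and downgrades a
// failure to a warning, the way ResticPurgeRepo/ResticInitRepo/ResticFetchRepo do.
func runAndWarn(name string, args ...string) error {
    var stdoutBuf, stderrBuf bytes.Buffer
    cmd := exec.Command(name, args...)
    cmd.Stdout = &stdoutBuf
    cmd.Stderr = &stderrBuf
    if err := cmd.Run(); err != nil {
        // Stand-in for cluster.StateMachine.AddState("WARN009x", ...).
        fmt.Printf("WARNING: %s failed: %v, stdout=%q, stderr=%q\n",
            name, err, stdoutBuf.String(), stderrBuf.String())
        return err
    }
    return nil
}

func main() {
    // Any restic subcommand fits here; a missing binary simply yields the warning.
    _ = runAndWarn("restic", "snapshots")
}
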
60 changes: 44 additions & 16 deletions cluster/cluster_chk.go
@@ -13,6 +13,7 @@ import (
     "fmt"
     "io/ioutil"
     "net/http"
+    "os"
     "os/exec"
     "strconv"
     "strings"
@@ -26,8 +27,8 @@

 func (cluster *Cluster) CheckFailed() {
     // Don't trigger a failover if a switchover is happening
-    if cluster.sme.IsInFailover() {
-        cluster.sme.AddState("ERR00001", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["ERR00001"]), ErrFrom: "CHECK"})
+    if cluster.StateMachine.IsInFailover() {
+        cluster.StateMachine.AddState("ERR00001", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["ERR00001"]), ErrFrom: "CHECK"})
         return
     }
     if cluster.master == nil {
@@ -115,7 +116,7 @@ func (cluster *Cluster) isAutomaticFailover() bool {
     if cluster.Conf.Interactive == false {
         return true
     }
-    cluster.sme.AddState("ERR00002", state.State{ErrType: "ERR00002", ErrDesc: fmt.Sprintf(clusterError["ERR00002"]), ErrFrom: "CHECK"})
+    cluster.StateMachine.AddState("ERR00002", state.State{ErrType: "ERR00002", ErrDesc: fmt.Sprintf(clusterError["ERR00002"]), ErrFrom: "CHECK"})
     return false
 }

@@ -136,10 +137,10 @@ func (cluster *Cluster) isMaxMasterFailedCountReached() bool {
     // no illimited failed count

     if cluster.GetMaster().FailCount >= cluster.Conf.MaxFail {
-        cluster.sme.AddState("WARN0023", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0023"]), ErrFrom: "CHECK"})
+        cluster.StateMachine.AddState("WARN0023", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0023"]), ErrFrom: "CHECK"})
         return true
     } else {
-        // cluster.sme.AddState("ERR00023", state.State{ErrType: "ERROR", ErrDesc: fmt.Sprintf("Constraint is blocking state %s, interactive:%t, maxfail reached:%d", cluster.master.State, cluster.Conf.Interactive, cluster.Conf.MaxFail), ErrFrom: "CONF"})
+        // cluster.StateMachine.AddState("ERR00023", state.State{ErrType: "ERROR", ErrDesc: fmt.Sprintf("Constraint is blocking state %s, interactive:%t, maxfail reached:%d", cluster.master.State, cluster.Conf.Interactive, cluster.Conf.MaxFail), ErrFrom: "CONF"})
     }
     return false
 }
@@ -151,7 +152,7 @@ func (cluster *Cluster) isMaxClusterFailoverCountNotReached() bool {
         return true
     }
     if cluster.FailoverCtr == cluster.Conf.FailLimit {
-        cluster.sme.AddState("ERR00027", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00027"]), ErrFrom: "CHECK"})
+        cluster.StateMachine.AddState("ERR00027", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00027"]), ErrFrom: "CHECK"})
         return false
     }
     return true
@@ -165,7 +166,7 @@ func (cluster *Cluster) isBetweenFailoverTimeValid() bool {
     }
     // cluster.LogPrintf("CHECK: Failover Time to short with previous failover")
     if rem > 0 {
-        cluster.sme.AddState("ERR00029", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00029"]), ErrFrom: "CHECK"})
+        cluster.StateMachine.AddState("ERR00029", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00029"]), ErrFrom: "CHECK"})
         return false
     }
     return true
@@ -194,7 +195,7 @@ func (cluster *Cluster) isOneSlaveHeartbeatIncreasing() bool {
             cluster.LogPrintf(LvlDbg, "SLAVE_RECEIVED_HEARTBEATS %d", status2["SLAVE_RECEIVED_HEARTBEATS"])
         }
         if status2["SLAVE_RECEIVED_HEARTBEATS"] > saveheartbeats {
-            cluster.sme.AddState("ERR00028", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00028"], s.URL), ErrFrom: "CHECK"})
+            cluster.StateMachine.AddState("ERR00028", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00028"], s.URL), ErrFrom: "CHECK"})
             return true
         }
     }
@@ -254,7 +255,7 @@ func (cluster *Cluster) isMaxscaleSupectRunning() bool {

     time.Sleep(time.Duration(cluster.Conf.CheckFalsePositiveMaxscaleTimeout) * time.Second)
     if strings.Contains(cluster.master.MxsServerStatus, "Running") {
-        cluster.sme.AddState("ERR00030", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00030"], cluster.master.MxsServerStatus), ErrFrom: "CHECK"})
+        cluster.StateMachine.AddState("ERR00030", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00030"], cluster.master.MxsServerStatus), ErrFrom: "CHECK"})
         return true
     }
     return false
@@ -272,7 +273,7 @@ func (cluster *Cluster) isFoundCandidateMaster() bool {
     }
     if key == -1 {
         // No candidates found in slaves list
-        cluster.sme.AddState("ERR00032", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00032"]), ErrFrom: "CHECK"})
+        cluster.StateMachine.AddState("ERR00032", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00032"]), ErrFrom: "CHECK"})
         return false
     }
     return true
@@ -299,7 +300,7 @@ func (cluster *Cluster) isActiveArbitration() bool {
     resp, err := client.Do(req)
     if err != nil {
         cluster.LogPrintf(LvlErr, "%s", err.Error())
-        cluster.sme.AddState("ERR00022", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00022"]), ErrFrom: "CHECK"})
+        cluster.StateMachine.AddState("ERR00022", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00022"]), ErrFrom: "CHECK"})
         return false
     }
     defer resp.Body.Close()
@@ -313,14 +314,14 @@
     err = json.Unmarshal(body, &r)
     if err != nil {
         cluster.LogPrintf(LvlErr, "Arbitrator sent invalid JSON")
-        cluster.sme.AddState("ERR00022", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00022"]), ErrFrom: "CHECK"})
+        cluster.StateMachine.AddState("ERR00022", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00022"]), ErrFrom: "CHECK"})
         return false
     }
     if r.Arbitration == "winner" {
         cluster.LogPrintf(LvlInfo, "Arbitrator says: winner")
         return true
     }
-    cluster.sme.AddState("ERR00022", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00022"]), ErrFrom: "CHECK"})
+    cluster.StateMachine.AddState("ERR00022", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00022"]), ErrFrom: "CHECK"})
     return false
 }

@@ -338,7 +339,7 @@ func (cluster *Cluster) isExternalOk() bool {
         return false
     }
     if req.StatusCode == 200 {
-        cluster.sme.AddState("ERR00031", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00031"]), ErrFrom: "CHECK"})
+        cluster.StateMachine.AddState("ERR00031", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00031"]), ErrFrom: "CHECK"})
         return true
     }
     return false
@@ -349,7 +350,7 @@ func (cluster *Cluster) isArbitratorAlive() bool {
         return true
     }
     if cluster.IsFailedArbitrator {
-        cluster.sme.AddState("ERR00055", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00055"], cluster.Conf.ArbitrationSasHosts), ErrFrom: "CHECK"})
+        cluster.StateMachine.AddState("ERR00055", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00055"], cluster.Conf.ArbitrationSasHosts), ErrFrom: "CHECK"})
         return false
     }
     return true
@@ -364,7 +365,7 @@ func (cluster *Cluster) isNotFirstSlave() bool {
     // - first replication-manager start on no topology
     // - all cluster down
     if cluster.master == nil {
-        cluster.sme.AddState("ERR00026", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00026"]), ErrFrom: "CHECK"})
+        cluster.StateMachine.AddState("ERR00026", state.State{ErrType: LvlErr, ErrDesc: fmt.Sprintf(clusterError["ERR00026"]), ErrFrom: "CHECK"})
         return false
     }

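Most hunks in this file are a mechanical rename of the unexported cluster.sme field to the exported cluster.StateMachine. In Go the leading capital is what makes the field reachable from other packages, presumably what the dynamic-config work needs; a tiny illustration with reduced, invented types:

package main

import "fmt"

// stateMachine is a toy version of the project's state collector.
type stateMachine struct{ states []string }

func (s *stateMachine) AddState(key string) { s.states = append(s.states, key) }

// Cluster shows the before and after of the rename: only the capitalized
// field is reachable from code outside this package.
type Cluster struct {
    sme          *stateMachine // old, unexported field
    StateMachine *stateMachine // new, exported field
}

func main() {
    c := Cluster{StateMachine: &stateMachine{}}
    c.StateMachine.AddState("WARN0102")
    fmt.Println(c.StateMachine.states)
}
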
@@ -746,3 +747,30 @@ func (cluster *Cluster) CheckCanSaveDynamicConfig() {
         cluster.SetState("ERR00090", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["ERR00090"]), ErrFrom: "CLUSTER"})
     }
 }
+
+func (cluster *Cluster) CheckIsOverwrite() {
+    cluster.LogPrintf(LvlDbg, "Check overwrite conf path : %s\n", cluster.Conf.WorkingDir+"/"+cluster.Name)
+    if _, err := os.Stat(cluster.Conf.WorkingDir + "/" + cluster.Name + "/overwrite.toml"); !os.IsNotExist(err) {
+        input, err := ioutil.ReadFile(cluster.Conf.WorkingDir + "/" + cluster.Name + "/overwrite.toml")
+        if err != nil {
+            cluster.LogPrintf(LvlErr, "Cannot read config file %s : %s", cluster.Conf.WorkingDir+"/"+cluster.Name+"/overwrite.toml", err)
+            return
+        }
+
+        lines := strings.Split(string(input), "\n")
+        for i, line := range lines {
+            if i == 1 {
+                line = strings.ReplaceAll(line, " ", "")
+                if line != "" {
+                    cluster.SetState("WARN0102", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0102"]), ErrFrom: "CLUSTER"})
+                    cluster.LogPrintf(LvlErr, "An immutable parameter has been changed in cluster %s and is tracked in overwrite.toml. Use the config-merge command to save your changes.\n", cluster.Name)
+                    cluster.LogPrintf(LvlDbg, "Check overwrite is not empty line %d : %s\n", i, line)
+                } else {
+                    cluster.LogPrintf(LvlDbg, "Check overwrite is empty line %d : %s\n", i, line)
+                }
+
+            }
+            //cluster.SetState("WARN0102", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0102"]), ErrFrom: "CLUSTER"})
+        }
+    }
+}
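
CheckIsOverwrite inspects only index 1 of the split file, that is the second line of overwrite.toml, presumably because the first line holds a generated header: if that line is non-blank once spaces are stripped, an immutable parameter was overridden and WARN0102 is raised. A standalone sketch of the same heuristic under that file-layout assumption; the path in main is an example only, and os.ReadFile replaces the deprecated ioutil.ReadFile used in the diff:

package main

import (
    "fmt"
    "os"
    "strings"
)

// hasOverwrites reports whether an overwrite.toml holds any tracked change,
// using the same heuristic as CheckIsOverwrite: a non-blank second line.
func hasOverwrites(path string) (bool, error) {
    input, err := os.ReadFile(path)
    if err != nil {
        if os.IsNotExist(err) {
            return false, nil // no overwrite file means nothing was overridden
        }
        return false, err
    }
    lines := strings.Split(string(input), "\n")
    if len(lines) < 2 {
        return false, nil
    }
    return strings.ReplaceAll(lines[1], " ", "") != "", nil
}

func main() {
    // Hypothetical working-dir layout, mirroring Conf.WorkingDir + "/" + cluster.Name.
    ok, err := hasOverwrites("/var/lib/replication-manager/mycluster/overwrite.toml")
    if err != nil {
        fmt.Println("read error:", err)
        return
    }
    if ok {
        fmt.Println("immutable parameter changed; run config-merge to persist it")
    }
}
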