Skip to content

Commit

Permalink
fix dbJob script, add list port for sst sender
Browse files Browse the repository at this point in the history
  • Loading branch information
emmaloubersac committed May 12, 2023
1 parent e7aa39c commit f60fca0
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 36 deletions.
56 changes: 37 additions & 19 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ type Cluster struct {
errorConnectVault error `json:"-"`
SqlErrorLog *logsql.Logger `json:"-"`
SqlGeneralLog *logsql.Logger `json:"-"`
SstAvailablePorts map[string]string `json:"sstAvailablePorts"`
sync.Mutex
crcTable *crc64.Table
}
Expand Down Expand Up @@ -307,6 +308,25 @@ func (cluster *Cluster) Init(confs *config.ConfVersion, imm map[string]interface
cluster.DynamicFlagMap = dyn
cluster.DefaultFlagMap = def
conf := confs.ConfInit

cluster.Conf = conf

cluster.tlog = tlog
cluster.htlog = loghttp
cluster.termlength = termlength
cluster.Name = cfgGroup

cluster.runUUID = runUUID
cluster.repmgrHostname = repmgrHostname
cluster.repmgrVersion = repmgrVersion
cluster.key = key

cluster.InitFromConf()

return nil
}

func (cluster *Cluster) InitFromConf() {
cluster.SqlErrorLog = logsql.New()
cluster.SqlGeneralLog = logsql.New()
cluster.crcTable = crc64.MakeTable(crc64.ECMA) // http://golang.org/pkg/hash/crc64/#pkg-constants
Expand All @@ -326,36 +346,34 @@ func (cluster *Cluster) Init(confs *config.ConfVersion, imm map[string]interface
cluster.testStopCluster = true
cluster.testStartCluster = true

cluster.tlog = tlog
cluster.htlog = loghttp
cluster.termlength = termlength
cluster.Name = cfgGroup
cluster.WorkingDir = conf.WorkingDir + "/" + cluster.Name
cluster.runUUID = runUUID
cluster.repmgrHostname = repmgrHostname
cluster.repmgrVersion = repmgrVersion
cluster.key = key

if conf.Arbitration {
cluster.WorkingDir = cluster.Conf.WorkingDir + "/" + cluster.Name
if cluster.Conf.Arbitration {
cluster.Status = ConstMonitorStandby
} else {
cluster.Status = ConstMonitorActif
}
cluster.benchmarkType = "sysbench"
cluster.Log = s18log.NewHttpLog(200)
cluster.MonitorType = conf.GetMonitorType()
cluster.TopologyType = conf.GetTopologyType()
cluster.FSType = conf.GetFSType()
cluster.DiskType = conf.GetDiskType()
cluster.VMType = conf.GetVMType()
cluster.Grants = conf.GetGrantType()

cluster.MonitorType = cluster.Conf.GetMonitorType()
cluster.TopologyType = cluster.Conf.GetTopologyType()
cluster.FSType = cluster.Conf.GetFSType()
cluster.DiskType = cluster.Conf.GetDiskType()
cluster.VMType = cluster.Conf.GetVMType()
cluster.Grants = cluster.Conf.GetGrantType()

cluster.QueryRules = make(map[uint32]config.QueryRule)
cluster.Schedule = make(map[string]cron.Entry)
cluster.JobResults = make(map[string]*JobResult)
cluster.SstAvailablePorts = make(map[string]string)
lstPort := strings.Split(cluster.Conf.SchedulerSenderPorts, ",")
for _, p := range lstPort {
cluster.SstAvailablePorts[p] = p
}
// Initialize the state machine at this stage where everything is fine.
cluster.StateMachine = new(state.StateMachine)
cluster.StateMachine.Init()
cluster.Conf = conf

if cluster.Conf.Interactive {
cluster.LogPrintf(LvlInfo, "Failover in interactive mode")
} else {
Expand Down Expand Up @@ -469,7 +487,6 @@ func (cluster *Cluster) Init(confs *config.ConfVersion, imm map[string]interface
}
//fmt.Printf("INIT CLUSTER CONF :\n")
//cluster.Conf.PrintConf()
return nil
}

func (cluster *Cluster) initOrchetratorNodes() {
Expand Down Expand Up @@ -1144,6 +1161,7 @@ func (cluster *Cluster) InitAgent(conf config.Config) {
func (cluster *Cluster) ReloadConfig(conf config.Config) {
cluster.Conf = conf
cluster.Configurator.SetConfig(conf)
cluster.InitFromConf()
cluster.StateMachine.SetFailoverState()
cluster.ResetStates()

Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (cluster *Cluster) SetSchedulerDbJobsSsh() {
}
if cluster.Conf.SchedulerJobsSSH {
var err error
cluster.LogPrintf(LvlInfo, "Schedule Sla rotate at: %s", cluster.Conf.SchedulerJobsSSHCron)
cluster.LogPrintf(LvlInfo, "Schedule SshDbJob rotate at: %s", cluster.Conf.SchedulerJobsSSHCron)
cluster.idSchedulerDbsjobsSsh, err = cluster.scheduler.AddFunc(cluster.Conf.SchedulerJobsSSHCron, func() {
for _, s := range cluster.Servers {
s.JobRunViaSSH()
Expand Down
25 changes: 21 additions & 4 deletions cluster/cluster_sst.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (cluster *Cluster) SSTRunReceiverToRestic(filename string) (string, error)
return "", err
}

sst.listener, err = net.Listen("tcp", cluster.Conf.BindAddr+":0")
sst.listener, err = net.Listen("tcp", cluster.Conf.BindAddr+":"+cluster.SSTGetSenderPort())
if err != nil {
cluster.LogPrintf(LvlErr, "Exiting SST on socket listen %s", err)
return "", err
Expand Down Expand Up @@ -128,16 +128,16 @@ func (cluster *Cluster) SSTRunReceiverToFile(filename string, openfile string) (

sst.outfilewriter = io.MultiWriter(writers...)

sst.listener, err = net.Listen("tcp", cluster.Conf.BindAddr+":0")
sst.listener, err = net.Listen("tcp", cluster.Conf.BindAddr+":"+cluster.SSTGetSenderPort())
if err != nil {
cluster.LogPrintf(LvlErr, "Exiting SST on socket listen %s", err)
return "", err
}
sst.tcplistener = sst.listener.(*net.TCPListener)
sst.tcplistener.SetDeadline(time.Now().Add(time.Second * 120))
sst.tcplistener.SetDeadline(time.Now().Add(time.Second * 3600))
destinationPort := sst.listener.Addr().(*net.TCPAddr).Port
if sst.cluster.Conf.LogSST {
cluster.LogPrintf(LvlInfo, "Listening for SST on port %d", destinationPort)
cluster.LogPrintf(LvlInfo, "Listening for SST on port to file %d", destinationPort)
}
SSTs.Lock()
SSTs.SSTconnections[destinationPort] = sst
Expand All @@ -161,6 +161,7 @@ func (sst *SST) tcp_con_handle_to_file() {
sst.listener.Close()
SSTs.Lock()
delete(SSTs.SSTconnections, port)
sst.cluster.SSTSenderFreePort(strconv.Itoa(port))
SSTs.Unlock()
}()

Expand Down Expand Up @@ -196,6 +197,7 @@ func (sst *SST) tcp_con_handle_to_restic() {
sst.listener.Close()
SSTs.Lock()
delete(SSTs.SSTconnections, port)
sst.cluster.SSTSenderFreePort(strconv.Itoa(port))
SSTs.Unlock()
}()

Expand Down Expand Up @@ -361,3 +363,18 @@ func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor) {
}
cluster.LogPrintf(LvlInfo, "Backup has been sent via SSL , closing connection!")
}

func (cluster *Cluster) SSTGetSenderPort() string {
port := "0"
if cluster.Conf.SchedulerSenderPorts != "" {
for k, v := range cluster.SstAvailablePorts {
delete(cluster.SstAvailablePorts, k)
return v
}
}
return port
}

func (cluster *Cluster) SSTSenderFreePort(port string) {
cluster.SstAvailablePorts[port] = port
}
5 changes: 1 addition & 4 deletions cluster/srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,7 @@ type ServerMonitor struct {
PostgressDB string `json:"postgressDB"`
TLSConfigUsed string `json:"tlsConfigUsed"` //used to track TLS config during key rotation
SSTPort string `json:"sstPort"` //used to send data to dbjobs
SSTPhysicalBackupPort string `json:"sstPhysicalBackupPort"`
SSTLogErrorPort string `json:"sstLogErrorPort"`
SSTSlowQueryPort string `json:"sstSlowQueryPort"`
Agent string `json:"agent"` //used to provision service in orchestrator
Agent string `json:"agent"` //used to provision service in orchestrator
BinaryLogFiles map[string]uint `json:"binaryLogFiles"`
MaxSlowQueryTimestamp int64 `json:"maxSlowQueryTimestamp"`
IsInSlowQueryCapture bool
Expand Down
6 changes: 5 additions & 1 deletion cluster/srv_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,12 @@ func (server *ServerMonitor) JobBackupPhysical() (int64, error) {
return jobid, err
} else {
*/

port, err := server.ClusterGroup.SSTRunReceiverToFile(server.GetMyBackupDirectory()+server.ClusterGroup.Conf.BackupPhysicalType+".xbtream", ConstJobCreateFile)
if err != nil {
return 0, nil
}

jobid, err := server.JobInsertTaks(server.ClusterGroup.Conf.BackupPhysicalType, port, server.ClusterGroup.Conf.MonitorAddress)

return jobid, err
Expand Down Expand Up @@ -954,7 +956,9 @@ func (server *ServerMonitor) JobRunViaSSH() error {
}
out := stdout.String()

server.ClusterGroup.LogPrintf(LvlInfo, "Job run via ssh script: %s ,out: %s ,err: %s", scriptpath, out, stderr.String())
if server.GetCluster().Conf.LogSST {
server.ClusterGroup.LogPrintf(LvlInfo, "Job run via ssh script: %s ,out: %s ,err: %s", scriptpath, out, stderr.String())
}

res := new(JobResult)
val := reflect.ValueOf(res).Elem()
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ type Config struct {
ConfigFile string `mapstructure:"config" toml:"-" json:"-"`
MonitorScheduler bool `mapstructure:"monitoring-scheduler" toml:"monitoring-scheduler" json:"monitoringScheduler"`
SchedulerReceiverPorts string `mapstructure:"scheduler-db-servers-receiver-ports" toml:"scheduler-db-servers-receiver-ports" json:"schedulerDbServersReceiverPorts"`
SchedulerSenderPorts string `mapstructure:"scheduler-db-servers-sender-ports" toml:"scheduler-db-servers-sender-ports" json:"schedulerDbServersSenderPorts"`
SchedulerReceiverUseSSL bool `mapstructure:"scheduler-db-servers-receiver-use-ssl" toml:"scheduler-db-servers-receiver-use-ssl" json:"schedulerDbServersReceiverUseSSL"`
SchedulerBackupLogical bool `mapstructure:"scheduler-db-servers-logical-backup" toml:"scheduler-db-servers-logical-backup" json:"schedulerDbServersLogicalBackup"`
SchedulerBackupPhysical bool `mapstructure:"scheduler-db-servers-physical-backup" toml:"scheduler-db-servers-physical-backup" json:"schedulerDbServersPhysicalBackup"`
Expand Down
3 changes: 2 additions & 1 deletion etc/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ prov-orchestrator = "onpremise"
onpremise-ssh = true
onpremise-ssh-credential = "emma:"
scheduler-jobs-ssh = true
monitoring-address = "192.168.1.136"
monitoring-address = "127.0.0.1"
monitoring-scheduler = true
scheduler-db-servers-logs = true
scheduler-db-servers-logs-cron = "0 * * * * *"
Expand All @@ -14,6 +14,7 @@ backup-physical-type = "mariabackup"
scheduler-db-servers-optimize = false
scheduler-db-servers-logical-backup = false
scheduler-db-servers-logs-table-rotate = false
scheduler-db-servers-sender-ports="4445,4446,4447,4448,4449"

#mariadb
prov-db-binary-basedir= "/usr/sbin"
Expand Down
7 changes: 1 addition & 6 deletions server/server_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@
package server

import (
"fmt"

"github.com/signal18/replication-manager/config"
)

func (repman *ReplicationManager) AddCluster(clusterName string, clusterHead string) error {
fmt.Printf("COUCOU 1:\n")
var myconf = make(map[string]config.Config)
myconf[clusterName] = repman.Conf
repman.Lock()
Expand All @@ -28,19 +25,17 @@ func (repman *ReplicationManager) AddCluster(clusterName string, clusterHead str
repman.DynamicFlagMaps[clusterName] = repman.DynamicFlagMaps["default"]

repman.Unlock()
fmt.Printf("COUCOU 2:\n")
//confs[clusterName] = repman.GetClusterConfig(fistRead, repman.ImmuableFlagMaps["default"], repman.DynamicFlagMaps["default"], clusterName, conf)

cluster, _ := repman.StartCluster(clusterName)
//fmt.Printf("ADD CLUSTER def map :\n")
//fmt.Printf("%s\n", repman.ImmuableFlagMaps)
//cluster.Conf.PrintConf()
fmt.Printf("COUCOU 3:\n")

cluster.SetClusterHead(clusterHead)
//cluster.SetClusterHead(clusterName)
cluster.SetClusterList(repman.Clusters)
cluster.Save()
fmt.Printf("COUCOU 4:\n")
return nil

}
2 changes: 2 additions & 0 deletions server/server_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,11 +340,13 @@ func init() {
}

monitorCmd.Flags().StringVar(&conf.SchedulerReceiverPorts, "scheduler-db-servers-receiver-ports", "4444", "Scheduler TCP port to send data to db node, if list port affection is modulo db nodes")
monitorCmd.Flags().StringVar(&conf.SchedulerSenderPorts, "scheduler-db-servers-sender-ports", "", "Scheduler TCP port to receive data from db node, consume one port per tranfert if not set, pick one available port")
monitorCmd.Flags().BoolVar(&conf.SchedulerReceiverUseSSL, "scheduler-db-servers-receiver-use-ssl", false, "Listner to send data to db node is use SSL")
monitorCmd.Flags().BoolVar(&conf.SchedulerBackupLogical, "scheduler-db-servers-logical-backup", true, "Schedule logical backup")
monitorCmd.Flags().BoolVar(&conf.SchedulerBackupPhysical, "scheduler-db-servers-physical-backup", false, "Schedule logical backup")
monitorCmd.Flags().BoolVar(&conf.SchedulerDatabaseLogs, "scheduler-db-servers-logs", false, "Schedule database logs fetching")
monitorCmd.Flags().BoolVar(&conf.SchedulerDatabaseOptimize, "scheduler-db-servers-optimize", true, "Schedule database optimize")

monitorCmd.Flags().StringVar(&conf.BackupLogicalCron, "scheduler-db-servers-logical-backup-cron", "0 0 1 * * 6", "Logical backup cron expression represents a set of times, using 6 space-separated fields.")
monitorCmd.Flags().StringVar(&conf.BackupPhysicalCron, "scheduler-db-servers-physical-backup-cron", "0 0 0 * * 0-4", "Physical backup cron expression represents a set of times, using 6 space-separated fields.")
monitorCmd.Flags().StringVar(&conf.BackupDatabaseOptimizeCron, "scheduler-db-servers-optimize-cron", "0 0 3 1 * 5", "Optimize cron expression represents a set of times, using 6 space-separated fields.")
Expand Down

0 comments on commit f60fca0

Please sign in to comment.