From f60fca013605c6f0c2b83740683264c6aa1171c2 Mon Sep 17 00:00:00 2001
From: emma
Date: Fri, 12 May 2023 10:23:50 +0200
Subject: [PATCH] fix dbJob script, add port list for SST sender

---
 cluster/cluster.go       | 56 ++++++++++++++++++++++++++--------------
 cluster/cluster_set.go   |  2 +-
 cluster/cluster_sst.go   | 25 +++++++++++++++---
 cluster/srv.go           |  5 +----
 cluster/srv_job.go       |  6 ++++-
 config/config.go         |  1 +
 etc/config.toml          |  3 ++-
 server/server_add.go     |  7 +----
 server/server_monitor.go |  2 ++
 9 files changed, 71 insertions(+), 36 deletions(-)

diff --git a/cluster/cluster.go b/cluster/cluster.go
index 00fb2c463..e70a9baae 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -195,6 +195,7 @@ type Cluster struct {
 	errorConnectVault error          `json:"-"`
 	SqlErrorLog       *logsql.Logger `json:"-"`
 	SqlGeneralLog     *logsql.Logger `json:"-"`
+	SstAvailablePorts map[string]string `json:"sstAvailablePorts"`
 	sync.Mutex
 	crcTable *crc64.Table
 }
@@ -307,6 +308,25 @@ func (cluster *Cluster) Init(confs *config.ConfVersion, imm map[string]interface
 	cluster.DynamicFlagMap = dyn
 	cluster.DefaultFlagMap = def
 	conf := confs.ConfInit
+
+	cluster.Conf = conf
+
+	cluster.tlog = tlog
+	cluster.htlog = loghttp
+	cluster.termlength = termlength
+	cluster.Name = cfgGroup
+
+	cluster.runUUID = runUUID
+	cluster.repmgrHostname = repmgrHostname
+	cluster.repmgrVersion = repmgrVersion
+	cluster.key = key
+
+	cluster.InitFromConf()
+
+	return nil
+}
+
+func (cluster *Cluster) InitFromConf() {
 	cluster.SqlErrorLog = logsql.New()
 	cluster.SqlGeneralLog = logsql.New()
 	cluster.crcTable = crc64.MakeTable(crc64.ECMA) // http://golang.org/pkg/hash/crc64/#pkg-constants
@@ -326,36 +346,34 @@ func (cluster *Cluster) Init(confs *config.ConfVersion, imm map[string]interface
 	cluster.testStopCluster = true
 	cluster.testStartCluster = true
-	cluster.tlog = tlog
-	cluster.htlog = loghttp
-	cluster.termlength = termlength
-	cluster.Name = cfgGroup
-	cluster.WorkingDir = conf.WorkingDir + "/" + cluster.Name
-	cluster.runUUID = runUUID
-	cluster.repmgrHostname = repmgrHostname
-	cluster.repmgrVersion = repmgrVersion
-	cluster.key = key
-
-	if conf.Arbitration {
+	cluster.WorkingDir = cluster.Conf.WorkingDir + "/" + cluster.Name
+	if cluster.Conf.Arbitration {
 		cluster.Status = ConstMonitorStandby
 	} else {
 		cluster.Status = ConstMonitorActif
 	}
 	cluster.benchmarkType = "sysbench"
 	cluster.Log = s18log.NewHttpLog(200)
-	cluster.MonitorType = conf.GetMonitorType()
-	cluster.TopologyType = conf.GetTopologyType()
-	cluster.FSType = conf.GetFSType()
-	cluster.DiskType = conf.GetDiskType()
-	cluster.VMType = conf.GetVMType()
-	cluster.Grants = conf.GetGrantType()
+
+	cluster.MonitorType = cluster.Conf.GetMonitorType()
+	cluster.TopologyType = cluster.Conf.GetTopologyType()
+	cluster.FSType = cluster.Conf.GetFSType()
+	cluster.DiskType = cluster.Conf.GetDiskType()
+	cluster.VMType = cluster.Conf.GetVMType()
+	cluster.Grants = cluster.Conf.GetGrantType()
+
 	cluster.QueryRules = make(map[uint32]config.QueryRule)
 	cluster.Schedule = make(map[string]cron.Entry)
 	cluster.JobResults = make(map[string]*JobResult)
+	cluster.SstAvailablePorts = make(map[string]string)
+	lstPort := strings.Split(cluster.Conf.SchedulerSenderPorts, ",")
+	for _, p := range lstPort {
+		cluster.SstAvailablePorts[p] = p
+	}
 	// Initialize the state machine at this stage where everything is fine.
 	cluster.StateMachine = new(state.StateMachine)
 	cluster.StateMachine.Init()
-	cluster.Conf = conf
+
 	if cluster.Conf.Interactive {
 		cluster.LogPrintf(LvlInfo, "Failover in interactive mode")
 	} else {
@@ -469,7 +487,6 @@ func (cluster *Cluster) Init(confs *config.ConfVersion, imm map[string]interface
 	}
 	//fmt.Printf("INIT CLUSTER CONF :\n")
 	//cluster.Conf.PrintConf()
-	return nil
 }
 
 func (cluster *Cluster) initOrchetratorNodes() {
@@ -1144,6 +1161,7 @@ func (cluster *Cluster) InitAgent(conf config.Config) {
 
 func (cluster *Cluster) ReloadConfig(conf config.Config) {
 	cluster.Conf = conf
 	cluster.Configurator.SetConfig(conf)
+	cluster.InitFromConf()
 	cluster.StateMachine.SetFailoverState()
 	cluster.ResetStates()
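The cluster.go hunks above split most of Init out into a new InitFromConf so that ReloadConfig can re-run it, and seed a pool of reusable SST sender ports from the comma-separated scheduler-db-servers-sender-ports value. A minimal, standalone sketch of that pool initialization, assuming only the standard library (the literal port list stands in for cluster.Conf.SchedulerSenderPorts):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Mirrors InitFromConf: the comma-separated config value becomes a
	// set of available ports, keyed by the port string itself.
	schedulerSenderPorts := "4445,4446,4447" // stand-in for cluster.Conf.SchedulerSenderPorts
	pool := make(map[string]string)
	for _, p := range strings.Split(schedulerSenderPorts, ",") {
		pool[p] = p
	}
	fmt.Println(pool) // map[4445:4445 4446:4446 4447:4447]

	// Edge case: strings.Split("", ",") yields [""], so an unset option
	// leaves one empty key in the pool. SSTGetSenderPort (cluster_sst.go
	// below) avoids handing it out by checking the raw config value first.
	fmt.Println(len(strings.Split("", ","))) // 1
}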
diff --git a/cluster/cluster_set.go b/cluster/cluster_set.go
index 203239964..ae12be69a 100644
--- a/cluster/cluster_set.go
+++ b/cluster/cluster_set.go
@@ -238,7 +238,7 @@ func (cluster *Cluster) SetSchedulerDbJobsSsh() {
 	}
 	if cluster.Conf.SchedulerJobsSSH {
 		var err error
-		cluster.LogPrintf(LvlInfo, "Schedule Sla rotate at: %s", cluster.Conf.SchedulerJobsSSHCron)
+		cluster.LogPrintf(LvlInfo, "Schedule SshDbJob at: %s", cluster.Conf.SchedulerJobsSSHCron)
 		cluster.idSchedulerDbsjobsSsh, err = cluster.scheduler.AddFunc(cluster.Conf.SchedulerJobsSSHCron, func() {
 			for _, s := range cluster.Servers {
 				s.JobRunViaSSH()
diff --git a/cluster/cluster_sst.go b/cluster/cluster_sst.go
index 496d6e649..8a7078f5e 100644
--- a/cluster/cluster_sst.go
+++ b/cluster/cluster_sst.go
@@ -90,7 +90,7 @@ func (cluster *Cluster) SSTRunReceiverToRestic(filename string) (string, error)
 		return "", err
 	}
 
-	sst.listener, err = net.Listen("tcp", cluster.Conf.BindAddr+":0")
+	sst.listener, err = net.Listen("tcp", cluster.Conf.BindAddr+":"+cluster.SSTGetSenderPort())
 	if err != nil {
 		cluster.LogPrintf(LvlErr, "Exiting SST on socket listen %s", err)
 		return "", err
@@ -128,16 +128,16 @@ func (cluster *Cluster) SSTRunReceiverToFile(filename string, openfile string) (
 	sst.outfilewriter = io.MultiWriter(writers...)
 
-	sst.listener, err = net.Listen("tcp", cluster.Conf.BindAddr+":0")
+	sst.listener, err = net.Listen("tcp", cluster.Conf.BindAddr+":"+cluster.SSTGetSenderPort())
 	if err != nil {
 		cluster.LogPrintf(LvlErr, "Exiting SST on socket listen %s", err)
 		return "", err
 	}
 	sst.tcplistener = sst.listener.(*net.TCPListener)
-	sst.tcplistener.SetDeadline(time.Now().Add(time.Second * 120))
+	sst.tcplistener.SetDeadline(time.Now().Add(time.Second * 3600))
 	destinationPort := sst.listener.Addr().(*net.TCPAddr).Port
 	if sst.cluster.Conf.LogSST {
-		cluster.LogPrintf(LvlInfo, "Listening for SST on port %d", destinationPort)
+		cluster.LogPrintf(LvlInfo, "Listening for SST to file on port %d", destinationPort)
 	}
 	SSTs.Lock()
 	SSTs.SSTconnections[destinationPort] = sst
@@ -161,6 +161,7 @@ func (sst *SST) tcp_con_handle_to_file() {
 		sst.listener.Close()
 		SSTs.Lock()
 		delete(SSTs.SSTconnections, port)
+		sst.cluster.SSTSenderFreePort(strconv.Itoa(port))
 		SSTs.Unlock()
 	}()
@@ -196,6 +197,7 @@ func (sst *SST) tcp_con_handle_to_restic() {
 		sst.listener.Close()
 		SSTs.Lock()
 		delete(SSTs.SSTconnections, port)
+		sst.cluster.SSTSenderFreePort(strconv.Itoa(port))
 		SSTs.Unlock()
 	}()
@@ -361,3 +363,18 @@ func (cluster *Cluster) SSTRunSenderSSL(backupfile string, sv *ServerMonitor) {
 	}
 	cluster.LogPrintf(LvlInfo, "Backup has been sent via SSL , closing connection!")
 }
+
+func (cluster *Cluster) SSTGetSenderPort() string {
+	port := "0"
+	if cluster.Conf.SchedulerSenderPorts != "" {
+		for k, v := range cluster.SstAvailablePorts {
+			delete(cluster.SstAvailablePorts, k)
+			return v
+		}
+	}
+	return port
+}
+
+func (cluster *Cluster) SSTSenderFreePort(port string) {
+	cluster.SstAvailablePorts[port] = port
+}
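SSTGetSenderPort pops an arbitrary entry from the pool (Go map iteration order is unspecified, which is harmless here) and falls back to "0", which makes net.Listen ask the kernel for a free ephemeral port; SSTSenderFreePort returns the port when a transfer's deferred cleanup runs. A condensed sketch of that checkout/listen/release cycle, standard library only, with pickPort and freePort as simplified stand-ins for the two methods above:

package main

import (
	"fmt"
	"net"
	"strconv"
)

var pool = map[string]string{"4445": "4445", "4446": "4446"}

// pickPort pops an arbitrary free port, or returns "0" so the kernel
// assigns an ephemeral one when the pool is empty or unconfigured.
func pickPort() string {
	for k, v := range pool {
		delete(pool, k)
		return v
	}
	return "0"
}

// freePort returns a port to the pool once its transfer has finished.
func freePort(p string) { pool[p] = p }

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:"+pickPort())
	if err != nil {
		panic(err)
	}
	// Addr() reports the effective port, which matters on the ":0" path.
	port := ln.Addr().(*net.TCPAddr).Port
	fmt.Println("listening on", port)
	ln.Close()
	freePort(strconv.Itoa(port))
}

One caveat: in the patch, SSTSenderFreePort is called under the SSTs lock, but SSTGetSenderPort mutates the same map unlocked, so concurrently started receivers could race on the pool.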
diff --git a/cluster/srv.go b/cluster/srv.go
index 869845d27..237b5b4a0 100644
--- a/cluster/srv.go
+++ b/cluster/srv.go
@@ -171,10 +171,7 @@ type ServerMonitor struct {
 	PostgressDB           string `json:"postgressDB"`
 	TLSConfigUsed         string `json:"tlsConfigUsed"` //used to track TLS config during key rotation
 	SSTPort               string `json:"sstPort"`       //used to send data to dbjobs
-	SSTPhysicalBackupPort string `json:"sstPhysicalBackupPort"`
-	SSTLogErrorPort       string `json:"sstLogErrorPort"`
-	SSTSlowQueryPort      string `json:"sstSlowQueryPort"`
-	Agent                 string `json:"agent"` //used to provision service in orchestrator
+	Agent                 string `json:"agent"`         //used to provision service in orchestrator
 	BinaryLogFiles        map[string]uint `json:"binaryLogFiles"`
 	MaxSlowQueryTimestamp int64           `json:"maxSlowQueryTimestamp"`
 	IsInSlowQueryCapture  bool
diff --git a/cluster/srv_job.go b/cluster/srv_job.go
index 30b22084b..08bf0a2de 100644
--- a/cluster/srv_job.go
+++ b/cluster/srv_job.go
@@ -108,10 +108,12 @@ func (server *ServerMonitor) JobBackupPhysical() (int64, error) {
 			return jobid, err
 		} else {
 	*/
+
 	port, err := server.ClusterGroup.SSTRunReceiverToFile(server.GetMyBackupDirectory()+server.ClusterGroup.Conf.BackupPhysicalType+".xbtream", ConstJobCreateFile)
 	if err != nil {
 		return 0, nil
 	}
+
 	jobid, err := server.JobInsertTaks(server.ClusterGroup.Conf.BackupPhysicalType, port, server.ClusterGroup.Conf.MonitorAddress)
 
 	return jobid, err
@@ -954,7 +956,9 @@ func (server *ServerMonitor) JobRunViaSSH() error {
 	}
 	out := stdout.String()
 
-	server.ClusterGroup.LogPrintf(LvlInfo, "Job run via ssh script: %s ,out: %s ,err: %s", scriptpath, out, stderr.String())
+	if server.GetCluster().Conf.LogSST {
+		server.ClusterGroup.LogPrintf(LvlInfo, "Job run via ssh script: %s, out: %s, err: %s", scriptpath, out, stderr.String())
+	}
 
 	res := new(JobResult)
 	val := reflect.ValueOf(res).Elem()
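The JobRunViaSSH change stops dumping every job's stdout/stderr into the log unconditionally and gates it behind Conf.LogSST instead. A rough sketch of the same capture-then-conditionally-log pattern, using os/exec as a stand-in for the real SSH session (the script path and logSST flag are hypothetical):

package main

import (
	"bytes"
	"log"
	"os/exec"
)

func runJobScript(scriptpath string, logSST bool) error {
	// Capture both streams in buffers, as JobRunViaSSH does.
	var stdout, stderr bytes.Buffer
	cmd := exec.Command("/bin/sh", scriptpath)
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	err := cmd.Run()
	if logSST {
		// Job output can be large, so it is only logged when SST
		// logging is enabled, mirroring the patched LogPrintf call.
		log.Printf("Job run via ssh script: %s, out: %s, err: %s", scriptpath, stdout.String(), stderr.String())
	}
	return err
}

func main() {
	if err := runJobScript("/tmp/dbjobs.sh", false); err != nil { // hypothetical path
		log.Println("job failed:", err)
	}
}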
diff --git a/config/config.go b/config/config.go
index 0ce3d367c..0fc2c4840 100644
--- a/config/config.go
+++ b/config/config.go
@@ -469,6 +469,7 @@ type Config struct {
 	ConfigFile              string `mapstructure:"config" toml:"-" json:"-"`
 	MonitorScheduler        bool   `mapstructure:"monitoring-scheduler" toml:"monitoring-scheduler" json:"monitoringScheduler"`
 	SchedulerReceiverPorts  string `mapstructure:"scheduler-db-servers-receiver-ports" toml:"scheduler-db-servers-receiver-ports" json:"schedulerDbServersReceiverPorts"`
+	SchedulerSenderPorts    string `mapstructure:"scheduler-db-servers-sender-ports" toml:"scheduler-db-servers-sender-ports" json:"schedulerDbServersSenderPorts"`
 	SchedulerReceiverUseSSL bool   `mapstructure:"scheduler-db-servers-receiver-use-ssl" toml:"scheduler-db-servers-receiver-use-ssl" json:"schedulerDbServersReceiverUseSSL"`
 	SchedulerBackupLogical  bool   `mapstructure:"scheduler-db-servers-logical-backup" toml:"scheduler-db-servers-logical-backup" json:"schedulerDbServersLogicalBackup"`
 	SchedulerBackupPhysical bool   `mapstructure:"scheduler-db-servers-physical-backup" toml:"scheduler-db-servers-physical-backup" json:"schedulerDbServersPhysicalBackup"`
diff --git a/etc/config.toml b/etc/config.toml
index 92302ab30..d067b75ba 100644
--- a/etc/config.toml
+++ b/etc/config.toml
@@ -5,7 +5,7 @@ prov-orchestrator = "onpremise"
 onpremise-ssh = true
 onpremise-ssh-credential = "emma:"
 scheduler-jobs-ssh = true
-monitoring-address = "192.168.1.136"
+monitoring-address = "127.0.0.1"
 monitoring-scheduler = true
 scheduler-db-servers-logs = true
 scheduler-db-servers-logs-cron = "0 * * * * *"
@@ -14,6 +14,7 @@ backup-physical-type = "mariabackup"
 scheduler-db-servers-optimize = false
 scheduler-db-servers-logical-backup = false
 scheduler-db-servers-logs-table-rotate = false
+scheduler-db-servers-sender-ports = "4445,4446,4447,4448,4449"
 
 #mariadb
 prov-db-binary-basedir= "/usr/sbin"
diff --git a/server/server_add.go b/server/server_add.go
index cf0cf1d09..0307049aa 100644
--- a/server/server_add.go
+++ b/server/server_add.go
@@ -7,13 +7,10 @@
 package server
 
 import (
-	"fmt"
-
 	"github.com/signal18/replication-manager/config"
 )
 
 func (repman *ReplicationManager) AddCluster(clusterName string, clusterHead string) error {
-	fmt.Printf("COUCOU 1:\n")
 	var myconf = make(map[string]config.Config)
 	myconf[clusterName] = repman.Conf
 	repman.Lock()
@@ -28,19 +25,17 @@ func (repman *ReplicationManager) AddCluster(clusterName string, clusterHead str
 	repman.DynamicFlagMaps[clusterName] = repman.DynamicFlagMaps["default"]
 	repman.Unlock()
-	fmt.Printf("COUCOU 2:\n")
 	//confs[clusterName] = repman.GetClusterConfig(fistRead, repman.ImmuableFlagMaps["default"], repman.DynamicFlagMaps["default"], clusterName, conf)
 	cluster, _ := repman.StartCluster(clusterName)
 	//fmt.Printf("ADD CLUSTER def map :\n")
 	//fmt.Printf("%s\n", repman.ImmuableFlagMaps)
 	//cluster.Conf.PrintConf()
-	fmt.Printf("COUCOU 3:\n")
+
 	cluster.SetClusterHead(clusterHead)
 	//cluster.SetClusterHead(clusterName)
 	cluster.SetClusterList(repman.Clusters)
 	cluster.Save()
-	fmt.Printf("COUCOU 4:\n")
 	return nil
 }
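The new key travels from etc/config.toml into Config.SchedulerSenderPorts via its mapstructure tag. A minimal sketch of that binding, assuming configuration keeps being loaded through spf13/viper as in the rest of the project (the trimmed struct is a stand-in for config.Config):

package main

import (
	"bytes"
	"fmt"

	"github.com/spf13/viper"
)

type Config struct {
	SchedulerSenderPorts string `mapstructure:"scheduler-db-servers-sender-ports"`
}

func main() {
	v := viper.New()
	v.SetConfigType("toml")
	src := `scheduler-db-servers-sender-ports = "4445,4446,4447,4448,4449"`
	if err := v.ReadConfig(bytes.NewBufferString(src)); err != nil {
		panic(err)
	}
	var cfg Config
	if err := v.Unmarshal(&cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.SchedulerSenderPorts) // 4445,4446,4447,4448,4449
}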
"scheduler-db-servers-sender-ports", "", "Scheduler TCP port to receive data from db node, consume one port per tranfert if not set, pick one available port") monitorCmd.Flags().BoolVar(&conf.SchedulerReceiverUseSSL, "scheduler-db-servers-receiver-use-ssl", false, "Listner to send data to db node is use SSL") monitorCmd.Flags().BoolVar(&conf.SchedulerBackupLogical, "scheduler-db-servers-logical-backup", true, "Schedule logical backup") monitorCmd.Flags().BoolVar(&conf.SchedulerBackupPhysical, "scheduler-db-servers-physical-backup", false, "Schedule logical backup") monitorCmd.Flags().BoolVar(&conf.SchedulerDatabaseLogs, "scheduler-db-servers-logs", false, "Schedule database logs fetching") monitorCmd.Flags().BoolVar(&conf.SchedulerDatabaseOptimize, "scheduler-db-servers-optimize", true, "Schedule database optimize") + monitorCmd.Flags().StringVar(&conf.BackupLogicalCron, "scheduler-db-servers-logical-backup-cron", "0 0 1 * * 6", "Logical backup cron expression represents a set of times, using 6 space-separated fields.") monitorCmd.Flags().StringVar(&conf.BackupPhysicalCron, "scheduler-db-servers-physical-backup-cron", "0 0 0 * * 0-4", "Physical backup cron expression represents a set of times, using 6 space-separated fields.") monitorCmd.Flags().StringVar(&conf.BackupDatabaseOptimizeCron, "scheduler-db-servers-optimize-cron", "0 0 3 1 * 5", "Optimize cron expression represents a set of times, using 6 space-separated fields.")