Skip to content

Commit

Permalink
Add weight to proxy backend for janitor
Browse files Browse the repository at this point in the history
  • Loading branch information
svaroqui committed Jun 2, 2023
1 parent a848575 commit 4226940
Show file tree
Hide file tree
Showing 13 changed files with 329 additions and 286 deletions.
2 changes: 2 additions & 0 deletions cluster/prx.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Proxy struct {
Variables map[string]string `json:"-"`
ServiceName string `json:"serviceName"`
Agent string `json:"agent"`
Weight string `json:"weight"`
}

type DatabaseProxy interface {
Expand Down Expand Up @@ -110,6 +111,7 @@ type DatabaseProxy interface {
IsFilterInTags(filter string) bool
IsDown() bool
GetProxyConfig() string
GetJanitorWeight() string
// GetInitContainer(collector opensvc.Collector) string
GetBindAddress() string
GetBindAddressExtraIPV6() string
Expand Down
13 changes: 11 additions & 2 deletions cluster/prx_consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@ import (
"github.com/micro/go-micro/registry"
"github.com/signal18/replication-manager/config"
"github.com/signal18/replication-manager/utils/misc"
"github.com/spf13/pflag"
)

type ConsulProxy struct {
Proxy
}

func (proxy *ConsulProxy) AddFlags(flags *pflag.FlagSet, conf *config.Config) {
flags.BoolVar(&conf.RegistryConsul, "registry-consul", false, "Register write and read SRV DNS to consul")
flags.StringVar(&conf.RegistryConsulCredential, "registry-consul-credential", ":", "Consul credential user:password")
flags.StringVar(&conf.RegistryConsulToken, "registry-consul-token", "", "Consul Token")
flags.StringVar(&conf.RegistryConsulHosts, "registry-servers", "127.0.0.1", "Comma-separated list of registry addresses")
flags.StringVar(&conf.RegistryConsulJanitorWeights, "registry-consul-weights", "100", "Weight of each proxysql host inside janitor proxy")
}

func NewConsulProxy(placement int, cluster *Cluster, proxyHost string) *ConsulProxy {
conf := cluster.Conf
prx := new(ConsulProxy)
Expand All @@ -36,7 +45,7 @@ func NewConsulProxy(placement int, cluster *Cluster, proxyHost string) *ConsulPr
prx.WritePort, _ = strconv.Atoi(conf.ProxysqlPort)
prx.ReadPort, _ = strconv.Atoi(conf.ProxysqlPort)

prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSProxySQLPartitions, conf.ProxysqlHostsIPV6)
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSProxySQLPartitions, conf.ProxysqlHostsIPV6, conf.RegistryConsulJanitorWeights)

if conf.ProvNetCNI {
if conf.ClusterHead == "" {
Expand All @@ -58,7 +67,7 @@ func (proxy *ConsulProxy) Init() {
if cluster.Conf.RegistryConsul == false || cluster.IsActive() == false {
return
}
opt.Addrs = strings.Split(cluster.Conf.RegistryHosts, ",")
opt.Addrs = strings.Split(cluster.Conf.RegistryConsulHosts, ",")
//DefaultRegistry()
//opt := registry.DefaultRegistry
reg := registry.NewRegistry()
Expand Down
4 changes: 4 additions & 0 deletions cluster/prx_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (prx *Proxy) GetClusterConnection() (*sqlx.DB, error) {

}

func (proxy *Proxy) GetJanitorWeight() string {
return proxy.Weight
}

func (proxy *Proxy) GetProxyConfig() string {
proxy.ClusterGroup.LogPrintf(LvlInfo, "Proxy Config generation "+proxy.Datadir+"/config.tar.gz")
err := proxy.ClusterGroup.Configurator.GenerateProxyConfig(proxy.Datadir, proxy.ClusterGroup.Conf.WorkingDir+"/"+proxy.ClusterGroup.Name, proxy.GetEnv())
Expand Down
3 changes: 2 additions & 1 deletion cluster/prx_haproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type HaproxyProxy struct {
func NewHaproxyProxy(placement int, cluster *Cluster, proxyHost string) *HaproxyProxy {
conf := cluster.Conf
prx := new(HaproxyProxy)
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSHaProxyPartitions, conf.HaproxyHostsIPV6)
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSHaProxyPartitions, conf.HaproxyHostsIPV6, conf.HaproxyJanitorWeights)
prx.Type = config.ConstProxyHaproxy
prx.Port = strconv.Itoa(conf.HaproxyAPIPort)
prx.ReadPort = conf.HaproxyReadPort
Expand All @@ -55,6 +55,7 @@ func (proxy *HaproxyProxy) AddFlags(flags *pflag.FlagSet, conf *config.Config) {
flags.StringVar(&conf.HaproxyUser, "haproxy-user", "admin", "Haproxy API user")
flags.StringVar(&conf.HaproxyPassword, "haproxy-password", "admin", "Haproxy API password")
flags.StringVar(&conf.HaproxyHosts, "haproxy-servers", "127.0.0.1", "HaProxy hosts")
flags.StringVar(&conf.HaproxyJanitorWeights, "haproxy-janitor-weights", "100", "Weight of each HaProxy host inside janitor proxy")
flags.IntVar(&conf.HaproxyAPIPort, "haproxy-api-port", 1999, "HaProxy runtime api port")
flags.IntVar(&conf.HaproxyWritePort, "haproxy-write-port", 3306, "HaProxy read-write port to leader")
flags.IntVar(&conf.HaproxyReadPort, "haproxy-read-port", 3307, "HaProxy load balance read port to all nodes")
Expand Down
4 changes: 4 additions & 0 deletions cluster/prx_janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (proxy *ProxyJanitor) Connect() (proxysql.ProxySQL, error) {
Host: proxy.Host,
Port: proxy.Port,
WriterHG: "0",
Weight: "1",
}
var err error
err = psql.Connect()
Expand Down Expand Up @@ -137,6 +138,7 @@ func (proxy *ProxyJanitor) Init() {
}
} else {
//weight string, max_replication_lag string, max_connections string, compression string
psql.Weight = s.GetJanitorWeight()
err = psql.AddServerAsWriter(misc.Unbracket(s.GetHost()), strconv.Itoa(s.GetWritePort()), proxy.UseSSL())

if cluster.Conf.LogLevel > 2 || cluster.Conf.ProxysqlDebug {
Expand Down Expand Up @@ -220,6 +222,7 @@ func (proxy *ProxyJanitor) Refresh() error {
}

if err != nil {
// cluster.LogPrintf(LvlErr, "Backend %s:%s not found error:%s ", misc.Unbracket(s.GetHost()), strconv.Itoa(s.GetWritePort()), err)
isFoundBackendWrite = false
} else {
proxy.BackendsWrite = append(proxy.BackendsWrite, bke)
Expand All @@ -243,6 +246,7 @@ func (proxy *ProxyJanitor) Refresh() error {
}
} else {
//scenario restart with failed leader
psql.Weight = s.GetJanitorWeight()
err = psql.AddServerAsWriter(misc.Unbracket(s.GetHost()), strconv.Itoa(s.GetWritePort()), proxy.UseSSL())
}
updated = true
Expand Down
3 changes: 2 additions & 1 deletion cluster/prx_mariadbshardproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type MariadbShardProxy struct {
func NewMariadbShardProxy(placement int, cluster *Cluster, proxyHost string) *MariadbShardProxy {
conf := cluster.Conf
prx := new(MariadbShardProxy)
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSShardProxyPartitions, conf.MdbsHostsIPV6)
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSShardProxyPartitions, conf.MdbsHostsIPV6, conf.MdbsJanitorWeights)
prx.Type = config.ConstProxySpider
prx.Host, prx.Port = misc.SplitHostPort(proxyHost)
prx.User, prx.Pass = misc.SplitPair(cluster.Conf.GetDecryptedValue("shardproxy-credential"))
Expand Down Expand Up @@ -70,6 +70,7 @@ func NewMariadbShardProxy(placement int, cluster *Cluster, proxyHost string) *Ma
func (proxy *MariadbShardProxy) AddFlags(flags *pflag.FlagSet, conf *config.Config) {
flags.BoolVar(&conf.MdbsProxyOn, "shardproxy", false, "MariaDB Spider proxy")
flags.StringVar(&conf.MdbsProxyHosts, "shardproxy-servers", "127.0.0.1:3307", "MariaDB spider proxy hosts IP:Port,IP:Port")
flags.StringVar(&conf.MdbsJanitorWeights, "shardproxy-janitor-weights", "100", "Weight of each MariaDB spider host inside janitor proxy")
flags.StringVar(&conf.MdbsProxyCredential, "shardproxy-credential", "root:mariadb", "MariaDB spider proxy credential")
flags.BoolVar(&conf.MdbsProxyCopyGrants, "shardproxy-copy-grants", true, "Copy grants from shards master")
flags.BoolVar(&conf.MdbsProxyLoadSystem, "shardproxy-load-system", true, "Load Spider system tables")
Expand Down
3 changes: 2 additions & 1 deletion cluster/prx_maxscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewMaxscaleProxy(placement int, cluster *Cluster, proxyHost string) *Maxsca
conf := cluster.Conf
prx := new(MaxscaleProxy)
prx.Type = config.ConstProxyMaxscale
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSMaxscalePartitions, conf.MxsHostsIPV6)
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSMaxscalePartitions, conf.MxsHostsIPV6, conf.MxsJanitorWeights)
prx.Port = conf.MxsPort
prx.User = conf.MxsUser
prx.Pass = conf.MxsPass
Expand All @@ -58,6 +58,7 @@ func (proxy *MaxscaleProxy) AddFlags(flags *pflag.FlagSet, conf *config.Config)
flags.BoolVar(&conf.MxsDisableMonitor, "maxscale-disable-monitor", false, "Disable maxscale monitoring and fully drive server state")
flags.StringVar(&conf.MxsGetInfoMethod, "maxscale-get-info-method", "maxadmin", "How to get infos from Maxscale maxinfo|maxadmin")
flags.StringVar(&conf.MxsHost, "maxscale-servers", "", "MaxScale hosts ")
flags.StringVar(&conf.MxsJanitorWeights, "maxscale-janitor-weights", "100", "Weight of each MariaDB maxscale inside janitor proxy")
flags.StringVar(&conf.MxsPort, "maxscale-port", "6603", "MaxScale admin port")
flags.StringVar(&conf.MxsUser, "maxscale-user", "admin", "MaxScale admin user")
flags.StringVar(&conf.MxsPass, "maxscale-pass", "mariadb", "MaxScale admin password")
Expand Down
4 changes: 3 additions & 1 deletion cluster/prx_proxysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewProxySQLProxy(placement int, cluster *Cluster, proxyHost string) *ProxyS
prx.WritePort, _ = strconv.Atoi(conf.ProxysqlPort)
prx.ReadPort, _ = strconv.Atoi(conf.ProxysqlPort)

prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSProxySQLPartitions, conf.ProxysqlHostsIPV6)
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSProxySQLPartitions, conf.ProxysqlHostsIPV6, conf.ProxysqlJanitorWeights)

if conf.ProvNetCNI {
if conf.ClusterHead == "" {
Expand All @@ -54,6 +54,7 @@ func (proxy *ProxySQLProxy) AddFlags(flags *pflag.FlagSet, conf *config.Config)
flags.BoolVar(&conf.ProxysqlSaveToDisk, "proxysql-save-to-disk", false, "Save proxysql change to sqllight")
flags.StringVar(&conf.ProxysqlHosts, "proxysql-servers", "", "ProxySQL hosts")
flags.StringVar(&conf.ProxysqlHostsIPV6, "proxysql-servers-ipv6", "", "ProxySQL extra IPV6 bind for interfaces")
flags.StringVar(&conf.ProxysqlJanitorWeights, "proxysql-janitor-weights", "100", "Weight of each proxysql host inside janitor proxy")
flags.StringVar(&conf.ProxysqlPort, "proxysql-port", "3306", "ProxySQL read/write proxy port")
flags.StringVar(&conf.ProxysqlAdminPort, "proxysql-admin-port", "6032", "ProxySQL admin interface port")
flags.StringVar(&conf.ProxysqlReaderHostgroup, "proxysql-reader-hostgroup", "1", "ProxySQL reader hostgroup")
Expand All @@ -77,6 +78,7 @@ func (proxy *ProxySQLProxy) Connect() (proxysql.ProxySQL, error) {
Port: proxy.Port,
WriterHG: fmt.Sprintf("%d", proxy.WriterHostgroup),
ReaderHG: fmt.Sprintf("%d", proxy.ReaderHostgroup),
Weight: proxy.Weight,
}

var err error
Expand Down
7 changes: 6 additions & 1 deletion cluster/prx_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,21 @@ func (proxy *Proxy) SetServiceName(namespace string) {
proxy.ServiceName = namespace + "/svc/" + proxy.Name
}

func (proxy *Proxy) SetPlacement(k int, ProvAgents string, SlapOSDBPartitions string, ProxysqlHostsIPV6 string) {
func (proxy *Proxy) SetPlacement(k int, ProvAgents string, SlapOSDBPartitions string, ProxysqlHostsIPV6 string, Weights string) {
slapospartitions := strings.Split(SlapOSDBPartitions, ",")
agents := strings.Split(ProvAgents, ",")
ipv6hosts := strings.Split(ProxysqlHostsIPV6, ",")
weights := strings.Split(Weights, ",")
if k < len(slapospartitions) {
proxy.SlapOSDatadir = slapospartitions[k]
}
if ProvAgents != "" {
proxy.Agent = agents[k%len(agents)]
}
if Weights != "" {
proxy.Weight = weights[k%len(weights)]
}

if k < len(ipv6hosts) {
proxy.HostIPV6 = ipv6hosts[k]
}
Expand Down
3 changes: 2 additions & 1 deletion cluster/prx_sphinx.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type SphinxProxy struct {
func NewSphinxProxy(placement int, cluster *Cluster, proxyHost string) *SphinxProxy {
conf := cluster.Conf
prx := new(SphinxProxy)
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSSphinxPartitions, conf.SphinxHostsIPV6)
prx.SetPlacement(placement, conf.ProvProxAgents, conf.SlapOSSphinxPartitions, conf.SphinxHostsIPV6, conf.SphinxJanitorWeights)
prx.Type = config.ConstProxySphinx

prx.Port = conf.SphinxQLPort
Expand All @@ -49,6 +49,7 @@ func NewSphinxProxy(placement int, cluster *Cluster, proxyHost string) *SphinxPr
func (proxy *SphinxProxy) AddFlags(flags *pflag.FlagSet, conf *config.Config) {
flags.BoolVar(&conf.SphinxOn, "sphinx", false, "Turn on SphinxSearch detection")
flags.StringVar(&conf.SphinxHosts, "sphinx-servers", "127.0.0.1", "SphinxSearch hosts")
flags.StringVar(&conf.SphinxJanitorWeights, "sphinx-janitor-weights", "100", "Weight of each Sphinx host inside janitor proxy")
flags.StringVar(&conf.SphinxPort, "sphinx-port", "9312", "SphinxSearch API port")
flags.StringVar(&conf.SphinxQLPort, "sphinx-sql-port", "9306", "SphinxSearch SQL port")
if runtime.GOOS == "linux" {
Expand Down
Loading

0 comments on commit 4226940

Please sign in to comment.