diff --git a/README.md b/README.md index 0fc3e08b7..eb29a54c4 100644 --- a/README.md +++ b/README.md @@ -120,23 +120,8 @@ This command exectutes all tests in the repository except for consul integration those tests). We also skip directories that don't contain any tests. ``` -go test `go list ./... | grep -v consul` | grep -v 'no test files' +go test `go list ./... | grep -v "/_examples/" | grep -v "/persistence" | grep -v "/scheduler"` ``` - -If everything is ok, you will get the output: - -``` -ok github.com/asynkron/protoactor-go/actor 0.115s -ok github.com/asynkron/protoactor-go/eventstream 0.020s -ok github.com/asynkron/protoactor-go/internal/queue/goring 2.524s -ok github.com/asynkron/protoactor-go/internal/queue/mpsc 2.385s -ok github.com/asynkron/protoactor-go/log 0.017s -ok github.com/asynkron/protoactor-go/mailbox 2.742s -ok github.com/asynkron/protoactor-go/plugin 1.227s -ok github.com/asynkron/protoactor-go/router 1.836s -ok github.com/asynkron/protoactor-go/stream 0.017s -``` - ## Hello world ```go @@ -412,4 +397,4 @@ Our awesome sponsors: -Made with [contributors-img](https://contributors-img.web.app). +Made with [contributors-img](https://contributors-img.web.app). diff --git a/cluster/clusterproviders/zk/utils.go b/cluster/clusterproviders/zk/utils.go index 8a55932f5..ea476bdf9 100644 --- a/cluster/clusterproviders/zk/utils.go +++ b/cluster/clusterproviders/zk/utils.go @@ -69,3 +69,15 @@ func getRunTimeStack() []byte { buf := make([]byte, size) return buf[:runtime.Stack(buf, false)] } + +func getParentDir(path string) string { + parent := path[:strings.LastIndex(path, "/")] + if parent == "" { + return "/" + } + return parent +} + +func joinPath(paths ...string) string { + return strings.Join(paths, "/") +} diff --git a/cluster/clusterproviders/zk/zk_provider.go b/cluster/clusterproviders/zk/zk_provider.go index dbb3492b8..6cb6eae93 100644 --- a/cluster/clusterproviders/zk/zk_provider.go +++ b/cluster/clusterproviders/zk/zk_provider.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net" - "path/filepath" "strconv" "strings" "time" @@ -34,6 +33,7 @@ type Provider struct { cluster *cluster.Cluster baseKey string clusterName string + clusterKey string deregistered bool shutdown bool self *Node @@ -57,6 +57,7 @@ func New(endpoints []string, opts ...Option) (*Provider, error) { p := &Provider{ cluster: &cluster.Cluster{}, baseKey: zkCfg.BaseKey, + clusterKey: "", clusterName: "", deregistered: false, shutdown: false, @@ -98,12 +99,13 @@ func (p *Provider) init(c *cluster.Cluster) error { p.cluster = c p.clusterName = p.cluster.Config.Name + p.clusterKey = joinPath(p.baseKey, p.clusterName) knownKinds := c.GetClusterKinds() nodeName := fmt.Sprintf("%v@%v:%v", p.clusterName, host, port) p.self = NewNode(nodeName, host, port, knownKinds) p.self.SetMeta(metaKeyID, p.getID()) - if err = p.createClusterNode(p.getClusterKey()); err != nil { + if err = p.createClusterNode(p.clusterKey); err != nil { return err } return nil @@ -173,10 +175,6 @@ func (p *Provider) getID() string { return p.self.ID } -func (p *Provider) getClusterKey() string { - return p.buildKey(p.clusterName) -} - func (p *Provider) registerService() error { data, err := p.self.Serialize() if err != nil { @@ -184,9 +182,9 @@ func (p *Provider) registerService() error { return err } - path, err := p.createEphemeralChildNode(p.getClusterKey(), data) + path, err := p.createEphemeralChildNode(data) if err != nil { - plog.Error("createEphemeralChildNode fail.", log.String("node", p.getClusterKey()), log.Error(err)) + plog.Error("createEphemeralChildNode fail.", log.String("node", p.clusterKey), log.Error(err)) return err } p.fullpath = path @@ -209,7 +207,7 @@ func (p *Provider) createClusterNode(dir string) error { if exist { return nil } - if err = p.createClusterNode(filepath.Dir(dir)); err != nil { + if err = p.createClusterNode(getParentDir(dir)); err != nil { return err } if _, err = p.conn.Create(dir, []byte{}, 0, zk.WorldACL(zk.PermAll)); err != nil { @@ -229,10 +227,9 @@ func (p *Provider) deregisterService() error { } func (p *Provider) keepWatching(ctx context.Context, registerSelf bool) error { - clusterKey := p.buildKey(p.clusterName) - evtChan, err := p.addWatcher(ctx, clusterKey) + evtChan, err := p.addWatcher(ctx, p.clusterKey) if err != nil { - plog.Error("list children fail", log.String("node", clusterKey), log.Error(err)) + plog.Error("list children fail", log.String("node", p.clusterKey), log.Error(err)) return err } @@ -305,10 +302,9 @@ func (p *Provider) _keepWatching(registerSelf bool, stream <-chan zk.Event) erro } func (p *Provider) clusterNotContainsSelfPath() bool { - clusterKey := p.buildKey(p.clusterName) - children, _, err := p.conn.Children(clusterKey) + children, _, err := p.conn.Children(p.clusterKey) return err == nil && !stringContains(mapString(children, func(s string) string { - return filepath.Join(clusterKey, s) + return joinPath(p.clusterKey, s) }), p.fullpath) } @@ -395,21 +391,16 @@ func (p *Provider) GetHealthStatus() error { return p.clusterError } -func (p *Provider) buildKey(names ...string) string { - return filepath.Join(append([]string{p.baseKey}, names...)...) -} - func (p *Provider) fetchNodes() ([]*Node, int32, error) { - key := p.buildKey(p.clusterName) - children, stat, err := p.conn.Children(key) + children, stat, err := p.conn.Children(p.clusterKey) if err != nil { - plog.Error("FetchNodes fail.", log.String("node", key), log.Error(err)) + plog.Error("FetchNodes fail.", log.String("node", p.clusterKey), log.Error(err)) return nil, 0, err } var nodes []*Node for _, short := range children { - long := filepath.Join(key, short) + long := joinPath(p.clusterKey, short) value, _, err := p.conn.Get(long) if err != nil { plog.Error("FetchNodes fail.", log.String("node", long), log.Error(err)) @@ -496,16 +487,16 @@ func splitHostPort(addr string) (host string, port int, err error) { return } -func (pro *Provider) createEphemeralChildNode(baseKey string, data []byte) (string, error) { +func (pro *Provider) createEphemeralChildNode(data []byte) (string, error) { acl := zk.WorldACL(zk.PermAll) - prefix := fmt.Sprintf("%s/actor-", baseKey) + prefix := joinPath(pro.clusterKey, "actor-") path := "" var err error for i := 0; i < 3; i++ { path, err = pro.conn.CreateProtectedEphemeralSequential(prefix, data, acl) if err == zk.ErrNoNode { // Create parent node. - parts := strings.Split(baseKey, "/") + parts := strings.Split(pro.clusterKey, "/") pth := "" for _, p := range parts[1:] { var exists bool diff --git a/cluster/clusterproviders/zk/zk_provider_test.go b/cluster/clusterproviders/zk/zk_provider_test.go index 749a5a1d8..ebcae4492 100644 --- a/cluster/clusterproviders/zk/zk_provider_test.go +++ b/cluster/clusterproviders/zk/zk_provider_test.go @@ -63,13 +63,16 @@ func (suite *ZookeeperTestSuite) TestMultiNodes() { helloKind := cluster.NewKind("hello", props) name := `cluster1` - c := suite.start(name, cluster.WithKinds(helloKind)) - defer c.Shutdown() c1 := suite.start(name, cluster.WithKinds(helloKind)) defer c1.Shutdown() - c.Cluster.Get(`a1`, `hello`) - c1.Cluster.Get(`a2`, `hello`) + c2 := suite.start(name, cluster.WithKinds(helloKind)) + defer c2.Shutdown() + c1.Cluster.Get(`a1`, `hello`) + c2.Cluster.Get(`a2`, `hello`) for actorCount != 2 { time.Sleep(time.Microsecond * 5) } + suite.Assert().Equal(2, c1.Cluster.MemberList.Members().Len(), "Expected 2 members in the cluster") + suite.Assert().Equal(2, c2.Cluster.MemberList.Members().Len(), "Expected 2 members in the cluster") + }