Skip to content

Commit

Permalink
Deploying to dev from @ asynkron/protoactor-go@6f33251 🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed May 28, 2022
1 parent 2d8b3af commit d2268c2
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 38 deletions.
19 changes: 17 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,23 @@ This command exectutes all tests in the repository except for consul integration
those tests). We also skip directories that don't contain any tests.

```
go test `go list ./... | grep -v "/_examples/" | grep -v "/persistence" | grep -v "/scheduler"`
go test `go list ./... | grep -v consul` | grep -v 'no test files'
```

If everything is ok, you will get the output:

```
ok github.com/asynkron/protoactor-go/actor 0.115s
ok github.com/asynkron/protoactor-go/eventstream 0.020s
ok github.com/asynkron/protoactor-go/internal/queue/goring 2.524s
ok github.com/asynkron/protoactor-go/internal/queue/mpsc 2.385s
ok github.com/asynkron/protoactor-go/log 0.017s
ok github.com/asynkron/protoactor-go/mailbox 2.742s
ok github.com/asynkron/protoactor-go/plugin 1.227s
ok github.com/asynkron/protoactor-go/router 1.836s
ok github.com/asynkron/protoactor-go/stream 0.017s
```

## Hello world

```go
Expand Down Expand Up @@ -397,4 +412,4 @@ Our awesome sponsors:
<img src="https://contributors-img.web.app/image?repo=asynkron/protoactor-go" />
</a>

Made with [contributors-img](https://contributors-img.web.app).
Made with [contributors-img](https://contributors-img.web.app).
12 changes: 0 additions & 12 deletions cluster/clusterproviders/zk/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,3 @@ func getRunTimeStack() []byte {
buf := make([]byte, size)
return buf[:runtime.Stack(buf, false)]
}

func getParentDir(path string) string {
parent := path[:strings.LastIndex(path, "/")]
if parent == "" {
return "/"
}
return parent
}

func joinPath(paths ...string) string {
return strings.Join(paths, "/")
}
43 changes: 26 additions & 17 deletions cluster/clusterproviders/zk/zk_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"path/filepath"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -33,7 +34,6 @@ type Provider struct {
cluster *cluster.Cluster
baseKey string
clusterName string
clusterKey string
deregistered bool
shutdown bool
self *Node
Expand All @@ -57,7 +57,6 @@ func New(endpoints []string, opts ...Option) (*Provider, error) {
p := &Provider{
cluster: &cluster.Cluster{},
baseKey: zkCfg.BaseKey,
clusterKey: "",
clusterName: "",
deregistered: false,
shutdown: false,
Expand Down Expand Up @@ -99,13 +98,12 @@ func (p *Provider) init(c *cluster.Cluster) error {

p.cluster = c
p.clusterName = p.cluster.Config.Name
p.clusterKey = joinPath(p.baseKey, p.clusterName)
knownKinds := c.GetClusterKinds()
nodeName := fmt.Sprintf("%v@%v:%v", p.clusterName, host, port)
p.self = NewNode(nodeName, host, port, knownKinds)
p.self.SetMeta(metaKeyID, p.getID())

if err = p.createClusterNode(p.clusterKey); err != nil {
if err = p.createClusterNode(p.getClusterKey()); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -175,16 +173,20 @@ func (p *Provider) getID() string {
return p.self.ID
}

func (p *Provider) getClusterKey() string {
return p.buildKey(p.clusterName)
}

func (p *Provider) registerService() error {
data, err := p.self.Serialize()
if err != nil {
plog.Error("registerService Serialize fail.", log.Error(err))
return err
}

path, err := p.createEphemeralChildNode(data)
path, err := p.createEphemeralChildNode(p.getClusterKey(), data)
if err != nil {
plog.Error("createEphemeralChildNode fail.", log.String("node", p.clusterKey), log.Error(err))
plog.Error("createEphemeralChildNode fail.", log.String("node", p.getClusterKey()), log.Error(err))
return err
}
p.fullpath = path
Expand All @@ -207,7 +209,7 @@ func (p *Provider) createClusterNode(dir string) error {
if exist {
return nil
}
if err = p.createClusterNode(getParentDir(dir)); err != nil {
if err = p.createClusterNode(filepath.Dir(dir)); err != nil {
return err
}
if _, err = p.conn.Create(dir, []byte{}, 0, zk.WorldACL(zk.PermAll)); err != nil {
Expand All @@ -227,9 +229,10 @@ func (p *Provider) deregisterService() error {
}

func (p *Provider) keepWatching(ctx context.Context, registerSelf bool) error {
evtChan, err := p.addWatcher(ctx, p.clusterKey)
clusterKey := p.buildKey(p.clusterName)
evtChan, err := p.addWatcher(ctx, clusterKey)
if err != nil {
plog.Error("list children fail", log.String("node", p.clusterKey), log.Error(err))
plog.Error("list children fail", log.String("node", clusterKey), log.Error(err))
return err
}

Expand Down Expand Up @@ -302,9 +305,10 @@ func (p *Provider) _keepWatching(registerSelf bool, stream <-chan zk.Event) erro
}

func (p *Provider) clusterNotContainsSelfPath() bool {
children, _, err := p.conn.Children(p.clusterKey)
clusterKey := p.buildKey(p.clusterName)
children, _, err := p.conn.Children(clusterKey)
return err == nil && !stringContains(mapString(children, func(s string) string {
return joinPath(p.clusterKey, s)
return filepath.Join(clusterKey, s)
}), p.fullpath)
}

Expand Down Expand Up @@ -391,16 +395,21 @@ func (p *Provider) GetHealthStatus() error {
return p.clusterError
}

func (p *Provider) buildKey(names ...string) string {
return filepath.Join(append([]string{p.baseKey}, names...)...)
}

func (p *Provider) fetchNodes() ([]*Node, int32, error) {
children, stat, err := p.conn.Children(p.clusterKey)
key := p.buildKey(p.clusterName)
children, stat, err := p.conn.Children(key)
if err != nil {
plog.Error("FetchNodes fail.", log.String("node", p.clusterKey), log.Error(err))
plog.Error("FetchNodes fail.", log.String("node", key), log.Error(err))
return nil, 0, err
}

var nodes []*Node
for _, short := range children {
long := joinPath(p.clusterKey, short)
long := filepath.Join(key, short)
value, _, err := p.conn.Get(long)
if err != nil {
plog.Error("FetchNodes fail.", log.String("node", long), log.Error(err))
Expand Down Expand Up @@ -487,16 +496,16 @@ func splitHostPort(addr string) (host string, port int, err error) {
return
}

func (pro *Provider) createEphemeralChildNode(data []byte) (string, error) {
func (pro *Provider) createEphemeralChildNode(baseKey string, data []byte) (string, error) {
acl := zk.WorldACL(zk.PermAll)
prefix := joinPath(pro.clusterKey, "actor-")
prefix := fmt.Sprintf("%s/actor-", baseKey)
path := ""
var err error
for i := 0; i < 3; i++ {
path, err = pro.conn.CreateProtectedEphemeralSequential(prefix, data, acl)
if err == zk.ErrNoNode {
// Create parent node.
parts := strings.Split(pro.clusterKey, "/")
parts := strings.Split(baseKey, "/")
pth := ""
for _, p := range parts[1:] {
var exists bool
Expand Down
11 changes: 4 additions & 7 deletions cluster/clusterproviders/zk/zk_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,13 @@ func (suite *ZookeeperTestSuite) TestMultiNodes() {
helloKind := cluster.NewKind("hello", props)

name := `cluster1`
c := suite.start(name, cluster.WithKinds(helloKind))
defer c.Shutdown()
c1 := suite.start(name, cluster.WithKinds(helloKind))
defer c1.Shutdown()
c2 := suite.start(name, cluster.WithKinds(helloKind))
defer c2.Shutdown()
c1.Cluster.Get(`a1`, `hello`)
c2.Cluster.Get(`a2`, `hello`)
c.Cluster.Get(`a1`, `hello`)
c1.Cluster.Get(`a2`, `hello`)
for actorCount != 2 {
time.Sleep(time.Microsecond * 5)
}
suite.Assert().Equal(2, c1.Cluster.MemberList.Members().Len(), "Expected 2 members in the cluster")
suite.Assert().Equal(2, c2.Cluster.MemberList.Members().Len(), "Expected 2 members in the cluster")

}

0 comments on commit d2268c2

Please sign in to comment.