Skip to content

Commit

Permalink
Update to go-ma-actor v0.3.0
Browse files Browse the repository at this point in the history
Also make room for having a room here directly, not
just a node.
  • Loading branch information
bahner committed Mar 17, 2024
1 parent c0bb43c commit 2c04540
Show file tree
Hide file tree
Showing 21 changed files with 447 additions and 251 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ go.work
# Binaries created by go build ., ie the module name.
go-space
.*.log
go-space-node
launch.json
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
NAME = go-space
MODULE_NAME = github.com/bahner/go-space
VAULT_TOKEN ?= space
NODE = $(NAME)-node

GO ?= go
PREFIX ?= /usr/local
Expand All @@ -22,11 +23,13 @@ go.mod:
tidy: go.mod
$(GO) mod tidy

$(NAME): tidy
$(GO) build -o $(NAME)
node: $(NODE)

$(NODE): tidy
$(GO) build -o $(NODE) ./cmd/node

clean:
rm -f $(NAME)
rm -f $(NAME) $(NODE)

console:
docker-compose up -d
Expand Down
40 changes: 40 additions & 0 deletions cmd/node/actor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
"fmt"

"github.com/bahner/go-ma-actor/config"
"github.com/bahner/go-ma-actor/entity"
"github.com/bahner/go-ma-actor/entity/actor"
log "github.com/sirupsen/logrus"
)

func initActorOrPanic() *actor.Actor {
// The actor is needed for initialisation of the WebHandler.
fmt.Println("Creating actor from keyset...")
a, err := actor.NewFromKeyset(config.ActorKeyset())
if err != nil {
log.Debugf("error creating actor: %s", err)
}

id := a.Entity.DID.Id

fmt.Println("Creating and setting DID Document for actor...")
err = a.CreateAndSetDocument(id)
if err != nil {
panic(fmt.Sprintf("error creating document: %s", err))
}

// Better safe than sorry.
// Without a valid actor, we can't do anything.
if a == nil || a.Verify() != nil {
panic(fmt.Sprintf("%s is not a valid actor: %v", id, err))
}

_, err = entity.GetOrCreateFromDID(a.Entity.DID, false)
if err != nil {
panic(fmt.Sprintf("error getting or creating entity: %s", err))
}

return a
}
File renamed without changes.
File renamed without changes.
31 changes: 31 additions & 0 deletions cmd/node/dht.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"fmt"

"github.com/bahner/go-ma-actor/config"
"github.com/bahner/go-ma-actor/p2p"
"github.com/bahner/go-ma-actor/p2p/connmgr"
"github.com/bahner/go-ma-actor/p2p/node"
"github.com/libp2p/go-libp2p"
)

func DHT(cg *connmgr.ConnectionGater) (*p2p.DHT, error) {

// THese are the relay specific parts.
p2pOpts := []libp2p.Option{
libp2p.ConnectionGater(cg),
}

n, err := node.New(config.NodeIdentity(), p2pOpts...)
if err != nil {
return nil, fmt.Errorf("pong: failed to create libp2p node: %w", err)
}

d, err := p2p.NewDHT(n, cg)
if err != nil {
return nil, fmt.Errorf("pong: failed to create DHT: %w", err)
}

return d, nil
}
14 changes: 7 additions & 7 deletions entity.go → cmd/node/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"sync"

"github.com/bahner/go-ma-actor/entity"
"github.com/bahner/go-ma-actor/entity/actor"
"github.com/bahner/go-ma/did/doc"
log "github.com/sirupsen/logrus"
)
Expand All @@ -21,12 +21,12 @@ func init() {

// GetOrCreateEntity returns an entity from the cache or creates a new one
// The id is just the uniqgue name of the calling entity, not the full DID
func getOrCreateEntity(id string) (*entity.Entity, error) {
func getOrCreateEntity(id string) (*actor.Actor, error) {

// Attempt to retrieve the entity from cache.
// This is runtime, so entities will be generated at least once.
if cachedEntity, ok := entities.Get(id); ok {
if entity, ok := cachedEntity.(*entity.Entity); ok {
if entity, ok := cachedEntity.(*actor.Actor); ok {
log.Debugf("found topic: %s in entities cache.", id)
return entity, nil // Successfully type-asserted and returned
}
Expand All @@ -39,21 +39,21 @@ func getOrCreateEntity(id string) (*entity.Entity, error) {
return nil, fmt.Errorf("failed to get or create keyset: %w", err)
}

// Assuming entity.NewFromKeyset returns *entity.Entity
e, err := entity.NewFromKeyset(k, id)
// Assuming entity.NewFromKeyset returns *actor.Actor
e, err := actor.NewFromKeyset(k)
if err != nil {
return nil, fmt.Errorf("failed to create entity: %w", err)
}

err = e.CreateDocument(e.DID.String())
e.Entity.Doc, err = e.CreateDocument(e.Entity.DID.Id)
if err != nil {
return nil, fmt.Errorf("failed to create DID Document: %w", err)
}

// Force publication of document.
o := doc.DefaultPublishOptions()
o.Force = true
e.Doc.Publish(o)
e.Entity.Doc.Publish(o)

// Cache the newly created entity for future retrievals
entities.Set(id, e)
Expand Down
16 changes: 9 additions & 7 deletions envelopes.go → cmd/node/envelopes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,27 @@ import (

func (s *Subscription) handleEnvelopesLoop(ctx context.Context) {

log.Debugf("Starting subscription envelope handling loop for topic: %s", s.entity.Topic.String())
log.Debugf("Reading envelopes from: %v", s.entity.Envelopes)
t := s.actor.Entity.Topic.String()

log.Debugf("Starting subscription envelope handling loop for topic: %s", t)
log.Debugf("Reading envelopes from: %v", s.actor.Envelopes)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

for {
select {
case <-ctx.Done():
log.Infof("Context for %s closed.", s.entity.Topic.String())
log.Infof("Context for %s closed.", t)
return
default:
envelope, ok := <-s.entity.Envelopes
envelope, ok := <-s.actor.Envelopes
if !ok {
log.Infof("subscriptionHandleEnvelopesLoop: Envelopes channel for %s closed.", s.entity.Topic.String())
log.Infof("subscriptionHandleEnvelopesLoop: Envelopes channel for %s closed.", t)
return
}
log.Debugf("subscriptionHandleEnvelopesLoop: Received envelope: %s", envelope)
msg, err := envelope.Open(s.entity.Keyset.EncryptionKey.PrivKey[:])
msg, err := envelope.Open(s.actor.Keyset.EncryptionKey.PrivKey[:])
if err != nil {
log.Errorf("subscriptionHandleEnvelopesLoop: Error opening envelope: %s", err)
continue
Expand All @@ -38,7 +40,7 @@ func (s *Subscription) handleEnvelopesLoop(ctx context.Context) {
}

log.Debugf("subscriptionHandleEnvelopesLoop: open envelope and found: %v", msg)
s.entity.Messages <- msg
s.actor.Entity.Messages <- msg
}
}
}
55 changes: 55 additions & 0 deletions cmd/node/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"errors"
"os"

"github.com/bahner/go-ma-actor/config"
"github.com/bahner/go-ma/did/doc"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
)

const name = "node"

func initConfig() {

// Always parse the flags first
config.InitCommonFlags()
config.InitActorFlags()
pflag.Parse()
config.SetProfile(name)
config.Init()

if config.GenerateFlag() {
// Reinit logging to STDOUT
log.SetOutput(os.Stdout)
log.Info("Generating new actor and node identity")
actor, node := generateActorIdentitiesOrPanic(config.Profile())
actorConfig := configTemplate(actor, node)
config.Generate(actorConfig)
os.Exit(0)
}

// At this point an actor *must* be initialized
config.InitActor()

// This flag is dependent on the actor to be initialized to make sense.
if config.ShowConfigFlag() {
config.Print()
os.Exit(0)
}

}

func generateActorIdentitiesOrPanic(name string) (string, string) {
actor, node, err := config.GenerateActorIdentities(name)
if err != nil {
if errors.Is(err, doc.ErrAlreadyPublished) {
log.Warnf("Actor document already published: %v", err)
} else {
log.Fatal(err)
}
}
return actor, node
}
31 changes: 31 additions & 0 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"context"

log "github.com/sirupsen/logrus"
)

func main() {

ctx := context.Background()

// Init config and logger
initConfig()

p, err := initP2P()
if err != nil {
log.Fatalf("Error initialising P2P: %v", err)
}

// Init of actor requires P2P to be initialized
a := initActorOrPanic()

go p.StartDiscoveryLoop(ctx)
go a.Subscribe(ctx, a.Entity)

// Start application
StartApplication(p)

select {}
}
13 changes: 8 additions & 5 deletions messages.go → cmd/node/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,24 @@ import (

func (s *Subscription) handleMessagesLoop(ctx context.Context) {

log.Debugf("Starting subscription message handling loop for topic: %s", s.entity.Topic.String())
log.Debugf("Reading messages from: %v", s.entity.Messages)
t := s.actor.Entity.Topic.String()
messages := s.actor.Entity.Messages

log.Debugf("Starting subscription message handling loop for topic: %s", t)
log.Debugf("Reading messages from: %v", messages)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

for {
select {
case <-ctx.Done():
log.Infof("Context for %s closed.", s.entity.Topic.String())
log.Infof("Context for %s closed.", t)
return
default:
message, ok := <-s.entity.Messages
message, ok := <-messages
if !ok {
log.Infof("Messages channel for %s closed.", s.entity.Topic.String())
log.Infof("Messages channel for %s closed.", t)
return
}

Expand Down
File renamed without changes.
25 changes: 25 additions & 0 deletions cmd/node/p2p.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package main

import (
"fmt"

"github.com/bahner/go-ma-actor/p2p"
"github.com/bahner/go-ma-actor/p2p/connmgr"
)

func initP2P() (P2P *p2p.P2P, err error) {
fmt.Println("Initialising libp2p...")

// Everyone needs a connection manager.
cm, err := connmgr.Init()
if err != nil {
panic(fmt.Errorf("pong: failed to create connection manager: %w", err))
}
cg := connmgr.NewConnectionGater(cm)

d, err := DHT(cg)
if err != nil {
panic(fmt.Sprintf("failed to initialize dht: %v", err))
}
return p2p.Init(d)
}
File renamed without changes.
Loading

0 comments on commit 2c04540

Please sign in to comment.