Skip to content

Commit

Permalink
Update to go-ma-actor v0.3.0
Browse files Browse the repository at this point in the history
ALso make room for having a room here directly, not
just a node.
  • Loading branch information
bahner committed Mar 17, 2024
1 parent 31b7625 commit 51b3757
Show file tree
Hide file tree
Showing 14 changed files with 211 additions and 233 deletions.
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
NAME = go-space
MODULE_NAME = github.com/bahner/go-space
VAULT_TOKEN ?= space
NODE = $(NAME)-node

GO ?= go
PREFIX ?= /usr/local
Expand All @@ -22,11 +23,13 @@ go.mod:
tidy: go.mod
$(GO) mod tidy

$(NAME): tidy
$(GO) build -o $(NAME)
node: $(NODE)

$(NODE): tidy
$(GO) build -o $(NODE) ./cmd/node

clean:
rm -f $(NAME)
rm -f $(NAME) $(NODE)

console:
docker-compose up -d
Expand Down
File renamed without changes.
File renamed without changes.
14 changes: 7 additions & 7 deletions entity.go → cmd/node/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"sync"

"github.com/bahner/go-ma-actor/entity"
"github.com/bahner/go-ma-actor/entity/actor"
"github.com/bahner/go-ma/did/doc"
log "github.com/sirupsen/logrus"
)
Expand All @@ -21,12 +21,12 @@ func init() {

// GetOrCreateEntity returns an entity from the cache or creates a new one
// The id is just the uniqgue name of the calling entity, not the full DID
func getOrCreateEntity(id string) (*entity.Entity, error) {
func getOrCreateEntity(id string) (*actor.Actor, error) {

// Attempt to retrieve the entity from cache.
// This is runtime, so entities will be generated at least once.
if cachedEntity, ok := entities.Get(id); ok {
if entity, ok := cachedEntity.(*entity.Entity); ok {
if entity, ok := cachedEntity.(*actor.Actor); ok {
log.Debugf("found topic: %s in entities cache.", id)
return entity, nil // Successfully type-asserted and returned
}
Expand All @@ -39,21 +39,21 @@ func getOrCreateEntity(id string) (*entity.Entity, error) {
return nil, fmt.Errorf("failed to get or create keyset: %w", err)
}

// Assuming entity.NewFromKeyset returns *entity.Entity
e, err := entity.NewFromKeyset(k, id)
// Assuming entity.NewFromKeyset returns *actor.Actor
e, err := actor.NewFromKeyset(k)
if err != nil {
return nil, fmt.Errorf("failed to create entity: %w", err)
}

err = e.CreateDocument(e.DID.String())
e.Entity.Doc, err = e.CreateDocument(e.Entity.DID.Id)
if err != nil {
return nil, fmt.Errorf("failed to create DID Document: %w", err)
}

// Force publication of document.
o := doc.DefaultPublishOptions()
o.Force = true
e.Doc.Publish(o)
e.Entity.Doc.Publish(o)

// Cache the newly created entity for future retrievals
entities.Set(id, e)
Expand Down
16 changes: 9 additions & 7 deletions envelopes.go → cmd/node/envelopes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,27 @@ import (

func (s *Subscription) handleEnvelopesLoop(ctx context.Context) {

log.Debugf("Starting subscription envelope handling loop for topic: %s", s.entity.Topic.String())
log.Debugf("Reading envelopes from: %v", s.entity.Envelopes)
t := s.actor.Entity.Topic.String()

log.Debugf("Starting subscription envelope handling loop for topic: %s", t)
log.Debugf("Reading envelopes from: %v", s.actor.Envelopes)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

for {
select {
case <-ctx.Done():
log.Infof("Context for %s closed.", s.entity.Topic.String())
log.Infof("Context for %s closed.", t)
return
default:
envelope, ok := <-s.entity.Envelopes
envelope, ok := <-s.actor.Envelopes
if !ok {
log.Infof("subscriptionHandleEnvelopesLoop: Envelopes channel for %s closed.", s.entity.Topic.String())
log.Infof("subscriptionHandleEnvelopesLoop: Envelopes channel for %s closed.", t)
return
}
log.Debugf("subscriptionHandleEnvelopesLoop: Received envelope: %s", envelope)
msg, err := envelope.Open(s.entity.Keyset.EncryptionKey.PrivKey[:])
msg, err := envelope.Open(s.actor.Keyset.EncryptionKey.PrivKey[:])
if err != nil {
log.Errorf("subscriptionHandleEnvelopesLoop: Error opening envelope: %s", err)
continue
Expand All @@ -38,7 +40,7 @@ func (s *Subscription) handleEnvelopesLoop(ctx context.Context) {
}

log.Debugf("subscriptionHandleEnvelopesLoop: open envelope and found: %v", msg)
s.entity.Messages <- msg
s.actor.Entity.Messages <- msg
}
}
}
10 changes: 2 additions & 8 deletions main.go → cmd/node/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package main

import (
"context"

"github.com/bahner/go-ma-actor/p2p"
"github.com/spf13/pflag"

Expand All @@ -15,17 +13,13 @@ func main() {
// Init config and logger
pflag.Parse()
config.Init("space")
config.InitLogging()
config.InitP2P()

p, err := p2p.Init(nil)
if err != nil {
log.Fatalf("Error initialising P2P: %v", err)
}
log.Infof("My node ID is %s", p.Node.ID().String())

p.DiscoverPeers()
go p.DiscoveryLoop(context.Background())
// log.Infof("My node ID is %s", p.Node.ID().String())
log.Infof("My node ID is %s", p.Host.ID().String())

// Start application
StartApplication(p)
Expand Down
24 changes: 12 additions & 12 deletions messages.go → cmd/node/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,43 @@ package main

import (
"context"
"encoding/json"

"github.com/ergo-services/ergo/etf"
"github.com/fxamacker/cbor/v2"
log "github.com/sirupsen/logrus"
)

func (s *Subscription) handleMessagesLoop(ctx context.Context) {

log.Debugf("Starting subscription message handling loop for topic: %s", s.entity.Topic.String())
log.Debugf("Reading messages from: %v", s.entity.Messages)
t := s.actor.Entity.Topic.String()

log.Debugf("Starting subscription message handling loop for topic: %s", t)
log.Debugf("Reading messages from: %v", s.actor.Entity.Messages)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

for {
select {
case <-ctx.Done():
log.Infof("Context for %s closed.", s.entity.Topic.String())
log.Infof("Context for %s closed.", t)
return
default:
message, ok := <-s.entity.Messages
message, ok := <-s.actor.Entity.Messages
if !ok {
log.Infof("Messages channel for %s closed.", s.entity.Topic.String())
log.Infof("Messages channel for %s closed.", t)
return
}

log.Debugf("subscriptionHandleMessagesLoop: received message: %v", message)

// Marshal the message and send it to the owner
msgJson, err := json.Marshal(message)
// Send message as CBOR to owner
msgBytes, err := cbor.Marshal(message)
if err != nil {
log.Errorf("Error marshaling message: %s", err)
log.Errorf("Error marshalling message: %v", err)
continue
}

// Send message as JSON to owner
s.deliverMessage(msgJson)
s.deliverMessage(msgBytes)
}
}
}
Expand Down
File renamed without changes.
File renamed without changes.
51 changes: 27 additions & 24 deletions subscription.go → cmd/node/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (
"fmt"
"sync"

"github.com/bahner/go-ma-actor/entity"
"github.com/bahner/go-ma/did"
"github.com/bahner/go-ma-actor/entity/actor"
"github.com/bahner/go-ma/msg"
"github.com/ergo-services/ergo/etf"
"github.com/ergo-services/ergo/gen"
Expand All @@ -26,9 +25,9 @@ var (

type Subscription struct {
gen.Server
sp *gen.ServerProcess
owner gen.ProcessID
entity *entity.Entity
sp *gen.ServerProcess
owner gen.ProcessID
actor *actor.Actor

messages chan *msg.Message
envelopes chan *msg.Envelope
Expand All @@ -41,7 +40,7 @@ func (s *Subscription) Verify() error {
if s.owner.Name == "" {
return fmt.Errorf("owner name is empty")
}
if s.entity == nil {
if s.actor == nil {
return fmt.Errorf("entity is nil")
}
if s.messages == nil {
Expand All @@ -66,22 +65,22 @@ func New(id string) gen.ServerBehavior {

log.Debugf("Creating new genServer: %s", id)

entity, err := getOrCreateEntity(id)
a, err := getOrCreateEntity(id)
if err != nil {
log.Errorf("Error getting or creating entity: %s", err)
return nil
}

log.Debugf("Created topic: %s", entity.Topic.String())
log.Debugf("Created topic: %s", a.Entity.Topic.String())

// The owner is identified by the fragment of the DID
// It's the local name ad ID of the owner of the entity
owner := createOwnerProcessId(did.GetFragment(id))
owner := createOwnerProcessId(a.Entity.DID.Fragment)
log.Debugf("Created owner process id: %s", owner)

s := &Subscription{
owner: owner,
entity: entity,
actor: a,
messages: make(chan *msg.Message, MESSAGE_CHANNEL_SIZE),
envelopes: make(chan *msg.Envelope, ENVELOPE_CHANNEL_SIZE),
}
Expand All @@ -93,19 +92,19 @@ func New(id string) gen.ServerBehavior {

func (s *Subscription) Init(sp *gen.ServerProcess, args ...etf.Term) error {

s.sp = sp // Save the server process, so we can send messages from it
t := s.actor.Entity.Topic

ctx := context.Background()
s.sp = sp // Save the server process, so we can send messages from it

log.Infof("Subscription init subscribing to topic: %s", s.entity.DID.String())
log.Infof("Subscription init subscribing to topic: %s", s.actor.Entity.DID.Id)

log.Debugf("Subscription entity: %v", s.entity)
go s.entity.Subscribe(ctx, s.entity)
log.Debugf("Subscription entity: %v", s.actor)
go t.Subscribe()
go s.subscribe()

sp.Process.Send(s.owner, etf.Tuple{
etf.Atom(":go_space_topic_subscription_created"),
etf.String(s.entity.Topic.String()),
etf.String(t.String()),
})

return nil
Expand All @@ -118,6 +117,8 @@ func (s *Subscription) HandleCast(server_procces *gen.ServerProcess, message etf

func (s *Subscription) HandleCall(serverProcess *gen.ServerProcess, from gen.ServerFrom, message etf.Term) (etf.Term, gen.ServerStatus) {

t := s.actor.Entity.Topic

log.Debugf("Received message: %s from: %v", message, from)

action, data, err := extractActionData(message)
Expand All @@ -130,12 +131,12 @@ func (s *Subscription) HandleCall(serverProcess *gen.ServerProcess, from gen.Ser

case "publish":
log.Debugf("Received publish message: %s", data)
s.entity.Topic.Publish(s.entity.Ctx, data[0].([]byte))
t.Publish(s.actor.Entity.Ctx, data[0].([]byte))
return etf.Atom("ok"), gen.ServerStatusOK

case "list_peers":
log.Debug("Received list_peers message.")
result := s.entity.Topic.ListPeers()
result := t.ListPeers()
return result, gen.ServerStatusOK

case "get_topics":
Expand All @@ -158,19 +159,18 @@ func (s *Subscription) HandleInfo(serverProcess *gen.ServerProcess, message etf.
func (s *Subscription) Terminate(sp *gen.ServerProcess, reason string) {

// Close the topic.
s.entity.Cancel()
s.actor.Entity.Cancel()

sp.Kill()

log.Debugf("Terminating subscription: %s", reason)
}

// Takes the DID Fragment as the id for the process.
func createOwnerProcessId(id string) gen.ProcessID {

fragment := did.GetFragment(id)

return gen.ProcessID{
Name: fragment,
Name: id,
Node: viper.GetString("node.space"),
}
}
Expand All @@ -197,7 +197,10 @@ func extractActionData(term etf.Term) (etf.Atom, []etf.Term, error) {
}

func (s *Subscription) subscribe() {
log.Debugf("Starting subscription loop: %s", s.entity.Topic.String())

t := s.actor.Entity.Topic

log.Debugf("Starting subscription loop: %s", t.String())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -206,5 +209,5 @@ func (s *Subscription) subscribe() {
go s.handleEnvelopesLoop(ctx)

<-ctx.Done()
log.Infof("Context for %s closed.", s.entity.Topic.String())
log.Infof("Context for %s closed.", t.String())
}
Loading

0 comments on commit 51b3757

Please sign in to comment.