Skip to content

Commit

Permalink
Attempt to fix asynkron#745 (draft)
Browse files Browse the repository at this point in the history
  • Loading branch information
antoniomacri committed Jan 5, 2023
1 parent 880b460 commit 4200c75
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 24 deletions.
81 changes: 80 additions & 1 deletion actor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@
package actor

import (
"container/list"
"context"
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/asynkron/protoactor-go/log"

Expand Down Expand Up @@ -39,10 +44,46 @@ func NewMetrics(provider metric.MeterProvider) *Metrics {
return &Metrics{}
}

return &Metrics{
m := &Metrics{
metrics: metrics.NewProtoMetrics(provider),
enabled: true,
}

if instruments := m.metrics.Get(metrics.InternalActorMetrics); instruments != nil {
m.PrepareMailboxLengthGauge()
meter := global.Meter(metrics.LibName)
if err := meter.RegisterCallback([]instrument.Asynchronous{instruments.ActorMailboxLength}, func(goCtx context.Context) {
var start = time.Now()
i := 0
deleted := 0
for _, mbsProvider := range mailboxSizeProviders {
var count int64 = 0
mbsProvider.Lock()
for e := mbsProvider.invokers.Front(); e != nil; {
if c, dead := e.Value.(mailboxSizeInvoker)(); dead {
deadElem := e
e = e.Next() // needs to be before Remove
mbsProvider.invokers.Remove(deadElem)
deleted++
} else {
count += int64(c)
e = e.Next()
}
i++
}
mbsProvider.Unlock()
instruments.ActorMailboxLength.Observe(goCtx, count, mbsProvider.labels...)
}
fmt.Printf("elapsed: %v, providers: %v, iterated: %v, deleted: %v\n",
time.Since(start), len(mailboxSizeProviders), i, deleted)

}); err != nil {
err = fmt.Errorf("failed to instrument Actor Mailbox, %w", err)
plog.Error(err.Error(), log.Error(err))
}
}

return m
}

func (m *Metrics) PrepareMailboxLengthGauge() {
Expand All @@ -66,3 +107,41 @@ func (m *Metrics) CommonLabels(ctx Context) []attribute.KeyValue {

return labels
}

type mailboxSizeInvoker func() (mailboxSize int, dead bool)

var mailboxSizeProviders = make(map[string]*mailboxSizeProvider)

type mailboxSizeProvider struct {
labels []attribute.KeyValue
invokers *list.List
sync.RWMutex
}

func registerMailboxSizeProvider(invoker mailboxSizeInvoker, labels []attribute.KeyValue) {
labelsAsString := labelsToString(labels)
var provider *mailboxSizeProvider
if p, ok := mailboxSizeProviders[labelsAsString]; ok {
provider = p
} else {
provider = &mailboxSizeProvider{labels: labels, invokers: list.New()}
mailboxSizeProviders[labelsAsString] = provider
}
provider.Lock()
provider.invokers.PushBack(invoker)
provider.Unlock()
}

func labelsToString(labels []attribute.KeyValue) string {
sb := strings.Builder{}
sort.Slice(labels, func(i, j int) bool {
return labels[i].Key < labels[j].Key
})
for _, label := range labels {
sb.WriteString(string(label.Key))
sb.WriteRune('=')
sb.WriteString(label.Value.Emit())
sb.WriteRune(',')
}
return sb.String()
}
33 changes: 10 additions & 23 deletions actor/props.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
package actor

import (
"context"
"errors"
"fmt"

"github.com/asynkron/protoactor-go/log"
"github.com/asynkron/protoactor-go/metrics"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
)

type (
Expand All @@ -27,23 +21,6 @@ var (
ctx := newActorContext(actorSystem, props, parentContext.Self())
mb := props.produceMailbox()

// prepare the mailbox number counter
if ctx.actorSystem.Config.MetricsProvider != nil {
sysMetrics, ok := ctx.actorSystem.Extensions.Get(extensionId).(*Metrics)
if ok && sysMetrics.enabled {
if instruments := sysMetrics.metrics.Get(metrics.InternalActorMetrics); instruments != nil {
sysMetrics.PrepareMailboxLengthGauge()
meter := global.Meter(metrics.LibName)
if err := meter.RegisterCallback([]instrument.Asynchronous{instruments.ActorMailboxLength}, func(goCtx context.Context) {
instruments.ActorMailboxLength.Observe(goCtx, int64(mb.UserMessageCount()), sysMetrics.CommonLabels(ctx)...)
}); err != nil {
err = fmt.Errorf("failed to instrument Actor Mailbox, %w", err)
plog.Error(err.Error(), log.Error(err))
}
}
}
}

dp := props.getDispatcher()
proc := NewActorProcess(mb)
pid, absent := actorSystem.ProcessRegistry.Add(proc, id)
Expand All @@ -58,6 +35,16 @@ var (
mb.PostSystemMessage(startedMessage)
mb.Start()

// prepare the mailbox number counter
if ctx.actorSystem.Config.MetricsProvider != nil {
sysMetrics, ok := ctx.actorSystem.Extensions.Get(extensionId).(*Metrics)
if ok && sysMetrics.enabled {
if instruments := sysMetrics.metrics.Get(metrics.InternalActorMetrics); instruments != nil {
registerMailboxSizeProvider(func() (int, bool) { return mb.UserMessageCount(), proc.dead != 0 }, sysMetrics.CommonLabels(ctx))
}
}
}

return pid, nil
}
defaultContextDecorator = func(ctx Context) Context {
Expand Down

0 comments on commit 4200c75

Please sign in to comment.