-
Notifications
You must be signed in to change notification settings - Fork 0
/
emitter.go
114 lines (89 loc) · 2.97 KB
/
emitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package event_emitter
import (
"log/slog"
"os"
"reflect"
"sync"
)
// Emitter is the default global instance of EventEmitter.
var Emitter = NewEventEmitter("global")
// EventEmitter represents an event emitter.
type EventEmitter struct {
subscribers map[reflect.Type][]func(event interface{}) // Map of event types to subscriber functions
mutex sync.RWMutex
loggerLevel *slog.LevelVar
logger *slog.Logger
}
func (e *EventEmitter) Debug() {
e.loggerLevel.Set(slog.LevelDebug)
}
// NewEventEmitter creates a new instance of EventEmitter.
func NewEventEmitter(name string, loggerOptionalParam ...*slog.Logger) *EventEmitter {
var (
loggerLevel *slog.LevelVar
logger *slog.Logger
)
if len(loggerOptionalParam) == 0 {
loggerLevel = &slog.LevelVar{}
logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: loggerLevel,
})).With("service", "event-emitter").With("name", name)
} else {
logger = loggerOptionalParam[0]
}
return &EventEmitter{
subscribers: make(map[reflect.Type][]func(event interface{})),
mutex: sync.RWMutex{},
loggerLevel: loggerLevel,
logger: logger,
}
}
// Subscribe subscribes to events of type T with the given subscriber function.
func Subscribe[T any](subscriber func(event T), optionalEmitter ...*EventEmitter) {
eventType := reflect.TypeOf((*T)(nil)).Elem()
emitter := getEmitter(optionalEmitter...)
if emitter.logger != nil {
emitter.logger.Debug("subscribe", "event", eventType.Name())
}
emitter.mutex.Lock()
defer emitter.mutex.Unlock()
if _, ok := emitter.subscribers[eventType]; !ok {
emitter.subscribers[eventType] = make([]func(event interface{}), 0)
}
emitter.subscribers[eventType] = append(emitter.subscribers[eventType], func(event interface{}) {
subscriber(event.(T))
})
}
// Unsubscribe unsubscribes all subscribers from events of type T.
func Unsubscribe[T any](event T, optionalEmitter ...*EventEmitter) {
eventType := reflect.TypeOf((*T)(nil)).Elem()
emitter := getEmitter(optionalEmitter...)
if emitter.logger != nil {
emitter.logger.Debug("unsubscribe", "event", eventType.Name())
}
emitter.mutex.Lock()
defer emitter.mutex.Unlock()
emitter.subscribers[eventType] = make([]func(event interface{}), 0)
}
// getEmitter returns the appropriate emitter instance.
func getEmitter(optionalEmitterParam ...*EventEmitter) *EventEmitter {
if len(optionalEmitterParam) == 0 {
return Emitter
}
return optionalEmitterParam[0]
}
// Emit sends the event to all subscribers of the appropriate type.
func Emit(event interface{}, optionalEmitterParam ...*EventEmitter) {
eventType := reflect.TypeOf(event)
emitter := getEmitter(optionalEmitterParam...)
emitter.mutex.RLock()
defer emitter.mutex.RUnlock()
if emitter.logger != nil {
emitter.logger.Debug("emit", "event", eventType.Name(), "subscribers", len(emitter.subscribers[eventType]))
}
if subscribers, ok := emitter.subscribers[eventType]; ok {
for _, subscriber := range subscribers {
subscriber(event)
}
}
}