forked from ReactiveX/RxGo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
replaysubject.go
64 lines (52 loc) · 1.47 KB
/
replaysubject.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package rxgo
import (
"container/list"
"sync"
)
// ReplaySubject is a Subject that records the most recently published items
// and replays them to every new subscriber.
type ReplaySubject struct {
	// Subject is the embedded base subject; ReplaySubject shadows its Next
	// and Subscribe methods.
	Subject

	// buffer holds the replay history, oldest item at the front.
	buffer *list.List
	// bufferLock guards buffer.
	bufferLock sync.Mutex
	// maxReplayItems is the maximum number of items kept in buffer.
	maxReplayItems int
}
// NewReplaySubject creates a ReplaySubject whose replay history is capped at
// maxReplayItems entries. opts are passed through unchanged to NewSubject.
func NewReplaySubject(maxReplayItems int, opts ...Option) *ReplaySubject {
	// NOTE(review): the value returned by NewSubject is copied into the
	// embedded field; if Subject carries a lock, go vet (copylocks) will
	// flag this copy — confirm.
	base := NewSubject(opts...)

	return &ReplaySubject{
		Subject:        *base,
		buffer:         list.New(),
		maxReplayItems: maxReplayItems,
		// bufferLock is left at its zero value, which is an unlocked mutex.
	}
}
// Next shadows the base Next so that value is recorded in the replay history
// before it is forwarded to the embedded Subject.
//
// The history is capped at maxReplayItems: when the new value pushes the
// buffer past that size, the oldest entry is dropped. bufferLock is held for
// the whole call, including the forward to Subject.Next.
func (s *ReplaySubject) Next(value interface{}) {
	s.bufferLock.Lock()
	defer s.bufferLock.Unlock()

	history := s.buffer
	history.PushBack(value)
	if oldest := history.Front(); history.Len() > s.maxReplayItems {
		// over capacity: evict the entry at the front
		history.Remove(oldest)
	}

	s.Subject.Next(value)
}
// Subscribe shadows the base Subscribe so that a new subscriber is first sent
// the buffered item history, oldest item first.
//
// It returns the Subscription and Observable produced by createSubscription.
//
// Locking: bufferLock is acquired first and the Subject lock second. Next
// holds bufferLock while it calls Subject.Next, so taking the two locks in
// the same order here avoids a lock-order inversion between a concurrent Next
// and Subscribe (NOTE(review): this matters if Subject.Next acquires the
// Subject lock — confirm). Taking bufferLock before reading buffer.Len() also
// keeps the requested channel capacity equal to the number of items replayed
// below; reading the length unlocked would race with Next and could leave the
// channel too small, making the replay block while both locks are held.
func (s *ReplaySubject) Subscribe() (Subscription, Observable) {
	s.bufferLock.Lock()
	defer s.bufferLock.Unlock()
	s.Lock()
	defer s.Unlock()

	// Request a channel buffer large enough to hold the whole history so the
	// replay does not need a reader to make progress.
	sub, obs := s.createSubscription(s.buffer.Len())
	subChan := s.subscribers[sub.GetId()]

	// Replay buffered items in the order they were received.
	for elem := s.buffer.Front(); elem != nil; elem = elem.Next() {
		subChan <- Of(elem.Value)
	}
	return sub, obs
}