forked from yomorun/yomo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
source.go
135 lines (119 loc) · 3.6 KB
/
source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package yomo
import (
"context"
"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/pkg/id"
"github.com/yomorun/yomo/pkg/trace"
)
// Source is responsible for sending data to yomo.
type Source interface {
// Close will close the connection to YoMo-Zipper.
Close() error
// Connect to YoMo-Zipper.
Connect() error
// Write the data to directed downstream.
Write(tag uint32, data []byte) error
// Broadcast broadcast the data to all downstream.
Broadcast(tag uint32, data []byte) error
// SetErrorHandler set the error handler function when server error occurs
SetErrorHandler(fn func(err error))
// [Experimental] SetReceiveHandler set the observe handler function
SetReceiveHandler(fn func(tag uint32, data []byte))
}
// YoMo-Source
type yomoSource struct {
name string
zipperAddr string
client *core.Client
fn func(uint32, []byte)
}
var _ Source = &yomoSource{}
// NewSource create a yomo-source
func NewSource(name, zipperAddr string, opts ...SourceOption) Source {
clientOpts := make([]core.ClientOption, len(opts))
for k, v := range opts {
clientOpts[k] = core.ClientOption(v)
}
client := core.NewClient(name, core.ClientTypeSource, clientOpts...)
return &yomoSource{
name: name,
zipperAddr: zipperAddr,
client: client,
}
}
// Close will close the connection to YoMo-Zipper.
func (s *yomoSource) Close() error {
if err := s.client.Close(); err != nil {
s.client.Logger().Error("failed to close the source", "err", err)
return err
}
s.client.Logger().Debug("the source is closed")
return nil
}
// Connect to YoMo-Zipper.
func (s *yomoSource) Connect() error {
// set backflowframe handler
s.client.SetBackflowFrameObserver(func(frm *frame.BackflowFrame) {
if s.fn != nil {
s.fn(frm.Tag, frm.Carriage)
}
})
err := s.client.Connect(context.Background(), s.zipperAddr)
return err
}
// Write writes data with specified tag.
func (s *yomoSource) Write(tag uint32, data []byte) error {
return s.write(tag, data, false)
}
// SetErrorHandler set the error handler function when server error occurs
func (s *yomoSource) SetErrorHandler(fn func(err error)) {
s.client.SetErrorHandler(fn)
}
// [Experimental] SetReceiveHandler set the observe handler function
func (s *yomoSource) SetReceiveHandler(fn func(uint32, []byte)) {
s.fn = fn
s.client.Logger().Info("receive hander set for the source")
}
// Broadcast write the data to all downstreams.
func (s *yomoSource) Broadcast(tag uint32, data []byte) error {
return s.write(tag, data, true)
}
func (s *yomoSource) write(tag uint32, data []byte, broadcast bool) error {
var tid, sid string
// trace
tp := s.client.TracerProvider()
traced := false
if tp != nil {
span, err := trace.NewSpan(tp, core.StreamTypeSource.String(), s.name, "", "")
if err != nil {
s.client.Logger().Error("source trace error", "err", err)
} else {
defer span.End()
tid = span.SpanContext().TraceID().String()
sid = span.SpanContext().SpanID().String()
traced = true
}
}
if tid == "" {
s.client.Logger().Debug("source create new tid")
tid = id.TID()
}
if sid == "" {
s.client.Logger().Debug("source create new sid")
sid = id.SID()
}
s.client.Logger().Debug("source metadata", "tid", tid, "sid", sid, "broadcast", broadcast, "traced", traced)
// metadata
md, err := core.NewDefaultMetadata(s.client.ClientID(), broadcast, tid, sid, traced).Encode()
if err != nil {
return err
}
f := &frame.DataFrame{
Tag: tag,
Metadata: md,
Payload: data,
}
s.client.Logger().Debug("source write", "tag", tag, "data", data, "broadcast", broadcast)
return s.client.WriteFrame(f)
}