forked from yomorun/yomo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
zipper.go
98 lines (77 loc) · 2.58 KB
/
zipper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package yomo
import (
"context"
"fmt"
"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/router"
"github.com/yomorun/yomo/pkg/config"
"golang.org/x/exp/slog"
)
// Zipper is the orchestrator of yomo. There are two types of zipper:
// one is Upstream Zipper, which is used to connect to multiple downstream zippers,
// another one is Downstream Zipper (will call it as Zipper directly), which is used
// to connected by `Upstream Zipper`, `Source` and `Stream Function`.
type Zipper interface {
// Logger returns the logger of zipper.
Logger() *slog.Logger
// ListenAndServe start zipper as server.
ListenAndServe(context.Context, string) error
// Close will close the zipper.
Close() error
}
// RunZipper run a zipper from a config file.
func RunZipper(ctx context.Context, configPath string) error {
conf, err := config.ParseConfigFile(configPath)
if err != nil {
return err
}
// listening address.
listenAddr := fmt.Sprintf("%s:%d", conf.Host, conf.Port)
options := []ZipperOption{}
if _, ok := conf.Auth["type"]; ok {
if tokenString, ok := conf.Auth["token"]; ok {
options = append(options, WithAuth("token", tokenString))
}
}
zipper, err := NewZipper(conf.Name, conf.Functions, conf.Downstreams, options...)
if err != nil {
return err
}
zipper.Logger().Info("using config file", "file_path", configPath)
return zipper.ListenAndServe(ctx, listenAddr)
}
// NewZipper returns a zipper.
func NewZipper(name string, functions []config.Function, meshConfig map[string]config.Downstream, options ...ZipperOption) (Zipper, error) {
opts := &zipperOptions{}
for _, o := range options {
o(opts)
}
server := core.NewServer(name, opts.serverOption...)
// add downstreams to server.
for _, meshConf := range meshConfig {
addr := fmt.Sprintf("%s:%d", meshConf.Host, meshConf.Port)
clientOptions := append(
opts.clientOption,
core.WithCredential(meshConf.Credential),
core.WithNonBlockWrite(),
core.WithConnectUntilSucceed(),
)
downstream := core.NewClient(name, core.ClientTypeUpstreamZipper, clientOptions...)
server.Logger().Debug("add downstream", "downstream_addr", addr, "downstream_name", downstream.Name())
server.AddDownstreamServer(addr, downstream)
}
server.ConfigRouter(router.Default(functions))
// watch signal.
go waitSignalForShutdownServer(server)
return server, nil
}
func statsToLogger(server *core.Server) {
logger := server.Logger()
logger.Info(
"stats",
"zipper_name", server.Name(),
"connector", server.StatsFunctions(),
"downstreams", server.Downstreams(),
"data_frame_received_num", server.StatsCounter(),
)
}