forked from gojek/fiber
-
Notifications
You must be signed in to change notification settings - Fork 0
/
combiner.go
74 lines (59 loc) · 1.95 KB
/
combiner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package fiber
import (
"context"
"github.com/gojek/fiber/util"
)
// Combiner is a network component that uses its embedded FanOut to dispatch an
// incoming request to all of its sub-routes and then merges the resulting response
// queue into a single response, using the provided FanIn.
type Combiner struct {
BaseComponent
FanOut
// fanIn is the aggregation strategy invoked by Dispatch; it is assigned through WithFanIn.
fanIn FanIn
}
// NewCombiner builds a Combiner identified by id. When id is empty, an
// identifier is generated by appending util.UID() to the "combiner_" prefix.
//
// The embedded FanOut is created with the ID "fan_out". No FanIn is assigned
// here, so WithFanIn must be called before Dispatch is used.
func NewCombiner(id string) *Combiner {
	if len(id) == 0 {
		id = "combiner_" + util.UID()
	}

	combiner := new(Combiner)
	combiner.BaseComponent = BaseComponent{id: id, kind: CombinerKind}
	combiner.FanOut = NewFanOut("fan_out")
	return combiner
}
// ID returns the combiner's ID by delegating to the embedded BaseComponent.
func (c *Combiner) ID() string {
return c.BaseComponent.ID()
}
// Kind returns the combiner's component kind, as stored on the embedded BaseComponent.
func (c *Combiner) Kind() ComponentKind {
return c.BaseComponent.kind
}
// WithFanIn sets the FanIn (aggregation strategy) on the Combiner and returns
// the same Combiner, so that calls can be chained.
func (c *Combiner) WithFanIn(fanIn FanIn) *Combiner {
c.fanIn = fanIn
return c
}
// Dispatch sends the request through the embedded FanOut and hands the
// resulting response queue to the Combiner's FanIn, which aggregates it into a
// single response. Aggregation runs in its own goroutine; the returned
// ResponseQueue is backed by a channel that receives that one response and is
// then closed.
//
// The beforeDispatch hook runs first and may replace ctx. afterDispatch runs
// as Dispatch returns, and afterCompletion runs when the aggregating goroutine
// finishes.
func (c *Combiner) Dispatch(ctx context.Context, req Request) ResponseQueue {
	ctx = c.beforeDispatch(ctx, req)

	// Capacity of one: exactly one aggregated response is ever sent.
	results := make(chan Response, 1)
	queue := NewResponseQueue(results, 1)
	defer c.afterDispatch(ctx, req, queue)

	aggregate := func() {
		defer c.afterCompletion(ctx, req, queue)

		fannedOut := c.FanOut.Dispatch(ctx, req)
		results <- c.fanIn.Aggregate(ctx, req, fannedOut)
		close(results)
	}
	go aggregate()

	return queue
}
// AddInterceptor registers the given interceptors on the Combiner itself (via
// its embedded BaseComponent). When recursive is true, they are first passed on
// to the embedded FanOut as well, so that nested components can receive them.
func (c *Combiner) AddInterceptor(recursive bool, interceptor ...Interceptor) {
	if !recursive {
		c.BaseComponent.AddInterceptor(false, interceptor...)
		return
	}

	// Recursive case: the fan-out is handled before the combiner's own base component.
	c.FanOut.AddInterceptor(true, interceptor...)
	c.BaseComponent.AddInterceptor(true, interceptor...)
}