-
Notifications
You must be signed in to change notification settings - Fork 0
/
driver.go
124 lines (107 loc) · 3.33 KB
/
driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
cloudevent "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/v02"
"github.com/vmware/dispatch/pkg/events"
"github.com/vmware/dispatch/pkg/events/driverclient"
)
// validationEvent describes a JSON document with a nested "data" object
// (holding "validationCode" and "validationUrl") plus top-level "eventType"
// and "topic" keys.
// NOTE(review): the field names look like a subscription-validation handshake
// message, and nothing in the visible part of this file references the type —
// confirm it is still needed.
type validationEvent struct {
// Data is the payload found under the "data" key.
Data struct {
// ValidationCode is read from / written to the "validationCode" key.
ValidationCode string `json:"validationCode"`
// ValidationURL is read from / written to the "validationUrl" key.
ValidationURL string `json:"validationUrl"`
} `json:"data"`
// EventType is read from / written to the "eventType" key.
EventType string `json:"eventType"`
// Topic is read from / written to the "topic" key.
Topic string `json:"topic"`
}
// validationResponse is a JSON document with a single "validationResponse"
// string member.
// NOTE(review): presumably the reply to a validationEvent; it is not referenced
// in the visible part of this file — confirm against the rest of the package.
type validationResponse struct {
// ValidationResponse is read from / written to the "validationResponse" key.
ValidationResponse string `json:"validationResponse"`
}
// Command-line flags configuring the event driver. They are registered on the
// default flag set at package initialization and populated by flag.Parse.
var (
	// dryRun is a debugging aid: when set, no driver client is created and
	// incoming events are only logged instead of being sent to Dispatch.
	dryRun = flag.Bool("dry-run", false, "Debug, pull messages and do not send Dispatch events")

	// org names the organization this event driver belongs to.
	org = flag.String("org", "default", "organization of this event driver")

	// dispatchEndpoint is the Dispatch gateway host used when no sink is given.
	dispatchEndpoint = flag.String("dispatch-api-endpoint", "localhost:8080", "dispatch server host")

	// port is the TCP port the HTTP listener binds to.
	port = flag.Int("port", 80, "Port to listen on")

	// sharedSecret is a token or shared secret the client is expected to pass.
	// NOTE(review): not read anywhere in the visible part of this file —
	// confirm whether enforcement lives elsewhere.
	sharedSecret = flag.String("shared-secret", "", "A token or shared secret that the client should pass")

	// sink is optional; it is set automatically when the driver runs as a
	// knative container source. When non-empty it takes precedence over the
	// Dispatch gateway endpoint.
	sink = flag.String("sink", "", "knative sink url")
)
// getDriverClient builds the client used to forward events to Dispatch.
//
// It returns nil when the dry-run flag is set. Otherwise it creates an HTTP
// driver client authenticated with the token read from the environment
// variable named by driverclient.AuthToken. The client targets the sink URL
// when one was supplied, and the Dispatch gateway endpoint if not. A failure
// to create the client terminates the process via log.Fatalf.
func getDriverClient() driverclient.Client {
	if *dryRun {
		return nil
	}

	var (
		eventsClient driverclient.Client
		clientErr    error
	)
	authToken := os.Getenv(driverclient.AuthToken)

	switch sinkURL := *sink; sinkURL {
	case "":
		// No sink configured: go through the Dispatch gateway.
		eventsClient, clientErr = driverclient.NewHTTPClient(driverclient.WithGateway(*dispatchEndpoint), driverclient.WithToken(authToken))
	default:
		// A sink URL overrides the gateway setting.
		log.Printf("Using sink URL %s", sinkURL)
		eventsClient, clientErr = driverclient.NewHTTPClient(driverclient.WithURL(sinkURL), driverclient.WithToken(authToken))
	}

	if clientErr != nil {
		log.Fatalf("Error when creating the events client: %s", clientErr.Error())
	}
	log.Println("Event driver initialized.")
	return eventsClient
}
// main wires up and runs the CloudEvents HTTP event driver.
//
// It parses the command-line flags, obtains the Dispatch driver client (nil in
// dry-run mode) and registers a catch-all handler on the default mux. For each
// request the handler:
//
//  1. decodes the request with the v02 HTTP marshaller and requires the result
//     to be a *v02.Event (otherwise 400 Bad Request),
//  2. re-wraps it as a Dispatch events.CloudEvent, copying the X-Callback-URL
//     request header into the "callback-url" extension, and
//  3. forwards it through the driver client when one exists, answering
//     502 Bad Gateway if forwarding fails.
//
// The server listens on the port given by the -port flag; the process exits
// when SIGINT or SIGTERM is received, or when the listener fails.
func main() {
	flag.Parse()

	client := getDriverClient()
	marshaller := v02.NewDefaultHTTPMarshaller()

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		event, err := marshaller.FromRequest(r)
		if err != nil {
			log.Printf("failed to decode cloud event from request: %v", err)
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		v02Event, ok := event.(*v02.Event)
		if !ok {
			// v02Event is nil when the assertion fails, so report the dynamic
			// type that was actually received instead.
			log.Printf("wrong event type: %T", event)
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		ceBytes, err := json.Marshal(v02Event.Data)
		if err != nil {
			log.Printf("failed to marshal event data: %v", err)
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		callbackURL := r.Header.Get("X-Callback-URL")
		dispatchEvent := &events.CloudEvent{
			EventType:          v02Event.Type,
			CloudEventsVersion: cloudevent.Version01,
			ContentType:        v02Event.ContentType,
			EventID:            v02Event.ID,
			Source:             v02Event.Source,
			Extensions: map[string]interface{}{
				"callback-url": callbackURL,
			},
			Data: json.RawMessage(ceBytes),
		}

		// client is nil in dry-run mode; the event is then only logged below.
		if client != nil {
			if err := client.SendOne(dispatchEvent); err != nil {
				log.Printf("Error sending event: %v", err)
				// Without an explicit status net/http would answer 200 OK and
				// the sender would believe the event had been delivered.
				w.WriteHeader(http.StatusBadGateway)
				return
			}
		}

		pretty, err := json.MarshalIndent(dispatchEvent, "", " ")
		if err != nil {
			// Delivery already succeeded; only the log rendering failed.
			log.Printf("Sent event successfully (could not render event for logging: %v)", err)
			return
		}
		log.Printf("Sent event successfully: %s", string(pretty))
	})

	// Buffered so a signal delivered before the receive below is not dropped.
	done := make(chan os.Signal, 1)
	signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)

	// Serve in the background; a listener failure terminates the process.
	go func() {
		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), nil))
	}()

	<-done
}