-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
346 lines (289 loc) · 8.39 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"net"
"net/http"
"os"
"sync"
"time"
"github.com/BurntSushi/toml"
"github.com/go-redis/redis/v8"
"github.com/gorilla/websocket"
"github.com/oschwald/geoip2-golang"
"google.golang.org/protobuf/proto"
fediwatchProto "thomas-leister.de/fediwatch/fediwatchProto"
)
// WsMessageBufferSize is the buffer capacity, counted in messages, that this
// file uses for the websocket broadcast channel and for each Redis pub/sub
// receive channel.
const WsMessageBufferSize = 9999
// versionString is the version printed in the startup greeting. It defaults
// to "0.0.0".
// NOTE(review): presumably overridden at build time (e.g. via -ldflags -X);
// confirm against the build scripts.
var versionString = "0.0.0"
// ConnDir is an enum-like type describing the direction of a connection.
type ConnDir int64

// Known connection directions.
const (
	// In marks an inbound connection.
	In ConnDir = iota
	// Out marks an outbound connection.
	Out
)

// String returns "in" for In, "out" for Out and "(invalid)" for every other
// value, which lets a ConnDir be printed with the %s verb.
func (d ConnDir) String() string {
	switch d {
	case In:
		return "in"
	case Out:
		return "out"
	default:
		return "(invalid)"
	}
}
// Config mirrors the settings decoded from the TOML configuration file.
//
// The port fields are unsigned 16-bit integers so that the whole TCP port
// range (0-65535) can be represented; a signed 16-bit field cannot hold
// ports above 32767.
type Config struct {
	// HomeLocation is the home position; element 0 is used as the latitude
	// and element 1 as the longitude.
	HomeLocation []float32 `toml:"homeLocation"`

	// HttpPort is the TCP port the static file HTTP server listens on.
	HttpPort uint16 `toml:"httpPort"`

	// WebsocketPort is the TCP port the websocket server listens on.
	WebsocketPort uint16 `toml:"websocketPort"`

	// RedisHost is the host name or address of the Redis server.
	RedisHost string `toml:"redisHost"`

	// RedisPort is the TCP port of the Redis server.
	RedisPort uint16 `toml:"redisPort"`

	// DatabasePath is the path of the GeoIP database file.
	DatabasePath string `toml:"databasePath"`

	// WebsocketUrl is the websocket URL handed to web clients via the
	// settings endpoint.
	WebsocketUrl string `toml:"websocketUrl"`
}
// JSON payload types served by the settings endpoint.
type (
	// Coordinates is a latitude/longitude pair, serialized under the JSON
	// keys "lat" and "long".
	Coordinates struct {
		Lat  float32 `json:"lat"`
		Long float32 `json:"long"`
	}

	// WebSettings is the document handed to web clients: the configured home
	// location and the websocket URL.
	WebSettings struct {
		HomeLocation Coordinates `json:"homeLocation"`
		WebsocketUrl string      `json:"websocketUrl"`
	}
)
/*
 * Package-level state shared by the HTTP/websocket handlers and the Redis
 * consumers.
 */

// config receives the settings decoded from the TOML file in main.
var config Config

// upgrader turns HTTP requests into websocket connections. Its CheckOrigin
// callback accepts every request, so no Origin filtering takes place.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}

// clients is the set of registered websocket connections; clientsMutex is the
// lock taken around updates to it.
var (
	clients      = map[*websocket.Conn]bool{}
	clientsMutex sync.Mutex
)

// broadcast queues serialized messages waiting to be delivered to the
// websocket clients.
var broadcast = make(chan []byte, WsMessageBufferSize)

// quit keeps main blocked; no send on it is visible in this file.
var quit = make(chan bool)
/*
 * checkIfWsClosed detects the end of a websocket connection by reading from
 * conn until a read fails. Incoming messages are discarded. When the read
 * loop ends, a single `true` is sent on connClosed; that send blocks until a
 * receiver takes it (or the channel has free buffer space).
 *
 * Close errors other than "going away" and "abnormal closure" are logged.
 */
func checkIfWsClosed(conn *websocket.Conn, connClosed chan bool) {
	// Notify the waiting handler once reading has stopped.
	defer func() { connClosed <- true }()

	for {
		if _, _, err := conn.ReadMessage(); err != nil {
			// A closed connection surfaces as a read error. Ordinary
			// disconnects stay quiet; only unexpected ones are logged.
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("error reading message: %v", err)
			}
			return
		}
	}
}
// handleWsConnection upgrades the HTTP request to a websocket connection,
// registers it in clients and serves it until the peer disconnects.
//
// Every handler goroutine also competes for messages on the shared broadcast
// channel. Whichever handler receives a message forwards it to ALL registered
// clients, because a message is gone once it has been taken off the channel.
func handleWsConnection(w http.ResponseWriter, r *http.Request) {
	// Upgrade HTTP connection to Websocket connection
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}
	defer conn.Close()

	// Register the client and guarantee it is unregistered on every exit
	// path. This deferred cleanup runs before the deferred conn.Close above.
	clientsMutex.Lock()
	clients[conn] = true
	clientsMutex.Unlock()
	defer func() {
		clientsMutex.Lock()
		delete(clients, conn)
		clientsMutex.Unlock()
	}()

	fmt.Println("Created a new Websocket connection")

	// Buffered with capacity 1 so the reader goroutine can always deliver its
	// single notification and terminate, even if this handler is busy or has
	// already returned.
	connClosed := make(chan bool, 1)

	// In another Goroutine: Check if conn was closed by constantly trying to read from conn
	go checkIfWsClosed(conn, connClosed)

	for {
		select {
		case <-connClosed:
			// The peer is gone: stop serving so the deferred cleanup runs.
			fmt.Println("Received WS close!")
			return

		case message := <-broadcast:
			// Hold the lock for the whole fan-out: it protects the map
			// iteration and serializes writes to each connection across
			// handler goroutines.
			clientsMutex.Lock()
			for client := range clients {
				if err := client.WriteMessage(websocket.BinaryMessage, message); err != nil {
					log.Println(err)
					// Drop only the failing client. Closing it makes its own
					// reader goroutine fail, which ends that client's handler.
					// Deleting during range is permitted for Go maps.
					client.Close()
					delete(clients, client)
				}
			}
			clientsMutex.Unlock()
		}
	}
}
func handleWebSettings(w http.ResponseWriter, r *http.Request) {
// Create a response struct with the home location
response := WebSettings{
HomeLocation: Coordinates{Lat: config.HomeLocation[0], Long: config.HomeLocation[1]},
WebsocketUrl: config.WebsocketUrl,
}
// Set the Content-Type header to application/json
w.Header().Set("Content-Type", "application/json")
// Encode the response as JSON and write it to the response writer
if err := json.NewEncoder(w).Encode(response); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
/*
 * resolveToCoordinates looks up ipAddress in the GeoIP database stored at
 * databasePath and returns the latitude and longitude of its city record.
 *
 * An error is returned when ipAddress is not a valid IP address, when the
 * database cannot be opened, or when the lookup fails; both coordinates are
 * 0 in that case.
 *
 * NOTE(review): the database is opened and closed on every call. If this
 * runs once per incoming message, consider keeping a single reader open.
 */
func resolveToCoordinates(ipAddress string, databasePath string) (float64, float64, error) {
	// Validate the input first so an invalid address never costs a database open.
	ip := net.ParseIP(ipAddress)
	if ip == nil {
		return 0, 0, fmt.Errorf("invalid IP address: %s", ipAddress)
	}

	db, err := geoip2.Open(databasePath)
	if err != nil {
		return 0, 0, fmt.Errorf("opening GeoIP database %q: %w", databasePath, err)
	}
	defer db.Close()

	// Get IP info
	record, err := db.City(ip)
	if err != nil {
		return 0, 0, fmt.Errorf("looking up %s: %w", ipAddress, err)
	}

	// Extract GPS coordinates
	return record.Location.Latitude, record.Location.Longitude, nil
}
/*
 * resolveToIp resolves hostname to its IP addresses and returns one of them,
 * chosen at random, in string form.
 *
 * When the lookup fails or yields no address, the problem is reported on
 * stderr and returned as an error together with an empty string.
 */
func resolveToIp(hostname string) (string, error) {
	ips, err := net.LookupIP(hostname)
	if err == nil && len(ips) == 0 {
		// rand.Intn panics for n == 0, so treat an empty result as an error.
		err = fmt.Errorf("no IP addresses found for %q", hostname)
	}
	if err != nil {
		fmt.Fprintf(os.Stderr, "Could not get IPs: %v\n", err)
		return "", err
	}

	// Select a random IP address
	return ips[rand.Intn(len(ips))].String(), nil
}
// handleRedisMessages consumes messages from redisChannel until the channel
// is closed. Each payload is treated as a host name: it is resolved to an IP
// address, the address is geolocated, the result is printed, and a protobuf
// Connection message carrying the direction and coordinates is queued on the
// broadcast channel.
//
// connDir selects the reported direction: In maps to Connection_IN, every
// other value to Connection_OUT. Messages that cannot be resolved are
// skipped. When the broadcast buffer is full the message is dropped rather
// than blocking the consumer. If no message arrives for five seconds, a
// "waiting" line is printed.
func handleRedisMessages(redisChannel <-chan *redis.Message, connDir ConnDir) {
	// The direction never changes for the lifetime of this consumer, so the
	// protobuf value and console label are derived once.
	var protoConnDir fediwatchProto.Connection_Direction
	label := "OUT"
	if connDir == In {
		protoConnDir = fediwatchProto.Connection_IN
		label = "IN"
	} else {
		protoConnDir = fediwatchProto.Connection_OUT
	}

	for {
		select {
		case msg, ok := <-redisChannel:
			if !ok {
				// A closed channel would otherwise deliver nil messages forever.
				fmt.Printf("Redis channel for %s connections was closed\n", connDir)
				return
			}

			// resolveToIp reports its own failures, so just skip the message.
			randIP, err := resolveToIp(msg.Payload)
			if err != nil {
				continue
			}

			latitude, longitude, err := resolveToCoordinates(randIP, config.DatabasePath)
			if err != nil {
				fmt.Printf("Failed to resolve IP address to location: %v\n", err)
				continue
			}

			// Output on console
			fmt.Printf("%s connection to: %s\t %s\t Latitude %.4f, Longitude %.4f\n", label, msg.Payload, randIP, latitude, longitude)

			// Feed into websocket broadcaster
			broadcastBytes, err := proto.Marshal(&fediwatchProto.Connection{
				Dir: protoConnDir,
				Lat: float32(latitude),
				Lng: float32(longitude),
			})
			if err != nil {
				fmt.Println("Encoding Connection object failed!")
				continue
			}

			// Non-blocking send: checking len/cap first and then sending is
			// racy because several goroutines write to broadcast.
			select {
			case broadcast <- broadcastBytes:
			default:
				// Buffer full: drop the message.
			}

		case <-time.After(time.Second * 5):
			fmt.Printf("Waiting for new %s connection\n", connDir)
		}
	}
}
// startStaticFilesServer serves the contents of ./static under "/" on
// localhost at the configured HTTP port and blocks while the server runs.
// A server error terminates the process via log.Fatal.
//
// The file handler is registered on the default serve mux, and the server is
// started with a nil handler, so it serves every route registered on that mux.
func startStaticFilesServer() {
	http.Handle("/", http.FileServer(http.Dir("./static")))

	addr := fmt.Sprintf("localhost:%d", config.HttpPort)
	log.Printf("HTTP server listening at %s\n", addr)

	if err := http.ListenAndServe(addr, nil); err != nil {
		log.Fatal("Failed to start the HTTP server: ", err)
	}
}
// startWebsocketServer registers the "/ws" and "/settings" handlers on the
// default serve mux, then listens on localhost at the configured websocket
// port and blocks while the server runs. A server error terminates the
// process via log.Fatal.
func startWebsocketServer() {
	for pattern, handler := range map[string]func(http.ResponseWriter, *http.Request){
		"/ws":       handleWsConnection,
		"/settings": handleWebSettings,
	} {
		http.HandleFunc(pattern, handler)
	}

	addr := fmt.Sprintf("localhost:%d", config.WebsocketPort)
	fmt.Printf("WebSocket server is listening at %s\n", addr)

	if err := http.ListenAndServe(addr, nil); err != nil {
		log.Fatal("Failed to start the Websocket server: ", err)
	}
}
// main loads config.toml, subscribes to the inbound and outbound Redis
// channels, starts one consumer goroutine per channel plus the static file
// and websocket servers, and then blocks on the quit channel.
func main() {
	// Greeting
	fmt.Printf("Starting FediWatch %s\n", versionString)

	// Read config file. log.Fatal/log.Fatalf terminate the process
	// themselves, so no explicit os.Exit is needed afterwards.
	const configFile = "config.toml"
	if _, err := os.Stat(configFile); err != nil {
		log.Fatalf("Could not find config file %q: %v", configFile, err)
	}
	if _, err := toml.DecodeFile(configFile, &config); err != nil {
		log.Fatalf("Could not parse config file: %s", err)
	}

	// Configure Redis connection
	redisClient := redis.NewClient(&redis.Options{
		Addr:     fmt.Sprintf("%s:%d", config.RedisHost, config.RedisPort),
		Password: "",
		DB:       0,
	})
	defer redisClient.Close()

	ctx := context.Background()

	// Create subscriptions for inbound and outbound connections
	inboundSub := redisClient.Subscribe(ctx, "activitypub-inbound-hosts")
	outboundSub := redisClient.Subscribe(ctx, "activitypub-outbound-hosts")

	// Check both Redis pubsub channels for new messages
	inboundCh := inboundSub.Channel(redis.WithChannelSize(WsMessageBufferSize))
	outboundCh := outboundSub.Channel(redis.WithChannelSize(WsMessageBufferSize))

	fmt.Println("Waiting for Redis messages ...")
	go handleRedisMessages(inboundCh, In)
	go handleRedisMessages(outboundCh, Out)

	// Start static files server and websocket server...
	go startStaticFilesServer()
	go startWebsocketServer()

	// Block until something is received on quit; the workers run in their
	// own goroutines.
	<-quit
}