forked from 0ceanSlim/crawlr
-
Notifications
You must be signed in to change notification settings - Fork 0
/
crawl.go
159 lines (133 loc) · 3.42 KB
/
crawl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package main
import (
"encoding/json"
"fmt"
"io"
"sync"
"golang.org/x/net/websocket"
)
// ReqKind10002 connects to relayURL over a websocket, sends a REQ for kind
// 10002 (NIP-65 relay list metadata) events, and feeds each EVENT frame to
// parseRelayList until the relay signals EOSE or the connection closes.
// Returns nil on EOSE or a normal close; otherwise an error describing the
// failing stage (config, dial, send, or receive).
func ReqKind10002(relayURL string) error {
	config, err := websocket.NewConfig(relayURL, "http://localhost/")
	if err != nil {
		return fmt.Errorf("config error: %v", err)
	}
	ws, err := websocket.DialConfig(config)
	if err != nil {
		return fmt.Errorf("dial error: %v", err)
	}
	defer ws.Close()

	// Subscribe to kind 10002 events, capped at 100 per relay.
	subscriptionID := "crawlr"
	req := []interface{}{
		"REQ", subscriptionID, map[string]interface{}{
			"kinds": []int{10002},
			"limit": 100,
		},
	}
	if err := websocket.JSON.Send(ws, req); err != nil {
		return fmt.Errorf("failed to send REQ message: %v", err)
	}

	// Continuously receive messages until EOSE or connection closed.
	for {
		var msg []byte
		err = websocket.Message.Receive(ws, &msg)
		if err != nil {
			if err == io.EOF {
				return nil // Connection closed normally
			}
			return fmt.Errorf("receive error: %v", err)
		}
		fmt.Printf("Received message: %s\n", string(msg)) // Debugging

		// Parse the frame once here and dispatch on its type, instead of
		// unconditionally re-parsing every frame inside parseRelayList.
		var response []interface{}
		if err := json.Unmarshal(msg, &response); err != nil {
			fmt.Printf("Error parsing message: %v\n", err)
			continue
		}
		if len(response) == 0 {
			continue
		}
		switch response[0] {
		case "EOSE":
			fmt.Println("Received EOSE, stopping reception from this relay.")
			return nil
		case "EVENT":
			// Only EVENT frames carry relay lists; NOTICE/OK/AUTH etc. are
			// skipped, which also avoids a redundant JSON parse for them.
			if err := parseRelayList(msg); err != nil {
				fmt.Printf("Error parsing relay list: %v\n", err)
			}
		}
	}
}
// parseRelayList extracts relay URLs from a kind 10002 EVENT message and
// classifies each one under a single lock acquisition. Frames that are not
// ["EVENT", <sub>, <event>] are silently ignored; structurally invalid event
// payloads produce an error.
func parseRelayList(message []byte) error {
	var frame []interface{}
	if err := json.Unmarshal(message, &frame); err != nil {
		return fmt.Errorf("failed to parse message: %v", err)
	}

	// Only EVENT frames carry relay lists; anything else is not an error.
	if len(frame) < 3 || frame[0] != "EVENT" {
		return nil
	}

	event, ok := frame[2].(map[string]interface{})
	if !ok {
		return fmt.Errorf("invalid event data format")
	}
	rawTags, ok := event["tags"].([]interface{})
	if !ok {
		return fmt.Errorf("invalid tags format")
	}

	// Collect every well-formed ["r", <url>, ...] tag; malformed tags and
	// non-string URL fields are skipped rather than treated as errors.
	urls := make([]string, 0)
	for _, raw := range rawTags {
		fields, ok := raw.([]interface{})
		if !ok || len(fields) < 2 || fields[0] != "r" {
			continue
		}
		if url, ok := fields[1].(string); ok {
			urls = append(urls, url)
		}
	}

	// Lock once for all relay classifications.
	mu.Lock()
	defer mu.Unlock()
	for _, url := range urls {
		classifyRelay(url)
	}
	return nil
}
// crawlClearOnlineRelays crawls the relays from the clearOnline list
// concurrently. concurrency bounds the number of simultaneous connections.
// Each relay is tried up to maxTries times; a relay is moved to clearOffline
// only after every attempt has failed, and exactly once.
func crawlClearOnlineRelays(concurrency int) {
	sem := make(chan struct{}, concurrency)
	var wg sync.WaitGroup

	// Snapshot the not-yet-crawled relays under the lock so the goroutines
	// below don't iterate a map that they also mutate.
	mu.Lock()
	relays := make([]string, 0, len(clearOnline))
	for relay := range clearOnline {
		if !crawledRelays[relay] {
			relays = append(relays, relay)
		}
	}
	mu.Unlock()

	for _, relay := range relays {
		wg.Add(1)
		sem <- struct{}{} // Block when reaching concurrency limit
		go func(r string) {
			defer wg.Done()
			defer func() { <-sem }() // Release semaphore

			var lastErr error
			for i := 0; i < maxTries; i++ {
				if err := ReqKind10002(r); err != nil {
					lastErr = err
					continue
				}
				// Success: record the crawl and stop retrying.
				mu.Lock()
				crawledRelays[r] = true
				mu.Unlock()
				return
			}
			// All attempts failed. Demote the relay exactly once, while its
			// clearOnline entry still exists (the original moved it on the
			// FIRST failure, so later retries read a deleted map entry and a
			// subsequently successful retry left the maps inconsistent).
			fmt.Printf("Failed to crawl relay %s: %v\n", r, lastErr)
			mu.Lock()
			clearOffline[r] = clearOnline[r]
			delete(clearOnline, r)
			mu.Unlock()
		}(relay)
	}
	wg.Wait()
}