forked from berthubert/googerteller
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.py
123 lines (107 loc) · 4.3 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os
import asyncio
import threading
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from scapy.all import sniff, IP, IPv6
from ipaddress import ip_network, ip_address
import json
# ASGI application object; the HTTP route and WebSocket endpoint in this
# module are registered on it.
app = FastAPI()

# Expose the local "html" directory under the /html URL prefix.
app.mount("/html", StaticFiles(directory="html"), name="html")

# Prefix-list files read by load_cidr_ranges(), grouped by address family.
CIDR_FILES_IPV4 = ["goog-cloud-prefixes.txt", "goog-prefixes.txt"]
CIDR_FILES_IPV6 = ["goog-cloud-prefixes6.txt", "goog-prefixes6.txt"]

# Currently connected WebSocket clients; added on accept, removed on
# disconnect or on a failed send.
clients = set()
def _load_cidr_files(directory, filenames):
    """Parse each named prefix file in *directory* into ``(network, filename)`` tuples."""
    networks = []
    for filename in filenames:
        filepath = os.path.join(directory, filename)
        if not os.path.isfile(filepath):
            print(f"Warning: {filename} not found in {directory}")
            continue
        with open(filepath, "r", encoding="utf-8") as f:
            for lineno, raw_line in enumerate(f, start=1):
                line = raw_line.strip()
                if not line or line.startswith("#"):  # Ignore empty lines and comments
                    continue
                try:
                    networks.append((ip_network(line), filename))
                except ValueError as e:
                    # A single malformed entry should not abort loading the
                    # remaining prefixes; report it and carry on.
                    print(f"Warning: skipping invalid CIDR at {filename}:{lineno}: {e}")
    return networks


def load_cidr_ranges(directory=".", ipv4_files=None, ipv6_files=None):
    """Load CIDR ranges from IPv4 and IPv6 text files and return them with filenames.

    Each file is read line by line; blank lines and lines starting with ``#``
    are ignored. A missing file or an unparsable line produces a printed
    warning and is skipped.

    Args:
        directory: Directory that contains the prefix files.
        ipv4_files: File names for the IPv4 list; defaults to ``CIDR_FILES_IPV4``.
        ipv6_files: File names for the IPv6 list; defaults to ``CIDR_FILES_IPV6``.

    Returns:
        A pair ``(ipv4_networks, ipv6_networks)``. Each element is a list of
        ``(network, filename)`` tuples in file order, where ``network`` is the
        object returned by ``ip_network`` and ``filename`` names the file the
        entry came from.
    """
    if ipv4_files is None:
        ipv4_files = CIDR_FILES_IPV4
    if ipv6_files is None:
        ipv6_files = CIDR_FILES_IPV6

    ipv4_networks = _load_cidr_files(directory, ipv4_files)
    ipv6_networks = _load_cidr_files(directory, ipv6_files)
    return ipv4_networks, ipv6_networks
def is_google_ip(ip, ipv4_networks, ipv6_networks):
    """Return the source filename of the first loaded network containing *ip*.

    Args:
        ip: Address to look up, in a form accepted by ``ip_address``.
        ipv4_networks: ``(network, filename)`` tuples searched for IPv4 addresses.
        ipv6_networks: ``(network, filename)`` tuples searched for IPv6 addresses.

    Returns:
        The filename paired with the first matching network, or ``None`` when
        no network of the address's family contains it.
    """
    address = ip_address(ip)
    # Only the list matching the address family is searched.
    by_version = {4: ipv4_networks, 6: ipv6_networks}
    candidates = by_version.get(address.version, ())
    return next(
        (source for net, source in candidates if address in net),
        None,
    )
async def notify_clients(message):
    """Send a message to all connected WebSocket clients.

    A client whose send raises is reported and dropped from ``clients``;
    delivery continues with the remaining clients.

    Args:
        message: Text payload passed to each client's ``send_text``.
    """
    # Iterate over a snapshot: removing from the set while looping over it
    # raises "RuntimeError: Set changed size during iteration", and the set
    # can also change while this coroutine is suspended in an await.
    for client in list(clients):
        try:
            await client.send_text(message)
        except Exception as e:
            print(f"Error sending message to client: {e}")
            # discard() tolerates the client having been removed elsewhere
            # in the meantime.
            clients.discard(client)
def packet_sniffer(ipv4_networks, ipv6_networks):
    """Capture IP/IPv6 packets and notify clients when a source address matches.

    Blocks in ``sniff`` and invokes ``process_packet`` for each captured
    packet. For a packet whose source address is found in the supplied
    networks, a JSON message ``{"ip": ..., "file": ...}`` is sent to the
    connected clients.

    Args:
        ipv4_networks: ``(network, filename)`` tuples for IPv4 lookups.
        ipv6_networks: ``(network, filename)`` tuples for IPv6 lookups.
    """

    def lookup(address):
        # Filename of the matching prefix list, or None.
        return is_google_ip(address, ipv4_networks, ipv6_networks)

    def broadcast(address, source_file):
        payload = json.dumps({"ip": address, "file": source_file})
        # NOTE(review): asyncio.run() starts a new event loop for every
        # matching packet, and this function is started on a separate thread
        # at module import; confirm that sending on WebSockets accepted by
        # the server's loop is safe from here.
        asyncio.run(notify_clients(payload))

    def process_packet(packet):
        # The IPv4 layer is checked first; IPv6 only when no IPv4 layer exists.
        if IP in packet:
            src_ip = packet[IP].src
            filename = lookup(src_ip)
            if not filename:
                return
            print(f"Detected Google IPv4 IP: {src_ip} in {filename}")
            broadcast(src_ip, filename)
            return

        if IPv6 in packet:
            src_ip = packet[IPv6].src
            filename = lookup(src_ip)
            if not filename:
                return
            print(f"Detected Google IPv6 IP: {src_ip} in {filename}")
            broadcast(src_ip, filename)

    sniff(filter="ip or ip6", prn=process_packet)
@app.get("/")
async def get_html():
    """Serve the HTML page.

    Returns:
        An ``HTMLResponse`` whose body is the content of ``html/index.html``,
        read on every request.
    """
    # The context manager closes the file handle on every request instead of
    # leaving it to garbage collection; the explicit encoding makes the read
    # independent of the platform's locale default.
    with open("html/index.html", encoding="utf-8") as f:
        return HTMLResponse(f.read())
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Handle WebSocket connections.

    Accepts the connection, registers it in ``clients``, then keeps reading
    (and ignoring) incoming text until the receive call raises, at which
    point the connection is unregistered.

    Args:
        websocket: The incoming WebSocket connection.
    """
    await websocket.accept()
    clients.add(websocket)
    try:
        while True:
            await websocket.receive_text()  # Keep the connection open
    except Exception as e:
        print(f"Client disconnected: {e}")
    finally:
        # notify_clients() may already have dropped this socket after a
        # failed send; discard() avoids the KeyError that remove() would
        # raise in that case.
        clients.discard(websocket)
# Read the prefix lists once, when the module is imported.
ipv4_networks, ipv6_networks = load_cidr_ranges()

# Run the packet sniffer on a background daemon thread, started at import.
threading.Thread(
    target=packet_sniffer,
    args=(ipv4_networks, ipv6_networks),
    daemon=True,
).start()