-
-
Notifications
You must be signed in to change notification settings - Fork 17
/
wigle.py
178 lines (150 loc) · 6.63 KB
/
wigle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# Plugin metadata: author, version, license and a one-line description of
# what this plugin does.
__author__ = 'forrest'
__version__ = '1.0.0'
__license__ = 'GPL3'
__description__ = 'This plugin automatically uploads collected wifis to wigle.net'
import os
import logging
import json
from io import StringIO
import csv
from datetime import datetime
import requests
from pwnagotchi.utils import WifiInfo, FieldNotFoundError, extract_from_pcap, StatusFile
# Flipped to True by on_loaded() once an api_key is configured; every other
# hook bails out early while this is False.
READY = False
# Status file used to remember which gps files were already uploaded
# (read and updated under the 'reported' key).
REPORT = StatusFile('/root/.wigle_uploads', data_format='json')
# gps files that could not be processed; kept in memory only.
SKIP = []
# Plugin options; on_loaded() expects an 'api_key' entry here.
# NOTE(review): presumably filled in by the plugin loader - confirm.
OPTIONS = {}
def on_loaded():
    """
    Check the plugin configuration and enable the plugin.

    Sets the module-level READY flag to True when OPTIONS contains a usable
    'api_key'. Otherwise an error is logged and READY stays False, which
    makes on_internet_available() return immediately.
    """
    global READY

    # A missing, None or empty api_key cannot be used for the upload, so all
    # three are treated as "not configured".
    if not OPTIONS.get('api_key'):
        logging.error("WIGLE: api_key isn't set. Can't upload to wigle.net")
        return

    READY = True
def on_internet_available(agent):
    """
    Upload all not yet reported gps-tagged handshakes to wigle.net.

    Every ``*.gps.json`` file in the configured handshakes directory that is
    neither recorded in REPORT nor listed in SKIP is paired with its ``.pcap``
    file. All usable pairs are sent as one Kismet-style Wigle CSV. Files that
    cannot be processed are appended to SKIP; files that were uploaded are
    stored in REPORT under the 'reported' key.

    :param agent: object providing ``config()`` and ``view()``
    """
    from scapy.all import Scapy_Exception

    # Nothing to do until on_loaded() accepted the configuration.
    if not READY:
        return

    config = agent.config()
    display = agent.view()
    reported = REPORT.data_field_or('reported', default=list())

    handshake_dir = config['bettercap']['handshakes']
    all_gps_files = [os.path.join(handshake_dir, filename)
                     for filename in os.listdir(handshake_dir)
                     if filename.endswith('.gps.json')]
    new_gps_files = set(all_gps_files) - set(reported) - set(SKIP)

    # Nothing new since the last run.
    if not new_gps_files:
        return

    logging.info("WIGLE: Internet connectivity detected. Uploading new handshakes to wigle.net")

    # Keys of the gps-file that are needed to build a csv row.
    required_keys = ('Latitude', 'Longitude', 'Altitude', 'Updated')
    wanted_fields = [WifiInfo.BSSID,
                     WifiInfo.ESSID,
                     WifiInfo.ENCRYPTION,
                     WifiInfo.CHANNEL,
                     WifiInfo.RSSI]

    no_err_entries = list()
    data_tuples = []

    for gps_file in new_gps_files:
        pcap_filename = gps_file.replace('.gps.json', '.pcap')

        if not os.path.exists(pcap_filename):
            logging.error("WIGLE: Can't find pcap for %s", gps_file)
            SKIP.append(gps_file)
            continue

        try:
            gps_data = _extract_gps_data(gps_file)
        except (OSError, json.JSONDecodeError) as read_err:
            logging.error("WIGLE: %s", read_err)
            SKIP.append(gps_file)
            continue

        # A gps-file without one of these keys would raise a KeyError, either
        # right below or later while the csv is built, where a single bad file
        # would make the whole upload fail on every run. Skip it instead.
        if not isinstance(gps_data, dict) or any(key not in gps_data for key in required_keys):
            logging.error("WIGLE: Incomplete gps-information in %s", gps_file)
            SKIP.append(gps_file)
            continue

        if gps_data['Latitude'] == 0 and gps_data['Longitude'] == 0:
            logging.warning("WIGLE: Not enough gps-information for %s. Trying again next time.", gps_file)
            SKIP.append(gps_file)
            continue

        try:
            pcap_data = extract_from_pcap(pcap_filename, wanted_fields)
        except FieldNotFoundError:
            logging.error("WIGLE: Could not extract all information. Skip %s", gps_file)
            SKIP.append(gps_file)
            continue
        except Scapy_Exception as sc_e:
            logging.error("WIGLE: %s", sc_e)
            SKIP.append(gps_file)
            continue

        data_tuples.append((gps_data, pcap_data))
        no_err_entries.append(gps_file)

    # Every candidate was skipped.
    if not data_tuples:
        return

    display.set('status', "Uploading gps-data to wigle.net ...")
    display.update(force=True)

    try:
        _send_to_wigle(_create_kismet_wigle_csv(data_tuples), OPTIONS['api_key'])
        reported += no_err_entries
        REPORT.update(data={'reported': reported})
        logging.info("WIGLE: Successfully uploaded one file with %s access points.", len(no_err_entries))
    except Exception as e:
        # The entries are deliberately NOT added to SKIP here: a failed upload
        # (e.g. flaky internet) must not throw away valid data, so the same
        # files are tried again on the next run.
        logging.error("WIGLE: Encountered an exception while uploading Kismet Wigle CSV file: %s", str(e))
def _extract_gps_data(path):
    """
    Extract data from gps-file

    :param path: path of the gps-json file
    :return: the decoded json object
    :raises OSError: if the file cannot be opened or read
    :raises json.JSONDecodeError: if the file does not contain valid json
    """
    # Both errors simply propagate to the caller; catching them only to
    # re-raise them unchanged added nothing.
    with open(path, 'r') as json_file:
        return json.load(json_file)
def _create_kismet_wigle_csv(data_tuples):
    """
    Transform to wigle entry in file

    :param data_tuples: iterable of ``(gps_data, pcap_data)`` pairs; gps_data
        is read via the keys 'Updated', 'Latitude', 'Longitude' and
        'Altitude', pcap_data via the WifiInfo members BSSID, ESSID,
        ENCRYPTION, CHANNEL and RSSI
    :return: the csv content (two header lines plus one row per pair) as a string
    """
    # Reference: https://github.com/kismetwireless/kismet/blob/master/log_tools/kismetdb_to_wiglecsv.cc
    dummy = StringIO()

    # write kismet wigle csv header
    # The release field used to end in a '{}' placeholder that was never
    # substituted, so a literal '{}' was uploaded. It is filled with 0 now.
    # NOTE(review): 0 is an arbitrary revision number - adjust if a real one exists.
    dummy.write('WigleWifi-1.4,appRelease=20190201,model=Kismet,release=2019.02.01.{},device=kismet,display=kismet,board=kismet,brand=kismet\r\n'.format(0))
    dummy.write('MAC,SSID,AuthMode,FirstSeen,Channel,RSSI,CurrentLatitude,CurrentLongitude,AltitudeMeters,AccuracyMeters,Type\r\n')

    writer = csv.writer(dummy, delimiter=',', quoting=csv.QUOTE_NONE, escapechar='\\')
    for gps_data, pcap_data in data_tuples:
        # Only the leading 'YYYY-MM-DDTHH:MM:SS' (19 characters) is used.
        # Cutting at the first '.' broke on timestamps without fractional
        # seconds, because the timezone suffix was then left in the string.
        first_seen = datetime.strptime(gps_data['Updated'][:19], '%Y-%m-%dT%H:%M:%S')
        auth_mode = ''.join('[{}]'.format(enc) for enc in pcap_data[WifiInfo.ENCRYPTION])
        writer.writerow([
            pcap_data[WifiInfo.BSSID],
            pcap_data[WifiInfo.ESSID],
            auth_mode,
            first_seen.strftime('%Y-%m-%d %H:%M:%S'),
            pcap_data[WifiInfo.CHANNEL],
            pcap_data[WifiInfo.RSSI],
            gps_data['Latitude'],
            gps_data['Longitude'],
            gps_data['Altitude'],
            # Accuracy, in meters. According to gps.gov, the expected user range error should be below 7.8m, 95% of the time.
            # A global study, summarized by gps.gov on 2016-05-11, claims actual global URE was <= 0.715m, 95% of the time.
            # They also claim that a smartphone is typically accurate to within a 4.9m radius (2014-2015); it is unclear
            # whether that includes positioning data based on the GSM network or just GNSS data:
            # https://ion.org/publications/abstract.cfm?articleID=13079
            # The gps-file carries no accuracy value, so a fixed 5m is reported until a real one is available.
            5,
            'WIFI'])

    return dummy.getvalue()
def _send_to_wigle(csv_data, api_key, timeout=30):
    """
    Uploads the file to wigle-net

    :param csv_data: csv content to upload, as a string
    :param api_key: value sent after 'Basic ' in the Authorization header
    :param timeout: timeout handed to requests.post
    :raises requests.exceptions.RequestException: if the request fails or the
        response does not report success
    """
    headers = {'Authorization': 'Basic {}'.format(api_key),
               'Accept': 'application/json'}
    data = {'donate': 'false'}
    # NOTE(review): 'type' is handed to requests through files=, so it goes
    # out as a second multipart part next to the csv. Kept unchanged to not
    # alter the request - confirm against the wigle.net API.
    payload = {'file': StringIO(csv_data), 'type': 'text/csv'}

    # Errors raised by requests propagate unchanged; wrapping the call just to
    # re-raise them added nothing.
    json_res = requests.post('https://api.wigle.net/api/v2/file/upload',
                             data=data,
                             headers=headers,
                             files=payload,
                             timeout=timeout).json()

    # .get() keeps a response without 'success'/'message' from surfacing as a
    # bare KeyError instead of the documented RequestException.
    if not json_res.get('success'):
        raise requests.exceptions.RequestException(
            json_res.get('message', 'wigle.net did not report a successful upload'))