-
Notifications
You must be signed in to change notification settings - Fork 0
/
ConnectionHistory.py
181 lines (152 loc) · 5.83 KB
/
ConnectionHistory.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import datetime as dt
from contextlib import closing
import numpy as np
import pandas as pd
import pymysql.cursors
import sqlite3
from config import CACHE_DB, WP_DB, PROD_DB
RADACCT_COLS = [
'radacctid',
'username AS mac',
'acctstarttime',
'acctstoptime',
'acctsessiontime',
'acctoutputoctets',
'acctinputoctets',
]
WP_COLS = [
'id',
'time AS acctstarttime',
'login AS mac',
'phone'
]
CONNECTIONS_COLS = [
'radacctid',
'mac',
'phone',
'acctstarttime',
'acctstoptime',
'acctsessiontime',
'acctoutputoctets',
'acctinputoctets'
]
class ConnectionCache:
def __init__(self, hotspot_name: str):
self.table = hotspot_name
def read(self, startdate=dt.date.min) -> pd.DataFrame:
self.update()
condition = f"'{startdate}' < acctstarttime"
sql = f"SELECT * FROM {self.table!r} WHERE {condition}"
with closing(sqlite3.connect(**CACHE_DB)) as c:
return pd.read_sql(sql, c, parse_dates=['acctstarttime', 'acctstoptime'])
def update(self):
radacct = self._select_new_radacct_rows()
# Temporary fix. Radius container writes entries in UTC-0,
# while WP uses the WordPress setting.
rows_after_tz_change = radacct.radacctid > 4742025
radacct.loc[rows_after_tz_change, "acctstarttime"] += pd.Timedelta(hours=3)
if radacct.empty:
reserve = 20 # in case new connections occured
self.last_row = self._get_latest_radacctid() - reserve
return
new_connections = self._merge_with_wp(radacct)
self._del_temp_connections()
self._save_new_connections(new_connections)
self._update_cache_info(new_connections)
def _select_new_radacct_rows(self) -> pd.DataFrame:
columns = ", ".join(RADACCT_COLS)
conditions = [
f"radacctid > {self.last_row}",
f"calledstationid = {repr(self.table)}"
]
conditions = " AND ".join(conditions)
table = "radius.radacct"
sql = f"SELECT {columns} FROM {table} WHERE {conditions}"
with pymysql.connect(**PROD_DB) as c:
return pd.read_sql(sql, c)
@staticmethod
def _merge_with_wp(radacct: pd.DataFrame) -> pd.DataFrame:
"""
Retrieve the phone number used to authenticate each MAC
by performing a "backward" search on connection time
in wp_proxy_entries table.
"""
wp = WpProxyEntries.read()
radacct.sort_values(by='acctstarttime', inplace=True)
wp.sort_values(by='acctstarttime', inplace=True)
connections = pd.merge_asof(radacct, wp, on='acctstarttime', by='mac')
connections = connections[CONNECTIONS_COLS]
return connections
def _del_temp_connections(self):
"""
The connections within the last 24 hours may be unfinished,
so they are cached temporarily and replaced with next update() call.
"""
if not self.last_row:
return
sql = f"DELETE FROM {self.table!r} WHERE radacctid > {self.last_row}"
with closing(sqlite3.connect(**CACHE_DB)) as c:
c.execute(sql)
def _save_new_connections(self, new_connections: pd.DataFrame):
with closing(sqlite3.connect(**CACHE_DB)) as c:
dtype = {'radacctid': 'INTEGER PRIMARY KEY'}
new_connections.to_sql(
self.table, c, if_exists="append", index=False, dtype=dtype
)
def _update_cache_info(self, new_conn: pd.DataFrame):
temp_cache = new_conn.acctstarttime > dt.datetime.now() - dt.timedelta(days=1)
last_row = new_conn[~temp_cache].radacctid.max() # -> np.int64 | np.nan
if not np.isnan(last_row):
self.last_row = int(last_row)
@staticmethod
def _get_latest_radacctid() -> int:
sql = f"SELECT MAX(radacctid) FROM radius.radacct"
with pymysql.connect(**PROD_DB) as c:
with c.cursor() as cursor:
cursor.execute(sql)
return cursor.fetchone()[0]
@property
def last_row(self) -> int:
sql = f"SELECT last_row FROM cache_info WHERE hotspot = {self.table!r}"
with closing(sqlite3.connect(**CACHE_DB)) as c:
result = c.execute(sql).fetchone()
return result[0] if result else 0
@last_row.setter
def last_row(self, value):
upd = f"UPDATE cache_info SET last_row = ? WHERE hotspot = {self.table!r}"
ins = f"INSERT INTO cache_info VALUES ({self.table!r}, ?)"
with closing(sqlite3.connect(**CACHE_DB)) as c:
if not c.execute(upd, [value]).rowcount:
c.execute(ins, [value])
class WpProxyEntries:
@classmethod
def read(cls):
cls.update()
sql = f"SELECT MAC, acctstarttime, phone FROM proxy_entries"
with closing(sqlite3.connect(**WP_DB)) as c:
return pd.read_sql(sql, c, parse_dates='acctstarttime')
@classmethod
def update(cls):
wp = cls._select_new_wp_rows()
if wp.empty:
return
with closing(sqlite3.connect(**WP_DB)) as c:
dtype = {'id': 'INTEGER PRIMARY KEY'}
table = "proxy_entries"
wp.to_sql(table, c, if_exists="append", index=False, dtype=dtype)
@classmethod
def _select_new_wp_rows(cls) -> pd.DataFrame:
columns = ", ".join(WP_COLS)
table = "wifi_wp_base.wp_proxy_entries"
sql = f"SELECT {columns} from {table} WHERE id > {cls.last_row}"
with pymysql.connect(**PROD_DB) as c:
return pd.read_sql(sql, c)
@classmethod
@property
def last_row(cls) -> int: # noqa Pycharm
sql = f"SELECT MAX(id) FROM proxy_entries"
with closing(sqlite3.connect(**WP_DB)) as c:
try:
return c.execute(sql).fetchone()[0]
except sqlite3.OperationalError:
return 0