forked from francium/microurl
-
Notifications
You must be signed in to change notification settings - Fork 0
/
database.py
130 lines (101 loc) · 3.34 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import hashlib
import json
import sys
import time
from Crypto.Hash import SHA256
import MySQLdb as sqldb
class DB_Interface:
def __init__(self):
try:
config = parse_config()
self.user = config['user']
self.passwd = config['password']
self.host = config['host']
self.db_name = config['db_name']
except FileNotFoundError as fnfe:
pass # FIXME Fail object creation (override __new__)
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def connect(self):
self.db = sqldb.connect(db=self.db_name, user=self.user,
host=self.host, passwd=self.passwd)
self.cur = self.db.cursor()
def close(self):
self.db.close()
def create(self):
sql = read_schema()
rc = self.cur.execute(sql)
return rc
def clear(self):
sql = 'delete from Micros'
rc = self.cur.execute(sql)
self.db.commit()
return rc
def clear_expired(self):
sql = 'delete from Micros where expiration >= %s'
tnow = int(time.time())
rc = self.cur.execute(sql, (tnow,))
self.db.commit()
return rc
def increment_hit(self, query):
sql = 'update Micros set hits = hits + 1 where micro_link = %s'
rc = self.cur.execute(sql, (query,))
self.db.commit()
return rc
def insert(self, micro, url, creation, expiration, public):
sql = 'insert into Micros (' \
'id, micro_link, real_link, creation, expiration, public) ' \
'values (%s, %s, %s, %s, %s, %s)'
id = SHA256.new(url.encode('latin1')).hexdigest()
rc = self.cur.execute(sql, (id, micro, url, creation, expiration, public))
self.db.commit()
return rc
def get_all(self):
sql = 'select * from Micros'
self.cur.execute(sql)
return self.cur.fetchall()
def get_top(self):
sql = '''select *
from Micros
where public = 1 and hits >= 3
order by hits desc'''
self.cur.execute(sql)
return self.cur.fetchall()
def get_recent(self):
sql = '''select *
from Micros
where public = 1
order by creation desc'''
self.cur.execute(sql)
return self.cur.fetchall()
def query(self, value):
sql = 'select * from Micros where micro_link = %s'
self.cur.execute(sql, (value,))
return self.cur.fetchone()
def parse_config():
with open('.config.json', 'r') as f:
return json.loads(f.read())
def read_schema():
with open('schema.sql', 'r') as f:
return f.read()
def yesno(msg):
return input(msg).lower() in ('yes', 'y')
def create_database():
with DB_Interface() as db:
db.create()
def clear_database():
if yesno('Clear database? Are you sure? '):
with DB_Interface() as db:
db.clear()
print('Database cleared.')
if __name__ == '__main__':
if (len(sys.argv) > 1):
if sys.argv[1] == 'create':
sys.exit(create_database())
elif sys.argv[1] == 'clear':
sys.exit(clear_database())
else:
print('{}: unknown command'.format(sys.argv[0]))