-
Notifications
You must be signed in to change notification settings - Fork 33
/
utils.py
234 lines (201 loc) · 7.5 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# library of useful functions
import datetime
import json
import time
import dateutil.parser
import requests
from bs4 import BeautifulSoup
from django.conf import settings
from django.core.mail import send_mail
from django.template import loader
def kgs_connect():
    """Log into the KGS JSON API and return the follow-up GET response.

    Returns:
        The ``requests.Response`` of the GET that retrieves the queued
        server messages for this session, or ``False`` when either the
        login POST or the follow-up GET fails 10 times in a row.
    """
    url = 'http://www.gokgs.com/json/access'
    # If you are running this locally and want to run scraper, you should use your own
    # KGS credential
    if settings.DEBUG:
        kgs_password = 'password'  # change this for local test
    else:
        with open('/etc/kgs_password.txt') as f:
            kgs_password = f.read().strip()
    message = {
        'type': 'LOGIN',
        'name': 'OSR',  # change this if you are testing locally
        'password': kgs_password,
        'locale': 'de_DE',
    }
    formatted_message = json.dumps(message)
    # Retry the login POST up to 10 times (KGS can be flaky).
    for _ in range(10):
        response = requests.post(url, formatted_message, timeout=10)
        time.sleep(3)
        if response.status_code == 200:
            break
    if response.status_code != 200:
        return False
    cookies = response.cookies
    # The GET on the same url returns the pending messages for the session.
    for _ in range(10):
        r = requests.get(url, cookies=cookies, timeout=10)
        time.sleep(3)
        if r.status_code == 200:
            break
    # BUG FIX: the original re-checked ``response.status_code`` (the login
    # POST) here, so a GET that failed all 10 attempts was never detected.
    if r.status_code != 200:
        return False
    requests.post(url, json.dumps(
        {'type': 'LOGOUT'}), cookies=cookies, timeout=10)
    return r
def check_byoyomi(s):
    """Return True if *s* is an acceptable byo-yomi time setting.

    The minimum accepted setting is '3x30 byo-yomi' (at least 3 periods of
    at least 30 seconds). We don't have settings for that... for now.

    BUG FIX: malformed strings (e.g. 'byo-yomi' with no 'NxT' prefix) used
    to raise ValueError from int(); they now return False.
    """
    if s.find('byo-yomi') == -1:
        return False
    a = s.find('x')
    b = s.find(' ')
    try:
        n = int(s[0:a])      # number of periods
        t = int(s[a + 1:b])  # seconds per period
    except ValueError:
        return False
    return n >= 3 and t >= 30
def get_byoyomi(s):
    """Parse a string like '3x30 byo-yomi' and return {'n': 3, 't': 30}.

    Returns {'n': 0, 't': 0} when *s* is not a byo-yomi setting, or —
    BUG FIX, consistent with check_byoyomi — when the 'NxT' prefix is
    malformed (the original raised ValueError in that case).
    """
    if s.find('byo-yomi') == -1:
        return {'n': 0, 't': 0}
    a = s.find('x')
    b = s.find(' ')
    try:
        n = int(s[0:a])      # number of periods
        t = int(s[a + 1:b])  # seconds per period
    except ValueError:
        return {'n': 0, 't': 0}
    return {'n': n, 't': t}
def extract_players_from_url(url):
    """Extract the players' names from a KGS archive url.

    Expected shape: 'http://files.gokgs.com/games/Y/m/d/white-black-N.sgf'
    where the trailing '-N' (repeat-game counter) is optional.

    Returns:
        {'white': ..., 'black': ...} on success, None when the url is not
        a conforming KGS archive link.

    BUG FIX: the original left ``black`` undefined (NameError) for urls
    with no dash after the white name, and produced garbage names for
    urls with no dash at all; both cases now return None.
    """
    if not url.startswith('http://files.gokgs.com/games/'):
        return None
    start = url.rfind('/') + 1
    w_end = url.find('-', start)
    if w_end == -1:
        # no 'white-black' separator: not a proper game url
        return None
    white = url[start:w_end]
    b_end = url.find('-', w_end + 1)
    if b_end == -1:
        # no '-N' suffix: black's name ends at the extension dot
        b_end = url.find('.', w_end + 1)
    if b_end == -1:
        return None
    black = url[w_end + 1:b_end]
    return {'white': white, 'black': black}
def ask_kgs(kgs_username, year, month):
    """Return a list of dicts {'url': ..., 'game_type': ...} of the games
    for the selected user, year and month, scraped from the KGS archives.

    We have to check game_type here because it's not in the sgf but only
    on the kgs website. Do not perform any check on players or whatever.
    """
    # BUG FIX: the original used ``if len(str(month)):`` which is true for
    # every month, so two-digit months were mangled ('10' -> '010') and
    # the archive url was wrong. zfill pads only when actually needed.
    month = str(month).zfill(2)
    url = 'https://www.gokgs.com/gameArchives.jsp?user=' + \
        str(kgs_username) + '&year=' + str(year) + '&month=' + month
    r = requests.get(url, timeout=10)
    soup = BeautifulSoup(r.text, 'html5lib')
    # old method that just get the links to games
    # we need type too to exclude reviews :(
    # la = soup.find_all(href=re.compile('^http://files.gokgs.com/games/'))
    games = []
    if soup.table is None:
        return games
    # first row is the table header, skip it
    for tr in soup.table.find_all('tr')[1:]:
        tds = tr.find_all('td')
        if tds[0].get_text() == 'Yes':
            game_url = tds[0].a.get('href')
            # crappy way to detect if a game is a review: the # of rows in the table... :(
            if len(tds) == 6:  # it's a review !
                game_type = 'review'
            else:
                game_type = tds[5].get_text()
            games.append({'url': game_url, 'game_type': game_type})
    return games
def findnth(haystack, needle, n):
    """Return the index of the (n+1)-th non-overlapping occurrence of
    *needle* in *haystack* (n is zero-based), or -1 if there are fewer."""
    idx = haystack.find(needle)
    for _ in range(n):
        if idx == -1:
            break
        # skip past the whole match so occurrences never overlap
        idx = haystack.find(needle, idx + len(needle))
    return idx
def parse_sgf_string(sgf_string):
    """Parse an sgf given as a string and return a dict with the keys
    bplayer, wplayer, time, byo, result, handicap, komi, board_size,
    rules, date, place (when present), plus number_moves and check_code."""
    # Strip non-breaking spaces and both newline characters first.
    for ch in (chr(160), chr(10), chr(13)):
        sgf_string = sgf_string.replace(ch, '')
    # sgf property tag -> output field name
    tag_to_field = {
        'DT': 'date',
        'RE': 'result',
        'PB': 'bplayer',
        'PW': 'wplayer',
        'KM': 'komi',
        'HA': 'handicap',
        'SZ': 'board_size',
        'TM': 'time',
        'OT': 'byo',
        'PC': 'place',
        'RU': 'rules',
    }
    # defaults for the fields we later coerce to numbers/date
    out = {
        'date': None,
        'komi': 0,
        'time': 0,
        'handicap': 0,
        'board_size': 19,
    }
    for tag, field in tag_to_field.items():
        start = sgf_string.find(tag + '[')
        if start == -1:
            continue
        end = sgf_string.find(']', start)
        out[field] = sgf_string[start + 3:end]  # skip the 'XX[' prefix
    # Coerce date, komi, time, size and handicap into proper types.
    if out['date'] is not None:
        out['date'] = datetime.datetime.strptime(out['date'], '%Y-%m-%d')
    out['komi'] = float(out['komi'])
    out['time'] = int(out['time'])
    out['board_size'] = int(out['board_size'])
    out['handicap'] = int(out['handicap'])
    # Set handicap to 1 if handicap is 0 and komi is 0.5: https://github.com/climu/openstudyroom/issues/364
    if out['handicap'] == 0 and out['komi'] == 0.5:
        out['handicap'] = 1
    # counting the number of moves. Note that there could be a +-1 diff, but we don't really care
    out['number_moves'] = 2 * sgf_string.count(';B[')
    # Unique-ish check code: yyyymmdd + players + a sample of black moves
    # (every 8th 'B[' occurrence, up to six samples).
    code = ''
    if out['date'] is not None:
        code += out['date'].strftime('%Y%m%d')
    code += out['wplayer'] + out['bplayer']
    for step in range(1, 7):
        nth = 8 * step
        pieces = sgf_string.split('B[', nth + 1)
        if len(pieces) > nth + 1:
            move_start = len(sgf_string) - len(pieces[-1]) - 2
            move_end = sgf_string.find(']', move_start)
            code += sgf_string[move_start + 2:move_end]
    out['check_code'] = code
    return out
def quick_send_mail(user, mail):
    """Send *user* an email rendered from the template path *mail*.

    Silently does nothing when the user has no primary email address.
    """
    address = user.get_primary_email()
    if address is not None:
        plaintext = loader.get_template(mail)
        context = {'user': user}
        message = plaintext.render(context)
        # BUG FIX: Django's send_mail signature is
        # (subject, message, from_email, recipient_list, ...); the original
        # omitted from_email, so the recipient list was passed as
        # from_email and the call raised TypeError.
        send_mail(
            'Welcome in the Open Study Room',
            message,
            settings.DEFAULT_FROM_EMAIL,
            [address.email],
            fail_silently=False,
        )
def parse_ogs_iso8601_datetime(dt_str):
    """Convert an OGS ISO-8601 timestamp into a naive UTC datetime.

    Turns '2019-04-30T14:41:18.183258-04:00' or
    '2019-04-30T14:41:18.183258Z' into
    datetime.datetime(2019, 4, 30, 18, 41, 18, 183258).
    OGS sends us these and we want to compare to a TZ-unaware datetime.
    """
    parsed = dateutil.parser.isoparse(dt_str)
    # normalize to UTC, then drop the tzinfo to get a naive datetime
    return parsed.astimezone(datetime.timezone.utc).replace(tzinfo=None)