-
Notifications
You must be signed in to change notification settings - Fork 1
/
ircrypt.py
457 lines (368 loc) · 12.5 KB
/
ircrypt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
__module_name__ = 'IRCrypt'
__module_version__ = 'Snapshot'
__module_description__ = 'IRCrypt: Encryption layer for IRC'
ircrypt_help_text = '''
Add, change or remove key for nick or channel.
IRCrypt command options:
list List keys, special ciphers and options
set-key [-server <network>] <target> <key> Set key for target
remove-key [-server <network>] <target> Remove key for target
set-cipher [-server <network>] <target> <cipher> Set specific cipher for channel
remove-cipher [-server <network>] <target> Remove specific cipher for channel
set-option <option> <value> Set an option of IRCrypt
Set the key for a channel:
/ircrypt set-key #IRCrypt key
Remove the key:
/ircrypt remove-key #IRCrypt
Set the key for a user:
/ircrypt set-key nick key
Switch to a specific cipher for a channel:
/ircrypt set-cipher #IRCrypt TWOFISH
Unset the specific cipher for a channel:
/ircrypt remove-cipher #IRCrypt
Set option CIPHER to AES
/ircrypt set-option CIPHER AES
'''
import string, os, subprocess, base64, time, xchat
# Global buffers used to store message parts, configuration options, keys, etc.
ircrypt_msg_buffer = {}
ircrypt_keys = {}
ircrypt_ciphers = {}
ircrypt_options = {'CIPHER': 'TWOFISH'}
ircrypt_gpg_binary = None
# Constants used throughout this script
MAX_PART_LEN = 300
MSG_PART_TIMEOUT = 300 # 5min
class MessageParts:
'''Class used for storing parts of messages which were split after
encryption due to their length.'''
modified = 0
last_id = None
message = ''
def update(self, id, msg):
'''This method updates an already existing message part by adding a new
part to the old ones and updating the identifier of the latest received
message part.
'''
# Check if id is correct. If not, throw away old parts:
if self.last_id and self.last_id != id+1:
self.message = ''
# Check if the are old message parts which belong due to their old age
# probably not to this message:
if time.time() - self.modified > MSG_PART_TIMEOUT:
self.message = ''
self.last_id = id
self.message = msg + self.message
self.modified = time.time()
def popen(*args, **kwargs):
'''Calls subprocess.Popen, injecting the settings to suppress the command
shell on Windows.
'''
try:
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
kwargs['startupinfo'] = startupinfo
except:
pass
return subprocess.Popen(*args, **kwargs)
def ircrypt_decrypt_hook(word, word_eol, userdata):
global ircrypt_msg_buffer, ircrypt_keys, ircrypt_gpg_binary
if '>ACRY' in word_eol[0]:
if '>ACRY-0' in word_eol[0]:
xchat.command('NOTICE %s :>UCRY-NOASYM' % word[0])
return xchat.EAT_ALL
# get context
con = xchat.get_context()
# Get channel and server from context
channel = con.get_info('channel')
server = con.get_info('network')
nick = word[0]
target = '%s/%s' % (server, channel)
# Get key
key = ircrypt_keys.get('%s/%s' % (server, channel))
if key:
# if key exists and >CRY part of message start symmetric encryption
if '>CRY-' in word_eol[0]:
pre, message = string.split(word_eol[0], '>CRY-', 1)
number, message = string.split(message, ' ', 1 )
message = string.split(message, ' ', 1)[0]
# Get key for the message buffer
buf_key = '%s.%s.%s' % (server, channel, nick)
# Decrypt only if we got last part of the message
# otherwise put the message into a globa buffer and quit
if int(number) != 0:
if not buf_key in ircrypt_msg_buffer:
ircrypt_msg_buffer[buf_key] = MessageParts()
ircrypt_msg_buffer[buf_key].update(int(number), message)
return xchat.EAT_ALL
# Get whole message
try:
message = message + ircrypt_msg_buffer[buf_key].message
except KeyError:
pass
# Decrypt
p = popen([ircrypt_gpg_binary, '--batch', '--no-tty', '--quiet',
'--passphrase-fd', '-', '-d'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.stdin.write('%s\n' % key)
p.stdin.write(base64.b64decode(message))
p.stdin.close()
decrypted = p.stdout.read()
p.stdout.close()
# Get and print GPG errors/warnings
err = p.stderr.read()
p.stderr.close()
if err:
con.prnt('GPG reported error:\n%s' % err)
# Remove old messages from buffer
try:
del ircrypt_msg_buffer[buf_key]
except KeyError:
pass
con.emit_print(userdata, nick, decrypted)
return xchat.EAT_XCHAT
# Not decrypted
return xchat.EAT_NONE
def ircrypt_encrypt_hook(word, word_eol, userdata):
global ircrypt_keys, ircrypt_ciphers, ircrypt_options, ircrypt_gpg_binary
# Get context
con = xchat.get_context()
# Get channel and server from context
channel = con.get_info('channel')
server = con.get_info('network')
target = '%s/%s' % (server, channel)
if target in ircrypt_keys:
# Get cipher
cipher = ircrypt_ciphers.get('%s/%s' % (server, channel))
if not cipher:
cipher = ircrypt_options['CIPHER']
# encrypt message
p = popen([ircrypt_gpg_binary, '--batch', '--no-tty', '--quiet',
'--symmetric', '--cipher-algo', cipher, '--passphrase-fd', '-'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.stdin.write('%s\n' % ircrypt_keys[target])
p.stdin.write(word_eol[0])
p.stdin.close()
encrypted = base64.b64encode(p.stdout.read())
p.stdout.close()
# Get and print GPG errors/warnings
err = p.stderr.read()
p.stderr.close()
if err:
con.prnt('GPG reported error:\n%s' % err)
xchat.emit_print('Your Message', xchat.get_info('nick'), word_eol[0])
# If too long for one message, split and send
if len(encrypted) > MAX_PART_LEN:
xchat.command('PRIVMSG %s :>CRY-1 %s' % (channel, encrypted[MAX_PART_LEN:]))
# Send (rest)
xchat.command('PRIVMSG %s :>CRY-0 %s' % (channel, encrypted))
return xchat.EAT_ALL
def ircrypt_notice_hook(word, word_eol, userdata):
# No key exchange
if '>WCRY' in word_eol[0]:
if '>WCRY-0' in word_eol[0]:
xchat.command('NOTICE %s :>UCRY-NOEXCHANGE' % word[0])
return xchat.EAT_ALL
return xchat.EAT_NONE
def ircrypt_command_hook(word, word_eol, userdata):
global ircrypt_keys, ircrypt_ciphers, ircrypt_options
# Get context
con = xchat.get_context()
# list
if len(word) == 1 or word[1] == 'list':
# Print keys, special cipher and options in current context
if ircrypt_keys:
con.prnt('\nKeys:')
for servchan,key in ircrypt_keys.iteritems():
con.prnt('%s : %s' % (servchan, key))
if ircrypt_ciphers:
con.prnt('\nSpecial Cipher:')
for servchan,spcip in ircrypt_ciphers.iteritems():
con.prnt('%s : %s' % (servchan, spcip))
if ircrypt_options:
con.prnt('\nOptions:')
for option, value in ircrypt_options.iteritems():
con.prnt('%s : %s' % (option, value))
return xchat.EAT_ALL
# Set options
if word[1] == 'set-option':
if len(word) < 4:
con.prnt(param)
return xchat.EAT_ALL
value = ' '.join(word[3:])
ircrypt_options[word[2].upper()] = value
# Print status message to current context
con.prnt('Set option %s to %s' % (word[2], value))
return xchat.EAT_ALL
if not word[1] in ['buffer', 'set-key', 'remove-key',
'set-cipher', 'remove-cipher']:
con.prnt('Unknown command. Try /help ircrypt')
return xchat.EAT_ALL
# Check if a server was set
if (len(word) > 3 and word[2] == '-server'):
server = word[3]
del word[3]
del word[2]
else:
# Try to determine the server automatically
server = con.get_info('network')
# All remaining commands need a server name
if not server:
# if no server was set print message in ircrypt buffer and throw error
con.prnt('Unknown Server. Please use -server to specify server')
return xchat.EAT_ALL
param = 'Not enough parameter. Try /help ircrypt'
# For the remaining commands we need at least one additional argument
if len(word) < 3:
con.prnt(param)
return xchat.EAT_ALL
target = '%s/%s' % (server, word[2])
# Set keys
if word[1] == 'set-key':
if len(word) < 4:
con.prnt(param)
return xchat.EAT_ALL
ircrypt_keys[target] = ' '.join(word[3:])
# Print status message to current context
con.prnt('Set key for %s' % target)
return xchat.EAT_ALL
# Remove keys
if word[1] == 'remove-key':
if len(word) < 3 :
con.prnt(param)
return xchat.EAT_ALL
# Check if key is set and print error in current context otherwise
if target not in ircrypt_keys:
con.prnt('No existing key for %s.' % target)
return xchat.EAT_ALL
# Delete key and print status message in current context
del ircrypt_keys[target]
con.prnt('Removed key for %s' % target)
return xchat.EAT_ALL
# Set special cipher for channel
if word[1] == 'set-cipher':
if len(word) < 4:
con.prnt(param)
return xchat.EAT_ALL
ircrypt_ciphers[target] = ' '.join(word[3:])
# Print status message to current context
con.prnt('Set special cipher for %s' % target)
return xchat.EAT_ALL
# Remove secial cipher for channel
if word[1] == 'remove-cipher':
if len(word) < 3 :
con.prnt(param)
return xchat.EAT_ALL
# Check if cipher is set and print error in current context otherwise
if target not in ircrypt_ciphers:
con.prnt('No existing special cipher for %s.' % target)
return xchat.EAT_ALL
# Delete cipher and print status message in current context
del ircrypt_ciphers[target]
con.prnt('Removed special cipher for %s' % target)
return xchat.EAT_ALL
# Set option
if word[1] == 'set-option':
if len(word) < 4:
con.prnt(param)
return xchat.EAT_ALL
ircrypt_ciphers[target] = ' '.join(word[3:])
# Print status message to current context
con.prnt('Set special cipher for %s' % target)
return xchat.EAT_ALL
# Error if command was unknown
return xchat.EAT_NONE
def ircrypt_init():
global ircrypt_keys, ircrypt_options, ircrypt_ciphers
# Open config file
f = None
try:
f = open('%s/ircrypt.conf' % xchat.get_info('xchatdirfs'), 'r')
except:
pass
if not f :
xchat.prnt('Could not open ircrypt.conf.')
return xchat.EAT_ALL
for line in f:
# Read keys
if line[0:4] == 'key:':
(prefix, target, key) = line.split(':',2)
ircrypt_keys[target] = key[0:-1]
else:
# Read options
if line.startswith('option:'):
(_, option, value) = line.split(':',2)
ircrypt_options[option.upper()] = value.rstrip('\n\r')
else:
# Read special cipher
if line[0:7] == 'cipher:':
(prefix, target, cipher) = line.split(':',2)
ircrypt_ciphers[target] = cipher[0:-1]
xchat.prnt('IRCrypt re(loaded)')
return xchat.EAT_ALL
def ircrypt_unload(userdata):
global ircrypt_keys, ircrypt_options, ircrypt_ciphers
# Open config file
f = open('%s/ircrypt.conf' % xchat.get_info('xchatdirfs'), 'w')
if not f :
xchat.prnt('Could not open ircrypt.conf.')
return xchat.EAT_ALL
# write keys
for target in ircrypt_keys:
f.write('key:%s:%s\n' % (target, ircrypt_keys[target]))
# write options
for option in ircrypt_options:
f.write('option:%s:%s\n' % (option, ircrypt_options[option]))
# write special cipher
for target in ircrypt_ciphers:
f.write('cipher:%s:%s\n' % (target, ircrypt_ciphers[target]))
return xchat.EAT_ALL
def ircrypt_find_gpg_binary():
'''Check for GnuPG binary to use
:returns: Tuple with binary name and version.
'''
for binary in ('gpg2','gpg'):
try:
p = popen([binary, '--version'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
version = p.stdout.read().split('\n',1)[0]
if p.wait():
continue
return binary, version
except:
pass
return None, None
def ircrypt_check_binary():
'''If binary is not set, try to determine it automatically
'''
global ircrypt_gpg_binary
ircrypt_gpg_binary = ircrypt_options.get('BINARY')
if ircrypt_gpg_binary:
return
ircrypt_gpg_binary,version = ircrypt_find_gpg_binary()
if not ircrypt_gpg_binary:
xchat.prnt('Automatic detection of the GnuPG binary failed and '
'nothing is set manually. You wont be able to use IRCrypt like '
'this. Please install GnuPG or set the path to the binary to '
'use.')
else:
xchat.prnt('Found %s' % version)
ircrypt_options['BINARY'] = ircrypt_gpg_binary
def test(word, word_eol, userdata):
xchat.prnt(word[-2])
return xchat.EAT_ALL
# Initialize
ircrypt_init()
# Chek if gpg binary is set
ircrypt_check_binary()
# hook for ircrypt command
xchat.hook_command('ircrypt', ircrypt_command_hook, help=ircrypt_help_text)
# hook for encryption
xchat.hook_command('', ircrypt_encrypt_hook)
# hook for decryption
xchat.hook_print('Channel Message', ircrypt_decrypt_hook, 'Channel Message')
# hook to check for asymmetric encryption in notices
xchat.hook_print('Notice', ircrypt_notice_hook)
# Unload
xchat.hook_unload(ircrypt_unload)