#!/usr/local/bin/python2.7 -u
#
# vim: set tabstop=4:shiftwidth=4:expandtab
'''
ltm-ssl
Manage SSL certificates and profiles on F5 Local Traffic Managers
This script uses the F5 iControl REST interface. See also:
https://devcentral.f5.com/wiki/iControlrest.HomePage.ashx
Copyright (c) 2016 O'Shaughnessy Evans <shaug + github-ltm-ssl @ wumpus.org>
'''
import argparse
import collections
import copy
import datetime
import json
import os
import random
import signal
import subprocess
import sys
import tempfile
import textwrap
import time
# https://docs.python.org/2/library/simplehttpserver.html
import BaseHTTPServer
import SimpleHTTPServer
import SocketServer

import requests
# Connection defaults; 'FIXME' is a placeholder that is expected to be
# replaced (or overridden by the caller) before the script is used.
LTM_HOST = 'FIXME'
LTM_USER = 'FIXME'
# NOTE(review): a value set here lives in the script source; iControlREST
# hands user/password to requests as the session's HTTP auth pair.
LTM_PASS = 'FIXME'
# Parent profile that new SSL profiles default from (defaultsFrom).
SSL_PROFILE_PARENT = 'clientssl-customer-instance'
# Module-level verbosity level; 0 is quiet.
Verbose = 0
class iControlREST:
"""Manage an F5 Local Traffic Manager using the iControl REST protocol.
"""
# Endpoint paths keyed by the names icontrol_url() accepts. The {id} and
# {profile} placeholders are filled from icontrol_url()'s keyword
# arguments; "~Common~" is the partition-qualified object name form.
REST_PATHS = {
    'clientssl': '/mgmt/tm/ltm/profile/clientSsl',
    'clientssl_id': '/mgmt/tm/ltm/profile/clientSsl/~Common~{id}',
    'crypto_cert': '/mgmt/tm/sys/crypto/cert',
    'crypto_cert_id': '/mgmt/tm/sys/crypto/cert/{id}',
    'crypto_key': '/mgmt/tm/sys/crypto/key',
    'crypto_key_id': '/mgmt/tm/sys/crypto/key/{id}',
    'file_sslcert': '/mgmt/tm/sys/file/sslCert',
    'file_sslcert_id': '/mgmt/tm/sys/file/sslCert/~Common~{id}',
    'file_sslkey': '/mgmt/tm/sys/file/sslKey',
    'file_sslkey_id': '/mgmt/tm/sys/file/sslKey/~Common~{id}',
    'transaction': '/mgmt/tm/transaction',
    'transaction_id': '/mgmt/tm/transaction/{id}',
    'virtual': '/mgmt/tm/ltm/virtual',
    'virtual_id_443': '/mgmt/tm/ltm/virtual/~Common~{id}_443',
    'virtual_profiles': '/mgmt/tm/ltm/virtual/~Common~{id}/profiles',
    'virtual_profile': '/mgmt/tm/ltm/virtual/~Common~{id}/profiles/~Common~{profile}',
}
# Where do we store transitory files on the LTM?
# This probably won't ever need to change. It just needs to be unique to us.
# add_crypto_cert()/add_crypto_key() append "crt.<n>" / "key.<n>" to it.
LTM_TMP_PATH = '/var/tmp/ltm-ssl-'
# How many times should we try a REST request before giving up?
MAX_REQUEST_ATTEMPTS = 5
# If we get an unexpected error during a request, how many seconds
# should we wait before we try again?
RETRY_WAIT = 5
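def __init__(self, verbose=0, user=None, password=None, hostname=None,
             http_scheme='https', ssl_profile_parent=None, verify=False):
    """Initialize a new iControlREST object.
    Keyword arguments:
    - verbose: integer that controls how much we talk about progress
    - user: API authentication username
    - password: API authentication password
    - hostname: host or IP to which API calls will be sent
    - http_scheme: "http" or "https"; defaults to "https"
    - ssl_profile_parent: parent to assign to new SSL profiles
    - verify: passed to requests as the session's `verify` setting:
    False skips validation of the LTM's TLS certificate (the historical
    default), True validates against the system CA store, and a string
    names a CA bundle file to validate against.
    """
    self.verbose = verbose
    self.http_scheme = http_scheme
    self.hostname = hostname
    self.session = requests.Session()
    self.session.headers.update({'Content-Type': 'application/json'})
    # caches filled by the load_*() methods; insertion order is kept so
    # listings come out in the order the LTM returned them
    self.virtual_profiles = collections.OrderedDict()
    self.virtual_servers = collections.OrderedDict()
    self.ssl_profile_parent = ssl_profile_parent
    self.ssl_profiles = collections.OrderedDict()
    self.ssl_certs = collections.OrderedDict()
    self.ssl_keys = collections.OrderedDict()
    if user and password:
        self.session.auth = (user, password)
    self.session.verify = verify
    if verify is False:
        # only silence urllib3's insecure-request warning when we really
        # are not validating the device certificate
        requests.packages.urllib3.disable_warnings()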
def icontrol_url(self, function, *args, **kwargs):
"""Return the URL to call a iControl REST function.
Builds a string from the object's http_schem and hostname, which
were set during initialization. Passes any keyword arguments to
format(), where they can be used to fill in parts of the function
path taken from iControlREST.REST_PATHS[].
Arguments:
- function: name of the REST function to call, in lowercase.
"""
path = self.REST_PATHS[function]
return self.http_scheme + '://' + self.hostname + path.format(**kwargs)
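def send(self, method, url,
         payload=None,
         result_name='kind',
         success_code=requests.codes.ok, **kwargs):
    """Send a REST API request and return its decoded JSON reply.
    Arguments:
    - method: 'DELETE', 'GET', 'PATCH', 'POST', or 'PUT'
    - url: location to which the request will be sent
    Keyword arguments:
    - payload: body to send with any request but GET or DELETE; handed
    to Requests unchanged (callers pass JSON text). None means no body.
    - result_name: key whose presence in the JSON reply marks success.
    The LTM seems to return "kind" with just about any data request,
    so that's the default.
    - success_code: HTTP status that defines success. Normally
    requests.codes.ok (200), but 404 in the case of a DELETE.
    - *: any other arguments are passed directly through to Requests
    Returns the parsed JSON reply, or None when the device reported an
    error, the URL was not found, or MAX_REQUEST_ATTEMPTS attempts all
    produced an unusable reply.
    Raises ValueError if `method` is not one of the five listed above.
    """
    if payload is None:
        payload = {}
    # Dispatch table rather than an if/elif chain: an unknown method used
    # to leave the response variable unassigned and fail obscurely.
    requesters = {
        'GET': lambda: self.session.get(url, **kwargs),
        'DELETE': lambda: self.session.delete(url, **kwargs),
        'PATCH': lambda: self.session.patch(url, payload, **kwargs),
        'POST': lambda: self.session.post(url, payload, **kwargs),
        'PUT': lambda: self.session.put(url, payload, **kwargs),
    }
    if method not in requesters:
        raise ValueError("unsupported HTTP method: {}".format(method))
    do_request = requesters[method]
    if self.verbose >= 3:
        sys.stderr.write("\n - {} {}\n".format(method, url))
    if self.verbose >= 4 and payload != {}:
        sys.stderr.write("\n - {} payload: {}\n".format(method,
                         gentlemanly_json(payload)))
    reply = None
    rj = None
    for attempt in range(self.MAX_REQUEST_ATTEMPTS):
        if self.verbose >= 3:
            sys.stderr.write("\n Attempt %s\n" % (attempt + 1))
        r = None
        rj = None
        try:
            r = do_request()
        except Exception as e:
            # transport-level failure; report it and fall through to the
            # retry logic below
            sys.stderr.write("error when querying the device:\n")
            for err in e.args:
                sys.stderr.write(" {}\n".format(err))
        if r is not None:
            try:
                if r.status_code != success_code:
                    r.raise_for_status()
            except requests.exceptions.HTTPError as e:
                # an HTTP error status is final; no point in retrying
                if r.status_code == 404:
                    if self.verbose:
                        sys.stderr.write("error: {} not found\n".format(
                            url))
                else:
                    sys.stderr.write("unexpected error: {}\n".format(e))
                return None
            try:
                rj = r.json()
            except ValueError:
                # body was not JSON; treat like any other unusable reply
                rj = None
            if rj is not None:
                if result_name in rj:
                    reply = rj
                    break
                # compared against success_code (not an undefined name)
                if r.status_code != success_code and 'errorStack' in rj:
                    sys.stderr.write("error: {}\n".format(
                        rj.get('message', rj['errorStack'])))
                    return None
        if attempt + 1 < self.MAX_REQUEST_ATTEMPTS:
            sys.stderr.write(("received unexpected response on attempt " +
                              "{attempt}; retrying in {retry}s\n").format(
                              attempt=attempt + 1, retry=self.RETRY_WAIT))
            if self.verbose >= 3 and rj is not None:
                sys.stderr.write("result:\n{}\n".format(rj))
            time.sleep(self.RETRY_WAIT)
    else:
        # every attempt was used up without a usable reply
        if rj is not None:
            sys.stderr.write("error: no valid response received; " +
                             "last reply:\n%s\n" % (rj,))
        else:
            sys.stderr.write("error: no valid response received\n")
        return None
    if self.verbose >= 3:
        sys.stdout.write("\n")
    return reply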
def DELETE(self, url, payload, result_name, **kwargs):
    """Send an HTTP DELETE to the LTM.
    See the send() method for argument descriptions. A DELETE is treated
    as successful when the device answers 404 with an empty errorStack;
    that case returns True. Any other reply is returned as send() gave
    it (None when send() failed).
    """
    reply = self.send('DELETE', url, payload, result_name,
                      success_code=requests.codes.not_found, **kwargs)
    if reply is None:
        return None
    if 'code' not in reply or 'errorStack' not in reply:
        return reply
    if reply['code'] == 404 and reply['errorStack'] == []:
        return True
    if self.verbose >= 3:
        sys.stderr.write("suspicious DELETE reply:\n" +
                         gentlemanly_json(reply))
    return reply
def GET(self, url, result_name, **kwargs):
"""Send an HTTP GET to the LTM
See the send() method for argument descriptions.
"""
result = self.send('GET', url, result_name=result_name, **kwargs)
if result is not None:
if 'code' in result and 'errorStack' in result:
if self.verbose:
sys.stderr.write("error: {}\n".format(result['errorStack']))
return False
if result_name not in result and self.verbose >= 3:
sys.stderr.write("suspicious GET reply:\n" +
gentlemanly_json(result))
return result
else:
return False
def PATCH(self, url, payload, result_name, **kwargs):
"""Send an HTTP PATCH to the LTM
See the send() method for argument descriptions.
"""
result = self.send('PATCH', url, payload, result_name, **kwargs)
if result is not None:
if 'code' in result and 'errorStack' in result:
if self.verbose:
sys.stderr.write("error: {}\n".format(result['errorStack']))
return False
if result_name not in result and self.verbose >= 3:
sys.stderr.write("suspicious PATCH reply:\n" +
gentlemanly_json(result))
return result
else:
return False
def POST(self, url, payload, result_name, **kwargs):
"""Send an HTTP POST to the LTM
See the send() method for argument descriptions.
"""
result = self.send('POST', url, payload, result_name, **kwargs)
if result is not None:
if 'code' in result and 'errorStack' in result:
if self.verbose:
sys.stderr.write("error: {}\n".format(result['errorStack']))
return False
if result_name not in result and self.verbose >= 3:
sys.stderr.write("suspicious POST reply:\n" +
gentlemanly_json(result))
return result
else:
return False
def PUT(self, url, payload, result_name, **kwargs):
"""Send an HTTP PUT to the LTM
See the send() method for argument descriptions.
"""
result = self.send('PUT', url, payload, result_name, **kwargs)
if result is not None:
if 'code' in result and 'errorStack' in result:
if self.verbose:
sys.stderr.write("error: {}\n".format(result['errorStack']))
return False
if result_name not in result and self.verbose >= 3:
sys.stderr.write("suspicious PUT reply:\n" +
gentlemanly_json(result))
return result
else:
return False
def load_virtual_servers(self):
"""Read a list of all the virtual servers on the LTM.
This script will only retain information on those with a
destination on port 443, just to avoid wasting memory on any
that aren't likely to be running an SSL service.
"""
if self.verbose >= 2:
sys.stdout.write("* Loading all virtual servers\n")
self.virtual_servers = collections.OrderedDict()
url = self.icontrol_url('virtual')
results = self.GET(url, 'items')
if 'items' in results:
for item in results['items']:
#if self.verbose >= 2:
# sys.stdout.write(("** retrieved virt server {}: {}\n"
# ).format(
# item['name'],
# item['destination']
# ))
if not item['destination'].endswith(':443'):
continue
if self.verbose >= 3:
sys.stdout.write("** ... remembering SSL server\n")
self.virtual_servers[item['name']] = item.copy()
else:
sys.stderr.write("error: no valid response received;" +
"last reply:\n%s\n" % results)
return False
for s in self.virtual_servers.keys():
self.load_virtual_profiles(s)
return True
def load_virtual_profiles(self, name):
"""Read all the profiles associated with a virtual server
Arguments:
- name: name of the virtual server
"""
if self.verbose >= 3:
sys.stdout.write("* Loading virtual profiles for {}\n".format(name))
self.virtual_profiles[name] = collections.OrderedDict()
url = self.icontrol_url('virtual_profiles', id=name)
result = self.GET(url, 'items')
if result is False:
return None
elif 'items' in result:
for item in result['items']:
if self.verbose >= 3:
sys.stdout.write(("** copying virt profile {}: {}\n"
).format(item['name'], item.copy()))
self.virtual_profiles[name][item['name']] = item.copy()
else:
sys.stderr.write("error: no valid response received;" +
"last reply:\n%s\n" % result)
return False
return True
def print_virtuals_with_profile(self, profile, context):
"""Print a list of virtual servers with the given profile/context.
Arguments:
- profile: name of the profile to find in the list of servers
- context: context of profiles to display; passed directly through
to print_virtual_profile()
"""
for s in self.virtual_servers.viewkeys():
#print "* looking for {} in {}\n".format(profile, s)
if self.virtual_has_profile(s, profile):
print s
def print_virtual_profiles(self, context):
"""Print information about every Virtual Server profile
Arguments:
- context: context of profiles to display; passed directly through
to print_virtual_profile()
"""
for s in self.virtual_servers.viewkeys():
self.print_virtual_profile(s, context)
def print_virtual_profile(self, name, context):
    """Print all the details about a Virtual Server profile to stdout
    Arguments:
    - name: name of the virtual server
    - context: context of profiles to display ('clientside',
    'serverside', or 'all')
    Returns False when no profile information has been loaded for the
    server; otherwise prints and returns None.
    """
    if not self.has_virtual_profiles(name):
        return False
    if self.verbose:
        # verbose: a section per matching profile with every key/value
        sys.stdout.write("\n### Virtual server {}\n\n".format(name))
        for p in self.virtual_profiles[name].viewvalues():
            # the comparison is literal: 'all' does not act as a wildcard
            if context != p['context']:
                continue
            sys.stdout.write("SSL profile {}:\n\n".format(p['name']))
            for (k, v) in p.viewitems():
                sys.stdout.write("* {}={}\n".format(k, v))
            sys.stdout.write("\n")
    else:
        # quiet: just the matching profile names as a bullet list
        sys.stdout.write("\n{}:\n\n".format(name))
        for p in self.virtual_profiles[name].viewvalues():
            if context != p['context']:
                continue
            sys.stdout.write("* {}\n".format(p['name']))
        sys.stdout.write("\n")
def load_ssl_profiles(self):
"""Fetch and store details about all the SSL profiles on the LTM
"""
if self.verbose >= 2:
sys.stdout.write("* Loading all SSL profiles\n")
url = self.icontrol_url('clientssl')
result = self.GET(url, 'items')
if 'items' in result:
for item in result['items']:
self.ssl_profiles[item['name']] = item.copy()
else:
sys.stderr.write("error: no valid response received;" +
"last reply:\n%s\n" % result)
return False
return True
def load_ssl_certs(self):
"""Fetch and store details about all the SSL certificates on the LTM
"""
if self.verbose >= 2:
sys.stdout.write("* Loading all SSL certificate info\n")
url = self.icontrol_url('crypto_cert')
result = self.GET(url, 'items')
if 'items' in result:
for item in result['items']:
self.ssl_certs[item['name']] = item.copy()
else:
sys.stderr.write("error: no valid response received;" +
"last reply:\n%s\n" % result)
return False
return True
def load_ssl_keys(self):
"""Fetch and store details about all the SSL keys on the LTM
"""
if self.verbose >= 2:
sys.stdout.write("* Loading all SSL key info\n")
url = self.icontrol_url('crypto_key')
result = self.GET(url, 'items')
if 'items' in result:
for item in result['items']:
self.ssl_keys[item['name']] = item.copy()
else:
sys.stderr.write("error: no valid response received;" +
"last reply:\n%s\n" % result)
return False
return True
def virtual_has_profile(self, server, profile):
"""Return whether the given virtual server has the given profile.
Parameters:
- server: server name of the virtual server to look up
- profile: server name of the profile to locate in that server's info
"""
# return False if we don't know anything about the server
if server not in self.virtual_servers:
#print "server {} is not in virtual_servers".format(server)
return False
# return False if the virtual server doesn't have any profiles
# (seems weird... but possible?)
if server not in self.virtual_profiles:
#print "server {} is not in virtual_profiles".format(server)
return False
server_profiles = self.virtual_profiles[server]
if profile in server_profiles:
return True
else:
return False
def has_virtual_profiles(self, name):
"""Return whether we have info on the profiles of the named server.
Parameters:
- name: server name of the profile to locate
"""
if name in self.virtual_profiles:
return True
else:
return False
def has_ssl_cert(self, name):
"""Return whether we have information about the named SSL certificate
Parameters:
- name: server name of the certificate to locate
"""
if name+'.crt' in self.ssl_certs:
return True
else:
return False
def has_ssl_key(self, name):
"""Return whether we have information about the named SSL key
Parameters:
- name: server name of the key to locate
"""
if name+'.key' in self.ssl_keys:
return True
else:
return False
def has_ssl_profile(self, name):
"""Return whether we have information about the named SSL profile
Parameters:
- name: server name of the profile to locate
"""
if name in self.ssl_profiles:
return True
else:
return False
def print_ssl_profile(self, name):
"""Write details about a profile to stdout
Parameters:
- name: server name of the profile to display
"""
if name not in self.ssl_profiles:
return False
profile = self.ssl_profiles[name]
sys.stdout.write("\n## SSL profile {}\n\n".format(name))
for (k, v) in profile.viewitems():
sys.stdout.write("* {}={}\n".format(k, v))
return True
def print_ssl_status(self, name):
"""Write a brief statement showing whether an SSL profile is enabled
Parameters:
- name: server name of the profile to display
"""
if not self.has_ssl_profile(name):
return False
else:
profile = self.ssl_profiles[name]
sys.stdout.write("{}: {}\n".format(name, profile['mode']))
return True
def print_ssl_cert(self, name):
"""Write details about an SSL certificate to stdout
Parameters:
- name: server name of the certificate to display
"""
if not self.has_ssl_cert(name):
sys.stderr.write("SSL certificate for {} not found.\n".format(name))
return False
else:
cert = self.ssl_certs[name+'.crt']
sys.stdout.write("\n## SSL certificate {}\n\n".format(name))
for (k, v) in cert.viewitems():
sys.stdout.write("* {}={}\n".format(k, v))
return True
def print_ssl_key(self, name):
"""Write details about an SSL key to stdout
Parameters:
- name: server name of the key to display
"""
if not self.has_ssl_key(name):
sys.stderr.write("SSL key for {} not found.\n".format(name))
return False
else:
key = self.ssl_keys[name+'.key']
sys.stdout.write("\n## SSL key {}\n\n".format(name))
for (k, v) in key.viewitems():
sys.stdout.write("* {}={}\n".format(k, v))
return True
def add_virtual_profile(self, host, profile, context):
"""Add a profile to a virtual server.
Positional arguments:
- host: name of a virtual server
- profile: name of a profile not already associated with that host
- context: the context to which the profile is applied ("clientside",
"serverside", or "all".
"""
payload = {
'kind': 'tm:ltm:virtual:profiles:profilesstate',
'name': profile,
'context': context,
'partition': 'Common',
}
url = self.icontrol_url('virtual_profiles', id=host)
result = self.POST(url, json.dumps(payload), 'generation')
if result is False:
sys.stderr.write("error: no valid response received\n")
return False
else:
return True
def delete_virtual_profile(self, host, profile):
"""Remove a profile from a virtual server on the LTM.
Positional arguments:
- host: name of a virtual server
- profile: name of a profile already associated with that host
"""
url = self.icontrol_url('virtual_profile', id=host,
profile=profile)
return self.DELETE(url, '', 'code')
def get_crypto_cert(self, name=None):
"""Get the details of an SSL certificate resource"
Keyword arguments:
- name: name of the certificate to load
"""
# I don't know if this is a bug, but requesting .../crypto/cert/{id}
# and .../crypto/key/{id} doesn't work when you include "~Common~".
# If that part is stripped out, requests work, even though the
# reference URL includes that part.
if name:
if not name.endswith('.crt'):
name = name+'.crt'
relative_name = name.replace('~Common~', '')
url = self.icontrol_url('crypto_cert_id', id=relative_name)
return self.GET(url, 'name')
else:
return None
def add_crypto_cert(self, name=None, cert=None, chain=None):
"""Copy and install an SSL certificate file (and chain)
Keyword arguments:
- name: name by which the certificate and chain will be referenced
- cert: path to a local file with the SSL certificate
- chain: path to a local file with the SSL intermediate certificates
"""
dest_cert = None
return_status = True
# Create a temporary file that will hold all our cert components.
# We'll combine the cert and chain together then copy the result.
temp_cert = tempfile.NamedTemporaryFile()
if cert:
with open(cert, 'r') as f: cert_text = f.read()
if self.verbose >= 3:
sys.stdout.write("* appending {} to {}\n".format(cert,
temp_cert.name))
if self.verbose >= 4:
sys.stdout.write("\n## SSL certificate \"{}\":\n".format(cert))
sys.stdout.write("\n" + cert_text + "\n")
temp_cert.write(cert_text)
if chain:
with open(chain, 'r') as f: cert_text = f.read()
if self.verbose >= 3:
sys.stdout.write("* appending {} to {}\n".format(chain,
temp_cert.name))
if self.verbose >= 4:
sys.stdout.write("\n## SSL chain \"{}\":\n".format(cert))
sys.stdout.write("\n" + cert_text + "\n")
temp_cert.write(cert_text)
if temp_cert:
tmp_name = random.randint(100000, 999999)
dest_cert = '{}crt.{}'.format(self.LTM_TMP_PATH, tmp_name)
scp_cmd = 'scp -q \'{}\' \'{}:{}\''.format(temp_cert.name,
self.hostname, dest_cert)
if self.verbose >= 3:
sys.stdout.write("* Uploading {} to {}\n".format(temp_cert.name,
dest_cert))
if self.verbose >= 4:
sys.stdout.write("\n {}\n".format(scp_cmd))
os.system(scp_cmd)
cert_url = self.icontrol_url('crypto_cert')
cert_payload = {
'name': name,
'command': 'install',
'from-local-file': dest_cert,
}
result = self.POST(cert_url, json.dumps(cert_payload), 'kind')
if result is False:
sys.stderr.write("error: no valid response received\n")
return_status = False
# remove the files we uploaded to the LTM's temp dir
if dest_cert:
ssh_cmd = 'ssh {} rm {}'.format(self.hostname, dest_cert)
if self.verbose >= 3:
sys.stdout.write("* Removing {} from {}\n".format(dest_cert,
self.hostname))
if self.verbose >= 4:
sys.stdout.write("\n {}\n".format(ssh_cmd))
os.system(ssh_cmd)
return return_status
def delete_crypto_cert(self, name=None):
"""Delete an SSL certificate resource"
Keyword arguments:
- name: name of the cert to delete
"""
if name:
if not name.endswith('.crt'):
name = name+'.crt'
url = self.icontrol_url('crypto_cert_id', id=name)
return self.DELETE(url, '', 'code')
else:
return None
def add_crypto_key(self, name=None, key=None):
"""Copy and install an SSL key file
Keyword arguments:
- name: name by which the key will be referenced
- key: path to a local file with the unencrypted SSL key
"""
dest_key = None
return_status = True
if key:
tmp_name = random.randint(100000, 999999)
dest_key = '{}key.{}'.format(self.LTM_TMP_PATH, tmp_name)
scp_cmd = 'scp -q \'{}\' \'{}:{}\''.format(key, self.hostname,
dest_key)
if self.verbose >= 3:
sys.stdout.write("* Uploading {} to {}\n".format(key,
dest_key))
if self.verbose >= 4:
sys.stdout.write("\n {}\n".format(scp_cmd))
os.system(scp_cmd)
key_url = self.icontrol_url('crypto_key')
key_payload = {
'name': name,
'command': 'install',
'from-local-file': dest_key,
}
result = self.POST(key_url, json.dumps(key_payload), 'kind')
if result is False:
sys.stderr.write("error: no valid response received\n")
return_status = False
if dest_key:
ssh_cmd = 'ssh {} rm {}'.format(self.hostname, dest_key)
if self.verbose >= 3:
sys.stdout.write("* Removing {} from {}\n".format(dest_key,
self.hostname))
if self.verbose >= 4:
sys.stdout.write("\n {}\n".format(ssh_cmd))
os.system(ssh_cmd)
return return_status
def get_crypto_key(self, name=None):
"""Get the details of an SSL key resource"
Keyword arguments:
- name: name of the key to load
"""
# I don't know if this is a bug, but requesting .../crypto/cert/{id}
# and .../crypto/key/{id} doesn't work when you include "~Common~".
# If that part is stripped out, requests work, even though the
# reference URL includes that part.
if name:
if not name.endswith('.key'):
name = name+'.key'
relative_name = name.replace('~Common~', '')
url = self.icontrol_url('crypto_key_id', id=relative_name)
return self.GET(url, 'name')
else:
return None
def delete_crypto_key(self, name=None):
"""Delete an SSL key resource"
Keyword arguments:
- name: name of the key to delete
"""
if name:
if not name.endswith('.key'):
name = name+'.key'
url = self.icontrol_url('crypto_key_id', id=name)
return self.DELETE(url, '', 'code')
else:
return None
def create_ssl_profile(self, profile_name=None, servername=None,
cert=None, chain=None, key=None):
"""Create a new SSL profile on the LTM.
Uses scp to transfer the given files to /var/tmp on the LTM,
uses REST to install each, then uses REST to create a profile
that references them.
Keyword arguments:
- profile_name: name of an existing profile
- servername: name of a virtual server
- cert, chain, and key: strings of local file paths
"""
if self.verbose >= 1:
sys.stdout.write("\n## Creating SSL profile \"{name}\"\n".format(
name=profile_name))
# Upload our new certificate components.
if self.verbose >= 2:
sys.stdout.write("\n### Uploading new keychain components\n\n")
cert_date_suffix = '%Y-%m-%d-%H:%M:%S'
new_cert_name = '{}-{}'.format(profile_name,
datetime.datetime.now().strftime(
cert_date_suffix))
if self.verbose >= 2:
sys.stdout.write("New keychain name: " +
"\"{}\"\n\n".format(new_cert_name))
# Install the new SSL private key
result = self.add_crypto_key(name=new_cert_name, key=key)
if result is False:
sys.stderr.write("\n")
print_error('''
Failure when installing new private certificate key;
aborting SSL profile creation.
''')
sys.stderr.write("\n")
return False
else:
new_key = self.get_crypto_key(new_cert_name)
# Install the new SSL public certificate
result = self.add_crypto_cert(name=new_cert_name, cert=cert,
chain=chain)
if result is False:
sys.stderr.write("\n")
print_error('''
Failure when installing new public certificate;
aborting SSL profile creation.
''')
if self.verbose:
sys.stderr.write("\n")
print_error('''
Make sure you include intermediate certificates.
They can be inside your cert file or specified
separately as a chain file.
''')
sys.stderr.write("\n")
return False
else:
new_cert = self.get_crypto_cert(new_cert_name)
if self.verbose >= 1:
sys.stdout.write(" * key: {}\n".format(new_key['fullPath']))
sys.stdout.write(" * cert: {}\n".format(new_cert['fullPath']))
sys.stdout.write(" * CN: {}\n".format(new_cert['commonName']))
sys.stdout.write(" * SAN: {}\n".format(
new_cert['subjectAlternativeName']))
sys.stdout.write(" * expires: {}\n".format(
new_cert['apiRawValues']['expiration']))
if self.verbose >= 2:
sys.stdout.write("\n### Updating SSL profile with new keychain\n\n")
url = self.icontrol_url('clientssl')
payload = {
'name': profile_name,
'defaultsFrom': self.ssl_profile_parent,
'description': '{}, created {}'.format(servername,
datetime.datetime.now().strftime('%c')),
#'certKeyChain': {
'cert': '{}.crt'.format(new_cert_name),
'chain': '{}.crt'.format(new_cert_name),
'key': '{}.key'.format(new_cert_name),
#},
}
if servername:
payload['serverName'] = servername
result = self.POST(url, json.dumps(payload), 'name')
if result is False:
sys.stderr.write("error: no valid response received")
return False
if self.verbose >= 1:
sys.stdout.write("Complete! All steps succeeded.\n")
sys.stdout.write("New keychain for {} is {}.\n".format(
profile_name, new_cert_name))
return True
def replace_ssl_profile(self, profile_name=None,
cert=None, chain=None, key=None):
"""Replace an SSL profile, swapping in new keychain components.
If given, the local files for `cert`, `chain`, and `key` will be
uploaded and installed using `profile_name` as a prefix and a
generated suffix, so that we can know they'll be unique in the
system but match each other.
These files are scp'd to the LTM device using the credentials &
environment of the invoking user. After copying to the device, each
is installed.
After installation of the files, the given SSL profile is modified
to reference these files, and the old files are removed.
Keyword arguments:
- profile_name: name of an existing profile
- cert, chain, and key: strings of local file paths
"""