From bb85528aa39bf91350322f4201164be21dbe75e3 Mon Sep 17 00:00:00 2001 From: Ryar Nyah Date: Sun, 22 Sep 2019 21:49:33 +0200 Subject: [PATCH] Add ssl_ciphers && ssl_supported_protocols for kafka client --- library/kafka_lib.py | 104 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 96 insertions(+), 8 deletions(-) diff --git a/library/kafka_lib.py b/library/kafka_lib.py index 093496ae..03636d3c 100644 --- a/library/kafka_lib.py +++ b/library/kafka_lib.py @@ -16,8 +16,13 @@ import itertools import json import tempfile +import ssl from pkg_resources import parse_version +# Init logging +import logging +import sys + from kafka.client import KafkaClient from kafka.protocol.api import Request, Response from kafka.protocol.metadata import MetadataRequest_v1 @@ -47,6 +52,11 @@ from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.pycompat24 import get_exception +# Default logging +log = logging.getLogger('kafka') +log.addHandler(logging.StreamHandler(sys.stdout)) +log.setLevel(logging.INFO) + ANSIBLE_METADATA = {'metadata_version': '1.0'} @@ -200,6 +210,14 @@ description: - 'when using ssl for Kafka, content of crl file or path to cert ' - 'crl file.' + ssl_supported_protocols: + description: + - 'when using ssl for Kafka, protocols supported by kafka client ' + choices: [TLSv1, TLSv1.1, TLSv1.2] + ssl_ciphers: + description: + - 'when using ssl for Kafka, available ciphers for ssl connections. ' \ + 'It should be a string in the OpenSSL cipher list format. ' sasl_mechanism: description: - 'when using sasl, whether use PLAIN or GSSAPI.' @@ -562,6 +580,59 @@ class CreatePartitionsRequest_v0(Request): ) +def generate_ssl_context(ssl_check_hostname, + ssl_cafile, + ssl_certfile, + ssl_keyfile, + ssl_password, + ssl_crlfile, + ssl_supported_protocols, + ssl_ciphers): + """ + Generate SSLContext for kafka client. + """ + log.debug('Configuring default SSL Context') + ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context.options |= ssl.OP_NO_SSLv2 + ssl_context.options |= ssl.OP_NO_SSLv3 + ssl_context.verify_mode = ssl.CERT_OPTIONAL + if ssl_supported_protocols: + if 'TLSv1' not in ssl_supported_protocols: + ssl_context.options |= ssl.OP_NO_TLSv1 + if 'TLSv1.1' not in ssl_supported_protocols: + ssl_context.options |= ssl.OP_NO_TLSv1_1 + if 'TLSv1.2' not in ssl_supported_protocols: + ssl_context.options |= ssl.OP_NO_TLSv1_2 + if ssl_check_hostname: + ssl_context.check_hostname = True + if ssl_cafile: + log.info('Loading SSL CA from %s', ssl_cafile) + ssl_context.load_verify_locations(ssl_cafile) + ssl_context.verify_mode = ssl.CERT_REQUIRED + else: + log.info('Loading system default SSL CAs from %s', + ssl.get_default_verify_paths()) + ssl_context.load_default_certs() + if ssl_certfile and ssl_keyfile: + log.info('Loading SSL Cert from %s', ssl_certfile) + log.info('Loading SSL Key from %s', ssl_keyfile) + ssl_context.load_cert_chain( + certfile=ssl_certfile, + keyfile=ssl_keyfile, + password=ssl_password) + if ssl_crlfile: + if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'): + raise RuntimeError('This version of Python does not' + ' support ssl_crlfile!') + log.info('Loading SSL CRL from %s', ssl_crlfile) + ssl_context.load_verify_locations(ssl_crlfile) + ssl_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF + if ssl_ciphers: + log.info('Setting SSL Ciphers: %s', ssl_ciphers) + ssl_context.set_ciphers(ssl_ciphers) + return ssl_context + + def generate_ssl_object(module, ssl_cafile, ssl_certfile, ssl_keyfile, ssl_crlfile=None): """ @@ -1438,7 +1509,7 @@ def main(): 'describe', 'describe_configs', 'idempotent_write', 'read', 'write'], required=False), - acl_pattern_type=dict(choice=['all', 'match', 'literal', + acl_pattern_type=dict(choice=['any', 'match', 'literal', 'prefixed'], required=False, default='literal'), @@ -1529,6 +1600,13 @@ def main(): ssl_crlfile=dict(required=False, default=None, type='path'), + ssl_supported_protocols=dict(required=False, default=None, + type='list', + choices=['TLSv1', 'TLSv1.1', + 'TLSv1.2']), + + ssl_ciphers=dict(required=False, default=None, type='str'), + # only PLAIN is currently available sasl_mechanism=dict(choices=['PLAIN', 'GSSAPI'], default='PLAIN'), @@ -1566,6 +1644,8 @@ def main(): ssl_keyfile = params['ssl_keyfile'] ssl_password = params['ssl_password'] ssl_crlfile = params['ssl_crlfile'] + ssl_supported_protocols = params['ssl_supported_protocols'] + ssl_ciphers = params['ssl_ciphers'] sasl_mechanism = params['sasl_mechanism'] sasl_plain_username = params['sasl_plain_username'] sasl_plain_password = params['sasl_plain_password'] @@ -1602,15 +1682,24 @@ def main(): zookeeper_auth.append(auth) try: + # Generate ssl context to support limit ssl protocols & ciphers + ssl_context = None + if security_protocol in ('SSL', 'SASL_SSL'): + ssl_context = generate_ssl_context( + ssl_check_hostname=ssl_check_hostname, + ssl_cafile=kafka_ssl_files['cafile']['path'], + ssl_certfile=kafka_ssl_files['certfile']['path'], + ssl_keyfile=kafka_ssl_files['keyfile']['path'], + ssl_password=ssl_password, + ssl_crlfile=kafka_ssl_files['crlfile']['path'], + ssl_supported_protocols=ssl_supported_protocols, + ssl_ciphers=ssl_ciphers + ) + manager = KafkaManager( module=module, bootstrap_servers=bootstrap_servers, security_protocol=security_protocol, api_version=api_version, - ssl_check_hostname=ssl_check_hostname, - ssl_cafile=kafka_ssl_files['cafile']['path'], - ssl_certfile=kafka_ssl_files['certfile']['path'], - ssl_keyfile=kafka_ssl_files['keyfile']['path'], - ssl_password=ssl_password, - ssl_crlfile=kafka_ssl_files['crlfile']['path'], + ssl_context=ssl_context, sasl_mechanism=sasl_mechanism, sasl_plain_username=sasl_plain_username, sasl_plain_password=sasl_plain_password, @@ -1628,7 +1717,6 @@ def main(): msg='Current version of library is not compatible with ' 'Kafka < 0.11.0.' ) - msg = '%s \'%s\': ' % (resource, name) if resource == 'topic':