Skip to content

Commit

Permalink
Add ssl_ciphers && ssl_supported_protocols for kafka client
Browse files Browse the repository at this point in the history
  • Loading branch information
ryarnyah authored and StephenSorriaux committed Sep 29, 2019
1 parent e243ed2 commit bb85528
Showing 1 changed file with 96 additions and 8 deletions.
104 changes: 96 additions & 8 deletions library/kafka_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@
import itertools
import json
import tempfile
import ssl
from pkg_resources import parse_version

# Init logging
import logging
import sys

from kafka.client import KafkaClient
from kafka.protocol.api import Request, Response
from kafka.protocol.metadata import MetadataRequest_v1
Expand Down Expand Up @@ -47,6 +52,11 @@
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.pycompat24 import get_exception

# Default logging
log = logging.getLogger('kafka')
log.addHandler(logging.StreamHandler(sys.stdout))
log.setLevel(logging.INFO)

ANSIBLE_METADATA = {'metadata_version': '1.0'}


Expand Down Expand Up @@ -200,6 +210,14 @@
description:
- 'when using ssl for Kafka, content of crl file or path to cert '
- 'crl file.'
ssl_supported_protocols:
description:
- 'when using ssl for Kafka, protocols supported by kafka client '
choices: [TLSv1, TLSv1.1, TLSv1.2]
ssl_ciphers:
description:
- 'when using ssl for Kafka, available ciphers for ssl connections. ' \
'It should be a string in the OpenSSL cipher list format. '
sasl_mechanism:
description:
- 'when using sasl, whether use PLAIN or GSSAPI.'
Expand Down Expand Up @@ -562,6 +580,59 @@ class CreatePartitionsRequest_v0(Request):
)


def generate_ssl_context(ssl_check_hostname,
ssl_cafile,
ssl_certfile,
ssl_keyfile,
ssl_password,
ssl_crlfile,
ssl_supported_protocols,
ssl_ciphers):
"""
Generate SSLContext for kafka client.
"""
log.debug('Configuring default SSL Context')
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ssl_context.options |= ssl.OP_NO_SSLv2
ssl_context.options |= ssl.OP_NO_SSLv3
ssl_context.verify_mode = ssl.CERT_OPTIONAL
if ssl_supported_protocols:
if 'TLSv1' not in ssl_supported_protocols:
ssl_context.options |= ssl.OP_NO_TLSv1
if 'TLSv1.1' not in ssl_supported_protocols:
ssl_context.options |= ssl.OP_NO_TLSv1_1
if 'TLSv1.2' not in ssl_supported_protocols:
ssl_context.options |= ssl.OP_NO_TLSv1_2
if ssl_check_hostname:
ssl_context.check_hostname = True
if ssl_cafile:
log.info('Loading SSL CA from %s', ssl_cafile)
ssl_context.load_verify_locations(ssl_cafile)
ssl_context.verify_mode = ssl.CERT_REQUIRED
else:
log.info('Loading system default SSL CAs from %s',
ssl.get_default_verify_paths())
ssl_context.load_default_certs()
if ssl_certfile and ssl_keyfile:
log.info('Loading SSL Cert from %s', ssl_certfile)
log.info('Loading SSL Key from %s', ssl_keyfile)
ssl_context.load_cert_chain(
certfile=ssl_certfile,
keyfile=ssl_keyfile,
password=ssl_password)
if ssl_crlfile:
if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'):
raise RuntimeError('This version of Python does not'
' support ssl_crlfile!')
log.info('Loading SSL CRL from %s', ssl_crlfile)
ssl_context.load_verify_locations(ssl_crlfile)
ssl_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
if ssl_ciphers:
log.info('Setting SSL Ciphers: %s', ssl_ciphers)
ssl_context.set_ciphers(ssl_ciphers)
return ssl_context


def generate_ssl_object(module, ssl_cafile, ssl_certfile, ssl_keyfile,
ssl_crlfile=None):
"""
Expand Down Expand Up @@ -1438,7 +1509,7 @@ def main():
'describe', 'describe_configs',
'idempotent_write', 'read', 'write'],
required=False),
acl_pattern_type=dict(choice=['all', 'match', 'literal',
acl_pattern_type=dict(choice=['any', 'match', 'literal',
'prefixed'],
required=False, default='literal'),

Expand Down Expand Up @@ -1529,6 +1600,13 @@ def main():

ssl_crlfile=dict(required=False, default=None, type='path'),

ssl_supported_protocols=dict(required=False, default=None,
type='list',
choices=['TLSv1', 'TLSv1.1',
'TLSv1.2']),

ssl_ciphers=dict(required=False, default=None, type='str'),

# only PLAIN is currently available
sasl_mechanism=dict(choices=['PLAIN', 'GSSAPI'], default='PLAIN'),

Expand Down Expand Up @@ -1566,6 +1644,8 @@ def main():
ssl_keyfile = params['ssl_keyfile']
ssl_password = params['ssl_password']
ssl_crlfile = params['ssl_crlfile']
ssl_supported_protocols = params['ssl_supported_protocols']
ssl_ciphers = params['ssl_ciphers']
sasl_mechanism = params['sasl_mechanism']
sasl_plain_username = params['sasl_plain_username']
sasl_plain_password = params['sasl_plain_password']
Expand Down Expand Up @@ -1602,15 +1682,24 @@ def main():
zookeeper_auth.append(auth)

try:
# Generate ssl context to support limit ssl protocols & ciphers
ssl_context = None
if security_protocol in ('SSL', 'SASL_SSL'):
ssl_context = generate_ssl_context(
ssl_check_hostname=ssl_check_hostname,
ssl_cafile=kafka_ssl_files['cafile']['path'],
ssl_certfile=kafka_ssl_files['certfile']['path'],
ssl_keyfile=kafka_ssl_files['keyfile']['path'],
ssl_password=ssl_password,
ssl_crlfile=kafka_ssl_files['crlfile']['path'],
ssl_supported_protocols=ssl_supported_protocols,
ssl_ciphers=ssl_ciphers
)

manager = KafkaManager(
module=module, bootstrap_servers=bootstrap_servers,
security_protocol=security_protocol, api_version=api_version,
ssl_check_hostname=ssl_check_hostname,
ssl_cafile=kafka_ssl_files['cafile']['path'],
ssl_certfile=kafka_ssl_files['certfile']['path'],
ssl_keyfile=kafka_ssl_files['keyfile']['path'],
ssl_password=ssl_password,
ssl_crlfile=kafka_ssl_files['crlfile']['path'],
ssl_context=ssl_context,
sasl_mechanism=sasl_mechanism,
sasl_plain_username=sasl_plain_username,
sasl_plain_password=sasl_plain_password,
Expand All @@ -1628,7 +1717,6 @@ def main():
msg='Current version of library is not compatible with '
'Kafka < 0.11.0.'
)

msg = '%s \'%s\': ' % (resource, name)

if resource == 'topic':
Expand Down

0 comments on commit bb85528

Please sign in to comment.