From 1ce2bf56a2161a0e92c21792dc6853fb07c3031d Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Thu, 3 Aug 2017 13:27:17 +0200 Subject: [PATCH] crypto/openssl_*: Standardize implementaton and add support keyUsage, extenededKeyUsage (#27281) * openssl_csr: make subjectAltNames a list * csr module now uses the new standard way to build openssl crypto modules * add check functions for subject and subjectAltNames * added support for keyUsage and extendedKeyUsage * check if CSR signature is correct (aka the privatekey belongs to the CSR) * fixes for first PR review * fixes for second PR review * openssl_csr: there is no need to pass on privatekey as it can be accessed directly * openssl_csr: documentation fixes --- lib/ansible/module_utils/crypto.py | 33 ++++ lib/ansible/modules/crypto/openssl_csr.py | 209 ++++++++++++++++------ 2 files changed, 192 insertions(+), 50 deletions(-) diff --git a/lib/ansible/module_utils/crypto.py b/lib/ansible/module_utils/crypto.py index b29e44cc0b..711e7b856d 100644 --- a/lib/ansible/module_utils/crypto.py +++ b/lib/ansible/module_utils/crypto.py @@ -84,6 +84,39 @@ def load_certificate(path): raise OpenSSLObjectError(exc) +def load_certificate_request(path): + """Load the specified certificate signing request.""" + + try: + csr_content = open(path, 'rb').read() + csr = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr_content) + return csr + except (IOError, OSError) as exc: + raise OpenSSLObjectError(exc) + + +keyUsageLong = { + "digitalSignature": "Digital Signature", + "nonRepudiation": "Non Repudiation", + "keyEncipherment": "Key Encipherment", + "dataEncipherment": "Data Encipherment", + "keyAgreement": "Key Agreement", + "keyCertSign": "Certificate Sign", + "cRLSign": "CRL Sign", + "encipherOnly": "Encipher Only", + "decipherOnly": "Decipher Only", +} + +extendedKeyUsageLong = { + "serverAuth": "TLS Web Server Authentication", + "clientAuth": "TLS Web Client Authentication", + "codeSigning": "Code Signing", + "emailProtection": "E-mail Protection", + "timeStamping": "Time Stamping", + "OCSPSigning": "OCSP Signing", +} + + @six.add_metaclass(abc.ABCMeta) class OpenSSLObject(object): diff --git a/lib/ansible/modules/crypto/openssl_csr.py b/lib/ansible/modules/crypto/openssl_csr.py index aeb2e75fd9..b92bf4421b 100644 --- a/lib/ansible/modules/crypto/openssl_csr.py +++ b/lib/ansible/modules/crypto/openssl_csr.py @@ -21,9 +21,10 @@ version_added: "2.4" short_description: Generate OpenSSL Certificate Signing Request (CSR) description: - "This module allows one to (re)generates OpenSSL certificate signing requests. - It uses the pyOpenSSL python library to interact with openssl. This module support - the subjectAltName extension. Note: At least one of commonName or subjectAltName must - be specified. This module uses file common arguments to specify generated file permissions." + It uses the pyOpenSSL python library to interact with openssl. This module supports + the subjectAltName as well as the keyUsage and extendedKeyUsage extensions. + Note: At least one of commonName or subjectAltName must be specified. + This module uses file common arguments to specify generated file permissions." requirements: - "python-pyOpenSSL" options: @@ -62,10 +63,6 @@ options: required: true description: - Name of the folder in which the generated OpenSSL certificate signing request will be written - subjectAltName: - required: false - description: - - SAN extension to attach to the certificate signing request countryName: required: false aliases: [ 'C' ] @@ -101,6 +98,29 @@ options: aliases: [ 'E' ] description: - emailAddress field of the certificate signing request subject + subjectAltName: + required: false + description: + - SAN extension to attach to the certificate signing request + - This can either be a 'comma separated string' or a YAML list. + keyUsage: + required: false + description: + - This defines the purpose (e.g. encipherment, signature, certificate signing) + of the key contained in the certificate. + - This can either be a 'comma separated string' or a YAML list. + extendedKeyUsage: + required: false + aliases: [ 'extKeyUsage' ] + description: + - Additional restrictions (e.g. client authentication, server authentication) + on the allowed purposes for which the public key may be used. + - This can either be a 'comma separated string' or a YAML list. + +notes: + - "If the certificate signing request already exists it will be checked whether subjectAltName, + keyUsage and extendedKeyUsage only contain the requested values and if the request was signed + by the given private key" ''' @@ -140,11 +160,27 @@ EXAMPLES = ''' privatekey_path: /etc/ssl/private/ansible.com.pem force: True commonName: www.ansible.com + +# Generate an OpenSSL Certificate Signing Request with special key usages +- openssl_csr: + path: /etc/ssl/csr/www.ansible.com.csr + privatekey_path: /etc/ssl/private/ansible.com.pem + commonName: www.ansible.com + keyUsage: + - digitlaSignature + - keyAgreement + extKeyUsage: + - clientAuth ''' RETURN = ''' -csr: +privatekey: + description: Path to the TLS/SSL private key the CSR was generated for + returned: changed or success + type: string + sample: /etc/ssl/private/ansible.com.pem +filename: description: Path to the generated Certificate Signing Request returned: changed or success type: string @@ -157,13 +193,26 @@ subject: subjectAltName: description: The alternative names this CSR is valid for returned: changed or success - type: string - sample: 'DNS:www.ansible.com,DNS:m.ansible.com' + type: list + sample: [ 'DNS:www.ansible.com', 'DNS:m.ansible.com' ] +keyUsage: + description: Purpose for which the public key may be used + returned: changed or success + type: list + sample: [ 'digitalSignature', 'keyAgreement' ] +extendedKeyUsage: + description: Additional restriction on the public key purposes + returned: changed or success + type: list + sample: [ 'clientAuth' ] ''' -import errno import os +from ansible.module_utils import crypto as crypto_utils +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils._text import to_native + try: from OpenSSL import crypto except ImportError: @@ -171,27 +220,27 @@ except ImportError: else: pyopenssl_found = True -from ansible.module_utils import crypto as crypto_utils -from ansible.module_utils.basic import AnsibleModule -from ansible.module_utils._text import to_native - -class CertificateSigningRequestError(Exception): +class CertificateSigningRequestError(crypto_utils.OpenSSLObjectError): pass -class CertificateSigningRequest(object): +class CertificateSigningRequest(crypto_utils.OpenSSLObject): def __init__(self, module): - self.state = module.params['state'] + super(CertificateSigningRequest, self).__init__( + module.params['path'], + module.params['state'], + module.params['force'], + module.check_mode + ) self.digest = module.params['digest'] - self.force = module.params['force'] - self.subjectAltName = module.params['subjectAltName'] - self.path = module.params['path'] self.privatekey_path = module.params['privatekey_path'] self.privatekey_passphrase = module.params['privatekey_passphrase'] self.version = module.params['version'] - self.changed = True + self.subjectAltName = module.params['subjectAltName'] + self.keyUsage = module.params['keyUsage'] + self.extendedKeyUsage = module.params['extendedKeyUsage'] self.request = None self.privatekey = None @@ -205,15 +254,15 @@ class CertificateSigningRequest(object): 'emailAddress': module.params['emailAddress'], } - if self.subjectAltName is None: - self.subjectAltName = 'DNS:%s' % self.subject['CN'] + if not self.subjectAltName: + self.subjectAltName = ['DNS:%s' % self.subject['CN']] self.subject = dict((k, v) for k, v in self.subject.items() if v) def generate(self, module): '''Generate the certificate signing request.''' - if not os.path.exists(self.path) or self.force: + if not self.check(module, perms_required=False) or self.force: req = crypto.X509Req() req.set_version(self.version) subject = req.get_subject() @@ -221,13 +270,18 @@ class CertificateSigningRequest(object): if value is not None: setattr(subject, key, value) - if self.subjectAltName is not None: - req.add_extensions([crypto.X509Extension(b"subjectAltName", False, self.subjectAltName.encode('ascii'))]) + altnames = ', '.join(self.subjectAltName) + extensions = [crypto.X509Extension(b"subjectAltName", False, altnames.encode('ascii'))] - self.privatekey = crypto_utils.load_privatekey( - self.privatekey_path, - self.privatekey_passphrase - ) + if self.keyUsage: + usages = ', '.join(self.keyUsage) + extensions.append(crypto.X509Extension(b"keyUsage", False, usages.encode('ascii'))) + + if self.extendedKeyUsage: + usages = ', '.join(self.extendedKeyUsage) + extensions.append(crypto.X509Extension(b"extendedKeyUsage", False, usages.encode('ascii'))) + + req.add_extensions(extensions) req.set_pubkey(self.privatekey) req.sign(self.privatekey, self.digest) @@ -239,31 +293,86 @@ class CertificateSigningRequest(object): csr_file.close() except (IOError, OSError) as exc: raise CertificateSigningRequestError(exc) - else: - self.changed = False + + self.changed = True file_args = module.load_file_common_arguments(module.params) if module.set_fs_attributes_if_different(file_args, False): self.changed = True - def remove(self): - '''Remove the Certificate Signing Request.''' + def check(self, module, perms_required=True): + """Ensure the resource is in its desired state.""" + state_and_perms = super(CertificateSigningRequest, self).check(module, perms_required) - try: - os.remove(self.path) - except OSError as exc: - if exc.errno != errno.ENOENT: - raise CertificateSigningRequestError(exc) + self.privatekey = crypto_utils.load_privatekey(self.privatekey_path, self.privatekey_passphrase) + + def _check_subject(csr): + subject = csr.get_subject() + for (key, value) in self.subject.items(): + if getattr(subject, key, None) != value: + return False + + return True + + def _check_subjectAltName(extensions): + altnames_ext = next((ext.__str__() for ext in extensions if ext.get_short_name() == b'subjectAltName'), '') + altnames = [altname.strip() for altname in altnames_ext.split(',')] + # apperently openssl returns 'IP address' not 'IP' as specifier when converting the subjectAltName to string + # although it won't accept this specifier when generating the CSR. (https://github.com/openssl/openssl/issues/4004) + altnames = [name if not name.startswith('IP Address:') else "IP:" + name.split(':', 1)[1] for name in altnames] + if self.subjectAltName: + if set(altnames) != set(self.subjectAltName): + return False else: - self.changed = False + if altnames: + return False + + return True + + def _check_keyUsage_(extensions, extName, expected, long): + usages_ext = [str(ext) for ext in extensions if ext.get_short_name() == extName] + if (not usages_ext and expected) or (usages_ext and not expected): + return False + elif not usages_ext and not expected: + return True + else: + current = [usage.strip() for usage in usages_ext[0].split(',')] + expected = [long[usage] if usage in long else usage for usage in expected] + return current == expected + + def _check_keyUsage(extensions): + return _check_keyUsage_(extensions, b'keyUsage', self.keyUsage, crypto_utils.keyUsageLong) + + def _check_extenededKeyUsage(extensions): + return _check_keyUsage_(extensions, b'extendedKeyUsage', self.extendedKeyUsage, crypto_utils.extendedKeyUsageLong) + + def _check_extensions(csr): + extensions = csr.get_extensions() + return _check_subjectAltName(extensions) and _check_keyUsage(extensions) and _check_extenededKeyUsage(extensions) + + def _check_signature(csr): + try: + return csr.verify(self.privatekey) + except crypto.Error: + return False + + if not state_and_perms: + return False + + csr = crypto_utils.load_certificate_request(self.path) + + return _check_subject(csr) and _check_extensions(csr) and _check_signature(csr) def dump(self): '''Serialize the object into a dictionary.''' result = { - 'csr': self.path, + 'privatekey': self.privatekey_path, + 'filename': self.path, 'subject': self.subject, 'subjectAltName': self.subjectAltName, + 'keyUsage': self.keyUsage, + 'extendedKeyUsage': self.extendedKeyUsage, 'changed': self.changed } @@ -279,7 +388,6 @@ def main(): privatekey_passphrase=dict(type='str', no_log=True), version=dict(default='3', type='int'), force=dict(default=False, type='bool'), - subjectAltName=dict(aliases=['subjectAltName'], type='str'), path=dict(required=True, type='path'), countryName=dict(aliases=['C'], type='str'), stateOrProvinceName=dict(aliases=['ST'], type='str'), @@ -288,6 +396,9 @@ def main(): organizationalUnitName=dict(aliases=['OU'], type='str'), commonName=dict(aliases=['CN'], type='str'), emailAddress=dict(aliases=['E'], type='str'), + subjectAltName=dict(type='list'), + keyUsage=dict(type='list'), + extendedKeyUsage=dict(aliases=['extKeyUsage'], type='list'), ), add_file_common_args=True, supports_check_mode=True, @@ -297,11 +408,9 @@ def main(): if not pyopenssl_found: module.fail_json(msg='the python pyOpenSSL module is required') - path = module.params['path'] base_dir = os.path.dirname(module.params['path']) - if not os.path.isdir(base_dir): - module.fail_json(name=path, msg='The directory %s does not exist' % path) + module.fail_json(name=base_dir, msg='The directory %s does not exist or the file is not a directory' % base_dir) csr = CertificateSigningRequest(module) @@ -309,24 +418,24 @@ def main(): if module.check_mode: result = csr.dump() - result['changed'] = module.params['force'] or not os.path.exists(path) + result['changed'] = module.params['force'] or not csr.check(module) module.exit_json(**result) try: csr.generate(module) - except CertificateSigningRequestError as exc: + except (CertificateSigningRequestError, crypto_utils.OpenSSLObjectError) as exc: module.fail_json(msg=to_native(exc)) else: if module.check_mode: result = csr.dump() - result['changed'] = os.path.exists(path) + result['changed'] = os.path.exists(module.params['path']) module.exit_json(**result) try: csr.remove() - except CertificateSigningRequestError as exc: + except (CertificateSigningRequestError, crypto_utils.OpenSSLObjectError) as exc: module.fail_json(msg=to_native(exc)) result = csr.dump()