VMware: not ssl.SSLContext if validate_certs false (#57185)

Python < 2.7.9 does not have the ssl.SSLContext attribute.
ssl.SSLContext is only required when we want to validate the SSL
connection. If `validate_certs` is false, we don't initialize the
`ssl_context` variable.

Add unit-test coverage and a little refactoring:

- avoid the use of `mocker`, when we can push `monkeypatch` which is
  `pytest`'s default.
- use `mock.Mocker()` when possible

closes: #57072
This commit is contained in:
Gonéri Le Bouder 2019-09-09 12:11:46 -04:00 committed by Abhijeet Kasurde
parent 1f38a12057
commit 3ea8e0a144
3 changed files with 121 additions and 63 deletions

View file

@ -0,0 +1,2 @@
bugfixes:
- vmware - Ensure we can use the modules with Python < 2.7.9 or RHEL/CentOS < 7.4, this as soon as ``validate_certs`` is disabled.

View file

@ -526,12 +526,17 @@ def connect_to_api(module, disconnect_atexit=True, return_si=False):
if validate_certs and not hasattr(ssl, 'SSLContext'): if validate_certs and not hasattr(ssl, 'SSLContext'):
module.fail_json(msg='pyVim does not support changing verification mode with python < 2.7.9. Either update ' module.fail_json(msg='pyVim does not support changing verification mode with python < 2.7.9. Either update '
'python or use validate_certs=false.') 'python or use validate_certs=false.')
elif validate_certs:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
if validate_certs:
ssl_context.verify_mode = ssl.CERT_REQUIRED ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.check_hostname = True ssl_context.check_hostname = True
ssl_context.load_default_certs() ssl_context.load_default_certs()
elif hasattr(ssl, 'SSLContext'):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ssl_context.verify_mode = ssl.CERT_NONE
ssl_context.check_hostname = False
else: # Python < 2.7.9 or RHEL/Centos < 7.4
ssl_context = None
service_instance = None service_instance = None
proxy_host = module.params.get('proxy_host') proxy_host = module.params.get('proxy_host')

View file

@ -6,12 +6,15 @@
from __future__ import absolute_import, division, print_function from __future__ import absolute_import, division, print_function
__metaclass__ = type __metaclass__ = type
import ssl
import sys import sys
import pytest import pytest
pyvmomi = pytest.importorskip('pyVmomi') pyvmomi = pytest.importorskip('pyVmomi')
from ansible.module_utils.vmware import connect_to_api, PyVmomi from units.compat import mock
import ansible.module_utils.vmware as vmware_module_utils
test_data = [ test_data = [
@ -76,61 +79,38 @@ test_ids = [
] ]
class AnsibleModuleExit(Exception): class FailJsonException(BaseException):
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
class ExitJson(AnsibleModuleExit):
pass
class FailJson(AnsibleModuleExit):
pass pass
@pytest.fixture @pytest.fixture
def fake_ansible_module(): def fake_ansible_module():
return FakeAnsibleModule() ret = mock.Mock()
ret.params = test_data[3][0]
ret.tmpdir = None
ret.fail_json.side_effect = FailJsonException()
return ret
class FakeAnsibleModule: def fake_connect_to_api(module, return_si=None):
def __init__(self): return None, mock.Mock(),
self.params = {}
self.tmpdir = None
def exit_json(self, *args, **kwargs):
raise ExitJson(*args, **kwargs)
def fail_json(self, *args, **kwargs):
raise FailJson(*args, **kwargs)
def fake_connect_to_api(module, disconnect_atexit=True, return_si=False): testdata = [
class MyContent(): ('HAS_PYVMOMI', 'PyVmomi'),
customFieldsManager = None ('HAS_REQUESTS', 'requests'),
if return_si: ]
return (None, MyContent())
return MyContent()
def test_pyvmomi_lib_exists(mocker, fake_ansible_module): @pytest.mark.parametrize("key,libname", testdata)
def test_lib_loading_failure(monkeypatch, fake_ansible_module, key, libname):
""" Test if Pyvmomi is present or not""" """ Test if Pyvmomi is present or not"""
mocker.patch('ansible.module_utils.vmware.HAS_PYVMOMI', new=False) monkeypatch.setattr(vmware_module_utils, key, False)
with pytest.raises(FailJson) as exec_info: with pytest.raises(FailJsonException):
PyVmomi(fake_ansible_module) vmware_module_utils.PyVmomi(fake_ansible_module)
error_str = 'Failed to import the required Python library (%s)' % libname
assert 'Failed to import the required Python library (PyVmomi) on' in exec_info.value.kwargs['msg'] assert fake_ansible_module.fail_json.called_once()
assert error_str in fake_ansible_module.fail_json.call_args[1]['msg']
def test_requests_lib_exists(mocker, fake_ansible_module):
""" Test if requests is present or not"""
mocker.patch('ansible.module_utils.vmware.HAS_REQUESTS', new=False)
with pytest.raises(FailJson) as exec_info:
PyVmomi(fake_ansible_module)
assert 'Failed to import the required Python library (requests) on' in exec_info.value.kwargs['msg']
@pytest.mark.skipif(sys.version_info < (2, 7), reason="requires python2.7 and greater") @pytest.mark.skipif(sys.version_info < (2, 7), reason="requires python2.7 and greater")
@ -138,40 +118,111 @@ def test_requests_lib_exists(mocker, fake_ansible_module):
def test_required_params(request, params, msg, fake_ansible_module): def test_required_params(request, params, msg, fake_ansible_module):
""" Test if required params are correct or not""" """ Test if required params are correct or not"""
fake_ansible_module.params = params fake_ansible_module.params = params
with pytest.raises(FailJson) as exec_info: with pytest.raises(FailJsonException):
connect_to_api(fake_ansible_module) vmware_module_utils.connect_to_api(fake_ansible_module)
assert msg in exec_info.value.kwargs['msg'] assert fake_ansible_module.fail_json.called_once()
assert msg in fake_ansible_module.fail_json.call_args[1]['msg']
def test_validate_certs(mocker, fake_ansible_module): def test_validate_certs(monkeypatch, fake_ansible_module):
""" Test if SSL is required or not""" """ Test if SSL is required or not"""
fake_ansible_module.params = test_data[3][0] fake_ansible_module.params = test_data[3][0]
mocker.patch('ansible.module_utils.vmware.ssl', new=None) monkeypatch.setattr(vmware_module_utils, 'ssl', None)
with pytest.raises(FailJson) as exec_info: with pytest.raises(FailJsonException):
PyVmomi(fake_ansible_module) vmware_module_utils.PyVmomi(fake_ansible_module)
msg = 'pyVim does not support changing verification mode with python < 2.7.9.' \ msg = 'pyVim does not support changing verification mode with python < 2.7.9.' \
' Either update python or use validate_certs=false.' ' Either update python or use validate_certs=false.'
assert msg == exec_info.value.kwargs['msg'] assert fake_ansible_module.fail_json.called_once()
assert msg in fake_ansible_module.fail_json.call_args[1]['msg']
def test_vmdk_disk_path_split(mocker, fake_ansible_module): def test_vmdk_disk_path_split(monkeypatch, fake_ansible_module):
""" Test vmdk_disk_path_split function""" """ Test vmdk_disk_path_split function"""
fake_ansible_module.params = test_data[0][0] fake_ansible_module.params = test_data[0][0]
mocker.patch('ansible.module_utils.vmware.connect_to_api', new=fake_connect_to_api) monkeypatch.setattr(vmware_module_utils, 'connect_to_api', fake_connect_to_api)
pyv = PyVmomi(fake_ansible_module) pyv = vmware_module_utils.PyVmomi(fake_ansible_module)
v = pyv.vmdk_disk_path_split('[ds1] VM_0001/VM0001_0.vmdk') v = pyv.vmdk_disk_path_split('[ds1] VM_0001/VM0001_0.vmdk')
assert v == ('ds1', 'VM_0001/VM0001_0.vmdk', 'VM0001_0.vmdk', 'VM_0001') assert v == ('ds1', 'VM_0001/VM0001_0.vmdk', 'VM0001_0.vmdk', 'VM_0001')
def test_vmdk_disk_path_split_negative(mocker, fake_ansible_module): def test_vmdk_disk_path_split_negative(monkeypatch, fake_ansible_module):
""" Test vmdk_disk_path_split function""" """ Test vmdk_disk_path_split function"""
fake_ansible_module.params = test_data[0][0] fake_ansible_module.params = test_data[0][0]
mocker.patch('ansible.module_utils.vmware.connect_to_api', new=fake_connect_to_api) monkeypatch.setattr(vmware_module_utils, 'connect_to_api', fake_connect_to_api)
with pytest.raises(FailJson) as exec_info: with pytest.raises(FailJsonException):
pyv = PyVmomi(fake_ansible_module) pyv = vmware_module_utils.PyVmomi(fake_ansible_module)
pyv.vmdk_disk_path_split('[ds1]') pyv.vmdk_disk_path_split('[ds1]')
assert fake_ansible_module.fail_json.called_once()
assert 'Bad path' in fake_ansible_module.fail_json.call_args[1]['msg']
assert 'Bad path' in exec_info.value.kwargs['msg']
@pytest.mark.skipif(sys.version_info < (2, 7), reason="requires python2.7 and greater")
def test_connect_to_api_validate_certs(monkeypatch, fake_ansible_module):
monkeypatch.setattr(vmware_module_utils, 'connect', mock.Mock())
def MockSSLContext(proto):
ssl_context.proto = proto
return ssl_context
# New Python with SSLContext + validate_certs=True
vmware_module_utils.connect.reset_mock()
ssl_context = mock.Mock()
monkeypatch.setattr(vmware_module_utils.ssl, 'SSLContext', MockSSLContext)
fake_ansible_module.params['validate_certs'] = True
vmware_module_utils.connect_to_api(fake_ansible_module)
assert ssl_context.proto == ssl.PROTOCOL_SSLv23
assert ssl_context.verify_mode == ssl.CERT_REQUIRED
assert ssl_context.check_hostname is True
vmware_module_utils.connect.SmartConnect.assert_called_once_with(
host='esxi1',
port=443,
pwd='Esxi@123$%',
user='Administrator@vsphere.local',
sslContext=ssl_context)
# New Python with SSLContext + validate_certs=False
vmware_module_utils.connect.reset_mock()
ssl_context = mock.Mock()
monkeypatch.setattr(vmware_module_utils.ssl, 'SSLContext', MockSSLContext)
fake_ansible_module.params['validate_certs'] = False
vmware_module_utils.connect_to_api(fake_ansible_module)
assert ssl_context.proto == ssl.PROTOCOL_SSLv23
assert ssl_context.verify_mode == ssl.CERT_NONE
assert ssl_context.check_hostname is False
vmware_module_utils.connect.SmartConnect.assert_called_once_with(
host='esxi1',
port=443,
pwd='Esxi@123$%',
user='Administrator@vsphere.local',
sslContext=ssl_context)
# Old Python with no SSLContext + validate_certs=True
vmware_module_utils.connect.reset_mock()
ssl_context = mock.Mock()
ssl_context.proto = None
monkeypatch.delattr(vmware_module_utils.ssl, 'SSLContext')
fake_ansible_module.params['validate_certs'] = True
with pytest.raises(FailJsonException):
vmware_module_utils.connect_to_api(fake_ansible_module)
assert ssl_context.proto is None
fake_ansible_module.fail_json.assert_called_once_with(msg=(
'pyVim does not support changing verification mode with python '
'< 2.7.9. Either update python or use validate_certs=false.'))
assert not vmware_module_utils.connect.SmartConnect.called
# Old Python with no SSLContext + validate_certs=False
vmware_module_utils.connect.reset_mock()
ssl_context = mock.Mock()
ssl_context.proto = None
monkeypatch.delattr(vmware_module_utils.ssl, 'SSLContext', raising=False)
fake_ansible_module.params['validate_certs'] = False
vmware_module_utils.connect_to_api(fake_ansible_module)
assert ssl_context.proto is None
vmware_module_utils.connect.SmartConnect.assert_called_once_with(
host='esxi1',
port=443,
pwd='Esxi@123$%',
user='Administrator@vsphere.local')