kubevirt: enable/update tests + fix merge_dicts() (#57685)
* Actually run the unit tests and separate them into two files * Re-add recursion to merge_dicts() * Update tests to work with current code
This commit is contained in:
parent
00604d3f2c
commit
51add5aa79
3 changed files with 53 additions and 50 deletions
|
@ -134,6 +134,14 @@ class KubeVirtRawModule(KubernetesRawModule):
|
|||
merging_dicts can be a dict or a list or tuple of dicts. In the latter case, the
|
||||
dictionaries at the front of the list have higher precedence over the ones at the end.
|
||||
"""
|
||||
def _deepupdate(D, E):
|
||||
for k in E:
|
||||
if isinstance(E[k], dict) and isinstance(D.get(k), dict):
|
||||
D[k] = _deepupdate(D[k], E[k])
|
||||
else:
|
||||
D[k] = E[k]
|
||||
return D
|
||||
|
||||
if not merging_dicts:
|
||||
merging_dicts = ({},)
|
||||
|
||||
|
@ -142,9 +150,9 @@ class KubeVirtRawModule(KubernetesRawModule):
|
|||
|
||||
new_dict = {}
|
||||
for d in reversed(merging_dicts):
|
||||
new_dict.update(d)
|
||||
_deepupdate(new_dict, d)
|
||||
|
||||
new_dict.update(base_dict)
|
||||
_deepupdate(new_dict, base_dict)
|
||||
|
||||
return new_dict
|
||||
|
||||
|
|
25
test/units/module_utils/test_kubevirt.py
Normal file
25
test/units/module_utils/test_kubevirt.py
Normal file
|
@ -0,0 +1,25 @@
|
|||
import json
|
||||
import pytest
|
||||
|
||||
from ansible.module_utils import kubevirt as mymodule
|
||||
|
||||
|
||||
def test_simple_merge_dicts():
|
||||
dict1 = {'labels': {'label1': 'value'}}
|
||||
dict2 = {'labels': {'label2': 'value'}}
|
||||
dict3 = json.dumps({'labels': {'label1': 'value', 'label2': 'value'}}, sort_keys=True)
|
||||
assert dict3 == json.dumps(dict(mymodule.KubeVirtRawModule.merge_dicts(dict1, dict2)), sort_keys=True)
|
||||
|
||||
|
||||
def test_simple_multi_merge_dicts():
|
||||
dict1 = {'labels': {'label1': 'value', 'label3': 'value'}}
|
||||
dict2 = {'labels': {'label2': 'value'}}
|
||||
dict3 = json.dumps({'labels': {'label1': 'value', 'label2': 'value', 'label3': 'value'}}, sort_keys=True)
|
||||
assert dict3 == json.dumps(dict(mymodule.KubeVirtRawModule.merge_dicts(dict1, dict2)), sort_keys=True)
|
||||
|
||||
|
||||
def test_double_nested_merge_dicts():
|
||||
dict1 = {'metadata': {'labels': {'label1': 'value', 'label3': 'value'}}}
|
||||
dict2 = {'metadata': {'labels': {'label2': 'value'}}}
|
||||
dict3 = json.dumps({'metadata': {'labels': {'label1': 'value', 'label2': 'value', 'label3': 'value'}}}, sort_keys=True)
|
||||
assert dict3 == json.dumps(dict(mymodule.KubeVirtRawModule.merge_dicts(dict1, dict2)), sort_keys=True)
|
|
@ -7,14 +7,15 @@ from ansible.module_utils import basic
|
|||
from ansible.module_utils._text import to_bytes
|
||||
from ansible.module_utils.k8s.common import K8sAnsibleMixin
|
||||
from ansible.module_utils.k8s.raw import KubernetesRawModule
|
||||
from ansible.module_utils.kubevirt import KubeVirtRawModule
|
||||
|
||||
from ansible.modules.cloud.kubevirt import kubevirt_vm as mymodule
|
||||
|
||||
openshiftdynamic = pytest.importorskip("openshift.dynamic", minversion="0.6.2")
|
||||
helpexceptions = pytest.importorskip("openshift.helper.exceptions", minversion="0.6.2")
|
||||
openshiftdynamic = pytest.importorskip("openshift.dynamic")
|
||||
helpexceptions = pytest.importorskip("openshift.helper.exceptions")
|
||||
|
||||
KIND = 'VirtulMachine'
|
||||
RESOURCE_DEFAULT_ARGS = {'api_version': 'v1', 'group': 'kubevirt.io',
|
||||
RESOURCE_DEFAULT_ARGS = {'api_version': 'v1alpha3', 'group': 'kubevirt.io',
|
||||
'prefix': 'apis', 'namespaced': True}
|
||||
|
||||
|
||||
|
@ -56,7 +57,7 @@ def fail_json(*args, **kwargs):
|
|||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_mixtures(self, monkeypatch):
|
||||
def setup_mixtures(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
KubernetesRawModule, "exit_json", exit_json)
|
||||
monkeypatch.setattr(
|
||||
|
@ -72,12 +73,14 @@ def setup_mixtures(self, monkeypatch):
|
|||
K8sAnsibleMixin.get_api_client = MagicMock()
|
||||
K8sAnsibleMixin.get_api_client.return_value = None
|
||||
K8sAnsibleMixin.find_resource = MagicMock()
|
||||
KubeVirtRawModule.find_supported_resource = MagicMock()
|
||||
|
||||
|
||||
def test_vm_multus_creation(self):
|
||||
def test_vm_multus_creation():
|
||||
# Desired state:
|
||||
args = dict(
|
||||
state='present', name='testvm',
|
||||
namespace='vms', api_version='v1',
|
||||
namespace='vms',
|
||||
interfaces=[
|
||||
{'bridge': {}, 'name': 'default', 'network': {'pod': {}}},
|
||||
{'bridge': {}, 'name': 'mynet', 'network': {'multus': {'networkName': 'mynet'}}},
|
||||
|
@ -86,68 +89,35 @@ def test_vm_multus_creation(self):
|
|||
)
|
||||
set_module_args(args)
|
||||
|
||||
# State as "returned" by the "k8s cluster":
|
||||
openshiftdynamic.Resource.get.return_value = None
|
||||
resource_args = dict(kind=KIND, **RESOURCE_DEFAULT_ARGS)
|
||||
K8sAnsibleMixin.find_resource.return_value = openshiftdynamic.Resource(**resource_args)
|
||||
KubeVirtRawModule.find_supported_resource.return_value = openshiftdynamic.Resource(**resource_args)
|
||||
|
||||
# Actual test:
|
||||
with pytest.raises(AnsibleExitJson) as result:
|
||||
mymodule.KubeVirtVM().execute_module()
|
||||
assert result.value['changed']
|
||||
assert result.value['result']['method'] == 'create'
|
||||
assert result.value['method'] == 'create'
|
||||
|
||||
|
||||
@pytest.mark.parametrize("_wait", (False, True))
|
||||
def test_resource_absent(self, _wait):
|
||||
def test_resource_absent(_wait):
|
||||
# Desired state:
|
||||
args = dict(
|
||||
state='absent', name='testvmi',
|
||||
namespace='vms', api_version='v1',
|
||||
namespace='vms',
|
||||
wait=_wait,
|
||||
)
|
||||
set_module_args(args)
|
||||
|
||||
# State as "returned" by the "k8s cluster":
|
||||
openshiftdynamic.Resource.get.return_value = None
|
||||
resource_args = dict(kind=KIND, **RESOURCE_DEFAULT_ARGS)
|
||||
K8sAnsibleMixin.find_resource.return_value = openshiftdynamic.Resource(**resource_args)
|
||||
KubeVirtRawModule.find_supported_resource.return_value = openshiftdynamic.Resource(**resource_args)
|
||||
|
||||
# Actual test:
|
||||
with pytest.raises(AnsibleExitJson) as result:
|
||||
mymodule.KubeVirtVM().execute_module()
|
||||
assert result.value['result']['method'] == 'delete'
|
||||
|
||||
|
||||
@patch('openshift.watch.Watch')
|
||||
def test_stream_creation(self, mock_watch):
|
||||
# Desired state:
|
||||
args = dict(
|
||||
state='running', name='testvmi', namespace='vms',
|
||||
api_version='v1', wait=True,
|
||||
)
|
||||
set_module_args(args)
|
||||
|
||||
# Actual test:
|
||||
mock_watch.side_effect = helpexceptions.KubernetesException("Test", value=42)
|
||||
with pytest.raises(AnsibleFailJson):
|
||||
mymodule.KubeVirtVM().execute_module()
|
||||
|
||||
|
||||
def test_simple_merge_dicts(self):
|
||||
dict1 = {'labels': {'label1': 'value'}}
|
||||
dict2 = {'labels': {'label2': 'value'}}
|
||||
dict3 = json.dumps({'labels': {'label1': 'value', 'label2': 'value'}}, sort_keys=True)
|
||||
assert dict3 == json.dumps(dict(mymodule.KubeVirtVM.merge_dicts(dict1, dict2)), sort_keys=True)
|
||||
|
||||
|
||||
def test_simple_multi_merge_dicts(self):
|
||||
dict1 = {'labels': {'label1': 'value', 'label3': 'value'}}
|
||||
dict2 = {'labels': {'label2': 'value'}}
|
||||
dict3 = json.dumps({'labels': {'label1': 'value', 'label2': 'value', 'label3': 'value'}}, sort_keys=True)
|
||||
assert dict3 == json.dumps(dict(mymodule.KubeVirtVM.merge_dicts(dict1, dict2)), sort_keys=True)
|
||||
|
||||
|
||||
def test_double_nested_merge_dicts(self):
|
||||
dict1 = {'metadata': {'labels': {'label1': 'value', 'label3': 'value'}}}
|
||||
dict2 = {'metadata': {'labels': {'label2': 'value'}}}
|
||||
dict3 = json.dumps({'metadata': {'labels': {'label1': 'value', 'label2': 'value', 'label3': 'value'}}}, sort_keys=True)
|
||||
assert dict3 == json.dumps(dict(mymodule.KubeVirtVM.merge_dicts(dict1, dict2)), sort_keys=True)
|
||||
assert result.value['method'] == 'delete'
|
||||
assert not result.value['kubevirt_vm']
|
||||
|
|
Loading…
Reference in a new issue