diff --git a/lib/ansible/plugins/strategies/__init__.py b/lib/ansible/plugins/strategies/__init__.py index 7cc1709e08..e933ca73d4 100644 --- a/lib/ansible/plugins/strategies/__init__.py +++ b/lib/ansible/plugins/strategies/__init__.py @@ -236,8 +236,6 @@ class StrategyBase: debug("waiting for pending results (%d left)" % self._pending_results) results = self._process_pending_results(iterator) ret_results.extend(results) - if self._tqm._terminated: - break time.sleep(0.01) return ret_results @@ -336,63 +334,6 @@ class StrategyBase: return block_list - def cleanup(self, iterator, connection_info): - ''' - Iterates through failed hosts and runs any outstanding rescue/always blocks - and handlers which may still need to be run after a failure. - ''' - - debug("in cleanup") - result = True - - debug("getting failed hosts") - failed_hosts = self.get_failed_hosts(iterator._play) - if len(failed_hosts) == 0: - debug("there are no failed hosts") - return result - - debug("marking hosts failed in the iterator") - # mark the host as failed in the iterator so it will take - # any required rescue paths which may be outstanding - for host in failed_hosts: - iterator.mark_host_failed(host) - - debug("clearing the failed hosts list") - # clear the failed hosts dictionary now while also - for entry in self._tqm._failed_hosts.keys(): - del self._tqm._failed_hosts[entry] - - work_to_do = True - while work_to_do: - work_to_do = False - for host in failed_hosts: - host_name = host.name - - if host_name in self._tqm._failed_hosts: - iterator.mark_host_failed(host) - del self._tqm._failed_hosts[host_name] - - if host_name in self._blocked_hosts: - work_to_do = True - continue - elif iterator.get_next_task_for_host(host, peek=True) and host_name not in self._tqm._unreachable_hosts: - work_to_do = True - - # pop the task, mark the host blocked, and queue it - self._blocked_hosts[host_name] = True - task = iterator.get_next_task_for_host(host) - task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task) - self._tqm.send_callback('v2_playbook_on_cleanup_task_start', task) - self._queue_task(host, task, task_vars, connection_info) - - self._process_pending_results(iterator) - time.sleep(0.01) - - # no more work, wait until the queue is drained - self._wait_on_pending_results(iterator) - - return result - def run_handlers(self, iterator, connection_info): ''' Runs handlers on those hosts which have been notified. diff --git a/test/units/plugins/strategies/test_strategy_base.py b/test/units/plugins/strategies/test_strategy_base.py index 36e22a9719..7d8cb42ee6 100644 --- a/test/units/plugins/strategies/test_strategy_base.py +++ b/test/units/plugins/strategies/test_strategy_base.py @@ -22,12 +22,15 @@ __metaclass__ = type from ansible.compat.tests import unittest from ansible.compat.tests.mock import patch, MagicMock +from ansible.errors import AnsibleError, AnsibleParserError from ansible.plugins.strategies import StrategyBase from ansible.executor.task_queue_manager import TaskQueueManager +from ansible.executor.task_result import TaskResult +from six.moves import queue as Queue from units.mock.loader import DictDataLoader -class TestVariableManager(unittest.TestCase): +class TestStrategyBase(unittest.TestCase): def setUp(self): pass @@ -125,3 +128,228 @@ class TestVariableManager(unittest.TestCase): self.assertEqual(strategy_base._cur_worker, 1) self.assertEqual(strategy_base._pending_results, 3) + def test_strategy_base_process_pending_results(self): + mock_tqm = MagicMock() + mock_tqm._terminated = False + mock_tqm._failed_hosts = dict() + mock_tqm._unreachable_hosts = dict() + mock_tqm.send_callback.return_value = None + + queue_items = [] + def _queue_empty(*args, **kwargs): + return len(queue_items) == 0 + def _queue_get(*args, **kwargs): + if len(queue_items) == 0: + raise Queue.Empty + else: + return queue_items.pop() + + mock_queue = MagicMock() + mock_queue.empty.side_effect = _queue_empty + mock_queue.get.side_effect = _queue_get + mock_tqm._final_q = mock_queue + + mock_tqm._stats = MagicMock() + mock_tqm._stats.increment.return_value = None + + mock_iterator = MagicMock() + mock_iterator.mark_host_failed.return_value = None + + mock_host = MagicMock() + mock_host.name = 'test01' + mock_host.vars = dict() + + mock_task = MagicMock() + mock_task._role = None + mock_task.ignore_errors = False + + mock_group = MagicMock() + mock_group.add_host.return_value = None + + def _get_host(host_name): + if host_name == 'test01': + return mock_host + return None + def _get_group(group_name): + if group_name in ('all', 'foo'): + return mock_group + return None + + mock_inventory = MagicMock() + mock_inventory._hosts_cache = dict() + mock_inventory.get_host.side_effect = _get_host + mock_inventory.get_group.side_effect = _get_group + mock_inventory.clear_pattern_cache.return_value = None + + mock_var_mgr = MagicMock() + mock_var_mgr.set_host_variable.return_value = None + mock_var_mgr.set_host_facts.return_value = None + + strategy_base = StrategyBase(tqm=mock_tqm) + strategy_base._inventory = mock_inventory + strategy_base._variable_manager = mock_var_mgr + strategy_base._blocked_hosts = dict() + strategy_base._notified_handlers = dict() + + results = strategy_base._wait_on_pending_results(iterator=mock_iterator) + self.assertEqual(len(results), 0) + + task_result = TaskResult(host=mock_host, task=mock_task, return_data=dict(changed=True)) + queue_items.append(('host_task_ok', task_result)) + strategy_base._blocked_hosts['test01'] = True + strategy_base._pending_results = 1 + results = strategy_base._wait_on_pending_results(iterator=mock_iterator) + self.assertEqual(len(results), 1) + self.assertEqual(results[0], task_result) + self.assertEqual(strategy_base._pending_results, 0) + self.assertNotIn('test01', strategy_base._blocked_hosts) + + task_result = TaskResult(host=mock_host, task=mock_task, return_data='{"failed":true}') + queue_items.append(('host_task_failed', task_result)) + strategy_base._blocked_hosts['test01'] = True + strategy_base._pending_results = 1 + results = strategy_base._process_pending_results(iterator=mock_iterator) + self.assertEqual(len(results), 1) + self.assertEqual(results[0], task_result) + self.assertEqual(strategy_base._pending_results, 0) + self.assertNotIn('test01', strategy_base._blocked_hosts) + self.assertIn('test01', mock_tqm._failed_hosts) + del mock_tqm._failed_hosts['test01'] + + task_result = TaskResult(host=mock_host, task=mock_task, return_data='{}') + queue_items.append(('host_unreachable', task_result)) + strategy_base._blocked_hosts['test01'] = True + strategy_base._pending_results = 1 + results = strategy_base._wait_on_pending_results(iterator=mock_iterator) + self.assertEqual(len(results), 1) + self.assertEqual(results[0], task_result) + self.assertEqual(strategy_base._pending_results, 0) + self.assertNotIn('test01', strategy_base._blocked_hosts) + self.assertIn('test01', mock_tqm._unreachable_hosts) + del mock_tqm._unreachable_hosts['test01'] + + task_result = TaskResult(host=mock_host, task=mock_task, return_data='{}') + queue_items.append(('host_task_skipped', task_result)) + strategy_base._blocked_hosts['test01'] = True + strategy_base._pending_results = 1 + results = strategy_base._wait_on_pending_results(iterator=mock_iterator) + self.assertEqual(len(results), 1) + self.assertEqual(results[0], task_result) + self.assertEqual(strategy_base._pending_results, 0) + self.assertNotIn('test01', strategy_base._blocked_hosts) + + strategy_base._blocked_hosts['test01'] = True + strategy_base._pending_results = 1 + + queue_items.append(('add_host', dict(add_host=dict(host_name='newhost01', new_groups=['foo'])))) + results = strategy_base._process_pending_results(iterator=mock_iterator) + self.assertEqual(len(results), 0) + self.assertEqual(strategy_base._pending_results, 1) + self.assertIn('test01', strategy_base._blocked_hosts) + + queue_items.append(('add_group', mock_host, dict(add_group=dict(group_name='foo')))) + results = strategy_base._process_pending_results(iterator=mock_iterator) + self.assertEqual(len(results), 0) + self.assertEqual(strategy_base._pending_results, 1) + self.assertIn('test01', strategy_base._blocked_hosts) + + queue_items.append(('notify_handler', mock_host, 'test handler')) + results = strategy_base._process_pending_results(iterator=mock_iterator) + self.assertEqual(len(results), 0) + self.assertEqual(strategy_base._pending_results, 1) + self.assertIn('test01', strategy_base._blocked_hosts) + self.assertIn('test handler', strategy_base._notified_handlers) + self.assertIn(mock_host, strategy_base._notified_handlers['test handler']) + + queue_items.append(('set_host_var', mock_host, 'foo', 'bar')) + results = strategy_base._process_pending_results(iterator=mock_iterator) + self.assertEqual(len(results), 0) + self.assertEqual(strategy_base._pending_results, 1) + + queue_items.append(('set_host_facts', mock_host, 'foo', dict())) + results = strategy_base._process_pending_results(iterator=mock_iterator) + self.assertEqual(len(results), 0) + self.assertEqual(strategy_base._pending_results, 1) + + queue_items.append(('bad')) + self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator) + + def test_strategy_base_load_included_file(self): + fake_loader = DictDataLoader({ + "test.yml": """ + - debug: msg='foo' + """, + "bad.yml": """ + """, + }) + + mock_tqm = MagicMock() + mock_tqm._final_q = MagicMock() + + strategy_base = StrategyBase(tqm=mock_tqm) + strategy_base._loader = fake_loader + + mock_play = MagicMock() + + mock_block = MagicMock() + mock_block._play = mock_play + mock_block.vars = dict() + + mock_task = MagicMock() + mock_task._block = mock_block + mock_task._role = None + + mock_inc_file = MagicMock() + mock_inc_file._task = mock_task + + mock_inc_file._filename = "test.yml" + res = strategy_base._load_included_file(included_file=mock_inc_file) + + mock_inc_file._filename = "bad.yml" + self.assertRaises(AnsibleParserError, strategy_base._load_included_file, included_file=mock_inc_file) + + def test_strategy_base_run_handlers(self): + workers = [] + for i in range(0, 3): + worker_main_q = MagicMock() + worker_main_q.put.return_value = None + worker_result_q = MagicMock() + workers.append([i, worker_main_q, worker_result_q]) + + mock_tqm = MagicMock() + mock_tqm._final_q = MagicMock() + mock_tqm.get_workers.return_value = workers + mock_tqm.send_callback.return_value = None + + mock_conn_info = MagicMock() + + mock_handler_task = MagicMock() + mock_handler_task.get_name.return_value = "test handler" + mock_handler_task.has_triggered.return_value = False + + mock_handler = MagicMock() + mock_handler.block = [mock_handler_task] + mock_handler.flag_for_host.return_value = False + + mock_play = MagicMock() + mock_play.handlers = [mock_handler] + + mock_host = MagicMock() + mock_host.name = "test01" + + mock_iterator = MagicMock() + + mock_inventory = MagicMock() + mock_inventory.get_hosts.return_value = [mock_host] + + mock_var_mgr = MagicMock() + mock_var_mgr.get_vars.return_value = dict() + + mock_iterator = MagicMock + mock_iterator._play = mock_play + + strategy_base = StrategyBase(tqm=mock_tqm) + strategy_base._inventory = mock_inventory + strategy_base._notified_handlers = {"test handler": [mock_host]} + + result = strategy_base.run_handlers(iterator=mock_iterator, connection_info=mock_conn_info)