add _load_params debug overrides for module args/file passed on cmdline

Updated python module wrapper explode method to drop 'args' file next to module.
Both execute() and excommunicate() debug methods now pass the module args via file to enable debuggers that are picky about stdin.
Updated unit tests to use a context manager for masking/restoring default streams and argv.
This commit is contained in:
nitzmahone 2016-04-14 09:06:38 -07:00
parent 6322ed833e
commit 5b336832af
10 changed files with 358 additions and 448 deletions

View file

@ -142,6 +142,7 @@ def debug(command, zipped_mod, json_params):
# Okay to use __file__ here because we're running from a kept file
basedir = os.path.abspath(os.path.dirname(__file__))
args_path = os.path.join(basedir, 'args')
if command == 'explode':
# transform the ZIPDATA into an exploded directory of code and then
# print the path to the code. This is an easy way for people to look
@ -163,6 +164,11 @@ def debug(command, zipped_mod, json_params):
f.write(z.read(filename))
f.close()
# write the args file
f = open(args_path, 'w')
f.write(json_params)
f.close()
print('Module expanded into:')
print('%%s' %% os.path.join(basedir, 'ansible'))
exitcode = 0
@ -171,7 +177,29 @@ def debug(command, zipped_mod, json_params):
# Execute the exploded code instead of executing the module from the
# embedded ZIPDATA. This allows people to easily run their modified
# code on the remote machine to see how changes will affect it.
exitcode = invoke_module(os.path.join(basedir, 'ansible_module_%(ansible_module)s.py'), basedir, json_params)
# This differs slightly from default Ansible execution of Python modules
# as it passes the arguments to the module via a file instead of stdin.
pythonpath = os.environ.get('PYTHONPATH')
if pythonpath:
os.environ['PYTHONPATH'] = ':'.join((basedir, pythonpath))
else:
os.environ['PYTHONPATH'] = basedir
p = subprocess.Popen(['%(interpreter)s', 'ansible_module_%(ansible_module)s.py', args_path], env=os.environ, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
(stdout, stderr) = p.communicate()
if not isinstance(stderr, (bytes, unicode)):
stderr = stderr.read()
if not isinstance(stdout, (bytes, unicode)):
stdout = stdout.read()
if PY3:
sys.stderr.buffer.write(stderr)
sys.stdout.buffer.write(stdout)
else:
sys.stderr.write(stderr)
sys.stdout.write(stdout)
return p.returncode
elif command == 'excommunicate':
# This attempts to run the module in-process (by importing a main
@ -182,8 +210,9 @@ def debug(command, zipped_mod, json_params):
# when using this that are only artifacts of how we're invoking here,
# not actual bugs (as they don't affect the real way that we invoke
# ansible modules)
sys.stdin = IOStream(json_params)
sys.path.insert(0, basedir)
# stub the
sys.argv = ['%(ansible_module)s', args_path]
from ansible_module_%(ansible_module)s import main
main()
print('WARNING: Module returned to wrapper instead of exiting')

View file

@ -1435,11 +1435,23 @@ class AnsibleModule(object):
def _load_params(self):
''' read the input and set the params attribute. Sets the constants as well.'''
# debug overrides to read args from file or cmdline
# Avoid tracebacks when locale is non-utf8
if sys.version_info < (3,):
buffer = sys.stdin.read()
if len(sys.argv) > 1:
if os.path.isfile(sys.argv[1]):
fd = open(sys.argv[1], 'rb')
buffer = fd.read()
fd.close()
else:
buffer = sys.argv[1]
# default case, read from stdin
else:
buffer = sys.stdin.buffer.read()
if sys.version_info < (3,):
buffer = sys.stdin.read()
else:
buffer = sys.stdin.buffer.read()
try:
params = json.loads(buffer.decode('utf-8'))
except ValueError:

View file

@ -0,0 +1,57 @@
# (c) 2016, Matt Davis <mdavis@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import sys
from contextlib import contextmanager
from io import BytesIO, StringIO
from ansible.compat.six import PY3
from ansible.utils.unicode import to_bytes
@contextmanager
def swap_stdin_and_argv(stdin_data='', argv_data=tuple()):
"""
context manager that temporarily masks the test runner's values for stdin and argv
"""
real_stdin = sys.stdin
if PY3:
sys.stdin = StringIO(stdin_data)
sys.stdin.buffer = BytesIO(to_bytes(stdin_data))
else:
sys.stdin = BytesIO(to_bytes(stdin_data))
real_argv = sys.argv
sys.argv = argv_data
yield
sys.stdin = real_stdin
sys.argv = real_argv
@contextmanager
def swap_stdout():
"""
context manager that temporarily replaces stdout for tests that need to verify output
"""
old_stdout = sys.stdout
fake_stream = BytesIO()
sys.stdout = fake_stream
yield fake_stream
sys.stdout = old_stdout

View file

@ -22,75 +22,62 @@ __metaclass__ = type
import sys
import json
from io import BytesIO, StringIO
from ansible.compat.six import PY3
from ansible.utils.unicode import to_bytes
from units.mock.procenv import swap_stdin_and_argv
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import MagicMock
class TestModuleUtilsBasic(unittest.TestCase):
def setUp(self):
self.real_stdin = sys.stdin
args = json.dumps(
@unittest.skipIf(sys.version_info[0] >= 3, "Python 3 is not supported on targets (yet)")
def test_module_utils_basic__log_invocation(self):
with swap_stdin_and_argv(stdin_data=json.dumps(
dict(
ANSIBLE_MODULE_ARGS=dict(
foo=False, bar=[1,2,3], bam="bam", baz=u'baz'),
ANSIBLE_MODULE_CONSTANTS=dict()
))):
from ansible.module_utils import basic
# test basic log invocation
am = basic.AnsibleModule(
argument_spec=dict(
foo = dict(default=True, type='bool'),
bar = dict(default=[], type='list'),
bam = dict(default="bam"),
baz = dict(default=u"baz"),
password = dict(default=True),
no_log = dict(default="you shouldn't see me", no_log=True),
),
)
am.log = MagicMock()
am._log_invocation()
# Message is generated from a dict so it will be in an unknown order.
# have to check this manually rather than with assert_called_with()
args = am.log.call_args[0]
self.assertEqual(len(args), 1)
message = args[0]
self.assertEqual(len(message), len('Invoked with bam=bam bar=[1, 2, 3] foo=False baz=baz no_log=NOT_LOGGING_PARAMETER password=NOT_LOGGING_PASSWORD'))
self.assertTrue(message.startswith('Invoked with '))
self.assertIn(' bam=bam', message)
self.assertIn(' bar=[1, 2, 3]', message)
self.assertIn(' foo=False', message)
self.assertIn(' baz=baz', message)
self.assertIn(' no_log=NOT_LOGGING_PARAMETER', message)
self.assertIn(' password=NOT_LOGGING_PASSWORD', message)
kwargs = am.log.call_args[1]
self.assertEqual(kwargs,
dict(log_args={
'foo': 'False',
'bar': '[1, 2, 3]',
'bam': 'bam',
'baz': 'baz',
'password': 'NOT_LOGGING_PASSWORD',
'no_log': 'NOT_LOGGING_PARAMETER',
})
)
)
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
def tearDown(self):
sys.stdin = self.real_stdin
@unittest.skipIf(sys.version_info[0] >= 3, "Python 3 is not supported on targets (yet)")
def test_module_utils_basic__log_invocation(self):
from ansible.module_utils import basic
# test basic log invocation
am = basic.AnsibleModule(
argument_spec=dict(
foo = dict(default=True, type='bool'),
bar = dict(default=[], type='list'),
bam = dict(default="bam"),
baz = dict(default=u"baz"),
password = dict(default=True),
no_log = dict(default="you shouldn't see me", no_log=True),
),
)
am.log = MagicMock()
am._log_invocation()
# Message is generated from a dict so it will be in an unknown order.
# have to check this manually rather than with assert_called_with()
args = am.log.call_args[0]
self.assertEqual(len(args), 1)
message = args[0]
self.assertEqual(len(message), len('Invoked with bam=bam bar=[1, 2, 3] foo=False baz=baz no_log=NOT_LOGGING_PARAMETER password=NOT_LOGGING_PASSWORD'))
self.assertTrue(message.startswith('Invoked with '))
self.assertIn(' bam=bam', message)
self.assertIn(' bar=[1, 2, 3]', message)
self.assertIn(' foo=False', message)
self.assertIn(' baz=baz', message)
self.assertIn(' no_log=NOT_LOGGING_PARAMETER', message)
self.assertIn(' password=NOT_LOGGING_PASSWORD', message)
kwargs = am.log.call_args[1]
self.assertEqual(kwargs,
dict(log_args={
'foo': 'False',
'bar': '[1, 2, 3]',
'bam': 'bam',
'baz': 'baz',
'password': 'NOT_LOGGING_PASSWORD',
'no_log': 'NOT_LOGGING_PARAMETER',
})
)

View file

@ -23,39 +23,34 @@ __metaclass__ = type
import copy
import json
import sys
from io import BytesIO, StringIO
from ansible.compat.six import PY3
from ansible.utils.unicode import to_bytes
from ansible.compat.tests import unittest
from units.mock.procenv import swap_stdin_and_argv, swap_stdout
from ansible.module_utils import basic
from ansible.module_utils.basic import heuristic_log_sanitize
from ansible.module_utils.basic import return_values, remove_values
empty_invocation = {u'module_args': {}}
@unittest.skipIf(sys.version_info[0] >= 3, "Python 3 is not supported on targets (yet)")
class TestAnsibleModuleExitJson(unittest.TestCase):
def setUp(self):
self.old_stdin = sys.stdin
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
self.stdin_swap_ctx = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap_ctx.__enter__()
self.old_stdout = sys.stdout
self.fake_stream = BytesIO()
sys.stdout = self.fake_stream
# since we can't use context managers and "with" without overriding run(), call them directly
self.stdout_swap_ctx = swap_stdout()
self.fake_stream = self.stdout_swap_ctx.__enter__()
self.module = basic.AnsibleModule(argument_spec=dict())
def tearDown(self):
sys.stdout = self.old_stdout
sys.stdin = self.old_stdin
# since we can't use context managers and "with" without overriding run(), call them directly to clean up
self.stdin_swap_ctx.__exit__(None, None, None)
self.stdout_swap_ctx.__exit__(None, None, None)
def test_exit_json_no_args_exits(self):
with self.assertRaises(SystemExit) as ctx:
@ -123,42 +118,24 @@ class TestAnsibleModuleExitValuesRemoved(unittest.TestCase):
),
)
def setUp(self):
self.old_stdin = sys.stdin
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
self.old_stdout = sys.stdout
def tearDown(self):
sys.stdin = self.old_stdin
sys.stdout = self.old_stdout
def test_exit_json_removes_values(self):
self.maxDiff = None
for args, return_val, expected in self.dataset:
sys.stdout = BytesIO()
params = dict(ANSIBLE_MODULE_ARGS=args, ANSIBLE_MODULE_CONSTANTS={})
params = json.dumps(params)
if PY3:
sys.stdin = StringIO(params)
sys.stdin.buffer = BytesIO(to_bytes(params))
else:
sys.stdin = BytesIO(to_bytes(params))
module = basic.AnsibleModule(
argument_spec = dict(
username=dict(),
password=dict(no_log=True),
token=dict(no_log=True),
),
)
with self.assertRaises(SystemExit) as ctx:
self.assertEquals(module.exit_json(**return_val), expected)
self.assertEquals(json.loads(sys.stdout.getvalue()), expected)
with swap_stdin_and_argv(stdin_data=params):
with swap_stdout():
module = basic.AnsibleModule(
argument_spec = dict(
username=dict(),
password=dict(no_log=True),
token=dict(no_log=True),
),
)
with self.assertRaises(SystemExit) as ctx:
self.assertEquals(module.exit_json(**return_val), expected)
self.assertEquals(json.loads(sys.stdout.getvalue()), expected)
def test_fail_json_removes_values(self):
self.maxDiff = None
@ -166,21 +143,17 @@ class TestAnsibleModuleExitValuesRemoved(unittest.TestCase):
expected = copy.deepcopy(expected)
del expected['changed']
expected['failed'] = True
sys.stdout = BytesIO()
params = dict(ANSIBLE_MODULE_ARGS=args, ANSIBLE_MODULE_CONSTANTS={})
params = json.dumps(params)
if PY3:
sys.stdin = StringIO(params)
sys.stdin.buffer = BytesIO(to_bytes(params))
else:
sys.stdin = BytesIO(to_bytes(params))
module = basic.AnsibleModule(
argument_spec = dict(
username=dict(),
password=dict(no_log=True),
token=dict(no_log=True),
),
)
with self.assertRaises(SystemExit) as ctx:
self.assertEquals(module.fail_json(**return_val), expected)
self.assertEquals(json.loads(sys.stdout.getvalue()), expected)
with swap_stdin_and_argv(stdin_data=params):
with swap_stdout():
module = basic.AnsibleModule(
argument_spec = dict(
username=dict(),
password=dict(no_log=True),
token=dict(no_log=True),
),
)
with self.assertRaises(SystemExit) as ctx:
self.assertEquals(module.fail_json(**return_val), expected)
self.assertEquals(json.loads(sys.stdout.getvalue()), expected)

View file

@ -23,16 +23,14 @@ __metaclass__ = type
import sys
import json
import syslog
from io import BytesIO, StringIO
from ansible.compat.six import PY3
from ansible.utils.unicode import to_bytes
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock
from units.mock.procenv import swap_stdin_and_argv
from ansible.module_utils import basic
try:
# Python 3.4+
from importlib import reload
@ -44,15 +42,12 @@ except ImportError:
class TestAnsibleModuleSysLogSmokeTest(unittest.TestCase):
def setUp(self):
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
self.real_stdin = sys.stdin
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap.__enter__()
self.am = basic.AnsibleModule(
argument_spec = dict(),
@ -64,7 +59,8 @@ class TestAnsibleModuleSysLogSmokeTest(unittest.TestCase):
basic.has_journal = False
def tearDown(self):
sys.stdin = self.real_stdin
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap.__exit__(None, None, None)
basic.has_journal = self.has_journal
def test_smoketest_syslog(self):
@ -84,20 +80,18 @@ class TestAnsibleModuleJournaldSmokeTest(unittest.TestCase):
def setUp(self):
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
self.real_stdin = sys.stdin
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap.__enter__()
self.am = basic.AnsibleModule(
argument_spec = dict(),
)
def tearDown(self):
sys.stdin = self.real_stdin
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap.__exit__(None, None, None)
@unittest.skipUnless(basic.has_journal, 'python systemd bindings not installed')
def test_smoketest_journal(self):
@ -134,26 +128,26 @@ class TestAnsibleModuleLogSyslog(unittest.TestCase):
def setUp(self):
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
self.real_stdin = sys.stdin
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap.__enter__()
self.am = basic.AnsibleModule(
argument_spec = dict(),
)
self.has_journal = basic.has_journal
if self.has_journal:
# Systems with journal can still test syslog
basic.has_journal = False
def tearDown(self):
sys.stdin = self.real_stdin
# teardown/reset
basic.has_journal = self.has_journal
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap.__exit__(None, None, None)
@patch('syslog.syslog', autospec=True)
def test_no_log(self, mock_func):
no_log = self.am.no_log
@ -191,21 +185,22 @@ class TestAnsibleModuleLogJournal(unittest.TestCase):
b'non-utf8 :\xff: test': b'non-utf8 :\xff: test'.decode('utf-8', 'replace')
}
# overriding run lets us use context managers for setup/teardown-esque behavior
def setUp(self):
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
self.real_stdin = sys.stdin
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap.__enter__()
self.am = basic.AnsibleModule(
argument_spec = dict(),
)
self.has_journal = basic.has_journal
basic.has_journal = True
if self.has_journal:
# Systems with journal can still test syslog
basic.has_journal = False
self.module_patcher = None
# In case systemd-python is not installed
@ -218,9 +213,12 @@ class TestAnsibleModuleLogJournal(unittest.TestCase):
self._fake_out_reload(basic)
def tearDown(self):
sys.stdin = self.real_stdin
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap.__exit__(None, None, None)
# teardown/reset
basic.has_journal = self.has_journal
if self.module_patcher:
self.module_patcher.stop()
reload(basic)

View file

@ -25,12 +25,11 @@ import sys
import time
from io import BytesIO, StringIO
from ansible.compat.six import PY3
from ansible.utils.unicode import to_bytes
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import call, MagicMock, Mock, patch, sentinel
from units.mock.procenv import swap_stdin_and_argv
from ansible.module_utils import basic
from ansible.module_utils.basic import AnsibleModule
@ -46,9 +45,7 @@ class OpenBytesIO(BytesIO):
@unittest.skipIf(sys.version_info[0] >= 3, "Python 3 is not supported on targets (yet)")
class TestAnsibleModuleRunCommand(unittest.TestCase):
def setUp(self):
self.cmd_out = {
# os.read() is returning 'bytes', not strings
sentinel.stdout: BytesIO(),
@ -66,11 +63,10 @@ class TestAnsibleModuleRunCommand(unittest.TestCase):
raise OSError(errno.EPERM, "Permission denied: '/inaccessible'")
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap.__enter__()
self.module = AnsibleModule(argument_spec=dict())
self.module.fail_json = MagicMock(side_effect=SystemExit)
@ -96,6 +92,11 @@ class TestAnsibleModuleRunCommand(unittest.TestCase):
self.addCleanup(patch.stopall)
def tearDown(self):
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap.__exit__(None, None, None)
def test_list_as_args(self):
self.module.run_command(['/bin/ls', 'a', ' b', 'c '])
self.assertTrue(self.subprocess.Popen.called)

View file

@ -22,60 +22,48 @@ __metaclass__ = type
import sys
import json
from io import BytesIO, StringIO
from ansible.compat.tests import unittest
from ansible.compat.six import PY3
from ansible.utils.unicode import to_bytes
from units.mock.procenv import swap_stdin_and_argv
class TestAnsibleModuleExitJson(unittest.TestCase):
def setUp(self):
self.real_stdin = sys.stdin
def tearDown(self):
sys.stdin = self.real_stdin
def test_module_utils_basic_safe_eval(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec=dict(),
)
with swap_stdin_and_argv(stdin_data=args):
am = basic.AnsibleModule(
argument_spec=dict(),
)
# test some basic usage
# string (and with exceptions included), integer, bool
self.assertEqual(am.safe_eval("'a'"), 'a')
self.assertEqual(am.safe_eval("'a'", include_exceptions=True), ('a', None))
self.assertEqual(am.safe_eval("1"), 1)
self.assertEqual(am.safe_eval("True"), True)
self.assertEqual(am.safe_eval("False"), False)
self.assertEqual(am.safe_eval("{}"), {})
# not passing in a string to convert
self.assertEqual(am.safe_eval({'a':1}), {'a':1})
self.assertEqual(am.safe_eval({'a':1}, include_exceptions=True), ({'a':1}, None))
# invalid literal eval
self.assertEqual(am.safe_eval("a=1"), "a=1")
res = am.safe_eval("a=1", include_exceptions=True)
self.assertEqual(res[0], "a=1")
self.assertEqual(type(res[1]), SyntaxError)
self.assertEqual(am.safe_eval("a.foo()"), "a.foo()")
res = am.safe_eval("a.foo()", include_exceptions=True)
self.assertEqual(res[0], "a.foo()")
self.assertEqual(res[1], None)
self.assertEqual(am.safe_eval("import foo"), "import foo")
res = am.safe_eval("import foo", include_exceptions=True)
self.assertEqual(res[0], "import foo")
self.assertEqual(res[1], None)
self.assertEqual(am.safe_eval("__import__('foo')"), "__import__('foo')")
res = am.safe_eval("__import__('foo')", include_exceptions=True)
self.assertEqual(res[0], "__import__('foo')")
self.assertEqual(type(res[1]), ValueError)
# test some basic usage
# string (and with exceptions included), integer, bool
self.assertEqual(am.safe_eval("'a'"), 'a')
self.assertEqual(am.safe_eval("'a'", include_exceptions=True), ('a', None))
self.assertEqual(am.safe_eval("1"), 1)
self.assertEqual(am.safe_eval("True"), True)
self.assertEqual(am.safe_eval("False"), False)
self.assertEqual(am.safe_eval("{}"), {})
# not passing in a string to convert
self.assertEqual(am.safe_eval({'a':1}), {'a':1})
self.assertEqual(am.safe_eval({'a':1}, include_exceptions=True), ({'a':1}, None))
# invalid literal eval
self.assertEqual(am.safe_eval("a=1"), "a=1")
res = am.safe_eval("a=1", include_exceptions=True)
self.assertEqual(res[0], "a=1")
self.assertEqual(type(res[1]), SyntaxError)
self.assertEqual(am.safe_eval("a.foo()"), "a.foo()")
res = am.safe_eval("a.foo()", include_exceptions=True)
self.assertEqual(res[0], "a.foo()")
self.assertEqual(res[1], None)
self.assertEqual(am.safe_eval("import foo"), "import foo")
res = am.safe_eval("import foo", include_exceptions=True)
self.assertEqual(res[0], "import foo")
self.assertEqual(res[1], None)
self.assertEqual(am.safe_eval("__import__('foo')"), "__import__('foo')")
res = am.safe_eval("__import__('foo')", include_exceptions=True)
self.assertEqual(res[0], "__import__('foo')")
self.assertEqual(type(res[1]), ValueError)

View file

@ -31,8 +31,7 @@ try:
except ImportError:
import __builtin__ as builtins
from ansible.compat.six import PY3
from ansible.utils.unicode import to_bytes
from units.mock.procenv import swap_stdin_and_argv
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock, mock_open, Mock, call
@ -42,10 +41,14 @@ realimport = builtins.__import__
class TestModuleUtilsBasic(unittest.TestCase):
def setUp(self):
self.real_stdin = sys.stdin
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap.__enter__()
def tearDown(self):
sys.stdin = self.real_stdin
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap.__exit__(None, None, None)
def clear_modules(self, mods):
for mod in mods:
@ -271,13 +274,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
def test_module_utils_basic_ansible_module_creation(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec=dict(),
)
@ -293,94 +289,71 @@ class TestModuleUtilsBasic(unittest.TestCase):
# should test ok
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={"foo": "hello"}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = arg_spec,
mutually_exclusive = mut_ex,
required_together = req_to,
no_log=True,
check_invalid_arguments=False,
add_file_common_args=True,
supports_check_mode=True,
)
with swap_stdin_and_argv(stdin_data=args):
am = basic.AnsibleModule(
argument_spec = arg_spec,
mutually_exclusive = mut_ex,
required_together = req_to,
no_log=True,
check_invalid_arguments=False,
add_file_common_args=True,
supports_check_mode=True,
)
# FIXME: add asserts here to verify the basic config
# fail, because a required param was not specified
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
self.assertRaises(
SystemExit,
basic.AnsibleModule,
argument_spec = arg_spec,
mutually_exclusive = mut_ex,
required_together = req_to,
no_log=True,
check_invalid_arguments=False,
add_file_common_args=True,
supports_check_mode=True,
)
with swap_stdin_and_argv(stdin_data=args):
self.assertRaises(
SystemExit,
basic.AnsibleModule,
argument_spec = arg_spec,
mutually_exclusive = mut_ex,
required_together = req_to,
no_log=True,
check_invalid_arguments=False,
add_file_common_args=True,
supports_check_mode=True,
)
# fail because of mutually exclusive parameters
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={"foo":"hello", "bar": "bad", "bam": "bad"}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
self.assertRaises(
SystemExit,
basic.AnsibleModule,
argument_spec = arg_spec,
mutually_exclusive = mut_ex,
required_together = req_to,
no_log=True,
check_invalid_arguments=False,
add_file_common_args=True,
supports_check_mode=True,
)
with swap_stdin_and_argv(stdin_data=args):
self.assertRaises(
SystemExit,
basic.AnsibleModule,
argument_spec = arg_spec,
mutually_exclusive = mut_ex,
required_together = req_to,
no_log=True,
check_invalid_arguments=False,
add_file_common_args=True,
supports_check_mode=True,
)
# fail because a param required due to another param was not specified
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={"bam": "bad"}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
self.assertRaises(
SystemExit,
basic.AnsibleModule,
argument_spec = arg_spec,
mutually_exclusive = mut_ex,
required_together = req_to,
no_log=True,
check_invalid_arguments=False,
add_file_common_args=True,
supports_check_mode=True,
)
with swap_stdin_and_argv(stdin_data=args):
self.assertRaises(
SystemExit,
basic.AnsibleModule,
argument_spec = arg_spec,
mutually_exclusive = mut_ex,
required_together = req_to,
no_log=True,
check_invalid_arguments=False,
add_file_common_args=True,
supports_check_mode=True,
)
def test_module_utils_basic_ansible_module_load_file_common_arguments(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -429,13 +402,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
def test_module_utils_basic_ansible_module_selinux_mls_enabled(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -455,13 +421,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
def test_module_utils_basic_ansible_module_selinux_initial_context(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -475,13 +434,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
def test_module_utils_basic_ansible_module_selinux_enabled(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -513,13 +465,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
def test_module_utils_basic_ansible_module_selinux_default_context(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -555,13 +500,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
def test_module_utils_basic_ansible_module_selinux_context(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -604,59 +542,49 @@ class TestModuleUtilsBasic(unittest.TestCase):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={"SELINUX_SPECIAL_FS": "nfs,nfsd,foos"}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
print(am.constants)
with swap_stdin_and_argv(stdin_data=args):
def _mock_find_mount_point(path):
if path.startswith('/some/path'):
return '/some/path'
elif path.startswith('/weird/random/fstype'):
return '/weird/random/fstype'
return '/'
am = basic.AnsibleModule(
argument_spec = dict(),
)
print(am.constants)
am.find_mount_point = MagicMock(side_effect=_mock_find_mount_point)
am.selinux_context = MagicMock(return_value=['foo_u', 'foo_r', 'foo_t', 's0'])
def _mock_find_mount_point(path):
if path.startswith('/some/path'):
return '/some/path'
elif path.startswith('/weird/random/fstype'):
return '/weird/random/fstype'
return '/'
m = mock_open()
m.side_effect = OSError
am.find_mount_point = MagicMock(side_effect=_mock_find_mount_point)
am.selinux_context = MagicMock(return_value=['foo_u', 'foo_r', 'foo_t', 's0'])
with patch.object(builtins, 'open', m, create=True):
self.assertEqual(am.is_special_selinux_path('/some/path/that/should/be/nfs'), (False, None))
m = mock_open()
m.side_effect = OSError
mount_data = [
'/dev/disk1 / ext4 rw,seclabel,relatime,data=ordered 0 0\n',
'1.1.1.1:/path/to/nfs /some/path nfs ro 0 0\n',
'whatever /weird/random/fstype foos rw 0 0\n',
]
with patch.object(builtins, 'open', m, create=True):
self.assertEqual(am.is_special_selinux_path('/some/path/that/should/be/nfs'), (False, None))
# mock_open has a broken readlines() implementation apparently...
# this should work by default but doesn't, so we fix it
m = mock_open(read_data=''.join(mount_data))
m.return_value.readlines.return_value = mount_data
mount_data = [
'/dev/disk1 / ext4 rw,seclabel,relatime,data=ordered 0 0\n',
'1.1.1.1:/path/to/nfs /some/path nfs ro 0 0\n',
'whatever /weird/random/fstype foos rw 0 0\n',
]
with patch.object(builtins, 'open', m, create=True):
self.assertEqual(am.is_special_selinux_path('/some/random/path'), (False, None))
self.assertEqual(am.is_special_selinux_path('/some/path/that/should/be/nfs'), (True, ['foo_u', 'foo_r', 'foo_t', 's0']))
self.assertEqual(am.is_special_selinux_path('/weird/random/fstype/path'), (True, ['foo_u', 'foo_r', 'foo_t', 's0']))
# mock_open has a broken readlines() implementation apparently...
# this should work by default but doesn't, so we fix it
m = mock_open(read_data=''.join(mount_data))
m.return_value.readlines.return_value = mount_data
with patch.object(builtins, 'open', m, create=True):
self.assertEqual(am.is_special_selinux_path('/some/random/path'), (False, None))
self.assertEqual(am.is_special_selinux_path('/some/path/that/should/be/nfs'), (True, ['foo_u', 'foo_r', 'foo_t', 's0']))
self.assertEqual(am.is_special_selinux_path('/weird/random/fstype/path'), (True, ['foo_u', 'foo_r', 'foo_t', 's0']))
def test_module_utils_basic_ansible_module_to_filesystem_str(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -667,13 +595,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
def test_module_utils_basic_ansible_module_user_and_group(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -688,13 +609,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
def test_module_utils_basic_ansible_module_find_mount_point(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -718,13 +632,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
def test_module_utils_basic_ansible_module_set_context_if_different(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -769,13 +676,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
def test_module_utils_basic_ansible_module_set_owner_if_different(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -814,13 +714,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
def test_module_utils_basic_ansible_module_set_group_if_different(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -859,13 +752,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
def test_module_utils_basic_ansible_module_set_mode_if_different(self):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -953,13 +839,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)
@ -1137,13 +1016,6 @@ class TestModuleUtilsBasic(unittest.TestCase):
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
am = basic.AnsibleModule(
argument_spec = dict(),
)

View file

@ -26,6 +26,8 @@ from io import BytesIO, StringIO
from ansible.compat.six import PY3
from ansible.utils.unicode import to_bytes
from units.mock.procenv import swap_stdin_and_argv
# for testing
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch
@ -323,26 +325,17 @@ def test_distribution_version():
# needs to be in here, because the import fails with python3 still
import ansible.module_utils.facts as facts
real_stdin = sys.stdin
from ansible.module_utils import basic
args = json.dumps(dict(ANSIBLE_MODULE_ARGS={}, ANSIBLE_MODULE_CONSTANTS={}))
if PY3:
sys.stdin = StringIO(args)
sys.stdin.buffer = BytesIO(to_bytes(args))
else:
sys.stdin = BytesIO(to_bytes(args))
module = basic.AnsibleModule(argument_spec=dict())
for t in TESTSETS:
# run individual tests via generator
# set nicer stdout output for nosetest
_test_one_distribution.description = "check distribution_version for %s" % t['name']
yield _test_one_distribution, facts, module, t
sys.stdin = real_stdin
with swap_stdin_and_argv(stdin_data=args):
module = basic.AnsibleModule(argument_spec=dict())
for t in TESTSETS:
# run individual tests via generator
# set nicer stdout output for nosetest
_test_one_distribution.description = "check distribution_version for %s" % t['name']
yield _test_one_distribution, facts, module, t
def _test_one_distribution(facts, module, testcase):
"""run the test on one distribution testcase