Fix various import sanity test issues.
- Relative imports are now properly recognized. - Correct script invocation of Ansible modules is used. - Warnings are now consistently reported as errors. - Errors are now consistently reported with the file tested. Resolves https://github.com/ansible/ansible/issues/62723 Resolves https://github.com/ansible/ansible/issues/61884
This commit is contained in:
parent
b52d715567
commit
92ccdeac31
2 changed files with 320 additions and 229 deletions
|
@ -7,24 +7,28 @@ __metaclass__ = type
|
|||
def main():
|
||||
"""
|
||||
Main program function used to isolate globals from imported code.
|
||||
Changes to globals in imported modules on Python 2.7 will overwrite our own globals.
|
||||
Changes to globals in imported modules on Python 2.x will overwrite our own globals.
|
||||
"""
|
||||
import contextlib
|
||||
import os
|
||||
import re
|
||||
import runpy
|
||||
import sys
|
||||
import traceback
|
||||
import types
|
||||
import warnings
|
||||
|
||||
import_dir = os.environ['SANITY_IMPORT_DIR']
|
||||
minimal_dir = os.environ['SANITY_MINIMAL_DIR']
|
||||
ansible_path = os.environ['PYTHONPATH']
|
||||
temp_path = os.environ['SANITY_TEMP_PATH'] + os.path.sep
|
||||
collection_full_name = os.environ.get('SANITY_COLLECTION_FULL_NAME')
|
||||
|
||||
try:
|
||||
import importlib.util
|
||||
imp = None # pylint: disable=invalid-name
|
||||
# noinspection PyCompatibility
|
||||
from importlib import import_module
|
||||
except ImportError:
|
||||
importlib = None # pylint: disable=invalid-name
|
||||
import imp
|
||||
def import_module(name):
|
||||
__import__(name)
|
||||
return sys.modules[name]
|
||||
|
||||
try:
|
||||
# noinspection PyCompatibility
|
||||
|
@ -32,23 +36,55 @@ def main():
|
|||
except ImportError:
|
||||
from io import StringIO
|
||||
|
||||
import ansible.module_utils.basic
|
||||
import ansible.module_utils.common.removed
|
||||
# pre-load an empty ansible package to prevent unwanted code in __init__.py from loading
|
||||
# without this the ansible.release import there would pull in many Python modules which Ansible modules should not have access to
|
||||
ansible_module = types.ModuleType('ansible')
|
||||
ansible_module.__file__ = os.path.join(os.environ['PYTHONPATH'], 'ansible', '__init__.py')
|
||||
ansible_module.__path__ = [os.path.dirname(ansible_module.__file__)]
|
||||
ansible_module.__package__ = 'ansible'
|
||||
|
||||
try:
|
||||
sys.modules['ansible'] = ansible_module
|
||||
|
||||
if collection_full_name:
|
||||
# allow importing code from collections when testing a collection
|
||||
from ansible.utils.collection_loader import AnsibleCollectionLoader
|
||||
except ImportError:
|
||||
# noinspection PyPep8Naming
|
||||
AnsibleCollectionLoader = None
|
||||
from ansible.module_utils._text import to_bytes
|
||||
|
||||
# These are the public attribute sof a doc-only module
|
||||
doc_keys = ('ANSIBLE_METADATA',
|
||||
'DOCUMENTATION',
|
||||
'EXAMPLES',
|
||||
'RETURN',
|
||||
'absolute_import',
|
||||
'division',
|
||||
'print_function')
|
||||
def get_source(self, fullname):
|
||||
mod = sys.modules.get(fullname)
|
||||
if not mod:
|
||||
mod = self.load_module(fullname)
|
||||
|
||||
with open(to_bytes(mod.__file__), 'rb') as mod_file:
|
||||
source = mod_file.read()
|
||||
|
||||
return source
|
||||
|
||||
def get_code(self, fullname):
|
||||
return compile(source=self.get_source(fullname), filename=self.get_filename(fullname), mode='exec', flags=0, dont_inherit=True)
|
||||
|
||||
def is_package(self, fullname):
|
||||
return self.get_filename(fullname).endswith('__init__.py')
|
||||
|
||||
def get_filename(self, fullname):
|
||||
mod = sys.modules.get(fullname) or self.load_module(fullname)
|
||||
|
||||
return mod.__file__
|
||||
|
||||
# monkeypatch collection loader to work with runpy
|
||||
# remove this (and the associated code above) once implemented natively in the collection loader
|
||||
AnsibleCollectionLoader.get_source = get_source
|
||||
AnsibleCollectionLoader.get_code = get_code
|
||||
AnsibleCollectionLoader.is_package = is_package
|
||||
AnsibleCollectionLoader.get_filename = get_filename
|
||||
|
||||
collection_loader = AnsibleCollectionLoader()
|
||||
|
||||
# noinspection PyCallingNonCallable
|
||||
sys.meta_path.insert(0, collection_loader)
|
||||
else:
|
||||
# do not support collection loading when not testing a collection
|
||||
collection_loader = None
|
||||
|
||||
class ImporterAnsibleModuleException(Exception):
|
||||
"""Exception thrown during initialization of ImporterAnsibleModule."""
|
||||
|
@ -58,185 +94,241 @@ def main():
|
|||
def __init__(self, *args, **kwargs):
|
||||
raise ImporterAnsibleModuleException()
|
||||
|
||||
# stop Ansible module execution during AnsibleModule instantiation
|
||||
ansible.module_utils.basic.AnsibleModule = ImporterAnsibleModule
|
||||
# no-op for _load_params since it may be called before instantiating AnsibleModule
|
||||
ansible.module_utils.basic._load_params = lambda *args, **kwargs: {} # pylint: disable=protected-access
|
||||
# no-op for removed_module since it is called in place of AnsibleModule instantiation
|
||||
ansible.module_utils.common.removed.removed_module = lambda *args, **kwargs: None
|
||||
class ImportBlacklist:
|
||||
"""Blacklist inappropriate imports."""
|
||||
def __init__(self, path, name):
|
||||
self.path = path
|
||||
self.name = name
|
||||
self.loaded_modules = set()
|
||||
|
||||
def find_module(self, fullname, path=None):
|
||||
"""Return self if the given fullname is blacklisted, otherwise return None.
|
||||
:param fullname: str
|
||||
:param path: str
|
||||
:return: ImportBlacklist | None
|
||||
"""
|
||||
if fullname in self.loaded_modules:
|
||||
return None # ignore modules that are already being loaded
|
||||
|
||||
if is_name_in_namepace(fullname, ['ansible']):
|
||||
if fullname in ('ansible.module_utils.basic', 'ansible.module_utils.common.removed'):
|
||||
return self # intercept loading so we can modify the result
|
||||
|
||||
if is_name_in_namepace(fullname, ['ansible.module_utils', self.name]):
|
||||
return None # module_utils and module under test are always allowed
|
||||
|
||||
if os.path.exists(convert_ansible_name_to_absolute_path(fullname)):
|
||||
return self # blacklist ansible files that exist
|
||||
|
||||
return None # ansible file does not exist, do not blacklist
|
||||
|
||||
if is_name_in_namepace(fullname, ['ansible_collections']):
|
||||
if not collection_loader:
|
||||
return self # blacklist collections when we are not testing a collection
|
||||
|
||||
if is_name_in_namepace(fullname, ['ansible_collections...plugins.module_utils', self.name]):
|
||||
return None # module_utils and module under test are always allowed
|
||||
|
||||
if collection_loader.find_module(fullname, path):
|
||||
return self # blacklist collection files that exist
|
||||
|
||||
return None # collection file does not exist, do not blacklist
|
||||
|
||||
# not a namespace we care about
|
||||
return None
|
||||
|
||||
def load_module(self, fullname):
|
||||
"""Raise an ImportError.
|
||||
:type fullname: str
|
||||
"""
|
||||
if fullname == 'ansible.module_utils.basic':
|
||||
module = self.__load_module(fullname)
|
||||
|
||||
# stop Ansible module execution during AnsibleModule instantiation
|
||||
module.AnsibleModule = ImporterAnsibleModule
|
||||
# no-op for _load_params since it may be called before instantiating AnsibleModule
|
||||
module._load_params = lambda *args, **kwargs: {} # pylint: disable=protected-access
|
||||
|
||||
return module
|
||||
|
||||
if fullname == 'ansible.module_utils.common.removed':
|
||||
module = self.__load_module(fullname)
|
||||
|
||||
# no-op for removed_module since it is called in place of AnsibleModule instantiation
|
||||
module.removed_module = lambda *args, **kwargs: None
|
||||
|
||||
return module
|
||||
|
||||
raise ImportError('import of "%s" is not allowed in this context' % fullname)
|
||||
|
||||
def __load_module(self, fullname):
|
||||
"""Load the requested module while avoiding infinite recursion.
|
||||
:type fullname: str
|
||||
:rtype: module
|
||||
"""
|
||||
self.loaded_modules.add(fullname)
|
||||
return import_module(fullname)
|
||||
|
||||
def run():
|
||||
"""Main program function."""
|
||||
base_dir = os.getcwd()
|
||||
messages = set()
|
||||
|
||||
if AnsibleCollectionLoader:
|
||||
# allow importing code from collections
|
||||
# noinspection PyCallingNonCallable
|
||||
sys.meta_path.insert(0, AnsibleCollectionLoader())
|
||||
|
||||
for path in sys.argv[1:] or sys.stdin.read().splitlines():
|
||||
test_python_module(path, base_dir, messages, False)
|
||||
test_python_module(path, base_dir, messages, True)
|
||||
name = convert_relative_path_to_name(path)
|
||||
test_python_module(path, name, base_dir, messages)
|
||||
|
||||
if messages:
|
||||
exit(10)
|
||||
|
||||
def test_python_module(path, base_dir, messages, ansible_module):
|
||||
if ansible_module:
|
||||
# importing modules with __main__ under Python 2.6 exits with status code 1
|
||||
if sys.version_info < (2, 7):
|
||||
return
|
||||
def test_python_module(path, name, base_dir, messages):
|
||||
"""Test the given python module by importing it.
|
||||
:type path: str
|
||||
:type name: str
|
||||
:type base_dir: str
|
||||
:type messages: set[str]
|
||||
"""
|
||||
if name in sys.modules:
|
||||
return # cannot be tested because it has already been loaded
|
||||
|
||||
# only run __main__ protected code for Ansible modules
|
||||
if not path.startswith('lib/ansible/modules/'):
|
||||
return
|
||||
is_ansible_module = (path.startswith('lib/ansible/modules/') or path.startswith('plugins/modules/')) and os.path.basename(path) != '__init__.py'
|
||||
run_main = is_ansible_module
|
||||
|
||||
# __init__ in module directories is empty (enforced by a different test)
|
||||
if path.endswith('__init__.py'):
|
||||
return
|
||||
if path == 'lib/ansible/modules/utilities/logic/async_wrapper.py':
|
||||
# async_wrapper is a non-standard Ansible module (does not use AnsibleModule) so we cannot test the main function
|
||||
run_main = False
|
||||
|
||||
# async_wrapper is not an Ansible module
|
||||
if path == 'lib/ansible/modules/utilities/logic/async_wrapper.py':
|
||||
return
|
||||
|
||||
name = calculate_python_module_name(path)
|
||||
# show the Ansible module responsible for the exception, even if it was thrown in module_utils
|
||||
filter_dir = os.path.join(base_dir, 'lib/ansible/modules')
|
||||
else:
|
||||
# Calculate module name
|
||||
name = calculate_python_module_name(path)
|
||||
|
||||
# show the Ansible file responsible for the exception, even if it was thrown in 3rd party code
|
||||
filter_dir = base_dir
|
||||
|
||||
capture = Capture()
|
||||
capture_normal = Capture()
|
||||
capture_main = Capture()
|
||||
|
||||
try:
|
||||
if imp:
|
||||
with capture_output(capture):
|
||||
# On Python2 without absolute_import we have to import parent modules all
|
||||
# the way up the tree
|
||||
full_path = os.path.abspath(path)
|
||||
parent_mod = None
|
||||
with monitor_sys_modules(path, messages):
|
||||
with blacklist_imports(path, name, messages):
|
||||
with capture_output(capture_normal):
|
||||
import_module(name)
|
||||
|
||||
py_packages = name.split('.')
|
||||
# BIG HACK: reimporting module_utils breaks the monkeypatching of basic we did
|
||||
# above and also breaks modules which import names directly from module_utils
|
||||
# modules (you'll get errors like ERROR:
|
||||
# lib/ansible/modules/storage/netapp/na_ontap_vserver_cifs_security.py:151:0:
|
||||
# AttributeError: 'module' object has no attribute 'netapp').
|
||||
# So when we import a module_util here, use a munged name.
|
||||
if 'module_utils' in py_packages:
|
||||
# Avoid accidental double underscores by using _1 as a prefix
|
||||
py_packages[-1] = '_1%s' % py_packages[-1]
|
||||
name = '.'.join(py_packages)
|
||||
|
||||
for idx in range(1, len(py_packages)):
|
||||
parent_name = '.'.join(py_packages[:idx])
|
||||
if parent_mod is None:
|
||||
toplevel_end = full_path.find('ansible/module')
|
||||
toplevel = full_path[:toplevel_end]
|
||||
parent_mod_info = imp.find_module(parent_name, [toplevel])
|
||||
else:
|
||||
parent_mod_info = imp.find_module(py_packages[idx - 1], parent_mod.__path__)
|
||||
|
||||
parent_mod = imp.load_module(parent_name, *parent_mod_info)
|
||||
# skip distro due to an apparent bug or bad interaction in
|
||||
# imp.load_module() with our distro/__init__.py.
|
||||
# distro/__init__.py sets sys.modules['ansible.module_utils.distro']
|
||||
# = _distro.pyc
|
||||
# but after running imp.load_module(),
|
||||
# sys.modules['ansible.module_utils.distro._distro'] = __init__.pyc
|
||||
# (The opposite of what we set)
|
||||
# This does not affect runtime so regular import seems to work. It's
|
||||
# just imp.load_module()
|
||||
if name == 'ansible.module_utils.distro._1__init__':
|
||||
return
|
||||
|
||||
with open(path, 'r') as module_fd:
|
||||
module = imp.load_module(name, module_fd, full_path, ('.py', 'r', imp.PY_SOURCE))
|
||||
if ansible_module:
|
||||
run_if_really_module(module)
|
||||
else:
|
||||
spec = importlib.util.spec_from_file_location(name, os.path.abspath(path))
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
|
||||
with capture_output(capture):
|
||||
spec.loader.exec_module(module)
|
||||
if ansible_module:
|
||||
run_if_really_module(module)
|
||||
|
||||
capture_report(path, capture, messages)
|
||||
if run_main:
|
||||
with monitor_sys_modules(path, messages):
|
||||
with blacklist_imports(path, name, messages):
|
||||
with capture_output(capture_main):
|
||||
runpy.run_module(name, run_name='__main__')
|
||||
except ImporterAnsibleModuleException:
|
||||
# module instantiated AnsibleModule without raising an exception
|
||||
pass
|
||||
# We truly want to catch anything the plugin might do here, including call sys.exit() so we
|
||||
# catch BaseException
|
||||
except BaseException as ex: # pylint: disable=locally-disabled, broad-except
|
||||
capture_report(path, capture, messages)
|
||||
|
||||
# intentionally catch all exceptions, including calls to sys.exit
|
||||
exc_type, _exc, exc_tb = sys.exc_info()
|
||||
message = str(ex)
|
||||
results = list(reversed(traceback.extract_tb(exc_tb)))
|
||||
source = None
|
||||
line = 0
|
||||
offset = 0
|
||||
full_path = os.path.join(base_dir, path)
|
||||
base_path = base_dir + os.path.sep
|
||||
source = None
|
||||
|
||||
if isinstance(ex, SyntaxError) and ex.filename.endswith(path): # pylint: disable=locally-disabled, no-member
|
||||
# A SyntaxError in the source we're importing will have the correct path, line and offset.
|
||||
# However, the traceback will report the path to this importer.py script instead.
|
||||
# We'll use the details from the SyntaxError in this case, as it's more accurate.
|
||||
source = path
|
||||
line = ex.lineno or 0 # pylint: disable=locally-disabled, no-member
|
||||
offset = ex.offset or 0 # pylint: disable=locally-disabled, no-member
|
||||
message = str(ex)
|
||||
|
||||
# Hack to remove the filename and line number from the message, if present.
|
||||
message = message.replace(' (%s, line %d)' % (os.path.basename(path), line), '')
|
||||
else:
|
||||
for result in results:
|
||||
if result[0].startswith(filter_dir):
|
||||
source = result[0][len(base_dir) + 1:].replace('test/lib/ansible_test/_data/sanity/import/', '')
|
||||
line = result[1] or 0
|
||||
break
|
||||
|
||||
if not source:
|
||||
# If none of our source files are found in the traceback, report the file we were testing.
|
||||
# I haven't been able to come up with a test case that encounters this issue yet.
|
||||
source = path
|
||||
message += ' (in %s:%d)' % (results[-1][0], results[-1][1] or 0)
|
||||
|
||||
# avoid line wraps in messages
|
||||
message = re.sub(r'\n *', ': ', message)
|
||||
error = '%s:%d:%d: %s: %s' % (source, line, offset, exc_type.__name__, message)
|
||||
|
||||
report_message(error, messages)
|
||||
for result in results:
|
||||
if result[0] == full_path:
|
||||
# save the line number for the file under test
|
||||
line = result[1] or 0
|
||||
|
||||
def run_if_really_module(module):
|
||||
# Module was removed
|
||||
if ('removed' not in module.ANSIBLE_METADATA['status'] and
|
||||
# Documentation only module
|
||||
[attr for attr in
|
||||
(frozenset(module.__dict__.keys()).difference(doc_keys))
|
||||
if not (attr.startswith('__') and attr.endswith('__'))]):
|
||||
# Run main() code for ansible_modules
|
||||
module.main()
|
||||
if not source and result[0].startswith(base_path) and not result[0].startswith(temp_path):
|
||||
# save the first path and line number in the traceback which is in our source tree
|
||||
source = (os.path.relpath(result[0], base_path), result[1] or 0, 0)
|
||||
|
||||
def calculate_python_module_name(path):
|
||||
name = None
|
||||
try:
|
||||
idx = path.index('ansible/modules')
|
||||
except ValueError:
|
||||
try:
|
||||
idx = path.index('ansible/module_utils')
|
||||
except ValueError:
|
||||
try:
|
||||
idx = path.index('ansible_collections')
|
||||
except ValueError:
|
||||
# Default
|
||||
name = 'module_import_test'
|
||||
if name is None:
|
||||
name = path[idx:-len('.py')].replace('/', '.')
|
||||
if isinstance(ex, SyntaxError):
|
||||
# SyntaxError has better information than the traceback
|
||||
if ex.filename == full_path: # pylint: disable=locally-disabled, no-member
|
||||
# syntax error was reported in the file under test
|
||||
line = ex.lineno or 0 # pylint: disable=locally-disabled, no-member
|
||||
offset = ex.offset or 0 # pylint: disable=locally-disabled, no-member
|
||||
elif ex.filename.startswith(base_path) and not ex.filename.startswith(temp_path): # pylint: disable=locally-disabled, no-member
|
||||
# syntax error was reported in our source tree
|
||||
source = (os.path.relpath(ex.filename, base_path), ex.lineno or 0, ex.offset or 0) # pylint: disable=locally-disabled, no-member
|
||||
|
||||
# remove the filename and line number from the message
|
||||
# either it was extracted above, or it's not really useful information
|
||||
message = re.sub(r' \(.*?, line [0-9]+\)$', '', message)
|
||||
|
||||
if source and source[0] != path:
|
||||
message += ' (at %s:%d:%d)' % (source[0], source[1], source[2])
|
||||
|
||||
report_message(path, line, offset, 'traceback', '%s: %s' % (exc_type.__name__, message), messages)
|
||||
finally:
|
||||
capture_report(path, capture_normal, messages)
|
||||
capture_report(path, capture_main, messages)
|
||||
|
||||
def is_name_in_namepace(name, namespaces):
|
||||
"""Returns True if the given name is one of the given namespaces, otherwise returns False."""
|
||||
name_parts = name.split('.')
|
||||
|
||||
for namespace in namespaces:
|
||||
namespace_parts = namespace.split('.')
|
||||
length = min(len(name_parts), len(namespace_parts))
|
||||
|
||||
truncated_name = name_parts[0:length]
|
||||
truncated_namespace = namespace_parts[0:length]
|
||||
|
||||
# empty parts in the namespace are treated as wildcards
|
||||
# to simplify the comparison, use those empty parts to indicate the positions in the name to be empty as well
|
||||
for idx, part in enumerate(truncated_namespace):
|
||||
if not part:
|
||||
truncated_name[idx] = part
|
||||
|
||||
# example: name=ansible, allowed_name=ansible.module_utils
|
||||
# example: name=ansible.module_utils.system.ping, allowed_name=ansible.module_utils
|
||||
if truncated_name == truncated_namespace:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def check_sys_modules(path, before, messages):
|
||||
"""Check for unwanted changes to sys.modules.
|
||||
:type path: str
|
||||
:type before: dict[str, module]
|
||||
:type messages: set[str]
|
||||
"""
|
||||
after = sys.modules
|
||||
removed = set(before.keys()) - set(after.keys())
|
||||
changed = set(key for key, value in before.items() if key in after and value != after[key])
|
||||
|
||||
# additions are checked by our custom PEP 302 loader, so we don't need to check them again here
|
||||
|
||||
for module in sorted(removed):
|
||||
report_message(path, 0, 0, 'unload', 'unloading of "%s" in sys.modules is not supported' % module, messages)
|
||||
|
||||
for module in sorted(changed):
|
||||
report_message(path, 0, 0, 'reload', 'reloading of "%s" in sys.modules is not supported' % module, messages)
|
||||
|
||||
def convert_ansible_name_to_absolute_path(name):
|
||||
"""Calculate the module path from the given name.
|
||||
:type name: str
|
||||
:rtype: str
|
||||
"""
|
||||
return os.path.join(ansible_path, name.replace('.', os.path.sep))
|
||||
|
||||
def convert_relative_path_to_name(path):
|
||||
"""Calculate the module name from the given path.
|
||||
:type path: str
|
||||
:rtype: str
|
||||
"""
|
||||
if path.endswith('/__init__.py'):
|
||||
clean_path = os.path.dirname(path)
|
||||
else:
|
||||
clean_path = path
|
||||
|
||||
clean_path = os.path.splitext(clean_path)[0]
|
||||
|
||||
name = clean_path.replace(os.path.sep, '.')
|
||||
|
||||
if collection_loader:
|
||||
# when testing collections the relative paths (and names) being tested are within the collection under test
|
||||
name = 'ansible_collections.%s.%s' % (collection_full_name, name)
|
||||
else:
|
||||
# when testing ansible all files being imported reside under the lib directory
|
||||
name = name[len('lib/'):]
|
||||
|
||||
return name
|
||||
|
||||
|
@ -245,7 +337,6 @@ def main():
|
|||
def __init__(self):
|
||||
self.stdout = StringIO()
|
||||
self.stderr = StringIO()
|
||||
self.warnings = []
|
||||
|
||||
def capture_report(path, capture, messages):
|
||||
"""Report on captured output.
|
||||
|
@ -255,44 +346,61 @@ def main():
|
|||
"""
|
||||
if capture.stdout.getvalue():
|
||||
first = capture.stdout.getvalue().strip().splitlines()[0].strip()
|
||||
message = '%s:%d:%d: %s: %s' % (path, 0, 0, 'StandardOutputUsed', first)
|
||||
report_message(message, messages)
|
||||
report_message(path, 0, 0, 'stdout', first, messages)
|
||||
|
||||
if capture.stderr.getvalue():
|
||||
first = capture.stderr.getvalue().strip().splitlines()[0].strip()
|
||||
message = '%s:%d:%d: %s: %s' % (path, 0, 0, 'StandardErrorUsed', first)
|
||||
report_message(message, messages)
|
||||
report_message(path, 0, 0, 'stderr', first, messages)
|
||||
|
||||
for warning in capture.warnings:
|
||||
msg = re.sub(r'\s+', ' ', '%s' % warning.message).strip()
|
||||
|
||||
filepath = os.path.relpath(warning.filename)
|
||||
lineno = warning.lineno
|
||||
|
||||
if filepath.startswith('../') or filepath.startswith(minimal_dir):
|
||||
# The warning occurred outside our source tree.
|
||||
# The best we can do is to report the file which was tested that triggered the warning.
|
||||
# If the responsible import is in shared code this warning will be repeated for each file tested which imports the shared code.
|
||||
msg += ' (in %s:%d)' % (warning.filename, warning.lineno)
|
||||
filepath = path
|
||||
lineno = 0
|
||||
elif filepath.startswith(import_dir):
|
||||
# Strip the import dir from warning paths in shared code.
|
||||
# Needed when warnings occur in places like module_utils but are caught by the modules importing the module_utils.
|
||||
filepath = os.path.relpath(filepath, import_dir)
|
||||
|
||||
message = '%s:%d:%d: %s: %s' % (filepath, lineno, 0, warning.category.__name__, msg)
|
||||
report_message(message, messages)
|
||||
|
||||
def report_message(message, messages):
|
||||
def report_message(path, line, column, code, message, messages):
|
||||
"""Report message if not already reported.
|
||||
:type path: str
|
||||
:type line: int
|
||||
:type column: int
|
||||
:type code: str
|
||||
:type message: str
|
||||
:type messages: set[str]
|
||||
"""
|
||||
message = '%s:%d:%d: %s: %s' % (path, line, column, code, message)
|
||||
|
||||
if message not in messages:
|
||||
messages.add(message)
|
||||
print(message)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def blacklist_imports(path, name, messages):
|
||||
"""Blacklist imports.
|
||||
:type path: str
|
||||
:type name: str
|
||||
:type messages: set[str]
|
||||
"""
|
||||
blacklist = ImportBlacklist(path, name)
|
||||
|
||||
sys.meta_path.insert(0, blacklist)
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
if sys.meta_path[0] != blacklist:
|
||||
report_message(path, 0, 0, 'metapath', 'changes to sys.meta_path[0] are not permitted', messages)
|
||||
|
||||
while blacklist in sys.meta_path:
|
||||
sys.meta_path.remove(blacklist)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def monitor_sys_modules(path, messages):
|
||||
"""Monitor sys.modules for unwanted changes, reverting any additions made to our own namespaces."""
|
||||
snapshot = sys.modules.copy()
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
check_sys_modules(path, snapshot, messages)
|
||||
|
||||
for key in set(sys.modules.keys()) - set(snapshot.keys()):
|
||||
if is_name_in_namepace(key, ('ansible', 'ansible_collections')):
|
||||
del sys.modules[key] # only unload our own code since we know it's native Python
|
||||
|
||||
@contextlib.contextmanager
|
||||
def capture_output(capture):
|
||||
"""Capture sys.stdout and sys.stderr.
|
||||
|
@ -304,12 +412,19 @@ def main():
|
|||
sys.stdout = capture.stdout
|
||||
sys.stderr = capture.stderr
|
||||
|
||||
with warnings.catch_warnings(record=True) as captured_warnings:
|
||||
# clear all warnings registries to make all warnings available
|
||||
for module in sys.modules.values():
|
||||
try:
|
||||
module.__warningregistry__.clear()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter('error')
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
capture.warnings = captured_warnings
|
||||
|
||||
sys.stdout = old_stdout
|
||||
sys.stderr = old_stderr
|
||||
|
||||
|
|
|
@ -25,7 +25,6 @@ from ..util import (
|
|||
display,
|
||||
parse_to_list_of_dict,
|
||||
is_subdir,
|
||||
ANSIBLE_LIB_ROOT,
|
||||
generate_pip_command,
|
||||
find_python,
|
||||
)
|
||||
|
@ -33,7 +32,6 @@ from ..util import (
|
|||
from ..util_common import (
|
||||
intercept_command,
|
||||
run_command,
|
||||
write_text_file,
|
||||
ResultType,
|
||||
)
|
||||
|
||||
|
@ -107,40 +105,18 @@ class ImportTest(SanityMultipleVersion):
|
|||
if not args.explain:
|
||||
os.symlink(os.path.abspath(os.path.join(SANITY_ROOT, 'import', 'importer.py')), importer_path)
|
||||
|
||||
# create a minimal python library
|
||||
python_path = os.path.join(temp_root, 'lib')
|
||||
ansible_path = os.path.join(python_path, 'ansible')
|
||||
ansible_init = os.path.join(ansible_path, '__init__.py')
|
||||
ansible_link = os.path.join(ansible_path, 'module_utils')
|
||||
|
||||
if not args.explain:
|
||||
remove_tree(ansible_path)
|
||||
|
||||
write_text_file(ansible_init, '', create_directories=True)
|
||||
|
||||
os.symlink(os.path.join(ANSIBLE_LIB_ROOT, 'module_utils'), ansible_link)
|
||||
|
||||
if data_context().content.collection:
|
||||
# inject just enough Ansible code for the collections loader to work on all supported Python versions
|
||||
# the __init__.py files are needed only for Python 2.x
|
||||
# the empty modules directory is required for the collection loader to generate the synthetic packages list
|
||||
|
||||
write_text_file(os.path.join(ansible_path, 'utils/__init__.py'), '', create_directories=True)
|
||||
|
||||
os.symlink(os.path.join(ANSIBLE_LIB_ROOT, 'utils', 'collection_loader.py'), os.path.join(ansible_path, 'utils', 'collection_loader.py'))
|
||||
os.symlink(os.path.join(ANSIBLE_LIB_ROOT, 'utils', 'singleton.py'), os.path.join(ansible_path, 'utils', 'singleton.py'))
|
||||
|
||||
write_text_file(os.path.join(ansible_path, 'modules/__init__.py'), '', create_directories=True)
|
||||
|
||||
# activate the virtual environment
|
||||
env['PATH'] = '%s:%s' % (virtual_environment_bin, env['PATH'])
|
||||
env['PYTHONPATH'] = python_path
|
||||
|
||||
env.update(
|
||||
SANITY_IMPORT_DIR=os.path.relpath(temp_root, data_context().content.root) + os.path.sep,
|
||||
SANITY_MINIMAL_DIR=os.path.relpath(virtual_environment_path, data_context().content.root) + os.path.sep,
|
||||
SANITY_TEMP_PATH=ResultType.TMP.path,
|
||||
)
|
||||
|
||||
if data_context().content.collection:
|
||||
env.update(
|
||||
SANITY_COLLECTION_FULL_NAME=data_context().content.collection.full_name,
|
||||
)
|
||||
|
||||
virtualenv_python = os.path.join(virtual_environment_bin, 'python')
|
||||
virtualenv_pip = generate_pip_command(virtualenv_python)
|
||||
|
||||
|
|
Loading…
Reference in a new issue