#!/usr/bin/env python

# (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
#

# this script is for testing modules without running through the
# entire guts of ansible, and is very helpful for when developing
# modules
#
# example:
#    ./hacking/test-module -m lib/ansible/modules/commands/command.py -a "/bin/sleep 3"
#    ./hacking/test-module -m lib/ansible/modules/commands/command.py -a "/bin/sleep 3" --debugger /usr/bin/pdb
#    ./hacking/test-module -m lib/ansible/modules/files/lineinfile.py -a "dest=/etc/exports line='/srv/home hostname1(rw,sync)'" --check
#    ./hacking/test-module -m lib/ansible/modules/commands/command.py -a "echo hello" -n -o "test_hello"

import optparse
import os
import subprocess
import sys
import traceback
import shutil

import ansible.utils.vars as utils_vars
from ansible.parsing.dataloader import DataLoader
from ansible.parsing.utils.jsonify import jsonify
from ansible.parsing.splitter import parse_kv
import ansible.executor.module_common as module_common
import ansible.constants as C
from ansible.module_utils._text import to_native, to_text

try:
    import json
except ImportError:
    import simplejson as json


def parse():
    """Parse the command line.

    Prints the help text and exits with status 1 when the required
    ``-m/--module-path`` option is missing.

    :return: ``(options, args)`` as produced by ``optparse``
    """
    parser = optparse.OptionParser()

    parser.usage = "%prog -[options] (-h for help)"

    parser.add_option('-m', '--module-path', dest='module_path',
                      help="REQUIRED: full path of module source to execute")
    parser.add_option('-a', '--args', dest='module_args', default="",
                      help="module argument string")
    parser.add_option('-D', '--debugger', dest='debugger',
                      help="path to python debugger (e.g. /usr/bin/pdb)")
    parser.add_option('-I', '--interpreter', dest='interpreter',
                      help="path to interpreter to use for this module (e.g. ansible_python_interpreter=/usr/bin/python)",
                      metavar='INTERPRETER_TYPE=INTERPRETER_PATH')
    parser.add_option('-c', '--check', dest='check', action='store_true',
                      help="run the module in check mode")
    parser.add_option('-n', '--noexecute', dest='execute', action='store_false',
                      default=True, help="do not run the resulting module")
    parser.add_option('-o', '--output', dest='filename',
                      help="Filename for resulting module",
                      default="~/.ansible_module_generated")

    options, args = parser.parse_args()
    if not options.module_path:
        parser.print_help()
        sys.exit(1)
    return options, args


def write_argsfile(argstring, json=False):
    """Write args to a file for old-style module's use.

    :param argstring: raw ``key=value`` argument string from the command line
    :param json: when true, the string is parsed with ``parse_kv`` and the
        result is serialised with ``jsonify`` before being written.  The
        parameter name shadows the ``json`` module inside this function; it
        is kept because callers pass it by keyword.
    :return: path of the arguments file that was written
    """
    argspath = os.path.expanduser("~/.ansible_test_module_arguments")
    if json:
        argstring = jsonify(parse_kv(argstring))
    # The context manager closes the handle even if the write fails.
    with open(argspath, 'w') as argsfile:
        argsfile.write(argstring)
    return argspath


def get_interpreters(interpreter):
    """Turn the ``-I`` option value into a task-vars style dict.

    The value must look like ``TYPE=PATH``.  ``TYPE`` is normalised so that it
    starts with ``ansible_`` and ends with ``_interpreter``.

    :param interpreter: option value, or a false value when not given
    :return: dict that is empty, or holds the single normalised
        ``TYPE -> PATH`` entry
    """
    result = dict()
    if not interpreter:
        return result

    if '=' not in interpreter:
        print("interpreter must by in the form of ansible_python_interpreter=/usr/bin/python")
        sys.exit(1)

    # Split on the first '=' only, so an interpreter path that itself
    # contains '=' no longer fails with an unpacking ValueError.
    interpreter_type, interpreter_path = interpreter.split('=', 1)
    if not interpreter_type.startswith('ansible_'):
        interpreter_type = 'ansible_%s' % interpreter_type
    if not interpreter_type.endswith('_interpreter'):
        interpreter_type = '%s_interpreter' % interpreter_type
    result[interpreter_type] = interpreter_path
    return result


def boilerplate_module(modfile, args, interpreters, check, destfile):
    """Simulate what ansible does with new style modules.

    Builds the module arguments, runs the module source through
    ``module_common.modify_module`` and saves the result to ``destfile``.

    :param modfile: path of the module source
    :param args: argument string; ``@file`` loads arguments from a file,
        a string starting with ``{`` is loaded as an inline document, and
        anything else is parsed as ``key=value`` pairs
    :param interpreters: dict from ``get_interpreters``, passed on as task vars
    :param check: when true, ``_ansible_check_mode`` is set in the arguments
    :param destfile: where to save the generated module (``~`` is expanded)
    :return: ``(generated module path, module name, module style)``
    """
    loader = DataLoader()

    complex_args = {}

    # default selinux fs list is pass in as _ansible_selinux_special_fs arg
    complex_args['_ansible_selinux_special_fs'] = C.DEFAULT_SELINUX_SPECIAL_FS

    if args.startswith("@"):
        # Argument is a YAML file (JSON is a subset of YAML)
        complex_args = utils_vars.combine_vars(complex_args, loader.load_from_file(args[1:]))
        args = ''
    elif args.startswith("{"):
        # Argument is a YAML document (not a file)
        complex_args = utils_vars.combine_vars(complex_args, loader.load(args))
        args = ''

    if args:
        parsed_args = parse_kv(args)
        complex_args = utils_vars.combine_vars(complex_args, parsed_args)

    task_vars = interpreters

    if check:
        complex_args['_ansible_check_mode'] = True

    modname = os.path.splitext(os.path.basename(modfile))[0]

    (module_data, module_style, shebang) = module_common.modify_module(
        modname,
        modfile,
        complex_args,
        task_vars=task_vars
    )

    # A 'new' style module whose generated source carries the wrapper marker
    # is treated as its own 'ansiballz' style by the rest of this script.
    if module_style == 'new' and 'ANSIBALLZ_WRAPPER = True' in to_native(module_data):
        module_style = 'ansiballz'

    modfile2_path = os.path.expanduser(destfile)
    print("* including generated source, if any, saving to: %s" % modfile2_path)
    if module_style not in ('ansiballz', 'old'):
        print("* this may offset any line numbers in tracebacks/debuggers!")

    # Written in binary mode, exactly as before; the context manager makes
    # sure the handle is closed even when the write raises.
    with open(modfile2_path, 'wb') as modfile2:
        modfile2.write(module_data)

    return (modfile2_path, modname, module_style)


def _make_executable(path):
    """Best-effort ``chmod +x`` on *path* that does not go through a shell."""
    try:
        mode = os.stat(path).st_mode & 0o7777
        # Give execute permission to every class that already has read
        # permission (e.g. 0644 -> 0755, 0600 -> 0700).
        os.chmod(path, mode | ((mode & 0o444) >> 2))
    except OSError as e:
        # The shell-based chmod this replaces never aborted the run on
        # failure, so only report the problem and carry on.
        sys.stderr.write("* could not make %s executable: %s\n" % (path, to_native(e)))


def ansiballz_setup(modfile, modname, interpreters):
    """Ask an ansiballz wrapper to unpack itself and locate the real module.

    Runs the generated wrapper with the ``explode`` argument and expects two
    lines of output: one containing ``Module expanded into`` and one holding
    the directory the source was extracted to.  Exits the script when the
    output does not have that shape.

    :param modfile: path of the generated wrapper
    :param modname: module name, used to build the extracted file name
    :param interpreters: dict from ``get_interpreters``; when it contains
        ``ansible_python_interpreter`` the wrapper is run with it
    :return: ``(extracted module path, extracted args file path)``
    """
    # Done through os.chmod rather than a shell command line so that paths
    # containing spaces or shell metacharacters are handled correctly.
    _make_executable(modfile)

    if 'ansible_python_interpreter' in interpreters:
        command = [interpreters['ansible_python_interpreter']]
    else:
        command = []
    command.extend([modfile, 'explode'])

    cmd = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = cmd.communicate()
    out, err = to_text(out, errors='surrogate_or_strict'), to_text(err)
    lines = out.splitlines()
    if len(lines) != 2 or 'Module expanded into' not in lines[0]:
        print("*" * 35)
        print("INVALID OUTPUT FROM ANSIBALLZ MODULE WRAPPER")
        print(out)
        # A string argument makes sys.exit print it to stderr and exit 1.
        sys.exit(err)
    debug_dir = lines[1].strip()

    argsfile = os.path.join(debug_dir, 'args')
    modfile = os.path.join(debug_dir, 'ansible_module_%s.py' % modname)

    print("* ansiballz module detected; extracted module source to: %s" % debug_dir)
    return modfile, argsfile


def runtest(modfile, argspath, modname, module_style, interpreters):
    """Test run a module, piping its output for reporting.

    Prints the raw stdout/stderr of the module, then the output parsed as
    JSON.  Exits with status 1 when stdout is not valid JSON.

    :param modfile: path of the module to run
    :param argspath: path of the arguments file, or ``None``
    :param modname: module name (needed for the ansiballz extraction)
    :param module_style: style determined by ``boilerplate_module``
    :param interpreters: dict from ``get_interpreters``
    """
    invoke = ""
    if module_style == 'ansiballz':
        modfile, argspath = ansiballz_setup(modfile, modname, interpreters)
        if 'ansible_python_interpreter' in interpreters:
            invoke = "%s " % interpreters['ansible_python_interpreter']

    # See ansiballz_setup: no shell involved in changing the permissions.
    _make_executable(modfile)

    invoke = "%s%s" % (invoke, modfile)
    if argspath is not None:
        invoke = "%s %s" % (invoke, argspath)

    # NOTE: the command line is still a single string run through the shell,
    # as it always was; paths with spaces are not quoted here.
    cmd = subprocess.Popen(invoke, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = cmd.communicate()
    out, err = to_text(out), to_text(err)

    print("*" * 35)
    print("RAW OUTPUT")
    print(out)
    print(err)

    # Only the JSON decoding is guarded, and only against decode errors, so
    # Ctrl-C and unrelated failures are no longer reported as bad output.
    try:
        results = json.loads(out)
    except ValueError:
        print("*" * 35)
        print("INVALID OUTPUT FORMAT")
        print(out)
        traceback.print_exc()
        sys.exit(1)

    print("*" * 35)
    print("PARSED OUTPUT")
    print(jsonify(results, format=True))


def rundebug(debugger, modfile, argspath, modname, module_style, interpreters):
    """Run interactively with console debugger.

    :param debugger: debugger command placed in front of the module path
    :param modfile: path of the module to debug
    :param argspath: path of the arguments file, or ``None``
    :param modname: module name (needed for the ansiballz extraction)
    :param module_style: style determined by ``boilerplate_module``
    :param interpreters: dict from ``get_interpreters``
    """
    if module_style == 'ansiballz':
        modfile, argspath = ansiballz_setup(modfile, modname, interpreters)

    # The debugger value is interpolated into a shell command line, so it may
    # carry its own arguments.
    if argspath is not None:
        subprocess.call("%s %s %s" % (debugger, modfile, argspath), shell=True)
    else:
        subprocess.call("%s %s" % (debugger, modfile), shell=True)


def main():
    """Generate the module, write its arguments file if needed, and run it."""
    options, args = parse()
    interpreters = get_interpreters(options.interpreter)
    (modfile, modname, module_style) = boilerplate_module(options.module_path, options.module_args, interpreters,
                                                          options.check, options.filename)

    # Only styles other than 'new' and 'ansiballz' get a separate args file.
    argspath = None
    if module_style not in ('new', 'ansiballz'):
        if module_style in ('non_native_want_json', 'binary'):
            argspath = write_argsfile(options.module_args, json=True)
        elif module_style == 'old':
            argspath = write_argsfile(options.module_args, json=False)
        else:
            raise Exception("internal error, unexpected module style: %s" % module_style)

    if options.execute:
        if options.debugger:
            rundebug(options.debugger, modfile, argspath, modname, module_style, interpreters)
        else:
            runtest(modfile, argspath, modname, module_style, interpreters)


if __name__ == "__main__":
    try:
        main()
    finally:
        # Always remove the local temp directory; errors are ignored.
        shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)