Restructure output to prevent rare cases of interlaced I/O in multiprocessing paths.

This commit is contained in:
Michael DeHaan 2013-04-27 09:46:48 -04:00
parent 6351b74612
commit 515fbd5a17

View file

@ -22,32 +22,55 @@ import os
import subprocess
import random
import fnmatch
import tempfile
import fcntl
from ansible.color import stringc
cowsay = None
if os.getenv("ANSIBLE_NOCOWS") is not None:
cowsay = None
elif os.path.exists("/usr/bin/cowsay"):
cowsay = "/usr/bin/cowsay"
elif os.path.exists("/usr/games/cowsay"):
cowsay = "/usr/games/cowsay"
elif os.path.exists("/usr/local/bin/cowsay"):
# BSD path for cowsay
cowsay = "/usr/local/bin/cowsay"
elif os.path.exists("/opt/local/bin/cowsay"):
# MacPorts path for cowsay
cowsay = "/opt/local/bin/cowsay"
noncow = os.getenv("ANSIBLE_COW_SELECTION",None)
if cowsay and noncow == 'random':
cmd = subprocess.Popen([cowsay, "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(out, err) = cmd.communicate()
cows = out.split()
cows.append(False)
noncow = random.choice(cows)
callback_plugins = [x for x in utils.plugins.callback_loader.all()]
def get_cowsay_info():
cowsay = None
if os.getenv("ANSIBLE_NOCOWS") is not None:
cowsay = None
elif os.path.exists("/usr/bin/cowsay"):
cowsay = "/usr/bin/cowsay"
elif os.path.exists("/usr/games/cowsay"):
cowsay = "/usr/games/cowsay"
elif os.path.exists("/usr/local/bin/cowsay"):
# BSD path for cowsay
cowsay = "/usr/local/bin/cowsay"
elif os.path.exists("/opt/local/bin/cowsay"):
# MacPorts path for cowsay
cowsay = "/opt/local/bin/cowsay"
noncow = os.getenv("ANSIBLE_COW_SELECTION",None)
if cowsay and noncow == 'random':
cmd = subprocess.Popen([cowsay, "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(out, err) = cmd.communicate()
cows = out.split()
cows.append(False)
noncow = random.choice(cows)
return (cowsay, noncow)
cowsay, noncow = get_cowsay_info()
def log_lockfile():
tempdir = tempfile.gettempdir()
uid = os.getuid()
path = os.path.join(tempdir, ".ansible-lock.%s" % uid)
if not os.path.exists(path):
fh = open(path, 'w')
fh.close()
return path
LOG_LOCK = open(log_lockfile(), 'r')
def log_flock():
fcntl.flock(LOG_LOCK, fcntl.LOCK_EX)
def log_unflock():
fcntl.flock(LOG_LOCK, fcntl.LOCK_UN)
def set_play(callback, play):
''' used to notify callback plugins of context '''
callback.play = play
@ -60,6 +83,17 @@ def set_task(callback, task):
for callback_plugin in callback_plugins:
callback_plugin.task = task
def display(msg, color=None, stderr=False):
# prevent a very rare case of interlaced multiprocess I/O
log_flock()
msg2 = msg
if color:
msg2 = stringc(msg, color)
if not stderr:
print msg2
else:
print >>sys.stderr, msg2
log_unflock()
def call_callback_module(method_name, *args, **kwargs):
@ -81,9 +115,9 @@ def vvv(msg, host=None):
def verbose(msg, host=None, caplevel=2):
if utils.VERBOSITY > caplevel:
if host is None:
print stringc(msg, 'blue')
display(msg, color='blue')
else:
print stringc("<%s> %s" % (host, msg), 'blue')
display("<%s> %s" % (host, msg), color='blue')
class AggregateStats(object):
''' holds stats about per-host activity during playbook runs '''
@ -195,17 +229,17 @@ def host_report_msg(hostname, module_name, result, oneline):
''' summarize the JSON results for a particular host '''
failed = utils.is_failed(result)
msg = ''
msg = ('', None)
if module_name in [ 'command', 'shell', 'raw' ] and 'ansible_job_id' not in result and result.get('parsed',True) != False:
if not failed:
msg = command_generic_msg(hostname, result, oneline, 'success')
msg = (command_generic_msg(hostname, result, oneline, 'success'), 'green')
else:
msg = command_generic_msg(hostname, result, oneline, 'FAILED')
msg = (command_generic_msg(hostname, result, oneline, 'FAILED'), 'red')
else:
if not failed:
msg = regular_generic_msg(hostname, result, oneline, 'success')
msg = (regular_generic_msg(hostname, result, oneline, 'success'), 'green')
else:
msg = regular_generic_msg(hostname, result, oneline, 'FAILED')
msg = (regular_generic_msg(hostname, result, oneline, 'FAILED'), 'red')
return msg
###############################################
@ -267,7 +301,7 @@ class CliRunnerCallbacks(DefaultRunnerCallbacks):
def on_unreachable(self, host, res):
if type(res) == dict:
res = res.get('msg','')
print "%s | FAILED => %s" % (host, res)
display("%s | FAILED => %s" % (host, res))
if self.options.tree:
utils.write_tree_file(
self.options.tree, host,
@ -276,15 +310,15 @@ class CliRunnerCallbacks(DefaultRunnerCallbacks):
super(CliRunnerCallbacks, self).on_unreachable(host, res)
def on_skipped(self, host, item=None):
print "%s | skipped" % (host)
display("%s | skipped" % (host))
super(CliRunnerCallbacks, self).on_skipped(host, item)
def on_error(self, host, err):
print >>sys.stderr, "err: [%s] => %s\n" % (host, err)
display("err: [%s] => %s\n" % (host, err), stderr=True)
super(CliRunnerCallbacks, self).on_error(host, err)
def on_no_hosts(self):
print >>sys.stderr, "no hosts matched\n"
display("no hosts matched\n", stderr=True)
super(CliRunnerCallbacks, self).on_no_hosts()
def on_async_poll(self, host, res, jid, clock):
@ -292,26 +326,27 @@ class CliRunnerCallbacks(DefaultRunnerCallbacks):
self._async_notified[jid] = clock + 1
if self._async_notified[jid] > clock:
self._async_notified[jid] = clock
print "<job %s> polling, %ss remaining"%(jid, clock)
display("<job %s> polling, %ss remaining" % (jid, clock))
super(CliRunnerCallbacks, self).on_async_poll(host, res, jid, clock)
def on_async_ok(self, host, res, jid):
print "<job %s> finished on %s => %s"%(jid, host, utils.jsonify(res,format=True))
display("<job %s> finished on %s => %s"%(jid, host, utils.jsonify(res,format=True)))
super(CliRunnerCallbacks, self).on_async_ok(host, res, jid)
def on_async_failed(self, host, res, jid):
print "<job %s> FAILED on %s => %s"%(jid, host, utils.jsonify(res,format=True))
display("<job %s> FAILED on %s => %s"%(jid, host, utils.jsonify(res,format=True)))
super(CliRunnerCallbacks, self).on_async_failed(host,res,jid)
def _on_any(self, host, result):
result2 = result.copy()
result2.pop('invocation', None)
print host_report_msg(host, self.options.module_name, result2, self.options.one_line)
(msg, color) = host_report_msg(host, self.options.module_name, result2, self.options.one_line)
display(msg, color=color)
if self.options.tree:
utils.write_tree_file(self.options.tree, host, utils.jsonify(result2,format=True))
def on_file_diff(self, host, diff):
print utils.get_diff(diff)
display(utils.get_diff(diff))
super(CliRunnerCallbacks, self).on_file_diff(host, diff)
########################################################################
@ -332,7 +367,7 @@ class PlaybookRunnerCallbacks(DefaultRunnerCallbacks):
msg = "fatal: [%s] => (item=%s) => %s" % (host, item, results)
else:
msg = "fatal: [%s] => %s" % (host, results)
print stringc(msg, 'red')
display(msg, color='red')
super(PlaybookRunnerCallbacks, self).on_unreachable(host, results)
def on_failed(self, host, results, ignore_errors=False):
@ -353,18 +388,18 @@ class PlaybookRunnerCallbacks(DefaultRunnerCallbacks):
msg = "failed: [%s] => (item=%s) => %s" % (host, item, utils.jsonify(results2))
else:
msg = "failed: [%s] => %s" % (host, utils.jsonify(results2))
print stringc(msg, 'red')
display(msg, color='red')
if stderr:
print stringc("stderr: %s" % stderr, 'red')
display("stderr: %s" % stderr, color='red')
if stdout:
print stringc("stdout: %s" % stdout, 'red')
display("stdout: %s" % stdout, color='red')
if returned_msg:
print stringc("msg: %s" % returned_msg, 'red')
display("msg: %s" % returned_msg, color='red')
if not parsed and module_msg:
print stringc("invalid output was: %s" % module_msg, 'red')
display("invalid output was: %s" % module_msg, color='red')
if ignore_errors:
print stringc("...ignoring", 'cyan')
display("...ignoring", color='cyan')
super(PlaybookRunnerCallbacks, self).on_failed(host, results, ignore_errors=ignore_errors)
def on_ok(self, host, host_result):
@ -395,9 +430,9 @@ class PlaybookRunnerCallbacks(DefaultRunnerCallbacks):
if msg != '':
if not changed:
print stringc(msg, 'green')
display(msg, color='green')
else:
print stringc(msg, 'yellow')
display(msg, color='yellow')
super(PlaybookRunnerCallbacks, self).on_ok(host, host_result)
def on_error(self, host, err):
@ -409,8 +444,7 @@ class PlaybookRunnerCallbacks(DefaultRunnerCallbacks):
else:
msg = "err: [%s] => %s" % (host, err)
msg = stringc(msg, 'red')
print >>sys.stderr, msg
display(msg, color='red', stderr=True)
super(PlaybookRunnerCallbacks, self).on_error(host, err)
def on_skipped(self, host, item=None):
@ -419,11 +453,11 @@ class PlaybookRunnerCallbacks(DefaultRunnerCallbacks):
msg = "skipping: [%s] => (item=%s)" % (host, item)
else:
msg = "skipping: [%s]" % host
print stringc(msg, 'cyan')
display(msg, color='cyan')
super(PlaybookRunnerCallbacks, self).on_skipped(host, item)
def on_no_hosts(self):
print stringc("FATAL: no hosts matched or all hosts have already failed -- aborting\n", 'red')
display("FATAL: no hosts matched or all hosts have already failed -- aborting\n", color='red')
super(PlaybookRunnerCallbacks, self).on_no_hosts()
def on_async_poll(self, host, res, jid, clock):
@ -432,21 +466,21 @@ class PlaybookRunnerCallbacks(DefaultRunnerCallbacks):
if self._async_notified[jid] > clock:
self._async_notified[jid] = clock
msg = "<job %s> polling, %ss remaining"%(jid, clock)
print stringc(msg, 'cyan')
display(msg, color='cyan')
super(PlaybookRunnerCallbacks, self).on_async_poll(host,res,jid,clock)
def on_async_ok(self, host, res, jid):
msg = "<job %s> finished on %s"%(jid, host)
print stringc(msg, 'cyan')
display(msg, color='cyan')
super(PlaybookRunnerCallbacks, self).on_async_ok(host, res, jid)
def on_async_failed(self, host, res, jid):
msg = "<job %s> FAILED on %s"%(jid, host)
print stringc(msg, 'red')
display(msg, color='red')
super(PlaybookRunnerCallbacks, self).on_async_failed(host,res,jid)
def on_file_diff(self, host, diff):
print utils.get_diff(diff)
display(utils.get_diff(diff))
super(PlaybookRunnerCallbacks, self).on_file_diff(host, diff)
########################################################################
@ -465,11 +499,11 @@ class PlaybookCallbacks(object):
call_callback_module('playbook_on_notify', host, handler)
def on_no_hosts_matched(self):
print stringc("skipping: no hosts matched", 'cyan')
display("skipping: no hosts matched", color='cyan')
call_callback_module('playbook_on_no_hosts_matched')
def on_no_hosts_remaining(self):
print stringc("\nFATAL: all hosts have already failed -- aborting", 'red')
display("\nFATAL: all hosts have already failed -- aborting", color='red')
call_callback_module('playbook_on_no_hosts_remaining')
def on_task_start(self, name, is_conditional):
@ -488,15 +522,15 @@ class PlaybookCallbacks(object):
resp = raw_input('Perform task: %s (y/n/c): ' % name)
if resp.lower() in ['y','yes']:
self.skip_task = False
print banner(msg)
display(banner(msg))
elif resp.lower() in ['c', 'continue']:
self.skip_task = False
self.step = False
print banner(msg)
display(banner(msg))
else:
self.skip_task = True
else:
print banner(msg)
display(banner(msg))
call_callback_module('playbook_on_task_start', name, is_conditional)
@ -519,7 +553,7 @@ class PlaybookCallbacks(object):
second = prompt("confirm " + msg, private)
if result == second:
break
print "***** VALUES ENTERED DO NOT MATCH ****"
display("***** VALUES ENTERED DO NOT MATCH ****")
else:
result = prompt(msg, private)
@ -538,21 +572,21 @@ class PlaybookCallbacks(object):
return result
def on_setup(self):
print banner("GATHERING FACTS")
display(banner("GATHERING FACTS"))
call_callback_module('playbook_on_setup')
def on_import_for_host(self, host, imported_file):
msg = "%s: importing %s" % (host, imported_file)
print stringc(msg, 'cyan')
display(msg, color='cyan')
call_callback_module('playbook_on_import_for_host', host, imported_file)
def on_not_import_for_host(self, host, missing_file):
msg = "%s: not importing file: %s" % (host, missing_file)
print stringc(msg, 'cyan')
display(msg, color='cyan')
call_callback_module('playbook_on_not_import_for_host', host, missing_file)
def on_play_start(self, pattern):
print banner("PLAY [%s]" % pattern)
display(banner("PLAY [%s]" % pattern))
call_callback_module('playbook_on_play_start', pattern)
def on_stats(self, stats):