Add more verbose debugging options for accelerate

This commit is contained in:
James Cammarata 2013-09-30 14:08:07 -05:00
parent 9b5fb9ad6b
commit 52a42bf607
2 changed files with 59 additions and 19 deletions

View file

@ -21,7 +21,7 @@ import base64
import socket
import struct
import time
from ansible.callbacks import vvv
from ansible.callbacks import vvv, vvvv
from ansible.runner.connection_plugins.ssh import Connection as SSHConnection
from ansible.runner.connection_plugins.paramiko_ssh import Connection as ParamikoConnection
from ansible import utils
@ -84,12 +84,13 @@ class Connection(object):
utils.AES_KEYS = self.runner.aes_keys
def _execute_accelerate_module(self):
args = "password=%s port=%s" % (base64.b64encode(self.key.__str__()), str(self.accport))
args = "password=%s port=%s debug=%d" % (base64.b64encode(self.key.__str__()), str(self.accport), int(utils.VERBOSITY))
inject = dict(password=self.key)
if self.runner.accelerate_inventory_host:
inject = utils.combine_vars(inject, self.runner.inventory.get_variables(self.runner.accelerate_inventory_host))
else:
inject = utils.combine_vars(inject, self.runner.inventory.get_variables(self.host))
vvvv("attempting to start up the accelerate daemon...")
self.ssh.connect()
tmp_path = self.runner._make_tmp_path(self.ssh)
return self.runner._execute_module(self.ssh, tmp_path, 'accelerate', args, inject=inject)
@ -103,11 +104,13 @@ class Connection(object):
tries = 3
self.conn = socket.socket()
self.conn.settimeout(300.0)
vvvv("attempting connection to %s via the accelerated port %d" % (self.host,self.accport))
while tries > 0:
try:
self.conn.connect((self.host,self.accport))
break
except:
vvvv("failed, retrying...")
time.sleep(0.1)
tries -= 1
if tries == 0:
@ -133,18 +136,24 @@ class Connection(object):
header_len = 8 # size of a packed unsigned long long
data = b""
try:
vvvv("%s: in recv_data(), waiting for the header" % self.host)
while len(data) < header_len:
d = self.conn.recv(1024)
if not d:
vvvv("%s: received nothing, bailing out" % self.host)
return None
data += d
vvvv("%s: got the header, unpacking" % self.host)
data_len = struct.unpack('Q',data[:header_len])[0]
data = data[header_len:]
vvvv("%s: data received so far (expecting %d): %d" % (self.host,data_len,len(data)))
while len(data) < data_len:
d = self.conn.recv(1024)
if not d:
vvvv("%s: received nothing, bailing out" % self.host)
return None
data += d
vvvv("%s: received all of the data, returning" % self.host)
return data
except socket.timeout:
raise errors.AnsibleError("timed out while waiting to receive data")

View file

@ -85,8 +85,22 @@ PIDFILE = os.path.expanduser("~/.accelerate.pid")
# which leaves room for the TCP/IP header
CHUNK_SIZE=10240
def log(msg):
syslog.syslog(syslog.LOG_NOTICE|syslog.LOG_DAEMON, msg)
# FIXME: this all should be moved to module_common, as it's
# pretty much a copy from the callbacks/util code
DEBUG_LEVEL=0
def log(msg, cap=0):
global DEBUG_LEVEL
if cap >= DEBUG_LEVEL:
syslog.syslog(syslog.LOG_NOTICE|syslog.LOG_DAEMON, msg)
def vv(msg):
log(msg, cap=2)
def vvv(msg):
log(msg, cap=3)
def vvvv(msg):
log(msg, cap=4)
if os.path.exists(PIDFILE):
try:
@ -114,7 +128,7 @@ def daemonize_self(module, password, port, minutes):
try:
pid = os.fork()
if pid > 0:
log("exiting pid %s" % pid)
vvv("exiting pid %s" % pid)
# exit first parent
module.exit_json(msg="daemonized accelerate on port %s for %s minutes" % (port, minutes))
except OSError, e:
@ -134,7 +148,7 @@ def daemonize_self(module, password, port, minutes):
pid_file = open(PIDFILE, "w")
pid_file.write("%s" % pid)
pid_file.close()
log("pidfile written")
vvv("pidfile written")
sys.exit(0)
except OSError, e:
log("fork #2 failed: %d (%s)" % (e.errno, e.strerror))
@ -162,52 +176,64 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
def recv_data(self):
header_len = 8 # size of a packed unsigned long long
data = b""
vvvv("in recv_data(), waiting for the header")
while len(data) < header_len:
d = self.request.recv(1024)
if not d:
vvv("received nothing, bailing out")
return None
data += d
vvvv("in recv_data(), got the header, unpacking")
data_len = struct.unpack('Q',data[:header_len])[0]
data = data[header_len:]
vvvv("data received so far (expecting %d): %d" % (data_len,len(data)))
while len(data) < data_len:
d = self.request.recv(1024)
if not d:
vvv("received nothing, bailing out")
return None
data += d
vvvv("received all of the data, returning")
return data
def handle(self):
while True:
#log("waiting for data")
vvvv("waiting for data")
data = self.recv_data()
if not data:
vvvv("received nothing back from recv_data(), breaking out")
break
try:
#log("got data, decrypting")
vvvv("got data, decrypting")
data = self.server.key.Decrypt(data)
#log("decryption done")
vvvv("decryption done")
except:
log("bad decrypt, skipping...")
vv("bad decrypt, skipping...")
data2 = json.dumps(dict(rc=1))
data2 = self.server.key.Encrypt(data2)
send_data(client, data2)
return
#log("loading json from the data")
vvvv("loading json from the data")
data = json.loads(data)
mode = data['mode']
response = {}
if mode == 'command':
vvvv("received a command request, running it")
response = self.command(data)
elif mode == 'put':
vvvv("received a put request, putting it")
response = self.put(data)
elif mode == 'fetch':
vvvv("received a fetch request, getting it")
response = self.fetch(data)
data2 = json.dumps(response)
data2 = self.server.key.Encrypt(data2)
vvvv("sending the response back to the controller")
self.send_data(data2)
vvvv("done sending the response")
def command(self, data):
if 'cmd' not in data:
@ -217,14 +243,14 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
if 'executable' not in data:
return dict(failed=True, msg='internal error: executable is required')
#log("executing: %s" % data['cmd'])
vvvv("executing: %s" % data['cmd'])
rc, stdout, stderr = self.server.module.run_command(data['cmd'], executable=data['executable'], close_fds=True)
if stdout is None:
stdout = ''
if stderr is None:
stderr = ''
#log("got stdout: %s" % stdout)
#log("got stderr: %s" % stderr)
vvvv("got stdout: %s" % stdout)
vvvv("got stderr: %s" % stderr)
return dict(rc=rc, stdout=stdout, stderr=stderr)
@ -235,7 +261,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
try:
fd = file(data['in_path'], 'rb')
fstat = os.stat(data['in_path'])
log("FETCH file is %d bytes" % fstat.st_size)
vvv("FETCH file is %d bytes" % fstat.st_size)
while fd.tell() < fstat.st_size:
data = fd.read(CHUNK_SIZE)
last = False
@ -276,7 +302,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
final_path = None
final_user = None
if 'user' in data and data.get('user') != getpass.getuser():
log("the target user doesn't match this user, we'll move the file into place via sudo")
vv("the target user doesn't match this user, we'll move the file into place via sudo")
(fd,out_path) = tempfile.mkstemp(prefix='ansible.', dir=os.path.expanduser('~/.ansible/tmp/'))
out_fd = os.fdopen(fd, 'w', 0)
final_path = data['out_path']
@ -306,11 +332,11 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
log("failed to put the file: %s" % tb)
return dict(failed=True, stdout="Could not write the file")
finally:
#log("wrote %d bytes" % bytes)
vvvv("wrote %d bytes" % bytes)
out_fd.close()
if final_path:
log("moving %s to %s" % (out_path, final_path))
vvv("moving %s to %s" % (out_path, final_path))
args = ['sudo','cp',out_path,final_path]
rc, stdout, stderr = self.server.module.run_command(args, close_fds=True)
if rc != 0:
@ -334,7 +360,7 @@ def daemonize(module, password, port, minutes):
server = ThreadedTCPServer(("0.0.0.0", port), ThreadedTCPRequestHandler, module, password)
server.allow_reuse_address = True
log("serving!")
vv("serving!")
server.serve_forever(poll_interval=1.0)
except Exception, e:
tb = traceback.format_exc()
@ -342,11 +368,13 @@ def daemonize(module, password, port, minutes):
sys.exit(0)
def main():
global DEBUG_LEVEL
module = AnsibleModule(
argument_spec = dict(
port=dict(required=False, default=5099),
password=dict(required=True),
minutes=dict(required=False, default=30),
debug=dict(required=False, default=0, type='int')
),
supports_check_mode=True
)
@ -354,10 +382,13 @@ def main():
password = base64.b64decode(module.params['password'])
port = int(module.params['port'])
minutes = int(module.params['minutes'])
debug = int(module.params['debug'])
if not HAS_KEYCZAR:
module.fail_json(msg="keyczar is not installed")
DEBUG_LEVEL=debug
daemonize(module, password, port, minutes)