WIP on the re-implementation of fact caching and various backends.

This commit is contained in:
Josh Drake 2014-07-02 20:02:28 -05:00 committed by Michael DeHaan
parent fb5a1403dd
commit aa419044c4
11 changed files with 466 additions and 48 deletions

59
lib/ansible/cache/__init__.py vendored Normal file
View file

@ -0,0 +1,59 @@
# (c) 2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from collections import MutableMapping
from ansible import utils
from ansible import constants as C
from ansible import errors
class FactCache(MutableMapping):
def __init__(self, *args, **kwargs):
self._plugin = utils.plugins.cache_loader.get(C.CACHE_PLUGIN)
if self._plugin is None:
return
def __getitem__(self, key):
if key not in self:
raise KeyError
return self._plugin.get(key)
def __setitem__(self, key, value):
self._plugin.set(key, value)
def __delitem__(self, key):
self._plugin.delete(key)
def __contains__(self, key):
return self._plugin.contains(key)
def __iter__(self):
return iter(self._plugin.keys())
def __len__(self):
return len(self._plugin.keys())
def copy(self):
"""
Return a primitive copy of the keys and values from the cache.
"""
return dict([(k, v) for (k, v) in self.iteritems()])
def keys(self):
return self._plugin.keys()

15
lib/ansible/cache/base.py vendored Normal file
View file

@ -0,0 +1,15 @@
class BaseCacheModule(object):
def get(self, key):
raise NotImplementedError("Subclasses of {} must implement the '{}' method".format(self.__class__.__name__, self.__name__))
def set(self, key, value):
raise NotImplementedError("Subclasses of {} must implement the '{}' method".format(self.__class__.__name__, self.__name__))
def keys(self):
raise NotImplementedError("Subclasses of {} must implement the '{}' method".format(self.__class__.__name__, self.__name__))
def contains(self, key):
raise NotImplementedError("Subclasses of {} must implement the '{}' method".format(self.__class__.__name__, self.__name__))
def delete(self, key):
raise NotImplementedError("Subclasses of {} must implement the '{}' method".format(self.__class__.__name__, self.__name__))

65
lib/ansible/cache/file.py vendored Normal file
View file

@ -0,0 +1,65 @@
# (c) 2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
import collections
import json
import os
import shutil
import sys
import tempfile
from datetime import datetime
from ansible import constants as C
from ansible.cache.memory import CacheModule as MemoryCacheModule
class CacheModule(MemoryCacheModule):
def __init__(self):
super(CacheModule, self).__init__()
self._timeout = int(C.CACHE_PLUGIN_TIMEOUT)
self._filename = '/tmp/ansible_facts.json'
if os.access(self._filename, os.R_OK):
mtime = datetime.fromtimestamp(os.path.getmtime(self._filename))
if self._timeout == 0 or (datetime.now() - mtime).total_seconds() < self._timeout:
with open(self._filename, 'rb') as f:
# we could make assumptions about the MemoryCacheModule here if we wanted
# to be more efficient, but performance isn't the priority with this module
data = json.load(f)
if isinstance(data, collections.Mapping):
for k, v in data.items():
super(CacheModule, self).set(k, v)
def set(self, *args, **kwargs):
super(CacheModule, self).set(*args, **kwargs)
self.flush()
def delete(self, *args, **kwargs):
super(CacheModule, self).delete(*args, **kwargs)
self.flush()
def flush(self):
temp = tempfile.TemporaryFile('r+b')
try:
json.dump(self._cache, temp, separators=(',', ':'))
temp.seek(0)
with open(self._filename, 'w+b') as f:
shutil.copyfileobj(temp, f)
finally:
temp.close()

112
lib/ansible/cache/memcached.py vendored Normal file
View file

@ -0,0 +1,112 @@
# (c) 2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
import collections
import sys
import time
from ansible import constants as C
from ansible.cache.base import BaseCacheModule
try:
import memcache
except ImportError:
print 'python-memcached is required for the memcached fact cache'
sys.exit(1)
class CacheModuleKeys(collections.MutableSet):
"""
A set subclass that keeps track of insertion time and persists
the set in memcached.
"""
PREFIX = 'ansible_cache_keys'
def __init__(self, cache, *args, **kwargs):
self._cache = cache
self._keyset = dict(*args, **kwargs)
def __contains__(self, key):
return key in self._keyset
def __iter__(self):
return iter(self._keyset)
def __len__(self):
return len(self._keyset)
def add(self, key):
self._keyset[key] = time.time()
self._cache.set(self.PREFIX, self._keyset)
def discard(self, key):
del self._keyset[key]
self._cache.set(self.PREFIX, self._keyset)
def remove_by_timerange(self, s_min, s_max):
for k in self._keyset.keys():
t = self._keyset[k]
if s_min < t < s_max:
del self._keyset[k]
self._cache.set(self.PREFIX, self._keyset)
class CacheModule(BaseCacheModule):
def __init__(self, *args, **kwargs):
if C.CACHE_PLUGIN_CONNECTION:
connection = C.CACHE_PLUGIN_CONNECTION.split(',')
else:
connection = ['127.0.0.1:11211']
self._timeout = C.CACHE_PLUGIN_TIMEOUT
self._prefix = C.CACHE_PLUGIN_PREFIX
self._cache = memcache.Client(connection, debug=0)
self._keys = CacheModuleKeys(self._cache, self._cache.get(CacheModuleKeys.PREFIX) or [])
def _make_key(self, key):
return "{}{}".format(self._prefix, key)
def _expire_keys(self):
if self._timeout > 0:
expiry_age = time.time() - self._timeout
self._keys.remove_by_timerange(0, expiry_age)
def get(self, key):
value = self._cache.get(self._make_key(key))
# guard against the key not being removed from the zset;
# this could happen in cases where the timeout value is changed
# between invocations
if value is None:
self.delete(key)
raise KeyError
return value
def set(self, key, value):
self._cache.set(self._make_key(key), value, time=self._timeout)
self._keys.add(key)
def keys(self):
self._expire_keys()
return list(iter(self._keys))
def contains(self, key):
self._expire_keys()
return key in self._keys
def delete(self, key):
self._cache.delete(self._make_key(key))
self._keys.discard(key)

37
lib/ansible/cache/memory.py vendored Normal file
View file

@ -0,0 +1,37 @@
# (c) 2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
class CacheModule(object):
def __init__(self, *args, **kwargs):
self._cache = {}
def get(self, key):
return self._cache.get(key)
def set(self, key, value):
self._cache[key] = value
def keys(self):
return self._cache.keys()
def contains(self, key):
return key in self._cache
def delete(self, key):
del self._cache[key]

108
lib/ansible/cache/redis.py vendored Normal file
View file

@ -0,0 +1,108 @@
# (c) 2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
import collections
import pickle
import sys
import time
from ansible import constants as C
from ansible.cache.base import BaseCacheModule
try:
from redis import StrictRedis
except ImportError:
print "The 'redis' Python module is required for the redis fact cache"
sys.exit(1)
class PickledRedis(StrictRedis):
"""
A subclass of StricRedis that uses the pickle module to store and load
representations of the provided values.
"""
def get(self, name):
pickled_value = super(PickledRedis, self).get(name)
if pickled_value is None:
return None
return pickle.loads(pickled_value)
def set(self, name, value, *args, **kwargs):
return super(PickledRedis, self).set(name, pickle.dumps(value), *args, **kwargs)
def setex(self, name, time, value):
return super(PickledRedis, self).setex(name, time, pickle.dumps(value))
class CacheModule(BaseCacheModule):
"""
A caching module backed by redis.
Keys are maintained in a zset with their score being the timestamp
when they are inserted. This allows for the usage of 'zremrangebyscore'
to expire keys. This mechanism is used or a pattern matched 'scan' for
performance.
"""
def __init__(self, *args, **kwargs):
if C.CACHE_PLUGIN_CONNECTION:
connection = C.CACHE_PLUGIN_CONNECTION.split(':')
else:
connection = []
self._timeout = C.CACHE_PLUGIN_TIMEOUT
self._prefix = C.CACHE_PLUGIN_PREFIX
self._cache = PickledRedis(*connection)
self._keys_set = 'ansible_cache_keys'
def _make_key(self, key):
return "{}{}".format(self._prefix, key)
def get(self, key):
value = self._cache.get(self._make_key(key))
# guard against the key not being removed from the zset;
# this could happen in cases where the timeout value is changed
# between invocations
if value is None:
self.delete(key)
raise KeyError
return value
def set(self, key, value):
if self._timeout > 0: # a timeout of 0 is handled as meaning 'never expire'
self._cache.setex(self._make_key(key), self._timeout, value)
else:
self._cache.set(self._make_key(key), value)
self._cache.zadd(self._keys_set, time.time(), key)
def _expire_keys(self):
if self._timeout > 0:
expiry_age = time.time() - self._timeout
self._cache.zremrangebyscore(self._keys_set, 0, expiry_age)
def keys(self):
self._expire_keys()
return self._cache.zrange(self._keys_set, 0, -1)
def contains(self, key):
self._expire_keys()
return (self._cache.zrank(self._keys_set, key) >= 0)
def delete(self, key):
self._cache.delete(self._make_key(key))
self._cache.zrem(self._keys_set, key)

View file

@ -140,6 +140,7 @@ DEFAULT_ASK_SU_PASS = get_config(p, DEFAULTS, 'ask_su_pass', 'ANSIBLE_ASK_
DEFAULT_GATHERING = get_config(p, DEFAULTS, 'gathering', 'ANSIBLE_GATHERING', 'implicit').lower()
DEFAULT_ACTION_PLUGIN_PATH = get_config(p, DEFAULTS, 'action_plugins', 'ANSIBLE_ACTION_PLUGINS', '/usr/share/ansible_plugins/action_plugins')
DEFAULT_CACHE_PLUGIN_PATH = get_config(p, DEFAULTS, 'cache_plugins', 'ANSIBLE_CACHE_PLUGINS', '/usr/share/ansible_plugins/cache_plugins')
DEFAULT_CALLBACK_PLUGIN_PATH = get_config(p, DEFAULTS, 'callback_plugins', 'ANSIBLE_CALLBACK_PLUGINS', '/usr/share/ansible_plugins/callback_plugins')
DEFAULT_CONNECTION_PLUGIN_PATH = get_config(p, DEFAULTS, 'connection_plugins', 'ANSIBLE_CONNECTION_PLUGINS', '/usr/share/ansible_plugins/connection_plugins')
DEFAULT_LOOKUP_PLUGIN_PATH = get_config(p, DEFAULTS, 'lookup_plugins', 'ANSIBLE_LOOKUP_PLUGINS', '/usr/share/ansible_plugins/lookup_plugins')
@ -147,6 +148,11 @@ DEFAULT_VARS_PLUGIN_PATH = get_config(p, DEFAULTS, 'vars_plugins', '
DEFAULT_FILTER_PLUGIN_PATH = get_config(p, DEFAULTS, 'filter_plugins', 'ANSIBLE_FILTER_PLUGINS', '/usr/share/ansible_plugins/filter_plugins')
DEFAULT_LOG_PATH = shell_expand_path(get_config(p, DEFAULTS, 'log_path', 'ANSIBLE_LOG_PATH', ''))
CACHE_PLUGIN = get_config(p, DEFAULTS, 'cache_plugin', 'ANSIBLE_CACHE_PLUGIN', 'memory')
CACHE_PLUGIN_CONNECTION = get_config(p, DEFAULTS, 'cache_plugin_connection', 'ANSIBLE_CACHE_PLUGIN_CONNECTION', None)
CACHE_PLUGIN_PREFIX = get_config(p, DEFAULTS, 'cache_plugin_prefix', 'ANSIBLE_CACHE_PLUGIN_PREFIX', 'ansible_facts')
CACHE_PLUGIN_TIMEOUT = get_config(p, DEFAULTS, 'cache_plugin_timeout', 'ANSIBLE_CACHE_PLUGIN_TIMEOUT', (60 * 60 * 24), integer=True)
ANSIBLE_FORCE_COLOR = get_config(p, DEFAULTS, 'force_color', 'ANSIBLE_FORCE_COLOR', None, boolean=True)
ANSIBLE_NOCOLOR = get_config(p, DEFAULTS, 'nocolor', 'ANSIBLE_NOCOLOR', None, boolean=True)
ANSIBLE_NOCOWS = get_config(p, DEFAULTS, 'nocows', 'ANSIBLE_NOCOWS', None, boolean=True)

View file

@ -22,6 +22,7 @@ from ansible.utils.template import template
from ansible import utils
from ansible import errors
import ansible.callbacks
import ansible.cache
import os
import shlex
import collections
@ -32,9 +33,10 @@ import pipes
# the setup cache stores all variables about a host
# gathered during the setup step, while the vars cache
# holds all other variables about a host
SETUP_CACHE = collections.defaultdict(dict)
SETUP_CACHE = ansible.cache.FactCache()
VARS_CACHE = collections.defaultdict(dict)
class PlayBook(object):
'''
runs an ansible playbook, given as a datastructure or YAML filename.
@ -470,6 +472,13 @@ class PlayBook(object):
contacted = results.get('contacted', {})
self.stats.compute(results, ignore_errors=task.ignore_errors)
def _register_play_vars(host, result):
# when 'register' is used, persist the result in the vars cache
# rather than the setup cache - vars should be transient between playbook executions
if 'stdout' in result and 'stdout_lines' not in result:
result['stdout_lines'] = result['stdout'].splitlines()
utils.update_hash(self.VARS_CACHE, host, {task.register: result})
# add facts to the global setup cache
for host, result in contacted.iteritems():
if 'results' in result:
@ -478,22 +487,19 @@ class PlayBook(object):
for res in result['results']:
if type(res) == dict:
facts = res.get('ansible_facts', {})
self.SETUP_CACHE[host].update(facts)
utils.update_hash(self.SETUP_CACHE, host, facts)
else:
# when facts are returned, persist them in the setup cache
facts = result.get('ansible_facts', {})
self.SETUP_CACHE[host].update(facts)
utils.update_hash(self.SETUP_CACHE, host, facts)
if task.register:
if 'stdout' in result and 'stdout_lines' not in result:
result['stdout_lines'] = result['stdout'].splitlines()
self.SETUP_CACHE[host][task.register] = result
_register_play_vars(host, result)
# also have to register some failed, but ignored, tasks
if task.ignore_errors and task.register:
failed = results.get('failed', {})
for host, result in failed.iteritems():
if 'stdout' in result and 'stdout_lines' not in result:
result['stdout_lines'] = result['stdout'].splitlines()
self.SETUP_CACHE[host][task.register] = result
_register_play_vars(host, result)
# flag which notify handlers need to be run
if len(task.notify) > 0:
@ -585,8 +591,8 @@ class PlayBook(object):
# let runner template out future commands
setup_ok = setup_results.get('contacted', {})
for (host, result) in setup_ok.iteritems():
self.SETUP_CACHE[host].update({'module_setup': True})
self.SETUP_CACHE[host].update(result.get('ansible_facts', {}))
utils.update_hash(self.SETUP_CACHE, host, {'module_setup': True})
utils.update_hash(self.SETUP_CACHE, host, result.get('ansible_facts', {}))
return setup_results
# *****************************************************

View file

@ -29,6 +29,7 @@ import os
import sys
import uuid
class Play(object):
__slots__ = [
@ -153,6 +154,7 @@ class Play(object):
self._tasks = self._load_tasks(self._ds.get('tasks', []), load_vars)
self._handlers = self._load_tasks(self._ds.get('handlers', []), load_vars)
# apply any missing tags to role tasks
self._late_merge_role_tags()
@ -854,4 +856,4 @@ class Play(object):
# finally, update the VARS_CACHE for the host, if it is set
if host is not None:
self.playbook.VARS_CACHE[host].update(self.playbook.extra_vars)
self.playbook.VARS_CACHE.setdefault(host, {}).update(self.playbook.extra_vars)

View file

@ -201,6 +201,13 @@ action_loader = PluginLoader(
'action_plugins'
)
cache_loader = PluginLoader(
'CacheModule',
'ansible.cache',
C.DEFAULT_CACHE_PLUGIN_PATH,
'cache_plugins'
)
callback_loader = PluginLoader(
'CallbackModule',
'ansible.callback_plugins',

View file

@ -38,6 +38,7 @@ setup(name='ansible',
package_dir={ 'ansible': 'lib/ansible' },
packages=[
'ansible',
'ansible.cache',
'ansible.utils',
'ansible.utils.module_docs_fragments',
'ansible.inventory',