diff --git a/lib/ansible/cli/__init__.py b/lib/ansible/cli/__init__.py index 2844b4d8ff..353186f86a 100644 --- a/lib/ansible/cli/__init__.py +++ b/lib/ansible/cli/__init__.py @@ -227,7 +227,7 @@ class CLI(with_metaclass(ABCMeta, object)): vault_password_files, ask_vault_pass) - for index, vault_id_slug in enumerate(vault_ids): + for vault_id_slug in vault_ids: vault_id_name, vault_id_value = CLI.split_vault_id(vault_id_slug) if vault_id_value in ['prompt', 'prompt_ask_vault_pass']: diff --git a/lib/ansible/cli/vault.py b/lib/ansible/cli/vault.py index 1f28cfb8ce..3ffad1a8a1 100644 --- a/lib/ansible/cli/vault.py +++ b/lib/ansible/cli/vault.py @@ -211,10 +211,9 @@ class VaultCLI(CLI): self.new_encrypt_secret = new_encrypt_secret[1] loader.set_vault_secrets(vault_secrets) - self.secrets = vault_secrets # FIXME: do we need to create VaultEditor here? its not reused - vault = VaultLib(self.secrets) + vault = VaultLib(vault_secrets) self.editor = VaultEditor(vault) self.execute() diff --git a/lib/ansible/parsing/vault/__init__.py b/lib/ansible/parsing/vault/__init__.py index 8e8af48e07..9040cbf633 100644 --- a/lib/ansible/parsing/vault/__init__.py +++ b/lib/ansible/parsing/vault/__init__.py @@ -32,7 +32,6 @@ from binascii import unhexlify from hashlib import md5 from hashlib import sha256 from io import BytesIO -from subprocess import call HAS_CRYPTOGRAPHY = False HAS_PYCRYPTO = False @@ -75,6 +74,7 @@ except ImportError: from ansible.errors import AnsibleError from ansible import constants as C from ansible.module_utils.six import PY3, binary_type +# Note: on py2, this zip is izip not the list based zip() builtin from ansible.module_utils.six.moves import zip from ansible.module_utils._text import to_bytes, to_text @@ -240,7 +240,7 @@ class PromptVaultSecret(VaultSecret): default_prompt_formats = ["Vault password (%s): "] def __init__(self, _bytes=None, vault_id=None, prompt_formats=None): - self._bytes = _bytes + super(PromptVaultSecret, self).__init__(_bytes=_bytes) self.vault_id = vault_id if prompt_formats is None: @@ -293,8 +293,8 @@ def get_file_vault_secret(filename=None, vault_id_name=None, encoding=None, load if loader.is_executable(this_path): # TODO: pass vault_id_name to script via cli return ScriptVaultSecret(filename=this_path, encoding=encoding, loader=loader) - else: - return FileVaultSecret(filename=this_path, encoding=encoding, loader=loader) + + return FileVaultSecret(filename=this_path, encoding=encoding, loader=loader) # TODO: mv these classes to a seperate file so we don't pollute vault with 'subprocess' etc @@ -319,15 +319,15 @@ class FileVaultSecret(VaultSecret): return None def load(self): - self._bytes = self.read_file(self.filename, self.loader) + self._bytes = self._read_file(self.filename) - @staticmethod - def read_file(filename, loader): + def _read_file(self, filename): """ Read a vault password from a file or if executable, execute the script and retrieve password from STDOUT """ + # TODO: replace with use of self.loader try: f = open(filename, "rb") vault_pass = f.read().strip() @@ -344,23 +344,21 @@ class FileVaultSecret(VaultSecret): class ScriptVaultSecret(FileVaultSecret): - - @staticmethod - def read_file(filename, loader): - if not loader.is_executable(filename): + def _read_file(self, filename): + if not self.loader.is_executable(filename): raise AnsibleVaultError("The vault password script %s was not executable" % filename) try: # STDERR not captured to make it easier for users to prompt for input in their scripts p = subprocess.Popen(filename, stdout=subprocess.PIPE) except OSError as e: - msg_format = "Problem running vault password script %s (%s)." - "If this is not a script, remove the executable bit from the file." - msg = msg_format % (' '.join(filename), e) + msg_format = "Problem running vault password script %s (%s)." \ + " If this is not a script, remove the executable bit from the file." + msg = msg_format % (filename, e) raise AnsibleError(msg) - stdout, stderr = p.communicate() + stdout, dummy = p.communicate() if p.returncode != 0: raise AnsibleError("Vault password script %s returned non-zero (%s): %s" % (filename, p.returncode, p.stderr)) @@ -394,7 +392,7 @@ def match_encrypt_secret(secrets): '''Find the best/first/only secret in secrets to use for encrypting''' # ie, consider all of the available secrets as matches - _vault_id_matchers = [_vault_id for _vault_id, _vault_secret in secrets] + _vault_id_matchers = [_vault_id for _vault_id, dummy in secrets] best_secret = match_best_secret(secrets, _vault_id_matchers) # can be empty list sans any tuple return best_secret @@ -441,7 +439,7 @@ class VaultLib: if secret is None: if self.secrets: - secret_vault_id, secret = match_encrypt_secret(self.secrets) + dummy, secret = match_encrypt_secret(self.secrets) else: raise AnsibleVaultError("A vault password must be specified to encrypt data") @@ -489,7 +487,7 @@ class VaultLib: msg += "%s is not a vault encrypted file" % filename raise AnsibleError(msg) - b_vaulttext, b_version, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext) + b_vaulttext, dummy, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext) # create the cipher object, note that the cipher used for decrypt can # be different than the cipher used for encrypt @@ -528,7 +526,7 @@ class VaultLib: # the known vault secrets. if not C.DEFAULT_VAULT_ID_MATCH: # Add all of the known vault_ids as candidates for decrypting a vault. - vault_id_matchers.extend([_vault_id for _vault_id, _secret in self.secrets if _vault_id != vault_id]) + vault_id_matchers.extend([_vault_id for _vault_id, _dummy in self.secrets if _vault_id != vault_id]) matched_secrets = match_secrets(self.secrets, vault_id_matchers) @@ -600,7 +598,7 @@ class VaultEditor: fh.write(data) fh.write(data[:file_len % chunk_len]) - assert(fh.tell() == file_len) # FIXME remove this assert once we have unittests to check its accuracy + assert fh.tell() == file_len # FIXME remove this assert once we have unittests to check its accuracy os.fsync(fh) def _shred_file(self, tmp_path): @@ -621,7 +619,7 @@ class VaultEditor: return try: - r = call(['shred', tmp_path]) + r = subprocess.call(['shred', tmp_path]) except (OSError, ValueError): # shred is not available on this system, or some other error occurred. # ValueError caught because OS X El Capitan is raising an @@ -648,7 +646,7 @@ class VaultEditor: self.write_data(existing_data, tmp_path, shred=False) # drop the user into an editor on the tmp file - call(self._editor_shell_command(tmp_path)) + subprocess.call(self._editor_shell_command(tmp_path)) except: # whatever happens, destroy the decrypted file self._shred_file(tmp_path) @@ -737,13 +735,13 @@ class VaultEditor: # Figure out the vault id from the file, to select the right secret to re-encrypt it # (duplicates parts of decrypt, but alas...) - b_ciphertext, b_version, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext) + dummy, dummy, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext) # if we could decrypt, the vault_id should be in secrets # though we could have multiple secrets for a given vault_id, pick the first one secrets = match_secrets(self.vault.secrets, [vault_id]) secret = secrets[0][1] - if self.vault.cipher_name not in CIPHER_WRITE_WHITELIST: + if cipher_name not in CIPHER_WRITE_WHITELIST: # we want to get rid of files encrypted with the AES cipher self._edit_file_helper(filename, secret, existing_data=plaintext, force_save=True) else: @@ -987,7 +985,7 @@ class VaultAES: return b_plaintext @classmethod - def decrypt(cls, b_vaulttext, secret, key_length=32, vault_id=None): + def decrypt(cls, b_vaulttext, secret, key_length=32): """ Decrypt the given data and return it :arg b_data: A byte string containing the encrypted data @@ -1084,7 +1082,7 @@ class VaultAES256: return b_key1, b_key2, b_iv @staticmethod - def _encrypt_cryptography(b_plaintext, b_salt, b_key1, b_key2, b_iv): + def _encrypt_cryptography(b_plaintext, b_key1, b_key2, b_iv): cipher = C_Cipher(algorithms.AES(b_key1), modes.CTR(b_iv), CRYPTOGRAPHY_BACKEND) encryptor = cipher.encryptor() padder = padding.PKCS7(algorithms.AES.block_size).padder() @@ -1099,7 +1097,7 @@ class VaultAES256: return to_bytes(hexlify(b_hmac), errors='surrogate_or_strict'), hexlify(b_ciphertext) @staticmethod - def _encrypt_pycrypto(b_plaintext, b_salt, b_key1, b_key2, b_iv): + def _encrypt_pycrypto(b_plaintext, b_key1, b_key2, b_iv): # PKCS#7 PAD DATA http://tools.ietf.org/html/rfc5652#section-6.3 bs = AES_pycrypto.block_size padding_length = (bs - len(b_plaintext) % bs) or bs @@ -1135,9 +1133,9 @@ class VaultAES256: b_key1, b_key2, b_iv = cls._gen_key_initctr(b_password, b_salt) if HAS_CRYPTOGRAPHY: - b_hmac, b_ciphertext = cls._encrypt_cryptography(b_plaintext, b_salt, b_key1, b_key2, b_iv) + b_hmac, b_ciphertext = cls._encrypt_cryptography(b_plaintext, b_key1, b_key2, b_iv) elif HAS_PYCRYPTO: - b_hmac, b_ciphertext = cls._encrypt_pycrypto(b_plaintext, b_salt, b_key1, b_key2, b_iv) + b_hmac, b_ciphertext = cls._encrypt_pycrypto(b_plaintext, b_key1, b_key2, b_iv) else: raise AnsibleError(NEED_CRYPTO_LIBRARY + '(Detected in encrypt)') diff --git a/lib/ansible/parsing/yaml/constructor.py b/lib/ansible/parsing/yaml/constructor.py index 52ad8fc0f0..80f660a960 100644 --- a/lib/ansible/parsing/yaml/constructor.py +++ b/lib/ansible/parsing/yaml/constructor.py @@ -26,8 +26,7 @@ from ansible.module_utils._text import to_bytes from ansible.parsing.yaml.objects import AnsibleMapping, AnsibleSequence, AnsibleUnicode from ansible.parsing.yaml.objects import AnsibleVaultEncryptedUnicode from ansible.utils.unsafe_proxy import wrap_var -from ansible.parsing.vault import VaultLib, parse_vaulttext_envelope - +from ansible.parsing.vault import VaultLib try: from __main__ import display diff --git a/test/units/parsing/vault/test_vault_editor.py b/test/units/parsing/vault/test_vault_editor.py index 30c68f440d..3c6e73b65d 100644 --- a/test/units/parsing/vault/test_vault_editor.py +++ b/test/units/parsing/vault/test_vault_editor.py @@ -104,7 +104,7 @@ class TestVaultEditor(unittest.TestCase): vault_secrets = self._secrets(self.vault_password) return VaultEditor(VaultLib(vault_secrets)) - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file_helper_empty_target(self, mock_sp_call): self._test_dir = self._create_test_dir() @@ -118,7 +118,7 @@ class TestVaultEditor(unittest.TestCase): self.assertNotEqual(src_contents, b_ciphertext) - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file_helper_call_exception(self, mock_sp_call): self._test_dir = self._create_test_dir() @@ -136,7 +136,7 @@ class TestVaultEditor(unittest.TestCase): src_file_path, self.vault_secret) - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file_helper_symlink_target(self, mock_sp_call): self._test_dir = self._create_test_dir() @@ -170,7 +170,7 @@ class TestVaultEditor(unittest.TestCase): def _faux_command(self, tmp_path): pass - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file_helper_no_change(self, mock_sp_call): self._test_dir = self._create_test_dir() @@ -308,7 +308,7 @@ class TestVaultEditor(unittest.TestCase): self._assert_file_is_link(src_file_link_path, src_file_path) - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file(self, mock_sp_call): self._test_dir = self._create_test_dir() src_contents = to_bytes("some info in a file\nyup.") @@ -333,9 +333,7 @@ class TestVaultEditor(unittest.TestCase): src_file_plaintext = ve.vault.decrypt(new_src_file_contents) self.assertEqual(src_file_plaintext, new_src_contents) - new_stat = os.stat(src_file_path) - - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file_symlink(self, mock_sp_call): self._test_dir = self._create_test_dir() src_contents = to_bytes("some info in a file\nyup.") @@ -371,7 +369,7 @@ class TestVaultEditor(unittest.TestCase): # self.assertEqual(src_file_plaintext, new_src_contents, # 'The decrypted plaintext of the editted file is not the expected contents.') - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file_not_encrypted(self, mock_sp_call): self._test_dir = self._create_test_dir() src_contents = to_bytes("some info in a file\nyup.")