From c38ff3b8f84aed46fd087d91a457a6372eda9609 Mon Sep 17 00:00:00 2001 From: Adrian Likins Date: Tue, 8 Aug 2017 16:10:03 -0400 Subject: [PATCH] pylint fixes for vault related code (#27721) * rm unneeded parens following assert * rm unused parse_vaulttext_envelope from yaml.constructor * No longer need index/enumerate over vault_ids * rm unnecessary else * rm unused VaultCli.secrets * rm unused vault_id arg on VaultAES.decrypt() pylint: Unused argument 'vault_id' pylint: Unused parse_vaulttext_envelope imported from ansible.parsing.vault pylint: Unused variable 'index' pylint: Unnecessary parens after 'assert' keyword pylint: Unnecessary "else" after "return" (no-else-return) pylint: Attribute 'editor' defined outside __init__ * use 'dummy' for unused variables instead of _ Based on pylint unused variable warnings. Existing code use '_' for this, but that is old and busted. The hot new thing is 'dummy'. It is so fetch. Except for where we get warnings for reusing the 'dummy' var name inside of a list comprehension. * Add super().__init__ call to PromptVaultSecret.__init__ pylint: __init__ method from base class 'VaultSecret' is not called (super-init-not-called) * Make FileVaultSecret.read_file reg method again The base class read_file() doesnt need self but the sub classes do. Rm now unneeded loader arg to read_file() * Fix err msg string literal that had no effect pylint: String statement has no effect The indent on the continuation of the msg_format was wrong so the second half was dropped. There was also no need to join() filename (copy/paste from original with a command list I assume...) * Use local cipher_name in VaultEditor.edit_file not instance pylint: Unused variable 'cipher_name' pylint: Unused variable 'b_ciphertext' Use the local cipher_name returned from parse_vaulttext_envelope() instead of the instance self.cipher_name var. Since there is only one valid cipher_name either way, it was equilivent, but it will not be with more valid cipher_names * Rm unused b_salt arg on VaultAES256._encrypt* pylint: Unused argument 'b_salt' Previously the methods computed the keys and iv themselves so needed to be passed in the salt, but now the key/iv are built before and passed in so b_salt arg is not used anymore. * rm redundant import of call from subprocess pylint: Imports from package subprocess are not grouped use via subprocess module now instead of direct import. * self._bytes is set in super init now, rm dup * Make FileVaultSecret.read_file() -> _read_file() _read_file() is details of the implementation of load(), so now 'private'. --- lib/ansible/cli/__init__.py | 2 +- lib/ansible/cli/vault.py | 3 +- lib/ansible/parsing/vault/__init__.py | 56 +++++++++---------- lib/ansible/parsing/yaml/constructor.py | 3 +- test/units/parsing/vault/test_vault_editor.py | 16 +++--- 5 files changed, 37 insertions(+), 43 deletions(-) diff --git a/lib/ansible/cli/__init__.py b/lib/ansible/cli/__init__.py index 2844b4d8ff..353186f86a 100644 --- a/lib/ansible/cli/__init__.py +++ b/lib/ansible/cli/__init__.py @@ -227,7 +227,7 @@ class CLI(with_metaclass(ABCMeta, object)): vault_password_files, ask_vault_pass) - for index, vault_id_slug in enumerate(vault_ids): + for vault_id_slug in vault_ids: vault_id_name, vault_id_value = CLI.split_vault_id(vault_id_slug) if vault_id_value in ['prompt', 'prompt_ask_vault_pass']: diff --git a/lib/ansible/cli/vault.py b/lib/ansible/cli/vault.py index 1f28cfb8ce..3ffad1a8a1 100644 --- a/lib/ansible/cli/vault.py +++ b/lib/ansible/cli/vault.py @@ -211,10 +211,9 @@ class VaultCLI(CLI): self.new_encrypt_secret = new_encrypt_secret[1] loader.set_vault_secrets(vault_secrets) - self.secrets = vault_secrets # FIXME: do we need to create VaultEditor here? its not reused - vault = VaultLib(self.secrets) + vault = VaultLib(vault_secrets) self.editor = VaultEditor(vault) self.execute() diff --git a/lib/ansible/parsing/vault/__init__.py b/lib/ansible/parsing/vault/__init__.py index 8e8af48e07..9040cbf633 100644 --- a/lib/ansible/parsing/vault/__init__.py +++ b/lib/ansible/parsing/vault/__init__.py @@ -32,7 +32,6 @@ from binascii import unhexlify from hashlib import md5 from hashlib import sha256 from io import BytesIO -from subprocess import call HAS_CRYPTOGRAPHY = False HAS_PYCRYPTO = False @@ -75,6 +74,7 @@ except ImportError: from ansible.errors import AnsibleError from ansible import constants as C from ansible.module_utils.six import PY3, binary_type +# Note: on py2, this zip is izip not the list based zip() builtin from ansible.module_utils.six.moves import zip from ansible.module_utils._text import to_bytes, to_text @@ -240,7 +240,7 @@ class PromptVaultSecret(VaultSecret): default_prompt_formats = ["Vault password (%s): "] def __init__(self, _bytes=None, vault_id=None, prompt_formats=None): - self._bytes = _bytes + super(PromptVaultSecret, self).__init__(_bytes=_bytes) self.vault_id = vault_id if prompt_formats is None: @@ -293,8 +293,8 @@ def get_file_vault_secret(filename=None, vault_id_name=None, encoding=None, load if loader.is_executable(this_path): # TODO: pass vault_id_name to script via cli return ScriptVaultSecret(filename=this_path, encoding=encoding, loader=loader) - else: - return FileVaultSecret(filename=this_path, encoding=encoding, loader=loader) + + return FileVaultSecret(filename=this_path, encoding=encoding, loader=loader) # TODO: mv these classes to a seperate file so we don't pollute vault with 'subprocess' etc @@ -319,15 +319,15 @@ class FileVaultSecret(VaultSecret): return None def load(self): - self._bytes = self.read_file(self.filename, self.loader) + self._bytes = self._read_file(self.filename) - @staticmethod - def read_file(filename, loader): + def _read_file(self, filename): """ Read a vault password from a file or if executable, execute the script and retrieve password from STDOUT """ + # TODO: replace with use of self.loader try: f = open(filename, "rb") vault_pass = f.read().strip() @@ -344,23 +344,21 @@ class FileVaultSecret(VaultSecret): class ScriptVaultSecret(FileVaultSecret): - - @staticmethod - def read_file(filename, loader): - if not loader.is_executable(filename): + def _read_file(self, filename): + if not self.loader.is_executable(filename): raise AnsibleVaultError("The vault password script %s was not executable" % filename) try: # STDERR not captured to make it easier for users to prompt for input in their scripts p = subprocess.Popen(filename, stdout=subprocess.PIPE) except OSError as e: - msg_format = "Problem running vault password script %s (%s)." - "If this is not a script, remove the executable bit from the file." - msg = msg_format % (' '.join(filename), e) + msg_format = "Problem running vault password script %s (%s)." \ + " If this is not a script, remove the executable bit from the file." + msg = msg_format % (filename, e) raise AnsibleError(msg) - stdout, stderr = p.communicate() + stdout, dummy = p.communicate() if p.returncode != 0: raise AnsibleError("Vault password script %s returned non-zero (%s): %s" % (filename, p.returncode, p.stderr)) @@ -394,7 +392,7 @@ def match_encrypt_secret(secrets): '''Find the best/first/only secret in secrets to use for encrypting''' # ie, consider all of the available secrets as matches - _vault_id_matchers = [_vault_id for _vault_id, _vault_secret in secrets] + _vault_id_matchers = [_vault_id for _vault_id, dummy in secrets] best_secret = match_best_secret(secrets, _vault_id_matchers) # can be empty list sans any tuple return best_secret @@ -441,7 +439,7 @@ class VaultLib: if secret is None: if self.secrets: - secret_vault_id, secret = match_encrypt_secret(self.secrets) + dummy, secret = match_encrypt_secret(self.secrets) else: raise AnsibleVaultError("A vault password must be specified to encrypt data") @@ -489,7 +487,7 @@ class VaultLib: msg += "%s is not a vault encrypted file" % filename raise AnsibleError(msg) - b_vaulttext, b_version, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext) + b_vaulttext, dummy, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext) # create the cipher object, note that the cipher used for decrypt can # be different than the cipher used for encrypt @@ -528,7 +526,7 @@ class VaultLib: # the known vault secrets. if not C.DEFAULT_VAULT_ID_MATCH: # Add all of the known vault_ids as candidates for decrypting a vault. - vault_id_matchers.extend([_vault_id for _vault_id, _secret in self.secrets if _vault_id != vault_id]) + vault_id_matchers.extend([_vault_id for _vault_id, _dummy in self.secrets if _vault_id != vault_id]) matched_secrets = match_secrets(self.secrets, vault_id_matchers) @@ -600,7 +598,7 @@ class VaultEditor: fh.write(data) fh.write(data[:file_len % chunk_len]) - assert(fh.tell() == file_len) # FIXME remove this assert once we have unittests to check its accuracy + assert fh.tell() == file_len # FIXME remove this assert once we have unittests to check its accuracy os.fsync(fh) def _shred_file(self, tmp_path): @@ -621,7 +619,7 @@ class VaultEditor: return try: - r = call(['shred', tmp_path]) + r = subprocess.call(['shred', tmp_path]) except (OSError, ValueError): # shred is not available on this system, or some other error occurred. # ValueError caught because OS X El Capitan is raising an @@ -648,7 +646,7 @@ class VaultEditor: self.write_data(existing_data, tmp_path, shred=False) # drop the user into an editor on the tmp file - call(self._editor_shell_command(tmp_path)) + subprocess.call(self._editor_shell_command(tmp_path)) except: # whatever happens, destroy the decrypted file self._shred_file(tmp_path) @@ -737,13 +735,13 @@ class VaultEditor: # Figure out the vault id from the file, to select the right secret to re-encrypt it # (duplicates parts of decrypt, but alas...) - b_ciphertext, b_version, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext) + dummy, dummy, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext) # if we could decrypt, the vault_id should be in secrets # though we could have multiple secrets for a given vault_id, pick the first one secrets = match_secrets(self.vault.secrets, [vault_id]) secret = secrets[0][1] - if self.vault.cipher_name not in CIPHER_WRITE_WHITELIST: + if cipher_name not in CIPHER_WRITE_WHITELIST: # we want to get rid of files encrypted with the AES cipher self._edit_file_helper(filename, secret, existing_data=plaintext, force_save=True) else: @@ -987,7 +985,7 @@ class VaultAES: return b_plaintext @classmethod - def decrypt(cls, b_vaulttext, secret, key_length=32, vault_id=None): + def decrypt(cls, b_vaulttext, secret, key_length=32): """ Decrypt the given data and return it :arg b_data: A byte string containing the encrypted data @@ -1084,7 +1082,7 @@ class VaultAES256: return b_key1, b_key2, b_iv @staticmethod - def _encrypt_cryptography(b_plaintext, b_salt, b_key1, b_key2, b_iv): + def _encrypt_cryptography(b_plaintext, b_key1, b_key2, b_iv): cipher = C_Cipher(algorithms.AES(b_key1), modes.CTR(b_iv), CRYPTOGRAPHY_BACKEND) encryptor = cipher.encryptor() padder = padding.PKCS7(algorithms.AES.block_size).padder() @@ -1099,7 +1097,7 @@ class VaultAES256: return to_bytes(hexlify(b_hmac), errors='surrogate_or_strict'), hexlify(b_ciphertext) @staticmethod - def _encrypt_pycrypto(b_plaintext, b_salt, b_key1, b_key2, b_iv): + def _encrypt_pycrypto(b_plaintext, b_key1, b_key2, b_iv): # PKCS#7 PAD DATA http://tools.ietf.org/html/rfc5652#section-6.3 bs = AES_pycrypto.block_size padding_length = (bs - len(b_plaintext) % bs) or bs @@ -1135,9 +1133,9 @@ class VaultAES256: b_key1, b_key2, b_iv = cls._gen_key_initctr(b_password, b_salt) if HAS_CRYPTOGRAPHY: - b_hmac, b_ciphertext = cls._encrypt_cryptography(b_plaintext, b_salt, b_key1, b_key2, b_iv) + b_hmac, b_ciphertext = cls._encrypt_cryptography(b_plaintext, b_key1, b_key2, b_iv) elif HAS_PYCRYPTO: - b_hmac, b_ciphertext = cls._encrypt_pycrypto(b_plaintext, b_salt, b_key1, b_key2, b_iv) + b_hmac, b_ciphertext = cls._encrypt_pycrypto(b_plaintext, b_key1, b_key2, b_iv) else: raise AnsibleError(NEED_CRYPTO_LIBRARY + '(Detected in encrypt)') diff --git a/lib/ansible/parsing/yaml/constructor.py b/lib/ansible/parsing/yaml/constructor.py index 52ad8fc0f0..80f660a960 100644 --- a/lib/ansible/parsing/yaml/constructor.py +++ b/lib/ansible/parsing/yaml/constructor.py @@ -26,8 +26,7 @@ from ansible.module_utils._text import to_bytes from ansible.parsing.yaml.objects import AnsibleMapping, AnsibleSequence, AnsibleUnicode from ansible.parsing.yaml.objects import AnsibleVaultEncryptedUnicode from ansible.utils.unsafe_proxy import wrap_var -from ansible.parsing.vault import VaultLib, parse_vaulttext_envelope - +from ansible.parsing.vault import VaultLib try: from __main__ import display diff --git a/test/units/parsing/vault/test_vault_editor.py b/test/units/parsing/vault/test_vault_editor.py index 30c68f440d..3c6e73b65d 100644 --- a/test/units/parsing/vault/test_vault_editor.py +++ b/test/units/parsing/vault/test_vault_editor.py @@ -104,7 +104,7 @@ class TestVaultEditor(unittest.TestCase): vault_secrets = self._secrets(self.vault_password) return VaultEditor(VaultLib(vault_secrets)) - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file_helper_empty_target(self, mock_sp_call): self._test_dir = self._create_test_dir() @@ -118,7 +118,7 @@ class TestVaultEditor(unittest.TestCase): self.assertNotEqual(src_contents, b_ciphertext) - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file_helper_call_exception(self, mock_sp_call): self._test_dir = self._create_test_dir() @@ -136,7 +136,7 @@ class TestVaultEditor(unittest.TestCase): src_file_path, self.vault_secret) - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file_helper_symlink_target(self, mock_sp_call): self._test_dir = self._create_test_dir() @@ -170,7 +170,7 @@ class TestVaultEditor(unittest.TestCase): def _faux_command(self, tmp_path): pass - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file_helper_no_change(self, mock_sp_call): self._test_dir = self._create_test_dir() @@ -308,7 +308,7 @@ class TestVaultEditor(unittest.TestCase): self._assert_file_is_link(src_file_link_path, src_file_path) - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file(self, mock_sp_call): self._test_dir = self._create_test_dir() src_contents = to_bytes("some info in a file\nyup.") @@ -333,9 +333,7 @@ class TestVaultEditor(unittest.TestCase): src_file_plaintext = ve.vault.decrypt(new_src_file_contents) self.assertEqual(src_file_plaintext, new_src_contents) - new_stat = os.stat(src_file_path) - - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file_symlink(self, mock_sp_call): self._test_dir = self._create_test_dir() src_contents = to_bytes("some info in a file\nyup.") @@ -371,7 +369,7 @@ class TestVaultEditor(unittest.TestCase): # self.assertEqual(src_file_plaintext, new_src_contents, # 'The decrypted plaintext of the editted file is not the expected contents.') - @patch('ansible.parsing.vault.call') + @patch('ansible.parsing.vault.subprocess.call') def test_edit_file_not_encrypted(self, mock_sp_call): self._test_dir = self._create_test_dir() src_contents = to_bytes("some info in a file\nyup.")