keycloak: improve testability of authentification (#57611)
This commit is contained in:
parent
07c288f56f
commit
649d612d64
8 changed files with 259 additions and 44 deletions
5
.github/BOTMETA.yml
vendored
5
.github/BOTMETA.yml
vendored
|
@ -716,8 +716,9 @@ files:
|
|||
labels:
|
||||
- clustering
|
||||
- k8s
|
||||
$module_utils/keycloak.py:
|
||||
maintainers: eikef
|
||||
$module_utils/identity/keycloak/:
|
||||
maintainers: eikef
|
||||
support: community
|
||||
$module_utils/kubevirt.py: *kubevirt
|
||||
$module_utils/manageiq.py:
|
||||
maintainers: $team_manageiq
|
||||
|
|
0
lib/ansible/module_utils/identity/__init__.py
Normal file
0
lib/ansible/module_utils/identity/__init__.py
Normal file
0
lib/ansible/module_utils/identity/keycloak/__init__.py
Normal file
0
lib/ansible/module_utils/identity/keycloak/__init__.py
Normal file
|
@ -68,49 +68,54 @@ def camel(words):
|
|||
return words.split('_')[0] + ''.join(x.capitalize() or '_' for x in words.split('_')[1:])
|
||||
|
||||
|
||||
class KeycloakError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def get_token(base_url, validate_certs, auth_realm, client_id,
|
||||
auth_username, auth_password, client_secret):
|
||||
auth_url = URL_TOKEN.format(url=base_url, realm=auth_realm)
|
||||
temp_payload = {
|
||||
'grant_type': 'password',
|
||||
'client_id': client_id,
|
||||
'client_secret': client_secret,
|
||||
'username': auth_username,
|
||||
'password': auth_password,
|
||||
}
|
||||
# Remove empty items, for instance missing client_secret
|
||||
payload = dict(
|
||||
(k, v) for k, v in temp_payload.items() if v is not None)
|
||||
try:
|
||||
r = json.load(open_url(auth_url, method='POST',
|
||||
validate_certs=validate_certs,
|
||||
data=urlencode(payload)))
|
||||
except ValueError as e:
|
||||
raise KeycloakError(
|
||||
'API returned invalid JSON when trying to obtain access token from %s: %s'
|
||||
% (auth_url, str(e)))
|
||||
except Exception as e:
|
||||
raise KeycloakError('Could not obtain access token from %s: %s'
|
||||
% (auth_url, str(e)))
|
||||
|
||||
try:
|
||||
return {
|
||||
'Authorization': 'Bearer ' + r['access_token'],
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
except KeyError:
|
||||
raise KeycloakError(
|
||||
'Could not obtain access token from %s' % auth_url)
|
||||
|
||||
|
||||
class KeycloakAPI(object):
|
||||
""" Keycloak API access; Keycloak uses OAuth 2.0 to protect its API, an access token for which
|
||||
is obtained through OpenID connect
|
||||
"""
|
||||
def __init__(self, module):
|
||||
def __init__(self, module, connection_header):
|
||||
self.module = module
|
||||
self.token = None
|
||||
self._connect()
|
||||
|
||||
def _connect(self):
|
||||
""" Obtains an access_token and saves it for use in API accesses
|
||||
"""
|
||||
self.baseurl = self.module.params.get('auth_keycloak_url')
|
||||
self.validate_certs = self.module.params.get('validate_certs')
|
||||
|
||||
auth_url = URL_TOKEN.format(url=self.baseurl, realm=self.module.params.get('auth_realm'))
|
||||
|
||||
payload = {'grant_type': 'password',
|
||||
'client_id': self.module.params.get('auth_client_id'),
|
||||
'client_secret': self.module.params.get('auth_client_secret'),
|
||||
'username': self.module.params.get('auth_username'),
|
||||
'password': self.module.params.get('auth_password')}
|
||||
|
||||
# Remove empty items, for instance missing client_secret
|
||||
payload = dict((k, v) for k, v in payload.items() if v is not None)
|
||||
|
||||
try:
|
||||
r = json.load(open_url(auth_url, method='POST',
|
||||
validate_certs=self.validate_certs, data=urlencode(payload)))
|
||||
except ValueError as e:
|
||||
self.module.fail_json(msg='API returned invalid JSON when trying to obtain access token from %s: %s'
|
||||
% (auth_url, str(e)))
|
||||
except Exception as e:
|
||||
self.module.fail_json(msg='Could not obtain access token from %s: %s'
|
||||
% (auth_url, str(e)))
|
||||
|
||||
if 'access_token' in r:
|
||||
self.token = r['access_token']
|
||||
self.restheaders = {'Authorization': 'Bearer ' + self.token,
|
||||
'Content-Type': 'application/json'}
|
||||
|
||||
else:
|
||||
self.module.fail_json(msg='Could not obtain access token from %s' % auth_url)
|
||||
self.restheaders = connection_header
|
||||
|
||||
def get_clients(self, realm='master', filter=None):
|
||||
""" Obtains client representations for clients in a realm
|
|
@ -628,7 +628,8 @@ end_state:
|
|||
}
|
||||
'''
|
||||
|
||||
from ansible.module_utils.keycloak import KeycloakAPI, camel, keycloak_argument_spec
|
||||
from ansible.module_utils.identity.keycloak.keycloak import KeycloakAPI, camel, \
|
||||
keycloak_argument_spec, get_token, KeycloakError
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
|
||||
|
||||
|
@ -715,7 +716,20 @@ def main():
|
|||
result = dict(changed=False, msg='', diff={}, proposed={}, existing={}, end_state={})
|
||||
|
||||
# Obtain access token, initialize API
|
||||
kc = KeycloakAPI(module)
|
||||
try:
|
||||
connection_header = get_token(
|
||||
base_url=module.params.get('auth_keycloak_url'),
|
||||
validate_certs=module.params.get('validate_certs'),
|
||||
auth_realm=module.params.get('auth_realm'),
|
||||
client_id=module.params.get('auth_client_id'),
|
||||
auth_username=module.params.get('auth_username'),
|
||||
auth_password=module.params.get('auth_password'),
|
||||
client_secret=module.params.get('auth_client_secret'),
|
||||
)
|
||||
except KeycloakError as e:
|
||||
module.fail_json(msg=str(e))
|
||||
|
||||
kc = KeycloakAPI(module, connection_header)
|
||||
|
||||
realm = module.params.get('realm')
|
||||
cid = module.params.get('id')
|
||||
|
|
|
@ -246,7 +246,8 @@ end_state:
|
|||
}
|
||||
'''
|
||||
|
||||
from ansible.module_utils.keycloak import KeycloakAPI, camel, keycloak_argument_spec
|
||||
from ansible.module_utils.identity.keycloak.keycloak import KeycloakAPI, camel, \
|
||||
keycloak_argument_spec, get_token, KeycloakError
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
|
||||
|
||||
|
@ -289,7 +290,19 @@ def main():
|
|||
result = dict(changed=False, msg='', diff={}, proposed={}, existing={}, end_state={})
|
||||
|
||||
# Obtain access token, initialize API
|
||||
kc = KeycloakAPI(module)
|
||||
try:
|
||||
connection_header = get_token(
|
||||
base_url=module.params.get('auth_keycloak_url'),
|
||||
validate_certs=module.params.get('validate_certs'),
|
||||
auth_realm=module.params.get('auth_realm'),
|
||||
client_id=module.params.get('auth_client_id'),
|
||||
auth_username=module.params.get('auth_username'),
|
||||
auth_password=module.params.get('auth_password'),
|
||||
client_secret=module.params.get('auth_client_secret'),
|
||||
)
|
||||
except KeycloakError as e:
|
||||
module.fail_json(msg=str(e))
|
||||
kc = KeycloakAPI(module, connection_header)
|
||||
|
||||
realm = module.params.get('realm')
|
||||
state = module.params.get('state')
|
||||
|
|
|
@ -207,7 +207,8 @@ group:
|
|||
view: true
|
||||
'''
|
||||
|
||||
from ansible.module_utils.keycloak import KeycloakAPI, camel, keycloak_argument_spec
|
||||
from ansible.module_utils.identity.keycloak.keycloak import KeycloakAPI, camel, \
|
||||
keycloak_argument_spec, get_token, KeycloakError
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
|
||||
|
||||
|
@ -235,7 +236,19 @@ def main():
|
|||
result = dict(changed=False, msg='', diff={}, group='')
|
||||
|
||||
# Obtain access token, initialize API
|
||||
kc = KeycloakAPI(module)
|
||||
try:
|
||||
connection_header = get_token(
|
||||
base_url=module.params.get('auth_keycloak_url'),
|
||||
validate_certs=module.params.get('validate_certs'),
|
||||
auth_realm=module.params.get('auth_realm'),
|
||||
client_id=module.params.get('auth_client_id'),
|
||||
auth_username=module.params.get('auth_username'),
|
||||
auth_password=module.params.get('auth_password'),
|
||||
client_secret=module.params.get('auth_client_secret'),
|
||||
)
|
||||
except KeycloakError as e:
|
||||
module.fail_json(msg=str(e))
|
||||
kc = KeycloakAPI(module, connection_header)
|
||||
|
||||
realm = module.params.get('realm')
|
||||
state = module.params.get('state')
|
||||
|
|
|
@ -0,0 +1,169 @@
|
|||
from __future__ import (absolute_import, division, print_function)
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
from itertools import count
|
||||
|
||||
from ansible.module_utils.identity.keycloak.keycloak import (
|
||||
get_token,
|
||||
KeycloakError,
|
||||
)
|
||||
from ansible.module_utils.six import StringIO
|
||||
from ansible.module_utils.six.moves.urllib.error import HTTPError
|
||||
|
||||
|
||||
def build_mocked_request(get_id_user_count, response_dict):
|
||||
def _mocked_requests(*args, **kwargs):
|
||||
url = args[0]
|
||||
method = kwargs['method']
|
||||
future_response = response_dict.get(url, None)
|
||||
return get_response(future_response, method, get_id_user_count)
|
||||
return _mocked_requests
|
||||
|
||||
|
||||
def get_response(object_with_future_response, method, get_id_call_count):
|
||||
if callable(object_with_future_response):
|
||||
return object_with_future_response()
|
||||
if isinstance(object_with_future_response, dict):
|
||||
return get_response(
|
||||
object_with_future_response[method], method, get_id_call_count)
|
||||
if isinstance(object_with_future_response, list):
|
||||
try:
|
||||
call_number = get_id_call_count.__next__()
|
||||
except AttributeError:
|
||||
# manage python 2 versions.
|
||||
call_number = get_id_call_count.next()
|
||||
return get_response(
|
||||
object_with_future_response[call_number], method, get_id_call_count)
|
||||
return object_with_future_response
|
||||
|
||||
|
||||
def create_wrapper(text_as_string):
|
||||
"""Allow to mock many times a call to one address.
|
||||
Without this function, the StringIO is empty for the second call.
|
||||
"""
|
||||
def _create_wrapper():
|
||||
return StringIO(text_as_string)
|
||||
return _create_wrapper
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def mock_good_connection(mocker):
|
||||
token_response = {
|
||||
'http://keycloak.url/auth/realms/master/protocol/openid-connect/token': create_wrapper('{"access_token": "alongtoken"}'), }
|
||||
return mocker.patch(
|
||||
'ansible.module_utils.identity.keycloak.keycloak.open_url',
|
||||
side_effect=build_mocked_request(count(), token_response),
|
||||
autospec=True
|
||||
)
|
||||
|
||||
|
||||
def test_connect_to_keycloak(mock_good_connection):
|
||||
keycloak_header = get_token(
|
||||
base_url='http://keycloak.url/auth',
|
||||
validate_certs=True,
|
||||
auth_realm='master',
|
||||
client_id='admin-cli',
|
||||
auth_username='admin',
|
||||
auth_password='admin',
|
||||
client_secret=None
|
||||
)
|
||||
assert keycloak_header == {
|
||||
'Authorization': 'Bearer alongtoken',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def mock_bad_json_returned(mocker):
|
||||
token_response = {
|
||||
'http://keycloak.url/auth/realms/master/protocol/openid-connect/token': create_wrapper('{"access_token":'), }
|
||||
return mocker.patch(
|
||||
'ansible.module_utils.identity.keycloak.keycloak.open_url',
|
||||
side_effect=build_mocked_request(count(), token_response),
|
||||
autospec=True
|
||||
)
|
||||
|
||||
|
||||
def test_bad_json_returned(mock_bad_json_returned):
|
||||
with pytest.raises(KeycloakError) as raised_error:
|
||||
get_token(
|
||||
base_url='http://keycloak.url/auth',
|
||||
validate_certs=True,
|
||||
auth_realm='master',
|
||||
client_id='admin-cli',
|
||||
auth_username='admin',
|
||||
auth_password='admin',
|
||||
client_secret=None
|
||||
)
|
||||
# cannot check all the message, different errors message for the value
|
||||
# error in python 2.6, 2.7 and 3.*.
|
||||
assert (
|
||||
'API returned invalid JSON when trying to obtain access token from '
|
||||
'http://keycloak.url/auth/realms/master/protocol/openid-connect/token: '
|
||||
) in str(raised_error.value)
|
||||
|
||||
|
||||
def raise_401(url):
|
||||
def _raise_401():
|
||||
raise HTTPError(url=url, code=401, msg='Unauthorized', hdrs='', fp=StringIO(''))
|
||||
return _raise_401
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def mock_401_returned(mocker):
|
||||
token_response = {
|
||||
'http://keycloak.url/auth/realms/master/protocol/openid-connect/token': raise_401(
|
||||
'http://keycloak.url/auth/realms/master/protocol/openid-connect/token'),
|
||||
}
|
||||
return mocker.patch(
|
||||
'ansible.module_utils.identity.keycloak.keycloak.open_url',
|
||||
side_effect=build_mocked_request(count(), token_response),
|
||||
autospec=True
|
||||
)
|
||||
|
||||
|
||||
def test_error_returned(mock_401_returned):
|
||||
with pytest.raises(KeycloakError) as raised_error:
|
||||
get_token(
|
||||
base_url='http://keycloak.url/auth',
|
||||
validate_certs=True,
|
||||
auth_realm='master',
|
||||
client_id='admin-cli',
|
||||
auth_username='notadminuser',
|
||||
auth_password='notadminpassword',
|
||||
client_secret=None
|
||||
)
|
||||
assert str(raised_error.value) == (
|
||||
'Could not obtain access token from http://keycloak.url'
|
||||
'/auth/realms/master/protocol/openid-connect/token: '
|
||||
'HTTP Error 401: Unauthorized'
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def mock_json_without_token_returned(mocker):
|
||||
token_response = {
|
||||
'http://keycloak.url/auth/realms/master/protocol/openid-connect/token': create_wrapper('{"not_token": "It is not a token"}'), }
|
||||
return mocker.patch(
|
||||
'ansible.module_utils.identity.keycloak.keycloak.open_url',
|
||||
side_effect=build_mocked_request(count(), token_response),
|
||||
autospec=True
|
||||
)
|
||||
|
||||
|
||||
def test_json_without_token_returned(mock_json_without_token_returned):
|
||||
with pytest.raises(KeycloakError) as raised_error:
|
||||
get_token(
|
||||
base_url='http://keycloak.url/auth',
|
||||
validate_certs=True,
|
||||
auth_realm='master',
|
||||
client_id='admin-cli',
|
||||
auth_username='admin',
|
||||
auth_password='admin',
|
||||
client_secret=None
|
||||
)
|
||||
assert str(raised_error.value) == (
|
||||
'Could not obtain access token from http://keycloak.url'
|
||||
'/auth/realms/master/protocol/openid-connect/token'
|
||||
)
|
Loading…
Reference in a new issue