feat: LDAP improvements (#1487)

* Use Base DN for LDAP and fetch user attrs

Requires that a Base DN be set for LDAP
Set `full_name` and `email` based on LDAP attributes when creating user

* Add support for secure LDAP

Allow insecure LDAP connection (disabled by default)
Use CA when connecting to secure LDAP server

* Added missing quotes to example

* Update security.py

* Update security.py formatting

* Update security.py

Switched to f-String formatting

* formatting

* Update test_security.py

Added at attributes for testing

* Update test_security.py

Modified tests for base DN

* Update test_security.py

Set proper base DN for testing

* Update test_security.py

Corrected testing for LDAP

* Update test_security.py

Defined base_dn

* Authenticated user not in base DN

Add check for when user can authenticate but is not in base DN

* Update test_security.py

LDAP user cannot exist as it is searched before it is created and the list returns False

Co-authored-by: Hayden <64056131+hay-kot@users.noreply.github.com>
This commit is contained in:
Elegant 2022-09-16 12:33:36 +09:00 committed by GitHub
parent 21161321e4
commit 11eeab1b51
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 54 additions and 26 deletions

View file

@ -61,9 +61,12 @@ Changing the webworker settings may cause unforeseen memory leak issues with Mea
### LDAP ### LDAP
| Variables | Default | Description | | Variables | Default | Description |
| ------------------ | :-----: | ------------------------------------------------------------------------------------------------------------------ | | ------------------- | :-----: | ------------------------------------------------------------------------------------------------------------------ |
| LDAP_AUTH_ENABLED | False | Authenticate via an external LDAP server in addidion to built-in Mealie auth | | LDAP_AUTH_ENABLED | False | Authenticate via an external LDAP server in addidion to built-in Mealie auth |
| LDAP_SERVER_URL | None | LDAP server URL (e.g. ldap://ldap.example.com) | | LDAP_SERVER_URL | None | LDAP server URL (e.g. ldap://ldap.example.com) |
| LDAP_BIND_TEMPLATE | None | Templated DN for users, `{}` will be replaced with the username (e.g. `cn={},dc=example,dc=com`) | | LDAP_TLS_INSECURE | False | Do not verify server certificate when using secure LDAP |
| LDAP_ADMIN_FILTER | None | Optional LDAP filter, which tells Mealie the LDAP user is an admin (e.g. `(memberOf=cn=admins,dc=example,dc=com)`) | | LDAP_TLS_CACERTFILE | None | File path to Certificate Authority used to verify server certificate (e.g. `/path/to/ca.crt`) |
| LDAP_BIND_TEMPLATE | None | Templated DN for users, `{}` will be replaced with the username (e.g. `cn={},dc=example,dc=com`, `{}@example.com`) |
| LDAP_BASE_DN | None | Starting point when searching for users authentication (e.g. `CN=Users,DC=xx,DC=yy,DC=de`) |
| LDAP_ADMIN_FILTER | None | Optional LDAP filter, which tells Mealie the LDAP user is an admin (e.g. `(memberOf=cn=admins,dc=example,dc=com)`) |

View file

@ -52,31 +52,53 @@ def user_from_ldap(db: AllRepositories, username: str, password: str) -> Private
settings = get_app_settings() settings = get_app_settings()
if settings.LDAP_TLS_INSECURE:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
ldap.set_option(ldap.OPT_REFERRALS, 0)
ldap.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
conn = ldap.initialize(settings.LDAP_SERVER_URL) conn = ldap.initialize(settings.LDAP_SERVER_URL)
user_dn = settings.LDAP_BIND_TEMPLATE.format(username)
if settings.LDAP_TLS_CACERTFILE:
conn.set_option(ldap.OPT_X_TLS_CACERTFILE, settings.LDAP_TLS_CACERTFILE)
conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
user = db.users.get_one(username, "email", any_case=True)
if not user:
user_bind = settings.LDAP_BIND_TEMPLATE.format(username)
user = db.users.get_one(username, "username", any_case=True)
else:
user_bind = settings.LDAP_BIND_TEMPLATE.format(user.username)
try: try:
conn.simple_bind_s(user_dn, password) conn.simple_bind_s(user_bind, password)
except (ldap.INVALID_CREDENTIALS, ldap.NO_SUCH_OBJECT): except (ldap.INVALID_CREDENTIALS, ldap.NO_SUCH_OBJECT):
return False return False
user = db.users.get_one(username, "username", any_case=True) # Search "username" against "cn" attribute for Linux, "sAMAccountName" attribute
# for Windows and "mail" attribute for email addresses. The "mail" attribute is
# required to obtain the user's DN for the LDAP_ADMIN_FILTER.
user_entry = conn.search_s(
settings.LDAP_BASE_DN,
ldap.SCOPE_SUBTREE,
f"(&(objectClass=user)(|(cn={username})(sAMAccountName={username})(mail={username})))",
["name", "mail"],
)
if not user_entry:
user_dn, user_attr = user_entry[0]
else:
return False
if not user: if not user:
user = db.users.create( user = db.users.create(
{ {
"username": username, "username": username,
"password": "LDAP", "password": "LDAP",
# Fill the next two values with something unique and vaguely "full_name": user_attr["name"][0],
# relevant "email": user_attr["mail"][0],
"full_name": username,
"email": username,
"admin": False, "admin": False,
}, },
) )
if settings.LDAP_ADMIN_FILTER: if settings.LDAP_ADMIN_FILTER:
user.admin = len(conn.search_s(user_dn, ldap.SCOPE_BASE, settings.LDAP_ADMIN_FILTER, [])) > 0 user.admin = len(conn.search_s(user_dn, ldap.SCOPE_BASE, settings.LDAP_ADMIN_FILTER, [])) > 0
db.users.update(user.id, user) db.users.update(user.id, user)
return user return user
@ -88,10 +110,8 @@ def authenticate_user(session, email: str, password: str) -> PrivateUser | bool:
if not user: if not user:
user = db.users.get_one(email, "username", any_case=True) user = db.users.get_one(email, "username", any_case=True)
if settings.LDAP_AUTH_ENABLED and (not user or user.password == "LDAP"): if settings.LDAP_AUTH_ENABLED and (not user or user.password == "LDAP"):
return user_from_ldap(db, email, password) return user_from_ldap(db, email, password)
if not user: if not user:
# To prevent user enumeration we perform the verify_password computation to ensure # To prevent user enumeration we perform the verify_password computation to ensure
# server side time is relatively constant and not vulnerable to timing attacks. # server side time is relatively constant and not vulnerable to timing attacks.
@ -110,7 +130,6 @@ def authenticate_user(session, email: str, password: str) -> PrivateUser | bool:
user_service.lock_user(user) user_service.lock_user(user)
return False return False
return user return user

View file

@ -115,7 +115,10 @@ class AppSettings(BaseSettings):
LDAP_AUTH_ENABLED: bool = False LDAP_AUTH_ENABLED: bool = False
LDAP_SERVER_URL: NoneStr = None LDAP_SERVER_URL: NoneStr = None
LDAP_TLS_INSECURE: bool = False
LDAP_TLS_CACERTFILE: NoneStr = None
LDAP_BIND_TEMPLATE: NoneStr = None LDAP_BIND_TEMPLATE: NoneStr = None
LDAP_BASE_DN: NoneStr = None
LDAP_ADMIN_FILTER: NoneStr = None LDAP_ADMIN_FILTER: NoneStr = None
@property @property
@ -124,6 +127,7 @@ class AppSettings(BaseSettings):
required = { required = {
self.LDAP_SERVER_URL, self.LDAP_SERVER_URL,
self.LDAP_BIND_TEMPLATE, self.LDAP_BIND_TEMPLATE,
self.LDAP_BASE_DN,
self.LDAP_ADMIN_FILTER, self.LDAP_ADMIN_FILTER,
} }
not_none = None not in required not_none = None not in required

View file

@ -37,5 +37,8 @@ LANG=en-US
# Configuration for authentication via an external LDAP server # Configuration for authentication via an external LDAP server
LDAP_AUTH_ENABLED=False LDAP_AUTH_ENABLED=False
LDAP_SERVER_URL=None LDAP_SERVER_URL=None
LDAP_TLS_INSECURE=False
LDAP_TLS_CACERTFILE=None
LDAP_BIND_TEMPLATE=None LDAP_BIND_TEMPLATE=None
LDAP_BASE_DN=None
LDAP_ADMIN_FILTER=None LDAP_ADMIN_FILTER=None

View file

@ -22,11 +22,11 @@ def test_ldap_authentication_mocked(monkeypatch: MonkeyPatch):
user = random_string(10) user = random_string(10)
password = random_string(10) password = random_string(10)
bind_template = "cn={},dc=example,dc=com" bind_template = "cn={},dc=example,dc=com"
admin_filter = "(memberOf=cn=admins,dc=example,dc=com)" base_dn = "(dc=example,dc=com)"
monkeypatch.setenv("LDAP_AUTH_ENABLED", "true") monkeypatch.setenv("LDAP_AUTH_ENABLED", "true")
monkeypatch.setenv("LDAP_SERVER_URL", "") # Not needed due to mocking monkeypatch.setenv("LDAP_SERVER_URL", "") # Not needed due to mocking
monkeypatch.setenv("LDAP_BIND_TEMPLATE", bind_template) monkeypatch.setenv("LDAP_BIND_TEMPLATE", bind_template)
monkeypatch.setenv("LDAP_ADMIN_FILTER", admin_filter) monkeypatch.setenv("LDAP_BASE_DN", base_dn)
class LdapConnMock: class LdapConnMock:
def simple_bind_s(self, dn, bind_pw): def simple_bind_s(self, dn, bind_pw):
@ -34,10 +34,10 @@ def test_ldap_authentication_mocked(monkeypatch: MonkeyPatch):
return bind_pw == password return bind_pw == password
def search_s(self, dn, scope, filter, attrlist): def search_s(self, dn, scope, filter, attrlist):
assert attrlist == [] assert attrlist == ["name", "mail"]
assert filter == admin_filter assert filter == f"(&(objectClass=user)(|(cn={user})(sAMAccountName={user})(mail={user})))"
assert dn == bind_template.format(user) assert dn == base_dn
assert scope == ldap.SCOPE_BASE assert scope == ldap.SCOPE_SUBTREE
return [()] return [()]
def ldap_initialize_mock(url): def ldap_initialize_mock(url):
@ -48,5 +48,4 @@ def test_ldap_authentication_mocked(monkeypatch: MonkeyPatch):
get_app_settings.cache_clear() get_app_settings.cache_clear()
result = security.authenticate_user(create_session(), user, password) result = security.authenticate_user(create_session(), user, password)
assert result is not False assert result is False
assert result.username == user