# Univention Portal
# Copyright 2019-2022 Univention GmbH
# https://www.univention.de/
# All rights reserved.
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
import json
from six import with_metaclass
from tornado.httpclient import AsyncHTTPClient, HTTPError, HTTPRequest
from univention.portal import Plugin
from univention.portal.log import get_logger
from univention.portal.user import User
class Session(object):
    """Minimal session record identified by a nonce.

    Attributes are plain and public:

    * ``nonce`` -- the value passed to the constructor, stored unchanged.
    * ``user`` -- starts out as ``None``; nothing in this class sets it.
    """

    def __init__(self, nonce):
        """Create a session for *nonce* with no user attached yet."""
        self.nonce = nonce
        self.user = None

    def is_valid(self):
        """Return ``True`` unconditionally; this class performs no expiry check."""
        return True
class Authenticator(with_metaclass(Plugin)):
    """
    Our base class for authentication.
    May hold all the sessions, set cookies, etc.

    The idea is that this class handles the following
    methods from the Portal:

    `login_request`: A user GETs to the login action
    `login_user`: Credentials are POSTed to this action
    `get_user`: While gathering the portal data, the caller wants
        to know the currently logged-in user

    This base class does nothing...
    """

    def get_auth_mode(self, request):  # pragma: no cover
        """Return the auth mode for *request*; the base class always answers ``"ucs"``."""
        return "ucs"

    async def login_request(self, request):  # pragma: no cover
        """Hook for a GET to the login action. Does nothing and returns ``None`` here."""

    async def login_user(self, request):  # pragma: no cover
        """Hook for credentials POSTed to the login action. Does nothing and returns ``None`` here."""

    async def logout_user(self, request):  # pragma: no cover
        """Hook for a logout. Does nothing and returns ``None`` here."""

    async def get_user(self, request):  # pragma: no cover
        """Return a ``User`` with no username, no display name, no groups and no headers."""
        return User(username=None, display_name=None, groups=[], headers={})

    def refresh(self, reason=None):  # pragma: no cover
        """Hook for refreshing cached state. Does nothing and returns ``None`` here."""
class UMCAuthenticator(Authenticator):
    """
    Specialized Authenticator that relies on a UMC that actually holds any session.
    Asks UMC for every request if this session is known.

    auth_mode:
        The preferred mode for auth. The portal hands it over to the frontend.
    umc_session_url:
        The URL where to go to with the cookie. Expects a json answer with the username.
    group_cache:
        As UMC does not return groups, we need a cache object that gets us the groups for the username.
    """

    def __init__(self, auth_mode, umc_session_url, group_cache):
        """Store the three collaborators unchanged; no I/O happens here."""
        self.auth_mode = auth_mode
        self.umc_session_url = umc_session_url
        self.group_cache = group_cache

    def get_auth_mode(self, request):
        """Return the configured ``auth_mode``; *request* is not inspected."""
        return self.auth_mode

    def refresh(self, reason=None):
        """Delegate to ``group_cache.refresh(reason=reason)`` and return its result."""
        return self.group_cache.refresh(reason=reason)

    async def get_user(self, request):
        """Build the ``User`` for *request* from its cookies.

        The cookie morsels are flattened to a ``{name: value}`` dict, UMC is
        asked for the matching username, and the groups are looked up in the
        group cache (empty list when the username is unknown or ``None``).
        The request headers are copied into the returned ``User``.
        """
        cookies = dict((key, morsel.value) for key, morsel in request.cookies.items())
        username, display_name = await self._get_username(cookies)
        groups = self.group_cache.get().get(username, [])
        return User(username, display_name=display_name, groups=groups, headers=dict(request.request.headers))

    async def _get_username(self, cookies):
        """Resolve ``(username, display_name)`` for the given cookie dict.

        Returns ``(None, None)`` when no cookie name starts with
        ``UMCSessionId`` or when UMC does not yield a username. Otherwise the
        first element is the lower-cased username and the second is the
        username exactly as UMC returned it.
        """
        for cookie in cookies:
            if cookie.startswith("UMCSessionId"):
                # UMCSessionId-1234 -> Host: localhost:1234
                # Index 13 skips the 12-character prefix plus the separator.
                host_port = cookie[13:]
                if host_port:
                    host_port = ":{}".format(host_port)
                # The first matching cookie decides the Host header.
                break
        else:
            # Loop finished without finding any UMC session cookie.
            get_logger("user").debug("no user given")
            return None, None

        headers = {"Host": "localhost" + host_port}
        get_logger("user").debug("searching user for cookies=%r" % cookies)

        username = await self._ask_umc(cookies, headers)
        if username is None:
            get_logger("user").debug("no user found")
            return None, None

        get_logger("user").debug("found %s" % (username,))
        return username.lower(), username

    async def _ask_umc(self, cookies, headers):
        """Ask UMC which user owns the session described by *cookies*.

        All cookies are forwarded in a ``Cookie`` header (note: *headers* is
        modified in place) with a GET to ``umc_session_url``. The response
        body is decoded as UTF-8 JSON and ``data["result"]["username"]`` is
        returned.

        Every handled failure is logged and results in ``None``:
        ``HTTPError`` (request failed), ``EnvironmentError`` (connection
        failed), ``ValueError`` (body is not valid JSON) and ``KeyError``
        (answer lacks the expected keys, i.e. session unknown).
        """
        try:
            headers['Cookie'] = '; '.join('='.join(c) for c in cookies.items())
            req = HTTPRequest(self.umc_session_url, method="GET", headers=headers)
            http_client = AsyncHTTPClient()
            response = await http_client.fetch(req)
            data = json.loads(response.body.decode('UTF-8'))
            username = data["result"]["username"]
        except HTTPError as exc:
            get_logger("user").error("request failed: %s" % exc)
        except EnvironmentError as exc:
            get_logger("user").error("connection failed: %s" % exc)
        except ValueError:
            get_logger("user").error("malformed answer!")
        except KeyError:
            get_logger("user").warning("session unknown!")
        else:
            return username
        # Reached only after a handled failure; `username` was never bound.
        return None