#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention UCS@school
# Copyright 2016-2025 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
"""Create historically unique usernames/email addresses."""
from __future__ import absolute_import
import logging
import re
import string
from typing import TYPE_CHECKING, Callable, Dict, Optional # noqa: F401
import lazy_object_proxy
from ldap.dn import escape_dn_chars
from six import PY3, string_types
from univention.admin.uexceptions import noObject, objectExists
from ..configuration import Configuration
from ..exceptions import BadValueStored, FormatError, NameKeyExists, NoValueStored
from .ldap_connection import get_admin_connection, get_unprivileged_connection
if TYPE_CHECKING:
import univention.admin.uldap # noqa: F401
[docs]
class NameCounterStorageBackend(object):
[docs]
def create(self, name, value): # type: (str, int) -> None
"""
Store a value for a new name.
:param str name: name
:param int value: value to store
:return: None
:raises NameKeyExists: if a value is already stored to such a `name`
"""
raise NotImplementedError()
[docs]
def modify(self, name, old_value, new_value): # type: (str, int, int) -> None
"""
Store a value for an existing name.
:param str name: name
:param int old_value: old value
:param int new_value: new value
:return: None
:raises NameKeyExists: if no value is stored by that `name`
"""
raise NotImplementedError()
[docs]
def retrieve(self, name): # type: (str) -> int
"""
Retrieve a value for a name.
:param str name: name to retrieve
:return: int
:raises NoValueStored: if no value is stored by that `name`
:raises BadValueStored: if the value has a bad format
"""
raise NotImplementedError()
[docs]
def remove(self, name): # type: (str) -> None
"""
Remove a name from storage.
:param str name: name
:return: None
"""
raise NotImplementedError()
[docs]
def purge(self): # type: () -> None
"""
Remove *all* names from storage. *NEVER* do this in a production environment!
:return: None
"""
raise NotImplementedError()
[docs]
class LdapStorageBackend(NameCounterStorageBackend):
"""
Prior to using this class, a node must exist in LDAP:
'cn=unique-<attribute_name>,cn=ucsschool,cn=univention,<base>'.
"""
def __init__(self, attribute_storage_name, lo=None, pos=None):
# type: (str, Optional[univention.admin.uldap.access], Optional[univention.admin.uldap.position]) -> None # noqa: E501
if lo and pos:
self.lo, _pos = lo, pos
else:
self.lo, _pos = get_admin_connection()
self.ldap_base = "cn=unique-{},cn=ucsschool,cn=univention,{}".format(
escape_dn_chars(attribute_storage_name), self.lo.base
)
[docs]
def create(self, name, value): # type: (str, int) -> None
try:
self.lo.add(
"cn={},{}".format(escape_dn_chars(name), self.ldap_base),
[
("objectClass", b"ucsschoolUsername"),
("ucsschoolUsernameNextNumber", str(value).encode("UTF-8")),
],
)
except objectExists:
raise NameKeyExists("Cannot create key {!r} - already exists.".format(name))
[docs]
def modify(self, name, old_value, new_value): # type: (str, int, int) -> None
try:
self.lo.modify(
"cn={},{}".format(escape_dn_chars(name), self.ldap_base),
[
(
"ucsschoolUsernameNextNumber",
str(old_value).encode("UTF-8"),
str(new_value).encode("UTF-8"),
)
],
)
except noObject:
raise NoValueStored("Name {!r} not found.".format(name))
[docs]
def retrieve(self, name): # type: (str) -> int
try:
res = self.lo.get(
"cn={},{}".format(escape_dn_chars(name), self.ldap_base),
attr=["ucsschoolUsernameNextNumber"],
)["ucsschoolUsernameNextNumber"][0].decode("UTF-8")
except (KeyError, noObject):
raise NoValueStored("Name {!r} not found.".format(name))
try:
return int(res)
except ValueError as exc:
raise BadValueStored("Value for name {!r} has wrong format: {}".format(name, exc))
[docs]
def remove(self, name): # type: (str) -> None
try:
self.lo.delete("cn={},{}".format(escape_dn_chars(name), self.ldap_base))
except noObject:
pass
[docs]
def purge(self): # type: () -> None
"""
Remove *all* names from storage. *NEVER* do this in a production environment!
:return: None
"""
for dn in self.lo.searchDn(
filter="objectClass=ucsschoolUsername",
base=self.ldap_base,
):
self.lo.delete(dn)
[docs]
class MemoryStorageBackend(NameCounterStorageBackend):
def __init__(self, attribute_storage_name): # type: (str) -> None
self._mem_store = {} # type: Dict[str, int]
lo, po = get_unprivileged_connection()
self.ldap_backend = LdapStorageBackend(attribute_storage_name, lo, po)
[docs]
def create(self, name, value): # type: (str, int) -> None
self._mem_store[name] = value
[docs]
def modify(self, name, old_value, new_value): # type: (str, int, int) -> None
self._mem_store[name] = new_value
[docs]
def retrieve(self, name): # type: (str) -> int
try:
res = self._mem_store[name]
except KeyError:
res = self.ldap_backend.retrieve(name)
self._mem_store[name] = res
return res
[docs]
def remove(self, name): # type: (str) -> None
"""
This will remove the key only from memory. It may still be stored in
the LDAP backend.
:param str name: name
:return: None
"""
try:
del self._mem_store[name]
except KeyError:
pass
[docs]
def purge(self): # type: () -> None
"""
This will remove keys only from memory. They may still be stored in
the LDAP backend.
:return: None
"""
self._mem_store = {}
[docs]
class UsernameHandler(object):
"""
>>> BAD_CHARS = ''.join(sorted(set(map(chr, range(128))) - set(UsernameHandler(20).allowed_chars)))
>>> UsernameHandler(20).format_username('Max.Mustermann')
'Max.Mustermann'
>>> UsernameHandler(20).format_username('Foo[COUNTER2][COUNTER2]') # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
FormatError:
>>> UsernameHandler(20).format_username('.')
''
>>> UsernameHandler(20).format_username('.Max.Mustermann.')
'Max.Mustermann'
>>> UsernameHandler(4).format_username('Max.Mustermann')
'Max'
>>> for c in BAD_CHARS:
... assert 'Max' == UsernameHandler(20).format_username('Ma%sx' % (c,))
...
>>> UsernameHandler(20).format_username('Max.Mustermann12.4567890')
'Max.Mustermann12.456'
>>> for c in '.1032547698ACBEDGFIHKJMLONQPSRUTWVYXZacbedgfihkjmlonqpsrutwvyxz':
... assert 'Ma%sx' % (c,) == UsernameHandler(20).format_username('Ma%sx' % (c,))
...
>>> UsernameHandler(20).format_username('Max[Muster]Mann')
'MaxMusterMann'
>>> UsernameHandler(20).format_username('Max[ALWAYSCOUNTER].Mustermann')
'Max1.Mustermann'
>>> UsernameHandler(20).format_username('Max[ALWAYSCOUNTER].Mustermann')
'Max2.Mustermann'
>>> UsernameHandler(20).format_username('Max[ALWAYSCOUNTER].Mustermann')
'Max3.Mustermann'
>>> UsernameHandler(20).format_username('Max[COUNTER2].Mustermann')
'Max4.Mustermann'
>>> UsernameHandler(20).format_username('Maria[ALWAYSCOUNTER].Musterfrau')
'Maria1.Musterfrau'
>>> UsernameHandler(20).format_username('Moritz[COUNTER2]')
'Moritz'
>>> UsernameHandler(20).format_username('Moritz[COUNTER2]')
'Moritz2'
>>> UsernameHandler(20).format_username('Foo[ALWAYSCOUNTER]')
'Foo1'
>>> for i, c in enumerate(BAD_CHARS + BAD_CHARS, 2):
... username = UsernameHandler(20).format_username('Fo%so[ALWAYSCOUNTER]' % (c,))
... assert 'Foo%d' % (i,) == username, (username, i, c)
>>> UsernameHandler(8).format_username('aaaa[COUNTER2]bbbbcccc')
'aaaab'
>>> UsernameHandler(8).format_username('aaaa[COUNTER2]bbbbcccc')
'aaaa2b'
>>> UsernameHandler(8).format_username('bbbb[ALWAYSCOUNTER]ccccdddd')
'bbbb1c'
>>> UsernameHandler(8).format_username('bbbb[ALWAYSCOUNTER]ccccdddd')
'bbbb2c'
>>> UsernameHandler(20).format_username('..[ALWAYSCOUNTER]..')
''
>>> UsernameHandler(20).format_username('[ALWAYSCOUNTER]')
''
>>> UsernameHandler(20).format_username('[FOObar]')
'FOObar'
""" # noqa: E501
attribute_name = "username"
attribute_storage_name = "usernames"
def __init__(self, max_length, dry_run=True): # type: (int, bool) -> None
"""
:param int max_length: created usernames will be no longer
than this
:param bool dry_run: if False use LDAP to store already-used usernames
if True store for one run only in memory
"""
self.max_length = max_length
self.dry_run = dry_run
self.logger = logging.getLogger(__name__)
self.config = lazy_object_proxy.Proxy(lambda: Configuration())
self.storage_backend = self.get_storage_backend()
self.logger.debug("%r storage_backend=%r", self, self.storage_backend.__class__.__name__)
self.replacement_variable_pattern = re.compile(
r"(%s)" % "|".join(map(re.escape, self.counter_variable_to_function.keys())),
flags=re.IGNORECASE,
)
def __repr__(self): # type: () -> str
return "{}(max_length={!r}, dry_run={!r})".format(
self.__class__.__name__, self.max_length, self.dry_run
)
@property
def allowed_chars(self): # type: () -> str
return (
string.ascii_letters + string.digits + str(self.config["username"]["allowed_special_chars"])
)
[docs]
def get_storage_backend(self): # type: () -> NameCounterStorageBackend
"""
:return: NameCounterStorageBackend instance
:rtype: NameCounterStorageBackend
"""
if self.dry_run:
return MemoryStorageBackend(attribute_storage_name=self.attribute_storage_name)
else:
return LdapStorageBackend(attribute_storage_name=self.attribute_storage_name)
[docs]
def remove_bad_chars(self, name): # type: (str) -> str
"""
Remove characters disallowed for names.
* Name must only contain numbers, letters and dots, and may not be 'admin'!
* Name must not start or end in a dot.
:param str name: name to check
:return: copy of input, possibly modified
:rtype: str
"""
if not self.allowed_chars:
return name
bad_chars = "".join(set(name).difference(set(self.allowed_chars)))
if bad_chars:
self.logger.warning(
"Removing disallowed characters %r from %s %r.",
"".join(sorted(bad_chars)),
self.attribute_name,
name,
)
for char in self.config["username"]["allowed_special_chars"]:
if name.startswith(char) or name.endswith(char):
self.logger.warning(
"Removing disallowed character %r from start and end of %s %r.",
char,
self.attribute_name,
name,
)
name = name.strip(char)
if PY3:
return str(name).translate(str.maketrans("", "", bad_chars))
return str(name).translate(None, bad_chars)
@property
def counter_variable_to_function(self): # type: () -> Dict[str, Callable[[str], str]]
"""
Subclass->override this to support other variables than [ALWAYSCOUNTER]
and [COUNTER2] or change their meaning. Add/Modify corresponding
methods in your subclass.
Variables have to start with '[', end with ']' and must be all
upper case.
:return: mapping: variable name -> function
:rtype: dict
"""
return {"[ALWAYSCOUNTER]": self.always_counter, "[COUNTER2]": self.counter2}
[docs]
def always_counter(self, name_base): # type: (str) -> str
"""
[ALWAYSCOUNTER]
:param str name_base: the name without [ALWAYSCOUNTER]
:return: number to append to name
:rtype: str
"""
return self.get_and_raise(name_base, "1")
[docs]
def counter2(self, name_base): # type: (str) -> str
"""
[COUNTER2]
:param str name_base: the name without [COUNTER2]
:return: number to append to name
:rtype: str
"""
return self.get_and_raise(name_base, "")
[docs]
def get_and_raise(self, name_base, initial_value): # type: (str, str) -> str
"""
Returns the current counter value or initial_value if unset and stores
it raised by 1.
:param str name_base: name without []
:param str initial_value: lowest value
:return: current counter value
:rtype: str
"""
try:
num = self.storage_backend.retrieve(name_base)
self.storage_backend.modify(name_base, num, num + 1)
res = str(num)
# not handling BadValueStored, because a data corruption should stop the import
except NoValueStored:
res = initial_value
self.storage_backend.create(name_base, 2)
return res
[docs]
class EmailHandler(UsernameHandler):
"""
Create unique email addresses.
* Maximum length of an email address is 254 characters.
* Applies counters [ALWAYSCOUNTER/COUNTER2] to local part (left of @) only.
"""
attribute_name = "email"
attribute_storage_name = "email"
def __init__(self, max_length=254, dry_run=True): # type: (int, bool) -> None
"""
:param int max_length maximum length of email address
:param bool dry_run: if False use LDAP to store already-used email addresses
if True store for one run only in memory
"""
super(EmailHandler, self).__init__(max_length, dry_run)
# almost everything is allowed in email addresses (with complicated restrictions)
@property
def allowed_chars(self): # type: () -> str
return ""
[docs]
def remove_bad_chars(self, name): # type: (str) -> str
"""
Space is actually allowed (inside a quoted string), but we'll remove
it anyway. (Although technically allowed, not all mail servers support it.)
"""
bad_chars = "".join(set(name).intersection(set(string.whitespace)))
if bad_chars:
self.logger.warning(
"Removing disallowed characters %r from %s %r.",
"".join(sorted(bad_chars)),
self.attribute_name,
name,
)
if PY3:
return str(name).translate(str.maketrans("", "", bad_chars))
return str(name).translate(None, bad_chars)