# SPDX-FileCopyrightText: 2004-2026 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""|UDM| wrapper around :py:mod:`univention.license` that translates error codes to exceptions"""
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from univention.admin.uldap import access
import ldap
from ldap.filter import filter_format
import univention.admin.localization
import univention.admin.modules
import univention.admin.uexceptions
import univention.license
from univention.admin._ucr import configRegistry
from univention.admin.log import log
from univention.lib.misc import custom_username
translation = univention.admin.localization.translation('univention/admin')
_ = translation.translate
log = log.getChild('LICENSE')
_license = None
LDAP_FILTER_license_module = '(&(objectClass=univentionLicense)(univentionLicenseModule=%s))'
LDAP_FILTER_normal_user_account = '(univentionObjectType=users/user)'
LDAP_FILTER_account_not_disabled = '(!(&(shadowExpire=1)(krb5KDCFlags:1.2.840.113556.1.4.803:=128)(|(sambaAcctFlags=[UD ])(sambaAcctFlags=[ULD ]))))'
LDAP_FILTER_managedclients = '(|(objectClass=univentionWindows)(objectclass=univentionUbuntuClient)(objectClass=univentionLinuxClient)(objectClass=univentionCorporateClient)(objectClass=univentionMacOSClient))'
[docs]
def ldap_filter_not_objectflag(flag_string_list):
ldap_filter_parts = [filter_format('(univentionObjectFlag=%s)', [flag_string]) for flag_string in flag_string_list]
if not ldap_filter_parts:
return ''
elif len(ldap_filter_parts) == 1:
return '(!%s)' % ''.join(ldap_filter_parts)
else:
return '(!(|%s))' % ''.join(ldap_filter_parts)
user_exclude_objectflags = ['temporary', 'functional', 'hidden']
managedclient_exclude_objectflags = []
if configRegistry.is_true('ad/member'):
user_exclude_objectflags.append('synced')
managedclient_exclude_objectflags.append('synced')
[docs]
class License:
"""Non public interface for license handling."""
(ACCOUNT, CLIENT, DESKTOP, GROUPWARE) = range(4)
(USERS, SERVERS, MANAGEDCLIENTS) = range(3)
SYSACCOUNTS = 5
def __init__(self):
if _license:
raise RuntimeError('never create this object directly')
self.new_license = False
self.disable_add = 0
self._expired = False
self.endDate = None
self.oemProductTypes = []
self.licenseBase = None
self.types = []
self.version = '2'
self.searchResult = None
self.sysAccountNames = (
custom_username('Administrator'),
'krbkeycloak',
'join-backup',
'join-slave',
'spam',
'oxadmin',
'krbtgt',
'pcpatch', # opsi app
'opsiconfd', # opsi app
custom_username('Guest'),
'dns-*',
'http-%s' % configRegistry.get('hostname'),
'http-proxy-%s' % configRegistry.get('hostname'),
'zarafa-%s' % configRegistry.get('hostname'),
custom_username('SBSMonAcct'), # SBS account
custom_username('Network Administrator'), # SBS role
custom_username('Standard User'), # SBS role
custom_username('WebWorkplaceTools'), # SBS role "Standard User with administration links"
'IUSR_WIN-*', # IIS account
'sys-idp-user', # Keycloak
)
self.sysAccountsFound = 0
self.licenses = {
'2': {
# Version 2 since UCS 3.1
License.USERS: None,
License.SERVERS: None,
License.MANAGEDCLIENTS: None,
},
}
self.real = {
'2': {
# Version 2 since UCS 3.1
License.USERS: 0,
License.SERVERS: 0,
License.MANAGEDCLIENTS: 0,
},
}
self.names = {
'2': {
# Version 2 since UCS 3.1
License.USERS: 'Users',
License.SERVERS: 'Servers',
License.MANAGEDCLIENTS: 'Managed Clients',
},
}
self.keys = {
'2': {
# Version 1 till UCS 3.1
License.USERS: 'univentionLicenseUsers',
License.SERVERS: 'univentionLicenseServers',
License.MANAGEDCLIENTS: 'univentionLicenseManagedClients',
},
}
self.filters = {
'2': {
# Version 2 since UCS 3.1
License.USERS: '(&%s)' % ''.join([LDAP_FILTER_normal_user_account, ldap_filter_not_objectflag(user_exclude_objectflags), LDAP_FILTER_account_not_disabled]),
License.SERVERS: '(&(|(objectClass=univentionDomainController)(objectClass=univentionMemberServer))(!(univentionObjectFlag=docker)))',
# Managed Clients, Windows Clients, Ubuntu Clients, Linux Clients, MaxOS X Clients
License.MANAGEDCLIENTS: '(&%s)' % ''.join([LDAP_FILTER_managedclients, ldap_filter_not_objectflag(managedclient_exclude_objectflags)]),
},
}
self.__selected = False
self.use_cache = True
self.module = 'admin'
[docs]
def init_select(self, lo, module, use_cache: bool = True):
self.use_cache = use_cache
self.select(module, lo)
return self.set_values(lo)
[docs]
def select(self, module, lo=None):
if not self.__selected:
self.module = module
self.error = univention.license.select(module)
if self.error != 0 and lo:
# Try to set the version even if the license load was not successful
self.searchResult = lo.search(filter=filter_format(LDAP_FILTER_license_module, [self.module]))
self.set_values(lo)
try:
if int(self.version) < 2:
raise ValueError()
except ValueError:
raise univention.admin.uexceptions.licenseInvalid()
if self.error != 0:
if self.error == -1:
raise univention.admin.uexceptions.licenseNotFound()
elif self.error & 2 == 2:
raise univention.admin.uexceptions.licenseExpired()
elif self.error & 4 == 4:
raise univention.admin.uexceptions.licenseWrongBaseDn()
else: # 1 == invalid signature
raise univention.admin.uexceptions.licenseInvalid()
self.__selected = True
[docs]
def set_values(self, lo):
self.version = version = self.__getValue('univentionLicenseVersion', '2', 'Version', None)
self.licenses[version][License.USERS] = self.__getValue(self.keys[version][License.USERS], None, 'Users', 'Users not found')
self.licenses[version][License.SERVERS] = self.__getValue(self.keys[version][License.SERVERS], None, 'Servers', 'Servers not found')
self.licenses[version][License.MANAGEDCLIENTS] = self.__getValue(self.keys[version][License.MANAGEDCLIENTS], None, 'Managed Clients', 'Managed Clients not found')
self.types = [self.__getValue('univentionLicenseProduct', 'Univention Corporate Server', 'License Product', 'Product attribute not found')]
oem_types = self.__getValue('univentionLicenseOEMProduct', None, 'License Type', 'univentionLicenseOEMProduct attribute not found')
self.oemProductTypes = [oem_types] if oem_types else []
self.types.extend(self.oemProductTypes)
self.endDate = self.__getValue('univentionLicenseEndDate', None, 'License end date', 'univentionLicenseEndDate attribute not found')
userfilter = '(|%s)' % ''.join(filter_format('(uid=%s)', [account]) for account in self.sysAccountNames)
self.sysAccountsFound = len(lo.authz_connection.searchDn(filter='(&%s%s)' % (userfilter, self.filters['2'][License.USERS])))
log.debug('system accounts found', count=self.sysAccountsFound)
disable_add = 0
if self.new_license:
self.licenseKeyID = self.__getValue('univentionLicenseKeyID', '')
self.licenseSupport = self.__getValue('univentionLicenseSupport', '0')
self.licensePremiumSupport = self.__getValue('univentionLicensePremiumSupport', '0')
self.licenseBase = self.__getValue('univentionLicenseBaseDN', '')
self.real[version][License.USERS] = self.__countObject(License.USERS, lo)
self.real[version][License.SERVERS] = self.__countObject(License.SERVERS, lo)
self.real[version][License.MANAGEDCLIENTS] = self.__countObject(License.MANAGEDCLIENTS, lo)
lic_users = self.licenses[version][License.USERS]
real_users = self.real[version][License.USERS]
if lic_users and self.__cmp_gt(int(real_users) - self.sysAccountsFound, lic_users):
disable_add = 6
# The license should be valid even if we have more servers than the license allowed
# if lic_servers and self.__cmp_gt(real_servers, lic_servers):
# disable_add = 7
lic_managedclients = self.licenses[version][License.MANAGEDCLIENTS]
real_managedclients = self.real[version][License.MANAGEDCLIENTS]
if lic_managedclients and self.__cmp_gt(real_managedclients, lic_managedclients):
disable_add = 8
if disable_add:
self._expired = True
elif not disable_add and self.licenseBase in ('Free for personal use edition', 'UCS Core Edition'):
disable_add = 5
log.debug('Univention License', users=real_users, clients=real_managedclients)
# remove operations for adding or modifying if license is expired
if self._expired:
for mod in univention.admin.modules.modules.values():
if hasattr(mod, 'operations'):
try:
mod.operations.remove('add')
except Exception:
pass
try:
mod.operations.remove('edit')
except Exception:
pass
return disable_add
def __getValue(self, key, default, name='', errormsg=''):
name = name or key
try:
value = univention.license.getValue(key)
self.new_license = True
log.debug('Univention License allowed', license=name, value=value)
except (KeyError, Exception):
if self.searchResult:
value = self.searchResult[0][1].get(key, [default])[0]
if isinstance(value, bytes):
value = value.decode('ASCII')
self.new_license = True
else:
log.debug('get value failed', license=name, error=errormsg)
return default
log.debug('get license value:', license=name, value=value)
return value
def __cmp_gt(self, val1, val2):
return self.compare(val1, val2) == 1
[docs]
def compare(self, val1, val2):
if val1 == 'unlimited' and val2 == 'unlimited':
return 0
if val1 == 'unlimited':
return 1
if val2 == 'unlimited':
return -1
return self.__cmp(int(val1), int(val2))
def __cmp(self, x, y):
"""
Replacement for built-in function cmp that was removed in Python 3
Compare the two objects x and y and return an integer according to
the outcome. The return value is negative if x < y, zero if x == y
and strictly positive if x > y.
"""
return (x > y) - (x < y)
def __countObject(self, obj, lo):
licenses = lo.authz_connection.search(filter=filter_format(LDAP_FILTER_license_module, [self.module]), attr=['modifyTimestamp', 'univentionUsedLicenseUsers', 'univentionUsedLicenseServers', 'univentionUsedLicenseManagedClients'])
try:
_, attrs = licenses[0]
except IndexError:
attrs = {}
version = self.version
usedkey = self.keys[version][obj].replace('univentionLicense', 'univentionUsedLicense')
value = int(attrs.get(usedkey, [b'0'])[0].decode('ASCII'))
# Return cached value if cache is within TTL
if self.use_cache and value and self._check_cache_ttl(attrs.get('modifyTimestamp', [b''])[0].decode('ASCII')):
self.real[version][obj] = value
return value
self.real[version][obj] = len(lo.authz_connection.searchDn(filter=self.filters[version][obj]))
return self.real[version][obj]
[docs]
def recheck(self, lo) -> bool:
res = lo.authz_connection.search(
filter=filter_format(LDAP_FILTER_license_module, [self.module]),
attr=['modifyTimestamp'],
)
attrs = res[0][1] if res else {}
return not self._check_cache_ttl(attrs.get('modifyTimestamp', [b''])[0].decode('ASCII'))
def _check_cache_ttl(self, modify_timestamp) -> int:
cache_ttl = min(max(0, configRegistry.get_int('directory/manager/license-cache/ttl', 3600)), 86400)
if cache_ttl > 0 and modify_timestamp:
dt = datetime.strptime(modify_timestamp, "%Y%m%d%H%M%SZ").replace(tzinfo=UTC)
return datetime.now(UTC) - dt < timedelta(seconds=cache_ttl)
return False
[docs]
def update_cache(self, lo: 'access') -> dict:
"""
Force update all license count caches.
This performs expensive LDAP searches and should only be called by
the cron job or after license import, not during UMC login.
Returns dict with 'users', 'servers', 'clients' counts.
"""
dns = lo.authz_connection.searchDn(filter=filter_format(LDAP_FILTER_license_module, [self.module]))
if not dns:
log.warning('No license object found, cannot update cache')
return {'users': 0, 'servers': 0, 'clients': 0}
dn = dns[0]
version = self.version
counts = {}
for obj, name in [(License.USERS, 'users'), (License.SERVERS, 'servers'), (License.MANAGEDCLIENTS, 'clients')]:
result = lo.authz_connection.searchDn(filter=self.filters[version][obj])
count = len(result) if result else 0
counts[name] = count
self.real[version][obj] = count
usedkey = self.keys[version][obj].replace('univentionLicense', 'univentionUsedLicense')
try:
lo.authz_connection.modify(dn, [(usedkey, [b'X'], [str(count).encode('ASCII')])], ignore_license=True)
except (univention.admin.uexceptions.base, ldap.LDAPError) as exc:
log.warning('Failed to update license cache for %s: %s', name, exc)
log.info('License cache updated', **counts)
return counts
def _get_license_entry_uuid(self, lo):
for dn, attr in lo.authz_connection.search(filter='(&(objectClass=univentionLicense)(univentionLicenseModule=admin))', attr=['entryUUID']):
return attr['entryUUID'][0].decode('ASCII')
return configRegistry.get('uuid/license', '00000000-0000-0000-000-0000000000000')
class LicenseWrapper:
"""Licence interface."""
def initialize(self, lo: 'access') -> None:
"""Initialize license check. Call this before accessing license data."""
try:
_license.init_select(lo, 'admin')
except univention.admin.uexceptions.licenseError:
pass # the license signature is invalid, expired, etc but not the limits are reached, (could not be found?)
def update_cache(self, lo: 'access') -> dict:
"""
Force update the license count cache.
This performs expensive LDAP searches to count users, servers, and clients,
then updates the cache attributes on the license object.
Should be called by:
- The cron job (hourly by default)
- After license import
- When quota is exceeded and admin re-logs in
"""
return _license.update_cache(lo)
def recheck(self, lo) -> bool:
"""
Force-update the license cache and re-check quota.
Only runs if the cache is stale (older than TTL), so fresh-cache over-quota
logins are fast. Returns True if now within quota.
"""
if not _license.recheck(lo):
return False
log.info('Cache is stale and quota exceeded, force-updating license cache...')
try:
admin_lo, _ = univention.admin.uldap.getAdminConnection()
except OSError:
return False
self.initialize(admin_lo)
self.update_cache(admin_lo)
try:
lo.check_license()
except univention.admin.uexceptions.licenseError:
log.info('Still over quota after cache update')
return False
return True
def get_user_limit(self) -> int | None:
limit = _license.licenses[_license.version][License.USERS]
if limit == 'unlimited':
return None
return int(limit or 0)
def get_server_limit(self) -> int | None:
limit = _license.licenses[_license.version][License.SERVERS]
if limit == 'unlimited':
return None
return int(limit or 0)
def get_client_limit(self) -> int | None:
limit = _license.licenses[_license.version][License.MANAGEDCLIENTS]
if limit == 'unlimited':
return None
return int(limit or 0)
def get_user_total(self) -> int:
return _license.real[_license.version][License.USERS]
def get_server_total(self) -> int:
return _license.real[_license.version][License.SERVERS]
def get_client_total(self) -> int:
return _license.real[_license.version][License.MANAGEDCLIENTS]
def get_system_accounts(self):
return _license.sysAccountsFound
def get_key_id(self, lo):
return _license.licenseKeyID or _license._get_license_entry_uuid(lo)
def get_license_data(self):
def tryint(val, default=-1):
try:
return int(val or 0)
except ValueError:
return default
license_data = {}
license_data['version'] = _license.version
license_data['limits'] = {
'user': self.get_user_limit(),
'server': self.get_server_limit(),
'client': self.get_client_limit(),
}
license_data['used'] = {
'user': self.get_user_total(),
'server': self.get_server_total(),
'client': self.get_client_total(),
}
license_data['key_id'] = _license.licenseKeyID
license_data['support'] = tryint(_license.licenseSupport)
license_data['premium_support'] = tryint(_license.licensePremiumSupport)
license_data['license_types'] = _license.types
license_data['oem_product_types'] = _license.oemProductTypes
license_data['end_date'] = _license.endDate
license_data['base_dn'] = _license.licenseBase
free_license = ''
if license_data['base_dn'] in ('Free for personal use edition', 'UCS Core Edition'):
free_license = 'core'
license_data['base_dn'] = configRegistry.get('ldap/base', '')
license_data['free_license'] = free_license
license_data['system_accounts_total'] = _license.sysAccountsFound
return license_data
_license = License()
license = LicenseWrapper()
del LicenseWrapper
# for compatibility
select = _license.select
init_select = _license.init_select