# Source code for univention.admin.license

# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""|UDM| wrapper around :py:mod:`univention.license` that translates error codes to exceptions"""

import collections

from ldap.filter import filter_format

import univention.admin.filter
import univention.admin.license_data as licenses
import univention.admin.localization
import univention.admin.modules
import univention.admin.uexceptions
import univention.license
from univention.admin._ucr import configRegistry
from univention.admin.log import log
from univention.lib.misc import custom_username


# Translation object for the 'univention/admin' domain; `_` is bound to its
# translate method.
translation = univention.admin.localization.translation('univention/admin')
_ = translation.translate

# Re-bind the imported `log` to its child logger named 'LICENSE'; all log calls
# in this module go through that child.
log = log.getChild('LICENSE')

# Module-level License instance. It is None while the module is being loaded
# and is assigned at the end of the module; License.__init__ raises if it is
# already set when another instance is created.
_license = None

# Matches objects whose univentionObjectType is users/user.
LDAP_FILTER_normal_user_account = '(univentionObjectType=users/user)'
# Negated conjunction: excludes only objects for which ALL of the following
# hold: shadowExpire=1, krb5KDCFlags matches 128 via the extensible matching
# rule 1.2.840.113556.1.4.803 (bitwise AND), and sambaAcctFlags is one of the
# two listed values. The padding inside the brackets is part of the value.
LDAP_FILTER_account_not_disabled = '(!(&(shadowExpire=1)(krb5KDCFlags:1.2.840.113556.1.4.803:=128)(|(sambaAcctFlags=[UD       ])(sambaAcctFlags=[ULD       ]))))'
# Matches objects carrying any of the listed client object classes.
LDAP_FILTER_managedclients = '(|(objectClass=univentionWindows)(objectclass=univentionUbuntuClient)(objectClass=univentionLinuxClient)(objectClass=univentionCorporateClient)(objectClass=univentionMacOSClient))'


def ldap_filter_not_objectflag(flag_string_list):
    """Build an LDAP filter that excludes objects carrying any of the given flags.

    :param flag_string_list: iterable of ``univentionObjectFlag`` values; each
        value is escaped with :py:func:`ldap.filter.filter_format`.
    :returns: ``''`` for no flags, ``(!(univentionObjectFlag=X))`` for a single
        flag, and ``(!(|(univentionObjectFlag=X)(univentionObjectFlag=Y)...))``
        for several flags.
    """
    clauses = [
        filter_format('(univentionObjectFlag=%s)', [flag])
        for flag in flag_string_list
    ]
    if not clauses:
        return ''

    joined = ''.join(clauses)
    if len(clauses) == 1:
        # a single clause needs no surrounding OR
        return '(!%s)' % joined
    return '(!(|%s))' % joined
# Object flags that exclude an object from the license counts. Both lists are
# passed to ldap_filter_not_objectflag() when License.__init__ builds the
# version-2 filters for users and managed clients.
user_exclude_objectflags = ['temporary', 'functional', 'hidden']
managedclient_exclude_objectflags = []

# When the UCR variable ad/member is true, objects flagged 'synced' are
# excluded from both counts as well.
if configRegistry.is_true('ad/member'):
    user_exclude_objectflags.append('synced')
    managedclient_exclude_objectflags.append('synced')
class License:
    """State holder for the currently selected license.

    License attributes are read through :py:mod:`univention.license`; if that
    fails, values are taken from an LDAP search result (see
    :py:meth:`_load_license_via_python`). The numbers of objects found in LDAP
    are compared with the licensed limits and the outcome is reported as a
    numeric ``disable_add`` code (``0`` means no limit is exceeded):

    ==== ==========================================================
    code meaning (as assigned in this class)
    ==== ==========================================================
    1    version 1: more clients than licensed
    2    version 1: more accounts than licensed
    3    version 1: more desktops than licensed
    4    version 1: more groupware accounts than licensed
    5    license base is 'Free for personal use edition' or
         'UCS Core Edition' and no limit is exceeded
    6    version 2: more users than licensed
    8    version 2: more managed clients than licensed
    9    version 2: more corporate clients than licensed
    ==== ==========================================================

    The module creates exactly one instance; creating another one raises.
    """

    # Indexes into the per-version dictionaries. Both tuples start at 0, so
    # e.g. ACCOUNT and USERS share the same numeric key.
    (ACCOUNT, CLIENT, DESKTOP, GROUPWARE) = range(4)
    (USERS, SERVERS, MANAGEDCLIENTS, CORPORATECLIENTS, VIRTUALDESKTOPUSERS, VIRTUALDESKTOPCLIENTS) = range(6)

    # Minimum number of accounts subtracted from the version-1 account count.
    SYSACCOUNTS = 5

    def __init__(self):
        """Initialise all per-version tables.

        :raises Exception: if the module-level ``_license`` instance already exists.
        """
        if _license:
            raise Exception('never create this object directly')

        self.new_license = False
        self.disable_add = 0
        self._expired = False
        self.endDate = None
        self.oemProductTypes = []
        self.licenseBase = None
        self.types = []
        self.version = '1'
        self.searchResult = None

        hostname = configRegistry.get('hostname')
        # uid patterns of accounts that are counted as system accounts
        self.sysAccountNames = (
            custom_username('Administrator'),
            'krbkeycloak',
            'join-backup',
            'join-slave',
            'spam',
            'oxadmin',
            'krbtgt',
            'pcpatch',  # opsi app
            'opsiconfd',  # opsi app
            custom_username('Guest'),
            'dns-*',
            'http-%s' % hostname,
            'http-proxy-%s' % hostname,
            'zarafa-%s' % hostname,
            custom_username('SBSMonAcct'),  # SBS account
            custom_username('Network Administrator'),  # SBS role
            custom_username('Standard User'),  # SBS role
            custom_username('WebWorkplaceTools'),  # SBS role "Standard User with administration links"
            'IUSR_WIN-*',  # IIS account
        )
        self.sysAccountsFound = 0

        # licensed limits as read from the license (None = not read / not set)
        self.licenses = {
            '1': {  # Version 1 till UCS 3.1
                License.ACCOUNT: None,
                License.CLIENT: None,
                License.DESKTOP: None,
                License.GROUPWARE: None,
            },
            '2': {  # Version 2 since UCS 3.1
                License.USERS: None,
                License.SERVERS: None,
                License.MANAGEDCLIENTS: None,
                License.CORPORATECLIENTS: None,
            },
        }
        # number of objects actually found in LDAP
        self.real = {
            '1': {  # Version 1 till UCS 3.1
                License.ACCOUNT: 0,
                License.CLIENT: 0,
                License.DESKTOP: 0,
                License.GROUPWARE: 0,
            },
            '2': {  # Version 2 since UCS 3.1
                License.USERS: 0,
                License.SERVERS: 0,
                License.MANAGEDCLIENTS: 0,
                License.CORPORATECLIENTS: 0,
            },
        }
        # names used in log messages
        self.names = {
            '1': {  # Version 1 till UCS 3.1
                License.ACCOUNT: 'Accounts',
                License.CLIENT: 'Clients',
                License.DESKTOP: 'Desktops',
                License.GROUPWARE: 'Groupware Accounts',
            },
            '2': {  # Version 2 since UCS 3.1
                License.USERS: 'Users',
                License.SERVERS: 'Servers',
                License.MANAGEDCLIENTS: 'Managed Clients',
                License.CORPORATECLIENTS: 'Corporate Clients',
            },
        }
        # license attribute names holding the limits
        self.keys = {
            '1': {  # Version 1 till UCS 3.1
                License.ACCOUNT: 'univentionLicenseAccounts',
                License.CLIENT: 'univentionLicenseClients',
                # NOTE(review): the doubled "univention" in this attribute name
                # looks unusual - confirm against the LDAP schema.
                License.DESKTOP: 'univentionLicenseuniventionDesktops',
                License.GROUPWARE: 'univentionLicenseGroupwareAccounts',
            },
            '2': {  # Version 2 since UCS 3.1
                License.USERS: 'univentionLicenseUsers',
                License.SERVERS: 'univentionLicenseServers',
                License.MANAGEDCLIENTS: 'univentionLicenseManagedClients',
                License.CORPORATECLIENTS: 'univentionLicenseCorporateClients',
            },
        }
        # LDAP filters used to count the objects
        # NOTE(review): the sambaAcctFlags values in the version-1 filters
        # contain a single space inside the brackets, unlike the padded values
        # in LDAP_FILTER_account_not_disabled - confirm this is intended.
        self.filters = {
            '1': {  # Version 1 till UCS 3.1
                License.ACCOUNT: '(&(|(&(objectClass=posixAccount)(objectClass=shadowAccount))(objectClass=sambaSamAccount))(!(uidNumber=0))(!(uid=*$))(!(&(shadowExpire=1)(krb5KDCFlags=254)(|(sambaAcctFlags=[UD ])(sambaAcctFlags=[ULD ])))))',
                License.CLIENT: '(|(objectClass=univentionThinClient)(objectClass=univentionClient)(objectClass=univentionMobileClient)(objectClass=univentionWindows)(objectClass=univentionMacOSClient))',
                License.DESKTOP: '(|(objectClass=univentionThinClient)(&(objectClass=univentionClient)(objectClass=posixAccount))(objectClass=univentionMobileClient))',
                License.GROUPWARE: '(&(objectclass=kolabInetOrgPerson)(kolabHomeServer=*)(!(&(shadowExpire=1)(krb5KDCFlags=254)(|(sambaAcctFlags=[UD ])(sambaAcctFlags=[ULD ])))))',
            },
            '2': {  # Version 2 since UCS 3.1
                License.USERS: '(&%s)' % ''.join([LDAP_FILTER_normal_user_account, ldap_filter_not_objectflag(user_exclude_objectflags), LDAP_FILTER_account_not_disabled]),
                License.SERVERS: '(&(|(objectClass=univentionDomainController)(objectClass=univentionMemberServer))(!(univentionObjectFlag=docker)))',
                # Managed Clients, Windows Clients, Ubuntu Clients, Linux Clients, Mac OS X Clients
                License.MANAGEDCLIENTS: '(&%s)' % ''.join([LDAP_FILTER_managedclients, ldap_filter_not_objectflag(managedclient_exclude_objectflags)]),
                License.CORPORATECLIENTS: '(&(objectclass=univentionCorporateClient))',
            },
        }
        self.__selected = False

    def _load_license_via_c_module(self, module):
        """Select the license for `module` via :py:func:`univention.license.select`.

        :returns: the value returned by that call; callers treat ``0`` as success.
        """
        return univention.license.select(module)

    def _load_license_via_python(self, module, lo):
        """Search the license object for `module` in LDAP and remember the result.

        Stores the search result in ``self.searchResult`` and, if anything was
        found, sets ``self.version`` from ``univentionLicenseVersion``
        (default ``'1'``).
        """
        # Try to set the version even if the license load was not successful
        self.searchResult = lo.authz_connection.search(filter=filter_format('(&(objectClass=univentionLicense)(univentionLicenseModule=%s))', [module]))
        if self.searchResult:
            self.version = self.searchResult[0][1].get('univentionLicenseVersion', [b'1'])[0].decode('ASCII')

    def select(self, module, lo=None):
        """Load and evaluate the license for `module` once.

        Subsequent calls after a successful first call are no-ops.

        :param module: license module name.
        :param lo: LDAP connection; used for the LDAP fallback and for counting.
        :raises univention.admin.uexceptions.licenseNotFound: error code -1
        :raises univention.admin.uexceptions.licenseExpired: error code 2
        :raises univention.admin.uexceptions.licenseWrongBaseDn: error code 4
        :raises univention.admin.uexceptions.licenseInvalid: any other non-zero code
        """
        if self.__selected:
            return
        self.error = self._load_license_via_c_module(module)
        if self.error != 0 and lo:
            self._load_license_via_python(module, lo)
        self.set_values(lo, module)
        self.__raiseException()
        # only reached when no exception was raised
        self.__selected = True

    def isValidFor(self, module):
        """Return whether the license types permit using the UDM `module`.

        Modules not listed in ``licenses.modules`` are always valid.
        """
        log.debug('check license for module', type=module, types=repr(self.types))
        if module in licenses.modules:
            mlics = licenses.modules[module]
            log.debug('module for license', type=mlics)
            # empty list -> valid
            return mlics.valid(self.types)
        # unknown modules are always valid (e.g. customer modules)
        return True

    def modifyOptions(self, mod):
        """Apply license dependent ``disabled``/``default`` settings to the options of `mod`.

        Does nothing for modules that are not listed in ``licenses.modules``,
        that define no license options, or that have no ``options`` attribute.
        """
        # collections.Sequence was removed in Python 3.10; the ABC lives in collections.abc
        from collections.abc import Sequence

        if mod not in licenses.modules:
            return
        opts = licenses.modules[mod].options(self.types)
        if not opts:
            return
        module = univention.admin.modules.modules[mod]
        if not module or not hasattr(module, 'options'):
            return

        log.debug('modify options for license', options=opts)
        for opt, val in opts:
            if callable(val):
                val = val(self)
            # NOTE(review): the log call is placed inside this branch; the
            # collapsed original does not show whether it ran for every option.
            if isinstance(val, Sequence):
                module.options[opt].disabled, module.options[opt].default = val
                log.debug('modify options', options=opt, disabled=module.options[opt].disabled, default=module.options[opt].default)

    def checkModules(self):
        """Remove UDM modules without a valid license and restrict operations.

        * modules for which :py:meth:`isValidFor` is false are deleted from
          ``univention.admin.modules.modules`` and from all ``childmodules`` lists;
        * options of the remaining modules are adjusted via :py:meth:`modifyOptions`;
        * if the license is marked as expired, the operations ``add`` and
          ``edit`` are removed from every module.
        """
        modules = univention.admin.modules.modules
        deleted_mods = []
        # iterate over a snapshot: entries are deleted from the dict inside the loop
        for mod in list(modules):
            # remove module if valid license is missing
            if self.isValidFor(mod):
                log.debug('update: License is valid!!', type=mod)
                # check module options according to given license type
                self.modifyOptions(mod)
            else:
                log.debug('update: License is NOT valid!!', type=mod)
                del modules[mod]
                deleted_mods.append(mod)

        # remove child modules that were deleted because of an invalid license
        for mod in modules.values():
            if hasattr(mod, 'childmodules'):
                mod.childmodules = [child for child in mod.childmodules if child not in deleted_mods]

        # remove operations for adding or modifying if license is expired
        if self._expired:
            for mod in modules.values():
                if not hasattr(mod, 'operations'):
                    continue
                for operation in ('add', 'edit'):
                    try:
                        mod.operations.remove(operation)
                    except (ValueError, KeyError, AttributeError):
                        # best effort: operation not present or container
                        # does not support removal
                        pass

    def __cmp(self, x, y):
        """
        Replacement for built-in function cmp that was removed in Python 3

        Compare the two objects x and y and return an integer according to
        the outcome. The return value is negative if x < y, zero if x == y
        and strictly positive if x > y.
        """
        return (x > y) - (x < y)

    def __cmp_gt(self, val1, val2):
        """Return True if `val1` is greater than `val2` according to :py:meth:`compare`."""
        return self.compare(val1, val2) == 1

    def __cmp_eq(self, val1, val2):
        """Return True if `val1` equals `val2` according to :py:meth:`compare`."""
        return self.compare(val1, val2) == 0

    def compare(self, val1, val2):
        """Three-way compare two limits/counts, treating ``'unlimited'`` as the largest value.

        Values other than ``'unlimited'`` are converted with :py:func:`int`.

        :returns: ``1`` if `val1` > `val2`, ``0`` if equal, ``-1`` otherwise.
        """
        if val1 == 'unlimited' and val2 == 'unlimited':
            return 0
        if val1 == 'unlimited':
            return 1
        if val2 == 'unlimited':
            return -1
        return self.__cmp(int(val1), int(val2))

    def set_values(self, lo, module):
        """Read the license, count the objects in LDAP and evaluate the limits.

        :param lo: LDAP connection used for counting.
        :param module: unused here; kept for interface compatibility.
        :returns: the ``disable_add`` code (see class documentation).
        """
        self.__readLicense()
        disable_add = 0
        self.__countSysAccounts(lo)

        if self.new_license:
            lic = None
            real = None
            if self.version == '1':
                kinds = (License.ACCOUNT, License.CLIENT, License.DESKTOP, License.GROUPWARE)
                for kind in kinds:
                    self.__countObject(kind, lo)
                lic = tuple(self.licenses[self.version][kind] for kind in kinds)
                real = tuple(self.real[self.version][kind] for kind in kinds)
            elif self.version == '2':
                kinds = (License.USERS, License.SERVERS, License.MANAGEDCLIENTS, License.CORPORATECLIENTS)
                for kind in kinds:
                    self.__countObject(kind, lo)
                lic = tuple(self.licenses[self.version][kind] for kind in kinds)
                real = tuple(self.real[self.version][kind] for kind in kinds)
                # NOTE(review): these three attributes are read in the
                # version-2 branch only - confirm against the upstream layout.
                self.licenseKeyID = self.__getValue('univentionLicenseKeyID', '')
                self.licenseSupport = self.__getValue('univentionLicenseSupport', '0')
                self.licensePremiumSupport = self.__getValue('univentionLicensePremiumSupport', '0')

            disable_add = self.checkObjectCounts(lic, real)
            self.licenseBase = self.__getValue('univentionLicenseBaseDN', '')
            if disable_add:
                self._expired = True
            elif self.licenseBase in ('Free for personal use edition', 'UCS Core Edition'):
                disable_add = 5

        # check modules list for validity and accepted operations
        self.checkModules()

        return disable_add

    def init_select(self, lo, module):
        """Run :py:meth:`select` and return the result of a fresh :py:meth:`set_values`."""
        self.select(module, lo)
        return self.set_values(lo, module)

    def checkObjectCounts(self, lic, real):
        """Compare the counted objects with the licensed limits.

        :param lic: 4-tuple of licensed limits in the order of the license
            version (``self.version``); falsy entries are not checked.
        :param real: 4-tuple of counted objects in the same order.
        :returns: the ``disable_add`` code; later checks overwrite earlier
            ones. ``0`` if nothing is exceeded or the version is unknown.
        """
        disable_add = 0
        if self.version == '1':
            lic_account, lic_client, lic_desktop, lic_groupware = lic
            real_account, real_client, real_desktop, real_groupware = real

            def accounts_exceeded():
                # system accounts (at least SYSACCOUNTS) do not count; evaluated lazily
                used = int(real_account) - max(License.SYSACCOUNTS, self.sysAccountsFound)
                return self.__cmp_gt(used, lic_account)

            if lic_client and lic_account:
                if self.__cmp_gt(lic_account, lic_client) and self.__cmp_gt(real_client, lic_client):
                    disable_add = 1
                elif self.__cmp_gt(lic_client, lic_account) and accounts_exceeded():
                    disable_add = 2
                elif self.__cmp_eq(lic_client, lic_account):
                    if self.__cmp_gt(real_client, lic_client):
                        disable_add = 1
                    elif accounts_exceeded():
                        disable_add = 2
            else:
                if lic_client and self.__cmp_gt(real_client, lic_client):
                    disable_add = 1
                if lic_account and accounts_exceeded():
                    disable_add = 2

            if lic_desktop and real_desktop and self.__cmp_gt(real_desktop, lic_desktop):
                log.trace('Code 3')
                disable_add = 3
            if lic_groupware and real_groupware and self.__cmp_gt(real_groupware, lic_groupware):
                log.trace('Code 4')
                disable_add = 4
        elif self.version == '2':
            (
                lic_users,
                _lic_servers,
                lic_managedclients,
                lic_corporateclients,
            ) = lic
            (
                real_users,
                _real_servers,
                real_managedclients,
                real_corporateclients,
            ) = real
            if lic_users and self.__cmp_gt(int(real_users) - self.sysAccountsFound, lic_users):
                disable_add = 6
            # The license should be valid even if we have more servers than
            # the license allowed, so servers are deliberately not checked
            # (code 7 is unused).
            if lic_managedclients and self.__cmp_gt(real_managedclients, lic_managedclients):
                disable_add = 8
            if lic_corporateclients and self.__cmp_gt(real_corporateclients, lic_corporateclients):
                disable_add = 9
        return disable_add

    def __countSysAccounts(self, lo):
        """Count the accounts matching ``self.sysAccountNames`` and store the number in ``self.sysAccountsFound``."""
        version = self.version
        if version not in self.licenses:
            version = '2'
        userfilter = [univention.admin.filter.expression('uid', account) for account in self.sysAccountNames]
        # License.USERS and License.ACCOUNT share the same key, so this also
        # selects the account filter for version 1.
        ldap_filter = univention.admin.filter.conjunction('&', [univention.admin.filter.conjunction('|', userfilter), self.filters[version][License.USERS]])
        try:
            self.sysAccountsFound = len(lo.authz_connection.searchDn(filter=str(ldap_filter)))
        except univention.admin.uexceptions.noObject:
            # keep the previous count
            pass
        log.debug('Univention sysAccountsFound', count=self.sysAccountsFound)

    def __countObject(self, obj, lo):
        """Count the LDAP objects of kind `obj` and store the number in ``self.real``.

        Nothing is searched (the count is set to 0) if the limit is unset or ``'unlimited'``.
        """
        version = self.version
        if version not in self.licenses:
            version = '2'
        limit = self.licenses[version][obj]
        if not limit or limit == 'unlimited':
            self.real[version][obj] = 0
            return
        result = lo.authz_connection.searchDn(filter=self.filters[version][obj])
        self.real[version][obj] = 0 if result is None else len(result)
        log.debug('Univention License', license=self.names[version][obj], count=self.real[version][obj])

    def __raiseException(self):
        """Translate the non-zero error code in ``self.error`` into an exception."""
        if self.error == 0:
            return
        if self.error == -1:
            raise univention.admin.uexceptions.licenseNotFound()
        if self.error == 2:
            raise univention.admin.uexceptions.licenseExpired()
        if self.error == 4:
            raise univention.admin.uexceptions.licenseWrongBaseDn()
        raise univention.admin.uexceptions.licenseInvalid()

    def __getValue(self, key, default, name='', errormsg=''):
        """Return the license attribute `key`.

        The value is read via :py:func:`univention.license.getValue`. If that
        raises, the value is taken from ``self.searchResult``; bytes values are
        decoded as ASCII and, unless `default` is a list, only the first value
        is returned. If there is no search result either, `default` is returned.

        :param key: license attribute name.
        :param default: fallback value; a list default requests a list result.
        :param name: name used in log messages (defaults to `key`).
        :param errormsg: message logged when no value could be obtained.
        """
        name = name or key
        try:
            value = univention.license.getValue(key)
            self.new_license = True
            log.debug('Univention License allowed', license=name, value=value)
        except Exception:
            if self.searchResult:
                # A list default is already multi-valued; wrapping it again
                # would yield a nested list when the attribute is missing.
                fallback = default if isinstance(default, list) else [default]
                value = self.searchResult[0][1].get(key, fallback)
                value = [x.decode('ASCII') if isinstance(x, bytes) else x for x in value]
                if not isinstance(default, list):
                    value = value[0]
                self.new_license = True
            else:
                log.debug('get value failed', license=name, error=errormsg)
                value = default
        log.debug('get license value:', license=name, value=value)
        return value

    def __readLicense(self):
        """Read version, limits and license types into the instance attributes."""
        self.version = self.__getValue('univentionLicenseVersion', '1', 'Version', None)
        limits = self.licenses.get(self.version)
        keys = self.keys.get(self.version)

        if self.version == '1':
            limits[License.ACCOUNT] = self.__getValue(keys[License.ACCOUNT], None, 'Accounts', 'Univention Accounts not found')
            limits[License.CLIENT] = self.__getValue(keys[License.CLIENT], None, 'Clients', 'Univention Clients not found')
            limits[License.DESKTOP] = self.__getValue(keys[License.DESKTOP], 2, 'Desktops', 'Univention Desktops not found')
            limits[License.GROUPWARE] = self.__getValue(keys[License.GROUPWARE], 2, 'Groupware Accounts', 'Groupware not found')
            # if no type field is found it must be an old UCS license (<=1.3-0)
            self.types = self.__getValue('univentionLicenseType', ['UCS'], 'License Type', 'Type attribute not found')
            if not isinstance(self.types, list | tuple):
                self.types = [self.types]
            self.types = list(self.types)
            # handle license type "OXAE" the same way as license type "UCS"
            if 'OXAE' in self.types and 'UCS' not in self.types:
                self.types.append('UCS')
        elif self.version == '2':
            limits[License.USERS] = self.__getValue(keys[License.USERS], None, 'Users', 'Users not found')
            limits[License.SERVERS] = self.__getValue(keys[License.SERVERS], None, 'Servers', 'Servers not found')
            limits[License.MANAGEDCLIENTS] = self.__getValue(
                keys[License.MANAGEDCLIENTS],
                None,
                'Managed Clients',
                'Managed Clients not found',
            )
            limits[License.CORPORATECLIENTS] = self.__getValue(
                keys[License.CORPORATECLIENTS],
                None,
                'Corporate Clients',
                'Corporate Clients not found',
            )
            self.types = self.__getValue('univentionLicenseProduct', ['Univention Corporate Server'], 'License Product', 'Product attribute not found')
            if not isinstance(self.types, list | tuple):
                self.types = [self.types]
            self.types = list(self.types)
            self.oemProductTypes = self.__getValue('univentionLicenseOEMProduct', [], 'License Type', 'univentionLicenseOEMProduct attribute not found')
            if not isinstance(self.oemProductTypes, list | tuple):
                self.oemProductTypes = [self.oemProductTypes]
            self.types.extend(self.oemProductTypes)
            # NOTE(review): the end date is read in the version-2 branch only -
            # confirm against the upstream layout.
            self.endDate = self.__getValue('univentionLicenseEndDate', None, 'License end date', 'univentionLicenseEndDate attribute not found')
# The single License instance of this module. It must be created here, while
# `_license` is still None, because License.__init__ raises once it is set.
_license = License()

# for compatibility: module-level aliases bound to the singleton's methods
select = _license.select
init_select = _license.init_select
is_valid_for = _license.isValidFor