Source code for univention.admin.license

# -*- coding: utf-8 -*-
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

"""
|UDM| wrapper around :py:mod:`univention.license` that translates error codes to exceptions
"""

import collections
from ldap.filter import filter_format
import univention.license
import univention.debug as ud
import univention.admin.modules
import univention.admin.filter
import univention.admin.uexceptions
import univention.admin.localization
import univention.admin.license_data as licenses
from univention.admin._ucr import configRegistry
from univention.lib.misc import custom_username

translation = univention.admin.localization.translation('univention/admin')
_ = translation.translate

_license = None

LDAP_FILTER_normal_user_account = '(univentionObjectType=users/user)'
LDAP_FILTER_account_not_disabled = '(!(&(shadowExpire=1)(krb5KDCFlags:1.2.840.113556.1.4.803:=128)(|(sambaAcctFlags=[UD       ])(sambaAcctFlags=[ULD       ]))))'
LDAP_FILTER_managedclients = '(|(objectClass=univentionWindows)(objectclass=univentionUbuntuClient)(objectClass=univentionLinuxClient)(objectClass=univentionCorporateClient)(objectClass=univentionMacOSClient))'


[docs]def ldap_filter_not_objectflag(flag_string_list): ldap_filter_parts = [] for flag_string in flag_string_list: ldap_filter_parts.append(filter_format('(univentionObjectFlag=%s)', [flag_string])) if not ldap_filter_parts: return '' elif len(ldap_filter_parts) == 1: return '(!%s)' % ''.join(ldap_filter_parts) else: return '(!(|%s))' % ''.join(ldap_filter_parts)
user_exclude_objectflags = ['temporary', 'functional', 'hidden'] managedclient_exclude_objectflags = [] if configRegistry.is_true('ad/member'): user_exclude_objectflags.append('synced') managedclient_exclude_objectflags.append('synced')
[docs]class License(object): (ACCOUNT, CLIENT, DESKTOP, GROUPWARE) = range(4) (USERS, SERVERS, MANAGEDCLIENTS, CORPORATECLIENTS, VIRTUALDESKTOPUSERS, VIRTUALDESKTOPCLIENTS) = range(6) SYSACCOUNTS = 5 def __init__(self): if _license: raise Exception('never create this object directly') self.new_license = False self.disable_add = 0 self._expired = False self.endDate = None self.oemProductTypes = [] self.licenseBase = None self.types = [] self.version = '1' self.searchResult = None self.sysAccountNames = ( custom_username('Administrator'), 'ucs-sso', 'join-backup', 'join-slave', 'spam', 'oxadmin', 'krbtgt', 'pcpatch', # opsi app 'opsiconfd', # opsi app custom_username('Guest'), 'dns-*', 'http-%s' % configRegistry.get('hostname'), 'http-proxy-%s' % configRegistry.get('hostname'), 'zarafa-%s' % configRegistry.get('hostname'), custom_username('SBSMonAcct'), # SBS account custom_username('Network Administrator'), # SBS role custom_username('Standard User'), # SBS role custom_username('WebWorkplaceTools'), # SBS role "Standard User with administration links" 'IUSR_WIN-*', # IIS account ) self.sysAccountsFound = 0 self.licenses = { '1': { # Version 1 till UCS 3.1 License.ACCOUNT: None, License.CLIENT: None, License.DESKTOP: None, License.GROUPWARE: None, }, '2': { # Version 2 since UCS 3.1 License.USERS: None, License.SERVERS: None, License.MANAGEDCLIENTS: None, License.CORPORATECLIENTS: None, }, } self.real = { '1': { # Version 1 till UCS 3.1 License.ACCOUNT: 0, License.CLIENT: 0, License.DESKTOP: 0, License.GROUPWARE: 0, }, '2': { # Version 2 since UCS 3.1 License.USERS: 0, License.SERVERS: 0, License.MANAGEDCLIENTS: 0, License.CORPORATECLIENTS: 0, }, } self.names = { '1': { # Version 1 till UCS 3.1 License.ACCOUNT: 'Accounts', License.CLIENT: 'Clients', License.DESKTOP: 'Desktops', License.GROUPWARE: 'Groupware Accounts', }, '2': { # Version 2 since UCS 3.1 License.USERS: 'Users', License.SERVERS: 'Servers', License.MANAGEDCLIENTS: 'Managed Clients', License.CORPORATECLIENTS: 'Corporate Clients', }, } self.keys = { '1': { # Version 1 till UCS 3.1 License.ACCOUNT: 'univentionLicenseAccounts', License.CLIENT: 'univentionLicenseClients', License.DESKTOP: 'univentionLicenseuniventionDesktops', License.GROUPWARE: 'univentionLicenseGroupwareAccounts' }, '2': { # Version 1 till UCS 3.1 License.USERS: 'univentionLicenseUsers', License.SERVERS: 'univentionLicenseServers', License.MANAGEDCLIENTS: 'univentionLicenseManagedClients', License.CORPORATECLIENTS: 'univentionLicenseCorporateClients', }, } self.filters = { '1': { # Version 1 till UCS 3.1 License.ACCOUNT: '(&(|(&(objectClass=posixAccount)(objectClass=shadowAccount))(objectClass=sambaSamAccount))(!(uidNumber=0))(!(uid=*$))(!(&(shadowExpire=1)(krb5KDCFlags=254)(|(sambaAcctFlags=[UD ])(sambaAcctFlags=[ULD ])))))', License.CLIENT: '(|(objectClass=univentionThinClient)(objectClass=univentionClient)(objectClass=univentionMobileClient)(objectClass=univentionWindows)(objectClass=univentionMacOSClient))', License.DESKTOP: '(|(objectClass=univentionThinClient)(&(objectClass=univentionClient)(objectClass=posixAccount))(objectClass=univentionMobileClient))', License.GROUPWARE: '(&(objectclass=kolabInetOrgPerson)(kolabHomeServer=*)(!(&(shadowExpire=1)(krb5KDCFlags=254)(|(sambaAcctFlags=[UD ])(sambaAcctFlags=[ULD ])))))', }, '2': { # Version 2 since UCS 3.1 License.USERS: '(&%s)' % ''.join([LDAP_FILTER_normal_user_account, ldap_filter_not_objectflag(user_exclude_objectflags), LDAP_FILTER_account_not_disabled]), License.SERVERS: '(&(|(objectClass=univentionDomainController)(objectClass=univentionMemberServer))(!(univentionObjectFlag=docker)))', # Managed Clients, Windows Clients, Ubuntu Clients, Linux Clients, MaxOS X Clients License.MANAGEDCLIENTS: '(&%s)' % ''.join([LDAP_FILTER_managedclients, ldap_filter_not_objectflag(managedclient_exclude_objectflags)]), License.CORPORATECLIENTS: '(&(objectclass=univentionCorporateClient))', }, } self.__selected = False def _load_license_via_c_module(self, module): return univention.license.select(module) def _load_license_via_python(self, module, lo): # Try to set the version even if the license load was not successful self.searchResult = lo.search(filter=filter_format('(&(objectClass=univentionLicense)(univentionLicenseModule=%s))', [module])) if self.searchResult: self.version = self.searchResult[0][1].get('univentionLicenseVersion', [b'1'])[0].decode('ASCII')
[docs] def select(self, module, lo=None): if not self.__selected: self.error = self._load_license_via_c_module(module) if self.error != 0 and lo: self._load_license_via_python(module, lo) self.set_values(lo, module) self.__raiseException() self.__selected = True
[docs] def isValidFor(self, module): ud.debug(ud.ADMIN, ud.INFO, 'LICENSE: check license for module %s, %r' % (module, self.types)) if module in licenses.modules: mlics = licenses.modules[module] ud.debug(ud.ADMIN, ud.INFO, 'LICENSE: module license: %r' % (mlics,)) # empty list -> valid return mlics.valid(self.types) # unknown modules are always valid (e.g. customer modules) return True
[docs] def modifyOptions(self, mod): if mod in licenses.modules: opts = licenses.modules[mod].options(self.types) if opts: module = univention.admin.modules.modules[mod] if module and hasattr(module, 'options'): ud.debug(ud.ADMIN, ud.INFO, 'modifyOptions: %r' % (opts,)) for opt, val in opts: if callable(val): val = val(self) if isinstance(val, collections.Sequence): module.options[opt].disabled, module.options[opt].default = val ud.debug(ud.ADMIN, ud.INFO, 'modifyOption: %s, %d, %d' % (opt, module.options[opt].disabled, module.options[opt].default))
[docs] def checkModules(self): deleted_mods = [] for mod in univention.admin.modules.modules.keys(): # remove module if valid license is missing if self.isValidFor(mod): ud.debug(ud.ADMIN, ud.INFO, 'update: License is valid for module %s!!' % (mod,)) # check module options according to given license type self.modifyOptions(mod) else: ud.debug(ud.ADMIN, ud.INFO, 'update: License is NOT valid for module %s!!' % (mod,)) del univention.admin.modules.modules[mod] deleted_mods.append(mod) # remove child modules that were deleted because of an invalid license for name, mod in univention.admin.modules.modules.items(): if hasattr(mod, 'childmodules'): new = [] for child in mod.childmodules: if child in deleted_mods: continue new.append(child) mod.childmodules = new # remove operations for adding or modifying if license is expired if self._expired: for name, mod in univention.admin.modules.modules.items(): if hasattr(mod, 'operations'): try: mod.operations.remove('add') except Exception: pass try: mod.operations.remove('edit') except Exception: pass
def __cmp(self, x, y): """ Replacement for built-in function cmp that was removed in Python 3 Compare the two objects x and y and return an integer according to the outcome. The return value is negative if x < y, zero if x == y and strictly positive if x > y. """ return (x > y) - (x < y) def __cmp_gt(self, val1, val2): return self.compare(val1, val2) == 1 def __cmp_eq(self, val1, val2): return self.compare(val1, val2) == 0
[docs] def compare(self, val1, val2): if val1 == 'unlimited' and val2 == 'unlimited': return 0 if val1 == 'unlimited': return 1 if val2 == 'unlimited': return -1 return self.__cmp(int(val1), int(val2))
[docs] def set_values(self, lo, module): self.__readLicense() disable_add = 0 self.__countSysAccounts(lo) if self.new_license: lic = None real = None if self.version == '1': self.__countObject(License.ACCOUNT, lo) self.__countObject(License.CLIENT, lo) self.__countObject(License.DESKTOP, lo) self.__countObject(License.GROUPWARE, lo) lic = ( self.licenses[self.version][License.ACCOUNT], self.licenses[self.version][License.CLIENT], self.licenses[self.version][License.DESKTOP], self.licenses[self.version][License.GROUPWARE]) real = ( self.real[self.version][License.ACCOUNT], self.real[self.version][License.CLIENT], self.real[self.version][License.DESKTOP], self.real[self.version][License.GROUPWARE]) elif self.version == '2': self.__countObject(License.USERS, lo) self.__countObject(License.SERVERS, lo) self.__countObject(License.MANAGEDCLIENTS, lo) self.__countObject(License.CORPORATECLIENTS, lo) lic = ( self.licenses[self.version][License.USERS], self.licenses[self.version][License.SERVERS], self.licenses[self.version][License.MANAGEDCLIENTS], self.licenses[self.version][License.CORPORATECLIENTS]) real = ( self.real[self.version][License.USERS], self.real[self.version][License.SERVERS], self.real[self.version][License.MANAGEDCLIENTS], self.real[self.version][License.CORPORATECLIENTS]) self.licenseKeyID = self.__getValue('univentionLicenseKeyID', '') self.licenseSupport = self.__getValue('univentionLicenseSupport', '0') self.licensePremiumSupport = self.__getValue('univentionLicensePremiumSupport', '0') disable_add = self.checkObjectCounts(lic, real) self.licenseBase = self.__getValue('univentionLicenseBaseDN', '') if disable_add: self._expired = True elif not disable_add and self.licenseBase in ('Free for personal use edition', 'UCS Core Edition'): disable_add = 5 # check modules list for validity and accepted operations self.checkModules() return disable_add
[docs] def init_select(self, lo, module): self.select(module, lo) return self.set_values(lo, module)
[docs] def checkObjectCounts(self, lic, real): disable_add = 0 if self.version == '1': lic_account, lic_client, lic_desktop, lic_groupware = lic real_account, real_client, real_desktop, real_groupware = real if lic_client and lic_account: if self.__cmp_gt(lic_account, lic_client) and self.__cmp_gt(real_client, lic_client): disable_add = 1 elif self.__cmp_gt(lic_client, lic_account) and self.__cmp_gt(int(real_account) - max(License.SYSACCOUNTS, self.sysAccountsFound), lic_account): disable_add = 2 elif self.__cmp_eq(lic_client, lic_account): if self.__cmp_gt(real_client, lic_client): disable_add = 1 elif self.__cmp_gt(int(real_account) - max(License.SYSACCOUNTS, self.sysAccountsFound), lic_account): disable_add = 2 else: if lic_client and self.__cmp_gt(real_client, lic_client): disable_add = 1 if lic_account and self.__cmp_gt(int(real_account) - max(License.SYSACCOUNTS, self.sysAccountsFound), lic_account): disable_add = 2 if lic_desktop: if real_desktop and self.__cmp_gt(real_desktop, lic_desktop): ud.debug(ud.ADMIN, ud.INFO, 'LICENSE: 3') disable_add = 3 if lic_groupware: if real_groupware and self.__cmp_gt(real_groupware, lic_groupware): ud.debug(ud.ADMIN, ud.INFO, 'LICENSE: 4') disable_add = 4 elif self.version == '2': lic_users, lic_servers, lic_managedclients, lic_corporateclients, = lic real_users, real_servers, real_managedclients, real_corporateclients, = real if lic_users and self.__cmp_gt(int(real_users) - self.sysAccountsFound, lic_users): disable_add = 6 # The license should be valid even if we have more servers than the license allowed # if lic_servers and self.__cmp_gt( real_servers, lic_servers ): # disable_add = 7 if lic_managedclients and self.__cmp_gt(real_managedclients, lic_managedclients): disable_add = 8 if lic_corporateclients and self.__cmp_gt(real_corporateclients, lic_corporateclients): disable_add = 9 return disable_add
def __countSysAccounts(self, lo): version = self.version if version not in self.licenses: version = '2' if self.licenses[version][License.USERS] == 'unlimited': self.sysAccountsFound = 0 return userfilter = [univention.admin.filter.expression('uid', account) for account in self.sysAccountNames] filter = univention.admin.filter.conjunction('&', [ univention.admin.filter.conjunction('|', userfilter), self.filters[version][License.USERS]]) try: self.sysAccountsFound = len(lo.searchDn(filter=str(filter))) except univention.admin.uexceptions.noObject: pass ud.debug(ud.ADMIN, ud.INFO, 'LICENSE: Univention sysAccountsFound: %d' % (self.sysAccountsFound,)) def __countObject(self, obj, lo): version = self.version if version not in self.licenses: version = '2' if self.licenses[version][obj] and not self.licenses[version][obj] == 'unlimited': result = lo.searchDn(filter=self.filters[version][obj]) if result is None: self.real[version][obj] = 0 else: self.real[version][obj] = len(result) ud.debug(ud.ADMIN, ud.INFO, 'LICENSE: Univention %s real %d' % (self.names[version][obj], self.real[version][obj])) else: self.real[version][obj] = 0 def __raiseException(self): if self.error != 0: if self.error == -1: raise univention.admin.uexceptions.licenseNotFound() elif self.error == 2: raise univention.admin.uexceptions.licenseExpired() elif self.error == 4: raise univention.admin.uexceptions.licenseWrongBaseDn() else: raise univention.admin.uexceptions.licenseInvalid() def __getValue(self, key, default, name='', errormsg=''): name = name or key try: value = univention.license.getValue(key) self.new_license = True ud.debug(ud.ADMIN, ud.INFO, 'LICENSE: Univention %r allowed %r' % (name, value)) except (KeyError, Exception): if self.searchResult: value = self.searchResult[0][1].get(key, [default]) value = [x.decode('ASCII') if isinstance(x, bytes) else x for x in value] if not isinstance(default, list): value = value[0] self.new_license = True else: ud.debug(ud.ADMIN, ud.INFO, 'LICENSE: %r: %s' % (name, errormsg,)) value = default ud.debug(ud.ADMIN, ud.INFO, 'LICENSE: %r = %r' % (name, value)) return value def __readLicense(self): self.version = self.__getValue('univentionLicenseVersion', '1', 'Version', None) if self.version == '1': self.licenses[self.version][License.ACCOUNT] = self.__getValue(self.keys[self.version][License.ACCOUNT], None, 'Accounts', 'Univention Accounts not found') self.licenses[self.version][License.CLIENT] = self.__getValue(self.keys[self.version][License.CLIENT], None, 'Clients', 'Univention Clients not found') self.licenses[self.version][License.DESKTOP] = self.__getValue(self.keys[self.version][License.DESKTOP], 2, 'Desktops', 'Univention Desktops not found') self.licenses[self.version][License.GROUPWARE] = self.__getValue(self.keys[self.version][License.GROUPWARE], 2, 'Groupware Accounts', 'Groupware not found') # if no type field is found it must be an old UCS license (<=1.3-0) self.types = self.__getValue('univentionLicenseType', ['UCS'], 'License Type', 'Type attribute not found') if not isinstance(self.types, (list, tuple)): self.types = [self.types] self.types = list(self.types) # handle license type "OXAE" the same way as license type "UCS" if 'OXAE' in self.types and 'UCS' not in self.types: self.types.append('UCS') elif self.version == '2': self.licenses[self.version][License.USERS] = self.__getValue(self.keys[self.version][License.USERS], None, 'Users', 'Users not found') self.licenses[self.version][License.SERVERS] = self.__getValue(self.keys[self.version][License.SERVERS], None, 'Servers', 'Servers not found') self.licenses[self.version][License.MANAGEDCLIENTS] = self.__getValue(self.keys[self.version][License.MANAGEDCLIENTS], None, 'Managed Clients', 'Managed Clients not found') self.licenses[self.version][License.CORPORATECLIENTS] = self.__getValue(self.keys[self.version][License.CORPORATECLIENTS], None, 'Corporate Clients', 'Corporate Clients not found') self.types = self.__getValue('univentionLicenseProduct', ['Univention Corporate Server'], 'License Product', 'Product attribute not found') if not isinstance(self.types, (list, tuple)): self.types = [self.types] self.types = list(self.types) self.oemProductTypes = self.__getValue('univentionLicenseOEMProduct', [], 'License Type', 'univentionLicenseOEMProduct attribute not found') if not isinstance(self.oemProductTypes, (list, tuple)): self.oemProductTypes = [self.oemProductTypes] self.types.extend(self.oemProductTypes) self.endDate = self.__getValue('univentionLicenseEndDate', None, 'License end date', 'univentionLicenseEndDate attribute not found')
_license = License() # for compatibility select = _license.select init_select = _license.init_select is_valid_for = _license.isValidFor