Source code for univention.admin.handlers.groups.group

# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

"""|UDM| module for groups"""

from __future__ import annotations

import copy
import time

import ldap
from ldap.filter import filter_format

import univention.admin
import univention.admin.allocators
import univention.admin.filter
import univention.admin.handlers
import univention.admin.localization
from univention.admin import configRegistry
from univention.admin.guardian_roles import member_role_layout, member_role_properties, register_member_role_mapping
from univention.admin.layout import Group, Tab
from univention.dn import DN


translation = univention.admin.localization.translation('univention.admin.handlers.groups')
_ = translation.translate

module = 'groups/group'
operations = ['add', 'edit', 'remove', 'search', 'move', 'copy']
childs = False
short_description = _('Group')
object_name = _('Group')
object_name_plural = _('Groups')
long_description = ''
# fmt: off
# Options of this module, keyed by option name. Each option declares the
# LDAP object classes associated with it; properties below reference these
# keys through their ``options=[...]`` argument.
options = {
    # Base option carrying the generic group object classes.
    'default': univention.admin.option(
        short_description=short_description,
        default=True,
        objectClasses=['top', 'univentionGroup'],
    ),
    # POSIX group data (object class ``posixGroup``), enabled by default.
    'posix': univention.admin.option(
        short_description=_('Posix group'),
        default=1,
        objectClasses=('posixGroup',),
    ),
    # Samba group data (object class ``sambaGroupMapping``), enabled by default.
    'samba': univention.admin.option(
        short_description=_('Samba group'),
        default=1,
        objectClasses=('sambaGroupMapping',),
    ),
}

# UDM properties of a group, keyed by property name. ``options`` lists the
# module options (see ``options`` above) a property is bound to.
property_descriptions = {
    # The group name; it is the identifying property of the object.
    'name': univention.admin.property(
        short_description=_('Name'),
        long_description='',
        syntax=univention.admin.syntax.gid,
        include_in_default_search=True,
        required=True,
        identifies=True,
        readonly_when_synced=True,
    ),
    # Numeric group ID; declared as not changeable (``may_change=False``).
    'gidNumber': univention.admin.property(
        short_description=_('Group ID'),
        long_description='',
        syntax=univention.admin.syntax.integer,
        may_change=False,
        options=['posix', 'samba'],
    ),
    # Last component of the Samba SID; unmapped from ``sambaSID`` by unmapSambaRid().
    'sambaRID': univention.admin.property(
        short_description=_('Relative ID'),
        long_description='',
        syntax=univention.admin.syntax.integer,
        readonly_when_synced=True,
        options=['samba'],
    ),
    'sambaGroupType': univention.admin.property(
        short_description=_('Windows group type'),
        long_description='',
        syntax=univention.admin.syntax.sambaGroupType,
        default=('2', []),
        options=['samba'],
        copyable=True,
    ),
    'sambaPrivileges': univention.admin.property(
        short_description=_('Samba privilege'),
        long_description=_('Manage samba privileges'),
        syntax=univention.admin.syntax.SambaPrivileges,
        multivalue=True,
        options=['samba'],
        copyable=True,
    ),
    # The default value has bit 0x2 set, which the group type checks of this
    # module (``__is_groupType_global``) treat as "global".
    'adGroupType': univention.admin.property(
        short_description=_('AD group type'),
        long_description=_('Active Directory group type'),
        syntax=univention.admin.syntax.adGroupType,
        options=['samba'],
        default=('-2147483646', []),
        dontsearch=True,
        readonly_when_synced=True,
        copyable=True,
    ),
    'description': univention.admin.property(
        short_description=_('Description'),
        long_description='',
        syntax=univention.admin.syntax.string,
        include_in_default_search=True,
        options=['posix', 'samba'],
        readonly_when_synced=True,
        copyable=True,
    ),
    # 'users', 'hosts' and 'nestedGroup' are not mapped to LDAP attributes
    # directly: open() fills them from ``uniqueMember`` and _ldap_modlist()
    # writes them back as ``uniqueMember``/``memberUid`` changes.
    'users': univention.admin.property(
        short_description=_('Users'),
        long_description='',
        syntax=univention.admin.syntax.UserDN,
        multivalue=True,
        options=['posix'],
        dontsearch=True,
        readonly_when_synced=True,
        copyable=True,
    ),
    'hosts': univention.admin.property(
        short_description=_('Hosts'),
        long_description='',
        syntax=univention.admin.syntax.HostDN,
        multivalue=True,
        options=['posix'],
        license=['UGS', 'UCS'],
        dontsearch=True,
        readonly_when_synced=True,
        copyable=True,
    ),
    'mailAddress': univention.admin.property(
        short_description=_('Mail address'),
        long_description='',
        syntax=univention.admin.syntax.emailAddressValidDomain,
        include_in_default_search=True,
        options=['posix'],
        readonly_when_synced=True,
    ),
    # Groups this group is a member of; open() computes it by searching for
    # groups whose ``uniqueMember`` contains this object's DN.
    'memberOf': univention.admin.property(
        short_description=_('Member of'),
        long_description='',
        syntax=univention.admin.syntax.GroupDN,
        multivalue=True,
        options=['posix'],
        dontsearch=True,
        readonly_when_synced=True,
        copyable=True,
    ),
    # Groups that are members of this group.
    'nestedGroup': univention.admin.property(
        short_description=_('Groups'),
        long_description='',
        syntax=univention.admin.syntax.GroupDN,
        multivalue=True,
        options=['posix'],
        dontsearch=True,
        readonly_when_synced=True,
        copyable=True,
    ),
    'allowedEmailUsers': univention.admin.property(
        short_description=_('Users that are allowed to send e-mails to the group'),
        long_description='',
        syntax=univention.admin.syntax.UserDN,
        multivalue=True,
        options=['posix'],
        dontsearch=True,
        copyable=True,
    ),
    'allowedEmailGroups': univention.admin.property(
        short_description=_('Groups that are allowed to send e-mails to the group'),
        long_description='',
        syntax=univention.admin.syntax.GroupDN,
        multivalue=True,
        options=['posix'],
        dontsearch=True,
        copyable=True,
    ),
    # Declared as not changeable (``may_change=False``).
    'univentionSourceIAM': univention.admin.property(
        short_description=_('Immutable Identifier of the source IAM'),
        long_description=_('Immutable attribute referencing the name of the source IAM'),
        syntax=univention.admin.syntax.string,
        may_change=False,
        dontsearch=True,
    ),
}

# Merge in the properties provided by univention.admin.guardian_roles.
property_descriptions.update(member_role_properties())

# Arrangement of the properties in tabs and groups. Every tab except
# "General" and "Apps" is flagged ``advanced=True``.
layout = [
    Tab(_('General'), _('Basic settings'), layout=[
        Group(_('Group account'), layout=[
            ['name', 'description'],
        ]),
        Group(_('Members of this group'), layout=[
            'users',
            'nestedGroup',
        ]),
    ]),
    Tab(_('Mail'), _('Mail settings of this group'), advanced=True, layout=[
        'mailAddress',
        'allowedEmailUsers',
        'allowedEmailGroups',
    ]),
    Tab(_('Host members'), _('Host members of this group'), advanced=True, layout=[
        'hosts',
    ]),
    Tab(_('Member of'), _('Membership in other groups'), advanced=True, layout=[
        'memberOf',
    ]),
    Tab(_('Group ID'), _('ID of this group'), advanced=True, layout=[
        'gidNumber',
    ]),
    Tab(_('Windows'), _('Windows account settings'), advanced=True, layout=[
        'sambaRID',
        'sambaGroupType',
        'adGroupType',
        'sambaPrivileges',
    ]),
    Tab('Apps'),  # not translated!
]

# Append the layout element provided by univention.admin.guardian_roles.
layout.append(member_role_layout())
# fmt: on


def unmapSambaRid(oldattr):
    """Derive the relative ID (RID) from the ``sambaSID`` LDAP attribute.

    :param oldattr: LDAP attributes of the object (attribute name -> list of bytes values).
    :returns: the part after the last ``-`` of the first ``sambaSID`` value if
        it consists of digits only, otherwise ``None``.
    """
    raw_sid = oldattr.get('sambaSID', [b''])[0]
    _prefix, separator, candidate = raw_sid.decode('ASCII').rpartition('-')
    # no '-' at all, or a non-numeric last component: there is no RID
    if not separator or not candidate.isdigit():
        return None
    return candidate
# fmt: off mapping = univention.admin.mapping.mapping() mapping.register('name', 'cn', None, univention.admin.mapping.ListToString) mapping.register('gidNumber', 'gidNumber', None, univention.admin.mapping.ListToString, encoding='ASCII') mapping.register('description', 'description', None, univention.admin.mapping.ListToString) mapping.register('sambaGroupType', 'sambaGroupType', None, univention.admin.mapping.ListToString) mapping.register('mailAddress', 'mailPrimaryAddress', None, univention.admin.mapping.ListToString, encoding='ASCII') mapping.register('adGroupType', 'univentionGroupType', None, univention.admin.mapping.ListToString) mapping.register('sambaPrivileges', 'univentionSambaPrivilegeList', encoding='ASCII') mapping.register('allowedEmailUsers', 'univentionAllowedEmailUsers') mapping.register('allowedEmailGroups', 'univentionAllowedEmailGroups') mapping.registerUnmapping('sambaRID', unmapSambaRid) mapping.register('univentionSourceIAM', 'univentionSourceIAM', None, univention.admin.mapping.ListToString) register_member_role_mapping(mapping) # fmt: on
class AgingCache:
    """Singleton cache whose entries expire ``timeout`` seconds after being stored.

    Every call of ``AgingCache()`` returns the same instance. Entries are
    dictionaries stored under a hashable key; the expiry time is fixed when an
    entry is stored with :meth:`set`.
    """

    def __new__(cls, *args, **kwargs):
        # If no instance of this class exists yet, create one and keep it in
        # ``_the_instance``; that instance is returned on every later call.
        # ``super()`` is used instead of the global name ``object`` because
        # this module rebinds ``object`` to the UDM handler class further down.
        if '_the_instance' not in cls.__dict__:
            cls._the_instance = super().__new__(cls)
        return cls._the_instance

    def __init__(self) -> None:
        # __init__ runs on every AgingCache() call; initialise the shared
        # instance only once so existing entries are not dropped.
        if not hasattr(self, '_ready'):
            self._ready = True
            self.timeout = 300  # lifetime of new entries in seconds
            self.data = {}  # key -> cached dict
            self.timer = {}  # key -> expiry timestamp (time.time() based)

    def is_valid(self, item):
        """Return True if *item* is cached and not expired; expired entries are dropped."""
        if item not in self.timer:
            return False
        if self.timer.get(item, -1) > time.time():
            return True
        del self.timer[item]
        del self.data[item]
        return False

    def get(self, item):
        """Return the dict cached for *item*, or an empty dict if there is none."""
        return self.data.get(item, {})

    def set(self, item, data):
        """Store a deep copy of the dict *data* under *item*, valid for ``timeout`` seconds.

        :raises Exception: if *data* is not a dict.
        """
        if not isinstance(data, dict):
            raise Exception('AgingCache.set() requires a dict as data value')
        self.data[item] = copy.deepcopy(data)
        self.timer[item] = time.time() + self.timeout

    def remove(self, item):
        """Drop *item* from the cache if it is present."""
        if item in self.timer:
            del self.timer[item]
            del self.data[item]

    def set_timeout(self, timeout):
        """Set the lifetime in seconds applied to entries stored from now on."""
        # Assign ``timeout``, not ``timer``: ``timer`` is the per-entry expiry
        # dict and must not be replaced by a number.
        self.timeout = timeout


# Shared cache mapping a member DN to ``{'type': 'user' | 'group' | 'host'}``.
cache_uniqueMember = AgingCache()
class object(univention.admin.handlers.simpleLdap):
    """UDM handler for ``groups/group`` objects."""

    module = module

    def open(self) -> None:
        """Open the object and, for existing groups, load the membership properties.

        ``memberOf`` is filled with the DNs of all posix groups listing this
        group as ``uniqueMember``. The group's own ``uniqueMember`` values are
        sorted into ``users``, ``hosts`` and ``nestedGroup``.

        :raises RuntimeError: if the object classes of a member match none of the known member types.
        """
        univention.admin.handlers.simpleLdap.open(self)

        # Apply the configured lifetime to the module-level member type cache.
        # The attribute is assigned directly: it is the value AgingCache.set()
        # reads when it stamps a new entry.
        timeout_setting = configRegistry.get('directory/manager/web/modules/groups/group/caching/uniqueMember/timeout', '300')
        try:
            cache_uniqueMember.timeout = int(timeout_setting)
        except (TypeError, ValueError):
            # unparsable setting: keep the cache's current timeout
            pass

        if self.exists():
            self['memberOf'] = self.lo.authz_connection.searchDn(filter=filter_format('(&(objectClass=posixGroup)(uniqueMember=%s))', [self.dn]))

            time_start = time.time()

            self['users'] = []
            self['hosts'] = []
            self['nestedGroup'] = []
            for i in [x.decode('utf-8') for x in self.oldattr.get('uniqueMember', [])]:
                if cache_uniqueMember.is_valid(i):
                    # member type is known from a previous lookup
                    membertype = cache_uniqueMember.get(i).get('type')
                    if membertype == 'user':
                        self['users'].append(i)
                    elif membertype == 'group':
                        self['nestedGroup'].append(i)
                    elif membertype == 'host':
                        self['hosts'].append(i)
                elif i.startswith('uid='):
                    # DNs starting with "uid=" are taken as users without asking LDAP
                    self['users'].append(i)
                    cache_uniqueMember.set(i, {'type': 'user'})
                else:
                    # decide by the object classes of the member
                    result = self.lo.authz_connection.getAttr(i, 'objectClass')
                    if result:
                        if b'univentionGroup' in result:
                            self['nestedGroup'].append(i)
                            cache_uniqueMember.set(i, {'type': 'group'})
                        elif b'univentionHost' in result:
                            self['hosts'].append(i)
                            cache_uniqueMember.set(i, {'type': 'host'})
                        elif set(result) & {b'person', b'inetOrgPerson', b'organizationalPerson'}:
                            self['users'].append(i)
                        elif b'univentionUserTemplate' in result:
                            continue
                        else:
                            raise RuntimeError('%s not detected: %r' % (i, result))

            time_end = time.time()
            self.log.debug('groups/group: open(): member check duration: %1.2fs', time_end - time_start)

            self.save()

    def fast_member_add(self, memberdnlist, uidlist):
        """Add members directly in LDAP, skipping those already present.

        :param memberdnlist: DNs to add to ``uniqueMember``.
        :param uidlist: user names to add to ``memberUid``.
        :returns: the result of the LDAP modify call if something had to be added, otherwise ``False``.
        :raises univention.admin.uexceptions.noObject: on ``ldap.NO_SUCH_OBJECT``.
        :raises univention.admin.uexceptions.permissionDenied: on ``ldap.INSUFFICIENT_ACCESS``.
        :raises univention.admin.uexceptions.ldapError: on any other ``ldap.LDAPError``.
        """
        ml = []
        uids = set()
        members = set()
        searchResult = self.lo.authz_connection.get(self.dn, attr=['uniqueMember', 'memberUid'])
        if searchResult:
            # compare case-insensitively against the current LDAP state
            uids = {x.decode('UTF-8').lower() for x in searchResult.get('memberUid', [])}
            members = {x.decode('UTF-8').lower() for x in searchResult.get('uniqueMember', [])}

        add_uidlist = [uid for uid in uidlist if uid.lower() not in uids]
        if add_uidlist:
            ml.append(('memberUid', b'', [x.encode('UTF-8') for x in add_uidlist]))

        add_memberdnlist = [dn for dn in memberdnlist if dn.lower() not in members]
        if add_memberdnlist:
            ml.append(('uniqueMember', b'', [x.encode('UTF-8') for x in add_memberdnlist]))

        if ml:
            try:
                return self.lo.authz_connection.modify(self.dn, ml)
            except ldap.NO_SUCH_OBJECT:
                raise univention.admin.uexceptions.noObject(self.dn)
            except ldap.INSUFFICIENT_ACCESS:
                raise univention.admin.uexceptions.permissionDenied()
            except ldap.LDAPError as msg:
                raise univention.admin.uexceptions.ldapError(msg.args[0]['desc'])

        # return True if object has been modified
        return bool(ml)

    def fast_member_remove(self, memberdnlist, uidlist, ignore_license=False, _retry_on_attribute_error=True):
        """Remove members directly in LDAP, skipping those that are not present.

        :param memberdnlist: DNs to remove from ``uniqueMember``.
        :param uidlist: user names to remove from ``memberUid``.
        :param ignore_license: passed through to the LDAP modify call.
        :param _retry_on_attribute_error: internal; whether to retry once after ``ldap.NO_SUCH_ATTRIBUTE``.
        :returns: the result of the LDAP modify call if something had to be removed, otherwise ``False``.
        :raises univention.admin.uexceptions.noObject: on ``ldap.NO_SUCH_OBJECT``.
        :raises univention.admin.uexceptions.permissionDenied: on ``ldap.INSUFFICIENT_ACCESS``.
        :raises univention.admin.uexceptions.ldapError: on any other ``ldap.LDAPError``.
        """
        ml = []
        uids = {}
        members = set()
        searchResult = self.lo.authz_connection.get(self.dn, attr=['uniqueMember', 'memberUid'])
        if searchResult:
            # lower-cased uid -> uid as stored, so the stored spelling is removed
            uids = {x.decode('UTF-8').lower(): x.decode('UTF-8') for x in searchResult.get('memberUid', [])}
            members = {x.decode('UTF-8').lower() for x in searchResult.get('uniqueMember', [])}

        remove_uidlist = [uids[uid.lower()] for uid in uidlist if uid.lower() in uids]
        if remove_uidlist:
            ml.append(('memberUid', [x.encode('UTF-8') for x in remove_uidlist], b''))

        remove_memberdnlist = [dn for dn in memberdnlist if dn.lower() in members]
        if remove_memberdnlist:
            ml.append(('uniqueMember', [x.encode('UTF-8') for x in remove_memberdnlist], b''))

        if ml:
            try:
                try:
                    return self.lo.authz_connection.modify(self.dn, ml, exceptions=True, ignore_license=ignore_license)
                except ldap.NO_SUCH_ATTRIBUTE:
                    # maybe this is the refint overlay:
                    # uniqueMember has already been removed. lets try again, probably with just memberUid...
                    if not _retry_on_attribute_error:
                        raise
                    return self.fast_member_remove(memberdnlist, uidlist, ignore_license=ignore_license, _retry_on_attribute_error=False)
            except ldap.NO_SUCH_OBJECT:
                raise univention.admin.uexceptions.noObject(self.dn)
            except ldap.INSUFFICIENT_ACCESS:
                raise univention.admin.uexceptions.permissionDenied()
            except ldap.LDAPError as msg:
                raise univention.admin.uexceptions.ldapError(msg.args[0]['desc'])

        # return True if object has been modified
        return bool(ml)

    def _check_uid_gid_uniqueness(self) -> None:
        """Raise ``gidNumberAlreadyUsedAsUidNumber`` if a user has this ``gidNumber`` as ``uidNumber``.

        The check is skipped if the UCR variable
        ``directory/manager/uid_gid/uniqueness`` is not true (default: true).
        """
        if not configRegistry.is_true('directory/manager/uid_gid/uniqueness', True):
            return
        if 'posix' in self.options or 'samba' in self.options:
            fg = univention.admin.filter.expression('uidNumber', self['gidNumber'], escape=True)
            user_objects = univention.admin.handlers.users.user.lookup(self.co, self.lo, filter_s=fg)
            if user_objects:
                raise univention.admin.uexceptions.gidNumberAlreadyUsedAsUidNumber(repr(self['gidNumber']))

    def _ldap_pre_ready(self) -> None:
        """Request the locks for the group name and the mail address.

        A lock is requested for a new object, or when the value changed by
        more than upper/lower case.

        :raises univention.admin.uexceptions.groupNameAlreadyUsed: if the name lock cannot be obtained.
        :raises univention.admin.uexceptions.mailAddressUsed: if the mail address lock cannot be obtained.
        """
        super()._ldap_pre_ready()

        # get lock for name
        if self['name'] and (not self.exists() or self.hasChanged('name') and self['name'].lower() != self.oldinfo['name'].lower()):
            try:
                self.request_lock('groupName', self['name'])
            except univention.admin.uexceptions.noLock:
                raise univention.admin.uexceptions.groupNameAlreadyUsed(self['name'])

        # get lock for mailPrimaryAddress
        if self['mailAddress'] and (not self.exists() or self.hasChanged('mailAddress') and self['mailAddress'].lower() != self.oldinfo.get('mailAddress', '').lower()):
            try:
                self.request_lock('mailPrimaryAddress', self['mailAddress'])
            except univention.admin.uexceptions.noLock:
                raise univention.admin.uexceptions.mailAddressUsed(self['mailAddress'])

    def _ldap_pre_create(self) -> None:
        """Reserve the ``gidNumber`` and run the recursion and uid/gid uniqueness checks."""
        super()._ldap_pre_create()
        self.request_unique('gidNumber')
        self.check_for_group_recursion()
        self._check_uid_gid_uniqueness()

    def _ldap_pre_modify(self) -> None:
        """Run the recursion and AD group type checks before modifying."""
        super()._ldap_pre_modify()
        self.check_for_group_recursion()
        self.check_ad_group_type_change()
        if self.hasChanged('gidNumber'):
            # this should never happen, as gidNumber is marked as unchangeable
            self._check_uid_gid_uniqueness()

    def _ldap_addlist(self) -> list:
        """Return the add list; without the posix option ``organizationalRole`` is added."""
        al = super()._ldap_addlist()
        if 'posix' not in self.options:
            al.append(('objectClass', b'organizationalRole'))  # any STRUCTURAL class with 'cn'
        return al

    def _ldap_modlist(self) -> list:
        """Extend the base modlist with Samba and membership changes.

        Adds the ``univentionSambaPrivileges`` object class and the
        ``sambaSID`` where required, and converts the difference of
        ``users``/``hosts``/``nestedGroup`` into ``uniqueMember`` and
        ``memberUid`` changes. A generated SID is remembered in
        ``self._samba_sid`` for :meth:`_ldap_post_modify`.
        """
        ml = univention.admin.handlers.simpleLdap._ldap_modlist(self)
        t1 = time.time()
        self._samba_sid = None
        if 'samba' in self.options:
            # samba privileges
            if self.hasChanged('sambaPrivileges'):
                o = self.oldattr.get('objectClass', [])
                # add univentionSambaPrivileges objectclass
                if self['sambaPrivileges'] and b'univentionSambaPrivileges' not in o:
                    ml.insert(0, ('objectClass', b'', b'univentionSambaPrivileges'))

            # samba SID
            if self['gidNumber'] and not self.exists() or self.hasChanged('sambaRID'):
                sid = self.__generate_group_sid(self['gidNumber'])
                ml.append(('sambaSID', self.oldattr.get('sambaSID', [b'']), [sid.encode('ASCII')]))
                self._samba_sid = sid

        old = set(self.oldinfo.get('users', []) + self.oldinfo.get('hosts', []) + self.oldinfo.get('nestedGroup', []))
        new = set(self.info.get('users', []) + self.info.get('hosts', []) + self.info.get('nestedGroup', []))
        if old != new:
            # create lists for uniqueMember entries to be added or removed
            uniqueMemberAdd = DN.set(new - old)
            uniqueMemberRemove = DN.set(old - new)
            # old and new might contain the same DNs (as str) but with different case.
            # After unifying the strings to DN objects there might be identical DNs that are
            # subtracted from uniqueMemberAdd and uniqueMemberRemove.
            sameMembers = uniqueMemberAdd & uniqueMemberRemove
            if sameMembers:
                uniqueMemberRemove = uniqueMemberRemove - sameMembers
                uniqueMemberAdd = uniqueMemberAdd - sameMembers

            def getUidList(uniqueMembers: list[DN]) -> list[str]:
                """Return the uid of every member that has one (taken from the DN or from LDAP)."""
                result = []
                for uniqueMember in uniqueMembers:
                    dn = uniqueMember._dn[0]
                    member = next((x[1] for x in dn if x[0].lower() == 'uid'), None)
                    if member is not None:
                        result.append(member)
                    else:
                        # UID is not stored in DN --> fetch UID by DN
                        uid_list = self.lo.authz_connection.getAttr(uniqueMember.dn, 'uid')
                        # a group have no uid attribute, see Bug #12644
                        if uid_list:
                            result.append(uid_list[0].decode('UTF-8'))
                            if len(uid_list) > 1:
                                self.log.warning('A groupmember has multiple UIDs', dn=uniqueMember, uid=uid_list)
                return result

            # calling keepCase is not necessary as the LDAP server already handles the case when removing elements
            # TODO: removable?
            def keepCase(members, oldMembers):
                """Replace each member by the spelling used in *oldMembers*, if it occurs there."""
                stored_spelling = {x.lower(): x for x in oldMembers}
                return [stored_spelling.get(member.lower(), member) for member in members]

            # create lists for memberUid entries to be added or removed
            memberUidAdd = getUidList(uniqueMemberAdd)
            memberUidRemove = getUidList(uniqueMemberRemove)

            if uniqueMemberRemove:
                uniqueMemberRemove = [x.encode('UTF-8') for x in DN.values(uniqueMemberRemove)]
                ml.append(('uniqueMember', uniqueMemberRemove, ''))

            if uniqueMemberAdd:
                uniqueMemberAdd = [x.encode('UTF-8') for x in DN.values(uniqueMemberAdd)]
                ml.append(('uniqueMember', '', uniqueMemberAdd))

            oldMemberUids = [x.decode('UTF-8') for x in self.oldattr.get('memberUid', ())]
            if memberUidRemove:
                memberUidRemove = keepCase(memberUidRemove, oldMemberUids)
                memberUidRemove = [x.encode('UTF-8') for x in memberUidRemove]
                ml.append(('memberUid', memberUidRemove, ''))

            if memberUidAdd:
                memberUidAdd = [x.encode('UTF-8') for x in memberUidAdd]
                ml.append(('memberUid', '', memberUidAdd))

        self.log.debug('groups/group: _ldap_modlist(): %.3fs', time.time() - t1)
        return ml

    def _ldap_post_create(self) -> None:
        """Update the ``uniqueMember`` attribute of the groups this group belongs to."""
        super()._ldap_post_create()
        self.__update_membership()

    def _ldap_post_modify(self) -> None:
        """Update super groups and, if the SID changed, the ``sambaPrimaryGroupSID`` of referencing objects."""
        super()._ldap_post_modify()
        self.__update_membership()
        old_sid = self.oldattr.get('sambaSID', [b''])[0].decode('ASCII')
        if self._samba_sid and self._samba_sid != old_sid:
            # objects still pointing at the old SID get the new one
            for dn, attr in self.lo.authz_connection.search(filter_format('(sambaPrimaryGroupSID=%s)', [old_sid]), attr=['sambaPrimaryGroupSID']):
                self.lo.authz_connection.modify(dn, [('sambaPrimaryGroupSID', attr.get('sambaPrimaryGroupSID', []), [self._samba_sid.encode('ASCII')])])

    def _ldap_pre_remove(self) -> None:
        """Refuse removal of groups that are still in use and register the values to release.

        :raises univention.admin.uexceptions.primaryGroupUsed: if the group is
            referenced by the ``univentionDefault`` object, or its
            ``gidNumber``/``sambaSID`` is used by a ``person`` object.
        """
        super()._ldap_pre_remove()
        self.open()
        # is this group in mentioned in settings/default?
        try:
            _dn, attrs = self.lo.authz_connection.search(filter='objectClass=univentionDefault', base=self.position.getDomain(), scope='domain', unique=True, required=True)[0]
        except ldap.NO_SUCH_OBJECT:
            pass
        else:
            for attr, value in attrs.items():
                if attr.lower().endswith('group') and self.dn.encode('UTF-8') in value:
                    raise univention.admin.uexceptions.primaryGroupUsed(_('It is used as %s.') % attr)

        gidNum = None
        groupSid = None
        if 'posix' in self.old_options:
            gidNum = self.oldattr['gidNumber'][0].decode('ASCII')
            if self.lo.authz_connection.searchDn(base=self.position.getDomain(), filter=filter_format('(&(objectClass=person)(gidNumber=%s))', [gidNum]), scope='domain'):
                raise univention.admin.uexceptions.primaryGroupUsed(gidNum)
        if 'samba' in self.old_options:
            groupSid = self.oldattr['sambaSID'][0].decode('ASCII')
            if self.lo.authz_connection.searchDn(
                base=self.position.getDomain(),
                filter=filter_format('(&(objectClass=person)(sambaPrimaryGroupSID=%s))', [groupSid]),
                scope='domain',
            ):
                raise univention.admin.uexceptions.primaryGroupUsed(groupSid)
        # remember the values held by this group in ``self.alloc``
        if gidNum:
            self.alloc.append(('gidNumber', gidNum))
        if groupSid:
            self.alloc.append(('sid', groupSid))
        self.alloc.append(('groupName', self.oldattr['cn'][0].decode('UTF-8')))
        if self.oldattr.get('mailPrimaryAddress'):
            self.alloc.append(('mailPrimaryAddress', self.oldattr['mailPrimaryAddress'][0].decode('UTF-8')))

    def _ldap_post_remove(self) -> None:
        """Remove the deleted group from the ``uniqueMember`` attribute of its super groups."""
        super()._ldap_post_remove()
        for group in self.info.get('memberOf', []):
            if isinstance(group, list):
                group = group[0]
            members = [x.decode('UTF-8') for x in self.lo.authz_connection.getAttr(group, 'uniqueMember')]
            if not self.__case_insensitive_in_list(self.dn, members):
                continue
            newmembers = copy.deepcopy(members)
            newmembers = self.__case_insensitive_remove_from_list(self.dn, newmembers)
            self.log.debug('remove from supergroup', group=group, dn=self.dn)
            self.__set_membership_attributes(group, members, newmembers)

    def _ldap_post_move(self, olddn: str) -> None:
        """Replace *olddn* by the new DN in the default settings object and in all super groups."""
        super()._ldap_post_move(olddn)
        settings_module = univention.admin.modules._get('settings/default')
        settings_object = univention.admin.objects.get(settings_module, None, self.lo.authz_connection, position='', dn='cn=default,cn=univention,%s' % self.lo.base, authz=False)
        settings_object.open()
        for attr in ['defaultGroup', 'defaultMemberServerGroup', 'defaultClientGroup', 'defaultDomainControllerMBGroup', 'defaultDomainControllerGroup', 'defaultComputerGroup']:
            if settings_object[attr].lower() == olddn.lower():
                settings_object[attr] = self.dn
        settings_object.modify()

        for group in self.info.get('memberOf', []):
            if isinstance(group, list):
                group = group[0]
            members = [x.decode('UTF-8') for x in self.lo.authz_connection.getAttr(group, 'uniqueMember')]
            if not self.__case_insensitive_in_list(olddn, members):
                continue
            newmembers = copy.deepcopy(members)
            newmembers = self.__case_insensitive_remove_from_list(olddn, newmembers)
            newmembers.append(self.dn)
            self.log.debug('updating supergroup', group=group, dn=self.dn)
            self.__set_membership_attributes(group, members, newmembers)

    def __update_membership(self) -> None:
        """Bring the ``uniqueMember`` attribute of the super groups in line with ``memberOf``.

        After a rename the old DN is replaced by the new one in every super
        group. Afterwards this group is added to groups newly listed in
        ``memberOf`` and removed from groups no longer listed there.
        """
        if self.exists():
            old_groups = self.oldinfo.get('memberOf', [])
            old_name = self.oldinfo.get('name', '')
            new_name = self.info.get('name', '')
        else:
            old_groups = []
            old_name = ''
            new_name = ''

        # rewrite membership attributes in "supergroup" if we have a new name (rename)
        if old_name and old_name != new_name:
            self.log.debug('groups/group: rewrite memberuid after rename')
            for group in self.info.get('memberOf', []):
                if isinstance(group, list):
                    group = group[0]
                members = [x.decode('UTF-8') for x in self.lo.authz_connection.getAttr(group, 'uniqueMember')]
                newmembers = copy.deepcopy(members)
                newmembers = self.__case_insensitive_remove_from_list(self.old_dn, newmembers)
                newmembers.append(self.dn)
                self.__set_membership_attributes(group, members, newmembers)

        add_to_group = []
        remove_from_group = []
        for group in old_groups:
            if group and not self.__case_insensitive_in_list(group, self.info.get('memberOf', [])):
                remove_from_group.append(group)

        for group in self.info.get('memberOf', []):
            if group and not self.__case_insensitive_in_list(group, old_groups):
                add_to_group.append(group)

        for group in add_to_group:
            if isinstance(group, list):
                group = group[0]
            members = [x.decode('UTF-8') for x in self.lo.authz_connection.getAttr(group, 'uniqueMember')]
            if self.__case_insensitive_in_list(self.dn, members):
                continue
            newmembers = copy.deepcopy(members)
            newmembers.append(self.dn)
            self.log.debug('add to supergroup', group=group, dn=self.dn)
            self.__set_membership_attributes(group, members, newmembers)

        for group in remove_from_group:
            if isinstance(group, list):
                group = group[0]
            members = [x.decode('UTF-8') for x in self.lo.authz_connection.getAttr(group, 'uniqueMember')]
            newmembers = copy.deepcopy(members)
            if self.__case_insensitive_in_list(self.dn, members):
                newmembers = self.__case_insensitive_remove_from_list(self.dn, newmembers)
            if self.__case_insensitive_in_list(self.old_dn, newmembers):
                newmembers = self.__case_insensitive_remove_from_list(self.old_dn, newmembers)
            if members != newmembers:
                self.log.debug('remove from supergroup', group=group, dn=self.dn)
                self.__set_membership_attributes(group, members, newmembers)

    def __set_membership_attributes(self, group, members, newmembers):
        """Replace the ``uniqueMember`` values *members* of *group* by *newmembers*."""
        members_bytes = [x.encode('UTF-8') for x in members]
        newmembers_bytes = [x.encode('UTF-8') for x in newmembers]
        self.lo.authz_connection.modify(group, [('uniqueMember', members_bytes, newmembers_bytes)])
        # don't set the memberUid attribute for nested groups, see Bug #11868
        # uids = self.lo.authz_connection.getAttr( group, 'memberUid' )
        # newuids = map(lambda x: x[x.find('=') + 1: x.find(',')], newmembers)
        # self.lo.authz_connection.modify( group, [ ( 'memberUid', uids, newuids ) ] )

    @staticmethod
    def __case_insensitive_in_list(dn, members):
        """Return True if *dn* is contained in *members*, ignoring case."""
        return dn.lower() in (x.lower() for x in members)

    @staticmethod
    def __case_insensitive_remove_from_list(dn, members):
        """Return *members* without the entries equal to *dn*, ignoring case."""
        dn_lower = dn.lower()
        return [x for x in members if x.lower() != dn_lower]

    def check_for_group_recursion(self) -> None:
        """Raise ``circularGroupDependency`` if the group memberships would form a cycle.

        The check only runs if ``memberOf`` or ``nestedGroup`` changed and it
        is not disabled via the UCR variable
        ``directory/manager/web/modules/groups/group/checks/circular_dependency``.
        """
        # perform check only if membership of groups has changed
        if not self.hasChanged('memberOf') and not self.hasChanged('nestedGroup'):
            return

        # perform check only if enabled via UCR
        if configRegistry.is_false('directory/manager/web/modules/groups/group/checks/circular_dependency', False):
            return

        grpdn2childgrpdns = {}
        grp_module = univention.admin.modules._get('groups/group')

        cn = self.info.get('name', 'UNKNOWN')

        # test self dependency
        # ==> nestedGroup or memberOf contains self.dn
        for field in ('nestedGroup', 'memberOf'):
            if self.dn.lower() in (x.lower() for x in self.info.get(field, [])):
                raise univention.admin.uexceptions.circularGroupDependency('%s ==> %s' % (cn, cn))

        # test short dependencies: A -> B -> A
        # ==> intersection of nestedGroup and memberOf is not empty
        set_nestedGroup = {x.lower() for x in self.info.get('nestedGroup', [])}
        set_memberOf = {x.lower() for x in self.info.get('memberOf', [])}
        set_intersection = set_nestedGroup & set_memberOf
        if set_intersection:
            childdn = next(iter(set_intersection))
            # get cn for first detected object
            childobj = univention.admin.objects.get(grp_module, self.co, self.lo, position='', dn=childdn)
            childcn = childobj.info.get('name', 'UNKNOWN')
            raise univention.admin.uexceptions.circularGroupDependency('%s ==> %s ==> %s' % (childcn, cn, childcn))

        # test long dependencies: A -> B -> C -> A
        if self.info.get('memberOf'):  # TODO: FIXME: perform extended check only if self.hasChanged('memberOf') is True
            # if user added some groups to memberOf, the group objects specified in memberOf do not contain self as
            # uniqueMember (aka nestedGroup) when this test is performed. So this test has to perform the recursion check
            # with each member of memberOf as parent
            for upgrp in self.info.get('memberOf', []):
                for subgrp in self.info.get('nestedGroup', []):
                    self._check_group_childs_for_recursion(grp_module, grpdn2childgrpdns, subgrp.lower(), [upgrp.lower(), self.dn.lower()])
        else:
            for subgrp in self.info.get('nestedGroup', []):
                self._check_group_childs_for_recursion(grp_module, grpdn2childgrpdns, subgrp.lower(), [self.dn.lower()])

    def _check_group_childs_for_recursion(self, grp_module, grpdn2childgrpdns, dn, parents=()):
        """Walk the nested groups below *dn* and raise if one of them is already a parent.

        :param grp_module: the ``groups/group`` UDM module used to load group objects.
        :param grpdn2childgrpdns: cache (group DN -> nested group DNs); it is filled while walking.
        :param dn: lower-cased DN of the group to inspect.
        :param parents: lower-cased DNs on the path leading to *dn*; it is not modified.
        :raises univention.admin.uexceptions.circularGroupDependency: if a cycle is found.
        """
        if dn not in grpdn2childgrpdns:
            grpobj = univention.admin.objects.get(grp_module, self.co, self.lo, position='', dn=dn)
            grpobj.open()
            childs = grpobj.info.get('nestedGroup', [])
            grpdn2childgrpdns[dn] = childs
        else:
            childs = grpdn2childgrpdns[dn]

        new_parents = [*parents, dn]
        for childgrp in childs:
            if childgrp.lower() in new_parents:
                # the cycle runs from the first occurrence of the child back to the child
                dnCircle = [*new_parents[new_parents.index(childgrp.lower()):], childgrp.lower()]
                cnCircle = []
                # get missing cn's if required
                grpdn2cn = {self.dn.lower(): self.info.get('name', 'UNKNOWN')}
                for x in dnCircle:
                    if x.lower() not in grpdn2cn:
                        xobj = univention.admin.objects.get(grp_module, self.co, self.lo, position='', dn=x)
                        grpdn2cn[x.lower()] = xobj.info.get('name', 'UNKNOWN')
                    cnCircle.append(grpdn2cn[x.lower()])
                raise univention.admin.uexceptions.circularGroupDependency(' ==> '.join(cnCircle))

            self._check_group_childs_for_recursion(grp_module, grpdn2childgrpdns, childgrp.lower(), new_parents)

    @staticmethod
    def __groupType_has_flag(adGroupType, flag: int) -> int:
        """Return ``int(adGroupType) & flag``, or ``False`` if the value is not a number.

        ``None`` is handled as well: it is what _is_global_member() passes for
        groups without a ``univentionGroupType`` attribute.
        """
        try:
            return int(adGroupType) & flag
        except (TypeError, ValueError):
            return False

    def __is_groupType_universal(self, adGroupType: bytes | str | None) -> int:
        """Return a true value if bit ``0x8`` is set in *adGroupType*."""
        return self.__groupType_has_flag(adGroupType, 0x8)

    def __is_groupType_global(self, adGroupType: bytes | str | None) -> int:
        """Return a true value if bit ``0x2`` is set in *adGroupType*."""
        return self.__groupType_has_flag(adGroupType, 0x2)

    def __is_groupType_domain_local(self, adGroupType: bytes | str | None) -> int:
        """Return a true value if bit ``0x4`` is set in *adGroupType*."""
        return self.__groupType_has_flag(adGroupType, 0x4)

    def __is_groupType_local(self, adGroupType: bytes | str | None) -> int:
        """Return a true value if bit ``0x1`` is set in *adGroupType*."""
        return self.__groupType_has_flag(adGroupType, 0x1)

    def _is_global_member(self) -> bool:
        """Return True if this group is a ``uniqueMember`` of at least one global group."""
        searchResult = self.lo.authz_connection.search(base=self.position.getDomain(), filter=filter_format('(uniqueMember=%s)', [self.dn]), attr=['univentionGroupType'])
        for _dn, attr in searchResult:
            groupType = attr.get('univentionGroupType', [None])[0]
            if self.__is_groupType_global(groupType):
                return True
        return False

    def _has_domain_local_member(self) -> bool:
        """Return True if at least one ``uniqueMember`` of this group is a domain local group."""
        for member_dn in [x.decode('UTF-8') for x in self.oldattr.get('uniqueMember', [])]:
            searchResult = self.lo.authz_connection.getAttr(member_dn, 'univentionGroupType')
            if searchResult and self.__is_groupType_domain_local(searchResult[0]):
                return True
        return False

    def _has_universal_member(self) -> bool:
        """Return True if at least one ``uniqueMember`` of this group is a universal group."""
        for member_dn in [x.decode('UTF-8') for x in self.oldattr.get('uniqueMember', [])]:
            searchResult = self.lo.authz_connection.getAttr(member_dn, 'univentionGroupType')
            if searchResult and self.__is_groupType_universal(searchResult[0]):
                return True
        return False

    def check_ad_group_type_change(self) -> None:
        """Reject changes of ``adGroupType`` that are not allowed.

        Nothing is checked if the property did not change or if the old or the
        new value is empty. Otherwise one of the ``adGroupTypeChange*``
        exceptions of ``univention.admin.uexceptions`` is raised for a
        forbidden transition.
        """
        if not self.hasChanged('adGroupType'):
            return

        old_groupType = self.oldinfo.get('adGroupType', 0)
        new_groupType = self.info.get('adGroupType', 0)
        self.log.debug('Changing groupType', old=old_groupType, new=new_groupType)
        if not old_groupType or not new_groupType:
            return

        if self.__is_groupType_local(old_groupType):
            raise univention.admin.uexceptions.adGroupTypeChangeLocalToAny

        if self.__is_groupType_local(new_groupType):
            raise univention.admin.uexceptions.adGroupTypeChangeToLocal

        # See for details:
        # http://technet.microsoft.com/en-us/library/cc755692%28v=ws.10%29.aspx
        if self.__is_groupType_global(old_groupType) and self.__is_groupType_domain_local(new_groupType):
            raise univention.admin.uexceptions.adGroupTypeChangeGlobalToDomainLocal
        elif self.__is_groupType_domain_local(old_groupType) and self.__is_groupType_global(new_groupType):
            raise univention.admin.uexceptions.adGroupTypeChangeDomainLocalToGlobal
        elif self.__is_groupType_global(old_groupType) and self.__is_groupType_universal(new_groupType):
            # Global to universal:
            # This conversion is allowed only if the group that you want to change is not a member of
            # another global scope group.
            if self._is_global_member():
                raise univention.admin.uexceptions.adGroupTypeChangeGlobalToUniversal
        elif self.__is_groupType_domain_local(old_groupType) and self.__is_groupType_universal(new_groupType):
            # Domain local to universal:
            # This conversion is allowed only if the group that you want to change does not have
            # another domain local group as a member.
            if self._has_domain_local_member():
                raise univention.admin.uexceptions.adGroupTypeChangeDomainLocalToUniversal
        elif self.__is_groupType_universal(old_groupType) and self.__is_groupType_global(new_groupType):
            # Universal to global:
            # This conversion is allowed only if the group that you want to change does not have
            # another universal group as a member.
            if self._has_universal_member():
                raise univention.admin.uexceptions.adGroupTypeChangeUniversalToGlobal

    def __allocate_rid(self, rid):
        """Build the SID for *rid* and request a lock for it.

        Local groups get the prefix ``S-1-5-32-``; all others get the
        ``sambaSID`` of the first ``sambaDomain`` object found.

        :returns: the result of ``request_lock('sid', sid)``.
        :raises univention.admin.uexceptions.sidAlreadyUsed: if the lock cannot be obtained.
        """
        new_groupType = self.info.get('adGroupType', 0)
        if self.__is_groupType_local(new_groupType):
            sid = 'S-1-5-32-' + rid
        else:
            # the domain SID is only needed (and therefore only searched) for non-local groups
            searchResult = self.lo.authz_connection.search(filter='objectClass=sambaDomain', attr=['sambaSID'])
            domainsid = searchResult[0][1]['sambaSID'][0].decode('ASCII')
            sid = domainsid + '-' + rid
        try:
            return self.request_lock('sid', sid)
        except univention.admin.uexceptions.noLock:
            raise univention.admin.uexceptions.sidAlreadyUsed(rid)

    def __generate_group_sid(self, gidNum):
        """Return the Samba SID to store for this group.

        An explicitly set ``sambaRID`` is used if present. Otherwise, with the
        S4 connector and a non-local group, a placeholder ``S-1-4-<gidNum>`` is
        returned. In all remaining cases a SID is requested from the
        allocator, incrementing the number until one is free.
        """
        new_groupType = self.info.get('adGroupType', 0)
        self.log.debug('Setting groupType', value=new_groupType)
        if self['sambaRID']:
            return self.__allocate_rid(self['sambaRID'])
        elif self.s4connector_present and not self.__is_groupType_local(new_groupType):
            # In this case Samba 4 must create the SID, the s4 connector will sync the
            # new sambaSID back from Samba 4.
            return 'S-1-4-%s' % (gidNum,)

        num = gidNum
        generateDomainLocalSid = self.__is_groupType_local(new_groupType)
        while True:
            try:
                groupSid = univention.admin.allocators.requestGroupSid(self.lo, self.position, num, generateDomainLocalSid=generateDomainLocalSid)
                self.alloc.append(('sid', groupSid))
                return groupSid
            except univention.admin.uexceptions.noLock:
                # SID already taken: try the next number
                num = str(int(num) + 1)

    @classmethod
    def unmapped_lookup_filter(cls):
        """Return the LDAP filter matching groups: a ``cn`` plus ``univentionGroup`` or ``sambaGroupMapping``."""
        return univention.admin.filter.conjunction('&', [
            univention.admin.filter.expression('cn', '*', escape=False),
            univention.admin.filter.conjunction('|', [
                univention.admin.filter.conjunction('&', [univention.admin.filter.expression('objectClass', 'univentionGroup')]),
                univention.admin.filter.conjunction('&', [univention.admin.filter.expression('objectClass', 'sambaGroupMapping')]),
            ]),
        ])  # fmt: skip
# Module-level aliases for the handler's lookup/identify entry points.
# NOTE(review): presumably these names are what the UDM module loader looks up
# on a handler module — confirm against univention.admin.modules.
lookup = object.lookup
lookup_filter = object.lookup_filter
identify = object.identify