#!/usr/bin/python3
# SPDX-FileCopyrightText: 2004-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""adduser part for the command line interface"""
from __future__ import annotations
import getopt
import os
import subprocess
from logging import getLogger
from ldap.filter import filter_format
import univention.admin.config
import univention.admin.handlers.computers.windows
import univention.admin.handlers.groups.group
import univention.admin.handlers.users.user
import univention.admin.modules
import univention.admin.uldap
import univention.config_registry
import univention.logging
log = getLogger('ADMIN')
[docs]
def status(msg: str) -> str:
# univention-adduser is called by Samba when doing "vampire." Since
# vampire produces a lot of output, and we'd like to print a moderate
# log, we prepend UNIVENTION to our output. That way we can identify
# distinguish them from all of Samba's log messages
out = 'UNIVENTION %s' % (msg,)
return out
[docs]
def nscd_invalidate(table: str) -> None:
if table:
log.debug('NSCD: --invalidate %s', table)
try:
subprocess.check_call(['/usr/sbin/nscd', '--invalidate', table], close_fds=True)
except (OSError, subprocess.CalledProcessError):
log.debug('NSCD: failed')
else:
log.debug('NSCD: ok')
[docs]
def get_user_object(user: str, position: univention.admin.uldap.position, lo: univention.admin.uldap.access) -> univention.admin.modules.UdmModule | str:
try:
# user Account
return univention.admin.modules.lookup(univention.admin.handlers.users.user, None, lo, scope='domain', base=position.getDn(), filter=filter_format('(username=%s)', [user]), required=True, unique=True)[0]
except Exception:
# machine Account
for handler in [univention.admin.handlers.computers.windows, univention.admin.handlers.computers.domaincontroller_master, univention.admin.handlers.computers.domaincontroller_slave, univention.admin.handlers.computers.domaincontroller_backup, univention.admin.handlers.computers.memberserver]:
try:
return univention.admin.modules.lookup(handler, None, lo, scope='domain', base=position.getDn(), filter=filter_format('(uid=%s)', [user]), required=True, unique=True)[0]
except Exception: # noqa: S112
continue
return 'ERROR: account not found, nothing modified'
[docs]
def doit(arglist):
configRegistry = univention.config_registry.ConfigRegistry()
configRegistry.load()
structured = configRegistry.is_true('directory/manager/cmd/debug/structured-logging', False)
univention.logging.basicConfig(filename='/var/log/univention/directory-manager-cmd.log', univention_debug_level=1, use_structured_logging=structured)
out: list[str] = []
op = 'add'
scope = 'user'
cmd = os.path.basename(arglist[0])
if cmd == 'univention-addgroup':
scope = 'group'
op = 'add'
elif cmd == 'univention-deluser':
scope = 'user'
op = 'del'
elif cmd == 'univention-delgroup':
scope = 'group'
op = 'del'
elif cmd == 'univention-addmachine':
scope = 'machine'
op = 'add'
elif cmd == 'univention-delmachine':
scope = 'machine'
op = 'del'
elif cmd == 'univention-setprimarygroup':
scope = 'user'
op = 'primarygroup'
_opts, args = getopt.getopt(arglist[1:], '', ['status-fd=', 'status-fifo='])
try:
lo, position = univention.admin.uldap.getAdminConnection()
except Exception as exc:
log.warning('authentication error: %s', exc)
try:
lo, position = univention.admin.uldap.getMachineConnection()
except Exception as exc2:
log.warning('authentication error: %s', exc2)
out.append('authentication error: %s' % (exc,))
out.append('authentication error: %s' % (exc2,))
return out
univention.admin.modules.update()
if len(args) == 1:
if scope == 'machine':
machine = args[0]
machine = machine.removesuffix('$')
if configRegistry.get('samba/defaultcontainer/computer'):
position.setDn(configRegistry['samba/defaultcontainer/computer'])
else:
position.setDn(univention.admin.config.getDefaultContainer(lo, 'computers/windows'))
elif scope == 'group':
group = args[0]
if configRegistry.get('samba/defaultcontainer/group'):
position.setDn(configRegistry['samba/defaultcontainer/group'])
else:
position.setDn(univention.admin.config.getDefaultContainer(lo, 'groups/group'))
else:
user = args[0]
if configRegistry.get('samba/defaultcontainer/user'):
position.setDn(configRegistry['samba/defaultcontainer/user'])
else:
position.setDn(univention.admin.config.getDefaultContainer(lo, 'users/user'))
action = op + scope
elif len(args) == 2:
user, group = args
if op == 'del':
action = 'deluserfromgroup'
elif op == 'primarygroup':
action = 'setprimarygroup'
else:
action = 'addusertogroup'
else:
return out
if action == 'adduser':
out.append(status('Adding user %s' % (user,)))
object = univention.admin.handlers.users.user.object(None, lo, position=position)
object.open()
object['username'] = user
object['lastname'] = user.encode('utf-8').decode('ASCII')
object['password'] = subprocess.check_output(['/usr/bin/makepasswd', '--minchars=8'], close_fds=True).strip().decode('ASCII', 'ignore')
object['primaryGroup'] = univention.admin.config.getDefaultValue(lo, 'group')
object.create()
nscd_invalidate('passwd')
elif action == 'deluser':
out.append(status('Removing user %s' % (user,)))
object = univention.admin.modules.lookup(univention.admin.handlers.users.user, None, lo, scope='domain', base=position.getDomain(), filter=filter_format('(username=%s)', [user]), required=True, unique=True)[0]
object.open()
object.remove()
nscd_invalidate('passwd')
elif action == 'addgroup':
out.append(status('Adding group %s' % (group,)))
object = univention.admin.handlers.groups.group.object(None, lo, position=position)
object.open()
object.options = ['posix']
object['name'] = group
object.create()
nscd_invalidate('group')
elif action == 'delgroup':
out.append(status('Removing group %s' % (group,)))
object = univention.admin.modules.lookup(univention.admin.handlers.groups.group, None, lo, scope='domain', base=position.getDomain(), filter=filter_format('(name=%s)', [group]), required=True, unique=True)[0]
object.open()
object.remove()
nscd_invalidate('group')
elif action == 'addusertogroup':
if group in configRegistry.get('samba/addusertogroup/filter/group', '').split(','):
out.append(status('addusertogroup: filter protects group "%s"' % (group,)))
return out
out.append(status('Adding user %s to group %s' % (user, group)))
groupobject = univention.admin.modules.lookup(univention.admin.handlers.groups.group, None, lo, scope='domain', base=position.getDn(), filter=filter_format('(name=%s)', [group]), required=True, unique=True)[0]
groupobject.open()
userobject = get_user_object(user, position, lo)
if isinstance(userobject, str):
out.append(userobject)
return out
if userobject.dn not in groupobject['users']:
if groupobject['users'] == [''] or groupobject['users'] == []:
groupobject['users'] = [userobject.dn]
else:
groupobject['users'].append(userobject.dn)
groupobject.modify()
nscd_invalidate('group')
elif action == 'deluserfromgroup':
out.append(status('Removing user %s from group %s' % (user, group)))
groupobject = univention.admin.modules.lookup(univention.admin.handlers.groups.group, None, lo, scope='domain', base=position.getDn(), filter=filter_format('(name=%s)', [group]), required=True, unique=True)[0]
groupobject.open()
userobject = get_user_object(user, position, lo)
if isinstance(userobject, str):
out.append(userobject)
return out
userobject.open()
if userobject.dn in groupobject['users'] and userobject['primaryGroup'] != groupobject.dn:
groupobject['users'].remove(userobject.dn)
groupobject.modify()
nscd_invalidate('group')
elif action == 'addmachine':
out.append(status('Adding machine %s' % (machine,)))
object = univention.admin.handlers.computers.windows.object(None, lo, position=position)
object.open()
object.options = ['posix']
object['name'] = machine
object['primaryGroup'] = univention.admin.config.getDefaultValue(lo, 'computerGroup')
object.create()
nscd_invalidate('hosts')
nscd_invalidate('passwd')
elif action == 'delmachine':
out.append(status('Removing machine %s' % (machine,)))
object = univention.admin.modules.lookup(univention.admin.handlers.computers.windows, None, lo, scope='domain', base=position.getDomain(), filter=filter_format('(name=%s)', [machine]), required=True, unique=True)[0]
object.open()
object.remove()
nscd_invalidate('hosts')
elif action == 'setprimarygroup':
out.append(status('Set primary group %s for user %s' % (group, user)))
try:
groupobject = univention.admin.modules.lookup(univention.admin.handlers.groups.group, None, lo, scope='domain', base=position.getDn(), filter=filter_format('(name=%s)', [group]), required=True, unique=True)[0]
except Exception:
out.append('ERROR: group not found, nothing modified')
return out
groupobject.open()
userobject = get_user_object(user, position, lo)
if isinstance(userobject, str):
out.append(userobject)
return out
if 'samba' in userobject.options:
userobject.options.remove('samba')
userobject.open()
if userobject.has_property('primaryGroup'):
userobject['primaryGroup'] = groupobject.dn
elif userobject.has_property('machineAccountGroup'):
userobject['machineAccountGroup'] = groupobject.dn
else:
out.append('ERROR: unknown group attribute, nothing modified')
return out
userobject.modify()
if userobject.dn not in groupobject['users']:
groupobject['users'].append(userobject.dn)
groupobject.modify()
nscd_invalidate('group')
nscd_invalidate('passwd')
return out
if __name__ == '__main__':
import sys
print('\n'.join(doit(sys.argv)))