Source code for univention.management.console.modules.quota.tools

#!/usr/bin/python3
#
# Univention Management Console
#  quota module: modify quota settings
#
# SPDX-FileCopyrightText: 2006-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only


import math
import os
import re
import subprocess

import univention.management.console as umc
from univention.config_registry import handler_set
from univention.lib import fstab
from univention.management.console.config import ucr
from univention.management.console.error import UMC_Error
from univention.management.console.log import MODULE


_ = umc.Translation('univention-management-console-module-quota').translate


[docs] class UserQuota(dict): def __init__(self, partition, user, bused, bsoft, bhard, btime, fused, fsoft, fhard, ftime): self['id'] = '%s@%s' % (user, partition) self['partitionDevice'] = partition self['user'] = user self['sizeLimitUsed'] = block2byte(bused, 'MB') self['sizeLimitSoft'] = block2byte(bsoft, 'MB') self['sizeLimitHard'] = block2byte(bhard, 'MB') self['fileLimitUsed'] = fused self['fileLimitSoft'] = fsoft self['fileLimitHard'] = fhard self.set_time('sizeLimitTime', btime) self.set_time('fileLimitTime', ftime)
[docs] def set_time(self, time, value): if not value: self[time] = '-' elif value == 'none': self[time] = _('Expired') elif value.endswith('days'): self[time] = _('%s Days') % value[:-4] elif ':' in value: self[time] = value
[docs] def repquota(partition): # find filesystem type fs = fstab.File() part = fs.find(spec=partition) args = [] if part.type == 'xfs': args = ['--format', 'xfs'] # -C == do not try to resolve all users at once # -v == verbose cmd = ['/usr/sbin/repquota', '-C', '-v', partition, *args] proc = subprocess.Popen(cmd, stdout=subprocess.PIPE) stdout, _stderr = proc.communicate() return (stdout, proc.returncode)
[docs] def repquota_parse(partition, output): result = [] if not output: return result regex = re.compile('(?P<user>[^ ]*) *[-+]+ *(?P<bused>[0-9]*) *(?P<bsoft>[0-9]*) *(?P<bhard>[0-9]*) *((?P<btime>([0-9]*days|none|[0-9]{2}:[0-9]{2})))? *(?P<fused>[0-9]*) *(?P<fsoft>[0-9]*) *(?P<fhard>[0-9]*) *((?P<ftime>([0-9]*days|none|[0-9]{2}:[0-9]{2})))?') for line in output: matches = regex.match(line) if not matches: break grp = matches.groupdict() if not grp['user'] or grp['user'] == 'root': continue quota = UserQuota(partition, grp['user'], grp['bused'], grp['bsoft'], grp['bhard'], grp['btime'], grp['fused'], grp['fsoft'], grp['fhard'], grp['ftime']) result.append(quota) return result
[docs] def setquota(partition, user, bsoft, bhard, fsoft, fhard): return subprocess.call(['/usr/sbin/setquota', '--always-resolve', '-u', user, str(bsoft), str(bhard), str(fsoft), str(fhard), partition])
[docs] class QuotaActivationError(Exception): pass
[docs] def usrquota_is_active(fstab_entry, mt=None): if not mt: try: mt = fstab.File('/etc/mtab') except OSError as error: raise QuotaActivationError(_('Could not open %s') % error.filename) mtab_entry = mt.find(spec=fstab_entry.spec) if not mtab_entry: raise QuotaActivationError(_('Device is not mounted')) # First remount the partition with option "usrquota" if it isn't already return bool(mtab_entry.hasopt('usrquota'))
[docs] def quota_is_enabled(fstab_entry): local_env = os.environ.copy() local_env["LC_MESSAGES"] = "C" cmd = ("/sbin/quotaon", "-p", "-u", fstab_entry.mount_point) p1 = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=local_env) stdout, _stderr = p1.communicate() stdout = stdout.decode('UTF-8', 'replace') if "not found or has no quota enabled" in stdout: return False else: # match lines like "quota on / (/dev/disk/by-uuid/5bf2a723-b25a) is on" pattern = re.compile(r"user quota on %s \([^)]*\) is (on|off)" % fstab_entry.mount_point) match = pattern.match(stdout) if match: return match.group(1) == 'on' else: return None # tertium datur
[docs] def activate_quota(partition, activate): partitions = [partition] if not isinstance(partition, list) else partition result = [] try: fs = fstab.File() except OSError as error: raise UMC_Error(_('Could not open %s') % error.filename, 500) failed = [] for device in partitions: fstab_entry = fs.find(spec=device) if not fstab_entry: failed.append(_('Device %r could not be found') % (device,)) continue try: status = _do_activate_quota_partition(fs, fstab_entry, activate) except QuotaActivationError as exc: failed.append('%s: %s' % (fstab_entry.spec, exc)) continue if fstab_entry.mount_point == '/' and fstab_entry.type == 'xfs': try: enable_quota_in_kernel(activate) except QuotaActivationError as exc: failed.append('%s: %s' % (fstab_entry.spec, exc)) continue result.append(status) if failed: message = _('Failed to activate quota support: ') if activate else _('Failed to deactivate quota support: ') message += '\n'.join(failed) raise UMC_Error(message) message = _('Quota support successfully activated') if activate else _('Quota support successfully deactivated') raise UMC_Error(message, 200, {'objects': result})
def _do_activate_quota_partition(fs, fstab_entry, activate): quota_enabled = quota_is_enabled(fstab_entry) if not (activate ^ quota_enabled): return {'partitionDevice': fstab_entry.spec, 'message': _('Quota already en/disabled')} # persistently change the option in /etc/fstab: if activate: if 'usrquota' not in fstab_entry.options: fstab_entry.options.append('usrquota') else: if 'usrquota' in fstab_entry.options: fstab_entry.options.remove('usrquota') fs.save() if fstab_entry.type == 'xfs': activation_function = _activate_quota_xfs elif fstab_entry.type in ('ext2', 'ext3', 'ext4'): activation_function = _activate_quota_ext else: return {'partitionDevice': fstab_entry.spec, 'message': _('Unknown filesystem')} activation_function(fstab_entry, activate) return {'partitionDevice': fstab_entry.spec, 'message': _('Operation was successful')} def _activate_quota_xfs(fstab_entry, activate=True): if fstab_entry.mount_point != '/': if subprocess.call(('/bin/umount', fstab_entry.spec)): raise QuotaActivationError(_('Unmounting the partition has failed')) if subprocess.call(('/bin/mount', fstab_entry.spec)): raise QuotaActivationError(_('Mounting the partition has failed')) if subprocess.call(('/usr/sbin/invoke-rc.d', 'quota', 'restart')): raise QuotaActivationError(_('Restarting the quota services has failed'))
[docs] def enable_quota_in_kernel(activate): ucr.load() grub_append = ucr.get('grub/append', '') flags = [] option = 'usrquota' match = re.match(r'rootflags=([^\s]*)', grub_append) if match: flags = match.group(1).split(',') if activate and option not in flags: flags.append(option) elif not activate and option in flags: flags.remove(option) flags = ','.join(flags) if flags: flags = 'rootflags=%s' % (flags,) new_grub_append = grub_append if 'rootflags=' not in grub_append: if flags: new_grub_append = '%s %s' % (grub_append, flags) else: new_grub_append = re.sub(r'rootflags=[^\s]*', flags, grub_append) if new_grub_append != grub_append: MODULE.info('Replacing grub/append from %s to %s', grub_append, new_grub_append) handler_set(['grub/append=%s' % (new_grub_append,)]) status = _('enable') if activate else _('disable') raise QuotaActivationError(_('To %s quota support for the root filesystem the system has to be rebooted.') % (status,))
def _activate_quota_ext(fstab_entry, activate=True): if activate: # First remount the partition with option "usrquota" if it isn't already if not usrquota_is_active(fstab_entry): # Since the usrquota option is set in fstab remount will pick it up automatically if subprocess.call(('/bin/mount', '-o', 'remount', fstab_entry.spec)): raise QuotaActivationError(_('Remounting the partition has failed')) # Then make sure that quotacheck can run on the partition by running quotaoff on this partition. if subprocess.call(('/sbin/quotaoff', '-u', fstab_entry.spec)): # exit status should always be zero, even if off already raise QuotaActivationError(_('Restarting the quota services has failed')) # Run quotacheck to create the aquota.user quota file on the partition # Note: This part is the one that makes activation take some time. args = ['/sbin/quotacheck'] if fstab_entry.mount_point == '/': args.append('-m') args.extend(['-uc', fstab_entry.mount_point]) if subprocess.call(args): raise QuotaActivationError(_('Generating the quota information file failed')) # Finally turn on the quota for the partition. if subprocess.call(('/sbin/quotaon', '-u', fstab_entry.spec)): # exit status should be zero raise QuotaActivationError(_('Restarting the quota services has failed')) else: # First turn the userquota off as requested, otherwise "mount -o remount,noquota" fails. if subprocess.call(('/sbin/quotaoff', '-u', fstab_entry.spec)): # exit status should always be zero, even if off already raise QuotaActivationError(_('Restarting the quota services has failed')) # Then we could turn of the usrquota option on the partition. # Note: This is not strictly required technically, we might as well leave it on (until the machine is rebootet). # The important point is that the usrquota option has been removed from fstab, that's what /etc/init.d/quota checks. # # Note2: If the usrquota option is set in mtab but removed in fstab, then remount doesn't automatically pick it up. # # if subprocess.call(('/bin/mount', '-o', 'remount,noquota', fstab_entry.spec)): # raise QuotaActivationError(_('Remounting the partition has failed')) _units = ('B', 'KB', 'MB', 'GB', 'TB') _size_regex = re.compile('(?P<size>[0-9.]+)(?P<unit>(B|KB|MB|GB|TB))?')
[docs] def block2byte(size, convertTo, block_size=1024): size = int(size) * float(block_size) unit = 0 if convertTo in _units: while _units[unit] != convertTo: size /= 1024.0 unit += 1 return size
[docs] def byte2block(size, unit='MB', block_size=1024): factor = 0 if unit in _units: while _units[factor] != unit: factor += 1 size = float(size) * math.pow(1024, factor) return int(size / float(block_size)) else: return ''