Source code for univention.management.console.modules.top
#!/usr/bin/python3
#
# Univention Management Console
# module: process overview
#
# SPDX-FileCopyrightText: 2011-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import time
import psutil
from univention.lib.i18n import Translation
from univention.management.console.log import MODULE
from univention.management.console.modules import Base, UMC_Error
from univention.management.console.modules.decorators import sanitize, simple_response
from univention.management.console.modules.sanitizers import (
ChoicesSanitizer, IntegerSanitizer, ListSanitizer, PatternSanitizer,
)
_ = Translation('univention-management-console-module-top').translate
[docs]
class Instance(Base):
    """UMC "process overview" module: list running processes and signal them."""

    @sanitize(
        pattern=PatternSanitizer(default='.*'),
        category=ChoicesSanitizer(choices=['user', 'pid', 'command', 'all'], default='all'),
    )
    @simple_response
    def query(self, pattern, category='all'):
        """Return the running processes whose fields match *pattern*.

        CPU usage is measured by sampling every matching process twice,
        one second apart, so a call blocks for about one second.

        :param pattern: object whose ``match()`` is applied to the string
            form of the selected fields (supplied by ``PatternSanitizer``).
        :param category: field to test, one of ``'user'``, ``'pid'``,
            ``'command'``; ``'all'`` tests all three and accepts a process
            if any of them matches.
        :returns: list of dicts with the keys ``user``, ``pid``, ``cpu``
            (CPU time consumed divided by wall time elapsed, times 100),
            ``mem`` (``memory_percent()``) and ``command`` (the command
            line joined by spaces, or the process name if it is empty).
        """
        # Loop-invariant: the entry fields the pattern is tested against.
        categories = ['user', 'pid', 'command'] if category == 'all' else [category]

        processes = []
        # Per matching process: (handle, sample time, CPU seconds, entry).
        # The bookkeeping lives next to the entry, not inside it, so it can
        # never leak into the response.
        samples = []
        for process in psutil.process_iter():
            try:
                cpu_time = process.cpu_times()
                # Monotonic clock: the interval must not be distorted by
                # wall-clock adjustments during the sleep below.
                sampled_at = time.monotonic()
                entry = {
                    'user': process.username(),
                    'pid': process.pid,
                    'cpu': 0.0,
                    'mem': process.memory_percent(),
                    'command': ' '.join(process.cmdline() or []) or process.name(),
                }
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Process vanished or is not readable: skip it rather than
                # failing the whole listing.
                continue

            if any(pattern.match(str(entry[cat])) for cat in categories):
                processes.append(entry)
                samples.append((process, sampled_at, cpu_time.user + cpu_time.system, entry))

        # Calculate correct cpu percentage
        time.sleep(1)
        for process, sampled_at, cpu_before, entry in samples:
            try:
                cpu_time = process.cpu_times()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Gone since the first sample: keep the entry with 'cpu' 0.0.
                continue
            elapsed_time = time.monotonic() - sampled_at
            elapsed_cpu_time = cpu_time.user + cpu_time.system - cpu_before
            entry['cpu'] = (elapsed_cpu_time / elapsed_time) * 100

        return processes

    @sanitize(
        signal=ChoicesSanitizer(choices=['SIGTERM', 'SIGKILL']),
        pid=ListSanitizer(IntegerSanitizer()),
    )
    @simple_response
    def kill(self, signal, pid):
        """Terminate or kill every process listed in *pid*.

        :param signal: ``'SIGTERM'`` calls ``Process.terminate()``,
            ``'SIGKILL'`` calls ``Process.kill()``.
        :param pid: list of integer process IDs.
        :raises UMC_Error: if at least one PID had no process. The
            remaining PIDs are still processed before the error is raised.
        """
        failed = []
        for pid_ in pid:
            try:
                process = psutil.Process(pid_)
                if signal == 'SIGTERM':
                    process.terminate()
                elif signal == 'SIGKILL':
                    process.kill()
            except psutil.NoSuchProcess as exc:
                failed.append(str(pid_))
                MODULE.error('Could not %s pid %s: %s', signal, pid_, exc)

        if failed:
            raise UMC_Error(_('No process found with PID %s') % ', '.join(failed))