Source code for univention.service_info

# -*- coding: utf-8 -*-
#
# Univention Configuration Registry
#  Service information: read information about registered Config Registry
#  variables
#
# Copyright 2007-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

import os
import shlex
import subprocess
from logging import getLogger
import six
try:
	from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Tuple  # noqa: F401
except ImportError:  # pragma: no cache
	pass

import univention.info_tools as uit


[docs]class ServiceError(Exception): """Error when starting, stopping or restarting a service."""
[docs]class Service(uit.LocalizedDictionary): """ Description for a system service. """ REQUIRED = frozenset(('description', 'programs')) OPTIONAL = frozenset(('start_type', 'systemd', 'icon', 'name', 'init_script')) KNOWN = REQUIRED | OPTIONAL def __init__(self, *args, **kwargs): # type: (*Any, **Any) -> None uit.LocalizedDictionary.__init__(self, *args, **kwargs) self.running = False def __repr__(self): # type: () -> str return '%s(%s)' % ( self.__class__.__name__, dict.__repr__(self), )
[docs] def check(self): # type: () -> List[str] """Check service entry for validity, returning list of incomplete entries.""" incomplete = [key for key in self.REQUIRED if not self.get(key, None)] unknown = [key for key in self.keys() if key.lower() not in self.KNOWN] return incomplete + unknown
def _update_status(self): # type: () -> None for prog in self['programs'].split(','): prog = prog.strip() if prog and not pidof(prog): self.running = False break else: self.running = True
[docs] def start(self): # type: () -> bool """Start the service.""" return self._change_state('start')
[docs] def stop(self): # type: () -> bool """Stop the service.""" return self._change_state('stop')
[docs] def restart(self): # type: () -> bool """Restart the service.""" return self._change_state('restart')
[docs] def status(self): """Get status of the service.""" try: return self.__change_state('status')[1] except EnvironmentError: return u''
def _change_state(self, action): # type: (str) -> bool rc, output = self.__change_state(action) if rc: raise ServiceError(self.status() or output) return True def __change_state(self, action): # type: (str) -> Tuple[int, str] if self.get('init_script'): # samba currently must not be started via systemd return self._exec(('/etc/init.d/%s' % (self['init_script'],), action)) service_name = self._service_name() return self._exec(('/usr/sbin/service', service_name, action)) def _service_name(self): # type: () -> str service_name = self.get('systemd', self.get('name')) if service_name.endswith('.service'): service_name = service_name.rsplit('.', 1)[0] return service_name def _exec(self, args): # type: (Sequence[str]) -> Tuple[int, str] process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) output = process.communicate()[0] return process.returncode, output.decode('utf-8', 'replace').strip()
[docs]def pidof(name, docker='/var/run/docker.pid'): # type: (str, str) -> List[int] """ Return list of process IDs matching name. :param name: Procress name. :param docker: File name containing process ID of docker process. >>> import os,sys;os.getpid() in list(pidof(os.path.realpath(sys.executable))) + list(pidof(sys.executable)) + list(pidof(sys.argv[0])) True """ result = set() # type: Set[int] log = getLogger(__name__) children = {} # type: Dict[int, List[int]] if isinstance(docker, six.string_types): try: with open(docker, 'r') as stream: docker = int(stream.read(), 10) log.info('Found docker.pid=%d', docker) except (EnvironmentError, ValueError) as ex: log.info('No docker found: %s', ex) cmd = shlex.split(name) for proc in os.listdir('/proc'): try: pid = int(proc, 10) except ValueError: continue cmdline = os.path.join('/proc', proc, 'cmdline') try: with open(cmdline, 'rb') as fd: commandline = fd.read().rstrip(b'\x00').decode('UTF-8', 'replace') link = os.readlink(os.path.join('/proc', proc, 'exe')) except EnvironmentError: continue # kernel thread if not commandline: continue if docker: stat = os.path.join('/proc', proc, 'stat') status = None try: with open(stat, 'rb') as fd: status = fd.readline() ppid = int(status[status.rfind(b')') + 2:].split()[1], 10) children.setdefault(ppid, []).append(pid) except (EnvironmentError, ValueError) as ex: log.error('Failed getting parent: %s: %r', ex, status) def _running(): # type: () -> Iterator[bool] yield cmd == [link] args = commandline.split('\x00') if '\x00' in commandline else shlex.split(commandline) yield len(cmd) == 1 and cmd[0] in args # FIXME: it detects "vim /usr/sbin/service" as running process! yield len(cmd) > 1 and all(a == c for a, c in zip(args, cmd)) if any(_running()): log.info('found %d: %r', pid, commandline) result.add(pid) if docker: remove = children.pop(docker, []) while remove: pid = remove.pop() log.debug('Removing docker child %s', pid) result.discard(pid) remove += children.pop(pid, []) return list(result)
[docs]class ServiceInfo(object): BASE_DIR = '/etc/univention/service.info' SERVICES = 'services' CUSTOMIZED = '_customized' FILE_SUFFIX = '.cfg' def __init__(self, install_mode=False): # type: (bool) -> None self.services = {} # type: Dict if not install_mode: self.__load_services() self.update_services()
[docs] def update_services(self): # type: () -> None """Update the run state of all services.""" for serv in self.services.values(): serv._update_status()
[docs] def check_services(self): # type: () -> Dict[str, List[str]] """ Check service descriptions for completeness. :returns: dictionary of incomplete service descriptions. """ incomplete = {} # type: Dict[str, List[str]] for name, srv in self.services.items(): miss = srv.check() if miss: incomplete[name] = miss return incomplete
[docs] def write_customized(self): # type: () -> bool """Save service cusomization.""" filename = os.path.join(ServiceInfo.BASE_DIR, ServiceInfo.SERVICES, ServiceInfo.CUSTOMIZED) try: with open(filename, 'w') as fd: cfg = uit.UnicodeConfig() for name, srv in self.services.items(): cfg.add_section(name) for key in srv.keys(): items = srv.normalize(key) for item, value in items.items(): cfg.set(name, item, value) cfg.write(fd) return True except EnvironmentError: return False
[docs] def read_services(self, filename=None, package=None, override=False): # type: (Optional[str], Optional[str], bool) -> None """ Read start/stop levels of services. :param filename: Explicit filename for loading. :param package: Explicit package name. :param override: `True` to overwrite already loaded descriptions. :raises AttributeError: if neither `filename` nor `package` are given. """ if not filename: if not package: raise AttributeError("neither 'filename' nor 'package' is specified") filename = os.path.join(ServiceInfo.BASE_DIR, ServiceInfo.SERVICES, package + ServiceInfo.FILE_SUFFIX) cfg = uit.UnicodeConfig() cfg.read(filename) for sec in cfg.sections(): # service already known? if not override and sec in self.services: continue srv = Service() srv['name'] = sec for name, value in cfg.items(sec): srv[name] = value for path in srv.get('programs', '').split(','): # "programs" defines the "/proc/self/cmdline" of the service, # not the executable, therefore we test for a leading "/": # check if it is a real file split to remove parameters if path.startswith('/') and not os.path.exists(path.split(' ', 1)[0]): break # ==> do not execute else else: self.services[sec] = srv
def __load_services(self): # type: () -> None """Load definition of all defined services.""" path = os.path.join(ServiceInfo.BASE_DIR, ServiceInfo.SERVICES) for entry in os.listdir(path): # customized service descrptions are read afterwards if entry == ServiceInfo.CUSTOMIZED: continue cfgfile = os.path.join(path, entry) if os.path.isfile(cfgfile) and cfgfile.endswith(ServiceInfo.FILE_SUFFIX): self.read_services(cfgfile) # read modified/added service descriptions self.read_customized()
[docs] def read_customized(self): # type: () -> None """Read service cusomization.""" custom = os.path.join(ServiceInfo.BASE_DIR, ServiceInfo.SERVICES, ServiceInfo.CUSTOMIZED) self.read_services(custom, override=True)
[docs] def get_services(self): # type: () -> Iterable[str] """ Return a list fo service names. :returns: List of service names. """ return self.services.keys()
[docs] def get_service(self, name): # type: (str) -> Optional[Service] """ Return the service object associated with the given name. :param name: Service name. :returns: description object or `None`. """ return self.services.get(name, None)
[docs] def add_service(self, name, service): # type: (str, Service) -> None """ Add a new service object or overrides an old entry. :param name: Service name. :param service: :py:class:`Service` instance. """ if not service.check(): self.services[name] = service
if __name__ == '__main__': import doctest doctest.testmod()