Source code for univention.updater.locking

#!/usr/bin/python3
# SPDX-FileCopyrightText: 2008-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
"""Univention Updater locking"""

from __future__ import annotations

import os
import sys
from contextlib import contextmanager
from errno import EEXIST, ENOENT, ESRCH
from time import monotonic, sleep
from typing import TYPE_CHECKING

from .errors import UpdaterException


if TYPE_CHECKING:
    from types import TracebackType


try:
    from typing import Self  # type: ignore[attr-defined]
except ImportError:
    Self = "UpdaterLock"


FN_LOCK_UP = '/var/lock/univention-updater'
FN_LOCK_APT = "/var/run/apt-get.lock"


[docs] class LockingError(UpdaterException): """ Signal other updater process running. >>> raise LockingError(1, "Invalid PID") # doctest: +ELLIPSIS,+IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... univention.updater.locking.LockingError: Another updater process 1 is currently running according to ...: Invalid PID """ def __str__(self) -> str: return "Another updater process %s is currently running according to %s: %s" % ( self.args[0], FN_LOCK_UP, self.args[1], )
[docs] class UpdaterLock: """Context wrapper for updater-lock :file:`/var/lock/univention-updater`.""" def __init__(self, timeout: int = 0) -> None: self.timeout = timeout self.lock = 0 def __enter__(self) -> Self: try: self.lock = self.updater_lock_acquire() return self except LockingError as ex: print(ex, file=sys.stderr) sys.exit(5) def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None) -> None: if not self.updater_lock_release(): print('WARNING: updater-lock already released!', file=sys.stderr)
[docs] def updater_lock_acquire(self) -> int: """ Acquire the updater-lock. :returns: 0 if it could be acquired within <timeout> seconds, >= 1 if locked by parent. :rtype: int :raises EnvironmentError: on file system access errors. :raises LockingError: on invalid PID or timeout. """ deadline = monotonic() + self.timeout lock_pid = 0 while True: try: lock_fd = os.open(FN_LOCK_UP, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644) my_pid = b"%d\n" % os.getpid() bytes_written = os.write(lock_fd, my_pid) assert bytes_written == len(my_pid) os.close(lock_fd) return 0 except OSError as ex: if ex.errno != EEXIST: raise try: lock_fd = os.open(FN_LOCK_UP, os.O_RDONLY | os.O_EXCL) try: lock_pid_b = os.read(lock_fd, 11) # sizeof(s32) + len('\n') finally: os.close(lock_fd) except OSError as ex: if ex.errno != ENOENT: raise else: try: lock_pid_s = lock_pid_b.decode('ASCII').strip() except UnicodeDecodeError: raise LockingError(lock_pid_b, "Invalid PID") if not lock_pid_s: print('Empty lockfile %s, removing.' % (FN_LOCK_UP,), file=sys.stderr) os.remove(FN_LOCK_UP) continue # redo acquire try: lock_pid = int(lock_pid_s) except ValueError: raise LockingError(lock_pid_s, "Invalid PID") if lock_pid == os.getpid(): return 0 if lock_pid == os.getppid(): # u-repository-* called from u-updater return 1 try: os.kill(lock_pid, 0) except OSError as ex: if ex.errno == ESRCH: print('Stale PID %d in lockfile %s, removing.' % (lock_pid, FN_LOCK_UP), file=sys.stderr) os.remove(FN_LOCK_UP) continue # redo acquire # PID is valid and process is still alive... if monotonic() > deadline: raise LockingError(lock_pid, "Check lockfile") else: sleep(1)
[docs] def updater_lock_release(self) -> bool: """ Release the updater-lock. :returns: True if it has been unlocked (or decremented when nested), False if it was already unlocked. :rtype: bool """ if self.lock > 0: # parent process still owns the lock, do nothing and just return success return True try: os.remove(FN_LOCK_UP) return True except OSError as error: if error.errno == ENOENT: return False else: raise
[docs] @contextmanager def apt_lock(timeout=300, out=sys.stdout): """ Acquire and release lock for APT. :param timeout: Time to wait. :param out: Output stream for progress and error messages. """ for count in range(timeout, 0, -1): if not os.path.exists(FN_LOCK_APT): break print("\r%3d Waiting for updater lock %s ..." % (count, FN_LOCK_APT), end="", file=out) sleep(1) else: print("Updater is still locked: %s" % (FN_LOCK_APT,), file=out) # FIXME: Abort? open(FN_LOCK_APT, "w").close() yield None if os.path.exists(FN_LOCK_APT): os.unlink(FN_LOCK_APT)