Source code for univention.ldap_cache.cache.backend.gdbm_cache

#!/usr/bin/python3
# SPDX-FileCopyrightText: 2021-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only

from __future__ import annotations

import json
import os
from contextlib import contextmanager
from dbm import gnu as gdbm
from pwd import getpwnam
from typing import TYPE_CHECKING, Any

from univention.ldap_cache.cache.backend import Caches, LdapCache, Shard, _s
from univention.ldap_cache.log import debug, log


if TYPE_CHECKING:
    from collections.abc import Iterator


MAX_FAIL_COUNT = 5


class GdbmCaches(Caches):
    def _add_sub_cache(self, name: str, single_value: bool, reverse: bool) -> GdbmCache:
        db_file = os.path.join(self._directory, '%s.db' % name)
        debug('Using GDBM %s', name)
        cache = GdbmCache(name, single_value, reverse)
        cache.db_file = db_file
        self._caches[name] = cache
        return cache


class GdbmCache(LdapCache):
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        self.fail_count = 0
        super().__init__(*args, **kwargs)
        log('%s - Recreating!', self.name)

    def _fix_permissions(self) -> None:
        listener_uid = getpwnam('listener').pw_uid
        os.chown(self.db_file, listener_uid, -1)
        os.chmod(self.db_file, 0o640)

    @contextmanager
    def writing(self, writer: Any | None = None) -> Iterator[Any]:
        if writer is not None:
            yield writer
        else:
            if not os.path.exists(self.db_file):
                self.clear()
            writer = gdbm.open(self.db_file, 'csu')
            try:
                yield writer
            finally:
                writer.close()

    # Reads go through the same context manager; 'csu' opens the GDBM file
    # read-write, creating it if missing, synchronized and without locking.
    reading = writing

    def save(self, key: str, values: list[str]) -> None:
        with self.writing() as writer:
            if self.reverse:
                # Reverse caches map each value to the list of keys referencing it.
                for value in values:
                    current = self.get(value, writer) or []
                    if key in current:
                        continue
                    debug('%s - Adding %s %r', self.name, value, key)
                    current.append(key)
                    writer[value] = json.dumps(current)
            else:
                self.delete(key, values, writer)
                if not values:
                    return
                debug('%s - Saving %s %r', self.name, key, values)
                if self.single_value:
                    writer[key] = values[0]
                else:
                    writer[key] = json.dumps(values)

    def clear(self) -> None:
        log('%s - Clearing whole DB!', self.name)
        gdbm.open(self.db_file, 'nu').close()
        self._fix_permissions()

    def cleanup(self) -> None:
        with self.writing() as db:
            try:
                db.reorganize()
            except gdbm.error:
                if self.fail_count > MAX_FAIL_COUNT:
                    raise
                self.fail_count += 1
                log('%s - Cleaning up DB FAILED %s times', self.name, self.fail_count)
            else:
                log('%s - Cleaning up DB WORKED', self.name)
                self.fail_count = 0
        self._fix_permissions()

    def delete(self, key: str, values: list[str], writer: Any = None) -> None:
        debug('%s - Delete %s', self.name, key)
        with self.writing(writer) as writer:
            if self.reverse:
                for value in values:
                    current = self.get(value, writer) or []
                    try:
                        current.remove(key)
                    except ValueError:
                        continue
                    writer[value] = json.dumps(current)
            else:
                try:
                    del writer[key]
                except KeyError:
                    pass

    def keys(self) -> Iterator[str]:
        with self.reading() as reader:
            key = _s(reader.firstkey())
            while key is not None:
                yield key
                key = _s(reader.nextkey(key))

    def __iter__(self) -> Iterator[tuple[str, Any]]:
        with self.reading() as reader:
            for key in self.keys():
                yield key, self.get(key, reader)

    def get(self, key: str, reader: Any = None) -> Any:
        with self.reading(reader) as reader:
            try:
                value = reader[key]
            except KeyError:
                if self.single_value:
                    return None
                return []
            if self.single_value:
                return _s(value)
            elif value:
                return _s(json.loads(value))

    def load(self) -> dict[str, Any]:
        debug('%s - Loading', self.name)
        return dict(list(self))


class GdbmShard(Shard):
    key = 'dn'
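

# Illustrative usage sketch (an assumption, not part of the shipped module): it
# exercises a GdbmCache directly to show the round trip save() -> get()/keys().
# The cache name, the temporary path and constructing GdbmCache by hand (instead
# of via GdbmCaches._add_sub_cache) are demo assumptions; the database file is
# pre-created so that clear()/_fix_permissions(), which need root and the
# 'listener' system user of a UCS host, are never triggered.
if __name__ == '__main__':
    import tempfile

    demo = GdbmCache('demo', False, False)  # name, single_value, reverse
    demo.db_file = os.path.join(tempfile.mkdtemp(), 'demo.db')
    gdbm.open(demo.db_file, 'c').close()  # create the file so writing() skips clear()
    demo.save('uid=alice,dc=example,dc=com', ['cn=group1', 'cn=group2'])
    print(demo.get('uid=alice,dc=example,dc=com'))  # -> ['cn=group1', 'cn=group2']
    print(sorted(demo.keys()))                      # -> ['uid=alice,dc=example,dc=com']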