#!/usr/bin/python3
# SPDX-FileCopyrightText: 2021-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
from __future__ import annotations
import json
import os
from contextlib import contextmanager
from dbm import gnu as gdbm
from pwd import getpwnam
from typing import TYPE_CHECKING, Any
from univention.ldap_cache.cache.backend import Caches, LdapCache, Shard, _s
from univention.ldap_cache.log import debug, log
if TYPE_CHECKING:
from collections.abc import Iterator
MAX_FAIL_COUNT = 5
[docs]
class GdbmCaches(Caches):
def _add_sub_cache(self, name: str, single_value: bool, reverse: bool) -> GdbmCache:
db_file = os.path.join(self._directory, '%s.db' % name)
debug('Using GDBM %s', name)
cache = GdbmCache(name, single_value, reverse)
cache.db_file = db_file
self._caches[name] = cache
return cache
[docs]
class GdbmCache(LdapCache):
def __init__(self, *args: Any, **kwargs: Any) -> None:
self.fail_count = 0
super().__init__(*args, **kwargs)
log('%s - Recreating!', self.name)
def _fix_permissions(self) -> None:
listener_uid = getpwnam('listener').pw_uid
os.chown(self.db_file, listener_uid, -1)
os.chmod(self.db_file, 0o640)
[docs]
@contextmanager
def writing(self, writer: Any | None = None) -> Iterator[Any]:
if writer is not None:
yield writer
else:
if not os.path.exists(self.db_file):
self.clear()
writer = gdbm.open(self.db_file, 'csu')
try:
yield writer
finally:
writer.close()
reading = writing
[docs]
def save(self, key: str, values: list[str]) -> None:
with self.writing() as writer:
if self.reverse:
for value in values:
current = self.get(value, writer) or []
if key in current:
continue
debug('%s - Adding %s %r', self.name, value, key)
current.append(key)
writer[value] = json.dumps(current)
else:
self.delete(key, values, writer)
if not values:
return
debug('%s - Saving %s %r', self.name, key, values)
if self.single_value:
writer[key] = values[0]
else:
writer[key] = json.dumps(values)
[docs]
def clear(self) -> None:
log('%s - Clearing whole DB!', self.name)
gdbm.open(self.db_file, 'nu').close()
self._fix_permissions()
[docs]
def cleanup(self) -> None:
with self.writing() as db:
try:
db.reorganize()
except gdbm.error:
if self.fail_count > MAX_FAIL_COUNT:
raise
self.fail_count += 1
log('%s - Cleaning up DB FAILED %s times', self.name, self.fail_count)
else:
log('%s - Cleaning up DB WORKED', self.name)
self.fail_count = 0
self._fix_permissions()
[docs]
def delete(self, key: str, values: list[str], writer: Any = None) -> None:
debug('%s - Delete %s', self.name, key)
with self.writing(writer) as writer:
if self.reverse:
for value in values:
current = self.get(value, writer) or []
try:
current.remove(key)
except ValueError:
continue
writer[value] = json.dumps(current)
else:
try:
del writer[key]
except KeyError:
pass
[docs]
def keys(self) -> Iterator[str]:
with self.reading() as reader:
key = _s(reader.firstkey())
while key is not None:
yield key
key = _s(reader.nextkey(key))
def __iter__(self) -> Iterator[tuple[str, Any]]:
with self.reading() as reader:
for key in self.keys():
yield key, self.get(key, reader)
[docs]
def get(self, key: str, reader: Any = None) -> Any:
with self.reading(reader) as reader:
try:
value = reader[key]
except KeyError:
if self.single_value:
return None
return []
if self.single_value:
return _s(value)
elif value:
return _s(json.loads(value))
[docs]
def load(self) -> dict[str, Any]:
debug('%s - Loading', self.name)
return dict(list(self))
[docs]
class GdbmShard(Shard):
key = 'dn'