Source code for univention.ldap_cache.cache.backend.lmdb_cache
#!/usr/bin/python3
# SPDX-FileCopyrightText: 2021-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
from __future__ import annotations
import os
from contextlib import contextmanager
from pwd import getpwnam
from typing import TYPE_CHECKING, Any
import lmdb
from univention.ldap_cache.cache.backend import Caches, LdapCache, Shard
if TYPE_CHECKING:
from collections.abc import Iterator
[docs]
class LmdbCaches(Caches):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.env = lmdb.open(self._directory, 2 ** 32 - 1, max_dbs=128)
self._fix_permissions(self._directory)
def _fix_permissions(self, db_directory: str) -> None:
listener_uid = getpwnam('listener').pw_uid
os.chown(os.path.join(db_directory, 'data.mdb'), listener_uid, -1)
os.chown(os.path.join(db_directory, 'lock.mdb'), listener_uid, -1)
os.chmod(os.path.join(db_directory, 'data.mdb'), 0o640)
os.chmod(os.path.join(db_directory, 'lock.mdb'), 0o640)
def _add_sub_cache(self, name: str, single_value: bool, reverse: bool) -> LmdbCache:
sub_db = self.env.open_db(name, dupsort=not single_value)
cache = LmdbCache(name, single_value, reverse)
cache.env = self.env
cache.sub_db = sub_db
self._caches[name] = cache
return cache
[docs]
class LmdbCache(LdapCache):
[docs]
@contextmanager
def writing(self, writer: Any | None = None) -> Iterator[Any]:
if writer is not None:
yield writer
else:
with self.env.begin(self.sub_db, write=True) as writer:
yield writer
[docs]
def save(self, key: str, values: list[str]) -> None:
with self.writing() as writer:
self.delete(key, writer)
for value in values:
writer.put(key, value)
[docs]
def clear(self) -> None:
with self.env.begin(write=True) as writer:
writer.drop(self.sub_db, delete=False)
[docs]
def cleanup(self) -> None:
pass
[docs]
def delete(self, key: str, writer: Any = None) -> None:
with self.writing(writer) as writer:
writer.delete(key)
[docs]
@contextmanager
def reading(self) -> Iterator[Any]:
with self.env.begin(self.sub_db) as txn, txn.cursor() as cursor:
yield cursor
def __iter__(self) -> Iterator[tuple[str, Any]]:
with self.reading() as reader:
yield from reader
[docs]
def get(self, key: str) -> Any:
with self.reading() as reader:
if self.single_value:
return reader.get(key)
else:
reader.set_key(key)
return list(reader.iternext_dup())
[docs]
def load(self) -> dict[str, Any]:
ret: dict[str, Any] = {}
with self._load_key_translations() as translations, self.reading() as reader:
for key in reader.iternext_nodup():
translated = translations.get(key)
if translated is None:
continue
ret[translated] = self.get(key)
return ret
@contextmanager
def _load_key_translations(self) -> Iterator[Any]:
entry_uuid_db = self.env.open_db('EntryUUID', dupsort=False)
with self.env.begin(entry_uuid_db) as txn:
yield txn
[docs]
class LmdbShard(Shard):
key = 'entryUUID'