#!/usr/bin/python3
#
# Univention S4 Connector
# s4 cache
#
# SPDX-FileCopyrightText: 2014-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import base64
import sqlite3
from logging import getLogger
from univention.logging import Structured
log = Structured(getLogger("LDAP").getChild(__name__))
def _encode_base64(val):
return base64.b64encode(val).decode('ASCII')
[docs]
class EntryDiff:
def __init__(self, old, new):
self.old = old
self.new = new
if not old:
old = {}
if not new:
new = {}
self.set_old = set(old.keys())
self.set_new = set(new.keys())
self.intersect = self.set_new.intersection(self.set_old)
[docs]
def added(self):
return self.set_new - self.intersect
[docs]
def removed(self):
return self.set_old - self.intersect
[docs]
def changed(self):
return {o for o in self.intersect if set(self.old[o]) != set(self.new[o])}
[docs]
class S4Cache:
"""
Local cache for the current Samba 4 state of the s4connector.
With this cache the connector has the possibility to create
a diff between the new Samba 4 object and the old one from
cache.
"""
def __init__(self, filename):
self.filename = filename
self._dbcon = sqlite3.connect(self.filename)
self.s4cache = {}
self.__create_tables()
[docs]
def add_entry(self, guid, entry):
if not self._guid_exists(guid):
self._add_entry(guid, entry)
else:
self._update_entry(guid, entry)
self.s4cache[guid] = entry
[docs]
def diff_entry(self, old_entry, new_entry):
result = {'added': None, 'removed': None, 'changed': None}
diff = EntryDiff(old_entry, new_entry)
result['added'] = diff.added()
result['removed'] = diff.removed()
result['changed'] = diff.changed()
return result
[docs]
def get_entry(self, guid):
entry = {}
guid_id = self._get_guid_id(guid)
if not guid_id:
return None
# The SQLite Python module should do the escaping, that's
# the reason why we use the tuple ? syntax.
# I've chosen the str call because I want to make sure
# that we use the same SQL value as before switching
# to the tuple ? syntax
sql_commands = [
(
"SELECT ATTRIBUTES.attribute,data.value from data \
inner join ATTRIBUTES ON data.attribute_id=attributes.id where guid_id = ?;",
(str(guid_id),),
),
]
rows = self.__execute_sql_commands(sql_commands, fetch_result=True)
if not rows:
return None
for line in rows:
if not entry.get(line[0]):
entry[str(line[0])] = []
entry[line[0]].append(base64.b64decode(line[1]))
return entry
[docs]
def remove_entry(self, guid):
guid_id = self._get_guid_id(guid)
if not guid_id:
return
sql_commands = [
("DELETE FROM data WHERE guid_id=?;", (str(guid_id),)),
("DELETE FROM guids WHERE id=?;", (str(guid_id),)),
]
self.__execute_sql_commands(sql_commands, fetch_result=False)
def __execute_sql_commands(self, sql_commands, fetch_result=False):
for _i in [1, 2]:
try:
cur = self._dbcon.cursor()
for sql_command in sql_commands:
if isinstance(sql_command, tuple):
log.trace("S4Cache: Execute SQL command: '%s', '%s'", sql_command[0], sql_command[1])
cur.execute(sql_command[0], sql_command[1])
else:
log.trace("S4Cache: Execute SQL command: '%s'", sql_command)
cur.execute(sql_command)
self._dbcon.commit()
if fetch_result:
rows = cur.fetchall()
cur.close()
if fetch_result:
log.trace("S4Cache: Return SQL result: '%s'", rows)
return rows
return None
except sqlite3.Error as exp:
log.warning("S4Cache: sqlite: %s. SQL command was: %s", exp, sql_commands)
if self._dbcon:
self._dbcon.close()
self._dbcon = sqlite3.connect(self.filename)
def __create_tables(self):
sql_commands = [
"CREATE TABLE IF NOT EXISTS GUIDS (id INTEGER PRIMARY KEY, guid TEXT);",
"CREATE TABLE IF NOT EXISTS ATTRIBUTES (id INTEGER PRIMARY KEY, attribute TEXT);",
"CREATE TABLE IF NOT EXISTS DATA (id INTEGER PRIMARY KEY, guid_id INTEGER, attribute_id INTEGER, value TEXT);",
"CREATE INDEX IF NOT EXISTS data_foreign_keys ON data(guid_id, attribute_id);",
"CREATE INDEX IF NOT EXISTS attributes_attribute ON attributes(attribute);",
"CREATE INDEX IF NOT EXISTS guids_guid ON guids(guid);",
]
self.__execute_sql_commands(sql_commands, fetch_result=False)
def _guid_exists(self, guid):
return self._get_guid_id(guid.strip()) is not None
def _get_guid_id(self, guid):
sql_commands = [
("SELECT id FROM GUIDS WHERE guid=?;", (str(guid),)),
]
rows = self.__execute_sql_commands(sql_commands, fetch_result=True)
if rows:
return rows[0][0]
return None
def _append_guid(self, guid):
sql_commands = [
("INSERT INTO GUIDS(guid) VALUES(?);", (str(guid),)),
]
self.__execute_sql_commands(sql_commands, fetch_result=False)
def _get_attr_id(self, attr):
sql_commands = [
("SELECT id FROM ATTRIBUTES WHERE attribute=?;", (str(attr),)),
]
rows = self.__execute_sql_commands(sql_commands, fetch_result=True)
if rows:
return rows[0][0]
return None
def _attr_exists(self, guid):
return self._get_attr_id(guid) is not None
def _create_attr(self, attr):
sql_commands = [
("INSERT INTO ATTRIBUTES(attribute) VALUES(?);", (str(attr),)),
]
self.__execute_sql_commands(sql_commands, fetch_result=False)
def _get_attr_id_and_create_if_not_exists(self, attr):
attr_id = self._get_attr_id(attr)
if not attr_id:
self._create_attr(attr)
attr_id = self._get_attr_id(attr)
return attr_id
def _add_entry(self, guid, entry):
guid = guid.strip()
self._append_guid(guid)
guid_id = self._get_guid_id(guid)
sql_commands = []
for attr in entry.keys():
attr_id = self._get_attr_id_and_create_if_not_exists(attr)
for value in entry[attr]:
sql_commands.append(
(
"INSERT INTO DATA(guid_id,attribute_id,value) VALUES(?,?,?);",
(str(guid_id), str(attr_id), _encode_base64(value)),
),
)
if sql_commands:
self.__execute_sql_commands(sql_commands, fetch_result=False)
def _update_entry(self, guid, entry):
guid = guid.strip()
guid_id = self._get_guid_id(guid)
old_entry = self.get_entry(guid)
diff = self.diff_entry(old_entry, entry)
sql_commands = []
for attribute in diff['removed']:
sql_commands.append(
(
"DELETE FROM data WHERE data.id IN (\
SELECT data.id FROM DATA INNER JOIN ATTRIBUTES ON data.attribute_id=attributes.id \
where attributes.attribute=? and guid_id=? \
);",
(str(attribute), str(guid_id)),
),
)
for attribute in diff['added']:
attr_id = self._get_attr_id_and_create_if_not_exists(attribute)
for value in entry[attribute]:
sql_commands.append(
(
"INSERT INTO DATA(guid_id,attribute_id,value) VALUES(?,?,?);",
(str(guid_id), str(attr_id), _encode_base64(value)),
),
)
for attribute in diff['changed']:
attr_id = self._get_attr_id_and_create_if_not_exists(attribute)
for value in set(old_entry.get(attribute)) - set(entry.get(attribute)):
sql_commands.append(
(
"DELETE FROM data WHERE data.id IN (\
SELECT data.id FROM DATA INNER JOIN ATTRIBUTES ON data.attribute_id=attributes.id \
where attributes.id=? and guid_id = ? and value = ? \
);",
(str(attr_id), str(guid_id), _encode_base64(value)),
),
)
for value in set(entry.get(attribute)) - set(old_entry.get(attribute)):
sql_commands.append(
(
"INSERT INTO DATA(guid_id,attribute_id,value) VALUES(?,?,?);",
(str(guid_id), str(attr_id), _encode_base64(value)),
),
)
if sql_commands:
self.__execute_sql_commands(sql_commands, fetch_result=False)
if __name__ == '__main__':
print('Starting S4cache test example ', end=' ')
s4cache = S4Cache('cache.sqlite')
guid = '1234'
entry = {
'attr1': [b'foobar'],
'attr2': [b'val1', b'val2', b'val3'],
}
s4cache.add_entry(guid, entry)
entry_old = s4cache.get_entry(guid)
diff_entry = s4cache.diff_entry(entry_old, entry)
if diff_entry.get('changed') or diff_entry.get('removed') or diff_entry.get('added'):
raise Exception(f'Test 1 failed: {diff_entry}')
print('.', end=' ')
entry['attr3'] = [b'val2']
entry['attr2'] = [b'val1', b'val3']
diff_entry = s4cache.diff_entry(entry_old, entry)
if diff_entry.get('changed') != {'attr2'} or diff_entry.get('removed') or diff_entry.get('added') != {'attr3'}:
raise Exception(f'Test 2 failed: {diff_entry}')
print('.', end=' ')
s4cache.add_entry(guid, entry)
entry_old = s4cache.get_entry(guid)
diff_entry = s4cache.diff_entry(entry_old, entry)
if diff_entry.get('changed') or diff_entry.get('removed') or diff_entry.get('added'):
raise Exception(f'Test 3 failed: {diff_entry}')
print('.', end=' ')
print(' done')