Source code for ucsschool.netlogon

# Univention UCS@school
#
# Copyright 2007-2021 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.

import os
import sqlite3
import stat
from typing import Any, List, Optional, Tuple

import univention.config_registry

FN_NETLOGON_USER_QUEUE = "/var/spool/ucs-school-netlogon-user-logonscripts/user_queue.sqlite"


[docs]def get_netlogon_path_list(): if not get_netlogon_path_list.script_path: ucr = univention.config_registry.ConfigRegistry() ucr.load() result = [] ucsschool_netlogon_path = ucr.get("ucsschool/userlogon/netlogon/path", "").strip().rstrip("/") samba_netlogon_path = ucr.get("samba/share/netlogon/path", "").strip().rstrip("/") if ucsschool_netlogon_path: result.append(ucsschool_netlogon_path) elif samba_netlogon_path: result.append(samba_netlogon_path) else: result.append("/var/lib/samba/netlogon/user") result.append( "/var/lib/samba/sysvol/%s/scripts/user" % (ucr.get("kerberos/realm", "").lower(),) ) get_netlogon_path_list.script_path = result return get_netlogon_path_list.script_path
get_netlogon_path_list.script_path = []
[docs]class SqliteQueueException(Exception): pass
[docs]class Cursor(object): """Open DB, execute command, close DB.""" def __init__(self, filename): # type: (str) -> None self._db = None self._cursor = None self.filename = filename def __enter__(self): # type: () -> Cursor self._db = sqlite3.connect(self.filename, timeout=30) # type: sqlite3.Connection os.chown(self.filename, 0, 0) os.chmod(self.filename, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) self._cursor = self._db.cursor() # type: sqlite3.Cursor return self def __exit__(self, exc_type, exc_value, traceback): self._db.commit() self._db.close() self._cursor = None self._db = None
[docs] def execute(self, query, params=None): # type: (str, str) -> sqlite3.Cursor if params: return self._cursor.execute(query, params) else: return self._cursor.execute(query)
[docs] def fetchone(self): # type: () -> None return self._cursor.fetchone()
[docs]class SqliteQueue(object): """ Holds items (user DNs) in a FIFO queue. """ IDX_DB_DN = 0 def __init__(self, logger, filename=None): # type: (Any, Optional[str]) -> None self.filename = filename if filename is not None else FN_NETLOGON_USER_QUEUE self.logger = logger self.setup_database()
[docs] def setup_database(self): # type: () -> None """Open DB connection, optionally create it, create cursor.""" # create directory if missing if not os.path.exists(os.path.dirname(self.filename)): self.logger.error("directory %r does not exist" % (os.path.dirname(self.filename),)) raise SqliteQueueException( "Cannot open database - directory %r does not exist" % (os.path.dirname(self.filename),) ) if not os.path.exists(self.filename): self.logger.warning( "database does not exist - creating new one (filename=%r)" % (self.filename,) ) with Cursor(self.filename) as cursor: # create table if missing cursor.execute( u"CREATE TABLE IF NOT EXISTS user_queue (id INTEGER PRIMARY KEY AUTOINCREMENT, userdn" u" TEXT, username TEXT)" ) # create index if missing cursor.execute(u"CREATE INDEX IF NOT EXISTS idx_userdn ON user_queue (userdn)")
[docs] def truncate_database(self): # type: () -> None # SQLITE does not have a TRUNCATE TABLE command, but DELETE FROM # without WHERE is optimized to delete the entire table without # iterating over its rows. with Cursor(self.filename) as cursor: cursor.execute(u"DELETE FROM user_queue") with Cursor(self.filename) as cursor: cursor.execute(u"VACUUM")
[docs] def add(self, users): # type: (List[Tuple[str, str]]) -> None """ Adds a user DN to user queue if not already existant. If the user DN already exists in queue, the queue item remains unchanged. userdn and username have to be UTF-8 encoded strings or unicode strings. :param users - list of 2-tuples: (userdn, username) """ with Cursor(self.filename) as cursor: for userdn, username in users: if isinstance(userdn, bytes): # Python 2 userdn = userdn.decode("utf-8") if isinstance(username, bytes): # Python 2 username = username.decode("utf-8") if username is not None: cursor.execute( u"insert or replace into user_queue (id, userdn, username) VALUES " u"((select id from user_queue where userdn = ?), ?, ?)", (userdn, userdn, username), ) else: cursor.execute( u"insert or replace into user_queue (id, userdn, username) VALUES " u"((select id from user_queue where userdn = ?), ?, " u"(select username from user_queue where userdn = ?))", (userdn, userdn, userdn), ) self.logger.debug( "added/updated entries: {}".format( ", ".join( [ "username={!r}".format(_username) if _username else "userdn={!r}".format(_userdn) for _userdn, _username in users ] ) ) )
[docs] def remove(self, userdn): # type: (str) -> None """ Removes a specific user DN from queue. userdn has to be a UTF-8 encoded string or unicode string. """ if isinstance(userdn, bytes): # Python 2 userdn = userdn.decode("utf-8") with Cursor(self.filename) as cursor: cursor.execute(u"DELETE FROM user_queue WHERE userdn=?", (userdn,)) self.logger.debug("removed entry: userdn=%r" % (userdn,))
[docs] def query_next_user(self): # type: () -> [str] """ Returns next user dn and username of user_queue as UTF-8 encoded strings. """ query = u"SELECT userdn,username FROM user_queue ORDER BY id LIMIT 1" self.logger.debug("starting sqlite query: %r" % (query,)) with Cursor(self.filename) as cursor: cursor.execute(query) row = cursor.fetchone() if row is not None: userdn = row[0] if userdn is not None and bytes is str: # Python 2 userdn = userdn.encode("utf-8") username = row[1] if username is not None and bytes is str: # Python 2 username = username.encode("utf-8") self.logger.debug("next entry: userdn=%r" % (userdn,)) return userdn, username return None, None