#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention App Center
# univention-app wrapper for udm functions
#
# Copyright 2015-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
#
from ldap.filter import escape_filter_chars
from ldap.dn import explode_dn, escape_dn_chars
import univention.admin.objects as udm_objects
import univention.admin.modules as udm_modules
import univention.admin.filter as udm_filter
import univention.admin.uexceptions as udm_errors
from univention.uldap import access as base_access
from univention.admin.uldap import getMachineConnection, getAdminConnection, access, position
from univention.appcenter.ucr import ucr_get
_initialized = set()
[docs]class FakeApp(object):
def __init__(self, id, version):
self.id = id
self.version = version
def _update_modules():
if not _initialized:
udm_modules.update()
def _get_module(module, lo, pos):
_update_modules()
mod = udm_modules.get(module)
if module not in _initialized:
udm_modules.init(lo, pos, mod)
_initialized.add(module)
return mod
[docs]def init_object(module, lo, pos, dn='', attrs=None):
module = _get_module(module, lo, pos)
obj = udm_objects.get(module, None, lo, pos, dn)
udm_objects.open(obj)
if attrs:
if 'policies' in attrs:
obj.policies = attrs.pop('policies')
for key, value in attrs.items():
obj[key] = value
return obj
[docs]def remove_object_if_exists(module, lo, pos, dn):
try:
obj = init_object(module, lo, pos, dn)
obj.remove()
except udm_errors.noObject:
pass
else:
udm_objects.performCleanup(obj)
[docs]def create_object_if_not_exists(_module, _lo, _pos, **kwargs):
obj = init_object(_module, _lo, _pos, attrs=kwargs)
dn = obj._ldap_dn()
try:
init_object(_module, _lo, _pos, dn)
except udm_errors.noObject:
obj.create()
return obj
else:
# dn already exists
return
[docs]def modify_object(_module, _lo, _pos, _dn, **kwargs):
try:
obj = init_object(_module, _lo, _pos, _dn, attrs=kwargs)
except udm_errors.noObject:
return
else:
obj.modify()
return obj
[docs]def search_objects(_module, _lo, _pos, _base='', **kwargs):
module = _get_module(_module, _lo, _pos)
expressions = []
conj = udm_filter.conjunction('&', expressions)
for key, value in kwargs.items():
expressions.append(udm_filter.expression(key, escape_filter_chars(value), '='))
try:
objs = module.lookup(None, _lo, str(conj), base=_base)
except udm_errors.noObject:
objs = []
for obj in objs:
udm_objects.open(obj)
return objs
[docs]def dn_exists(dn, lo):
try:
lo.searchDn(base=dn, scope='base')
except udm_errors.noObject:
return False
else:
return True
[docs]def get_machine_connection():
return getMachineConnection()
[docs]def get_admin_connection():
return getAdminConnection()
[docs]def get_connection(userdn, password):
port = int(ucr_get('ldap/master/port', '7389'))
host = ucr_get('ldap/master')
base = ucr_get('ldap/base')
lo = base_access(host=host, port=port, base=base, binddn=userdn, bindpw=password)
lo = access(lo=lo)
pos = position(lo.base)
return lo, pos
[docs]def get_read_connection(userdn, password):
port = int(ucr_get('ldap/server/port', '7389'))
host = ucr_get('ldap/server/name')
base = ucr_get('ldap/base')
lo = base_access(host=host, port=port, base=base, binddn=userdn, bindpw=password)
lo = access(lo=lo)
pos = position(lo.base)
return lo, pos
[docs]class ApplicationLDAPObject(object):
def __init__(self, app, lo, pos, create_if_not_exists=False):
self._localhost = '%s.%s' % (ucr_get('hostname'), ucr_get('domainname'))
self._udm_obj = None
self._rdn = '%s_%s' % (app.id, app.version)
self._container = 'cn=%s,cn=apps,cn=univention,%s' % (escape_dn_chars(app.id), ucr_get('ldap/base'))
self._lo = lo
self._pos = pos
self._reload(app, create_if_not_exists)
def __bool__(self):
return self._udm_obj is not None
__nonzero__ = __bool__ # py2 compat
@property
def dn(self):
return 'univentionAppID=%s,%s' % (escape_dn_chars(self._rdn), self._container)
def _reload(self, app, create_if_not_exists=False):
try:
self._udm_obj = init_object('appcenter/app', self._lo, self._pos, self.dn)
except udm_errors.noObject:
self._udm_obj = None
if create_if_not_exists:
self._create_obj(app)
def _create_obj(self, app):
create_recursive_container(self._container, self._lo, self._pos)
self._pos.setDn(self._container)
base64icon = ''
attrs = {
'id': self._rdn,
'name': app.get_localised_list('name'),
'version': app.version,
'shortDescription': app.get_localised_list('description'),
'longDescription': app.get_localised_list('long_description'),
'contact': app.contact,
'maintainer': app.maintainer,
'website': app.get_localised_list('website'),
'websiteVendor': app.get_localised_list('website_vendor'),
'websiteMaintainer': app.get_localised_list('website_maintainer'),
'icon': base64icon,
'category': app.get_localised('categories'),
'webInterface': app.web_interface,
'webInterfaceName': app.web_interface_name,
'conflictingApps': app.conflicted_apps,
'conflictingSystemPackages': app.conflicted_system_packages,
'defaultPackages': app.default_packages,
'defaultPackagesMaster': app.default_packages_master,
'umcModuleName': app.umc_module_name,
'umcModuleFlavor': app.umc_module_flavor,
'serverRole': app.server_role,
}
obj = create_object_if_not_exists('appcenter/app', self._lo, self._pos, **attrs)
if obj:
self._reload(app, create_if_not_exists=False)
[docs] @classmethod
def from_udm_obj(cls, udm_obj, lo, pos):
app_id = explode_dn(udm_obj.dn, 1)[1]
app = FakeApp(id=app_id, version=udm_obj.info.get('version'))
return cls(app, lo, pos)
[docs] def add_localhost(self):
self._udm_obj.info.setdefault('server', [])
if self._localhost not in self._udm_obj.info['server']:
self._udm_obj.info['server'].append(self._localhost)
self._udm_obj.modify()
for ldap_object in self.get_siblings():
if not self._lo.compare_dn(self.dn, ldap_object.dn):
app_obj = self.from_udm_obj(ldap_object, self._lo, self._pos)
app_obj.remove_localhost()
[docs] def remove_localhost(self):
try:
self._udm_obj.info.setdefault('server', [])
self._udm_obj.info['server'].remove(self._localhost)
except ValueError:
pass
else:
self._udm_obj.modify()
if not self.anywhere_installed():
self.remove_from_directory()
[docs] def remove_from_directory(self):
remove_object_if_exists('appcenter/app', self._lo, self._pos, self.dn)
[docs] def installed_on_servers(self):
if not self:
return []
return self._udm_obj.info.get('server', [])
[docs] def get_siblings(self):
return search_objects('appcenter/app', self._lo, self._pos, self._container)
[docs] def anywhere_installed(self):
return bool(self.installed_on_servers())
[docs]def get_app_ldap_object(app, lo=None, pos=None, or_create=False):
if lo is None or pos is None:
lo, pos = get_machine_connection()
return ApplicationLDAPObject(app, lo, pos, or_create)
[docs]def create_recursive_container(dn, lo, pos):
if dn_exists(dn, lo):
return
position_parts = explode_dn(dn)
previous_position = ','.join(position_parts[1:])
create_recursive_container(previous_position, lo, pos)
pos.setDn(previous_position)
name = explode_dn(position_parts[0], 1)[0]
if dn.startswith('ou'):
module = 'container/ou'
else:
module = 'container/cn'
create_object_if_not_exists(module, lo, pos, name=name)