#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Mail Dovecot - shared code for listeners
#
# Copyright 2015-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
import os
import grp
import stat
import subprocess
import re
import traceback
import imaplib
import shutil
import tempfile
try:
from typing import Any, Dict, List, Optional, Tuple # noqa: F401
except ImportError:
pass
import univention.admin.modules
from univention.admin.uldap import getMachineConnection
from univention.config_registry import handler_set
from univention.lib.misc import custom_username
from univention.mail.dovecot import DovecotListener
# Maps every UDM ACL name to a pair:
#   (IMAP rights string, list of the matching doveadm right names)
dovecot_acls = {
    "read": (
        "lrws",
        ["lookup", "read", "write", "write-seen"],
    ),
    "post": (
        "lrwsp",
        ["lookup", "read", "write", "write-seen", "post"],
    ),
    "append": (
        "lrwspi",
        ["lookup", "read", "write", "write-seen", "post", "insert"],
    ),
    "write": (
        "lrwspite",
        ["lookup", "read", "write", "write-seen", "post", "insert", "write-deleted", "expunge"],
    ),
    "all": (
        "lrwspitekxa",
        [
            "lookup", "read", "write", "write-seen", "post", "insert",
            "write-deleted", "expunge", "create", "delete", "admin",
        ],
    ),
}

# File that stores the global ACL entries, one entry per line.
global_acl_path = '/etc/dovecot/global-acls'

# Entry layout with a quoted identifier: <folder> "<identifier>" <rights>
glocal_acl_pattern1 = re.compile(r'(?P<folder>[^ ]+) "(?P<id>.+)" (?P<acl>\w+)')
# Same layout without the quotation marks around the identifier.
glocal_acl_pattern2 = re.compile(r'(?P<folder>[^ ]+) (?P<id>.+) (?P<acl>\w+)')
class DovecotFolderAclEntry(object):
    """
    One entry (line) of the global ACL file.

    The textual form produced by :py:meth:`__repr__` and understood by
    :py:meth:`from_str` is ``<folder_name> "<identifier>" <acl>``.
    """

    def __init__(self, folder_name, identifier, acl):  # type: (str, str, str) -> None
        self.folder_name = folder_name
        self.identifier = identifier
        self.acl = acl

    def __eq__(self, other):  # type: ignore
        # Compare by value. Objects that do not carry the three attributes
        # are "not comparable" instead of raising AttributeError, so that
        # `entry == "text"` or `entry in mixed_list` simply yield False.
        try:
            theirs = (other.folder_name, other.identifier, other.acl)
        except AttributeError:
            return NotImplemented
        return (self.folder_name, self.identifier, self.acl) == theirs

    # Instances are mutable and compare by value, so they are kept unhashable
    # (this is also what Python 3 does implicitly once __eq__ is defined).
    __hash__ = None  # type: ignore

    def __repr__(self):  # type: () -> str
        return '{} "{}" {}'.format(self.folder_name, self.identifier, self.acl)

    @classmethod
    def from_str(cls, line):  # type: (str) -> DovecotFolderAclEntry
        """
        Parse one line of the global ACL file.

        :param line: text of the line; surrounding whitespace is ignored
        :return: the parsed entry
        :raises ValueError: if the line matches neither ACL pattern
        """
        stripped = line.strip()
        # Try with quotation marks first, then without them
        # (entries created with univention-mail-dovecot 3.0.1-4).
        for pattern in (glocal_acl_pattern1, glocal_acl_pattern2):
            m = pattern.match(stripped)
            if m:
                return cls(m.group('folder'), m.group('id'), m.group('acl'))
        raise ValueError("Line {!r} doesn't match ACL pattern.".format(line))
class DovecotGlobalAclFile(object):
    """
    Reads and rewrites the global ACL file (``global_acl_path``).

    All file access is done with the effective UID switched to 0 through
    ``listener.setuid(0)`` / ``listener.unsetuid()``.
    """

    # GID of the group 'dovemail'; looked up once, when the class is defined.
    dovemail_gid = grp.getgrnam('dovemail').gr_gid

    def __init__(self, listener):  # type: (Any) -> None
        """
        :param listener: object offering ``setuid(uid)`` and ``unsetuid()``
        """
        self.listener = listener
        self._acls = []  # type: List[DovecotFolderAclEntry]
        # Creates the file if it does not exist yet and sets owner/mode.
        self._fix_permissions()

    def add_acls(self, acl_list):  # type: (List[DovecotFolderAclEntry]) -> None
        """Add the entries of `acl_list` that are not in the file yet and rewrite the file."""
        self._read()
        for acl in acl_list:
            if acl not in self._acls:
                self._acls.append(acl)
        self._write()

    def remove_acls(self, folder_name):  # type: (str) -> None
        """Drop every entry whose folder name equals `folder_name` and rewrite the file."""
        self._read()
        self._acls = [acl for acl in self._acls if acl.folder_name != folder_name]
        self._write()

    def _fix_permissions(self, path=global_acl_path, fileno=None):  # type: (str, Optional[int]) -> None
        """
        Set owner root:dovemail and mode 0640.

        :param path: file to fix (created empty if missing); only used when `fileno` is None
        :param fileno: already open file descriptor to fix instead of `path`
        """
        def set_perms(fd):  # type: (int) -> None
            os.fchown(fd, 0, self.dovemail_gid)
            os.fchmod(fd, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)

        try:
            self.listener.setuid(0)
            # `is not None`: descriptor 0 is a valid descriptor, too.
            if fileno is not None:
                set_perms(fileno)
            else:
                mode = 'rb' if os.path.exists(path) else 'wb'
                with open(path, mode) as fp:
                    set_perms(fp.fileno())
        finally:
            self.listener.unsetuid()

    def _read(self):  # type: () -> None
        """
        Replace the in-memory entries with the content of the file.

        :raises ValueError: if a non-blank line cannot be parsed
        """
        self._acls = []
        try:
            self.listener.setuid(0)
            with open(global_acl_path, 'r') as fp:
                for line in fp:
                    if not line.strip():
                        # tolerate blank lines instead of failing on them
                        continue
                    self._acls.append(DovecotFolderAclEntry.from_str(line))
        finally:
            self.listener.unsetuid()

    def _write(self):  # type: () -> None
        """Write the in-memory entries to a temporary file and move it over the global ACL file."""
        # NOTE(review): the temporary file is created in the default temp
        # directory, so the final move may cross file systems - confirm
        # whether that is intended.
        fileno, filename = tempfile.mkstemp(prefix='.global-acls')
        try:
            # os.fdopen() takes over the descriptor, so it gets closed even
            # if writing or fixing the permissions fails.
            with os.fdopen(fileno, 'wb') as fp:
                for acl in self._acls:
                    fp.write('{}\n'.format(acl).encode('UTF-8'))
                fp.flush()
                self._fix_permissions(fileno=fp.fileno())
            try:
                self.listener.setuid(0)
                shutil.move(filename, global_acl_path)
            finally:
                self.listener.unsetuid()
        except Exception:
            self._discard_tempfile(filename)
            raise

    def _discard_tempfile(self, filename):  # type: (str) -> None
        """Best-effort removal of a left-over temporary file."""
        try:
            self.listener.setuid(0)
            if os.path.exists(filename):
                os.unlink(filename)
        except OSError:
            # nothing more can be done; the original error is re-raised by the caller
            pass
        finally:
            self.listener.unsetuid()
class DovecotSharedFolderListener(DovecotListener):
    """
    Creates, modifies and deletes Dovecot mailboxes for ``mail/folder`` objects.

    Objects that carry ``mailPrimaryAddress`` are handled as *shared* folders
    (addressed as ``shared/<address>``), all other objects as *public* folders
    (addressed as ``<cn>/INBOX``).
    """

    def __init__(self, *args, **kwargs):  # type: (*Any, **Any) -> None
        super(DovecotSharedFolderListener, self).__init__(*args, **kwargs)
        self.modules = ["mail/folder"]
        self.acl_key = "univentionMailACL"
        self.global_acls = DovecotGlobalAclFile(self.listener)

    def add_shared_folder(self, new):  # type: (Dict[str, List[bytes]]) -> None
        """
        Set up the mailbox and ACLs for a newly created folder object.

        Failures while setting up the mailbox are logged and end the
        operation; they are not propagated to the caller.

        :param new: attributes of the new object
        """
        if "mailPrimaryAddress" in new:
            # use a shared folder
            new_mailbox = new["mailPrimaryAddress"][0].decode('ASCII')

            # the maildir will be autocreated by dovecot
            acls = [acl.decode('UTF-8') for acl in new.get(self.acl_key, [])]
            # Even if there are no ACL entries, we must still _change_ at
            # least one entry through IMAP, so the shared mailbox list
            # dictionary is updated. Lets remove the (afterwards) unnecessary
            # master-user entry.
            acls.append("dovecotadmin none")
            try:
                # give master-user admin rights on mailbox
                self.doveadm_set_mailbox_acls("shared/%s" % new_mailbox, ["dovecotadmin all"])
                # use IMAP to set actual ACLs, so the shared mailbox list dictionary is updated
                self.imap_set_mailbox_acls(new_mailbox, "INBOX", acls)
                self.add_global_acls(new)
            except Exception as exc:
                self.log_e("Failed setting ACLs on new shared mailbox '%s': %s" % (new_mailbox, exc))
                return
            self.log_p("Created shared mailbox '%s'." % new_mailbox)
        else:
            # use a public folder
            new_mailbox = new["cn"][0].decode('UTF-8')
            try:
                self.update_public_mailbox_configuration()
                self.create_public_folder(new_mailbox)
                acls = new.get(self.acl_key)
                if acls:
                    acls = [acl.decode('UTF-8') for acl in acls]
                    self.doveadm_set_mailbox_acls("%s/INBOX" % new_mailbox, acls)
                    self.log_p("Set ACLs on '%s'." % new_mailbox)
            except Exception:
                self.log_e("Failed creating public mailbox '%s'." % new_mailbox)
                return
            self.log_p("Created public mailbox '%s'." % new_mailbox)

    def del_shared_folder(self, old):  # type: (Dict[str, List[bytes]]) -> None
        """
        Clean up after a deleted folder object.

        The mailbox directory itself is only removed if the UCR variable
        ``mail/dovecot/mailbox/delete`` is true.

        :param old: attributes of the deleted object
        """
        if "mailPrimaryAddress" in old:
            # shared folder
            old_mailbox = old["mailPrimaryAddress"][0].decode('ASCII')
            old_loc, old_domain = old_mailbox.rsplit("@", 1)
            global_mail_home = self.get_maillocation()
            # NOTE(review): mod_shared_folder() lower-cases the equivalent
            # path, this one is used as-is - confirm which is intended.
            path = str(global_mail_home).replace("%Ld", old_domain).replace("%Ln", old_loc)
            # cannot unsubscribe to non-existing shared folder (a.k.a. private mailbox)
            self.remove_global_acls(old)
        else:
            # public folder
            old_mailbox = old["cn"][0].decode('UTF-8')
            path = self.get_public_location(old_mailbox)
            # Only users with ACL entries can potentially have subscribed, unsubscribe them.
            # For performance reasons this intentionally ignores groups.
            acl_users = self._acl_users(old)
            if acl_users:
                self.unsubscribe_from_mailbox(acl_users, "%s/INBOX" % old_mailbox)
            # update namespaces
            self.update_public_mailbox_configuration(delete_only=old_mailbox)

        # remove mailbox from disk
        if self.listener.configRegistry.is_true("mail/dovecot/mailbox/delete", False):
            try:
                self.listener.setuid(0)
                shutil.rmtree(path, ignore_errors=True)
            except Exception:
                self.log_e("Error deleting mailbox '%s'." % old_mailbox)
                return
            finally:
                self.listener.unsetuid()
            self.log_p("Deleted mailbox '%s'." % old_mailbox)
        else:
            self.log_p("Deleting of mailboxes disabled (mailbox '%s')." % old_mailbox)

    def mod_shared_folder(self, old, new):  # type: (Dict[str, List[bytes]], Dict[str, List[bytes]]) -> None
        """
        Apply the changes of a modified folder object.

        Handles four transitions: shared -> shared (possibly renamed),
        public -> shared, shared -> public and public -> public. In every
        case the ACLs are updated afterwards. Failures of the move or the
        ACL steps are logged and end the operation.

        :param old: attributes of the object before the change
        :param new: attributes of the object after the change
        """
        if "mailPrimaryAddress" in new:
            # use a shared folder
            new_mailbox = new["mailPrimaryAddress"][0].decode('ASCII')
            if "mailPrimaryAddress" in old:
                # it remains a shared folder
                old_mailbox = old["mailPrimaryAddress"][0].decode('ASCII')
                if new_mailbox != old_mailbox:
                    # rename/move mailbox inside private namespace
                    #
                    # cannot unsubscribe to non-existing shared folder (a.k.a. private mailbox)
                    self.move_user_home(new_mailbox, old_mailbox, True)
                    self.remove_global_acls(old)
                    # self.add_global_acls(new) is further down
                    self.log_p("Moved mailbox '%s' -> '%s'." % (old_mailbox, new_mailbox))
                # else: no address change, nothing to move
            else:
                # move mailbox from public to private namespace
                self.log_p("Moving mailbox from public to private namespace...")
                old_mailbox = old["cn"][0].decode('UTF-8')
                try:
                    pub_loc = self.get_public_location(old_mailbox)
                    new_user_home = self.get_user_home(new_mailbox)
                    old_acl_users = self._acl_users(old)
                    if old_acl_users:
                        self.unsubscribe_from_mailbox(old_acl_users, "%s/INBOX" % old_mailbox)
                    # update dovecot config
                    self.update_public_mailbox_configuration()
                    # move mail home
                    self.move_mail_home(pub_loc, new_user_home, new_mailbox, True)
                    old_maildir = os.path.join(new_user_home, ".INBOX")
                    new_maildir = os.path.join(new_user_home, "Maildir")
                    try:
                        # rename mailbox
                        self.listener.setuid(0)
                        shutil.move(old_maildir, new_maildir)
                    except Exception:
                        self.log_e("Failed to move mail home (of '%s') from '%s' to '%s'.\n%s" % (
                            new_mailbox, old_maildir, new_maildir, traceback.format_exc()))
                        raise
                    finally:
                        self.listener.unsetuid()
                except Exception:
                    self.log_e("Could not rename/move mailbox ('%s' -> '%s').\n%s" % (old_mailbox, new_mailbox, traceback.format_exc()))
                    return
                self.log_p("Moved mailbox '%s' -> '%s'." % (old_mailbox, new_mailbox))

            # set ACLs
            acls = self._diff_acls(old, new)
            # Even if there are no ACL entries, we must still _change_ at
            # least one entry through IMAP, so the shared mailbox list
            # dictionary is updated. Lets remove the (afterwards) unnecessary
            # master-user entry.
            acls.append("dovecotadmin none")
            try:
                # give master-user admin rights on mailbox, so it can change its ACL
                self.doveadm_set_mailbox_acls("shared/%s" % new_mailbox, ["dovecotadmin all"])
                # use IMAP to set actual ACLs, so the shared mailbox list dictionary is updated
                self.imap_set_mailbox_acls(new_mailbox, "INBOX", acls)
                # `old` may have been a public folder here; remove_global_acls()
                # does nothing in that case.
                self.remove_global_acls(old)
                self.add_global_acls(new)
            except Exception as exc:
                self.log_e("Failed setting ACLs on moved shared mailbox ('%s' -> '%s'): %s" % (old_mailbox, new_mailbox, exc))
                return
            self.log_p("Set ACLs on '%s'." % new_mailbox)
        else:
            # use a public folder
            new_mailbox = new["cn"][0].decode('UTF-8')
            if "mailPrimaryAddress" in old:
                # move mailbox from private to public namespace
                self.log_p("Moving mailbox from private to public namespace...")
                old_mailbox = old["mailPrimaryAddress"][0].decode('ASCII')
                old_loc, old_domain = old_mailbox.rsplit("@", 1)
                # cannot unsubscribe to non-existing shared folder (a.k.a. private mailbox)
                try:
                    global_mail_home = self.get_maillocation()
                    old_path = str(global_mail_home).replace("%Ld", old_domain).replace("%Ln", old_loc).lower()
                    # update dovecot config
                    self.update_public_mailbox_configuration()
                    pub_loc = self.get_public_location(new_mailbox)
                    # move mail home
                    self.move_mail_home(old_path, pub_loc, new_mailbox, True)
                    old_maildir = os.path.join(pub_loc, "Maildir")
                    new_maildir = os.path.join(pub_loc, ".INBOX")
                    try:
                        # rename mailbox
                        self.listener.setuid(0)
                        shutil.move(old_maildir, new_maildir)
                    except Exception:
                        self.log_e("Failed to move mail home (of '%s') from '%s' to '%s'.\n%s" % (
                            new_mailbox, old_maildir, new_maildir, traceback.format_exc()))
                        raise
                    finally:
                        self.listener.unsetuid()
                    self.remove_global_acls(old)
                except Exception:
                    self.log_e("Could not rename/move mailbox ('%s' -> '%s').\n%s" % (old_mailbox, new_mailbox, traceback.format_exc()))
                    return
                self.log_p("Moved mailbox '%s' -> '%s'." % (old_mailbox, new_mailbox))
            else:
                # it remained a public folder
                # renaming of public folders is disabled in UDM
                # quota may have changed, update dovecot config
                self.update_public_mailbox_configuration()

            # set ACLs
            try:
                curacl = self._diff_acls(old, new)
                self.doveadm_set_mailbox_acls("%s/INBOX" % new_mailbox, curacl)
            except Exception:
                self.log_e("Error changing ACLs for mailbox '%s'." % new_mailbox)
            else:
                # report success only if the ACLs were really set
                self.log_p("Set ACLs on '%s'." % new_mailbox)

    def get_public_location(self, ns):  # type: (str) -> str
        """
        Ask ``doveconf`` for the maildir path of the namespace `ns`.

        :param ns: name of the namespace
        :return: the path captured from the ``maildir:<path>:INDEXPVT...`` setting
        :raises Exception: whatever the lookup raised, after logging it
        """
        try:
            pub_loc = self.read_from_ext_proc_as_root(["/usr/bin/doveconf", "-h", "namespace/" + ns + "/location"], r"maildir:(\S+):INDEXPVT.*")
        except Exception:
            self.log_e("Failed to get location of public folder '%s' from Dovecot configuration.\n%s" % (ns, traceback.format_exc()))
            raise
        return pub_loc

    def create_public_folder(self, folder_name):  # type: (str) -> str
        """
        Create the ``.INBOX`` maildir below the location of the public folder.

        :param folder_name: name of the public folder (namespace)
        :return: path of the maildir
        :raises Exception: whatever a step raised, after logging it
        """
        try:
            user, group = self.get_dovecot_user()
            pub_loc = self.get_public_location(folder_name)
            path = os.path.join(pub_loc, ".INBOX")
            self.mkdir_p(pub_loc)
            self.read_from_ext_proc_as_root(["/usr/bin/maildirmake.dovecot", path, "%s:%s" % (user, group)])
        except Exception:
            self.log_e("Failed to create maildir '%s'." % folder_name)
            raise
        finally:
            # make sure the privileges are dropped, whatever happened above
            self.listener.unsetuid()
        return path

    def read_from_ext_proc_as_root(self, cmd, regexp=None, stdin=None, stdout=subprocess.PIPE, stderr=None, stdin_input=None):
        """
        Wrapper around Popen(), runs external command as root and return its
        output, optionally the first hit of a regexp. May raise an exception.

        ``None`` is returned if the command produced no output on stdout,
        if output on stderr was captured or if its exit code was not 0.

        :param cmd: list: with executable path as first item
        :param regexp: string: regexp for re.findall()
        :param stdin: passed on to Popen()
        :param stdout: passed on to Popen()
        :param stderr: passed on to Popen()
        :param stdin_input: string: text handed to the command through communicate()
        :return: string or None
        :raises IndexError: if `regexp` is given but does not match the output
        """
        try:
            self.listener.setuid(0)
            cmd_proc = subprocess.Popen(cmd, stdin=stdin, stdout=stdout, stderr=stderr)
            encoded_input = stdin_input.encode('UTF-8') if stdin_input else None
            cmd_out, cmd_err = cmd_proc.communicate(input=encoded_input)
            cmd_exit = cmd_proc.wait()
            if not cmd_out or cmd_err or cmd_exit != 0:
                return None
            text = cmd_out.decode('UTF-8')
            if regexp:
                return re.findall(regexp, text)[0]
            return text.rstrip()
        finally:
            self.listener.unsetuid()

    def doveadm_set_mailbox_acls(self, mailbox, acls):  # type: (str, List[str]) -> None
        """
        Set or delete ACL entries of `mailbox` with ``doveadm acl``.

        :param mailbox: name of the mailbox as understood by doveadm
        :param acls: entries of the form ``<identifier> <UDM right>``; the
            right ``none`` deletes the entry of that identifier
        :raises KeyError: if a right is not a key of `dovecot_acls`
        """
        for acl in acls:
            identifier, right = self._split_udm_imap_acl_doveadm(acl)
            if right == "none":
                cmd = ["/usr/bin/doveadm", "acl", "delete", "-u", custom_username("Administrator"), mailbox, identifier]
            else:
                cmd = ["/usr/bin/doveadm", "acl", "set", "-u", custom_username("Administrator"), mailbox, identifier]
                cmd.extend(dovecot_acls[right][1])
            try:
                self.read_from_ext_proc_as_root(cmd)
            except Exception:
                self.log_e("Failed to set ACL using doveadm using command '%s'." % cmd)
                raise

    def imap_set_mailbox_acls(self, mb_owner, mailbox, acls):  # type: (str, str, List[str]) -> None
        """
        Set or delete ACL entries through IMAP on localhost.

        The login uses ``<mb_owner>*<master user>`` with the master user's password.

        :param mb_owner: owner of the mailbox
        :param mailbox: name of the mailbox inside the owner's account
        :param acls: entries of the form ``<identifier> <UDM right>``; the
            right ``none`` deletes the entry of that identifier
        :raises Exception: whatever went wrong, after logging it
        """
        master_name, master_pw = self.get_masteruser_credentials()
        imap = None
        try:
            imap = imaplib.IMAP4("localhost")
            imap.login("%s*%s" % (mb_owner, master_name), master_pw)
            for acl in acls:
                identifier, right = self._split_udm_imap_acl_imap(acl)
                if right == "none":
                    imap.deleteacl(mailbox, identifier)
                else:
                    # Bug #53111: escape double quotes within identifier, then put the string between
                    # double quotes to prevent problems with e.g. whitespace (e.g. group 'Domain Users').
                    imap.setacl(mailbox, '"{}"'.format(identifier.replace('"', r'\"')), dovecot_acls[right][0])
        except Exception:
            self.log_e("Failed to set ACLs '%s' on mailbox '%s' for '%s'.\n%s" % (acls, mailbox, mb_owner, traceback.format_exc()))
            raise
        finally:
            if imap:
                imap.logout()

    def update_public_mailbox_configuration(self, delete_only=None):  # type: (Optional[str]) -> None
        """
        Cache public folders and their quota into a UCRV and reload Dovecot.

        :param delete_only: name of a public folder; if given, only the entry
            of that folder is removed from the cached list, else the list is
            recreated from scratch.
        :return: None
        :raises Exception: whatever went wrong, after logging it
        """
        # TODO: create distinct configurations for each server (honor univentionMailHomeServer)

        # When deleting, remove only one entry, so in the case of multi-remove
        # subsequent code can still access the remaining namespace configuration.
        # In any other case (add/modify) recreate from scratch to ensure
        # consistency with the LDAP.
        if delete_only:
            try:
                self.listener.setuid(0)
                old_info = self.listener.configRegistry.get("mail/dovecot/internal/sharedfolders", "").split()
                emails_quota = [info for info in old_info if not info.startswith(delete_only + ":")]
            except Exception:
                self.log_e("update_public_mailbox_configuration(): Failed to update public mailbox configuration:\n%s" % traceback.format_exc())
                raise
            finally:
                self.listener.unsetuid()
        else:
            public_folders = []  # type: List[Any]
            for module in self.modules:
                try:
                    public_folders.extend(self.get_udm_infos(module, "(!(mailPrimaryAddress=*))"))
                except Exception:
                    self.log_e("update_public_mailbox_configuration(): Failed to update public mailbox configuration:\n%s" % traceback.format_exc())
                    raise
                finally:
                    self.listener.unsetuid()
            # one "<name>@<domain>:<quota>" item per public folder
            emails_quota = [
                "%s@%s:%s" % (
                    pf["name"] or pf.dn.split("@")[0].split("=")[1],
                    pf["mailDomain"],
                    pf.get("mailQuota", 0)
                )
                for pf in public_folders
            ]
        try:
            self.listener.setuid(0)
            handler_set(["mail/dovecot/internal/sharedfolders=%s" % " ".join(emails_quota)])
            self.read_from_ext_proc_as_root(["/usr/bin/doveadm", "reload"])
        except Exception:
            self.log_e("update_public_mailbox_configuration(): Failed to update public mailbox configuration:\n%s" % traceback.format_exc())
            raise
        finally:
            self.listener.unsetuid()
        self.log_p("Updated shared mailbox configuration.")

    def unsubscribe_from_mailbox(self, users, mailbox):  # type: (List[str], str) -> None
        """Unsubscribe every user in `users` from `mailbox`; failures are logged and skipped."""
        for user in users:
            try:
                self.read_from_ext_proc_as_root(["/usr/bin/doveadm", "mailbox", "unsubscribe", "-u", user, mailbox])
            except Exception:
                self.log_e("Failed to unsubscribe user '%s' from mailbox '%s'." % (user, mailbox))

    def get_udm_infos(self, udm_module, udm_filter):  # type: (Any, str) -> List[Any]
        """
        Look up UDM objects using the machine connection.

        :param udm_module: name of the UDM module
        :param udm_filter: LDAP filter for the lookup
        :return: result of the module's ``lookup()``
        :raises Exception: whatever went wrong, after logging it
        """
        try:
            self.listener.setuid(0)
            univention.admin.modules.update()
            lo, _po = getMachineConnection()
            mod = univention.admin.modules.get(udm_module)
            return mod.lookup(None, lo, udm_filter)
        except Exception:
            self.log_e("get_udm_infos(%s, %s): Failed to retrieve UDM info:\n%s" % (udm_module, udm_filter, traceback.format_exc()))
            raise
        finally:
            self.listener.unsetuid()

    def _acl_users(self, attrs):  # type: (Dict[str, List[bytes]]) -> List[str]
        """
        Return the identifiers of the ACL entries in `attrs` that contain an "@".

        Only the first whitespace separated token of an entry is looked at,
        so groups are intentionally ignored.
        """
        users = []
        for acl in attrs.get(self.acl_key, []):
            identifier = acl.split()[0]
            if b"@" in identifier:
                users.append(identifier.decode('UTF-8'))
        return users

    def _diff_acls(self, old, new):
        # type: (Dict[str, List[bytes]], Dict[str, List[bytes]]) -> List[str]
        """
        Compute the ACL entries to apply when going from `old` to `new`.

        :return: one ``<identifier> <right>`` string per identifier; all
            identifiers of `new` with their right, plus the identifiers
            that only exist in `old` with the right ``none``
        """
        acl_diff = {}
        # find new ACLs
        for acl in new.get(self.acl_key, []):
            parts = acl.decode('UTF-8').split()
            acl_diff[" ".join(parts[:-1])] = parts[-1]
        # remove old ACLs
        for acl in old.get(self.acl_key, []):
            identifier = " ".join(acl.decode('UTF-8').split()[:-1])
            if identifier not in acl_diff:
                acl_diff[identifier] = "none"
        return [" ".join(x) for x in acl_diff.items()]

    @staticmethod
    def _split_udm_imap_acl_doveadm(udm_imap_acl):  # type: (str) -> Tuple[str, str]
        """
        Split ``<identifier> <right>`` and put the identifier into doveadm notation.

        Identifiers containing "@" and ``dovecotadmin`` get the prefix
        ``user=``, ``anyone`` and ``authenticated`` stay as they are,
        everything else gets the prefix ``group=``.
        """
        parts = udm_imap_acl.split()
        right = parts[-1]
        identifier = " ".join(parts[:-1])
        if "@" in identifier or identifier == "dovecotadmin":
            identifier = "user=" + identifier
        elif identifier not in ["anyone", "authenticated"]:
            identifier = "group=" + identifier
        return identifier, right

    @staticmethod
    def _split_udm_imap_acl_imap(udm_imap_acl):  # type: (str) -> Tuple[str, str]
        """
        Split ``<identifier> <right>`` and put the identifier into IMAP notation.

        Identifiers containing "@", ``anyone``, ``authenticated`` and
        ``dovecotadmin`` stay as they are, everything else (groups) gets
        the prefix ``$``.
        """
        identifier, right = udm_imap_acl.rsplit(None, 1)
        if "@" not in identifier and identifier not in ["anyone", "authenticated", "dovecotadmin"]:
            # group
            identifier = '${}'.format(identifier)
        return identifier, right

    def add_global_acls(self, new):  # type: (Dict[str, List[bytes]]) -> None
        """
        Add the ACL entries of the shared folder `new` to the global ACL file.

        :param new: attributes of a shared folder; must contain ``mailPrimaryAddress``
        :raises KeyError: if a right is not a key of `dovecot_acls`
        """
        new_mailbox = 'shared/{}'.format(new["mailPrimaryAddress"][0].decode('ASCII'))
        folder_acls = []
        for acl in new.get(self.acl_key, []):
            identifier, right = self._split_udm_imap_acl_doveadm(acl.decode('UTF-8'))
            folder_acls.append(DovecotFolderAclEntry(new_mailbox, identifier, dovecot_acls[right][0]))
        self.global_acls.add_acls(folder_acls)

    def remove_global_acls(self, old):  # type: (Dict[str, List[bytes]]) -> None
        """
        Remove all entries of the shared folder `old` from the global ACL file.

        Does nothing if `old` has no ``mailPrimaryAddress`` (a public
        folder): entries are only ever added for ``shared/<address>``.

        :param old: attributes of the folder object
        """
        addresses = old.get("mailPrimaryAddress")
        if not addresses:
            return
        old_mailbox = 'shared/{}'.format(addresses[0].decode('ASCII'))
        self.global_acls.remove_acls(old_mailbox)