Source code for univention.mail.dovecot

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Mail Dovecot - shared code for listeners
#
# Copyright 2015-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

import os
import os.path
import subprocess
import re
import traceback
import pwd
import grp
import shutil
import stat
try:
	from typing import Any, Optional, Sequence, Tuple  # noqa: F401
except ImportError:
	pass

import univention.debug as ud

default_sieve_script = "/var/lib/dovecot/sieve/default.sieve"


[docs]class DovecotListener(object): def __init__(self, listener, name): # type: (str) -> None self.listener = listener self.name = name
[docs] def log_p(self, msg): # type: (str) -> None ud.debug(ud.LISTENER, ud.PROCESS, "%s: %s" % (self.name, msg))
[docs] def log_e(self, msg): # type: (str) -> None ud.debug(ud.LISTENER, ud.ERROR, "%s: %s" % (self.name, msg))
[docs] def new_email_account(self, email): # type: (str) -> None spam_folder = self.listener.configRegistry.get("mail/dovecot/folder/spam") if self.listener.configRegistry.is_true("mail/dovecot/sieve/spam", True)\ and spam_folder and spam_folder.lower() != "none": try: self.upload_activate_sieve_script(email, default_sieve_script) except Exception: self.log_e("dovecot: Could not upload sieve script to account '%s'." % email) raise finally: self.listener.unsetuid() return
[docs] def delete_email_account(self, dn, email): # type: (str, str) -> None if self.listener.configRegistry.is_true('mail/dovecot/mailbox/delete', False): try: old_localpart, old_domainpart = email.split("@") global_mail_home = self.get_maillocation() old_home_calc = str(global_mail_home).replace("%Ld", old_domainpart).replace("%Ln", old_localpart) except Exception: self.log_e("dovecot: Delete mailbox: Configuration error. Could not remove mailbox (dn:'%s' old mail: '%s')." % (dn, email)) raise self.read_from_ext_proc_as_root(["/usr/bin/doveadm", "kick", email]) try: self.listener.setuid(0) shutil.rmtree(old_home_calc, ignore_errors=True) except Exception: self.log_e("dovecot: Delete mailbox: Error removing directory '%s' from disk." % old_home_calc) raise finally: self.listener.unsetuid() else: self.log_p("dovecot: Deleting of mailboxes disabled, not removing '%s' (dn '%s')." % (email, dn)) return
[docs] def read_from_ext_proc_as_root(self, cmd, regexp=None, stdin=None, stdout=subprocess.PIPE, stderr=None, stdin_input=None): # type; (Sequence[str], Optional[str], Optional[str], Any, Any, Any) -> str """ Wrapper around Popen(), runs external command as root and return its output, optionally the first hit of a regexp. May raise an exception. :param cmd: list: with executable path as first item :param regexp: string: regexp for re.findall() :return: string """ try: self.listener.setuid(0) cmd_proc = subprocess.Popen(cmd, stdin=stdin, stdout=stdout, stderr=stderr) cmd_out, cmd_err = cmd_proc.communicate(input=stdin_input and stdin_input.encode('UTF-8')) cmd_exit = cmd_proc.wait() if cmd_out and not cmd_err and cmd_exit == 0: if regexp: res = re.findall(regexp, cmd_out.decode('UTF-8')) return res[0] else: return cmd_out.decode('UTF-8').rstrip() finally: self.listener.unsetuid()
[docs] def move_user_home(self, newMailPrimaryAddress, oldMailPrimaryAddress, force_rename=False): # type: (str, str, bool) -> None if not force_rename and not self.listener.configRegistry.is_true("mail/dovecot/mailbox/rename", False): self.log_p("Renaming of mailboxes disabled, not moving ('%s' -> '%s')." % (oldMailPrimaryAddress, newMailPrimaryAddress)) return old_localpart, old_domainpart = oldMailPrimaryAddress.lower().split("@") try: global_mail_home = self.get_maillocation() old_home_calc = str(global_mail_home).replace("%Ld", old_domainpart).replace("%Ln", old_localpart) new_home_dove = self.get_user_home(newMailPrimaryAddress) except Exception: self.log_e("Move mailbox: Configuration error. Could not move mailbox ('%s' -> '%s')." % (oldMailPrimaryAddress, newMailPrimaryAddress)) return try: self.listener.setuid(0) if not os.path.isdir(old_home_calc): # Either the user never logged in or never got any email, and thus no maildir was ever created, # or it was moved manually. In any case: ignore. self.log_p("Move mailbox: Source directory ('%s') does not exist. Nothing to do for mailbox move ('%s' -> '%s')." % (old_home_calc, oldMailPrimaryAddress, newMailPrimaryAddress)) return if os.path.isdir(new_home_dove) or os.path.isfile(new_home_dove): # We don't know why there is a file or directory already. For security reasons we don't do anything. self.log_e("Move mailbox: Target directory ('%s') exists. For security reasons not moving mailbox for mailbox move ('%s' -> '%s')." % (new_home_dove, oldMailPrimaryAddress, newMailPrimaryAddress)) return finally: self.listener.unsetuid() try: self.read_from_ext_proc_as_root(["/usr/bin/doveadm", "kick", oldMailPrimaryAddress]) except Exception: # ignore pass try: self.move_mail_home(old_home_calc, new_home_dove, newMailPrimaryAddress, force_rename) except Exception: self.log_e("Move mailbox: Failed to move mail home (of mail '%s') from '%s' to '%s'.\n%s" % ( newMailPrimaryAddress, old_home_calc, new_home_dove, traceback.format_exc())) return self.log_p("Moved mail home (of mail: '%s') from '%s' to '%s'." % (newMailPrimaryAddress, old_home_calc, new_home_dove)) return
[docs] def move_mail_home(self, old_path, new_path, email, force_rename=False): # type: (str, str, str, bool) -> None # create parent path in any case to make sure it has correct ownership self.mkdir_p(os.path.dirname(new_path)) if not force_rename and not self.listener.configRegistry.is_true("mail/dovecot/mailbox/rename", False): self.log_p("Renaming of mailboxes disabled, not moving mail home (of mail '%s') from '%s' to '%s." % (email, old_path, new_path)) return try: self.listener.setuid(0) st = os.stat(old_path) shutil.move(old_path, new_path) self.chown_r(new_path, st[stat.ST_UID], st[stat.ST_GID]) except Exception: self.log_e("Failed to move mail home (of mail '%s') from '%s' to '%s'.\n%s" % ( email, old_path, new_path, traceback.format_exc())) raise finally: self.listener.unsetuid()
[docs] def get_maillocation(self): # type: () -> str try: return self.read_from_ext_proc_as_root(["/usr/bin/doveconf", "-h", "mail_location"], r"\S+:(\S+)/Maildir") except Exception: self.log_e("Failed to get mail_location from Dovecot configuration.\n%s" % traceback.format_exc()) raise
[docs] def upload_activate_sieve_script(self, email, file): # type: (str, str) -> None try: master_name, master_pw = self.get_masteruser_credentials() ca_file = self.listener.configRegistry.get("mail/dovecot/sieve/client/cafile", "/etc/univention/ssl/ucsCA/CAcert.pem") fqdn = "%s.%s" % (self.listener.configRegistry['hostname'], self.listener.configRegistry['domainname']) fqdn = self.listener.configRegistry.get("mail/dovecot/sieve/client/server", fqdn) _cmd = [ "sieve-connect", "--user", "%s*%s" % (email, master_name), "--server", fqdn, "--noclearauth", "--noclearchan", "--tlscafile", ca_file, "--remotesieve", "default"] cmd_upload = list(_cmd) cmd_upload.extend(["--localsieve", file, "--upload"]) self.read_from_ext_proc_as_root(cmd_upload, stdin=subprocess.PIPE, stdin_input=master_pw) cmd_activate = list(_cmd) cmd_activate.extend(["--activate"]) self.read_from_ext_proc_as_root(cmd_activate, stdin=subprocess.PIPE, stdin_input=master_pw) except Exception: self.log_e("upload_activate_sieve_script(): Could not upload sieve script '%s' to mailbox '%s'. Exception:\n%s" % (file, email, traceback.format_exc())) raise
[docs] def get_user_home(self, username): # type: (str) -> str try: return self.read_from_ext_proc_as_root(["/usr/bin/doveadm", 'user', "-f", "home", username]).lower() except Exception: self.log_e("Failed to get mail home for user '%s'.\n%s" % (username, traceback.format_exc())) raise
[docs] def get_masteruser_credentials(self): # type: () -> Tuple[str, str] try: self.listener.setuid(0) return re.findall(r"(\S+):{PLAIN}(\S+)::::::", open("/etc/dovecot/master-users").read())[0] except Exception: self.log_e("Failed to get masteruser password.\n%s" % traceback.format_exc()) raise finally: self.listener.unsetuid()
[docs] def get_dovecot_user(self): # type: () -> Tuple[str, str] if not hasattr(self, "dovecot_user") or not hasattr(self, "dovecot_group"): try: uid = self.read_from_ext_proc_as_root(["/usr/bin/doveconf", "-h", "mail_uid"]) gid = self.read_from_ext_proc_as_root(["/usr/bin/doveconf", "-h", "mail_gid"]) except Exception: uid = "dovemail" gid = "dovemail" self.dovecot_user = uid self.dovecot_group = gid return self.dovecot_user, self.dovecot_group
[docs] def mkdir_p(self, dir): # type: (str) -> None user, group = self.get_dovecot_user() dovecot_uid = pwd.getpwnam(user).pw_uid dovecot_gid = grp.getgrnam(group).gr_gid # spool directory has to be traversed as root self.listener.setuid(0) parent = os.path.dirname(dir) if not os.path.exists(parent): self.listener.unsetuid() self.mkdir_p(parent) else: self.listener.unsetuid() try: self.listener.setuid(0) if not os.path.exists(dir): os.mkdir(dir, 0o2700) os.chown(dir, dovecot_uid, dovecot_gid) except Exception: self.log_e("Failed to create directory '%s'.\n%s" % (dir, traceback.format_exc())) raise finally: self.listener.unsetuid()
[docs] @classmethod def chown_r(cls, path, uid, gid): # type: (str, int, int) -> None """ Recursively set owner and group on a file/directory and its subdirectories. :param str path: file/directory (and its subdirectories) to change ownership on :param int uid: UID to set :param int gid: GID to set :return: None """ def chown_if_different(path_, uid_, gid_): st = os.stat(path_) if st[stat.ST_UID] != uid_ or st[stat.ST_GID] != gid_: os.chown(path_, uid_, gid_) chown_if_different(path, uid, gid) for dirpath, dirnames, filenames in os.walk(path): for dirname in dirnames: cls.chown_r(os.path.join(dirpath, dirname), uid, gid) for filename in filenames: chown_if_different(os.path.join(dirpath, filename), uid, gid)