#!/usr/bin/python3
#
# Univention Mail Dovecot - shared code for listeners
#
# SPDX-FileCopyrightText: 2015-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
import grp
import os
import os.path
import pwd
import re
import shutil
import stat
import subprocess
import traceback
from collections.abc import Sequence
from typing import Any
import univention.debug as ud
default_sieve_script = "/var/lib/dovecot/sieve/default.sieve"
[docs]
class DovecotListener:
def __init__(self, listener: str, name: str) -> None:
self.listener = listener
self.name = name
[docs]
def log_p(self, msg: str) -> None:
ud.debug(ud.LISTENER, ud.PROCESS, "%s: %s" % (self.name, msg))
[docs]
def log_e(self, msg: str) -> None:
ud.debug(ud.LISTENER, ud.ERROR, "%s: %s" % (self.name, msg))
[docs]
def new_email_account(self, email: str) -> None:
spam_folder = self.listener.configRegistry.get("mail/dovecot/folder/spam")
if self.listener.configRegistry.is_true("mail/dovecot/sieve/spam", True)\
and spam_folder and spam_folder.lower() != "none":
try:
self.upload_activate_sieve_script(email, default_sieve_script)
except Exception:
self.log_e("dovecot: Could not upload sieve script to account '%s'." % email)
raise
finally:
self.listener.unsetuid()
[docs]
def delete_email_account(self, dn: str, email: str) -> None:
if self.listener.configRegistry.is_true('mail/dovecot/mailbox/delete', False):
try:
old_localpart, old_domainpart = email.split("@")
global_mail_home = self.get_maillocation()
old_home_calc = str(global_mail_home).replace("%Ld", old_domainpart).replace("%Ln", old_localpart)
except Exception:
self.log_e("dovecot: Delete mailbox: Configuration error. Could not remove mailbox (dn:'%s' old mail: '%s')." % (dn, email))
raise
self.read_from_ext_proc_as_root(["/usr/bin/doveadm", "kick", email])
try:
self.listener.setuid(0)
shutil.rmtree(old_home_calc, ignore_errors=True)
except Exception:
self.log_e("dovecot: Delete mailbox: Error removing directory '%s' from disk." % old_home_calc)
raise
finally:
self.listener.unsetuid()
else:
self.log_p("dovecot: Deleting of mailboxes disabled, not removing '%s' (dn '%s')." % (email, dn))
[docs]
def read_from_ext_proc_as_root(self, cmd: Sequence[str], regexp: str | None = None, stdin: str | None = None, stdout: Any = subprocess.PIPE, stderr: Any = None, stdin_input: Any = None) -> str:
"""
Wrapper around Popen(), runs external command as root and return its
output, optionally the first hit of a regexp. May raise an exception.
:param cmd: list: with executable path as first item
:param regexp: string: regexp for re.findall()
:return: string
"""
try:
self.listener.setuid(0)
cmd_proc = subprocess.Popen(cmd, stdin=stdin, stdout=stdout, stderr=stderr)
cmd_out, cmd_err = cmd_proc.communicate(input=stdin_input and stdin_input.encode('UTF-8'))
cmd_exit = cmd_proc.wait()
if cmd_out and not cmd_err and cmd_exit == 0:
if regexp:
res = re.findall(regexp, cmd_out.decode('UTF-8'))
return res[0]
else:
return cmd_out.decode('UTF-8').rstrip()
finally:
self.listener.unsetuid()
[docs]
def move_user_home(self, newMailPrimaryAddress: str, oldMailPrimaryAddress: str, force_rename: bool = False) -> None:
if not force_rename and not self.listener.configRegistry.is_true("mail/dovecot/mailbox/rename", False):
self.log_p("Renaming of mailboxes disabled, not moving ('%s' -> '%s')." % (oldMailPrimaryAddress, newMailPrimaryAddress))
return
old_localpart, old_domainpart = oldMailPrimaryAddress.lower().split("@")
try:
global_mail_home = self.get_maillocation()
old_home_calc = str(global_mail_home).replace("%Ld", old_domainpart).replace("%Ln", old_localpart)
new_home_dove = self.get_user_home(newMailPrimaryAddress)
except Exception:
self.log_e("Move mailbox: Configuration error. Could not move mailbox ('%s' -> '%s')." % (oldMailPrimaryAddress, newMailPrimaryAddress))
return
try:
self.listener.setuid(0)
if not os.path.isdir(old_home_calc):
# Either the user never logged in or never got any email, and thus no maildir was ever created,
# or it was moved manually. In any case: ignore.
self.log_p("Move mailbox: Source directory ('%s') does not exist. Nothing to do for mailbox move ('%s' -> '%s')." % (old_home_calc, oldMailPrimaryAddress, newMailPrimaryAddress))
return
if os.path.isdir(new_home_dove) or os.path.isfile(new_home_dove):
# We don't know why there is a file or directory already. For security reasons we don't do anything.
self.log_e("Move mailbox: Target directory ('%s') exists. For security reasons not moving mailbox for mailbox move ('%s' -> '%s')." % (new_home_dove, oldMailPrimaryAddress, newMailPrimaryAddress))
return
finally:
self.listener.unsetuid()
try:
self.read_from_ext_proc_as_root(["/usr/bin/doveadm", "kick", oldMailPrimaryAddress])
except Exception:
# ignore
pass
try:
self.move_mail_home(old_home_calc, new_home_dove, newMailPrimaryAddress, force_rename)
except Exception:
self.log_e("Move mailbox: Failed to move mail home (of mail '%s') from '%s' to '%s'.\n%s" % (
newMailPrimaryAddress, old_home_calc, new_home_dove, traceback.format_exc()))
return
self.log_p("Moved mail home (of mail: '%s') from '%s' to '%s'." % (newMailPrimaryAddress, old_home_calc, new_home_dove))
return
[docs]
def move_mail_home(self, old_path: str, new_path: str, email: str, force_rename: bool = False) -> None:
# create parent path in any case to make sure it has correct ownership
self.mkdir_p(os.path.dirname(new_path))
if not force_rename and not self.listener.configRegistry.is_true("mail/dovecot/mailbox/rename", False):
self.log_p("Renaming of mailboxes disabled, not moving mail home (of mail '%s') from '%s' to '%s." % (email, old_path, new_path))
return
try:
self.listener.setuid(0)
st = os.stat(old_path)
shutil.move(old_path, new_path)
self.chown_r(new_path, st[stat.ST_UID], st[stat.ST_GID])
except Exception:
self.log_e("Failed to move mail home (of mail '%s') from '%s' to '%s'.\n%s" % (
email, old_path, new_path, traceback.format_exc()))
raise
finally:
self.listener.unsetuid()
[docs]
def get_maillocation(self) -> str:
try:
return self.read_from_ext_proc_as_root(["/usr/bin/doveconf", "-h", "mail_location"], r"\S+:(\S+)/Maildir")
except Exception:
self.log_e("Failed to get mail_location from Dovecot configuration.\n%s" % traceback.format_exc())
raise
[docs]
def upload_activate_sieve_script(self, email: str, file: str) -> None:
try:
master_name, master_pw = self.get_masteruser_credentials()
ca_file = self.listener.configRegistry.get("mail/dovecot/sieve/client/cafile", "/etc/univention/ssl/ucsCA/CAcert.pem")
fqdn = "%(hostname)s.%(domainname)s" % self.listener.configRegistry
fqdn = self.listener.configRegistry.get("mail/dovecot/sieve/client/server", fqdn)
_cmd = [
"sieve-connect", "--user", "%s*%s" % (email, master_name),
"--server", fqdn,
"--noclearauth", "--noclearchan",
"--tlscafile", ca_file,
"--remotesieve", "default"]
cmd_upload = list(_cmd)
cmd_upload.extend(["--localsieve", file, "--upload"])
self.read_from_ext_proc_as_root(cmd_upload, stdin=subprocess.PIPE, stdin_input=master_pw)
cmd_activate = list(_cmd)
cmd_activate.extend(["--activate"])
self.read_from_ext_proc_as_root(cmd_activate, stdin=subprocess.PIPE, stdin_input=master_pw)
except Exception:
self.log_e("upload_activate_sieve_script(): Could not upload sieve script '%s' to mailbox '%s'. Exception:\n%s" % (file, email, traceback.format_exc()))
raise
[docs]
def get_user_home(self, username: str) -> str:
try:
return self.read_from_ext_proc_as_root(["/usr/bin/doveadm", 'user', "-f", "home", username]).lower()
except Exception:
self.log_e("Failed to get mail home for user '%s'.\n%s" % (username, traceback.format_exc()))
raise
[docs]
def get_masteruser_credentials(self) -> tuple[str, str]:
try:
self.listener.setuid(0)
return re.findall(r"(\S+):{PLAIN}(\S+)::::::", open("/etc/dovecot/master-users").read())[0]
except Exception:
self.log_e("Failed to get masteruser password.\n%s" % traceback.format_exc())
raise
finally:
self.listener.unsetuid()
[docs]
def get_dovecot_user(self) -> tuple[str, str]:
if not hasattr(self, "dovecot_user") or not hasattr(self, "dovecot_group"):
try:
uid = self.read_from_ext_proc_as_root(["/usr/bin/doveconf", "-h", "mail_uid"])
gid = self.read_from_ext_proc_as_root(["/usr/bin/doveconf", "-h", "mail_gid"])
except Exception:
uid = "dovemail"
gid = "dovemail"
self.dovecot_user = uid
self.dovecot_group = gid
return self.dovecot_user, self.dovecot_group
[docs]
def mkdir_p(self, dir: str) -> None:
user, group = self.get_dovecot_user()
dovecot_uid = pwd.getpwnam(user).pw_uid
dovecot_gid = grp.getgrnam(group).gr_gid
# spool directory has to be traversed as root
self.listener.setuid(0)
parent = os.path.dirname(dir)
if not os.path.exists(parent):
self.listener.unsetuid()
self.mkdir_p(parent)
else:
self.listener.unsetuid()
try:
self.listener.setuid(0)
if not os.path.exists(dir):
os.mkdir(dir, 0o2700)
os.chown(dir, dovecot_uid, dovecot_gid)
except Exception:
self.log_e("Failed to create directory '%s'.\n%s" % (dir, traceback.format_exc()))
raise
finally:
self.listener.unsetuid()
[docs]
@classmethod
def chown_r(cls, path: str, uid: int, gid: int) -> None:
"""
Recursively set owner and group on a file/directory and its
subdirectories.
:param str path: file/directory (and its subdirectories) to change ownership on
:param int uid: UID to set
:param int gid: GID to set
:return: None
"""
def chown_if_different(path_, uid_, gid_):
st = os.stat(path_)
if st[stat.ST_UID] != uid_ or st[stat.ST_GID] != gid_:
os.chown(path_, uid_, gid_)
chown_if_different(path, uid, gid)
for dirpath, dirnames, filenames in os.walk(path):
for dirname in dirnames:
cls.chown_r(os.path.join(dirpath, dirname), uid, gid)
for filename in filenames:
chown_if_different(os.path.join(dirpath, filename), uid, gid)