Source code for univention.management.console.modules.passwordreset.sending.send_with_external
#!/usr/bin/python3
#
# Send a token to a user by email.
#
# SPDX-FileCopyrightText: 2015-2025 Univention GmbH
# SPDX-License-Identifier: AGPL-3.0-only
#
#
# This is meant as an example. Please feel free to copy this file and adapt #
# it to your needs. #
#
#
#
#
# If the return code is other that True or an exception is raised and not #
# caught, it is assumed that it was not possible to send the token to the #
# user. The token is then deleted from the database. #
#
#
import os
import subprocess
from univention.config_registry import ConfigRegistry
from univention.lib.i18n import Translation
from univention.management.console.modules.passwordreset.send_plugin import UniventionSelfServiceTokenEmitter
_ = Translation('univention-self-service-passwordreset-umc').translate
ucr = ConfigRegistry()
ucr.load()
[docs]
class SendWithExternal(UniventionSelfServiceTokenEmitter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cmd = self.ucr.get("umc/self-service/passwordreset/external/command", "").split()
if not self.cmd:
raise ValueError("SendWithExternal: UCR umc/self-service/passwordreset/external/command must contain the path to the program to execute.")
[docs]
@staticmethod
def send_method():
return ucr.get("umc/self-service/passwordreset/external/method")
[docs]
@staticmethod
def send_method_label():
return ucr.get("umc/self-service/passwordreset/external/method_label", _("External"))
[docs]
@staticmethod
def is_enabled():
return ucr.is_true("umc/self-service/passwordreset/external/enabled")
@property
def udm_property(self):
ucr.load()
return ucr.get("umc/self-service/passwordreset/external/udm_property")
@property
def token_length(self):
ucr.load()
length = ucr.get("umc/self-service/passwordreset/external/token_length", 64)
try:
length = int(length)
except ValueError:
length = 64
return length
[docs]
def send(self):
env = os.environ.copy()
env["selfservice_username"] = self.data["username"]
env["selfservice_address"] = self.data["address"]
env["selfservice_token"] = self.data["token"]
#
#
# ATTENTION #
# The environment is inherited by all programs that are started by your #
# program. Your program should remove the token from its environment, #
# before starting any other program. #
#
#
self.log("Starting external program %s...", self.cmd)
cmd_proc = subprocess.Popen(self.cmd, env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd_out, cmd_err = cmd_proc.communicate()
cmd_out, cmd_err = cmd_out.decode('UTF-8', 'replace'), cmd_err.decode('UTF-8', 'replace')
cmd_exit = cmd_proc.wait()
if cmd_out:
self.log("STDOUT of %s: %s", self.cmd, cmd_out)
if cmd_err:
self.log("STDERR of %s: %s", self.cmd, cmd_err)
if cmd_exit == 0:
return True
else:
raise Exception("Error sending token.")