from __future__ import print_function
from os import getenv
from subprocess import PIPE, Popen
from sys import exit
from time import sleep
from samba.param import LoadParm
from ucsschool.lib import schoolldap
from univention.config_registry import ConfigRegistry
from univention.lib.umc import ConnectionError, HTTPError
from univention.testing import utils
from univention.testing.codes import TestCodes
from univention.testing.umc import Client
class TestSamba4(object):
    """
    Collection of helpers shared by Samba 4 related tests: running external
    commands, querying 'udm', locating the Samba 'sam.ldb', opening a UMC
    connection and removing a Group Policy Object.
    """

    def __init__(self):
        """
        Test class constructor. Loads the UCR and initialises the connection
        and credential attributes with empty values.
        """
        self.UCR = ConfigRegistry()
        self.UCR.load()

        # Filled in later by the helper methods below:
        self.client = None  # UMC client, see create_umc_connection_authenticate()
        self.admin_username = ""  # see get_ucr_test_credentials()
        self.admin_password = ""  # see get_ucr_test_credentials()
        self.ldap_master = ""  # UCR 'ldap/master', looked up lazily
        self.gpo_reference = ""  # GPO removed by delete_samba_gpo()

    def return_code_result_skip(self):
        """
        Stops the test by exiting with the code 'TestCodes.REASON_INSTALL'.

        NOTE(review): this docstring used to claim exit code 77 (RESULT_SKIP),
        which does not match the constant used below - confirm which of the
        two is intended.
        """
        exit(TestCodes.REASON_INSTALL)

    def remove_samba_warnings(self, input_str):
        """
        Removes the Samba Warning/Note from the given input_str and returns
        the result with leading and trailing whitespace stripped.
        """
        # ignoring following messages (Bug #37362):
        ignored_messages = (
            "WARNING: No path in service IPC$ - making it unavailable!",
            "NOTE: Service IPC$ is flagged unavailable.",
        )
        for message in ignored_messages:
            input_str = input_str.replace(message, "")
        return input_str.strip()

    def _decode_output(self, data):
        """
        Decodes one captured output stream of a process as UTF-8 and removes
        the known Samba warnings from it. 'None' (stream was not captured)
        and empty output are passed through without cleaning.
        """
        if data is None:
            return None
        text = data.decode("UTF-8")
        return self.remove_samba_warnings(text) if text else text

    def create_and_run_process(self, cmd, stdin=None, std_input=None, shell=False, stdout=PIPE):
        """
        Creates a process as a Popen instance with a given 'cmd'
        and executes it. When stdin is needed, it can be provided with kwargs
        ('stdin' for the Popen argument, 'std_input' for the data to send;
        text is sent UTF-8 encoded, bytes are sent unchanged).
        To write to a file an instance can be provided to stdout.

        Returns a tuple (stdout, stderr) of decoded output with the known
        Samba warnings removed; an element is 'None' when that stream was
        not captured through a pipe.
        """
        # NOTE: 'cmd' is printed as is, including any credentials it carries.
        print("\n create_and_run_process(%r, shell=%r)" % (cmd, shell))
        proc = Popen(cmd, stdin=stdin, stdout=stdout, stderr=PIPE, shell=shell, close_fds=True)

        # Only text needs encoding; calling .encode() on bytes fails on Python 3.
        if std_input is not None and not isinstance(std_input, bytes):
            std_input = std_input.encode("UTF-8")

        raw_stdout, raw_stderr = proc.communicate(std_input)
        return self._decode_output(raw_stdout), self._decode_output(raw_stderr)

    def start_stop_service(self, service, action):
        """
        Starts, stops or restarts the given 'service' depending on the given
        'action' is 'start', 'stop', 'restart' respectively.
        Any other 'action' is only reported, no command is run.
        Fails the test when the command writes to stderr or prints nothing
        to stdout.
        """
        if action not in ("start", "stop", "restart"):
            print(
                "\nUnknown state '%s' is given for the service '%s', accepted 'start' to start it "
                "'stop' to stop or 'restart' to restart" % (action, service)
            )
            return

        cmd = ("service", service, action)
        print("\nExecuting command:", cmd)
        stdout, stderr = self.create_and_run_process(cmd)
        if stderr:
            utils.fail(
                "An error occured during %sing the '%s' service: %s" % (action, service, stderr)
            )

        stdout = stdout.strip()
        if not stdout:
            utils.fail(
                "The %s command did not produce any output to stdout, while a confirmation was "
                "expected" % action
            )
        print(stdout)

    def dc_master_has_samba4(self):
        """
        Returns 'True' when Primary Directory Node has Samba4 according to "service=Samba 4",
        'False' otherwise. The check is a substring search for the UCR
        'ldap/master' value in the udm list output.
        """
        if not self.ldap_master:
            self.ldap_master = self.UCR.get("ldap/master")

        masters_with_samba4 = self.get_udm_list_dcs("domaincontroller_master", with_samba4=True)
        return self.ldap_master in masters_with_samba4

    def is_a_school_branch_site(self, host_dn):
        """
        Returns True if the given 'host_dn' is located in the
        School branch site, i.e. when SchoolSearchBase.getOU() yields a
        non-empty value for it; False otherwise.
        """
        return bool(schoolldap.SchoolSearchBase.getOU(host_dn))

    def grep_for_key(self, grep_in, key):
        """
        Runs grep on given 'grep_in' with a given 'key'. Returns the output.
        Fails the test when grep writes anything to stderr.
        """
        stdout, stderr = self.create_and_run_process(("grep", key), PIPE, grep_in)
        if stderr:
            utils.fail(
                "An error occured while running a grep with a keyword '%s':\n'%s'" % (key, stderr)
            )
        return stdout

    def sed_for_key(self, input, key):
        """
        Runs sed on given 'input' with a given 'key'. Returns the output.
        Only the lines matching 'key' are printed, with the match removed
        ("sed -n s/<key>//p"). Fails the test when sed writes to stderr.
        """
        cmd = ("sed", "-n", "s/%s//p" % (key,))
        stdout, stderr = self.create_and_run_process(cmd, PIPE, input)
        if stderr:
            utils.fail(
                "An error occured while running a sed command '%s':\n'%s'" % (" ".join(cmd), stderr)
            )
        return stdout

    def get_udm_list_dcs(self, dc_type, with_samba4=True, with_ucsschool=False):
        """
        Runs the "udm computers/'dc_type' list" and returns the output.
        If 'with_samba4' is 'True' a "service=Samba 4" filter is added,
        if 'with_ucsschool' is 'True' a "service=UCS@school" filter is added.
        An unknown 'dc_type' stops the test via return_code_result_skip();
        output on stderr fails the test.
        """
        known_dc_types = (
            "domaincontroller_master",
            "domaincontroller_backup",
            "domaincontroller_slave",
        )
        if dc_type not in known_dc_types:
            print("\nThe given DC type '%s' is unknown" % dc_type)
            self.return_code_result_skip()

        cmd = ("udm", "computers/" + dc_type, "list")
        # NOTE(review): when both flags are set '--filter' is passed twice;
        # presumably udm only honours one of them - confirm and combine the
        # two into a single LDAP filter if both are meant to apply.
        if with_samba4:
            cmd += ("--filter", "service=Samba 4")
        if with_ucsschool:
            cmd += ("--filter", "service=UCS@school")

        stdout, stderr = self.create_and_run_process(cmd)
        if stderr:
            utils.fail(
                "An error occured while running a '%s' command to find all '%s' in the domain:\n'%s'"
                % (" ".join(cmd), dc_type, stderr)
            )
        return stdout

    def get_udm_list_dc_slaves_with_samba4(self, with_ucsschool=False):
        """
        Returns the output of "udm computers/domaincontroller_slave list
        --filter service=Samba 4" command ('with_ucsschool' is passed on to
        get_udm_list_dcs()).
        """
        return self.get_udm_list_dcs("domaincontroller_slave", with_ucsschool=with_ucsschool)

    def select_school_ou(self, schoolname_only=False):
        """
        Returns the first found School OU from the list of Replica Directory Nodes in domain.
        With 'schoolname_only' the value of SchoolSearchBase.getOU() is
        returned instead of the OU DN. Fails the test when no OU is found.
        """
        print("\nSelecting the School OU for the test")
        sed_stdout = self.sed_for_key(self.get_udm_list_dc_slaves_with_samba4(), "^DN: ")

        # One DN per line: split on line breaks only, so that a DN containing
        # a space is not torn apart.
        host_dns = [line.strip() for line in sed_stdout.splitlines() if line.strip()]

        ou_dns = [schoolldap.SchoolSearchBase.getOUDN(host_dn) for host_dn in host_dns]
        ous = [
            schoolldap.SchoolSearchBase.getOU(ou_dn) if schoolname_only else ou_dn
            for ou_dn in ou_dns
            if ou_dn
        ]
        print("\nselect_school_ou: SchoolSearchBase found these OUs: %s" % (ous,))

        if ous:
            return ous[0]

        print("\nselect_school_ou: split: %s" % (host_dns,))
        utils.fail(
            "Could not find the DN in the udm list output, thus cannot select the School OU to use "
            "as a container"
        )

    def get_samba_sam_ldb_path(self):
        """
        Returns the 'sam.ldb' path using samba conf or defaults: the
        configuration named by the 'SMB_CONF_PATH' environment variable is
        loaded when set, the default configuration otherwise.
        """
        print("\nObtaining the Samba configuration to determine Samba private path")
        smb_conf_path = getenv("SMB_CONF_PATH")
        samba_lp = LoadParm()
        if smb_conf_path:
            samba_lp.load(smb_conf_path)
        else:
            samba_lp.load_default()
        return samba_lp.private_path("sam.ldb")

    def get_ucr_test_credentials(self):
        """
        Stores the username and password of the test domain admin account
        (utils.UCSTestDomainAdminCredentials) on the instance.
        """
        account = utils.UCSTestDomainAdminCredentials()
        self.admin_username = account.username
        self.admin_password = account.bindpw

    def create_umc_connection_authenticate(self):
        """
        Creates UMC connection and authenticates to Primary Directory Node with the test
        user credentials. After a ConnectionError or HTTPError one more
        attempt is made 10 seconds later; a failure of that second attempt
        is not caught.
        """
        if not self.ldap_master:
            self.ldap_master = self.UCR.get("ldap/master")

        try:
            self.client = Client(self.ldap_master, self.admin_username, self.admin_password)
        except (ConnectionError, HTTPError) as exc:
            print("An HTTP Error occured while trying to authenticate to UMC: %r" % exc)
            print("Waiting 10 seconds and making another attempt")
            sleep(10)
            # The failed constructor never assigned 'self.client', so the
            # client has to be created again instead of calling a method on
            # the (possibly still None) attribute.
            self.client = Client(self.ldap_master, self.admin_username, self.admin_password)

    def delete_samba_gpo(self):
        """
        Deletes the Group Policy Object using the 'samba-tool gpo del'.
        The GPO is taken from 'self.gpo_reference'. Errors reported by
        samba-tool are only printed, they do not fail the test.
        """
        print(
            "\nRemoving previously created Group Policy Object (GPO) with a reference: %s"
            % self.gpo_reference
        )
        # NOTE(review): the password is passed on the command line and gets
        # echoed by the debug print in create_and_run_process().
        cmd = (
            "samba-tool",
            "gpo",
            "del",
            self.gpo_reference,
            "--username=" + self.admin_username,
            "--password=" + self.admin_password,
        )
        stdout, stderr = self.create_and_run_process(cmd)
        if stderr:
            print("\nExecuting cmd:", cmd)
            print("\nAn error message while removing the GPO using 'samba-tool':\n%s" % stderr)
        print("\nSamba-tool produced the following output:\n", stdout)