Source code for univention.testing.ucsschool.test_samba4

from __future__ import print_function

from os import getenv
from subprocess import PIPE, Popen
from sys import exit
from time import sleep

from samba.param import LoadParm

from ucsschool.lib import schoolldap
from univention.config_registry import ConfigRegistry
from univention.lib.umc import ConnectionError, HTTPError
from univention.testing import utils
from univention.testing.codes import TestCodes
from univention.testing.umc import Client


class TestSamba4(object):
    """
    Helper collection for Samba 4 related tests.

    The methods run external commands (udm, service, grep, sed, samba-tool),
    read UCR variables and open a UMC connection to the host named in the
    UCR variable 'ldap/master'.
    """

    def __init__(self):
        """
        Test class constructor: loads UCR and initialises empty placeholders.
        """
        self.UCR = ConfigRegistry()
        self.UCR.load()

        self.client = None  # UMC client, set by create_umc_connection_authenticate()
        self.admin_username = ""  # filled by get_ucr_test_credentials()
        self.admin_password = ""  # filled by get_ucr_test_credentials()
        self.ldap_master = ""  # cached value of the UCR variable 'ldap/master'
        self.gpo_reference = ""  # GPO reference consumed by delete_samba_gpo()

    def return_code_result_skip(self):
        """
        Stops the test by exiting with the code 'TestCodes.REASON_INSTALL'.

        NOTE(review): the method name suggests a plain "skip" result; the
        numeric value of REASON_INSTALL is not visible in this module, so
        confirm against univention.testing.codes that this is the intended
        code.
        """
        exit(TestCodes.REASON_INSTALL)

    def remove_samba_warnings(self, input_str):
        """
        Removes the Samba Warning/Note from the given input_str and strips
        leading/trailing whitespace from the result.
        """
        # ignoring following messages (Bug #37362):
        ignored_messages = (
            "WARNING: No path in service IPC$ - making it unavailable!",
            "NOTE: Service IPC$ is flagged unavailable.",
        )
        for message in ignored_messages:
            input_str = input_str.replace(message, "")
        return input_str.strip()

    def _decode_output(self, raw):
        """
        Decodes one captured output stream (bytes or None) as UTF-8 and
        removes the known Samba warnings from non-empty text.
        """
        if raw is None:
            # The stream was not captured (e.g. stdout redirected to a file).
            return None
        text = raw.decode("UTF-8")
        return self.remove_samba_warnings(text) if text else text

    def create_and_run_process(self, cmd, stdin=None, std_input=None, shell=False, stdout=PIPE):
        """
        Creates a process as a Popen instance with a given 'cmd' and executes it.

        :param cmd: command (sequence of arguments, or a string with shell=True).
        :param stdin: passed to Popen; use PIPE when 'std_input' is given.
        :param std_input: optional text (encoded as UTF-8) or bytes that is
            written to the process' stdin.
        :param shell: passed to Popen.
        :param stdout: passed to Popen; to write to a file an instance can be
            provided here.
        :returns: tuple (stdout, stderr) of decoded text with the known Samba
            warnings removed; an element is None when that stream was not
            captured.
        """
        print("\n create_and_run_process(%r, shell=%r)" % (cmd, shell))
        proc = Popen(cmd, stdin=stdin, stdout=stdout, stderr=PIPE, shell=shell, close_fds=True)

        # Accept both text and already encoded bytes as process input.
        if std_input is not None and not isinstance(std_input, bytes):
            std_input = std_input.encode("UTF-8")

        raw_stdout, raw_stderr = proc.communicate(std_input)
        return self._decode_output(raw_stdout), self._decode_output(raw_stderr)

    def start_stop_service(self, service, action):
        """
        Starts, stops or restarts the given 'service' depending on the given
        'action' is 'start', 'stop', 'restart' respectively.

        Any other 'action' only prints a message. Output on stderr or an
        empty stdout is reported through utils.fail().
        """
        if action not in ("start", "stop", "restart"):
            print(
                "\nUnknown state '%s' is given for the service '%s', accepted 'start' to start it "
                "'stop' to stop or 'restart' to restart" % (action, service)
            )
            return

        cmd = ("service", service, action)
        print("\nExecuting command:", cmd)

        stdout, stderr = self.create_and_run_process(cmd)
        if stderr:
            utils.fail(
                "An error occured during %sing the '%s' service: %s" % (action, service, stderr)
            )

        stdout = stdout.strip()
        if not stdout:
            utils.fail(
                "The %s command did not produce any output to stdout, while a confirmation was "
                "expected" % action
            )
        print(stdout)

    def dc_master_has_samba4(self):
        """
        Returns 'True' when Primary Directory Node has Samba4 according to
        "service=Samba 4", otherwise 'False'.

        The check is a substring match of the 'ldap/master' value in the udm
        list output.
        """
        if not self.ldap_master:
            self.ldap_master = self.UCR.get("ldap/master")
        if not self.ldap_master:
            # 'ldap/master' is unset: nothing to look for (and 'None in str'
            # would raise a TypeError).
            return False
        udm_output = self.get_udm_list_dcs("domaincontroller_master", with_samba4=True)
        return self.ldap_master in udm_output

    def is_a_school_branch_site(self, host_dn):
        """
        Returns True if the given 'host_dn' is located in the School branch
        site, i.e. SchoolSearchBase.getOU() yields a value for it; otherwise
        False.
        """
        return bool(schoolldap.SchoolSearchBase.getOU(host_dn))

    def grep_for_key(self, grep_in, key):
        """
        Runs grep on given 'grep_in' with a given 'key'. Returns the output.
        """
        stdout, stderr = self.create_and_run_process(("grep", key), PIPE, grep_in)
        if stderr:
            utils.fail(
                "An error occured while running a grep with a keyword '%s':\n'%s'" % (key, stderr)
            )
        return stdout

    def sed_for_key(self, input, key):
        """
        Runs sed on given 'input' with a given 'key'. Returns the output.

        The sed script is "s/<key>//p" with '-n': only lines matching 'key'
        are printed, with the first match removed.
        """
        cmd = ("sed", "-n", "s/%s//p" % (key,))
        stdout, stderr = self.create_and_run_process(cmd, PIPE, input)
        if stderr:
            utils.fail(
                "An error occured while running a sed command '%s':\n'%s'" % (" ".join(cmd), stderr)
            )
        return stdout

    @staticmethod
    def _udm_filter_args(with_samba4, with_ucsschool):
        """
        Builds the '--filter' arguments for 'udm ... list' as a tuple.

        Several requested services are combined into one LDAP AND filter
        instead of being passed as separate '--filter' options.
        NOTE(review): this assumes the udm command line honours only a single
        '--filter' option - confirm against the UDM CLI.
        """
        services = []
        if with_samba4:
            services.append("service=Samba 4")
        if with_ucsschool:
            services.append("service=UCS@school")

        if not services:
            return ()
        if len(services) == 1:
            return ("--filter", services[0])
        return ("--filter", "(&%s)" % "".join("(%s)" % service for service in services))

    def get_udm_list_dcs(self, dc_type, with_samba4=True, with_ucsschool=False):
        """
        Runs the "udm computers/'dc_type' list" and returns the output.

        If 'with_samba4' is 'True' returns only those running Samba 4, if
        'with_ucsschool' is 'True' only those with the service UCS@school;
        with both set a host has to provide both services.

        An unknown 'dc_type' ends the test via return_code_result_skip();
        output on stderr is reported through utils.fail().
        """
        known_dc_types = (
            "domaincontroller_master",
            "domaincontroller_backup",
            "domaincontroller_slave",
        )
        if dc_type not in known_dc_types:
            print("\nThe given DC type '%s' is unknown" % dc_type)
            self.return_code_result_skip()

        cmd = ("udm", "computers/" + dc_type, "list")
        cmd += self._udm_filter_args(with_samba4, with_ucsschool)

        stdout, stderr = self.create_and_run_process(cmd)
        if stderr:
            utils.fail(
                "An error occured while running a '%s' command to find all '%s' in the domain:\n'%s'"
                % (" ".join(cmd), dc_type, stderr)
            )
        return stdout

    def get_udm_list_dc_slaves_with_samba4(self, with_ucsschool=False):
        """
        Returns the output of
        "udm computers/domaincontroller_slave list --filter service=Samba 4"
        command; 'with_ucsschool' is passed on to get_udm_list_dcs().
        """
        return self.get_udm_list_dcs("domaincontroller_slave", with_ucsschool=with_ucsschool)

    def select_school_ou(self, schoolname_only=False):
        """
        Returns the first found School OU from the list of Replica Directory
        Nodes in domain.

        With 'schoolname_only' the value of SchoolSearchBase.getOU() is
        returned instead of the value of SchoolSearchBase.getOUDN(). When no
        OU can be determined utils.fail() is called.
        """
        print("\nSelecting the School OU for the test")

        # Reduce the udm output to the bare DNs (lines starting with "DN: ").
        sed_stdout = self.sed_for_key(self.get_udm_list_dc_slaves_with_samba4(), "^DN: ")
        ous = [schoolldap.SchoolSearchBase.getOUDN(x) for x in sed_stdout.split()]
        ous = [schoolldap.SchoolSearchBase.getOU(ou) if schoolname_only else ou for ou in ous if ou]
        print("\nselect_school_ou: SchoolSearchBase found these OUs: %s" % (ous,))

        try:
            return ous[0]
        except IndexError:
            print("\nselect_school_ou: split: %s" % (sed_stdout.split(),))
            utils.fail(
                "Could not find the DN in the udm list output, thus cannot select the School OU to use "
                "as a container"
            )

    def get_samba_sam_ldb_path(self):
        """
        Returns the 'sam.ldb' path using samba conf or defaults.

        The configuration file named in the environment variable
        SMB_CONF_PATH is loaded when set, otherwise the Samba default.
        """
        print("\nObtaining the Samba configuration to determine Samba private path")
        smb_conf_path = getenv("SMB_CONF_PATH")
        SambaLP = LoadParm()
        if smb_conf_path:
            SambaLP.load(smb_conf_path)
        else:
            SambaLP.load_default()
        return SambaLP.private_path("sam.ldb")

    def get_ucr_test_credentials(self):
        """
        Stores the username and password of
        utils.UCSTestDomainAdminCredentials() as credentials for the test.
        """
        account = utils.UCSTestDomainAdminCredentials()
        self.admin_username = account.username
        self.admin_password = account.bindpw

    def create_umc_connection_authenticate(self):
        """
        Creates UMC connection and authenticates to Primary Directory Node
        with the test user credentials.

        On a ConnectionError or HTTPError one more attempt is made after
        waiting 10 seconds; an error in that second attempt is not caught.
        """
        if not self.ldap_master:
            self.ldap_master = self.UCR.get("ldap/master")

        try:
            self.client = Client(self.ldap_master, self.admin_username, self.admin_password)
        except (ConnectionError, HTTPError) as exc:
            print("An HTTP Error occured while trying to authenticate to UMC: %r" % exc)
            print("Waiting 10 seconds and making another attempt")
            sleep(10)
            # The failed constructor call never assigned self.client, so the
            # client has to be created again instead of calling
            # authenticate() on the old value (None on the first use).
            self.client = Client(self.ldap_master, self.admin_username, self.admin_password)

    def delete_samba_gpo(self):
        """
        Deletes the Group Policy Object using the 'samba-tool gpo del'.

        Errors are only printed, the test is not failed.
        """
        print(
            "\nRemoving previously created Group Policy Object (GPO) with a reference: %s"
            % self.gpo_reference
        )
        # NOTE(review): the password is handed over on the command line and is
        # echoed by create_and_run_process(); acceptable for test accounts only.
        cmd = (
            "samba-tool",
            "gpo",
            "del",
            self.gpo_reference,
            "--username=" + self.admin_username,
            "--password=" + self.admin_password,
        )

        stdout, stderr = self.create_and_run_process(cmd)
        if stderr:
            print("\nExecuting cmd:", cmd)
            print("\nAn error message while removing the GPO using 'samba-tool':\n%s" % stderr)

        print("\nSamba-tool produced the following output:\n", stdout)