Source code for virttest.utils_sasl

"""
tools to manage sasl.
"""

import logging
import aexpect
import virsh
from autotest.client import utils, os_dep
from autotest.client.shared import error
from virttest import propcan, remote


[docs]class SASL(propcan.PropCanBase): """ Base class of a connection between server and client. """ __slots__ = ("sasl_pwd_cmd", "sasl_user_pwd", "sasl_user_cmd", "auto_recover", "linesep", "prompt", "session", "server_ip", "server_user", "server_pwd", "client", "port") def __init__(self, *args, **dargs): """ Initialize instance """ init_dict = dict(*args, **dargs) init_dict["sasl_pwd_cmd"] = os_dep.command("saslpasswd2") init_dict["sasl_user_cmd"] = os_dep.command("sasldblistusers2") init_dict["sasl_user_pwd"] = init_dict.get("sasl_user_pwd") init_dict["auto_recover"] = init_dict.get("auto_recover", False) init_dict["client"] = init_dict.get("client", "ssh") init_dict["port"] = init_dict.get("port", "22") init_dict["linesep"] = init_dict.get("linesep", "\n") init_dict["prompt"] = init_dict.get("prompt", r"[\#\$]\s*$") self.__dict_set__('session', None) super(SASL, self).__init__(init_dict) def __del__(self): """ Close opened session and clear test environment """ self.close_session() if self.auto_recover: try: self.cleanup() except: raise error.TestError("Failed to clean up test environment!") def _new_session(self): """ Build a new server session. """ port = self.port prompt = self.prompt host = self.server_ip client = self.client username = self.server_user password = self.server_pwd try: session = remote.wait_for_login(client, host, port, username, password, prompt) except remote.LoginTimeoutError: raise error.TestError("Got a timeout error when login to server.") except remote.LoginAuthenticationError: raise error.TestError("Authentication failed to login to server.") except remote.LoginProcessTerminatedError: raise error.TestError("Host terminates during login to server.") except remote.LoginError: raise error.TestError("Some error occurs login to client server.") return session
[docs] def get_session(self): """ Make sure the session is alive and available """ session = self.__dict_get__('session') if (session is not None) and (session.is_alive()): return session else: session = self._new_session() self.__dict_set__('session', session) return session
[docs] def close_session(self): """ If session exists then close it """ if self.session: self.session.close()
[docs] def list_users(self, remote=True, sasldb_path="/etc/libvirt/passwd.db"): """ List users in sasldb """ cmd = "%s -f %s" % (self.sasl_user_cmd, sasldb_path) try: if remote: self.session = self.get_session() return self.session.cmd_output(cmd) else: return utils.system_output(cmd) except error.CmdError: logging.error("Failed to set a user's sasl password %s", cmd)
[docs] def setup(self, remote=True): """ Create sasl users with password """ for sasl_user, sasl_pwd in eval(self.sasl_user_pwd): cmd = "echo %s |%s -p -a libvirt %s" % (sasl_pwd, self.sasl_pwd_cmd, sasl_user) try: if remote: self.session = self.get_session() self.session.cmd(cmd) else: utils.system(cmd) except error.CmdError: logging.error("Failed to set a user's sasl password %s", cmd)
[docs] def cleanup(self, remote=True): """ Clear created sasl users """ for sasl_user, sasl_pwd in eval(self.sasl_user_pwd): cmd = "%s -a libvirt -d %s" % (self.sasl_pwd_cmd, sasl_user) try: if remote: self.session = self.get_session() self.session.cmd(cmd) else: utils.system(cmd) except error.CmdError: logging.error("Failed to disable a user's access %s", cmd)
[docs]class VirshSessionSASL(virsh.VirshSession): """ A wrap class for virsh session which used SASL infrastructure. """ def __init__(self, params): self.virsh_exec = virsh.VIRSH_EXEC self.sasl_user = params.get('sasl_user') self.sasl_pwd = params.get('sasl_pwd') self.remote_ip = params.get('remote_ip') self.remote_user = params.get('remote_user') self.remote_pwd = params.get('remote_pwd') self.remote_auth = False if self.remote_ip: self.remote_auth = True super(VirshSessionSASL, self).__init__(virsh_exec=self.virsh_exec, remote_ip=self.remote_ip, remote_user=self.remote_user, remote_pwd=self.remote_pwd, ssh_remote_auth=self.remote_auth, auto_close=True, check_libvirtd=False) self.sendline('connect') self.sendline(self.sasl_user) self.sendline(self.sasl_pwd) # make sure session is connected successfully if self.cmd_status('list', timeout=60) != 0: logging.debug("Persistent virsh session is not responding, " "libvirtd may be dead.") raise aexpect.ShellStatusError(virsh.VIRSH_EXEC, 'list')