Source code for virttest.lvsb_base

"""
Base classes supporting Libvirt Sandbox (lxc) container testing

:copyright: 2013 Red Hat Inc.
"""

import logging
import signal
import aexpect


[docs]class SandboxException(Exception): """ Basic exception class for problems occurring in SandboxBase or subclasses """ def __init__(self, message): super(SandboxException, self).__init__() self.message = message def __str__(self): return self.message
# This is to allow us to alter back-end session management w/o affecting # sandbox subclasses
[docs]class SandboxSession(object): """ Connection instance to asynchronous I/O redirector process """ # Assist with warning on re-use used = False def __init__(self): self.session = None # createdby new_session @property def connected(self): """ Represents True/False value if background process was created/opened """ if self.session is None: return False else: return True @property def session_id(self): """ Returns unique & persistent identifier for the background process """ if self.connected: return self.session.get_id() else: raise SandboxException("Can't get id of non-running sandbox " "session")
[docs] def new_session(self, command): """ Create and set new opaque session object """ # Allow this to be called more than once w/o consequence self.close_session(warn_if_nonexist=self.used) self.session = aexpect.Expect(command, auto_close=False) self.used = True
[docs] def open_session(self, a_id): """ Restore connection to existing session identified by a_id """ # Allow this to be called more than once w/o consequence self.close_session(warn_if_nonexist=self.used) aexpect.Expect(a_id=a_id) self.used = True
[docs] def close_session(self, warn_if_nonexist=True): """ Finalize assigned opaque session object """ # Allow this to be called more than once w/o consequence if self.connected: self.session.close() else: if warn_if_nonexist: logging.warning("Closing nonexisting sandbox session")
[docs] def kill_session(self, sig=signal.SIGTERM): """ Send a signal to the opaque session object """ if self.connected: self.session.kill(sig=sig) else: raise SandboxException("Can't send signal to inactive sandbox " "session")
[docs] def send(self, a_string): """Send a_string to session""" if self.connected: self.session.send(a_string) else: raise SandboxException("Can't send to an inactive sandbox session")
[docs] def recv(self): """Return combined stdout/stderr output received so far""" if self.connected: return self.session.get_output() else: raise SandboxException("Can't get output from finalized sandbox " "session")
[docs] def recvout(self): """Return just stdout output""" # FIXME: aexpect combines stdout and stderr in a single pipe :( raise NotImplementedError
[docs] def recverr(self): """Return just stderr output""" # FIXME: aexpect combines stdout and stderr in a single pipe :( raise NotImplementedError
[docs] def exit_code(self): """Block, and return exit code from session""" if self.connected: return self.session.get_status() else: raise SandboxException("Can't get exit code from finalized sandbox " "session")
[docs] def is_running(self): """Return True if exit_code() would block""" if self.connected: return self.session.is_alive() else: return None
[docs] def auto_clean(self, boolean): """Make session cleanup on GC if True""" if self.connected: self.session.auto_close = boolean else: raise SandboxException("Can't set auto_clean on disconnected " "sandbox session")
[docs]class SandboxBase(object): """ Base operations for sandboxed command """ # Provide unique instance number for each sandbox instances = None def __init__(self, params): """ Create a new sandbox interface instance based on this type from params """ # Un-pickling instances doesn't call init again if self.__class__.instances is None: self.__class__.instances = 1 else: self.__class__.instances += 1 # store a copy for use to avoid referencing class attribute self.identifier = self.__class__.instances # Allow global 'lvsb_*' keys to be overridden for specific subclass self.params = params.object_params(self.__class__.__name__) self.options = None # opaque value consumed by make_command() # Aexpect has some well hidden bugs, private attribute hides # interface in case it changes from fixes or gets swapped out # entirely. self._session = SandboxSession() # Allow running sandboxes to persist across multiple tests if needed def __getstate__(self): """Serialize instance for pickling""" # Regular dictionary format for now, but could change later state = {'params': self.params, 'identifier': self.identifier, 'options': self.options} # Critical info. to re-connect to session when un-pickle if self._session.connected: state['session_id'] = self._session.session_id return state def __setstate__(self, state): """Actualize instance from state""" for key in ('identifier', 'params', 'options'): setattr(self, key, state[key]) if state.haskey('session_id'): self._session = SandboxSession() self._session.open_session(state['session_id'])
[docs] def run(self, extra=None): """ Launch new sandbox as asynchronous background sandbox process :param extra: String of extra command-line to use but not store """ sandbox_cmdline = self.make_sandbox_command_line(extra) logging.debug("Launching %s", sandbox_cmdline) self._session.new_session(sandbox_cmdline)
[docs] def stop(self): """Destroy but don't finalize asynchronous background sandbox process""" self._session.kill_session()
[docs] def fini(self): """ Finalize asynchronous background sandbox process (destroys state!) """ self._session.close_session()
[docs] def send(self, data): """Send data to asynchronous background sandbox process""" self._session.send(data)
[docs] def recv(self): """ Return stdout and stderr from asynchronous background sandbox process """ return self._session.recv()
[docs] def recvout(self): """ Return only stdout from asynchronous background sandbox process """ return self._session.recvout()
[docs] def recverr(self): """ return only stderr from asynchronous background sandbox process """ return self._session.recverr()
[docs] def running(self): """ Return True/False if asynchronous background sandbox process executing """ return self._session.is_running()
[docs] def exit_code(self): """ Block until asynchronous background sandbox process ends, returning code """ return self._session.exit_code()
[docs] def auto_clean(self, boolean): """ Change behavior of asynchronous background sandbox process on __del__ """ self._session.auto_clean(boolean)
[docs] def make_sandbox_command_line(self, extra=None): """ Return the fully formed command-line for the sandbox using self.options """ # These are the abstract methods subclasses must override raise NotImplementedError
[docs]class SandboxCommandBase(SandboxBase): """ Connection to a single new or existing sandboxed command """ BINARY_PATH_PARAM = 'virt_sandbox_binary' # Cache generated name first time it is requested _name = None def __init__(self, params, name=None): """ Initialize sandbox-command with params and name, autogenerate if None """ if name is not None: self._name = name super(SandboxCommandBase, self).__init__(params) def __getstate__(self): """Serialize instance for pickling""" state = super(SandboxCommandBase, self).__getstate__() state['name'] = self._name return state def __setstate__(self, state): """Actualize instance from state""" self._name = state.pop('name') super(SandboxCommandBase, self).__setstate__(state) def __get_name__(self): """ Represent a unique sandbox name generated from class and identifier """ # Use shortest possible unique names for instances to be easier # to track and make name-comparison fast when there are 10000's # of sandboxes. Only use upper-case letters from class name along # with instance identifier attribute. if self._name is None: class_name = self.__class__.__name__ class_initials = class_name.translate(None, 'abcdefghijklmnopqrstuvwxyz') self._name = "%s_%d" % (class_initials, self.identifier) return self._name @staticmethod def __set_name__(value): del value # not used raise SandboxException("Name is read-only") @staticmethod def __del_name__(): raise SandboxException("Name is read-only") name = property(__get_name__, __set_name__, __del_name__) @staticmethod
[docs] def flaten_options(options): """ Convert a list of tuples into space-seperated options+argument string """ result_list = [] for option, argument in options: # positional argument if option is None: if argument is not None: result_list.append(argument) # both empty, ignore else: # option is not None # --flag if argument is None: result_list.append(option) else: # argument is not None # --option argument or -o argument result_list.append("%s %s" % (option, argument)) if len(result_list) > 0: return " " + " ".join(result_list) else: # they were all (None, None) return ""
[docs] def make_sandbox_command_line(self, extra=None): """Return entire command-line string needed to start sandbox""" command = self.params[self.BINARY_PATH_PARAM] # mandatory param if self.options is not None: command += self.flaten_options(self.options) if extra is not None: command += ' ' + extra return command
[docs] def add_optarg(self, option, argument): """ Add an option with an argument into the list of command line options """ if self.options is None: self.options = [] self.options.append((option, argument))
[docs] def add_flag(self, option): """ Add a flag into the list of command line options """ # Tuple encoding required for flaten_options() self.add_optarg(option, None)
[docs] def add_pos(self, argument): """ Add a positional option into the list of command line options """ # Tuple encoding required for flaten_options() self.add_optarg(None, argument)
[docs] def add_mm(self): """ Append a -- to the end of the current option list """ self.add_pos('--')
[docs] def list_long_options(self): """ Return a list of all long options with an argument """ return [opt for opt, arg in self.options if opt.startswith('--') and arg is not None]
[docs] def list_short_options(self): """ Return a list of all short options with an argument """ result = [] for opt, arg in self.options: if arg is None: continue # flag or positional if len(opt) > 1 and opt[0] == '-' and opt[1] != '-': result.append(opt)
[docs] def list_flags(self): """ Return a list of all flags (options without arguments) """ return [opt for opt, arg in self.options if opt.startswith('--') and arg is None]
[docs] def list_pos(self): """ Return a list of all positional arguments """ return [arg for opt, arg in self.options if opt is None]
# Instances are similar to a list-of-lists- multiple kinds (classes) of # multiple sandobx executions.
[docs]class TestSandboxes(object): """ Aggregate manager class of SandboxCommandBase or subclass instances """ # The class of each sandbox instance to operate on SANDBOX_TYPE = SandboxCommandBase def __init__(self, params, env): """ Create instance(s) of sandbox from a command """ # public attribute for access to each sandbox execution self.sandboxes = [] # Each sandbox type will object_params() itself self.params = params # In case a subclass wants to interface with tests before/after self.env = env # Parse out aggregate manager class-specific params pop = self.params.object_params(self.__class__.__name__) # Allows iterating over all sandboxes e.g. with for_each() self.count = int(pop.get('lvsb_count', '1')) # Simple-case is all sandboxes on the local host self.uri = pop.get('lvsb_uri', 'lxc:///') # The command to run inside the sandbox self.command = pop.get('lvsb_command') # Allows iterating for the options self.opts_count = int(pop.get('lvsb_opts_count', '1')) # FIXME: should automatically generate this self.lvsb_option_mapper = {'optarg': {'connect': '-c', 'name': '-n', 'mount': '-m', 'include': '-i', 'includefile': '-I', 'network': '-N', 'security': '-s'}, 'flag': {'help': '-h', 'version': '-V', 'debug': '-d', 'privileged': '-p', 'shell': '-l'}} # The list to save options self.opts = [] self.flag = [] for k in self.lvsb_option_mapper.keys(): # k may be 'optarg' or 'flag' for key, value in self.lvsb_option_mapper[k].items(): base_name = 'lvsb_%s_options' % key for key_gen, option in params.object_counts('lvsb_opts_count', base_name): # k is 'optarg' if option and value: self.opts.append((value, option)) # k is 'flag' if params.has_key(key_gen) and not option: self.flag.append(value) logging.debug("All of options(%s) and flags(%s)", self.opts, self.flag)
[docs] def init_sandboxes(self): """ Create self.count Sandbox instances """ # self.sandboxes probably empty, can't use for_each() for index in xrange(0, self.count): del index # Keep pylint happy self.sandboxes.append(self.SANDBOX_TYPE(self.params))
[docs] def for_each(self, do_something, *args, **dargs): """ Iterate over all sandboxes, calling do_something on each :param do_sometihng: Called with the item and ``*args``, ``**dargs`` """ # Simplify making the same call to every running sandbox return [do_something(sandbox, *args, **dargs) for sandbox in self.sandboxes]
[docs] def are_running(self): """ Return the number of sandbox processes still running """ running = 0 for is_running in self.for_each(lambda sb: sb.running()): if is_running: running += 1 return running
[docs] def are_failed(self): """ Return the number of sandbox processes with non-zero exit codes """ # Warning, this will block if self.are_running() > 0 failed = 0 for exit_code in self.for_each(lambda sb: sb.exit_code()): if exit_code != 0: failed += 1 return failed