"""
Autotest representation of qemu devices.
These classes implements various features in order to simulate, verify or
interact with qemu qdev structure.
:copyright: 2012-2013 Red Hat Inc.
"""
# Python imports
import logging
import re
# Autotest imports
from utils import DeviceError
from virttest import qemu_monitor
from virttest import utils_misc
import qbuses
import traceback
try:
# pylint: disable=E0611
from collections import OrderedDict
except ImportError:
from virttest.staging.backports.collections import OrderedDict
def _convert_args(arg_dict):
"""
Convert monitor command arguments dict into humanmonitor string.
:param arg_dict: The dict of monitor command arguments.
:return: A string in humanmonitor's 'key=value' format, or a empty
'' when the dict is empty.
"""
return ",".join("%s=%s" % (key, val) for key, val in arg_dict.iteritems())
def _build_cmd(cmd, args=None, q_id=None):
"""
Format QMP command from cmd and args
:param cmd: Command ('device_add', ...)
:param q_id: queue id; True = generate random, None = None, str = use str
"""
obj = {"execute": cmd}
if args is not None:
obj["arguments"] = args
if q_id is True:
obj["id"] = utils_misc.generate_random_string(8)
elif q_id is not None:
obj["id"] = q_id
return obj
#
# Device objects
#
[docs]class QBaseDevice(object):
""" Base class of qemu objects """
def __init__(self, dev_type="QBaseDevice", params=None, aobject=None,
parent_bus=None, child_bus=None):
"""
:param dev_type: type of this component
:param params: component's parameters
:param aobject: Autotest object which is associated with this device
:param parent_bus: list of dicts specifying the parent bus
:param child_bus: list of buses, which this device provides
"""
self.aid = None # unique per VM id
self.type = dev_type # device type
self.aobject = aobject # related autotest object
if parent_bus is None:
parent_bus = tuple()
self.parent_bus = parent_bus # list of buses into which this dev fits
self.child_bus = [] # list of buses which this dev provides
if child_bus is None:
child_bus = []
elif not isinstance(child_bus, (list, tuple)):
self.add_child_bus(child_bus)
else:
for bus in child_bus:
self.add_child_bus(bus)
self.dynamic_params = []
self.params = OrderedDict() # various device params (id, name, ...)
if params:
for key, value in params.iteritems():
self.set_param(key, value)
[docs] def add_child_bus(self, bus):
"""
Add child bus
:param bus: Bus, which this device contains
:type bus: QSparseBus-like
"""
self.child_bus.append(bus)
bus.set_device(self)
[docs] def rm_child_bus(self, bus):
"""
removes child bus
:param bus: Bus, which this device contains
:type bus: QSparseBus-like
"""
self.child_bus.remove(bus)
bus.set_device(None)
[docs] def set_param(self, option, value, option_type=None, dynamic=False):
"""
Set device param using qemu notation ("on", "off" instead of bool...)
:param option: which option's value to set
:param value: new value
:param option_type: type of the option (bool)
:param dynamic: if true value is changed to DYN for not_dynamic compare
"""
if dynamic:
if option not in self.dynamic_params:
self.dynamic_params.append(option)
else:
if option in self.dynamic_params:
self.dynamic_params.remove(option)
if option_type is bool or isinstance(value, bool):
if value in ['yes', 'on', True]:
self.params[option] = "on"
elif value in ['no', 'off', False]:
self.params[option] = "off"
elif value or value == 0:
if value == "EMPTY_STRING":
self.params[option] = '""'
else:
self.params[option] = value
elif value is None and option in self.params:
del(self.params[option])
if option in self.dynamic_params:
self.dynamic_params.remove(option)
[docs] def get_param(self, option, default=None):
""" :return: object param """
return self.params.get(option, default)
def __getitem__(self, option):
""" :return: object param """
return self.params[option]
def __delitem__(self, option):
""" deletes self.params[option] """
del(self.params[option])
def __len__(self):
""" length of self.params """
return len(self.params)
def __setitem__(self, option, value):
""" self.set_param(option, value, None) """
return self.set_param(option, value)
def __contains__(self, option):
""" Is the option set? """
return option in self.params
def __str__(self):
""" :return: Short string representation of this object. """
return self.str_short()
def __eq__(self, dev2, dynamic=True):
""" :return: True when devs are similar, False when different. """
check_attrs = ['cmdline_nd', 'hotplug_hmp_nd', 'hotplug_qmp_nd']
try:
for check_attr in check_attrs:
try:
_ = getattr(self, check_attr)()
except (DeviceError, NotImplementedError, AttributeError):
try:
getattr(dev2, check_attr)()
except (DeviceError, NotImplementedError, AttributeError):
pass
else:
if _ != getattr(dev2, check_attr)():
return False
except Exception:
logging.error(traceback.format_exc())
return False
return True
def __ne__(self, dev2):
""" :return: True when devs are different, False when similar. """
return not self.__eq__(dev2)
[docs] def str_short(self):
""" Short representation (aid, qid, alternative, type) """
if self.get_qid(): # Show aid only when it's based on qid
if self.get_aid():
return self.get_aid()
else:
return "q'%s'" % self.get_qid()
elif self._get_alternative_name():
return "a'%s'" % self._get_alternative_name()
else:
return "t'%s'" % self.type
[docs] def str_long(self):
""" Full representation, multi-line with all params """
out = """%s
aid = %s
aobject = %s
parent_bus = %s
child_bus = %s
params:""" % (self.type, self.aid, self.aobject, self.parent_bus,
self.child_bus)
for key, value in self.params.iteritems():
out += "\n %s = %s" % (key, value)
return out + '\n'
def _get_alternative_name(self):
""" :return: alternative object name """
return None
[docs] def get_qid(self):
""" :return: qemu_id """
return self.params.get('id', '')
[docs] def get_aid(self):
""" :return: per VM unique autotest_id """
return self.aid
[docs] def set_aid(self, aid):
""":param aid: new autotest id for this device"""
self.aid = aid
[docs] def get_children(self):
""" :return: List of all children (recursive) """
children = []
for bus in self.child_bus:
children.extend(bus)
return children
[docs] def cmdline(self):
""" :return: cmdline command to define this device """
raise NotImplementedError
[docs] def cmdline_nd(self):
"""
Command line without dynamic params.
:return: cmdline command to define this device
without dynamic parameters
"""
self.cmdline()
# pylint: disable=E0202
[docs] def hotplug(self, monitor):
""" :return: the output of monitor.cmd() hotplug command """
if isinstance(monitor, qemu_monitor.QMPMonitor):
try:
cmd, args = self.hotplug_qmp()
return monitor.cmd(cmd, args)
except DeviceError: # qmp command not supported
return monitor.human_monitor_cmd(self.hotplug_hmp())
elif isinstance(monitor, qemu_monitor.HumanMonitor):
return monitor.cmd(self.hotplug_hmp())
else:
raise TypeError("Invalid monitor object: %s(%s)" % (monitor,
type(monitor)))
[docs] def hotplug_hmp(self):
""" :return: the hotplug monitor command """
raise DeviceError("Hotplug is not supported by this device %s", self)
[docs] def hotplug_qmp(self):
""" :return: tuple(hotplug qemu command, arguments)"""
raise DeviceError("Hotplug is not supported by this device %s", self)
[docs] def unplug_hook(self):
""" Modification prior to unplug can be made here """
pass
[docs] def unplug_unhook(self):
""" Roll back the modification made before unplug """
pass
[docs] def unplug(self, monitor):
""" :return: the output of monitor.cmd() unplug command """
if isinstance(monitor, qemu_monitor.QMPMonitor):
try:
cmd, args = self.unplug_qmp()
return monitor.cmd(cmd, args)
except DeviceError: # qmp command not supported
return monitor.human_monitor_cmd(self.unplug_hmp())
elif isinstance(monitor, qemu_monitor.HumanMonitor):
return monitor.cmd(self.unplug_hmp())
else:
raise TypeError("Invalid monitor object: %s(%s)" % (monitor,
type(monitor)))
[docs] def unplug_hmp(self):
""" :return: the unplug monitor command """
raise DeviceError("Unplug is not supported by this device %s", self)
[docs] def unplug_qmp(self):
""" :return: tuple(unplug qemu command, arguments)"""
raise DeviceError("Unplug is not supported by this device %s", self)
[docs] def verify_hotplug(self, out, monitor):
"""
:param out: Output of the hotplug command
:param monitor: Monitor used for hotplug
:return: True when successful, False when unsuccessful, string/None
when can't decide.
"""
return out
[docs] def verify_unplug(self, out, monitor): # pylint: disable=W0613,R0201
"""
:param out: Output of the unplug command
:param monitor: Monitor used for unplug
"""
return out
[docs]class QStringDevice(QBaseDevice):
"""
General device which allows to specify methods by fixed or parametrizable
strings in this format:
::
"%(type)s,id=%(id)s,addr=%(addr)s"
``params`` will be used to subst ``%()s``
"""
def __init__(self, dev_type="dummy", params=None, aobject=None,
parent_bus=None, child_bus=None, cmdline="", cmdline_nd=None):
"""
:param dev_type: type of this component
:param params: component's parameters
:param aobject: Autotest object which is associated with this device
:param parent_bus: bus(es), in which this device is plugged in
:param child_bus: bus, which this device provides
:param cmdline: cmdline string
"""
super(QStringDevice, self).__init__(dev_type, params, aobject,
parent_bus, child_bus)
self._cmdline = cmdline
self._cmdline_nd = cmdline_nd
if cmdline_nd is None:
self._cmdline_nd = cmdline
[docs] def cmdline(self):
""" :return: cmdline command to define this device """
try:
if self._cmdline:
return self._cmdline % self.params
except KeyError, details:
raise KeyError("Param %s required for cmdline is not present in %s"
% (details, self.str_long()))
[docs] def cmdline_nd(self):
"""
Command line without dynamic parameters.
:return: cmdline command to define this device without dynamic parameters.
"""
try:
if self._cmdline_nd:
return self._cmdline_nd % self.params
except KeyError, details:
raise KeyError("Param %s required for cmdline is not present in %s"
% (details, self.str_long()))
[docs]class QCustomDevice(QBaseDevice):
"""
Representation of the '-$option $param1=$value1,$param2...' qemu object.
This representation handles only cmdline.
"""
def __init__(self, dev_type, params=None, aobject=None,
parent_bus=None, child_bus=None, backend=None):
"""
:param dev_type: The desired -$option parameter (device, chardev, ..)
"""
super(QCustomDevice, self).__init__(dev_type, params, aobject,
parent_bus, child_bus)
if backend:
self.__backend = backend
else:
self.__backend = None
[docs] def cmdline(self):
""" :return: cmdline command to define this device """
if self.__backend and self.params.get(self.__backend):
out = "-%s %s," % (self.type, self.params.get(self.__backend))
params = self.params.copy()
del params[self.__backend]
else:
out = "-%s " % self.type
params = self.params
for key, value in params.iteritems():
if value != "NO_EQUAL_STRING":
out += "%s=%s," % (key, value)
else:
out += "%s," % key
if out[-1] == ',':
out = out[:-1]
return out
[docs] def cmdline_nd(self):
"""
Command line without dynamic parameters.
:return: cmdline command to define this device without dynamic parameters.
"""
if self.__backend and self.params.get(self.__backend):
out = "-%s %s," % (self.type, self.params.get(self.__backend))
params = self.params.copy()
del params[self.__backend]
else:
out = "-%s " % self.type
params = self.params
for key, value in params.iteritems():
if value != "NO_EQUAL_STRING":
if key in self.dynamic_params:
out += "%s=DYN," % (key,)
else:
out += "%s=%s," % (key, value)
else:
out += "%s," % key
if out[-1] == ',':
out = out[:-1]
return out
[docs]class QDrive(QCustomDevice):
"""
Representation of the '-drive' qemu object without hotplug support.
"""
def __init__(self, aobject, use_device=True):
child_bus = qbuses.QDriveBus('drive_%s' % aobject, aobject)
super(QDrive, self).__init__("drive", {}, aobject, (),
child_bus)
if use_device:
self.params['id'] = 'drive_%s' % aobject
[docs] def set_param(self, option, value, option_type=None):
"""
Set device param using qemu notation ("on", "off" instead of bool...)
It restricts setting of the 'id' param as it's automatically created.
:param option: which option's value to set
:param value: new value
:param option_type: type of the option (bool)
"""
if option == 'id':
raise KeyError("Drive ID is automatically created from aobject. %s"
% self)
elif option == 'bus':
# Workaround inconsistency between -drive and -device
value = re.findall(r'(\d+)', value)
if value is not None:
value = value[0]
super(QDrive, self).set_param(option, value, option_type)
[docs]class QOldDrive(QDrive):
"""
This is a variant for -drive without 'addr' support
"""
[docs] def set_param(self, option, value, option_type=None):
"""
Ignore addr parameters as they are not supported by old qemus
"""
if option == 'addr':
logging.warn("Ignoring 'addr=%s' parameter of %s due of old qemu"
", PCI addresses might be messed up.", value,
self.str_short())
return
return super(QOldDrive, self).set_param(option, value, option_type)
[docs]class QHPDrive(QDrive):
"""
Representation of the '-drive' qemu object with hotplug support.
"""
def __init__(self, aobject):
super(QHPDrive, self).__init__(aobject)
self.__hook_drive_bus = None
[docs] def verify_hotplug(self, out, monitor):
if isinstance(monitor, qemu_monitor.QMPMonitor):
if out.startswith('OK'):
return True
else:
if out == 'OK':
return True
return False
[docs] def verify_unplug(self, out, monitor):
out = monitor.info("qtree", debug=False)
if "unknown command" in out: # Old qemu don't have info qtree
return True
dev_id_name = 'id "%s"' % self.aid
if dev_id_name in out:
return False
else:
return True
[docs] def get_children(self):
""" Device bus should be removed too """
for bus in self.child_bus:
if isinstance(bus, qbuses.QDriveBus):
drive_bus = bus
self.rm_child_bus(bus)
break
devices = super(QHPDrive, self).get_children()
self.add_child_bus(drive_bus)
return devices
[docs] def unplug_hook(self):
"""
Devices from this bus are not removed, only 'drive' is set to None.
"""
for bus in self.child_bus:
if isinstance(bus, qbuses.QDriveBus):
for dev in bus:
self.__hook_drive_bus = dev.get_param('drive')
dev['drive'] = None
break
[docs] def unplug_unhook(self):
""" Set back the previous 'drive' (unsafe, using the last value) """
if self.__hook_drive_bus is not None:
for bus in self.child_bus:
if isinstance(bus, qbuses.QDriveBus):
for dev in bus:
dev['drive'] = self.__hook_drive_bus
break
[docs] def hotplug_hmp(self):
""" :return: the hotplug monitor command """
args = self.params.copy()
pci_addr = args.pop('addr', 'auto')
args = _convert_args(args)
return "drive_add %s %s" % (pci_addr, args)
[docs] def unplug_hmp(self):
""" :return: the unplug monitor command """
if self.get_qid() is None:
raise DeviceError("qid not set; device %s can't be unplugged"
% self)
return "drive_del %s" % self.get_qid()
[docs]class QRHDrive(QDrive):
"""
Representation of the '-drive' qemu object with RedHat hotplug support.
"""
def __init__(self, aobject):
super(QRHDrive, self).__init__(aobject)
self.__hook_drive_bus = None
[docs] def hotplug_hmp(self):
""" :return: the hotplug monitor command """
args = self.params.copy()
args.pop('addr', None) # not supported by RHDrive
args.pop('if', None)
args = _convert_args(args)
return "__com.redhat_drive_add %s" % args
[docs] def hotplug_qmp(self):
""" :return: the hotplug monitor command """
args = self.params.copy()
args.pop('addr', None) # not supported by RHDrive
args.pop('if', None)
return "__com.redhat_drive_add", args
[docs] def get_children(self):
""" Device bus should be removed too """
for bus in self.child_bus:
if isinstance(bus, qbuses.QDriveBus):
drive_bus = bus
self.rm_child_bus(bus)
break
devices = super(QRHDrive, self).get_children()
self.add_child_bus(drive_bus)
return devices
[docs] def unplug_hook(self):
"""
Devices from this bus are not removed, only 'drive' is set to None.
"""
for bus in self.child_bus:
if isinstance(bus, qbuses.QDriveBus):
for dev in bus:
self.__hook_drive_bus = dev.get_param('drive')
dev['drive'] = None
break
[docs] def unplug_unhook(self):
""" Set back the previous 'drive' (unsafe, using the last value) """
if self.__hook_drive_bus is not None:
for bus in self.child_bus:
if isinstance(bus, qbuses.QDriveBus):
for dev in bus:
dev['drive'] = self.__hook_drive_bus
break
[docs] def unplug_hmp(self):
""" :return: the unplug monitor command """
if self.get_qid() is None:
raise DeviceError("qid not set; device %s can't be unplugged"
% self)
return "__com.redhat_drive_del %s" % self.get_qid()
[docs] def unplug_qmp(self):
""" :return: the unplug monitor command """
if self.get_qid() is None:
raise DeviceError("qid not set; device %s can't be unplugged"
% self)
return "__com.redhat_drive_del", {'id': self.get_qid()}
[docs]class QDevice(QCustomDevice):
"""
Representation of the '-device' qemu object. It supports all methods.
:note: Use driver format in full form - 'driver' = '...' (usb-ehci, ide-hd)
"""
def __init__(self, driver=None, params=None, aobject=None,
parent_bus=None, child_bus=None):
super(QDevice, self).__init__("device", params, aobject, parent_bus,
child_bus, 'driver')
if driver:
self.set_param('driver', driver)
self.hook_drive_bus = None
def _get_alternative_name(self):
""" :return: alternative object name """
if self.params.get('driver'):
return self.params.get('driver')
[docs] def hotplug_hmp(self):
""" :return: the hotplug monitor command """
if self.params.get('driver'):
params = self.params.copy()
out = "device_add %s" % params.pop('driver')
params = _convert_args(params)
if params:
out += ",%s" % params
else:
out = "device_add %s" % _convert_args(self.params)
return out
[docs] def hotplug_qmp(self):
""" :return: the hotplug monitor command """
return "device_add", self.params
[docs] def hotplug_hmp_nd(self):
""" :return: the hotplug monitor command without dynamic parameters"""
if self.params.get('driver'):
params = self.params.copy()
out = "device_add %s" % params.pop('driver')
for key in self.dynamic_params:
params[key] = "DYN"
params = _convert_args(params)
if params:
out += ",%s" % params
else:
params = self.params.copy()
for key in self.dynamic_params:
params[key] = "DYN"
out = "device_add %s" % _convert_args(params)
return out
[docs] def hotplug_qmp_nd(self):
""" :return: the hotplug monitor command without dynamic parameters"""
params = self.params.copy()
for key in self.dynamic_params:
params[key] = "DYN"
return "device_add", params
[docs] def get_children(self):
""" Device bus should be removed too """
devices = super(QDevice, self).get_children()
if self.hook_drive_bus:
devices.append(self.hook_drive_bus)
return devices
[docs] def unplug_hmp(self):
""" :return: the unplug monitor command """
if self.get_qid():
return "device_del %s" % self.get_qid()
else:
raise DeviceError("Device has no qemu_id.")
[docs] def unplug_qmp(self):
""" :return: the unplug monitor command """
if self.get_qid():
return "device_del", {'id': self.get_qid()}
else:
raise DeviceError("Device has no qemu_id.")
[docs] def verify_unplug(self, out, monitor):
out = monitor.info("qtree", debug=False)
if "unknown command" in out: # Old qemu don't have info qtree
return out
dev_id_name = 'id "%s"' % self.get_qid()
if dev_id_name in out:
return False
else:
return True
# pylint: disable=E0202
[docs] def verify_hotplug(self, out, monitor):
out = monitor.info("qtree", debug=False)
if "unknown command" in out: # Old qemu don't have info qtree
return out
dev_id_name = 'id "%s"' % self.get_qid()
if dev_id_name in out:
return True
else:
return False
[docs]class QGlobal(QBaseDevice):
"""
Representation of qemu global setting (-global driver.property=value)
"""
def __init__(self, driver, prop, value, aobject=None,
parent_bus=None, child_bus=None):
"""
:param driver: Which global driver to set
:param prop: Which property to set
:param value: What's the desired value
:param params: component's parameters
:param aobject: Autotest object which is associated with this device
:param parent_bus: bus(es), in which this device is plugged in
:param child_bus: bus, which this device provides
"""
params = {'driver': driver, 'property': prop, 'value': value}
super(QGlobal, self).__init__('global', params, aobject,
parent_bus, child_bus)
[docs] def cmdline(self):
return "-global %s.%s=%s" % (self['driver'], self['property'],
self['value'])
[docs]class QFloppy(QGlobal):
"""
Imitation of qemu floppy disk defined by -global isa-fdc.drive?=$drive
"""
def __init__(self, unit=None, drive=None, aobject=None, parent_bus=None,
child_bus=None):
"""
:param unit: Floppy unit (None, 0, 1 or driveA, driveB)
:param drive: id of drive
:param aobject: Autotest object which is associated with this device
:param parent_bus: bus(es), in which this device is plugged in
:param child_bus: bus(es), which this device provides
"""
super(QFloppy, self).__init__('isa-fdc', unit, drive, aobject,
parent_bus, child_bus)
def _get_alternative_name(self):
return "floppy-%s" % (self.get_param('property'))
[docs] def set_param(self, option, value, option_type=None):
"""
drive and unit params have to be 'translated' as value and property.
"""
if option == 'drive':
option = 'value'
elif option == 'unit':
option = 'property'
super(QFloppy, self).set_param(option, value, option_type)