"""
Autotest qdev-structure representation.
This is the main class which represents the qdev structure. It allows one to
create, interact with and verify the qemu qdev structure.
:copyright: 2012-2013 Red Hat Inc.
"""
# Python imports
import logging
import re
# Autotest imports
from autotest.client.shared import utils, error
from virttest import arch, storage, data_dir, virt_vm
from utils import (DeviceError, DeviceHotplugError, DeviceInsertError,
DeviceRemoveError, DeviceUnplugError, none_or_int)
import os
import qbuses
import qdevices
import shutil
#
# Device container (device representation of VM)
# This class represents VM by storing all devices and their connections (buses)
#
[docs]class DevContainer(object):
"""
Device container class
"""
# General methods
def __init__(self, qemu_binary, vmname, strict_mode="no",
             workaround_qemu_qmp_crash="no", allow_hotplugged_vm="yes"):
    """
    Probe the qemu binary for its capabilities and create an empty
    device representation.

    :param qemu_binary: qemu binary
    :param vmname: name of the related VM
    :param strict_mode: Use strict mode (set optional params); only the
                        string "yes" enables it
    :param workaround_qemu_qmp_crash: "always" skips the immediate QMP
                        query and uses only the delayed one
    :param allow_hotplugged_vm: "yes" lets __eq__ tolerate a difference
                        between a clean and a verified-hotplug state
    """
    def get_hmp_cmds(qemu_binary):
        """ :return: list of human monitor commands """
        output = utils.system_output("echo -e 'help\nquit' | %s -monitor "
                                     "stdio -vnc none" % qemu_binary,
                                     timeout=10, ignore_status=True)
        hmp_cmds = []
        for cmd in re.findall(r'^([^\| \[\n]+\|?\w+)', output, re.M):
            if '|' in cmd:
                # "name|alias" entries provide two usable command names
                hmp_cmds.extend(cmd.split('|'))
            elif cmd != 'The':
                # 'The' is explicitly excluded (presumably the first word
                # of a prose line in the help output - TODO confirm)
                hmp_cmds.append(cmd)
        return hmp_cmds

    def get_qmp_cmds(qemu_binary, workaround_qemu_qmp_crash=False):
        """ :return: list of qmp commands, None when none were found """
        def query(feed_delay):
            """ Run the QMP query; feed_delay is spliced into the pipe """
            cmd = ('echo -e \''
                   '{ "execute": "qmp_capabilities" }\n'
                   '{ "execute": "query-commands", "id": "RAND91" }\n'
                   '{ "execute": "quit" }\'' + feed_delay +
                   '| %s -qmp stdio -vnc none | grep return |'
                   ' grep RAND91')
            return utils.system_output(cmd % qemu_binary, timeout=10,
                                       ignore_status=True).splitlines()

        cmds = None
        if not workaround_qemu_qmp_crash:
            cmds = query('')
        if not cmds:
            # Some qemu versions crashes when qmp used too early; add sleep
            cmds = query(' | (sleep 1; cat )')
        if cmds:
            cmds = re.findall(r'{\s*"name"\s*:\s*"([^"]+)"\s*}', cmds[0])
        if cmds:    # If no matches, return None
            return cmds
        return None

    self.__state = -1    # -1 synchronized, 0 synchronized after hotplug
    self.__qemu_help = utils.system_output("%s -help" % qemu_binary,
                                           timeout=10, ignore_status=True)
    # escape the '?' otherwise it will fail if we have a single-char
    # filename in cwd (the shell would expand it as a glob)
    self.__device_help = utils.system_output("%s -device \\? 2>&1"
                                             % qemu_binary, timeout=10,
                                             ignore_status=True)
    # the same glob expansion applies to "-M ?", escape it as well
    self.__machine_types = utils.system_output("%s -M \\?" % qemu_binary,
                                               timeout=10,
                                               ignore_status=True)
    self.__hmp_cmds = get_hmp_cmds(qemu_binary)
    self.__qmp_cmds = get_qmp_cmds(qemu_binary,
                                   workaround_qemu_qmp_crash == 'always')
    self.vmname = vmname
    self.strict_mode = strict_mode == 'yes'
    self.__devices = []
    self.__buses = []
    self.__qemu_binary = qemu_binary
    self.__execute_qemu_last = None
    self.__execute_qemu_out = ""
    self.allow_hotplugged_vm = allow_hotplugged_vm == 'yes'
def __getitem__(self, item):
    """
    Look a device up by object identity or by its autotest id.

    :param item: autotest id or QObject-like object
    :return: First matching object defined in this QDevContainer
    :raise KeyError: In case no match was found
    """
    match = None
    if isinstance(item, qdevices.QBaseDevice):
        # device objects are matched against the stored devices directly
        if item in self.__devices:
            match = item
    elif item:
        # anything else truthy is treated as an autotest id
        match = next((dev for dev in self.__devices
                      if dev.get_aid() == item), None)
    if match is None:
        raise KeyError("Device %s is not in %s" % (item, self))
    return match
def get(self, item):
    """
    Non-raising variant of the item lookup.

    :param item: autotest id or QObject-like object
    :return: First matching object defined in this QDevContainer or None
    """
    try:
        return self[item]
    except KeyError:
        return None
[docs] def get_by_properties(self, filt):
"""
Return list of matching devices
:param filt: filter {'property': 'value', ...}
:type filt: dict
"""
out = []
for device in self.__devices:
for key, value in filt.iteritems():
if not hasattr(device, key):
break
if getattr(device, key) != value:
break
else:
out.append(device)
return out
[docs] def get_by_params(self, filt):
"""
Return list of matching devices
:param filt: filter {'param': 'value', ...}
:type filt: dict
"""
out = []
for device in self.__devices:
for key, value in filt.iteritems():
if key not in device.params:
break
if device.params[key] != value:
break
else:
out.append(device)
return out
def __delitem__(self, item):
    """
    Delete specified item (and, recursively, its children) from the
    devices list

    :param item: autotest id or QObject-like object
    :raise KeyError: In case no match was found
    """
    # remove() also drops the child buses including their devices
    failure = self.remove(item)
    if failure:
        raise KeyError(item)
def remove(self, device, recursive=True):
    """
    Remove device from this representation

    :param device: autotest id or QObject-like object
    :param recursive: remove children recursively
    :return: None
    :raise KeyError: when the device is not present (raised by the lookup)
    :raise DeviceRemoveError: when recursive is false and a child bus of
                              the device still contains devices
    """
    device = self[device]
    if recursive:
        # Recursively remove all devices
        for child in device.get_children():
            # One child might be already removed from other child's bus
            if child in self:
                self.remove(child, True)
    else:
        # Check if there are no children
        for child_bus in device.child_bus:
            if len(child_bus) != 0:
                raise DeviceRemoveError(device, "Child bus contains "
                                        "devices", self)
    if device not in self.__devices:
        # It might be removed from child bus
        return
    for parent in self.__buses:
        # Remove from parent_buses
        parent.remove(device)
    for child_bus in device.child_bus:
        # Remove child buses from vm buses
        self.__buses.remove(child_bus)
    # Remove from list of devices
    self.__devices.remove(device)
[docs] def wash_the_device_out(self, device):
"""
Removes any traces of the device from representation.
:param device: qdevices.QBaseDevice device
"""
# remove device from parent buses
for bus in self.__buses:
if device in bus:
bus.remove(device)
# remove child devices
for bus in device.child_bus:
for dev in device.get_children():
if dev in self:
self.remove(dev, True)
# remove child_buses from self.__buses
if bus in self.__buses:
self.__buses.remove(bus)
# remove device from self.__devices
if device in self.__devices:
self.__devices.remove(device)
def __len__(self):
""" :return: Number of inserted devices """
return len(self.__devices)
def __contains__(self, item):
    """
    Is specified item defined in current devices list?

    :param item: autotest id or QObject-like object
    :return: True - yes, False - no
    """
    if isinstance(item, qdevices.QBaseDevice):
        # device objects are looked up directly
        return item in self.__devices
    if not item:
        # empty ids never match
        return False
    return any(device.get_aid() == item for device in self.__devices)
def __iter__(self):
""" Iterate over all defined devices. """
return self.__devices.__iter__()
def __eq__(self, qdev2):
""" Are the VM representation alike? """
if len(qdev2) != len(self):
return False
if qdev2.get_state() != self.get_state():
if qdev2.allow_hotplugged_vm:
if qdev2.get_state() > 0 or self.get_state() > 0:
return False
else:
return False
for dev in self:
if dev not in qdev2:
return False
# state, buses and devices are handled earlier
qdev2 = qdev2.__dict__
for key, value in self.__dict__.iteritems():
if key in ("_DevContainer__devices", "_DevContainer__buses",
"_DevContainer__state",
"allow_hotplugged_vm"):
continue
if key not in qdev2 or qdev2[key] != value:
return False
return True
def __ne__(self, qdev2):
""" Are the VM representation different? """
return not self.__eq__(qdev2)
[docs] def set_dirty(self):
""" Increase VM dirtiness (not synchronized with VM) """
if self.__state >= 0:
self.__state += 1
else:
self.__state = 1
[docs] def set_clean(self):
""" Decrease VM dirtiness (synchronized with VM) """
if self.__state > 0:
self.__state -= 1
else:
raise DeviceError("Trying to clean clear VM (probably calling "
"hotplug_clean() twice).\n%s" % self.str_long())
[docs] def reset_state(self):
"""
Mark representation as completely clean, without hotplugged devices.
"""
self.__state = -1
[docs] def get_state(self):
""" Get the current state (0 = synchronized with VM) """
return self.__state
[docs] def get_by_qid(self, qid):
"""
:param qid: qemu id
:return: List of items with matching qemu id
"""
ret = []
if qid:
for device in self:
if device.get_qid() == qid:
ret.append(device)
return ret
[docs] def str_short(self):
""" Short string representation of all devices """
out = "Devices of %s" % self.vmname
dirty = self.get_state()
if dirty == -1:
pass
elif dirty == 0:
out += "(H)"
else:
out += "(DIRTY%s)" % dirty
out += ": ["
for device in self:
out += "%s," % device
if out[-1] == ',':
out = out[:-1]
return out + "]"
[docs] def str_long(self):
""" Long string representation of all devices """
out = "Devices of %s" % self.vmname
dirty = self.get_state()
if dirty == -1:
pass
elif dirty == 0:
out += "(H)"
else:
out += "(DIRTY%s)" % dirty
out += ":\n"
for device in self:
out += device.str_long()
if out[-1] == '\n':
out = out[:-1]
return out
[docs] def str_bus_short(self):
""" Short representation of all buses """
out = "Buses of %s\n " % self.vmname
for bus in self.__buses:
out += str(bus)
out += "\n "
return out[:-3]
[docs] def str_bus_long(self):
""" Long representation of all buses """
out = "Devices of %s:\n " % self.vmname
for bus in self.__buses:
out += bus.str_long().replace('\n', '\n ')
return out[:-3]
def __create_unique_aid(self, qid):
    """
    Derive an autotest id which is not yet used in this container

    :param qid: Original qemu id
    :return: aid; the qid itself when it is set and free, otherwise the
             first free "$qid__%d" (counting from 0)
    """
    if qid and qid not in self:
        return qid
    index = 0
    candidate = "%s__%d" % (qid, index)
    while candidate in self:
        index += 1
        candidate = "%s__%d" % (qid, index)
    return candidate
[docs] def has_option(self, option):
"""
:param option: Desired option
:return: Is the desired option supported by current qemu?
"""
return bool(re.search(r"^-%s(\s|$)" % option, self.__qemu_help,
re.MULTILINE))
[docs] def has_device(self, device):
"""
:param device: Desired device
:return: Is the desired device supported by current qemu?
"""
return bool(re.search(r'name "%s"|alias "%s"' % (device, device),
self.__device_help))
[docs] def get_help_text(self):
"""
:return: Full output of "qemu -help"
"""
return self.__qemu_help
[docs] def has_hmp_cmd(self, cmd):
"""
:param cmd: Desired command
:return: Is the desired command supported by this qemu's human monitor?
"""
return cmd in self.__hmp_cmds
[docs] def has_qmp_cmd(self, cmd):
"""
:param cmd: Desired command
:return: Is the desired command supported by this qemu's QMP monitor?
"""
return cmd in self.__qmp_cmds
[docs] def execute_qemu(self, options, timeout=5):
"""
Execute this qemu and return the stdout+stderr output.
:param options: additional qemu options
:type options: string
:param timeout: execution timeout
:type timeout: int
:return: Output of the qemu
:rtype: string
"""
if self.__execute_qemu_last != options:
cmd = "%s %s 2>&1" % (self.__qemu_binary, options)
self.__execute_qemu_out = str(utils.run(cmd, timeout=timeout,
ignore_status=True,
verbose=False).stdout)
return self.__execute_qemu_out
[docs] def get_buses(self, bus_spec, type_test=False):
"""
:param bus_spec: Bus specification (dictionary)
:type bus_spec: dict
:param atype: Match qemu and atype params
:type atype: bool
:return: All matching buses
:rtype: List of QSparseBus
"""
buses = []
for bus in self.__buses:
if bus.match_bus(bus_spec, type_test):
buses.append(bus)
return buses
[docs] def get_first_free_bus(self, bus_spec, addr):
"""
:param bus_spec: Bus specification (dictionary)
:param addr: Desired address
:return: First matching bus with free desired address (the latest
added matching bus)
"""
buses = self.get_buses(bus_spec)
for bus in buses:
_ = bus.get_free_slot(addr)
if _ is not None and _ is not False:
return bus
[docs] def insert(self, devices, strict_mode=None):
"""
Inserts devices into this VM representation
:param devices: List of qdevices.QBaseDevice devices
:raise DeviceError: On failure. The representation remains unchanged.
"""
def cleanup():
""" Remove all added devices (on failure) """
for device in added:
self.wash_the_device_out(device)
if not isinstance(devices, list):
devices = [devices]
added = []
for device in devices:
try:
added.extend(self._insert(device, strict_mode))
except DeviceError, details:
cleanup()
raise DeviceError("%s\nError occurred while inserting device %s"
" (%s). Please check the log for details."
% (details, device, devices))
return added
def _insert(self, device, strict_mode=None):
    """
    Inserts device into this VM representation

    :param device: qdevices.QBaseDevice device
    :param strict_mode: True forces strict mode, None uses the container's
                        default (self.strict_mode), anything else
                        disables it
    :return: List of added devices (devices required by the buses
             followed by the device itself)
    :raise DeviceError: On failure. The representation remains unchanged.

    1) get list of matching parent buses
    2) try to find matching bus+address
    2b) add bus required additional devices prior to adding this device
    3) add child buses
    4) append into self.devices
    """
    def clean(device, added_devices):
        """ Remove all inserted devices (on failure) """
        self.wash_the_device_out(device)
        for added in added_devices:
            self.wash_the_device_out(added)

    # None means "use the container default". The previous if/if/else
    # chain let the final else overwrite that default with False.
    if strict_mode is None:
        _strict_mode = self.strict_mode
    else:
        _strict_mode = strict_mode is True
    added_devices = []
    if device.parent_bus is not None and not isinstance(device.parent_bus,
                                                        (list, tuple)):
        # it have to be list of parent buses
        device.parent_bus = (device.parent_bus,)
    # a device without any parent bus specification has nothing to plug
    for parent_bus in device.parent_bus or ():
        # type, aobject, busid
        if parent_bus is None:
            continue
        # 1
        buses = self.get_buses(parent_bus, True)
        if not buses:
            err = "ParentBus(%s): No matching bus\n" % parent_bus
            clean(device, added_devices)
            raise DeviceInsertError(device, err, self)
        inserted = False
        strict_mode = _strict_mode
        for bus in buses:   # 2
            if not bus.match_bus(parent_bus, False):
                # First available bus in qemu is not of the same type as
                # we in autotest require. Force strict mode to get this
                # device into the correct bus (ide-hd could go into ahci
                # and ide hba, qemu doesn't care, autotest does).
                strict_mode = True
                continue    # Don't use this bus
            bus_return = bus.insert(device, strict_mode)
            if not isinstance(bus_return, list):
                continue    # this bus refused the device, try the next
            # we are done
            # The bus might require additional devices plugged first
            try:
                added_devices.extend(self.insert(bus_return))
            except DeviceError as details:
                err = ("Can't insert device %s because additional "
                       "device required by bus %s failed to be "
                       "inserted with:\n%s" % (device, bus, details))
                clean(device, added_devices)
                raise DeviceError(err)
            inserted = True
            break
        if not inserted:
            err = "ParentBus(%s): No free matching bus\n" % parent_bus
            clean(device, added_devices)
            raise DeviceInsertError(device, err, self)
    # 3
    for bus in device.child_bus:
        self.__buses.insert(0, bus)
    # 4
    if device.get_qid() and self.get_by_qid(device.get_qid()):
        err = "Devices qid %s already used in VM\n" % device.get_qid()
        clean(device, added_devices)
        raise DeviceInsertError(device, err, self)
    device.set_aid(self.__create_unique_aid(device.get_qid()))
    self.__devices.append(device)
    added_devices.append(device)
    return added_devices
[docs] def simple_hotplug(self, device, monitor):
"""
Function hotplug device to devices representation. If verification is
supported by hodplugged device and result of verification is True
then it calls set_clean. Otherwise it don't call set_clean because
devices representatio don't know if device is added correctly.
:param device: Device which should be unplugged.
:type device: string, qdevices.QDevice.
:param monitor: Monitor from vm.
:type monitor: qemu_monitor.Monitor
:return: tuple(monitor.cmd(), verify_hotplug output)
"""
self.set_dirty()
out = device.hotplug(monitor)
ver_out = device.verify_hotplug(out, monitor)
if ver_out is False:
self.set_clean()
return out, ver_out
qdev_out = None
try:
qdev_out = self.insert(device)
if not isinstance(qdev_out, list) or len(qdev_out) != 1:
raise NotImplementedError("This device %s require to hotplug "
"multiple devices %s, which is not "
"supported." % (device, out))
if ver_out is True:
self.set_clean()
except DeviceError, exc:
self.set_clean() # qdev remains consistent
raise DeviceHotplugError(device, 'According to qemu_device: %s'
% exc, self)
out = device.hotplug(monitor)
return out, ver_out
def simple_unplug(self, device, monitor):
    """
    Unplug the device in qemu and remove it (including its children)
    from the devices representation. set_clean is called when the unplug
    failed verification (nothing changed) or was verified as successful;
    otherwise the representation stays dirty.

    :param device: Device which should be unplugged.
    :type device: string, qdevices.QDevice.
    :param monitor: Monitor from vm.
    :type monitor: qemu_monitor.Monitor
    :return: tuple(monitor.cmd(), verify_unplug output)
    :raise KeyError: when the device is not in this representation
    :raise DeviceUnplugError: when the removal from the representation
                              fails
    """
    device = self[device]
    self.set_dirty()
    # Remove all devices, which are removed together with this dev
    monitor_out = device.unplug(monitor)
    verified = device.verify_unplug(monitor_out, monitor)
    result = (monitor_out, verified)
    if verified is False:
        self.set_clean()
        return result
    try:
        device.unplug_hook()
        self.remove(device, True)
        if verified is True:
            self.set_clean()
        elif monitor_out is False:
            raise DeviceUnplugError(device, "Device wasn't unplugged in "
                                    "qemu, but it was unplugged in device "
                                    "representation.", self)
    except (DeviceError, KeyError) as exc:
        # revert whatever unplug_hook() did before reporting the failure
        device.unplug_unhook()
        raise DeviceUnplugError(device, exc, self)
    return result
[docs] def hotplug_verified(self):
"""
This function should be used after you verify, that hotplug was
successful. For each hotplug call, hotplug_verified have to be
executed in order to mark VM as clear.
:warning: If you can't verify, that hotplug was successful, don't
use this function! You could screw-up following tests.
"""
self.set_clean()
[docs] def list_missing_named_buses(self, bus_pattern, bus_type, bus_count):
"""
:param bus_pattern: Bus name pattern with 1x%s for idx or %s is
appended in the end. ('mybuses' or 'my%sbus').
:param bus_type: Type of the bus.
:param bus_count: Desired number of buses.
:return: List of buses, which are missing in range(bus_count)
"""
if "%s" not in bus_pattern:
bus_pattern = bus_pattern + "%s"
missing_buses = [bus_pattern % i for i in xrange(bus_count)]
for bus in self.__buses:
if bus.type == bus_type and re.match(bus_pattern % '\d+',
bus.busid):
if bus.busid in missing_buses:
missing_buses.remove(bus.busid)
return missing_buses
[docs] def idx_of_next_named_bus(self, bus_pattern):
"""
:param bus_pattern: Bus name prefix without %s and tailing digit
:return: Name of the next bus (integer is appended and incremented
until there is no existing bus).
"""
if "%s" not in bus_pattern:
bus_pattern = bus_pattern + "%s"
buses = []
for bus in self.__buses:
if bus.busid and re.match(bus_pattern % '\d+', bus.busid):
buses.append(bus.busid)
i = 0
while True:
if bus_pattern % i not in buses:
return i
i += 1
[docs] def cmdline(self, dynamic=True):
"""
Creates cmdline arguments for creating all defined devices
:return: cmdline of all devices (without qemu-cmd itself)
"""
out = ""
for device in self.__devices:
if dynamic:
_out = device.cmdline()
else:
_out = device.cmdline_nd()
if _out:
out += " %s" % _out
if out:
return out[1:]
[docs] def hook_fill_scsi_hbas(self, params):
"""
This hook creates dummy scsi hba per 7 -drive 'scsi' devices.
"""
i = 6 # We are going to divide it by 7 so 6 will result in 0
for image_name in params.objects("images"):
image_params = params.object_params(image_name)
_is_oldscsi = (image_params.get('drive_format') == 'scsi')
_scsi_without_device = (not self.has_option('device') and
params.object_params(image_name)
.get('drive_format', 'virtio_blk')
.startswith('scsi'))
if _is_oldscsi or _scsi_without_device:
i += 1
for image_name in params.objects("cdroms"):
_is_oldscsi = (params.object_params(image_name).get('cd_format') ==
'scsi')
_scsi_without_device = (not self.has_option('device') and
params.object_params(image_name)
.get('cd_format', 'virtio_blk')
.startswith('scsi'))
if _is_oldscsi or _scsi_without_device:
i += 1
for i in xrange(i / 7): # Autocreated lsi hba
if arch.ARCH in ('ppc64', 'ppc64le'):
_name = 'spapr-vscsi%s' % i
bus = qbuses.QSCSIBus("scsi.0", 'SCSI', [8, 16384],
atype='spapr-vscsi')
self.insert(qdevices.QStringDevice(_name,
child_bus=bus))
else:
_name = 'lsi53c895a%s' % i
bus = qbuses.QSCSIBus("scsi.0", 'SCSI', [8, 16384], atype='lsi53c895a')
self.insert(qdevices.QStringDevice(_name,
parent_bus={'aobject':
params.get('pci_bus',
'pci.0')},
child_bus=bus))
# Machine related methods
def machine_by_params(self, params=None):
    """
    Choose the used machine and set the default devices accordingly

    :param params: VM params; although it defaults to None a params
                   object is required (it is queried unconditionally)
    :return: List of added devices (including default buses)
    :raise error.TestNAError: when the requested machine type is not
                   listed by this qemu and "invalid_machine_type" is not
                   set to "yes"
    """
    def pci_root_buses(busid, bus_type):
        """ Root PCI(e) bus plus the chassis/chassis_nr helper buses """
        return (qbuses.QPCIBus(busid, bus_type, 'pci.0'),
                qbuses.QStrictCustomBus(None, [['chassis'], [256]],
                                        '_PCI_CHASSIS', first_port=[1]),
                qbuses.QStrictCustomBus(None, [['chassis_nr'], [256]],
                                        '_PCI_CHASSIS_NR',
                                        first_port=[1]))

    def floppy_controller():
        """ Floppy controller with the bus flavour this qemu supports """
        if self.has_option('device') and self.has_option("global"):
            floppy_bus = qbuses.QFloppyBus('floppy')
        else:
            floppy_bus = qbuses.QOldFloppyBus('floppy')
        return qdevices.QStringDevice('fdc', child_bus=floppy_bus)

    def machine_q35(cmd=False):
        """
        Q35 + ICH9

        :param cmd: If set uses "-M $cmd" to force this machine type
        :return: List of added devices (including default buses)
        """
        logging.warning('Using Q35 machine which is not yet fullytested '
                        'on virt-test. False errors might occur.')
        devices = []
        devices.append(qdevices.QStringDevice(
            'machine', cmdline=cmd,
            child_bus=pci_root_buses('pcie.0', 'PCIE'),
            aobject="pci.0"))
        devices.append(qdevices.QStringDevice(
            'mch', {'addr': 0, 'driver': 'mch'},
            parent_bus={'aobject': 'pci.0'}))
        devices.append(qdevices.QStringDevice(
            'ICH9 LPC', {'addr': '1f.0', 'driver': 'ICH9 LPC'},
            parent_bus={'aobject': 'pci.0'}))
        devices.append(qdevices.QStringDevice(
            'ICH9 SMB', {'addr': '1f.3', 'driver': 'ICH9 SMB'},
            parent_bus={'aobject': 'pci.0'}))
        devices.append(qdevices.QStringDevice(
            'ICH9-ahci', {'addr': '1f.2', 'driver': 'ich9-ahci'},
            parent_bus={'aobject': 'pci.0'},
            child_bus=qbuses.QAHCIBus('ide')))
        devices.append(floppy_controller())
        return devices

    def machine_i440FX(cmd=False):
        """
        i440FX + PIIX

        :param cmd: If set uses "-M $cmd" to force this machine type
        :return: List of added devices (including default buses)
        """
        devices = []
        devices.append(qdevices.QStringDevice(
            'machine', cmdline=cmd,
            child_bus=pci_root_buses('pci.0', 'PCI'),
            aobject="pci.0"))
        devices.append(qdevices.QStringDevice(
            'i440FX', {'addr': 0, 'driver': 'i440FX'},
            parent_bus={'aobject': 'pci.0'}))
        devices.append(qdevices.QStringDevice(
            'PIIX4_PM', {'addr': '01.3', 'driver': 'PIIX4_PM'},
            parent_bus={'aobject': 'pci.0'}))
        devices.append(qdevices.QStringDevice(
            'PIIX3', {'addr': 1, 'driver': 'PIIX3'},
            parent_bus={'aobject': 'pci.0'}))
        devices.append(qdevices.QStringDevice(
            'piix3-ide', {'addr': '01.1', 'driver': 'piix3-ide'},
            parent_bus={'aobject': 'pci.0'},
            child_bus=qbuses.QIDEBus('ide')))
        devices.append(floppy_controller())
        return devices

    def machine_aarch64(cmd=False):
        """
        aarch64 (arm64) doesn't support PCI bus, only MMIO transports.
        Also it requires pflash for EFI boot.

        :param cmd: If set uses "-M $cmd" to force this machine type
        :return: List of added devices (including default buses)
        """
        def get_aavmf_vars(params):
            """
            Naive implementation of obtaining the main (first) image name
            """
            try:
                first_image = params.objects('images')[0]
                name = params.object_params(first_image).get('image_name')
                return os.path.join(data_dir.DATA_DIR,
                                    name + "_AAVMF_VARS.fd")
            except IndexError:
                raise DeviceError("Unable to map main image name to "
                                  "AAVMF variables file.")

        logging.warning('Support for aarch64 is highly experimental!')
        devices = []
        devices.append(qdevices.QStringDevice('machine', cmdline=cmd))
        # EFI pflash
        aavmf_code = ("-drive file=/usr/share/AAVMF/AAVMF_CODE.fd,"
                      "if=pflash,format=raw,unit=0,readonly=on")
        devices.append(qdevices.QStringDevice('AAVMF_CODE',
                                              cmdline=aavmf_code))
        aavmf_vars = get_aavmf_vars(params)
        if not os.path.exists(aavmf_vars):
            # the message contains a placeholder, supply the file name
            logging.warning("AAVMF variables file '%s' doesn't exist, "
                            "recreating it from the template (this should "
                            "only happen when you install the machine as "
                            "there is no default boot in EFI!)", aavmf_vars)
            shutil.copy2('/usr/share/AAVMF/AAVMF_VARS.fd', aavmf_vars)
        aavmf_vars = ("-drive file=%s,if=pflash,format=raw,unit=1"
                      % aavmf_vars)
        devices.append(qdevices.QStringDevice('AAVMF_VARS',
                                              cmdline=aavmf_vars))
        # Add virtio-bus
        # TODO: Currently this uses QNoAddrCustomBus and does not
        # set the device's properties. This means that the qemu qtree
        # and autotest's representations are completely different and
        # can't be used.
        bus = qbuses.QNoAddrCustomBus('bus', [['addr'], [32]],
                                      'virtio-mmio-bus', 'virtio-bus',
                                      'virtio-mmio-bus')
        # NOTE(review): this is a second 'machine' device carrying the
        # same cmdline as the first one above - confirm it is intended
        devices.append(qdevices.QStringDevice('machine', cmdline=cmd,
                                              child_bus=bus,
                                              aobject="virtio-mmio-bus"))
        return devices

    def machine_other(cmd=False):
        """
        isapc or unknown machine type. This type doesn't add any default
        buses or devices, only sets the cmdline.

        :param cmd: If set uses "-M $cmd" to force this machine type
        :return: List of added devices (including default buses)
        """
        logging.warning('Machine type isa/unknown is not supported by '
                        'virt-test. False errors might occur')
        devices = []
        devices.append(qdevices.QStringDevice('machine', cmdline=cmd))
        return devices

    machine_type = params.get('machine_type')
    machine_type_extra_params = params.get('machine_type_extra_params')
    # The first line of the machine listing is skipped (it is not a
    # machine entry); blank lines carry no machine name either.
    machine_lines = [line for line in
                     self.__machine_types.splitlines()[1:]
                     if line.strip()]
    if machine_type:
        m_types = [line.split()[0] for line in machine_lines]
        if machine_type in m_types:
            if (self.has_option('M') or self.has_option('machine')):
                cmd = "-machine %s" % machine_type
                if machine_type_extra_params:
                    cmd += ",%s" % machine_type_extra_params.strip(',')
            else:
                cmd = ""
            if 'q35' in machine_type:   # Q35 + ICH9
                devices = machine_q35(cmd)
            elif arch.ARCH == 'aarch64':
                devices = machine_aarch64(cmd)
            elif 'isapc' not in machine_type:   # i440FX
                devices = machine_i440FX(cmd)
            else:   # isapc (or other)
                devices = machine_other(cmd)
        elif params.get("invalid_machine_type", "no") == "yes":
            # For negative testing pretend the unsupported machine is
            # similar to i440fx one (1 PCI bus, ..)
            devices = machine_i440FX("-M %s" % machine_type)
        else:
            raise error.TestNAError("Unsupported machine type %s." %
                                    (machine_type))
    else:
        devices = None
        for line in machine_lines:
            if 'default' not in line:
                continue
            # machine_type is empty in this branch, so the kind of the
            # default machine has to be read from the listing line
            if 'q35' in line:   # Q35 + ICH9
                devices = machine_q35(False)
            elif 'isapc' not in line:   # i440FX
                devices = machine_i440FX(False)
            else:   # isapc (or other)
                logging.warning('Machine isa/unknown is not supported by '
                                'virt-test. False errors might occur')
                devices = machine_other(False)
        if not devices:
            logging.warning('Unable to find the default machine type, '
                            'using i440FX')
            devices = machine_i440FX(False)
    # reserve pci.0 addresses
    pci_params = params.object_params('pci.0')
    reserved = pci_params.get('reserved_slots', '').split()
    if reserved:
        for bus in self.__buses:
            if bus.aobject == "pci.0":
                for addr in reserved:
                    bus.reserve(hex(int(addr)))
                break
    return devices
# USB Controller related methods
def usbc_by_variables(self, usb_id, usb_type, multifunction=False,
                      masterbus=None, firstport=None, freq=None,
                      max_ports=6, pci_addr=None, pci_bus='pci.0'):
    """
    Creates usb-controller devices by variables

    :param usb_id: Usb bus name
    :param usb_type: Usb bus type
    :param multifunction: Is the bus multifunction
    :param masterbus: Is this bus master?
    :param firstport: Offset of the first port
    :param freq: Bus frequency
    :param max_ports: How many ports this bus have [6]
    :param pci_addr: Desired PCI address
    :param pci_bus: aobject of the PCI bus the controller is plugged into
    :return: List of QDev devices
    :raise error.TestNAError: when qemu doesn't offer the usb_type device
    """
    if not self.has_option("device"):
        # Okay, for the archaic qemu which has not device parameter,
        # just return a usb uhci controller.
        # If choose this kind of usb controller, it has no name/id,
        # and only can be created once, so give it a special name.
        legacy_bus = qbuses.QUSBBus(2, 'usb.0', 'uhci', usb_id)
        return [qdevices.QStringDevice("oldusb", cmdline="-usb",
                                       child_bus=legacy_bus)]

    if not self.has_device(usb_type):
        raise error.TestNAError("usb controller %s not available"
                                % usb_type)

    usb_bus = qbuses.QUSBBus(max_ports, '%s.0' % usb_id, usb_type, usb_id)
    usb = qdevices.QDevice(usb_type, {}, usb_id, {'aobject': pci_bus},
                           usb_bus)
    new_usbs = [usb]    # each usb dev might compound of multiple devs
    # TODO: Add 'bus' property (it was not in the original version)
    for key, value in (('id', usb_id),
                       ('masterbus', masterbus),
                       ('multifunction', multifunction),
                       ('firstport', firstport),
                       ('freq', freq),
                       ('addr', pci_addr)):
        usb.set_param(key, value)

    if usb_type == "ich9-usb-ehci1":
        usb.set_param('addr', '1d.7')
        usb.set_param('multifunction', 'on')
        # the ehci controller comes with three companion uhci controllers
        for idx in range(3):
            companion = qdevices.QDevice('ich9-usb-uhci%d' % (idx + 1), {},
                                         usb_id)
            companion.parent_bus = {'aobject': pci_bus}
            companion.set_param('id', '%s.%d' % (usb_id, idx))
            companion.set_param('multifunction', 'on')
            companion.set_param('masterbus', '%s.0' % usb_id)
            # current qdevices doesn't support x.y addr. Plug only
            # the 0th one into this representation.
            companion.set_param('addr', '1d.%d' % (2 * idx))
            companion.set_param('firstport', 2 * idx)
            new_usbs.append(companion)
    return new_usbs
def usbc_by_params(self, usb_name, params):
    """
    Wrapper for creating usb bus from autotest usb params; reads the
    controller settings from params and forwards them to
    usbc_by_variables().

    :param usb_name: Name of the usb bus
    :param params: USB params (params.object_params(usb_name))
    :return: List of QDev devices
    """
    usb_type = params.get('usb_type')
    multifunction = params.get('multifunction')
    masterbus = params.get('masterbus')
    firstport = params.get('firstport')
    freq = params.get('freq')
    max_ports = params.get('max_ports', 6)
    pci_addr = params.get('pci_addr')
    pci_bus = params.get('pci_bus', 'pci.0')
    return self.usbc_by_variables(usb_name, usb_type, multifunction,
                                  masterbus, firstport, freq, max_ports,
                                  pci_addr, pci_bus)
# USB Device related methods
def usb_by_variables(self, usb_name, usb_type, controller_type, bus=None,
                     port=None):
    """
    Creates usb-devices by variables.

    :param usb_name: usb name
    :param usb_type: usb type (usb-tablet, usb-serial, ...)
    :param controller_type: type of the controller (uhci, ehci, xhci, ...)
    :param bus: the bus name (my_bus.0, ...)
    :param port: port specification (4, 4.1.2, ...)
    :return: QDev device
    :raise error.TestNAError: when qemu doesn't offer the usb_type device
    """
    if not self.has_device(usb_type):
        raise error.TestNAError("usb device %s not available"
                                % usb_type)
    if not self.has_option('device'):
        # qemu without -device: only the tablet can be expressed
        if "tablet" in usb_type:
            return qdevices.QStringDevice('usb-%s' % usb_name,
                                          cmdline='-usbdevice %s' % usb_name)
        placeholder = qdevices.QStringDevice('missing-usb-%s' % usb_name)
        logging.error("This qemu supports only tablet device; ignoring"
                      " %s", usb_name)
        return placeholder
    device = qdevices.QDevice(usb_type, aobject=usb_name)
    for key, value in (('id', 'usb-%s' % usb_name),
                       ('bus', bus),
                       ('port', port)):
        device.set_param(key, value)
    # the device has to sit on a bus of the requested controller type
    device.parent_bus += ({'type': controller_type},)
    return device
def usb_by_params(self, usb_name, params):
    """
    Wrapper for creating usb devices from autotest params; reads the
    device settings from params and forwards them to usb_by_variables().

    :param usb_name: Name of the usb
    :param params: USB device's params
    :return: QDev device
    """
    usb_type = params.get("usb_type")
    controller_type = params.get("usb_controller")
    bus = params.get("bus")
    port = params.get("port")
    return self.usb_by_variables(usb_name, usb_type, controller_type, bus,
                                 port)
# Images (disk, cdrom, floppy) device related methods
[docs] def images_define_by_variables(self, name, filename, index=None, fmt=None,
cache=None, werror=None, rerror=None, serial=None,
snapshot=None, boot=None, blkdebug=None, bus=None,
unit=None, port=None, bootindex=None, removable=None,
min_io_size=None, opt_io_size=None,
physical_block_size=None, logical_block_size=None,
readonly=None, scsiid=None, lun=None, aio=None,
strict_mode=None, media=None, imgfmt=None,
pci_addr=None, scsi_hba=None, x_data_plane=None,
blk_extra_params=None, scsi=None,
pci_bus='pci.0', drv_extra_params=None,
num_queues=None, bus_extra_params=None,
force_fmt=None):
"""
Creates related devices by variables
:note: To skip the argument use None, to disable it use False
:note: Strictly bool options accept "yes", "on" and True ("no"...)
:param name: Autotest name of this disk
:param filename: Path to the disk file
:param index: drive index (used for generating names)
:param fmt: drive subsystem type (ide, scsi, virtio, usb2, ...)
:param force_fmt: Force to use specific drive format
:param cache: disk cache (none, writethrough, writeback)
:param werror: What to do when write error occurs (stop, ...)
:param rerror: What to do when read error occurs (stop, ...)
:param serial: drive serial number ($string)
:param snapshot: use snapshot? ($bool)
:param boot: is bootable? ($bool)
:param blkdebug: use blkdebug (None, blkdebug_filename)
:param bus: 1st level of disk location (index of bus) ($int)
:param unit: 2nd level of disk location (unit/scsiid/...) ($int)
:param port: 3rd level of disk location (port/lun/...) ($int)
:param bootindex: device boot priority ($int)
:param removable: can the drive be removed? ($bool)
:param min_io_size: Min allowed io size
:param opt_io_size: Optimal io size
:param physical_block_size: set physical_block_size ($int)
:param logical_block_size: set logical_block_size ($int)
:param readonly: set the drive readonly ($bool)
:param scsiid: Deprecated 2nd level of disk location (&unit)
:param lun: Deprecated 3rd level of disk location (&port)
:param aio: set the type of async IO (native, threads, ..)
:param strict_mode: enforce optional parameters (address, ...) ($bool)
:param media: type of the media (disk, cdrom, ...)
:param imgfmt: image format (qcow2, raw, ...)
:param pci_addr: drive pci address ($int)
:param scsi_hba: Custom scsi HBA
:param num_queues: performace option for virtio-scsi-pci
:param bus_extra_params: options want to add to virtio-scsi-pci bus
"""
def define_hbas(qtype, atype, bus, unit, port, qbus, pci_bus,
                addr_spec=None, num_queues=None,
                bus_extra_params=None):
    """
    Helper for creating HBAs of certain type.

    Looks up (or picks the index of) the bus the disk should use and
    creates a device for every named HBA bus reported as missing up to
    that index.

    :param qtype: bus type used for the bus lookup and reported in the
                  returned parent-bus specification
    :param atype: HBA driver name; also used to build the HBA id
    :param bus: requested bus index, or None to pick one automatically
    :param unit: 2nd level of the disk location (used for the free-bus
                 lookup and appended to the AHCI bus id)
    :param port: 3rd level of the disk location (used for the free-bus
                 lookup)
    :param qbus: bus class instantiated as the child bus of each new HBA
    :param pci_bus: parent-bus specification given to each new HBA
    :param addr_spec: when set, the child bus is created with explicit
                      bus_type/addr_spec/atype
    :param num_queues: added to the HBA params only when greater than 1
    :param bus_extra_params: "key=value[,key=value...]" string of extra
                             HBA params
    :return: tuple(new HBA devices, bus id for the disk or None,
             parent-bus specification for the disk)
    :raise ValueError: when a non-empty bus_extra_params entry has no '='
    """
    devices = []
    if qbus == qbuses.QAHCIBus:     # AHCI uses multiple ports, id is different
        _hba = 'ahci%s'
    else:
        _hba = atype.replace('-', '_') + '%s.0'  # HBA id
    _bus = bus  # remember whether the caller asked for a specific bus
    if bus is None:
        bus = self.get_first_free_bus({'type': qtype, 'atype': atype},
                                      [unit, port])
        if bus is None:
            bus = self.idx_of_next_named_bus(_hba)
        else:
            bus = bus.busid
    if isinstance(bus, int):
        for bus_name in self.list_missing_named_buses(
                _hba, qtype, bus + 1):
            # The device id is the bus name up to the first '.'
            _bus_name = bus_name.split('.', 1)[0]
            bus_params = {'id': _bus_name, 'driver': atype}
            if num_queues is not None and int(num_queues) > 1:
                bus_params['num_queues'] = num_queues
            if bus_extra_params:
                # Split on the first '=' only and skip empty entries, the
                # same way drv_extra_params and blk_extra_params are
                # parsed, so values containing '=' and stray commas do
                # not raise.
                for extra_param in bus_extra_params.split(","):
                    if not extra_param:
                        continue
                    key, value = extra_param.split('=', 1)
                    bus_params[key] = value
            if addr_spec:
                dev = qdevices.QDevice(params=bus_params,
                                       parent_bus=pci_bus,
                                       child_bus=qbus(busid=bus_name,
                                                      bus_type=qtype,
                                                      addr_spec=addr_spec,
                                                      atype=atype))
            else:
                dev = qdevices.QDevice(params=bus_params,
                                       parent_bus=pci_bus,
                                       child_bus=qbus(busid=bus_name))
            devices.append(dev)
        # Turn the numeric index into the bus id
        bus = _hba % bus
    if qbus == qbuses.QAHCIBus and unit is not None:
        bus += ".%d" % unit
    # If bus was not set, don't set it, unless the device is
    # a spapr-vscsi device.
    elif _bus is None and 'spapr_vscsi' not in _hba:
        bus = None
    return devices, bus, {'type': qtype, 'atype': atype}
#
# Parse params
#
devices = [] # All related devices
use_device = self.has_option("device")
if fmt == "scsi": # fmt=scsi force the old version of devices
logging.warn("'scsi' drive_format is deprecated, please use the "
"new lsi_scsi type for disk %s", name)
use_device = False
if not fmt:
use_device = False
if fmt == 'floppy' and not self.has_option("global"):
use_device = False
if strict_mode is None:
strict_mode = self.strict_mode
if strict_mode: # Force default variables
if cache is None:
cache = "none"
if removable is None:
removable = "yes"
if aio is None:
aio = "native"
if media is None:
media = "disk"
else: # Skip default variables
if media != 'cdrom': # ignore only 'disk'
media = None
if "[,boot=on|off]" not in self.get_help_text():
if boot in ('yes', 'on', True):
bootindex = "1"
boot = None
bus = none_or_int(bus) # First level
unit = none_or_int(unit) # Second level
port = none_or_int(port) # Third level
# Compatibility with old params - scsiid, lun
if scsiid is not None:
logging.warn("drive_scsiid param is obsolete, use drive_unit "
"instead (disk %s)", name)
unit = none_or_int(scsiid)
if lun is not None:
logging.warn("drive_lun param is obsolete, use drive_port instead "
"(disk %s)", name)
port = none_or_int(lun)
if pci_addr is not None and fmt == 'virtio':
logging.warn("drive_pci_addr is obsolete, use drive_bus instead "
"(disk %s)", name)
bus = none_or_int(pci_addr)
pci_bus = {'aobject': pci_bus}
#
# HBA
# fmt: ide, scsi, virtio, scsi-hd, ahci, usb1,2,3 + hba
# device: ide-drive, usb-storage, scsi-hd, scsi-cd, virtio-blk-pci
# bus: ahci, virtio-scsi-pci, USB
#
if not use_device:
if fmt and (fmt == "scsi" or (fmt.startswith('scsi') and
(scsi_hba == 'lsi53c895a' or
scsi_hba == 'spapr-vscsi'))):
if not (bus is None and unit is None and port is None):
logging.warn("Using scsi interface without -device "
"support; ignoring bus/unit/port. (%s)", name)
bus, unit, port = None, None, None
# In case we hotplug, lsi wasn't added during the startup hook
if arch.ARCH in ('ppc64', 'ppc64le'):
_ = define_hbas('SCSI', 'spapr-vscsi', None, None, None,
qbuses.QSCSIBus, None, [8, 16384])
else:
_ = define_hbas('SCSI', 'lsi53c895a', None, None, None,
qbuses.QSCSIBus, pci_bus, [8, 16384])
devices.extend(_[0])
elif fmt == "ide":
if bus:
logging.warn('ide supports only 1 hba, use drive_unit to set'
'ide.* for disk %s', name)
bus = unit
dev_parent = {'type': 'IDE', 'atype': 'ide'}
elif fmt == "ahci":
devs, bus, dev_parent = define_hbas('IDE', 'ahci', bus, unit, port,
qbuses.QAHCIBus, pci_bus)
devices.extend(devs)
elif fmt.startswith('scsi-'):
if not scsi_hba:
scsi_hba = "virtio-scsi-pci"
if scsi_hba != "virtio-scsi-pci":
num_queues = None
addr_spec = None
if scsi_hba == 'lsi53c895a' or scsi_hba == 'spapr-vscsi':
addr_spec = [8, 16384]
elif scsi_hba == 'virtio-scsi-pci':
addr_spec = [256, 16384]
elif scsi_hba == 'virtio-scsi-device':
addr_spec = [256, 16384]
pci_bus = {'type': 'virtio-bus'}
if scsi_hba == 'spapr-vscsi':
pci_bus = None
_, bus, dev_parent = define_hbas('SCSI', scsi_hba, bus, unit, port,
qbuses.QSCSIBus, pci_bus,
addr_spec, num_queues=num_queues,
bus_extra_params=bus_extra_params)
devices.extend(_)
elif fmt in ('usb1', 'usb2', 'usb3'):
if bus:
logging.warn('Manual setting of drive_bus is not yet supported'
' for usb disk %s', name)
bus = None
if fmt == 'usb1':
dev_parent = {'type': 'uhci'}
elif fmt == 'usb2':
dev_parent = {'type': 'ehci'}
elif fmt == 'usb3':
dev_parent = {'type': 'xhci'}
elif fmt == 'virtio':
dev_parent = pci_bus
elif fmt == 'virtio-blk-device':
dev_parent = {'type': 'virtio-bus'}
else:
dev_parent = {'type': fmt}
#
# Drive
# -drive fmt or -drive fmt=none -device ...
#
if self.has_hmp_cmd('__com.redhat_drive_add') and use_device:
devices.append(qdevices.QRHDrive(name))
elif self.has_hmp_cmd('drive_add') and use_device:
devices.append(qdevices.QHPDrive(name))
elif self.has_option("device"):
devices.append(qdevices.QDrive(name, use_device))
else: # very old qemu without 'addr' support
devices.append(qdevices.QOldDrive(name, use_device))
devices[-1].set_param('if', 'none')
devices[-1].set_param('cache', cache)
devices[-1].set_param('rerror', rerror)
devices[-1].set_param('werror', werror)
devices[-1].set_param('serial', serial)
devices[-1].set_param('boot', boot, bool)
devices[-1].set_param('snapshot', snapshot, bool)
devices[-1].set_param('readonly', readonly, bool)
if 'aio' in self.get_help_text():
devices[-1].set_param('aio', aio)
devices[-1].set_param('media', media)
devices[-1].set_param('format', imgfmt)
if blkdebug is not None:
devices[-1].set_param('file', 'blkdebug:%s:%s' % (blkdebug,
filename))
else:
devices[-1].set_param('file', filename)
if drv_extra_params:
drv_extra_params = (_.split('=', 1) for _ in
drv_extra_params.split(',') if _)
for key, value in drv_extra_params:
devices[-1].set_param(key, value)
if not use_device:
if fmt and fmt.startswith('scsi-'):
if scsi_hba == 'lsi53c895a' or scsi_hba == 'spapr-vscsi':
fmt = 'scsi' # Compatibility with the new scsi
if fmt and fmt not in ('ide', 'scsi', 'sd', 'mtd', 'floppy',
'pflash', 'virtio'):
raise virt_vm.VMDeviceNotSupportedError(self.vmname,
fmt)
devices[-1].set_param('if', fmt) # overwrite previously set None
if not fmt: # When fmt unspecified qemu uses ide
fmt = 'ide'
devices[-1].set_param('index', index)
if fmt == 'ide':
devices[-1].parent_bus = ({'type': fmt.upper(), 'atype': fmt},)
elif fmt == 'scsi':
if arch.ARCH in ('ppc64', 'ppc64le'):
devices[-1].parent_bus = ({'atype': 'spapr-vscsi',
'type': 'SCSI'},)
else:
devices[-1].parent_bus = ({'atype': 'lsi53c895a',
'type': 'SCSI'},)
elif fmt == 'floppy':
devices[-1].parent_bus = ({'type': fmt},)
elif fmt == 'virtio':
devices[-1].set_param('addr', pci_addr)
devices[-1].parent_bus = (pci_bus,)
if not media == 'cdrom':
logging.warn("Using -drive fmt=xxx for %s is unsupported "
"method, false errors might occur.", name)
return devices
#
# Device
#
devices.append(qdevices.QDevice(params={}, aobject=name))
devices[-1].parent_bus += ({'busid': 'drive_%s' % name}, dev_parent)
if fmt in ("ide", "ahci"):
if not self.has_device('ide-hd'):
devices[-1].set_param('driver', 'ide-drive')
elif media == 'cdrom':
devices[-1].set_param('driver', 'ide-cd')
else:
devices[-1].set_param('driver', 'ide-hd')
devices[-1].set_param('unit', port)
elif fmt and fmt.startswith('scsi-'):
devices[-1].set_param('driver', fmt)
devices[-1].set_param('scsi-id', unit)
devices[-1].set_param('lun', port)
devices[-1].set_param('removable', removable, bool)
if strict_mode:
devices[-1].set_param('channel', 0)
elif fmt == 'virtio':
devices[-1].set_param('driver', 'virtio-blk-pci')
devices[-1].set_param("scsi", scsi, bool)
if bus is not None:
devices[-1].set_param('addr', hex(bus))
bus = None
elif fmt in ('usb1', 'usb2', 'usb3'):
devices[-1].set_param('driver', 'usb-storage')
devices[-1].set_param('port', unit)
devices[-1].set_param('removable', removable, bool)
elif fmt == 'floppy':
# Overwrite qdevices.QDevice with qdevices.QFloppy
devices[-1] = qdevices.QFloppy(unit, 'drive_%s' % name, name,
({'busid': 'drive_%s' % name}, {'type': fmt}))
else:
logging.warn('Using default device handling (disk %s)', name)
devices[-1].set_param('driver', fmt)
if force_fmt:
logging.info("Force to use %s for the device" % force_fmt)
devices[-1].set_param('driver', force_fmt)
# Get the supported options
options = self.execute_qemu("-device %s,?" % devices[-1]['driver'])
devices[-1].set_param('id', name)
devices[-1].set_param('bus', bus)
devices[-1].set_param('drive', 'drive_%s' % name)
devices[-1].set_param('logical_block_size', logical_block_size)
devices[-1].set_param('physical_block_size', physical_block_size)
devices[-1].set_param('min_io_size', min_io_size)
devices[-1].set_param('opt_io_size', opt_io_size)
devices[-1].set_param('bootindex', bootindex)
if x_data_plane in ["yes", "no", "on", "off"]:
devices[-1].set_param('x-data-plane', x_data_plane, bool)
else:
devices[-1].set_param('iothread', x_data_plane)
if 'serial' in options:
devices[-1].set_param('serial', serial)
devices[-2].set_param('serial', None) # remove serial from drive
if blk_extra_params:
blk_extra_params = (_.split('=', 1) for _ in
blk_extra_params.split(',') if _)
for key, value in blk_extra_params:
devices[-1].set_param(key, value)
return devices
def images_define_by_params(self, name, image_params, media=None,
                            index=None, image_boot=None,
                            image_bootindex=None):
    """
    Create a disk and its related HBAs from autotest image params.

    Thin wrapper which reads the individual values out of image_params
    and forwards them positionally to images_define_by_variables.

    :note: To skip the argument use None, to disable it use False
    :note: Strictly bool options accept "yes", "on" and True ("no"...)
    :note: Options starting with '_' are optional and used only when
           strict_mode is True
    :param name: Name of the new disk
    :param image_params: Disk params (params.object_params(name))
    :param media: Forwarded unchanged as the media argument
    :param index: Forwarded unchanged as the drive index
    :param image_boot: Forwarded unchanged as the boot argument
    :param image_bootindex: Forwarded unchanged as the bootindex argument
    :return: Whatever images_define_by_variables returns
    """
    blkdebug_dir = os.path.join(data_dir.get_data_dir(), "shared")
    lookup = image_params.get
    # The order below must match the positional parameters of
    # images_define_by_variables (following 'name').
    positional = [
        storage.get_image_filename(image_params, data_dir.get_data_dir()),
        index,
        lookup("drive_format"),
        lookup("drive_cache"),
        lookup("drive_werror"),
        lookup("drive_rerror"),
        lookup("drive_serial"),
        lookup("image_snapshot"),
        image_boot,
        storage.get_image_blkdebug_filename(image_params, blkdebug_dir),
        lookup("drive_bus"),
        lookup("drive_unit"),
        lookup("drive_port"),
        image_bootindex,
        lookup("removable"),
        lookup("min_io_size"),
        lookup("opt_io_size"),
        lookup("physical_block_size"),
        lookup("logical_block_size"),
        lookup("image_readonly"),
        lookup("drive_scsiid"),
        lookup("drive_lun"),
        lookup("image_aio"),
        lookup("strict_mode") == "yes",
        media,
        lookup("image_format"),
        lookup("drive_pci_addr"),
        lookup("scsi_hba"),
        lookup("x-data-plane"),
        lookup("blk_extra_params"),
        lookup("virtio-blk-pci_scsi"),
        lookup("pci_bus", "pci.0"),
        lookup("drv_extra_params"),
        lookup("num_queues"),
        lookup("bus_extra_params"),
        lookup("force_drive_format"),
    ]
    return self.images_define_by_variables(name, *positional)
def cdroms_define_by_params(self, name, image_params, media=None,
                            index=None, image_boot=None,
                            image_bootindex=None):
    """
    Wrapper for creating cdrom and related hbas from autotest image params.

    :note: To skip the argument use None, to disable it use False
    :note: Strictly bool options accept "yes", "on" and True ("no"...)
    :note: Options starting with '_' are optional and used only when
           strict_mode is True
    :note: image_params is modified in place: 'image_name' and
           'image_raw_device' are overwritten.
    :param name: Name of the new cdrom
    :param image_params: Cdrom params (params.object_params(name))
    :param media: Forwarded unchanged as the media argument
    :param index: Forwarded unchanged as the drive index
    :param image_boot: Forwarded unchanged as the boot argument
    :param image_bootindex: Forwarded unchanged as the bootindex argument
    :return: Whatever images_define_by_variables returns
    """
    iso = image_params.get('cdrom')
    image_params['image_name'] = ""
    if iso:
        image_params['image_name'] = os.path.join(data_dir.get_data_dir(),
                                                  iso)
    image_params['image_raw_device'] = 'yes'
    cd_format = image_params.get('cd_format')
    # Compare by value: the param is not guaranteed to be the very same
    # object as the 'ide' literal, so an identity check ("is") could skip
    # the fallback for an explicitly requested ide format.
    if cd_format is None or cd_format == 'ide':
        if not self.get_buses({'atype': 'ide'}):
            logging.warning("cd_format IDE not available, using AHCI "
                            "instead.")
            cd_format = 'ahci'
    shared_dir = os.path.join(data_dir.get_data_dir(), "shared")
    # Arguments are positional; their order must match the parameters of
    # images_define_by_variables.
    return self.images_define_by_variables(
        name,
        storage.get_image_filename(image_params, data_dir.get_data_dir()),
        index,
        cd_format,
        '',     # skip drive_cache
        image_params.get("drive_werror"),
        image_params.get("drive_rerror"),
        image_params.get("drive_serial"),
        image_params.get("image_snapshot"),
        image_boot,
        storage.get_image_blkdebug_filename(image_params, shared_dir),
        image_params.get("drive_bus"),
        image_params.get("drive_unit"),
        image_params.get("drive_port"),
        image_bootindex,
        image_params.get("removable"),
        image_params.get("min_io_size"),
        image_params.get("opt_io_size"),
        image_params.get("physical_block_size"),
        image_params.get("logical_block_size"),
        image_params.get("image_readonly"),
        image_params.get("drive_scsiid"),
        image_params.get("drive_lun"),
        image_params.get("image_aio"),
        image_params.get("strict_mode") == "yes",
        media,
        None,   # skip img_fmt
        image_params.get("drive_pci_addr"),
        image_params.get("scsi_hba"),
        image_params.get("x-data-plane"),
        image_params.get("blk_extra_params"),
        image_params.get("virtio-blk-pci_scsi"),
        image_params.get('pci_bus', 'pci.0'),
        image_params.get("drv_extra_params"),
        None,   # num_queues is not forwarded for cdroms
        image_params.get("bus_extra_params"),
        image_params.get("force_drive_format"))
def pcic_by_params(self, name, params):
    """
    Build a pci controller/switch/... device described by params.

    :param name: Autotest name
    :param params: PCI controller params
    :note: x3130 creates x3130-upstream bus + xio3130-downstream port for
           each inserted device.
    :warning: x3130-upstream device creates only x3130-upstream device
              and you are responsible for creating the downstream ports.
    :return: QDevice of the controller with the new bus as its child bus
    """
    driver = params.get('type', 'ioh3420')
    pcie_drivers = ('ioh3420', 'x3130-upstream', 'x3130')
    bus_type = 'PCIE' if driver in pcie_drivers else 'PCI'
    parent_bus = [{'aobject': params.get('pci_bus', 'pci.0')}]

    if driver == 'x3130':
        # The switch is represented by its upstream device plus a
        # switch bus using xio3130-downstream ports.
        child_bus = qbuses.QPCISwitchBus(name, bus_type,
                                         'xio3130-downstream', name)
        driver = 'x3130-upstream'
    else:
        if driver == 'pci-bridge':
            # pci-bridge additionally requires a chassis_nr
            parent_bus.append({'busid': '_PCI_CHASSIS_NR'})
        if driver in ('pci-bridge', 'i82801b11-bridge'):
            slots, first_slot = 20, 1       # addr 1-19
        else:
            slots, first_slot = 32, 0       # addr 0-31
        child_bus = qbuses.QPCIBus(name, bus_type, name, slots, first_slot)

    for reserved in params.get('reserved_slots', '').split():
        child_bus.reserve(reserved)

    return qdevices.QDevice(driver, {'id': name}, aobject=name,
                            parent_bus=parent_bus,
                            child_bus=child_bus)