Source code for virttest.qemu_devices_unittest

#!/usr/bin/python
"""
This is a unittest for qemu_devices library.

:author: Lukas Doktor <ldoktor@redhat.com>
:copyright: 2012 Red Hat, Inc.
"""
__author__ = """Lukas Doktor (ldoktor@redhat.com)"""

import re
import unittest
import os

import common
from autotest.client.shared.test_utils import mock
from qemu_devices import qdevices, qbuses, qcontainer
from qemu_devices.utils import DeviceHotplugError, DeviceRemoveError
import data_dir
import qemu_monitor

UNITTEST_DATA_DIR = os.path.join(
    data_dir.get_root_dir(), "virttest", "unittest_data")


def _read_unittest_data(name):
    """
    Return the whole content of a file stored in UNITTEST_DATA_DIR.

    The file is opened inside a ``with`` block so its handle is closed as
    soon as the content is read instead of being left open until the
    garbage collector gets to it.

    :param name: File name relative to UNITTEST_DATA_DIR
    :return: Content of the file
    """
    with open(os.path.join(UNITTEST_DATA_DIR, name)) as data_file:
        return data_file.read()


# Dummy variables
# qemu-1.5.0 human monitor help output
QEMU_HMP = _read_unittest_data("qemu-1.5.0__hmp_help")
# qemu-1.5.0 QMP monitor commands output
QEMU_QMP = _read_unittest_data("qemu-1.5.0__qmp_help")
# qemu-1.5.0 -help
QEMU_HELP = _read_unittest_data("qemu-1.5.0__help")
# qemu-1.5.0 -device ?
QEMU_DEVICES = _read_unittest_data("qemu-1.5.0__devices_help")
# qemu-1.5.0 -M ?
QEMU_MACHINE = _read_unittest_data("qemu-1.5.0__machine_help")


class ParamsDict(dict):

    """
    Params-like dictionary providing the two helpers used by these tests.
    """

    def objects(self, item):
        """
        Split the space separated value stored under ``item``.

        :param item: Key whose value is a space separated list of names
        :return: List of names; an empty list when the key is missing or
                 its value is empty, so the result can always be iterated
        """
        value = self.get(item)
        if not value:
            return []
        return value.split(' ')

    def object_params(self, obj):
        """
        Create the params as seen by the object ``obj``.

        Every ``<name>_<obj>`` entry is additionally stored under ``<name>``,
        overriding the generic value. The suffixed entries stay in place.

        :param obj: Object name (the suffix without the leading underscore)
        :return: New ParamsDict; this instance is left untouched
        """
        suffix = '_%s' % obj
        # Return the same class (not a bare dict) so the helpers stay
        # available on the result.
        ret = ParamsDict(self)
        # items() is available on both python 2 and 3 (iteritems() is not);
        # iterating self while writing to the copy is safe.
        for (param, value) in self.items():
            if param.endswith(suffix):
                ret[param[:-len(suffix)]] = value
        return ret
class MockHMPMonitor(qemu_monitor.HumanMonitor):

    """
    qemu_monitor.HumanMonitor subclass which skips the parent initialisation
    """

    def __init__(self):     # pylint: disable=W0231
        """
        Set only the ``debug_log`` attribute; the parent constructor is
        deliberately not called.
        """
        self.debug_log = False

    def __del__(self):
        """
        Do nothing on deletion.
        """
class Devices(unittest.TestCase):

    """
    Tests of the single-device representations from qdevices
    """

    def test_q_base_device(self):
        """
        QBaseDevice: parameter access and the long representation
        """
        dev = qdevices.QBaseDevice('MyType',
                                   {'ParamA': 'ValueA', 'AUTOREMOVE': None},
                                   'Object1', {'type': 'pci'})
        param_a = dev['ParamA']
        self.assertEqual(param_a, 'ValueA',
                         'Param added during __init__ is corrupted %s != %s'
                         % (param_a, 'ValueA'))

        # Overwrite one param and add a few special values
        dev['ParamA'] = 'ValueB'
        dev.set_param('BoolTrue', True)
        dev.set_param('BoolFalse', 'off', bool)
        dev['Empty'] = 'EMPTY_STRING'

        # NOTE(review): the expected text is reproduced exactly as it appears
        # in this copy of the file; its line breaks and indentation look
        # collapsed into single spaces - confirm against real str_long()
        # output before relying on it.
        expected = """MyType aid = None aobject = Object1 parent_bus = {'type': 'pci'} child_bus = [] params: ParamA = ValueB BoolTrue = on BoolFalse = off Empty = "" """
        actual = dev.str_long()
        self.assertEqual(actual, expected,
                         "Device output doesn't match\n%s\n\n%s"
                         % (actual, expected))

    def test_q_string_device(self):
        """
        QStringDevice: cmdline template is filled from the params
        """
        dev = qdevices.QStringDevice('MyType', {'addr': '0x7'},
                                     cmdline='-qdevice ahci,addr=%(addr)s')
        expected = '-qdevice ahci,addr=0x7'
        actual = dev.cmdline()
        self.assertEqual(actual, expected,
                         "Cmdline doesn't match expected one:\n%s\n%s"
                         % (actual, expected))

    def test_q_device(self):
        """
        QDevice: naming and the generated hotplug commands
        """
        dev = qdevices.QDevice('ahci', {'addr': '0x7'})

        # Without an id the string form is expected to be a'<driver>'
        expected = "a'ahci'"
        actual = str(dev)
        self.assertEqual(actual, expected,
                         "Alternative name error %s != %s"
                         % (actual, expected))

        # Once the id is set the string form is expected to be q'<id>'
        dev['id'] = 'ahci1'
        expected = "q'ahci1'"
        actual = str(dev)
        self.assertEqual(actual, expected,
                         "Id name error %s != %s" % (actual, expected))

        # Human monitor command
        expected = "device_add ahci,addr=0x7,id=ahci1"
        actual = dev.hotplug_hmp()
        self.assertEqual(actual, expected,
                         "HMP command corrupted:\n%s\n%s"
                         % (actual, expected))

        # QMP command (compared through its string form)
        expected = ("('device_add', OrderedDict([('addr', '0x7'), "
                    "('driver', 'ahci'), ('id', 'ahci1')]))")
        actual = str(dev.hotplug_qmp())
        self.assertEqual(actual, expected,
                         "QMP command corrupted:\n%s\n%s"
                         % (actual, expected))
class Buses(unittest.TestCase):

    """ Set of bus-representation tests """

    def test_q_sparse_bus(self):
        """
        Sparse bus tests (general bus testing)

        Inserts devices with full, partial and missing addresses into a
        QSparseBus, checks the rejected inserts, compares the short and long
        representations, then exercises get() and remove().
        """
        bus = qbuses.QSparseBus('bus', (['addr1', 'addr2', 'addr3'], [2, 6, 4]),
                                'my_bus', 'bus_type', 'autotest_bus')

        qdevice = qdevices.QDevice

        # Correct records
        # (every successful insert below is expected to return an empty list)
        params = {'addr1': '0', 'addr2': '0', 'addr3': '0', 'bus': 'my_bus'}
        dev = qdevice('dev1', params, parent_bus={'type': 'bus_type'})
        exp = []
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Failed to add device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        params = {'addr1': '1', 'addr2': '0', 'addr3': '0', 'bus': 'my_bus'}
        dev = qdevice('dev2', params, parent_bus={'type': 'bus_type'})
        exp = []
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Failed to add device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        params = {'addr1': '1', 'addr2': '1', 'addr3': '0', 'bus': 'my_bus'}
        dev = qdevice('dev3', params, parent_bus={'type': 'bus_type'})
        exp = []
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Failed to add device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        params = {'addr1': '1', 'addr2': '1', 'addr3': '1', 'bus': 'my_bus'}
        dev = qdevice('dev4', params, parent_bus={'type': 'bus_type'})
        exp = []
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Failed to add device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        # Only the first address part is given; the short repr below expects
        # this device on 1-0-1
        params = {'addr1': '1', 'bus': 'my_bus'}
        dev = qdevice('dev5', params, parent_bus={'type': 'bus_type'})
        exp = []
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Failed to add device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        # No address at all; the short repr below expects 0-0-1
        params = {'bus': 'my_bus'}
        dev = qdevice('dev6', params, parent_bus={'type': 'bus_type'})
        exp = []
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Failed to add device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        # Neither bus nor address; the short repr below expects 0-0-2.
        # The device is kept in dev2 as it is reused by the get() checks.
        params = {}
        dev2 = qdevice('dev7', params, parent_bus={'type': 'bus_type'})
        exp = []
        out = bus.insert(dev2, False)
        self.assertEqual(out, exp, "Failed to add device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev2.str_long(), bus.str_long()))

        # Compare short repr
        exp = ("my_bus(bus_type): {0-0-0:a'dev1',0-0-1:a'dev6',0-0-2:a'dev7',"
               "1-0-0:a'dev2',1-0-1:a'dev5',1-1-0:a'dev3',1-1-1:a'dev4'}")
        out = str(bus.str_short())
        self.assertEqual(out, exp, "Short representation corrupted:\n%s\n%s"
                         "\n\n%s" % (out, exp, bus.str_long()))

        # Incorrect records
        # (rejected inserts are expected to return an error string)
        # Used address
        params = {'addr1': '0', 'addr2': '0', 'addr3': '0', 'bus': 'my_bus'}
        dev = qdevice('devI1', params, parent_bus={'type': 'bus_type'})
        exp = "UsedSlot"
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Added bad device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        # Out of range address
        # (presumably addr2 is limited by the 6 given to the constructor -
        # confirm against QSparseBus)
        params = {'addr1': '0', 'addr2': '6', 'addr3': '0', 'bus': 'my_bus'}
        dev = qdevice('devI2', params, parent_bus={'type': 'bus_type'})
        exp = "BadAddr(False)"
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Added bad device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        # Incorrect bus name
        params = {'bus': 'other_bus'}
        dev = qdevice('devI3', params, parent_bus={'type': 'bus_type'})
        exp = "BusId"
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Added bad device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        # Compare short repr
        # (must be unchanged by the rejected inserts)
        exp = ("my_bus(bus_type): {0-0-0:a'dev1',0-0-1:a'dev6',0-0-2:a'dev7',"
               "1-0-0:a'dev2',1-0-1:a'dev5',1-1-0:a'dev3',1-1-1:a'dev4'}")
        out = str(bus.str_short())
        self.assertEqual(out, exp, "Short representation corrupted:\n%s\n%s"
                         "\n\n%s" % (out, exp, bus.str_long()))

        # Compare long repr
        # NOTE(review): the literal is kept exactly as it appears in this copy
        # of the file, where its line breaks and indentation look collapsed
        # into single spaces (the empty-bus expectation at the end of this
        # test uses explicit "\n"). Confirm against real str_long() output.
        exp = """Bus my_bus, type=bus_type Slots: ---------------< 1-0-0 >--------------- device aid = None aobject = None parent_bus = {'type': 'bus_type'} child_bus = [] params: bus = my_bus addr2 = 0 addr3 = 0 addr1 = 1 driver = dev2 ---------------< 1-0-1 >--------------- device aid = None aobject = None parent_bus = {'type': 'bus_type'} child_bus = [] params: bus = my_bus addr1 = 1 driver = dev5 ---------------< 1-1-1 >--------------- device aid = None aobject = None parent_bus = {'type': 'bus_type'} child_bus = [] params: bus = my_bus addr2 = 1 addr3 = 1 addr1 = 1 driver = dev4 ---------------< 1-1-0 >--------------- device aid = None aobject = None parent_bus = {'type': 'bus_type'} child_bus = [] params: bus = my_bus addr2 = 1 addr3 = 0 addr1 = 1 driver = dev3 ---------------< 0-0-1 >--------------- device aid = None aobject = None parent_bus = {'type': 'bus_type'} child_bus = [] params: bus = my_bus driver = dev6 ---------------< 0-0-0 >--------------- device aid = None aobject = None parent_bus = {'type': 'bus_type'} child_bus = [] params: bus = my_bus addr2 = 0 addr3 = 0 addr1 = 0 driver = dev1 ---------------< 0-0-2 >--------------- device aid = None aobject = None parent_bus = {'type': 'bus_type'} child_bus = [] params: driver = dev7 """
        out = str(bus.str_long())
        self.assertEqual(out, exp, "Long representation corrupted:\n%s\n%s"
                         % (repr(out), exp))

        # Low level functions
        # Get device by object
        exp = dev2
        out = bus.get(dev2)
        self.assertEqual(out, exp, "Failed to get device from bus:\n%s\n%s"
                         "\n\n%s" % (out, exp, bus.str_long()))

        # Get device by its aid
        dev2.aid = 'bad_device3'
        exp = dev2
        out = bus.get('bad_device3')
        self.assertEqual(out, exp, "Failed to get device from bus:\n%s\n%s"
                         "\n\n%s" % (out, exp, bus.str_long()))

        # Unknown device is expected to give None
        exp = None
        out = bus.get('missing_bad_device')
        self.assertEqual(out, exp, "Got device while expecting None:\n%s\n%s"
                         "\n\n%s" % (out, exp, bus.str_long()))

        # Remove all devices
        # (the list is built first so the bus is not modified while iterated)
        devs = [dev for dev in bus]
        for dev in devs:
            bus.remove(dev)

        exp = 'Bus my_bus, type=bus_type\nSlots:\n'
        out = str(bus.str_long())
        self.assertEqual(out, exp, "Long representation corrupted:\n%s\n%s"
                         % (out, exp))

    def test_q_pci_bus(self):
        """
        PCI bus tests

        Inserts devices addressed by decimal string, integer and hex string,
        then checks that a used and an out of range address are rejected.
        """
        bus = qbuses.QPCIBus('pci.0', 'pci', 'my_pci')
        qdevice = qdevices.QDevice

        # Good devices
        params = {'addr': '0'}
        dev = qdevice('dev1', params, parent_bus={'type': 'pci'})
        exp = []
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Failed to add device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        params = {'addr': 10, 'bus': 'pci.0'}
        dev = qdevice('dev2', params, parent_bus={'type': 'pci'})
        exp = []
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Failed to add device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        params = {'addr': '0x1f'}
        dev = qdevice('dev3', params, parent_bus={'type': 'pci'})
        exp = []
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Failed to add device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        # Compare short repr
        exp = ("pci.0(pci): {00-00:a'dev1',0a-00:a'dev2',1f-00:a'dev3'}")
        out = str(bus.str_short())
        self.assertEqual(out, exp, "Short representation corrupted:\n%s\n%s"
                         "\n\n%s" % (out, exp, bus.str_long()))

        # Incorrect records
        # Used address
        params = {'addr': 0}
        dev = qdevice('devI1', params, parent_bus={'type': 'pci'})
        exp = "UsedSlot"
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Added bad device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        # Out of range address
        params = {'addr': '0xffff'}
        dev = qdevice('devI2', params, parent_bus={'type': 'pci'})
        exp = "BadAddr(False)"
        out = bus.insert(dev, False)
        self.assertEqual(out, exp, "Added bad device; %s != %s\n%s\n\n%s"
                         % (out, exp, dev.str_long(), bus.str_long()))

        # Compare short repr
        # (must be unchanged by the rejected inserts)
        exp = ("pci.0(pci): {00-00:a'dev1',0a-00:a'dev2',1f-00:a'dev3'}")
        out = str(bus.str_short())
        self.assertEqual(out, exp, "Short representation corrupted:\n%s\n%s"
                         "\n\n%s" % (out, exp, bus.str_long()))

    def test_q_pci_bus_strict(self):
        """
        PCI bus tests in strict_mode (enforce additional options)

        Every insert passes True as the second argument; the expected long
        representation shows 'addr' and 'bus' filled in for all devices.
        """
        bus = qbuses.QPCIBus('pci.0', 'pci', 'my_pci')
        qdevice = qdevices.QDevice

        # Three devices without any address
        params = {}
        bus.insert(qdevice('dev1', params, parent_bus={'type': 'pci'}), True)
        bus.insert(qdevice('dev2', params, parent_bus={'type': 'pci'}), True)
        bus.insert(qdevice('dev3', params, parent_bus={'type': 'pci'}), True)
        # Explicit addresses given as hex string and as integers
        params = {'addr': '0x1f'}
        bus.insert(qdevice('dev1', params, parent_bus={'type': 'pci'}), True)
        params = {'addr': 30}
        bus.insert(qdevice('dev1', params, parent_bus={'type': 'pci'}), True)
        params = {'addr': 12}
        bus.insert(qdevice('dev1', params, parent_bus={'type': 'pci'}), True)

        # All devices will have 'addr' set as we are in the strict mode
        # NOTE(review): the literal is kept exactly as it appears in this copy
        # of the file, where its line breaks and indentation look collapsed
        # into single spaces - confirm against real str_long() output.
        exp = """Bus pci.0, type=pci Slots: ---------------< 1e-00 >--------------- device aid = None aobject = None parent_bus = {'type': 'pci'} child_bus = [] params: addr = 1e driver = dev1 bus = pci.0 ---------------< 02-00 >--------------- device aid = None aobject = None parent_bus = {'type': 'pci'} child_bus = [] params: driver = dev3 bus = pci.0 addr = 02 ---------------< 1f-00 >--------------- device aid = None aobject = None parent_bus = {'type': 'pci'} child_bus = [] params: addr = 1f driver = dev1 bus = pci.0 ---------------< 00-00 >--------------- device aid = None aobject = None parent_bus = {'type': 'pci'} child_bus = [] params: driver = dev1 bus = pci.0 addr = 00 ---------------< 0c-00 >--------------- device aid = None aobject = None parent_bus = {'type': 'pci'} child_bus = [] params: addr = 0c driver = dev1 bus = pci.0 ---------------< 01-00 >--------------- device aid = None aobject = None parent_bus = {'type': 'pci'} child_bus = [] params: driver = dev2 bus = pci.0 addr = 01 """
        out = str(bus.str_long())
        self.assertEqual(out, exp, "Long representation corrupted:\n%s\n%s"
                         % (out, exp))

    def test_usb_bus(self):
        """
        Tests the specific handlings of QUSBBus

        Builds a chain of usb hubs below one controller and checks which
        ports are accepted or rejected by each level.
        """
        usbc1 = qbuses.QUSBBus(2, 'usb1.0', 'uhci')

        # Insert device into usb controller, default port
        dev = qdevices.QDevice('usb-kbd', parent_bus={'type': 'uhci'})
        assert usbc1.insert(dev) == []

        # Insert usb-hub into usb controller, default port
        dev = qdevices.QDevice('usb-hub', parent_bus={'type': 'uhci'})
        assert usbc1.insert(dev) == []
        # The hub's own bus is the last child bus of the inserted device
        hub1 = dev.child_bus[-1]

        # Insert usb-hub into usb-hub, exact port
        dev = qdevices.QDevice('usb-hub', {'port': '2.4'},
                               parent_bus={'type': 'uhci'})
        assert hub1.insert(dev) == []
        hub2 = dev.child_bus[-1]

        # Insert usb-hub into usb-hub in usb-hub, exact port
        dev = qdevices.QDevice('usb-hub', {'port': '2.4.3'},
                               parent_bus={'type': 'uhci'})
        assert hub2.insert(dev) == []
        hub3 = dev.child_bus[-1]
        # verify that port is updated correctly
        self.assertEqual("2.4.3", dev.get_param("port"))

        # Insert usb-device into usb-hub in usb-hub in usb-hub, exact port
        dev = qdevices.QDevice('usb-kbd', {'port': '2.4.3.1'},
                               parent_bus={'type': 'uhci'})
        assert hub3.insert(dev) == []
        # Insert usb-device into usb-hub in usb-hub in usb-hub, default port
        dev = qdevices.QDevice('usb-kbd', parent_bus={'type': 'uhci'})
        assert hub3.insert(dev) == []

        # Try to insert device into specific port which belongs to inferior bus
        out = hub2.insert(qdevices.QDevice('usb-kbd', {'port': '2.4.3.3'},
                                           parent_bus={'type': 'uhci'}))
        assert out == "BusId"

        # Try to insert device into specific port which belongs to superior bus
        out = hub2.insert(qdevices.QDevice('usb-kbd', {'port': '2.4'},
                                           parent_bus={'type': 'uhci'}))
        assert out == "BusId"

        # Try to insert device into specific port which belongs to same level
        # but different port
        out = hub2.insert(qdevices.QDevice('usb-kbd', {'port': '2.3.4'},
                                           parent_bus={'type': 'uhci'}))
        assert out == "BusId"

        # Force insert device with port which belongs to other hub
        # NOTE(review): this device is only created, it is never inserted
        # into any bus in this test - confirm whether an insert is missing.
        dev = qdevices.QDevice('usb-hub', {'port': '2.4.3.4'},
                               parent_bus={'type': 'uhci'})

        # Check the overall buses correctness
        self.assertEqual("usb1.0(uhci): {1:a'usb-kbd',2:a'usb-hub'}",
                         usbc1.str_short())
        self.assertEqual("usb1.0(uhci): {4:a'usb-hub'}",
                         hub1.str_short())
        self.assertEqual("usb1.0(uhci): {3:a'usb-hub'}",
                         hub2.str_short())
        self.assertEqual("usb1.0(uhci): {1:a'usb-kbd',2:a'usb-kbd'}",
                         hub3.str_short())
class Container(unittest.TestCase):

    """
    Tests related to the abstract representation of qemu machine
    """

    def setUp(self):
        """
        Stub qcontainer.utils.system_output so no real qemu is executed
        """
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(qcontainer.utils, "system_output")

    def tearDown(self):
        """
        Restore everything stubbed in setUp
        """
        self.god.unstub_all()

    def create_qdev(self, vm_name='vm1', strict_mode="no",
                    allow_hotplugged_vm="yes"):
        """
        Create a DevContainer while replaying the recorded qemu-1.5.0 outputs

        The system_output calls expected during the DevContainer
        initialisation are queued on the mock first and verified by
        check_playback() afterwards.

        :param vm_name: Passed to DevContainer as the vm name
        :param strict_mode: Passed to DevContainer as its 3rd argument
        :param allow_hotplugged_vm: Passed to DevContainer as its 5th argument
        :return: Initialized qcontainer.DevContainer object
        """
        qemu_cmd = '/usr/bin/qemu_kvm'
        expect_call = qcontainer.utils.system_output.expect_call

        expect_call('%s -help' % qemu_cmd, timeout=10,
                    ignore_status=True).and_return(QEMU_HELP)
        # "\\?" keeps the literal backslash of the expected command without
        # relying on the invalid "\?" escape sequence
        expect_call("%s -device \\? 2>&1" % qemu_cmd, timeout=10,
                    ignore_status=True).and_return(QEMU_DEVICES)
        expect_call("%s -M ?" % qemu_cmd, timeout=10,
                    ignore_status=True).and_return(QEMU_MACHINE)
        cmd = "echo -e 'help\nquit' | %s -monitor stdio -vnc none" % qemu_cmd
        expect_call(cmd, timeout=10,
                    ignore_status=True).and_return(QEMU_HMP)

        # The first QMP query is answered with an empty string ...
        cmd = ('echo -e \'{ "execute": "qmp_capabilities" }\n'
               '{ "execute": "query-commands", "id": "RAND91" }\n'
               '{ "execute": "quit" }\''
               '| %s -qmp stdio -vnc none | grep return |'
               ' grep RAND91' % qemu_cmd)
        expect_call(cmd, timeout=10, ignore_status=True).and_return('')
        # ... so a second one (piped through "sleep 1; cat") is expected
        cmd = ('echo -e \'{ "execute": "qmp_capabilities" }\n'
               '{ "execute": "query-commands", "id": "RAND91" }\n'
               '{ "execute": "quit" }\' | (sleep 1; cat )'
               '| %s -qmp stdio -vnc none | grep return |'
               ' grep RAND91' % qemu_cmd)
        expect_call(cmd, timeout=10,
                    ignore_status=True).and_return(QEMU_QMP)

        qdev = qcontainer.DevContainer(qemu_cmd, vm_name, strict_mode, 'no',
                                       allow_hotplugged_vm)

        self.god.check_playback()
        return qdev

    def test_qdev_functional(self):
        """
        Test basic qdev workflow

        Inserts the 'pc' machine devices and a custom HBA with three
        children, checks the representations, comparison, cmdline and
        lookup, then removes the devices again.
        """
        qdev = self.create_qdev('vm1')

        # Add basic 'pc' devices
        out = qdev.insert(qdev.machine_by_params(
            ParamsDict({'machine_type': 'pc'})))
        assert isinstance(out, list)
        assert len(out) == 6, len(out)

        # NOTE(review): the pattern is kept exactly as it appears in this
        # copy of the file (line breaks/indentation look collapsed into
        # single spaces). Also re.findall() returns a list, never None, so
        # the assertion below cannot fail; tightening it requires verifying
        # the pattern against real str_long() output first.
        exp = r"""Devices of vm1: machine aid = __0 aobject = pci.0 parent_bus = () child_bus = \[.*QPCIBus.*, .*QStrictCustomBus.*\] params: i440FX aid = __1 aobject = None parent_bus = ({'aobject': 'pci.0'},) child_bus = \[\] params: driver = i440FX addr = 00 bus = pci.0 PIIX4_PM aid = __2 aobject = None parent_bus = ({'aobject': 'pci.0'},) child_bus = \[\] params: driver = PIIX4_PM addr = 01.3 bus = pci.0 PIIX3 aid = __3 aobject = None parent_bus = ({'aobject': 'pci.0'},) child_bus = \[\] params: driver = PIIX3 addr = 01 bus = pci.0 piix3-ide aid = __4 aobject = None parent_bus = ({'aobject': 'pci.0'},) child_bus = \[.*QIDEBus.*\] params: driver = piix3-ide addr = 01.1 bus = pci.0 fdc aid = __5 aobject = None parent_bus = \(\) child_bus = \[.*QFloppyBus.*\] params:"""
        out = qdev.str_long()
        self.assertNotEqual(re.findall(exp, out), None,
                            'Long representation is corrupted:\n%s\n%s'
                            % (out, exp))

        exp = ("Buses of vm1\n"
               " floppy(floppy): [None,None]\n"
               " ide(ide): [None,None,None,None]\n"
               " _PCI_CHASSIS_NR(None): {}\n"
               " _PCI_CHASSIS(None): {}\n"
               " pci.0(PCI): {00-00:t'i440FX',01-00:t'PIIX3',"
               "01-01:t'piix3-ide',01-03:t'PIIX4_PM'}")
        out = qdev.str_bus_short()
        assert out == exp, ("Bus representation is corrupted:\n%s\n%s"
                            % (out, exp))

        # Insert some good devices
        qdevice = qdevices.QDevice

        # Device with child bus
        bus = qbuses.QSparseBus('bus', [['addr'], [6]], 'hba1.0', 'hba',
                                'a_hba')
        dev = qdevice('HBA', {'id': 'hba1', 'addr': 10},
                      parent_bus={'aobject': 'pci.0'}, child_bus=bus)
        out = qdev.insert(dev)
        assert isinstance(out, list), out
        assert len(out) == 1, len(out)

        # Device inside a child bus by type (most common)
        dev = qdevice('dev', {}, parent_bus={'type': 'hba'})
        out = qdev.insert(dev)
        assert isinstance(out, list), out
        assert len(out) == 1, len(out)

        # Device inside a child bus by autotest_id
        dev = qdevice('dev', {}, 'autotest_remove', {'aobject': 'a_hba'})
        out = qdev.insert(dev)
        assert isinstance(out, list), out
        assert len(out) == 1, len(out)

        # Device inside a child bus by busid
        dev = qdevice('dev', {}, 'autoremove', {'busid': 'hba1.0'})
        out = qdev.insert(dev)
        assert isinstance(out, list), out
        assert len(out) == 1, len(out)

        # Check the representation
        exp = ("Devices of vm1: [t'machine',t'i440FX',t'PIIX4_PM',t'PIIX3',"
               "t'piix3-ide',t'fdc',hba1,a'dev',a'dev',a'dev']")
        out = qdev.str_short()
        self.assertEqual(out, exp, "Short representation is corrupted:\n%s\n%s"
                         % (out, exp))
        exp = ("Buses of vm1\n"
               " hba1.0(hba): {0:a'dev',1:a'dev',2:a'dev'}\n"
               " floppy(floppy): [None,None]\n"
               " ide(ide): [None,None,None,None]\n"
               " _PCI_CHASSIS_NR(None): {}\n"
               " _PCI_CHASSIS(None): {}\n"
               " pci.0(PCI): {00-00:t'i440FX',01-00:t'PIIX3',"
               "01-01:t'piix3-ide',01-03:t'PIIX4_PM',0a-00:hba1}")
        out = qdev.str_bus_short()
        assert out == exp, ('Bus representation is corrupted:\n%s\n%s'
                            % (out, exp))

        # Now representation contains some devices, play with it a bit
        # length
        out = len(qdev)
        assert out == 10, "Length of qdev is incorrect: %s != %s" % (out, 10)

        # compare
        qdev2 = self.create_qdev('vm1')
        self.assertNotEqual(qdev, qdev2, "This qdev matches empty one:"
                            "\n%s\n%s" % (qdev, qdev2))
        self.assertNotEqual(qdev2, qdev, "Empty qdev matches current one:"
                            "\n%s\n%s" % (qdev, qdev2))
        # Same number of devices, but different ones
        for _ in range(10):
            qdev2.insert(qdevice())
        self.assertNotEqual(qdev, qdev2, "This qdev matches different one:"
                            "\n%s\n%s" % (qdev, qdev2))
        self.assertNotEqual(qdev2, qdev, "Other qdev matches this one:\n%s\n%s"
                            % (qdev, qdev2))

        # cmdline
        exp = ("-machine pc -device HBA,id=hba1,addr=0a,bus=pci.0 -device dev "
               "-device dev -device dev")
        out = qdev.cmdline()
        self.assertEqual(out, exp, 'Corrupted qdev.cmdline() output:\n%s\n%s'
                         % (out, exp))

        # get_by_qid (exactly one device with qid "hba1" is expected)
        out = qdev.get_by_qid('hba1')
        self.assertEqual(len(out), 1, 'Incorrect number of devices by qid '
                         '"hba1": %s != 1\n%s' % (len(out), qdev.str_long()))

        # Remove some devices
        # Remove based on aid
        out = qdev.remove('__6')
        self.assertEqual(out, None, 'Failed to remove device:\n%s\nRepr:\n%s'
                         % ('__6', qdev.str_long()))

        # Remove device which contains other devices (without recursive)
        self.assertRaises(qcontainer.DeviceRemoveError, qdev.remove, 'hba1',
                          False)

        # Remove device which contains other devices (recursive)
        out = qdev.remove('hba1')
        self.assertEqual(out, None, 'Failed to remove device:\n%s\nRepr:\n%s'
                         % ('hba1', qdev.str_long()))

        # Check the representation
        exp = ("Devices of vm1: [t'machine',t'i440FX',t'PIIX4_PM',t'PIIX3',"
               "t'piix3-ide',t'fdc']")
        out = qdev.str_short()
        assert out == exp, ("Short representation is corrupted:\n%s\n%s"
                            % (out, exp))
        exp = ("Buses of vm1\n"
               " floppy(floppy): [None,None]\n"
               " ide(ide): [None,None,None,None]\n"
               " _PCI_CHASSIS_NR(None): {}\n"
               " _PCI_CHASSIS(None): {}\n"
               " pci.0(PCI): {00-00:t'i440FX',01-00:t'PIIX3',"
               "01-01:t'piix3-ide',01-03:t'PIIX4_PM'}")
        out = qdev.str_bus_short()
        assert out == exp, ('Bus representation is corrupted:\n%s\n%s'
                            % (out, exp))

    def test_qdev_hotplug(self):
        """
        Test the hotplug/unplug functionality

        The hotplug/unplug and verify methods of the devices are replaced by
        lambdas, so only the bookkeeping of the container (state and number
        of devices) is exercised.
        """
        qdev = self.create_qdev('vm1', False, True)
        devs = qdev.machine_by_params(ParamsDict({'machine_type': 'pc'}))
        for dev in devs:
            qdev.insert(dev)
        monitor = MockHMPMonitor()

        out = qdev.get_state()
        assert out == -1, ("Status after init is not -1"
                           " (%s)" % out)
        out = len(qdev)
        assert out == 6, "Number of devices of this VM is not 6 (%s)" % out

        dev1, dev2 = qdev.images_define_by_variables('disk', '/tmp/a',
                                                     fmt="virtio")

        out = dev1.hotplug_hmp()
        exp = "drive_add auto id=drive_disk,if=none,file=/tmp/a"
        assert out == exp, ("Hotplug command of drive is incorrect:\n%s\n%s"
                            % (exp, out))

        # hotplug of drive will return " OK" (pass)
        dev1.hotplug = lambda _monitor: "OK"
        dev1.verify_hotplug = lambda _out, _monitor: True
        out, ver_out = qdev.simple_hotplug(dev1, monitor)
        assert out == "OK", "Return value of hotplug is not OK (%s)" % out
        assert ver_out is True, ("Return value of hotplug"
                                 " is not True (%s)" % ver_out)
        out = qdev.get_state()
        assert out == 0, ("Status after verified hotplug is not 0 (%s)" % out)

        # hotplug of virtio-blk-pci will return ""
        out = dev2.hotplug_hmp()
        exp = "device_add virtio-blk-pci,id=disk,drive=drive_disk"
        assert out == exp, ("Hotplug command of device is incorrect:\n%s\n%s"
                            % (exp, out))
        dev2.hotplug = lambda _monitor: ""
        dev2.verify_hotplug = lambda _out, _monitor: ""
        out, ver_out = qdev.simple_hotplug(dev2, monitor)
        # automatic verification is not supported, hotplug returns the original
        # monitor message ("")
        assert ver_out == "", ('Return value of hotplug verification is'
                               ' not "" (%s)' % ver_out)
        assert out == "", 'Return value of hotplug is not "" (%s)' % out
        out = qdev.get_state()
        assert out == 1, ("Status after verified hotplug is not 1 (%s)" % out)
        qdev.hotplug_verified()
        out = qdev.get_state()
        assert out == 0, ("Status after verified hotplug is not 0 (%s)" % out)

        out = len(qdev)
        assert out == 8, "Number of devices of this VM is not 8 (%s)" % out

        # Hotplug is expected to pass but monitor reports failure
        dev3 = qdevices.QDrive('a_dev1')
        dev3.hotplug = lambda _monitor: ("could not open disk image /tmp/qqq: "
                                         "No such file or directory")

        out, ver_out = qdev.simple_hotplug(dev3, monitor)
        exp = "could not open disk image /tmp/qqq: No such file or directory"
        # Compare against the expected message (the hotplug output is
        # returned as-is, see the "OK" and "" checks above)
        assert out == exp, ("Return value of hotplug is incorrect:\n%s\n%s"
                            % (out, exp))
        out = qdev.get_state()
        assert out == 1, ("Status after failed hotplug is not 1 (%s)" % out)
        # device is still in qdev, but is not in qemu, we should remove it
        qdev.remove(dev3, recursive=False)
        out = qdev.get_state()
        assert out == 1, ("Status after verified hotplug is not 1 (%s)" % out)
        qdev.hotplug_verified()
        out = qdev.get_state()
        assert out == 0, ("Status after verified hotplug is not 0 (%s)" % out)

        # Hotplug is expected to fail, qdev should stay unaffected
        dev4 = qdevices.QBaseDevice("bad_dev", parent_bus={'type': "XXX"})
        dev4.hotplug = lambda _monitor: ("")
        self.assertRaises(qcontainer.DeviceHotplugError, qdev.simple_hotplug,
                          dev4, True)
        out = qdev.get_state()
        assert out == 0, "Status after impossible hotplug is not 0 (%s)" % out

        # Unplug
        # Unplug used drive (automatic verification not supported)
        out = dev1.unplug_hmp()
        exp = "drive_del drive_disk"
        assert out == exp, ("Unplug command of drive is incorrect:\n%s\n%s"
                            % (exp, out))
        dev1.unplug = lambda _monitor: ""
        dev1.verify_unplug = lambda _monitor, _out: ""
        out, ver_out = qdev.simple_unplug(dev1, monitor)
        # I verified, that device was unplugged successfully
        qdev.hotplug_verified()
        out = qdev.get_state()
        assert out == 0, ("Status after verified hotplug is not 0 (%s)" % out)
        out = len(qdev)
        assert out == 7, "Number of devices of this VM is not 7 (%s)" % out
        # Removal of drive should also set drive of the disk device to None
        out = dev2.get_param('drive')
        assert out is None, "Drive was not removed from disk device"

    # pylint: disable=W0212
    def test_qdev_low_level(self):
        """
        Test low level functions

        Covers the state counter, aid assignment, the has_*/get_help_text
        queries answered from the recorded qemu outputs and the bus lookup
        helpers.
        """
        qdev = self.create_qdev('vm1')

        # Representation state (used for hotplug or other nasty things)
        out = qdev.get_state()
        assert out == -1, "qdev state is incorrect %s != %s" % (out, -1)

        qdev.set_dirty()
        out = qdev.get_state()
        self.assertEqual(out, 1, "qdev state is incorrect %s != %s" % (out, 1))

        qdev.set_dirty()
        out = qdev.get_state()
        self.assertEqual(out, 2, "qdev state is incorrect %s != %s" % (out, 2))

        qdev.set_clean()
        out = qdev.get_state()
        self.assertEqual(out, 1, "qdev state is incorrect %s != %s" % (out, 1))

        qdev.set_clean()
        out = qdev.get_state()
        self.assertEqual(out, 0, "qdev state is incorrect %s != %s" % (out, 0))

        qdev.reset_state()
        out = qdev.get_state()
        assert out == -1, "qdev state is incorrect %s != %s" % (out, -1)

        # __create_unique_aid
        dev = qdevices.QDevice()
        qdev.insert(dev)
        out = dev.get_aid()
        self.assertEqual(out, '__0', "incorrect aid %s != %s" % (out, '__0'))

        dev = qdevices.QDevice(None, {'id': 'qid'})
        qdev.insert(dev)
        out = dev.get_aid()
        self.assertEqual(out, 'qid', "incorrect aid %s != %s" % (out, 'qid'))

        # has_option
        out = qdev.has_option('device')
        self.assertEqual(out, True)

        out = qdev.has_option('missing_option')
        self.assertEqual(out, False)

        # has_device
        out = qdev.has_device('ide-drive')
        self.assertEqual(out, True)

        out = qdev.has_device('missing_device')
        self.assertEqual(out, False)

        # get_help_text
        out = qdev.get_help_text()
        self.assertEqual(out, QEMU_HELP)

        # has_hmp_cmd
        self.assertTrue(qdev.has_hmp_cmd('pcie_aer_inject_error'))
        self.assertTrue(qdev.has_hmp_cmd('c'))
        self.assertTrue(qdev.has_hmp_cmd('cont'))
        self.assertFalse(qdev.has_hmp_cmd('off'))
        self.assertFalse(qdev.has_hmp_cmd('\ndump-guest-memory'))
        self.assertFalse(qdev.has_hmp_cmd('The'))

        # has_qmp_cmd
        self.assertTrue(qdev.has_qmp_cmd('device_add'))
        self.assertFalse(qdev.has_qmp_cmd('RAND91'))

        # Add some buses
        bus1 = qbuses.QPCIBus('pci.0', 'pci', 'a_pci0')
        qdev.insert(qdevices.QDevice(params={'id': 'pci0'},
                                     child_bus=bus1))
        bus2 = qbuses.QPCIBus('pci.1', 'pci', 'a_pci1')
        qdev.insert(qdevices.QDevice(child_bus=bus2))
        bus3 = qbuses.QPCIBus('pci.2', 'pci', 'a_pci2')
        qdev.insert(qdevices.QDevice(child_bus=bus3))
        bus4 = qbuses.QPCIBus('pcie.0', 'pcie', 'a_pcie0')
        qdev.insert(qdevices.QDevice(child_bus=bus4))

        # get_buses (all buses of this type)
        out = qdev.get_buses({'type': 'pci'})
        self.assertEqual(len(out), 3, 'get_buses should return 3 buses but '
                         'returned %s instead:\n%s' % (len(out), out))

        # get_first_free_bus (last added bus of this type)
        out = qdev.get_first_free_bus({'type': 'pci'}, [None, None])
        self.assertEqual(bus3, out)

        # fill the first pci bus
        for _ in range(32):
            qdev.insert(qdevices.QDevice(parent_bus={'type': 'pci'}))

        # get_first_free_bus (last one is full, return the previous one)
        out = qdev.get_first_free_bus({'type': 'pci'}, [None, None])
        self.assertEqual(bus2, out)

        # list_named_buses
        out = qdev.list_missing_named_buses('pci.', 'pci', 5)
        self.assertEqual(len(out), 2, 'Number of missing named buses is '
                         'incorrect: %s != %s\n%s' % (len(out), 2, out))
        out = qdev.list_missing_named_buses('pci.', 'abc', 5)
        self.assertEqual(len(out), 5, 'Number of missing named buses is '
                         'incorrect: %s != %s\n%s' % (len(out), 5, out))

        # idx_of_next_named_bus
        out = qdev.idx_of_next_named_bus('pci.')
        self.assertEqual(out, 3, 'Incorrect idx of next named bus: %s !='
                         ' %s' % (out, 3))

        # get_children
        dev = qdevices.QDevice(parent_bus={'aobject': 'a_pci0'})
        bus = qbuses.QPCIBus('test1', 'test', 'a_test1')
        dev.add_child_bus(bus)
        bus = qbuses.QPCIBus('test2', 'test', 'a_test2')
        dev.add_child_bus(bus)
        qdev.insert(dev)
        qdev.insert(qdevices.QDevice(parent_bus={'aobject': 'a_test1'}))
        qdev.insert(qdevices.QDevice(parent_bus={'aobject': 'a_test2'}))
        out = dev.get_children()
        assert len(out) == 2, ("Not all children were listed %d != 2:\n%s"
                               % (len(out), out))

        # bus still refers to the second child bus ('test2') of dev
        out = bus.get_device()
        assert out == dev, ("bus.get_device() returned different device "
                            "than the one in which it was plugged:\n"
                            "%s\n%s\n%s" % (out.str_long(), dev.str_long(),
                                            qdev.str_long()))

    def test_qdev_equal(self):
        """
        Test the comparison of two device containers

        qdev1/qdev2 do not allow hotplugged VMs, qdev3 does.
        """
        qdev1 = self.create_qdev('vm1', allow_hotplugged_vm='no')
        qdev2 = self.create_qdev('vm1', allow_hotplugged_vm='no')
        qdev3 = self.create_qdev('vm1', allow_hotplugged_vm='yes')
        monitor = MockHMPMonitor()

        assert qdev1 == qdev2, ("Init qdevs are not alike\n%s\n%s"
                                % (qdev1.str_long(), qdev2.str_long()))

        # Insert a device to qdev1
        dev = qdevices.QDevice('dev1', {'id': 'dev1'})
        qdev1.insert(dev)

        assert qdev1 != qdev2, ("Different qdevs match:\n%s\n%s"
                                % (qdev1.str_long(), qdev2.str_long()))

        # Insert similar device to qdev2
        dev = qdevices.QDevice('dev1', {'id': 'dev1'})
        qdev2.insert(dev)

        assert qdev1 == qdev2, ("Similar qdevs are not alike\n%s\n%s"
                                % (qdev1.str_long(), qdev2.str_long()))

        # Hotplug similar device to qdev3
        dev = qdevices.QDevice('dev1', {'id': 'dev1'})
        dev.hotplug = lambda _monitor: ""   # override the hotplug method
        dev.verify_hotplug = lambda _out, _monitor: True
        qdev3.simple_hotplug(dev, monitor)
        assert qdev1 == qdev3, ("Similar hotplugged qdevs are not alike\n%s\n"
                                "%s" % (qdev1.str_long(), qdev3.str_long()))

        # Eq. is not symmetrical, qdev1 doesn't allow hotplugged VMs.
        assert qdev3 != qdev1, ("Similar hotplugged qdevs match even though "
                                "qdev1 doesn't allow hotplugged VM\n%s\n%s"
                                % (qdev1.str_long(), qdev3.str_long()))

        # NOTE(review): inside this class the name below is mangled to
        # _Container__qemu_help, so the assignment only adds a new attribute
        # to qdev2. The assertion expects equality although its message talks
        # about matching different qemu versions - confirm the intended check
        # against DevContainer before changing either.
        qdev2.__qemu_help = "I support only this :-)"  # pylint: disable=W0212
        assert qdev1 == qdev2, ("qdevs of different qemu versions match:\n%s\n"
                                "%s" % (qdev1.str_long(), qdev2.str_long()))

    def test_pci(self):
        """
        Test the cmdline generated for nested pci controllers
        """
        qdev = self.create_qdev('vm1')
        devs = qdev.machine_by_params(ParamsDict({'machine_type': 'pc'}))
        for dev in devs:
            qdev.insert(dev)
        # machine creates main pci (pci.0)
        # buses root.1 pci_switch pci_bridge
        # root.1: ioh3420(pci.0)
        # pci_switch: x3130(root.1)
        # pci_bridge: pci-bridge(root.1)
        devs = qdev.pcic_by_params('root.1', {'pci_bus': 'pci.0',
                                              'type': 'ioh3420'})
        qdev.insert(devs)
        devs = qdev.pcic_by_params('pci_switch', {'pci_bus': 'root.1',
                                                  'type': 'x3130'})
        qdev.insert(devs)
        devs = qdev.pcic_by_params('pci_bridge', {'pci_bus': 'root.1',
                                                  'type': 'pci-bridge'})
        qdev.insert(devs)

        # One ahci per target bus (three of them into the switch)
        qdev.insert(qdevices.QDevice("ahci", {'id': 'in_bridge'},
                                     parent_bus={'type': ('PCI', 'PCIE'),
                                                 'aobject': 'pci_bridge'}))
        qdev.insert(qdevices.QDevice("ahci", {'id': 'in_switch1'},
                                     parent_bus={'type': ('PCI', 'PCIE'),
                                                 'aobject': 'pci_switch'}))
        qdev.insert(qdevices.QDevice("ahci", {'id': 'in_switch2'},
                                     parent_bus={'type': ('PCI', 'PCIE'),
                                                 'aobject': 'pci_switch'}))
        qdev.insert(qdevices.QDevice("ahci", {'id': 'in_switch3'},
                                     parent_bus={'type': ('PCI', 'PCIE'),
                                                 'aobject': 'pci_switch'}))
        qdev.insert(qdevices.QDevice("ahci", {'id': 'in_root1'},
                                     parent_bus={'type': ('PCI', 'PCIE'),
                                                 'aobject': 'root.1'}))
        qdev.insert(qdevices.QDevice("ahci", {'id': 'in_pci.0'},
                                     parent_bus={'type': ('PCI', 'PCIE'),
                                                 'aobject': 'pci.0'}))

        exp = ("-machine pc -device ioh3420,id=root.1,bus=pci.0,addr=02 "
               "-device x3130-upstream,id=pci_switch,bus=root.1,addr=00 "
               "-device pci-bridge,id=pci_bridge,bus=root.1,addr=01,"
               "chassis_nr=1 -device ahci,id=in_bridge,bus=pci_bridge,addr=01"
               " -device xio3130-downstream,bus=pci_switch,id=pci_switch.0,"
               "addr=00,chassis=1 -device ahci,id=in_switch1,bus=pci_switch.0"
               ",addr=00 "
               "-device xio3130-downstream,bus=pci_switch,id=pci_switch.1,"
               "addr=01,chassis=2 -device ahci,id=in_switch2,bus=pci_switch.1"
               ",addr=00 "
               "-device xio3130-downstream,bus=pci_switch,id=pci_switch.2,"
               "addr=02,chassis=3 -device ahci,id=in_switch3,bus=pci_switch.2"
               ",addr=00 "
               "-device ahci,id=in_root1,bus=root.1,addr=02 "
               "-device ahci,id=in_pci.0,bus=pci.0,addr=03")
        out = qdev.cmdline()
        assert out == exp, (out, exp)
# Run all the test cases of this module when it is executed as a script
if __name__ == "__main__":
    unittest.main()