"""
Autotest representations of qemu buses.
These classes emulate the usual behavior of qemu buses in order to create
or match the autotest params into the qemu qdev structure.
:copyright: 2012-2013 Red Hat Inc.
"""
# Autotest imports
import qdevices
from utils import none_or_int
#
# Bus representations
# HDA, I2C, IDE, ISA, PCI, SCSI, System, uhci, ehci, ohci, xhci, ccid,
# virtio-serial-bus
#
class QSparseBus(object):

    """
    Universal bus representation object.

    It creates an abstraction of the way buses work in qemu. Records are
    kept in ``self.bus``, a dict keyed by the stored address; a value is
    either a device object or the string "reserved".

    You can insert and remove a device at a certain address, let the bus
    assign the first free address, or reserve a slot. The order of addr_spec
    does matter since the last item is incremented first.

    There are 3 different address representations used:

    stor_addr
        stored address representation '$first-$second-...-$ZZZ'
    addr
        internal address representation [$first, $second, ..., $ZZZ]
    device_addr
        qemu address stored into separate device params (bus, port)
        device{$param1:$first, $param2:$second, ..., $paramZZZ, $ZZZ}

    :note: When you insert a device, its properties might be updated (addr,..)
    """

    def __init__(self, bus_item, addr_spec, busid, bus_type=None, aobject=None,
                 atype=None):
        """
        :param bus_item: Name of the parameter which specifies bus (bus)
        :type bus_item: str
        :param addr_spec: Bus address specification [names][lengths]
        :type addr_spec: list of lists
        :param busid: id of the bus (pci.0)
        :type busid: str
        :param bus_type: type of the bus (pci)
        :type bus_type: str
        :param aobject: Related autotest object (image1)
        :type aobject: str
        :param atype: Autotest bus type
        :type atype: str
        """
        self.busid = busid
        self.type = bus_type
        self.aobject = aobject
        self.bus = {}                       # stor_addr -> device | "reserved"
        self.bus_item = bus_item            # bus param name
        self.addr_items = addr_spec[0]      # names of the address params
        self.addr_lengths = addr_spec[1]    # exclusive upper bound per item
        self.atype = atype
        self.__device = None                # device this bus belongs to
        self.first_port = [0] * len(addr_spec[0])   # lowest usable address

    def __str__(self):
        """ default string representation """
        return self.str_short()

    def __getitem__(self, item):
        """
        :param item: autotest id or QObject-like object
        :return: First matching object from this bus
        :raise KeyError: In case no match was found
        """
        if isinstance(item, qdevices.QBaseDevice):
            if item in self.bus.values():
                return item
        else:
            for device in self.bus.values():
                # Reserved slots are plain strings and have no autotest id
                if isinstance(device, str):
                    continue
                if device.get_aid() == item:
                    return device
        raise KeyError("Device %s is not in %s" % (item, self))

    def get(self, item):
        """
        :param item: autotest id or QObject-like object
        :return: First matching object from this bus or None
        """
        if item in self:
            return self[item]
        return None

    def __delitem__(self, item):
        """
        Remove device from bus

        :param item: autotest id or QObject-like object
        :raise KeyError: In case no match was found
        """
        self.remove(self[item])

    def __len__(self):
        """ :return: Number of records (devices and reservations) """
        return len(self.bus)

    def __contains__(self, item):
        """
        Is specified item in this bus?

        :param item: autotest id or QObject-like object
        :return: True - yes, False - no
        """
        if isinstance(item, qdevices.QBaseDevice):
            return item in self.bus.values()
        for device in self.bus.values():
            # Reserved slots are plain strings and have no autotest id
            if isinstance(device, str):
                continue
            if device.get_aid() == item:
                return True
        return False

    def __iter__(self):
        """ Iterate over all stored records. """
        return iter(self.bus.values())

    def str_short(self):
        """ short string representation """
        bus_type = self.atype if self.atype else self.type
        return "%s(%s): %s" % (self.busid, bus_type, self._str_devices())

    def _str_devices(self):
        """ short string representation of the stored records """
        records = ["%s:%s" % (addr, self.bus[addr])
                   for addr in sorted(self.bus)]
        return '{' + ','.join(records) + '}'

    def str_long(self):
        """ long string representation """
        bus_type = self.atype if self.atype else self.type
        return "Bus %s, type=%s\nSlots:\n%s" % (self.busid, bus_type,
                                                self._str_devices_long())

    def _str_devices_long(self):
        """ long string representation of the stored records """
        out = ""
        for addr, dev in self.bus.items():
            out += '%s< %4s >%s\n ' % ('-' * 15, addr,
                                       '-' * 15)
            if isinstance(dev, str):
                out += '"%s"\n ' % dev
            else:
                out += dev.str_long().replace('\n', '\n ')
                # trim the last three characters of the indented dump
                out = out[:-3]
            out += '\n'
        return out

    def _increment_addr(self, addr, last_addr=None):
        """
        Increment last_addr according to the addr pattern.

        Only the items which are None in the pattern are incremented; the
        last item is incremented first and overflows carry to the left.
        The given last_addr list is modified in place.

        :param addr: addr_pattern
        :param last_addr: previous address
        :return: last_addr + 1, or False when the address space is exhausted
        """
        if not last_addr:
            last_addr = [0] * len(self.addr_lengths)
        i = -1
        while True:
            if i < -len(self.addr_lengths):
                return False    # carried out of the left-most item
            if addr[i] is not None:
                i -= 1          # fixed by the pattern, never incremented
                continue
            last_addr[i] += 1
            if last_addr[i] < self.addr_lengths[i]:
                return last_addr
            last_addr[i] = 0    # overflow, carry to the next item
            i -= 1

    @staticmethod
    def _addr2stor(addr):
        """
        Converts internal addr to storable/hashable address

        :param addr: internal address [addr1, addr2, ...]
        :return: storable address "addr1-addr2-..."
        """
        parts = ['*' if value is None else '%s' % value for value in addr]
        if not parts:
            return "*"
        return '-'.join(parts)

    def _dev2addr(self, device):
        """
        Parse the internal address out of the device

        :param device: qdevices.QBaseDevice device
        :return: internal address [addr1, addr2, ...]
        """
        return [none_or_int(device.get_param(key)) for key in self.addr_items]

    def _set_first_addr(self, addr_pattern):
        """
        :param addr_pattern: Address pattern (full qualified or with Nones)
        :return: tuple(first address based on addr_pattern, use_reserved);
                 use_reserved is True only for fully specified patterns
        """
        if addr_pattern is None:
            addr_pattern = [None] * len(self.addr_lengths)
        # Work on a copy, the pattern itself has to stay untouched
        last_addr = addr_pattern[:]
        use_reserved = None not in last_addr
        if not use_reserved:    # Address is not fully specified
            for i, value in enumerate(last_addr):
                if value is None:
                    last_addr[i] = self.first_port[i]
        return last_addr, use_reserved

    def get_free_slot(self, addr_pattern):
        """
        Finds unoccupied address

        :param addr_pattern: Address pattern (full qualified or with Nones);
                             None stands for "any address"
        :return: First free address when found, (free or reserved for this dev)
                 None when no free address is found, (all occupied)
                 False in case of incorrect address (oor)
        """
        # The very same pattern is needed for incrementing below
        if addr_pattern is None:
            addr_pattern = [None] * len(self.addr_lengths)
        last_addr, use_reserved = self._set_first_addr(addr_pattern)
        # Check the addr_pattern ranges
        for i, length in enumerate(self.addr_lengths):
            if last_addr[i] < self.first_port[i] or last_addr[i] >= length:
                return False
        # Increment addr until free match is found
        while last_addr is not False:
            stor_addr = self._addr2stor(last_addr)
            if stor_addr not in self.bus:
                return last_addr
            if use_reserved and self.bus[stor_addr] == "reserved":
                return last_addr
            last_addr = self._increment_addr(addr_pattern, last_addr)
        return None     # No free matching address found

    def _check_bus(self, device):
        """
        Check, whether this device can be plugged into this bus.

        :param device: qdevices.QBaseDevice device
        :return: True in case ids are correct, False when not
        """
        wanted = device.get_param(self.bus_item)
        return not (wanted and wanted != self.busid)

    def _set_device_props(self, device, addr):
        """
        Set the full device address

        :param device: qdevices.QBaseDevice device
        :param addr: internal address [addr1, addr2, ...]
        """
        if self.bus_item:
            device.set_param(self.bus_item, self.busid)
        # addr is indexed lazily; it is never touched when addr_items is empty
        for i, name in enumerate(self.addr_items):
            device.set_param(name, addr[i])

    def _update_device_props(self, device, addr):
        """
        Update values of previously set address items.

        :param device: qdevices.QBaseDevice device
        :param addr: internal address [addr1, addr2, ...]
        """
        if device.get_param(self.bus_item) is not None:
            device.set_param(self.bus_item, self.busid)
        for i, name in enumerate(self.addr_items):
            if device.get_param(name) is not None:
                device.set_param(name, addr[i])

    def reserve(self, addr):
        """
        Reserve the slot

        :param addr: Desired address
        :type addr: internal [addr1, addr2, ..] or stor format "addr1-addr2-.."
        """
        if not isinstance(addr, str):
            addr = self._addr2stor(addr)
        self.bus[addr] = "reserved"

    def insert(self, device, strict_mode=False):
        """
        Insert device into this bus representation.

        :param device: qdevices.QBaseDevice device
        :param strict_mode: Use strict mode (set optional params)
        :return: list of added devices on success,
                 string indicating the failure on failure.
        """
        additional_devices = []
        if not self._check_bus(device):
            return "BusId"
        try:
            addr_pattern = self._dev2addr(device)
        except (ValueError, LookupError):
            return "BasicAddress"
        addr = self.get_free_slot(addr_pattern)
        if addr is None:
            if addr_pattern is None or None in addr_pattern:
                return "NoFreeSlot"
            return "UsedSlot"
        if addr is False:
            # Report the requested address, addr is only the False sentinel
            return "BadAddr(%s)" % (addr_pattern,)
        additional_devices.extend(self._insert(device,
                                               self._addr2stor(addr)))
        if strict_mode:     # Set full address in strict_mode
            self._set_device_props(device, addr)
        else:
            self._update_device_props(device, addr)
        return additional_devices

    def _insert(self, device, addr):
        """
        Store the device into the bus records

        :param device: qdevices.QBaseDevice device
        :param addr: stored address "addr1-addr2-..."
        :return: List of additional devices
        """
        self.bus[addr] = device
        return []

    def remove(self, device):
        """
        Remove device from this bus

        :param device: qdevices.QBaseDevice device
        :return: True when removed, False when the device wasn't found
        """
        if device in self.bus.values():
            # Only the very same object is removed, not an equal one
            for key, item in self.bus.items():
                if item is device:
                    del self.bus[key]
                    return True
        return False

    def set_device(self, device):
        """ Set the device in which this bus belongs """
        self.__device = device

    def get_device(self):
        """ Get device in which this bus is present """
        return self.__device

    def match_bus(self, bus_spec, type_test=True):
        """
        Check if the bus matches the bus_specification.

        :param bus_spec: Bus specification
        :type bus_spec: dict
        :param type_test: When set and bus_spec contains 'type', a matching
                          type alone is sufficient
        :type type_test: bool
        :return: True when the bus matches the specification
        :rtype: bool
        """
        if type_test and bus_spec.get('type'):
            wanted = bus_spec['type']
            if isinstance(wanted, (tuple, list)):
                return self.type in wanted
            if self.type == wanted:
                return True
        # Every item of the specification has to match the bus attributes
        for key, value in bus_spec.items():
            current = self.__dict__.get(key, None)
            if isinstance(value, (tuple, list)):
                if current not in value:
                    return False
            elif current != value:
                return False
        return True
class QStrictCustomBus(QSparseBus):

    """
    Variant of QSparseBus which always writes the full address into the
    device and which allows the caller to override the first usable port.
    """

    def __init__(self, bus_item, addr_spec, busid, bus_type=None, aobject=None,
                 atype=None, first_port=None):
        """
        Accepts the same arguments as QSparseBus plus first_port.

        :param first_port: lowest usable address; the QSparseBus default is
                           kept when a false value is given
        """
        super(QStrictCustomBus, self).__init__(bus_item, addr_spec, busid,
                                               bus_type=bus_type,
                                               aobject=aobject, atype=atype)
        if not first_port:
            return
        self.first_port = first_port

    def _update_device_props(self, device, addr):
        """ Always set the full address, even outside of the strict mode """
        self._set_device_props(device, addr)
class QNoAddrCustomBus(QSparseBus):

    """
    This is the opposite of QStrictCustomBus. Even when addr is set it's not
    updated in the device's params.
    """

    def _set_device_props(self, device, addr):
        """ Intentionally leave the device params untouched """

    def _update_device_props(self, device, addr):
        """ Intentionally leave the device params untouched """
class QUSBBus(QSparseBus):

    """
    USB bus representation including usb-hub handling.
    """

    def __init__(self, length, busid, bus_type, aobject=None,
                 port_prefix=None):
        """
        Bus type have to be generalized and parsed from original bus type:
        (usb-ehci == ehci, ich9-usb-uhci1 == uhci, ...)

        :param length: number of ports of this bus
        :param busid: id of the bus
        :param bus_type: original type of the bus (usb-ehci, ...)
        :param aobject: Related autotest object
        :param port_prefix: port of the hub this bus belongs to ("2.1");
                            None or "" stands for the root bus
        """
        # There are various usb devices for the same bus type, use only portion
        for bus in ('uhci', 'ehci', 'ohci', 'xhci'):
            if bus in bus_type:
                bus_type = bus
                break
        # Usb ports are counted from 1 so the length have to be +1
        super(QUSBBus, self).__init__('bus', [['port'], [length + 1]], busid,
                                      bus_type, aobject)
        # Normalized so the root bus is always represented by ""
        self.__port_prefix = port_prefix or ""
        self.__length = length
        self.first_port = [1]

    def _check_bus(self, device):
        """ Check port prefix in order to match addresses in usb-hubs """
        if not super(QUSBBus, self)._check_bus(device):
            return False
        port = device.get_param('port')     # 2.1.6
        if port or port == 0:   # If port is specified
            port = str(port)
            idx = port.rfind('.')
            if idx != -1:   # Strip last number and compare with port_prefix
                return port[:idx] == self.__port_prefix
            # Port is number, match only root usb bus
            if self.__port_prefix:
                return False
        return True

    def _dev2addr(self, device):
        """
        Parse the internal address out of the device

        :param device: qdevices.QBaseDevice device
        :return: internal address [addr1, addr2, ...]
        :raise ValueError: when the port can't be converted to a number
        """
        value = device.get_param('port')
        if value is None:
            return [None]
        value = str(value)
        if self.__port_prefix:
            # Strip "$prefix." and keep only the port on this hub
            value = value[len(self.__port_prefix) + 1:]
        return [int(value)]

    def __hook_child_bus(self, device, addr):
        """ If this is usb-hub, add child bus """
        # only usb hub needs customization
        if device.get_param('driver') != 'usb-hub':
            return
        # Replace any previous usb child bus with one using the new prefix
        _bus = [_ for _ in device.child_bus if not isinstance(_, QUSBBus)]
        _bus.append(QUSBBus(8, self.busid, self.type, device.get_aid(),
                            str(addr[0])))
        device.child_bus = _bus

    def _set_device_props(self, device, addr):
        """ in case this is usb-hub update the child port_prefix """
        if addr[0] or addr[0] == 0:
            if self.__port_prefix:
                addr = ['%s.%s' % (self.__port_prefix, addr[0])]
        self.__hook_child_bus(device, addr)
        super(QUSBBus, self)._set_device_props(device, addr)

    def _update_device_props(self, device, addr):
        """ Always set the full address (incl. the usb-hub handling) """
        self._set_device_props(device, addr)
class QDriveBus(QSparseBus):

    """
    QDrive bus representation (single slot, drive=...)
    """

    def __init__(self, busid, aobject=None):
        """
        :param busid: id of the bus (pci.0)
        :param aobject: Related autotest object (image1)
        """
        no_addr_spec = [[], []]     # this bus has no address items
        super(QDriveBus, self).__init__('drive', no_addr_spec, busid,
                                        'QDrive', aobject)

    def get_free_slot(self, addr_pattern):
        """
        The only slot is 'drive'

        :return: True when the slot is free, None when it's occupied
        """
        return None if 'drive' in self.bus else True

    @staticmethod
    def _addr2stor(addr):
        """ Every address is stored as 'drive' """
        return 'drive'

    def _update_device_props(self, device, addr):
        """
        Always set -drive property, it's mandatory. Also for hotplug purposes
        store this bus device into hook variable of the device.
        """
        self._set_device_props(device, addr)
        if not hasattr(device, 'hook_drive_bus'):
            return
        device.hook_drive_bus = self.get_device()
class QDenseBus(QSparseBus):

    """
    Dense bus representation. The only difference from SparseBus is the output
    string format. DenseBus iterates over all addresses and shows free slots
    too. SparseBus on the other hand prints only the stored addresses.
    """

    def _iter_slots(self):
        """
        Walk through all addresses of this bus starting with the first one.

        :note: the yielded list is updated in place by _increment_addr, use
               it before asking for the next one
        """
        pattern = [None] * len(self.addr_items)
        slot = self._set_first_addr(pattern)[0]
        while slot:
            yield slot
            slot = self._increment_addr(pattern, slot)

    def _str_devices_long(self):
        """ Show all addresses even when they are unused """
        chunks = []
        for slot in self._iter_slots():
            stor_addr = self._addr2stor(slot)
            record = self.bus.get(stor_addr)
            chunk = '%s< %4s >%s\n ' % ('-' * 15, stor_addr, '-' * 15)
            if hasattr(record, 'str_long'):
                chunk += record.str_long().replace('\n', '\n ')
                chunk = chunk[:-3]
            elif isinstance(record, str):
                chunk += '"%s"' % record
            else:
                chunk += "%s" % record
            chunks.append(chunk + '\n')
        return "".join(chunks)

    def _str_devices(self):
        """ Show all addresses even when they are unused, don't print addr """
        records = ["%s" % self.bus.get(self._addr2stor(slot))
                   for slot in self._iter_slots()]
        return '[' + ','.join(records) + ']'
class QPCIBus(QSparseBus):

    """
    PCI Bus representation (bus&addr, uses hex digits)
    """

    def __init__(self, busid, bus_type, aobject=None, length=32, first_port=0):
        """
        Address consists of addr (length slots, 32 by default) and func (8).

        :param busid: id of the bus
        :param bus_type: type of the bus
        :param aobject: Related autotest object
        :param length: number of slots
        :param first_port: first usable slot
        """
        addr_spec = [['addr', 'func'], [length, 8]]
        super(QPCIBus, self).__init__('bus', addr_spec, busid, bus_type,
                                      aobject)
        self.first_port = (first_port, 0)

    @staticmethod
    def _addr2stor(addr):
        """ Same as the QSparseBus one, values are 2-digit hexadecimals """
        parts = ['*' if value is None else '%02x' % value for value in addr]
        if not parts:
            return "*"
        return '-'.join(parts)

    def _dev2addr(self, device):
        """ Read the values in base of 16 (hex) """
        addr = device.get_param('addr')
        if isinstance(addr, int):     # only addr
            return [addr, 0]
        if not addr:    # not defined
            return [None, 0]
        if isinstance(addr, str):     # addr or addr.func
            addr = [int(item, 16) for item in addr.split('.', 1)]
            if len(addr) < 2:   # only addr
                addr.append(0)
        return addr

    def _set_device_props(self, device, addr):
        """ Convert addr to the format used by qtree """
        device.set_param(self.bus_item, self.busid)
        orig_addr = device.get_param('addr')
        # Keep the function part when it was specified or when it's not 0
        had_func = isinstance(orig_addr, str) and '.' in orig_addr
        if addr[1] or had_func:
            new_addr = '%02x.%x' % (addr[0], addr[1])
        else:
            new_addr = '%02x' % (addr[0])
        device.set_param('addr', new_addr)

    def _update_device_props(self, device, addr):
        """ Always set properties """
        self._set_device_props(device, addr)

    def _increment_addr(self, addr, last_addr=None):
        """ Don't use multifunction address by default """
        if addr[1] is None:
            # Pin func to 0 in the pattern so only addr gets incremented
            addr[1] = 0
        return super(QPCIBus, self)._increment_addr(addr, last_addr)
class QPCISwitchBus(QPCIBus):

    """
    PCI Switch bus representation (creates downstream device while inserting
    a device).
    """

    def __init__(self, busid, bus_type, downstream_type, aobject=None):
        """
        :param busid: id of the bus
        :param bus_type: type of the bus
        :param downstream_type: driver used for the downstream port devices
        :param aobject: Related autotest object
        """
        super(QPCISwitchBus, self).__init__(busid, bus_type, aobject)
        self.__downstream_ports = {}    # stored addr -> downstream QPCIBus
        self.__downstream_type = downstream_type

    def add_downstream_port(self, addr):
        """
        Add downstream port of the certain address

        :param addr: hexadecimal address (str)
        :return: the new downstream device, None when the port already exists
        """
        if addr in self.__downstream_ports:
            return None
        bus_id = "%s.%s" % (self.busid, int(addr, 16))
        bus = QPCIBus(bus_id, 'PCIE', bus_id)
        self.__downstream_ports[addr] = bus
        port_params = {'id': bus_id,
                       'bus': self.busid,
                       'addr': addr}
        return qdevices.QDevice(self.__downstream_type, port_params,
                                aobject=self.aobject,
                                parent_bus={'busid': '_PCI_CHASSIS'},
                                child_bus=bus)

    def _insert(self, device, addr):
        """
        Instead of the device inserts the downstream port. The device is
        inserted later during _set_device_props into this downstream port.
        """
        port_addr = addr.split('-')[0]
        added_devices = []
        downstream = self.add_downstream_port(port_addr)
        if downstream is not None:
            added_devices.append(downstream)
            added_devices.extend(
                super(QPCISwitchBus, self)._insert(downstream, addr))
        # Point the device to the bus of the downstream port
        device['bus'] = "%s.%s" % (self.busid, int(port_addr, 16))
        return added_devices

    def _set_device_props(self, device, addr):
        """
        Instead of setting the addr this insert the device into the
        downstream port.
        """
        port_bus = self.__downstream_ports['%02x' % addr[0]]
        port_bus.insert(device)
class QSCSIBus(QSparseBus):

    """
    SCSI bus representation (bus + 2 levels, don't iterate over lun by default)
    """

    def __init__(self, busid, bus_type, addr_spec, aobject=None, atype=None):
        """
        :param busid: id of the bus (mybus.0)
        :param bus_type: type of the bus (virtio-scsi-pci, lsi53c895a, ...)
        :param addr_spec: Ranges of addr_spec [scsiid_range, lun_range]
        :param aobject: Related autotest object (image1)
        :param atype: Autotest bus type
        :type atype: str
        """
        full_spec = [['scsi-id', 'lun'], addr_spec]
        super(QSCSIBus, self).__init__('bus', full_spec, busid, bus_type,
                                       aobject, atype)

    def _increment_addr(self, addr, last_addr=None):
        """
        Qemu doesn't increment lun automatically so don't use it when
        it's not explicitly specified.
        """
        if addr[1] is None:
            # Pin lun to 0 in the pattern so only scsi-id gets incremented
            addr[1] = 0
        return super(QSCSIBus, self)._increment_addr(addr, last_addr)
class QBusUnitBus(QDenseBus):

    """ Implementation of bus-unit bus (ahci, ide) """

    def __init__(self, busid, bus_type, lengths, aobject=None, atype=None):
        """
        :param busid: id of the bus (mybus.0)
        :type busid: str
        :param bus_type: type of the bus (ahci)
        :type bus_type: str
        :param lengths: lengths of [buses, units]
        :type lengths: list
        :param aobject: Related autotest object (image1)
        :type aobject: str
        :param atype: Autotest bus type
        :type atype: str
        :raise ValueError: when lengths doesn't contain exactly 2 items
        """
        if len(lengths) != 2:
            # Don't format self here, it's not initialized yet
            raise ValueError("len(lengths) has to be 2 (%s)" % (lengths,))
        super(QBusUnitBus, self).__init__('bus', [['bus', 'unit'], lengths],
                                          busid, bus_type, aobject, atype)

    def _update_device_props(self, device, addr):
        """ Always set the properties """
        return self._set_device_props(device, addr)

    def _set_device_props(self, device, addr):
        """This bus is compound of m-buses + n-units, set properties """
        device.set_param('bus', "%s.%s" % (self.busid, addr[0]))
        device.set_param('unit', addr[1])

    def _check_bus(self, device):
        """ This bus is compound of m-buses + n-units, check correct busid """
        bus = device.get_param('bus')
        if isinstance(bus, str):
            bus = bus.rsplit('.', 1)
            if len(bus) == 2 and bus[0] != self.busid:  # aaa.3
                return False
            elif not bus[0].isdigit() and bus[0] != self.busid:     # aaa
                return False
        return True     # None, 5, '3'

    def _dev2addr(self, device):
        """ This bus is compound of m-buses + n-units, parse addr from dev """
        bus = None
        unit = None
        busid = device.get_param('bus')
        if isinstance(busid, str):
            if busid.isdigit():     # '3'
                bus = int(busid)
            else:   # aaa.3
                busid = busid.rsplit('.', 1)
                if len(busid) == 2 and busid[1].isdigit():
                    bus = int(busid[1])
        if isinstance(busid, int):
            bus = busid
        unit_param = device.get_param('unit')
        # Number 0 is a valid unit, only None/"" means "not specified"
        if unit_param or unit_param == 0:
            unit = int(unit_param)
        return [bus, unit]
class QAHCIBus(QBusUnitBus):

    """ AHCI bus (ich9-ahci, ahci) """

    def __init__(self, busid, aobject=None):
        """ 6xbus, 1xunit """
        lengths = [6, 1]
        super(QAHCIBus, self).__init__(busid, 'IDE', lengths, aobject=aobject,
                                       atype='ahci')
class QIDEBus(QBusUnitBus):

    """ IDE bus (piix3-ide) """

    def __init__(self, busid, aobject=None):
        """ 2xbus, 2xunit """
        lengths = [2, 2]
        super(QIDEBus, self).__init__(busid, 'IDE', lengths, aobject=aobject,
                                      atype='ide')
class QFloppyBus(QDenseBus):

    """
    Floppy bus (-global isa-fdc.drive?=$drive)
    """

    def __init__(self, busid, aobject=None):
        """ property <= [driveA, driveB] """
        super(QFloppyBus, self).__init__(None, [['property'], [2]], busid,
                                         'floppy', aobject)

    @staticmethod
    def _addr2stor(addr):
        """ translate as drive$CHAR """
        return "drive%s" % chr(65 + addr[0])  # 'A' + addr

    def _dev2addr(self, device):
        """
        Read None, number or drive$CHAR and convert it to the slot index.

        This is the inverse of _addr2stor, "driveA" maps to 0, "driveB" to 1.
        """
        addr = device.get_param('property')
        if isinstance(addr, str):
            if addr.startswith('drive') and len(addr) > 5:
                # Offset from 'A', the raw ord() would be out of range
                addr = ord(addr[5]) - ord('A')
            elif addr.isdigit():
                addr = int(addr)
        return [addr]

    def _update_device_props(self, device, addr):
        """ Always set props """
        self._set_device_props(device, addr)

    def _set_device_props(self, device, addr):
        """ Change value to drive{A,B,...} """
        device.set_param('property', self._addr2stor(addr))
class QOldFloppyBus(QDenseBus):

    """
    Floppy bus (-drive index=n)
    """

    def __init__(self, busid, aobject=None):
        """ Two slots addressed by the 'index' param, no bus param """
        addr_spec = [['index'], [2]]
        super(QOldFloppyBus, self).__init__(None, addr_spec, busid, 'floppy',
                                            aobject)

    def _update_device_props(self, device, addr):
        """ Always set props """
        self._set_device_props(device, addr)

    def _set_device_props(self, device, addr):
        """ Store the address in its stored (string) form as 'index' """
        stored = self._addr2stor(addr)
        device.set_param('index', stored)