Source code for virttest.libvirt_xml.vm_xml

"""
Module simplifying manipulation of XML described at
http://libvirt.org/formatdomain.html
"""

import logging
from autotest.client.shared import error
from virttest import xml_utils
from virttest.libvirt_xml import base, accessors, xcepts
from virttest.libvirt_xml.devices import librarian


[docs]class VMXMLDevices(list): """ List of device instances from classes handed out by librarian.get() """ @staticmethod def __type_check__(other): try: # Raise error if object isn't dict-like or doesn't have key device_tag = other['device_tag'] # Check that we have support for this type librarian.get(device_tag) except (AttributeError, TypeError, xcepts.LibvirtXMLError): # Required to always raise TypeError for list API in VMXML class raise TypeError("Unsupported item type: %s" % str(type(other))) def __setitem__(self, key, value): self.__type_check__(value) super(VMXMLDevices, self).__setitem__(key, value) return self
[docs] def append(self, value): self.__type_check__(value) super(VMXMLDevices, self).append(value) return self
[docs] def extend(self, iterable): # Make sure __type_check__ happens for item in iterable: self.append(item) return self
[docs] def by_device_tag(self, tag): result = VMXMLDevices() for device in self: if device.device_tag == tag: result.append(device) return result
[docs]class VMXMLBase(base.LibvirtXMLBase): """ Accessor methods for VMXML class properties (items in __slots__) Properties: hypervisor_type: string, hypervisor type name get: return domain's type attribute value set: change domain type attribute value del: raise xcepts.LibvirtXMLError vm_name: string, name of the vm get: return text value of name tag set: set text value of name tag del: raise xcepts.LibvirtXMLError uuid: string, uuid string for vm get: return text value of uuid tag set: set text value for (new) uuid tag (unvalidated) del: remove uuid tag vcpu, max_mem, current_mem, iothreads: integers get: returns integer set: set integer del: removes tag dumpcore: string, control guest OS memory dump get: return text value set: set 'on' or 'off' for guest OS memory dump del: removes tag numa_memory: dictionary get: return dictionary of numatune/memory attributes set: set numatune/memory attributes from dictionary del: remove numatune/memory tag numa_memnode: list dict of memnode attributes cellid, mode and nodeset get: return list of dictionary with numatune/memnode attributes set: set multiple numatune/memnode attributes from dictionary list del: remove numatune/memnode tag on_poweroff: string, action to take when the guest requests a poweroff get: returns text value of on_poweroff tag set: set test of on_poweroff tag del: remove on_poweroff tag on_reboot: string, action to take when the guest requests a reboot get: returns text value of on_reboot tag set: set test of on_reboot tag del: remove on_reboot tag on_crash: string, action to take when the guest crashes get: returns text value of on_crash tag set: set test of on_crash tag del: remove on_crash tag devices: VMXMLDevices (list-like) get: returns VMXMLDevices instance for all devices set: Define all devices from VMXMLDevices instance del: remove all devices cputune: VMCPUTuneXML get: return VMCPUTuneXML instance for the domain. set: Define cputune tag from a VMCPUTuneXML instance. del: remove cputune tag cpu: VMCPUXML get: return VMCPUXML instance for the domain. set: Define cpu tag from a VMCPUXML instance. del: remove cpu tag current_vcpu: string, 'current' attribute of vcpu tag get: return a string for 'current' attribute of vcpu set: change 'current' attribute of vcpu del: remove 'current' attribute of vcpu placement: string, 'placement' attribute of vcpu tag get: return a string for 'placement' attribute of vcpu set: change 'placement' attribute of vcpu del: remove 'placement' attribute of vcpu cpuset: string, 'cpuset' attribute of vcpu tag get: return a string for 'cpuset' attribute of vcpu set: change 'cpuset' attribute of vcpu del: remove 'cpuset' attribute of vcpu emulatorpin: string, cpuset value (see man virsh: cpulist) get: return text value of cputune/emulatorpin attributes set: set cputune/emulatorpin attributes from string del: remove cputune/emulatorpin tag features: VMFeaturesXML get: return VMFeaturesXML instances for the domain. set: define features tag from a VMFeaturesXML instances. del: remove features tag mem_backing: VMMemBackingXML get: return VMMemBackingXML instances for the domain. set: define memoryBacking tag from a VMMemBackingXML instances. del: remove memoryBacking tag max_mem_unit: string, 'unit' attribute of memory get: return text value of memory unit attribute set: set memory unit attribute del: remove memory unit attribute current_mem_unit: string, 'unit' attribute of memory get: return text value of current_memory unit attribute set: set current_memory unit attribute del: remove current_memory unit attribute memtune: VMMemTuneXML get: return VMMemTuneXML instance for the domain. set: Define memtune tag from a VMCPUTuneXML instance. del: remove memtune tag """ # Additional names of attributes and dictionary-keys instances may contain __slots__ = ('hypervisor_type', 'vm_name', 'uuid', 'vcpu', 'max_mem', 'current_mem', 'dumpcore', 'numa_memory', 'numa_memnode', 'devices', 'seclabel', 'cputune', 'placement', 'cpuset', 'current_vcpu', 'os', 'cpu', 'pm', 'on_poweroff', 'on_reboot', 'on_crash', 'features', 'mb', 'max_mem_unit', 'current_mem_unit', 'memtune', 'max_mem_rt', 'max_mem_rt_unit', 'max_mem_rt_slots', 'iothreads') __uncompareable__ = base.LibvirtXMLBase.__uncompareable__ __schema_name__ = "domain" def __init__(self, virsh_instance=base.virsh): accessors.XMLAttribute(property_name="hypervisor_type", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='domain', attribute='type') accessors.XMLElementText(property_name="vm_name", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='name') accessors.XMLElementText(property_name="uuid", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='uuid') accessors.XMLElementInt(property_name="iothreads", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='iothreads') accessors.XMLElementInt(property_name="vcpu", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='vcpu') accessors.XMLAttribute(property_name="current_vcpu", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='vcpu', attribute='current') accessors.XMLAttribute(property_name="placement", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='vcpu', attribute='placement') accessors.XMLAttribute(property_name="cpuset", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='vcpu', attribute='cpuset') accessors.XMLElementInt(property_name="max_mem", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='memory') accessors.XMLAttribute(property_name="max_mem_unit", libvirtxml=self, parent_xpath='/', tag_name='memory', attribute='unit') accessors.XMLAttribute(property_name="dumpcore", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='memory', attribute='dumpCore') accessors.XMLElementInt(property_name="current_mem", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='currentMemory') accessors.XMLAttribute(property_name="current_mem_unit", libvirtxml=self, parent_xpath='/', tag_name='currentMemory', attribute='unit') accessors.XMLElementInt(property_name="max_mem_rt", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='maxMemory') accessors.XMLAttribute(property_name="max_mem_rt_slots", libvirtxml=self, parent_xpath='/', tag_name='maxMemory', attribute='slots') accessors.XMLAttribute(property_name="max_mem_rt_unit", libvirtxml=self, parent_xpath='/', tag_name='maxMemory', attribute='unit') accessors.XMLElementNest(property_name='os', libvirtxml=self, parent_xpath='/', tag_name='os', subclass=VMOSXML, subclass_dargs={ 'virsh_instance': virsh_instance}) accessors.XMLElementDict(property_name="numa_memory", libvirtxml=self, forbidden=None, parent_xpath='numatune', tag_name='memory') accessors.XMLElementList(property_name="numa_memnode", libvirtxml=self, parent_xpath='numatune', marshal_from=self.marshal_from_memnode, marshal_to=self.marshal_to_memnode) accessors.XMLElementNest(property_name='cputune', libvirtxml=self, parent_xpath='/', tag_name='cputune', subclass=VMCPUTuneXML, subclass_dargs={ 'virsh_instance': virsh_instance}) accessors.XMLElementNest(property_name='cpu', libvirtxml=self, parent_xpath='/', tag_name='cpu', subclass=VMCPUXML, subclass_dargs={ 'virsh_instance': virsh_instance}) accessors.XMLElementNest(property_name='pm', libvirtxml=self, parent_xpath='/', tag_name='pm', subclass=VMPMXML, subclass_dargs={ 'virsh_instance': virsh_instance}) accessors.XMLElementText(property_name="on_poweroff", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='on_poweroff') accessors.XMLElementText(property_name="on_reboot", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='on_reboot') accessors.XMLElementText(property_name="on_crash", libvirtxml=self, forbidden=None, parent_xpath='/', tag_name='on_crash') accessors.XMLElementNest(property_name='features', libvirtxml=self, parent_xpath='/', tag_name='features', subclass=VMFeaturesXML, subclass_dargs={ 'virsh_instance': virsh_instance}) accessors.XMLElementNest(property_name='mb', libvirtxml=self, parent_xpath='/', tag_name='memoryBacking', subclass=VMMemBackingXML, subclass_dargs={ 'virsh_instance': virsh_instance}) accessors.XMLElementNest(property_name='memtune', libvirtxml=self, parent_xpath='/', tag_name='memtune', subclass=VMMemTuneXML, subclass_dargs={ 'virsh_instance': virsh_instance}) super(VMXMLBase, self).__init__(virsh_instance=virsh_instance) @staticmethod
[docs] def marshal_from_memnode(item, index, libvirtxml): """ Convert a dict to memnode tag and attributes. """ del index del libvirtxml if not isinstance(item, dict): raise xcepts.LibvirtXMLError("Expected a dictionary of memnode " "attributes, not a %s" % str(item)) return ('memnode', dict(item))
@staticmethod
[docs] def marshal_to_memnode(tag, attr_dict, index, libvirtxml): """ Convert a memnode tag and attributes to a dict. """ del index del libvirtxml if tag != 'memnode': return None return dict(attr_dict)
[docs] def get_devices(self, device_type=None): """ Put all nodes of devices into a VMXMLDevices instance. """ devices = VMXMLDevices() all_devices = self.xmltreefile.find('devices') if device_type is not None: device_nodes = all_devices.findall(device_type) else: device_nodes = all_devices for node in device_nodes: device_tag = node.tag device_class = librarian.get(device_tag) new_one = device_class.new_from_element(node, virsh_instance=self.virsh) devices.append(new_one) return devices
[docs] def set_devices(self, value): """ Define devices based on contents of VMXMLDevices instance """ value_type = type(value) if not issubclass(value_type, VMXMLDevices): raise xcepts.LibvirtXMLError("Value %s Must be a VMXMLDevices or " "subclass not a %s" % (str(value), str(value_type))) # Start with clean slate exist_dev = self.xmltreefile.find('devices') if exist_dev is not None: self.del_devices() if len(value) > 0: devices_element = xml_utils.ElementTree.SubElement( self.xmltreefile.getroot(), 'devices') for device in value: # Separate the element from the tree device_element = device.xmltreefile.getroot() devices_element.append(device_element) self.xmltreefile.write()
[docs] def del_devices(self): """ Remove all devices """ try: self.xmltreefile.remove_by_xpath('/devices', remove_all=True) except (AttributeError, TypeError): pass # Element already doesn't exist self.xmltreefile.write()
[docs] def get_seclabel(self): """ Return seclabel + child attribute dict list or raise LibvirtXML error :return: None if no seclabel in xml, list contains dict of seclabel's attributs and children. """ __children_list__ = ['label', 'baselabel', 'imagelabel'] seclabel_node = self.xmltreefile.findall("seclabel") # no seclabel tag found in xml. if seclabel_node == []: raise xcepts.LibvirtXMLError("Seclabel for this domain does not " "exist") seclabels = [] for i in range(len(seclabel_node)): seclabel = dict(seclabel_node[i].items()) for child_name in __children_list__: child_node = seclabel_node[i].find(child_name) if child_node is not None: seclabel[child_name] = child_node.text seclabels.append(seclabel) return seclabels
[docs] def set_seclabel(self, seclabel_dict_list): """ Set seclabel of vm. Delete all seclabels if seclabel exists, create new seclabels use dict values from given seclabel_dict_list in xmltreefile. """ __attributs_list__ = ['type', 'model', 'relabel'] __children_list__ = ['label', 'baselabel', 'imagelabel'] # check the type of seclabel_dict_list and value. if not isinstance(seclabel_dict_list, list): raise xcepts.LibvirtXMLError("seclabel_dict_list should be a " "instance of list, but not a %s.\n" % type(seclabel_dict_list)) for seclabel_dict in seclabel_dict_list: if not isinstance(seclabel_dict, dict): raise xcepts.LibvirtXMLError("value in seclabel_dict_list" "should be a instance of dict " "but not a %s.\n" % type(seclabel_dict)) seclabel_nodes = self.xmltreefile.findall("seclabel") if seclabel_nodes is not None: for i in range(len(seclabel_nodes)): self.del_seclabel() for i in range(len(seclabel_dict_list)): seclabel_node = xml_utils.ElementTree.SubElement( self.xmltreefile.getroot(), "seclabel") for key, value in seclabel_dict_list[i].items(): if key in __children_list__: child_node = seclabel_node.find(key) if child_node is None: child_node = xml_utils.ElementTree.SubElement( seclabel_node, key) child_node.text = value elif key in __attributs_list__: seclabel_node.set(key, value) else: continue self.xmltreefile.write()
[docs] def del_seclabel(self): """ Remove the seclabel tag from a domain """ try: self.xmltreefile.remove_by_xpath("/seclabel", remove_all=True) except (AttributeError, TypeError): pass # Element already doesn't exist self.xmltreefile.write()
[docs] def set_controller(self, controller_list): """ Set controller of vm. Create new controllers use xmltreefile from given controller_list. """ # check the type of controller_list and value. if not isinstance(controller_list, list): raise xcepts.LibvirtXMLError("controller_element_list should be a" "instance of list, but not a %s.\n" % type(controller_list)) devices_element = self.xmltreefile.find("devices") for contl in controller_list: element = xml_utils.ElementTree.ElementTree( file=contl.xml) devices_element.append(element.getroot()) self.xmltreefile.write()
[docs] def del_controller(self, controller_type=None): """ Delete controllers according controller type :return: None if deleting all controllers """ all_controllers = self.xmltreefile.findall("devices/controller") del_controllers = [] for controller in all_controllers: if controller.get("type") != controller_type: continue del_controllers.append(controller) # no seclabel tag found in xml. if del_controllers == []: logging.debug("Controller %s for this domain does not " "exist" % controller_type) for controller in del_controllers: self.xmltreefile.remove(controller)
[docs]class VMXML(VMXMLBase): """ Higher-level manipulations related to VM's XML or guest/host state """ # Must copy these here or there will be descriptor problems __slots__ = [] def __init__(self, hypervisor_type='kvm', virsh_instance=base.virsh): """ Create new VM XML instance """ super(VMXML, self).__init__(virsh_instance=virsh_instance) # Setup some bare-bones XML to build upon self.xml = u"<domain type='%s'></domain>" % hypervisor_type @staticmethod # static method (no self) needed b/c calls VMXML.__new__
[docs] def new_from_dumpxml(vm_name, options="", virsh_instance=base.virsh): """ Return new VMXML instance from virsh dumpxml command :param vm_name: Name of VM to dumpxml :param virsh_instance: virsh module or instance to use :return: New initialized VMXML instance """ # TODO: Look up hypervisor_type on incoming XML vmxml = VMXML(virsh_instance=virsh_instance) vmxml['xml'] = virsh_instance.dumpxml(vm_name, extra=options).stdout.strip() return vmxml
@staticmethod
[docs] def new_from_inactive_dumpxml(vm_name, options="", virsh_instance=base.virsh): """ Return new VMXML instance of inactive domain from virsh dumpxml command :param vm_name: Name of VM to dumpxml :param options: virsh dumpxml command's options :param virsh_instance: virsh module or instance to use :return: New initialized VMXML instance """ if options.find("--inactive") == -1: options += " --inactive" return VMXML.new_from_dumpxml(vm_name, options, virsh_instance)
@staticmethod
[docs] def get_device_class(type_name): """ Return class that handles type_name devices, or raise exception. """ return librarian.get(type_name)
[docs] def undefine(self, options=None): """Undefine this VM with libvirt retaining XML in instance""" return self.virsh.remove_domain(self.vm_name, options)
[docs] def define(self): """Define VM with virsh from this instance""" result = self.virsh.define(self.xml) if result.exit_status: logging.debug("Define %s failed.\n" "Detail: %s.", self.vm_name, result.stderr) return False return True
[docs] def sync(self, options=None): """Rebuild VM with the config file.""" # If target vm no longer exist, this will raise an exception. try: backup = self.new_from_dumpxml(self.vm_name) except IOError: logging.debug("Failed to backup %s.", self.vm_name) backup = None if not self.undefine(options): raise xcepts.LibvirtXMLError("Failed to undefine %s." % self.vm_name) if not self.define(): if backup: backup.define() raise xcepts.LibvirtXMLError("Failed to define %s, from %s." % (self.vm_name, self.xml))
@staticmethod
[docs] def vm_rename(vm, new_name, uuid=None, virsh_instance=base.virsh): """ Rename a vm from its XML. :param vm: VM class type instance :param new_name: new name of vm :param uuid: new_vm's uuid, if None libvirt will generate. :return: a new VM instance """ if vm.is_alive(): vm.destroy(gracefully=True) vmxml = VMXML.new_from_dumpxml(vm_name=vm.name, virsh_instance=virsh_instance) backup = vmxml.copy() # can't do in-place rename, must operate on XML if not vmxml.undefine(): del vmxml # clean up temporary files raise xcepts.LibvirtXMLError("Error reported while undefining VM") # Alter the XML vmxml.vm_name = new_name if uuid is None: # invalidate uuid so libvirt will regenerate del vmxml.uuid vm.uuid = None else: vmxml.uuid = uuid vm.uuid = uuid # Re-define XML to libvirt logging.debug("Rename %s to %s.", vm.name, new_name) # error message for failed define error_msg = "Error reported while defining VM:\n" try: if not vmxml.define(): raise xcepts.LibvirtXMLError(error_msg + "%s" % vmxml.get('xml')) except error.CmdError, detail: del vmxml # clean up temporary files # Allow exceptions thrown here since state will be undefined backup.define() raise xcepts.LibvirtXMLError(error_msg + "%s" % detail) # Keep names uniform vm.name = new_name return vm
@staticmethod
[docs] def set_pm_suspend(vm_name, mem="yes", disk="yes", virsh_instance=base.virsh): """ Add/set pm suspend Support :params vm_name: Name of defined vm :params mem: Enable suspend to memory :params disk: Enable suspend to disk """ # Build a instance of class VMPMXML. pm = VMPMXML() pm.mem_enabled = mem pm.disk_enabled = disk # Set pm to the new instance. vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) vmxml.pm = pm vmxml.sync()
@staticmethod
[docs] def set_vm_vcpus(vm_name, value, current=None, virsh_instance=base.virsh): """ Convenience method for updating 'vcpu' and 'current' attribute property of a defined VM :param vm_name: Name of defined vm to change vcpu elemnet data :param value: New data value, None to delete. :param current: New current value, None will not change current value """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) if value is not None: if current is not None: try: current_int = int(current) except ValueError: raise xcepts.LibvirtXMLError("Invalid 'current' value '%s'" % current) if current_int > value: raise xcepts.LibvirtXMLError( "The cpu current value %s is larger than max number %s" % (current, value)) else: vmxml['current_vcpu'] = current vmxml['vcpu'] = value # call accessor method to change XML else: # value is None del vmxml.vcpu vmxml.undefine() vmxml.define()
# Temporary files for vmxml cleaned up automatically # when it goes out of scope here. @staticmethod
[docs] def check_cpu_mode(mode): """ Check input cpu mode invalid or not. :param mode: the mode of cpu:'host-model'... """ # Possible values for the mode attribute are: # "custom", "host-model", "host-passthrough" cpu_mode = ["custom", "host-model", "host-passthrough"] if mode.strip() not in cpu_mode: raise xcepts.LibvirtXMLError( "The cpu mode '%s' is invalid!" % mode)
[docs] def get_disk_all(self): """ Return VM's disk from XML definition, None if not set """ disk_nodes = self.xmltreefile.find('devices').findall('disk') disks = {} for node in disk_nodes: dev = node.find('target').get('dev') disks[dev] = node return disks
@staticmethod
[docs] def get_disk_source(vm_name, option="", virsh_instance=base.virsh): """ Get block device of a defined VM's disks. :param vm_name: Name of defined vm. :param option: extra option. """ vmxml = VMXML.new_from_dumpxml(vm_name, option, virsh_instance=virsh_instance) disks = vmxml.get_disk_all() return disks.values()
@staticmethod
[docs] def get_disk_blk(vm_name, virsh_instance=base.virsh): """ Get block device of a defined VM's disks. :param vm_name: Name of defined vm. """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) disks = vmxml.get_disk_all() return disks.keys()
@staticmethod
[docs] def get_disk_count(vm_name, virsh_instance=base.virsh): """ Get count of VM's disks. :param vm_name: Name of defined vm. """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) disks = vmxml.get_disk_all() if disks is not None: return len(disks) return 0
@staticmethod
[docs] def get_disk_attr(vm_name, target, tag, attr, virsh_instance=base.virsh): """ Get value of disk tag attribute for a given target dev. """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) attr_value = None try: disk = vmxml.get_disk_all()[target] if tag in ["driver", "boot", "address", "alias", "source"]: attr_value = disk.find(tag).get(attr) except AttributeError: logging.error("No %s/%s found.", tag, attr) return attr_value
@staticmethod
[docs] def check_disk_exist(vm_name, disk_src, virsh_instance=base.virsh): """ Check if given disk exist in VM. :param vm_name: Domain name. :param disk_src: Domian disk source path or darget dev. :return: True/False """ found = False vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) if not vmxml.get_disk_count(vm_name, virsh_instance=virsh_instance): raise xcepts.LibvirtXMLError("No disk in domain %s." % vm_name) blk_list = vmxml.get_disk_blk(vm_name, virsh_instance=virsh_instance) disk_list = vmxml.get_disk_source(vm_name, virsh_instance=virsh_instance) try: file_list = [] for disk in disk_list: file_list.append(disk.find('source').get('file')) except AttributeError: logging.debug("No 'file' type disk.") if disk_src in file_list + blk_list: found = True return found
@staticmethod
[docs] def check_disk_type(vm_name, disk_src, disk_type, virsh_instance=base.virsh): """ Check if disk type is correct in VM :param vm_name: Domain name. :param disk_src: Domain disk source path :param disk_type: Domain disk type :return: True/False """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) if not vmxml.get_disk_count(vm_name, virsh_instance=virsh_instance): raise xcepts.LibvirtXMLError("No disk in domain %s." % vm_name) disks = vmxml.get_disk_source(vm_name, virsh_instance=virsh_instance) found = False try: for disk in disks: disk_dev = "" if disk_type == "file": disk_dev = disk.find('source').get('file') elif disk_type == "block": disk_dev = disk.find('source').get('dev') if disk_src == disk_dev: found = True except AttributeError: logging.debug("No '%s' type disk." % disk_type) return found
@staticmethod
[docs] def get_disk_serial(vm_name, disk_target, virsh_instance=base.virsh): """ Get disk serial in VM :param vm_name: Domain name. :param disk_target: Domain disk target :return: disk serial """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) if not vmxml.get_disk_count(vm_name, virsh_instance=virsh_instance): raise xcepts.LibvirtXMLError("No disk in domain %s." % vm_name) try: disk = vmxml.get_disk_all()[disk_target] except KeyError: raise xcepts.LibvirtXMLError("Wrong disk target:%s." % disk_target) serial = "" try: serial = disk.find("serial").text except AttributeError: logging.debug("No serial assigned.") return serial
@staticmethod
[docs] def get_disk_address(vm_name, disk_target, virsh_instance=base.virsh): """ Get disk address in VM :param vm_name: Domain name. :param disk_target: Domain disk target :return: disk address """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) if not vmxml.get_disk_count(vm_name, virsh_instance=virsh_instance): raise xcepts.LibvirtXMLError("No disk in domain %s." % vm_name) try: disk = vmxml.get_disk_all()[disk_target] except KeyError: raise xcepts.LibvirtXMLError("Wrong disk target:%s." % disk_target) address_str = "" try: disk_bus = disk.find("target").get("bus") address = disk.find("address") if disk_bus == "virtio": add_type = address.get("type") add_domain = address.get("domain") add_bus = address.get("bus") add_slot = address.get("slot") add_func = address.get("function") address_str = ("%s:%s.%s.%s.%s" % (add_type, add_domain, add_bus, add_slot, add_func)) elif disk_bus in ["ide", "scsi"]: bus = address.get("bus") target = address.get("target") unit = address.get("unit") address_str = "%s:%s.%s.%s" % (disk_bus, bus, target, unit) except AttributeError, e: raise xcepts.LibvirtXMLError("Get wrong attribute: %s" % str(e)) return address_str
@staticmethod
[docs] def get_numa_memory_params(vm_name, virsh_instance=base.virsh): """ Return VM's numa memory setting from XML definition """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) return vmxml.numa_memory
@staticmethod
[docs] def get_numa_memnode_params(vm_name, virsh_instance=base.virsh): """ Return VM's numa memnode setting from XML definition """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) return vmxml.numa_memnode
[docs] def get_primary_serial(self): """ Get a dict with primary serial features. """ xmltreefile = self.__dict_get__('xml') primary_serial = xmltreefile.find('devices').find('serial') serial_features = {} serial_type = primary_serial.get('type') serial_port = primary_serial.find('target').get('port') # Support node here for more features serial_features['serial'] = primary_serial # Necessary features serial_features['type'] = serial_type serial_features['port'] = serial_port return serial_features
@staticmethod
[docs] def set_primary_serial(vm_name, dev_type, port, path=None, virsh_instance=base.virsh): """ Set primary serial's features of vm_name. :param vm_name: Name of defined vm to set primary serial. :param dev_type: the type of ``serial:pty,file...`` :param port: the port of serial :param path: the path of serial, it is not necessary for pty """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) xmltreefile = vmxml.__dict_get__('xml') try: serial = vmxml.get_primary_serial()['serial'] except AttributeError: logging.debug("Can not find any serial, now create one.") # Create serial tree, default is pty serial = xml_utils.ElementTree.SubElement( xmltreefile.find('devices'), 'serial', {'type': 'pty'}) # Create elements of serial target, default port is 0 xml_utils.ElementTree.SubElement(serial, 'target', {'port': '0'}) serial.set('type', dev_type) serial.find('target').set('port', port) # path may not be exist. if path is not None: serial.find('source').set('path', path) else: try: source = serial.find('source') serial.remove(source) except AssertionError: pass # Element not found, already removed. xmltreefile.write() vmxml.set_xml(xmltreefile.name) vmxml.undefine() vmxml.define()
[docs] def get_agent_channels(self): """ Get all qemu guest agent channels """ channels = self.xmltreefile.findall("./devices/channel") ga_channels = [] for channel in channels: target = channel.find('./target') if target is not None: name = target.get('name') if name and name.startswith("org.qemu.guest_agent"): ga_channels.append(channel) return ga_channels
[docs] def set_agent_channel(self, src_path=None, tgt_name='org.qemu.guest_agent.0', ignore_exist=False): """ Add a channel for guest agent if non exists. :param src_path: Source path of the channel :param tgt_name: Target name of the channel :param ignore_exist: Whether add a channel even if another already exists. """ if not ignore_exist and self.get_agent_channels(): logging.debug("Guest agent channel already exists") return if not src_path: src_path = '/var/lib/libvirt/qemu/%s-guest.agent' % self.vm_name channel = self.get_device_class('channel')(type_name='unix') channel.add_source(mode='bind', path=src_path) channel.add_target(type='virtio', name=tgt_name) self.devices = self.devices.append(channel)
[docs] def remove_agent_channels(self): """ Delete all channels for guest agent """ for channel in self.get_agent_channels(): self.xmltreefile.remove(channel)
[docs] def get_iface_all(self): """ Get a dict with interface's mac and node. """ iface_nodes = self.xmltreefile.find('devices').findall('interface') interfaces = {} for node in iface_nodes: mac_addr = node.find('mac').get('address') interfaces[mac_addr] = node return interfaces
@staticmethod
[docs] def get_iface_by_mac(vm_name, mac, virsh_instance=base.virsh): """ Get the interface if mac is matched. :param vm_name: Name of defined vm. :param mac: a mac address. :return: return a dict include main interface's features """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) interfaces = vmxml.get_iface_all() try: interface = interfaces[mac] except KeyError: interface = None if interface is not None: # matched mac exists. iface_type = interface.get('type') source = interface.find('source').get(iface_type) features = {} features['type'] = iface_type features['mac'] = mac features['source'] = source return features else: return None
@staticmethod
[docs] def get_iface_dev(vm_name, virsh_instance=base.virsh): """ Return VM's interface device from XML definition, None if not set """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) ifaces = vmxml.get_iface_all() if ifaces: return ifaces.keys() return None
@staticmethod
[docs] def get_first_mac_by_name(vm_name, virsh_instance=base.virsh): """ Convenience method for getting first mac of a defined VM :param: vm_name: Name of defined vm to get mac """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) xmltreefile = vmxml.__dict_get__('xml') try: iface = xmltreefile.find('devices').find('interface') return iface.find('mac').get('address') except AttributeError: return None
@staticmethod
[docs] def get_iftune_params(vm_name, options="", virsh_instance=base.virsh): """ Return VM's interface tuning setting from XML definition """ vmxml = VMXML.new_from_dumpxml(vm_name, options=options, virsh_instance=virsh_instance) xmltreefile = vmxml.__dict_get__('xml') iftune_params = {} bandwidth = None try: bandwidth = xmltreefile.find('devices/interface/bandwidth') try: iftune_params['inbound'] = bandwidth.find( 'inbound').get('average') iftune_params['outbound'] = bandwidth.find( 'outbound').get('average') except AttributeError: logging.error("Can't find <inbound> or <outbound> element") except AttributeError: logging.error("Can't find <bandwidth> element") return iftune_params
[docs] def get_net_all(self): """ Return VM's net from XML definition, None if not set """ xmltreefile = self.__dict_get__('xml') net_nodes = xmltreefile.find('devices').findall('interface') nets = {} for node in net_nodes: dev = node.find('target').get('dev') nets[dev] = node return nets
@staticmethod
[docs] def set_multiqueues(vm_name, queues, index=0): """ Set multiqueues for interface. :param queues: the count of queues for interface :param index: the index of interface """ driver_params = {'name': "vhost", 'queues': queues} vmxml = VMXML.new_from_dumpxml(vm_name) nets = vmxml.__dict_get__('xml').find('devices').findall('interface') if index >= len(nets): raise xcepts.LibvirtXMLError("Couldn't find %s-th interface for %s" % (index, vm_name)) net = nets[index] iface = vmxml.get_device_class('interface').new_from_element(net) iface.model = "virtio" iface.driver = iface.new_driver(driver_attr=driver_params) # Update devices: Remove all interfaces and attach new one vmxml.__dict_get__('xml').find('devices').remove(net) vmxml.devices = vmxml.devices.append(iface) vmxml.sync()
# TODO re-visit this method after the libvirt_xml.devices.interface module # is implemented @staticmethod
[docs] def get_net_dev(vm_name): """ Get net device of a defined VM's nets. :param vm_name: Name of defined vm. """ vmxml = VMXML.new_from_dumpxml(vm_name) nets = vmxml.get_net_all() if nets is not None: return nets.keys() return None
@staticmethod
[docs] def set_cpu_mode(vm_name, mode='host-model'): """ Set cpu's mode of VM. :param vm_name: Name of defined vm to set cpu mode. :param mode: the mode of cpu:'host-model'... """ vmxml = VMXML.new_from_dumpxml(vm_name) vmxml.check_cpu_mode(mode) xmltreefile = vmxml.__dict_get__('xml') try: cpu = xmltreefile.find('/cpu') logging.debug("Current cpu mode is '%s'!", cpu.get('mode')) cpu.set('mode', mode) except AttributeError: logging.debug("Can not find any cpu, now create one.") cpu = xml_utils.ElementTree.SubElement(xmltreefile.getroot(), 'cpu', {'mode': mode}) xmltreefile.write() vmxml.undefine() vmxml.define()
[docs] def add_device(self, value): """ Add a device into VMXML. :param value: instance of device in libvirt_xml/devices/ """ devices = self.get_devices() for device in devices: if device == value: logging.debug("Device %s is already in VM %s.", value, self) return devices.append(value) self.set_devices(devices)
[docs] def del_device(self, value, by_tag=False): """ Remove a device from VMXML :param value: instance of device in libvirt_xml/devices/ or device tag :param by_tag: Boolean value. True for delete device by tag name """ devices = self.get_devices() not_found = True for device in devices: device_value = device if by_tag: device_value = device.device_tag if device_value == value: not_found = False devices.remove(device) break if not_found: logging.debug("Device %s does not exist in VM %s.", value, self) return self.set_devices(devices)
@staticmethod
[docs] def add_security_info(vmxml, passwd): """ Add passwd for graphic :param vmxml: instance of VMXML :param passwd: Password you want to set """ devices = vmxml.devices graphics_index = devices.index(devices.by_device_tag('graphics')[0]) graphics = devices[graphics_index] graphics.passwd = passwd vmxml.devices = devices vmxml.define()
[docs] def get_graphics_devices(self, type_name=""): """ Get all graphics devices or desired type graphics devices :param type_name: graphic type, vnc or spice """ devices = self.get_devices() graphics_devices = devices.by_device_tag('graphics') graphics_list = [] for graphics_device in graphics_devices: graphics_index = devices.index(graphics_device) graphics = devices[graphics_index] if not type_name: graphics_list.append(graphics) elif graphics.type_name == type_name: graphics_list.append(graphics) return graphics_list
[docs] def remove_all_graphics(self): """ Remove all graphics devices. """ self.remove_all_device_by_type('graphics')
[docs] def remove_all_device_by_type(self, device_type): """ Remove all devices of a given type. :param type: Type name for devices should be removed. """ try: self.xmltreefile.remove_by_xpath( '/devices/%s' % device_type, remove_all=True) except (AttributeError, TypeError): pass # Element already doesn't exist self.xmltreefile.write()
[docs] def add_hostdev(self, source_address, mode='subsystem', hostdev_type='pci', managed='yes', boot_order=None): """ Add a hostdev device to guest. :param source_address: A dict include slot, function, bus, domain """ dev = self.get_device_class('hostdev')() dev.mode = mode dev.hostdev_type = hostdev_type dev.managed = managed if boot_order: dev.boot_order = boot_order dev.source_address = dev.new_source_address(**source_address) self.add_device(dev)
@staticmethod
[docs] def get_blkio_params(vm_name, options="", virsh_instance=base.virsh): """ Return VM's block I/O setting from XML definition """ vmxml = VMXML.new_from_dumpxml(vm_name, options=options, virsh_instance=virsh_instance) xmltreefile = vmxml.__dict_get__('xml') blkio_params = {} try: blkio = xmltreefile.find('blkiotune') try: blkio_params['weight'] = blkio.find('weight').text except AttributeError: logging.error("Can't find <weight> element") except AttributeError: logging.error("Can't find <blkiotune> element") if blkio and blkio.find('device'): blkio_params['device_weights_path'] = \ blkio.find('device').find('path').text blkio_params['device_weights_weight'] = \ blkio.find('device').find('weight').text return blkio_params
@staticmethod
[docs] def get_blkdevio_params(vm_name, options="", virsh_instance=base.virsh): """ Return VM's block I/O tuning setting from XML definition """ vmxml = VMXML.new_from_dumpxml(vm_name, options=options, virsh_instance=virsh_instance) xmltreefile = vmxml.__dict_get__('xml') blkdevio_params = {} iotune = None blkdevio_list = ['total_bytes_sec', 'read_bytes_sec', 'write_bytes_sec', 'total_iops_sec', 'read_iops_sec', 'write_iops_sec'] # Initialize all of arguments to zero for k in blkdevio_list: blkdevio_params[k] = 0 try: iotune = xmltreefile.find('/devices/disk/iotune') for k in blkdevio_list: if iotune.findall(k): blkdevio_params[k] = int(iotune.find(k).text) except AttributeError: xcepts.LibvirtXMLError("Can't find <iotune> element") return blkdevio_params
@staticmethod
[docs] def set_memoryBacking_tag(vm_name, hpgs=True, nosp=False, locked=False, virsh_instance=base.virsh): """ let the guest using hugepages. """ # Create a new memoryBacking tag mb_xml = VMMemBackingXML() mb_xml.nosharepages = nosp mb_xml.locked = locked if hpgs: hpgs = VMHugepagesXML() mb_xml.hugepages = hpgs # Set memoryBacking to the new instance. vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) vmxml.mb = mb_xml vmxml.sync()
@staticmethod
[docs] def del_memoryBacking_tag(vm_name, virsh_instance=base.virsh): """ Remove the memoryBacking tag from a domain """ vmxml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance) try: vmxml.xmltreefile.remove_by_xpath("/memoryBacking", remove_all=True) vmxml.sync() except (AttributeError, TypeError): pass # Element already doesn't exist
[docs] def remove_all_boots(self): """ Remove all OS boots """ try: self.xmltreefile.remove_by_xpath('/os/boot', remove_all=True) except (AttributeError, TypeError): pass # Element already doesn't exist self.xmltreefile.write()
[docs]class VMCPUXML(base.LibvirtXMLBase): """ Higher-level manipulations related to VM's XML(CPU) """ # Must copy these here or there will be descriptor problems __slots__ = ('model', 'vendor', 'feature_list', 'mode', 'match', 'fallback', 'topology', 'numa_cell') def __init__(self, virsh_instance=base.virsh): """ Create new VMCPU XML instance """ # The set action is for test. accessors.XMLAttribute(property_name="mode", libvirtxml=self, forbidden=[], parent_xpath='/', tag_name='cpu', attribute='mode') accessors.XMLAttribute(property_name="match", libvirtxml=self, forbidden=[], parent_xpath='/', tag_name='cpu', attribute='match') accessors.XMLElementText(property_name="model", libvirtxml=self, forbidden=[], parent_xpath='/', tag_name='model') accessors.XMLElementText(property_name="vendor", libvirtxml=self, forbidden=[], parent_xpath='/', tag_name='vendor') accessors.XMLAttribute(property_name="fallback", libvirtxml=self, forbidden=[], parent_xpath='/', tag_name='model', attribute='fallback') accessors.XMLElementDict(property_name="topology", libvirtxml=self, forbidden=[], parent_xpath='/', tag_name='topology') accessors.XMLElementList(property_name="numa_cell", libvirtxml=self, parent_xpath='numa', marshal_from=self.marshal_from_cell, marshal_to=self.marshal_to_cell) # This will skip self.get_feature_list() defined below accessors.AllForbidden(property_name="feature_list", libvirtxml=self) super(VMCPUXML, self).__init__(virsh_instance=virsh_instance) self.xml = '<cpu/>' @staticmethod
[docs] def marshal_from_cell(item, index, libvirtxml): """ Convert a dict to cell tag and attributes. """ del index del libvirtxml if not isinstance(item, dict): raise xcepts.LibvirtXMLError("Expected a dictionary of cell " "attributes, not a %s" % str(item)) return ('cell', dict(item))
@staticmethod
[docs] def marshal_to_cell(tag, attr_dict, index, libvirtxml): """ Convert a cell tag and attributes to a dict. """ del index del libvirtxml if tag != 'cell': return None return dict(attr_dict)
[docs] def get_feature_list(self): """ Accessor method for feature_list property (in __slots__) """ feature_list = [] xmltreefile = self.__dict_get__('xml') for feature_node in xmltreefile.findall('/feature'): feature_list.append(feature_node) return feature_list
[docs] def get_feature(self, num): """ Get a feature element from feature list by number :return: Feature element """ count = len(self.feature_list) try: num = int(num) return self.feature_list[num] except (ValueError, TypeError): raise xcepts.LibvirtXMLError("Invalid feature number %s" % num) except IndexError: raise xcepts.LibvirtXMLError("Only %d feature(s)" % count)
[docs] def get_feature_name(self, num): """ Get feature name :param num: Number in feature list :return: Feature name """ return self.get_feature(num).get('name')
[docs] def get_feature_policy(self, num): """ Get feature policy :param num: Number in feature list :return: Feature policy """ return self.get_feature(num).get('policy')
[docs] def remove_feature(self, num): """ Remove a feature from xml :param num: Number in feature list """ xmltreefile = self.__dict_get__('xml') node = xmltreefile.getroot() node.remove(self.get_feature(num))
@staticmethod
[docs] def check_feature_name(value): """ Check feature name valid or not. :param value: Feature name :return: True if check pass """ sys_feature = [] cpu_xml_file = open('/proc/cpuinfo', 'r') for line in cpu_xml_file.readline(): if line.find('flags') != -1: feature_names = line.split(':')[1].strip() sys_sub_feature = feature_names.split(' ') sys_feature = list(set(sys_feature + sys_sub_feature)) cpu_xml_file.close() return (value in sys_feature)
[docs] def set_feature(self, num, name='', policy=''): """ Set feature name (and policy) to xml :param num: Number in feature list :param name: New feature name :param policy: New feature policy """ feature_set_node = self.get_feature(num) if name: feature_set_node.set('name', name) if policy: feature_set_node.set('policy', policy)
[docs] def add_feature(self, name, policy=''): """ Add a feature element to xml :param name: New feature name :param policy: New feature policy """ xmltreefile = self.__dict_get__('xml') node = xmltreefile.getroot() feature_node = {'name': name} if policy: feature_node.update({'policy': policy}) xml_utils.ElementTree.SubElement(node, 'feature', feature_node)
[docs]class VMClockXML(VMXML): """ Higher-level manipulations related to VM's XML(Clock) """ # Must copy these here or there will be descriptor problems __slots__ = ('offset', 'timezone', 'adjustment', 'timers') def __init__(self, virsh_instance=base.virsh, offset="utc"): """ Create new VMClock XML instance """ # The set action is for test. accessors.XMLAttribute(property_name="offset", libvirtxml=self, forbidden=[], parent_xpath='/', tag_name='clock', attribute='offset') accessors.XMLAttribute(property_name="timezone", libvirtxml=self, forbidden=[], parent_xpath='/', tag_name='clock', attribute='timezone') accessors.XMLAttribute(property_name="adjustment", libvirtxml=self, forbidden=[], parent_xpath='/', tag_name='clock', attribute='adjustment') accessors.XMLElementList(property_name="timers", libvirtxml=self, forbidden=[], parent_xpath="/clock", marshal_from=self.marshal_from_timer, marshal_to=self.marshal_to_timer) super(VMClockXML, self).__init__(virsh_instance=virsh_instance) # Set default offset for clock self.offset = offset
[docs] def from_dumpxml(self, vm_name, virsh_instance=base.virsh): """Helper to load xml from domain.""" self.xml = VMXML.new_from_dumpxml(vm_name, virsh_instance=virsh_instance).xml
# Sub-element of clock
[docs] class TimerXML(VMXML): """Timer element of clock""" __slots__ = ('name', 'present', 'track', 'tickpolicy', 'frequency', 'mode', 'catchup_threshold', 'catchup_slew', 'catchup_limit') def __init__(self, virsh_instance=base.virsh, timer_name="tsc"): """ Create new TimerXML instance """ # The set action is for test. accessors.XMLAttribute(property_name="name", libvirtxml=self, forbidden=[], parent_xpath='/clock', tag_name='timer', attribute='name') accessors.XMLAttribute(property_name="present", libvirtxml=self, forbidden=[], parent_xpath='/clock', tag_name='timer', attribute='present') accessors.XMLAttribute(property_name="track", libvirtxml=self, forbidden=[], parent_xpath='/clock', tag_name='timer', attribute='track') accessors.XMLAttribute(property_name="tickpolicy", libvirtxml=self, forbidden=[], parent_xpath='/clock', tag_name='timer', attribute='tickpolicy') accessors.XMLAttribute(property_name="frequency", libvirtxml=self, forbidden=[], parent_xpath='/clock', tag_name='timer', attribute='frequency') accessors.XMLAttribute(property_name="mode", libvirtxml=self, forbidden=[], parent_xpath='/clock', tag_name='timer', attribute='mode') accessors.XMLAttribute(property_name="catchup_threshold", libvirtxml=self, forbidden=[], parent_xpath='/clock/timer', tag_name='catchup', attribute='threshold') accessors.XMLAttribute(property_name="catchup_slew", libvirtxml=self, forbidden=[], parent_xpath='/clock/timer', tag_name='catchup', attribute='slew') accessors.XMLAttribute(property_name="catchup_limit", libvirtxml=self, forbidden=[], parent_xpath='/clock/timer', tag_name='catchup', attribute='limit') super(VMClockXML.TimerXML, self).__init__(virsh_instance=virsh_instance) # name is mandatory for timer self.name = timer_name
[docs] def update(self, attr_dict): for attr, value in attr_dict.items(): setattr(self, attr, value)
@staticmethod
[docs] def marshal_from_timer(item, index, libvirtxml): """Convert a TimerXML instance into tag + attributes""" del index del libvirtxml timer = item.xmltreefile.find("clock/timer") try: return (timer.tag, dict(timer.items())) except AttributeError: # Didn't find timer raise xcepts.LibvirtXMLError("Expected a list of timer " "instances, not a %s" % str(item))
@staticmethod
[docs] def marshal_to_timer(tag, attr_dict, index, libvirtxml): """Convert a tag + attributes to a TimerXML instance""" del index if tag == 'timer': newone = VMClockXML.TimerXML(virsh_instance=libvirtxml.virsh) newone.update(attr_dict) return newone else: return None
[docs]class VMCPUTuneXML(base.LibvirtXMLBase): """ CPU tuning tag XML class Elements: vcpupins: list of dict - vcpu, cpuset emulatorpin: attribute - cpuset shares: int period: int quota: int emulator_period: int emulator_quota: int """ __slots__ = ('vcpupins', 'emulatorpin', 'shares', 'period', 'quota', 'emulator_period', 'emulator_quota') def __init__(self, virsh_instance=base.virsh): accessors.XMLElementList('vcpupins', self, parent_xpath='/', marshal_from=self.marshal_from_vcpupins, marshal_to=self.marshal_to_vcpupins) accessors.XMLAttribute('emulatorpin', self, parent_xpath='/', tag_name='emulatorpin', attribute='cpuset') for slot in self.__all_slots__: if slot in ('shares', 'period', 'quota', 'emulator_period', 'emulator_quota'): accessors.XMLElementInt(slot, self, parent_xpath='/', tag_name=slot) super(self.__class__, self).__init__(virsh_instance=virsh_instance) self.xml = '<cputune/>' @staticmethod
[docs] def marshal_from_vcpupins(item, index, libvirtxml): """ Convert a dict to vcpupin tag and attributes. """ del index del libvirtxml if not isinstance(item, dict): raise xcepts.LibvirtXMLError("Expected a dictionary of host " "attributes, not a %s" % str(item)) return ('vcpupin', dict(item))
@staticmethod
[docs] def marshal_to_vcpupins(tag, attr_dict, index, libvirtxml): """ Convert a vcpupin tag and attributes to a dict. """ del index del libvirtxml if tag != 'vcpupin': return None return dict(attr_dict)
[docs]class VMOSXML(base.LibvirtXMLBase): """ Class to access <os> tag of domain XML. Elements: type: text attributes - arch, machine loader: path boots: list attributes - dev bootmenu: attributes - enable smbios: attributes - mode bios: attributes - useserial, rebootTimeout init: text bootloader: text bootloader_args: text kernel: text initrd: text cmdline: text dtb: text TODO: initargs: list """ __slots__ = ('type', 'arch', 'machine', 'loader', 'boots', 'bootmenu_enable', 'smbios_mode', 'bios_useserial', 'bios_reboot_timeout', 'init', 'bootloader', 'bootloader_args', 'kernel', 'initrd', 'cmdline', 'dtb', 'initargs') def __init__(self, virsh_instance=base.virsh): accessors.XMLElementText('type', self, parent_xpath='/', tag_name='type') accessors.XMLElementText('loader', self, parent_xpath='/', tag_name='loader') accessors.XMLAttribute('arch', self, parent_xpath='/', tag_name='type', attribute='arch') accessors.XMLAttribute('machine', self, parent_xpath='/', tag_name='type', attribute='machine') accessors.XMLElementList('boots', self, parent_xpath='/', marshal_from=self.marshal_from_boots, marshal_to=self.marshal_to_boots) accessors.XMLAttribute('bootmenu_enable', self, parent_xpath='/', tag_name='bootmenu', attribute='enable') accessors.XMLAttribute('smbios_mode', self, parent_xpath='/', tag_name='smbios', attribute='mode') accessors.XMLAttribute('bios_useserial', self, parent_xpath='/', tag_name='bios', attribute='useserial') accessors.XMLAttribute('bios_reboot_timeout', self, parent_xpath='/', tag_name='bios', attribute='rebootTimeout') accessors.XMLElementText('bootloader', self, parent_xpath='/', tag_name='bootloader') accessors.XMLElementText('bootloader_args', self, parent_xpath='/', tag_name='bootloader_args') accessors.XMLElementText('kernel', self, parent_xpath='/', tag_name='kernel') accessors.XMLElementText('initrd', self, parent_xpath='/', tag_name='initrd') accessors.XMLElementText('cmdline', self, parent_xpath='/', tag_name='cmdline') accessors.XMLElementText('dtb', self, parent_xpath='/', tag_name='dtb') accessors.XMLElementText('init', self, parent_xpath='/', tag_name='init') super(self.__class__, self).__init__(virsh_instance=virsh_instance) self.xml = '<os/>' @staticmethod
[docs] def marshal_from_boots(item, index, libvirtxml): """ Convert a string to boot tag and attributes. """ del index del libvirtxml return ('boot', {'dev': item})
@staticmethod
[docs] def marshal_to_boots(tag, attr_dict, index, libvirtxml): """ Convert a boot tag and attributes to a string. """ del index del libvirtxml if tag != 'boot': return None return attr_dict['dev']
[docs]class VMPMXML(base.LibvirtXMLBase): """ VM power management tag XML class Elements: suspend-to-disk: attribute - enabled suspend-to-mem: attribute - enabled """ __slots__ = ('disk_enabled', 'mem_enabled') def __init__(self, virsh_instance=base.virsh): accessors.XMLAttribute('disk_enabled', self, parent_xpath='/', tag_name='suspend-to-disk', attribute='enabled') accessors.XMLAttribute('mem_enabled', self, parent_xpath='/', tag_name='suspend-to-mem', attribute='enabled') super(self.__class__, self).__init__(virsh_instance=virsh_instance) self.xml = '<pm/>'
[docs]class VMFeaturesXML(base.LibvirtXMLBase): """ Class to access <features> tag of domain XML. Elements: feature_list list of top level element hyperv_relaxed: attribute - state hyperv_vapic: attribute - state hyperv_spinlocks: attributes - state, retries kvm_hidden: attribute - state pvspinlock: attribute - state """ __slots__ = ('feature_list', 'hyperv_relaxed_state', 'hyperv_vapic_state', 'hyperv_spinlocks_state', 'hyperv_spinlocks_retries', 'kvm_hidden_state', 'pvspinlock_state') def __init__(self, virsh_instance=base.virsh): accessors.XMLAttribute(property_name='hyperv_relaxed_state', libvirtxml=self, parent_xpath='/hyperv', tag_name='relaxed', attribute='state') accessors.XMLAttribute(property_name='hyperv_vapic_state', libvirtxml=self, parent_xpath='/hyperv', tag_name='vapic', attribute='state') accessors.XMLAttribute(property_name='hyperv_spinlocks_state', libvirtxml=self, parent_xpath='/hyperv', tag_name='spinlocks', attribute='state') accessors.XMLAttribute(property_name='hyperv_spinlocks_retries', libvirtxml=self, parent_xpath='/hyperv', tag_name='spinlocks', attribute='retries') accessors.XMLAttribute(property_name='kvm_hidden_state', libvirtxml=self, parent_xpath='/kvm', tag_name='hidden', attribute='state') accessors.XMLAttribute(property_name='pvspinlock_state', libvirtxml=self, parent_xpath='/', tag_name='pvspinlock', attribute='state') accessors.AllForbidden(property_name="feature_list", libvirtxml=self) super(self.__class__, self).__init__(virsh_instance=virsh_instance) self.xml = '<features/>'
[docs] def get_feature_list(self): """ Return all features(top level elements) in xml """ feature_list = [] root = self.__dict_get__('xml').getroot() for feature in root: feature_list.append(feature.tag) return feature_list
[docs] def has_feature(self, name): """ Return true if the given feature exist in xml """ return name in self.get_feature_list()
[docs] def add_feature(self, name, attr_name='', attr_value=''): """ Add a feature element to xml :params name: Feature name """ if self.has_feature(name): logging.debug("Feature %s already exist, so remove it", name) self.remove_feature(name) root = self.__dict_get__('xml').getroot() new_attr = {} if attr_name: new_attr = {attr_name: attr_value} xml_utils.ElementTree.SubElement(root, name, new_attr)
[docs] def remove_feature(self, name): """ Remove a feature element from xml :params name: Feature name """ root = self.__dict_get__('xml').getroot() remove_feature = root.find(name) if remove_feature is None: logging.error("Feature %s doesn't exist", name) else: root.remove(remove_feature)
# Sub-element of memoryBacking
[docs]class VMHugepagesXML(VMXML): """hugepages element""" __slots__ = ('pages',) def __init__(self, virsh_instance=base.virsh): accessors.XMLElementList('pages', libvirtxml=self, forbidden=[], parent_xpath="/", marshal_from=self.marshal_from_page, marshal_to=self.marshal_to_page) super(self.__class__, self).__init__(virsh_instance=virsh_instance) self.xml = '<hugepages/>' # Sub-element of hugepages
[docs] class PageXML(VMXML): """Page element of hugepages""" __slots__ = ('size', 'unit', 'nodeset') def __init__(self, virsh_instance=base.virsh): """ Create new PageXML instance """ accessors.XMLAttribute(property_name="size", libvirtxml=self, forbidden=[], parent_xpath='/hugepages', tag_name='page', attribute='size') accessors.XMLAttribute(property_name="unit", libvirtxml=self, forbidden=[], parent_xpath='/hugepages', tag_name='page', attribute='unit') accessors.XMLAttribute(property_name="nodeset", libvirtxml=self, forbidden=[], parent_xpath='/hugepages', tag_name='page', attribute='nodeset') super(VMHugepagesXML.PageXML, self).__init__(virsh_instance=virsh_instance)
[docs] def update(self, attr_dict): for attr, value in attr_dict.items(): setattr(self, attr, value)
@staticmethod
[docs] def marshal_from_page(item, index, libvirtxml): """Convert a PageXML instance into tag + attributes""" del index del libvirtxml page = item.xmltreefile.find("/hugepages/page") try: return (page.tag, dict(page.items())) except AttributeError: # Didn't find page raise xcepts.LibvirtXMLError("Expected a list of page " "instances, not a %s" % str(item))
@staticmethod
[docs] def marshal_to_page(tag, attr_dict, index, libvirtxml): """Convert a tag + attributes to a PageXML instance""" del index if tag == 'page': newone = VMHugepagesXML.PageXML(virsh_instance=libvirtxml.virsh) newone.update(attr_dict) return newone else: return None
[docs]class VMMemBackingXML(VMXML): """ memoryBacking tag XML class Elements: hugepages nosharepages locked """ __slots__ = ('hugepages', 'nosharepages', 'locked') def __init__(self, virsh_instance=base.virsh): accessors.XMLElementNest(property_name='hugepages', libvirtxml=self, parent_xpath='/', tag_name='hugepages', subclass=VMHugepagesXML, subclass_dargs={ 'virsh_instance': virsh_instance}) for slot in ('nosharepages', 'locked'): accessors.XMLElementBool(slot, self, parent_xpath='/', tag_name=slot) super(self.__class__, self).__init__(virsh_instance=virsh_instance) self.xml = '<memoryBacking/>'
[docs]class VMMemTuneXML(base.LibvirtXMLBase): """ Memory Tuning tag XML class Element: hard_limit: int hard_limit_unit: attribute soft_limit: int soft_limit_unit: attribute swap_hard_limit: int swap_limit_unit: attribute min_guarantee: int min_guarantee_unit: attribute """ __slots__ = ('hard_limit', 'soft_limit', 'swap_hard_limit', 'min_guarantee', 'hard_limit_unit', 'soft_limit_unit', 'swap_limit_unit', 'min_guarantee_unit') def __init__(self, virsh_instance=base.virsh): accessors.XMLElementInt(property_name='hard_limit', libvirtxml=self, parent_xpath='/', tag_name='hard_limit') accessors.XMLAttribute(property_name="hard_limit_unit", libvirtxml=self, parent_xpath='/', tag_name='hard_limit', attribute='unit') accessors.XMLElementInt(property_name='soft_limit', libvirtxml=self, parent_xpath='/', tag_name='soft_limit') accessors.XMLAttribute(property_name="soft_limit_unit", libvirtxml=self, parent_xpath='/', tag_name='soft_limit', attribute='unit') accessors.XMLElementInt(property_name='swap_hard_limit', libvirtxml=self, parent_xpath='/', tag_name='swap_hard_limit') accessors.XMLAttribute(property_name="swap_limit_unit", libvirtxml=self, parent_xpath='/', tag_name='swap_hard_limit', attribute='unit') accessors.XMLElementInt(property_name='min_guarantee', libvirtxml=self, parent_xpath='/', tag_name='min_guarantee') accessors.XMLAttribute(property_name="min_guarantee_unit", libvirtxml=self, parent_xpath='/', tag_name='min_guarantee', attribute='unit') super(VMMemTuneXML, self).__init__(virsh_instance=virsh_instance) self.xml = '<memtune/>'