# Source code for virttest.libvirt_xml.network_xml

"""
Module simplifying manipulation of XML described at
http://libvirt.org/formatnetwork.html
"""

import logging
from virttest import xml_utils
from virttest.libvirt_xml import base, xcepts, accessors


class RangeList(list):

    """
    A list of start & end address tuples
    """

    def __init__(self, iterable=None):
        """
        Initialize from list/tuple of two-item tuple start/end address strings

        :param iterable: iterable of two-item ``(start, end)`` tuples;
                         ``None`` (the default) produces an empty list
        :raise: xcepts.LibvirtXMLError if any item is not a two-item tuple
        """
        x_str = "iterable must contain two-item tuples of start/end addresses"
        if iterable is None:
            # The default must be usable: RangeList() is simply empty
            iterable = ()
        newone = []
        for item in iterable:
            # Compare the length by value; identity comparison against an
            # int literal only works by accident of small-int caching
            if not isinstance(item, tuple) or len(item) != 2:
                raise xcepts.LibvirtXMLError(x_str)
            # Assume strings will be validated elsewhere
            newone.append(tuple(item))
        super(RangeList, self).__init__(newone)

    def append_to_element(self, element):
        """
        Adds range described by instance to ElementTree.element

        One ``range`` sub-element carrying ``start`` and ``end`` attributes
        is appended to element for every tuple held in this list.

        :param element: xml_utils.ElementTree.Element (or subclass) instance
        :raise: ValueError if element is not an ElementTree.Element
        """
        if not isinstance(element, xml_utils.ElementTree.Element):
            raise ValueError(
                "Element is not a ElementTree.Element or subclass")
        for start, end in self:
            serange = {'start': start, 'end': end}
            element.append(xml_utils.ElementTree.Element('range', serange))
class IPXML(base.LibvirtXMLBase):

    """
    IP address block, optionally containing DHCP range information

    Properties:
        address:
            string, operates on 'address' attribute of ip tag
        netmask:
            string, operates on 'netmask' attribute of ip tag
        family:
            string, operates on 'family' attribute of ip tag
        prefix:
            string, operates on 'prefix' attribute of ip tag
        tftp_root:
            string, operates on 'root' attribute of tftp tag
        dhcp_bootp:
            string, operates on 'file' attribute of bootp tag under dhcp
        dhcp_ranges:
            Dict. keys: start, end
        hosts:
            list of dicts, one per host tag under dhcp, holding that
            tag's attributes
    """

    __slots__ = ('dhcp_ranges', 'address', 'netmask', 'hosts', 'family',
                 'prefix', 'tftp_root', 'dhcp_bootp')

    def __init__(self, address='192.168.122.1', netmask='255.255.255.0',
                 virsh_instance=base.virsh):
        """
        Create new IPXML instance based on address/mask

        :param address: value for the 'address' attribute of the ip tag
        :param netmask: value for the 'netmask' attribute of the ip tag
        :param virsh_instance: virsh module or instance to use
        """
        # These four properties share their name with the ip tag attribute
        # they operate on
        for ip_attr in ('address', 'netmask', 'family', 'prefix'):
            accessors.XMLAttribute(ip_attr, self, parent_xpath='/',
                                   tag_name='ip', attribute=ip_attr)
        accessors.XMLAttribute('tftp_root', self, parent_xpath='/',
                               tag_name='tftp', attribute='root')
        accessors.XMLAttribute('dhcp_bootp', self, parent_xpath='/dhcp',
                               tag_name='bootp', attribute='file')
        accessors.XMLElementDict('dhcp_ranges', self, parent_xpath='/dhcp',
                                 tag_name='range')
        accessors.XMLElementList('hosts', self, parent_xpath='/dhcp',
                                 marshal_from=self.marshal_from_host,
                                 marshal_to=self.marshal_to_host)
        super(IPXML, self).__init__(virsh_instance=virsh_instance)
        initial_xml = u"<ip address='%s' netmask='%s'></ip>" % (address,
                                                                 netmask)
        self.xml = initial_xml

    @staticmethod
    def marshal_from_host(item, index, libvirtxml):
        """
        Convert a dictionary into a tag + attributes

        :param item: dict of host attributes
        :param index: unused
        :param libvirtxml: unused
        :return: tuple of ('host', copy of item)
        :raise: xcepts.LibvirtXMLError if item is not a dict
        """
        del index, libvirtxml  # neither is needed for this conversion
        if isinstance(item, dict):
            return ('host', dict(item))  # hand back a copy, not a reference
        raise xcepts.LibvirtXMLError("Expected a dictionary of host "
                                     "attributes, not a %s" % str(item))

    @staticmethod
    def marshal_to_host(tag, attr_dict, index, libvirtxml):
        """
        Convert a tag + attributes into a dictionary

        :param tag: name of the element being converted
        :param attr_dict: attributes of the element
        :param index: unused
        :param libvirtxml: unused
        :return: copy of attr_dict, or None when tag is not 'host'
        """
        del index, libvirtxml  # neither is needed for this conversion
        if tag == 'host':
            return dict(attr_dict)  # hand back a copy, not a reference
        return None  # skip this one
class DNSXML(base.LibvirtXMLBase):

    """
    DNS configuration block (dns tag) of a network

    Properties:
        dns_forward:
            string, operates on 'forwardPlainNames' attribute of dns tag
        txt:
            Dict. keys: name, value
        forwarders:
            List of dicts, each holding the attributes of a forwarder tag
        srv:
            Dict. keys: service, protocol, domain, target, port,
            priority, weight
        host:
            HostXML instance, operates on host tag
    """

    __slots__ = ('dns_forward', 'txt', 'forwarders', 'srv', 'host')

    def __init__(self, virsh_instance=base.virsh):
        """
        Create new DNSXML instance

        :param virsh_instance: virsh module or instance to use
        """
        accessors.XMLElementDict('txt', self, parent_xpath='/',
                                 tag_name='txt')
        accessors.XMLElementDict('srv', self, parent_xpath='/',
                                 tag_name='srv')
        accessors.XMLElementList('forwarders', self, parent_xpath='/',
                                 marshal_from=self.marshal_from_forwarder,
                                 marshal_to=self.marshal_to_forwarder)
        accessors.XMLAttribute('dns_forward', self, parent_xpath='/',
                               tag_name='dns', attribute='forwardPlainNames')
        accessors.XMLElementNest('host', self, parent_xpath='/',
                                 tag_name='host', subclass=DNSXML.HostXML,
                                 subclass_dargs={
                                     'virsh_instance': virsh_instance})
        super(DNSXML, self).__init__(virsh_instance=virsh_instance)
        self.xml = u"<dns></dns>"

    class HostnameXML(base.LibvirtXMLBase):

        """
        Hostname element of dns

        Properties:
            hostname:
                string, operates on text of hostname tag
        """

        __slots__ = ('hostname',)

        def __init__(self, virsh_instance=base.virsh):
            """
            Create new HostnameXML instance

            :param virsh_instance: virsh module or instance to use
            """
            accessors.XMLElementText('hostname', self, parent_xpath='/',
                                     tag_name='hostname')
            super(DNSXML.HostnameXML, self).__init__(
                virsh_instance=virsh_instance)
            self.xml = '<hostname/>'

    class HostXML(base.LibvirtXMLBase):

        """
        Host element of dns

        Properties:
            host_ip:
                string, operates on 'ip' attribute of host tag
            hostnames:
                list, operates on hostname tags; items are given as
                strings when setting and returned as HostnameXML
                instances when getting
        """

        __slots__ = ('host_ip', 'hostnames',)

        def __init__(self, virsh_instance=base.virsh):
            """
            Create new HostXML instance

            :param virsh_instance: virsh module or instance to use
            """
            accessors.XMLAttribute('host_ip', self, parent_xpath='/',
                                   tag_name='host', attribute='ip')
            accessors.XMLElementList('hostnames', self, parent_xpath='/',
                                     marshal_from=self.marshal_from_hostname,
                                     marshal_to=self.marshal_to_hostname)
            super(DNSXML.HostXML, self).__init__(
                virsh_instance=virsh_instance)
            self.xml = '<host/>'

        @staticmethod
        def marshal_from_hostname(item, index, libvirtxml):
            """
            Convert a hostname string into a tag + attributes + text

            :param item: hostname as a str
            :param index: unused
            :param libvirtxml: unused
            :return: tuple of ('hostname', {}, item)
            :raise: xcepts.LibvirtXMLError if item is not a str
            """
            del index           # not used
            del libvirtxml      # not used
            # NOTE(review): marshal_to_hostname hands back HostnameXML
            # instances, which this check rejects - confirm whether
            # round-tripping the property is expected to work.
            if isinstance(item, str):
                return ("hostname", {}, item)
            raise xcepts.LibvirtXMLError("Expected a str attributes,"
                                         " not a %s" % str(item))

        @staticmethod
        def marshal_to_hostname(tag, attr, index, libvirtxml, text):
            """
            Convert a tag + text into a HostnameXML instance

            :param tag: name of the element being converted
            :param attr: unused
            :param index: unused
            :param libvirtxml: owning instance, its virsh is reused
            :param text: text of the element, becomes the hostname
            :return: HostnameXML instance, or None when tag is not
                     'hostname'
            """
            del attr            # not used
            del index           # not used
            if tag != 'hostname':
                return None     # Don't convert this item
            newone = DNSXML.HostnameXML(virsh_instance=libvirtxml.virsh)
            newone.hostname = text
            return newone

    def new_host(self, **dargs):
        """
        Return a new HostXML instance and set properties from dargs

        :param dargs: property name/value pairs set on the new instance
        :return: new DNSXML.HostXML instance sharing this instance's virsh
        """
        new_one = DNSXML.HostXML(virsh_instance=self.virsh)
        for key, value in dargs.items():
            setattr(new_one, key, value)
        return new_one

    @staticmethod
    def marshal_from_forwarder(item, index, libvirtxml):
        """
        Convert a dictionary into a tag + attributes

        :param item: dict of forwarder attributes
        :param index: unused
        :param libvirtxml: unused
        :return: tuple of ('forwarder', copy of item)
        :raise: xcepts.LibvirtXMLError if item is not a dict
        """
        del index           # not used
        del libvirtxml      # not used
        if not isinstance(item, dict):
            raise xcepts.LibvirtXMLError("Expected a dictionary of forwarder "
                                         "attributes, not a %s" % str(item))
        return ('forwarder', dict(item))  # return copy of dict, not reference

    @staticmethod
    def marshal_to_forwarder(tag, attr_dict, index, libvirtxml):
        """
        Convert a tag + attributes into a dictionary

        :param tag: name of the element being converted
        :param attr_dict: attributes of the element
        :param index: unused
        :param libvirtxml: unused
        :return: copy of attr_dict, or None when tag is not 'forwarder'
        """
        del index                    # not used
        del libvirtxml               # not used
        if tag != 'forwarder':
            return None              # skip this one
        return dict(attr_dict)       # return copy of dict, not reference
class PortgroupXML(base.LibvirtXMLBase):

    """
    Accessor methods for PortgroupXML class in NetworkXML.

    Properties:
        name:
            string, operates on 'name' attribute of portgroup tag
        default:
            string of yes or no, operates on 'default' attribute of
            portgroup tag
        virtualport_type:
            string, operates on 'type' attribute of virtualport tag in
            portgroup.
        bandwidth_inbound:
            dict, operates on inbound tag in bandwidth which is child of
            portgroup.
        bandwidth_outbound:
            dict, operates on outbound tag in bandwidth which is child of
            portgroup.
        vlan_tag:
            dict, operates on tag element under vlan in portgroup
    """

    __slots__ = ('name', 'default', 'virtualport_type',
                 'bandwidth_inbound', 'bandwidth_outbound', 'vlan_tag')

    def __init__(self, virsh_instance=base.virsh):
        """
        Create new PortgroupXML instance.

        :param virsh_instance: virsh module or instance to use
        """
        # Properties named exactly like the portgroup attribute they wrap
        for own_attr in ('name', 'default'):
            accessors.XMLAttribute(own_attr, self, parent_xpath='/',
                                   tag_name='portgroup', attribute=own_attr)
        accessors.XMLAttribute('virtualport_type', self, parent_xpath='/',
                               tag_name='virtualport', attribute='type')
        # One dict property per traffic direction under the bandwidth tag
        for direction in ('inbound', 'outbound'):
            accessors.XMLElementDict('bandwidth_' + direction, self,
                                     parent_xpath='/bandwidth',
                                     tag_name=direction)
        accessors.XMLElementDict('vlan_tag', self, parent_xpath='/vlan',
                                 tag_name='tag')
        super(PortgroupXML, self).__init__(virsh_instance=virsh_instance)
        self.xml = u"<portgroup></portgroup>"
class NetworkXMLBase(base.LibvirtXMLBase):

    """
    Accessor methods for NetworkXML class.

    Properties:
        name:
            string, operates on XML name tag
        uuid:
            string, operates on uuid tag
        mac:
            string, operates on address attribute of mac tag
        ip:
            IPXML instance, operates on ip tag (including dhcp ranges)
        forward:
            dict, operates on forward tag
        forward_interface:
            list, operates on forward/interface tag
        nat_port:
            dict, operates on port tag under forward/nat
        bridge:
            dict, operates on bridge attributes
        routes:
            list, operates on route tag.
        virtualport_type:
            string, operates on 'type' attribute of virtualport tag.
        bandwidth_inbound:
            dict, operates on inbound under bandwidth.
        bandwidth_outbound:
            dict, operates on outbound under bandwidth.
        portgroup:
            PortgroupXML instance to access portgroup tag.
        domain_name:
            string, operates on name attribute of domain tag
        dns:
            DNSXML instance to access dns tag.
        defined:
            virtual boolean, callout to virsh methods
            get:
                True if libvirt knows network name
            set:
                True defines network, False undefines to libvirt
            del:
                Undefines network to libvirt
        active:
            virtual boolean, callout to virsh methods
            get:
                True if network is active to libvirt
            set:
                True activates network, False deactivates to libvirt
            del:
                Deactivates network to libvirt
        autostart:
            virtual boolean, callout to virsh methods
            get:
                True if libvirt autostarts network with same name
            set:
                True to set autostart, False to unset to libvirt
            del:
                Unset autostart to libvirt
        persistent:
            virtual boolean, callout to virsh methods
            get:
                True if network was defined, False if only created.
            set:
                Same as defined property
            del:
                Same as defined property
    """

    __slots__ = ('name', 'uuid', 'bridge', 'defined', 'active',
                 'autostart', 'persistent', 'forward', 'mac', 'ip',
                 'bandwidth_inbound', 'bandwidth_outbound', 'portgroup',
                 'dns', 'domain_name', 'nat_port', 'forward_interface',
                 'routes', 'virtualport_type')

    __uncompareable__ = base.LibvirtXMLBase.__uncompareable__ + (
        'defined', 'active',
        'autostart', 'persistent')

    __schema_name__ = "network"

    def __init__(self, virsh_instance=base.virsh):
        """
        Register the XML-backed property accessors and initialize the base

        :param virsh_instance: virsh module or instance to use
        """
        accessors.XMLElementText('name', self, parent_xpath='/',
                                 tag_name='name')
        accessors.XMLElementText('uuid', self, parent_xpath='/',
                                 tag_name='uuid')
        accessors.XMLAttribute('mac', self, parent_xpath='/',
                               tag_name='mac', attribute='address')
        accessors.XMLElementDict('forward', self, parent_xpath='/',
                                 tag_name='forward')
        accessors.XMLElementList('forward_interface', self,
                                 parent_xpath='/forward',
                                 marshal_from=self.marshal_from_forward_iface,
                                 marshal_to=self.marshal_to_forward_iface)
        accessors.XMLElementDict('nat_port', self,
                                 parent_xpath='/forward/nat',
                                 tag_name='port')
        accessors.XMLElementDict('bridge', self, parent_xpath='/',
                                 tag_name='bridge')
        accessors.XMLElementDict('bandwidth_inbound', self,
                                 parent_xpath='/bandwidth',
                                 tag_name='inbound')
        accessors.XMLElementDict('bandwidth_outbound', self,
                                 parent_xpath='/bandwidth',
                                 tag_name='outbound')
        accessors.XMLAttribute('domain_name', self, parent_xpath='/',
                               tag_name='domain', attribute='name')
        accessors.XMLElementNest('dns', self, parent_xpath='/',
                                 tag_name='dns', subclass=DNSXML,
                                 subclass_dargs={
                                     'virsh_instance': virsh_instance})
        accessors.XMLElementList('routes', self, parent_xpath='/',
                                 marshal_from=self.marshal_from_route,
                                 marshal_to=self.marshal_to_route)
        accessors.XMLAttribute('virtualport_type', self, parent_xpath='/',
                               tag_name='virtualport', attribute='type')
        super(NetworkXMLBase, self).__init__(virsh_instance=virsh_instance)

    def __check_undefined__(self, errmsg):
        """
        Raise LibvirtXMLError carrying errmsg unless the network is defined

        :param errmsg: message for the raised exception
        :raise: xcepts.LibvirtXMLError if the 'defined' property is False
        """
        if not self.defined:
            raise xcepts.LibvirtXMLError(errmsg)

    def get_defined(self):
        """
        Accessor for 'define' property - does this name exist in network list
        """
        params = {'only_names': True, 'virsh_instance': self.virsh}
        return self.name in self.virsh.net_state_dict(**params)

    def set_defined(self, value):
        """
        Accessor method for 'define' property, set True to define.

        :param value: truthy to define the network, falsy to undefine it
        """
        if not self.__super_get__('INITIALIZED'):
            # Actually do nothing: the guard used to be a bare 'pass'
            # that fell through to the virsh calls below
            return
        if bool(value):
            self.virsh.net_define(self.xml)  # send it the filename
        else:
            del self.defined

    def del_defined(self):
        """
        Accessor method for 'define' property, undefines network

        :raise: xcepts.LibvirtXMLError if the network is not defined
        """
        self.__check_undefined__("Cannot undefine non-existant network")
        self.virsh.net_undefine(self.name)

    def get_active(self):
        """
        Accessor method for 'active' property (True/False)

        :raise: xcepts.LibvirtXMLError if the network is not defined
        """
        self.__check_undefined__("Cannot determine activation for undefined "
                                 "network")
        state_dict = self.virsh.net_state_dict(virsh_instance=self.virsh)
        return state_dict[self.name]['active']

    def set_active(self, value):
        """
        Accessor method for 'active' property, sets network active

        :param value: truthy to start the network, falsy to stop it
        :raise: xcepts.LibvirtXMLError if the network is not defined
        """
        if not self.__super_get__('INITIALIZED'):
            # Actually do nothing: the guard used to be a bare 'pass'
            # that fell through to the checks below
            return
        self.__check_undefined__("Cannot activate undefined network")
        if bool(value):
            if not self.active:  # don't activate twice
                self.virsh.net_start(self.name)
        elif self.active:  # don't deactivate twice
            del self.active

    def del_active(self):
        """
        Accessor method for 'active' property, stops network

        :raise: xcepts.LibvirtXMLError if the network is not defined
        """
        self.__check_undefined__("Cannot deactivate undefined network")
        if self.active:  # don't destroy twice
            self.virsh.net_destroy(self.name)

    def get_autostart(self):
        """
        Accessor method for 'autostart' property, True if set

        :raise: xcepts.LibvirtXMLError if the network is not defined
        """
        self.__check_undefined__("Cannot determine autostart for undefined "
                                 "network")
        state_dict = self.virsh.net_state_dict(virsh_instance=self.virsh)
        return state_dict[self.name]['autostart']

    def set_autostart(self, value):
        """
        Accessor method for 'autostart' property, sets/unsets autostart

        :param value: truthy to enable autostart, falsy to disable it
        :raise: xcepts.LibvirtXMLError if the network is not defined
        """
        if not self.__super_get__('INITIALIZED'):
            # Actually do nothing: the guard used to be a bare 'pass'
            # that fell through to the checks below
            return
        self.__check_undefined__("Cannot set autostart for undefined network")
        if bool(value):
            if not self.autostart:  # don't set autostart twice
                self.virsh.net_autostart(self.name)
        elif self.autostart:  # don't unset autostart twice
            del self.autostart

    def del_autostart(self):
        """
        Accessor method for 'autostart' property, unsets autostart

        :raise: xcepts.LibvirtXMLError if the network is not defined
        """
        self.__check_undefined__("Can't autostart nonexistant network")
        self.virsh.net_autostart(self.name, "--disable")

    def get_persistent(self):
        """
        Accessor method for 'persistent' property
        """
        state_dict = self.virsh.net_state_dict(virsh_instance=self.virsh)
        return state_dict[self.name]['persistent']

    # Copy behavior for consistency
    set_persistent = set_defined
    del_persistent = del_defined

    def get_ip(self):
        """
        Accessor method for 'ip' property, returns IPXML rooted at ip tag

        :return: IPXML instance whose xmltreefile is the '/ip' subtree
        :raise: xcepts.LibvirtXMLError if rerooting at '/ip' raises KeyError
        """
        xmltreefile = self.__dict_get__('xml')
        try:
            ip_root = xmltreefile.reroot('/ip')
        except KeyError as detail:
            raise xcepts.LibvirtXMLError(detail)
        ipxml = IPXML(virsh_instance=self.__dict_get__('virsh'))
        ipxml.xmltreefile = ip_root
        return ipxml

    def set_ip(self, value):
        """
        Accessor method for 'ip' property, appends value's ip element

        :param value: IPXML (or subclass) instance
        :raise: xcepts.LibvirtXMLError if value is not an IPXML
        """
        if not issubclass(type(value), IPXML):
            raise xcepts.LibvirtXMLError("value must be a IPXML or subclass")
        xmltreefile = self.__dict_get__('xml')
        # IPXML root element is whole IP element tree
        root = xmltreefile.getroot()
        root.append(value.xmltreefile.getroot())
        xmltreefile.write()

    def del_ip(self):
        """
        Accessor method for 'ip' property, removes the ip element if found
        """
        xmltreefile = self.__dict_get__('xml')
        element = xmltreefile.find('/ip')
        if element is not None:
            xmltreefile.remove(element)
            xmltreefile.write()

    def get_portgroup(self):
        """
        Accessor method for 'portgroup' property

        :return: PortgroupXML instance whose xmltreefile is the
                 '/portgroup' subtree
        :raise: xcepts.LibvirtXMLError if rerooting at '/portgroup' raises
                KeyError
        """
        try:
            portgroup_root = self.xmltreefile.reroot('/portgroup')
        except KeyError as detail:
            raise xcepts.LibvirtXMLError(detail)
        portgroup_xml = PortgroupXML(virsh_instance=self.__dict_get__('virsh'))
        portgroup_xml.xmltreefile = portgroup_root
        return portgroup_xml

    def set_portgroup(self, value):
        """
        Accessor method for 'portgroup' property, appends value's element

        :param value: PortgroupXML (or subclass) instance
        :raise: xcepts.LibvirtXMLError if value is not a PortgroupXML
        """
        if not issubclass(type(value), PortgroupXML):
            raise xcepts.LibvirtXMLError("value must be a PortgroupXML "
                                         "instance or subclass.")
        root = self.xmltreefile.getroot()
        root.append(value.xmltreefile.getroot())
        self.xmltreefile.write()

    def del_portgroup(self):
        """
        Accessor method for 'portgroup' property, removes the portgroup
        element if found
        """
        element = self.xmltreefile.find("/portgroup")
        if element is not None:
            self.xmltreefile.remove(element)
            self.xmltreefile.write()

    def new_dns(self, **dargs):
        """
        Return a new dns instance and set properties from dargs

        :param dargs: property name/value pairs set on the new instance
        :return: new DNSXML instance sharing this instance's virsh
        """
        new_one = DNSXML(virsh_instance=self.virsh)
        for key, value in dargs.items():
            setattr(new_one, key, value)
        return new_one

    @staticmethod
    def marshal_from_forward_iface(item, index, libvirtxml):
        """
        Convert a dictionary into a tag + attributes

        :param item: dict of interface attributes
        :param index: unused
        :param libvirtxml: unused
        :return: tuple of ('interface', copy of item)
        :raise: xcepts.LibvirtXMLError if item is not a dict
        """
        del index           # not used
        del libvirtxml      # not used
        if not isinstance(item, dict):
            raise xcepts.LibvirtXMLError("Expected a dictionary of interface "
                                         "attributes, not a %s" % str(item))
        return ('interface', dict(item))  # return copy of dict, not reference

    @staticmethod
    def marshal_to_forward_iface(tag, attr_dict, index, libvirtxml):
        """
        Convert a tag + attributes into a dictionary

        :param tag: name of the element being converted
        :param attr_dict: attributes of the element
        :param index: unused
        :param libvirtxml: unused
        :return: copy of attr_dict, or None when tag is not 'interface'
        """
        del index                    # not used
        del libvirtxml               # not used
        if tag != 'interface':
            return None              # skip this one
        return dict(attr_dict)       # return copy of dict, not reference

    @staticmethod
    def marshal_from_route(item, index, libvirtxml):
        """
        Convert a dictionary into a tag + attributes

        :param item: dict of route attributes
        :param index: unused
        :param libvirtxml: unused
        :return: tuple of ('route', copy of item)
        :raise: xcepts.LibvirtXMLError if item is not a dict
        """
        del index           # not used
        del libvirtxml      # not used
        if not isinstance(item, dict):
            raise xcepts.LibvirtXMLError("Expected a dictionary of route "
                                         "attributes, not a %s" % str(item))
        return ('route', dict(item))  # return copy of dict, not reference

    @staticmethod
    def marshal_to_route(tag, attr_dict, index, libvirtxml):
        """
        Convert a tag + attributes into a dictionary

        :param tag: name of the element being converted
        :param attr_dict: attributes of the element
        :param index: unused
        :param libvirtxml: unused
        :return: copy of attr_dict, or None when tag is not 'route'
        """
        del index                    # not used
        del libvirtxml               # not used
        if tag != 'route':
            return None              # skip this one
        return dict(attr_dict)       # return copy of dict, not reference
class NetworkXML(NetworkXMLBase):

    """
    Manipulators of a Virtual Network through it's XML definition.
    """

    __slots__ = []

    def __init__(self, network_name='default', virsh_instance=base.virsh):
        """
        Initialize new instance with empty XML

        :param network_name: text placed in the name tag of the initial XML
        :param virsh_instance: virsh module or instance to use
        """
        super(NetworkXML, self).__init__(virsh_instance=virsh_instance)
        self.xml = u"<network><name>%s</name></network>" % network_name

    @staticmethod  # wraps __new__
    def new_all_networks_dict(virsh_instance=base.virsh):
        """
        Return a dictionary of names to NetworkXML instances for all networks

        :param virsh_instance: virsh module or instance to use
        :return: Dictionary of network name to NetworkXML instance
        """
        result = {}
        # Values should all share virsh property
        new_netxml = NetworkXML(virsh_instance=virsh_instance)
        params = {'only_names': True,
                  'virsh_instance': virsh_instance}
        networks = new_netxml.virsh.net_state_dict(**params).keys()
        for net_name in networks:
            new_copy = new_netxml.copy()
            new_copy.xml = virsh_instance.net_dumpxml(net_name).stdout.strip()
            result[net_name] = new_copy
        return result

    @staticmethod
    def new_from_net_dumpxml(network_name, virsh_instance=base.virsh,
                             extra=""):
        """
        Return new NetworkXML instance from virsh net-dumpxml command

        :param network_name: Name of network to net-dumpxml
        :param virsh_instance: virsh module or instance to use
        :param extra: extra argument passed through to net_dumpxml
        :return: New initialized NetworkXML instance
        """
        netxml = NetworkXML(virsh_instance=virsh_instance)
        netxml['xml'] = virsh_instance.net_dumpxml(network_name,
                                                   extra).stdout.strip()
        return netxml

    @staticmethod
    def get_uuid_by_name(network_name, virsh_instance=base.virsh):
        """
        Return Network's uuid by Network's name.

        :param network_name: Network's name
        :param virsh_instance: virsh module or instance to use
        :return: Network's uuid
        """
        network_xml = NetworkXML.new_from_net_dumpxml(network_name,
                                                      virsh_instance)
        return network_xml.uuid

    def debug_xml(self):
        """
        Dump contents of XML file for debugging
        """
        xml = str(self)  # LibvirtXMLBase.__str__ returns XML content
        for debug_line in xml.splitlines():
            logging.debug("Network XML: %s", debug_line)

    def state_dict(self):
        """
        Return a dict containing states of active/autostart/persistent

        :return: A dict contains active/autostart/persistent as keys
                 and boolean as values or None if network doesn't exist.
        """
        if self.defined:
            return self.virsh.net_state_dict(
                virsh_instance=self.virsh)[self.name]
        return None

    def create(self):
        """
        Adds non-persistant / transient network to libvirt with net-create
        """
        self.virsh.net_create(self.xml)

    def orbital_nuclear_strike(self):
        """It's the only way to really be sure.  Remove all libvirt state"""
        try:
            self['active'] = False  # deactivate (stop) network if active
        except xcepts.LibvirtXMLError as detail:
            # inconsequential, network will be removed
            logging.warning(detail)
        try:
            self['defined'] = False  # undefine (delete) network if persistent
        except xcepts.LibvirtXMLError as detail:
            # network already gone
            logging.warning(detail)

    def exists(self):
        """
        Return True if network already exists.
        """
        cmd_result = self.virsh.net_uuid(self.name)
        return cmd_result.exit_status == 0

    def undefine(self):
        """
        Undefine network which name is self.name.

        :raise: xcepts.LibvirtXMLError if net_undefine exits non-zero
        """
        # The result of net_destroy is not checked; only net_undefine
        # decides success
        self.virsh.net_destroy(self.name)
        cmd_result = self.virsh.net_undefine(self.name)
        if cmd_result.exit_status:
            raise xcepts.LibvirtXMLError("Failed to undefine network %s.\n"
                                         "Detail: %s" %
                                         (self.name, cmd_result.stderr))

    def define(self):
        """
        Define network from self.xml.

        :raise: xcepts.LibvirtXMLError if net_define exits non-zero
        """
        cmd_result = self.virsh.net_define(self.xml)
        if cmd_result.exit_status:
            raise xcepts.LibvirtXMLError("Failed to define network %s.\n"
                                         "Detail: %s" %
                                         (self.name, cmd_result.stderr))

    def start(self):
        """
        Start network with self.virsh.

        :raise: xcepts.LibvirtXMLError if net_start exits non-zero
        """
        cmd_result = self.virsh.net_start(self.name)
        if cmd_result.exit_status:
            raise xcepts.LibvirtXMLError("Failed to start network %s.\n"
                                         "Detail: %s" %
                                         (self.name, cmd_result.stderr))

    def sync(self, state=None):
        """
        Make the change of "self" take effect on network.
        Recover network to designated state if option state is set.

        :param state: a boolean dict contains active/persistent/autostart as
                      keys
        """
        if self['defined']:
            if self['active']:
                del self['active']
            # Checked again: presumably a transient network is gone once
            # it has been stopped - TODO confirm
            if self['defined']:
                del self['defined']

        self['defined'] = True
        if state:
            self['active'] = state['active']
            if not state['persistent']:
                del self['persistent']
            if self.defined:
                self['autostart'] = state['autostart']
        else:
            self['active'] = True
            self['autostart'] = True