Source code for virttest.libvirt_xml.nwfilter_xml

Module simplifying manipulation of XML described at

from virttest.libvirt_xml import base, xcepts, accessors
from virttest.libvirt_xml.nwfilter_protocols import librarian

[docs]class NwfilterRulesProtocol(list): """ List of protocol instances from classes handed out by librarian.get """ @staticmethod def __type_check__(other): try: # Raise error if object isn't dict-like or doesn't have key device_tag = other['device_tag'] # Check that we have support for this type librarian.get(device_tag) except (AttributeError, TypeError, xcepts.LibvirtXMLError): # Required to always raise TypeError for list API in VMXML class raise TypeError("Unsupported item type: %s" % str(type(other))) def __setitem__(self, key, value): self.__type_check__(value) super(NwfilterRulesProtocol, self).__setitem__(key, value) return self
[docs] def append(self, value): self.__type_check__(value) super(NwfilterRulesProtocol, self).append(value) return self
[docs] def extend(self, iterable): # Make sure __type_check__ happens for item in iterable: self.append(item) return self
[docs] def by_device_tag(self, tag): result = NwfilterRulesProtocol() for protocol in self: if protocol.device_tag == tag: result.append(protocol) return result
[docs]class NwfilterXMLRules(base.LibvirtXMLBase): """ Create new NwfilterXMLRules instance. Properties: rule_action: string, rule action rule_direction: string, rule direction priority: string, rule priority statematch: string, rule statematch """ __slots__ = ('rule_action', 'rule_direction', 'rule_priority', 'rule_statematch') def __init__(self, protocol=None, virsh_instance=base.virsh): accessors.XMLAttribute('rule_action', self, parent_xpath='/', tag_name='rule', attribute='action') accessors.XMLAttribute('rule_direction', self, parent_xpath='/', tag_name='rule', attribute='direction') accessors.XMLAttribute('rule_priority', self, parent_xpath='/', tag_name='rule', attribute='priority') accessors.XMLAttribute('rule_statematch', self, parent_xpath='/', tag_name='rule', attribute='statematch') super(NwfilterXMLRules, self).__init__(virsh_instance=virsh_instance) self.xml = '<rule></rule>'
[docs] def backup_rule(self): """ Return backup rule instance :return: the backup of rule instance """ backup = NwfilterXMLRules(virsh_instance=self.__dict_get__('virsh')) backup.xmltreefile = self.xmltreefile.backup_copy() return backup
[docs] def get_protocol(self, protocol=None): """ Return None if protocol is None, else return specific class instance :param protocol: specific protocol type in rules :return: specific protocol class instance from librarian.get """ if protocol: protocol_class = librarian.get(protocol) protocol_node = self.xmltreefile.getroot().getchildren()[0] protocol_node.tag = protocol new_one = protocol_class.new_from_element(protocol_node) new_one.xmltreefile = self.xmltreefile else: new_one = None return new_one
[docs] def new_protocol(self, **dargs): """ Return a new rule protocol instance and set properties from dargs """ protocol_tag = dargs.get("name") new_one = librarian.get(protocol_tag) for key, value in dargs.items(): setattr(new_one, key, value) return new_one
[docs] def del_protocol(self): """ Delete protocol in rule xml """ protocol_node = self.xmltreefile.getroot().getchildren() if protocol_node: self.xmltreefile.remove(protocol_node[0]) self.xmltreefile.write()
[docs]class NwfilterXMLBase(base.LibvirtXMLBase): """ Accessor methods for NwfilterXML class. Properties: filter_name: string, filter name filter_chain: string, filter name filter_priority: string, filter priority uuid: string, operates on uuid tag filterrefs: list, list of dictionaries describing filterref properties """ __slots__ = base.LibvirtXMLBase.__slots__ + ('filter_name', 'filter_chain', 'filter_priority', 'uuid', 'filterrefs') __uncompareable__ = base.LibvirtXMLBase.__uncompareable__ __schema_name__ = "nwfilter" def __init__(self, virsh_instance=base.virsh): accessors.XMLAttribute('filter_name', self, parent_xpath='/', tag_name='filter', attribute='name') accessors.XMLAttribute('filter_chain', self, parent_xpath='/', tag_name='filter', attribute='chain') accessors.XMLAttribute('filter_priority', self, parent_xpath='/', tag_name='filter', attribute='priority') accessors.XMLElementText('uuid', self, parent_xpath='/', tag_name='uuid') accessors.XMLElementList(property_name='filterrefs', libvirtxml=self, parent_xpath='/', marshal_from=self.marshal_from_filterref, marshal_to=self.marshal_to_filterref) super(NwfilterXMLBase, self).__init__(virsh_instance=virsh_instance) @staticmethod
[docs] def marshal_from_filterref(item, index, libvirtxml): """Convert a dictionary into a tag + attributes""" del index # not used del libvirtxml # not used if not isinstance(item, dict): raise xcepts.LibvirtXMLError("Expected a dictionary of filterref " "attributes, not a %s" % str(item)) return ('filterref', dict(item)) # return copy of dict, not reference
[docs] def marshal_to_filterref(tag, attr_dict, index, libvirtxml): """Convert a tag + attributes into a dictionary""" del index # not used del libvirtxml # not used if tag != 'filterref': return None # skip this one return dict(attr_dict) # return copy of dict, not reference
[docs] def get_rule_index(self, rule_protocol=None): """ Return rule index list for specific protocol :param rule_protocol: the specific protocol type in rules :return: rule index list """ rule_index = [] source_root = self.xmltreefile.findall('rule') for i in range(len(source_root)): if rule_protocol: protocol_node = source_root[i].getchildren()[0] if protocol_node.tag == rule_protocol: rule_index.append(i) else: rule_index.append(i) return rule_index
[docs] def get_rule(self, rule_index=0, rule_protocol=None): """ Return NwfilterXMLRules instance for specific protocol and index :param rule_index: rule's index number :param rule_protocol: the specific protocol type in rules :return: New initialized NwfilterXMLRules instance """ index = self.get_rule_index(rule_protocol) if rule_index not in index: raise xcepts.LibvirtXMLError("rule index %s is not valid" % rule_index) source_root = self.xmltreefile.findall('rule') rulexml = NwfilterXMLRules(virsh_instance=self.__dict_get__('virsh')) rulexml.xmltreefile = self.xmltreefile.backup_copy() rulexml.xmltreefile._setroot(source_root[rule_index]) rulexml.xmltreefile.write() rulexml.xmltreefile.flush() return rulexml
[docs] def del_rule(self, rule_index=0): """ Delete rule with specific index :param rule_index: rule's index number """ source_root = self.xmltreefile.findall('rule') self.xmltreefile.remove(source_root[rule_index]) self.xmltreefile.write()
[docs] def set_rule(self, value, rule_index=0): """ Delete rule with specific index and add new given value :param rule_index: rule's index number :param value: NwfilterXMLRules instance """ if not issubclass(type(value), NwfilterXMLRules): raise xcepts.LibvirtXMLError( "Value must be a NwfilterXMLRules or subclass") try: source_root = self.xmltreefile.findall('rule') except KeyError, detail: raise xcepts.LibvirtXMLError(detail) if source_root[rule_index] is not None: self.del_rule(rule_index) root = self.xmltreefile.getroot() root.insert(rule_index, value.xmltreefile.getroot()) self.xmltreefile.write()
[docs] def add_rule(self, value): """ Add new rule into filter :param value: NwfilterXMLRules instance """ if not issubclass(type(value), NwfilterXMLRules): raise xcepts.LibvirtXMLError( "Value must be a NwfilterXMLRules or subclass") root = self.xmltreefile.getroot() root.append(value.xmltreefile.getroot()) self.xmltreefile.write()
[docs] def get_protocol_attr(self, rule_index=0, protocol=None): """ Return protocol dict of specific rule index and protocol type :param rule_index: rule's index number :param protocol: the specific protocol type in rules :return: protocol attribute dict """ rule = self.get_rule(rule_index, protocol) if protocol: protocol = rule.get_protocol(protocol) attr = protocol.get_attr() else: attr = None return attr
[docs]class NwfilterXML(NwfilterXMLBase): """ Manipulators of a nwfilter through it's XML definition. """ __slots__ = NwfilterXMLBase.__slots__ def __init__(self, virsh_instance=base.virsh): """ Initialize new instance with empty XML """ super(NwfilterXML, self).__init__(virsh_instance=virsh_instance) self.xml = u"<filter></filter>" @staticmethod
[docs] def new_from_filter_dumpxml(uuid, options="", virsh_instance=base.virsh): """ Return new NwfilterXML instance from virsh filter-dumpxml command :param uuid: filter's uuid :param virsh_instance: virsh module or instance to use :return: New initialized NwfilterXML instance """ filter_xml = NwfilterXML(virsh_instance=virsh_instance) filter_xml['xml'] = virsh_instance.nwfilter_dumpxml(uuid, options=options ).stdout.strip() return filter_xml
[docs] def get_all_rules(self): """ Return all rules dict with protocol attribute. :return: all rules dict with key as rule index number """ rule_dict_attr = {} rule_nodes = self.xmltreefile.findall('rule') for i in range(len(rule_nodes)): if rule_nodes[i].getchildren(): protocol_node = rule_nodes[i].getchildren()[0] protocol = protocol_node.tag pro_dict = dict(protocol_node.items()) rule_dict = dict(rule_nodes[i].items()) rule_dict.update(pro_dict) rule_dict['protocol'] = protocol rule_dict_attr[i] = rule_dict else: rule_dict = dict(rule_nodes[i].items()) rule_dict_attr[i] = rule_dict return rule_dict_attr
[docs] def get_rules_dict(self, filter_name, options="", virsh_instance=base.virsh): """ Return all rules dict with protocol attribute for given filter :param filter_name: name or uuid of filter :param options: extra options :return: all rules dictionary with index as key """ filxml = NwfilterXML.new_from_filter_dumpxml(filter_name, virsh_instance=base.virsh) rules = filxml.get_all_rules() return rules
[docs] def get_all_protocols(self, protocol=None): """ Put all type of protocol into a NwfilterRulesProtocol instance. Return all protocols class list if protocol as None, else return specific protocol type class list. :param protocol: specific protocol type in rules :return: NwfilterRulesProtocol instance list """ protocols = NwfilterRulesProtocol() all_rules = self.xmltreefile.findall('rule') for i in all_rules: protocol_node = i.getchildren() if protocol_node: if protocol: # Each rule node only have one protocol node, so # only use protocol_node[0] if protocol_node[0].tag == protocol: protocol_class = librarian.get(protocol) new_one = protocol_class.new_from_element( protocol_node[0]) protocols.device_tag = protocol protocols.append(new_one) else: protocol_class = librarian.get(protocol_node[0].tag) new_one = protocol_class.new_from_element( protocol_node[0]) protocols.device_tag = protocol_node[0].tag protocols.append(new_one) return protocols