Source code for virttest.utils_net_unittest

#!/usr/bin/python

import unittest
import time
import logging
import sys
import random
import os
import shelve

import common
from autotest.client import utils
from autotest.client.shared.test_utils import mock
import utils_net
import utils_misc
import cartesian_config
import utils_params
import propcan


[docs]class FakeVm(object): def __init__(self, vm_name, params): self.name = vm_name self.params = params self.vm_type = self.params.get('vm_type') self.driver_type = self.params.get('driver_type') self.instance = ("%s-%s" % ( time.strftime("%Y%m%d-%H%M%S"), utils_misc.generate_random_string(16)))
[docs] def get_params(self): return self.params
[docs] def is_alive(self): logging.info("Fake VM %s (instance %s)", self.name, self.instance)
[docs]class TestBridge(unittest.TestCase):
[docs] class FakeCmd(object): iter = 0 def __init__(self, *args, **kargs): self.fake_cmds = [ """bridge name bridge id STP enabled interfaces virbr0 8000.52540018638c yes virbr0-nic virbr1 8000.525400c0b080 yes em1 virbr1-nic """, """bridge name bridge id STP enabled interfaces virbr0 8000.52540018638c yes """, """bridge name bridge id STP enabled interfaces """, """bridge name bridge id STP enabled interfaces virbr0 8000.52540018638c yes virbr0-nic virbr2-nic virbr3-nic virbr1 8000.525400c0b080 yes em1 virbr1-nic virbr4-nic virbr5-nic virbr2 8000.525400c0b080 yes em1 virbr10-nic virbr40-nic virbr50-nic """] self.stdout = self.get_stdout() self.__class__.iter += 1
[docs] def get_stdout(self): return self.fake_cmds[self.__class__.iter]
[docs] def setUp(self): self.god = mock.mock_god(ut=self) def utils_run(*args, **kargs): return TestBridge.FakeCmd(*args, **kargs) self.god.stub_with(utils, 'run', utils_run)
[docs] def test_getstructure(self): br = utils_net.Bridge().get_structure() self.assertEqual(br, {'virbr1': {'iface': ['em1', 'virbr1-nic'], 'stp': 'yes'}, 'virbr0': {'iface': ['virbr0-nic'], 'stp': 'yes'}}) br = utils_net.Bridge().get_structure() self.assertEqual(br, {'virbr0': {"iface": [], "stp": "yes"}}) br = utils_net.Bridge().get_structure() self.assertEqual(br, {}) br = utils_net.Bridge().get_structure() self.assertEqual(br, {'virbr2': {"iface": ['em1', 'virbr10-nic', 'virbr40-nic', 'virbr50-nic'], "stp": "yes"}, 'virbr1': {"iface": ['em1', 'virbr1-nic', 'virbr4-nic', 'virbr5-nic'], "stp": "yes"}, 'virbr0': {"iface": ['virbr0-nic', 'virbr2-nic', 'virbr3-nic'], "stp": "yes"}})
[docs] def tearDown(self): self.god.unstub_all()
[docs]class TestVirtIface(unittest.TestCase): VirtIface = utils_net.VirtIface
[docs] def setUp(self): logging.disable(logging.INFO) logging.disable(logging.WARNING) utils_net.VirtIface.LASTBYTE = -1 # Restart count at zero # These warnings are annoying during testing utils_net.VMNet.DISCARD_WARNINGS - 1
[docs] def loop_assert(self, virtiface, test_keys, what_func): for propertea in test_keys: attr_access_value = getattr(virtiface, propertea) can_access_value = virtiface[propertea] get_access_value = virtiface.get(propertea, None) expected_value = what_func(propertea) self.assertEqual(attr_access_value, can_access_value) self.assertEqual(can_access_value, expected_value) self.assertEqual(get_access_value, expected_value)
[docs] def test_half_set(self): def what_func(propertea): return props[propertea] half_prop_end = (len(self.VirtIface.__all_slots__) / 2) + 1 props = {} for propertea in self.VirtIface.__all_slots__[0:half_prop_end]: props[propertea] = utils_misc.generate_random_string(16) virtiface = self.VirtIface(props) self.loop_assert(virtiface, props.keys(), what_func)
[docs] def test_full_set(self): def what_func(propertea): return props[propertea] props = {} for propertea in self.VirtIface.__all_slots__: props[propertea] = utils_misc.generate_random_string(16) virtiface = self.VirtIface(props) self.loop_assert(virtiface, props.keys(), what_func)
[docs] def test_apendex_set(self): """ Verify container ignores unknown key names """ def what_func(propertea): return props[propertea] props = {} for propertea in self.VirtIface.__all_slots__: props[propertea] = utils_misc.generate_random_string(16) more_props = {} for _ in xrange(0, 16): key = utils_misc.generate_random_string(16) value = utils_misc.generate_random_string(16) more_props[key] = value # Keep separated for testing apendex_set = {} apendex_set.update(props) apendex_set.update(more_props) virtiface = self.VirtIface(apendex_set) # str(props) guarantees apendex set wasn't incorporated self.loop_assert(virtiface, props.keys(), what_func)
[docs] def test_mac_completer(self): for test_mac in ['9a', '01:02:03:04:05:06', '00', '1:2:3:4:5:6', '0a:0b:0c:0d:0e:0f', 'A0:B0:C0:D0:E0:F0', "01:02:03:04:05:", "01:02:03:04::", "01:02::::", "00:::::::::::::::::::", ":::::::::::::::::::", ":"]: # Tosses an exception if test_mac can't be completed self.VirtIface.complete_mac_address(test_mac) self.assertRaises(TypeError, self.VirtIface.complete_mac_address, '01:f0:0:ba:r!:00') self.assertRaises(TypeError, self.VirtIface.complete_mac_address, "01:02:03::05:06")
[docs]class TestQemuIface(TestVirtIface):
[docs] def setUp(self): super(TestQemuIface, self).setUp() self.VirtIface = utils_net.QemuIface
[docs]class TestLibvirtIface(TestVirtIface):
[docs] def setUp(self): super(TestLibvirtIface, self).setUp() self.VirtIface = utils_net.LibvirtIface
[docs]class TestVmNetStyle(unittest.TestCase):
[docs] def setUp(self): logging.disable(logging.INFO) logging.disable(logging.WARNING)
[docs] def get_style(self, vm_type, driver_type): return utils_net.VMNetStyle.get_style(vm_type, driver_type)
[docs] def test_default_default(self): style = self.get_style(utils_misc.generate_random_string(16), utils_misc.generate_random_string(16)) self.assertEqual(style['mac_prefix'], '9a') self.assertEqual(style['container_class'], utils_net.QemuIface) self.assert_(issubclass(style['container_class'], utils_net.VirtIface))
[docs] def test_libvirt(self): style = self.get_style('libvirt', utils_misc.generate_random_string(16)) self.assertEqual(style['container_class'], utils_net.LibvirtIface) self.assert_(issubclass(style['container_class'], utils_net.VirtIface))
[docs]class TestVmNet(unittest.TestCase):
[docs] def setUp(self): logging.disable(logging.INFO) logging.disable(logging.WARNING) utils_net.VirtIface.LASTBYTE = -1 # Restart count at zero # These warnings are annoying during testing utils_net.VMNet.DISCARD_WARNINGS - 1
[docs] def test_string_container(self): self.assertRaises(TypeError, utils_net.VMNet, str, ["Foo"])
[docs] def test_VirtIface_container(self): test_data = [ {'nic_name': 'nic1', 'mac': '0a'}, {'nic_name': ''}, # test data index 1 {'foo': 'bar', 'nic_name': 'nic2'}, {'nic_name': 'nic3', 'mac': '01:02:03:04:05:06'} ] self.assertRaises(utils_net.VMNetError, utils_net.VMNet, utils_net.VirtIface, test_data) del test_data[1] vmnet = utils_net.VMNet(utils_net.VirtIface, test_data) self.assertEqual(vmnet[0].nic_name, test_data[0]['nic_name']) self.assertEqual(vmnet['nic1'].__class__, utils_net.VirtIface) self.assertEqual(False, hasattr(vmnet['nic1'], 'mac')) self.assertEqual(False, 'mac' in vmnet['nic1'].keys()) self.assertEqual(vmnet.nic_name_list(), ['nic1', 'nic2', 'nic3']) self.assertEqual(vmnet.nic_name_index('nic2'), 1) self.assertRaises(TypeError, vmnet.nic_name_index, 0) self.assertEqual(True, hasattr(vmnet[2], 'mac')) self.assertEqual(test_data[2]['mac'], vmnet[2]['mac'])
[docs]class TestVmNetSubclasses(unittest.TestCase): nettests_cartesian = (""" variants: - onevm: vms=vm1 - twovms: vms=vm1 vm2 - threevms: vms=vm1 vm2 vm3 variants: - typeundefined: - libvirt: vm_type = libvirt variants: - unsetdrivertype: - xen: driver_type = xen nics=nic1 - qemu: driver_type = qemu nics=nic1 nic2 - kvm: driver_type = kvm nics=nic1 nic2 nic3 - lxc: driver_type = lxc nics=nic1 nic2 nic3 nic4 - qemu: vm_type = qemu variants: - unsetdrivertype: - kvm: driver_type = kvm - qemu: driver_type = qemu variants: -propsundefined: -defaultprops: mac = 9a nic_model = virtio nettype = bridge netdst = virbr0 vlan = 0 -mixedpropsone: mac_nic1 = 9a:01 nic_model_nic1 = rtl8139 nettype_nic1 = bridge netdst_nic1 = virbr1 vlan_nic1 = 1 ip_nic1 = 192.168.122.101 netdev_nic1 = foobar -mixedpropstwo: mac_nic2 = 9a:02 nic_model_nic2 = e1000 nettype_nic2 = network netdst_nic2 = eth2 vlan_nic2 = 2 ip_nic2 = 192.168.122.102 netdev_nic2 = barfoo -mixedpropsthree: mac_nic1 = 01:02:03:04:05:06 mac_nic2 = 07:08:09:0a:0b:0c mac_nic4 = 0d:0e:0f:10:11:12 -mixedpropsthree: nettype_nic3 = bridge netdst_nic3 = virbr3 netdev_nic3 = qwerty """) mac_prefix = "01:02:03:04:05:" db_filename = '/dev/shm/UnitTest_AddressPool' db_item_count = 0 counter = 0 # for printing dots
[docs] def setUp(self): """ Runs before every test """ logging.disable(logging.INFO) logging.disable(logging.WARNING) # MAC generator produces from incrementing byte list # at random starting point (class property). # make sure it starts counting at zero before every test utils_net.VirtIface.LASTBYTE = -1 # These warnings are annoying during testing utils_net.VMNet.DISCARD_WARNINGS - 1 parser = cartesian_config.Parser() parser.parse_string(self.nettests_cartesian) self.CartesianResult = [] for d in parser.get_dicts(): params = utils_params.Params(d) self.CartesianResult.append(params) for vm_name in params.objects('vms'): vm = params.object_params(vm_name) nics = vm.get('nics') if nics and len(nics.split()) > 0: self.db_item_count += 1
[docs] def fakevm_generator(self): for params in self.CartesianResult: for vm_name in params.get('vms').split(): # Return iterator covering all types of vms # in exactly the same order each time. For more info, see: # http://docs.python.org/reference/simple_stmts.html#yield yield FakeVm(vm_name, params)
[docs] def zero_counter(self, increment=100): # rough total, doesn't include the number of vms self.increment = increment self.counter = 0 sys.stdout.write(".") sys.stdout.flush()
[docs] def print_and_inc(self): self.counter += 1 if self.counter >= self.increment: self.counter = 0 sys.stdout.write(".") sys.stdout.flush()
[docs] def test_cmp_Virtnet(self): self.zero_counter() to_test = 600 # Random generator slows this test way down for fakevm1 in self.fakevm_generator(): to_test -= 1 if to_test < 1: break fvm1p = fakevm1.get_params() fakevm1.virtnet = utils_net.VirtNet(fvm1p, fakevm1.name, fakevm1.instance, self.db_filename) if len(fakevm1.virtnet) < 2: continue fakevm2 = FakeVm(fakevm1.name + "_2", fvm1p) fakevm2.virtnet = utils_net.VirtNet(fvm1p, fakevm2.name, fakevm2.instance, self.db_filename) # Verify nic order doesn't matter fvm3p = utils_params.Params(fvm1p.items()) # work on copy nic_list = fvm1p.object_params(fakevm1.name).get( "nics", fvm1p.get('nics', "")).split() random.shuffle(nic_list) fvm3p['nics'] = " ".join(nic_list) fakevm3 = FakeVm(fakevm1.name + "_3", fvm3p) fakevm3.virtnet = utils_net.VirtNet(fvm3p, fakevm3.name, fakevm3.instance, self.db_filename) self.assertTrue(fakevm1.virtnet == fakevm1.virtnet) self.assertTrue(fakevm1.virtnet == fakevm2.virtnet) self.assertTrue(fakevm1.virtnet == fakevm3.virtnet) self.assertTrue(fakevm2.virtnet == fakevm3.virtnet) if len(fakevm1.virtnet) > 1: del fakevm1.virtnet[0] self.assertFalse(fakevm1.virtnet == fakevm2.virtnet) self.assertFalse(fakevm1.virtnet == fakevm3.virtnet) self.assertTrue(fakevm1.virtnet != fakevm2.virtnet) self.assertTrue(fakevm1.virtnet != fakevm3.virtnet) self.print_and_inc()
[docs] def test_01_Params(self): """ Load Cartesian combinatorial result verifies against all styles of VM. Note: There are some cases where the key should NOT be set, in this case an exception is caught prior to verifying """ self.zero_counter() for fakevm in self.fakevm_generator(): test_params = fakevm.get_params() virtnet = utils_net.ParamsNet(test_params, fakevm.name) self.assert_(virtnet.container_class) self.assert_(virtnet.mac_prefix) self.assert_(issubclass(virtnet.__class__, list)) # Assume params actually came from CartesianResult because # Checking this takes a very long time across all combinations param_nics = test_params.object_params(fakevm.name).get( "nics", test_params.get('nics', "")).split() # Size of list should match number of nics configured self.assertEqual(len(param_nics), len(virtnet)) # Test each interface data for virtnet_index in xrange(0, len(virtnet)): # index correspondence already established/asserted virtnet_nic = virtnet[virtnet_index] params_nic = param_nics[virtnet_index] self.assert_(issubclass(virtnet_nic.__class__, propcan.PropCan)) self.assertEqual(virtnet_nic.nic_name, params_nic, "%s != %s" % (virtnet_nic.nic_name, params_nic)) # __slots__ functionality established/asserted elsewhere props_to_check = list(utils_net.VirtIface.__all_slots__) # other tests check mac address handling del props_to_check[props_to_check.index('mac')] for propertea in props_to_check: params_propertea = test_params.object_params(params_nic ).get(propertea) # Double-verify dual-mode access works try: virtnet_propertea1 = getattr(virtnet_nic, propertea) virtnet_propertea2 = virtnet_nic[propertea] except (KeyError, AttributeError): # This style may not support all properties, skip continue # Only check stuff cartesian config actually set if params_propertea: self.assertEqual(params_propertea, virtnet_propertea1) self.assertEqual( virtnet_propertea1, virtnet_propertea2) self.print_and_inc()
[docs] def test_02_db(self): """ Load Cartesian combinatorial result from params into database """ try: os.unlink(self.db_filename) except OSError: pass self.zero_counter() for fakevm in self.fakevm_generator(): test_params = fakevm.get_params() virtnet = utils_net.DbNet(test_params, fakevm.name, self.db_filename, fakevm.instance) self.assert_(hasattr(virtnet, 'container_class')) self.assert_(hasattr(virtnet, 'mac_prefix')) self.assert_(not hasattr(virtnet, 'lock')) self.assert_(not hasattr(virtnet, 'db')) vm_name_params = test_params.object_params(fakevm.name) nic_name_list = vm_name_params.objects('nics') for nic_name in nic_name_list: # nic name is only in params scope nic_dict = {'nic_name': nic_name} nic_params = test_params.object_params(nic_name) # avoid processing unsupported properties proplist = list(virtnet.container_class().__all_slots__) # name was already set, remove from __slots__ list copy del proplist[proplist.index('nic_name')] for propertea in proplist: nic_dict[propertea] = nic_params.get(propertea) virtnet.append(nic_dict) virtnet.update_db() # db shouldn't store empty items self.print_and_inc()
[docs] def test_03_db(self): """ Load from database created in test_02_db, verify data against params """ # Verify on-disk data matches dummy data just written self.zero_counter() db = shelve.open(self.db_filename) db_keys = db.keys() self.assertEqual(len(db_keys), self.db_item_count) for key in db_keys: db_value = eval(db[key], {}, {}) self.assert_(isinstance(db_value, list)) self.assert_(len(db_value) > 0) self.assert_(isinstance(db_value[0], dict)) for nic in db_value: mac = nic.get('mac') if mac: # Another test already checked mac_is_valid behavior self.assert_(utils_net.VirtIface.mac_is_valid(mac)) self.print_and_inc() db.close()
[docs] def test_04_VirtNet(self): """ Populate database with max - 1 mac addresses """ try: os.unlink(self.db_filename) except OSError: pass self.zero_counter(25) # setup() method already set LASTBYTE to '-1' for lastbyte in xrange(0, 0xFF): # test_07_VirtNet demands last byte in name and mac match vm_name = "vm%d" % lastbyte if lastbyte < 16: mac = "%s0%x" % (self.mac_prefix, lastbyte) else: mac = "%s%x" % (self.mac_prefix, lastbyte) params = utils_params.Params({ "nics": "nic1", "vms": vm_name, "mac_nic1": mac, }) virtnet = utils_net.VirtNet(params, vm_name, vm_name, self.db_filename) virtnet.mac_prefix = self.mac_prefix self.assertEqual(virtnet['nic1'].mac, mac) self.assertEqual(virtnet.get_mac_address(0), mac) # Confirm only lower-case macs are stored self.assertEqual(virtnet.get_mac_address(0).lower(), virtnet.get_mac_address(0)) self.assertEqual(virtnet.mac_list(), [mac]) self.print_and_inc()
[docs] def test_05_VirtNet(self): """ Load max - 1 entries from db, overriding params. DEPENDS ON test_04_VirtNet running first """ self.zero_counter(25) # second loop forces db load from disk # also confirming params merge with db data for lastbyte in xrange(0, 0xFF): vm_name = "vm%d" % lastbyte params = utils_params.Params({ "nics": "nic1", "vms": vm_name }) virtnet = utils_net.VirtNet(params, vm_name, vm_name, self.db_filename) if lastbyte < 16: mac = "%s0%x" % (self.mac_prefix, lastbyte) else: mac = "%s%x" % (self.mac_prefix, lastbyte) self.assertEqual(virtnet['nic1'].mac, mac) self.assertEqual(virtnet.get_mac_address(0), mac) self.print_and_inc()
[docs] def test_06_VirtNet(self): """ Generate last possibly mac and verify value. DEPENDS ON test_05_VirtNet running first """ self.zero_counter(25) # test two nics, second mac generation should fail (pool exhausted) params = utils_params.Params({ "nics": "nic1 nic2", "vms": "vm255" }) virtnet = utils_net.VirtNet(params, 'vm255', 'vm255', self.db_filename) virtnet.mac_prefix = self.mac_prefix self.assertRaises(AttributeError, virtnet.get_mac_address, 'nic1') mac = "%s%x" % (self.mac_prefix, 255) # This will grab the last available address # only try 300 times, guarantees LASTBYTE counter will loop once self.assertEqual(virtnet.generate_mac_address(0, 300), mac) # This will fail allocation self.assertRaises(utils_net.NetError, virtnet.generate_mac_address, 1, 300)
[docs] def test_07_VirtNet(self): """ Release mac from beginning, midle, and end, re-generate + verify value """ self.zero_counter(1) beginning_params = utils_params.Params({ "nics": "nic1 nic2", "vms": "vm0" }) middle_params = utils_params.Params({ "nics": "nic1 nic2", "vms": "vm127" }) end_params = utils_params.Params({ "nics": "nic1 nic2", "vms": "vm255", }) for params in (beginning_params, middle_params, end_params): vm_name = params['vms'] virtnet = utils_net.VirtNet(params, vm_name, vm_name, self.db_filename) virtnet.mac_prefix = self.mac_prefix iface = virtnet['nic1'] last_db_mac_byte = iface.mac_str_to_int_list(iface.mac)[-1] last_vm_name_byte = int(vm_name[2:]) # Sequential generation from test_04_VirtNet guarantee self.assertEqual(last_db_mac_byte, last_vm_name_byte) # only try 300 times, guarantees LASTBYTE counter will loop once self.assertRaises( utils_net.NetError, virtnet.generate_mac_address, 1, 300) virtnet.free_mac_address(0) virtnet.free_mac_address(1) # generate new on nic1 to verify mac_index generator catches it # and to signify database updated after generation virtnet.generate_mac_address(1, 300) last_db_mac_byte = virtnet['nic2'].mac_str_to_int_list( virtnet['nic2'].mac)[-1] self.assertEqual(last_db_mac_byte, last_vm_name_byte) self.assertEqual(virtnet.get_mac_address(1), virtnet[1].mac) self.print_and_inc()
[docs] def test_08_ifname(self): for fakevm in self.fakevm_generator(): # only need to test kvm instance if fakevm.vm_type != 'qemu': continue test_params = fakevm.get_params() virtnet = utils_net.VirtNet(test_params, fakevm.name, fakevm.name) for virtnet_index in xrange(0, len(virtnet)): result = virtnet.generate_ifname(virtnet_index) self.assertEqual(result, virtnet[virtnet_index].ifname) # assume less than 10 nics self.assert_(len(result) < 11) if len(virtnet) == 2: break # no need to test every possible combination
[docs] def test_99_ifname(self): # cleanup try: os.unlink(self.db_filename) except OSError: pass
if __name__ == '__main__': unittest.main()