Source code for virttest.libvirt_storage_unittest

#!/usr/bin/python
import unittest

import common
from autotest.client.utils import CmdResult
import libvirt_storage
from virsh_unittest import FakeVirshFactory

# Output of virsh.pool_list when only the default pool exists
_DEFAULT_POOL = ("Name                 State      Autostart\n"
                 "-----------------------------------------\n"
                 "default              active      yes    \n")

# Current output of the fake virsh.pool_list.  The stub methods of
# PoolTestBase rebind this name (through a function-level ``global``
# declaration) to simulate pools being defined, started and removed.
# No ``global`` statement is needed here: at module scope it has no effect.
_pools_output = _DEFAULT_POOL


class PoolTestBase(unittest.TestCase):
    """
    Shared fixture for the StoragePool unit tests.

    Every virsh pool command used by the tests is replaced by one of the
    static stubs below, so the tests run without libvirt installed.  The
    stubs communicate through the module-level ``_pools_output`` string,
    which is what the fake ``pool_list`` reports.
    """

    @staticmethod
    def _pool_list(option="--all", **dargs):
        """Return a CmdResult carrying the current fake pool listing."""
        # Bogus output of virsh commands
        cmd = "virsh pool-list --all"
        output = _pools_output
        return CmdResult(cmd, output)

    @staticmethod
    def _pool_info(name="default", **dargs):
        """
        Return a CmdResult for ``virsh pool-info``.

        Only the pool named "default" gets canned output; for any other
        name the CmdResult is built from the command string alone.
        """
        cmd = "virsh pool-info %s" % name
        default_output = (
            "Name: default\n"
            "UUID: bfe5d630-ec5d-86c2-ecca-8a5210493db7\n"
            "State: running\n"
            "Persistent: yes\n"
            "Autostart: yes\n"
            "Capacity: 47.93 GiB\n"
            "Allocation: 36.74 GiB\n"
            "Available: 11.20 GiB\n")
        if name == "default":
            return CmdResult(cmd, default_output)
        else:
            return CmdResult(cmd)

    @staticmethod
    def _pool_define_as(name="unittest", pool_type="dir",
                        target="/var/tmp", extra="", **dargs):
        """List the "unittest" pool as inactive, no autostart; return True."""
        unittest_pool = "unittest inactive no\n"
        global _pools_output
        _pools_output = _DEFAULT_POOL + unittest_pool
        return True

    @staticmethod
    def _pool_build(name="unittest", **dargs):
        """Report success without touching the fake pool listing."""
        return True

    @staticmethod
    def _pool_start(name="unittest", **dargs):
        """List the "unittest" pool as active, no autostart; return True."""
        unittest_pool = "unittest active no\n"
        global _pools_output
        _pools_output = _DEFAULT_POOL + unittest_pool
        return True

    @staticmethod
    def _pool_autostart(name="unittest", extra="", **dargs):
        """List the "unittest" pool as active with autostart; return True."""
        unittest_pool = "unittest active yes\n"
        global _pools_output
        _pools_output = _DEFAULT_POOL + unittest_pool
        return True

    @staticmethod
    def _pool_destroy(name="unittest", **dargs):
        """List the "unittest" pool as inactive with autostart; return True."""
        unittest_pool = "unittest inactive yes\n"
        global _pools_output
        _pools_output = _DEFAULT_POOL + unittest_pool
        return True

    @staticmethod
    def _pool_undefine(name="unittest", **dargs):
        """Restore the listing to the default pool only; return True."""
        global _pools_output
        _pools_output = _DEFAULT_POOL
        return True

    def setUp(self):
        """Build a fake virsh and a StoragePool bound to it."""
        # Start every test from the default-pool-only listing so that a test
        # which aborted halfway cannot leak its fake pool into the next one.
        global _pools_output
        _pools_output = _DEFAULT_POOL

        # To avoid not installed libvirt packages
        self.bogus_virsh = FakeVirshFactory()

        # Use defined virsh methods needed in this unittest; each command
        # is served by the static method named "_" + command.
        for command in ('pool_list', 'pool_info', 'pool_define_as',
                        'pool_build', 'pool_start', 'pool_destroy',
                        'pool_undefine', 'pool_autostart'):
            self.bogus_virsh.__super_set__(command,
                                           getattr(self, '_' + command))

        self.sp = libvirt_storage.StoragePool(virsh_instance=self.bogus_virsh)
class ExistPoolTest(PoolTestBase):
    """Checks StoragePool queries against the pool named "default"."""

    def test_exist_pool(self):
        """list_pools, pool_state and pool_info work for the default pool."""
        pools = self.sp.list_pools()
        # Use a TestCase assertion rather than a bare ``assert`` so the
        # check still runs under ``python -O``.
        self.assertTrue(isinstance(pools, dict))
        # Test pool_state
        self.assertTrue(
            self.sp.pool_state("default") in ['active', 'inactive'])
        # Test pool_info
        self.assertNotEqual(self.sp.pool_info("default"), {})
class NewPoolTest(PoolTestBase):
    """Walks a directory pool through define, build, start and delete."""

    # Name of the pool created by the test; kept on the class so that
    # tearDown() can always reach it.
    pool_name = "unittest"

    def test_dir_pool(self):
        """Each StoragePool life-cycle call on a new dir pool reports success."""
        self.assertTrue(self.sp.define_dir_pool(self.pool_name, "/var/tmp"))
        self.assertTrue(self.sp.build_pool(self.pool_name))
        self.assertTrue(self.sp.start_pool(self.pool_name))
        self.assertTrue(self.sp.set_pool_autostart(self.pool_name))
        self.assertTrue(self.sp.delete_pool(self.pool_name))

    def tearDown(self):
        """Delete the test pool again in case the test stopped early."""
        # Confirm created pool has been cleaned up
        self.sp.delete_pool(self.pool_name)
class NotExpectedPoolTest(PoolTestBase):
    """Checks StoragePool queries for a pool name that is not listed."""

    def test_not_exist_pool(self):
        """An unknown pool does not exist, has no state and empty info."""
        self.assertFalse(self.sp.pool_exists("NOTEXISTPOOL"))
        # Use a TestCase assertion rather than a bare ``assert`` so the
        # check still runs under ``python -O``.
        self.assertTrue(self.sp.pool_state("NOTEXISTPOOL") is None)
        self.assertEqual(self.sp.pool_info("NOTEXISTPOOL"), {})
if __name__ == "__main__":
    # Executed as a script: let unittest discover and run the tests above.
    unittest.main()