"""
Classes and functions to handle block/disk images for libvirt.
This exports:
- two functions for get image/blkdebug filename
- class for image operates and basic parameters
- class for storage pool operations
"""
import re
import logging
import time
from autotest.client.shared import error
import storage
import virsh
[docs]class QemuImg(storage.QemuImg):
"""
libvirt class for handling operations of disk/block images.
"""
def __init__(self, params, root_dir, tag):
"""
Init the default value for image object.
:param params: Dictionary containing the test parameters.
:param root_dir: Base directory for relative filenames.
:param tag: Image tag defined in parameter images.
"""
storage.QemuImg(params, root_dir, tag)
# Please init image_cmd for libvirt in this class
# self.image_cmd =
[docs] def create(self, params):
"""
Create an image.
:param params: Dictionary containing the test parameters.
:note: params should contain:
"""
raise NotImplementedError
[docs] def convert(self, params, root_dir):
"""
Convert image
:param params: A dict
:param root_dir: dir for save the convert image
:note: params should contain:
"""
raise NotImplementedError
[docs] def rebase(self, params):
"""
Rebase image
:param params: A dict
:note: params should contain:
"""
raise NotImplementedError
[docs] def commit(self):
"""
Commit image to it's base file
"""
raise NotImplementedError
[docs] def snapshot_create(self):
"""
Create a snapshot image.
:note: params should contain:
"""
raise NotImplementedError
[docs] def snapshot_del(self, blkdebug_cfg=""):
"""
Delete a snapshot image.
:param blkdebug_cfg: The configure file of blkdebug
:note: params should contain:
snapshot_image_name -- the name of snapshot image file
"""
raise NotImplementedError
[docs] def remove(self):
"""
Remove an image file.
:note: params should contain:
"""
raise NotImplementedError
[docs] def check_image(self, params, root_dir):
"""
Check an image using the appropriate tools for each virt backend.
:param params: Dictionary containing the test parameters.
:param root_dir: Base directory for relative filenames.
:note: params should contain:
:raise VMImageCheckError: In case qemu-img check fails on the image.
"""
raise NotImplementedError
[docs]class StoragePool(object):
"""
Pool Manager for libvirt storage with virsh commands
"""
def __init__(self, virsh_instance=virsh):
# An instance of Virsh class
# Help to setup connection to virsh instance
self.virsh_instance = virsh_instance
[docs] def list_pools(self):
"""
Return a dict include pools' information with structure:
pool_name ==> pool_details(a dict: feature ==> value)
"""
# Allow it raise exception if command has executed failed.
result = self.virsh_instance.pool_list("--all", ignore_status=False)
pools = {}
lines = result.stdout.strip().splitlines()
if len(lines) > 2:
head = lines[0]
lines = lines[2:]
else:
return pools
for line in lines:
details = line.split()
details_dict = {}
head_iter = enumerate(head.split())
while True:
try:
(index, column) = head_iter.next()
except StopIteration:
break
if re.match("[N|n]ame", column):
pool_name = details[index]
else:
details_dict[column] = details[index]
pools[pool_name] = details_dict
return pools
[docs] def pool_exists(self, name):
"""
Check whether pool exists on given libvirt
"""
try:
pools = self.list_pools()
except error.CmdError:
return False
return name in pools
[docs] def pool_state(self, name):
"""
Get pool's state.
:return: active/inactive, and None when something wrong.
"""
try:
pools = self.list_pools()
except error.CmdError:
return None
if self.pool_exists(name):
details_dict = pools[name]
try:
return details_dict['State']
except KeyError:
pass
return None
[docs] def pool_info(self, name):
"""
Get pool's information.
:return: A dict include pool's information:
Name ==> value
UUID ==> value
...
"""
info = {}
try:
result = self.virsh_instance.pool_info(name, ignore_status=False)
except error.CmdError:
return info
for line in result.stdout.splitlines():
params = line.split(':')
if len(params) == 2:
name = params[0].strip()
value = params[1].strip()
info[name] = value
return info
[docs] def get_pool_uuid(self, name):
"""
Get pool's uuid.
:return: Pool uuid.
"""
return self.pool_info(name)["UUID"]
[docs] def is_pool_active(self, name):
"""
Check whether pool is active on given libvirt
"""
if self.pool_state(name) == "active":
return True
return False
[docs] def is_pool_persistent(self, name):
"""
Check whether pool is persistent
"""
if self.pool_info(name)["Persistent"] == "yes":
return True
return False
[docs] def delete_pool(self, name):
"""
Destroy and Delete a pool if it exists on given libvirt
It's reasonable to delete a pool by calling pool-delete.
However, due to pool-delete operation is non-recoverable.
Redhat suggests to achieve this objective by virsh,
1) virsh pool-destroy pool-name
2) virsh pool-undefine pool-name
Please refer to the following URI for more details.
https://access.redhat.com/documentation/en-US
/Red_Hat_Enterprise_Linux/6/html
/Virtualization_Administration_Guide
/chap-Virtualization_Administration_Guide
-Storage_Pools-Storage_Pools.html#delete-ded-disk-storage-pool
"""
if self.is_pool_active(name):
if not self.virsh_instance.pool_destroy(name):
# TODO: Allow pool_destroy to raise exception.
# Because some testcase rely on this function,
# I should start this work after this module is accepted.
logging.error("Destroy pool '%s' failed.", name)
return False
# Undefine pool anyway
try:
self.virsh_instance.pool_undefine(name, ignore_status=False)
except error.CmdError, detail:
if self.pool_exists(name):
logging.error("Undefine pool '%s' failed:%s", name, detail)
return False
logging.info("Deleted pool '%s'", name)
return True
[docs] def set_pool_autostart(self, name, extra=""):
"""
Set given pool as autostart
"""
try:
self.virsh_instance.pool_autostart(name, extra, ignore_status=False)
except error.CmdError:
logging.error("Autostart pool '%s' failed.", name)
return False
logging.info("Set pool '%s' autostart.", name)
return True
[docs] def build_pool(self, name):
"""
Build pool.
"""
try:
self.virsh_instance.pool_build(name, ignore_status=False)
except error.CmdError:
logging.error("Build pool '%s' failed.", name)
return False
logging.info("Built pool '%s'", name)
return True
[docs] def start_pool(self, name):
"""
Start pool if it is inactive.
"""
if self.is_pool_active(name):
logging.info("Pool '%s' is already active.", name)
return True
try:
self.virsh_instance.pool_start(name, ignore_status=False)
except error.CmdError, details:
logging.error("Start pool '%s' failed: %s", name, details)
return False
logging.info("Started pool '%s'", name)
return True
[docs] def destroy_pool(self, name):
"""
Destroy pool if it is active.
"""
if not self.is_pool_active(name):
logging.info("pool '%s' is already inactive.", name)
return True
return self.virsh_instance.pool_destroy(name)
[docs] def define_dir_pool(self, name, target_path):
"""
Define a directory type pool.
"""
try:
self.virsh_instance.pool_define_as(name, "dir", target_path,
ignore_status=False)
except error.CmdError:
logging.error("Define dir pool '%s' failed.", name)
return False
logging.info("Defined pool '%s'", name)
return True
[docs] def define_fs_pool(self, name, block_device, target_path):
"""
Define a filesystem type pool.
"""
try:
self.virsh_instance.pool_define_as(name, "fs", target_path,
extra="--source-dev %s" % block_device,
ignore_status=False)
except error.CmdError:
logging.error("Define fs pool '%s' failed.", name)
return False
logging.info("Defined pool '%s'", name)
return True
[docs] def define_lvm_pool(self, name, block_device, vg_name, target_path):
"""
Define a lvm type pool.
"""
try:
extra = "--source-dev %s --source-name %s" % (block_device,
vg_name)
self.virsh_instance.pool_define_as(name, "logical", target_path,
extra, ignore_status=False)
except error.CmdError:
logging.error("Define logic pool '%s' failed.", name)
return False
logging.info("Defined pool '%s'", name)
return True
[docs] def define_disk_pool(self, name, block_device, target_path):
"""
Define a disk type pool.
"""
try:
extra = "--source-dev %s" % block_device
self.virsh_instance.pool_define_as(name, "disk", target_path,
extra, ignore_status=False)
except error.CmdError:
logging.error("Define disk pool '%s' failed.", name)
return False
logging.info("Defined pool '%s'", name)
return True
[docs] def define_iscsi_pool(self, name, source_host, source_dev, target_path):
"""
Define a iscsi type pool.
"""
try:
extra = "--source-host %s --source-dev %s" % (source_host,
source_dev)
self.virsh_instance.pool_define_as(name, "iscsi", target_path,
extra, ignore_status=False)
except error.CmdError:
logging.error("Define iscsi pool '%s' failed.", name)
return False
logging.info("Define pool '%s'", name)
return True
[docs] def define_netfs_pool(self, name, source_host, source_path, target_path):
"""
Define a netfs type pool.
"""
try:
extra = "--source-host %s --source-path %s" % (source_host,
target_path)
self.virsh_instance.pool_define_as(name, "netfs", target_path,
extra, ignore_status=False)
except error.CmdError:
logging.error("Define netfs pool '%s' failed.", name)
return False
logging.info("Define pool '%s'", name)
return True
[docs] def define_rbd_pool(self, name, source_host, source_name, extra=""):
"""
Define a rbd type pool.
"""
try:
extra = ("--source-host %s --source-name %s %s" %
(source_host, source_name, extra))
self.virsh_instance.pool_define_as(name, "rbd", "",
extra, ignore_status=False)
except error.CmdError:
logging.error("Define rbd pool '%s' failed.", name)
return False
logging.info("Define pool '%s'", name)
return True
[docs]class PoolVolume(object):
"""Volume Manager for libvirt storage pool."""
def __init__(self, pool_name, virsh_instance=virsh):
self.pool_name = pool_name
self.virsh_instance = virsh_instance
[docs] def list_volumes(self):
"""
Return a dict include volumes' name(key) and path(value).
"""
result = self.virsh_instance.vol_list(self.pool_name,
ignore_status=False)
volumes = {}
lines = result.stdout.strip().splitlines()
if len(lines) > 2:
head = lines[0]
lines = lines[2:]
else:
return volumes
for line in lines:
# Path may be not standard unix path
try:
path = re.findall("\s+\S*/.*", line)[0]
except IndexError:
# Do not find a path
path = ""
name = line.split(path)[0].lstrip()
volumes[name] = path.strip()
return volumes
[docs] def volume_exists(self, name):
try:
volumes = self.list_volumes()
except error.CmdError:
return False
return name in volumes
[docs] def volume_info(self, name):
"""
Get volume's information with command vol-info.
"""
info = {}
try:
result = self.virsh_instance.vol_info(name, self.pool_name,
ignore_status=False)
except error.CmdError, detail:
logging.error("Get volume information failed:%s", detail)
return info
for line in result.stdout.strip().splitlines():
attr = line.split(':')[0]
value = line.split("%s:" % attr)[-1].strip()
info[attr] = value
return info
[docs] def create_volume(self, name, capability,
allocation=None, frmt=None):
"""
Create a volume in pool.
"""
if self.volume_exists(name):
logging.debug("Volume '%s' already exists.", name)
return False
try:
self.virsh_instance.vol_create_as(name, self.pool_name,
capability, allocation, frmt,
ignore_status=False, debug=True)
except error.CmdError, detail:
logging.error("Create volume failed:%s", detail)
return False
if not self.volume_exists(name):
logging.error("Created volume does not exist:%s",
self.list_volumes())
return False
return True
[docs] def delete_volume(self, name):
"""
Remove a volume.
"""
if self.volume_exists(name):
try:
self.virsh_instance.vol_delete(name, self.pool_name,
ignore_status=False)
except error.CmdError, detail:
logging.error("Delete volume failed:%s", detail)
return False
if not self.volume_exists(name):
logging.debug("Volume '%s' has been deleted.", name)
return True
else:
logging.debug("Delete volume '%s' failed.", name)
return False
else:
logging.info("Volume '%s' does not exist.", name)
return True # Return True for expected result
[docs] def clone_volume(self, old_name, new_name):
"""
Clone a volume
"""
if self.volume_exists(old_name) and not self.volume_exists(new_name):
try:
self.virsh_instance.vol_clone(old_name, new_name,
self.pool_name,
ignore_status=False)
except error.CmdError, detail:
logging.error("Clone volume failed:%s", detail)
return False
if self.volume_exists(new_name):
logging.debug("Volume '%s' has been created by clone.",
new_name)
return True
else:
logging.debug("Volume '%s' clone failed.", old_name)
return False
else:
logging.info("Volume '%s' does not exist or '%s' has been exist."
% (old_name, new_name))
return False