"""
High-level libvirt test utility functions.
This module is meant to reduce code size by performing common test procedures.
Generally, code here should look like test code.
More specifically:
- Functions in this module should raise exceptions if things go wrong
- Functions in this module typically use functions and classes from
lower-level modules (e.g. utils_misc, qemu_vm, aexpect).
- Functions in this module should not be used by lower-level modules.
- Functions in this module should be used in the right context.
For example, a function should not be used where it may display
misleading or inaccurate info or debug messages.
:copyright: 2014 Red Hat Inc.
"""
import re
import os
import ast
import logging
import shutil
import threading
import time
from virttest import virsh
from virttest import xml_utils
from virttest import iscsi
from virttest import nfs
from virttest import data_dir
from virttest import aexpect
from virttest import utils_misc
from virttest import utils_selinux
from virttest import libvirt_storage
from virttest import utils_net
from virttest import gluster
from virttest import remote
from virttest.test_setup import LibvirtPolkitConfig
from virttest.utils_libvirtd import service_libvirtd_control
from autotest.client import utils
from autotest.client.shared import error
from virttest.libvirt_xml import vm_xml
from virttest.libvirt_xml import network_xml
from virttest.libvirt_xml import xcepts
from virttest.libvirt_xml import NetworkXML
from virttest.libvirt_xml import IPXML
from virttest.libvirt_xml import pool_xml
from virttest.libvirt_xml import nwfilter_xml
from virttest.libvirt_xml.devices import disk
from virttest.libvirt_xml.devices import hostdev
from virttest.libvirt_xml.devices import controller
from virttest.libvirt_xml.devices import seclabel
from virttest.libvirt_xml.devices import channel
from __init__ import ping
try:
from autotest.client import lv_utils
except ImportError:
from virttest.staging import lv_utils
[docs]class LibvirtNetwork(object):
"""
Class to create a temporary network for testing.
"""
[docs] def create_vnet_xml(self):
"""
Create XML for a virtual network.
"""
net_xml = NetworkXML()
net_xml.name = self.name
ip = IPXML(address=self.address)
net_xml.ip = ip
return self.address, net_xml
[docs] def create_macvtap_xml(self):
"""
Create XML for a macvtap network.
"""
net_xml = NetworkXML()
net_xml.name = self.name
net_xml.forward = {'mode': 'bridge', 'dev': self.iface}
ip = utils_net.get_ip_address_by_interface(self.iface)
return ip, net_xml
[docs] def create_bridge_xml(self):
"""
Create XML for a bridged network.
"""
net_xml = NetworkXML()
net_xml.name = self.name
net_xml.forward = {'mode': 'bridge'}
net_xml.bridge = {'name': self.iface}
ip = utils_net.get_ip_address_by_interface(self.iface)
return ip, net_xml
def __init__(self, net_type, address=None, iface=None, net_name=None,
persistent=False):
if net_name is None:
self.name = 'virt-test-%s' % net_type
else:
self.name = net_name
self.address = address
self.iface = iface
self.persistent = persistent
if net_type == 'vnet':
if not self.address:
raise error.TestError('Create vnet need address be set')
self.ip, net_xml = self.create_vnet_xml()
elif net_type == 'macvtap':
if not self.iface:
raise error.TestError('Create macvtap need iface be set')
self.ip, net_xml = self.create_macvtap_xml()
elif net_type == 'bridge':
if not self.iface:
raise error.TestError('Create bridge need iface be set')
self.ip, net_xml = self.create_bridge_xml()
else:
raise error.TestError('Unknown libvirt network type %s' % net_type)
if self.persistent:
net_xml.define()
net_xml.start()
else:
net_xml.create()
[docs] def cleanup(self):
"""
Clear up network.
"""
virsh.net_destroy(self.name)
if self.persistent:
virsh.net_undefine(self.name)
[docs]def cpus_parser(cpulist):
"""
Parse a list of cpu list, its syntax is a comma separated list,
with '-' for ranges and '^' denotes exclusive.
:param cpulist: a list of physical CPU numbers
"""
hyphens = []
carets = []
commas = []
others = []
if cpulist is None:
return None
else:
if "," in cpulist:
cpulist_list = re.split(",", cpulist)
for cpulist in cpulist_list:
if "-" in cpulist:
tmp = re.split("-", cpulist)
hyphens = hyphens + range(int(tmp[0]), int(tmp[-1]) + 1)
elif "^" in cpulist:
tmp = re.split("\^", cpulist)[-1]
carets.append(int(tmp))
else:
try:
commas.append(int(cpulist))
except ValueError:
logging.error("The cpulist has to be an "
"integer. (%s)", cpulist)
elif "-" in cpulist:
tmp = re.split("-", cpulist)
hyphens = range(int(tmp[0]), int(tmp[-1]) + 1)
elif "^" in cpulist:
tmp = re.split("^", cpulist)[-1]
carets.append(int(tmp))
else:
try:
others.append(int(cpulist))
return others
except ValueError:
logging.error("The cpulist has to be an "
"integer. (%s)", cpulist)
cpus_set = set(hyphens).union(set(commas)).difference(set(carets))
return sorted(list(cpus_set))
[docs]def cpus_string_to_affinity_list(cpus_string, num_cpus):
"""
Parse the cpus_string string to a affinity list.
e.g
host_cpu_count = 4
0 --> [y,-,-,-]
0,1 --> [y,y,-,-]
0-2 --> [y,y,y,-]
0-2,^2 --> [y,y,-,-]
r --> [y,y,y,y]
"""
# Check the input string.
single_pattern = r"\d+"
between_pattern = r"\d+-\d+"
exclude_pattern = r"\^\d+"
sub_pattern = r"(%s)|(%s)|(%s)" % (exclude_pattern,
single_pattern, between_pattern)
pattern = r"^((%s),)*(%s)$" % (sub_pattern, sub_pattern)
if not re.match(pattern, cpus_string):
logging.debug("Cpus_string=%s is not a supported format for cpu_list."
% cpus_string)
# Init a list for result.
affinity = []
for i in range(int(num_cpus)):
affinity.append('-')
# Letter 'r' means all cpus.
if cpus_string == "r":
for i in range(len(affinity)):
affinity[i] = "y"
return affinity
# Split the string with ','.
sub_cpus = cpus_string.split(",")
# Parse each sub_cpus.
for cpus in sub_cpus:
if "-" in cpus:
minmum = cpus.split("-")[0]
maxmum = cpus.split("-")[-1]
for i in range(int(minmum), int(maxmum) + 1):
affinity[i] = "y"
elif "^" in cpus:
affinity[int(cpus.strip("^"))] = "-"
else:
affinity[int(cpus)] = "y"
return affinity
[docs]def cpu_allowed_list_by_task(pid, tid):
"""
Get the Cpus_allowed_list in status of task.
"""
cmd = "cat /proc/%s/task/%s/status|grep Cpus_allowed_list:| awk '{print $2}'" % (pid, tid)
result = utils.run(cmd, ignore_status=True)
if result.exit_status:
return None
return result.stdout.strip()
[docs]def clean_up_snapshots(vm_name, snapshot_list=[]):
"""
Do recovery after snapshot
:param vm_name: Name of domain
:param snapshot_list: The list of snapshot name you want to remove
"""
if not snapshot_list:
# Get all snapshot names from virsh snapshot-list
snapshot_list = virsh.snapshot_list(vm_name)
# Get snapshot disk path
for snap_name in snapshot_list:
# Delete useless disk snapshot file if exists
snap_xml = virsh.snapshot_dumpxml(vm_name,
snap_name).stdout.strip()
xtf_xml = xml_utils.XMLTreeFile(snap_xml)
disks_path = xtf_xml.findall('disks/disk/source')
for disk in disks_path:
os.system('rm -f %s' % disk.get('file'))
# Delete snapshots of vm
virsh.snapshot_delete(vm_name, snap_name)
else:
# Get snapshot disk path from domain xml because
# there is no snapshot info with the name
dom_xml = vm_xml.VMXML.new_from_dumpxml(vm_name).xmltreefile
disk_path = dom_xml.find('devices/disk/source').get('file')
for name in snapshot_list:
snap_disk_path = disk_path.split(".")[0] + "." + name
os.system('rm -f %s' % snap_disk_path)
[docs]def get_all_cells():
"""
Use virsh freecell --all to get all cells on host
::
# virsh freecell --all
0: 124200 KiB
1: 1059868 KiB
--------------------
Total: 1184068 KiB
That would return a dict like:
::
cell_dict = {"0":"124200 KiB", "1":"1059868 KiB", "Total":"1184068 KiB"}
:return: cell_dict
"""
fc_result = virsh.freecell(options="--all", ignore_status=True)
if fc_result.exit_status:
if fc_result.stderr.count("NUMA not supported"):
raise error.TestNAError(fc_result.stderr.strip())
else:
raise error.TestFail(fc_result.stderr.strip())
output = fc_result.stdout.strip()
cell_list = output.splitlines()
# remove "------------" line
del cell_list[-2]
cell_dict = {}
for cell_line in cell_list:
cell_info = cell_line.split(":")
cell_num = cell_info[0].strip()
cell_mem = cell_info[-1].strip()
cell_dict[cell_num] = cell_mem
return cell_dict
[docs]def check_blockjob(vm_name, target, check_point="none", value="0"):
"""
Run blookjob command to check block job progress, bandwidth, ect.
:param vm_name: Domain name
:param target: Domian disk target dev
:param check_point: Job progrss, bandwidth or none(no job)
:param value: Value of progress, bandwidth or 0(no job)
:return: Boolean value, true for pass, false for fail
"""
if check_point not in ["progress", "bandwidth", "none"]:
logging.error("Check point must be: progress, bandwidth or none")
return False
try:
cmd_result = virsh.blockjob(vm_name, target, "--info", ignore_status=True)
output = cmd_result.stdout.strip()
err = cmd_result.stderr.strip()
status = cmd_result.exit_status
except:
raise error.TestFail("Error occur when running blockjob command.")
if status == 0:
# libvirt print job progress to stderr
if not len(err):
logging.debug("No block job find")
if check_point == "none":
return True
else:
if check_point == "none":
logging.error("Expect no job but find block job:\n%s", err)
elif check_point == "progress":
progress = value + " %"
if re.search(progress, err):
return True
elif check_point == "bandwidth":
bandwidth = value + " MiB/s"
if bandwidth == output.split(':')[1].strip():
logging.debug("Bandwidth is equal to %s", bandwidth)
return True
else:
logging.error("Bandwidth is not equal to %s", bandwidth)
else:
logging.error("Run blockjob command fail")
return False
[docs]def setup_or_cleanup_nfs(is_setup, mount_dir="nfs-mount", is_mount=False,
export_options="rw,no_root_squash",
mount_options="rw",
export_dir="nfs-export",
restore_selinux=""):
"""
Set SElinux to "permissive" and Set up nfs service on localhost.
Or clean up nfs service on localhost and restore SElinux.
Note: SElinux status must be backed up and restored after use.
Example:
# Setup NFS.
res = setup_or_cleanup_nfs(is_setup=True)
# Backup SELinux status.
selinux_bak = res["selinux_status_bak"]
# Do something.
...
# Cleanup NFS and restore NFS.
res = setup_or_cleanup_nfs(is_setup=False, restore_selinux=selinux_bak)
:param is_setup: Boolean value, true for setup, false for cleanup
:param mount_dir: NFS mount dir. This can be an absolute path on the
host or a relative path origin from libvirt tmp dir.
Default to "nfs-mount".
:param is_mount: Boolean value, Whether the target NFS should be mounted.
:param export_options: Options for nfs dir. Default to "nfs-export".
:param mount_options: Options for mounting nfs dir. Default to "rw".
:param export_dir: NFS export dir. This can be an absolute path on the
host or a relative path origin from libvirt tmp dir.
Default to "nfs-export".
:return: A dict contains export and mount result parameters:
export_dir: Absolute directory of exported local NFS file system.
mount_dir: Absolute directory NFS file system mounted on.
selinux_status_bak: SELinux status before set
"""
result = {}
tmpdir = data_dir.get_tmp_dir()
if not os.path.isabs(export_dir):
export_dir = os.path.join(tmpdir, export_dir)
if not os.path.isabs(mount_dir):
mount_dir = os.path.join(tmpdir, mount_dir)
result["export_dir"] = export_dir
result["mount_dir"] = mount_dir
result["selinux_status_bak"] = utils_selinux.get_status()
nfs_params = {"nfs_mount_dir": mount_dir, "nfs_mount_options": mount_options,
"nfs_mount_src": export_dir, "setup_local_nfs": "yes",
"export_options": export_options}
_nfs = nfs.Nfs(nfs_params)
if is_setup:
# Set selinux to permissive that the file in nfs
# can be used freely
if utils_selinux.is_enforcing():
utils_selinux.set_status("permissive")
_nfs.setup()
if not is_mount:
_nfs.umount()
del result["mount_dir"]
else:
if restore_selinux:
utils_selinux.set_status(restore_selinux)
_nfs.unexportfs_in_clean = True
_nfs.cleanup()
return result
[docs]def setup_or_cleanup_iscsi(is_setup, is_login=True,
emulated_image="emulated-iscsi", image_size="1G",
chap_user="", chap_passwd="", restart_tgtd="no",
portal_ip="127.0.0.1"):
"""
Set up(and login iscsi target) or clean up iscsi service on localhost.
:param is_setup: Boolean value, true for setup, false for cleanup
:param is_login: Boolean value, true for login, false for not login
:param emulated_image: name of iscsi device
:param image_size: emulated image's size
:param chap_user: CHAP authentication username
:param chap_passwd: CHAP authentication password
:return: iscsi device name or iscsi target
"""
tmpdir = os.path.join(data_dir.get_root_dir(), 'tmp')
emulated_path = os.path.join(tmpdir, emulated_image)
emulated_target = ("iqn.%s.com.virttest:%s.target" %
(time.strftime("%Y-%m"), emulated_image))
iscsi_params = {"emulated_image": emulated_path, "target": emulated_target,
"image_size": image_size, "iscsi_thread_id": "virt",
"chap_user": chap_user, "chap_passwd": chap_passwd,
"restart_tgtd": restart_tgtd, "portal_ip": portal_ip}
_iscsi = iscsi.Iscsi.create_iSCSI(iscsi_params)
if is_setup:
if is_login:
_iscsi.login()
# The device doesn't necessarily appear instantaneously, so give
# about 5 seconds for it to appear before giving up
iscsi_device = utils_misc.wait_for(_iscsi.get_device_name, 5, 0, 1,
"Searching iscsi device name.")
if iscsi_device:
logging.debug("iscsi device: %s", iscsi_device)
return iscsi_device
if not iscsi_device:
logging.error("Not find iscsi device.")
# Cleanup and return "" - caller needs to handle that
# _iscsi.export_target() will have set the emulated_id and
# export_flag already on success...
_iscsi.cleanup()
utils.run("rm -f %s" % emulated_path)
else:
_iscsi.export_target()
return (emulated_target, _iscsi.luns)
else:
_iscsi.export_flag = True
_iscsi.emulated_id = _iscsi.get_target_id()
_iscsi.cleanup()
utils.run("rm -f %s" % emulated_path)
utils.run("vgscan --cache", ignore_status=True)
return ""
[docs]def get_host_ipv4_addr():
"""
Get host ipv4 addr
"""
if_up = utils_net.get_net_if(state="UP")
for i in if_up:
ipv4_value = utils_net.get_net_if_addrs(i)["ipv4"]
logging.debug("ipv4_value is %s", ipv4_value)
if ipv4_value != []:
ip_addr = ipv4_value[0]
break
if ip_addr is not None:
logging.info("ipv4 address is %s", ip_addr)
else:
raise error.TestFail("Fail to get ip address")
return ip_addr
[docs]def setup_or_cleanup_gluster(is_setup, vol_name, brick_path="", pool_name="",
file_path="/etc/glusterfs/glusterd.vol"):
"""
Set up or clean up glusterfs environment on localhost
:param is_setup: Boolean value, true for setup, false for cleanup
:param vol_name: gluster created volume name
:param brick_path: Dir for create glusterfs
:return: ip_addr or nothing
"""
try:
utils_misc.find_command("gluster")
except ValueError:
raise error.TestNAError("Missing command 'gluster'")
if not brick_path:
tmpdir = os.path.join(data_dir.get_root_dir(), 'tmp')
brick_path = os.path.join(tmpdir, pool_name)
if is_setup:
ip_addr = get_host_ipv4_addr()
gluster.add_rpc_insecure(file_path)
gluster.glusterd_start()
logging.debug("finish start gluster")
gluster.gluster_vol_create(vol_name, ip_addr, brick_path, force=True)
gluster.gluster_allow_insecure(vol_name)
logging.debug("finish vol create in gluster")
return ip_addr
else:
gluster.gluster_vol_stop(vol_name, True)
gluster.gluster_vol_delete(vol_name)
gluster.gluster_brick_delete(brick_path)
return ""
[docs]def define_pool(pool_name, pool_type, pool_target, cleanup_flag, **kwargs):
"""
To define a given type pool(Support types: 'dir', 'netfs', logical',
iscsi', 'gluster', 'disk' and 'fs').
:param pool_name: Name of the pool
:param pool_type: Type of the pool
:param pool_target: Target for underlying storage
:param cleanup_flag: A list contains 3 booleans and 1 string stands for
need_cleanup_nfs, need_cleanup_iscsi,
need_cleanup_logical, selinux_bak and
need_cleanup_gluster
:param kwargs: key words for special pool define. eg, glusterfs pool
source path and source name, etc
"""
extra = ""
vg_name = pool_name
cleanup_nfs = False
cleanup_iscsi = False
cleanup_logical = False
selinux_bak = ""
cleanup_gluster = False
if not os.path.exists(pool_target) and pool_type != "gluster":
os.mkdir(pool_target)
if pool_type == "dir":
pass
elif pool_type == "netfs":
# Set up NFS server without mount
res = setup_or_cleanup_nfs(True, pool_target, False)
nfs_path = res["export_dir"]
selinux_bak = res["selinux_status_bak"]
cleanup_nfs = True
extra = "--source-host %s --source-path %s" % ('localhost',
nfs_path)
elif pool_type == "logical":
# Create vg by using iscsi device
lv_utils.vg_create(vg_name, setup_or_cleanup_iscsi(True))
cleanup_iscsi = True
cleanup_logical = True
extra = "--source-name %s" % vg_name
elif pool_type == "iscsi":
# Set up iscsi target without login
iscsi_target, _ = setup_or_cleanup_iscsi(True, False)
cleanup_iscsi = True
extra = "--source-host %s --source-dev %s" % ('localhost',
iscsi_target)
elif pool_type == "disk":
# Set up iscsi target and login
device_name = setup_or_cleanup_iscsi(True)
cleanup_iscsi = True
# Create a partition to make sure disk pool can start
cmd = "parted -s %s mklabel msdos" % device_name
utils.run(cmd)
cmd = "parted -s %s mkpart primary ext4 0 100" % device_name
utils.run(cmd)
extra = "--source-dev %s" % device_name
elif pool_type == "fs":
# Set up iscsi target and login
device_name = setup_or_cleanup_iscsi(True)
cleanup_iscsi = True
# Format disk to make sure fs pool can start
cmd = "mkfs.ext4 -F %s" % device_name
utils.run(cmd)
extra = "--source-dev %s" % device_name
elif pool_type == "gluster":
gluster_source_path = kwargs.get('gluster_source_path')
gluster_source_name = kwargs.get('gluster_source_name')
gluster_file_name = kwargs.get('gluster_file_name')
gluster_file_type = kwargs.get('gluster_file_type')
gluster_file_size = kwargs.get('gluster_file_size')
gluster_vol_number = kwargs.get('gluster_vol_number')
# Prepare gluster service and create volume
hostip = setup_or_cleanup_gluster(True, gluster_source_name,
pool_name=pool_name)
logging.debug("hostip is %s", hostip)
# create image in gluster volume
file_path = "gluster://%s/%s" % (hostip, gluster_source_name)
for i in range(gluster_vol_number):
file_name = "%s_%d" % (gluster_file_name, i)
utils.run("qemu-img create -f %s %s/%s %s" %
(gluster_file_type, file_path, file_name,
gluster_file_size))
cleanup_gluster = True
extra = "--source-host %s --source-path %s --source-name %s" % \
(hostip, gluster_source_path, gluster_source_name)
elif pool_type in ["scsi", "mpath", "rbd", "sheepdog"]:
raise error.TestNAError(
"Pool type '%s' has not yet been supported in the test." %
pool_type)
else:
raise error.TestFail("Invalid pool type: '%s'." % pool_type)
# Mark the clean up flags
cleanup_flag[0] = cleanup_nfs
cleanup_flag[1] = cleanup_iscsi
cleanup_flag[2] = cleanup_logical
cleanup_flag[3] = selinux_bak
cleanup_flag[4] = cleanup_gluster
try:
result = virsh.pool_define_as(pool_name, pool_type, pool_target, extra,
ignore_status=True)
except error.CmdError:
logging.error("Define '%s' type pool fail.", pool_type)
return result
[docs]def verify_virsh_console(session, user, passwd, timeout=10, debug=False):
"""
Run commands in console session.
"""
log = ""
console_cmd = "cat /proc/cpuinfo"
try:
while True:
match, text = session.read_until_last_line_matches(
[r"[E|e]scape character is", r"login:",
r"[P|p]assword:", session.prompt],
timeout, internal_timeout=1)
if match == 0:
if debug:
logging.debug("Got '^]', sending '\\n'")
session.sendline()
elif match == 1:
if debug:
logging.debug("Got 'login:', sending '%s'", user)
session.sendline(user)
elif match == 2:
if debug:
logging.debug("Got 'Password:', sending '%s'", passwd)
session.sendline(passwd)
elif match == 3:
if debug:
logging.debug("Got Shell prompt -- logged in")
break
status, output = session.cmd_status_output(console_cmd)
logging.info("output of command:\n%s", output)
session.close()
except (aexpect.ShellError,
aexpect.ExpectError), detail:
log = session.get_output()
logging.error("Verify virsh console failed:\n%s\n%s", detail, log)
session.close()
return False
if not re.search("processor", output):
logging.error("Verify virsh console failed: Result does not match.")
return False
return True
[docs]def pci_label_from_address(address_dict, radix=10):
"""
Generate a pci label from a dict of address.
:param address_dict: A dict contains domain, bus, slot and function.
:param radix: The radix of your data in address_dict.
Example:
::
address_dict = {'domain': '0x0000', 'bus': '0x08', 'slot': '0x10', 'function': '0x0'}
radix = 16
return = pci_0000_08_10_0
"""
try:
domain = int(address_dict['domain'], radix)
bus = int(address_dict['bus'], radix)
slot = int(address_dict['slot'], radix)
function = int(address_dict['function'], radix)
except (TypeError, KeyError), detail:
raise error.TestError(detail)
pci_label = ("pci_%04x_%02x_%02x_%01x" % (domain, bus, slot, function))
return pci_label
[docs]def mk_label(disk, label="msdos", session=None):
"""
Set label for disk.
"""
mklabel_cmd = "parted -s %s mklabel %s" % (disk, label)
if session:
session.cmd(mklabel_cmd)
else:
utils.run(mklabel_cmd)
[docs]def mk_part(disk, size="100M", session=None):
"""
Create a partition for disk
"""
mkpart_cmd = ("parted -s -a optimal %s mklabel msdos -- "
"mkpart primary ext4 0 %s" % (disk, size))
if session:
session.cmd(mkpart_cmd)
else:
utils.run(mkpart_cmd)
[docs]def mkfs(partition, fs_type, options="", session=None):
"""
Make a file system on the partition
"""
mkfs_cmd = "mkfs.%s -F %s %s" % (fs_type, partition, options)
if session:
session.cmd(mkfs_cmd)
else:
utils.run(mkfs_cmd)
[docs]def get_parts_list(session=None):
"""
Get all partition lists.
"""
parts_cmd = "cat /proc/partitions"
if session:
_, parts_out = session.cmd_status_output(parts_cmd)
else:
parts_out = utils.run(parts_cmd).stdout
parts = []
if parts_out:
for line in parts_out.rsplit("\n"):
if line.startswith("major") or line == "":
continue
parts_line = line.rsplit()
if len(parts_line) == 4:
parts.append(parts_line[3])
logging.debug("Find parts: %s" % parts)
return parts
[docs]def yum_install(pkg_list, session=None):
"""
Try to install packages on system
"""
if not isinstance(pkg_list, list):
raise error.TestError("Parameter error.")
yum_cmd = "rpm -q {0} || yum -y install {0}"
for pkg in pkg_list:
if session:
status = session.cmd_status(yum_cmd.format(pkg))
else:
status = utils.run(yum_cmd.format(pkg)).exit_status
if status:
raise error.TestFail("Failed to install package: %s"
% pkg)
[docs]def check_actived_pool(pool_name):
"""
Check if pool_name exist in active pool list
"""
sp = libvirt_storage.StoragePool()
if not sp.pool_exists(pool_name):
raise error.TestFail("Can't find pool %s" % pool_name)
if not sp.is_pool_active(pool_name):
raise error.TestFail("Pool %s is not active." % pool_name)
logging.debug("Find active pool %s", pool_name)
return True
[docs]class PoolVolumeTest(object):
"""Test class for storage pool or volume"""
def __init__(self, test, params):
self.tmpdir = test.tmpdir
self.params = params
self.selinux_bak = ""
[docs] def cleanup_pool(self, pool_name, pool_type, pool_target, emulated_image,
**kwargs):
"""
Delete vols, destroy the created pool and restore the env
"""
sp = libvirt_storage.StoragePool()
source_format = kwargs.get('source_format')
source_name = kwargs.get('source_name')
device_name = kwargs.get('device_name', "/DEV/EXAMPLE")
try:
if sp.pool_exists(pool_name):
pv = libvirt_storage.PoolVolume(pool_name)
if pool_type in ["dir", "netfs", "logical", "disk"]:
if sp.is_pool_active(pool_name):
vols = pv.list_volumes()
for vol in vols:
# Ignore failed deletion here for deleting pool
pv.delete_volume(vol)
if not sp.delete_pool(pool_name):
raise error.TestFail("Delete pool %s failed" % pool_name)
finally:
if pool_type == "netfs" and source_format != 'glusterfs':
nfs_server_dir = self.params.get("nfs_server_dir", "nfs-server")
nfs_path = os.path.join(self.tmpdir, nfs_server_dir)
setup_or_cleanup_nfs(is_setup=False, export_dir=nfs_path,
restore_selinux=self.selinux_bak)
if os.path.exists(nfs_path):
shutil.rmtree(nfs_path)
if pool_type == "logical":
cmd = "pvs |grep vg_logical|awk '{print $1}'"
pv = utils.system_output(cmd)
# Cleanup logical volume anyway
utils.run("vgremove -f vg_logical", ignore_status=True)
utils.run("pvremove %s" % pv, ignore_status=True)
# These types used iscsi device
# If we did not provide block device
if (pool_type in ["logical", "fs", "disk"] and
device_name.count("EXAMPLE")):
setup_or_cleanup_iscsi(is_setup=False,
emulated_image=emulated_image)
# Used iscsi device anyway
if pool_type in ["iscsi", "scsi"]:
setup_or_cleanup_iscsi(is_setup=False,
emulated_image=emulated_image)
if pool_type == "scsi":
scsi_xml_file = self.params.get("scsi_xml_file", "")
if os.path.exists(scsi_xml_file):
os.remove(scsi_xml_file)
if pool_type in ["dir", "fs", "netfs"]:
pool_target = os.path.join(self.tmpdir, pool_target)
if os.path.exists(pool_target):
shutil.rmtree(pool_target)
if pool_type == "gluster" or source_format == 'glusterfs':
setup_or_cleanup_gluster(False, source_name)
[docs] def pre_pool(self, pool_name, pool_type, pool_target, emulated_image,
**kwargs):
"""
Prepare(define or create) the specific type pool
:param pool_name: created pool name
:param pool_type: dir, disk, logical, fs, netfs or else
:param pool_target: target of storage pool
:param emulated_image: use an image file to simulate a scsi disk
it could be used for disk, logical pool, etc
:param kwargs: key words for specific pool
"""
extra = ""
image_size = kwargs.get('image_size', "100M")
source_format = kwargs.get('source_format')
source_name = kwargs.get('source_name', None)
persistent = kwargs.get('persistent', False)
device_name = kwargs.get('device_name', "/DEV/EXAMPLE")
# If tester does not provide block device, creating one
if (device_name.count("EXAMPLE") and
pool_type in ["disk", "fs", "logical"]):
device_name = setup_or_cleanup_iscsi(is_setup=True,
emulated_image=emulated_image,
image_size=image_size)
if pool_type == "dir" and not persistent:
pool_target = os.path.join(self.tmpdir, pool_target)
if not os.path.exists(pool_target):
os.mkdir(pool_target)
elif pool_type == "disk":
# Disk pool does not allow to create volume by virsh command,
# so introduce parameter 'pre_disk_vol' to create partition(s)
# by 'parted' command, the parameter is a list of partition size,
# and the max number of partitions is 4. If pre_disk_vol is None,
# disk pool will have no volume
pre_disk_vol = kwargs.get('pre_disk_vol', None)
if type(pre_disk_vol) == list and len(pre_disk_vol):
for vol in pre_disk_vol:
mk_part(device_name, vol)
else:
mk_label(device_name, "gpt")
extra = " --source-dev %s" % device_name
if source_format:
extra += " --source-format %s" % source_format
elif pool_type == "fs":
cmd = "mkfs.ext4 -F %s" % device_name
pool_target = os.path.join(self.tmpdir, pool_target)
if not os.path.exists(pool_target):
os.mkdir(pool_target)
extra = " --source-dev %s" % device_name
utils.run(cmd)
elif pool_type == "logical":
logical_device = device_name
cmd_pv = "pvcreate %s" % logical_device
vg_name = "vg_%s" % pool_type
cmd_vg = "vgcreate %s %s" % (vg_name, logical_device)
extra = "--source-name %s" % vg_name
utils.run(cmd_pv)
utils.run(cmd_vg)
# Create a small volume for verification
# And VG path will not exist if no any volume in.(bug?)
cmd_lv = "lvcreate --name default_lv --size 1M %s" % vg_name
utils.run(cmd_lv)
elif pool_type == "netfs":
export_options = kwargs.get('export_options',
"rw,async,no_root_squash")
pool_target = os.path.join(self.tmpdir, pool_target)
if not os.path.exists(pool_target):
os.mkdir(pool_target)
if source_format == 'glusterfs':
hostip = setup_or_cleanup_gluster(True, source_name,
pool_name=pool_name)
logging.debug("hostip is %s", hostip)
extra = "--source-host %s --source-path %s" % (hostip,
source_name)
extra += " --source-format %s" % source_format
utils.system("setsebool virt_use_fusefs on")
else:
nfs_server_dir = self.params.get("nfs_server_dir", "nfs-server")
nfs_path = os.path.join(self.tmpdir, nfs_server_dir)
if not os.path.exists(nfs_path):
os.mkdir(nfs_path)
res = setup_or_cleanup_nfs(is_setup=True,
export_options=export_options,
export_dir=nfs_path)
self.selinux_bak = res["selinux_status_bak"]
source_host = self.params.get("source_host", "localhost")
extra = "--source-host %s --source-path %s" % (source_host,
nfs_path)
elif pool_type == "iscsi":
ip_protocal = kwargs.get('ip_protocal', "ipv4")
setup_or_cleanup_iscsi(is_setup=True,
emulated_image=emulated_image,
image_size=image_size)
iscsi_sessions = iscsi.iscsi_get_sessions()
iscsi_target = None
for iscsi_node in iscsi_sessions:
if iscsi_node[1].count(emulated_image):
iscsi_target = iscsi_node[1]
break
iscsi.iscsi_logout(iscsi_target)
if ip_protocal == "ipv6":
ip_addr = "::1"
else:
ip_addr = "127.0.0.1"
extra = " --source-host %s --source-dev %s" % (ip_addr,
iscsi_target)
elif pool_type == "scsi":
scsi_xml_file = self.params.get("scsi_xml_file", "")
if not os.path.exists(scsi_xml_file):
logical_device = setup_or_cleanup_iscsi(
is_setup=True,
emulated_image=emulated_image,
image_size=image_size)
cmd = ("iscsiadm -m session -P 3 |grep -B3 %s| grep Host|awk "
"'{print $3}'" % logical_device.split('/')[2])
scsi_host = utils.system_output(cmd)
scsi_pool_xml = pool_xml.PoolXML()
scsi_pool_xml.name = pool_name
scsi_pool_xml.pool_type = "scsi"
scsi_pool_xml.target_path = pool_target
scsi_pool_source_xml = pool_xml.SourceXML()
scsi_pool_source_xml.adp_type = 'scsi_host'
scsi_pool_source_xml.adp_name = "host" + scsi_host
scsi_pool_xml.set_source(scsi_pool_source_xml)
logging.debug("SCSI pool XML %s:\n%s", scsi_pool_xml.xml,
str(scsi_pool_xml))
scsi_xml_file = scsi_pool_xml.xml
self.params['scsi_xml_file'] = scsi_xml_file
elif pool_type == "gluster":
source_path = kwargs.get('source_path')
hostip = setup_or_cleanup_gluster(True, source_name,
pool_name=pool_name)
logging.debug("Gluster host ip address: %s", hostip)
extra = "--source-host %s --source-path %s --source-name %s" % \
(hostip, source_path, source_name)
func = virsh.pool_create_as
if pool_type == "scsi":
func = virsh.pool_create
if persistent:
func = virsh.pool_define_as
if pool_type == "scsi":
func = virsh.pool_define
# Create/define pool
if pool_type == "scsi":
result = func(scsi_xml_file, debug=True)
else:
result = func(pool_name, pool_type, pool_target, extra, debug=True)
# Here, virsh.pool_create_as return a boolean value and all other 3
# functions return CmdResult object
if isinstance(result, bool):
re_v = result
else:
re_v = result.exit_status == 0
if not re_v:
self.cleanup_pool(pool_name, pool_type, pool_target,
emulated_image, **kwargs)
raise error.TestFail("Prepare pool failed")
xml_str = virsh.pool_dumpxml(pool_name)
logging.debug("New prepared pool XML: %s", xml_str)
[docs] def pre_vol(self, vol_name, vol_format, capacity, allocation, pool_name):
"""
Preapare the specific type volume in pool
"""
pv = libvirt_storage.PoolVolume(pool_name)
if not pv.create_volume(vol_name, capacity, allocation, vol_format):
raise error.TestFail("Prepare volume failed.")
if not pv.volume_exists(vol_name):
raise error.TestFail("Can't find volume: %s" % vol_name)
##########Migration Relative functions##############
[docs]class MigrationTest(object):
"""Class for migration tests"""
def __init__(self):
# To get result in thread, using member parameters
# Result of virsh migrate command
# True means command executed successfully
self.RET_MIGRATION = True
# A lock for threads
self.RET_LOCK = threading.RLock()
# The time spent when migrating vms
# format: vm_name -> time(seconds)
self.mig_time = {}
[docs] def thread_func_migration(self, vm, desturi, options=None):
"""
Thread for virsh migrate command.
:param vm: A libvirt vm instance(local or remote).
:param desturi: remote host uri.
"""
# Migrate the domain.
try:
if options is None:
options = "--live --timeout=60"
stime = int(time.time())
vm.migrate(desturi, option=options, ignore_status=False,
debug=True)
etime = int(time.time())
self.mig_time[vm.name] = etime - stime
except error.CmdError, detail:
logging.error("Migration to %s failed:\n%s", desturi, detail)
self.RET_LOCK.acquire()
self.RET_MIGRATION = False
self.RET_LOCK.release()
[docs] def do_migration(self, vms, srcuri, desturi, migration_type, options=None,
thread_timeout=60):
"""
Migrate vms.
:param vms: migrated vms.
:param srcuri: local uri, used when migrate vm from remote to local
:param descuri: remote uri, used when migrate vm from local to remote
:param migration_type: do orderly for simultaneous migration
"""
if migration_type == "orderly":
for vm in vms:
migration_thread = threading.Thread(target=self.thread_func_migration,
args=(vm, desturi, options))
migration_thread.start()
migration_thread.join(thread_timeout)
if migration_thread.isAlive():
logging.error("Migrate %s timeout.", migration_thread)
self.RET_LOCK.acquire()
self.RET_MIGRATION = False
self.RET_LOCK.release()
elif migration_type == "cross":
# Migrate a vm to remote first,
# then migrate another to remote with the first vm back
vm_remote = vms.pop()
self.thread_func_migration(vm_remote, desturi)
for vm in vms:
thread1 = threading.Thread(target=self.thread_func_migration,
args=(vm_remote, srcuri, options))
thread2 = threading.Thread(target=self.thread_func_migration,
args=(vm, desturi, options))
thread1.start()
thread2.start()
thread1.join(thread_timeout)
thread2.join(thread_timeout)
vm_remote = vm
if thread1.isAlive() or thread1.isAlive():
logging.error("Cross migrate timeout.")
self.RET_LOCK.acquire()
self.RET_MIGRATION = False
self.RET_LOCK.release()
# Add popped vm back to list
vms.append(vm_remote)
elif migration_type == "simultaneous":
migration_threads = []
for vm in vms:
migration_threads.append(threading.Thread(
target=self.thread_func_migration,
args=(vm, desturi, options)))
# let all migration going first
for thread in migration_threads:
thread.start()
# listen threads until they end
for thread in migration_threads:
thread.join(thread_timeout)
if thread.isAlive():
logging.error("Migrate %s timeout.", thread)
self.RET_LOCK.acquire()
self.RET_MIGRATION = False
self.RET_LOCK.release()
if not self.RET_MIGRATION:
raise error.TestFail()
[docs] def cleanup_dest_vm(self, vm, srcuri, desturi):
"""
Cleanup migrated vm on remote host.
"""
vm.connect_uri = desturi
if vm.exists():
if vm.is_persistent():
vm.undefine()
if vm.is_alive():
# If vm on remote host is unaccessible
# graceful shutdown may cause confused
vm.destroy(gracefully=False)
# Set connect uri back to local uri
vm.connect_uri = srcuri
[docs]def check_result(result, expected_fails=[], skip_if=[], any_error=False):
"""
Check the result of a command and check command error message against
expectation.
:param result: Command result instance.
:param expected_fails: list of regex of expected stderr patterns. The check
will pass if any of these patterns matches.
:param skip_if: list of regex of expected patterns. The check will raise a
TestNAError if any of these patterns matches.
:param any_error: Whether expect on any error message. Setting to True will
will override expected_fails
"""
logging.debug("Command result:\n%s" % result)
if skip_if:
for patt in skip_if:
if re.search(patt, result.stderr):
raise error.TestNAError("Test skipped: found '%s' in test "
"result:\n%s" %
(patt, result.stderr))
if any_error:
if result.exit_status:
return
else:
raise error.TestFail("Expect should fail but got:\n%s" % result)
if result.exit_status:
if expected_fails:
if not any(re.search(patt, result.stderr)
for patt in expected_fails):
raise error.TestFail("Expect should fail with one of %s, "
"but failed with:\n%s" %
(expected_fails, result))
else:
raise error.TestFail("Expect should succeed, but got:\n%s" % result)
else:
if expected_fails:
raise error.TestFail("Expect should fail with one of %s, "
"but succeeded:\n%s" %
(expected_fails, result))
[docs]def check_exit_status(result, expect_error=False):
"""
Check the exit status of virsh commands.
:param result: Virsh command result object
:param expect_error: Boolean value, expect command success or fail
"""
if not expect_error:
if result.exit_status != 0:
raise error.TestFail(result.stderr)
else:
logging.debug("Command output:\n%s", result.stdout.strip())
elif expect_error and result.exit_status == 0:
raise error.TestFail("Expect fail, but run successfully.")
[docs]def get_interface_details(vm_name):
"""
Get the interface details from virsh domiflist command output
:return: list of all interfaces details
"""
# Parse the domif-list command output
domiflist_out = virsh.domiflist(vm_name).stdout
# Regular expression for the below output
# vnet0 bridge virbr0 virtio 52:54:00:b2:b3:b4
rg = re.compile(r"^(\w+|-)\s+(\w+)\s+(\w+)\s+(\S+)\s+"
"(([a-fA-F0-9]{2}:?){6})")
iface_cmd = {}
ifaces_cmd = []
for line in domiflist_out.split('\n'):
match_obj = rg.search(line)
# Due to the extra space in the list
if match_obj is not None:
iface_cmd['interface'] = match_obj.group(1)
iface_cmd['type'] = match_obj.group(2)
iface_cmd['source'] = match_obj.group(3)
iface_cmd['model'] = match_obj.group(4)
iface_cmd['mac'] = match_obj.group(5)
ifaces_cmd.append(iface_cmd)
iface_cmd = {}
return ifaces_cmd
[docs]def get_ifname_host(vm_name, mac):
"""
Get the vm interface name on host
:return: interface name, None if not exist
"""
ifaces = get_interface_details(vm_name)
for iface in ifaces:
if iface["mac"] == mac:
return iface["interface"]
return None
[docs]def check_iface(iface_name, checkpoint, extra="", **dargs):
"""
Check interface with specified checkpoint.
:param iface_name: Interface name
:param checkpoint: Check if interface exists,
and It's MAC address, IP address and State,
also connectivity by ping.
valid checkpoint: [exists, mac, ip, ping, state]
:param extra: Extra string for checking
:return: Boolean value, true for pass, false for fail
"""
support_check = ["exists", "mac", "ip", "ping"]
iface = utils_net.Interface(name=iface_name)
check_pass = False
try:
if checkpoint == "exists":
# extra is iface-list option
list_find, ifcfg_find = (False, False)
# Check virsh list output
result = virsh.iface_list(extra, ignore_status=True)
check_exit_status(result, False)
output = re.findall(r"(\S+)\ +(\S+)\ +(\S+|\s+)[\ +\n]",
str(result.stdout))
if filter(lambda x: x[0] == iface_name, output[1:]):
list_find = True
logging.debug("Find '%s' in virsh iface-list output: %s",
iface_name, list_find)
# Check network script
iface_script = "/etc/sysconfig/network-scripts/ifcfg-" + iface_name
ifcfg_find = os.path.exists(iface_script)
logging.debug("Find '%s': %s", iface_script, ifcfg_find)
check_pass = list_find and ifcfg_find
elif checkpoint == "mac":
# extra is the MAC address to compare
iface_mac = iface.get_mac().lower()
check_pass = iface_mac == extra
logging.debug("MAC address of %s: %s", iface_name, iface_mac)
elif checkpoint == "ip":
# extra is the IP address to compare
iface_ip = iface.get_ip()
check_pass = iface_ip == extra
logging.debug("IP address of %s: %s", iface_name, iface_ip)
elif checkpoint == "state":
# check iface State
result = virsh.iface_list(extra, ignore_status=True)
check_exit_status(result, False)
output = re.findall(r"(\S+)\ +(\S+)\ +(\S+|\s+)[\ +\n]",
str(result.stdout))
iface_state = filter(lambda x: x[0] == iface_name, output[1:])[0][1]
# active corresponds True, otherwise return False
check_pass = iface_state == "active"
elif checkpoint == "ping":
# extra is the ping destination
count = dargs.get("count", 3)
timeout = dargs.get("timeout", 5)
ping_s, _ = ping(dest=extra, count=count, interface=iface_name,
timeout=timeout,)
check_pass = ping_s == 0
else:
logging.debug("Support check points are: %s", support_check)
logging.error("Unsupport check point: %s", checkpoint)
except Exception, detail:
raise error.TestFail("Interface check failed: %s" % detail)
return check_pass
[docs]def create_hostdev_xml(pci_id, boot_order=0):
"""
Create a hostdev configuration file.
:param pci_id: such as "0000:03:04.0"
"""
# Create attributes dict for device's address element
device_domain = pci_id.split(':')[0]
device_domain = "0x%s" % device_domain
device_bus = pci_id.split(':')[1]
device_bus = "0x%s" % device_bus
device_slot = pci_id.split(':')[-1].split('.')[0]
device_slot = "0x%s" % device_slot
device_function = pci_id.split('.')[-1]
device_function = "0x%s" % device_function
hostdev_xml = hostdev.Hostdev()
hostdev_xml.mode = "subsystem"
hostdev_xml.managed = "yes"
hostdev_xml.hostdev_type = "pci"
if boot_order:
hostdev_xml.boot_order = boot_order
attrs = {'domain': device_domain, 'slot': device_slot,
'bus': device_bus, 'function': device_function}
hostdev_xml.source_address = hostdev_xml.new_source_address(**attrs)
logging.debug("Hostdev XML:\n%s", str(hostdev_xml))
return hostdev_xml.xml
[docs]def alter_boot_order(vm_name, pci_id, boot_order=0):
"""
Alter the startup sequence of VM to PCI-device firstly
OS boot element and per-device boot elements are mutually exclusive,
It's necessary that remove all OS boots before setting PCI-device order
:param vm_name: VM name
:param pci_id: such as "0000:06:00.1"
:param boot_order: order priority, such as 1, 2, ...
"""
vmxml = vm_xml.VMXML.new_from_dumpxml(vm_name)
# remove all of OS boots
vmxml.remove_all_boots()
# prepare PCI-device XML with boot order
try:
device_domain = pci_id.split(':')[0]
device_domain = "0x%s" % device_domain
device_bus = pci_id.split(':')[1]
device_bus = "0x%s" % device_bus
device_slot = pci_id.split(':')[-1].split('.')[0]
device_slot = "0x%s" % device_slot
device_function = pci_id.split('.')[-1]
device_function = "0x%s" % device_function
except IndexError:
raise error.TestError("Invalid PCI Info: %s" % pci_id)
attrs = {'domain': device_domain, 'slot': device_slot,
'bus': device_bus, 'function': device_function}
vmxml.add_hostdev(attrs, boot_order=boot_order)
# synchronize XML
vmxml.sync()
[docs]def create_disk_xml(params):
"""
Create a disk configuration file.
"""
# Create attributes dict for disk's address element
type_name = params.get("type_name", "file")
target_dev = params.get("target_dev", "vdb")
target_bus = params.get("target_bus", "virtio")
diskxml = disk.Disk(type_name)
diskxml.device = params.get("device_type", "disk")
snapshot_attr = params.get('disk_snapshot_attr')
if snapshot_attr:
diskxml.snapshot = snapshot_attr
source_attrs = {}
source_host = []
source_seclabel = []
auth_attrs = {}
driver_attrs = {}
try:
if type_name == "file":
source_file = params.get("source_file", "")
source_attrs = {'file': source_file}
elif type_name == "block":
source_file = params.get("source_file", "")
source_attrs = {'dev': source_file}
elif type_name == "dir":
source_dir = params.get("source_dir", "")
source_attrs = {'dir': source_dir}
elif type_name == "volume":
source_pool = params.get("source_pool")
source_volume = params.get("source_volume")
source_mode = params.get("source_mode", "")
source_attrs = {'pool': source_pool, 'volume': source_volume}
if source_mode:
source_attrs.update({"mode": source_mode})
elif type_name == "network":
source_protocol = params.get("source_protocol")
source_name = params.get("source_name")
source_host_name = params.get("source_host_name").split()
source_host_port = params.get("source_host_port").split()
transport = params.get("transport")
source_attrs = {'protocol': source_protocol, 'name': source_name}
source_host = []
for host_name, host_port in zip(source_host_name, source_host_port):
source_host.append({'name': host_name,
'port': host_port})
if transport:
source_host[0].update({'transport': transport})
else:
error.TestNAError("Unsupport disk type %s" % type_name)
source_startupPolicy = params.get("source_startupPolicy")
if source_startupPolicy:
source_attrs['startupPolicy'] = source_startupPolicy
sec_model = params.get("sec_model")
relabel = params.get("relabel")
label = params.get("sec_label")
if sec_model or relabel:
sec_dict = {}
sec_xml = seclabel.Seclabel()
if sec_model:
sec_dict.update({'model': sec_model})
if relabel:
sec_dict.update({'relabel': relabel})
if label:
sec_dict.update({'label': label})
sec_xml.update(sec_dict)
logging.debug("The sec xml is %s", sec_xml.xmltreefile)
source_seclabel.append(sec_xml)
source_params = {"attrs": source_attrs, "seclabels": source_seclabel,
"hosts": source_host}
src_config_file = params.get("source_config_file")
if src_config_file:
source_params.update({"config_file": src_config_file})
# If we use config file, "hosts" isn't needed
if "hosts" in source_params:
source_params.pop("hosts")
snapshot_name = params.get('source_snap_name')
if snapshot_name:
source_params.update({"snapshot_name": snapshot_name})
diskxml.source = diskxml.new_disk_source(**source_params)
auth_user = params.get("auth_user")
secret_type = params.get("secret_type")
secret_uuid = params.get("secret_uuid")
secret_usage = params.get("secret_usage")
if auth_user:
auth_attrs['auth_user'] = auth_user
if secret_type:
auth_attrs['secret_type'] = secret_type
if secret_uuid:
auth_attrs['secret_uuid'] = secret_uuid
elif secret_usage:
auth_attrs['secret_usage'] = secret_usage
if auth_attrs:
diskxml.auth = diskxml.new_auth(**auth_attrs)
driver_name = params.get("driver_name", "qemu")
driver_type = params.get("driver_type", "")
driver_cache = params.get("driver_cache", "")
driver_discard = params.get("driver_discard", "")
driver_iothread = params.get("driver_iothread", "")
if driver_name:
driver_attrs['name'] = driver_name
if driver_type:
driver_attrs['type'] = driver_type
if driver_cache:
driver_attrs['cache'] = driver_cache
if driver_discard:
driver_attrs['discard'] = driver_discard
if driver_iothread:
driver_attrs['iothread'] = driver_iothread
if driver_attrs:
diskxml.driver = driver_attrs
diskxml.readonly = "yes" == params.get("readonly", "no")
diskxml.share = "yes" == params.get("shareable", "no")
diskxml.target = {'dev': target_dev, 'bus': target_bus}
except Exception, detail:
logging.error("Fail to create disk XML:\n%s", detail)
logging.debug("Disk XML %s:\n%s", diskxml.xml, str(diskxml))
# Wait for file completed
def file_exists():
if not utils.run("ls %s" % diskxml.xml,
ignore_status=True).exit_status:
return True
utils_misc.wait_for(file_exists, 5)
return diskxml.xml
[docs]def create_net_xml(net_name, params):
"""
Create a new network or update an existed network xml
"""
dns_dict = {}
host_dict = {}
net_name = params.get("net_name", "default")
net_bridge = params.get("net_bridge", '{}')
net_forward = params.get("net_forward", '{}')
forward_iface = params.get("forward_iface")
net_dns_forward = params.get("net_dns_forward")
net_dns_txt = params.get("net_dns_txt")
net_dns_srv = params.get("net_dns_srv")
net_dns_forwarders = params.get("net_dns_forwarders", "").split()
net_dns_hostip = params.get("net_dns_hostip")
net_dns_hostnames = params.get("net_dns_hostnames", "").split()
net_domain = params.get("net_domain")
net_virtualport = params.get("net_virtualport")
net_bandwidth_inbound = params.get("net_bandwidth_inbound", "{}")
net_bandwidth_outbound = params.get("net_bandwidth_outbound", "{}")
net_ip_family = params.get("net_ip_family")
net_ip_address = params.get("net_ip_address")
net_ip_netmask = params.get("net_ip_netmask", "255.255.255.0")
net_ipv6_address = params.get("net_ipv6_address")
net_ipv6_prefix = params.get("net_ipv6_prefix", "64")
nat_port = params.get("nat_port")
guest_name = params.get("guest_name")
guest_ipv4 = params.get("guest_ipv4")
guest_ipv6 = params.get("guest_ipv6")
guest_mac = params.get("guest_mac")
dhcp_start_ipv4 = params.get("dhcp_start_ipv4", "192.168.122.2")
dhcp_end_ipv4 = params.get("dhcp_end_ipv4", "192.168.122.254")
dhcp_start_ipv6 = params.get("dhcp_start_ipv6")
dhcp_end_ipv6 = params.get("dhcp_end_ipv6")
tftp_root = params.get("tftp_root")
bootp_file = params.get("bootp_file")
routes = params.get("routes", "").split()
pg_name = params.get("portgroup_name", "").split()
try:
if not virsh.net_info(net_name, ignore_status=True).exit_status:
# Edit an existed network
netxml = network_xml.NetworkXML.new_from_net_dumpxml(net_name)
netxml.del_ip()
else:
netxml = network_xml.NetworkXML(net_name)
if net_dns_forward:
dns_dict["dns_forward"] = net_dns_forward
if net_dns_txt:
dns_dict["txt"] = ast.literal_eval(net_dns_txt)
if net_dns_srv:
dns_dict["srv"] = ast.literal_eval(net_dns_srv)
if net_dns_forwarders:
dns_dict["forwarders"] = [ast.literal_eval(x) for x in
net_dns_forwarders]
if net_dns_hostip:
host_dict["host_ip"] = net_dns_hostip
if net_dns_hostnames:
host_dict["hostnames"] = net_dns_hostnames
dns_obj = netxml.new_dns(**dns_dict)
if host_dict:
host = dns_obj.new_host(**host_dict)
dns_obj.host = host
netxml.dns = dns_obj
bridge = ast.literal_eval(net_bridge)
if bridge:
netxml.bridge = bridge
forward = ast.literal_eval(net_forward)
if forward:
netxml.forward = forward
if forward_iface:
interface = [
{'dev': x} for x in forward_iface.split()]
netxml.forward_interface = interface
if nat_port:
netxml.nat_port = ast.literal_eval(nat_port)
if net_domain:
netxml.domain_name = net_domain
net_inbound = ast.literal_eval(net_bandwidth_inbound)
net_outbound = ast.literal_eval(net_bandwidth_outbound)
if net_inbound:
netxml.bandwidth_inbound = net_inbound
if net_outbound:
netxml.bandwidth_outbound = net_outbound
if net_virtualport:
netxml.virtualport_type = net_virtualport
if net_ip_family == "ipv6":
ipxml = network_xml.IPXML()
ipxml.family = net_ip_family
ipxml.prefix = net_ipv6_prefix
del ipxml.netmask
if net_ipv6_address:
ipxml.address = net_ipv6_address
if dhcp_start_ipv6 and dhcp_end_ipv6:
ipxml.dhcp_ranges = {"start": dhcp_start_ipv6,
"end": dhcp_end_ipv6}
if guest_name and guest_ipv6 and guest_mac:
ipxml.hosts = [{"name": guest_name,
"ip": guest_ipv6}]
netxml.set_ip(ipxml)
if net_ip_address:
ipxml = network_xml.IPXML(net_ip_address,
net_ip_netmask)
if dhcp_start_ipv4 and dhcp_end_ipv4:
ipxml.dhcp_ranges = {"start": dhcp_start_ipv4,
"end": dhcp_end_ipv4}
if tftp_root:
ipxml.tftp_root = tftp_root
if bootp_file:
ipxml.dhcp_bootp = bootp_file
if guest_name and guest_ipv4 and guest_mac:
ipxml.hosts = [{"mac": guest_mac,
"name": guest_name,
"ip": guest_ipv4}]
netxml.set_ip(ipxml)
if routes:
netxml.routes = [ast.literal_eval(x) for x in routes]
if pg_name:
pg_default = params.get("portgroup_default",
"").split()
pg_virtualport = params.get(
"portgroup_virtualport", "").split()
pg_bandwidth_inbound = params.get(
"portgroup_bandwidth_inbound", "").split()
pg_bandwidth_outbound = params.get(
"portgroup_bandwidth_outbound", "").split()
pg_vlan = params.get("portgroup_vlan", "").split()
for i in range(len(pg_name)):
pgxml = network_xml.PortgroupXML()
pgxml.name = pg_name[i]
if len(pg_default) > i:
pgxml.default = pg_default[i]
if len(pg_virtualport) > i:
pgxml.virtualport_type = pg_virtualport[i]
if len(pg_bandwidth_inbound) > i:
pgxml.bandwidth_inbound = ast.literal_eval(pg_bandwidth_inbound[i])
if len(pg_bandwidth_outbound) > i:
pgxml.bandwidth_outbound = ast.literal_eval(pg_bandwidth_outbound[i])
if len(pg_vlan) > i:
pgxml.vlan_tag = ast.literal_eval(pg_vlan[i])
netxml.set_portgroup(pgxml)
logging.debug("New network xml file: %s", netxml)
netxml.xmltreefile.write()
return netxml
except Exception, detail:
utils.log_last_traceback()
raise error.TestFail("Fail to create network XML: %s" % detail)
[docs]def create_nwfilter_xml(params):
"""
Create a new network filter or update an existed network filter xml
"""
filter_name = params.get("filter_name", "testcase")
exist_filter = params.get("exist_filter", "no-mac-spoofing")
filter_chain = params.get("filter_chain")
filter_priority = params.get("filter_priority", "")
filter_uuid = params.get("filter_uuid")
# process filterref_name
filterrefs_list = []
filterrefs_key = []
for i in params.keys():
if 'filterref_name_' in i:
filterrefs_key.append(i)
filterrefs_key.sort()
for i in filterrefs_key:
filterrefs_dict = {}
filterrefs_dict['filter'] = params[i]
filterrefs_list.append(filterrefs_dict)
# prepare rule and protocol attributes
protocol = {}
rule_dict = {}
rule_dict_tmp = {}
RULE_ATTR = ('rule_action', 'rule_direction', 'rule_priority',
'rule_statematch')
PROTOCOL_TYPES = ['mac', 'vlan', 'stp', 'arp', 'rarp', 'ip', 'ipv6',
'tcp', 'udp', 'sctp', 'icmp', 'igmp', 'esp', 'ah',
'udplite', 'all', 'tcp-ipv6', 'udp-ipv6', 'sctp-ipv6',
'icmpv6', 'esp-ipv6', 'ah-ipv6', 'udplite-ipv6',
'all-ipv6']
# rule should end with 'EOL' as separator, multiple rules are supported
rule = params.get("rule")
if rule:
rule_list = rule.split('EOL')
for i in range(len(rule_list)):
if rule_list[i]:
attr = rule_list[i].split()
for j in range(len(attr)):
attr_list = attr[j].split('=')
rule_dict_tmp[attr_list[0]] = attr_list[1]
rule_dict[i] = rule_dict_tmp
rule_dict_tmp = {}
# process protocol parameter
for i in rule_dict.keys():
if 'protocol' not in rule_dict[i]:
# Set protocol as string 'None' as parse from cfg is
# string 'None'
protocol[i] = 'None'
else:
protocol[i] = rule_dict[i]['protocol']
rule_dict[i].pop('protocol')
if protocol[i] in PROTOCOL_TYPES:
# replace '-' with '_' in ipv6 types as '-' is not
# supposed to be in class name
if '-' in protocol[i]:
protocol[i] = protocol[i].replace('-', '_')
else:
raise error.TestFail("Given protocol type %s"
" is not in supported list %s"
% (protocol[i], PROTOCOL_TYPES))
try:
new_filter = nwfilter_xml.NwfilterXML()
filterxml = new_filter.new_from_filter_dumpxml(exist_filter)
# Set filter attribute
filterxml.filter_name = filter_name
filterxml.filter_priority = filter_priority
if filter_chain:
filterxml.filter_chain = filter_chain
if filter_uuid:
filterxml.uuid = filter_uuid
filterxml.filterrefs = filterrefs_list
# Set rule attribute
index_total = filterxml.get_rule_index()
rule = filterxml.get_rule(0)
rulexml = rule.backup_rule()
for i in index_total:
filterxml.del_rule()
for i in range(len(rule_dict.keys())):
rulexml.rule_action = rule_dict[i].get('rule_action')
rulexml.rule_direction = rule_dict[i].get('rule_direction')
rulexml.rule_priority = rule_dict[i].get('rule_priority')
rulexml.rule_statematch = rule_dict[i].get('rule_statematch')
for j in RULE_ATTR:
if j in rule_dict[i].keys():
rule_dict[i].pop(j)
# set protocol attribute
if protocol[i] != 'None':
protocolxml = rulexml.get_protocol(protocol[i])
new_one = protocolxml.new_attr(**rule_dict[i])
protocolxml.attrs = new_one
rulexml.xmltreefile = protocolxml.xmltreefile
else:
rulexml.del_protocol()
filterxml.add_rule(rulexml)
# Reset rulexml
rulexml = rule.backup_rule()
filterxml.xmltreefile.write()
logging.info("The network filter xml is:\n%s" % filterxml)
return filterxml
except Exception, detail:
utils.log_last_traceback()
raise error.TestFail("Fail to create nwfilter XML: %s" % detail)
[docs]def create_channel_xml(params, alias=False, address=False):
"""
Create a XML contains channel information.
:param params: the params for Channel slot
:param alias: allow to add 'alias' slot
:param address: allow to add 'address' slot
"""
# Create attributes dict for channel's element
channel_source = {}
channel_target = {}
channel_alias = {}
channel_address = {}
channel_params = {}
channel_type_name = params.get("channel_type_name")
source_mode = params.get("source_mode")
source_path = params.get("source_path")
target_type = params.get("target_type")
target_name = params.get("target_name")
if channel_type_name is None:
raise error.TestFail("channel_type_name not specified.")
# if these params are None, it won't be used.
if source_mode:
channel_source['mode'] = source_mode
if source_path:
channel_source['path'] = source_path
if target_type:
channel_target['type'] = target_type
if target_name:
channel_target['name'] = target_name
channel_params = {'type_name': channel_type_name,
'source': channel_source,
'target': channel_target}
if alias:
channel_alias = target_name
channel_params['alias'] = {'name': channel_alias}
if address:
channel_address = {'type': 'virtio-serial',
'controller': '0',
'bus': '0'}
channel_params['address'] = channel_address
channelxml = channel.Channel.new_from_dict(channel_params)
logging.debug("Channel XML:\n%s", channelxml)
return channelxml
[docs]def set_domain_state(vm, vm_state):
"""
Set domain state.
:param vm: the vm object
:param vm_state: the given vm state string "shut off", "running"
"paused", "halt" or "pm_suspend"
"""
# reset domain state
if vm.is_alive():
vm.destroy(gracefully=False)
if not vm_state == "shut off":
vm.start()
session = vm.wait_for_login()
if vm_state == "paused":
vm.pause()
elif vm_state == "halt":
try:
session.cmd("halt")
except (aexpect.ShellProcessTerminatedError, aexpect.ShellStatusError):
# The halt command always gets these errors, but execution is OK,
# skip these errors
pass
elif vm_state == "pm_suspend":
# Execute "pm-suspend-hybrid" command directly will get Timeout error,
# so here execute it in background, and wait for 3s manually
if session.cmd_status("which pm-suspend-hybrid"):
raise error.TestNAError("Cannot execute this test for domain"
" doesn't have pm-suspend-hybrid command!")
session.cmd("pm-suspend-hybrid &")
time.sleep(3)
[docs]def set_guest_agent(vm):
"""
Set domain xml with guest agent channel and install guest agent rpm
in domain.
:param vm: the vm object
"""
logging.warning("This function is going to be deprecated. "
"Please use vm.prepare_guest_agent() instead.")
# reset domain state
if vm.is_alive():
vm.destroy(gracefully=False)
vmxml = vm_xml.VMXML.new_from_inactive_dumpxml(vm.name)
logging.debug("Attempting to set guest agent channel")
vmxml.set_agent_channel()
vmxml.sync()
vm.start()
session = vm.wait_for_login()
# Check if qemu-ga already started automatically
cmd = "rpm -q qemu-guest-agent || yum install -y qemu-guest-agent"
stat_install = session.cmd_status(cmd, 300)
if stat_install != 0:
raise error.TestFail("Fail to install qemu-guest-agent, make "
"sure that you have usable repo in guest")
# Check if qemu-ga already started
stat_ps = session.cmd_status("ps aux |grep [q]emu-ga")
if stat_ps != 0:
session.cmd("qemu-ga -d")
# Check if the qemu-ga really started
stat_ps = session.cmd_status("ps aux |grep [q]emu-ga")
if stat_ps != 0:
raise error.TestFail("Fail to run qemu-ga in guest")
[docs]def set_vm_disk(vm, params, tmp_dir=None, test=None):
"""
Replace vm first disk with given type in domain xml, including file type
(local, nfs), network type(gluster, iscsi), block type(use connected iscsi
block disk).
For all types, all following params are common and need be specified:
disk_device: default to 'disk'
disk_type: 'block' or 'network'
disk_target: default to 'vda'
disk_target_bus: default to 'virtio'
disk_format: default to 'qcow2'
disk_src_protocol: 'iscsi', 'gluster' or 'netfs'
For 'gluster' network type, following params are gluster only and need be
specified:
vol_name: string
pool_name: default to 'gluster-pool'
transport: 'tcp', 'rdma' or '', default to ''
For 'iscsi' network type, following params need be specified:
image_size: default to "10G", 10G is raw size of jeos disk
disk_src_host: default to "127.0.0.1"
disk_src_port: default to "3260"
For 'netfs' network type, following params need be specified:
mnt_path_name: the mount dir name, default to "nfs-mount"
export_options: nfs mount options, default to "rw,no_root_squash,fsid=0"
For 'block' type, using connected iscsi block disk, following params need
be specified:
image_size: default to "10G", 10G is raw size of jeos disk
:param vm: the vm object
:param tmp_dir: string, dir path
:param params: dict, dict include setup vm disk xml configurations
"""
vmxml = vm_xml.VMXML.new_from_inactive_dumpxml(vm.name)
logging.debug("original xml is: %s", vmxml.xmltreefile)
disk_device = params.get("disk_device", "disk")
disk_snapshot_attr = params.get("disk_snapshot_attr")
disk_type = params.get("disk_type")
disk_target = params.get("disk_target", 'vda')
disk_target_bus = params.get("disk_target_bus", "virtio")
disk_src_protocol = params.get("disk_source_protocol")
disk_src_name = params.get("disk_source_name")
disk_src_host = params.get("disk_source_host", "127.0.0.1")
disk_src_port = params.get("disk_source_port", "3260")
disk_src_config = params.get("disk_source_config")
disk_snap_name = params.get("disk_snap_name")
emu_image = params.get("emulated_image", "emulated-iscsi")
image_size = params.get("image_size", "10G")
disk_format = params.get("disk_format", "qcow2")
driver_iothread = params.get("driver_iothread", "")
mnt_path_name = params.get("mnt_path_name", "nfs-mount")
exp_opt = params.get("export_options", "rw,no_root_squash,fsid=0")
first_disk = vm.get_first_disk_devices()
blk_source = first_disk['source']
disk_xml = vmxml.devices.by_device_tag('disk')[0]
src_disk_format = disk_xml.xmltreefile.find('driver').get('type')
sec_model = params.get('sec_model')
relabel = params.get('relabel')
sec_label = params.get('sec_label')
pool_name = params.get("pool_name", "set-vm-disk-pool")
disk_src_mode = params.get('disk_src_mode', 'host')
auth_user = params.get("auth_user")
secret_type = params.get("secret_type")
secret_usage = params.get("secret_usage")
secret_uuid = params.get("secret_uuid")
disk_params = {'device_type': disk_device,
'disk_snapshot_attr': disk_snapshot_attr,
'type_name': disk_type,
'target_dev': disk_target,
'target_bus': disk_target_bus,
'driver_type': disk_format,
'driver_cache': 'none',
'driver_iothread': driver_iothread,
'sec_model': sec_model,
'relabel': relabel,
'sec_label': sec_label,
'auth_user': auth_user,
'secret_type': secret_type,
'secret_uuid': secret_uuid,
'secret_usage': secret_usage}
if not tmp_dir:
tmp_dir = data_dir.get_tmp_dir()
# gluster only params
vol_name = params.get("vol_name")
transport = params.get("transport", "")
brick_path = os.path.join(tmp_dir, pool_name)
image_convert = "yes" == params.get("image_convert", 'yes')
if vm.is_alive():
vm.destroy(gracefully=False)
# Replace domain disk with iscsi, gluster, block or netfs disk
if disk_src_protocol == 'iscsi':
if disk_type == 'block':
is_login = True
elif disk_type == 'network' or disk_type == 'volume':
is_login = False
else:
raise error.TestFail("Disk type '%s' not expected, only disk "
"type 'block', 'network' or 'volume' work "
"with 'iscsi'" % disk_type)
if disk_type == 'volume':
pvt = PoolVolumeTest(test, params)
pvt.pre_pool(pool_name, 'iscsi', "/dev/disk/by-path",
emulated_image=emu_image,
image_size=image_size)
# Get volume name
cmd_result = virsh.vol_list(pool_name)
try:
vol_name = re.findall(r"(\S+)\ +(\S+)[\ +\n]",
str(cmd_result.stdout))[1][0]
except IndexError:
raise error.TestError("Fail to get volume name in pool %s" %
pool_name)
emulated_path = virsh.vol_path(vol_name, pool_name,
debug=True).stdout.strip()
else:
# Setup iscsi target
if is_login:
iscsi_target = setup_or_cleanup_iscsi(
is_setup=True, is_login=is_login,
image_size=image_size, emulated_image=emu_image)
else:
iscsi_target, lun_num = setup_or_cleanup_iscsi(
is_setup=True, is_login=is_login,
image_size=image_size, emulated_image=emu_image)
emulated_path = os.path.join(tmp_dir, emu_image)
# Copy first disk to emulated backing store path
cmd = "qemu-img convert -f %s -O %s %s %s" % (src_disk_format,
disk_format,
blk_source,
emulated_path)
utils.run(cmd, ignore_status=False)
if disk_type == 'block':
disk_params_src = {'source_file': iscsi_target}
elif disk_type == "volume":
disk_params_src = {'source_pool': pool_name,
'source_volume': vol_name,
'source_mode': disk_src_mode}
else:
disk_params_src = {'source_protocol': disk_src_protocol,
'source_name': iscsi_target + "/" + lun_num,
'source_host_name': disk_src_host,
'source_host_port': disk_src_port}
elif disk_src_protocol == 'gluster':
# Setup gluster.
host_ip = setup_or_cleanup_gluster(True, vol_name,
brick_path, pool_name)
logging.debug("host ip: %s " % host_ip)
dist_img = "gluster.%s" % disk_format
if image_convert:
# Convert first disk to gluster disk path
disk_cmd = ("qemu-img convert -f %s -O %s %s /mnt/%s" %
(src_disk_format, disk_format, blk_source, dist_img))
else:
# create another disk without convert
disk_cmd = "qemu-img create -f %s /mnt/%s 10M" % (src_disk_format,
dist_img)
# Mount the gluster disk and create the image.
utils.run("mount -t glusterfs %s:%s /mnt; %s; umount /mnt"
% (host_ip, vol_name, disk_cmd))
disk_params_src = {'source_protocol': disk_src_protocol,
'source_name': "%s/%s" % (vol_name, dist_img),
'source_host_name': host_ip,
'source_host_port': "24007"}
if transport:
disk_params_src.update({"transport": transport})
elif disk_src_protocol == 'netfs':
# Setup nfs
res = setup_or_cleanup_nfs(True, mnt_path_name,
is_mount=True,
export_options=exp_opt)
exp_path = res["export_dir"]
mnt_path = res["mount_dir"]
params["selinux_status_bak"] = res["selinux_status_bak"]
dist_img = "nfs-img"
# Convert first disk to gluster disk path
disk_cmd = ("qemu-img convert -f %s -O %s %s %s/%s" %
(src_disk_format, disk_format,
blk_source, exp_path, dist_img))
utils.run(disk_cmd, ignore_status=False)
src_file_path = "%s/%s" % (mnt_path, dist_img)
disk_params_src = {'source_file': src_file_path}
elif disk_src_protocol == 'rbd':
mon_host = params.get("mon_host")
if image_convert:
disk_cmd = ("qemu-img convert -f %s -O %s %s rbd:%s:mon_host=%s"
% (src_disk_format, disk_format, blk_source,
disk_src_name, mon_host))
utils.run(disk_cmd, ignore_status=False)
disk_params_src = {'source_protocol': disk_src_protocol,
'source_name': disk_src_name,
'source_host_name': disk_src_host,
'source_host_port': disk_src_port,
'source_config_file': disk_src_config}
if disk_snap_name:
disk_params_src.update({'source_snap_name': disk_snap_name})
else:
# use current source file with update params
disk_params_src = {'source_file': blk_source}
# Delete disk elements
disks = vmxml.get_devices(device_type="disk")
for disk_ in disks:
if disk_.target['dev'] == disk_target:
vmxml.del_device(disk_)
# New disk xml
new_disk = disk.Disk(type_name=disk_type)
new_disk.new_disk_source(attrs={'file': blk_source})
disk_params.update(disk_params_src)
disk_xml = create_disk_xml(disk_params)
new_disk.xml = disk_xml
# Add new disk xml and redefine vm
vmxml.add_device(new_disk)
# Set domain options
dom_iothreads = params.get("dom_iothreads")
if dom_iothreads:
vmxml.iothreads = int(dom_iothreads)
logging.debug("The vm xml now is: %s" % vmxml.xmltreefile)
vmxml.sync()
vm.start()
[docs]def attach_additional_device(vm_name, targetdev, disk_path, params, config=True):
"""
Create a disk with disksize, then attach it to given vm.
:param vm_name: Libvirt VM name.
:param disk_path: path of attached disk
:param targetdev: target of disk device
:param params: dict include necessary configurations of device
"""
logging.info("Attaching disk...")
# Update params for source file
params['source_file'] = disk_path
params['target_dev'] = targetdev
# Create a file of device
xmlfile = create_disk_xml(params)
# To confirm attached device do not exist.
if config:
extra = "--config"
else:
extra = ""
virsh.detach_disk(vm_name, targetdev, extra=extra)
return virsh.attach_device(domain_opt=vm_name, file_opt=xmlfile,
flagstr=extra, debug=True)
[docs]def device_exists(vm, target_dev):
"""
Check if given target device exists on vm.
"""
targets = vm.get_blk_devices().keys()
if target_dev in targets:
return True
return False
[docs]def create_local_disk(disk_type, path=None,
size="10", disk_format="raw",
vgname=None, lvname=None):
if disk_type != "lvm" and path is None:
raise error.TestError("Path is needed for creating local disk")
if path:
utils.run("mkdir -p %s" % os.path.dirname(path))
try:
size = str(float(size)) + "G"
except ValueError:
pass
cmd = ""
if disk_type == "file":
cmd = "qemu-img create -f %s %s %s" % (disk_format, path, size)
elif disk_type == "floppy":
cmd = "dd if=/dev/zero of=%s count=1024 bs=1024" % path
elif disk_type == "iso":
cmd = "mkisofs -o %s /root/*.*" % path
elif disk_type == "lvm":
if vgname is None or lvname is None:
raise error.TestError("Both VG name and LV name are needed")
lv_utils.lv_create(vgname, lvname, size)
path = "/dev/%s/%s" % (vgname, lvname)
else:
raise error.TestError("Unknown disk type %s" % disk_type)
if cmd:
utils.run(cmd, ignore_status=True)
return path
[docs]def delete_local_disk(disk_type, path=None,
vgname=None, lvname=None):
if disk_type in ["file", "floppy", "iso"]:
if path is None:
raise error.TestError("Path is needed for deleting local disk")
else:
cmd = "rm -f %s" % path
utils.run(cmd, ignore_status=True)
elif disk_type == "lvm":
if vgname is None or lvname is None:
raise error.TestError("Both VG name and LV name needed")
lv_utils.lv_remove(vgname, lvname)
else:
raise error.TestError("Unknown disk type %s" % disk_type)
[docs]def create_scsi_disk(scsi_option, scsi_size="2048"):
"""
Get the scsi device created by scsi_debug kernel module
:param scsi_option. The scsi_debug kernel module options.
:return: scsi device if it is created successfully.
"""
try:
utils_misc.find_command("lsscsi")
except ValueError:
raise error.TestNAError("Missing command 'lsscsi'.")
try:
# Load scsi_debug kernel module.
# Unload it first if it's already loaded.
if utils.module_is_loaded("scsi_debug"):
utils.unload_module("scsi_debug")
utils.load_module("scsi_debug dev_size_mb=%s %s"
% (scsi_size, scsi_option))
# Get the scsi device name
scsi_disk = utils.run("lsscsi|grep scsi_debug|"
"awk '{print $6}'").stdout.strip()
logging.info("scsi disk: %s" % scsi_disk)
return scsi_disk
except Exception, e:
logging.error(str(e))
return None
[docs]def delete_scsi_disk():
"""
Delete scsi device by removing scsi_debug kernel module.
"""
if utils.module_is_loaded("scsi_debug"):
utils.unload_module("scsi_debug")
[docs]def set_controller_multifunction(vm_name, controller_type='scsi'):
"""
Set multifunction on for controller device and expand to all function.
"""
vmxml = vm_xml.VMXML.new_from_dumpxml(vm_name)
exist_controllers = vmxml.get_devices("controller")
# Used to contain controllers in format:
# domain:bus:slot:func -> controller object
expanded_controllers = {}
# The index of controller
index = 0
for e_controller in exist_controllers:
if e_controller.type != controller_type:
continue
# Set multifunction on
address_attrs = e_controller.address.attrs
address_attrs['multifunction'] = "on"
domain = address_attrs['domain']
bus = address_attrs['bus']
slot = address_attrs['slot']
all_funcs = ["0x0", "0x1", "0x2", "0x3", "0x4", "0x5", "0x6"]
for func in all_funcs:
key = "%s:%s:%s:%s" % (domain, bus, slot, func)
address_attrs['function'] = func
# Create a new controller instance
new_controller = controller.Controller(controller_type)
new_controller.xml = str(xml_utils.XMLTreeFile(e_controller.xml))
new_controller.index = index
new_controller.address = new_controller.new_controller_address(
attrs=address_attrs)
# Expand controller to all functions with multifunction
if key not in expanded_controllers.keys():
expanded_controllers[key] = new_controller
index += 1
logging.debug("Expanded controllers: %s", expanded_controllers.values())
vmxml.del_controller(controller_type)
vmxml.set_controller(expanded_controllers.values())
vmxml.sync()
[docs]def attach_disks(vm, path, vgname, params):
"""
Attach multiple disks.According parameter disk_type in params,
it will create lvm or file type disks.
:param path: file type disk's path
:param vgname: lvm type disk's volume group name
"""
# Additional disk on vm
disks_count = int(params.get("added_disks_count", 1)) - 1
multifunction_on = "yes" == params.get("multifunction_on", "no")
disk_size = params.get("added_disk_size", "0.1")
disk_type = params.get("added_disk_type", "file")
disk_target = params.get("added_disk_target", "virtio")
disk_format = params.get("added_disk_format", "raw")
# Whether attaching device with --config
attach_config = "yes" == params.get("attach_disk_config", "yes")
def generate_disks_index(count, target="virtio"):
# Created disks' index
target_list = []
# Used to flag progression
index = 0
# A list to maintain prefix for generating device
# ['a','b','c'] means prefix abc
prefix_list = []
while count > 0:
# Out of range for current prefix_list
if (index / 26) > 0:
# Update prefix_list to expand disks, such as [] -> ['a'],
# ['z'] -> ['a', 'a'], ['z', 'z'] -> ['a', 'a', 'a']
prefix_index = len(prefix_list)
if prefix_index == 0:
prefix_list.append('a')
# Append a new prefix to list, then update pre-'z' in list
# to 'a' to keep the progression 1
while prefix_index > 0:
prefix_index -= 1
prefix_cur = prefix_list[prefix_index]
if prefix_cur == 'z':
prefix_list[prefix_index] = 'a'
# All prefix in prefix_list are 'z',
# it's time to expand it.
if prefix_index == 0:
prefix_list.append('a')
else:
# For whole prefix_list, progression is 1
prefix_list[prefix_index] = chr(ord(prefix_cur) + 1)
break
# Reset for another iteration
index = 0
prefix = "".join(prefix_list)
suffix_index = index % 26
suffix = chr(ord('a') + suffix_index)
index += 1
count -= 1
# Generate device target according to driver type
if target == "virtio":
target_dev = "vd%s" % (prefix + suffix)
elif target == "scsi":
target_dev = "sd%s" % (prefix + suffix)
target_list.append(target_dev)
return target_list
target_list = generate_disks_index(disks_count, disk_target)
# A dict include disks information: source file and size
added_disks = {}
for target_dev in target_list:
# Do not attach if it does already exist
if device_exists(vm, target_dev):
continue
# Prepare controller for special disks like virtio-scsi
# Open multifunction to add more controller for disks(150 or more)
if multifunction_on:
set_controller_multifunction(vm.name, disk_target)
disk_params = {}
disk_params['type_name'] = disk_type
disk_params['target_dev'] = target_dev
disk_params['target_bus'] = disk_target
disk_params['device_type'] = params.get("device_type", "disk")
device_name = "%s_%s" % (target_dev, vm.name)
disk_path = os.path.join(os.path.dirname(path), device_name)
disk_path = create_local_disk(disk_type, disk_path,
disk_size, disk_format,
vgname, device_name)
added_disks[disk_path] = disk_size
result = attach_additional_device(vm.name, target_dev, disk_path,
disk_params, attach_config)
if result.exit_status:
raise error.TestFail("Attach device %s failed."
% target_dev)
logging.debug("New VM XML:\n%s", vm.get_xml())
return added_disks
[docs]def define_new_vm(vm_name, new_name):
"""
Just define a new vm from given name
"""
try:
vmxml = vm_xml.VMXML.new_from_dumpxml(vm_name)
vmxml.vm_name = new_name
del vmxml.uuid
vmxml.define()
return True
except xcepts.LibvirtXMLError, detail:
logging.error(detail)
return False
[docs]def remotely_control_libvirtd(server_ip, server_user, server_pwd,
action='restart', status_error='no'):
"""
Remotely restart libvirt service
"""
session = None
try:
session = remote.wait_for_login('ssh', server_ip, '22',
server_user, server_pwd,
r"[\#\$]\s*$")
logging.info("%s libvirt daemon\n", action)
service_libvirtd_control(action, session)
session.close()
except (remote.LoginError, aexpect.ShellError, error.CmdError), detail:
if session:
session.close()
if status_error == "no":
raise error.TestFail("Failed to %s libvirtd service on "
"server: %s\n", action, detail)
else:
logging.info("It is an expect %s", detail)
[docs]def connect_libvirtd(uri, read_only="", virsh_cmd="list", auth_user=None,
auth_pwd=None, vm_name="", status_error="no",
extra="", log_level='LIBVIRT_DEBUG=3', su_user="",
patterns_virsh_cmd=".*Id\s*Name\s*State\s*.*"):
"""
Connect libvirt daemon
"""
patterns_yes_no = r".*[Yy]es.*[Nn]o.*"
patterns_auth_name_comm = r".*name:.*"
patterns_auth_name_xen = r".*name.*root.*:.*"
patterns_auth_pwd = r".*[Pp]assword.*"
command = "%s %s virsh %s -c %s %s %s" % (extra, log_level, read_only,
uri, virsh_cmd, vm_name)
# allow specific user to run virsh command
if su_user != "":
command = "su %s -c '%s'" % (su_user, command)
logging.info("Execute %s", command)
# setup shell session
session = aexpect.ShellSession(command, echo=True)
try:
# requires access authentication
match_list = [patterns_yes_no, patterns_auth_name_comm,
patterns_auth_name_xen, patterns_auth_pwd,
patterns_virsh_cmd]
while True:
match, text = session.read_until_any_line_matches(match_list,
timeout=30,
internal_timeout=1)
if match == -5:
logging.info("Matched 'yes/no', details: <%s>", text)
session.sendline("yes")
elif match == -3 or match == -4:
logging.info("Matched 'username', details: <%s>", text)
session.sendline(auth_user)
elif match == -2:
logging.info("Matched 'password', details: <%s>", text)
session.sendline(auth_pwd)
elif match == -1:
logging.info("Expected output of virsh command: <%s>", text)
break
else:
logging.error("The real prompt text: <%s>", text)
break
session.close()
return True
except (aexpect.ShellError, aexpect.ExpectError), details:
log = session.get_output()
session.close()
logging.error("Failed to connect libvirtd: %s\n%s", details, log)
return False
[docs]def get_all_vol_paths():
"""
Get all volumes' path in host
"""
vol_path = []
sp = libvirt_storage.StoragePool()
for pool_name in sp.list_pools().keys():
if sp.list_pools()[pool_name]['State'] != "active":
logging.warning("Inactive pool '%s' cannot be processed" % pool_name)
continue
pv = libvirt_storage.PoolVolume(pool_name)
for path in pv.list_volumes().values():
vol_path.append(path)
return set(vol_path)
[docs]def do_migration(vm_name, uri, extra, auth_pwd, auth_user="root",
options="--verbose", virsh_patterns=".*100\s%.*",
su_user="", timeout=30):
"""
Migrate VM to target host.
"""
patterns_yes_no = r".*[Yy]es.*[Nn]o.*"
patterns_auth_name = r".*name:.*"
patterns_auth_pwd = r".*[Pp]assword.*"
command = "%s virsh migrate %s %s %s" % (extra, vm_name, options, uri)
# allow specific user to run virsh command
if su_user != "":
command = "su %s -c '%s'" % (su_user, command)
logging.info("Execute %s", command)
# setup shell session
session = aexpect.ShellSession(command, echo=True)
try:
# requires access authentication
match_list = [patterns_yes_no, patterns_auth_name,
patterns_auth_pwd, virsh_patterns]
while True:
match, text = session.read_until_any_line_matches(match_list,
timeout=timeout,
internal_timeout=1)
if match == -4:
logging.info("Matched 'yes/no', details: <%s>", text)
session.sendline("yes")
elif match == -3:
logging.info("Matched 'username', details: <%s>", text)
session.sendline(auth_user)
elif match == -2:
logging.info("Matched 'password', details: <%s>", text)
session.sendline(auth_pwd)
elif match == -1:
logging.info("Expected output of virsh migrate: <%s>", text)
break
else:
logging.error("The real prompt text: <%s>", text)
break
session.close()
return True
except (aexpect.ShellError, aexpect.ExpectError), details:
log = session.get_output()
session.close()
logging.error("Failed to migrate %s: %s\n%s", vm_name, details, log)
return False
[docs]def update_vm_disk_source(vm_name, disk_source_path, source_type="file"):
"""
Update disk source path of the VM
:param source_type: it may be 'dev' or 'file' type, which is default
"""
if not os.path.isdir(disk_source_path):
logging.error("Require disk source path!!")
return False
# Prepare to update VM first disk source file
vmxml = vm_xml.VMXML.new_from_dumpxml(vm_name)
devices = vmxml.devices
disk_index = devices.index(devices.by_device_tag('disk')[0])
disks = devices[disk_index]
disk_source = disks.source.get_attrs().get(source_type)
logging.debug("The disk source file of the VM: %s", disk_source)
if not os.path.exists(disk_source):
logging.error("The disk source doesn't exist!!")
return False
vm_name_with_format = os.path.basename(disk_source)
new_disk_source = os.path.join(disk_source_path, vm_name_with_format)
logging.debug("The new disk source file of the VM: %s", new_disk_source)
# Update VM disk source file
disks.source = disks.new_disk_source(**{'attrs': {'%s' % source_type:
"%s" % new_disk_source}})
# SYNC VM XML change
vmxml.devices = devices
logging.debug("The new VM XML:\n%s", vmxml.xmltreefile)
vmxml.sync()
return True
[docs]def hotplug_domain_vcpu(domain, count, by_virsh=True, hotplug=True):
"""
Hot-plug/Hot-unplug vcpu for domian
:param domain: Domain name, id, uuid
:param count: to setvcpus it's the current vcpus number,
but to qemu-monitor-command,
we need to designate a specific CPU ID.
The default will be got by (count - 1)
:param by_virsh: True means hotplug/unplug by command setvcpus,
otherwise, using qemu_monitor
:param hotplug: True means hot-plug, False means hot-unplug
"""
if by_virsh:
result = virsh.setvcpus(domain, count, "--live", debug=True)
else:
if hotplug:
cpu_opt = "cpu-add"
else:
cpu_opt = "cpu-del"
# Note: cpu-del is supported currently, it will return error.
# as follow,
# {
# "id": "libvirt-23",
# "error": {
# "class": "CommandNotFound",
# "desc": "The command cpu-del has not been found"
# }
# }
# so, the caller should check the result.
# hot-plug/hot-plug the CPU has maximal ID
params = (cpu_opt, (count - 1))
cmd = '{\"execute\":\"%s\",\"arguments\":{\"id\":%d}}' % params
result = virsh.qemu_monitor_command(domain,
cmd,
"--pretty",
debug=True)
return result
[docs]def exec_virsh_edit(source, edit_cmd, connect_uri="qemu:///system"):
"""
Execute edit command.
:param source : virsh edit's option.
:param edit_cmd: Edit command list to execute.
:return: True if edit is successful, False if edit is failure.
"""
logging.info("Trying to edit xml with cmd %s", edit_cmd)
session = aexpect.ShellSession("sudo -s")
try:
session.sendline("virsh -c %s edit %s" % (connect_uri, source))
for cmd in edit_cmd:
session.sendline(cmd)
session.send('\x1b')
session.send('ZZ')
remote.handle_prompts(session, None, None, r"[\#\$]\s*$", debug=True)
session.close()
return True
except Exception, e:
session.close()
logging.error("Error occurred: %s", e)
return False
[docs]def new_disk_vol_name(pool_name):
"""
According to BZ#1138523, the new volume name must be the next
created partition(sdb1, etc.), so we need to inspect the original
partitions of the disk then count the new partition number.
:param pool_name: Disk pool name
:return: New volume name or none
"""
poolxml = pool_xml.PoolXML.new_from_dumpxml(pool_name)
if poolxml.get_type(pool_name) != "disk":
logging.error("This is not a disk pool")
return None
disk = poolxml.get_source().device_path[5:]
part_num = len(filter(lambda s: s.startswith(disk), get_parts_list()))
return disk + str(part_num)
[docs]def update_polkit_rule(params, pattern, new_value):
"""
This function help to update the rule during testing.
:param params: Test run params
:param pattern: Regex pattern for updating
:param new_value: New value for updating
"""
polkit = LibvirtPolkitConfig(params)
polkit_rules_path = polkit.polkit_rules_path
try:
polkit_f = open(polkit_rules_path, 'r+')
rule = polkit_f.read()
new_rule = re.sub(pattern, new_value, rule)
polkit_f.seek(0)
polkit_f.truncate()
polkit_f.write(new_rule)
polkit_f.close()
logging.debug("New polkit config rule is:\n%s", new_rule)
polkit.polkitd.restart()
except IOError, e:
logging.error(e)