Source code for virttest.utils_test.libguestfs

import re
import os
import logging
import commands
from autotest.client.shared import error, utils
from virttest import virsh, virt_vm, libvirt_vm, data_dir
from virttest import utils_net, xml_utils
from virttest.libvirt_xml import vm_xml, xcepts
from virttest import utils_libguestfs as lgf
from virttest import qemu_storage


[docs]class VTError(Exception): pass
[docs]class VTAttachError(VTError): def __init__(self, cmd, output): super(VTAttachError, self).__init__(cmd, output) self.cmd = cmd self.output = output def __str__(self): return ("Attach command failed:%s\n%s" % (self.cmd, self.output))
[docs]class VTMountError(VTError): def __init__(self, cmd, output): VTError.__init__(self, cmd, output) self.cmd = cmd self.output = output def __str__(self): return ("Mount command failed:%s\n%s" % (self.cmd, self.output))
[docs]class VTXMLParseError(VTError): def __init__(self, cmd, output): super(VTXMLParseError, self).__init__(cmd, output) self.cmd = cmd self.output = output def __str__(self): return ("Parse XML with '%s' failed:%s" % (self.cmd, self.output))
[docs]def preprocess_image(params): """ Create a disk which used by guestfish params: Get params from cfg file """ image_dir = params.get("img_dir", data_dir.get_tmp_dir()) image_name = params.get("image_name", "gs_common") image = qemu_storage.QemuImg(params, image_dir, image_name) image_path, _ = image.create(params) logging.info("Image created in %s" % image_path) return image_path
[docs]def primary_disk_virtio(vm): """ To verify if system disk is virtio. :param vm: Libvirt VM object. """ vmdisks = vm.get_disk_devices() if "vda" in vmdisks.keys(): return True return False
[docs]def get_primary_disk(vm): """ Get primary disk source. :param vm: Libvirt VM object. """ vmdisks = vm.get_disk_devices() if len(vmdisks): pri_target = ['vda', 'sda', 'hda'] for target in pri_target: try: return vmdisks[target]['source'] except KeyError: pass return None
[docs]def attach_additional_disk(vm, disksize, targetdev): """ Create a disk with disksize, then attach it to given vm. :param vm: Libvirt VM object. :param disksize: size of attached disk :param targetdev: target of disk device """ logging.info("Attaching disk...") disk_path = os.path.join(data_dir.get_tmp_dir(), targetdev) cmd = "qemu-img create %s %s" % (disk_path, disksize) status, output = commands.getstatusoutput(cmd) if status: return (False, output) # To confirm attached device do not exist. virsh.detach_disk(vm.name, targetdev, extra="--config") attach_result = virsh.attach_disk(vm.name, disk_path, targetdev, extra="--config", debug=True) if attach_result.exit_status: return (False, attach_result) return (True, disk_path)
[docs]def define_new_vm(vm_name, new_name): """ Just define a new vm from given name """ try: vmxml = vm_xml.VMXML.new_from_dumpxml(vm_name) vmxml.vm_name = new_name del vmxml.uuid logging.debug(str(vmxml)) vmxml.define() return True except xcepts.LibvirtXMLError, detail: logging.error(detail) return False
[docs]def cleanup_vm(vm_name=None, disk=None): """ Cleanup the vm with its disk deleted. """ try: if vm_name is not None: virsh.undefine(vm_name) except error.CmdError, detail: logging.error("Undefine %s failed:%s", vm_name, detail) try: if disk is not None: if os.path.exists(disk): os.remove(disk) except IOError, detail: logging.error("Remove disk %s failed:%s", disk, detail)
[docs]class VirtTools(object): """ Useful functions for virt-commands. Some virt-tools need an input disk and output disk. Main for virt-clone, virt-sparsify, virt-resize. """ def __init__(self, vm, params): self.params = params self.oldvm = vm # Many command will create a new vm or disk, init it here self.newvm = libvirt_vm.VM("VTNEWVM", vm.params, vm.root_dir, vm.address_cache) # Prepare for created vm disk self.indisk = get_primary_disk(vm) self.outdisk = None
[docs] def update_vm_disk(self): """ Update oldvm's disk, and then create a newvm. """ target_dev = self.params.get("gf_updated_target_dev", "vdb") device_size = self.params.get("gf_updated_device_size", "50M") self.newvm.name = self.params.get("gf_updated_new_vm") if self.newvm.is_alive(): self.newvm.destroy() self.newvm.wait_for_shutdown() attachs, attacho = attach_additional_disk(self.newvm, disksize=device_size, targetdev=target_dev) if attachs: # Restart vm for guestfish command # Otherwise updated disk is not visible try: self.newvm.start() self.newvm.wait_for_login() self.newvm.destroy() self.newvm.wait_for_shutdown() self.params['added_disk_path'] = attacho except virt_vm.VMError, detail: raise VTAttachError("", str(detail)) else: raise VTAttachError("", attacho)
[docs] def clone_vm_filesystem(self, newname=None): """ Clone a new vm with only its filesystem disk. :param newname:if newname is None, create a new name with clone added. """ logging.info("Cloning...") # Init options for virt-clone options = {} autoclone = bool(self.params.get("autoclone", False)) new_filesystem_path = self.params.get("new_filesystem_path") cloned_files = [] if new_filesystem_path: self.outdisk = new_filesystem_path elif self.indisk is not None: self.outdisk = "%s-clone" % self.indisk cloned_files.append(self.outdisk) options['files'] = cloned_files # cloned_mac can be CREATED, RANDOM or a string. cloned_mac = self.params.get("cloned_mac", "CREATED") if cloned_mac == "CREATED": options['mac'] = utils_net.generate_mac_address_simple() else: options['mac'] = cloned_mac options['ignore_status'] = True options['debug'] = True options['timeout'] = int(self.params.get("timeout", 240)) if newname is None: newname = "%s-virtclone" % self.oldvm.name result = lgf.virt_clone_cmd(self.oldvm.name, newname, autoclone, **options) if result.exit_status: error_info = "Clone %s to %s failed." % (self.oldvm.name, newname) logging.error(error_info) return (False, result) else: self.newvm.name = newname cloned_mac = vm_xml.VMXML.get_first_mac_by_name(newname) if cloned_mac is not None: self.newvm.address_cache[cloned_mac] = None return (True, result)
[docs] def sparsify_disk(self): """ Sparsify a disk """ logging.info("Sparsifing...") if self.indisk is None: logging.error("No disk can be sparsified.") return (False, "Input disk is None.") if self.outdisk is None: self.outdisk = "%s-sparsify" % self.indisk timeout = int(self.params.get("timeout", 240)) result = lgf.virt_sparsify_cmd(self.indisk, self.outdisk, ignore_status=True, debug=True, timeout=timeout) if result.exit_status: error_info = "Sparsify %s to %s failed." % (self.indisk, self.outdisk) logging.error(error_info) return (False, result) return (True, result)
[docs] def define_vm_with_newdisk(self): """ Define the new vm with old vm's configuration Changes: 1.replace name 2.delete uuid 3.replace disk """ logging.info("Define a new vm:") old_vm_name = self.oldvm.name new_vm_name = "%s-vtnewdisk" % old_vm_name self.newvm.name = new_vm_name old_disk = self.indisk new_disk = self.outdisk try: vmxml = vm_xml.VMXML.new_from_dumpxml(old_vm_name) vmxml.vm_name = new_vm_name vmxml.uuid = "" vmxml.set_xml(re.sub(old_disk, new_disk, str(vmxml.__dict_get__('xml')))) logging.debug(vmxml.__dict_get__('xml')) vmxml.define() except xcepts.LibvirtXMLError, detail: logging.debug(detail) return (False, detail) return (True, vmxml.xml)
[docs] def expand_vm_filesystem(self, resize_part_num=2, resized_size="+1G", new_disk=None): """ Expand vm's filesystem with virt-resize. """ logging.info("Resizing vm's disk...") options = {} options['resize'] = "/dev/sda%s" % resize_part_num options['resized_size'] = resized_size if new_disk is not None: self.outdisk = new_disk elif self.outdisk is None: self.outdisk = "%s-resize" % self.indisk options['ignore_status'] = True options['debug'] = True options['timeout'] = int(self.params.get("timeout", 480)) result = lgf.virt_resize_cmd(self.indisk, self.outdisk, **options) if result.exit_status: logging.error(result) return (False, result) return (True, self.outdisk)
[docs] def guestmount(self, mountpoint, disk_or_domain=None): """ Mount filesystems in a disk or domain to host mountpoint. :param disk_or_domain: if it is None, use default vm in params """ logging.info("Mounting filesystems...") if disk_or_domain is None: disk_or_domain = self.oldvm.name if not os.path.isdir(mountpoint): os.mkdir(mountpoint) if os.path.ismount(mountpoint): utils.run("umount -l %s" % mountpoint, ignore_status=True) inspector = "yes" == self.params.get("gm_inspector", "yes") readonly = "yes" == self.params.get("gm_readonly", "no") special_mountpoints = self.params.get("special_mountpoints", []) is_disk = "yes" == self.params.get("gm_is_disk", "no") options = {} options['ignore_status'] = True options['debug'] = True options['timeout'] = int(self.params.get("timeout", 240)) options['special_mountpoints'] = special_mountpoints options['is_disk'] = is_disk result = lgf.guestmount(disk_or_domain, mountpoint, inspector, readonly, **options) if result.exit_status: error_info = "Mount %s to %s failed." % (disk_or_domain, mountpoint) logging.error(result) return (False, error_info) return (True, mountpoint)
[docs] def write_file_with_guestmount(self, mountpoint, path, content=None, vm_ref=None, cleanup=True): """ Write content to file with guestmount """ logging.info("Creating file...") gms, gmo = self.guestmount(mountpoint, vm_ref) if gms is True: mountpoint = gmo else: logging.error("Create file %s failed.", path) return (False, gmo) # file's path on host's mountpoint # Connect mountpoint and path, then remove additional character '/' file_path = os.path.abspath("%s/%s" % (mountpoint, path)) if content is None: content = "This is a temp file with guestmount." try: fd = open(file_path, "w") fd.write(content) fd.close() except IOError, detail: logging.error(detail) return (False, detail) logging.info("Create file %s successfully", file_path) # Cleanup created file if cleanup: utils.run("rm -f %s" % file_path, ignore_status=True) return (True, file_path)
[docs] def get_primary_disk_fs_type(self): """ Get primary disk filesystem type """ result = lgf.virt_filesystems(self.oldvm.name, long_format=True) if result.exit_status: raise error.TestNAError("Cannot get primary disk" " filesystem information!") fs_info = result.stdout.strip().splitlines() if len(fs_info) <= 1: raise error.TestNAError("No disk filesystem information!") try: primary_disk_info = fs_info[1] fs_type = primary_disk_info.split()[2] return fs_type except (KeyError, ValueError), detail: raise error.TestFail(str(detail))
[docs] def tar_in(self, tar_file, dest="/tmp", vm_ref=None): if vm_ref is None: vm_ref = self.oldvm.name result = lgf.virt_tar_in(vm_ref, tar_file, dest, debug=True, ignore_status=True) return result
[docs] def tar_out(self, directory, tar_file="temp.tar", vm_ref=None): if vm_ref is None: vm_ref = self.oldvm.name result = lgf.virt_tar_out(vm_ref, directory, tar_file, debug=True, ignore_status=True) return result
[docs] def cat(self, filename, vm_ref=None): if vm_ref is None: vm_ref = self.oldvm.name result = lgf.virt_cat_cmd(vm_ref, filename, debug=True, ignore_status=True) return result
[docs] def copy_in(self, filename, dest="/tmp", vm_ref=None): if vm_ref is None: vm_ref = self.oldvm.name result = lgf.virt_copy_in(vm_ref, filename, dest, debug=True, ignore_status=True) return result
[docs] def copy_out(self, file_path, localdir="/tmp", vm_ref=None): if vm_ref is None: vm_ref = self.oldvm.name result = lgf.virt_copy_out(vm_ref, file_path, localdir, debug=True, ignore_status=True) return result
[docs] def format_disk(self, disk_path=None, filesystem=None, partition=None, lvm=None): """ :param disk_path: None for additional disk by update_vm_disk() only """ if disk_path is None: disk_path = self.params.get("added_disk_path") result = lgf.virt_format(disk_path, filesystem, lvm=lvm, partition=partition, debug=True, ignore_status=True) return result
[docs] def get_filesystems_info(self, vm_ref=None): if vm_ref is None: vm_ref = self.oldvm.name result = lgf.virt_filesystems(vm_ref, long_format=True, debug=True, all=True, ignore_status=True) return result
[docs] def list_df(self, vm_ref=None): if vm_ref is None: vm_ref = self.oldvm.name result = lgf.virt_df(vm_ref, debug=True, ignore_status=True) return result
[docs] def get_vm_info_with_inspector(self, vm_ref=None): """ Return a dict includes os information. """ if vm_ref is None: vm_ref = self.oldvm.name # A dict to include system information sys_info = {} result = lgf.virt_inspector(vm_ref, ignore_status=True) if result.exit_status: logging.error("Get %s information with inspector(2) failed:\n%s", vm_ref, result) return sys_info # Analyse output to get information try: xmltreefile = xml_utils.XMLTreeFile(result.stdout) os_root = xmltreefile.find("operatingsystem") if os_root is None: raise VTXMLParseError("operatingsystem", os_root) except (IOError, VTXMLParseError), detail: logging.error(detail) return sys_info sys_info['root'] = os_root.findtext("root") sys_info['name'] = os_root.findtext("name") sys_info['arch'] = os_root.findtext("arch") sys_info['distro'] = os_root.findtext("distro") sys_info['release'] = os_root.findtext("product_name") sys_info['major_version'] = os_root.findtext("major_version") sys_info['minor_version'] = os_root.findtext("minor_version") sys_info['hostname'] = os_root.findtext("hostname") # filesystems and mountpoints are dict to restore detail info mountpoints = {} for node in os_root.find("mountpoints"): mp_device = node.get("dev") if mp_device is not None: mountpoints[mp_device] = node.text sys_info['mountpoints'] = mountpoints filesystems = {} for node in os_root.find("filesystems"): fs_detail = {} fs_device = node.get("dev") if fs_device is not None: fs_detail['type'] = node.findtext("type") fs_detail['label'] = node.findtext("label") fs_detail['uuid'] = node.findtext("uuid") filesystems[fs_device] = fs_detail sys_info['filesystems'] = filesystems logging.debug("VM information:\n%s", sys_info) return sys_info
[docs]class GuestfishTools(lgf.GuestfishPersistent): """Useful Tools for Guestfish class.""" __slots__ = ('params', ) def __init__(self, params): """ Init a persistent guestfish shellsession. """ self.params = params disk_img = params.get("disk_img") ro_mode = bool(params.get("gf_ro_mode", False)) libvirt_domain = params.get("libvirt_domain") inspector = bool(params.get("gf_inspector", False)) mount_options = params.get("mount_options") run_mode = params.get("gf_run_mode", "interactive") super(GuestfishTools, self).__init__(disk_img, ro_mode, libvirt_domain, inspector, mount_options=mount_options, run_mode=run_mode)
[docs] def get_root(self): """ Get root filesystem w/ guestfish """ getroot_result = self.inspect_os() roots_list = getroot_result.stdout.splitlines() if getroot_result.exit_status or not len(roots_list): logging.error("Get root failed:%s", getroot_result) return (False, getroot_result) return (True, roots_list[0].strip())
[docs] def analyse_release(self): """ Analyse /etc/redhat-release """ logging.info("Analysing /etc/redhat-release...") release_result = self.cat("/etc/redhat-release") logging.debug(release_result) if release_result.exit_status: logging.error("Cat /etc/redhat-release failed") return (False, release_result) release_type = {'rhel': "Red Hat Enterprise Linux", 'fedora': "Fedora"} for key in release_type: if re.search(release_type[key], release_result.stdout): return (True, key)
[docs] def write_file(self, path, content): """ Create a new file to vm with guestfish """ logging.info("Creating file %s in vm...", path) write_result = self.write(path, content) if write_result.exit_status: logging.error("Create '%s' with content '%s' failed:%s", path, content, write_result) return False return True
[docs] def get_partitions_info(self, device="/dev/sda"): """ Get disk partition's information. """ list_result = self.part_list(device) if list_result.exit_status: logging.error("List partition info failed:%s", list_result) return (False, list_result) list_lines = list_result.stdout.splitlines() # This dict is a struct like this: {key:{a dict}, key:{a dict}} partitions = {} # This dict is a struct of normal dict, for temp value of a partition part_details = {} index = -1 for line in list_lines: # Init for a partition if re.search("\[\d\]\s+=", line): index = line.split("]")[0].split("[")[-1] part_details = {} partitions[index] = part_details if re.search("part_num", line): part_num = int(line.split(":")[-1].strip()) part_details['num'] = part_num elif re.search("part_start", line): part_start = int(line.split(":")[-1].strip()) part_details['start'] = part_start elif re.search("part_end", line): part_end = int(line.split(":")[-1].strip()) part_details['end'] = part_end elif re.search("part_size", line): part_size = int(line.split(":")[-1].strip()) part_details['size'] = part_size if index != -1: partitions[index] = part_details logging.info(partitions) return (True, partitions)
[docs] def get_part_size(self, part_num): status, partitions = self.get_partitions_info() if status is False: return None for partition in partitions.values(): if str(partition.get("num")) == str(part_num): return partition.get("size")
[docs] def create_fs(self): """ Create filesystem of disk Choose lvm or physical partition and create fs on it """ image_path = self.params.get("image_path") self.add_drive(image_path) self.run() partition_type = self.params.get("partition_type") fs_type = self.params.get("fs_type", "ext3") image_size = self.params.get("image_size", "6G") with_blocksize = self.params.get("with_blocksize") blocksize = self.params.get("blocksize") tarball_path = self.params.get("tarball_path") if partition_type not in ['lvm', 'physical']: return (False, "partition_type is incorrect, support [physical,lvm]") if partition_type == "lvm": logging.info("create lvm partition...") pv_name = self.params.get("pv_name", "/dev/sdb") vg_name = self.params.get("vg_name", "vol_test") lv_name = self.params.get("lv_name", "vol_file") mount_point = "/dev/%s/%s" % (vg_name, lv_name) if 'G' in image_size: lv_size = int(image_size.replace('G', '')) * 1000 else: lv_size = int(image_size.replace('M', '')) - 10 self.pvcreate(pv_name) self.vgcreate(vg_name, pv_name) self.lvcreate(lv_name, vg_name, lv_size) elif partition_type == "physical": logging.info("create physical partition...") pv_name = self.params.get("pv_name", "/dev/sdb") mount_point = pv_name + "1" self.part_disk(pv_name, "mbr") self.part_list(pv_name) self.params["mount_point"] = mount_point if with_blocksize == "yes" and fs_type != "btrfs" and fs_type != "no_fs": if blocksize: self.mkfs_opts(fs_type, mount_point, "blocksize:%s" % (blocksize)) self.vfs_type(mount_point) else: logging.error("with_blocksize is set but blocksize not given") self.umount_all() self.sync() return (False, "with_blocksize is set but blocksize not given") elif fs_type != "no_fs": self.mkfs(fs_type, mount_point) self.vfs_type(mount_point) if tarball_path: self.mount_options("noatime", mount_point, '/') self.tar_in_opts(tarball_path, '/', 'gzip') self.ll('/') self.umount_all() self.sync() return (True, "create_fs successfully")
[docs] def create_msdos_part(self, device, start="1", end="-1"): """ Create a msdos partition in given device. Default partition section is whole disk(1~-1). And return its part name if part add succeed. """ logging.info("Creating a new partition on %s...", device) init_result = self.part_init(device, "msdos") if init_result.exit_status: logging.error("Init disk failed:%s", init_result) return (False, init_result) add_result = self.part_add(device, "p", start, end) if add_result.exit_status: logging.error("Add a partition failed:%s", add_result) return (False, add_result) # Get latest created part num to return status, partitions = self.get_partitions_info(device) if status is False: return (False, partitions) part_num = -1 for partition in partitions.values(): cur_num = partition.get("num") if cur_num > part_num: part_num = cur_num if part_num == -1: return (False, partitions) return (True, part_num)
[docs] def create_whole_disk_msdos_part(self, device): """ Create only one msdos partition in given device. And return its part name if part add succeed. """ logging.info("Creating one partition of whole %s...", device) init_result = self.part_init(device, "msdos") if init_result.exit_status: logging.error("Init disk failed:%s", init_result) return (False, init_result) disk_result = self.part_disk(device, "msdos") if disk_result.exit_status: logging.error("Init disk failed:%s", disk_result) return (False, disk_result) # Get latest created part num to return status, partitions = self.get_partitions_info(device) if status is False: return (False, partitions) part_num = -1 for partition in partitions.values(): cur_num = partition.get("num") if cur_num > part_num: part_num = cur_num if part_num == -1: return (False, partitions) return (True, part_num)
[docs] def get_bootable_part(self, device="/dev/sda"): status, partitions = self.get_partitions_info(device) if status is False: return (False, partitions) for partition in partitions.values(): num = partition.get("num") ba_result = self.part_get_bootable(device, num) if ba_result.stdout.strip() == "true": return (True, "%s%s" % (device, num)) return (False, partitions)
[docs] def get_mbr_id(self, device="/dev/sda"): status, partitions = self.get_partitions_info(device) if status is False: return (False, partitions) for partition in partitions.values(): num = partition.get("num") mbr_id_result = self.part_get_mbr_id(device, num) if mbr_id_result.exit_status == 0: return (True, mbr_id_result.stdout.strip()) return (False, partitions)
[docs] def get_part_type(self, device="/dev/sda"): part_type_result = self.part_get_parttype(device) if part_type_result.exit_status: return (False, part_type_result) return (True, part_type_result.stdout.strip())
[docs] def get_md5(self, path): """ Get files md5 value. """ logging.info("Computing %s's md5...", path) md5_result = self.checksum("md5", path) if md5_result.exit_status: logging.error("Check %s's md5 failed:%s", path, md5_result) return (False, md5_result) return (True, md5_result.stdout.strip())
[docs] def reset_interface(self, iface_mac): """ Check interface through guestfish.Fix mac if necessary. """ # disk or domain vm_ref = self.params.get("libvirt_domain") if not vm_ref: vm_ref = self.params.get("disk_img") if not vm_ref: logging.error("No object to edit.") return False logging.info("Resetting %s's mac to %s", vm_ref, iface_mac) # Fix file which includes interface devices information # Default is /etc/udev/rules.d/70-persistent-net.rules devices_file = "/etc/udev/rules.d/70-persistent-net.rules" # Set file which binds mac and IP-address ifcfg_files = ["/etc/sysconfig/network-scripts/ifcfg-p1p1", "/etc/sysconfig/network-scripts/ifcfg-eth0"] # Fix devices file mac_regex = (r"\w.:\w.:\w.:\w.:\w.:\w.") edit_expr = "s/%s/%s/g" % (mac_regex, iface_mac) file_ret = self.is_file(devices_file) if file_ret.stdout.strip() == "true": self.close_session() try: result = lgf.virt_edit_cmd(vm_ref, devices_file, expr=edit_expr, debug=True, ignore_status=True) if result.exit_status: logging.error("Edit %s failed:%s", devices_file, result) return False except lgf.LibguestfsCmdError, detail: logging.error("Edit %s failed:%s", devices_file, detail) return False self.new_session() # Just to keep output looking better self.is_ready() logging.debug(self.cat(devices_file)) # Fix interface file for ifcfg_file in ifcfg_files: file_ret = self.is_file(ifcfg_file) if file_ret.stdout.strip() == "false": continue self.close_session() self.params['ifcfg_file'] = ifcfg_file try: result = lgf.virt_edit_cmd(vm_ref, ifcfg_file, expr=edit_expr, debug=True, ignore_status=True) if result.exit_status: logging.error("Edit %s failed:%s", ifcfg_file, result) return False except lgf.LibguestfsCmdError, detail: logging.error("Edit %s failed:%s", ifcfg_file, detail) return False self.new_session() # Just to keep output looking better self.is_ready() logging.debug(self.cat(ifcfg_file)) return True
[docs] def copy_ifcfg_back(self): # This function must be called after reset_interface() ifcfg_file = self.params.get("ifcfg_file") bak_file = "%s.bak" % ifcfg_file if ifcfg_file: self.is_ready() is_need = self.is_file(ifcfg_file) if is_need.stdout.strip() == "false": cp_result = self.cp(bak_file, ifcfg_file) if cp_result.exit_status: logging.warn("Recover ifcfg file failed:%s", cp_result) return False return True