Source code for virttest.asset

import urllib2
import logging
import os
import glob
from autotest.client import utils, test_config
from autotest.client.shared import git, error
import data_dir
import re


[docs]def get_known_backends(): """ Return virtualization backends supported by virt-test. """ # Generic means the test can run in multiple backends, such as libvirt # and qemu. known_backends = ['generic'] known_backends += os.listdir(data_dir.BASE_BACKEND_DIR) return known_backends
[docs]def get_test_provider_names(backend=None): """ Get the names of all test providers available in test-providers.d. :return: List with the names of all test providers. """ provider_name_list = [] provider_dir = data_dir.get_test_providers_dir() for provider in glob.glob(os.path.join(provider_dir, '*.ini')): provider_name = os.path.basename(provider).split('.')[0] provider_info = get_test_provider_info(provider_name) if backend is not None: if backend in provider_info['backends']: provider_name_list.append(provider_name) else: provider_name_list.append(provider_name) return provider_name_list
[docs]def get_test_provider_subdirs(backend=None): """ Get information of all test provider subdirs for a given backend. If no backend is provided, return all subdirs with tests. :param backend: Backend type, such as 'qemu'. :return: List of directories that contain tests for the given backend. """ subdir_list = [] for provider_name in get_test_provider_names(): provider_info = get_test_provider_info(provider_name) backends_info = provider_info['backends'] if backend is not None: if backend in backends_info: subdir_list.append(backends_info[backend]['path']) else: for b in backends_info: subdir_list.append(backends_info[b]['path']) return subdir_list
[docs]def get_test_provider_info(provider): """ Get a dictionary with relevant test provider info, such as: * provider uri (git repo or filesystem location) * provider git repo data, such as branch, ref, pubkey * backends that this provider has tests for. For each backend type the provider has tests for, the 'path' will be also available. :param provider: Test provider name, such as 'io-github-autotest-qemu'. """ provider_info = {} provider_path = os.path.join(data_dir.get_test_providers_dir(), '%s.ini' % provider) provider_cfg = test_config.config_loader(provider_path) provider_info['name'] = provider provider_info['uri'] = provider_cfg.get('provider', 'uri') provider_info['branch'] = provider_cfg.get('provider', 'branch', 'master') provider_info['ref'] = provider_cfg.get('provider', 'ref') provider_info['pubkey'] = provider_cfg.get('provider', 'pubkey') provider_info['backends'] = {} for backend in get_known_backends(): subdir = provider_cfg.get(backend, 'subdir') if subdir is not None: if provider_info['uri'].startswith('file://'): src = os.path.join(provider_info['uri'][7:], subdir) else: src = os.path.join(data_dir.get_test_provider_dir(provider), subdir) provider_info['backends'].update({backend: {'path': src}}) return provider_info
[docs]def download_test_provider(provider, update=False): """ Download a test provider defined on a .ini file inside test-providers.d. This function will only download test providers that are in git repos. Local filesystems don't need this functionality. :param provider: Test provider name, such as 'io-github-autotest-qemu'. """ provider_info = get_test_provider_info(provider) uri = provider_info.get('uri') if not uri.startswith('file://'): uri = provider_info.get('uri') branch = provider_info.get('branch') ref = provider_info.get('ref') pubkey = provider_info.get('pubkey') download_dst = data_dir.get_test_provider_dir(provider) repo_downloaded = os.path.isdir(os.path.join(download_dst, '.git')) original_dir = os.getcwd() if not repo_downloaded or update: download_dst = git.get_repo(uri=uri, branch=branch, commit=ref, destination_dir=download_dst) os.chdir(download_dst) try: utils.run('git remote add origin %s' % uri) except error.CmdError: pass utils.run('git pull origin %s' % branch) os.chdir(download_dst) utils.system('git log -1') os.chdir(original_dir)
[docs]def download_all_test_providers(update=False): """ Download all available test providers. """ for provider in get_test_provider_names(): download_test_provider(provider, update)
[docs]def get_all_assets(): asset_data_list = [] download_dir = data_dir.get_download_dir() for asset in glob.glob(os.path.join(download_dir, '*.ini')): asset_name = os.path.basename(asset)[:-4] asset_data_list.append(get_asset_info(asset_name)) return asset_data_list
[docs]def get_file_asset(title, src_path, destination): if not os.path.isabs(destination): destination = os.path.join(data_dir.get_data_dir(), destination) for ext in (".xz", ".gz", ".7z", ".bz2"): if os.path.exists(src_path + ext): destination = destination + ext logging.debug('Found source image %s', destination) return { 'url': None, 'sha1_url': None, 'destination': src_path + ext, 'destination_uncompressed': destination, 'uncompress_cmd': None, 'shortname': title, 'title': title, 'downloaded': True} if os.path.exists(src_path): logging.debug('Found source image %s', destination) return {'url': src_path, 'sha1_url': None, 'destination': destination, 'destination_uncompressed': None, 'uncompress_cmd': None, 'shortname': title, 'title': title, 'downloaded': os.path.exists(destination)} return None
[docs]def get_asset_info(asset): asset_info = {} asset_path = os.path.join(data_dir.get_download_dir(), '%s.ini' % asset) asset_cfg = test_config.config_loader(asset_path) asset_info['url'] = asset_cfg.get(asset, 'url') asset_info['sha1_url'] = asset_cfg.get(asset, 'sha1_url') asset_info['title'] = asset_cfg.get(asset, 'title') destination = asset_cfg.get(asset, 'destination') if not os.path.isabs(destination): destination = os.path.join(data_dir.get_data_dir(), destination) asset_info['destination'] = destination asset_info['asset_exists'] = os.path.isfile(destination) # Optional fields d_uncompressed = asset_cfg.get(asset, 'destination_uncompressed') if d_uncompressed is not None and not os.path.isabs(d_uncompressed): d_uncompressed = os.path.join(data_dir.get_data_dir(), d_uncompressed) asset_info['destination_uncompressed'] = d_uncompressed asset_info['uncompress_cmd'] = asset_cfg.get(asset, 'uncompress_cmd') return asset_info
[docs]def uncompress_asset(asset_info, force=False): destination = asset_info['destination'] uncompress_cmd = asset_info['uncompress_cmd'] destination_uncompressed = asset_info['destination_uncompressed'] archive_re = re.compile(r".*\.(gz|xz|7z|bz2)$") if destination_uncompressed is not None: if uncompress_cmd is None: match = archive_re.match(destination) if match: if match.group(1) == 'gz': uncompress_cmd = ('gzip -cd %s > %s' % (destination, destination_uncompressed)) elif match.group(1) == 'xz': uncompress_cmd = ('xz -cd %s > %s' % (destination, destination_uncompressed)) elif match.group(1) == 'bz2': uncompress_cmd = ('bzip2 -cd %s > %s' % (destination, destination_uncompressed)) elif match.group(1) == '7z': uncompress_cmd = '7za -y e %s' % destination else: uncompress_cmd = "%s %s" % (uncompress_cmd, destination) if uncompress_cmd is not None: uncompressed_file_exists = os.path.exists(destination_uncompressed) force = (force or not uncompressed_file_exists) if os.path.isfile(destination) and force: os.chdir(os.path.dirname(destination_uncompressed)) utils.run(uncompress_cmd)
[docs]def download_file(asset_info, interactive=False, force=False): """ Verifies if file that can be find on url is on destination with right hash. This function will verify the SHA1 hash of the file. If the file appears to be missing or corrupted, let the user know. :param asset_info: Dictionary returned by get_asset_info """ file_ok = False problems_ignored = False had_to_download = False sha1 = None url = asset_info['url'] sha1_url = asset_info['sha1_url'] destination = asset_info['destination'] title = asset_info['title'] if sha1_url is not None: try: logging.info("Verifying expected SHA1 sum from %s", sha1_url) sha1_file = urllib2.urlopen(sha1_url) sha1_contents = sha1_file.read() sha1 = sha1_contents.split(" ")[0] logging.info("Expected SHA1 sum: %s", sha1) except Exception, e: logging.error("Failed to get SHA1 from file: %s", e) else: sha1 = None destination_dir = os.path.dirname(destination) if not os.path.isdir(destination_dir): os.makedirs(destination_dir) if not os.path.isfile(destination): logging.warning("File %s not found", destination) if interactive: answer = utils.ask("Would you like to download it from %s?" % url) else: answer = 'y' if answer == 'y': utils.interactive_download( url, destination, "Downloading %s" % title) had_to_download = True else: logging.warning("Missing file %s", destination) else: logging.info("Found %s", destination) if sha1 is None: answer = 'n' else: answer = 'y' if answer == 'y': actual_sha1 = utils.hash_file(destination, method='sha1') if actual_sha1 != sha1: logging.info("Actual SHA1 sum: %s", actual_sha1) if interactive: answer = utils.ask("The file seems corrupted or outdated. " "Would you like to download it?") else: logging.info("The file seems corrupted or outdated") answer = 'y' if answer == 'y': logging.info("Updating image to the latest available...") while not file_ok: utils.interactive_download(url, destination, title) sha1_post_download = utils.hash_file(destination, method='sha1') had_to_download = True if sha1_post_download != sha1: logging.error("Actual SHA1 sum: %s", actual_sha1) if interactive: answer = utils.ask("The file downloaded %s is " "corrupted. Would you like " "to try again?" % destination) else: answer = 'n' if answer == 'n': problems_ignored = True logging.error("File %s is corrupted" % destination) file_ok = True else: file_ok = False else: file_ok = True else: file_ok = True logging.info("SHA1 sum check OK") else: problems_ignored = True logging.info("File %s present, but did not verify integrity", destination) if file_ok: if not problems_ignored: logging.info("%s present, with proper checksum", destination) uncompress_asset(asset_info=asset_info, force=force or had_to_download)
[docs]def download_asset(asset, interactive=True, restore_image=False): """ Download an asset defined on an asset file. Asset files are located under /shared/downloads, are .ini files with the following keys defined: title Title string to display in the download progress bar. url URL of the resource sha1_url URL with SHA1 information for the resource, in the form sha1sum file_basename destination Location of your file relative to the data directory (TEST_SUITE_ROOT/shared/data) destination Location of the uncompressed file relative to the data directory (TEST_SUITE_ROOT/shared/data) uncompress_cmd Command that needs to be executed with the compressed file as a parameter :param asset: String describing an asset file. :param interactive: Whether to ask the user before downloading the file. :param restore_image: If the asset is a compressed image, we can uncompress in order to restore the image. """ asset_info = get_asset_info(asset) destination = asset_info['destination'] if (interactive and not os.path.isfile(destination)): answer = utils.ask("File %s not present. Do you want to download it?" % asset_info['title']) else: answer = "y" if answer == "y": download_file(asset_info=asset_info, interactive=interactive, force=restore_image)