"""
oVirt SDK wrapper module.
:copyright: 2008-2012 Red Hat Inc.
"""
import time
import logging
try:
from ovirtsdk.api import API
from ovirtsdk.xml import params as param
except ImportError:
logging.warning("ovirtsdk module not present, please install it")
import virt_vm
_api = None
_connected = False
[docs]class WaitStateTimeoutError(Exception):
def __init__(self, msg, output):
Exception.__init__(self, msg, output)
self.msg = msg
self.output = output
[docs]class WaitVMStateTimeoutError(WaitStateTimeoutError):
def __str__(self):
str = "Timeout expired when waiting for VM to %s,"
str += " actual state is: %s"
return str % (self.msg, self.output)
[docs]class WaitHostStateTimeoutError(WaitStateTimeoutError):
def __str__(self):
str = "Timeout expired when waiting for Host to %s,"
str += " actual state is: %s"
return str % (self.msg, self.output)
[docs]def connect(params):
"""
Connect ovirt manager API.
"""
url = params.get('ovirt_engine_url')
username = params.get('ovirt_engine_user')
password = params.get('ovirt_engine_password')
version = params.get('ovirt_engine_version')
if url is None or username is None or password is None:
logging.error('ovirt_engine[url|user|password] are necessary!!')
if version is None:
version = param.Version(major='3', minor='5')
else:
version = param.Version(version)
global _api, _connected
try:
# Try to connect oVirt API if connection doesn't exist,
# otherwise, directly return existing API connection.
if not _connected:
_api = API(url, username, password, insecure=True)
_connected = True
return (_api, version)
else:
return (_api, version)
except Exception, e:
logging.error('Failed to connect: %s\n' % str(e))
else:
logging.info('Succeed to connect oVirt/Rhevm manager\n')
[docs]def disconnect():
"""
Disconnect ovirt manager connection.
"""
global _api, _connected
if _connected:
return _api.disconnect()
[docs]class VMManager(virt_vm.BaseVM):
"""
This class handles all basic VM operations for oVirt.
"""
def __init__(self, params, root_dir, address_cache=None, state=None):
"""
Initialize the object and set a few attributes.
:param name: The name of the object
:param params: A dict containing VM params (see method
make_qemu_command for a full description)
:param root_dir: Base directory for relative filenames
:param address_cache: A dict that maps MAC addresses to IP addresses
:param state: If provided, use this as self.__dict__
"""
if state:
self.__dict__ = state
else:
self.process = None
self.serial_console = None
self.redirs = {}
self.vnc_port = 5900
self.vnclisten = "0.0.0.0"
self.pci_assignable = None
self.netdev_id = []
self.device_id = []
self.pci_devices = []
self.uuid = None
self.only_pty = False
self.remote_sessions = []
self.spice_port = 8000
self.name = params.get("main_vm", "")
self.params = params
self.root_dir = root_dir
self.address_cache = address_cache
self.vnclisten = "0.0.0.0"
self.driver_type = "v2v"
super(VMManager, self).__init__(self.name, params)
(self.api, self.version) = connect(params)
if self.name:
self.update_instance()
[docs] def update_instance(self):
self.instance = self.api.vms.get(self.name)
[docs] def list(self):
"""
List all of VMs.
"""
vm_list = []
try:
vms = self.api.vms.list(query='name=*')
for i in range(len(vms)):
vm_list.append(vms[i].name)
return vm_list
except Exception, e:
logging.error('Failed to get vms:\n%s' % str(e))
[docs] def state(self):
"""
Return VM state.
"""
try:
self.update_instance()
return self.instance.status.state
except Exception, e:
logging.error('Failed to get %s status:\n%s' % (self.name, str(e)))
[docs] def get_mac_address(self, net_name='*'):
"""
Return MAC address of a VM.
"""
try:
return self.instance.nics.get(name=net_name).get_mac().get_address()
except Exception, e:
logging.error('Failed to get %s status:\n%s' % (self.name, str(e)))
[docs] def lookup_by_storagedomains(self, storage_name):
"""
Lookup VM object in storage domain according to VM name.
"""
try:
storage = self.api.storagedomains.get(storage_name)
return storage.vms.get(self.name)
except Exception, e:
logging.error('Failed to get %s from %s:\n%s' % (self.name,
storage_name, str(e)))
[docs] def is_dead(self):
"""
Judge if a VM is dead.
"""
if self.state() == 'down':
logging.info('VM %s status is <Down>' % self.name)
return True
else:
return False
[docs] def is_alive(self):
"""
Judge if a VM is alive.
"""
return not self.is_dead()
[docs] def is_paused(self):
"""
Return if VM is suspend.
"""
if self.state() == 'suspended':
return True
else:
logging.debug('VM %s status is %s ' % (self.name, self.state()))
return False
[docs] def start(self, wait_for_up=True, timeout=300):
"""
Start a VM.
"""
end_time = time.time() + timeout
if self.is_dead():
logging.info('Starting VM %s' % self.name)
self.instance.start()
vm_powering_up = False
vm_up = False
while time.time() < end_time:
if self.state() == 'powering_up':
vm_powering_up = True
if wait_for_up:
logging.info('Waiting for VM to reach <Up> status')
if self.state() == 'up':
vm_up = True
break
else:
break
time.sleep(1)
if not vm_powering_up and not vm_up:
raise WaitVMStateTimeoutError("START", self.state())
else:
logging.debug('VM is alive')
[docs] def suspend(self, timeout):
"""
Suspend a VM.
"""
end_time = time.time() + timeout
vm_suspend = False
while time.time() < end_time:
try:
logging.info('Suspend VM %s' % self.name)
self.instance.suspend()
logging.info('Waiting for VM to reach <Suspended> status')
if self.is_paused():
vm_suspend = True
break
except Exception, e:
if e.reason == 'Bad Request' \
and 'asynchronous running tasks' in e.detail:
logging.warning("VM has asynchronous running tasks, "
"trying again")
time.sleep(1)
else:
raise e
time.sleep(1)
if not vm_suspend:
raise WaitVMStateTimeoutError("SUSPEND", self.state())
[docs] def resume(self, timeout):
"""
Resume a suspended VM.
"""
end_time = time.time() + timeout
try:
if self.state() != 'up':
logging.info('Resume VM %s' % self.name)
self.instance.start()
logging.info('Waiting for VM to <UP> status')
vm_resume = False
while time.time() < end_time:
if self.state() == 'up':
vm_resume = True
break
time.sleep(1)
if not vm_resume:
raise WaitVMStateTimeoutError("RESUME", self.state())
else:
logging.debug('VM already up')
except Exception, e:
logging.error('Failed to resume VM:\n%s' % str(e))
[docs] def shutdown(self, gracefully=True, timeout=300):
"""
Shut down a running VM.
"""
end_time = time.time() + timeout
if self.is_alive():
logging.info('Shutdown VM %s' % self.name)
if gracefully:
self.instance.shutdown()
else:
self.instance.stop()
logging.info('Waiting for VM to reach <Down> status')
vm_down = False
while time.time() < end_time:
if self.is_dead():
vm_down = True
break
time.sleep(1)
if not vm_down:
raise WaitVMStateTimeoutError("DOWN", self.state())
else:
logging.debug('VM already down')
[docs] def delete(self, timeout=300):
"""
Delete a VM.
"""
end_time = time.time() + timeout
if self.name in self.list():
logging.info('Delete VM %s' % self.name)
self.instance.delete()
logging.info('Waiting for VM to be <Deleted>')
vm_delete = False
while time.time() < end_time:
if self.name not in self.list():
vm_delete = True
break
time.sleep(1)
if not vm_delete:
raise WaitVMStateTimeoutError("DELETE", self.state())
logging.info('VM was removed successfully')
else:
logging.debug('VM not exist')
[docs] def destroy(self, gracefully=False):
"""
Destroy a VM.
"""
if self.api.vms is None:
return
self.shutdown(gracefully)
[docs] def delete_from_export_domain(self, export_name):
"""
Remove a VM from specified export domain.
:param export_name: export domain name.
"""
vm = self.lookup_by_storagedomains(export_name)
try:
vm.delete()
except Exception, e:
logging.error('Failed to remove VM:\n%s' % str(e))
[docs] def import_from_export_domain(self, export_name, storage_name,
cluster_name, timeout=300):
"""
Import a VM from export domain to data domain.
:param export_name: Export domain name.
:param storage_name: Storage domain name.
:param cluster_name: Cluster name.
"""
end_time = time.time() + timeout
vm = self.lookup_by_storagedomains(export_name)
storage_domains = self.api.storagedomains.get(storage_name)
clusters = self.api.clusters.get(cluster_name)
logging.info('Import VM %s' % self.name)
vm.import_vm(param.Action(storage_domain=storage_domains,
cluster=clusters))
logging.info('Waiting for VM to reach <Down> status')
vm_down = False
while time.time() < end_time:
if self.name in self.list():
if self.is_dead():
vm_down = True
break
time.sleep(1)
if not vm_down:
raise WaitVMStateTimeoutError("DOWN", self.state())
logging.info('Import %s successfully', self.name)
[docs] def export_from_export_domain(self, export_name, timeout=300):
"""
Export a VM from storage domain to export domain.
:param export_name: Export domain name.
"""
end_time = time.time() + timeout
storage_domains = self.api.storagedomains.get(export_name)
logging.info('Export VM %s' % self.name)
self.instance.export(param.Action(storage_domain=storage_domains))
logging.info('Waiting for VM to reach <Down> status')
vm_down = False
while time.time() < end_time:
if self.is_dead():
vm_down = True
break
time.sleep(1)
if not vm_down:
raise WaitVMStateTimeoutError("DOWN", self.state())
logging.info('Export %s successfully', self.name)
[docs] def snapshot(self, snapshot_name='my_snapshot', timeout=300):
"""
Create a snapshot to VM.
:param snapshot_name: 'my_snapshot' is default snapshot name.
:param timeout: Time out
"""
end_time = time.time() + timeout
snap_params = param.Snapshot(description=snapshot_name,
vm=self.instance)
logging.info('Creating a snapshot %s for VM %s'
% (snapshot_name, self.name))
self.instance.snapshots.add(snap_params)
logging.info('Waiting for snapshot creation to finish')
vm_snapsnop = False
while time.time() < end_time:
if self.state() != 'image_locked':
vm_snapsnop = True
break
time.sleep(1)
if not vm_snapsnop:
raise WaitVMStateTimeoutError("SNAPSHOT", self.state())
logging.info('Snapshot was created successfully')
[docs] def create_template(self, cluster_name, template_name='my_template', timeout=300):
"""
Create a template from VM.
:param cluster_name: cluster name.
:param template_name: 'my_template' is default template name.
:param timeout: Time out
"""
end_time = time.time() + timeout
cluster = self.api.clusters.get(cluster_name)
tmpl_params = param.Template(name=template_name,
vm=self.instance,
cluster=cluster)
try:
logging.info('Creating a template %s from VM %s'
% (template_name, self.name))
self.api.templates.add(tmpl_params)
logging.info('Waiting for VM to reach <Down> status')
vm_down = False
while time.time() < end_time:
if self.is_dead():
vm_down = True
break
time.sleep(1)
if not vm_down:
raise WaitVMStateTimeoutError("DOWN", self.state())
except Exception, e:
logging.error('Failed to create a template from VM:\n%s' % str(e))
[docs] def add(self, memory, disk_size, cluster_name, storage_name,
nic_name='eth0', network_interface='virtio',
network_name='ovirtmgmt', disk_interface='virtio',
disk_format='raw', template_name='Blank', timeout=300):
"""
Create VM with one NIC and one Disk.
:param memory: VM's memory size such as 1024*1024*1024=1GB.
:param disk_size: VM's disk size such as 512*1024=512MB.
:param nic_name: VM's NICs name such as 'eth0'.
:param network_interface: VM's network interface such as 'virtio'.
:param network_name: network such as ovirtmgmt for ovirt, rhevm for rhel.
:param disk_format: VM's disk format such as 'raw' or 'cow'.
:param disk_interface: VM's disk interface such as 'virtio'.
:param cluster_name: cluster name.
:param storage_name: storage domain name.
:param template_name: VM's template name, default is 'Blank'.
:param timeout: Time out
"""
end_time = time.time() + timeout
# network name is ovirtmgmt for ovirt, rhevm for rhel.
vm_params = param.VM(name=self.name, memory=memory,
cluster=self.api.clusters.get(cluster_name),
template=self.api.templates.get(template_name))
storage = self.api.storagedomains.get(storage_name)
storage_params = param.StorageDomains(storage_domain=[storage])
nic_params = param.NIC(name=nic_name,
network=param.Network(name=network_name),
interface=network_interface)
disk_params = param.Disk(storage_domains=storage_params,
size=disk_size,
type_='system',
status=None,
interface=disk_interface,
format=disk_format,
sparse=True,
bootable=True)
try:
logging.info('Creating a VM %s' % self.name)
self.api.vms.add(vm_params)
logging.info('NIC is added to VM %s' % self.name)
self.instance.nics.add(nic_params)
logging.info('Disk is added to VM %s' % self.name)
self.instance.disks.add(disk_params)
logging.info('Waiting for VM to reach <Down> status')
vm_down = False
while time.time() < end_time:
if self.is_dead():
vm_down = True
break
time.sleep(1)
if not vm_down:
raise WaitVMStateTimeoutError("DOWN", self.state())
except Exception, e:
logging.error('Failed to create VM with disk and NIC\n%s' % str(e))
[docs] def add_vm_from_template(self, cluster_name, template_name='Blank',
new_name='my_new_vm', timeout=300):
"""
Create a VM from template.
:param cluster_name: cluster name.
:param template_name: default template is 'Blank'.
:param new_name: 'my_new_vm' is a default new VM's name.
:param timeout: Time out
"""
end_time = time.time() + timeout
vm_params = param.VM(name=new_name,
cluster=self.api.clusters.get(cluster_name),
template=self.api.templates.get(template_name))
try:
logging.info('Creating a VM %s from template %s'
% (new_name, template_name))
self.api.vms.add(vm_params)
logging.info('Waiting for VM to reach <Down> status')
vm_down = False
while time.time() < end_time:
if self.is_dead():
vm_down = True
break
time.sleep(1)
if not vm_down:
raise WaitVMStateTimeoutError("DOWN", self.state())
logging.info('VM was created from template successfully')
except Exception, e:
logging.error('Failed to create VM from template:\n%s' % str(e))
[docs] def get_address(self, index=0):
"""
Return the address of the guest through ovirt node tcpdump cache.
:param index: Name or index of the NIC whose address is requested.
:return: IP address of NIC.
:raise VMIPAddressMissingError: If no IP address is found for the the
NIC's MAC address
"""
nic = self.virtnet[index]
if nic.nettype == 'bridge':
mac = self.get_mac_address()
ip = self.address_cache.get(mac)
# TODO: Verify MAC-IP address mapping on remote ovirt node
if not ip:
raise virt_vm.VMIPAddressMissingError(mac)
return ip
else:
raise ValueError("Ovirt only support bridge nettype now.")
[docs]class DataCenterManager(object):
"""
This class handles all basic datacenter operations.
"""
def __init__(self, params):
self.name = params.get("dc_name", "")
self.params = params
(self.api, self.version) = connect(params)
if self.name:
self.instance = self.api.datacenters.get(self.name)
[docs] def list(self):
"""
List all of datacenters.
"""
dc_list = []
try:
logging.info('List Data centers')
dcs = self.api.datacenters.list(query='name=*')
for i in range(len(dcs)):
dc_list.append(dcs[i].name)
return dc_list
except Exception, e:
logging.error('Failed to get data centers:\n%s' % str(e))
[docs] def add(self, storage_type):
"""
Add a new data center.
"""
if not self.name:
self.name = "my_datacenter"
try:
logging.info('Creating a %s type datacenter %s'
% (storage_type, self.name))
if self.api.datacenters.add(param.DataCenter(name=self.name, storage_type=storage_type, version=self.version)):
logging.info('Data center was created successfully')
except Exception, e:
logging.error('Failed to create data center:\n%s' % str(e))
[docs]class ClusterManager(object):
"""
This class handles all basic cluster operations.
"""
def __init__(self, params):
self.name = params.get("cluster_name", "")
self.params = params
(self.api, self.version) = connect(params)
if self.name:
self.instance = self.api.clusters.get(self.name)
[docs] def list(self):
"""
List all of clusters.
"""
cluster_list = []
try:
logging.info('List clusters')
clusters = self.api.clusters.list(query='name=*')
for i in range(len(clusters)):
cluster_list.append(clusters[i].name)
return cluster_list
except Exception, e:
logging.error('Failed to get clusters:\n%s' % str(e))
[docs] def add(self, dc_name, cpu_type='Intel Nehalem Family'):
"""
Add a new cluster into data center.
"""
if not self.name:
self.name = "my_cluster"
dc = self.api.datacenters.get(dc_name)
try:
logging.info('Creating a cluster %s in datacenter %s'
% (self.name, dc_name))
if self.api.clusters.add(param.Cluster(name=self.name,
cpu=param.CPU(id=cpu_type),
data_center=dc,
version=self.version)):
logging.info('Cluster was created successfully')
except Exception, e:
logging.error('Failed to create cluster:\n%s' % str(e))
[docs]class HostManager(object):
"""
This class handles all basic host operations.
"""
def __init__(self, params):
self.name = params.get("hostname", "")
self.params = params
(self.api, self.version) = connect(params)
if self.name:
self.instance = self.api.hosts.get(self.name)
[docs] def list(self):
"""
List all of hosts.
"""
host_list = []
try:
logging.info('List hosts')
hosts = self.api.hosts.list(query='name=*')
for i in range(len(hosts)):
host_list.append(hosts[i].name)
return host_list
except Exception, e:
logging.error('Failed to get hosts:\n%s' % str(e))
[docs] def state(self):
"""
Return host state.
"""
try:
return self.instance.status.state
except Exception, e:
logging.error('Failed to get %s status:\n%s' % (self.name, str(e)))
[docs] def add(self, host_address, host_password, cluster_name, timeout=300):
"""
Register a host into specified cluster.
"""
end_time = time.time() + timeout
if not self.name:
self.name = 'my_host'
clusters = self.api.clusters.get(cluster_name)
host_params = param.Host(name=self.name, address=host_address,
cluster=clusters, root_password=host_password)
try:
logging.info('Registing a host %s into cluster %s'
% (self.name, cluster_name))
if self.api.hosts.add(host_params):
logging.info('Waiting for host to reach the <Up> status ...')
host_up = False
while time.time() < end_time:
if self.state() == 'up':
host_up = True
break
time.sleep(1)
if not host_up:
raise WaitHostStateTimeoutError("UP", self.state())
logging.info('Host was installed successfully')
except Exception, e:
logging.error('Failed to install host:\n%s' % str(e))
[docs] def get_address(self):
"""
Return host IP address.
"""
try:
logging.info('Get host %s IP' % self.name)
return self.instance.get_address()
except Exception, e:
logging.error('Failed to get host %s IP address:\n%s' %
(self.name, str(e)))
[docs]class StorageDomainManager(object):
"""
This class handles all basic storage domain operations.
"""
def __init__(self, params):
self.name = params.get("storage_name", "")
self.params = params
(self.api, self.version) = connect(params)
if self.name:
self.instance = self.api.storagedomains.get(self.name)
[docs] def list(self):
"""
List all of storagedomains.
"""
storage_list = []
try:
logging.info('List storage domains')
storages = self.api.storagedomains.list()
for i in range(len(storages)):
storage_list.append(storages[i].name)
return storage_list
except Exception, e:
logging.error('Failed to get storage domains:\n%s' % str(e))
[docs] def attach_iso_export_domain_into_datacenter(self, address, path,
dc_name, host_name,
domain_type,
storage_type='nfs',
name='my_iso'):
"""
Attach ISO/export domain into data center.
:param name: ISO or Export name.
:param host_name: host name.
:param dc_name: data center name.
:param path: ISO/export domain path.
:param address: ISO/export domain address.
:param domain_type: storage domain type, it may be 'iso' or 'export'.
:param storage_type: storage type, it may be 'nfs', 'iscsi', or 'fc'.
"""
dc = self.api.datacenters.get(dc_name)
host = self.api.hosts.get(host_name)
storage_params = param.Storage(type_=storage_type,
address=address,
path=path)
storage_domain__params = param.StorageDomain(name=name,
data_center=dc,
type_=domain_type,
host=host,
storage=storage_params)
try:
logging.info('Create/import ISO storage domain %s' % name)
if self.api.storagedomains.add(storage_domain__params):
logging.info('%s domain was created/imported successfully'
% domain_type)
logging.info('Attach ISO storage domain %s' % name)
if self.api.datacenters.get(dc_name).storagedomains.add(
self.api.storagedomains.get(name)):
logging.info('%s domain was attached successfully'
% domain_type)
logging.info('Activate ISO storage domain %s' % name)
if self.api.datacenters.get(dc_name).storagedomains.get(
name).activate():
logging.info('%s domain was activated successfully'
% domain_type)
except Exception, e:
logging.error('Failed to add %s domain:\n%s'
% (domain_type, str(e)))