Source code for virttest.utils_net

import openvswitch
import re
import os
import socket
import fcntl
import struct
import logging
import random
import math
import time
import shelve
import remote
import commands
from autotest.client import utils, os_dep
from autotest.client.shared import error
import propcan
import utils_misc
import arch
import aexpect
from versionable_class import factory

CTYPES_SUPPORT = True
try:
    import ctypes
except ImportError:
    CTYPES_SUPPORT = False

SYSFS_NET_PATH = "/sys/class/net"
PROCFS_NET_PATH = "/proc/net/dev"
# globals
sock = None
sockfd = None


[docs]class NetError(Exception): def __init__(self, *args): Exception.__init__(self, *args)
[docs]class TAPModuleError(NetError): def __init__(self, devname, action="open", details=None): NetError.__init__(self, devname) self.devname = devname self.action = action self.details = details def __str__(self): e_msg = "Can't %s %s" % (self.action, self.devname) if self.details is not None: e_msg += " : %s" % self.details return e_msg
[docs]class TAPNotExistError(NetError): def __init__(self, ifname): NetError.__init__(self, ifname) self.ifname = ifname def __str__(self): return "Interface %s does not exist" % self.ifname
[docs]class TAPCreationError(NetError): def __init__(self, ifname, details=None): NetError.__init__(self, ifname, details) self.ifname = ifname self.details = details def __str__(self): e_msg = "Cannot create TAP device %s" % self.ifname if self.details is not None: e_msg += ": %s" % self.details return e_msg
[docs]class MacvtapCreationError(NetError): def __init__(self, ifname, base_interface, details=None): NetError.__init__(self, ifname, details) self.ifname = ifname self.interface = base_interface self.details = details def __str__(self): e_msg = "Cannot create macvtap device %s " % self.ifname e_msg += "base physical interface %s." % self.interface if self.details is not None: e_msg += ": %s" % self.details return e_msg
[docs]class MacvtapGetBaseInterfaceError(NetError): def __init__(self, ifname=None, details=None): NetError.__init__(self, ifname, details) self.ifname = ifname self.details = details def __str__(self): e_msg = "Cannot get a valid physical interface to create macvtap." if self.ifname: e_msg += "physical interface is : %s " % self.ifname if self.details is not None: e_msg += "error info: %s" % self.details return e_msg
[docs]class TAPBringUpError(NetError): def __init__(self, ifname): NetError.__init__(self, ifname) self.ifname = ifname def __str__(self): return "Cannot bring up TAP %s" % self.ifname
[docs]class TAPBringDownError(NetError): def __init__(self, ifname): NetError.__init__(self, ifname) self.ifname = ifname def __str__(self): return "Cannot bring down TAP %s" % self.ifname
[docs]class BRAddIfError(NetError): def __init__(self, ifname, brname, details): NetError.__init__(self, ifname, brname, details) self.ifname = ifname self.brname = brname self.details = details def __str__(self): return ("Can't add interface %s to bridge %s: %s" % (self.ifname, self.brname, self.details))
[docs]class BRDelIfError(NetError): def __init__(self, ifname, brname, details): NetError.__init__(self, ifname, brname, details) self.ifname = ifname self.brname = brname self.details = details def __str__(self): return ("Can't remove interface %s from bridge %s: %s" % (self.ifname, self.brname, self.details))
[docs]class IfNotInBridgeError(NetError): def __init__(self, ifname, details): NetError.__init__(self, ifname, details) self.ifname = ifname self.details = details def __str__(self): return ("Interface %s is not present on any bridge: %s" % (self.ifname, self.details))
[docs]class OpenflowSwitchError(NetError): def __init__(self, brname): NetError.__init__(self, brname) self.brname = brname def __str__(self): return ("Only support openvswitch, make sure your env support ovs, " "and your bridge %s is an openvswitch" % self.brname)
[docs]class BRNotExistError(NetError): def __init__(self, brname, details): NetError.__init__(self, brname, details) self.brname = brname self.details = details def __str__(self): return ("Bridge %s does not exist: %s" % (self.brname, self.details))
[docs]class IfChangeBrError(NetError): def __init__(self, ifname, old_brname, new_brname, details): NetError.__init__(self, ifname, old_brname, new_brname, details) self.ifname = ifname self.new_brname = new_brname self.old_brname = old_brname self.details = details def __str__(self): return ("Can't move interface %s from bridge %s to bridge %s: %s" % (self.ifname, self.new_brname, self.oldbrname, self.details))
[docs]class IfChangeAddrError(NetError): def __init__(self, ifname, ipaddr, details): NetError.__init__(self, ifname, ipaddr, details) self.ifname = ifname self.ipaddr = ipaddr self.details = details def __str__(self): return ("Can't change interface IP address %s from interface %s: %s" % (self.ifname, self.ipaddr, self.details))
[docs]class BRIpError(NetError): def __init__(self, brname): NetError.__init__(self, brname) self.brname = brname def __str__(self): return ("Bridge %s doesn't have an IP address assigned. It's" " impossible to start dnsmasq for this bridge." % (self.brname))
[docs]class VMIPV6NeighNotFoundError(NetError): def __init__(self, ipv6_address): NetError.__init__(self, ipv6_address) self.ipv6_address = ipv6_address def __str__(self): return "No IPV6 neighbours with address %s" % self.ipv6_address
[docs]class VMIPV6AdressError(NetError): def __init__(self, error_info): NetError.__init__(self, error_info) self.error_info = error_info def __str__(self): return "%s, check your test env supports IPV6" % self.error_info
[docs]class HwAddrSetError(NetError): def __init__(self, ifname, mac): NetError.__init__(self, ifname, mac) self.ifname = ifname self.mac = mac def __str__(self): return "Can not set mac %s to interface %s" % (self.mac, self.ifname)
[docs]class HwAddrGetError(NetError): def __init__(self, ifname): NetError.__init__(self, ifname) self.ifname = ifname def __str__(self): return "Can not get mac of interface %s" % self.ifname
[docs]class IPAddrGetError(NetError): def __init__(self, mac_addr, details=None): NetError.__init__(self, mac_addr) self.mac_addr = mac_addr self.details = details def __str__(self): details_msg = "Get guest nic ['%s'] IP address error" % self.mac_addr details_msg += "error info: %s" % self.details return details_msg
[docs]class HwOperstarteGetError(NetError): def __init__(self, ifname, details=None): NetError.__init__(self, ifname) self.ifname = ifname self.details = details def __str__(self): return "Get nic %s operstate error, %s" % (self.ifname, self.details)
[docs]class VlanError(NetError): def __init__(self, ifname, details): NetError.__init__(self, ifname, details) self.ifname = ifname self.details = details def __str__(self): return ("Vlan error on interface %s: %s" % (self.ifname, self.details))
[docs]class VMNetError(NetError): def __str__(self): return ("VMNet instance items must be dict-like and contain " "a 'nic_name' mapping")
[docs]class DbNoLockError(NetError): def __str__(self): return "Attempt made to access database with improper locking"
[docs]class DelLinkError(NetError): def __init__(self, ifname, details=None): NetError.__init__(self, ifname, details) self.ifname = ifname self.details = details def __str__(self): e_msg = "Cannot delete interface %s" % self.ifname if self.details is not None: e_msg += ": %s" % self.details return e_msg
[docs]def warp_init_del(func): def new_func(*args, **argkw): globals()["sock"] = socket.socket(socket.AF_INET, socket.SOCK_STREAM) globals()["sockfd"] = globals()["sock"].fileno() try: return func(*args, **argkw) finally: globals()["sock"].close() globals()["sock"] = None globals()["sockfd"] = None return new_func
[docs]class Interface(object): ''' Class representing a Linux network device. ''' def __init__(self, name): self.name = name def __repr__(self): return "<%s %s at 0x%x>" % (self.__class__.__name__, self.name, id(self)) @warp_init_del
[docs] def up(self): ''' Bring up the bridge interface. Equivalent to ifconfig [iface] up. ''' # Get existing device flags ifreq = struct.pack('16sh', self.name, 0) flags = struct.unpack('16sh', fcntl.ioctl(sockfd, arch.SIOCGIFFLAGS, ifreq))[1] # Set new flags flags = flags | arch.IFF_UP ifreq = struct.pack('16sh', self.name, flags) fcntl.ioctl(sockfd, arch.SIOCSIFFLAGS, ifreq)
@warp_init_del
[docs] def down(self): ''' Bring up the bridge interface. Equivalent to ifconfig [iface] down. ''' # Get existing device flags ifreq = struct.pack('16sh', self.name, 0) flags = struct.unpack('16sh', fcntl.ioctl(sockfd, arch.SIOCGIFFLAGS, ifreq))[1] # Set new flags flags = flags & ~arch.IFF_UP ifreq = struct.pack('16sh', self.name, flags) fcntl.ioctl(sockfd, arch.SIOCSIFFLAGS, ifreq)
@warp_init_del
[docs] def is_up(self): ''' Return True if the interface is up, False otherwise. ''' # Get existing device flags ifreq = struct.pack('16sh', self.name, 0) flags = struct.unpack('16sh', fcntl.ioctl(sockfd, arch.SIOCGIFFLAGS, ifreq))[1] # Set new flags if flags & arch.IFF_UP: return True else: return False
@warp_init_del
[docs] def get_mac(self): ''' Obtain the device's mac address. ''' ifreq = struct.pack('16sH14s', self.name, socket.AF_UNIX, '\x00' * 14) res = fcntl.ioctl(sockfd, arch.SIOCGIFHWADDR, ifreq) address = struct.unpack('16sH14s', res)[2] mac = struct.unpack('6B8x', address) return ":".join(['%02X' % i for i in mac])
@warp_init_del
[docs] def set_mac(self, newmac): ''' Set the device's mac address. Device must be down for this to succeed. ''' macbytes = [int(i, 16) for i in newmac.split(':')] ifreq = struct.pack('16sH6B8x', self.name, socket.AF_UNIX, *macbytes) fcntl.ioctl(sockfd, arch.SIOCSIFHWADDR, ifreq)
@warp_init_del
[docs] def get_ip(self): """ Get ip address of this interface """ ifreq = struct.pack('16sH14s', self.name, socket.AF_INET, '\x00' * 14) try: res = fcntl.ioctl(sockfd, arch.SIOCGIFADDR, ifreq) except IOError: return None ip = struct.unpack('16sH2x4s8x', res)[2] return socket.inet_ntoa(ip)
@warp_init_del
[docs] def set_ip(self, newip): """ Set the ip address of the interface """ ipbytes = socket.inet_aton(newip) ifreq = struct.pack('16sH2s4s8s', self.name, socket.AF_INET, '\x00' * 2, ipbytes, '\x00' * 8) fcntl.ioctl(sockfd, arch.SIOCSIFADDR, ifreq)
@warp_init_del
[docs] def get_netmask(self): """ Get ip network netmask """ if not CTYPES_SUPPORT: raise error.TestNAError( "Getting the netmask requires python > 2.4") ifreq = struct.pack('16sH14s', self.name, socket.AF_INET, '\x00' * 14) try: res = fcntl.ioctl(sockfd, arch.SIOCGIFNETMASK, ifreq) except IOError: return 0 netmask = socket.ntohl(struct.unpack('16sH2xI8x', res)[2]) return 32 - int(math.log(ctypes.c_uint32(~netmask).value + 1, 2))
@warp_init_del
[docs] def set_netmask(self, netmask): """ Set netmask """ if not CTYPES_SUPPORT: raise error.TestNAError( "Setting the netmask requires python > 2.4") netmask = ctypes.c_uint32(~((2 ** (32 - netmask)) - 1)).value nmbytes = socket.htonl(netmask) ifreq = struct.pack('16sH2si8s', self.name, socket.AF_INET, '\x00' * 2, nmbytes, '\x00' * 8) fcntl.ioctl(sockfd, arch.SIOCSIFNETMASK, ifreq)
@warp_init_del
[docs] def get_index(self): ''' Convert an interface name to an index value. ''' ifreq = struct.pack('16si', self.name, 0) res = fcntl.ioctl(sockfd, arch.SIOCGIFINDEX, ifreq) return struct.unpack("16si", res)[1]
@warp_init_del
[docs] def get_stats(self): """ Get the status information of the Interface """ spl_re = re.compile(r"\s+") fp = open(PROCFS_NET_PATH) # Skip headers fp.readline() fp.readline() while True: data = fp.readline() if not data: return None name, stats_str = data.split(":") if name.strip() != self.name: continue stats = [int(a) for a in spl_re.split(stats_str.strip())] break titles = ["rx_bytes", "rx_packets", "rx_errs", "rx_drop", "rx_fifo", "rx_frame", "rx_compressed", "rx_multicast", "tx_bytes", "tx_packets", "tx_errs", "tx_drop", "tx_fifo", "tx_colls", "tx_carrier", "tx_compressed"] return dict(zip(titles, stats))
[docs] def is_brport(self): """ Check Whether this Interface is a bridge port_to_br """ path = os.path.join(SYSFS_NET_PATH, self.name) return os.path.exists(os.path.join(path, "brport"))
def __netlink_pack(self, msgtype, flags, seq, pid, data): ''' Pack with Netlink message header and data into Netlink package :msgtype: Message types: e.g. RTM_DELLINK :flags: Flag bits :seq: The sequence number of the message :pid: Process ID :data: data :return: return the package ''' return struct.pack('IHHII', 16 + len(data), msgtype, flags, seq, pid) + data def __netlink_unpack(self, data): ''' Unpack the data from kernel ''' out = [] while data: length, msgtype, flags, seq, pid = struct.unpack('IHHII', data[:16]) if len(data) < length: raise RuntimeError("Buffer overrun!") out.append((msgtype, flags, seq, pid, data[16:length])) data = data[length:] return out
[docs]class Macvtap(Interface): """ class of macvtap, base Interface """ def __init__(self, tapname=None): if tapname is None: self.tapname = "macvtap" + utils_misc.generate_random_id() else: self.tapname = tapname Interface.__init__(self, self.tapname)
[docs] def get_tapname(self): return self.tapname
[docs] def get_device(self): return "/dev/tap%s" % self.get_index()
[docs] def create(self, device, mode="vepa"): """ Create a macvtap device, only when the device does not exist. :param device: Macvtap device to be created. :param mode: Creation mode. """ path = os.path.join(SYSFS_NET_PATH, self.tapname) if not os.path.exists(path): self.ip_link_ctl(["link", "add", "link", device, "name", self.tapname, "type", "macvtap", "mode", mode])
[docs] def delete(self): path = os.path.join(SYSFS_NET_PATH, self.tapname) if os.path.exists(path): self.ip_link_ctl(["link", "delete", self.tapname])
[docs] def open(self): device = self.get_device() try: return os.open(device, os.O_RDWR) except OSError, e: raise TAPModuleError(device, "open", e)
[docs]class IPAddress(object): """ Class to manipulate IPv4 or IPv6 address. """ def __init__(self, ip_str='', info=''): self.addr = '' self.iface = '' self.scope = 0 self.packed_addr = None if info: try: self.iface = info['iface'] self.addr = info['addr'] self.version = info['version'] self.scope = info['scope'] except KeyError: pass if ip_str: self.canonicalize(ip_str) def __str__(self): if self.version == 'ipv6': return "%s%%%s" % (self.addr, self.scope) else: return self.addr
[docs] def canonicalize(self, ip_str): """ Parse an IP string for listen to IPAddress content. """ try: if ':' in ip_str: self.version = 'ipv6' if '%' in ip_str: ip_str, scope = ip_str.split('%') self.scope = int(scope) self.packed_addr = socket.inet_pton(socket.AF_INET6, ip_str) self.addr = socket.inet_ntop(socket.AF_INET6, self.packed_addr) else: self.version = 'ipv4' self.packed_addr = socket.inet_pton(socket.AF_INET, ip_str) self.addr = socket.inet_ntop(socket.AF_INET, self.packed_addr) except socket.error, detail: if 'illegal IP address' in str(detail): self.addr = ip_str self.version = 'hostname'
[docs] def listening_on(self, port, max_retry=30): """ Check whether a port is used for listening. """ def test_connection(self, port): """ Try connect to a port and return the connect result as error no. """ port = int(port) if self.version == 'ipv6': sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock.settimeout(1.0) result = sock.connect_ex((self.addr, port, 0, self.scope)) else: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1.0) result = sock.connect_ex((self.addr, port)) return result retry = 0 while True: if retry == max_retry: return False result = test_connection(self, port) if result == 0: return True elif result == 11: # Resource temporarily unavailable. time.sleep(0.1) retry += 1 else: return False
def __eq__(self, other_ip): if self.version != other_ip.version: return False if self.addr != other_ip.addr: return False if self.iface and other_ip.iface and self.iface != other_ip.iface: return False return True
[docs]def get_macvtap_base_iface(base_interface=None): """ Get physical interface to create macvtap, if you assigned base interface is valid(not belong to any bridge and is up), will use it; else use the first physical interface, which is not a brport and up. """ tap_base_device = None (dev_int, _) = get_sorted_net_if() if not dev_int: err_msg = "Cannot get any physical interface from the host" raise MacvtapGetBaseInterfaceError(details=err_msg) if base_interface and base_interface in dev_int: base_inter = Interface(base_interface) if (not base_inter.is_brport()) and base_inter.is_up(): tap_base_device = base_interface if not tap_base_device: if base_interface: warn_msg = "Can not use '%s' as macvtap base interface, " warn_msg += "will choice automatically" logging.warn(warn_msg % base_interface) for interface in dev_int: base_inter = Interface(interface) if base_inter.is_brport(): continue if base_inter.is_up(): tap_base_device = interface break if not tap_base_device: err_msg = ("Could not find a valid physical interface to create " "macvtap, make sure the interface is up and it does not " "belong to any bridge.") raise MacvtapGetBaseInterfaceError(details=err_msg) return tap_base_device
[docs]def create_macvtap(ifname, mode="vepa", base_if=None, mac_addr=None): """ Create Macvtap device, return a object of Macvtap :param ifname: macvtap interface name :param mode: macvtap type mode ("vepa, bridge,..) :param base_if: physical interface to create macvtap :param mac_addr: macvtap mac address """ try: base_if = get_macvtap_base_iface(base_if) o_macvtap = Macvtap(ifname) o_macvtap.create(base_if, mode) if mac_addr: o_macvtap.set_mac(mac_addr) return o_macvtap except Exception, e: raise MacvtapCreationError(ifname, base_if, e)
[docs]def open_macvtap(macvtap_object, queues=1): """ Open a macvtap device and returns its file descriptors which are used by fds=<fd1:fd2:..> parameter of qemu For single queue, only returns one file descriptor, it's used by fd=<fd> legacy parameter of qemu If you not have a switch support vepa in you env, run this type case you need at least two nic on you host [just workaround] :param macvtap_object: macvtap object :param queues: Queue number """ tapfds = [] for queue in range(int(queues)): tapfds.append(str(macvtap_object.open())) return ":".join(tapfds)
[docs]def create_and_open_macvtap(ifname, mode="vepa", queues=1, base_if=None, mac_addr=None): """ Create a new macvtap device, open it, and return the fds :param ifname: macvtap interface name :param mode: macvtap type mode ("vepa, bridge,..) :param queues: Queue number :param base_if: physical interface to create macvtap :param mac_addr: macvtap mac address """ o_macvtap = create_macvtap(ifname, mode, base_if, mac_addr) return open_macvtap(o_macvtap, queues)
[docs]class Bridge(object):
[docs] def get_structure(self): """ Get bridge list. """ ebr_i = re.compile(r"^(\S+).*?(\S+)$", re.MULTILINE) br_i = re.compile(r"^(\S+).*?(\S+)\s+(\S+)$", re.MULTILINE) nbr_i = re.compile(r"^\s+(\S+)$", re.MULTILINE) out_line = (utils.run(r"brctl show", verbose=False).stdout.splitlines()) result = dict() bridge = None for line in out_line[1:]: iface_var = None stp_var = None if len(line.split()) == 3: # virbr0 8000.fe54005e05e0 [no|yes][\s+] (tmpbr, stp_var) = ebr_i.findall(line.strip())[0] bridge = tmpbr br_attrs = dict() if stp_var: br_attrs["stp"] = stp_var br_attrs["iface"] = [] result[bridge] = br_attrs else: br_line = br_i.findall(line) if br_line: # virbr0 8000.fe54005e05e0 yes vnet0 (tmpbr, stp_var, iface_var) = br_line[0] bridge = tmpbr br_attrs = dict() if stp_var: br_attrs["stp"] = stp_var br_attrs["iface"] = [] result[bridge] = br_attrs else: # virbr0 8000.fe54005e05e0 yes vnet0 # > vnet1 if_line = nbr_i.findall(line) if if_line: iface_var = if_line[0] if iface_var and iface_var not in ['yes', 'no']: # add interface to bridge br_attrs["iface"].append(iface_var) return result
[docs] def list_br(self): return self.get_structure().keys()
[docs] def list_iface(self): """ Return all interfaces used by bridge. """ interface_list = [] for br in self.list_br(): for (value) in self.get_structure()[br]['iface']: interface_list.append(value) return interface_list
[docs] def port_to_br(self, port_name): """ Return bridge which contain port. :param port_name: Name of port. :return: Bridge name or None if there is no bridge which contain port. """ bridge = None for br in self.list_br(): if port_name in self.get_structure()[br]['iface']: bridge = br return bridge
def _br_ioctl(self, io_cmd, brname, ifname): ctrl_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) index = if_nametoindex(ifname) if index == 0: raise TAPNotExistError(ifname) ifr = struct.pack("16si", brname, index) _ = fcntl.ioctl(ctrl_sock, io_cmd, ifr) ctrl_sock.close()
[docs] def add_port(self, brname, ifname): """ Add a device to bridge :param ifname: Name of TAP device :param brname: Name of the bridge """ try: self._br_ioctl(arch.SIOCBRADDIF, brname, ifname) except IOError, details: raise BRAddIfError(ifname, brname, details)
[docs] def del_port(self, brname, ifname): """ Remove a TAP device from bridge :param ifname: Name of TAP device :param brname: Name of the bridge """ try: self._br_ioctl(arch.SIOCBRDELIF, brname, ifname) except IOError, details: raise BRDelIfError(ifname, brname, details)
[docs] def add_bridge(self, brname): """ Add a bridge in host """ ctrl_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) fcntl.ioctl(ctrl_sock, arch.SIOCBRADDBR, brname) ctrl_sock.close()
[docs] def del_bridge(self, brname): """ Delete a bridge in host """ ctrl_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) fcntl.ioctl(ctrl_sock, arch.SIOCBRDELBR, brname) ctrl_sock.close()
[docs] def get_stp_status(self, brname): """ get STP status """ bridge_stp = None try: bridge_stp = self.get_structure()[brname]['stp'] except KeyError: logging.error("Not find bridge %s", brname) return bridge_stp
def __init_openvswitch(func): """ Decorator used for late init of __ovs variable. """ def wrap_init(*args, **kargs): global __ovs if __ovs is None: try: __ovs = factory(openvswitch.OpenVSwitchSystem)() __ovs.init_system() if (not __ovs.check()): raise Exception("Check of OpenVSwitch failed.") except Exception, e: logging.debug("Host does not support OpenVSwitch: %s", e) return func(*args, **kargs) return wrap_init # Global variable for OpenVSwitch __ovs = None __bridge = Bridge()
[docs]def if_nametoindex(ifname): """ Map an interface name into its corresponding index. Returns 0 on error, as 0 is not a valid index :param ifname: interface name """ ctrl_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) ifr = struct.pack("16si", ifname, 0) r = fcntl.ioctl(ctrl_sock, arch.SIOCGIFINDEX, ifr) index = struct.unpack("16si", r)[1] ctrl_sock.close() return index
[docs]def vnet_mq_probe(tapfd): """ Check if the IFF_MULTI_QUEUE is support by tun. :param tapfd: the file descriptor of /dev/net/tun """ u = struct.pack("I", 0) try: r = fcntl.ioctl(tapfd, arch.TUNGETFEATURES, u) except OverflowError: logging.debug("Fail to get tun features!") return False flags = struct.unpack("I", r)[0] if flags & arch.IFF_MULTI_QUEUE: return True else: return False
[docs]def vnet_hdr_probe(tapfd): """ Check if the IFF_VNET_HDR is support by tun. :param tapfd: the file descriptor of /dev/net/tun """ u = struct.pack("I", 0) try: r = fcntl.ioctl(tapfd, arch.TUNGETFEATURES, u) except OverflowError: logging.debug("Fail to get tun features!") return False flags = struct.unpack("I", r)[0] if flags & arch.IFF_VNET_HDR: return True else: return False
[docs]def open_tap(devname, ifname, queues=1, vnet_hdr=True): """ Open a tap device and returns its file descriptors which are used by fds=<fd1:fd2:..> parameter of qemu For single queue, only returns one file descriptor, it's used by fd=<fd> legacy parameter of qemu :param devname: TUN device path :param ifname: TAP interface name :param queues: Queue number :param vnet_hdr: Whether enable the vnet header """ tapfds = [] for i in range(int(queues)): try: tapfds.append(str(os.open(devname, os.O_RDWR))) except OSError, e: raise TAPModuleError(devname, "open", e) flags = arch.IFF_TAP | arch.IFF_NO_PI if vnet_mq_probe(int(tapfds[i])): flags |= arch.IFF_MULTI_QUEUE elif (int(queues) > 1): raise TAPCreationError(ifname, "Host doesn't support MULTI_QUEUE") if vnet_hdr and vnet_hdr_probe(int(tapfds[i])): flags |= arch.IFF_VNET_HDR ifr = struct.pack("16sh", ifname, flags) try: r = fcntl.ioctl(int(tapfds[i]), arch.TUNSETIFF, ifr) except IOError, details: raise TAPCreationError(ifname, details) return ':'.join(tapfds)
[docs]def is_virtual_network_dev(dev_name): """ :param dev_name: Device name. :return: True if dev_name is in virtual/net dir, else false. """ if dev_name in os.listdir("/sys/devices/virtual/net/"): return True else: return False
[docs]def find_dnsmasq_listen_address(): """ Search all dnsmasq listen addresses. :param bridge_name: Name of bridge. :param bridge_ip: Bridge ip. :return: List of ip where dnsmasq is listening. """ cmd = "ps -Af | grep dnsmasq" result = utils.run(cmd).stdout return re.findall("--listen-address (.+?) ", result, re.MULTILINE)
[docs]def local_runner(cmd, timeout=None): return utils.run(cmd, verbose=False, timeout=timeout).stdout
[docs]def local_runner_status(cmd, timeout=None): return utils.run(cmd, verbose=False, timeout=timeout).exit_status
[docs]def get_net_if(runner=None, state=None): """ :param runner: command runner. :param div_phy_virt: if set true, will return a tuple division real physical interface and virtual interface :return: List of network interfaces. """ if runner is None: runner = local_runner if state is None: state = ".*" cmd = "ip link" result = runner(cmd) return re.findall(r"^\d+: (\S+?)[@:].*state %s.*$" % (state), result, re.MULTILINE)
[docs]def get_sorted_net_if(): """ Get all network interfaces, but sort them among physical and virtual if. :return: Tuple (physical interfaces, virtual interfaces) """ all_interfaces = get_net_if() phy_interfaces = [] vir_interfaces = [] for d in all_interfaces: path = os.path.join(SYSFS_NET_PATH, d) if not os.path.isdir(path): continue if not os.path.exists(os.path.join(path, "device")): vir_interfaces.append(d) else: phy_interfaces.append(d) return (phy_interfaces, vir_interfaces)
[docs]def get_net_if_addrs(if_name, runner=None): """ Get network device ip addresses. ioctl not used because it's not compatible with ipv6 address. :param if_name: Name of interface. :return: List ip addresses of network interface. """ if runner is None: runner = local_runner cmd = "ip addr show %s" % (if_name) result = runner(cmd) return {"ipv4": re.findall("inet (.+?)/..?", result, re.MULTILINE), "ipv6": re.findall("inet6 (.+?)/...?", result, re.MULTILINE), "mac": re.findall("link/ether (.+?) ", result, re.MULTILINE)}
[docs]def get_net_if_addrs_win(session, mac_addr): """ Try to get windows guest nic address by serial session :param session: serial sesssion :param mac_addr: guest nic mac address :return: List ip addresses of network interface. """ ip_address = get_windows_nic_attribute(session, "macaddress", mac_addr, "IPAddress", global_switch="nicconfig") return {"ipv4": re.findall('(\d+.\d+.\d+.\d+)"', ip_address), "ipv6": re.findall('(fe80.*?)"', ip_address)}
[docs]def get_net_if_and_addrs(runner=None): """ :return: Dict of interfaces and their addresses {"ifname": addrs}. """ ret = {} ifs = get_net_if(runner) for iface in ifs: ret[iface] = get_net_if_addrs(iface, runner) return ret
[docs]def get_guest_ip_addr(session, mac_addr, os_type="linux", ip_version="ipv4", linklocal=False): """ Get guest ip addresses by serial session :param session: serial session :param mac_addr: nic mac address of the nic that you want get :param os_type: guest os type, windows or linux :param ip_version: guest ip version, ipv4 or ipv6 :param linklocal: Wether ip address is local or remote :return: ip addresses of network interface. """ if ip_version == "ipv6" and linklocal: return ipv6_from_mac_addr(mac_addr) try: if os_type == "linux": nic_ifname = get_linux_ifname(session, mac_addr) info_cmd = "ifconfig -a; ethtool -S %s" % nic_ifname nic_address = get_net_if_addrs(nic_ifname, session.cmd_output) elif os_type == "windows": info_cmd = "ipconfig /all" nic_address = get_net_if_addrs_win(session, mac_addr) else: info_cmd = "" raise IPAddrGetError(mac_addr, "Unknown os type") if ip_version == "ipv4": if nic_address["ipv4"]: return nic_address["ipv4"][-1] else: return None else: global_address = [x for x in nic_address["ipv6"] if not x.lower().startswith("fe80")] if global_address: return global_address[0] else: return None except Exception, err: logging.debug(session.cmd_output(info_cmd)) raise IPAddrGetError(mac_addr, err)
[docs]def restart_guest_network(session, mac_addr=None, os_type="linux", ip_version="ipv4", timeout=240): """ Restart guest network by serial session :param session: serial session :param mac_addr: nic mac address of the nic that you want restart :param os_type: guest os type, windows or linux :param ip_version: guest ip version, ipv4 or ipv6 :param timeout: timeout value for command. """ if os_type == "linux": if mac_addr: nic_ifname = get_linux_ifname(session, mac_addr) restart_cmd = "ifconfig %s up; " % nic_ifname restart_cmd += "dhclient -r; " if ip_version == "ipv6": restart_cmd += "dhclient -6 %s" % nic_ifname else: restart_cmd += "dhclient %s" % nic_ifname else: restart_cmd = "dhclient -r; " if ip_version == "ipv6": restart_cmd += "dhclient -6" else: restart_cmd += "dhclient" elif os_type == "windows": if ip_version == "ipv6": restart_cmd = 'ipconfig /renew6' else: restart_cmd = 'ipconfig /renew' if mac_addr: nic_connectionid = get_windows_nic_attribute(session, "macaddress", mac_addr, "netconnectionid", timeout=120) restart_cmd += ' "%s"' % nic_connectionid session.cmd_output_safe(restart_cmd, timeout=timeout)
[docs]def set_net_if_ip(if_name, ip_addr, runner=None): """ Set network device ip addresses. ioctl not used because there is incompatibility with ipv6. :param if_name: Name of interface. :param ip_addr: Interface ip addr in format "ip_address/mask". :raise: IfChangeAddrError. """ if runner is None: runner = local_runner cmd = "ip addr add %s dev %s" % (ip_addr, if_name) try: runner(cmd) except error.CmdError, e: raise IfChangeAddrError(if_name, ip_addr, e)
[docs]def del_net_if_ip(if_name, ip_addr, runner=None): """ Delete network device ip addresses. :param if_name: Name of interface. :param ip_addr: Interface ip addr in format "ip_address/mask". :raise: IfChangeAddrError. """ if runner is None: runner = local_runner cmd = "ip addr del %s dev %s" % (ip_addr, if_name) try: runner(cmd) except error.CmdError, e: raise IfChangeAddrError(if_name, ip_addr, e)
[docs]def get_net_if_operstate(ifname, runner=None): """ Get linux host/guest network device operstate. :param if_name: Name of the interface. :raise: HwOperstarteGetError. """ if runner is None: runner = local_runner cmd = "cat /sys/class/net/%s/operstate" % ifname try: operstate = runner(cmd) if "up" in operstate: return "up" elif "down" in operstate: return "down" elif "unknown" in operstate: return "unknown" else: raise HwOperstarteGetError(ifname, "operstate is not known.") except error.CmdError: raise HwOperstarteGetError(ifname, "run operstate cmd error.")
[docs]def ipv6_from_mac_addr(mac_addr): """ :return: Ipv6 address for communication in link range. """ mp = mac_addr.split(":") mp[0] = ("%x") % (int(mp[0], 16) ^ 0x2) mac_address = "fe80::%s%s:%sff:fe%s:%s%s" % tuple(mp) return ":".join(map(lambda x: x.lstrip("0"), mac_address.split(":")))
[docs]def refresh_neigh_table(interface_name=None, neigh_address="ff02::1"): """ Refresh host neighbours table, if interface_name is assigned only refresh neighbours of this interface, else refresh the all the neighbours. """ if isinstance(interface_name, list): interfaces = interface_name elif isinstance(interface_name, str): interfaces = interface_name.split() else: interfaces = filter(lambda x: "-" not in x, get_net_if()) interfaces.remove("lo") for interface in interfaces: refresh_cmd = "ping6 -c 2 -I %s %s > /dev/null" % (interface, neigh_address) utils.system(refresh_cmd, ignore_status=True)
[docs]def get_neighbours_info(neigh_address="", interface_name=None): """ Get the neighbours infomation """ refresh_neigh_table(interface_name, neigh_address) cmd = "ip -6 neigh show nud reachable" if neigh_address: cmd += " %s" % neigh_address output = utils.system_output(cmd) if not output: raise VMIPV6NeighNotFoundError(neigh_address) all_neigh = {} neigh_info = {} for line in output.splitlines(): neigh_address = line.split()[0] neigh_info["address"] = neigh_address neigh_info["attach_if"] = line.split()[2] neigh_mac = line.split()[4] neigh_info["mac"] = neigh_mac all_neigh[neigh_mac] = neigh_info all_neigh[neigh_address] = neigh_info return all_neigh
[docs]def neigh_reachable(neigh_address, attach_if=None): """ Check the neighbour is reachable """ try: get_neighbours_info(neigh_address, attach_if) except VMIPV6NeighNotFoundError: return False return True
[docs]def get_neigh_attch_interface(neigh_address): """ Get the interface wihch can reach the neigh_address """ return get_neighbours_info(neigh_address)[neigh_address]["attach_if"]
[docs]def get_neigh_mac(neigh_address): """ Get neighbour mac by his address """ return get_neighbours_info(neigh_address)[neigh_address]["mac"]
[docs]def check_add_dnsmasq_to_br(br_name, tmpdir): """ Add dnsmasq for bridge. dnsmasq could be added only if bridge has assigned ip address. :param bridge_name: Name of bridge. :param bridge_ip: Bridge ip. :param tmpdir: Tmp dir for save pid file and ip range file. :return: When new dnsmasq is started name of pidfile otherwise return None because system dnsmasq is already started on bridge. """ br_ips = get_net_if_addrs(br_name)["ipv4"] if not br_ips: raise BRIpError(br_name) dnsmasq_listen = find_dnsmasq_listen_address() dhcp_ip_start = br_ips[0].split(".") dhcp_ip_start[3] = "128" dhcp_ip_start = ".".join(dhcp_ip_start) dhcp_ip_end = br_ips[0].split(".") dhcp_ip_end[3] = "254" dhcp_ip_end = ".".join(dhcp_ip_end) pidfile = ("%s-dnsmasq.pid") % (br_ips[0]) leases = ("%s.leases") % (br_ips[0]) if not (set(br_ips) & set(dnsmasq_listen)): logging.debug("There is no dnsmasq on br %s." "Starting new one." % (br_name)) utils.run("/usr/sbin/dnsmasq --strict-order --bind-interfaces" " --pid-file=%s --conf-file= --except-interface lo" " --listen-address %s --dhcp-range %s,%s --dhcp-leasefile=%s" " --dhcp-lease-max=127 --dhcp-no-override" % (os.path.join(tmpdir, pidfile), br_ips[0], dhcp_ip_start, dhcp_ip_end, (os.path.join(tmpdir, leases)))) return pidfile return None
@__init_openvswitch
[docs]def find_bridge_manager(br_name, ovs=None): """ Finds bridge which contain interface iface_name. :param br_name: Name of interface. :return: (br_manager) which contain bridge or None. """ if ovs is None: ovs = __ovs # find ifname in standard linux bridge. if br_name in __bridge.list_br(): return __bridge elif ovs is not None and br_name in ovs.list_br(): return ovs else: return None
@__init_openvswitch
[docs]def find_current_bridge(iface_name, ovs=None): """ Finds bridge which contains interface iface_name. :param iface_name: Name of interface. :return: (br_manager, Bridge) which contain iface_name or None. """ if ovs is None: ovs = __ovs # find ifname in standard linux bridge. master = __bridge bridge = master.port_to_br(iface_name) if bridge is None and ovs: master = ovs bridge = master.port_to_br(iface_name) if bridge is None: master = None return (master, bridge)
@__init_openvswitch
[docs]def change_iface_bridge(ifname, new_bridge, ovs=None): """ Change bridge on which interface was added. :param ifname: Iface name or Iface struct. :param new_bridge: Name of new bridge. """ if ovs is None: ovs = __ovs br_manager_new = find_bridge_manager(new_bridge, ovs) if br_manager_new is None: raise BRNotExistError(new_bridge, "") if type(ifname) is str: (br_manager_old, br_old) = find_current_bridge(ifname, ovs) if br_manager_old is not None: br_manager_old.del_port(br_old, ifname) br_manager_new.add_port(new_bridge, ifname) elif issubclass(type(ifname), VirtIface): br_manager_old = find_bridge_manager(ifname.netdst, ovs) if br_manager_old is not None: br_manager_old.del_port(ifname.netdst, ifname.ifname) br_manager_new.add_port(new_bridge, ifname.ifname) ifname.netdst = new_bridge else: raise error.AutotestError("Network interface %s is wrong type %s." % (ifname, new_bridge))
@__init_openvswitch
[docs]def ovs_br_exists(brname, ovs=None): """ Check if bridge exists or not on OVS system :param brname: Name of the bridge :param ovs: OpenVSwitch object. """ if ovs is None: ovs = __ovs return brname in ovs.list_br()
@__init_openvswitch
[docs]def add_ovs_bridge(brname, ovs=None): """ Add a bridge to ovs :param brname: Name of the bridge :param ovs: OpenVSwitch object. """ if ovs is None: ovs = __ovs if not ovs_br_exists(brname, ovs): ovs.add_br(brname)
@__init_openvswitch
[docs]def del_ovs_bridge(brname, ovs=None): """ Delete a bridge from ovs :param brname: Name of the bridge :param ovs: OpenVSwitch object. """ if ovs is None: ovs = __ovs if ovs_br_exists(brname, ovs): ovs.del_br(brname) else: raise BRNotExistError(brname, "")
@__init_openvswitch
[docs]def add_to_bridge(ifname, brname, ovs=None): """ Add a TAP device to bridge :param ifname: Name of TAP device :param brname: Name of the bridge :param ovs: OpenVSwitch object. """ if ovs is None: ovs = __ovs _ifname = None if type(ifname) is str: _ifname = ifname elif issubclass(type(ifname), VirtIface): _ifname = ifname.ifname if brname in __bridge.list_br(): # Try add port to standard bridge or openvswitch in compatible mode. __bridge.add_port(brname, _ifname) return if ovs is None: raise BRAddIfError(ifname, brname, "There is no bridge in system.") # Try add port to OpenVSwitch bridge. if brname in ovs.list_br(): ovs.add_port(brname, ifname)
@__init_openvswitch
[docs]def del_from_bridge(ifname, brname, ovs=None): """ Del a TAP device to bridge :param ifname: Name of TAP device :param brname: Name of the bridge :param ovs: OpenVSwitch object. """ if ovs is None: ovs = __ovs _ifname = None if type(ifname) is str: _ifname = ifname elif issubclass(type(ifname), VirtIface): _ifname = ifname.ifname if ovs is None: raise BRDelIfError(ifname, brname, "There is no bridge in system.") if brname in __bridge.list_br(): # Try add port to standard bridge or openvswitch in compatible mode. __bridge.del_port(brname, _ifname) return # Try add port to OpenVSwitch bridge. if brname in ovs.list_br(): ovs.del_port(brname, _ifname)
@__init_openvswitch
[docs]def openflow_manager(br_name, command, flow_options=None, ovs=None): """ Manager openvswitch flow rules :param br_name: name of the bridge :param command: manager cmd(add-flow, del-flows, dump-flows..) :param flow_options: open flow options :param ovs: OpenVSwitch object. """ if ovs is None: ovs = __ovs if ovs is None or br_name not in ovs.list_br(): raise OpenflowSwitchError(br_name) manager_cmd = "ovs-ofctl %s %s" % (command, br_name) if flow_options: manager_cmd += " %s" % flow_options return utils.run(manager_cmd)
[docs]def bring_up_ifname(ifname): """ Bring up an interface :param ifname: Name of the interface """ ctrl_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) ifr = struct.pack("16sh", ifname, arch.IFF_UP) try: fcntl.ioctl(ctrl_sock, arch.SIOCSIFFLAGS, ifr) except IOError: raise TAPBringUpError(ifname) ctrl_sock.close()
[docs]def bring_down_ifname(ifname): """ Bring down an interface :param ifname: Name of the interface """ ctrl_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) ifr = struct.pack("16sh", ifname, 0) try: fcntl.ioctl(ctrl_sock, arch.SIOCSIFFLAGS, ifr) except IOError: raise TAPBringDownError(ifname) ctrl_sock.close()
[docs]def if_set_macaddress(ifname, mac): """ Set the mac address for an interface :param ifname: Name of the interface :param mac: Mac address """ ctrl_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) ifr = struct.pack("256s", ifname) try: mac_dev = fcntl.ioctl(ctrl_sock, arch.SIOCGIFHWADDR, ifr)[18:24] mac_dev = ":".join(["%02x" % ord(m) for m in mac_dev]) except IOError, e: raise HwAddrGetError(ifname) if mac_dev.lower() == mac.lower(): return ifr = struct.pack("16sH14s", ifname, 1, "".join([chr(int(m, 16)) for m in mac.split(":")])) try: fcntl.ioctl(ctrl_sock, arch.SIOCSIFHWADDR, ifr) except IOError, e: logging.info(e) raise HwAddrSetError(ifname, mac) ctrl_sock.close()
[docs]class IPv6Manager(propcan.PropCanBase): """ Setup and cleanup IPv6 environment. """ __slots__ = ('server_ip', 'server_user', 'server_pwd', 'server_ifname', 'client_ifname', 'client_ipv6_addr', 'server_ipv6_addr', 'client', 'port', 'runner', 'prompt', 'session', 'auto_recover', 'check_ipv6_connectivity') def __init__(self, *args, **dargs): init_dict = dict(*args, **dargs) init_dict['server_ip'] = init_dict.get('server_ip', 'SERVER.IP') init_dict['server_user'] = init_dict.get('server_user', 'root') init_dict['server_pwd'] = init_dict.get('server_pwd', None) init_dict['server_ifname'] = init_dict.get('server_ifname', 'eth0') init_dict['server_ipv6_addr'] = init_dict.get('server_ipv6_addr') init_dict['client_ifname'] = init_dict.get('client_ifname', 'eth0') init_dict['client_ipv6_addr'] = init_dict.get('client_ipv6_addr') init_dict['client'] = init_dict.get('client', 'ssh') init_dict['port'] = init_dict.get('port', 22) init_dict['prompt'] = init_dict.get('prompt', r"[\#\$]\s*$") init_dict['auto_recover'] = init_dict.get('auto_recover', False) init_dict['check_ipv6_connectivity'] = \ init_dict.get('check_ipv6_connectivity', 'yes') self.__dict_set__('session', None) super(IPv6Manager, self).__init__(init_dict) def __del__(self): """ Close opened session and recover network configuration. """ self.close_session() if self.auto_recover: try: self.cleanup() except: raise error.TestError("Failed to cleanup test environment") def _new_session(self): """ Build a new server session. """ port = self.port prompt = self.prompt host = self.server_ip client = self.client username = self.server_user password = self.server_pwd try: session = remote.wait_for_login(client, host, port, username, password, prompt) except remote.LoginTimeoutError: raise error.TestError("Got a timeout error when login to server.") except remote.LoginAuthenticationError: raise error.TestError("Authentication failed to login to server.") except remote.LoginProcessTerminatedError: raise error.TestError("Host terminates during login to server.") except remote.LoginError: raise error.TestError("Some error occurs login to client server.") return session
[docs] def get_session(self): """ Make sure the session is alive and available """ session = self.__dict_get__('session') if (session is not None) and (session.is_alive()): return session else: session = self._new_session() self.__dict_set__('session', session) return session
[docs] def close_session(self): """ If the session exists then close it. """ if self.session: self.session.close()
[docs] def get_addr_list(self, runner=None): """ Get IPv6 address list from local and remote host. """ ipv6_addr_list = [] if not runner: ipv6_addr_list = get_net_if_addrs(self.client_ifname).get("ipv6") logging.debug("Local IPv6 address list: %s", ipv6_addr_list) else: ipv6_addr_list = get_net_if_addrs(self.server_ifname, runner).get("ipv6") logging.debug("remote IPv6 address list: %s", ipv6_addr_list) return ipv6_addr_list
@staticmethod
[docs] def check_connectivity(client_ifname, server_ipv6, count=5): """ Check IPv6 network connectivity :param client_ifname: client network interface name :param server_ipv6: server IPv6 address ::param count: sending packets counts, default is 5 """ try: os_dep.command("ping6") except ValueError: raise error.TestNAError("Can't find ping6 command") command = "ping6 -I %s %s -c %s" % (client_ifname, server_ipv6, count) result = utils.run(command, ignore_status=True) if result.exit_status: raise error.TestNAError("The '%s' destination is unreachable:" " %s", server_ipv6, result.stderr) else: logging.info("The '%s' destination is connectivity!", server_ipv6)
[docs] def flush_ip6tables(self): """ Refresh IPv6 firewall rules """ flush_cmd = "ip6tables -F" find_ip6tables_cmd = "which ip6tables" test_NA_err = "Can't find ip6tables command" test_fail_err = "Failed to flush 'icmp6-adm-prohibited' rule" flush_cmd_pass = "Succeed to run command '%s'" % flush_cmd # check if ip6tables command exists on the local try: os_dep.command("ip6tables") except ValueError: raise error.TestNAError(test_NA_err) # flush local ip6tables rules result = utils.run(flush_cmd, ignore_status=True) if result.exit_status: raise error.TestFail("%s on local host:%s" % (test_fail_err, result.stderr)) else: logging.info("%s on the local host", flush_cmd_pass) # check if ip6tables command exists on the remote if self.session.cmd_status(find_ip6tables_cmd): raise error.TestNAError(test_NA_err) # flush remote ip6tables rules if self.session.cmd_status(flush_cmd): raise error.TestFail("%s on the remote host" % test_fail_err) else: logging.info("%s on the remote host", flush_cmd_pass)
[docs] def setup(self): """ Setup IPv6 network environment. """ self.session = self.get_session() runner = self.session.cmd_output try: logging.info("Prepare to configure IPv6 test environment...") # configure global IPv6 address for local host set_net_if_ip(self.client_ifname, self.client_ipv6_addr) # configure global IPv6 address for remote host set_net_if_ip(self.server_ifname, self.server_ipv6_addr, runner) # check IPv6 network connectivity if self.check_ipv6_connectivity == "yes": # the ipv6 address looks like this '3efe::101/64' ipv6_addr_des = self.server_ipv6_addr.split('/')[0] self.check_connectivity(self.client_ifname, ipv6_addr_des) # flush ip6tables both local and remote host self.flush_ip6tables() except Exception, e: self.close_session() raise error.TestError("Failed to setup IPv6 environment!!:%s", e)
[docs] def cleanup(self): """ Cleanup IPv6 network environment. """ logging.info("Prepare to clean up IPv6 test environment...") local_ipv6_addr_list = self.get_addr_list() # the ipv6 address looks like this '3efe::101/64' ipv6_addr_src = self.client_ipv6_addr.split('/')[0] ipv6_addr_des = self.server_ipv6_addr.split('/')[0] # delete global IPv6 address from local host if ipv6_addr_src in local_ipv6_addr_list: del_net_if_ip(self.client_ifname, self.client_ipv6_addr) self.session = self.get_session() runner = self.session.cmd_output remote_ipv6_addr_list = self.get_addr_list(runner) # delete global IPv6 address from remote host if ipv6_addr_des in remote_ipv6_addr_list: del_net_if_ip(self.server_ifname, self.server_ipv6_addr, runner) # make sure opening session is closed self.close_session()
[docs]class VirtIface(propcan.PropCan, object): """ Networking information for single guest interface and host connection. """ __slots__ = ['nic_name', 'g_nic_name', 'mac', 'nic_model', 'ip', 'nettype', 'netdst'] # Make sure first byte generated is always zero and it follows # the class definition. This helps provide more predictable # addressing while avoiding clashes between multiple NICs. LASTBYTE = random.SystemRandom().randint(0x00, 0xff) def __getstate__(self): state = {} for key in self.__class__.__all_slots__: if key in self: state[key] = self[key] return state def __setstate__(self, state): self.__init__(state) @classmethod
[docs] def name_is_valid(cls, nic_name): """ Corner-case prevention where nic_name is not a sane string value """ try: return isinstance(nic_name, str) and len(nic_name) > 1 except (TypeError, KeyError, AttributeError): return False
@classmethod
[docs] def mac_is_valid(cls, mac): try: mac = cls.mac_str_to_int_list(mac) except TypeError: return False return True # Though may be less than 6 bytes
@classmethod
[docs] def mac_str_to_int_list(cls, mac): """ Convert list of string bytes to int list """ if isinstance(mac, (str, unicode)): mac = mac.split(':') # strip off any trailing empties for rindex in xrange(len(mac), 0, -1): if not mac[rindex - 1].strip(): del mac[rindex - 1] else: break try: assert len(mac) < 7 for byte_str_index in xrange(0, len(mac)): byte_str = mac[byte_str_index] assert isinstance(byte_str, (str, unicode)) assert len(byte_str) > 0 try: value = eval("0x%s" % byte_str, {}, {}) except SyntaxError: raise AssertionError assert value >= 0x00 assert value <= 0xFF mac[byte_str_index] = value except AssertionError: raise TypeError("%s %s is not a valid MAC format " "string or list" % (str(mac.__class__), str(mac))) return mac
@classmethod
[docs] def int_list_to_mac_str(cls, mac_bytes): """ Return string formatting of int mac_bytes """ for byte_index in xrange(0, len(mac_bytes)): mac = mac_bytes[byte_index] # Project standardized on lower-case hex if mac < 16: mac_bytes[byte_index] = "0%x" % mac else: mac_bytes[byte_index] = "%x" % mac return mac_bytes
@classmethod
[docs] def generate_bytes(cls): """ Return next byte from ring """ cls.LASTBYTE += 1 if cls.LASTBYTE > 0xff: cls.LASTBYTE = 0 yield cls.LASTBYTE
@classmethod
[docs] def complete_mac_address(cls, mac): """ Append randomly generated byte strings to make mac complete :param mac: String or list of mac bytes (possibly incomplete) :raise: TypeError if mac is not a string or a list """ mac = cls.mac_str_to_int_list(mac) if len(mac) == 6: return ":".join(cls.int_list_to_mac_str(mac)) for rand_byte in cls.generate_bytes(): mac.append(rand_byte) return cls.complete_mac_address(cls.int_list_to_mac_str(mac))
[docs]class LibvirtIface(VirtIface): """ Networking information specific to libvirt """ __slots__ = []
[docs]class QemuIface(VirtIface): """ Networking information specific to Qemu """ __slots__ = ['vlan', 'device_id', 'ifname', 'tapfds', 'tapfd_ids', 'netdev_id', 'tftp', 'romfile', 'nic_extra_params', 'netdev_extra_params', 'queues', 'vhostfds', 'vectors']
[docs]class VMNet(list): """ Collection of networking information. """ # don't flood discard warnings DISCARD_WARNINGS = 10 # __init__ must not presume clean state, it should behave # assuming there is existing properties/data on the instance # and take steps to preserve or update it as appropriate. def __init__(self, container_class=VirtIface, virtiface_list=[]): """ Initialize from list-like virtiface_list using container_class """ if container_class != VirtIface and ( not issubclass(container_class, VirtIface)): raise TypeError("Container class must be Base_VirtIface " "or subclass not a %s" % str(container_class)) self.container_class = container_class super(VMNet, self).__init__([]) if isinstance(virtiface_list, list): for virtiface in virtiface_list: self.append(virtiface) else: raise VMNetError def __getstate__(self): return [nic for nic in self] def __setstate__(self, state): VMNet.__init__(self, self.container_class, state) def __getitem__(self, index_or_name): if isinstance(index_or_name, str): index_or_name = self.nic_name_index(index_or_name) return super(VMNet, self).__getitem__(index_or_name) def __setitem__(self, index_or_name, value): if not isinstance(value, dict): raise VMNetError if self.container_class.name_is_valid(value['nic_name']): if isinstance(index_or_name, str): index_or_name = self.nic_name_index(index_or_name) self.process_mac(value) super(VMNet, self).__setitem__(index_or_name, self.container_class(value)) else: raise VMNetError def __delitem__(self, index_or_name): if isinstance(index_or_name, str): index_or_name = self.nic_name_index(index_or_name) super(VMNet, self).__delitem__(index_or_name)
[docs] def subclass_pre_init(self, params, vm_name): """ Subclasses must establish style before calling VMNet. __init__() """ # TODO: Get rid of this function. it's main purpose is to provide # a shared way to setup style (container_class) from params+vm_name # so that unittests can run independently for each subclass. self.vm_name = vm_name self.params = params.object_params(self.vm_name) self.vm_type = self.params.get('vm_type', 'default') self.driver_type = self.params.get('driver_type', 'default') for key, value in VMNetStyle(self.vm_type, self.driver_type).items(): setattr(self, key, value)
[docs] def process_mac(self, value): """ Strips 'mac' key from value if it's not valid """ original_mac = mac = value.get('mac') if mac: mac = value['mac'] = value['mac'].lower() if len(mac.split(':') ) == 6 and self.container_class.mac_is_valid(mac): return else: del value['mac'] # don't store invalid macs # Notify user about these, but don't go crazy if self.__class__.DISCARD_WARNINGS >= 0: logging.warning('Discarded invalid mac "%s" for nic "%s" ' 'from input, %d warnings remaining.' % (original_mac, value.get('nic_name'), self.__class__.DISCARD_WARNINGS)) self.__class__.DISCARD_WARNINGS -= 1
[docs] def mac_list(self): """ Return a list of all mac addresses used by defined interfaces """ return [nic.mac for nic in self if hasattr(nic, 'mac')]
[docs] def append(self, value): newone = self.container_class(value) newone_name = newone['nic_name'] if newone.name_is_valid(newone_name) and ( newone_name not in self.nic_name_list()): self.process_mac(newone) super(VMNet, self).append(newone) else: raise VMNetError
[docs] def nic_name_index(self, name): """ Return the index number for name, or raise KeyError """ if not isinstance(name, str): raise TypeError("nic_name_index()'s nic_name must be a string") nic_name_list = self.nic_name_list() try: return nic_name_list.index(name) except ValueError: raise IndexError("Can't find nic named '%s' among '%s'" % (name, nic_name_list))
[docs] def nic_name_list(self): """ Obtain list of nic names from lookup of contents 'nic_name' key. """ namelist = [] for item in self: # Rely on others to throw exceptions on 'None' names namelist.append(item['nic_name']) return namelist
[docs] def nic_lookup(self, prop_name, prop_value): """ Return the first index with prop_name key matching prop_value or None """ for nic_index in xrange(0, len(self)): if self[nic_index].has_key(prop_name): if self[nic_index][prop_name] == prop_value: return nic_index return None
# TODO: Subclass VMNet into Qemu/Libvirt variants and # pull them, along with ParmasNet and maybe DbNet based on # Style definitions. i.e. libvirt doesn't need DbNet at all, # but could use some custom handling at the VMNet layer # for xen networking. This will also enable further extensions # to network information handing in the future.
[docs]class VMNetStyle(dict): """ Make decisions about needed info from vm_type and driver_type params. """ # Keyd first by vm_type, then by driver_type. VMNet_Style_Map = { 'default': { 'default': { 'mac_prefix': '9a', 'container_class': QemuIface, } }, 'libvirt': { 'default': { 'mac_prefix': '9a', 'container_class': LibvirtIface, }, 'qemu': { 'mac_prefix': '52:54:00', 'container_class': LibvirtIface, }, 'xen': { 'mac_prefix': '00:16:3e', 'container_class': LibvirtIface, } } } def __new__(cls, vm_type, driver_type): return cls.get_style(vm_type, driver_type) @classmethod
[docs] def get_vm_type_map(cls, vm_type): return cls.VMNet_Style_Map.get(vm_type, cls.VMNet_Style_Map['default'])
@classmethod
[docs] def get_driver_type_map(cls, vm_type_map, driver_type): return vm_type_map.get(driver_type, vm_type_map['default'])
@classmethod
[docs] def get_style(cls, vm_type, driver_type): style = cls.get_driver_type_map(cls.get_vm_type_map(vm_type), driver_type) return style
[docs]class ParamsNet(VMNet): """ Networking information from Params Params contents specification- vms = <vm names...> nics = <nic names...> nics_<vm name> = <nic names...> # attr: mac, ip, model, nettype, netdst, etc. <attr> = value <attr>_<nic name> = value """ # __init__ must not presume clean state, it should behave # assuming there is existing properties/data on the instance # and take steps to preserve or update it as appropriate. def __init__(self, params, vm_name): self.subclass_pre_init(params, vm_name) # use temporary list to initialize result_list = [] nic_name_list = self.params.objects('nics') for nic_name in nic_name_list: # nic name is only in params scope nic_dict = {'nic_name': nic_name} nic_params = self.params.object_params(nic_name) # set default values for the nic nic_params = self.__set_default_params__(nic_name, nic_params) # avoid processing unsupported properties proplist = list(self.container_class().__all_slots__) # nic_name was already set, remove from __slots__ list copy del proplist[proplist.index('nic_name')] for propertea in proplist: # Merge existing propertea values if they exist try: existing_value = getattr(self[nic_name], propertea, None) except ValueError: existing_value = None except IndexError: existing_value = None nic_dict[propertea] = nic_params.get(propertea, existing_value) result_list.append(nic_dict) VMNet.__init__(self, self.container_class, result_list) def __set_default_params__(self, nic_name, nic_params): """ Use params to overwrite defaults from DbParams param: nic_name: nic name (string) param: nic_params: params contain nic properties(like dict) """ default_params = {} default_params['queues'] = 1 default_params['tftp'] = None default_params['romfile'] = None default_params['nic_extra_params'] = '' default_params['netdev_extra_params'] = '' nic_name_list = self.params.objects('nics') default_params['vlan'] = str(nic_name_list.index(nic_name)) if nic_params.get('enable_misx_vectors') == 'yes': default_params['vectors'] = 2 * 1 + 2 for key, val in default_params.items(): nic_params.setdefault(key, val) return nic_params
[docs] def mac_index(self): """ Generator over mac addresses found in params """ for nic_name in self.params.get('nics'): nic_obj_params = self.params.object_params(nic_name) mac = nic_obj_params.get('mac') if mac: yield mac else: continue
[docs] def reset_mac(self, index_or_name): """ Reset to mac from params if defined and valid, or undefine. """ nic = self[index_or_name] nic_name = nic.nic_name nic_params = self.params.object_params(nic_name) params_mac = nic_params.get('mac') if params_mac and self.container_class.mac_is_valid(params_mac): new_mac = params_mac.lower() else: new_mac = None nic.mac = new_mac
[docs] def reset_ip(self, index_or_name): """ Reset to ip from params if defined and valid, or undefine. """ nic = self[index_or_name] nic_name = nic.nic_name nic_params = self.params.object_params(nic_name) params_ip = nic_params.get('ip') if params_ip: new_ip = params_ip else: new_ip = None nic.ip = new_ip
[docs]class DbNet(VMNet): """ Networking information from database Database specification- database values are python string-formatted lists of dictionaries """ # __init__ must not presume clean state, it should behave # assuming there is existing properties/data on the instance # and take steps to preserve or update it as appropriate. def __init__(self, params, vm_name, db_filename, db_key): self.subclass_pre_init(params, vm_name) self.db_key = db_key self.db_filename = db_filename self.db_lockfile = db_filename + ".lock" # Merge (don't overwrite) existing propertea values if they # exist in db try: self.lock_db() entry = self.db_entry() except KeyError: entry = [] self.unlock_db() proplist = list(self.container_class().__all_slots__) # nic_name was already set, remove from __slots__ list copy del proplist[proplist.index('nic_name')] nic_name_list = self.nic_name_list() for db_nic in entry: nic_name = db_nic['nic_name'] if nic_name in nic_name_list: for propertea in proplist: # only set properties in db but not in self if propertea in db_nic: self[nic_name].set_if_none( propertea, db_nic[propertea]) if entry: VMNet.__init__(self, self.container_class, entry) # Assume self.update_db() called elsewhere
[docs] def lock_db(self): if not hasattr(self, 'lock'): self.lock = utils_misc.lock_file(self.db_lockfile) if not hasattr(self, 'db'): self.db = shelve.open(self.db_filename) else: raise DbNoLockError else: raise DbNoLockError
[docs] def unlock_db(self): if hasattr(self, 'db'): self.db.close() del self.db if hasattr(self, 'lock'): utils_misc.unlock_file(self.lock) del self.lock else: raise DbNoLockError else: raise DbNoLockError
[docs] def db_entry(self, db_key=None): """ Returns a python list of dictionaries from locked DB string-format entry """ if not db_key: db_key = self.db_key try: db_entry = self.db[db_key] except AttributeError: # self.db doesn't exist: raise DbNoLockError # Always wear protection try: eval_result = eval(db_entry, {}, {}) except SyntaxError: raise ValueError("Error parsing entry for %s from " "database '%s'" % (self.db_key, self.db_filename)) if not isinstance(eval_result, list): raise ValueError("Unexpected database data: %s" % ( str(eval_result))) result = [] for result_dict in eval_result: if not isinstance(result_dict, dict): raise ValueError("Unexpected database sub-entry data %s" % ( str(result_dict))) result.append(result_dict) return result
[docs] def save_to_db(self, db_key=None): """ Writes string representation out to database """ if db_key is None: db_key = self.db_key data = str(self) # Avoid saving empty entries if len(data) > 3: try: self.db[self.db_key] = data except AttributeError: raise DbNoLockError else: try: # make sure old db entry is removed del self.db[db_key] except KeyError: pass
[docs] def update_db(self): self.lock_db() self.save_to_db() self.unlock_db()
[docs] def mac_index(self): """Generator of mac addresses found in database""" try: for db_key in self.db.keys(): for nic in self.db_entry(db_key): mac = nic.get('mac') if mac: yield mac else: continue except AttributeError: raise DbNoLockError
ADDRESS_POOL_FILENAME = os.path.join("/tmp", "address_pool") ADDRESS_POOL_LOCK_FILENAME = ADDRESS_POOL_FILENAME + ".lock"
[docs]def clean_tmp_files(): """ Remove the base address pool filename. """ if os.path.isfile(ADDRESS_POOL_LOCK_FILENAME): os.unlink(ADDRESS_POOL_LOCK_FILENAME) if os.path.isfile(ADDRESS_POOL_FILENAME): os.unlink(ADDRESS_POOL_FILENAME)
[docs]class VirtNet(DbNet, ParamsNet): """ Persistent collection of VM's networking information. """ # __init__ must not presume clean state, it should behave # assuming there is existing properties/data on the instance # and take steps to preserve or update it as appropriate. def __init__(self, params, vm_name, db_key, db_filename=ADDRESS_POOL_FILENAME): """ Load networking info. from db, then from params, then update db. :param params: Params instance using specification above :param vm_name: Name of the VM as might appear in Params :param db_key: database key uniquely identifying VM instance :param db_filename: database file to cache previously parsed params """ # Params always overrides database content DbNet.__init__(self, params, vm_name, db_filename, db_key) ParamsNet.__init__(self, params, vm_name) self.update_db() # Delegating get/setstate() details more to ancestor classes # doesn't play well with multi-inheritence. While possibly # more difficult to maintain, hard-coding important property # names for pickling works. The possibility also remains open # for extensions via style-class updates. def __getstate__(self): state = {'container_items': VMNet.__getstate__(self)} for attrname in ['params', 'vm_name', 'db_key', 'db_filename', 'vm_type', 'driver_type', 'db_lockfile']: state[attrname] = getattr(self, attrname) for style_attr in VMNetStyle(self.vm_type, self.driver_type).keys(): state[style_attr] = getattr(self, style_attr) return state def __setstate__(self, state): for key in state.keys(): if key == 'container_items': continue # handle outside loop setattr(self, key, state.pop(key)) VMNet.__setstate__(self, state.pop('container_items')) def __eq__(self, other): if len(self) != len(other): return False # Order doesn't matter for most OS's as long as MAC & netdst match for nic_name in self.nic_name_list(): if self[nic_name] != other[nic_name]: return False return True def __ne__(self, other): return not self.__eq__(other)
[docs] def mac_index(self): """ Generator for all allocated mac addresses (requires db lock) """ for mac in DbNet.mac_index(self): yield mac for mac in ParamsNet.mac_index(self): yield mac
[docs] def generate_mac_address(self, nic_index_or_name, attempts=1024): """ Set & return valid mac address for nic_index_or_name or raise NetError :param nic_index_or_name: index number or name of NIC :return: MAC address string :raise: NetError if mac generation failed """ nic = self[nic_index_or_name] if nic.has_key('mac'): logging.warning("Overwriting mac %s for nic %s with random" % (nic.mac, str(nic_index_or_name))) self.free_mac_address(nic_index_or_name) attempts_remaining = attempts while attempts_remaining > 0: mac_attempt = nic.complete_mac_address(self.mac_prefix) self.lock_db() if mac_attempt not in self.mac_index(): nic.mac = mac_attempt.lower() self.unlock_db() self.update_db() return self[nic_index_or_name].mac else: attempts_remaining -= 1 self.unlock_db() raise NetError("%s/%s MAC generation failed with prefix %s after %d " "attempts for NIC %s on VM %s (%s)" % ( self.vm_type, self.driver_type, self.mac_prefix, attempts, str(nic_index_or_name), self.vm_name, self.db_key))
[docs] def free_mac_address(self, nic_index_or_name): """ Remove the mac value from nic_index_or_name and cache unless static :param nic_index_or_name: index number or name of NIC """ nic = self[nic_index_or_name] if nic.has_key('mac'): # Reset to params definition if any, or None self.reset_mac(nic_index_or_name) self.update_db()
[docs] def set_mac_address(self, nic_index_or_name, mac): """ Set a MAC address to value specified :param nic_index_or_name: index number or name of NIC :raise: NetError if mac already assigned """ nic = self[nic_index_or_name] if nic.has_key('mac'): logging.warning("Overwriting mac %s for nic %s with %s" % (nic.mac, str(nic_index_or_name), mac)) nic.mac = mac.lower() self.update_db()
[docs] def get_mac_address(self, nic_index_or_name): """ Return a MAC address for nic_index_or_name :param nic_index_or_name: index number or name of NIC :return: MAC address string. """ return self[nic_index_or_name].mac.lower()
[docs] def generate_ifname(self, nic_index_or_name): """ Return and set network interface name """ nic_index = self.nic_name_index(self[nic_index_or_name].nic_name) prefix = "t%d-" % nic_index postfix = utils_misc.generate_random_string(6) # Ensure interface name doesn't excede 11 characters self[nic_index_or_name].ifname = (prefix + postfix)[-11:] self.update_db() return self[nic_index_or_name].ifname
[docs]def parse_arp(): """ Read /proc/net/arp, return a mapping of MAC to IP :return: dict mapping MAC to IP """ ret = {} arp_cache = file('/proc/net/arp').readlines() for line in arp_cache: mac = line.split()[3] ip = line.split()[0] flag = line.split()[2] # Skip the header if mac.count(":") != 5: continue # Skip the incomplete ARP entries if flag == "0x0": continue ret[mac] = ip return ret
[docs]def verify_ip_address_ownership(ip, macs, timeout=60.0): """ Use arping and the ARP cache to make sure a given IP address belongs to one of the given MAC addresses. :param ip: An IP address. :param macs: A list or tuple of MAC addresses. :return: True if ip is assigned to a MAC address in macs. """ def __arping(regex, arping_cmd, ip): # Compile a regex that matches the given IP address and any of the # given MAC addresses from arping output ip_map = parse_arp() for mac in macs: if ip_map.get(mac) == ip: return True o = commands.getoutput(arping_cmd) if not regex.search(o): logging.debug("Verify arping result failed: %s" % o) return False return True # Get the name of the bridge device for ip route cache ip_cmd = utils_misc.find_command("ip") ip_cmd = "%s route get %s; %s route | grep default" % (ip_cmd, ip, ip_cmd) output = commands.getoutput(ip_cmd) devs = re.findall(r"dev\s+\S+", output, re.I) checked_devs = [] if not devs: logging.debug("No dev in route table to %s: %s" % (ip, output)) return False mac_regex = "|".join("(%s)" % mac for mac in macs) regex = re.compile(r"\b%s\b.*\b(%s)\b" % (ip, mac_regex), re.I) arping_bin = utils_misc.find_command("arping") for dev in devs: dev = dev.split()[-1] if dev in checked_devs: continue arping_cmd = "%s -f -c 3 -I %s %s" % (arping_bin, dev, ip) ret = utils_misc.wait_for(lambda: __arping(regex, arping_cmd, ip), timeout=timeout) checked_devs.append(dev) if ret: return bool(ret) return False
[docs]def generate_mac_address_simple(): r = random.SystemRandom() mac = "9a:%02x:%02x:%02x:%02x:%02x" % (r.randint(0x00, 0xff), r.randint(0x00, 0xff), r.randint(0x00, 0xff), r.randint(0x00, 0xff), r.randint(0x00, 0xff)) return mac
[docs]def get_ip_address_by_interface(ifname): """ returns ip address by interface :param ifname - interface name :raise NetError - When failed to fetch IP address (ioctl raised IOError.). Retrieves interface address from socket fd trough ioctl call and transforms it into string from 32-bit packed binary by using socket.inet_ntoa(). """ mysocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: return socket.inet_ntoa(fcntl.ioctl( mysocket.fileno(), arch.SIOCGIFADDR, # ifname to binary IFNAMSIZ == 16 struct.pack('256s', ifname[:15]) )[20:24]) except IOError: raise NetError( "Error while retrieving IP address from interface %s." % ifname)
[docs]def get_host_ip_address(params): """ returns ip address of host specified in host_ip_addr parameter If provided otherwise ip address on interface specified in netdst parameter is returned :param params """ host_ip = params.get('host_ip_addr', None) if not host_ip: host_ip = get_ip_address_by_interface(params.get('netdst')) logging.warning("No IP address of host was provided, using IP address" " on %s interface", str(params.get('netdst'))) return host_ip
[docs]def get_all_ips(): """ Get all IPv4 and IPv6 addresses from all interfaces. """ ips = [] # Get all ipv4 IPs. for iface in get_host_iface(): try: ip_addr = get_ip_address_by_interface(iface) except NetError: pass else: if ip_addr is not None: ip_info = { 'iface': iface, 'addr': ip_addr, 'version': 'ipv4', } ips.append(IPAddress(info=ip_info)) # Get all ipv6 IPs. if_inet6_fp = open('/proc/net/if_inet6', 'r') for line in if_inet6_fp.readlines(): # ipv6_ip, dev_no, len_prefix, scope, iface_flag, iface ipv6_ip, dev_no, _, _, _, iface = line.split() ipv6_ip = ":".join([ipv6_ip[i:i + 4] for i in range(0, 32, 4)]) ip_info = { 'iface': iface, 'addr': ipv6_ip, 'version': 'ipv6', 'scope': int(dev_no, 16) } ips.append(IPAddress(info=ip_info)) if_inet6_fp.close() return ips
[docs]def get_correspond_ip(remote_ip): """ Get local ip address which is used to contact remote ip. :param remote_ip: Remote ip :return: Local corespond IP. """ result = utils.run("ip route get %s" % (remote_ip)).stdout local_ip = re.search("src (.+)", result) if local_ip is not None: local_ip = local_ip.groups()[0] return local_ip
[docs]def get_linux_ifname(session, mac_address=""): """ Get the interface name through the mac address. :param session: session to the virtual machine :param mac_address: the macaddress of nic :raise error.TestError in case it was not possible to determine the interface name. """ def _process_output(cmd, reg_pattern): sys_ifname = ["lo", "sit0"] try: output = session.cmd_output_safe(cmd) ifname_list = re.findall(reg_pattern, output, re.I) if not ifname_list: return None if mac_address: return ifname_list[0] for ifname in sys_ifname: if ifname in ifname_list: ifname_list.remove(ifname) return ifname_list except aexpect.ShellCmdError: return None # Try ifconfig first i = _process_output("ifconfig -a", r"(\w+)\s+Link.*%s" % mac_address) if i is not None: return i # No luck, try ip link i = _process_output("ip link | grep -B1 '%s' -i" % mac_address, r"\d+:\s+(\w+):\s+.*") if i is not None: return i # No luck, look on /sys cmd = r"grep '%s' /sys/class/net/*/address " % mac_address i = _process_output(cmd, r"net/(\w+)/address:%s" % mac_address) if i is not None: return i # If we came empty handed, let's raise an error raise error.TestError("Failed to determine interface name with " "mac %s" % mac_address)
[docs]def update_mac_ip_address(vm, params, timeout=None): """ Get mac and ip address from guest then update the mac pool and address cache :param vm: VM object :param params: Dictionary with the test parameters. """ network_query = params.get("network_query", "ifconfig") restart_network = params.get("restart_network", "service network restart") filter_list = [] mac_ip_filter = params.get("mac_ip_filter") if mac_ip_filter: filter_list.append(mac_ip_filter) else: # Provide a list of patterns for different linux version filter_list.extend([(r"HWaddr (.\w+:\w+:\w+:\w+:\w+:\w+)\s+?" "inet addr:(.\d+\.\d+\.\d+\.\d+)"), (r"inet (.\d+.\d+.\d+.\d+).*?" "ether (.\w+:\w+:\w+:\w+:\w+:\w+)")]) if timeout is None: timeout = int(params.get("login_timeout")) session = vm.wait_for_serial_login(timeout=360) end_time = time.time() + timeout macs_ips = [] num = 0 while time.time() < end_time: try: if num % 3 == 0 and num != 0: # Ignore any errors here for further operation session.cmd(restart_network, ignore_all_errors=True) output = session.cmd_status_output(network_query)[1] for mac_ip_filter in filter_list: macs_ips = re.findall(mac_ip_filter, output, re.S) if macs_ips: break # Get nics number except Exception, err: logging.error(err) nics = params.get("nics") nic_minimum = len(re.split(r"\s+", nics.strip())) if len(macs_ips) == nic_minimum: break num += 1 time.sleep(5) if len(macs_ips) < nic_minimum: logging.error("Not all nics get ip address") for (_ip, mac) in macs_ips: vlan = macs_ips.index((_ip, mac)) # _ip, mac are in different sequence in Fedora and RHEL guest. if re.match(".\d+\.\d+\.\d+\.\d+", mac): _ip, mac = mac, _ip if "-" in mac: mac = mac.replace("-", ".") vm.address_cache[mac.lower()] = _ip vm.virtnet.set_mac_address(vlan, mac)
[docs]def get_windows_nic_attribute(session, key, value, target, timeout=240, global_switch="nic"): """ Get the windows nic attribute using wmic. All the support key you can using wmic to have a check. :param session: session to the virtual machine :param key: the key supported by wmic :param value: the value of the key :param target: which nic attribute you want to get. """ cmd = 'wmic %s where %s="%s" get %s' % (global_switch, key, value, target) o = session.cmd(cmd, timeout=timeout).strip() if not o: err_msg = "Get guest %s attribute %s failed!" % (global_switch, target) raise error.TestError(err_msg) return o.splitlines()[-1]
[docs]def set_win_guest_nic_status(session, connection_id, status, timeout=240): """ Set windows guest nic ENABLED/DISABLED :param session : session to virtual machine :param connection_id : windows guest nic netconnectionid :param status : set nic ENABLED/DISABLED """ cmd = 'netsh interface set interface name="%s" admin=%s' session.cmd(cmd % (connection_id, status), timeout=timeout)
[docs]def disable_windows_guest_network(session, connection_id, timeout=240): return set_win_guest_nic_status(session, connection_id, "DISABLED", timeout)
[docs]def enable_windows_guest_network(session, connection_id, timeout=240): return set_win_guest_nic_status(session, connection_id, "ENABLED", timeout)
[docs]def restart_windows_guest_network(session, connection_id, timeout=240, mode="netsh"): """ Restart guest's network via serial console. mode "netsh" can not works in winxp system :param session: session to virtual machine :param connection_id: windows nic connectionid,it means connection name, you Can get connection id string via wmic """ if mode == "netsh": disable_windows_guest_network(session, connection_id, timeout=timeout) enable_windows_guest_network(session, connection_id, timeout=timeout) elif mode == "devcon": restart_windows_guest_network_by_devcon(session, connection_id)
[docs]def restart_windows_guest_network_by_key(session, key, value, timeout=240, mode="netsh"): """ Restart the guest network by nic Attribute like connectionid, interfaceindex, "netsh" can not work in winxp system. using devcon mode must download devcon.exe and put it under c:\ :param session: session to virtual machine :param key: the key supported by wmic nic :param value: the value of the key :param timeout: timeout :param mode: command mode netsh or devcon """ if mode == "netsh": oper_key = "netconnectionid" elif mode == "devcon": oper_key = "pnpdeviceid" id = get_windows_nic_attribute(session, key, value, oper_key, timeout) if not id: raise error.TestError("Get nic %s failed" % oper_key) if mode == "devcon": id = id.split("&")[-1] restart_windows_guest_network(session, id, timeout, mode)
[docs]def set_guest_network_status_by_devcon(session, status, netdevid, timeout=240): """ using devcon to enable/disable the network device. using it must download the devcon.exe, and put it under c:\ """ set_cmd = r"c:\devcon.exe %s =Net @PCI\*\*%s" % (status, netdevid) session.cmd(set_cmd, timeout=timeout)
[docs]def restart_windows_guest_network_by_devcon(session, netdevid, timeout=240): set_guest_network_status_by_devcon(session, 'disable', netdevid) set_guest_network_status_by_devcon(session, 'enable', netdevid)
[docs]def get_host_iface(): """ List the nic interface in host. :return: a list of the interfaces in host :rtype: list """ proc_net_file = open(PROCFS_NET_PATH, 'r') host_iface_info = proc_net_file.read() proc_net_file.close() return [_.strip() for _ in re.findall("(.*):", host_iface_info)]
[docs]def get_host_default_gateway(): """ Get the Default Gateway in host. :return: a string of the host's default gateway. :rtype: string """ cmd = "ip route | awk '/default/ { print $3 }'" try: output = utils.system_output(cmd) except: raise error.TestError("Failed to get the host's default GateWay.") return output
[docs]def check_listening_port_by_service(service, port, listen_addr='0.0.0.0', runner=None): """ Check TCP/IP listening by service """ cmd = "netstat -tunlp | grep -E '^tcp.*LISTEN.*%s.*'" % service find_netstat_cmd = "which netstat" output = "" find_str = listen_addr + ":" + port try: if not runner: try: os_dep.command("netstat") except ValueError, details: raise error.TestNAError(details) output = utils.system_output(cmd) else: if not runner(find_netstat_cmd): raise error.TestNAError("Missing netstat command on remote") output = runner(cmd) except error.CmdError: logging.error("Failed to run command '%s'", cmd) if not re.search(find_str, output, re.M): raise error.TestFail("Failed to listen %s: %s" % (find_str, output)) logging.info("The listening is active: %s", output)
[docs]def check_listening_port_remote_by_service(server_ip, server_user, server_pwd, service, port, listen_addr): """ Check remote TCP/IP listening by service """ # setup remote session session = None try: session = remote.wait_for_login('ssh', server_ip, '22', server_user, server_pwd, r"[\#\$]\s*$") runner = session.cmd_output check_listening_port_by_service(service, port, listen_addr, runner) except: if session: session.close()