Source code for virttest.utils_gdb

import os
import logging
import signal
import threading
import aexpect
import Queue
from virttest import utils_misc


[docs]class GDBError(Exception): """ General module exception class """ pass
[docs]class GDBCmdError(GDBError): """ Exception raised when calling an gdb command. """ def __init__(self, command, msg): self.command = command self.msg = msg def __str__(self): return 'Error when calling GDB command %s:\n%s' % ( self.command, self.msg)
def _split_result_str(result_str): """ Helper function to split a string into a list of (key, value) tuples while minding braces matching. """ result = [] key = '' value = '' current = 'key' braces = [] for ch in result_str: if ch == '=' and not braces: if current == 'key': current = 'value' elif ch == ',' and not braces: result.append((key, value)) key = '' value = '' if current == 'value': current = 'key' else: if ch in ['{', '[']: braces.append(ch) elif ch == ']': if braces[-1] == '[': braces.pop() elif ch == '}': if braces[-1] == '{': braces.pop() if current == 'key': key += ch elif current == 'value': value += ch if not braces: result.append((key, value)) return result def _parse_result(result_str): """ Helper function to parse a GDB/MI result string into Python friendly data collections like lists and dicts. """ if result_str.startswith('"') and result_str.endswith('"'): return result_str[1:-1] result = {} if result_str.startswith('[') and result_str.endswith(']'): result_str = result_str[1:-1] result = [] elif result_str.startswith('{') and result_str.endswith('}'): result_str = result_str[1:-1] if not result_str: return result for key, value_str in _split_result_str(result_str): value = _parse_result(value_str) if type(result) == list: result.append((key, value)) elif type(result) == dict: result[key] = value return result
[docs]class GDB(aexpect.Expect): """ Class to manipulate a inferior process in gdb. """ def __init__(self, command=None): self.running = False self.terminated = False self.exiting = False self.cmd_lock = threading.Lock() self.inferior_command = command self.output_queue = Queue.Queue() self.cmd_queue = Queue.Queue() self.cmd_result_queue = Queue.Queue() self.notify_async_output = [] self.exec_async_output = [] self.status_async_output = [] self.log_stream = [] self.console_stream = [] self.target_stream = [] self.callback_threads = [] self.prompt = '(gdb)' self.pid = None dummy_cb = (lambda self, info, params: None, None) self.callbacks = { "stop": dummy_cb, "start": dummy_cb, "termination": dummy_cb, "break": dummy_cb, "signal": dummy_cb, } self.thread_groups = {} command = 'gdb --quiet --interpreter=mi %s' % command aexpect.Expect.__init__( self, command=command, ) self.log_polling_thread = threading.Thread(target=self._poll_log) self.log_polling_thread.start() self._read_output_until_prompt(timeout=3) def _parse_notify_async_line(self, line): """ Parse a GDB/MI async notify line and change state accordingly. :param line: GDB/MI line to be parsed. """ event, info_str = line.lstrip('=').split(',', 1) info = _parse_result(info_str) if event == 'thread-group-added': self.thread_groups[info['id']] = { 'status': 'stopped', 'pid': None, 'threads': set() } self.current_thread_group = info['id'] elif event == 'thread-group-started': pid = int(info['pid']) t_group = self.thread_groups[info['id']] t_group['status'] = 'running' t_group['pid'] = self.pid = pid elif event == 'thread-group-exited': t_group = self.thread_groups[info['id']] t_group['status'] = 'stopped' t_group['pid'] = self.pid = None elif event == 'thread-created': t_group = self.thread_groups[info['group-id']] t_id = int(info['id']) t_group['threads'].add(t_id) elif event == 'thread-exited': t_group = self.thread_groups[info['group-id']] t_id = int(info['id']) t_group['threads'].remove(t_id) elif event in ['library-loaded', 'library-unloaded']: pass else: logging.warning('Unprocessed gdb async notification:\n%s', line) def _parse_exec_async_line(self, line): """ Parse a GDB/MI async exec line and change state accordingly. :param line: GDB/MI line to be parsed. """ event, info_str = line.split(',', 1) info = _parse_result(info_str) if event == '*stopped': self.running = False self._callback('stop', info) if info: if 'reason' in info: if info['reason'] == 'breakpoint-hit': self._callback('break', info) if info['reason'] == 'signal-received': self._callback('signal', info) if info['reason'] in ['exited', 'exited-normally', 'exited-signalled']: self.terminated = True self._callback('termination', info) else: for key in info: logging.warning('Stopped without reason') logging.warning('%s: %s', key, info[key]) else: self._callback('termination', info) if event == '*running': if not self.running: self.running = True self._callback('start', info) def _parse_status_async_line(self, line): """ Parse a GDB/MI async status line and change state accordingly. :param line: GDB/MI line to be parsed. """ pass def _parse_cmd_result_line(self, line): """ Parse a GDB/MI command result line and change state accordingly. :param line: GDB/MI line to be parsed. """ command = self.cmd_queue.get() result = {'command': command} res = line.lstrip('^').split(',', 1) result['status'] = res[0] if len(res) == 2: result['info'] = _parse_result(res[1]) elif len(res) == 1: result['info'] = {} self.cmd_result_queue.put(result) def _read_output_until_prompt(self, timeout=300): """ Read GDB/MI output until a command prompt reached. :param timeout: Max time for the reading. """ result = [] while True: line = self.output_queue.get(timeout=timeout) result.append(line) if line.startswith('(gdb)'): return result def _poll_log(self): """ Read GDB/MI output continuously and parse lines according to its inital until `exit` variable is set. """ while True: res = self.read_nonblocking() if res.strip(): for line in res.splitlines(): if line.startswith('='): self._parse_notify_async_line(line) elif line.startswith('*'): self._parse_exec_async_line(line) elif line.startswith('+'): self._parse_status_async_line(line) elif line.startswith('^'): self._parse_cmd_result_line(line) else: self.output_queue.put(line) if self.exiting: break def _callback(self, callback_type, info): """ General callback function to call specific type of callback funtions. :param callback_type: Could be one of "stop", "start", "termination", "break" or "signal" """ callback_func, params = self.callbacks[callback_type] logging.debug('gdb is Calling back %s' % callback_type) thread = threading.Thread( target=callback_func, args=(self, info, params), name=callback_type ) thread.start() self.callback_threads.append(thread)
[docs] def set_callback(self, callback_type, func, params=None): """ Set a callback function to a customized function. :param callback_type: Could be one of "stop", "start", "termination", "break" or "signal" :param func: Function to be set as callback :param params: Parameters to be passed to callback function """ self.callbacks[callback_type] = (func, params)
[docs] def stop(self): """ Stop inferior by sending a SIGINT signal. """ if self.running: self.send_signal('SIGINT') self.wait_for_stop()
[docs] def kill(self): """ Kill inferior by sending a SIGTERM signal. """ def temp_callback(gdb, info, params): if info['signal-name'] == 'SIGTERM': self.cont() if self.running: stop_cb, stop_cb_params = self.callbacks['signal'] self.set_callback('signal', temp_callback) self.send_signal('SIGTERM') self.wait_for_termination() self.set_callback('signal', stop_cb, stop_cb_params)
[docs] def cont(self): """ Continue a stopped inferior. """ self.cmd('-exec-continue') if not self.running and not self.terminated: self.wait_for_start()
[docs] def insert_break(self, break_func): """ Insert a function breakpoint. :param break_func: Function at which breakpoint inserted """ return self.cmd('-break-insert -f %s' % break_func)
[docs] def back_trace(self): """ Get current backtrace stack as a list of lines. """ result = self.cmd('-stack-list-frames') bts = [] for line in result['info']['stack']: bt = line[1] bt_line = "#%2s" % bt['level'] if 'addr' in bt: bt_line += " %s" % bt['addr'] if 'func' in bt: bt_line += " %-30s" % bt['func'] if 'fullname' in bt: bt_line += " %s:%s" % (bt['fullname'], bt['line']) elif 'from' in bt: bt_line += " %s" % bt['from'] bts.append(bt_line) return bts
[docs] def run(self, arg_str=''): """ Start the inferior with an optional argument string. :param arg_str: Argument the inferior to be called with """ if not self.running: cmd_line = '-exec-run ' + arg_str result = self.cmd(cmd_line) self.wait_for_start() self.terminated = False return result
[docs] def send_signal(self, signal_name): """ Send a signal to the inferior. :param signal_name: Signal name as a string or integer """ if type(signal_name) is str: signal_name = getattr(signal, signal_name) if type(signal_name) == int: os.kill(self.pid, signal_name) else: raise ValueError("Signal should be a string or an integer.")
[docs] def cmd(self, command, cont=True): """ Call a gdb of GDB/MI command. :param command: Command line to be called :param cont: Whether continue the inferior after calling the command """ was_running = False self.cmd_lock.acquire() try: if self.running: was_running = True self.stop() self.cmd_queue.put(command) self.sendline(command) result = self.cmd_result_queue.get() if result['command'] != command: raise GDBCmdError('Mismatch command result line got %s but ' 'expecting %s' % (result['command'], command)) if result['status'] == 'exit': self.exiting = True elif result['status'] == 'error': raise GDBCmdError(command, result['info']['msg']) elif result['status'] == 'running': pass elif result['status'] == 'connected': pass finally: self.cmd_lock.release() if was_running and cont: self.cont() return result
[docs] def wait_for_start(self, timeout=60): """ Wait the inferior to start. :param timeout: Max time to wait """ logging.debug("Waiting for gdb inferior %s to start" % self.inferior_command) return utils_misc.wait_for( lambda: self.running, timeout, step=0.1, )
[docs] def wait_for_stop(self, timeout=60): """ Wait the inferior to be stopped. :param timeout: Max time to wait """ logging.debug("Waiting for gdb inferior %s to stop" % self.inferior_command) res = utils_misc.wait_for( lambda: not self.running, timeout, step=0.1, ) return res
[docs] def wait_for_termination(self, timeout=60): """ Wait the gdb session to be exited. :param timeout: Max time to wait """ logging.debug("Waiting for gdb to terminate") return utils_misc.wait_for( lambda: self.terminated, timeout, step=0.1, )
[docs] def exit(self): """ Exit the gdb session. """ self.cmd('-gdb-exit', cont=False) self.log_polling_thread.join() for thread in self.callback_threads: thread.join()