#!/usr/bin/env python
'''
Created on Dec 6, 2013
:author: jzupka
'''
import errno
import gc
import logging
import os
import random
import select
import shutil
import signal
import stat
import string
import subprocess
import sys
import time
import traceback

import remote_interface
import messenger as ms
def daemonize(pipe_root_path="/tmp"):
    """
    Init daemon.

    Creates the named pipes ``stdin``, ``stdout``, ``stderr``, ``results``
    and ``inputs`` in ``pipe_root_path``. If no process holds the ``stdout``
    pipe open for writing, a new daemon process is forked. Processes which
    hold that pipe open without write access are killed with SIGKILL.

    :param pipe_root_path: path to directory for pipe.
    :return: tuple of six items. In the freshly forked daemon process it is
             ``(1, results_path, inputs_path, stdin_path, stdout_path,
             stderr_path)``. In the calling process it is
             ``(child, inputs_path, results_path, stdin_path, stdout_path,
             stderr_path)`` where ``child`` is 0 when a daemon was started
             and False when a running daemon was found. Note that the
             second and third items are swapped between the two sides.
    :raise OSError: when a pipe can not be created for a reason other than
                    "already exists", or /proc can not be inspected.
    """
    def is_file_open(path):
        """
        Determine process which open file.

        :param path: Path to file.
        :return: [[pid, mode], ... ] where mode is ``st_mode`` of the
                 ``/proc/<pid>/fd/<n>`` symlink which points to ``path``.
        """
        opens = []
        for pid in sorted(os.listdir('/proc')):
            try:
                int(pid)
            except ValueError:
                continue  # Not a process directory.
            fd_dir = os.path.join('/proc', pid, 'fd')
            try:
                fd_names = os.listdir(fd_dir)
            except OSError as e:
                if e.errno == errno.ENOENT:
                    continue  # Process disappeared in the meantime.
                raise
            for fd_name in fd_names:
                fd_link = os.path.join(fd_dir, fd_name)
                try:
                    if os.readlink(fd_link) == path:
                        opens.append([pid, os.lstat(fd_link).st_mode])
                except OSError:
                    continue  # Descriptor was closed in the meantime.
        return opens

    def fork_daemon():
        """
        Run guest as a daemon (double fork).

        :return: 0 in the original process, 1 in the daemon process. The
                 intermediate process leaves through sys.exit(0).
        """
        gc_was_enabled = gc.isenabled()
        # Disable gc to avoid bug where gc -> file_dealloc ->
        # write to stderr -> hang.  http://bugs.python.org/issue1336
        gc.disable()
        try:
            pid = os.fork()
            if gc_was_enabled:
                gc.enable()
            if pid > 0:  # Original process: reap the intermediate child.
                os.waitpid(pid, 0)
                return 0
        except OSError as e:
            sys.stderr.write("Daemonize failed: %s\n" % (e))
            sys.exit(1)

        os.chdir("/")
        os.setsid()
        os.umask(0)

        try:
            pid = os.fork()
            if gc_was_enabled:
                gc.enable()
            if pid > 0:  # Intermediate process exits, daemon is orphaned.
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("Daemonize failed: %s\n" % (e))
            sys.exit(1)

        if gc_was_enabled:
            gc.enable()

        sys.stdout.flush()
        sys.stderr.flush()
        return 1

    stdin_path = os.path.join(pipe_root_path, "stdin")
    stdout_path = os.path.join(pipe_root_path, "stdout")
    stderr_path = os.path.join(pipe_root_path, "stderr")
    results_path = os.path.join(pipe_root_path, "results")
    inputs_path = os.path.join(pipe_root_path, "inputs")

    for f in [stdin_path, stdout_path, stderr_path, results_path,
              inputs_path]:
        try:
            os.mkfifo(f)
        except OSError as e:
            # Reusing an existing pipe is fine; anything else would only
            # fail later, after a useless daemon has been forked.
            if e.errno != errno.EEXIST:
                raise

    # Check who has the stdout pipe open to see if the daemon already runs.
    # The owner-write bit of the /proc fd symlink is used as the marker of a
    # writer. Build two new lists instead of removing items from the list
    # being iterated, which skipped the entry following each writer and
    # let that writer be killed below.
    openers = is_file_open(stdout_path)
    writers = [o for o in openers if o[1] & stat.S_IWUSR]
    readers = [o for o in openers if not o[1] & stat.S_IWUSR]
    rundaemon = bool(writers)
    for opener in readers:
        os.kill(int(opener[0]), signal.SIGKILL)
    time.sleep(0.3)

    # Start the daemon
    child = False
    if not rundaemon:
        child = fork_daemon()

    if child == 0:  # Also true for False, i.e. daemon was already running.
        return (child,
                inputs_path,
                results_path,
                stdin_path,
                stdout_path,
                stderr_path)
    else:
        signal.signal(signal.SIGIO, signal.SIG_DFL)
        return (child,
                results_path,
                inputs_path,
                stdin_path,
                stdout_path,
                stderr_path)
def create_process_cmd():
    """
    Create child process without clean process data thanks that it is possible
    call function and classes from child process.

    Five pipes are created before the fork: two for messages (one per
    direction) and one each for the child's stdin, stdout and stderr.

    :return: in the child ``(0, r_c, w_c, None, None, None)`` where r_c/w_c
             are the child's ends of the message pipes; ``sys.stdin``,
             ``sys.stdout`` and ``sys.stderr`` of the child are replaced by
             unbuffered files on the stdio pipes.
             In the parent ``(pid, r_p, w_p, w_si, r_so, r_se)``: child pid,
             parent's ends of the message pipes, write end of the child's
             stdin and read ends of the child's stdout and stderr.
    :raise OSError: when the fork fails; all pipes are closed again.
    """
    r_c, w_p = os.pipe()
    r_p, w_c = os.pipe()
    r_si, w_si = os.pipe()
    r_so, w_so = os.pipe()
    r_se, w_se = os.pipe()
    gc_was_enabled = gc.isenabled()
    # Disable gc to avoid bug where gc -> file_dealloc ->
    # write to stderr -> hang.  http://bugs.python.org/issue1336
    gc.disable()
    try:
        pid = os.fork()
    except OSError:
        # Do not leave gc switched off or leak ten descriptors when the
        # process could not be created.
        if gc_was_enabled:
            gc.enable()
        for fd in (r_c, w_p, r_p, w_c, r_si, w_si, r_so, w_so, r_se, w_se):
            os.close(fd)
        raise
    if pid == 0:  # Child process
        # Drop the parent's ends of all pipes.
        os.close(r_p)
        os.close(w_p)
        os.close(w_si)
        os.close(r_so)
        os.close(r_se)
        sys.stdin.close()
        sys.stdout.close()
        sys.stderr.close()
        sys.stdin = os.fdopen(r_si, 'r', 0)
        sys.stdout = os.fdopen(w_so, 'w', 0)
        sys.stderr = os.fdopen(w_se, 'w', 0)
        if gc_was_enabled:
            gc.enable()
        return (0, r_c, w_c, None, None, None)
    else:
        # Drop the child's ends of all pipes.
        os.close(r_c)
        os.close(w_c)
        os.close(r_si)
        os.close(w_so)
        os.close(w_se)
        if gc_was_enabled:
            gc.enable()
        return (pid, r_p, w_p, w_si, r_so, r_se)
def gen_tmp_dir(root_path):
    """
    Create a new directory ``runnerXXXX`` in ``root_path``.

    ``XXXX`` are four distinct random ASCII letters. A name which is already
    taken is skipped and another one is tried.

    :param root_path: existing directory in which the directory is created.
    :return: path of the created directory.
    :raise OSError: when the directory can not be created for any reason
                    other than "name already exists".
    """
    while True:
        # ascii_letters instead of the locale dependent string.letters.
        rname = "runner" + "".join(random.sample(string.ascii_letters, 4))
        path = os.path.join(root_path, rname)
        try:
            # mkdir itself is the existence check, so there is no window
            # between testing the name and taking it.
            os.mkdir(path)
        except OSError as e:
            if e.errno != errno.EEXIST:
                # Missing/unwritable root: retrying can not help and the
                # caller must not get None as a path.
                raise
            continue
        return path
def clean_tmp_dir(path):
    """
    Remove directory ``path`` with everything in it.

    Errors during the removal are ignored.

    :param path: directory to delete.
    """
    shutil.rmtree(path, ignore_errors=True)
def sort_fds_event(fds):
    """
    Sort poll results by the kind of event.

    :param fds: list of ``(fd, eventmask)`` pairs.
    :return: ``(hup, read, write)`` - lists of fds with POLLHUP, POLLIN and
             POLLOUT set. One fd can appear in more than one list.
    """
    hup = []
    read = []
    write = []
    for event in fds:
        if event[1] & select.POLLHUP:
            hup.append(event[0])
        if event[1] & select.POLLIN:
            read.append(event[0])
        if event[1] & select.POLLOUT:
            write.append(event[0])
    return hup, read, write
def close_unused_fds(fds):
    """
    Close every file descriptor from ``fds``.

    :param fds: file descriptors which are not needed anymore.
    :type fds: list []
    """
    for descriptor in fds:
        os.close(descriptor)
class CmdFinish(object):
    """
    Class used for communication with child process. The instance carries
    only a pid: pid of the process which created it or, when requested,
    pid of the parent of that process.
    """
    __slots__ = ["pid"]

    def __init__(self, parent=False):
        """
        :param parent: when true store pid of the parent process instead
                       of pid of the current process.
        """
        if parent:
            self.pid = os.getppid()
        else:
            self.pid = os.getpid()
class CmdSlave(object):
    """
    Representation of BaseCmd on slave side.

    The object resolves the callable named by the wrapped BaseCmd and runs
    it in the main process ("manage"), in a forked child (default and
    "async") or in a daemonized process ("nohup"). For the forked variants
    it keeps the descriptors used to talk to the process.
    """

    def __init__(self, baseCmd):
        """
        :param baseCmd: basecmd for encapsulation.
        """
        self.basecmd = baseCmd
        self.cmd_id = baseCmd.cmd_id
        self.obj = None     # callable resolved by parse_func_name
        self.pid = None
        self.r_pipe = None
        self.w_pipe = None
        self.stdin_pipe = None
        self.stdout_pipe = None
        self.stderr_pipe = None
        # The attribute is called "async", which is a reserved word since
        # Python 3.7. The name is kept, it is only accessed through
        # setattr/getattr so that the module can still be parsed.
        setattr(self, "async", False)
        self.nohup = False
        self.manage = False
        self.msg = None     # Messenger over r_pipe/w_pipe

    def close_pipes(self):
        """
        Close command communication pipe.

        Closed descriptors are forgotten (set to None), so calling the
        method again can not close a descriptor number which was reused
        for something else in the meantime.
        """
        for name in ("r_pipe", "w_pipe", "stdin_pipe", "stdout_pipe",
                     "stderr_pipe"):
            fd = getattr(self, name)
            if fd is not None:
                setattr(self, name, None)
                os.close(fd)

    def parse_func_name(self, func_name, commander):
        """
        Parse name sended from master.

        format: ``["manage|async|nohup| ", "fnname1", "fnname2", ...]``

        The first remaining name is looked up as an attribute of commander,
        then in commander.globals, commander.locals and finally in globals
        of this module. Following names are resolved as attributes.

        :param func_name: Function name
        :param commander: Where to execute the command (remote or local)
        :return: resolved object.
        """
        if func_name[0] == "manage":  # start command in main process.
            self.manage = True
            func_name = func_name[1:]
        if func_name[0] == "async":  # start command in new process.
            setattr(self, "async", True)
            func_name = func_name[1:]
        if func_name[0] == "nohup":  # start command in new daemon process.
            self.nohup = True
            func_name = func_name[1:]
        if hasattr(commander, func_name[0]):
            obj = getattr(commander, func_name[0])
        elif func_name[0] in commander.globals:
            obj = commander.globals[func_name[0]]
        elif func_name[0] in commander.locals:
            obj = commander.locals[func_name[0]]
        else:
            obj = globals()[func_name[0]]
        if len(func_name) > 1:
            for name in func_name[1:]:
                obj = getattr(obj, name)
        return obj

    def __call__(self, commander):
        """
        Call command cmd(*args, **kargs)

        :param commander: commander which received the command.
        """
        self.obj = self.parse_func_name(self.basecmd.func, commander)
        if self.manage:     # start command in main process
            self.basecmd.results = self.obj(*self.basecmd.args,
                                            **self.basecmd.kargs)
            self.basecmd._finished = True
            self.finish(commander)
        elif getattr(self, "async"):   # start command in new process
            self.basecmd.results = self.__call_async__(commander)
            self.basecmd._async = True
        elif self.nohup:    # start command in new daemon process
            if self.basecmd.cmd_hash is None:
                self.basecmd.cmd_hash = gen_tmp_dir("/tmp")
            self.basecmd.results = self.__call_nohup__(commander)
            self.basecmd._async = True
        else:  # start command in new process but wait for input.
            self.basecmd.results = self.__call_async__(commander)

    def __call_async__(self, commander):
        """
        Fork a child and run the command in it.

        The child writes a traceback message when the command raises, then
        always the results and CmdFinish, and exits (-1 after an exception,
        0 otherwise). The parent only creates the messenger to the child.
        """
        (self.pid, self.r_pipe, self.w_pipe, self.stdin_pipe,
         self.stdout_pipe, self.stderr_pipe) = create_process_cmd()
        if self.pid == 0:  # Child process make commands
            commander._close_cmds_stdios(self)
            self.msg = ms.Messenger(ms.StdIOWrapperIn(self.r_pipe),
                                    ms.StdIOWrapperOut(self.w_pipe))
            try:
                self.basecmd.results = self.obj(*self.basecmd.args,
                                                **self.basecmd.kargs)
            except Exception:
                err_msg = traceback.format_exc()
                self.msg.write_msg(remote_interface.CmdTraceBack(err_msg))
                sys.exit(-1)
            finally:
                self.msg.write_msg(self.basecmd.results)
                self.msg.write_msg(CmdFinish())
            sys.exit(0)
        else:  # Parent process create communication interface to child process
            self.msg = ms.Messenger(ms.StdIOWrapperIn(self.r_pipe),
                                    ms.StdIOWrapperOut(self.w_pipe))

    def __call_nohup__(self, commander):
        """
        Run the command under a daemonized helper process.

        The helper forks the worker which runs the command and then copies
        data between the worker's pipes and the named pipes in
        basecmd.cmd_hash. The main process opens the other ends of the
        named pipes.
        """
        (pid, self.r_path, self.w_path, self.stdin_path, self.stdout_path,
         self.stderr_path) = daemonize(self.basecmd.cmd_hash)
        if pid == 1:  # Child process make commands
            commander._close_cmds_stdios(self)
            (self.pid, r_pipe, w_pipe, stdin_pipe,
             stdout_pipe, stderr_pipe) = create_process_cmd()
            if self.pid == 0:  # Child process make commands
                self.msg = ms.Messenger(ms.StdIOWrapperIn(r_pipe),
                                        ms.StdIOWrapperOut(w_pipe))
                try:
                    self.basecmd.results = self.obj(*self.basecmd.args,
                                                    **self.basecmd.kargs)
                except Exception:
                    err_msg = traceback.format_exc()
                    self.msg.write_msg(remote_interface.CmdTraceBack(err_msg))
                    sys.exit(-1)
                finally:
                    self.msg.write_msg(self.basecmd.results)
                sys.exit(0)
            else:
                # helper child process open communication pipes.
                # This process is able to manage problem with connection width
                # main parent process. It allows start unchanged child process.
                self.r_pipe = os.open(self.r_path, os.O_RDONLY)
                self.w_pipe = os.open(self.w_path, os.O_WRONLY)
                sys.stdout = os.fdopen(os.open(self.stdout_path, os.O_WRONLY),
                                       "w",
                                       0)
                sys.stderr = os.fdopen(os.open(self.stderr_path, os.O_WRONLY),
                                       "w",
                                       0)
                sys.stdin = os.fdopen(os.open(self.stdin_path, os.O_RDONLY),
                                      "r",
                                      0)

                # Descriptors of the worker and of the main process side.
                w_fds = [r_pipe, w_pipe, stdin_pipe, stdout_pipe, stderr_pipe]
                m_fds = [self.r_pipe,
                         self.w_pipe,
                         sys.stdin.fileno(),
                         sys.stdout.fileno(),
                         sys.stderr.fileno()]
                p = select.poll()
                p.register(r_pipe)
                p.register(w_pipe)
                # p.register(stdin_pipe)
                p.register(stdout_pipe)
                p.register(stderr_pipe)
                p.register(self.r_pipe)
                # p.register(self.w_pipe)
                p.register(sys.stdin.fileno())
                # p.register(sys.stdout.fileno())
                # p.register(sys.stderr.fileno())
                # Where data read from the key descriptor are written to.
                io_map = {r_pipe: self.w_pipe,
                          self.r_pipe: w_pipe,
                          sys.stdin.fileno(): stdin_pipe,
                          stdout_pipe: sys.stdout.fileno(),
                          stderr_pipe: sys.stderr.fileno()}
                while 1:
                    d = p.poll()
                    w_ev = [x for x in d if x[0] in w_fds]
                    m_ev = [x for x in d if x[0] in m_fds]
                    w_hup, w_read, _ = sort_fds_event(w_ev)
                    m_hup, m_read, _ = sort_fds_event(m_ev)
                    if m_hup:
                        time.sleep(0.1)
                    if w_hup:  # child process finished
                        for r in w_read:
                            data = os.read(r, 16384)
                            os.write(io_map[r], data)
                        break
                    for r in w_read:
                        data = os.read(r, 16384)
                        os.write(io_map[r], data)
                    for r in m_read:
                        data = os.read(r, 16384)
                        os.write(io_map[r], data)
                self.msg = ms.Messenger(ms.StdIOWrapperIn(self.r_pipe),
                                        ms.StdIOWrapperOut(self.w_pipe))
                self.msg.write_msg(CmdFinish())
                # sys.exit instead of the exit() helper which is installed
                # by the site module only.
                sys.exit(0)
        else:  # main process open communication named pipes.
            self.w_pipe = os.open(self.w_path, os.O_WRONLY)
            self.r_pipe = os.open(self.r_path, os.O_RDONLY)
            self.stdout_pipe = os.open(self.stdout_path, os.O_RDONLY)
            self.stderr_pipe = os.open(self.stderr_path, os.O_RDONLY)
            self.stdin_pipe = os.open(self.stdin_path, os.O_WRONLY)
            self.msg = ms.Messenger(ms.StdIOWrapperIn(self.r_pipe),
                                    ms.StdIOWrapperOut(self.w_pipe))

    def work(self):
        """
        Wait for message from running child process

        :return: True when the command finished, False when CmdFinish was
                 received but the process is not finished. For any other
                 message the message is stored to basecmd.results and
                 None is returned.
        """
        succ, msg = self.msg.read_msg()
        if isinstance(msg, CmdFinish):
            try:
                pid, _ = os.waitpid(msg.pid, 0)
            except OSError:
                # Process can not be waited for; take it as finished.
                pid = msg.pid
            if (succ is False or pid == msg.pid):
                self.basecmd._finished = True
                return True
            else:
                return False
        else:
            self.basecmd.results = msg

    def recover_paths(self):
        """
        Helper function for reconnect to daemon/nohup process.

        Sets paths of named pipes from directory basecmd.cmd_hash.
        """
        self.stdin_path = os.path.join(self.basecmd.cmd_hash, "stdin")
        self.stdout_path = os.path.join(self.basecmd.cmd_hash, "stdout")
        self.stderr_path = os.path.join(self.basecmd.cmd_hash, "stderr")
        self.w_path = os.path.join(self.basecmd.cmd_hash, "results")
        self.r_path = os.path.join(self.basecmd.cmd_hash, "inputs")

    def recover_fds(self):
        """
        Helper function for reconnect to daemon/nohup process.

        Opens the named pipes again when there is no opened r_pipe.
        """
        if self.r_pipe is None:
            self.recover_paths()
            self.w_pipe = os.open(self.w_path, os.O_WRONLY)
            self.r_pipe = os.open(self.r_path, os.O_RDONLY)
            self.stdin_pipe = os.open(self.stdin_path, os.O_WRONLY)
            self.stdout_pipe = os.open(self.stdout_path, os.O_RDONLY)
            self.stderr_pipe = os.open(self.stderr_path, os.O_RDONLY)
            self.msg = ms.Messenger(ms.StdIOWrapperIn(self.r_pipe),
                                    ms.StdIOWrapperOut(self.w_pipe))

    def finish(self, commander):
        """
        Remove cmd from commander commands on finish of process.

        Pipes are closed and directory basecmd.cmd_hash, if any, is removed.

        :param commander: commander which keeps the command in its cmds.
        """
        self.close_pipes()
        if self.basecmd.cmd_hash:
            clean_tmp_dir(self.basecmd.cmd_hash)
            self.basecmd.cmd_hash = None
        del commander.cmds[self.cmd_id]
class CommanderSlave(ms.Messenger):
    """
    Slave side of the commander. It receives commands from commander master,
    starts them and sends results and outputs of the commands back. Only
    the stdin and stdout streams given to the constructor are used for the
    communication with master.
    """

    def __init__(self, stdin, stdout, o_stdout, o_stderr):
        """
        :param stdin: input stream from master (passed to Messenger).
        :param stdout: output stream to master (passed to Messenger).
        :param o_stdout: descriptor from which own stdout data are read.
        :param o_stderr: descriptor from which own stderr data are read.
        """
        super(CommanderSlave, self).__init__(stdin, stdout)
        self._exit = False
        self.cmds = {}       # cmd_id -> CmdSlave
        self.globals = {}
        self.locals = {}
        self.o_stdout = o_stdout
        self.o_stderr = o_stderr

    def cmd_loop(self):
        """
        Serve master until exit is requested: start received commands and
        forward outputs and results of running commands. Exception which
        escapes the loop is sent to master as CommanderError and the loop
        ends.
        """
        try:
            while not self._exit:
                running = self.cmds.values()
                watched = [self.stdin, self.o_stdout, self.o_stderr]
                watched += [c.r_pipe for c in running
                            if c.r_pipe is not None]
                watched += [c.stdout_pipe for c in running
                            if c.stdout_pipe is not None]
                watched += [c.stderr_pipe for c in running
                            if c.stderr_pipe is not None]

                r = select.select(watched, [], [])[0]

                if self.stdin in r:  # command from controller
                    cmd = CmdSlave(self.read_msg()[1])
                    self.cmds[cmd.cmd_id] = cmd
                    try:
                        cmd(self)
                        self.write_msg(cmd.basecmd)
                    except Exception:
                        self.write_msg(remote_interface.CommanderError(
                            traceback.format_exc()))

                # Output written by the slave process itself.
                if self.o_stdout in r:
                    self.write_msg(remote_interface.StdOut(
                        os.read(self.o_stdout, 16384)))
                if self.o_stderr in r:
                    self.write_msg(remote_interface.StdErr(
                        os.read(self.o_stderr, 16384)))

                # test all commands for io
                outputs = (("stdout_pipe", remote_interface.StdOut),
                           ("stderr_pipe", remote_interface.StdErr))
                for cmd in list(self.cmds.values()):
                    for pipe_name, msg_cls in outputs:
                        fd = getattr(cmd, pipe_name)
                        if fd not in r:
                            continue
                        data = os.read(fd, 16384)
                        if data != "":  # pipe is not closed on another side.
                            self.write_msg(msg_cls(data, cmd.cmd_id))
                        else:
                            os.close(fd)
                            setattr(cmd, pipe_name, None)
                    if cmd.r_pipe in r:  # command results
                        if cmd.work():
                            cmd.finish(self)
                        self.write_msg(cmd.basecmd)
        except Exception:
            self.write_msg(remote_interface.CommanderError(
                traceback.format_exc()))

    def _close_cmds_stdios(self, exclude_cmd):
        """
        Close pipes of all registered commands except exclude_cmd.
        """
        for cmd in self.cmds.values():
            if cmd is exclude_cmd:
                continue
            cmd.close_pipes()
class CommanderSlaveCmds(CommanderSlave):
    """
    Class extends CommanderSlave and adds to them special commands like
    shell process, interactive python, send_msg to cmd.
    """

    def __init__(self, stdin, stdout, o_stdout, o_stderr):
        """
        Block until message "start" is read and answer with "Started".

        Parameters are passed to CommanderSlave unchanged.
        """
        super(CommanderSlaveCmds, self).__init__(stdin, stdout,
                                                 o_stdout, o_stderr)

        while (1):
            succ, data = self.read_msg()
            if succ and data == "start":
                break
        self.write_msg("Started")

    def shell(self, cmd):
        """
        Starts shell process. The process uses current sys.stdin, sys.stdout
        and sys.stderr.

        NOTE(review): cmd is run with shell=True exactly as received.

        :param cmd: Command which should be started.
        :return: return code of cmd.
        """
        process = subprocess.Popen(cmd,
                                   shell=True,
                                   stdin=sys.stdin,
                                   stdout=sys.stdout,
                                   stderr=sys.stderr)

        return process.wait()

    def interactive(self):
        """
        Starts interactive python.

        Lines are read with raw_input() and executed until an empty line
        comes. Exception raised by executed line is printed, not raised.

        NOTE(review): every received line is executed without any check.
        """
        while 1:
            out = raw_input()
            if out == "":
                return
            try:
                # Parenthesized form is the same statement in Python 2 and
                # can be parsed by Python 3 as well.
                exec(out)
            except Exception:
                print("On Guest exception from: \n" + traceback.format_exc())
                print("FAIL: Guest command exception.")

    def send_msg(self, msg, cmd_id):
        """
        Send msg to cmd with id == cmd_id

        :param msg: message passed to cmd over the stdin
        :type msg: str
        :param cmd_id: id of cmd.
        """
        os.write(self.cmds[cmd_id].stdin_pipe, msg)

    def register_cmd(self, basecmd, basecmd_cmd_id):
        """
        Second side of set_commander cmd from master. It register existing
        cmd to CommandSlave dict.

        :param basecmd: cmd which should be added to CommandSlave dict
        :type basecmd: BaseCmd
        :param basecmd_cmd_id: number under which should be stored
        :type basecmd_cmd_id: int
        :return: basecmd
        """
        remote_interface.BaseCmd.single_cmd_id = basecmd_cmd_id
        cmd = CmdSlave(basecmd)
        self.cmds[basecmd.cmd_id] = cmd
        if cmd.basecmd.cmd_hash is not None:
            # Command has directory with named pipes: open them again.
            cmd.recover_fds()
        return basecmd

    def add_function(self, f_code):
        """
        Adds function to client code.

        The code is executed in globals of this module.

        NOTE(review): f_code is executed without any check.

        :param f_code: Code of function.
        :type f_code: str.
        """
        exec(f_code, globals(), globals())

    def copy_file(self, name, path, content):
        """
        Really naive implementation of copying files. Should be used only for
        short files.

        :param name: name of the created file.
        :param path: directory in which the file is created.
        :param content: content written to the file.
        """
        # with closes the file even if write fails.
        with open(os.path.join(path, name), "w") as f:
            f.write(content)

    def import_src(self, name, path=None):
        """
        Import file to running python session.

        The module is stored to globals of this module and to sys.modules.

        :param name: name of the imported module.
        :param path: directory appended to sys.path before the import.
        """
        if path:
            if path not in sys.path:
                sys.path.append(path)
        mod = __import__(name, globals(), locals())
        globals()[name] = mod
        sys.modules[name] = mod

    def exit(self):
        """
        Method for killing command slave.

        :return: "bye"
        """
        self._exit = True
        return "bye"
def remote_agent(in_stream_cls, out_stream_cls):
    """
    Start slave command loop over the original stdin and stdout.

    sys.stdout and sys.stderr are replaced by write ends of new pipes; the
    read ends are given to the commander. One "#" is written to the original
    stdout before the commander is created (presumably a mark for master
    that agent runs - confirm on master side). SystemExit is ignored,
    traceback of any other exception is written to sys.stderr.

    :params in_stream_cls: Class encapsulated input stream.
    :params out_stream_cls: Class encapsulated output stream.
    """
    try:
        master_out_fd = sys.stdout.fileno()
        sys.stderr.fileno()  # Value is not used, only the call is kept.
        master_in_fd = sys.stdin.fileno()

        out_read, out_write = os.pipe()
        err_read, err_write = os.pipe()
        sys.stdout = os.fdopen(out_write, 'w', 0)
        sys.stderr = os.fdopen(err_write, 'w', 0)

        os.write(master_out_fd, "#")

        logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

        to_master = out_stream_cls(master_out_fd)
        from_master = in_stream_cls(master_in_fd)

        commander = CommanderSlaveCmds(from_master, to_master,
                                       out_read, err_read)
        commander.cmd_loop()
    except SystemExit:
        pass
    except:
        sys.stderr.write(traceback.format_exc())
if __name__ == '__main__':
    # The first argument selects the agent variant; anything else (or no
    # argument) does nothing.
    mode = sys.argv[1] if len(sys.argv) > 1 else None
    if mode == "agent":
        remote_agent(ms.StdIOWrapperIn, ms.StdIOWrapperOut)
    elif mode == "agent_base64":
        remote_agent(ms.StdIOWrapperInBase64, ms.StdIOWrapperOutBase64)