#!/usr/bin/env python
'''
Created on Dec 6, 2013
:author: jzupka
'''
import os
import logging
import select
import cPickle
import time
import remote_interface
import cStringIO
import base64
[docs]class IOWrapper(object):
"""
Class encaptulates io opearation to be more consist in different
implementations. (stdio, sockets, etc..)
"""
def __init__(self, obj):
"""
:param obj: IO obj for example file decriptor.
"""
self._obj = obj
[docs] def close(self):
raise NotImplementedError()
[docs] def read(self, max_len, timeout=None):
"""
Read function should be reinmplemented as blocking reading from data
source when timeout is None and nonblocking for timeout is not None.
Implementation example StdIWrapper.
:params max_len: Max len of readed data.
:type max_len: int
:param timeout: Timeout of reading operation.
:type timeout: float
:return: Readed data.
"""
raise NotImplementedError()
[docs] def write(self, data):
"""
Write funciton should be implemented for object uded for writing.
:param data: Data to write.
:type data: str.
"""
raise NotImplementedError()
[docs] def fileno(self):
"""
Function should return file descriptor number. If object should be used
for standard io operation.
:return: File number.
"""
raise NotImplementedError()
def _wait_for_data(self, max_len, timeout):
"""
Wait for data for time == timeout.
:params max_len: Max len of readed data.
:type max_len: int
:param timeout: Timeout of reading operation.
:type timeout: float
:return: Readed data.
"""
r, _, _ = select.select([self.fileno()], [], [], timeout)
if r:
return self.read(max_len, None)
return None
[docs]class DataWrapper(object):
"""
Basic implementation of IOWrapper for stdio.
"""
[docs] def decode(self, data):
"""
Decodes the data which was read.
:return: decoded data.
"""
return data
[docs] def encode(self, data):
"""
Encode data.
:return: encoded data.
"""
return data
[docs]class DataWrapperBase64(DataWrapper):
"""
Basic implementation of IOWrapper for stdio.
"""
[docs] def decode(self, data):
return base64.b64decode(data)
[docs] def encode(self, data):
return base64.b64encode(data)
[docs]class StdIOWrapper(IOWrapper, DataWrapper):
"""
Basic implementation of IOWrapper for stdio.
"""
[docs] def close(self):
os.close(self._obj)
[docs] def fileno(self):
return self._obj
[docs]class StdIOWrapperIn(StdIOWrapper):
"""
Basic implementation of IOWrapper for stdin
"""
[docs] def read(self, max_len, timeout=None):
if timeout is not None:
return self._wait_for_data(max_len, timeout)
else:
return os.read(self._obj, max_len)
[docs]class StdIOWrapperOut(StdIOWrapper):
"""
Basic implementation of IOWrapper for stdout
"""
[docs] def write(self, data):
os.write(self._obj, data)
[docs]class StdIOWrapperInBase64(StdIOWrapperIn, DataWrapperBase64):
"""
Basic implementation of IOWrapper for stdin
"""
[docs]class StdIOWrapperOutBase64(StdIOWrapperOut, DataWrapperBase64):
"""
Basic implementation of IOWrapper for stdout
"""
[docs]class MessengerError(Exception):
def __init__(self, msg):
super(MessengerError, self).__init__(msg)
self.msg = msg
def __str__(self):
return "Messenger ERROR %s" % (self.msg)
def _map_path(mod_name, kls_name):
if mod_name.endswith('remote_interface'): # catch all old module names
mod = remote_interface
return getattr(mod, kls_name)
else:
mod = __import__(mod_name)
return getattr(mod, kls_name)
[docs]class Messenger(object):
"""
Class could be used for communication between two python process connected
by communication canal wrapped by IOWrapper class. Pickling is used
for communication and thus it is possible to communicate every picleable
object.
"""
def __init__(self, stdin, stdout):
"""
:params stdin: Object for read data from communication interface.
:type stdin: IOWrapper
:params stdout: Object for write data to communication interface.
:type stdout: IOWrapper
"""
self.stdin = stdin
self.stdout = stdout
# Unfortunately only static length of data length is supported.
self.enc_len_length = len(stdout.encode("0" * 10))
[docs] def close(self):
self.stdin.close()
self.stdout.close()
[docs] def flush_stdin(self):
"""
Flush all input data from communication interface.
"""
const = 16384
r, _, _ = select.select([self.stdin.fileno()], [], [], 1)
while r:
if len(self.stdin.read(const)) < const:
break
r, _, _ = select.select([self.stdin.fileno()], [], [], 1)
[docs] def write_msg(self, data):
"""
Write formated message to communication interface.
"""
self.stdout.write(self.format_msg(data))
def _read_until_len(self, timeout=None):
"""
Deal with terminal interfaces... Read input until gets string
contains " " and digits len(string) == 10
:param timeout: timeout of reading.
"""
data = ""
endtime = None
if timeout is not None:
endtime = time.time() + timeout
while (len(data) < self.enc_len_length and
(endtime is None or time.time() < endtime)):
d = self.stdin.read(1, timeout)
if d is None:
return None
if len(d) == 0:
return d
data += d
if len(data) < self.enc_len_length:
return None
return self.stdout.decode(data)
[docs] def read_msg(self, timeout=None):
"""
Read data from com interface.
:param timeout: timeout for reading data.
:type timeout: float
:return: (True, data) when reading is successful.
(False, None) when other side is closed.
(None, None) when reading is timeouted.
"""
data = self._read_until_len(timeout)
if data is None:
return (None, None)
if len(data) == 0:
return (False, None)
rdata = None
try:
cmd_len = int(data)
rdata = ""
rdata_len = 0
while (rdata_len < cmd_len):
rdata += self.stdin.read(cmd_len - rdata_len)
rdata_len = len(rdata)
rdataIO = cStringIO.StringIO(self.stdin.decode(rdata))
unp = cPickle.Unpickler(rdataIO)
unp.find_global = _map_path
data = unp.load()
except Exception, e:
logging.error("ERROR data:%s rdata:%s" % (data, rdata))
try:
self.write_msg(remote_interface.MessengerError("Communication "
"failed.%s" % (e)))
except OSError:
pass
self.flush_stdin()
raise
# Debugging commands.
# if (isinstance(data, remote_interface.BaseCmd)):
# print data.func
return (True, data)