Blob Blame History Raw
#
# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.

# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#

import _io
import os
import sys
import time
import logging
from threading import Condition
try:
    import _thread as thread
except ImportError:
    import thread
try:
    from queue import Queue
except ImportError:
    from Queue import Queue
try:
    import cPickle as pickle
except ImportError:
    import pickle

from syncdutils import Thread, select, lf

pickle_proto = 2
repce_version = 1.0


def ioparse(i, o):
    if isinstance(i, int):
        i = os.fdopen(i, 'rb')
    # rely on duck typing for recognizing
    # streams as that works uniformly
    # in py2 and py3
    if hasattr(o, 'fileno'):
        o = o.fileno()
    return (i, o)


def send(out, *args):
    """pickle args and write out wholly in one syscall

    ie. not use the ability of pickle to dump directly to
    a stream, as that would potentially mess up messages
    by interleaving them
    """
    os.write(out, pickle.dumps(args, pickle_proto))


def recv(inf):
    """load an object from input stream
    python2 and python3 compatibility, inf is sys.stdin
    and is opened as text stream by default. Hence using the
    buffer attribute
    """
    if isinstance(inf, _io.TextIOWrapper):
        return pickle.load(inf.buffer)
    else:
        return pickle.load(inf)


class RepceServer(object):

    """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce

    ... also our homebrewed RPC backend where the transport layer is
    reduced to a pair of filehandles.

    This is the server component.
    """

    def __init__(self, obj, i, o, wnum=6):
        """register a backend object .obj to which incoming messages
           are dispatched, also incoming/outcoming streams
        """
        self.obj = obj
        self.inf, self.out = ioparse(i, o)
        self.wnum = wnum
        self.q = Queue()

    def service_loop(self):
        """fire up worker threads, get messages and dispatch among them"""
        for i in range(self.wnum):
            t = Thread(target=self.worker)
            t.start()
        try:
            while True:
                self.q.put(recv(self.inf))
        except EOFError:
            logging.info("terminating on reaching EOF.")

    def worker(self):
        """life of a worker

        Get message, extract its id, method name and arguments
        (kwargs not supported), call method on .obj.
        Send back message id + return value.
        If method call throws an exception, rescue it, and send
        back the exception as result (with flag marking it as
        exception).
        """
        while True:
            in_data = self.q.get(True)
            rid = in_data[0]
            rmeth = in_data[1]
            exc = False
            if rmeth == '__repce_version__':
                res = repce_version
            else:
                try:
                    res = getattr(self.obj, rmeth)(*in_data[2:])
                except:
                    res = sys.exc_info()[1]
                    exc = True
                    logging.exception("call failed: ")
            send(self.out, rid, exc, res)


class RepceJob(object):

    """class representing message status we can use
    for waiting on reply"""

    def __init__(self, cbk):
        """
        - .rid: (process-wise) unique id
        - .cbk: what we do upon receiving reply
        """
        self.rid = (os.getpid(), thread.get_ident(), time.time())
        self.cbk = cbk
        self.lever = Condition()
        self.done = False

    def __repr__(self):
        return ':'.join([str(x) for x in self.rid])

    def wait(self):
        self.lever.acquire()
        if not self.done:
            self.lever.wait()
        self.lever.release()
        return self.result

    def wakeup(self, data):
        self.result = data
        self.lever.acquire()
        self.done = True
        self.lever.notify()
        self.lever.release()


class RepceClient(object):

    """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce

    ... also our homebrewed RPC backend where the transport layer is
    reduced to a pair of filehandles.

    This is the client component.
    """

    def __init__(self, i, o):
        self.inf, self.out = ioparse(i, o)
        self.jtab = {}
        t = Thread(target=self.listen)
        t.start()

    def listen(self):
        while True:
            select((self.inf,), (), ())
            rid, exc, res = recv(self.inf)
            rjob = self.jtab.pop(rid)
            if rjob.cbk:
                rjob.cbk(rjob, [exc, res])

    def push(self, meth, *args, **kw):
        """wrap arguments in a RepceJob, send them to server
           and return the RepceJob

           @cbk to pass on RepceJob can be given as kwarg.
        """
        cbk = kw.get('cbk')
        if not cbk:
            def cbk(rj, res):
                if res[0]:
                    raise res[1]
        rjob = RepceJob(cbk)
        self.jtab[rjob.rid] = rjob
        logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args)))
        send(self.out, rjob.rid, meth, *args)
        return rjob

    def __call__(self, meth, *args):
        """RePCe client is callabe, calling it implements a synchronous
        remote call.

        We do a .push with a cbk which does a wakeup upon receiving answer,
        then wait on the RepceJob.
        """
        rjob = self.push(
            meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
        exc, res = rjob.wait()
        if exc:
            logging.error(lf('call failed',
                             call=repr(rjob),
                             method=meth,
                             error=str(type(res).__name__)))
            raise res
        logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
        return res

    class mprx(object):

        """method proxy, standard trick to implement rubyesque
        method_missing in Python

        A class is a closure factory, you know what I mean, or go read
        some SICP.
        """

        def __init__(self, ins, meth):
            self.ins = ins
            self.meth = meth

        def __call__(self, *a):
            return self.ins(self.meth, *a)

    def __getattr__(self, meth):
        """this implements transparent method dispatch to remote object,
           so that you don't need to call the RepceClient instance like

             rclient('how_old_are_you_if_born_in', 1979)

           but you can make it into an ordinary method call like

             rclient.how_old_are_you_if_born_in(1979)
        """
        return self.mprx(self, meth)

    def __version__(self):
        """used in handshake to verify compatibility"""
        d = {'proto': self('__repce_version__')}
        try:
            d['object'] = self('version')
        except AttributeError:
            pass
        return d