diff options
-rw-r--r-- | ryu/contrib/tinyrpc/__init__.py | 6 | ||||
-rw-r--r-- | ryu/contrib/tinyrpc/client.py | 91 | ||||
-rw-r--r-- | ryu/contrib/tinyrpc/dispatch/__init__.py | 201 | ||||
-rw-r--r-- | ryu/contrib/tinyrpc/exc.py | 40 | ||||
-rw-r--r-- | ryu/contrib/tinyrpc/protocols/__init__.py | 173 | ||||
-rw-r--r-- | ryu/contrib/tinyrpc/protocols/jsonrpc.py | 291 | ||||
-rw-r--r-- | ryu/contrib/tinyrpc/server/__init__.py | 71 | ||||
-rw-r--r-- | ryu/contrib/tinyrpc/server/gevent.py | 13 | ||||
-rw-r--r-- | ryu/contrib/tinyrpc/transports/INTEGRATE_ME.py | 115 | ||||
-rw-r--r-- | ryu/contrib/tinyrpc/transports/__init__.py | 52 | ||||
-rw-r--r-- | ryu/contrib/tinyrpc/transports/http.py | 31 | ||||
-rw-r--r-- | ryu/contrib/tinyrpc/transports/tcp.py | 52 | ||||
-rw-r--r-- | ryu/contrib/tinyrpc/transports/wsgi.py | 90 | ||||
-rw-r--r-- | ryu/contrib/tinyrpc/transports/zmq.py | 76 | ||||
-rw-r--r-- | tools/pip-requires | 1 |
15 files changed, 1302 insertions, 1 deletions
diff --git a/ryu/contrib/tinyrpc/__init__.py b/ryu/contrib/tinyrpc/__init__.py new file mode 100644 index 00000000..f24deb2e --- /dev/null +++ b/ryu/contrib/tinyrpc/__init__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from .protocols import * +from .exc import * +from .client import * diff --git a/ryu/contrib/tinyrpc/client.py b/ryu/contrib/tinyrpc/client.py new file mode 100644 index 00000000..0d77547f --- /dev/null +++ b/ryu/contrib/tinyrpc/client.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from .exc import RPCError + + +class RPCClient(object): + """Client for making RPC calls to connected servers. + + :param protocol: An :py:class:`~tinyrpc.RPCProtocol` instance. + :param transport: A :py:class:`~tinyrpc.transports.ClientTransport` + instance. + """ + + def __init__(self, protocol, transport): + self.protocol = protocol + self.transport = transport + + def _send_and_handle_reply(self, req): + # sends and waits for reply + reply = self.transport.send_message(req.serialize()) + + response = self.protocol.parse_reply(reply) + + if hasattr(response, 'error'): + raise RPCError('Error calling remote procedure: %s' %\ + response.error) + + return response + + def call(self, method, args, kwargs, one_way=False): + """Calls the requested method and returns the result. + + If an error occured, an :py:class:`~tinyrpc.exc.RPCError` instance + is raised. + + :param method: Name of the method to call. + :param args: Arguments to pass to the method. + :param kwargs: Keyword arguments to pass to the method. + :param one_way: Whether or not a reply is desired. + """ + req = self.protocol.create_request(method, args, kwargs, one_way) + + return self._send_and_handle_reply(req).result + + def get_proxy(self, prefix='', one_way=False): + """Convenience method for creating a proxy. + + :param prefix: Passed on to :py:class:`~tinyrpc.client.RPCProxy`. + :param one_way: Passed on to :py:class:`~tinyrpc.client.RPCProxy`. + :return: :py:class:`~tinyrpc.client.RPCProxy` instance.""" + return RPCProxy(self, prefix, one_way) + + def batch_call(self, calls): + """Experimental, use at your own peril.""" + req = self.protocol.create_batch_request() + + for call_args in calls: + req.append(self.protocol.create_request(*call_args)) + + return self._send_and_handle_reply(req) + + +class RPCProxy(object): + """Create a new remote proxy object. + + Proxies allow calling of methods through a simpler interface. See the + documentation for an example. + + :param client: An :py:class:`~tinyrpc.client.RPCClient` instance. + :param prefix: Prefix to prepend to every method name. + :param one_way: Passed to every call of + :py:func:`~tinyrpc.client.call`. + """ + + def __init__(self, client, prefix='', one_way=False): + self.client = client + self.prefix = prefix + self.one_way = one_way + + def __getattr__(self, name): + """Returns a proxy function that, when called, will call a function + name ``name`` on the client associated with the proxy. + """ + proxy_func = lambda *args, **kwargs: self.client.call( + self.prefix + name, + args, + kwargs, + one_way=self.one_way + ) + return proxy_func diff --git a/ryu/contrib/tinyrpc/dispatch/__init__.py b/ryu/contrib/tinyrpc/dispatch/__init__.py new file mode 100644 index 00000000..ec722e4c --- /dev/null +++ b/ryu/contrib/tinyrpc/dispatch/__init__.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import inspect + +from ..exc import * + + +def public(name=None): + """Set RPC name on function. + + This function decorator will set the ``_rpc_public_name`` attribute on a + function, causing it to be picked up if an instance of its parent class is + registered using + :py:func:`~tinyrpc.dispatch.RPCDispatcher.register_instance`. + + ``@public`` is a shortcut for ``@public()``. + + :param name: The name to register the function with. + """ + # called directly with function + if callable(name): + f = name + f._rpc_public_name = f.__name__ + return f + + def _(f): + f._rpc_public_name = name or f.__name__ + return f + + return _ + + +class RPCDispatcher(object): + """Stores name-to-method mappings.""" + + def __init__(self): + self.method_map = {} + self.subdispatchers = {} + + def add_subdispatch(self, dispatcher, prefix=''): + """Adds a subdispatcher, possibly in its own namespace. + + :param dispatcher: The dispatcher to add as a subdispatcher. + :param prefix: A prefix. All of the new subdispatchers methods will be + available as prefix + their original name. + """ + self.subdispatchers.setdefault(prefix, []).append(dispatcher) + + def add_method(self, f, name=None): + """Add a method to the dispatcher. + + :param f: Callable to be added. + :param name: Name to register it with. If ``None``, ``f.__name__`` will + be used. + """ + assert callable(f), "method argument must be callable" + # catches a few programming errors that are + # commonly silently swallowed otherwise + if not name: + name = f.__name__ + + if name in self.method_map: + raise RPCError('Name %s already registered') + + self.method_map[name] = f + + def dispatch(self, request): + """Fully handle request. + + The dispatch method determines which method to call, calls it and + returns a response containing a result. + + No exceptions will be thrown, rather, every exception will be turned + into a response using :py:func:`~tinyrpc.RPCRequest.error_respond`. + + If a method isn't found, a :py:exc:`~tinyrpc.exc.MethodNotFoundError` + response will be returned. If any error occurs outside of the requested + method, a :py:exc:`~tinyrpc.exc.ServerError` without any error + information will be returend. + + If the method is found and called but throws an exception, the + exception thrown is used as a response instead. This is the only case + in which information from the exception is possibly propagated back to + the client, as the exception is part of the requested method. + + :py:class:`~tinyrpc.RPCBatchRequest` instances are handled by handling + all its children in order and collecting the results, then returning an + :py:class:`~tinyrpc.RPCBatchResponse` with the results. + + :param request: An :py:func:`~tinyrpc.RPCRequest`. + :return: An :py:func:`~tinyrpc.RPCResponse`. + """ + if hasattr(request, 'create_batch_response'): + results = [self._dispatch(req) for req in request] + + response = request.create_batch_response() + if response != None: + response.extend(results) + + return response + else: + return self._dispatch(request) + + def _dispatch(self, request): + try: + try: + method = self.get_method(request.method) + except KeyError as e: + return request.error_respond(MethodNotFoundError(e)) + + # we found the method + try: + result = method(*request.args, **request.kwargs) + except Exception as e: + # an error occured within the method, return it + return request.error_respond(e) + + # respond with result + return request.respond(result) + except Exception as e: + # unexpected error, do not let client know what happened + return request.error_respond(ServerError()) + + def get_method(self, name): + """Retrieve a previously registered method. + + Checks if a method matching ``name`` has been registered. + + If :py:func:`get_method` cannot find a method, every subdispatcher + with a prefix matching the method name is checked as well. + + If a method isn't found, a :py:class:`KeyError` is thrown. + + :param name: Callable to find. + :param return: The callable. + """ + if name in self.method_map: + return self.method_map[name] + + for prefix, subdispatchers in self.subdispatchers.iteritems(): + if name.startswith(prefix): + for sd in subdispatchers: + try: + return sd.get_method(name[len(prefix):]) + except KeyError: + pass + + raise KeyError(name) + + def public(self, name=None): + """Convenient decorator. + + Allows easy registering of functions to this dispatcher. Example: + + .. code-block:: python + + dispatch = RPCDispatcher() + + @dispatch.public + def foo(bar): + # ... + + class Baz(object): + def not_exposed(self): + # ... + + @dispatch.public(name='do_something') + def visible_method(arg1) + # ... + + :param name: Name to register callable with + """ + if callable(name): + self.add_method(name) + return name + + def _(f): + self.add_method(f, name=name) + return f + + return _ + + def register_instance(self, obj, prefix=''): + """Create new subdispatcher and register all public object methods on + it. + + To be used in conjunction with the :py:func:`tinyrpc.dispatch.public` + decorator (*not* :py:func:`tinyrpc.dispatch.RPCDispatcher.public`). + + :param obj: The object whose public methods should be made available. + :param prefix: A prefix for the new subdispatcher. + """ + dispatch = self.__class__() + for name, f in inspect.getmembers( + obj, lambda f: callable(f) and hasattr(f, '_rpc_public_name') + ): + dispatch.add_method(f, f._rpc_public_name) + + # add to dispatchers + self.add_subdispatch(dispatch, prefix) diff --git a/ryu/contrib/tinyrpc/exc.py b/ryu/contrib/tinyrpc/exc.py new file mode 100644 index 00000000..0c57284f --- /dev/null +++ b/ryu/contrib/tinyrpc/exc.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +class RPCError(Exception): + """Base class for all excetions thrown by :py:mod:`tinyrpc`.""" + + +class BadRequestError(RPCError): + """Base class for all errors that caused the processing of a request to + abort before a request object could be instantiated.""" + + def error_respond(self): + """Create :py:class:`~tinyrpc.RPCErrorResponse` to respond the error. + + :return: A error responce instance or ``None``, if the protocol decides + to drop the error silently.""" + raise RuntimeError('Not implemented') + + +class BadReplyError(RPCError): + """Base class for all errors that caused processing of a reply to abort + before it could be turned in a response object.""" + + +class InvalidRequestError(BadRequestError): + """A request made was malformed (i.e. violated the specification) and could + not be parsed.""" + + +class InvalidReplyError(BadReplyError): + """A reply received was malformed (i.e. violated the specification) and + could not be parsed into a response.""" + + +class MethodNotFoundError(RPCError): + """The desired method was not found.""" + + +class ServerError(RPCError): + """An internal error in the RPC system occured.""" diff --git a/ryu/contrib/tinyrpc/protocols/__init__.py b/ryu/contrib/tinyrpc/protocols/__init__.py new file mode 100644 index 00000000..9ad55b9e --- /dev/null +++ b/ryu/contrib/tinyrpc/protocols/__init__.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python + +from ..exc import * + +class RPCRequest(object): + unique_id = None + """A unique ID to remember the request by. Protocol specific, may or + may not be set. This value should only be set by + :py:func:`~tinyrpc.RPCProtocol.create_request`. + + The ID allows client to receive responses out-of-order and still allocate + them to the correct request. + + Only supported if the parent protocol has + :py:attr:`~tinyrpc.RPCProtocol.supports_out_of_order` set to ``True``. + """ + + method = None + """The name of the method to be called.""" + + args = [] + """The positional arguments of the method call.""" + + kwargs = {} + """The keyword arguments of the method call.""" + + def error_respond(self, error): + """Creates an error response. + + Create a response indicating that the request was parsed correctly, + but an error has occured trying to fulfill it. + + :param error: An exception or a string describing the error. + + :return: A response or ``None`` to indicate that no error should be sent + out. + """ + raise NotImplementedError() + + def respond(self, result): + """Create a response. + + Call this to return the result of a successful method invocation. + + This creates and returns an instance of a protocol-specific subclass of + :py:class:`~tinyrpc.RPCResponse`. + + :param result: Passed on to new response instance. + + :return: A response or ``None`` to indicate this request does not expect a + response. + """ + raise NotImplementedError() + + def serialize(self): + """Returns a serialization of the request. + + :return: A string to be passed on to a transport. + """ + raise NotImplementedError() + + +class RPCBatchRequest(list): + """Multiple requests batched together. + + A batch request is a subclass of :py:class:`list`. Protocols that support + multiple requests in a single message use this to group them together. + + Handling a batch requests is done in any order, responses must be gathered + in a batch response and be in the same order as their respective requests. + + Any item of a batch request is either a request or a subclass of + :py:class:`~tinyrpc.BadRequestError`, which indicates that there has been + an error in parsing the request. + """ + + def create_batch_response(self): + """Creates a response suitable for responding to this request. + + :return: An :py:class:`~tinyrpc.RPCBatchResponse` or ``None``, if no + response is expected.""" + raise NotImplementedError() + + def serialize(self): + raise NotImplementedError() + + +class RPCResponse(object): + """RPC call response class. + + Base class for all deriving responses. + + Has an attribute ``result`` containing the result of the RPC call, unless + an error occured, in which case an attribute ``error`` will contain the + error message.""" + + unique_id = None + + def serialize(self): + """Returns a serialization of the response. + + :return: A reply to be passed on to a transport. + """ + raise NotImplementedError() + + +class RPCErrorResponse(RPCResponse): + pass + + +class RPCBatchResponse(list): + """Multiple response from a batch request. See + :py:class:`~tinyrpc.RPCBatchRequest` on how to handle. + + Items in a batch response need to be + :py:class:`~tinyrpc.RPCResponse` instances or None, meaning no reply should + generated for the request. + """ + + def serialize(self): + """Returns a serialization of the batch response.""" + raise NotImplementedError() + + +class RPCProtocol(object): + """Base class for all protocol implementations.""" + + supports_out_of_order = False + """If true, this protocol can receive responses out of order correctly. + + Note that this usually depends on the generation of unique_ids, the + generation of these may or may not be thread safe, depending on the + protocol. Ideally, only once instance of RPCProtocol should be used per + client.""" + + def create_request(self, method, args=None, kwargs=None, one_way=False): + """Creates a new RPCRequest object. + + It is up to the implementing protocol whether or not ``args``, + ``kwargs``, one of these, both at once or none of them are supported. + + :param method: The method name to invoke. + :param args: The positional arguments to call the method with. + :param kwargs: The keyword arguments to call the method with. + :param one_way: The request is an update, i.e. it does not expect a + reply. + :return: A new :py:class:`~tinyrpc.RPCRequest` instance. + """ + raise NotImplementedError() + + def parse_request(self, data): + """Parses a request given as a string and returns an + :py:class:`RPCRequest` instance. + + :return: An instanced request. + """ + raise NotImplementedError() + + def parse_reply(self, data): + """Parses a reply and returns an :py:class:`RPCResponse` instance. + + :return: An instanced response. + """ + raise NotImplementedError() + + +class RPCBatchProtocol(RPCProtocol): + def create_batch_request(self, requests=None): + """Create a new :py:class:`tinyrpc.RPCBatchRequest` object. + + :param requests: A list of requests. + """ + raise NotImplementedError() diff --git a/ryu/contrib/tinyrpc/protocols/jsonrpc.py b/ryu/contrib/tinyrpc/protocols/jsonrpc.py new file mode 100644 index 00000000..941da51d --- /dev/null +++ b/ryu/contrib/tinyrpc/protocols/jsonrpc.py @@ -0,0 +1,291 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from .. import RPCBatchProtocol, RPCRequest, RPCResponse, RPCErrorResponse,\ + InvalidRequestError, MethodNotFoundError, ServerError,\ + InvalidReplyError, RPCError, RPCBatchRequest, RPCBatchResponse + +import json + +class FixedErrorMessageMixin(object): + def __init__(self, *args, **kwargs): + if not args: + args = [self.message] + super(FixedErrorMessageMixin, self).__init__(*args, **kwargs) + + def error_respond(self): + response = JSONRPCErrorResponse() + + response.error = self.message + response.unique_id = None + response._jsonrpc_error_code = self.jsonrpc_error_code + return response + + + +class JSONRPCParseError(FixedErrorMessageMixin, InvalidRequestError): + jsonrpc_error_code = -32700 + message = 'Parse error' + + +class JSONRPCInvalidRequestError(FixedErrorMessageMixin, InvalidRequestError): + jsonrpc_error_code = -32600 + message = 'Invalid Request' + + +class JSONRPCMethodNotFoundError(FixedErrorMessageMixin, MethodNotFoundError): + jsonrpc_error_code = -32601 + message = 'Method not found' + + +class JSONRPCInvalidParamsError(FixedErrorMessageMixin, InvalidRequestError): + jsonrpc_error_code = -32602 + message = 'Invalid params' + + +class JSONRPCInternalError(FixedErrorMessageMixin, InvalidRequestError): + jsonrpc_error_code = -32603 + message = 'Internal error' + + +class JSONRPCServerError(FixedErrorMessageMixin, InvalidRequestError): + jsonrpc_error_code = -32000 + message = '' + + +class JSONRPCSuccessResponse(RPCResponse): + def _to_dict(self): + return { + 'jsonrpc': JSONRPCProtocol.JSON_RPC_VERSION, + 'id': self.unique_id, + 'result': self.result, + } + + def serialize(self): + return json.dumps(self._to_dict()) + + +class JSONRPCErrorResponse(RPCErrorResponse): + def _to_dict(self): + return { + 'jsonrpc': JSONRPCProtocol.JSON_RPC_VERSION, + 'id': self.unique_id, + 'error': { + 'message': str(self.error), + 'code': self._jsonrpc_error_code, + } + } + + def serialize(self): + return json.dumps(self._to_dict()) + + +def _get_code_and_message(error): + assert isinstance(error, (Exception, basestring)) + if isinstance(error, Exception): + if hasattr(error, 'jsonrpc_error_code'): + code = error.jsonrpc_error_code + msg = str(error) + elif isinstance(error, InvalidRequestError): + code = JSONRPCInvalidRequestError.jsonrpc_error_code + msg = JSONRPCInvalidRequestError.message + elif isinstance(error, MethodNotFoundError): + code = JSONRPCMethodNotFoundError.jsonrpc_error_code + msg = JSONRPCMethodNotFoundError.message + else: + # allow exception message to propagate + code = JSONRPCServerError.jsonrpc_error_code + msg = str(error) + else: + code = -32000 + msg = error + + return code, msg + + +class JSONRPCRequest(RPCRequest): + def error_respond(self, error): + if not self.unique_id: + return None + + response = JSONRPCErrorResponse() + + code, msg = _get_code_and_message(error) + + response.error = msg + response.unique_id = self.unique_id + response._jsonrpc_error_code = code + return response + + def respond(self, result): + response = JSONRPCSuccessResponse() + + if not self.unique_id: + return None + + response.result = result + response.unique_id = self.unique_id + + return response + + def _to_dict(self): + jdata = { + 'jsonrpc': JSONRPCProtocol.JSON_RPC_VERSION, + 'method': self.method, + } + if self.args: + jdata['params'] = self.args + if self.kwargs: + jdata['params'] = self.kwargs + if self.unique_id != None: + jdata['id'] = self.unique_id + return jdata + + def serialize(self): + return json.dumps(self._to_dict()) + + +class JSONRPCBatchRequest(RPCBatchRequest): + def create_batch_response(self): + if self._expects_response(): + return JSONRPCBatchResponse() + + def _expects_response(self): + for request in self: + if isinstance(request, Exception): + return True + if request.unique_id != None: + return True + + return False + + def serialize(self): + return json.dumps([req._to_dict() for req in self]) + + +class JSONRPCBatchResponse(RPCBatchResponse): + def serialize(self): + return json.dumps([resp._to_dict() for resp in self if resp != None]) + + +class JSONRPCProtocol(RPCBatchProtocol): + """JSONRPC protocol implementation. + + Currently, only version 2.0 is supported.""" + + JSON_RPC_VERSION = "2.0" + _ALLOWED_REPLY_KEYS = sorted(['id', 'jsonrpc', 'error', 'result']) + _ALLOWED_REQUEST_KEYS = sorted(['id', 'jsonrpc', 'method', 'params']) + + def __init__(self, *args, **kwargs): + super(JSONRPCProtocol, self).__init__(*args, **kwargs) + self._id_counter = 0 + + def _get_unique_id(self): + self._id_counter += 1 + return self._id_counter + + def create_batch_request(self, requests=None): + return JSONRPCBatchRequest(requests or []) + + def create_request(self, method, args=None, kwargs=None, one_way=False): + if args and kwargs: + raise InvalidRequestError('Does not support args and kwargs at '\ + 'the same time') + + request = JSONRPCRequest() + + if not one_way: + request.unique_id = self._get_unique_id() + + request.method = method + request.args = args + request.kwargs = kwargs + + return request + + def parse_reply(self, data): + try: + rep = json.loads(data) + except Exception as e: + raise InvalidReplyError(e) + + for k in rep.iterkeys(): + if not k in self._ALLOWED_REPLY_KEYS: + raise InvalidReplyError('Key not allowed: %s' % k) + + if not 'jsonrpc' in rep: + raise InvalidReplyError('Missing jsonrpc (version) in response.') + + if rep['jsonrpc'] != self.JSON_RPC_VERSION: + raise InvalidReplyError('Wrong JSONRPC version') + + if not 'id' in rep: + raise InvalidReplyError('Missing id in response') + + if ('error' in rep) == ('result' in rep): + raise InvalidReplyError( + 'Reply must contain exactly one of result and error.' + ) + + if 'error' in rep: + response = JSONRPCErrorResponse() + error = rep['error'] + response.error = error['message'] + response._jsonrpc_error_code = error['code'] + else: + response = JSONRPCSuccessResponse() + response.result = rep.get('result', None) + + response.unique_id = rep['id'] + + return response + + def parse_request(self, data): + try: + req = json.loads(data) + except Exception as e: + raise JSONRPCParseError() + + if isinstance(req, list): + # batch request + requests = JSONRPCBatchRequest() + for subreq in req: + try: + requests.append(self._parse_subrequest(subreq)) + except RPCError as e: + requests.append(e) + except Exception as e: + requests.append(JSONRPCInvalidRequestError()) + + if not requests: + raise JSONRPCInvalidRequestError() + return requests + else: + return self._parse_subrequest(req) + + def _parse_subrequest(self, req): + for k in req.iterkeys(): + if not k in self._ALLOWED_REQUEST_KEYS: + raise JSONRPCInvalidRequestError() + + if req.get('jsonrpc', None) != self.JSON_RPC_VERSION: + raise JSONRPCInvalidRequestError() + + if not isinstance(req['method'], basestring): + raise JSONRPCInvalidRequestError() + + request = JSONRPCRequest() + request.method = str(req['method']) + request.unique_id = req.get('id', None) + + params = req.get('params', None) + if params != None: + if isinstance(params, list): + request.args = req['params'] + elif isinstance(params, dict): + request.kwargs = req['params'] + else: + raise JSONRPCInvalidParamsError() + + return request diff --git a/ryu/contrib/tinyrpc/server/__init__.py b/ryu/contrib/tinyrpc/server/__init__.py new file mode 100644 index 00000000..6b2cc1ad --- /dev/null +++ b/ryu/contrib/tinyrpc/server/__init__.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# FIXME: needs unittests +# FIXME: needs checks for out-of-order, concurrency, etc as attributes +from tinyrpc.exc import RPCError + +class RPCServer(object): + """High level RPC server. + + :param transport: The :py:class:`~tinyrpc.transports.RPCTransport` to use. + :param protocol: The :py:class:`~tinyrpc.RPCProtocol` to use. + :param dispatcher: The :py:class:`~tinyrpc.dispatch.RPCDispatcher` to use. + """ + def __init__(self, transport, protocol, dispatcher): + self.transport = transport + self.protocol = protocol + self.dispatcher = dispatcher + + def serve_forever(self): + """Handle requests forever. + + Starts the server loop in which the transport will be polled for a new + message. + + After a new message has arrived, + :py:func:`~tinyrpc.server.RPCServer._spawn` is called with a handler + function and arguments to handle the request. + + The handler function will try to decode the message using the supplied + protocol, if that fails, an error response will be sent. After decoding + the message, the dispatcher will be asked to handle the resultung + request and the return value (either an error or a result) will be sent + back to the client using the transport. + + After calling :py:func:`~tinyrpc.server.RPCServer._spawn`, the server + will fetch the next message and repeat. + """ + while True: + context, message = self.transport.receive_message() + + # assuming protocol is threadsafe and dispatcher is theadsafe, as + # long as its immutable + + def handle_message(context, message): + try: + request = self.protocol.parse_request(message) + except RPCError as e: + response = e.error_respond() + else: + response = self.dispatcher.dispatch(request) + + # send reply + self.transport.send_reply(context, response.serialize()) + + self._spawn(handle_message, context, message) + + def _spawn(self, func, *args, **kwargs): + """Spawn a handler function. + + This function is overridden in subclasses to provide concurrency. + + In the base implementation, it simply calls the supplied function + ``func`` with ``*args`` and ``**kwargs``. This results in a + single-threaded, single-process, synchronous server. + + :param func: A callable to call. + :param args: Arguments to ``func``. + :param kwargs: Keyword arguments to ``func``. + """ + func(*args, **kwargs) diff --git a/ryu/contrib/tinyrpc/server/gevent.py b/ryu/contrib/tinyrpc/server/gevent.py new file mode 100644 index 00000000..c1078fcf --- /dev/null +++ b/ryu/contrib/tinyrpc/server/gevent.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from __future__ import absolute_import +import gevent + +from . import RPCServer + + +class RPCServerGreenlets(RPCServer): + # documentation in docs because of dependencies + def _spawn(self, func, *args, **kwargs): + gevent.spawn(func, *args, **kwargs) diff --git a/ryu/contrib/tinyrpc/transports/INTEGRATE_ME.py b/ryu/contrib/tinyrpc/transports/INTEGRATE_ME.py new file mode 100644 index 00000000..a574d973 --- /dev/null +++ b/ryu/contrib/tinyrpc/transports/INTEGRATE_ME.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import gevent +import zmq.green as zmq +from logbook import Logger + +from tinyrpc.protocols.jsonrpc import JSONRPCProtocol +from tinyrpc.dispatch import RPCDispatcher +from tinyrpc import RPCError, ServerError, MethodNotFoundError + + +class Server(object): + def __init__(transport, protocol, dispatcher): + self.transport = transport + self.protocol = protocol + self.dispatcher = dispatcher + + def run(self): + while True: + try: + context, message = self.transport.receive_message() + except Exception as e: + self.exception(e) + continue + + # assuming protocol is threadsafe and dispatcher is theadsafe, as long + # as its immutable + + self.handle_client(context, message) + + def handle_client(self, context, message): + try: + request = self.protocol.parse_request(message) + except RPCError as e: + self.exception(e) + response = e.error_respond() + else: + response = dispatcher.dispatch(request) + + # send reply + reply = response.serialize() + self.transport.send_reply(context, reply) + + +class ConcurrentServerMixin(object): + def handle_client(self, context, message): + self.spawn( + super(ConcurrentServer, self).handle_client, context, message + ) + + +class ZmqRouterTransport(object): + def __init__(self, socket): + self.socket = socket + + def receive_message(self): + msg = socket.recv_multipart() + return msg[:-1], [-1] + + def send_reply(self, context, reply): + self.send_multipart(context + [reply]) + + +class GeventConcurrencyMixin(ConcurrentServerMixin): + def spawn(self, func, *args, **kwargs): + gevent.spawn(func, *args, **kwargs) + + +def rpc_server(socket, protocol, dispatcher): + log = Logger('rpc_server') + log.debug('starting up...') + while True: + try: + message = socket.recv_multipart() + except Exception as e: + log.warning('Failed to receive message from client, ignoring...') + log.exception(e) + continue + + log.debug('Received message %s from %r' % (message[-1], message[0])) + + # assuming protocol is threadsafe and dispatcher is theadsafe, as long + # as its immutable + + def handle_client(message): + try: + request = protocol.parse_request(message[-1]) + except RPCError as e: + log.exception(e) + response = e.error_respond() + else: + response = dispatcher.dispatch(request) + log.debug('Response okay: %r' % response) + + # send reply + message[-1] = response.serialize() + log.debug('Replying %s to %r' % (message[-1], message[0])) + socket.send_multipart(message) + + gevent.spawn(handle_client, message) + + +context = zmq.Context() +socket = context.socket(zmq.ROUTER) +socket.bind("tcp://127.0.0.1:12345") + +dispatcher = RPCDispatcher() + +@dispatcher.public +def throw_up(): + return 'asad' + raise Exception('BLARGH') + +rpc_server(socket, JSONRPCProtocol(), dispatcher) diff --git a/ryu/contrib/tinyrpc/transports/__init__.py b/ryu/contrib/tinyrpc/transports/__init__.py new file mode 100644 index 00000000..3bbc8720 --- /dev/null +++ b/ryu/contrib/tinyrpc/transports/__init__.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +class ServerTransport(object): + """Base class for all server transports.""" + + def receive_message(self): + """Receive a message from the transport. + + Blocks until another message has been received. May return a context + opaque to clients that should be passed on + :py:func:`~tinyrpc.transport.Transport.send_reply` to identify the + client later on. + + :return: A tuple consisting of ``(context, message)``. + """ + raise NotImplementedError() + + def send_reply(self, context, reply): + """Sends a reply to a client. + + The client is usually identified by passing ``context`` as returned + from the original + :py:func:`~tinyrpc.transport.Transport.receive_message` call. + + Messages must be strings, it is up to the sender to convert the + beforehand. A non-string value raises a :py:exc:`TypeError`. + + :param context: A context returned by + :py:func:`~tinyrpc.transport.Transport.receive_message`. + :param reply: A string to send back as the reply. + """ + raise NotImplementedError + + +class ClientTransport(object): + """Base class for all client transports.""" + + def send_message(self, message, expect_reply=True): + """Send a message to the server and possibly receive a reply. + + Sends a message to the connected server. + + Messages must be strings, it is up to the sender to convert the + beforehand. A non-string value raises a :py:exc:`TypeError`. + + This function will block until one reply has been received. + + :param message: A string to send. + :return: A string containing the server reply. + """ + raise NotImplementedError diff --git a/ryu/contrib/tinyrpc/transports/http.py b/ryu/contrib/tinyrpc/transports/http.py new file mode 100644 index 00000000..919f97f4 --- /dev/null +++ b/ryu/contrib/tinyrpc/transports/http.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from Queue import Queue +import threading +import requests + +from . import ServerTransport, ClientTransport + + +class HttpPostClientTransport(ClientTransport): + """HTTP POST based client transport. + + Requires :py:mod:`requests`. Submits messages to a server using the body of + an ``HTTP`` ``POST`` request. Replies are taken from the responses body. + + :param endpoint: The URL to send ``POST`` data to. + :param kwargs: Additional parameters for :py:func:`requests.post`. + """ + def __init__(self, endpoint, **kwargs): + self.endpoint = endpoint + self.request_kwargs = kwargs + + def send_message(self, message, expect_reply=True): + if not isinstance(message, str): + raise TypeError('str expected') + + r = requests.post(self.endpoint, data=message, **self.request_kwargs) + + if expect_reply: + return r.content diff --git a/ryu/contrib/tinyrpc/transports/tcp.py b/ryu/contrib/tinyrpc/transports/tcp.py new file mode 100644 index 00000000..c5ac6142 --- /dev/null +++ b/ryu/contrib/tinyrpc/transports/tcp.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from Queue import Queue +import struct +import threading + +from SocketServer import TCPServer, BaseRequestHandler, ThreadingMixIn + +from . import RPCRequestResponseServer + + +def _read_length_prefixed_msg(sock, prefix_format='!I'): + prefix_bytes = struct.calcsize(prefix_format) + + sock.recv(prefix_bytes) + +def _read_n_bytes(sock, n): + buf = [] + while n > 0: + data = sock.recv(n) + n -= len(data) + buf.append(data) + + return ''.join(buf) + + +def create_length_prefixed_tcp_handler(): + queue = Queue() + class LengthPrefixedTcpHandler(BaseRequestHandler): + def handle(self): + #msg = _read_length_prefixed_msg(self.request) + # this will run inside a new thread + self.request.send("hello\n") + while True: + b = _read_n_bytes(self.request, 10) + self.request.send("you sent: %s" % b) + queue.put(b) + + return queue, LengthPrefixedTcpHandler + + +def tcp_test_main(): + class Server(ThreadingMixIn, TCPServer): + pass + + queue, Handler = create_length_prefixed_tcp_handler() + + server = Server(('localhost', 12345), Handler) + server.allow_reuse_address = True + + server.serve_forever() diff --git a/ryu/contrib/tinyrpc/transports/wsgi.py b/ryu/contrib/tinyrpc/transports/wsgi.py new file mode 100644 index 00000000..f9a84c12 --- /dev/null +++ b/ryu/contrib/tinyrpc/transports/wsgi.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import Queue + +from werkzeug.wrappers import Response, Request + +from . import ServerTransport + + +class WsgiServerTransport(ServerTransport): + """WSGI transport. + + Requires :py:mod:`werkzeug`. + + Due to the nature of WSGI, this transport has a few pecularities: It must + be run in a thread, greenlet or some other form of concurrent execution + primitive. + + This is due to + :py:func:`~tinyrpc.transports.wsgi.WsgiServerTransport.handle` blocking + while waiting for a call to + :py:func:`~tinyrpc.transports.wsgi.WsgiServerTransport.send_reply`. + + The parameter ``queue_class`` must be used to supply a proper queue class + for the chosen concurrency mechanism (i.e. when using :py:mod:`gevent`, + set it to :py:class:`gevent.queue.Queue`). + + :param max_content_length: The maximum request content size allowed. Should + be set to a sane value to prevent DoS-Attacks. + :param queue_class: The Queue class to use. + :param allow_origin: The ``Access-Control-Allow-Origin`` header. Defaults + to ``*`` (so change it if you need actual security). + """ + def __init__(self, max_content_length=4096, queue_class=Queue.Queue, + allow_origin='*'): + self._queue_class = queue_class + self.messages = queue_class() + self.max_content_length = max_content_length + self.allow_origin = allow_origin + + def receive_message(self): + return self.messages.get() + + def send_reply(self, context, reply): + if not isinstance(reply, str): + raise TypeError('str expected') + + context.put(reply) + + def handle(self, environ, start_response): + """WSGI handler function. + + The transport will serve a request by reading the message and putting + it into an internal buffer. It will then block until another + concurrently running function sends a reply using + :py:func:`~tinyrpc.transports.WsgiServerTransport.send_reply`. + + The reply will then be sent to the client being handled and handle will + return. + """ + request = Request(environ) + request.max_content_length = self.max_content_length + + access_control_headers = { + 'Access-Control-Allow-Methods': 'POST', + 'Access-Control-Allow-Origin': self.allow_origin, + 'Access-Control-Allow-Headers': \ + 'Content-Type, X-Requested-With, Accept, Origin' + } + + if request.method == 'OPTIONS': + response = Response(headers=access_control_headers) + + elif request.method == 'POST': + # message is encoded in POST, read it... + msg = request.stream.read() + + # create new context + context = self._queue_class() + + self.messages.put((context, msg)) + + # ...and send the reply + response = Response(context.get(), headers=access_control_headers) + else: + # nothing else supported at the moment + response = Response('Only POST supported', 405) + + return response(environ, start_response) diff --git a/ryu/contrib/tinyrpc/transports/zmq.py b/ryu/contrib/tinyrpc/transports/zmq.py new file mode 100644 index 00000000..502a1ddc --- /dev/null +++ b/ryu/contrib/tinyrpc/transports/zmq.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from __future__ import absolute_import # needed for zmq import +import zmq + +from . import ServerTransport, ClientTransport + + +class ZmqServerTransport(ServerTransport): + """Server transport based on a :py:const:`zmq.ROUTER` socket. + + :param socket: A :py:const:`zmq.ROUTER` socket instance, bound to an + endpoint. + """ + + def __init__(self, socket): + self.socket = socket + + def receive_message(self): + msg = self.socket.recv_multipart() + return msg[:-1], msg[-1] + + def send_reply(self, context, reply): + self.socket.send_multipart(context + [reply]) + + @classmethod + def create(cls, zmq_context, endpoint): + """Create new server transport. + + Instead of creating the socket yourself, you can call this function and + merely pass the :py:class:`zmq.core.context.Context` instance. + + By passing a context imported from :py:mod:`zmq.green`, you can use + green (gevent) 0mq sockets as well. + + :param zmq_context: A 0mq context. + :param endpoint: The endpoint clients will connect to. + """ + socket = zmq_context.socket(zmq.ROUTER) + socket.bind(endpoint) + return cls(socket) + + +class ZmqClientTransport(ClientTransport): + """Client transport based on a :py:const:`zmq.REQ` socket. + + :param socket: A :py:const:`zmq.REQ` socket instance, connected to the + server socket. + """ + + def __init__(self, socket): + self.socket = socket + + def send_message(self, message, expect_reply=True): + self.socket.send(message) + + if expect_reply: + return self.socket.recv() + + @classmethod + def create(cls, zmq_context, endpoint): + """Create new client transport. + + Instead of creating the socket yourself, you can call this function and + merely pass the :py:class:`zmq.core.context.Context` instance. + + By passing a context imported from :py:mod:`zmq.green`, you can use + green (gevent) 0mq sockets as well. + + :param zmq_context: A 0mq context. + :param endpoint: The endpoint the server is bound to. + """ + socket = zmq_context.socket(zmq.REQ) + socket.connect(endpoint) + return cls(socket) diff --git a/tools/pip-requires b/tools/pip-requires index 793e48b4..7be60f19 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -7,4 +7,3 @@ paramiko routes six>=1.4.0 webob>=1.0.8 -tinyrpc |