summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ryu/contrib/tinyrpc/__init__.py6
-rw-r--r--ryu/contrib/tinyrpc/client.py91
-rw-r--r--ryu/contrib/tinyrpc/dispatch/__init__.py201
-rw-r--r--ryu/contrib/tinyrpc/exc.py40
-rw-r--r--ryu/contrib/tinyrpc/protocols/__init__.py173
-rw-r--r--ryu/contrib/tinyrpc/protocols/jsonrpc.py291
-rw-r--r--ryu/contrib/tinyrpc/server/__init__.py71
-rw-r--r--ryu/contrib/tinyrpc/server/gevent.py13
-rw-r--r--ryu/contrib/tinyrpc/transports/INTEGRATE_ME.py115
-rw-r--r--ryu/contrib/tinyrpc/transports/__init__.py52
-rw-r--r--ryu/contrib/tinyrpc/transports/http.py31
-rw-r--r--ryu/contrib/tinyrpc/transports/tcp.py52
-rw-r--r--ryu/contrib/tinyrpc/transports/wsgi.py90
-rw-r--r--ryu/contrib/tinyrpc/transports/zmq.py76
-rw-r--r--tools/pip-requires1
15 files changed, 1302 insertions, 1 deletions
diff --git a/ryu/contrib/tinyrpc/__init__.py b/ryu/contrib/tinyrpc/__init__.py
new file mode 100644
index 00000000..f24deb2e
--- /dev/null
+++ b/ryu/contrib/tinyrpc/__init__.py
@@ -0,0 +1,6 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from .protocols import *
+from .exc import *
+from .client import *
diff --git a/ryu/contrib/tinyrpc/client.py b/ryu/contrib/tinyrpc/client.py
new file mode 100644
index 00000000..0d77547f
--- /dev/null
+++ b/ryu/contrib/tinyrpc/client.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from .exc import RPCError
+
+
+class RPCClient(object):
+ """Client for making RPC calls to connected servers.
+
+ :param protocol: An :py:class:`~tinyrpc.RPCProtocol` instance.
+ :param transport: A :py:class:`~tinyrpc.transports.ClientTransport`
+ instance.
+ """
+
+ def __init__(self, protocol, transport):
+ self.protocol = protocol
+ self.transport = transport
+
+ def _send_and_handle_reply(self, req):
+ # sends and waits for reply
+ reply = self.transport.send_message(req.serialize())
+
+ response = self.protocol.parse_reply(reply)
+
+ if hasattr(response, 'error'):
+ raise RPCError('Error calling remote procedure: %s' %\
+ response.error)
+
+ return response
+
+ def call(self, method, args, kwargs, one_way=False):
+ """Calls the requested method and returns the result.
+
+ If an error occured, an :py:class:`~tinyrpc.exc.RPCError` instance
+ is raised.
+
+ :param method: Name of the method to call.
+ :param args: Arguments to pass to the method.
+ :param kwargs: Keyword arguments to pass to the method.
+ :param one_way: Whether or not a reply is desired.
+ """
+ req = self.protocol.create_request(method, args, kwargs, one_way)
+
+ return self._send_and_handle_reply(req).result
+
+ def get_proxy(self, prefix='', one_way=False):
+ """Convenience method for creating a proxy.
+
+ :param prefix: Passed on to :py:class:`~tinyrpc.client.RPCProxy`.
+ :param one_way: Passed on to :py:class:`~tinyrpc.client.RPCProxy`.
+ :return: :py:class:`~tinyrpc.client.RPCProxy` instance."""
+ return RPCProxy(self, prefix, one_way)
+
+ def batch_call(self, calls):
+ """Experimental, use at your own peril."""
+ req = self.protocol.create_batch_request()
+
+ for call_args in calls:
+ req.append(self.protocol.create_request(*call_args))
+
+ return self._send_and_handle_reply(req)
+
+
+class RPCProxy(object):
+ """Create a new remote proxy object.
+
+ Proxies allow calling of methods through a simpler interface. See the
+ documentation for an example.
+
+ :param client: An :py:class:`~tinyrpc.client.RPCClient` instance.
+ :param prefix: Prefix to prepend to every method name.
+ :param one_way: Passed to every call of
+ :py:func:`~tinyrpc.client.call`.
+ """
+
+ def __init__(self, client, prefix='', one_way=False):
+ self.client = client
+ self.prefix = prefix
+ self.one_way = one_way
+
+ def __getattr__(self, name):
+ """Returns a proxy function that, when called, will call a function
+ name ``name`` on the client associated with the proxy.
+ """
+ proxy_func = lambda *args, **kwargs: self.client.call(
+ self.prefix + name,
+ args,
+ kwargs,
+ one_way=self.one_way
+ )
+ return proxy_func
diff --git a/ryu/contrib/tinyrpc/dispatch/__init__.py b/ryu/contrib/tinyrpc/dispatch/__init__.py
new file mode 100644
index 00000000..ec722e4c
--- /dev/null
+++ b/ryu/contrib/tinyrpc/dispatch/__init__.py
@@ -0,0 +1,201 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import inspect
+
+from ..exc import *
+
+
+def public(name=None):
+ """Set RPC name on function.
+
+ This function decorator will set the ``_rpc_public_name`` attribute on a
+ function, causing it to be picked up if an instance of its parent class is
+ registered using
+ :py:func:`~tinyrpc.dispatch.RPCDispatcher.register_instance`.
+
+ ``@public`` is a shortcut for ``@public()``.
+
+ :param name: The name to register the function with.
+ """
+ # called directly with function
+ if callable(name):
+ f = name
+ f._rpc_public_name = f.__name__
+ return f
+
+ def _(f):
+ f._rpc_public_name = name or f.__name__
+ return f
+
+ return _
+
+
+class RPCDispatcher(object):
+ """Stores name-to-method mappings."""
+
+ def __init__(self):
+ self.method_map = {}
+ self.subdispatchers = {}
+
+ def add_subdispatch(self, dispatcher, prefix=''):
+ """Adds a subdispatcher, possibly in its own namespace.
+
+ :param dispatcher: The dispatcher to add as a subdispatcher.
+ :param prefix: A prefix. All of the new subdispatchers methods will be
+ available as prefix + their original name.
+ """
+ self.subdispatchers.setdefault(prefix, []).append(dispatcher)
+
+ def add_method(self, f, name=None):
+ """Add a method to the dispatcher.
+
+ :param f: Callable to be added.
+ :param name: Name to register it with. If ``None``, ``f.__name__`` will
+ be used.
+ """
+ assert callable(f), "method argument must be callable"
+ # catches a few programming errors that are
+ # commonly silently swallowed otherwise
+ if not name:
+ name = f.__name__
+
+ if name in self.method_map:
+ raise RPCError('Name %s already registered')
+
+ self.method_map[name] = f
+
+ def dispatch(self, request):
+ """Fully handle request.
+
+ The dispatch method determines which method to call, calls it and
+ returns a response containing a result.
+
+ No exceptions will be thrown, rather, every exception will be turned
+ into a response using :py:func:`~tinyrpc.RPCRequest.error_respond`.
+
+ If a method isn't found, a :py:exc:`~tinyrpc.exc.MethodNotFoundError`
+ response will be returned. If any error occurs outside of the requested
+ method, a :py:exc:`~tinyrpc.exc.ServerError` without any error
+ information will be returend.
+
+ If the method is found and called but throws an exception, the
+ exception thrown is used as a response instead. This is the only case
+ in which information from the exception is possibly propagated back to
+ the client, as the exception is part of the requested method.
+
+ :py:class:`~tinyrpc.RPCBatchRequest` instances are handled by handling
+ all its children in order and collecting the results, then returning an
+ :py:class:`~tinyrpc.RPCBatchResponse` with the results.
+
+ :param request: An :py:func:`~tinyrpc.RPCRequest`.
+ :return: An :py:func:`~tinyrpc.RPCResponse`.
+ """
+ if hasattr(request, 'create_batch_response'):
+ results = [self._dispatch(req) for req in request]
+
+ response = request.create_batch_response()
+ if response != None:
+ response.extend(results)
+
+ return response
+ else:
+ return self._dispatch(request)
+
+ def _dispatch(self, request):
+ try:
+ try:
+ method = self.get_method(request.method)
+ except KeyError as e:
+ return request.error_respond(MethodNotFoundError(e))
+
+ # we found the method
+ try:
+ result = method(*request.args, **request.kwargs)
+ except Exception as e:
+ # an error occured within the method, return it
+ return request.error_respond(e)
+
+ # respond with result
+ return request.respond(result)
+ except Exception as e:
+ # unexpected error, do not let client know what happened
+ return request.error_respond(ServerError())
+
+ def get_method(self, name):
+ """Retrieve a previously registered method.
+
+ Checks if a method matching ``name`` has been registered.
+
+ If :py:func:`get_method` cannot find a method, every subdispatcher
+ with a prefix matching the method name is checked as well.
+
+ If a method isn't found, a :py:class:`KeyError` is thrown.
+
+ :param name: Callable to find.
+ :param return: The callable.
+ """
+ if name in self.method_map:
+ return self.method_map[name]
+
+ for prefix, subdispatchers in self.subdispatchers.iteritems():
+ if name.startswith(prefix):
+ for sd in subdispatchers:
+ try:
+ return sd.get_method(name[len(prefix):])
+ except KeyError:
+ pass
+
+ raise KeyError(name)
+
+ def public(self, name=None):
+ """Convenient decorator.
+
+ Allows easy registering of functions to this dispatcher. Example:
+
+ .. code-block:: python
+
+ dispatch = RPCDispatcher()
+
+ @dispatch.public
+ def foo(bar):
+ # ...
+
+ class Baz(object):
+ def not_exposed(self):
+ # ...
+
+ @dispatch.public(name='do_something')
+ def visible_method(arg1)
+ # ...
+
+ :param name: Name to register callable with
+ """
+ if callable(name):
+ self.add_method(name)
+ return name
+
+ def _(f):
+ self.add_method(f, name=name)
+ return f
+
+ return _
+
+ def register_instance(self, obj, prefix=''):
+ """Create new subdispatcher and register all public object methods on
+ it.
+
+ To be used in conjunction with the :py:func:`tinyrpc.dispatch.public`
+ decorator (*not* :py:func:`tinyrpc.dispatch.RPCDispatcher.public`).
+
+ :param obj: The object whose public methods should be made available.
+ :param prefix: A prefix for the new subdispatcher.
+ """
+ dispatch = self.__class__()
+ for name, f in inspect.getmembers(
+ obj, lambda f: callable(f) and hasattr(f, '_rpc_public_name')
+ ):
+ dispatch.add_method(f, f._rpc_public_name)
+
+ # add to dispatchers
+ self.add_subdispatch(dispatch, prefix)
diff --git a/ryu/contrib/tinyrpc/exc.py b/ryu/contrib/tinyrpc/exc.py
new file mode 100644
index 00000000..0c57284f
--- /dev/null
+++ b/ryu/contrib/tinyrpc/exc.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+class RPCError(Exception):
+ """Base class for all excetions thrown by :py:mod:`tinyrpc`."""
+
+
+class BadRequestError(RPCError):
+ """Base class for all errors that caused the processing of a request to
+ abort before a request object could be instantiated."""
+
+ def error_respond(self):
+ """Create :py:class:`~tinyrpc.RPCErrorResponse` to respond the error.
+
+ :return: A error responce instance or ``None``, if the protocol decides
+ to drop the error silently."""
+ raise RuntimeError('Not implemented')
+
+
+class BadReplyError(RPCError):
+ """Base class for all errors that caused processing of a reply to abort
+ before it could be turned in a response object."""
+
+
+class InvalidRequestError(BadRequestError):
+ """A request made was malformed (i.e. violated the specification) and could
+ not be parsed."""
+
+
+class InvalidReplyError(BadReplyError):
+ """A reply received was malformed (i.e. violated the specification) and
+ could not be parsed into a response."""
+
+
+class MethodNotFoundError(RPCError):
+ """The desired method was not found."""
+
+
+class ServerError(RPCError):
+ """An internal error in the RPC system occured."""
diff --git a/ryu/contrib/tinyrpc/protocols/__init__.py b/ryu/contrib/tinyrpc/protocols/__init__.py
new file mode 100644
index 00000000..9ad55b9e
--- /dev/null
+++ b/ryu/contrib/tinyrpc/protocols/__init__.py
@@ -0,0 +1,173 @@
+#!/usr/bin/env python
+
+from ..exc import *
+
+class RPCRequest(object):
+ unique_id = None
+ """A unique ID to remember the request by. Protocol specific, may or
+ may not be set. This value should only be set by
+ :py:func:`~tinyrpc.RPCProtocol.create_request`.
+
+ The ID allows client to receive responses out-of-order and still allocate
+ them to the correct request.
+
+ Only supported if the parent protocol has
+ :py:attr:`~tinyrpc.RPCProtocol.supports_out_of_order` set to ``True``.
+ """
+
+ method = None
+ """The name of the method to be called."""
+
+ args = []
+ """The positional arguments of the method call."""
+
+ kwargs = {}
+ """The keyword arguments of the method call."""
+
+ def error_respond(self, error):
+ """Creates an error response.
+
+ Create a response indicating that the request was parsed correctly,
+ but an error has occured trying to fulfill it.
+
+ :param error: An exception or a string describing the error.
+
+ :return: A response or ``None`` to indicate that no error should be sent
+ out.
+ """
+ raise NotImplementedError()
+
+ def respond(self, result):
+ """Create a response.
+
+ Call this to return the result of a successful method invocation.
+
+ This creates and returns an instance of a protocol-specific subclass of
+ :py:class:`~tinyrpc.RPCResponse`.
+
+ :param result: Passed on to new response instance.
+
+ :return: A response or ``None`` to indicate this request does not expect a
+ response.
+ """
+ raise NotImplementedError()
+
+ def serialize(self):
+ """Returns a serialization of the request.
+
+ :return: A string to be passed on to a transport.
+ """
+ raise NotImplementedError()
+
+
+class RPCBatchRequest(list):
+ """Multiple requests batched together.
+
+ A batch request is a subclass of :py:class:`list`. Protocols that support
+ multiple requests in a single message use this to group them together.
+
+ Handling a batch requests is done in any order, responses must be gathered
+ in a batch response and be in the same order as their respective requests.
+
+ Any item of a batch request is either a request or a subclass of
+ :py:class:`~tinyrpc.BadRequestError`, which indicates that there has been
+ an error in parsing the request.
+ """
+
+ def create_batch_response(self):
+ """Creates a response suitable for responding to this request.
+
+ :return: An :py:class:`~tinyrpc.RPCBatchResponse` or ``None``, if no
+ response is expected."""
+ raise NotImplementedError()
+
+ def serialize(self):
+ raise NotImplementedError()
+
+
+class RPCResponse(object):
+ """RPC call response class.
+
+ Base class for all deriving responses.
+
+ Has an attribute ``result`` containing the result of the RPC call, unless
+ an error occured, in which case an attribute ``error`` will contain the
+ error message."""
+
+ unique_id = None
+
+ def serialize(self):
+ """Returns a serialization of the response.
+
+ :return: A reply to be passed on to a transport.
+ """
+ raise NotImplementedError()
+
+
+class RPCErrorResponse(RPCResponse):
+ pass
+
+
+class RPCBatchResponse(list):
+ """Multiple response from a batch request. See
+ :py:class:`~tinyrpc.RPCBatchRequest` on how to handle.
+
+ Items in a batch response need to be
+ :py:class:`~tinyrpc.RPCResponse` instances or None, meaning no reply should
+ generated for the request.
+ """
+
+ def serialize(self):
+ """Returns a serialization of the batch response."""
+ raise NotImplementedError()
+
+
+class RPCProtocol(object):
+ """Base class for all protocol implementations."""
+
+ supports_out_of_order = False
+ """If true, this protocol can receive responses out of order correctly.
+
+ Note that this usually depends on the generation of unique_ids, the
+ generation of these may or may not be thread safe, depending on the
+ protocol. Ideally, only once instance of RPCProtocol should be used per
+ client."""
+
+ def create_request(self, method, args=None, kwargs=None, one_way=False):
+ """Creates a new RPCRequest object.
+
+ It is up to the implementing protocol whether or not ``args``,
+ ``kwargs``, one of these, both at once or none of them are supported.
+
+ :param method: The method name to invoke.
+ :param args: The positional arguments to call the method with.
+ :param kwargs: The keyword arguments to call the method with.
+ :param one_way: The request is an update, i.e. it does not expect a
+ reply.
+ :return: A new :py:class:`~tinyrpc.RPCRequest` instance.
+ """
+ raise NotImplementedError()
+
+ def parse_request(self, data):
+ """Parses a request given as a string and returns an
+ :py:class:`RPCRequest` instance.
+
+ :return: An instanced request.
+ """
+ raise NotImplementedError()
+
+ def parse_reply(self, data):
+ """Parses a reply and returns an :py:class:`RPCResponse` instance.
+
+ :return: An instanced response.
+ """
+ raise NotImplementedError()
+
+
+class RPCBatchProtocol(RPCProtocol):
+ def create_batch_request(self, requests=None):
+ """Create a new :py:class:`tinyrpc.RPCBatchRequest` object.
+
+ :param requests: A list of requests.
+ """
+ raise NotImplementedError()
diff --git a/ryu/contrib/tinyrpc/protocols/jsonrpc.py b/ryu/contrib/tinyrpc/protocols/jsonrpc.py
new file mode 100644
index 00000000..941da51d
--- /dev/null
+++ b/ryu/contrib/tinyrpc/protocols/jsonrpc.py
@@ -0,0 +1,291 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from .. import RPCBatchProtocol, RPCRequest, RPCResponse, RPCErrorResponse,\
+ InvalidRequestError, MethodNotFoundError, ServerError,\
+ InvalidReplyError, RPCError, RPCBatchRequest, RPCBatchResponse
+
+import json
+
+class FixedErrorMessageMixin(object):
+ def __init__(self, *args, **kwargs):
+ if not args:
+ args = [self.message]
+ super(FixedErrorMessageMixin, self).__init__(*args, **kwargs)
+
+ def error_respond(self):
+ response = JSONRPCErrorResponse()
+
+ response.error = self.message
+ response.unique_id = None
+ response._jsonrpc_error_code = self.jsonrpc_error_code
+ return response
+
+
+
+class JSONRPCParseError(FixedErrorMessageMixin, InvalidRequestError):
+ jsonrpc_error_code = -32700
+ message = 'Parse error'
+
+
+class JSONRPCInvalidRequestError(FixedErrorMessageMixin, InvalidRequestError):
+ jsonrpc_error_code = -32600
+ message = 'Invalid Request'
+
+
+class JSONRPCMethodNotFoundError(FixedErrorMessageMixin, MethodNotFoundError):
+ jsonrpc_error_code = -32601
+ message = 'Method not found'
+
+
+class JSONRPCInvalidParamsError(FixedErrorMessageMixin, InvalidRequestError):
+ jsonrpc_error_code = -32602
+ message = 'Invalid params'
+
+
+class JSONRPCInternalError(FixedErrorMessageMixin, InvalidRequestError):
+ jsonrpc_error_code = -32603
+ message = 'Internal error'
+
+
+class JSONRPCServerError(FixedErrorMessageMixin, InvalidRequestError):
+ jsonrpc_error_code = -32000
+ message = ''
+
+
+class JSONRPCSuccessResponse(RPCResponse):
+ def _to_dict(self):
+ return {
+ 'jsonrpc': JSONRPCProtocol.JSON_RPC_VERSION,
+ 'id': self.unique_id,
+ 'result': self.result,
+ }
+
+ def serialize(self):
+ return json.dumps(self._to_dict())
+
+
+class JSONRPCErrorResponse(RPCErrorResponse):
+ def _to_dict(self):
+ return {
+ 'jsonrpc': JSONRPCProtocol.JSON_RPC_VERSION,
+ 'id': self.unique_id,
+ 'error': {
+ 'message': str(self.error),
+ 'code': self._jsonrpc_error_code,
+ }
+ }
+
+ def serialize(self):
+ return json.dumps(self._to_dict())
+
+
+def _get_code_and_message(error):
+ assert isinstance(error, (Exception, basestring))
+ if isinstance(error, Exception):
+ if hasattr(error, 'jsonrpc_error_code'):
+ code = error.jsonrpc_error_code
+ msg = str(error)
+ elif isinstance(error, InvalidRequestError):
+ code = JSONRPCInvalidRequestError.jsonrpc_error_code
+ msg = JSONRPCInvalidRequestError.message
+ elif isinstance(error, MethodNotFoundError):
+ code = JSONRPCMethodNotFoundError.jsonrpc_error_code
+ msg = JSONRPCMethodNotFoundError.message
+ else:
+ # allow exception message to propagate
+ code = JSONRPCServerError.jsonrpc_error_code
+ msg = str(error)
+ else:
+ code = -32000
+ msg = error
+
+ return code, msg
+
+
+class JSONRPCRequest(RPCRequest):
+ def error_respond(self, error):
+ if not self.unique_id:
+ return None
+
+ response = JSONRPCErrorResponse()
+
+ code, msg = _get_code_and_message(error)
+
+ response.error = msg
+ response.unique_id = self.unique_id
+ response._jsonrpc_error_code = code
+ return response
+
+ def respond(self, result):
+ response = JSONRPCSuccessResponse()
+
+ if not self.unique_id:
+ return None
+
+ response.result = result
+ response.unique_id = self.unique_id
+
+ return response
+
+ def _to_dict(self):
+ jdata = {
+ 'jsonrpc': JSONRPCProtocol.JSON_RPC_VERSION,
+ 'method': self.method,
+ }
+ if self.args:
+ jdata['params'] = self.args
+ if self.kwargs:
+ jdata['params'] = self.kwargs
+ if self.unique_id != None:
+ jdata['id'] = self.unique_id
+ return jdata
+
+ def serialize(self):
+ return json.dumps(self._to_dict())
+
+
+class JSONRPCBatchRequest(RPCBatchRequest):
+ def create_batch_response(self):
+ if self._expects_response():
+ return JSONRPCBatchResponse()
+
+ def _expects_response(self):
+ for request in self:
+ if isinstance(request, Exception):
+ return True
+ if request.unique_id != None:
+ return True
+
+ return False
+
+ def serialize(self):
+ return json.dumps([req._to_dict() for req in self])
+
+
+class JSONRPCBatchResponse(RPCBatchResponse):
+ def serialize(self):
+ return json.dumps([resp._to_dict() for resp in self if resp != None])
+
+
+class JSONRPCProtocol(RPCBatchProtocol):
+ """JSONRPC protocol implementation.
+
+ Currently, only version 2.0 is supported."""
+
+ JSON_RPC_VERSION = "2.0"
+ _ALLOWED_REPLY_KEYS = sorted(['id', 'jsonrpc', 'error', 'result'])
+ _ALLOWED_REQUEST_KEYS = sorted(['id', 'jsonrpc', 'method', 'params'])
+
+ def __init__(self, *args, **kwargs):
+ super(JSONRPCProtocol, self).__init__(*args, **kwargs)
+ self._id_counter = 0
+
+ def _get_unique_id(self):
+ self._id_counter += 1
+ return self._id_counter
+
+ def create_batch_request(self, requests=None):
+ return JSONRPCBatchRequest(requests or [])
+
+ def create_request(self, method, args=None, kwargs=None, one_way=False):
+ if args and kwargs:
+ raise InvalidRequestError('Does not support args and kwargs at '\
+ 'the same time')
+
+ request = JSONRPCRequest()
+
+ if not one_way:
+ request.unique_id = self._get_unique_id()
+
+ request.method = method
+ request.args = args
+ request.kwargs = kwargs
+
+ return request
+
+ def parse_reply(self, data):
+ try:
+ rep = json.loads(data)
+ except Exception as e:
+ raise InvalidReplyError(e)
+
+ for k in rep.iterkeys():
+ if not k in self._ALLOWED_REPLY_KEYS:
+ raise InvalidReplyError('Key not allowed: %s' % k)
+
+ if not 'jsonrpc' in rep:
+ raise InvalidReplyError('Missing jsonrpc (version) in response.')
+
+ if rep['jsonrpc'] != self.JSON_RPC_VERSION:
+ raise InvalidReplyError('Wrong JSONRPC version')
+
+ if not 'id' in rep:
+ raise InvalidReplyError('Missing id in response')
+
+ if ('error' in rep) == ('result' in rep):
+ raise InvalidReplyError(
+ 'Reply must contain exactly one of result and error.'
+ )
+
+ if 'error' in rep:
+ response = JSONRPCErrorResponse()
+ error = rep['error']
+ response.error = error['message']
+ response._jsonrpc_error_code = error['code']
+ else:
+ response = JSONRPCSuccessResponse()
+ response.result = rep.get('result', None)
+
+ response.unique_id = rep['id']
+
+ return response
+
+ def parse_request(self, data):
+ try:
+ req = json.loads(data)
+ except Exception as e:
+ raise JSONRPCParseError()
+
+ if isinstance(req, list):
+ # batch request
+ requests = JSONRPCBatchRequest()
+ for subreq in req:
+ try:
+ requests.append(self._parse_subrequest(subreq))
+ except RPCError as e:
+ requests.append(e)
+ except Exception as e:
+ requests.append(JSONRPCInvalidRequestError())
+
+ if not requests:
+ raise JSONRPCInvalidRequestError()
+ return requests
+ else:
+ return self._parse_subrequest(req)
+
+ def _parse_subrequest(self, req):
+ for k in req.iterkeys():
+ if not k in self._ALLOWED_REQUEST_KEYS:
+ raise JSONRPCInvalidRequestError()
+
+ if req.get('jsonrpc', None) != self.JSON_RPC_VERSION:
+ raise JSONRPCInvalidRequestError()
+
+ if not isinstance(req['method'], basestring):
+ raise JSONRPCInvalidRequestError()
+
+ request = JSONRPCRequest()
+ request.method = str(req['method'])
+ request.unique_id = req.get('id', None)
+
+ params = req.get('params', None)
+ if params != None:
+ if isinstance(params, list):
+ request.args = req['params']
+ elif isinstance(params, dict):
+ request.kwargs = req['params']
+ else:
+ raise JSONRPCInvalidParamsError()
+
+ return request
diff --git a/ryu/contrib/tinyrpc/server/__init__.py b/ryu/contrib/tinyrpc/server/__init__.py
new file mode 100644
index 00000000..6b2cc1ad
--- /dev/null
+++ b/ryu/contrib/tinyrpc/server/__init__.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# FIXME: needs unittests
+# FIXME: needs checks for out-of-order, concurrency, etc as attributes
+from tinyrpc.exc import RPCError
+
+class RPCServer(object):
+ """High level RPC server.
+
+ :param transport: The :py:class:`~tinyrpc.transports.RPCTransport` to use.
+ :param protocol: The :py:class:`~tinyrpc.RPCProtocol` to use.
+ :param dispatcher: The :py:class:`~tinyrpc.dispatch.RPCDispatcher` to use.
+ """
+ def __init__(self, transport, protocol, dispatcher):
+ self.transport = transport
+ self.protocol = protocol
+ self.dispatcher = dispatcher
+
+ def serve_forever(self):
+ """Handle requests forever.
+
+ Starts the server loop in which the transport will be polled for a new
+ message.
+
+ After a new message has arrived,
+ :py:func:`~tinyrpc.server.RPCServer._spawn` is called with a handler
+ function and arguments to handle the request.
+
+ The handler function will try to decode the message using the supplied
+ protocol, if that fails, an error response will be sent. After decoding
+ the message, the dispatcher will be asked to handle the resultung
+ request and the return value (either an error or a result) will be sent
+ back to the client using the transport.
+
+ After calling :py:func:`~tinyrpc.server.RPCServer._spawn`, the server
+ will fetch the next message and repeat.
+ """
+ while True:
+ context, message = self.transport.receive_message()
+
+ # assuming protocol is threadsafe and dispatcher is theadsafe, as
+ # long as its immutable
+
+ def handle_message(context, message):
+ try:
+ request = self.protocol.parse_request(message)
+ except RPCError as e:
+ response = e.error_respond()
+ else:
+ response = self.dispatcher.dispatch(request)
+
+ # send reply
+ self.transport.send_reply(context, response.serialize())
+
+ self._spawn(handle_message, context, message)
+
+ def _spawn(self, func, *args, **kwargs):
+ """Spawn a handler function.
+
+ This function is overridden in subclasses to provide concurrency.
+
+ In the base implementation, it simply calls the supplied function
+ ``func`` with ``*args`` and ``**kwargs``. This results in a
+ single-threaded, single-process, synchronous server.
+
+ :param func: A callable to call.
+ :param args: Arguments to ``func``.
+ :param kwargs: Keyword arguments to ``func``.
+ """
+ func(*args, **kwargs)
diff --git a/ryu/contrib/tinyrpc/server/gevent.py b/ryu/contrib/tinyrpc/server/gevent.py
new file mode 100644
index 00000000..c1078fcf
--- /dev/null
+++ b/ryu/contrib/tinyrpc/server/gevent.py
@@ -0,0 +1,13 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from __future__ import absolute_import
+import gevent
+
+from . import RPCServer
+
+
+class RPCServerGreenlets(RPCServer):
+ # documentation in docs because of dependencies
+ def _spawn(self, func, *args, **kwargs):
+ gevent.spawn(func, *args, **kwargs)
diff --git a/ryu/contrib/tinyrpc/transports/INTEGRATE_ME.py b/ryu/contrib/tinyrpc/transports/INTEGRATE_ME.py
new file mode 100644
index 00000000..a574d973
--- /dev/null
+++ b/ryu/contrib/tinyrpc/transports/INTEGRATE_ME.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import gevent
+import zmq.green as zmq
+from logbook import Logger
+
+from tinyrpc.protocols.jsonrpc import JSONRPCProtocol
+from tinyrpc.dispatch import RPCDispatcher
+from tinyrpc import RPCError, ServerError, MethodNotFoundError
+
+
+class Server(object):
+ def __init__(transport, protocol, dispatcher):
+ self.transport = transport
+ self.protocol = protocol
+ self.dispatcher = dispatcher
+
+ def run(self):
+ while True:
+ try:
+ context, message = self.transport.receive_message()
+ except Exception as e:
+ self.exception(e)
+ continue
+
+ # assuming protocol is threadsafe and dispatcher is theadsafe, as long
+ # as its immutable
+
+ self.handle_client(context, message)
+
+ def handle_client(self, context, message):
+ try:
+ request = self.protocol.parse_request(message)
+ except RPCError as e:
+ self.exception(e)
+ response = e.error_respond()
+ else:
+ response = dispatcher.dispatch(request)
+
+ # send reply
+ reply = response.serialize()
+ self.transport.send_reply(context, reply)
+
+
+class ConcurrentServerMixin(object):
+ def handle_client(self, context, message):
+ self.spawn(
+ super(ConcurrentServer, self).handle_client, context, message
+ )
+
+
+class ZmqRouterTransport(object):
+ def __init__(self, socket):
+ self.socket = socket
+
+ def receive_message(self):
+ msg = socket.recv_multipart()
+ return msg[:-1], [-1]
+
+ def send_reply(self, context, reply):
+ self.send_multipart(context + [reply])
+
+
+class GeventConcurrencyMixin(ConcurrentServerMixin):
+ def spawn(self, func, *args, **kwargs):
+ gevent.spawn(func, *args, **kwargs)
+
+
+def rpc_server(socket, protocol, dispatcher):
+ log = Logger('rpc_server')
+ log.debug('starting up...')
+ while True:
+ try:
+ message = socket.recv_multipart()
+ except Exception as e:
+ log.warning('Failed to receive message from client, ignoring...')
+ log.exception(e)
+ continue
+
+ log.debug('Received message %s from %r' % (message[-1], message[0]))
+
+ # assuming protocol is threadsafe and dispatcher is theadsafe, as long
+ # as its immutable
+
+ def handle_client(message):
+ try:
+ request = protocol.parse_request(message[-1])
+ except RPCError as e:
+ log.exception(e)
+ response = e.error_respond()
+ else:
+ response = dispatcher.dispatch(request)
+ log.debug('Response okay: %r' % response)
+
+ # send reply
+ message[-1] = response.serialize()
+ log.debug('Replying %s to %r' % (message[-1], message[0]))
+ socket.send_multipart(message)
+
+ gevent.spawn(handle_client, message)
+
+
+context = zmq.Context()
+socket = context.socket(zmq.ROUTER)
+socket.bind("tcp://127.0.0.1:12345")
+
+dispatcher = RPCDispatcher()
+
+@dispatcher.public
+def throw_up():
+ return 'asad'
+ raise Exception('BLARGH')
+
+rpc_server(socket, JSONRPCProtocol(), dispatcher)
diff --git a/ryu/contrib/tinyrpc/transports/__init__.py b/ryu/contrib/tinyrpc/transports/__init__.py
new file mode 100644
index 00000000..3bbc8720
--- /dev/null
+++ b/ryu/contrib/tinyrpc/transports/__init__.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+class ServerTransport(object):
+ """Base class for all server transports."""
+
+ def receive_message(self):
+ """Receive a message from the transport.
+
+ Blocks until another message has been received. May return a context
+ opaque to clients that should be passed on
+ :py:func:`~tinyrpc.transport.Transport.send_reply` to identify the
+ client later on.
+
+ :return: A tuple consisting of ``(context, message)``.
+ """
+ raise NotImplementedError()
+
+ def send_reply(self, context, reply):
+ """Sends a reply to a client.
+
+ The client is usually identified by passing ``context`` as returned
+ from the original
+ :py:func:`~tinyrpc.transport.Transport.receive_message` call.
+
+ Messages must be strings, it is up to the sender to convert the
+ beforehand. A non-string value raises a :py:exc:`TypeError`.
+
+ :param context: A context returned by
+ :py:func:`~tinyrpc.transport.Transport.receive_message`.
+ :param reply: A string to send back as the reply.
+ """
+ raise NotImplementedError
+
+
+class ClientTransport(object):
+ """Base class for all client transports."""
+
+ def send_message(self, message, expect_reply=True):
+ """Send a message to the server and possibly receive a reply.
+
+ Sends a message to the connected server.
+
+ Messages must be strings, it is up to the sender to convert the
+ beforehand. A non-string value raises a :py:exc:`TypeError`.
+
+ This function will block until one reply has been received.
+
+ :param message: A string to send.
+ :return: A string containing the server reply.
+ """
+ raise NotImplementedError
diff --git a/ryu/contrib/tinyrpc/transports/http.py b/ryu/contrib/tinyrpc/transports/http.py
new file mode 100644
index 00000000..919f97f4
--- /dev/null
+++ b/ryu/contrib/tinyrpc/transports/http.py
@@ -0,0 +1,31 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from Queue import Queue
+import threading
+import requests
+
+from . import ServerTransport, ClientTransport
+
+
+class HttpPostClientTransport(ClientTransport):
+ """HTTP POST based client transport.
+
+ Requires :py:mod:`requests`. Submits messages to a server using the body of
+ an ``HTTP`` ``POST`` request. Replies are taken from the responses body.
+
+ :param endpoint: The URL to send ``POST`` data to.
+ :param kwargs: Additional parameters for :py:func:`requests.post`.
+ """
+ def __init__(self, endpoint, **kwargs):
+ self.endpoint = endpoint
+ self.request_kwargs = kwargs
+
+ def send_message(self, message, expect_reply=True):
+ if not isinstance(message, str):
+ raise TypeError('str expected')
+
+ r = requests.post(self.endpoint, data=message, **self.request_kwargs)
+
+ if expect_reply:
+ return r.content
diff --git a/ryu/contrib/tinyrpc/transports/tcp.py b/ryu/contrib/tinyrpc/transports/tcp.py
new file mode 100644
index 00000000..c5ac6142
--- /dev/null
+++ b/ryu/contrib/tinyrpc/transports/tcp.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from Queue import Queue
+import struct
+import threading
+
+from SocketServer import TCPServer, BaseRequestHandler, ThreadingMixIn
+
+from . import RPCRequestResponseServer
+
+
+def _read_length_prefixed_msg(sock, prefix_format='!I'):
+ prefix_bytes = struct.calcsize(prefix_format)
+
+ sock.recv(prefix_bytes)
+
+def _read_n_bytes(sock, n):
+ buf = []
+ while n > 0:
+ data = sock.recv(n)
+ n -= len(data)
+ buf.append(data)
+
+ return ''.join(buf)
+
+
+def create_length_prefixed_tcp_handler():
+ queue = Queue()
+ class LengthPrefixedTcpHandler(BaseRequestHandler):
+ def handle(self):
+ #msg = _read_length_prefixed_msg(self.request)
+ # this will run inside a new thread
+ self.request.send("hello\n")
+ while True:
+ b = _read_n_bytes(self.request, 10)
+ self.request.send("you sent: %s" % b)
+ queue.put(b)
+
+ return queue, LengthPrefixedTcpHandler
+
+
+def tcp_test_main():
+ class Server(ThreadingMixIn, TCPServer):
+ pass
+
+ queue, Handler = create_length_prefixed_tcp_handler()
+
+ server = Server(('localhost', 12345), Handler)
+ server.allow_reuse_address = True
+
+ server.serve_forever()
diff --git a/ryu/contrib/tinyrpc/transports/wsgi.py b/ryu/contrib/tinyrpc/transports/wsgi.py
new file mode 100644
index 00000000..f9a84c12
--- /dev/null
+++ b/ryu/contrib/tinyrpc/transports/wsgi.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import Queue
+
+from werkzeug.wrappers import Response, Request
+
+from . import ServerTransport
+
+
+class WsgiServerTransport(ServerTransport):
+ """WSGI transport.
+
+ Requires :py:mod:`werkzeug`.
+
+ Due to the nature of WSGI, this transport has a few pecularities: It must
+ be run in a thread, greenlet or some other form of concurrent execution
+ primitive.
+
+ This is due to
+ :py:func:`~tinyrpc.transports.wsgi.WsgiServerTransport.handle` blocking
+ while waiting for a call to
+ :py:func:`~tinyrpc.transports.wsgi.WsgiServerTransport.send_reply`.
+
+ The parameter ``queue_class`` must be used to supply a proper queue class
+ for the chosen concurrency mechanism (i.e. when using :py:mod:`gevent`,
+ set it to :py:class:`gevent.queue.Queue`).
+
+ :param max_content_length: The maximum request content size allowed. Should
+ be set to a sane value to prevent DoS-Attacks.
+ :param queue_class: The Queue class to use.
+ :param allow_origin: The ``Access-Control-Allow-Origin`` header. Defaults
+ to ``*`` (so change it if you need actual security).
+ """
+ def __init__(self, max_content_length=4096, queue_class=Queue.Queue,
+ allow_origin='*'):
+ self._queue_class = queue_class
+ self.messages = queue_class()
+ self.max_content_length = max_content_length
+ self.allow_origin = allow_origin
+
+ def receive_message(self):
+ return self.messages.get()
+
+ def send_reply(self, context, reply):
+ if not isinstance(reply, str):
+ raise TypeError('str expected')
+
+ context.put(reply)
+
+ def handle(self, environ, start_response):
+ """WSGI handler function.
+
+ The transport will serve a request by reading the message and putting
+ it into an internal buffer. It will then block until another
+ concurrently running function sends a reply using
+ :py:func:`~tinyrpc.transports.WsgiServerTransport.send_reply`.
+
+ The reply will then be sent to the client being handled and handle will
+ return.
+ """
+ request = Request(environ)
+ request.max_content_length = self.max_content_length
+
+ access_control_headers = {
+ 'Access-Control-Allow-Methods': 'POST',
+ 'Access-Control-Allow-Origin': self.allow_origin,
+ 'Access-Control-Allow-Headers': \
+ 'Content-Type, X-Requested-With, Accept, Origin'
+ }
+
+ if request.method == 'OPTIONS':
+ response = Response(headers=access_control_headers)
+
+ elif request.method == 'POST':
+ # message is encoded in POST, read it...
+ msg = request.stream.read()
+
+ # create new context
+ context = self._queue_class()
+
+ self.messages.put((context, msg))
+
+ # ...and send the reply
+ response = Response(context.get(), headers=access_control_headers)
+ else:
+ # nothing else supported at the moment
+ response = Response('Only POST supported', 405)
+
+ return response(environ, start_response)
diff --git a/ryu/contrib/tinyrpc/transports/zmq.py b/ryu/contrib/tinyrpc/transports/zmq.py
new file mode 100644
index 00000000..502a1ddc
--- /dev/null
+++ b/ryu/contrib/tinyrpc/transports/zmq.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from __future__ import absolute_import # needed for zmq import
+import zmq
+
+from . import ServerTransport, ClientTransport
+
+
+class ZmqServerTransport(ServerTransport):
+ """Server transport based on a :py:const:`zmq.ROUTER` socket.
+
+ :param socket: A :py:const:`zmq.ROUTER` socket instance, bound to an
+ endpoint.
+ """
+
+ def __init__(self, socket):
+ self.socket = socket
+
+ def receive_message(self):
+ msg = self.socket.recv_multipart()
+ return msg[:-1], msg[-1]
+
+ def send_reply(self, context, reply):
+ self.socket.send_multipart(context + [reply])
+
+ @classmethod
+ def create(cls, zmq_context, endpoint):
+ """Create new server transport.
+
+ Instead of creating the socket yourself, you can call this function and
+ merely pass the :py:class:`zmq.core.context.Context` instance.
+
+ By passing a context imported from :py:mod:`zmq.green`, you can use
+ green (gevent) 0mq sockets as well.
+
+ :param zmq_context: A 0mq context.
+ :param endpoint: The endpoint clients will connect to.
+ """
+ socket = zmq_context.socket(zmq.ROUTER)
+ socket.bind(endpoint)
+ return cls(socket)
+
+
+class ZmqClientTransport(ClientTransport):
+ """Client transport based on a :py:const:`zmq.REQ` socket.
+
+ :param socket: A :py:const:`zmq.REQ` socket instance, connected to the
+ server socket.
+ """
+
+ def __init__(self, socket):
+ self.socket = socket
+
+ def send_message(self, message, expect_reply=True):
+ self.socket.send(message)
+
+ if expect_reply:
+ return self.socket.recv()
+
+ @classmethod
+ def create(cls, zmq_context, endpoint):
+ """Create new client transport.
+
+ Instead of creating the socket yourself, you can call this function and
+ merely pass the :py:class:`zmq.core.context.Context` instance.
+
+ By passing a context imported from :py:mod:`zmq.green`, you can use
+ green (gevent) 0mq sockets as well.
+
+ :param zmq_context: A 0mq context.
+ :param endpoint: The endpoint the server is bound to.
+ """
+ socket = zmq_context.socket(zmq.REQ)
+ socket.connect(endpoint)
+ return cls(socket)
diff --git a/tools/pip-requires b/tools/pip-requires
index 793e48b4..7be60f19 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -7,4 +7,3 @@ paramiko
routes
six>=1.4.0
webob>=1.0.8
-tinyrpc