summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ryu/lib/rpc.py295
-rw-r--r--setup.cfg2
-rw-r--r--tools/pip-requires1
3 files changed, 282 insertions, 16 deletions
diff --git a/ryu/lib/rpc.py b/ryu/lib/rpc.py
index 8455791a..57a35bab 100644
--- a/ryu/lib/rpc.py
+++ b/ryu/lib/rpc.py
@@ -1,37 +1,302 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# msgpack-rpc
+# http://wiki.msgpack.org/display/MSGPACK/RPC+specification
+
import msgpack
-class RpcMessage(object):
+class MessageType(object):
REQUEST = 0
RESPONSE = 1
NOTIFY = 2
-class RpcSession(object):
+class MessageEncoder(object):
+ """msgpack-rpc encoder/decoder.
+ intended to be transport-agnostic.
+ """
def __init__(self):
- super(RpcSession, self).__init__()
- self._packer = msgpack.Packer()
- self._unpacker = msgpack.Unpacker()
+ super(MessageEncoder, self).__init__()
+ # note: on-wire msgpack has no notion of encoding.
+ # the msgpack-python library implicitly converts unicode to
+ # utf-8 encoded bytes by default. we don't want to rely on
+ # the behaviour though because it seems to be going to change.
+ # cf. https://gist.github.com/methane/5022403
+ self._packer = msgpack.Packer(encoding=None)
+ self._unpacker = msgpack.Unpacker(encoding=None)
self._next_msgid = 0
def _create_msgid(self):
this_id = self._next_msgid
- self._next_msgid += 1
+ self._next_msgid = (self._next_msgid + 1) % 0xffffffff
return this_id
def create_request(self, method, params):
+ assert isinstance(method, str)
+ assert isinstance(params, list)
msgid = self._create_msgid()
- return self._packer.pack([RpcMessage.REQUEST, msgid, method, params])
+ return (self._packer.pack([MessageType.REQUEST, msgid, method,
+ params]), msgid)
- def create_response(self, msgid, error, result):
- return self._packer.pack([RpcMessage.RESPONSE, msgid, error, result])
+ def create_response(self, msgid, error=None, result=None):
+ assert isinstance(msgid, int)
+ assert 0 <= msgid and msgid <= 0xffffffff
+ assert error is None or result is None
+ return self._packer.pack([MessageType.RESPONSE, msgid, error, result])
def create_notification(self, method, params):
- return self._packer.pack([RpcMessage.NOTIFY, method, params])
+ assert isinstance(method, str)
+ assert isinstance(params, list)
+ return self._packer.pack([MessageType.NOTIFY, method, params])
- def get_messages(self, data):
+ def get_and_dispatch_messages(self, data, disp_table):
+ """dissect messages from a raw stream data.
+ disp_table[type] should be a callable for the corresponding
+ MessageType.
+ """
self._unpacker.feed(data)
- messages = []
- for msg in self._unpacker:
- messages.append(msg)
- return messages
+ for m in self._unpacker:
+ self._dispatch_message(m, disp_table)
+
+ def _dispatch_message(self, m, disp_table):
+ # XXX validation
+ type = m[0]
+ try:
+ f = disp_table[type]
+ except KeyError:
+ # ignore messages with unknown type
+ return
+ f(m[1:])
+
+
+from collections import deque
+import select
+
+
+class EndPoint(object):
+ """An endpoint
+ *sock* is a socket-like. it can be either blocking or non-blocking.
+ """
+ def __init__(self, sock, encoder=None, disp_table=None):
+ if encoder is None:
+ encoder = MessageEncoder()
+ self._encoder = encoder
+ self._sock = sock
+ if disp_table is None:
+ self._table = {
+ MessageType.REQUEST: self._enqueue_incoming_request,
+ MessageType.RESPONSE: self._enqueue_incoming_response,
+ MessageType.NOTIFY: self._enqueue_incoming_notification
+ }
+ else:
+ self._table = disp_table
+ self._send_buffer = bytearray()
+ # msgids for which we sent a request but have not received a response
+ self._pending_requests = set()
+ # queues for incoming messages
+ self._requests = deque()
+ self._notifications = deque()
+ self._responses = {}
+ self._incoming = 0 # number of incoming messages in our queues
+
+ def selectable(self):
+ rlist = [self._sock]
+ wlist = []
+ if self._send_buffer:
+ wlist.append(self._sock)
+ return rlist, wlist
+
+ def process_outgoing(self):
+ try:
+ sent_bytes = self._sock.send(self._send_buffer)
+ except IOError:
+ sent_bytes = 0
+ del self._send_buffer[:sent_bytes]
+
+ def process_incoming(self):
+ self.receive_messages(all=True)
+
+ def process(self):
+ self.process_outgoing()
+ self.process_incoming()
+
+ def block(self):
+ rlist, wlist = self.selectable()
+ select.select(rlist, wlist, rlist + wlist)
+
+ def serve(self):
+ while True:
+ self.block()
+ self.process()
+
+ def _send_message(self, msg):
+ self._send_buffer += msg
+ self.process_outgoing()
+
+ def send_request(self, method, params):
+ """Send a request
+ """
+ msg, msgid = self._encoder.create_request(method, params)
+ self._send_message(msg)
+ self._pending_requests.add(msgid)
+ return msgid
+
+ def send_response(self, msgid, error=None, result=None):
+ """Send a response
+ """
+ msg = self._encoder.create_response(msgid, error, result)
+ self._send_message(msg)
+
+ def send_notification(self, method, params):
+ """Send a notification
+ """
+ msg = self._encoder.create_notification(method, params)
+ self._send_message(msg)
+
+ def receive_messages(self, all=False):
+ """Try to receive some messages.
+ Received messages are put on the internal queues.
+ They can be retrieved using get_xxx() methods.
+ Returns True if there's something queued for get_xxx() methods.
+ """
+ while all or self._incoming == 0:
+ try:
+ packet = self._sock.recv(4096) # XXX the size is arbitrary
+ except IOError:
+ packet = None
+ if not packet:
+ break
+ self._encoder.get_and_dispatch_messages(packet, self._table)
+ return self._incoming > 0
+
+ def _enqueue_incoming_request(self, m):
+ self._requests.append(m)
+ self._incoming += 1
+
+ def _enqueue_incoming_response(self, m):
+ msgid, error, result = m
+ try:
+ self._pending_requests.remove(msgid)
+ except KeyError:
+ # bogus msgid
+ # XXXwarn
+ return
+ assert not msgid in self._responses
+ self._responses[msgid] = (error, result)
+ self._incoming += 1
+
+ def _enqueue_incoming_notification(self, m):
+ self._notifications.append(m)
+ self._incoming += 1
+
+ def _get_message(self, q):
+ try:
+ m = q.popleft()
+ assert self._incoming > 0
+ self._incoming -= 1
+ return m
+ except IndexError:
+ return None
+
+ def get_request(self):
+ return self._get_message(self._requests)
+
+ def get_response(self, msgid):
+ try:
+ m = self._responses.pop(msgid)
+ assert self._incoming > 0
+ self._incoming -= 1
+ except KeyError:
+ return None
+ error, result = m
+ return (result, error)
+
+ def get_notification(self):
+ return self._get_message(self._notifications)
+
+
+class RPCError(Exception):
+ """an error from server
+ """
+ def __init__(self, error):
+ self._error = error
+
+ def get_value(self):
+ return self._error
+
+ def __str__(self):
+ return str(self._error)
+
+
+class Client(object):
+ """a convenient class for a pure rpc client
+ *sock* is a socket-like. it should be blocking.
+ """
+ def __init__(self, sock, encoder=None, notification_callback=None):
+ self._endpoint = EndPoint(sock, encoder)
+ if notification_callback is None:
+ # ignore notifications by default
+ self._notification_callback = lambda n: None
+ else:
+ self._notification_callback = notification_callback
+
+ def _process_input_notification(self):
+ n = self._endpoint.get_notification()
+ if n:
+ self._notification_callback(n)
+
+ def _process_input_request(self):
+ # ignore requests as we are a pure client
+ # XXXwarn
+ self._endpoint.get_request()
+
+ def call(self, method, params):
+ """synchronous call.
+ send a request and wait for a response.
+ return a result. or raise RPCError exception if the peer
+ sends us an error.
+ """
+ msgid = self._endpoint.send_request(method, params)
+ while True:
+ if not self._endpoint.receive_messages():
+ raise EOFError("EOF")
+ res = self._endpoint.get_response(msgid)
+ if res:
+ result, error = res
+ if error is None:
+ return result
+ raise RPCError(error)
+ self._process_input_notification()
+ self._process_input_request()
+
+ def send_notification(self, method, params):
+ """send a notification to the peer.
+ """
+ self._endpoint.send_notification(method, params)
+
+ def receive_notification(self):
+ """wait for the next incoming message.
+ intended to be used when we have nothing to send but want to receive
+ notifications.
+ """
+ if not self._endpoint.receive_messages():
+ raise EOFError("EOF")
+ self._process_input_notification()
+ self._process_input_request()
diff --git a/setup.cfg b/setup.cfg
index 1c3a7d85..de434ed4 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -34,7 +34,7 @@ source-dir = doc/source
[bdist_rpm]
Release = 1
Group = Applications/Accessories
-Requires = python-eventlet, python-routes, python-webob, python-paramiko, python-netaddr python-oslo-config
+Requires = python-eventlet, python-routes, python-webob, python-paramiko, python-netaddr python-oslo-config, python-msgpack
doc_files = LICENSE
MANIFEST.in
README.rst
diff --git a/tools/pip-requires b/tools/pip-requires
index eb0a1198..d066bfe3 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -4,3 +4,4 @@ webob>=1.0.8
paramiko
netaddr
oslo.config
+msgpack-python>=0.4.0