summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ryu/tests/unit/lib/test_rpc.py369
1 files changed, 369 insertions, 0 deletions
diff --git a/ryu/tests/unit/lib/test_rpc.py b/ryu/tests/unit/lib/test_rpc.py
new file mode 100644
index 00000000..735e049c
--- /dev/null
+++ b/ryu/tests/unit/lib/test_rpc.py
@@ -0,0 +1,369 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import time
+import unittest
+from nose.tools import raises
+
+from ryu.lib import hub
+hub.patch()
+from ryu.lib import rpc
+
+
+class MyException(BaseException):
+ pass
+
+
+class Test_rpc(unittest.TestCase):
+ """ Test case for ryu.lib.rpc
+ """
+
+ def _handle_request(self, m):
+ e = self._server_endpoint
+ msgid, method, params = m
+ if method == "resp":
+ e.send_response(msgid, result=params[0])
+ elif method == "err":
+ e.send_response(msgid, error=params[0])
+ elif method == "callback":
+ n, cb, v = params
+ assert n > 0
+ self._requests.add(e.send_request(cb, [msgid, n, cb, v]))
+ elif method == "notify1":
+ e.send_notification(params[1], params[2])
+ e.send_response(msgid, result=params[0])
+ elif method == "shutdown":
+ import socket
+ how = getattr(socket, params[0])
+ self._server_sock.shutdown(how)
+ e.send_response(msgid, result=method)
+ else:
+ raise Exception("unknown method %s" % method)
+
+ def _handle_notification(self, m):
+ e = self._server_endpoint
+ method, params = m
+ if method == "notify2":
+ e.send_notification(params[0], params[1])
+
+ def _handle_response(self, m):
+ e = self._server_endpoint
+ msgid, error, result = m
+ assert error is None
+ self._requests.remove(msgid)
+ omsgid, n, cb, v = result
+ assert n >= 0
+ if n == 0:
+ e.send_response(omsgid, result=v)
+ else:
+ self._requests.add(e.send_request(cb, [omsgid, n, cb, v]))
+
+ def setUp(self):
+ import socket
+
+ self._server_sock, self._client_sock = socket.socketpair()
+ table = {
+ rpc.MessageType.REQUEST: self._handle_request,
+ rpc.MessageType.RESPONSE: self._handle_response,
+ rpc.MessageType.NOTIFY: self._handle_notification
+ }
+ self._requests = set()
+ self._server_sock.setblocking(0)
+ self._server_endpoint = rpc.EndPoint(self._server_sock,
+ disp_table=table)
+ self._server_thread = hub.spawn(self._server_endpoint.serve)
+
+ def tearDown(self):
+ hub.kill(self._server_thread)
+ hub.joinall([self._server_thread])
+
+ def test_0_call_str(self):
+ c = rpc.Client(self._client_sock)
+ obj = "hoge"
+ result = c.call("resp", [obj])
+ assert result == obj
+ assert isinstance(result, bytes)
+
+ def test_0_call_int(self):
+ c = rpc.Client(self._client_sock)
+ obj = 12345
+ assert isinstance(obj, int)
+ result = c.call("resp", [obj])
+ assert result == obj
+ assert isinstance(result, type(obj))
+
+ def test_0_call_int2(self):
+ c = rpc.Client(self._client_sock)
+ obj = sys.maxint
+ assert isinstance(obj, int)
+ result = c.call("resp", [obj])
+ assert result == obj
+ assert isinstance(result, type(obj))
+
+ def test_0_call_int3(self):
+ c = rpc.Client(self._client_sock)
+ obj = - sys.maxint - 1
+ assert isinstance(obj, int)
+ result = c.call("resp", [obj])
+ assert result == obj
+ assert isinstance(result, type(obj))
+
+ def test_0_call_long(self):
+ c = rpc.Client(self._client_sock)
+ obj = 0xffffffffffffffff # max value for msgpack
+ assert isinstance(obj, long)
+ result = c.call("resp", [obj])
+ assert result == obj
+ assert isinstance(result, type(obj))
+
+ def test_0_call_long2(self):
+ c = rpc.Client(self._client_sock)
+ # NOTE: the python type of this value is int for 64-bit arch
+ obj = -0x8000000000000000 # min value for msgpack
+ assert isinstance(obj, (int, long))
+ result = c.call("resp", [obj])
+ assert result == obj
+ assert isinstance(result, type(obj))
+
+ @raises(TypeError)
+ def test_0_call_bytearray(self):
+ c = rpc.Client(self._client_sock)
+ obj = bytearray("foo")
+ result = c.call("resp", [obj])
+ assert result == obj
+ assert isinstance(result, bytes)
+
+ def test_1_shutdown_wr(self):
+ # test if the server shutdown on disconnect
+ import socket
+ self._client_sock.shutdown(socket.SHUT_WR)
+ hub.joinall([self._server_thread])
+
+ @raises(EOFError)
+ def test_1_client_shutdown_wr(self):
+ c = rpc.Client(self._client_sock)
+ c.call("shutdown", ["SHUT_WR"])
+
+ def test_1_call_True(self):
+ c = rpc.Client(self._client_sock)
+ obj = True
+ assert c.call("resp", [obj]) == obj
+
+ def test_2_call_None(self):
+ c = rpc.Client(self._client_sock)
+ obj = None
+ assert c.call("resp", [obj]) is None
+
+ def test_2_call_False(self):
+ c = rpc.Client(self._client_sock)
+ obj = False
+ assert c.call("resp", [obj]) == obj
+
+ def test_2_call_dict(self):
+ c = rpc.Client(self._client_sock)
+ obj = {"hoge": 1, "fuga": 2}
+ assert c.call("resp", [obj]) == obj
+
+ def test_2_call_empty_dict(self):
+ c = rpc.Client(self._client_sock)
+ obj = {}
+ assert c.call("resp", [obj]) == obj
+
+ def test_2_call_array(self):
+ c = rpc.Client(self._client_sock)
+ obj = [1, 2, 3, 4]
+ assert c.call("resp", [obj]) == obj
+
+ def test_2_call_empty_array(self):
+ c = rpc.Client(self._client_sock)
+ obj = []
+ assert c.call("resp", [obj]) == obj
+
+ def test_2_call_tuple(self):
+ c = rpc.Client(self._client_sock)
+ # note: msgpack library implicitly convert a tuple into a list
+ obj = (1, 2, 3)
+ assert c.call("resp", [obj]) == list(obj)
+
+ @raises(TypeError)
+ def test_2_call_unicode(self):
+ c = rpc.Client(self._client_sock)
+ # note: on-wire msgpack has no notion of encoding.
+ # the msgpack library implicitly converts unicode to
+ # utf-8 encoded bytes by default.
+ # we don't want to rely on the behaviour though because
+ # it seems to be going to change.
+ # https://gist.github.com/methane/5022403
+ obj = u"hoge"
+ result = c.call("resp", [obj])
+ assert result == obj
+ assert isinstance(result, bytes)
+
+ def test_2_call_small_binary(self):
+ import struct
+ c = rpc.Client(self._client_sock)
+ obj = struct.pack("100x")
+ result = c.call("resp", [obj])
+ assert result == obj
+ assert isinstance(result, bytes)
+
+ def test_3_call_complex(self):
+ c = rpc.Client(self._client_sock)
+ obj = [1, "hoge", {"foo": 1, 3: "bar"}]
+ assert c.call("resp", [obj]) == list(obj)
+
+ def test_4_call_large_binary(self):
+ import struct
+
+ c = rpc.Client(self._client_sock)
+ obj = struct.pack("10000000x")
+ result = c.call("resp", [obj])
+ assert result == obj
+ assert isinstance(result, bytes)
+
+ def test_0_notification1(self):
+ l = []
+
+ def callback(n):
+ l.append(n)
+ c = rpc.Client(self._client_sock, notification_callback=callback)
+ obj = "hogehoge"
+ robj = "fugafuga"
+ assert c.call("notify1", [robj, "notify_hoge", [obj]]) == robj
+ c.receive_notification()
+ assert len(l) == 1
+ n = l.pop(0)
+ assert not n is None
+ method, params = n
+ assert method == "notify_hoge"
+ assert params[0] == obj
+
+ def test_0_notification2(self):
+ l = []
+
+ def callback(n):
+ l.append(n)
+ c = rpc.Client(self._client_sock, notification_callback=callback)
+ obj = "hogehogehoge"
+ c.send_notification("notify2", ["notify_hoge", [obj]])
+ c.receive_notification()
+ assert len(l) == 1
+ n = l.pop(0)
+ assert not n is None
+ method, params = n
+ assert method == "notify_hoge"
+ assert params[0] == obj
+
+ def test_0_call_error(self):
+ c = rpc.Client(self._client_sock)
+ obj = "hoge"
+ try:
+ c.call("err", [obj])
+ raise Exception("unexpected")
+ except rpc.RPCError, e:
+ assert e.get_value() == obj
+
+ def test_0_call_error_notification(self):
+ l = []
+
+ def callback(n):
+ l.append(n)
+ c = rpc.Client(self._client_sock, notification_callback=callback)
+ c.send_notification("notify2", ["notify_foo", []])
+ hub.sleep(0.5) # give the peer a chance to run
+ obj = "hoge"
+ try:
+ c.call("err", [obj])
+ raise Exception("unexpected")
+ except rpc.RPCError, e:
+ assert e.get_value() == obj
+ assert len(l) == 1
+ n = l.pop(0)
+ method, params = n
+ assert method == "notify_foo"
+ assert params == []
+
+ def test_4_async_call(self):
+ """send a bunch of requests and then wait for responses
+ """
+ num_calls = 9999
+ old_blocking = self._client_sock.setblocking(0)
+ try:
+ e = rpc.EndPoint(self._client_sock)
+ s = set()
+ for i in range(1, num_calls + 1):
+ s.add(e.send_request("resp", [i]))
+ sum = 0
+ while s:
+ e.block()
+ e.process()
+ done = set()
+ for x in s:
+ r = e.get_response(x)
+ if r is None:
+ continue
+ res, error = r
+ assert error is None
+ sum += res
+ done.add(x)
+ assert done.issubset(s)
+ s -= done
+ assert sum == (1 + num_calls) * num_calls / 2
+ finally:
+ self._client_sock.setblocking(old_blocking)
+
+ def test_4_async_call2(self):
+ """both sides act as rpc client and server
+ """
+ assert not self._requests
+ num_calls = 100
+ old_blocking = self._client_sock.setblocking(0)
+ try:
+ e = rpc.EndPoint(self._client_sock)
+ s = set()
+ for i in range(1, num_calls + 1):
+ s.add(e.send_request("callback", [i, "ourcallback", 0]))
+ sum = 0
+ while s:
+ e.block()
+ e.process()
+ done = set()
+ for x in s:
+ r = e.get_response(x)
+ if r is None:
+ continue
+ res, error = r
+ assert error is None
+ sum += res
+ done.add(x)
+ assert done.issubset(s)
+ s -= done
+ r = e.get_request()
+ if not r is None:
+ msgid, method, params = r
+ assert method == "ourcallback"
+ omsgid, n, cb, v = params
+ assert omsgid in s
+ assert cb == "ourcallback"
+ assert n > 0
+ e.send_response(msgid, result=[omsgid, n-1, cb, v+1])
+ assert sum == (1 + num_calls) * num_calls / 2
+ finally:
+ self._client_sock.setblocking(old_blocking)
+ assert not self._requests