summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJason Kölker <jason@koelker.net>2016-03-20 03:02:59 +0000
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-03-23 22:29:07 +0900
commit69744305b96c4b9a4b30979b39be4ff3aa4b3622 (patch)
treee2c31cd8ecdeb37928ea99cdbe08d727c3aa80f7
parent8d2604ee67f25528f6e0c167b1b73955b164e992 (diff)
protocols/ovsdb: Improve non-blocking performance
Prevent `client.discover_schemas` from blocking while calling into the `ovs` library by emulating `jsonrpc.Connection.transact_block` to not block and explicitly allow greenlet switching every loop. Works with both the embeded ryu.contrib.ovs and upstream ovs python packages. Signed-off-by: Jason Kölker <jason@koelker.net> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/services/protocols/ovsdb/client.py48
1 files changed, 44 insertions, 4 deletions
diff --git a/ryu/services/protocols/ovsdb/client.py b/ryu/services/protocols/ovsdb/client.py
index 5d4b21ff..4a67dfd1 100644
--- a/ryu/services/protocols/ovsdb/client.py
+++ b/ryu/services/protocols/ovsdb/client.py
@@ -14,6 +14,7 @@
# limitations under the License.
import collections
+import errno
import logging
import uuid
@@ -33,6 +34,7 @@ vlog.Vlog = Vlog
from ovs import jsonrpc
+from ovs import poller
from ovs import reconnect
from ovs import stream
from ovs import timeval
@@ -63,8 +65,46 @@ def dictify(row):
if row is None:
return
- return dict([(k, v.to_python(_uuid_to_row))
- for k, v in row._data.items()])
+ result = {}
+
+ for key, value in row._data.items():
+ result[key] = value.to_python(_uuid_to_row)
+ hub.sleep(0)
+
+ return result
+
+
+def transact_block(request, connection):
+ """Emulate jsonrpc.Connection.transact_block without blocking eventlet.
+ """
+ error = connection.send(request)
+ reply = None
+
+ if error:
+ return error, reply
+
+ ovs_poller = poller.Poller()
+ while not error:
+ ovs_poller.immediate_wake()
+ error, reply = connection.recv()
+
+ if error != errno.EAGAIN:
+ break
+
+ if (reply and
+ reply.id == request.id and
+ reply.type in (jsonrpc.Message.T_REPLY,
+ jsonrpc.Message.T_ERROR)):
+ break
+
+ connection.run()
+ connection.wait(poller)
+ connection.recv_wait(poller)
+ poller.block()
+
+ hub.sleep(0)
+
+ return error, reply
def discover_schemas(connection):
@@ -72,7 +112,7 @@ def discover_schemas(connection):
# is supported.
# TODO(jkoelker) support arbitrary schemas
req = jsonrpc.Message.create_request('list_dbs', [])
- error, reply = connection.transact_block(req)
+ error, reply = transact_block(req, connection)
if error or reply.error:
return
@@ -83,7 +123,7 @@ def discover_schemas(connection):
continue
req = jsonrpc.Message.create_request('get_schema', [db])
- error, reply = connection.transact_block(req)
+ error, reply = transact_block(req, connection)
if error or reply.error:
# TODO(jkoelker) Error handling