From 69744305b96c4b9a4b30979b39be4ff3aa4b3622 Mon Sep 17 00:00:00 2001 From: Jason Kölker Date: Sun, 20 Mar 2016 03:02:59 +0000 Subject: protocols/ovsdb: Improve non-blocking performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prevent `client.discover_schemas` from blocking while calling into the `ovs` library by emulating `jsonrpc.Connection.transact_block` to not block and explicitly allow greenlet switching every loop. Works with both the embeded ryu.contrib.ovs and upstream ovs python packages. Signed-off-by: Jason Kölker Signed-off-by: FUJITA Tomonori --- ryu/services/protocols/ovsdb/client.py | 48 +++++++++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/ryu/services/protocols/ovsdb/client.py b/ryu/services/protocols/ovsdb/client.py index 5d4b21ff..4a67dfd1 100644 --- a/ryu/services/protocols/ovsdb/client.py +++ b/ryu/services/protocols/ovsdb/client.py @@ -14,6 +14,7 @@ # limitations under the License. import collections +import errno import logging import uuid @@ -33,6 +34,7 @@ vlog.Vlog = Vlog from ovs import jsonrpc +from ovs import poller from ovs import reconnect from ovs import stream from ovs import timeval @@ -63,8 +65,46 @@ def dictify(row): if row is None: return - return dict([(k, v.to_python(_uuid_to_row)) - for k, v in row._data.items()]) + result = {} + + for key, value in row._data.items(): + result[key] = value.to_python(_uuid_to_row) + hub.sleep(0) + + return result + + +def transact_block(request, connection): + """Emulate jsonrpc.Connection.transact_block without blocking eventlet. + """ + error = connection.send(request) + reply = None + + if error: + return error, reply + + ovs_poller = poller.Poller() + while not error: + ovs_poller.immediate_wake() + error, reply = connection.recv() + + if error != errno.EAGAIN: + break + + if (reply and + reply.id == request.id and + reply.type in (jsonrpc.Message.T_REPLY, + jsonrpc.Message.T_ERROR)): + break + + connection.run() + connection.wait(poller) + connection.recv_wait(poller) + poller.block() + + hub.sleep(0) + + return error, reply def discover_schemas(connection): @@ -72,7 +112,7 @@ def discover_schemas(connection): # is supported. # TODO(jkoelker) support arbitrary schemas req = jsonrpc.Message.create_request('list_dbs', []) - error, reply = connection.transact_block(req) + error, reply = transact_block(req, connection) if error or reply.error: return @@ -83,7 +123,7 @@ def discover_schemas(connection): continue req = jsonrpc.Message.create_request('get_schema', [db]) - error, reply = connection.transact_block(req) + error, reply = transact_block(req, connection) if error or reply.error: # TODO(jkoelker) Error handling -- cgit v1.2.3