diff options
author | Jason Kölker <jason@koelker.net> | 2016-03-20 03:02:59 +0000 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-03-23 22:29:07 +0900 |
commit | 69744305b96c4b9a4b30979b39be4ff3aa4b3622 (patch) | |
tree | e2c31cd8ecdeb37928ea99cdbe08d727c3aa80f7 | |
parent | 8d2604ee67f25528f6e0c167b1b73955b164e992 (diff) |
protocols/ovsdb: Improve non-blocking performance
Prevent `client.discover_schemas` from blocking while calling into the
`ovs` library by emulating `jsonrpc.Connection.transact_block` to not
block and explicitly allow greenlet switching every loop.
Works with both the embeded ryu.contrib.ovs and upstream ovs python
packages.
Signed-off-by: Jason Kölker <jason@koelker.net>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/services/protocols/ovsdb/client.py | 48 |
1 files changed, 44 insertions, 4 deletions
diff --git a/ryu/services/protocols/ovsdb/client.py b/ryu/services/protocols/ovsdb/client.py index 5d4b21ff..4a67dfd1 100644 --- a/ryu/services/protocols/ovsdb/client.py +++ b/ryu/services/protocols/ovsdb/client.py @@ -14,6 +14,7 @@ # limitations under the License. import collections +import errno import logging import uuid @@ -33,6 +34,7 @@ vlog.Vlog = Vlog from ovs import jsonrpc +from ovs import poller from ovs import reconnect from ovs import stream from ovs import timeval @@ -63,8 +65,46 @@ def dictify(row): if row is None: return - return dict([(k, v.to_python(_uuid_to_row)) - for k, v in row._data.items()]) + result = {} + + for key, value in row._data.items(): + result[key] = value.to_python(_uuid_to_row) + hub.sleep(0) + + return result + + +def transact_block(request, connection): + """Emulate jsonrpc.Connection.transact_block without blocking eventlet. + """ + error = connection.send(request) + reply = None + + if error: + return error, reply + + ovs_poller = poller.Poller() + while not error: + ovs_poller.immediate_wake() + error, reply = connection.recv() + + if error != errno.EAGAIN: + break + + if (reply and + reply.id == request.id and + reply.type in (jsonrpc.Message.T_REPLY, + jsonrpc.Message.T_ERROR)): + break + + connection.run() + connection.wait(poller) + connection.recv_wait(poller) + poller.block() + + hub.sleep(0) + + return error, reply def discover_schemas(connection): @@ -72,7 +112,7 @@ def discover_schemas(connection): # is supported. # TODO(jkoelker) support arbitrary schemas req = jsonrpc.Message.create_request('list_dbs', []) - error, reply = connection.transact_block(req) + error, reply = transact_block(req, connection) if error or reply.error: return @@ -83,7 +123,7 @@ def discover_schemas(connection): continue req = jsonrpc.Message.create_request('get_schema', [db]) - error, reply = connection.transact_block(req) + error, reply = transact_block(req, connection) if error or reply.error: # TODO(jkoelker) Error handling |