summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ryu/services/protocols/ovsdb/client.py48
1 files changed, 44 insertions, 4 deletions
diff --git a/ryu/services/protocols/ovsdb/client.py b/ryu/services/protocols/ovsdb/client.py
index 5d4b21ff..4a67dfd1 100644
--- a/ryu/services/protocols/ovsdb/client.py
+++ b/ryu/services/protocols/ovsdb/client.py
@@ -14,6 +14,7 @@
# limitations under the License.
import collections
+import errno
import logging
import uuid
@@ -33,6 +34,7 @@ vlog.Vlog = Vlog
from ovs import jsonrpc
+from ovs import poller
from ovs import reconnect
from ovs import stream
from ovs import timeval
@@ -63,8 +65,46 @@ def dictify(row):
if row is None:
return
- return dict([(k, v.to_python(_uuid_to_row))
- for k, v in row._data.items()])
+ result = {}
+
+ for key, value in row._data.items():
+ result[key] = value.to_python(_uuid_to_row)
+ hub.sleep(0)
+
+ return result
+
+
+def transact_block(request, connection):
+ """Emulate jsonrpc.Connection.transact_block without blocking eventlet.
+ """
+ error = connection.send(request)
+ reply = None
+
+ if error:
+ return error, reply
+
+ ovs_poller = poller.Poller()
+ while not error:
+ ovs_poller.immediate_wake()
+ error, reply = connection.recv()
+
+ if error != errno.EAGAIN:
+ break
+
+ if (reply and
+ reply.id == request.id and
+ reply.type in (jsonrpc.Message.T_REPLY,
+ jsonrpc.Message.T_ERROR)):
+ break
+
+ connection.run()
+ connection.wait(poller)
+ connection.recv_wait(poller)
+ poller.block()
+
+ hub.sleep(0)
+
+ return error, reply
def discover_schemas(connection):
@@ -72,7 +112,7 @@ def discover_schemas(connection):
# is supported.
# TODO(jkoelker) support arbitrary schemas
req = jsonrpc.Message.create_request('list_dbs', [])
- error, reply = connection.transact_block(req)
+ error, reply = transact_block(req, connection)
if error or reply.error:
return
@@ -83,7 +123,7 @@ def discover_schemas(connection):
continue
req = jsonrpc.Message.create_request('get_schema', [db])
- error, reply = connection.transact_block(req)
+ error, reply = transact_block(req, connection)
if error or reply.error:
# TODO(jkoelker) Error handling