summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rwxr-xr-xbin/ryu-manager11
-rw-r--r--ryu/app/quantum_adapter.py4
-rw-r--r--ryu/app/simple_vlan.py4
-rw-r--r--ryu/app/tunnel_port_updater.py10
-rw-r--r--ryu/app/wsgi.py4
-rw-r--r--ryu/base/app_manager.py10
-rw-r--r--ryu/controller/controller.py15
-rw-r--r--ryu/lib/hub.py8
-rw-r--r--ryu/lib/ofctl_v1_0.py8
-rw-r--r--ryu/lib/ofctl_v1_2.py8
-rw-r--r--ryu/lib/ovs/vsctl.py4
-rw-r--r--ryu/tests/integrated/test_of_config.py4
-rw-r--r--ryu/tests/unit/lib/test_hub.py10
-rw-r--r--ryu/topology/dumper.py43
-rw-r--r--ryu/topology/switches.py14
-rw-r--r--setup.cfg2
-rw-r--r--tools/pip-requires2
17 files changed, 77 insertions, 84 deletions
diff --git a/bin/ryu-manager b/bin/ryu-manager
index d9cc8059..62188dea 100755
--- a/bin/ryu-manager
+++ b/bin/ryu-manager
@@ -16,9 +16,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import gevent
-from gevent import monkey
-monkey.patch_all()
+from ryu.lib import hub
+hub.patch()
# TODO:
# Right now, we have our own patched copy of ovs python bindings
@@ -69,16 +68,16 @@ def main():
services = []
ctlr = controller.OpenFlowController()
- thr = gevent.spawn_later(0, ctlr)
+ thr = hub.spawn(ctlr)
services.append(thr)
webapp = wsgi.start_service(app_mgr)
if webapp:
- thr = gevent.spawn_later(0, webapp)
+ thr = hub.spawn(webapp)
services.append(thr)
try:
- gevent.joinall(services)
+ hub.joinall(services)
finally:
app_mgr.close()
diff --git a/ryu/app/quantum_adapter.py b/ryu/app/quantum_adapter.py
index 46cdd9d7..56f09701 100644
--- a/ryu/app/quantum_adapter.py
+++ b/ryu/app/quantum_adapter.py
@@ -36,10 +36,6 @@ from ryu.lib.ovs import bridge
from ryu.lib.quantum_ifaces import QuantumIfaces
-from gevent import monkey
-monkey.patch_all()
-
-
CONF = cfg.CONF
diff --git a/ryu/app/simple_vlan.py b/ryu/app/simple_vlan.py
index 21b2ddff..373649c9 100644
--- a/ryu/app/simple_vlan.py
+++ b/ryu/app/simple_vlan.py
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import gevent
import logging
from ryu.app import (conf_switch_key,
@@ -27,6 +26,7 @@ from ryu.controller import (conf_switch,
tunnels)
import ryu.exception as ryu_exc
from ryu.lib import dpid as dpid_lib
+from ryu.lib import hub
from ryu.lib.ovs import bridge
from ryu.ofproto import nx_match
@@ -131,7 +131,7 @@ class SimpleVLAN(app_manager.RyuApp):
port_name = port.name.rstrip('\x00')
try:
ovs_br.set_db_attribute("Port", port_name, "tag", tunnel_key)
- except gevent.Timeout:
+ except hub.Timeout:
self.logger.error('timeout')
return
diff --git a/ryu/app/tunnel_port_updater.py b/ryu/app/tunnel_port_updater.py
index 18ec0799..5740d395 100644
--- a/ryu/app/tunnel_port_updater.py
+++ b/ryu/app/tunnel_port_updater.py
@@ -15,7 +15,6 @@
# limitations under the License.
import collections
-import gevent
from oslo.config import cfg
import logging
import netaddr
@@ -29,6 +28,7 @@ from ryu.controller import (conf_switch,
network,
tunnels)
from ryu.lib import dpid as dpid_lib
+from ryu.lib import hub
from ryu.lib.ovs import bridge as ovs_bridge
@@ -120,8 +120,8 @@ class TunnelDP(object):
self.conf_switch = conf_switch_
self.inited = False
- self.req_q = gevent.queue.Queue()
- self.thr = gevent.spawn_later(0, self._serve_loop)
+ self.req_q = hub.Queue()
+ self.thr = hub.spawn(self._serve_loop)
def _init(self):
self.ovs_bridge.init()
@@ -289,7 +289,7 @@ class TunnelDP(object):
if not self.inited:
try:
self._init()
- except gevent.timeout.Timeout:
+ except hub.Timeout:
self.logger.warn('_init timeouted')
req = None
@@ -315,7 +315,7 @@ class TunnelDP(object):
self._del_tunnel_port_ip(req.remote_ip)
else:
self.logger.error('unknown request %s', req)
- except gevent.timeout.Timeout:
+ except hub.Timeout:
# timeout. try again
self.logger.warn('timeout try again')
continue
diff --git a/ryu/app/wsgi.py b/ryu/app/wsgi.py
index f879b907..28bada0b 100644
--- a/ryu/app/wsgi.py
+++ b/ryu/app/wsgi.py
@@ -17,7 +17,7 @@
from oslo.config import cfg
import webob.dec
-from gevent import pywsgi
+from ryu.lib import hub
from routes import Mapper
from routes.util import URLGenerator
@@ -80,7 +80,7 @@ class WSGIApplication(object):
return controller(req)
-class WSGIServer(pywsgi.WSGIServer):
+class WSGIServer(hub.WSGIServer):
def __init__(self, application, **config):
super(WSGIServer, self).__init__((CONF.wsapi_host, CONF.wsapi_port),
application, **config)
diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py
index 53419cfc..5d241cea 100644
--- a/ryu/base/app_manager.py
+++ b/ryu/base/app_manager.py
@@ -17,14 +17,12 @@
import inspect
import itertools
import logging
-import gevent
-
-from gevent.queue import Queue
from ryu import utils
from ryu.controller.handler import register_instance
from ryu.controller.controller import Datapath
from ryu.controller.event import EventRequestBase, EventReplyBase
+from ryu.lib import hub
LOG = logging.getLogger('ryu.base.app_manager')
@@ -62,10 +60,10 @@ class RyuApp(object):
self.event_handlers = {}
self.observers = {}
self.threads = []
- self.events = Queue()
- self.replies = Queue()
+ self.events = hub.Queue()
+ self.replies = hub.Queue()
self.logger = logging.getLogger(self.name)
- self.threads.append(gevent.spawn(self._event_loop))
+ self.threads.append(hub.spawn(self._event_loop))
def register_handler(self, ev_cls, handler):
assert callable(handler)
diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py
index c875764a..4d0ccdff 100644
--- a/ryu/controller/controller.py
+++ b/ryu/controller/controller.py
@@ -17,13 +17,12 @@
import contextlib
from oslo.config import cfg
import logging
-import gevent
+from ryu.lib import hub
+from ryu.lib.hub import StreamServer
import traceback
import random
import greenlet
import ssl
-from gevent.server import StreamServer
-from gevent.queue import Queue
import ryu.base.app_manager
@@ -124,7 +123,7 @@ class Datapath(object):
# The limit is arbitrary. We need to limit queue size to
# prevent it from eating memory up
- self.send_q = Queue(16)
+ self.send_q = hub.Queue(16)
self.set_version(max(self.supported_ofp_version))
self.xid = random.randint(0, self.ofproto.MAX_XID)
@@ -188,7 +187,7 @@ class Datapath(object):
count += 1
if count > 2048:
count = 0
- gevent.sleep(0)
+ hub.sleep(0)
@_deactivate
def _send_loop(self):
@@ -218,7 +217,7 @@ class Datapath(object):
self.send(msg.buf)
def serve(self):
- send_thr = gevent.spawn(self._send_loop)
+ send_thr = hub.spawn(self._send_loop)
# send hello message immediately
hello = self.ofproto_parser.OFPHello(self)
@@ -227,8 +226,8 @@ class Datapath(object):
try:
self._recv_loop()
finally:
- gevent.kill(send_thr)
- gevent.joinall([send_thr])
+ hub.kill(send_thr)
+ hub.joinall([send_thr])
#
# Utility methods for convenience
diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py
index 02dae4d8..ea935b7b 100644
--- a/ryu/lib/hub.py
+++ b/ryu/lib/hub.py
@@ -114,5 +114,9 @@ if HUB_TYPE == 'eventlet':
def wait(self, timeout=None):
if timeout is None:
self._wait()
- with Timeout(timeout):
- self._wait()
+ else:
+ try:
+ with Timeout(timeout):
+ self._wait()
+ except Timeout:
+ pass
diff --git a/ryu/lib/ofctl_v1_0.py b/ryu/lib/ofctl_v1_0.py
index ad474705..58e27500 100644
--- a/ryu/lib/ofctl_v1_0.py
+++ b/ryu/lib/ofctl_v1_0.py
@@ -16,9 +16,9 @@
import struct
import socket
import logging
-import gevent
from ryu.ofproto import ofproto_v1_0
+from ryu.lib import hub
from ryu.lib.mac import haddr_to_bin, haddr_to_str
@@ -195,13 +195,13 @@ def nw_dst_to_str(wildcards, addr):
def send_stats_request(dp, stats, waiters, msgs):
dp.set_xid(stats)
waiters_per_dp = waiters.setdefault(dp.id, {})
- lock = gevent.event.AsyncResult()
+ lock = hub.Event()
waiters_per_dp[stats.xid] = (lock, msgs)
dp.send_msg(stats)
try:
- lock.get(timeout=DEFAULT_TIMEOUT)
- except gevent.Timeout:
+ lock.wait(timeout=DEFAULT_TIMEOUT)
+ except hub.Timeout:
del waiters_per_dp[stats.xid]
diff --git a/ryu/lib/ofctl_v1_2.py b/ryu/lib/ofctl_v1_2.py
index 2a2a7c05..0789bf68 100644
--- a/ryu/lib/ofctl_v1_2.py
+++ b/ryu/lib/ofctl_v1_2.py
@@ -16,11 +16,11 @@
import struct
import socket
import logging
-import gevent
from ryu.ofproto import inet
from ryu.ofproto import ofproto_v1_2
from ryu.ofproto import ofproto_v1_2_parser
+from ryu.lib import hub
from ryu.lib import mac
@@ -187,13 +187,13 @@ def match_ip_to_str(value, mask):
def send_stats_request(dp, stats, waiters, msgs):
dp.set_xid(stats)
waiters_per_dp = waiters.setdefault(dp.id, {})
- lock = gevent.event.AsyncResult()
+ lock = hub.Event()
waiters_per_dp[stats.xid] = (lock, msgs)
dp.send_msg(stats)
try:
- lock.get(timeout=DEFAULT_TIMEOUT)
- except gevent.Timeout:
+ lock.wait(timeout=DEFAULT_TIMEOUT)
+ except hub.Timeout:
del waiters_per_dp[stats.xid]
diff --git a/ryu/lib/ovs/vsctl.py b/ryu/lib/ovs/vsctl.py
index 8cada742..420c48c9 100644
--- a/ryu/lib/ovs/vsctl.py
+++ b/ryu/lib/ovs/vsctl.py
@@ -15,7 +15,6 @@
# limitations under the License.
-import gevent
import itertools
import logging
import operator
@@ -31,6 +30,7 @@ from ovs import (jsonrpc,
stream)
from ovs.db import idl
+from ryu.lib import hub
from ryu.lib.ovs import vswitch_idl
LOG = logging.getLogger(__name__) # use ovs.vlog?
@@ -1002,7 +1002,7 @@ class VSCtl(object):
if timeout_sec is None:
self._run_command(commands)
else:
- with gevent.Timeout(timeout_sec, exception):
+ with hub.Timeout(timeout_sec, exception):
self._run_command(commands)
# commands
diff --git a/ryu/tests/integrated/test_of_config.py b/ryu/tests/integrated/test_of_config.py
index a7d5815f..0f544639 100644
--- a/ryu/tests/integrated/test_of_config.py
+++ b/ryu/tests/integrated/test_of_config.py
@@ -110,7 +110,6 @@ Here is my sys.config used for this test.
"""
-import gevent
import traceback
import lxml.etree
@@ -118,6 +117,7 @@ import ncclient
from ryu.base import app_manager
from ryu.lib.netconf import constants as nc_consts
+from ryu.lib import hub
from ryu.lib import of_config
from ryu.lib.of_config import capable_switch
from ryu.lib.of_config import constants as ofc_consts
@@ -218,7 +218,7 @@ class OFConfigClient(app_manager.RyuApp):
self.switch = capable_switch.OFCapableSwitch(
host=HOST, port=PORT, username=USERNAME, password=PASSWORD,
unknown_host_cb=lambda host, fingeprint: True)
- gevent.spawn(self._do_of_config)
+ hub.spawn(self._do_of_config)
def _validate(self, tree):
xmlschema = _get_schema()
diff --git a/ryu/tests/unit/lib/test_hub.py b/ryu/tests/unit/lib/test_hub.py
index 1b8166c3..4c558022 100644
--- a/ryu/tests/unit/lib/test_hub.py
+++ b/ryu/tests/unit/lib/test_hub.py
@@ -79,13 +79,11 @@ class Test_hub(unittest.TestCase):
ev = hub.Event()
result = []
with hub.Timeout(2):
- hub.spawn(_child, ev, result)
- try:
- ev.wait(timeout=0.5)
- raise BaseException("should timed out")
- except hub.Timeout:
- pass
+ t = hub.spawn(_child, ev, result)
+ ev.wait(timeout=0.5)
assert len(result) == 0
+ ev.wait()
+ assert len(result) == 1
def test_spawn_event3(self):
def _child(ev, ev2, result):
diff --git a/ryu/topology/dumper.py b/ryu/topology/dumper.py
index 264871f0..132ac65a 100644
--- a/ryu/topology/dumper.py
+++ b/ryu/topology/dumper.py
@@ -15,12 +15,11 @@
import logging
-import gevent
-import gevent.queue
import time
from ryu.base import app_manager
from ryu.controller import handler
+from ryu.lib import hub
from ryu.topology import event
from ryu.topology import switches
@@ -39,14 +38,14 @@ class DiscoveryEventDumper(app_manager.RyuApp):
# For testing when sync and async request.
# self.threads.append(
-# gevent.spawn_later(0, self._switch_request_sync, 5))
+# hub.spawn(self._switch_request_sync, 5))
# self.threads.append(
-# gevent.spawn_later(0, self._switch_request_async, 10))
+# hub.spawn(self._switch_request_async, 10))
#
# self.threads.append(
-# gevent.spawn_later(0, self._link_request_sync, 5))
+# hub.spawn(self._link_request_sync, 5))
# self.threads.append(
-# gevent.spawn_later(0, self._link_request_async, 10))
+# hub.spawn(self._link_request_async, 10))
self.is_active = True
@@ -82,19 +81,19 @@ class DiscoveryEventDumper(app_manager.RyuApp):
while self.is_active:
request = event.EventSwitchRequest()
LOG.debug('switch_request sync %s thread(%s)',
- request, id(gevent.getcurrent()))
+ request, id(hub.getcurrent()))
reply = self.send_request(request)
LOG.debug('switch_reply sync %s', reply)
if len(reply.switches) > 0:
for sw in reply.switches:
LOG.debug(' %s', sw)
- gevent.sleep(interval)
+ hub.sleep(interval)
def _switch_request_async(self, interval):
while self.is_active:
request = event.EventSwitchRequest()
LOG.debug('switch_request async %s thread(%s)',
- request, id(gevent.getcurrent()))
+ request, id(hub.getcurrent()))
self.send_event(request.dst, request)
start = time.time()
@@ -104,16 +103,16 @@ class DiscoveryEventDumper(app_manager.RyuApp):
if time.time() > start + i:
i += 1
LOG.debug(' thread is busy... %s/%s thread(%s)',
- i, busy, id(gevent.getcurrent()))
+ i, busy, id(hub.getcurrent()))
LOG.debug(' thread yield to switch_reply handler. thread(%s)',
- id(gevent.getcurrent()))
+ id(hub.getcurrent()))
# yield
- gevent.sleep(0)
+ hub.sleep(0)
LOG.debug(' thread get back. thread(%s)',
- id(gevent.getcurrent()))
- gevent.sleep(interval - busy)
+ id(hub.getcurrent()))
+ hub.sleep(interval - busy)
@handler.set_ev_cls(event.EventSwitchReply)
def switch_reply_handler(self, reply):
@@ -126,19 +125,19 @@ class DiscoveryEventDumper(app_manager.RyuApp):
while self.is_active:
request = event.EventLinkRequest()
LOG.debug('link_request sync %s thread(%s)',
- request, id(gevent.getcurrent()))
+ request, id(hub.getcurrent()))
reply = self.send_request(request)
LOG.debug('link_reply sync %s', reply)
if len(reply.links) > 0:
for link in reply.links:
LOG.debug(' %s', link)
- gevent.sleep(interval)
+ hub.sleep(interval)
def _link_request_async(self, interval):
while self.is_active:
request = event.EventLinkRequest()
LOG.debug('link_request async %s thread(%s)',
- request, id(gevent.getcurrent()))
+ request, id(hub.getcurrent()))
self.send_event(request.dst, request)
start = time.time()
@@ -148,16 +147,16 @@ class DiscoveryEventDumper(app_manager.RyuApp):
if time.time() > start + i:
i += 1
LOG.debug(' thread is busy... %s/%s thread(%s)',
- i, busy, id(gevent.getcurrent()))
+ i, busy, id(hub.getcurrent()))
LOG.debug(' thread yield to link_reply handler. thread(%s)',
- id(gevent.getcurrent()))
+ id(hub.getcurrent()))
# yield
- gevent.sleep(0)
+ hub.sleep(0)
LOG.debug(' thread get back. thread(%s)',
- id(gevent.getcurrent()))
- gevent.sleep(interval - busy)
+ id(hub.getcurrent()))
+ hub.sleep(interval - busy)
@handler.set_ev_cls(event.EventLinkReply)
def link_reply_handler(self, reply):
diff --git a/ryu/topology/switches.py b/ryu/topology/switches.py
index b5584dca..0995dd63 100644
--- a/ryu/topology/switches.py
+++ b/ryu/topology/switches.py
@@ -14,7 +14,6 @@
# limitations under the License.
import logging
-import gevent
import struct
import time
import json
@@ -26,6 +25,7 @@ from ryu.controller import ofp_event
from ryu.controller.handler import set_ev_cls
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.exception import RyuException
+from ryu.lib import hub
from ryu.lib.mac import DONTCARE, haddr_to_str
from ryu.lib.dpid import dpid_to_str, str_to_dpid
from ryu.lib.port_no import port_no_to_str
@@ -453,17 +453,17 @@ class Switches(app_manager.RyuApp):
if self.link_discovery:
self.install_flow = CONF.install_lldp_flow
self.explicit_drop = CONF.explicit_drop
- self.lldp_event = gevent.event.Event()
- self.link_event = gevent.event.Event()
- self.threads.append(gevent.spawn_later(0, self.lldp_loop))
- self.threads.append(gevent.spawn_later(0, self.link_loop))
+ self.lldp_event = hub.Event()
+ self.link_event = hub.Event()
+ self.threads.append(hub.spawn(self.lldp_loop))
+ self.threads.append(hub.spawn(self.link_loop))
def close(self):
self.is_active = False
if self.link_discovery:
self.lldp_event.set()
self.link_event.set()
- gevent.joinall(self.threads)
+ hub.joinall(self.threads)
def _register(self, dp):
assert dp.id is not None
@@ -745,7 +745,7 @@ class Switches(app_manager.RyuApp):
self.send_lldp_packet(port)
for port in ports:
self.send_lldp_packet(port)
- gevent.sleep(self.LLDP_SEND_GUARD) # don't burst
+ hub.sleep(self.LLDP_SEND_GUARD) # don't burst
if timeout is not None and ports:
timeout = 0 # We have already slept
diff --git a/setup.cfg b/setup.cfg
index 44f55c74..41d11794 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -6,7 +6,7 @@ source-dir = doc/source
[bdist_rpm]
Release = 1
Group = Applications/Accessories
-Requires = python-gevent >= 0.13, python-routes, python-webob, python-paramiko
+Requires = python-eventlet, python-routes, python-webob, python-paramiko
doc_files = LICENSE
MANIFEST.in
README.rst
diff --git a/tools/pip-requires b/tools/pip-requires
index 2a9b4ce4..cf7f06b8 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -1,4 +1,4 @@
-gevent>=0.13
+eventlet
routes
webob>=1.0.8
paramiko