diff options
-rwxr-xr-x | bin/ryu-manager | 11 | ||||
-rw-r--r-- | ryu/app/quantum_adapter.py | 4 | ||||
-rw-r--r-- | ryu/app/simple_vlan.py | 4 | ||||
-rw-r--r-- | ryu/app/tunnel_port_updater.py | 10 | ||||
-rw-r--r-- | ryu/app/wsgi.py | 4 | ||||
-rw-r--r-- | ryu/base/app_manager.py | 10 | ||||
-rw-r--r-- | ryu/controller/controller.py | 15 | ||||
-rw-r--r-- | ryu/lib/hub.py | 8 | ||||
-rw-r--r-- | ryu/lib/ofctl_v1_0.py | 8 | ||||
-rw-r--r-- | ryu/lib/ofctl_v1_2.py | 8 | ||||
-rw-r--r-- | ryu/lib/ovs/vsctl.py | 4 | ||||
-rw-r--r-- | ryu/tests/integrated/test_of_config.py | 4 | ||||
-rw-r--r-- | ryu/tests/unit/lib/test_hub.py | 10 | ||||
-rw-r--r-- | ryu/topology/dumper.py | 43 | ||||
-rw-r--r-- | ryu/topology/switches.py | 14 | ||||
-rw-r--r-- | setup.cfg | 2 | ||||
-rw-r--r-- | tools/pip-requires | 2 |
17 files changed, 77 insertions, 84 deletions
diff --git a/bin/ryu-manager b/bin/ryu-manager index d9cc8059..62188dea 100755 --- a/bin/ryu-manager +++ b/bin/ryu-manager @@ -16,9 +16,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import gevent -from gevent import monkey -monkey.patch_all() +from ryu.lib import hub +hub.patch() # TODO: # Right now, we have our own patched copy of ovs python bindings @@ -69,16 +68,16 @@ def main(): services = [] ctlr = controller.OpenFlowController() - thr = gevent.spawn_later(0, ctlr) + thr = hub.spawn(ctlr) services.append(thr) webapp = wsgi.start_service(app_mgr) if webapp: - thr = gevent.spawn_later(0, webapp) + thr = hub.spawn(webapp) services.append(thr) try: - gevent.joinall(services) + hub.joinall(services) finally: app_mgr.close() diff --git a/ryu/app/quantum_adapter.py b/ryu/app/quantum_adapter.py index 46cdd9d7..56f09701 100644 --- a/ryu/app/quantum_adapter.py +++ b/ryu/app/quantum_adapter.py @@ -36,10 +36,6 @@ from ryu.lib.ovs import bridge from ryu.lib.quantum_ifaces import QuantumIfaces -from gevent import monkey -monkey.patch_all() - - CONF = cfg.CONF diff --git a/ryu/app/simple_vlan.py b/ryu/app/simple_vlan.py index 21b2ddff..373649c9 100644 --- a/ryu/app/simple_vlan.py +++ b/ryu/app/simple_vlan.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import gevent import logging from ryu.app import (conf_switch_key, @@ -27,6 +26,7 @@ from ryu.controller import (conf_switch, tunnels) import ryu.exception as ryu_exc from ryu.lib import dpid as dpid_lib +from ryu.lib import hub from ryu.lib.ovs import bridge from ryu.ofproto import nx_match @@ -131,7 +131,7 @@ class SimpleVLAN(app_manager.RyuApp): port_name = port.name.rstrip('\x00') try: ovs_br.set_db_attribute("Port", port_name, "tag", tunnel_key) - except gevent.Timeout: + except hub.Timeout: self.logger.error('timeout') return diff --git a/ryu/app/tunnel_port_updater.py b/ryu/app/tunnel_port_updater.py index 18ec0799..5740d395 100644 --- a/ryu/app/tunnel_port_updater.py +++ b/ryu/app/tunnel_port_updater.py @@ -15,7 +15,6 @@ # limitations under the License. import collections -import gevent from oslo.config import cfg import logging import netaddr @@ -29,6 +28,7 @@ from ryu.controller import (conf_switch, network, tunnels) from ryu.lib import dpid as dpid_lib +from ryu.lib import hub from ryu.lib.ovs import bridge as ovs_bridge @@ -120,8 +120,8 @@ class TunnelDP(object): self.conf_switch = conf_switch_ self.inited = False - self.req_q = gevent.queue.Queue() - self.thr = gevent.spawn_later(0, self._serve_loop) + self.req_q = hub.Queue() + self.thr = hub.spawn(self._serve_loop) def _init(self): self.ovs_bridge.init() @@ -289,7 +289,7 @@ class TunnelDP(object): if not self.inited: try: self._init() - except gevent.timeout.Timeout: + except hub.Timeout: self.logger.warn('_init timeouted') req = None @@ -315,7 +315,7 @@ class TunnelDP(object): self._del_tunnel_port_ip(req.remote_ip) else: self.logger.error('unknown request %s', req) - except gevent.timeout.Timeout: + except hub.Timeout: # timeout. try again self.logger.warn('timeout try again') continue diff --git a/ryu/app/wsgi.py b/ryu/app/wsgi.py index f879b907..28bada0b 100644 --- a/ryu/app/wsgi.py +++ b/ryu/app/wsgi.py @@ -17,7 +17,7 @@ from oslo.config import cfg import webob.dec -from gevent import pywsgi +from ryu.lib import hub from routes import Mapper from routes.util import URLGenerator @@ -80,7 +80,7 @@ class WSGIApplication(object): return controller(req) -class WSGIServer(pywsgi.WSGIServer): +class WSGIServer(hub.WSGIServer): def __init__(self, application, **config): super(WSGIServer, self).__init__((CONF.wsapi_host, CONF.wsapi_port), application, **config) diff --git a/ryu/base/app_manager.py b/ryu/base/app_manager.py index 53419cfc..5d241cea 100644 --- a/ryu/base/app_manager.py +++ b/ryu/base/app_manager.py @@ -17,14 +17,12 @@ import inspect import itertools import logging -import gevent - -from gevent.queue import Queue from ryu import utils from ryu.controller.handler import register_instance from ryu.controller.controller import Datapath from ryu.controller.event import EventRequestBase, EventReplyBase +from ryu.lib import hub LOG = logging.getLogger('ryu.base.app_manager') @@ -62,10 +60,10 @@ class RyuApp(object): self.event_handlers = {} self.observers = {} self.threads = [] - self.events = Queue() - self.replies = Queue() + self.events = hub.Queue() + self.replies = hub.Queue() self.logger = logging.getLogger(self.name) - self.threads.append(gevent.spawn(self._event_loop)) + self.threads.append(hub.spawn(self._event_loop)) def register_handler(self, ev_cls, handler): assert callable(handler) diff --git a/ryu/controller/controller.py b/ryu/controller/controller.py index c875764a..4d0ccdff 100644 --- a/ryu/controller/controller.py +++ b/ryu/controller/controller.py @@ -17,13 +17,12 @@ import contextlib from oslo.config import cfg import logging -import gevent +from ryu.lib import hub +from ryu.lib.hub import StreamServer import traceback import random import greenlet import ssl -from gevent.server import StreamServer -from gevent.queue import Queue import ryu.base.app_manager @@ -124,7 +123,7 @@ class Datapath(object): # The limit is arbitrary. We need to limit queue size to # prevent it from eating memory up - self.send_q = Queue(16) + self.send_q = hub.Queue(16) self.set_version(max(self.supported_ofp_version)) self.xid = random.randint(0, self.ofproto.MAX_XID) @@ -188,7 +187,7 @@ class Datapath(object): count += 1 if count > 2048: count = 0 - gevent.sleep(0) + hub.sleep(0) @_deactivate def _send_loop(self): @@ -218,7 +217,7 @@ class Datapath(object): self.send(msg.buf) def serve(self): - send_thr = gevent.spawn(self._send_loop) + send_thr = hub.spawn(self._send_loop) # send hello message immediately hello = self.ofproto_parser.OFPHello(self) @@ -227,8 +226,8 @@ class Datapath(object): try: self._recv_loop() finally: - gevent.kill(send_thr) - gevent.joinall([send_thr]) + hub.kill(send_thr) + hub.joinall([send_thr]) # # Utility methods for convenience diff --git a/ryu/lib/hub.py b/ryu/lib/hub.py index 02dae4d8..ea935b7b 100644 --- a/ryu/lib/hub.py +++ b/ryu/lib/hub.py @@ -114,5 +114,9 @@ if HUB_TYPE == 'eventlet': def wait(self, timeout=None): if timeout is None: self._wait() - with Timeout(timeout): - self._wait() + else: + try: + with Timeout(timeout): + self._wait() + except Timeout: + pass diff --git a/ryu/lib/ofctl_v1_0.py b/ryu/lib/ofctl_v1_0.py index ad474705..58e27500 100644 --- a/ryu/lib/ofctl_v1_0.py +++ b/ryu/lib/ofctl_v1_0.py @@ -16,9 +16,9 @@ import struct import socket import logging -import gevent from ryu.ofproto import ofproto_v1_0 +from ryu.lib import hub from ryu.lib.mac import haddr_to_bin, haddr_to_str @@ -195,13 +195,13 @@ def nw_dst_to_str(wildcards, addr): def send_stats_request(dp, stats, waiters, msgs): dp.set_xid(stats) waiters_per_dp = waiters.setdefault(dp.id, {}) - lock = gevent.event.AsyncResult() + lock = hub.Event() waiters_per_dp[stats.xid] = (lock, msgs) dp.send_msg(stats) try: - lock.get(timeout=DEFAULT_TIMEOUT) - except gevent.Timeout: + lock.wait(timeout=DEFAULT_TIMEOUT) + except hub.Timeout: del waiters_per_dp[stats.xid] diff --git a/ryu/lib/ofctl_v1_2.py b/ryu/lib/ofctl_v1_2.py index 2a2a7c05..0789bf68 100644 --- a/ryu/lib/ofctl_v1_2.py +++ b/ryu/lib/ofctl_v1_2.py @@ -16,11 +16,11 @@ import struct import socket import logging -import gevent from ryu.ofproto import inet from ryu.ofproto import ofproto_v1_2 from ryu.ofproto import ofproto_v1_2_parser +from ryu.lib import hub from ryu.lib import mac @@ -187,13 +187,13 @@ def match_ip_to_str(value, mask): def send_stats_request(dp, stats, waiters, msgs): dp.set_xid(stats) waiters_per_dp = waiters.setdefault(dp.id, {}) - lock = gevent.event.AsyncResult() + lock = hub.Event() waiters_per_dp[stats.xid] = (lock, msgs) dp.send_msg(stats) try: - lock.get(timeout=DEFAULT_TIMEOUT) - except gevent.Timeout: + lock.wait(timeout=DEFAULT_TIMEOUT) + except hub.Timeout: del waiters_per_dp[stats.xid] diff --git a/ryu/lib/ovs/vsctl.py b/ryu/lib/ovs/vsctl.py index 8cada742..420c48c9 100644 --- a/ryu/lib/ovs/vsctl.py +++ b/ryu/lib/ovs/vsctl.py @@ -15,7 +15,6 @@ # limitations under the License. -import gevent import itertools import logging import operator @@ -31,6 +30,7 @@ from ovs import (jsonrpc, stream) from ovs.db import idl +from ryu.lib import hub from ryu.lib.ovs import vswitch_idl LOG = logging.getLogger(__name__) # use ovs.vlog? @@ -1002,7 +1002,7 @@ class VSCtl(object): if timeout_sec is None: self._run_command(commands) else: - with gevent.Timeout(timeout_sec, exception): + with hub.Timeout(timeout_sec, exception): self._run_command(commands) # commands diff --git a/ryu/tests/integrated/test_of_config.py b/ryu/tests/integrated/test_of_config.py index a7d5815f..0f544639 100644 --- a/ryu/tests/integrated/test_of_config.py +++ b/ryu/tests/integrated/test_of_config.py @@ -110,7 +110,6 @@ Here is my sys.config used for this test. """ -import gevent import traceback import lxml.etree @@ -118,6 +117,7 @@ import ncclient from ryu.base import app_manager from ryu.lib.netconf import constants as nc_consts +from ryu.lib import hub from ryu.lib import of_config from ryu.lib.of_config import capable_switch from ryu.lib.of_config import constants as ofc_consts @@ -218,7 +218,7 @@ class OFConfigClient(app_manager.RyuApp): self.switch = capable_switch.OFCapableSwitch( host=HOST, port=PORT, username=USERNAME, password=PASSWORD, unknown_host_cb=lambda host, fingeprint: True) - gevent.spawn(self._do_of_config) + hub.spawn(self._do_of_config) def _validate(self, tree): xmlschema = _get_schema() diff --git a/ryu/tests/unit/lib/test_hub.py b/ryu/tests/unit/lib/test_hub.py index 1b8166c3..4c558022 100644 --- a/ryu/tests/unit/lib/test_hub.py +++ b/ryu/tests/unit/lib/test_hub.py @@ -79,13 +79,11 @@ class Test_hub(unittest.TestCase): ev = hub.Event() result = [] with hub.Timeout(2): - hub.spawn(_child, ev, result) - try: - ev.wait(timeout=0.5) - raise BaseException("should timed out") - except hub.Timeout: - pass + t = hub.spawn(_child, ev, result) + ev.wait(timeout=0.5) assert len(result) == 0 + ev.wait() + assert len(result) == 1 def test_spawn_event3(self): def _child(ev, ev2, result): diff --git a/ryu/topology/dumper.py b/ryu/topology/dumper.py index 264871f0..132ac65a 100644 --- a/ryu/topology/dumper.py +++ b/ryu/topology/dumper.py @@ -15,12 +15,11 @@ import logging -import gevent -import gevent.queue import time from ryu.base import app_manager from ryu.controller import handler +from ryu.lib import hub from ryu.topology import event from ryu.topology import switches @@ -39,14 +38,14 @@ class DiscoveryEventDumper(app_manager.RyuApp): # For testing when sync and async request. # self.threads.append( -# gevent.spawn_later(0, self._switch_request_sync, 5)) +# hub.spawn(self._switch_request_sync, 5)) # self.threads.append( -# gevent.spawn_later(0, self._switch_request_async, 10)) +# hub.spawn(self._switch_request_async, 10)) # # self.threads.append( -# gevent.spawn_later(0, self._link_request_sync, 5)) +# hub.spawn(self._link_request_sync, 5)) # self.threads.append( -# gevent.spawn_later(0, self._link_request_async, 10)) +# hub.spawn(self._link_request_async, 10)) self.is_active = True @@ -82,19 +81,19 @@ class DiscoveryEventDumper(app_manager.RyuApp): while self.is_active: request = event.EventSwitchRequest() LOG.debug('switch_request sync %s thread(%s)', - request, id(gevent.getcurrent())) + request, id(hub.getcurrent())) reply = self.send_request(request) LOG.debug('switch_reply sync %s', reply) if len(reply.switches) > 0: for sw in reply.switches: LOG.debug(' %s', sw) - gevent.sleep(interval) + hub.sleep(interval) def _switch_request_async(self, interval): while self.is_active: request = event.EventSwitchRequest() LOG.debug('switch_request async %s thread(%s)', - request, id(gevent.getcurrent())) + request, id(hub.getcurrent())) self.send_event(request.dst, request) start = time.time() @@ -104,16 +103,16 @@ class DiscoveryEventDumper(app_manager.RyuApp): if time.time() > start + i: i += 1 LOG.debug(' thread is busy... %s/%s thread(%s)', - i, busy, id(gevent.getcurrent())) + i, busy, id(hub.getcurrent())) LOG.debug(' thread yield to switch_reply handler. thread(%s)', - id(gevent.getcurrent())) + id(hub.getcurrent())) # yield - gevent.sleep(0) + hub.sleep(0) LOG.debug(' thread get back. thread(%s)', - id(gevent.getcurrent())) - gevent.sleep(interval - busy) + id(hub.getcurrent())) + hub.sleep(interval - busy) @handler.set_ev_cls(event.EventSwitchReply) def switch_reply_handler(self, reply): @@ -126,19 +125,19 @@ class DiscoveryEventDumper(app_manager.RyuApp): while self.is_active: request = event.EventLinkRequest() LOG.debug('link_request sync %s thread(%s)', - request, id(gevent.getcurrent())) + request, id(hub.getcurrent())) reply = self.send_request(request) LOG.debug('link_reply sync %s', reply) if len(reply.links) > 0: for link in reply.links: LOG.debug(' %s', link) - gevent.sleep(interval) + hub.sleep(interval) def _link_request_async(self, interval): while self.is_active: request = event.EventLinkRequest() LOG.debug('link_request async %s thread(%s)', - request, id(gevent.getcurrent())) + request, id(hub.getcurrent())) self.send_event(request.dst, request) start = time.time() @@ -148,16 +147,16 @@ class DiscoveryEventDumper(app_manager.RyuApp): if time.time() > start + i: i += 1 LOG.debug(' thread is busy... %s/%s thread(%s)', - i, busy, id(gevent.getcurrent())) + i, busy, id(hub.getcurrent())) LOG.debug(' thread yield to link_reply handler. thread(%s)', - id(gevent.getcurrent())) + id(hub.getcurrent())) # yield - gevent.sleep(0) + hub.sleep(0) LOG.debug(' thread get back. thread(%s)', - id(gevent.getcurrent())) - gevent.sleep(interval - busy) + id(hub.getcurrent())) + hub.sleep(interval - busy) @handler.set_ev_cls(event.EventLinkReply) def link_reply_handler(self, reply): diff --git a/ryu/topology/switches.py b/ryu/topology/switches.py index b5584dca..0995dd63 100644 --- a/ryu/topology/switches.py +++ b/ryu/topology/switches.py @@ -14,7 +14,6 @@ # limitations under the License. import logging -import gevent import struct import time import json @@ -26,6 +25,7 @@ from ryu.controller import ofp_event from ryu.controller.handler import set_ev_cls from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER from ryu.exception import RyuException +from ryu.lib import hub from ryu.lib.mac import DONTCARE, haddr_to_str from ryu.lib.dpid import dpid_to_str, str_to_dpid from ryu.lib.port_no import port_no_to_str @@ -453,17 +453,17 @@ class Switches(app_manager.RyuApp): if self.link_discovery: self.install_flow = CONF.install_lldp_flow self.explicit_drop = CONF.explicit_drop - self.lldp_event = gevent.event.Event() - self.link_event = gevent.event.Event() - self.threads.append(gevent.spawn_later(0, self.lldp_loop)) - self.threads.append(gevent.spawn_later(0, self.link_loop)) + self.lldp_event = hub.Event() + self.link_event = hub.Event() + self.threads.append(hub.spawn(self.lldp_loop)) + self.threads.append(hub.spawn(self.link_loop)) def close(self): self.is_active = False if self.link_discovery: self.lldp_event.set() self.link_event.set() - gevent.joinall(self.threads) + hub.joinall(self.threads) def _register(self, dp): assert dp.id is not None @@ -745,7 +745,7 @@ class Switches(app_manager.RyuApp): self.send_lldp_packet(port) for port in ports: self.send_lldp_packet(port) - gevent.sleep(self.LLDP_SEND_GUARD) # don't burst + hub.sleep(self.LLDP_SEND_GUARD) # don't burst if timeout is not None and ports: timeout = 0 # We have already slept @@ -6,7 +6,7 @@ source-dir = doc/source [bdist_rpm] Release = 1 Group = Applications/Accessories -Requires = python-gevent >= 0.13, python-routes, python-webob, python-paramiko +Requires = python-eventlet, python-routes, python-webob, python-paramiko doc_files = LICENSE MANIFEST.in README.rst diff --git a/tools/pip-requires b/tools/pip-requires index 2a9b4ce4..cf7f06b8 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -1,4 +1,4 @@ -gevent>=0.13 +eventlet routes webob>=1.0.8 paramiko |