summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ryu/topology/__init__.py0
-rw-r--r--ryu/topology/dumper.py104
-rw-r--r--ryu/topology/event.py86
-rw-r--r--ryu/topology/switches.py221
4 files changed, 411 insertions, 0 deletions
diff --git a/ryu/topology/__init__.py b/ryu/topology/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/ryu/topology/__init__.py
diff --git a/ryu/topology/dumper.py b/ryu/topology/dumper.py
new file mode 100644
index 00000000..426bcb5a
--- /dev/null
+++ b/ryu/topology/dumper.py
@@ -0,0 +1,104 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import gevent
+import gevent.queue
+import time
+
+from ryu.base import app_manager
+from ryu.controller.handler import set_ev_handler
+from ryu.topology import switches, event
+
+LOG = logging.getLogger(__name__)
+
+
+class DiscoveryEventDumper(app_manager.RyuApp):
+ ''' This app dumps discovery events
+ '''
+
+ def __init__(self):
+ super(DiscoveryEventDumper, self).__init__()
+
+ # For testing when sync and async request.
+# self.threads.append(gevent.spawn_later(0, self._request_sync, 5))
+ self.threads.append(gevent.spawn_later(0, self._request_async, 10))
+
+ self.is_active = True
+
+ @set_ev_handler(event.EventSwitchEnter)
+ def switch_enter_handler(self, ev):
+ LOG.debug(ev)
+
+ @set_ev_handler(event.EventSwitchLeave)
+ def switch_leave_handler(self, ev):
+ LOG.debug(ev)
+
+ @set_ev_handler(event.EventPortAdd)
+ def port_add_handler(self, ev):
+ LOG.debug(ev)
+
+ @set_ev_handler(event.EventPortDelete)
+ def port_delete_handler(self, ev):
+ LOG.debug(ev)
+
+ @set_ev_handler(event.EventPortModify)
+ def port_modify_handler(self, ev):
+ LOG.debug(ev)
+
+ def _request_sync(self, interval):
+ while self.is_active:
+ request = event.EventSwitchRequest()
+ LOG.debug('request sync %s thread(%s)',
+ request, id(gevent.getcurrent()))
+ reply = self.send_request(request)
+ LOG.debug('reply sync %s', reply)
+ if len(reply.switches) > 0:
+ for sw in reply.switches:
+ LOG.debug(' %s', sw)
+ gevent.sleep(interval)
+
+ def _request_async(self, interval):
+ while self.is_active:
+ request = event.EventSwitchRequest()
+ LOG.debug('request async %s thread(%s)',
+ request, id(gevent.getcurrent()))
+ self.send_event(request.dst, request)
+
+ start = time.time()
+ busy = interval / 2
+ i = 0
+ while i < busy:
+ if time.time() > start + i:
+ i += 1
+ LOG.debug(' thread is busy... %s/%s thread(%s)',
+ i, busy, id(gevent.getcurrent()))
+ LOG.debug(' thread yield to reply handler. thread(%s)',
+ id(gevent.getcurrent()))
+
+ # yield
+ gevent.sleep(0)
+
+ LOG.debug(' thread get back. thread(%s)',
+ id(gevent.getcurrent()))
+ gevent.sleep(interval - busy)
+
+ @set_ev_handler(event.EventSwitchReply)
+ def switch_reply_handler(self, reply):
+ LOG.debug('reply async %s', reply)
+ if len(reply.switches) > 0:
+ for sw in reply.switches:
+ LOG.debug(' %s', sw)
diff --git a/ryu/topology/event.py b/ryu/topology/event.py
new file mode 100644
index 00000000..0c9ce7ce
--- /dev/null
+++ b/ryu/topology/event.py
@@ -0,0 +1,86 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from ryu.controller import event
+
+LOG = logging.getLogger(__name__)
+
+
+class EventSwitchBase(event.EventBase):
+ def __init__(self, switch):
+ super(EventSwitchBase, self).__init__()
+ self.switch = switch
+
+ def __str__(self):
+ return '%s<dpid=%s, %s ports>' % \
+ (self.__class__.__name__,
+ self.switch.dp.id, len(self.switch.ports))
+
+
+class EventSwitchEnter(EventSwitchBase):
+ def __init__(self, switch):
+ super(EventSwitchEnter, self).__init__(switch)
+
+
+class EventSwitchLeave(EventSwitchBase):
+ def __init__(self, switch):
+ super(EventSwitchLeave, self).__init__(switch)
+
+
+class EventPortBase(event.EventBase):
+ def __init__(self, port):
+ super(EventPortBase, self).__init__()
+ self.port = port
+
+ def __str__(self):
+ return '%s<%s>' % (self.__class__.__name__, self.port)
+
+
+class EventPortAdd(EventPortBase):
+ def __init__(self, port):
+ super(EventPortAdd, self).__init__(port)
+
+
+class EventPortDelete(EventPortBase):
+ def __init__(self, port):
+ super(EventPortDelete, self).__init__(port)
+
+
+class EventPortModify(EventPortBase):
+ def __init__(self, port):
+ super(EventPortModify, self).__init__(port)
+
+
+class EventSwitchRequest(event.EventRequestBase):
+ # If dpid is None, reply all list
+ def __init__(self, dpid=None):
+ super(EventSwitchRequest, self).__init__()
+ self.dst = 'switches'
+ self.dpid = dpid
+
+ def __str__(self):
+ return 'EventSwitchRequest<src=%s, dpid=%s>' % \
+ (self.src, self.dpid)
+
+
+class EventSwitchReply(event.EventReplyBase):
+ def __init__(self, dst, switches):
+ super(EventSwitchReply, self).__init__(dst)
+ self.switches = switches
+
+ def __str__(self):
+ return 'EventSwitchReply<dst=%s, %s>' % \
+ (self.dst, self.switches)
diff --git a/ryu/topology/switches.py b/ryu/topology/switches.py
new file mode 100644
index 00000000..6cf9af79
--- /dev/null
+++ b/ryu/topology/switches.py
@@ -0,0 +1,221 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from ryu.topology import event
+from ryu.base import app_manager
+from ryu.controller import ofp_event
+from ryu.controller.handler import set_ev_cls
+from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
+
+LOG = logging.getLogger(__name__)
+
+
+class Port(object):
+ # This is data class passed by EventPortXXX
+ def __init__(self, dpid, ofproto, ofpport):
+ super(Port, self).__init__()
+
+ self.dpid = dpid
+ self._ofproto = ofproto
+ self._config = ofpport.config
+ self._state = ofpport.state
+
+ self.port_no = ofpport.port_no
+ self.hw_addr = ofpport.hw_addr
+ self.name = ofpport.name
+
+ def is_reserved(self):
+ return self.port_no > self._ofproto.OFPP_MAX
+
+ def is_down(self):
+ return (self._state & self._ofproto.OFPPS_LINK_DOWN) > 0 \
+ or (self._config & self._ofproto.OFPPC_PORT_DOWN) > 0
+
+ def is_live(self):
+ # NOTE: OF1.2 has OFPPS_LIVE state
+ # return (self._state & self._ofproto.OFPPS_LIVE) > 0
+ return not self.is_down()
+
+ # for Switch.del_port()
+ def __eq__(self, other):
+ return self.dpid == other.dpid and self.port_no == other.port_no
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __hash__(self):
+ return hash((self.dpid, self.port_no))
+
+ def __str__(self):
+ LIVE_MSG = {False: 'DOWN', True: 'LIVE'}
+ return 'Port<dpid=%s, port_no=%s, %s>' % \
+ (self.dpid, self.port_no, LIVE_MSG[self.is_live()])
+
+
+class Switch(object):
+ # This is data class passed by EventSwitchXXX
+ def __init__(self, dp):
+ super(Switch, self).__init__()
+
+ self.dp = dp
+ self.ports = []
+
+ def add_port(self, ofpport):
+ port = Port(self.dp.id, self.dp.ofproto, ofpport)
+ if not port.is_reserved():
+ self.ports.append(port)
+
+ def del_port(self, ofpport):
+ self.ports.remove(Port(ofpport))
+
+ def __str__(self):
+ msg = 'Switch<dpid=%s, ' % self.dp.id
+ for port in self.ports:
+ msg += str(port) + ' '
+
+ msg += '>'
+ return msg
+
+
+class PortState(dict):
+ # dict: int port_no -> OFPPort port
+ # OFPPort is defined in ryu.ofproto.ofproto_v1_X_parser
+ def __init__(self):
+ super(PortState, self).__init__()
+
+ def add(self, port_no, port):
+ self[port_no] = port
+
+ def remove(self, port_no):
+ del self[port_no]
+
+ def modify(self, port_no, port):
+ self[port_no] = port
+
+
+class Switches(app_manager.RyuApp):
+ _EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave,
+ event.EventPortAdd, event.EventPortDelete,
+ event.EventPortModify]
+
+ def __init__(self):
+ super(Switches, self).__init__()
+
+ self.name = 'switches'
+ self.dps = {} # datapath_id => class Datapath
+ self.port_state = {} # datapath_id => ports
+
+ def _register(self, dp):
+ assert dp.id is not None
+ assert dp.id not in self.dps
+
+ self.dps[dp.id] = dp
+ self.port_state[dp.id] = PortState()
+ for port in dp.ports.values():
+ self.port_state[dp.id].add(port.port_no, port)
+
+ def _unregister(self, dp):
+ if dp.id in self.dps:
+ del self.dps[dp.id]
+ del self.port_state[dp.id]
+
+ def _get_switch(self, dp):
+ switch = Switch(dp)
+ for ofpport in self.port_state[dp.id].itervalues():
+ switch.add_port(ofpport)
+ return switch
+
+ @set_ev_cls(ofp_event.EventOFPStateChange,
+ [MAIN_DISPATCHER, DEAD_DISPATCHER])
+ def state_change_handler(self, ev):
+ dp = ev.datapath
+ assert dp is not None
+ LOG.debug(dp)
+
+ if ev.state == MAIN_DISPATCHER:
+ self._register(dp)
+ switch = self._get_switch(dp)
+ LOG.debug('register %s', switch)
+ self.send_event_to_observers(event.EventSwitchEnter(switch))
+
+ elif ev.state == DEAD_DISPATCHER:
+ # dp.id is None when datapath dies before handshake
+ if dp.id is None:
+ return
+ switch = self._get_switch(dp)
+ self._unregister(dp)
+ LOG.debug('unregister %s', switch)
+ self.send_event_to_observers(event.EventSwitchLeave(switch))
+
+ @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
+ def port_status_handler(self, ev):
+ msg = ev.msg
+ reason = msg.reason
+ dp = msg.datapath
+ ofpport = msg.desc
+
+ if reason == dp.ofproto.OFPPR_ADD:
+ #LOG.debug('A port was added.' +
+ # '(datapath id = %s, port number = %s)',
+ # dp.id, ofpport.port_no)
+ self.port_state[dp.id].add(ofpport.port_no, ofpport)
+ self.send_event_to_observers(
+ event.EventPortAdd(Port(dp.id, dp.ofproto, ofpport)))
+
+ elif reason == dp.ofproto.OFPPR_DELETE:
+ #LOG.debug('A port was deleted.' +
+ # '(datapath id = %s, port number = %s)',
+ # dp.id, ofpport.port_no)
+ self.port_state[dp.id].remove(ofpport.port_no)
+ self.send_event_to_observers(
+ event.EventPortDelete(Port(dp.id, dp.ofproto, ofpport)))
+
+ else:
+ assert reason == dp.ofproto.OFPPR_MODIFY
+ #LOG.debug('A port was modified.' +
+ # '(datapath id = %s, port number = %s)',
+ # dp.id, ofpport.port_no)
+ self.port_state[dp.id].modify(ofpport.port_no, ofpport)
+ self.send_event_to_observers(
+ event.EventPortModify(Port(dp.id, dp.ofproto, ofpport)))
+
+ @set_ev_cls(event.EventSwitchRequest)
+ def switch_request_handler(self, req):
+ LOG.debug(req)
+ dpid = req.dpid
+
+ switches = []
+ if dpid is None:
+ # reply all list
+ for dp in self.dps.itervalues():
+ switches.append(self._get_switch(dp))
+ elif dpid in self.dps:
+ switches.append(self._get_switch(self.dps[dpid]))
+
+ rep = event.EventSwitchReply(req.src, switches)
+ if req.sync:
+ self.send_reply(rep)
+ else:
+ self.send_event(req.src, rep)
+
+
+def get(app, dpid=None):
+ return app.send_request(event.EventSwitchRequest(dpid))
+
+
+def get_all(app):
+ return get(app)