diff options
-rw-r--r-- | ryu/topology/__init__.py | 0 | ||||
-rw-r--r-- | ryu/topology/dumper.py | 104 | ||||
-rw-r--r-- | ryu/topology/event.py | 86 | ||||
-rw-r--r-- | ryu/topology/switches.py | 221 |
4 files changed, 411 insertions, 0 deletions
diff --git a/ryu/topology/__init__.py b/ryu/topology/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/ryu/topology/__init__.py diff --git a/ryu/topology/dumper.py b/ryu/topology/dumper.py new file mode 100644 index 00000000..426bcb5a --- /dev/null +++ b/ryu/topology/dumper.py @@ -0,0 +1,104 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging +import gevent +import gevent.queue +import time + +from ryu.base import app_manager +from ryu.controller.handler import set_ev_handler +from ryu.topology import switches, event + +LOG = logging.getLogger(__name__) + + +class DiscoveryEventDumper(app_manager.RyuApp): + ''' This app dumps discovery events + ''' + + def __init__(self): + super(DiscoveryEventDumper, self).__init__() + + # For testing when sync and async request. +# self.threads.append(gevent.spawn_later(0, self._request_sync, 5)) + self.threads.append(gevent.spawn_later(0, self._request_async, 10)) + + self.is_active = True + + @set_ev_handler(event.EventSwitchEnter) + def switch_enter_handler(self, ev): + LOG.debug(ev) + + @set_ev_handler(event.EventSwitchLeave) + def switch_leave_handler(self, ev): + LOG.debug(ev) + + @set_ev_handler(event.EventPortAdd) + def port_add_handler(self, ev): + LOG.debug(ev) + + @set_ev_handler(event.EventPortDelete) + def port_delete_handler(self, ev): + LOG.debug(ev) + + @set_ev_handler(event.EventPortModify) + def port_modify_handler(self, ev): + LOG.debug(ev) + + def _request_sync(self, interval): + while self.is_active: + request = event.EventSwitchRequest() + LOG.debug('request sync %s thread(%s)', + request, id(gevent.getcurrent())) + reply = self.send_request(request) + LOG.debug('reply sync %s', reply) + if len(reply.switches) > 0: + for sw in reply.switches: + LOG.debug(' %s', sw) + gevent.sleep(interval) + + def _request_async(self, interval): + while self.is_active: + request = event.EventSwitchRequest() + LOG.debug('request async %s thread(%s)', + request, id(gevent.getcurrent())) + self.send_event(request.dst, request) + + start = time.time() + busy = interval / 2 + i = 0 + while i < busy: + if time.time() > start + i: + i += 1 + LOG.debug(' thread is busy... %s/%s thread(%s)', + i, busy, id(gevent.getcurrent())) + LOG.debug(' thread yield to reply handler. thread(%s)', + id(gevent.getcurrent())) + + # yield + gevent.sleep(0) + + LOG.debug(' thread get back. thread(%s)', + id(gevent.getcurrent())) + gevent.sleep(interval - busy) + + @set_ev_handler(event.EventSwitchReply) + def switch_reply_handler(self, reply): + LOG.debug('reply async %s', reply) + if len(reply.switches) > 0: + for sw in reply.switches: + LOG.debug(' %s', sw) diff --git a/ryu/topology/event.py b/ryu/topology/event.py new file mode 100644 index 00000000..0c9ce7ce --- /dev/null +++ b/ryu/topology/event.py @@ -0,0 +1,86 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from ryu.controller import event + +LOG = logging.getLogger(__name__) + + +class EventSwitchBase(event.EventBase): + def __init__(self, switch): + super(EventSwitchBase, self).__init__() + self.switch = switch + + def __str__(self): + return '%s<dpid=%s, %s ports>' % \ + (self.__class__.__name__, + self.switch.dp.id, len(self.switch.ports)) + + +class EventSwitchEnter(EventSwitchBase): + def __init__(self, switch): + super(EventSwitchEnter, self).__init__(switch) + + +class EventSwitchLeave(EventSwitchBase): + def __init__(self, switch): + super(EventSwitchLeave, self).__init__(switch) + + +class EventPortBase(event.EventBase): + def __init__(self, port): + super(EventPortBase, self).__init__() + self.port = port + + def __str__(self): + return '%s<%s>' % (self.__class__.__name__, self.port) + + +class EventPortAdd(EventPortBase): + def __init__(self, port): + super(EventPortAdd, self).__init__(port) + + +class EventPortDelete(EventPortBase): + def __init__(self, port): + super(EventPortDelete, self).__init__(port) + + +class EventPortModify(EventPortBase): + def __init__(self, port): + super(EventPortModify, self).__init__(port) + + +class EventSwitchRequest(event.EventRequestBase): + # If dpid is None, reply all list + def __init__(self, dpid=None): + super(EventSwitchRequest, self).__init__() + self.dst = 'switches' + self.dpid = dpid + + def __str__(self): + return 'EventSwitchRequest<src=%s, dpid=%s>' % \ + (self.src, self.dpid) + + +class EventSwitchReply(event.EventReplyBase): + def __init__(self, dst, switches): + super(EventSwitchReply, self).__init__(dst) + self.switches = switches + + def __str__(self): + return 'EventSwitchReply<dst=%s, %s>' % \ + (self.dst, self.switches) diff --git a/ryu/topology/switches.py b/ryu/topology/switches.py new file mode 100644 index 00000000..6cf9af79 --- /dev/null +++ b/ryu/topology/switches.py @@ -0,0 +1,221 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from ryu.topology import event +from ryu.base import app_manager +from ryu.controller import ofp_event +from ryu.controller.handler import set_ev_cls +from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER + +LOG = logging.getLogger(__name__) + + +class Port(object): + # This is data class passed by EventPortXXX + def __init__(self, dpid, ofproto, ofpport): + super(Port, self).__init__() + + self.dpid = dpid + self._ofproto = ofproto + self._config = ofpport.config + self._state = ofpport.state + + self.port_no = ofpport.port_no + self.hw_addr = ofpport.hw_addr + self.name = ofpport.name + + def is_reserved(self): + return self.port_no > self._ofproto.OFPP_MAX + + def is_down(self): + return (self._state & self._ofproto.OFPPS_LINK_DOWN) > 0 \ + or (self._config & self._ofproto.OFPPC_PORT_DOWN) > 0 + + def is_live(self): + # NOTE: OF1.2 has OFPPS_LIVE state + # return (self._state & self._ofproto.OFPPS_LIVE) > 0 + return not self.is_down() + + # for Switch.del_port() + def __eq__(self, other): + return self.dpid == other.dpid and self.port_no == other.port_no + + def __ne__(self, other): + return not self.__eq__(other) + + def __hash__(self): + return hash((self.dpid, self.port_no)) + + def __str__(self): + LIVE_MSG = {False: 'DOWN', True: 'LIVE'} + return 'Port<dpid=%s, port_no=%s, %s>' % \ + (self.dpid, self.port_no, LIVE_MSG[self.is_live()]) + + +class Switch(object): + # This is data class passed by EventSwitchXXX + def __init__(self, dp): + super(Switch, self).__init__() + + self.dp = dp + self.ports = [] + + def add_port(self, ofpport): + port = Port(self.dp.id, self.dp.ofproto, ofpport) + if not port.is_reserved(): + self.ports.append(port) + + def del_port(self, ofpport): + self.ports.remove(Port(ofpport)) + + def __str__(self): + msg = 'Switch<dpid=%s, ' % self.dp.id + for port in self.ports: + msg += str(port) + ' ' + + msg += '>' + return msg + + +class PortState(dict): + # dict: int port_no -> OFPPort port + # OFPPort is defined in ryu.ofproto.ofproto_v1_X_parser + def __init__(self): + super(PortState, self).__init__() + + def add(self, port_no, port): + self[port_no] = port + + def remove(self, port_no): + del self[port_no] + + def modify(self, port_no, port): + self[port_no] = port + + +class Switches(app_manager.RyuApp): + _EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave, + event.EventPortAdd, event.EventPortDelete, + event.EventPortModify] + + def __init__(self): + super(Switches, self).__init__() + + self.name = 'switches' + self.dps = {} # datapath_id => class Datapath + self.port_state = {} # datapath_id => ports + + def _register(self, dp): + assert dp.id is not None + assert dp.id not in self.dps + + self.dps[dp.id] = dp + self.port_state[dp.id] = PortState() + for port in dp.ports.values(): + self.port_state[dp.id].add(port.port_no, port) + + def _unregister(self, dp): + if dp.id in self.dps: + del self.dps[dp.id] + del self.port_state[dp.id] + + def _get_switch(self, dp): + switch = Switch(dp) + for ofpport in self.port_state[dp.id].itervalues(): + switch.add_port(ofpport) + return switch + + @set_ev_cls(ofp_event.EventOFPStateChange, + [MAIN_DISPATCHER, DEAD_DISPATCHER]) + def state_change_handler(self, ev): + dp = ev.datapath + assert dp is not None + LOG.debug(dp) + + if ev.state == MAIN_DISPATCHER: + self._register(dp) + switch = self._get_switch(dp) + LOG.debug('register %s', switch) + self.send_event_to_observers(event.EventSwitchEnter(switch)) + + elif ev.state == DEAD_DISPATCHER: + # dp.id is None when datapath dies before handshake + if dp.id is None: + return + switch = self._get_switch(dp) + self._unregister(dp) + LOG.debug('unregister %s', switch) + self.send_event_to_observers(event.EventSwitchLeave(switch)) + + @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER) + def port_status_handler(self, ev): + msg = ev.msg + reason = msg.reason + dp = msg.datapath + ofpport = msg.desc + + if reason == dp.ofproto.OFPPR_ADD: + #LOG.debug('A port was added.' + + # '(datapath id = %s, port number = %s)', + # dp.id, ofpport.port_no) + self.port_state[dp.id].add(ofpport.port_no, ofpport) + self.send_event_to_observers( + event.EventPortAdd(Port(dp.id, dp.ofproto, ofpport))) + + elif reason == dp.ofproto.OFPPR_DELETE: + #LOG.debug('A port was deleted.' + + # '(datapath id = %s, port number = %s)', + # dp.id, ofpport.port_no) + self.port_state[dp.id].remove(ofpport.port_no) + self.send_event_to_observers( + event.EventPortDelete(Port(dp.id, dp.ofproto, ofpport))) + + else: + assert reason == dp.ofproto.OFPPR_MODIFY + #LOG.debug('A port was modified.' + + # '(datapath id = %s, port number = %s)', + # dp.id, ofpport.port_no) + self.port_state[dp.id].modify(ofpport.port_no, ofpport) + self.send_event_to_observers( + event.EventPortModify(Port(dp.id, dp.ofproto, ofpport))) + + @set_ev_cls(event.EventSwitchRequest) + def switch_request_handler(self, req): + LOG.debug(req) + dpid = req.dpid + + switches = [] + if dpid is None: + # reply all list + for dp in self.dps.itervalues(): + switches.append(self._get_switch(dp)) + elif dpid in self.dps: + switches.append(self._get_switch(self.dps[dpid])) + + rep = event.EventSwitchReply(req.src, switches) + if req.sync: + self.send_reply(rep) + else: + self.send_event(req.src, rep) + + +def get(app, dpid=None): + return app.send_request(event.EventSwitchRequest(dpid)) + + +def get_all(app): + return get(app) |