diff options
author | YAMADA Hideki <yamada.hideki@po.ntts.co.jp> | 2013-03-14 19:16:07 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2013-03-25 03:13:11 +0900 |
commit | 7d5a68cdc20c657bef192c82c1dab2746efb2bf3 (patch) | |
tree | f955395794681a7fd476d52d21a6591f57af77be | |
parent | 443891b1c42afa6f102f081db477d37b02b09158 (diff) |
topology: support link discovery
event.py: add link events.
switches.py: add link discovery (only of1.0).
dumper.py: add handler for link events.
TODO: support other OpenFlow version.
Signed-off-by: YAMADA Hideki <yamada.hideki@po.ntts.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rwxr-xr-x | bin/ryu-manager | 1 | ||||
-rw-r--r-- | ryu/topology/dumper.py | 79 | ||||
-rw-r--r-- | ryu/topology/event.py | 42 | ||||
-rw-r--r-- | ryu/topology/switches.py | 630 |
4 files changed, 726 insertions, 26 deletions
diff --git a/bin/ryu-manager b/bin/ryu-manager index 03b1b79f..2924bd49 100755 --- a/bin/ryu-manager +++ b/bin/ryu-manager @@ -41,6 +41,7 @@ from ryu import version from ryu.app import wsgi from ryu.base.app_manager import AppManager from ryu.controller import controller +from ryu.topology import switches CONF = cfg.CONF diff --git a/ryu/topology/dumper.py b/ryu/topology/dumper.py index 426bcb5a..b48d06fd 100644 --- a/ryu/topology/dumper.py +++ b/ryu/topology/dumper.py @@ -21,7 +21,7 @@ import time from ryu.base import app_manager from ryu.controller.handler import set_ev_handler -from ryu.topology import switches, event +from ryu.topology import event LOG = logging.getLogger(__name__) @@ -34,8 +34,15 @@ class DiscoveryEventDumper(app_manager.RyuApp): super(DiscoveryEventDumper, self).__init__() # For testing when sync and async request. -# self.threads.append(gevent.spawn_later(0, self._request_sync, 5)) - self.threads.append(gevent.spawn_later(0, self._request_async, 10)) +# self.threads.append( +# gevent.spawn_later(0, self._switch_request_sync, 5)) +# self.threads.append( +# gevent.spawn_later(0, self._switch_request_async, 10)) +# +# self.threads.append( +# gevent.spawn_later(0, self._link_request_sync, 5)) +# self.threads.append( +# gevent.spawn_later(0, self._link_request_async, 10)) self.is_active = True @@ -59,22 +66,30 @@ class DiscoveryEventDumper(app_manager.RyuApp): def port_modify_handler(self, ev): LOG.debug(ev) - def _request_sync(self, interval): + @set_ev_handler(event.EventLinkAdd) + def link_add_handler(self, ev): + LOG.debug(ev) + + @set_ev_handler(event.EventLinkDelete) + def link_del_handler(self, ev): + LOG.debug(ev) + + def _switch_request_sync(self, interval): while self.is_active: request = event.EventSwitchRequest() - LOG.debug('request sync %s thread(%s)', + LOG.debug('switch_request sync %s thread(%s)', request, id(gevent.getcurrent())) reply = self.send_request(request) - LOG.debug('reply sync %s', reply) + LOG.debug('switch_reply sync %s', reply) if len(reply.switches) > 0: for sw in reply.switches: LOG.debug(' %s', sw) gevent.sleep(interval) - def _request_async(self, interval): + def _switch_request_async(self, interval): while self.is_active: request = event.EventSwitchRequest() - LOG.debug('request async %s thread(%s)', + LOG.debug('switch_request async %s thread(%s)', request, id(gevent.getcurrent())) self.send_event(request.dst, request) @@ -86,7 +101,7 @@ class DiscoveryEventDumper(app_manager.RyuApp): i += 1 LOG.debug(' thread is busy... %s/%s thread(%s)', i, busy, id(gevent.getcurrent())) - LOG.debug(' thread yield to reply handler. thread(%s)', + LOG.debug(' thread yield to switch_reply handler. thread(%s)', id(gevent.getcurrent())) # yield @@ -98,7 +113,51 @@ class DiscoveryEventDumper(app_manager.RyuApp): @set_ev_handler(event.EventSwitchReply) def switch_reply_handler(self, reply): - LOG.debug('reply async %s', reply) + LOG.debug('switch_reply async %s', reply) if len(reply.switches) > 0: for sw in reply.switches: LOG.debug(' %s', sw) + + def _link_request_sync(self, interval): + while self.is_active: + request = event.EventLinkRequest() + LOG.debug('link_request sync %s thread(%s)', + request, id(gevent.getcurrent())) + reply = self.send_request(request) + LOG.debug('link_reply sync %s', reply) + if len(reply.links) > 0: + for link in reply.links: + LOG.debug(' %s', link) + gevent.sleep(interval) + + def _link_request_async(self, interval): + while self.is_active: + request = event.EventLinkRequest() + LOG.debug('link_request async %s thread(%s)', + request, id(gevent.getcurrent())) + self.send_event(request.dst, request) + + start = time.time() + busy = interval / 2 + i = 0 + while i < busy: + if time.time() > start + i: + i += 1 + LOG.debug(' thread is busy... %s/%s thread(%s)', + i, busy, id(gevent.getcurrent())) + LOG.debug(' thread yield to link_reply handler. thread(%s)', + id(gevent.getcurrent())) + + # yield + gevent.sleep(0) + + LOG.debug(' thread get back. thread(%s)', + id(gevent.getcurrent())) + gevent.sleep(interval - busy) + + @set_ev_handler(event.EventLinkReply) + def link_reply_handler(self, reply): + LOG.debug('link_reply async %s', reply) + if len(reply.links) > 0: + for link in reply.links: + LOG.debug(' %s', link) diff --git a/ryu/topology/event.py b/ryu/topology/event.py index 0c9ce7ce..bd87ab00 100644 --- a/ryu/topology/event.py +++ b/ryu/topology/event.py @@ -84,3 +84,45 @@ class EventSwitchReply(event.EventReplyBase): def __str__(self): return 'EventSwitchReply<dst=%s, %s>' % \ (self.dst, self.switches) + + +class EventLinkBase(event.EventBase): + def __init__(self, link): + super(EventLinkBase, self).__init__() + self.link = link + + def __str__(self): + return '%s<%s>' % (self.__class__.__name__, self.link) + + +class EventLinkAdd(EventLinkBase): + def __init__(self, link): + super(EventLinkAdd, self).__init__(link) + + +class EventLinkDelete(EventLinkBase): + def __init__(self, link): + super(EventLinkDelete, self).__init__(link) + + +class EventLinkRequest(event.EventRequestBase): + # If dpid is None, reply all list + def __init__(self, dpid=None): + super(EventLinkRequest, self).__init__() + self.dst = 'switches' + self.dpid = dpid + + def __str__(self): + return 'EventLinkRequest<src=%s, dpid=%s>' % \ + (self.src, self.dpid) + + +class EventLinkReply(event.EventReplyBase): + def __init__(self, dst, dpid, links): + super(EventLinkReply, self).__init__(dst) + self.dpid = dpid + self.links = links + + def __str__(self): + return 'EventLinkReply<dst=%s, dpid=%s, links=%s>' % \ + (self.dst, self.dpid, len(self.links)) diff --git a/ryu/topology/switches.py b/ryu/topology/switches.py index 6cf9af79..6b69a001 100644 --- a/ryu/topology/switches.py +++ b/ryu/topology/switches.py @@ -14,16 +14,43 @@ # limitations under the License. import logging +import gevent +import struct +import time +from oslo.config import cfg from ryu.topology import event from ryu.base import app_manager from ryu.controller import ofp_event from ryu.controller.handler import set_ev_cls from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER +from ryu.exception import RyuException +from ryu.lib.mac import DONTCARE +from ryu.lib.dpid import dpid_to_str, str_to_dpid +from ryu.lib.packet import packet, ethernet, lldp +from ryu.ofproto.ether import ETH_TYPE_LLDP +from ryu.ofproto import ofproto_v1_0 +from ryu.ofproto import nx_match +from ryu.ofproto import ofproto_v1_2 +from ryu.ofproto import ofproto_v1_3 + LOG = logging.getLogger(__name__) +CONF = cfg.CONF + +CONF.register_cli_opts([ + cfg.BoolOpt('observe-links', default=False, + help='observe link discovery events.'), + cfg.BoolOpt('install-lldp-flow', default=True, + help='link discovery: explicitly install flow entry ' + 'to send lldp packet to controller'), + cfg.BoolOpt('explicit-drop', default=True, + help='link discovery: explicitly drop lldp packet in') +]) + + class Port(object): # This is data class passed by EventPortXXX def __init__(self, dpid, ofproto, ofpport): @@ -91,6 +118,27 @@ class Switch(object): return msg +class Link(object): + # This is data class passed by EventLinkXXX + def __init__(self, src, dst): + super(Link, self).__init__() + self.src = src + self.dst = dst + + # this type is used for key value of LinkState + def __eq__(self, other): + return self.src == other.src and self.dst == other.dst + + def __ne__(self, other): + return not self.__eq__(other) + + def __hash__(self): + return hash((self.src, self.dst)) + + def __str__(self): + return 'Link: %s to %s' % (self.src, self.dst) + + class PortState(dict): # dict: int port_no -> OFPPort port # OFPPort is defined in ryu.ofproto.ofproto_v1_X_parser @@ -107,17 +155,297 @@ class PortState(dict): self[port_no] = port +class PortData(object): + def __init__(self, is_down, lldp_data): + super(PortData, self).__init__() + self.is_down = is_down + self.lldp_data = lldp_data + self.timestamp = None + self.sent = 0 + + def lldp_sent(self): + self.timestamp = time.time() + self.sent += 1 + + def lldp_received(self): + self.sent = 0 + + def lldp_dropped(self): + return self.sent + + def clear_timestamp(self): + self.timestamp = None + + def set_down(self, is_down): + self.is_down = is_down + + def __str__(self): + return 'PortData<live=%s, timestamp=%s, sent=%d>' \ + % (not self.is_down, self.timestamp, self.sent) + + +class PortDataState(dict): + # dict: Port class -> PortData class + # slimed down version of OrderedDict as python 2.6 doesn't support it. + _PREV = 0 + _NEXT = 1 + _KEY = 2 + + def __init__(self): + super(PortDataState, self).__init__() + self._root = root = [] # sentinel node + root[:] = [root, root, None] # [_PREV, _NEXT, _KEY] + # doubly linked list + self._map = {} + + def _remove_key(self, key): + link_prev, link_next, key = self._map.pop(key) + link_prev[self._NEXT] = link_next + link_next[self._PREV] = link_prev + + def _append_key(self, key): + root = self._root + last = root[self._PREV] + last[self._NEXT] = root[self._PREV] = self._map[key] = [last, root, + key] + + def _prepend_key(self, key): + root = self._root + first = root[self._NEXT] + first[self._PREV] = root[self._NEXT] = self._map[key] = [root, first, + key] + + def _move_last_key(self, key): + self._remove_key(key) + self._append_key(key) + + def _move_front_key(self, key): + self._remove_key(key) + self._prepend_key(key) + + def add_port(self, port, lldp_data): + if port not in self: + self._prepend_key(port) + self[port] = PortData(port.is_down(), lldp_data) + else: + self[port].is_down = port.is_down() + + def lldp_sent(self, port): + port_data = self[port] + port_data.lldp_sent() + self._move_last_key(port) + return port_data + + def lldp_received(self, port): + self[port].lldp_received() + + def move_front(self, port): + port_data = self.get(port, None) + if port_data is not None: + port_data.clear_timestamp() + self._move_front_key(port) + + def set_down(self, port): + is_down = port.is_down() + port_data = self[port] + port_data.set_down(is_down) + port_data.clear_timestamp() + if not is_down: + self._move_front_key(port) + return is_down + + def get_port(self, port): + return self[port] + + def del_port(self, port): + del self[port] + self._remove_key(port) + + def __iter__(self): + root = self._root + curr = root[self._NEXT] + while curr is not root: + yield curr[self._KEY] + curr = curr[self._NEXT] + + def clear(self): + for node in self._map.itervalues(): + del node[:] + root = self._root + root[:] = [root, root, None] + self._map.clear() + dict.clear(self) + + def items(self): + 'od.items() -> list of (key, value) pairs in od' + return [(key, self[key]) for key in self] + + def iteritems(self): + 'od.iteritems -> an iterator over the (key, value) pairs in od' + for k in self: + yield (k, self[k]) + + +class LinkState(dict): + # dict: Link class -> timestamp + def __init__(self): + super(LinkState, self).__init__() + self._map = {} + + def get_peer(self, src): + return self._map.get(src, None) + + def update_link(self, src, dst): + link = Link(src, dst) + + self[link] = time.time() + self._map[src] = dst + + # return if the reverse link is also up or not + rev_link = Link(dst, src) + return rev_link in self + + def link_down(self, link): + del self[link] + del self._map[link.src] + + def rev_link_set_timestamp(self, rev_link, timestamp): + # rev_link may or may not in LinkSet + if rev_link in self: + self[rev_link] = timestamp + + def port_deleted(self, src): + dst = self.get_peer(src) + if dst is None: + raise KeyError() + + link = Link(src, dst) + rev_link = Link(dst, src) + del self[link] + del self._map[src] + # reverse link might not exist + self.pop(rev_link, None) + rev_link_dst = self._map.pop(dst, None) + + return dst, rev_link_dst + + +class LLDPPacket(object): + # make a LLDP packet for link discovery. + + CHASSIS_ID_PREFIX = 'dpid:' + CHASSIS_ID_PREFIX_LEN = len(CHASSIS_ID_PREFIX) + CHASSIS_ID_FMT = CHASSIS_ID_PREFIX + '%s' + + PORT_ID_STR = '!I' # uint32_t + PORT_ID_SIZE = 4 + + class LLDPUnknownFormat(RyuException): + message = '%(msg)s' + + @staticmethod + def lldp_packet(dpid, port_no, dl_addr, ttl): + pkt = packet.Packet() + + dst = lldp.LLDP_MAC_NEAREST_BRIDGE + src = dl_addr + ethertype = ETH_TYPE_LLDP + eth_pkt = ethernet.ethernet(dst, src, ethertype) + pkt.add_protocol(eth_pkt) + + tlv_chassis_id = lldp.ChassisID( + subtype=lldp.ChassisID.SUB_LOCALLY_ASSIGNED, + chassis_id=LLDPPacket.CHASSIS_ID_FMT % + dpid_to_str(dpid)) + + tlv_port_id = lldp.PortID(subtype=lldp.PortID.SUB_PORT_COMPONENT, + port_id=struct.pack( + LLDPPacket.PORT_ID_STR, + port_no)) + + tlv_ttl = lldp.TTL(ttl=ttl) + tlv_end = lldp.End() + + tlvs = (tlv_chassis_id, tlv_port_id, tlv_ttl, tlv_end) + lldp_pkt = lldp.lldp(tlvs) + pkt.add_protocol(lldp_pkt) + + pkt.serialize() + return pkt.data + + @staticmethod + def lldp_parse(data): + pkt = packet.Packet(data) + eth_pkt = pkt.next() + assert type(eth_pkt) == ethernet.ethernet + + lldp_pkt = pkt.next() + if type(lldp_pkt) != lldp.lldp: + raise LLDPPacket.LLDPUnknownFormat() + + tlv_chassis_id = lldp_pkt.tlvs[0] + if tlv_chassis_id.subtype != lldp.ChassisID.SUB_LOCALLY_ASSIGNED: + raise LLDPPacket.LLDPUnknownFormat( + msg='unknown chassis id subtype %d' % tlv_chassis_id.subtype) + chassis_id = tlv_chassis_id.chassis_id + if not chassis_id.startswith(LLDPPacket.CHASSIS_ID_PREFIX): + raise LLDPPacket.LLDPUnknownFormat( + msg='unknown chassis id format %s' % chassis_id) + src_dpid = str_to_dpid(chassis_id[LLDPPacket.CHASSIS_ID_PREFIX_LEN:]) + + tlv_port_id = lldp_pkt.tlvs[1] + if tlv_port_id.subtype != lldp.PortID.SUB_PORT_COMPONENT: + raise LLDPPacket.LLDPUnknownFormat( + msg='unknown port id subtype %d' % tlv_port_id.subtype) + port_id = tlv_port_id.port_id + if len(port_id) != LLDPPacket.PORT_ID_SIZE: + raise LLDPPacket.LLDPUnknownFormat( + msg='unknown port id %d' % port_id) + (src_port_no, ) = struct.unpack(LLDPPacket.PORT_ID_STR, port_id) + + return src_dpid, src_port_no + + class Switches(app_manager.RyuApp): _EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave, event.EventPortAdd, event.EventPortDelete, - event.EventPortModify] + event.EventPortModify, + event.EventLinkAdd, event.EventLinkDelete] + + DEFAULT_TTL = 120 # unused. ignored. + LLDP_PACKET_LEN = len(LLDPPacket.lldp_packet(0, 0, DONTCARE, 0)) + + LLDP_SEND_GUARD = .05 + LLDP_SEND_PERIOD_PER_PORT = .9 + TIMEOUT_CHECK_PERIOD = 5. + LINK_TIMEOUT = TIMEOUT_CHECK_PERIOD * 2 + LINK_LLDP_DROP = 5 def __init__(self): super(Switches, self).__init__() self.name = 'switches' - self.dps = {} # datapath_id => class Datapath - self.port_state = {} # datapath_id => ports + self.dps = {} # datapath_id => Datapath class + self.port_state = {} # datapath_id => ports + self.ports = PortDataState() # Port class -> PortData class + self.links = LinkState() # Link class -> timestamp + self.is_active = True + + self.link_discovery = CONF.observe_links + if self.link_discovery: + self.install_flow = CONF.install_lldp_flow + self.explicit_drop = CONF.explicit_drop + self.lldp_event = gevent.event.Event() + self.link_event = gevent.event.Event() + self.threads.append(gevent.spawn_later(0, self.lldp_loop)) + self.threads.append(gevent.spawn_later(0, self.link_loop)) + + def close(self): + self.is_active = False + if self.link_discovery: + self.lldp_event.set() + self.link_event.set() + gevent.joinall(self.threads) def _register(self, dp): assert dp.id is not None @@ -133,11 +461,40 @@ class Switches(app_manager.RyuApp): del self.dps[dp.id] del self.port_state[dp.id] - def _get_switch(self, dp): - switch = Switch(dp) - for ofpport in self.port_state[dp.id].itervalues(): - switch.add_port(ofpport) - return switch + def _get_switch(self, dpid): + if dpid in self.dps: + switch = Switch(self.dps[dpid]) + for ofpport in self.port_state[dpid].itervalues(): + switch.add_port(ofpport) + return switch + + def _get_port(self, dpid, port_no): + switch = self._get_switch(dpid) + if switch: + for p in switch.ports: + if p.port_no == port_no: + return p + + def _port_added(self, port): + lldp_data = LLDPPacket.lldp_packet( + port.dpid, port.port_no, port.hw_addr, self.DEFAULT_TTL) + self.ports.add_port(port, lldp_data) + # LOG.debug('_port_added dpid=%s, port_no=%s, live=%s', + # port.dpid, port.port_no, port.is_live()) + + def _link_down(self, port): + try: + dst, rev_link_dst = self.links.port_deleted(port) + except KeyError: + # LOG.debug('key error. src=%s, dst=%s', + # port, self.links.get_peer(port)) + return + link = Link(port, dst) + self.send_event_to_observers(event.EventLinkDelete(link)) + if rev_link_dst: + rev_link = Link(dst, rev_link_dst) + self.send_event_to_observers(event.EventLinkDelete(rev_link)) + self.ports.move_front(dst) @set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER]) @@ -148,19 +505,54 @@ class Switches(app_manager.RyuApp): if ev.state == MAIN_DISPATCHER: self._register(dp) - switch = self._get_switch(dp) + switch = self._get_switch(dp.id) LOG.debug('register %s', switch) self.send_event_to_observers(event.EventSwitchEnter(switch)) + if not self.link_discovery: + return + + if self.install_flow: + ofproto = dp.ofproto + ofproto_parser = dp.ofproto_parser + + # TODO:XXX need other versions + if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + rule = nx_match.ClsRule() + rule.set_dl_dst(lldp.LLDP_MAC_NEAREST_BRIDGE) + rule.set_dl_type(ETH_TYPE_LLDP) + actions = [ofproto_parser.OFPActionOutput( + ofproto.OFPP_CONTROLLER, self.LLDP_PACKET_LEN)] + dp.send_flow_mod( + rule=rule, cookie=0, command=ofproto.OFPFC_ADD, + idle_timeout=0, hard_timeout=0, actions=actions) + else: + LOG.error('cannot install flow. unsupported version. %x', + dp.ofproto.OFP_VERSION) + + for port in switch.ports: + if not port.is_reserved(): + self._port_added(port) + self.lldp_event.set() + elif ev.state == DEAD_DISPATCHER: # dp.id is None when datapath dies before handshake if dp.id is None: return - switch = self._get_switch(dp) + switch = self._get_switch(dp.id) self._unregister(dp) LOG.debug('unregister %s', switch) self.send_event_to_observers(event.EventSwitchLeave(switch)) + if not self.link_discovery: + return + + for port in switch.ports: + if not port.is_reserved(): + self.ports.del_port(port) + self._link_down(port) + self.lldp_event.set() + @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER) def port_status_handler(self, ev): msg = ev.msg @@ -176,6 +568,14 @@ class Switches(app_manager.RyuApp): self.send_event_to_observers( event.EventPortAdd(Port(dp.id, dp.ofproto, ofpport))) + if not self.link_discovery: + return + + port = self._get_port(dp.id, ofpport.port_no) + if port and not port.is_reserved(): + self._port_added(port) + self.lldp_event.set() + elif reason == dp.ofproto.OFPPR_DELETE: #LOG.debug('A port was deleted.' + # '(datapath id = %s, port number = %s)', @@ -184,6 +584,15 @@ class Switches(app_manager.RyuApp): self.send_event_to_observers( event.EventPortDelete(Port(dp.id, dp.ofproto, ofpport))) + if not self.link_discovery: + return + + port = self._get_port(dp.id, ofpport.port_no) + if port and not port.is_reserved(): + self.ports.del_port(port) + self._link_down(port) + self.lldp_event.set() + else: assert reason == dp.ofproto.OFPPR_MODIFY #LOG.debug('A port was modified.' + @@ -193,18 +602,184 @@ class Switches(app_manager.RyuApp): self.send_event_to_observers( event.EventPortModify(Port(dp.id, dp.ofproto, ofpport))) + if not self.link_discovery: + return + + port = self._get_port(dp.id, ofpport.port_no) + if port and not port.is_reserved(): + if self.ports.set_down(port): + self._link_down(port) + self.lldp_event.set() + + @staticmethod + def _drop_packet(msg): + if msg.buffer_id == 0xffffffff: + return # TODO:use constant instead of -1 + + dp = msg.datapath + # TODO:XXX + if dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + dp.send_packet_out(dp.id, msg.in_port, []) + else: + LOG.error('cannot drop_packet. unsupported version. %x', + dp.ofproto.OFP_VERSION) + + @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) + def packet_in_handler(self, ev): + if not self.link_discovery: + return + + msg = ev.msg + try: + src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data) + except LLDPPacket.LLDPUnknownFormat as e: + # This handler can receive all the packtes which can be + # not-LLDP packet. Ignore it silently + return + else: + dst_dpid = msg.datapath.id + dst_port_no = msg.in_port + + src = self._get_port(src_dpid, src_port_no) + if not src or src.dpid == dst_dpid: + return + + dst = self._get_port(dst_dpid, dst_port_no) + if not dst: + return + + old_peer = self.links.get_peer(src) + # LOG.debug("Packet-In") + # LOG.debug(" src=%s", src) + # LOG.debug(" dst=%s", dst) + # LOG.debug(" old_peer=%s", old_peer) + if old_peer and old_peer != dst: + old_link = Link(src, old_peer) + self.send_event_to_observers(event.EventLinkDelete(old_link)) + + link = Link(src, dst) + if not link in self.links: + self.send_event_to_observers(event.EventLinkAdd(link)) + + if not self.links.update_link(src, dst): + # reverse link is not detected yet. + # So schedule the check early because it's very likely it's up + try: + self.ports.lldp_received(dst) + except KeyError as e: + # There are races between EventOFPPacketIn and + # EventDPPortAdd. So packet-in event can happend before + # port add event. In that case key error can happend. + # LOG.debug('lldp_received: KeyError %s', e) + pass + else: + self.ports.move_front(dst) + self.lldp_event.set() + if self.explicit_drop: + self._drop_packet(msg) + + def send_lldp_packet(self, port): + try: + port_data = self.ports.lldp_sent(port) + except KeyError as e: + # ports can be modified during our sleep in self.lldp_loop() + # LOG.debug('send_lldp: KeyError %s', e) + return + if port_data.is_down: + return + + dp = self.dps.get(port.dpid, None) + if dp is None: + # datapath was already deleted + return + + # LOG.debug('lldp sent dpid=%s, port_no=%d', dp.id, port.port_no) + # TODO:XXX + if dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + actions = [dp.ofproto_parser.OFPActionOutput(port.port_no)] + dp.send_packet_out(actions=actions, data=port_data.lldp_data) + else: + LOG.error('cannot send lldp packet. unsupported version. %x', + dp.ofproto.OFP_VERSION) + + def lldp_loop(self): + while self.is_active: + self.lldp_event.clear() + + now = time.time() + timeout = None + ports_now = [] + ports = [] + for (key, data) in self.ports.items(): + if data.timestamp is None: + ports_now.append(key) + continue + + expire = data.timestamp + self.LLDP_SEND_PERIOD_PER_PORT + if expire <= now: + ports.append(key) + continue + + timeout = expire - now + break + + for port in ports_now: + self.send_lldp_packet(port) + for port in ports: + self.send_lldp_packet(port) + gevent.sleep(self.LLDP_SEND_GUARD) # don't burst + + if timeout is not None and ports: + timeout = 0 # We have already slept + # LOG.debug('lldp sleep %s', timeout) + self.lldp_event.wait(timeout=timeout) + + def link_loop(self): + while self.is_active: + self.link_event.clear() + + now = time.time() + deleted = [] + for (link, timestamp) in self.links.items(): + # LOG.debug('%s timestamp %d (now %d)', link, timestamp, now) + if timestamp + self.LINK_TIMEOUT < now: + src = link.src + if src in self.ports: + port_data = self.ports.get_port(src) + # LOG.debug('port_data %s', port_data) + if port_data.lldp_dropped() > self.LINK_LLDP_DROP: + deleted.append(link) + + for link in deleted: + self.links.link_down(link) + # LOG.debug('delete %s', link) + self.send_event_to_observers(event.EventLinkDelete(link)) + + dst = link.dst + rev_link = Link(dst, link.src) + if rev_link not in deleted: + # It is very likely that the reverse link is also + # disconnected. Check it early. + expire = now - self.LINK_TIMEOUT + self.links.rev_link_set_timestamp(rev_link, expire) + if dst in self.ports: + self.ports.move_front(dst) + self.lldp_event.set() + + self.link_event.wait(timeout=self.TIMEOUT_CHECK_PERIOD) + @set_ev_cls(event.EventSwitchRequest) def switch_request_handler(self, req): - LOG.debug(req) + # LOG.debug(req) dpid = req.dpid switches = [] if dpid is None: # reply all list for dp in self.dps.itervalues(): - switches.append(self._get_switch(dp)) + switches.append(self._get_switch(dp.id)) elif dpid in self.dps: - switches.append(self._get_switch(self.dps[dpid])) + switches.append(self._get_switch(dpid)) rep = event.EventSwitchReply(req.src, switches) if req.sync: @@ -212,10 +787,33 @@ class Switches(app_manager.RyuApp): else: self.send_event(req.src, rep) + @set_ev_cls(event.EventLinkRequest) + def link_request_handler(self, req): + # LOG.debug(req) + dpid = req.dpid + + if dpid is None: + links = self.links + else: + links = [link for link in self.links if link.src.dpid == dpid] + rep = event.EventLinkReply(req.src, dpid, links) + if req.sync: + self.send_reply(rep) + else: + self.send_event(req.src, rep) + -def get(app, dpid=None): +def get_switch(app, dpid=None): return app.send_request(event.EventSwitchRequest(dpid)) -def get_all(app): - return get(app) +def get_all_switch(app): + return get_switch(app) + + +def get_link(app, dpid=None): + return app.send_request(event.EventLinkRequest(dpid)) + + +def get_all_link(app): + return get_link(app) |