summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ryu/app/simple_switch_lacp.py115
-rw-r--r--ryu/lib/lacplib.py302
2 files changed, 417 insertions, 0 deletions
diff --git a/ryu/app/simple_switch_lacp.py b/ryu/app/simple_switch_lacp.py
new file mode 100644
index 00000000..2b4edb01
--- /dev/null
+++ b/ryu/app/simple_switch_lacp.py
@@ -0,0 +1,115 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import struct
+
+from ryu.base import app_manager
+from ryu.controller.handler import set_ev_cls
+from ryu.ofproto import ofproto_v1_0
+from ryu.lib import addrconv
+from ryu.lib import lacplib
+from ryu.lib.dpid import str_to_dpid
+
+
+class SimpleSwitchLacp(app_manager.RyuApp):
+ OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]
+ _CONTEXTS = {'lacplib': lacplib.LacpLib}
+
+ def __init__(self, *args, **kwargs):
+ super(SimpleSwitchLacp, self).__init__(*args, **kwargs)
+ self.mac_to_port = {}
+ self._lacp = kwargs['lacplib']
+ # in this sample application, bonding i/fs of the switchs
+ # shall be set up as follows:
+ # - the port 1 and 2 of the datapath 1 face the slave i/fs.
+ # - the port 3, 4 and 5 of the datapath 1 face the others.
+ # - the port 1 and 2 of the datapath 2 face the others.
+ self._lacp.add(
+ dpid=str_to_dpid('0000000000000001'), ports=[1, 2])
+ self._lacp.add(
+ dpid=str_to_dpid('0000000000000001'), ports=[3, 4, 5])
+ self._lacp.add(
+ dpid=str_to_dpid('0000000000000002'), ports=[1, 2])
+
+ def add_flow(self, datapath, in_port, dst, actions):
+ ofproto = datapath.ofproto
+ parser = datapath.ofproto_parser
+
+ match = parser.OFPMatch(in_port=in_port,
+ dl_dst=addrconv.mac.text_to_bin(dst))
+ mod = parser.OFPFlowMod(
+ datapath=datapath, match=match, cookie=0,
+ command=ofproto.OFPFC_ADD, actions=actions)
+ datapath.send_msg(mod)
+
+ def del_flow(self, datapath, dst):
+ ofproto = datapath.ofproto
+ parser = datapath.ofproto_parser
+
+ match = parser.OFPMatch(dl_dst=addrconv.mac.text_to_bin(dst))
+ mod = parser.OFPFlowMod(
+ datapath=datapath, match=match, cookie=0,
+ command=ofproto.OFPFC_DELETE)
+ datapath.send_msg(mod)
+
+ @set_ev_cls(lacplib.EventPacketIn, lacplib.LAG_EV_DISPATCHER)
+ def _packet_in_handler(self, ev):
+ msg = ev.msg
+ datapath = msg.datapath
+ ofproto = datapath.ofproto
+
+ (dst_, src_, _eth_type) = struct.unpack_from(
+ '!6s6sH', buffer(msg.data), 0)
+ src = addrconv.mac.bin_to_text(src_)
+ dst = addrconv.mac.bin_to_text(dst_)
+
+ dpid = datapath.id
+ self.mac_to_port.setdefault(dpid, {})
+
+ self.logger.info("packet in %s %s %s %s",
+ dpid, src, dst, msg.in_port)
+
+ # learn a mac address to avoid FLOOD next time.
+ self.mac_to_port[dpid][src] = msg.in_port
+
+ if dst in self.mac_to_port[dpid]:
+ out_port = self.mac_to_port[dpid][dst]
+ else:
+ out_port = ofproto.OFPP_FLOOD
+
+ actions = [datapath.ofproto_parser.OFPActionOutput(out_port)]
+
+ # install a flow to avoid packet_in next time
+ if out_port != ofproto.OFPP_FLOOD:
+ self.add_flow(datapath, msg.in_port, dst, actions)
+
+ out = datapath.ofproto_parser.OFPPacketOut(
+ datapath=datapath, buffer_id=msg.buffer_id, in_port=msg.in_port,
+ actions=actions)
+ datapath.send_msg(out)
+
+ @set_ev_cls(lacplib.EventSlaveStateChanged, lacplib.LAG_EV_DISPATCHER)
+ def _slave_state_changed_handler(self, ev):
+ datapath = ev.datapath
+ dpid = datapath.id
+ port_no = ev.port
+ enabled = ev.enabled
+ self.logger.info("slave state changed port: %d enabled: %s",
+ port_no, enabled)
+ if dpid in self.mac_to_port:
+ for mac in self.mac_to_port[dpid]:
+ self.del_flow(datapath, mac)
+ del self.mac_to_port[dpid]
+ self.mac_to_port.setdefault(dpid, {})
diff --git a/ryu/lib/lacplib.py b/ryu/lib/lacplib.py
new file mode 100644
index 00000000..e89e684c
--- /dev/null
+++ b/ryu/lib/lacplib.py
@@ -0,0 +1,302 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from ryu.base import app_manager
+from ryu.controller import event
+from ryu.controller import ofp_event
+from ryu.controller.handler import MAIN_DISPATCHER
+from ryu.controller.handler import set_ev_cls
+from ryu.ofproto import ether
+from ryu.lib import addrconv
+from ryu.lib.dpid import dpid_to_str
+from ryu.lib.packet import packet
+from ryu.lib.packet import ethernet
+from ryu.lib.packet import slow
+
+
+LAG_EV_DISPATCHER = "lacplib"
+
+
+class EventPacketIn(event.EventBase):
+ """a PacketIn event class using except LACP."""
+ def __init__(self, msg):
+ """initialization."""
+ super(EventPacketIn, self).__init__()
+ self.msg = msg
+
+
+class EventSlaveStateChanged(event.EventBase):
+ """a event class that notifies the changes of the statuses of the
+ slave i/fs."""
+ def __init__(self, datapath, port, enabled):
+ """initialization."""
+ super(EventSlaveStateChanged, self).__init__()
+ self.datapath = datapath
+ self.port = port
+ self.enabled = enabled
+
+
+class LacpLib(app_manager.RyuApp):
+ """LACP exchange library. this works only in a PASSIVE mode."""
+
+ #-------------------------------------------------------------------
+ # PUBLIC METHODS
+ #-------------------------------------------------------------------
+ def __init__(self):
+ """initialization."""
+ super(LacpLib, self).__init__()
+ self.name = 'lacplib'
+ self.bonds = []
+ self._set_logger()
+
+ def add(self, dpid, ports):
+ """add a setting of a bonding i/f.
+ 'add' method takes the correspondig args in this order.
+
+ ========= =====================================================
+ Attribute Description
+ ========= =====================================================
+ dpid an integer value that means datapath id.
+
+ ports a list of integer values that means the ports face
+ with the slave i/fs.
+ ========= =====================================================
+
+ if you want to use multi LAG, call 'add' method more than once.
+ """
+ assert isinstance(ports, list)
+ assert 2 <= len(ports)
+ ifs = {}
+ for port in ports:
+ ifs[port] = {'enabled': False, 'timeout': 0}
+ bond = {}
+ bond[dpid] = ifs
+ self.bonds.append(bond)
+
+ #-------------------------------------------------------------------
+ # PUBLIC METHODS ( EVENT HANDLERS )
+ #-------------------------------------------------------------------
+ @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
+ def packet_in_handler(self, evt):
+ """PacketIn event handler. when the received packet was LACP,
+ proceed it. otherwise, send a event."""
+ req_pkt = packet.Packet(evt.msg.data)
+ if slow.lacp in req_pkt:
+ (req_lacp, ) = req_pkt.get_protocols(slow.lacp)
+ (req_eth, ) = req_pkt.get_protocols(ethernet.ethernet)
+ self._do_lacp(req_lacp, req_eth.src, evt.msg)
+ else:
+ self.send_event_to_observers(EventPacketIn(evt.msg))
+
+ @set_ev_cls(ofp_event.EventOFPFlowRemoved, MAIN_DISPATCHER)
+ def flow_removed_handler(self, evt):
+ """FlowRemoved event handler. when the removed flow entry was
+ for LACP, set the status of the slave i/f to enabled, and
+ send a event that LACP exchange timeout has occurred."""
+ msg = evt.msg
+ datapath = msg.datapath
+ dpid = datapath.id
+ match = msg.match
+ port = match.in_port
+ if ether.ETH_TYPE_SLOW != match.dl_type:
+ return
+ self.logger.info(
+ "SW=%s PORT=%d LACP exchange timeout has occurred.",
+ dpid_to_str(dpid), port)
+ self._set_slave_enabled(dpid, port, False)
+ self._set_slave_timeout(dpid, port, 0)
+ self.send_event_to_observers(
+ EventSlaveStateChanged(datapath, port, False))
+
+ #-------------------------------------------------------------------
+ # PRIVATE METHODS ( RELATED TO LACP )
+ #-------------------------------------------------------------------
+ def _do_lacp(self, req_lacp, src, msg):
+ """packet-in process when the received packet is LACP."""
+ datapath = msg.datapath
+ dpid = datapath.id
+ port = msg.in_port
+ ofproto = datapath.ofproto
+ parser = datapath.ofproto_parser
+
+ self.logger.info("SW=%s PORT=%d LACP received.",
+ dpid_to_str(dpid), port)
+ self.logger.debug(str(req_lacp))
+
+ # when LACP arrived at disabled port, update the status of
+ # the slave i/f and reset all flow entries except for LACP.
+ if not self._get_slave_enabled(dpid, port):
+ self.logger.info(
+ "SW=%s PORT=%d the slave i/f has just been up.",
+ dpid_to_str(dpid), port)
+ self._set_slave_enabled(dpid, port, True)
+ self.send_event_to_observers(
+ EventSlaveStateChanged(datapath, port, True))
+
+ # set the idle_timeout time using the actor state of the
+ # received packet.
+ if req_lacp.LACP_STATE_SHORT_TIMEOUT == \
+ req_lacp.actor_state_timeout:
+ idle_timeout = req_lacp.SHORT_TIMEOUT_TIME
+ else:
+ idle_timeout = req_lacp.LONG_TIMEOUT_TIME
+
+ # when the timeout time has changed, update the timeout time of
+ # the slave i/f and re-enter a flow entry for the packet from
+ # the slave i/f with idle_timeout.
+ if idle_timeout != self._get_slave_timeout(dpid, port):
+ self.logger.info(
+ "SW=%s PORT=%d the timeout time has changed.",
+ dpid_to_str(dpid), port)
+ self._set_slave_timeout(dpid, port, idle_timeout)
+ self._set_flow_entry_packet_in(src, port, idle_timeout,
+ msg)
+
+ # create a response packet.
+ res_pkt = self._create_response(datapath, port, req_lacp)
+
+ # packet-out the response packet.
+ out_port = ofproto.OFPP_IN_PORT
+ actions = [parser.OFPActionOutput(out_port)]
+ out = datapath.ofproto_parser.OFPPacketOut(
+ datapath=datapath, buffer_id=ofproto.OFP_NO_BUFFER,
+ data=res_pkt.data, in_port=port, actions=actions)
+ datapath.send_msg(out)
+
+ def _create_response(self, datapath, port, req):
+ """create a packet including LACP."""
+ src = datapath.ports[port].hw_addr
+ res_ether = ethernet.ethernet(
+ slow.SLOW_PROTOCOL_MULTICAST, src, ether.ETH_TYPE_SLOW)
+ res_lacp = self._create_lacp(datapath, port, req)
+ res_pkt = packet.Packet()
+ res_pkt.add_protocol(res_ether)
+ res_pkt.add_protocol(res_lacp)
+ res_pkt.serialize()
+ return res_pkt
+
+ def _create_lacp(self, datapath, port, req):
+ """create a LACP packet."""
+ actor_system = datapath.ports[datapath.ofproto.OFPP_LOCAL].hw_addr
+ res = slow.lacp(
+ actor_system_priority=0xffff,
+ actor_system=actor_system,
+ actor_key=req.actor_key,
+ actor_port_priority=0xff,
+ actor_port=port,
+ actor_state_activity=req.LACP_STATE_PASSIVE,
+ actor_state_timeout=req.actor_state_timeout,
+ actor_state_aggregation=req.actor_state_aggregation,
+ actor_state_synchronization=req.actor_state_synchronization,
+ actor_state_collecting=req.actor_state_collecting,
+ actor_state_distributing=req.actor_state_distributing,
+ actor_state_defaulted=req.LACP_STATE_OPERATIONAL_PARTNER,
+ actor_state_expired=req.LACP_STATE_NOT_EXPIRED,
+ partner_system_priority=req.actor_system_priority,
+ partner_system=req.actor_system,
+ partner_key=req.actor_key,
+ partner_port_priority=req.actor_port_priority,
+ partner_port=req.actor_port,
+ partner_state_activity=req.actor_state_activity,
+ partner_state_timeout=req.actor_state_timeout,
+ partner_state_aggregation=req.actor_state_aggregation,
+ partner_state_synchronization=req.actor_state_synchronization,
+ partner_state_collecting=req.actor_state_collecting,
+ partner_state_distributing=req.actor_state_distributing,
+ partner_state_defaulted=req.actor_state_defaulted,
+ partner_state_expired=req.actor_state_expired,
+ collector_max_delay=0)
+ self.logger.info("SW=%s PORT=%d LACP sent.",
+ dpid_to_str(datapath.id), port)
+ self.logger.debug(str(res))
+ return res
+
+ def _get_slave_enabled(self, dpid, port):
+ """get whether a slave i/f at some port of some datapath is
+ enable or not."""
+ slave = self._get_slave(dpid, port)
+ if slave:
+ return slave['enabled']
+ else:
+ return False
+
+ def _set_slave_enabled(self, dpid, port, enabled):
+ """set whether a slave i/f at some port of some datapath is
+ enable or not."""
+ slave = self._get_slave(dpid, port)
+ if slave:
+ slave['enabled'] = enabled
+
+ def _get_slave_timeout(self, dpid, port):
+ """get the timeout time at some port of some datapath."""
+ slave = self._get_slave(dpid, port)
+ if slave:
+ return slave['timeout']
+ else:
+ return 0
+
+ def _set_slave_timeout(self, dpid, port, timeout):
+ """set the timeout time at some port of some datapath."""
+ slave = self._get_slave(dpid, port)
+ if slave:
+ slave['timeout'] = timeout
+
+ def _get_slave(self, dpid, port):
+ """get slave i/f at some port of some datapath."""
+ result = None
+ for bond in self.bonds:
+ if dpid in bond:
+ if port in bond[dpid]:
+ result = bond[dpid][port]
+ break
+ return result
+
+ #-------------------------------------------------------------------
+ # PRIVATE METHODS ( RELATED TO OPEN FLOW PROTOCOL )
+ #-------------------------------------------------------------------
+ def _set_flow_entry_packet_in(self, src, port, timeout, msg):
+ """enter a flow entry for the packet from the slave i/f
+ with idle_timeout."""
+ datapath = msg.datapath
+ ofproto = datapath.ofproto
+ parser = datapath.ofproto_parser
+
+ wildcards = ofproto.OFPFW_ALL
+ wildcards &= ~ofproto.OFPFW_IN_PORT
+ wildcards &= ~ofproto.OFPFW_DL_SRC
+ wildcards &= ~ofproto.OFPFW_DL_TYPE
+ match = parser.OFPMatch(wildcards=wildcards, in_port=port,
+ dl_src=addrconv.mac.text_to_bin(src),
+ dl_type=ether.ETH_TYPE_SLOW)
+ actions = [parser.OFPActionOutput(
+ ofproto.OFPP_CONTROLLER, ofproto.OFP_MSG_SIZE_MAX)]
+ mod = parser.OFPFlowMod(
+ datapath=datapath, match=match, cookie=0,
+ command=ofproto.OFPFC_ADD, idle_timeout=timeout,
+ flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
+ datapath.send_msg(mod)
+
+ #-------------------------------------------------------------------
+ # PRIVATE METHODS ( OTHERS )
+ #-------------------------------------------------------------------
+ def _set_logger(self):
+ """change log format."""
+ self.logger.propagate = False
+ hdl = logging.StreamHandler()
+ fmt_str = '[LACP][%(levelname)s] %(message)s'
+ hdl.setFormatter(logging.Formatter(fmt_str))
+ self.logger.addHandler(hdl)