diff options
Diffstat (limited to 'tests/integrated/test_add_flow_v10.py')
-rw-r--r-- | tests/integrated/test_add_flow_v10.py | 258 |
1 files changed, 258 insertions, 0 deletions
diff --git a/tests/integrated/test_add_flow_v10.py b/tests/integrated/test_add_flow_v10.py new file mode 100644 index 00000000..895cf15d --- /dev/null +++ b/tests/integrated/test_add_flow_v10.py @@ -0,0 +1,258 @@ +# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +import logging + +from ryu.ofproto import ofproto_v1_0 +from ryu.ofproto import ether +from ryu.ofproto import nx_match + +from tests.integrated import tester + +LOG = logging.getLogger(__name__) + + +class RunTest(tester.TestFlowBase): + """ Test case for add flows of1.0 + """ + OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION] + + def __init__(self, *args, **kwargs): + super(RunTest, self).__init__(*args, **kwargs) + self._verify = [] + + def add_action(self, dp, action): + rule = nx_match.ClsRule() + self.send_flow_mod( + dp, rule, 0, dp.ofproto.OFPFC_ADD, 0, 0, None, + 0xffffffff, None, dp.ofproto.OFPFF_SEND_FLOW_REM, action) + + def add_rule(self, dp, rule): + self.send_flow_mod( + dp, rule, 0, dp.ofproto.OFPFC_ADD, 0, 0, None, + 0xffffffff, None, dp.ofproto.OFPFF_SEND_FLOW_REM, []) + + def send_flow_mod(self, dp, rule, cookie, command, idle_timeout, + hard_timeout, priority=None, buffer_id=0xffffffff, + out_port=None, flags=0, actions=None): + + if priority is None: + priority = dp.ofproto.OFP_DEFAULT_PRIORITY + if out_port is None: + out_port = dp.ofproto.OFPP_NONE + + match_tuple = rule.match_tuple() + match = dp.ofproto_parser.OFPMatch(*match_tuple) + + m = dp.ofproto_parser.OFPFlowMod( + dp, match, cookie, command, idle_timeout, hard_timeout, + priority, buffer_id, out_port, flags, actions) + + dp.send_msg(m) + + def _verify_action(self, actions, type_, name, value): + try: + action = actions[0] + if action.cls_action_type != type_: + return "Action type error. send:%s, val:%s" \ + % (type_, action.cls_action_type) + except IndexError: + return "Action is not setting." + + f_value = None + if name: + try: + if isinstance(name, list): + f_value = [getattr(action, n) for n in name] + else: + f_value = getattr(action, name) + except AttributeError: + pass + + if f_value != value: + return "Value error. send:%s=%s val:%s" \ + % (name, value, f_value) + return True + + def _verify_rule(self, rule, name, value): + f_value = getattr(rule, name) + if f_value != value: + return "Value error. send:%s=%s val:%s" \ + % (name, value, f_value) + return True + + def verify_default(self, dp, stats): + verify = self._verify + self._verify = [] + match = stats[0].match + actions = stats[0].actions + + if len(verify) == 2: + return self._verify_rule(match, *verify) + elif len(verify) == 3: + return self._verify_action(actions, *verify) + else: + return "self._verify is invalid." + + # Test of Actions + def test_action_output(self, dp): + out_port = 2 + self._verify = [dp.ofproto.OFPAT_OUTPUT, + 'port', out_port] + action = dp.ofproto_parser.OFPActionOutput(out_port) + self.add_action(dp, [action, ]) + + def test_action_vlan_vid(self, dp): + vlan_vid = 2 + self._verify = [dp.ofproto.OFPAT_SET_VLAN_VID, + 'vlan_vid', vlan_vid] + action = dp.ofproto_parser.OFPActionVlanVid(vlan_vid) + self.add_action(dp, [action, ]) + + def test_action_vlan_pcp(self, dp): + vlan_pcp = 4 + self._verify = [dp.ofproto.OFPAT_SET_VLAN_PCP, + 'vlan_pcp', vlan_pcp] + action = dp.ofproto_parser.OFPActionVlanPcp(vlan_pcp) + self.add_action(dp, [action, ]) + + def test_action_strip_vlan(self, dp): + vlan_pcp = 4 + self._verify = [dp.ofproto.OFPAT_STRIP_VLAN, + None, None] + action = dp.ofproto_parser.OFPActionStripVlan() + self.add_action(dp, [action, ]) + + def test_action_set_dl_src(self, dp): + dl_src = '56:b3:42:04:b2:7a' + dl_src_bin = self.haddr_to_bin(dl_src) + self._verify = [dp.ofproto.OFPAT_SET_DL_SRC, + 'dl_addr', dl_src_bin] + action = dp.ofproto_parser.OFPActionSetDlSrc(dl_src_bin) + self.add_action(dp, [action, ]) + + def test_action_set_dl_dst(self, dp): + dl_dst = 'c2:93:a2:fb:d0:f4' + dl_dst_bin = self.haddr_to_bin(dl_dst) + self._verify = [dp.ofproto.OFPAT_SET_DL_DST, + 'dl_addr', dl_dst_bin] + action = dp.ofproto_parser.OFPActionSetDlDst(dl_dst_bin) + self.add_action(dp, [action, ]) + + def test_action_set_nw_src(self, dp): + nw_src = '216.132.81.105' + nw_src_int = self.ipv4_to_int(nw_src) + self._verify = [dp.ofproto.OFPAT_SET_NW_SRC, + 'nw_addr', nw_src_int] + action = dp.ofproto_parser.OFPActionSetNwSrc(nw_src_int) + self.add_action(dp, [action, ]) + + def test_action_set_nw_dst(self, dp): + nw_dst = '223.201.206.3' + nw_dst_int = self.ipv4_to_int(nw_dst) + self._verify = [dp.ofproto.OFPAT_SET_NW_DST, + 'nw_addr', nw_dst_int] + action = dp.ofproto_parser.OFPActionSetNwDst(nw_dst_int) + self.add_action(dp, [action, ]) + + def test_action_set_nw_tos(self, dp): + # lowest two bits must be zero + nw_tos = 1 << 2 + self._verify = [dp.ofproto.OFPAT_SET_NW_TOS, + 'tos', nw_tos] + action = dp.ofproto_parser.OFPActionSetNwTos(nw_tos) + self.add_action(dp, [action, ]) + + def test_action_set_tp_src(self, dp): + tp_src = 55420 + self._verify = [dp.ofproto.OFPAT_SET_TP_SRC, + 'tp', tp_src] + action = dp.ofproto_parser.OFPActionSetTpSrc(tp_src) + self.add_action(dp, [action, ]) + + def test_action_set_tp_dst(self, dp): + tp_dst = 15430 + self._verify = [dp.ofproto.OFPAT_SET_TP_DST, + 'tp', tp_dst] + action = dp.ofproto_parser.OFPActionSetTpDst(tp_dst) + self.add_action(dp, [action, ]) + + def test_action_enqueue(self, dp): + port = 207 + queue_id = 4287508753 + self._verify = [dp.ofproto.OFPAT_ENQUEUE, + ['port', 'queue_id'], [port, queue_id]] + action = dp.ofproto_parser.OFPActionEnqueue(port, queue_id) + self.add_action(dp, [action, ]) + + # Test of Rules + def test_rule_set_in_port(self, dp): + in_port = 32 + self._verify = ['in_port', in_port] + + rule = nx_match.ClsRule() + rule.set_in_port(in_port) + self.add_rule(dp, rule) + + def test_rule_set_dl_src(self, dp): + dl_src = 'b8:a1:94:51:78:83' + dl_src_bin = self.haddr_to_bin(dl_src) + self._verify = ['dl_src', dl_src_bin] + + rule = nx_match.ClsRule() + rule.set_dl_src(dl_src_bin) + self.add_rule(dp, rule) + + def test_rule_set_dl_type_ip(self, dp): + dl_type = ether.ETH_TYPE_IP + self._verify = ['dl_type', dl_type] + + rule = nx_match.ClsRule() + rule.set_dl_type(dl_type) + self.add_rule(dp, rule) + + def test_rule_set_dl_type_arp(self, dp): + dl_type = ether.ETH_TYPE_ARP + self._verify = ['dl_type', dl_type] + + rule = nx_match.ClsRule() + rule.set_dl_type(dl_type) + self.add_rule(dp, rule) + + def test_rule_set_dl_type_vlan(self, dp): + dl_type = ether.ETH_TYPE_8021Q + self._verify = ['dl_type', dl_type] + + rule = nx_match.ClsRule() + rule.set_dl_type(dl_type) + self.add_rule(dp, rule) + + def test_rule_set_dl_type_ipv6(self, dp): + dl_type = ether.ETH_TYPE_IPV6 + self._verify = ['dl_type', dl_type] + + rule = nx_match.ClsRule() + rule.set_dl_type(dl_type) + self.add_rule(dp, rule) + + def test_rule_set_dl_type_lacp(self, dp): + dl_type = ether.ETH_TYPE_SLOW + self._verify = ['dl_type', dl_type] + + rule = nx_match.ClsRule() + rule.set_dl_type(dl_type) + self.add_rule(dp, rule) |