diff options
author | Minoru TAKAHASHI <takahashi.minoru7@gmail.com> | 2014-07-28 13:26:04 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2014-07-28 19:00:48 +0900 |
commit | 44fc2f9b62b10a79dfb86c1d35b00ae7a38fc4f4 (patch) | |
tree | 1c3b9fd0cdec3c277d381bb448ead333c41ee15d | |
parent | 7d08001854e0adbdf4544a95c860377167218502 (diff) |
unit test for ofctl_v1_2/3
Signed-off-by: Minoru TAKAHASHI <takahashi.minoru7@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/tests/unit/lib/test_ofctl.py | 617 |
1 files changed, 617 insertions, 0 deletions
diff --git a/ryu/tests/unit/lib/test_ofctl.py b/ryu/tests/unit/lib/test_ofctl.py new file mode 100644 index 00000000..cbdc3e19 --- /dev/null +++ b/ryu/tests/unit/lib/test_ofctl.py @@ -0,0 +1,617 @@ +# Copyright (C) 2014 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import logging +import struct +import socket +import netaddr +import functools +import new +import itertools + +from nose.tools import * + +from ryu.lib import ofctl_v1_2 +from ryu.ofproto import ofproto_v1_2, ofproto_v1_2_parser +from ryu.lib import ofctl_v1_3 +from ryu.ofproto import ofproto_v1_3, ofproto_v1_3_parser +from ryu.ofproto import ofproto_protocol +from ryu.lib import mac +from ryu.lib import ip + +LOG = logging.getLogger('test_ofctl_v1_2, v1_3') + +""" Common Functions """ + + +def _to_match_eth(value): + eth_mask = value.split('/') + # MAC address + eth = mac.haddr_to_bin(eth_mask[0]) + # mask + mask = mac.haddr_to_bin('ff:ff:ff:ff:ff:ff') + if len(eth_mask) == 2: + mask = mac.haddr_to_bin(eth_mask[1]) + return eth, mask + + +def _to_match_tpsrc(value, match, rest): + match_append = {inet.IPPROTO_TCP: match.set_tcp_src, + inet.IPPROTO_UDP: match.set_udp_src} + nw_proto = rest.get('nw_proto', rest.get('ip_proto', 0)) + if nw_proto in match_append: + match_append[nw_proto](value) + return match + + +def _to_match_tpdst(value, match, rest): + match_append = {inet.IPPROTO_TCP: match.set_tcp_dst, + inet.IPPROTO_UDP: match.set_udp_dst} + nw_proto = rest.get('nw_proto', rest.get('ip_proto', 0)) + if nw_proto in match_append: + match_append[nw_proto](value) + return match + + +def _to_match_ip(value): + ip_mask = value.split('/') + # IP address + ipv4 = struct.unpack('!I', socket.inet_aton(ip_mask[0]))[0] + # netmask + netmask = ofproto_v1_2_parser.UINT32_MAX + if len(ip_mask) == 2: + # Check the mask is CIDR or not. + if ip_mask[1].isdigit(): + netmask &= ofproto_v1_2_parser.UINT32_MAX << 32 - int(ip_mask[1]) + else: + netmask = struct.unpack('!I', socket.inet_aton(ip_mask[1]))[0] + return ipv4, netmask + + +def _to_match_ipv6(value): + ip_mask = value.split('/') + if len(ip_mask) == 2 and ip_mask[1].isdigit() is False: + # Both address and netmask are colon-hexadecimal. + ipv6 = netaddr.IPAddress(ip_mask[0]).words + netmask = netaddr.IPAddress(ip_mask[1]).words + else: + # For other formats. + network = netaddr.IPNetwork(value) + ipv6 = network.ip.words + netmask = network.netmask.words + return ipv6, netmask + +conv_dict = { + 'eth_src': 'dl_src', + 'eth_dst': 'dl_dst', + 'eth_type': 'dl_type', + 'ipv4_dst': 'nw_dst', + 'ipv4_src': 'nw_src', + 'ip_proto': 'nw_proto', + 'vlan_vid': 'dl_vlan', + 'tcp_src': 'tp_src', + 'tcp_dst': 'tp_dst', + 'udp_src': 'tp_src', + 'udp_dst': 'tp_dst', +} + +""" Test_ofctl """ + + +class Test_ofctl(unittest.TestCase): + + def __init__(self, methodName): + super(Test_ofctl, self).__init__(methodName) + + def setUp(self): + pass + + def tearDown(self): + pass + + def _test_actions(self, act, test): + act_type = act["type"] + to_actions = test.to_actions + actions_to_str = test.actions_to_str + dp = ofproto_protocol.ProtocolDesc(version=test.ver) + act_list = [] + act_list.append(act) + # str -> action + result = to_actions(dp, act_list) + insts = result[0] + if act_type in test.supported_action: + cls = test.supported_action[act_type] + else: + cls = None + if act_type == 'GOTO_TABLE': + ok_(isinstance(insts, cls)) + eq_(insts.table_id, act["table_id"]) + elif act_type == 'WRITE_METADATA': + ok_(isinstance(insts, cls)) + eq_(insts.metadata, act["metadata"]) + eq_(insts.metadata_mask, act["metadata_mask"]) + elif act_type == 'METER': + ok_(isinstance(insts, cls)) + eq_(insts.meter_id, act["meter_id"]) + else: + ok_(isinstance(insts.actions[0], cls)) + if act_type == 'OUTPUT': + eq_(insts.actions[0].port, act["port"]) + elif act_type == 'SET_MPLS_TTL': + eq_(insts.actions[0].mpls_ttl, act["mpls_ttl"]) + elif act_type in ['PUSH_VLAN', 'PUSH_MPLS', + 'POP_MPLS', 'PUSH_PBB']: + eq_(insts.actions[0].ethertype, act["ethertype"]) + elif act_type == 'SET_QUEUE': + eq_(insts.actions[0].queue_id, act["queue_id"]) + elif act_type == 'GROUP': + eq_(insts.actions[0].group_id, act["group_id"]) + elif act_type == 'SET_NW_TTL': + eq_(insts.actions[0].nw_ttl, act["nw_ttl"]) + # action -> str + action_str = actions_to_str(result) + action_str_list = action_str[0].split(':') + eq_(action_str_list[0], act_type) + if act_type == 'GOTO_TABLE': + eq_(int(action_str_list[1]), act["table_id"]) + elif act_type == 'WRITE_METADATA': + met = action_str_list[1].split('/') + eq_(int(met[0], 16), act["metadata"]) + eq_(int(met[1], 16), act["metadata_mask"]) + elif act_type == 'METER': + eq_(int(action_str_list[1]), act["meter_id"]) + else: + if act_type == 'OUTPUT': + eq_(int(action_str_list[1]), act["port"]) + elif act_type == 'SET_MPLS_TTL': + eq_(int(action_str_list[1]), act["mpls_ttl"]) + elif act_type == 'PUSH_VLAN': + eq_(int(action_str_list[1]), act["ethertype"]) + elif act_type == 'PUSH_MPLS': + eq_(int(action_str_list[1]), act["ethertype"]) + elif act_type == 'POP_MPLS': + eq_(int(action_str_list[1]), act["ethertype"]) + elif act_type == 'SET_QUEUE': + eq_(int(action_str_list[1]), act["queue_id"]) + elif act_type == 'GROUP': + eq_(int(action_str_list[1]), act["group_id"]) + elif act_type == 'SET_NW_TTL': + eq_(int(action_str_list[1]), act["nw_ttl"]) + elif act_type == 'SET_FIELD': + eq_(action_str_list[1].strip(' {'), act["field"]) + eq_(action_str_list[2].strip('} '), act["value"]) + elif act_type == 'PUSH_PBB': + eq_(int(action_str_list[1]), act["ethertype"]) + + def _test_to_match(self, attrs, test): + to_match = test.to_match + match_to_str = test.match_to_str + dp = ofproto_protocol.ProtocolDesc(version=test.ver) + # str -> match + match = to_match(dp, attrs) + buf = bytearray() + match.serialize(buf, 0) + match = match.__class__.parser(str(buf), 0) + + def equal_match(key, value, cls_name, fields): + for field in fields: + if cls_name in str(field): + if key in ['dl_src', 'dl_dst', 'arp_sha', 'arp_tha', + 'eth_src', 'eth_dst']: + eth, mask = _to_match_eth(value) + str_eth = mac.haddr_to_str(eth) + str_mask = mac.haddr_to_str(mask) + str_value = mac.haddr_to_str(field.value) + for i in range(0, 17): + if str_mask[i] == 'f': + eq_(str_eth[i], str_value[i]) + else: + continue + eq_(mask, field.mask) + return + elif key in ['nw_src', 'nw_dst', 'ipv4_src', 'ipv4_dst', + 'arp_spa', 'arp_tpa']: + ipv4, mask = _to_match_ip(value) + eq_(ipv4, field.value) + eq_(mask, field.mask) + return + elif key in ['ipv6_src', 'ipv6_dst']: + ipv6, mask = _to_match_ipv6(value) + for i in range(0, 8): + if mask[i] == 65535: + eq_(ipv6[i], field.value[i]) + else: + continue + eq_(list(mask), field.mask) + return + elif key == 'ipv6_nd_target': + ipv6, mask = _to_match_ipv6(value) + for i in range(0, 8): + if mask[i] == 65535: + eq_(ipv6[i], field.value[i]) + else: + continue + return + elif key == 'ipv6_nd_sll' or key == 'ipv6_nd_tll': + eq_(mac.haddr_to_bin(value), field.value) + return + elif key == 'metadata': + eq_(int(value, 16), field.value) + return + else: + eq_(value, field.value) + return + assert False + + for key, value in attrs.items(): + if key.startswith('tp_'): + cls = test.supported_match[key][attrs["ip_proto"]] + elif key in test.supported_match: + cls = test.supported_match[key] + else: + cls = None + equal_match(key, value, cls.__name__, match.fields) + + # match -> str + match_str = match_to_str(match) + + def equal_str(key, value, match_str): + if key in ['dl_src', 'dl_dst', 'arp_sha', 'arp_tha']: + eth_1, mask_1 = _to_match_eth(value) + eth_2, mask_2 = _to_match_eth(match_str[key]) + str_eth_1 = mac.haddr_to_str(eth_1) + str_mask_1 = mac.haddr_to_str(mask_1) + str_eth_2 = mac.haddr_to_str(eth_2) + for i in range(0, 17): + if str_mask_1[i] == 'f': + eq_(str_eth_1[i], str_eth_2[i]) + else: + continue + eq_(mask_1, mask_2) + return + elif key in['nw_src', 'nw_dst', 'arp_spa', 'arp_tpa']: + ipv4_1, ip_mask_1 = _to_match_ip(value) + ipv4_2, ip_mask_2 = _to_match_ip(match_str[key]) + eq_(ipv4_1, ipv4_2) + eq_(ip_mask_1, ip_mask_2) + return + elif key in ['ipv6_src', 'ipv6_dst']: + ipv6_1, netmask_1 = _to_match_ipv6(value) + ipv6_2, netmask_2 = _to_match_ipv6(match_str[key]) + for i in range(0, 8): + if netmask_1[i] == 65535: + eq_(ipv6_1[i], ipv6_2[i]) + else: + continue + eq_(netmask_1, netmask_2) + return + elif key == 'ipv6_nd_target': + ipv6_1, netmask_1 = _to_match_ipv6(value) + ipv6_2, netmask_2 = _to_match_ipv6(match_str[key]) + for i in range(0, 8): + if netmask_1[i] == 65535: + eq_(ipv6_1[i], ipv6_2[i]) + else: + continue + return + elif key == 'metadata': + eq_(str(int(value, 16)), match_str[key]) + return + eq_(value, match_str[key]) + + for key, value in attrs.items(): + if key in conv_dict: + key = conv_dict[key] + equal_str(key, value, match_str) + +""" Test_data for of_v1_2 """ + + +class test_data_v1_2(): + + def __init__(self): + self.supported_action = {} + self.supported_match = {} + self.act_list = [ + {'type': 'OUTPUT', 'port': 3}, + {'type': 'COPY_TTL_OUT'}, + {'type': 'COPY_TTL_IN'}, + {'type': 'SET_MPLS_TTL', 'mpls_ttl': 64}, + {'type': 'DEC_MPLS_TTL'}, + {'type': 'PUSH_VLAN', 'ethertype': 0x0800}, + {'type': 'POP_VLAN'}, + {'type': 'PUSH_MPLS', 'ethertype': 0x0800}, + {'type': 'POP_MPLS', 'ethertype': 0x0800}, + {'type': 'SET_QUEUE', 'queue_id': 7}, + {'type': 'GROUP', 'group_id': 5}, + {'type': 'SET_NW_TTL', 'nw_ttl': 64}, + {'type': 'DEC_NW_TTL'}, + {'type': 'GOTO_TABLE', 'table_id': 8}, + {'type': 'WRITE_METADATA', 'metadata': 8, + 'metadata_mask': (1 << 64) - 1}, + ] + self.attr_list = [ + {'in_port': 7}, + {'in_phy_port': 5, 'in_port': 3}, + {'metadata': '0x1212121212121212'}, + {'dl_src': "aa:bb:cc:11:22:33"}, + {'dl_src': "aa:bb:cc:11:22:33/00:00:00:00:ff:ff"}, + {'dl_dst': "aa:bb:cc:11:22:33"}, + {'dl_dst': "aa:bb:cc:11:22:33/00:00:00:00:ff:ff"}, + {'dl_type': 123}, + {'eth_src': "aa:bb:cc:11:22:33"}, + {'eth_src': "aa:bb:cc:11:22:33/00:00:00:00:ff:ff"}, + {'eth_dst': "aa:bb:cc:11:22:33"}, + {'eth_dst': "aa:bb:cc:11:22:33/00:00:00:00:ff:ff"}, + {'eth_type': 0x800}, + {'dl_vlan': 5}, + {'vlan_pcp': 3, 'vlan_vid': 3}, + {'ip_dscp': 3, 'eth_type': 0x0800}, + {'ip_ecn': 4, 'eth_type': 0x86dd}, + {'nw_src': "192.168.0.1", 'eth_type': 0x0800}, + {'nw_src': "192.168.0.1/24", 'eth_type': 0x0800}, + {'nw_src': "192.168.10.10/255.255.0.0", 'eth_type': 0x0800}, + {'nw_dst': "192.168.0.1", 'eth_type': 0x0800}, + {'nw_dst': "192.168.0.1/24", 'eth_type': 0x0800}, + {'nw_dst': "192.168.10.10/255.255.255.0"}, + {'nw_proto': 5, 'eth_type': 0x0800}, + {'ip_proto': 5, 'eth_type': 0x86dd}, + {'ipv4_src': "192.168.0.1", 'eth_type': 0x0800}, + {'ipv4_src': "192.168.0.1/24", 'eth_type': 0x0800}, + {'ipv4_src': "192.168.10.10/255.255.0.0", 'eth_type': 0x0800}, + {'ipv4_dst': "192.168.0.1", 'eth_type': 0x0800}, + {'ipv4_dst': "192.168.0.1/24", 'eth_type': 0x0800}, + {'ipv4_dst': "192.168.10.10/255.255.255.0", 'eth_type': 0x0800}, + {'tp_src': 1, 'ip_proto': 6}, + {'tp_dst': 2, 'ip_proto': 6}, + {'tp_src': 3, 'ip_proto': 17}, + {'tp_dst': 4, 'ip_proto': 17}, + {'vlan_vid': 3}, + {'tcp_src': 3, 'ip_proto': 6}, + {'tcp_dst': 5, 'ip_proto': 6}, + {'udp_src': 2, 'ip_proto': 17}, + {'udp_dst': 6, 'ip_proto': 17}, + {'sctp_src': 99, 'ip_proto': 132}, + {'sctp_dst': 99, 'ip_proto': 132}, + {'icmpv4_type': 5, 'ip_proto': 1}, + {'icmpv4_code': 6, 'ip_proto': 1}, + {'arp_op': 3, 'eth_type': 0x0806}, + {'arp_spa': "192.168.0.11", 'eth_type': 0x0806}, + {'arp_spa': "192.168.0.22/24", 'eth_type': 0x0806}, + {'arp_tpa': "192.168.0.33", 'eth_type': 0x0806}, + {'arp_tpa': "192.168.0.44/24", 'eth_type': 0x0806}, + {'arp_sha': "aa:bb:cc:11:22:33", 'eth_type': 0x0806}, + {'arp_sha': "aa:bb:cc:11:22:33/00:00:00:00:ff:ff", + 'eth_type': 0x0806}, + {'arp_tha': "aa:bb:cc:11:22:33", 'eth_type': 0x0806}, + {'arp_tha': "aa:bb:cc:11:22:33/00:00:00:00:ff:ff", + 'eth_type': 0x0806}, + {'ipv6_src': '2001::aaaa:bbbb:cccc:1111', 'eth_type': 0x86dd}, + {'ipv6_src': '2001::aaaa:bbbb:cccc:1111/64', 'eth_type': 0x86dd}, + {'ipv6_dst': '2001::ffff:cccc:bbbb:1111', 'eth_type': 0x86dd}, + {'ipv6_dst': '2001::ffff:cccc:bbbb:1111/64', 'eth_type': 0x86dd}, + {'ipv6_flabel': 2, 'eth_type': 0x86dd}, + {'icmpv6_type': 3, 'ip_proto': 58}, + {'icmpv6_code': 4, 'ip_proto': 58}, + {'ipv6_nd_target': '2001::ffff:cccc:bbbb:1111', + 'icmpv6_type': 135, 'ip_proto': 58}, + {'ipv6_nd_sll': "aa:bb:cc:11:22:33", + 'icmpv6_type': 135, 'ip_proto': 58}, + {'ipv6_nd_tll': "aa:bb:cc:11:22:33", + 'icmpv6_type': 136, 'ip_proto': 58}, + {'mpls_label': 3, 'eth_type': 0x8848}, + {'mpls_tc': 2, 'eth_type': 0x8848} + ] + + def set_ver(self, ver): + self.ver = ver + + def set_attr(self, ofctl): + self.to_match = getattr(ofctl, "to_match") + self.match_to_str = getattr(ofctl, "match_to_str") + self.to_actions = getattr(ofctl, "to_actions") + self.actions_to_str = getattr(ofctl, "actions_to_str") + + def set_action_v1_2(self, parser): + self.supported_action.update( + { + 'OUTPUT': getattr(parser, "OFPActionOutput"), + 'COPY_TTL_OUT': getattr(parser, "OFPActionCopyTtlOut"), + 'COPY_TTL_IN': getattr(parser, "OFPActionCopyTtlIn"), + 'SET_MPLS_TTL': getattr(parser, "OFPActionSetMplsTtl"), + 'DEC_MPLS_TTL': getattr(parser, "OFPActionDecMplsTtl"), + 'PUSH_VLAN': getattr(parser, "OFPActionPushVlan"), + 'POP_VLAN': getattr(parser, "OFPActionPopVlan"), + 'PUSH_MPLS': getattr(parser, "OFPActionPushMpls"), + 'POP_MPLS': getattr(parser, "OFPActionPopMpls"), + 'SET_QUEUE': getattr(parser, "OFPActionSetQueue"), + 'GROUP': getattr(parser, "OFPActionGroup"), + 'SET_NW_TTL': getattr(parser, "OFPActionSetNwTtl"), + 'DEC_NW_TTL': getattr(parser, "OFPActionDecNwTtl"), + 'SET_FIELD': getattr(parser, "OFPActionSetField"), + 'GOTO_TABLE': getattr(parser, "OFPInstructionGotoTable"), + 'WRITE_METADATA': getattr(parser, + "OFPInstructionWriteMetadata"), + }) + + def set_match_v1_2(self, parser): + self.supported_match.update( + { + 'in_port': getattr(parser, "MTInPort"), + 'in_phy_port': getattr(parser, "MTInPhyPort"), + 'metadata': getattr(parser, "MTMetadata"), + 'eth_dst': getattr(parser, "MTEthDst"), + 'dl_dst': getattr(parser, "MTEthDst"), + 'eth_src': getattr(parser, "MTEthSrc"), + 'dl_src': getattr(parser, "MTEthSrc"), + 'dl_type': getattr(parser, "MTEthType"), + 'eth_type': getattr(parser, "MTEthType"), + 'dl_vlan': getattr(parser, "MTVlanVid"), + 'vlan_vid': getattr(parser, "MTVlanVid"), + 'vlan_pcp': getattr(parser, "MTVlanPcp"), + 'ip_dscp': getattr(parser, "MTIPDscp"), + 'ip_ecn': getattr(parser, "MTIPECN"), + 'nw_proto': getattr(parser, "MTIPProto"), + 'ip_proto': getattr(parser, "MTIPProto"), + 'nw_src': getattr(parser, "MTIPV4Src"), + 'nw_dst': getattr(parser, "MTIPV4Dst"), + 'ipv4_src': getattr(parser, "MTIPV4Src"), + 'ipv4_dst': getattr(parser, "MTIPV4Dst"), + 'tp_src': {6: getattr(parser, "MTTCPSrc"), + 17: getattr(parser, "MTUDPSrc")}, + 'tp_dst': {6: getattr(parser, "MTTCPDst"), + 17: getattr(parser, "MTUDPDst")}, + 'tcp_src': getattr(parser, "MTTCPSrc"), + 'tcp_dst': getattr(parser, "MTTCPDst"), + 'udp_src': getattr(parser, "MTUDPSrc"), + 'udp_dst': getattr(parser, "MTUDPDst"), + 'sctp_src': getattr(parser, "MTSCTPSrc"), + 'sctp_dst': getattr(parser, "MTSCTPDst"), + 'icmpv4_type': getattr(parser, "MTICMPV4Type"), + 'icmpv4_code': getattr(parser, "MTICMPV4Code"), + 'arp_op': getattr(parser, "MTArpOp"), + 'arp_spa': getattr(parser, "MTArpSpa"), + 'arp_tpa': getattr(parser, "MTArpTpa"), + 'arp_sha': getattr(parser, "MTArpSha"), + 'arp_tha': getattr(parser, "MTArpTha"), + 'ipv6_src': getattr(parser, "MTIPv6Src"), + 'ipv6_dst': getattr(parser, "MTIPv6Dst"), + 'ipv6_flabel': getattr(parser, "MTIPv6Flabel"), + 'icmpv6_type': getattr(parser, "MTICMPV6Type"), + 'icmpv6_code': getattr(parser, "MTICMPV6Code"), + 'ipv6_nd_target': getattr(parser, "MTIPv6NdTarget"), + 'ipv6_nd_sll': getattr(parser, "MTIPv6NdSll"), + 'ipv6_nd_tll': getattr(parser, "MTIPv6NdTll"), + 'mpls_label': getattr(parser, "MTMplsLabel"), + 'mpls_tc': getattr(parser, "MTMplsTc"), + }) + +""" Test_data for of_v1_3 """ + + +class test_data_v1_3(test_data_v1_2): + + def __init__(self): + test_data_v1_2.__init__(self) + self.act_list.extend( + [ + {'type': 'PUSH_PBB', 'ethertype': 0x0800}, + {'type': 'POP_PBB'}, + {'type': 'METER', 'meter_id': 3}, + ] + ) + self.attr_list.extend( + [ + {'mpls_bos': 3, 'eth_type': 0x8848}, + {'pbb_isid': 5, 'eth_type': 0x88E7}, + {'tunnel_id': 7}, + {'ipv6_exthdr': 3, 'eth_type': 0x86dd}, + ] + ) + + def set_action_v1_3(self, parser): + self.set_action_v1_2(parser) + self.supported_action.update( + { + 'PUSH_PBB': getattr(parser, "OFPActionPushPbb"), + 'POP_PBB': getattr(parser, "OFPActionPopPbb"), + 'METER': getattr(parser, "OFPInstructionMeter"), + }) + + def set_match_v1_3(self, parser): + self.set_match_v1_2(parser) + self.supported_match.update( + { + 'mpls_bos': getattr(parser, "MTMplsBos"), + 'pbb_isid': getattr(parser, "MTPbbIsid"), + 'tunnel_id': getattr(parser, "MTTunnelId"), + 'ipv6_exthdr': getattr(parser, "MTIPv6ExtHdr"), + }) + +""" Test_data for of_v1_4 """ + +# class test_data_v1_4(test_data_v1_3): + # def __init__(self): + # test_data_v1_3.__init__(self) + + # def set_action_v1_4(self, parser): + # self.set_action_v1_3(parser) + + # def set_match_v1_4(self, parser): + # self.set_match_v1_3(parser) + + +def _add_tests_actions(cls): + for act in cls.act_list: + method_name = 'test_' + str(cls.ver) + '_' + act["type"] + '_action' + + def _run(self, name, act, cls): + print ('processing %s ...' % name) + cls_ = Test_ofctl(name) + cls_._test_actions(act, cls) + print ('adding %s ...' % method_name) + func = functools.partial(_run, name=method_name, act=act, cls=cls) + func.func_name = method_name + func.__name__ = method_name + im = new.instancemethod(func, None, Test_ofctl) + setattr(Test_ofctl, method_name, im) + + +def _add_tests_match(cls): + for attr in cls.attr_list: + for key, value in attr.items(): + method_name = 'test_' + str(cls.ver) + '_' + key + '_match' + + def _run(self, name, attr, cls): + print ('processing %s ...' % name) + cls_ = Test_ofctl(name) + cls_._test_to_match(attr, cls) + print ('adding %s ...' % method_name) + func = functools.partial( + _run, name=method_name, attr=attr, cls=cls) + func.func_name = method_name + func.__name__ = method_name + im = new.instancemethod(func, None, Test_ofctl) + setattr(Test_ofctl, method_name, im) + +""" Test case """ + +# for of12 +cls = test_data_v1_2() +cls.set_action_v1_2(ofproto_v1_2_parser) +cls.set_match_v1_2(ofproto_v1_2_parser) +cls.set_ver(ofproto_v1_2.OFP_VERSION) +cls.set_attr(ofctl_v1_2) +_add_tests_actions(cls) +_add_tests_match(cls) + +# for of13 +cls = test_data_v1_3() +cls.set_action_v1_3(ofproto_v1_3_parser) +cls.set_match_v1_3(ofproto_v1_3_parser) +cls.set_ver(ofproto_v1_3.OFP_VERSION) +cls.set_attr(ofctl_v1_3) +_add_tests_actions(cls) +_add_tests_match(cls) + +# for of14 +# cls = test_data_v1_4() +# cls.set_action_v1_4(ofproto_v1_4_parser) +# cls.set_match_v1_4(ofproto_v1_4_parser) +# cls.set_ver(ofproto_v1_4.OFP_VERSION) +# cls.set_attr(ofctl_v1_4) +# _add_tests_actions(cls) +# _add_tests_match(cls) |