diff options
author | watanabe.fumitaka <watanabe.fumitaka@nttcom.co.jp> | 2013-08-05 13:07:47 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2013-08-06 09:25:50 +0900 |
commit | 0b5291856bbdada41b7f64c399e71bdf22567a6c (patch) | |
tree | a86ff8061385fffac18697883c64bd60e8f017e8 | |
parent | e0d82b9d375a1ce697b9d32ffb28271a0d1c4304 (diff) |
packet lib to string: unit tests
Signed-off-by: WATANABE Fumitaka <watanabe.fumitaka@nttcom.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/tests/unit/packet/test_dhcp.py | 111 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_icmp.py | 67 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_icmpv6.py | 75 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_ipv6.py | 65 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_lldp.py | 212 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_mpls.py | 54 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_packet.py | 250 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_vrrp.py | 61 |
8 files changed, 895 insertions, 0 deletions
diff --git a/ryu/tests/unit/packet/test_dhcp.py b/ryu/tests/unit/packet/test_dhcp.py new file mode 100644 index 00000000..768b2338 --- /dev/null +++ b/ryu/tests/unit/packet/test_dhcp.py @@ -0,0 +1,111 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import inspect +import logging +import socket +import unittest +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib import ip +from ryu.lib import mac +from ryu.lib.packet import dhcp + + +LOG = logging.getLogger(__name__) + + +class Test_dhcp_offer(unittest.TestCase): + + op = dhcp.DHCP_BOOT_REPLY + chaddr = 'AA:AA:AA:AA:AA:AA' + htype = 1 + hlen = 6 + hops = 0 + xid = 1 + secs = 0 + flags = 1 + ciaddr = '192.168.10.10' + yiaddr = '192.168.20.20' + siaddr = '192.168.30.30' + giaddr = '192.168.40.40' + sname = 'abc' + boot_file = '' + + option_list = [dhcp.option('35', '02'), + dhcp.option('01', 'ffffff00'), + dhcp.option('03', 'c0a80a09'), + dhcp.option('06', 'c0a80a09'), + dhcp.option('33', '0003f480'), + dhcp.option('3a', '0001fa40'), + dhcp.option('3b', '000375f0'), + dhcp.option('36', 'c0a80a09')] + magic_cookie = socket.inet_aton('99.130.83.99') + options = dhcp.options(option_list=option_list, + magic_cookie=magic_cookie) + + dh = dhcp.dhcp(op, chaddr, options, htype=htype, hlen=hlen, + hops=hops, xid=xid, secs=secs, flags=flags, + ciaddr=ciaddr, yiaddr=yiaddr, siaddr=siaddr, + giaddr=giaddr, sname=sname, boot_file=boot_file) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + option_values = ['tag', 'length', 'value'] + opt_str_list = [] + for option in self.option_list: + _opt_str = ','.join(['%s=%s' % (k, repr(getattr(option, k))) + for k, v in inspect.getmembers(option) + if k in option_values]) + opt_str = '%s(%s)' % (dhcp.option.__name__, _opt_str) + opt_str_list.append(opt_str) + option_str = '[%s]' % ', '.join(opt_str_list) + + opts_vals = {'magic_cookie': repr(self.magic_cookie), + 'option_list': option_str, + 'options_len': repr(self.options.options_len)} + _options_str = ','.join(['%s=%s' % (k, opts_vals[k]) + for k, v in inspect.getmembers(self.options) + if k in opts_vals]) + options_str = '%s(%s)' % (dhcp.options.__name__, _options_str) + + dhcp_values = {'op': repr(self.op), + 'htype': repr(self.htype), + 'hlen': repr(self.hlen), + 'hops': repr(self.hops), + 'xid': repr(self.xid), + 'secs': repr(self.secs), + 'flags': repr(self.flags), + 'ciaddr': repr(self.ciaddr), + 'yiaddr': repr(self.yiaddr), + 'siaddr': repr(self.siaddr), + 'giaddr': repr(self.giaddr), + 'chaddr': repr(self.chaddr), + 'sname': repr(self.sname), + 'boot_file': repr(self.boot_file), + 'options': options_str} + _dh_str = ','.join(['%s=%s' % (k, dhcp_values[k]) + for k, v in inspect.getmembers(self.dh) + if k in dhcp_values]) + dh_str = '%s(%s)' % (dhcp.dhcp.__name__, _dh_str) + + eq_(str(self.dh), dh_str) + eq_(repr(self.dh), dh_str) diff --git a/ryu/tests/unit/packet/test_icmp.py b/ryu/tests/unit/packet/test_icmp.py new file mode 100644 index 00000000..75d9702b --- /dev/null +++ b/ryu/tests/unit/packet/test_icmp.py @@ -0,0 +1,67 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import inspect +import logging + +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib.packet import icmp + + +LOG = logging.getLogger(__name__) + + +class Test_icmp_dest_unreach(unittest.TestCase): + + type_ = icmp.ICMP_DEST_UNREACH + code = icmp.ICMP_HOST_UNREACH_CODE + csum = 0 + + mtu = 10 + data = 'abc' + data_len = len(data) + dst_unreach = icmp.dest_unreach(data_len=data_len, mtu=mtu, data=data) + + ic = icmp.icmp(type_, code, csum, data=dst_unreach) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + data_values = {'data': self.data, + 'data_len': self.data_len, + 'mtu': self.mtu} + _data_str = ','.join(['%s=%s' % (k, repr(data_values[k])) + for k, v in inspect.getmembers(self.dst_unreach) + if k in data_values]) + data_str = '%s(%s)' % (icmp.dest_unreach.__name__, _data_str) + + icmp_values = {'type': repr(self.type_), + 'code': repr(self.code), + 'csum': repr(self.csum), + 'data': data_str} + _ic_str = ','.join(['%s=%s' % (k, icmp_values[k]) + for k, v in inspect.getmembers(self.ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmp.icmp.__name__, _ic_str) + + eq_(str(self.ic), ic_str) + eq_(repr(self.ic), ic_str) diff --git a/ryu/tests/unit/packet/test_icmpv6.py b/ryu/tests/unit/packet/test_icmpv6.py index 87e16960..848ecf9d 100644 --- a/ryu/tests/unit/packet/test_icmpv6.py +++ b/ryu/tests/unit/packet/test_icmpv6.py @@ -18,6 +18,7 @@ import unittest import logging import struct +import inspect from nose.tools import ok_, eq_, nottest, raises from nose.plugins.skip import Skip, SkipTest @@ -157,6 +158,30 @@ class Test_icmpv6_echo_request(unittest.TestCase): def test_serialize_with_data(self): self._test_serialize(self.data) + def test_to_string(self): + ec = icmpv6.echo(self.id_, self.seq, self.data) + ic = icmpv6.icmpv6(self.type_, self.code, self.csum, ec) + + echo_values = {'id': self.id_, + 'seq': self.seq, + 'data': self.data} + _echo_str = ','.join(['%s=%s' % (k, repr(echo_values[k])) + for k, v in inspect.getmembers(ec) + if k in echo_values]) + echo_str = '%s(%s)' % (icmpv6.echo.__name__, _echo_str) + + icmp_values = {'type_': repr(self.type_), + 'code': repr(self.code), + 'csum': repr(self.csum), + 'data': echo_str} + _ic_str = ','.join(['%s=%s' % (k, icmp_values[k]) + for k, v in inspect.getmembers(ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str) + + eq_(str(ic), ic_str) + eq_(repr(ic), ic_str) + class Test_icmpv6_echo_reply(Test_icmpv6_echo_request): def setUp(self): @@ -263,6 +288,41 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase): eq_(nd_length, self.nd_length) eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src)) + def test_to_string(self): + nd_opt = icmpv6.nd_option_la(self.nd_hw_src) + nd = icmpv6.nd_neighbor( + self.res, self.dst, self.nd_type, self.nd_length, nd_opt) + ic = icmpv6.icmpv6(self.type_, self.code, self.csum, nd) + + nd_opt_values = {'hw_src': self.nd_hw_src, + 'data': None} + _nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k])) + for k, v in inspect.getmembers(nd_opt) + if k in nd_opt_values]) + nd_opt_str = '%s(%s)' % (icmpv6.nd_option_la.__name__, _nd_opt_str) + + nd_values = {'res': repr(nd.res), + 'dst': repr(self.dst), + 'type_': repr(self.nd_type), + 'length': repr(self.nd_length), + 'data': nd_opt_str} + _nd_str = ','.join(['%s=%s' % (k, nd_values[k]) + for k, v in inspect.getmembers(nd) + if k in nd_values]) + nd_str = '%s(%s)' % (icmpv6.nd_neighbor.__name__, _nd_str) + + icmp_values = {'type_': repr(self.type_), + 'code': repr(self.code), + 'csum': repr(self.csum), + 'data': nd_str} + _ic_str = ','.join(['%s=%s' % (k, icmp_values[k]) + for k, v in inspect.getmembers(ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str) + + eq_(str(ic), ic_str) + eq_(repr(ic), ic_str) + class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solict): def setUp(self): @@ -339,3 +399,18 @@ class Test_icmpv6_router_solict(unittest.TestCase): def test_serialize_with_data(self): self._test_serialize(self.data) + + def test_to_string(self): + ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.data) + + icmp_values = {'type_': self.type_, + 'code': self.code, + 'csum': self.csum, + 'data': self.data} + _ic_str = ','.join(['%s=%s' % (k, repr(icmp_values[k])) + for k, v in inspect.getmembers(ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str) + + eq_(str(ic), ic_str) + eq_(repr(ic), ic_str) diff --git a/ryu/tests/unit/packet/test_ipv6.py b/ryu/tests/unit/packet/test_ipv6.py new file mode 100644 index 00000000..5454fbfd --- /dev/null +++ b/ryu/tests/unit/packet/test_ipv6.py @@ -0,0 +1,65 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import logging +import inspect + +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib import ip +from ryu.lib.packet import ipv6 + + +LOG = logging.getLogger(__name__) + + +class Test_ipv6(unittest.TestCase): + + version = 6 + traffic_class = 0 + flow_label = 0 + payload_length = 817 + nxt = 6 + hop_limit = 128 + src = ip.ipv6_to_bin('2002:4637:d5d3::4637:d5d3') + dst = ip.ipv6_to_bin('2001:4860:0:2001::68') + + ip = ipv6.ipv6(version, traffic_class, flow_label, payload_length, + nxt, hop_limit, src, dst) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + ipv6_values = {'version': self.version, + 'traffic_class': self.traffic_class, + 'flow_label': self.flow_label, + 'payload_length': self.payload_length, + 'nxt': self.nxt, + 'hop_limit': self.hop_limit, + 'src': self.src, + 'dst': self.dst} + _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k])) + for k, v in inspect.getmembers(self.ip) + if k in ipv6_values]) + ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str) + + eq_(str(self.ip), ipv6_str) + eq_(repr(self.ip), ipv6_str) diff --git a/ryu/tests/unit/packet/test_lldp.py b/ryu/tests/unit/packet/test_lldp.py index 7f95fb47..60e93b00 100644 --- a/ryu/tests/unit/packet/test_lldp.py +++ b/ryu/tests/unit/packet/test_lldp.py @@ -18,6 +18,7 @@ import unittest import logging import struct +import inspect from nose.tools import ok_, eq_, nottest from ryu.ofproto import ether @@ -120,6 +121,62 @@ class TestLLDPMandatoryTLV(unittest.TestCase): pkt.serialize() eq_(pkt.data, self.data) + def test_to_string(self): + chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS, + chassis_id='\x00\x04\x96\x1f\xa7\x26') + port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME, + port_id='1/3') + ttl = lldp.TTL(ttl=120) + end = lldp.End() + tlvs = (chassis_id, port_id, ttl, end) + lldp_pkt = lldp.lldp(tlvs) + + chassis_id_values = {'subtype': lldp.ChassisID.SUB_MAC_ADDRESS, + 'chassis_id': '\x00\x04\x96\x1f\xa7\x26', + 'len': chassis_id.len, + 'typelen': chassis_id.typelen} + _ch_id_str = ','.join(['%s=%s' % (k, repr(chassis_id_values[k])) + for k, v in inspect.getmembers(chassis_id) + if k in chassis_id_values]) + tlv_chassis_id_str = '%s(%s)' % (lldp.ChassisID.__name__, _ch_id_str) + + port_id_values = {'subtype': port_id.subtype, + 'port_id': port_id.port_id, + 'len': port_id.len, + 'typelen': port_id.typelen} + _port_id_str = ','.join(['%s=%s' % (k, repr(port_id_values[k])) + for k, v in inspect.getmembers(port_id) + if k in port_id_values]) + tlv_port_id_str = '%s(%s)' % (lldp.PortID.__name__, _port_id_str) + + ttl_values = {'ttl': ttl.ttl, + 'len': ttl.len, + 'typelen': ttl.typelen} + _ttl_str = ','.join(['%s=%s' % (k, repr(ttl_values[k])) + for k, v in inspect.getmembers(ttl) + if k in ttl_values]) + tlv_ttl_str = '%s(%s)' % (lldp.TTL.__name__, _ttl_str) + + end_values = {'len': end.len, + 'typelen': end.typelen} + _end_str = ','.join(['%s=%s' % (k, repr(end_values[k])) + for k, v in inspect.getmembers(end) + if k in end_values]) + tlv_end_str = '%s(%s)' % (lldp.End.__name__, _end_str) + + _tlvs_str = '(%s, %s, %s, %s)' + tlvs_str = _tlvs_str % (tlv_chassis_id_str, + tlv_port_id_str, + tlv_ttl_str, + tlv_end_str) + + _lldp_str = '%s(tlvs=%s)' + lldp_str = _lldp_str % (lldp.lldp.__name__, + tlvs_str) + + eq_(str(lldp_pkt), lldp_str) + eq_(repr(lldp_pkt), lldp_str) + class TestLLDPOptionalTLV(unittest.TestCase): def setUp(self): @@ -260,3 +317,158 @@ class TestLLDPOptionalTLV(unittest.TestCase): # self.data has many organizationally specific TLVs data = str(pkt.data[:-2]) eq_(data, self.data[:len(data)]) + + def test_to_string(self): + chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS, + chassis_id='\x00\x01\x30\xf9\xad\xa0') + port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME, + port_id='1/1') + ttl = lldp.TTL(ttl=120) + port_desc = lldp.PortDescription( + port_description='Summit300-48-Port 1001\x00') + sys_name = lldp.SystemName(system_name='Summit300-48\x00') + sys_desc = lldp.SystemDescription( + system_description='Summit300-48 - Version 7.4e.1 (Build 5) ' + + 'by Release_Master 05/27/05 04:53:11\x00') + sys_cap = lldp.SystemCapabilities( + subtype=lldp.ChassisID.SUB_CHASSIS_COMPONENT, + system_cap=0x14, + enabled_cap=0x14) + man_addr = lldp.ManagementAddress( + addr_subtype=0x06, addr='\x00\x01\x30\xf9\xad\xa0', + intf_subtype=0x02, intf_num=1001, + oid='') + org_spec = lldp.OrganizationallySpecific( + oui='\x00\x12\x0f', subtype=0x02, info='\x07\x01\x00') + end = lldp.End() + tlvs = (chassis_id, port_id, ttl, port_desc, sys_name, + sys_desc, sys_cap, man_addr, org_spec, end) + lldp_pkt = lldp.lldp(tlvs) + + # ChassisID string + chassis_id_values = {'subtype': lldp.ChassisID.SUB_MAC_ADDRESS, + 'chassis_id': '\x00\x01\x30\xf9\xad\xa0', + 'len': chassis_id.len, + 'typelen': chassis_id.typelen} + _ch_id_str = ','.join(['%s=%s' % (k, repr(chassis_id_values[k])) + for k, v in inspect.getmembers(chassis_id) + if k in chassis_id_values]) + tlv_chassis_id_str = '%s(%s)' % (lldp.ChassisID.__name__, _ch_id_str) + + # PortID string + port_id_values = {'subtype': port_id.subtype, + 'port_id': port_id.port_id, + 'len': port_id.len, + 'typelen': port_id.typelen} + _port_id_str = ','.join(['%s=%s' % (k, repr(port_id_values[k])) + for k, v in inspect.getmembers(port_id) + if k in port_id_values]) + tlv_port_id_str = '%s(%s)' % (lldp.PortID.__name__, _port_id_str) + + # TTL string + ttl_values = {'ttl': ttl.ttl, + 'len': ttl.len, + 'typelen': ttl.typelen} + _ttl_str = ','.join(['%s=%s' % (k, repr(ttl_values[k])) + for k, v in inspect.getmembers(ttl) + if k in ttl_values]) + tlv_ttl_str = '%s(%s)' % (lldp.TTL.__name__, _ttl_str) + + # PortDescription string + port_desc_values = {'tlv_info': port_desc.tlv_info, + 'len': port_desc.len, + 'typelen': port_desc.typelen} + _port_desc_str = ','.join(['%s=%s' % (k, repr(port_desc_values[k])) + for k, v in inspect.getmembers(port_desc) + if k in port_desc_values]) + tlv_port_desc_str = '%s(%s)' % (lldp.PortDescription.__name__, + _port_desc_str) + + # SystemName string + sys_name_values = {'tlv_info': sys_name.tlv_info, + 'len': sys_name.len, + 'typelen': sys_name.typelen} + _system_name_str = ','.join(['%s=%s' % (k, repr(sys_name_values[k])) + for k, v in inspect.getmembers(sys_name) + if k in sys_name_values]) + tlv_system_name_str = '%s(%s)' % (lldp.SystemName.__name__, + _system_name_str) + + # SystemDescription string + sys_desc_values = {'tlv_info': sys_desc.tlv_info, + 'len': sys_desc.len, + 'typelen': sys_desc.typelen} + _sys_desc_str = ','.join(['%s=%s' % (k, repr(sys_desc_values[k])) + for k, v in inspect.getmembers(sys_desc) + if k in sys_desc_values]) + tlv_sys_desc_str = '%s(%s)' % (lldp.SystemDescription.__name__, + _sys_desc_str) + + # SystemCapabilities string + sys_cap_values = {'subtype': lldp.ChassisID.SUB_CHASSIS_COMPONENT, + 'system_cap': 0x14, + 'enabled_cap': 0x14, + 'len': sys_cap.len, + 'typelen': sys_cap.typelen} + _sys_cap_str = ','.join(['%s=%s' % (k, repr(sys_cap_values[k])) + for k, v in inspect.getmembers(sys_cap) + if k in sys_cap_values]) + tlv_sys_cap_str = '%s(%s)' % (lldp.SystemCapabilities.__name__, + _sys_cap_str) + + # ManagementAddress string + man_addr_values = {'addr_subtype': 0x06, + 'addr': '\x00\x01\x30\xf9\xad\xa0', + 'addr_len': man_addr.addr_len, + 'intf_subtype': 0x02, + 'intf_num': 1001, + 'oid': '', + 'oid_len': man_addr.oid_len, + 'len': man_addr.len, + 'typelen': man_addr.typelen} + _man_addr_str = ','.join(['%s=%s' % (k, repr(man_addr_values[k])) + for k, v in inspect.getmembers(man_addr) + if k in man_addr_values]) + tlv_man_addr_str = '%s(%s)' % (lldp.ManagementAddress.__name__, + _man_addr_str) + + # OrganizationallySpecific string + org_spec_values = {'oui': '\x00\x12\x0f', + 'subtype': 0x02, + 'info': '\x07\x01\x00', + 'len': org_spec.len, + 'typelen': org_spec.typelen} + _org_spec_str = ','.join(['%s=%s' % (k, repr(org_spec_values[k])) + for k, v in inspect.getmembers(org_spec) + if k in org_spec_values]) + tlv_org_spec_str = '%s(%s)' % (lldp.OrganizationallySpecific.__name__, + _org_spec_str) + + # End string + end_values = {'len': end.len, + 'typelen': end.typelen} + _end_str = ','.join(['%s=%s' % (k, repr(end_values[k])) + for k, v in inspect.getmembers(end) + if k in end_values]) + tlv_end_str = '%s(%s)' % (lldp.End.__name__, _end_str) + + # tlvs string + _tlvs_str = '(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)' + tlvs_str = _tlvs_str % (tlv_chassis_id_str, + tlv_port_id_str, + tlv_ttl_str, + tlv_port_desc_str, + tlv_system_name_str, + tlv_sys_desc_str, + tlv_sys_cap_str, + tlv_man_addr_str, + tlv_org_spec_str, + tlv_end_str) + + # lldp string + _lldp_str = '%s(tlvs=%s)' + lldp_str = _lldp_str % (lldp.lldp.__name__, + tlvs_str) + + eq_(str(lldp_pkt), lldp_str) + eq_(repr(lldp_pkt), lldp_str) diff --git a/ryu/tests/unit/packet/test_mpls.py b/ryu/tests/unit/packet/test_mpls.py new file mode 100644 index 00000000..abde0307 --- /dev/null +++ b/ryu/tests/unit/packet/test_mpls.py @@ -0,0 +1,54 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import logging +import inspect + +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.lib.packet import mpls + + +LOG = logging.getLogger(__name__) + + +class Test_mpls(unittest.TestCase): + + label = 29 + exp = 6 + bsb = 1 + ttl = 64 + mp = mpls.mpls(label, exp, bsb, ttl) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_to_string(self): + mpls_values = {'label': self.label, + 'exp': self.exp, + 'bsb': self.bsb, + 'ttl': self.ttl} + _mpls_str = ','.join(['%s=%s' % (k, repr(mpls_values[k])) + for k, v in inspect.getmembers(self.mp) + if k in mpls_values]) + mpls_str = '%s(%s)' % (mpls.mpls.__name__, _mpls_str) + + eq_(str(self.mp), mpls_str) + eq_(repr(self.mp), mpls_str) diff --git a/ryu/tests/unit/packet/test_packet.py b/ryu/tests/unit/packet/test_packet.py index 45f27fd0..d8ea0d49 100644 --- a/ryu/tests/unit/packet/test_packet.py +++ b/ryu/tests/unit/packet/test_packet.py @@ -19,6 +19,7 @@ import unittest import logging import struct import array +import inspect from nose.tools import * from nose.plugins.skip import Skip, SkipTest from ryu.ofproto import ether, inet @@ -116,6 +117,40 @@ class TestPacket(unittest.TestCase): eq_(self.dst_mac, p_arp.dst_mac) eq_(self.dst_ip, p_arp.dst_ip) + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_ARP} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + arp_values = {'hwtype': 1, + 'proto': ether.ETH_TYPE_IP, + 'hlen': 6, + 'plen': 4, + 'opcode': 2, + 'src_mac': self.src_mac, + 'dst_mac': self.dst_mac, + 'src_ip': self.src_ip, + 'dst_ip': self.dst_ip} + _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k])) + for k, v in inspect.getmembers(p_arp) + if k in arp_values]) + arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str) + + pkt_str = '%s, %s' % (eth_str, arp_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(arp_str, str(p_arp)) + eq_(arp_str, repr(p_arp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_vlan_arp(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -185,6 +220,52 @@ class TestPacket(unittest.TestCase): eq_(self.dst_mac, p_arp.dst_mac) eq_(self.dst_ip, p_arp.dst_ip) + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_8021Q} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + vlan_values = {'pcp': 0b111, + 'cfi': 0b1, + 'vid': 3, + 'ethertype': ether.ETH_TYPE_ARP} + _vlan_str = ','.join(['%s=%s' % (k, repr(vlan_values[k])) + for k, v in inspect.getmembers(p_vlan) + if k in vlan_values]) + vlan_str = '%s(%s)' % (vlan.vlan.__name__, _vlan_str) + + arp_values = {'hwtype': 1, + 'proto': ether.ETH_TYPE_IP, + 'hlen': 6, + 'plen': 4, + 'opcode': 2, + 'src_mac': self.src_mac, + 'dst_mac': self.dst_mac, + 'src_ip': self.src_ip, + 'dst_ip': self.dst_ip} + _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k])) + for k, v in inspect.getmembers(p_arp) + if k in arp_values]) + arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str) + + pkt_str = '%s, %s, %s' % (eth_str, vlan_str, arp_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(vlan_str, str(p_vlan)) + eq_(vlan_str, repr(p_vlan)) + + eq_(arp_str, str(p_arp)) + eq_(arp_str, repr(p_arp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_ipv4_udp(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -272,6 +353,57 @@ class TestPacket(unittest.TestCase): ok_('payload' in protocols) eq_(self.payload, protocols['payload'].tostring()) + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_IP} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ipv4_values = {'version': 4, + 'header_length': 5, + 'tos': 1, + 'total_length': l, + 'identification': 3, + 'flags': 1, + 'offset': p_ipv4.offset, + 'ttl': 64, + 'proto': inet.IPPROTO_UDP, + 'csum': p_ipv4.csum, + 'src': self.src_ip, + 'dst': self.dst_ip, + 'option': None} + _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k])) + for k, v in inspect.getmembers(p_ipv4) + if k in ipv4_values]) + ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str) + + udp_values = {'src_port': 0x190f, + 'dst_port': 0x1F90, + 'total_length': len(u_buf) + len(self.payload), + 'csum': 0x77b2} + _udp_str = ','.join(['%s=%s' % (k, repr(udp_values[k])) + for k, v in inspect.getmembers(p_udp) + if k in udp_values]) + udp_str = '%s(%s)' % (udp.udp.__name__, _udp_str) + + pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, udp_str, + repr(protocols['payload'])) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv4_str, str(p_ipv4)) + eq_(ipv4_str, repr(p_ipv4)) + + eq_(udp_str, str(p_udp)) + eq_(udp_str, repr(p_udp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_ipv4_tcp(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -371,6 +503,63 @@ class TestPacket(unittest.TestCase): ok_('payload' in protocols) eq_(self.payload, protocols['payload'].tostring()) + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_IP} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ipv4_values = {'version': 4, + 'header_length': 5, + 'tos': 0, + 'total_length': l, + 'identification': 0, + 'flags': 0, + 'offset': p_ipv4.offset, + 'ttl': 64, + 'proto': inet.IPPROTO_TCP, + 'csum': p_ipv4.csum, + 'src': self.src_ip, + 'dst': self.dst_ip, + 'option': None} + _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k])) + for k, v in inspect.getmembers(p_ipv4) + if k in ipv4_values]) + ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str) + + tcp_values = {'src_port': 0x190f, + 'dst_port': 0x1F90, + 'seq': 0x123, + 'ack': 1, + 'offset': 6, + 'bits': 0b101010, + 'window_size': 2048, + 'csum': p_tcp.csum, + 'urgent': 0x6f, + 'option': p_tcp.option} + _tcp_str = ','.join(['%s=%s' % (k, repr(tcp_values[k])) + for k, v in inspect.getmembers(p_tcp) + if k in tcp_values]) + tcp_str = '%s(%s)' % (tcp.tcp.__name__, _tcp_str) + + pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, tcp_str, + repr(protocols['payload'])) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv4_str, str(p_ipv4)) + eq_(ipv4_str, repr(p_ipv4)) + + eq_(tcp_str, str(p_tcp)) + eq_(tcp_str, repr(p_tcp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_llc_bpdu(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, @@ -462,3 +651,64 @@ class TestPacket(unittest.TestCase): eq_(20, p_bpdu.max_age) eq_(2, p_bpdu.hello_time) eq_(15, p_bpdu.forward_delay) + + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_IEEE802_3} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ctrl_values = {'modifier_function1': 0, + 'pf_bit': 0, + 'modifier_function2': 0} + _ctrl_str = ','.join(['%s=%s' % (k, repr(ctrl_values[k])) + for k, v in inspect.getmembers(p_llc.control) + if k in ctrl_values]) + ctrl_str = '%s(%s)' % (llc.ControlFormatU.__name__, _ctrl_str) + + llc_values = {'dsap_addr': repr(llc.SAP_BDPU), + 'ssap_addr': repr(llc.SAP_BDPU), + 'control': ctrl_str} + _llc_str = ','.join(['%s=%s' % (k, llc_values[k]) + for k, v in inspect.getmembers(p_llc) + if k in llc_values]) + llc_str = '%s(%s)' % (llc.llc.__name__, _llc_str) + + bpdu_values = {'protocol_id': bpdu.PROTOCOL_IDENTIFIER, + 'version_id': bpdu.PROTOCOLVERSION_ID_BPDU, + 'bpdu_type': bpdu.TYPE_CONFIG_BPDU, + 'flags': 0, + 'root_priority': long(32768), + 'root_system_id_extension': long(0), + 'root_mac_address': self.src_mac, + 'root_path_cost': 0, + 'bridge_priority': long(32768), + 'bridge_system_id_extension': long(0), + 'bridge_mac_address': self.dst_mac, + 'port_priority': 128, + 'port_number': 4, + 'message_age': float(1), + 'max_age': float(20), + 'hello_time': float(2), + 'forward_delay': float(15)} + _bpdu_str = ','.join(['%s=%s' % (k, repr(bpdu_values[k])) + for k, v in inspect.getmembers(p_bpdu) + if k in bpdu_values]) + bpdu_str = '%s(%s)' % (bpdu.ConfigurationBPDUs.__name__, _bpdu_str) + + pkt_str = '%s, %s, %s' % (eth_str, llc_str, bpdu_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(llc_str, str(p_llc)) + eq_(llc_str, repr(p_llc)) + + eq_(bpdu_str, str(p_bpdu)) + eq_(bpdu_str, repr(p_bpdu)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) diff --git a/ryu/tests/unit/packet/test_vrrp.py b/ryu/tests/unit/packet/test_vrrp.py index 13b19ca1..a26bc90f 100644 --- a/ryu/tests/unit/packet/test_vrrp.py +++ b/ryu/tests/unit/packet/test_vrrp.py @@ -19,6 +19,7 @@ import unittest import logging import struct +import inspect from nose.tools import eq_, ok_ from nose.tools import raises @@ -183,6 +184,26 @@ class Test_vrrpv2(unittest.TestCase): max_adver_int = vrrp.VRRP_V2_MAX_ADVER_INT_MAX + 1 ok_(not self._test_is_valid(max_adver_int=max_adver_int)) + def test_to_string(self): + vrrpv2_values = {'version': self.version, + 'type': self.type_, + 'vrid': self.vrid, + 'priority': self.priority, + 'count_ip': self.count_ip, + 'max_adver_int': self.max_adver_int, + 'checksum': self.vrrpv2.checksum, + 'ip_addresses': [self.ip_address], + 'auth_type': self.auth_type, + 'auth_data': self.auth_data, + 'identification': self.vrrpv2.identification} + _vrrpv2_str = ','.join(['%s=%s' % (k, repr(vrrpv2_values[k])) + for k, v in inspect.getmembers(self.vrrpv2) + if k in vrrpv2_values]) + vrrpv2_str = '%s(%s)' % (vrrp.vrrpv2.__name__, _vrrpv2_str) + + eq_(str(self.vrrpv2), vrrpv2_str) + eq_(repr(self.vrrpv2), vrrpv2_str) + class Test_vrrpv3_ipv4(unittest.TestCase): """ Test case for vrrp v3 IPv4 @@ -328,6 +349,26 @@ class Test_vrrpv3_ipv4(unittest.TestCase): max_adver_int = vrrp.VRRP_V3_MAX_ADVER_INT_MAX + 1 ok_(not self._test_is_valid(max_adver_int=max_adver_int)) + def test_to_string(self): + vrrpv3_values = {'version': self.version, + 'type': self.type_, + 'vrid': self.vrid, + 'priority': self.priority, + 'count_ip': self.count_ip, + 'max_adver_int': self.max_adver_int, + 'checksum': self.vrrpv3.checksum, + 'ip_addresses': [self.ip_address], + 'auth_type': None, + 'auth_data': None, + 'identification': self.vrrpv3.identification} + _vrrpv3_str = ','.join(['%s=%s' % (k, repr(vrrpv3_values[k])) + for k, v in inspect.getmembers(self.vrrpv3) + if k in vrrpv3_values]) + vrrpv3_str = '%s(%s)' % (vrrp.vrrpv3.__name__, _vrrpv3_str) + + eq_(str(self.vrrpv3), vrrpv3_str) + eq_(repr(self.vrrpv3), vrrpv3_str) + class Test_vrrpv3_ipv6(unittest.TestCase): """ Test case for vrrp v3 IPv6 @@ -430,3 +471,23 @@ class Test_vrrpv3_ipv6(unittest.TestCase): print(len(p0.data), p0.data) print(len(p1.data), p1.data) eq_(p0.data, p1.data) + + def test_to_string(self): + vrrpv3_values = {'version': self.version, + 'type': self.type_, + 'vrid': self.vrid, + 'priority': self.priority, + 'count_ip': self.count_ip, + 'max_adver_int': self.max_adver_int, + 'checksum': self.vrrpv3.checksum, + 'ip_addresses': [self.ip_address], + 'auth_type': None, + 'auth_data': None, + 'identification': self.vrrpv3.identification} + _vrrpv3_str = ','.join(['%s=%s' % (k, repr(vrrpv3_values[k])) + for k, v in inspect.getmembers(self.vrrpv3) + if k in vrrpv3_values]) + vrrpv3_str = '%s(%s)' % (vrrp.vrrpv3.__name__, _vrrpv3_str) + + eq_(str(self.vrrpv3), vrrpv3_str) + eq_(repr(self.vrrpv3), vrrpv3_str) |