summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorwatanabe.fumitaka <watanabe.fumitaka@nttcom.co.jp>2013-08-05 13:07:47 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2013-08-06 09:25:50 +0900
commit0b5291856bbdada41b7f64c399e71bdf22567a6c (patch)
treea86ff8061385fffac18697883c64bd60e8f017e8
parente0d82b9d375a1ce697b9d32ffb28271a0d1c4304 (diff)
packet lib to string: unit tests
Signed-off-by: WATANABE Fumitaka <watanabe.fumitaka@nttcom.co.jp> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/tests/unit/packet/test_dhcp.py111
-rw-r--r--ryu/tests/unit/packet/test_icmp.py67
-rw-r--r--ryu/tests/unit/packet/test_icmpv6.py75
-rw-r--r--ryu/tests/unit/packet/test_ipv6.py65
-rw-r--r--ryu/tests/unit/packet/test_lldp.py212
-rw-r--r--ryu/tests/unit/packet/test_mpls.py54
-rw-r--r--ryu/tests/unit/packet/test_packet.py250
-rw-r--r--ryu/tests/unit/packet/test_vrrp.py61
8 files changed, 895 insertions, 0 deletions
diff --git a/ryu/tests/unit/packet/test_dhcp.py b/ryu/tests/unit/packet/test_dhcp.py
new file mode 100644
index 00000000..768b2338
--- /dev/null
+++ b/ryu/tests/unit/packet/test_dhcp.py
@@ -0,0 +1,111 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import inspect
+import logging
+import socket
+import unittest
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.lib import ip
+from ryu.lib import mac
+from ryu.lib.packet import dhcp
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Test_dhcp_offer(unittest.TestCase):
+
+ op = dhcp.DHCP_BOOT_REPLY
+ chaddr = 'AA:AA:AA:AA:AA:AA'
+ htype = 1
+ hlen = 6
+ hops = 0
+ xid = 1
+ secs = 0
+ flags = 1
+ ciaddr = '192.168.10.10'
+ yiaddr = '192.168.20.20'
+ siaddr = '192.168.30.30'
+ giaddr = '192.168.40.40'
+ sname = 'abc'
+ boot_file = ''
+
+ option_list = [dhcp.option('35', '02'),
+ dhcp.option('01', 'ffffff00'),
+ dhcp.option('03', 'c0a80a09'),
+ dhcp.option('06', 'c0a80a09'),
+ dhcp.option('33', '0003f480'),
+ dhcp.option('3a', '0001fa40'),
+ dhcp.option('3b', '000375f0'),
+ dhcp.option('36', 'c0a80a09')]
+ magic_cookie = socket.inet_aton('99.130.83.99')
+ options = dhcp.options(option_list=option_list,
+ magic_cookie=magic_cookie)
+
+ dh = dhcp.dhcp(op, chaddr, options, htype=htype, hlen=hlen,
+ hops=hops, xid=xid, secs=secs, flags=flags,
+ ciaddr=ciaddr, yiaddr=yiaddr, siaddr=siaddr,
+ giaddr=giaddr, sname=sname, boot_file=boot_file)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_to_string(self):
+ option_values = ['tag', 'length', 'value']
+ opt_str_list = []
+ for option in self.option_list:
+ _opt_str = ','.join(['%s=%s' % (k, repr(getattr(option, k)))
+ for k, v in inspect.getmembers(option)
+ if k in option_values])
+ opt_str = '%s(%s)' % (dhcp.option.__name__, _opt_str)
+ opt_str_list.append(opt_str)
+ option_str = '[%s]' % ', '.join(opt_str_list)
+
+ opts_vals = {'magic_cookie': repr(self.magic_cookie),
+ 'option_list': option_str,
+ 'options_len': repr(self.options.options_len)}
+ _options_str = ','.join(['%s=%s' % (k, opts_vals[k])
+ for k, v in inspect.getmembers(self.options)
+ if k in opts_vals])
+ options_str = '%s(%s)' % (dhcp.options.__name__, _options_str)
+
+ dhcp_values = {'op': repr(self.op),
+ 'htype': repr(self.htype),
+ 'hlen': repr(self.hlen),
+ 'hops': repr(self.hops),
+ 'xid': repr(self.xid),
+ 'secs': repr(self.secs),
+ 'flags': repr(self.flags),
+ 'ciaddr': repr(self.ciaddr),
+ 'yiaddr': repr(self.yiaddr),
+ 'siaddr': repr(self.siaddr),
+ 'giaddr': repr(self.giaddr),
+ 'chaddr': repr(self.chaddr),
+ 'sname': repr(self.sname),
+ 'boot_file': repr(self.boot_file),
+ 'options': options_str}
+ _dh_str = ','.join(['%s=%s' % (k, dhcp_values[k])
+ for k, v in inspect.getmembers(self.dh)
+ if k in dhcp_values])
+ dh_str = '%s(%s)' % (dhcp.dhcp.__name__, _dh_str)
+
+ eq_(str(self.dh), dh_str)
+ eq_(repr(self.dh), dh_str)
diff --git a/ryu/tests/unit/packet/test_icmp.py b/ryu/tests/unit/packet/test_icmp.py
new file mode 100644
index 00000000..75d9702b
--- /dev/null
+++ b/ryu/tests/unit/packet/test_icmp.py
@@ -0,0 +1,67 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+import inspect
+import logging
+
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.lib.packet import icmp
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Test_icmp_dest_unreach(unittest.TestCase):
+
+ type_ = icmp.ICMP_DEST_UNREACH
+ code = icmp.ICMP_HOST_UNREACH_CODE
+ csum = 0
+
+ mtu = 10
+ data = 'abc'
+ data_len = len(data)
+ dst_unreach = icmp.dest_unreach(data_len=data_len, mtu=mtu, data=data)
+
+ ic = icmp.icmp(type_, code, csum, data=dst_unreach)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_to_string(self):
+ data_values = {'data': self.data,
+ 'data_len': self.data_len,
+ 'mtu': self.mtu}
+ _data_str = ','.join(['%s=%s' % (k, repr(data_values[k]))
+ for k, v in inspect.getmembers(self.dst_unreach)
+ if k in data_values])
+ data_str = '%s(%s)' % (icmp.dest_unreach.__name__, _data_str)
+
+ icmp_values = {'type': repr(self.type_),
+ 'code': repr(self.code),
+ 'csum': repr(self.csum),
+ 'data': data_str}
+ _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
+ for k, v in inspect.getmembers(self.ic)
+ if k in icmp_values])
+ ic_str = '%s(%s)' % (icmp.icmp.__name__, _ic_str)
+
+ eq_(str(self.ic), ic_str)
+ eq_(repr(self.ic), ic_str)
diff --git a/ryu/tests/unit/packet/test_icmpv6.py b/ryu/tests/unit/packet/test_icmpv6.py
index 87e16960..848ecf9d 100644
--- a/ryu/tests/unit/packet/test_icmpv6.py
+++ b/ryu/tests/unit/packet/test_icmpv6.py
@@ -18,6 +18,7 @@
import unittest
import logging
import struct
+import inspect
from nose.tools import ok_, eq_, nottest, raises
from nose.plugins.skip import Skip, SkipTest
@@ -157,6 +158,30 @@ class Test_icmpv6_echo_request(unittest.TestCase):
def test_serialize_with_data(self):
self._test_serialize(self.data)
+ def test_to_string(self):
+ ec = icmpv6.echo(self.id_, self.seq, self.data)
+ ic = icmpv6.icmpv6(self.type_, self.code, self.csum, ec)
+
+ echo_values = {'id': self.id_,
+ 'seq': self.seq,
+ 'data': self.data}
+ _echo_str = ','.join(['%s=%s' % (k, repr(echo_values[k]))
+ for k, v in inspect.getmembers(ec)
+ if k in echo_values])
+ echo_str = '%s(%s)' % (icmpv6.echo.__name__, _echo_str)
+
+ icmp_values = {'type_': repr(self.type_),
+ 'code': repr(self.code),
+ 'csum': repr(self.csum),
+ 'data': echo_str}
+ _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
+ for k, v in inspect.getmembers(ic)
+ if k in icmp_values])
+ ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
+
+ eq_(str(ic), ic_str)
+ eq_(repr(ic), ic_str)
+
class Test_icmpv6_echo_reply(Test_icmpv6_echo_request):
def setUp(self):
@@ -263,6 +288,41 @@ class Test_icmpv6_neighbor_solict(unittest.TestCase):
eq_(nd_length, self.nd_length)
eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))
+ def test_to_string(self):
+ nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
+ nd = icmpv6.nd_neighbor(
+ self.res, self.dst, self.nd_type, self.nd_length, nd_opt)
+ ic = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)
+
+ nd_opt_values = {'hw_src': self.nd_hw_src,
+ 'data': None}
+ _nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k]))
+ for k, v in inspect.getmembers(nd_opt)
+ if k in nd_opt_values])
+ nd_opt_str = '%s(%s)' % (icmpv6.nd_option_la.__name__, _nd_opt_str)
+
+ nd_values = {'res': repr(nd.res),
+ 'dst': repr(self.dst),
+ 'type_': repr(self.nd_type),
+ 'length': repr(self.nd_length),
+ 'data': nd_opt_str}
+ _nd_str = ','.join(['%s=%s' % (k, nd_values[k])
+ for k, v in inspect.getmembers(nd)
+ if k in nd_values])
+ nd_str = '%s(%s)' % (icmpv6.nd_neighbor.__name__, _nd_str)
+
+ icmp_values = {'type_': repr(self.type_),
+ 'code': repr(self.code),
+ 'csum': repr(self.csum),
+ 'data': nd_str}
+ _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
+ for k, v in inspect.getmembers(ic)
+ if k in icmp_values])
+ ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
+
+ eq_(str(ic), ic_str)
+ eq_(repr(ic), ic_str)
+
class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solict):
def setUp(self):
@@ -339,3 +399,18 @@ class Test_icmpv6_router_solict(unittest.TestCase):
def test_serialize_with_data(self):
self._test_serialize(self.data)
+
+ def test_to_string(self):
+ ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.data)
+
+ icmp_values = {'type_': self.type_,
+ 'code': self.code,
+ 'csum': self.csum,
+ 'data': self.data}
+ _ic_str = ','.join(['%s=%s' % (k, repr(icmp_values[k]))
+ for k, v in inspect.getmembers(ic)
+ if k in icmp_values])
+ ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
+
+ eq_(str(ic), ic_str)
+ eq_(repr(ic), ic_str)
diff --git a/ryu/tests/unit/packet/test_ipv6.py b/ryu/tests/unit/packet/test_ipv6.py
new file mode 100644
index 00000000..5454fbfd
--- /dev/null
+++ b/ryu/tests/unit/packet/test_ipv6.py
@@ -0,0 +1,65 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+import logging
+import inspect
+
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.lib import ip
+from ryu.lib.packet import ipv6
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Test_ipv6(unittest.TestCase):
+
+ version = 6
+ traffic_class = 0
+ flow_label = 0
+ payload_length = 817
+ nxt = 6
+ hop_limit = 128
+ src = ip.ipv6_to_bin('2002:4637:d5d3::4637:d5d3')
+ dst = ip.ipv6_to_bin('2001:4860:0:2001::68')
+
+ ip = ipv6.ipv6(version, traffic_class, flow_label, payload_length,
+ nxt, hop_limit, src, dst)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_to_string(self):
+ ipv6_values = {'version': self.version,
+ 'traffic_class': self.traffic_class,
+ 'flow_label': self.flow_label,
+ 'payload_length': self.payload_length,
+ 'nxt': self.nxt,
+ 'hop_limit': self.hop_limit,
+ 'src': self.src,
+ 'dst': self.dst}
+ _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
+ for k, v in inspect.getmembers(self.ip)
+ if k in ipv6_values])
+ ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+ eq_(str(self.ip), ipv6_str)
+ eq_(repr(self.ip), ipv6_str)
diff --git a/ryu/tests/unit/packet/test_lldp.py b/ryu/tests/unit/packet/test_lldp.py
index 7f95fb47..60e93b00 100644
--- a/ryu/tests/unit/packet/test_lldp.py
+++ b/ryu/tests/unit/packet/test_lldp.py
@@ -18,6 +18,7 @@
import unittest
import logging
import struct
+import inspect
from nose.tools import ok_, eq_, nottest
from ryu.ofproto import ether
@@ -120,6 +121,62 @@ class TestLLDPMandatoryTLV(unittest.TestCase):
pkt.serialize()
eq_(pkt.data, self.data)
+ def test_to_string(self):
+ chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS,
+ chassis_id='\x00\x04\x96\x1f\xa7\x26')
+ port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME,
+ port_id='1/3')
+ ttl = lldp.TTL(ttl=120)
+ end = lldp.End()
+ tlvs = (chassis_id, port_id, ttl, end)
+ lldp_pkt = lldp.lldp(tlvs)
+
+ chassis_id_values = {'subtype': lldp.ChassisID.SUB_MAC_ADDRESS,
+ 'chassis_id': '\x00\x04\x96\x1f\xa7\x26',
+ 'len': chassis_id.len,
+ 'typelen': chassis_id.typelen}
+ _ch_id_str = ','.join(['%s=%s' % (k, repr(chassis_id_values[k]))
+ for k, v in inspect.getmembers(chassis_id)
+ if k in chassis_id_values])
+ tlv_chassis_id_str = '%s(%s)' % (lldp.ChassisID.__name__, _ch_id_str)
+
+ port_id_values = {'subtype': port_id.subtype,
+ 'port_id': port_id.port_id,
+ 'len': port_id.len,
+ 'typelen': port_id.typelen}
+ _port_id_str = ','.join(['%s=%s' % (k, repr(port_id_values[k]))
+ for k, v in inspect.getmembers(port_id)
+ if k in port_id_values])
+ tlv_port_id_str = '%s(%s)' % (lldp.PortID.__name__, _port_id_str)
+
+ ttl_values = {'ttl': ttl.ttl,
+ 'len': ttl.len,
+ 'typelen': ttl.typelen}
+ _ttl_str = ','.join(['%s=%s' % (k, repr(ttl_values[k]))
+ for k, v in inspect.getmembers(ttl)
+ if k in ttl_values])
+ tlv_ttl_str = '%s(%s)' % (lldp.TTL.__name__, _ttl_str)
+
+ end_values = {'len': end.len,
+ 'typelen': end.typelen}
+ _end_str = ','.join(['%s=%s' % (k, repr(end_values[k]))
+ for k, v in inspect.getmembers(end)
+ if k in end_values])
+ tlv_end_str = '%s(%s)' % (lldp.End.__name__, _end_str)
+
+ _tlvs_str = '(%s, %s, %s, %s)'
+ tlvs_str = _tlvs_str % (tlv_chassis_id_str,
+ tlv_port_id_str,
+ tlv_ttl_str,
+ tlv_end_str)
+
+ _lldp_str = '%s(tlvs=%s)'
+ lldp_str = _lldp_str % (lldp.lldp.__name__,
+ tlvs_str)
+
+ eq_(str(lldp_pkt), lldp_str)
+ eq_(repr(lldp_pkt), lldp_str)
+
class TestLLDPOptionalTLV(unittest.TestCase):
def setUp(self):
@@ -260,3 +317,158 @@ class TestLLDPOptionalTLV(unittest.TestCase):
# self.data has many organizationally specific TLVs
data = str(pkt.data[:-2])
eq_(data, self.data[:len(data)])
+
+ def test_to_string(self):
+ chassis_id = lldp.ChassisID(subtype=lldp.ChassisID.SUB_MAC_ADDRESS,
+ chassis_id='\x00\x01\x30\xf9\xad\xa0')
+ port_id = lldp.PortID(subtype=lldp.PortID.SUB_INTERFACE_NAME,
+ port_id='1/1')
+ ttl = lldp.TTL(ttl=120)
+ port_desc = lldp.PortDescription(
+ port_description='Summit300-48-Port 1001\x00')
+ sys_name = lldp.SystemName(system_name='Summit300-48\x00')
+ sys_desc = lldp.SystemDescription(
+ system_description='Summit300-48 - Version 7.4e.1 (Build 5) '
+ + 'by Release_Master 05/27/05 04:53:11\x00')
+ sys_cap = lldp.SystemCapabilities(
+ subtype=lldp.ChassisID.SUB_CHASSIS_COMPONENT,
+ system_cap=0x14,
+ enabled_cap=0x14)
+ man_addr = lldp.ManagementAddress(
+ addr_subtype=0x06, addr='\x00\x01\x30\xf9\xad\xa0',
+ intf_subtype=0x02, intf_num=1001,
+ oid='')
+ org_spec = lldp.OrganizationallySpecific(
+ oui='\x00\x12\x0f', subtype=0x02, info='\x07\x01\x00')
+ end = lldp.End()
+ tlvs = (chassis_id, port_id, ttl, port_desc, sys_name,
+ sys_desc, sys_cap, man_addr, org_spec, end)
+ lldp_pkt = lldp.lldp(tlvs)
+
+ # ChassisID string
+ chassis_id_values = {'subtype': lldp.ChassisID.SUB_MAC_ADDRESS,
+ 'chassis_id': '\x00\x01\x30\xf9\xad\xa0',
+ 'len': chassis_id.len,
+ 'typelen': chassis_id.typelen}
+ _ch_id_str = ','.join(['%s=%s' % (k, repr(chassis_id_values[k]))
+ for k, v in inspect.getmembers(chassis_id)
+ if k in chassis_id_values])
+ tlv_chassis_id_str = '%s(%s)' % (lldp.ChassisID.__name__, _ch_id_str)
+
+ # PortID string
+ port_id_values = {'subtype': port_id.subtype,
+ 'port_id': port_id.port_id,
+ 'len': port_id.len,
+ 'typelen': port_id.typelen}
+ _port_id_str = ','.join(['%s=%s' % (k, repr(port_id_values[k]))
+ for k, v in inspect.getmembers(port_id)
+ if k in port_id_values])
+ tlv_port_id_str = '%s(%s)' % (lldp.PortID.__name__, _port_id_str)
+
+ # TTL string
+ ttl_values = {'ttl': ttl.ttl,
+ 'len': ttl.len,
+ 'typelen': ttl.typelen}
+ _ttl_str = ','.join(['%s=%s' % (k, repr(ttl_values[k]))
+ for k, v in inspect.getmembers(ttl)
+ if k in ttl_values])
+ tlv_ttl_str = '%s(%s)' % (lldp.TTL.__name__, _ttl_str)
+
+ # PortDescription string
+ port_desc_values = {'tlv_info': port_desc.tlv_info,
+ 'len': port_desc.len,
+ 'typelen': port_desc.typelen}
+ _port_desc_str = ','.join(['%s=%s' % (k, repr(port_desc_values[k]))
+ for k, v in inspect.getmembers(port_desc)
+ if k in port_desc_values])
+ tlv_port_desc_str = '%s(%s)' % (lldp.PortDescription.__name__,
+ _port_desc_str)
+
+ # SystemName string
+ sys_name_values = {'tlv_info': sys_name.tlv_info,
+ 'len': sys_name.len,
+ 'typelen': sys_name.typelen}
+ _system_name_str = ','.join(['%s=%s' % (k, repr(sys_name_values[k]))
+ for k, v in inspect.getmembers(sys_name)
+ if k in sys_name_values])
+ tlv_system_name_str = '%s(%s)' % (lldp.SystemName.__name__,
+ _system_name_str)
+
+ # SystemDescription string
+ sys_desc_values = {'tlv_info': sys_desc.tlv_info,
+ 'len': sys_desc.len,
+ 'typelen': sys_desc.typelen}
+ _sys_desc_str = ','.join(['%s=%s' % (k, repr(sys_desc_values[k]))
+ for k, v in inspect.getmembers(sys_desc)
+ if k in sys_desc_values])
+ tlv_sys_desc_str = '%s(%s)' % (lldp.SystemDescription.__name__,
+ _sys_desc_str)
+
+ # SystemCapabilities string
+ sys_cap_values = {'subtype': lldp.ChassisID.SUB_CHASSIS_COMPONENT,
+ 'system_cap': 0x14,
+ 'enabled_cap': 0x14,
+ 'len': sys_cap.len,
+ 'typelen': sys_cap.typelen}
+ _sys_cap_str = ','.join(['%s=%s' % (k, repr(sys_cap_values[k]))
+ for k, v in inspect.getmembers(sys_cap)
+ if k in sys_cap_values])
+ tlv_sys_cap_str = '%s(%s)' % (lldp.SystemCapabilities.__name__,
+ _sys_cap_str)
+
+ # ManagementAddress string
+ man_addr_values = {'addr_subtype': 0x06,
+ 'addr': '\x00\x01\x30\xf9\xad\xa0',
+ 'addr_len': man_addr.addr_len,
+ 'intf_subtype': 0x02,
+ 'intf_num': 1001,
+ 'oid': '',
+ 'oid_len': man_addr.oid_len,
+ 'len': man_addr.len,
+ 'typelen': man_addr.typelen}
+ _man_addr_str = ','.join(['%s=%s' % (k, repr(man_addr_values[k]))
+ for k, v in inspect.getmembers(man_addr)
+ if k in man_addr_values])
+ tlv_man_addr_str = '%s(%s)' % (lldp.ManagementAddress.__name__,
+ _man_addr_str)
+
+ # OrganizationallySpecific string
+ org_spec_values = {'oui': '\x00\x12\x0f',
+ 'subtype': 0x02,
+ 'info': '\x07\x01\x00',
+ 'len': org_spec.len,
+ 'typelen': org_spec.typelen}
+ _org_spec_str = ','.join(['%s=%s' % (k, repr(org_spec_values[k]))
+ for k, v in inspect.getmembers(org_spec)
+ if k in org_spec_values])
+ tlv_org_spec_str = '%s(%s)' % (lldp.OrganizationallySpecific.__name__,
+ _org_spec_str)
+
+ # End string
+ end_values = {'len': end.len,
+ 'typelen': end.typelen}
+ _end_str = ','.join(['%s=%s' % (k, repr(end_values[k]))
+ for k, v in inspect.getmembers(end)
+ if k in end_values])
+ tlv_end_str = '%s(%s)' % (lldp.End.__name__, _end_str)
+
+ # tlvs string
+ _tlvs_str = '(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
+ tlvs_str = _tlvs_str % (tlv_chassis_id_str,
+ tlv_port_id_str,
+ tlv_ttl_str,
+ tlv_port_desc_str,
+ tlv_system_name_str,
+ tlv_sys_desc_str,
+ tlv_sys_cap_str,
+ tlv_man_addr_str,
+ tlv_org_spec_str,
+ tlv_end_str)
+
+ # lldp string
+ _lldp_str = '%s(tlvs=%s)'
+ lldp_str = _lldp_str % (lldp.lldp.__name__,
+ tlvs_str)
+
+ eq_(str(lldp_pkt), lldp_str)
+ eq_(repr(lldp_pkt), lldp_str)
diff --git a/ryu/tests/unit/packet/test_mpls.py b/ryu/tests/unit/packet/test_mpls.py
new file mode 100644
index 00000000..abde0307
--- /dev/null
+++ b/ryu/tests/unit/packet/test_mpls.py
@@ -0,0 +1,54 @@
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+import logging
+import inspect
+
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.lib.packet import mpls
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Test_mpls(unittest.TestCase):
+
+ label = 29
+ exp = 6
+ bsb = 1
+ ttl = 64
+ mp = mpls.mpls(label, exp, bsb, ttl)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_to_string(self):
+ mpls_values = {'label': self.label,
+ 'exp': self.exp,
+ 'bsb': self.bsb,
+ 'ttl': self.ttl}
+ _mpls_str = ','.join(['%s=%s' % (k, repr(mpls_values[k]))
+ for k, v in inspect.getmembers(self.mp)
+ if k in mpls_values])
+ mpls_str = '%s(%s)' % (mpls.mpls.__name__, _mpls_str)
+
+ eq_(str(self.mp), mpls_str)
+ eq_(repr(self.mp), mpls_str)
diff --git a/ryu/tests/unit/packet/test_packet.py b/ryu/tests/unit/packet/test_packet.py
index 45f27fd0..d8ea0d49 100644
--- a/ryu/tests/unit/packet/test_packet.py
+++ b/ryu/tests/unit/packet/test_packet.py
@@ -19,6 +19,7 @@ import unittest
import logging
import struct
import array
+import inspect
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
@@ -116,6 +117,40 @@ class TestPacket(unittest.TestCase):
eq_(self.dst_mac, p_arp.dst_mac)
eq_(self.dst_ip, p_arp.dst_ip)
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_ARP}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ arp_values = {'hwtype': 1,
+ 'proto': ether.ETH_TYPE_IP,
+ 'hlen': 6,
+ 'plen': 4,
+ 'opcode': 2,
+ 'src_mac': self.src_mac,
+ 'dst_mac': self.dst_mac,
+ 'src_ip': self.src_ip,
+ 'dst_ip': self.dst_ip}
+ _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k]))
+ for k, v in inspect.getmembers(p_arp)
+ if k in arp_values])
+ arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str)
+
+ pkt_str = '%s, %s' % (eth_str, arp_str)
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(arp_str, str(p_arp))
+ eq_(arp_str, repr(p_arp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
def test_vlan_arp(self):
# buid packet
e = ethernet.ethernet(self.dst_mac, self.src_mac,
@@ -185,6 +220,52 @@ class TestPacket(unittest.TestCase):
eq_(self.dst_mac, p_arp.dst_mac)
eq_(self.dst_ip, p_arp.dst_ip)
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_8021Q}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ vlan_values = {'pcp': 0b111,
+ 'cfi': 0b1,
+ 'vid': 3,
+ 'ethertype': ether.ETH_TYPE_ARP}
+ _vlan_str = ','.join(['%s=%s' % (k, repr(vlan_values[k]))
+ for k, v in inspect.getmembers(p_vlan)
+ if k in vlan_values])
+ vlan_str = '%s(%s)' % (vlan.vlan.__name__, _vlan_str)
+
+ arp_values = {'hwtype': 1,
+ 'proto': ether.ETH_TYPE_IP,
+ 'hlen': 6,
+ 'plen': 4,
+ 'opcode': 2,
+ 'src_mac': self.src_mac,
+ 'dst_mac': self.dst_mac,
+ 'src_ip': self.src_ip,
+ 'dst_ip': self.dst_ip}
+ _arp_str = ','.join(['%s=%s' % (k, repr(arp_values[k]))
+ for k, v in inspect.getmembers(p_arp)
+ if k in arp_values])
+ arp_str = '%s(%s)' % (arp.arp.__name__, _arp_str)
+
+ pkt_str = '%s, %s, %s' % (eth_str, vlan_str, arp_str)
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(vlan_str, str(p_vlan))
+ eq_(vlan_str, repr(p_vlan))
+
+ eq_(arp_str, str(p_arp))
+ eq_(arp_str, repr(p_arp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
def test_ipv4_udp(self):
# buid packet
e = ethernet.ethernet(self.dst_mac, self.src_mac,
@@ -272,6 +353,57 @@ class TestPacket(unittest.TestCase):
ok_('payload' in protocols)
eq_(self.payload, protocols['payload'].tostring())
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_IP}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv4_values = {'version': 4,
+ 'header_length': 5,
+ 'tos': 1,
+ 'total_length': l,
+ 'identification': 3,
+ 'flags': 1,
+ 'offset': p_ipv4.offset,
+ 'ttl': 64,
+ 'proto': inet.IPPROTO_UDP,
+ 'csum': p_ipv4.csum,
+ 'src': self.src_ip,
+ 'dst': self.dst_ip,
+ 'option': None}
+ _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
+ for k, v in inspect.getmembers(p_ipv4)
+ if k in ipv4_values])
+ ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
+
+ udp_values = {'src_port': 0x190f,
+ 'dst_port': 0x1F90,
+ 'total_length': len(u_buf) + len(self.payload),
+ 'csum': 0x77b2}
+ _udp_str = ','.join(['%s=%s' % (k, repr(udp_values[k]))
+ for k, v in inspect.getmembers(p_udp)
+ if k in udp_values])
+ udp_str = '%s(%s)' % (udp.udp.__name__, _udp_str)
+
+ pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, udp_str,
+ repr(protocols['payload']))
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv4_str, str(p_ipv4))
+ eq_(ipv4_str, repr(p_ipv4))
+
+ eq_(udp_str, str(p_udp))
+ eq_(udp_str, repr(p_udp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
def test_ipv4_tcp(self):
# buid packet
e = ethernet.ethernet(self.dst_mac, self.src_mac,
@@ -371,6 +503,63 @@ class TestPacket(unittest.TestCase):
ok_('payload' in protocols)
eq_(self.payload, protocols['payload'].tostring())
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_IP}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv4_values = {'version': 4,
+ 'header_length': 5,
+ 'tos': 0,
+ 'total_length': l,
+ 'identification': 0,
+ 'flags': 0,
+ 'offset': p_ipv4.offset,
+ 'ttl': 64,
+ 'proto': inet.IPPROTO_TCP,
+ 'csum': p_ipv4.csum,
+ 'src': self.src_ip,
+ 'dst': self.dst_ip,
+ 'option': None}
+ _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
+ for k, v in inspect.getmembers(p_ipv4)
+ if k in ipv4_values])
+ ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
+
+ tcp_values = {'src_port': 0x190f,
+ 'dst_port': 0x1F90,
+ 'seq': 0x123,
+ 'ack': 1,
+ 'offset': 6,
+ 'bits': 0b101010,
+ 'window_size': 2048,
+ 'csum': p_tcp.csum,
+ 'urgent': 0x6f,
+ 'option': p_tcp.option}
+ _tcp_str = ','.join(['%s=%s' % (k, repr(tcp_values[k]))
+ for k, v in inspect.getmembers(p_tcp)
+ if k in tcp_values])
+ tcp_str = '%s(%s)' % (tcp.tcp.__name__, _tcp_str)
+
+ pkt_str = '%s, %s, %s, %s' % (eth_str, ipv4_str, tcp_str,
+ repr(protocols['payload']))
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv4_str, str(p_ipv4))
+ eq_(ipv4_str, repr(p_ipv4))
+
+ eq_(tcp_str, str(p_tcp))
+ eq_(tcp_str, repr(p_tcp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
def test_llc_bpdu(self):
# buid packet
e = ethernet.ethernet(self.dst_mac, self.src_mac,
@@ -462,3 +651,64 @@ class TestPacket(unittest.TestCase):
eq_(20, p_bpdu.max_age)
eq_(2, p_bpdu.hello_time)
eq_(15, p_bpdu.forward_delay)
+
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_IEEE802_3}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ctrl_values = {'modifier_function1': 0,
+ 'pf_bit': 0,
+ 'modifier_function2': 0}
+ _ctrl_str = ','.join(['%s=%s' % (k, repr(ctrl_values[k]))
+ for k, v in inspect.getmembers(p_llc.control)
+ if k in ctrl_values])
+ ctrl_str = '%s(%s)' % (llc.ControlFormatU.__name__, _ctrl_str)
+
+ llc_values = {'dsap_addr': repr(llc.SAP_BDPU),
+ 'ssap_addr': repr(llc.SAP_BDPU),
+ 'control': ctrl_str}
+ _llc_str = ','.join(['%s=%s' % (k, llc_values[k])
+ for k, v in inspect.getmembers(p_llc)
+ if k in llc_values])
+ llc_str = '%s(%s)' % (llc.llc.__name__, _llc_str)
+
+ bpdu_values = {'protocol_id': bpdu.PROTOCOL_IDENTIFIER,
+ 'version_id': bpdu.PROTOCOLVERSION_ID_BPDU,
+ 'bpdu_type': bpdu.TYPE_CONFIG_BPDU,
+ 'flags': 0,
+ 'root_priority': long(32768),
+ 'root_system_id_extension': long(0),
+ 'root_mac_address': self.src_mac,
+ 'root_path_cost': 0,
+ 'bridge_priority': long(32768),
+ 'bridge_system_id_extension': long(0),
+ 'bridge_mac_address': self.dst_mac,
+ 'port_priority': 128,
+ 'port_number': 4,
+ 'message_age': float(1),
+ 'max_age': float(20),
+ 'hello_time': float(2),
+ 'forward_delay': float(15)}
+ _bpdu_str = ','.join(['%s=%s' % (k, repr(bpdu_values[k]))
+ for k, v in inspect.getmembers(p_bpdu)
+ if k in bpdu_values])
+ bpdu_str = '%s(%s)' % (bpdu.ConfigurationBPDUs.__name__, _bpdu_str)
+
+ pkt_str = '%s, %s, %s' % (eth_str, llc_str, bpdu_str)
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(llc_str, str(p_llc))
+ eq_(llc_str, repr(p_llc))
+
+ eq_(bpdu_str, str(p_bpdu))
+ eq_(bpdu_str, repr(p_bpdu))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
diff --git a/ryu/tests/unit/packet/test_vrrp.py b/ryu/tests/unit/packet/test_vrrp.py
index 13b19ca1..a26bc90f 100644
--- a/ryu/tests/unit/packet/test_vrrp.py
+++ b/ryu/tests/unit/packet/test_vrrp.py
@@ -19,6 +19,7 @@
import unittest
import logging
import struct
+import inspect
from nose.tools import eq_, ok_
from nose.tools import raises
@@ -183,6 +184,26 @@ class Test_vrrpv2(unittest.TestCase):
max_adver_int = vrrp.VRRP_V2_MAX_ADVER_INT_MAX + 1
ok_(not self._test_is_valid(max_adver_int=max_adver_int))
+ def test_to_string(self):
+ vrrpv2_values = {'version': self.version,
+ 'type': self.type_,
+ 'vrid': self.vrid,
+ 'priority': self.priority,
+ 'count_ip': self.count_ip,
+ 'max_adver_int': self.max_adver_int,
+ 'checksum': self.vrrpv2.checksum,
+ 'ip_addresses': [self.ip_address],
+ 'auth_type': self.auth_type,
+ 'auth_data': self.auth_data,
+ 'identification': self.vrrpv2.identification}
+ _vrrpv2_str = ','.join(['%s=%s' % (k, repr(vrrpv2_values[k]))
+ for k, v in inspect.getmembers(self.vrrpv2)
+ if k in vrrpv2_values])
+ vrrpv2_str = '%s(%s)' % (vrrp.vrrpv2.__name__, _vrrpv2_str)
+
+ eq_(str(self.vrrpv2), vrrpv2_str)
+ eq_(repr(self.vrrpv2), vrrpv2_str)
+
class Test_vrrpv3_ipv4(unittest.TestCase):
""" Test case for vrrp v3 IPv4
@@ -328,6 +349,26 @@ class Test_vrrpv3_ipv4(unittest.TestCase):
max_adver_int = vrrp.VRRP_V3_MAX_ADVER_INT_MAX + 1
ok_(not self._test_is_valid(max_adver_int=max_adver_int))
+ def test_to_string(self):
+ vrrpv3_values = {'version': self.version,
+ 'type': self.type_,
+ 'vrid': self.vrid,
+ 'priority': self.priority,
+ 'count_ip': self.count_ip,
+ 'max_adver_int': self.max_adver_int,
+ 'checksum': self.vrrpv3.checksum,
+ 'ip_addresses': [self.ip_address],
+ 'auth_type': None,
+ 'auth_data': None,
+ 'identification': self.vrrpv3.identification}
+ _vrrpv3_str = ','.join(['%s=%s' % (k, repr(vrrpv3_values[k]))
+ for k, v in inspect.getmembers(self.vrrpv3)
+ if k in vrrpv3_values])
+ vrrpv3_str = '%s(%s)' % (vrrp.vrrpv3.__name__, _vrrpv3_str)
+
+ eq_(str(self.vrrpv3), vrrpv3_str)
+ eq_(repr(self.vrrpv3), vrrpv3_str)
+
class Test_vrrpv3_ipv6(unittest.TestCase):
""" Test case for vrrp v3 IPv6
@@ -430,3 +471,23 @@ class Test_vrrpv3_ipv6(unittest.TestCase):
print(len(p0.data), p0.data)
print(len(p1.data), p1.data)
eq_(p0.data, p1.data)
+
+ def test_to_string(self):
+ vrrpv3_values = {'version': self.version,
+ 'type': self.type_,
+ 'vrid': self.vrid,
+ 'priority': self.priority,
+ 'count_ip': self.count_ip,
+ 'max_adver_int': self.max_adver_int,
+ 'checksum': self.vrrpv3.checksum,
+ 'ip_addresses': [self.ip_address],
+ 'auth_type': None,
+ 'auth_data': None,
+ 'identification': self.vrrpv3.identification}
+ _vrrpv3_str = ','.join(['%s=%s' % (k, repr(vrrpv3_values[k]))
+ for k, v in inspect.getmembers(self.vrrpv3)
+ if k in vrrpv3_values])
+ vrrpv3_str = '%s(%s)' % (vrrp.vrrpv3.__name__, _vrrpv3_str)
+
+ eq_(str(self.vrrpv3), vrrpv3_str)
+ eq_(repr(self.vrrpv3), vrrpv3_str)