summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorHIYAMA Manabu <hiyama.manabu@po.ntts.co.jp>2012-10-19 15:59:38 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2012-10-23 13:33:43 +0900
commit51b3b9a2bd086d0bb8e21a0e0e3db015c7e55f7f (patch)
tree136dcd573e05eed1da560a3839980f69ac95b54c
parent3027becda19b44131d6d9b0223ef2b1ca33daaf2 (diff)
test: add unittests for packet library
Signed-off-by: HIYAMA Manabu <hiyama.manabu@po.ntts.co.jp> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/tests/unit/packet/test_arp.py5
-rw-r--r--ryu/tests/unit/packet/test_ethernet.py90
-rw-r--r--ryu/tests/unit/packet/test_ipv4.py131
-rw-r--r--ryu/tests/unit/packet/test_packet.py371
-rw-r--r--ryu/tests/unit/packet/test_tcp.py140
-rw-r--r--ryu/tests/unit/packet/test_udp.py96
-rw-r--r--ryu/tests/unit/packet/test_vlan.py5
7 files changed, 838 insertions, 0 deletions
diff --git a/ryu/tests/unit/packet/test_arp.py b/ryu/tests/unit/packet/test_arp.py
index e5be05b3..d53af418 100644
--- a/ryu/tests/unit/packet/test_arp.py
+++ b/ryu/tests/unit/packet/test_arp.py
@@ -174,3 +174,8 @@ class Test_arp(unittest.TestCase):
eq_(a.dst_mac, self.dst_mac)
eq_(a.dst_ip, self.dst_ip)
eq_(a.length, self.length)
+
+ @raises(Exception)
+ def test_malformed_arp(self):
+ m_short_buf = self.buf[1:arp._MIN_LEN]
+ arp.parser(m_short_buf)
diff --git a/ryu/tests/unit/packet/test_ethernet.py b/ryu/tests/unit/packet/test_ethernet.py
new file mode 100644
index 00000000..6378bb65
--- /dev/null
+++ b/ryu/tests/unit/packet/test_ethernet.py
@@ -0,0 +1,90 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import unittest
+import logging
+import struct
+import netaddr
+from struct import *
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.ofproto import ether, inet
+from ryu.lib import mac
+from ryu.lib.packet.ethernet import ethernet
+from ryu.lib.packet.packet import Packet
+from ryu.lib.packet.arp import arp
+
+
+LOG = logging.getLogger('test_ethernet')
+
+
+class Test_ethernet(unittest.TestCase):
+ """ Test case for ethernet
+ """
+
+ dst = mac.haddr_to_bin('AA:AA:AA:AA:AA:AA')
+ src = mac.haddr_to_bin('BB:BB:BB:BB:BB:BB')
+ ethertype = ether.ETH_TYPE_ARP
+ length = struct.calcsize(ethernet._PACK_STR)
+
+ buf = pack(ethernet._PACK_STR, dst, src, ethertype)
+
+ e = ethernet(dst, src, ethertype)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def find_protocol(self, pkt, name):
+ for p in pkt.protocols:
+ if p.protocol_name == name:
+ return p
+
+ def test_init(self):
+ eq_(self.dst, self.e.dst)
+ eq_(self.src, self.e.src)
+ eq_(self.ethertype, self.e.ethertype)
+ eq_(self.length, self.e.length)
+
+ def test_parser(self):
+ res, ptype = self.e.parser(self.buf)
+ LOG.debug((res, ptype))
+
+ eq_(res.dst, self.dst)
+ eq_(res.src, self.src)
+ eq_(res.ethertype, self.ethertype)
+ eq_(res.length, self.length)
+ eq_(ptype, arp)
+
+ def test_serialize(self):
+ data = bytearray()
+ prev = None
+ buf = self.e.serialize(data, prev)
+
+ fmt = ethernet._PACK_STR
+ res = struct.unpack(fmt, buf)
+
+ eq_(res[0], self.dst)
+ eq_(res[1], self.src)
+ eq_(res[2], self.ethertype)
+
+ @raises(Exception)
+ def test_malformed_ethernet(self):
+ m_short_buf = self.buf[1:ethernet._MIN_LEN]
+ ethernet.parser(m_short_buf)
diff --git a/ryu/tests/unit/packet/test_ipv4.py b/ryu/tests/unit/packet/test_ipv4.py
new file mode 100644
index 00000000..d4b41ff5
--- /dev/null
+++ b/ryu/tests/unit/packet/test_ipv4.py
@@ -0,0 +1,131 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import unittest
+import logging
+import struct
+from struct import *
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.ofproto import ether, inet
+from ryu.lib import mac
+from ryu.lib.packet import packet_utils
+from ryu.lib.packet.ethernet import ethernet
+from ryu.lib.packet.packet import Packet
+from ryu.lib.packet.ipv4 import ipv4
+from ryu.lib.packet.tcp import tcp
+import netaddr
+
+
+LOG = logging.getLogger('test_ipv4')
+
+
+class Test_ipv4(unittest.TestCase):
+ """ Test case for ipv4
+ """
+
+ version = 4
+ header_length = 5 + 10
+ ver_hlen = version << 4 | header_length
+ tos = 0
+ total_length = header_length + 64
+ identification = 30774
+ flags = 4
+ offset = 1480
+ flg_off = flags << 13 | offset
+ ttl = 64
+ proto = inet.IPPROTO_TCP
+ csum = 0xadc6
+ src = int(netaddr.IPAddress('131.151.32.21'))
+ dst = int(netaddr.IPAddress('131.151.32.129'))
+ length = header_length * 4
+ option = '\x86\x28\x00\x00\x00\x01\x01\x22' \
+ + '\x00\x01\xae\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01'
+
+ buf = pack(ipv4._PACK_STR, ver_hlen, tos, total_length, identification,
+ flg_off, ttl, proto, csum, src, dst) \
+ + option
+
+ ip = ipv4(version, header_length, tos, total_length, identification,
+ flags, offset, ttl, proto, csum, src, dst, option)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.version, self.ip.version)
+ eq_(self.header_length, self.ip.header_length)
+ eq_(self.tos, self.ip.tos)
+ eq_(self.total_length, self.ip.total_length)
+ eq_(self.identification, self.ip.identification)
+ eq_(self.flags, self.ip.flags)
+ eq_(self.offset, self.ip.offset)
+ eq_(self.ttl, self.ip.ttl)
+ eq_(self.proto, self.ip.proto)
+ eq_(self.csum, self.ip.csum)
+ eq_(self.src, self.ip.src)
+ eq_(self.dst, self.ip.dst)
+ eq_(self.length, self.ip.length)
+ eq_(self.option, self.ip.option)
+
+ def test_parser(self):
+ res, ptype = self.ip.parser(self.buf)
+
+ eq_(res.version, self.version)
+ eq_(res.header_length, self.header_length)
+ eq_(res.tos, self.tos)
+ eq_(res.total_length, self.total_length)
+ eq_(res.identification, self.identification)
+ eq_(res.flags, self.flags)
+ eq_(res.offset, self.offset)
+ eq_(res.ttl, self.ttl)
+ eq_(res.proto, self.proto)
+ eq_(res.csum, self.csum)
+ eq_(res.src, self.src)
+ eq_(res.dst, self.dst)
+ eq_(ptype, tcp)
+
+ def test_serialize(self):
+ buf = self.ip.serialize(bytearray(), None)
+ res = struct.unpack_from(ipv4._PACK_STR, str(buf))
+ option = buf[ipv4._MIN_LEN:ipv4._MIN_LEN + len(self.option)]
+
+ eq_(res[0], self.ver_hlen)
+ eq_(res[1], self.tos)
+ eq_(res[2], self.total_length)
+ eq_(res[3], self.identification)
+ eq_(res[4], self.flg_off)
+ eq_(res[5], self.ttl)
+ eq_(res[6], self.proto)
+ eq_(res[8], self.src)
+ eq_(res[9], self.dst)
+ eq_(option, self.option)
+
+ # checksum
+ csum = packet_utils.checksum(buf)
+ eq_(csum, 0)
+
+ @raises(Exception)
+ def test_malformed_ipv4(self):
+ m_short_buf = self.buf[1:ipv4._MIN_LEN]
+ ipv4.parser(m_short_buf)
diff --git a/ryu/tests/unit/packet/test_packet.py b/ryu/tests/unit/packet/test_packet.py
new file mode 100644
index 00000000..2e2601a5
--- /dev/null
+++ b/ryu/tests/unit/packet/test_packet.py
@@ -0,0 +1,371 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import unittest
+import logging
+import struct
+import netaddr
+import array
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.ofproto import ether, inet
+from ryu.lib import mac
+from ryu.lib.packet import *
+
+
+LOG = logging.getLogger('test_packet')
+
+
+class TestPacket(unittest.TestCase):
+ """ Test case for packet
+ """
+
+ dst_mac = mac.haddr_to_bin('AA:AA:AA:AA:AA:AA')
+ src_mac = mac.haddr_to_bin('BB:BB:BB:BB:BB:BB')
+ dst_ip = int(netaddr.IPAddress('192.168.128.10'))
+ dst_ip_bin = struct.pack('!I', dst_ip)
+ src_ip = int(netaddr.IPAddress('192.168.122.20'))
+ src_ip_bin = struct.pack('!I', src_ip)
+ payload = '\x06\x06\x47\x50\x00\x00\x00\x00' \
+ + '\xcd\xc5\x00\x00\x00\x00\x00\x00' \
+ + '\x10\x11\x12\x13\x14\x15\x16\x17' \
+ + '\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'
+
+ def get_protocols(self, pkt):
+ protocols = {}
+ for p in pkt:
+ if hasattr(p, 'protocol_name'):
+ protocols[p.protocol_name] = p
+ else:
+ protocols['payload'] = p
+ return protocols
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_arp(self):
+ # buid packet
+ e = ethernet.ethernet(self.dst_mac, self.src_mac,
+ ether.ETH_TYPE_ARP)
+ a = arp.arp(1, ether.ETH_TYPE_IP, 6, 4, 2,
+ self.src_mac, self.src_ip, self.dst_mac,
+ self.dst_ip)
+ p = packet.Packet()
+ p.add_protocol(e)
+ p.add_protocol(a)
+ p.serialize()
+
+ # ethernet !6s6sH
+ e_buf = self.dst_mac \
+ + self.src_mac \
+ + '\x08\x06'
+
+ # arp !HHBBH6sI6sI
+ a_buf = '\x00\x01' \
+ + '\x08\x00' \
+ + '\x06' \
+ + '\x04' \
+ + '\x00\x02' \
+ + self.src_mac \
+ + self.src_ip_bin \
+ + self.dst_mac \
+ + self.dst_ip_bin
+
+ buf = e_buf + a_buf
+ eq_(buf, p.data)
+
+ # parse
+ pkt = packet.Packet(array.array('B', p.data))
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_arp = protocols['arp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_(self.dst_mac, p_eth.dst)
+ eq_(self.src_mac, p_eth.src)
+ eq_(ether.ETH_TYPE_ARP, p_eth.ethertype)
+
+ # arp
+ ok_(p_arp)
+ eq_(1, p_arp.hwtype)
+ eq_(ether.ETH_TYPE_IP, p_arp.proto)
+ eq_(6, p_arp.hlen)
+ eq_(4, p_arp.plen)
+ eq_(2, p_arp.opcode)
+ eq_(self.src_mac, p_arp.src_mac)
+ eq_(self.src_ip, p_arp.src_ip)
+ eq_(self.dst_mac, p_arp.dst_mac)
+ eq_(self.dst_ip, p_arp.dst_ip)
+
+ def test_vlan_arp(self):
+ # buid packet
+ e = ethernet.ethernet(self.dst_mac, self.src_mac,
+ ether.ETH_TYPE_8021Q)
+ v = vlan.vlan(0b111, 0b1, 3, ether.ETH_TYPE_ARP)
+ a = arp.arp(1, ether.ETH_TYPE_IP, 6, 4, 2,
+ self.src_mac, self.src_ip, self.dst_mac,
+ self.dst_ip)
+ p = packet.Packet()
+ p.add_protocol(e)
+ p.add_protocol(v)
+ p.add_protocol(a)
+ p.serialize()
+
+ # ethernet !6s6sH
+ e_buf = self.dst_mac \
+ + self.src_mac \
+ + '\x81\x00'
+
+ # vlan !HH
+ v_buf = '\xF0\x03' \
+ + '\x08\x06'
+
+ # arp !HHBBH6sI6sI
+ a_buf = '\x00\x01' \
+ + '\x08\x00' \
+ + '\x06' \
+ + '\x04' \
+ + '\x00\x02' \
+ + self.src_mac \
+ + self.src_ip_bin \
+ + self.dst_mac \
+ + self.dst_ip_bin
+
+ buf = e_buf + v_buf + a_buf
+ eq_(buf, p.data)
+
+ # parse
+ pkt = packet.Packet(array.array('B', p.data))
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_vlan = protocols['vlan']
+ p_arp = protocols['arp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_(self.dst_mac, p_eth.dst)
+ eq_(self.src_mac, p_eth.src)
+ eq_(ether.ETH_TYPE_8021Q, p_eth.ethertype)
+
+ # vlan
+ ok_(p_vlan)
+ eq_(0b111, p_vlan.pcp)
+ eq_(0b1, p_vlan.cfi)
+ eq_(3, p_vlan.vid)
+ eq_(ether.ETH_TYPE_ARP, p_vlan.ethertype)
+
+ # arp
+ ok_(p_arp)
+ eq_(1, p_arp.hwtype)
+ eq_(ether.ETH_TYPE_IP, p_arp.proto)
+ eq_(6, p_arp.hlen)
+ eq_(4, p_arp.plen)
+ eq_(2, p_arp.opcode)
+ eq_(self.src_mac, p_arp.src_mac)
+ eq_(self.src_ip, p_arp.src_ip)
+ eq_(self.dst_mac, p_arp.dst_mac)
+ eq_(self.dst_ip, p_arp.dst_ip)
+
+ def test_ipv4_udp(self):
+ # buid packet
+ e = ethernet.ethernet(self.dst_mac, self.src_mac,
+ ether.ETH_TYPE_IP)
+ ip = ipv4.ipv4(4, 5, 1, 0, 3, 1, 4, 64, inet.IPPROTO_UDP, 0,
+ self.src_ip, self.dst_ip)
+ u = udp.udp(0x190F, 0x1F90, 0, 0)
+
+ p = packet.Packet()
+ p.add_protocol(e)
+ p.add_protocol(ip)
+ p.add_protocol(u)
+ p.add_protocol(self.payload)
+ p.serialize()
+
+ # ethernet !6s6sH
+ e_buf = self.dst_mac \
+ + self.src_mac \
+ + '\x08\x00'
+
+ # ipv4 !BBHHHBBHII
+ ip_buf = '\x45' \
+ + '\x01' \
+ + '\x00\x3C' \
+ + '\x00\x03' \
+ + '\x20\x04' \
+ + '\x40' \
+ + '\x11' \
+ + '\x00\x00' \
+ + self.src_ip_bin \
+ + self.dst_ip_bin
+
+ # udp !HHHH
+ u_buf = '\x19\x0F' \
+ + '\x1F\x90' \
+ + '\x00\x28' \
+ + '\x00\x00'
+
+ buf = e_buf + ip_buf + u_buf + self.payload
+
+ # parse
+ pkt = packet.Packet(array.array('B', p.data))
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv4 = protocols['ipv4']
+ p_udp = protocols['udp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_(self.dst_mac, p_eth.dst)
+ eq_(self.src_mac, p_eth.src)
+ eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
+
+ # ipv4
+ ok_(p_ipv4)
+ eq_(4, p_ipv4.version)
+ eq_(5, p_ipv4.header_length)
+ eq_(1, p_ipv4.tos)
+ l = len(ip_buf) + len(u_buf) + len(self.payload)
+ eq_(l, p_ipv4.total_length)
+ eq_(3, p_ipv4.identification)
+ eq_(1, p_ipv4.flags)
+ eq_(64, p_ipv4.ttl)
+ eq_(inet.IPPROTO_UDP, p_ipv4.proto)
+ eq_(self.src_ip, p_ipv4.src)
+ eq_(self.dst_ip, p_ipv4.dst)
+ t = bytearray(ip_buf)
+ struct.pack_into('!H', t, 10, p_ipv4.csum)
+ eq_(packet_utils.checksum(t), 0)
+
+ # udp
+ ok_(p_udp)
+ eq_(0x190f, p_udp.src_port)
+ eq_(0x1F90, p_udp.dst_port)
+ eq_(len(u_buf) + len(self.payload), p_udp.total_length)
+ eq_(0x77b2, p_udp.csum)
+ t = bytearray(u_buf)
+ struct.pack_into('!H', t, 6, p_udp.csum)
+ ph = struct.pack('!IIBBH', self.src_ip, self.dst_ip, 0,
+ 17, len(u_buf) + len(self.payload))
+ t = ph + t + self.payload
+ eq_(packet_utils.checksum(t), 0)
+
+ # payload
+ ok_('payload' in protocols)
+ eq_(self.payload, protocols['payload'].tostring())
+
+ def test_ipv4_tcp(self):
+ # buid packet
+ e = ethernet.ethernet(self.dst_mac, self.src_mac,
+ ether.ETH_TYPE_IP)
+ ip = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, 64, inet.IPPROTO_TCP, 0,
+ self.src_ip, self.dst_ip)
+ t = tcp.tcp(0x190F, 0x1F90, 0x123, 1, 6, 0b101010, 2048, 0, 0x6f,
+ '\x01\x02')
+
+ p = packet.Packet()
+ p.add_protocol(e)
+ p.add_protocol(ip)
+ p.add_protocol(t)
+ p.add_protocol(self.payload)
+ p.serialize()
+
+ # ethernet !6s6sH
+ e_buf = self.dst_mac \
+ + self.src_mac \
+ + '\x08\x00'
+
+ # ipv4 !BBHHHBBHII
+ ip_buf = '\x45' \
+ + '\x00' \
+ + '\x00\x4C' \
+ + '\x00\x00' \
+ + '\x00\x00' \
+ + '\x40' \
+ + '\x06' \
+ + '\x00\x00' \
+ + self.src_ip_bin \
+ + self.dst_ip_bin
+
+ # tcp !HHIIBBHHH + option
+ t_buf = '\x19\x0F' \
+ + '\x1F\x90' \
+ + '\x00\x00\x01\x23' \
+ + '\x00\x00\x00\x01' \
+ + '\x60' \
+ + '\x2A' \
+ + '\x08\x00' \
+ + '\x00\x00' \
+ + '\x00\x6F' \
+ + '\x01\x02\x00\x00'
+
+ buf = e_buf + ip_buf + t_buf + self.payload
+
+ # parse
+ pkt = packet.Packet(array.array('B', p.data))
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv4 = protocols['ipv4']
+ p_tcp = protocols['tcp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_(self.dst_mac, p_eth.dst)
+ eq_(self.src_mac, p_eth.src)
+ eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
+
+ # ipv4
+ ok_(p_ipv4)
+ eq_(4, p_ipv4.version)
+ eq_(5, p_ipv4.header_length)
+ eq_(0, p_ipv4.tos)
+ l = len(ip_buf) + len(t_buf) + len(self.payload)
+ eq_(l, p_ipv4.total_length)
+ eq_(0, p_ipv4.identification)
+ eq_(0, p_ipv4.flags)
+ eq_(64, p_ipv4.ttl)
+ eq_(inet.IPPROTO_TCP, p_ipv4.proto)
+ eq_(self.src_ip, p_ipv4.src)
+ eq_(self.dst_ip, p_ipv4.dst)
+ t = bytearray(ip_buf)
+ struct.pack_into('!H', t, 10, p_ipv4.csum)
+ eq_(packet_utils.checksum(t), 0)
+
+ # tcp
+ ok_(p_tcp)
+ eq_(0x190f, p_tcp.src_port)
+ eq_(0x1F90, p_tcp.dst_port)
+ eq_(0x123, p_tcp.seq)
+ eq_(1, p_tcp.ack)
+ eq_(6, p_tcp.offset)
+ eq_(0b101010, p_tcp.bits)
+ eq_(2048, p_tcp.window_size)
+ eq_(0x6f, p_tcp.urgent)
+ eq_(len(t_buf), p_tcp.length)
+ t = bytearray(t_buf)
+ struct.pack_into('!H', t, 16, p_tcp.csum)
+ ph = struct.pack('!IIBBH', self.src_ip, self.dst_ip, 0,
+ 6, len(t_buf) + len(self.payload))
+ t = ph + t + self.payload
+ eq_(packet_utils.checksum(t), 0)
+
+ # payload
+ ok_('payload' in protocols)
+ eq_(self.payload, protocols['payload'].tostring())
diff --git a/ryu/tests/unit/packet/test_tcp.py b/ryu/tests/unit/packet/test_tcp.py
new file mode 100644
index 00000000..978cb346
--- /dev/null
+++ b/ryu/tests/unit/packet/test_tcp.py
@@ -0,0 +1,140 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import unittest
+import logging
+import struct
+import netaddr
+from struct import *
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.ofproto import ether, inet
+from ryu.lib import mac
+from ryu.lib.packet.ethernet import ethernet
+from ryu.lib.packet.packet import Packet
+from ryu.lib.packet.tcp import tcp
+from ryu.lib.packet.ipv4 import ipv4
+from ryu.lib.packet import packet_utils
+
+
+LOG = logging.getLogger('test_tcp')
+
+
+class Test_tcp(unittest.TestCase):
+ """ Test case for tcp
+ """
+ src_port = 6431
+ dst_port = 8080
+ seq = 5
+ ack = 1
+ offset = 6
+ bits = 0b101010
+ window_size = 2048
+ csum = 12345
+ urgent = 128
+ option = '\x01\x02\x03\x04'
+
+ t = tcp(src_port, dst_port, seq, ack, offset, bits,
+ window_size, csum, urgent, option)
+
+ buf = pack(tcp._PACK_STR, src_port, dst_port, seq, ack,
+ offset << 4, bits, window_size, csum, urgent)
+ buf += option
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.src_port, self.t.src_port)
+ eq_(self.dst_port, self.t.dst_port)
+ eq_(self.seq, self.t.seq)
+ eq_(self.ack, self.t.ack)
+ eq_(self.offset, self.t.offset)
+ eq_(self.bits, self.t.bits)
+ eq_(self.window_size, self.t.window_size)
+ eq_(self.csum, self.t.csum)
+ eq_(self.urgent, self.t.urgent)
+ eq_(self.option, self.t.option)
+
+ def test_parser(self):
+ r1, r2 = self.t.parser(self.buf)
+
+ eq_(self.src_port, r1.src_port)
+ eq_(self.dst_port, r1.dst_port)
+ eq_(self.seq, r1.seq)
+ eq_(self.ack, r1.ack)
+ eq_(self.offset, r1.offset)
+ eq_(self.bits, r1.bits)
+ eq_(self.window_size, r1.window_size)
+ eq_(self.csum, r1.csum)
+ eq_(self.urgent, r1.urgent)
+ eq_(self.option, r1.option)
+ eq_(None, r2)
+
+ def test_serialize(self):
+ offset = 5
+ csum = 0
+
+ src_ip = int(netaddr.IPAddress('192.168.10.1'))
+ dst_ip = int(netaddr.IPAddress('192.168.100.1'))
+ prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
+ inet.IPPROTO_UDP, 0, src_ip, dst_ip)
+
+ t = tcp(self.src_port, self.dst_port, self.seq, self.ack,
+ offset, self.bits, self.window_size, csum, self.urgent)
+ buf = t.serialize(bytearray(), prev)
+ res = struct.unpack(tcp._PACK_STR, str(buf))
+
+ eq_(res[0], self.src_port)
+ eq_(res[1], self.dst_port)
+ eq_(res[2], self.seq)
+ eq_(res[3], self.ack)
+ eq_(res[4], offset << 4)
+ eq_(res[5], self.bits)
+ eq_(res[6], self.window_size)
+ eq_(res[8], self.urgent)
+
+ # checksum
+ ph = struct.pack('!IIBBH', src_ip, dst_ip, 0, 6, offset * 4)
+ d = ph + buf + bytearray()
+ s = packet_utils.checksum(d)
+ eq_(0, s)
+
+ def test_serialize_option(self):
+ offset = 6
+ csum = 0
+ option = '\x01\x02'
+
+ src_ip = int(netaddr.IPAddress('192.168.10.1'))
+ dst_ip = int(netaddr.IPAddress('192.168.100.1'))
+ prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
+ inet.IPPROTO_UDP, 0, src_ip, dst_ip)
+
+ t = tcp(self.src_port, self.dst_port, self.seq, self.ack,
+ offset, self.bits, self.window_size, csum, self.urgent,
+ option)
+ buf = t.serialize(bytearray(), prev)
+ r_option = buf[tcp._MIN_LEN:tcp._MIN_LEN + len(option)]
+ eq_(option, r_option)
+
+ @raises(Exception)
+ def test_malformed_tcp(self):
+ m_short_buf = self.buf[1:tcp._MIN_LEN]
+ tcp.parser(m_short_buf)
diff --git a/ryu/tests/unit/packet/test_udp.py b/ryu/tests/unit/packet/test_udp.py
new file mode 100644
index 00000000..e35d56cd
--- /dev/null
+++ b/ryu/tests/unit/packet/test_udp.py
@@ -0,0 +1,96 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import unittest
+import logging
+import struct
+import netaddr
+from struct import *
+from nose.tools import *
+from nose.plugins.skip import Skip, SkipTest
+from ryu.ofproto import ether, inet
+from ryu.lib import mac
+from ryu.lib.packet.ethernet import ethernet
+from ryu.lib.packet.packet import Packet
+from ryu.lib.packet.udp import udp
+from ryu.lib.packet.ipv4 import ipv4
+from ryu.lib.packet import packet_utils
+
+
+LOG = logging.getLogger('test_udp')
+
+
+class Test_udp(unittest.TestCase):
+ """ Test case for udp
+ """
+ src_port = 6431
+ dst_port = 8080
+ total_length = 65507
+ csum = 12345
+ u = udp(src_port, dst_port, total_length, csum)
+ buf = pack(udp._PACK_STR, src_port, dst_port, total_length, csum)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.src_port, self.u.src_port)
+ eq_(self.dst_port, self.u.dst_port)
+ eq_(self.total_length, self.u.total_length)
+ eq_(self.csum, self.u.csum)
+
+ def test_parser(self):
+ r1, r2 = self.u.parser(self.buf)
+
+ eq_(self.src_port, r1.src_port)
+ eq_(self.dst_port, r1.dst_port)
+ eq_(self.total_length, r1.total_length)
+ eq_(self.csum, r1.csum)
+ eq_(None, r2)
+
+ def test_serialize(self):
+ src_port = 6431
+ dst_port = 8080
+ total_length = 0
+ csum = 0
+
+ src_ip = int(netaddr.IPAddress('192.168.10.1'))
+ dst_ip = int(netaddr.IPAddress('192.168.100.1'))
+ prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
+ inet.IPPROTO_UDP, 0, src_ip, dst_ip)
+
+ u = udp(src_port, dst_port, total_length, csum)
+ buf = u.serialize(bytearray(), prev)
+ res = struct.unpack(udp._PACK_STR, buf)
+
+ eq_(res[0], src_port)
+ eq_(res[1], dst_port)
+ eq_(res[2], struct.calcsize(udp._PACK_STR))
+
+ # checksum
+ ph = struct.pack('!IIBBH', src_ip, dst_ip, 0, 17, res[2])
+ d = ph + buf + bytearray()
+ s = packet_utils.checksum(d)
+ eq_(0, s)
+
+ @raises(Exception)
+ def test_malformed_udp(self):
+ m_short_buf = self.buf[1:udp._MIN_LEN]
+ udp.parser(m_short_buf)
diff --git a/ryu/tests/unit/packet/test_vlan.py b/ryu/tests/unit/packet/test_vlan.py
index cf5ae425..aaf7422d 100644
--- a/ryu/tests/unit/packet/test_vlan.py
+++ b/ryu/tests/unit/packet/test_vlan.py
@@ -137,3 +137,8 @@ class Test_vlan(unittest.TestCase):
eq_(v.vid, self.vid)
eq_(v.ethertype, self.ethertype)
eq_(v.length, self.length)
+
+ @raises(Exception)
+ def test_malformed_vlan(self):
+ m_short_buf = self.buf[1:vlan._MIN_LEN]
+ vlan.parser(m_short_buf)