diff options
author | HIYAMA Manabu <hiyama.manabu@po.ntts.co.jp> | 2012-10-19 15:59:38 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2012-10-23 13:33:43 +0900 |
commit | 51b3b9a2bd086d0bb8e21a0e0e3db015c7e55f7f (patch) | |
tree | 136dcd573e05eed1da560a3839980f69ac95b54c | |
parent | 3027becda19b44131d6d9b0223ef2b1ca33daaf2 (diff) |
test: add unittests for packet library
Signed-off-by: HIYAMA Manabu <hiyama.manabu@po.ntts.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/tests/unit/packet/test_arp.py | 5 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_ethernet.py | 90 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_ipv4.py | 131 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_packet.py | 371 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_tcp.py | 140 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_udp.py | 96 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_vlan.py | 5 |
7 files changed, 838 insertions, 0 deletions
diff --git a/ryu/tests/unit/packet/test_arp.py b/ryu/tests/unit/packet/test_arp.py index e5be05b3..d53af418 100644 --- a/ryu/tests/unit/packet/test_arp.py +++ b/ryu/tests/unit/packet/test_arp.py @@ -174,3 +174,8 @@ class Test_arp(unittest.TestCase): eq_(a.dst_mac, self.dst_mac) eq_(a.dst_ip, self.dst_ip) eq_(a.length, self.length) + + @raises(Exception) + def test_malformed_arp(self): + m_short_buf = self.buf[1:arp._MIN_LEN] + arp.parser(m_short_buf) diff --git a/ryu/tests/unit/packet/test_ethernet.py b/ryu/tests/unit/packet/test_ethernet.py new file mode 100644 index 00000000..6378bb65 --- /dev/null +++ b/ryu/tests/unit/packet/test_ethernet.py @@ -0,0 +1,90 @@ +# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +import unittest +import logging +import struct +import netaddr +from struct import * +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.ofproto import ether, inet +from ryu.lib import mac +from ryu.lib.packet.ethernet import ethernet +from ryu.lib.packet.packet import Packet +from ryu.lib.packet.arp import arp + + +LOG = logging.getLogger('test_ethernet') + + +class Test_ethernet(unittest.TestCase): + """ Test case for ethernet + """ + + dst = mac.haddr_to_bin('AA:AA:AA:AA:AA:AA') + src = mac.haddr_to_bin('BB:BB:BB:BB:BB:BB') + ethertype = ether.ETH_TYPE_ARP + length = struct.calcsize(ethernet._PACK_STR) + + buf = pack(ethernet._PACK_STR, dst, src, ethertype) + + e = ethernet(dst, src, ethertype) + + def setUp(self): + pass + + def tearDown(self): + pass + + def find_protocol(self, pkt, name): + for p in pkt.protocols: + if p.protocol_name == name: + return p + + def test_init(self): + eq_(self.dst, self.e.dst) + eq_(self.src, self.e.src) + eq_(self.ethertype, self.e.ethertype) + eq_(self.length, self.e.length) + + def test_parser(self): + res, ptype = self.e.parser(self.buf) + LOG.debug((res, ptype)) + + eq_(res.dst, self.dst) + eq_(res.src, self.src) + eq_(res.ethertype, self.ethertype) + eq_(res.length, self.length) + eq_(ptype, arp) + + def test_serialize(self): + data = bytearray() + prev = None + buf = self.e.serialize(data, prev) + + fmt = ethernet._PACK_STR + res = struct.unpack(fmt, buf) + + eq_(res[0], self.dst) + eq_(res[1], self.src) + eq_(res[2], self.ethertype) + + @raises(Exception) + def test_malformed_ethernet(self): + m_short_buf = self.buf[1:ethernet._MIN_LEN] + ethernet.parser(m_short_buf) diff --git a/ryu/tests/unit/packet/test_ipv4.py b/ryu/tests/unit/packet/test_ipv4.py new file mode 100644 index 00000000..d4b41ff5 --- /dev/null +++ b/ryu/tests/unit/packet/test_ipv4.py @@ -0,0 +1,131 @@ +# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +import unittest +import logging +import struct +from struct import * +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.ofproto import ether, inet +from ryu.lib import mac +from ryu.lib.packet import packet_utils +from ryu.lib.packet.ethernet import ethernet +from ryu.lib.packet.packet import Packet +from ryu.lib.packet.ipv4 import ipv4 +from ryu.lib.packet.tcp import tcp +import netaddr + + +LOG = logging.getLogger('test_ipv4') + + +class Test_ipv4(unittest.TestCase): + """ Test case for ipv4 + """ + + version = 4 + header_length = 5 + 10 + ver_hlen = version << 4 | header_length + tos = 0 + total_length = header_length + 64 + identification = 30774 + flags = 4 + offset = 1480 + flg_off = flags << 13 | offset + ttl = 64 + proto = inet.IPPROTO_TCP + csum = 0xadc6 + src = int(netaddr.IPAddress('131.151.32.21')) + dst = int(netaddr.IPAddress('131.151.32.129')) + length = header_length * 4 + option = '\x86\x28\x00\x00\x00\x01\x01\x22' \ + + '\x00\x01\xae\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' + + buf = pack(ipv4._PACK_STR, ver_hlen, tos, total_length, identification, + flg_off, ttl, proto, csum, src, dst) \ + + option + + ip = ipv4(version, header_length, tos, total_length, identification, + flags, offset, ttl, proto, csum, src, dst, option) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_init(self): + eq_(self.version, self.ip.version) + eq_(self.header_length, self.ip.header_length) + eq_(self.tos, self.ip.tos) + eq_(self.total_length, self.ip.total_length) + eq_(self.identification, self.ip.identification) + eq_(self.flags, self.ip.flags) + eq_(self.offset, self.ip.offset) + eq_(self.ttl, self.ip.ttl) + eq_(self.proto, self.ip.proto) + eq_(self.csum, self.ip.csum) + eq_(self.src, self.ip.src) + eq_(self.dst, self.ip.dst) + eq_(self.length, self.ip.length) + eq_(self.option, self.ip.option) + + def test_parser(self): + res, ptype = self.ip.parser(self.buf) + + eq_(res.version, self.version) + eq_(res.header_length, self.header_length) + eq_(res.tos, self.tos) + eq_(res.total_length, self.total_length) + eq_(res.identification, self.identification) + eq_(res.flags, self.flags) + eq_(res.offset, self.offset) + eq_(res.ttl, self.ttl) + eq_(res.proto, self.proto) + eq_(res.csum, self.csum) + eq_(res.src, self.src) + eq_(res.dst, self.dst) + eq_(ptype, tcp) + + def test_serialize(self): + buf = self.ip.serialize(bytearray(), None) + res = struct.unpack_from(ipv4._PACK_STR, str(buf)) + option = buf[ipv4._MIN_LEN:ipv4._MIN_LEN + len(self.option)] + + eq_(res[0], self.ver_hlen) + eq_(res[1], self.tos) + eq_(res[2], self.total_length) + eq_(res[3], self.identification) + eq_(res[4], self.flg_off) + eq_(res[5], self.ttl) + eq_(res[6], self.proto) + eq_(res[8], self.src) + eq_(res[9], self.dst) + eq_(option, self.option) + + # checksum + csum = packet_utils.checksum(buf) + eq_(csum, 0) + + @raises(Exception) + def test_malformed_ipv4(self): + m_short_buf = self.buf[1:ipv4._MIN_LEN] + ipv4.parser(m_short_buf) diff --git a/ryu/tests/unit/packet/test_packet.py b/ryu/tests/unit/packet/test_packet.py new file mode 100644 index 00000000..2e2601a5 --- /dev/null +++ b/ryu/tests/unit/packet/test_packet.py @@ -0,0 +1,371 @@ +# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +import unittest +import logging +import struct +import netaddr +import array +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.ofproto import ether, inet +from ryu.lib import mac +from ryu.lib.packet import * + + +LOG = logging.getLogger('test_packet') + + +class TestPacket(unittest.TestCase): + """ Test case for packet + """ + + dst_mac = mac.haddr_to_bin('AA:AA:AA:AA:AA:AA') + src_mac = mac.haddr_to_bin('BB:BB:BB:BB:BB:BB') + dst_ip = int(netaddr.IPAddress('192.168.128.10')) + dst_ip_bin = struct.pack('!I', dst_ip) + src_ip = int(netaddr.IPAddress('192.168.122.20')) + src_ip_bin = struct.pack('!I', src_ip) + payload = '\x06\x06\x47\x50\x00\x00\x00\x00' \ + + '\xcd\xc5\x00\x00\x00\x00\x00\x00' \ + + '\x10\x11\x12\x13\x14\x15\x16\x17' \ + + '\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f' + + def get_protocols(self, pkt): + protocols = {} + for p in pkt: + if hasattr(p, 'protocol_name'): + protocols[p.protocol_name] = p + else: + protocols['payload'] = p + return protocols + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_arp(self): + # buid packet + e = ethernet.ethernet(self.dst_mac, self.src_mac, + ether.ETH_TYPE_ARP) + a = arp.arp(1, ether.ETH_TYPE_IP, 6, 4, 2, + self.src_mac, self.src_ip, self.dst_mac, + self.dst_ip) + p = packet.Packet() + p.add_protocol(e) + p.add_protocol(a) + p.serialize() + + # ethernet !6s6sH + e_buf = self.dst_mac \ + + self.src_mac \ + + '\x08\x06' + + # arp !HHBBH6sI6sI + a_buf = '\x00\x01' \ + + '\x08\x00' \ + + '\x06' \ + + '\x04' \ + + '\x00\x02' \ + + self.src_mac \ + + self.src_ip_bin \ + + self.dst_mac \ + + self.dst_ip_bin + + buf = e_buf + a_buf + eq_(buf, p.data) + + # parse + pkt = packet.Packet(array.array('B', p.data)) + protocols = self.get_protocols(pkt) + p_eth = protocols['ethernet'] + p_arp = protocols['arp'] + + # ethernet + ok_(p_eth) + eq_(self.dst_mac, p_eth.dst) + eq_(self.src_mac, p_eth.src) + eq_(ether.ETH_TYPE_ARP, p_eth.ethertype) + + # arp + ok_(p_arp) + eq_(1, p_arp.hwtype) + eq_(ether.ETH_TYPE_IP, p_arp.proto) + eq_(6, p_arp.hlen) + eq_(4, p_arp.plen) + eq_(2, p_arp.opcode) + eq_(self.src_mac, p_arp.src_mac) + eq_(self.src_ip, p_arp.src_ip) + eq_(self.dst_mac, p_arp.dst_mac) + eq_(self.dst_ip, p_arp.dst_ip) + + def test_vlan_arp(self): + # buid packet + e = ethernet.ethernet(self.dst_mac, self.src_mac, + ether.ETH_TYPE_8021Q) + v = vlan.vlan(0b111, 0b1, 3, ether.ETH_TYPE_ARP) + a = arp.arp(1, ether.ETH_TYPE_IP, 6, 4, 2, + self.src_mac, self.src_ip, self.dst_mac, + self.dst_ip) + p = packet.Packet() + p.add_protocol(e) + p.add_protocol(v) + p.add_protocol(a) + p.serialize() + + # ethernet !6s6sH + e_buf = self.dst_mac \ + + self.src_mac \ + + '\x81\x00' + + # vlan !HH + v_buf = '\xF0\x03' \ + + '\x08\x06' + + # arp !HHBBH6sI6sI + a_buf = '\x00\x01' \ + + '\x08\x00' \ + + '\x06' \ + + '\x04' \ + + '\x00\x02' \ + + self.src_mac \ + + self.src_ip_bin \ + + self.dst_mac \ + + self.dst_ip_bin + + buf = e_buf + v_buf + a_buf + eq_(buf, p.data) + + # parse + pkt = packet.Packet(array.array('B', p.data)) + protocols = self.get_protocols(pkt) + p_eth = protocols['ethernet'] + p_vlan = protocols['vlan'] + p_arp = protocols['arp'] + + # ethernet + ok_(p_eth) + eq_(self.dst_mac, p_eth.dst) + eq_(self.src_mac, p_eth.src) + eq_(ether.ETH_TYPE_8021Q, p_eth.ethertype) + + # vlan + ok_(p_vlan) + eq_(0b111, p_vlan.pcp) + eq_(0b1, p_vlan.cfi) + eq_(3, p_vlan.vid) + eq_(ether.ETH_TYPE_ARP, p_vlan.ethertype) + + # arp + ok_(p_arp) + eq_(1, p_arp.hwtype) + eq_(ether.ETH_TYPE_IP, p_arp.proto) + eq_(6, p_arp.hlen) + eq_(4, p_arp.plen) + eq_(2, p_arp.opcode) + eq_(self.src_mac, p_arp.src_mac) + eq_(self.src_ip, p_arp.src_ip) + eq_(self.dst_mac, p_arp.dst_mac) + eq_(self.dst_ip, p_arp.dst_ip) + + def test_ipv4_udp(self): + # buid packet + e = ethernet.ethernet(self.dst_mac, self.src_mac, + ether.ETH_TYPE_IP) + ip = ipv4.ipv4(4, 5, 1, 0, 3, 1, 4, 64, inet.IPPROTO_UDP, 0, + self.src_ip, self.dst_ip) + u = udp.udp(0x190F, 0x1F90, 0, 0) + + p = packet.Packet() + p.add_protocol(e) + p.add_protocol(ip) + p.add_protocol(u) + p.add_protocol(self.payload) + p.serialize() + + # ethernet !6s6sH + e_buf = self.dst_mac \ + + self.src_mac \ + + '\x08\x00' + + # ipv4 !BBHHHBBHII + ip_buf = '\x45' \ + + '\x01' \ + + '\x00\x3C' \ + + '\x00\x03' \ + + '\x20\x04' \ + + '\x40' \ + + '\x11' \ + + '\x00\x00' \ + + self.src_ip_bin \ + + self.dst_ip_bin + + # udp !HHHH + u_buf = '\x19\x0F' \ + + '\x1F\x90' \ + + '\x00\x28' \ + + '\x00\x00' + + buf = e_buf + ip_buf + u_buf + self.payload + + # parse + pkt = packet.Packet(array.array('B', p.data)) + protocols = self.get_protocols(pkt) + p_eth = protocols['ethernet'] + p_ipv4 = protocols['ipv4'] + p_udp = protocols['udp'] + + # ethernet + ok_(p_eth) + eq_(self.dst_mac, p_eth.dst) + eq_(self.src_mac, p_eth.src) + eq_(ether.ETH_TYPE_IP, p_eth.ethertype) + + # ipv4 + ok_(p_ipv4) + eq_(4, p_ipv4.version) + eq_(5, p_ipv4.header_length) + eq_(1, p_ipv4.tos) + l = len(ip_buf) + len(u_buf) + len(self.payload) + eq_(l, p_ipv4.total_length) + eq_(3, p_ipv4.identification) + eq_(1, p_ipv4.flags) + eq_(64, p_ipv4.ttl) + eq_(inet.IPPROTO_UDP, p_ipv4.proto) + eq_(self.src_ip, p_ipv4.src) + eq_(self.dst_ip, p_ipv4.dst) + t = bytearray(ip_buf) + struct.pack_into('!H', t, 10, p_ipv4.csum) + eq_(packet_utils.checksum(t), 0) + + # udp + ok_(p_udp) + eq_(0x190f, p_udp.src_port) + eq_(0x1F90, p_udp.dst_port) + eq_(len(u_buf) + len(self.payload), p_udp.total_length) + eq_(0x77b2, p_udp.csum) + t = bytearray(u_buf) + struct.pack_into('!H', t, 6, p_udp.csum) + ph = struct.pack('!IIBBH', self.src_ip, self.dst_ip, 0, + 17, len(u_buf) + len(self.payload)) + t = ph + t + self.payload + eq_(packet_utils.checksum(t), 0) + + # payload + ok_('payload' in protocols) + eq_(self.payload, protocols['payload'].tostring()) + + def test_ipv4_tcp(self): + # buid packet + e = ethernet.ethernet(self.dst_mac, self.src_mac, + ether.ETH_TYPE_IP) + ip = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, 64, inet.IPPROTO_TCP, 0, + self.src_ip, self.dst_ip) + t = tcp.tcp(0x190F, 0x1F90, 0x123, 1, 6, 0b101010, 2048, 0, 0x6f, + '\x01\x02') + + p = packet.Packet() + p.add_protocol(e) + p.add_protocol(ip) + p.add_protocol(t) + p.add_protocol(self.payload) + p.serialize() + + # ethernet !6s6sH + e_buf = self.dst_mac \ + + self.src_mac \ + + '\x08\x00' + + # ipv4 !BBHHHBBHII + ip_buf = '\x45' \ + + '\x00' \ + + '\x00\x4C' \ + + '\x00\x00' \ + + '\x00\x00' \ + + '\x40' \ + + '\x06' \ + + '\x00\x00' \ + + self.src_ip_bin \ + + self.dst_ip_bin + + # tcp !HHIIBBHHH + option + t_buf = '\x19\x0F' \ + + '\x1F\x90' \ + + '\x00\x00\x01\x23' \ + + '\x00\x00\x00\x01' \ + + '\x60' \ + + '\x2A' \ + + '\x08\x00' \ + + '\x00\x00' \ + + '\x00\x6F' \ + + '\x01\x02\x00\x00' + + buf = e_buf + ip_buf + t_buf + self.payload + + # parse + pkt = packet.Packet(array.array('B', p.data)) + protocols = self.get_protocols(pkt) + p_eth = protocols['ethernet'] + p_ipv4 = protocols['ipv4'] + p_tcp = protocols['tcp'] + + # ethernet + ok_(p_eth) + eq_(self.dst_mac, p_eth.dst) + eq_(self.src_mac, p_eth.src) + eq_(ether.ETH_TYPE_IP, p_eth.ethertype) + + # ipv4 + ok_(p_ipv4) + eq_(4, p_ipv4.version) + eq_(5, p_ipv4.header_length) + eq_(0, p_ipv4.tos) + l = len(ip_buf) + len(t_buf) + len(self.payload) + eq_(l, p_ipv4.total_length) + eq_(0, p_ipv4.identification) + eq_(0, p_ipv4.flags) + eq_(64, p_ipv4.ttl) + eq_(inet.IPPROTO_TCP, p_ipv4.proto) + eq_(self.src_ip, p_ipv4.src) + eq_(self.dst_ip, p_ipv4.dst) + t = bytearray(ip_buf) + struct.pack_into('!H', t, 10, p_ipv4.csum) + eq_(packet_utils.checksum(t), 0) + + # tcp + ok_(p_tcp) + eq_(0x190f, p_tcp.src_port) + eq_(0x1F90, p_tcp.dst_port) + eq_(0x123, p_tcp.seq) + eq_(1, p_tcp.ack) + eq_(6, p_tcp.offset) + eq_(0b101010, p_tcp.bits) + eq_(2048, p_tcp.window_size) + eq_(0x6f, p_tcp.urgent) + eq_(len(t_buf), p_tcp.length) + t = bytearray(t_buf) + struct.pack_into('!H', t, 16, p_tcp.csum) + ph = struct.pack('!IIBBH', self.src_ip, self.dst_ip, 0, + 6, len(t_buf) + len(self.payload)) + t = ph + t + self.payload + eq_(packet_utils.checksum(t), 0) + + # payload + ok_('payload' in protocols) + eq_(self.payload, protocols['payload'].tostring()) diff --git a/ryu/tests/unit/packet/test_tcp.py b/ryu/tests/unit/packet/test_tcp.py new file mode 100644 index 00000000..978cb346 --- /dev/null +++ b/ryu/tests/unit/packet/test_tcp.py @@ -0,0 +1,140 @@ +# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +import unittest +import logging +import struct +import netaddr +from struct import * +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.ofproto import ether, inet +from ryu.lib import mac +from ryu.lib.packet.ethernet import ethernet +from ryu.lib.packet.packet import Packet +from ryu.lib.packet.tcp import tcp +from ryu.lib.packet.ipv4 import ipv4 +from ryu.lib.packet import packet_utils + + +LOG = logging.getLogger('test_tcp') + + +class Test_tcp(unittest.TestCase): + """ Test case for tcp + """ + src_port = 6431 + dst_port = 8080 + seq = 5 + ack = 1 + offset = 6 + bits = 0b101010 + window_size = 2048 + csum = 12345 + urgent = 128 + option = '\x01\x02\x03\x04' + + t = tcp(src_port, dst_port, seq, ack, offset, bits, + window_size, csum, urgent, option) + + buf = pack(tcp._PACK_STR, src_port, dst_port, seq, ack, + offset << 4, bits, window_size, csum, urgent) + buf += option + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_init(self): + eq_(self.src_port, self.t.src_port) + eq_(self.dst_port, self.t.dst_port) + eq_(self.seq, self.t.seq) + eq_(self.ack, self.t.ack) + eq_(self.offset, self.t.offset) + eq_(self.bits, self.t.bits) + eq_(self.window_size, self.t.window_size) + eq_(self.csum, self.t.csum) + eq_(self.urgent, self.t.urgent) + eq_(self.option, self.t.option) + + def test_parser(self): + r1, r2 = self.t.parser(self.buf) + + eq_(self.src_port, r1.src_port) + eq_(self.dst_port, r1.dst_port) + eq_(self.seq, r1.seq) + eq_(self.ack, r1.ack) + eq_(self.offset, r1.offset) + eq_(self.bits, r1.bits) + eq_(self.window_size, r1.window_size) + eq_(self.csum, r1.csum) + eq_(self.urgent, r1.urgent) + eq_(self.option, r1.option) + eq_(None, r2) + + def test_serialize(self): + offset = 5 + csum = 0 + + src_ip = int(netaddr.IPAddress('192.168.10.1')) + dst_ip = int(netaddr.IPAddress('192.168.100.1')) + prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64, + inet.IPPROTO_UDP, 0, src_ip, dst_ip) + + t = tcp(self.src_port, self.dst_port, self.seq, self.ack, + offset, self.bits, self.window_size, csum, self.urgent) + buf = t.serialize(bytearray(), prev) + res = struct.unpack(tcp._PACK_STR, str(buf)) + + eq_(res[0], self.src_port) + eq_(res[1], self.dst_port) + eq_(res[2], self.seq) + eq_(res[3], self.ack) + eq_(res[4], offset << 4) + eq_(res[5], self.bits) + eq_(res[6], self.window_size) + eq_(res[8], self.urgent) + + # checksum + ph = struct.pack('!IIBBH', src_ip, dst_ip, 0, 6, offset * 4) + d = ph + buf + bytearray() + s = packet_utils.checksum(d) + eq_(0, s) + + def test_serialize_option(self): + offset = 6 + csum = 0 + option = '\x01\x02' + + src_ip = int(netaddr.IPAddress('192.168.10.1')) + dst_ip = int(netaddr.IPAddress('192.168.100.1')) + prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64, + inet.IPPROTO_UDP, 0, src_ip, dst_ip) + + t = tcp(self.src_port, self.dst_port, self.seq, self.ack, + offset, self.bits, self.window_size, csum, self.urgent, + option) + buf = t.serialize(bytearray(), prev) + r_option = buf[tcp._MIN_LEN:tcp._MIN_LEN + len(option)] + eq_(option, r_option) + + @raises(Exception) + def test_malformed_tcp(self): + m_short_buf = self.buf[1:tcp._MIN_LEN] + tcp.parser(m_short_buf) diff --git a/ryu/tests/unit/packet/test_udp.py b/ryu/tests/unit/packet/test_udp.py new file mode 100644 index 00000000..e35d56cd --- /dev/null +++ b/ryu/tests/unit/packet/test_udp.py @@ -0,0 +1,96 @@ +# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +import unittest +import logging +import struct +import netaddr +from struct import * +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.ofproto import ether, inet +from ryu.lib import mac +from ryu.lib.packet.ethernet import ethernet +from ryu.lib.packet.packet import Packet +from ryu.lib.packet.udp import udp +from ryu.lib.packet.ipv4 import ipv4 +from ryu.lib.packet import packet_utils + + +LOG = logging.getLogger('test_udp') + + +class Test_udp(unittest.TestCase): + """ Test case for udp + """ + src_port = 6431 + dst_port = 8080 + total_length = 65507 + csum = 12345 + u = udp(src_port, dst_port, total_length, csum) + buf = pack(udp._PACK_STR, src_port, dst_port, total_length, csum) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_init(self): + eq_(self.src_port, self.u.src_port) + eq_(self.dst_port, self.u.dst_port) + eq_(self.total_length, self.u.total_length) + eq_(self.csum, self.u.csum) + + def test_parser(self): + r1, r2 = self.u.parser(self.buf) + + eq_(self.src_port, r1.src_port) + eq_(self.dst_port, r1.dst_port) + eq_(self.total_length, r1.total_length) + eq_(self.csum, r1.csum) + eq_(None, r2) + + def test_serialize(self): + src_port = 6431 + dst_port = 8080 + total_length = 0 + csum = 0 + + src_ip = int(netaddr.IPAddress('192.168.10.1')) + dst_ip = int(netaddr.IPAddress('192.168.100.1')) + prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64, + inet.IPPROTO_UDP, 0, src_ip, dst_ip) + + u = udp(src_port, dst_port, total_length, csum) + buf = u.serialize(bytearray(), prev) + res = struct.unpack(udp._PACK_STR, buf) + + eq_(res[0], src_port) + eq_(res[1], dst_port) + eq_(res[2], struct.calcsize(udp._PACK_STR)) + + # checksum + ph = struct.pack('!IIBBH', src_ip, dst_ip, 0, 17, res[2]) + d = ph + buf + bytearray() + s = packet_utils.checksum(d) + eq_(0, s) + + @raises(Exception) + def test_malformed_udp(self): + m_short_buf = self.buf[1:udp._MIN_LEN] + udp.parser(m_short_buf) diff --git a/ryu/tests/unit/packet/test_vlan.py b/ryu/tests/unit/packet/test_vlan.py index cf5ae425..aaf7422d 100644 --- a/ryu/tests/unit/packet/test_vlan.py +++ b/ryu/tests/unit/packet/test_vlan.py @@ -137,3 +137,8 @@ class Test_vlan(unittest.TestCase): eq_(v.vid, self.vid) eq_(v.ethertype, self.ethertype) eq_(v.length, self.length) + + @raises(Exception) + def test_malformed_vlan(self): + m_short_buf = self.buf[1:vlan._MIN_LEN] + vlan.parser(m_short_buf) |