diff options
Diffstat (limited to 'tests/unit/packet/test_udp.py')
-rw-r--r-- | tests/unit/packet/test_udp.py | 110 |
1 files changed, 110 insertions, 0 deletions
diff --git a/tests/unit/packet/test_udp.py b/tests/unit/packet/test_udp.py new file mode 100644 index 00000000..0d7d0aa9 --- /dev/null +++ b/tests/unit/packet/test_udp.py @@ -0,0 +1,110 @@ +# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +import unittest +import logging +import struct +from struct import * +from nose.tools import * +from ryu.ofproto import ether, inet +from ryu.lib.packet.packet import Packet +from ryu.lib.packet.udp import udp +from ryu.lib.packet.ipv4 import ipv4 +from ryu.lib.packet import packet_utils +from ryu.lib import addrconv + + +LOG = logging.getLogger('test_udp') + + +class Test_udp(unittest.TestCase): + """ Test case for udp + """ + src_port = 6431 + dst_port = 8080 + total_length = 65507 + csum = 12345 + u = udp(src_port, dst_port, total_length, csum) + buf = pack(udp._PACK_STR, src_port, dst_port, total_length, csum) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_init(self): + eq_(self.src_port, self.u.src_port) + eq_(self.dst_port, self.u.dst_port) + eq_(self.total_length, self.u.total_length) + eq_(self.csum, self.u.csum) + + def test_parser(self): + r1, r2, _ = self.u.parser(self.buf) + + eq_(self.src_port, r1.src_port) + eq_(self.dst_port, r1.dst_port) + eq_(self.total_length, r1.total_length) + eq_(self.csum, r1.csum) + eq_(None, r2) + + def test_serialize(self): + src_port = 6431 + dst_port = 8080 + total_length = 0 + csum = 0 + + src_ip = '192.168.10.1' + dst_ip = '192.168.100.1' + prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64, + inet.IPPROTO_UDP, 0, src_ip, dst_ip) + + u = udp(src_port, dst_port, total_length, csum) + buf = u.serialize(bytearray(), prev) + res = struct.unpack(udp._PACK_STR, buf) + + eq_(res[0], src_port) + eq_(res[1], dst_port) + eq_(res[2], struct.calcsize(udp._PACK_STR)) + + # checksum + ph = struct.pack('!4s4sBBH', + addrconv.ipv4.text_to_bin(src_ip), + addrconv.ipv4.text_to_bin(dst_ip), 0, 17, res[2]) + d = ph + buf + bytearray() + s = packet_utils.checksum(d) + eq_(0, s) + + @raises(Exception) + def test_malformed_udp(self): + m_short_buf = self.buf[1:udp._MIN_LEN] + udp.parser(m_short_buf) + + def test_default_args(self): + prev = ipv4(proto=inet.IPPROTO_UDP) + u = udp() + buf = u.serialize(bytearray(), prev) + res = struct.unpack(udp._PACK_STR, buf) + + eq_(res[0], 1) + eq_(res[1], 1) + eq_(res[2], udp._MIN_LEN) + + def test_json(self): + jsondict = self.u.to_jsondict() + u = udp.from_jsondict(jsondict['udp']) + eq_(str(self.u), str(u)) |