diff options
-rw-r--r-- | ryu/tests/unit/packet/test_tcp.py | 143 |
1 files changed, 110 insertions, 33 deletions
diff --git a/ryu/tests/unit/packet/test_tcp.py b/ryu/tests/unit/packet/test_tcp.py index 862ad6f4..7224df0e 100644 --- a/ryu/tests/unit/packet/test_tcp.py +++ b/ryu/tests/unit/packet/test_tcp.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# Copyright (C) 2012-2015 Nippon Telegraph and Telephone Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,9 +21,8 @@ import six import struct from struct import * from nose.tools import * -from ryu.ofproto import ether, inet -from ryu.lib.packet.packet import Packet -from ryu.lib.packet.tcp import tcp +from ryu.ofproto import inet +from ryu.lib.packet import tcp from ryu.lib.packet.ipv4 import ipv4 from ryu.lib.packet import packet_utils from ryu.lib import addrconv @@ -46,10 +45,10 @@ class Test_tcp(unittest.TestCase): urgent = 128 option = b'\x01\x02\x03\x04' - t = tcp(src_port, dst_port, seq, ack, offset, bits, - window_size, csum, urgent, option) + t = tcp.tcp(src_port, dst_port, seq, ack, offset, bits, + window_size, csum, urgent, option) - buf = pack(tcp._PACK_STR, src_port, dst_port, seq, ack, + buf = pack(tcp.tcp._PACK_STR, src_port, dst_port, seq, ack, offset << 4, bits, window_size, csum, urgent) buf += option @@ -95,10 +94,10 @@ class Test_tcp(unittest.TestCase): prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64, inet.IPPROTO_TCP, 0, src_ip, dst_ip) - t = tcp(self.src_port, self.dst_port, self.seq, self.ack, - offset, self.bits, self.window_size, csum, self.urgent) + t = tcp.tcp(self.src_port, self.dst_port, self.seq, self.ack, + offset, self.bits, self.window_size, csum, self.urgent) buf = t.serialize(bytearray(), prev) - res = struct.unpack(tcp._PACK_STR, six.binary_type(buf)) + res = struct.unpack(tcp.tcp._PACK_STR, six.binary_type(buf)) eq_(res[0], self.src_port) eq_(res[1], self.dst_port) @@ -109,41 +108,63 @@ class Test_tcp(unittest.TestCase): eq_(res[6], self.window_size) eq_(res[8], self.urgent) + # test __len__ + # offset indicates the number of 32 bit (= 4 bytes) + # words in the TCP Header. + # So, we compare len(tcp) with offset * 4, here. + eq_(offset * 4, len(t)) + # checksum ph = struct.pack('!4s4sBBH', addrconv.ipv4.text_to_bin(src_ip), addrconv.ipv4.text_to_bin(dst_ip), 0, 6, offset * 4) - d = ph + buf + bytearray() + d = ph + buf s = packet_utils.checksum(d) eq_(0, s) def test_serialize_option(self): - offset = 6 + # prepare test data + offset = 0 csum = 0 - option = b'\x01\x02' - - src_ip = '192.168.10.1' - dst_ip = '192.168.100.1' + option = [ + tcp.TCPOptionMaximumSegmentSize(max_seg_size=1460), + tcp.TCPOptionSACKPermitted(), + tcp.TCPOptionTimestamps(ts_val=287454020, ts_ecr=1432778632), + tcp.TCPOptionNoOperation(), + tcp.TCPOptionWindowScale(shift_cnt=9), + ] + option_buf = ( + b'\x02\x04\x05\xb4' + b'\x04\x02' + b'\x08\x0a\x11\x22\x33\x44\x55\x66\x77\x88' + b'\x01' + b'\x03\x03\x09' + ) prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64, - inet.IPPROTO_TCP, 0, src_ip, dst_ip) + inet.IPPROTO_TCP, 0, '192.168.10.1', '192.168.100.1') - t = tcp(self.src_port, self.dst_port, self.seq, self.ack, - offset, self.bits, self.window_size, csum, self.urgent, - option) + # test serializer + t = tcp.tcp(self.src_port, self.dst_port, self.seq, self.ack, + offset, self.bits, self.window_size, csum, self.urgent, + option) buf = t.serialize(bytearray(), prev) - r_option = buf[tcp._MIN_LEN:tcp._MIN_LEN + len(option)] - eq_(option, r_option) + r_option_buf = buf[tcp.tcp._MIN_LEN:tcp.tcp._MIN_LEN + len(option_buf)] + eq_(option_buf, r_option_buf) + + # test parser + (r_tcp, _, _) = tcp.tcp.parser(buf) + eq_(str(option), str(r_tcp.option)) @raises(Exception) def test_malformed_tcp(self): - m_short_buf = self.buf[1:tcp._MIN_LEN] - tcp.parser(m_short_buf) + m_short_buf = self.buf[1:tcp.tcp._MIN_LEN] + tcp.tcp.parser(m_short_buf) def test_default_args(self): prev = ipv4(proto=inet.IPPROTO_TCP) - t = tcp() + t = tcp.tcp() buf = t.serialize(bytearray(), prev) - res = struct.unpack(tcp._PACK_STR, buf) + res = struct.unpack(tcp.tcp._PACK_STR, buf) eq_(res[0], 1) eq_(res[1], 1) @@ -155,9 +176,9 @@ class Test_tcp(unittest.TestCase): eq_(res[8], 0) # with option, without offset - t = tcp(option=b'\x01\x02\x03') + t = tcp.tcp(option=[tcp.TCPOptionMaximumSegmentSize(1460)]) buf = t.serialize(bytearray(), prev) - res = struct.unpack(tcp._PACK_STR + '4s', buf) + res = struct.unpack(tcp.tcp._PACK_STR + '4s', buf) eq_(res[0], 1) eq_(res[1], 1) @@ -167,12 +188,12 @@ class Test_tcp(unittest.TestCase): eq_(res[5], 0) eq_(res[6], 0) eq_(res[8], 0) - eq_(res[9], b'\x01\x02\x03\x00') + eq_(res[9], b'\x02\x04\x05\xb4') # with option, with long offset - t = tcp(offset=7, option=b'\x01\x02\x03') + t = tcp.tcp(offset=7, option=[tcp.TCPOptionWindowScale(shift_cnt=9)]) buf = t.serialize(bytearray(), prev) - res = struct.unpack(tcp._PACK_STR + '8s', buf) + res = struct.unpack(tcp.tcp._PACK_STR + '8s', buf) eq_(res[0], 1) eq_(res[1], 1) @@ -182,9 +203,65 @@ class Test_tcp(unittest.TestCase): eq_(res[5], 0) eq_(res[6], 0) eq_(res[8], 0) - eq_(res[9], b'\x01\x02\x03\x00\x00\x00\x00\x00') + eq_(res[9], b'\x03\x03\x09\x00\x00\x00\x00\x00') def test_json(self): jsondict = self.t.to_jsondict() - t = tcp.from_jsondict(jsondict['tcp']) + t = tcp.tcp.from_jsondict(jsondict['tcp']) eq_(str(self.t), str(t)) + + +class Test_TCPOption(unittest.TestCase): + # prepare test data + input_options = [ + tcp.TCPOptionEndOfOptionList(), + tcp.TCPOptionNoOperation(), + tcp.TCPOptionMaximumSegmentSize(max_seg_size=1460), + tcp.TCPOptionWindowScale(shift_cnt=9), + tcp.TCPOptionSACKPermitted(), + tcp.TCPOptionSACK(blocks=[(1, 2), (3, 4)], length=18), + tcp.TCPOptionTimestamps(ts_val=287454020, ts_ecr=1432778632), + tcp.TCPOptionUserTimeout(granularity=1, user_timeout=564), + tcp.TCPOptionAuthentication( + key_id=1, r_next_key_id=2, + mac=b'abcdefghijkl', length=16), + tcp.TCPOptionUnknown(value=b'foobar', kind=255, length=8), + tcp.TCPOptionUnknown(value=b'', kind=255, length=2), + ] + input_buf = ( + b'\x00' # End of Option List + b'\x01' # No-Operation + b'\x02\x04\x05\xb4' # Maximum Segment Size + b'\x03\x03\x09' # Window Scale + b'\x04\x02' # SACK Permitted + b'\x05\x12' # SACK + b'\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03\x00\x00\x00\x04' + b'\x08\x0a' # Timestamps + b'\x11\x22\x33\x44\x55\x66\x77\x88' + b'\x1c\x04\x82\x34' # User Timeout Option + b'\x1d\x10\x01\x02' # TCP Authentication Option (TCP-AO) + b'abcdefghijkl' + b'\xff\x08' # Unknown with body + b'foobar' + b'\xff\x02' # Unknown + ) + + def test_serialize(self): + output_buf = bytearray() + for option in self.input_options: + output_buf += option.serialize() + eq_(self.input_buf, output_buf) + + def test_parser(self): + buf = self.input_buf + output_options = [] + while buf: + opt, buf = tcp.TCPOption.parser(buf) + output_options.append(opt) + eq_(str(self.input_options), str(output_options)) + + def test_json(self): + for option in self.input_options: + json_dict = option.to_jsondict()[option.__class__.__name__] + output_option = option.__class__.from_jsondict(json_dict) + eq_(str(option), str(output_option)) |