diff options
author | Yuichi Ito <ito.yuichi0@gmail.com> | 2013-11-01 14:16:49 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2013-11-04 13:22:49 +0900 |
commit | 75586c6a57ce1dfd7ae3fad34168031a0b74ae1f (patch) | |
tree | 5affb2b1005a63f6cc559d68e7b128d19187e4d9 | |
parent | 0f7d5353e8ca350115e4ad8147fe40f9c88f778a (diff) |
packet lib: add unittests that use default parameters of IPv4/6 and TCP/UDP/SCTP
Signed-off-by: Yuichi Ito <ito.yuichi0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/tests/unit/packet/test_packet.py | 562 |
1 files changed, 562 insertions, 0 deletions
diff --git a/ryu/tests/unit/packet/test_packet.py b/ryu/tests/unit/packet/test_packet.py index 03e036dc..a97c6e85 100644 --- a/ryu/tests/unit/packet/test_packet.py +++ b/ryu/tests/unit/packet/test_packet.py @@ -562,6 +562,568 @@ class TestPacket(unittest.TestCase): eq_(pkt_str, str(pkt)) eq_(pkt_str, repr(pkt)) + def test_ipv4_sctp(self): + # build packet + e = ethernet.ethernet(self.dst_mac, self.src_mac, + ether.ETH_TYPE_IP) + ip = ipv4.ipv4(proto=inet.IPPROTO_SCTP) + s = sctp.sctp(chunks=[sctp.chunk_data(payload_data=self.payload)]) + + p = e/ip/s + p.serialize() + + ipaddr = addrconv.ipv4.text_to_bin('0.0.0.0') + + # ethernet !6s6sH + e_buf = self.dst_mac_bin \ + + self.src_mac_bin \ + + '\x08\x00' + + # ipv4 !BBHHHBBHII + ip_buf = '\x45' \ + + '\x00' \ + + '\x00\x50' \ + + '\x00\x00' \ + + '\x00\x00' \ + + '\xff' \ + + '\x84' \ + + '\x00\x00' \ + + ipaddr \ + + ipaddr + + # sctp !HHII + chunk_data !BBHIHHI + payload + s_buf = '\x00\x00' \ + + '\x00\x00' \ + + '\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00' \ + + '\x00' \ + + '\x00' \ + + '\x00\x00' \ + + '\x00\x00\x00\x00' \ + + '\x00\x00' \ + + '\x00\x00' \ + + '\x00\x00\x00\x00' \ + + self.payload + + buf = e_buf + ip_buf + s_buf + + # parse + pkt = packet.Packet(array.array('B', p.data)) + protocols = self.get_protocols(pkt) + p_eth = protocols['ethernet'] + p_ipv4 = protocols['ipv4'] + p_sctp = protocols['sctp'] + + # ethernet + ok_(p_eth) + eq_(self.dst_mac, p_eth.dst) + eq_(self.src_mac, p_eth.src) + eq_(ether.ETH_TYPE_IP, p_eth.ethertype) + + # ipv4 + ok_(p_ipv4) + eq_(4, p_ipv4.version) + eq_(5, p_ipv4.header_length) + eq_(0, p_ipv4.tos) + l = len(ip_buf) + len(s_buf) + eq_(l, p_ipv4.total_length) + eq_(0, p_ipv4.identification) + eq_(0, p_ipv4.flags) + eq_(255, p_ipv4.ttl) + eq_(inet.IPPROTO_SCTP, p_ipv4.proto) + eq_('0.0.0.0', p_ipv4.src) + eq_('0.0.0.0', p_ipv4.dst) + t = bytearray(ip_buf) + struct.pack_into('!H', t, 10, p_ipv4.csum) + eq_(packet_utils.checksum(t), 0) + + # sctp + ok_(p_sctp) + eq_(0, p_sctp.src_port) + eq_(0, p_sctp.dst_port) + eq_(0, p_sctp.vtag) + assert isinstance(p_sctp.chunks[0], sctp.chunk_data) + eq_(0, p_sctp.chunks[0]._type) + eq_(0, p_sctp.chunks[0].unordered) + eq_(0, p_sctp.chunks[0].begin) + eq_(0, p_sctp.chunks[0].end) + eq_(16 + len(self.payload), p_sctp.chunks[0].length) + eq_(0, p_sctp.chunks[0].tsn) + eq_(0, p_sctp.chunks[0].sid) + eq_(0, p_sctp.chunks[0].seq) + eq_(0, p_sctp.chunks[0].payload_id) + eq_(self.payload, p_sctp.chunks[0].payload_data) + eq_(len(s_buf), len(p_sctp)) + + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_IP} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ipv4_values = {'version': 4, + 'header_length': 5, + 'tos': 0, + 'total_length': l, + 'identification': 0, + 'flags': 0, + 'offset': 0, + 'ttl': 255, + 'proto': inet.IPPROTO_SCTP, + 'csum': p_ipv4.csum, + 'src': '0.0.0.0', + 'dst': '0.0.0.0', + 'option': None} + _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k])) + for k, v in inspect.getmembers(p_ipv4) + if k in ipv4_values]) + ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str) + + data_values = {'unordered': 0, + 'begin': 0, + 'end': 0, + 'length': 16 + len(self.payload), + 'tsn': 0, + 'sid': 0, + 'seq': 0, + 'payload_id': 0, + 'payload_data': self.payload} + _data_str = ','.join(['%s=%s' % (k, repr(data_values[k])) + for k in sorted(data_values.keys())]) + data_str = '[%s(%s)]' % (sctp.chunk_data.__name__, _data_str) + + sctp_values = {'src_port': 0, + 'dst_port': 0, + 'vtag': 0, + 'csum': p_sctp.csum, + 'chunks': data_str} + _sctp_str = ','.join(['%s=%s' % (k, sctp_values[k]) + for k, _ in inspect.getmembers(p_sctp) + if k in sctp_values]) + sctp_str = '%s(%s)' % (sctp.sctp.__name__, _sctp_str) + + pkt_str = '%s, %s, %s' % (eth_str, ipv4_str, sctp_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv4_str, str(p_ipv4)) + eq_(ipv4_str, repr(p_ipv4)) + + eq_(sctp_str, str(p_sctp)) + eq_(sctp_str, repr(p_sctp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + + def test_ipv6_udp(self): + # build packet + e = ethernet.ethernet(self.dst_mac, self.src_mac, + ether.ETH_TYPE_IPV6) + ip = ipv6.ipv6(nxt=inet.IPPROTO_UDP) + u = udp.udp() + + p = e/ip/u/self.payload + p.serialize() + + ipaddr = addrconv.ipv6.text_to_bin('::') + + # ethernet !6s6sH + e_buf = self.dst_mac_bin \ + + self.src_mac_bin \ + + '\x86\xdd' + + # ipv6 !IHBB16s16s' + ip_buf = '\x60\x00\x00\x00' \ + + '\x00\x00' \ + + '\x11' \ + + '\x00' \ + + '\x00\x00' \ + + ipaddr \ + + ipaddr + + # udp !HHHH + u_buf = '\x00\x00' \ + + '\x00\x00' \ + + '\x00\x28' \ + + '\x00\x00' + + buf = e_buf + ip_buf + u_buf + self.payload + + # parse + pkt = packet.Packet(array.array('B', p.data)) + protocols = self.get_protocols(pkt) + p_eth = protocols['ethernet'] + p_ipv6 = protocols['ipv6'] + p_udp = protocols['udp'] + + # ethernet + ok_(p_eth) + eq_(self.dst_mac, p_eth.dst) + eq_(self.src_mac, p_eth.src) + eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype) + + # ipv6 + ok_(p_ipv6) + eq_(6, p_ipv6.version) + eq_(0, p_ipv6.traffic_class) + eq_(0, p_ipv6.flow_label) + eq_(len(u_buf) + len(self.payload), p_ipv6.payload_length) + eq_(inet.IPPROTO_UDP, p_ipv6.nxt) + eq_(0, p_ipv6.hop_limit) + eq_('::', p_ipv6.src) + eq_('::', p_ipv6.dst) + + # udp + ok_(p_udp) + eq_(0, p_udp.src_port) + eq_(0, p_udp.dst_port) + eq_(len(u_buf) + len(self.payload), p_udp.total_length) + eq_(0x2bc2, p_udp.csum) + t = bytearray(u_buf) + struct.pack_into('!H', t, 6, p_udp.csum) + ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr, + len(u_buf) + len(self.payload), 17) + t = ph + t + self.payload + eq_(packet_utils.checksum(t), 0) + + # payload + ok_('payload' in protocols) + eq_(self.payload, protocols['payload'].tostring()) + + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_IPV6} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ipv6_values = {'version': 6, + 'traffic_class': 0, + 'flow_label': 0, + 'payload_length': len(u_buf) + len(self.payload), + 'nxt': inet.IPPROTO_UDP, + 'hop_limit': 0, + 'src': '::', + 'dst': '::', + 'ext_hdrs': []} + _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k])) + for k, v in inspect.getmembers(p_ipv6) + if k in ipv6_values]) + ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str) + + udp_values = {'src_port': 0, + 'dst_port': 0, + 'total_length': len(u_buf) + len(self.payload), + 'csum': 0x2bc2} + _udp_str = ','.join(['%s=%s' % (k, repr(udp_values[k])) + for k, v in inspect.getmembers(p_udp) + if k in udp_values]) + udp_str = '%s(%s)' % (udp.udp.__name__, _udp_str) + + pkt_str = '%s, %s, %s, %s' % (eth_str, ipv6_str, udp_str, + repr(protocols['payload'])) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv6_str, str(p_ipv6)) + eq_(ipv6_str, repr(p_ipv6)) + + eq_(udp_str, str(p_udp)) + eq_(udp_str, repr(p_udp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + + def test_ipv6_tcp(self): + # build packet + e = ethernet.ethernet(self.dst_mac, self.src_mac, + ether.ETH_TYPE_IPV6) + ip = ipv6.ipv6(nxt=inet.IPPROTO_TCP) + t = tcp.tcp(option='\x01\x02') + + p = e/ip/t/self.payload + p.serialize() + + ipaddr = addrconv.ipv6.text_to_bin('::') + + # ethernet !6s6sH + e_buf = self.dst_mac_bin \ + + self.src_mac_bin \ + + '\x86\xdd' + + # ipv6 !IHBB16s16s' + ip_buf = '\x60\x00\x00\x00' \ + + '\x00\x00' \ + + '\x06' \ + + '\x00' \ + + '\x00\x00' \ + + ipaddr \ + + ipaddr + + # tcp !HHIIBBHHH + option + t_buf = '\x00\x00' \ + + '\x00\x00' \ + + '\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00' \ + + '\x60' \ + + '\x00' \ + + '\x00\x00' \ + + '\x00\x00' \ + + '\x00\x00' \ + + '\x01\x02\x00\x00' + + buf = e_buf + ip_buf + t_buf + self.payload + + # parse + pkt = packet.Packet(array.array('B', p.data)) + protocols = self.get_protocols(pkt) + p_eth = protocols['ethernet'] + p_ipv6 = protocols['ipv6'] + p_tcp = protocols['tcp'] + + # ethernet + ok_(p_eth) + eq_(self.dst_mac, p_eth.dst) + eq_(self.src_mac, p_eth.src) + eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype) + + # ipv6 + ok_(p_ipv6) + eq_(6, p_ipv6.version) + eq_(0, p_ipv6.traffic_class) + eq_(0, p_ipv6.flow_label) + eq_(len(t_buf) + len(self.payload), p_ipv6.payload_length) + eq_(inet.IPPROTO_TCP, p_ipv6.nxt) + eq_(0, p_ipv6.hop_limit) + eq_('::', p_ipv6.src) + eq_('::', p_ipv6.dst) + + # tcp + ok_(p_tcp) + eq_(0, p_tcp.src_port) + eq_(0, p_tcp.dst_port) + eq_(0, p_tcp.seq) + eq_(0, p_tcp.ack) + eq_(6, p_tcp.offset) + eq_(0, p_tcp.bits) + eq_(0, p_tcp.window_size) + eq_(0, p_tcp.urgent) + eq_(len(t_buf), len(p_tcp)) + t = bytearray(t_buf) + struct.pack_into('!H', t, 16, p_tcp.csum) + ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr, + len(t_buf) + len(self.payload), 6) + t = ph + t + self.payload + eq_(packet_utils.checksum(t), 0) + + # payload + ok_('payload' in protocols) + eq_(self.payload, protocols['payload'].tostring()) + + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_IPV6} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ipv6_values = {'version': 6, + 'traffic_class': 0, + 'flow_label': 0, + 'payload_length': len(t_buf) + len(self.payload), + 'nxt': inet.IPPROTO_TCP, + 'hop_limit': 0, + 'src': '::', + 'dst': '::', + 'ext_hdrs': []} + _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k])) + for k, v in inspect.getmembers(p_ipv6) + if k in ipv6_values]) + ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str) + + tcp_values = {'src_port': 0, + 'dst_port': 0, + 'seq': 0, + 'ack': 0, + 'offset': 6, + 'bits': 0, + 'window_size': 0, + 'csum': p_tcp.csum, + 'urgent': 0, + 'option': p_tcp.option} + _tcp_str = ','.join(['%s=%s' % (k, repr(tcp_values[k])) + for k, v in inspect.getmembers(p_tcp) + if k in tcp_values]) + tcp_str = '%s(%s)' % (tcp.tcp.__name__, _tcp_str) + + pkt_str = '%s, %s, %s, %s' % (eth_str, ipv6_str, tcp_str, + repr(protocols['payload'])) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv6_str, str(p_ipv6)) + eq_(ipv6_str, repr(p_ipv6)) + + eq_(tcp_str, str(p_tcp)) + eq_(tcp_str, repr(p_tcp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + + def test_ipv6_sctp(self): + # build packet + e = ethernet.ethernet(self.dst_mac, self.src_mac, + ether.ETH_TYPE_IPV6) + ip = ipv6.ipv6(nxt=inet.IPPROTO_SCTP) + s = sctp.sctp(chunks=[sctp.chunk_data(payload_data=self.payload)]) + + p = e/ip/s + p.serialize() + + ipaddr = addrconv.ipv6.text_to_bin('::') + + # ethernet !6s6sH + e_buf = self.dst_mac_bin \ + + self.src_mac_bin \ + + '\x86\xdd' + + # ipv6 !IHBB16s16s' + ip_buf = '\x60\x00\x00\x00' \ + + '\x00\x00' \ + + '\x84' \ + + '\x00' \ + + '\x00\x00' \ + + ipaddr \ + + ipaddr + + # sctp !HHII + chunk_data !BBHIHHI + payload + s_buf = '\x00\x00' \ + + '\x00\x00' \ + + '\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00' \ + + '\x00' \ + + '\x00' \ + + '\x00\x00' \ + + '\x00\x00\x00\x00' \ + + '\x00\x00' \ + + '\x00\x00' \ + + '\x00\x00\x00\x00' \ + + self.payload + + buf = e_buf + ip_buf + s_buf + + # parse + pkt = packet.Packet(array.array('B', p.data)) + protocols = self.get_protocols(pkt) + p_eth = protocols['ethernet'] + p_ipv6 = protocols['ipv6'] + p_sctp = protocols['sctp'] + + # ethernet + ok_(p_eth) + eq_(self.dst_mac, p_eth.dst) + eq_(self.src_mac, p_eth.src) + eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype) + + # ipv6 + ok_(p_ipv6) + eq_(6, p_ipv6.version) + eq_(0, p_ipv6.traffic_class) + eq_(0, p_ipv6.flow_label) + eq_(len(s_buf), p_ipv6.payload_length) + eq_(inet.IPPROTO_SCTP, p_ipv6.nxt) + eq_(0, p_ipv6.hop_limit) + eq_('::', p_ipv6.src) + eq_('::', p_ipv6.dst) + + # sctp + ok_(p_sctp) + eq_(0, p_sctp.src_port) + eq_(0, p_sctp.dst_port) + eq_(0, p_sctp.vtag) + assert isinstance(p_sctp.chunks[0], sctp.chunk_data) + eq_(0, p_sctp.chunks[0]._type) + eq_(0, p_sctp.chunks[0].unordered) + eq_(0, p_sctp.chunks[0].begin) + eq_(0, p_sctp.chunks[0].end) + eq_(16 + len(self.payload), p_sctp.chunks[0].length) + eq_(0, p_sctp.chunks[0].tsn) + eq_(0, p_sctp.chunks[0].sid) + eq_(0, p_sctp.chunks[0].seq) + eq_(0, p_sctp.chunks[0].payload_id) + eq_(self.payload, p_sctp.chunks[0].payload_data) + eq_(len(s_buf), len(p_sctp)) + + # to string + eth_values = {'dst': self.dst_mac, + 'src': self.src_mac, + 'ethertype': ether.ETH_TYPE_IPV6} + _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k])) + for k, v in inspect.getmembers(p_eth) + if k in eth_values]) + eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str) + + ipv6_values = {'version': 6, + 'traffic_class': 0, + 'flow_label': 0, + 'payload_length': len(s_buf), + 'nxt': inet.IPPROTO_SCTP, + 'hop_limit': 0, + 'src': '::', + 'dst': '::', + 'ext_hdrs': []} + _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k])) + for k, v in inspect.getmembers(p_ipv6) + if k in ipv6_values]) + ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str) + + data_values = {'unordered': 0, + 'begin': 0, + 'end': 0, + 'length': 16 + len(self.payload), + 'tsn': 0, + 'sid': 0, + 'seq': 0, + 'payload_id': 0, + 'payload_data': self.payload} + _data_str = ','.join(['%s=%s' % (k, repr(data_values[k])) + for k in sorted(data_values.keys())]) + data_str = '[%s(%s)]' % (sctp.chunk_data.__name__, _data_str) + + sctp_values = {'src_port': 0, + 'dst_port': 0, + 'vtag': 0, + 'csum': p_sctp.csum, + 'chunks': data_str} + _sctp_str = ','.join(['%s=%s' % (k, sctp_values[k]) + for k, _ in inspect.getmembers(p_sctp) + if k in sctp_values]) + sctp_str = '%s(%s)' % (sctp.sctp.__name__, _sctp_str) + + pkt_str = '%s, %s, %s' % (eth_str, ipv6_str, sctp_str) + + eq_(eth_str, str(p_eth)) + eq_(eth_str, repr(p_eth)) + + eq_(ipv6_str, str(p_ipv6)) + eq_(ipv6_str, repr(p_ipv6)) + + eq_(sctp_str, str(p_sctp)) + eq_(sctp_str, repr(p_sctp)) + + eq_(pkt_str, str(pkt)) + eq_(pkt_str, repr(pkt)) + def test_llc_bpdu(self): # buid packet e = ethernet.ethernet(self.dst_mac, self.src_mac, |