summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorYuichi Ito <ito.yuichi0@gmail.com>2013-11-01 14:16:49 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2013-11-04 13:22:49 +0900
commit75586c6a57ce1dfd7ae3fad34168031a0b74ae1f (patch)
tree5affb2b1005a63f6cc559d68e7b128d19187e4d9
parent0f7d5353e8ca350115e4ad8147fe40f9c88f778a (diff)
packet lib: add unittests that use default parameters of IPv4/6 and TCP/UDP/SCTP
Signed-off-by: Yuichi Ito <ito.yuichi0@gmail.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/tests/unit/packet/test_packet.py562
1 files changed, 562 insertions, 0 deletions
diff --git a/ryu/tests/unit/packet/test_packet.py b/ryu/tests/unit/packet/test_packet.py
index 03e036dc..a97c6e85 100644
--- a/ryu/tests/unit/packet/test_packet.py
+++ b/ryu/tests/unit/packet/test_packet.py
@@ -562,6 +562,568 @@ class TestPacket(unittest.TestCase):
eq_(pkt_str, str(pkt))
eq_(pkt_str, repr(pkt))
+ def test_ipv4_sctp(self):
+ # build packet
+ e = ethernet.ethernet(self.dst_mac, self.src_mac,
+ ether.ETH_TYPE_IP)
+ ip = ipv4.ipv4(proto=inet.IPPROTO_SCTP)
+ s = sctp.sctp(chunks=[sctp.chunk_data(payload_data=self.payload)])
+
+ p = e/ip/s
+ p.serialize()
+
+ ipaddr = addrconv.ipv4.text_to_bin('0.0.0.0')
+
+ # ethernet !6s6sH
+ e_buf = self.dst_mac_bin \
+ + self.src_mac_bin \
+ + '\x08\x00'
+
+ # ipv4 !BBHHHBBHII
+ ip_buf = '\x45' \
+ + '\x00' \
+ + '\x00\x50' \
+ + '\x00\x00' \
+ + '\x00\x00' \
+ + '\xff' \
+ + '\x84' \
+ + '\x00\x00' \
+ + ipaddr \
+ + ipaddr
+
+ # sctp !HHII + chunk_data !BBHIHHI + payload
+ s_buf = '\x00\x00' \
+ + '\x00\x00' \
+ + '\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00' \
+ + '\x00' \
+ + '\x00' \
+ + '\x00\x00' \
+ + '\x00\x00\x00\x00' \
+ + '\x00\x00' \
+ + '\x00\x00' \
+ + '\x00\x00\x00\x00' \
+ + self.payload
+
+ buf = e_buf + ip_buf + s_buf
+
+ # parse
+ pkt = packet.Packet(array.array('B', p.data))
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv4 = protocols['ipv4']
+ p_sctp = protocols['sctp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_(self.dst_mac, p_eth.dst)
+ eq_(self.src_mac, p_eth.src)
+ eq_(ether.ETH_TYPE_IP, p_eth.ethertype)
+
+ # ipv4
+ ok_(p_ipv4)
+ eq_(4, p_ipv4.version)
+ eq_(5, p_ipv4.header_length)
+ eq_(0, p_ipv4.tos)
+ l = len(ip_buf) + len(s_buf)
+ eq_(l, p_ipv4.total_length)
+ eq_(0, p_ipv4.identification)
+ eq_(0, p_ipv4.flags)
+ eq_(255, p_ipv4.ttl)
+ eq_(inet.IPPROTO_SCTP, p_ipv4.proto)
+ eq_('0.0.0.0', p_ipv4.src)
+ eq_('0.0.0.0', p_ipv4.dst)
+ t = bytearray(ip_buf)
+ struct.pack_into('!H', t, 10, p_ipv4.csum)
+ eq_(packet_utils.checksum(t), 0)
+
+ # sctp
+ ok_(p_sctp)
+ eq_(0, p_sctp.src_port)
+ eq_(0, p_sctp.dst_port)
+ eq_(0, p_sctp.vtag)
+ assert isinstance(p_sctp.chunks[0], sctp.chunk_data)
+ eq_(0, p_sctp.chunks[0]._type)
+ eq_(0, p_sctp.chunks[0].unordered)
+ eq_(0, p_sctp.chunks[0].begin)
+ eq_(0, p_sctp.chunks[0].end)
+ eq_(16 + len(self.payload), p_sctp.chunks[0].length)
+ eq_(0, p_sctp.chunks[0].tsn)
+ eq_(0, p_sctp.chunks[0].sid)
+ eq_(0, p_sctp.chunks[0].seq)
+ eq_(0, p_sctp.chunks[0].payload_id)
+ eq_(self.payload, p_sctp.chunks[0].payload_data)
+ eq_(len(s_buf), len(p_sctp))
+
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_IP}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv4_values = {'version': 4,
+ 'header_length': 5,
+ 'tos': 0,
+ 'total_length': l,
+ 'identification': 0,
+ 'flags': 0,
+ 'offset': 0,
+ 'ttl': 255,
+ 'proto': inet.IPPROTO_SCTP,
+ 'csum': p_ipv4.csum,
+ 'src': '0.0.0.0',
+ 'dst': '0.0.0.0',
+ 'option': None}
+ _ipv4_str = ','.join(['%s=%s' % (k, repr(ipv4_values[k]))
+ for k, v in inspect.getmembers(p_ipv4)
+ if k in ipv4_values])
+ ipv4_str = '%s(%s)' % (ipv4.ipv4.__name__, _ipv4_str)
+
+ data_values = {'unordered': 0,
+ 'begin': 0,
+ 'end': 0,
+ 'length': 16 + len(self.payload),
+ 'tsn': 0,
+ 'sid': 0,
+ 'seq': 0,
+ 'payload_id': 0,
+ 'payload_data': self.payload}
+ _data_str = ','.join(['%s=%s' % (k, repr(data_values[k]))
+ for k in sorted(data_values.keys())])
+ data_str = '[%s(%s)]' % (sctp.chunk_data.__name__, _data_str)
+
+ sctp_values = {'src_port': 0,
+ 'dst_port': 0,
+ 'vtag': 0,
+ 'csum': p_sctp.csum,
+ 'chunks': data_str}
+ _sctp_str = ','.join(['%s=%s' % (k, sctp_values[k])
+ for k, _ in inspect.getmembers(p_sctp)
+ if k in sctp_values])
+ sctp_str = '%s(%s)' % (sctp.sctp.__name__, _sctp_str)
+
+ pkt_str = '%s, %s, %s' % (eth_str, ipv4_str, sctp_str)
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv4_str, str(p_ipv4))
+ eq_(ipv4_str, repr(p_ipv4))
+
+ eq_(sctp_str, str(p_sctp))
+ eq_(sctp_str, repr(p_sctp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_ipv6_udp(self):
+ # build packet
+ e = ethernet.ethernet(self.dst_mac, self.src_mac,
+ ether.ETH_TYPE_IPV6)
+ ip = ipv6.ipv6(nxt=inet.IPPROTO_UDP)
+ u = udp.udp()
+
+ p = e/ip/u/self.payload
+ p.serialize()
+
+ ipaddr = addrconv.ipv6.text_to_bin('::')
+
+ # ethernet !6s6sH
+ e_buf = self.dst_mac_bin \
+ + self.src_mac_bin \
+ + '\x86\xdd'
+
+ # ipv6 !IHBB16s16s'
+ ip_buf = '\x60\x00\x00\x00' \
+ + '\x00\x00' \
+ + '\x11' \
+ + '\x00' \
+ + '\x00\x00' \
+ + ipaddr \
+ + ipaddr
+
+ # udp !HHHH
+ u_buf = '\x00\x00' \
+ + '\x00\x00' \
+ + '\x00\x28' \
+ + '\x00\x00'
+
+ buf = e_buf + ip_buf + u_buf + self.payload
+
+ # parse
+ pkt = packet.Packet(array.array('B', p.data))
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv6 = protocols['ipv6']
+ p_udp = protocols['udp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_(self.dst_mac, p_eth.dst)
+ eq_(self.src_mac, p_eth.src)
+ eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
+
+ # ipv6
+ ok_(p_ipv6)
+ eq_(6, p_ipv6.version)
+ eq_(0, p_ipv6.traffic_class)
+ eq_(0, p_ipv6.flow_label)
+ eq_(len(u_buf) + len(self.payload), p_ipv6.payload_length)
+ eq_(inet.IPPROTO_UDP, p_ipv6.nxt)
+ eq_(0, p_ipv6.hop_limit)
+ eq_('::', p_ipv6.src)
+ eq_('::', p_ipv6.dst)
+
+ # udp
+ ok_(p_udp)
+ eq_(0, p_udp.src_port)
+ eq_(0, p_udp.dst_port)
+ eq_(len(u_buf) + len(self.payload), p_udp.total_length)
+ eq_(0x2bc2, p_udp.csum)
+ t = bytearray(u_buf)
+ struct.pack_into('!H', t, 6, p_udp.csum)
+ ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr,
+ len(u_buf) + len(self.payload), 17)
+ t = ph + t + self.payload
+ eq_(packet_utils.checksum(t), 0)
+
+ # payload
+ ok_('payload' in protocols)
+ eq_(self.payload, protocols['payload'].tostring())
+
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_IPV6}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv6_values = {'version': 6,
+ 'traffic_class': 0,
+ 'flow_label': 0,
+ 'payload_length': len(u_buf) + len(self.payload),
+ 'nxt': inet.IPPROTO_UDP,
+ 'hop_limit': 0,
+ 'src': '::',
+ 'dst': '::',
+ 'ext_hdrs': []}
+ _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
+ for k, v in inspect.getmembers(p_ipv6)
+ if k in ipv6_values])
+ ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+ udp_values = {'src_port': 0,
+ 'dst_port': 0,
+ 'total_length': len(u_buf) + len(self.payload),
+ 'csum': 0x2bc2}
+ _udp_str = ','.join(['%s=%s' % (k, repr(udp_values[k]))
+ for k, v in inspect.getmembers(p_udp)
+ if k in udp_values])
+ udp_str = '%s(%s)' % (udp.udp.__name__, _udp_str)
+
+ pkt_str = '%s, %s, %s, %s' % (eth_str, ipv6_str, udp_str,
+ repr(protocols['payload']))
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv6_str, str(p_ipv6))
+ eq_(ipv6_str, repr(p_ipv6))
+
+ eq_(udp_str, str(p_udp))
+ eq_(udp_str, repr(p_udp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_ipv6_tcp(self):
+ # build packet
+ e = ethernet.ethernet(self.dst_mac, self.src_mac,
+ ether.ETH_TYPE_IPV6)
+ ip = ipv6.ipv6(nxt=inet.IPPROTO_TCP)
+ t = tcp.tcp(option='\x01\x02')
+
+ p = e/ip/t/self.payload
+ p.serialize()
+
+ ipaddr = addrconv.ipv6.text_to_bin('::')
+
+ # ethernet !6s6sH
+ e_buf = self.dst_mac_bin \
+ + self.src_mac_bin \
+ + '\x86\xdd'
+
+ # ipv6 !IHBB16s16s'
+ ip_buf = '\x60\x00\x00\x00' \
+ + '\x00\x00' \
+ + '\x06' \
+ + '\x00' \
+ + '\x00\x00' \
+ + ipaddr \
+ + ipaddr
+
+ # tcp !HHIIBBHHH + option
+ t_buf = '\x00\x00' \
+ + '\x00\x00' \
+ + '\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00' \
+ + '\x60' \
+ + '\x00' \
+ + '\x00\x00' \
+ + '\x00\x00' \
+ + '\x00\x00' \
+ + '\x01\x02\x00\x00'
+
+ buf = e_buf + ip_buf + t_buf + self.payload
+
+ # parse
+ pkt = packet.Packet(array.array('B', p.data))
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv6 = protocols['ipv6']
+ p_tcp = protocols['tcp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_(self.dst_mac, p_eth.dst)
+ eq_(self.src_mac, p_eth.src)
+ eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
+
+ # ipv6
+ ok_(p_ipv6)
+ eq_(6, p_ipv6.version)
+ eq_(0, p_ipv6.traffic_class)
+ eq_(0, p_ipv6.flow_label)
+ eq_(len(t_buf) + len(self.payload), p_ipv6.payload_length)
+ eq_(inet.IPPROTO_TCP, p_ipv6.nxt)
+ eq_(0, p_ipv6.hop_limit)
+ eq_('::', p_ipv6.src)
+ eq_('::', p_ipv6.dst)
+
+ # tcp
+ ok_(p_tcp)
+ eq_(0, p_tcp.src_port)
+ eq_(0, p_tcp.dst_port)
+ eq_(0, p_tcp.seq)
+ eq_(0, p_tcp.ack)
+ eq_(6, p_tcp.offset)
+ eq_(0, p_tcp.bits)
+ eq_(0, p_tcp.window_size)
+ eq_(0, p_tcp.urgent)
+ eq_(len(t_buf), len(p_tcp))
+ t = bytearray(t_buf)
+ struct.pack_into('!H', t, 16, p_tcp.csum)
+ ph = struct.pack('!16s16sI3xB', ipaddr, ipaddr,
+ len(t_buf) + len(self.payload), 6)
+ t = ph + t + self.payload
+ eq_(packet_utils.checksum(t), 0)
+
+ # payload
+ ok_('payload' in protocols)
+ eq_(self.payload, protocols['payload'].tostring())
+
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_IPV6}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv6_values = {'version': 6,
+ 'traffic_class': 0,
+ 'flow_label': 0,
+ 'payload_length': len(t_buf) + len(self.payload),
+ 'nxt': inet.IPPROTO_TCP,
+ 'hop_limit': 0,
+ 'src': '::',
+ 'dst': '::',
+ 'ext_hdrs': []}
+ _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
+ for k, v in inspect.getmembers(p_ipv6)
+ if k in ipv6_values])
+ ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+ tcp_values = {'src_port': 0,
+ 'dst_port': 0,
+ 'seq': 0,
+ 'ack': 0,
+ 'offset': 6,
+ 'bits': 0,
+ 'window_size': 0,
+ 'csum': p_tcp.csum,
+ 'urgent': 0,
+ 'option': p_tcp.option}
+ _tcp_str = ','.join(['%s=%s' % (k, repr(tcp_values[k]))
+ for k, v in inspect.getmembers(p_tcp)
+ if k in tcp_values])
+ tcp_str = '%s(%s)' % (tcp.tcp.__name__, _tcp_str)
+
+ pkt_str = '%s, %s, %s, %s' % (eth_str, ipv6_str, tcp_str,
+ repr(protocols['payload']))
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv6_str, str(p_ipv6))
+ eq_(ipv6_str, repr(p_ipv6))
+
+ eq_(tcp_str, str(p_tcp))
+ eq_(tcp_str, repr(p_tcp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
+ def test_ipv6_sctp(self):
+ # build packet
+ e = ethernet.ethernet(self.dst_mac, self.src_mac,
+ ether.ETH_TYPE_IPV6)
+ ip = ipv6.ipv6(nxt=inet.IPPROTO_SCTP)
+ s = sctp.sctp(chunks=[sctp.chunk_data(payload_data=self.payload)])
+
+ p = e/ip/s
+ p.serialize()
+
+ ipaddr = addrconv.ipv6.text_to_bin('::')
+
+ # ethernet !6s6sH
+ e_buf = self.dst_mac_bin \
+ + self.src_mac_bin \
+ + '\x86\xdd'
+
+ # ipv6 !IHBB16s16s'
+ ip_buf = '\x60\x00\x00\x00' \
+ + '\x00\x00' \
+ + '\x84' \
+ + '\x00' \
+ + '\x00\x00' \
+ + ipaddr \
+ + ipaddr
+
+ # sctp !HHII + chunk_data !BBHIHHI + payload
+ s_buf = '\x00\x00' \
+ + '\x00\x00' \
+ + '\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00' \
+ + '\x00' \
+ + '\x00' \
+ + '\x00\x00' \
+ + '\x00\x00\x00\x00' \
+ + '\x00\x00' \
+ + '\x00\x00' \
+ + '\x00\x00\x00\x00' \
+ + self.payload
+
+ buf = e_buf + ip_buf + s_buf
+
+ # parse
+ pkt = packet.Packet(array.array('B', p.data))
+ protocols = self.get_protocols(pkt)
+ p_eth = protocols['ethernet']
+ p_ipv6 = protocols['ipv6']
+ p_sctp = protocols['sctp']
+
+ # ethernet
+ ok_(p_eth)
+ eq_(self.dst_mac, p_eth.dst)
+ eq_(self.src_mac, p_eth.src)
+ eq_(ether.ETH_TYPE_IPV6, p_eth.ethertype)
+
+ # ipv6
+ ok_(p_ipv6)
+ eq_(6, p_ipv6.version)
+ eq_(0, p_ipv6.traffic_class)
+ eq_(0, p_ipv6.flow_label)
+ eq_(len(s_buf), p_ipv6.payload_length)
+ eq_(inet.IPPROTO_SCTP, p_ipv6.nxt)
+ eq_(0, p_ipv6.hop_limit)
+ eq_('::', p_ipv6.src)
+ eq_('::', p_ipv6.dst)
+
+ # sctp
+ ok_(p_sctp)
+ eq_(0, p_sctp.src_port)
+ eq_(0, p_sctp.dst_port)
+ eq_(0, p_sctp.vtag)
+ assert isinstance(p_sctp.chunks[0], sctp.chunk_data)
+ eq_(0, p_sctp.chunks[0]._type)
+ eq_(0, p_sctp.chunks[0].unordered)
+ eq_(0, p_sctp.chunks[0].begin)
+ eq_(0, p_sctp.chunks[0].end)
+ eq_(16 + len(self.payload), p_sctp.chunks[0].length)
+ eq_(0, p_sctp.chunks[0].tsn)
+ eq_(0, p_sctp.chunks[0].sid)
+ eq_(0, p_sctp.chunks[0].seq)
+ eq_(0, p_sctp.chunks[0].payload_id)
+ eq_(self.payload, p_sctp.chunks[0].payload_data)
+ eq_(len(s_buf), len(p_sctp))
+
+ # to string
+ eth_values = {'dst': self.dst_mac,
+ 'src': self.src_mac,
+ 'ethertype': ether.ETH_TYPE_IPV6}
+ _eth_str = ','.join(['%s=%s' % (k, repr(eth_values[k]))
+ for k, v in inspect.getmembers(p_eth)
+ if k in eth_values])
+ eth_str = '%s(%s)' % (ethernet.ethernet.__name__, _eth_str)
+
+ ipv6_values = {'version': 6,
+ 'traffic_class': 0,
+ 'flow_label': 0,
+ 'payload_length': len(s_buf),
+ 'nxt': inet.IPPROTO_SCTP,
+ 'hop_limit': 0,
+ 'src': '::',
+ 'dst': '::',
+ 'ext_hdrs': []}
+ _ipv6_str = ','.join(['%s=%s' % (k, repr(ipv6_values[k]))
+ for k, v in inspect.getmembers(p_ipv6)
+ if k in ipv6_values])
+ ipv6_str = '%s(%s)' % (ipv6.ipv6.__name__, _ipv6_str)
+
+ data_values = {'unordered': 0,
+ 'begin': 0,
+ 'end': 0,
+ 'length': 16 + len(self.payload),
+ 'tsn': 0,
+ 'sid': 0,
+ 'seq': 0,
+ 'payload_id': 0,
+ 'payload_data': self.payload}
+ _data_str = ','.join(['%s=%s' % (k, repr(data_values[k]))
+ for k in sorted(data_values.keys())])
+ data_str = '[%s(%s)]' % (sctp.chunk_data.__name__, _data_str)
+
+ sctp_values = {'src_port': 0,
+ 'dst_port': 0,
+ 'vtag': 0,
+ 'csum': p_sctp.csum,
+ 'chunks': data_str}
+ _sctp_str = ','.join(['%s=%s' % (k, sctp_values[k])
+ for k, _ in inspect.getmembers(p_sctp)
+ if k in sctp_values])
+ sctp_str = '%s(%s)' % (sctp.sctp.__name__, _sctp_str)
+
+ pkt_str = '%s, %s, %s' % (eth_str, ipv6_str, sctp_str)
+
+ eq_(eth_str, str(p_eth))
+ eq_(eth_str, repr(p_eth))
+
+ eq_(ipv6_str, str(p_ipv6))
+ eq_(ipv6_str, repr(p_ipv6))
+
+ eq_(sctp_str, str(p_sctp))
+ eq_(sctp_str, repr(p_sctp))
+
+ eq_(pkt_str, str(pkt))
+ eq_(pkt_str, repr(pkt))
+
def test_llc_bpdu(self):
# buid packet
e = ethernet.ethernet(self.dst_mac, self.src_mac,