From 21f29c6f416b5dc482927a46318a4683e39ada92 Mon Sep 17 00:00:00 2001 From: HIYAMA Manabu Date: Thu, 11 Oct 2012 17:54:37 +0900 Subject: test: add unittests for packet library Signed-off-by: HIYAMA Manabu Signed-off-by: FUJITA Tomonori --- ryu/tests/unit/packet/__init__.py | 0 ryu/tests/unit/packet/test_arp.py | 176 +++++++++++++++++++++++++++++++++++++ ryu/tests/unit/packet/test_vlan.py | 139 +++++++++++++++++++++++++++++ tools/test-requires | 1 + 4 files changed, 316 insertions(+) create mode 100644 ryu/tests/unit/packet/__init__.py create mode 100644 ryu/tests/unit/packet/test_arp.py create mode 100644 ryu/tests/unit/packet/test_vlan.py diff --git a/ryu/tests/unit/packet/__init__.py b/ryu/tests/unit/packet/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ryu/tests/unit/packet/test_arp.py b/ryu/tests/unit/packet/test_arp.py new file mode 100644 index 00000000..e5be05b3 --- /dev/null +++ b/ryu/tests/unit/packet/test_arp.py @@ -0,0 +1,176 @@ +# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +import unittest +import logging +import struct +import netaddr +from struct import * +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.ofproto import ether +from ryu.lib import mac +from ryu.lib.packet.ethernet import ethernet +from ryu.lib.packet.packet import Packet +from ryu.lib.packet.arp import arp +from ryu.lib.packet.vlan import vlan + + +LOG = logging.getLogger('test_arp') + + +class Test_arp(unittest.TestCase): + """ Test case for arp + """ + + hwtype = 1 + proto = 0x0800 + hlen = 6 + plen = 4 + opcode = 1 + src_mac = mac.haddr_to_bin('00:07:0d:af:f4:54') + src_ip = int(netaddr.IPAddress('24.166.172.1')) + dst_mac = mac.haddr_to_bin('00:00:00:00:00:00') + dst_ip = int(netaddr.IPAddress('24.166.173.159')) + + fmt = arp._PACK_STR + length = struct.calcsize(arp._PACK_STR) + buf = pack(fmt, hwtype, proto, hlen, plen, opcode, src_mac, src_ip, + dst_mac, dst_ip) + + a = arp(hwtype, proto, hlen, plen, opcode, src_mac, src_ip, dst_mac, + dst_ip) + + def setUp(self): + pass + + def tearDown(self): + pass + + def find_protocol(self, pkt, name): + for p in pkt.protocols: + if p.protocol_name == name: + return p + + def test_init(self): + eq_(self.hwtype, self.a.hwtype) + eq_(self.proto, self.a.proto) + eq_(self.hlen, self.a.hlen) + eq_(self.plen, self.a.plen) + eq_(self.opcode, self.a.opcode) + eq_(self.src_mac, self.a.src_mac) + eq_(self.src_ip, self.a.src_ip) + eq_(self.dst_mac, self.a.dst_mac) + eq_(self.dst_ip, self.a.dst_ip) + eq_(self.length, self.a.length) + + def test_parser(self): + _res = self.a.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + + eq_(res.hwtype, self.hwtype) + eq_(res.proto, self.proto) + eq_(res.hlen, self.hlen) + eq_(res.plen, self.plen) + eq_(res.opcode, self.opcode) + eq_(res.src_mac, self.src_mac) + eq_(res.src_ip, self.src_ip) + eq_(res.dst_mac, self.dst_mac) + eq_(res.dst_ip, self.dst_ip) + eq_(res.length, self.length) + + def test_serialize(self): + data = bytearray() + prev = None + buf = self.a.serialize(data, prev) + + fmt = arp._PACK_STR + res = struct.unpack(fmt, buf) + + eq_(res[0], self.hwtype) + eq_(res[1], self.proto) + eq_(res[2], self.hlen) + eq_(res[3], self.plen) + eq_(res[4], self.opcode) + eq_(res[5], self.src_mac) + eq_(res[6], self.src_ip) + eq_(res[7], self.dst_mac) + eq_(res[8], self.dst_ip) + + def _build_arp(self, vlan_enabled): + if vlan_enabled is True: + ethertype = ether.ETH_TYPE_8021Q + v = vlan(1, 1, 3, ether.ETH_TYPE_ARP) + else: + ethertype = ether.ETH_TYPE_ARP + e = ethernet(self.dst_mac, self.src_mac, ethertype) + p = Packet() + + p.add_protocol(e) + if vlan_enabled is True: + p.add_protocol(v) + p.add_protocol(self.a) + p.serialize() + return p + + def test_build_arp_vlan(self): + p = self._build_arp(True) + + e = self.find_protocol(p, "ethernet") + ok_(e) + eq_(e.ethertype, ether.ETH_TYPE_8021Q) + + v = self.find_protocol(p, "vlan") + ok_(v) + eq_(v.ethertype, ether.ETH_TYPE_ARP) + + a = self.find_protocol(p, "arp") + ok_(a) + + eq_(a.hwtype, self.hwtype) + eq_(a.proto, self.proto) + eq_(a.hlen, self.hlen) + eq_(a.plen, self.plen) + eq_(a.opcode, self.opcode) + eq_(a.src_mac, self.src_mac) + eq_(a.src_ip, self.src_ip) + eq_(a.dst_mac, self.dst_mac) + eq_(a.dst_ip, self.dst_ip) + + def test_build_arp_novlan(self): + p = self._build_arp(False) + + e = self.find_protocol(p, "ethernet") + ok_(e) + eq_(e.ethertype, ether.ETH_TYPE_ARP) + + a = self.find_protocol(p, "arp") + ok_(a) + + eq_(a.hwtype, self.hwtype) + eq_(a.proto, self.proto) + eq_(a.hlen, self.hlen) + eq_(a.plen, self.plen) + eq_(a.opcode, self.opcode) + eq_(a.src_mac, self.src_mac) + eq_(a.src_ip, self.src_ip) + eq_(a.dst_mac, self.dst_mac) + eq_(a.dst_ip, self.dst_ip) + eq_(a.length, self.length) diff --git a/ryu/tests/unit/packet/test_vlan.py b/ryu/tests/unit/packet/test_vlan.py new file mode 100644 index 00000000..cf5ae425 --- /dev/null +++ b/ryu/tests/unit/packet/test_vlan.py @@ -0,0 +1,139 @@ +# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +import unittest +import logging +import struct +import netaddr +from struct import * +from nose.tools import * +from nose.plugins.skip import Skip, SkipTest +from ryu.ofproto import ether, inet +from ryu.lib import mac +from ryu.lib.packet.ethernet import ethernet +from ryu.lib.packet.packet import Packet +from ryu.lib.packet.ipv4 import ipv4 +from ryu.lib.packet.vlan import vlan + + +LOG = logging.getLogger('test_vlan') + + +class Test_vlan(unittest.TestCase): + """ Test case for vlan + """ + + pcp = 0 + cfi = 0 + vid = 32 + tci = pcp << 15 | cfi << 12 | vid + ethertype = ether.ETH_TYPE_IP + length = struct.calcsize(vlan._PACK_STR) + + buf = pack(vlan._PACK_STR, tci, ethertype) + + v = vlan(pcp, cfi, vid, ethertype) + + def setUp(self): + pass + + def tearDown(self): + pass + + def find_protocol(self, pkt, name): + for p in pkt.protocols: + if p.protocol_name == name: + return p + + def test_init(self): + eq_(self.pcp, self.v.pcp) + eq_(self.cfi, self.v.cfi) + eq_(self.vid, self.v.vid) + eq_(self.ethertype, self.v.ethertype) + eq_(self.length, self.v.length) + + def test_parser(self): + res, ptype = self.v.parser(self.buf) + + eq_(res.pcp, self.pcp) + eq_(res.cfi, self.cfi) + eq_(res.vid, self.vid) + eq_(res.ethertype, self.ethertype) + eq_(res.length, self.length) + eq_(ptype, ipv4) + + def test_serialize(self): + data = bytearray() + prev = None + buf = self.v.serialize(data, prev) + + fmt = vlan._PACK_STR + res = struct.unpack(fmt, buf) + + eq_(res[0], self.tci) + eq_(res[1], self.ethertype) + + def _build_vlan(self): + src_mac = mac.haddr_to_bin('00:07:0d:af:f4:54') + dst_mac = mac.haddr_to_bin('00:00:00:00:00:00') + ethertype = ether.ETH_TYPE_8021Q + e = ethernet(dst_mac, src_mac, ethertype) + + version = 4 + header_length = 20 + tos = 0 + total_length = 24 + identification = 0x8a5d + flags = 0 + offset = 1480 + ttl = 64 + proto = inet.IPPROTO_ICMP + csum = 0xa7f2 + src = int(netaddr.IPAddress('131.151.32.21')) + dst = int(netaddr.IPAddress('131.151.32.129')) + option = 'TEST' + ip = ipv4(version, header_length, tos, total_length, identification, + flags, offset, ttl, proto, csum, src, dst, option) + + p = Packet() + + p.add_protocol(e) + p.add_protocol(self.v) + p.add_protocol(ip) + p.serialize() + + return p + + def test_build_vlan(self): + p = self._build_vlan() + + e = self.find_protocol(p, "ethernet") + ok_(e) + eq_(e.ethertype, ether.ETH_TYPE_8021Q) + + v = self.find_protocol(p, "vlan") + ok_(v) + eq_(v.ethertype, ether.ETH_TYPE_IP) + + ip = self.find_protocol(p, "ipv4") + ok_(ip) + + eq_(v.pcp, self.pcp) + eq_(v.cfi, self.cfi) + eq_(v.vid, self.vid) + eq_(v.ethertype, self.ethertype) + eq_(v.length, self.length) diff --git a/tools/test-requires b/tools/test-requires index da50e3e0..6f21bff0 100644 --- a/tools/test-requires +++ b/tools/test-requires @@ -2,3 +2,4 @@ coverage nose pep8 pylint==0.25.0 +netaddr -- cgit v1.2.3