diff options
Diffstat (limited to 'tests/unit/packet/test_pbb.py')
-rw-r--r-- | tests/unit/packet/test_pbb.py | 172 |
1 files changed, 172 insertions, 0 deletions
diff --git a/tests/unit/packet/test_pbb.py b/tests/unit/packet/test_pbb.py new file mode 100644 index 00000000..dd7778cd --- /dev/null +++ b/tests/unit/packet/test_pbb.py @@ -0,0 +1,172 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import struct +import unittest + +from nose.tools import eq_ +from nose.tools import ok_ +from nose.tools import raises +from ryu.ofproto import ether +from ryu.ofproto import inet +from ryu.lib.packet import ethernet +from ryu.lib.packet import packet +from ryu.lib.packet import ipv4 +from ryu.lib.packet import vlan +from ryu.lib.packet import pbb + + +LOG = logging.getLogger(__name__) + + +class Test_itag(unittest.TestCase): + + pcp = 3 + dei = 0 + uca = 1 + sid = 16770000 + data = pcp << 29 | dei << 28 | uca << 27 | sid + buf = struct.pack(pbb.itag._PACK_STR, data) + it = pbb.itag(pcp, dei, uca, sid) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_init(self): + eq_(self.pcp, self.it.pcp) + eq_(self.dei, self.it.dei) + eq_(self.uca, self.it.uca) + eq_(self.sid, self.it.sid) + + def test_parser(self): + _res = pbb.itag.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(res.pcp, self.pcp) + eq_(res.dei, self.dei) + eq_(res.uca, self.uca) + eq_(res.sid, self.sid) + + def test_serialize(self): + data = bytearray() + prev = None + buf = self.it.serialize(data, prev) + res = struct.unpack(pbb.itag._PACK_STR, buf) + eq_(res[0], self.data) + + def _build_itag(self): + b_src_mac = '00:07:0d:af:f4:54' + b_dst_mac = '00:00:00:00:00:00' + b_ethertype = ether.ETH_TYPE_8021AD + e1 = ethernet.ethernet(b_dst_mac, b_src_mac, b_ethertype) + + b_pcp = 0 + b_cfi = 0 + b_vid = 32 + b_ethertype = ether.ETH_TYPE_8021Q + bt = vlan.svlan(b_pcp, b_cfi, b_vid, b_ethertype) + + c_src_mac = '11:11:11:11:11:11' + c_dst_mac = 'aa:aa:aa:aa:aa:aa' + c_ethertype = ether.ETH_TYPE_8021AD + e2 = ethernet.ethernet(c_dst_mac, c_src_mac, c_ethertype) + + s_pcp = 0 + s_cfi = 0 + s_vid = 32 + s_ethertype = ether.ETH_TYPE_8021Q + st = vlan.svlan(s_pcp, s_cfi, s_vid, s_ethertype) + + c_pcp = 0 + c_cfi = 0 + c_vid = 32 + c_ethertype = ether.ETH_TYPE_IP + ct = vlan.vlan(c_pcp, c_cfi, c_vid, c_ethertype) + + version = 4 + header_length = 20 + tos = 0 + total_length = 24 + identification = 0x8a5d + flags = 0 + offset = 1480 + ttl = 64 + proto = inet.IPPROTO_ICMP + csum = 0xa7f2 + src = '131.151.32.21' + dst = '131.151.32.129' + option = b'TEST' + ip = ipv4.ipv4(version, header_length, tos, total_length, + identification, flags, offset, ttl, proto, csum, + src, dst, option) + + p = packet.Packet() + + p.add_protocol(e1) + p.add_protocol(bt) + p.add_protocol(self.it) + p.add_protocol(e2) + p.add_protocol(st) + p.add_protocol(ct) + p.add_protocol(ip) + p.serialize() + + return p + + def test_build_itag(self): + p = self._build_itag() + + e = p.get_protocols(ethernet.ethernet) + ok_(e) + ok_(isinstance(e, list)) + eq_(e[0].ethertype, ether.ETH_TYPE_8021AD) + eq_(e[1].ethertype, ether.ETH_TYPE_8021AD) + + sv = p.get_protocols(vlan.svlan) + ok_(sv) + ok_(isinstance(sv, list)) + eq_(sv[0].ethertype, ether.ETH_TYPE_8021Q) + eq_(sv[1].ethertype, ether.ETH_TYPE_8021Q) + + it = p.get_protocol(pbb.itag) + ok_(it) + + v = p.get_protocol(vlan.vlan) + ok_(v) + eq_(v.ethertype, ether.ETH_TYPE_IP) + + ip = p.get_protocol(ipv4.ipv4) + ok_(ip) + + eq_(it.pcp, self.pcp) + eq_(it.dei, self.dei) + eq_(it.uca, self.uca) + eq_(it.sid, self.sid) + + @raises(Exception) + def test_malformed_itag(self): + m_short_buf = self.buf[1:pbb.itag._MIN_LEN] + pbb.itag.parser(m_short_buf) + + def test_json(self): + jsondict = self.it.to_jsondict() + it = pbb.itag.from_jsondict(jsondict['itag']) + eq_(str(self.it), str(it)) |