diff options
author | HIYAMA Manabu <hiyama.manabu@po.ntts.co.jp> | 2013-03-11 19:04:55 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2013-03-18 07:47:09 +0900 |
commit | 5cff2a36309eafa65a6d31e020d9949c49a0c102 (patch) | |
tree | 9b8b427868232bade98f7de6830657641e273ca0 | |
parent | 914826d474285f0b1b040020385b10312de5b69f (diff) |
packet lib: fix icmpv6.nd parser
Signed-off-by: HIYAMA Manabu <hiyama.manabu@po.ntts.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/lib/packet/icmpv6.py | 3 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_icmpv6.py | 339 |
2 files changed, 341 insertions, 1 deletions
diff --git a/ryu/lib/packet/icmpv6.py b/ryu/lib/packet/icmpv6.py index 5b50c956..f143662f 100644 --- a/ryu/lib/packet/icmpv6.py +++ b/ryu/lib/packet/icmpv6.py @@ -146,11 +146,12 @@ class nd_neighbor(object): @classmethod def parser(cls, buf, offset): (res, dst) = struct.unpack_from(cls._PACK_STR, buf, offset) - msg = cls(res, dst) + msg = cls(res >> 29, dst) offset += cls._MIN_LEN if len(buf) > offset: (msg.type_, msg.length) = struct.unpack_from('!BB', buf, offset) cls_ = cls._ND_OPTION_TYPES.get(msg.type_, None) + offset += 2 if cls_: msg.data = cls_.parser(buf, offset) else: diff --git a/ryu/tests/unit/packet/test_icmpv6.py b/ryu/tests/unit/packet/test_icmpv6.py new file mode 100644 index 00000000..2fe5891a --- /dev/null +++ b/ryu/tests/unit/packet/test_icmpv6.py @@ -0,0 +1,339 @@ +# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +import unittest +import logging +import struct +import netaddr +import socket + +from nose.tools import ok_, eq_, nottest, raises +from nose.plugins.skip import Skip, SkipTest +from ryu.ofproto import ether, inet +from ryu.lib.packet.ethernet import ethernet +from ryu.lib.packet.packet import Packet +from ryu.lib.packet import icmpv6 +from ryu.lib.packet.ipv6 import ipv6 +from ryu.lib.packet import packet_utils + + +LOG = logging.getLogger(__name__) + + +def icmpv6_csum(prev, buf): + ph = struct.pack('!16s16sBBH', prev.src, prev.dst, 0, prev.nxt, + prev.payload_length) + h = bytearray(buf) + struct.pack_into('!H', h, 2, 0) + + return socket.htons(packet_utils.checksum(ph + h)) + + +class Test_icmpv6_header(unittest.TestCase): + type_ = 255 + code = 0 + csum = 207 + buf = '\xff\x00\x00\xcf' + icmp = icmpv6.icmpv6(type_, code, 0) + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_init(self): + eq_(self.type_, self.icmp.type_) + eq_(self.code, self.icmp.code) + eq_(0, self.icmp.csum) + + def test_parser(self): + msg, n = self.icmp.parser(self.buf) + + eq_(msg.type_, self.type_) + eq_(msg.code, self.code) + eq_(msg.csum, self.csum) + eq_(msg.data, None) + eq_(n, None) + + def test_serialize(self): + src_ipv6 = netaddr.IPAddress('fe80::200:ff:fe00:ef').packed + dst_ipv6 = netaddr.IPAddress('fe80::200:ff:fe00:1').packed + prev = ipv6(6, 0, 0, 4, 58, 255, src_ipv6, dst_ipv6) + + buf = self.icmp.serialize(bytearray(), prev) + (type_, code, csum) = struct.unpack(self.icmp._PACK_STR, buffer(buf)) + + eq_(type_, self.type_) + eq_(code, self.code) + eq_(csum, self.csum) + + @raises(Exception) + def test_malformed_icmpv6(self): + m_short_buf = self.buf[1:self.icmp._MIN_LEN] + self.icmp.parser(m_short_buf) + + +class Test_icmpv6_echo_request(unittest.TestCase): + type_ = 128 + code = 0 + csum = 0xa572 + id_ = 0x7620 + seq = 0 + data = '\x01\xc9\xe7\x36\xd3\x39\x06\x00' + buf = '\x80\x00\xa5\x72\x76\x20\x00\x00' + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_init(self): + echo = icmpv6.echo(0, 0) + eq_(echo.id, 0) + eq_(echo.seq, 0) + eq_(echo.data, None) + + def _test_parser(self, data=None): + buf = self.buf + str(data or '') + msg, n = icmpv6.icmpv6.parser(buf) + + eq_(msg.type_, self.type_) + eq_(msg.code, self.code) + eq_(msg.csum, self.csum) + eq_(msg.data.id, self.id_) + eq_(msg.data.seq, self.seq) + eq_(msg.data.data, data) + eq_(n, None) + + def test_parser_without_data(self): + self._test_parser() + + def test_parser_with_data(self): + self._test_parser(self.data) + + def _test_serialize(self, echo_data=None): + buf = self.buf + str(echo_data or '') + src_ipv6 = netaddr.IPAddress('3ffe:507:0:1:200:86ff:fe05:80da').packed + dst_ipv6 = netaddr.IPAddress('3ffe:501:0:1001::2').packed + prev = ipv6(6, 0, 0, len(buf), 64, 255, src_ipv6, dst_ipv6) + echo_csum = icmpv6_csum(prev, buf) + + echo = icmpv6.echo(self.id_, self.seq, echo_data) + icmp = icmpv6.icmpv6(self.type_, self.code, 0, echo) + buf = buffer(icmp.serialize(bytearray(), prev)) + + (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0) + (id_, seq) = struct.unpack_from(echo._PACK_STR, buf, icmp._MIN_LEN) + data = buf[(icmp._MIN_LEN + echo._MIN_LEN):] + data = data if len(data) != 0 else None + + eq_(type_, self.type_) + eq_(code, self.code) + eq_(csum, echo_csum) + eq_(id_, self.id_) + eq_(seq, self.seq) + eq_(data, echo_data) + + def test_serialize_without_data(self): + self._test_serialize() + + def test_serialize_with_data(self): + self._test_serialize(self.data) + + +class Test_icmpv6_echo_reply(Test_icmpv6_echo_request): + def setUp(self): + self.type_ = 129 + self.csum = 0xa472 + self.buf = '\x81\x00\xa4\x72\x76\x20\x00\x00' + + +class Test_icmpv6_neighbor_solict(unittest.TestCase): + type_ = 135 + code = 0 + csum = 0x952d + res = 0 + dst = netaddr.IPAddress('3ffe:507:0:1:200:86ff:fe05:80da').packed + nd_type = 1 + nd_length = 1 + nd_hw_src = '\x00\x60\x97\x07\x69\xea' + data = '\x01\x01\x00\x60\x97\x07\x69\xea' + buf = '\x87\x00\x95\x2d\x00\x00\x00\x00' \ + + '\x3f\xfe\x05\x07\x00\x00\x00\x01' \ + + '\x02\x00\x86\xff\xfe\x05\x80\xda' + src_ipv6 = netaddr.IPAddress('3ffe:507:0:1:200:86ff:fe05:80da').packed + dst_ipv6 = netaddr.IPAddress('3ffe:501:0:1001::2').packed + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_init(self): + nd = icmpv6.nd_neighbor(self.res, self.dst) + eq_(nd.res >> 29, self.res) + eq_(nd.dst, self.dst) + eq_(nd.type_, None) + eq_(nd.length, None) + eq_(nd.data, None) + + def _test_parser(self, data=None): + buf = self.buf + str(data or '') + msg, n = icmpv6.icmpv6.parser(buf) + + eq_(msg.type_, self.type_) + eq_(msg.code, self.code) + eq_(msg.csum, self.csum) + eq_(msg.data.res >> 29, self.res) + eq_(msg.data.dst, self.dst) + eq_(n, None) + if data: + nd = msg.data + eq_(nd.type_, self.nd_type) + eq_(nd.length, self.nd_length) + eq_(nd.data.hw_src, self.nd_hw_src) + eq_(nd.data.data, None) + + def test_parser_without_data(self): + self._test_parser() + + def test_parser_with_data(self): + self._test_parser(self.data) + + def test_serialize_without_data(self): + nd = icmpv6.nd_neighbor(self.res, self.dst) + prev = ipv6(6, 0, 0, 24, 64, 255, self.src_ipv6, self.dst_ipv6) + nd_csum = icmpv6_csum(prev, self.buf) + + icmp = icmpv6.icmpv6(self.type_, self.code, 0, nd) + buf = buffer(icmp.serialize(bytearray(), prev)) + + (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0) + (res, dst) = struct.unpack_from(nd._PACK_STR, buf, icmp._MIN_LEN) + data = buf[(icmp._MIN_LEN + nd._MIN_LEN):] + + eq_(type_, self.type_) + eq_(code, self.code) + eq_(csum, nd_csum) + eq_(res >> 29, self.res) + eq_(dst, self.dst) + eq_(data, '') + + def test_serialize_with_data(self): + nd_opt = icmpv6.nd_option_la(self.nd_hw_src) + nd = icmpv6.nd_neighbor( + self.res, self.dst, self.nd_type, self.nd_length, nd_opt) + prev = ipv6(6, 0, 0, 32, 64, 255, self.src_ipv6, self.dst_ipv6) + nd_csum = icmpv6_csum(prev, self.buf + self.data) + + icmp = icmpv6.icmpv6(self.type_, self.code, 0, nd) + buf = buffer(icmp.serialize(bytearray(), prev)) + + (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0) + (res, dst) = struct.unpack_from(nd._PACK_STR, buf, icmp._MIN_LEN) + (nd_type, nd_length, nd_hw_src) = struct.unpack_from( + '!BB6s', buf, icmp._MIN_LEN + nd._MIN_LEN) + data = buf[(icmp._MIN_LEN + nd._MIN_LEN + 8):] + + eq_(type_, self.type_) + eq_(code, self.code) + eq_(csum, nd_csum) + eq_(res >> 29, self.res) + eq_(dst, self.dst) + eq_(nd_type, self.nd_type) + eq_(nd_length, self.nd_length) + eq_(nd_hw_src, self.nd_hw_src) + + +class Test_icmpv6_neighbor_advert(Test_icmpv6_neighbor_solict): + def setUp(self): + self.type_ = 136 + self.csum = 0xb8ba + self.res = 7 + self.dst = netaddr.IPAddress('3ffe:507:0:1:260:97ff:fe07:69ea').packed + self.nd_type = 2 + self.nd_length = 1 + self.nd_data = None + self.nd_hw_src = '\x00\x60\x97\x07\x69\xea' + self.data = '\x02\x01\x00\x60\x97\x07\x69\xea' + self.buf = '\x88\x00\xb8\xba\xe0\x00\x00\x00' \ + + '\x3f\xfe\x05\x07\x00\x00\x00\x01' \ + + '\x02\x60\x97\xff\xfe\x07\x69\xea' + + +class Test_icmpv6_router_solict(unittest.TestCase): + type_ = 133 + code = 0 + csum = 0x97d9 + res = 0 + nd_type = 1 + nd_length = 1 + nd_data = None + nd_hw_src = '\x12\x2d\xa5\x6d\xbc\x0f' + data = '\x00\x00\x00\x00\x01\x01\x12\x2d\xa5\x6d\xbc\x0f' + buf = '\x85\x00\x97\xd9' + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_init(self): + pass + + def _test_parser(self, data=None): + buf = self.buf + str(data or '') + msg, n = icmpv6.icmpv6.parser(buf) + + eq_(msg.type_, self.type_) + eq_(msg.code, self.code) + eq_(msg.csum, self.csum) + eq_(msg.data, data) + + def test_parser_without_data(self): + self._test_parser() + + def test_parser_with_data(self): + self._test_parser(self.data) + + def _test_serialize(self, nd_data=None): + nd_data = str(nd_data or '') + buf = self.buf + nd_data + src_ipv6 = netaddr.IPAddress('fe80::102d:a5ff:fe6d:bc0f').packed + dst_ipv6 = netaddr.IPAddress('ff02::2').packed + prev = ipv6(6, 0, 0, len(buf), 58, 255, src_ipv6, dst_ipv6) + nd_csum = icmpv6_csum(prev, buf) + + icmp = icmpv6.icmpv6(self.type_, self.code, 0, nd_data) + buf = buffer(icmp.serialize(bytearray(), prev)) + (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0) + data = buf[icmp._MIN_LEN:] + + eq_(type_, self.type_) + eq_(code, self.code) + eq_(csum, nd_csum) + eq_(data, nd_data) + + def test_serialize_without_data(self): + self._test_serialize() + + def test_serialize_with_data(self): + self._test_serialize(self.data) |