diff options
-rw-r--r-- | ryu/lib/packet/icmpv6.py | 231 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_icmpv6.py | 66 |
2 files changed, 139 insertions, 158 deletions
diff --git a/ryu/lib/packet/icmpv6.py b/ryu/lib/packet/icmpv6.py index 0353949d..7d9f814a 100644 --- a/ryu/lib/packet/icmpv6.py +++ b/ryu/lib/packet/icmpv6.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import abc import struct import sys import array @@ -157,14 +158,8 @@ class nd_neighbor(stringify.StringifyMixin): res R,S,O Flags for Neighbor Advertisement. \ The 3 MSBs of "Reserved" field for Neighbor Solicitation. dst Target Address - type\_ "Type" field of the first option. None if no options. \ - NOTE: This implementation doesn't support two or more \ - options. - length "Length" field of the first option. None if no options. - data An object to describe the first option. \ - None if no options. \ - Either ryu.lib.packet.icmpv6.nd_option_la object \ - or a bytearray. + option a derived object of ryu.lib.packet.icmpv6.nd_option \ + or a bytearray. None if no options. ============== ==================== """ @@ -180,27 +175,24 @@ class nd_neighbor(stringify.StringifyMixin): return cls return _register_nd_option_type - def __init__(self, res, dst, type_=None, length=None, data=None): + def __init__(self, res, dst, option=None): self.res = res self.dst = dst - self.type_ = type_ - self.length = length - self.data = data + self.option = option @classmethod def parser(cls, buf, offset): (res, dst) = struct.unpack_from(cls._PACK_STR, buf, offset) - msg = cls(res >> 29, addrconv.ipv6.bin_to_text(dst)) offset += cls._MIN_LEN + option = None if len(buf) > offset: - (msg.type_, msg.length) = struct.unpack_from('!BB', buf, offset) - cls_ = cls._ND_OPTION_TYPES.get(msg.type_, None) - offset += 2 - if cls_: - msg.data = cls_.parser(buf, offset) + (type_, ) = struct.unpack_from('!B', buf, offset) + cls_ = cls._ND_OPTION_TYPES.get(type_) + if cls_ is not None: + option = cls_.parser(buf, offset) else: - msg.data = buf[offset:] - + option = buf[offset:] + msg = cls(res >> 29, addrconv.ipv6.bin_to_text(dst), option) return msg def serialize(self): @@ -208,14 +200,12 @@ class nd_neighbor(stringify.StringifyMixin): hdr = bytearray(struct.pack( nd_neighbor._PACK_STR, res, addrconv.ipv6.text_to_bin(self.dst))) - if self.type_ is not None: - hdr += bytearray(struct.pack('!BB', self.type_, self.length)) - if self.type_ in nd_neighbor._ND_OPTION_TYPES: - hdr += self.data.serialize() - elif self.data is not None: - hdr += bytearray(self.data) - - return hdr + if self.option is not None: + if isinstance(self.option, nd_option): + hdr.extend(self.option.serialize()) + else: + hdr.extend(self.option) + return str(hdr) @icmpv6.register_icmpv6_type(ND_ROUTER_SOLICIT) @@ -235,14 +225,8 @@ class nd_router_solicit(stringify.StringifyMixin): Attribute Description ============== ==================== res This field is unused. It MUST be initialized to zero. - type\_ "Type" field of the first option. None if no options. \ - NOTE: This implementation doesn't support two or more \ - options. - length "Length" field of the first option. None if no options. - data An object to describe the first option. \ - None if no options. \ - Either ryu.lib.packet.icmpv6.nd_option_la object \ - or a bytearray. + option a derived object of ryu.lib.packet.icmpv6.nd_option \ + or a bytearray. None if no options. ============== ==================== """ @@ -258,39 +242,34 @@ class nd_router_solicit(stringify.StringifyMixin): return cls return _register_nd_option_type - def __init__(self, res, type_=None, length=None, data=None): + def __init__(self, res, option=None): self.res = res - self.type_ = type_ - self.length = length - self.data = data + self.option = option @classmethod def parser(cls, buf, offset): - res = struct.unpack_from(cls._PACK_STR, buf, offset) - msg = cls(res) + (res, ) = struct.unpack_from(cls._PACK_STR, buf, offset) offset += cls._MIN_LEN + option = None if len(buf) > offset: - (msg.type_, msg.length) = struct.unpack_from('!BB', buf, offset) - cls_ = cls._ND_OPTION_TYPES.get(msg.type_, None) - offset += 2 - if cls_: - msg.data = cls_.parser(buf, offset) + (type_, ) = struct.unpack_from('!B', buf, offset) + cls_ = cls._ND_OPTION_TYPES.get(type_) + if cls_ is not None: + option = cls_.parser(buf, offset) else: - msg.data = buf[offset:] - + option = buf[offset:] + msg = cls(res, option) return msg def serialize(self): - hdr = bytearray(struct.pack(nd_router_solicit._PACK_STR, self.res)) - - if self.type_ is not None: - hdr += bytearray(struct.pack('!BB', self.type_, self.length)) - if self.type_ in nd_router_solicit._ND_OPTION_TYPES: - hdr += self.data.serialize() - elif self.data is not None: - hdr += bytearray(self.data) - - return hdr + hdr = bytearray(struct.pack( + nd_router_solicit._PACK_STR, self.res)) + if self.option is not None: + if isinstance(self.option, nd_option): + hdr.extend(self.option.serialize()) + else: + hdr.extend(self.option) + return str(hdr) @icmpv6.register_icmpv6_type(ND_ROUTER_ADVERT) @@ -314,17 +293,9 @@ class nd_router_advert(stringify.StringifyMixin): rou_l Router Lifetime. rea_t Reachable Time. ret_t Retrans Timer. - type\_ List of option type. Each index refers to an option. \ - None if no options. \ - NOTE: This implementation support one or more \ - options. - length List of option length. Each index refers to an option. \ - None if no options. \ - data List of option data. Each index refers to an option. \ - None if no options. \ - ryu.lib.packet.icmpv6.nd_option_la object, \ - ryu.lib.packet.icmpv6.nd_option_pi object \ - or a bytearray. + options List of a derived object of \ + ryu.lib.packet.icmpv6.nd_option or a bytearray. \ + None if no options. ============== ==================== """ @@ -340,41 +311,32 @@ class nd_router_advert(stringify.StringifyMixin): return cls return _register_nd_option_type - def __init__(self, ch_l, res, rou_l, rea_t, ret_t, type_=None, length=None, - data=None): + def __init__(self, ch_l, res, rou_l, rea_t, ret_t, options=None): self.ch_l = ch_l self.res = res self.rou_l = rou_l self.rea_t = rea_t self.ret_t = ret_t - self.type_ = type_ - self.length = length - self.data = data + options = options or [] + assert isinstance(options, list) + self.options = options @classmethod def parser(cls, buf, offset): - (ch_l, res, rou_l, rea_t, ret_t) = struct.unpack_from(cls._PACK_STR, - buf, offset) - msg = cls(ch_l, res >> 6, rou_l, rea_t, ret_t) + (ch_l, res, rou_l, rea_t, ret_t + ) = struct.unpack_from(cls._PACK_STR, buf, offset) offset += cls._MIN_LEN - msg.type_ = list() - msg.length = list() - msg.data = list() + options = [] while len(buf) > offset: (type_, length) = struct.unpack_from('!BB', buf, offset) - msg.type_.append(type_) - msg.length.append(length) - cls_ = cls._ND_OPTION_TYPES.get(type_, None) - offset += 2 - byte_len = length * 8 - 2 - if cls_: - msg.data.append(cls_.parser(buf[:offset+cls_._MIN_LEN], - offset)) - offset += cls_._MIN_LEN + cls_ = cls._ND_OPTION_TYPES.get(type_) + if cls_ is not None: + option = cls_.parser(buf, offset) else: - msg.data.append(buf[offset:offset + byte_len]) - offset += byte_len - + option = buf[offset:offset + (length * 8 - 2)] + options.append(option) + offset += len(option) + msg = cls(ch_l, res >> 6, rou_l, rea_t, ret_t, options) return msg def serialize(self): @@ -382,22 +344,37 @@ class nd_router_advert(stringify.StringifyMixin): hdr = bytearray(struct.pack( nd_router_advert._PACK_STR, self.ch_l, res, self.rou_l, self.rea_t, self.ret_t)) - if self.type_ is not None: - for i in range(len(self.type_)): - hdr += bytearray(struct.pack('!BB', self.type_[i], - self.length[i])) - if self.type_[i] in nd_router_advert._ND_OPTION_TYPES: - hdr += self.data[i].serialize() - elif self.data[i] is not None: - hdr += bytearray(self.data[i]) + for option in self.options: + if isinstance(option, nd_option): + hdr.extend(option.serialize()) + else: + hdr.extend(option) + return str(hdr) - return hdr + +class nd_option(stringify.StringifyMixin): + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __init__(self, type_, length): + self.type_ = type_ + self.length = length + + @classmethod + @abc.abstractmethod + def parser(cls, buf): + pass + + @abc.abstractmethod + def serialize(self): + pass @nd_neighbor.register_nd_option_type(ND_OPTION_SLA, ND_OPTION_TLA) @nd_router_solicit.register_nd_option_type(ND_OPTION_SLA) @nd_router_advert.register_nd_option_type(ND_OPTION_SLA) -class nd_option_la(stringify.StringifyMixin): +class nd_option_la(nd_option): """ICMPv6 sub encoder/decoder class for Neighbor discovery Source/Target Link-Layer Address Option. (RFC 4861) @@ -414,6 +391,8 @@ class nd_option_la(stringify.StringifyMixin): ============== ==================== Attribute Description ============== ==================== + type\_ type of the option. + length length of the option. hw_src Link-Layer Address. \ NOTE: If the address is longer than 6 octets this contains \ the first 6 octets in the address. \ @@ -426,17 +405,19 @@ class nd_option_la(stringify.StringifyMixin): ============== ==================== """ - _PACK_STR = '!6s' + _PACK_STR = '!BB6s' _MIN_LEN = struct.calcsize(_PACK_STR) - def __init__(self, hw_src, data=None): + def __init__(self, type_, length, hw_src, data=None): + super(nd_option_la, self).__init__(type_, length) self.hw_src = hw_src self.data = data @classmethod def parser(cls, buf, offset): - (hw_src, ) = struct.unpack_from(cls._PACK_STR, buf, offset) - msg = cls(addrconv.mac.bin_to_text(hw_src)) + (type_, length, hw_src + ) = struct.unpack_from(cls._PACK_STR, buf, offset) + msg = cls(type_, length, addrconv.mac.bin_to_text(hw_src)) offset += cls._MIN_LEN if len(buf) > offset: msg.data = buf[offset:] @@ -444,17 +425,19 @@ class nd_option_la(stringify.StringifyMixin): return msg def serialize(self): - hdr = bytearray(struct.pack(self._PACK_STR, - addrconv.mac.text_to_bin(self.hw_src))) - + buf = bytearray(struct.pack( + self._PACK_STR, self.type_, self.length, + addrconv.mac.text_to_bin(self.hw_src))) if self.data is not None: - hdr += bytearray(self.data) - - return hdr + buf.extend(self.data) + mod = len(buf) % 8 + if mod: + buf.extend(bytearray(8 - mod)) + return str(buf) @nd_router_advert.register_nd_option_type(ND_OPTION_PI) -class nd_option_pi(stringify.StringifyMixin): +class nd_option_pi(nd_option): """ICMPv6 sub encoder/decoder class for Neighbor discovery Prefix Information Option. (RFC 4861) @@ -469,6 +452,8 @@ class nd_option_pi(stringify.StringifyMixin): ============== ==================== Attribute Description ============== ==================== + type\_ type of the option. + length length of the option. pl Prefix Length. res1 L,A,R\* Flags for Prefix Information. val_l Valid Lifetime. @@ -483,7 +468,8 @@ class nd_option_pi(stringify.StringifyMixin): _PACK_STR = '!BBIII16s' _MIN_LEN = struct.calcsize(_PACK_STR) - def __init__(self, pl, res1, val_l, pre_l, res2, prefix): + def __init__(self, type_, length, pl, res1, val_l, pre_l, res2, prefix): + super(nd_option_pi, self).__init__(self, type_, length) self.pl = pl self.res1 = res1 self.val_l = val_l @@ -493,20 +479,19 @@ class nd_option_pi(stringify.StringifyMixin): @classmethod def parser(cls, buf, offset): - (pl, res1, val_l, pre_l, res2, prefix) = struct.unpack_from(cls. - _PACK_STR, - buf, - offset) - msg = cls(pl, res1 >> 5, val_l, pre_l, res2, + (type_, length, pl, res1, val_l, pre_l, res2, prefix + ) = struct.unpack_from(cls._PACK_STR, buf, offset) + msg = cls(type_, length, pl, res1 >> 5, val_l, pre_l, res2, addrconv.ipv6.bin_to_text(prefix)) return msg def serialize(self): res1 = self.res1 << 5 - hdr = bytearray(struct.pack(self._PACK_STR, self.pl, res1, - self.val_l, self.pre_l, self.res2, - addrconv.ipv6.text_to_bin(self.prefix))) + hdr = bytearray(struct.pack( + self._PACK_STR, self.type_, self.length, self.pl, + res1, self.val_l, self.pre_l, self.res2, + addrconv.ipv6.text_to_bin(self.prefix))) return hdr diff --git a/ryu/tests/unit/packet/test_icmpv6.py b/ryu/tests/unit/packet/test_icmpv6.py index c24839ca..520d64ec 100644 --- a/ryu/tests/unit/packet/test_icmpv6.py +++ b/ryu/tests/unit/packet/test_icmpv6.py @@ -216,9 +216,7 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase): nd = icmpv6.nd_neighbor(self.res, self.dst) eq_(nd.res, self.res) eq_(nd.dst, self.dst) - eq_(nd.type_, None) - eq_(nd.length, None) - eq_(nd.data, None) + eq_(nd.option, None) def _test_parser(self, data=None): buf = self.buf + str(data or '') @@ -232,11 +230,11 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase): addrconv.ipv6.text_to_bin(self.dst)) eq_(n, None) if data: - nd = msg.data + nd = msg.data.option eq_(nd.type_, self.nd_type) eq_(nd.length, self.nd_length) - eq_(nd.data.hw_src, self.nd_hw_src) - eq_(nd.data.data, None) + eq_(nd.hw_src, self.nd_hw_src) + eq_(nd.data, None) def test_parser_without_data(self): self._test_parser() @@ -264,9 +262,9 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase): eq_(data, '') def test_serialize_with_data(self): - nd_opt = icmpv6.nd_option_la(self.nd_hw_src) - nd = icmpv6.nd_neighbor( - self.res, self.dst, self.nd_type, self.nd_length, nd_opt) + nd_opt = icmpv6.nd_option_la( + self.nd_type, self.nd_length, self.nd_hw_src) + nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt) prev = ipv6(6, 0, 0, 32, 64, 255, self.src_ipv6, self.dst_ipv6) nd_csum = icmpv6_csum(prev, self.buf + self.data) @@ -276,7 +274,7 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase): (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0) (res, dst) = struct.unpack_from(nd._PACK_STR, buf, icmp._MIN_LEN) (nd_type, nd_length, nd_hw_src) = struct.unpack_from( - '!BB6s', buf, icmp._MIN_LEN + nd._MIN_LEN) + nd_opt._PACK_STR, buf, icmp._MIN_LEN + nd._MIN_LEN) data = buf[(icmp._MIN_LEN + nd._MIN_LEN + 8):] eq_(type_, self.type_) @@ -289,12 +287,14 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase): eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src)) def test_to_string(self): - nd_opt = icmpv6.nd_option_la(self.nd_hw_src) - nd = icmpv6.nd_neighbor( - self.res, self.dst, self.nd_type, self.nd_length, nd_opt) + nd_opt = icmpv6.nd_option_la( + self.nd_type, self.nd_length, self.nd_hw_src) + nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt) ic = icmpv6.icmpv6(self.type_, self.code, self.csum, nd) - nd_opt_values = {'hw_src': self.nd_hw_src, + nd_opt_values = {'type_': self.nd_type, + 'length': self.nd_length, + 'hw_src': self.nd_hw_src, 'data': None} _nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k])) for k, v in inspect.getmembers(nd_opt) @@ -303,9 +303,7 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase): nd_values = {'res': repr(nd.res), 'dst': repr(self.dst), - 'type_': repr(self.nd_type), - 'length': repr(self.nd_length), - 'data': nd_opt_str} + 'option': nd_opt_str} _nd_str = ','.join(['%s=%s' % (k, nd_values[k]) for k, v in inspect.getmembers(nd) if k in nd_values]) @@ -362,9 +360,7 @@ class Test_icmpv6_router_solicit(unittest.TestCase): def test_init(self): rs = icmpv6.nd_router_solicit(self.res) eq_(rs.res, self.res) - eq_(rs.type_, None) - eq_(rs.length, None) - eq_(rs.data, None) + eq_(rs.option, None) def _test_parser(self, data=None): buf = self.buf + str(data or '') @@ -374,14 +370,14 @@ class Test_icmpv6_router_solicit(unittest.TestCase): eq_(msg.code, self.code) eq_(msg.csum, self.csum) if data is not None: - eq_(msg.data.res[0], self.res) + eq_(msg.data.res, self.res) eq_(n, None) if data: - rs = msg.data + rs = msg.data.option eq_(rs.type_, self.nd_type) eq_(rs.length, self.nd_length) - eq_(rs.data.hw_src, self.nd_hw_src) - eq_(rs.data.data, None) + eq_(rs.hw_src, self.nd_hw_src) + eq_(rs.data, None) def test_parser_without_data(self): self._test_parser() @@ -408,9 +404,9 @@ class Test_icmpv6_router_solicit(unittest.TestCase): eq_(data, '') def test_serialize_with_data(self): - nd_opt = icmpv6.nd_option_la(self.nd_hw_src) - rs = icmpv6.nd_router_solicit(self.res, self.nd_type, self.nd_length, - nd_opt) + nd_opt = icmpv6.nd_option_la( + self.nd_type, self.nd_length, self.nd_hw_src) + rs = icmpv6.nd_router_solicit(self.res, nd_opt) prev = ipv6(6, 0, 0, 16, 64, 255, self.src_ipv6, self.dst_ipv6) rs_csum = icmpv6_csum(prev, self.buf + self.data) @@ -420,7 +416,7 @@ class Test_icmpv6_router_solicit(unittest.TestCase): (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0) res = struct.unpack_from(rs._PACK_STR, buf, icmp._MIN_LEN) (nd_type, nd_length, nd_hw_src) = struct.unpack_from( - '!BB6s', buf, icmp._MIN_LEN + rs._MIN_LEN) + nd_opt._PACK_STR, buf, icmp._MIN_LEN + rs._MIN_LEN) data = buf[(icmp._MIN_LEN + rs._MIN_LEN + 8):] eq_(type_, self.type_) @@ -432,12 +428,14 @@ class Test_icmpv6_router_solicit(unittest.TestCase): eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src)) def test_to_string(self): - nd_opt = icmpv6.nd_option_la(self.nd_hw_src) - rs = icmpv6.nd_router_solicit( - self.res, self.nd_type, self.nd_length, nd_opt) + nd_opt = icmpv6.nd_option_la( + self.nd_type, self.nd_length, self.nd_hw_src) + rs = icmpv6.nd_router_solicit(self.res, nd_opt) ic = icmpv6.icmpv6(self.type_, self.code, self.csum, rs) - nd_opt_values = {'hw_src': self.nd_hw_src, + nd_opt_values = {'type_': self.nd_type, + 'length': self.nd_length, + 'hw_src': self.nd_hw_src, 'data': None} _nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k])) for k, v in inspect.getmembers(nd_opt) @@ -445,9 +443,7 @@ class Test_icmpv6_router_solicit(unittest.TestCase): nd_opt_str = '%s(%s)' % (icmpv6.nd_option_la.__name__, _nd_opt_str) rs_values = {'res': repr(rs.res), - 'type_': repr(self.nd_type), - 'length': repr(self.nd_length), - 'data': nd_opt_str} + 'option': nd_opt_str} _rs_str = ','.join(['%s=%s' % (k, rs_values[k]) for k, v in inspect.getmembers(rs) if k in rs_values]) |