summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ryu/lib/packet/icmpv6.py231
-rw-r--r--ryu/tests/unit/packet/test_icmpv6.py66
2 files changed, 139 insertions, 158 deletions
diff --git a/ryu/lib/packet/icmpv6.py b/ryu/lib/packet/icmpv6.py
index 0353949d..7d9f814a 100644
--- a/ryu/lib/packet/icmpv6.py
+++ b/ryu/lib/packet/icmpv6.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import abc
import struct
import sys
import array
@@ -157,14 +158,8 @@ class nd_neighbor(stringify.StringifyMixin):
res R,S,O Flags for Neighbor Advertisement. \
The 3 MSBs of "Reserved" field for Neighbor Solicitation.
dst Target Address
- type\_ "Type" field of the first option. None if no options. \
- NOTE: This implementation doesn't support two or more \
- options.
- length "Length" field of the first option. None if no options.
- data An object to describe the first option. \
- None if no options. \
- Either ryu.lib.packet.icmpv6.nd_option_la object \
- or a bytearray.
+ option a derived object of ryu.lib.packet.icmpv6.nd_option \
+ or a bytearray. None if no options.
============== ====================
"""
@@ -180,27 +175,24 @@ class nd_neighbor(stringify.StringifyMixin):
return cls
return _register_nd_option_type
- def __init__(self, res, dst, type_=None, length=None, data=None):
+ def __init__(self, res, dst, option=None):
self.res = res
self.dst = dst
- self.type_ = type_
- self.length = length
- self.data = data
+ self.option = option
@classmethod
def parser(cls, buf, offset):
(res, dst) = struct.unpack_from(cls._PACK_STR, buf, offset)
- msg = cls(res >> 29, addrconv.ipv6.bin_to_text(dst))
offset += cls._MIN_LEN
+ option = None
if len(buf) > offset:
- (msg.type_, msg.length) = struct.unpack_from('!BB', buf, offset)
- cls_ = cls._ND_OPTION_TYPES.get(msg.type_, None)
- offset += 2
- if cls_:
- msg.data = cls_.parser(buf, offset)
+ (type_, ) = struct.unpack_from('!B', buf, offset)
+ cls_ = cls._ND_OPTION_TYPES.get(type_)
+ if cls_ is not None:
+ option = cls_.parser(buf, offset)
else:
- msg.data = buf[offset:]
-
+ option = buf[offset:]
+ msg = cls(res >> 29, addrconv.ipv6.bin_to_text(dst), option)
return msg
def serialize(self):
@@ -208,14 +200,12 @@ class nd_neighbor(stringify.StringifyMixin):
hdr = bytearray(struct.pack(
nd_neighbor._PACK_STR, res,
addrconv.ipv6.text_to_bin(self.dst)))
- if self.type_ is not None:
- hdr += bytearray(struct.pack('!BB', self.type_, self.length))
- if self.type_ in nd_neighbor._ND_OPTION_TYPES:
- hdr += self.data.serialize()
- elif self.data is not None:
- hdr += bytearray(self.data)
-
- return hdr
+ if self.option is not None:
+ if isinstance(self.option, nd_option):
+ hdr.extend(self.option.serialize())
+ else:
+ hdr.extend(self.option)
+ return str(hdr)
@icmpv6.register_icmpv6_type(ND_ROUTER_SOLICIT)
@@ -235,14 +225,8 @@ class nd_router_solicit(stringify.StringifyMixin):
Attribute Description
============== ====================
res This field is unused. It MUST be initialized to zero.
- type\_ "Type" field of the first option. None if no options. \
- NOTE: This implementation doesn't support two or more \
- options.
- length "Length" field of the first option. None if no options.
- data An object to describe the first option. \
- None if no options. \
- Either ryu.lib.packet.icmpv6.nd_option_la object \
- or a bytearray.
+ option a derived object of ryu.lib.packet.icmpv6.nd_option \
+ or a bytearray. None if no options.
============== ====================
"""
@@ -258,39 +242,34 @@ class nd_router_solicit(stringify.StringifyMixin):
return cls
return _register_nd_option_type
- def __init__(self, res, type_=None, length=None, data=None):
+ def __init__(self, res, option=None):
self.res = res
- self.type_ = type_
- self.length = length
- self.data = data
+ self.option = option
@classmethod
def parser(cls, buf, offset):
- res = struct.unpack_from(cls._PACK_STR, buf, offset)
- msg = cls(res)
+ (res, ) = struct.unpack_from(cls._PACK_STR, buf, offset)
offset += cls._MIN_LEN
+ option = None
if len(buf) > offset:
- (msg.type_, msg.length) = struct.unpack_from('!BB', buf, offset)
- cls_ = cls._ND_OPTION_TYPES.get(msg.type_, None)
- offset += 2
- if cls_:
- msg.data = cls_.parser(buf, offset)
+ (type_, ) = struct.unpack_from('!B', buf, offset)
+ cls_ = cls._ND_OPTION_TYPES.get(type_)
+ if cls_ is not None:
+ option = cls_.parser(buf, offset)
else:
- msg.data = buf[offset:]
-
+ option = buf[offset:]
+ msg = cls(res, option)
return msg
def serialize(self):
- hdr = bytearray(struct.pack(nd_router_solicit._PACK_STR, self.res))
-
- if self.type_ is not None:
- hdr += bytearray(struct.pack('!BB', self.type_, self.length))
- if self.type_ in nd_router_solicit._ND_OPTION_TYPES:
- hdr += self.data.serialize()
- elif self.data is not None:
- hdr += bytearray(self.data)
-
- return hdr
+ hdr = bytearray(struct.pack(
+ nd_router_solicit._PACK_STR, self.res))
+ if self.option is not None:
+ if isinstance(self.option, nd_option):
+ hdr.extend(self.option.serialize())
+ else:
+ hdr.extend(self.option)
+ return str(hdr)
@icmpv6.register_icmpv6_type(ND_ROUTER_ADVERT)
@@ -314,17 +293,9 @@ class nd_router_advert(stringify.StringifyMixin):
rou_l Router Lifetime.
rea_t Reachable Time.
ret_t Retrans Timer.
- type\_ List of option type. Each index refers to an option. \
- None if no options. \
- NOTE: This implementation support one or more \
- options.
- length List of option length. Each index refers to an option. \
- None if no options. \
- data List of option data. Each index refers to an option. \
- None if no options. \
- ryu.lib.packet.icmpv6.nd_option_la object, \
- ryu.lib.packet.icmpv6.nd_option_pi object \
- or a bytearray.
+ options List of a derived object of \
+ ryu.lib.packet.icmpv6.nd_option or a bytearray. \
+ None if no options.
============== ====================
"""
@@ -340,41 +311,32 @@ class nd_router_advert(stringify.StringifyMixin):
return cls
return _register_nd_option_type
- def __init__(self, ch_l, res, rou_l, rea_t, ret_t, type_=None, length=None,
- data=None):
+ def __init__(self, ch_l, res, rou_l, rea_t, ret_t, options=None):
self.ch_l = ch_l
self.res = res
self.rou_l = rou_l
self.rea_t = rea_t
self.ret_t = ret_t
- self.type_ = type_
- self.length = length
- self.data = data
+ options = options or []
+ assert isinstance(options, list)
+ self.options = options
@classmethod
def parser(cls, buf, offset):
- (ch_l, res, rou_l, rea_t, ret_t) = struct.unpack_from(cls._PACK_STR,
- buf, offset)
- msg = cls(ch_l, res >> 6, rou_l, rea_t, ret_t)
+ (ch_l, res, rou_l, rea_t, ret_t
+ ) = struct.unpack_from(cls._PACK_STR, buf, offset)
offset += cls._MIN_LEN
- msg.type_ = list()
- msg.length = list()
- msg.data = list()
+ options = []
while len(buf) > offset:
(type_, length) = struct.unpack_from('!BB', buf, offset)
- msg.type_.append(type_)
- msg.length.append(length)
- cls_ = cls._ND_OPTION_TYPES.get(type_, None)
- offset += 2
- byte_len = length * 8 - 2
- if cls_:
- msg.data.append(cls_.parser(buf[:offset+cls_._MIN_LEN],
- offset))
- offset += cls_._MIN_LEN
+ cls_ = cls._ND_OPTION_TYPES.get(type_)
+ if cls_ is not None:
+ option = cls_.parser(buf, offset)
else:
- msg.data.append(buf[offset:offset + byte_len])
- offset += byte_len
-
+ option = buf[offset:offset + (length * 8 - 2)]
+ options.append(option)
+ offset += len(option)
+ msg = cls(ch_l, res >> 6, rou_l, rea_t, ret_t, options)
return msg
def serialize(self):
@@ -382,22 +344,37 @@ class nd_router_advert(stringify.StringifyMixin):
hdr = bytearray(struct.pack(
nd_router_advert._PACK_STR, self.ch_l, res, self.rou_l,
self.rea_t, self.ret_t))
- if self.type_ is not None:
- for i in range(len(self.type_)):
- hdr += bytearray(struct.pack('!BB', self.type_[i],
- self.length[i]))
- if self.type_[i] in nd_router_advert._ND_OPTION_TYPES:
- hdr += self.data[i].serialize()
- elif self.data[i] is not None:
- hdr += bytearray(self.data[i])
+ for option in self.options:
+ if isinstance(option, nd_option):
+ hdr.extend(option.serialize())
+ else:
+ hdr.extend(option)
+ return str(hdr)
- return hdr
+
+class nd_option(stringify.StringifyMixin):
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __init__(self, type_, length):
+ self.type_ = type_
+ self.length = length
+
+ @classmethod
+ @abc.abstractmethod
+ def parser(cls, buf):
+ pass
+
+ @abc.abstractmethod
+ def serialize(self):
+ pass
@nd_neighbor.register_nd_option_type(ND_OPTION_SLA, ND_OPTION_TLA)
@nd_router_solicit.register_nd_option_type(ND_OPTION_SLA)
@nd_router_advert.register_nd_option_type(ND_OPTION_SLA)
-class nd_option_la(stringify.StringifyMixin):
+class nd_option_la(nd_option):
"""ICMPv6 sub encoder/decoder class for Neighbor discovery
Source/Target Link-Layer Address Option. (RFC 4861)
@@ -414,6 +391,8 @@ class nd_option_la(stringify.StringifyMixin):
============== ====================
Attribute Description
============== ====================
+ type\_ type of the option.
+ length length of the option.
hw_src Link-Layer Address. \
NOTE: If the address is longer than 6 octets this contains \
the first 6 octets in the address. \
@@ -426,17 +405,19 @@ class nd_option_la(stringify.StringifyMixin):
============== ====================
"""
- _PACK_STR = '!6s'
+ _PACK_STR = '!BB6s'
_MIN_LEN = struct.calcsize(_PACK_STR)
- def __init__(self, hw_src, data=None):
+ def __init__(self, type_, length, hw_src, data=None):
+ super(nd_option_la, self).__init__(type_, length)
self.hw_src = hw_src
self.data = data
@classmethod
def parser(cls, buf, offset):
- (hw_src, ) = struct.unpack_from(cls._PACK_STR, buf, offset)
- msg = cls(addrconv.mac.bin_to_text(hw_src))
+ (type_, length, hw_src
+ ) = struct.unpack_from(cls._PACK_STR, buf, offset)
+ msg = cls(type_, length, addrconv.mac.bin_to_text(hw_src))
offset += cls._MIN_LEN
if len(buf) > offset:
msg.data = buf[offset:]
@@ -444,17 +425,19 @@ class nd_option_la(stringify.StringifyMixin):
return msg
def serialize(self):
- hdr = bytearray(struct.pack(self._PACK_STR,
- addrconv.mac.text_to_bin(self.hw_src)))
-
+ buf = bytearray(struct.pack(
+ self._PACK_STR, self.type_, self.length,
+ addrconv.mac.text_to_bin(self.hw_src)))
if self.data is not None:
- hdr += bytearray(self.data)
-
- return hdr
+ buf.extend(self.data)
+ mod = len(buf) % 8
+ if mod:
+ buf.extend(bytearray(8 - mod))
+ return str(buf)
@nd_router_advert.register_nd_option_type(ND_OPTION_PI)
-class nd_option_pi(stringify.StringifyMixin):
+class nd_option_pi(nd_option):
"""ICMPv6 sub encoder/decoder class for Neighbor discovery
Prefix Information Option. (RFC 4861)
@@ -469,6 +452,8 @@ class nd_option_pi(stringify.StringifyMixin):
============== ====================
Attribute Description
============== ====================
+ type\_ type of the option.
+ length length of the option.
pl Prefix Length.
res1 L,A,R\* Flags for Prefix Information.
val_l Valid Lifetime.
@@ -483,7 +468,8 @@ class nd_option_pi(stringify.StringifyMixin):
_PACK_STR = '!BBIII16s'
_MIN_LEN = struct.calcsize(_PACK_STR)
- def __init__(self, pl, res1, val_l, pre_l, res2, prefix):
+ def __init__(self, type_, length, pl, res1, val_l, pre_l, res2, prefix):
+ super(nd_option_pi, self).__init__(self, type_, length)
self.pl = pl
self.res1 = res1
self.val_l = val_l
@@ -493,20 +479,19 @@ class nd_option_pi(stringify.StringifyMixin):
@classmethod
def parser(cls, buf, offset):
- (pl, res1, val_l, pre_l, res2, prefix) = struct.unpack_from(cls.
- _PACK_STR,
- buf,
- offset)
- msg = cls(pl, res1 >> 5, val_l, pre_l, res2,
+ (type_, length, pl, res1, val_l, pre_l, res2, prefix
+ ) = struct.unpack_from(cls._PACK_STR, buf, offset)
+ msg = cls(type_, length, pl, res1 >> 5, val_l, pre_l, res2,
addrconv.ipv6.bin_to_text(prefix))
return msg
def serialize(self):
res1 = self.res1 << 5
- hdr = bytearray(struct.pack(self._PACK_STR, self.pl, res1,
- self.val_l, self.pre_l, self.res2,
- addrconv.ipv6.text_to_bin(self.prefix)))
+ hdr = bytearray(struct.pack(
+ self._PACK_STR, self.type_, self.length, self.pl,
+ res1, self.val_l, self.pre_l, self.res2,
+ addrconv.ipv6.text_to_bin(self.prefix)))
return hdr
diff --git a/ryu/tests/unit/packet/test_icmpv6.py b/ryu/tests/unit/packet/test_icmpv6.py
index c24839ca..520d64ec 100644
--- a/ryu/tests/unit/packet/test_icmpv6.py
+++ b/ryu/tests/unit/packet/test_icmpv6.py
@@ -216,9 +216,7 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
nd = icmpv6.nd_neighbor(self.res, self.dst)
eq_(nd.res, self.res)
eq_(nd.dst, self.dst)
- eq_(nd.type_, None)
- eq_(nd.length, None)
- eq_(nd.data, None)
+ eq_(nd.option, None)
def _test_parser(self, data=None):
buf = self.buf + str(data or '')
@@ -232,11 +230,11 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
addrconv.ipv6.text_to_bin(self.dst))
eq_(n, None)
if data:
- nd = msg.data
+ nd = msg.data.option
eq_(nd.type_, self.nd_type)
eq_(nd.length, self.nd_length)
- eq_(nd.data.hw_src, self.nd_hw_src)
- eq_(nd.data.data, None)
+ eq_(nd.hw_src, self.nd_hw_src)
+ eq_(nd.data, None)
def test_parser_without_data(self):
self._test_parser()
@@ -264,9 +262,9 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
eq_(data, '')
def test_serialize_with_data(self):
- nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
- nd = icmpv6.nd_neighbor(
- self.res, self.dst, self.nd_type, self.nd_length, nd_opt)
+ nd_opt = icmpv6.nd_option_la(
+ self.nd_type, self.nd_length, self.nd_hw_src)
+ nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
prev = ipv6(6, 0, 0, 32, 64, 255, self.src_ipv6, self.dst_ipv6)
nd_csum = icmpv6_csum(prev, self.buf + self.data)
@@ -276,7 +274,7 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
(type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
(res, dst) = struct.unpack_from(nd._PACK_STR, buf, icmp._MIN_LEN)
(nd_type, nd_length, nd_hw_src) = struct.unpack_from(
- '!BB6s', buf, icmp._MIN_LEN + nd._MIN_LEN)
+ nd_opt._PACK_STR, buf, icmp._MIN_LEN + nd._MIN_LEN)
data = buf[(icmp._MIN_LEN + nd._MIN_LEN + 8):]
eq_(type_, self.type_)
@@ -289,12 +287,14 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))
def test_to_string(self):
- nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
- nd = icmpv6.nd_neighbor(
- self.res, self.dst, self.nd_type, self.nd_length, nd_opt)
+ nd_opt = icmpv6.nd_option_la(
+ self.nd_type, self.nd_length, self.nd_hw_src)
+ nd = icmpv6.nd_neighbor(self.res, self.dst, nd_opt)
ic = icmpv6.icmpv6(self.type_, self.code, self.csum, nd)
- nd_opt_values = {'hw_src': self.nd_hw_src,
+ nd_opt_values = {'type_': self.nd_type,
+ 'length': self.nd_length,
+ 'hw_src': self.nd_hw_src,
'data': None}
_nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k]))
for k, v in inspect.getmembers(nd_opt)
@@ -303,9 +303,7 @@ class Test_icmpv6_neighbor_solicit(unittest.TestCase):
nd_values = {'res': repr(nd.res),
'dst': repr(self.dst),
- 'type_': repr(self.nd_type),
- 'length': repr(self.nd_length),
- 'data': nd_opt_str}
+ 'option': nd_opt_str}
_nd_str = ','.join(['%s=%s' % (k, nd_values[k])
for k, v in inspect.getmembers(nd)
if k in nd_values])
@@ -362,9 +360,7 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
def test_init(self):
rs = icmpv6.nd_router_solicit(self.res)
eq_(rs.res, self.res)
- eq_(rs.type_, None)
- eq_(rs.length, None)
- eq_(rs.data, None)
+ eq_(rs.option, None)
def _test_parser(self, data=None):
buf = self.buf + str(data or '')
@@ -374,14 +370,14 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
eq_(msg.code, self.code)
eq_(msg.csum, self.csum)
if data is not None:
- eq_(msg.data.res[0], self.res)
+ eq_(msg.data.res, self.res)
eq_(n, None)
if data:
- rs = msg.data
+ rs = msg.data.option
eq_(rs.type_, self.nd_type)
eq_(rs.length, self.nd_length)
- eq_(rs.data.hw_src, self.nd_hw_src)
- eq_(rs.data.data, None)
+ eq_(rs.hw_src, self.nd_hw_src)
+ eq_(rs.data, None)
def test_parser_without_data(self):
self._test_parser()
@@ -408,9 +404,9 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
eq_(data, '')
def test_serialize_with_data(self):
- nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
- rs = icmpv6.nd_router_solicit(self.res, self.nd_type, self.nd_length,
- nd_opt)
+ nd_opt = icmpv6.nd_option_la(
+ self.nd_type, self.nd_length, self.nd_hw_src)
+ rs = icmpv6.nd_router_solicit(self.res, nd_opt)
prev = ipv6(6, 0, 0, 16, 64, 255, self.src_ipv6, self.dst_ipv6)
rs_csum = icmpv6_csum(prev, self.buf + self.data)
@@ -420,7 +416,7 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
(type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
res = struct.unpack_from(rs._PACK_STR, buf, icmp._MIN_LEN)
(nd_type, nd_length, nd_hw_src) = struct.unpack_from(
- '!BB6s', buf, icmp._MIN_LEN + rs._MIN_LEN)
+ nd_opt._PACK_STR, buf, icmp._MIN_LEN + rs._MIN_LEN)
data = buf[(icmp._MIN_LEN + rs._MIN_LEN + 8):]
eq_(type_, self.type_)
@@ -432,12 +428,14 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
eq_(nd_hw_src, addrconv.mac.text_to_bin(self.nd_hw_src))
def test_to_string(self):
- nd_opt = icmpv6.nd_option_la(self.nd_hw_src)
- rs = icmpv6.nd_router_solicit(
- self.res, self.nd_type, self.nd_length, nd_opt)
+ nd_opt = icmpv6.nd_option_la(
+ self.nd_type, self.nd_length, self.nd_hw_src)
+ rs = icmpv6.nd_router_solicit(self.res, nd_opt)
ic = icmpv6.icmpv6(self.type_, self.code, self.csum, rs)
- nd_opt_values = {'hw_src': self.nd_hw_src,
+ nd_opt_values = {'type_': self.nd_type,
+ 'length': self.nd_length,
+ 'hw_src': self.nd_hw_src,
'data': None}
_nd_opt_str = ','.join(['%s=%s' % (k, repr(nd_opt_values[k]))
for k, v in inspect.getmembers(nd_opt)
@@ -445,9 +443,7 @@ class Test_icmpv6_router_solicit(unittest.TestCase):
nd_opt_str = '%s(%s)' % (icmpv6.nd_option_la.__name__, _nd_opt_str)
rs_values = {'res': repr(rs.res),
- 'type_': repr(self.nd_type),
- 'length': repr(self.nd_length),
- 'data': nd_opt_str}
+ 'option': nd_opt_str}
_rs_str = ','.join(['%s=%s' % (k, rs_values[k])
for k, v in inspect.getmembers(rs)
if k in rs_values])