summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorYuichi Ito <ito.yuichi0@gmail.com>2013-12-26 10:05:37 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2013-12-29 15:09:04 +0900
commit5611b9e96ef644696a18a841c7b9b9510d62e9c2 (patch)
tree139d5c3c6539c4c63548be129237a6668e8717fd
parenta41275373cd4ec510017f8df6b280eadc1801520 (diff)
packet lib: icmpv6: support MLD (v1/v2)
Signed-off-by: Yuichi Ito <ito.yuichi0@gmail.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/lib/packet/icmpv6.py283
-rw-r--r--ryu/tests/unit/packet/test_icmpv6.py1045
2 files changed, 1328 insertions, 0 deletions
diff --git a/ryu/lib/packet/icmpv6.py b/ryu/lib/packet/icmpv6.py
index bef1bd76..4ba1d63d 100644
--- a/ryu/lib/packet/icmpv6.py
+++ b/ryu/lib/packet/icmpv6.py
@@ -34,6 +34,7 @@ ICMPV6_ECHO_REPLY = 129 # echo reply
MLD_LISTENER_QUERY = 130 # multicast listener query
MLD_LISTENER_REPOR = 131 # multicast listener report
MLD_LISTENER_DONE = 132 # multicast listener done
+MLDV2_LISTENER_REPORT = 143 # multicast listern report (v2)
# RFC2292 decls
ICMPV6_MEMBERSHIP_QUERY = 130 # group membership query
@@ -64,6 +65,13 @@ ND_OPTION_PI = 3 # Prefix Information
ND_OPTION_RH = 4 # Redirected Header
ND_OPTION_MTU = 5 # MTU
+MODE_IS_INCLUDE = 1
+MODE_IS_EXCLUDE = 2
+CHANGE_TO_INCLUDE_MODE = 3
+CHANGE_TO_EXCLUDE_MODE = 4
+ALLOW_NEW_SOURCES = 5
+BLOCK_OLD_SOURCES = 6
+
class icmpv6(packet_base.PacketBase):
"""ICMPv6 (RFC 2463) header encoder/decoder class.
@@ -87,6 +95,7 @@ class icmpv6(packet_base.PacketBase):
ryu.lib.packet.icmpv6.nd_neighbor object, \
ryu.lib.packet.icmpv6.nd_router_solicit object, \
ryu.lib.packet.icmpv6.nd_router_advert object, \
+ ryu.lib.packet.icmpv6.mld object, \
or a bytearray.
============== ====================
"""
@@ -659,6 +668,280 @@ class echo(stringify.StringifyMixin):
return length
+@icmpv6.register_icmpv6_type(
+ MLD_LISTENER_QUERY, MLD_LISTENER_REPOR, MLD_LISTENER_DONE)
+class mld(stringify.StringifyMixin):
+ """ICMPv6 sub encoder/decoder class for MLD Lister Query,
+ MLD Listener Report, and MLD Listener Done messages. (RFC 2710)
+
+ http://www.ietf.org/rfc/rfc2710.txt
+
+ This is used with ryu.lib.packet.icmpv6.icmpv6.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ ============== =========================================
+ Attribute Description
+ ============== =========================================
+ maxresp max response time in millisecond. it is
+ meaningful only in Query Message.
+ address a group address value.
+ ============== =========================================
+ """
+
+ _PACK_STR = '!H2x16s'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+
+ def __init__(self, maxresp=0, address='::'):
+ self.maxresp = maxresp
+ self.address = address
+
+ @classmethod
+ def parser(cls, buf, offset):
+ if cls._MIN_LEN < len(buf[offset:]):
+ msg = mldv2_query.parser(buf[offset:])
+ else:
+ (maxresp, address) = struct.unpack_from(
+ cls._PACK_STR, buf, offset)
+ msg = cls(maxresp, addrconv.ipv6.bin_to_text(address))
+
+ return msg
+
+ def serialize(self):
+ buf = struct.pack(mld._PACK_STR, self.maxresp,
+ addrconv.ipv6.text_to_bin(self.address))
+ return buf
+
+ def __len__(self):
+ return self._MIN_LEN
+
+
+class mldv2_query(mld):
+ """
+ ICMPv6 sub encoder/decoder class for MLD v2 Lister Query messages.
+ (RFC 3810)
+
+ http://www.ietf.org/rfc/rfc3810.txt
+
+ This is used with ryu.lib.packet.icmpv6.icmpv6.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ ============== =========================================
+ Attribute Description
+ ============== =========================================
+ maxresp max response time in millisecond. it is
+ meaningful only in Query Message.
+ address a group address value.
+ s_flg when set to 1, routers suppress the timer
+ process.
+ qrv robustness variable for a querier.
+ qqic an interval time for a querier in unit of
+ seconds.
+ num a number of the multicast servers.
+ srcs a list of IPv6 addresses of the multicast
+ servers.
+ ============== =========================================
+ """
+
+ _PACK_STR = '!H2x16sBBH'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+
+ def __init__(self, maxresp=0, address='::', s_flg=0, qrv=2,
+ qqic=0, num=0, srcs=None):
+ super(mldv2_query, self).__init__(maxresp, address)
+ self.s_flg = s_flg
+ self.qrv = qrv
+ self.qqic = qqic
+ self.num = num
+ srcs = srcs or []
+ assert isinstance(srcs, list)
+ for src in srcs:
+ assert isinstance(src, str)
+ self.srcs = srcs
+
+ @classmethod
+ def parser(cls, buf):
+ (maxresp, address, s_qrv, qqic, num
+ ) = struct.unpack_from(cls._PACK_STR, buf)
+ s_flg = (s_qrv >> 3) & 0b1
+ qrv = s_qrv & 0b111
+ offset = cls._MIN_LEN
+ srcs = []
+ while 0 < len(buf[offset:]) and num > len(srcs):
+ assert 16 <= len(buf[offset:])
+ (src, ) = struct.unpack_from('16s', buf, offset)
+ srcs.append(addrconv.ipv6.bin_to_text(src))
+ offset += 16
+ assert num == len(srcs)
+ return cls(maxresp, addrconv.ipv6.bin_to_text(address), s_flg,
+ qrv, qqic, num, srcs)
+
+ def serialize(self):
+ s_qrv = self.s_flg << 3 | self.qrv
+ buf = bytearray(struct.pack(self._PACK_STR, self.maxresp,
+ addrconv.ipv6.text_to_bin(self.address), s_qrv,
+ self.qqic, self.num))
+ for src in self.srcs:
+ buf.extend(struct.pack('16s', addrconv.ipv6.text_to_bin(src)))
+ if 0 == self.num:
+ self.num = len(self.srcs)
+ struct.pack_into('!H', buf, 22, self.num)
+ return str(buf)
+
+ def __len__(self):
+ return self._MIN_LEN + len(self.srcs) * 16
+
+
+@icmpv6.register_icmpv6_type(MLDV2_LISTENER_REPORT)
+class mldv2_report(mld):
+ """
+ ICMPv6 sub encoder/decoder class for MLD v2 Lister Report messages.
+ (RFC 3810)
+
+ http://www.ietf.org/rfc/rfc3810.txt
+
+ This is used with ryu.lib.packet.icmpv6.icmpv6.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte order.
+ __init__ takes the corresponding args in this order.
+
+ ============== =========================================
+ Attribute Description
+ ============== =========================================
+ record_num a number of the group records.
+ records a list of ryu.lib.packet.icmpv6.mldv2_report_group.
+ None if no records.
+ ============== =========================================
+ """
+
+ _PACK_STR = '!2xH'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+ _class_prefixes = ['mldv2_report_group']
+
+ def __init__(self, record_num=0, records=None):
+ self.record_num = record_num
+ records = records or []
+ assert isinstance(records, list)
+ for record in records:
+ assert isinstance(record, mldv2_report_group)
+ self.records = records
+
+ @classmethod
+ def parser(cls, buf, offset):
+ (record_num, ) = struct.unpack_from(cls._PACK_STR, buf, offset)
+ offset += cls._MIN_LEN
+ records = []
+ while 0 < len(buf[offset:]) and record_num > len(records):
+ record = mldv2_report_group.parser(buf[offset:])
+ records.append(record)
+ offset += len(record)
+ assert record_num == len(records)
+ return cls(record_num, records)
+
+ def serialize(self):
+ buf = bytearray(struct.pack(self._PACK_STR, self.record_num))
+ for record in self.records:
+ buf.extend(record.serialize())
+ if 0 == self.record_num:
+ self.record_num = len(self.records)
+ struct.pack_into('!H', buf, 2, self.record_num)
+ return str(buf)
+
+ def __len__(self):
+ records_len = 0
+ for record in self.records:
+ records_len += len(record)
+ return self._MIN_LEN + records_len
+
+
+class mldv2_report_group(stringify.StringifyMixin):
+ """
+ ICMPv6 sub encoder/decoder class for MLD v2 Lister Report Group
+ Record messages. (RFC 3810)
+
+ This is used with ryu.lib.packet.icmpv6.mldv2_report.
+
+ An instance has the following attributes at least.
+ Most of them are same to the on-wire counterparts but in host byte
+ order.
+ __init__ takes the corresponding args in this order.
+
+ =============== ====================================================
+ Attribute Description
+ =============== ====================================================
+ type\_ a group record type for v3.
+ aux_len the length of the auxiliary data in 32-bit words.
+ num a number of the multicast servers.
+ address a group address value.
+ srcs a list of IPv6 addresses of the multicast servers.
+ aux the auxiliary data.
+ =============== ====================================================
+ """
+ _PACK_STR = '!BBH16s'
+ _MIN_LEN = struct.calcsize(_PACK_STR)
+
+ def __init__(self, type_=0, aux_len=0, num=0, address='::',
+ srcs=None, aux=None):
+ self.type_ = type_
+ self.aux_len = aux_len
+ self.num = num
+ self.address = address
+ srcs = srcs or []
+ assert isinstance(srcs, list)
+ for src in srcs:
+ assert isinstance(src, str)
+ self.srcs = srcs
+ self.aux = aux
+
+ @classmethod
+ def parser(cls, buf):
+ (type_, aux_len, num, address
+ ) = struct.unpack_from(cls._PACK_STR, buf)
+ offset = cls._MIN_LEN
+ srcs = []
+ while 0 < len(buf[offset:]) and num > len(srcs):
+ assert 16 <= len(buf[offset:])
+ (src, ) = struct.unpack_from('16s', buf, offset)
+ srcs.append(addrconv.ipv6.bin_to_text(src))
+ offset += 16
+ assert num == len(srcs)
+ aux = None
+ if aux_len:
+ (aux, ) = struct.unpack_from('%ds' % (aux_len * 4), buf, offset)
+ msg = cls(type_, aux_len, num, addrconv.ipv6.bin_to_text(address),
+ srcs, aux)
+ return msg
+
+ def serialize(self):
+ buf = bytearray(struct.pack(self._PACK_STR, self.type_,
+ self.aux_len, self.num,
+ addrconv.ipv6.text_to_bin(self.address)))
+ for src in self.srcs:
+ buf.extend(struct.pack('16s', addrconv.ipv6.text_to_bin(src)))
+ if 0 == self.num:
+ self.num = len(self.srcs)
+ struct.pack_into('!H', buf, 2, self.num)
+ if self.aux is not None:
+ mod = len(self.aux) % 4
+ if mod:
+ self.aux += bytearray(4 - mod)
+ self.aux = str(self.aux)
+ buf.extend(self.aux)
+ if 0 == self.aux_len:
+ self.aux_len = len(self.aux) / 4
+ struct.pack_into('!B', buf, 1, self.aux_len)
+ return str(buf)
+
+ def __len__(self):
+ return self._MIN_LEN + len(self.srcs) * 16 + self.aux_len * 4
+
+
icmpv6.set_classes(icmpv6._ICMPV6_TYPES)
nd_neighbor.set_classes(nd_neighbor._ND_OPTION_TYPES)
nd_router_solicit.set_classes(nd_router_solicit._ND_OPTION_TYPES)
diff --git a/ryu/tests/unit/packet/test_icmpv6.py b/ryu/tests/unit/packet/test_icmpv6.py
index 0d68585b..6b5c3c27 100644
--- a/ryu/tests/unit/packet/test_icmpv6.py
+++ b/ryu/tests/unit/packet/test_icmpv6.py
@@ -967,3 +967,1048 @@ class Test_icmpv6_nd_option_pi(unittest.TestCase):
eq_(res[5], 0)
eq_(res[6], 0)
eq_(res[7], addrconv.ipv6.text_to_bin('::'))
+
+
+class Test_icmpv6_membership_query(unittest.TestCase):
+ type_ = 130
+ code = 0
+ csum = 0xb5a4
+ maxresp = 10000
+ address = 'ff08::1'
+ buf = '\x82\x00\xb5\xa4\x27\x10\x00\x00' \
+ + '\xff\x08\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01'
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ mld = icmpv6.mld(self.maxresp, self.address)
+ eq_(mld.maxresp, self.maxresp)
+ eq_(mld.address, self.address)
+
+ def test_parser(self):
+ msg, n, _ = icmpv6.icmpv6.parser(self.buf)
+
+ eq_(msg.type_, self.type_)
+ eq_(msg.code, self.code)
+ eq_(msg.csum, self.csum)
+ eq_(msg.data.maxresp, self.maxresp)
+ eq_(msg.data.address, self.address)
+ eq_(n, None)
+
+ def test_serialize(self):
+ src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
+ dst_ipv6 = '3ffe:501:0:1001::2'
+ prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6)
+ mld_csum = icmpv6_csum(prev, self.buf)
+
+ mld = icmpv6.mld(self.maxresp, self.address)
+ icmp = icmpv6.icmpv6(self.type_, self.code, 0, mld)
+ buf = buffer(icmp.serialize(bytearray(), prev))
+
+ (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0)
+ (maxresp, address) = struct.unpack_from(
+ mld._PACK_STR, buf, icmp._MIN_LEN)
+
+ eq_(type_, self.type_)
+ eq_(code, self.code)
+ eq_(csum, mld_csum)
+ eq_(maxresp, self.maxresp)
+ eq_(address, addrconv.ipv6.text_to_bin(self.address))
+
+ def test_to_string(self):
+ ml = icmpv6.mld(self.maxresp, self.address)
+ ic = icmpv6.icmpv6(self.type_, self.code, self.csum, ml)
+
+ mld_values = {'maxresp': self.maxresp,
+ 'address': self.address}
+ _mld_str = ','.join(['%s=%s' % (k, repr(mld_values[k]))
+ for k, v in inspect.getmembers(ml)
+ if k in mld_values])
+ mld_str = '%s(%s)' % (icmpv6.mld.__name__, _mld_str)
+
+ icmp_values = {'type_': repr(self.type_),
+ 'code': repr(self.code),
+ 'csum': repr(self.csum),
+ 'data': mld_str}
+ _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
+ for k, v in inspect.getmembers(ic)
+ if k in icmp_values])
+ ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
+
+ eq_(str(ic), ic_str)
+ eq_(repr(ic), ic_str)
+
+ def test_default_args(self):
+ prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
+ ic = icmpv6.icmpv6(
+ type_=icmpv6.MLD_LISTENER_QUERY, data=icmpv6.mld())
+ prev.serialize(ic, None)
+ buf = ic.serialize(bytearray(), prev)
+ res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
+
+ eq_(res[0], icmpv6.MLD_LISTENER_QUERY)
+ eq_(res[1], 0)
+ eq_(res[2], icmpv6_csum(prev, buf))
+
+ res = struct.unpack(icmpv6.mld._PACK_STR, str(buf[4:]))
+
+ eq_(res[0], 0)
+ eq_(res[1], addrconv.ipv6.text_to_bin('::'))
+
+ def test_json(self):
+ ic1 = icmpv6.icmpv6(
+ type_=icmpv6.MLD_LISTENER_QUERY,
+ data=icmpv6.mld())
+ jsondict = ic1.to_jsondict()
+ ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
+ eq_(str(ic1), str(ic2))
+
+
+class Test_icmpv6_membership_report(Test_icmpv6_membership_query):
+ type_ = 131
+ code = 0
+ csum = 0xb4a4
+ maxresp = 10000
+ address = 'ff08::1'
+ buf = '\x83\x00\xb4\xa4\x27\x10\x00\x00' \
+ + '\xff\x08\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01'
+
+ def test_json(self):
+ ic1 = icmpv6.icmpv6(
+ type_=icmpv6.MLD_LISTENER_REPOR,
+ data=icmpv6.mld())
+ jsondict = ic1.to_jsondict()
+ ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
+ eq_(str(ic1), str(ic2))
+
+
+class Test_icmpv6_membership_done(Test_icmpv6_membership_query):
+ type_ = 132
+ code = 0
+ csum = 0xb3a4
+ maxresp = 10000
+ address = 'ff08::1'
+ buf = '\x84\x00\xb3\xa4\x27\x10\x00\x00' \
+ + '\xff\x08\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01'
+
+ def test_json(self):
+ ic1 = icmpv6.icmpv6(
+ type_=icmpv6.MLD_LISTENER_DONE,
+ data=icmpv6.mld())
+ jsondict = ic1.to_jsondict()
+ ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6'])
+ eq_(str(ic1), str(ic2))
+
+
+class Test_mldv2_query(unittest.TestCase):
+ type_ = 130
+ code = 0
+ csum = 0xb5a4
+ maxresp = 10000
+ address = 'ff08::1'
+ s_flg = 0
+ qrv = 2
+ s_qrv = s_flg << 3 | qrv
+ qqic = 10
+ num = 0
+ srcs = []
+
+ mld = icmpv6.mldv2_query(
+ maxresp, address, s_flg, qrv, qqic, num, srcs)
+
+ buf = '\x82\x00\xb5\xa4\x27\x10\x00\x00' \
+ + '\xff\x08\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01' \
+ + '\x02\x0a\x00\x00'
+
+ def setUp(self):
+ pass
+
+ def setUp_with_srcs(self):
+ self.num = 2
+ self.srcs = ['ff80::1', 'ff80::2']
+ self.mld = icmpv6.mldv2_query(
+ self.maxresp, self.address, self.s_flg, self.qrv, self.qqic,
+ self.num, self.srcs)
+ self.buf = '\x82\x00\xb5\xa4\x27\x10\x00\x00' \
+ + '\xff\x08\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01' \
+ + '\x02\x0a\x00\x02' \
+ + '\xff\x80\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01' \
+ + '\xff\x80\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x02'
+
+ def tearDown(self):
+ pass
+
+ def find_protocol(self, pkt, name):
+ for p in pkt.protocols:
+ if p.protocol_name == name:
+ return p
+
+ def test_init(self):
+ eq_(self.mld.maxresp, self.maxresp)
+ eq_(self.mld.address, self.address)
+ eq_(self.mld.s_flg, self.s_flg)
+ eq_(self.mld.qrv, self.qrv)
+ eq_(self.mld.qqic, self.qqic)
+ eq_(self.mld.num, self.num)
+ eq_(self.mld.srcs, self.srcs)
+
+ def test_init_with_srcs(self):
+ self.setUp_with_srcs()
+ self.test_init()
+
+ def test_parser(self):
+ msg, n, _ = icmpv6.icmpv6.parser(self.buf)
+
+ eq_(msg.type_, self.type_)
+ eq_(msg.code, self.code)
+ eq_(msg.csum, self.csum)
+ eq_(msg.data.maxresp, self.maxresp)
+ eq_(msg.data.address, self.address)
+ eq_(msg.data.s_flg, self.s_flg)
+ eq_(msg.data.qrv, self.qrv)
+ eq_(msg.data.qqic, self.qqic)
+ eq_(msg.data.num, self.num)
+ eq_(msg.data.srcs, self.srcs)
+ eq_(n, None)
+
+ def test_parser_with_srcs(self):
+ self.setUp_with_srcs()
+ self.test_parser()
+
+ def test_serialize(self):
+ src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
+ dst_ipv6 = '3ffe:501:0:1001::2'
+ prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6)
+ mld_csum = icmpv6_csum(prev, self.buf)
+
+ icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld)
+ buf = icmp.serialize(bytearray(), prev)
+
+ (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, str(buf))
+ (maxresp, address, s_qrv, qqic, num) = struct.unpack_from(
+ self.mld._PACK_STR, str(buf), icmp._MIN_LEN)
+
+ eq_(type_, self.type_)
+ eq_(code, self.code)
+ eq_(csum, mld_csum)
+ eq_(maxresp, self.maxresp)
+ eq_(address, addrconv.ipv6.text_to_bin(self.address))
+ s_flg = (s_qrv >> 3) & 0b1
+ qrv = s_qrv & 0b111
+ eq_(s_flg, self.s_flg)
+ eq_(qrv, self.qrv)
+ eq_(qqic, self.qqic)
+ eq_(num, self.num)
+
+ def test_serialize_with_srcs(self):
+ self.setUp_with_srcs()
+ src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
+ dst_ipv6 = '3ffe:501:0:1001::2'
+ prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6)
+ mld_csum = icmpv6_csum(prev, self.buf)
+
+ icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld)
+ buf = icmp.serialize(bytearray(), prev)
+
+ (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, str(buf))
+ (maxresp, address, s_qrv, qqic, num) = struct.unpack_from(
+ self.mld._PACK_STR, str(buf), icmp._MIN_LEN)
+ (addr1, addr2) = struct.unpack_from(
+ '!16s16s', str(buf), icmp._MIN_LEN + self.mld._MIN_LEN)
+
+ eq_(type_, self.type_)
+ eq_(code, self.code)
+ eq_(csum, mld_csum)
+ eq_(maxresp, self.maxresp)
+ eq_(address, addrconv.ipv6.text_to_bin(self.address))
+ s_flg = (s_qrv >> 3) & 0b1
+ qrv = s_qrv & 0b111
+ eq_(s_flg, self.s_flg)
+ eq_(qrv, self.qrv)
+ eq_(qqic, self.qqic)
+ eq_(num, self.num)
+ eq_(addr1, addrconv.ipv6.text_to_bin(self.srcs[0]))
+ eq_(addr2, addrconv.ipv6.text_to_bin(self.srcs[1]))
+
+ def _build_mldv2_query(self):
+ e = ethernet(ethertype=ether.ETH_TYPE_IPV6)
+ i = ipv6(nxt=inet.IPPROTO_ICMPV6)
+ ic = icmpv6.icmpv6(type_=icmpv6.MLD_LISTENER_QUERY,
+ data=self.mld)
+ p = e/i/ic
+ return p
+
+ def test_build_mldv2_query(self):
+ p = self._build_mldv2_query()
+
+ e = self.find_protocol(p, "ethernet")
+ ok_(e)
+ eq_(e.ethertype, ether.ETH_TYPE_IPV6)
+
+ i = self.find_protocol(p, "ipv6")
+ ok_(i)
+ eq_(i.nxt, inet.IPPROTO_ICMPV6)
+
+ ic = self.find_protocol(p, "icmpv6")
+ ok_(ic)
+ eq_(ic.type_, icmpv6.MLD_LISTENER_QUERY)
+
+ eq_(ic.data.maxresp, self.maxresp)
+ eq_(ic.data.address, self.address)
+ eq_(ic.data.s_flg, self.s_flg)
+ eq_(ic.data.qrv, self.qrv)
+ eq_(ic.data.num, self.num)
+ eq_(ic.data.srcs, self.srcs)
+
+ def test_build_mldv2_query_with_srcs(self):
+ self.setUp_with_srcs()
+ self.test_build_mldv2_query()
+
+ def test_to_string(self):
+ ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.mld)
+
+ mld_values = {'maxresp': self.maxresp,
+ 'address': self.address,
+ 's_flg': self.s_flg,
+ 'qrv': self.qrv,
+ 'qqic': self.qqic,
+ 'num': self.num,
+ 'srcs': self.srcs}
+ _mld_str = ','.join(['%s=%s' % (k, repr(mld_values[k]))
+ for k, v in inspect.getmembers(self.mld)
+ if k in mld_values])
+ mld_str = '%s(%s)' % (icmpv6.mldv2_query.__name__, _mld_str)
+
+ icmp_values = {'type_': repr(self.type_),
+ 'code': repr(self.code),
+ 'csum': repr(self.csum),
+ 'data': mld_str}
+ _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
+ for k, v in inspect.getmembers(ic)
+ if k in icmp_values])
+ ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
+
+ eq_(str(ic), ic_str)
+ eq_(repr(ic), ic_str)
+
+ def test_to_string_with_srcs(self):
+ self.setUp_with_srcs()
+ self.test_to_string()
+
+ @raises(Exception)
+ def test_num_larger_than_srcs(self):
+ self.srcs = ['ff80::1', 'ff80::2', 'ff80::3']
+ self.num = len(self.srcs) + 1
+ self.buf = pack(icmpv6.mldv2_query._PACK_STR, self.maxresp,
+ addrconv.ipv6.text_to_bin(self.address),
+ self.s_qrv, self.qqic, self.num)
+ for src in self.srcs:
+ self.buf += pack('16s', addrconv.ipv6.text_to_bin(src))
+ self.mld = icmpv6.mldv2_query(
+ self.maxresp, self.address, self.s_flg, self.qrv, self.qqic,
+ self.num, self.srcs)
+ self.test_parser()
+
+ @raises(Exception)
+ def test_num_smaller_than_srcs(self):
+ self.srcs = ['ff80::1', 'ff80::2', 'ff80::3']
+ self.num = len(self.srcs) - 1
+ self.buf = pack(icmpv6.mldv2_query._PACK_STR, self.maxresp,
+ addrconv.ipv6.text_to_bin(self.address),
+ self.s_qrv, self.qqic, self.num)
+ for src in self.srcs:
+ self.buf += pack('16s', addrconv.ipv6.text_to_bin(src))
+ self.mld = icmpv6.mldv2_query(
+ self.maxresp, self.address, self.s_flg, self.qrv, self.qqic,
+ self.num, self.srcs)
+ self.test_parser()
+
+ def test_default_args(self):
+ prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
+ ic = icmpv6.icmpv6(
+ type_=icmpv6.MLD_LISTENER_QUERY, data=icmpv6.mldv2_query())
+ prev.serialize(ic, None)
+ buf = ic.serialize(bytearray(), prev)
+ res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
+
+ eq_(res[0], icmpv6.MLD_LISTENER_QUERY)
+ eq_(res[1], 0)
+ eq_(res[2], icmpv6_csum(prev, buf))
+
+ res = struct.unpack(icmpv6.mldv2_query._PACK_STR, str(buf[4:]))
+
+ eq_(res[0], 0)
+ eq_(res[1], addrconv.ipv6.text_to_bin('::'))
+ eq_(res[2], 2)
+ eq_(res[3], 0)
+ eq_(res[4], 0)
+
+ # srcs without num
+ srcs = ['ff80::1', 'ff80::2', 'ff80::3']
+ que = icmpv6.mldv2_query(srcs=srcs)
+ buf = que.serialize()
+ res = struct.unpack_from(
+ icmpv6.mldv2_query._PACK_STR, str(buf))
+
+ eq_(res[0], 0)
+ eq_(res[1], addrconv.ipv6.text_to_bin('::'))
+ eq_(res[2], 2)
+ eq_(res[3], 0)
+ eq_(res[4], len(srcs))
+
+ (src1, src2, src3) = struct.unpack_from(
+ '16s16s16s', str(buf), icmpv6.mldv2_query._MIN_LEN)
+
+ eq_(src1, addrconv.ipv6.text_to_bin(srcs[0]))
+ eq_(src2, addrconv.ipv6.text_to_bin(srcs[1]))
+ eq_(src3, addrconv.ipv6.text_to_bin(srcs[2]))
+
+ def test_json(self):
+ jsondict = self.mld.to_jsondict()
+ mld = icmpv6.mldv2_query.from_jsondict(jsondict['mldv2_query'])
+ eq_(str(self.mld), str(mld))
+
+ def test_json_with_srcs(self):
+ self.setUp_with_srcs()
+ self.test_json()
+
+
+class Test_mldv2_report(unittest.TestCase):
+ type_ = 143
+ code = 0
+ csum = 0xb5a4
+ record_num = 0
+ records = []
+
+ mld = icmpv6.mldv2_report(record_num, records)
+
+ buf = '\x8f\x00\xb5\xa4\x00\x00\x00\x00'
+
+ def setUp(self):
+ pass
+
+ def setUp_with_records(self):
+ self.record1 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1')
+ self.record2 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2',
+ ['fe80::1', 'fe80::2'])
+ self.record3 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 1, 0, 'ff00::3', [], 'abc\x00')
+ self.record4 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 2, 2, 'ff00::4',
+ ['fe80::1', 'fe80::2'], 'abcde\x00\x00\x00')
+ self.records = [self.record1, self.record2, self.record3,
+ self.record4]
+ self.record_num = len(self.records)
+ self.mld = icmpv6.mldv2_report(self.record_num, self.records)
+ self.buf = '\x8f\x00\xb5\xa4\x00\x00\x00\x04' \
+ + '\x01\x00\x00\x00' \
+ + '\xff\x00\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01' \
+ + '\x01\x00\x00\x02' \
+ + '\xff\x00\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x02' \
+ + '\xfe\x80\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01' \
+ + '\xfe\x80\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x02' \
+ + '\x01\x01\x00\x00' \
+ + '\xff\x00\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x03' \
+ + '\x61\x62\x63\x00' \
+ + '\x01\x02\x00\x02' \
+ + '\xff\x00\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x04' \
+ + '\xfe\x80\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01' \
+ + '\xfe\x80\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x02' \
+ + '\x61\x62\x63\x64\x65\x00\x00\x00'
+
+ def tearDown(self):
+ pass
+
+ def find_protocol(self, pkt, name):
+ for p in pkt.protocols:
+ if p.protocol_name == name:
+ return p
+
+ def test_init(self):
+ eq_(self.mld.record_num, self.record_num)
+ eq_(self.mld.records, self.records)
+
+ def test_init_with_records(self):
+ self.setUp_with_records()
+ self.test_init()
+
+ def test_parser(self):
+ msg, n, _ = icmpv6.icmpv6.parser(self.buf)
+
+ eq_(msg.type_, self.type_)
+ eq_(msg.code, self.code)
+ eq_(msg.csum, self.csum)
+ eq_(msg.data.record_num, self.record_num)
+ eq_(repr(msg.data.records), repr(self.records))
+
+ def test_parser_with_records(self):
+ self.setUp_with_records()
+ self.test_parser()
+
+ def test_serialize(self):
+ src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
+ dst_ipv6 = '3ffe:501:0:1001::2'
+ prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6)
+ mld_csum = icmpv6_csum(prev, self.buf)
+
+ icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld)
+ buf = icmp.serialize(bytearray(), prev)
+
+ (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, str(buf))
+ (record_num, ) = struct.unpack_from(
+ self.mld._PACK_STR, str(buf), icmp._MIN_LEN)
+
+ eq_(type_, self.type_)
+ eq_(code, self.code)
+ eq_(csum, mld_csum)
+ eq_(record_num, self.record_num)
+
+ def test_serialize_with_records(self):
+ self.setUp_with_records()
+ src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da'
+ dst_ipv6 = '3ffe:501:0:1001::2'
+ prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6)
+ mld_csum = icmpv6_csum(prev, self.buf)
+
+ icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld)
+ buf = icmp.serialize(bytearray(), prev)
+
+ (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, str(buf))
+ (record_num, ) = struct.unpack_from(
+ self.mld._PACK_STR, str(buf), icmp._MIN_LEN)
+ offset = icmp._MIN_LEN + self.mld._MIN_LEN
+ rec1 = icmpv6.mldv2_report_group.parser(buffer(buf[offset:]))
+ offset += len(rec1)
+ rec2 = icmpv6.mldv2_report_group.parser(buffer(buf[offset:]))
+ offset += len(rec2)
+ rec3 = icmpv6.mldv2_report_group.parser(buffer(buf[offset:]))
+ offset += len(rec3)
+ rec4 = icmpv6.mldv2_report_group.parser(buffer(buf[offset:]))
+
+ eq_(type_, self.type_)
+ eq_(code, self.code)
+ eq_(csum, mld_csum)
+ eq_(record_num, self.record_num)
+ eq_(repr(rec1), repr(self.record1))
+ eq_(repr(rec2), repr(self.record2))
+ eq_(repr(rec3), repr(self.record3))
+ eq_(repr(rec4), repr(self.record4))
+
+ def _build_mldv2_report(self):
+ e = ethernet(ethertype=ether.ETH_TYPE_IPV6)
+ i = ipv6(nxt=inet.IPPROTO_ICMPV6)
+ ic = icmpv6.icmpv6(type_=icmpv6.MLDV2_LISTENER_REPORT,
+ data=self.mld)
+ p = e/i/ic
+ return p
+
+ def test_build_mldv2_report(self):
+ p = self._build_mldv2_report()
+
+ e = self.find_protocol(p, "ethernet")
+ ok_(e)
+ eq_(e.ethertype, ether.ETH_TYPE_IPV6)
+
+ i = self.find_protocol(p, "ipv6")
+ ok_(i)
+ eq_(i.nxt, inet.IPPROTO_ICMPV6)
+
+ ic = self.find_protocol(p, "icmpv6")
+ ok_(ic)
+ eq_(ic.type_, icmpv6.MLDV2_LISTENER_REPORT)
+
+ eq_(ic.data.record_num, self.record_num)
+ eq_(ic.data.records, self.records)
+
+ def test_build_mldv2_report_with_records(self):
+ self.setUp_with_records()
+ self.test_build_mldv2_report()
+
+ def test_to_string(self):
+ ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.mld)
+
+ mld_values = {'record_num': self.record_num,
+ 'records': self.records}
+ _mld_str = ','.join(['%s=%s' % (k, repr(mld_values[k]))
+ for k, v in inspect.getmembers(self.mld)
+ if k in mld_values])
+ mld_str = '%s(%s)' % (icmpv6.mldv2_report.__name__, _mld_str)
+
+ icmp_values = {'type_': repr(self.type_),
+ 'code': repr(self.code),
+ 'csum': repr(self.csum),
+ 'data': mld_str}
+ _ic_str = ','.join(['%s=%s' % (k, icmp_values[k])
+ for k, v in inspect.getmembers(ic)
+ if k in icmp_values])
+ ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str)
+
+ eq_(str(ic), ic_str)
+ eq_(repr(ic), ic_str)
+
+ def test_to_string_with_records(self):
+ self.setUp_with_records()
+ self.test_to_string()
+
+ @raises(Exception)
+ def test_record_num_larger_than_records(self):
+ self.record1 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1')
+ self.record2 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2',
+ ['fe80::1', 'fe80::2'])
+ self.record3 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 1, 0, 'ff00::3', [], 'abc\x00')
+ self.record4 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 2, 2, 'ff00::4',
+ ['fe80::1', 'fe80::2'], 'abcde\x00\x00\x00')
+ self.records = [self.record1, self.record2, self.record3,
+ self.record4]
+ self.record_num = len(self.records) + 1
+ self.buf = struct.pack(
+ icmpv6.mldv2_report._PACK_STR, self.record_num)
+ self.buf += self.record1.serialize()
+ self.buf += self.record2.serialize()
+ self.buf += self.record3.serialize()
+ self.buf += self.record4.serialize()
+ self.mld = icmpv6.mldv2_report(self.record_num, self.records)
+ self.test_parser()
+
+ @raises(Exception)
+ def test_record_num_smaller_than_records(self):
+ self.record1 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1')
+ self.record2 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2',
+ ['fe80::1', 'fe80::2'])
+ self.record3 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 1, 0, 'ff00::3', [], 'abc\x00')
+ self.record4 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 2, 2, 'ff00::4',
+ ['fe80::1', 'fe80::2'], 'abcde\x00\x00\x00')
+ self.records = [self.record1, self.record2, self.record3,
+ self.record4]
+ self.record_num = len(self.records) - 1
+ self.buf = struct.pack(
+ icmpv6.mldv2_report._PACK_STR, self.record_num)
+ self.buf += self.record1.serialize()
+ self.buf += self.record2.serialize()
+ self.buf += self.record3.serialize()
+ self.buf += self.record4.serialize()
+ self.mld = icmpv6.mldv2_report(self.record_num, self.records)
+ self.test_parser()
+
+ def test_default_args(self):
+ prev = ipv6(nxt=inet.IPPROTO_ICMPV6)
+ ic = icmpv6.icmpv6(
+ type_=icmpv6.MLDV2_LISTENER_REPORT, data=icmpv6.mldv2_report())
+ prev.serialize(ic, None)
+ buf = ic.serialize(bytearray(), prev)
+ res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4]))
+
+ eq_(res[0], icmpv6.MLDV2_LISTENER_REPORT)
+ eq_(res[1], 0)
+ eq_(res[2], icmpv6_csum(prev, buf))
+
+ res = struct.unpack(icmpv6.mldv2_report._PACK_STR, str(buf[4:]))
+
+ eq_(res[0], 0)
+
+ # records without record_num
+ record1 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1')
+ record2 = icmpv6.mldv2_report_group(
+ icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2',
+ ['fe80::1', 'fe80::2'])
+ records = [record1, record2]
+ rep = icmpv6.mldv2_report(records=records)
+ buf = rep.serialize()
+ res = struct.unpack_from(
+ icmpv6.mldv2_report._PACK_STR, str(buf))
+
+ eq_(res[0], len(records))
+
+ res = struct.unpack_from(
+ icmpv6.mldv2_report_group._PACK_STR, str(buf),
+ icmpv6.mldv2_report._MIN_LEN)
+
+ eq_(res[0], icmpv6.MODE_IS_INCLUDE)
+ eq_(res[1], 0)
+ eq_(res[2], 0)
+ eq_(res[3], addrconv.ipv6.text_to_bin('ff00::1'))
+
+ res = struct.unpack_from(
+ icmpv6.mldv2_report_group._PACK_STR, str(buf),
+ icmpv6.mldv2_report._MIN_LEN +
+ icmpv6.mldv2_report_group._MIN_LEN)
+
+ eq_(res[0], icmpv6.MODE_IS_INCLUDE)
+ eq_(res[1], 0)
+ eq_(res[2], 2)
+ eq_(res[3], addrconv.ipv6.text_to_bin('ff00::2'))
+
+ res = struct.unpack_from(
+ '16s16s', str(buf),
+ icmpv6.mldv2_report._MIN_LEN +
+ icmpv6.mldv2_report_group._MIN_LEN +
+ icmpv6.mldv2_report_group._MIN_LEN)
+
+ eq_(res[0], addrconv.ipv6.text_to_bin('fe80::1'))
+ eq_(res[1], addrconv.ipv6.text_to_bin('fe80::2'))
+
+ def test_json(self):
+ jsondict = self.mld.to_jsondict()
+ mld = icmpv6.mldv2_report.from_jsondict(jsondict['mldv2_report'])
+ eq_(str(self.mld), str(mld))
+
+ def test_json_with_records(self):
+ self.setUp_with_records()
+ self.test_json()
+
+
+class Test_mldv2_report_group(unittest.TestCase):
+ type_ = icmpv6.MODE_IS_INCLUDE
+ aux_len = 0
+ num = 0
+ address = 'ff00::1'
+ srcs = []
+ aux = None
+ mld = icmpv6.mldv2_report_group(
+ type_, aux_len, num, address, srcs, aux)
+ buf = '\x01\x00\x00\x00' \
+ + '\xff\x00\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01'
+
+ def setUp(self):
+ pass
+
+ def setUp_with_srcs(self):
+ self.srcs = ['fe80::1', 'fe80::2', 'fe80::3']
+ self.num = len(self.srcs)
+ self.mld = icmpv6.mldv2_report_group(
+ self.type_, self.aux_len, self.num, self.address, self.srcs,
+ self.aux)
+ self.buf = '\x01\x00\x00\x03' \
+ + '\xff\x00\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01' \
+ + '\xfe\x80\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01' \
+ + '\xfe\x80\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x02' \
+ + '\xfe\x80\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x03'
+
+ def setUp_with_aux(self):
+ self.aux = '\x01\x02\x03\x04\x05\x06\x07\x08'
+ self.aux_len = len(self.aux) / 4
+ self.mld = icmpv6.mldv2_report_group(
+ self.type_, self.aux_len, self.num, self.address, self.srcs,
+ self.aux)
+ self.buf = '\x01\x02\x00\x00' \
+ + '\xff\x00\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01' \
+ + '\x01\x02\x03\x04\x05\x06\x07\x08'
+
+ def setUp_with_srcs_and_aux(self):
+ self.srcs = ['fe80::1', 'fe80::2', 'fe80::3']
+ self.num = len(self.srcs)
+ self.aux = '\x01\x02\x03\x04\x05\x06\x07\x08'
+ self.aux_len = len(self.aux) / 4
+ self.mld = icmpv6.mldv2_report_group(
+ self.type_, self.aux_len, self.num, self.address, self.srcs,
+ self.aux)
+ self.buf = '\x01\x02\x00\x03' \
+ + '\xff\x00\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01' \
+ + '\xfe\x80\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x01' \
+ + '\xfe\x80\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x02' \
+ + '\xfe\x80\x00\x00\x00\x00\x00\x00' \
+ + '\x00\x00\x00\x00\x00\x00\x00\x03' \
+ + '\x01\x02\x03\x04\x05\x06\x07\x08'
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.mld.type_, self.type_)
+ eq_(self.mld.aux_len, self.aux_len)
+ eq_(self.mld.num, self.num)
+ eq_(self.mld.address, self.address)
+ eq_(self.mld.srcs, self.srcs)
+ eq_(self.mld.aux, self.aux)
+
+ def test_init_with_srcs(self):
+ self.setUp_with_srcs()
+ self.test_init()
+
+ def test_init_with_aux(self):
+ self.setUp_with_aux()
+ self.test_init()
+
+ def test_init_with_srcs_and_aux(self):
+ self.setUp_with_srcs_and_aux()
+ self.test_init()
+
+ def test_parser(self):
+ _res = icmpv6.mldv2_report_group.parser(self.buf)
+ if type(_res) is tuple:
+ res = _res[0]
+ else:
+ res = _res
+
+ eq_(res.type_, self.type_)
+ eq_(res.aux_len, self.aux_len)
+ eq_(res.num, self.num)
+ eq_(res.address, self.address)
+ eq_(res.srcs, self.srcs)
+ eq_(res.aux, self.aux)
+
+ def test_parser_with_srcs(self):
+ self.setUp_with_srcs()
+ self.test_parser()
+
+ def test_parser_with_aux(self):
+ self.setUp_with_aux()
+ self.test_parser()
+
+ def test_parser_with_srcs_and_aux(self):
+ self.setUp_with_srcs_and_aux()
+ self.test_parser()
+
+ def test_serialize(self):
+ buf = self.mld.serialize()
+ res = struct.unpack_from(
+ icmpv6.mldv2_report_group._PACK_STR, buffer(buf))
+
+ eq_(res[0], self.type_)
+ eq_(res[1], self.aux_len)
+ eq_(res[2], self.num)
+ eq_(res[3], addrconv.ipv6.text_to_bin(self.address))
+
+ def test_serialize_with_srcs(self):
+ self.setUp_with_srcs()
+ buf = self.mld.serialize()
+ res = struct.unpack_from(
+ icmpv6.mldv2_report_group._PACK_STR, buffer(buf))
+ (src1, src2, src3) = struct.unpack_from(
+ '16s16s16s', buffer(buf), icmpv6.mldv2_report_group._MIN_LEN)
+ eq_(res[0], self.type_)
+ eq_(res[1], self.aux_len)
+ eq_(res[2], self.num)
+ eq_(res[3], addrconv.ipv6.text_to_bin(self.address))
+ eq_(src1, addrconv.ipv6.text_to_bin(self.srcs[0]))
+ eq_(src2, addrconv.ipv6.text_to_bin(self.srcs[1]))
+ eq_(src3, addrconv.ipv6.text_to_bin(self.srcs[2]))
+
+ def test_serialize_with_aux(self):
+ self.setUp_with_aux()
+ buf = self.mld.serialize()
+ res = struct.unpack_from(
+ icmpv6.mldv2_report_group._PACK_STR, buffer(buf))
+ (aux, ) = struct.unpack_from(
+ '%ds' % (self.aux_len * 4), buffer(buf),
+ icmpv6.mldv2_report_group._MIN_LEN)
+ eq_(res[0], self.type_)
+ eq_(res[1], self.aux_len)
+ eq_(res[2], self.num)
+ eq_(res[3], addrconv.ipv6.text_to_bin(self.address))
+ eq_(aux, self.aux)
+
+ def test_serialize_with_srcs_and_aux(self):
+ self.setUp_with_srcs_and_aux()
+ buf = self.mld.serialize()
+ res = struct.unpack_from(
+ icmpv6.mldv2_report_group._PACK_STR, buffer(buf))
+ (src1, src2, src3) = struct.unpack_from(
+ '16s16s16s', buffer(buf), icmpv6.mldv2_report_group._MIN_LEN)
+ (aux, ) = struct.unpack_from(
+ '%ds' % (self.aux_len * 4), buffer(buf),
+ icmpv6.mldv2_report_group._MIN_LEN + 16 * 3)
+ eq_(res[0], self.type_)
+ eq_(res[1], self.aux_len)
+ eq_(res[2], self.num)
+ eq_(res[3], addrconv.ipv6.text_to_bin(self.address))
+ eq_(src1, addrconv.ipv6.text_to_bin(self.srcs[0]))
+ eq_(src2, addrconv.ipv6.text_to_bin(self.srcs[1]))
+ eq_(src3, addrconv.ipv6.text_to_bin(self.srcs[2]))
+ eq_(aux, self.aux)
+
+ def test_to_string(self):
+ igmp_values = {'type_': repr(self.type_),
+ 'aux_len': repr(self.aux_len),
+ 'num': repr(self.num),
+ 'address': repr(self.address),
+ 'srcs': repr(self.srcs),
+ 'aux': repr(self.aux)}
+ _g_str = ','.join(['%s=%s' % (k, igmp_values[k])
+ for k, v in inspect.getmembers(self.mld)
+ if k in igmp_values])
+ g_str = '%s(%s)' % (icmpv6.mldv2_report_group.__name__, _g_str)
+
+ eq_(str(self.mld), g_str)
+ eq_(repr(self.mld), g_str)
+
+ def test_to_string_with_srcs(self):
+ self.setUp_with_srcs()
+ self.test_to_string()
+
+ def test_to_string_with_aux(self):
+ self.setUp_with_aux()
+ self.test_to_string()
+
+ def test_to_string_with_srcs_and_aux(self):
+ self.setUp_with_srcs_and_aux()
+ self.test_to_string()
+
+ def test_len(self):
+ eq_(len(self.mld), 20)
+
+ def test_len_with_srcs(self):
+ self.setUp_with_srcs()
+ eq_(len(self.mld), 68)
+
+ def test_len_with_aux(self):
+ self.setUp_with_aux()
+ eq_(len(self.mld), 28)
+
+ def test_len_with_srcs_and_aux(self):
+ self.setUp_with_srcs_and_aux()
+ eq_(len(self.mld), 76)
+
+ @raises
+ def test_num_larger_than_srcs(self):
+ self.srcs = ['fe80::1', 'fe80::2', 'fe80::3']
+ self.num = len(self.srcs) + 1
+ self.buf = struct.pack(
+ icmpv6.mldv2_report_group._PACK_STR, self.type_, self.aux_len,
+ self.num, addrconv.ipv6.text_to_bin(self.address))
+ for src in self.srcs:
+ self.buf += pack('16s', addrconv.ipv6.text_to_bin(src))
+ self.mld = icmpv6.mldv2_report_group(
+ self.type_, self.aux_len, self.num, self.address,
+ self.srcs, self.aux)
+ self.test_parser()
+
+ @raises
+ def test_num_smaller_than_srcs(self):
+ self.srcs = ['fe80::1', 'fe80::2', 'fe80::3']
+ self.num = len(self.srcs) - 1
+ self.buf = struct.pack(
+ icmpv6.mldv2_report_group._PACK_STR, self.type_, self.aux_len,
+ self.num, addrconv.ipv6.text_to_bin(self.address))
+ for src in self.srcs:
+ self.buf += pack('16s', addrconv.ipv6.text_to_bin(src))
+ self.mld = icmpv6.mldv2_report_group(
+ self.type_, self.aux_len, self.num, self.address,
+ self.srcs, self.aux)
+ self.test_parser()
+
+ @raises
+ def test_aux_len_larger_than_aux(self):
+ self.aux = '\x01\x02\x03\x04\x05\x06\x07\x08'
+ self.aux_len = len(self.aux) / 4 + 1
+ self.buf = struct.pack(
+ icmpv6.mldv2_report_group._PACK_STR, self.type_, self.aux_len,
+ self.num, addrconv.ipv6.text_to_bin(self.address))
+ self.buf += self.aux
+ self.mld = icmpv6.mldv2_report_group(
+ self.type_, self.aux_len, self.num, self.address,
+ self.srcs, self.aux)
+ self.test_parser()
+
+ @raises
+ def test_aux_len_smaller_than_aux(self):
+ self.aux = '\x01\x02\x03\x04\x05\x06\x07\x08'
+ self.aux_len = len(self.aux) / 4 - 1
+ self.buf = struct.pack(
+ icmpv6.mldv2_report_group._PACK_STR, self.type_, self.aux_len,
+ self.num, addrconv.ipv6.text_to_bin(self.address))
+ self.buf += self.aux
+ self.mld = icmpv6.mldv2_report_group(
+ self.type_, self.aux_len, self.num, self.address,
+ self.srcs, self.aux)
+ self.test_parser()
+
+ def test_default_args(self):
+ rep = icmpv6.mldv2_report_group()
+ buf = rep.serialize()
+ res = struct.unpack_from(
+ icmpv6.mldv2_report_group._PACK_STR, str(buf))
+
+ eq_(res[0], 0)
+ eq_(res[1], 0)
+ eq_(res[2], 0)
+ eq_(res[3], addrconv.ipv6.text_to_bin('::'))
+
+ # srcs without num
+ srcs = ['fe80::1', 'fe80::2', 'fe80::3']
+ rep = icmpv6.mldv2_report_group(srcs=srcs)
+ buf = rep.serialize()
+ LOG.info(repr(buf))
+ res = struct.unpack_from(
+ icmpv6.mldv2_report_group._PACK_STR, str(buf))
+
+ eq_(res[0], 0)
+ eq_(res[1], 0)
+ eq_(res[2], len(srcs))
+ eq_(res[3], addrconv.ipv6.text_to_bin('::'))
+
+ (src1, src2, src3) = struct.unpack_from(
+ '16s16s16s', str(buf), icmpv6.mldv2_report_group._MIN_LEN)
+
+ eq_(src1, addrconv.ipv6.text_to_bin(srcs[0]))
+ eq_(src2, addrconv.ipv6.text_to_bin(srcs[1]))
+ eq_(src3, addrconv.ipv6.text_to_bin(srcs[2]))
+
+ # aux without aux_len
+ rep = icmpv6.mldv2_report_group(aux='\x01\x02\x03')
+ buf = rep.serialize()
+ res = struct.unpack_from(
+ icmpv6.mldv2_report_group._PACK_STR, str(buf))
+
+ eq_(res[0], 0)
+ eq_(res[1], 1)
+ eq_(res[2], 0)
+ eq_(res[3], addrconv.ipv6.text_to_bin('::'))
+ eq_(buf[icmpv6.mldv2_report_group._MIN_LEN:], '\x01\x02\x03\x00')
+
+ def test_json(self):
+ jsondict = self.mld.to_jsondict()
+ mld = icmpv6.mldv2_report_group.from_jsondict(
+ jsondict['mldv2_report_group'])
+ eq_(str(self.mld), str(mld))
+
+ def test_json_with_srcs(self):
+ self.setUp_with_srcs()
+ self.test_json()
+
+ def test_json_with_aux(self):
+ self.setUp_with_aux()
+ self.test_json()
+
+ def test_json_with_srcs_and_aux(self):
+ self.setUp_with_srcs_and_aux()
+ self.test_json()