diff options
author | Yuichi Ito <ito.yuichi0@gmail.com> | 2013-12-26 10:05:37 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2013-12-29 15:09:04 +0900 |
commit | 5611b9e96ef644696a18a841c7b9b9510d62e9c2 (patch) | |
tree | 139d5c3c6539c4c63548be129237a6668e8717fd | |
parent | a41275373cd4ec510017f8df6b280eadc1801520 (diff) |
packet lib: icmpv6: support MLD (v1/v2)
Signed-off-by: Yuichi Ito <ito.yuichi0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/lib/packet/icmpv6.py | 283 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_icmpv6.py | 1045 |
2 files changed, 1328 insertions, 0 deletions
diff --git a/ryu/lib/packet/icmpv6.py b/ryu/lib/packet/icmpv6.py index bef1bd76..4ba1d63d 100644 --- a/ryu/lib/packet/icmpv6.py +++ b/ryu/lib/packet/icmpv6.py @@ -34,6 +34,7 @@ ICMPV6_ECHO_REPLY = 129 # echo reply MLD_LISTENER_QUERY = 130 # multicast listener query MLD_LISTENER_REPOR = 131 # multicast listener report MLD_LISTENER_DONE = 132 # multicast listener done +MLDV2_LISTENER_REPORT = 143 # multicast listern report (v2) # RFC2292 decls ICMPV6_MEMBERSHIP_QUERY = 130 # group membership query @@ -64,6 +65,13 @@ ND_OPTION_PI = 3 # Prefix Information ND_OPTION_RH = 4 # Redirected Header ND_OPTION_MTU = 5 # MTU +MODE_IS_INCLUDE = 1 +MODE_IS_EXCLUDE = 2 +CHANGE_TO_INCLUDE_MODE = 3 +CHANGE_TO_EXCLUDE_MODE = 4 +ALLOW_NEW_SOURCES = 5 +BLOCK_OLD_SOURCES = 6 + class icmpv6(packet_base.PacketBase): """ICMPv6 (RFC 2463) header encoder/decoder class. @@ -87,6 +95,7 @@ class icmpv6(packet_base.PacketBase): ryu.lib.packet.icmpv6.nd_neighbor object, \ ryu.lib.packet.icmpv6.nd_router_solicit object, \ ryu.lib.packet.icmpv6.nd_router_advert object, \ + ryu.lib.packet.icmpv6.mld object, \ or a bytearray. ============== ==================== """ @@ -659,6 +668,280 @@ class echo(stringify.StringifyMixin): return length +@icmpv6.register_icmpv6_type( + MLD_LISTENER_QUERY, MLD_LISTENER_REPOR, MLD_LISTENER_DONE) +class mld(stringify.StringifyMixin): + """ICMPv6 sub encoder/decoder class for MLD Lister Query, + MLD Listener Report, and MLD Listener Done messages. (RFC 2710) + + http://www.ietf.org/rfc/rfc2710.txt + + This is used with ryu.lib.packet.icmpv6.icmpv6. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + ============== ========================================= + Attribute Description + ============== ========================================= + maxresp max response time in millisecond. it is + meaningful only in Query Message. + address a group address value. + ============== ========================================= + """ + + _PACK_STR = '!H2x16s' + _MIN_LEN = struct.calcsize(_PACK_STR) + + def __init__(self, maxresp=0, address='::'): + self.maxresp = maxresp + self.address = address + + @classmethod + def parser(cls, buf, offset): + if cls._MIN_LEN < len(buf[offset:]): + msg = mldv2_query.parser(buf[offset:]) + else: + (maxresp, address) = struct.unpack_from( + cls._PACK_STR, buf, offset) + msg = cls(maxresp, addrconv.ipv6.bin_to_text(address)) + + return msg + + def serialize(self): + buf = struct.pack(mld._PACK_STR, self.maxresp, + addrconv.ipv6.text_to_bin(self.address)) + return buf + + def __len__(self): + return self._MIN_LEN + + +class mldv2_query(mld): + """ + ICMPv6 sub encoder/decoder class for MLD v2 Lister Query messages. + (RFC 3810) + + http://www.ietf.org/rfc/rfc3810.txt + + This is used with ryu.lib.packet.icmpv6.icmpv6. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + ============== ========================================= + Attribute Description + ============== ========================================= + maxresp max response time in millisecond. it is + meaningful only in Query Message. + address a group address value. + s_flg when set to 1, routers suppress the timer + process. + qrv robustness variable for a querier. + qqic an interval time for a querier in unit of + seconds. + num a number of the multicast servers. + srcs a list of IPv6 addresses of the multicast + servers. + ============== ========================================= + """ + + _PACK_STR = '!H2x16sBBH' + _MIN_LEN = struct.calcsize(_PACK_STR) + + def __init__(self, maxresp=0, address='::', s_flg=0, qrv=2, + qqic=0, num=0, srcs=None): + super(mldv2_query, self).__init__(maxresp, address) + self.s_flg = s_flg + self.qrv = qrv + self.qqic = qqic + self.num = num + srcs = srcs or [] + assert isinstance(srcs, list) + for src in srcs: + assert isinstance(src, str) + self.srcs = srcs + + @classmethod + def parser(cls, buf): + (maxresp, address, s_qrv, qqic, num + ) = struct.unpack_from(cls._PACK_STR, buf) + s_flg = (s_qrv >> 3) & 0b1 + qrv = s_qrv & 0b111 + offset = cls._MIN_LEN + srcs = [] + while 0 < len(buf[offset:]) and num > len(srcs): + assert 16 <= len(buf[offset:]) + (src, ) = struct.unpack_from('16s', buf, offset) + srcs.append(addrconv.ipv6.bin_to_text(src)) + offset += 16 + assert num == len(srcs) + return cls(maxresp, addrconv.ipv6.bin_to_text(address), s_flg, + qrv, qqic, num, srcs) + + def serialize(self): + s_qrv = self.s_flg << 3 | self.qrv + buf = bytearray(struct.pack(self._PACK_STR, self.maxresp, + addrconv.ipv6.text_to_bin(self.address), s_qrv, + self.qqic, self.num)) + for src in self.srcs: + buf.extend(struct.pack('16s', addrconv.ipv6.text_to_bin(src))) + if 0 == self.num: + self.num = len(self.srcs) + struct.pack_into('!H', buf, 22, self.num) + return str(buf) + + def __len__(self): + return self._MIN_LEN + len(self.srcs) * 16 + + +@icmpv6.register_icmpv6_type(MLDV2_LISTENER_REPORT) +class mldv2_report(mld): + """ + ICMPv6 sub encoder/decoder class for MLD v2 Lister Report messages. + (RFC 3810) + + http://www.ietf.org/rfc/rfc3810.txt + + This is used with ryu.lib.packet.icmpv6.icmpv6. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + ============== ========================================= + Attribute Description + ============== ========================================= + record_num a number of the group records. + records a list of ryu.lib.packet.icmpv6.mldv2_report_group. + None if no records. + ============== ========================================= + """ + + _PACK_STR = '!2xH' + _MIN_LEN = struct.calcsize(_PACK_STR) + _class_prefixes = ['mldv2_report_group'] + + def __init__(self, record_num=0, records=None): + self.record_num = record_num + records = records or [] + assert isinstance(records, list) + for record in records: + assert isinstance(record, mldv2_report_group) + self.records = records + + @classmethod + def parser(cls, buf, offset): + (record_num, ) = struct.unpack_from(cls._PACK_STR, buf, offset) + offset += cls._MIN_LEN + records = [] + while 0 < len(buf[offset:]) and record_num > len(records): + record = mldv2_report_group.parser(buf[offset:]) + records.append(record) + offset += len(record) + assert record_num == len(records) + return cls(record_num, records) + + def serialize(self): + buf = bytearray(struct.pack(self._PACK_STR, self.record_num)) + for record in self.records: + buf.extend(record.serialize()) + if 0 == self.record_num: + self.record_num = len(self.records) + struct.pack_into('!H', buf, 2, self.record_num) + return str(buf) + + def __len__(self): + records_len = 0 + for record in self.records: + records_len += len(record) + return self._MIN_LEN + records_len + + +class mldv2_report_group(stringify.StringifyMixin): + """ + ICMPv6 sub encoder/decoder class for MLD v2 Lister Report Group + Record messages. (RFC 3810) + + This is used with ryu.lib.packet.icmpv6.mldv2_report. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte + order. + __init__ takes the corresponding args in this order. + + =============== ==================================================== + Attribute Description + =============== ==================================================== + type\_ a group record type for v3. + aux_len the length of the auxiliary data in 32-bit words. + num a number of the multicast servers. + address a group address value. + srcs a list of IPv6 addresses of the multicast servers. + aux the auxiliary data. + =============== ==================================================== + """ + _PACK_STR = '!BBH16s' + _MIN_LEN = struct.calcsize(_PACK_STR) + + def __init__(self, type_=0, aux_len=0, num=0, address='::', + srcs=None, aux=None): + self.type_ = type_ + self.aux_len = aux_len + self.num = num + self.address = address + srcs = srcs or [] + assert isinstance(srcs, list) + for src in srcs: + assert isinstance(src, str) + self.srcs = srcs + self.aux = aux + + @classmethod + def parser(cls, buf): + (type_, aux_len, num, address + ) = struct.unpack_from(cls._PACK_STR, buf) + offset = cls._MIN_LEN + srcs = [] + while 0 < len(buf[offset:]) and num > len(srcs): + assert 16 <= len(buf[offset:]) + (src, ) = struct.unpack_from('16s', buf, offset) + srcs.append(addrconv.ipv6.bin_to_text(src)) + offset += 16 + assert num == len(srcs) + aux = None + if aux_len: + (aux, ) = struct.unpack_from('%ds' % (aux_len * 4), buf, offset) + msg = cls(type_, aux_len, num, addrconv.ipv6.bin_to_text(address), + srcs, aux) + return msg + + def serialize(self): + buf = bytearray(struct.pack(self._PACK_STR, self.type_, + self.aux_len, self.num, + addrconv.ipv6.text_to_bin(self.address))) + for src in self.srcs: + buf.extend(struct.pack('16s', addrconv.ipv6.text_to_bin(src))) + if 0 == self.num: + self.num = len(self.srcs) + struct.pack_into('!H', buf, 2, self.num) + if self.aux is not None: + mod = len(self.aux) % 4 + if mod: + self.aux += bytearray(4 - mod) + self.aux = str(self.aux) + buf.extend(self.aux) + if 0 == self.aux_len: + self.aux_len = len(self.aux) / 4 + struct.pack_into('!B', buf, 1, self.aux_len) + return str(buf) + + def __len__(self): + return self._MIN_LEN + len(self.srcs) * 16 + self.aux_len * 4 + + icmpv6.set_classes(icmpv6._ICMPV6_TYPES) nd_neighbor.set_classes(nd_neighbor._ND_OPTION_TYPES) nd_router_solicit.set_classes(nd_router_solicit._ND_OPTION_TYPES) diff --git a/ryu/tests/unit/packet/test_icmpv6.py b/ryu/tests/unit/packet/test_icmpv6.py index 0d68585b..6b5c3c27 100644 --- a/ryu/tests/unit/packet/test_icmpv6.py +++ b/ryu/tests/unit/packet/test_icmpv6.py @@ -967,3 +967,1048 @@ class Test_icmpv6_nd_option_pi(unittest.TestCase): eq_(res[5], 0) eq_(res[6], 0) eq_(res[7], addrconv.ipv6.text_to_bin('::')) + + +class Test_icmpv6_membership_query(unittest.TestCase): + type_ = 130 + code = 0 + csum = 0xb5a4 + maxresp = 10000 + address = 'ff08::1' + buf = '\x82\x00\xb5\xa4\x27\x10\x00\x00' \ + + '\xff\x08\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_init(self): + mld = icmpv6.mld(self.maxresp, self.address) + eq_(mld.maxresp, self.maxresp) + eq_(mld.address, self.address) + + def test_parser(self): + msg, n, _ = icmpv6.icmpv6.parser(self.buf) + + eq_(msg.type_, self.type_) + eq_(msg.code, self.code) + eq_(msg.csum, self.csum) + eq_(msg.data.maxresp, self.maxresp) + eq_(msg.data.address, self.address) + eq_(n, None) + + def test_serialize(self): + src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da' + dst_ipv6 = '3ffe:501:0:1001::2' + prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6) + mld_csum = icmpv6_csum(prev, self.buf) + + mld = icmpv6.mld(self.maxresp, self.address) + icmp = icmpv6.icmpv6(self.type_, self.code, 0, mld) + buf = buffer(icmp.serialize(bytearray(), prev)) + + (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, buf, 0) + (maxresp, address) = struct.unpack_from( + mld._PACK_STR, buf, icmp._MIN_LEN) + + eq_(type_, self.type_) + eq_(code, self.code) + eq_(csum, mld_csum) + eq_(maxresp, self.maxresp) + eq_(address, addrconv.ipv6.text_to_bin(self.address)) + + def test_to_string(self): + ml = icmpv6.mld(self.maxresp, self.address) + ic = icmpv6.icmpv6(self.type_, self.code, self.csum, ml) + + mld_values = {'maxresp': self.maxresp, + 'address': self.address} + _mld_str = ','.join(['%s=%s' % (k, repr(mld_values[k])) + for k, v in inspect.getmembers(ml) + if k in mld_values]) + mld_str = '%s(%s)' % (icmpv6.mld.__name__, _mld_str) + + icmp_values = {'type_': repr(self.type_), + 'code': repr(self.code), + 'csum': repr(self.csum), + 'data': mld_str} + _ic_str = ','.join(['%s=%s' % (k, icmp_values[k]) + for k, v in inspect.getmembers(ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str) + + eq_(str(ic), ic_str) + eq_(repr(ic), ic_str) + + def test_default_args(self): + prev = ipv6(nxt=inet.IPPROTO_ICMPV6) + ic = icmpv6.icmpv6( + type_=icmpv6.MLD_LISTENER_QUERY, data=icmpv6.mld()) + prev.serialize(ic, None) + buf = ic.serialize(bytearray(), prev) + res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4])) + + eq_(res[0], icmpv6.MLD_LISTENER_QUERY) + eq_(res[1], 0) + eq_(res[2], icmpv6_csum(prev, buf)) + + res = struct.unpack(icmpv6.mld._PACK_STR, str(buf[4:])) + + eq_(res[0], 0) + eq_(res[1], addrconv.ipv6.text_to_bin('::')) + + def test_json(self): + ic1 = icmpv6.icmpv6( + type_=icmpv6.MLD_LISTENER_QUERY, + data=icmpv6.mld()) + jsondict = ic1.to_jsondict() + ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6']) + eq_(str(ic1), str(ic2)) + + +class Test_icmpv6_membership_report(Test_icmpv6_membership_query): + type_ = 131 + code = 0 + csum = 0xb4a4 + maxresp = 10000 + address = 'ff08::1' + buf = '\x83\x00\xb4\xa4\x27\x10\x00\x00' \ + + '\xff\x08\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' + + def test_json(self): + ic1 = icmpv6.icmpv6( + type_=icmpv6.MLD_LISTENER_REPOR, + data=icmpv6.mld()) + jsondict = ic1.to_jsondict() + ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6']) + eq_(str(ic1), str(ic2)) + + +class Test_icmpv6_membership_done(Test_icmpv6_membership_query): + type_ = 132 + code = 0 + csum = 0xb3a4 + maxresp = 10000 + address = 'ff08::1' + buf = '\x84\x00\xb3\xa4\x27\x10\x00\x00' \ + + '\xff\x08\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' + + def test_json(self): + ic1 = icmpv6.icmpv6( + type_=icmpv6.MLD_LISTENER_DONE, + data=icmpv6.mld()) + jsondict = ic1.to_jsondict() + ic2 = icmpv6.icmpv6.from_jsondict(jsondict['icmpv6']) + eq_(str(ic1), str(ic2)) + + +class Test_mldv2_query(unittest.TestCase): + type_ = 130 + code = 0 + csum = 0xb5a4 + maxresp = 10000 + address = 'ff08::1' + s_flg = 0 + qrv = 2 + s_qrv = s_flg << 3 | qrv + qqic = 10 + num = 0 + srcs = [] + + mld = icmpv6.mldv2_query( + maxresp, address, s_flg, qrv, qqic, num, srcs) + + buf = '\x82\x00\xb5\xa4\x27\x10\x00\x00' \ + + '\xff\x08\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' \ + + '\x02\x0a\x00\x00' + + def setUp(self): + pass + + def setUp_with_srcs(self): + self.num = 2 + self.srcs = ['ff80::1', 'ff80::2'] + self.mld = icmpv6.mldv2_query( + self.maxresp, self.address, self.s_flg, self.qrv, self.qqic, + self.num, self.srcs) + self.buf = '\x82\x00\xb5\xa4\x27\x10\x00\x00' \ + + '\xff\x08\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' \ + + '\x02\x0a\x00\x02' \ + + '\xff\x80\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' \ + + '\xff\x80\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x02' + + def tearDown(self): + pass + + def find_protocol(self, pkt, name): + for p in pkt.protocols: + if p.protocol_name == name: + return p + + def test_init(self): + eq_(self.mld.maxresp, self.maxresp) + eq_(self.mld.address, self.address) + eq_(self.mld.s_flg, self.s_flg) + eq_(self.mld.qrv, self.qrv) + eq_(self.mld.qqic, self.qqic) + eq_(self.mld.num, self.num) + eq_(self.mld.srcs, self.srcs) + + def test_init_with_srcs(self): + self.setUp_with_srcs() + self.test_init() + + def test_parser(self): + msg, n, _ = icmpv6.icmpv6.parser(self.buf) + + eq_(msg.type_, self.type_) + eq_(msg.code, self.code) + eq_(msg.csum, self.csum) + eq_(msg.data.maxresp, self.maxresp) + eq_(msg.data.address, self.address) + eq_(msg.data.s_flg, self.s_flg) + eq_(msg.data.qrv, self.qrv) + eq_(msg.data.qqic, self.qqic) + eq_(msg.data.num, self.num) + eq_(msg.data.srcs, self.srcs) + eq_(n, None) + + def test_parser_with_srcs(self): + self.setUp_with_srcs() + self.test_parser() + + def test_serialize(self): + src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da' + dst_ipv6 = '3ffe:501:0:1001::2' + prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6) + mld_csum = icmpv6_csum(prev, self.buf) + + icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld) + buf = icmp.serialize(bytearray(), prev) + + (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, str(buf)) + (maxresp, address, s_qrv, qqic, num) = struct.unpack_from( + self.mld._PACK_STR, str(buf), icmp._MIN_LEN) + + eq_(type_, self.type_) + eq_(code, self.code) + eq_(csum, mld_csum) + eq_(maxresp, self.maxresp) + eq_(address, addrconv.ipv6.text_to_bin(self.address)) + s_flg = (s_qrv >> 3) & 0b1 + qrv = s_qrv & 0b111 + eq_(s_flg, self.s_flg) + eq_(qrv, self.qrv) + eq_(qqic, self.qqic) + eq_(num, self.num) + + def test_serialize_with_srcs(self): + self.setUp_with_srcs() + src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da' + dst_ipv6 = '3ffe:501:0:1001::2' + prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6) + mld_csum = icmpv6_csum(prev, self.buf) + + icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld) + buf = icmp.serialize(bytearray(), prev) + + (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, str(buf)) + (maxresp, address, s_qrv, qqic, num) = struct.unpack_from( + self.mld._PACK_STR, str(buf), icmp._MIN_LEN) + (addr1, addr2) = struct.unpack_from( + '!16s16s', str(buf), icmp._MIN_LEN + self.mld._MIN_LEN) + + eq_(type_, self.type_) + eq_(code, self.code) + eq_(csum, mld_csum) + eq_(maxresp, self.maxresp) + eq_(address, addrconv.ipv6.text_to_bin(self.address)) + s_flg = (s_qrv >> 3) & 0b1 + qrv = s_qrv & 0b111 + eq_(s_flg, self.s_flg) + eq_(qrv, self.qrv) + eq_(qqic, self.qqic) + eq_(num, self.num) + eq_(addr1, addrconv.ipv6.text_to_bin(self.srcs[0])) + eq_(addr2, addrconv.ipv6.text_to_bin(self.srcs[1])) + + def _build_mldv2_query(self): + e = ethernet(ethertype=ether.ETH_TYPE_IPV6) + i = ipv6(nxt=inet.IPPROTO_ICMPV6) + ic = icmpv6.icmpv6(type_=icmpv6.MLD_LISTENER_QUERY, + data=self.mld) + p = e/i/ic + return p + + def test_build_mldv2_query(self): + p = self._build_mldv2_query() + + e = self.find_protocol(p, "ethernet") + ok_(e) + eq_(e.ethertype, ether.ETH_TYPE_IPV6) + + i = self.find_protocol(p, "ipv6") + ok_(i) + eq_(i.nxt, inet.IPPROTO_ICMPV6) + + ic = self.find_protocol(p, "icmpv6") + ok_(ic) + eq_(ic.type_, icmpv6.MLD_LISTENER_QUERY) + + eq_(ic.data.maxresp, self.maxresp) + eq_(ic.data.address, self.address) + eq_(ic.data.s_flg, self.s_flg) + eq_(ic.data.qrv, self.qrv) + eq_(ic.data.num, self.num) + eq_(ic.data.srcs, self.srcs) + + def test_build_mldv2_query_with_srcs(self): + self.setUp_with_srcs() + self.test_build_mldv2_query() + + def test_to_string(self): + ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.mld) + + mld_values = {'maxresp': self.maxresp, + 'address': self.address, + 's_flg': self.s_flg, + 'qrv': self.qrv, + 'qqic': self.qqic, + 'num': self.num, + 'srcs': self.srcs} + _mld_str = ','.join(['%s=%s' % (k, repr(mld_values[k])) + for k, v in inspect.getmembers(self.mld) + if k in mld_values]) + mld_str = '%s(%s)' % (icmpv6.mldv2_query.__name__, _mld_str) + + icmp_values = {'type_': repr(self.type_), + 'code': repr(self.code), + 'csum': repr(self.csum), + 'data': mld_str} + _ic_str = ','.join(['%s=%s' % (k, icmp_values[k]) + for k, v in inspect.getmembers(ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str) + + eq_(str(ic), ic_str) + eq_(repr(ic), ic_str) + + def test_to_string_with_srcs(self): + self.setUp_with_srcs() + self.test_to_string() + + @raises(Exception) + def test_num_larger_than_srcs(self): + self.srcs = ['ff80::1', 'ff80::2', 'ff80::3'] + self.num = len(self.srcs) + 1 + self.buf = pack(icmpv6.mldv2_query._PACK_STR, self.maxresp, + addrconv.ipv6.text_to_bin(self.address), + self.s_qrv, self.qqic, self.num) + for src in self.srcs: + self.buf += pack('16s', addrconv.ipv6.text_to_bin(src)) + self.mld = icmpv6.mldv2_query( + self.maxresp, self.address, self.s_flg, self.qrv, self.qqic, + self.num, self.srcs) + self.test_parser() + + @raises(Exception) + def test_num_smaller_than_srcs(self): + self.srcs = ['ff80::1', 'ff80::2', 'ff80::3'] + self.num = len(self.srcs) - 1 + self.buf = pack(icmpv6.mldv2_query._PACK_STR, self.maxresp, + addrconv.ipv6.text_to_bin(self.address), + self.s_qrv, self.qqic, self.num) + for src in self.srcs: + self.buf += pack('16s', addrconv.ipv6.text_to_bin(src)) + self.mld = icmpv6.mldv2_query( + self.maxresp, self.address, self.s_flg, self.qrv, self.qqic, + self.num, self.srcs) + self.test_parser() + + def test_default_args(self): + prev = ipv6(nxt=inet.IPPROTO_ICMPV6) + ic = icmpv6.icmpv6( + type_=icmpv6.MLD_LISTENER_QUERY, data=icmpv6.mldv2_query()) + prev.serialize(ic, None) + buf = ic.serialize(bytearray(), prev) + res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4])) + + eq_(res[0], icmpv6.MLD_LISTENER_QUERY) + eq_(res[1], 0) + eq_(res[2], icmpv6_csum(prev, buf)) + + res = struct.unpack(icmpv6.mldv2_query._PACK_STR, str(buf[4:])) + + eq_(res[0], 0) + eq_(res[1], addrconv.ipv6.text_to_bin('::')) + eq_(res[2], 2) + eq_(res[3], 0) + eq_(res[4], 0) + + # srcs without num + srcs = ['ff80::1', 'ff80::2', 'ff80::3'] + que = icmpv6.mldv2_query(srcs=srcs) + buf = que.serialize() + res = struct.unpack_from( + icmpv6.mldv2_query._PACK_STR, str(buf)) + + eq_(res[0], 0) + eq_(res[1], addrconv.ipv6.text_to_bin('::')) + eq_(res[2], 2) + eq_(res[3], 0) + eq_(res[4], len(srcs)) + + (src1, src2, src3) = struct.unpack_from( + '16s16s16s', str(buf), icmpv6.mldv2_query._MIN_LEN) + + eq_(src1, addrconv.ipv6.text_to_bin(srcs[0])) + eq_(src2, addrconv.ipv6.text_to_bin(srcs[1])) + eq_(src3, addrconv.ipv6.text_to_bin(srcs[2])) + + def test_json(self): + jsondict = self.mld.to_jsondict() + mld = icmpv6.mldv2_query.from_jsondict(jsondict['mldv2_query']) + eq_(str(self.mld), str(mld)) + + def test_json_with_srcs(self): + self.setUp_with_srcs() + self.test_json() + + +class Test_mldv2_report(unittest.TestCase): + type_ = 143 + code = 0 + csum = 0xb5a4 + record_num = 0 + records = [] + + mld = icmpv6.mldv2_report(record_num, records) + + buf = '\x8f\x00\xb5\xa4\x00\x00\x00\x00' + + def setUp(self): + pass + + def setUp_with_records(self): + self.record1 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1') + self.record2 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2', + ['fe80::1', 'fe80::2']) + self.record3 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 1, 0, 'ff00::3', [], 'abc\x00') + self.record4 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 2, 2, 'ff00::4', + ['fe80::1', 'fe80::2'], 'abcde\x00\x00\x00') + self.records = [self.record1, self.record2, self.record3, + self.record4] + self.record_num = len(self.records) + self.mld = icmpv6.mldv2_report(self.record_num, self.records) + self.buf = '\x8f\x00\xb5\xa4\x00\x00\x00\x04' \ + + '\x01\x00\x00\x00' \ + + '\xff\x00\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' \ + + '\x01\x00\x00\x02' \ + + '\xff\x00\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x02' \ + + '\xfe\x80\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' \ + + '\xfe\x80\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x02' \ + + '\x01\x01\x00\x00' \ + + '\xff\x00\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x03' \ + + '\x61\x62\x63\x00' \ + + '\x01\x02\x00\x02' \ + + '\xff\x00\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x04' \ + + '\xfe\x80\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' \ + + '\xfe\x80\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x02' \ + + '\x61\x62\x63\x64\x65\x00\x00\x00' + + def tearDown(self): + pass + + def find_protocol(self, pkt, name): + for p in pkt.protocols: + if p.protocol_name == name: + return p + + def test_init(self): + eq_(self.mld.record_num, self.record_num) + eq_(self.mld.records, self.records) + + def test_init_with_records(self): + self.setUp_with_records() + self.test_init() + + def test_parser(self): + msg, n, _ = icmpv6.icmpv6.parser(self.buf) + + eq_(msg.type_, self.type_) + eq_(msg.code, self.code) + eq_(msg.csum, self.csum) + eq_(msg.data.record_num, self.record_num) + eq_(repr(msg.data.records), repr(self.records)) + + def test_parser_with_records(self): + self.setUp_with_records() + self.test_parser() + + def test_serialize(self): + src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da' + dst_ipv6 = '3ffe:501:0:1001::2' + prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6) + mld_csum = icmpv6_csum(prev, self.buf) + + icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld) + buf = icmp.serialize(bytearray(), prev) + + (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, str(buf)) + (record_num, ) = struct.unpack_from( + self.mld._PACK_STR, str(buf), icmp._MIN_LEN) + + eq_(type_, self.type_) + eq_(code, self.code) + eq_(csum, mld_csum) + eq_(record_num, self.record_num) + + def test_serialize_with_records(self): + self.setUp_with_records() + src_ipv6 = '3ffe:507:0:1:200:86ff:fe05:80da' + dst_ipv6 = '3ffe:501:0:1001::2' + prev = ipv6(6, 0, 0, len(self.buf), 64, 255, src_ipv6, dst_ipv6) + mld_csum = icmpv6_csum(prev, self.buf) + + icmp = icmpv6.icmpv6(self.type_, self.code, 0, self.mld) + buf = icmp.serialize(bytearray(), prev) + + (type_, code, csum) = struct.unpack_from(icmp._PACK_STR, str(buf)) + (record_num, ) = struct.unpack_from( + self.mld._PACK_STR, str(buf), icmp._MIN_LEN) + offset = icmp._MIN_LEN + self.mld._MIN_LEN + rec1 = icmpv6.mldv2_report_group.parser(buffer(buf[offset:])) + offset += len(rec1) + rec2 = icmpv6.mldv2_report_group.parser(buffer(buf[offset:])) + offset += len(rec2) + rec3 = icmpv6.mldv2_report_group.parser(buffer(buf[offset:])) + offset += len(rec3) + rec4 = icmpv6.mldv2_report_group.parser(buffer(buf[offset:])) + + eq_(type_, self.type_) + eq_(code, self.code) + eq_(csum, mld_csum) + eq_(record_num, self.record_num) + eq_(repr(rec1), repr(self.record1)) + eq_(repr(rec2), repr(self.record2)) + eq_(repr(rec3), repr(self.record3)) + eq_(repr(rec4), repr(self.record4)) + + def _build_mldv2_report(self): + e = ethernet(ethertype=ether.ETH_TYPE_IPV6) + i = ipv6(nxt=inet.IPPROTO_ICMPV6) + ic = icmpv6.icmpv6(type_=icmpv6.MLDV2_LISTENER_REPORT, + data=self.mld) + p = e/i/ic + return p + + def test_build_mldv2_report(self): + p = self._build_mldv2_report() + + e = self.find_protocol(p, "ethernet") + ok_(e) + eq_(e.ethertype, ether.ETH_TYPE_IPV6) + + i = self.find_protocol(p, "ipv6") + ok_(i) + eq_(i.nxt, inet.IPPROTO_ICMPV6) + + ic = self.find_protocol(p, "icmpv6") + ok_(ic) + eq_(ic.type_, icmpv6.MLDV2_LISTENER_REPORT) + + eq_(ic.data.record_num, self.record_num) + eq_(ic.data.records, self.records) + + def test_build_mldv2_report_with_records(self): + self.setUp_with_records() + self.test_build_mldv2_report() + + def test_to_string(self): + ic = icmpv6.icmpv6(self.type_, self.code, self.csum, self.mld) + + mld_values = {'record_num': self.record_num, + 'records': self.records} + _mld_str = ','.join(['%s=%s' % (k, repr(mld_values[k])) + for k, v in inspect.getmembers(self.mld) + if k in mld_values]) + mld_str = '%s(%s)' % (icmpv6.mldv2_report.__name__, _mld_str) + + icmp_values = {'type_': repr(self.type_), + 'code': repr(self.code), + 'csum': repr(self.csum), + 'data': mld_str} + _ic_str = ','.join(['%s=%s' % (k, icmp_values[k]) + for k, v in inspect.getmembers(ic) + if k in icmp_values]) + ic_str = '%s(%s)' % (icmpv6.icmpv6.__name__, _ic_str) + + eq_(str(ic), ic_str) + eq_(repr(ic), ic_str) + + def test_to_string_with_records(self): + self.setUp_with_records() + self.test_to_string() + + @raises(Exception) + def test_record_num_larger_than_records(self): + self.record1 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1') + self.record2 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2', + ['fe80::1', 'fe80::2']) + self.record3 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 1, 0, 'ff00::3', [], 'abc\x00') + self.record4 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 2, 2, 'ff00::4', + ['fe80::1', 'fe80::2'], 'abcde\x00\x00\x00') + self.records = [self.record1, self.record2, self.record3, + self.record4] + self.record_num = len(self.records) + 1 + self.buf = struct.pack( + icmpv6.mldv2_report._PACK_STR, self.record_num) + self.buf += self.record1.serialize() + self.buf += self.record2.serialize() + self.buf += self.record3.serialize() + self.buf += self.record4.serialize() + self.mld = icmpv6.mldv2_report(self.record_num, self.records) + self.test_parser() + + @raises(Exception) + def test_record_num_smaller_than_records(self): + self.record1 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1') + self.record2 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2', + ['fe80::1', 'fe80::2']) + self.record3 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 1, 0, 'ff00::3', [], 'abc\x00') + self.record4 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 2, 2, 'ff00::4', + ['fe80::1', 'fe80::2'], 'abcde\x00\x00\x00') + self.records = [self.record1, self.record2, self.record3, + self.record4] + self.record_num = len(self.records) - 1 + self.buf = struct.pack( + icmpv6.mldv2_report._PACK_STR, self.record_num) + self.buf += self.record1.serialize() + self.buf += self.record2.serialize() + self.buf += self.record3.serialize() + self.buf += self.record4.serialize() + self.mld = icmpv6.mldv2_report(self.record_num, self.records) + self.test_parser() + + def test_default_args(self): + prev = ipv6(nxt=inet.IPPROTO_ICMPV6) + ic = icmpv6.icmpv6( + type_=icmpv6.MLDV2_LISTENER_REPORT, data=icmpv6.mldv2_report()) + prev.serialize(ic, None) + buf = ic.serialize(bytearray(), prev) + res = struct.unpack(icmpv6.icmpv6._PACK_STR, str(buf[:4])) + + eq_(res[0], icmpv6.MLDV2_LISTENER_REPORT) + eq_(res[1], 0) + eq_(res[2], icmpv6_csum(prev, buf)) + + res = struct.unpack(icmpv6.mldv2_report._PACK_STR, str(buf[4:])) + + eq_(res[0], 0) + + # records without record_num + record1 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 0, 0, 'ff00::1') + record2 = icmpv6.mldv2_report_group( + icmpv6.MODE_IS_INCLUDE, 0, 2, 'ff00::2', + ['fe80::1', 'fe80::2']) + records = [record1, record2] + rep = icmpv6.mldv2_report(records=records) + buf = rep.serialize() + res = struct.unpack_from( + icmpv6.mldv2_report._PACK_STR, str(buf)) + + eq_(res[0], len(records)) + + res = struct.unpack_from( + icmpv6.mldv2_report_group._PACK_STR, str(buf), + icmpv6.mldv2_report._MIN_LEN) + + eq_(res[0], icmpv6.MODE_IS_INCLUDE) + eq_(res[1], 0) + eq_(res[2], 0) + eq_(res[3], addrconv.ipv6.text_to_bin('ff00::1')) + + res = struct.unpack_from( + icmpv6.mldv2_report_group._PACK_STR, str(buf), + icmpv6.mldv2_report._MIN_LEN + + icmpv6.mldv2_report_group._MIN_LEN) + + eq_(res[0], icmpv6.MODE_IS_INCLUDE) + eq_(res[1], 0) + eq_(res[2], 2) + eq_(res[3], addrconv.ipv6.text_to_bin('ff00::2')) + + res = struct.unpack_from( + '16s16s', str(buf), + icmpv6.mldv2_report._MIN_LEN + + icmpv6.mldv2_report_group._MIN_LEN + + icmpv6.mldv2_report_group._MIN_LEN) + + eq_(res[0], addrconv.ipv6.text_to_bin('fe80::1')) + eq_(res[1], addrconv.ipv6.text_to_bin('fe80::2')) + + def test_json(self): + jsondict = self.mld.to_jsondict() + mld = icmpv6.mldv2_report.from_jsondict(jsondict['mldv2_report']) + eq_(str(self.mld), str(mld)) + + def test_json_with_records(self): + self.setUp_with_records() + self.test_json() + + +class Test_mldv2_report_group(unittest.TestCase): + type_ = icmpv6.MODE_IS_INCLUDE + aux_len = 0 + num = 0 + address = 'ff00::1' + srcs = [] + aux = None + mld = icmpv6.mldv2_report_group( + type_, aux_len, num, address, srcs, aux) + buf = '\x01\x00\x00\x00' \ + + '\xff\x00\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' + + def setUp(self): + pass + + def setUp_with_srcs(self): + self.srcs = ['fe80::1', 'fe80::2', 'fe80::3'] + self.num = len(self.srcs) + self.mld = icmpv6.mldv2_report_group( + self.type_, self.aux_len, self.num, self.address, self.srcs, + self.aux) + self.buf = '\x01\x00\x00\x03' \ + + '\xff\x00\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' \ + + '\xfe\x80\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' \ + + '\xfe\x80\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x02' \ + + '\xfe\x80\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x03' + + def setUp_with_aux(self): + self.aux = '\x01\x02\x03\x04\x05\x06\x07\x08' + self.aux_len = len(self.aux) / 4 + self.mld = icmpv6.mldv2_report_group( + self.type_, self.aux_len, self.num, self.address, self.srcs, + self.aux) + self.buf = '\x01\x02\x00\x00' \ + + '\xff\x00\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' \ + + '\x01\x02\x03\x04\x05\x06\x07\x08' + + def setUp_with_srcs_and_aux(self): + self.srcs = ['fe80::1', 'fe80::2', 'fe80::3'] + self.num = len(self.srcs) + self.aux = '\x01\x02\x03\x04\x05\x06\x07\x08' + self.aux_len = len(self.aux) / 4 + self.mld = icmpv6.mldv2_report_group( + self.type_, self.aux_len, self.num, self.address, self.srcs, + self.aux) + self.buf = '\x01\x02\x00\x03' \ + + '\xff\x00\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' \ + + '\xfe\x80\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x01' \ + + '\xfe\x80\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x02' \ + + '\xfe\x80\x00\x00\x00\x00\x00\x00' \ + + '\x00\x00\x00\x00\x00\x00\x00\x03' \ + + '\x01\x02\x03\x04\x05\x06\x07\x08' + + def tearDown(self): + pass + + def test_init(self): + eq_(self.mld.type_, self.type_) + eq_(self.mld.aux_len, self.aux_len) + eq_(self.mld.num, self.num) + eq_(self.mld.address, self.address) + eq_(self.mld.srcs, self.srcs) + eq_(self.mld.aux, self.aux) + + def test_init_with_srcs(self): + self.setUp_with_srcs() + self.test_init() + + def test_init_with_aux(self): + self.setUp_with_aux() + self.test_init() + + def test_init_with_srcs_and_aux(self): + self.setUp_with_srcs_and_aux() + self.test_init() + + def test_parser(self): + _res = icmpv6.mldv2_report_group.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + + eq_(res.type_, self.type_) + eq_(res.aux_len, self.aux_len) + eq_(res.num, self.num) + eq_(res.address, self.address) + eq_(res.srcs, self.srcs) + eq_(res.aux, self.aux) + + def test_parser_with_srcs(self): + self.setUp_with_srcs() + self.test_parser() + + def test_parser_with_aux(self): + self.setUp_with_aux() + self.test_parser() + + def test_parser_with_srcs_and_aux(self): + self.setUp_with_srcs_and_aux() + self.test_parser() + + def test_serialize(self): + buf = self.mld.serialize() + res = struct.unpack_from( + icmpv6.mldv2_report_group._PACK_STR, buffer(buf)) + + eq_(res[0], self.type_) + eq_(res[1], self.aux_len) + eq_(res[2], self.num) + eq_(res[3], addrconv.ipv6.text_to_bin(self.address)) + + def test_serialize_with_srcs(self): + self.setUp_with_srcs() + buf = self.mld.serialize() + res = struct.unpack_from( + icmpv6.mldv2_report_group._PACK_STR, buffer(buf)) + (src1, src2, src3) = struct.unpack_from( + '16s16s16s', buffer(buf), icmpv6.mldv2_report_group._MIN_LEN) + eq_(res[0], self.type_) + eq_(res[1], self.aux_len) + eq_(res[2], self.num) + eq_(res[3], addrconv.ipv6.text_to_bin(self.address)) + eq_(src1, addrconv.ipv6.text_to_bin(self.srcs[0])) + eq_(src2, addrconv.ipv6.text_to_bin(self.srcs[1])) + eq_(src3, addrconv.ipv6.text_to_bin(self.srcs[2])) + + def test_serialize_with_aux(self): + self.setUp_with_aux() + buf = self.mld.serialize() + res = struct.unpack_from( + icmpv6.mldv2_report_group._PACK_STR, buffer(buf)) + (aux, ) = struct.unpack_from( + '%ds' % (self.aux_len * 4), buffer(buf), + icmpv6.mldv2_report_group._MIN_LEN) + eq_(res[0], self.type_) + eq_(res[1], self.aux_len) + eq_(res[2], self.num) + eq_(res[3], addrconv.ipv6.text_to_bin(self.address)) + eq_(aux, self.aux) + + def test_serialize_with_srcs_and_aux(self): + self.setUp_with_srcs_and_aux() + buf = self.mld.serialize() + res = struct.unpack_from( + icmpv6.mldv2_report_group._PACK_STR, buffer(buf)) + (src1, src2, src3) = struct.unpack_from( + '16s16s16s', buffer(buf), icmpv6.mldv2_report_group._MIN_LEN) + (aux, ) = struct.unpack_from( + '%ds' % (self.aux_len * 4), buffer(buf), + icmpv6.mldv2_report_group._MIN_LEN + 16 * 3) + eq_(res[0], self.type_) + eq_(res[1], self.aux_len) + eq_(res[2], self.num) + eq_(res[3], addrconv.ipv6.text_to_bin(self.address)) + eq_(src1, addrconv.ipv6.text_to_bin(self.srcs[0])) + eq_(src2, addrconv.ipv6.text_to_bin(self.srcs[1])) + eq_(src3, addrconv.ipv6.text_to_bin(self.srcs[2])) + eq_(aux, self.aux) + + def test_to_string(self): + igmp_values = {'type_': repr(self.type_), + 'aux_len': repr(self.aux_len), + 'num': repr(self.num), + 'address': repr(self.address), + 'srcs': repr(self.srcs), + 'aux': repr(self.aux)} + _g_str = ','.join(['%s=%s' % (k, igmp_values[k]) + for k, v in inspect.getmembers(self.mld) + if k in igmp_values]) + g_str = '%s(%s)' % (icmpv6.mldv2_report_group.__name__, _g_str) + + eq_(str(self.mld), g_str) + eq_(repr(self.mld), g_str) + + def test_to_string_with_srcs(self): + self.setUp_with_srcs() + self.test_to_string() + + def test_to_string_with_aux(self): + self.setUp_with_aux() + self.test_to_string() + + def test_to_string_with_srcs_and_aux(self): + self.setUp_with_srcs_and_aux() + self.test_to_string() + + def test_len(self): + eq_(len(self.mld), 20) + + def test_len_with_srcs(self): + self.setUp_with_srcs() + eq_(len(self.mld), 68) + + def test_len_with_aux(self): + self.setUp_with_aux() + eq_(len(self.mld), 28) + + def test_len_with_srcs_and_aux(self): + self.setUp_with_srcs_and_aux() + eq_(len(self.mld), 76) + + @raises + def test_num_larger_than_srcs(self): + self.srcs = ['fe80::1', 'fe80::2', 'fe80::3'] + self.num = len(self.srcs) + 1 + self.buf = struct.pack( + icmpv6.mldv2_report_group._PACK_STR, self.type_, self.aux_len, + self.num, addrconv.ipv6.text_to_bin(self.address)) + for src in self.srcs: + self.buf += pack('16s', addrconv.ipv6.text_to_bin(src)) + self.mld = icmpv6.mldv2_report_group( + self.type_, self.aux_len, self.num, self.address, + self.srcs, self.aux) + self.test_parser() + + @raises + def test_num_smaller_than_srcs(self): + self.srcs = ['fe80::1', 'fe80::2', 'fe80::3'] + self.num = len(self.srcs) - 1 + self.buf = struct.pack( + icmpv6.mldv2_report_group._PACK_STR, self.type_, self.aux_len, + self.num, addrconv.ipv6.text_to_bin(self.address)) + for src in self.srcs: + self.buf += pack('16s', addrconv.ipv6.text_to_bin(src)) + self.mld = icmpv6.mldv2_report_group( + self.type_, self.aux_len, self.num, self.address, + self.srcs, self.aux) + self.test_parser() + + @raises + def test_aux_len_larger_than_aux(self): + self.aux = '\x01\x02\x03\x04\x05\x06\x07\x08' + self.aux_len = len(self.aux) / 4 + 1 + self.buf = struct.pack( + icmpv6.mldv2_report_group._PACK_STR, self.type_, self.aux_len, + self.num, addrconv.ipv6.text_to_bin(self.address)) + self.buf += self.aux + self.mld = icmpv6.mldv2_report_group( + self.type_, self.aux_len, self.num, self.address, + self.srcs, self.aux) + self.test_parser() + + @raises + def test_aux_len_smaller_than_aux(self): + self.aux = '\x01\x02\x03\x04\x05\x06\x07\x08' + self.aux_len = len(self.aux) / 4 - 1 + self.buf = struct.pack( + icmpv6.mldv2_report_group._PACK_STR, self.type_, self.aux_len, + self.num, addrconv.ipv6.text_to_bin(self.address)) + self.buf += self.aux + self.mld = icmpv6.mldv2_report_group( + self.type_, self.aux_len, self.num, self.address, + self.srcs, self.aux) + self.test_parser() + + def test_default_args(self): + rep = icmpv6.mldv2_report_group() + buf = rep.serialize() + res = struct.unpack_from( + icmpv6.mldv2_report_group._PACK_STR, str(buf)) + + eq_(res[0], 0) + eq_(res[1], 0) + eq_(res[2], 0) + eq_(res[3], addrconv.ipv6.text_to_bin('::')) + + # srcs without num + srcs = ['fe80::1', 'fe80::2', 'fe80::3'] + rep = icmpv6.mldv2_report_group(srcs=srcs) + buf = rep.serialize() + LOG.info(repr(buf)) + res = struct.unpack_from( + icmpv6.mldv2_report_group._PACK_STR, str(buf)) + + eq_(res[0], 0) + eq_(res[1], 0) + eq_(res[2], len(srcs)) + eq_(res[3], addrconv.ipv6.text_to_bin('::')) + + (src1, src2, src3) = struct.unpack_from( + '16s16s16s', str(buf), icmpv6.mldv2_report_group._MIN_LEN) + + eq_(src1, addrconv.ipv6.text_to_bin(srcs[0])) + eq_(src2, addrconv.ipv6.text_to_bin(srcs[1])) + eq_(src3, addrconv.ipv6.text_to_bin(srcs[2])) + + # aux without aux_len + rep = icmpv6.mldv2_report_group(aux='\x01\x02\x03') + buf = rep.serialize() + res = struct.unpack_from( + icmpv6.mldv2_report_group._PACK_STR, str(buf)) + + eq_(res[0], 0) + eq_(res[1], 1) + eq_(res[2], 0) + eq_(res[3], addrconv.ipv6.text_to_bin('::')) + eq_(buf[icmpv6.mldv2_report_group._MIN_LEN:], '\x01\x02\x03\x00') + + def test_json(self): + jsondict = self.mld.to_jsondict() + mld = icmpv6.mldv2_report_group.from_jsondict( + jsondict['mldv2_report_group']) + eq_(str(self.mld), str(mld)) + + def test_json_with_srcs(self): + self.setUp_with_srcs() + self.test_json() + + def test_json_with_aux(self): + self.setUp_with_aux() + self.test_json() + + def test_json_with_srcs_and_aux(self): + self.setUp_with_srcs_and_aux() + self.test_json() |