diff options
author | Yuichi Ito <ito.yuichi0@gmail.com> | 2013-09-17 13:37:22 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2013-09-24 02:09:14 +0900 |
commit | d174c98421986e0a39c1f1007ff2bd915bef110c (patch) | |
tree | 9b9e3b7b3c768fe9c1cca218095ae1388404f629 | |
parent | e49266946d8efa47ac1065551373c6dfd794cf82 (diff) |
packet lib: ipv6: support destination header
Signed-off-by: itoyuichi <ito.yuichi0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/lib/packet/ipv6.py | 26 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_ipv6.py | 126 |
2 files changed, 152 insertions, 0 deletions
diff --git a/ryu/lib/packet/ipv6.py b/ryu/lib/packet/ipv6.py index 4ffdd3b6..747ec407 100644 --- a/ryu/lib/packet/ipv6.py +++ b/ryu/lib/packet/ipv6.py @@ -241,6 +241,32 @@ class hop_opts(opt_header): super(hop_opts, self).__init__(size, data) +@ipv6.register_header_type(inet.IPPROTO_DSTOPTS) +class dst_opts(opt_header): + """IPv6 (RFC 2460) destination header encoder/decoder class. + + This is used with ryu.lib.packet.ipv6.ipv6. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ============== ======================================= + Attribute Description + ============== ======================================= + size the length of the destination header, + not include the first 8 octet. + data IPv6 options. + ============== ======================================= + """ + TYPE = inet.IPPROTO_DSTOPTS + + def __init__(self, size, data): + super(dst_opts, self).__init__(size, data) + + class option(stringify.StringifyMixin): """IPv6 (RFC 2460) Options header encoder/decoder class. diff --git a/ryu/tests/unit/packet/test_ipv6.py b/ryu/tests/unit/packet/test_ipv6.py index 493af039..5a71c567 100644 --- a/ryu/tests/unit/packet/test_ipv6.py +++ b/ryu/tests/unit/packet/test_ipv6.py @@ -83,6 +83,34 @@ class Test_ipv6(unittest.TestCase): addrconv.ipv6.text_to_bin(self.dst)) self.buf += self.hop_opts.serialize() + def setUp_with_dst_opts(self): + self.opt1_type = 5 + self.opt1_len = 2 + self.opt1_data = '\x00\x00' + self.opt2_type = 1 + self.opt2_len = 0 + self.opt2_data = None + self.options = [ + ipv6.option(self.opt1_type, self.opt1_len, self.opt1_data), + ipv6.option(self.opt2_type, self.opt2_len, self.opt2_data), + ] + self.dst_opts_size = 0 + self.dst_opts = ipv6.dst_opts(self.dst_opts_size, self.options) + self.ext_hdrs = [self.dst_opts] + self.payload_length += len(self.dst_opts) + self.ip = ipv6.ipv6( + self.version, self.traffic_class, self.flow_label, + self.payload_length, self.nxt, self.hop_limit, self.src, + self.dst, self.ext_hdrs) + self.dst_opts.nxt = self.nxt + self.nxt = self.dst_opts.TYPE + self.buf = struct.pack( + ipv6.ipv6._PACK_STR, self.v_tc_flow, + self.payload_length, self.nxt, self.hop_limit, + addrconv.ipv6.text_to_bin(self.src), + addrconv.ipv6.text_to_bin(self.dst)) + self.buf += self.dst_opts.serialize() + def tearDown(self): pass @@ -101,6 +129,10 @@ class Test_ipv6(unittest.TestCase): self.setUp_with_hop_opts() self.test_init() + def test_init_with_dst_opts(self): + self.setUp_with_dst_opts() + self.test_init() + def test_parser(self): _res = self.ip.parser(str(self.buf)) if type(_res) is tuple: @@ -122,6 +154,10 @@ class Test_ipv6(unittest.TestCase): self.setUp_with_hop_opts() self.test_parser() + def test_parser_with_dst_opts(self): + self.setUp_with_dst_opts() + self.test_parser() + def test_serialize(self): data = bytearray() prev = None @@ -146,6 +182,16 @@ class Test_ipv6(unittest.TestCase): hop_opts = ipv6.hop_opts.parser(str(buf[ipv6.ipv6._MIN_LEN:])) eq_(repr(self.hop_opts), repr(hop_opts)) + def test_serialize_with_dst_opts(self): + self.setUp_with_dst_opts() + self.test_serialize() + + data = bytearray() + prev = None + buf = self.ip.serialize(data, prev) + dst_opts = ipv6.dst_opts.parser(str(buf[ipv6.ipv6._MIN_LEN:])) + eq_(repr(self.dst_opts), repr(dst_opts)) + def test_to_string(self): ipv6_values = {'version': self.version, 'traffic_class': self.traffic_class, @@ -168,6 +214,10 @@ class Test_ipv6(unittest.TestCase): self.setUp_with_hop_opts() self.test_to_string() + def test_to_string_with_dst_opts(self): + self.setUp_with_dst_opts() + self.test_to_string() + def test_len(self): eq_(len(self.ip), 40) @@ -175,6 +225,10 @@ class Test_ipv6(unittest.TestCase): self.setUp_with_hop_opts() eq_(len(self.ip), 40 + len(self.hop_opts)) + def test_len_with_dst_opts(self): + self.setUp_with_dst_opts() + eq_(len(self.ip), 40 + len(self.dst_opts)) + class Test_hop_opts(unittest.TestCase): @@ -248,6 +302,78 @@ class Test_hop_opts(unittest.TestCase): eq_(16, len(self.hop)) +class Test_dst_opts(unittest.TestCase): + + def setUp(self): + self.nxt = 60 + self.size = 8 + self.data = [ + ipv6.option(5, 2, '\x00\x00'), + ipv6.option(1, 0, None), + ipv6.option(0xc2, 4, '\x00\x01\x00\x00'), + ipv6.option(1, 0, None), + ] + self.dst = ipv6.dst_opts(self.size, self.data) + self.dst.set_nxt(self.nxt) + self.form = '!BB' + self.buf = struct.pack(self.form, self.nxt, self.size) \ + + self.data[0].serialize() \ + + self.data[1].serialize() \ + + self.data[2].serialize() \ + + self.data[3].serialize() + + def tearDown(self): + pass + + def test_init(self): + eq_(self.nxt, self.dst.nxt) + eq_(self.size, self.dst.size) + eq_(self.data, self.dst.data) + + @raises(Exception) + def test_invalid_size(self): + ipv6.dst_opts(self.nxt, 1, self.data) + + def test_parser(self): + _res = ipv6.dst_opts.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.nxt, res.nxt) + eq_(self.size, res.size) + eq_(str(self.data), str(res.data)) + + def test_serialize(self): + buf = self.dst.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self.nxt, res[0]) + eq_(self.size, res[1]) + offset = struct.calcsize(self.form) + opt1 = ipv6.option.parser(str(buf[offset:])) + offset += len(opt1) + opt2 = ipv6.option.parser(str(buf[offset:])) + offset += len(opt2) + opt3 = ipv6.option.parser(str(buf[offset:])) + offset += len(opt3) + opt4 = ipv6.option.parser(str(buf[offset:])) + eq_(5, opt1.type_) + eq_(2, opt1.len_) + eq_('\x00\x00', opt1.data) + eq_(1, opt2.type_) + eq_(0, opt2.len_) + eq_(None, opt2.data) + eq_(0xc2, opt3.type_) + eq_(4, opt3.len_) + eq_('\x00\x01\x00\x00', opt3.data) + eq_(1, opt4.type_) + eq_(0, opt4.len_) + eq_(None, opt4.data) + + def test_len(self): + eq_(16, len(self.dst)) + + class Test_option(unittest.TestCase): def setUp(self): |