diff options
author | Yuichi Ito <ito.yuichi0@gmail.com> | 2013-09-17 13:37:56 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2013-09-24 02:09:25 +0900 |
commit | 3905f4c0194cdbaca9bff1cfd34937b18fdd62ed (patch) | |
tree | 5e041261e58eb731c08bc3a18d9b9346a11ff012 | |
parent | dc26a90bbe3288ef88c348889394afdc1a7b4eff (diff) |
packet lib: ipv6: support IP Authentication header
Signed-off-by: itoyuichi <ito.yuichi0@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/lib/packet/ipv6.py | 59 | ||||
-rw-r--r-- | ryu/tests/unit/packet/test_ipv6.py | 94 |
2 files changed, 153 insertions, 0 deletions
diff --git a/ryu/lib/packet/ipv6.py b/ryu/lib/packet/ipv6.py index ff50cdfc..aaf3b462 100644 --- a/ryu/lib/packet/ipv6.py +++ b/ryu/lib/packet/ipv6.py @@ -376,3 +376,62 @@ class fragment(header): def __len__(self): return self._MIN_LEN + + +@ipv6.register_header_type(inet.IPPROTO_AH) +class auth(header): + """IP Authentication header (RFC 2402) encoder/decoder class. + + This is used with ryu.lib.packet.ipv6.ipv6. + + An instance has the following attributes at least. + Most of them are same to the on-wire counterparts but in host byte order. + __init__ takes the corresponding args in this order. + + .. tabularcolumns:: |l|L| + + ============== ======================================= + Attribute Description + ============== ======================================= + size the length of the Authentication Header + in 64-bit words, subtracting 1. + spi security parameters index. + seq sequence number. + data authentication data. + ============== ======================================= + """ + TYPE = inet.IPPROTO_AH + + _PACK_STR = '!BB2xII' + _MIN_LEN = struct.calcsize(_PACK_STR) + + def __init__(self, size, spi, seq, data): + super(auth, self).__init__() + self.size = size + self.spi = spi + self.seq = seq + self.data = data + + @classmethod + def _get_size(cls, size): + return (int(size) - 1) * 8 + + @classmethod + def parser(cls, buf): + (nxt, size, spi, seq) = struct.unpack_from(cls._PACK_STR, buf) + form = "%ds" % (cls._get_size(size) - cls._MIN_LEN) + (data, ) = struct.unpack_from(form, buf, cls._MIN_LEN) + ret = cls(size, spi, seq, data) + ret.set_nxt(nxt) + return ret + + def serialize(self): + buf = struct.pack(self._PACK_STR, self.nxt, self.size, self.spi, + self.seq) + buf = bytearray(buf) + form = "%ds" % (auth._get_size(self.size) - self._MIN_LEN) + buf.extend(struct.pack(form, self.data)) + return buf + + def __len__(self): + return auth._get_size(self.size) diff --git a/ryu/tests/unit/packet/test_ipv6.py b/ryu/tests/unit/packet/test_ipv6.py index 42c55232..f39247fe 100644 --- a/ryu/tests/unit/packet/test_ipv6.py +++ b/ryu/tests/unit/packet/test_ipv6.py @@ -132,6 +132,28 @@ class Test_ipv6(unittest.TestCase): addrconv.ipv6.text_to_bin(self.dst)) self.buf += self.fragment.serialize() + def setUp_with_auth(self): + self.auth_size = 4 + self.auth_spi = 256 + self.auth_seq = 1 + self.auth_data = '\xa0\xe7\xf8\xab\xf9\x69\x1a\x8b\xf3\x9f\x7c\xae' + self.auth = ipv6.auth( + self.auth_size, self.auth_spi, self.auth_seq, self.auth_data) + self.ext_hdrs = [self.auth] + self.payload_length += len(self.auth) + self.ip = ipv6.ipv6( + self.version, self.traffic_class, self.flow_label, + self.payload_length, self.nxt, self.hop_limit, self.src, + self.dst, self.ext_hdrs) + self.auth.nxt = self.nxt + self.nxt = self.auth.TYPE + self.buf = struct.pack( + ipv6.ipv6._PACK_STR, self.v_tc_flow, + self.payload_length, self.nxt, self.hop_limit, + addrconv.ipv6.text_to_bin(self.src), + addrconv.ipv6.text_to_bin(self.dst)) + self.buf += self.auth.serialize() + def tearDown(self): pass @@ -158,6 +180,10 @@ class Test_ipv6(unittest.TestCase): self.setUp_with_fragment() self.test_init() + def test_init_with_auth(self): + self.setUp_with_auth() + self.test_init() + def test_parser(self): _res = self.ip.parser(str(self.buf)) if type(_res) is tuple: @@ -187,6 +213,10 @@ class Test_ipv6(unittest.TestCase): self.setUp_with_fragment() self.test_parser() + def test_parser_with_auth(self): + self.setUp_with_auth() + self.test_parser() + def test_serialize(self): data = bytearray() prev = None @@ -231,6 +261,16 @@ class Test_ipv6(unittest.TestCase): fragment = ipv6.fragment.parser(str(buf[ipv6.ipv6._MIN_LEN:])) eq_(repr(self.fragment), repr(fragment)) + def test_serialize_with_auth(self): + self.setUp_with_auth() + self.test_serialize() + + data = bytearray() + prev = None + buf = self.ip.serialize(data, prev) + auth = ipv6.auth.parser(str(buf[ipv6.ipv6._MIN_LEN:])) + eq_(repr(self.auth), repr(auth)) + def test_to_string(self): ipv6_values = {'version': self.version, 'traffic_class': self.traffic_class, @@ -261,6 +301,10 @@ class Test_ipv6(unittest.TestCase): self.setUp_with_fragment() self.test_to_string() + def test_to_string_with_auth(self): + self.setUp_with_auth() + self.test_to_string() + def test_len(self): eq_(len(self.ip), 40) @@ -276,6 +320,10 @@ class Test_ipv6(unittest.TestCase): self.setUp_with_fragment() eq_(len(self.ip), 40 + len(self.fragment)) + def test_len_with_auth(self): + self.setUp_with_auth() + eq_(len(self.ip), 40 + len(self.auth)) + class Test_hop_opts(unittest.TestCase): @@ -533,3 +581,49 @@ class Test_fragment(unittest.TestCase): def test_len(self): eq_(8, len(self.fragment)) + + +class Test_auth(unittest.TestCase): + + def setUp(self): + self.nxt = 0 + self.size = 4 + self.spi = 256 + self.seq = 1 + self.data = '\x21\xd3\xa9\x5c\x5f\xfd\x4d\x18\x46\x22\xb9\xf8' + self.auth = ipv6.auth(self.size, self.spi, self.seq, self.data) + self.auth.set_nxt(self.nxt) + self.form = '!BB2xII12s' + self.buf = struct.pack(self.form, self.nxt, self.size, self.spi, + self.seq, self.data) + + def test_init(self): + eq_(self.nxt, self.auth.nxt) + eq_(self.size, self.auth.size) + eq_(self.spi, self.auth.spi) + eq_(self.seq, self.auth.seq) + eq_(self.data, self.auth.data) + + def test_parser(self): + _res = ipv6.auth.parser(self.buf) + if type(_res) is tuple: + res = _res[0] + else: + res = _res + eq_(self.nxt, res.nxt) + eq_(self.size, res.size) + eq_(self.spi, res.spi) + eq_(self.seq, res.seq) + eq_(self.data, res.data) + + def test_serialize(self): + buf = self.auth.serialize() + res = struct.unpack_from(self.form, str(buf)) + eq_(self.nxt, res[0]) + eq_(self.size, res[1]) + eq_(self.spi, res[2]) + eq_(self.seq, res[3]) + eq_(self.data, res[4]) + + def test_len(self): + eq_((4 - 1) * 8, len(self.auth)) |