summaryrefslogtreecommitdiffhomepage
path: root/tests/unit/ofproto/test_parser_v10.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit/ofproto/test_parser_v10.py')
-rw-r--r--tests/unit/ofproto/test_parser_v10.py5556
1 files changed, 5556 insertions, 0 deletions
diff --git a/tests/unit/ofproto/test_parser_v10.py b/tests/unit/ofproto/test_parser_v10.py
new file mode 100644
index 00000000..eefb65f2
--- /dev/null
+++ b/tests/unit/ofproto/test_parser_v10.py
@@ -0,0 +1,5556 @@
+# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+import unittest
+import logging
+import six
+from nose.tools import *
+from ryu.ofproto.ofproto_v1_0_parser import *
+from ryu.ofproto.nx_actions import *
+from ryu.ofproto import ofproto_v1_0_parser
+from ryu.lib import addrconv
+
+
+LOG = logging.getLogger('test_ofproto_v10')
+
+
+class TestOFPPhyPort(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPPhyPort
+ """
+
+ # OFP_PHY_PORT_PACK_STR
+ # '!H6s16sIIIIII'... port_no, hw_addr, name, config, state
+ # curr, advertised, supported, peer
+ port_no = {'buf': b'\xe7\x6b', 'val': 59243}
+ hw_addr = '52:54:54:10:20:99'
+ name = b'name'.ljust(16)
+ config = {'buf': b'\x84\xb6\x8c\x53', 'val': 2226555987}
+ state = {'buf': b'\x64\x07\xfb\xc9', 'val': 1678244809}
+ curr = {'buf': b'\xa9\xe8\x0a\x2b', 'val': 2850556459}
+ advertised = {'buf': b'\x78\xb9\x7b\x72', 'val': 2025421682}
+ supported = {'buf': b'\x7e\x65\x68\xad', 'val': 2120575149}
+ peer = {'buf': b'\xa4\x5b\x8b\xed', 'val': 2757463021}
+
+ buf = port_no['buf'] \
+ + addrconv.mac.text_to_bin(hw_addr) \
+ + name \
+ + config['buf'] \
+ + state['buf'] \
+ + curr['buf'] \
+ + advertised['buf'] \
+ + supported['buf'] \
+ + peer['buf']
+
+ c = OFPPhyPort(port_no['val'],
+ hw_addr,
+ name,
+ config['val'],
+ state['val'],
+ curr['val'],
+ advertised['val'],
+ supported['val'],
+ peer['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.port_no['val'], self.c.port_no)
+ eq_(self.hw_addr, self.c.hw_addr)
+ eq_(self.name, self.c.name)
+ eq_(self.config['val'], self.c.config)
+ eq_(self.state['val'], self.c.state)
+ eq_(self.curr['val'], self.c.curr)
+ eq_(self.advertised['val'], self.c.advertised)
+ eq_(self.supported['val'], self.c.supported)
+ eq_(self.peer['val'], self.c.peer)
+
+ def test_parser(self):
+ res = self.c.parser(self.buf, 0)
+
+ eq_(self.port_no['val'], res.port_no)
+ eq_(self.hw_addr, res.hw_addr)
+ eq_(self.name, res.name)
+ eq_(self.config['val'], res.config)
+ eq_(self.state['val'], res.state)
+ eq_(self.curr['val'], res.curr)
+ eq_(self.advertised['val'], res.advertised)
+ eq_(self.supported['val'], res.supported)
+ eq_(self.peer['val'], res.peer)
+
+
+class TestOFPMatch(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPMatch
+ """
+
+ # OFP_MATCH_PACK_STR
+ # '!IH6s6sHBxHBB2xIIHH'...wildcards, in_port, dl_src, dl_dst, dl_vlan,
+ # dl_vlan_pcp, dl_type, nw_tos, nw_proto,
+ # nw_src, nw_dst, tp_src, tp_dst
+ wildcards = {'buf': b'\x00\x02\x10\x00', 'val': 135168}
+ in_port = {'buf': b'\x37\x8b', 'val': 14219}
+ dl_src = {'buf': b'\x52\x54\x54\x10\x20\x99',
+ 'human': '52:54:54:10:20:99'}
+ dl_dst = {'buf': b'\x61\x31\x50\x6d\xc9\xe5',
+ 'human': '61:31:50:6d:c9:e5'}
+ dl_vlan = {'buf': b'\xc1\xf9', 'val': 49657}
+ dl_vlan_pcp = {'buf': b'\x79', 'val': 121}
+ zfill0 = b'\x00'
+ dl_type = {'buf': b'\xa6\x9e', 'val': 42654}
+ nw_tos = {'buf': b'\xde', 'val': 222}
+ nw_proto = {'buf': b'\xe5', 'val': 229}
+ zfil11 = b'\x00' * 2
+ nw_src = {'buf': b'\x1b\x6d\x8d\x4b', 'val': 460164427,
+ 'human': '27.109.141.75'}
+ nw_dst = {'buf': b'\xab\x25\xe1\x20', 'val': 2871386400,
+ 'human': '171.37.225.32'}
+ tp_src = {'buf': b'\xd5\xc3', 'val': 54723}
+ tp_dst = {'buf': b'\x78\xb9', 'val': 30905}
+
+ buf = wildcards['buf'] \
+ + in_port['buf'] \
+ + dl_src['buf'] \
+ + dl_dst['buf'] \
+ + dl_vlan['buf'] \
+ + dl_vlan_pcp['buf'] \
+ + zfill0 \
+ + dl_type['buf'] \
+ + nw_tos['buf'] \
+ + nw_proto['buf'] \
+ + zfil11 \
+ + nw_src['buf'] \
+ + nw_dst['buf'] \
+ + tp_src['buf'] \
+ + tp_dst['buf']
+
+ def _get_obj(self, dl_src, dl_dst):
+ c = OFPMatch(self.wildcards['val'],
+ self.in_port['val'],
+ dl_src,
+ dl_dst,
+ self.dl_vlan['val'],
+ self.dl_vlan_pcp['val'],
+ self.dl_type['val'],
+ self.nw_tos['val'],
+ self.nw_proto['val'],
+ self.nw_src['val'],
+ self.nw_dst['val'],
+ self.tp_src['val'],
+ self.tp_dst['val'])
+ return c
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ c = self._get_obj(self.dl_src['buf'], self.dl_dst['buf'])
+
+ eq_(self.wildcards['val'], c.wildcards)
+ eq_(self.in_port['val'], c.in_port)
+ eq_(self.dl_src['buf'], c.dl_src)
+ eq_(self.dl_dst['buf'], c.dl_dst)
+ eq_(self.dl_vlan['val'], c.dl_vlan)
+ eq_(self.dl_vlan_pcp['val'], c.dl_vlan_pcp)
+ eq_(self.dl_type['val'], c.dl_type)
+ eq_(self.nw_tos['val'], c.nw_tos)
+ eq_(self.nw_proto['val'], c.nw_proto)
+ eq_(self.nw_src['val'], c.nw_src)
+ eq_(self.nw_dst['val'], c.nw_dst)
+ eq_(self.tp_src['val'], c.tp_src)
+ eq_(self.tp_dst['val'], c.tp_dst)
+
+ def test_init_zero(self):
+ c = self._get_obj(0, 0)
+ eq_(mac.DONTCARE, c.dl_src)
+ eq_(mac.DONTCARE, c.dl_dst)
+
+ def test_parse(self):
+ c = self._get_obj(self.dl_src['buf'], self.dl_dst['buf'])
+ res = c.parse(self.buf, 0)
+
+ eq_(self.wildcards['val'], res.wildcards)
+ eq_(self.in_port['val'], res.in_port)
+ eq_(self.dl_src['buf'], res.dl_src)
+ eq_(self.dl_dst['buf'], res.dl_dst)
+ eq_(self.dl_vlan['val'], res.dl_vlan)
+ eq_(self.dl_vlan_pcp['val'], res.dl_vlan_pcp)
+ eq_(self.dl_type['val'], res.dl_type)
+ eq_(self.nw_tos['val'], res.nw_tos)
+ eq_(self.nw_proto['val'], res.nw_proto)
+ eq_(self.nw_src['val'], res.nw_src)
+ eq_(self.nw_dst['val'], res.nw_dst)
+ eq_(self.tp_src['val'], res.tp_src)
+ eq_(self.tp_dst['val'], res.tp_dst)
+
+ def test_serialize(self):
+ buf = bytearray()
+ c = self._get_obj(self.dl_src['buf'], self.dl_dst['buf'])
+
+ c.serialize(buf, 0)
+
+ fmt = ofproto.OFP_MATCH_PACK_STR
+ res = struct.unpack_from(fmt, six.binary_type(buf))
+
+ eq_(self.wildcards['val'], res[0])
+ eq_(self.in_port['val'], res[1])
+ eq_(self.dl_src['buf'], res[2])
+ eq_(self.dl_dst['buf'], res[3])
+ eq_(self.dl_vlan['val'], res[4])
+ eq_(self.dl_vlan_pcp['val'], res[5])
+ eq_(self.dl_type['val'], res[6])
+ eq_(self.nw_tos['val'], res[7])
+ eq_(self.nw_proto['val'], res[8])
+ eq_(self.nw_src['val'], res[9])
+ eq_(self.nw_dst['val'], res[10])
+ eq_(self.tp_src['val'], res[11])
+ eq_(self.tp_dst['val'], res[12])
+
+ def test_getitem(self):
+ c = self._get_obj(self.dl_src['buf'], self.dl_dst['buf'])
+
+ eq_(self.wildcards['val'], c["wildcards"])
+ eq_(self.in_port['val'], c["in_port"])
+ eq_(self.dl_src['human'], c["dl_src"])
+ eq_(self.dl_dst['human'], c["dl_dst"])
+ eq_(self.dl_vlan['val'], c["dl_vlan"])
+ eq_(self.dl_vlan_pcp['val'], c["dl_vlan_pcp"])
+ eq_(self.dl_type['val'], c["dl_type"])
+ eq_(self.nw_tos['val'], c["nw_tos"])
+ eq_(self.nw_proto['val'], c["nw_proto"])
+ eq_(self.nw_src['human'], c["nw_src"])
+ eq_(self.nw_dst['human'], c["nw_dst"])
+ eq_(self.tp_src['val'], c["tp_src"])
+ eq_(self.tp_dst['val'], c["tp_dst"])
+
+
+class TestOFPActionHeader(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPActionHeader
+ """
+
+ # OFP_ACTION_HEADER_PACK_STR
+ # '!HH4x'...type, len, zfill
+ type = {'buf': b'\x00\x02', 'val': ofproto.OFPAT_SET_VLAN_PCP}
+ len = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_HEADER_SIZE}
+ zfill = b'\x00' * 4
+
+ buf = type['buf'] \
+ + len['buf'] \
+ + zfill
+
+ c = OFPActionHeader(type['val'], len['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.type['val'], self.c.type)
+ eq_(self.len['val'], self.c.len)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.OFP_ACTION_HEADER_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type['val'], res[0])
+ eq_(self.len['val'], res[1])
+
+
+class TestOFPActionOutput(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPActionOutput
+ """
+
+ # OFP_ACTION_OUTPUT_PACK_STR
+ # '!HHHH'...type, len, port, max_len
+ type_ = {'buf': b'\x00\x00', 'val': ofproto.OFPAT_OUTPUT}
+ len_ = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_OUTPUT_SIZE}
+ port = {'buf': b'\x19\xce', 'val': 6606}
+ max_len = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_OUTPUT_SIZE}
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + port['buf'] \
+ + max_len['buf']
+
+ c = OFPActionOutput(port['val'], max_len['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.port['val'], self.c.port)
+ eq_(self.max_len['val'], self.c.max_len)
+
+ def test_parser(self):
+ res = self.c.parser(self.buf, 0)
+
+ eq_(self.port['val'], res.port)
+ eq_(self.max_len['val'], res.max_len)
+
+ @raises(AssertionError)
+ def test_parser_check_type(self):
+ type_ = {'buf': b'\x00\x01', 'val': 1}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.port['buf'] \
+ + self.max_len['buf']
+
+ self.c.parser(buf, 0)
+
+ @raises(AssertionError)
+ def test_parser_check_len(self):
+ len_ = {'buf': b'\x00\x07', 'val': 7}
+
+ buf = self.type_['buf'] \
+ + len_['buf'] \
+ + self.port['buf'] \
+ + self.max_len['buf']
+
+ self.c.parser(buf, 0)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.OFP_ACTION_OUTPUT_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.port['val'], res[2])
+ eq_(self.max_len['val'], res[3])
+
+
+class TestOFPActionVlanVid(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPActionVlanVid
+ """
+
+ # OFP_ACTION_VLAN_VID_PACK_STR
+ # '!HHH2x'...type, len, vlan_vid, zfill
+ type_ = {'buf': b'\x00\x01', 'val': ofproto.OFPAT_SET_VLAN_VID}
+ len_ = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_VLAN_VID_SIZE}
+ vlan_vid = {'buf': b'\x3c\x0e', 'val': 15374}
+ zfill = b'\x00' * 2
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vlan_vid['buf'] \
+ + zfill
+
+ c = OFPActionVlanVid(vlan_vid['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.vlan_vid['val'], self.c.vlan_vid)
+
+ def test_parser(self):
+ res = self.c.parser(self.buf, 0)
+
+ eq_(self.vlan_vid['val'], res.vlan_vid)
+
+ @raises(AssertionError)
+ def test_parser_check_type(self):
+ type_ = {'buf': b'\x00\x02', 'val': 2}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.vlan_vid['buf'] \
+ + self.zfill
+
+ self.c.parser(buf, 0)
+
+ @raises(AssertionError)
+ def test_parser_check_len(self):
+ len_ = {'buf': b'\x00\x07', 'val': 7}
+
+ buf = self.type_['buf'] \
+ + len_['buf'] \
+ + self.vlan_vid['buf'] \
+ + self.zfill
+
+ self.c.parser(buf, 0)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.OFP_ACTION_VLAN_VID_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vlan_vid['val'], res[2])
+
+
+class TestOFPActionVlanPcp(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPActionVlanPcp
+ """
+
+ # OFP_ACTION_VLAN_PCP_PACK_STR
+ # '!HHB3x'...type, len, vlan_pcp, zfill
+ type_ = {'buf': b'\x00\x02', 'val': ofproto.OFPAT_SET_VLAN_PCP}
+ len_ = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_VLAN_PCP_SIZE}
+ vlan_pcp = {'buf': b'\x1c', 'val': 28}
+ zfill = b'\x00' * 3
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vlan_pcp['buf'] \
+ + zfill
+
+ c = OFPActionVlanPcp(vlan_pcp['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.vlan_pcp['val'], self.c.vlan_pcp)
+
+ def test_parser(self):
+ res = self.c.parser(self.buf, 0)
+ eq_(self.vlan_pcp['val'], res.vlan_pcp)
+
+ @raises(AssertionError)
+ def test_parser_check_type(self):
+ type_ = {'buf': b'\x00\x01', 'val': 1}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.vlan_pcp['buf'] \
+ + self.zfill
+
+ self.c.parser(buf, 0)
+
+ @raises(AssertionError)
+ def test_parser_check_len(self):
+ len_ = {'buf': b'\x00\x07', 'val': 7}
+
+ buf = self.type_['buf'] \
+ + len_['buf'] \
+ + self.vlan_pcp['buf'] \
+ + self.zfill
+
+ self.c.parser(buf, 0)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.OFP_ACTION_VLAN_PCP_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vlan_pcp['val'], res[2])
+
+
+class TestOFPActionStripVlan(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPActionStripVlan
+ """
+
+ # OFP_ACTION_HEADER_PACK_STR
+ # '!HH4x'...type, len, zfill
+ type_ = {'buf': b'\x00\x03', 'val': ofproto.OFPAT_STRIP_VLAN}
+ len_ = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_HEADER_SIZE}
+ zfill = b'\x00' * 4
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + zfill
+
+ c = OFPActionStripVlan()
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ ok_(self.c.parser(self.buf, 0))
+
+ @raises(AssertionError)
+ def test_parser_check_type(self):
+ type_ = {'buf': b'\x00\x01', 'val': 1}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.zfill
+
+ self.c.parser(buf, 0)
+
+ @raises(AssertionError)
+ def test_parser_check_len(self):
+ len_ = {'buf': b'\x00\x07', 'val': 7}
+
+ buf = self.type_['buf'] \
+ + len_['buf'] \
+ + self.zfill
+
+ self.c.parser(buf, 0)
+
+
+class TestOFPActionSetDlSrc(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPActionSetDlSrc
+ """
+
+ # OFP_ACTION_DL_ADDR_PACK_STR
+ # '!HH6s6x'...type, len, dl_addr, zfill
+ type_ = {'buf': b'\x00\x04', 'val': ofproto.OFPAT_SET_DL_SRC}
+ len_ = {'buf': b'\x00\x10', 'val': ofproto.OFP_ACTION_DL_ADDR_SIZE}
+ dl_addr = b'\x0e\xde\x27\xce\xc6\xcf'
+ zfill = b'\x00' * 6
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + dl_addr \
+ + zfill
+
+ c = OFPActionSetDlSrc(dl_addr)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.dl_addr, self.c.dl_addr)
+
+ def test_parser_type_src(self):
+ res = self.c.parser(self.buf, 0)
+ eq_(self.dl_addr, res.dl_addr)
+
+ def test_parser_type_dst(self):
+ type_ = {'buf': b'\x00\x05', 'val': ofproto.OFPAT_SET_DL_DST}
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.dl_addr \
+ + self.zfill
+
+ res = self.c.parser(buf, 0)
+
+ eq_(self.dl_addr, res.dl_addr)
+
+ @raises(AssertionError)
+ def test_parser_check_type(self):
+ type_ = {'buf': b'\x00\x06', 'val': 6}
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.dl_addr \
+ + self.zfill
+
+ res = self.c.parser(buf, 0)
+
+ @raises(AssertionError)
+ def test_parser_check_len(self):
+ len_ = {'buf': b'\x00\x07', 'val': 7}
+ buf = self.type_['buf'] \
+ + len_['buf'] \
+ + self.dl_addr \
+ + self.zfill
+
+ res = self.c.parser(buf, 0)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.OFP_ACTION_DL_ADDR_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.dl_addr, res[2])
+
+
+class TestOFPActionSetDlDst(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPActionSetDlDst
+ """
+
+ # OFP_ACTION_DL_ADDR_PACK_STR
+ # '!HH6s6x'...type, len, dl_addr, zfill
+ type_ = {'buf': b'\x00\x05', 'val': ofproto.OFPAT_SET_DL_DST}
+ len_ = {'buf': b'\x00\x10', 'val': ofproto.OFP_ACTION_DL_ADDR_SIZE}
+ dl_addr = b'\x37\x48\x38\x9a\xf4\x28'
+ zfill = b'\x00' * 6
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + dl_addr \
+ + zfill
+
+ c = OFPActionSetDlDst(dl_addr)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.dl_addr, self.c.dl_addr)
+
+ def test_parser_type_dst(self):
+ res = self.c.parser(self.buf, 0)
+ eq_(self.dl_addr, res.dl_addr)
+
+ def test_parser_type_src(self):
+ type_ = {'buf': b'\x00\x04', 'val': ofproto.OFPAT_SET_DL_SRC}
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.dl_addr \
+ + self.zfill
+
+ res = self.c.parser(buf, 0)
+
+ eq_(self.dl_addr, res.dl_addr)
+
+ @raises(AssertionError)
+ def test_parser_check_type(self):
+ type_ = {'buf': b'\x00\x06', 'val': 6}
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.dl_addr \
+ + self.zfill
+
+ res = self.c.parser(buf, 0)
+
+ @raises(AssertionError)
+ def test_parser_check_len(self):
+ len_ = {'buf': b'\x00\x07', 'val': 7}
+ buf = self.type_['buf'] \
+ + len_['buf'] \
+ + self.dl_addr \
+ + self.zfill
+
+ res = self.c.parser(buf, 0)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.OFP_ACTION_DL_ADDR_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.dl_addr, res[2])
+
+
+class TestOFPActionSetNwSrc(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPActionSetNwSrc
+ """
+
+ # OFP_ACTION_NW_ADDR_PACK_STR
+ # '!HHI'...type, len, nw_addr
+ type_ = {'buf': b'\x00\x06', 'val': ofproto.OFPAT_SET_NW_SRC}
+ len_ = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_NW_ADDR_SIZE}
+ nw_addr = {'buf': b'\xc0\xa8\x7a\x0a', 'val': 3232266762}
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + nw_addr['buf']
+
+ c = OFPActionSetNwSrc(nw_addr['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.nw_addr['val'], self.c.nw_addr)
+
+ def test_parser_src(self):
+ res = self.c.parser(self.buf, 0)
+ eq_(self.nw_addr['val'], res.nw_addr)
+
+ def test_parser_dst(self):
+ type_ = {'buf': b'\x00\x07', 'val': ofproto.OFPAT_SET_NW_DST}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.nw_addr['buf']
+
+ res = self.c.parser(buf, 0)
+ eq_(self.nw_addr['val'], res.nw_addr)
+
+ @raises(AssertionError)
+ def test_parser_check_type(self):
+ type_ = {'buf': b'\x00\x05', 'val': 5}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.nw_addr['buf']
+
+ self.c.parser(buf, 0)
+
+ @raises(AssertionError)
+ def test_parser_check_len(self):
+ len_ = {'buf': b'\x00\x10', 'val': 16}
+
+ buf = self.type_['buf'] \
+ + len_['buf'] \
+ + self.nw_addr['buf']
+
+ self.c.parser(buf, 0)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.OFP_ACTION_NW_ADDR_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.nw_addr['val'], res[2])
+
+
+class TestOFPActionSetNwDst(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPActionSetNwDst
+ """
+
+ # OFP_ACTION_NW_ADDR_PACK_STR
+ # '!HHI'...type, len, nw_addr
+ type_ = {'buf': b'\x00\x07', 'val': ofproto.OFPAT_SET_NW_DST}
+ len_ = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_NW_ADDR_SIZE}
+ nw_addr = {'buf': b'\xc0\xa8\x7a\x0a', 'val': 3232266762}
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + nw_addr['buf']
+
+ c = OFPActionSetNwDst(nw_addr['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.nw_addr['val'], self.c.nw_addr)
+
+ def test_parser_dst(self):
+ res = self.c.parser(self.buf, 0)
+ eq_(self.nw_addr['val'], res.nw_addr)
+
+ def test_parser_src(self):
+ type_ = {'buf': b'\x00\x06', 'val': ofproto.OFPAT_SET_NW_SRC}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.nw_addr['buf']
+
+ res = self.c.parser(buf, 0)
+ eq_(self.nw_addr['val'], res.nw_addr)
+
+ @raises(AssertionError)
+ def test_parser_check_type(self):
+ type_ = {'buf': b'\x00\x05', 'val': 5}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.nw_addr['buf']
+
+ self.c.parser(buf, 0)
+
+ @raises(AssertionError)
+ def test_parser_check_len(self):
+ len_ = {'buf': b'\x00\x10', 'val': 16}
+
+ buf = self.type_['buf'] \
+ + len_['buf'] \
+ + self.nw_addr['buf']
+
+ self.c.parser(buf, 0)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.OFP_ACTION_NW_ADDR_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.nw_addr['val'], res[2])
+
+
+class TestOFPActionSetNwTos(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPActionSetNwTos
+ """
+
+ # OFP_ACTION_NW_TOS_PACK_STR
+ # '!HHB3x'...type, len, tos, zfill
+ type_ = {'buf': b'\x00\x08', 'val': ofproto.OFPAT_SET_NW_TOS}
+ len_ = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_NW_TOS_SIZE}
+ tos = {'buf': b'\xb6', 'val': 182}
+ zfill = b'\x00' * 3
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + tos['buf'] \
+ + zfill
+
+ c = OFPActionSetNwTos(tos['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.tos['val'], self.c.tos)
+
+ def test_parser(self):
+ res = self.c.parser(self.buf, 0)
+ eq_(self.tos['val'], res.tos)
+
+ @raises(AssertionError)
+ def test_parser_check_type(self):
+ type_ = {'buf': b'\x00\x05', 'val': 5}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.tos['buf'] \
+ + self.zfill
+
+ self.c.parser(buf, 0)
+
+ @raises(AssertionError)
+ def test_parser_check_len(self):
+ len_ = {'buf': b'\x00\x07', 'val': 7}
+
+ buf = self.type_['buf'] \
+ + len_['buf'] \
+ + self.tos['buf'] \
+ + self.zfill
+
+ self.c.parser(buf, 0)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.OFP_ACTION_NW_TOS_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.tos['val'], res[2])
+
+
+class TestOFPActionSetTpSrc(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPActionSetTpSrc
+ """
+
+ # OFP_ACTION_TP_PORT_PACK_STR
+ # '!HHH2x'...type, len, tp, zfill
+ type_ = {'buf': b'\x00\x09', 'val': ofproto.OFPAT_SET_TP_SRC}
+ len_ = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_TP_PORT_SIZE}
+ tp = {'buf': b'\x07\xf1', 'val': 2033}
+ zfill = b'\x00' * 2
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + tp['buf'] \
+ + zfill
+
+ c = OFPActionSetTpSrc(tp['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.tp['val'], self.c.tp)
+
+ def test_parser_src(self):
+ res = self.c.parser(self.buf, 0)
+ eq_(self.tp['val'], res.tp)
+
+ def test_parser_dst(self):
+ type_ = {'buf': b'\x00\x0a', 'val': ofproto.OFPAT_SET_TP_DST}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.tp['buf'] \
+ + self.zfill
+
+ res = self.c.parser(self.buf, 0)
+ eq_(self.tp['val'], res.tp)
+
+ @raises(AssertionError)
+ def test_parser_check_type(self):
+ type_ = {'buf': b'\x00\x07', 'val': 7}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.tp['buf'] \
+ + self.zfill
+
+ self.c.parser(buf, 0)
+
+ @raises(AssertionError)
+ def test_parser_check_len(self):
+ len_ = {'buf': b'\x00\x07', 'val': 7}
+
+ buf = self.type_['buf'] \
+ + len_['buf'] \
+ + self.tp['buf'] \
+ + self.zfill
+
+ self.c.parser(buf, 0)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.OFP_ACTION_TP_PORT_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.tp['val'], res[2])
+
+
+class TestOFPActionSetTpDst(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPActionSetTpDst
+ """
+
+ # OFP_ACTION_TP_PORT_PACK_STR
+ # '!HHH2x'...type, len, tp, zfill
+ type_ = {'buf': b'\x00\x0a', 'val': ofproto.OFPAT_SET_TP_DST}
+ len_ = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_TP_PORT_SIZE}
+ tp = {'buf': b'\x06\x6d', 'val': 1645}
+ zfill = b'\x00' * 2
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + tp['buf'] \
+ + zfill
+
+ c = OFPActionSetTpDst(tp['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.tp['val'], self.c.tp)
+
+ def test_parser_dst(self):
+ res = self.c.parser(self.buf, 0)
+ eq_(self.tp['val'], res.tp)
+
+ def test_parser_src(self):
+ type_ = {'buf': b'\x00\x09', 'val': ofproto.OFPAT_SET_TP_SRC}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.tp['buf'] \
+ + self.zfill
+
+ res = self.c.parser(buf, 0)
+ eq_(self.tp['val'], res.tp)
+
+ @raises(AssertionError)
+ def test_parser_check_type(self):
+ type_ = {'buf': b'\x00\x10', 'val': 16}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.tp['buf'] \
+ + self.zfill
+
+ self.c.parser(buf, 0)
+
+ @raises(AssertionError)
+ def test_parser_check_len(self):
+ len_ = {'buf': b'\x00\x07', 'val': 7}
+
+ buf = self.type_['buf'] \
+ + len_['buf'] \
+ + self.tp['buf'] \
+ + self.zfill
+
+ self.c.parser(buf, 0)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.OFP_ACTION_TP_PORT_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.tp['val'], res[2])
+
+
+class TestOFPActionEnqueue(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPActionEnqueue
+ """
+
+ # OFP_ACTION_ENQUEUE_PACK_STR
+ # '!HHH6xI'...type_, len_, port, zfill, queue_id
+ type_ = {'buf': b'\x00\x0b', 'val': ofproto.OFPAT_ENQUEUE}
+ len_ = {'buf': b'\x00\x10', 'val': ofproto.OFP_ACTION_ENQUEUE_SIZE}
+ port = {'buf': b'\x04\x55', 'val': 1109}
+ zfill = b'\x00' * 6
+ queue_id = {'buf': b'\x0a\x5b\x03\x5e', 'val': 173736798}
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + port['buf'] \
+ + zfill \
+ + queue_id['buf']
+
+ c = OFPActionEnqueue(port['val'], queue_id['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.port['val'], self.c.port)
+ eq_(self.queue_id['val'], self.c.queue_id)
+
+ def test_parser(self):
+ res = self.c.parser(self.buf, 0)
+
+ eq_(self.port['val'], res.port)
+ eq_(self.queue_id['val'], res.queue_id)
+
+ @raises(AssertionError)
+ def test_parser_check_type(self):
+ type_ = {'buf': b'\x00\x0a', 'val': 10}
+
+ buf = type_['buf'] \
+ + self.len_['buf'] \
+ + self.port['buf'] \
+ + self.zfill \
+ + self.queue_id['buf']
+
+ self.c.parser(buf, 0)
+
+ @raises(AssertionError)
+ def test_parser_check_len(self):
+ len_ = {'buf': b'\x00\x05', 'val': 5}
+
+ buf = self.type_['buf'] \
+ + len_['buf'] \
+ + self.port['buf'] \
+ + self.zfill \
+ + self.queue_id['buf']
+
+ self.c.parser(buf, 0)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.OFP_ACTION_ENQUEUE_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.port['val'], res[2])
+ eq_(self.queue_id['val'], res[3])
+
+
+class TestNXActionResubmit(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXActionResubmit
+ """
+
+ # NX_ACTION_RESUBMIT_PACK_STR
+ # '!HHIHHB3x'...type, len, vendor, subtype, in_port, table, zfill
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+ len_ = {'buf': b'\x00\x10', 'val': ofproto.NX_ACTION_RESUBMIT_SIZE}
+ vendor = {'buf': b'\x00\x00\x23\x20', 'val': 8992}
+ subtype = {'buf': b'\x00\x01', 'val': 1}
+ in_port = {'buf': b'\x0a\x4c', 'val': 2636}
+ table = {'buf': b'\x52', 'val': 82}
+ zfill = b'\x00' * 3
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vendor['buf'] \
+ + subtype['buf'] \
+ + in_port['buf'] \
+ + table['buf'] \
+ + zfill
+
+ c = NXActionResubmit(in_port['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.subtype['val'], self.c.subtype)
+ eq_(self.in_port['val'], self.c.in_port)
+
+ def test_parser(self):
+ res = OFPActionVendor.parser(self.buf, 0)
+ eq_(self.type_['val'], res.type)
+ eq_(self.len_['val'], res.len)
+ eq_(self.in_port['val'], res.in_port)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.NX_ACTION_RESUBMIT_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vendor['val'], res[2])
+ eq_(self.subtype['val'], res[3])
+ eq_(self.in_port['val'], res[4])
+
+
+class TestNXActionResubmitTable(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXActionResubmitTable
+ """
+
+ # NX_ACTION_RESUBMIT_PACK_STR
+ # '!HHIHHB3x'...type, len, vendor, subtype, in_port, table, zfill
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+ len_ = {'buf': b'\x00\x10', 'val': ofproto.NX_ACTION_RESUBMIT_SIZE}
+ vendor = {'buf': b'\x00\x00\x23\x20', 'val': 8992}
+ subtype = {'buf': b'\x00\x0e', 'val': 14}
+ in_port = {'buf': b'\x0a\x4c', 'val': 2636}
+ table_id = {'buf': b'\x52', 'val': 82}
+ zfill = b'\x00' * 3
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vendor['buf'] \
+ + subtype['buf'] \
+ + in_port['buf'] \
+ + table_id['buf'] \
+ + zfill
+
+ c = NXActionResubmitTable(in_port['val'], table_id['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.subtype['val'], self.c.subtype)
+ eq_(self.in_port['val'], self.c.in_port)
+ eq_(self.table_id['val'], self.c.table_id)
+
+ def test_parser(self):
+ res = OFPActionVendor.parser(self.buf, 0)
+ eq_(self.type_['val'], res.type)
+ eq_(self.len_['val'], res.len)
+ eq_(self.in_port['val'], res.in_port)
+ eq_(self.table_id['val'], res.table_id)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.NX_ACTION_RESUBMIT_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vendor['val'], res[2])
+ eq_(self.subtype['val'], res[3])
+ eq_(self.in_port['val'], res[4])
+ eq_(self.table_id['val'], res[5])
+
+
+class TestNXActionSetTunnel(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXActionSetTunnel
+ """
+
+ # NX_ACTION_SET_TUNNEL_PACK_STR
+ # '!HHIH2xI'...type, len, vendor, subtype, zfill, tun_id
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+ len_ = {'buf': b'\x00\x10', 'val': ofproto.NX_ACTION_SET_TUNNEL_SIZE}
+ vendor = {'buf': b'\x00\x00\x23\x20', 'val': 8992}
+ subtype = {'buf': b'\x00\x02', 'val': 2}
+ zfill = b'\x00' * 2
+ tun_id = {'buf': b'\x01\x6f\x01\xd0', 'val': 24052176}
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vendor['buf'] \
+ + subtype['buf'] \
+ + zfill \
+ + tun_id['buf']
+
+ c = NXActionSetTunnel(tun_id['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.subtype['val'], self.c.subtype)
+ eq_(self.tun_id['val'], self.c.tun_id)
+
+ def test_parse(self):
+ res = OFPActionVendor.parser(self.buf, 0)
+ eq_(self.type_['val'], res.type)
+ eq_(self.len_['val'], res.len)
+ eq_(self.tun_id['val'], res.tun_id)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.NX_ACTION_SET_TUNNEL_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vendor['val'], res[2])
+ eq_(self.subtype['val'], res[3])
+ eq_(self.tun_id['val'], res[4])
+
+
+class TestNXActionSetQueue(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXActionSetQueue
+ """
+
+ # NX_ACTION_SET_QUEUE_PACK_STR
+ # '!HHIH2xI'...type, len, vendor, subtype, zfill, queue_id
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+ len_ = {'buf': b'\x00\x10', 'val': ofproto.NX_ACTION_SET_TUNNEL_SIZE}
+ vendor = {'buf': b'\x00\x00\x23\x20',
+ 'val': ofproto_common.NX_EXPERIMENTER_ID}
+ subtype = {'buf': b'\x00\x04', 'val': ofproto.NXAST_SET_QUEUE}
+ zfill = b'\x00' * 2
+ queue_id = {'buf': b'\xde\xbe\xc5\x18', 'val': 3737044248}
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vendor['buf'] \
+ + subtype['buf'] \
+ + zfill \
+ + queue_id['buf']
+
+ c = NXActionSetQueue(queue_id['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.subtype['val'], self.c.subtype)
+ eq_(self.queue_id['val'], self.c.queue_id)
+
+ def test_parser(self):
+ res = OFPActionVendor.parser(self.buf, 0)
+ eq_(self.type_['val'], res.type)
+ eq_(self.len_['val'], res.len)
+ eq_(self.queue_id['val'], res.queue_id)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.NX_ACTION_SET_QUEUE_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vendor['val'], res[2])
+ eq_(self.subtype['val'], res[3])
+ eq_(self.queue_id['val'], res[4])
+
+
+class TestNXActionPopQueue(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXActionPopQueue
+ """
+
+ # NX_ACTION_POP_QUEUE_PACK_STR
+ # '!HHIH6x'...type, len, vendor, subtype, zfill
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+ len_ = {'buf': b'\x00\x10', 'val': ofproto.NX_ACTION_SET_TUNNEL_SIZE}
+ vendor = {'buf': b'\x00\x00\x23\x20',
+ 'val': ofproto_common.NX_EXPERIMENTER_ID}
+ subtype = {'buf': b'\x00\x05', 'val': ofproto.NXAST_POP_QUEUE}
+ zfill = b'\x00' * 6
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vendor['buf'] \
+ + subtype['buf'] \
+ + zfill
+
+ c = NXActionPopQueue()
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.subtype['val'], self.c.subtype)
+
+ def test_parser(self):
+ res = OFPActionVendor.parser(self.buf, 0)
+ eq_(self.type_['val'], res.type)
+ eq_(self.len_['val'], res.len)
+ eq_(self.subtype['val'], res.subtype)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.NX_ACTION_POP_QUEUE_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vendor['val'], res[2])
+ eq_(self.subtype['val'], res[3])
+
+
+class TestNXActionRegMove(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXActionRegMove
+ """
+
+ # NX_ACTION_REG_MOVE_PACK_STR
+ # '!HHIHHHHII'...type_, len_, vendor, subtype, n_bits,
+ # src_ofs, dst_ofs, src, dst
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+ len_ = {'buf': b'\x00\x18', 'val': ofproto.NX_ACTION_REG_MOVE_SIZE}
+ vendor = {'buf': b'\x00\x00\x23\x20',
+ 'val': ofproto_common.NX_EXPERIMENTER_ID}
+ subtype = {'buf': b'\x00\x06', 'val': ofproto.NXAST_REG_MOVE}
+ n_bits = {'buf': b'\x3d\x98', 'val': 15768}
+ src_ofs = {'buf': b'\xf3\xa3', 'val': 62371}
+ dst_ofs = {'buf': b'\xdc\x67', 'val': 56423}
+ src_field = {'buf': b'\x00\x01\x00\x04', 'val': "reg0", "val2": 65540}
+ dst_field = {'buf': b'\x00\x01\x02\x04', 'val': "reg1", "val2": 66052}
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vendor['buf'] \
+ + subtype['buf'] \
+ + n_bits['buf'] \
+ + src_ofs['buf'] \
+ + dst_ofs['buf'] \
+ + src_field['buf'] \
+ + dst_field['buf']
+
+ c = NXActionRegMove(src_field['val'],
+ dst_field['val'],
+ n_bits['val'],
+ src_ofs['val'],
+ dst_ofs['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.subtype['val'], self.c.subtype)
+ eq_(self.src_field['val'], self.c.src_field)
+ eq_(self.dst_field['val'], self.c.dst_field)
+ eq_(self.n_bits['val'], self.c.n_bits)
+ eq_(self.src_field['val'], self.c.src_field)
+ eq_(self.dst_field['val'], self.c.dst_field)
+
+ def test_parser(self):
+ res = OFPActionVendor.parser(self.buf, 0)
+ eq_(self.type_['val'], res.type)
+ eq_(self.len_['val'], res.len)
+ eq_(self.subtype['val'], res.subtype)
+ eq_(self.src_ofs['val'], res.src_ofs)
+ eq_(self.dst_ofs['val'], res.dst_ofs)
+ eq_(self.n_bits['val'], res.n_bits)
+ eq_(self.src_field['val'], res.src_field)
+ eq_(self.dst_field['val'], res.dst_field)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.NX_ACTION_REG_MOVE_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vendor['val'], res[2])
+ eq_(self.subtype['val'], res[3])
+ eq_(self.n_bits['val'], res[4])
+ eq_(self.src_ofs['val'], res[5])
+ eq_(self.dst_ofs['val'], res[6])
+ eq_(self.src_field['val2'], res[7])
+ eq_(self.dst_field['val2'], res[8])
+
+
+class TestNXActionRegLoad(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXActionRegLoad
+ """
+
+ # NX_ACTION_REG_LOAD_PACK_STR
+ # '!HHIHHIQ'...type_, len_, vendor, subtype,
+ # ofs_nbits, dst, value
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+ len_ = {'buf': b'\x00\x18', 'val': ofproto.NX_ACTION_REG_MOVE_SIZE}
+ vendor = {'buf': b'\x00\x00\x23\x20',
+ 'val': ofproto_common.NX_EXPERIMENTER_ID}
+ subtype = {'buf': b'\x00\x07', 'val': ofproto.NXAST_REG_LOAD}
+ ofs_nbits = {'buf': b'\x3d\x98', 'val': 15768}
+ dst = {'buf': b'\x00\x01\x00\x04', 'val': "reg0", "val2": 65540}
+ value = {'buf': b'\x33\x51\xcd\x43\x25\x28\x18\x99',
+ 'val': 3697962457317775513}
+ start = 246
+ end = 270
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vendor['buf'] \
+ + subtype['buf'] \
+ + ofs_nbits['buf'] \
+ + dst['buf'] \
+ + value['buf']
+
+ c = NXActionRegLoad(ofs_nbits['val'],
+ dst['val'],
+ value['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.subtype['val'], self.c.subtype)
+ eq_(self.ofs_nbits['val'], self.c.ofs_nbits)
+ eq_(self.dst['val'], self.c.dst)
+ eq_(self.value['val'], self.c.value)
+
+ def test_parser(self):
+ res = OFPActionVendor.parser(self.buf, 0)
+ eq_(self.type_['val'], res.type)
+ eq_(self.len_['val'], res.len)
+ eq_(self.ofs_nbits['val'], self.c.ofs_nbits)
+ eq_(self.dst['val'], res.dst)
+ eq_(self.value['val'], res.value)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.NX_ACTION_REG_LOAD_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vendor['val'], res[2])
+ eq_(self.subtype['val'], res[3])
+ eq_(self.ofs_nbits['val'], res[4])
+ eq_(self.dst['val2'], res[5])
+ eq_(self.value['val'], res[6])
+
+
+class TestNXActionSetTunnel64(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXActionSetTunnel64
+ """
+
+ # NX_ACTION_SET_TUNNEL64_PACK_STR
+ # '!HHIH6xQ'...type, len, vendor, subtype, zfill, tun_id
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+ len_ = {'buf': b'\x00\x18', 'val': ofproto.NX_ACTION_SET_TUNNEL64_SIZE}
+ vendor = {'buf': b'\x00\x00\x23\x20',
+ 'val': ofproto_common.NX_EXPERIMENTER_ID}
+ subtype = {'buf': b'\x00\x09', 'val': ofproto.NXAST_SET_TUNNEL64}
+ zfill = b'\x00' * 6
+ tun_id = {'buf': b'\x6e\x01\xa6\xea\x7e\x36\x1d\xd9',
+ 'val': 7926800345218817497}
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vendor['buf'] \
+ + subtype['buf'] \
+ + zfill \
+ + tun_id['buf']
+
+ c = NXActionSetTunnel64(tun_id['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.subtype['val'], self.c.subtype)
+ eq_(self.tun_id['val'], self.c.tun_id)
+
+ def test_parser(self):
+ res = OFPActionVendor.parser(self.buf, 0)
+ eq_(self.type_['val'], res.type)
+ eq_(self.len_['val'], res.len)
+ eq_(self.subtype['val'], res.subtype)
+ eq_(self.tun_id['val'], res.tun_id)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.NX_ACTION_SET_TUNNEL64_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vendor['val'], res[2])
+ eq_(self.subtype['val'], res[3])
+ eq_(self.tun_id['val'], res[4])
+
+
+class TestNXActionMultipath(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXActionMultipath
+ """
+
+ # NX_ACTION_MULTIPATH_PACK_STR
+ # '!HHIHHH2xHHI2xHI'...type, len, vendor, subtype, fields, basis, zfill
+ # algorithm, max_link, arg, zfill, ofs_nbits, dst
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+ len_ = {'buf': b'\x00\x20', 'val': ofproto.NX_ACTION_MULTIPATH_SIZE}
+ vendor = {'buf': b'\x00\x00\x23\x20',
+ 'val': ofproto_common.NX_EXPERIMENTER_ID}
+ subtype = {'buf': b'\x00\x0a', 'val': ofproto.NXAST_MULTIPATH}
+ fields = {'buf': b'\x6d\xf5', 'val': 28149}
+ basis = {'buf': b'\x7c\x0a', 'val': 31754}
+ zfill0 = b'\x00' * 2
+ algorithm = {'buf': b'\x82\x1d', 'val': 33309}
+ max_link = {'buf': b'\x06\x2b', 'val': 1579}
+ arg = {'buf': b'\x18\x79\x41\xc8', 'val': 410599880}
+ zfill1 = b'\x00' * 2
+ ofs_nbits = {'buf': b'\xa9\x9a', 'val': 43418}
+ dst = {'buf': b'\x00\x01\x00\x04', 'val': "reg0", 'val2': 65540}
+ start = 678
+ end = 704
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vendor['buf'] \
+ + subtype['buf'] \
+ + fields['buf'] \
+ + basis['buf'] \
+ + zfill0 \
+ + algorithm['buf'] \
+ + max_link['buf'] \
+ + arg['buf'] \
+ + zfill1 \
+ + ofs_nbits['buf'] \
+ + dst['buf']
+
+ c = NXActionMultipath(fields['val'],
+ basis['val'],
+ algorithm['val'],
+ max_link['val'],
+ arg['val'],
+ ofs_nbits['val'],
+ dst['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.subtype['val'], self.c.subtype)
+ eq_(self.fields['val'], self.c.fields)
+ eq_(self.basis['val'], self.c.basis)
+ eq_(self.algorithm['val'], self.c.algorithm)
+ eq_(self.max_link['val'], self.c.max_link)
+ eq_(self.arg['val'], self.c.arg)
+ eq_(self.ofs_nbits['val'], self.c.ofs_nbits)
+ eq_(self.dst['val'], self.c.dst)
+
+ def test_parser(self):
+ res = OFPActionVendor.parser(self.buf, 0)
+ eq_(self.type_['val'], res.type)
+ eq_(self.len_['val'], res.len)
+ eq_(self.subtype['val'], res.subtype)
+ eq_(self.fields['val'], res.fields)
+ eq_(self.basis['val'], res.basis)
+ eq_(self.algorithm['val'], res.algorithm)
+ eq_(self.max_link['val'], res.max_link)
+ eq_(self.arg['val'], res.arg)
+ eq_(self.ofs_nbits['val'], res.ofs_nbits)
+ eq_(self.dst['val'], res.dst)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.NX_ACTION_MULTIPATH_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vendor['val'], res[2])
+ eq_(self.subtype['val'], res[3])
+ eq_(self.fields['val'], res[4])
+ eq_(self.basis['val'], res[5])
+ eq_(self.algorithm['val'], res[6])
+ eq_(self.max_link['val'], res[7])
+ eq_(self.arg['val'], res[8])
+ eq_(self.ofs_nbits['val'], res[9])
+ eq_(self.dst['val2'], res[10])
+
+
+class TestNXActionBundle(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXActionBundle
+ """
+
+ # NX_ACTION_BUNDLE_PACK_STR
+ # '!HHIHHHHIHHI4x'...type, len, vendor, subtype, algorithm,
+ # fields, basis, slave_type, n_slaves,
+ # ofs_nbits, dst, zfill
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+ len_ = {'buf': b'\x00\x28', 'val': (ofproto.NX_ACTION_BUNDLE_SIZE + 8)}
+ vendor = {'buf': b'\x00\x00\x23\x20',
+ 'val': ofproto_common.NX_EXPERIMENTER_ID}
+ subtype = {'buf': b'\x00\x0c', 'val': ofproto.NXAST_BUNDLE}
+ algorithm = {'buf': b'\x51\xa7', 'val': 20903}
+ fields = {'buf': b'\xf8\xef', 'val': 63727}
+ basis = {'buf': b'\xfd\x6f', 'val': 64879}
+ slave_type = {'buf': b'\x7c\x51\x0f\xe0', 'val': 2085687264}
+ n_slaves = {'buf': b'\x00\x02', 'val': 2}
+ ofs_nbits = {'buf': b'\x00\x00', 'val': 0}
+ dst = {'buf': b'\x00\x00\x00\x00', 'val': 0}
+ zfill = b'\x00' * 4
+
+ slaves_buf = (b'\x00\x01', b'\x00\x02')
+ slaves_val = (1, 2)
+
+ _len = len_['val'] + len(slaves_val) * 2
+ _len += (_len % 8)
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vendor['buf'] \
+ + subtype['buf'] \
+ + algorithm['buf'] \
+ + fields['buf'] \
+ + basis['buf'] \
+ + slave_type['buf'] \
+ + n_slaves['buf'] \
+ + ofs_nbits['buf'] \
+ + dst['buf'] \
+ + zfill \
+ + slaves_buf[0] \
+ + slaves_buf[1]
+
+ c = NXActionBundle(algorithm['val'],
+ fields['val'],
+ basis['val'],
+ slave_type['val'],
+ n_slaves['val'],
+ ofs_nbits['val'],
+ dst['val'],
+ slaves_val)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.subtype['val'], self.c.subtype)
+ eq_(self.algorithm['val'], self.c.algorithm)
+ eq_(self.fields['val'], self.c.fields)
+ eq_(self.basis['val'], self.c.basis)
+ eq_(self.slave_type['val'], self.c.slave_type)
+ eq_(self.n_slaves['val'], self.c.n_slaves)
+ eq_(self.ofs_nbits['val'], self.c.ofs_nbits)
+ eq_(self.dst['val'], self.c.dst)
+
+ # slaves
+ slaves = self.c.slaves
+ eq_(self.slaves_val[0], slaves[0])
+ eq_(self.slaves_val[1], slaves[1])
+
+ def test_parser(self):
+ res = OFPActionVendor.parser(self.buf, 0)
+ eq_(self.type_['val'], res.type)
+ eq_(self.len_['val'], res.len)
+ eq_(self.subtype['val'], res.subtype)
+ eq_(self.algorithm['val'], res.algorithm)
+ eq_(self.fields['val'], res.fields)
+ eq_(self.basis['val'], res.basis)
+ eq_(self.slave_type['val'], res.slave_type)
+ eq_(self.n_slaves['val'], res.n_slaves)
+ eq_(self.ofs_nbits['val'], res.ofs_nbits)
+ eq_(self.dst['val'], res.dst)
+
+ # slaves
+ slaves = res.slaves
+ eq_(self.slaves_val[0], slaves[0])
+ eq_(self.slaves_val[1], slaves[1])
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = '!' \
+ + ofproto.NX_ACTION_BUNDLE_PACK_STR.replace('!', '') \
+ + 'HH4x'
+
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vendor['val'], res[2])
+ eq_(self.subtype['val'], res[3])
+ eq_(self.algorithm['val'], res[4])
+ eq_(self.fields['val'], res[5])
+ eq_(self.basis['val'], res[6])
+ eq_(self.slave_type['val'], res[7])
+ eq_(self.n_slaves['val'], res[8])
+ eq_(self.ofs_nbits['val'], res[9])
+ eq_(self.dst['val'], res[10])
+
+
+class TestNXActionBundleLoad(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXActionBundleLoad
+ """
+
+ # NX_ACTION_BUNDLE_PACK_STR
+ # '!HHIHHHHIHHI4x'...type, len, vendor, subtype, algorithm,
+ # fields, basis, slave_type, n_slaves,
+ # ofs_nbits, dst, zfill
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+ len_ = {'buf': b'\x00\x28', 'val': (ofproto.NX_ACTION_BUNDLE_SIZE + 8)}
+ vendor = {'buf': b'\x00\x00\x23\x20',
+ 'val': ofproto_common.NX_EXPERIMENTER_ID}
+ subtype = {'buf': b'\x00\x0d', 'val': ofproto.NXAST_BUNDLE_LOAD}
+ algorithm = {'buf': b'\x83\x15', 'val': 33557}
+ fields = {'buf': b'\xc2\x7a', 'val': 49786}
+ basis = {'buf': b'\x86\x18', 'val': 34328}
+ slave_type = {'buf': b'\x18\x42\x0b\x55', 'val': 406981461}
+ n_slaves = {'buf': b'\x00\x02', 'val': 2}
+ ofs_nbits = {'buf': b'\xd2\x9d', 'val': 53917}
+ dst = {'buf': b'\x00\x01\x00\x04', 'val': "reg0", 'val2': 65540}
+ zfill = b'\x00' * 4
+
+ slaves_buf = (b'\x00\x01', b'\x00\x02')
+ slaves_val = (1, 2)
+
+ _len = len_['val'] + len(slaves_val) * 2
+ _len += (_len % 8)
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vendor['buf'] \
+ + subtype['buf'] \
+ + algorithm['buf'] \
+ + fields['buf'] \
+ + basis['buf'] \
+ + slave_type['buf'] \
+ + n_slaves['buf'] \
+ + ofs_nbits['buf'] \
+ + dst['buf'] \
+ + zfill \
+ + slaves_buf[0] \
+ + slaves_buf[1]
+
+ c = NXActionBundleLoad(algorithm['val'],
+ fields['val'],
+ basis['val'],
+ slave_type['val'],
+ n_slaves['val'],
+ ofs_nbits['val'],
+ dst['val'],
+ slaves_val)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.subtype['val'], self.c.subtype)
+ eq_(self.algorithm['val'], self.c.algorithm)
+ eq_(self.fields['val'], self.c.fields)
+ eq_(self.basis['val'], self.c.basis)
+ eq_(self.slave_type['val'], self.c.slave_type)
+ eq_(self.n_slaves['val'], self.c.n_slaves)
+ eq_(self.ofs_nbits['val'], self.c.ofs_nbits)
+ eq_(self.dst['val'], self.c.dst)
+
+ # slaves
+ slaves = self.c.slaves
+ eq_(self.slaves_val[0], slaves[0])
+ eq_(self.slaves_val[1], slaves[1])
+
+ def test_parser(self):
+ res = OFPActionVendor.parser(self.buf, 0)
+ eq_(self.type_['val'], res.type)
+ eq_(self.len_['val'], res.len)
+ eq_(self.subtype['val'], res.subtype)
+ eq_(self.algorithm['val'], res.algorithm)
+ eq_(self.fields['val'], res.fields)
+ eq_(self.basis['val'], res.basis)
+ eq_(self.slave_type['val'], res.slave_type)
+ eq_(self.n_slaves['val'], res.n_slaves)
+ eq_(self.ofs_nbits['val'], res.ofs_nbits)
+ eq_(self.dst['val'], res.dst)
+
+ # slaves
+ slaves = res.slaves
+ eq_(self.slaves_val[0], slaves[0])
+ eq_(self.slaves_val[1], slaves[1])
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = '!' \
+ + ofproto.NX_ACTION_BUNDLE_PACK_STR.replace('!', '') \
+ + 'HH4x'
+
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vendor['val'], res[2])
+ eq_(self.subtype['val'], res[3])
+ eq_(self.algorithm['val'], res[4])
+ eq_(self.fields['val'], res[5])
+ eq_(self.basis['val'], res[6])
+ eq_(self.slave_type['val'], res[7])
+ eq_(self.n_slaves['val'], res[8])
+ eq_(self.ofs_nbits['val'], res[9])
+ eq_(self.dst['val2'], res[10])
+
+
+class TestNXActionOutputReg(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXActionOutputReg
+ """
+
+ # NX_ACTION_OUTPUT_REG_PACK_STR
+ # '!HHIHHIH6x'...type, len, vendor, subtype, ofs_nbits,
+ # src, max_len, zfill
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+ len_ = {'buf': b'\x00\x18', 'val': ofproto.NX_ACTION_OUTPUT_REG_SIZE}
+ vendor = {'buf': b'\x00\x00\x23\x20',
+ 'val': ofproto_common.NX_EXPERIMENTER_ID}
+ subtype = {'buf': b'\x00\x0f', 'val': ofproto.NXAST_OUTPUT_REG}
+ ofs_nbits = {'buf': b'\xfe\x78', 'val': 65144}
+ src = {'buf': b'\x00\x01\x00\x04', 'val': "reg0", 'val2': 65540}
+ max_len = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_OUTPUT_SIZE}
+ zfill = b'\x00' * 6
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vendor['buf'] \
+ + subtype['buf'] \
+ + ofs_nbits['buf'] \
+ + src['buf'] \
+ + max_len['buf'] \
+ + zfill
+
+ c = NXActionOutputReg(ofs_nbits['val'],
+ src['val'],
+ max_len['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.subtype['val'], self.c.subtype)
+ eq_(self.ofs_nbits['val'], self.c.ofs_nbits)
+ eq_(self.src['val'], self.c.src)
+ eq_(self.max_len['val'], self.c.max_len)
+
+ def test_parser(self):
+ res = OFPActionVendor.parser(self.buf, 0)
+ eq_(self.type_['val'], res.type)
+ eq_(self.len_['val'], res.len)
+ eq_(self.subtype['val'], res.subtype)
+ eq_(self.ofs_nbits['val'], self.c.ofs_nbits)
+ eq_(self.src['val'], res.src)
+ eq_(self.max_len['val'], res.max_len)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.NX_ACTION_OUTPUT_REG_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vendor['val'], res[2])
+ eq_(self.subtype['val'], res[3])
+ eq_(self.ofs_nbits['val'], res[4])
+ eq_(self.src['val2'], res[5])
+ eq_(self.max_len['val'], res[6])
+
+
+class TestNXActionExit(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXActionExit
+ """
+
+ # NX_ACTION_HEADER_PACK_STR
+ # '!HHIH'...type, len, vendor, subtype
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+ len_ = {'buf': b'\x00\x10', 'val': ofproto.NX_ACTION_HEADER_SIZE}
+ vendor = {'buf': b'\x00\x00\x23\x20',
+ 'val': ofproto_common.NX_EXPERIMENTER_ID}
+ subtype = {'buf': b'\x00\x11', 'val': ofproto.NXAST_EXIT}
+ zfill = b'\x00' * 6
+
+ buf = type_['buf'] \
+ + len_['buf'] \
+ + vendor['buf'] \
+ + subtype['buf'] \
+ + zfill
+
+ c = NXActionExit()
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.subtype['val'], self.c.subtype)
+
+ def test_parser(self):
+ res = OFPActionVendor.parser(self.buf, 0)
+ eq_(self.type_['val'], res.type)
+ eq_(self.len_['val'], res.len)
+ eq_(self.subtype['val'], res.subtype)
+
+ def test_serialize(self):
+ buf = bytearray()
+ self.c.serialize(buf, 0)
+
+ fmt = ofproto.NX_ACTION_HEADER_PACK_STR
+ res = struct.unpack(fmt, six.binary_type(buf))
+
+ eq_(self.type_['val'], res[0])
+ eq_(self.len_['val'], res[1])
+ eq_(self.vendor['val'], res[2])
+ eq_(self.subtype['val'], res[3])
+
+
+class TestOFPDescStats(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPDescStats
+ """
+
+ # OFP_DESC_STATS_PACK_STR
+ # '!256s256s256s32s256s'...mfr_desc, hw_desc, sw_desc, serial_num, dp_desc
+ mfr_desc = b'mfr_desc'.ljust(256)
+ hw_desc = b'hw_desc'.ljust(256)
+ sw_desc = b'sw_desc'.ljust(256)
+ serial_num = b'serial_num'.ljust(32)
+ dp_desc = b'dp_desc'.ljust(256)
+
+ buf = mfr_desc \
+ + hw_desc \
+ + sw_desc \
+ + serial_num \
+ + dp_desc
+
+ c = OFPDescStats(mfr_desc, hw_desc, sw_desc, serial_num, dp_desc)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.mfr_desc, self.c.mfr_desc)
+ eq_(self.hw_desc, self.c.hw_desc)
+ eq_(self.sw_desc, self.c.sw_desc)
+ eq_(self.serial_num, self.c.serial_num)
+ eq_(self.dp_desc, self.c.dp_desc)
+
+ def test_parser(self):
+ res = self.c.parser(self.buf, 0)
+
+ eq_(self.mfr_desc, self.mfr_desc)
+ eq_(self.hw_desc, self.hw_desc)
+ eq_(self.sw_desc, self.sw_desc)
+ eq_(self.serial_num, self.serial_num)
+ eq_(self.dp_desc, self.dp_desc)
+
+
+class TestOFPFlowStats(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPFlowStats
+ """
+
+ # OFP_FLOW_STATS_0_PACK_STR
+ # '!HBx'...length, table_id, zfill
+ length = {'buf': b'\x00\x58', 'val': 88}
+ length_append_action = {'buf': b'\x00\x60', 'val': 96}
+ table_id = {'buf': b'\x51', 'val': 81}
+ zfill_0 = b'\x00'
+
+ # OFP_MATCH_PACK_STR
+ # '!IH6s6sHBxHBB2xIIHH'...
+ match = b'\x97\x7c\xa6\x1e' \
+ + b'\x5e\xa0' \
+ + b'\x7a\x3e\xed\x30\x4a\x90' \
+ + b'\x96\x8e\x67\xbe\x2f\xe2' \
+ + b'\xb1\x81' \
+ + b'\xbe' \
+ + b'\x00' \
+ + b'\x01\xab' \
+ + b'\x42' \
+ + b'\xfe' \
+ + b'\x00\x00' \
+ + b'\xa4\x5d\x5c\x42' \
+ + b'\xa2\x5c\x2e\x05' \
+ + b'\x5a\x94' \
+ + b'\x64\xd4'
+
+ # OFP_FLOW_STATS_1_PACK_STR
+ # '!IIHHH6xQQQ'...duration_sec, duration_nsec, priority,
+ # idle_timeout, hard_timeout, zfill,
+ # cookie, packet_count, byte_count
+ duration_sec = {'buf': b'\x94\x19\xb3\xd2', 'val': 2484712402}
+ duration_nsec = {'buf': b'\xee\x66\xcf\x7c', 'val': 3999715196}
+ priority = {'buf': b'\xe1\xc0', 'val': 57792}
+ idle_timeout = {'buf': b'\x8e\x10', 'val': 36368}
+ hard_timeout = {'buf': b'\xd4\x99', 'val': 54425}
+ zfill_1 = b'\x00\x00\x00\x00\x00\x00'
+ cookie = {'buf': b'\x0b\x01\xe8\xe5\xf0\x84\x8a\xe0',
+ 'val': 793171083674290912}
+ packet_count = {'buf': b'\x47\x5c\xc6\x05\x28\xff\x7c\xdb',
+ 'val': 5142202600015232219}
+ byte_count = {'buf': b'\x24\xe9\x4b\xee\xcb\x57\xd9\xc3',
+ 'val': 2659740543924820419}
+
+ # <action>_PACK_STR...type_, len_ [others...]
+ type = {'buf': b'\x00\x00', 'val': ofproto.OFPAT_OUTPUT}
+ len = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_OUTPUT_SIZE}
+ port = {'buf': b'\x59\x2a', 'val': 22826}
+ max_len = {'buf': b'\x00\x08', 'val': ofproto.OFP_ACTION_OUTPUT_SIZE}
+ action = (type, len, port, max_len)
+
+ ACTION_TYPE = 0
+ ACTION_LEN = 1
+ ACTION_PORT = 2
+ ACTION_MAX_LEN = 3
+
+ c = OFPFlowStats()
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def _parser(self, action=None):
+ buf = self.table_id['buf'] \
+ + self.zfill_0 \
+ + self.match \
+ + self.duration_sec['buf'] \
+ + self.duration_nsec['buf'] \
+ + self.priority['buf'] \
+ + self.idle_timeout['buf'] \
+ + self.hard_timeout['buf'] \
+ + self.zfill_1 \
+ + self.cookie['buf'] \
+ + self.packet_count['buf'] \
+ + self.byte_count['buf']
+
+ if not action:
+ buf = self.length['buf'] + buf
+ else:
+ buf = self.length_append_action['buf'] + buf
+
+ for a in self.action:
+ buf = buf + a['buf']
+
+ return self.c.parser(buf, 0)
+
+ def test_parser(self):
+ res = self._parser()
+
+ eq_(self.length['val'], res.length)
+ eq_(self.table_id['val'], res.table_id)
+ eq_(self.duration_sec['val'], res.duration_sec)
+ eq_(self.duration_nsec['val'], res.duration_nsec)
+ eq_(self.priority['val'], res.priority)
+ eq_(self.idle_timeout['val'], res.idle_timeout)
+ eq_(self.hard_timeout['val'], res.hard_timeout)
+ eq_(self.cookie['val'], res.cookie)
+ eq_(self.packet_count['val'], res.packet_count)
+ eq_(self.byte_count['val'], res.byte_count)
+
+ def test_parser_append_actions(self):
+ res = self._parser(True).actions[0]
+
+ eq_(self.action[self.ACTION_TYPE]['val'], res.type)
+ eq_(self.action[self.ACTION_LEN]['val'], res.len)
+ eq_(self.action[self.ACTION_PORT]['val'], res.port)
+ eq_(self.action[self.ACTION_MAX_LEN]['val'], res.max_len)
+
+
+class TestOFPAggregateStats(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPAggregateStats
+ """
+
+ # OFP_AGGREGATE_STATS_REPLY_PACK_STR
+ # '!QQI4x'...packet_count, byte_count, flow_count, zfill
+ packet_count = {'buf': b'\x43\x95\x1b\xfb\x0f\xf6\xa7\xdd',
+ 'val': 4869829337189623773}
+ byte_count = {'buf': b'\x36\xda\x2d\x80\x2a\x95\x35\xdd',
+ 'val': 3952521651464517085}
+ flow_count = {'buf': b'\xc3\x0d\xc3\xed', 'val': 3272459245}
+ zfill = b'\x00' * 4
+
+ buf = packet_count['buf'] \
+ + byte_count['buf'] \
+ + flow_count['buf'] \
+ + zfill
+
+ c = OFPAggregateStats(packet_count['val'],
+ byte_count['val'],
+ flow_count['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.packet_count['val'], self.c.packet_count)
+ eq_(self.byte_count['val'], self.c.byte_count)
+ eq_(self.flow_count['val'], self.c.flow_count)
+
+ def test_parser(self):
+
+ res = self.c.parser(self.buf, 0)
+
+ eq_(self.packet_count['val'], res.packet_count)
+ eq_(self.byte_count['val'], res.byte_count)
+ eq_(self.flow_count['val'], res.flow_count)
+
+
+class TestOFPTableStats(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPTableStats
+ """
+
+ # OFP_TABLE_STATS_PACK_STR
+ # '!B3x32sIIIQQ'...table_id, zfill, name, wildcards, max_entries,
+ # active_count, lookup_count, matched_count
+ table_id = {'buf': b'\x5b', 'val': 91}
+ zfill = b'\x00' * 3
+ name = b'name'.ljust(32)
+ wildcards = {'buf': b'\xc5\xaf\x6e\x12', 'val': 3316608530}
+ max_entries = {'buf': b'\x95\x6c\x78\x4d', 'val': 2506913869}
+ active_count = {'buf': b'\x78\xac\xa8\x1e', 'val': 2024581150}
+ lookup_count = {'buf': b'\x40\x1d\x9c\x39\x19\xec\xd4\x1c',
+ 'val': 4620020561814017052}
+ matched_count = {'buf': b'\x27\x35\x02\xb6\xc5\x5e\x17\x65',
+ 'val': 2825167325263435621}
+
+ buf = table_id['buf'] \
+ + zfill \
+ + name \
+ + wildcards['buf'] \
+ + max_entries['buf'] \
+ + active_count['buf'] \
+ + lookup_count['buf'] \
+ + matched_count['buf']
+
+ c = OFPTableStats(table_id['val'],
+ name,
+ wildcards['val'],
+ max_entries['val'],
+ active_count['val'],
+ lookup_count['val'],
+ matched_count['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.table_id['val'], self.c.table_id)
+ eq_(self.name, self.c.name)
+ eq_(self.wildcards['val'], self.c.wildcards)
+ eq_(self.max_entries['val'], self.c.max_entries)
+ eq_(self.active_count['val'], self.c.active_count)
+ eq_(self.lookup_count['val'], self.c.lookup_count)
+ eq_(self.matched_count['val'], self.c.matched_count)
+
+ def test_parser(self):
+ res = self.c.parser(self.buf, 0)
+
+ eq_(self.table_id['val'], res.table_id)
+ eq_(self.name, res.name)
+ eq_(self.wildcards['val'], res.wildcards)
+ eq_(self.max_entries['val'], res.max_entries)
+ eq_(self.active_count['val'], res.active_count)
+ eq_(self.lookup_count['val'], res.lookup_count)
+ eq_(self.matched_count['val'], res.matched_count)
+
+
+class TestOFPPortStats(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPPortStats
+ """
+
+ # OFP_PORT_STATS_PACK_STR
+ # '!H6xQQQQQQQQQQQQ'... port_no, zfill, rx_packets, tx_packets,
+ # rx_bytes, tx_bytes, rx_dropped, tx_dropped,
+ # rx_errors, tx_errors, rx_frame_err,
+ # rx_over_err, rx_crc_err, collisions
+ port_no = {'buf': b'\xe7\x6b', 'val': 59243}
+ zfill = b'\x00' * 6
+ rx_packets = {'buf': b'\x53\x44\x36\x61\xc4\x86\xc0\x37',
+ 'val': 5999980397101236279}
+ tx_packets = {'buf': b'\x27\xa4\x41\xd7\xd4\x53\x9e\x42',
+ 'val': 2856480458895760962}
+ rx_bytes = {'buf': b'\x55\xa1\x38\x60\x43\x97\x0d\x89',
+ 'val': 6170274950576278921}
+ tx_bytes = {'buf': b'\x77\xe1\xd5\x63\x18\xae\x63\xaa',
+ 'val': 8638420181865882538}
+ rx_dropped = {'buf': b'\x60\xe6\x20\x01\x24\xda\x4e\x5a',
+ 'val': 6982303461569875546}
+ tx_dropped = {'buf': b'\x09\x2d\x5d\x71\x71\xb6\x8e\xc7',
+ 'val': 661287462113808071}
+ rx_errors = {'buf': b'\x2f\x7e\x35\xb3\x66\x3c\x19\x0d',
+ 'val': 3422231811478788365}
+ tx_errors = {'buf': b'\x57\x32\x08\x2f\x88\x32\x40\x6b',
+ 'val': 6283093430376743019}
+ rx_frame_err = {'buf': b'\x0c\x28\x6f\xad\xce\x66\x6e\x8b',
+ 'val': 876072919806406283}
+ rx_over_err = {'buf': b'\x5a\x90\x8f\x9b\xfc\x82\x2e\xa0',
+ 'val': 6525873760178941600}
+ rx_crc_err = {'buf': b'\x73\x3a\x71\x17\xd6\x74\x69\x47',
+ 'val': 8303073210207070535}
+ collisions = {'buf': b'\x2f\x52\x0c\x79\x96\x03\x6e\x79',
+ 'val': 3409801584220270201}
+
+ buf = port_no['buf'] \
+ + zfill \
+ + rx_packets['buf'] \
+ + tx_packets['buf'] \
+ + rx_bytes['buf'] \
+ + tx_bytes['buf'] \
+ + rx_dropped['buf'] \
+ + tx_dropped['buf'] \
+ + rx_errors['buf'] \
+ + tx_errors['buf'] \
+ + rx_frame_err['buf'] \
+ + rx_over_err['buf'] \
+ + rx_crc_err['buf'] \
+ + collisions['buf']
+
+ c = OFPPortStats(port_no['val'],
+ rx_packets['val'],
+ tx_packets['val'],
+ rx_bytes['val'],
+ tx_bytes['val'],
+ rx_dropped['val'],
+ tx_dropped['val'],
+ rx_errors['val'],
+ tx_errors['val'],
+ rx_frame_err['val'],
+ rx_over_err['val'],
+ rx_crc_err['val'],
+ collisions['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.port_no['val'], self.c.port_no)
+ eq_(self.rx_packets['val'], self.c.rx_packets)
+ eq_(self.tx_packets['val'], self.c.tx_packets)
+ eq_(self.rx_bytes['val'], self.c.rx_bytes)
+ eq_(self.tx_bytes['val'], self.c.tx_bytes)
+ eq_(self.rx_dropped['val'], self.c.rx_dropped)
+ eq_(self.tx_dropped['val'], self.c.tx_dropped)
+ eq_(self.rx_errors['val'], self.c.rx_errors)
+ eq_(self.tx_errors['val'], self.c.tx_errors)
+ eq_(self.rx_frame_err['val'], self.c.rx_frame_err)
+ eq_(self.rx_over_err['val'], self.c.rx_over_err)
+ eq_(self.rx_crc_err['val'], self.c.rx_crc_err)
+ eq_(self.collisions['val'], self.c.collisions)
+
+ def test_parser(self):
+ res = self.c.parser(self.buf, 0)
+
+ eq_(self.port_no['val'], res.port_no)
+ eq_(self.rx_packets['val'], res.rx_packets)
+ eq_(self.tx_packets['val'], res.tx_packets)
+ eq_(self.rx_bytes['val'], res.rx_bytes)
+ eq_(self.tx_bytes['val'], res.tx_bytes)
+ eq_(self.rx_dropped['val'], res.rx_dropped)
+ eq_(self.tx_dropped['val'], res.tx_dropped)
+ eq_(self.rx_errors['val'], res.rx_errors)
+ eq_(self.tx_errors['val'], res.tx_errors)
+ eq_(self.rx_frame_err['val'], res.rx_frame_err)
+ eq_(self.rx_over_err['val'], res.rx_over_err)
+ eq_(self.rx_crc_err['val'], res.rx_crc_err)
+ eq_(self.collisions['val'], res.collisions)
+
+
+class TestOFPQueueStats(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPQueueStats
+ """
+
+ # OFP_QUEUE_STATS_PACK_STR
+ # '!H2xIQQQ...port_no, queue_id, tx_bytes, tx_packets, tx_errors
+ port_no = {'buf': b'\xe7\x6b', 'val': 59243}
+ zfill = b'\x00' * 2
+ queue_id = {'buf': b'\x2a\xa8\x7f\x32', 'val': 715685682}
+ tx_bytes = {'buf': b'\x77\xe1\xd5\x63\x18\xae\x63\xaa',
+ 'val': 8638420181865882538}
+ tx_packets = {'buf': b'\x27\xa4\x41\xd7\xd4\x53\x9e\x42',
+ 'val': 2856480458895760962}
+ tx_errors = {'buf': b'\x57\x32\x08\x2f\x88\x32\x40\x6b',
+ 'val': 6283093430376743019}
+
+ c = OFPQueueStats(port_no['val'],
+ queue_id['val'],
+ tx_bytes['val'],
+ tx_packets['val'],
+ tx_errors['val'])
+
+ buf = port_no['buf'] \
+ + zfill \
+ + queue_id['buf'] \
+ + tx_bytes['buf'] \
+ + tx_packets['buf'] \
+ + tx_errors['buf']
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.port_no['val'], self.c.port_no)
+ eq_(self.queue_id['val'], self.c.queue_id)
+ eq_(self.tx_bytes['val'], self.c.tx_bytes)
+ eq_(self.tx_packets['val'], self.c.tx_packets)
+ eq_(self.tx_errors['val'], self.c.tx_errors)
+
+ def test_parser(self):
+ res = self.c.parser(self.buf, 0)
+
+ eq_(self.port_no['val'], res.port_no)
+ eq_(self.queue_id['val'], res.queue_id)
+ eq_(self.tx_bytes['val'], res.tx_bytes)
+ eq_(self.tx_packets['val'], res.tx_packets)
+ eq_(self.tx_errors['val'], res.tx_errors)
+
+
+class TestOFPVendorStats(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPVendorStats
+ """
+
+ specific_data = 'specific_data'
+ specific_data_after = 'data'
+ offset = specific_data.find(specific_data_after)
+
+ c = OFPVendorStats(specific_data)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.specific_data, self.c.specific_data)
+
+ def test_parser(self):
+ res = self.c.parser(self.specific_data, self.offset)
+ eq_(self.specific_data_after, res.specific_data)
+
+
+class TestOFPQueuePropNone(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPQueuePropNone
+ """
+
+ # OFP_QUEUE_PROP_HEADER_PACK_STR
+ # '!HH4x'...property_, len_
+ property = {'buf': b'\x00\x00', 'val': ofproto.OFPQT_NONE}
+ len = {'buf': b'\x00\x08', 'val': ofproto.OFP_QUEUE_PROP_HEADER_SIZE}
+ zfill = b'\x00' * 4
+
+ c = OFPQueuePropNone()
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ cls = OFPQueuePropHeader._QUEUE_PROPERTIES[self.c.cls_prop_type]
+
+ eq_(self.property['val'], self.c.cls_prop_type)
+ eq_(self.property['val'], self.c.property)
+ eq_(self.property['val'], cls.cls_prop_type)
+
+ eq_(self.len['val'], self.c.cls_prop_len)
+ eq_(self.len['val'], self.c.len)
+ eq_(self.len['val'], cls.cls_prop_len)
+
+ def test_parser(self):
+ buf = self.property['buf'] \
+ + self.len['buf'] \
+ + self.zfill
+
+ ok_(self.c.parser(buf, 0))
+
+
+class TestOFPQueuePropMinRate(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPQueuePropMinRate
+ """
+
+ # OFP_QUEUE_PROP_MIN_RATE_PACK_STR
+ # '!H6x'...rate
+ rate = {'buf': b'\x00\x01', 'val': ofproto.OFPQT_MIN_RATE}
+ len = {'buf': b'\x00\x10', 'val': ofproto.OFP_QUEUE_PROP_MIN_RATE_SIZE}
+ zfill = b'\x00' * 6
+
+ buf = rate['buf'] \
+ + zfill
+
+ c = OFPQueuePropMinRate(rate['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ cls = OFPQueuePropHeader._QUEUE_PROPERTIES[self.c.cls_prop_type]
+
+ eq_(self.rate['val'], self.c.cls_prop_type)
+ eq_(self.rate['val'], self.c.rate)
+ eq_(self.rate['val'], cls.cls_prop_type)
+
+ eq_(self.len['val'], self.c.cls_prop_len)
+ eq_(self.len['val'], cls.cls_prop_len)
+
+ def test_parser(self):
+ res = self.c.parser(self.buf, 0)
+ eq_(self.rate['val'], res.rate)
+
+
+class TestOFPPacketQueue(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPPacketQueue
+ """
+
+ # OFP_PACKET_QUEUE_PQCK_STR
+ # '!IH2x'...queue_id, len_, zfill
+ queue_id = {'buf': b'\x4d\x4b\x3a\xd1', 'val': 1296775889}
+ len_ = {'buf': b'\x00\x08',
+ 'val': ofproto.OFP_QUEUE_PROP_HEADER_SIZE}
+ zfill = b'\x00' * 2
+
+ buf = queue_id['buf'] \
+ + len_['buf'] \
+ + zfill
+
+ c = OFPPacketQueue(queue_id['val'],
+ len_['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.queue_id['val'], self.c.queue_id)
+ eq_(self.len_['val'], self.c.len)
+
+ def test_parser(self):
+ res = self.c.parser(self.buf, 0)
+ eq_(self.queue_id['val'], res.queue_id)
+ eq_(self.len_['val'], res.len)
+
+ def test_parser_append_prop(self):
+ # OFP_QUEUE_PROP_HEADER_PACK_STR + OFP_QUEUE_PROP_MIN_RATE_PACK_STR
+ # '!HH4xH6x'...type, len, zfill, rate, zfill
+ len_ = {'buf': b'\x00\x10',
+ 'val': ofproto.OFP_QUEUE_PROP_MIN_RATE_SIZE}
+ a_type = {'buf': b'\x00\x01', 'val': ofproto.OFPQT_MIN_RATE}
+ a_len = {'buf': b'\x00\x10',
+ 'val': ofproto.OFP_QUEUE_PROP_MIN_RATE_SIZE}
+ a_zfill0 = b'\x00' * 4
+ a_rate = {'buf': b'\x00\x01', 'val': ofproto.OFPQT_MIN_RATE}
+ a_zfill1 = b'\x00' * 6
+
+ buf = self.queue_id['buf'] \
+ + len_['buf'] \
+ + self.zfill \
+ + a_type['buf'] \
+ + a_len['buf'] \
+ + a_zfill0 \
+ + a_rate['buf'] \
+ + a_zfill1
+
+ res = self.c.parser(buf, 0)
+
+ eq_(self.queue_id['val'], res.queue_id)
+ eq_(len_['val'], res.len)
+
+ append_cls = res.properties[0]
+
+ eq_(a_type['val'], append_cls.property)
+ eq_(a_len['val'], append_cls.len)
+ eq_(a_rate['val'], append_cls.rate)
+
+
+class TestOFPHello(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPHello
+ """
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = ofproto.OFP_VERSION
+ msg_type = ofproto.OFPT_HELLO
+ msg_len = ofproto.OFP_HEADER_SIZE
+ xid = 2183948390
+ data = b'\x00\x01\x02\x03'
+
+ fmt = ofproto.OFP_HEADER_PACK_STR
+ buf = struct.pack(fmt, version, msg_type, msg_len, xid) \
+ + data
+
+ res = OFPHello.parser(object, version, msg_type, msg_len, xid,
+ bytearray(buf))
+
+ eq_(version, res.version)
+ eq_(msg_type, res.msg_type)
+ eq_(msg_len, res.msg_len)
+ eq_(xid, res.xid)
+ eq_(six.binary_type(buf), six.binary_type(res.buf))
+
+ def test_serialize(self):
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPHello(Datapath)
+ c.serialize()
+ eq_(ofproto.OFP_VERSION, c.version)
+ eq_(ofproto.OFPT_HELLO, c.msg_type)
+ eq_(0, c.xid)
+
+
+class TestOFPErrorMsg(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPErrorMsg
+ """
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x01', 'val': ofproto.OFPT_ERROR}
+ msg_len = {'buf': b'\x00\x0c',
+ 'val': ofproto.OFP_ERROR_MSG_SIZE}
+ xid = {'buf': b'\x87\x8b\x26\x7c', 'val': 2274043516}
+ type = {'buf': b'\xab\x3e', 'val': 43838}
+ code = {'buf': b'\x5d\x3c', 'val': 23868}
+ data = b'Error Message.'
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf'] \
+ + type['buf'] \
+ + code['buf'] \
+ + data
+
+ res = OFPErrorMsg.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(type['val'], res.type)
+ eq_(code['val'], res.code)
+ eq_(data, res.data)
+
+ def test_serialize(self):
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ type = 1306
+ code = 13774
+ data = b'Error Message.'
+
+ c = OFPErrorMsg(Datapath)
+ c.type = type
+ c.code = code
+ c.data = data
+
+ c.serialize()
+
+ eq_(ofproto.OFP_VERSION, c.version)
+ eq_(ofproto.OFPT_ERROR, c.msg_type)
+ eq_(0, c.xid)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.OFP_ERROR_MSG_PACK_STR.replace('!', '') \
+ + str(len(data)) + 's'
+
+ res = struct.unpack(fmt, six.binary_type(c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_ERROR, res[1])
+ eq_(len(c.buf), res[2])
+ eq_(0, res[3])
+ eq_(type, res[4])
+ eq_(code, res[5])
+ eq_(data, res[6])
+
+
+class TestOFPEchoRequest(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPEchoRequest
+ """
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x02', 'val': ofproto.OFPT_ECHO_REQUEST}
+ msg_len = {'buf': b'\x00\x08',
+ 'val': ofproto.OFP_HEADER_SIZE}
+ xid = {'buf': b'\x84\x47\xef\x3f', 'val': 2219306815}
+ data = b'Request Message.'
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf'] \
+ + data
+
+ res = OFPEchoRequest.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(data, res.data)
+
+ def test_serialize(self):
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ data = b'Request Message.'
+
+ c = OFPEchoRequest(Datapath)
+ c.data = data
+
+ c.serialize()
+
+ eq_(ofproto.OFP_VERSION, c.version)
+ eq_(ofproto.OFPT_ECHO_REQUEST, c.msg_type)
+ eq_(0, c.xid)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + str(len(data)) + 's'
+
+ res = struct.unpack(fmt, six.binary_type(c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_ECHO_REQUEST, res[1])
+ eq_(len(c.buf), res[2])
+ eq_(0, res[3])
+ eq_(data, res[4])
+
+
+class TestOFPEchoReply(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPEchoReply
+ """
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x03', 'val': ofproto.OFPT_ECHO_REPLY}
+ msg_len = {'buf': b'\x00\x08',
+ 'val': ofproto.OFP_HEADER_SIZE}
+ xid = {'buf': b'\x6e\x21\x3e\x62', 'val': 1847672418}
+ data = b'Reply Message.'
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf'] \
+ + data
+
+ res = OFPEchoReply.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(data, res.data)
+
+ def test_serialize(self):
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ data = b'Reply Message.'
+
+ c = OFPEchoReply(Datapath)
+ c.data = data
+
+ c.serialize()
+
+ eq_(ofproto.OFP_VERSION, c.version)
+ eq_(ofproto.OFPT_ECHO_REPLY, c.msg_type)
+ eq_(0, c.xid)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + str(len(data)) + 's'
+
+ res = struct.unpack(fmt, six.binary_type(c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_ECHO_REPLY, res[1])
+ eq_(len(c.buf), res[2])
+ eq_(0, res[3])
+ eq_(data, res[4])
+
+
+class TestOFPVendor(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPVendor
+ """
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x04', 'val': ofproto.OFPT_VENDOR}
+ msg_len = {'buf': b'\x00\x0c',
+ 'val': ofproto.OFP_VENDOR_HEADER_SIZE}
+ xid = {'buf': b'\x05\x45\xdf\x18', 'val': 88465176}
+ vendor = {'buf': b'\x53\xea\x25\x3e', 'val': 1407853886}
+ data = b'Vendor Message.'
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf'] \
+ + vendor['buf'] \
+ + data
+
+ res = OFPVendor.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(vendor['val'], res.vendor)
+ eq_(data, res.data)
+
+ def test_serialize(self):
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ vendor = {'buf': b'\x38\x4b\xf9\x6c', 'val': 944503148}
+ data = b'Reply Message.'
+
+ c = OFPVendor(Datapath)
+ c.vendor = vendor['val']
+ c.data = data
+
+ c.serialize()
+
+ eq_(ofproto.OFP_VERSION, c.version)
+ eq_(ofproto.OFPT_VENDOR, c.msg_type)
+ eq_(0, c.xid)
+ eq_(vendor['val'], c.vendor)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.OFP_VENDOR_HEADER_PACK_STR.replace('!', '') \
+ + str(len(data)) + 's'
+
+ res = struct.unpack(fmt, six.binary_type(c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_VENDOR, res[1])
+ eq_(len(c.buf), res[2])
+ eq_(0, res[3])
+ eq_(vendor['val'], res[4])
+ eq_(data, res[5])
+
+
+# class TestNXTRequest(unittest.TestCase):
+class TestNiciraHeader(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NiciraHeader
+ """
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ subtype = ofproto.NXT_FLOW_MOD_TABLE_ID
+
+ c = NiciraHeader(object, subtype)
+ eq_(subtype, c.subtype)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ data = b'Reply Message.'
+ subtype = ofproto.NXT_FLOW_MOD_TABLE_ID
+
+ c = NiciraHeader(Datapath, subtype)
+ c.data = data
+
+ c.serialize()
+
+ eq_(ofproto.OFP_VERSION, c.version)
+ eq_(ofproto.OFPT_VENDOR, c.msg_type)
+ eq_(0, c.xid)
+ eq_(ofproto_common.NX_EXPERIMENTER_ID, c.vendor)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.NICIRA_HEADER_PACK_STR.replace('!', '') \
+ + str(len(data)) + 's'
+
+ res = struct.unpack(fmt, six.binary_type(c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_VENDOR, res[1])
+ eq_(len(c.buf), res[2])
+ eq_(0, res[3])
+ eq_(ofproto_common.NX_EXPERIMENTER_ID, res[4])
+ eq_(subtype, res[5])
+ eq_(data, res[6])
+
+
+class TestNXTSetFlowFormat(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXTSetFlowFormat
+ """
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ flow_format = {'buf': b'\xdc\x6b\xf5\x24', 'val': 3698062628}
+
+ c = NXTSetFlowFormat(object, flow_format['val'])
+ eq_(flow_format['val'], c.format)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ flow_format = {'buf': b'\x5a\x4e\x59\xad', 'val': 1515084205}
+
+ c = NXTSetFlowFormat(Datapath, flow_format['val'])
+ c.serialize()
+
+ eq_(ofproto.OFP_VERSION, c.version)
+ eq_(ofproto.OFPT_VENDOR, c.msg_type)
+ eq_(0, c.xid)
+ eq_(ofproto_common.NX_EXPERIMENTER_ID, c.vendor)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.NICIRA_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.NX_SET_FLOW_FORMAT_PACK_STR.replace('!', '')
+
+ res = struct.unpack(fmt, six.binary_type(c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_VENDOR, res[1])
+ eq_(len(c.buf), res[2])
+ eq_(0, res[3])
+ eq_(ofproto_common.NX_EXPERIMENTER_ID, res[4])
+ eq_(ofproto.NXT_SET_FLOW_FORMAT, res[5])
+ eq_(flow_format['val'], res[6])
+
+
+class TestNXTFlowMod(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXTFlowMod
+ """
+
+ # NX_FLOW_MOD_PACK_STR
+ # '!Q4HI3H6x'...cokkie, command, idle_timeout, head_timeout,
+ # priority, buffer_id, out_port, flags, rule, zfill
+ cookie = {'buf': b'\x04\x56\x27\xad\xbd\x43\xd6\x83',
+ 'val': 312480851306993283}
+ command = {'buf': b'\x61\xaa', 'val': 25002}
+ idle_timeout = {'buf': b'\x4e\xff', 'val': 20223}
+ hard_timeout = {'buf': b'\x80\x16', 'val': 32790}
+ priority = {'buf': b'\x70\x5f', 'val': 28767}
+ buffer_id = {'buf': b'\x7b\x97\x3a\x09', 'val': 2073508361}
+ out_port = {'buf': b'\x11\x7d', 'val': 4477}
+ flags = {'buf': b'\x5c\xb9', 'val': 23737}
+ rule = nx_match.ClsRule()
+ zfill = b'\x00' * 6
+
+ port = {'buf': b'\x2a\xe0', 'val': 10976}
+ actions = [OFPActionOutput(port['val'])]
+
+ def _get_obj(self, append_action=False):
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ actions = None
+ if append_action:
+ actions = self.actions
+
+ c = NXTFlowMod(Datapath,
+ self.cookie['val'],
+ self.command['val'],
+ self.idle_timeout['val'],
+ self.hard_timeout['val'],
+ self.priority['val'],
+ self.buffer_id['val'],
+ self.out_port['val'],
+ self.flags['val'],
+ self.rule,
+ actions)
+
+ return c
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ c = self._get_obj()
+
+ eq_(self.cookie['val'], c.cookie)
+ eq_(self.command['val'], c.command)
+ eq_(self.idle_timeout['val'], c.idle_timeout)
+ eq_(self.hard_timeout['val'], c.hard_timeout)
+ eq_(self.priority['val'], c.priority)
+ eq_(self.buffer_id['val'], c.buffer_id)
+ eq_(self.out_port['val'], c.out_port)
+ eq_(self.flags['val'], c.flags)
+ eq_(self.rule.__hash__(), c.rule.__hash__())
+
+ def test_init_append_actions(self):
+ c = self._get_obj(True)
+
+ action = c.actions[0]
+ eq_(ofproto.OFPAT_OUTPUT, action.type)
+ eq_(ofproto.OFP_ACTION_OUTPUT_SIZE, action.len)
+ eq_(self.port['val'], action.port)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ c = self._get_obj()
+ c.serialize()
+
+ eq_(ofproto.OFP_VERSION, c.version)
+ eq_(ofproto.OFPT_VENDOR, c.msg_type)
+ eq_(0, c.xid)
+ eq_(ofproto_common.NX_EXPERIMENTER_ID, c.vendor)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.NICIRA_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.NX_FLOW_MOD_PACK_STR.replace('!', '')
+
+ res = struct.unpack(fmt, six.binary_type(c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_VENDOR, res[1])
+ eq_(len(c.buf), res[2])
+ eq_(0, res[3])
+ eq_(ofproto_common.NX_EXPERIMENTER_ID, res[4])
+ eq_(ofproto.NXT_FLOW_MOD, res[5])
+ eq_(self.cookie['val'], res[6])
+ eq_(self.command['val'], res[7])
+ eq_(self.idle_timeout['val'], res[8])
+ eq_(self.hard_timeout['val'], res[9])
+ eq_(self.priority['val'], res[10])
+ eq_(self.buffer_id['val'], res[11])
+ eq_(self.out_port['val'], res[12])
+ eq_(self.flags['val'], res[13])
+
+ def test_serialize_append_actions(self):
+ c = self._get_obj(True)
+ c.serialize()
+
+ eq_(ofproto.OFP_VERSION, c.version)
+ eq_(ofproto.OFPT_VENDOR, c.msg_type)
+ eq_(0, c.xid)
+ eq_(ofproto_common.NX_EXPERIMENTER_ID, c.vendor)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.NICIRA_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.NX_FLOW_MOD_PACK_STR.replace('!', '') \
+ + ofproto.OFP_ACTION_OUTPUT_PACK_STR.replace('!', '')
+
+ res = struct.unpack(fmt, six.binary_type(c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_VENDOR, res[1])
+ eq_(len(c.buf), res[2])
+ eq_(0, res[3])
+ eq_(ofproto_common.NX_EXPERIMENTER_ID, res[4])
+ eq_(ofproto.NXT_FLOW_MOD, res[5])
+ eq_(self.cookie['val'], res[6])
+ eq_(self.command['val'], res[7])
+ eq_(self.idle_timeout['val'], res[8])
+ eq_(self.hard_timeout['val'], res[9])
+ eq_(self.priority['val'], res[10])
+ eq_(self.buffer_id['val'], res[11])
+ eq_(self.out_port['val'], res[12])
+ eq_(self.flags['val'], res[13])
+
+ # action
+ eq_(0, res[14])
+ eq_(ofproto.OFPAT_OUTPUT, res[15])
+ eq_(ofproto.OFP_ACTION_OUTPUT_SIZE, res[16])
+ eq_(self.port['val'], res[17])
+ eq_(0xffe5, res[18])
+
+
+class TestNXTRoleRequest(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXTRoleRequest
+ """
+
+ # NX_ROLE_PACK_STR
+ # '!I'...role
+ role = {'buf': b'\x62\x81\x27\x61', 'val': 1652631393}
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = NXTRoleRequest(Datapath, role['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.role['val'], self.c.role)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_VENDOR, self.c.msg_type)
+ eq_(0, self.c.xid)
+ eq_(ofproto_common.NX_EXPERIMENTER_ID, self.c.vendor)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.NICIRA_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.NX_ROLE_PACK_STR.replace('!', '')
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_VENDOR, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+ eq_(ofproto_common.NX_EXPERIMENTER_ID, res[4])
+ eq_(ofproto.NXT_ROLE_REQUEST, res[5])
+ eq_(self.role['val'], res[6])
+
+
+class TestNXTFlowModTableId(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.NXTFlowModTableId
+ """
+
+ # NX_FLOW_MOD_TABLE_ID_PACK_STR
+ # '!B7x'...set_, zfill
+ set_ = {'buf': b'\x71', 'val': 113}
+ zfill = b'\x00' * 7
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = NXTFlowModTableId(Datapath, set_['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.set_['val'], self.c.set)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_VENDOR, self.c.msg_type)
+ eq_(0, self.c.xid)
+ eq_(ofproto_common.NX_EXPERIMENTER_ID, self.c.vendor)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.NICIRA_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.NX_FLOW_MOD_TABLE_ID_PACK_STR.replace('!', '')
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_VENDOR, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+ eq_(ofproto_common.NX_EXPERIMENTER_ID, res[4])
+ eq_(ofproto.NXT_FLOW_MOD_TABLE_ID, res[5])
+ eq_(self.set_['val'], res[6])
+
+
+class TestOFPSwitchFeatures(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPSwitchFeatures
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPSwitchFeatures(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x06', 'val': ofproto.OFPT_FEATURES_REPLY}
+ msg_len_val = ofproto.OFP_SWITCH_FEATURES_SIZE \
+ + ofproto.OFP_PHY_PORT_SIZE
+ msg_len = {'buf': b'\x00\x4c', 'val': msg_len_val}
+ xid = {'buf': b'\xcc\x0a\x41\xd4', 'val': 3423224276}
+
+ # OFP_SWITCH_FEATURES_PACK_STR
+ # '!QIB3xII'...datapath_id, n_buffers, n_tables,
+ # zfill, capabilities, actions
+ datapath_id = {'buf': b'\x11\xa3\x72\x63\x61\xde\x39\x81',
+ 'val': 1270985291017894273}
+ n_buffers = {'buf': b'\x80\x14\xd7\xf6', 'val': 2148849654}
+ n_tables = {'buf': b'\xe4', 'val': 228}
+ zfill = b'\x00' * 3
+ capabilities = {'buf': b'\x69\x4f\xe4\xc2', 'val': 1766843586}
+ actions = {'buf': b'\x78\x06\xd9\x0c', 'val': 2013714700}
+
+ # OFP_PHY_PORT_PACK_STR
+ # '!H6s16sIIIIII'... port_no, hw_addr, name, config, state
+ # curr, advertised, supported, peer
+ port_no = {'buf': b'\xe7\x6b', 'val': 59243}
+ hw_addr = '3c:d1:2b:8d:3f:d6'
+ name = b'name'.ljust(16)
+ config = {'buf': b'\x84\xb6\x8c\x53', 'val': 2226555987}
+ state = {'buf': b'\x64\x07\xfb\xc9', 'val': 1678244809}
+ curr = {'buf': b'\xa9\xe8\x0a\x2b', 'val': 2850556459}
+ advertised = {'buf': b'\x78\xb9\x7b\x72', 'val': 2025421682}
+ supported = {'buf': b'\x7e\x65\x68\xad', 'val': 2120575149}
+ peer = {'buf': b'\xa4\x5b\x8b\xed', 'val': 2757463021}
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf'] \
+ + datapath_id['buf'] \
+ + n_buffers['buf'] \
+ + n_tables['buf'] \
+ + zfill \
+ + capabilities['buf'] \
+ + actions['buf'] \
+ + port_no['buf'] \
+ + addrconv.mac.text_to_bin(hw_addr) \
+ + name \
+ + config['buf'] \
+ + state['buf'] \
+ + curr['buf'] \
+ + advertised['buf'] \
+ + supported['buf'] \
+ + peer['buf']
+
+ res = OFPSwitchFeatures.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(datapath_id['val'], res.datapath_id)
+ eq_(n_buffers['val'], res.n_buffers)
+ eq_(n_tables['val'], res.n_tables)
+ eq_(capabilities['val'], res.capabilities)
+ eq_(actions['val'], res.actions)
+
+ # port
+ port = res.ports[port_no['val']]
+ eq_(port_no['val'], port.port_no)
+ eq_(hw_addr, hw_addr)
+ eq_(name, port.name)
+ eq_(config['val'], port.config)
+ eq_(state['val'], port.state)
+ eq_(curr['val'], port.curr)
+ eq_(advertised['val'], port.advertised)
+ eq_(supported['val'], port.supported)
+ eq_(peer['val'], port.peer)
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPPortStatus(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPPortStatus
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPPortStatus(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x0c', 'val': ofproto.OFPT_PORT_STATUS}
+ msg_len = {'buf': b'\x00\x40',
+ 'val': ofproto.OFP_PORT_STATUS_SIZE}
+ xid = {'buf': b'\x06\x27\x8b\x7b', 'val': 103254907}
+
+ # OFP_PORT_STATUS_PACK_STR
+ # '!B7xH6s16sIIIIII'...reason, zfill, port_no, hw_addr,
+ # name, config, state, curr,
+ # advertised, supported, peer
+ reason = {'buf': b'\x71', 'val': 113}
+ zfill = b'\x00' * 7
+ port_no = {'buf': b'\x48\xd8', 'val': 18648}
+ hw_addr = '41:f7:a3:52:8f:6b'
+ name = b'name'.ljust(16)
+ config = {'buf': b'\xae\x73\x90\xec', 'val': 2926809324}
+ state = {'buf': b'\x41\x37\x32\x1d', 'val': 1094136349}
+ curr = {'buf': b'\xa9\x47\x13\x2c', 'val': 2840007468}
+ advertised = {'buf': b'\xce\x6b\x4a\x87', 'val': 3463137927}
+ supported = {'buf': b'\xb8\x06\x65\xa1', 'val': 3087426977}
+ peer = {'buf': b'\x6a\x11\x52\x39', 'val': 1779520057}
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf'] \
+ + reason['buf'] \
+ + zfill \
+ + port_no['buf'] \
+ + addrconv.mac.text_to_bin(hw_addr) \
+ + name \
+ + config['buf'] \
+ + state['buf'] \
+ + curr['buf'] \
+ + advertised['buf'] \
+ + supported['buf'] \
+ + peer['buf']
+
+ res = OFPPortStatus.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(reason['val'], res.reason)
+
+ # desc
+ desc = res.desc
+ eq_(port_no['val'], desc.port_no)
+ eq_(hw_addr, desc.hw_addr)
+ eq_(name, desc.name)
+ eq_(config['val'], desc.config)
+ eq_(state['val'], desc.state)
+ eq_(curr['val'], desc.curr)
+ eq_(advertised['val'], desc.advertised)
+ eq_(supported['val'], desc.supported)
+ eq_(peer['val'], desc.peer)
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPPacketIn(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPPacketIn
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPPacketIn(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def _test_parser(self, padding=False):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x0a', 'val': ofproto.OFPT_PACKET_IN}
+ msg_len = {'buf': b'\x00\x14',
+ 'val': ofproto.OFP_PACKET_IN_SIZE}
+ xid = {'buf': b'\xd0\x23\x8c\x34', 'val': 3491990580}
+
+ # OFP_PACKET_IN_PACK_STR
+ # '!IHHBx2x'...buffer_id, total_len,
+ # in_port, reason, zfill, data
+ buffer_id = {'buf': b'\xae\x73\x90\xec', 'val': 2926809324}
+ total_len = {'buf': b'\x00\x10', 'val': 16}
+ in_port = {'buf': b'\x08\x42', 'val': 2114}
+ reason = {'buf': b'\x43', 'val': 67}
+ zfill = b'\x00' * 1
+ if padding:
+ data = b'PACKET IN'.ljust(20)
+ else:
+ data = b'PACKET IN'.ljust(16)
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf'] \
+ + buffer_id['buf'] \
+ + total_len['buf'] \
+ + in_port['buf'] \
+ + reason['buf'] \
+ + zfill \
+ + data
+
+ res = OFPPacketIn.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(buffer_id['val'], res.buffer_id)
+ eq_(total_len['val'], res.total_len)
+ eq_(in_port['val'], res.in_port)
+ eq_(reason['val'], res.reason)
+ eq_(data[0:16], res.data)
+
+ return True
+
+ def test_parser(self):
+ ok_(self._test_parser())
+
+ def test_parser_padding(self):
+ ok_(self._test_parser(True))
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPGetConfigReply(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPGetConfigReply
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPGetConfigReply(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x0a', 'val': ofproto.OFPT_GET_CONFIG_REPLY}
+ msg_len = {'buf': b'\x00\x14',
+ 'val': ofproto.OFP_SWITCH_CONFIG_SIZE}
+ xid = {'buf': b'\x94\xc4\xd2\xcd', 'val': 2495926989}
+
+ # OFP_SWITCH_CONFIG_PACK_STR
+ # '!HH'...flags, miss_send_len
+ flags = {'buf': b'\xa0\xe2', 'val': 41186}
+ miss_send_len = {'buf': b'\x36\x0e', 'val': 13838}
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf'] \
+ + flags['buf'] \
+ + miss_send_len['buf']
+
+ res = OFPGetConfigReply.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(flags['val'], res.flags)
+ eq_(miss_send_len['val'], res.miss_send_len)
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPBarrierReply(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPBarrierReply
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPBarrierReply(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x13', 'val': ofproto.OFPT_BARRIER_REPLY}
+ msg_len = {'buf': b'\x00\x08',
+ 'val': ofproto.OFP_HEADER_SIZE}
+ xid = {'buf': b'\x66\xc4\xc3\xac', 'val': 1724171180}
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf']
+
+ res = OFPBarrierReply.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPFlowRemoved(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPFlowRemoved
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPFlowRemoved(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x0a', 'val': ofproto.OFPT_FLOW_REMOVED}
+ msg_len = {'buf': b'\x00\x14',
+ 'val': ofproto.OFP_FLOW_REMOVED_SIZE}
+ xid = {'buf': b'\x94\xc4\xd2\xcd', 'val': 2495926989}
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf']
+
+ # OFP_MATCH_PACK_STR
+ # '!IH6s6sHBxHBB2xIIHH'...wildcards, in_port, dl_src, dl_dst, dl_vlan,
+ # dl_vlan_pcp, dl_type, nw_tos, nw_proto,
+ # nw_src, nw_dst, tp_src, tp_dst
+ wildcards = {'buf': b'\xd2\x71\x25\x23', 'val': 3530630435}
+ in_port = {'buf': b'\x37\x8b', 'val': 14219}
+ dl_src = b'\x7f\x85\xc4\x70\x12\xda'
+ dl_dst = b'\x0a\x51\x17\x58\xb0\xbb'
+ dl_vlan = {'buf': b'\xc1\xf9', 'val': 49657}
+ dl_vlan_pcp = {'buf': b'\x79', 'val': 121}
+ zfill0 = b'\x00'
+ dl_type = {'buf': b'\xa6\x9e', 'val': 42654}
+ nw_tos = {'buf': b'\xde', 'val': 222}
+ nw_proto = {'buf': b'\xe5', 'val': 229}
+ zfil11 = b'\x00' * 2
+ nw_src = {'buf': b'\x1b\x6d\x8d\x4b', 'val': 460164427}
+ nw_dst = {'buf': b'\xab\x25\xe1\x20', 'val': 2871386400}
+ tp_src = {'buf': b'\xd5\xc3', 'val': 54723}
+ tp_dst = {'buf': b'\x78\xb9', 'val': 30905}
+
+ buf += wildcards['buf'] \
+ + in_port['buf'] \
+ + dl_src \
+ + dl_dst \
+ + dl_vlan['buf'] \
+ + dl_vlan_pcp['buf'] \
+ + zfill0 \
+ + dl_type['buf'] \
+ + nw_tos['buf'] \
+ + nw_proto['buf'] \
+ + zfil11 \
+ + nw_src['buf'] \
+ + nw_dst['buf'] \
+ + tp_src['buf'] \
+ + tp_dst['buf']
+
+ # OFP_FLOW_REMOVED_PACK_STR0
+ # '!QHBxIIH2xQQ'...cookie, priority, reason, zfill,
+ # duration_sec, duration_nsec, idle_timeout,
+ # zfill, packet_count, byte_count
+ cookie = {'buf': b'\x02\x79\xba\x00\xef\xab\xee\x44',
+ 'val': 178378173441633860}
+ priority = {'buf': b'\x02\xce', 'val': 718}
+ reason = {'buf': b'\xa9', 'val': 169}
+ zfill0 = b'\x00' * 1
+ duration_sec = {'buf': b'\x86\x24\xa3\xba', 'val': 2250548154}
+ duration_nsec = {'buf': b'\x94\x94\xc2\x23', 'val': 2492776995}
+ idle_timeout = {'buf': b'\xeb\x7c', 'val': 60284}
+ zfill1 = b'\x00' * 2
+ packet_count = {'buf': b'\x5a\x0d\xf2\x03\x8e\x0a\xbb\x8d',
+ 'val': 6489108735192644493}
+ byte_count = {'buf': b'\x65\xc8\xd3\x72\x51\xb5\xbb\x7c',
+ 'val': 7334344481123449724}
+
+ buf += cookie['buf'] \
+ + priority['buf'] \
+ + reason['buf'] \
+ + zfill0 \
+ + duration_sec['buf'] \
+ + duration_nsec['buf'] \
+ + idle_timeout['buf'] \
+ + zfill1 \
+ + packet_count['buf'] \
+ + byte_count['buf']
+
+ res = OFPFlowRemoved.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(cookie['val'], res.cookie)
+ eq_(priority['val'], res.priority)
+ eq_(reason['val'], res.reason)
+ eq_(duration_sec['val'], res.duration_sec)
+ eq_(duration_nsec['val'], res.duration_nsec)
+ eq_(idle_timeout['val'], res.idle_timeout)
+ eq_(packet_count['val'], res.packet_count)
+ eq_(byte_count['val'], res.byte_count)
+
+ # match
+ match = res.match
+ eq_(wildcards['val'], match.wildcards)
+ eq_(in_port['val'], match.in_port)
+ eq_(dl_src, match.dl_src)
+ eq_(dl_dst, match.dl_dst)
+ eq_(dl_vlan['val'], match.dl_vlan)
+ eq_(dl_vlan_pcp['val'], match.dl_vlan_pcp)
+ eq_(dl_type['val'], match.dl_type)
+ eq_(nw_tos['val'], match.nw_tos)
+ eq_(nw_proto['val'], match.nw_proto)
+ eq_(nw_src['val'], match.nw_src)
+ eq_(nw_dst['val'], match.nw_dst)
+ eq_(tp_src['val'], match.tp_src)
+ eq_(tp_dst['val'], match.tp_dst)
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPQueueGetConfigReply(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPQueueGetConfigReply
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPQueueGetConfigReply(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x0a',
+ 'val': ofproto.OFPT_QUEUE_GET_CONFIG_REPLY}
+ msg_len_val = ofproto.OFP_QUEUE_GET_CONFIG_REPLY_SIZE \
+ + ofproto.OFP_PACKET_QUEUE_SIZE
+ msg_len = {'buf': b'\x00\x14', 'val': msg_len_val}
+ xid = {'buf': b'\x94\xc4\xd2\xcd', 'val': 2495926989}
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf']
+
+ # OFP_QUEUE_GET_CONFIG_REPLY_PACK_STR
+ # '!H6x'...port, zfill
+ port = {'buf': b'\xfe\x66', 'val': 65126}
+ zfill = b'\x00' * 6
+
+ buf += port['buf'] \
+ + zfill
+
+ # OFP_PACKET_QUEUE_PQCK_STR
+ # '!IH2x'...queue_id, len_, zfill
+ queue_id = {'buf': b'\x4d\x4b\x3a\xd1', 'val': 1296775889}
+ len_ = {'buf': b'\x00\x08',
+ 'val': ofproto.OFP_QUEUE_PROP_HEADER_SIZE}
+ zfill = b'\x00' * 2
+
+ buf += queue_id['buf'] \
+ + len_['buf'] \
+ + zfill
+
+ res = OFPQueueGetConfigReply.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(port['val'], res.port)
+
+ # queue
+ queue = res.queues[0]
+ eq_(queue_id['val'], queue.queue_id)
+ eq_(len_['val'], queue.len)
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPDescStatsReply(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPDescStatsReply
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPDescStatsReply(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x11', 'val': ofproto.OFPT_STATS_REPLY}
+ msg_len_val = ofproto.OFP_STATS_MSG_SIZE \
+ + ofproto.OFP_DESC_STATS_SIZE
+ msg_len = {'buf': b'\x04\x38', 'val': msg_len_val}
+ xid = {'buf': b'\x94\xc4\xd2\xcd', 'val': 2495926989}
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf']
+
+ # OFP_STATS_MSG_PACK_STR
+ # '!HH'...type_, flags
+ type_ = {'buf': b'\x00\x00', 'val': ofproto.OFPST_DESC}
+ flags = {'buf': b'\x30\xd9', 'val': 12505}
+
+ buf += type_['buf'] \
+ + flags['buf']
+
+ # stats_type_cls = OFPDescStats
+ # OFP_DESC_STATS_PACK_STR
+ # '!256s256s256s32s256s'...mfr_desc, hw_desc, sw_desc,
+ # serial_num, dp_desc
+ mfr_desc = b'mfr_desc'.ljust(256)
+ hw_desc = b'hw_desc'.ljust(256)
+ sw_desc = b'sw_desc'.ljust(256)
+ serial_num = b'serial_num'.ljust(32)
+ dp_desc = b'dp_desc'.ljust(256)
+
+ buf += mfr_desc \
+ + hw_desc \
+ + sw_desc \
+ + serial_num \
+ + dp_desc
+
+ res = OFPDescStatsReply.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(type_['val'], res.type)
+ eq_(flags['val'], res.flags)
+
+ # body
+ body = res.body
+ eq_(mfr_desc, body.mfr_desc)
+ eq_(hw_desc, body.hw_desc)
+ eq_(sw_desc, body.sw_desc)
+ eq_(serial_num, body.serial_num)
+ eq_(dp_desc, body.dp_desc)
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPFlowStatsReply(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPFlowStatsReply
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPFlowStatsReply(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x11', 'val': ofproto.OFPT_STATS_REPLY}
+ msg_len_val = ofproto.OFP_STATS_MSG_SIZE \
+ + ofproto.OFP_FLOW_STATS_SIZE
+ msg_len = {'buf': b'\x00\x64', 'val': msg_len_val}
+ xid = {'buf': b'\x94\xc4\xd2\xcd', 'val': 2495926989}
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf']
+
+ # OFP_STATS_MSG_PACK_STR
+ # '!HH'...type_, flags
+ type_ = {'buf': b'\x00\x01', 'val': ofproto.OFPST_FLOW}
+ flags = {'buf': b'\x95\xf4', 'val': 38388}
+
+ buf += type_['buf'] \
+ + flags['buf']
+
+ # stats_type_cls = OFPFlowStats
+ # OFP_FLOW_STATS_0_PACK_STR
+ # '!HBx'...length, table_id, zfill
+ length = {'buf': b'\x00\x60', 'val': 96}
+ table_id = {'buf': b'\x51', 'val': 81}
+ zfill = b'\x00'
+
+ buf += length['buf'] \
+ + table_id['buf'] \
+ + zfill
+
+ # OFP_MATCH_PACK_STR
+ # '!IH6s6sHBxHBB2xIIHH'...
+ match = b'\x97\x7c\xa6\x1e' \
+ + b'\x5e\xa0' \
+ + b'\x70\x17\xdc\x80\x59\x9e' \
+ + b'\x79\xc6\x56\x87\x92\x28' \
+ + b'\xb1\x81' \
+ + b'\xbe' \
+ + b'\x00' \
+ + b'\x01\xab' \
+ + b'\x42' \
+ + b'\xfe' \
+ + b'\x00\x00' \
+ + b'\xa4\x5d\x5c\x42' \
+ + b'\xa2\x5c\x2e\x05' \
+ + b'\x5a\x94' \
+ + b'\x64\xd4'
+
+ buf += match
+
+ # OFP_FLOW_STATS_1_PACK_STR
+ # '!IIHHH6xQQQ'...duration_sec, duration_nsec, priority,
+ # idle_timeout, hard_timeout, zfill,
+ # cookie, packet_count, byte_count
+ duration_sec = {'buf': b'\x94\x19\xb3\xd2', 'val': 2484712402}
+ duration_nsec = {'buf': b'\xee\x66\xcf\x7c', 'val': 3999715196}
+ priority = {'buf': b'\xe1\xc0', 'val': 57792}
+ idle_timeout = {'buf': b'\x8e\x10', 'val': 36368}
+ hard_timeout = {'buf': b'\xd4\x99', 'val': 54425}
+ zfill = b'\x00' * 6
+ cookie = {'buf': b'\x0b\x01\xe8\xe5\xf0\x84\x8a\xe0',
+ 'val': 793171083674290912}
+ packet_count = {'buf': b'\x47\x5c\xc6\x05\x28\xff\x7c\xdb',
+ 'val': 5142202600015232219}
+ byte_count = {'buf': b'\x24\xe9\x4b\xee\xcb\x57\xd9\xc3',
+ 'val': 2659740543924820419}
+
+ buf += duration_sec['buf']
+ buf += duration_nsec['buf']
+ buf += priority['buf']
+ buf += idle_timeout['buf']
+ buf += hard_timeout['buf']
+ buf += zfill
+ buf += cookie['buf']
+ buf += packet_count['buf']
+ buf += byte_count['buf']
+
+ # <action>_PACK_STR...type_, len_ [others...]
+ type = {'buf': b'\x00\x00', 'val': ofproto.OFPAT_OUTPUT}
+ len = {'buf': b'\x00\x08',
+ 'val': ofproto.OFP_ACTION_OUTPUT_SIZE}
+ port = {'buf': b'\x59\x2a', 'val': 22826}
+ max_len = {'buf': b'\x00\x08',
+ 'val': ofproto.OFP_ACTION_OUTPUT_SIZE}
+
+ buf += type['buf'] \
+ + len['buf'] \
+ + port['buf'] \
+ + max_len['buf']
+
+ res = OFPFlowStatsReply.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(type_['val'], res.type)
+ eq_(flags['val'], res.flags)
+
+ # body
+ body = res.body[0]
+ eq_(length['val'], body.length)
+ eq_(table_id['val'], body.table_id)
+ eq_(duration_sec['val'], body.duration_sec)
+ eq_(duration_nsec['val'], body.duration_nsec)
+ eq_(priority['val'], body.priority)
+ eq_(idle_timeout['val'], body.idle_timeout)
+ eq_(hard_timeout['val'], body.hard_timeout)
+ eq_(cookie['val'], body.cookie)
+ eq_(packet_count['val'], body.packet_count)
+ eq_(byte_count['val'], body.byte_count)
+
+ # action
+ action = body.actions[0]
+ eq_(type['val'], action.type)
+ eq_(len['val'], action.len)
+ eq_(port['val'], action.port)
+ eq_(max_len['val'], action.max_len)
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPAggregateStatsReply(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPAggregateStatsReply
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPAggregateStatsReply(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x11', 'val': ofproto.OFPT_STATS_REPLY}
+ msg_len_val = ofproto.OFP_STATS_MSG_SIZE \
+ + ofproto.OFP_AGGREGATE_STATS_REPLY_SIZE
+ msg_len = {'buf': b'\x00\x4c', 'val': msg_len_val}
+ xid = {'buf': b'\xc6\xd6\xce\x38', 'val': 3335966264}
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf']
+
+ # OFP_STATS_MSG_PACK_STR
+ # '!HH'...type_, flags
+ type_ = {'buf': b'\x00\x02', 'val': ofproto.OFPST_AGGREGATE}
+ flags = {'buf': b'\x65\x66', 'val': 25958}
+
+ buf += type_['buf'] \
+ + flags['buf']
+
+ # stats_type_cls = OFPAggregateStats
+ # OFP_AGGREGATE_STATS_REPLY_PACK_STR
+ # '!QQI4x'...packet_count, byte_count, flow_count, zfill
+ packet_count = {'buf': b'\x43\x95\x1b\xfb\x0f\xf6\xa7\xdd',
+ 'val': 4869829337189623773}
+ byte_count = {'buf': b'\x36\xda\x2d\x80\x2a\x95\x35\xdd',
+ 'val': 3952521651464517085}
+ flow_count = {'buf': b'\xc3\x0d\xc3\xed', 'val': 3272459245}
+ zfill = b'\x00' * 4
+
+ buf += packet_count['buf'] \
+ + byte_count['buf'] \
+ + flow_count['buf'] \
+ + zfill
+
+ res = OFPAggregateStatsReply.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(type_['val'], res.type)
+ eq_(flags['val'], res.flags)
+
+ # body
+ body = res.body[0]
+ eq_(packet_count['val'], body.packet_count)
+ eq_(byte_count['val'], body.byte_count)
+ eq_(flow_count['val'], body.flow_count)
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPTableStatsReply(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPTableStatsReply
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPTableStatsReply(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x11', 'val': ofproto.OFPT_STATS_REPLY}
+ msg_len_val = ofproto.OFP_STATS_MSG_SIZE \
+ + ofproto.OFP_TABLE_STATS_SIZE
+ msg_len = {'buf': b'\x00\x4c', 'val': msg_len_val}
+ xid = {'buf': b'\xd6\xb4\x8d\xe6', 'val': 3602157030}
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf']
+
+ # OFP_STATS_MSG_PACK_STR
+ # '!HH'...type_, flags
+ type_ = {'buf': b'\x00\x03', 'val': ofproto.OFPST_TABLE}
+ flags = {'buf': b'\xb3\xf0', 'val': 46064}
+
+ buf += type_['buf'] \
+ + flags['buf']
+
+ # stats_type_cls = OFPTableStats
+ # OFP_TABLE_STATS_PACK_STR
+ # '!B3x32sIIIQQ'...table_id, zfill, name, wildcards, max_entries,
+ # active_count, lookup_count, matched_count
+ table_id = {'buf': b'\x5b', 'val': 91}
+ zfill = b'\x00' * 3
+ name = b'name'.ljust(32)
+ wildcards = {'buf': b'\xc5\xaf\x6e\x12', 'val': 3316608530}
+ max_entries = {'buf': b'\x95\x6c\x78\x4d', 'val': 2506913869}
+ active_count = {'buf': b'\x78\xac\xa8\x1e', 'val': 2024581150}
+ lookup_count = {'buf': b'\x40\x1d\x9c\x39\x19\xec\xd4\x1c',
+ 'val': 4620020561814017052}
+ matched_count = {'buf': b'\x27\x35\x02\xb6\xc5\x5e\x17\x65',
+ 'val': 2825167325263435621}
+
+ buf += table_id['buf'] \
+ + zfill \
+ + name \
+ + wildcards['buf'] \
+ + max_entries['buf'] \
+ + active_count['buf'] \
+ + lookup_count['buf'] \
+ + matched_count['buf']
+
+ res = OFPTableStatsReply.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(type_['val'], res.type)
+ eq_(flags['val'], res.flags)
+
+ # body
+ body = res.body[0]
+ eq_(table_id['val'], body.table_id)
+ eq_(name, body.name)
+ eq_(wildcards['val'], body.wildcards)
+ eq_(max_entries['val'], body.max_entries)
+ eq_(active_count['val'], body.active_count)
+ eq_(lookup_count['val'], body.lookup_count)
+ eq_(matched_count['val'], body.matched_count)
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPPortStatsReply(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPPortStatsReply
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPPortStatsReply(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x11', 'val': ofproto.OFPT_STATS_REPLY}
+ msg_len_val = ofproto.OFP_STATS_MSG_SIZE \
+ + ofproto.OFP_PORT_STATS_SIZE
+ msg_len = {'buf': b'\x00\x74', 'val': msg_len_val}
+ xid = {'buf': b'\xc2\xaf\x3d\xff', 'val': 3266264575}
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf']
+
+ # OFP_STATS_MSG_PACK_STR
+ # '!HH'...type_, flags
+ type_ = {'buf': b'\x00\x04', 'val': ofproto.OFPST_PORT}
+ flags = {'buf': b'\xda\xde', 'val': 56030}
+
+ buf += type_['buf'] \
+ + flags['buf']
+
+ # stats_type_cls = OFPPortStats
+ # OFP_PORT_STATS_PACK_STR
+ # '!H6xQQQQQQQQQQQQ'... port_no, zfill, rx_packets, tx_packets,
+ # rx_bytes, tx_bytes, rx_dropped, tx_dropped,
+ # rx_errors, tx_errors, rx_frame_err,
+ # rx_over_err, rx_crc_err, collisions
+ port_no = {'buf': b'\xe7\x6b', 'val': 59243}
+ zfill = b'\x00' * 6
+ rx_packets = {'buf': b'\x53\x44\x36\x61\xc4\x86\xc0\x37',
+ 'val': 5999980397101236279}
+ tx_packets = {'buf': b'\x27\xa4\x41\xd7\xd4\x53\x9e\x42',
+ 'val': 2856480458895760962}
+ rx_bytes = {'buf': b'\x55\xa1\x38\x60\x43\x97\x0d\x89',
+ 'val': 6170274950576278921}
+ tx_bytes = {'buf': b'\x77\xe1\xd5\x63\x18\xae\x63\xaa',
+ 'val': 8638420181865882538}
+ rx_dropped = {'buf': b'\x60\xe6\x20\x01\x24\xda\x4e\x5a',
+ 'val': 6982303461569875546}
+ tx_dropped = {'buf': b'\x09\x2d\x5d\x71\x71\xb6\x8e\xc7',
+ 'val': 661287462113808071}
+ rx_errors = {'buf': b'\x2f\x7e\x35\xb3\x66\x3c\x19\x0d',
+ 'val': 3422231811478788365}
+ tx_errors = {'buf': b'\x57\x32\x08\x2f\x88\x32\x40\x6b',
+ 'val': 6283093430376743019}
+ rx_frame_err = {'buf': b'\x0c\x28\x6f\xad\xce\x66\x6e\x8b',
+ 'val': 876072919806406283}
+ rx_over_err = {'buf': b'\x5a\x90\x8f\x9b\xfc\x82\x2e\xa0',
+ 'val': 6525873760178941600}
+ rx_crc_err = {'buf': b'\x73\x3a\x71\x17\xd6\x74\x69\x47',
+ 'val': 8303073210207070535}
+ collisions = {'buf': b'\x2f\x52\x0c\x79\x96\x03\x6e\x79',
+ 'val': 3409801584220270201}
+
+ buf += port_no['buf'] \
+ + zfill \
+ + rx_packets['buf'] \
+ + tx_packets['buf'] \
+ + rx_bytes['buf'] \
+ + tx_bytes['buf'] \
+ + rx_dropped['buf'] \
+ + tx_dropped['buf'] \
+ + rx_errors['buf'] \
+ + tx_errors['buf'] \
+ + rx_frame_err['buf'] \
+ + rx_over_err['buf'] \
+ + rx_crc_err['buf'] \
+ + collisions['buf']
+
+ res = OFPPortStatsReply.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(type_['val'], res.type)
+ eq_(flags['val'], res.flags)
+
+ # body
+ body = res.body[0]
+ eq_(port_no['val'], body.port_no)
+ eq_(rx_packets['val'], body.rx_packets)
+ eq_(tx_packets['val'], body.tx_packets)
+ eq_(rx_bytes['val'], body.rx_bytes)
+ eq_(tx_bytes['val'], body.tx_bytes)
+ eq_(rx_dropped['val'], body.rx_dropped)
+ eq_(tx_dropped['val'], body.tx_dropped)
+ eq_(rx_errors['val'], body.rx_errors)
+ eq_(tx_errors['val'], body.tx_errors)
+ eq_(rx_frame_err['val'], body.rx_frame_err)
+ eq_(rx_over_err['val'], body.rx_over_err)
+ eq_(rx_crc_err['val'], body.rx_crc_err)
+ eq_(collisions['val'], body.collisions)
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPQueueStatsReply(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPQueueStatsReply
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPQueueStatsReply(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x11', 'val': ofproto.OFPT_STATS_REPLY}
+ msg_len_val = ofproto.OFP_STATS_MSG_SIZE \
+ + ofproto.OFP_QUEUE_STATS_SIZE
+ msg_len = {'buf': b'\x00\x2c', 'val': msg_len_val}
+ xid = {'buf': b'\x19\xfc\x28\x6c', 'val': 435955820}
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf']
+
+ # OFP_STATS_MSG_PACK_STR
+ # '!HH'...type_, flags
+ type_ = {'buf': b'\x00\x05', 'val': ofproto.OFPST_QUEUE}
+ flags = {'buf': b'\x3b\x2b', 'val': 15147}
+
+ buf += type_['buf'] \
+ + flags['buf']
+
+ # stats_type_cls = OFPQueueStats
+ # OFP_QUEUE_STATS_PACK_STR
+ # '!H2xIQQQ...port_no, queue_id, tx_bytes, tx_packets, tx_errors
+ port_no = {'buf': b'\xe7\x6b', 'val': 59243}
+ zfill = b'\x00' * 2
+ queue_id = {'buf': b'\x2a\xa8\x7f\x32', 'val': 715685682}
+ tx_bytes = {'buf': b'\x77\xe1\xd5\x63\x18\xae\x63\xaa',
+ 'val': 8638420181865882538}
+ tx_packets = {'buf': b'\x27\xa4\x41\xd7\xd4\x53\x9e\x42',
+ 'val': 2856480458895760962}
+ tx_errors = {'buf': b'\x57\x32\x08\x2f\x88\x32\x40\x6b',
+ 'val': 6283093430376743019}
+
+ buf += port_no['buf'] \
+ + zfill \
+ + queue_id['buf'] \
+ + tx_bytes['buf'] \
+ + tx_packets['buf'] \
+ + tx_errors['buf']
+
+ res = OFPQueueStatsReply.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(type_['val'], res.type)
+ eq_(flags['val'], res.flags)
+
+ # body
+ body = res.body[0]
+ eq_(port_no['val'], body.port_no)
+ eq_(queue_id['val'], body.queue_id)
+ eq_(tx_bytes['val'], body.tx_bytes)
+ eq_(tx_packets['val'], body.tx_packets)
+ eq_(tx_errors['val'], body.tx_errors)
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPVendorStatsReply(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPVendorStatsReply
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPVendorStatsReply(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ version = {'buf': b'\x01', 'val': ofproto.OFP_VERSION}
+ msg_type = {'buf': b'\x11', 'val': ofproto.OFPT_STATS_REPLY}
+ # ofproto.OFP_STATS_MSG_SIZE + len(specific_data)
+ msg_len = {'buf': b'\x00\x18',
+ 'val': ofproto.OFP_STATS_MSG_SIZE + 12}
+ xid = {'buf': b'\x94\xc4\xd2\xcd', 'val': 2495926989}
+
+ buf = version['buf'] \
+ + msg_type['buf'] \
+ + msg_len['buf'] \
+ + xid['buf']
+
+ # OFP_STATS_MSG_PACK_STR
+ # '!HH'...type_, flags
+ type_ = {'buf': b'\xff\xff', 'val': ofproto.OFPST_VENDOR}
+ flags = {'buf': b'\x30\xd9', 'val': 12505}
+
+ buf += type_['buf'] \
+ + flags['buf']
+
+ # stats_type_cls = OFPVendorStats
+ specific_data = b'specific_data'
+
+ buf += specific_data
+
+ res = OFPVendorStatsReply.parser(object,
+ version['val'],
+ msg_type['val'],
+ msg_len['val'],
+ xid['val'],
+ buf)
+
+ eq_(version['val'], res.version)
+ eq_(msg_type['val'], res.msg_type)
+ eq_(msg_len['val'], res.msg_len)
+ eq_(xid['val'], res.xid)
+ eq_(type_['val'], res.type)
+ eq_(flags['val'], res.flags)
+
+ # body
+ body = res.body[0]
+ eq_(specific_data, body)
+
+ def test_serialize(self):
+ # Not used.
+ pass
+
+
+class TestOFPFeaturesRequest(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPFeaturesRequest
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPFeaturesRequest(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_FEATURES_REQUEST, self.c.msg_type)
+ eq_(0, self.c.xid)
+
+ fmt = ofproto.OFP_HEADER_PACK_STR
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_FEATURES_REQUEST, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+
+
+class TestOFPGetConfigRequest(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPGetConfigRequest
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPGetConfigRequest(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_GET_CONFIG_REQUEST, self.c.msg_type)
+ eq_(0, self.c.xid)
+
+ fmt = ofproto.OFP_HEADER_PACK_STR
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_GET_CONFIG_REQUEST, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+
+
+class TestOFPSetConfig(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPSetConfig
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ # OFP_SWITCH_CONFIG_PACK_STR
+ # '!HH'...flags, miss_send_len
+ flags = {'buf': b'\xa0\xe2', 'val': 41186}
+ miss_send_len = {'buf': b'\x36\x0e', 'val': 13838}
+
+ c = OFPSetConfig(Datapath,
+ flags['val'],
+ miss_send_len['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.flags['val'], self.c.flags)
+ eq_(self.miss_send_len['val'], self.c.miss_send_len)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_SET_CONFIG, self.c.msg_type)
+ eq_(0, self.c.xid)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.OFP_SWITCH_CONFIG_PACK_STR.replace('!', '')
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_SET_CONFIG, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+ eq_(self.flags['val'], res[4])
+ eq_(self.miss_send_len['val'], res[5])
+
+
+class TestOFPPacketOut(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPPacketOut
+ """
+
+ port = 0x2ae0
+ actions = [OFPActionOutput(port, max_len=0)]
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def _get_obj(self, buffer_id, in_port, data=None):
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPPacketOut(Datapath,
+ buffer_id,
+ in_port,
+ self.actions,
+ data)
+ return c
+
+ def test_init(self):
+ buffer_id = 0xffffffff
+ in_port = 0x40455
+ data = 'Message'
+
+ c = self._get_obj(buffer_id, in_port, data)
+
+ eq_(buffer_id, c.buffer_id)
+ eq_(in_port, c.in_port)
+ eq_(data, c.data)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ buffer_id = 0xffffffff
+ in_port = 0x9e07
+ data = b'Message'
+
+ c = self._get_obj(buffer_id, in_port, data)
+ c.serialize()
+
+ eq_(ofproto.OFP_VERSION, c.version)
+ eq_(ofproto.OFPT_PACKET_OUT, c.msg_type)
+ eq_(0, c.xid)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.OFP_PACKET_OUT_PACK_STR.replace('!', '') \
+ + ofproto.OFP_ACTION_OUTPUT_PACK_STR.replace('!', '') \
+ + str(len(data)) + 's'
+
+ res = struct.unpack(fmt, six.binary_type(c.buf))
+
+ # OFP_HEADER_PACK_STR
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_PACKET_OUT, res[1])
+ eq_(len(c.buf), res[2])
+ eq_(0, res[3])
+
+ # OFP_PACKET_OUT_PACK_STR
+ eq_(buffer_id, res[4])
+ eq_(in_port, res[5])
+ eq_(ofproto.OFP_ACTION_OUTPUT_SIZE, res[6])
+
+ # OFP_ACTION_OUTPUT_PACK_STR
+ eq_(ofproto.OFPAT_OUTPUT, res[7])
+ eq_(ofproto.OFP_ACTION_OUTPUT_SIZE, res[8])
+ eq_(self.port, res[9])
+ eq_(0, res[10])
+
+ # data
+ eq_(data, res[11])
+
+ @raises(AssertionError)
+ def test_serialize_check_buffer_id(self):
+ buffer_id = 0xffffff00
+ in_port = 0xaa92
+ data = 'Message'
+
+ c = self._get_obj(buffer_id, in_port, data)
+ c.serialize()
+
+
+class TestOFPFlowMod(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPFlowMod
+ """
+
+ # OFP_FLOW_MOD_PACK_STR0
+ # '!QHHHHIHH'...cookie, command, idle_timeout, hard_timeout,
+ # priority, buffer_id, out_port, flags
+ cookie = {'buf': b'\x1d\x86\xce\x6e\x8d\xc0\xbe\xa8',
+ 'val': 2127614848199081640}
+ command = {'buf': b'\xe1\x55', 'val': 57685}
+ idle_timeout = {'buf': b'\xf3\x6d', 'val': 62317}
+ hard_timeout = {'buf': b'\x1c\xc5', 'val': 7365}
+ priority = {'buf': b'\x9c\xe3', 'val': 40163}
+ buffer_id = {'buf': b'\xf0\xa1\x80\x33', 'val': 4037115955}
+ out_port = {'buf': b'\xfe\x0d', 'val': 65037}
+ flags = {'buf': b'\x00\x87', 'val': 135}
+
+ # OFP_MATCH_PACK_STR
+ # '!IH6s6sHBxHBB2xIIHH'...wildcards, in_port, dl_src, dl_dst, dl_vlan,
+ # dl_vlan_pcp, dl_type, nw_tos, nw_proto,
+ # nw_src, nw_dst, tp_src, tp_dst
+ wildcards = {'buf': b'\xd2\x71\x25\x23', 'val': 3530630435}
+ in_port = {'buf': b'\x37\x8b', 'val': 14219}
+ dl_src = b'\xdf\xcf\xe1\x5d\xcf\xc0'
+ dl_dst = b'\x76\xb3\xfb\xc6\x21\x2f'
+ dl_vlan = {'buf': b'\xc1\xf9', 'val': 49657}
+ dl_vlan_pcp = {'buf': b'\x79', 'val': 121}
+ zfill0 = b'\x00'
+ dl_type = {'buf': b'\xa6\x9e', 'val': 42654}
+ nw_tos = {'buf': b'\xde', 'val': 222}
+ nw_proto = {'buf': b'\xe5', 'val': 229}
+ zfil11 = b'\x00' * 2
+ nw_src = {'buf': b'\x1b\x6d\x8d\x4b', 'val': 460164427}
+ nw_dst = {'buf': b'\xab\x25\xe1\x20', 'val': 2871386400}
+ tp_src = {'buf': b'\xd5\xc3', 'val': 54723}
+ tp_dst = {'buf': b'\x78\xb9', 'val': 30905}
+
+ match = OFPMatch(wildcards['val'],
+ in_port['val'],
+ dl_src,
+ dl_dst,
+ dl_vlan['val'],
+ dl_vlan_pcp['val'],
+ dl_type['val'],
+ nw_tos['val'],
+ nw_proto['val'],
+ nw_src['val'],
+ nw_dst['val'],
+ tp_src['val'],
+ tp_dst['val'])
+
+ port = 0x2ae0
+ actions = [OFPActionOutput(port, max_len=1000)]
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def _get_obj(self, actions=None):
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPFlowMod(Datapath,
+ self.match,
+ self.cookie['val'],
+ self.command['val'],
+ self.idle_timeout['val'],
+ self.hard_timeout['val'],
+ self.priority['val'],
+ self.buffer_id['val'],
+ self.out_port['val'],
+ self.flags['val'],
+ actions)
+
+ return c
+
+ def test_init(self):
+ c = self._get_obj()
+
+ eq_(self.cookie['val'], c.cookie)
+ eq_(self.command['val'], c.command)
+ eq_(self.idle_timeout['val'], c.idle_timeout)
+ eq_(self.hard_timeout['val'], c.hard_timeout)
+ eq_(self.priority['val'], c.priority)
+ eq_(self.buffer_id['val'], c.buffer_id)
+ eq_(self.out_port['val'], c.out_port)
+ eq_(self.flags['val'], c.flags)
+
+ def test_init_actions(self):
+ c = self._get_obj(self.actions)
+ action = c.actions[0]
+
+ eq_(self.port, action.port)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ c = self._get_obj(self.actions)
+ c.serialize()
+
+ eq_(ofproto.OFP_VERSION, c.version)
+ eq_(ofproto.OFPT_FLOW_MOD, c.msg_type)
+ eq_(0, c.xid)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.OFP_MATCH_PACK_STR.replace('!', '') \
+ + ofproto.OFP_FLOW_MOD_PACK_STR0.replace('!', '') \
+ + ofproto.OFP_ACTION_OUTPUT_PACK_STR.replace('!', '')
+
+ res = struct.unpack(fmt, six.binary_type(c.buf))
+
+ # OFP_HEADER_PACK_STR
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_FLOW_MOD, res[1])
+ eq_(len(c.buf), res[2])
+ eq_(0, res[3])
+
+ # OFP_MATCH_PACK_STR
+ eq_(self.wildcards['val'], res[4])
+ eq_(self.in_port['val'], res[5])
+ eq_(self.dl_src, res[6])
+ eq_(self.dl_dst, res[7])
+ eq_(self.dl_vlan['val'], res[8])
+ eq_(self.dl_vlan_pcp['val'], res[9])
+ eq_(self.dl_type['val'], res[10])
+ eq_(self.nw_tos['val'], res[11])
+ eq_(self.nw_proto['val'], res[12])
+ eq_(self.nw_src['val'], res[13])
+ eq_(self.nw_dst['val'], res[14])
+ eq_(self.tp_src['val'], res[15])
+ eq_(self.tp_dst['val'], res[16])
+
+ # OFP_FLOW_MOD_PACK_STR0
+ eq_(self.cookie['val'], res[17])
+ eq_(self.command['val'], res[18])
+ eq_(self.idle_timeout['val'], res[19])
+ eq_(self.hard_timeout['val'], res[20])
+ eq_(self.priority['val'], res[21])
+ eq_(self.buffer_id['val'], res[22])
+ eq_(self.out_port['val'], res[23])
+ eq_(self.flags['val'], res[24])
+
+ # OFP_ACTION_OUTPUT_PACK_STR
+ eq_(ofproto.OFPAT_OUTPUT, res[25])
+ eq_(ofproto.OFP_ACTION_OUTPUT_SIZE, res[26])
+ eq_(self.port, res[27])
+ eq_(1000, res[28])
+
+
+class TestOFPBarrierRequest(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPBarrierRequest
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ c = OFPBarrierRequest(Datapath)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ pass
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_BARRIER_REQUEST, self.c.msg_type)
+ eq_(0, self.c.xid)
+
+ fmt = ofproto.OFP_HEADER_PACK_STR
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_BARRIER_REQUEST, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+
+
+class TestOFPQueueGetConfigRequest(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPQueueGetConfigRequest
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ # OFP_QUEUE_GET_CONFIG_REQUEST_PACK_STR
+ # '!H2x'...port, zfill
+ port = {'buf': b'\xa0\xe2', 'val': 41186}
+ zfill = b'\x00' * 2
+
+ c = OFPQueueGetConfigRequest(Datapath,
+ port['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(self.port['val'], self.c.port)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_QUEUE_GET_CONFIG_REQUEST, self.c.msg_type)
+ eq_(0, self.c.xid)
+
+ a = ofproto.OFP_HEADER_PACK_STR.replace('!', '')
+ b = ofproto.OFP_QUEUE_GET_CONFIG_REQUEST_PACK_STR.replace('!', '')
+ fmt = '!' + a + b
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_QUEUE_GET_CONFIG_REQUEST, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+ eq_(self.port['val'], res[4])
+
+
+class TestOFPDescStatsRequest(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPDescStatsRequest
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ flags = {'buf': b'\x00\x00', 'val': 0}
+
+ c = OFPDescStatsRequest(Datapath, flags['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(ofproto.OFPST_DESC, self.c.type)
+ eq_(self.flags['val'], self.c.flags)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_STATS_REQUEST, self.c.msg_type)
+ eq_(0, self.c.xid)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.OFP_STATS_MSG_PACK_STR.replace('!', '')
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+
+ # OFP_HEADER_PACK_STR
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_STATS_REQUEST, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+
+ # OFP_STATS_MSG_PACK_STR
+ eq_(ofproto.OFPST_DESC, res[4])
+ eq_(self.flags['val'], res[5])
+
+
+class TestOFPFlowStatsRequest(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPFlowStatsRequest
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ flags = {'buf': b'\x00\x00', 'val': 0}
+
+ # OFP_MATCH_PACK_STR
+ # '!IH6s6sHBxHBB2xIIHH'...wildcards, in_port, dl_src, dl_dst, dl_vlan,
+ # dl_vlan_pcp, dl_type, nw_tos, nw_proto,
+ # nw_src, nw_dst, tp_src, tp_dst
+ wildcards = {'buf': b'\xd2\x71\x25\x23', 'val': 3530630435}
+ in_port = {'buf': b'\x37\x8b', 'val': 14219}
+ dl_src = b'\x58\xd0\x8a\x69\xa4\xfc'
+ dl_dst = b'\xb6\xe2\xef\xb1\xa6\x2d'
+ dl_vlan = {'buf': b'\xc1\xf9', 'val': 49657}
+ dl_vlan_pcp = {'buf': b'\x79', 'val': 121}
+ zfill0 = b'\x00'
+ dl_type = {'buf': b'\xa6\x9e', 'val': 42654}
+ nw_tos = {'buf': b'\xde', 'val': 222}
+ nw_proto = {'buf': b'\xe5', 'val': 229}
+ zfil11 = b'\x00' * 2
+ nw_src = {'buf': b'\x1b\x6d\x8d\x4b', 'val': 460164427}
+ nw_dst = {'buf': b'\xab\x25\xe1\x20', 'val': 2871386400}
+ tp_src = {'buf': b'\xd5\xc3', 'val': 54723}
+ tp_dst = {'buf': b'\x78\xb9', 'val': 30905}
+
+ match = OFPMatch(wildcards['val'],
+ in_port['val'],
+ dl_src,
+ dl_dst,
+ dl_vlan['val'],
+ dl_vlan_pcp['val'],
+ dl_type['val'],
+ nw_tos['val'],
+ nw_proto['val'],
+ nw_src['val'],
+ nw_dst['val'],
+ tp_src['val'],
+ tp_dst['val'])
+
+ # OFP_FLOW_STATS_REQUEST_ID_PORT_STR
+ # '!BxH'...table_id, zfill, out_port
+ table_id = {'buf': b'\xd1', 'val': 209}
+ zfill = b'\x00' * 1
+ out_port = {'buf': b'\xe4\x9a', 'val': 58522}
+
+ c = OFPFlowStatsRequest(Datapath,
+ flags['val'],
+ match,
+ table_id['val'],
+ out_port['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(ofproto.OFPST_FLOW, self.c.type)
+ eq_(self.flags['val'], self.c.flags)
+ eq_(self.table_id['val'], self.c.table_id)
+ eq_(self.out_port['val'], self.c.out_port)
+
+ # match
+ match = self.c.match
+ eq_(self.match.__hash__(), match.__hash__())
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_STATS_REQUEST, self.c.msg_type)
+ eq_(0, self.c.xid)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.OFP_STATS_MSG_PACK_STR.replace('!', '') \
+ + ofproto.OFP_MATCH_PACK_STR.replace('!', '') \
+ + ofproto.OFP_FLOW_STATS_REQUEST_ID_PORT_STR.replace('!', '')
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+
+ # OFP_HEADER_PACK_STR
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_STATS_REQUEST, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+
+ # OFP_STATS_MSG_PACK_STR
+ eq_(ofproto.OFPST_FLOW, res[4])
+ eq_(self.flags['val'], res[5])
+
+ # OFP_MATCH_PACK_STR
+ eq_(self.wildcards['val'], res[6])
+ eq_(self.in_port['val'], res[7])
+ eq_(self.dl_src, res[8])
+ eq_(self.dl_dst, res[9])
+ eq_(self.dl_vlan['val'], res[10])
+ eq_(self.dl_vlan_pcp['val'], res[11])
+ eq_(self.dl_type['val'], res[12])
+ eq_(self.nw_tos['val'], res[13])
+ eq_(self.nw_proto['val'], res[14])
+ eq_(self.nw_src['val'], res[15])
+ eq_(self.nw_dst['val'], res[16])
+ eq_(self.tp_src['val'], res[17])
+ eq_(self.tp_dst['val'], res[18])
+
+ # OFP_FLOW_STATS_REQUEST_ID_PORT_STR
+ eq_(self.table_id['val'], res[19])
+ eq_(self.out_port['val'], res[20])
+
+
+class TestOFPAggregateStatsRequest(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPAggregateStatsRequest
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ flags = {'buf': b'\x00\x00', 'val': 0}
+
+ # OFP_MATCH_PACK_STR
+ # '!IH6s6sHBxHBB2xIIHH'...wildcards, in_port, dl_src, dl_dst, dl_vlan,
+ # dl_vlan_pcp, dl_type, nw_tos, nw_proto,
+ # nw_src, nw_dst, tp_src, tp_dst
+ wildcards = {'buf': b'\xea\x66\x4a\xd4', 'val': 3932572372}
+ in_port = {'buf': b'\x64\xac', 'val': 25772}
+ dl_src = b'\x90\x13\x60\x5e\x20\x4d'
+ dl_dst = b'\xb5\x5d\x14\x5e\xb9\x22'
+ dl_vlan = {'buf': b'\x8b\xeb', 'val': 35819}
+ dl_vlan_pcp = {'buf': b'\xe8', 'val': 232}
+ zfill0 = b'\x00'
+ dl_type = {'buf': b'\62\xc9', 'val': 25289}
+ nw_tos = {'buf': b'\xb5', 'val': 181}
+ nw_proto = {'buf': b'\xc4', 'val': 196}
+ zfil11 = b'\x00' * 2
+ nw_src = {'buf': b'\xb7\xd1\xb7\xef', 'val': 3083974639}
+ nw_dst = {'buf': b'\x7c\xc6\x18\x15', 'val': 2093357077}
+ tp_src = {'buf': b'\x26\x9a', 'val': 9882}
+ tp_dst = {'buf': b'\x7a\x89', 'val': 31369}
+
+ match = OFPMatch(wildcards['val'],
+ in_port['val'],
+ dl_src,
+ dl_dst,
+ dl_vlan['val'],
+ dl_vlan_pcp['val'],
+ dl_type['val'],
+ nw_tos['val'],
+ nw_proto['val'],
+ nw_src['val'],
+ nw_dst['val'],
+ tp_src['val'],
+ tp_dst['val'])
+
+ # OFP_FLOW_STATS_REQUEST_ID_PORT_STR
+ # '!BxH'...table_id, zfill, out_port
+ table_id = {'buf': b'\xd1', 'val': 209}
+ zfill = b'\x00' * 1
+ out_port = {'buf': b'\xb5\xe8', 'val': 46568}
+
+ c = OFPAggregateStatsRequest(Datapath,
+ flags['val'],
+ match,
+ table_id['val'],
+ out_port['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(ofproto.OFPST_AGGREGATE, self.c.type)
+ eq_(self.flags['val'], self.c.flags)
+ eq_(self.table_id['val'], self.c.table_id)
+ eq_(self.out_port['val'], self.c.out_port)
+
+ # match
+ match = self.c.match
+ eq_(self.match.__hash__(), match.__hash__())
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_STATS_REQUEST, self.c.msg_type)
+ eq_(0, self.c.xid)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.OFP_STATS_MSG_PACK_STR.replace('!', '') \
+ + ofproto.OFP_MATCH_PACK_STR.replace('!', '') \
+ + ofproto.OFP_FLOW_STATS_REQUEST_ID_PORT_STR.replace('!', '')
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+
+ # OFP_HEADER_PACK_STR
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_STATS_REQUEST, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+
+ # OFP_STATS_MSG_PACK_STR
+ eq_(ofproto.OFPST_AGGREGATE, res[4])
+ eq_(self.flags['val'], res[5])
+
+ # OFP_MATCH_PACK_STR
+ eq_(self.wildcards['val'], res[6])
+ eq_(self.in_port['val'], res[7])
+ eq_(self.dl_src, res[8])
+ eq_(self.dl_dst, res[9])
+ eq_(self.dl_vlan['val'], res[10])
+ eq_(self.dl_vlan_pcp['val'], res[11])
+ eq_(self.dl_type['val'], res[12])
+ eq_(self.nw_tos['val'], res[13])
+ eq_(self.nw_proto['val'], res[14])
+ eq_(self.nw_src['val'], res[15])
+ eq_(self.nw_dst['val'], res[16])
+ eq_(self.tp_src['val'], res[17])
+ eq_(self.tp_dst['val'], res[18])
+
+ # OFP_FLOW_STATS_REQUEST_ID_PORT_STR
+ eq_(self.table_id['val'], res[19])
+ eq_(self.out_port['val'], res[20])
+
+
+class TestOFPTableStatsRequest(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPTableStatsRequest
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ flags = {'buf': b'\x00\x00', 'val': 0}
+
+ c = OFPTableStatsRequest(Datapath, flags['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(ofproto.OFPST_TABLE, self.c.type)
+ eq_(self.flags['val'], self.c.flags)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_STATS_REQUEST, self.c.msg_type)
+ eq_(0, self.c.xid)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.OFP_STATS_MSG_PACK_STR.replace('!', '')
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+
+ # OFP_HEADER_PACK_STR
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_STATS_REQUEST, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+
+ # OFP_STATS_MSG_PACK_STR
+ eq_(ofproto.OFPST_TABLE, res[4])
+ eq_(self.flags['val'], res[5])
+
+
+class TestOFPPortStatsRequest(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPPortStatsRequest
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ flags = {'buf': b'\x00\x00', 'val': 0}
+
+ # OFP_PORT_STATS_REQUEST_PACK_STR
+ # '!H6x'...port_no, zfill
+ port_no = {'buf': b'\x6d\x27', 'val': 27943}
+
+ c = OFPPortStatsRequest(Datapath,
+ flags['val'],
+ port_no['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(ofproto.OFPST_PORT, self.c.type)
+ eq_(self.flags['val'], self.c.flags)
+ eq_(self.port_no['val'], self.c.port_no)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_STATS_REQUEST, self.c.msg_type)
+ eq_(0, self.c.xid)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.OFP_STATS_MSG_PACK_STR.replace('!', '') \
+ + ofproto.OFP_PORT_STATS_REQUEST_PACK_STR.replace('!', '')
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+
+ # OFP_HEADER_PACK_STR
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_STATS_REQUEST, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+
+ # OFP_STATS_MSG_PACK_STR
+ eq_(ofproto.OFPST_PORT, res[4])
+ eq_(self.flags['val'], res[5])
+
+ # OFP_PORT_STATS_REQUEST_PACK_STR
+ eq_(self.port_no['val'], res[6])
+
+
+class TestOFPQueueStatsRequest(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPQueueStatsRequest
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ flags = {'buf': b'\x00\x00', 'val': 0}
+
+ # OFP_QUEUE_STATS_REQUEST_PACK_STR
+ # '!HxxI'...port_no, zfill, zfill, queue_id
+ port_no = {'buf': b'\x0c\x2d', 'val': 3117}
+ queue_id = {'buf': b'\x1b\xe6\xba\x36', 'val': 468105782}
+
+ c = OFPQueueStatsRequest(Datapath,
+ flags['val'],
+ port_no['val'],
+ queue_id['val'])
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(ofproto.OFPST_QUEUE, self.c.type)
+ eq_(self.flags['val'], self.c.flags)
+ eq_(self.port_no['val'], self.c.port_no)
+ eq_(self.queue_id['val'], self.c.queue_id)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_STATS_REQUEST, self.c.msg_type)
+ eq_(0, self.c.xid)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.OFP_STATS_MSG_PACK_STR.replace('!', '') \
+ + ofproto.OFP_QUEUE_STATS_REQUEST_PACK_STR.replace('!', '')
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+
+ # OFP_HEADER_PACK_STR
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_STATS_REQUEST, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+
+ # OFP_STATS_MSG_PACK_STR
+ eq_(ofproto.OFPST_QUEUE, res[4])
+ eq_(self.flags['val'], res[5])
+
+ # OFP_QUEUE_STATS_REQUEST_PACK_STR
+ eq_(self.port_no['val'], res[6])
+ eq_(self.queue_id['val'], res[7])
+
+
+class TestOFPVendorStatsRequest(unittest.TestCase):
+ """ Test case for ofproto_v1_0_parser.OFPVendorStatsRequest
+ """
+
+ class Datapath(object):
+ ofproto = ofproto # copy to class attribute
+ ofproto_parser = ofproto_v1_0_parser
+
+ flags = {'buf': b'\x00\x00', 'val': 0}
+
+ # OFP_VENDOR_STATS_MSG_PACK_STR
+ # '!I'...vendor
+ vendor = {'buf': b'\xff\xff\xff\xff', 'val': ofproto.OFPAT_VENDOR}
+
+ specific_data = b'specific_data'
+
+ c = OFPVendorStatsRequest(Datapath,
+ flags['val'],
+ vendor['val'],
+ specific_data)
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_init(self):
+ eq_(ofproto.OFPST_VENDOR, self.c.type)
+ eq_(self.flags['val'], self.c.flags)
+ eq_(self.vendor['val'], self.c.vendor)
+ eq_(self.specific_data, self.c.specific_data)
+
+ def test_parser(self):
+ # Not used.
+ pass
+
+ def test_serialize(self):
+ self.c.serialize()
+
+ eq_(ofproto.OFP_VERSION, self.c.version)
+ eq_(ofproto.OFPT_STATS_REQUEST, self.c.msg_type)
+ eq_(0, self.c.xid)
+
+ fmt = '!' \
+ + ofproto.OFP_HEADER_PACK_STR.replace('!', '') \
+ + ofproto.OFP_STATS_MSG_PACK_STR.replace('!', '') \
+ + ofproto.OFP_VENDOR_STATS_MSG_PACK_STR.replace('!', '') \
+ + str(len(self.specific_data)) + 's'
+
+ res = struct.unpack(fmt, six.binary_type(self.c.buf))
+
+ # OFP_HEADER_PACK_STR
+ eq_(ofproto.OFP_VERSION, res[0])
+ eq_(ofproto.OFPT_STATS_REQUEST, res[1])
+ eq_(len(self.c.buf), res[2])
+ eq_(0, res[3])
+
+ # OFP_STATS_MSG_PACK_STR
+ eq_(ofproto.OFPST_VENDOR, res[4])
+ eq_(self.flags['val'], res[5])
+
+ # OFP_VENDOR_STATS_MSG_PACK_STR
+ eq_(self.vendor['val'], res[6])
+
+ # specific_data
+ eq_(self.specific_data, res[7])