summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2012-07-28 10:24:46 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2012-07-28 10:24:46 +0900
commit8c1978942daaa96a5336aab3def2f94e37cccc5c (patch)
tree738b478a7abbe86f963b57eafac6c1c0505e593f
parentdfa4ab185a45cd647da16ee8a4c50eeb90615d2b (diff)
test: simplify integrated OVS test suite
This patch simplifies integrated OVS test suite. Currently, we wait for a barrier response before moving to the next test. However, we don't need. The logic works like the following: sending a flow mod (deleting all the flows) sending a barrier sending a flow mod (set up a flow to test) sending a barrier sending a flow stats Then the reply handler for flow stats verifies the result and move to the next. You can run a test suite like: $ ryu-manager ~/git/ryu/ryu/tests/integrated/test_add_flow_v12_actions.py We can try two more suites: ryu/tests/integrated/test_add_flow_v10.py ryu/tests/integrated/test_add_flow_v12_matches.py Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rwxr-xr-xbin/ryu-manager1
-rw-r--r--ryu/tests/integrated/test_add_flow_v10.py764
-rw-r--r--ryu/tests/integrated/test_add_flow_v12_actions.py1288
-rw-r--r--ryu/tests/integrated/test_add_flow_v12_matches.py2003
-rw-r--r--ryu/tests/integrated/tester.py380
5 files changed, 1099 insertions, 3337 deletions
diff --git a/bin/ryu-manager b/bin/ryu-manager
index cee0b416..cfa56a0a 100755
--- a/bin/ryu-manager
+++ b/bin/ryu-manager
@@ -32,7 +32,6 @@ from ryu import utils
from ryu.app import wsgi
from ryu.base.app_manager import AppManager
from ryu.controller import controller
-from ryu.tests.integrated import tester
FLAGS = gflags.FLAGS
diff --git a/ryu/tests/integrated/test_add_flow_v10.py b/ryu/tests/integrated/test_add_flow_v10.py
index 538e6aa7..ed6c7b73 100644
--- a/ryu/tests/integrated/test_add_flow_v10.py
+++ b/ryu/tests/integrated/test_add_flow_v10.py
@@ -17,691 +17,245 @@
import logging
-from ryu.controller import handler
-from ryu.controller.handler import set_ev_cls
from ryu.tests.integrated import tester
from ryu.ofproto import nx_match
LOG = logging.getLogger(__name__)
-_target = 'br-tester'
+class RunTest(tester.TestFlowBase):
+ """ Test case for add flows of1.0
+ """
-class RunTest(tester.RunTestBase):
+ def __init__(self, *args, **kwargs):
+ super(RunTest, self).__init__(*args, **kwargs)
+ self._verify = []
- def __init__(self):
- super(RunTest, self).__init__()
+ def add_action(self, dp, action):
+ rule = nx_match.ClsRule()
+ self.send_flow_mod(
+ dp, rule, 0, dp.ofproto.OFPFC_ADD, 0, 0, None,
+ 0xffffffff, None, dp.ofproto.OFPFF_SEND_FLOW_REM, action)
- def send_flow_mod(self, rule, cookie, command, idle_timeout, hard_timeout,
- priority=None, buffer_id=0xffffffff,
+ def add_rule(self, dp, rule):
+ self.send_flow_mod(
+ dp, rule, 0, dp.ofproto.OFPFC_ADD, 0, 0, None,
+ 0xffffffff, None, dp.ofproto.OFPFF_SEND_FLOW_REM, [])
+
+ def send_flow_mod(self, dp, rule, cookie, command, idle_timeout,
+ hard_timeout, priority=None, buffer_id=0xffffffff,
out_port=None, flags=0, actions=None):
if priority is None:
- priority = self.ofproto.OFP_DEFAULT_PRIORITY
+ priority = dp.ofproto.OFP_DEFAULT_PRIORITY
if out_port is None:
- out_port = self.ofproto.OFPP_NONE
+ out_port = dp.ofproto.OFPP_NONE
match_tuple = rule.match_tuple()
- match = self.ofproto_parser.OFPMatch(*match_tuple)
- flow_mod = self.ofproto_parser.OFPFlowMod(
- self.datapath, match, cookie, command, idle_timeout, hard_timeout,
- priority, buffer_id, out_port, flags, actions)
-
- self.datapath.send_msg(flow_mod)
-
- def test_action_output(self):
- datapath = self.datapath
- ofproto = self.ofproto
-
- out_port = 2
- self.set_val('out_port', out_port)
+ match = dp.ofproto_parser.OFPMatch(*match_tuple)
- actions = [
- datapath.ofproto_parser.OFPActionOutput(out_port),
- ]
-
- rule = nx_match.ClsRule()
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
+ m = dp.ofproto_parser.OFPFlowMod(
+ dp, match, cookie, command, idle_timeout, hard_timeout,
+ priority, buffer_id, out_port, flags, actions)
- def check_action_output(self):
- ovs_actions = {}
- out_port = self.get_val('out_port')
+ dp.send_msg(m)
+ def _verify_action(self, actions, type_, name, value):
try:
- ovs_actions = self.get_ovs_flows(_target)[0]['actions']
- ovs_out_port = ovs_actions['output']
- except (KeyError, IndexError):
- ovs_out_port = ''
-
- if ovs_out_port == '' or int(ovs_out_port) != out_port:
- err = 'send_actions=[output:%s] ovs_actions=[%s]' \
- % (out_port, self.cnv_txt(ovs_actions))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_action_vlan_vid(self):
- datapath = self.datapath
- ofproto = self.ofproto
-
- vlan_vid = 3
- self.set_val('vlan_vid', vlan_vid)
+ action = actions[0]
+ if action.cls_action_type != type_:
+ return "Action type error. send:%s, val:%s" \
+ % (type_, action.cls_action_type)
+ except IndexError:
+ return "Action is not setting."
+
+ f_value = None
+ if name:
+ try:
+ if isinstance(name, list):
+ f_value = [getattr(action, n) for n in name]
+ else:
+ f_value = getattr(action, name)
+ except AttributeError:
+ pass
+
+ if f_value != value:
+ return "Value error. send:%s=%s val:%s" \
+ % (name, value, f_value)
+ return True
+
+ def _verify_rule(self, rule, name, value):
+ f_value = getattr(rule, name)
+ if f_value != value:
+ return "Value error. send:%s=%s val:%s" \
+ % (name, value, f_value)
+ return True
+
+ def verify_default(self, dp, stats):
+ verify = self._verify
+ self._verify = []
+ match = stats[0].match
+ actions = stats[0].actions
+
+ if len(verify) == 2:
+ return self._verify_rule(match, *verify)
+ elif len(verify) == 3:
+ return self._verify_action(actions, *verify)
+ else:
+ return "self._verify is invalid."
+
+ # Test of Actions
+ def test_action_output(self, dp):
+ out_port = 2
+ self._verify = [dp.ofproto.OFPAT_OUTPUT,
+ 'port', out_port]
+ action = dp.ofproto_parser.OFPActionOutput(out_port)
+ self.add_action(dp, [action, ])
- actions = [
- datapath.ofproto_parser.OFPActionVlanVid(vlan_vid),
- ]
+ def test_rule_set_in_port(self, dp):
+ in_port = 32
+ self._verify = ['in_port', in_port]
rule = nx_match.ClsRule()
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_action_vlan_vid(self):
- ovs_actions = {}
- vlan_vid = self.get_val('vlan_vid')
+ rule.set_in_port(in_port)
+ self.add_rule(dp, rule)
- try:
- ovs_actions = self.get_ovs_flows(_target)[0]['actions']
- ovs_vlan_vid = ovs_actions['mod_vlan_vid']
- except (KeyError, IndexError):
- ovs_vlan_vid = ''
-
- if ovs_vlan_vid == '' or int(ovs_vlan_vid) != vlan_vid:
- err = 'send_actions=[vlan_vid:%s] ovs_actions=[%s]' \
- % (vlan_vid, self.cnv_txt(ovs_actions))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_action_vlan_pcp(self):
- datapath = self.datapath
- ofproto = self.ofproto
+ def test_action_vlan_vid(self, dp):
+ vlan_vid = 2
+ self._verify = [dp.ofproto.OFPAT_SET_VLAN_VID,
+ 'vlan_vid', vlan_vid]
+ action = dp.ofproto_parser.OFPActionVlanVid(vlan_vid)
+ self.add_action(dp, [action, ])
+ def test_action_vlan_pcp(self, dp):
vlan_pcp = 4
- self.set_val('vlan_pcp', vlan_pcp)
-
- actions = [
- datapath.ofproto_parser.OFPActionVlanPcp(vlan_pcp),
- ]
-
- rule = nx_match.ClsRule()
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_action_vlan_pcp(self):
- ovs_actions = {}
- vlan_pcp = self.get_val('vlan_pcp')
-
- try:
- ovs_actions = self.get_ovs_flows(_target)[0]['actions']
- ovs_vlan_pcp = ovs_actions['mod_vlan_pcp']
- except (KeyError, IndexError):
- ovs_vlan_pcp = ''
-
- if ovs_vlan_pcp == '' or int(ovs_vlan_pcp) != vlan_pcp:
- err = 'send_actions=[vlan_vid:%s] ovs_actions=[%s]' \
- % (vlan_pcp, self.cnv_txt(ovs_actions))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_action_strip_vlan(self):
- datapath = self.datapath
- ofproto = self.ofproto
-
- actions = [
- datapath.ofproto_parser.OFPActionStripVlan(),
- ]
-
- rule = nx_match.ClsRule()
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_action_strip_vlan(self):
- ovs_actions = {}
- try:
- ovs_actions = self.get_ovs_flows(_target)[0]['actions']
- except (KeyError, IndexError):
- pass
+ self._verify = [dp.ofproto.OFPAT_SET_VLAN_PCP,
+ 'vlan_pcp', vlan_pcp]
+ action = dp.ofproto_parser.OFPActionVlanPcp(vlan_pcp)
+ self.add_action(dp, [action, ])
- if not 'strip_vlan' in ovs_actions:
- err = 'send_actions=[strip_vlan] ovs_actions=[%s]' \
- % (self.cnv_txt(ovs_actions))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_action_set_dl_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
+ def test_action_strip_vlan(self, dp):
+ vlan_pcp = 4
+ self._verify = [dp.ofproto.OFPAT_STRIP_VLAN,
+ None, None]
+ action = dp.ofproto_parser.OFPActionStripVlan()
+ self.add_action(dp, [action, ])
+ def test_action_set_dl_src(self, dp):
dl_src = '56:b3:42:04:b2:7a'
- self.set_val('dl_src', dl_src)
-
dl_src_bin = self.haddr_to_bin(dl_src)
- actions = [
- datapath.ofproto_parser.OFPActionSetDlSrc(dl_src_bin),
- ]
-
- rule = nx_match.ClsRule()
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_action_set_dl_src(self):
- ovs_actions = {}
- dl_src = self.get_val('dl_src')
-
- try:
- ovs_actions = self.get_ovs_flows(_target)[0]['actions']
- ovs_dl_src = ovs_actions['mod_dl_src']
- except (KeyError, IndexError):
- ovs_dl_src = ''
-
- if ovs_dl_src == '' or ovs_dl_src != dl_src:
- err = 'send_actions=[dl_src:%s] ovs_actions=[%s]' \
- % (dl_src, self.cnv_txt(ovs_actions))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_action_set_dl_dst(self):
- datapath = self.datapath
- ofproto = self.ofproto
+ self._verify = [dp.ofproto.OFPAT_SET_DL_SRC,
+ 'dl_addr', dl_src_bin]
+ action = dp.ofproto_parser.OFPActionSetDlSrc(dl_src_bin)
+ self.add_action(dp, [action, ])
+ def test_action_set_dl_dst(self, dp):
dl_dst = 'c2:93:a2:fb:d0:f4'
- self.set_val('dl_dst', dl_dst)
-
dl_dst_bin = self.haddr_to_bin(dl_dst)
- actions = [
- datapath.ofproto_parser.OFPActionSetDlDst(dl_dst_bin),
- ]
-
- rule = nx_match.ClsRule()
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_action_set_dl_dst(self):
- ovs_actions = {}
- dl_dst = self.get_val('dl_dst')
-
- try:
- ovs_actions = self.get_ovs_flows(_target)[0]['actions']
- ovs_dl_dst = ovs_actions['mod_dl_dst']
- except (KeyError, IndexError):
- ovs_dl_dst = ''
-
- if ovs_dl_dst == '' or ovs_dl_dst != dl_dst:
- err = 'send_actions=[dl_dst:%s] ovs_actions=[%s]' \
- % (dl_dst, self.cnv_txt(ovs_actions))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_action_set_nw_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
+ self._verify = [dp.ofproto.OFPAT_SET_DL_DST,
+ 'dl_addr', dl_dst_bin]
+ action = dp.ofproto_parser.OFPActionSetDlDst(dl_dst_bin)
+ self.add_action(dp, [action, ])
+ def test_action_set_nw_src(self, dp):
nw_src = '216.132.81.105'
- self.set_val('nw_src', nw_src)
-
nw_src_int = self.ipv4_to_int(nw_src)
+ self._verify = [dp.ofproto.OFPAT_SET_NW_SRC,
+ 'nw_addr', nw_src_int]
+ action = dp.ofproto_parser.OFPActionSetNwSrc(nw_src_int)
+ self.add_action(dp, [action, ])
- actions = [
- datapath.ofproto_parser.OFPActionSetNwSrc(nw_src_int),
- ]
-
- rule = nx_match.ClsRule()
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_action_set_nw_src(self):
- ovs_actions = {}
- nw_src = self.get_val('nw_src')
-
- try:
- ovs_actions = self.get_ovs_flows(_target)[0]['actions']
- ovs_nw_src = ovs_actions['mod_nw_src']
- except (KeyError, IndexError):
- ovs_nw_src = ''
-
- if ovs_nw_src == '' or ovs_nw_src != nw_src:
- err = 'send_actions=[nw_src:%s] ovs_actions=[%s]' \
- % (nw_src, self.cnv_txt(ovs_actions))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_action_set_nw_dst(self):
- datapath = self.datapath
- ofproto = self.ofproto
-
+ def test_action_set_nw_dst(self, dp):
nw_dst = '223.201.206.3'
- self.set_val('nw_dst', nw_dst)
-
nw_dst_int = self.ipv4_to_int(nw_dst)
+ self._verify = [dp.ofproto.OFPAT_SET_NW_DST,
+ 'nw_addr', nw_dst_int]
+ action = dp.ofproto_parser.OFPActionSetNwDst(nw_dst_int)
+ self.add_action(dp, [action, ])
- actions = [
- datapath.ofproto_parser.OFPActionSetNwDst(nw_dst_int),
- ]
-
- rule = nx_match.ClsRule()
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_action_set_nw_dst(self):
- ovs_actions = {}
- nw_dst = self.get_val('nw_dst')
-
- try:
- ovs_actions = self.get_ovs_flows(_target)[0]['actions']
- ovs_nw_dst = ovs_actions['mod_nw_dst']
- except (KeyError, IndexError):
- ovs_nw_dst = ''
-
- if ovs_nw_dst == '' or ovs_nw_dst != nw_dst:
- err = 'send_actions=[nw_dst:%s] ovs_actions=[%s]' \
- % (nw_dst, self.cnv_txt(ovs_actions))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_action_set_nw_tos(self):
- datapath = self.datapath
- ofproto = self.ofproto
-
+ def test_action_set_nw_tos(self, dp):
nw_tos = 111
- self.set_val('nw_tos', nw_tos)
-
- actions = [
- datapath.ofproto_parser.OFPActionSetNwTos(nw_tos),
- ]
-
- rule = nx_match.ClsRule()
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_action_set_nw_tos(self):
- ovs_actions = {}
- nw_tos = self.get_val('nw_tos')
-
- try:
- ovs_actions = self.get_ovs_flows(_target)[0]['actions']
- ovs_nw_tos = ovs_actions['mod_nw_tos']
- except (KeyError, IndexError):
- ovs_nw_tos = ''
-
- if ovs_nw_tos == '' or int(ovs_nw_tos) != nw_tos:
- err = 'send_actions=[nw_tos:%s] ovs_actions=[%s]' \
- % (nw_tos, self.cnv_txt(ovs_actions))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_action_set_tp_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
+ self._verify = [dp.ofproto.OFPAT_SET_NW_TOS,
+ 'tos', nw_tos]
+ action = dp.ofproto_parser.OFPActionSetNwTos(nw_tos)
+ self.add_action(dp, [action, ])
+ def test_action_set_tp_src(self, dp):
tp_src = 55420
- self.set_val('tp_src', tp_src)
-
- actions = [
- datapath.ofproto_parser.OFPActionSetTpSrc(tp_src),
- ]
-
- rule = nx_match.ClsRule()
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_action_set_tp_src(self):
- ovs_actions = {}
- tp_src = self.get_val('tp_src')
-
- try:
- ovs_actions = self.get_ovs_flows(_target)[0]['actions']
- ovs_tp_src = ovs_actions['mod_tp_src']
- except (KeyError, IndexError):
- ovs_tp_src = ''
-
- if ovs_tp_src == '' or int(ovs_tp_src) != tp_src:
- err = 'send_actions=[tp_src:%s] ovs_actions=[%s]' \
- % (tp_src, self.cnv_txt(ovs_actions))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_action_set_tp_dst(self):
- datapath = self.datapath
- ofproto = self.ofproto
+ self._verify = [dp.ofproto.OFPAT_SET_TP_SRC,
+ 'tp', tp_src]
+ action = dp.ofproto_parser.OFPActionSetTpSrc(tp_src)
+ self.add_action(dp, [action, ])
+ def test_action_set_tp_dst(self, dp):
tp_dst = 15430
- self.set_val('tp_dst', tp_dst)
-
- actions = [
- datapath.ofproto_parser.OFPActionSetTpDst(tp_dst),
- ]
-
- rule = nx_match.ClsRule()
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_action_set_tp_dst(self):
- ovs_actions = {}
- tp_dst = self.get_val('tp_dst')
-
- try:
- ovs_actions = self.get_ovs_flows(_target)[0]['actions']
- ovs_tp_dst = ovs_actions['mod_tp_dst']
- except (KeyError, IndexError):
- ovs_tp_dst = ''
-
- if ovs_tp_dst == '' or int(ovs_tp_dst) != tp_dst:
- err = 'send_actions=[tp_src:%s] ovs_actions=[%s]' \
- % (tp_dst, self.cnv_txt(ovs_actions))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_action_enqueue(self):
- datapath = self.datapath
- ofproto = self.ofproto
+ self._verify = [dp.ofproto.OFPAT_SET_TP_DST,
+ 'tp', tp_dst]
+ action = dp.ofproto_parser.OFPActionSetTpDst(tp_dst)
+ self.add_action(dp, [action, ])
+ def test_action_enqueue(self, dp):
port = 207
queue_id = 4287508753
- self.set_val('enqueue', str(port) + 'q' + str(queue_id))
-
- actions = [
- datapath.ofproto_parser.OFPActionEnqueue(port, queue_id),
- ]
-
- rule = nx_match.ClsRule()
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_action_enqueue(self):
- ovs_actions = {}
- enqueue = self.get_val('enqueue')
-
- try:
- ovs_actions = self.get_ovs_flows(_target)[0]['actions']
- ovs_enqueue = ovs_actions['enqueue']
- except (KeyError, IndexError):
- ovs_enqueue = ''
-
- if ovs_enqueue == '' or ovs_enqueue != enqueue:
- err = 'send_actions=[enqueue:%s] ovs_actions=[%s]' \
- % (enqueue, self.cnv_txt(ovs_actions))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_rule_set_in_port(self):
- datapath = self.datapath
- ofproto = self.ofproto
+ self._verify = [dp.ofproto.OFPAT_ENQUEUE,
+ ['port', 'queue_id'], [port, queue_id]]
+ action = dp.ofproto_parser.OFPActionEnqueue(port, queue_id)
+ self.add_action(dp, [action, ])
+ # Test of Rules
+ def test_rule_set_in_port(self, dp):
in_port = 32
- self.set_val('in_port', in_port)
+ self._verify = ['in_port', in_port]
- actions = []
rule = nx_match.ClsRule()
rule.set_in_port(in_port)
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_rule_set_in_port(self):
- ovs_rules = {}
- in_port = self.get_val('in_port')
-
- try:
- ovs_rules = self.get_ovs_flows(_target)[0]['rules']
- ovs_in_port = ovs_rules['in_port']
- except (KeyError, IndexError):
- ovs_in_port = ''
-
- if ovs_in_port == '' or int(ovs_in_port) != in_port:
- err = 'send_rules=[in_port:%s] ovs_rules=[%s]' \
- % (in_port, self.cnv_txt(ovs_rules))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_rule_set_dl_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
+ self.add_rule(dp, rule)
+ def test_rule_set_dl_src(self, dp):
dl_src = 'b8:a1:94:51:78:83'
- self.set_val('dl_src', dl_src)
-
dl_src_bin = self.haddr_to_bin(dl_src)
+ self._verify = ['dl_src', dl_src_bin]
- actions = []
rule = nx_match.ClsRule()
rule.set_dl_src(dl_src_bin)
+ self.add_rule(dp, rule)
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_rule_set_dl_src(self):
- ovs_rules = {}
- dl_src = self.get_val('dl_src')
-
- try:
- ovs_rules = self.get_ovs_flows(_target)[0]['rules']
- ovs_dl_src = ovs_rules['dl_src']
- except (KeyError, IndexError):
- ovs_dl_src = ''
-
- if ovs_dl_src == '' or ovs_dl_src != dl_src:
- err = 'send_rules=[dl_src:%s] ovs_rules=[%s]' \
- % (dl_src, self.cnv_txt(ovs_rules))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_rule_set_dl_type_ip(self):
- datapath = self.datapath
- ofproto = self.ofproto
-
+ def test_rule_set_dl_type_ip(self, dp):
dl_type = nx_match.ETH_TYPE_IP
- self.set_val('dl_type', 'ip')
+ self._verify = ['dl_type', dl_type]
- actions = []
rule = nx_match.ClsRule()
rule.set_dl_type(dl_type)
+ self.add_rule(dp, rule)
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_rule_set_dl_type_ip(self):
- ovs_rules = {}
- dl_type = self.get_val('dl_type')
-
- try:
- ovs_rules = self.get_ovs_flows(_target)[0]['rules']
- except (KeyError, IndexError):
- pass
-
- if not dl_type in ovs_rules:
- err = 'send_rules=[dl_type:%s] ovs_rules=[%s]' \
- % (dl_type, self.cnv_txt(ovs_rules))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_rule_set_dl_type_arp(self):
- datapath = self.datapath
- ofproto = self.ofproto
-
+ def test_rule_set_dl_type_arp(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
- self.set_val('dl_type', 'arp')
+ self._verify = ['dl_type', dl_type]
- actions = []
rule = nx_match.ClsRule()
rule.set_dl_type(dl_type)
+ self.add_rule(dp, rule)
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_rule_set_dl_type_arp(self):
- ovs_rules = {}
- dl_type = self.get_val('dl_type')
-
- try:
- ovs_rules = self.get_ovs_flows(_target)[0]['rules']
- except (KeyError, IndexError):
- pass
-
- if not dl_type in ovs_rules:
- err = 'send_rules=[dl_type:%s] ovs_rules=[%s]' \
- % (dl_type, self.cnv_txt(ovs_rules))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_rule_set_dl_type_vlan(self):
- datapath = self.datapath
- ofproto = self.ofproto
-
+ def test_rule_set_dl_type_vlan(self, dp):
dl_type = nx_match.ETH_TYPE_VLAN
- self.set_val('dl_type', nx_match.ETH_TYPE_VLAN)
+ self._verify = ['dl_type', dl_type]
- actions = []
rule = nx_match.ClsRule()
rule.set_dl_type(dl_type)
+ self.add_rule(dp, rule)
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_rule_set_dl_type_vlan(self):
- ovs_rules = {}
- dl_type = self.get_val('dl_type')
-
- try:
- ovs_rules = self.get_ovs_flows(_target)[0]['rules']
- ovs_dl_type = ovs_rules['dl_type']
- except (KeyError, IndexError):
- ovs_dl_type = ''
-
- if ovs_dl_type == '' or int(ovs_dl_type, 16) != dl_type:
- err = 'send_rules=[dl_src:%s] ovs_rules=[%s]' \
- % (dl_type, self.cnv_txt(ovs_rules))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_rule_set_dl_type_ipv6(self):
- datapath = self.datapath
- ofproto = self.ofproto
-
+ def test_rule_set_dl_type_ipv6(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
- self.set_val('dl_type', 'ipv6')
+ self._verify = ['dl_type', dl_type]
- actions = []
rule = nx_match.ClsRule()
rule.set_dl_type(dl_type)
+ self.add_rule(dp, rule)
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_rule_set_dl_type_ipv6(self):
- ovs_rules = {}
- dl_type = self.get_val('dl_type')
-
- try:
- ovs_rules = self.get_ovs_flows(_target)[0]['rules']
- except (KeyError, IndexError):
- pass
-
- if not dl_type in ovs_rules:
- err = 'send_rules=[dl_type:%s] ovs_rules=[%s]' \
- % (dl_type, self.cnv_txt(ovs_rules))
- self.results(ret=False, msg=err)
- return
- self.results()
-
- def test_rule_set_dl_type_lacp(self):
- datapath = self.datapath
- ofproto = self.ofproto
-
+ def test_rule_set_dl_type_lacp(self, dp):
dl_type = nx_match.ETH_TYPE_LACP
- self.set_val('dl_type', nx_match.ETH_TYPE_LACP)
+ self._verify = ['dl_type', dl_type]
- actions = []
rule = nx_match.ClsRule()
rule.set_dl_type(dl_type)
-
- self.send_flow_mod(
- rule=rule, cookie=0, command=ofproto.OFPFC_ADD,
- idle_timeout=0, hard_timeout=0,
- priority=ofproto.OFP_DEFAULT_PRIORITY,
- flags=ofproto.OFPFF_SEND_FLOW_REM, actions=actions)
-
- def check_rule_set_dl_type_lacp(self):
- ovs_rules = {}
- dl_type = self.get_val('dl_type')
-
- try:
- ovs_rules = self.get_ovs_flows(_target)[0]['rules']
- ovs_dl_type = ovs_rules['dl_type']
- except (KeyError, IndexError):
- ovs_dl_type = ''
-
- if ovs_dl_type == '' or int(ovs_dl_type, 16) != dl_type:
- err = 'send_rules=[dl_src:%s] ovs_rules=[%s]' \
- % (dl_type, self.cnv_txt(ovs_rules))
- self.results(ret=False, msg=err)
- return
- self.results()
+ self.add_rule(dp, rule)
diff --git a/ryu/tests/integrated/test_add_flow_v12_actions.py b/ryu/tests/integrated/test_add_flow_v12_actions.py
index 261b769b..9e93bffa 100644
--- a/ryu/tests/integrated/test_add_flow_v12_actions.py
+++ b/ryu/tests/integrated/test_add_flow_v12_actions.py
@@ -17,15 +17,11 @@
import logging
-from ryu.controller import handler
-from ryu.controller.handler import set_ev_cls
-from ryu.tests.integrated import tester
from ryu.ofproto import nx_match
+from ryu.tests.integrated import tester
LOG = logging.getLogger(__name__)
-_target = 'br-tester'
-
IPPROTO_ICMP = 1
IPPROTO_TCP = 6
IPPROTO_UDP = 17
@@ -40,1024 +36,344 @@ IPPROTO_SCTP = 132
ETH_TYPE_MPLS = 0x8847
-class RunTest(tester.RunTestBase):
+class RunTest(tester.TestFlowBase):
""" Test case for add flows of Actions
"""
- def __init__(self):
- super(RunTest, self).__init__()
-
- def _set_val(self, type_, name, val):
- self.set_val('type', type_)
- self.set_val('name', name)
- self.set_val('val', val)
-
- def _get_val(self):
- type_ = self.get_val('type')
- name = self.get_val('name')
- val = self.get_val('val')
- return (type_, name, val)
-
- def _check_default(self):
- type_, name, val = self._get_val()
+ def __init__(self, *args, **kwargs):
+ super(RunTest, self).__init__(*args, **kwargs)
+
+ self._verify = []
+
+ def add_apply_actions(self, dp, actions):
+ inst = [dp.ofproto_parser.OFPInstructionActions(
+ dp.ofproto.OFPIT_APPLY_ACTIONS, actions)]
+
+ match = dp.ofproto_parser.OFPMatch()
+ m = dp.ofproto_parser.OFPFlowMod(dp, 0, 0, 0,
+ dp.ofproto.OFPFC_ADD,
+ 0, 0, 0xff, 0xffffffff,
+ dp.ofproto.OFPP_ANY,
+ dp.ofproto.OFPG_ANY,
+ 0, match, inst)
+ dp.send_msg(m)
+
+ def add_set_field_action(self, dp, field, value):
+ self._verify = [dp.ofproto.OFPAT_SET_FIELD,
+ 'field', field, value]
+ f = dp.ofproto_parser.OFPMatchField.make(field, value)
+ actions = [dp.ofproto_parser.OFPActionSetField(f), ]
+ self.add_apply_actions(dp, actions)
+
+ def verify_default(self, dp, stats):
+ verify = self._verify
+ self._verify = []
+
+ type_ = name = field = value = None
+ if len(verify) == 1:
+ (type_, ) = verify
+ elif len(verify) == 3:
+ (type_, name, value) = verify
+ elif len(verify) == 4:
+ (type_, name, field, value) = verify
+ else:
+ return "self._verify is invalid."
- ovs_flow = {}
try:
- ovs_flow = self.get_ovs_flows(_target)[0][type_]
- except (KeyError, IndexError):
- pass
-
- ng = 0
- if val == None:
- if name in ovs_flow:
- ng = 1
- elif not name in ovs_flow or val != ovs_flow[name]:
- ng = 1
-
- if ng:
- err = ' send (%s:%s=%s)\n flow (%s:%s)' \
- % (type_, name, val, type_, self.cnv_txt(ovs_flow))
- self.results(ret=False, msg=err)
- else:
- self.results()
+ action = stats[0].instructions[0].actions[0]
+ if action.cls_action_type != type_:
+ return "Action type error. send:%s, val:%s" \
+ % (type_, action.cls_action_type)
+ except IndexError:
+ return "Action is not setting."
+
+ s_val = None
+ if name:
+ try:
+ s_val = getattr(action, name)
+ except AttributeError:
+ pass
+
+ if name == 'field':
+ if s_val.header != field:
+ return "Field error. send:%s val:%s" \
+ % (field, s_val.field)
+ s_val = s_val.value
+
+ if name and s_val != value:
+ return "Value error. send:%s=%s val:%s" \
+ % (name, value, s_val)
+
+ return True
+
+ def verify_action_drop(self, dp, stats):
+ for s in stats:
+ for i in s.instructions:
+ if len(i.actions):
+ return "has actions. %s" % (i.actions)
+ return True
# Test of General Actions
- def test_action_output(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
+ def test_action_output(self, dp):
out_port = 255
- self._set_val('apply_actions',
- 'output',
- str(out_port))
-
- match = ofproto_parser.OFPMatch()
- actions = [
- ofproto_parser.OFPActionOutput(out_port, 0),
- ]
- inst = [
- ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions),
- ]
-
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_drop(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- self._set_val('apply_actions',
- 'drop',
- 'drop')
-
- match = ofproto_parser.OFPMatch()
- inst = [
- ]
-
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
+ self._verify = [dp.ofproto.OFPAT_OUTPUT,
+ 'port', out_port]
+
+ actions = [dp.ofproto_parser.OFPActionOutput(out_port, 0), ]
+ self.add_apply_actions(dp, actions)
+
+ def test_action_drop(self, dp):
+ self.add_apply_actions(dp, [])
# Test of Push-Tag/Pop-Tag Actions
- def test_action_push_vlan(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- push_vlan = nx_match.ETH_TYPE_VLAN
- self._set_val('apply_actions',
- 'push_vlan',
- hex(push_vlan))
-
- match = ofproto_parser.OFPMatch()
- actions = [
- ofproto_parser.OFPActionPushVlan(push_vlan),
- ]
- inst = [
- ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions),
- ]
-
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_pop_vlan(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- self._set_val('apply_actions',
- 'pop_vlan',
- 'pop_vlan')
-
- match = ofproto_parser.OFPMatch()
- actions = [
- ofproto_parser.OFPActionPopVlan(),
- ]
- inst = [
- ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions),
- ]
-
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_push_mpls(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- push_mpls = ETH_TYPE_MPLS
- self._set_val('apply_actions',
- 'push_mpls',
- hex(push_mpls))
-
- match = ofproto_parser.OFPMatch()
- actions = [
- ofproto_parser.OFPActionPushMpls(push_mpls),
- ]
- inst = [
- ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions),
- ]
-
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_pop_mpls(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- pop_mpls = nx_match.ETH_TYPE_IP
- self._set_val('apply_actions',
- 'pop_mpls',
- hex(pop_mpls))
-
- match = ofproto_parser.OFPMatch()
- actions = [
- ofproto_parser.OFPActionPopMpls(pop_mpls),
- ]
- inst = [
- ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions),
- ]
-
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
+ def test_action_push_vlan(self, dp):
+ ethertype = nx_match.ETH_TYPE_VLAN
+ self._verify = [dp.ofproto.OFPAT_PUSH_VLAN,
+ 'ethertype', ethertype]
- # Test of Set-Filed Actions
- def test_action_set_field_dl_dst(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ actions = [dp.ofproto_parser.OFPActionPushVlan(ethertype)]
+ self.add_apply_actions(dp, actions)
+
+ def test_action_pop_vlan(self, dp):
+ self._verify = [dp.ofproto.OFPAT_POP_VLAN, ]
+
+ actions = [dp.ofproto_parser.OFPActionPopVlan(), ]
+ self.add_apply_actions(dp, actions)
+
+ def test_action_push_mpls(self, dp):
+ ethertype = ETH_TYPE_MPLS
+ self._verify = [dp.ofproto.OFPAT_PUSH_MPLS,
+ 'ethertype', ethertype]
+
+ actions = [dp.ofproto_parser.OFPActionPushMpls(ethertype), ]
+ self.add_apply_actions(dp, actions)
+
+ def test_action_pop_mpls(self, dp):
+ ethertype = nx_match.ETH_TYPE_VLAN
+ self._verify = [dp.ofproto.OFPAT_POP_MPLS,
+ 'ethertype', ethertype]
+ actions = [dp.ofproto_parser.OFPActionPopMpls(ethertype), ]
+ self.add_apply_actions(dp, actions)
+ # Test of Set-Filed Actions
+ def test_action_set_field_dl_dst(self, dp):
+ field = dp.ofproto.OXM_OF_ETH_DST
dl_dst = 'e2:7a:09:79:0b:0f'
- self._set_val('apply_actions',
- 'set_field',
- dl_dst + '->eth_dst')
-
- dl_dst_bin = self.haddr_to_bin(dl_dst)
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_ETH_DST, dl_dst_bin)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_dl_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ value = self.haddr_to_bin(dl_dst)
+
+ self.add_set_field_action(dp, field, value)
+ def test_action_set_field_dl_src(self, dp):
+ field = dp.ofproto.OXM_OF_ETH_SRC
dl_src = '08:82:63:b6:62:05'
- self._set_val('apply_actions',
- 'set_field',
- dl_src + '->eth_src')
- dl_src_bin = self.haddr_to_bin(dl_src)
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_ETH_SRC, dl_src_bin)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_dl_type(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- dl_type = nx_match.ETH_TYPE_IPV6
- self._set_val('apply_actions',
- 'set_field',
- hex(dl_type) + '->eth_type')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_ETH_TYPE, dl_type)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_vlan_vid(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- dl_vlan = 0x1e4
- self._set_val('apply_actions',
- 'set_field',
- str(dl_vlan) + '->dl_vlan')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_VLAN_VID, dl_vlan)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_vlan_pcp(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- vlan_pcp = 3
- self._set_val('apply_actions',
- 'set_field',
- str(vlan_pcp) + '->dl_vlan_pcp')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_VLAN_PCP, vlan_pcp)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_nw_dscp(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- nw_dscp = 32
- self._set_val('apply_actions',
- 'set_field',
- str(nw_dscp) + '->nw_tos')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_IP_DSCP, nw_dscp)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_nw_ecn(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- nw_ecn = 1
- self._set_val('apply_actions',
- 'set_field',
- str(nw_ecn) + '->nw_ecn')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_IP_ECN, nw_ecn)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_ip_proto(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- ip_proto = IPPROTO_TCP
- self._set_val('apply_actions',
- 'set_field',
- str(ip_proto) + '->nw_proto')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_IP_PROTO, ip_proto)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_ipv4_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ value = self.haddr_to_bin(dl_src)
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_dl_type(self, dp):
+ field = dp.ofproto.OXM_OF_ETH_TYPE
+ value = nx_match.ETH_TYPE_IPV6
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_vlan_vid(self, dp):
+ field = dp.ofproto.OXM_OF_VLAN_VID
+ value = 0x1e4
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_vlan_pcp(self, dp):
+ field = dp.ofproto.OXM_OF_VLAN_PCP
+ value = 3
+
+ self.add_set_field_action(dp, field, value)
+ def test_action_set_field_nw_dscp(self, dp):
+ field = dp.ofproto.OXM_OF_IP_DSCP
+ value = 32
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_nw_ecn(self, dp):
+ field = dp.ofproto.OXM_OF_IP_ECN
+ value = 1
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_ip_proto(self, dp):
+ field = dp.ofproto.OXM_OF_IP_PROTO
+ value = IPPROTO_TCP
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_ipv4_src(self, dp):
+ field = dp.ofproto.OXM_OF_IPV4_SRC
ipv4_src = '192.168.3.92'
- self._set_val('apply_actions',
- 'set_field',
- ipv4_src + '->ip_src')
- ipv4_src_int = self.ipv4_to_int(ipv4_src)
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_IPV4_SRC, ipv4_src_int)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_ipv4_dst(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ value = self.ipv4_to_int(ipv4_src)
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_ipv4_dst(self, dp):
+ field = dp.ofproto.OXM_OF_IPV4_DST
ipv4_dst = '192.168.74.122'
- self._set_val('apply_actions',
- 'set_field',
- ipv4_dst + '->ip_dst')
- ipv4_dst_int = self.ipv4_to_int(ipv4_dst)
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_IPV4_DST, ipv4_dst_int)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_tcp_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- tcp_src = 105
- self._set_val('apply_actions',
- 'set_field',
- str(tcp_src) + '->tcp_src')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_TCP_SRC, tcp_src)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_tcp_dst(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- tcp_dst = 75
- self._set_val('apply_actions',
- 'set_field',
- str(tcp_dst) + '->tcp_dst')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_TCP_DST, tcp_dst)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_udp_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- udp_src = 197
- self._set_val('apply_actions',
- 'set_field',
- str(udp_src) + '->udp_src')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_UDP_SRC, udp_src)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_udp_dst(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- udp_dst = 17
- self._set_val('apply_actions',
- 'set_field',
- str(udp_dst) + '->udp_dst')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_UDP_DST, udp_dst)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_icmpv4_type(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- icmp_type = 8
- self._set_val('apply_actions',
- 'set_field',
- str(icmp_type) + '->icmp_type')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_ICMPV4_TYPE, icmp_type)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_icmpv4_code(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- icmp_code = 2
- self._set_val('apply_actions',
- 'set_field',
- str(icmp_code) + '->icmp_code')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_ICMPV4_CODE, icmp_code)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_arp_op(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- arp_op = 2
- self._set_val('apply_actions',
- 'set_field',
- str(arp_op) + '->arp_op')
-
- f1 = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_ARP_OP, arp_op)
- actions = [
- ofproto_parser.OFPActionSetField(f1),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_arp_spa(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ value = self.ipv4_to_int(ipv4_dst)
+
+ self.add_set_field_action(dp, field, value)
+ def test_action_set_field_tcp_src(self, dp):
+ field = dp.ofproto.OXM_OF_TCP_SRC
+ value = 105
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_tcp_dst(self, dp):
+ field = dp.ofproto.OXM_OF_TCP_DST
+ value = 75
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_udp_src(self, dp):
+ field = dp.ofproto.OXM_OF_UDP_SRC
+ value = 197
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_udp_dst(self, dp):
+ field = dp.ofproto.OXM_OF_UDP_DST
+ value = 17
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_icmpv4_type(self, dp):
+ field = dp.ofproto.OXM_OF_ICMPV4_TYPE
+ value = 8
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_icmpv4_code(self, dp):
+ field = dp.ofproto.OXM_OF_ICMPV4_CODE
+ value = 2
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_arp_op(self, dp):
+ field = dp.ofproto.OXM_OF_ARP_OP
+ value = 2
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_arp_spa(self, dp):
+ field = dp.ofproto.OXM_OF_ARP_SPA
nw_src = '192.168.132.179'
- nw_src_int = self.ipv4_to_int(nw_src)
- self._set_val('apply_actions',
- 'set_field',
- str(nw_src) + '->arp_spa')
-
- f1 = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_ARP_SPA, nw_src_int)
- actions = [
- ofproto_parser.OFPActionSetField(f1),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_arp_tpa(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ value = self.ipv4_to_int(nw_src)
+
+ self.add_set_field_action(dp, field, value)
+ def test_action_set_field_arp_tpa(self, dp):
+ field = dp.ofproto.OXM_OF_ARP_TPA
nw_dst = '192.168.118.85'
- nw_dst_int = self.ipv4_to_int(nw_dst)
- self._set_val('apply_actions',
- 'set_field',
- str(nw_dst) + '->arp_tpa')
-
- f1 = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_ARP_TPA, nw_dst_int)
- actions = [
- ofproto_parser.OFPActionSetField(f1),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_arp_sha(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ value = self.ipv4_to_int(nw_dst)
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_arp_sha(self, dp):
+ field = dp.ofproto.OXM_OF_ARP_SHA
arp_sha = '50:29:e7:7f:6c:7f'
- arp_sha_bin = self.haddr_to_bin(arp_sha)
- self._set_val('apply_actions',
- 'set_field',
- arp_sha + '->arp_sha')
-
- f1 = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_ARP_SHA, arp_sha_bin)
- actions = [
- ofproto_parser.OFPActionSetField(f1),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_arp_tha(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ value = self.haddr_to_bin(arp_sha)
+
+ self.add_set_field_action(dp, field, value)
+ def test_action_set_field_arp_tha(self, dp):
+ field = dp.ofproto.OXM_OF_ARP_THA
arp_tha = '71:c8:72:2f:47:fd'
- arp_tha_bin = self.haddr_to_bin(arp_tha)
- self._set_val('apply_actions',
- 'set_field',
- arp_tha + '->arp_tha')
-
- f1 = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_ARP_THA, arp_tha_bin)
- actions = [
- ofproto_parser.OFPActionSetField(f1),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_ipv6_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ value = self.haddr_to_bin(arp_tha)
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_ipv6_src(self, dp):
+ field = dp.ofproto.OXM_OF_IPV6_SRC
ipv6_src = '7527:c798:c772:4a18:117a:14ff:c1b6:e4ef'
- ipv6_src_int = self.ipv6_to_int(ipv6_src)
- self._set_val('apply_actions',
- 'set_field',
- ipv6_src + '->ipv6_src')
-
- f1 = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_IPV6_SRC, ipv6_src_int)
- actions = [
- ofproto_parser.OFPActionSetField(f1),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_ipv6_dst(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ value = self.ipv6_to_int(ipv6_src)
+
+ self.add_set_field_action(dp, field, value)
+ def test_action_set_field_ipv6_dst(self, dp):
+ field = dp.ofproto.OXM_OF_IPV6_DST
ipv6_dst = '8893:65b3:6b49:3bdb:3d2:9401:866c:c96'
- ipv6_dst_int = self.ipv6_to_int(ipv6_dst)
- self._set_val('apply_actions',
- 'set_field',
- ipv6_dst + '->ipv6_dst')
-
- f1 = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_IPV6_DST, ipv6_dst_int)
- actions = [
- ofproto_parser.OFPActionSetField(f1),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_ipv6_flabel(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- flabel = 0x2c12
- self._set_val('apply_actions',
- 'set_field',
- hex(flabel) + '->ipv6_label')
-
- f1 = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_IPV6_FLABEL, flabel)
- actions = [
- ofproto_parser.OFPActionSetField(f1),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_icmpv6_type(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- icmp_type = 129
- self._set_val('apply_actions',
- 'set_field',
- str(icmp_type) + '->icmpv6_type')
-
- f1 = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_ICMPV6_TYPE, icmp_type)
- actions = [
- ofproto_parser.OFPActionSetField(f1),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_icmpv6_code(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- icmp_code = 129
- self._set_val('apply_actions',
- 'set_field',
- str(icmp_code) + '->icmpv6_code')
-
- f1 = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_ICMPV6_CODE, icmp_code)
- actions = [
- ofproto_parser.OFPActionSetField(f1),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_ipv6_nd_target(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ value = self.ipv6_to_int(ipv6_dst)
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_ipv6_flabel(self, dp):
+ field = dp.ofproto.OXM_OF_IPV6_FLABEL
+ value = 0x2c12
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_icmpv6_type(self, dp):
+ field = dp.ofproto.OXM_OF_ICMPV6_TYPE
+ value = 129
+
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_icmpv6_code(self, dp):
+ field = dp.ofproto.OXM_OF_ICMPV6_CODE
+ value = 2
+
+ self.add_set_field_action(dp, field, value)
+ def test_action_set_field_ipv6_nd_target(self, dp):
+ field = dp.ofproto.OXM_OF_IPV6_ND_TARGET
target = "5420:db3f:921b:3e33:2791:98f:dd7f:2e19"
- target_int = self.ipv6_to_int(target)
- self._set_val('apply_actions',
- 'set_field',
- target + '->nd_target')
-
- f1 = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_IPV6_ND_TARGET, target_int)
- actions = [
- ofproto_parser.OFPActionSetField(f1),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_ipv6_nd_sll(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ value = self.ipv6_to_int(target)
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_ipv6_nd_sll(self, dp):
+ field = dp.ofproto.OXM_OF_IPV6_ND_SLL
sll = "54:db:3f:3e:27:19"
- sll_bin = self.haddr_to_bin(sll)
- self._set_val('apply_actions',
- 'set_field',
- sll + '->nd_sll')
-
- f1 = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_IPV6_ND_SLL, sll_bin)
- actions = [
- ofproto_parser.OFPActionSetField(f1),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_ipv6_nd_tll(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ value = self.haddr_to_bin(sll)
+
+ self.add_set_field_action(dp, field, value)
+ def test_action_set_field_ipv6_nd_tll(self, dp):
+ field = dp.ofproto.OXM_OF_IPV6_ND_TLL
tll = "83:13:48:1e:d0:b0"
- tll_bin = self.haddr_to_bin(tll)
- self._set_val('apply_actions',
- 'set_field',
- tll + '->nd_tll')
-
- f1 = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_IPV6_ND_TLL, tll_bin)
- actions = [
- ofproto_parser.OFPActionSetField(f1),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_mpls_label(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- label = 0x4cd41
- self._set_val('apply_actions',
- 'set_field',
- str(label) + '->mpls_label')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_MPLS_LABEL, label)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_set_field_mpls_tc(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- tc = 0b101
- self._set_val('apply_actions',
- 'set_field',
- str(tc) + '->mpls_tc')
-
- field = ofproto_parser.OFPMatchField.make(
- ofproto.OXM_OF_MPLS_TC, tc)
- actions = [
- ofproto_parser.OFPActionSetField(field),
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
+ value = self.haddr_to_bin(tll)
- # Test of Change-TTL Actions
- def test_action_set_mpls_ttl(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_mpls_label(self, dp):
+ field = dp.ofproto.OXM_OF_MPLS_LABEL
+ value = 0x4cd41
+ self.add_set_field_action(dp, field, value)
+
+ def test_action_set_field_mpls_tc(self, dp):
+ field = dp.ofproto.OXM_OF_MPLS_TC
+ value = 0b101
+
+ self.add_set_field_action(dp, field, value)
+
+ # Test of Change-TTL Actions
+ def test_action_set_mpls_ttl(self, dp):
mpls_ttl = 8
- self._set_val('apply_actions',
- 'set_mpls_ttl',
- str(mpls_ttl))
-
- actions = [
- ofproto_parser.OFPActionSetMplsTtl(mpls_ttl)
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_dec_mpls_ttl(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- self._set_val('apply_actions',
- 'dec_mpls_ttl',
- 'dec_mpls_ttl')
- actions = [
- ofproto_parser.OFPActionDecMplsTtl()
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_copy_ttl_out(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- self._set_val('apply_actions',
- 'copy_ttl_out',
- 'copy_ttl_out')
- actions = [
- ofproto_parser.OFPActionCopyTtlOut()
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
-
- def test_action_copy_ttl_in(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- self._set_val('apply_actions',
- 'copy_ttl_in',
- 'copy_ttl_in')
- actions = [
- ofproto_parser.OFPActionCopyTtlIn()
- ]
- inst = [ofproto_parser.OFPInstructionActions(
- ofproto.OFPIT_APPLY_ACTIONS, actions)]
-
- match = ofproto_parser.OFPMatch()
- m = ofproto_parser.OFPFlowMod(
- datapath, 0, 0, 0, ofproto.OFPFC_ADD, 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff, 0, match, inst)
- datapath.send_msg(m)
+ self._verify = [dp.ofproto.OFPAT_SET_MPLS_TTL,
+ 'mpls_ttl', mpls_ttl]
+ actions = [dp.ofproto_parser.OFPActionSetMplsTtl(mpls_ttl), ]
+ self.add_apply_actions(dp, actions)
+
+ def test_action_dec_mpls_ttl(self, dp):
+ self._verify = [dp.ofproto.OFPAT_DEC_MPLS_TTL]
+ actions = [dp.ofproto_parser.OFPActionDecMplsTtl(), ]
+ self.add_apply_actions(dp, actions)
+
+ def test_action_copy_ttl_out(self, dp):
+ self._verify = [dp.ofproto.OFPAT_COPY_TTL_OUT]
+ actions = [dp.ofproto_parser.OFPActionCopyTtlOut(), ]
+ self.add_apply_actions(dp, actions)
+
+ def test_action_copy_ttl_in(self, dp):
+ self._verify = [dp.ofproto.OFPAT_COPY_TTL_IN]
+ actions = [dp.ofproto_parser.OFPActionCopyTtlIn(), ]
+ self.add_apply_actions(dp, actions)
diff --git a/ryu/tests/integrated/test_add_flow_v12_matches.py b/ryu/tests/integrated/test_add_flow_v12_matches.py
index fac9c3ec..cb26330f 100644
--- a/ryu/tests/integrated/test_add_flow_v12_matches.py
+++ b/ryu/tests/integrated/test_add_flow_v12_matches.py
@@ -16,16 +16,13 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import logging
+import itertools
-from ryu.controller import handler
-from ryu.controller.handler import set_ev_cls
-from ryu.tests.integrated import tester
from ryu.ofproto import nx_match
+from ryu.tests.integrated import tester
LOG = logging.getLogger(__name__)
-_target = 'br-tester'
-
IPPROTO_ICMP = 1
IPPROTO_TCP = 6
IPPROTO_UDP = 17
@@ -38,2000 +35,1070 @@ IPPROTO_DSTOPTS = 60
IPPROTO_SCTP = 132
-class RunTest(tester.RunTestBase):
+class RunTest(tester.TestFlowBase):
""" Test case for add flows of Matches
"""
- def __init__(self):
- super(RunTest, self).__init__()
-
- def _set_val(self, type_, name, val):
- self.set_val('type', type_)
- self.set_val('name', name)
- self.set_val('val', val)
-
- def _get_val(self):
- type_ = self.get_val('type')
- name = self.get_val('name')
- val = self.get_val('val')
- return (type_, name, val)
-
- def _check_default(self):
- type_, name, val = self._get_val()
-
- ovs_flow = {}
- try:
- ovs_flow = self.get_ovs_flows(_target)[0][type_]
- except (KeyError, IndexError):
- pass
-
- ng = 0
- if val == None:
- if name in ovs_flow:
- ng = 1
- elif not name in ovs_flow or val != ovs_flow[name]:
- ng = 1
-
- if ng:
- err = ' send (%s:%s=%s)\n flow (%s:%s)' \
- % (type_, name, val, type_, self.cnv_txt(ovs_flow))
- self.results(ret=False, msg=err)
+ def __init__(self, *args, **kwargs):
+ super(RunTest, self).__init__(*args, **kwargs)
+
+ self._verify = []
+
+ def add_matches(self, dp, match):
+ m = dp.ofproto_parser.OFPFlowMod(dp, 0, 0, 0,
+ dp.ofproto.OFPFC_ADD,
+ 0, 0, 0, 0xffffffff,
+ dp.ofproto.OFPP_ANY,
+ 0xffffffff, 0, match, [])
+ dp.send_msg(m)
+
+ def verify_default(self, dp, stats):
+ verify = self._verify
+ self._verify = []
+
+ headers = value = mask = None
+ if len(verify) == 3:
+ (headers, value, mask, ) = verify
else:
- self.results()
-
- def test_rule_set_in_port(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- in_port = 42123
- self._set_val('rules',
- 'in_port',
- str(in_port))
-
- match = ofproto_parser.OFPMatch()
- match.set_in_port(in_port)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_dl_dst(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ return "self._verify is invalid."
+
+ f_value = f_mask = None
+ for f in stats[0].match.fields:
+ if f.header in headers:
+ f_value = f.value
+ if len(headers) == 2:
+ f_mask = f.mask
+ break
+
+ if f_value == value and f_mask == mask:
+ return True
+ elif value == None:
+ return "Field[%s] is setting." % (headers, )
+ else:
+ return "Value error. send: (%s/%s), val:(%s/%s)" \
+ % (value, mask, f_value, f_mask)
+ def test_rule_set_dl_dst(self, dp):
dl_dst = 'e2:7a:09:79:0b:0f'
- self._set_val('rules',
- 'dl_dst',
- dl_dst)
dl_dst_bin = self.haddr_to_bin(dl_dst)
- match = ofproto_parser.OFPMatch()
- match.set_dl_dst(dl_dst_bin)
- inst = []
+ self._verify = [(dp.ofproto.OXM_OF_ETH_DST,
+ dp.ofproto.OXM_OF_ETH_DST_W, ),
+ dl_dst_bin, None]
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_dl_dst_masked_ff(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ match = dp.ofproto_parser.OFPMatch()
+ match.set_dl_dst(dl_dst_bin)
+ self.add_matches(dp, match)
+ def test_rule_set_dl_dst_masked_ff(self, dp):
dl_dst = 'd0:98:79:b4:75:b5'
- self._set_val('rules',
- 'dl_dst',
- dl_dst)
dl_dst_bin = self.haddr_to_bin(dl_dst)
-
mask = 'ff:ff:ff:ff:ff:ff'
mask_bin = self.haddr_to_bin(mask)
- match = ofproto_parser.OFPMatch()
- match.set_dl_dst_masked(dl_dst_bin, mask_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_dl_dst_masked_f0(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self._verify = [(dp.ofproto.OXM_OF_ETH_DST,
+ dp.ofproto.OXM_OF_ETH_DST_W, ),
+ dl_dst_bin, None]
- dl_dst = 'a6:cf:40:9d:72:ec'
- mask = 'ff:ff:ff:ff:ff:00'
- self._set_val('rules',
- 'dl_dst',
- dl_dst[:-2] + '00' + '/' + mask)
+ match = dp.ofproto_parser.OFPMatch()
+ match.set_dl_dst_masked(dl_dst_bin, mask_bin)
+ self.add_matches(dp, match)
+ def test_rule_set_dl_dst_masked_f0(self, dp):
+ dl_dst = 'e2:7a:09:79:0b:0f'
dl_dst_bin = self.haddr_to_bin(dl_dst)
+ mask = 'ff:ff:ff:ff:ff:00'
mask_bin = self.haddr_to_bin(mask)
- match = ofproto_parser.OFPMatch()
- match.set_dl_dst_masked(dl_dst_bin, mask_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_dl_dst_masked_00(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self._verify = [(dp.ofproto.OXM_OF_ETH_DST,
+ dp.ofproto.OXM_OF_ETH_DST_W, ),
+ dl_dst_bin[:-1] + '\x00', mask_bin]
- dl_dst = 'c6:12:6a:ae:da:0a'
- mask = '00:00:00:00:00:00'
- self._set_val('rules',
- 'dl_dst',
- None)
+ match = dp.ofproto_parser.OFPMatch()
+ match.set_dl_dst_masked(dl_dst_bin, mask_bin)
+ self.add_matches(dp, match)
+ def test_rule_set_dl_dst_masked_00(self, dp):
+ dl_dst = 'e2:7a:09:79:0b:0f'
dl_dst_bin = self.haddr_to_bin(dl_dst)
+ mask = '00:00:00:00:00:00'
mask_bin = self.haddr_to_bin(mask)
- match = ofproto_parser.OFPMatch()
- match.set_dl_dst_masked(dl_dst_bin, mask_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
+ self._verify = [(dp.ofproto.OXM_OF_ETH_DST,
+ dp.ofproto.OXM_OF_ETH_DST_W, ),
+ None, None]
- def test_rule_set_dl_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ match = dp.ofproto_parser.OFPMatch()
+ match.set_dl_dst_masked(dl_dst_bin, mask_bin)
+ self.add_matches(dp, match)
+ def test_rule_set_dl_src(self, dp):
dl_src = 'e2:7a:09:79:0b:0f'
- self._set_val('rules',
- 'dl_src',
- dl_src)
dl_src_bin = self.haddr_to_bin(dl_src)
- match = ofproto_parser.OFPMatch()
+ self._verify = [(dp.ofproto.OXM_OF_ETH_SRC,
+ dp.ofproto.OXM_OF_ETH_SRC_W, ),
+ dl_src_bin, None]
+
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_src(dl_src_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_dl_src_masked_ff(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- dl_src = 'd0:98:79:b4:75:b5'
- self._set_val('rules',
- 'dl_src',
- dl_src)
- dl_src_bin = self.haddr_to_bin(dl_src)
+ self.add_matches(dp, match)
+ def test_rule_set_dl_src_masked_ff(self, dp):
+ dl_src = 'e2:7a:09:79:0b:0f'
+ dl_src_bin = self.haddr_to_bin(dl_src)
mask = 'ff:ff:ff:ff:ff:ff'
mask_bin = self.haddr_to_bin(mask)
- match = ofproto_parser.OFPMatch()
- match.set_dl_src_masked(dl_src_bin, mask_bin)
- inst = []
+ self._verify = [(dp.ofproto.OXM_OF_ETH_SRC,
+ dp.ofproto.OXM_OF_ETH_SRC_W, ),
+ dl_src_bin, None]
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_dl_src_masked_f0(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- dl_src = 'a6:cf:40:9d:72:ec'
- mask = 'ff:ff:ff:ff:ff:00'
- self._set_val('rules',
- 'dl_src',
- dl_src[:-2] + '00' + '/' + mask)
+ match = dp.ofproto_parser.OFPMatch()
+ match.set_dl_src_masked(dl_src_bin, mask_bin)
+ self.add_matches(dp, match)
+ def test_rule_set_dl_src_masked_f0(self, dp):
+ dl_src = 'e2:7a:09:79:0b:0f'
dl_src_bin = self.haddr_to_bin(dl_src)
+ mask = 'ff:ff:ff:ff:ff:00'
mask_bin = self.haddr_to_bin(mask)
- match = ofproto_parser.OFPMatch()
- match.set_dl_src_masked(dl_src_bin, mask_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_dl_src_masked_00(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self._verify = [(dp.ofproto.OXM_OF_ETH_SRC,
+ dp.ofproto.OXM_OF_ETH_SRC_W, ),
+ dl_src_bin[:-1] + '\x00', mask_bin]
- dl_src = 'c6:12:6a:ae:da:0a'
- mask = '00:00:00:00:00:00'
- self._set_val('rules',
- 'dl_src',
- None)
+ match = dp.ofproto_parser.OFPMatch()
+ match.set_dl_src_masked(dl_src_bin, mask_bin)
+ self.add_matches(dp, match)
+ def test_rule_set_dl_src_masked_00(self, dp):
+ dl_src = 'e2:7a:09:79:0b:0f'
dl_src_bin = self.haddr_to_bin(dl_src)
+ mask = '00:00:00:00:00:00'
mask_bin = self.haddr_to_bin(mask)
- match = ofproto_parser.OFPMatch()
- match.set_dl_src_masked(dl_src_bin, mask_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
+ self._verify = [(dp.ofproto.OXM_OF_ETH_SRC,
+ dp.ofproto.OXM_OF_ETH_SRC_W, ),
+ None, None]
- def test_rule_set_dl_type_ip(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ match = dp.ofproto_parser.OFPMatch()
+ match.set_dl_src_masked(dl_src_bin, mask_bin)
+ self.add_matches(dp, match)
+ def test_rule_set_dl_type_ip(self, dp):
dl_type = nx_match.ETH_TYPE_IP
- self._set_val('rules',
- 'ip',
- 'ip')
+ self._verify = [(dp.ofproto.OXM_OF_ETH_TYPE, ),
+ dl_type, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_dl_type_arp(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_dl_type_arp(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
- self._set_val('rules',
- 'arp',
- 'arp')
+ self._verify = [(dp.ofproto.OXM_OF_ETH_TYPE, ),
+ dl_type, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_dl_type_vlan(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_dl_type_vlan(self, dp):
dl_type = nx_match.ETH_TYPE_VLAN
- self._set_val('rules',
- 'dl_type',
- hex(dl_type))
+ self._verify = [(dp.ofproto.OXM_OF_ETH_TYPE, ),
+ dl_type, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_dl_type_ipv6(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_dl_type_ipv6(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
- self._set_val('rules',
- 'ipv6',
- 'ipv6')
+ self._verify = [(dp.ofproto.OXM_OF_ETH_TYPE, ),
+ dl_type, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_dl_type_lacp(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_dl_type_lacp(self, dp):
dl_type = nx_match.ETH_TYPE_LACP
- self._set_val('rules',
- 'dl_type',
- hex(dl_type))
+ self._verify = [(dp.ofproto.OXM_OF_ETH_TYPE, ),
+ dl_type, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
+ self.add_matches(dp, match)
- def test_rule_set_ip_dscp(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- dl_type = nx_match.ETH_TYPE_IP
+ def test_rule_set_ip_dscp(self, dp):
ip_dscp = 36
- self._set_val('rules',
- 'nw_tos',
- str(ip_dscp))
+ dl_type = nx_match.ETH_TYPE_IP
+ self._verify = [(dp.ofproto.OXM_OF_IP_DSCP, ),
+ ip_dscp, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_dscp(ip_dscp)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_vlan_vid(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
-
- vlan_vid = 0b101010101010
- self._set_val('rules',
- 'dl_vlan',
- str(vlan_vid))
-
- match = ofproto_parser.OFPMatch()
- match.set_vlan_vid(vlan_vid)
- inst = []
+ self.add_matches(dp, match)
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
+ def test_rule_set_vlan_vid(self, dp):
+ vlan_vid = 0x4ef
+ self._verify = [(dp.ofproto.OXM_OF_VLAN_VID,
+ dp.ofproto.OXM_OF_VLAN_VID_W, ),
+ vlan_vid, None]
- def test_rule_set_vlan_vid_masked_ff(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ match = dp.ofproto_parser.OFPMatch()
+ match.set_vlan_vid(vlan_vid)
+ self.add_matches(dp, match)
+ def test_rule_set_vlan_vid_masked_ff(self, dp):
vlan_vid = 0x4ef
mask = 0xfff
- self._set_val('rules',
- 'dl_vlan',
- str(vlan_vid))
+ self._verify = [(dp.ofproto.OXM_OF_VLAN_VID,
+ dp.ofproto.OXM_OF_VLAN_VID_W, ),
+ vlan_vid, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_vlan_vid_masked(vlan_vid, mask)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_vlan_vid_masked_f0(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
- vlan_vid = 0x7f
+ def test_rule_set_vlan_vid_masked_f0(self, dp):
+ vlan_vid = 0x4ef
mask = 0xff0
- # OVS set CFI filed is '1'
- self._set_val('rules',
- 'vlan_tci',
- '0x1070/0x1ff0')
+ self._verify = [(dp.ofproto.OXM_OF_VLAN_VID,
+ dp.ofproto.OXM_OF_VLAN_VID_W, ),
+ vlan_vid & mask, mask]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_vlan_vid_masked(vlan_vid, mask)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_vlan_vid_masked_00(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_vlan_vid_masked_00(self, dp):
vlan_vid = 0x4ef
mask = 0x000
- self._set_val('rules',
- 'vlan_vid',
- None)
+ self._verify = [(dp.ofproto.OXM_OF_VLAN_VID,
+ dp.ofproto.OXM_OF_VLAN_VID_W, ),
+ None, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_vlan_vid_masked(vlan_vid, mask)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_vlan_pcp(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
- vlan_vid = 4023
+ def test_rule_set_vlan_pcp(self, dp):
+ vlan_vid = 0x4ef
vlan_pcp = 5
- self._set_val('rules',
- 'dl_vlan_pcp',
- str(vlan_pcp))
+ self._verify = [(dp.ofproto.OXM_OF_VLAN_PCP, ),
+ vlan_pcp, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_vlan_vid(vlan_vid)
match.set_vlan_pcp(vlan_pcp)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ip_ecn(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ip_ecn(self, dp):
dl_type = nx_match.ETH_TYPE_IP
ip_ecn = 3
- self._set_val('rules',
- 'nw_ecn',
- str(ip_ecn))
+ self._verify = [(dp.ofproto.OXM_OF_IP_ECN, ),
+ ip_ecn, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_ecn(ip_ecn)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ip_proto_icmp(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ip_proto_icmp(self, dp):
dl_type = nx_match.ETH_TYPE_IP
ip_proto = IPPROTO_ICMP
- self._set_val('rules',
- 'icmp',
- 'icmp')
+ self._verify = [(dp.ofproto.OXM_OF_IP_PROTO, ),
+ ip_proto, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ip_proto_tcp(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ip_proto_tcp(self, dp):
dl_type = nx_match.ETH_TYPE_IP
ip_proto = IPPROTO_TCP
- self._set_val('rules',
- 'tcp',
- 'tcp')
+ self._verify = [(dp.ofproto.OXM_OF_IP_PROTO, ),
+ ip_proto, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ip_proto_udp(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ip_proto_udp(self, dp):
dl_type = nx_match.ETH_TYPE_IP
ip_proto = IPPROTO_UDP
- self._set_val('rules',
- 'udp',
- 'udp')
+ self._verify = [(dp.ofproto.OXM_OF_IP_PROTO, ),
+ ip_proto, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ip_proto_ipv6_route(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ip_proto_ipv6_route(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ip_proto = IPPROTO_ROUTING
- self._set_val('rules',
- 'nw_proto',
- str(ip_proto))
+ self._verify = [(dp.ofproto.OXM_OF_IP_PROTO, ),
+ ip_proto, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ip_proto_ipv6_frag(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ip_proto_ipv6_frag(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ip_proto = IPPROTO_FRAGMENT
- self._set_val('rules',
- 'nw_proto',
- str(ip_proto))
+ self._verify = [(dp.ofproto.OXM_OF_IP_PROTO, ),
+ ip_proto, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ip_proto_ipv6_icmp(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ip_proto_ipv6_icmp(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ip_proto = IPPROTO_ICMPV6
- self._set_val('rules',
- 'icmp6',
- 'icmp6')
+ self._verify = [(dp.ofproto.OXM_OF_IP_PROTO, ),
+ ip_proto, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ip_proto_ipv6_none(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ip_proto_ipv6_none(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ip_proto = IPPROTO_NONE
- self._set_val('rules',
- 'nw_proto',
- str(ip_proto))
+ self._verify = [(dp.ofproto.OXM_OF_IP_PROTO, ),
+ ip_proto, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ip_proto_ipv6_dstopts(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ip_proto_ipv6_dstopts(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ip_proto = IPPROTO_DSTOPTS
- self._set_val('rules',
- 'nw_proto',
- str(ip_proto))
+ self._verify = [(dp.ofproto.OXM_OF_IP_PROTO, ),
+ ip_proto, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv4_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv4_src(self, dp):
dl_type = nx_match.ETH_TYPE_IP
src = '192.168.196.250'
src_int = self.ipv4_to_int(src)
- self._set_val('rules',
- 'nw_src',
- src)
+ self._verify = [(dp.ofproto.OXM_OF_IPV4_SRC,
+ dp.ofproto.OXM_OF_IPV4_SRC_W, ),
+ src_int, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv4_src(src_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv4_src_masked_32(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv4_src_masked_32(self, dp):
dl_type = nx_match.ETH_TYPE_IP
- src = '192.168.98.73'
- mask = '255.255.255.255'
- self._set_val('rules',
- 'nw_src',
- src)
-
+ src = '192.168.196.250'
src_int = self.ipv4_to_int(src)
+ mask = '255.255.255.255'
mask_int = self.ipv4_to_int(mask)
+ self._verify = [(dp.ofproto.OXM_OF_IPV4_SRC,
+ dp.ofproto.OXM_OF_IPV4_SRC_W, ),
+ src_int, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv4_src_masked(src_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv4_src_masked_24(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv4_src_masked_24(self, dp):
dl_type = nx_match.ETH_TYPE_IP
- src = '192.168.188.254'
- mask = '255.255.255.0'
- self._set_val('rules',
- 'nw_src',
- src[:-3] + '0/24')
-
+ src = '192.168.196.250'
src_int = self.ipv4_to_int(src)
+ mask = '255.255.255.0'
mask_int = self.ipv4_to_int(mask)
+ self._verify = [(dp.ofproto.OXM_OF_IPV4_SRC,
+ dp.ofproto.OXM_OF_IPV4_SRC_W, ),
+ src_int & mask_int, mask_int]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv4_src_masked(src_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv4_src_masked_0(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv4_src_masked_0(self, dp):
dl_type = nx_match.ETH_TYPE_IP
- src = '192.168.188.254'
- mask = '0.0.0.0'
- self._set_val('rules',
- 'nw_src',
- None)
-
+ src = '192.168.196.250'
src_int = self.ipv4_to_int(src)
+ mask = '0.0.0.0'
mask_int = self.ipv4_to_int(mask)
+ self._verify = [(dp.ofproto.OXM_OF_IPV4_SRC,
+ dp.ofproto.OXM_OF_IPV4_SRC_W, ),
+ None, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv4_src_masked(src_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv4_dst(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv4_dst(self, dp):
dl_type = nx_match.ETH_TYPE_IP
dst = '192.168.54.155'
- self._set_val('rules',
- 'nw_dst',
- dst)
dst_int = self.ipv4_to_int(dst)
+ self._verify = [(dp.ofproto.OXM_OF_IPV4_DST,
+ dp.ofproto.OXM_OF_IPV4_DST_W, ),
+ dst_int, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv4_dst(dst_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv4_dst_masked_32(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv4_dst_masked_32(self, dp):
dl_type = nx_match.ETH_TYPE_IP
dst = '192.168.54.155'
- mask = '255.255.255.255'
- self._set_val('rules',
- 'nw_dst',
- dst)
-
dst_int = self.ipv4_to_int(dst)
+ mask = '255.255.255.255'
mask_int = self.ipv4_to_int(mask)
+ self._verify = [(dp.ofproto.OXM_OF_IPV4_DST,
+ dp.ofproto.OXM_OF_IPV4_DST_W, ),
+ dst_int, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv4_dst_masked(dst_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv4_dst_masked_24(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv4_dst_masked_24(self, dp):
dl_type = nx_match.ETH_TYPE_IP
dst = '192.168.54.155'
- mask = '255.255.255.0'
- self._set_val('rules',
- 'nw_dst',
- dst[:-3] + '0/24')
-
dst_int = self.ipv4_to_int(dst)
+ mask = '255.255.255.0'
mask_int = self.ipv4_to_int(mask)
+ self._verify = [(dp.ofproto.OXM_OF_IPV4_DST,
+ dp.ofproto.OXM_OF_IPV4_DST_W, ),
+ dst_int & mask_int, mask_int]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv4_dst_masked(dst_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv4_dst_masked_0(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv4_dst_masked_0(self, dp):
dl_type = nx_match.ETH_TYPE_IP
dst = '192.168.54.155'
- mask = '0.0.0.0'
- self._set_val('rules',
- 'nw_dst',
- None)
-
dst_int = self.ipv4_to_int(dst)
+ mask = '0.0.0.0'
mask_int = self.ipv4_to_int(mask)
+ self._verify = [(dp.ofproto.OXM_OF_IPV4_DST,
+ dp.ofproto.OXM_OF_IPV4_DST_W, ),
+ None, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv4_dst_masked(dst_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_tcp_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_tcp_src(self, dp):
dl_type = nx_match.ETH_TYPE_IP
ip_proto = IPPROTO_TCP
tp_src = 1103
- self._set_val('rules',
- 'tp_src',
- str(tp_src))
+ self._verify = [(dp.ofproto.OXM_OF_TCP_SRC, ),
+ tp_src, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
match.set_tcp_src(tp_src)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_tcp_dst(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_tcp_dst(self, dp):
dl_type = nx_match.ETH_TYPE_IP
ip_proto = IPPROTO_TCP
tp_dst = 236
- self._set_val('rules',
- 'tp_dst',
- str(tp_dst))
+ self._verify = [(dp.ofproto.OXM_OF_TCP_DST, ),
+ tp_dst, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
match.set_tcp_dst(tp_dst)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_udp_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_udp_src(self, dp):
dl_type = nx_match.ETH_TYPE_IP
ip_proto = IPPROTO_UDP
tp_src = 56617
- self._set_val('rules',
- 'tp_src',
- str(tp_src))
+ self._verify = [(dp.ofproto.OXM_OF_UDP_SRC, ),
+ tp_src, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
match.set_udp_src(tp_src)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_udp_dst(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_udp_dst(self, dp):
dl_type = nx_match.ETH_TYPE_IP
ip_proto = IPPROTO_UDP
tp_dst = 61278
- self._set_val('rules',
- 'tp_dst',
- str(tp_dst))
+ self._verify = [(dp.ofproto.OXM_OF_UDP_DST, ),
+ tp_dst, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
match.set_udp_dst(tp_dst)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_icmpv4_type(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_icmpv4_type(self, dp):
dl_type = nx_match.ETH_TYPE_IP
ip_proto = IPPROTO_ICMP
- # type = 8: Echo Request
icmp_type = 8
- self._set_val('rules',
- 'icmp_type',
- str(icmp_type))
+ self._verify = [(dp.ofproto.OXM_OF_ICMPV4_TYPE, ),
+ icmp_type, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
match.set_icmpv4_type(icmp_type)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_icmpv4_code(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_icmpv4_code(self, dp):
dl_type = nx_match.ETH_TYPE_IP
ip_proto = IPPROTO_ICMP
- # type = 9 : Router Advertisement
- # code = 16: Does not route common traffic
icmp_type = 9
icmp_code = 16
- self._set_val('rules',
- 'icmp_code',
- str(icmp_code))
+ self._verify = [(dp.ofproto.OXM_OF_ICMPV4_CODE, ),
+ icmp_code, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
match.set_icmpv4_type(icmp_type)
match.set_icmpv4_code(icmp_code)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_opcode(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_opcode(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
arp_op = 1
- self._set_val('rules',
- 'arp_op',
- str(arp_op))
+ self._verify = [(dp.ofproto.OXM_OF_ARP_OP, ),
+ arp_op, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_opcode(arp_op)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_spa(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_spa(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
nw_src = '192.168.222.57'
nw_src_int = self.ipv4_to_int(nw_src)
- self._set_val('rules',
- 'nw_src',
- nw_src)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_SPA,
+ dp.ofproto.OXM_OF_ARP_SPA_W, ),
+ nw_src_int, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_spa(nw_src_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_spa_masked_32(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_spa_masked_32(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
nw_src = '192.168.222.57'
- mask = '255.255.255.255'
nw_src_int = self.ipv4_to_int(nw_src)
+ mask = '255.255.255.255'
mask_int = self.ipv4_to_int(mask)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_SPA,
+ dp.ofproto.OXM_OF_ARP_SPA_W, ),
+ nw_src_int, None]
- self._set_val('rules',
- 'nw_src',
- nw_src)
-
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_spa_masked(nw_src_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_spa_masked_24(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_spa_masked_24(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
nw_src = '192.168.222.57'
- mask = '255.255.255.0'
nw_src_int = self.ipv4_to_int(nw_src)
+ mask = '255.255.255.0'
mask_int = self.ipv4_to_int(mask)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_SPA,
+ dp.ofproto.OXM_OF_ARP_SPA_W, ),
+ nw_src_int & mask_int, mask_int]
- self._set_val('rules',
- 'nw_src',
- nw_src[:-2] + '0/24')
-
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_spa_masked(nw_src_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_spa_masked_00(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_spa_masked_00(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
nw_src = '192.168.222.57'
- mask = '0.0.0.0'
nw_src_int = self.ipv4_to_int(nw_src)
+ mask = '0.0.0.0'
mask_int = self.ipv4_to_int(mask)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_SPA,
+ dp.ofproto.OXM_OF_ARP_SPA_W, ),
+ None, None]
- self._set_val('rules',
- 'nw_src',
- None)
-
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_spa_masked(nw_src_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_tpa(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_tpa(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
nw_dst = '192.168.198.233'
nw_dst_int = self.ipv4_to_int(nw_dst)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_TPA,
+ dp.ofproto.OXM_OF_ARP_TPA_W, ),
+ nw_dst_int, None]
- self._set_val('rules',
- 'nw_dst',
- nw_dst)
-
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_tpa(nw_dst_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_tpa_masked_32(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_tpa_masked_32(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
nw_dst = '192.168.198.233'
- mask = '255.255.255.255'
nw_dst_int = self.ipv4_to_int(nw_dst)
+ mask = '255.255.255.255'
mask_int = self.ipv4_to_int(mask)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_TPA,
+ dp.ofproto.OXM_OF_ARP_TPA_W, ),
+ nw_dst_int, None]
- self._set_val('rules',
- 'nw_dst',
- nw_dst)
-
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_tpa_masked(nw_dst_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_tpa_masked_24(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_tpa_masked_24(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
nw_dst = '192.168.198.233'
- mask = '255.255.255.0'
nw_dst_int = self.ipv4_to_int(nw_dst)
+ mask = '255.255.255.0'
mask_int = self.ipv4_to_int(mask)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_TPA,
+ dp.ofproto.OXM_OF_ARP_TPA_W, ),
+ nw_dst_int & mask_int, mask_int]
- self._set_val('rules',
- 'nw_dst',
- nw_dst[:-3] + '0/24')
-
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_tpa_masked(nw_dst_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_tpa_masked_00(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_tpa_masked_00(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
nw_dst = '192.168.198.233'
- mask = '0.0.0.0'
nw_dst_int = self.ipv4_to_int(nw_dst)
+ mask = '0.0.0.0'
mask_int = self.ipv4_to_int(mask)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_TPA,
+ dp.ofproto.OXM_OF_ARP_TPA_W, ),
+ None, None]
- self._set_val('rules',
- 'nw_dst',
- None)
-
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_tpa_masked(nw_dst_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_sha(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_sha(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
arp_sha = '3e:ec:13:9b:f3:0b'
arp_sha_bin = self.haddr_to_bin(arp_sha)
- self._set_val('rules',
- 'arp_sha',
- arp_sha)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_SHA,
+ dp.ofproto.OXM_OF_ARP_SHA_W, ),
+ arp_sha_bin, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_sha(arp_sha_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_sha_masked_ff(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_sha_masked_ff(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
arp_sha = '3e:ec:13:9b:f3:0b'
- mask = 'ff:ff:ff:ff:ff:ff'
arp_sha_bin = self.haddr_to_bin(arp_sha)
+ mask = 'ff:ff:ff:ff:ff:ff'
mask_bin = self.haddr_to_bin(mask)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_SHA,
+ dp.ofproto.OXM_OF_ARP_SHA_W, ),
+ arp_sha_bin, None]
- self._set_val('rules',
- 'arp_sha',
- arp_sha)
-
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_sha_masked(arp_sha_bin, mask_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_sha_masked_f0(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_sha_masked_f0(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
arp_sha = '3e:ec:13:9b:f3:0b'
- mask = 'ff:ff:ff:ff:ff:00'
arp_sha_bin = self.haddr_to_bin(arp_sha)
+ mask = 'ff:ff:ff:ff:ff:00'
mask_bin = self.haddr_to_bin(mask)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_SHA,
+ dp.ofproto.OXM_OF_ARP_SHA_W, ),
+ arp_sha_bin[:-1] + '\x00', mask_bin]
- self._set_val('rules',
- 'arp_sha',
- arp_sha[:-2] + '00/' + mask)
-
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_sha_masked(arp_sha_bin, mask_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_sha_masked_00(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_sha_masked_00(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
arp_sha = '3e:ec:13:9b:f3:0b'
- mask = '00:00:00:00:00:00'
arp_sha_bin = self.haddr_to_bin(arp_sha)
+ mask = '00:00:00:00:00:00'
mask_bin = self.haddr_to_bin(mask)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_SHA,
+ dp.ofproto.OXM_OF_ARP_SHA_W, ),
+ None, None]
- self._set_val('rules',
- 'arp_sha',
- None)
-
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_sha_masked(arp_sha_bin, mask_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_tha(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_tha(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
arp_tha = '83:6c:21:52:49:68'
arp_tha_bin = self.haddr_to_bin(arp_tha)
- self._set_val('rules',
- 'arp_tha',
- arp_tha)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_THA,
+ dp.ofproto.OXM_OF_ARP_THA_W, ),
+ arp_tha_bin, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_tha(arp_tha_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_tha_masked_ff(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_tha_masked_ff(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
- arp_tha = '3e:ec:13:9b:f3:0b'
- mask = 'ff:ff:ff:ff:ff:ff'
+ arp_tha = '83:6c:21:52:49:68'
arp_tha_bin = self.haddr_to_bin(arp_tha)
+ mask = 'ff:ff:ff:ff:ff:ff'
mask_bin = self.haddr_to_bin(mask)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_THA,
+ dp.ofproto.OXM_OF_ARP_THA_W, ),
+ arp_tha_bin, None]
- self._set_val('rules',
- 'arp_tha',
- arp_tha)
-
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_tha_masked(arp_tha_bin, mask_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_tha_masked_f0(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_tha_masked_f0(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
- arp_tha = '3e:ec:13:9b:f3:0b'
- mask = 'ff:ff:ff:ff:ff:00'
+ arp_tha = '83:6c:21:52:49:68'
arp_tha_bin = self.haddr_to_bin(arp_tha)
+ mask = 'ff:ff:ff:ff:ff:00'
mask_bin = self.haddr_to_bin(mask)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_THA,
+ dp.ofproto.OXM_OF_ARP_THA_W, ),
+ arp_tha_bin[:-1] + '\x00', mask_bin]
- self._set_val('rules',
- 'arp_tha',
- arp_tha[:-2] + '00/' + mask)
-
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_tha_masked(arp_tha_bin, mask_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_arp_tha_masked_00(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_arp_tha_masked_00(self, dp):
dl_type = nx_match.ETH_TYPE_ARP
- arp_tha = '3e:ec:13:9b:f3:0b'
- mask = '00:00:00:00:00:00'
+ arp_tha = '83:6c:21:52:49:68'
arp_tha_bin = self.haddr_to_bin(arp_tha)
+ mask = '00:00:00:00:00:00'
mask_bin = self.haddr_to_bin(mask)
+ self._verify = [(dp.ofproto.OXM_OF_ARP_THA,
+ dp.ofproto.OXM_OF_ARP_THA_W, ),
+ None, None]
- self._set_val('rules',
- 'arp_tha',
- None)
-
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_arp_tha_masked(arp_tha_bin, mask_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_src(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_src(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ipv6_src = '2001:db8:bd05:1d2:288a:1fc0:1:10ee'
ipv6_src_int = self.ipv6_to_int(ipv6_src)
- self._set_val('rules',
- 'ipv6_src',
- ipv6_src)
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_SRC,
+ dp.ofproto.OXM_OF_IPV6_SRC_W, ),
+ ipv6_src_int, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv6_src(ipv6_src_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_src_masked_ff(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_src_masked_ff(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ipv6_src = '2001:db8:bd05:1d2:288a:1fc0:1:10ee'
- mask = 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff'
ipv6_src_int = self.ipv6_to_int(ipv6_src)
+ mask = 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff'
mask_int = self.ipv6_to_int(mask)
- self._set_val('rules',
- 'ipv6_src',
- ipv6_src)
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_SRC,
+ dp.ofproto.OXM_OF_IPV6_SRC_W, ),
+ ipv6_src_int, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv6_src_masked(ipv6_src_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_src_masked_f0(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_src_masked_f0(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ipv6_src = '2001:db8:bd05:1d2:288a:1fc0:1:10ee'
- mask = 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:0'
ipv6_src_int = self.ipv6_to_int(ipv6_src)
+ mask = 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:0'
mask_int = self.ipv6_to_int(mask)
- self._set_val('rules',
- 'ipv6_src',
- ipv6_src[:-4] + '0/112')
+ ipv6_src_masked = [x & y for (x, y) in
+ itertools.izip(ipv6_src_int, mask_int)]
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_SRC,
+ dp.ofproto.OXM_OF_IPV6_SRC_W, ),
+ ipv6_src_masked, mask_int]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv6_src_masked(ipv6_src_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_src_masked_00(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_src_masked_00(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ipv6_src = '2001:db8:bd05:1d2:288a:1fc0:1:10ee'
- mask = '0:0:0:0:0:0:0:0'
ipv6_src_int = self.ipv6_to_int(ipv6_src)
+ mask = '0:0:0:0:0:0:0:0'
mask_int = self.ipv6_to_int(mask)
- self._set_val('rules',
- 'ipv6_src',
- None)
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_SRC,
+ dp.ofproto.OXM_OF_IPV6_SRC_W, ),
+ None, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv6_src_masked(ipv6_src_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_dst(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_dst(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ipv6_dst = 'e9e8:9ea5:7d67:82cc:ca54:1fc0:2d24:f038'
ipv6_dst_int = self.ipv6_to_int(ipv6_dst)
- self._set_val('rules',
- 'ipv6_dst',
- ipv6_dst)
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_DST,
+ dp.ofproto.OXM_OF_IPV6_DST_W, ),
+ ipv6_dst_int, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv6_dst(ipv6_dst_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_dst_masked_ff(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_dst_masked_ff(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ipv6_dst = 'e9e8:9ea5:7d67:82cc:ca54:1fc0:2d24:f038'
- mask = 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff'
ipv6_dst_int = self.ipv6_to_int(ipv6_dst)
+ mask = 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff'
mask_int = self.ipv6_to_int(mask)
- self._set_val('rules',
- 'ipv6_dst',
- ipv6_dst)
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_DST,
+ dp.ofproto.OXM_OF_IPV6_DST_W, ),
+ ipv6_dst_int, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv6_dst_masked(ipv6_dst_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_dst_masked_f0(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_dst_masked_f0(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ipv6_dst = 'e9e8:9ea5:7d67:82cc:ca54:1fc0:2d24:f038'
- mask = 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:0'
ipv6_dst_int = self.ipv6_to_int(ipv6_dst)
+ mask = 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:0'
mask_int = self.ipv6_to_int(mask)
- self._set_val('rules',
- 'ipv6_dst',
- ipv6_dst[:-4] + '0/112')
+ ipv6_dst_masked = [x & y for (x, y) in
+ itertools.izip(ipv6_dst_int, mask_int)]
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_DST,
+ dp.ofproto.OXM_OF_IPV6_DST_W, ),
+ ipv6_dst_masked, mask_int]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv6_dst_masked(ipv6_dst_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_dst_masked_00(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_dst_masked_00(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ipv6_dst = 'e9e8:9ea5:7d67:82cc:ca54:1fc0:2d24:f038'
- mask = '0:0:0:0:0:0:0:0'
ipv6_dst_int = self.ipv6_to_int(ipv6_dst)
+ mask = '0:0:0:0:0:0:0:0'
mask_int = self.ipv6_to_int(mask)
- self._set_val('rules',
- 'ipv6_dst',
- None)
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_DST,
+ dp.ofproto.OXM_OF_IPV6_DST_W, ),
+ None, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv6_dst_masked(ipv6_dst_int, mask_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_flabel(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_flabel(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ipv6_label = 0xc5384
- self._set_val('rules',
- 'ipv6_label',
- hex(ipv6_label))
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_FLABEL,
+ dp.ofproto.OXM_OF_IPV6_FLABEL_W, ),
+ ipv6_label, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv6_flabel(ipv6_label)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_flabel_masked_ff(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_flabel_masked_ff(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ipv6_label = 0xc5384
mask = 0xfffff
- self._set_val('rules',
- 'ipv6_label',
- hex(ipv6_label))
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_FLABEL,
+ dp.ofproto.OXM_OF_IPV6_FLABEL_W, ),
+ ipv6_label, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv6_flabel_masked(ipv6_label, mask)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_flabel_masked_f0(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_flabel_masked_f0(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ipv6_label = 0xc5384
mask = 0xffff0
- self._set_val('rules',
- 'ipv6_label',
- hex(ipv6_label)[:-1] + '0/' + hex(mask))
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_FLABEL,
+ dp.ofproto.OXM_OF_IPV6_FLABEL_W, ),
+ ipv6_label & mask, mask]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv6_flabel_masked(ipv6_label, mask)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_flabel_masked_00(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_flabel_masked_00(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ipv6_label = 0xc5384
mask = 0x0
- self._set_val('rules',
- 'ipv6_label',
- None)
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_FLABEL,
+ dp.ofproto.OXM_OF_IPV6_FLABEL_W, ),
+ None, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ipv6_flabel_masked(ipv6_label, mask)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_icmpv6_type(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_icmpv6_type(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ip_proto = IPPROTO_ICMPV6
icmp_type = 129
- self._set_val('rules',
- 'icmp_type',
- str(icmp_type))
+ self._verify = [(dp.ofproto.OXM_OF_ICMPV6_TYPE, ),
+ icmp_type, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
match.set_icmpv6_type(icmp_type)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_icmpv6_code(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_icmpv6_code(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ip_proto = IPPROTO_ICMPV6
icmp_type = 138
icmp_code = 1
- self._set_val('rules',
- 'icmp_code',
- str(icmp_code))
+ self._verify = [(dp.ofproto.OXM_OF_ICMPV6_CODE, ),
+ icmp_code, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
match.set_icmpv6_type(icmp_type)
match.set_icmpv6_code(icmp_code)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_nd_target(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_nd_target(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ip_proto = IPPROTO_ICMPV6
-
- # type = 135 : Neighbor Solicitation
icmp_type = 135
target = "5420:db3f:921b:3e33:2791:98f:dd7f:2e19"
target_int = self.ipv6_to_int(target)
- self._set_val('rules',
- 'nd_target',
- target)
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_ND_TARGET, ),
+ target_int, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
match.set_icmpv6_type(icmp_type)
match.set_ipv6_nd_target(target_int)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_nd_sll(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_nd_sll(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ip_proto = IPPROTO_ICMPV6
-
- # type = 135 : Neighbor Solicitation
icmp_type = 135
nd_sll = "93:6d:d0:d4:e8:36"
nd_sll_bin = self.haddr_to_bin(nd_sll)
- self._set_val('rules',
- 'nd_sll',
- nd_sll)
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_ND_SLL, ),
+ nd_sll_bin, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
match.set_icmpv6_type(icmp_type)
match.set_ipv6_nd_sll(nd_sll_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_ipv6_nd_tll(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_ipv6_nd_tll(self, dp):
dl_type = nx_match.ETH_TYPE_IPV6
ip_proto = IPPROTO_ICMPV6
-
- # type = 136 : Neighbor Advertisement
icmp_type = 136
nd_tll = "18:f6:66:b6:f1:b3"
nd_tll_bin = self.haddr_to_bin(nd_tll)
- self._set_val('rules',
- 'nd_tll',
- nd_tll)
+ self._verify = [(dp.ofproto.OXM_OF_IPV6_ND_TLL, ),
+ nd_tll_bin, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_ip_proto(ip_proto)
match.set_icmpv6_type(icmp_type)
match.set_ipv6_nd_tll(nd_tll_bin)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_mpls_label(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_mpls_label(self, dp):
dl_type = 0x8847
label = 2144
- self._set_val('rules',
- 'mpls_label',
- str(label))
+ self._verify = [(dp.ofproto.OXM_OF_MPLS_LABEL, ),
+ label, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_mpls_label(label)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
-
- def test_rule_set_mpls_tc(self):
- datapath = self.datapath
- ofproto = self.ofproto
- ofproto_parser = self.ofproto_parser
+ self.add_matches(dp, match)
+ def test_rule_set_mpls_tc(self, dp):
dl_type = 0x8847
tc = 3
- self._set_val('rules',
- 'mpls_tc',
- str(tc))
+ self._verify = [(dp.ofproto.OXM_OF_MPLS_TC, ),
+ tc, None]
- match = ofproto_parser.OFPMatch()
+ match = dp.ofproto_parser.OFPMatch()
match.set_dl_type(dl_type)
match.set_mpls_tc(tc)
- inst = []
-
- m = ofproto_parser.OFPFlowMod(datapath, 0, 0, 0,
- ofproto.OFPFC_ADD,
- 0, 0, 0, 0xffffffff,
- ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- datapath.send_msg(m)
+ self.add_matches(dp, match)
diff --git a/ryu/tests/integrated/tester.py b/ryu/tests/integrated/tester.py
index a447169c..31cf8693 100644
--- a/ryu/tests/integrated/tester.py
+++ b/ryu/tests/integrated/tester.py
@@ -16,319 +16,145 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
import sys
-import gflags
import logging
-import subprocess
-import traceback
from ryu import utils
from ryu.lib import mac
from ryu.base import app_manager
from ryu.controller import ofp_event
-from ryu.controller import dispatcher
-from ryu.controller import event
from ryu.controller import handler
+from ryu.controller import dpset
from ryu.controller.handler import MAIN_DISPATCHER
from ryu.controller.handler import CONFIG_DISPATCHER
from ryu.controller.handler import set_ev_cls
-from ryu.ofproto import nx_match
from ryu.ofproto import ofproto_v1_0
from ryu.ofproto import ofproto_v1_2
LOG = logging.getLogger(__name__)
-FLAGS = gflags.FLAGS
-gflags.DEFINE_string('run_test_mod', '', 'Test run the module name.')
-
-
-class EventRunTest(event.EventBase):
- def __init__(self, datapath):
- super(EventRunTest, self).__init__()
- self.datapath = datapath
-
-
-QUEUE_NAME_RUN_TEST_EV = 'run_test_event'
-DISPATCHER_NAME_RUN_TEST_EV = 'run_test_event'
-RUN_TEST_EV_DISPATCHER = dispatcher.EventDispatcher(
- DISPATCHER_NAME_RUN_TEST_EV)
-
LOG_TEST_START = 'TEST_START: %s'
LOG_TEST_RESULTS = 'TEST_RESULTS:'
-LOG_TEST_FINISH = 'TEST_FINISHED: Completed=[%s], OK=[%s], NG=[%s]'
-
-
-class Tester(app_manager.RyuApp):
-
- def __init__(self, *args, **kwargs):
- super(Tester, self).__init__()
- self.ev_q = dispatcher.EventQueue(QUEUE_NAME_RUN_TEST_EV,
- RUN_TEST_EV_DISPATCHER)
-
- run_test_mod = utils.import_module(FLAGS.run_test_mod)
- LOG.debug('import run_test_mod.[%s]', run_test_mod.__name__)
-
- self.run_test = run_test_mod.RunTest(*args, **kwargs)
- handler.register_instance(self.run_test)
-
- @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
- def switch_features_handler(self, ev):
- msg = ev.msg
- datapath = msg.datapath
-
- send_delete_all_flows(datapath)
- datapath.send_barrier()
-
- @set_ev_cls(ofp_event.EventOFPBarrierReply, MAIN_DISPATCHER)
- def barrier_replay_handler(self, ev):
- self.ev_q.queue(EventRunTest(ev.msg.datapath))
-
- @set_ev_cls(EventRunTest, RUN_TEST_EV_DISPATCHER)
- def run_test_halder(self, ev):
- dp = ev.datapath
- t = self.run_test
-
- if not t._test_started():
- t._test_init(dp)
-
- if not self._run_test(t):
- # run_test was throwing exception.
- LOG.info(LOG_TEST_FINISH, False, t._RESULTS_OK, t._RESULTS_NG)
- return
-
- if not t._test_completed():
- t.datapath.send_barrier()
- return
-
- # Completed all tests.
- LOG.info(LOG_TEST_FINISH, True, t._RESULTS_OK, t._RESULTS_NG)
-
- def _run_test(self, t):
- running = t._running()
-
- if len(running) == 0:
- # next test
- name = t._pop_test()
- LOG.info(LOG_TEST_START, name)
- try:
- getattr(t, name)()
- except Exception:
- exc_type, exc_value, exc_traceback = sys.exc_info()
- traceback.print_exception(exc_type, exc_value,
- exc_traceback, file=sys.stdout)
- send_delete_all_flows(t.datapath)
- return False
- else:
- # check
- name = 'check_' + running[5:]
-
- if not name in dir(t):
- name = '_check_default'
-
- err = 0
- try:
- # LOG.debug('_run_test: CHECK_TEST = [%s]', name)
- getattr(t, name)()
- except Exception:
- exc_type, exc_value, exc_traceback = sys.exc_info()
- traceback.print_exception(exc_type, exc_value,
- exc_traceback, file=sys.stdout)
- err = 1
- finally:
- send_delete_all_flows(t.datapath)
- if err:
- return False
- t._check_run()
-
- return True
-
-
-def _send_delete_all_flows_v10(dp):
- rule = nx_match.ClsRule()
- match = dp.ofproto_parser.OFPMatch(dp.ofproto.OFPFW_ALL,
- 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0)
- m = dp.ofproto_parser.OFPFlowMod(
- dp, match, 0,
- dp.ofproto.OFPFC_DELETE,
- 0, 0, 0, 0,
- dp.ofproto.OFPP_NONE, 0, None)
- dp.send_msg(m)
-
+LOG_TEST_FINISH = 'TEST_FINISHED: Completed=[%s]'
-def _send_delete_all_flows_v12(dp):
- match = dp.ofproto_parser.OFPMatch()
- inst = []
- m = dp.ofproto_parser.OFPFlowMod(dp, 0, 0, 0,
- dp.ofproto.OFPFC_DELETE,
- 0, 0, 0, 0,
- dp.ofproto.OFPP_ANY, 0xffffffff,
- 0, match, inst)
- dp.send_msg(m)
-
-def send_delete_all_flows(dp):
- assert dp.ofproto in (ofproto_v1_0, ofproto_v1_2)
- if dp.ofproto == ofproto_v1_0:
- _send_delete_all_flows_v10(dp)
- elif dp.ofproto == ofproto_v1_2:
- _send_delete_all_flows_v12(dp)
- else:
- # this function will be remove.
- dp.send_delete_all_flows()
-
-
-def run_command(cmd, redirect_output=True, check_exit_code=True, env=None):
- if redirect_output:
- stdout = subprocess.PIPE
- else:
- stdout = None
-
- proc = subprocess.Popen(cmd, stdout=stdout,
- stderr=subprocess.STDOUT, env=env)
- output = proc.communicate()[0]
-
- LOG.debug('Exec command "%s" \n%s', ' '.join(cmd), output)
- if check_exit_code and proc.returncode != 0:
- raise Exception('Command "%s" failed.\n%s' % (' '.join(cmd), output))
- return output
-
-
-class RunTestBase(object):
+class TestFlowBase(app_manager.RyuApp):
"""
To run the tests is required for the following pair of functions.
1. test_<test name>()
To send flows to switch.
- 2. check_<test name>() or _check_default()
+ 2. verify_<test name>() or _verify_default()
To check flows of switch.
-
- To deal common values to the functions(test_ and check_)
- can use `set_val('name', val)` and `get_val('name')`.
- This values is initialized before the next tests.
"""
- def __init__(self):
- super(RunTestBase, self).__init__()
-
- self._TEST_STARTED = False
- self._TESTS = []
- self._RUNNING = ''
- self._RESULTS_OK = 0
- self._RESULTS_NG = 0
- self._CHECK = {}
-
- def _test_started(self):
- return self._TEST_STARTED
-
- def _test_init(self, dp):
- self.datapath = dp
- self.ofproto = dp.ofproto
- self.ofproto_parser = dp.ofproto_parser
-
- for name in dir(self):
- if name.startswith("test_"):
- self._TESTS.append(name)
- self._TEST_STARTED = True
-
- def _test_completed(self):
- if self._TEST_STARTED:
- if len(self._RUNNING) + len(self._TESTS) == 0:
- return True
- return False
-
- def _pop_test(self):
- self._RUNNING = self._TESTS.pop()
- return self._RUNNING
-
- def _running(self):
- return self._RUNNING
+ _CONTEXTS = {
+ 'dpset': dpset.DPSet,
+ }
- def _check_run(self):
- self._RUNNING = ''
-
- def _check_default(self):
- err = 'function %s() is not found.' % (self._RUNNING, )
- self.results(ret=False, msg=err)
-
- def results(self, name=None, ret=True, msg=''):
- if not name:
- name = self._RUNNING
-
- if ret:
- res = 'OK'
- self._RESULTS_OK += 1
+ def __init__(self, *args, **kwargs):
+ super(TestFlowBase, self).__init__(*args, **kwargs)
+ self.pending = []
+ self.results = {}
+ self.current = None
+ self.unclear = 0
+
+ for t in dir(self):
+ if t.startswith("test_"):
+ self.pending.append(t)
+ self.unclear = len(self.pending)
+
+ def delete_all_flows(self, dp):
+ if dp.ofproto == ofproto_v1_0:
+ match = dp.ofproto_parser.OFPMatch(dp.ofproto.OFPFW_ALL,
+ 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0)
+ m = dp.ofproto_parser.OFPFlowMod(
+ dp, match, 0,
+ dp.ofproto.OFPFC_DELETE,
+ 0, 0, 0, 0,
+ dp.ofproto.OFPP_NONE, 0, None)
+ elif dp.ofproto == ofproto_v1_2:
+ match = dp.ofproto_parser.OFPMatch()
+ m = dp.ofproto_parser.OFPFlowMod(dp, 0, 0, 0,
+ dp.ofproto.OFPFC_DELETE,
+ 0, 0, 0, 0xffffffff,
+ dp.ofproto.OFPP_ANY, 0xffffffff,
+ 0, match, [])
+
+ dp.send_msg(m)
+
+ def send_flow_stats(self, dp):
+ if dp.ofproto == ofproto_v1_0:
+ match = dp.ofproto_parser.OFPMatch(dp.ofproto.OFPFW_ALL,
+ 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0)
+ m = dp.ofproto_parser.OFPFlowStatsRequest(
+ dp, 0, match,
+ 0, dp.ofproto.OFPP_NONE)
+ elif dp.ofproto == ofproto_v1_2:
+ match = dp.ofproto_parser.OFPMatch()
+ m = dp.ofproto_parser.OFPFlowStatsRequest(dp, 0,
+ dp.ofproto.OFPP_ANY,
+ dp.ofproto.OFPG_ANY,
+ 0, 0, match)
+
+ dp.send_msg(m)
+
+ def verify_default(self, dp, stats):
+ return 'function %s() is not found.' % ("verify" + self.current[4:], )
+
+ def start_next_test(self, dp):
+ self.delete_all_flows(dp)
+ dp.send_barrier()
+ if len(self.pending):
+ t = self.pending.pop()
+ LOG.info(LOG_TEST_START, t)
+ self.current = t
+ getattr(self, t)(dp)
+ dp.send_barrier()
+ self.send_flow_stats(dp)
else:
- res = 'NG'
- self._RESULTS_NG += 1
-
- LOG.info('%s %s [%s] %s', LOG_TEST_RESULTS, name, res, '\n' + msg)
-
- def set_val(self, name, val):
- self._CHECK[name] = val
-
- def get_val(self, name):
- return self._CHECK[name]
-
- def del_val(self, name):
- del self._CHECK[name]
-
- def del_val_all(self):
- self._CHECK.clear()
-
- def get_ovs_flows(self, target):
- # flows (return):
- # [flow1, flow2,...]
- # flow:
- # {'actions': actions, 'rules': rules}
- # or {'apply_actions': actions, 'rules': rules}
- # or {'write_actions': actions, 'rules': rules}
- # or {'clear_actions': actions, 'rules': rules}
- # actions, rules:
- # {'<name>': <val>}
-
- cmd = ('sudo', 'ovs-ofctl', 'dump-flows', target)
- output = run_command(cmd)
-
- flows = []
- for line in output.splitlines():
- if line.startswith(" "):
- flow = {}
- rules, actions = line.split('actions=')
- rules = self.cnv_list(rules, '=')
+ LOG.info("TEST_RESULTS:")
+ for t, r in self.results.items():
+ LOG.info(" %s: %s", t, r)
+ LOG.info(LOG_TEST_FINISH, self.unclear == 0)
+
+ @handler.set_ev_cls(ofp_event.EventOFPFlowStatsReply,
+ handler.MAIN_DISPATCHER)
+ def flow_reply_handler(self, ev):
+ self.run_verify(ev)
+
+ @handler.set_ev_cls(ofp_event.EventOFPStatsReply,
+ handler.MAIN_DISPATCHER)
+ def stats_reply_handler(self, ev):
+ self.run_verify(ev)
+
+ def run_verify(self, ev):
+ msg = ev.msg
+ dp = msg.datapath
- if actions.startswith("apply_actions"):
- a_name = 'apply_actions'
- actions = actions[len(a_name) + 1:-1]
- elif actions.startswith("write_actions"):
- a_name = 'write_actions'
- actions = actions[len(a_name) + 1:-1]
- elif actions.startswith("clear_actions"):
- a_name = 'clear_actions'
- actions = actions[len(a_name) + 1:-1]
- else:
- a_name = 'actions'
- actions = self.cnv_list(actions, ':')
- flows.append({'rules': rules, a_name: actions, })
+ verify_func = self.verify_default
+ v = "verify" + self.current[4:]
+ if v in dir(self):
+ verify_func = getattr(self, v)
- return flows
+ result = verify_func(dp, msg.body)
+ if result == True:
+ self.unclear -= 1
- def cnv_list(self, tmp, sep):
- list_ = {}
- for p in tmp.split(','):
- if len(p.strip()) == 0:
- continue
+ self.results[self.current] = result
+ self.start_next_test(dp)
- if p.find(sep) > 0:
- name, val = p.strip().split(sep, 1)
- else:
- name = val = p.strip()
- list_[name] = val
- return list_
+ @handler.set_ev_cls(dpset.EventDP, dpset.DPSET_EV_DISPATCHER)
+ def handler_datapath(self, ev):
+ if ev.enter:
+ self.start_next_test(ev.dp)
- def cnv_txt(self, tmp, sep='='):
- return ",".join([(str(x) + sep + str(tmp[x])) for x in tmp if x >= 0])
+ @set_ev_cls(ofp_event.EventOFPBarrierReply, MAIN_DISPATCHER)
+ def barrier_replay_handler(self, ev):
+ pass
def haddr_to_str(self, addr):
return mac.haddr_to_str(addr)