summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMinoru TAKAHASHI <takahashi.minoru7@gmail.com>2015-09-08 09:15:21 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-09-10 11:20:49 +0900
commit969b4b6837ee29c1158d44878fbf3d555b96e81a (patch)
treee42c120ef08a2ca9e661a7c04198ef10cf2952de
parentdc2d5a90b78f675d5d26ce24271fd3fced87be62 (diff)
test_ofctl: improving readability
Signed-off-by: Minoru TAKAHASHI <takahashi.minoru7@gmail.com> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/tests/unit/lib/test_ofctl.py610
1 files changed, 312 insertions, 298 deletions
diff --git a/ryu/tests/unit/lib/test_ofctl.py b/ryu/tests/unit/lib/test_ofctl.py
index bf59097d..33c3e964 100644
--- a/ryu/tests/unit/lib/test_ofctl.py
+++ b/ryu/tests/unit/lib/test_ofctl.py
@@ -116,220 +116,304 @@ class Test_ofctl(unittest.TestCase):
pass
def _test_actions(self, act, test):
- act_type = act["type"]
- to_actions = test.to_actions
- actions_to_str = test.actions_to_str
dp = ofproto_protocol.ProtocolDesc(version=test.ver)
- act_list = []
- act_list.append(act)
- # str -> action
- result = to_actions(dp, act_list)
- insts = result[0]
+ act_type = act["type"]
- def equal_act(insts, act, act_type, test):
- if act_type in test.supported_action:
- cls = test.supported_action[act_type]
- else:
- cls = None
- if act_type == 'GOTO_TABLE':
- ok_(isinstance(insts, cls))
- eq_(insts.table_id, act["table_id"])
- elif act_type == 'WRITE_METADATA':
- ok_(isinstance(insts, cls))
- eq_(insts.metadata, act["metadata"])
- eq_(insts.metadata_mask, act["metadata_mask"])
- elif act_type == 'METER':
- ok_(isinstance(insts, cls))
- eq_(insts.meter_id, act["meter_id"])
- else:
- if test.ver == ofproto_v1_0.OFP_VERSION:
- action = insts
- else:
- action = insts.actions[0]
- ok_(isinstance(action, cls))
- if act_type == 'OUTPUT':
- eq_(action.port, act["port"])
- elif act_type == 'SET_VLAN_VID':
- eq_(action.vlan_vid, act["vlan_vid"])
- elif act_type == 'SET_VLAN_PCP':
- eq_(action.vlan_pcp, act["vlan_pcp"])
- elif act_type == 'SET_DL_SRC':
- eq_(addrconv.mac.bin_to_text(action.dl_addr),
- act["dl_src"])
- elif act_type == 'SET_DL_DST':
- eq_(addrconv.mac.bin_to_text(action.dl_addr),
- act["dl_dst"])
- elif act_type == 'SET_NW_SRC':
- ip = netaddr.ip.IPAddress(action.nw_addr)
- eq_(str(ip), act["nw_src"])
- elif act_type == 'SET_NW_DST':
- ip = netaddr.ip.IPAddress(action.nw_addr)
- eq_(str(ip), act["nw_dst"])
- elif act_type == 'SET_NW_TOS':
- eq_(action.tos, act["nw_tos"])
- elif act_type == 'SET_TP_SRC':
- eq_(action.tp, act["tp_src"])
- elif act_type == 'SET_TP_DST':
- eq_(action.tp, act["tp_dst"])
- elif act_type == 'ENQUEUE':
- eq_(action.queue_id, act["queue_id"])
- eq_(action.port, act["port"])
- elif act_type == 'SET_MPLS_TTL':
- eq_(action.mpls_ttl, act["mpls_ttl"])
- elif act_type in ['PUSH_VLAN', 'PUSH_MPLS',
- 'POP_MPLS', 'PUSH_PBB']:
- eq_(action.ethertype, act["ethertype"])
- elif act_type == 'SET_QUEUE':
- eq_(action.queue_id, act["queue_id"])
- elif act_type == 'GROUP':
- eq_(action.group_id, act["group_id"])
- elif act_type == 'SET_NW_TTL':
- eq_(action.nw_ttl, act["nw_ttl"])
- elif act_type in ['STRIP_VLAN', 'COPY_TTL_OUT',
- 'COPY_TTL_IN', 'DEC_MPLS_TTL',
- 'POP_VLAN', 'DEC_NW_TTL', 'POP_PBB']:
- pass
- else:
- assert False
+ # str -> action
+ insts = test.to_actions(dp, [act])
if test.ver == ofproto_v1_0.OFP_VERSION:
- equal_act(insts, act, act_type, test)
+ action = insts[0]
+ self._equal_str_to_act(action, act, act_type, test)
else:
- if act_type == 'WRITE_ACTIONS':
- ok_(isinstance(insts, test._parser.OFPInstructionActions))
- eq_(insts.type, test._ofproto.OFPIT_WRITE_ACTIONS)
- equal_act(insts, act["actions"][0], act["actions"][0]["type"], test)
- elif act_type == 'CLEAR_ACTIONS':
- ok_(isinstance(insts, test._parser.OFPInstructionActions))
- eq_(insts.type, test._ofproto.OFPIT_CLEAR_ACTIONS)
- else:
- if act_type not in ['GOTO_TABLE', 'WRITE_METADATA', 'METER']:
- ok_(isinstance(insts, test._parser.OFPInstructionActions))
- eq_(insts.type, test._ofproto.OFPIT_APPLY_ACTIONS)
- equal_act(insts, act, act_type, test)
- else:
- equal_act(insts, act, act_type, test)
+ inst = insts[0]
+ self._equal_str_to_inst(inst, act, act_type, test)
# action -> str
- action_str = actions_to_str(result)
+ inst_str = test.actions_to_str(insts)
+ if test.ver == ofproto_v1_0.OFP_VERSION:
+ act_str = inst_str
+ self._equal_act_to_str(act_str, act, act_type, test)
+ else:
+ self._equal_inst_to_str(inst_str, act, act_type, test)
+
+ def _test_match(self, attrs, test):
+ dp = ofproto_protocol.ProtocolDesc(version=test.ver)
- def equal_str(action_str, act, act_type, test):
- action_str_list = action_str[0].split(':', 1)
- eq_(action_str_list[0], act_type)
+ # str -> match
+ match = test.to_match(dp, attrs)
+
+ for key, value in attrs.items():
+ key = self._conv_key(test, key, attrs)
+ self._equal_str_to_match(key, value, match, test)
+
+ # match -> str
+ match_str = test.match_to_str(match)
+
+ for key, value in attrs.items():
+ if key in conv_of12_to_of10_dict:
+ key_old = conv_of12_to_of10_dict[key]
+ else:
+ key_old = key
+ self._equal_match_to_str(key_old, value, match_str, test)
+ def _equal_str_to_inst(self, inst, act, act_type, test):
+ if act_type in test.supported_action:
+ cls = test.supported_action[act_type]
+ else:
+ cls = None
+ if act_type == 'GOTO_TABLE':
+ ok_(isinstance(inst, cls))
+ eq_(inst.table_id, act["table_id"])
+ elif act_type == 'WRITE_METADATA':
+ ok_(isinstance(inst, cls))
+ eq_(inst.metadata, act["metadata"])
+ eq_(inst.metadata_mask, act["metadata_mask"])
+ elif act_type == 'METER':
+ ok_(isinstance(inst, cls))
+ eq_(inst.meter_id, act["meter_id"])
+ elif act_type == 'WRITE_ACTIONS':
+ ok_(isinstance(inst, cls))
+ eq_(inst.type, test._ofproto.OFPIT_WRITE_ACTIONS)
+ self._equal_str_to_act(inst.actions[0],
+ act["actions"][0],
+ act["actions"][0]["type"],
+ test)
+ elif act_type == 'CLEAR_ACTIONS':
+ ok_(isinstance(inst, cls))
+ eq_(inst.type, test._ofproto.OFPIT_CLEAR_ACTIONS)
+ else:
+ # APPLY_ACTIONS or Uknown Action Type
+ ok_(isinstance(inst, test._parser.OFPInstructionActions))
+ eq_(inst.type, test._ofproto.OFPIT_APPLY_ACTIONS)
+ self._equal_str_to_act(inst.actions[0], act,
+ act_type, test)
+
+ def _equal_str_to_act(self, action, act, act_type, test):
+ if act_type in test.supported_action:
+ cls = test.supported_action[act_type]
+ else:
+ cls = None
+ ok_(isinstance(action, cls))
+ if act_type == 'OUTPUT':
+ eq_(action.port, act["port"])
+ elif act_type == 'SET_VLAN_VID':
+ eq_(action.vlan_vid, act["vlan_vid"])
+ elif act_type == 'SET_VLAN_PCP':
+ eq_(action.vlan_pcp, act["vlan_pcp"])
+ elif act_type == 'SET_DL_SRC':
+ eq_(addrconv.mac.bin_to_text(action.dl_addr),
+ act["dl_src"])
+ elif act_type == 'SET_DL_DST':
+ eq_(addrconv.mac.bin_to_text(action.dl_addr),
+ act["dl_dst"])
+ elif act_type == 'SET_NW_SRC':
+ ip = netaddr.ip.IPAddress(action.nw_addr)
+ eq_(str(ip), act["nw_src"])
+ elif act_type == 'SET_NW_DST':
+ ip = netaddr.ip.IPAddress(action.nw_addr)
+ eq_(str(ip), act["nw_dst"])
+ elif act_type == 'SET_NW_TOS':
+ eq_(action.tos, act["nw_tos"])
+ elif act_type == 'SET_TP_SRC':
+ eq_(action.tp, act["tp_src"])
+ elif act_type == 'SET_TP_DST':
+ eq_(action.tp, act["tp_dst"])
+ elif act_type == 'ENQUEUE':
+ eq_(action.queue_id, act["queue_id"])
+ eq_(action.port, act["port"])
+ elif act_type == 'SET_MPLS_TTL':
+ eq_(action.mpls_ttl, act["mpls_ttl"])
+ elif act_type in ['PUSH_VLAN', 'PUSH_MPLS',
+ 'POP_MPLS', 'PUSH_PBB']:
+ eq_(action.ethertype, act["ethertype"])
+ elif act_type == 'SET_QUEUE':
+ eq_(action.queue_id, act["queue_id"])
+ elif act_type == 'GROUP':
+ eq_(action.group_id, act["group_id"])
+ elif act_type == 'SET_NW_TTL':
+ eq_(action.nw_ttl, act["nw_ttl"])
+ elif act_type == 'SET_FIELD':
+ eq_(action.key, act['field'])
+ eq_(action.value, act['value'])
+ elif act_type in ['STRIP_VLAN', 'COPY_TTL_OUT',
+ 'COPY_TTL_IN', 'DEC_MPLS_TTL',
+ 'POP_VLAN', 'DEC_NW_TTL', 'POP_PBB']:
+ pass
+ else: # Uknown Action Type
+ assert False
+
+ def _equal_inst_to_str(self, inst_str, act, act_type, test):
+ if act_type == 'WRITE_ACTIONS':
+ act_str = inst_str[0]["WRITE_ACTIONS"]
+ act = act["actions"][0]
+ act_type = act["type"]
+ self._equal_act_to_str(act_str, act, act_type, test)
+ else:
+ inst_str_list = inst_str[0].split(':', 1)
+ eq_(inst_str_list[0], act_type)
if act_type == 'GOTO_TABLE':
- eq_(int(action_str_list[1]), act["table_id"])
+ eq_(int(inst_str_list[1]), act["table_id"])
elif act_type == 'WRITE_METADATA':
- met = action_str_list[1].split('/')
+ met = inst_str_list[1].split('/')
eq_(int(met[0], 16), act["metadata"])
eq_(int(met[1], 16), act["metadata_mask"])
elif act_type == 'METER':
- eq_(int(action_str_list[1]), act["meter_id"])
+ eq_(int(inst_str_list[1]), act["meter_id"])
elif act_type == 'CLEAR_ACTIONS':
pass
else:
- if act_type == 'OUTPUT':
- eq_(int(action_str_list[1]), act["port"])
- elif act_type == 'SET_VLAN_VID':
- eq_(int(action_str_list[1]), act["vlan_vid"])
- elif act_type == 'SET_VLAN_PCP':
- eq_(int(action_str_list[1]), act["vlan_pcp"])
- elif act_type == 'SET_DL_SRC':
- eq_(action_str_list[1], act["dl_src"])
- elif act_type == 'SET_DL_DST':
- eq_(action_str_list[1], act["dl_dst"])
- elif act_type == 'SET_NW_SRC':
- eq_(action_str_list[1], act["nw_src"])
- elif act_type == 'SET_NW_DST':
- eq_(action_str_list[1], act["nw_dst"])
- elif act_type == 'SET_NW_TOS':
- eq_(int(action_str_list[1]), act["nw_tos"])
- elif act_type == 'SET_TP_SRC':
- eq_(int(action_str_list[1]), act["tp_src"])
- elif act_type == 'SET_TP_DST':
- eq_(int(action_str_list[1]), act["tp_dst"])
- elif act_type == 'ENQUEUE':
- enq = action_str_list[1].split(':')
- eq_(int(enq[0], 10), act["port"])
- eq_(int(enq[1], 10), act["queue_id"])
- elif act_type == 'SET_MPLS_TTL':
- eq_(int(action_str_list[1]), act["mpls_ttl"])
- elif act_type == 'PUSH_VLAN':
- eq_(int(action_str_list[1]), act["ethertype"])
- elif act_type == 'PUSH_MPLS':
- eq_(int(action_str_list[1]), act["ethertype"])
- elif act_type == 'POP_MPLS':
- eq_(int(action_str_list[1]), act["ethertype"])
- elif act_type == 'SET_QUEUE':
- eq_(int(action_str_list[1]), act["queue_id"])
- elif act_type == 'GROUP':
- eq_(int(action_str_list[1]), act["group_id"])
- elif act_type == 'SET_NW_TTL':
- eq_(int(action_str_list[1]), act["nw_ttl"])
- elif act_type == 'SET_FIELD':
- eq_(action_str_list[1].strip(' {'), act["field"])
- eq_(action_str_list[2].strip('} '), act["value"])
- elif act_type == 'PUSH_PBB':
- eq_(int(action_str_list[1]), act["ethertype"])
- elif act_type in ['STRIP_VLAN', 'COPY_TTL_OUT',
- 'COPY_TTL_IN', 'DEC_MPLS_TTL',
- 'POP_VLAN', 'DEC_NW_TTL', 'POP_PBB']:
- pass
- else:
- assert False
-
- if act_type == 'WRITE_ACTIONS':
- action_str = action_str[0]["WRITE_ACTIONS"]
- act = act["actions"][0]
- act_type = act["type"]
- equal_str(action_str, act, act_type, test)
+ # APPLY_ACTIONS
+ act_str = inst_str
+ self._equal_act_to_str(act_str, act, act_type, test)
+
+ def _equal_act_to_str(self, act_str, act, act_type, test):
+ act_str_list = act_str[0].split(':', 1)
+ eq_(act_str_list[0], act_type)
+ if act_type == 'OUTPUT':
+ eq_(int(act_str_list[1]), act["port"])
+ elif act_type == 'SET_VLAN_VID':
+ eq_(int(act_str_list[1]), act["vlan_vid"])
+ elif act_type == 'SET_VLAN_PCP':
+ eq_(int(act_str_list[1]), act["vlan_pcp"])
+ elif act_type == 'SET_DL_SRC':
+ eq_(act_str_list[1], act["dl_src"])
+ elif act_type == 'SET_DL_DST':
+ eq_(act_str_list[1], act["dl_dst"])
+ elif act_type == 'SET_NW_SRC':
+ eq_(act_str_list[1], act["nw_src"])
+ elif act_type == 'SET_NW_DST':
+ eq_(act_str_list[1], act["nw_dst"])
+ elif act_type == 'SET_NW_TOS':
+ eq_(int(act_str_list[1]), act["nw_tos"])
+ elif act_type == 'SET_TP_SRC':
+ eq_(int(act_str_list[1]), act["tp_src"])
+ elif act_type == 'SET_TP_DST':
+ eq_(int(act_str_list[1]), act["tp_dst"])
+ elif act_type == 'ENQUEUE':
+ enq = act_str_list[1].split(':')
+ eq_(int(enq[0], 10), act["port"])
+ eq_(int(enq[1], 10), act["queue_id"])
+ elif act_type == 'SET_MPLS_TTL':
+ eq_(int(act_str_list[1]), act["mpls_ttl"])
+ elif act_type == 'PUSH_VLAN':
+ eq_(int(act_str_list[1]), act["ethertype"])
+ elif act_type == 'PUSH_MPLS':
+ eq_(int(act_str_list[1]), act["ethertype"])
+ elif act_type == 'POP_MPLS':
+ eq_(int(act_str_list[1]), act["ethertype"])
+ elif act_type == 'SET_QUEUE':
+ eq_(int(act_str_list[1]), act["queue_id"])
+ elif act_type == 'GROUP':
+ eq_(int(act_str_list[1]), act["group_id"])
+ elif act_type == 'SET_NW_TTL':
+ eq_(int(act_str_list[1]), act["nw_ttl"])
+ elif act_type == 'SET_FIELD':
+ field, value = act_str_list[1].split(':')
+ eq_(field.strip(' {'), act["field"])
+ eq_(int(value.strip('} ')), act["value"])
+ elif act_type == 'PUSH_PBB':
+ eq_(int(act_str_list[1]), act["ethertype"])
+ elif act_type in ['STRIP_VLAN', 'COPY_TTL_OUT',
+ 'COPY_TTL_IN', 'DEC_MPLS_TTL',
+ 'POP_VLAN', 'DEC_NW_TTL', 'POP_PBB']:
+ pass
else:
- equal_str(action_str, act, act_type, test)
-
- def _test_to_match(self, attrs, test):
- to_match = test.to_match
- match_to_str = test.match_to_str
- dp = ofproto_protocol.ProtocolDesc(version=test.ver)
- ofproto = dp.ofproto
-
- # str -> match
- match = to_match(dp, attrs)
-
- def equal_match(key, value, match):
- key = self._conv_key(test, key, attrs)
- field_value = self._get_field_value(test, key, match)
-
- if key in ['eth_src', 'eth_dst', 'arp_sha', 'arp_tha']:
- # MAC address
- eth, mask = _to_match_eth(value)
- if mask is not None:
- # with mask
- for i in range(0, len(mask)):
- if mask[i] == 'f':
- eq_(eth[i], field_value[0][i])
- eq_(mask, field_value[1])
- else:
- # without mask
- eq_(eth, field_value)
- return
- elif key in ['dl_src', 'dl_dst']:
- eth, mask = _to_match_eth(value)
- field_value = addrconv.mac.bin_to_text(field_value)
+ assert False
+
+ def _equal_str_to_match(self, key, value, match, test):
+ field_value = self._get_field_value(test, key, match)
+
+ if key in ['eth_src', 'eth_dst', 'arp_sha', 'arp_tha']:
+ # MAC address
+ eth, mask = _to_match_eth(value)
+ if mask is not None:
+ # with mask
+ for i in range(0, len(mask)):
+ if mask[i] == 'f':
+ eq_(eth[i], field_value[0][i])
+ eq_(mask, field_value[1])
+ else:
+ # without mask
eq_(eth, field_value)
- return
- elif key in ['ipv4_src', 'ipv4_dst', 'arp_spa', 'arp_tpa']:
- # IPv4 address
- ipv4, mask = _to_match_ip(value)
- if mask is not None:
- # with mask
- eq_(ipv4, field_value[0])
- eq_(mask, field_value[1])
- else:
- # without mask
- eq_(ipv4, field_value)
- return
- elif key in ['nw_src', 'nw_dst']:
- # IPv4 address
+ return
+ elif key in ['dl_src', 'dl_dst']:
+ eth, mask = _to_match_eth(value)
+ field_value = addrconv.mac.bin_to_text(field_value)
+ eq_(eth, field_value)
+ return
+ elif key in ['ipv4_src', 'ipv4_dst', 'arp_spa', 'arp_tpa']:
+ # IPv4 address
+ ipv4, mask = _to_match_ip(value)
+ if mask is not None:
+ # with mask
+ eq_(ipv4, field_value[0])
+ eq_(mask, field_value[1])
+ else:
+ # without mask
+ eq_(ipv4, field_value)
+ return
+ elif key in ['nw_src', 'nw_dst']:
+ # IPv4 address
+ ipv4, mask = _to_match_ip(value)
+ field_value = _to_match_ip(field_value)
+ if mask is not None:
+ # with mask
+ eq_(ipv4, field_value[0])
+ eq_(mask, field_value[1])
+ else:
+ # without mask
+ eq_(ipv4, field_value[0])
+ return
+ elif key in ['ipv6_src', 'ipv6_dst']:
+ # IPv6 address
+ ipv6, mask = _to_match_ip(value)
+ if mask is not None:
+ # with mask
+ eq_(ipv6, field_value[0])
+ eq_(mask, field_value[1])
+ else:
+ # without mask
+ eq_(ipv6, field_value)
+ return
+ elif key == 'vlan_vid':
+ if test.ver == ofproto_v1_0.OFP_VERSION:
+ eq_(value, field_value)
+ else:
+ eq_(test.expected_value['vlan_vid'][
+ value]['to_match'], field_value)
+ return
+ else:
+ if isinstance(value, str) and '/' in value:
+ # with mask
+ value, mask = _to_match_masked_int(value)
+ value &= mask
+ eq_(value, field_value[0])
+ eq_(mask, field_value[1])
+ else:
+ # without mask
+ eq_(_str_to_int(value), field_value)
+ return
+
+ def _equal_match_to_str(self, key, value, match_str, test):
+ field_value = match_str[key]
+ if key in ['dl_src', 'dl_dst', 'arp_sha', 'arp_tha']:
+ # MAC address
+ eth, mask = _to_match_eth(value)
+ if mask is not None:
+ # with mask
+ field_value = field_value.split('/')
+ for i in range(0, len(mask)):
+ if mask[i] == 'f':
+ eq_(eth[i], field_value[0][i])
+ eq_(mask, field_value[1])
+ else:
+ # without mask
+ eq_(eth, field_value)
+ return
+ elif key in['nw_src', 'nw_dst', 'arp_spa', 'arp_tpa']:
+ # IPv4 address
+ if test.ver == ofproto_v1_0.OFP_VERSION:
ipv4, mask = _to_match_ip(value)
field_value = _to_match_ip(field_value)
if mask is not None:
@@ -339,117 +423,45 @@ class Test_ofctl(unittest.TestCase):
else:
# without mask
eq_(ipv4, field_value[0])
- return
- elif key in ['ipv6_src', 'ipv6_dst']:
- # IPv6 address
- ipv6, mask = _to_match_ip(value)
- if mask is not None:
- # with mask
- eq_(ipv6, field_value[0])
- eq_(mask, field_value[1])
- else:
- # without mask
- eq_(ipv6, field_value)
- return
- elif key == 'vlan_vid':
- if test.ver == ofproto_v1_0.OFP_VERSION:
- eq_(value, field_value)
- else:
- eq_(test.expected_value['vlan_vid'][
- value]['to_match'], field_value)
- return
else:
- if isinstance(value, str) and '/' in value:
- # with mask
- value, mask = _to_match_masked_int(value)
- value &= mask
- eq_(value, field_value[0])
- eq_(mask, field_value[1])
- else:
- # without mask
- eq_(_str_to_int(value), field_value)
- return
-
- for key, value in attrs.items():
- equal_match(key, value, match)
-
- # match -> str
- match_str = match_to_str(match)
-
- def equal_str(key, value, match_str):
- field_value = match_str[key]
- if key in ['dl_src', 'dl_dst', 'arp_sha', 'arp_tha']:
- # MAC address
- eth, mask = _to_match_eth(value)
- if mask is not None:
- # with mask
- field_value = field_value.split('/')
- for i in range(0, len(mask)):
- if mask[i] == 'f':
- eq_(eth[i], field_value[0][i])
- eq_(mask, field_value[1])
- else:
- # without mask
- eq_(eth, field_value)
- return
- elif key in['nw_src', 'nw_dst', 'arp_spa', 'arp_tpa']:
- # IPv4 address
- if test.ver == ofproto_v1_0.OFP_VERSION:
- ipv4, mask = _to_match_ip(value)
- field_value = _to_match_ip(field_value)
- if mask is not None:
- # with mask
- eq_(ipv4, field_value[0])
- eq_(mask, field_value[1])
- else:
- # without mask
- eq_(ipv4, field_value[0])
- else:
- ipv4, mask = _to_match_ip(value)
- if mask is not None:
- # with mask
- field_value = field_value.split('/')
- eq_(ipv4, field_value[0])
- eq_(mask, field_value[1])
- else:
- # without mask
- eq_(ipv4, field_value)
- return
- elif key in ['ipv6_src', 'ipv6_dst']:
- # IPv6 address
- ipv6, mask = _to_match_ip(value)
+ ipv4, mask = _to_match_ip(value)
if mask is not None:
# with mask
field_value = field_value.split('/')
- eq_(ipv6, field_value[0])
+ eq_(ipv4, field_value[0])
eq_(mask, field_value[1])
else:
# without mask
- eq_(ipv6, field_value)
- return
- elif key == 'dl_vlan':
- if test.ver == ofproto_v1_0.OFP_VERSION:
- eq_(value, field_value)
- else:
- eq_(test.expected_value['vlan_vid'][
- value]['to_str'], field_value)
- return
+ eq_(ipv4, field_value)
+ return
+ elif key in ['ipv6_src', 'ipv6_dst']:
+ # IPv6 address
+ ipv6, mask = _to_match_ip(value)
+ if mask is not None:
+ # with mask
+ field_value = field_value.split('/')
+ eq_(ipv6, field_value[0])
+ eq_(mask, field_value[1])
else:
- if isinstance(value, str) and '/' in value:
- # with mask
- value = _to_masked_int_str(value)
- eq_(value, field_value)
- else:
- # without mask
- eq_(_str_to_int(value), field_value)
- return
-
- for key, value in attrs.items():
- if key in conv_of12_to_of10_dict:
- key_old = conv_of12_to_of10_dict[key]
+ # without mask
+ eq_(ipv6, field_value)
+ return
+ elif key == 'dl_vlan':
+ if test.ver == ofproto_v1_0.OFP_VERSION:
+ eq_(value, field_value)
else:
- key_old = key
- equal_str(key_old, value, match_str)
+ eq_(test.expected_value['vlan_vid'][
+ value]['to_str'], field_value)
+ return
+ else:
+ if isinstance(value, str) and '/' in value:
+ # with mask
+ value = _to_masked_int_str(value)
+ eq_(value, field_value)
+ else:
+ # without mask
+ eq_(_str_to_int(value), field_value)
+ return
def _conv_key(self, test, key, attrs):
if test.ver != ofproto_v1_0.OFP_VERSION:
@@ -723,6 +735,8 @@ class test_data_v1_2(test_data_base):
'SET_FIELD': self._parser.OFPActionSetField,
'GOTO_TABLE': self._parser.OFPInstructionGotoTable,
'WRITE_METADATA': self._parser.OFPInstructionWriteMetadata,
+ 'WRITE_ACTIONS': self._parser.OFPInstructionActions,
+ 'CLEAR_ACTIONS': self._parser.OFPInstructionActions,
})
self.set_expected_value()
@@ -827,7 +841,7 @@ def _add_tests_match(cls):
def _run(self, name, attr, cls):
print('processing %s ...' % name)
cls_ = Test_ofctl(name)
- cls_._test_to_match(attr, cls)
+ cls_._test_match(attr, cls)
print('adding %s ...' % method_name)
func = functools.partial(
_run, name=method_name, attr=attr, cls=cls)