summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ryu/tests/unit/lib/test_ofctl.py338
1 files changed, 161 insertions, 177 deletions
diff --git a/ryu/tests/unit/lib/test_ofctl.py b/ryu/tests/unit/lib/test_ofctl.py
index 52d1b9a9..dd73b5ea 100644
--- a/ryu/tests/unit/lib/test_ofctl.py
+++ b/ryu/tests/unit/lib/test_ofctl.py
@@ -15,12 +15,9 @@
import unittest
import logging
-import struct
-import socket
import netaddr
import functools
import new
-import itertools
from nose.tools import *
@@ -29,92 +26,62 @@ from ryu.ofproto import ofproto_v1_2, ofproto_v1_2_parser
from ryu.lib import ofctl_v1_3
from ryu.ofproto import ofproto_v1_3, ofproto_v1_3_parser
from ryu.ofproto import ofproto_protocol
-from ryu.lib import mac
-from ryu.lib import ip
+from ryu.ofproto import inet
LOG = logging.getLogger('test_ofctl_v1_2, v1_3')
""" Common Functions """
-def _to_match_eth(value):
- eth_mask = value.split('/')
- # MAC address
- eth = mac.haddr_to_bin(eth_mask[0])
- # mask
- mask = mac.haddr_to_bin('ff:ff:ff:ff:ff:ff')
- if len(eth_mask) == 2:
- mask = mac.haddr_to_bin(eth_mask[1])
- return eth, mask
-
-
-def _to_match_tpsrc(value, match, rest):
- match_append = {inet.IPPROTO_TCP: match.set_tcp_src,
- inet.IPPROTO_UDP: match.set_udp_src}
- nw_proto = rest.get('nw_proto', rest.get('ip_proto', 0))
- if nw_proto in match_append:
- match_append[nw_proto](value)
- return match
-
-
-def _to_match_tpdst(value, match, rest):
- match_append = {inet.IPPROTO_TCP: match.set_tcp_dst,
- inet.IPPROTO_UDP: match.set_udp_dst}
- nw_proto = rest.get('nw_proto', rest.get('ip_proto', 0))
- if nw_proto in match_append:
- match_append[nw_proto](value)
- return match
+def _str_to_int(src):
+ if isinstance(src, str):
+ if src.startswith("0x") or src.startswith("0X"):
+ dst = int(src, 16)
+ else:
+ dst = int(src)
+ else:
+ dst = src
+ return dst
-def _to_match_ip(value):
- ip_mask = value.split('/')
- # IP address
- ipv4 = struct.unpack('!I', socket.inet_aton(ip_mask[0]))[0]
- # netmask
- netmask = ofproto_v1_2_parser.UINT32_MAX
- if len(ip_mask) == 2:
- # Check the mask is CIDR or not.
- if ip_mask[1].isdigit():
- netmask &= ofproto_v1_2_parser.UINT32_MAX << 32 - int(ip_mask[1])
- else:
- netmask = struct.unpack('!I', socket.inet_aton(ip_mask[1]))[0]
- return ipv4, netmask
+def _to_match_eth(value):
+ if '/' in value:
+ value = value.split('/')
+ return value[0], value[1]
+ else:
+ return value, None
-def _to_match_ipv6(value):
- ip_mask = value.split('/')
- if len(ip_mask) == 2 and ip_mask[1].isdigit() is False:
- # Both address and netmask are colon-hexadecimal.
- ipv6 = netaddr.IPAddress(ip_mask[0]).words
- netmask = netaddr.IPAddress(ip_mask[1]).words
+def _to_match_ip(value):
+ if '/' in value:
+ ip = netaddr.ip.IPNetwork(value)
+ ip_addr = str(ip.network)
+ ip_mask = str(ip.netmask)
+ return ip_addr, ip_mask
else:
- # For other formats.
- network = netaddr.IPNetwork(value)
- ipv6 = network.ip.words
- netmask = network.netmask.words
- return ipv6, netmask
+ return value, None
def _to_match_metadata(value):
if '/' in value:
- metadata = value.split('/')
- return _str_to_int(metadata[0]), _str_to_int(metadata[1])
+ value = value.split('/')
+ return _str_to_int(value[0]), _str_to_int(value[1])
else:
- return _str_to_int(value), ofproto_v1_2_parser.UINT64_MAX
+ return _str_to_int(value), None
-def _str_to_int(src):
- if isinstance(src, str):
- if src.startswith("0x") or src.startswith("0X"):
- dst = int(src, 16)
- else:
- dst = int(src)
- else:
- dst = src
- return dst
+conv_of10_to_of12_dict = {
+ 'dl_dst': 'eth_dst',
+ 'dl_src': 'eth_src',
+ 'dl_type': 'eth_type',
+ 'dl_vlan': 'vlan_vid',
+ 'nw_src': 'ipv4_src',
+ 'nw_dst': 'ipv4_dst',
+ 'nw_proto': 'ip_proto'
+}
-conv_dict = {
+conv_of12_to_of10_dict = {
'eth_src': 'dl_src',
'eth_dst': 'dl_dst',
'eth_type': 'dl_type',
@@ -125,7 +92,7 @@ conv_dict = {
'tcp_src': 'tp_src',
'tcp_dst': 'tp_dst',
'udp_src': 'tp_src',
- 'udp_dst': 'tp_dst',
+ 'udp_dst': 'tp_dst'
}
""" Test_ofctl """
@@ -220,133 +187,150 @@ class Test_ofctl(unittest.TestCase):
to_match = test.to_match
match_to_str = test.match_to_str
dp = ofproto_protocol.ProtocolDesc(version=test.ver)
+ ofproto = dp.ofproto
+
# str -> match
match = to_match(dp, attrs)
- buf = bytearray()
- match.serialize(buf, 0)
- match = match.__class__.parser(str(buf), 0)
-
- def equal_match(key, value, cls_name, fields):
- for field in fields:
- if cls_name in str(field):
- if key in ['dl_src', 'dl_dst', 'arp_sha', 'arp_tha',
- 'eth_src', 'eth_dst']:
- eth, mask = _to_match_eth(value)
- str_eth = mac.haddr_to_str(eth)
- str_mask = mac.haddr_to_str(mask)
- str_value = mac.haddr_to_str(field.value)
- for i in range(0, 17):
- if str_mask[i] == 'f':
- eq_(str_eth[i], str_value[i])
- else:
- continue
- eq_(mask, field.mask)
- return
- elif key in ['nw_src', 'nw_dst', 'ipv4_src', 'ipv4_dst',
- 'arp_spa', 'arp_tpa']:
- ipv4, mask = _to_match_ip(value)
- if mask == (1 << 32) - 1:
- mask = None
- eq_(ipv4, field.value)
- eq_(mask, field.mask)
- return
- elif key in ['ipv6_src', 'ipv6_dst']:
- ipv6, mask = _to_match_ipv6(value)
- for i in range(0, 8):
- if mask[i] == 65535:
- eq_(ipv6[i], field.value[i])
- else:
- continue
- eq_(list(mask), field.mask)
- return
- elif key == 'ipv6_nd_target':
- ipv6, mask = _to_match_ipv6(value)
- for i in range(0, 8):
- if mask[i] == 65535:
- eq_(ipv6[i], field.value[i])
- else:
- continue
- return
- elif key == 'ipv6_nd_sll' or key == 'ipv6_nd_tll':
- eq_(mac.haddr_to_bin(value), field.value)
- return
- elif key == 'metadata':
- metadata, mask = _to_match_metadata(value)
- metadata = metadata & mask
- if mask == (1 << 64) - 1:
- mask = None
- eq_(metadata, field.value)
- eq_(mask, field.mask)
- return
- else:
- eq_(value, field.value)
- return
- assert False
+
+ def equal_match(key, value, match):
+ field_value = match[key]
+ if key in ['eth_src', 'eth_dst', 'arp_sha', 'arp_tha']:
+ # MAC address
+ eth, mask = _to_match_eth(value)
+ if mask is not None:
+ # with mask
+ for i in range(0, len(mask)):
+ if mask[i] == 'f':
+ eq_(eth[i], field_value[0][i])
+ eq_(mask, field_value[1])
+ else:
+ # without mask
+ eq_(eth, field_value)
+ return
+ elif key in ['ipv4_src', 'ipv4_dst', 'arp_spa', 'arp_tpa']:
+ # IPv4 address
+ ipv4, mask = _to_match_ip(value)
+ if mask is not None:
+ # with mask
+ eq_(ipv4, field_value[0])
+ eq_(mask, field_value[1])
+ else:
+ # without mask
+ eq_(ipv4, field_value)
+ return
+ elif key in ['ipv6_src', 'ipv6_dst']:
+ # IPv6 address
+ ipv6, mask = _to_match_ip(value)
+ if mask is not None:
+ # with mask
+ eq_(ipv6, field_value[0])
+ eq_(mask, field_value[1])
+ else:
+ # without mask
+ eq_(ipv6, field_value)
+ return
+ elif key == 'vlan_vid':
+ vid = value | ofproto.OFPVID_PRESENT
+ eq_(vid, field_value)
+ return
+ elif key == 'metadata':
+ # Metadata
+ meta, mask = _to_match_metadata(value)
+ if mask is not None:
+ # with mask
+ meta &= mask
+ eq_(meta, field_value[0])
+ eq_(mask, field_value[1])
+ else:
+ # without mask
+ eq_(meta, field_value)
+ return
+ else:
+ eq_(value, field_value)
+ return
for key, value in attrs.items():
- if key.startswith('tp_'):
- cls = test.supported_match[key][attrs["ip_proto"]]
- elif key in test.supported_match:
- cls = test.supported_match[key]
+ if key in conv_of10_to_of12_dict:
+ # For old field name
+ key_new = conv_of10_to_of12_dict[key]
+ elif key == 'tp_src' or key == 'tp_dst':
+ # TCP/UDP port
+ conv = {inet.IPPROTO_TCP: {'tp_src': 'tcp_src',
+ 'tp_dst': 'tcp_dst'},
+ inet.IPPROTO_UDP: {'tp_src': 'udp_src',
+ 'tp_dst': 'udp_dst'}}
+ ip_proto = attrs.get('nw_proto', attrs.get('ip_proto', 0))
+ key_new = conv[ip_proto][key]
else:
- cls = None
- equal_match(key, value, cls.__name__, match.fields)
+ key_new = key
+ equal_match(key_new, value, match)
# match -> str
match_str = match_to_str(match)
def equal_str(key, value, match_str):
+ field_value = match_str[key]
if key in ['dl_src', 'dl_dst', 'arp_sha', 'arp_tha']:
- eth_1, mask_1 = _to_match_eth(value)
- eth_2, mask_2 = _to_match_eth(match_str[key])
- str_eth_1 = mac.haddr_to_str(eth_1)
- str_mask_1 = mac.haddr_to_str(mask_1)
- str_eth_2 = mac.haddr_to_str(eth_2)
- for i in range(0, 17):
- if str_mask_1[i] == 'f':
- eq_(str_eth_1[i], str_eth_2[i])
- else:
- continue
- eq_(mask_1, mask_2)
+ # MAC address
+ eth, mask = _to_match_eth(value)
+ if mask is not None:
+ # with mask
+ field_value = field_value.split('/')
+ for i in range(0, len(mask)):
+ if mask[i] == 'f':
+ eq_(eth[i], field_value[0][i])
+ eq_(mask, field_value[1])
+ else:
+ # without mask
+ eq_(eth, field_value)
return
elif key in['nw_src', 'nw_dst', 'arp_spa', 'arp_tpa']:
- ipv4_1, ip_mask_1 = _to_match_ip(value)
- ipv4_2, ip_mask_2 = _to_match_ip(match_str[key])
- eq_(ipv4_1, ipv4_2)
- eq_(ip_mask_1, ip_mask_2)
+ # IPv4 address
+ ipv4, mask = _to_match_ip(value)
+ if mask is not None:
+ # with mask
+ field_value = field_value.split('/')
+ eq_(ipv4, field_value[0])
+ eq_(mask, field_value[1])
+ else:
+ # without mask
+ eq_(ipv4, field_value)
return
elif key in ['ipv6_src', 'ipv6_dst']:
- ipv6_1, netmask_1 = _to_match_ipv6(value)
- ipv6_2, netmask_2 = _to_match_ipv6(match_str[key])
- for i in range(0, 8):
- if netmask_1[i] == 65535:
- eq_(ipv6_1[i], ipv6_2[i])
- else:
- continue
- eq_(netmask_1, netmask_2)
- return
- elif key == 'ipv6_nd_target':
- ipv6_1, netmask_1 = _to_match_ipv6(value)
- ipv6_2, netmask_2 = _to_match_ipv6(match_str[key])
- for i in range(0, 8):
- if netmask_1[i] == 65535:
- eq_(ipv6_1[i], ipv6_2[i])
- else:
- continue
+ # IPv6 address
+ ipv6, mask = _to_match_ip(value)
+ if mask is not None:
+ # with mask
+ field_value = field_value.split('/')
+ eq_(ipv6, field_value[0])
+ eq_(mask, field_value[1])
+ else:
+ # without mask
+ eq_(ipv6, field_value)
return
elif key == 'metadata':
- metadata_1, mask_1 = _to_match_metadata(value)
- metadata_1 = metadata_1 & mask_1
- metadata_2, mask_2 = _to_match_metadata(match_str[key])
- eq_(metadata_1, metadata_2)
- eq_(mask_1, mask_2)
+ # Metadata
+ meta, mask = _to_match_metadata(value)
+ if mask is not None:
+ # with mask
+ field_value = field_value.split('/')
+ meta &= mask
+ eq_(str(meta), field_value[0])
+ eq_(str(mask), field_value[1])
+ else:
+ # without mask
+ eq_(str(meta), field_value)
+ return
+ else:
+ eq_(value, field_value)
return
- eq_(value, match_str[key])
for key, value in attrs.items():
- if key in conv_dict:
- key = conv_dict[key]
- equal_str(key, value, match_str)
+ if key in conv_of12_to_of10_dict:
+ key_old = conv_of12_to_of10_dict[key]
+ else:
+ key_old = key
+ equal_str(key_old, value, match_str)
""" Test_data for of_v1_2 """