diff options
Diffstat (limited to 'tests/unit/services/protocols')
-rw-r--r-- | tests/unit/services/protocols/__init__.py | 0 | ||||
-rw-r--r-- | tests/unit/services/protocols/bgp/__init__.py | 0 | ||||
-rw-r--r-- | tests/unit/services/protocols/bgp/core_managers/__init__.py | 0 | ||||
-rw-r--r-- | tests/unit/services/protocols/bgp/core_managers/test_table_manager.py | 937 | ||||
-rw-r--r-- | tests/unit/services/protocols/bgp/test_bgpspeaker.py | 1088 | ||||
-rw-r--r-- | tests/unit/services/protocols/bgp/test_peer.py | 375 | ||||
-rw-r--r-- | tests/unit/services/protocols/bgp/utils/__init__.py | 0 | ||||
-rw-r--r-- | tests/unit/services/protocols/bgp/utils/test_bgp.py | 211 | ||||
-rw-r--r-- | tests/unit/services/protocols/bgp/utils/test_validation.py | 215 |
9 files changed, 2826 insertions, 0 deletions
diff --git a/tests/unit/services/protocols/__init__.py b/tests/unit/services/protocols/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unit/services/protocols/__init__.py diff --git a/tests/unit/services/protocols/bgp/__init__.py b/tests/unit/services/protocols/bgp/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unit/services/protocols/bgp/__init__.py diff --git a/tests/unit/services/protocols/bgp/core_managers/__init__.py b/tests/unit/services/protocols/bgp/core_managers/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unit/services/protocols/bgp/core_managers/__init__.py diff --git a/tests/unit/services/protocols/bgp/core_managers/test_table_manager.py b/tests/unit/services/protocols/bgp/core_managers/test_table_manager.py new file mode 100644 index 00000000..c9c9f55e --- /dev/null +++ b/tests/unit/services/protocols/bgp/core_managers/test_table_manager.py @@ -0,0 +1,937 @@ +# Copyright (C) 2016 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import OrderedDict +import unittest +import logging +try: + import mock # Python 2 +except ImportError: + from unittest import mock # Python 3 + +from nose.tools import ok_, eq_, raises + +from ryu.lib.packet.bgp import BGPPathAttributeOrigin +from ryu.lib.packet.bgp import BGPPathAttributeAsPath +from ryu.lib.packet.bgp import BGP_ATTR_ORIGIN_IGP +from ryu.lib.packet.bgp import BGP_ATTR_TYPE_ORIGIN +from ryu.lib.packet.bgp import BGP_ATTR_TYPE_AS_PATH +from ryu.lib.packet.bgp import BGP_ATTR_TYPE_EXTENDED_COMMUNITIES +from ryu.lib.packet.bgp import IPAddrPrefix +from ryu.lib.packet.bgp import IP6AddrPrefix +from ryu.lib.packet.bgp import EvpnArbitraryEsi +from ryu.lib.packet.bgp import EvpnLACPEsi +from ryu.lib.packet.bgp import EvpnEthernetAutoDiscoveryNLRI +from ryu.lib.packet.bgp import EvpnMacIPAdvertisementNLRI +from ryu.lib.packet.bgp import EvpnInclusiveMulticastEthernetTagNLRI +from ryu.lib.packet.bgp import FlowSpecIPv4NLRI +from ryu.lib.packet.bgp import BGPPathAttributeExtendedCommunities +from ryu.services.protocols.bgp.bgpspeaker import EVPN_MAX_ET +from ryu.services.protocols.bgp.bgpspeaker import ESI_TYPE_LACP +from ryu.services.protocols.bgp.bgpspeaker import FLOWSPEC_FAMILY_IPV4 +from ryu.services.protocols.bgp.bgpspeaker import FLOWSPEC_FAMILY_VPNV4 +from ryu.services.protocols.bgp.bgpspeaker import FLOWSPEC_TA_SAMPLE +from ryu.services.protocols.bgp.bgpspeaker import FLOWSPEC_TA_TERMINAL +from ryu.services.protocols.bgp.core import BgpCoreError +from ryu.services.protocols.bgp.core_managers import table_manager +from ryu.services.protocols.bgp.rtconf.vrfs import VRF_RF_IPV4 +from ryu.services.protocols.bgp.rtconf.vrfs import VRF_RF_IPV6 +from ryu.services.protocols.bgp.rtconf.vrfs import VRF_RF_L2_EVPN +from ryu.services.protocols.bgp.rtconf.vrfs import VRF_RF_IPV4_FLOWSPEC +from ryu.services.protocols.bgp.utils.bgp import create_v4flowspec_actions + + +LOG = logging.getLogger(__name__) + + +class Test_TableCoreManager(unittest.TestCase): + """ + Test case for bgp.core_managers.table_manager.TableCoreManager + """ + + @mock.patch( + 'ryu.services.protocols.bgp.core_managers.TableCoreManager.__init__', + mock.MagicMock(return_value=None)) + def _test_update_vrf_table(self, prefix_inst, route_dist, prefix_str, + next_hop, route_family, route_type, + is_withdraw=False, **kwargs): + # Instantiate TableCoreManager + tbl_mng = table_manager.TableCoreManager(None, None) + vrf_table_mock = mock.MagicMock() + tbl_mng._tables = {(route_dist, route_family): vrf_table_mock} + + # Test + tbl_mng.update_vrf_table( + route_dist=route_dist, + prefix=prefix_str, + next_hop=next_hop, + route_family=route_family, + route_type=route_type, + is_withdraw=is_withdraw, + **kwargs) + + # Check + call_args_list = vrf_table_mock.insert_vrf_path.call_args_list + ok_(len(call_args_list) == 1) # insert_vrf_path should be called once + args, kwargs = call_args_list[0] + ok_(len(args) == 0) # no positional argument + eq_(str(prefix_inst), str(kwargs['nlri'])) + eq_(is_withdraw, kwargs['is_withdraw']) + if is_withdraw: + eq_(None, kwargs['next_hop']) + eq_(False, kwargs['gen_lbl']) + else: + eq_(next_hop, kwargs['next_hop']) + eq_(True, kwargs['gen_lbl']) + + def test_update_vrf_table_ipv4(self): + # Prepare test data + route_dist = '65000:100' + ip_network = '192.168.0.0' + ip_prefix_len = 24 + prefix_str = '%s/%d' % (ip_network, ip_prefix_len) + prefix_inst = IPAddrPrefix(ip_prefix_len, ip_network) + next_hop = '10.0.0.1' + route_family = VRF_RF_IPV4 + route_type = None # should be ignored + kwargs = {} # should be ignored + + self._test_update_vrf_table(prefix_inst, route_dist, prefix_str, + next_hop, route_family, route_type, + **kwargs) + + def test_update_vrf_table_ipv6(self): + # Prepare test data + route_dist = '65000:100' + ip_network = 'fe80::' + ip_prefix_len = 64 + prefix_str = '%s/%d' % (ip_network, ip_prefix_len) + prefix_inst = IP6AddrPrefix(ip_prefix_len, ip_network) + next_hop = 'fe80::0011:aabb:ccdd:eeff' + route_family = VRF_RF_IPV6 + route_type = None # should be ignored + kwargs = {} # should be ignored + + self._test_update_vrf_table(prefix_inst, route_dist, prefix_str, + next_hop, route_family, route_type, + **kwargs) + + def test_update_vrf_table_l2_evpn_with_esi_int(self): + # Prepare test data + route_dist = '65000:100' + prefix_str = None # should be ignored + kwargs = { + 'ethernet_tag_id': 100, + 'mac_addr': 'aa:bb:cc:dd:ee:ff', + 'ip_addr': '192.168.0.1', + 'mpls_labels': [], # not be used + } + esi = EvpnArbitraryEsi(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00') + prefix_inst = EvpnMacIPAdvertisementNLRI( + route_dist=route_dist, + esi=esi, + **kwargs) + next_hop = '10.0.0.1' + route_family = VRF_RF_L2_EVPN + route_type = EvpnMacIPAdvertisementNLRI.ROUTE_TYPE_NAME + kwargs['esi'] = 0 + + self._test_update_vrf_table(prefix_inst, route_dist, prefix_str, + next_hop, route_family, route_type, + **kwargs) + + def test_update_vrf_table_l2_evpn_with_esi_dict(self): + # Prepare test data + route_dist = '65000:100' + prefix_str = None # should be ignored + kwargs = { + 'ethernet_tag_id': EVPN_MAX_ET, + } + esi = EvpnLACPEsi(mac_addr='aa:bb:cc:dd:ee:ff', port_key=100) + prefix_inst = EvpnEthernetAutoDiscoveryNLRI( + route_dist=route_dist, + esi=esi, + **kwargs) + next_hop = '0.0.0.0' + route_family = VRF_RF_L2_EVPN + route_type = EvpnEthernetAutoDiscoveryNLRI.ROUTE_TYPE_NAME + kwargs['esi'] = { + 'type': ESI_TYPE_LACP, + 'mac_addr': 'aa:bb:cc:dd:ee:ff', + 'port_key': 100, + } + + self._test_update_vrf_table(prefix_inst, route_dist, prefix_str, + next_hop, route_family, route_type, + **kwargs) + + def test_update_vrf_table_l2_evpn_without_esi(self): + # Prepare test data + route_dist = '65000:100' + prefix_str = None # should be ignored + kwargs = { + 'ethernet_tag_id': 100, + 'ip_addr': '192.168.0.1', + } + prefix_inst = EvpnInclusiveMulticastEthernetTagNLRI( + route_dist=route_dist, **kwargs) + next_hop = '10.0.0.1' + route_family = VRF_RF_L2_EVPN + route_type = EvpnInclusiveMulticastEthernetTagNLRI.ROUTE_TYPE_NAME + + self._test_update_vrf_table(prefix_inst, route_dist, prefix_str, + next_hop, route_family, route_type, + **kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.core_managers.TableCoreManager.__init__', + mock.MagicMock(return_value=None)) + def test_update_vrf_table_l2_evpn_with_vni(self): + # Prepare test data + route_dist = '65000:100' + prefix_str = None # should be ignored + kwargs = { + 'ethernet_tag_id': 100, + 'mac_addr': 'aa:bb:cc:dd:ee:ff', + 'ip_addr': '192.168.0.1', + 'vni': 500, + } + esi = EvpnArbitraryEsi(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00') + prefix_inst = EvpnMacIPAdvertisementNLRI( + route_dist=route_dist, + esi=esi, + **kwargs) + next_hop = '10.0.0.1' + route_family = VRF_RF_L2_EVPN + route_type = EvpnMacIPAdvertisementNLRI.ROUTE_TYPE_NAME + tunnel_type = 'vxlan' + kwargs['esi'] = 0 + + # Instantiate TableCoreManager + tbl_mng = table_manager.TableCoreManager(None, None) + vrf_table_mock = mock.MagicMock() + tbl_mng._tables = {(route_dist, route_family): vrf_table_mock} + + # Test + tbl_mng.update_vrf_table( + route_dist=route_dist, + prefix=prefix_str, + next_hop=next_hop, + route_family=route_family, + route_type=route_type, + tunnel_type=tunnel_type, + **kwargs) + + # Check + call_args_list = vrf_table_mock.insert_vrf_path.call_args_list + ok_(len(call_args_list) == 1) # insert_vrf_path should be called once + args, kwargs = call_args_list[0] + ok_(len(args) == 0) # no positional argument + eq_(str(prefix_inst), str(kwargs['nlri'])) + eq_(next_hop, kwargs['next_hop']) + eq_(False, kwargs['gen_lbl']) # should not generate MPLS labels + eq_(tunnel_type, kwargs['tunnel_type']) + + def test_update_vrf_table_ipv4_withdraw(self): + # Prepare test data + route_dist = '65000:100' + ip_network = '192.168.0.0' + ip_prefix_len = 24 + prefix_str = '%s/%d' % (ip_network, ip_prefix_len) + prefix_inst = IPAddrPrefix(ip_prefix_len, ip_network) + next_hop = '10.0.0.1' + route_family = VRF_RF_IPV4 + route_type = None # should be ignored + kwargs = {} # should be ignored + + self._test_update_vrf_table(prefix_inst, route_dist, prefix_str, + next_hop, route_family, route_type, + is_withdraw=True, **kwargs) + + @raises(BgpCoreError) + @mock.patch( + 'ryu.services.protocols.bgp.core_managers.TableCoreManager.__init__', + mock.MagicMock(return_value=None)) + def test_update_vrf_table_no_vrf(self): + # Prepare test data + route_dist = '65000:100' + ip_network = '192.168.0.0' + ip_prefix_len = 24 + prefix_str = '%s/%d' % (ip_network, ip_prefix_len) + next_hop = '10.0.0.1' + route_family = VRF_RF_IPV4 + route_type = None # should be ignored + kwargs = {} # should be ignored + + # Instantiate TableCoreManager + tbl_mng = table_manager.TableCoreManager(None, None) + tbl_mng._tables = {} # no table + + # Test + tbl_mng.update_vrf_table( + route_dist=route_dist, + prefix=prefix_str, + next_hop=next_hop, + route_family=route_family, + route_type=route_type, + **kwargs) + + @raises(BgpCoreError) + def test_update_vrf_table_invalid_next_hop(self): + # Prepare test data + route_dist = '65000:100' + ip_network = '192.168.0.0' + ip_prefix_len = 24 + prefix_str = '%s/%d' % (ip_network, ip_prefix_len) + prefix_inst = IPAddrPrefix(ip_prefix_len, ip_network) + next_hop = 'xxx.xxx.xxx.xxx' # invalid + route_family = VRF_RF_IPV4 + route_type = None # should be ignored + kwargs = {} # should be ignored + + self._test_update_vrf_table(prefix_inst, route_dist, prefix_str, + next_hop, route_family, route_type, + **kwargs) + + @raises(BgpCoreError) + def test_update_vrf_table_invalid_ipv4_prefix(self): + # Prepare test data + route_dist = '65000:100' + ip_network = 'xxx.xxx.xxx.xxx' # invalid + ip_prefix_len = 24 + prefix_str = '%s/%d' % (ip_network, ip_prefix_len) + prefix_inst = IPAddrPrefix(ip_prefix_len, ip_network) + next_hop = '10.0.0.1' + route_family = VRF_RF_IPV4 + route_type = None # should be ignored + kwargs = {} # should be ignored + + self._test_update_vrf_table(prefix_inst, route_dist, prefix_str, + next_hop, route_family, route_type, + **kwargs) + + @raises(BgpCoreError) + def test_update_vrf_table_invalid_ipv6_prefix(self): + # Prepare test data + route_dist = '65000:100' + ip_network = 'xxxx::' # invalid + ip_prefix_len = 64 + prefix_str = '%s/%d' % (ip_network, ip_prefix_len) + prefix_inst = IP6AddrPrefix(ip_prefix_len, ip_network) + next_hop = 'fe80::0011:aabb:ccdd:eeff' + route_family = VRF_RF_IPV6 + route_type = None # should be ignored + kwargs = {} # should be ignored + + self._test_update_vrf_table(prefix_inst, route_dist, prefix_str, + next_hop, route_family, route_type, + **kwargs) + + @raises(BgpCoreError) + def test_update_vrf_table_invalid_route_family(self): + # Prepare test data + route_dist = '65000:100' + ip_network = '192.168.0.0' + ip_prefix_len = 24 + prefix_str = '%s/%d' % (ip_network, ip_prefix_len) + prefix_inst = IPAddrPrefix(ip_prefix_len, ip_network) + next_hop = '10.0.0.1' + route_family = 'foobar' # invalid + route_type = None # should be ignored + kwargs = {} # should be ignored + + self._test_update_vrf_table(prefix_inst, route_dist, prefix_str, + next_hop, route_family, route_type, + **kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.core_managers.TableCoreManager.__init__', + mock.MagicMock(return_value=None)) + @mock.patch( + 'ryu.services.protocols.bgp.core_managers.TableCoreManager.learn_path') + def _test_update_global_table(self, learn_path_mock, prefix, next_hop, + is_withdraw, expected_next_hop): + # Prepare test data + origin = BGPPathAttributeOrigin(BGP_ATTR_ORIGIN_IGP) + aspath = BGPPathAttributeAsPath([[]]) + pathattrs = OrderedDict() + pathattrs[BGP_ATTR_TYPE_ORIGIN] = origin + pathattrs[BGP_ATTR_TYPE_AS_PATH] = aspath + pathattrs = str(pathattrs) + + # Instantiate TableCoreManager + tbl_mng = table_manager.TableCoreManager(None, None) + + # Test + tbl_mng.update_global_table( + prefix=prefix, + next_hop=next_hop, + is_withdraw=is_withdraw, + ) + + # Check + call_args_list = learn_path_mock.call_args_list + ok_(len(call_args_list) == 1) # learn_path should be called once + args, kwargs = call_args_list[0] + ok_(len(kwargs) == 0) # no keyword argument + output_path = args[0] + eq_(None, output_path.source) + eq_(prefix, output_path.nlri.prefix) + eq_(pathattrs, str(output_path.pathattr_map)) + eq_(expected_next_hop, output_path.nexthop) + eq_(is_withdraw, output_path.is_withdraw) + + def test_update_global_table_ipv4(self): + self._test_update_global_table( + prefix='192.168.0.0/24', + next_hop='10.0.0.1', + is_withdraw=False, + expected_next_hop='10.0.0.1', + ) + + def test_update_global_table_ipv4_withdraw(self): + self._test_update_global_table( + prefix='192.168.0.0/24', + next_hop='10.0.0.1', + is_withdraw=True, + expected_next_hop='10.0.0.1', + ) + + def test_update_global_table_ipv4_no_next_hop(self): + self._test_update_global_table( + prefix='192.168.0.0/24', + next_hop=None, + is_withdraw=True, + expected_next_hop='0.0.0.0', + ) + + def test_update_global_table_ipv6(self): + self._test_update_global_table( + prefix='fe80::/64', + next_hop='fe80::0011:aabb:ccdd:eeff', + is_withdraw=False, + expected_next_hop='fe80::0011:aabb:ccdd:eeff', + ) + + def test_update_global_table_ipv6_withdraw(self): + self._test_update_global_table( + prefix='fe80::/64', + next_hop='fe80::0011:aabb:ccdd:eeff', + is_withdraw=True, + expected_next_hop='fe80::0011:aabb:ccdd:eeff', + ) + + def test_update_global_table_ipv6_no_next_hop(self): + self._test_update_global_table( + prefix='fe80::/64', + next_hop=None, + is_withdraw=True, + expected_next_hop='::', + ) + + @mock.patch( + 'ryu.services.protocols.bgp.core_managers.TableCoreManager.__init__', + mock.MagicMock(return_value=None)) + def _test_update_flowspec_vrf_table(self, flowspec_family, route_family, + route_dist, rules, prefix, + is_withdraw, actions=None): + # Instantiate TableCoreManager + tbl_mng = table_manager.TableCoreManager(None, None) + vrf_table_mock = mock.MagicMock() + tbl_mng._tables = {(route_dist, route_family): vrf_table_mock} + + # Test + tbl_mng.update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_dist=route_dist, + rules=rules, + actions=actions, + is_withdraw=is_withdraw, + ) + + # Check + call_args_list = vrf_table_mock.insert_vrffs_path.call_args_list + ok_(len( + call_args_list) == 1) # insert_vrffs_path should be called once + args, kwargs = call_args_list[0] + ok_(len(args) == 0) # no positional argument + eq_(prefix, kwargs['nlri'].prefix) + eq_(is_withdraw, kwargs['is_withdraw']) + + def test_update_flowspec_vrf_table_vpnv4(self): + flowspec_family = 'vpnv4fs' + route_family = 'ipv4fs' + route_dist = '65001:100' + rules = { + 'dst_prefix': '10.70.1.0/24', + } + actions = { + 'traffic_rate': { + 'as_number': 0, + 'rate_info': 100.0, + }, + } + prefix = 'ipv4fs(dst_prefix:10.70.1.0/24)' + + self._test_update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_family=route_family, + route_dist=route_dist, + rules=rules, + prefix=prefix, + is_withdraw=False, + actions=actions, + ) + + def test_update_flowspec_vrf_table_vpnv4_without_actions(self): + flowspec_family = 'vpnv4fs' + route_family = 'ipv4fs' + route_dist = '65001:100' + rules = { + 'dst_prefix': '10.70.1.0/24', + } + prefix = 'ipv4fs(dst_prefix:10.70.1.0/24)' + + self._test_update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_family=route_family, + route_dist=route_dist, + rules=rules, + prefix=prefix, + is_withdraw=False, + ) + + @raises(BgpCoreError) + def test_update_flowspec_vrf_table_vpnv4_invalid_actions(self): + flowspec_family = 'vpnv4fs' + route_family = 'ipv4fs' + route_dist = '65001:100' + rules = { + 'dst_prefix': '10.70.1.0/24', + } + actions = { + 'invalid_actions': { + 'invalid_param': 10, + }, + } + prefix = 'ipv4fs(dst_prefix:10.70.1.0/24)' + + self._test_update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_family=route_family, + route_dist=route_dist, + rules=rules, + prefix=prefix, + is_withdraw=False, + actions=actions, + ) + + @raises(BgpCoreError) + def test_update_flowspec_vrf_table_vpnv4_invalid_flowspec_family(self): + flowspec_family = 'invalid' + route_family = 'ipv4fs' + route_dist = '65001:100' + rules = { + 'dst_prefix': '10.70.1.0/24', + } + prefix = 'ipv4fs(dst_prefix:10.70.1.0/24)' + + self._test_update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_family=route_family, + route_dist=route_dist, + rules=rules, + prefix=prefix, + is_withdraw=False, + ) + + @raises(BgpCoreError) + def test_update_flowspec_vrf_table_vpnv4_invalid_route_family(self): + flowspec_family = 'vpnv4fs' + route_family = 'invalid' + route_dist = '65001:100' + rules = { + 'dst_prefix': '10.70.1.0/24', + } + prefix = 'ipv4fs(dst_prefix:10.70.1.0/24)' + + self._test_update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_family=route_family, + route_dist=route_dist, + rules=rules, + prefix=prefix, + is_withdraw=False, + ) + + @mock.patch( + 'ryu.services.protocols.bgp.core_managers.TableCoreManager.__init__', + mock.MagicMock(return_value=None)) + @mock.patch( + 'ryu.services.protocols.bgp.core_managers.TableCoreManager.learn_path') + def _test_update_flowspec_global_table(self, learn_path_mock, + flowspec_family, rules, prefix, + is_withdraw, actions=None): + # Instantiate TableCoreManager + tbl_mng = table_manager.TableCoreManager(None, None) + + # Test + tbl_mng.update_flowspec_global_table( + flowspec_family=flowspec_family, + rules=rules, + actions=actions, + is_withdraw=is_withdraw, + ) + + # Check + call_args_list = learn_path_mock.call_args_list + ok_(len(call_args_list) == 1) # learn_path should be called once + args, kwargs = call_args_list[0] + ok_(len(kwargs) == 0) # no keyword argument + output_path = args[0] + eq_(None, output_path.source) + eq_(prefix, output_path.nlri.prefix) + eq_(None, output_path.nexthop) + eq_(is_withdraw, output_path.is_withdraw) + + def test_update_flowspec_global_table_ipv4(self): + flowspec_family = 'ipv4fs' + rules = { + 'dst_prefix': '10.60.1.0/24', + } + actions = { + 'traffic_rate': { + 'as_number': 0, + 'rate_info': 100.0, + }, + } + prefix = 'ipv4fs(dst_prefix:10.60.1.0/24)' + + self._test_update_flowspec_global_table( + flowspec_family=flowspec_family, + rules=rules, + prefix=prefix, + is_withdraw=False, + actions=actions, + ) + + def test_update_flowspec_global_table_ipv4_without_actions(self): + flowspec_family = 'ipv4fs' + rules = { + 'dst_prefix': '10.60.1.0/24', + } + prefix = 'ipv4fs(dst_prefix:10.60.1.0/24)' + + self._test_update_flowspec_global_table( + flowspec_family=flowspec_family, + rules=rules, + prefix=prefix, + is_withdraw=False, + ) + + @raises(BgpCoreError) + def test_update_flowspec_global_table_ipv4_invalid_actions(self): + flowspec_family = 'ipv4fs' + rules = { + 'dst_prefix': '10.60.1.0/24', + } + actions = { + 'invalid_actions': { + 'invalid_param': 10, + }, + } + prefix = 'ipv4fs(dst_prefix:10.60.1.0/24)' + + self._test_update_flowspec_global_table( + flowspec_family=flowspec_family, + rules=rules, + prefix=prefix, + is_withdraw=False, + actions=actions, + ) + + @raises(BgpCoreError) + def test_update_flowspec_global_table_ipv4_invalid_flowspec_family(self): + flowspec_family = 'invalid' + rules = { + 'dst_prefix': '10.60.1.0/24', + } + actions = { + 'traffic_rate': { + 'as_number': 0, + 'rate_info': 100.0, + }, + } + prefix = 'ipv4fs(dst_prefix:10.60.1.0/24)' + + self._test_update_flowspec_global_table( + flowspec_family=flowspec_family, + rules=rules, + prefix=prefix, + is_withdraw=False, + actions=actions, + ) + + def test_update_flowspec_global_table_ipv6(self): + flowspec_family = 'ipv6fs' + rules = { + 'dst_prefix': '2001::3/128/32', + } + actions = { + 'traffic_rate': { + 'as_number': 0, + 'rate_info': 100.0, + }, + } + prefix = 'ipv6fs(dst_prefix:2001::3/128/32)' + + self._test_update_flowspec_global_table( + flowspec_family=flowspec_family, + rules=rules, + prefix=prefix, + is_withdraw=False, + actions=actions, + ) + + def test_update_flowspec_global_table_ipv6_without_actions(self): + flowspec_family = 'ipv6fs' + rules = { + 'dst_prefix': '2001::3/128/32', + } + prefix = 'ipv6fs(dst_prefix:2001::3/128/32)' + + self._test_update_flowspec_global_table( + flowspec_family=flowspec_family, + rules=rules, + prefix=prefix, + is_withdraw=False, + ) + + @raises(BgpCoreError) + def test_update_flowspec_global_table_ipv6_invalid_actions(self): + flowspec_family = 'ipv6fs' + rules = { + 'dst_prefix': '2001::3/128/32', + } + actions = { + 'invalid_actions': { + 'invalid_param': 10, + }, + } + prefix = 'ipv4fs(dst_prefix:2001::3/128/32)' + + self._test_update_flowspec_global_table( + flowspec_family=flowspec_family, + rules=rules, + prefix=prefix, + is_withdraw=False, + actions=actions, + ) + + @raises(BgpCoreError) + def test_update_flowspec_global_table_ipv6_invalid_flowspec_family(self): + flowspec_family = 'invalid' + rules = { + 'dst_prefix': '2001::3/128/32', + } + actions = { + 'traffic_rate': { + 'as_number': 0, + 'rate_info': 100.0, + }, + } + prefix = 'ipv4fs(dst_prefix:2001::3/128/32)' + + self._test_update_flowspec_global_table( + flowspec_family=flowspec_family, + rules=rules, + prefix=prefix, + is_withdraw=False, + actions=actions, + ) + + def test_update_flowspec_vrf_table_vpnv6(self): + flowspec_family = 'vpnv6fs' + route_family = 'ipv6fs' + route_dist = '65001:100' + rules = { + 'dst_prefix': '2001::3/128/32', + } + actions = { + 'traffic_rate': { + 'as_number': 0, + 'rate_info': 100.0, + }, + } + prefix = 'ipv6fs(dst_prefix:2001::3/128/32)' + + self._test_update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_family=route_family, + route_dist=route_dist, + rules=rules, + prefix=prefix, + is_withdraw=False, + actions=actions, + ) + + def test_update_flowspec_vrf_table_vpnv6_without_actions(self): + flowspec_family = 'vpnv6fs' + route_family = 'ipv6fs' + route_dist = '65001:100' + rules = { + 'dst_prefix': '2001::3/128/32', + } + prefix = 'ipv6fs(dst_prefix:2001::3/128/32)' + + self._test_update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_family=route_family, + route_dist=route_dist, + rules=rules, + prefix=prefix, + is_withdraw=False, + ) + + @raises(BgpCoreError) + def test_update_flowspec_vrf_table_vpnv6_invalid_actions(self): + flowspec_family = 'vpnv6fs' + route_family = 'ipv6fs' + route_dist = '65001:100' + rules = { + 'dst_prefix': '2001::3/128/32', + } + actions = { + 'invalid_actions': { + 'invalid_param': 10, + }, + } + prefix = 'ipv6fs(dst_prefix:2001::3/128/32)' + + self._test_update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_family=route_family, + route_dist=route_dist, + rules=rules, + prefix=prefix, + is_withdraw=False, + actions=actions, + ) + + @raises(BgpCoreError) + def test_update_flowspec_vrf_table_vpnv6_invalid_route_family(self): + flowspec_family = 'vpnv6fs' + route_family = 'invalid' + route_dist = '65001:100' + rules = { + 'dst_prefix': '2001::3/128/32', + } + prefix = 'ipv4fs(dst_prefix:2001::3/128/32)' + + self._test_update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_family=route_family, + route_dist=route_dist, + rules=rules, + prefix=prefix, + is_withdraw=False, + ) + + def test_update_flowspec_vrf_table_l2vpn(self): + flowspec_family = 'l2vpnfs' + route_family = 'l2vpnfs' + route_dist = '65001:100' + rules = { + 'dst_mac': '12:34:56:78:9a:bc', + } + actions = { + 'traffic_rate': { + 'as_number': 0, + 'rate_info': 100.0, + }, + } + prefix = 'l2vpnfs(dst_mac:12:34:56:78:9a:bc)' + + self._test_update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_family=route_family, + route_dist=route_dist, + rules=rules, + prefix=prefix, + is_withdraw=False, + actions=actions, + ) + + def test_update_flowspec_vrf_table_l2vpn_without_actions(self): + flowspec_family = 'l2vpnfs' + route_family = 'l2vpnfs' + route_dist = '65001:100' + rules = { + 'dst_mac': '12:34:56:78:9a:bc', + } + prefix = 'l2vpnfs(dst_mac:12:34:56:78:9a:bc)' + + self._test_update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_family=route_family, + route_dist=route_dist, + rules=rules, + prefix=prefix, + is_withdraw=False, + ) + + @raises(BgpCoreError) + def test_update_flowspec_vrf_table_l2vpn_invalid_actions(self): + flowspec_family = 'l2vpnfs' + route_family = 'l2vpnfs' + route_dist = '65001:100' + rules = { + 'dst_mac': '12:34:56:78:9a:bc', + } + actions = { + 'invalid_actions': { + 'invalid_param': 10, + }, + } + prefix = 'l2vpnfs(dst_mac:12:34:56:78:9a:bc)' + + self._test_update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_family=route_family, + route_dist=route_dist, + rules=rules, + prefix=prefix, + is_withdraw=False, + actions=actions, + ) + + @raises(BgpCoreError) + def test_update_flowspec_vrf_table_l2vpn_invalid_route_family(self): + flowspec_family = 'l2vpnfs' + route_family = 'invalid' + route_dist = '65001:100' + rules = { + 'dst_mac': '12:34:56:78:9a:bc', + } + prefix = 'l2vpnfs(dst_mac:12:34:56:78:9a:bc)' + + self._test_update_flowspec_vrf_table( + flowspec_family=flowspec_family, + route_family=route_family, + route_dist=route_dist, + rules=rules, + prefix=prefix, + is_withdraw=False, + ) diff --git a/tests/unit/services/protocols/bgp/test_bgpspeaker.py b/tests/unit/services/protocols/bgp/test_bgpspeaker.py new file mode 100644 index 00000000..81a8bb3b --- /dev/null +++ b/tests/unit/services/protocols/bgp/test_bgpspeaker.py @@ -0,0 +1,1088 @@ +# Copyright (C) 2016 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import logging +try: + import mock # Python 2 +except ImportError: + from unittest import mock # Python 3 + +from nose.tools import raises + +from ryu.services.protocols.bgp import bgpspeaker +from ryu.services.protocols.bgp.bgpspeaker import EVPN_MAX_ET +from ryu.services.protocols.bgp.bgpspeaker import ESI_TYPE_LACP +from ryu.services.protocols.bgp.api.prefix import ESI_TYPE_L2_BRIDGE +from ryu.services.protocols.bgp.bgpspeaker import ESI_TYPE_MAC_BASED +from ryu.services.protocols.bgp.api.prefix import REDUNDANCY_MODE_ALL_ACTIVE +from ryu.services.protocols.bgp.api.prefix import REDUNDANCY_MODE_SINGLE_ACTIVE + + +LOG = logging.getLogger(__name__) + + +class Test_BGPSpeaker(unittest.TestCase): + """ + Test case for bgp.bgpspeaker.BGPSpeaker + """ + + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_add_eth_auto_discovery(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_ETH_AUTO_DISCOVERY + route_dist = '65000:100' + esi = { + 'type': ESI_TYPE_LACP, + 'mac_addr': 'aa:bb:cc:dd:ee:ff', + 'port_key': 100, + } + ethernet_tag_id = EVPN_MAX_ET + redundancy_mode = REDUNDANCY_MODE_ALL_ACTIVE + next_hop = '0.0.0.0' + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + 'esi': esi, + 'ethernet_tag_id': ethernet_tag_id, + 'redundancy_mode': redundancy_mode, + 'next_hop': next_hop, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_add( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ethernet_tag_id=ethernet_tag_id, + redundancy_mode=redundancy_mode, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.add_local', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_add_eth_auto_discovery_vni(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_ETH_AUTO_DISCOVERY + route_dist = '65000:100' + esi = { + 'type': ESI_TYPE_L2_BRIDGE, + 'mac_addr': 'aa:bb:cc:dd:ee:ff', + 'priority': 100, + } + ethernet_tag_id = EVPN_MAX_ET + redundancy_mode = REDUNDANCY_MODE_SINGLE_ACTIVE + vni = 500 + next_hop = '0.0.0.0' + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + 'esi': esi, + 'ethernet_tag_id': ethernet_tag_id, + 'redundancy_mode': redundancy_mode, + 'vni': vni, + 'next_hop': next_hop, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_add( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ethernet_tag_id=ethernet_tag_id, + redundancy_mode=redundancy_mode, + vni=vni + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.add_local', **expected_kwargs) + + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_add_mac_ip_adv(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_MAC_IP_ADV_ROUTE + route_dist = '65000:100' + esi = 0 # denotes single-homed + ethernet_tag_id = 200 + mac_addr = 'aa:bb:cc:dd:ee:ff' + ip_addr = '192.168.0.1' + next_hop = '10.0.0.1' + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + 'esi': esi, + 'ethernet_tag_id': ethernet_tag_id, + 'mac_addr': mac_addr, + 'ip_addr': ip_addr, + 'next_hop': next_hop, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_add( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ethernet_tag_id=ethernet_tag_id, + mac_addr=mac_addr, + ip_addr=ip_addr, + next_hop=next_hop, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.add_local', **expected_kwargs) + + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_add_mac_ip_adv_vni(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_MAC_IP_ADV_ROUTE + route_dist = '65000:100' + esi = 0 # denotes single-homed + ethernet_tag_id = 200 + mac_addr = 'aa:bb:cc:dd:ee:ff' + ip_addr = '192.168.0.1' + vni = 500 + next_hop = '10.0.0.1' + tunnel_type = bgpspeaker.TUNNEL_TYPE_VXLAN + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + 'esi': esi, + 'ethernet_tag_id': ethernet_tag_id, + 'mac_addr': mac_addr, + 'ip_addr': ip_addr, + 'vni': vni, + 'next_hop': next_hop, + 'tunnel_type': tunnel_type, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_add( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ethernet_tag_id=ethernet_tag_id, + mac_addr=mac_addr, + ip_addr=ip_addr, + vni=vni, + next_hop=next_hop, + tunnel_type=tunnel_type, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.add_local', **expected_kwargs) + + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_add_multicast_etag(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE + route_dist = '65000:100' + esi = 0 # denotes single-homed + ethernet_tag_id = 200 + mac_addr = 'aa:bb:cc:dd:ee:ff' + ip_addr = '192.168.0.1' + next_hop = '10.0.0.1' + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + # 'esi': esi, # should be ignored + 'ethernet_tag_id': ethernet_tag_id, + # 'mac_addr': mac_addr, # should be ignored + 'ip_addr': ip_addr, + 'next_hop': next_hop, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_add( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ethernet_tag_id=ethernet_tag_id, + mac_addr=mac_addr, + ip_addr=ip_addr, + next_hop=next_hop, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.add_local', **expected_kwargs) + + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_add_multicast_etag_no_next_hop(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE + route_dist = '65000:100' + esi = 0 # denotes single-homed + ethernet_tag_id = 200 + mac_addr = 'aa:bb:cc:dd:ee:ff' + ip_addr = '192.168.0.1' + next_hop = '0.0.0.0' # the default value + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + # 'esi': esi, # should be ignored + 'ethernet_tag_id': ethernet_tag_id, + # 'mac_addr': mac_addr, # should be ignored + 'ip_addr': ip_addr, + 'next_hop': next_hop, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_add( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ethernet_tag_id=ethernet_tag_id, + mac_addr=mac_addr, + ip_addr=ip_addr, + # next_hop=next_hop, # omitted + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.add_local', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_add_eth_segment(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_ETH_SEGMENT + route_dist = '65000:100' + esi = { + 'type': ESI_TYPE_MAC_BASED, + 'mac_addr': 'aa:bb:cc:dd:ee:ff', + 'local_disc': 100, + } + ip_addr = '192.168.0.1' + next_hop = '0.0.0.0' + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + 'esi': esi, + 'ip_addr': ip_addr, + 'next_hop': next_hop, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_add( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ip_addr=ip_addr, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.add_local', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_add_ip_prefix_route(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_IP_PREFIX_ROUTE + route_dist = '65000:100' + esi = 0 # denotes single-homed + ethernet_tag_id = 200 + ip_prefix = '192.168.0.0/24' + gw_ip_addr = '172.16.0.1' + next_hop = '0.0.0.0' + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + 'esi': esi, + 'ethernet_tag_id': ethernet_tag_id, + 'ip_prefix': ip_prefix, + 'gw_ip_addr': gw_ip_addr, + 'next_hop': next_hop, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_add( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ethernet_tag_id=ethernet_tag_id, + ip_prefix=ip_prefix, + gw_ip_addr=gw_ip_addr, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.add_local', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_add_ip_prefix_route_vni(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_IP_PREFIX_ROUTE + route_dist = '65000:100' + esi = 0 # denotes single-homed + ethernet_tag_id = 200 + ip_prefix = '192.168.0.0/24' + gw_ip_addr = '172.16.0.1' + vni = 500 + tunnel_type = bgpspeaker.TUNNEL_TYPE_VXLAN + next_hop = '0.0.0.0' + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + 'esi': esi, + 'ethernet_tag_id': ethernet_tag_id, + 'ip_prefix': ip_prefix, + 'gw_ip_addr': gw_ip_addr, + 'tunnel_type': tunnel_type, + 'vni': vni, + 'next_hop': next_hop, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_add( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ethernet_tag_id=ethernet_tag_id, + ip_prefix=ip_prefix, + gw_ip_addr=gw_ip_addr, + tunnel_type=tunnel_type, + vni=vni, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.add_local', **expected_kwargs) + + @raises(ValueError) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_add_invalid_route_type(self, mock_call): + # Prepare test data + route_type = 'foobar' # Invalid EVPN route type + route_dist = '65000:100' + esi = 0 # denotes single-homed + ethernet_tag_id = 200 + mac_addr = 'aa:bb:cc:dd:ee:ff' + ip_addr = '192.168.0.1' + next_hop = '10.0.0.1' + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_add( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ethernet_tag_id=ethernet_tag_id, + mac_addr=mac_addr, + ip_addr=ip_addr, + next_hop=next_hop, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.add_local', 'Invalid arguments detected') + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_del_auto_discovery(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_ETH_AUTO_DISCOVERY + route_dist = '65000:100' + esi = { + 'type': ESI_TYPE_LACP, + 'mac_addr': 'aa:bb:cc:dd:ee:ff', + 'port_key': 100, + } + ethernet_tag_id = EVPN_MAX_ET + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + 'esi': esi, + 'ethernet_tag_id': ethernet_tag_id, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_del( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ethernet_tag_id=ethernet_tag_id, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.delete_local', **expected_kwargs) + + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_del_mac_ip_adv(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_MAC_IP_ADV_ROUTE + route_dist = '65000:100' + ethernet_tag_id = 200 + mac_addr = 'aa:bb:cc:dd:ee:ff' + ip_addr = '192.168.0.1' + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + 'ethernet_tag_id': ethernet_tag_id, + 'mac_addr': mac_addr, + 'ip_addr': ip_addr, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_del( + route_type=route_type, + route_dist=route_dist, + ethernet_tag_id=ethernet_tag_id, + mac_addr=mac_addr, + ip_addr=ip_addr, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.delete_local', **expected_kwargs) + + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_del_multicast_etag(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE + route_dist = '65000:100' + esi = 0 # denotes single-homed + ethernet_tag_id = 200 + mac_addr = 'aa:bb:cc:dd:ee:ff' + ip_addr = '192.168.0.1' + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + # 'esi': esi, # should be ignored + 'ethernet_tag_id': ethernet_tag_id, + # 'mac_addr': mac_addr, # should be ignored + 'ip_addr': ip_addr, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_del( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ethernet_tag_id=ethernet_tag_id, + mac_addr=mac_addr, + ip_addr=ip_addr, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.delete_local', **expected_kwargs) + + @raises(ValueError) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_del_invalid_route_type(self, mock_call): + # Prepare test data + route_type = 'foobar' # Invalid EVPN route type + route_dist = '65000:100' + esi = 0 # denotes single-homed + ethernet_tag_id = 200 + mac_addr = 'aa:bb:cc:dd:ee:ff' + ip_addr = '192.168.0.1' + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_del( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ethernet_tag_id=ethernet_tag_id, + mac_addr=mac_addr, + ip_addr=ip_addr, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.delete_local', 'Invalid arguments detected') + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_del_eth_segment(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_ETH_SEGMENT + route_dist = '65000:100' + esi = { + 'esi_type': ESI_TYPE_MAC_BASED, + 'mac_addr': 'aa:bb:cc:dd:ee:ff', + 'local_disc': 100, + } + ip_addr = '192.168.0.1' + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + 'esi': esi, + 'ip_addr': ip_addr, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_del( + route_type=route_type, + route_dist=route_dist, + esi=esi, + ip_addr=ip_addr, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.delete_local', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_del_ip_prefix_route(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_IP_PREFIX_ROUTE + route_dist = '65000:100' + ethernet_tag_id = 200 + ip_prefix = '192.168.0.0/24' + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + 'ethernet_tag_id': ethernet_tag_id, + 'ip_prefix': ip_prefix, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_del( + route_type=route_type, + route_dist=route_dist, + ethernet_tag_id=ethernet_tag_id, + ip_prefix=ip_prefix, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.delete_local', **expected_kwargs) + + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_add_pmsi_no_tunnel_info(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE + route_dist = '65000:100' + ethernet_tag_id = 200 + next_hop = '0.0.0.0' + ip_addr = '192.168.0.1' + pmsi_tunnel_type = bgpspeaker.PMSI_TYPE_NO_TUNNEL_INFO + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + 'ethernet_tag_id': ethernet_tag_id, + 'next_hop': next_hop, + 'ip_addr': ip_addr, + 'pmsi_tunnel_type': pmsi_tunnel_type, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_add( + route_type=route_type, + route_dist=route_dist, + ethernet_tag_id=ethernet_tag_id, + ip_addr=ip_addr, + pmsi_tunnel_type=pmsi_tunnel_type, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.add_local', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_add_pmsi_ingress_rep(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE + route_dist = '65000:100' + ethernet_tag_id = 200 + next_hop = '0.0.0.0' + ip_addr = '192.168.0.1' + pmsi_tunnel_type = bgpspeaker.PMSI_TYPE_INGRESS_REP + expected_kwargs = { + 'route_type': route_type, + 'route_dist': route_dist, + 'ethernet_tag_id': ethernet_tag_id, + 'next_hop': next_hop, + 'ip_addr': ip_addr, + 'pmsi_tunnel_type': pmsi_tunnel_type, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_add( + route_type=route_type, + route_dist=route_dist, + ethernet_tag_id=ethernet_tag_id, + ip_addr=ip_addr, + pmsi_tunnel_type=pmsi_tunnel_type, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.add_local', **expected_kwargs) + + @raises(ValueError) + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_evpn_prefix_add_invalid_pmsi_tunnel_type(self, mock_call): + # Prepare test data + route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE + route_dist = '65000:100' + ethernet_tag_id = 200 + next_hop = '0.0.0.0' + ip_addr = '192.168.0.1' + pmsi_tunnel_type = 1 + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.evpn_prefix_add( + route_type=route_type, + route_dist=route_dist, + ethernet_tag_id=ethernet_tag_id, + ip_addr=ip_addr, + pmsi_tunnel_type=pmsi_tunnel_type, + ) + + # Check + mock_call.assert_called_with( + 'evpn_prefix.add_local', 'Invalid arguments detected') + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_flowspec_prefix_add_ipv4(self, mock_call): + # Prepare test data + flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_IPV4 + rules = { + 'dst_prefix': '10.60.1.0/24', + } + + actions = { + 'traffic_marking': { + 'dscp': 24, + } + } + + expected_kwargs = { + 'flowspec_family': flowspec_family, + 'rules': rules, + 'actions': actions, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.flowspec_prefix_add( + flowspec_family=flowspec_family, + rules=rules, + actions=actions) + + # Check + mock_call.assert_called_with( + 'flowspec.add', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_flowspec_prefix_add_ipv4_without_actions(self, mock_call): + # Prepare test data + flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_IPV4 + rules = { + 'dst_prefix': '10.60.1.0/24', + } + + expected_kwargs = { + 'flowspec_family': flowspec_family, + 'rules': rules, + 'actions': {}, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.flowspec_prefix_add( + flowspec_family=flowspec_family, + rules=rules) + + # Check + mock_call.assert_called_with( + 'flowspec.add', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_flowspec_prefix_del_ipv4(self, mock_call): + # Prepare test data + flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_IPV4 + rules = { + 'dst_prefix': '10.60.1.0/24', + } + + expected_kwargs = { + 'flowspec_family': flowspec_family, + 'rules': rules, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.flowspec_prefix_del( + flowspec_family=flowspec_family, + rules=rules) + + # Check + mock_call.assert_called_with( + 'flowspec.del', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_flowspec_prefix_add_vpnv4(self, mock_call): + # Prepare test data + flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_VPNV4 + route_dist = '65001:100' + rules = { + 'dst_prefix': '10.70.1.0/24', + } + + actions = { + 'traffic_marking': { + 'dscp': 24, + } + } + + expected_kwargs = { + 'flowspec_family': flowspec_family, + 'route_dist': route_dist, + 'rules': rules, + 'actions': actions, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.flowspec_prefix_add( + flowspec_family=flowspec_family, + route_dist=route_dist, + rules=rules, + actions=actions) + + # Check + mock_call.assert_called_with( + 'flowspec.add_local', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_flowspec_prefix_del_vpnv4(self, mock_call): + # Prepare test data + flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_VPNV4 + route_dist = '65001:100' + rules = { + 'dst_prefix': '10.70.1.0/24', + } + + expected_kwargs = { + 'flowspec_family': flowspec_family, + 'route_dist': route_dist, + 'rules': rules, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.flowspec_prefix_del( + flowspec_family=flowspec_family, + route_dist=route_dist, + rules=rules) + + # Check + mock_call.assert_called_with( + 'flowspec.del_local', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_flowspec_prefix_add_ipv6(self, mock_call): + # Prepare test data + flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_IPV6 + rules = { + 'dst_prefix': '2001::3/128/32', + } + + actions = { + 'traffic_marking': { + 'dscp': 24, + } + } + + expected_kwargs = { + 'flowspec_family': flowspec_family, + 'rules': rules, + 'actions': actions, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.flowspec_prefix_add( + flowspec_family=flowspec_family, + rules=rules, + actions=actions) + + # Check + mock_call.assert_called_with( + 'flowspec.add', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_flowspec_prefix_add_ipv6_without_actions(self, mock_call): + # Prepare test data + flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_IPV6 + rules = { + 'dst_prefix': '2001::3/128/32', + } + + expected_kwargs = { + 'flowspec_family': flowspec_family, + 'rules': rules, + 'actions': {}, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.flowspec_prefix_add( + flowspec_family=flowspec_family, + rules=rules) + + # Check + mock_call.assert_called_with( + 'flowspec.add', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_flowspec_prefix_del_ipv6(self, mock_call): + # Prepare test data + flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_IPV6 + rules = { + 'dst_prefix': '2001::3/128/32', + } + + expected_kwargs = { + 'flowspec_family': flowspec_family, + 'rules': rules, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.flowspec_prefix_del( + flowspec_family=flowspec_family, + rules=rules) + + # Check + mock_call.assert_called_with( + 'flowspec.del', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_flowspec_prefix_add_vpnv6(self, mock_call): + # Prepare test data + flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_VPNV6 + route_dist = '65001:100' + rules = { + 'dst_prefix': '2001::3/128/32', + } + + actions = { + 'traffic_marking': { + 'dscp': 24, + } + } + + expected_kwargs = { + 'flowspec_family': flowspec_family, + 'route_dist': route_dist, + 'rules': rules, + 'actions': actions, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.flowspec_prefix_add( + flowspec_family=flowspec_family, + route_dist=route_dist, + rules=rules, + actions=actions) + + # Check + mock_call.assert_called_with( + 'flowspec.add_local', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_flowspec_prefix_del_vpnv6(self, mock_call): + # Prepare test data + flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_VPNV6 + route_dist = '65001:100' + rules = { + 'dst_prefix': '2001::3/128/32', + } + + expected_kwargs = { + 'flowspec_family': flowspec_family, + 'route_dist': route_dist, + 'rules': rules, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.flowspec_prefix_del( + flowspec_family=flowspec_family, + route_dist=route_dist, + rules=rules) + + # Check + mock_call.assert_called_with( + 'flowspec.del_local', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_flowspec_prefix_add_l2vpn(self, mock_call): + # Prepare test data + flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_L2VPN + route_dist = '65001:100' + rules = { + 'dst_mac': '12:34:56:78:9a:bc', + } + + actions = { + 'traffic_marking': { + 'dscp': 24, + } + } + + expected_kwargs = { + 'flowspec_family': flowspec_family, + 'route_dist': route_dist, + 'rules': rules, + 'actions': actions, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.flowspec_prefix_add( + flowspec_family=flowspec_family, + route_dist=route_dist, + rules=rules, + actions=actions) + + # Check + mock_call.assert_called_with( + 'flowspec.add_local', **expected_kwargs) + + @mock.patch( + 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__', + mock.MagicMock(return_value=None)) + @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call') + def test_flowspec_prefix_del_l2vpn(self, mock_call): + # Prepare test data + flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_L2VPN + route_dist = '65001:100' + rules = { + 'dst_mac': '12:34:56:78:9a:bc', + } + + expected_kwargs = { + 'flowspec_family': flowspec_family, + 'route_dist': route_dist, + 'rules': rules, + } + + # Test + speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1') + speaker.flowspec_prefix_del( + flowspec_family=flowspec_family, + route_dist=route_dist, + rules=rules) + + # Check + mock_call.assert_called_with( + 'flowspec.del_local', **expected_kwargs) diff --git a/tests/unit/services/protocols/bgp/test_peer.py b/tests/unit/services/protocols/bgp/test_peer.py new file mode 100644 index 00000000..1597efdb --- /dev/null +++ b/tests/unit/services/protocols/bgp/test_peer.py @@ -0,0 +1,375 @@ +# Copyright (C) 2016 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import logging +try: + import mock # Python 2 +except ImportError: + from unittest import mock # Python 3 + +from nose.tools import eq_ + +from ryu.lib.packet import bgp +from ryu.services.protocols.bgp import peer + + +LOG = logging.getLogger(__name__) + + +class Test_Peer(unittest.TestCase): + """ + Test case for peer.Peer + """ + + @mock.patch.object( + peer.Peer, '__init__', mock.MagicMock(return_value=None)) + def _test_construct_as_path_attr( + self, input_as_path, input_as4_path, expected_as_path): + # Prepare input data + input_as_path_attr = bgp.BGPPathAttributeAsPath(input_as_path) + input_as4_path_attr = bgp.BGPPathAttributeAs4Path(input_as4_path) + _peer = peer.Peer(None, None, None, None, None) + + # TEST + output_as_path_attr = _peer._construct_as_path_attr( + input_as_path_attr, input_as4_path_attr) + + eq_(bgp.BGP_ATTR_TYPE_AS_PATH, output_as_path_attr.type) + eq_(expected_as_path, output_as_path_attr.path_seg_list) + + def test_construct_as_path_attr_sequence_only(self): + # Test Data + # Input: + input_as_path = [[65000, 4000, 23456, 23456, 40001]] + input_as4_path = [[400000, 300000, 40001]] + # Expected: + expected_as_path = [[65000, 4000, 400000, 300000, 40001]] + + self._test_construct_as_path_attr( + input_as_path, input_as4_path, expected_as_path) + + def test_construct_as_path_attr_aggregated_as_path_1(self): + # Test Data + # Input: + input_as_path = [[65000, 4000], {10, 20, 30}, [23456, 23456, 40001]] + input_as4_path = [[400000, 300000, 40001]] + # Expected: + expected_as_path = [[65000, 4000], {10, 20, 30}, [400000, 300000, 40001]] + + self._test_construct_as_path_attr( + input_as_path, input_as4_path, expected_as_path) + + def test_construct_as_path_attr_aggregated_as_path_2(self): + # Test Data + # Input: + input_as_path = [[65000, 4000], {10, 20, 30}, [23456, 23456, 40001]] + input_as4_path = [[3000, 400000, 300000, 40001]] + # Expected: + expected_as_path = [[65000, 4000, 3000, 400000, 300000, 40001]] + + self._test_construct_as_path_attr( + input_as_path, input_as4_path, expected_as_path) + + def test_construct_as_path_attr_aggregated_path_3(self): + # Test Data + # Input: + input_as_path = [[65000, 4000, 23456, 23456, 40001]] + input_as4_path = [[400000, 300000, 40001], {10, 20, 30}] + # Expected: + expected_as_path = [[65000, 400000, 300000, 40001], {10, 20, 30}] + + self._test_construct_as_path_attr( + input_as_path, input_as4_path, expected_as_path) + + def test_construct_as_path_attr_aggregated_as4_path(self): + # Test Data + # Input: + input_as_path = [[65000, 4000, 23456, 23456, 40001]] + input_as4_path = [{10, 20, 30}, [400000, 300000, 40001]] + # Expected: + expected_as_path = [[65000], {10, 20, 30}, [400000, 300000, 40001]] + + self._test_construct_as_path_attr( + input_as_path, input_as4_path, expected_as_path) + + def test_construct_as_path_attr_too_short_as_path(self): + # Test Data + # Input: + input_as_path = [[65000, 4000, 23456, 23456, 40001]] + input_as4_path = [[100000, 65000, 4000, 400000, 300000, 40001]] + # Expected: + expected_as_path = [[65000, 4000, 23456, 23456, 40001]] + + self._test_construct_as_path_attr( + input_as_path, input_as4_path, expected_as_path) + + def test_construct_as_path_attr_too_short_as4_path(self): + # Test Data + # Input: + input_as_path = [[65000, 4000, 23456, 23456, 40001]] + input_as4_path = [[300000, 40001]] + # Expected: + expected_as_path = [[65000, 4000, 23456, 300000, 40001]] + + self._test_construct_as_path_attr( + input_as_path, input_as4_path, expected_as_path) + + def test_construct_as_path_attr_empty_as4_path(self): + # Test Data + # Input: + input_as_path = [[65000, 4000, 23456, 23456, 40001]] + input_as4_path = [[]] + # Expected: + expected_as_path = [[65000, 4000, 23456, 23456, 40001]] + + self._test_construct_as_path_attr( + input_as_path, input_as4_path, expected_as_path) + + @mock.patch.object( + peer.Peer, '__init__', mock.MagicMock(return_value=None)) + def test_construct_as_path_attr_as4_path_None(self): + # Test Data + # Input: + input_as_path = [[65000, 4000, 23456, 23456, 40001]] + # input_as4_path = None + # Expected: + expected_as_path = [[65000, 4000, 23456, 23456, 40001]] + + # Prepare input data + input_as_path_attr = bgp.BGPPathAttributeAsPath(input_as_path) + input_as4_path_attr = None + _peer = peer.Peer(None, None, None, None, None) + + # TEST + output_as_path_attr = _peer._construct_as_path_attr( + input_as_path_attr, input_as4_path_attr) + + eq_(bgp.BGP_ATTR_TYPE_AS_PATH, output_as_path_attr.type) + eq_(expected_as_path, output_as_path_attr.path_seg_list) + + @mock.patch.object( + peer.Peer, '__init__', mock.MagicMock(return_value=None)) + def _test_trans_as_path( + self, input_as_path, expected_as_path, expected_as4_path): + # Prepare input data + _peer = peer.Peer(None, None, None, None, None) + + # TEST + output_as_path, output_as4_path = _peer._trans_as_path(input_as_path) + + eq_(expected_as_path, output_as_path) + eq_(expected_as4_path, output_as4_path) + + @mock.patch.object( + peer.Peer, 'is_four_octet_as_number_cap_valid', + mock.MagicMock(return_value=True)) + def test_trans_as_path_as4_path_is_supported(self): + # Test Data + # Input: + input_as_path = [[65000, 4000, 400000, 300000, 40001]] + # Expected: + expected_as_path = [[65000, 4000, 400000, 300000, 40001]] + expected_as4_path = None + + self._test_trans_as_path( + input_as_path, expected_as_path, expected_as4_path) + + @mock.patch.object( + peer.Peer, 'is_four_octet_as_number_cap_valid', + mock.MagicMock(return_value=False)) + def test_trans_as_path_sequence_only(self): + # Test Data + # Input: + input_as_path = [[65000, 4000, 400000, 300000, 40001]] + # Expected: + expected_as_path = [[65000, 4000, 23456, 23456, 40001]] + expected_as4_path = [[65000, 4000, 400000, 300000, 40001]] + + self._test_trans_as_path( + input_as_path, expected_as_path, expected_as4_path) + + @mock.patch.object( + peer.Peer, 'is_four_octet_as_number_cap_valid', + mock.MagicMock(return_value=False)) + def test_trans_as_path_no_trans(self): + # Test Data + # Input: + input_as_path = [[65000, 4000, 40000, 30000, 40001]] + # Expected: + expected_as_path = [[65000, 4000, 40000, 30000, 40001]] + expected_as4_path = None + + self._test_trans_as_path( + input_as_path, expected_as_path, expected_as4_path) + + @mock.patch.object( + peer.Peer, '__init__', mock.MagicMock(return_value=None)) + def _test_extract_and_reconstruct_as_path( + self, path_attributes, ex_as_path_value, + ex_aggregator_as_number, ex_aggregator_addr): + # Prepare test data + update_msg = bgp.BGPUpdate(path_attributes=path_attributes) + _peer = peer.Peer(None, None, None, None, None) + + # Test + _peer._extract_and_reconstruct_as_path(update_msg) + + umsg_pattrs = update_msg.pathattr_map + as_path_attr = umsg_pattrs.get( + bgp.BGP_ATTR_TYPE_AS_PATH, None) + as4_path_attr = umsg_pattrs.get( + bgp.BGP_ATTR_TYPE_AS4_PATH, None) + aggregator_attr = umsg_pattrs.get( + bgp.BGP_ATTR_TYPE_AGGREGATOR, None) + as4_aggregator_attr = umsg_pattrs.get( + bgp.BGP_ATTR_TYPE_AS4_AGGREGATOR, None) + + eq_(ex_as_path_value, as_path_attr.value) + eq_(None, as4_path_attr) + eq_(ex_aggregator_as_number, aggregator_attr.as_number) + eq_(ex_aggregator_addr, aggregator_attr.addr) + eq_(None, as4_aggregator_attr) + + @mock.patch.object( + peer.Peer, '__init__', mock.MagicMock(return_value=None)) + def test_extract_and_reconstruct_as_path_with_no_as4_attr(self): + # Input values + in_as_path_value = [[1000, 2000, 3000]] + # in_as4_path_value + in_aggregator_as_number = 4000 + in_aggregator_addr = '10.0.0.1' + # in_as4_aggregator_as_number + # in_as4_aggregator_addr + + # Expected values + ex_as_path_value = [[1000, 2000, 3000]] + ex_aggregator_as_number = 4000 + ex_aggregator_addr = '10.0.0.1' + + # Prepare test data + path_attributes = [ + bgp.BGPPathAttributeAsPath( + value=in_as_path_value), + bgp.BGPPathAttributeAggregator( + as_number=in_aggregator_as_number, addr=in_aggregator_addr), + ] + + # Test + self._test_extract_and_reconstruct_as_path( + path_attributes, ex_as_path_value, + ex_aggregator_as_number, ex_aggregator_addr) + + @mock.patch.object( + peer.Peer, '__init__', mock.MagicMock(return_value=None)) + def test_extract_and_reconstruct_as_path_with_as4_attr(self): + # Input values + in_as_path_value = [[1000, 23456, 3000]] + in_as4_path_value = [[2000, 3000]] + in_aggregator_as_number = 23456 + in_aggregator_addr = '10.0.0.1' + in_as4_aggregator_as_number = 4000 + in_as4_aggregator_addr = '10.0.0.1' + + # Expected values + ex_as_path_value = [[1000, 2000, 3000]] + ex_aggregator_as_number = 4000 + ex_aggregator_addr = '10.0.0.1' + + # Prepare test data + path_attributes = [ + bgp.BGPPathAttributeAsPath( + value=in_as_path_value), + bgp.BGPPathAttributeAs4Path( + value=in_as4_path_value), + bgp.BGPPathAttributeAggregator( + as_number=in_aggregator_as_number, + addr=in_aggregator_addr), + bgp.BGPPathAttributeAs4Aggregator( + as_number=in_as4_aggregator_as_number, + addr=in_as4_aggregator_addr), + ] + + # Test + self._test_extract_and_reconstruct_as_path( + path_attributes, ex_as_path_value, + ex_aggregator_as_number, ex_aggregator_addr) + + @mock.patch.object( + peer.Peer, '__init__', mock.MagicMock(return_value=None)) + def test_extract_and_reconstruct_as_path_with_not_trans_as_aggr(self): + # Input values + in_as_path_value = [[1000, 23456, 3000]] + in_as4_path_value = [[2000, 3000]] + in_aggregator_as_number = 4000 # not AS_TRANS + in_aggregator_addr = '10.0.0.1' + in_as4_aggregator_as_number = 4000 + in_as4_aggregator_addr = '10.0.0.1' + + # Expected values + ex_as_path_value = [[1000, 23456, 3000]] + ex_aggregator_as_number = 4000 + ex_aggregator_addr = '10.0.0.1' + + # Prepare test data + path_attributes = [ + bgp.BGPPathAttributeAsPath( + value=in_as_path_value), + bgp.BGPPathAttributeAs4Path( + value=in_as4_path_value), + bgp.BGPPathAttributeAggregator( + as_number=in_aggregator_as_number, + addr=in_aggregator_addr), + bgp.BGPPathAttributeAs4Aggregator( + as_number=in_as4_aggregator_as_number, + addr=in_as4_aggregator_addr), + ] + + # Test + self._test_extract_and_reconstruct_as_path( + path_attributes, ex_as_path_value, + ex_aggregator_as_number, ex_aggregator_addr) + + @mock.patch.object( + peer.Peer, '__init__', mock.MagicMock(return_value=None)) + def test_extract_and_reconstruct_as_path_with_short_as_path(self): + # Input values + in_as_path_value = [[1000, 23456, 3000]] + in_as4_path_value = [[2000, 3000, 4000, 5000]] # longer than AS_PATH + in_aggregator_as_number = 4000 + in_aggregator_addr = '10.0.0.1' + # in_as4_aggregator_as_number + # in_as4_aggregator_addr + + # Expected values + ex_as_path_value = [[1000, 23456, 3000]] + ex_aggregator_as_number = 4000 + ex_aggregator_addr = '10.0.0.1' + + # Prepare test data + path_attributes = [ + bgp.BGPPathAttributeAsPath( + value=in_as_path_value), + bgp.BGPPathAttributeAs4Path( + value=in_as4_path_value), + bgp.BGPPathAttributeAggregator( + as_number=in_aggregator_as_number, + addr=in_aggregator_addr), + ] + + # Test + self._test_extract_and_reconstruct_as_path( + path_attributes, ex_as_path_value, + ex_aggregator_as_number, ex_aggregator_addr) diff --git a/tests/unit/services/protocols/bgp/utils/__init__.py b/tests/unit/services/protocols/bgp/utils/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unit/services/protocols/bgp/utils/__init__.py diff --git a/tests/unit/services/protocols/bgp/utils/test_bgp.py b/tests/unit/services/protocols/bgp/utils/test_bgp.py new file mode 100644 index 00000000..6933a28b --- /dev/null +++ b/tests/unit/services/protocols/bgp/utils/test_bgp.py @@ -0,0 +1,211 @@ +# Copyright (C) 2017 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging +import unittest + +from nose.tools import eq_, raises + +from ryu.lib.packet.bgp import ( + BGPFlowSpecTrafficRateCommunity, + BGPFlowSpecTrafficActionCommunity, + BGPFlowSpecRedirectCommunity, + BGPFlowSpecTrafficMarkingCommunity, + BGPFlowSpecVlanActionCommunity, + BGPFlowSpecTPIDActionCommunity, +) + +from ryu.services.protocols.bgp.core import BgpCoreError +from ryu.services.protocols.bgp.utils.bgp import create_v4flowspec_actions +from ryu.services.protocols.bgp.utils.bgp import create_v6flowspec_actions +from ryu.services.protocols.bgp.utils.bgp import create_l2vpnflowspec_actions + + +LOG = logging.getLogger(__name__) + + +class Test_Utils_BGP(unittest.TestCase): + """ + Test case for ryu.services.protocols.bgp.utils.bgp + """ + + def _test_create_v4flowspec_actions(self, actions, expected_communities): + communities = create_v4flowspec_actions(actions) + expected_communities.sort(key=lambda x: x.subtype) + communities.sort(key=lambda x: x.subtype) + eq_(str(expected_communities), str(communities)) + + def test_create_v4flowspec_actions_all_actions(self): + actions = { + 'traffic_rate': { + 'as_number': 0, + 'rate_info': 100.0, + }, + 'traffic_action': { + 'action': 3, + }, + 'redirect': { + 'as_number': 10, + 'local_administrator': 10, + }, + 'traffic_marking': { + 'dscp': 24, + } + } + expected_communities = [ + BGPFlowSpecTrafficRateCommunity(as_number=0, rate_info=100.0), + BGPFlowSpecTrafficActionCommunity(action=3), + BGPFlowSpecRedirectCommunity(as_number=10, local_administrator=10), + BGPFlowSpecTrafficMarkingCommunity(dscp=24), + ] + self._test_create_v4flowspec_actions(actions, expected_communities) + + def test_create_v4flowspec_actions_without_actions(self): + actions = None + expected_communities = [] + self._test_create_v4flowspec_actions(actions, expected_communities) + + @raises(ValueError) + def test_create_v4flowspec_actions_not_exist_actions(self): + actions = { + 'traffic_test': { + 'test': 10, + }, + } + expected_communities = [] + self._test_create_v4flowspec_actions(actions, expected_communities) + + def _test_create_v6flowspec_actions(self, actions, expected_communities): + communities = create_v6flowspec_actions(actions) + expected_communities.sort(key=lambda x: x.subtype) + communities.sort(key=lambda x: x.subtype) + eq_(str(expected_communities), str(communities)) + + def test_create_v6flowspec_actions_all_actions(self): + actions = { + 'traffic_rate': { + 'as_number': 0, + 'rate_info': 100.0, + }, + 'traffic_action': { + 'action': 3, + }, + 'redirect': { + 'as_number': 10, + 'local_administrator': 10, + }, + 'traffic_marking': { + 'dscp': 24, + } + } + expected_communities = [ + BGPFlowSpecTrafficRateCommunity(as_number=0, rate_info=100.0), + BGPFlowSpecTrafficActionCommunity(action=3), + BGPFlowSpecRedirectCommunity(as_number=10, local_administrator=10), + BGPFlowSpecTrafficMarkingCommunity(dscp=24), + ] + self._test_create_v6flowspec_actions(actions, expected_communities) + + def test_create_v6flowspec_actions_without_actions(self): + actions = None + expected_communities = [] + self._test_create_v6flowspec_actions(actions, expected_communities) + + @raises(ValueError) + def test_create_v6flowspec_actions_not_exist_actions(self): + actions = { + 'traffic_test': { + 'test': 10, + }, + } + expected_communities = [] + self._test_create_v6flowspec_actions(actions, expected_communities) + + def _test_create_l2vpnflowspec_actions(self, actions, expected_communities): + communities = create_l2vpnflowspec_actions(actions) + expected_communities.sort(key=lambda x: x.subtype) + communities.sort(key=lambda x: x.subtype) + eq_(str(expected_communities), str(communities)) + + def test_create_l2vpnflowspec_actions_all_actions(self): + actions = { + 'traffic_rate': { + 'as_number': 0, + 'rate_info': 100.0, + }, + 'traffic_action': { + 'action': 3, + }, + 'redirect': { + 'as_number': 10, + 'local_administrator': 10, + }, + 'traffic_marking': { + 'dscp': 24, + }, + 'vlan_action': { + 'actions_1': (BGPFlowSpecVlanActionCommunity.POP | + BGPFlowSpecVlanActionCommunity.SWAP), + 'vlan_1': 3000, + 'cos_1': 3, + 'actions_2': BGPFlowSpecVlanActionCommunity.PUSH, + 'vlan_2': 4000, + 'cos_2': 2, + }, + 'tpid_action': { + 'actions': (BGPFlowSpecTPIDActionCommunity.TI | + BGPFlowSpecTPIDActionCommunity.TO), + 'tpid_1': 5, + 'tpid_2': 6, + } + } + expected_communities = [ + BGPFlowSpecTrafficRateCommunity(as_number=0, rate_info=100.0), + BGPFlowSpecTrafficActionCommunity(action=3), + BGPFlowSpecRedirectCommunity(as_number=10, local_administrator=10), + BGPFlowSpecTrafficMarkingCommunity(dscp=24), + BGPFlowSpecVlanActionCommunity( + actions_1=(BGPFlowSpecVlanActionCommunity.POP | + BGPFlowSpecVlanActionCommunity.SWAP), + vlan_1=3000, + cos_1=3, + actions_2=BGPFlowSpecVlanActionCommunity.PUSH, + vlan_2=4000, + cos_2=2, + ), + BGPFlowSpecTPIDActionCommunity( + actions=(BGPFlowSpecTPIDActionCommunity.TI | + BGPFlowSpecTPIDActionCommunity.TO), + tpid_1=5, + tpid_2=6, + ), + ] + self._test_create_l2vpnflowspec_actions(actions, expected_communities) + + def test_create_l2vpnflowspec_actions_without_actions(self): + actions = None + expected_communities = [] + self._test_create_l2vpnflowspec_actions(actions, expected_communities) + + @raises(ValueError) + def test_create_l2vpnflowspec_actions_not_exist_actions(self): + actions = { + 'traffic_test': { + 'test': 10, + }, + } + expected_communities = [] + self._test_create_l2vpnflowspec_actions(actions, expected_communities) diff --git a/tests/unit/services/protocols/bgp/utils/test_validation.py b/tests/unit/services/protocols/bgp/utils/test_validation.py new file mode 100644 index 00000000..6d5f6ac3 --- /dev/null +++ b/tests/unit/services/protocols/bgp/utils/test_validation.py @@ -0,0 +1,215 @@ +# Copyright (C) 2016 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import unittest + +from nose.tools import eq_, ok_ + +from ryu.services.protocols.bgp.utils import validation + + +LOG = logging.getLogger(__name__) + + +class Test_Utils_Validation(unittest.TestCase): + """ + Test case for ryu.services.protocols.bgp.utils.validation + """ + + def test_is_valid_mac(self): + ok_(validation.is_valid_mac('aa:bb:cc:dd:ee:ff')) + + def test_is_valid_mac_hyphenation(self): + ok_(validation.is_valid_mac('aa-bb-cc-dd-ee-ff')) + + def test_is_valid_mac_short(self): + eq_(False, validation.is_valid_mac('aa:bb:cc:dd:ee')) + + def test_is_valid_ip_prefix(self): + ok_(validation.is_valid_ip_prefix(24, 32)) + + def test_is_valid_ip_prefix_str(self): + ok_(validation.is_valid_ip_prefix('24', 32)) + + def test_is_valid_ip_prefix_not_digit(self): + eq_(False, validation.is_valid_ip_prefix('foo', 32)) + + def test_is_valid_ip_prefix_over(self): + eq_(False, validation.is_valid_ip_prefix(100, 32)) + + def test_is_valid_ipv4(self): + ok_(validation.is_valid_ipv4('10.0.0.1')) + + def test_is_valid_ipv4_not_dot(self): + eq_(False, validation.is_valid_ipv4('192:168:0:1')) + + def test_is_valid_ipv4_prefix(self): + ok_(validation.is_valid_ipv4_prefix('10.0.0.1/24')) + + def test_is_valid_ipv4_prefix_not_str(self): + eq_(False, validation.is_valid_ipv4_prefix(1234)) + + def test_is_valid_ipv4_prefix_without_prefix(self): + eq_(False, validation.is_valid_ipv4_prefix('10.0.0.1')) + + def test_is_valid_ipv4_prefix_invalid_addr(self): + eq_(False, validation.is_valid_ipv4_prefix('xxx.xxx.xxx.xxx/24')) + + def test_is_valid_ipv6(self): + ok_(validation.is_valid_ipv6('fe80::0011:aabb:ccdd:eeff')) + + def test_is_valid_ipv6_not_colon(self): + eq_(False, validation.is_valid_ipv6('fe80--0011-aabb-ccdd-eeff')) + + def test_is_valid_ipv6_prefix(self): + ok_(validation.is_valid_ipv6_prefix('fe80::0011:aabb:ccdd:eeff/64')) + + def test_is_valid_ipv6_prefix_not_str(self): + eq_(False, validation.is_valid_ipv6_prefix(1234)) + + def test_is_valid_ipv6_prefix_without_prefix(self): + eq_(False, + validation.is_valid_ipv6_prefix('fe80::0011:aabb:ccdd:eeff')) + + def test_is_valid_ipv6_prefix_invalid_addr(self): + eq_(False, validation.is_valid_ipv6_prefix('xxxx::xxxx/64')) + + def test_is_valid_old_asn(self): + ok_(validation.is_valid_old_asn(65000)) + + def test_is_valid_old_asn_negative(self): + eq_(False, validation.is_valid_old_asn(-1)) + + def test_is_valid_old_asn_over(self): + eq_(False, validation.is_valid_old_asn(0xffff + 1)) + + def test_is_valid_asn(self): + ok_(validation.is_valid_asn(6553800)) + + def test_is_valid_asn_old(self): + ok_(validation.is_valid_asn(65000)) + + def test_is_valid_asn_negative(self): + eq_(False, validation.is_valid_asn(-1)) + + def test_is_valid_asn_over(self): + eq_(False, validation.is_valid_asn(0xffffffff + 1)) + + def test_is_valid_vpnv4_prefix(self): + ok_(validation.is_valid_vpnv4_prefix('100:200:10.0.0.1/24')) + + def test_is_valid_vpnv4_prefix_not_str(self): + eq_(False, validation.is_valid_vpnv4_prefix(1234)) + + def test_is_valid_vpnv4_prefix_short_rd(self): + eq_(False, validation.is_valid_vpnv4_prefix('100:10.0.0.1/24')) + + def test_is_valid_vpnv4_prefix_invalid_rd(self): + eq_(False, validation.is_valid_vpnv4_prefix('foo:bar:10.0.0.1/24')) + + def test_is_valid_vpnv6_prefix(self): + ok_(validation.is_valid_vpnv6_prefix( + '100:200:fe80::0011:aabb:ccdd:eeff/64')) + + def test_is_valid_vpnv6_prefix_not_str(self): + eq_(False, validation.is_valid_vpnv6_prefix(1234)) + + def test_is_valid_vpnv6_prefix_short_rd(self): + eq_(False, validation.is_valid_vpnv6_prefix('100:eeff/64')) + + def test_is_valid_vpnv6_prefix_invalid_rd(self): + eq_(False, validation.is_valid_vpnv6_prefix('foo:bar:10.0.0.1/24')) + + def test_is_valid_med(self): + ok_(validation.is_valid_med(100)) + + def test_is_valid_med_not_num(self): + eq_(False, validation.is_valid_med('foo')) + + def test_is_valid_med_negative(self): + eq_(False, validation.is_valid_med(-1)) + + def test_is_valid_med_over(self): + eq_(False, validation.is_valid_med(0xffffffff + 1)) + + def test_is_valid_mpls_label(self): + ok_(validation.is_valid_mpls_label(100)) + + def test_is_valid_mpls_label_reserved(self): + eq_(False, validation.is_valid_mpls_label(4)) + + def test_is_valid_mpls_label_not_num(self): + eq_(False, validation.is_valid_mpls_label('foo')) + + def test_is_valid_mpls_label_negative(self): + eq_(False, validation.is_valid_mpls_label(-1)) + + def test_is_valid_mpls_label_over(self): + eq_(False, validation.is_valid_mpls_label(0x100000 + 1)) + + def test_is_valid_mpls_labels(self): + ok_(validation.is_valid_mpls_labels([100, 200])) + + def test_is_valid_mpls_labels_not_list(self): + eq_(False, validation.is_valid_mpls_labels(100)) + + def test_is_valid_mpls_labels_with_invalid_label(self): + eq_(False, validation.is_valid_mpls_labels(['foo', 200])) + + def test_is_valid_route_dist(self): + ok_(validation.is_valid_route_dist('65000:222')) + + def test_is_valid_route_dist_ipv4_based(self): + ok_(validation.is_valid_route_dist('10.0.0.1:333')) + + def test_is_valid_route_not_str(self): + eq_(False, validation.is_valid_route_dist(65000)) + + def test_is_valid_route_dist_short(self): + eq_(False, validation.is_valid_route_dist('65000')) + + def test_is_valid_route_dist_invalid_ipv4_addr(self): + eq_(False, validation.is_valid_route_dist('xxx.xxx.xxx.xxx:333')) + + def test_is_valid_esi(self): + ok_(validation.is_valid_esi(100)) + + def test_is_valid_esi_not_int(self): + eq_(False, validation.is_valid_esi('foo')) + + def test_is_valid_ethernet_tag_id(self): + ok_(validation.is_valid_ethernet_tag_id(100)) + + def test_is_valid_ethernet_tag_id_not_int(self): + eq_(False, validation.is_valid_ethernet_tag_id('foo')) + + def test_is_valid_ethernet_tag_id_negative(self): + eq_(False, validation.is_valid_ethernet_tag_id(-1)) + + def test_is_valid_ethernet_tag_id_over(self): + eq_(False, validation.is_valid_ethernet_tag_id(0xffffffff + 1)) + + def test_is_valid_vni(self): + ok_(validation.is_valid_vni(100)) + + def test_is_valid_vni_not_int(self): + eq_(False, validation.is_valid_vni('foo')) + + def test_is_valid_vni_negative(self): + eq_(False, validation.is_valid_vni(-1)) + + def test_is_valid_vni_over(self): + eq_(False, validation.is_valid_vni(0xffffff + 1)) |