summaryrefslogtreecommitdiffhomepage
path: root/tests/unit/services/protocols
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit/services/protocols')
-rw-r--r--tests/unit/services/protocols/__init__.py0
-rw-r--r--tests/unit/services/protocols/bgp/__init__.py0
-rw-r--r--tests/unit/services/protocols/bgp/core_managers/__init__.py0
-rw-r--r--tests/unit/services/protocols/bgp/core_managers/test_table_manager.py937
-rw-r--r--tests/unit/services/protocols/bgp/test_bgpspeaker.py1088
-rw-r--r--tests/unit/services/protocols/bgp/test_peer.py375
-rw-r--r--tests/unit/services/protocols/bgp/utils/__init__.py0
-rw-r--r--tests/unit/services/protocols/bgp/utils/test_bgp.py211
-rw-r--r--tests/unit/services/protocols/bgp/utils/test_validation.py215
9 files changed, 2826 insertions, 0 deletions
diff --git a/tests/unit/services/protocols/__init__.py b/tests/unit/services/protocols/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unit/services/protocols/__init__.py
diff --git a/tests/unit/services/protocols/bgp/__init__.py b/tests/unit/services/protocols/bgp/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unit/services/protocols/bgp/__init__.py
diff --git a/tests/unit/services/protocols/bgp/core_managers/__init__.py b/tests/unit/services/protocols/bgp/core_managers/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unit/services/protocols/bgp/core_managers/__init__.py
diff --git a/tests/unit/services/protocols/bgp/core_managers/test_table_manager.py b/tests/unit/services/protocols/bgp/core_managers/test_table_manager.py
new file mode 100644
index 00000000..c9c9f55e
--- /dev/null
+++ b/tests/unit/services/protocols/bgp/core_managers/test_table_manager.py
@@ -0,0 +1,937 @@
+# Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections import OrderedDict
+import unittest
+import logging
+try:
+ import mock # Python 2
+except ImportError:
+ from unittest import mock # Python 3
+
+from nose.tools import ok_, eq_, raises
+
+from ryu.lib.packet.bgp import BGPPathAttributeOrigin
+from ryu.lib.packet.bgp import BGPPathAttributeAsPath
+from ryu.lib.packet.bgp import BGP_ATTR_ORIGIN_IGP
+from ryu.lib.packet.bgp import BGP_ATTR_TYPE_ORIGIN
+from ryu.lib.packet.bgp import BGP_ATTR_TYPE_AS_PATH
+from ryu.lib.packet.bgp import BGP_ATTR_TYPE_EXTENDED_COMMUNITIES
+from ryu.lib.packet.bgp import IPAddrPrefix
+from ryu.lib.packet.bgp import IP6AddrPrefix
+from ryu.lib.packet.bgp import EvpnArbitraryEsi
+from ryu.lib.packet.bgp import EvpnLACPEsi
+from ryu.lib.packet.bgp import EvpnEthernetAutoDiscoveryNLRI
+from ryu.lib.packet.bgp import EvpnMacIPAdvertisementNLRI
+from ryu.lib.packet.bgp import EvpnInclusiveMulticastEthernetTagNLRI
+from ryu.lib.packet.bgp import FlowSpecIPv4NLRI
+from ryu.lib.packet.bgp import BGPPathAttributeExtendedCommunities
+from ryu.services.protocols.bgp.bgpspeaker import EVPN_MAX_ET
+from ryu.services.protocols.bgp.bgpspeaker import ESI_TYPE_LACP
+from ryu.services.protocols.bgp.bgpspeaker import FLOWSPEC_FAMILY_IPV4
+from ryu.services.protocols.bgp.bgpspeaker import FLOWSPEC_FAMILY_VPNV4
+from ryu.services.protocols.bgp.bgpspeaker import FLOWSPEC_TA_SAMPLE
+from ryu.services.protocols.bgp.bgpspeaker import FLOWSPEC_TA_TERMINAL
+from ryu.services.protocols.bgp.core import BgpCoreError
+from ryu.services.protocols.bgp.core_managers import table_manager
+from ryu.services.protocols.bgp.rtconf.vrfs import VRF_RF_IPV4
+from ryu.services.protocols.bgp.rtconf.vrfs import VRF_RF_IPV6
+from ryu.services.protocols.bgp.rtconf.vrfs import VRF_RF_L2_EVPN
+from ryu.services.protocols.bgp.rtconf.vrfs import VRF_RF_IPV4_FLOWSPEC
+from ryu.services.protocols.bgp.utils.bgp import create_v4flowspec_actions
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Test_TableCoreManager(unittest.TestCase):
+ """
+ Test case for bgp.core_managers.table_manager.TableCoreManager
+ """
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.core_managers.TableCoreManager.__init__',
+ mock.MagicMock(return_value=None))
+ def _test_update_vrf_table(self, prefix_inst, route_dist, prefix_str,
+ next_hop, route_family, route_type,
+ is_withdraw=False, **kwargs):
+ # Instantiate TableCoreManager
+ tbl_mng = table_manager.TableCoreManager(None, None)
+ vrf_table_mock = mock.MagicMock()
+ tbl_mng._tables = {(route_dist, route_family): vrf_table_mock}
+
+ # Test
+ tbl_mng.update_vrf_table(
+ route_dist=route_dist,
+ prefix=prefix_str,
+ next_hop=next_hop,
+ route_family=route_family,
+ route_type=route_type,
+ is_withdraw=is_withdraw,
+ **kwargs)
+
+ # Check
+ call_args_list = vrf_table_mock.insert_vrf_path.call_args_list
+ ok_(len(call_args_list) == 1) # insert_vrf_path should be called once
+ args, kwargs = call_args_list[0]
+ ok_(len(args) == 0) # no positional argument
+ eq_(str(prefix_inst), str(kwargs['nlri']))
+ eq_(is_withdraw, kwargs['is_withdraw'])
+ if is_withdraw:
+ eq_(None, kwargs['next_hop'])
+ eq_(False, kwargs['gen_lbl'])
+ else:
+ eq_(next_hop, kwargs['next_hop'])
+ eq_(True, kwargs['gen_lbl'])
+
+ def test_update_vrf_table_ipv4(self):
+ # Prepare test data
+ route_dist = '65000:100'
+ ip_network = '192.168.0.0'
+ ip_prefix_len = 24
+ prefix_str = '%s/%d' % (ip_network, ip_prefix_len)
+ prefix_inst = IPAddrPrefix(ip_prefix_len, ip_network)
+ next_hop = '10.0.0.1'
+ route_family = VRF_RF_IPV4
+ route_type = None # should be ignored
+ kwargs = {} # should be ignored
+
+ self._test_update_vrf_table(prefix_inst, route_dist, prefix_str,
+ next_hop, route_family, route_type,
+ **kwargs)
+
+ def test_update_vrf_table_ipv6(self):
+ # Prepare test data
+ route_dist = '65000:100'
+ ip_network = 'fe80::'
+ ip_prefix_len = 64
+ prefix_str = '%s/%d' % (ip_network, ip_prefix_len)
+ prefix_inst = IP6AddrPrefix(ip_prefix_len, ip_network)
+ next_hop = 'fe80::0011:aabb:ccdd:eeff'
+ route_family = VRF_RF_IPV6
+ route_type = None # should be ignored
+ kwargs = {} # should be ignored
+
+ self._test_update_vrf_table(prefix_inst, route_dist, prefix_str,
+ next_hop, route_family, route_type,
+ **kwargs)
+
+ def test_update_vrf_table_l2_evpn_with_esi_int(self):
+ # Prepare test data
+ route_dist = '65000:100'
+ prefix_str = None # should be ignored
+ kwargs = {
+ 'ethernet_tag_id': 100,
+ 'mac_addr': 'aa:bb:cc:dd:ee:ff',
+ 'ip_addr': '192.168.0.1',
+ 'mpls_labels': [], # not be used
+ }
+ esi = EvpnArbitraryEsi(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00')
+ prefix_inst = EvpnMacIPAdvertisementNLRI(
+ route_dist=route_dist,
+ esi=esi,
+ **kwargs)
+ next_hop = '10.0.0.1'
+ route_family = VRF_RF_L2_EVPN
+ route_type = EvpnMacIPAdvertisementNLRI.ROUTE_TYPE_NAME
+ kwargs['esi'] = 0
+
+ self._test_update_vrf_table(prefix_inst, route_dist, prefix_str,
+ next_hop, route_family, route_type,
+ **kwargs)
+
+ def test_update_vrf_table_l2_evpn_with_esi_dict(self):
+ # Prepare test data
+ route_dist = '65000:100'
+ prefix_str = None # should be ignored
+ kwargs = {
+ 'ethernet_tag_id': EVPN_MAX_ET,
+ }
+ esi = EvpnLACPEsi(mac_addr='aa:bb:cc:dd:ee:ff', port_key=100)
+ prefix_inst = EvpnEthernetAutoDiscoveryNLRI(
+ route_dist=route_dist,
+ esi=esi,
+ **kwargs)
+ next_hop = '0.0.0.0'
+ route_family = VRF_RF_L2_EVPN
+ route_type = EvpnEthernetAutoDiscoveryNLRI.ROUTE_TYPE_NAME
+ kwargs['esi'] = {
+ 'type': ESI_TYPE_LACP,
+ 'mac_addr': 'aa:bb:cc:dd:ee:ff',
+ 'port_key': 100,
+ }
+
+ self._test_update_vrf_table(prefix_inst, route_dist, prefix_str,
+ next_hop, route_family, route_type,
+ **kwargs)
+
+ def test_update_vrf_table_l2_evpn_without_esi(self):
+ # Prepare test data
+ route_dist = '65000:100'
+ prefix_str = None # should be ignored
+ kwargs = {
+ 'ethernet_tag_id': 100,
+ 'ip_addr': '192.168.0.1',
+ }
+ prefix_inst = EvpnInclusiveMulticastEthernetTagNLRI(
+ route_dist=route_dist, **kwargs)
+ next_hop = '10.0.0.1'
+ route_family = VRF_RF_L2_EVPN
+ route_type = EvpnInclusiveMulticastEthernetTagNLRI.ROUTE_TYPE_NAME
+
+ self._test_update_vrf_table(prefix_inst, route_dist, prefix_str,
+ next_hop, route_family, route_type,
+ **kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.core_managers.TableCoreManager.__init__',
+ mock.MagicMock(return_value=None))
+ def test_update_vrf_table_l2_evpn_with_vni(self):
+ # Prepare test data
+ route_dist = '65000:100'
+ prefix_str = None # should be ignored
+ kwargs = {
+ 'ethernet_tag_id': 100,
+ 'mac_addr': 'aa:bb:cc:dd:ee:ff',
+ 'ip_addr': '192.168.0.1',
+ 'vni': 500,
+ }
+ esi = EvpnArbitraryEsi(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00')
+ prefix_inst = EvpnMacIPAdvertisementNLRI(
+ route_dist=route_dist,
+ esi=esi,
+ **kwargs)
+ next_hop = '10.0.0.1'
+ route_family = VRF_RF_L2_EVPN
+ route_type = EvpnMacIPAdvertisementNLRI.ROUTE_TYPE_NAME
+ tunnel_type = 'vxlan'
+ kwargs['esi'] = 0
+
+ # Instantiate TableCoreManager
+ tbl_mng = table_manager.TableCoreManager(None, None)
+ vrf_table_mock = mock.MagicMock()
+ tbl_mng._tables = {(route_dist, route_family): vrf_table_mock}
+
+ # Test
+ tbl_mng.update_vrf_table(
+ route_dist=route_dist,
+ prefix=prefix_str,
+ next_hop=next_hop,
+ route_family=route_family,
+ route_type=route_type,
+ tunnel_type=tunnel_type,
+ **kwargs)
+
+ # Check
+ call_args_list = vrf_table_mock.insert_vrf_path.call_args_list
+ ok_(len(call_args_list) == 1) # insert_vrf_path should be called once
+ args, kwargs = call_args_list[0]
+ ok_(len(args) == 0) # no positional argument
+ eq_(str(prefix_inst), str(kwargs['nlri']))
+ eq_(next_hop, kwargs['next_hop'])
+ eq_(False, kwargs['gen_lbl']) # should not generate MPLS labels
+ eq_(tunnel_type, kwargs['tunnel_type'])
+
+ def test_update_vrf_table_ipv4_withdraw(self):
+ # Prepare test data
+ route_dist = '65000:100'
+ ip_network = '192.168.0.0'
+ ip_prefix_len = 24
+ prefix_str = '%s/%d' % (ip_network, ip_prefix_len)
+ prefix_inst = IPAddrPrefix(ip_prefix_len, ip_network)
+ next_hop = '10.0.0.1'
+ route_family = VRF_RF_IPV4
+ route_type = None # should be ignored
+ kwargs = {} # should be ignored
+
+ self._test_update_vrf_table(prefix_inst, route_dist, prefix_str,
+ next_hop, route_family, route_type,
+ is_withdraw=True, **kwargs)
+
+ @raises(BgpCoreError)
+ @mock.patch(
+ 'ryu.services.protocols.bgp.core_managers.TableCoreManager.__init__',
+ mock.MagicMock(return_value=None))
+ def test_update_vrf_table_no_vrf(self):
+ # Prepare test data
+ route_dist = '65000:100'
+ ip_network = '192.168.0.0'
+ ip_prefix_len = 24
+ prefix_str = '%s/%d' % (ip_network, ip_prefix_len)
+ next_hop = '10.0.0.1'
+ route_family = VRF_RF_IPV4
+ route_type = None # should be ignored
+ kwargs = {} # should be ignored
+
+ # Instantiate TableCoreManager
+ tbl_mng = table_manager.TableCoreManager(None, None)
+ tbl_mng._tables = {} # no table
+
+ # Test
+ tbl_mng.update_vrf_table(
+ route_dist=route_dist,
+ prefix=prefix_str,
+ next_hop=next_hop,
+ route_family=route_family,
+ route_type=route_type,
+ **kwargs)
+
+ @raises(BgpCoreError)
+ def test_update_vrf_table_invalid_next_hop(self):
+ # Prepare test data
+ route_dist = '65000:100'
+ ip_network = '192.168.0.0'
+ ip_prefix_len = 24
+ prefix_str = '%s/%d' % (ip_network, ip_prefix_len)
+ prefix_inst = IPAddrPrefix(ip_prefix_len, ip_network)
+ next_hop = 'xxx.xxx.xxx.xxx' # invalid
+ route_family = VRF_RF_IPV4
+ route_type = None # should be ignored
+ kwargs = {} # should be ignored
+
+ self._test_update_vrf_table(prefix_inst, route_dist, prefix_str,
+ next_hop, route_family, route_type,
+ **kwargs)
+
+ @raises(BgpCoreError)
+ def test_update_vrf_table_invalid_ipv4_prefix(self):
+ # Prepare test data
+ route_dist = '65000:100'
+ ip_network = 'xxx.xxx.xxx.xxx' # invalid
+ ip_prefix_len = 24
+ prefix_str = '%s/%d' % (ip_network, ip_prefix_len)
+ prefix_inst = IPAddrPrefix(ip_prefix_len, ip_network)
+ next_hop = '10.0.0.1'
+ route_family = VRF_RF_IPV4
+ route_type = None # should be ignored
+ kwargs = {} # should be ignored
+
+ self._test_update_vrf_table(prefix_inst, route_dist, prefix_str,
+ next_hop, route_family, route_type,
+ **kwargs)
+
+ @raises(BgpCoreError)
+ def test_update_vrf_table_invalid_ipv6_prefix(self):
+ # Prepare test data
+ route_dist = '65000:100'
+ ip_network = 'xxxx::' # invalid
+ ip_prefix_len = 64
+ prefix_str = '%s/%d' % (ip_network, ip_prefix_len)
+ prefix_inst = IP6AddrPrefix(ip_prefix_len, ip_network)
+ next_hop = 'fe80::0011:aabb:ccdd:eeff'
+ route_family = VRF_RF_IPV6
+ route_type = None # should be ignored
+ kwargs = {} # should be ignored
+
+ self._test_update_vrf_table(prefix_inst, route_dist, prefix_str,
+ next_hop, route_family, route_type,
+ **kwargs)
+
+ @raises(BgpCoreError)
+ def test_update_vrf_table_invalid_route_family(self):
+ # Prepare test data
+ route_dist = '65000:100'
+ ip_network = '192.168.0.0'
+ ip_prefix_len = 24
+ prefix_str = '%s/%d' % (ip_network, ip_prefix_len)
+ prefix_inst = IPAddrPrefix(ip_prefix_len, ip_network)
+ next_hop = '10.0.0.1'
+ route_family = 'foobar' # invalid
+ route_type = None # should be ignored
+ kwargs = {} # should be ignored
+
+ self._test_update_vrf_table(prefix_inst, route_dist, prefix_str,
+ next_hop, route_family, route_type,
+ **kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.core_managers.TableCoreManager.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch(
+ 'ryu.services.protocols.bgp.core_managers.TableCoreManager.learn_path')
+ def _test_update_global_table(self, learn_path_mock, prefix, next_hop,
+ is_withdraw, expected_next_hop):
+ # Prepare test data
+ origin = BGPPathAttributeOrigin(BGP_ATTR_ORIGIN_IGP)
+ aspath = BGPPathAttributeAsPath([[]])
+ pathattrs = OrderedDict()
+ pathattrs[BGP_ATTR_TYPE_ORIGIN] = origin
+ pathattrs[BGP_ATTR_TYPE_AS_PATH] = aspath
+ pathattrs = str(pathattrs)
+
+ # Instantiate TableCoreManager
+ tbl_mng = table_manager.TableCoreManager(None, None)
+
+ # Test
+ tbl_mng.update_global_table(
+ prefix=prefix,
+ next_hop=next_hop,
+ is_withdraw=is_withdraw,
+ )
+
+ # Check
+ call_args_list = learn_path_mock.call_args_list
+ ok_(len(call_args_list) == 1) # learn_path should be called once
+ args, kwargs = call_args_list[0]
+ ok_(len(kwargs) == 0) # no keyword argument
+ output_path = args[0]
+ eq_(None, output_path.source)
+ eq_(prefix, output_path.nlri.prefix)
+ eq_(pathattrs, str(output_path.pathattr_map))
+ eq_(expected_next_hop, output_path.nexthop)
+ eq_(is_withdraw, output_path.is_withdraw)
+
+ def test_update_global_table_ipv4(self):
+ self._test_update_global_table(
+ prefix='192.168.0.0/24',
+ next_hop='10.0.0.1',
+ is_withdraw=False,
+ expected_next_hop='10.0.0.1',
+ )
+
+ def test_update_global_table_ipv4_withdraw(self):
+ self._test_update_global_table(
+ prefix='192.168.0.0/24',
+ next_hop='10.0.0.1',
+ is_withdraw=True,
+ expected_next_hop='10.0.0.1',
+ )
+
+ def test_update_global_table_ipv4_no_next_hop(self):
+ self._test_update_global_table(
+ prefix='192.168.0.0/24',
+ next_hop=None,
+ is_withdraw=True,
+ expected_next_hop='0.0.0.0',
+ )
+
+ def test_update_global_table_ipv6(self):
+ self._test_update_global_table(
+ prefix='fe80::/64',
+ next_hop='fe80::0011:aabb:ccdd:eeff',
+ is_withdraw=False,
+ expected_next_hop='fe80::0011:aabb:ccdd:eeff',
+ )
+
+ def test_update_global_table_ipv6_withdraw(self):
+ self._test_update_global_table(
+ prefix='fe80::/64',
+ next_hop='fe80::0011:aabb:ccdd:eeff',
+ is_withdraw=True,
+ expected_next_hop='fe80::0011:aabb:ccdd:eeff',
+ )
+
+ def test_update_global_table_ipv6_no_next_hop(self):
+ self._test_update_global_table(
+ prefix='fe80::/64',
+ next_hop=None,
+ is_withdraw=True,
+ expected_next_hop='::',
+ )
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.core_managers.TableCoreManager.__init__',
+ mock.MagicMock(return_value=None))
+ def _test_update_flowspec_vrf_table(self, flowspec_family, route_family,
+ route_dist, rules, prefix,
+ is_withdraw, actions=None):
+ # Instantiate TableCoreManager
+ tbl_mng = table_manager.TableCoreManager(None, None)
+ vrf_table_mock = mock.MagicMock()
+ tbl_mng._tables = {(route_dist, route_family): vrf_table_mock}
+
+ # Test
+ tbl_mng.update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_dist=route_dist,
+ rules=rules,
+ actions=actions,
+ is_withdraw=is_withdraw,
+ )
+
+ # Check
+ call_args_list = vrf_table_mock.insert_vrffs_path.call_args_list
+ ok_(len(
+ call_args_list) == 1) # insert_vrffs_path should be called once
+ args, kwargs = call_args_list[0]
+ ok_(len(args) == 0) # no positional argument
+ eq_(prefix, kwargs['nlri'].prefix)
+ eq_(is_withdraw, kwargs['is_withdraw'])
+
+ def test_update_flowspec_vrf_table_vpnv4(self):
+ flowspec_family = 'vpnv4fs'
+ route_family = 'ipv4fs'
+ route_dist = '65001:100'
+ rules = {
+ 'dst_prefix': '10.70.1.0/24',
+ }
+ actions = {
+ 'traffic_rate': {
+ 'as_number': 0,
+ 'rate_info': 100.0,
+ },
+ }
+ prefix = 'ipv4fs(dst_prefix:10.70.1.0/24)'
+
+ self._test_update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_family=route_family,
+ route_dist=route_dist,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ actions=actions,
+ )
+
+ def test_update_flowspec_vrf_table_vpnv4_without_actions(self):
+ flowspec_family = 'vpnv4fs'
+ route_family = 'ipv4fs'
+ route_dist = '65001:100'
+ rules = {
+ 'dst_prefix': '10.70.1.0/24',
+ }
+ prefix = 'ipv4fs(dst_prefix:10.70.1.0/24)'
+
+ self._test_update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_family=route_family,
+ route_dist=route_dist,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ )
+
+ @raises(BgpCoreError)
+ def test_update_flowspec_vrf_table_vpnv4_invalid_actions(self):
+ flowspec_family = 'vpnv4fs'
+ route_family = 'ipv4fs'
+ route_dist = '65001:100'
+ rules = {
+ 'dst_prefix': '10.70.1.0/24',
+ }
+ actions = {
+ 'invalid_actions': {
+ 'invalid_param': 10,
+ },
+ }
+ prefix = 'ipv4fs(dst_prefix:10.70.1.0/24)'
+
+ self._test_update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_family=route_family,
+ route_dist=route_dist,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ actions=actions,
+ )
+
+ @raises(BgpCoreError)
+ def test_update_flowspec_vrf_table_vpnv4_invalid_flowspec_family(self):
+ flowspec_family = 'invalid'
+ route_family = 'ipv4fs'
+ route_dist = '65001:100'
+ rules = {
+ 'dst_prefix': '10.70.1.0/24',
+ }
+ prefix = 'ipv4fs(dst_prefix:10.70.1.0/24)'
+
+ self._test_update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_family=route_family,
+ route_dist=route_dist,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ )
+
+ @raises(BgpCoreError)
+ def test_update_flowspec_vrf_table_vpnv4_invalid_route_family(self):
+ flowspec_family = 'vpnv4fs'
+ route_family = 'invalid'
+ route_dist = '65001:100'
+ rules = {
+ 'dst_prefix': '10.70.1.0/24',
+ }
+ prefix = 'ipv4fs(dst_prefix:10.70.1.0/24)'
+
+ self._test_update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_family=route_family,
+ route_dist=route_dist,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ )
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.core_managers.TableCoreManager.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch(
+ 'ryu.services.protocols.bgp.core_managers.TableCoreManager.learn_path')
+ def _test_update_flowspec_global_table(self, learn_path_mock,
+ flowspec_family, rules, prefix,
+ is_withdraw, actions=None):
+ # Instantiate TableCoreManager
+ tbl_mng = table_manager.TableCoreManager(None, None)
+
+ # Test
+ tbl_mng.update_flowspec_global_table(
+ flowspec_family=flowspec_family,
+ rules=rules,
+ actions=actions,
+ is_withdraw=is_withdraw,
+ )
+
+ # Check
+ call_args_list = learn_path_mock.call_args_list
+ ok_(len(call_args_list) == 1) # learn_path should be called once
+ args, kwargs = call_args_list[0]
+ ok_(len(kwargs) == 0) # no keyword argument
+ output_path = args[0]
+ eq_(None, output_path.source)
+ eq_(prefix, output_path.nlri.prefix)
+ eq_(None, output_path.nexthop)
+ eq_(is_withdraw, output_path.is_withdraw)
+
+ def test_update_flowspec_global_table_ipv4(self):
+ flowspec_family = 'ipv4fs'
+ rules = {
+ 'dst_prefix': '10.60.1.0/24',
+ }
+ actions = {
+ 'traffic_rate': {
+ 'as_number': 0,
+ 'rate_info': 100.0,
+ },
+ }
+ prefix = 'ipv4fs(dst_prefix:10.60.1.0/24)'
+
+ self._test_update_flowspec_global_table(
+ flowspec_family=flowspec_family,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ actions=actions,
+ )
+
+ def test_update_flowspec_global_table_ipv4_without_actions(self):
+ flowspec_family = 'ipv4fs'
+ rules = {
+ 'dst_prefix': '10.60.1.0/24',
+ }
+ prefix = 'ipv4fs(dst_prefix:10.60.1.0/24)'
+
+ self._test_update_flowspec_global_table(
+ flowspec_family=flowspec_family,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ )
+
+ @raises(BgpCoreError)
+ def test_update_flowspec_global_table_ipv4_invalid_actions(self):
+ flowspec_family = 'ipv4fs'
+ rules = {
+ 'dst_prefix': '10.60.1.0/24',
+ }
+ actions = {
+ 'invalid_actions': {
+ 'invalid_param': 10,
+ },
+ }
+ prefix = 'ipv4fs(dst_prefix:10.60.1.0/24)'
+
+ self._test_update_flowspec_global_table(
+ flowspec_family=flowspec_family,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ actions=actions,
+ )
+
+ @raises(BgpCoreError)
+ def test_update_flowspec_global_table_ipv4_invalid_flowspec_family(self):
+ flowspec_family = 'invalid'
+ rules = {
+ 'dst_prefix': '10.60.1.0/24',
+ }
+ actions = {
+ 'traffic_rate': {
+ 'as_number': 0,
+ 'rate_info': 100.0,
+ },
+ }
+ prefix = 'ipv4fs(dst_prefix:10.60.1.0/24)'
+
+ self._test_update_flowspec_global_table(
+ flowspec_family=flowspec_family,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ actions=actions,
+ )
+
+ def test_update_flowspec_global_table_ipv6(self):
+ flowspec_family = 'ipv6fs'
+ rules = {
+ 'dst_prefix': '2001::3/128/32',
+ }
+ actions = {
+ 'traffic_rate': {
+ 'as_number': 0,
+ 'rate_info': 100.0,
+ },
+ }
+ prefix = 'ipv6fs(dst_prefix:2001::3/128/32)'
+
+ self._test_update_flowspec_global_table(
+ flowspec_family=flowspec_family,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ actions=actions,
+ )
+
+ def test_update_flowspec_global_table_ipv6_without_actions(self):
+ flowspec_family = 'ipv6fs'
+ rules = {
+ 'dst_prefix': '2001::3/128/32',
+ }
+ prefix = 'ipv6fs(dst_prefix:2001::3/128/32)'
+
+ self._test_update_flowspec_global_table(
+ flowspec_family=flowspec_family,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ )
+
+ @raises(BgpCoreError)
+ def test_update_flowspec_global_table_ipv6_invalid_actions(self):
+ flowspec_family = 'ipv6fs'
+ rules = {
+ 'dst_prefix': '2001::3/128/32',
+ }
+ actions = {
+ 'invalid_actions': {
+ 'invalid_param': 10,
+ },
+ }
+ prefix = 'ipv4fs(dst_prefix:2001::3/128/32)'
+
+ self._test_update_flowspec_global_table(
+ flowspec_family=flowspec_family,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ actions=actions,
+ )
+
+ @raises(BgpCoreError)
+ def test_update_flowspec_global_table_ipv6_invalid_flowspec_family(self):
+ flowspec_family = 'invalid'
+ rules = {
+ 'dst_prefix': '2001::3/128/32',
+ }
+ actions = {
+ 'traffic_rate': {
+ 'as_number': 0,
+ 'rate_info': 100.0,
+ },
+ }
+ prefix = 'ipv4fs(dst_prefix:2001::3/128/32)'
+
+ self._test_update_flowspec_global_table(
+ flowspec_family=flowspec_family,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ actions=actions,
+ )
+
+ def test_update_flowspec_vrf_table_vpnv6(self):
+ flowspec_family = 'vpnv6fs'
+ route_family = 'ipv6fs'
+ route_dist = '65001:100'
+ rules = {
+ 'dst_prefix': '2001::3/128/32',
+ }
+ actions = {
+ 'traffic_rate': {
+ 'as_number': 0,
+ 'rate_info': 100.0,
+ },
+ }
+ prefix = 'ipv6fs(dst_prefix:2001::3/128/32)'
+
+ self._test_update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_family=route_family,
+ route_dist=route_dist,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ actions=actions,
+ )
+
+ def test_update_flowspec_vrf_table_vpnv6_without_actions(self):
+ flowspec_family = 'vpnv6fs'
+ route_family = 'ipv6fs'
+ route_dist = '65001:100'
+ rules = {
+ 'dst_prefix': '2001::3/128/32',
+ }
+ prefix = 'ipv6fs(dst_prefix:2001::3/128/32)'
+
+ self._test_update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_family=route_family,
+ route_dist=route_dist,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ )
+
+ @raises(BgpCoreError)
+ def test_update_flowspec_vrf_table_vpnv6_invalid_actions(self):
+ flowspec_family = 'vpnv6fs'
+ route_family = 'ipv6fs'
+ route_dist = '65001:100'
+ rules = {
+ 'dst_prefix': '2001::3/128/32',
+ }
+ actions = {
+ 'invalid_actions': {
+ 'invalid_param': 10,
+ },
+ }
+ prefix = 'ipv6fs(dst_prefix:2001::3/128/32)'
+
+ self._test_update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_family=route_family,
+ route_dist=route_dist,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ actions=actions,
+ )
+
+ @raises(BgpCoreError)
+ def test_update_flowspec_vrf_table_vpnv6_invalid_route_family(self):
+ flowspec_family = 'vpnv6fs'
+ route_family = 'invalid'
+ route_dist = '65001:100'
+ rules = {
+ 'dst_prefix': '2001::3/128/32',
+ }
+ prefix = 'ipv4fs(dst_prefix:2001::3/128/32)'
+
+ self._test_update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_family=route_family,
+ route_dist=route_dist,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ )
+
+ def test_update_flowspec_vrf_table_l2vpn(self):
+ flowspec_family = 'l2vpnfs'
+ route_family = 'l2vpnfs'
+ route_dist = '65001:100'
+ rules = {
+ 'dst_mac': '12:34:56:78:9a:bc',
+ }
+ actions = {
+ 'traffic_rate': {
+ 'as_number': 0,
+ 'rate_info': 100.0,
+ },
+ }
+ prefix = 'l2vpnfs(dst_mac:12:34:56:78:9a:bc)'
+
+ self._test_update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_family=route_family,
+ route_dist=route_dist,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ actions=actions,
+ )
+
+ def test_update_flowspec_vrf_table_l2vpn_without_actions(self):
+ flowspec_family = 'l2vpnfs'
+ route_family = 'l2vpnfs'
+ route_dist = '65001:100'
+ rules = {
+ 'dst_mac': '12:34:56:78:9a:bc',
+ }
+ prefix = 'l2vpnfs(dst_mac:12:34:56:78:9a:bc)'
+
+ self._test_update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_family=route_family,
+ route_dist=route_dist,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ )
+
+ @raises(BgpCoreError)
+ def test_update_flowspec_vrf_table_l2vpn_invalid_actions(self):
+ flowspec_family = 'l2vpnfs'
+ route_family = 'l2vpnfs'
+ route_dist = '65001:100'
+ rules = {
+ 'dst_mac': '12:34:56:78:9a:bc',
+ }
+ actions = {
+ 'invalid_actions': {
+ 'invalid_param': 10,
+ },
+ }
+ prefix = 'l2vpnfs(dst_mac:12:34:56:78:9a:bc)'
+
+ self._test_update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_family=route_family,
+ route_dist=route_dist,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ actions=actions,
+ )
+
+ @raises(BgpCoreError)
+ def test_update_flowspec_vrf_table_l2vpn_invalid_route_family(self):
+ flowspec_family = 'l2vpnfs'
+ route_family = 'invalid'
+ route_dist = '65001:100'
+ rules = {
+ 'dst_mac': '12:34:56:78:9a:bc',
+ }
+ prefix = 'l2vpnfs(dst_mac:12:34:56:78:9a:bc)'
+
+ self._test_update_flowspec_vrf_table(
+ flowspec_family=flowspec_family,
+ route_family=route_family,
+ route_dist=route_dist,
+ rules=rules,
+ prefix=prefix,
+ is_withdraw=False,
+ )
diff --git a/tests/unit/services/protocols/bgp/test_bgpspeaker.py b/tests/unit/services/protocols/bgp/test_bgpspeaker.py
new file mode 100644
index 00000000..81a8bb3b
--- /dev/null
+++ b/tests/unit/services/protocols/bgp/test_bgpspeaker.py
@@ -0,0 +1,1088 @@
+# Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import logging
+try:
+ import mock # Python 2
+except ImportError:
+ from unittest import mock # Python 3
+
+from nose.tools import raises
+
+from ryu.services.protocols.bgp import bgpspeaker
+from ryu.services.protocols.bgp.bgpspeaker import EVPN_MAX_ET
+from ryu.services.protocols.bgp.bgpspeaker import ESI_TYPE_LACP
+from ryu.services.protocols.bgp.api.prefix import ESI_TYPE_L2_BRIDGE
+from ryu.services.protocols.bgp.bgpspeaker import ESI_TYPE_MAC_BASED
+from ryu.services.protocols.bgp.api.prefix import REDUNDANCY_MODE_ALL_ACTIVE
+from ryu.services.protocols.bgp.api.prefix import REDUNDANCY_MODE_SINGLE_ACTIVE
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Test_BGPSpeaker(unittest.TestCase):
+ """
+ Test case for bgp.bgpspeaker.BGPSpeaker
+ """
+
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_add_eth_auto_discovery(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_ETH_AUTO_DISCOVERY
+ route_dist = '65000:100'
+ esi = {
+ 'type': ESI_TYPE_LACP,
+ 'mac_addr': 'aa:bb:cc:dd:ee:ff',
+ 'port_key': 100,
+ }
+ ethernet_tag_id = EVPN_MAX_ET
+ redundancy_mode = REDUNDANCY_MODE_ALL_ACTIVE
+ next_hop = '0.0.0.0'
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ 'esi': esi,
+ 'ethernet_tag_id': ethernet_tag_id,
+ 'redundancy_mode': redundancy_mode,
+ 'next_hop': next_hop,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_add(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ethernet_tag_id=ethernet_tag_id,
+ redundancy_mode=redundancy_mode,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.add_local', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_add_eth_auto_discovery_vni(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_ETH_AUTO_DISCOVERY
+ route_dist = '65000:100'
+ esi = {
+ 'type': ESI_TYPE_L2_BRIDGE,
+ 'mac_addr': 'aa:bb:cc:dd:ee:ff',
+ 'priority': 100,
+ }
+ ethernet_tag_id = EVPN_MAX_ET
+ redundancy_mode = REDUNDANCY_MODE_SINGLE_ACTIVE
+ vni = 500
+ next_hop = '0.0.0.0'
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ 'esi': esi,
+ 'ethernet_tag_id': ethernet_tag_id,
+ 'redundancy_mode': redundancy_mode,
+ 'vni': vni,
+ 'next_hop': next_hop,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_add(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ethernet_tag_id=ethernet_tag_id,
+ redundancy_mode=redundancy_mode,
+ vni=vni
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.add_local', **expected_kwargs)
+
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_add_mac_ip_adv(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_MAC_IP_ADV_ROUTE
+ route_dist = '65000:100'
+ esi = 0 # denotes single-homed
+ ethernet_tag_id = 200
+ mac_addr = 'aa:bb:cc:dd:ee:ff'
+ ip_addr = '192.168.0.1'
+ next_hop = '10.0.0.1'
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ 'esi': esi,
+ 'ethernet_tag_id': ethernet_tag_id,
+ 'mac_addr': mac_addr,
+ 'ip_addr': ip_addr,
+ 'next_hop': next_hop,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_add(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ethernet_tag_id=ethernet_tag_id,
+ mac_addr=mac_addr,
+ ip_addr=ip_addr,
+ next_hop=next_hop,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.add_local', **expected_kwargs)
+
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_add_mac_ip_adv_vni(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_MAC_IP_ADV_ROUTE
+ route_dist = '65000:100'
+ esi = 0 # denotes single-homed
+ ethernet_tag_id = 200
+ mac_addr = 'aa:bb:cc:dd:ee:ff'
+ ip_addr = '192.168.0.1'
+ vni = 500
+ next_hop = '10.0.0.1'
+ tunnel_type = bgpspeaker.TUNNEL_TYPE_VXLAN
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ 'esi': esi,
+ 'ethernet_tag_id': ethernet_tag_id,
+ 'mac_addr': mac_addr,
+ 'ip_addr': ip_addr,
+ 'vni': vni,
+ 'next_hop': next_hop,
+ 'tunnel_type': tunnel_type,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_add(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ethernet_tag_id=ethernet_tag_id,
+ mac_addr=mac_addr,
+ ip_addr=ip_addr,
+ vni=vni,
+ next_hop=next_hop,
+ tunnel_type=tunnel_type,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.add_local', **expected_kwargs)
+
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_add_multicast_etag(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE
+ route_dist = '65000:100'
+ esi = 0 # denotes single-homed
+ ethernet_tag_id = 200
+ mac_addr = 'aa:bb:cc:dd:ee:ff'
+ ip_addr = '192.168.0.1'
+ next_hop = '10.0.0.1'
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ # 'esi': esi, # should be ignored
+ 'ethernet_tag_id': ethernet_tag_id,
+ # 'mac_addr': mac_addr, # should be ignored
+ 'ip_addr': ip_addr,
+ 'next_hop': next_hop,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_add(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ethernet_tag_id=ethernet_tag_id,
+ mac_addr=mac_addr,
+ ip_addr=ip_addr,
+ next_hop=next_hop,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.add_local', **expected_kwargs)
+
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_add_multicast_etag_no_next_hop(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE
+ route_dist = '65000:100'
+ esi = 0 # denotes single-homed
+ ethernet_tag_id = 200
+ mac_addr = 'aa:bb:cc:dd:ee:ff'
+ ip_addr = '192.168.0.1'
+ next_hop = '0.0.0.0' # the default value
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ # 'esi': esi, # should be ignored
+ 'ethernet_tag_id': ethernet_tag_id,
+ # 'mac_addr': mac_addr, # should be ignored
+ 'ip_addr': ip_addr,
+ 'next_hop': next_hop,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_add(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ethernet_tag_id=ethernet_tag_id,
+ mac_addr=mac_addr,
+ ip_addr=ip_addr,
+ # next_hop=next_hop, # omitted
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.add_local', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_add_eth_segment(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_ETH_SEGMENT
+ route_dist = '65000:100'
+ esi = {
+ 'type': ESI_TYPE_MAC_BASED,
+ 'mac_addr': 'aa:bb:cc:dd:ee:ff',
+ 'local_disc': 100,
+ }
+ ip_addr = '192.168.0.1'
+ next_hop = '0.0.0.0'
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ 'esi': esi,
+ 'ip_addr': ip_addr,
+ 'next_hop': next_hop,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_add(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ip_addr=ip_addr,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.add_local', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_add_ip_prefix_route(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_IP_PREFIX_ROUTE
+ route_dist = '65000:100'
+ esi = 0 # denotes single-homed
+ ethernet_tag_id = 200
+ ip_prefix = '192.168.0.0/24'
+ gw_ip_addr = '172.16.0.1'
+ next_hop = '0.0.0.0'
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ 'esi': esi,
+ 'ethernet_tag_id': ethernet_tag_id,
+ 'ip_prefix': ip_prefix,
+ 'gw_ip_addr': gw_ip_addr,
+ 'next_hop': next_hop,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_add(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ethernet_tag_id=ethernet_tag_id,
+ ip_prefix=ip_prefix,
+ gw_ip_addr=gw_ip_addr,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.add_local', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_add_ip_prefix_route_vni(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_IP_PREFIX_ROUTE
+ route_dist = '65000:100'
+ esi = 0 # denotes single-homed
+ ethernet_tag_id = 200
+ ip_prefix = '192.168.0.0/24'
+ gw_ip_addr = '172.16.0.1'
+ vni = 500
+ tunnel_type = bgpspeaker.TUNNEL_TYPE_VXLAN
+ next_hop = '0.0.0.0'
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ 'esi': esi,
+ 'ethernet_tag_id': ethernet_tag_id,
+ 'ip_prefix': ip_prefix,
+ 'gw_ip_addr': gw_ip_addr,
+ 'tunnel_type': tunnel_type,
+ 'vni': vni,
+ 'next_hop': next_hop,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_add(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ethernet_tag_id=ethernet_tag_id,
+ ip_prefix=ip_prefix,
+ gw_ip_addr=gw_ip_addr,
+ tunnel_type=tunnel_type,
+ vni=vni,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.add_local', **expected_kwargs)
+
+ @raises(ValueError)
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_add_invalid_route_type(self, mock_call):
+ # Prepare test data
+ route_type = 'foobar' # Invalid EVPN route type
+ route_dist = '65000:100'
+ esi = 0 # denotes single-homed
+ ethernet_tag_id = 200
+ mac_addr = 'aa:bb:cc:dd:ee:ff'
+ ip_addr = '192.168.0.1'
+ next_hop = '10.0.0.1'
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_add(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ethernet_tag_id=ethernet_tag_id,
+ mac_addr=mac_addr,
+ ip_addr=ip_addr,
+ next_hop=next_hop,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.add_local', 'Invalid arguments detected')
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_del_auto_discovery(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_ETH_AUTO_DISCOVERY
+ route_dist = '65000:100'
+ esi = {
+ 'type': ESI_TYPE_LACP,
+ 'mac_addr': 'aa:bb:cc:dd:ee:ff',
+ 'port_key': 100,
+ }
+ ethernet_tag_id = EVPN_MAX_ET
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ 'esi': esi,
+ 'ethernet_tag_id': ethernet_tag_id,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_del(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ethernet_tag_id=ethernet_tag_id,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.delete_local', **expected_kwargs)
+
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_del_mac_ip_adv(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_MAC_IP_ADV_ROUTE
+ route_dist = '65000:100'
+ ethernet_tag_id = 200
+ mac_addr = 'aa:bb:cc:dd:ee:ff'
+ ip_addr = '192.168.0.1'
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ 'ethernet_tag_id': ethernet_tag_id,
+ 'mac_addr': mac_addr,
+ 'ip_addr': ip_addr,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_del(
+ route_type=route_type,
+ route_dist=route_dist,
+ ethernet_tag_id=ethernet_tag_id,
+ mac_addr=mac_addr,
+ ip_addr=ip_addr,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.delete_local', **expected_kwargs)
+
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_del_multicast_etag(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE
+ route_dist = '65000:100'
+ esi = 0 # denotes single-homed
+ ethernet_tag_id = 200
+ mac_addr = 'aa:bb:cc:dd:ee:ff'
+ ip_addr = '192.168.0.1'
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ # 'esi': esi, # should be ignored
+ 'ethernet_tag_id': ethernet_tag_id,
+ # 'mac_addr': mac_addr, # should be ignored
+ 'ip_addr': ip_addr,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_del(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ethernet_tag_id=ethernet_tag_id,
+ mac_addr=mac_addr,
+ ip_addr=ip_addr,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.delete_local', **expected_kwargs)
+
+ @raises(ValueError)
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_del_invalid_route_type(self, mock_call):
+ # Prepare test data
+ route_type = 'foobar' # Invalid EVPN route type
+ route_dist = '65000:100'
+ esi = 0 # denotes single-homed
+ ethernet_tag_id = 200
+ mac_addr = 'aa:bb:cc:dd:ee:ff'
+ ip_addr = '192.168.0.1'
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_del(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ethernet_tag_id=ethernet_tag_id,
+ mac_addr=mac_addr,
+ ip_addr=ip_addr,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.delete_local', 'Invalid arguments detected')
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_del_eth_segment(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_ETH_SEGMENT
+ route_dist = '65000:100'
+ esi = {
+ 'esi_type': ESI_TYPE_MAC_BASED,
+ 'mac_addr': 'aa:bb:cc:dd:ee:ff',
+ 'local_disc': 100,
+ }
+ ip_addr = '192.168.0.1'
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ 'esi': esi,
+ 'ip_addr': ip_addr,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_del(
+ route_type=route_type,
+ route_dist=route_dist,
+ esi=esi,
+ ip_addr=ip_addr,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.delete_local', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_del_ip_prefix_route(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_IP_PREFIX_ROUTE
+ route_dist = '65000:100'
+ ethernet_tag_id = 200
+ ip_prefix = '192.168.0.0/24'
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ 'ethernet_tag_id': ethernet_tag_id,
+ 'ip_prefix': ip_prefix,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_del(
+ route_type=route_type,
+ route_dist=route_dist,
+ ethernet_tag_id=ethernet_tag_id,
+ ip_prefix=ip_prefix,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.delete_local', **expected_kwargs)
+
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_add_pmsi_no_tunnel_info(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE
+ route_dist = '65000:100'
+ ethernet_tag_id = 200
+ next_hop = '0.0.0.0'
+ ip_addr = '192.168.0.1'
+ pmsi_tunnel_type = bgpspeaker.PMSI_TYPE_NO_TUNNEL_INFO
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ 'ethernet_tag_id': ethernet_tag_id,
+ 'next_hop': next_hop,
+ 'ip_addr': ip_addr,
+ 'pmsi_tunnel_type': pmsi_tunnel_type,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_add(
+ route_type=route_type,
+ route_dist=route_dist,
+ ethernet_tag_id=ethernet_tag_id,
+ ip_addr=ip_addr,
+ pmsi_tunnel_type=pmsi_tunnel_type,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.add_local', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_add_pmsi_ingress_rep(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE
+ route_dist = '65000:100'
+ ethernet_tag_id = 200
+ next_hop = '0.0.0.0'
+ ip_addr = '192.168.0.1'
+ pmsi_tunnel_type = bgpspeaker.PMSI_TYPE_INGRESS_REP
+ expected_kwargs = {
+ 'route_type': route_type,
+ 'route_dist': route_dist,
+ 'ethernet_tag_id': ethernet_tag_id,
+ 'next_hop': next_hop,
+ 'ip_addr': ip_addr,
+ 'pmsi_tunnel_type': pmsi_tunnel_type,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_add(
+ route_type=route_type,
+ route_dist=route_dist,
+ ethernet_tag_id=ethernet_tag_id,
+ ip_addr=ip_addr,
+ pmsi_tunnel_type=pmsi_tunnel_type,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.add_local', **expected_kwargs)
+
+ @raises(ValueError)
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_evpn_prefix_add_invalid_pmsi_tunnel_type(self, mock_call):
+ # Prepare test data
+ route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE
+ route_dist = '65000:100'
+ ethernet_tag_id = 200
+ next_hop = '0.0.0.0'
+ ip_addr = '192.168.0.1'
+ pmsi_tunnel_type = 1
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.evpn_prefix_add(
+ route_type=route_type,
+ route_dist=route_dist,
+ ethernet_tag_id=ethernet_tag_id,
+ ip_addr=ip_addr,
+ pmsi_tunnel_type=pmsi_tunnel_type,
+ )
+
+ # Check
+ mock_call.assert_called_with(
+ 'evpn_prefix.add_local', 'Invalid arguments detected')
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_flowspec_prefix_add_ipv4(self, mock_call):
+ # Prepare test data
+ flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_IPV4
+ rules = {
+ 'dst_prefix': '10.60.1.0/24',
+ }
+
+ actions = {
+ 'traffic_marking': {
+ 'dscp': 24,
+ }
+ }
+
+ expected_kwargs = {
+ 'flowspec_family': flowspec_family,
+ 'rules': rules,
+ 'actions': actions,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.flowspec_prefix_add(
+ flowspec_family=flowspec_family,
+ rules=rules,
+ actions=actions)
+
+ # Check
+ mock_call.assert_called_with(
+ 'flowspec.add', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_flowspec_prefix_add_ipv4_without_actions(self, mock_call):
+ # Prepare test data
+ flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_IPV4
+ rules = {
+ 'dst_prefix': '10.60.1.0/24',
+ }
+
+ expected_kwargs = {
+ 'flowspec_family': flowspec_family,
+ 'rules': rules,
+ 'actions': {},
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.flowspec_prefix_add(
+ flowspec_family=flowspec_family,
+ rules=rules)
+
+ # Check
+ mock_call.assert_called_with(
+ 'flowspec.add', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_flowspec_prefix_del_ipv4(self, mock_call):
+ # Prepare test data
+ flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_IPV4
+ rules = {
+ 'dst_prefix': '10.60.1.0/24',
+ }
+
+ expected_kwargs = {
+ 'flowspec_family': flowspec_family,
+ 'rules': rules,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.flowspec_prefix_del(
+ flowspec_family=flowspec_family,
+ rules=rules)
+
+ # Check
+ mock_call.assert_called_with(
+ 'flowspec.del', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_flowspec_prefix_add_vpnv4(self, mock_call):
+ # Prepare test data
+ flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_VPNV4
+ route_dist = '65001:100'
+ rules = {
+ 'dst_prefix': '10.70.1.0/24',
+ }
+
+ actions = {
+ 'traffic_marking': {
+ 'dscp': 24,
+ }
+ }
+
+ expected_kwargs = {
+ 'flowspec_family': flowspec_family,
+ 'route_dist': route_dist,
+ 'rules': rules,
+ 'actions': actions,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.flowspec_prefix_add(
+ flowspec_family=flowspec_family,
+ route_dist=route_dist,
+ rules=rules,
+ actions=actions)
+
+ # Check
+ mock_call.assert_called_with(
+ 'flowspec.add_local', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_flowspec_prefix_del_vpnv4(self, mock_call):
+ # Prepare test data
+ flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_VPNV4
+ route_dist = '65001:100'
+ rules = {
+ 'dst_prefix': '10.70.1.0/24',
+ }
+
+ expected_kwargs = {
+ 'flowspec_family': flowspec_family,
+ 'route_dist': route_dist,
+ 'rules': rules,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.flowspec_prefix_del(
+ flowspec_family=flowspec_family,
+ route_dist=route_dist,
+ rules=rules)
+
+ # Check
+ mock_call.assert_called_with(
+ 'flowspec.del_local', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_flowspec_prefix_add_ipv6(self, mock_call):
+ # Prepare test data
+ flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_IPV6
+ rules = {
+ 'dst_prefix': '2001::3/128/32',
+ }
+
+ actions = {
+ 'traffic_marking': {
+ 'dscp': 24,
+ }
+ }
+
+ expected_kwargs = {
+ 'flowspec_family': flowspec_family,
+ 'rules': rules,
+ 'actions': actions,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.flowspec_prefix_add(
+ flowspec_family=flowspec_family,
+ rules=rules,
+ actions=actions)
+
+ # Check
+ mock_call.assert_called_with(
+ 'flowspec.add', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_flowspec_prefix_add_ipv6_without_actions(self, mock_call):
+ # Prepare test data
+ flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_IPV6
+ rules = {
+ 'dst_prefix': '2001::3/128/32',
+ }
+
+ expected_kwargs = {
+ 'flowspec_family': flowspec_family,
+ 'rules': rules,
+ 'actions': {},
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.flowspec_prefix_add(
+ flowspec_family=flowspec_family,
+ rules=rules)
+
+ # Check
+ mock_call.assert_called_with(
+ 'flowspec.add', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_flowspec_prefix_del_ipv6(self, mock_call):
+ # Prepare test data
+ flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_IPV6
+ rules = {
+ 'dst_prefix': '2001::3/128/32',
+ }
+
+ expected_kwargs = {
+ 'flowspec_family': flowspec_family,
+ 'rules': rules,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.flowspec_prefix_del(
+ flowspec_family=flowspec_family,
+ rules=rules)
+
+ # Check
+ mock_call.assert_called_with(
+ 'flowspec.del', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_flowspec_prefix_add_vpnv6(self, mock_call):
+ # Prepare test data
+ flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_VPNV6
+ route_dist = '65001:100'
+ rules = {
+ 'dst_prefix': '2001::3/128/32',
+ }
+
+ actions = {
+ 'traffic_marking': {
+ 'dscp': 24,
+ }
+ }
+
+ expected_kwargs = {
+ 'flowspec_family': flowspec_family,
+ 'route_dist': route_dist,
+ 'rules': rules,
+ 'actions': actions,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.flowspec_prefix_add(
+ flowspec_family=flowspec_family,
+ route_dist=route_dist,
+ rules=rules,
+ actions=actions)
+
+ # Check
+ mock_call.assert_called_with(
+ 'flowspec.add_local', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_flowspec_prefix_del_vpnv6(self, mock_call):
+ # Prepare test data
+ flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_VPNV6
+ route_dist = '65001:100'
+ rules = {
+ 'dst_prefix': '2001::3/128/32',
+ }
+
+ expected_kwargs = {
+ 'flowspec_family': flowspec_family,
+ 'route_dist': route_dist,
+ 'rules': rules,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.flowspec_prefix_del(
+ flowspec_family=flowspec_family,
+ route_dist=route_dist,
+ rules=rules)
+
+ # Check
+ mock_call.assert_called_with(
+ 'flowspec.del_local', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_flowspec_prefix_add_l2vpn(self, mock_call):
+ # Prepare test data
+ flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_L2VPN
+ route_dist = '65001:100'
+ rules = {
+ 'dst_mac': '12:34:56:78:9a:bc',
+ }
+
+ actions = {
+ 'traffic_marking': {
+ 'dscp': 24,
+ }
+ }
+
+ expected_kwargs = {
+ 'flowspec_family': flowspec_family,
+ 'route_dist': route_dist,
+ 'rules': rules,
+ 'actions': actions,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.flowspec_prefix_add(
+ flowspec_family=flowspec_family,
+ route_dist=route_dist,
+ rules=rules,
+ actions=actions)
+
+ # Check
+ mock_call.assert_called_with(
+ 'flowspec.add_local', **expected_kwargs)
+
+ @mock.patch(
+ 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',
+ mock.MagicMock(return_value=None))
+ @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')
+ def test_flowspec_prefix_del_l2vpn(self, mock_call):
+ # Prepare test data
+ flowspec_family = bgpspeaker.FLOWSPEC_FAMILY_L2VPN
+ route_dist = '65001:100'
+ rules = {
+ 'dst_mac': '12:34:56:78:9a:bc',
+ }
+
+ expected_kwargs = {
+ 'flowspec_family': flowspec_family,
+ 'route_dist': route_dist,
+ 'rules': rules,
+ }
+
+ # Test
+ speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')
+ speaker.flowspec_prefix_del(
+ flowspec_family=flowspec_family,
+ route_dist=route_dist,
+ rules=rules)
+
+ # Check
+ mock_call.assert_called_with(
+ 'flowspec.del_local', **expected_kwargs)
diff --git a/tests/unit/services/protocols/bgp/test_peer.py b/tests/unit/services/protocols/bgp/test_peer.py
new file mode 100644
index 00000000..1597efdb
--- /dev/null
+++ b/tests/unit/services/protocols/bgp/test_peer.py
@@ -0,0 +1,375 @@
+# Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import logging
+try:
+ import mock # Python 2
+except ImportError:
+ from unittest import mock # Python 3
+
+from nose.tools import eq_
+
+from ryu.lib.packet import bgp
+from ryu.services.protocols.bgp import peer
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Test_Peer(unittest.TestCase):
+ """
+ Test case for peer.Peer
+ """
+
+ @mock.patch.object(
+ peer.Peer, '__init__', mock.MagicMock(return_value=None))
+ def _test_construct_as_path_attr(
+ self, input_as_path, input_as4_path, expected_as_path):
+ # Prepare input data
+ input_as_path_attr = bgp.BGPPathAttributeAsPath(input_as_path)
+ input_as4_path_attr = bgp.BGPPathAttributeAs4Path(input_as4_path)
+ _peer = peer.Peer(None, None, None, None, None)
+
+ # TEST
+ output_as_path_attr = _peer._construct_as_path_attr(
+ input_as_path_attr, input_as4_path_attr)
+
+ eq_(bgp.BGP_ATTR_TYPE_AS_PATH, output_as_path_attr.type)
+ eq_(expected_as_path, output_as_path_attr.path_seg_list)
+
+ def test_construct_as_path_attr_sequence_only(self):
+ # Test Data
+ # Input:
+ input_as_path = [[65000, 4000, 23456, 23456, 40001]]
+ input_as4_path = [[400000, 300000, 40001]]
+ # Expected:
+ expected_as_path = [[65000, 4000, 400000, 300000, 40001]]
+
+ self._test_construct_as_path_attr(
+ input_as_path, input_as4_path, expected_as_path)
+
+ def test_construct_as_path_attr_aggregated_as_path_1(self):
+ # Test Data
+ # Input:
+ input_as_path = [[65000, 4000], {10, 20, 30}, [23456, 23456, 40001]]
+ input_as4_path = [[400000, 300000, 40001]]
+ # Expected:
+ expected_as_path = [[65000, 4000], {10, 20, 30}, [400000, 300000, 40001]]
+
+ self._test_construct_as_path_attr(
+ input_as_path, input_as4_path, expected_as_path)
+
+ def test_construct_as_path_attr_aggregated_as_path_2(self):
+ # Test Data
+ # Input:
+ input_as_path = [[65000, 4000], {10, 20, 30}, [23456, 23456, 40001]]
+ input_as4_path = [[3000, 400000, 300000, 40001]]
+ # Expected:
+ expected_as_path = [[65000, 4000, 3000, 400000, 300000, 40001]]
+
+ self._test_construct_as_path_attr(
+ input_as_path, input_as4_path, expected_as_path)
+
+ def test_construct_as_path_attr_aggregated_path_3(self):
+ # Test Data
+ # Input:
+ input_as_path = [[65000, 4000, 23456, 23456, 40001]]
+ input_as4_path = [[400000, 300000, 40001], {10, 20, 30}]
+ # Expected:
+ expected_as_path = [[65000, 400000, 300000, 40001], {10, 20, 30}]
+
+ self._test_construct_as_path_attr(
+ input_as_path, input_as4_path, expected_as_path)
+
+ def test_construct_as_path_attr_aggregated_as4_path(self):
+ # Test Data
+ # Input:
+ input_as_path = [[65000, 4000, 23456, 23456, 40001]]
+ input_as4_path = [{10, 20, 30}, [400000, 300000, 40001]]
+ # Expected:
+ expected_as_path = [[65000], {10, 20, 30}, [400000, 300000, 40001]]
+
+ self._test_construct_as_path_attr(
+ input_as_path, input_as4_path, expected_as_path)
+
+ def test_construct_as_path_attr_too_short_as_path(self):
+ # Test Data
+ # Input:
+ input_as_path = [[65000, 4000, 23456, 23456, 40001]]
+ input_as4_path = [[100000, 65000, 4000, 400000, 300000, 40001]]
+ # Expected:
+ expected_as_path = [[65000, 4000, 23456, 23456, 40001]]
+
+ self._test_construct_as_path_attr(
+ input_as_path, input_as4_path, expected_as_path)
+
+ def test_construct_as_path_attr_too_short_as4_path(self):
+ # Test Data
+ # Input:
+ input_as_path = [[65000, 4000, 23456, 23456, 40001]]
+ input_as4_path = [[300000, 40001]]
+ # Expected:
+ expected_as_path = [[65000, 4000, 23456, 300000, 40001]]
+
+ self._test_construct_as_path_attr(
+ input_as_path, input_as4_path, expected_as_path)
+
+ def test_construct_as_path_attr_empty_as4_path(self):
+ # Test Data
+ # Input:
+ input_as_path = [[65000, 4000, 23456, 23456, 40001]]
+ input_as4_path = [[]]
+ # Expected:
+ expected_as_path = [[65000, 4000, 23456, 23456, 40001]]
+
+ self._test_construct_as_path_attr(
+ input_as_path, input_as4_path, expected_as_path)
+
+ @mock.patch.object(
+ peer.Peer, '__init__', mock.MagicMock(return_value=None))
+ def test_construct_as_path_attr_as4_path_None(self):
+ # Test Data
+ # Input:
+ input_as_path = [[65000, 4000, 23456, 23456, 40001]]
+ # input_as4_path = None
+ # Expected:
+ expected_as_path = [[65000, 4000, 23456, 23456, 40001]]
+
+ # Prepare input data
+ input_as_path_attr = bgp.BGPPathAttributeAsPath(input_as_path)
+ input_as4_path_attr = None
+ _peer = peer.Peer(None, None, None, None, None)
+
+ # TEST
+ output_as_path_attr = _peer._construct_as_path_attr(
+ input_as_path_attr, input_as4_path_attr)
+
+ eq_(bgp.BGP_ATTR_TYPE_AS_PATH, output_as_path_attr.type)
+ eq_(expected_as_path, output_as_path_attr.path_seg_list)
+
+ @mock.patch.object(
+ peer.Peer, '__init__', mock.MagicMock(return_value=None))
+ def _test_trans_as_path(
+ self, input_as_path, expected_as_path, expected_as4_path):
+ # Prepare input data
+ _peer = peer.Peer(None, None, None, None, None)
+
+ # TEST
+ output_as_path, output_as4_path = _peer._trans_as_path(input_as_path)
+
+ eq_(expected_as_path, output_as_path)
+ eq_(expected_as4_path, output_as4_path)
+
+ @mock.patch.object(
+ peer.Peer, 'is_four_octet_as_number_cap_valid',
+ mock.MagicMock(return_value=True))
+ def test_trans_as_path_as4_path_is_supported(self):
+ # Test Data
+ # Input:
+ input_as_path = [[65000, 4000, 400000, 300000, 40001]]
+ # Expected:
+ expected_as_path = [[65000, 4000, 400000, 300000, 40001]]
+ expected_as4_path = None
+
+ self._test_trans_as_path(
+ input_as_path, expected_as_path, expected_as4_path)
+
+ @mock.patch.object(
+ peer.Peer, 'is_four_octet_as_number_cap_valid',
+ mock.MagicMock(return_value=False))
+ def test_trans_as_path_sequence_only(self):
+ # Test Data
+ # Input:
+ input_as_path = [[65000, 4000, 400000, 300000, 40001]]
+ # Expected:
+ expected_as_path = [[65000, 4000, 23456, 23456, 40001]]
+ expected_as4_path = [[65000, 4000, 400000, 300000, 40001]]
+
+ self._test_trans_as_path(
+ input_as_path, expected_as_path, expected_as4_path)
+
+ @mock.patch.object(
+ peer.Peer, 'is_four_octet_as_number_cap_valid',
+ mock.MagicMock(return_value=False))
+ def test_trans_as_path_no_trans(self):
+ # Test Data
+ # Input:
+ input_as_path = [[65000, 4000, 40000, 30000, 40001]]
+ # Expected:
+ expected_as_path = [[65000, 4000, 40000, 30000, 40001]]
+ expected_as4_path = None
+
+ self._test_trans_as_path(
+ input_as_path, expected_as_path, expected_as4_path)
+
+ @mock.patch.object(
+ peer.Peer, '__init__', mock.MagicMock(return_value=None))
+ def _test_extract_and_reconstruct_as_path(
+ self, path_attributes, ex_as_path_value,
+ ex_aggregator_as_number, ex_aggregator_addr):
+ # Prepare test data
+ update_msg = bgp.BGPUpdate(path_attributes=path_attributes)
+ _peer = peer.Peer(None, None, None, None, None)
+
+ # Test
+ _peer._extract_and_reconstruct_as_path(update_msg)
+
+ umsg_pattrs = update_msg.pathattr_map
+ as_path_attr = umsg_pattrs.get(
+ bgp.BGP_ATTR_TYPE_AS_PATH, None)
+ as4_path_attr = umsg_pattrs.get(
+ bgp.BGP_ATTR_TYPE_AS4_PATH, None)
+ aggregator_attr = umsg_pattrs.get(
+ bgp.BGP_ATTR_TYPE_AGGREGATOR, None)
+ as4_aggregator_attr = umsg_pattrs.get(
+ bgp.BGP_ATTR_TYPE_AS4_AGGREGATOR, None)
+
+ eq_(ex_as_path_value, as_path_attr.value)
+ eq_(None, as4_path_attr)
+ eq_(ex_aggregator_as_number, aggregator_attr.as_number)
+ eq_(ex_aggregator_addr, aggregator_attr.addr)
+ eq_(None, as4_aggregator_attr)
+
+ @mock.patch.object(
+ peer.Peer, '__init__', mock.MagicMock(return_value=None))
+ def test_extract_and_reconstruct_as_path_with_no_as4_attr(self):
+ # Input values
+ in_as_path_value = [[1000, 2000, 3000]]
+ # in_as4_path_value
+ in_aggregator_as_number = 4000
+ in_aggregator_addr = '10.0.0.1'
+ # in_as4_aggregator_as_number
+ # in_as4_aggregator_addr
+
+ # Expected values
+ ex_as_path_value = [[1000, 2000, 3000]]
+ ex_aggregator_as_number = 4000
+ ex_aggregator_addr = '10.0.0.1'
+
+ # Prepare test data
+ path_attributes = [
+ bgp.BGPPathAttributeAsPath(
+ value=in_as_path_value),
+ bgp.BGPPathAttributeAggregator(
+ as_number=in_aggregator_as_number, addr=in_aggregator_addr),
+ ]
+
+ # Test
+ self._test_extract_and_reconstruct_as_path(
+ path_attributes, ex_as_path_value,
+ ex_aggregator_as_number, ex_aggregator_addr)
+
+ @mock.patch.object(
+ peer.Peer, '__init__', mock.MagicMock(return_value=None))
+ def test_extract_and_reconstruct_as_path_with_as4_attr(self):
+ # Input values
+ in_as_path_value = [[1000, 23456, 3000]]
+ in_as4_path_value = [[2000, 3000]]
+ in_aggregator_as_number = 23456
+ in_aggregator_addr = '10.0.0.1'
+ in_as4_aggregator_as_number = 4000
+ in_as4_aggregator_addr = '10.0.0.1'
+
+ # Expected values
+ ex_as_path_value = [[1000, 2000, 3000]]
+ ex_aggregator_as_number = 4000
+ ex_aggregator_addr = '10.0.0.1'
+
+ # Prepare test data
+ path_attributes = [
+ bgp.BGPPathAttributeAsPath(
+ value=in_as_path_value),
+ bgp.BGPPathAttributeAs4Path(
+ value=in_as4_path_value),
+ bgp.BGPPathAttributeAggregator(
+ as_number=in_aggregator_as_number,
+ addr=in_aggregator_addr),
+ bgp.BGPPathAttributeAs4Aggregator(
+ as_number=in_as4_aggregator_as_number,
+ addr=in_as4_aggregator_addr),
+ ]
+
+ # Test
+ self._test_extract_and_reconstruct_as_path(
+ path_attributes, ex_as_path_value,
+ ex_aggregator_as_number, ex_aggregator_addr)
+
+ @mock.patch.object(
+ peer.Peer, '__init__', mock.MagicMock(return_value=None))
+ def test_extract_and_reconstruct_as_path_with_not_trans_as_aggr(self):
+ # Input values
+ in_as_path_value = [[1000, 23456, 3000]]
+ in_as4_path_value = [[2000, 3000]]
+ in_aggregator_as_number = 4000 # not AS_TRANS
+ in_aggregator_addr = '10.0.0.1'
+ in_as4_aggregator_as_number = 4000
+ in_as4_aggregator_addr = '10.0.0.1'
+
+ # Expected values
+ ex_as_path_value = [[1000, 23456, 3000]]
+ ex_aggregator_as_number = 4000
+ ex_aggregator_addr = '10.0.0.1'
+
+ # Prepare test data
+ path_attributes = [
+ bgp.BGPPathAttributeAsPath(
+ value=in_as_path_value),
+ bgp.BGPPathAttributeAs4Path(
+ value=in_as4_path_value),
+ bgp.BGPPathAttributeAggregator(
+ as_number=in_aggregator_as_number,
+ addr=in_aggregator_addr),
+ bgp.BGPPathAttributeAs4Aggregator(
+ as_number=in_as4_aggregator_as_number,
+ addr=in_as4_aggregator_addr),
+ ]
+
+ # Test
+ self._test_extract_and_reconstruct_as_path(
+ path_attributes, ex_as_path_value,
+ ex_aggregator_as_number, ex_aggregator_addr)
+
+ @mock.patch.object(
+ peer.Peer, '__init__', mock.MagicMock(return_value=None))
+ def test_extract_and_reconstruct_as_path_with_short_as_path(self):
+ # Input values
+ in_as_path_value = [[1000, 23456, 3000]]
+ in_as4_path_value = [[2000, 3000, 4000, 5000]] # longer than AS_PATH
+ in_aggregator_as_number = 4000
+ in_aggregator_addr = '10.0.0.1'
+ # in_as4_aggregator_as_number
+ # in_as4_aggregator_addr
+
+ # Expected values
+ ex_as_path_value = [[1000, 23456, 3000]]
+ ex_aggregator_as_number = 4000
+ ex_aggregator_addr = '10.0.0.1'
+
+ # Prepare test data
+ path_attributes = [
+ bgp.BGPPathAttributeAsPath(
+ value=in_as_path_value),
+ bgp.BGPPathAttributeAs4Path(
+ value=in_as4_path_value),
+ bgp.BGPPathAttributeAggregator(
+ as_number=in_aggregator_as_number,
+ addr=in_aggregator_addr),
+ ]
+
+ # Test
+ self._test_extract_and_reconstruct_as_path(
+ path_attributes, ex_as_path_value,
+ ex_aggregator_as_number, ex_aggregator_addr)
diff --git a/tests/unit/services/protocols/bgp/utils/__init__.py b/tests/unit/services/protocols/bgp/utils/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unit/services/protocols/bgp/utils/__init__.py
diff --git a/tests/unit/services/protocols/bgp/utils/test_bgp.py b/tests/unit/services/protocols/bgp/utils/test_bgp.py
new file mode 100644
index 00000000..6933a28b
--- /dev/null
+++ b/tests/unit/services/protocols/bgp/utils/test_bgp.py
@@ -0,0 +1,211 @@
+# Copyright (C) 2017 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import unittest
+
+from nose.tools import eq_, raises
+
+from ryu.lib.packet.bgp import (
+ BGPFlowSpecTrafficRateCommunity,
+ BGPFlowSpecTrafficActionCommunity,
+ BGPFlowSpecRedirectCommunity,
+ BGPFlowSpecTrafficMarkingCommunity,
+ BGPFlowSpecVlanActionCommunity,
+ BGPFlowSpecTPIDActionCommunity,
+)
+
+from ryu.services.protocols.bgp.core import BgpCoreError
+from ryu.services.protocols.bgp.utils.bgp import create_v4flowspec_actions
+from ryu.services.protocols.bgp.utils.bgp import create_v6flowspec_actions
+from ryu.services.protocols.bgp.utils.bgp import create_l2vpnflowspec_actions
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Test_Utils_BGP(unittest.TestCase):
+ """
+ Test case for ryu.services.protocols.bgp.utils.bgp
+ """
+
+ def _test_create_v4flowspec_actions(self, actions, expected_communities):
+ communities = create_v4flowspec_actions(actions)
+ expected_communities.sort(key=lambda x: x.subtype)
+ communities.sort(key=lambda x: x.subtype)
+ eq_(str(expected_communities), str(communities))
+
+ def test_create_v4flowspec_actions_all_actions(self):
+ actions = {
+ 'traffic_rate': {
+ 'as_number': 0,
+ 'rate_info': 100.0,
+ },
+ 'traffic_action': {
+ 'action': 3,
+ },
+ 'redirect': {
+ 'as_number': 10,
+ 'local_administrator': 10,
+ },
+ 'traffic_marking': {
+ 'dscp': 24,
+ }
+ }
+ expected_communities = [
+ BGPFlowSpecTrafficRateCommunity(as_number=0, rate_info=100.0),
+ BGPFlowSpecTrafficActionCommunity(action=3),
+ BGPFlowSpecRedirectCommunity(as_number=10, local_administrator=10),
+ BGPFlowSpecTrafficMarkingCommunity(dscp=24),
+ ]
+ self._test_create_v4flowspec_actions(actions, expected_communities)
+
+ def test_create_v4flowspec_actions_without_actions(self):
+ actions = None
+ expected_communities = []
+ self._test_create_v4flowspec_actions(actions, expected_communities)
+
+ @raises(ValueError)
+ def test_create_v4flowspec_actions_not_exist_actions(self):
+ actions = {
+ 'traffic_test': {
+ 'test': 10,
+ },
+ }
+ expected_communities = []
+ self._test_create_v4flowspec_actions(actions, expected_communities)
+
+ def _test_create_v6flowspec_actions(self, actions, expected_communities):
+ communities = create_v6flowspec_actions(actions)
+ expected_communities.sort(key=lambda x: x.subtype)
+ communities.sort(key=lambda x: x.subtype)
+ eq_(str(expected_communities), str(communities))
+
+ def test_create_v6flowspec_actions_all_actions(self):
+ actions = {
+ 'traffic_rate': {
+ 'as_number': 0,
+ 'rate_info': 100.0,
+ },
+ 'traffic_action': {
+ 'action': 3,
+ },
+ 'redirect': {
+ 'as_number': 10,
+ 'local_administrator': 10,
+ },
+ 'traffic_marking': {
+ 'dscp': 24,
+ }
+ }
+ expected_communities = [
+ BGPFlowSpecTrafficRateCommunity(as_number=0, rate_info=100.0),
+ BGPFlowSpecTrafficActionCommunity(action=3),
+ BGPFlowSpecRedirectCommunity(as_number=10, local_administrator=10),
+ BGPFlowSpecTrafficMarkingCommunity(dscp=24),
+ ]
+ self._test_create_v6flowspec_actions(actions, expected_communities)
+
+ def test_create_v6flowspec_actions_without_actions(self):
+ actions = None
+ expected_communities = []
+ self._test_create_v6flowspec_actions(actions, expected_communities)
+
+ @raises(ValueError)
+ def test_create_v6flowspec_actions_not_exist_actions(self):
+ actions = {
+ 'traffic_test': {
+ 'test': 10,
+ },
+ }
+ expected_communities = []
+ self._test_create_v6flowspec_actions(actions, expected_communities)
+
+ def _test_create_l2vpnflowspec_actions(self, actions, expected_communities):
+ communities = create_l2vpnflowspec_actions(actions)
+ expected_communities.sort(key=lambda x: x.subtype)
+ communities.sort(key=lambda x: x.subtype)
+ eq_(str(expected_communities), str(communities))
+
+ def test_create_l2vpnflowspec_actions_all_actions(self):
+ actions = {
+ 'traffic_rate': {
+ 'as_number': 0,
+ 'rate_info': 100.0,
+ },
+ 'traffic_action': {
+ 'action': 3,
+ },
+ 'redirect': {
+ 'as_number': 10,
+ 'local_administrator': 10,
+ },
+ 'traffic_marking': {
+ 'dscp': 24,
+ },
+ 'vlan_action': {
+ 'actions_1': (BGPFlowSpecVlanActionCommunity.POP |
+ BGPFlowSpecVlanActionCommunity.SWAP),
+ 'vlan_1': 3000,
+ 'cos_1': 3,
+ 'actions_2': BGPFlowSpecVlanActionCommunity.PUSH,
+ 'vlan_2': 4000,
+ 'cos_2': 2,
+ },
+ 'tpid_action': {
+ 'actions': (BGPFlowSpecTPIDActionCommunity.TI |
+ BGPFlowSpecTPIDActionCommunity.TO),
+ 'tpid_1': 5,
+ 'tpid_2': 6,
+ }
+ }
+ expected_communities = [
+ BGPFlowSpecTrafficRateCommunity(as_number=0, rate_info=100.0),
+ BGPFlowSpecTrafficActionCommunity(action=3),
+ BGPFlowSpecRedirectCommunity(as_number=10, local_administrator=10),
+ BGPFlowSpecTrafficMarkingCommunity(dscp=24),
+ BGPFlowSpecVlanActionCommunity(
+ actions_1=(BGPFlowSpecVlanActionCommunity.POP |
+ BGPFlowSpecVlanActionCommunity.SWAP),
+ vlan_1=3000,
+ cos_1=3,
+ actions_2=BGPFlowSpecVlanActionCommunity.PUSH,
+ vlan_2=4000,
+ cos_2=2,
+ ),
+ BGPFlowSpecTPIDActionCommunity(
+ actions=(BGPFlowSpecTPIDActionCommunity.TI |
+ BGPFlowSpecTPIDActionCommunity.TO),
+ tpid_1=5,
+ tpid_2=6,
+ ),
+ ]
+ self._test_create_l2vpnflowspec_actions(actions, expected_communities)
+
+ def test_create_l2vpnflowspec_actions_without_actions(self):
+ actions = None
+ expected_communities = []
+ self._test_create_l2vpnflowspec_actions(actions, expected_communities)
+
+ @raises(ValueError)
+ def test_create_l2vpnflowspec_actions_not_exist_actions(self):
+ actions = {
+ 'traffic_test': {
+ 'test': 10,
+ },
+ }
+ expected_communities = []
+ self._test_create_l2vpnflowspec_actions(actions, expected_communities)
diff --git a/tests/unit/services/protocols/bgp/utils/test_validation.py b/tests/unit/services/protocols/bgp/utils/test_validation.py
new file mode 100644
index 00000000..6d5f6ac3
--- /dev/null
+++ b/tests/unit/services/protocols/bgp/utils/test_validation.py
@@ -0,0 +1,215 @@
+# Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import unittest
+
+from nose.tools import eq_, ok_
+
+from ryu.services.protocols.bgp.utils import validation
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Test_Utils_Validation(unittest.TestCase):
+ """
+ Test case for ryu.services.protocols.bgp.utils.validation
+ """
+
+ def test_is_valid_mac(self):
+ ok_(validation.is_valid_mac('aa:bb:cc:dd:ee:ff'))
+
+ def test_is_valid_mac_hyphenation(self):
+ ok_(validation.is_valid_mac('aa-bb-cc-dd-ee-ff'))
+
+ def test_is_valid_mac_short(self):
+ eq_(False, validation.is_valid_mac('aa:bb:cc:dd:ee'))
+
+ def test_is_valid_ip_prefix(self):
+ ok_(validation.is_valid_ip_prefix(24, 32))
+
+ def test_is_valid_ip_prefix_str(self):
+ ok_(validation.is_valid_ip_prefix('24', 32))
+
+ def test_is_valid_ip_prefix_not_digit(self):
+ eq_(False, validation.is_valid_ip_prefix('foo', 32))
+
+ def test_is_valid_ip_prefix_over(self):
+ eq_(False, validation.is_valid_ip_prefix(100, 32))
+
+ def test_is_valid_ipv4(self):
+ ok_(validation.is_valid_ipv4('10.0.0.1'))
+
+ def test_is_valid_ipv4_not_dot(self):
+ eq_(False, validation.is_valid_ipv4('192:168:0:1'))
+
+ def test_is_valid_ipv4_prefix(self):
+ ok_(validation.is_valid_ipv4_prefix('10.0.0.1/24'))
+
+ def test_is_valid_ipv4_prefix_not_str(self):
+ eq_(False, validation.is_valid_ipv4_prefix(1234))
+
+ def test_is_valid_ipv4_prefix_without_prefix(self):
+ eq_(False, validation.is_valid_ipv4_prefix('10.0.0.1'))
+
+ def test_is_valid_ipv4_prefix_invalid_addr(self):
+ eq_(False, validation.is_valid_ipv4_prefix('xxx.xxx.xxx.xxx/24'))
+
+ def test_is_valid_ipv6(self):
+ ok_(validation.is_valid_ipv6('fe80::0011:aabb:ccdd:eeff'))
+
+ def test_is_valid_ipv6_not_colon(self):
+ eq_(False, validation.is_valid_ipv6('fe80--0011-aabb-ccdd-eeff'))
+
+ def test_is_valid_ipv6_prefix(self):
+ ok_(validation.is_valid_ipv6_prefix('fe80::0011:aabb:ccdd:eeff/64'))
+
+ def test_is_valid_ipv6_prefix_not_str(self):
+ eq_(False, validation.is_valid_ipv6_prefix(1234))
+
+ def test_is_valid_ipv6_prefix_without_prefix(self):
+ eq_(False,
+ validation.is_valid_ipv6_prefix('fe80::0011:aabb:ccdd:eeff'))
+
+ def test_is_valid_ipv6_prefix_invalid_addr(self):
+ eq_(False, validation.is_valid_ipv6_prefix('xxxx::xxxx/64'))
+
+ def test_is_valid_old_asn(self):
+ ok_(validation.is_valid_old_asn(65000))
+
+ def test_is_valid_old_asn_negative(self):
+ eq_(False, validation.is_valid_old_asn(-1))
+
+ def test_is_valid_old_asn_over(self):
+ eq_(False, validation.is_valid_old_asn(0xffff + 1))
+
+ def test_is_valid_asn(self):
+ ok_(validation.is_valid_asn(6553800))
+
+ def test_is_valid_asn_old(self):
+ ok_(validation.is_valid_asn(65000))
+
+ def test_is_valid_asn_negative(self):
+ eq_(False, validation.is_valid_asn(-1))
+
+ def test_is_valid_asn_over(self):
+ eq_(False, validation.is_valid_asn(0xffffffff + 1))
+
+ def test_is_valid_vpnv4_prefix(self):
+ ok_(validation.is_valid_vpnv4_prefix('100:200:10.0.0.1/24'))
+
+ def test_is_valid_vpnv4_prefix_not_str(self):
+ eq_(False, validation.is_valid_vpnv4_prefix(1234))
+
+ def test_is_valid_vpnv4_prefix_short_rd(self):
+ eq_(False, validation.is_valid_vpnv4_prefix('100:10.0.0.1/24'))
+
+ def test_is_valid_vpnv4_prefix_invalid_rd(self):
+ eq_(False, validation.is_valid_vpnv4_prefix('foo:bar:10.0.0.1/24'))
+
+ def test_is_valid_vpnv6_prefix(self):
+ ok_(validation.is_valid_vpnv6_prefix(
+ '100:200:fe80::0011:aabb:ccdd:eeff/64'))
+
+ def test_is_valid_vpnv6_prefix_not_str(self):
+ eq_(False, validation.is_valid_vpnv6_prefix(1234))
+
+ def test_is_valid_vpnv6_prefix_short_rd(self):
+ eq_(False, validation.is_valid_vpnv6_prefix('100:eeff/64'))
+
+ def test_is_valid_vpnv6_prefix_invalid_rd(self):
+ eq_(False, validation.is_valid_vpnv6_prefix('foo:bar:10.0.0.1/24'))
+
+ def test_is_valid_med(self):
+ ok_(validation.is_valid_med(100))
+
+ def test_is_valid_med_not_num(self):
+ eq_(False, validation.is_valid_med('foo'))
+
+ def test_is_valid_med_negative(self):
+ eq_(False, validation.is_valid_med(-1))
+
+ def test_is_valid_med_over(self):
+ eq_(False, validation.is_valid_med(0xffffffff + 1))
+
+ def test_is_valid_mpls_label(self):
+ ok_(validation.is_valid_mpls_label(100))
+
+ def test_is_valid_mpls_label_reserved(self):
+ eq_(False, validation.is_valid_mpls_label(4))
+
+ def test_is_valid_mpls_label_not_num(self):
+ eq_(False, validation.is_valid_mpls_label('foo'))
+
+ def test_is_valid_mpls_label_negative(self):
+ eq_(False, validation.is_valid_mpls_label(-1))
+
+ def test_is_valid_mpls_label_over(self):
+ eq_(False, validation.is_valid_mpls_label(0x100000 + 1))
+
+ def test_is_valid_mpls_labels(self):
+ ok_(validation.is_valid_mpls_labels([100, 200]))
+
+ def test_is_valid_mpls_labels_not_list(self):
+ eq_(False, validation.is_valid_mpls_labels(100))
+
+ def test_is_valid_mpls_labels_with_invalid_label(self):
+ eq_(False, validation.is_valid_mpls_labels(['foo', 200]))
+
+ def test_is_valid_route_dist(self):
+ ok_(validation.is_valid_route_dist('65000:222'))
+
+ def test_is_valid_route_dist_ipv4_based(self):
+ ok_(validation.is_valid_route_dist('10.0.0.1:333'))
+
+ def test_is_valid_route_not_str(self):
+ eq_(False, validation.is_valid_route_dist(65000))
+
+ def test_is_valid_route_dist_short(self):
+ eq_(False, validation.is_valid_route_dist('65000'))
+
+ def test_is_valid_route_dist_invalid_ipv4_addr(self):
+ eq_(False, validation.is_valid_route_dist('xxx.xxx.xxx.xxx:333'))
+
+ def test_is_valid_esi(self):
+ ok_(validation.is_valid_esi(100))
+
+ def test_is_valid_esi_not_int(self):
+ eq_(False, validation.is_valid_esi('foo'))
+
+ def test_is_valid_ethernet_tag_id(self):
+ ok_(validation.is_valid_ethernet_tag_id(100))
+
+ def test_is_valid_ethernet_tag_id_not_int(self):
+ eq_(False, validation.is_valid_ethernet_tag_id('foo'))
+
+ def test_is_valid_ethernet_tag_id_negative(self):
+ eq_(False, validation.is_valid_ethernet_tag_id(-1))
+
+ def test_is_valid_ethernet_tag_id_over(self):
+ eq_(False, validation.is_valid_ethernet_tag_id(0xffffffff + 1))
+
+ def test_is_valid_vni(self):
+ ok_(validation.is_valid_vni(100))
+
+ def test_is_valid_vni_not_int(self):
+ eq_(False, validation.is_valid_vni('foo'))
+
+ def test_is_valid_vni_negative(self):
+ eq_(False, validation.is_valid_vni(-1))
+
+ def test_is_valid_vni_over(self):
+ eq_(False, validation.is_valid_vni(0xffffff + 1))