summaryrefslogtreecommitdiffhomepage
path: root/test/scenario_test/route_server_policy_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/scenario_test/route_server_policy_test.py')
-rw-r--r--test/scenario_test/route_server_policy_test.py395
1 files changed, 379 insertions, 16 deletions
diff --git a/test/scenario_test/route_server_policy_test.py b/test/scenario_test/route_server_policy_test.py
index 7c2e0456..47b2abc6 100644
--- a/test/scenario_test/route_server_policy_test.py
+++ b/test/scenario_test/route_server_policy_test.py
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import unittest
-import os
import time
import sys
import nose
@@ -25,13 +23,6 @@ from noseplugin import parser_option
from gobgp_test import GoBGPTestBase
from constant import *
-peer1 = "10.0.0.1"
-peer2 = "10.0.0.2"
-peer3 = "10.0.0.3"
-
-prefix1 = "192.168.2.0/24"
-prefix2 = "192.168.20.0/24"
-prefix3 = "192.168.200.0/24"
class GoBGPTest(GoBGPTestBase):
@@ -44,7 +35,9 @@ class GoBGPTest(GoBGPTestBase):
use_local = parser_option.use_local
go_path = parser_option.go_path
log_debug = parser_option.gobgp_log_debug
- fab.init_policy_test_env_executor(self.quagga_num, use_local, go_path, log_debug, policy=policy_pattern)
+ fab.init_policy_test_env_executor(self.quagga_num, use_local, go_path,
+ log_debug, policy=policy_pattern,
+ use_ipv6=self.use_ipv6_gobgp)
print "please wait " + str(self.initial_wait_time) + " second"
time.sleep(self.initial_wait_time)
@@ -67,11 +60,15 @@ class GoBGPTest(GoBGPTestBase):
addresses = self.get_neighbor_address(self.gobgp_config)
self.retry_routine_for_state(addresses, "BGP_FSM_ESTABLISHED")
+ peer1 = "10.0.0.1"
+ peer2 = "10.0.0.2"
+ peer3 = "10.0.0.3"
+ prefix1 = "192.168.2.0/24"
path = self.get_paths_in_localrib(peer1, prefix1, retry=3)
self.assertIsNotNone(path)
# check show ip bgp on peer1(quagga1)
- qpath = self.get_routing_table(peer1,prefix1, retry=3)
+ qpath = self.get_route(peer1,prefix1, retry=3)
print qpath
self.assertIsNotNone(qpath)
@@ -85,7 +82,7 @@ class GoBGPTest(GoBGPTestBase):
self.assertIsNone(path)
# check show ip bgp on peer1(quagga3)
- qpath = self.get_routing_table(peer3,prefix1, retry=3)
+ qpath = self.get_route(peer3,prefix1, retry=3)
# print qpath
self.assertIsNone(qpath)
@@ -107,12 +104,17 @@ class GoBGPTest(GoBGPTestBase):
addresses = self.get_neighbor_address(self.gobgp_config)
self.retry_routine_for_state(addresses, "BGP_FSM_ESTABLISHED")
+ peer1 = "10.0.0.1"
+ peer2 = "10.0.0.2"
+ peer3 = "10.0.0.3"
+ prefix1 = "192.168.2.0/24"
+
paths = self.get_paths_in_localrib(peer1, prefix1, retry=3)
# print paths
self.assertIsNotNone(paths)
# check show ip bgp on peer1(quagga1)
- qpath = self.get_routing_table(peer1, prefix1, retry=3)
+ qpath = self.get_route(peer1, prefix1, retry=3)
# print qpath
self.assertIsNotNone(qpath)
@@ -130,7 +132,7 @@ class GoBGPTest(GoBGPTestBase):
self.assertIsNone(path)
# check show ip bgp on peer1(quagga3)
- qpath = self.get_routing_table(peer3,prefix1, retry=3)
+ qpath = self.get_route(peer3,prefix1, retry=3)
# print qpath
self.assertIsNone(qpath)
@@ -166,6 +168,13 @@ class GoBGPTest(GoBGPTestBase):
# coming from peer2(10.0.0.2) to peer3(10.0.0.3)'s import-policy.
self.initialize(policy_pattern="p3")
+ peer1 = "10.0.0.1"
+ peer2 = "10.0.0.2"
+ peer3 = "10.0.0.3"
+ prefix1 = "192.168.2.0/24"
+ prefix2 = "192.168.20.0/24"
+ prefix3 = "192.168.200.0/24"
+
# add other network
tn = qaccess.login(peer2)
print "add network 192.168.20.0/24"
@@ -184,7 +193,7 @@ class GoBGPTest(GoBGPTestBase):
return paths is not None
def path_exists_in_routing_table(peer, prefix,r=10):
- qpath = self.get_routing_table(peer, prefix, retry=r)
+ qpath = self.get_route(peer, prefix, retry=r)
return qpath is not None
def path_exists_in_adj_rib_in(peer, prefix,r=10):
@@ -267,6 +276,13 @@ class GoBGPTest(GoBGPTestBase):
# coming from peer2(10.0.0.2) to peer3(10.0.0.3)'s export-policy.
self.initialize(policy_pattern="p4")
+ peer1 = "10.0.0.1"
+ peer2 = "10.0.0.2"
+ peer3 = "10.0.0.3"
+ prefix1 = "192.168.2.0/24"
+ prefix2 = "192.168.20.0/24"
+ prefix3 = "192.168.200.0/24"
+
# add other network
tn = qaccess.login(peer2)
print "add network 192.168.20.0/24"
@@ -285,7 +301,7 @@ class GoBGPTest(GoBGPTestBase):
return paths is not None
def path_exists_in_routing_table(peer, prefix,r=10):
- qpath = self.get_routing_table(peer, prefix, retry=r)
+ qpath = self.get_route(peer, prefix, retry=r)
return qpath is not None
def path_exists_in_adj_rib_in(peer, prefix,r=10):
@@ -351,6 +367,353 @@ class GoBGPTest(GoBGPTestBase):
self.assertFalse(path_exists_in_routing_table(peer3, prefix2,r=3))
self.assertTrue(path_exists_in_routing_table(peer3, prefix3))
+ """
+ import-policy test
+ r1=2001:0:10:2::/64
+ --------------------------------------------------
+ peer2 ->(r1)-> | ->(r1)-> peer1-rib ->(r1)-> peer1-adj-rib-out | ->(r1)-> peer1
+ | |
+ | ->x peer3-rib |
+ --------------------------------------------------
+ """
+ def test_05_import_policy_initial_ipv6(self):
+
+ # initialize test environment
+ # policy_pattern:p5 attaches a policy to reject route 2001:0:10:2:: (64...128)
+ # coming from peer2(2001::192:168:0:2) to peer3(2001::192:168:0:3)'s
+ # import-policy.
+ self.use_ipv6_gobgp = True
+ self.initialize(policy_pattern="p5")
+
+ addresses = self.get_neighbor_address(self.gobgp_config)
+ self.retry_routine_for_state(addresses, "BGP_FSM_ESTABLISHED")
+
+ peer1 = "2001::192:168:0:1"
+ peer2 = "2001::192:168:0:2"
+ peer3 = "2001::192:168:0:3"
+ r1 = "2001:0:10:2::/64"
+ w = self.wait_per_retry
+
+ # path = util.get_paths_in_localrib(peer1, r1, retry=3, interval=w, rf=IPv6)
+ path = self.get_paths_in_localrib(peer1, r1, retry=3, af=IPv6, interval=w)
+ self.assertIsNotNone(path)
+
+ # check show ip bgp on peer1(quagga1)
+ # qpath = util.get_route(peer1, r1_pref, retry=3, interval=w, rf=IPv6)
+ qpath = self.get_route(peer1, r1, retry=3, af=IPv6)
+ self.assertIsNotNone(qpath)
+
+ # check adj-rib-out in peer2
+ # path = util.get_adj_rib_in(base_url, peer2, r1_pref, retry=3, interval=w, rf=IPv6)
+ path = self.get_adj_rib_in(peer2, r1, retry=3, af=IPv6)
+ # print path
+ self.assertIsNotNone(path)
+
+ # path = util.get_paths_in_localrib(base_url, peer3, r1, retry=0, interval=w, rf=IPv6)
+ path = self.get_paths_in_localrib(peer3, r1, retry=0, af=IPv6)
+ # print path
+ self.assertIsNone(path)
+
+ # check show ip bgp on peer1(quagga3)
+ # qpath = util.get_route(peer3, r1_pref, retry=3, interval=w, rf=IPv6)
+ qpath = self.get_route(peer3, r1, retry=3, interval=w, af=IPv6)
+ print qpath
+ self.assertIsNone(qpath)
+
+
+ """
+ export-policy test
+ r1=2001:0:10:2::/64
+ --------------------------------------------------
+ peer2 ->(r1)-> | ->(r1)-> peer1-rib ->(r1)-> peer1-adj-rib-out | ->(r1)-> peer1
+ | |
+ | ->(r1)-> peer3-rib ->x peer3-adj-rib-out |
+ --------------------------------------------------
+ """
+ def test_06_export_policy_initial_ipv6(self):
+
+ # initialize test environment
+ # policy_pattern:p6 attaches a policy to reject route 2001:0:10:2:: (64...128)
+ # coming from peer2(2001::192:168:0:2) to peer3(2001::192:168:0:3)'s export-policy.
+ self.use_ipv6_gobgp = True
+ self.initialize(policy_pattern="p6")
+ addresses = self.get_neighbor_address(self.gobgp_config)
+ self.retry_routine_for_state(addresses, "BGP_FSM_ESTABLISHED")
+
+ peer1 = "2001::192:168:0:1"
+ peer2 = "2001::192:168:0:2"
+ peer3 = "2001::192:168:0:3"
+ r1 = "2001:0:10:2::/64"
+ w = self.wait_per_retry
+
+ paths = self.get_paths_in_localrib(peer1, r1, retry=3, interval=w, af=IPv6)
+
+ # print paths
+ self.assertIsNotNone(paths)
+
+ # check show ip bgp on peer1(quagga1)
+ qpath = self.get_route(peer1, r1, retry=3, interval=w, af=IPv6)
+ # print qpath
+ self.assertIsNotNone(qpath)
+
+ # check adj-rib-out in peer2
+ path = self.get_adj_rib_in(peer2, r1, retry=1, interval=w, af=IPv6)
+ # print path
+ self.assertIsNotNone(path)
+
+ path = self.get_paths_in_localrib(peer3, r1, af=IPv6)
+ # print path
+ self.assertIsNotNone(path)
+
+ path = self.get_adj_rib_out(peer3, r1, retry=1, interval=w, af=IPv6)
+ # print path
+ self.assertIsNone(path)
+
+ # check show ip bgp on peer1(quagga3)
+ qpath = self.get_route(peer3, r1, retry=3, interval=w, af=IPv6)
+ # print qpath
+ self.assertIsNone(qpath)
+
+
+ """
+ import-policy test
+ r1=2001:0:10:2::/64
+ r2=2001:0:10:20::/64
+ r3=2001:0:10:200::/64
+ -------------------------------------------------
+ |peer1 |
+ peer2 ->(r1,r2,r3)-> | ->(r1,r2,r3)-> rib ->(r1,r2,r3)-> adj-rib-out | ->(r1,r2,r3)-> peer1
+ | |
+ |peer3 |
+ | ->(r1)-> rib ->(r1)-> adj-rib-out | ->(r1)-> peer3
+ -------------------------------------------------
+ |
+ update gobgp.conf
+ |
+ V
+ -------------------------------------------------
+ |peer1 |
+ peer2 ->(r1,r2,r3)-> | ->(r1,r2,r3)-> rib ->(r1,r2,r3)-> adj-rib-out | ->(r1,r2,r3)-> peer1
+ | |
+ |peer3 |
+ | ->(r1,r3)-> rib ->(r1,r3)-> adj-rib-out | ->(r1,r3)-> peer3
+ -------------------------------------------------
+ """
+ def test_07_import_policy_update(self):
+ # initialize test environment
+ # policy_pattern:p7 attaches a policy to reject route
+ # 2001:0:10:2::/64, 2001:0:10:20::/64, 2001:0:10:200::/64
+ # coming from peer2(2001::192:168:0:2) to peer3(2001::192:168:0:3)'s
+ # import-policy.
+ self.use_ipv6_gobgp = True
+ self.initialize(policy_pattern="p7")
+
+ peer1 = "2001::192:168:0:1"
+ peer2 = "2001::192:168:0:2"
+ peer3 = "2001::192:168:0:3"
+ r1 = "2001:0:10:2::/64"
+ r2 = "2001:0:10:20::/64"
+ r3 = "2001:0:10:200::/64"
+
+ w = self.wait_per_retry
+
+ # add other network
+ tn = qaccess.login(peer2)
+ print "add network 2001:0:10:20::/64"
+ qaccess.add_network(tn, 65002, r2, use_ipv6=True)
+ print "add network 2001:0:10:200::/64"
+ qaccess.add_network(tn, 65002, r3, use_ipv6=True)
+ qaccess.logout(tn)
+
+ addresses = self.get_neighbor_address(self.gobgp_config)
+ self.retry_routine_for_state(addresses, "BGP_FSM_ESTABLISHED")
+
+ time.sleep(self.initial_wait_time)
+
+ def path_exists_in_localrib(peer, prefix, r=10):
+ paths = self.get_paths_in_localrib(peer, prefix,
+ retry=r, interval=w, af=IPv6)
+ return paths is not None
+
+ def path_exists_in_routing_table(peer, prefix, r=10):
+ qpath = self.get_route(peer, prefix, retry=r, interval=w, af=IPv6)
+ return qpath is not None
+
+ def path_exists_in_adj_rib_in(peer, prefix, r=10):
+ path = self.get_adj_rib_in(peer, prefix,
+ retry=r, interval=w, af=IPv6)
+ return path is not None
+
+ self.assertTrue(path_exists_in_localrib(peer1, r1))
+ self.assertTrue(path_exists_in_localrib(peer1, r2))
+ self.assertTrue(path_exists_in_localrib(peer1, r3))
+
+ self.assertTrue(path_exists_in_localrib(peer3, r1))
+ self.assertFalse(path_exists_in_localrib(peer3, r2, r=3))
+ self.assertFalse(path_exists_in_localrib(peer3, r3, r=0))
+
+ # check show ip bgp on peer1(quagga1)
+ self.assertTrue(path_exists_in_routing_table(peer1, r1))
+ self.assertTrue(path_exists_in_routing_table(peer1, r2))
+ self.assertTrue(path_exists_in_routing_table(peer1, r3))
+
+ # check show ip bgp on peer3(quagga3)
+ self.assertTrue(path_exists_in_routing_table(peer3, r1))
+ self.assertFalse(path_exists_in_routing_table(peer3, r2, r=3))
+ self.assertFalse(path_exists_in_routing_table(peer3, r3, r=0))
+
+ # check adj-rib-out in peer2
+ self.assertTrue(path_exists_in_adj_rib_in(peer2, r1))
+ self.assertTrue(path_exists_in_adj_rib_in(peer2, r2))
+ self.assertTrue(path_exists_in_adj_rib_in(peer2, r3))
+
+ # update policy
+ print "update_policy_config"
+ fab.update_policy_config(parser_option.go_path, policy_pattern="p7")
+ time.sleep(self.initial_wait_time)
+
+ # soft reset
+ print "soft_reset"
+ self.soft_reset(peer2, IPv6)
+
+ # check local-rib
+ self.assertTrue(path_exists_in_localrib(peer3, r1))
+ self.assertFalse(path_exists_in_localrib(peer3, r2, r=3))
+ self.assertTrue(path_exists_in_localrib(peer3, r3))
+
+ # check show ip bgp on peer3(quagga3)
+ self.assertTrue(path_exists_in_routing_table(peer3, r1))
+ self.assertFalse(path_exists_in_routing_table(peer3, r2, r=0))
+ self.assertTrue(path_exists_in_routing_table(peer3, r3))
+
+ """
+ export-policy test
+ r1=2001:0:10:2::/64
+ r2=2001:0:10:20::/64
+ r3=2001:0:10:200::/64
+ -------------------------------------------------
+ |peer1 |
+ peer2 ->(r1,r2,r3)-> | ->(r1,r2,r3)-> rib ->(r1,r2,r3)-> adj-rib-out | ->(r1,r2,r3)-> peer1
+ | |
+ |peer3 |
+ | ->(r1,r2,r3)-> rib ->(r1)-> adj-rib-out | ->(r1)-> peer3
+ -------------------------------------------------
+ |
+ update gobgp.conf
+ |
+ V
+ -------------------------------------------------
+ |peer1 |
+ peer2 ->(r1,r2,r3)-> | ->(r1,r2,r3)-> rib ->(r1,r2,r3)-> adj-rib-out | ->(r1,r2,r3)-> peer1
+ | |
+ |peer3 |
+ | ->(r1,r2,r3)-> rib ->(r1,r3)-> adj-rib-out | ->(r1,r3)-> peer3
+ -------------------------------------------------
+ """
+ @nose.tools.nottest
+ def test_08_export_policy_update(self):
+ # initialize test environment
+ # policy_pattern:p8 attaches a policy to reject route
+ # 2001:0:10:2::/64, 2001:0:10:20::/64, 2001:0:10:200::/64
+ # coming from peer2(2001::192:168:0:2) to peer3(2001::192:168:0:3)'s
+ # export-policy.
+ self.use_ipv6_gobgp = True
+ self.initialize(policy_pattern="p8")
+
+ peer1 = "2001::192:168:0:1"
+ peer2 = "2001::192:168:0:2"
+ peer3 = "2001::192:168:0:3"
+ r1 = "2001:0:10:2::/64"
+ r2 = "2001:0:10:20::/64"
+ r3 = "2001:0:10:200::/64"
+ w = self.wait_per_retry
+
+ # add other network
+ tn = qaccess.login(peer2)
+ print "add network 2001:0:10:20::/64"
+ qaccess.add_network(tn, 65002, r2, use_ipv6=True)
+ print "add network 2001:0:10:200::/64"
+ qaccess.add_network(tn, 65002, r3, use_ipv6=True)
+ qaccess.logout(tn)
+
+ addresses = self.get_neighbor_address(self.gobgp_config)
+ self.retry_routine_for_state(addresses, "BGP_FSM_ESTABLISHED")
+
+ time.sleep(self.initial_wait_time)
+
+ def path_exists_in_localrib(peer, prefix, r=10):
+ paths = self.get_paths_in_localrib(peer, prefix,
+ retry=r, interval=w, af=IPv6)
+ return paths is not None
+
+ def path_exists_in_routing_table(peer, prefix, r=10):
+ qpath = self.get_route(peer, prefix, retry=r, interval=w, af=IPv6)
+ return qpath is not None
+
+ def path_exists_in_adj_rib_in(peer, prefix, r=10):
+ path = self.get_adj_rib_in(peer, prefix,
+ retry=r, interval=w, af=IPv6)
+ return path is not None
+
+ def path_exists_in_adj_rib_out(peer, prefix, r=10):
+ path = self.get_adj_rib_out(peer, prefix,
+ retry=r, interval=w, af=IPv6)
+ return path is not None
+
+ self.assertTrue(path_exists_in_localrib(peer1, r1))
+ self.assertTrue(path_exists_in_localrib(peer1, r2))
+ self.assertTrue(path_exists_in_localrib(peer1, r3))
+
+ # check peer3 local-rib
+ self.assertTrue(path_exists_in_localrib(peer3, r1))
+ self.assertTrue(path_exists_in_localrib(peer3, r2))
+ self.assertTrue(path_exists_in_localrib(peer3, r3))
+
+ # check peer3 rib-out
+ self.assertTrue(path_exists_in_adj_rib_out(peer3, r1))
+ self.assertFalse(path_exists_in_adj_rib_out(peer3, r2, r=3))
+ self.assertFalse(path_exists_in_adj_rib_out(peer3, r3, r=3))
+
+ # check show ip bgp on peer1(quagga1)
+ self.assertTrue(path_exists_in_routing_table(peer1, r1))
+ self.assertTrue(path_exists_in_routing_table(peer1, r2))
+ self.assertTrue(path_exists_in_routing_table(peer1, r3))
+
+ # check show ip bgp on peer3(quagga3)
+ self.assertTrue(path_exists_in_routing_table(peer3, r1))
+ self.assertFalse(path_exists_in_routing_table(peer3, r2, r=3))
+ self.assertFalse(path_exists_in_routing_table(peer3, r3, r=0))
+
+ # check adj-rib-out in peer2
+ self.assertTrue(path_exists_in_adj_rib_in(peer2, r1))
+ self.assertTrue(path_exists_in_adj_rib_in(peer2, r2))
+ self.assertTrue(path_exists_in_adj_rib_in(peer2, r3))
+
+ # update policy
+ print "update_policy_config"
+ fab.update_policy_config(parser_option.go_path, policy_pattern="p8")
+ time.sleep(self.initial_wait_time)
+
+ # soft reset
+ print "soft_reset"
+ self.soft_reset(peer2, "ipv6")
+
+ # check local-rib
+ self.assertTrue(path_exists_in_localrib(peer3, r1))
+ self.assertTrue(path_exists_in_localrib(peer3, r2))
+ self.assertTrue(path_exists_in_localrib(peer3, r3))
+
+ # check local-adj-out-rib
+ self.assertTrue(path_exists_in_adj_rib_out(peer3, r1))
+ self.assertFalse(path_exists_in_adj_rib_out(peer3, r2, r=3))
+ # Currently this test fails because of export_policy handling
+ self.assertTrue(path_exists_in_adj_rib_out(peer3, r3))
+
+ # check show ip bgp on peer3(quagga3)
+ self.assertTrue(path_exists_in_routing_table(peer3, r1))
+ self.assertFalse(path_exists_in_routing_table(peer3, r2, r=3))
+ self.assertTrue(path_exists_in_routing_table(peer3, r3))
+
if __name__ == '__main__':
if fab.test_user_check() is False: