summaryrefslogtreecommitdiffhomepage
path: root/test/scenario_test/route_server_policy_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/scenario_test/route_server_policy_test.py')
-rw-r--r--test/scenario_test/route_server_policy_test.py133
1 files changed, 31 insertions, 102 deletions
diff --git a/test/scenario_test/route_server_policy_test.py b/test/scenario_test/route_server_policy_test.py
index 42d74d9e..9ae29a2b 100644
--- a/test/scenario_test/route_server_policy_test.py
+++ b/test/scenario_test/route_server_policy_test.py
@@ -14,42 +14,25 @@
# limitations under the License.
import unittest
-import requests
-import json
-import toml
import os
import time
import sys
import nose
import quagga_access as qaccess
-from peer_info import Peer
-from peer_info import Destination
-from peer_info import Path
-from ciscoconfparse import CiscoConfParse
import docker_control as fab
from noseplugin import OptionParser
from noseplugin import parser_option
-import scenario_test_util as util
+from gobgp_test import GoBGPTestBase
+from constant import *
-class GoBGPTest(unittest.TestCase):
+class GoBGPTest(GoBGPTestBase):
- gobgp_ip = "10.0.255.1"
- gobgp_port = "8080"
- rest_url_neighbor = "http://" + gobgp_ip + ":" + gobgp_port + "/v1/bgp/neighbor/"
- base_dir = "/tmp/gobgp/"
- gobgp_config_file = "/tmp/gobgp/gobgpd.conf"
- gobgp_config = None
quagga_num = 3
- initial_wait_time = 10
- wait_per_retry = 5
def __init__(self, *args, **kwargs):
super(GoBGPTest, self).__init__(*args, **kwargs)
- def setUp(self):
- self.quagga_configs = []
-
def initialize(self, policy_pattern=None):
use_local = parser_option.use_local
go_path = parser_option.go_path
@@ -60,12 +43,6 @@ class GoBGPTest(unittest.TestCase):
self.assertTrue(self.check_load_config())
- def check_established(self, addresses):
- for address in addresses:
- result = self.retry_until(address, target_state="BGP_FSM_ESTABLISHED",retry=10)
- self.assertEqual(result, True)
-
-
"""
import-policy test
---------------------------------------
@@ -80,33 +57,32 @@ class GoBGPTest(unittest.TestCase):
# policy_pattern:p1 attaches a policy to reject route 192.168.0.0/16 (16...24)
# coming from peer2(10.0.0.2) to peer3(10.0.0.3)'s import-policy.
self.initialize(policy_pattern="p1")
- self.check_established(util.get_neighbor_address(self.gobgp_config))
+ addresses = self.get_neighbor_address(self.gobgp_config)
+ self.retry_routine_for_state(addresses, "BGP_FSM_ESTABLISHED")
peer1 = "10.0.0.1"
peer2 = "10.0.0.2"
peer3 = "10.0.0.3"
- base_url = self.rest_url_neighbor
- w = self.wait_per_retry
- path = util.get_paths_in_localrib(base_url, peer1, "192.168.2.0", retry=3, interval=w)
+ path = self.get_paths_in_localrib(peer1, "192.168.2.0", retry=3)
self.assertIsNotNone(path)
# check show ip bgp on peer1(quagga1)
- qpath = util.get_routing_table(peer1,"192.168.2.0", retry=3, interval=w)
+ qpath = self.get_routing_table(peer1,"192.168.2.0", retry=3)
print qpath
self.assertIsNotNone(qpath)
# check adj-rib-out in peer2
- path = util.get_adj_rib_in(base_url, peer2, "192.168.2.0/24", retry=3, interval=w)
+ path = self.get_adj_rib_in(peer2, "192.168.2.0/24", retry=3)
# print path
self.assertIsNotNone(path)
- path = util.get_paths_in_localrib(base_url, peer3, "192.168.2.0",retry=0, interval=w)
+ path = self.get_paths_in_localrib(peer3, "192.168.2.0",retry=0)
# print path
self.assertIsNone(path)
# check show ip bgp on peer1(quagga3)
- qpath = util.get_routing_table(peer3,"192.168.2.0", retry=3, interval=w)
+ qpath = self.get_routing_table(peer3,"192.168.2.0", retry=3)
# print qpath
self.assertIsNone(qpath)
@@ -125,38 +101,37 @@ class GoBGPTest(unittest.TestCase):
# policy_pattern:p1 attaches a policy to reject route 192.168.0.0/16 (16...24)
# coming from peer2(10.0.0.2) to peer3(10.0.0.3)'s export-policy.
self.initialize(policy_pattern="p2")
- self.check_established(util.get_neighbor_address(self.gobgp_config))
+ addresses = self.get_neighbor_address(self.gobgp_config)
+ self.retry_routine_for_state(addresses, "BGP_FSM_ESTABLISHED")
peer1 = "10.0.0.1"
peer2 = "10.0.0.2"
peer3 = "10.0.0.3"
- base_url = self.rest_url_neighbor
- w = self.wait_per_retry
- paths = util.get_paths_in_localrib(base_url, peer1, "192.168.2.0", retry=3, interval=w)
+ paths = self.get_paths_in_localrib(peer1, "192.168.2.0", retry=3)
# print paths
self.assertIsNotNone(paths)
# check show ip bgp on peer1(quagga1)
- qpath = util.get_routing_table(peer1, "192.168.2.0", retry=3, interval=w)
+ qpath = self.get_routing_table(peer1, "192.168.2.0", retry=3)
# print qpath
self.assertIsNotNone(qpath)
# check adj-rib-out in peer2
- path = util.get_adj_rib_in(base_url, peer2, "192.168.2.0/24", retry=1, interval=w)
+ path = self.get_adj_rib_in(peer2, "192.168.2.0/24", retry=1)
# print path
self.assertIsNotNone(path)
- path = util.get_paths_in_localrib(base_url, peer3, "192.168.2.0")
+ path = self.get_paths_in_localrib(peer3, "192.168.2.0")
# print path
self.assertIsNotNone(path)
- path = util.get_adj_rib_out(base_url, peer3, "192.168.2.0", retry=1, interval=w)
+ path = self.get_adj_rib_out(peer3, "192.168.2.0", retry=1)
# print path
self.assertIsNone(path)
# check show ip bgp on peer1(quagga3)
- qpath = util.get_routing_table(peer3,"192.168.2.0", retry=3, interval=w)
+ qpath = self.get_routing_table(peer3,"192.168.2.0", retry=3)
# print qpath
self.assertIsNone(qpath)
@@ -195,8 +170,6 @@ class GoBGPTest(unittest.TestCase):
peer1 = "10.0.0.1"
peer2 = "10.0.0.2"
peer3 = "10.0.0.3"
- base_url = self.rest_url_neighbor
- w = self.wait_per_retry
# add other network
tn = qaccess.login(peer2)
@@ -206,20 +179,21 @@ class GoBGPTest(unittest.TestCase):
qaccess.add_network(tn, 65002, "192.168.200.0/24")
qaccess.logout(tn)
- self.check_established(util.get_neighbor_address(self.gobgp_config))
+ addresses = self.get_neighbor_address(self.gobgp_config)
+ self.retry_routine_for_state(addresses, "BGP_FSM_ESTABLISHED")
time.sleep(self.initial_wait_time)
def path_exists_in_localrib(peer, prefix,r=10):
- paths = util.get_paths_in_localrib(base_url, peer, prefix, retry=r, interval=w)
+ paths = self.get_paths_in_localrib(peer, prefix, retry=r)
return paths is not None
def path_exists_in_routing_table(peer, prefix,r=10):
- qpath = util.get_routing_table(peer, prefix, retry=r, interval=w)
+ qpath = self.get_routing_table(peer, prefix, retry=r)
return qpath is not None
def path_exists_in_adj_rib_in(peer, prefix,r=10):
- path = util.get_adj_rib_in(base_url, peer, prefix, retry=r, interval=w)
+ path = self.get_adj_rib_in(peer, prefix, retry=r)
return path is not None
@@ -253,7 +227,7 @@ class GoBGPTest(unittest.TestCase):
# soft reset
print "soft_reset"
- self.soft_reset(peer2, "ipv4")
+ self.soft_reset(peer2, IPv4)
# check local-rib
self.assertTrue(path_exists_in_localrib(peer3,"192.168.2.0"))
@@ -301,8 +275,6 @@ class GoBGPTest(unittest.TestCase):
peer1 = "10.0.0.1"
peer2 = "10.0.0.2"
peer3 = "10.0.0.3"
- base_url = self.rest_url_neighbor
- w = self.wait_per_retry
# add other network
tn = qaccess.login(peer2)
@@ -312,25 +284,25 @@ class GoBGPTest(unittest.TestCase):
qaccess.add_network(tn, 65002, "192.168.200.0/24")
qaccess.logout(tn)
- self.check_established(self.get_neighbor_address(self.gobgp_config))
+ addresses = self.get_neighbor_address(self.gobgp_config)
+ self.retry_routine_for_state(addresses, "BGP_FSM_ESTABLISHED")
time.sleep(self.initial_wait_time)
-
def path_exists_in_localrib(peer, prefix,r=10):
- paths = util.get_paths_in_localrib(base_url, peer, prefix, retry=r, interval=w)
+ paths = self.get_paths_in_localrib(peer, prefix, retry=r)
return paths is not None
def path_exists_in_routing_table(peer, prefix,r=10):
- qpath = util.get_routing_table(peer, prefix, retry=r, interval=w)
+ qpath = self.get_routing_table(peer, prefix, retry=r)
return qpath is not None
def path_exists_in_adj_rib_in(peer, prefix,r=10):
- path = util.get_adj_rib_in(base_url, peer, prefix, retry=r, interval=w)
+ path = self.get_adj_rib_in(peer, prefix, retry=r)
return path is not None
def path_exists_in_adj_rib_out(peer, prefix,r=10):
- path = util.get_adj_rib_out(base_url, peer, prefix, retry=r, interval=w)
+ path = self.get_adj_rib_out(peer, prefix, retry=r)
return path is not None
@@ -371,7 +343,7 @@ class GoBGPTest(unittest.TestCase):
# soft reset
print "soft_reset"
- self.soft_reset(peer2, "ipv4")
+ self.soft_reset(peer2, IPv4)
# check local-rib
self.assertTrue(path_exists_in_localrib(peer3,"192.168.2.0"))
@@ -389,49 +361,6 @@ class GoBGPTest(unittest.TestCase):
self.assertTrue(path_exists_in_routing_table(peer3, "192.168.200.0"))
- def retry_until(self, neighbor_address, target_state="BGP_FSM_ESTABLISHED", retry=3):
- retry_count = 0
-
- while True:
-
- current_state = util.get_neighbor_state(self.rest_url_neighbor, neighbor_address)
- if current_state == target_state:
- print "state changed to %s : %s" % (current_state, neighbor_address)
- return True
- else:
- retry_count += 1
- if retry_count > retry:
- break
- else:
- print "current state is %s" % current_state
- print "please wait more (" + str(self.wait_per_retry) + " second)"
- time.sleep(self.wait_per_retry)
-
- print "exceeded retry count : %s" % neighbor_address
- return False
-
-
- def soft_reset(self, neighbor_address, route_family, type="in"):
- url = self.rest_url_neighbor + neighbor_address + "/softreset"+type+"/" + route_family
- r = requests.post(url)
- if r.status_code == requests.codes.ok:
- print "Succeed"
- else:
- print "Failed"
-
-
- def check_load_config(self):
- self.gobgp_config = util.load_gobgp_config(self.gobgp_config_file)
- self.quagga_configs = util.load_quagga_config(self.base_dir)
- if self.gobgp_config is None:
- print "Failed to read the gobgp configuration file"
- return False
- if len(self.quagga_configs) == 0:
- print "Failed to read the quagga configuration file"
- return False
- return True
-
-
if __name__ == '__main__':
if fab.test_user_check() is False:
print "you are not root."