diff options
Diffstat (limited to 'test/scenario_test/route_server_malformed_test.py')
-rw-r--r-- | test/scenario_test/route_server_malformed_test.py | 56 |
1 files changed, 51 insertions, 5 deletions
diff --git a/test/scenario_test/route_server_malformed_test.py b/test/scenario_test/route_server_malformed_test.py index 4a2d90b6..05ed601d 100644 --- a/test/scenario_test/route_server_malformed_test.py +++ b/test/scenario_test/route_server_malformed_test.py @@ -20,11 +20,17 @@ import sys import nose import collections import docker_control as fab +import requests +import json +import toml from noseplugin import OptionParser from noseplugin import parser_option sleep_time = 20 +gobgp_ip = "10.0.255.1" +gobgp_port = "8080" +gobgp_config_file = "/usr/local/gobgp/gobgpd.conf" def check_pattern(): @@ -35,9 +41,12 @@ def check_pattern(): pattern["<File to be used in test>"] = "<at that time the message>" """ pattern = collections.OrderedDict() - pattern["malformed1-exabgp-gobgp-v4-MP_REACH_NLRI.conf"] = "UPDATE message error / Attribute Flags Error / 0x600F0411223344" + pattern["malformed1-exabgp-gobgp-v4-MP_REACH_NLRI.conf"] = "UPDATE message error / Attribute Flags Error / 0x600E0411223344" + pattern["malformed1-exabgp-gobgp-v4-MP_UNREACH_NLRI.conf"] = "UPDATE message error / Attribute Flags Error / 0x600F0411223344" pattern["malformed1-exabgp-gobgp-v4-AS_PATH.conf"] = "UPDATE message error / Attribute Flags Error / 0x60020411223344" pattern["malformed1-exabgp-gobgp-v4-AS4_PATH.conf"] = "UPDATE message error / Attribute Flags Error / 0x60110411223344" + pattern["malformed1-exabgp-gobgp-v4-NEXTHOP_INVALID.conf"] = "UPDATE message error / Attribute Flags Error / 0x600E08010110FFFFFF0000" + pattern["malformed1-exabgp-gobgp-v4-ROUTE_FAMILY_INVALID.conf"] = "UPDATE message error / Attribute Flags Error / 0x600E150002011020010DB800000000000000000000000100" return pattern @@ -59,7 +68,6 @@ def test_malformed_packet(): sys.exit(1) use_local = parser_option.use_local - go_path = parser_option.go_path for pkey in pattern: conf_file = pwd + "/exabgp_test_conf/" + pkey @@ -67,7 +75,7 @@ def test_malformed_packet(): fab.init_malformed_test_env_executor(pkey, use_local) print "please wait" time.sleep(sleep_time) - yield check_em, pkey, pattern[pkey] + yield check_func, pkey, pattern[pkey] else: print "config file not exists." @@ -75,9 +83,30 @@ def test_malformed_packet(): sys.exit(1) -def check_em(exabgp_conf, result): +def check_func(exabgp_conf, result): + # get neighbor addresses from gobgpd.conf + addresses = get_neighbor_address() + # check whether the service of gobgp is normally + url = "http://" + gobgp_ip + ":" + gobgp_port + "/v1/bgp/neighbors" + r = requests.get(url) + neighbors = json.loads(r.text) + + assert len(neighbors) == len(addresses) + + for neighbor in neighbors: + state = neighbor['info']['bgp_state'] + remote_ip = neighbor['conf']['remote_ip'] + e_transitions = neighbor['info']['fsm_established_transitions'] + if remote_ip == "10.0.0.1": + print "check of [ " + remote_ip + " ]" + assert state == "BGP_FSM_ESTABLISHED" + assert e_transitions == 1 + else: + print "check of [ " + remote_ip + " ]" + assert remote_ip == "10.0.0.100" + + # get notification message from exabgp log err_msg = fab.get_notification_from_exabgp_log() - # parse_msg = re.search(r'error.*', err_msg).group(0) notification = None parse_msg = re.search(r'error.*', err_msg) if parse_msg is not None: @@ -87,8 +116,25 @@ def check_em(exabgp_conf, result): print "notification message : " print " >>> " + str(notification) + # check notification messege assert notification == result + +# get address of each neighbor from gobpg configration +def get_neighbor_address(): + address = [] + try: + gobgp_config = toml.loads(open(gobgp_config_file).read()) + neighbors_config = gobgp_config['NeighborList'] + for neighbor_config in neighbors_config: + neighbor_ip = neighbor_config['NeighborAddress'] + address.append(neighbor_ip) + + except IOError, (errno, strerror): + print "I/O error(%s): %s" % (errno, strerror) + + return address + if __name__ == '__main__': if fab.test_user_check() is False: print "you are not root." |