1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
|
# Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import sys
import nose
import quagga_access as qaccess
import docker_control as fab
from gobgp_test import GoBGPTestBase
from gobgp_test import ADJ_RIB_IN, ADJ_RIB_OUT, LOCAL_RIB, GLOBAL_RIB
from gobgp_test import NEIGHBOR
from noseplugin import OptionParser
from noseplugin import parser_option
class GoBGPIPv6Test(GoBGPTestBase):
quagga_num = 2
append_quagga = 10
remove_quagga = 10
append_quagga_best = 20
def __init__(self, *args, **kwargs):
super(GoBGPIPv6Test, self).__init__(*args, **kwargs)
# test each neighbor state is turned establish
def test_01_ipv4_ipv6_neighbor_established(self):
print "test_ipv4_ipv6_neighbor_established"
image = parser_option.gobgp_image
go_path = parser_option.go_path
log_debug = True if parser_option.gobgp_log_level == 'debug' else False
fab.init_ipv6_test_env_executor(self.quagga_num, image, go_path, log_debug)
print "please wait (" + str(self.initial_wait_time) + " second)"
time.sleep(self.initial_wait_time)
fab.docker_container_ipv6_quagga_append_executor([3, 4], go_path)
print "please wait (" + str(self.initial_wait_time) + " second)"
time.sleep(self.initial_wait_time)
if self.check_load_config() is False:
return
addresses = self.get_neighbor_address(self.gobgp_config)
self.retry_routine_for_state(addresses, "BGP_FSM_ESTABLISHED")
for address in addresses:
# get neighbor state and remote ip from gobgp connections
print "check of [ " + address + " ]"
neighbor = self.ask_gobgp(NEIGHBOR, address)
state = neighbor['info']['bgp_state']
remote_ip = neighbor['conf']['remote_ip']
self.assertEqual(address, remote_ip)
self.assertEqual(state, "BGP_FSM_ESTABLISHED")
print "state" + state
def test_02_ipv4_ipv6_received_route(self):
print "test_ipv4_ipv6_received_route"
if self.check_load_config() is False:
return
for address in self.get_neighbor_address(self.gobgp_config):
print "check of [ " + address + " ]"
af = fab.IPv6 if ":" in address else fab.IPv4
def check_func():
local_rib = self.ask_gobgp(LOCAL_RIB, address, af)
for quagga_config in self.quagga_configs:
if quagga_config.peer_ip == address or quagga_config.ip_version != af:
for c_dest in quagga_config.destinations.itervalues():
# print "config : ", c_dest.prefix, "my ip or different ip version!!!"
exist_n = 0
for g_dest in local_rib:
if c_dest.prefix == g_dest['prefix']:
exist_n += 1
if exist_n != 0:
return False
else:
for c_dest in quagga_config.destinations.itervalues():
exist_n = 0
for g_dest in local_rib:
if c_dest.prefix == g_dest['prefix']:
exist_n += 1
if exist_n != 1:
return False
return True
retry_count = 0
cmp_result = False
while retry_count < self.dest_check_limit:
cmp_result = check_func()
if cmp_result:
print "compare OK"
break
else:
retry_count += 1
print "compare NG -> retry ( %d / %d )" % (retry_count, self.dest_check_limit)
time.sleep(self.wait_per_retry)
self.assertEqual(cmp_result, True)
def test_03_advertising_route(self):
print "test_advertising_route"
if self.check_load_config() is False:
return
for address in self.get_neighbor_address(self.gobgp_config):
print "check of [ " + address + " ]"
af = fab.IPv6 if ":" in address else fab.IPv4
def check_func():
tn = qaccess.login(address)
q_rib = qaccess.show_rib(tn, af)
for quagga_config in self.quagga_configs:
if quagga_config.peer_ip == address or quagga_config.ip_version != af:
for c_dest in quagga_config.destinations.itervalues():
exist_n = 0
for c_path in c_dest.paths:
# print "conf : ", c_path.network, c_path.nexthop, "my ip or different ip version!!!"
for q_path in q_rib:
# print "quag : ", q_path['Network'], q_path['Next Hop']
if "0.0.0.0" == q_path['Next Hop'] or "::" == q_path['Next Hop']:
continue
if c_path.network.split("/")[0] == q_path['Network']:
exist_n += 1
if exist_n != 0:
return False
else:
for c_dest in quagga_config.destinations.itervalues():
exist_n = 0
for c_path in c_dest.paths:
# print "conf : ", c_path.network, c_path.nexthop
for q_path in q_rib:
# print "quag : ", q_path['Network'], q_path['Next Hop']
if quagga_config.ip_version != fab.IPv6:
c_path.network = c_path.network.split("/")[0]
if c_path.network == q_path['Network'] and c_path.nexthop == q_path['Next Hop']:
exist_n += 1
if exist_n != 1:
return False
return True
retry_count = 0
check_result = False
while retry_count < self.dest_check_limit:
check_result = check_func()
if check_result:
print "compare OK"
break
else:
retry_count += 1
print "compare NG -> retry ( %d / %d )" % (retry_count, self.dest_check_limit)
time.sleep(self.wait_per_retry)
self.assertEqual(check_result, True)
if __name__ == '__main__':
if fab.test_user_check() is False:
print "you are not root."
sys.exit(1)
if fab.docker_pkg_check() is False:
print "not install docker package."
sys.exit(1)
nose.main(argv=sys.argv, addplugins=[OptionParser()], defaultTest=sys.argv[0])
|