# Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. from fabric.api import local, lcd from fabric import colors from fabric.utils import indent from fabric.state import env, output from docker import Client import netaddr import os import time import itertools DEFAULT_TEST_PREFIX = '' DEFAULT_TEST_BASE_DIR = '/tmp/gobgp' TEST_PREFIX = DEFAULT_TEST_PREFIX TEST_BASE_DIR = DEFAULT_TEST_BASE_DIR BGP_FSM_IDLE = 'BGP_FSM_IDLE' BGP_FSM_ACTIVE = 'BGP_FSM_ACTIVE' BGP_FSM_ESTABLISHED = 'BGP_FSM_ESTABLISHED' BGP_ATTR_TYPE_ORIGIN = 1 BGP_ATTR_TYPE_AS_PATH = 2 BGP_ATTR_TYPE_NEXT_HOP = 3 BGP_ATTR_TYPE_MULTI_EXIT_DISC = 4 BGP_ATTR_TYPE_LOCAL_PREF = 5 BGP_ATTR_TYPE_COMMUNITIES = 8 BGP_ATTR_TYPE_ORIGINATOR_ID = 9 BGP_ATTR_TYPE_CLUSTER_LIST = 10 BGP_ATTR_TYPE_MP_REACH_NLRI = 14 BGP_ATTR_TYPE_EXTENDED_COMMUNITIES = 16 env.abort_exception = RuntimeError output.stderr = False def try_several_times(f, t=3, s=1): e = None for i in range(t): try: r = f() except RuntimeError as e: time.sleep(s) else: return r raise e def get_bridges(): return try_several_times(lambda : local("docker network ls | awk 'NR > 1{print $2}'", capture=True)).split('\n') def get_containers(): return try_several_times(lambda : local("docker ps -a | awk 'NR > 1 {print $NF}'", capture=True)).split('\n') class CmdBuffer(list): def __init__(self, delim='\n'): super(CmdBuffer, self).__init__() self.delim = delim def __lshift__(self, value): self.append(value) def __str__(self): return self.delim.join(self) def make_gobgp_ctn(tag='gobgp', local_gobgp_path='', from_image='osrg/quagga'): if local_gobgp_path == '': local_gobgp_path = os.getcwd() c = CmdBuffer() c << 'FROM {0}'.format(from_image) c << 'ADD gobgp /go/src/github.com/osrg/gobgp/' c << 'RUN go get github.com/osrg/gobgp/gobgpd' c << 'RUN go install github.com/osrg/gobgp/gobgpd' c << 'RUN go get github.com/osrg/gobgp/gobgp' c << 'RUN go install github.com/osrg/gobgp/gobgp' rindex = local_gobgp_path.rindex('gobgp') if rindex < 0: raise Exception('{0} seems not gobgp dir'.format(local_gobgp_path)) workdir = local_gobgp_path[:rindex] with lcd(workdir): local('echo \'{0}\' > Dockerfile'.format(str(c))) local('docker build -t {0} .'.format(tag)) local('rm Dockerfile') class Bridge(object): def __init__(self, name, subnet='', with_ip=True, self_ip=False): self.name = name if TEST_PREFIX != '': self.name = '{0}_{1}'.format(TEST_PREFIX, name) self.with_ip = with_ip if with_ip: self.subnet = netaddr.IPNetwork(subnet) def f(): for host in self.subnet: yield host self._ip_generator = f() # throw away first network address self.next_ip_address() def f(): if self.name in get_bridges(): self.delete() v6 = '' if self.subnet.version == 6: v6 = '--ipv6' self.id = local('docker network create --driver bridge {0} --subnet {1} {2}'.format(v6, subnet, self.name), capture=True) try_several_times(f) self.self_ip = self_ip if self_ip: self.ip_addr = self.next_ip_address() try_several_times(lambda :local("ip addr add {0} dev {1}".format(self.ip_addr, self.name))) self.ctns = [] def next_ip_address(self): return "{0}/{1}".format(self._ip_generator.next(), self.subnet.prefixlen) def addif(self, ctn): name = ctn.next_if_name() self.ctns.append(ctn) local("docker network connect {0} {1}".format(self.name, ctn.docker_name())) i = [x for x in Client(timeout=60, version='auto').inspect_network(self.id)['Containers'].values() if x['Name'] == ctn.docker_name()][0] if self.subnet.version == 4: addr = i['IPv4Address'] else: addr = i['IPv6Address'] ctn.ip_addrs.append(('eth1', addr, self.name)) def delete(self): try_several_times(lambda : local("docker network rm {0}".format(self.name))) class Container(object): def __init__(self, name, image): self.name = name self.image = image self.shared_volumes = [] self.ip_addrs = [] self.ip6_addrs = [] self.is_running = False self.eths = [] self.tcpdump_running = False if self.docker_name() in get_containers(): self.remove() def docker_name(self): if TEST_PREFIX == DEFAULT_TEST_PREFIX: return self.name return '{0}_{1}'.format(TEST_PREFIX, self.name) def next_if_name(self): name = 'eth{0}'.format(len(self.eths)+1) self.eths.append(name) return name def run(self): c = CmdBuffer(' ') c << "docker run --privileged=true" for sv in self.shared_volumes: c << "-v {0}:{1}".format(sv[0], sv[1]) c << "--name {0} -id {1}".format(self.docker_name(), self.image) self.id = try_several_times(lambda : local(str(c), capture=True)) self.is_running = True self.local("ip li set up dev lo") for line in self.local("ip a show dev eth0", capture=True).split('\n'): if line.strip().startswith("inet "): elems = [e.strip() for e in line.strip().split(' ')] self.ip_addrs.append(('eth0', elems[1], 'docker0')) elif line.strip().startswith("inet6 "): elems = [e.strip() for e in line.strip().split(' ')] self.ip6_addrs.append(('eth0', elems[1], 'docker0')) return 0 def stop(self): ret = try_several_times(lambda : local("docker stop -t 0 " + self.docker_name(), capture=True)) self.is_running = False return ret def remove(self): ret = try_several_times(lambda : local("docker rm -f " + self.docker_name(), capture=True)) self.is_running = False return ret def pipework(self, bridge, ip_addr, intf_name=""): if not self.is_running: print colors.yellow('call run() before pipeworking') return c = CmdBuffer(' ') c << "pipework {0}".format(bridge.name) if intf_name != "": c << "-i {0}".format(intf_name) else: intf_name = "eth1" c << "{0} {1}".format(self.docker_name(), ip_addr) self.ip_addrs.append((intf_name, ip_addr, bridge.name)) try_several_times(lambda :local(str(c))) def local(self, cmd, capture=False, stream=False, detach=False): if stream: dckr = Client(timeout=120, version='auto') i = dckr.exec_create(container=self.docker_name(), cmd=cmd) return dckr.exec_start(i['Id'], tty=True, stream=stream, detach=detach) else: flag = '-d' if detach else '' return local('docker exec {0} {1} {2}'.format(flag, self.docker_name(), cmd), capture) def get_pid(self): if self.is_running: cmd = "docker inspect -f '{{.State.Pid}}' " + self.docker_name() return int(local(cmd, capture=True)) return -1 def start_tcpdump(self, interface=None, filename=None, expr='tcp port 179'): if self.tcpdump_running: raise Exception('tcpdump already running') self.tcpdump_running = True if not interface: interface = "eth0" if not filename: filename = '{0}.dump'.format(interface) self.local("tcpdump -U -i {0} -w {1}/{2} {3}".format(interface, self.shared_volumes[0][1], filename, expr), detach=True) return '{0}/{1}'.format(self.shared_volumes[0][0], filename) def stop_tcpdump(self): self.local("pkill tcpdump") self.tcpdump_running = False class BGPContainer(Container): WAIT_FOR_BOOT = 1 RETRY_INTERVAL = 5 def __init__(self, name, asn, router_id, ctn_image_name): self.config_dir = '/'.join((TEST_BASE_DIR, TEST_PREFIX, name)) local('if [ -e {0} ]; then rm -r {0}; fi'.format(self.config_dir)) local('mkdir -p {0}'.format(self.config_dir)) local('chmod 777 {0}'.format(self.config_dir)) self.asn = asn self.router_id = router_id self.peers = {} self.routes = {} self.policies = {} super(BGPContainer, self).__init__(name, ctn_image_name) def __repr__(self): return str({'name':self.name, 'asn':self.asn, 'router_id':self.router_id}) def run(self): self.create_config() super(BGPContainer, self).run() return self.WAIT_FOR_BOOT def add_peer(self, peer, passwd=None, vpn=False, is_rs_client=False, policies=None, passive=False, is_rr_client=False, cluster_id=None, flowspec=False, bridge='', reload_config=True, as2=False, graceful_restart=None, local_as=None, prefix_limit=None, v6=False, llgr=None): neigh_addr = '' local_addr = '' it = itertools.product(self.ip_addrs, peer.ip_addrs) if v6: it = itertools.product(self.ip6_addrs, peer.ip6_addrs) for me, you in it: if bridge != '' and bridge != me[2]: continue if me[2] == you[2]: neigh_addr = you[1] local_addr = me[1] if v6: addr, mask = local_addr.split('/') local_addr = "{0}%{1}/{2}".format(addr, me[0], mask) break if neigh_addr == '': raise Exception('peer {0} seems not ip reachable'.format(peer)) if not policies: policies = {} self.peers[peer] = {'neigh_addr': neigh_addr, 'passwd': passwd, 'vpn': vpn, 'flowspec': flowspec, 'is_rs_client': is_rs_client, 'is_rr_client': is_rr_client, 'cluster_id': cluster_id, 'policies': policies, 'passive': passive, 'local_addr': local_addr, 'as2': as2, 'graceful_restart': graceful_restart, 'local_as': local_as, 'prefix_limit': prefix_limit, 'llgr': llgr} if self.is_running and reload_config: self.create_config() self.reload_config() def del_peer(self, peer, reload_config=True): del self.peers[peer] if self.is_running and reload_config: self.create_config() self.reload_config() def disable_peer(self, peer): raise Exception('implement disable_peer() method') def enable_peer(self, peer): raise Exception('implement enable_peer() method') def log(self): return local('cat {0}/*.log'.format(self.config_dir), capture=True) def add_route(self, route, rf='ipv4', attribute=None, aspath=None, community=None, med=None, extendedcommunity=None, nexthop=None, matchs=None, thens=None, local_pref=None, reload_config=True): self.routes[route] = {'prefix': route, 'rf': rf, 'attr': attribute, 'next-hop': nexthop, 'as-path': aspath, 'community': community, 'med': med, 'local-pref': local_pref, 'extended-community': extendedcommunity, 'matchs': matchs, 'thens' : thens} if self.is_running and reload_config: self.create_config() self.reload_config() def add_policy(self, policy, peer, typ, default='accept', reload_config=True): self.set_default_policy(peer, typ, default) self.define_policy(policy) self.assign_policy(peer, policy, typ) if self.is_running and reload_config: self.create_config() self.reload_config() def set_default_policy(self, peer, typ, default): if typ in ['in', 'out', 'import', 'export'] and default in ['reject', 'accept']: if 'default-policy' not in self.peers[peer]: self.peers[peer]['default-policy'] = {} self.peers[peer]['default-policy'][typ] = default else: raise Exception('wrong type or default') def define_policy(self, policy): self.policies[policy['name']] = policy def assign_policy(self, peer, policy, typ): if peer not in self.peers: raise Exception('peer {0} not found'.format(peer.name)) name = policy['name'] if name not in self.policies: raise Exception('policy {0} not found'.format(name)) self.peers[peer]['policies'][typ] = policy def get_local_rib(self, peer, rf): raise Exception('implement get_local_rib() method') def get_global_rib(self, rf): raise Exception('implement get_global_rib() method') def get_neighbor_state(self, peer_id): raise Exception('implement get_neighbor() method') def get_reachablily(self, prefix, timeout=20): version = netaddr.IPNetwork(prefix).version addr = prefix.split('/')[0] if version == 4: ping_cmd = 'ping' elif version == 6: ping_cmd = 'ping6' else: raise Exception('unsupported route family: {0}'.format(version)) cmd = '/bin/bash -c "/bin/{0} -c 1 -w 1 {1} | xargs echo"'.format(ping_cmd, addr) interval = 1 count = 0 while True: res = self.local(cmd, capture=True) print colors.yellow(res) if '1 packets received' in res and '0% packet loss': break time.sleep(interval) count += interval if count >= timeout: raise Exception('timeout') return True def wait_for(self, expected_state, peer, timeout=120): interval = 1 count = 0 while True: state = self.get_neighbor_state(peer) y = colors.yellow print y("{0}'s peer {1} state: {2}".format(self.router_id, peer.router_id, state)) if state == expected_state: return time.sleep(interval) count += interval if count >= timeout: raise Exception('timeout') def add_static_route(self, network, next_hop): cmd = '/sbin/ip route add {0} via {1}'.format(network, next_hop) self.local(cmd) def set_ipv6_forward(self): cmd = 'sysctl -w net.ipv6.conf.all.forwarding=1' self.local(cmd) def create_config(self): raise Exception('implement create_config() method') def reload_config(self): raise Exception('implement reload_config() method')