diff options
author | ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> | 2014-07-30 16:03:55 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2014-07-30 17:41:39 +0900 |
commit | 3ead62d37f14e2490668680327f39e88bc69e94c (patch) | |
tree | 54f029d2232f70348ad7bd4917c40da7b9ca9230 | |
parent | 851b486df467ad7f1f76ee0c99cc73e01eb7d853 (diff) |
bgp: add bmp client function
now ryu bgp can send internal information through BGP monitoring protocol
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | ryu/services/protocols/bgp/api/rtconf.py | 16 | ||||
-rw-r--r-- | ryu/services/protocols/bgp/bgpspeaker.py | 14 | ||||
-rw-r--r-- | ryu/services/protocols/bgp/bmp.py | 204 | ||||
-rw-r--r-- | ryu/services/protocols/bgp/core.py | 24 | ||||
-rw-r--r-- | ryu/services/protocols/bgp/peer.py | 2 | ||||
-rw-r--r-- | ryu/services/protocols/bgp/signals/emit.py | 8 |
6 files changed, 268 insertions, 0 deletions
diff --git a/ryu/services/protocols/bgp/api/rtconf.py b/ryu/services/protocols/bgp/api/rtconf.py index ccc4deae..c11918c7 100644 --- a/ryu/services/protocols/bgp/api/rtconf.py +++ b/ryu/services/protocols/bgp/api/rtconf.py @@ -223,3 +223,19 @@ def del_network(prefix): tm = CORE_MANAGER.get_core_service().table_manager tm.add_to_global_table(prefix, is_withdraw=True) return True + +# ============================================================================= +# BMP configuration related APIs +# ============================================================================= + + +@register(name='bmp.start') +def bmp_start(host, port): + core = CORE_MANAGER.get_core_service() + return core.start_bmp(host, port) + + +@register(name='bmp.stop') +def bmp_stop(host, port): + core = CORE_MANAGER.get_core_service() + return core.stop_bmp(host, port) diff --git a/ryu/services/protocols/bgp/bgpspeaker.py b/ryu/services/protocols/bgp/bgpspeaker.py index 7625295a..cc6d1224 100644 --- a/ryu/services/protocols/bgp/bgpspeaker.py +++ b/ryu/services/protocols/bgp/bgpspeaker.py @@ -456,3 +456,17 @@ class BGPSpeaker(object): param[neighbors.IP_ADDRESS] = address in_filter = call(func_name, **param) return in_filter + + def bmp_start(self, host, port): + func_name = 'bmp.start' + param = {} + param['host'] = host + param['port'] = port + call(func_name, **param) + + def bmp_stop(self, host, port): + func_name = 'bmp.stop' + param = {} + param['host'] = host + param['port'] = port + call(func_name, **param) diff --git a/ryu/services/protocols/bgp/bmp.py b/ryu/services/protocols/bgp/bmp.py new file mode 100644 index 00000000..92849ee2 --- /dev/null +++ b/ryu/services/protocols/bgp/bmp.py @@ -0,0 +1,204 @@ +# Copyright (C) 2014 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ryu.services.protocols.bgp.base import Activity +from ryu.lib import hub +from ryu.lib.packet import bmp +from ryu.lib.packet import bgp +from ryu.services.protocols.bgp import constants as const +import socket +import logging +from calendar import timegm +from ryu.services.protocols.bgp.signals.emit import BgpSignalBus + +LOG = logging.getLogger('bgpspeaker.bmp') + + +class BMPClient(Activity): + """A BMP client. + + Try to establish BMP session between a configured BMP server. + If BMP session is established, transfer information about peers + (e.g. received and sent open msgs, contents of adj-rib-in, other stats) + + """ + + def __init__(self, core_service, host, port): + super(BMPClient, self).__init__(name='BMPClient(%s:%s)' % (host, port)) + self._core_service = core_service + self._core_service.signal_bus.register_listener( + BgpSignalBus.BGP_ADJ_RIB_IN_CHANGED, + lambda _, data: self.on_adj_rib_in_changed(data) + ) + self._core_service.signal_bus.register_listener( + BgpSignalBus.BGP_ADJ_UP, + lambda _, data: self.on_adj_up(data) + ) + self._core_service.signal_bus.register_listener( + BgpSignalBus.BGP_ADJ_DOWN, + lambda _, data: self.on_adj_down(data) + ) + self._socket = None + self.server_address = (host, port) + self._connect_retry_event = hub.Event() + self._connect_retry_time = 5 + + def _run(self): + self._connect_retry_event.set() + + while True: + self._connect_retry_event.wait() + + try: + self._connect_retry_event.clear() + self._connect_tcp(self.server_address, + self._handle_bmp_session) + except socket.error: + self._connect_retry_event.set() + LOG.info('Will try to reconnect to %s after %s secs: %s' % + (self.server_address, self._connect_retry_time, + self._connect_retry_event.is_set())) + + self.pause(self._connect_retry_time) + + def _send(self, msg): + if not self._socket: + return + assert isinstance(msg, bmp.BMPMessage) + serialized_msg = msg.serialize() + + ret = self._socket.send(msg.serialize()) + + def on_adj_rib_in_changed(self, data): + peer = data['peer'] + path = data['received_route'] + update_msg = peer._construct_update(path) + msg = self._construct_route_monitoring(peer, path) + self._send(msg) + + def on_adj_up(self, data): + peer = data['peer'] + msg = self._construct_peer_up_notification(peer) + self._send(msg) + + for path in peer._adj_rib_in.itervalues(): + update_msg = peer._construct_update(path) + msg = self._construct_route_monitoring(peer, path) + self._send(msg) + + def on_adj_down(self, data): + peer = data['peer'] + msg = self._construct_peer_down_notification(peer) + self._send(msg) + + def _construct_peer_up_notification(self, peer): + if peer.is_mpbgp_cap_valid(bgp.RF_IPv4_VPN) or \ + peer.is_mpbgp_cap_valid(bgp.RF_IPv6_VPN): + peer_type = bmp.BMP_PEER_TYPE_L3VPN + else: + peer_type = bmp.BMP_PEER_TYPE_GLOBAL + + peer_distinguisher = 0 + peer_as = peer._neigh_conf.remote_as + peer_bgp_id = self._core_service.router_id + timestamp = peer.state._established_time + + local_address = peer.host_bind_ip + local_port = int(peer.host_bind_port) + peer_address, remote_port = peer.protocol._remotename + remote_port = int(remote_port) + + sent_open_msg = peer.protocol.sent_open_msg + recv_open_msg = peer.protocol.recv_open_msg + + msg = bmp.BMPPeerUpNotification(local_address=local_address, + local_port=local_port, + remote_port=remote_port, + sent_open_message=sent_open_msg, + received_open_message=recv_open_msg, + peer_type=peer_type, + is_post_policy=False, + peer_distinguisher=peer_distinguisher, + peer_address=peer_address, + peer_as=peer_as, + peer_bgp_id=peer_bgp_id, + timestamp=timestamp) + + return msg + + def _construct_peer_down_notification(self, peer): + return bmp.BMPPeerDownNotification(bmp.BMP_PEER_DOWN_REASON_UNKNOWN, + data=None) + + def _construct_route_monitoring(self, peer, path): + if peer.is_mpbgp_cap_valid(bgp.RF_IPv4_VPN) or \ + peer.is_mpbgp_cap_valid(bgp.RF_IPv6_VPN): + peer_type = bmp.BMP_PEER_TYPE_L3VPN + else: + peer_type = bmp.BMP_PEER_TYPE_GLOBAL + + peer_distinguisher = 0 + peer_as = peer._neigh_conf.remote_as + peer_bgp_id = self._core_service.router_id + peer_address, _ = peer.protocol._remotename + + bgp_update = peer._construct_update(path) + is_post_policy = not path.filtered + timestamp = timegm(path.timestamp) + + msg = bmp.BMPRouteMonitoring(bgp_update=bgp_update, + peer_type=peer_type, + is_post_policy=is_post_policy, + peer_distinguisher=peer_distinguisher, + peer_address=peer_address, + peer_as=peer_as, peer_bgp_id=peer_bgp_id, + timestamp=timestamp) + + return msg + + def _handle_bmp_session(self, socket): + + self._socket = socket + # send init message + init_info = {'type': bmp.BMP_INIT_TYPE_STRING, + 'value': u'This is Ryu BGP BMP message'} + init_msg = bmp.BMPInitiation([init_info]) + self._send(init_msg) + + # send peer-up message for each peers + peer_manager = self._core_service.peer_manager + + for peer in (p for p in peer_manager.iterpeers if p.in_established()): + msg = self._construct_peer_up_notification(peer) + self._send(msg) + + for path in peer._adj_rib_in.itervalues(): + update_msg = peer._construct_update(path) + msg = self._construct_route_monitoring(peer, path) + self._send(msg) + + # TODO periodically send stats to bmpstation + + while True: + # bmpstation shouldn't send any packet to bmpclient. + # this recv() is only meant to detect socket closed + ret = self._socket.recv(1) + if len(ret) == 0: + LOG.debug('BMP socket is closed. retry connecting..') + self._socket = None + self._connect_retry_event.set() + break + + # silently ignore packets from the bmpstation diff --git a/ryu/services/protocols/bgp/core.py b/ryu/services/protocols/bgp/core.py index f16f78e5..8552a803 100644 --- a/ryu/services/protocols/bgp/core.py +++ b/ryu/services/protocols/bgp/core.py @@ -41,6 +41,7 @@ from ryu.services.protocols.bgp.signals.emit import BgpSignalBus from ryu.services.protocols.bgp.speaker import BgpProtocol from ryu.services.protocols.bgp.utils.rtfilter import RouteTargetManager from ryu.services.protocols.bgp.utils import stats +from ryu.services.protocols.bgp.bmp import BMPClient from ryu.lib import sockopt @@ -125,6 +126,9 @@ class CoreService(Factory, Activity): # BgpProcessor instance (initialized during start) self._bgp_processor = None + # BMP clients key: (host, port) value: BMPClient instance + self.bmpclients = {} + def _init_signal_listeners(self): self._signal_bus.register_listener( BgpSignalBus.BGP_DEST_CHANGED, @@ -451,3 +455,23 @@ class CoreService(Factory, Activity): peer._host_bind_ip = bind_ip peer._host_bind_port = bind_port self._spawn_activity(bgp_proto, peer) + + def start_bmp(self, host, port): + if (host, port) in self.bmpclients: + bmpclient = self.bmpclients[(host, port)] + if bmpclient.started: + LOG.warn("bmpclient is already running for %s:%s" % (host, + port)) + return False + bmpclient = BMPClient(self, host, port) + self.bmpclients[(host, port)] = bmpclient + self._spawn_activity(bmpclient) + return True + + def stop_bmp(self, host, port): + if (host, port) not in self.bmpclients: + LOG.warn("no bmpclient is running for %s:%s" % (host, port)) + return False + + bmpclient = self.bmpclients[(host, port)] + bmpclient.stop() diff --git a/ryu/services/protocols/bgp/peer.py b/ryu/services/protocols/bgp/peer.py index b2b42175..a0a2314f 100644 --- a/ryu/services/protocols/bgp/peer.py +++ b/ryu/services/protocols/bgp/peer.py @@ -195,12 +195,14 @@ class PeerState(object): if new_state == const.BGP_FSM_ESTABLISHED: self.incr(PeerCounterNames.FSM_ESTB_TRANSITIONS) self._established_time = time.time() + self._signal_bus.adj_up(self.peer) NET_CONTROLLER.send_rpc_notification( 'neighbor.up', {'ip_address': self.peer.ip_address} ) # transition from Established to another state elif old_state == const.BGP_FSM_ESTABLISHED: self._established_time = 0 + self._signal_bus.adj_down(self.peer) NET_CONTROLLER.send_rpc_notification( 'neighbor.down', {'ip_address': self.peer.ip_address} ) diff --git a/ryu/services/protocols/bgp/signals/emit.py b/ryu/services/protocols/bgp/signals/emit.py index f015fdb8..d97ce1c8 100644 --- a/ryu/services/protocols/bgp/signals/emit.py +++ b/ryu/services/protocols/bgp/signals/emit.py @@ -14,6 +14,8 @@ class BgpSignalBus(SignalBus): BGP_BEST_PATH_CHANGED = ('core', 'best', 'changed') BGP_ADJ_RIB_IN_CHANGED = ('core', 'adj', 'rib', 'in', 'changed') BGP_ADJ_RIB_OUT_CHANGED = ('core', 'adj', 'rib', 'out', 'changed') + BGP_ADJ_UP = ('core', 'adj', 'up') + BGP_ADJ_DOWN = ('core', 'adj', 'down') def bgp_error(self, peer, code, subcode, reason): return self.emit_signal( @@ -71,3 +73,9 @@ class BgpSignalBus(SignalBus): return self.emit_signal( self.BGP_ADJ_RIB_OUT_CHANGED, {'peer': peer, 'sent_route': sent_route}) + + def adj_up(self, peer): + return self.emit_signal(self.BGP_ADJ_UP, {'peer': peer}) + + def adj_down(self, peer): + return self.emit_signal(self.BGP_ADJ_DOWN, {'peer': peer}) |