summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>2014-07-30 16:03:55 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2014-07-30 17:41:39 +0900
commit3ead62d37f14e2490668680327f39e88bc69e94c (patch)
tree54f029d2232f70348ad7bd4917c40da7b9ca9230
parent851b486df467ad7f1f76ee0c99cc73e01eb7d853 (diff)
bgp: add bmp client function
now ryu bgp can send internal information through BGP monitoring protocol Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--ryu/services/protocols/bgp/api/rtconf.py16
-rw-r--r--ryu/services/protocols/bgp/bgpspeaker.py14
-rw-r--r--ryu/services/protocols/bgp/bmp.py204
-rw-r--r--ryu/services/protocols/bgp/core.py24
-rw-r--r--ryu/services/protocols/bgp/peer.py2
-rw-r--r--ryu/services/protocols/bgp/signals/emit.py8
6 files changed, 268 insertions, 0 deletions
diff --git a/ryu/services/protocols/bgp/api/rtconf.py b/ryu/services/protocols/bgp/api/rtconf.py
index ccc4deae..c11918c7 100644
--- a/ryu/services/protocols/bgp/api/rtconf.py
+++ b/ryu/services/protocols/bgp/api/rtconf.py
@@ -223,3 +223,19 @@ def del_network(prefix):
tm = CORE_MANAGER.get_core_service().table_manager
tm.add_to_global_table(prefix, is_withdraw=True)
return True
+
+# =============================================================================
+# BMP configuration related APIs
+# =============================================================================
+
+
+@register(name='bmp.start')
+def bmp_start(host, port):
+ core = CORE_MANAGER.get_core_service()
+ return core.start_bmp(host, port)
+
+
+@register(name='bmp.stop')
+def bmp_stop(host, port):
+ core = CORE_MANAGER.get_core_service()
+ return core.stop_bmp(host, port)
diff --git a/ryu/services/protocols/bgp/bgpspeaker.py b/ryu/services/protocols/bgp/bgpspeaker.py
index 7625295a..cc6d1224 100644
--- a/ryu/services/protocols/bgp/bgpspeaker.py
+++ b/ryu/services/protocols/bgp/bgpspeaker.py
@@ -456,3 +456,17 @@ class BGPSpeaker(object):
param[neighbors.IP_ADDRESS] = address
in_filter = call(func_name, **param)
return in_filter
+
+ def bmp_start(self, host, port):
+ func_name = 'bmp.start'
+ param = {}
+ param['host'] = host
+ param['port'] = port
+ call(func_name, **param)
+
+ def bmp_stop(self, host, port):
+ func_name = 'bmp.stop'
+ param = {}
+ param['host'] = host
+ param['port'] = port
+ call(func_name, **param)
diff --git a/ryu/services/protocols/bgp/bmp.py b/ryu/services/protocols/bgp/bmp.py
new file mode 100644
index 00000000..92849ee2
--- /dev/null
+++ b/ryu/services/protocols/bgp/bmp.py
@@ -0,0 +1,204 @@
+# Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ryu.services.protocols.bgp.base import Activity
+from ryu.lib import hub
+from ryu.lib.packet import bmp
+from ryu.lib.packet import bgp
+from ryu.services.protocols.bgp import constants as const
+import socket
+import logging
+from calendar import timegm
+from ryu.services.protocols.bgp.signals.emit import BgpSignalBus
+
+LOG = logging.getLogger('bgpspeaker.bmp')
+
+
+class BMPClient(Activity):
+ """A BMP client.
+
+ Try to establish BMP session between a configured BMP server.
+ If BMP session is established, transfer information about peers
+ (e.g. received and sent open msgs, contents of adj-rib-in, other stats)
+
+ """
+
+ def __init__(self, core_service, host, port):
+ super(BMPClient, self).__init__(name='BMPClient(%s:%s)' % (host, port))
+ self._core_service = core_service
+ self._core_service.signal_bus.register_listener(
+ BgpSignalBus.BGP_ADJ_RIB_IN_CHANGED,
+ lambda _, data: self.on_adj_rib_in_changed(data)
+ )
+ self._core_service.signal_bus.register_listener(
+ BgpSignalBus.BGP_ADJ_UP,
+ lambda _, data: self.on_adj_up(data)
+ )
+ self._core_service.signal_bus.register_listener(
+ BgpSignalBus.BGP_ADJ_DOWN,
+ lambda _, data: self.on_adj_down(data)
+ )
+ self._socket = None
+ self.server_address = (host, port)
+ self._connect_retry_event = hub.Event()
+ self._connect_retry_time = 5
+
+ def _run(self):
+ self._connect_retry_event.set()
+
+ while True:
+ self._connect_retry_event.wait()
+
+ try:
+ self._connect_retry_event.clear()
+ self._connect_tcp(self.server_address,
+ self._handle_bmp_session)
+ except socket.error:
+ self._connect_retry_event.set()
+ LOG.info('Will try to reconnect to %s after %s secs: %s' %
+ (self.server_address, self._connect_retry_time,
+ self._connect_retry_event.is_set()))
+
+ self.pause(self._connect_retry_time)
+
+ def _send(self, msg):
+ if not self._socket:
+ return
+ assert isinstance(msg, bmp.BMPMessage)
+ serialized_msg = msg.serialize()
+
+ ret = self._socket.send(msg.serialize())
+
+ def on_adj_rib_in_changed(self, data):
+ peer = data['peer']
+ path = data['received_route']
+ update_msg = peer._construct_update(path)
+ msg = self._construct_route_monitoring(peer, path)
+ self._send(msg)
+
+ def on_adj_up(self, data):
+ peer = data['peer']
+ msg = self._construct_peer_up_notification(peer)
+ self._send(msg)
+
+ for path in peer._adj_rib_in.itervalues():
+ update_msg = peer._construct_update(path)
+ msg = self._construct_route_monitoring(peer, path)
+ self._send(msg)
+
+ def on_adj_down(self, data):
+ peer = data['peer']
+ msg = self._construct_peer_down_notification(peer)
+ self._send(msg)
+
+ def _construct_peer_up_notification(self, peer):
+ if peer.is_mpbgp_cap_valid(bgp.RF_IPv4_VPN) or \
+ peer.is_mpbgp_cap_valid(bgp.RF_IPv6_VPN):
+ peer_type = bmp.BMP_PEER_TYPE_L3VPN
+ else:
+ peer_type = bmp.BMP_PEER_TYPE_GLOBAL
+
+ peer_distinguisher = 0
+ peer_as = peer._neigh_conf.remote_as
+ peer_bgp_id = self._core_service.router_id
+ timestamp = peer.state._established_time
+
+ local_address = peer.host_bind_ip
+ local_port = int(peer.host_bind_port)
+ peer_address, remote_port = peer.protocol._remotename
+ remote_port = int(remote_port)
+
+ sent_open_msg = peer.protocol.sent_open_msg
+ recv_open_msg = peer.protocol.recv_open_msg
+
+ msg = bmp.BMPPeerUpNotification(local_address=local_address,
+ local_port=local_port,
+ remote_port=remote_port,
+ sent_open_message=sent_open_msg,
+ received_open_message=recv_open_msg,
+ peer_type=peer_type,
+ is_post_policy=False,
+ peer_distinguisher=peer_distinguisher,
+ peer_address=peer_address,
+ peer_as=peer_as,
+ peer_bgp_id=peer_bgp_id,
+ timestamp=timestamp)
+
+ return msg
+
+ def _construct_peer_down_notification(self, peer):
+ return bmp.BMPPeerDownNotification(bmp.BMP_PEER_DOWN_REASON_UNKNOWN,
+ data=None)
+
+ def _construct_route_monitoring(self, peer, path):
+ if peer.is_mpbgp_cap_valid(bgp.RF_IPv4_VPN) or \
+ peer.is_mpbgp_cap_valid(bgp.RF_IPv6_VPN):
+ peer_type = bmp.BMP_PEER_TYPE_L3VPN
+ else:
+ peer_type = bmp.BMP_PEER_TYPE_GLOBAL
+
+ peer_distinguisher = 0
+ peer_as = peer._neigh_conf.remote_as
+ peer_bgp_id = self._core_service.router_id
+ peer_address, _ = peer.protocol._remotename
+
+ bgp_update = peer._construct_update(path)
+ is_post_policy = not path.filtered
+ timestamp = timegm(path.timestamp)
+
+ msg = bmp.BMPRouteMonitoring(bgp_update=bgp_update,
+ peer_type=peer_type,
+ is_post_policy=is_post_policy,
+ peer_distinguisher=peer_distinguisher,
+ peer_address=peer_address,
+ peer_as=peer_as, peer_bgp_id=peer_bgp_id,
+ timestamp=timestamp)
+
+ return msg
+
+ def _handle_bmp_session(self, socket):
+
+ self._socket = socket
+ # send init message
+ init_info = {'type': bmp.BMP_INIT_TYPE_STRING,
+ 'value': u'This is Ryu BGP BMP message'}
+ init_msg = bmp.BMPInitiation([init_info])
+ self._send(init_msg)
+
+ # send peer-up message for each peers
+ peer_manager = self._core_service.peer_manager
+
+ for peer in (p for p in peer_manager.iterpeers if p.in_established()):
+ msg = self._construct_peer_up_notification(peer)
+ self._send(msg)
+
+ for path in peer._adj_rib_in.itervalues():
+ update_msg = peer._construct_update(path)
+ msg = self._construct_route_monitoring(peer, path)
+ self._send(msg)
+
+ # TODO periodically send stats to bmpstation
+
+ while True:
+ # bmpstation shouldn't send any packet to bmpclient.
+ # this recv() is only meant to detect socket closed
+ ret = self._socket.recv(1)
+ if len(ret) == 0:
+ LOG.debug('BMP socket is closed. retry connecting..')
+ self._socket = None
+ self._connect_retry_event.set()
+ break
+
+ # silently ignore packets from the bmpstation
diff --git a/ryu/services/protocols/bgp/core.py b/ryu/services/protocols/bgp/core.py
index f16f78e5..8552a803 100644
--- a/ryu/services/protocols/bgp/core.py
+++ b/ryu/services/protocols/bgp/core.py
@@ -41,6 +41,7 @@ from ryu.services.protocols.bgp.signals.emit import BgpSignalBus
from ryu.services.protocols.bgp.speaker import BgpProtocol
from ryu.services.protocols.bgp.utils.rtfilter import RouteTargetManager
from ryu.services.protocols.bgp.utils import stats
+from ryu.services.protocols.bgp.bmp import BMPClient
from ryu.lib import sockopt
@@ -125,6 +126,9 @@ class CoreService(Factory, Activity):
# BgpProcessor instance (initialized during start)
self._bgp_processor = None
+ # BMP clients key: (host, port) value: BMPClient instance
+ self.bmpclients = {}
+
def _init_signal_listeners(self):
self._signal_bus.register_listener(
BgpSignalBus.BGP_DEST_CHANGED,
@@ -451,3 +455,23 @@ class CoreService(Factory, Activity):
peer._host_bind_ip = bind_ip
peer._host_bind_port = bind_port
self._spawn_activity(bgp_proto, peer)
+
+ def start_bmp(self, host, port):
+ if (host, port) in self.bmpclients:
+ bmpclient = self.bmpclients[(host, port)]
+ if bmpclient.started:
+ LOG.warn("bmpclient is already running for %s:%s" % (host,
+ port))
+ return False
+ bmpclient = BMPClient(self, host, port)
+ self.bmpclients[(host, port)] = bmpclient
+ self._spawn_activity(bmpclient)
+ return True
+
+ def stop_bmp(self, host, port):
+ if (host, port) not in self.bmpclients:
+ LOG.warn("no bmpclient is running for %s:%s" % (host, port))
+ return False
+
+ bmpclient = self.bmpclients[(host, port)]
+ bmpclient.stop()
diff --git a/ryu/services/protocols/bgp/peer.py b/ryu/services/protocols/bgp/peer.py
index b2b42175..a0a2314f 100644
--- a/ryu/services/protocols/bgp/peer.py
+++ b/ryu/services/protocols/bgp/peer.py
@@ -195,12 +195,14 @@ class PeerState(object):
if new_state == const.BGP_FSM_ESTABLISHED:
self.incr(PeerCounterNames.FSM_ESTB_TRANSITIONS)
self._established_time = time.time()
+ self._signal_bus.adj_up(self.peer)
NET_CONTROLLER.send_rpc_notification(
'neighbor.up', {'ip_address': self.peer.ip_address}
)
# transition from Established to another state
elif old_state == const.BGP_FSM_ESTABLISHED:
self._established_time = 0
+ self._signal_bus.adj_down(self.peer)
NET_CONTROLLER.send_rpc_notification(
'neighbor.down', {'ip_address': self.peer.ip_address}
)
diff --git a/ryu/services/protocols/bgp/signals/emit.py b/ryu/services/protocols/bgp/signals/emit.py
index f015fdb8..d97ce1c8 100644
--- a/ryu/services/protocols/bgp/signals/emit.py
+++ b/ryu/services/protocols/bgp/signals/emit.py
@@ -14,6 +14,8 @@ class BgpSignalBus(SignalBus):
BGP_BEST_PATH_CHANGED = ('core', 'best', 'changed')
BGP_ADJ_RIB_IN_CHANGED = ('core', 'adj', 'rib', 'in', 'changed')
BGP_ADJ_RIB_OUT_CHANGED = ('core', 'adj', 'rib', 'out', 'changed')
+ BGP_ADJ_UP = ('core', 'adj', 'up')
+ BGP_ADJ_DOWN = ('core', 'adj', 'down')
def bgp_error(self, peer, code, subcode, reason):
return self.emit_signal(
@@ -71,3 +73,9 @@ class BgpSignalBus(SignalBus):
return self.emit_signal(
self.BGP_ADJ_RIB_OUT_CHANGED,
{'peer': peer, 'sent_route': sent_route})
+
+ def adj_up(self, peer):
+ return self.emit_signal(self.BGP_ADJ_UP, {'peer': peer})
+
+ def adj_down(self, peer):
+ return self.emit_signal(self.BGP_ADJ_DOWN, {'peer': peer})