diff options
Diffstat (limited to 'ryu/services/protocols/bgp/speaker.py')
-rw-r--r-- | ryu/services/protocols/bgp/speaker.py | 596 |
1 files changed, 596 insertions, 0 deletions
diff --git a/ryu/services/protocols/bgp/speaker.py b/ryu/services/protocols/bgp/speaker.py new file mode 100644 index 00000000..320db03e --- /dev/null +++ b/ryu/services/protocols/bgp/speaker.py @@ -0,0 +1,596 @@ +# Copyright (C) 2014 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" + BGP protocol implementation. +""" +import logging +import socket +import struct +import traceback + +from ryu.services.protocols.bgp.base import Activity +from ryu.services.protocols.bgp.base import add_bgp_error_metadata +from ryu.services.protocols.bgp.base import BGPSException +from ryu.services.protocols.bgp.base import CORE_ERROR_CODE +from ryu.services.protocols.bgp.constants import BGP_FSM_CONNECT +from ryu.services.protocols.bgp.constants import BGP_FSM_OPEN_CONFIRM +from ryu.services.protocols.bgp.constants import BGP_FSM_OPEN_SENT +from ryu.services.protocols.bgp.constants import BGP_VERSION_NUM +from ryu.services.protocols.bgp.protocol import Protocol +from ryu.services.protocols.bgp.protocols.bgp.capabilities import \ + EnhancedRouteRefreshCap +from ryu.services.protocols.bgp.protocols.bgp.capabilities import \ + MultiprotocolExtentionCap +from ryu.services.protocols.bgp.protocols.bgp.capabilities import \ + RouteRefreshCap +import ryu.services.protocols.bgp.protocols.bgp.exceptions as exceptions +from ryu.services.protocols.bgp.protocols.bgp.exceptions import BgpExc +from ryu.services.protocols.bgp.protocols.bgp import messages +from ryu.services.protocols.bgp.protocols.bgp.messages import Keepalive +from ryu.services.protocols.bgp.protocols.bgp.messages import Notification +from ryu.services.protocols.bgp.protocols.bgp.messages import Open +from ryu.services.protocols.bgp.protocols.bgp.messages import RouteRefresh +from ryu.services.protocols.bgp.protocols.bgp.messages import Update +from ryu.services.protocols.bgp.protocols.bgp import nlri +from ryu.services.protocols.bgp.protocols.bgp.nlri import RF_RTC_UC +from ryu.services.protocols.bgp.utils.validation import is_valid_old_asn + + +LOG = logging.getLogger('bgpspeaker.speaker') + +# BGP min. and max. message lengths as per RFC. +BGP_MIN_MSG_LEN = 19 +BGP_MAX_MSG_LEN = 4096 + +# Keep-alive singleton. +_KEEP_ALIVE = Keepalive() + + +@add_bgp_error_metadata(code=CORE_ERROR_CODE, sub_code=2, + def_desc='Unknown error occurred related to Speaker.') +class BgpProtocolException(BGPSException): + """Base exception related to peer connection management. + """ + pass + + +def nofitication_factory(code, subcode): + """Returns a `Notification` message corresponding to given codes. + + Parameters: + - `code`: (int) BGP error code + - `subcode`: (int) BGP error sub-code + """ + reason = Notification.REASONS.get((code, subcode)) + if not reason: + raise ValueError('Invalid code/sub-code.') + + return Notification(code, subcode) + + +class BgpProtocol(Protocol, Activity): + """Protocol that handles BGP messages. + """ + MESSAGE_MARKER = ('\xff\xff\xff\xff\xff\xff\xff\xff' + '\xff\xff\xff\xff\xff\xff\xff\xff') + + def __init__(self, socket, signal_bus, is_reactive_conn=False): + # Validate input. + if socket is None: + raise ValueError('Invalid arguments passed.') + activity_name = ('BgpProtocol %s, %s, %s' % ( + is_reactive_conn, socket.getpeername(), socket.getsockname()) + ) + Activity.__init__(self, name=activity_name) + # Intialize instance variables. + self._peer = None + self._recv_buff = '' + self._socket = socket + self._signal_bus = signal_bus + self._holdtime = None + self._keepalive = None + self._expiry = None + # Add socket to Activity's socket container for managing it. + if is_reactive_conn: + self._asso_socket_map['passive_conn'] = self._socket + else: + self._asso_socket_map['active_conn'] = self._socket + self._open_msg = None + self.state = BGP_FSM_CONNECT + self._is_reactive = is_reactive_conn + self.sent_open_msg = None + self.recv_open_msg = None + self._is_bound = False + + def get_peername(self): + return self._socket.getpeername() + + def get_sockname(self): + return self._socket.getsockname() + + @property + def is_reactive(self): + return self._is_reactive + + @property + def holdtime(self): + return self._holdtime + + @property + def keepalive(self): + return self._keepalive + + def is_colliding(self, other_protocol): + if not isinstance(other_protocol, BgpProtocol): + raise ValueError('Currently only support comparing with ' + '`BgpProtocol`') + + # Compare protocol connection end point's addresses + if (self.get_peername()[0] == other_protocol.get_peername()[0] and + self.get_sockname()[0] == other_protocol.get_sockname()[0]): + return True + + return False + + def is_local_router_id_greater(self): + """Compares *True* if local router id is greater when compared to peer + bgp id. + + Should only be called after protocol has reached OpenConfirm state. + """ + from ryu.services.protocols.bgp.speaker.utils.bgp import from_inet_ptoi + + if not self.state == BGP_FSM_OPEN_CONFIRM: + raise BgpProtocolException(desc='Can access remote router id only' + ' after open message is received') + remote_id = self.recv_open_msg.bgpid + local_id = self.sent_open_msg.bgpid + return from_inet_ptoi(local_id) > from_inet_ptoi(remote_id) + + def is_enhanced_rr_cap_valid(self): + """Checks is enhanced route refresh capability is enabled/valid. + + Checks sent and received `Open` messages to see if this session with + peer is capable of enhanced route refresh capability. + """ + if not self.recv_open_msg: + raise ValueError('Did not yet receive peers open message.') + + err_cap_enabled = False + local_cap = self.sent_open_msg.caps + peer_cap = self.recv_open_msg.caps + # Both local and peer should advertise ERR capability for it to be + # enabled. + if (local_cap.get(EnhancedRouteRefreshCap.CODE) and + peer_cap.get(EnhancedRouteRefreshCap.CODE)): + err_cap_enabled = True + + return err_cap_enabled + + def _check_route_fmly_adv(self, open_msg, route_family): + match_found = False + + local_caps = open_msg.caps + mbgp_cap = local_caps.get(MultiprotocolExtentionCap.CODE) + # Check MP_BGP capability was advertised. + if mbgp_cap: + # Iterate over all advertised mp_bgp caps to find a match. + for peer_cap in mbgp_cap: + if (route_family.afi == peer_cap.route_family.afi and + route_family.safi == peer_cap.route_family.safi): + match_found = True + + return match_found + + def is_route_family_adv(self, route_family): + """Checks if `route_family` was advertised to peer as per MP_BGP cap. + + Returns: + - True: if given address family was advertised. + - False: if given address family was not advertised. + """ + return self._check_route_fmly_adv(self.sent_open_msg, route_family) + + def is_route_family_adv_recv(self, route_family): + """Checks if `route_family` was advertised by peer as per MP_BGP cap. + + Returns: + - True: if given address family was advertised. + - False: if given address family was not advertised. + """ + return self._check_route_fmly_adv(self.recv_open_msg, route_family) + + @property + def negotiated_afs(self): + local_caps = self.sent_open_msg.caps + remote_caps = self.recv_open_msg.caps + + local_mbgp_cap = local_caps.get(MultiprotocolExtentionCap.CODE) + remote_mbgp_cap = remote_caps.get(MultiprotocolExtentionCap.CODE) + # Check MP_BGP capabilities were advertised. + if local_mbgp_cap and remote_mbgp_cap: + local_families = { + (peer_cap.route_family.afi, peer_cap.route_family.safi) + for peer_cap in local_mbgp_cap + } + remote_families = { + (peer_cap.route_family.afi, peer_cap.route_family.safi) + for peer_cap in remote_mbgp_cap + } + afi_safi = local_families.intersection(remote_families) + else: + afi_safi = set() + + afs = [] + for afi, safi in afi_safi: + afs.append(nlri.get_rf(afi, safi)) + return afs + + def is_mbgp_cap_valid(self, route_family): + """Returns true if both sides of this protocol have advertise + capability for this address family. + """ + return (self.is_route_family_adv(route_family) and + self.is_route_family_adv_recv(route_family)) + + def _run(self, peer): + """Sends open message to peer and handles received messages. + + Parameters: + - `peer`: the peer to which this protocol instance is connected to. + """ + # We know the peer we are connected to, we send open message. + self._peer = peer + self.connection_made() + + # We wait for peer to send messages. + self._recv_loop() + + def data_received(self, next_bytes): + try: + self._data_received(next_bytes) + except BgpExc as exc: + LOG.error( + "BGPExc Exception while receiving data: " + "%s \n Traceback %s \n" + % (str(exc), traceback.format_exc()) + ) + if exc.SEND_ERROR: + self.send_notification(exc.CODE, exc.SUB_CODE) + else: + self._socket.close() + raise exc + + @staticmethod + def parse_msg_header(buff): + """Parses given `buff` into bgp message header format. + + Returns a tuple of marker, length, type of bgp message. + """ + return struct.unpack('!16sHB', buff) + + def _data_received(self, next_bytes): + """Maintains buffer of bytes received from peer and extracts bgp + message from this buffer if enough data is received. + + Validates bgp message marker, length, type and data and constructs + appropriate bgp message instance and calls handler. + + :Parameters: + - `next_bytes`: next set of bytes received from peer. + """ + # Append buffer with received bytes. + self._recv_buff += next_bytes + + while True: + # If current buffer size is less then minimum bgp message size, we + # return as we do not have a complete bgp message to work with. + if len(self._recv_buff) < BGP_MIN_MSG_LEN: + return + + # Parse message header into elements. + auth, length, ptype = BgpProtocol.parse_msg_header( + self._recv_buff[:BGP_MIN_MSG_LEN]) + + # Check if we have valid bgp message marker. + # We should get default marker since we are not supporting any + # authentication. + if (auth != BgpProtocol.MESSAGE_MARKER): + LOG.error('Invalid message marker received: %s' % auth) + raise exceptions.NotSync() + + # Check if we have valid bgp message length. + check = lambda: length < BGP_MIN_MSG_LEN\ + or length > BGP_MAX_MSG_LEN + + # RFC says: The minimum length of the OPEN message is 29 + # octets (including the message header). + check2 = lambda: ptype == Open.TYPE_CODE\ + and length < Open.MIN_LENGTH + + # RFC says: A KEEPALIVE message consists of only the + # message header and has a length of 19 octets. + check3 = lambda: ptype == Keepalive.TYPE_CODE\ + and length != BGP_MIN_MSG_LEN + + # RFC says: The minimum length of the UPDATE message is 23 + # octets. + check4 = lambda: ptype == Update.TYPE_CODE\ + and length < Update.MIN_LENGTH + + if check() or check2() or check3() or check4(): + raise exceptions.BadLen(ptype, length) + + # If we have partial message we wait for rest of the message. + if len(self._recv_buff) < length: + return + + # If we have full message, we get its payload/data. + payload = self._recv_buff[BGP_MIN_MSG_LEN:length] + + # Update buffer to not contain any part of the current message. + self._recv_buff = self._recv_buff[length:] + + # Try to decode payload into specified message type. + # If we have any error parsing the message, we send appropriate + # bgp notification message. + msg = messages.decode(ptype, payload, length) + + # If we have a valid bgp message we call message handler. + self._handle_msg(msg) + + def send_notification(self, code, subcode): + """Utility to send notification message. + + Closes the socket after sending the message. + :Parameters: + - `socket`: (socket) - socket over which to send notification + message. + - `code`: (int) - BGP Notification code + - `subcode`: (int) - BGP Notification sub-code + + RFC ref: http://tools.ietf.org/html/rfc4486 + http://www.iana.org/assignments/bgp-parameters/bgp-parameters.xhtml + """ + reason = Notification.REASONS.get((code, subcode)) + if not reason: + # Not checking for type of parameters to allow some flexibility + # via. duck-typing. + raise ValueError('Unsupported code/sub-code given.') + + notification = Notification(code, subcode, reason) + self._socket.sendall(notification.encode()) + self._signal_bus.bgp_error(self._peer, code, subcode, reason) + LOG.error( + 'Sent notification to %r>> %s' % + (self._socket.getpeername(), notification) + ) + self._socket.close() + + def send(self, msg): + if not self.started: + raise BgpProtocolException('Tried to send message to peer when ' + 'this protocol instance is not started' + ' or is no longer is started state.') + self._socket.sendall(msg.encode()) + if msg.MSG_NAME == Notification.MSG_NAME: + LOG.error('Sent notification to %s>> %s' % + (self.get_peername(), msg)) + + self._signal_bus.bgp_notification_sent(self._peer, msg) + + else: + LOG.debug('Sent msg. %s to %s>> %s' % + (msg.MSG_NAME, self.get_peername(), msg)) + + def stop(self): + Activity.stop(self) + + def _validate_open_msg(self, open_msg): + """Validates BGP OPEN message according from application context. + + Parsing modules takes care of validating OPEN message that need no + context. But here we validate it according to current application + settings. RTC or RR/ERR are MUST capability if peer does not support + either one of them we have to end session. + """ + assert open_msg.TYPE_CODE == Open.TYPE_CODE + # Validate remote ASN. + remote_asnum = open_msg.asnum + # Since 4byte AS is not yet supported, we validate AS as old style AS. + if (not is_valid_old_asn(remote_asnum) or + remote_asnum != self._peer.remote_as): + raise exceptions.BadPeerAs() + + # Validate bgp version number. + if open_msg.version != BGP_VERSION_NUM: + raise exceptions.UnsupportedVersion(BGP_VERSION_NUM) + + adv_caps = open_msg.caps + rr_cap_adv = adv_caps.get(RouteRefreshCap.CODE) + err_cap_adv = adv_caps.get(EnhancedRouteRefreshCap.CODE) + # If either RTC or RR/ERR are MUST capability if peer does not support + # either one of them we have to end session as we have to request peer + # to send prefixes for new VPNs that may be created automatically. + # TODO(PH): Check with experts if error is suitable in this case + if not (rr_cap_adv or err_cap_adv or + self._check_route_fmly_adv(open_msg, RF_RTC_UC)): + raise exceptions.UnsupportedOptParam() + + def _handle_msg(self, msg): + """When a BGP message is received, send it to peer. + + Open messages are validated here. Peer handler is called to handle each + message except for *Open* and *Notification* message. On receiving + *Notification* message we close connection with peer. + """ + LOG.debug('Received %s msg. from %s<< \n%s' % + (msg.MSG_NAME, str(self.get_peername()), msg)) + + # If we receive open message we try to bind to protocol + if (msg.MSG_NAME == Open.MSG_NAME): + if self.state == BGP_FSM_OPEN_SENT: + # Validate open message. + self._validate_open_msg(msg) + self.recv_open_msg = msg + self.state = BGP_FSM_OPEN_CONFIRM + self._peer.state.bgp_state = self.state + + # Try to bind this protocol to peer. + self._is_bound = self._peer.bind_protocol(self) + + # If this protocol failed to bind to peer. + if not self._is_bound: + # Failure to bind to peer indicates connection collision + # resolution choose different instance of protocol and this + # instance has to close. Before closing it sends + # appropriate notification msg. to peer. + raise exceptions.CollisionResolution() + + # If peer sends Hold Time as zero, then according to RFC we do + # not set Hold Time and Keep Alive timer. + if msg.holdtime == 0: + LOG.info('The Hold Time sent by the peer is zero, hence ' + 'not setting any Hold Time and Keep Alive' + ' timers.') + else: + # Start Keep Alive timer considering Hold Time preference + # of the peer. + self._start_timers(msg.holdtime) + self._send_keepalive() + + # Peer does not see open message. + return + else: + # If we receive a Open message out of order + LOG.error('Open message received when current state is not ' + 'OpenSent') + # Received out-of-order open message + # We raise Finite state machine error + raise exceptions.FiniteStateMachineError() + elif msg.MSG_NAME == Notification.MSG_NAME: + if self._peer: + self._signal_bus.bgp_notification_received(self._peer, msg) + # If we receive notification message + LOG.error('Received notification message, hence closing ' + 'connection %s' % msg) + self._socket.close() + return + + # If we receive keepalive or update message, we reset expire timer. + if (msg.MSG_NAME == Keepalive.MSG_NAME or + msg.MSG_NAME == Update.MSG_NAME): + if self._expiry: + self._expiry.reset() + + # Call peer message handler for appropriate messages. + if (msg.MSG_NAME in + (Keepalive.MSG_NAME, Update.MSG_NAME, RouteRefresh.MSG_NAME)): + self._peer.handle_msg(msg) + # We give chance to other threads to run. + self.pause(0) + + def _start_timers(self, peer_holdtime): + """Starts keepalive and expire timers. + + Hold time is set to min. of peer and configured/default hold time. + Starts keep alive timer and expire timer based on this value. + """ + neg_timer = min(self._holdtime, peer_holdtime) + if neg_timer < self._holdtime: + LOG.info('Negotiated hold time (%s) is lower then ' + 'configured/default (%s).' % (neg_timer, self._holdtime)) + # We use negotiated timer value. + self._holdtime = neg_timer + self._keepalive = self._create_timer('Keepalive Timer', + self._send_keepalive) + interval = self._holdtime / 3 + self._keepalive.start(interval, now=False) + # Setup the expire timer. + self._expiry = self._create_timer('Holdtime Timer', self._expired) + self._expiry.start(self._holdtime, now=False) + LOG.debug('Started keep-alive and expire timer for negotiated hold' + 'time %s' % self._holdtime) + + def _expired(self): + """Hold timer expired event handler. + """ + LOG.info('Negotiated hold time %s expired.' % self._holdtime) + code = exceptions.HoldTimerExpired.CODE + subcode = exceptions.HoldTimerExpired.SUB_CODE + self.send_notification(code, subcode) + self.connection_lost('Negotiated hold time %s expired.' % + self._holdtime) + self.stop() + + def _send_keepalive(self): + self.send(_KEEP_ALIVE) + + def _recv_loop(self): + """Sits in tight loop collecting data received from peer and + processing it. + """ + required_len = BGP_MIN_MSG_LEN + conn_lost_reason = "Connection lost as protocol is no longer active" + try: + while True: + next_bytes = self._socket.recv(required_len) + if len(next_bytes) == 0: + conn_lost_reason = 'Peer closed connection' + break + self.data_received(next_bytes) + except socket.error as err: + conn_lost_reason = 'Connection to peer lost: %s.' % err + except BgpExc as ex: + conn_lost_reason = 'Connection to peer lost, reason: %s.' % ex + except Exception as e: + LOG.debug(traceback.format_exc()) + conn_lost_reason = str(e) + finally: + self.connection_lost(conn_lost_reason) + + def connection_made(self): + """Connection to peer handler. + + We send bgp open message to peer and intialize related attributes. + """ + assert self.state == BGP_FSM_CONNECT + # We have a connection with peer we send open message. + open_msg = self._peer.create_open_msg() + self._holdtime = open_msg.holdtime + self.state = BGP_FSM_OPEN_SENT + if not self.is_reactive: + self._peer.state.bgp_state = self.state + self.sent_open_msg = open_msg + self.send(open_msg) + self._peer.connection_made() + LOG.debug('Sent open message %s' % open_msg) + + def connection_lost(self, reason): + """Stops all timers and notifies peer that connection is lost. + """ + + if self._peer: + state = self._peer.state.bgp_state + if self._is_bound or state == BGP_FSM_OPEN_SENT: + self._peer.connection_lost(reason) + + self._peer = None + + if reason: + LOG.info(reason) + else: + LOG.info('Connection to peer closed for unknown reasons.') |