// Copyright (C) 2014 Nippon Telegraph and Telephone Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or // implied. // See the License for the specific language governing permissions and // limitations under the License. package server import ( "fmt" "io" "math/rand" "net" "strconv" "sync" "time" "github.com/eapache/channels" "github.com/osrg/gobgp/internal/pkg/config" "github.com/osrg/gobgp/internal/pkg/table" "github.com/osrg/gobgp/pkg/packet/bgp" "github.com/osrg/gobgp/pkg/packet/bmp" log "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" ) type PeerDownReason int const ( PEER_DOWN_REASON_UNKNOWN PeerDownReason = iota PEER_DOWN_BY_LOCAL PEER_DOWN_BY_LOCAL_WITHOUT_NOTIFICATION PEER_DOWN_BY_REMOTE PEER_DOWN_BY_REMOTE_WITHOUT_NOTIFICATION PEER_DOWN_BY_BMP_CONFIGURATION ) type FsmStateReasonType uint8 const ( FSM_DYING FsmStateReasonType = iota FSM_ADMIN_DOWN FSM_READ_FAILED FSM_WRITE_FAILED FSM_NOTIFICATION_SENT FSM_NOTIFICATION_RECV FSM_HOLD_TIMER_EXPIRED FSM_IDLE_HOLD_TIMER_EXPIRED FSM_RESTART_TIMER_EXPIRED FSM_GRACEFUL_RESTART FSM_INVALID_MSG FSM_NEW_CONNECTION FSM_OPEN_MSG_RECEIVED FSM_OPEN_MSG_NEGOTIATED FSM_HARD_RESET ) type FsmStateReason struct { Type FsmStateReasonType PeerDownReason PeerDownReason BGPNotification *bgp.BGPMessage Data []byte } func NewFsmStateReason(typ FsmStateReasonType, notif *bgp.BGPMessage, data []byte) *FsmStateReason { var reasonCode PeerDownReason switch typ { case FSM_DYING, FSM_INVALID_MSG, FSM_NOTIFICATION_SENT, FSM_HOLD_TIMER_EXPIRED, FSM_IDLE_HOLD_TIMER_EXPIRED, FSM_RESTART_TIMER_EXPIRED: reasonCode = PEER_DOWN_BY_LOCAL case 
FSM_ADMIN_DOWN: reasonCode = PEER_DOWN_BY_LOCAL_WITHOUT_NOTIFICATION case FSM_NOTIFICATION_RECV, FSM_GRACEFUL_RESTART, FSM_HARD_RESET: reasonCode = PEER_DOWN_BY_REMOTE case FSM_READ_FAILED, FSM_WRITE_FAILED: reasonCode = PEER_DOWN_BY_REMOTE_WITHOUT_NOTIFICATION } return &FsmStateReason{ Type: typ, PeerDownReason: reasonCode, BGPNotification: notif, Data: data, } } func (r FsmStateReason) String() string { switch r.Type { case FSM_DYING: return "dying" case FSM_ADMIN_DOWN: return "admin-down" case FSM_READ_FAILED: return "read-failed" case FSM_WRITE_FAILED: return "write-failed" case FSM_NOTIFICATION_SENT: body := r.BGPNotification.Body.(*bgp.BGPNotification) return fmt.Sprintf("notification-sent %s", bgp.NewNotificationErrorCode(body.ErrorCode, body.ErrorSubcode).String()) case FSM_NOTIFICATION_RECV: body := r.BGPNotification.Body.(*bgp.BGPNotification) return fmt.Sprintf("notification-received %s", bgp.NewNotificationErrorCode(body.ErrorCode, body.ErrorSubcode).String()) case FSM_HOLD_TIMER_EXPIRED: return "hold-timer-expired" case FSM_IDLE_HOLD_TIMER_EXPIRED: return "idle-hold-timer-expired" case FSM_RESTART_TIMER_EXPIRED: return "restart-timer-expired" case FSM_GRACEFUL_RESTART: return "graceful-restart" case FSM_INVALID_MSG: return "invalid-msg" case FSM_NEW_CONNECTION: return "new-connection" case FSM_OPEN_MSG_RECEIVED: return "open-msg-received" case FSM_OPEN_MSG_NEGOTIATED: return "open-msg-negotiated" case FSM_HARD_RESET: return "hard-reset" default: return "unknown" } } type FsmMsgType int const ( _ FsmMsgType = iota FSM_MSG_STATE_CHANGE FSM_MSG_BGP_MESSAGE FSM_MSG_ROUTE_REFRESH ) type FsmMsg struct { MsgType FsmMsgType MsgSrc string MsgData interface{} StateReason *FsmStateReason PathList []*table.Path timestamp time.Time payload []byte Version uint } type FsmOutgoingMsg struct { Paths []*table.Path Notification *bgp.BGPMessage StayIdle bool } const ( HOLDTIME_OPENSENT = 240 HOLDTIME_IDLE = 5 ) type AdminState int const ( ADMIN_STATE_UP AdminState = iota 
ADMIN_STATE_DOWN ADMIN_STATE_PFX_CT ) func (s AdminState) String() string { switch s { case ADMIN_STATE_UP: return "ADMIN_STATE_UP" case ADMIN_STATE_DOWN: return "ADMIN_STATE_DOWN" case ADMIN_STATE_PFX_CT: return "ADMIN_STATE_PFX_CT" default: return "Unknown" } } type AdminStateOperation struct { State AdminState Communication []byte } var fsmVersion uint type FSM struct { t tomb.Tomb gConf *config.Global pConf *config.Neighbor lock sync.RWMutex state bgp.FSMState reason *FsmStateReason conn net.Conn connCh chan net.Conn idleHoldTime float64 opensentHoldTime float64 adminState AdminState adminStateCh chan AdminStateOperation getActiveCh chan struct{} h *FSMHandler rfMap map[bgp.RouteFamily]bgp.BGPAddPathMode capMap map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface recvOpen *bgp.BGPMessage peerInfo *table.PeerInfo policy *table.RoutingPolicy gracefulRestartTimer *time.Timer twoByteAsTrans bool version uint marshallingOptions *bgp.MarshallingOption } func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { fsm.lock.Lock() defer fsm.lock.Unlock() state := &fsm.pConf.State.Messages timer := &fsm.pConf.Timers if isIn { state.Received.Total++ } else { state.Sent.Total++ } switch MessageType { case bgp.BGP_MSG_OPEN: if isIn { state.Received.Open++ } else { state.Sent.Open++ } case bgp.BGP_MSG_UPDATE: if isIn { state.Received.Update++ timer.State.UpdateRecvTime = time.Now().Unix() } else { state.Sent.Update++ } case bgp.BGP_MSG_NOTIFICATION: if isIn { state.Received.Notification++ } else { state.Sent.Notification++ } case bgp.BGP_MSG_KEEPALIVE: if isIn { state.Received.Keepalive++ } else { state.Sent.Keepalive++ } case bgp.BGP_MSG_ROUTE_REFRESH: if isIn { state.Received.Refresh++ } else { state.Sent.Refresh++ } default: if isIn { state.Received.Discarded++ } else { state.Sent.Discarded++ } } } func (fsm *FSM) bmpStatsUpdate(statType uint16, increment int) { fsm.lock.Lock() defer fsm.lock.Unlock() stats := &fsm.pConf.State.Messages.Received switch 
statType { // TODO // Support other stat types. case bmp.BMP_STAT_TYPE_WITHDRAW_UPDATE: stats.WithdrawUpdate += uint32(increment) case bmp.BMP_STAT_TYPE_WITHDRAW_PREFIX: stats.WithdrawPrefix += uint32(increment) } } func NewFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingPolicy) *FSM { adminState := ADMIN_STATE_UP if pConf.Config.AdminDown { adminState = ADMIN_STATE_DOWN } pConf.State.SessionState = config.IntToSessionStateMap[int(bgp.BGP_FSM_IDLE)] pConf.Timers.State.Downtime = time.Now().Unix() fsmVersion++ fsm := &FSM{ gConf: gConf, pConf: pConf, state: bgp.BGP_FSM_IDLE, connCh: make(chan net.Conn, 1), opensentHoldTime: float64(HOLDTIME_OPENSENT), adminState: adminState, adminStateCh: make(chan AdminStateOperation, 1), getActiveCh: make(chan struct{}), rfMap: make(map[bgp.RouteFamily]bgp.BGPAddPathMode), capMap: make(map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface), peerInfo: table.NewPeerInfo(gConf, pConf), policy: policy, gracefulRestartTimer: time.NewTimer(time.Hour), version: fsmVersion, } fsm.gracefulRestartTimer.Stop() fsm.t.Go(fsm.connectLoop) return fsm } func (fsm *FSM) StateChange(nextState bgp.FSMState) { fsm.lock.Lock() defer fsm.lock.Unlock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "old": fsm.state.String(), "new": nextState.String(), "reason": fsm.reason, }).Debug("state changed") fsm.state = nextState switch nextState { case bgp.BGP_FSM_ESTABLISHED: fsm.pConf.Timers.State.Uptime = time.Now().Unix() fsm.pConf.State.EstablishedCount++ // reset the state set by the previous session fsm.twoByteAsTrans = false if _, y := fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]; !y { fsm.twoByteAsTrans = true break } y := func() bool { for _, c := range capabilitiesFromConfig(fsm.pConf) { switch c.(type) { case *bgp.CapFourOctetASNumber: return true } } return false }() if !y { fsm.twoByteAsTrans = true } case bgp.BGP_FSM_ACTIVE: if !fsm.pConf.Transport.Config.PassiveMode { 
fsm.getActiveCh <- struct{}{} } fallthrough default: fsm.pConf.Timers.State.Downtime = time.Now().Unix() } } func hostport(addr net.Addr) (string, uint16) { if addr != nil { host, port, err := net.SplitHostPort(addr.String()) if err != nil { return "", 0 } p, _ := strconv.ParseUint(port, 10, 16) return host, uint16(p) } return "", 0 } func (fsm *FSM) RemoteHostPort() (string, uint16) { return hostport(fsm.conn.RemoteAddr()) } func (fsm *FSM) LocalHostPort() (string, uint16) { return hostport(fsm.conn.LocalAddr()) } func (fsm *FSM) sendNotificationFromErrorMsg(e *bgp.MessageError) (*bgp.BGPMessage, error) { fsm.lock.RLock() established := fsm.h != nil && fsm.h.conn != nil fsm.lock.RUnlock() if established { m := bgp.NewBGPNotificationMessage(e.TypeCode, e.SubTypeCode, e.Data) b, _ := m.Serialize() _, err := fsm.h.conn.Write(b) if err == nil { fsm.bgpMessageStateUpdate(m.Header.Type, false) fsm.h.sentNotification = m } fsm.h.conn.Close() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "Data": e, }).Warn("sent notification") return m, nil } return nil, fmt.Errorf("can't send notification to %s since TCP connection is not established", fsm.pConf.State.NeighborAddress) } func (fsm *FSM) sendNotification(code, subType uint8, data []byte, msg string) (*bgp.BGPMessage, error) { e := bgp.NewMessageError(code, subType, data, msg) return fsm.sendNotificationFromErrorMsg(e.(*bgp.MessageError)) } func (fsm *FSM) connectLoop() error { fsm.lock.RLock() tick := int(fsm.pConf.Timers.Config.ConnectRetry) fsm.lock.RUnlock() if tick < MIN_CONNECT_RETRY { tick = MIN_CONNECT_RETRY } r := rand.New(rand.NewSource(time.Now().UnixNano())) timer := time.NewTimer(time.Duration(tick) * time.Second) timer.Stop() connect := func() { fsm.lock.RLock() defer fsm.lock.RUnlock() addr := fsm.pConf.State.NeighborAddress port := int(bgp.BGP_PORT) if fsm.pConf.Transport.Config.RemotePort != 0 { port = int(fsm.pConf.Transport.Config.RemotePort) } laddr, err := 
net.ResolveTCPAddr("tcp", net.JoinHostPort(fsm.pConf.Transport.Config.LocalAddress, "0")) if err != nil { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, }).Warnf("failed to resolve local address: %s", err) return } var conn net.Conn d := TCPDialer{ Dialer: net.Dialer{ LocalAddr: laddr, Timeout: time.Duration(MIN_CONNECT_RETRY-1) * time.Second, }, AuthPassword: fsm.pConf.Config.AuthPassword, } if fsm.pConf.TtlSecurity.Config.Enabled { d.Ttl = 255 d.TtlMin = fsm.pConf.TtlSecurity.Config.TtlMin } else if fsm.pConf.Config.PeerAs != 0 && fsm.pConf.Config.PeerType == config.PEER_TYPE_EXTERNAL { d.Ttl = 1 if fsm.pConf.EbgpMultihop.Config.Enabled { d.Ttl = fsm.pConf.EbgpMultihop.Config.MultihopTtl } } conn, err = d.DialTCP(addr, port) if err == nil { select { case fsm.connCh <- conn: return default: conn.Close() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, }).Warn("active conn is closed to avoid being blocked") } } else { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, }).Debugf("failed to connect: %s", err) } if fsm.state == bgp.BGP_FSM_ACTIVE { timer.Reset(time.Duration(tick) * time.Second) } } for { select { case <-fsm.t.Dying(): fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, }).Debug("stop connect loop") fsm.lock.RUnlock() return nil case <-timer.C: fsm.lock.RLock() ready := fsm.state == bgp.BGP_FSM_ACTIVE fsm.lock.RUnlock() if ready { go connect() } case <-fsm.getActiveCh: timer.Reset(time.Duration(r.Intn(MIN_CONNECT_RETRY)+MIN_CONNECT_RETRY) * time.Second) } } } type FSMHandler struct { t tomb.Tomb fsm *FSM conn net.Conn msgCh *channels.InfiniteChannel stateReasonCh chan FsmStateReason incoming *channels.InfiniteChannel stateCh chan *FsmMsg outgoing *channels.InfiniteChannel holdTimerResetCh chan bool sentNotification *bgp.BGPMessage } func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, 
stateCh chan *FsmMsg, outgoing *channels.InfiniteChannel) *FSMHandler { h := &FSMHandler{ fsm: fsm, stateReasonCh: make(chan FsmStateReason, 2), incoming: incoming, stateCh: stateCh, outgoing: outgoing, holdTimerResetCh: make(chan bool, 2), } fsm.t.Go(h.loop) return h } func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) { fsm := h.fsm fsm.lock.RLock() idleHoldTimer := time.NewTimer(time.Second * time.Duration(fsm.idleHoldTime)) fsm.lock.RUnlock() for { select { case <-h.t.Dying(): return -1, NewFsmStateReason(FSM_DYING, nil, nil) case <-fsm.gracefulRestartTimer.C: fsm.lock.RLock() restarting := fsm.pConf.GracefulRestart.State.PeerRestarting fsm.lock.RUnlock() if restarting { fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") fsm.lock.RUnlock() return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) } case conn, ok := <-fsm.connCh: if !ok { break } conn.Close() fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("Closed an accepted connection") fsm.lock.RUnlock() case <-idleHoldTimer.C: fsm.lock.RLock() adminStateUp := fsm.adminState == ADMIN_STATE_UP fsm.lock.RUnlock() if adminStateUp { fsm.lock.Lock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "Duration": fsm.idleHoldTime, }).Debug("IdleHoldTimer expired") fsm.idleHoldTime = HOLDTIME_IDLE fsm.lock.Unlock() return bgp.BGP_FSM_ACTIVE, NewFsmStateReason(FSM_IDLE_HOLD_TIMER_EXPIRED, nil, nil) } else { log.WithFields(log.Fields{"Topic": "Peer"}).Debug("IdleHoldTimer expired, but stay at idle because the admin state is DOWN") } case stateOp := <-fsm.adminStateCh: err := h.changeAdminState(stateOp.State) if err == nil { switch stateOp.State { case ADMIN_STATE_DOWN: // stop idle hold timer idleHoldTimer.Stop() case ADMIN_STATE_UP: // restart idle 
hold timer fsm.lock.RLock() idleHoldTimer.Reset(time.Second * time.Duration(fsm.idleHoldTime)) fsm.lock.RUnlock() } } } } } func (h *FSMHandler) active() (bgp.FSMState, *FsmStateReason) { fsm := h.fsm for { select { case <-h.t.Dying(): return -1, NewFsmStateReason(FSM_DYING, nil, nil) case conn, ok := <-fsm.connCh: if !ok { break } fsm.lock.Lock() fsm.conn = conn fsm.lock.Unlock() ttl := 0 ttlMin := 0 fsm.lock.RLock() if fsm.pConf.TtlSecurity.Config.Enabled { ttl = 255 ttlMin = int(fsm.pConf.TtlSecurity.Config.TtlMin) } else if fsm.pConf.Config.PeerAs != 0 && fsm.pConf.Config.PeerType == config.PEER_TYPE_EXTERNAL { if fsm.pConf.EbgpMultihop.Config.Enabled { ttl = int(fsm.pConf.EbgpMultihop.Config.MultihopTtl) } else if fsm.pConf.Transport.Config.Ttl != 0 { ttl = int(fsm.pConf.Transport.Config.Ttl) } else { ttl = 1 } } else if fsm.pConf.Transport.Config.Ttl != 0 { ttl = int(fsm.pConf.Transport.Config.Ttl) } if ttl != 0 { if err := SetTcpTTLSockopt(conn.(*net.TCPConn), ttl); err != nil { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Warnf("cannot set TTL(=%d) for peer: %s", ttl, err) } } if ttlMin != 0 { if err := SetTcpMinTTLSockopt(conn.(*net.TCPConn), ttlMin); err != nil { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Warnf("cannot set minimal TTL(=%d) for peer: %s", ttl, err) } } fsm.lock.RUnlock() // we don't implement delayed open timer so move to opensent right // away. 
return bgp.BGP_FSM_OPENSENT, NewFsmStateReason(FSM_NEW_CONNECTION, nil, nil) case <-fsm.gracefulRestartTimer.C: fsm.lock.RLock() restarting := fsm.pConf.GracefulRestart.State.PeerRestarting fsm.lock.RUnlock() if restarting { fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") fsm.lock.RUnlock() return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) } case err := <-h.stateReasonCh: return bgp.BGP_FSM_IDLE, &err case stateOp := <-fsm.adminStateCh: err := h.changeAdminState(stateOp.State) if err == nil { switch stateOp.State { case ADMIN_STATE_DOWN: return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_ADMIN_DOWN, nil, nil) case ADMIN_STATE_UP: log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.State.NeighborAddress, "State": fsm.state.String(), "AdminState": stateOp.State.String(), }).Panic("code logic bug") } } } } } func capAddPathFromConfig(pConf *config.Neighbor) bgp.ParameterCapabilityInterface { tuples := make([]*bgp.CapAddPathTuple, 0, len(pConf.AfiSafis)) for _, af := range pConf.AfiSafis { var mode bgp.BGPAddPathMode if af.AddPaths.State.Receive { mode |= bgp.BGP_ADD_PATH_RECEIVE } if af.AddPaths.State.SendMax > 0 { mode |= bgp.BGP_ADD_PATH_SEND } if mode > 0 { tuples = append(tuples, bgp.NewCapAddPathTuple(af.State.Family, mode)) } } if len(tuples) == 0 { return nil } return bgp.NewCapAddPath(tuples) } func capabilitiesFromConfig(pConf *config.Neighbor) []bgp.ParameterCapabilityInterface { caps := make([]bgp.ParameterCapabilityInterface, 0, 4) caps = append(caps, bgp.NewCapRouteRefresh()) for _, af := range pConf.AfiSafis { caps = append(caps, bgp.NewCapMultiProtocol(af.State.Family)) } caps = append(caps, bgp.NewCapFourOctetASNumber(pConf.Config.LocalAs)) if c := pConf.GracefulRestart.Config; c.Enabled { tuples := []*bgp.CapGracefulRestartTuple{} ltuples := []*bgp.CapLongLivedGracefulRestartTuple{} // RFC 4724 
4.1 // To re-establish the session with its peer, the Restarting Speaker // MUST set the "Restart State" bit in the Graceful Restart Capability // of the OPEN message. restarting := pConf.GracefulRestart.State.LocalRestarting if !c.HelperOnly { for i, rf := range pConf.AfiSafis { if m := rf.MpGracefulRestart.Config; m.Enabled { // When restarting, always flag forwaring bit. // This can be a lie, depending on how gobgpd is used. // For a route-server use-case, since a route-server // itself doesn't forward packets, and the dataplane // is a l2 switch which continues to work with no // relation to bgpd, this behavior is ok. // TODO consideration of other use-cases tuples = append(tuples, bgp.NewCapGracefulRestartTuple(rf.State.Family, restarting)) pConf.AfiSafis[i].MpGracefulRestart.State.Advertised = true } if m := rf.LongLivedGracefulRestart.Config; m.Enabled { ltuples = append(ltuples, bgp.NewCapLongLivedGracefulRestartTuple(rf.State.Family, restarting, m.RestartTime)) } } } restartTime := c.RestartTime notification := c.NotificationEnabled caps = append(caps, bgp.NewCapGracefulRestart(restarting, notification, restartTime, tuples)) if c.LongLivedEnabled { caps = append(caps, bgp.NewCapLongLivedGracefulRestart(ltuples)) } } // unnumbered BGP if pConf.Config.NeighborInterface != "" { tuples := []*bgp.CapExtendedNexthopTuple{} families, _ := config.AfiSafis(pConf.AfiSafis).ToRfList() for _, family := range families { if family == bgp.RF_IPv6_UC { continue } tuple := bgp.NewCapExtendedNexthopTuple(family, bgp.AFI_IP6) tuples = append(tuples, tuple) } caps = append(caps, bgp.NewCapExtendedNexthop(tuples)) } // ADD-PATH Capability if c := capAddPathFromConfig(pConf); c != nil { caps = append(caps, capAddPathFromConfig(pConf)) } return caps } func buildopen(gConf *config.Global, pConf *config.Neighbor) *bgp.BGPMessage { caps := capabilitiesFromConfig(pConf) opt := bgp.NewOptionParameterCapability(caps) holdTime := uint16(pConf.Timers.Config.HoldTime) as := 
pConf.Config.LocalAs if as > (1<<16)-1 { as = bgp.AS_TRANS } return bgp.NewBGPOpenMessage(uint16(as), holdTime, gConf.Config.RouterId, []bgp.OptionParameterInterface{opt}) } func readAll(conn net.Conn, length int) ([]byte, error) { buf := make([]byte, length) _, err := io.ReadFull(conn, buf) if err != nil { return nil, err } return buf, nil } func getPathAttrFromBGPUpdate(m *bgp.BGPUpdate, typ bgp.BGPAttrType) bgp.PathAttributeInterface { for _, a := range m.PathAttributes { if a.GetType() == typ { return a } } return nil } func hasOwnASLoop(ownAS uint32, limit int, asPath *bgp.PathAttributeAsPath) bool { cnt := 0 for _, param := range asPath.Value { for _, as := range param.GetAS() { if as == ownAS { cnt++ if cnt > limit { return true } } } } return false } func extractRouteFamily(p *bgp.PathAttributeInterface) *bgp.RouteFamily { attr := *p var afi uint16 var safi uint8 switch a := attr.(type) { case *bgp.PathAttributeMpReachNLRI: afi = a.AFI safi = a.SAFI case *bgp.PathAttributeMpUnreachNLRI: afi = a.AFI safi = a.SAFI default: return nil } rf := bgp.AfiSafiToRouteFamily(afi, safi) return &rf } func (h *FSMHandler) afiSafiDisable(rf bgp.RouteFamily) string { h.fsm.lock.Lock() defer h.fsm.lock.Unlock() n := bgp.AddressFamilyNameMap[rf] for i, a := range h.fsm.pConf.AfiSafis { if string(a.Config.AfiSafiName) == n { h.fsm.pConf.AfiSafis[i].State.Enabled = false break } } newList := make([]bgp.ParameterCapabilityInterface, 0) for _, c := range h.fsm.capMap[bgp.BGP_CAP_MULTIPROTOCOL] { if c.(*bgp.CapMultiProtocol).CapValue == rf { continue } newList = append(newList, c) } h.fsm.capMap[bgp.BGP_CAP_MULTIPROTOCOL] = newList return n } func (h *FSMHandler) handlingError(m *bgp.BGPMessage, e error, useRevisedError bool) bgp.ErrorHandling { handling := bgp.ERROR_HANDLING_NONE if m.Header.Type == bgp.BGP_MSG_UPDATE && useRevisedError { factor := e.(*bgp.MessageError) handling = factor.ErrorHandling switch handling { case bgp.ERROR_HANDLING_ATTRIBUTE_DISCARD: 
h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": e, }).Warn("Some attributes were discarded") h.fsm.lock.RUnlock() case bgp.ERROR_HANDLING_TREAT_AS_WITHDRAW: m.Body = bgp.TreatAsWithdraw(m.Body.(*bgp.BGPUpdate)) h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": e, }).Warn("the received Update message was treated as withdraw") h.fsm.lock.RUnlock() case bgp.ERROR_HANDLING_AFISAFI_DISABLE: rf := extractRouteFamily(factor.ErrorAttribute) if rf == nil { h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), }).Warn("Error occurred during AFI/SAFI disabling") h.fsm.lock.RUnlock() } else { n := h.afiSafiDisable(*rf) h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": e, }).Warnf("Capability %s was disabled", n) h.fsm.lock.RUnlock() } } } else { handling = bgp.ERROR_HANDLING_SESSION_RESET } return handling } func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { sendToStateReasonCh := func(typ FsmStateReasonType, notif *bgp.BGPMessage) { // probably doesn't happen but be cautious select { case h.stateReasonCh <- *NewFsmStateReason(typ, notif, nil): default: } } headerBuf, err := readAll(h.conn, bgp.BGP_HEADER_LENGTH) if err != nil { sendToStateReasonCh(FSM_READ_FAILED, nil) return nil, err } hd := &bgp.BGPHeader{} err = hd.DecodeFromBytes(headerBuf) if err != nil { h.fsm.bgpMessageStateUpdate(0, true) h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": err, }).Warn("Session will be reset due to malformed BGP Header") fmsg := &FsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, MsgSrc: h.fsm.pConf.State.NeighborAddress, MsgData: 
err, Version: h.fsm.version, } h.fsm.lock.RUnlock() return fmsg, err } bodyBuf, err := readAll(h.conn, int(hd.Len)-bgp.BGP_HEADER_LENGTH) if err != nil { sendToStateReasonCh(FSM_READ_FAILED, nil) return nil, err } now := time.Now() handling := bgp.ERROR_HANDLING_NONE h.fsm.lock.RLock() useRevisedError := h.fsm.pConf.ErrorHandling.Config.TreatAsWithdraw options := h.fsm.marshallingOptions h.fsm.lock.RUnlock() m, err := bgp.ParseBGPBody(hd, bodyBuf, options) if err != nil { handling = h.handlingError(m, err, useRevisedError) h.fsm.bgpMessageStateUpdate(0, true) } else { h.fsm.bgpMessageStateUpdate(m.Header.Type, true) err = bgp.ValidateBGPMessage(m) } h.fsm.lock.RLock() fmsg := &FsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, MsgSrc: h.fsm.pConf.State.NeighborAddress, timestamp: now, Version: h.fsm.version, } h.fsm.lock.RUnlock() switch handling { case bgp.ERROR_HANDLING_AFISAFI_DISABLE: fmsg.MsgData = m return fmsg, nil case bgp.ERROR_HANDLING_SESSION_RESET: h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": err, }).Warn("Session will be reset due to malformed BGP message") h.fsm.lock.RUnlock() fmsg.MsgData = err return fmsg, err default: fmsg.MsgData = m h.fsm.lock.RLock() establishedState := h.fsm.state == bgp.BGP_FSM_ESTABLISHED h.fsm.lock.RUnlock() if establishedState { switch m.Header.Type { case bgp.BGP_MSG_ROUTE_REFRESH: fmsg.MsgType = FSM_MSG_ROUTE_REFRESH case bgp.BGP_MSG_UPDATE: body := m.Body.(*bgp.BGPUpdate) isEBGP := h.fsm.pConf.IsEBGPPeer(h.fsm.gConf) isConfed := h.fsm.pConf.IsConfederationMember(h.fsm.gConf) fmsg.payload = make([]byte, len(headerBuf)+len(bodyBuf)) copy(fmsg.payload, headerBuf) copy(fmsg.payload[len(headerBuf):], bodyBuf) h.fsm.lock.RLock() rfMap := h.fsm.rfMap h.fsm.lock.RUnlock() ok, err := bgp.ValidateUpdateMsg(body, rfMap, isEBGP, isConfed) if !ok { handling = h.handlingError(m, err, useRevisedError) } if handling == bgp.ERROR_HANDLING_SESSION_RESET 
{ h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "State": h.fsm.state.String(), "error": err, }).Warn("Session will be reset due to malformed BGP update message") h.fsm.lock.RUnlock() fmsg.MsgData = err return fmsg, err } if routes := len(body.WithdrawnRoutes); routes > 0 { h.fsm.bmpStatsUpdate(bmp.BMP_STAT_TYPE_WITHDRAW_UPDATE, 1) h.fsm.bmpStatsUpdate(bmp.BMP_STAT_TYPE_WITHDRAW_PREFIX, routes) } else if attr := getPathAttrFromBGPUpdate(body, bgp.BGP_ATTR_TYPE_MP_UNREACH_NLRI); attr != nil { mpUnreach := attr.(*bgp.PathAttributeMpUnreachNLRI) if routes = len(mpUnreach.Value); routes > 0 { h.fsm.bmpStatsUpdate(bmp.BMP_STAT_TYPE_WITHDRAW_UPDATE, 1) h.fsm.bmpStatsUpdate(bmp.BMP_STAT_TYPE_WITHDRAW_PREFIX, routes) } } table.UpdatePathAttrs4ByteAs(body) if err = table.UpdatePathAggregator4ByteAs(body); err != nil { fmsg.MsgData = err return fmsg, err } h.fsm.lock.RLock() peerInfo := h.fsm.peerInfo h.fsm.lock.RUnlock() fmsg.PathList = table.ProcessMessage(m, peerInfo, fmsg.timestamp) fallthrough case bgp.BGP_MSG_KEEPALIVE: // if the length of h.holdTimerResetCh // isn't zero, the timer will be reset // soon anyway. 
select { case h.holdTimerResetCh <- true: default: } if m.Header.Type == bgp.BGP_MSG_KEEPALIVE { return nil, nil } case bgp.BGP_MSG_NOTIFICATION: body := m.Body.(*bgp.BGPNotification) if body.ErrorCode == bgp.BGP_ERROR_CEASE && (body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN || body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) { communication, rest := decodeAdministrativeCommunication(body.Data) h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "Code": body.ErrorCode, "Subcode": body.ErrorSubcode, "Communicated-Reason": communication, "Data": rest, }).Warn("received notification") h.fsm.lock.RUnlock() } else { h.fsm.lock.RLock() log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.State.NeighborAddress, "Code": body.ErrorCode, "Subcode": body.ErrorSubcode, "Data": body.Data, }).Warn("received notification") h.fsm.lock.RUnlock() } h.fsm.lock.RLock() s := h.fsm.pConf.GracefulRestart.State hardReset := s.Enabled && s.NotificationEnabled && body.ErrorCode == bgp.BGP_ERROR_CEASE && body.ErrorSubcode == bgp.BGP_ERROR_SUB_HARD_RESET h.fsm.lock.RUnlock() if hardReset { sendToStateReasonCh(FSM_HARD_RESET, m) } else { sendToStateReasonCh(FSM_NOTIFICATION_RECV, m) } return nil, nil } } } return fmsg, nil } func (h *FSMHandler) recvMessage() error { defer h.msgCh.Close() fmsg, _ := h.recvMessageWithError() if fmsg != nil { h.msgCh.In() <- fmsg } return nil } func open2Cap(open *bgp.BGPOpen, n *config.Neighbor) (map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface, map[bgp.RouteFamily]bgp.BGPAddPathMode) { capMap := make(map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface) for _, p := range open.OptParams { if paramCap, y := p.(*bgp.OptionParameterCapability); y { for _, c := range paramCap.Capability { m, ok := capMap[c.Code()] if !ok { m = make([]bgp.ParameterCapabilityInterface, 0, 1) } capMap[c.Code()] = append(m, c) } } } // squash add path cap if caps, y := 
capMap[bgp.BGP_CAP_ADD_PATH]; y {
		items := make([]*bgp.CapAddPathTuple, 0, len(caps))
		for _, c := range caps {
			items = append(items, c.(*bgp.CapAddPath).Tuples...)
		}
		capMap[bgp.BGP_CAP_ADD_PATH] = []bgp.ParameterCapabilityInterface{bgp.NewCapAddPath(items)}
	}

	// remote open message may not include multi-protocol capability
	if _, y := capMap[bgp.BGP_CAP_MULTIPROTOCOL]; !y {
		capMap[bgp.BGP_CAP_MULTIPROTOCOL] = []bgp.ParameterCapabilityInterface{bgp.NewCapMultiProtocol(bgp.RF_IPv4_UC)}
	}

	local := n.CreateRfMap()
	remote := make(map[bgp.RouteFamily]bgp.BGPAddPathMode)
	for _, c := range capMap[bgp.BGP_CAP_MULTIPROTOCOL] {
		family := c.(*bgp.CapMultiProtocol).CapValue
		remote[family] = bgp.BGP_ADD_PATH_NONE
		for _, a := range capMap[bgp.BGP_CAP_ADD_PATH] {
			for _, i := range a.(*bgp.CapAddPath).Tuples {
				if i.RouteFamily == family {
					remote[family] = i.Mode
				}
			}
		}
	}
	negotiated := make(map[bgp.RouteFamily]bgp.BGPAddPathMode)
	for family, mode := range local {
		if m, y := remote[family]; y {
			n := bgp.BGP_ADD_PATH_NONE
			if mode&bgp.BGP_ADD_PATH_SEND > 0 && m&bgp.BGP_ADD_PATH_RECEIVE > 0 {
				n |= bgp.BGP_ADD_PATH_SEND
			}
			if mode&bgp.BGP_ADD_PATH_RECEIVE > 0 && m&bgp.BGP_ADD_PATH_SEND > 0 {
				n |= bgp.BGP_ADD_PATH_RECEIVE
			}
			negotiated[family] = n
		}
	}
	return capMap, negotiated
}

// opensent runs the OpenSent state. It writes our OPEN message, starts the
// receive goroutine, and waits for the peer's OPEN. A valid OPEN negotiates
// peer AS/type, capabilities, hold time, keepalive interval and (long-lived)
// graceful-restart state, sends a KEEPALIVE and moves to OpenConfirm. Any
// other message, an error, hold-timer expiry or an admin-down request
// returns to Idle; tomb death returns state -1.
func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) {
	fsm := h.fsm

	fsm.lock.RLock()
	m := buildopen(fsm.gConf, fsm.pConf)
	fsm.lock.RUnlock()

	b, _ := m.Serialize()
	fsm.conn.Write(b)
	fsm.bgpMessageStateUpdate(m.Header.Type, false)

	h.msgCh = channels.NewInfiniteChannel()

	fsm.lock.RLock()
	h.conn = fsm.conn
	fsm.lock.RUnlock()

	h.t.Go(h.recvMessage)

	// RFC 4271 P.60
	// sets its HoldTimer to a large value
	// A HoldTimer value of 4 minutes is suggested as a "large value"
	// for the HoldTimer
	fsm.lock.RLock()
	holdTimer := time.NewTimer(time.Second * time.Duration(fsm.opensentHoldTime))
	fsm.lock.RUnlock()

	for {
		select {
		case <-h.t.Dying():
			h.conn.Close()
			return -1, NewFsmStateReason(FSM_DYING, nil, nil)
		case conn, ok := <-fsm.connCh:
			if !ok {
				break
			}
			// Only one session per peer: refuse additional connections.
			conn.Close()
			fsm.lock.RLock()
			log.WithFields(log.Fields{
				"Topic": "Peer",
				"Key":   fsm.pConf.State.NeighborAddress,
				"State": fsm.state.String(),
			}).Warn("Closed an accepted connection")
			fsm.lock.RUnlock()
		case <-fsm.gracefulRestartTimer.C:
			fsm.lock.RLock()
			restarting := fsm.pConf.GracefulRestart.State.PeerRestarting
			fsm.lock.RUnlock()
			if restarting {
				fsm.lock.RLock()
				log.WithFields(log.Fields{
					"Topic": "Peer",
					"Key":   fsm.pConf.State.NeighborAddress,
					"State": fsm.state.String(),
				}).Warn("graceful restart timer expired")
				fsm.lock.RUnlock()
				h.conn.Close()
				return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil)
			}
		case i, ok := <-h.msgCh.Out():
			if !ok {
				continue
			}
			e := i.(*FsmMsg)
			switch e.MsgData.(type) {
			case *bgp.BGPMessage:
				m := e.MsgData.(*bgp.BGPMessage)
				if m.Header.Type != bgp.BGP_MSG_OPEN {
					// send notification?
					h.conn.Close()
					return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, nil, nil)
				}

				fsm.lock.Lock()
				fsm.recvOpen = m
				fsm.lock.Unlock()

				body := m.Body.(*bgp.BGPOpen)

				fsm.lock.RLock()
				fsmPeerAS := fsm.pConf.Config.PeerAs
				fsm.lock.RUnlock()
				peerAs, err := bgp.ValidateOpenMsg(body, fsmPeerAS)
				if err != nil {
					m, _ := fsm.sendNotificationFromErrorMsg(err.(*bgp.MessageError))
					return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, m, nil)
				}

				// ASN negotiation was skipped
				fsm.lock.RLock()
				asnNegotiationSkipped := fsm.pConf.Config.PeerAs == 0
				fsm.lock.RUnlock()
				if asnNegotiationSkipped {
					fsm.lock.Lock()
					typ := config.PEER_TYPE_EXTERNAL
					if fsm.peerInfo.LocalAS == peerAs {
						typ = config.PEER_TYPE_INTERNAL
					}
					fsm.pConf.State.PeerType = typ
					log.WithFields(log.Fields{
						"Topic": "Peer",
						"Key":   fsm.pConf.State.NeighborAddress,
						"State": fsm.state.String(),
					}).Infof("skipped asn negotiation: peer-as: %d, peer-type: %s", peerAs, typ)
					fsm.lock.Unlock()
				} else {
					fsm.lock.Lock()
					fsm.pConf.State.PeerType = fsm.pConf.Config.PeerType
					fsm.lock.Unlock()
				}

				// The write lock taken here is held for the whole negotiation
				// below and MUST be released on every exit path.
				fsm.lock.Lock()
				fsm.pConf.State.PeerAs = peerAs
				fsm.peerInfo.AS = peerAs
				fsm.peerInfo.ID = body.ID
				fsm.capMap, fsm.rfMap = open2Cap(body, fsm.pConf)

				if _, y := fsm.capMap[bgp.BGP_CAP_ADD_PATH]; y {
					fsm.marshallingOptions = &bgp.MarshallingOption{
						AddPath: fsm.rfMap,
					}
				} else {
					fsm.marshallingOptions = nil
				}

				// calculate HoldTime
				// RFC 4271 P.13
				// a BGP speaker MUST calculate the value of the Hold Timer
				// by using the smaller of its configured Hold Time and the Hold Time
				// received in the OPEN message.
				holdTime := float64(body.HoldTime)
				myHoldTime := fsm.pConf.Timers.Config.HoldTime
				if holdTime > myHoldTime {
					fsm.pConf.Timers.State.NegotiatedHoldTime = myHoldTime
				} else {
					fsm.pConf.Timers.State.NegotiatedHoldTime = holdTime
				}

				// Keep the configured keepalive unless the peer forced a smaller
				// hold time, in which case use a third of the negotiated value.
				keepalive := fsm.pConf.Timers.Config.KeepaliveInterval
				if n := fsm.pConf.Timers.State.NegotiatedHoldTime; n < myHoldTime {
					keepalive = n / 3
				}
				fsm.pConf.Timers.State.KeepaliveInterval = keepalive

				gr, ok := fsm.capMap[bgp.BGP_CAP_GRACEFUL_RESTART]
				if fsm.pConf.GracefulRestart.Config.Enabled && ok {
					state := &fsm.pConf.GracefulRestart.State
					state.Enabled = true
					grCap := gr[len(gr)-1].(*bgp.CapGracefulRestart)
					state.PeerRestartTime = uint16(grCap.Time)

					for _, t := range grCap.Tuples {
						n := bgp.AddressFamilyNameMap[bgp.AfiSafiToRouteFamily(t.AFI, t.SAFI)]
						for i, a := range fsm.pConf.AfiSafis {
							if string(a.Config.AfiSafiName) == n {
								fsm.pConf.AfiSafis[i].MpGracefulRestart.State.Enabled = true
								fsm.pConf.AfiSafis[i].MpGracefulRestart.State.Received = true
								break
							}
						}
					}

					// RFC 4724 4.1
					// To re-establish the session with its peer, the Restarting Speaker
					// MUST set the "Restart State" bit in the Graceful Restart Capability
					// of the OPEN message.
					if fsm.pConf.GracefulRestart.State.PeerRestarting && grCap.Flags&0x08 == 0 {
						log.WithFields(log.Fields{
							"Topic": "Peer",
							"Key":   fsm.pConf.State.NeighborAddress,
							"State": fsm.state.String(),
						}).Warn("restart flag is not set")
						// Release the write lock before leaving: returning with it
						// held would block every later fsm.lock user (starting with
						// loop(), which RLocks right after this handler returns).
						fsm.lock.Unlock()
						// send notification?
						h.conn.Close()
						return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, nil, nil)
					}

					// RFC 4724 3
					// The most significant bit is defined as the Restart State (R)
					// bit, ...(snip)... When set (value 1), this bit
					// indicates that the BGP speaker has restarted, and its peer MUST
					// NOT wait for the End-of-RIB marker from the speaker before
					// advertising routing information to the speaker.
					if fsm.pConf.GracefulRestart.State.LocalRestarting && grCap.Flags&0x08 != 0 {
						log.WithFields(log.Fields{
							"Topic": "Peer",
							"Key":   fsm.pConf.State.NeighborAddress,
							"State": fsm.state.String(),
						}).Debug("peer has restarted, skipping wait for EOR")
						for i := range fsm.pConf.AfiSafis {
							fsm.pConf.AfiSafis[i].MpGracefulRestart.State.EndOfRibReceived = true
						}
					}
					if fsm.pConf.GracefulRestart.Config.NotificationEnabled && grCap.Flags&0x04 > 0 {
						fsm.pConf.GracefulRestart.State.NotificationEnabled = true
					}
				}
				// Long-lived GR is only honoured when plain GR was advertised too.
				llgr, ok2 := fsm.capMap[bgp.BGP_CAP_LONG_LIVED_GRACEFUL_RESTART]
				if fsm.pConf.GracefulRestart.Config.LongLivedEnabled && ok && ok2 {
					fsm.pConf.GracefulRestart.State.LongLivedEnabled = true
					llgrCap := llgr[len(llgr)-1].(*bgp.CapLongLivedGracefulRestart)
					for _, t := range llgrCap.Tuples {
						n := bgp.AddressFamilyNameMap[bgp.AfiSafiToRouteFamily(t.AFI, t.SAFI)]
						for i, a := range fsm.pConf.AfiSafis {
							if string(a.Config.AfiSafiName) == n {
								fsm.pConf.AfiSafis[i].LongLivedGracefulRestart.State.Enabled = true
								fsm.pConf.AfiSafis[i].LongLivedGracefulRestart.State.Received = true
								fsm.pConf.AfiSafis[i].LongLivedGracefulRestart.State.PeerRestartTime = t.RestartTime
								break
							}
						}
					}
				}

				fsm.lock.Unlock()
				msg := bgp.NewBGPKeepAliveMessage()
				b, _ := msg.Serialize()
				fsm.conn.Write(b)
				fsm.bgpMessageStateUpdate(msg.Header.Type, false)
				return bgp.BGP_FSM_OPENCONFIRM, NewFsmStateReason(FSM_OPEN_MSG_RECEIVED, nil, nil)
			case *bgp.MessageError:
				m, _ := fsm.sendNotificationFromErrorMsg(e.MsgData.(*bgp.MessageError))
				return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, m, nil)
			default:
				log.WithFields(log.Fields{
					"Topic": "Peer",
					"Key":   fsm.pConf.State.NeighborAddress,
					"State": fsm.state.String(),
					"Data":  e.MsgData,
				}).Panic("unknown msg type")
			}
		case err := <-h.stateReasonCh:
			h.conn.Close()
			return bgp.BGP_FSM_IDLE, &err
		case <-holdTimer.C:
			m, _ := fsm.sendNotification(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil, "hold timer expired")
			h.t.Kill(nil)
			return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_HOLD_TIMER_EXPIRED, m, nil)
		case stateOp := <-fsm.adminStateCh:
			err := h.changeAdminState(stateOp.State)
			if err == nil {
				switch stateOp.State {
				case ADMIN_STATE_DOWN:
					h.conn.Close()
					// No NOTIFICATION is sent on this path, so the reason must not
					// carry one (the outer m is our own OPEN message).
					return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_ADMIN_DOWN, nil, nil)
				case ADMIN_STATE_UP:
					log.WithFields(log.Fields{
						"Topic":      "Peer",
						"Key":        fsm.pConf.State.NeighborAddress,
						"State":      fsm.state.String(),
						"AdminState": stateOp.State.String(),
					}).Panic("code logic bug")
				}
			}
		}
	}
}

// keepaliveTicker returns a ticker that fires every negotiated keepalive
// interval (at least once per second when the interval truncates to zero).
// When the negotiated hold time is 0 no keepalives are sent, and it returns
// a zero-value Ticker whose nil C channel never fires.
func keepaliveTicker(fsm *FSM) *time.Ticker {
	fsm.lock.RLock()
	defer fsm.lock.RUnlock()

	negotiatedTime := fsm.pConf.Timers.State.NegotiatedHoldTime
	if negotiatedTime == 0 {
		return &time.Ticker{}
	}
	sec := time.Second * time.Duration(fsm.pConf.Timers.State.KeepaliveInterval)
	if sec == 0 {
		sec = time.Second
	}
	return time.NewTicker(sec)
}

// openconfirm runs the OpenConfirm state: it sends periodic KEEPALIVEs and
// moves to Established when the peer's KEEPALIVE arrives. Any other message,
// an error, hold-timer expiry or an admin-down request returns to Idle; tomb
// death returns state -1.
func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) {
	fsm := h.fsm
	ticker := keepaliveTicker(fsm)
	// Stop the keepalive ticker on every exit so it is not leaked; the
	// zero-value ticker (hold time 0) has a nil C and nothing to stop.
	defer func() {
		if ticker.C != nil {
			ticker.Stop()
		}
	}()
	h.msgCh = channels.NewInfiniteChannel()
	fsm.lock.RLock()
	h.conn = fsm.conn

	h.t.Go(h.recvMessage)

	var holdTimer *time.Timer
	if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 {
		// A zero-value Timer has a nil C, so the hold-timer case never fires.
		holdTimer = &time.Timer{}
	} else {
		// RFC 4271 P.65
		// sets the HoldTimer according to the negotiated value
		holdTimer = time.NewTimer(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))
	}
	fsm.lock.RUnlock()

	for {
		select {
		case <-h.t.Dying():
			h.conn.Close()
			return -1, NewFsmStateReason(FSM_DYING, nil, nil)
		case conn, ok := <-fsm.connCh:
			if !ok {
				break
			}
			conn.Close()
			fsm.lock.RLock()
			log.WithFields(log.Fields{
				"Topic": "Peer",
				"Key":   fsm.pConf.State.NeighborAddress,
				"State": fsm.state.String(),
			}).Warn("Closed an accepted connection")
			fsm.lock.RUnlock()
		case <-fsm.gracefulRestartTimer.C:
			fsm.lock.RLock()
			restarting := fsm.pConf.GracefulRestart.State.PeerRestarting
			fsm.lock.RUnlock()
			if restarting {
				fsm.lock.RLock()
				log.WithFields(log.Fields{
					"Topic": "Peer",
					"Key":   fsm.pConf.State.NeighborAddress,
					"State": fsm.state.String(),
				}).Warn("graceful restart timer expired")
				fsm.lock.RUnlock()
				h.conn.Close()
				return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil)
			}
		case <-ticker.C:
			m := bgp.NewBGPKeepAliveMessage()
			b, _ := m.Serialize()
			// TODO: check error
			fsm.conn.Write(b)
			fsm.bgpMessageStateUpdate(m.Header.Type, false)
		case i, ok := <-h.msgCh.Out():
			if !ok {
				continue
			}
			e := i.(*FsmMsg)
			switch e.MsgData.(type) {
			case *bgp.BGPMessage:
				m := e.MsgData.(*bgp.BGPMessage)
				if m.Header.Type == bgp.BGP_MSG_KEEPALIVE {
					return bgp.BGP_FSM_ESTABLISHED, NewFsmStateReason(FSM_OPEN_MSG_NEGOTIATED, nil, nil)
				}
				// send notification ?
				h.conn.Close()
				return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, nil, nil)
			case *bgp.MessageError:
				m, _ := fsm.sendNotificationFromErrorMsg(e.MsgData.(*bgp.MessageError))
				return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, m, nil)
			default:
				log.WithFields(log.Fields{
					"Topic": "Peer",
					"Key":   fsm.pConf.State.NeighborAddress,
					"State": fsm.state.String(),
					"Data":  e.MsgData,
				}).Panic("unknown msg type")
			}
		case err := <-h.stateReasonCh:
			h.conn.Close()
			return bgp.BGP_FSM_IDLE, &err
		case <-holdTimer.C:
			m, _ := fsm.sendNotification(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil, "hold timer expired")
			h.t.Kill(nil)
			return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_HOLD_TIMER_EXPIRED, m, nil)
		case stateOp := <-fsm.adminStateCh:
			err := h.changeAdminState(stateOp.State)
			if err == nil {
				switch stateOp.State {
				case ADMIN_STATE_DOWN:
					h.conn.Close()
					return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_ADMIN_DOWN, nil, nil)
				case ADMIN_STATE_UP:
					log.WithFields(log.Fields{
						"Topic":      "Peer",
						"Key":        fsm.pConf.State.NeighborAddress,
						"State":      fsm.state.String(),
						"AdminState": stateOp.State.String(),
					}).Panic("code logic bug")
				}
			}
		}
	}
}

// sendMessageloop is the Established-state writer goroutine. It turns queued
// FsmOutgoingMsg values into UPDATE/NOTIFICATION messages, sends periodic
// KEEPALIVEs, and writes them to the connection. A write failure or a sent
// NOTIFICATION is reported on h.stateReasonCh, the connection is closed and
// the loop exits. It always returns nil.
func (h *FSMHandler) sendMessageloop() error {
	conn := h.conn
	fsm := h.fsm
	ticker := keepaliveTicker(fsm)
	// Stop the keepalive ticker when the loop exits so it is not leaked; the
	// zero-value ticker (hold time 0) has a nil C and nothing to stop.
	defer func() {
		if ticker.C != nil {
			ticker.Stop()
		}
	}()

	// send serializes and writes one message. A non-nil error means the
	// connection has been closed and the caller must stop sending.
	send := func(m *bgp.BGPMessage) error {
		fsm.lock.RLock()
		if fsm.twoByteAsTrans && m.Header.Type == bgp.BGP_MSG_UPDATE {
			log.WithFields(log.Fields{
				"Topic": "Peer",
				"Key":   fsm.pConf.State.NeighborAddress,
				"State": fsm.state.String(),
				"Data":  m,
			}).Debug("update for 2byte AS peer")
			table.UpdatePathAttrs2ByteAs(m.Body.(*bgp.BGPUpdate))
			table.UpdatePathAggregator2ByteAs(m.Body.(*bgp.BGPUpdate))
		}
		b, err := m.Serialize(h.fsm.marshallingOptions)
		fsm.lock.RUnlock()
		if err != nil {
			// Serialization failures drop the message but keep the session.
			fsm.lock.RLock()
			log.WithFields(log.Fields{
				"Topic": "Peer",
				"Key":   fsm.pConf.State.NeighborAddress,
				"State": fsm.state.String(),
				"Data":  err,
			}).Warn("failed to serialize")
			fsm.lock.RUnlock()
			fsm.bgpMessageStateUpdate(0, false)
			return nil
		}

		fsm.lock.RLock()
		holdTime := fsm.pConf.Timers.State.NegotiatedHoldTime
		fsm.lock.RUnlock()
		// Bound each write by the hold time. A zero hold time means no hold
		// timer was negotiated; "now + 0" would already be in the past and
		// make every write fail, so clear the deadline instead.
		var deadline time.Time
		if holdTime > 0 {
			deadline = time.Now().Add(time.Second * time.Duration(holdTime))
		}
		err = conn.SetWriteDeadline(deadline)
		if err != nil {
			h.stateReasonCh <- *NewFsmStateReason(FSM_WRITE_FAILED, nil, nil)
			conn.Close()
			return fmt.Errorf("failed to set write deadline")
		}
		_, err = conn.Write(b)
		if err != nil {
			fsm.lock.RLock()
			log.WithFields(log.Fields{
				"Topic": "Peer",
				"Key":   fsm.pConf.State.NeighborAddress,
				"State": fsm.state.String(),
				"Data":  err,
			}).Warn("failed to send")
			fsm.lock.RUnlock()
			h.stateReasonCh <- *NewFsmStateReason(FSM_WRITE_FAILED, nil, nil)
			conn.Close()
			return fmt.Errorf("closed")
		}
		fsm.bgpMessageStateUpdate(m.Header.Type, false)

		switch m.Header.Type {
		case bgp.BGP_MSG_NOTIFICATION:
			body := m.Body.(*bgp.BGPNotification)
			if body.ErrorCode == bgp.BGP_ERROR_CEASE && (body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN || body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) {
				communication, rest := decodeAdministrativeCommunication(body.Data)
				fsm.lock.RLock()
				log.WithFields(log.Fields{
					"Topic":               "Peer",
					"Key":                 fsm.pConf.State.NeighborAddress,
					"State":               fsm.state.String(),
					"Code":                body.ErrorCode,
					"Subcode":             body.ErrorSubcode,
					"Communicated-Reason": communication,
					"Data":                rest,
				}).Warn("sent notification")
				fsm.lock.RUnlock()
			} else {
				fsm.lock.RLock()
				log.WithFields(log.Fields{
					"Topic":   "Peer",
					"Key":     fsm.pConf.State.NeighborAddress,
					"State":   fsm.state.String(),
					"Code":    body.ErrorCode,
					"Subcode": body.ErrorSubcode,
					"Data":    body.Data,
				}).Warn("sent notification")
				fsm.lock.RUnlock()
			}
			// A NOTIFICATION always ends the session.
			h.stateReasonCh <- *NewFsmStateReason(FSM_NOTIFICATION_SENT, m, nil)
			conn.Close()
			return fmt.Errorf("closed")
		case bgp.BGP_MSG_UPDATE:
			update := m.Body.(*bgp.BGPUpdate)
			fsm.lock.RLock()
			log.WithFields(log.Fields{
				"Topic":       "Peer",
				"Key":         fsm.pConf.State.NeighborAddress,
				"State":       fsm.state.String(),
				"nlri":        update.NLRI,
				"withdrawals": update.WithdrawnRoutes,
				"attributes":  update.PathAttributes,
			}).Debug("sent update")
			fsm.lock.RUnlock()
		default:
			fsm.lock.RLock()
			log.WithFields(log.Fields{
				"Topic": "Peer",
				"Key":   fsm.pConf.State.NeighborAddress,
				"State": fsm.state.String(),
				"data":  m,
			}).Debug("sent")
			fsm.lock.RUnlock()
		}
		return nil
	}

	for {
		select {
		case <-h.t.Dying():
			return nil
		case o := <-h.outgoing.Out():
			m := o.(*FsmOutgoingMsg)
			h.fsm.lock.RLock()
			options := h.fsm.marshallingOptions
			h.fsm.lock.RUnlock()
			for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options) {
				if err := send(msg); err != nil {
					return nil
				}
			}
			if m.Notification != nil {
				if m.StayIdle {
					// current user is only prefix-limit
					// fix me if this is not the case
					h.changeAdminState(ADMIN_STATE_PFX_CT)
				}
				if err := send(m.Notification); err != nil {
					return nil
				}
			}
		case <-ticker.C:
			if err := send(bgp.NewBGPKeepAliveMessage()); err != nil {
				return nil
			}
		}
	}
}

// recvMessageloop is the Established-state reader goroutine. It forwards
// every message produced by recvMessageWithError to h.msgCh and stops at the
// first error. It always returns nil.
func (h *FSMHandler) recvMessageloop() error {
	for {
		fmsg, err := h.recvMessageWithError()
		if fmsg != nil {
			h.msgCh.In() <- fmsg
		}
		if err != nil {
			return nil
		}
	}
}

// established runs the Established state. It starts the send and receive
// goroutines, then waits for a session-ending event: a reason on
// h.stateReasonCh (possibly converted into a graceful restart), hold-timer
// expiry, or tomb death. Hold-timer resets arrive on h.holdTimerResetCh.
// An admin-down request queues a CEASE/Administrative Shutdown NOTIFICATION;
// the session then ends when the writer reports it on h.stateReasonCh.
func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) {
	fsm := h.fsm
	fsm.lock.Lock()
	h.conn = fsm.conn
	fsm.lock.Unlock()
	h.t.Go(h.sendMessageloop)
	h.msgCh = h.incoming
	h.t.Go(h.recvMessageloop)

	// Read the negotiated hold time once, under the lock like every other
	// reader of fsm.pConf.
	fsm.lock.RLock()
	holdTime := fsm.pConf.Timers.State.NegotiatedHoldTime
	fsm.lock.RUnlock()

	var holdTimer *time.Timer
	if holdTime == 0 {
		// A zero-value Timer has a nil C, so the hold-timer case never fires.
		holdTimer = &time.Timer{}
	} else {
		holdTimer = time.NewTimer(time.Second * time.Duration(holdTime))
	}

	fsm.gracefulRestartTimer.Stop()

	for {
		select {
		case <-h.t.Dying():
			return -1, NewFsmStateReason(FSM_DYING, nil, nil)
		case conn, ok := <-fsm.connCh:
			if !ok {
				break
			}
			conn.Close()
			fsm.lock.RLock()
			log.WithFields(log.Fields{
				"Topic": "Peer",
				"Key":   fsm.pConf.State.NeighborAddress,
				"State": fsm.state.String(),
			}).Warn("Closed an accepted connection")
			fsm.lock.RUnlock()
		case err := <-h.stateReasonCh:
			h.conn.Close()
			h.t.Kill(nil)
			fsm.lock.RLock()
			// With graceful restart enabled, a lost connection (or a received
			// NOTIFICATION when notification support is negotiated) is turned
			// into a graceful restart and the restart timer is armed.
			if s := fsm.pConf.GracefulRestart.State; s.Enabled && (s.NotificationEnabled && err.Type == FSM_NOTIFICATION_RECV || err.Type == FSM_READ_FAILED || err.Type == FSM_WRITE_FAILED) {
				err = *NewFsmStateReason(FSM_GRACEFUL_RESTART, nil, nil)
				log.WithFields(log.Fields{
					"Topic": "Peer",
					"Key":   fsm.pConf.State.NeighborAddress,
					"State": fsm.state.String(),
				}).Info("peer graceful restart")
				fsm.gracefulRestartTimer.Reset(time.Duration(fsm.pConf.GracefulRestart.State.PeerRestartTime) * time.Second)
			}
			fsm.lock.RUnlock()
			return bgp.BGP_FSM_IDLE, &err
		case <-holdTimer.C:
			fsm.lock.RLock()
			log.WithFields(log.Fields{
				"Topic": "Peer",
				"Key":   fsm.pConf.State.NeighborAddress,
				"State": fsm.state.String(),
			}).Warn("hold timer expired")
			fsm.lock.RUnlock()
			m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil)
			h.outgoing.In() <- &FsmOutgoingMsg{Notification: m}
			return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_HOLD_TIMER_EXPIRED, m, nil)
		case <-h.holdTimerResetCh:
			fsm.lock.RLock()
			if fsm.pConf.Timers.State.NegotiatedHoldTime != 0 {
				holdTimer.Reset(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))
			}
			fsm.lock.RUnlock()
		case stateOp := <-fsm.adminStateCh:
			err := h.changeAdminState(stateOp.State)
			if err == nil {
				switch stateOp.State {
				case ADMIN_STATE_DOWN:
					m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, stateOp.Communication)
					h.outgoing.In() <- &FsmOutgoingMsg{Notification: m}
				}
			}
		}
	}
}

// loop runs one FSM state. It executes the current state's handler in a
// tomb goroutine and logs "Peer Up" and "Peer Down" transitions. It then
// waits for every tomb goroutine to finish; if they have not finished after
// 120 seconds, it calls Fatalf. Finally it publishes the state change on
// h.stateCh, unless the handler returned a negative state (tomb death).
// It always returns nil.
func (h *FSMHandler) loop() error {
	fsm := h.fsm
	ch := make(chan bgp.FSMState)
	fsm.lock.RLock()
	oldState := fsm.state
	fsm.lock.RUnlock()

	var reason *FsmStateReason
	f := func() error {
		nextState := bgp.FSMState(-1)
		fsm.lock.RLock()
		fsmState := fsm.state
		fsm.lock.RUnlock()
		switch fsmState {
		case bgp.BGP_FSM_IDLE:
			nextState, reason = h.idle()
			// case bgp.BGP_FSM_CONNECT:
			// 	nextState = h.connect()
		case bgp.BGP_FSM_ACTIVE:
			nextState, reason = h.active()
		case bgp.BGP_FSM_OPENSENT:
			nextState, reason = h.opensent()
		case bgp.BGP_FSM_OPENCONFIRM:
			nextState, reason = h.openconfirm()
		case bgp.BGP_FSM_ESTABLISHED:
			nextState, reason = h.established()
		}
		fsm.reason = reason
		ch <- nextState
		return nil
	}

	h.t.Go(f)

	nextState := <-ch

	fsm.lock.RLock()
	if nextState == bgp.BGP_FSM_ESTABLISHED && oldState == bgp.BGP_FSM_OPENCONFIRM {
		log.WithFields(log.Fields{
			"Topic": "Peer",
			"Key":   fsm.pConf.State.NeighborAddress,
			"State": fsm.state.String(),
		}).Info("Peer Up")
	}

	if oldState == bgp.BGP_FSM_ESTABLISHED {
		// The main goroutine sent the notification due to
		// deconfiguration or something.
		reason := fsm.reason
		if fsm.h.sentNotification != nil {
			reason.Type = FSM_NOTIFICATION_SENT
			reason.PeerDownReason = PEER_DOWN_BY_LOCAL
			reason.BGPNotification = fsm.h.sentNotification
		}
		log.WithFields(log.Fields{
			"Topic":  "Peer",
			"Key":    fsm.pConf.State.NeighborAddress,
			"State":  fsm.state.String(),
			"Reason": reason.String(),
		}).Info("Peer Down")
	}
	fsm.lock.RUnlock()

	// Watchdog: the handler goroutines must all exit after a transition.
	e := time.AfterFunc(time.Second*120, func() {
		log.WithFields(log.Fields{"Topic": "Peer"}).Fatalf("failed to free the fsm.h.t for %s %s %s", fsm.pConf.State.NeighborAddress, oldState, nextState)
	})
	h.t.Wait()
	e.Stop()

	// under zero means that tomb.Dying()
	if nextState >= bgp.BGP_FSM_IDLE {
		fsm.lock.RLock()
		h.stateCh <- &FsmMsg{
			MsgType:     FSM_MSG_STATE_CHANGE,
			MsgSrc:      fsm.pConf.State.NeighborAddress,
			MsgData:     nextState,
			StateReason: reason,
			Version:     h.fsm.version,
		}
		fsm.lock.RUnlock()
	}
	return nil
}

// changeAdminState switches the FSM's administrative state to s, mirrors it
// into pConf.State.AdminDown and logs the change. It returns an error when
// s equals the current admin state.
func (h *FSMHandler) changeAdminState(s AdminState) error {
	h.fsm.lock.Lock()
	defer h.fsm.lock.Unlock()

	fsm := h.fsm
	if fsm.adminState == s {
		log.WithFields(log.Fields{
			"Topic": "Peer",
			"Key":   fsm.pConf.State.NeighborAddress,
			"State": fsm.state.String(),
		}).Warn("cannot change to the same state")
		return fmt.Errorf("cannot change to the same state.")
	}

	log.WithFields(log.Fields{
		"Topic":      "Peer",
		"Key":        fsm.pConf.State.NeighborAddress,
		"State":      fsm.state.String(),
		"AdminState": s.String(),
	}).Debug("admin state changed")

	fsm.adminState = s
	// Derive AdminDown from the new state rather than toggling it: both
	// ADMIN_STATE_DOWN and ADMIN_STATE_PFX_CT are "down", so a toggle would
	// report the peer as up after a DOWN <-> PFX_CT transition.
	fsm.pConf.State.AdminDown = s != ADMIN_STATE_UP

	switch s {
	case ADMIN_STATE_UP:
		log.WithFields(log.Fields{
			"Topic": "Peer",
			"Key":   fsm.pConf.State.NeighborAddress,
			"State": fsm.state.String(),
		}).Info("Administrative start")
	case ADMIN_STATE_DOWN:
		log.WithFields(log.Fields{
			"Topic": "Peer",
			"Key":   fsm.pConf.State.NeighborAddress,
			"State": fsm.state.String(),
		}).Info("Administrative shutdown")
	case ADMIN_STATE_PFX_CT:
		log.WithFields(log.Fields{
			"Topic": "Peer",
			"Key":   fsm.pConf.State.NeighborAddress,
			"State": fsm.state.String(),
		}).Info("Administrative shutdown(Prefix limit reached)")
	}
	return nil
}