diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2018-07-07 13:48:38 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2018-07-07 20:44:25 +0900 |
commit | c4775c42510d1f1ddd55036dc19e982712fa6a0b (patch) | |
tree | 6ec8b61d4338c809e239e3003a2d32d480898e22 /pkg/server/fsm.go | |
parent | b3079759aa13172fcb548a83da9a9653d8d5fed4 (diff) |
follow Standard Go Project Layout
https://github.com/golang-standards/project-layout
Now you can clearly see which library code is private and which is public.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'pkg/server/fsm.go')
-rw-r--r-- | pkg/server/fsm.go | 1752 |
1 files changed, 1752 insertions, 0 deletions
diff --git a/pkg/server/fsm.go b/pkg/server/fsm.go new file mode 100644 index 00000000..855066f4 --- /dev/null +++ b/pkg/server/fsm.go @@ -0,0 +1,1752 @@ +// Copyright (C) 2014 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "io" + "math/rand" + "net" + "strconv" + "time" + + "github.com/eapache/channels" + "github.com/osrg/gobgp/internal/pkg/config" + "github.com/osrg/gobgp/internal/pkg/table" + "github.com/osrg/gobgp/pkg/packet/bgp" + "github.com/osrg/gobgp/pkg/packet/bmp" + + log "github.com/sirupsen/logrus" + "gopkg.in/tomb.v2" +) + +type PeerDownReason int + +const ( + PEER_DOWN_REASON_UNKNOWN PeerDownReason = iota + PEER_DOWN_BY_LOCAL + PEER_DOWN_BY_LOCAL_WITHOUT_NOTIFICATION + PEER_DOWN_BY_REMOTE + PEER_DOWN_BY_REMOTE_WITHOUT_NOTIFICATION + PEER_DOWN_BY_BMP_CONFIGURATION +) + +type FsmStateReasonType uint8 + +const ( + FSM_DYING FsmStateReasonType = iota + FSM_ADMIN_DOWN + FSM_READ_FAILED + FSM_WRITE_FAILED + FSM_NOTIFICATION_SENT + FSM_NOTIFICATION_RECV + FSM_HOLD_TIMER_EXPIRED + FSM_IDLE_HOLD_TIMER_EXPIRED + FSM_RESTART_TIMER_EXPIRED + FSM_GRACEFUL_RESTART + FSM_INVALID_MSG + FSM_NEW_CONNECTION + FSM_OPEN_MSG_RECEIVED + FSM_OPEN_MSG_NEGOTIATED + FSM_HARD_RESET +) + +type FsmStateReason struct { + Type FsmStateReasonType + PeerDownReason PeerDownReason + BGPNotification *bgp.BGPMessage + Data []byte +} + +func NewFsmStateReason(typ FsmStateReasonType, notif 
*bgp.BGPMessage, data []byte) *FsmStateReason { + var reasonCode PeerDownReason + switch typ { + case FSM_DYING, FSM_INVALID_MSG, FSM_NOTIFICATION_SENT, FSM_HOLD_TIMER_EXPIRED, FSM_IDLE_HOLD_TIMER_EXPIRED, FSM_RESTART_TIMER_EXPIRED: + reasonCode = PEER_DOWN_BY_LOCAL + case FSM_ADMIN_DOWN: + reasonCode = PEER_DOWN_BY_LOCAL_WITHOUT_NOTIFICATION + case FSM_NOTIFICATION_RECV, FSM_GRACEFUL_RESTART, FSM_HARD_RESET: + reasonCode = PEER_DOWN_BY_REMOTE + case FSM_READ_FAILED, FSM_WRITE_FAILED: + reasonCode = PEER_DOWN_BY_REMOTE_WITHOUT_NOTIFICATION + } + return &FsmStateReason{ + Type: typ, + PeerDownReason: reasonCode, + BGPNotification: notif, + Data: data, + } +} + +func (r FsmStateReason) String() string { + switch r.Type { + case FSM_DYING: + return "dying" + case FSM_ADMIN_DOWN: + return "admin-down" + case FSM_READ_FAILED: + return "read-failed" + case FSM_WRITE_FAILED: + return "write-failed" + case FSM_NOTIFICATION_SENT: + body := r.BGPNotification.Body.(*bgp.BGPNotification) + return fmt.Sprintf("notification-sent %s", bgp.NewNotificationErrorCode(body.ErrorCode, body.ErrorSubcode).String()) + case FSM_NOTIFICATION_RECV: + body := r.BGPNotification.Body.(*bgp.BGPNotification) + return fmt.Sprintf("notification-received %s", bgp.NewNotificationErrorCode(body.ErrorCode, body.ErrorSubcode).String()) + case FSM_HOLD_TIMER_EXPIRED: + return "hold-timer-expired" + case FSM_IDLE_HOLD_TIMER_EXPIRED: + return "idle-hold-timer-expired" + case FSM_RESTART_TIMER_EXPIRED: + return "restart-timer-expired" + case FSM_GRACEFUL_RESTART: + return "graceful-restart" + case FSM_INVALID_MSG: + return "invalid-msg" + case FSM_NEW_CONNECTION: + return "new-connection" + case FSM_OPEN_MSG_RECEIVED: + return "open-msg-received" + case FSM_OPEN_MSG_NEGOTIATED: + return "open-msg-negotiated" + case FSM_HARD_RESET: + return "hard-reset" + default: + return "unknown" + } +} + +type FsmMsgType int + +const ( + _ FsmMsgType = iota + FSM_MSG_STATE_CHANGE + FSM_MSG_BGP_MESSAGE + 
FSM_MSG_ROUTE_REFRESH +) + +type FsmMsg struct { + MsgType FsmMsgType + MsgSrc string + MsgData interface{} + StateReason *FsmStateReason + PathList []*table.Path + timestamp time.Time + payload []byte + Version uint +} + +type FsmOutgoingMsg struct { + Paths []*table.Path + Notification *bgp.BGPMessage + StayIdle bool +} + +const ( + HOLDTIME_OPENSENT = 240 + HOLDTIME_IDLE = 5 +) + +type AdminState int + +const ( + ADMIN_STATE_UP AdminState = iota + ADMIN_STATE_DOWN + ADMIN_STATE_PFX_CT +) + +func (s AdminState) String() string { + switch s { + case ADMIN_STATE_UP: + return "ADMIN_STATE_UP" + case ADMIN_STATE_DOWN: + return "ADMIN_STATE_DOWN" + case ADMIN_STATE_PFX_CT: + return "ADMIN_STATE_PFX_CT" + default: + return "Unknown" + } +} + +type AdminStateOperation struct { + State AdminState + Communication []byte +} + +var fsmVersion uint + +type FSM struct { + t tomb.Tomb + gConf *config.Global + pConf *config.Neighbor + state bgp.FSMState + reason *FsmStateReason + conn net.Conn + connCh chan net.Conn + idleHoldTime float64 + opensentHoldTime float64 + adminState AdminState + adminStateCh chan AdminStateOperation + getActiveCh chan struct{} + h *FSMHandler + rfMap map[bgp.RouteFamily]bgp.BGPAddPathMode + capMap map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface + recvOpen *bgp.BGPMessage + peerInfo *table.PeerInfo + policy *table.RoutingPolicy + gracefulRestartTimer *time.Timer + twoByteAsTrans bool + version uint + marshallingOptions *bgp.MarshallingOption +} + +func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { + state := &fsm.pConf.State.Messages + timer := &fsm.pConf.Timers + if isIn { + state.Received.Total++ + } else { + state.Sent.Total++ + } + switch MessageType { + case bgp.BGP_MSG_OPEN: + if isIn { + state.Received.Open++ + } else { + state.Sent.Open++ + } + case bgp.BGP_MSG_UPDATE: + if isIn { + state.Received.Update++ + timer.State.UpdateRecvTime = time.Now().Unix() + } else { + state.Sent.Update++ + } + case 
bgp.BGP_MSG_NOTIFICATION: + if isIn { + state.Received.Notification++ + } else { + state.Sent.Notification++ + } + case bgp.BGP_MSG_KEEPALIVE: + if isIn { + state.Received.Keepalive++ + } else { + state.Sent.Keepalive++ + } + case bgp.BGP_MSG_ROUTE_REFRESH: + if isIn { + state.Received.Refresh++ + } else { + state.Sent.Refresh++ + } + default: + if isIn { + state.Received.Discarded++ + } else { + state.Sent.Discarded++ + } + } +} + +func (fsm *FSM) bmpStatsUpdate(statType uint16, increment int) { + stats := &fsm.pConf.State.Messages.Received + switch statType { + // TODO + // Support other stat types. + case bmp.BMP_STAT_TYPE_WITHDRAW_UPDATE: + stats.WithdrawUpdate += uint32(increment) + case bmp.BMP_STAT_TYPE_WITHDRAW_PREFIX: + stats.WithdrawPrefix += uint32(increment) + } +} + +func NewFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingPolicy) *FSM { + adminState := ADMIN_STATE_UP + if pConf.Config.AdminDown { + adminState = ADMIN_STATE_DOWN + } + pConf.State.SessionState = config.IntToSessionStateMap[int(bgp.BGP_FSM_IDLE)] + pConf.Timers.State.Downtime = time.Now().Unix() + fsmVersion++ + fsm := &FSM{ + gConf: gConf, + pConf: pConf, + state: bgp.BGP_FSM_IDLE, + connCh: make(chan net.Conn, 1), + opensentHoldTime: float64(HOLDTIME_OPENSENT), + adminState: adminState, + adminStateCh: make(chan AdminStateOperation, 1), + getActiveCh: make(chan struct{}), + rfMap: make(map[bgp.RouteFamily]bgp.BGPAddPathMode), + capMap: make(map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface), + peerInfo: table.NewPeerInfo(gConf, pConf), + policy: policy, + gracefulRestartTimer: time.NewTimer(time.Hour), + version: fsmVersion, + } + fsm.gracefulRestartTimer.Stop() + fsm.t.Go(fsm.connectLoop) + return fsm +} + +func (fsm *FSM) StateChange(nextState bgp.FSMState) { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "old": fsm.state.String(), + "new": nextState.String(), + "reason": fsm.reason, + }).Debug("state 
changed") + fsm.state = nextState + switch nextState { + case bgp.BGP_FSM_ESTABLISHED: + fsm.pConf.Timers.State.Uptime = time.Now().Unix() + fsm.pConf.State.EstablishedCount++ + // reset the state set by the previous session + fsm.twoByteAsTrans = false + if _, y := fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]; !y { + fsm.twoByteAsTrans = true + break + } + y := func() bool { + for _, c := range capabilitiesFromConfig(fsm.pConf) { + switch c.(type) { + case *bgp.CapFourOctetASNumber: + return true + } + } + return false + }() + if !y { + fsm.twoByteAsTrans = true + } + case bgp.BGP_FSM_ACTIVE: + if !fsm.pConf.Transport.Config.PassiveMode { + fsm.getActiveCh <- struct{}{} + } + fallthrough + default: + fsm.pConf.Timers.State.Downtime = time.Now().Unix() + } +} + +func hostport(addr net.Addr) (string, uint16) { + if addr != nil { + host, port, err := net.SplitHostPort(addr.String()) + if err != nil { + return "", 0 + } + p, _ := strconv.ParseUint(port, 10, 16) + return host, uint16(p) + } + return "", 0 +} + +func (fsm *FSM) RemoteHostPort() (string, uint16) { + return hostport(fsm.conn.RemoteAddr()) + +} + +func (fsm *FSM) LocalHostPort() (string, uint16) { + return hostport(fsm.conn.LocalAddr()) +} + +func (fsm *FSM) sendNotificationFromErrorMsg(e *bgp.MessageError) (*bgp.BGPMessage, error) { + if fsm.h != nil && fsm.h.conn != nil { + m := bgp.NewBGPNotificationMessage(e.TypeCode, e.SubTypeCode, e.Data) + b, _ := m.Serialize() + _, err := fsm.h.conn.Write(b) + if err == nil { + fsm.bgpMessageStateUpdate(m.Header.Type, false) + fsm.h.sentNotification = m + } + fsm.h.conn.Close() + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "Data": e, + }).Warn("sent notification") + return m, nil + } + return nil, fmt.Errorf("can't send notification to %s since TCP connection is not established", fsm.pConf.State.NeighborAddress) +} + +func (fsm *FSM) sendNotification(code, subType uint8, data []byte, msg string) (*bgp.BGPMessage, error) 
{ + e := bgp.NewMessageError(code, subType, data, msg) + return fsm.sendNotificationFromErrorMsg(e.(*bgp.MessageError)) +} + +func (fsm *FSM) connectLoop() error { + tick := int(fsm.pConf.Timers.Config.ConnectRetry) + if tick < MIN_CONNECT_RETRY { + tick = MIN_CONNECT_RETRY + } + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + timer := time.NewTimer(time.Duration(tick) * time.Second) + timer.Stop() + + connect := func() { + addr := fsm.pConf.State.NeighborAddress + port := int(bgp.BGP_PORT) + if fsm.pConf.Transport.Config.RemotePort != 0 { + port = int(fsm.pConf.Transport.Config.RemotePort) + } + laddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(fsm.pConf.Transport.Config.LocalAddress, "0")) + if err != nil { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + }).Warn("failed to resolve local address: %s", err) + return + } + var conn net.Conn + d := TCPDialer{ + Dialer: net.Dialer{ + LocalAddr: laddr, + Timeout: time.Duration(MIN_CONNECT_RETRY-1) * time.Second, + }, + AuthPassword: fsm.pConf.Config.AuthPassword, + } + if fsm.pConf.TtlSecurity.Config.Enabled { + d.Ttl = 255 + d.TtlMin = fsm.pConf.TtlSecurity.Config.TtlMin + } else if fsm.pConf.Config.PeerAs != 0 && fsm.pConf.Config.PeerType == config.PEER_TYPE_EXTERNAL { + d.Ttl = 1 + if fsm.pConf.EbgpMultihop.Config.Enabled { + d.Ttl = fsm.pConf.EbgpMultihop.Config.MultihopTtl + } + } + conn, err = d.DialTCP(addr, port) + if err == nil { + select { + case fsm.connCh <- conn: + return + default: + conn.Close() + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + }).Warn("active conn is closed to avoid being blocked") + } + } else { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + }).Debugf("failed to connect: %s", err) + } + + if fsm.state == bgp.BGP_FSM_ACTIVE && !fsm.pConf.GracefulRestart.State.PeerRestarting { + timer.Reset(time.Duration(tick) * time.Second) + } + } + + for 
{ + select { + case <-fsm.t.Dying(): + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + }).Debug("stop connect loop") + return nil + case <-timer.C: + if fsm.state == bgp.BGP_FSM_ACTIVE && !fsm.pConf.GracefulRestart.State.PeerRestarting { + go connect() + } + case <-fsm.getActiveCh: + timer.Reset(time.Duration(r.Intn(MIN_CONNECT_RETRY)+MIN_CONNECT_RETRY) * time.Second) + } + } +} + +type FSMHandler struct { + t tomb.Tomb + fsm *FSM + conn net.Conn + msgCh *channels.InfiniteChannel + stateReasonCh chan FsmStateReason + incoming *channels.InfiniteChannel + stateCh chan *FsmMsg + outgoing *channels.InfiniteChannel + holdTimerResetCh chan bool + sentNotification *bgp.BGPMessage +} + +func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing *channels.InfiniteChannel) *FSMHandler { + h := &FSMHandler{ + fsm: fsm, + stateReasonCh: make(chan FsmStateReason, 2), + incoming: incoming, + stateCh: stateCh, + outgoing: outgoing, + holdTimerResetCh: make(chan bool, 2), + } + fsm.t.Go(h.loop) + return h +} + +func (h *FSMHandler) idle() (bgp.FSMState, *FsmStateReason) { + fsm := h.fsm + + idleHoldTimer := time.NewTimer(time.Second * time.Duration(fsm.idleHoldTime)) + for { + select { + case <-h.t.Dying(): + return -1, NewFsmStateReason(FSM_DYING, nil, nil) + case <-fsm.gracefulRestartTimer.C: + if fsm.pConf.GracefulRestart.State.PeerRestarting { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Warn("graceful restart timer expired") + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) + } + case conn, ok := <-fsm.connCh: + if !ok { + break + } + conn.Close() + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Warn("Closed an accepted connection") + case <-idleHoldTimer.C: + + if fsm.adminState == ADMIN_STATE_UP { + 
log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "Duration": fsm.idleHoldTime, + }).Debug("IdleHoldTimer expired") + fsm.idleHoldTime = HOLDTIME_IDLE + return bgp.BGP_FSM_ACTIVE, NewFsmStateReason(FSM_IDLE_HOLD_TIMER_EXPIRED, nil, nil) + + } else { + log.WithFields(log.Fields{"Topic": "Peer"}).Debug("IdleHoldTimer expired, but stay at idle because the admin state is DOWN") + } + + case stateOp := <-fsm.adminStateCh: + err := h.changeAdminState(stateOp.State) + if err == nil { + switch stateOp.State { + case ADMIN_STATE_DOWN: + // stop idle hold timer + idleHoldTimer.Stop() + + case ADMIN_STATE_UP: + // restart idle hold timer + idleHoldTimer.Reset(time.Second * time.Duration(fsm.idleHoldTime)) + } + } + } + } +} + +func (h *FSMHandler) active() (bgp.FSMState, *FsmStateReason) { + fsm := h.fsm + for { + select { + case <-h.t.Dying(): + return -1, NewFsmStateReason(FSM_DYING, nil, nil) + case conn, ok := <-fsm.connCh: + if !ok { + break + } + fsm.conn = conn + ttl := 0 + ttlMin := 0 + if fsm.pConf.TtlSecurity.Config.Enabled { + ttl = 255 + ttlMin = int(fsm.pConf.TtlSecurity.Config.TtlMin) + } else if fsm.pConf.Config.PeerAs != 0 && fsm.pConf.Config.PeerType == config.PEER_TYPE_EXTERNAL { + if fsm.pConf.EbgpMultihop.Config.Enabled { + ttl = int(fsm.pConf.EbgpMultihop.Config.MultihopTtl) + } else if fsm.pConf.Transport.Config.Ttl != 0 { + ttl = int(fsm.pConf.Transport.Config.Ttl) + } else { + ttl = 1 + } + } else if fsm.pConf.Transport.Config.Ttl != 0 { + ttl = int(fsm.pConf.Transport.Config.Ttl) + } + if ttl != 0 { + if err := SetTcpTTLSockopt(conn.(*net.TCPConn), ttl); err != nil { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.Config.NeighborAddress, + "State": fsm.state.String(), + }).Warnf("cannot set TTL(=%d) for peer: %s", ttl, err) + } + } + if ttlMin != 0 { + if err := SetTcpMinTTLSockopt(conn.(*net.TCPConn), ttlMin); err != nil { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": 
fsm.pConf.Config.NeighborAddress, + "State": fsm.state.String(), + }).Warnf("cannot set minimal TTL(=%d) for peer: %s", ttl, err) + } + } + // we don't implement delayed open timer so move to opensent right + // away. + return bgp.BGP_FSM_OPENSENT, NewFsmStateReason(FSM_NEW_CONNECTION, nil, nil) + case <-fsm.gracefulRestartTimer.C: + if fsm.pConf.GracefulRestart.State.PeerRestarting { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Warn("graceful restart timer expired") + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) + } + case err := <-h.stateReasonCh: + return bgp.BGP_FSM_IDLE, &err + case stateOp := <-fsm.adminStateCh: + err := h.changeAdminState(stateOp.State) + if err == nil { + switch stateOp.State { + case ADMIN_STATE_DOWN: + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_ADMIN_DOWN, nil, nil) + case ADMIN_STATE_UP: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "AdminState": stateOp.State.String(), + }).Panic("code logic bug") + } + } + } + } +} + +func capAddPathFromConfig(pConf *config.Neighbor) bgp.ParameterCapabilityInterface { + tuples := make([]*bgp.CapAddPathTuple, 0, len(pConf.AfiSafis)) + for _, af := range pConf.AfiSafis { + var mode bgp.BGPAddPathMode + if af.AddPaths.State.Receive { + mode |= bgp.BGP_ADD_PATH_RECEIVE + } + if af.AddPaths.State.SendMax > 0 { + mode |= bgp.BGP_ADD_PATH_SEND + } + if mode > 0 { + tuples = append(tuples, bgp.NewCapAddPathTuple(af.State.Family, mode)) + } + } + if len(tuples) == 0 { + return nil + } + return bgp.NewCapAddPath(tuples) +} + +func capabilitiesFromConfig(pConf *config.Neighbor) []bgp.ParameterCapabilityInterface { + caps := make([]bgp.ParameterCapabilityInterface, 0, 4) + caps = append(caps, bgp.NewCapRouteRefresh()) + for _, af := range pConf.AfiSafis { + caps = append(caps, bgp.NewCapMultiProtocol(af.State.Family)) + 
} + caps = append(caps, bgp.NewCapFourOctetASNumber(pConf.Config.LocalAs)) + + if c := pConf.GracefulRestart.Config; c.Enabled { + tuples := []*bgp.CapGracefulRestartTuple{} + ltuples := []*bgp.CapLongLivedGracefulRestartTuple{} + + // RFC 4724 4.1 + // To re-establish the session with its peer, the Restarting Speaker + // MUST set the "Restart State" bit in the Graceful Restart Capability + // of the OPEN message. + restarting := pConf.GracefulRestart.State.LocalRestarting + + if !c.HelperOnly { + for i, rf := range pConf.AfiSafis { + if m := rf.MpGracefulRestart.Config; m.Enabled { + // When restarting, always flag forwaring bit. + // This can be a lie, depending on how gobgpd is used. + // For a route-server use-case, since a route-server + // itself doesn't forward packets, and the dataplane + // is a l2 switch which continues to work with no + // relation to bgpd, this behavior is ok. + // TODO consideration of other use-cases + tuples = append(tuples, bgp.NewCapGracefulRestartTuple(rf.State.Family, restarting)) + pConf.AfiSafis[i].MpGracefulRestart.State.Advertised = true + } + if m := rf.LongLivedGracefulRestart.Config; m.Enabled { + ltuples = append(ltuples, bgp.NewCapLongLivedGracefulRestartTuple(rf.State.Family, restarting, m.RestartTime)) + } + } + } + restartTime := c.RestartTime + notification := c.NotificationEnabled + caps = append(caps, bgp.NewCapGracefulRestart(restarting, notification, restartTime, tuples)) + if c.LongLivedEnabled { + caps = append(caps, bgp.NewCapLongLivedGracefulRestart(ltuples)) + } + } + + // unnumbered BGP + if pConf.Config.NeighborInterface != "" { + tuples := []*bgp.CapExtendedNexthopTuple{} + families, _ := config.AfiSafis(pConf.AfiSafis).ToRfList() + for _, family := range families { + if family == bgp.RF_IPv6_UC { + continue + } + tuple := bgp.NewCapExtendedNexthopTuple(family, bgp.AFI_IP6) + tuples = append(tuples, tuple) + } + caps = append(caps, bgp.NewCapExtendedNexthop(tuples)) + } + + // ADD-PATH Capability + if c 
:= capAddPathFromConfig(pConf); c != nil { + caps = append(caps, capAddPathFromConfig(pConf)) + } + + return caps +} + +func buildopen(gConf *config.Global, pConf *config.Neighbor) *bgp.BGPMessage { + caps := capabilitiesFromConfig(pConf) + opt := bgp.NewOptionParameterCapability(caps) + holdTime := uint16(pConf.Timers.Config.HoldTime) + as := pConf.Config.LocalAs + if as > (1<<16)-1 { + as = bgp.AS_TRANS + } + return bgp.NewBGPOpenMessage(uint16(as), holdTime, gConf.Config.RouterId, + []bgp.OptionParameterInterface{opt}) +} + +func readAll(conn net.Conn, length int) ([]byte, error) { + buf := make([]byte, length) + _, err := io.ReadFull(conn, buf) + if err != nil { + return nil, err + } + return buf, nil +} + +func getPathAttrFromBGPUpdate(m *bgp.BGPUpdate, typ bgp.BGPAttrType) bgp.PathAttributeInterface { + for _, a := range m.PathAttributes { + if a.GetType() == typ { + return a + } + } + return nil +} + +func hasOwnASLoop(ownAS uint32, limit int, asPath *bgp.PathAttributeAsPath) bool { + cnt := 0 + for _, param := range asPath.Value { + for _, as := range param.GetAS() { + if as == ownAS { + cnt++ + if cnt > limit { + return true + } + } + } + } + return false +} + +func extractRouteFamily(p *bgp.PathAttributeInterface) *bgp.RouteFamily { + attr := *p + + var afi uint16 + var safi uint8 + + switch a := attr.(type) { + case *bgp.PathAttributeMpReachNLRI: + afi = a.AFI + safi = a.SAFI + case *bgp.PathAttributeMpUnreachNLRI: + afi = a.AFI + safi = a.SAFI + default: + return nil + } + + rf := bgp.AfiSafiToRouteFamily(afi, safi) + return &rf +} + +func (h *FSMHandler) afiSafiDisable(rf bgp.RouteFamily) string { + n := bgp.AddressFamilyNameMap[rf] + + for i, a := range h.fsm.pConf.AfiSafis { + if string(a.Config.AfiSafiName) == n { + h.fsm.pConf.AfiSafis[i].State.Enabled = false + break + } + } + newList := make([]bgp.ParameterCapabilityInterface, 0) + for _, c := range h.fsm.capMap[bgp.BGP_CAP_MULTIPROTOCOL] { + if c.(*bgp.CapMultiProtocol).CapValue == rf { + 
continue + } + newList = append(newList, c) + } + h.fsm.capMap[bgp.BGP_CAP_MULTIPROTOCOL] = newList + return n +} + +func (h *FSMHandler) handlingError(m *bgp.BGPMessage, e error, useRevisedError bool) bgp.ErrorHandling { + handling := bgp.ERROR_HANDLING_NONE + if m.Header.Type == bgp.BGP_MSG_UPDATE && useRevisedError { + factor := e.(*bgp.MessageError) + handling = factor.ErrorHandling + switch handling { + case bgp.ERROR_HANDLING_ATTRIBUTE_DISCARD: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": h.fsm.pConf.State.NeighborAddress, + "State": h.fsm.state.String(), + "error": e, + }).Warn("Some attributes were discarded") + case bgp.ERROR_HANDLING_TREAT_AS_WITHDRAW: + m.Body = bgp.TreatAsWithdraw(m.Body.(*bgp.BGPUpdate)) + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": h.fsm.pConf.State.NeighborAddress, + "State": h.fsm.state.String(), + "error": e, + }).Warn("the received Update message was treated as withdraw") + case bgp.ERROR_HANDLING_AFISAFI_DISABLE: + rf := extractRouteFamily(factor.ErrorAttribute) + if rf == nil { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": h.fsm.pConf.State.NeighborAddress, + "State": h.fsm.state.String(), + }).Warn("Error occurred during AFI/SAFI disabling") + } else { + n := h.afiSafiDisable(*rf) + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": h.fsm.pConf.State.NeighborAddress, + "State": h.fsm.state.String(), + "error": e, + }).Warnf("Capability %s was disabled", n) + } + } + } else { + handling = bgp.ERROR_HANDLING_SESSION_RESET + } + return handling +} + +func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { + sendToStateReasonCh := func(typ FsmStateReasonType, notif *bgp.BGPMessage) { + // probably doesn't happen but be cautious + select { + case h.stateReasonCh <- *NewFsmStateReason(typ, notif, nil): + default: + } + } + + headerBuf, err := readAll(h.conn, bgp.BGP_HEADER_LENGTH) + if err != nil { + sendToStateReasonCh(FSM_READ_FAILED, nil) + return nil, err + } + + hd := &bgp.BGPHeader{} 
+ err = hd.DecodeFromBytes(headerBuf) + if err != nil { + h.fsm.bgpMessageStateUpdate(0, true) + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": h.fsm.pConf.State.NeighborAddress, + "State": h.fsm.state.String(), + "error": err, + }).Warn("Session will be reset due to malformed BGP Header") + fmsg := &FsmMsg{ + MsgType: FSM_MSG_BGP_MESSAGE, + MsgSrc: h.fsm.pConf.State.NeighborAddress, + MsgData: err, + Version: h.fsm.version, + } + return fmsg, err + } + + bodyBuf, err := readAll(h.conn, int(hd.Len)-bgp.BGP_HEADER_LENGTH) + if err != nil { + sendToStateReasonCh(FSM_READ_FAILED, nil) + return nil, err + } + + now := time.Now() + useRevisedError := h.fsm.pConf.ErrorHandling.Config.TreatAsWithdraw + handling := bgp.ERROR_HANDLING_NONE + + m, err := bgp.ParseBGPBody(hd, bodyBuf, h.fsm.marshallingOptions) + if err != nil { + handling = h.handlingError(m, err, useRevisedError) + h.fsm.bgpMessageStateUpdate(0, true) + } else { + h.fsm.bgpMessageStateUpdate(m.Header.Type, true) + err = bgp.ValidateBGPMessage(m) + } + fmsg := &FsmMsg{ + MsgType: FSM_MSG_BGP_MESSAGE, + MsgSrc: h.fsm.pConf.State.NeighborAddress, + timestamp: now, + Version: h.fsm.version, + } + + switch handling { + case bgp.ERROR_HANDLING_AFISAFI_DISABLE: + fmsg.MsgData = m + return fmsg, nil + case bgp.ERROR_HANDLING_SESSION_RESET: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": h.fsm.pConf.State.NeighborAddress, + "State": h.fsm.state.String(), + "error": err, + }).Warn("Session will be reset due to malformed BGP message") + fmsg.MsgData = err + return fmsg, err + default: + fmsg.MsgData = m + if h.fsm.state == bgp.BGP_FSM_ESTABLISHED { + switch m.Header.Type { + case bgp.BGP_MSG_ROUTE_REFRESH: + fmsg.MsgType = FSM_MSG_ROUTE_REFRESH + case bgp.BGP_MSG_UPDATE: + body := m.Body.(*bgp.BGPUpdate) + isEBGP := h.fsm.pConf.IsEBGPPeer(h.fsm.gConf) + isConfed := h.fsm.pConf.IsConfederationMember(h.fsm.gConf) + + fmsg.payload = make([]byte, len(headerBuf)+len(bodyBuf)) + copy(fmsg.payload, headerBuf) 
+ copy(fmsg.payload[len(headerBuf):], bodyBuf) + + ok, err := bgp.ValidateUpdateMsg(body, h.fsm.rfMap, isEBGP, isConfed) + if !ok { + handling = h.handlingError(m, err, useRevisedError) + } + if handling == bgp.ERROR_HANDLING_SESSION_RESET { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": h.fsm.pConf.State.NeighborAddress, + "State": h.fsm.state.String(), + "error": err, + }).Warn("Session will be reset due to malformed BGP update message") + fmsg.MsgData = err + return fmsg, err + } + + if routes := len(body.WithdrawnRoutes); routes > 0 { + h.fsm.bmpStatsUpdate(bmp.BMP_STAT_TYPE_WITHDRAW_UPDATE, 1) + h.fsm.bmpStatsUpdate(bmp.BMP_STAT_TYPE_WITHDRAW_PREFIX, routes) + } else if attr := getPathAttrFromBGPUpdate(body, bgp.BGP_ATTR_TYPE_MP_UNREACH_NLRI); attr != nil { + mpUnreach := attr.(*bgp.PathAttributeMpUnreachNLRI) + if routes = len(mpUnreach.Value); routes > 0 { + h.fsm.bmpStatsUpdate(bmp.BMP_STAT_TYPE_WITHDRAW_UPDATE, 1) + h.fsm.bmpStatsUpdate(bmp.BMP_STAT_TYPE_WITHDRAW_PREFIX, routes) + } + } + + table.UpdatePathAttrs4ByteAs(body) + if err = table.UpdatePathAggregator4ByteAs(body); err != nil { + fmsg.MsgData = err + return fmsg, err + } + + fmsg.PathList = table.ProcessMessage(m, h.fsm.peerInfo, fmsg.timestamp) + fallthrough + case bgp.BGP_MSG_KEEPALIVE: + // if the length of h.holdTimerResetCh + // isn't zero, the timer will be reset + // soon anyway. 
+ select { + case h.holdTimerResetCh <- true: + default: + } + if m.Header.Type == bgp.BGP_MSG_KEEPALIVE { + return nil, nil + } + case bgp.BGP_MSG_NOTIFICATION: + body := m.Body.(*bgp.BGPNotification) + if body.ErrorCode == bgp.BGP_ERROR_CEASE && (body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN || body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) { + communication, rest := decodeAdministrativeCommunication(body.Data) + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": h.fsm.pConf.State.NeighborAddress, + "Code": body.ErrorCode, + "Subcode": body.ErrorSubcode, + "Communicated-Reason": communication, + "Data": rest, + }).Warn("received notification") + } else { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": h.fsm.pConf.State.NeighborAddress, + "Code": body.ErrorCode, + "Subcode": body.ErrorSubcode, + "Data": body.Data, + }).Warn("received notification") + } + + if s := h.fsm.pConf.GracefulRestart.State; s.Enabled && s.NotificationEnabled && body.ErrorCode == bgp.BGP_ERROR_CEASE && body.ErrorSubcode == bgp.BGP_ERROR_SUB_HARD_RESET { + sendToStateReasonCh(FSM_HARD_RESET, m) + } else { + sendToStateReasonCh(FSM_NOTIFICATION_RECV, m) + } + return nil, nil + } + } + } + return fmsg, nil +} + +func (h *FSMHandler) recvMessage() error { + defer h.msgCh.Close() + fmsg, _ := h.recvMessageWithError() + if fmsg != nil { + h.msgCh.In() <- fmsg + } + return nil +} + +func open2Cap(open *bgp.BGPOpen, n *config.Neighbor) (map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface, map[bgp.RouteFamily]bgp.BGPAddPathMode) { + capMap := make(map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface) + for _, p := range open.OptParams { + if paramCap, y := p.(*bgp.OptionParameterCapability); y { + for _, c := range paramCap.Capability { + m, ok := capMap[c.Code()] + if !ok { + m = make([]bgp.ParameterCapabilityInterface, 0, 1) + } + capMap[c.Code()] = append(m, c) + } + } + } + + // squash add path cap + if caps, y := 
capMap[bgp.BGP_CAP_ADD_PATH]; y { + items := make([]*bgp.CapAddPathTuple, 0, len(caps)) + for _, c := range caps { + items = append(items, c.(*bgp.CapAddPath).Tuples...) + } + capMap[bgp.BGP_CAP_ADD_PATH] = []bgp.ParameterCapabilityInterface{bgp.NewCapAddPath(items)} + } + + // remote open message may not include multi-protocol capability + if _, y := capMap[bgp.BGP_CAP_MULTIPROTOCOL]; !y { + capMap[bgp.BGP_CAP_MULTIPROTOCOL] = []bgp.ParameterCapabilityInterface{bgp.NewCapMultiProtocol(bgp.RF_IPv4_UC)} + } + + local := n.CreateRfMap() + remote := make(map[bgp.RouteFamily]bgp.BGPAddPathMode) + for _, c := range capMap[bgp.BGP_CAP_MULTIPROTOCOL] { + family := c.(*bgp.CapMultiProtocol).CapValue + remote[family] = bgp.BGP_ADD_PATH_NONE + for _, a := range capMap[bgp.BGP_CAP_ADD_PATH] { + for _, i := range a.(*bgp.CapAddPath).Tuples { + if i.RouteFamily == family { + remote[family] = i.Mode + } + } + } + } + negotiated := make(map[bgp.RouteFamily]bgp.BGPAddPathMode) + for family, mode := range local { + if m, y := remote[family]; y { + n := bgp.BGP_ADD_PATH_NONE + if mode&bgp.BGP_ADD_PATH_SEND > 0 && m&bgp.BGP_ADD_PATH_RECEIVE > 0 { + n |= bgp.BGP_ADD_PATH_SEND + } + if mode&bgp.BGP_ADD_PATH_RECEIVE > 0 && m&bgp.BGP_ADD_PATH_SEND > 0 { + n |= bgp.BGP_ADD_PATH_RECEIVE + } + negotiated[family] = n + } + } + return capMap, negotiated +} + +func (h *FSMHandler) opensent() (bgp.FSMState, *FsmStateReason) { + fsm := h.fsm + m := buildopen(fsm.gConf, fsm.pConf) + b, _ := m.Serialize() + fsm.conn.Write(b) + fsm.bgpMessageStateUpdate(m.Header.Type, false) + + h.msgCh = channels.NewInfiniteChannel() + h.conn = fsm.conn + + h.t.Go(h.recvMessage) + + // RFC 4271 P.60 + // sets its HoldTimer to a large value + // A HoldTimer value of 4 minutes is suggested as a "large value" + // for the HoldTimer + holdTimer := time.NewTimer(time.Second * time.Duration(fsm.opensentHoldTime)) + + for { + select { + case <-h.t.Dying(): + h.conn.Close() + return -1, NewFsmStateReason(FSM_DYING, nil, 
nil) + case conn, ok := <-fsm.connCh: + if !ok { + break + } + conn.Close() + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Warn("Closed an accepted connection") + case <-fsm.gracefulRestartTimer.C: + if fsm.pConf.GracefulRestart.State.PeerRestarting { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Warn("graceful restart timer expired") + h.conn.Close() + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) + } + case i, ok := <-h.msgCh.Out(): + if !ok { + continue + } + e := i.(*FsmMsg) + switch e.MsgData.(type) { + case *bgp.BGPMessage: + m := e.MsgData.(*bgp.BGPMessage) + if m.Header.Type == bgp.BGP_MSG_OPEN { + fsm.recvOpen = m + body := m.Body.(*bgp.BGPOpen) + peerAs, err := bgp.ValidateOpenMsg(body, fsm.pConf.Config.PeerAs) + if err != nil { + m, _ := fsm.sendNotificationFromErrorMsg(err.(*bgp.MessageError)) + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, m, nil) + } + + // ASN negotiation was skipped + if fsm.pConf.Config.PeerAs == 0 { + typ := config.PEER_TYPE_EXTERNAL + if fsm.peerInfo.LocalAS == peerAs { + typ = config.PEER_TYPE_INTERNAL + } + fsm.pConf.State.PeerType = typ + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Infof("skipped asn negotiation: peer-as: %d, peer-type: %s", peerAs, typ) + } else { + fsm.pConf.State.PeerType = fsm.pConf.Config.PeerType + } + fsm.pConf.State.PeerAs = peerAs + fsm.peerInfo.AS = peerAs + fsm.peerInfo.ID = body.ID + fsm.capMap, fsm.rfMap = open2Cap(body, fsm.pConf) + + if _, y := fsm.capMap[bgp.BGP_CAP_ADD_PATH]; y { + fsm.marshallingOptions = &bgp.MarshallingOption{ + AddPath: fsm.rfMap, + } + } else { + fsm.marshallingOptions = nil + } + + // calculate HoldTime + // RFC 4271 P.13 + // a BGP speaker MUST calculate the value of the Hold Timer + // 
by using the smaller of its configured Hold Time and the Hold Time + // received in the OPEN message. + holdTime := float64(body.HoldTime) + myHoldTime := fsm.pConf.Timers.Config.HoldTime + if holdTime > myHoldTime { + fsm.pConf.Timers.State.NegotiatedHoldTime = myHoldTime + } else { + fsm.pConf.Timers.State.NegotiatedHoldTime = holdTime + } + + keepalive := fsm.pConf.Timers.Config.KeepaliveInterval + if n := fsm.pConf.Timers.State.NegotiatedHoldTime; n < myHoldTime { + keepalive = n / 3 + } + fsm.pConf.Timers.State.KeepaliveInterval = keepalive + + gr, ok := fsm.capMap[bgp.BGP_CAP_GRACEFUL_RESTART] + if fsm.pConf.GracefulRestart.Config.Enabled && ok { + state := &fsm.pConf.GracefulRestart.State + state.Enabled = true + cap := gr[len(gr)-1].(*bgp.CapGracefulRestart) + state.PeerRestartTime = uint16(cap.Time) + + for _, t := range cap.Tuples { + n := bgp.AddressFamilyNameMap[bgp.AfiSafiToRouteFamily(t.AFI, t.SAFI)] + for i, a := range fsm.pConf.AfiSafis { + if string(a.Config.AfiSafiName) == n { + fsm.pConf.AfiSafis[i].MpGracefulRestart.State.Enabled = true + fsm.pConf.AfiSafis[i].MpGracefulRestart.State.Received = true + break + } + } + } + + // RFC 4724 4.1 + // To re-establish the session with its peer, the Restarting Speaker + // MUST set the "Restart State" bit in the Graceful Restart Capability + // of the OPEN message. + if fsm.pConf.GracefulRestart.State.PeerRestarting && cap.Flags&0x08 == 0 { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Warn("restart flag is not set") + // send notification? + h.conn.Close() + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, nil, nil) + } + + // RFC 4724 3 + // The most significant bit is defined as the Restart State (R) + // bit, ...(snip)... 
When set (value 1), this bit + // indicates that the BGP speaker has restarted, and its peer MUST + // NOT wait for the End-of-RIB marker from the speaker before + // advertising routing information to the speaker. + if fsm.pConf.GracefulRestart.State.LocalRestarting && cap.Flags&0x08 != 0 { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Debug("peer has restarted, skipping wait for EOR") + for i := range fsm.pConf.AfiSafis { + fsm.pConf.AfiSafis[i].MpGracefulRestart.State.EndOfRibReceived = true + } + } + if fsm.pConf.GracefulRestart.Config.NotificationEnabled && cap.Flags&0x04 > 0 { + fsm.pConf.GracefulRestart.State.NotificationEnabled = true + } + } + llgr, ok2 := fsm.capMap[bgp.BGP_CAP_LONG_LIVED_GRACEFUL_RESTART] + if fsm.pConf.GracefulRestart.Config.LongLivedEnabled && ok && ok2 { + fsm.pConf.GracefulRestart.State.LongLivedEnabled = true + cap := llgr[len(llgr)-1].(*bgp.CapLongLivedGracefulRestart) + for _, t := range cap.Tuples { + n := bgp.AddressFamilyNameMap[bgp.AfiSafiToRouteFamily(t.AFI, t.SAFI)] + for i, a := range fsm.pConf.AfiSafis { + if string(a.Config.AfiSafiName) == n { + fsm.pConf.AfiSafis[i].LongLivedGracefulRestart.State.Enabled = true + fsm.pConf.AfiSafis[i].LongLivedGracefulRestart.State.Received = true + fsm.pConf.AfiSafis[i].LongLivedGracefulRestart.State.PeerRestartTime = t.RestartTime + break + } + } + } + } + + msg := bgp.NewBGPKeepAliveMessage() + b, _ := msg.Serialize() + fsm.conn.Write(b) + fsm.bgpMessageStateUpdate(msg.Header.Type, false) + return bgp.BGP_FSM_OPENCONFIRM, NewFsmStateReason(FSM_OPEN_MSG_RECEIVED, nil, nil) + } else { + // send notification? 
+ h.conn.Close() + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, nil, nil) + } + case *bgp.MessageError: + m, _ := fsm.sendNotificationFromErrorMsg(e.MsgData.(*bgp.MessageError)) + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, m, nil) + default: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "Data": e.MsgData, + }).Panic("unknown msg type") + } + case err := <-h.stateReasonCh: + h.conn.Close() + return bgp.BGP_FSM_IDLE, &err + case <-holdTimer.C: + m, _ := fsm.sendNotification(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil, "hold timer expired") + h.t.Kill(nil) + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_HOLD_TIMER_EXPIRED, m, nil) + case stateOp := <-fsm.adminStateCh: + err := h.changeAdminState(stateOp.State) + if err == nil { + switch stateOp.State { + case ADMIN_STATE_DOWN: + h.conn.Close() + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_ADMIN_DOWN, m, nil) + case ADMIN_STATE_UP: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "AdminState": stateOp.State.String(), + }).Panic("code logic bug") + } + } + } + } +} + +func keepaliveTicker(fsm *FSM) *time.Ticker { + negotiatedTime := fsm.pConf.Timers.State.NegotiatedHoldTime + if negotiatedTime == 0 { + return &time.Ticker{} + } + sec := time.Second * time.Duration(fsm.pConf.Timers.State.KeepaliveInterval) + if sec == 0 { + sec = time.Second + } + return time.NewTicker(sec) +} + +func (h *FSMHandler) openconfirm() (bgp.FSMState, *FsmStateReason) { + fsm := h.fsm + ticker := keepaliveTicker(fsm) + h.msgCh = channels.NewInfiniteChannel() + h.conn = fsm.conn + + h.t.Go(h.recvMessage) + + var holdTimer *time.Timer + if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 { + holdTimer = &time.Timer{} + } else { + // RFC 4271 P.65 + // sets the HoldTimer according to the negotiated value + holdTimer = time.NewTimer(time.Second * 
time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)) + } + + for { + select { + case <-h.t.Dying(): + h.conn.Close() + return -1, NewFsmStateReason(FSM_DYING, nil, nil) + case conn, ok := <-fsm.connCh: + if !ok { + break + } + conn.Close() + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Warn("Closed an accepted connection") + case <-fsm.gracefulRestartTimer.C: + if fsm.pConf.GracefulRestart.State.PeerRestarting { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Warn("graceful restart timer expired") + h.conn.Close() + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_RESTART_TIMER_EXPIRED, nil, nil) + } + case <-ticker.C: + m := bgp.NewBGPKeepAliveMessage() + b, _ := m.Serialize() + // TODO: check error + fsm.conn.Write(b) + fsm.bgpMessageStateUpdate(m.Header.Type, false) + case i, ok := <-h.msgCh.Out(): + if !ok { + continue + } + e := i.(*FsmMsg) + switch e.MsgData.(type) { + case *bgp.BGPMessage: + m := e.MsgData.(*bgp.BGPMessage) + if m.Header.Type == bgp.BGP_MSG_KEEPALIVE { + return bgp.BGP_FSM_ESTABLISHED, NewFsmStateReason(FSM_OPEN_MSG_NEGOTIATED, nil, nil) + } + // send notification ? 
+ h.conn.Close() + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, nil, nil) + case *bgp.MessageError: + m, _ := fsm.sendNotificationFromErrorMsg(e.MsgData.(*bgp.MessageError)) + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_INVALID_MSG, m, nil) + default: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "Data": e.MsgData, + }).Panic("unknown msg type") + } + case err := <-h.stateReasonCh: + h.conn.Close() + return bgp.BGP_FSM_IDLE, &err + case <-holdTimer.C: + m, _ := fsm.sendNotification(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil, "hold timer expired") + h.t.Kill(nil) + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_HOLD_TIMER_EXPIRED, m, nil) + case stateOp := <-fsm.adminStateCh: + err := h.changeAdminState(stateOp.State) + if err == nil { + switch stateOp.State { + case ADMIN_STATE_DOWN: + h.conn.Close() + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_ADMIN_DOWN, nil, nil) + case ADMIN_STATE_UP: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "AdminState": stateOp.State.String(), + }).Panic("code logic bug") + } + } + } + } +} + +func (h *FSMHandler) sendMessageloop() error { + conn := h.conn + fsm := h.fsm + ticker := keepaliveTicker(fsm) + send := func(m *bgp.BGPMessage) error { + if fsm.twoByteAsTrans && m.Header.Type == bgp.BGP_MSG_UPDATE { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "Data": m, + }).Debug("update for 2byte AS peer") + table.UpdatePathAttrs2ByteAs(m.Body.(*bgp.BGPUpdate)) + table.UpdatePathAggregator2ByteAs(m.Body.(*bgp.BGPUpdate)) + } + b, err := m.Serialize(h.fsm.marshallingOptions) + if err != nil { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "Data": err, + }).Warn("failed to serialize") + fsm.bgpMessageStateUpdate(0, false) 
+ return nil + } + if err := conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))); err != nil { + h.stateReasonCh <- *NewFsmStateReason(FSM_WRITE_FAILED, nil, nil) + conn.Close() + return fmt.Errorf("failed to set write deadline") + } + _, err = conn.Write(b) + if err != nil { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "Data": err, + }).Warn("failed to send") + h.stateReasonCh <- *NewFsmStateReason(FSM_WRITE_FAILED, nil, nil) + conn.Close() + return fmt.Errorf("closed") + } + fsm.bgpMessageStateUpdate(m.Header.Type, false) + + switch m.Header.Type { + case bgp.BGP_MSG_NOTIFICATION: + body := m.Body.(*bgp.BGPNotification) + if body.ErrorCode == bgp.BGP_ERROR_CEASE && (body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN || body.ErrorSubcode == bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) { + communication, rest := decodeAdministrativeCommunication(body.Data) + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "Code": body.ErrorCode, + "Subcode": body.ErrorSubcode, + "Communicated-Reason": communication, + "Data": rest, + }).Warn("sent notification") + } else { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "Code": body.ErrorCode, + "Subcode": body.ErrorSubcode, + "Data": body.Data, + }).Warn("sent notification") + } + h.stateReasonCh <- *NewFsmStateReason(FSM_NOTIFICATION_SENT, m, nil) + conn.Close() + return fmt.Errorf("closed") + case bgp.BGP_MSG_UPDATE: + update := m.Body.(*bgp.BGPUpdate) + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "nlri": update.NLRI, + "withdrawals": update.WithdrawnRoutes, + "attributes": update.PathAttributes, + }).Debug("sent update") + default: + log.WithFields(log.Fields{ + 
"Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "data": m, + }).Debug("sent") + } + return nil + } + + for { + select { + case <-h.t.Dying(): + return nil + case o := <-h.outgoing.Out(): + m := o.(*FsmOutgoingMsg) + for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, h.fsm.marshallingOptions) { + if err := send(msg); err != nil { + return nil + } + } + if m.Notification != nil { + if m.StayIdle { + // current user is only prefix-limit + // fix me if this is not the case + h.changeAdminState(ADMIN_STATE_PFX_CT) + } + if err := send(m.Notification); err != nil { + return nil + } + } + case <-ticker.C: + if err := send(bgp.NewBGPKeepAliveMessage()); err != nil { + return nil + } + } + } +} + +func (h *FSMHandler) recvMessageloop() error { + for { + fmsg, err := h.recvMessageWithError() + if fmsg != nil { + h.msgCh.In() <- fmsg + } + if err != nil { + return nil + } + } +} + +func (h *FSMHandler) established() (bgp.FSMState, *FsmStateReason) { + fsm := h.fsm + h.conn = fsm.conn + h.t.Go(h.sendMessageloop) + h.msgCh = h.incoming + h.t.Go(h.recvMessageloop) + + var holdTimer *time.Timer + if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 { + holdTimer = &time.Timer{} + } else { + holdTimer = time.NewTimer(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)) + } + + fsm.gracefulRestartTimer.Stop() + + for { + select { + case <-h.t.Dying(): + return -1, NewFsmStateReason(FSM_DYING, nil, nil) + case conn, ok := <-fsm.connCh: + if !ok { + break + } + conn.Close() + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Warn("Closed an accepted connection") + case err := <-h.stateReasonCh: + h.conn.Close() + h.t.Kill(nil) + if s := fsm.pConf.GracefulRestart.State; s.Enabled && + (s.NotificationEnabled && err.Type == FSM_NOTIFICATION_RECV || + err.Type == FSM_READ_FAILED || + err.Type == FSM_WRITE_FAILED) { + err = 
*NewFsmStateReason(FSM_GRACEFUL_RESTART, nil, nil) + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Info("peer graceful restart") + fsm.gracefulRestartTimer.Reset(time.Duration(fsm.pConf.GracefulRestart.State.PeerRestartTime) * time.Second) + } + return bgp.BGP_FSM_IDLE, &err + case <-holdTimer.C: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Warn("hold timer expired") + m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil) + h.outgoing.In() <- &FsmOutgoingMsg{Notification: m} + return bgp.BGP_FSM_IDLE, NewFsmStateReason(FSM_HOLD_TIMER_EXPIRED, m, nil) + case <-h.holdTimerResetCh: + if fsm.pConf.Timers.State.NegotiatedHoldTime != 0 { + holdTimer.Reset(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)) + } + case stateOp := <-fsm.adminStateCh: + err := h.changeAdminState(stateOp.State) + if err == nil { + switch stateOp.State { + case ADMIN_STATE_DOWN: + m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, stateOp.Communication) + h.outgoing.In() <- &FsmOutgoingMsg{Notification: m} + } + } + } + } +} + +func (h *FSMHandler) loop() error { + fsm := h.fsm + ch := make(chan bgp.FSMState) + oldState := fsm.state + + var reason *FsmStateReason + f := func() error { + nextState := bgp.FSMState(-1) + switch fsm.state { + case bgp.BGP_FSM_IDLE: + nextState, reason = h.idle() + // case bgp.BGP_FSM_CONNECT: + // nextState = h.connect() + case bgp.BGP_FSM_ACTIVE: + nextState, reason = h.active() + case bgp.BGP_FSM_OPENSENT: + nextState, reason = h.opensent() + case bgp.BGP_FSM_OPENCONFIRM: + nextState, reason = h.openconfirm() + case bgp.BGP_FSM_ESTABLISHED: + nextState, reason = h.established() + } + fsm.reason = reason + ch <- nextState + return nil + } + + h.t.Go(f) + + nextState := <-ch + + if nextState == bgp.BGP_FSM_ESTABLISHED 
&& oldState == bgp.BGP_FSM_OPENCONFIRM { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Info("Peer Up") + } + + if oldState == bgp.BGP_FSM_ESTABLISHED { + // The main goroutine sent the notificaiton due to + // deconfiguration or something. + reason := fsm.reason + if fsm.h.sentNotification != nil { + reason.Type = FSM_NOTIFICATION_SENT + reason.PeerDownReason = PEER_DOWN_BY_LOCAL + reason.BGPNotification = fsm.h.sentNotification + } + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "Reason": reason.String(), + }).Info("Peer Down") + } + + e := time.AfterFunc(time.Second*120, func() { + log.WithFields(log.Fields{"Topic": "Peer"}).Fatalf("failed to free the fsm.h.t for %s %s %s", fsm.pConf.State.NeighborAddress, oldState, nextState) + }) + h.t.Wait() + e.Stop() + + // under zero means that tomb.Dying() + if nextState >= bgp.BGP_FSM_IDLE { + h.stateCh <- &FsmMsg{ + MsgType: FSM_MSG_STATE_CHANGE, + MsgSrc: fsm.pConf.State.NeighborAddress, + MsgData: nextState, + StateReason: reason, + Version: h.fsm.version, + } + } + return nil +} + +func (h *FSMHandler) changeAdminState(s AdminState) error { + fsm := h.fsm + if fsm.adminState != s { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + "AdminState": s.String(), + }).Debug("admin state changed") + + fsm.adminState = s + fsm.pConf.State.AdminDown = !fsm.pConf.State.AdminDown + + switch s { + case ADMIN_STATE_UP: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Info("Administrative start") + case ADMIN_STATE_DOWN: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Info("Administrative shutdown") + case ADMIN_STATE_PFX_CT: + log.WithFields(log.Fields{ + 
"Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Info("Administrative shutdown(Prefix limit reached)") + } + + } else { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.pConf.State.NeighborAddress, + "State": fsm.state.String(), + }).Warn("cannot change to the same state") + + return fmt.Errorf("cannot change to the same state.") + } + return nil +} |