// Copyright (C) 2014 Nippon Telegraph and Telephone Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or // implied. // See the License for the specific language governing permissions and // limitations under the License. package server import ( "fmt" log "github.com/Sirupsen/logrus" "github.com/eapache/channels" "github.com/osrg/gobgp/config" "github.com/osrg/gobgp/packet/bgp" "github.com/osrg/gobgp/table" "gopkg.in/tomb.v2" "io" "math/rand" "net" "strconv" "strings" "time" ) type FsmStateReason string const ( FSM_DYING = "dying" FSM_ADMIN_DOWN = "admin-down" FSM_READ_FAILED = "read-failed" FSM_WRITE_FAILED = "write-failed" FSM_NOTIFICATION_SENT = "notification-sent" FSM_NOTIFICATION_RECV = "notification-received" FSM_HOLD_TIMER_EXPIRED = "hold-timer-expired" FSM_IDLE_HOLD_TIMER_EXPIRED = "idle-hold-timer-expired" FSM_RESTART_TIMER_EXPIRED = "restart-timer-expired" FSM_GRACEFUL_RESTART = "graceful-restart" FSM_INVALID_MSG = "invalid-msg" FSM_NEW_CONNECTION = "new-connection" FSM_OPEN_MSG_RECEIVED = "open-msg-received" FSM_OPEN_MSG_NEGOTIATED = "open-msg-negotiated" FSM_HARD_RESET = "hard-reset" ) type FsmMsgType int const ( _ FsmMsgType = iota FSM_MSG_STATE_CHANGE FSM_MSG_BGP_MESSAGE FSM_MSG_ROUTE_REFRESH ) type FsmMsg struct { MsgType FsmMsgType MsgSrc string MsgData interface{} PathList []*table.Path timestamp time.Time payload []byte Version uint } type FsmOutgoingMsg struct { Paths []*table.Path Notification *bgp.BGPMessage StayIdle bool } const ( HOLDTIME_OPENSENT = 240 HOLDTIME_IDLE = 5 ) type AdminState int const ( ADMIN_STATE_UP AdminState = iota ADMIN_STATE_DOWN ADMIN_STATE_PFX_CT ) func (s AdminState) String() string { switch s { case ADMIN_STATE_UP: return "ADMIN_STATE_UP" case ADMIN_STATE_DOWN: return "ADMIN_STATE_DOWN" case ADMIN_STATE_PFX_CT: return "ADMIN_STATE_PFX_CT" default: return "Unknown" } } var fsmVersion uint type FSM struct { t tomb.Tomb gConf *config.Global pConf *config.Neighbor state bgp.FSMState reason FsmStateReason conn net.Conn connCh chan net.Conn idleHoldTime float64 opensentHoldTime float64 adminState AdminState adminStateCh chan AdminState getActiveCh chan struct{} h *FSMHandler rfMap map[bgp.RouteFamily]bool capMap map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface recvOpen *bgp.BGPMessage peerInfo *table.PeerInfo policy *table.RoutingPolicy gracefulRestartTimer *time.Timer twoByteAsTrans bool version uint } func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { state := &fsm.pConf.State.Messages timer := &fsm.pConf.Timers if isIn { state.Received.Total++ } else { state.Sent.Total++ } switch MessageType { case bgp.BGP_MSG_OPEN: if isIn { state.Received.Open++ } else { state.Sent.Open++ } case bgp.BGP_MSG_UPDATE: if isIn { state.Received.Update++ timer.State.UpdateRecvTime = time.Now().Unix() } else { state.Sent.Update++ } case bgp.BGP_MSG_NOTIFICATION: if isIn { state.Received.Notification++ } else { state.Sent.Notification++ } case bgp.BGP_MSG_KEEPALIVE: if isIn { state.Received.Keepalive++ } else { state.Sent.Keepalive++ } case bgp.BGP_MSG_ROUTE_REFRESH: if isIn { state.Received.Refresh++ } else { state.Sent.Refresh++ } default: if isIn { state.Received.Discarded++ } else { state.Sent.Discarded++ } } } func NewFSM(gConf *config.Global, pConf *config.Neighbor, policy *table.RoutingPolicy) *FSM { adminState := ADMIN_STATE_UP if pConf.Config.AdminDown { adminState = ADMIN_STATE_DOWN } pConf.State.SessionState = config.IntToSessionStateMap[int(bgp.BGP_FSM_IDLE)] pConf.Timers.State.Downtime = time.Now().Unix() fsmVersion++ fsm := &FSM{ gConf: gConf, pConf: pConf, state: bgp.BGP_FSM_IDLE, connCh: make(chan net.Conn, 1), opensentHoldTime: float64(HOLDTIME_OPENSENT), adminState: adminState, adminStateCh: make(chan AdminState, 1), getActiveCh: make(chan struct{}), rfMap: make(map[bgp.RouteFamily]bool), capMap: make(map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface), peerInfo: table.NewPeerInfo(gConf, pConf), policy: policy, gracefulRestartTimer: time.NewTimer(time.Hour), version: fsmVersion, } fsm.gracefulRestartTimer.Stop() fsm.t.Go(fsm.connectLoop) return fsm } func (fsm *FSM) StateChange(nextState bgp.FSMState) { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "old": fsm.state.String(), "new": nextState.String(), "reason": fsm.reason, }).Debug("state changed") fsm.state = nextState switch nextState { case bgp.BGP_FSM_ESTABLISHED: fsm.pConf.Timers.State.Uptime = time.Now().Unix() fsm.pConf.State.EstablishedCount++ // reset the state set by the previous session fsm.twoByteAsTrans = false if _, y := fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]; !y { fsm.twoByteAsTrans = true break } y := func() bool { for _, c := range capabilitiesFromConfig(fsm.pConf) { switch c.(type) { case *bgp.CapFourOctetASNumber: return true } } return false }() if !y { fsm.twoByteAsTrans = true } case bgp.BGP_FSM_ACTIVE: if !fsm.pConf.Transport.Config.PassiveMode { fsm.getActiveCh <- struct{}{} } fallthrough default: fsm.pConf.Timers.State.Downtime = time.Now().Unix() } } func hostport(addr net.Addr) (string, uint16) { if addr != nil { host, port, err := net.SplitHostPort(addr.String()) if err != nil { return "", 0 } p, _ := strconv.Atoi(port) return host, uint16(p) } return "", 0 } func (fsm *FSM) RemoteHostPort() (string, uint16) { return hostport(fsm.conn.RemoteAddr()) } func (fsm *FSM) LocalHostPort() (string, uint16) { return hostport(fsm.conn.LocalAddr()) } func (fsm *FSM) sendNotificationFromErrorMsg(e *bgp.MessageError) error { if fsm.h != nil && fsm.h.conn != nil { m := bgp.NewBGPNotificationMessage(e.TypeCode, e.SubTypeCode, e.Data) b, _ := m.Serialize() _, err := fsm.h.conn.Write(b) if err == nil { fsm.bgpMessageStateUpdate(m.Header.Type, false) fsm.h.sentNotification = bgp.NewNotificationErrorCode(e.TypeCode, e.SubTypeCode).String() } fsm.h.conn.Close() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "Data": e, }).Warn("sent notification") return nil } return fmt.Errorf("can't send notification to %s since TCP connection is not established", fsm.pConf.Config.NeighborAddress) } func (fsm *FSM) sendNotification(code, subType uint8, data []byte, msg string) error { e := bgp.NewMessageError(code, subType, data, msg) return fsm.sendNotificationFromErrorMsg(e.(*bgp.MessageError)) } func (fsm *FSM) connectLoop() error { tick := int(fsm.pConf.Timers.Config.ConnectRetry) if tick < MIN_CONNECT_RETRY { tick = MIN_CONNECT_RETRY } r := rand.New(rand.NewSource(time.Now().UnixNano())) timer := time.NewTimer(time.Duration(tick) * time.Second) timer.Stop() connect := func() { addr := fsm.pConf.Config.NeighborAddress port := int(bgp.BGP_PORT) if fsm.pConf.Transport.Config.RemotePort != 0 { port = int(fsm.pConf.Transport.Config.RemotePort) } host := net.JoinHostPort(addr, strconv.Itoa(port)) // check if LocalAddress has been configured laddr := fsm.pConf.Transport.Config.LocalAddress var conn net.Conn var err error if fsm.pConf.Config.AuthPassword != "" { deadline := (MIN_CONNECT_RETRY - 1) * 1000 // msec conn, err = DialTCPTimeoutWithMD5Sig(addr, port, laddr, fsm.pConf.Config.AuthPassword, deadline) } else { lhost := net.JoinHostPort(laddr, "0") ltcpaddr, e := net.ResolveTCPAddr("tcp", lhost) if e != nil { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, }).Warnf("failed to resolve ltcpaddr: %s", e) return } d := net.Dialer{LocalAddr: ltcpaddr, Timeout: time.Duration(MIN_CONNECT_RETRY-1) * time.Second} conn, err = d.Dial("tcp", host) } if err == nil { select { case fsm.connCh <- conn: return default: conn.Close() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, }).Warn("active conn is closed to avoid being blocked") } } else { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, }).Debugf("failed to connect: %s", err) } if fsm.state == bgp.BGP_FSM_ACTIVE && !fsm.pConf.GracefulRestart.State.PeerRestarting { timer.Reset(time.Duration(tick) * time.Second) } } for { select { case <-fsm.t.Dying(): log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, }).Debug("stop connect loop") return nil case <-timer.C: if fsm.state == bgp.BGP_FSM_ACTIVE && !fsm.pConf.GracefulRestart.State.PeerRestarting { go connect() } case <-fsm.getActiveCh: timer.Reset(time.Duration(r.Intn(MIN_CONNECT_RETRY)+MIN_CONNECT_RETRY) * time.Second) } } } type FSMHandler struct { t tomb.Tomb fsm *FSM conn net.Conn msgCh *channels.InfiniteChannel errorCh chan FsmStateReason incoming *channels.InfiniteChannel stateCh chan *FsmMsg outgoing *channels.InfiniteChannel holdTimerResetCh chan bool sentNotification string } func NewFSMHandler(fsm *FSM, incoming *channels.InfiniteChannel, stateCh chan *FsmMsg, outgoing *channels.InfiniteChannel) *FSMHandler { h := &FSMHandler{ fsm: fsm, errorCh: make(chan FsmStateReason, 2), incoming: incoming, stateCh: stateCh, outgoing: outgoing, holdTimerResetCh: make(chan bool, 2), } fsm.t.Go(h.loop) return h } func (h *FSMHandler) idle() (bgp.FSMState, FsmStateReason) { fsm := h.fsm idleHoldTimer := time.NewTimer(time.Second * time.Duration(fsm.idleHoldTime)) for { select { case <-h.t.Dying(): return -1, FSM_DYING case <-fsm.gracefulRestartTimer.C: if fsm.pConf.GracefulRestart.State.PeerRestarting { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") return bgp.BGP_FSM_IDLE, FSM_RESTART_TIMER_EXPIRED } case conn, ok := <-fsm.connCh: if !ok { break } conn.Close() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Warn("Closed an accepted connection") case <-idleHoldTimer.C: if fsm.adminState == ADMIN_STATE_UP { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "Duration": fsm.idleHoldTime, }).Debug("IdleHoldTimer expired") fsm.idleHoldTime = HOLDTIME_IDLE return bgp.BGP_FSM_ACTIVE, FSM_IDLE_HOLD_TIMER_EXPIRED } else { log.WithFields(log.Fields{"Topic": "Peer"}).Debug("IdleHoldTimer expired, but stay at idle because the admin state is DOWN") } case s := <-fsm.adminStateCh: err := h.changeAdminState(s) if err == nil { switch s { case ADMIN_STATE_DOWN: // stop idle hold timer idleHoldTimer.Stop() case ADMIN_STATE_UP: // restart idle hold timer idleHoldTimer.Reset(time.Second * time.Duration(fsm.idleHoldTime)) } } } } } func (h *FSMHandler) active() (bgp.FSMState, FsmStateReason) { fsm := h.fsm for { select { case <-h.t.Dying(): return -1, FSM_DYING case conn, ok := <-fsm.connCh: if !ok { break } fsm.conn = conn if fsm.pConf.Config.PeerType == config.PEER_TYPE_EXTERNAL { ttl := 1 if fsm.pConf.EbgpMultihop.Config.Enabled == true { ttl = int(fsm.pConf.EbgpMultihop.Config.MultihopTtl) } if ttl != 0 { SetTcpTTLSockopts(conn.(*net.TCPConn), ttl) } } // we don't implement delayed open timer so move to opensent right // away. return bgp.BGP_FSM_OPENSENT, FSM_NEW_CONNECTION case <-fsm.gracefulRestartTimer.C: if fsm.pConf.GracefulRestart.State.PeerRestarting { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") return bgp.BGP_FSM_IDLE, FSM_RESTART_TIMER_EXPIRED } case err := <-h.errorCh: return bgp.BGP_FSM_IDLE, err case s := <-fsm.adminStateCh: err := h.changeAdminState(s) if err == nil { switch s { case ADMIN_STATE_DOWN: return bgp.BGP_FSM_IDLE, FSM_ADMIN_DOWN case ADMIN_STATE_UP: log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), "AdminState": s.String(), }).Panic("code logic bug") } } } } } func capabilitiesFromConfig(pConf *config.Neighbor) []bgp.ParameterCapabilityInterface { caps := make([]bgp.ParameterCapabilityInterface, 0, 4) caps = append(caps, bgp.NewCapRouteRefresh()) for _, rf := range pConf.AfiSafis { family, _ := bgp.GetRouteFamily(string(rf.Config.AfiSafiName)) caps = append(caps, bgp.NewCapMultiProtocol(family)) } caps = append(caps, bgp.NewCapFourOctetASNumber(pConf.Config.LocalAs)) if c := pConf.GracefulRestart.Config; c.Enabled { tuples := []*bgp.CapGracefulRestartTuple{} ltuples := []*bgp.CapLongLivedGracefulRestartTuple{} // RFC 4724 4.1 // To re-establish the session with its peer, the Restarting Speaker // MUST set the "Restart State" bit in the Graceful Restart Capability // of the OPEN message. restarting := pConf.GracefulRestart.State.LocalRestarting if !c.HelperOnly { for i, rf := range pConf.AfiSafis { k, _ := bgp.GetRouteFamily(string(rf.Config.AfiSafiName)) if m := rf.MpGracefulRestart.Config; m.Enabled { // When restarting, always flag forwaring bit. // This can be a lie, depending on how gobgpd is used. // For a route-server use-case, since a route-server // itself doesn't forward packets, and the dataplane // is a l2 switch which continues to work with no // relation to bgpd, this behavior is ok. // TODO consideration of other use-cases tuples = append(tuples, bgp.NewCapGracefulRestartTuple(k, restarting)) pConf.AfiSafis[i].MpGracefulRestart.State.Advertised = true } if m := rf.LongLivedGracefulRestart.Config; m.Enabled { ltuples = append(ltuples, bgp.NewCapLongLivedGracefulRestartTuple(k, restarting, m.RestartTime)) } } } time := c.RestartTime notification := c.NotificationEnabled caps = append(caps, bgp.NewCapGracefulRestart(restarting, notification, time, tuples)) if c.LongLivedEnabled { caps = append(caps, bgp.NewCapLongLivedGracefulRestart(ltuples)) } } return caps } func buildopen(gConf *config.Global, pConf *config.Neighbor) *bgp.BGPMessage { caps := capabilitiesFromConfig(pConf) opt := bgp.NewOptionParameterCapability(caps) holdTime := uint16(pConf.Timers.Config.HoldTime) as := pConf.Config.LocalAs if as > (1<<16)-1 { as = bgp.AS_TRANS } return bgp.NewBGPOpenMessage(uint16(as), holdTime, gConf.Config.RouterId, []bgp.OptionParameterInterface{opt}) } func readAll(conn net.Conn, length int) ([]byte, error) { buf := make([]byte, length) _, err := io.ReadFull(conn, buf) if err != nil { return nil, err } return buf, nil } func (h *FSMHandler) recvMessageWithError() (*FsmMsg, error) { sendToErrorCh := func(reason FsmStateReason) { // probably doesn't happen but be cautious select { case h.errorCh <- reason: default: } } headerBuf, err := readAll(h.conn, bgp.BGP_HEADER_LENGTH) if err != nil { sendToErrorCh(FSM_READ_FAILED) return nil, err } hd := &bgp.BGPHeader{} err = hd.DecodeFromBytes(headerBuf) if err != nil { h.fsm.bgpMessageStateUpdate(0, true) log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.Config.NeighborAddress, "State": h.fsm.state.String(), "error": err, }).Warn("malformed BGP Header") fmsg := &FsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, MsgSrc: h.fsm.pConf.Config.NeighborAddress, MsgData: err, Version: h.fsm.version, } return fmsg, err } bodyBuf, err := readAll(h.conn, int(hd.Len)-bgp.BGP_HEADER_LENGTH) if err != nil { sendToErrorCh(FSM_READ_FAILED) return nil, err } now := time.Now() m, err := bgp.ParseBGPBody(hd, bodyBuf) if err == nil { h.fsm.bgpMessageStateUpdate(m.Header.Type, true) err = bgp.ValidateBGPMessage(m) } else { h.fsm.bgpMessageStateUpdate(0, true) } fmsg := &FsmMsg{ MsgType: FSM_MSG_BGP_MESSAGE, MsgSrc: h.fsm.pConf.Config.NeighborAddress, timestamp: now, Version: h.fsm.version, } if err != nil { log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.Config.NeighborAddress, "State": h.fsm.state.String(), "error": err, }).Warn("malformed BGP message") fmsg.MsgData = err } else { fmsg.MsgData = m if h.fsm.state == bgp.BGP_FSM_ESTABLISHED { switch m.Header.Type { case bgp.BGP_MSG_ROUTE_REFRESH: fmsg.MsgType = FSM_MSG_ROUTE_REFRESH case bgp.BGP_MSG_UPDATE: body := m.Body.(*bgp.BGPUpdate) confedCheck := !config.IsConfederationMember(h.fsm.gConf, h.fsm.pConf) && config.IsEBGPPeer(h.fsm.gConf, h.fsm.pConf) _, err := bgp.ValidateUpdateMsg(body, h.fsm.rfMap, confedCheck) if err != nil { log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.Config.NeighborAddress, "State": h.fsm.state.String(), "error": err, }).Warn("malformed BGP update message") fmsg.MsgData = err } else { // FIXME: we should use the original message for bmp/mrt table.UpdatePathAttrs4ByteAs(body) err := table.UpdatePathAggregator4ByteAs(body) if err == nil { fmsg.PathList = table.ProcessMessage(m, h.fsm.peerInfo, fmsg.timestamp) id := h.fsm.pConf.Config.NeighborAddress for _, path := range fmsg.PathList { if path.IsEOR() { continue } if h.fsm.policy.ApplyPolicy(id, table.POLICY_DIRECTION_IN, path, nil) == nil { path.Filter(id, table.POLICY_DIRECTION_IN) } } } else { fmsg.MsgData = err } } fmsg.payload = make([]byte, len(headerBuf)+len(bodyBuf)) copy(fmsg.payload, headerBuf) copy(fmsg.payload[len(headerBuf):], bodyBuf) fallthrough case bgp.BGP_MSG_KEEPALIVE: // if the length of h.holdTimerResetCh // isn't zero, the timer will be reset // soon anyway. select { case h.holdTimerResetCh <- true: default: } if m.Header.Type == bgp.BGP_MSG_KEEPALIVE { return nil, nil } case bgp.BGP_MSG_NOTIFICATION: body := m.Body.(*bgp.BGPNotification) log.WithFields(log.Fields{ "Topic": "Peer", "Key": h.fsm.pConf.Config.NeighborAddress, "Code": body.ErrorCode, "Subcode": body.ErrorSubcode, "Data": body.Data, }).Warn("received notification") if s := h.fsm.pConf.GracefulRestart.State; s.Enabled && s.NotificationEnabled && body.ErrorCode == bgp.BGP_ERROR_CEASE && body.ErrorSubcode == bgp.BGP_ERROR_SUB_HARD_RESET { sendToErrorCh(FSM_HARD_RESET) } else { sendToErrorCh(FsmStateReason(fmt.Sprintf("%s %s", FSM_NOTIFICATION_RECV, bgp.NewNotificationErrorCode(body.ErrorCode, body.ErrorSubcode).String()))) } return nil, nil } } } return fmsg, err } func (h *FSMHandler) recvMessage() error { defer h.msgCh.Close() fmsg, _ := h.recvMessageWithError() if fmsg != nil { h.msgCh.In() <- fmsg } return nil } func open2Cap(open *bgp.BGPOpen, n *config.Neighbor) (map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface, map[bgp.RouteFamily]bool) { capMap := make(map[bgp.BGPCapabilityCode][]bgp.ParameterCapabilityInterface) rfMap := config.CreateRfMap(n) r := make(map[bgp.RouteFamily]bool) for _, p := range open.OptParams { if paramCap, y := p.(*bgp.OptionParameterCapability); y { for _, c := range paramCap.Capability { m, ok := capMap[c.Code()] if !ok { m = make([]bgp.ParameterCapabilityInterface, 0, 1) } capMap[c.Code()] = append(m, c) if c.Code() == bgp.BGP_CAP_MULTIPROTOCOL { m := c.(*bgp.CapMultiProtocol) r[m.CapValue] = true } } } } if len(r) > 0 { for rf, _ := range rfMap { if _, y := r[rf]; !y { delete(rfMap, rf) } } } else { rfMap = make(map[bgp.RouteFamily]bool) rfMap[bgp.RF_IPv4_UC] = true } return capMap, rfMap } func (h *FSMHandler) opensent() (bgp.FSMState, FsmStateReason) { fsm := h.fsm m := buildopen(fsm.gConf, fsm.pConf) b, _ := m.Serialize() fsm.conn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) h.msgCh = channels.NewInfiniteChannel() h.conn = fsm.conn h.t.Go(h.recvMessage) // RFC 4271 P.60 // sets its HoldTimer to a large value // A HoldTimer value of 4 minutes is suggested as a "large value" // for the HoldTimer holdTimer := time.NewTimer(time.Second * time.Duration(fsm.opensentHoldTime)) for { select { case <-h.t.Dying(): h.conn.Close() return -1, FSM_DYING case conn, ok := <-fsm.connCh: if !ok { break } conn.Close() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Warn("Closed an accepted connection") case <-fsm.gracefulRestartTimer.C: if fsm.pConf.GracefulRestart.State.PeerRestarting { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") return bgp.BGP_FSM_IDLE, FSM_RESTART_TIMER_EXPIRED } case i, ok := <-h.msgCh.Out(): if !ok { continue } e := i.(*FsmMsg) switch e.MsgData.(type) { case *bgp.BGPMessage: m := e.MsgData.(*bgp.BGPMessage) if m.Header.Type == bgp.BGP_MSG_OPEN { fsm.recvOpen = m body := m.Body.(*bgp.BGPOpen) err := bgp.ValidateOpenMsg(body, fsm.pConf.Config.PeerAs) if err != nil { fsm.sendNotificationFromErrorMsg(err.(*bgp.MessageError)) return bgp.BGP_FSM_IDLE, FSM_INVALID_MSG } fsm.peerInfo.ID = body.ID fsm.capMap, fsm.rfMap = open2Cap(body, fsm.pConf) // calculate HoldTime // RFC 4271 P.13 // a BGP speaker MUST calculate the value of the Hold Timer // by using the smaller of its configured Hold Time and the Hold Time // received in the OPEN message. holdTime := float64(body.HoldTime) myHoldTime := fsm.pConf.Timers.Config.HoldTime if holdTime > myHoldTime { fsm.pConf.Timers.State.NegotiatedHoldTime = myHoldTime } else { fsm.pConf.Timers.State.NegotiatedHoldTime = holdTime } keepalive := fsm.pConf.Timers.Config.KeepaliveInterval if n := fsm.pConf.Timers.State.NegotiatedHoldTime; n < myHoldTime { keepalive = n / 3 } fsm.pConf.Timers.State.KeepaliveInterval = keepalive gr, ok := fsm.capMap[bgp.BGP_CAP_GRACEFUL_RESTART] if fsm.pConf.GracefulRestart.Config.Enabled && ok { state := &fsm.pConf.GracefulRestart.State state.Enabled = true cap := gr[len(gr)-1].(*bgp.CapGracefulRestart) state.PeerRestartTime = uint16(cap.Time) for _, t := range cap.Tuples { n := bgp.AddressFamilyNameMap[bgp.AfiSafiToRouteFamily(t.AFI, t.SAFI)] for i, a := range fsm.pConf.AfiSafis { if string(a.Config.AfiSafiName) == n { fsm.pConf.AfiSafis[i].MpGracefulRestart.State.Enabled = true fsm.pConf.AfiSafis[i].MpGracefulRestart.State.Received = true break } } } // RFC 4724 4.1 // To re-establish the session with its peer, the Restarting Speaker // MUST set the "Restart State" bit in the Graceful Restart Capability // of the OPEN message. if fsm.pConf.GracefulRestart.State.PeerRestarting && cap.Flags&0x08 == 0 { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Warn("restart flag is not set") // send notification? h.conn.Close() return bgp.BGP_FSM_IDLE, FSM_INVALID_MSG } if fsm.pConf.GracefulRestart.Config.NotificationEnabled && cap.Flags&0x04 > 0 { fsm.pConf.GracefulRestart.State.NotificationEnabled = true } } llgr, ok2 := fsm.capMap[bgp.BGP_CAP_LONG_LIVED_GRACEFUL_RESTART] if fsm.pConf.GracefulRestart.Config.LongLivedEnabled && ok && ok2 { fsm.pConf.GracefulRestart.State.LongLivedEnabled = true cap := llgr[len(llgr)-1].(*bgp.CapLongLivedGracefulRestart) for _, t := range cap.Tuples { n := bgp.AddressFamilyNameMap[bgp.AfiSafiToRouteFamily(t.AFI, t.SAFI)] for i, a := range fsm.pConf.AfiSafis { if string(a.Config.AfiSafiName) == n { fsm.pConf.AfiSafis[i].LongLivedGracefulRestart.State.Enabled = true fsm.pConf.AfiSafis[i].LongLivedGracefulRestart.State.Received = true fsm.pConf.AfiSafis[i].LongLivedGracefulRestart.State.PeerRestartTime = t.RestartTime break } } } } msg := bgp.NewBGPKeepAliveMessage() b, _ := msg.Serialize() fsm.conn.Write(b) fsm.bgpMessageStateUpdate(msg.Header.Type, false) return bgp.BGP_FSM_OPENCONFIRM, FSM_OPEN_MSG_RECEIVED } else { // send notification? h.conn.Close() return bgp.BGP_FSM_IDLE, FSM_INVALID_MSG } case *bgp.MessageError: fsm.sendNotificationFromErrorMsg(e.MsgData.(*bgp.MessageError)) return bgp.BGP_FSM_IDLE, FSM_INVALID_MSG default: log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), "Data": e.MsgData, }).Panic("unknown msg type") } case err := <-h.errorCh: h.conn.Close() return bgp.BGP_FSM_IDLE, err case <-holdTimer.C: fsm.sendNotification(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil, "hold timer expired") h.t.Kill(nil) return bgp.BGP_FSM_IDLE, FSM_HOLD_TIMER_EXPIRED case s := <-fsm.adminStateCh: err := h.changeAdminState(s) if err == nil { switch s { case ADMIN_STATE_DOWN: h.conn.Close() return bgp.BGP_FSM_IDLE, FSM_ADMIN_DOWN case ADMIN_STATE_UP: log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), "AdminState": s.String(), }).Panic("code logic bug") } } } } } func keepaliveTicker(fsm *FSM) *time.Ticker { negotiatedTime := fsm.pConf.Timers.State.NegotiatedHoldTime if negotiatedTime == 0 { return &time.Ticker{} } sec := time.Second * time.Duration(fsm.pConf.Timers.State.KeepaliveInterval) if sec == 0 { sec = time.Second } return time.NewTicker(sec) } func (h *FSMHandler) openconfirm() (bgp.FSMState, FsmStateReason) { fsm := h.fsm ticker := keepaliveTicker(fsm) h.msgCh = channels.NewInfiniteChannel() h.conn = fsm.conn h.t.Go(h.recvMessage) var holdTimer *time.Timer if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 { holdTimer = &time.Timer{} } else { // RFC 4271 P.65 // sets the HoldTimer according to the negotiated value holdTimer = time.NewTimer(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)) } for { select { case <-h.t.Dying(): h.conn.Close() return -1, FSM_DYING case conn, ok := <-fsm.connCh: if !ok { break } conn.Close() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Warn("Closed an accepted connection") case <-fsm.gracefulRestartTimer.C: if fsm.pConf.GracefulRestart.State.PeerRestarting { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Warn("graceful restart timer expired") return bgp.BGP_FSM_IDLE, FSM_RESTART_TIMER_EXPIRED } case <-ticker.C: m := bgp.NewBGPKeepAliveMessage() b, _ := m.Serialize() // TODO: check error fsm.conn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) case i, ok := <-h.msgCh.Out(): if !ok { continue } e := i.(*FsmMsg) switch e.MsgData.(type) { case *bgp.BGPMessage: m := e.MsgData.(*bgp.BGPMessage) if m.Header.Type == bgp.BGP_MSG_KEEPALIVE { return bgp.BGP_FSM_ESTABLISHED, FSM_OPEN_MSG_NEGOTIATED } // send notification ? h.conn.Close() return bgp.BGP_FSM_IDLE, FSM_INVALID_MSG case *bgp.MessageError: fsm.sendNotificationFromErrorMsg(e.MsgData.(*bgp.MessageError)) return bgp.BGP_FSM_IDLE, FSM_INVALID_MSG default: log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), "Data": e.MsgData, }).Panic("unknown msg type") } case err := <-h.errorCh: h.conn.Close() return bgp.BGP_FSM_IDLE, err case <-holdTimer.C: fsm.sendNotification(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil, "hold timer expired") h.t.Kill(nil) return bgp.BGP_FSM_IDLE, FSM_HOLD_TIMER_EXPIRED case s := <-fsm.adminStateCh: err := h.changeAdminState(s) if err == nil { switch s { case ADMIN_STATE_DOWN: h.conn.Close() return bgp.BGP_FSM_IDLE, FSM_ADMIN_DOWN case ADMIN_STATE_UP: log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), "AdminState": s.String(), }).Panic("code logic bug") } } } } } func (h *FSMHandler) sendMessageloop() error { conn := h.conn fsm := h.fsm ticker := keepaliveTicker(fsm) send := func(m *bgp.BGPMessage) error { if fsm.twoByteAsTrans && m.Header.Type == bgp.BGP_MSG_UPDATE { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), "Data": m, }).Debug("update for 2byte AS peer") table.UpdatePathAttrs2ByteAs(m.Body.(*bgp.BGPUpdate)) table.UpdatePathAggregator2ByteAs(m.Body.(*bgp.BGPUpdate)) } b, err := m.Serialize() if err != nil { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), "Data": err, }).Warn("failed to serialize") fsm.bgpMessageStateUpdate(0, false) return nil } if err := conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime))); err != nil { h.errorCh <- FSM_WRITE_FAILED conn.Close() return fmt.Errorf("failed to set write deadline") } _, err = conn.Write(b) if err != nil { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), "Data": err, }).Warn("failed to send") h.errorCh <- FSM_WRITE_FAILED conn.Close() return fmt.Errorf("closed") } fsm.bgpMessageStateUpdate(m.Header.Type, false) switch m.Header.Type { case bgp.BGP_MSG_NOTIFICATION: log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), "Data": m, }).Warn("sent notification") body := m.Body.(*bgp.BGPNotification) h.errorCh <- FsmStateReason(fmt.Sprintf("%s %s", FSM_NOTIFICATION_SENT, bgp.NewNotificationErrorCode(body.ErrorCode, body.ErrorSubcode).String())) conn.Close() return fmt.Errorf("closed") case bgp.BGP_MSG_UPDATE: update := m.Body.(*bgp.BGPUpdate) log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), "nlri": update.NLRI, "withdrawals": update.WithdrawnRoutes, "attributes": update.PathAttributes, }).Debug("sent update") default: log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), "data": m, }).Debug("sent") } return nil } for { select { case <-h.t.Dying(): return nil case o := <-h.outgoing.Out(): m := o.(*FsmOutgoingMsg) for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths) { if err := send(msg); err != nil { return nil } } if m.Notification != nil { if m.StayIdle { // current user is only prefix-limit // fix me if this is not the case h.changeAdminState(ADMIN_STATE_PFX_CT) } if err := send(m.Notification); err != nil { return nil } } case <-ticker.C: if err := send(bgp.NewBGPKeepAliveMessage()); err != nil { return nil } } } } func (h *FSMHandler) recvMessageloop() error { for { fmsg, err := h.recvMessageWithError() if fmsg != nil { h.msgCh.In() <- fmsg } if err != nil { return nil } } } func (h *FSMHandler) established() (bgp.FSMState, FsmStateReason) { fsm := h.fsm h.conn = fsm.conn h.t.Go(h.sendMessageloop) h.msgCh = h.incoming h.t.Go(h.recvMessageloop) var holdTimer *time.Timer if fsm.pConf.Timers.State.NegotiatedHoldTime == 0 { holdTimer = &time.Timer{} } else { holdTimer = time.NewTimer(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)) } fsm.gracefulRestartTimer.Stop() for { select { case <-h.t.Dying(): return -1, FSM_DYING case conn, ok := <-fsm.connCh: if !ok { break } conn.Close() log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Warn("Closed an accepted connection") case err := <-h.errorCh: h.conn.Close() h.t.Kill(nil) if s := fsm.pConf.GracefulRestart.State; s.Enabled && ((s.NotificationEnabled && strings.HasPrefix(string(err), FSM_NOTIFICATION_RECV)) || err == FSM_READ_FAILED || err == FSM_WRITE_FAILED) { err = FSM_GRACEFUL_RESTART log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Info("peer graceful restart") fsm.gracefulRestartTimer.Reset(time.Duration(fsm.pConf.GracefulRestart.State.PeerRestartTime) * time.Second) } return bgp.BGP_FSM_IDLE, err case <-holdTimer.C: log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Warn("hold timer expired") m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil) h.outgoing.In() <- &FsmOutgoingMsg{Notification: m} return bgp.BGP_FSM_IDLE, FSM_HOLD_TIMER_EXPIRED case <-h.holdTimerResetCh: if fsm.pConf.Timers.State.NegotiatedHoldTime != 0 { holdTimer.Reset(time.Second * time.Duration(fsm.pConf.Timers.State.NegotiatedHoldTime)) } case s := <-fsm.adminStateCh: err := h.changeAdminState(s) if err == nil { switch s { case ADMIN_STATE_DOWN: m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN, nil) h.outgoing.In() <- &FsmOutgoingMsg{Notification: m} } } } } } func (h *FSMHandler) loop() error { fsm := h.fsm ch := make(chan bgp.FSMState) oldState := fsm.state f := func() error { nextState := bgp.FSMState(-1) var reason FsmStateReason switch fsm.state { case bgp.BGP_FSM_IDLE: nextState, reason = h.idle() // case bgp.BGP_FSM_CONNECT: // nextState = h.connect() case bgp.BGP_FSM_ACTIVE: nextState, reason = h.active() case bgp.BGP_FSM_OPENSENT: nextState, reason = h.opensent() case bgp.BGP_FSM_OPENCONFIRM: nextState, reason = h.openconfirm() case bgp.BGP_FSM_ESTABLISHED: nextState, reason = h.established() } fsm.reason = reason ch <- nextState return nil } h.t.Go(f) nextState := <-ch if nextState == bgp.BGP_FSM_ESTABLISHED && oldState == bgp.BGP_FSM_OPENCONFIRM { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Info("Peer Up") } if oldState == bgp.BGP_FSM_ESTABLISHED { // The main goroutine sent the notificaiton due to // deconfiguration or something. reason := fsm.reason if fsm.h.sentNotification != "" { reason = FsmStateReason(fmt.Sprintf("%s %s", FSM_NOTIFICATION_SENT, fsm.h.sentNotification)) } log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), "Reason": reason, }).Info("Peer Down") } e := time.AfterFunc(time.Second*120, func() { log.WithFields(log.Fields{"Topic": "Peer"}).Fatalf("failed to free the fsm.h.t for %s %s %s", fsm.pConf.Config.NeighborAddress, oldState, nextState) }) h.t.Wait() e.Stop() // under zero means that tomb.Dying() if nextState >= bgp.BGP_FSM_IDLE { e := &FsmMsg{ MsgType: FSM_MSG_STATE_CHANGE, MsgSrc: fsm.pConf.Config.NeighborAddress, MsgData: nextState, Version: h.fsm.version, } h.stateCh <- e } return nil } func (h *FSMHandler) changeAdminState(s AdminState) error { fsm := h.fsm if fsm.adminState != s { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), "AdminState": s.String(), }).Debug("admin state changed") fsm.adminState = s fsm.pConf.State.AdminDown = !fsm.pConf.State.AdminDown switch s { case ADMIN_STATE_UP: log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Info("Administrative start") case ADMIN_STATE_DOWN: log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Info("Administrative shutdown") case ADMIN_STATE_PFX_CT: log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Info("Administrative shutdown(Prefix limit reached)") } } else { log.WithFields(log.Fields{ "Topic": "Peer", "Key": fsm.pConf.Config.NeighborAddress, "State": fsm.state.String(), }).Warn("cannot change to the same state") return fmt.Errorf("cannot change to the same state.") } return nil }