-rw-r--r--   server/fsm.go        |  45
-rw-r--r--   server/fsm_test.go   |  32
-rw-r--r--   server/peer.go       | 116
-rw-r--r--   server/peer_test.go  |  34
4 files changed, 152 insertions(+), 75 deletions(-)
diff --git a/server/fsm.go b/server/fsm.go
index 50323ef1..d657b404 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -40,6 +40,7 @@ type fsmMsg struct {
 
 const (
     HOLDTIME_OPENSENT = 240
+    HOLDTIME_IDLE     = 5
 )
 
 type AdminState int
@@ -65,8 +66,8 @@ type FSM struct {
     peerConfig         *config.Neighbor
     keepaliveTicker    *time.Ticker
     state              bgp.FSMState
-    passiveConn        net.Conn
-    passiveConnCh      chan net.Conn
+    conn               net.Conn
+    connCh             chan net.Conn
     idleHoldTime       float64
     opensentHoldTime   float64
     negotiatedHoldTime float64
@@ -127,7 +128,7 @@ func NewFSM(gConfig *config.Global, pConfig *config.Neighbor, connCh chan net.Co
         globalConfig:     gConfig,
         peerConfig:       pConfig,
         state:            bgp.BGP_FSM_IDLE,
-        passiveConnCh:    connCh,
+        connCh:           connCh,
         opensentHoldTime: float64(HOLDTIME_OPENSENT),
         adminState:       ADMIN_STATE_UP,
         adminStateCh:     make(chan AdminState, 1),
@@ -144,6 +145,18 @@ func (fsm *FSM) StateChange(nextState bgp.FSMState) {
     fsm.state = nextState
 }
 
+func (fsm *FSM) LocalAddr() net.IP {
+    addr := fsm.conn.LocalAddr()
+    if addr != nil {
+        host, _, err := net.SplitHostPort(addr.String())
+        if err != nil {
+            return nil
+        }
+        return net.ParseIP(host)
+    }
+    return nil
+}
+
 func (fsm *FSM) sendNotificatonFromErrorMsg(conn net.Conn, e *bgp.MessageError) {
     m := bgp.NewBGPNotificationMessage(e.TypeCode, e.SubTypeCode, e.Data)
     b, _ := m.Serialize()
@@ -211,7 +224,7 @@ func (h *FSMHandler) idle() bgp.FSMState {
         select {
         case <-h.t.Dying():
             return 0
-        case conn, ok := <-fsm.passiveConnCh:
+        case conn, ok := <-fsm.connCh:
             if !ok {
                 break
             }
@@ -228,7 +241,7 @@
                     "Key":      fsm.peerConfig.NeighborAddress,
                     "Duration": fsm.idleHoldTime,
                 }).Debug("IdleHoldTimer expired")
-                fsm.idleHoldTime = 0
+                fsm.idleHoldTime = HOLDTIME_IDLE
                 return bgp.BGP_FSM_ACTIVE
 
             } else {
@@ -258,11 +271,11 @@ func (h *FSMHandler) active() bgp.FSMState {
         select {
         case <-h.t.Dying():
             return 0
-        case conn, ok := <-fsm.passiveConnCh:
+        case conn, ok := <-fsm.connCh:
             if !ok {
                 break
             }
-            fsm.passiveConn = conn
+            fsm.conn = conn
             // we don't implement delayed open timer so move to opensent right
             // away.
             return bgp.BGP_FSM_OPENSENT
@@ -393,11 +406,11 @@ func (h *FSMHandler) opensent() bgp.FSMState {
     fsm := h.fsm
     m := buildopen(fsm.globalConfig, fsm.peerConfig)
     b, _ := m.Serialize()
-    fsm.passiveConn.Write(b)
+    fsm.conn.Write(b)
     fsm.bgpMessageStateUpdate(m.Header.Type, false)
 
     h.msgCh = make(chan *fsmMsg)
-    h.conn = fsm.passiveConn
+    h.conn = fsm.conn
 
     h.t.Go(h.recvMessage)
 
@@ -412,7 +425,7 @@
         case <-h.t.Dying():
             h.conn.Close()
             return 0
-        case conn, ok := <-fsm.passiveConnCh:
+        case conn, ok := <-fsm.connCh:
             if !ok {
                 break
             }
@@ -440,7 +453,7 @@
                 h.incoming <- e
                 msg := bgp.NewBGPKeepAliveMessage()
                 b, _ := msg.Serialize()
-                fsm.passiveConn.Write(b)
+                fsm.conn.Write(b)
                 fsm.bgpMessageStateUpdate(msg.Header.Type, false)
                 return bgp.BGP_FSM_OPENCONFIRM
             } else {
@@ -489,7 +502,7 @@ func (h *FSMHandler) openconfirm() bgp.FSMState {
     fsm := h.fsm
 
     h.msgCh = make(chan *fsmMsg)
-    h.conn = fsm.passiveConn
+    h.conn = fsm.conn
 
     h.t.Go(h.recvMessage)
 
@@ -510,7 +523,7 @@
         case <-h.t.Dying():
             h.conn.Close()
             return 0
-        case conn, ok := <-fsm.passiveConnCh:
+        case conn, ok := <-fsm.connCh:
             if !ok {
                 break
             }
@@ -523,7 +536,7 @@
             m := bgp.NewBGPKeepAliveMessage()
             b, _ := m.Serialize()
             // TODO: check error
-            fsm.passiveConn.Write(b)
+            fsm.conn.Write(b)
             fsm.bgpMessageStateUpdate(m.Header.Type, false)
         case e := <-h.msgCh:
             switch e.MsgData.(type) {
@@ -667,7 +680,7 @@ func (h *FSMHandler) recvMessageloop() error {
 
 func (h *FSMHandler) established() bgp.FSMState {
     fsm := h.fsm
-    h.conn = fsm.passiveConn
+    h.conn = fsm.conn
     h.t.Go(h.sendMessageloop)
     h.msgCh = h.incoming
     h.t.Go(h.recvMessageloop)
@@ -683,7 +696,7 @@ func (h *FSMHandler) established() bgp.FSMState {
         select {
         case <-h.t.Dying():
             return 0
-        case conn, ok := <-fsm.passiveConnCh:
+        case conn, ok := <-fsm.connCh:
             if !ok {
                 break
             }
diff --git a/server/fsm_test.go b/server/fsm_test.go
index 01acf6aa..7e321787 100644
--- a/server/fsm_test.go
+++ b/server/fsm_test.go
@@ -121,6 +121,12 @@ func (m *MockConnection) Close() error {
     return nil
 }
 
+func (m *MockConnection) LocalAddr() net.Addr {
+    return &net.TCPAddr{
+        IP:   net.ParseIP("10.10.10.10"),
+        Port: bgp.BGP_PORT}
+}
+
 func TestReadAll(t *testing.T) {
     assert := assert.New(t)
     m := NewMockConnection()
@@ -157,7 +163,7 @@ func TestFSMHandlerOpensent_HoldTimerExpired(t *testing.T) {
     p, h := makePeerAndHandler()
 
     // push mock connection
-    p.fsm.passiveConn = m
+    p.fsm.conn = m
 
     // set up keepalive ticker
     sec := time.Second * 1
@@ -183,7 +189,7 @@ func TestFSMHandlerOpenconfirm_HoldTimerExpired(t *testing.T) {
     p, h := makePeerAndHandler()
 
     // push mock connection
-    p.fsm.passiveConn = m
+    p.fsm.conn = m
 
     // set up keepalive ticker
     p.fsm.peerConfig.Timers.KeepaliveInterval = 1
@@ -207,7 +213,7 @@ func TestFSMHandlerEstablish_HoldTimerExpired(t *testing.T) {
     p, h := makePeerAndHandler()
 
     // push mock connection
-    p.fsm.passiveConn = m
+    p.fsm.conn = m
 
     // set up keepalive ticker
     sec := time.Second * 1
@@ -245,7 +251,7 @@ func TestFSMHandlerOpenconfirm_HoldtimeZero(t *testing.T) {
     p, h := makePeerAndHandler()
 
     // push mock connection
-    p.fsm.passiveConn = m
+    p.fsm.conn = m
 
     // set up keepalive ticker
     p.fsm.peerConfig.Timers.KeepaliveInterval = 1
@@ -267,7 +273,7 @@ func TestFSMHandlerEstablished_HoldtimeZero(t *testing.T) {
     p, h := makePeerAndHandler()
 
     // push mock connection
-    p.fsm.passiveConn = m
+    p.fsm.conn = m
 
     // set up keepalive ticker
     sec := time.Second * 1
@@ -288,16 +294,17 @@ func makePeerAndHandler() (*Peer, *FSMHandler) {
     neighborConfig := config.Neighbor{}
 
     p := &Peer{
-        globalConfig:   globalConfig,
-        peerConfig:     neighborConfig,
-        acceptedConnCh: make(chan net.Conn),
-        serverMsgCh:    make(chan *serverMsg),
-        peerMsgCh:      make(chan *peerMsg),
-        capMap:         make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
+        globalConfig: globalConfig,
+        peerConfig:   neighborConfig,
+        connCh:       make(chan net.Conn),
+        serverMsgCh:  make(chan *serverMsg),
+        peerMsgCh:    make(chan *peerMsg),
+        getActiveCh:  make(chan struct{}),
+        capMap:       make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
     }
 
     p.siblings = make(map[string]*serverMsgDataPeer)
-    p.fsm = NewFSM(&globalConfig, &neighborConfig, p.acceptedConnCh)
+    p.fsm = NewFSM(&globalConfig, &neighborConfig, p.connCh)
 
     incoming := make(chan *fsmMsg, FSM_CHANNEL_LENGTH)
     p.outgoing = make(chan *bgp.BGPMessage, FSM_CHANNEL_LENGTH)
@@ -308,6 +315,7 @@ func makePeerAndHandler() (*Peer, *FSMHandler) {
         incoming: incoming,
         outgoing: p.outgoing,
     }
+    p.t.Go(p.connectLoop)
 
     return p, h
 }
diff --git a/server/peer.go b/server/peer.go
index 14b3548d..a96a89fa 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -25,13 +25,14 @@ import (
     "github.com/osrg/gobgp/table"
     "gopkg.in/tomb.v2"
     "net"
-    "strings"
+    "strconv"
     "time"
 )
 
 const (
     FSM_CHANNEL_LENGTH = 1024
     FLOP_THRESHOLD     = time.Second * 30
+    MIN_CONNECT_RETRY  = 10
 )
 
 type peerMsgType int
@@ -48,14 +49,15 @@ type peerMsg struct {
 }
 
 type Peer struct {
-    t              tomb.Tomb
-    globalConfig   config.Global
-    peerConfig     config.Neighbor
-    acceptedConnCh chan net.Conn
-    serverMsgCh    chan *serverMsg
-    peerMsgCh      chan *peerMsg
-    fsm            *FSM
-    adjRib         *table.AdjRib
+    t            tomb.Tomb
+    globalConfig config.Global
+    peerConfig   config.Neighbor
+    connCh       chan net.Conn
+    serverMsgCh  chan *serverMsg
+    peerMsgCh    chan *peerMsg
+    getActiveCh  chan struct{}
+    fsm          *FSM
+    adjRib       *table.AdjRib
     // peer and rib are always not one-to-one so should not be
     // here but it's the simplest and works our first target.
     rib *table.TableManager
@@ -73,20 +75,21 @@ type Peer struct {
 
 func NewPeer(g config.Global, peer config.Neighbor, serverMsgCh chan *serverMsg, peerMsgCh chan *peerMsg, peerList []*serverMsgDataPeer, isGlobalRib bool, policyMap map[string]*policy.Policy) *Peer {
     p := &Peer{
-        globalConfig:   g,
-        peerConfig:     peer,
-        acceptedConnCh: make(chan net.Conn),
-        serverMsgCh:    serverMsgCh,
-        peerMsgCh:      peerMsgCh,
-        rfMap:          make(map[bgp.RouteFamily]bool),
-        capMap:         make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
-        isGlobalRib:    isGlobalRib,
+        globalConfig: g,
+        peerConfig:   peer,
+        connCh:       make(chan net.Conn),
+        serverMsgCh:  serverMsgCh,
+        peerMsgCh:    peerMsgCh,
+        getActiveCh:  make(chan struct{}),
+        rfMap:        make(map[bgp.RouteFamily]bool),
+        capMap:       make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
+        isGlobalRib:  isGlobalRib,
     }
     p.siblings = make(map[string]*serverMsgDataPeer)
     for _, s := range peerList {
         p.siblings[s.address.String()] = s
     }
-    p.fsm = NewFSM(&g, &peer, p.acceptedConnCh)
+    p.fsm = NewFSM(&g, &peer, p.connCh)
     peer.BgpNeighborCommonState.State = uint32(bgp.BGP_FSM_IDLE)
     peer.BgpNeighborCommonState.Downtime = time.Now().Unix()
     for _, rf := range peer.AfiSafiList {
@@ -103,6 +106,9 @@ func NewPeer(g config.Global, peer config.Neighbor, serverMsgCh chan *serverMsg,
     p.rib = table.NewTableManager(p.peerConfig.NeighborAddress.String(), rfList)
     p.setPolicy(policyMap)
     p.t.Go(p.loop)
+    if !peer.TransportOptions.PassiveMode && !isGlobalRib {
+        p.t.Go(p.connectLoop)
+    }
     return p
 }
 
@@ -556,6 +562,56 @@ func (peer *Peer) handleServerMsg(m *serverMsg) {
     }
 }
 
+func (peer *Peer) connectLoop() error {
+    var tick int
+    if tick = int(peer.fsm.peerConfig.Timers.ConnectRetry); tick < MIN_CONNECT_RETRY {
+        tick = MIN_CONNECT_RETRY
+    }
+
+    ticker := time.NewTicker(time.Duration(tick) * time.Second)
+    ticker.Stop()
+
+    connect := func() {
+        if bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State) == bgp.BGP_FSM_ACTIVE {
+            var host string
+            addr := peer.peerConfig.NeighborAddress
+
+            if addr.To4() != nil {
+                host = addr.String() + ":" + strconv.Itoa(bgp.BGP_PORT)
+            } else {
+                host = "[" + addr.String() + "]:" + strconv.Itoa(bgp.BGP_PORT)
+            }
+
+            conn, err := net.DialTimeout("tcp", host, time.Duration(MIN_CONNECT_RETRY-1)*time.Second)
+            if err == nil {
+                peer.connCh <- conn
+            } else {
+                log.WithFields(log.Fields{
+                    "Topic": "Peer",
+                    "Key":   peer.peerConfig.NeighborAddress,
+                }).Debugf("failed to connect: %s", err)
+            }
+        }
+    }
+
+    for {
+        select {
+        case <-peer.t.Dying():
+            log.WithFields(log.Fields{
+                "Topic": "Peer",
+                "Key":   peer.peerConfig.NeighborAddress,
+            }).Debug("stop connect loop")
+            ticker.Stop()
+            return nil
+        case <-ticker.C:
+            connect()
+        case <-peer.getActiveCh:
+            connect()
+            ticker = time.NewTicker(time.Duration(tick) * time.Second)
+        }
+    }
+}
+
 // this goroutine handles routing table operations
 func (peer *Peer) loop() error {
     for {
@@ -566,14 +622,21 @@ func (peer *Peer) loop() error {
 
         if !peer.isGlobalRib {
             h = NewFSMHandler(peer.fsm, incoming, peer.outgoing)
-            if peer.peerConfig.BgpNeighborCommonState.State == uint32(bgp.BGP_FSM_ESTABLISHED) {
+            switch peer.peerConfig.BgpNeighborCommonState.State {
+            case uint32(bgp.BGP_FSM_ESTABLISHED):
+                peer.peerConfig.LocalAddress = peer.fsm.LocalAddr()
                 for rf, _ := range peer.rfMap {
                     pathList := peer.adjRib.GetOutPathList(rf)
                     peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList))
                 }
                 peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now().Unix()
                 peer.fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++
-            } else {
+            case uint32(bgp.BGP_FSM_ACTIVE):
+                if !peer.peerConfig.TransportOptions.PassiveMode {
+                    peer.getActiveCh <- struct{}{}
+                }
+                fallthrough
+            default:
                 peer.fsm.peerConfig.BgpNeighborCommonState.Downtime = time.Now().Unix()
             }
         }
@@ -582,7 +645,7 @@ func (peer *Peer) loop() error {
         for sameState {
             select {
             case <-peer.t.Dying():
-                close(peer.acceptedConnCh)
+                close(peer.connCh)
                 peer.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED, nil)
                 // h.t.Kill(nil) will be called
                 // internall so even goroutines in
@@ -651,16 +714,7 @@ func (peer *Peer) Stop() error {
 }
 
 func (peer *Peer) PassConn(conn *net.TCPConn) {
-    localAddr := func(addrPort string) string {
-        if strings.Index(addrPort, "[") == -1 {
-            return strings.Split(addrPort, ":")[0]
-        }
-        idx := strings.LastIndex(addrPort, ":")
-        return addrPort[1 : idx-1]
-    }(conn.LocalAddr().String())
-
-    peer.peerConfig.LocalAddress = net.ParseIP(localAddr)
-    peer.acceptedConnCh <- conn
+    peer.connCh <- conn
 }
 
 func (peer *Peer) MarshalJSON() ([]byte, error) {
diff --git a/server/peer_test.go b/server/peer_test.go
index f8703d2c..23588fdc 100644
--- a/server/peer_test.go
+++ b/server/peer_test.go
@@ -120,7 +120,7 @@ func TestPeerAdminShutdownWhileEstablished(t *testing.T) {
 
     go pushPackets()
     waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
-    peer.acceptedConnCh <- m
+    peer.connCh <- m
     waitUntil(assert, bgp.BGP_FSM_ESTABLISHED, peer, 1000)
 
     restReq := api.NewRestRequest(api.REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC)
@@ -235,7 +235,7 @@ func TestPeerAdminShutdownWhileOpensent(t *testing.T) {
     peer.t.Go(peer.loop)
 
     waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
-    peer.acceptedConnCh <- m
+    peer.connCh <- m
     waitUntil(assert, bgp.BGP_FSM_OPENSENT, peer, 1000)
 
     restReq := api.NewRestRequest(api.REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC)
@@ -280,7 +280,7 @@ func TestPeerAdminShutdownWhileOpenconfirm(t *testing.T) {
     }
     go pushPackets()
     waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
-    peer.acceptedConnCh <- m
+    peer.connCh <- m
     waitUntil(assert, bgp.BGP_FSM_OPENCONFIRM, peer, 1000)
 
     restReq := api.NewRestRequest(api.REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC)
@@ -330,7 +330,7 @@ func TestPeerAdminEnable(t *testing.T) {
 
     go pushPackets()
     waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
-    peer.acceptedConnCh <- m
+    peer.connCh <- m
     waitUntil(assert, bgp.BGP_FSM_ESTABLISHED, peer, 1000)
 
     // shutdown peer at first
@@ -361,7 +361,7 @@ func TestPeerAdminEnable(t *testing.T) {
     json.Unmarshal(result.Data, &res)
     assert.Equal("ADMIN_STATE_UP", res["result"])
 
-    waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
+    waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, (HOLDTIME_IDLE+1)*1000)
     assert.Equal(bgp.BGP_FSM_ACTIVE, peer.fsm.state)
 
     m2 := NewMockConnection()
@@ -373,7 +373,7 @@ func TestPeerAdminEnable(t *testing.T) {
     }
     go pushPackets()
 
-    peer.acceptedConnCh <- m2
+    peer.connCh <- m2
     waitUntil(assert, bgp.BGP_FSM_ESTABLISHED, peer, 1000)
     assert.Equal(bgp.BGP_FSM_ESTABLISHED, peer.fsm.state)
 
@@ -394,7 +394,7 @@ func TestPeerAdminShutdownReject(t *testing.T) {
     peer.t.Go(peer.loop)
 
     waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
-    peer.acceptedConnCh <- m
+    peer.connCh <- m
     waitUntil(assert, bgp.BGP_FSM_OPENSENT, peer, 1000)
 
     restReq := api.NewRestRequest(api.REQ_NEIGHBOR_DISABLE, "0.0.0.0", bgp.RF_IPv4_UC)
@@ -451,7 +451,7 @@ func TestPeerSelectSmallerHoldtime(t *testing.T) {
 
     go pushPackets()
     waitUntil(assert, bgp.BGP_FSM_ACTIVE, peer, 1000)
-    peer.acceptedConnCh <- m
+    peer.connCh <- m
     waitUntil(assert, bgp.BGP_FSM_OPENCONFIRM, peer, 1000)
 
     assert.Equal(float64(0), peer.fsm.negotiatedHoldTime)
@@ -500,17 +500,18 @@ func makePeer(globalConfig config.Global, peerConfig config.Neighbor) *Peer {
     pch := make(chan *peerMsg, 4096)
 
     p := &Peer{
-        globalConfig:   globalConfig,
-        peerConfig:     peerConfig,
-        acceptedConnCh: make(chan net.Conn),
-        serverMsgCh:    sch,
-        peerMsgCh:      pch,
-        rfMap:          make(map[bgp.RouteFamily]bool),
-        capMap:         make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
+        globalConfig: globalConfig,
+        peerConfig:   peerConfig,
+        connCh:       make(chan net.Conn),
+        serverMsgCh:  sch,
+        peerMsgCh:    pch,
+        getActiveCh:  make(chan struct{}),
+        rfMap:        make(map[bgp.RouteFamily]bool),
+        capMap:       make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface),
     }
     p.siblings = make(map[string]*serverMsgDataPeer)
-    p.fsm = NewFSM(&globalConfig, &peerConfig, p.acceptedConnCh)
+    p.fsm = NewFSM(&globalConfig, &peerConfig, p.connCh)
     peerConfig.BgpNeighborCommonState.State = uint32(bgp.BGP_FSM_IDLE)
     peerConfig.BgpNeighborCommonState.Downtime = time.Now().Unix()
     if peerConfig.NeighborAddress.To4() != nil {
@@ -527,6 +528,7 @@ func makePeer(globalConfig config.Global, peerConfig config.Neighbor) *Peer {
     rfList := []bgp.RouteFamily{bgp.RF_IPv4_UC, bgp.RF_IPv6_UC}
    p.adjRib = table.NewAdjRib(rfList)
     p.rib = table.NewTableManager(p.peerConfig.NeighborAddress.String(), rfList)
+    p.t.Go(p.connectLoop)
 
     return p
 }
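
For reference, the active-connect behavior this change introduces in connectLoop amounts to dialing the neighbor on the BGP port with a timeout, bracketing IPv6 literals, and handing the resulting net.Conn to the same channel the passive accept path (PassConn) already feeds; FSM.LocalAddr() then reads the local address back off that connection. Below is a minimal standalone sketch of that dial-and-report pattern, not code from this commit: dialNeighbor is a hypothetical helper, and the 192.0.2.1 address, port 179, and 10-second retry value are illustrative placeholders only.

package main

import (
    "fmt"
    "net"
    "strconv"
    "time"
)

// dialNeighbor mimics the dial step of connectLoop: build a host:port
// string (IPv6 literals need brackets) and connect with a timeout
// shorter than the retry interval.
func dialNeighbor(addr net.IP, port, retrySec int) (net.Conn, error) {
    var host string
    if addr.To4() != nil {
        host = addr.String() + ":" + strconv.Itoa(port)
    } else {
        host = "[" + addr.String() + "]:" + strconv.Itoa(port)
    }
    return net.DialTimeout("tcp", host, time.Duration(retrySec-1)*time.Second)
}

func main() {
    // 192.0.2.1, port 179, and a 10-second retry are example values only.
    conn, err := dialNeighbor(net.ParseIP("192.0.2.1"), 179, 10)
    if err != nil {
        fmt.Println("failed to connect:", err)
        return
    }
    defer conn.Close()

    // The local address of the established connection is the kind of value
    // the new FSM.LocalAddr() helper reports back into the peer config.
    host, _, _ := net.SplitHostPort(conn.LocalAddr().String())
    fmt.Println("connected, local address:", net.ParseIP(host))
}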