diff options
Diffstat (limited to 'server/peer.go')
-rw-r--r-- | server/peer.go | 116 |
1 files changed, 85 insertions, 31 deletions
diff --git a/server/peer.go b/server/peer.go index 14b3548d..a96a89fa 100644 --- a/server/peer.go +++ b/server/peer.go @@ -25,13 +25,14 @@ import ( "github.com/osrg/gobgp/table" "gopkg.in/tomb.v2" "net" - "strings" + "strconv" "time" ) const ( FSM_CHANNEL_LENGTH = 1024 FLOP_THRESHOLD = time.Second * 30 + MIN_CONNECT_RETRY = 10 ) type peerMsgType int @@ -48,14 +49,15 @@ type peerMsg struct { } type Peer struct { - t tomb.Tomb - globalConfig config.Global - peerConfig config.Neighbor - acceptedConnCh chan net.Conn - serverMsgCh chan *serverMsg - peerMsgCh chan *peerMsg - fsm *FSM - adjRib *table.AdjRib + t tomb.Tomb + globalConfig config.Global + peerConfig config.Neighbor + connCh chan net.Conn + serverMsgCh chan *serverMsg + peerMsgCh chan *peerMsg + getActiveCh chan struct{} + fsm *FSM + adjRib *table.AdjRib // peer and rib are always not one-to-one so should not be // here but it's the simplest and works our first target. rib *table.TableManager @@ -73,20 +75,21 @@ type Peer struct { func NewPeer(g config.Global, peer config.Neighbor, serverMsgCh chan *serverMsg, peerMsgCh chan *peerMsg, peerList []*serverMsgDataPeer, isGlobalRib bool, policyMap map[string]*policy.Policy) *Peer { p := &Peer{ - globalConfig: g, - peerConfig: peer, - acceptedConnCh: make(chan net.Conn), - serverMsgCh: serverMsgCh, - peerMsgCh: peerMsgCh, - rfMap: make(map[bgp.RouteFamily]bool), - capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface), - isGlobalRib: isGlobalRib, + globalConfig: g, + peerConfig: peer, + connCh: make(chan net.Conn), + serverMsgCh: serverMsgCh, + peerMsgCh: peerMsgCh, + getActiveCh: make(chan struct{}), + rfMap: make(map[bgp.RouteFamily]bool), + capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface), + isGlobalRib: isGlobalRib, } p.siblings = make(map[string]*serverMsgDataPeer) for _, s := range peerList { p.siblings[s.address.String()] = s } - p.fsm = NewFSM(&g, &peer, p.acceptedConnCh) + p.fsm = NewFSM(&g, &peer, p.connCh) peer.BgpNeighborCommonState.State = uint32(bgp.BGP_FSM_IDLE) peer.BgpNeighborCommonState.Downtime = time.Now().Unix() for _, rf := range peer.AfiSafiList { @@ -103,6 +106,9 @@ func NewPeer(g config.Global, peer config.Neighbor, serverMsgCh chan *serverMsg, p.rib = table.NewTableManager(p.peerConfig.NeighborAddress.String(), rfList) p.setPolicy(policyMap) p.t.Go(p.loop) + if !peer.TransportOptions.PassiveMode && !isGlobalRib { + p.t.Go(p.connectLoop) + } return p } @@ -556,6 +562,56 @@ func (peer *Peer) handleServerMsg(m *serverMsg) { } } +func (peer *Peer) connectLoop() error { + var tick int + if tick = int(peer.fsm.peerConfig.Timers.ConnectRetry); tick < MIN_CONNECT_RETRY { + tick = MIN_CONNECT_RETRY + } + + ticker := time.NewTicker(time.Duration(tick) * time.Second) + ticker.Stop() + + connect := func() { + if bgp.FSMState(peer.peerConfig.BgpNeighborCommonState.State) == bgp.BGP_FSM_ACTIVE { + var host string + addr := peer.peerConfig.NeighborAddress + + if addr.To4() != nil { + host = addr.String() + ":" + strconv.Itoa(bgp.BGP_PORT) + } else { + host = "[" + addr.String() + "]:" + strconv.Itoa(bgp.BGP_PORT) + } + + conn, err := net.DialTimeout("tcp", host, time.Duration(MIN_CONNECT_RETRY-1)*time.Second) + if err == nil { + peer.connCh <- conn + } else { + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.peerConfig.NeighborAddress, + }).Debugf("failed to connect: %s", err) + } + } + } + + for { + select { + case <-peer.t.Dying(): + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.peerConfig.NeighborAddress, + }).Debug("stop connect loop") + ticker.Stop() + return nil + case <-ticker.C: + connect() + case <-peer.getActiveCh: + connect() + ticker = time.NewTicker(time.Duration(tick) * time.Second) + } + } +} + // this goroutine handles routing table operations func (peer *Peer) loop() error { for { @@ -566,14 +622,21 @@ func (peer *Peer) loop() error { if !peer.isGlobalRib { h = NewFSMHandler(peer.fsm, incoming, peer.outgoing) - if peer.peerConfig.BgpNeighborCommonState.State == uint32(bgp.BGP_FSM_ESTABLISHED) { + switch peer.peerConfig.BgpNeighborCommonState.State { + case uint32(bgp.BGP_FSM_ESTABLISHED): + peer.peerConfig.LocalAddress = peer.fsm.LocalAddr() for rf, _ := range peer.rfMap { pathList := peer.adjRib.GetOutPathList(rf) peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList)) } peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Now().Unix() peer.fsm.peerConfig.BgpNeighborCommonState.EstablishedCount++ - } else { + case uint32(bgp.BGP_FSM_ACTIVE): + if !peer.peerConfig.TransportOptions.PassiveMode { + peer.getActiveCh <- struct{}{} + } + fallthrough + default: peer.fsm.peerConfig.BgpNeighborCommonState.Downtime = time.Now().Unix() } } @@ -582,7 +645,7 @@ func (peer *Peer) loop() error { for sameState { select { case <-peer.t.Dying(): - close(peer.acceptedConnCh) + close(peer.connCh) peer.outgoing <- bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED, nil) // h.t.Kill(nil) will be called // internall so even goroutines in @@ -651,16 +714,7 @@ func (peer *Peer) Stop() error { } func (peer *Peer) PassConn(conn *net.TCPConn) { - localAddr := func(addrPort string) string { - if strings.Index(addrPort, "[") == -1 { - return strings.Split(addrPort, ":")[0] - } - idx := strings.LastIndex(addrPort, ":") - return addrPort[1 : idx-1] - }(conn.LocalAddr().String()) - - peer.peerConfig.LocalAddress = net.ParseIP(localAddr) - peer.acceptedConnCh <- conn + peer.connCh <- conn } func (peer *Peer) MarshalJSON() ([]byte, error) { |