diff options
-rw-r--r-- | server/fsm.go | 102 | ||||
-rw-r--r-- | server/fsm_test.go | 212 | ||||
-rw-r--r-- | server/peer.go | 17 |
3 files changed, 309 insertions, 22 deletions
diff --git a/server/fsm.go b/server/fsm.go index 2f8ea52e..8c3882cf 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -37,14 +37,20 @@ type fsmMsg struct { MsgData interface{} } +const ( + HOLDTIME_OPENSENT = 240 +) + type FSM struct { - globalConfig *config.GlobalType - peerConfig *config.NeighborType - keepaliveTicker *time.Ticker - state bgp.FSMState - passiveConn *net.TCPConn - passiveConnCh chan *net.TCPConn - idleHoldTime float64 + globalConfig *config.GlobalType + peerConfig *config.NeighborType + keepaliveTicker *time.Ticker + state bgp.FSMState + passiveConn net.Conn + passiveConnCh chan net.Conn + idleHoldTime float64 + opensentHoldTime float64 + negotiatedHoldTime float64 } func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { @@ -89,12 +95,13 @@ func (fsm *FSM) bgpMessageStateUpdate(MessageType uint8, isIn bool) { } } -func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh chan *net.TCPConn) *FSM { +func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh chan net.Conn) *FSM { return &FSM{ - globalConfig: gConfig, - peerConfig: pConfig, - state: bgp.BGP_FSM_IDLE, - passiveConnCh: connCh, + globalConfig: gConfig, + peerConfig: pConfig, + state: bgp.BGP_FSM_IDLE, + passiveConnCh: connCh, + opensentHoldTime: float64(HOLDTIME_OPENSENT), } } @@ -109,13 +116,14 @@ func (fsm *FSM) StateChange(nextState bgp.FSMState) { } type FSMHandler struct { - t tomb.Tomb - fsm *FSM - conn *net.TCPConn - msgCh chan *fsmMsg - errorCh chan bool - incoming chan *fsmMsg - outgoing chan *bgp.BGPMessage + t tomb.Tomb + fsm *FSM + conn net.Conn + msgCh chan *fsmMsg + errorCh chan bool + incoming chan *fsmMsg + outgoing chan *bgp.BGPMessage + holdTimer *time.Timer } func NewFSMHandler(fsm *FSM, incoming chan *fsmMsg, outgoing chan *bgp.BGPMessage) *FSMHandler { @@ -204,7 +212,7 @@ func buildopen(global *config.GlobalType, peerConf *config.NeighborType) *bgp.BG []bgp.OptionParameterInterface{p1, p2, p3}) } -func readAll(conn *net.TCPConn, length int) ([]byte, error) { +func readAll(conn net.Conn, length int) ([]byte, error) { buf := make([]byte, length) for cur := 0; cur < length; { if num, err := conn.Read(buf); err != nil { @@ -265,6 +273,12 @@ func (h *FSMHandler) recvMessageWithError() error { MsgData: m, } h.fsm.bgpMessageStateUpdate(m.Header.Type, true) + + if h.fsm.state == bgp.BGP_FSM_ESTABLISHED { + if m.Header.Type == bgp.BGP_MSG_KEEPALIVE || m.Header.Type == bgp.BGP_MSG_UPDATE { + h.holdTimer.Reset(time.Second * time.Duration(h.fsm.negotiatedHoldTime)) + } + } } h.msgCh <- fmsg return err @@ -287,6 +301,12 @@ func (h *FSMHandler) opensent() bgp.FSMState { h.t.Go(h.recvMessage) + // RFC 4271 P.60 + // sets its HoldTimer to a large value + // A HoldTimer value of 4 minutes is suggested as a "large value" + // for the HoldTimer + h.holdTimer = time.NewTimer(time.Second * time.Duration(fsm.opensentHoldTime)) + for { select { case <-h.t.Dying(): @@ -336,6 +356,19 @@ func (h *FSMHandler) opensent() bgp.FSMState { case <-h.errorCh: h.conn.Close() return bgp.BGP_FSM_IDLE + case <-h.holdTimer.C: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.peerConfig.NeighborAddress, + "data": bgp.BGP_FSM_OPENSENT, + }).Warn("hold timer expired") + m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil) + b, _ := m.Serialize() + fsm.passiveConn.Write(b) + fsm.bgpMessageStateUpdate(m.Header.Type, false) + h.conn.Close() + h.t.Kill(nil) + return bgp.BGP_FSM_IDLE } } } @@ -350,6 +383,10 @@ func (h *FSMHandler) openconfirm() bgp.FSMState { h.t.Go(h.recvMessage) + // RFC 4271 P.65 + // sets the HoldTimer according to the negotiated value + h.holdTimer = time.NewTimer(time.Second * time.Duration(fsm.negotiatedHoldTime)) + for { select { case <-h.t.Dying(): @@ -397,6 +434,19 @@ func (h *FSMHandler) openconfirm() bgp.FSMState { case <-h.errorCh: h.conn.Close() return bgp.BGP_FSM_IDLE + case <-h.holdTimer.C: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.peerConfig.NeighborAddress, + "data": bgp.BGP_FSM_OPENCONFIRM, + }).Warn("hold timer expired") + m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil) + b, _ := m.Serialize() + fsm.passiveConn.Write(b) + fsm.bgpMessageStateUpdate(m.Header.Type, false) + h.conn.Close() + h.t.Kill(nil) + return bgp.BGP_FSM_IDLE } } log.WithFields(log.Fields{ @@ -462,6 +512,9 @@ func (h *FSMHandler) established() bgp.FSMState { h.msgCh = h.incoming h.t.Go(h.recvMessageloop) + // restart HoldTimer + h.holdTimer = time.NewTimer(time.Second * time.Duration(fsm.negotiatedHoldTime)) + for { select { case <-h.t.Dying(): @@ -476,6 +529,15 @@ func (h *FSMHandler) established() bgp.FSMState { h.conn.Close() h.t.Kill(nil) return bgp.BGP_FSM_IDLE + case <-h.holdTimer.C: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": fsm.peerConfig.NeighborAddress, + "data": bgp.BGP_FSM_ESTABLISHED, + }).Warn("hold timer expired") + m := bgp.NewBGPNotificationMessage(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, 0, nil) + h.outgoing <- m + return bgp.BGP_FSM_IDLE } } return 0 diff --git a/server/fsm_test.go b/server/fsm_test.go new file mode 100644 index 00000000..76b2a095 --- /dev/null +++ b/server/fsm_test.go @@ -0,0 +1,212 @@ +// Copyright (C) 2014 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "github.com/osrg/gobgp/config" + "github.com/osrg/gobgp/packet" + "github.com/stretchr/testify/assert" + "net" + "strconv" + "testing" + "time" +) + +type MockConnection struct { + net.Conn + recvCh chan []byte + sendBuf [][]byte + readBytes int +} + +func NewMockConnection() *MockConnection { + m := &MockConnection{ + recvCh: make(chan []byte, 128), + sendBuf: make([][]byte, 129), + } + return m +} + +func (m *MockConnection) Read(buf []byte) (int, error) { + + data := <-m.recvCh + + for _, val := range data { + buf[m.readBytes] = val + m.readBytes += 1 + } + + length := 0 + if m.readBytes == len(buf) { + m.readBytes = 0 + length = len(buf) + } else { + length = m.readBytes + } + + fmt.Printf("%d bytes read from peer\n", length) + return length, nil +} + +func (m *MockConnection) Write(buf []byte) (int, error) { + m.sendBuf = append(m.sendBuf, buf) + msg, _ := bgp.ParseBGPMessage(buf) + fmt.Printf("%d bytes written by gobgp message type : %s\n", len(buf), showMessageType(msg.Header.Type)) + return len(buf), nil +} + +func showMessageType(t uint8) string { + switch t { + case bgp.BGP_MSG_KEEPALIVE: + return "BGP_MSG_KEEPALIVE" + case bgp.BGP_MSG_NOTIFICATION: + return "BGP_MSG_NOTIFICATION" + case bgp.BGP_MSG_OPEN: + return "BGP_MSG_OPEN" + case bgp.BGP_MSG_UPDATE: + return "BGP_MSG_UPDATE" + case bgp.BGP_MSG_ROUTE_REFRESH: + return "BGP_MSG_ROUTE_REFRESH" + } + return strconv.Itoa(int(t)) +} + +func (m *MockConnection) Close() error { + fmt.Printf("close called") + return nil +} + +func TestReadAll(t *testing.T) { + assert := assert.New(t) + m := NewMockConnection() + msg := open() + expected1, _ := msg.Header.Serialize() + expected2, _ := msg.Body.Serialize() + + pushBytes := func() { + fmt.Println("push 5 bytes") + m.recvCh <- expected1[0:5] + fmt.Println("wait 5 seconds...") + time.Sleep(time.Second * 5) + fmt.Println("push rest") + m.recvCh <- expected1[5:] + fmt.Println("push bytes at once") + m.recvCh <- expected2 + } + + go pushBytes() + + var actual1 []byte + actual1, _ = readAll(m, bgp.BGP_HEADER_LENGTH) + fmt.Println(actual1) + assert.Equal(expected1, actual1) + + var actual2 []byte + actual2, _ = readAll(m, len(expected2)) + fmt.Println(actual2) + assert.Equal(expected2, actual2) +} + +func TestFSMHandlerEstablish_HoldTimerExpired(t *testing.T) { + assert := assert.New(t) + m := NewMockConnection() + + p, h := makePeerAndHandler() + + // push mock connection + p.fsm.passiveConn = m + + // set up keepalive ticker + sec := time.Second * 2 + p.fsm.keepaliveTicker = time.NewTicker(sec) + + msg := keepalive() + header, _ := msg.Header.Serialize() + body, _ := msg.Body.Serialize() + + pushPackets := func() { + // first keepalive from peer + m.recvCh <- header + m.recvCh <- body + time.Sleep(time.Second * 4) + // second keepalive from peer + m.recvCh <- header + m.recvCh <- body + } + + // set holdtime + p.fsm.peerConfig.Timers.HoldTime = 10 + p.fsm.negotiatedHoldTime = 10 + + go pushPackets() + state := h.established() + time.Sleep(time.Second * 4) + assert.Equal(bgp.BGP_FSM_IDLE, state) + lastMsg := m.sendBuf[len(m.sendBuf)-1] + sent, _ := bgp.ParseBGPMessage(lastMsg) + assert.Equal(bgp.BGP_MSG_NOTIFICATION, sent.Header.Type) + assert.Equal(bgp.BGP_ERROR_HOLD_TIMER_EXPIRED, sent.Body.(*bgp.BGPNotification).ErrorCode) +} + +func makePeerAndHandler() (*Peer, *FSMHandler) { + globalConfig := config.GlobalType{} + neighborConfig := config.NeighborType{} + + p := &Peer{ + globalConfig: globalConfig, + peerConfig: neighborConfig, + acceptedConnCh: make(chan net.Conn), + serverMsgCh: make(chan *serverMsg), + peerMsgCh: make(chan *peerMsg), + capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface), + } + + p.siblings = make(map[string]*serverMsgDataPeer) + p.fsm = NewFSM(&globalConfig, &neighborConfig, p.acceptedConnCh) + + incoming := make(chan *fsmMsg, FSM_CHANNEL_LENGTH) + p.outgoing = make(chan *bgp.BGPMessage, FSM_CHANNEL_LENGTH) + + h := &FSMHandler{ + fsm: p.fsm, + errorCh: make(chan bool, 2), + incoming: incoming, + outgoing: p.outgoing, + } + + return p, h + +} + +func open() *bgp.BGPMessage { + p1 := bgp.NewOptionParameterCapability( + []bgp.ParameterCapabilityInterface{bgp.NewCapRouteRefresh()}) + p2 := bgp.NewOptionParameterCapability( + []bgp.ParameterCapabilityInterface{bgp.NewCapMultiProtocol(3, 4)}) + g := bgp.CapGracefulRestartTuples{4, 2, 3} + p3 := bgp.NewOptionParameterCapability( + []bgp.ParameterCapabilityInterface{bgp.NewCapGracefulRestart(2, 100, + []bgp.CapGracefulRestartTuples{g})}) + p4 := bgp.NewOptionParameterCapability( + []bgp.ParameterCapabilityInterface{bgp.NewCapFourOctetASNumber(100000)}) + return bgp.NewBGPOpenMessage(11033, 303, "100.4.10.3", + []bgp.OptionParameterInterface{p1, p2, p3, p4}) +} + +func keepalive() *bgp.BGPMessage { + return bgp.NewBGPKeepAliveMessage() +} diff --git a/server/peer.go b/server/peer.go index 3e264fbb..bc8bb10c 100644 --- a/server/peer.go +++ b/server/peer.go @@ -49,7 +49,7 @@ type Peer struct { t tomb.Tomb globalConfig config.GlobalType peerConfig config.NeighborType - acceptedConnCh chan *net.TCPConn + acceptedConnCh chan net.Conn serverMsgCh chan *serverMsg peerMsgCh chan *peerMsg fsm *FSM @@ -69,7 +69,7 @@ func NewPeer(g config.GlobalType, peer config.NeighborType, serverMsgCh chan *se p := &Peer{ globalConfig: g, peerConfig: peer, - acceptedConnCh: make(chan *net.TCPConn), + acceptedConnCh: make(chan net.Conn), serverMsgCh: serverMsgCh, peerMsgCh: peerMsgCh, capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface), @@ -119,6 +119,19 @@ func (peer *Peer) handleBGPmessage(m *bgp.BGPMessage) { } } + // calculate HoldTime + // RFC 4271 P.13 + // a BGP speaker MUST calculate the value of the Hold Timer + // by using the smaller of its configured Hold Time and the Hold Time + // received in the OPEN message. + holdTime := float64(body.HoldTime) + myHoldTime := peer.fsm.peerConfig.Timers.HoldTime + if holdTime > myHoldTime { + peer.fsm.negotiatedHoldTime = holdTime + } else { + peer.fsm.negotiatedHoldTime = myHoldTime + } + case bgp.BGP_MSG_ROUTE_REFRESH: pathList := peer.adjRib.GetOutPathList(peer.rf) peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList)) |