summaryrefslogtreecommitdiffhomepage
path: root/server/fsm.go
diff options
context:
space:
mode:
Diffstat (limited to 'server/fsm.go')
-rw-r--r--server/fsm.go125
1 files changed, 53 insertions, 72 deletions
diff --git a/server/fsm.go b/server/fsm.go
index c6309367..3080dca4 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -103,14 +103,14 @@ type FSMHandler struct {
t tomb.Tomb
fsm *FSM
conn *net.TCPConn
- ch chan *bgp.BGPMessage
- ioError bool
+ msgCh chan *bgp.BGPMessage
+ errorCh chan bool
}
func NewFSMHandler(fsm *FSM) *FSMHandler {
f := &FSMHandler{
fsm: fsm,
- ioError: false,
+ errorCh: make(chan bool),
}
f.t.Go(f.loop)
return f
@@ -183,36 +183,37 @@ func readAll(conn *net.TCPConn, length int) ([]byte, error) {
return buf, nil
}
-func (h *FSMHandler) recvMessage() error {
+func (h *FSMHandler) recvMessageWithError() error {
headerBuf, err := readAll(h.conn, bgp.BGP_HEADER_LENGTH)
if err != nil {
- h.ioError = true
- close(h.ch)
- return nil
+ h.errorCh <- true
+ return err
}
hd := &bgp.BGPHeader{}
err = hd.DecodeFromBytes(headerBuf)
if err != nil {
- h.ioError = true
- close(h.ch)
- return nil
+ h.errorCh <- true
+ return err
}
bodyBuf, err := readAll(h.conn, int(hd.Len)-bgp.BGP_HEADER_LENGTH)
if err != nil {
- h.ioError = true
- close(h.ch)
- return nil
+ h.errorCh <- true
+ return err
}
m, err := bgp.ParseBGPBody(hd, bodyBuf)
if err != nil {
- h.ioError = true
- close(h.ch)
- return nil
+ h.errorCh <- true
+ return err
}
- h.ch <- m
+ h.msgCh <- m
+ return nil
+}
+
+func (h *FSMHandler) recvMessage() error {
+ h.recvMessageWithError()
return nil
}
@@ -223,7 +224,7 @@ func (h *FSMHandler) opensent() bgp.FSMState {
fsm.passiveConn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
- h.ch = make(chan *bgp.BGPMessage)
+ h.msgCh = make(chan *bgp.BGPMessage)
h.conn = fsm.passiveConn
h.t.Go(h.recvMessage)
@@ -233,22 +234,19 @@ func (h *FSMHandler) opensent() bgp.FSMState {
case <-h.t.Dying():
fsm.passiveConn.Close()
return 0
- case m, ok := <-h.ch:
- if ok {
- fsm.bgpMessageStateUpdate(m.Header.Type, true)
- if m.Header.Type == bgp.BGP_MSG_OPEN {
- fsm.incoming <- m
- msg := bgp.NewBGPKeepAliveMessage()
- b, _ := msg.Serialize()
- fsm.passiveConn.Write(b)
- fsm.bgpMessageStateUpdate(m.Header.Type, false)
- nextState = bgp.BGP_FSM_OPENCONFIRM
- } else {
- // send error
- }
+ case m := <-h.msgCh:
+ fsm.bgpMessageStateUpdate(m.Header.Type, true)
+ if m.Header.Type == bgp.BGP_MSG_OPEN {
+ fsm.incoming <- m
+ msg := bgp.NewBGPKeepAliveMessage()
+ b, _ := msg.Serialize()
+ fsm.passiveConn.Write(b)
+ fsm.bgpMessageStateUpdate(m.Header.Type, false)
+ nextState = bgp.BGP_FSM_OPENCONFIRM
} else {
- // io error
+ // send error
}
+ case <-h.errorCh:
}
return nextState
}
@@ -258,7 +256,7 @@ func (h *FSMHandler) openconfirm() bgp.FSMState {
sec := time.Second * time.Duration(fsm.peerConfig.Timers.KeepaliveInterval)
fsm.keepaliveTicker = time.NewTicker(sec)
- h.ch = make(chan *bgp.BGPMessage)
+ h.msgCh = make(chan *bgp.BGPMessage)
h.conn = fsm.passiveConn
h.t.Go(h.recvMessage)
@@ -273,19 +271,17 @@ func (h *FSMHandler) openconfirm() bgp.FSMState {
b, _ := m.Serialize()
// TODO: check error
fsm.passiveConn.Write(b)
- case m, ok := <-h.ch:
+ case m := <-h.msgCh:
nextState := bgp.BGP_FSM_IDLE
- if ok {
- fsm.bgpMessageStateUpdate(m.Header.Type, true)
- if m.Header.Type == bgp.BGP_MSG_KEEPALIVE {
- nextState = bgp.BGP_FSM_ESTABLISHED
- } else {
- // send error
- }
+ fsm.bgpMessageStateUpdate(m.Header.Type, true)
+ if m.Header.Type == bgp.BGP_MSG_KEEPALIVE {
+ nextState = bgp.BGP_FSM_ESTABLISHED
} else {
- // io error
+ // send error
}
return nextState
+ case <-h.errorCh:
+ return bgp.BGP_FSM_IDLE
}
}
// panic
@@ -300,29 +296,19 @@ func (h *FSMHandler) sendMessageloop() error {
case <-h.t.Dying():
return nil
case m := <-fsm.outgoing:
- isSend := func(state bgp.FSMState, Type uint8) bool {
- switch Type {
- case bgp.BGP_MSG_UPDATE:
- if state == bgp.BGP_FSM_ESTABLISHED {
- return true
- }
- }
- return false
- }(fsm.state, m.Header.Type)
-
- if isSend {
- b, _ := m.Serialize()
- _, err := conn.Write(b)
- if err != nil {
- return nil
- }
- fsm.bgpMessageStateUpdate(m.Header.Type, false)
+ b, _ := m.Serialize()
+ _, err := conn.Write(b)
+ if err != nil {
+ h.errorCh <- true
+ return nil
}
+ fsm.bgpMessageStateUpdate(m.Header.Type, false)
case <-fsm.keepaliveTicker.C:
m := bgp.NewBGPKeepAliveMessage()
b, _ := m.Serialize()
_, err := conn.Write(b)
if err != nil {
+ h.errorCh <- true
return nil
}
fsm.bgpMessageStateUpdate(m.Header.Type, false)
@@ -332,8 +318,8 @@ func (h *FSMHandler) sendMessageloop() error {
func (h *FSMHandler) recvMessageloop() error {
for {
- h.recvMessage()
- if h.ioError == true {
+ err := h.recvMessageWithError()
+ if err != nil {
return nil
}
}
@@ -343,21 +329,15 @@ func (h *FSMHandler) established() bgp.FSMState {
fsm := h.fsm
h.conn = fsm.passiveConn
h.t.Go(h.sendMessageloop)
- // TODO: use incoming directly
- h.ch = make(chan *bgp.BGPMessage, 4096)
+ h.msgCh = fsm.incoming
h.t.Go(h.recvMessageloop)
for {
select {
- case m, ok := <-h.ch:
- if ok {
- fsm.bgpMessageStateUpdate(m.Header.Type, true)
- fsm.incoming <- m
- } else {
- h.conn.Close()
- h.t.Kill(nil)
- return bgp.BGP_FSM_IDLE
- }
+ case <-h.errorCh:
+ h.conn.Close()
+ h.t.Kill(nil)
+ return bgp.BGP_FSM_IDLE
case <-h.t.Dying():
h.conn.Close()
return 0
@@ -384,6 +364,7 @@ func (h *FSMHandler) loop() error {
nextState = h.established()
}
+ // zero means that tomb.Dying()
if nextState >= bgp.BGP_FSM_IDLE {
fsm.stateCh <- nextState
}