diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-01-01 23:25:22 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2015-01-01 23:25:22 +0900 |
commit | 10dfd44a53a3453b4ab83b39bfb5b759b9d43a47 (patch) | |
tree | 5a83d2cf8dabba779964083c1b5bc8ae3add8d94 | |
parent | 1f6976888aa24cbd788f2671e5196a618904f205 (diff) |
server: clean up channel usage for network IO
- use peer.incoming channel directly.
- remove state checking for outgoing messages since peer.go does.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | server/fsm.go | 125 |
1 files changed, 53 insertions, 72 deletions
diff --git a/server/fsm.go b/server/fsm.go index c6309367..3080dca4 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -103,14 +103,14 @@ type FSMHandler struct { t tomb.Tomb fsm *FSM conn *net.TCPConn - ch chan *bgp.BGPMessage - ioError bool + msgCh chan *bgp.BGPMessage + errorCh chan bool } func NewFSMHandler(fsm *FSM) *FSMHandler { f := &FSMHandler{ fsm: fsm, - ioError: false, + errorCh: make(chan bool), } f.t.Go(f.loop) return f @@ -183,36 +183,37 @@ func readAll(conn *net.TCPConn, length int) ([]byte, error) { return buf, nil } -func (h *FSMHandler) recvMessage() error { +func (h *FSMHandler) recvMessageWithError() error { headerBuf, err := readAll(h.conn, bgp.BGP_HEADER_LENGTH) if err != nil { - h.ioError = true - close(h.ch) - return nil + h.errorCh <- true + return err } hd := &bgp.BGPHeader{} err = hd.DecodeFromBytes(headerBuf) if err != nil { - h.ioError = true - close(h.ch) - return nil + h.errorCh <- true + return err } bodyBuf, err := readAll(h.conn, int(hd.Len)-bgp.BGP_HEADER_LENGTH) if err != nil { - h.ioError = true - close(h.ch) - return nil + h.errorCh <- true + return err } m, err := bgp.ParseBGPBody(hd, bodyBuf) if err != nil { - h.ioError = true - close(h.ch) - return nil + h.errorCh <- true + return err } - h.ch <- m + h.msgCh <- m + return nil +} + +func (h *FSMHandler) recvMessage() error { + h.recvMessageWithError() return nil } @@ -223,7 +224,7 @@ func (h *FSMHandler) opensent() bgp.FSMState { fsm.passiveConn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) - h.ch = make(chan *bgp.BGPMessage) + h.msgCh = make(chan *bgp.BGPMessage) h.conn = fsm.passiveConn h.t.Go(h.recvMessage) @@ -233,22 +234,19 @@ func (h *FSMHandler) opensent() bgp.FSMState { case <-h.t.Dying(): fsm.passiveConn.Close() return 0 - case m, ok := <-h.ch: - if ok { - fsm.bgpMessageStateUpdate(m.Header.Type, true) - if m.Header.Type == bgp.BGP_MSG_OPEN { - fsm.incoming <- m - msg := bgp.NewBGPKeepAliveMessage() - b, _ := msg.Serialize() - fsm.passiveConn.Write(b) - fsm.bgpMessageStateUpdate(m.Header.Type, false) - nextState = bgp.BGP_FSM_OPENCONFIRM - } else { - // send error - } + case m := <-h.msgCh: + fsm.bgpMessageStateUpdate(m.Header.Type, true) + if m.Header.Type == bgp.BGP_MSG_OPEN { + fsm.incoming <- m + msg := bgp.NewBGPKeepAliveMessage() + b, _ := msg.Serialize() + fsm.passiveConn.Write(b) + fsm.bgpMessageStateUpdate(m.Header.Type, false) + nextState = bgp.BGP_FSM_OPENCONFIRM } else { - // io error + // send error } + case <-h.errorCh: } return nextState } @@ -258,7 +256,7 @@ func (h *FSMHandler) openconfirm() bgp.FSMState { sec := time.Second * time.Duration(fsm.peerConfig.Timers.KeepaliveInterval) fsm.keepaliveTicker = time.NewTicker(sec) - h.ch = make(chan *bgp.BGPMessage) + h.msgCh = make(chan *bgp.BGPMessage) h.conn = fsm.passiveConn h.t.Go(h.recvMessage) @@ -273,19 +271,17 @@ func (h *FSMHandler) openconfirm() bgp.FSMState { b, _ := m.Serialize() // TODO: check error fsm.passiveConn.Write(b) - case m, ok := <-h.ch: + case m := <-h.msgCh: nextState := bgp.BGP_FSM_IDLE - if ok { - fsm.bgpMessageStateUpdate(m.Header.Type, true) - if m.Header.Type == bgp.BGP_MSG_KEEPALIVE { - nextState = bgp.BGP_FSM_ESTABLISHED - } else { - // send error - } + fsm.bgpMessageStateUpdate(m.Header.Type, true) + if m.Header.Type == bgp.BGP_MSG_KEEPALIVE { + nextState = bgp.BGP_FSM_ESTABLISHED } else { - // io error + // send error } return nextState + case <-h.errorCh: + return bgp.BGP_FSM_IDLE } } // panic @@ -300,29 +296,19 @@ func (h *FSMHandler) sendMessageloop() error { case <-h.t.Dying(): return nil case m := <-fsm.outgoing: - isSend := func(state bgp.FSMState, Type uint8) bool { - switch Type { - case bgp.BGP_MSG_UPDATE: - if state == bgp.BGP_FSM_ESTABLISHED { - return true - } - } - return false - }(fsm.state, m.Header.Type) - - if isSend { - b, _ := m.Serialize() - _, err := conn.Write(b) - if err != nil { - return nil - } - fsm.bgpMessageStateUpdate(m.Header.Type, false) + b, _ := m.Serialize() + _, err := conn.Write(b) + if err != nil { + h.errorCh <- true + return nil } + fsm.bgpMessageStateUpdate(m.Header.Type, false) case <-fsm.keepaliveTicker.C: m := bgp.NewBGPKeepAliveMessage() b, _ := m.Serialize() _, err := conn.Write(b) if err != nil { + h.errorCh <- true return nil } fsm.bgpMessageStateUpdate(m.Header.Type, false) @@ -332,8 +318,8 @@ func (h *FSMHandler) sendMessageloop() error { func (h *FSMHandler) recvMessageloop() error { for { - h.recvMessage() - if h.ioError == true { + err := h.recvMessageWithError() + if err != nil { return nil } } @@ -343,21 +329,15 @@ func (h *FSMHandler) established() bgp.FSMState { fsm := h.fsm h.conn = fsm.passiveConn h.t.Go(h.sendMessageloop) - // TODO: use incoming directly - h.ch = make(chan *bgp.BGPMessage, 4096) + h.msgCh = fsm.incoming h.t.Go(h.recvMessageloop) for { select { - case m, ok := <-h.ch: - if ok { - fsm.bgpMessageStateUpdate(m.Header.Type, true) - fsm.incoming <- m - } else { - h.conn.Close() - h.t.Kill(nil) - return bgp.BGP_FSM_IDLE - } + case <-h.errorCh: + h.conn.Close() + h.t.Kill(nil) + return bgp.BGP_FSM_IDLE case <-h.t.Dying(): h.conn.Close() return 0 @@ -384,6 +364,7 @@ func (h *FSMHandler) loop() error { nextState = h.established() } + // zero means that tomb.Dying() if nextState >= bgp.BGP_FSM_IDLE { fsm.stateCh <- nextState } |