summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-02-06 20:09:52 -0800
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-02-06 20:09:52 -0800
commit3d5383fda440551c05542aafb560f0ec1618609b (patch)
treed6aae9be294ffdf5cb67824a35628201bacaffc7
parentb38617999ddd478bb466f102ba4dd046a5782866 (diff)
server: fix needless warnings about writing to dead bmp connection
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--server/bmp.go72
1 files changed, 37 insertions, 35 deletions
diff --git a/server/bmp.go b/server/bmp.go
index 9b3fab32..27a278bc 100644
--- a/server/bmp.go
+++ b/server/bmp.go
@@ -40,13 +40,13 @@ type bmpConfig struct {
}
type bmpWatcher struct {
- t tomb.Tomb
- ch chan watcherEvent
- apiCh chan *GrpcRequest
- newServerCh chan *bmpServer
- endCh chan *net.TCPConn
- connMap map[string]*bmpServer
- ctlCh chan *bmpConfig
+ t tomb.Tomb
+ ch chan watcherEvent
+ apiCh chan *GrpcRequest
+ newConnCh chan *net.TCPConn
+ endCh chan *net.TCPConn
+ connMap map[string]*bmpServer
+ ctlCh chan *bmpConfig
}
func (w *bmpWatcher) notify(t watcherEventType) chan watcherEvent {
@@ -73,18 +73,7 @@ func (w *bmpWatcher) tryConnect(server *bmpServer) {
}
} else {
log.Info("bmp server is connected, ", host)
- server.conn = conn.(*net.TCPConn)
- go func() {
- buf := make([]byte, 1)
- for {
- _, err := conn.Read(buf)
- if err != nil {
- w.endCh <- conn.(*net.TCPConn)
- return
- }
- }
- }()
- w.newServerCh <- server
+ w.newConnCh <- conn.(*net.TCPConn)
break
}
}
@@ -126,37 +115,47 @@ func (w *bmpWatcher) loop() error {
}
m.errCh <- nil
close(m.errCh)
- case server := <-w.newServerCh:
+ case newConn := <-w.newConnCh:
+ server, y := w.connMap[newConn.RemoteAddr().String()]
+ if !y {
+ log.Warnf("Can't find bmp server %s", newConn.RemoteAddr().String())
+ break
+ }
i := bgp.NewBMPInitiation([]bgp.BMPTLV{})
buf, _ := i.Serialize()
- _, err := server.conn.Write(buf)
- if err != nil {
+ if _, err := newConn.Write(buf); err != nil {
log.Warnf("failed to write to bmp server %s", server.host)
+ break
}
req := &GrpcRequest{
RequestType: REQ_BMP_NEIGHBORS,
ResponseCh: make(chan *GrpcResponse, 1),
}
w.apiCh <- req
- write := func(req *GrpcRequest) {
+ write := func(req *GrpcRequest) error {
for res := range req.ResponseCh {
for _, msg := range res.Data.([]*bgp.BMPMessage) {
buf, _ = msg.Serialize()
- _, err := server.conn.Write(buf)
- if err != nil {
- log.Warnf("failed to write to bmp server %s", server.host)
+ if _, err := newConn.Write(buf); err != nil {
+ log.Warnf("failed to write to bmp server %s %s", server.host, err)
+ return err
}
}
}
+ return nil
+ }
+ if err := write(req); err != nil {
+ break
}
- write(req)
if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY {
req = &GrpcRequest{
RequestType: REQ_BMP_ADJ_IN,
ResponseCh: make(chan *GrpcResponse, 1),
}
w.apiCh <- req
- write(req)
+ if err := write(req); err != nil {
+ break
+ }
}
if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY {
req = &GrpcRequest{
@@ -164,8 +163,11 @@ func (w *bmpWatcher) loop() error {
ResponseCh: make(chan *GrpcResponse, 1),
}
w.apiCh <- req
- write(req)
+ if err := write(req); err != nil {
+ break
+ }
}
+ server.conn = newConn
case ev := <-w.ch:
switch msg := ev.(type) {
case *watcherEventUpdateMsg:
@@ -293,12 +295,12 @@ func (w *bmpWatcher) watchingEventTypes() []watcherEventType {
func newBmpWatcher(grpcCh chan *GrpcRequest) (*bmpWatcher, error) {
w := &bmpWatcher{
- ch: make(chan watcherEvent),
- apiCh: grpcCh,
- newServerCh: make(chan *bmpServer),
- endCh: make(chan *net.TCPConn),
- connMap: make(map[string]*bmpServer),
- ctlCh: make(chan *bmpConfig),
+ ch: make(chan watcherEvent),
+ apiCh: grpcCh,
+ newConnCh: make(chan *net.TCPConn),
+ endCh: make(chan *net.TCPConn),
+ connMap: make(map[string]*bmpServer),
+ ctlCh: make(chan *bmpConfig),
}
w.t.Go(w.loop)
return w, nil