diff options
Diffstat (limited to 'server/bmp.go')
-rw-r--r-- | server/bmp.go | 327 |
1 files changed, 104 insertions, 223 deletions
diff --git a/server/bmp.go b/server/bmp.go index 4542ef2d..0c7bc61d 100644 --- a/server/bmp.go +++ b/server/bmp.go @@ -1,4 +1,4 @@ -// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// Copyright (C) 2015-2016 Nippon Telegraph and Telephone Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,211 +22,114 @@ import ( "github.com/osrg/gobgp/packet/bgp" "github.com/osrg/gobgp/packet/bmp" "github.com/osrg/gobgp/table" - "gopkg.in/tomb.v2" "net" "strconv" "time" ) -type bmpServer struct { - conn *net.TCPConn - host string - typ config.BmpRouteMonitoringPolicyType -} - -type bmpConfig struct { - config config.BmpServerConfig - del bool - errCh chan error -} - -type bmpWatcher struct { - t tomb.Tomb - ch chan watcherEvent - apiCh chan *GrpcRequest - newConnCh chan *net.TCPConn - endCh chan *net.TCPConn - connMap map[string]*bmpServer - ctlCh chan *bmpConfig -} - -func (w *bmpWatcher) notify(t watcherEventType) chan watcherEvent { - if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE { - return w.ch - } - return nil -} - -func (w *bmpWatcher) stop() { - w.t.Kill(nil) -} - -func (w *bmpWatcher) tryConnect(server *bmpServer) { +func (b *bmpClient) tryConnect() *net.TCPConn { interval := 1 - host := server.host for { - log.Debug("connecting bmp server: ", host) - conn, err := net.Dial("tcp", host) + log.Debug("connecting bmp server: ", b.host) + conn, err := net.Dial("tcp", b.host) if err != nil { + select { + case <-b.dead: + return nil + default: + } time.Sleep(time.Duration(interval) * time.Second) if interval < 30 { interval *= 2 } } else { - log.Info("bmp server is connected, ", host) - w.newConnCh <- conn.(*net.TCPConn) - break + log.Info("bmp server is connected, ", b.host) + return conn.(*net.TCPConn) } } } -func (w *bmpWatcher) loop() error { +func (b *bmpClient) Stop() { + close(b.dead) +} + +func (b *bmpClient) loop() { for { - select { - case <-w.t.Dying(): - for _, server := range w.connMap { - if server.conn != nil { - server.conn.Close() - } + conn := b.tryConnect() + if conn == nil { + break + } + + if func() bool { + ops := []WatchOption{WatchPeerState(true)} + if b.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY { + ops = append(ops, WatchUpdate(true)) + } else if b.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY { + ops = append(ops, WatchPostUpdate(true)) } - return nil - case m := <-w.ctlCh: - c := m.config - if m.del { - host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))) - if _, y := w.connMap[host]; !y { - m.errCh <- fmt.Errorf("bmp server %s doesn't exists", host) - continue - } - conn := w.connMap[host].conn - delete(w.connMap, host) - conn.Close() - } else { - host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))) - if _, y := w.connMap[host]; y { - m.errCh <- fmt.Errorf("bmp server %s already exists", host) - continue + w := b.s.Watch(ops...) + defer w.Stop() + + write := func(msg *bmp.BMPMessage) error { + buf, _ := msg.Serialize() + _, err := conn.Write(buf) + if err != nil { + log.Warnf("failed to write to bmp server %s", b.host) } - server := &bmpServer{ - host: host, - typ: c.RouteMonitoringPolicy, - } - w.connMap[host] = server - go w.tryConnect(server) - } - m.errCh <- nil - close(m.errCh) - case newConn := <-w.newConnCh: - server, y := w.connMap[newConn.RemoteAddr().String()] - if !y { - log.Warnf("Can't find bmp server %s", newConn.RemoteAddr().String()) - break + return err } - i := bmp.NewBMPInitiation([]bmp.BMPTLV{}) - buf, _ := i.Serialize() - if _, err := newConn.Write(buf); err != nil { - log.Warnf("failed to write to bmp server %s", server.host) - go w.tryConnect(server) - break - } - req := &GrpcRequest{ - RequestType: REQ_BMP_NEIGHBORS, - ResponseCh: make(chan *GrpcResponse, 1), + + if err := write(bmp.NewBMPInitiation([]bmp.BMPTLV{})); err != nil { + return false } - w.apiCh <- req - write := func(req *GrpcRequest) error { - for res := range req.ResponseCh { - for _, msg := range res.Data.([]*bmp.BMPMessage) { - buf, _ = msg.Serialize() - if _, err := newConn.Write(buf); err != nil { - log.Warnf("failed to write to bmp server %s %s", server.host, err) - go w.tryConnect(server) - return err + + for { + select { + case ev := <-w.Event(): + switch msg := ev.(type) { + case *watcherEventUpdateMsg: + info := &table.PeerInfo{ + Address: msg.peerAddress, + AS: msg.peerAS, + ID: msg.peerID, } - } - } - return nil - } - if err := write(req); err != nil { - break - } - if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY { - req = &GrpcRequest{ - RequestType: REQ_BMP_ADJ_IN, - ResponseCh: make(chan *GrpcResponse, 1), - } - w.apiCh <- req - if err := write(req); err != nil { - break - } - } - if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY { - req = &GrpcRequest{ - RequestType: REQ_BMP_GLOBAL, - ResponseCh: make(chan *GrpcResponse, 1), - } - w.apiCh <- req - if err := write(req); err != nil { - break - } - } - server.conn = newConn - case ev := <-w.ch: - switch msg := ev.(type) { - case *watcherEventUpdateMsg: - info := &table.PeerInfo{ - Address: msg.peerAddress, - AS: msg.peerAS, - ID: msg.peerID, - } - buf, _ := bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.postPolicy, 0, info, msg.timestamp.Unix(), msg.payload).Serialize() - for _, server := range w.connMap { - if server.conn != nil { - send := server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY && !msg.postPolicy - send = send || (server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY && msg.postPolicy) - if send { - _, err := server.conn.Write(buf) - if err != nil { - log.Warnf("failed to write to bmp server %s", server.host) - } + if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.postPolicy, 0, info, msg.timestamp.Unix(), msg.payload)); err != nil { + return false } - } - } - case *watcherEventStateChangedMsg: - var bmpmsg *bmp.BMPMessage - info := &table.PeerInfo{ - Address: msg.peerAddress, - AS: msg.peerAS, - ID: msg.peerID, - } - if msg.state == bgp.BGP_FSM_ESTABLISHED { - bmpmsg = bmpPeerUp(msg.localAddress.String(), msg.localPort, msg.peerPort, msg.sentOpen, msg.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix()) - } else { - bmpmsg = bmpPeerDown(bmp.BMP_PEER_DOWN_REASON_UNKNOWN, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix()) - } - buf, _ := bmpmsg.Serialize() - for _, server := range w.connMap { - if server.conn != nil { - _, err := server.conn.Write(buf) - if err != nil { - log.Warnf("failed to write to bmp server %s", server.host) + case *watcherEventStateChangedMsg: + info := &table.PeerInfo{ + Address: msg.peerAddress, + AS: msg.peerAS, + ID: msg.peerID, + } + if msg.state == bgp.BGP_FSM_ESTABLISHED { + if err := write(bmpPeerUp(msg.localAddress.String(), msg.localPort, msg.peerPort, msg.sentOpen, msg.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())); err != nil { + return false + } + } else { + if err := write(bmpPeerDown(bmp.BMP_PEER_DOWN_REASON_UNKNOWN, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())); err != nil { + return false + } } } + case <-b.dead: + conn.Close() + return true } - default: - log.Warnf("unknown watcher event") - } - case conn := <-w.endCh: - host := conn.RemoteAddr().String() - log.Debugf("bmp connection to %s killed", host) - if _, y := w.connMap[host]; y { - w.connMap[host].conn = nil - go w.tryConnect(w.connMap[host]) } + }() { + return } } } +type bmpClient struct { + s *BgpServer + dead chan struct{} + host string + typ config.BmpRouteMonitoringPolicyType +} + func bmpPeerUp(laddr string, lport, rport uint16, sent, recv *bgp.BGPMessage, t uint8, policy bool, pd uint64, peeri *table.PeerInfo, timestamp int64) *bmp.BMPMessage { ph := bmp.NewBMPPeerHeader(t, policy, pd, peeri.Address.String(), peeri.AS, peeri.ID.String(), float64(timestamp)) return bmp.NewBMPPeerUpNotification(*ph, laddr, lport, rport, sent, recv) @@ -245,62 +148,40 @@ func bmpPeerRoute(t uint8, policy bool, pd uint64, peeri *table.PeerInfo, timest return m } -func (w *bmpWatcher) addServer(c config.BmpServerConfig) error { - ch := make(chan error) - w.ctlCh <- &bmpConfig{ - config: c, - errCh: ch, +func (b *bmpClientManager) addServer(c *config.BmpServerConfig) error { + host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))) + if _, y := b.clientMap[host]; y { + return fmt.Errorf("bmp client %s is already configured", host) } - return <-ch + b.clientMap[host] = &bmpClient{ + s: b.s, + dead: make(chan struct{}), + host: host, + typ: c.RouteMonitoringPolicy, + } + go b.clientMap[host].loop() + return nil } -func (w *bmpWatcher) deleteServer(c config.BmpServerConfig) error { - ch := make(chan error) - w.ctlCh <- &bmpConfig{ - config: c, - del: true, - errCh: ch, +func (b *bmpClientManager) deleteServer(c *config.BmpServerConfig) error { + host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))) + if c, y := b.clientMap[host]; !y { + return fmt.Errorf("bmp client %s isn't found", host) + } else { + c.Stop() + delete(b.clientMap, host) } - return <-ch + return nil } -func (w *bmpWatcher) watchingEventTypes() []watcherEventType { - state := false - pre := false - post := false - for _, server := range w.connMap { - if server.conn != nil { - state = true - if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY { - pre = true - } - if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY { - post = true - } - } - } - types := make([]watcherEventType, 0, 3) - if state { - types = append(types, WATCHER_EVENT_STATE_CHANGE) - } - if pre { - types = append(types, WATCHER_EVENT_UPDATE_MSG) - } - if post { - types = append(types, WATCHER_EVENT_POST_POLICY_UPDATE_MSG) - } - return types +type bmpClientManager struct { + s *BgpServer + clientMap map[string]*bmpClient } -func newBmpWatcher(grpcCh chan *GrpcRequest) (*bmpWatcher, error) { - w := &bmpWatcher{ - ch: make(chan watcherEvent), - apiCh: grpcCh, - newConnCh: make(chan *net.TCPConn), - endCh: make(chan *net.TCPConn), - connMap: make(map[string]*bmpServer), - ctlCh: make(chan *bmpConfig), +func newBmpClientManager(s *BgpServer) *bmpClientManager { + return &bmpClientManager{ + s: s, + clientMap: make(map[string]*bmpClient), } - w.t.Go(w.loop) - return w, nil } |