summaryrefslogtreecommitdiffhomepage
path: root/server/bmp.go
diff options
context:
space:
mode:
Diffstat (limited to 'server/bmp.go')
-rw-r--r--server/bmp.go327
1 files changed, 104 insertions, 223 deletions
diff --git a/server/bmp.go b/server/bmp.go
index 4542ef2d..0c7bc61d 100644
--- a/server/bmp.go
+++ b/server/bmp.go
@@ -1,4 +1,4 @@
-// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
+// Copyright (C) 2015-2016 Nippon Telegraph and Telephone Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -22,211 +22,114 @@ import (
"github.com/osrg/gobgp/packet/bgp"
"github.com/osrg/gobgp/packet/bmp"
"github.com/osrg/gobgp/table"
- "gopkg.in/tomb.v2"
"net"
"strconv"
"time"
)
-type bmpServer struct {
- conn *net.TCPConn
- host string
- typ config.BmpRouteMonitoringPolicyType
-}
-
-type bmpConfig struct {
- config config.BmpServerConfig
- del bool
- errCh chan error
-}
-
-type bmpWatcher struct {
- t tomb.Tomb
- ch chan watcherEvent
- apiCh chan *GrpcRequest
- newConnCh chan *net.TCPConn
- endCh chan *net.TCPConn
- connMap map[string]*bmpServer
- ctlCh chan *bmpConfig
-}
-
-func (w *bmpWatcher) notify(t watcherEventType) chan watcherEvent {
- if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE {
- return w.ch
- }
- return nil
-}
-
-func (w *bmpWatcher) stop() {
- w.t.Kill(nil)
-}
-
-func (w *bmpWatcher) tryConnect(server *bmpServer) {
+func (b *bmpClient) tryConnect() *net.TCPConn {
interval := 1
- host := server.host
for {
- log.Debug("connecting bmp server: ", host)
- conn, err := net.Dial("tcp", host)
+ log.Debug("connecting bmp server: ", b.host)
+ conn, err := net.Dial("tcp", b.host)
if err != nil {
+ select {
+ case <-b.dead:
+ return nil
+ default:
+ }
time.Sleep(time.Duration(interval) * time.Second)
if interval < 30 {
interval *= 2
}
} else {
- log.Info("bmp server is connected, ", host)
- w.newConnCh <- conn.(*net.TCPConn)
- break
+ log.Info("bmp server is connected, ", b.host)
+ return conn.(*net.TCPConn)
}
}
}
-func (w *bmpWatcher) loop() error {
+func (b *bmpClient) Stop() {
+ close(b.dead)
+}
+
+func (b *bmpClient) loop() {
for {
- select {
- case <-w.t.Dying():
- for _, server := range w.connMap {
- if server.conn != nil {
- server.conn.Close()
- }
+ conn := b.tryConnect()
+ if conn == nil {
+ break
+ }
+
+ if func() bool {
+ ops := []WatchOption{WatchPeerState(true)}
+ if b.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY {
+ ops = append(ops, WatchUpdate(true))
+ } else if b.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY {
+ ops = append(ops, WatchPostUpdate(true))
}
- return nil
- case m := <-w.ctlCh:
- c := m.config
- if m.del {
- host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
- if _, y := w.connMap[host]; !y {
- m.errCh <- fmt.Errorf("bmp server %s doesn't exists", host)
- continue
- }
- conn := w.connMap[host].conn
- delete(w.connMap, host)
- conn.Close()
- } else {
- host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
- if _, y := w.connMap[host]; y {
- m.errCh <- fmt.Errorf("bmp server %s already exists", host)
- continue
+ w := b.s.Watch(ops...)
+ defer w.Stop()
+
+ write := func(msg *bmp.BMPMessage) error {
+ buf, _ := msg.Serialize()
+ _, err := conn.Write(buf)
+ if err != nil {
+ log.Warnf("failed to write to bmp server %s", b.host)
}
- server := &bmpServer{
- host: host,
- typ: c.RouteMonitoringPolicy,
- }
- w.connMap[host] = server
- go w.tryConnect(server)
- }
- m.errCh <- nil
- close(m.errCh)
- case newConn := <-w.newConnCh:
- server, y := w.connMap[newConn.RemoteAddr().String()]
- if !y {
- log.Warnf("Can't find bmp server %s", newConn.RemoteAddr().String())
- break
+ return err
}
- i := bmp.NewBMPInitiation([]bmp.BMPTLV{})
- buf, _ := i.Serialize()
- if _, err := newConn.Write(buf); err != nil {
- log.Warnf("failed to write to bmp server %s", server.host)
- go w.tryConnect(server)
- break
- }
- req := &GrpcRequest{
- RequestType: REQ_BMP_NEIGHBORS,
- ResponseCh: make(chan *GrpcResponse, 1),
+
+ if err := write(bmp.NewBMPInitiation([]bmp.BMPTLV{})); err != nil {
+ return false
}
- w.apiCh <- req
- write := func(req *GrpcRequest) error {
- for res := range req.ResponseCh {
- for _, msg := range res.Data.([]*bmp.BMPMessage) {
- buf, _ = msg.Serialize()
- if _, err := newConn.Write(buf); err != nil {
- log.Warnf("failed to write to bmp server %s %s", server.host, err)
- go w.tryConnect(server)
- return err
+
+ for {
+ select {
+ case ev := <-w.Event():
+ switch msg := ev.(type) {
+ case *watcherEventUpdateMsg:
+ info := &table.PeerInfo{
+ Address: msg.peerAddress,
+ AS: msg.peerAS,
+ ID: msg.peerID,
}
- }
- }
- return nil
- }
- if err := write(req); err != nil {
- break
- }
- if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY {
- req = &GrpcRequest{
- RequestType: REQ_BMP_ADJ_IN,
- ResponseCh: make(chan *GrpcResponse, 1),
- }
- w.apiCh <- req
- if err := write(req); err != nil {
- break
- }
- }
- if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY {
- req = &GrpcRequest{
- RequestType: REQ_BMP_GLOBAL,
- ResponseCh: make(chan *GrpcResponse, 1),
- }
- w.apiCh <- req
- if err := write(req); err != nil {
- break
- }
- }
- server.conn = newConn
- case ev := <-w.ch:
- switch msg := ev.(type) {
- case *watcherEventUpdateMsg:
- info := &table.PeerInfo{
- Address: msg.peerAddress,
- AS: msg.peerAS,
- ID: msg.peerID,
- }
- buf, _ := bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.postPolicy, 0, info, msg.timestamp.Unix(), msg.payload).Serialize()
- for _, server := range w.connMap {
- if server.conn != nil {
- send := server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY && !msg.postPolicy
- send = send || (server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY && msg.postPolicy)
- if send {
- _, err := server.conn.Write(buf)
- if err != nil {
- log.Warnf("failed to write to bmp server %s", server.host)
- }
+ if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.postPolicy, 0, info, msg.timestamp.Unix(), msg.payload)); err != nil {
+ return false
}
- }
- }
- case *watcherEventStateChangedMsg:
- var bmpmsg *bmp.BMPMessage
- info := &table.PeerInfo{
- Address: msg.peerAddress,
- AS: msg.peerAS,
- ID: msg.peerID,
- }
- if msg.state == bgp.BGP_FSM_ESTABLISHED {
- bmpmsg = bmpPeerUp(msg.localAddress.String(), msg.localPort, msg.peerPort, msg.sentOpen, msg.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())
- } else {
- bmpmsg = bmpPeerDown(bmp.BMP_PEER_DOWN_REASON_UNKNOWN, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())
- }
- buf, _ := bmpmsg.Serialize()
- for _, server := range w.connMap {
- if server.conn != nil {
- _, err := server.conn.Write(buf)
- if err != nil {
- log.Warnf("failed to write to bmp server %s", server.host)
+ case *watcherEventStateChangedMsg:
+ info := &table.PeerInfo{
+ Address: msg.peerAddress,
+ AS: msg.peerAS,
+ ID: msg.peerID,
+ }
+ if msg.state == bgp.BGP_FSM_ESTABLISHED {
+ if err := write(bmpPeerUp(msg.localAddress.String(), msg.localPort, msg.peerPort, msg.sentOpen, msg.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())); err != nil {
+ return false
+ }
+ } else {
+ if err := write(bmpPeerDown(bmp.BMP_PEER_DOWN_REASON_UNKNOWN, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, msg.timestamp.Unix())); err != nil {
+ return false
+ }
}
}
+ case <-b.dead:
+ conn.Close()
+ return true
}
- default:
- log.Warnf("unknown watcher event")
- }
- case conn := <-w.endCh:
- host := conn.RemoteAddr().String()
- log.Debugf("bmp connection to %s killed", host)
- if _, y := w.connMap[host]; y {
- w.connMap[host].conn = nil
- go w.tryConnect(w.connMap[host])
}
+ }() {
+ return
}
}
}
+type bmpClient struct {
+ s *BgpServer
+ dead chan struct{}
+ host string
+ typ config.BmpRouteMonitoringPolicyType
+}
+
func bmpPeerUp(laddr string, lport, rport uint16, sent, recv *bgp.BGPMessage, t uint8, policy bool, pd uint64, peeri *table.PeerInfo, timestamp int64) *bmp.BMPMessage {
ph := bmp.NewBMPPeerHeader(t, policy, pd, peeri.Address.String(), peeri.AS, peeri.ID.String(), float64(timestamp))
return bmp.NewBMPPeerUpNotification(*ph, laddr, lport, rport, sent, recv)
@@ -245,62 +148,40 @@ func bmpPeerRoute(t uint8, policy bool, pd uint64, peeri *table.PeerInfo, timest
return m
}
-func (w *bmpWatcher) addServer(c config.BmpServerConfig) error {
- ch := make(chan error)
- w.ctlCh <- &bmpConfig{
- config: c,
- errCh: ch,
+func (b *bmpClientManager) addServer(c *config.BmpServerConfig) error {
+ host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
+ if _, y := b.clientMap[host]; y {
+ return fmt.Errorf("bmp client %s is already configured", host)
}
- return <-ch
+ b.clientMap[host] = &bmpClient{
+ s: b.s,
+ dead: make(chan struct{}),
+ host: host,
+ typ: c.RouteMonitoringPolicy,
+ }
+ go b.clientMap[host].loop()
+ return nil
}
-func (w *bmpWatcher) deleteServer(c config.BmpServerConfig) error {
- ch := make(chan error)
- w.ctlCh <- &bmpConfig{
- config: c,
- del: true,
- errCh: ch,
+func (b *bmpClientManager) deleteServer(c *config.BmpServerConfig) error {
+ host := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
+ if c, y := b.clientMap[host]; !y {
+ return fmt.Errorf("bmp client %s isn't found", host)
+ } else {
+ c.Stop()
+ delete(b.clientMap, host)
}
- return <-ch
+ return nil
}
-func (w *bmpWatcher) watchingEventTypes() []watcherEventType {
- state := false
- pre := false
- post := false
- for _, server := range w.connMap {
- if server.conn != nil {
- state = true
- if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_POST_POLICY {
- pre = true
- }
- if server.typ != config.BMP_ROUTE_MONITORING_POLICY_TYPE_PRE_POLICY {
- post = true
- }
- }
- }
- types := make([]watcherEventType, 0, 3)
- if state {
- types = append(types, WATCHER_EVENT_STATE_CHANGE)
- }
- if pre {
- types = append(types, WATCHER_EVENT_UPDATE_MSG)
- }
- if post {
- types = append(types, WATCHER_EVENT_POST_POLICY_UPDATE_MSG)
- }
- return types
+type bmpClientManager struct {
+ s *BgpServer
+ clientMap map[string]*bmpClient
}
-func newBmpWatcher(grpcCh chan *GrpcRequest) (*bmpWatcher, error) {
- w := &bmpWatcher{
- ch: make(chan watcherEvent),
- apiCh: grpcCh,
- newConnCh: make(chan *net.TCPConn),
- endCh: make(chan *net.TCPConn),
- connMap: make(map[string]*bmpServer),
- ctlCh: make(chan *bmpConfig),
+func newBmpClientManager(s *BgpServer) *bmpClientManager {
+ return &bmpClientManager{
+ s: s,
+ clientMap: make(map[string]*bmpClient),
}
- w.t.Go(w.loop)
- return w, nil
}