diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/bmp.go | 132 | ||||
-rw-r--r-- | server/fsm.go | 21 | ||||
-rw-r--r-- | server/peer.go | 2 | ||||
-rw-r--r-- | server/server.go | 86 |
4 files changed, 223 insertions, 18 deletions
diff --git a/server/bmp.go b/server/bmp.go new file mode 100644 index 00000000..2a00b73d --- /dev/null +++ b/server/bmp.go @@ -0,0 +1,132 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + log "github.com/Sirupsen/logrus" + "github.com/osrg/gobgp/config" + "github.com/osrg/gobgp/packet" + "github.com/osrg/gobgp/table" + "net" + "strconv" + "time" +) + +type broadcastBMPMsg struct { + ch chan *broadcastBMPMsg + msgList []*bgp.BMPMessage + conn *net.TCPConn + addr string +} + +func (m *broadcastBMPMsg) send() { + m.ch <- m +} + +type bmpConn struct { + conn *net.TCPConn + addr string +} + +type bmpClient struct { + ch chan *broadcastBMPMsg + connCh chan *bmpConn +} + +func newBMPClient(conf config.BmpServers, connCh chan *bmpConn) (*bmpClient, error) { + b := &bmpClient{} + if len(conf.BmpServerList) == 0 { + return b, nil + } + + b.ch = make(chan *broadcastBMPMsg) + b.connCh = connCh + + tryConnect := func(addr string) { + for { + conn, err := net.Dial("tcp", addr) + if err != nil { + time.Sleep(30 * time.Second) + } else { + log.Info("bmp server is connected, ", addr) + connCh <- &bmpConn{ + conn: conn.(*net.TCPConn), + addr: addr, + } + break + } + } + } + + for _, c := range conf.BmpServerList { + b := c.BmpServerConfig + go tryConnect(net.JoinHostPort(b.Address.String(), strconv.Itoa(int(b.Port)))) + } + + go func() { + connMap := make(map[string]*net.TCPConn) + for { + select { + case m := <-b.ch: + if m.conn != nil { + i := bgp.NewBMPInitiation([]bgp.BMPTLV{}) + buf, _ := i.Serialize() + _, err := m.conn.Write(buf) + if err == nil { + connMap[m.addr] = m.conn + } + } + + for addr, conn := range connMap { + if m.conn != nil && m.conn != conn { + continue + } + + for _, msg := range m.msgList { + b, _ := msg.Serialize() + _, err := conn.Write(b) + if err != nil { + delete(connMap, addr) + go tryConnect(addr) + break + } + } + } + } + } + }() + + return b, nil +} + +func (c *bmpClient) send() chan *broadcastBMPMsg { + return c.ch +} + +func bmpPeerUp(laddr string, lport, rport uint16, sent, recv *bgp.BGPMessage, t int, policy bool, pd uint64, peeri *table.PeerInfo, timestamp int64) *bgp.BMPMessage { + ph := bgp.NewBMPPeerHeader(uint8(t), policy, pd, peeri.Address.String(), peeri.AS, peeri.LocalID.String(), float64(timestamp)) + return bgp.NewBMPPeerUpNotification(*ph, laddr, lport, rport, sent, recv) +} + +func bmpPeerDown(reason uint8, t int, policy bool, pd uint64, peeri *table.PeerInfo, timestamp int64) *bgp.BMPMessage { + ph := bgp.NewBMPPeerHeader(uint8(t), policy, pd, peeri.Address.String(), peeri.AS, peeri.LocalID.String(), float64(timestamp)) + return bgp.NewBMPPeerDownNotification(*ph, reason, nil, []byte{}) +} + +func bmpPeerRoute(t int, policy bool, pd uint64, peeri *table.PeerInfo, timestamp int64, u *bgp.BGPMessage) *bgp.BMPMessage { + ph := bgp.NewBMPPeerHeader(uint8(t), policy, pd, peeri.Address.String(), peeri.AS, peeri.LocalID.String(), float64(timestamp)) + return bgp.NewBMPRouteMonitoring(*ph, u) +} diff --git a/server/fsm.go b/server/fsm.go index c9950a5d..7ee1d9cc 100644 --- a/server/fsm.go +++ b/server/fsm.go @@ -170,16 +170,25 @@ func (fsm *FSM) StateChange(nextState bgp.FSMState) { } } -func (fsm *FSM) LocalAddr() net.IP { - addr := fsm.conn.LocalAddr() +func hostport(addr net.Addr) (string, uint16) { if addr != nil { - host, _, err := net.SplitHostPort(addr.String()) + host, port, err := net.SplitHostPort(addr.String()) if err != nil { - return nil + return "", 0 } - return net.ParseIP(host) + p, _ := strconv.Atoi(port) + return host, uint16(p) } - return nil + return "", 0 +} + +func (fsm *FSM) RemoteHostPort() (string, uint16) { + return hostport(fsm.conn.RemoteAddr()) + +} + +func (fsm *FSM) LocalHostPort() (string, uint16) { + return hostport(fsm.conn.LocalAddr()) } func (fsm *FSM) sendNotificatonFromErrorMsg(conn net.Conn, e *bgp.MessageError) { diff --git a/server/peer.go b/server/peer.go index 45cb1c79..24638130 100644 --- a/server/peer.go +++ b/server/peer.go @@ -44,6 +44,7 @@ type Peer struct { inPolicies []*policy.Policy defaultInPolicy config.DefaultPolicyType isConfederationMember bool + recvOpen *bgp.BGPMessage } func NewPeer(g config.Global, conf config.Neighbor) *Peer { @@ -121,6 +122,7 @@ func (peer *Peer) handleBGPmessage(m *bgp.BGPMessage) ([]*table.Path, bool, []*b switch m.Header.Type { case bgp.BGP_MSG_OPEN: + peer.recvOpen = m body := m.Body.(*bgp.BGPOpen) peer.peerInfo.ID = m.Body.(*bgp.BGPOpen).ID r := make(map[bgp.RouteFamily]bool) diff --git a/server/server.go b/server/server.go index 1699a5cb..9861ee7b 100644 --- a/server/server.go +++ b/server/server.go @@ -90,6 +90,7 @@ type BgpServer struct { deletedPeerCh chan config.Neighbor updatedPeerCh chan config.Neighbor rpkiConfigCh chan config.RpkiServers + bmpConfigCh chan config.BmpServers dumper *dumper GrpcReqCh chan *GrpcRequest @@ -103,6 +104,8 @@ type BgpServer struct { localRibMap map[string]*LocalRib zclient *zebra.Client roaClient *roaClient + bmpClient *bmpClient + bmpConnCh chan *bmpConn } func NewBgpServer(port int) *BgpServer { @@ -112,6 +115,8 @@ func NewBgpServer(port int) *BgpServer { b.deletedPeerCh = make(chan config.Neighbor) b.updatedPeerCh = make(chan config.Neighbor) b.rpkiConfigCh = make(chan config.RpkiServers) + b.bmpConfigCh = make(chan config.BmpServers) + b.bmpConnCh = make(chan *bmpConn) b.GrpcReqCh = make(chan *GrpcRequest, 1) b.policyUpdateCh = make(chan config.RoutingPolicy) b.localRibMap = make(map[string]*LocalRib) @@ -252,6 +257,32 @@ func (server *BgpServer) Serve() { select { case c := <-server.rpkiConfigCh: server.roaClient, _ = newROAClient(c) + case c := <-server.bmpConfigCh: + server.bmpClient, _ = newBMPClient(c, server.bmpConnCh) + case c := <-server.bmpConnCh: + bmpMsgList := []*bgp.BMPMessage{} + for _, targetPeer := range server.neighborMap { + pathList := make([]*table.Path, 0) + if targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED { + continue + } + for _, rf := range targetPeer.configuredRFlist() { + pathList = append(pathList, targetPeer.adjRib.GetInPathList(rf)...) + } + for _, p := range pathList { + // avoid to merge for timestamp + u := table.CreateUpdateMsgFromPaths([]*table.Path{p}) + bmpMsgList = append(bmpMsgList, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, targetPeer.peerInfo, p.GetTimestamp().Unix(), u[0])) + } + } + + m := &broadcastBMPMsg{ + ch: server.bmpClient.send(), + conn: c.conn, + addr: c.addr, + msgList: bmpMsgList, + } + server.broadcastMsgs = append(server.broadcastMsgs, m) case rmsg := <-server.roaClient.recieveROA(): server.roaClient.handleRTRMsg(rmsg) case zmsg := <-zapiMsgCh: @@ -733,6 +764,13 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *fsmMsg, incoming chan * globalRib := server.localRibMap[GLOBAL_RIB_NAME] if oldState == bgp.BGP_FSM_ESTABLISHED { + if ch := server.bmpClient.send(); ch != nil { + m := &broadcastBMPMsg{ + ch: ch, + msgList: []*bgp.BMPMessage{bmpPeerDown(bgp.BMP_PEER_DOWN_REASON_UNKNOWN, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.peerInfo, peer.conf.Timers.TimersState.Downtime)}, + } + server.broadcastMsgs = append(server.broadcastMsgs, m) + } t := time.Now() if t.Sub(time.Unix(peer.conf.Timers.TimersState.Uptime, 0)) < FLOP_THRESHOLD { peer.conf.NeighborState.Flops++ @@ -748,12 +786,22 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *fsmMsg, incoming chan * close(peer.outgoing) peer.outgoing = make(chan *bgp.BGPMessage, 128) if nextState == bgp.BGP_FSM_ESTABLISHED { + if ch := server.bmpClient.send(); ch != nil { + laddr, lport := peer.fsm.LocalHostPort() + _, rport := peer.fsm.RemoteHostPort() + m := &broadcastBMPMsg{ + ch: ch, + msgList: []*bgp.BMPMessage{bmpPeerUp(laddr, lport, rport, buildopen(peer.fsm.gConf, peer.fsm.pConf), peer.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.peerInfo, peer.conf.Timers.TimersState.Uptime)}, + } + server.broadcastMsgs = append(server.broadcastMsgs, m) + } pathList := make([]*table.Path, 0) if peer.isRouteServerClient() { loc := server.localRibMap[peer.conf.NeighborConfig.NeighborAddress.String()] pathList = applyPolicies(peer, loc, POLICY_DIRECTION_EXPORT, peer.getBests(loc)) } else { - peer.conf.Transport.TransportConfig.LocalAddress = peer.fsm.LocalAddr() + l, _ := peer.fsm.LocalHostPort() + peer.conf.Transport.TransportConfig.LocalAddress = net.ParseIP(l) for _, path := range filterpath(peer, peer.getBests(globalRib)) { p := path.Clone(path.IsWithdraw) p.UpdatePathAttrs(&server.bgpConfig.Global, &peer.conf) @@ -796,18 +844,28 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *fsmMsg, incoming chan * server.roaClient.validate(pathList) } } - if m.Header.Type == bgp.BGP_MSG_UPDATE && server.dumper != nil { - _, y := peer.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] - bm := &broadcastBGPMsg{ - message: m, - peerAS: peer.peerInfo.AS, - localAS: peer.peerInfo.LocalAS, - peerAddress: peer.peerInfo.Address, - localAddress: peer.fsm.LocalAddr(), - fourBytesAs: y, - ch: server.dumper.sendCh(), + if m.Header.Type == bgp.BGP_MSG_UPDATE { + if server.dumper != nil { + _, y := peer.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] + l, _ := peer.fsm.LocalHostPort() + bm := &broadcastBGPMsg{ + message: m, + peerAS: peer.peerInfo.AS, + localAS: peer.peerInfo.LocalAS, + peerAddress: peer.peerInfo.Address, + localAddress: net.ParseIP(l), + fourBytesAs: y, + ch: server.dumper.sendCh(), + } + server.broadcastMsgs = append(server.broadcastMsgs, bm) + } + if ch := server.bmpClient.send(); ch != nil { + bm := &broadcastBMPMsg{ + ch: ch, + msgList: []*bgp.BMPMessage{bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.peerInfo, time.Now().Unix(), m)}, + } + server.broadcastMsgs = append(server.broadcastMsgs, bm) } - server.broadcastMsgs = append(server.broadcastMsgs, bm) } msgs = append(msgs, server.propagateUpdate(peer, pathList)...) default: @@ -829,6 +887,10 @@ func (server *BgpServer) SetRpkiConfig(c config.RpkiServers) { server.rpkiConfigCh <- c } +func (server *BgpServer) SetBmpConfig(c config.BmpServers) { + server.bmpConfigCh <- c +} + func (server *BgpServer) PeerAdd(peer config.Neighbor) { server.addedPeerCh <- peer } |