summaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-09-13 22:39:21 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-09-13 22:39:21 +0900
commit5be31cce0de1797664d4125957d157991a6fbd4a (patch)
tree5a54a2f12184ea6d7bface519707bf878df859f9 /server
parentc717f53456e0c1ac256377bfdd933431cc6a66bd (diff)
Add bmp support
Can be enabled like: [Global] [Global.GlobalConfig] As = 64512 RouterId = "10.0.255.254" [BmpServers] [[BmpServers.BmpServerList]] [BmpServers.BmpServerList.BmpServerConfig] Address = "127.0.0.1" Port=11019 Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r--server/bmp.go132
-rw-r--r--server/fsm.go21
-rw-r--r--server/peer.go2
-rw-r--r--server/server.go86
4 files changed, 223 insertions, 18 deletions
diff --git a/server/bmp.go b/server/bmp.go
new file mode 100644
index 00000000..2a00b73d
--- /dev/null
+++ b/server/bmp.go
@@ -0,0 +1,132 @@
+// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ log "github.com/Sirupsen/logrus"
+ "github.com/osrg/gobgp/config"
+ "github.com/osrg/gobgp/packet"
+ "github.com/osrg/gobgp/table"
+ "net"
+ "strconv"
+ "time"
+)
+
+type broadcastBMPMsg struct {
+ ch chan *broadcastBMPMsg
+ msgList []*bgp.BMPMessage
+ conn *net.TCPConn
+ addr string
+}
+
+func (m *broadcastBMPMsg) send() {
+ m.ch <- m
+}
+
+type bmpConn struct {
+ conn *net.TCPConn
+ addr string
+}
+
+type bmpClient struct {
+ ch chan *broadcastBMPMsg
+ connCh chan *bmpConn
+}
+
+func newBMPClient(conf config.BmpServers, connCh chan *bmpConn) (*bmpClient, error) {
+ b := &bmpClient{}
+ if len(conf.BmpServerList) == 0 {
+ return b, nil
+ }
+
+ b.ch = make(chan *broadcastBMPMsg)
+ b.connCh = connCh
+
+ tryConnect := func(addr string) {
+ for {
+ conn, err := net.Dial("tcp", addr)
+ if err != nil {
+ time.Sleep(30 * time.Second)
+ } else {
+ log.Info("bmp server is connected, ", addr)
+ connCh <- &bmpConn{
+ conn: conn.(*net.TCPConn),
+ addr: addr,
+ }
+ break
+ }
+ }
+ }
+
+ for _, c := range conf.BmpServerList {
+ b := c.BmpServerConfig
+ go tryConnect(net.JoinHostPort(b.Address.String(), strconv.Itoa(int(b.Port))))
+ }
+
+ go func() {
+ connMap := make(map[string]*net.TCPConn)
+ for {
+ select {
+ case m := <-b.ch:
+ if m.conn != nil {
+ i := bgp.NewBMPInitiation([]bgp.BMPTLV{})
+ buf, _ := i.Serialize()
+ _, err := m.conn.Write(buf)
+ if err == nil {
+ connMap[m.addr] = m.conn
+ }
+ }
+
+ for addr, conn := range connMap {
+ if m.conn != nil && m.conn != conn {
+ continue
+ }
+
+ for _, msg := range m.msgList {
+ b, _ := msg.Serialize()
+ _, err := conn.Write(b)
+ if err != nil {
+ delete(connMap, addr)
+ go tryConnect(addr)
+ break
+ }
+ }
+ }
+ }
+ }
+ }()
+
+ return b, nil
+}
+
+func (c *bmpClient) send() chan *broadcastBMPMsg {
+ return c.ch
+}
+
+func bmpPeerUp(laddr string, lport, rport uint16, sent, recv *bgp.BGPMessage, t int, policy bool, pd uint64, peeri *table.PeerInfo, timestamp int64) *bgp.BMPMessage {
+ ph := bgp.NewBMPPeerHeader(uint8(t), policy, pd, peeri.Address.String(), peeri.AS, peeri.LocalID.String(), float64(timestamp))
+ return bgp.NewBMPPeerUpNotification(*ph, laddr, lport, rport, sent, recv)
+}
+
+func bmpPeerDown(reason uint8, t int, policy bool, pd uint64, peeri *table.PeerInfo, timestamp int64) *bgp.BMPMessage {
+ ph := bgp.NewBMPPeerHeader(uint8(t), policy, pd, peeri.Address.String(), peeri.AS, peeri.LocalID.String(), float64(timestamp))
+ return bgp.NewBMPPeerDownNotification(*ph, reason, nil, []byte{})
+}
+
+func bmpPeerRoute(t int, policy bool, pd uint64, peeri *table.PeerInfo, timestamp int64, u *bgp.BGPMessage) *bgp.BMPMessage {
+ ph := bgp.NewBMPPeerHeader(uint8(t), policy, pd, peeri.Address.String(), peeri.AS, peeri.LocalID.String(), float64(timestamp))
+ return bgp.NewBMPRouteMonitoring(*ph, u)
+}
diff --git a/server/fsm.go b/server/fsm.go
index c9950a5d..7ee1d9cc 100644
--- a/server/fsm.go
+++ b/server/fsm.go
@@ -170,16 +170,25 @@ func (fsm *FSM) StateChange(nextState bgp.FSMState) {
}
}
-func (fsm *FSM) LocalAddr() net.IP {
- addr := fsm.conn.LocalAddr()
+func hostport(addr net.Addr) (string, uint16) {
if addr != nil {
- host, _, err := net.SplitHostPort(addr.String())
+ host, port, err := net.SplitHostPort(addr.String())
if err != nil {
- return nil
+ return "", 0
}
- return net.ParseIP(host)
+ p, _ := strconv.Atoi(port)
+ return host, uint16(p)
}
- return nil
+ return "", 0
+}
+
+func (fsm *FSM) RemoteHostPort() (string, uint16) {
+ return hostport(fsm.conn.RemoteAddr())
+
+}
+
+func (fsm *FSM) LocalHostPort() (string, uint16) {
+ return hostport(fsm.conn.LocalAddr())
}
func (fsm *FSM) sendNotificatonFromErrorMsg(conn net.Conn, e *bgp.MessageError) {
diff --git a/server/peer.go b/server/peer.go
index 45cb1c79..24638130 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -44,6 +44,7 @@ type Peer struct {
inPolicies []*policy.Policy
defaultInPolicy config.DefaultPolicyType
isConfederationMember bool
+ recvOpen *bgp.BGPMessage
}
func NewPeer(g config.Global, conf config.Neighbor) *Peer {
@@ -121,6 +122,7 @@ func (peer *Peer) handleBGPmessage(m *bgp.BGPMessage) ([]*table.Path, bool, []*b
switch m.Header.Type {
case bgp.BGP_MSG_OPEN:
+ peer.recvOpen = m
body := m.Body.(*bgp.BGPOpen)
peer.peerInfo.ID = m.Body.(*bgp.BGPOpen).ID
r := make(map[bgp.RouteFamily]bool)
diff --git a/server/server.go b/server/server.go
index 1699a5cb..9861ee7b 100644
--- a/server/server.go
+++ b/server/server.go
@@ -90,6 +90,7 @@ type BgpServer struct {
deletedPeerCh chan config.Neighbor
updatedPeerCh chan config.Neighbor
rpkiConfigCh chan config.RpkiServers
+ bmpConfigCh chan config.BmpServers
dumper *dumper
GrpcReqCh chan *GrpcRequest
@@ -103,6 +104,8 @@ type BgpServer struct {
localRibMap map[string]*LocalRib
zclient *zebra.Client
roaClient *roaClient
+ bmpClient *bmpClient
+ bmpConnCh chan *bmpConn
}
func NewBgpServer(port int) *BgpServer {
@@ -112,6 +115,8 @@ func NewBgpServer(port int) *BgpServer {
b.deletedPeerCh = make(chan config.Neighbor)
b.updatedPeerCh = make(chan config.Neighbor)
b.rpkiConfigCh = make(chan config.RpkiServers)
+ b.bmpConfigCh = make(chan config.BmpServers)
+ b.bmpConnCh = make(chan *bmpConn)
b.GrpcReqCh = make(chan *GrpcRequest, 1)
b.policyUpdateCh = make(chan config.RoutingPolicy)
b.localRibMap = make(map[string]*LocalRib)
@@ -252,6 +257,32 @@ func (server *BgpServer) Serve() {
select {
case c := <-server.rpkiConfigCh:
server.roaClient, _ = newROAClient(c)
+ case c := <-server.bmpConfigCh:
+ server.bmpClient, _ = newBMPClient(c, server.bmpConnCh)
+ case c := <-server.bmpConnCh:
+ bmpMsgList := []*bgp.BMPMessage{}
+ for _, targetPeer := range server.neighborMap {
+ pathList := make([]*table.Path, 0)
+ if targetPeer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ continue
+ }
+ for _, rf := range targetPeer.configuredRFlist() {
+ pathList = append(pathList, targetPeer.adjRib.GetInPathList(rf)...)
+ }
+ for _, p := range pathList {
+ // avoid to merge for timestamp
+ u := table.CreateUpdateMsgFromPaths([]*table.Path{p})
+ bmpMsgList = append(bmpMsgList, bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, targetPeer.peerInfo, p.GetTimestamp().Unix(), u[0]))
+ }
+ }
+
+ m := &broadcastBMPMsg{
+ ch: server.bmpClient.send(),
+ conn: c.conn,
+ addr: c.addr,
+ msgList: bmpMsgList,
+ }
+ server.broadcastMsgs = append(server.broadcastMsgs, m)
case rmsg := <-server.roaClient.recieveROA():
server.roaClient.handleRTRMsg(rmsg)
case zmsg := <-zapiMsgCh:
@@ -733,6 +764,13 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *fsmMsg, incoming chan *
globalRib := server.localRibMap[GLOBAL_RIB_NAME]
if oldState == bgp.BGP_FSM_ESTABLISHED {
+ if ch := server.bmpClient.send(); ch != nil {
+ m := &broadcastBMPMsg{
+ ch: ch,
+ msgList: []*bgp.BMPMessage{bmpPeerDown(bgp.BMP_PEER_DOWN_REASON_UNKNOWN, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.peerInfo, peer.conf.Timers.TimersState.Downtime)},
+ }
+ server.broadcastMsgs = append(server.broadcastMsgs, m)
+ }
t := time.Now()
if t.Sub(time.Unix(peer.conf.Timers.TimersState.Uptime, 0)) < FLOP_THRESHOLD {
peer.conf.NeighborState.Flops++
@@ -748,12 +786,22 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *fsmMsg, incoming chan *
close(peer.outgoing)
peer.outgoing = make(chan *bgp.BGPMessage, 128)
if nextState == bgp.BGP_FSM_ESTABLISHED {
+ if ch := server.bmpClient.send(); ch != nil {
+ laddr, lport := peer.fsm.LocalHostPort()
+ _, rport := peer.fsm.RemoteHostPort()
+ m := &broadcastBMPMsg{
+ ch: ch,
+ msgList: []*bgp.BMPMessage{bmpPeerUp(laddr, lport, rport, buildopen(peer.fsm.gConf, peer.fsm.pConf), peer.recvOpen, bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.peerInfo, peer.conf.Timers.TimersState.Uptime)},
+ }
+ server.broadcastMsgs = append(server.broadcastMsgs, m)
+ }
pathList := make([]*table.Path, 0)
if peer.isRouteServerClient() {
loc := server.localRibMap[peer.conf.NeighborConfig.NeighborAddress.String()]
pathList = applyPolicies(peer, loc, POLICY_DIRECTION_EXPORT, peer.getBests(loc))
} else {
- peer.conf.Transport.TransportConfig.LocalAddress = peer.fsm.LocalAddr()
+ l, _ := peer.fsm.LocalHostPort()
+ peer.conf.Transport.TransportConfig.LocalAddress = net.ParseIP(l)
for _, path := range filterpath(peer, peer.getBests(globalRib)) {
p := path.Clone(path.IsWithdraw)
p.UpdatePathAttrs(&server.bgpConfig.Global, &peer.conf)
@@ -796,18 +844,28 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *fsmMsg, incoming chan *
server.roaClient.validate(pathList)
}
}
- if m.Header.Type == bgp.BGP_MSG_UPDATE && server.dumper != nil {
- _, y := peer.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
- bm := &broadcastBGPMsg{
- message: m,
- peerAS: peer.peerInfo.AS,
- localAS: peer.peerInfo.LocalAS,
- peerAddress: peer.peerInfo.Address,
- localAddress: peer.fsm.LocalAddr(),
- fourBytesAs: y,
- ch: server.dumper.sendCh(),
+ if m.Header.Type == bgp.BGP_MSG_UPDATE {
+ if server.dumper != nil {
+ _, y := peer.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
+ l, _ := peer.fsm.LocalHostPort()
+ bm := &broadcastBGPMsg{
+ message: m,
+ peerAS: peer.peerInfo.AS,
+ localAS: peer.peerInfo.LocalAS,
+ peerAddress: peer.peerInfo.Address,
+ localAddress: net.ParseIP(l),
+ fourBytesAs: y,
+ ch: server.dumper.sendCh(),
+ }
+ server.broadcastMsgs = append(server.broadcastMsgs, bm)
+ }
+ if ch := server.bmpClient.send(); ch != nil {
+ bm := &broadcastBMPMsg{
+ ch: ch,
+ msgList: []*bgp.BMPMessage{bmpPeerRoute(bgp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.peerInfo, time.Now().Unix(), m)},
+ }
+ server.broadcastMsgs = append(server.broadcastMsgs, bm)
}
- server.broadcastMsgs = append(server.broadcastMsgs, bm)
}
msgs = append(msgs, server.propagateUpdate(peer, pathList)...)
default:
@@ -829,6 +887,10 @@ func (server *BgpServer) SetRpkiConfig(c config.RpkiServers) {
server.rpkiConfigCh <- c
}
+func (server *BgpServer) SetBmpConfig(c config.BmpServers) {
+ server.bmpConfigCh <- c
+}
+
func (server *BgpServer) PeerAdd(peer config.Neighbor) {
server.addedPeerCh <- peer
}