summaryrefslogtreecommitdiffhomepage
path: root/server/server.go
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-07-22 22:22:08 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-07-22 17:28:38 +0900
commitf097981a33d107a6d31c4da0f25b90957f3d25fe (patch)
tree25ebf98379bb5eec7131500bb31221698107a04a /server/server.go
parent3343bcec3565da1aa40fa1aa1ecc6feee7e902f6 (diff)
bmp uses the new Watch API
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server/server.go')
-rw-r--r--server/server.go259
1 files changed, 116 insertions, 143 deletions
diff --git a/server/server.go b/server/server.go
index 4cd71ad6..0aeead21 100644
--- a/server/server.go
+++ b/server/server.go
@@ -30,7 +30,6 @@ import (
api "github.com/osrg/gobgp/api"
"github.com/osrg/gobgp/config"
"github.com/osrg/gobgp/packet/bgp"
- "github.com/osrg/gobgp/packet/bmp"
"github.com/osrg/gobgp/table"
"github.com/satori/go.uuid"
)
@@ -107,11 +106,12 @@ type BgpServer struct {
watchers *watcherManager
watcherMap map[watchType][]*Watcher
zclient *zebraClient
+ bmpManager *bmpClientManager
}
func NewBgpServer() *BgpServer {
roaManager, _ := NewROAManager(0)
- return &BgpServer{
+ s := &BgpServer{
GrpcReqCh: make(chan *GrpcRequest, 1),
neighborMap: make(map[string]*Peer),
policy: table.NewRoutingPolicy(),
@@ -120,6 +120,8 @@ func NewBgpServer() *BgpServer {
mgmtCh: make(chan func(), 1),
watcherMap: make(map[watchType][]*Watcher),
}
+ s.bmpManager = newBmpClientManager(s)
+ return s
}
func (server *BgpServer) Listeners(addr string) []*net.TCPListener {
@@ -383,27 +385,31 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
}
}
+func createWatcherEventStateChange(peer *Peer) *watcherEventStateChangedMsg {
+ _, rport := peer.fsm.RemoteHostPort()
+ laddr, lport := peer.fsm.LocalHostPort()
+ sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
+ recvOpen := peer.fsm.recvOpen
+ return &watcherEventStateChangedMsg{
+ peerAS: peer.fsm.peerInfo.AS,
+ localAS: peer.fsm.peerInfo.LocalAS,
+ peerAddress: peer.fsm.peerInfo.Address,
+ localAddress: net.ParseIP(laddr),
+ peerPort: rport,
+ localPort: lport,
+ peerID: peer.fsm.peerInfo.ID,
+ sentOpen: sentOpen,
+ recvOpen: recvOpen,
+ state: peer.fsm.state,
+ adminState: peer.fsm.adminState,
+ timestamp: time.Now(),
+ }
+}
+
func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
newState := peer.fsm.state
if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED {
- _, rport := peer.fsm.RemoteHostPort()
- laddr, lport := peer.fsm.LocalHostPort()
- sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
- recvOpen := peer.fsm.recvOpen
- ev := &watcherEventStateChangedMsg{
- peerAS: peer.fsm.peerInfo.AS,
- localAS: peer.fsm.peerInfo.LocalAS,
- peerAddress: peer.fsm.peerInfo.Address,
- localAddress: net.ParseIP(laddr),
- peerPort: rport,
- localPort: lport,
- peerID: peer.fsm.peerInfo.ID,
- sentOpen: sentOpen,
- recvOpen: recvOpen,
- state: newState,
- adminState: peer.fsm.adminState,
- timestamp: time.Now(),
- }
+ ev := createWatcherEventStateChange(peer)
if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) {
server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev)
}
@@ -892,19 +898,28 @@ func (server *BgpServer) SetRpkiConfig(c []config.RpkiServer) error {
return nil
}
-func (server *BgpServer) SetBmpConfig(c []config.BmpServer) error {
- for _, s := range c {
- ch := make(chan *GrpcResponse)
- server.GrpcReqCh <- &GrpcRequest{
- RequestType: REQ_ADD_BMP,
- Data: &s.Config,
- ResponseCh: ch,
- }
- if err := (<-ch).Err(); err != nil {
- return err
- }
+func (s *BgpServer) AddBmp(c *config.BmpServerConfig) (err error) {
+ ch := make(chan struct{})
+ defer func() { <-ch }()
+
+ s.mgmtCh <- func() {
+ defer close(ch)
+
+ err = s.bmpManager.addServer(c)
}
- return nil
+ return err
+}
+
+func (s *BgpServer) DeleteBmp(c *config.BmpServerConfig) (err error) {
+ ch := make(chan struct{})
+ defer func() { <-ch }()
+
+ s.mgmtCh <- func() {
+ defer close(ch)
+
+ err = s.bmpManager.deleteServer(c)
+ }
+ return err
}
func (server *BgpServer) SetMrtConfig(c []config.Mrt) error {
@@ -1512,37 +1527,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) {
Data: dsts,
}
close(grpcReq.ResponseCh)
- case REQ_BMP_GLOBAL:
- paths := server.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, server.globalRib.GetRFlist())
- bmpmsgs := make([]*bmp.BMPMessage, 0, len(paths))
- for _, path := range paths {
- msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path})
- buf, _ := msgs[0].Serialize()
- bmpmsgs = append(bmpmsgs, bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, true, 0, path.GetSource(), path.GetTimestamp().Unix(), buf))
- }
- grpcReq.ResponseCh <- &GrpcResponse{
- Data: bmpmsgs,
- }
- close(grpcReq.ResponseCh)
- case REQ_BMP_NEIGHBORS:
- //TODO: merge REQ_NEIGHBORS and REQ_BMP_NEIGHBORS
- msgs := make([]*bmp.BMPMessage, 0, len(server.neighborMap))
- for _, peer := range server.neighborMap {
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
- continue
- }
- laddr, lport := peer.fsm.LocalHostPort()
- _, rport := peer.fsm.RemoteHostPort()
- sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
- info := peer.fsm.peerInfo
- timestamp := peer.fsm.pConf.Timers.State.Uptime
- msg := bmpPeerUp(laddr, lport, rport, sentOpen, peer.fsm.recvOpen, bmp.BMP_PEER_TYPE_GLOBAL, false, 0, info, timestamp)
- msgs = append(msgs, msg)
- }
- grpcReq.ResponseCh <- &GrpcResponse{
- Data: msgs,
- }
- close(grpcReq.ResponseCh)
case REQ_NEIGHBOR:
l := make([]*config.Neighbor, 0)
for _, peer := range server.neighborMap {
@@ -1613,22 +1597,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) {
Data: paths,
}
close(grpcReq.ResponseCh)
- case REQ_BMP_ADJ_IN:
- bmpmsgs := make([]*bmp.BMPMessage, 0)
- for _, peer := range server.neighborMap {
- if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
- continue
- }
- for _, path := range peer.adjRibIn.PathList(peer.configuredRFlist(), false) {
- msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path})
- buf, _ := msgs[0].Serialize()
- bmpmsgs = append(bmpmsgs, bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, false, 0, peer.fsm.peerInfo, path.GetTimestamp().Unix(), buf))
- }
- }
- grpcReq.ResponseCh <- &GrpcResponse{
- Data: bmpmsgs,
- }
- close(grpcReq.ResponseCh)
case REQ_NEIGHBOR_SHUTDOWN:
peers, err := reqToPeers(grpcReq)
if err != nil {
@@ -1806,10 +1774,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) {
server.handleEnableMrtRequest(grpcReq)
case REQ_DISABLE_MRT:
server.handleDisableMrtRequest(grpcReq)
- case REQ_ADD_BMP:
- server.handleAddBmp(grpcReq)
- case REQ_DELETE_BMP:
- server.handleDeleteBmp(grpcReq)
case REQ_VALIDATE_RIB:
server.handleValidateRib(grpcReq)
case REQ_INITIALIZE_RPKI:
@@ -2686,57 +2650,6 @@ func (server *BgpServer) handleDisableMrtRequest(grpcReq *GrpcRequest) {
close(grpcReq.ResponseCh)
}
-func (server *BgpServer) handleAddBmp(grpcReq *GrpcRequest) {
- var c *config.BmpServerConfig
- switch arg := grpcReq.Data.(type) {
- case *api.AddBmpRequest:
- c = &config.BmpServerConfig{
- Address: arg.Address,
- Port: arg.Port,
- RouteMonitoringPolicy: config.BmpRouteMonitoringPolicyType(arg.Type),
- }
- case *config.BmpServerConfig:
- c = arg
- }
-
- w, y := server.watchers.watcher(WATCHER_BMP)
- if !y {
- w, _ = newBmpWatcher(server.GrpcReqCh)
- server.watchers.addWatcher(WATCHER_BMP, w)
- }
-
- err := w.(*bmpWatcher).addServer(*c)
- grpcReq.ResponseCh <- &GrpcResponse{
- ResponseErr: err,
- Data: &api.AddBmpResponse{},
- }
- close(grpcReq.ResponseCh)
-}
-
-func (server *BgpServer) handleDeleteBmp(grpcReq *GrpcRequest) {
- var c *config.BmpServerConfig
- switch arg := grpcReq.Data.(type) {
- case *api.DeleteBmpRequest:
- c = &config.BmpServerConfig{
- Address: arg.Address,
- Port: arg.Port,
- }
- case *config.BmpServerConfig:
- c = arg
- }
-
- if w, y := server.watchers.watcher(WATCHER_BMP); y {
- err := w.(*bmpWatcher).deleteServer(*c)
- grpcReq.ResponseCh <- &GrpcResponse{
- ResponseErr: err,
- Data: &api.DeleteBmpResponse{},
- }
- close(grpcReq.ResponseCh)
- } else {
- grpcDone(grpcReq, fmt.Errorf("bmp not configured"))
- }
-}
-
func (server *BgpServer) handleValidateRib(grpcReq *GrpcRequest) {
arg := grpcReq.Data.(*api.ValidateRibRequest)
for _, rf := range server.globalRib.GetRFlist() {
@@ -2796,10 +2709,13 @@ const (
)
type watchOptions struct {
- bestpath bool
- preUpdate bool
- postUpdate bool
- peerState bool
+ bestpath bool
+ preUpdate bool
+ postUpdate bool
+ peerState bool
+ initUpdate bool
+ initPostUpdate bool
+ initPeerState bool
}
type WatchOption func(*watchOptions)
@@ -2810,21 +2726,30 @@ func WatchBestPath() WatchOption {
}
}
-func WatchUpdate() WatchOption {
+func WatchUpdate(current bool) WatchOption {
return func(o *watchOptions) {
o.preUpdate = true
+ if current {
+ o.initUpdate = true
+ }
}
}
-func WatchPostUpdate() WatchOption {
+func WatchPostUpdate(current bool) WatchOption {
return func(o *watchOptions) {
o.postUpdate = true
+ if current {
+ o.initPostUpdate = true
+ }
}
}
-func WatchPeerState() WatchOption {
+func WatchPeerState(current bool) WatchOption {
return func(o *watchOptions) {
o.peerState = true
+ if current {
+ o.initPeerState = true
+ }
}
}
@@ -2859,10 +2784,8 @@ func (w *Watcher) loop() {
func (w *Watcher) Stop() {
ch := make(chan struct{})
defer func() { <-ch }()
-
w.s.mgmtCh <- func() {
defer close(ch)
-
for k, l := range w.s.watcherMap {
for i, v := range l {
if w == v {
@@ -2919,6 +2842,56 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
if w.opts.peerState {
register(WATCH_TYPE_PEER_STATE, w)
}
+ if w.opts.initPeerState {
+ for _, peer := range s.neighborMap {
+ if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ continue
+ }
+ w.notify(createWatcherEventStateChange(peer))
+ }
+ }
+ if w.opts.initUpdate {
+ for _, peer := range s.neighborMap {
+ if peer.fsm.state != bgp.BGP_FSM_ESTABLISHED {
+ continue
+ }
+ for _, path := range peer.adjRibIn.PathList(peer.configuredRFlist(), false) {
+ msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path})
+ buf, _ := msgs[0].Serialize()
+ _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
+ l, _ := peer.fsm.LocalHostPort()
+ w.notify(&watcherEventUpdateMsg{
+ message: msgs[0],
+ peerAS: peer.fsm.peerInfo.AS,
+ localAS: peer.fsm.peerInfo.LocalAS,
+ peerAddress: peer.fsm.peerInfo.Address,
+ localAddress: net.ParseIP(l),
+ peerID: peer.fsm.peerInfo.ID,
+ fourBytesAs: y,
+ timestamp: path.GetTimestamp(),
+ payload: buf,
+ postPolicy: false,
+ })
+ }
+ }
+ }
+ if w.opts.postUpdate {
+ for _, path := range s.globalRib.GetBestPathList(table.GLOBAL_RIB_NAME, s.globalRib.GetRFlist()) {
+ msgs := table.CreateUpdateMsgFromPaths([]*table.Path{path})
+ buf, _ := msgs[0].Serialize()
+ w.notify(&watcherEventUpdateMsg{
+ peerAS: path.GetSource().AS,
+ peerAddress: path.GetSource().Address,
+ peerID: path.GetSource().ID,
+ message: msgs[0],
+ timestamp: path.GetTimestamp(),
+ payload: buf,
+ postPolicy: true,
+ })
+ }
+
+ }
+
go w.loop()
}
return w