diff options
author | ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> | 2016-05-24 05:47:52 +0000 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-06-06 12:43:20 +0900 |
commit | aca6fd6ad4409b4cb63682bff3c79fca8ca2800d (patch) | |
tree | eb91718c87ddcdaa0d2133f3aaccfee6dbe7f7a8 /server | |
parent | 10746e5f4b303aba553c2bb759afe3a8d4ffe3aa (diff) |
server: refactoring for monitorbestchanged api. use watcher infra
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r-- | server/grpc_server.go | 24 | ||||
-rw-r--r-- | server/monitor.go | 124 | ||||
-rw-r--r-- | server/server.go | 54 | ||||
-rw-r--r-- | server/watcher.go | 3 | ||||
-rw-r--r-- | server/zclient.go | 2 | ||||
-rw-r--r-- | server/zclient_test.go | 85 |
6 files changed, 141 insertions, 151 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go index cbe642a2..5963b780 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -51,8 +51,7 @@ const ( REQ_GRPC_DELETE_NEIGHBOR REQ_UPDATE_NEIGHBOR REQ_GLOBAL_RIB - REQ_MONITOR_GLOBAL_BEST_CHANGED - REQ_MONITOR_INCOMING + REQ_MONITOR_RIB REQ_MONITOR_NEIGHBOR_PEER_STATE REQ_ENABLE_MRT REQ_DISABLE_MRT @@ -189,31 +188,14 @@ func (s *Server) GetRib(ctx context.Context, arg *api.GetRibRequest) (*api.GetRi return d.(*api.GetRibResponse), nil } -func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.GobgpApi_MonitorBestChangedServer) error { - var reqType int - switch arg.Resource { - case api.Resource_GLOBAL: - reqType = REQ_MONITOR_GLOBAL_BEST_CHANGED - default: - return fmt.Errorf("unsupported resource type: %v", arg.Resource) - } - - req := NewGrpcRequest(reqType, "", bgp.RouteFamily(arg.Family), nil) - s.bgpServerCh <- req - - return handleMultipleResponses(req, func(res *GrpcResponse) error { - return stream.Send(res.Data.(*api.Destination)) - }) -} - func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer) error { switch arg.Type { - case api.Resource_ADJ_IN: + case api.Resource_ADJ_IN, api.Resource_GLOBAL: default: return fmt.Errorf("unsupported resource type: %v", arg.Type) } - req := NewGrpcRequest(REQ_MONITOR_INCOMING, arg.Name, bgp.RouteFamily(arg.Family), arg) + req := NewGrpcRequest(REQ_MONITOR_RIB, arg.Name, bgp.RouteFamily(arg.Family), arg) s.bgpServerCh <- req return handleMultipleResponses(req, func(res *GrpcResponse) error { return stream.Send(res.Data.(*api.Destination)) diff --git a/server/monitor.go b/server/monitor.go index ac9c775b..c7236fed 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -22,99 +22,125 @@ import ( "gopkg.in/tomb.v2" ) -type grpcIncomingWatcher struct { +type grpcWatcher struct { t tomb.Tomb ch chan watcherEvent ctlCh chan *GrpcRequest - reqs []*GrpcRequest + reqs map[watcherEventType][]*GrpcRequest } -func (w *grpcIncomingWatcher) notify(t watcherEventType) chan watcherEvent { - if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG { +func (w *grpcWatcher) notify(t watcherEventType) chan watcherEvent { + if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG { return w.ch } return nil } -func (w *grpcIncomingWatcher) stop() { +func (w *grpcWatcher) stop() { w.t.Kill(nil) } -func (w *grpcIncomingWatcher) watchingEventTypes() []watcherEventType { - pre := false - post := false - for _, req := range w.reqs { - if req.Data.(*api.Table).PostPolicy { - post = true - } else { - pre = true +func (w *grpcWatcher) watchingEventTypes() []watcherEventType { + types := make([]watcherEventType, 0, 3) + for _, t := range []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE} { + if len(w.reqs[t]) > 0 { + types = append(types, t) } } - types := make([]watcherEventType, 0, 2) - if pre { - types = append(types, WATCHER_EVENT_UPDATE_MSG) - } - if post { - types = append(types, WATCHER_EVENT_POST_POLICY_UPDATE_MSG) - } return types } -func (w *grpcIncomingWatcher) loop() error { +func (w *grpcWatcher) loop() error { for { select { case <-w.t.Dying(): - for _, req := range w.reqs { - close(req.ResponseCh) + for _, rs := range w.reqs { + for _, req := range rs { + close(req.ResponseCh) + } } return nil case req := <-w.ctlCh: - w.reqs = append(w.reqs, req) - case ev := <-w.ch: - msg := ev.(*watcherEventUpdateMsg) - for _, path := range msg.pathList { - remains := make([]*GrpcRequest, 0, len(w.reqs)) - result := &GrpcResponse{ - Data: &api.Destination{ - Prefix: path.GetNlri().String(), - Paths: []*api.Path{path.ToApiStruct(table.GLOBAL_RIB_NAME)}, - }, + tbl := req.Data.(*api.Table) + var reqType watcherEventType + switch tbl.Type { + case api.Resource_GLOBAL: + reqType = WATCHER_EVENT_BESTPATH_CHANGE + case api.Resource_ADJ_IN: + if tbl.PostPolicy { + reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG + } else { + reqType = WATCHER_EVENT_UPDATE_MSG } - for _, req := range w.reqs { - select { - case <-req.EndCh: + default: + continue + } + reqs := w.reqs[reqType] + if reqs == nil { + reqs = make([]*GrpcRequest, 0, 16) + } + reqs = append(reqs, req) + w.reqs[reqType] = reqs + case ev := <-w.ch: + sendPaths := func(reqType watcherEventType, paths []*table.Path) { + for _, path := range paths { + if path == nil { continue - default: } - remains = append(remains, req) - if req.RouteFamily != bgp.RouteFamily(0) && req.RouteFamily != path.GetRouteFamily() { - continue + remains := make([]*GrpcRequest, 0, len(w.reqs[reqType])) + result := &GrpcResponse{ + Data: &api.Destination{ + Prefix: path.GetNlri().String(), + Paths: []*api.Path{path.ToApiStruct(table.GLOBAL_RIB_NAME)}, + }, } - if req.Name != "" && req.Name != path.GetSource().Address.String() { - continue + for _, req := range w.reqs[reqType] { + select { + case <-req.EndCh: + continue + default: + } + remains = append(remains, req) + if req.RouteFamily != bgp.RouteFamily(0) && req.RouteFamily != path.GetRouteFamily() { + continue + } + if req.Name != "" && req.Name != path.GetSource().Address.String() { + continue + } + req.ResponseCh <- result } - req.ResponseCh <- result + w.reqs[reqType] = remains } - w.reqs = remains } + switch msg := ev.(type) { + case *watcherEventBestPathMsg: + sendPaths(WATCHER_EVENT_BESTPATH_CHANGE, msg.pathList) + case *watcherEventUpdateMsg: + if msg.postPolicy { + sendPaths(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, msg.pathList) + } else { + sendPaths(WATCHER_EVENT_UPDATE_MSG, msg.pathList) + } + } + } } } -func (w *grpcIncomingWatcher) restart(string) error { +func (w *grpcWatcher) restart(string) error { return nil } -func (w *grpcIncomingWatcher) addRequest(req *GrpcRequest) error { +func (w *grpcWatcher) addRequest(req *GrpcRequest) error { w.ctlCh <- req return nil } -func newGrpcIncomingWatcher() (*grpcIncomingWatcher, error) { - w := &grpcIncomingWatcher{ +func newGrpcWatcher() (*grpcWatcher, error) { + w := &grpcWatcher{ ch: make(chan watcherEvent), ctlCh: make(chan *GrpcRequest), - reqs: make([]*GrpcRequest, 0, 16), + reqs: make(map[watcherEventType][]*GrpcRequest), } w.t.Go(w.loop) return w, nil diff --git a/server/server.go b/server/server.go index faf4a06c..7bd29048 100644 --- a/server/server.go +++ b/server/server.go @@ -195,8 +195,8 @@ func (server *BgpServer) Listeners(addr string) []*net.TCPListener { } func (server *BgpServer) Serve() { - w, _ := newGrpcIncomingWatcher() - server.watchers[WATCHER_GRPC_INCOMING] = w + w, _ := newGrpcWatcher() + server.watchers[WATCHER_GRPC_MONITOR] = w senderCh := make(chan *SenderMsg, 1<<16) go func(ch chan *SenderMsg) { @@ -478,7 +478,7 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil best, _ := server.globalRib.DeletePathsByPeer(ids, peer.fsm.peerInfo, rf) if !peer.isRouteServerClient() { - server.broadcastBests(best[table.GLOBAL_RIB_NAME]) + server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]}) } for _, targetPeer := range server.neighborMap { @@ -493,44 +493,6 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil return msgs } -func (server *BgpServer) broadcastBests(bests []*table.Path) { - server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: bests}) - for _, path := range bests { - if path == nil { - continue - } - rf := path.GetRouteFamily() - - result := &GrpcResponse{ - Data: &api.Destination{ - Prefix: path.GetNlri().String(), - Paths: []*api.Path{path.ToApiStruct(table.GLOBAL_RIB_NAME)}, - }, - } - remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs)) - for _, req := range server.broadcastReqs { - select { - case <-req.EndCh: - continue - default: - } - if req.RequestType != REQ_MONITOR_GLOBAL_BEST_CHANGED { - remainReqs = append(remainReqs, req) - continue - } - if req.RouteFamily == bgp.RouteFamily(0) || req.RouteFamily == rf { - m := &broadcastGrpcMsg{ - req: req, - result: result, - } - server.broadcastMsgs = append(server.broadcastMsgs, m) - } - remainReqs = append(remainReqs, req) - } - server.broadcastReqs = remainReqs - } -} - func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { result := &GrpcResponse{ Data: peer.ToApiStruct(), @@ -688,7 +650,7 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([] if len(best[table.GLOBAL_RIB_NAME]) == 0 { return nil, alteredPathList } - server.broadcastBests(best[table.GLOBAL_RIB_NAME]) + server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]}) } for _, targetPeer := range server.neighborMap { @@ -2291,16 +2253,16 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { Data: data, } close(grpcReq.ResponseCh) - case REQ_MONITOR_GLOBAL_BEST_CHANGED, REQ_MONITOR_NEIGHBOR_PEER_STATE: + case REQ_MONITOR_NEIGHBOR_PEER_STATE: server.broadcastReqs = append(server.broadcastReqs, grpcReq) - case REQ_MONITOR_INCOMING: + case REQ_MONITOR_RIB: if grpcReq.Name != "" { if _, err = server.checkNeighborRequest(grpcReq); err != nil { break } } - w := server.watchers[WATCHER_GRPC_INCOMING] - go w.(*grpcIncomingWatcher).addRequest(grpcReq) + w := server.watchers[WATCHER_GRPC_MONITOR] + go w.(*grpcWatcher).addRequest(grpcReq) case REQ_ENABLE_MRT: server.handleEnableMrtRequest(grpcReq) case REQ_DISABLE_MRT: diff --git a/server/watcher.go b/server/watcher.go index 8df5edbb..8c92fab3 100644 --- a/server/watcher.go +++ b/server/watcher.go @@ -44,8 +44,7 @@ const ( WATCHER_BMP WATCHER_ZEBRA WATCHER_COLLECTOR - WATCHER_GRPC_BESTPATH - WATCHER_GRPC_INCOMING + WATCHER_GRPC_MONITOR ) type watcherEventType uint8 diff --git a/server/zclient.go b/server/zclient.go index 87e303d4..1b37e5db 100644 --- a/server/zclient.go +++ b/server/zclient.go @@ -29,7 +29,7 @@ import ( ) func newIPRouteMessage(path *table.Path) *zebra.Message { - if path.IsFromExternal() { + if path == nil || path.IsFromExternal() { return nil } l := strings.SplitN(path.GetNlri().String(), "/", 2) diff --git a/server/zclient_test.go b/server/zclient_test.go index a09b59e6..8067c53f 100644 --- a/server/zclient_test.go +++ b/server/zclient_test.go @@ -16,14 +16,16 @@ package server import ( + "github.com/osrg/gobgp/gobgp/cmd" "github.com/osrg/gobgp/table" "github.com/osrg/gobgp/zebra" "github.com/stretchr/testify/assert" "net" "testing" + "time" ) -func Test_createPathFromIPRouteMessage(t *testing.T) { +func Test_createRequestFromIPRouteMessage(t *testing.T) { assert := assert.New(t) m := &zebra.Message{} @@ -51,28 +53,36 @@ func Test_createPathFromIPRouteMessage(t *testing.T) { m.Header = *h m.Body = b - pi := &table.PeerInfo{ - AS: 65000, - LocalID: net.ParseIP("10.0.0.1"), - } - p := createPathFromIPRouteMessage(m, pi) - assert.NotEqual(nil, p) - assert.Equal("0.0.0.0", p.GetNexthop().String()) - assert.Equal("192.168.100.0/24", p.GetNlri().String()) - assert.True(p.IsFromExternal()) - assert.False(p.IsWithdraw) + p := createRequestFromIPRouteMessage(m) + assert.NotNil(p) + paths, err := cmd.ApiStruct2Path(p.Path) + assert.Nil(err) + assert.Equal(len(paths), 1) + path := paths[0] + pp := table.NewPath(nil, path.Nlri, path.IsWithdraw, path.PathAttrs, time.Now(), false) + pp.SetIsFromExternal(p.Path.IsFromExternal) + assert.Equal("0.0.0.0", pp.GetNexthop().String()) + assert.Equal("192.168.100.0/24", pp.GetNlri().String()) + assert.True(pp.IsFromExternal()) + assert.False(pp.IsWithdraw) // withdraw h.Command = zebra.IPV4_ROUTE_DELETE m.Header = *h - p = createPathFromIPRouteMessage(m, pi) - assert.NotEqual(nil, p) - assert.Equal("0.0.0.0", p.GetNexthop().String()) - assert.Equal("192.168.100.0/24", p.GetNlri().String()) - med, _ := p.GetMed() + p = createRequestFromIPRouteMessage(m) + assert.NotNil(p) + paths, err = cmd.ApiStruct2Path(p.Path) + assert.Nil(err) + assert.Equal(len(paths), 1) + path = paths[0] + pp = table.NewPath(nil, path.Nlri, path.IsWithdraw, path.PathAttrs, time.Now(), false) + pp.SetIsFromExternal(p.Path.IsFromExternal) + assert.Equal("0.0.0.0", pp.GetNexthop().String()) + assert.Equal("192.168.100.0/24", pp.GetNlri().String()) + med, _ := pp.GetMed() assert.Equal(uint32(100), med) - assert.True(p.IsFromExternal()) - assert.True(p.IsWithdraw) + assert.True(pp.IsFromExternal()) + assert.True(pp.IsWithdraw) // IPv6 h.Command = zebra.IPV6_ROUTE_ADD @@ -82,23 +92,34 @@ func Test_createPathFromIPRouteMessage(t *testing.T) { m.Header = *h m.Body = b - p = createPathFromIPRouteMessage(m, pi) - assert.NotEqual(nil, p) - assert.Equal("::", p.GetNexthop().String()) - assert.Equal("2001:db8:0:f101::/64", p.GetNlri().String()) - med, _ = p.GetMed() + p = createRequestFromIPRouteMessage(m) + assert.NotNil(p) + paths, err = cmd.ApiStruct2Path(p.Path) + assert.Nil(err) + assert.Equal(len(paths), 1) + path = paths[0] + pp = table.NewPath(nil, path.Nlri, path.IsWithdraw, path.PathAttrs, time.Now(), false) + pp.SetIsFromExternal(p.Path.IsFromExternal) + assert.Equal("::", pp.GetNexthop().String()) + assert.Equal("2001:db8:0:f101::/64", pp.GetNlri().String()) + med, _ = pp.GetMed() assert.Equal(uint32(100), med) - assert.True(p.IsFromExternal()) - assert.False(p.IsWithdraw) + assert.True(pp.IsFromExternal()) + assert.False(pp.IsWithdraw) // withdraw h.Command = zebra.IPV6_ROUTE_DELETE m.Header = *h - p = createPathFromIPRouteMessage(m, pi) - assert.NotEqual(nil, p) - assert.Equal("::", p.GetNexthop().String()) - assert.Equal("2001:db8:0:f101::/64", p.GetNlri().String()) - assert.True(p.IsFromExternal()) - assert.True(p.IsWithdraw) - + p = createRequestFromIPRouteMessage(m) + assert.NotNil(p) + paths, err = cmd.ApiStruct2Path(p.Path) + assert.Nil(err) + assert.Equal(len(paths), 1) + path = paths[0] + pp = table.NewPath(nil, path.Nlri, path.IsWithdraw, path.PathAttrs, time.Now(), false) + pp.SetIsFromExternal(p.Path.IsFromExternal) + assert.Equal("::", pp.GetNexthop().String()) + assert.Equal("2001:db8:0:f101::/64", pp.GetNlri().String()) + assert.True(pp.IsFromExternal()) + assert.True(pp.IsWithdraw) } |