summaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>2016-05-24 05:47:52 +0000
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-06-06 12:43:20 +0900
commitaca6fd6ad4409b4cb63682bff3c79fca8ca2800d (patch)
treeeb91718c87ddcdaa0d2133f3aaccfee6dbe7f7a8 /server
parent10746e5f4b303aba553c2bb759afe3a8d4ffe3aa (diff)
server: refactoring for monitorbestchanged api. use watcher infra
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r--server/grpc_server.go24
-rw-r--r--server/monitor.go124
-rw-r--r--server/server.go54
-rw-r--r--server/watcher.go3
-rw-r--r--server/zclient.go2
-rw-r--r--server/zclient_test.go85
6 files changed, 141 insertions, 151 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go
index cbe642a2..5963b780 100644
--- a/server/grpc_server.go
+++ b/server/grpc_server.go
@@ -51,8 +51,7 @@ const (
REQ_GRPC_DELETE_NEIGHBOR
REQ_UPDATE_NEIGHBOR
REQ_GLOBAL_RIB
- REQ_MONITOR_GLOBAL_BEST_CHANGED
- REQ_MONITOR_INCOMING
+ REQ_MONITOR_RIB
REQ_MONITOR_NEIGHBOR_PEER_STATE
REQ_ENABLE_MRT
REQ_DISABLE_MRT
@@ -189,31 +188,14 @@ func (s *Server) GetRib(ctx context.Context, arg *api.GetRibRequest) (*api.GetRi
return d.(*api.GetRibResponse), nil
}
-func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.GobgpApi_MonitorBestChangedServer) error {
- var reqType int
- switch arg.Resource {
- case api.Resource_GLOBAL:
- reqType = REQ_MONITOR_GLOBAL_BEST_CHANGED
- default:
- return fmt.Errorf("unsupported resource type: %v", arg.Resource)
- }
-
- req := NewGrpcRequest(reqType, "", bgp.RouteFamily(arg.Family), nil)
- s.bgpServerCh <- req
-
- return handleMultipleResponses(req, func(res *GrpcResponse) error {
- return stream.Send(res.Data.(*api.Destination))
- })
-}
-
func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer) error {
switch arg.Type {
- case api.Resource_ADJ_IN:
+ case api.Resource_ADJ_IN, api.Resource_GLOBAL:
default:
return fmt.Errorf("unsupported resource type: %v", arg.Type)
}
- req := NewGrpcRequest(REQ_MONITOR_INCOMING, arg.Name, bgp.RouteFamily(arg.Family), arg)
+ req := NewGrpcRequest(REQ_MONITOR_RIB, arg.Name, bgp.RouteFamily(arg.Family), arg)
s.bgpServerCh <- req
return handleMultipleResponses(req, func(res *GrpcResponse) error {
return stream.Send(res.Data.(*api.Destination))
diff --git a/server/monitor.go b/server/monitor.go
index ac9c775b..c7236fed 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -22,99 +22,125 @@ import (
"gopkg.in/tomb.v2"
)
-type grpcIncomingWatcher struct {
+type grpcWatcher struct {
t tomb.Tomb
ch chan watcherEvent
ctlCh chan *GrpcRequest
- reqs []*GrpcRequest
+ reqs map[watcherEventType][]*GrpcRequest
}
-func (w *grpcIncomingWatcher) notify(t watcherEventType) chan watcherEvent {
- if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG {
+func (w *grpcWatcher) notify(t watcherEventType) chan watcherEvent {
+ if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG {
return w.ch
}
return nil
}
-func (w *grpcIncomingWatcher) stop() {
+func (w *grpcWatcher) stop() {
w.t.Kill(nil)
}
-func (w *grpcIncomingWatcher) watchingEventTypes() []watcherEventType {
- pre := false
- post := false
- for _, req := range w.reqs {
- if req.Data.(*api.Table).PostPolicy {
- post = true
- } else {
- pre = true
+func (w *grpcWatcher) watchingEventTypes() []watcherEventType {
+ types := make([]watcherEventType, 0, 3)
+ for _, t := range []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE} {
+ if len(w.reqs[t]) > 0 {
+ types = append(types, t)
}
}
- types := make([]watcherEventType, 0, 2)
- if pre {
- types = append(types, WATCHER_EVENT_UPDATE_MSG)
- }
- if post {
- types = append(types, WATCHER_EVENT_POST_POLICY_UPDATE_MSG)
- }
return types
}
-func (w *grpcIncomingWatcher) loop() error {
+func (w *grpcWatcher) loop() error {
for {
select {
case <-w.t.Dying():
- for _, req := range w.reqs {
- close(req.ResponseCh)
+ for _, rs := range w.reqs {
+ for _, req := range rs {
+ close(req.ResponseCh)
+ }
}
return nil
case req := <-w.ctlCh:
- w.reqs = append(w.reqs, req)
- case ev := <-w.ch:
- msg := ev.(*watcherEventUpdateMsg)
- for _, path := range msg.pathList {
- remains := make([]*GrpcRequest, 0, len(w.reqs))
- result := &GrpcResponse{
- Data: &api.Destination{
- Prefix: path.GetNlri().String(),
- Paths: []*api.Path{path.ToApiStruct(table.GLOBAL_RIB_NAME)},
- },
+ tbl := req.Data.(*api.Table)
+ var reqType watcherEventType
+ switch tbl.Type {
+ case api.Resource_GLOBAL:
+ reqType = WATCHER_EVENT_BESTPATH_CHANGE
+ case api.Resource_ADJ_IN:
+ if tbl.PostPolicy {
+ reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG
+ } else {
+ reqType = WATCHER_EVENT_UPDATE_MSG
}
- for _, req := range w.reqs {
- select {
- case <-req.EndCh:
+ default:
+ continue
+ }
+ reqs := w.reqs[reqType]
+ if reqs == nil {
+ reqs = make([]*GrpcRequest, 0, 16)
+ }
+ reqs = append(reqs, req)
+ w.reqs[reqType] = reqs
+ case ev := <-w.ch:
+ sendPaths := func(reqType watcherEventType, paths []*table.Path) {
+ for _, path := range paths {
+ if path == nil {
continue
- default:
}
- remains = append(remains, req)
- if req.RouteFamily != bgp.RouteFamily(0) && req.RouteFamily != path.GetRouteFamily() {
- continue
+ remains := make([]*GrpcRequest, 0, len(w.reqs[reqType]))
+ result := &GrpcResponse{
+ Data: &api.Destination{
+ Prefix: path.GetNlri().String(),
+ Paths: []*api.Path{path.ToApiStruct(table.GLOBAL_RIB_NAME)},
+ },
}
- if req.Name != "" && req.Name != path.GetSource().Address.String() {
- continue
+ for _, req := range w.reqs[reqType] {
+ select {
+ case <-req.EndCh:
+ continue
+ default:
+ }
+ remains = append(remains, req)
+ if req.RouteFamily != bgp.RouteFamily(0) && req.RouteFamily != path.GetRouteFamily() {
+ continue
+ }
+ if req.Name != "" && req.Name != path.GetSource().Address.String() {
+ continue
+ }
+ req.ResponseCh <- result
}
- req.ResponseCh <- result
+ w.reqs[reqType] = remains
}
- w.reqs = remains
}
+ switch msg := ev.(type) {
+ case *watcherEventBestPathMsg:
+ sendPaths(WATCHER_EVENT_BESTPATH_CHANGE, msg.pathList)
+ case *watcherEventUpdateMsg:
+ if msg.postPolicy {
+ sendPaths(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, msg.pathList)
+ } else {
+ sendPaths(WATCHER_EVENT_UPDATE_MSG, msg.pathList)
+ }
+ }
+
}
}
}
-func (w *grpcIncomingWatcher) restart(string) error {
+func (w *grpcWatcher) restart(string) error {
return nil
}
-func (w *grpcIncomingWatcher) addRequest(req *GrpcRequest) error {
+func (w *grpcWatcher) addRequest(req *GrpcRequest) error {
w.ctlCh <- req
return nil
}
-func newGrpcIncomingWatcher() (*grpcIncomingWatcher, error) {
- w := &grpcIncomingWatcher{
+func newGrpcWatcher() (*grpcWatcher, error) {
+ w := &grpcWatcher{
ch: make(chan watcherEvent),
ctlCh: make(chan *GrpcRequest),
- reqs: make([]*GrpcRequest, 0, 16),
+ reqs: make(map[watcherEventType][]*GrpcRequest),
}
w.t.Go(w.loop)
return w, nil
diff --git a/server/server.go b/server/server.go
index faf4a06c..7bd29048 100644
--- a/server/server.go
+++ b/server/server.go
@@ -195,8 +195,8 @@ func (server *BgpServer) Listeners(addr string) []*net.TCPListener {
}
func (server *BgpServer) Serve() {
- w, _ := newGrpcIncomingWatcher()
- server.watchers[WATCHER_GRPC_INCOMING] = w
+ w, _ := newGrpcWatcher()
+ server.watchers[WATCHER_GRPC_MONITOR] = w
senderCh := make(chan *SenderMsg, 1<<16)
go func(ch chan *SenderMsg) {
@@ -478,7 +478,7 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
best, _ := server.globalRib.DeletePathsByPeer(ids, peer.fsm.peerInfo, rf)
if !peer.isRouteServerClient() {
- server.broadcastBests(best[table.GLOBAL_RIB_NAME])
+ server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]})
}
for _, targetPeer := range server.neighborMap {
@@ -493,44 +493,6 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
return msgs
}
-func (server *BgpServer) broadcastBests(bests []*table.Path) {
- server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: bests})
- for _, path := range bests {
- if path == nil {
- continue
- }
- rf := path.GetRouteFamily()
-
- result := &GrpcResponse{
- Data: &api.Destination{
- Prefix: path.GetNlri().String(),
- Paths: []*api.Path{path.ToApiStruct(table.GLOBAL_RIB_NAME)},
- },
- }
- remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs))
- for _, req := range server.broadcastReqs {
- select {
- case <-req.EndCh:
- continue
- default:
- }
- if req.RequestType != REQ_MONITOR_GLOBAL_BEST_CHANGED {
- remainReqs = append(remainReqs, req)
- continue
- }
- if req.RouteFamily == bgp.RouteFamily(0) || req.RouteFamily == rf {
- m := &broadcastGrpcMsg{
- req: req,
- result: result,
- }
- server.broadcastMsgs = append(server.broadcastMsgs, m)
- }
- remainReqs = append(remainReqs, req)
- }
- server.broadcastReqs = remainReqs
- }
-}
-
func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
result := &GrpcResponse{
Data: peer.ToApiStruct(),
@@ -688,7 +650,7 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([]
if len(best[table.GLOBAL_RIB_NAME]) == 0 {
return nil, alteredPathList
}
- server.broadcastBests(best[table.GLOBAL_RIB_NAME])
+ server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]})
}
for _, targetPeer := range server.neighborMap {
@@ -2291,16 +2253,16 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
Data: data,
}
close(grpcReq.ResponseCh)
- case REQ_MONITOR_GLOBAL_BEST_CHANGED, REQ_MONITOR_NEIGHBOR_PEER_STATE:
+ case REQ_MONITOR_NEIGHBOR_PEER_STATE:
server.broadcastReqs = append(server.broadcastReqs, grpcReq)
- case REQ_MONITOR_INCOMING:
+ case REQ_MONITOR_RIB:
if grpcReq.Name != "" {
if _, err = server.checkNeighborRequest(grpcReq); err != nil {
break
}
}
- w := server.watchers[WATCHER_GRPC_INCOMING]
- go w.(*grpcIncomingWatcher).addRequest(grpcReq)
+ w := server.watchers[WATCHER_GRPC_MONITOR]
+ go w.(*grpcWatcher).addRequest(grpcReq)
case REQ_ENABLE_MRT:
server.handleEnableMrtRequest(grpcReq)
case REQ_DISABLE_MRT:
diff --git a/server/watcher.go b/server/watcher.go
index 8df5edbb..8c92fab3 100644
--- a/server/watcher.go
+++ b/server/watcher.go
@@ -44,8 +44,7 @@ const (
WATCHER_BMP
WATCHER_ZEBRA
WATCHER_COLLECTOR
- WATCHER_GRPC_BESTPATH
- WATCHER_GRPC_INCOMING
+ WATCHER_GRPC_MONITOR
)
type watcherEventType uint8
diff --git a/server/zclient.go b/server/zclient.go
index 87e303d4..1b37e5db 100644
--- a/server/zclient.go
+++ b/server/zclient.go
@@ -29,7 +29,7 @@ import (
)
func newIPRouteMessage(path *table.Path) *zebra.Message {
- if path.IsFromExternal() {
+ if path == nil || path.IsFromExternal() {
return nil
}
l := strings.SplitN(path.GetNlri().String(), "/", 2)
diff --git a/server/zclient_test.go b/server/zclient_test.go
index a09b59e6..8067c53f 100644
--- a/server/zclient_test.go
+++ b/server/zclient_test.go
@@ -16,14 +16,16 @@
package server
import (
+ "github.com/osrg/gobgp/gobgp/cmd"
"github.com/osrg/gobgp/table"
"github.com/osrg/gobgp/zebra"
"github.com/stretchr/testify/assert"
"net"
"testing"
+ "time"
)
-func Test_createPathFromIPRouteMessage(t *testing.T) {
+func Test_createRequestFromIPRouteMessage(t *testing.T) {
assert := assert.New(t)
m := &zebra.Message{}
@@ -51,28 +53,36 @@ func Test_createPathFromIPRouteMessage(t *testing.T) {
m.Header = *h
m.Body = b
- pi := &table.PeerInfo{
- AS: 65000,
- LocalID: net.ParseIP("10.0.0.1"),
- }
- p := createPathFromIPRouteMessage(m, pi)
- assert.NotEqual(nil, p)
- assert.Equal("0.0.0.0", p.GetNexthop().String())
- assert.Equal("192.168.100.0/24", p.GetNlri().String())
- assert.True(p.IsFromExternal())
- assert.False(p.IsWithdraw)
+ p := createRequestFromIPRouteMessage(m)
+ assert.NotNil(p)
+ paths, err := cmd.ApiStruct2Path(p.Path)
+ assert.Nil(err)
+ assert.Equal(len(paths), 1)
+ path := paths[0]
+ pp := table.NewPath(nil, path.Nlri, path.IsWithdraw, path.PathAttrs, time.Now(), false)
+ pp.SetIsFromExternal(p.Path.IsFromExternal)
+ assert.Equal("0.0.0.0", pp.GetNexthop().String())
+ assert.Equal("192.168.100.0/24", pp.GetNlri().String())
+ assert.True(pp.IsFromExternal())
+ assert.False(pp.IsWithdraw)
// withdraw
h.Command = zebra.IPV4_ROUTE_DELETE
m.Header = *h
- p = createPathFromIPRouteMessage(m, pi)
- assert.NotEqual(nil, p)
- assert.Equal("0.0.0.0", p.GetNexthop().String())
- assert.Equal("192.168.100.0/24", p.GetNlri().String())
- med, _ := p.GetMed()
+ p = createRequestFromIPRouteMessage(m)
+ assert.NotNil(p)
+ paths, err = cmd.ApiStruct2Path(p.Path)
+ assert.Nil(err)
+ assert.Equal(len(paths), 1)
+ path = paths[0]
+ pp = table.NewPath(nil, path.Nlri, path.IsWithdraw, path.PathAttrs, time.Now(), false)
+ pp.SetIsFromExternal(p.Path.IsFromExternal)
+ assert.Equal("0.0.0.0", pp.GetNexthop().String())
+ assert.Equal("192.168.100.0/24", pp.GetNlri().String())
+ med, _ := pp.GetMed()
assert.Equal(uint32(100), med)
- assert.True(p.IsFromExternal())
- assert.True(p.IsWithdraw)
+ assert.True(pp.IsFromExternal())
+ assert.True(pp.IsWithdraw)
// IPv6
h.Command = zebra.IPV6_ROUTE_ADD
@@ -82,23 +92,34 @@ func Test_createPathFromIPRouteMessage(t *testing.T) {
m.Header = *h
m.Body = b
- p = createPathFromIPRouteMessage(m, pi)
- assert.NotEqual(nil, p)
- assert.Equal("::", p.GetNexthop().String())
- assert.Equal("2001:db8:0:f101::/64", p.GetNlri().String())
- med, _ = p.GetMed()
+ p = createRequestFromIPRouteMessage(m)
+ assert.NotNil(p)
+ paths, err = cmd.ApiStruct2Path(p.Path)
+ assert.Nil(err)
+ assert.Equal(len(paths), 1)
+ path = paths[0]
+ pp = table.NewPath(nil, path.Nlri, path.IsWithdraw, path.PathAttrs, time.Now(), false)
+ pp.SetIsFromExternal(p.Path.IsFromExternal)
+ assert.Equal("::", pp.GetNexthop().String())
+ assert.Equal("2001:db8:0:f101::/64", pp.GetNlri().String())
+ med, _ = pp.GetMed()
assert.Equal(uint32(100), med)
- assert.True(p.IsFromExternal())
- assert.False(p.IsWithdraw)
+ assert.True(pp.IsFromExternal())
+ assert.False(pp.IsWithdraw)
// withdraw
h.Command = zebra.IPV6_ROUTE_DELETE
m.Header = *h
- p = createPathFromIPRouteMessage(m, pi)
- assert.NotEqual(nil, p)
- assert.Equal("::", p.GetNexthop().String())
- assert.Equal("2001:db8:0:f101::/64", p.GetNlri().String())
- assert.True(p.IsFromExternal())
- assert.True(p.IsWithdraw)
-
+ p = createRequestFromIPRouteMessage(m)
+ assert.NotNil(p)
+ paths, err = cmd.ApiStruct2Path(p.Path)
+ assert.Nil(err)
+ assert.Equal(len(paths), 1)
+ path = paths[0]
+ pp = table.NewPath(nil, path.Nlri, path.IsWithdraw, path.PathAttrs, time.Now(), false)
+ pp.SetIsFromExternal(p.Path.IsFromExternal)
+ assert.Equal("::", pp.GetNexthop().String())
+ assert.Equal("2001:db8:0:f101::/64", pp.GetNlri().String())
+ assert.True(pp.IsFromExternal())
+ assert.True(pp.IsWithdraw)
}