summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-07-22 17:27:29 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-07-22 17:27:29 +0900
commit5aec36b646e2a3c01434828c0f0cc6f3e8566578 (patch)
treeab954b6214259f2c427e8e002515b606feff7961
parent43dc07d72353fc8bcb79a18a5739ea0a90dda6bb (diff)
move gRPC-related code for REQ_MONITOR_RIB and REQ_MONITOR_NEIGHBOR_PEER_STATE to grpc_server.go
Add new Watch API. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--server/grpc_server.go133
-rw-r--r--server/monitor.go196
-rw-r--r--server/server.go209
3 files changed, 279 insertions, 259 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go
index 158f435f..ebe8e972 100644
--- a/server/grpc_server.go
+++ b/server/grpc_server.go
@@ -240,21 +240,6 @@ func (s *Server) GetNeighbor(ctx context.Context, arg *api.GetNeighborRequest) (
return &api.GetNeighborResponse{Peers: p}, nil
}
-func handleMultipleResponses(req *GrpcRequest, f func(*GrpcResponse) error) error {
- for res := range req.ResponseCh {
- if err := res.Err(); err != nil {
- log.Debug(err.Error())
- req.EndCh <- struct{}{}
- return err
- }
- if err := f(res); err != nil {
- req.EndCh <- struct{}{}
- return err
- }
- }
- return nil
-}
-
func toPathApi(id string, path *table.Path) *api.Path {
nlri := path.GetNlri()
n, _ := nlri.Serialize()
@@ -357,27 +342,115 @@ func (s *Server) GetRib(ctx context.Context, arg *api.GetRibRequest) (*api.GetRi
}
func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer) error {
- switch arg.Type {
- case api.Resource_ADJ_IN, api.Resource_GLOBAL:
- default:
- return fmt.Errorf("unsupported resource type: %v", arg.Type)
+ w, err := func() (*Watcher, error) {
+ switch arg.Type {
+ case api.Resource_GLOBAL:
+ return s.bgpServer.Watch(WatchBestPath()), nil
+ case api.Resource_ADJ_IN:
+ if arg.PostPolicy {
+ return s.bgpServer.Watch(WatchPostUpdate()), nil
+ }
+ return s.bgpServer.Watch(WatchUpdate()), nil
+ default:
+ return nil, fmt.Errorf("unsupported resource type: %v", arg.Type)
+ }
+ }()
+ if err != nil {
+ return nil
}
- req := NewGrpcRequest(REQ_MONITOR_RIB, arg.Name, bgp.RouteFamily(arg.Family), arg)
- s.bgpServerCh <- req
- return handleMultipleResponses(req, func(res *GrpcResponse) error {
- return stream.Send(res.Data.(*api.Destination))
- })
+ return func() error {
+ defer func() { w.Stop() }()
+
+ sendPath := func(pathList []*table.Path) error {
+ dsts := make(map[string]*api.Destination)
+ for _, path := range pathList {
+ if path == nil {
+ continue
+ }
+ if dst, y := dsts[path.GetNlri().String()]; y {
+ dst.Paths = append(dst.Paths, toPathApi(table.GLOBAL_RIB_NAME, path))
+ } else {
+ dsts[path.GetNlri().String()] = &api.Destination{
+ Prefix: path.GetNlri().String(),
+ Paths: []*api.Path{toPathApi(table.GLOBAL_RIB_NAME, path)},
+ }
+ }
+ }
+ for _, dst := range dsts {
+ if err := stream.Send(dst); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ for {
+ select {
+ case ev := <-w.Event():
+ switch msg := ev.(type) {
+ case *watcherEventBestPathMsg:
+ if err := sendPath(func() []*table.Path {
+ if len(msg.multiPathList) > 0 {
+ l := make([]*table.Path, 0)
+ for _, p := range msg.multiPathList {
+ l = append(l, p...)
+ }
+ return l
+ } else {
+ return msg.pathList
+ }
+ }()); err != nil {
+ return err
+ }
+ case *watcherEventUpdateMsg:
+ if err := sendPath(msg.pathList); err != nil {
+ return err
+ }
+ }
+ }
+ }
+ }()
}
func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.GobgpApi_MonitorPeerStateServer) error {
- var rf bgp.RouteFamily
- req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.Name, rf, nil)
- s.bgpServerCh <- req
+ return func() error {
+ w := s.bgpServer.Watch(WatchPeerState())
+ defer func() { w.Stop() }()
- return handleMultipleResponses(req, func(res *GrpcResponse) error {
- return stream.Send(res.Data.(*api.Peer))
- })
+ for {
+ select {
+ case ev := <-w.Event():
+ switch msg := ev.(type) {
+ case *watcherEventStateChangedMsg:
+ if len(arg.Name) > 0 && arg.Name != msg.peerAddress.String() {
+ continue
+ }
+ if err := stream.Send(&api.Peer{
+ Conf: &api.PeerConf{
+ PeerAs: msg.peerAS,
+ LocalAs: msg.localAS,
+ NeighborAddress: msg.peerAddress.String(),
+ Id: msg.peerID.String(),
+ },
+ Info: &api.PeerState{
+ PeerAs: msg.peerAS,
+ LocalAs: msg.localAS,
+ NeighborAddress: msg.peerAddress.String(),
+ BgpState: msg.state.String(),
+ AdminState: msg.adminState.String(),
+ },
+ Transport: &api.Transport{
+ LocalAddress: msg.localAddress.String(),
+ LocalPort: uint32(msg.localPort),
+ RemotePort: uint32(msg.peerPort),
+ },
+ }); err != nil {
+ return err
+ }
+ }
+ }
+ }
+ }()
}
func (s *Server) neighbor(reqType int, address string, d interface{}) (interface{}, error) {
diff --git a/server/monitor.go b/server/monitor.go
deleted file mode 100644
index d8276c9e..00000000
--- a/server/monitor.go
+++ /dev/null
@@ -1,196 +0,0 @@
-// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-// implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package server
-
-import (
- api "github.com/osrg/gobgp/api"
- "github.com/osrg/gobgp/packet/bgp"
- "github.com/osrg/gobgp/table"
- "gopkg.in/tomb.v2"
-)
-
-type grpcWatcher struct {
- t tomb.Tomb
- ch chan watcherEvent
- ctlCh chan *GrpcRequest
- reqs map[watcherEventType][]*GrpcRequest
-}
-
-func (w *grpcWatcher) notify(t watcherEventType) chan watcherEvent {
- if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE {
- return w.ch
- }
- return nil
-}
-
-func (w *grpcWatcher) stop() {
- w.t.Kill(nil)
-}
-
-func (w *grpcWatcher) watchingEventTypes() []watcherEventType {
- return []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE, WATCHER_EVENT_STATE_CHANGE}
-}
-
-func (w *grpcWatcher) loop() error {
- for {
- select {
- case <-w.t.Dying():
- for _, rs := range w.reqs {
- for _, req := range rs {
- close(req.ResponseCh)
- }
- }
- return nil
- case req := <-w.ctlCh:
- var reqType watcherEventType
- switch req.RequestType {
- case REQ_MONITOR_RIB:
- tbl := req.Data.(*api.Table)
- switch tbl.Type {
- case api.Resource_GLOBAL:
- reqType = WATCHER_EVENT_BESTPATH_CHANGE
- case api.Resource_ADJ_IN:
- if tbl.PostPolicy {
- reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG
- } else {
- reqType = WATCHER_EVENT_UPDATE_MSG
- }
- }
- case REQ_MONITOR_NEIGHBOR_PEER_STATE:
- reqType = WATCHER_EVENT_STATE_CHANGE
- }
- reqs := w.reqs[reqType]
- if reqs == nil {
- reqs = make([]*GrpcRequest, 0, 16)
- }
- reqs = append(reqs, req)
- w.reqs[reqType] = reqs
- case ev := <-w.ch:
- sendMultiPaths := func(reqType watcherEventType, dsts [][]*table.Path) {
- for _, dst := range dsts {
- paths := make([]*api.Path, 0, len(dst))
- for _, path := range dst {
- paths = append(paths, toPathApi(table.GLOBAL_RIB_NAME, path))
- }
- if len(paths) == 0 {
- continue
- }
- remains := make([]*GrpcRequest, 0, len(w.reqs[reqType]))
- result := &GrpcResponse{
- Data: &api.Destination{
- Prefix: dst[0].GetNlri().String(),
- Paths: paths,
- },
- }
- for _, req := range w.reqs[reqType] {
- select {
- case <-req.EndCh:
- continue
- default:
- }
- remains = append(remains, req)
- if req.RouteFamily != bgp.RouteFamily(0) && req.RouteFamily != dst[0].GetRouteFamily() {
- continue
- }
- if req.Name != "" && req.Name != paths[0].NeighborIp {
- continue
- }
- req.ResponseCh <- result
- }
- w.reqs[reqType] = remains
- }
- }
- sendPaths := func(reqType watcherEventType, paths []*table.Path) {
- dsts := make([][]*table.Path, 0, len(paths))
- for _, path := range paths {
- if path == nil {
- continue
- }
- dsts = append(dsts, []*table.Path{path})
- }
- sendMultiPaths(reqType, dsts)
- }
- switch msg := ev.(type) {
- case *watcherEventBestPathMsg:
- if table.UseMultiplePaths.Enabled {
- sendMultiPaths(WATCHER_EVENT_BESTPATH_CHANGE, msg.multiPathList)
- } else {
- sendPaths(WATCHER_EVENT_BESTPATH_CHANGE, msg.pathList)
- }
- case *watcherEventUpdateMsg:
- if msg.postPolicy {
- sendPaths(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, msg.pathList)
- } else {
- sendPaths(WATCHER_EVENT_UPDATE_MSG, msg.pathList)
- }
- case *watcherEventStateChangedMsg:
- peer := &api.Peer{
- Conf: &api.PeerConf{
- PeerAs: msg.peerAS,
- LocalAs: msg.localAS,
- NeighborAddress: msg.peerAddress.String(),
- Id: msg.peerID.String(),
- },
- Info: &api.PeerState{
- PeerAs: msg.peerAS,
- LocalAs: msg.localAS,
- NeighborAddress: msg.peerAddress.String(),
- BgpState: msg.state.String(),
- AdminState: msg.adminState.String(),
- },
- Transport: &api.Transport{
- LocalAddress: msg.localAddress.String(),
- LocalPort: uint32(msg.localPort),
- RemotePort: uint32(msg.peerPort),
- },
- }
- reqType := WATCHER_EVENT_STATE_CHANGE
- remains := make([]*GrpcRequest, 0, len(w.reqs[reqType]))
- result := &GrpcResponse{
- Data: peer,
- }
- for _, req := range w.reqs[reqType] {
- select {
- case <-req.EndCh:
- continue
- default:
- }
- remains = append(remains, req)
- if req.Name != "" && req.Name != peer.Conf.NeighborAddress {
- continue
- }
- req.ResponseCh <- result
- }
- w.reqs[reqType] = remains
- }
- }
- }
-}
-
-func (w *grpcWatcher) addRequest(req *GrpcRequest) error {
- w.ctlCh <- req
- return nil
-}
-
-func newGrpcWatcher() (*grpcWatcher, error) {
- w := &grpcWatcher{
- ch: make(chan watcherEvent),
- ctlCh: make(chan *GrpcRequest),
- reqs: make(map[watcherEventType][]*GrpcRequest),
- }
- w.t.Go(w.loop)
- return w, nil
-}
diff --git a/server/server.go b/server/server.go
index 54702226..e6d78112 100644
--- a/server/server.go
+++ b/server/server.go
@@ -105,6 +105,7 @@ type BgpServer struct {
roaManager *roaManager
shutdown bool
watchers *watcherManager
+ watcherMap map[watchType][]*Watcher
}
func NewBgpServer() *BgpServer {
@@ -116,6 +117,7 @@ func NewBgpServer() *BgpServer {
roaManager: roaManager,
watchers: newWatcherManager(),
mgmtCh: make(chan func(), 1),
+ watcherMap: make(map[watchType][]*Watcher),
}
}
@@ -133,9 +135,6 @@ func (server *BgpServer) Listeners(addr string) []*net.TCPListener {
}
func (server *BgpServer) Serve() {
- w, _ := newGrpcWatcher()
- server.watchers.addWatcher(WATCHER_GRPC_MONITOR, w)
-
server.listeners = make([]*TCPListener, 0, 2)
server.fsmincomingCh = channels.NewInfiniteChannel()
server.fsmStateCh = make(chan *FsmMsg, 4096)
@@ -367,6 +366,9 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
if !peer.isRouteServerClient() {
server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multipath})
+ for _, w := range server.watcherMap[WATCH_TYPE_BESTPATH] {
+ w.notify(&watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multipath})
+ }
}
for _, targetPeer := range server.neighborMap {
@@ -383,27 +385,30 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
newState := peer.fsm.state
if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED {
+ _, rport := peer.fsm.RemoteHostPort()
+ laddr, lport := peer.fsm.LocalHostPort()
+ sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
+ recvOpen := peer.fsm.recvOpen
+ ev := &watcherEventStateChangedMsg{
+ peerAS: peer.fsm.peerInfo.AS,
+ localAS: peer.fsm.peerInfo.LocalAS,
+ peerAddress: peer.fsm.peerInfo.Address,
+ localAddress: net.ParseIP(laddr),
+ peerPort: rport,
+ localPort: lport,
+ peerID: peer.fsm.peerInfo.ID,
+ sentOpen: sentOpen,
+ recvOpen: recvOpen,
+ state: newState,
+ adminState: peer.fsm.adminState,
+ timestamp: time.Now(),
+ }
if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) {
- _, rport := peer.fsm.RemoteHostPort()
- laddr, lport := peer.fsm.LocalHostPort()
- sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf)
- recvOpen := peer.fsm.recvOpen
- ev := &watcherEventStateChangedMsg{
- peerAS: peer.fsm.peerInfo.AS,
- localAS: peer.fsm.peerInfo.LocalAS,
- peerAddress: peer.fsm.peerInfo.Address,
- localAddress: net.ParseIP(laddr),
- peerPort: rport,
- localPort: lport,
- peerID: peer.fsm.peerInfo.ID,
- sentOpen: sentOpen,
- recvOpen: recvOpen,
- state: newState,
- adminState: peer.fsm.adminState,
- timestamp: time.Now(),
- }
server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev)
}
+ for _, w := range server.watcherMap[WATCH_TYPE_PEER_STATE] {
+ w.notify(ev)
+ }
}
}
@@ -515,6 +520,10 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) []*
return alteredPathList
}
server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi})
+ for _, w := range server.watcherMap[WATCH_TYPE_BESTPATH] {
+ w.notify(&watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi})
+ }
+
}
for _, targetPeer := range server.neighborMap {
@@ -657,7 +666,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
sendFsmOutgoingMsg(peer, nil, notification, true)
return
}
- if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) {
+ if m.Header.Type == bgp.BGP_MSG_UPDATE && (server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) || len(server.watcherMap[WATCH_TYPE_PRE_UPDATE]) > 0) {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &watcherEventUpdateMsg{
@@ -674,12 +683,15 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
pathList: pathList,
}
server.watchers.notify(WATCHER_EVENT_UPDATE_MSG, ev)
+ for _, w := range server.watcherMap[WATCH_TYPE_PRE_UPDATE] {
+ w.notify(ev)
+ }
}
if len(pathList) > 0 {
var altered []*table.Path
altered = server.propagateUpdate(peer, pathList)
- if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) {
+ if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) || len(server.watcherMap[WATCH_TYPE_POST_UPDATE]) > 0 {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &watcherEventUpdateMsg{
@@ -697,6 +709,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
payload, _ := u.Serialize()
ev.payload = payload
server.watchers.notify(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev)
+ for _, w := range server.watcherMap[WATCH_TYPE_POST_UPDATE] {
+ w.notify(ev)
+ }
}
}
}
@@ -1780,16 +1795,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) {
}
grpcReq.ResponseCh <- result
close(grpcReq.ResponseCh)
- case REQ_MONITOR_RIB, REQ_MONITOR_NEIGHBOR_PEER_STATE:
- if grpcReq.Name != "" {
- if _, err = server.checkNeighborRequest(grpcReq); err != nil {
- break
- }
- }
- w, y := server.watchers.watcher(WATCHER_GRPC_MONITOR)
- if y {
- go w.(*grpcWatcher).addRequest(grpcReq)
- }
case REQ_ENABLE_MRT:
server.handleEnableMrtRequest(grpcReq)
case REQ_DISABLE_MRT:
@@ -2787,3 +2792,141 @@ func (server *BgpServer) handleModRpki(grpcReq *GrpcRequest) {
done(grpcReq, &api.SoftResetRpkiResponse{}, server.roaManager.SoftReset(arg.Address))
}
}
+
+type watchType string
+
+const (
+ WATCH_TYPE_BESTPATH watchType = "bestpath"
+ WATCH_TYPE_PRE_UPDATE watchType = "preupdate"
+ WATCH_TYPE_POST_UPDATE watchType = "postupdate"
+ WATCH_TYPE_PEER_STATE watchType = "peerstate"
+)
+
+type watchOptions struct {
+ bestpath bool
+ preUpdate bool
+ postUpdate bool
+ peerState bool
+}
+
+type WatchOption func(*watchOptions)
+
+func WatchBestPath() WatchOption {
+ return func(o *watchOptions) {
+ o.bestpath = true
+ }
+}
+
+func WatchUpdate() WatchOption {
+ return func(o *watchOptions) {
+ o.preUpdate = true
+ }
+}
+
+func WatchPostUpdate() WatchOption {
+ return func(o *watchOptions) {
+ o.postUpdate = true
+ }
+}
+
+func WatchPeerState() WatchOption {
+ return func(o *watchOptions) {
+ o.peerState = true
+ }
+}
+
+type Watcher struct {
+ opts watchOptions
+ realCh chan watcherEvent
+ ch *channels.InfiniteChannel
+ s *BgpServer
+}
+
+func (w *Watcher) Event() <-chan watcherEvent {
+ return w.realCh
+}
+
+func (w *Watcher) notify(v watcherEvent) {
+ w.ch.In() <- v
+}
+
+func (w *Watcher) loop() {
+ for {
+ select {
+ case ev, ok := <-w.ch.Out():
+ if !ok {
+ close(w.realCh)
+ return
+ }
+ w.realCh <- ev.(watcherEvent)
+ }
+ }
+}
+
+func (w *Watcher) Stop() {
+ ch := make(chan struct{})
+ defer func() { <-ch }()
+
+ w.s.mgmtCh <- func() {
+ defer close(ch)
+
+ for k, l := range w.s.watcherMap {
+ for i, v := range l {
+ if w == v {
+ w.s.watcherMap[k] = append(l[:i], l[i+1:]...)
+ break
+ }
+ }
+ }
+
+ w.ch.Close()
+ // make sure the loop function finishes
+ func() {
+ for {
+ select {
+ case <-w.realCh:
+ default:
+ return
+ }
+ }
+ }()
+ }
+}
+
+func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
+ ch := make(chan struct{})
+ defer func() { <-ch }()
+
+ s.mgmtCh <- func() {
+ defer close(ch)
+
+ w = &Watcher{
+ s: s,
+ realCh: make(chan watcherEvent, 8),
+ ch: channels.NewInfiniteChannel(),
+ }
+
+ for _, opt := range opts {
+ opt(&w.opts)
+ }
+
+ register := func(t watchType, w *Watcher) {
+ s.watcherMap[t] = append(s.watcherMap[t], w)
+ }
+
+ if w.opts.bestpath {
+ register(WATCH_TYPE_BESTPATH, w)
+ }
+ if w.opts.preUpdate {
+ register(WATCH_TYPE_PRE_UPDATE, w)
+ }
+ if w.opts.postUpdate {
+ register(WATCH_TYPE_POST_UPDATE, w)
+ }
+ if w.opts.peerState {
+ register(WATCH_TYPE_PEER_STATE, w)
+ }
+ go w.loop()
+ }
+ return w
+}