diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-22 17:27:29 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-22 17:27:29 +0900 |
commit | 5aec36b646e2a3c01434828c0f0cc6f3e8566578 (patch) | |
tree | ab954b6214259f2c427e8e002515b606feff7961 /server | |
parent | 43dc07d72353fc8bcb79a18a5739ea0a90dda6bb (diff) |
move gRPC-related code for REQ_MONITOR_RIB and REQ_MONITOR_NEIGHBOR_PEER_STATE to grpc_server.go
Add new Watch API.
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r-- | server/grpc_server.go | 133 | ||||
-rw-r--r-- | server/monitor.go | 196 | ||||
-rw-r--r-- | server/server.go | 209 |
3 files changed, 279 insertions, 259 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go index 158f435f..ebe8e972 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -240,21 +240,6 @@ func (s *Server) GetNeighbor(ctx context.Context, arg *api.GetNeighborRequest) ( return &api.GetNeighborResponse{Peers: p}, nil } -func handleMultipleResponses(req *GrpcRequest, f func(*GrpcResponse) error) error { - for res := range req.ResponseCh { - if err := res.Err(); err != nil { - log.Debug(err.Error()) - req.EndCh <- struct{}{} - return err - } - if err := f(res); err != nil { - req.EndCh <- struct{}{} - return err - } - } - return nil -} - func toPathApi(id string, path *table.Path) *api.Path { nlri := path.GetNlri() n, _ := nlri.Serialize() @@ -357,27 +342,115 @@ func (s *Server) GetRib(ctx context.Context, arg *api.GetRibRequest) (*api.GetRi } func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer) error { - switch arg.Type { - case api.Resource_ADJ_IN, api.Resource_GLOBAL: - default: - return fmt.Errorf("unsupported resource type: %v", arg.Type) + w, err := func() (*Watcher, error) { + switch arg.Type { + case api.Resource_GLOBAL: + return s.bgpServer.Watch(WatchBestPath()), nil + case api.Resource_ADJ_IN: + if arg.PostPolicy { + return s.bgpServer.Watch(WatchPostUpdate()), nil + } + return s.bgpServer.Watch(WatchUpdate()), nil + default: + return nil, fmt.Errorf("unsupported resource type: %v", arg.Type) + } + }() + if err != nil { + return nil } - req := NewGrpcRequest(REQ_MONITOR_RIB, arg.Name, bgp.RouteFamily(arg.Family), arg) - s.bgpServerCh <- req - return handleMultipleResponses(req, func(res *GrpcResponse) error { - return stream.Send(res.Data.(*api.Destination)) - }) + return func() error { + defer func() { w.Stop() }() + + sendPath := func(pathList []*table.Path) error { + dsts := make(map[string]*api.Destination) + for _, path := range pathList { + if path == nil { + continue + } + if dst, y := dsts[path.GetNlri().String()]; y { + dst.Paths = append(dst.Paths, toPathApi(table.GLOBAL_RIB_NAME, path)) + } else { + dsts[path.GetNlri().String()] = &api.Destination{ + Prefix: path.GetNlri().String(), + Paths: []*api.Path{toPathApi(table.GLOBAL_RIB_NAME, path)}, + } + } + } + for _, dst := range dsts { + if err := stream.Send(dst); err != nil { + return err + } + } + return nil + } + for { + select { + case ev := <-w.Event(): + switch msg := ev.(type) { + case *watcherEventBestPathMsg: + if err := sendPath(func() []*table.Path { + if len(msg.multiPathList) > 0 { + l := make([]*table.Path, 0) + for _, p := range msg.multiPathList { + l = append(l, p...) + } + return l + } else { + return msg.pathList + } + }()); err != nil { + return err + } + case *watcherEventUpdateMsg: + if err := sendPath(msg.pathList); err != nil { + return err + } + } + } + } + }() } func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.GobgpApi_MonitorPeerStateServer) error { - var rf bgp.RouteFamily - req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.Name, rf, nil) - s.bgpServerCh <- req + return func() error { + w := s.bgpServer.Watch(WatchPeerState()) + defer func() { w.Stop() }() - return handleMultipleResponses(req, func(res *GrpcResponse) error { - return stream.Send(res.Data.(*api.Peer)) - }) + for { + select { + case ev := <-w.Event(): + switch msg := ev.(type) { + case *watcherEventStateChangedMsg: + if len(arg.Name) > 0 && arg.Name != msg.peerAddress.String() { + continue + } + if err := stream.Send(&api.Peer{ + Conf: &api.PeerConf{ + PeerAs: msg.peerAS, + LocalAs: msg.localAS, + NeighborAddress: msg.peerAddress.String(), + Id: msg.peerID.String(), + }, + Info: &api.PeerState{ + PeerAs: msg.peerAS, + LocalAs: msg.localAS, + NeighborAddress: msg.peerAddress.String(), + BgpState: msg.state.String(), + AdminState: msg.adminState.String(), + }, + Transport: &api.Transport{ + LocalAddress: msg.localAddress.String(), + LocalPort: uint32(msg.localPort), + RemotePort: uint32(msg.peerPort), + }, + }); err != nil { + return err + } + } + } + } + }() } func (s *Server) neighbor(reqType int, address string, d interface{}) (interface{}, error) { diff --git a/server/monitor.go b/server/monitor.go deleted file mode 100644 index d8276c9e..00000000 --- a/server/monitor.go +++ /dev/null @@ -1,196 +0,0 @@ -// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - api "github.com/osrg/gobgp/api" - "github.com/osrg/gobgp/packet/bgp" - "github.com/osrg/gobgp/table" - "gopkg.in/tomb.v2" -) - -type grpcWatcher struct { - t tomb.Tomb - ch chan watcherEvent - ctlCh chan *GrpcRequest - reqs map[watcherEventType][]*GrpcRequest -} - -func (w *grpcWatcher) notify(t watcherEventType) chan watcherEvent { - if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE { - return w.ch - } - return nil -} - -func (w *grpcWatcher) stop() { - w.t.Kill(nil) -} - -func (w *grpcWatcher) watchingEventTypes() []watcherEventType { - return []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE, WATCHER_EVENT_STATE_CHANGE} -} - -func (w *grpcWatcher) loop() error { - for { - select { - case <-w.t.Dying(): - for _, rs := range w.reqs { - for _, req := range rs { - close(req.ResponseCh) - } - } - return nil - case req := <-w.ctlCh: - var reqType watcherEventType - switch req.RequestType { - case REQ_MONITOR_RIB: - tbl := req.Data.(*api.Table) - switch tbl.Type { - case api.Resource_GLOBAL: - reqType = WATCHER_EVENT_BESTPATH_CHANGE - case api.Resource_ADJ_IN: - if tbl.PostPolicy { - reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG - } else { - reqType = WATCHER_EVENT_UPDATE_MSG - } - } - case REQ_MONITOR_NEIGHBOR_PEER_STATE: - reqType = WATCHER_EVENT_STATE_CHANGE - } - reqs := w.reqs[reqType] - if reqs == nil { - reqs = make([]*GrpcRequest, 0, 16) - } - reqs = append(reqs, req) - w.reqs[reqType] = reqs - case ev := <-w.ch: - sendMultiPaths := func(reqType watcherEventType, dsts [][]*table.Path) { - for _, dst := range dsts { - paths := make([]*api.Path, 0, len(dst)) - for _, path := range dst { - paths = append(paths, toPathApi(table.GLOBAL_RIB_NAME, path)) - } - if len(paths) == 0 { - continue - } - remains := make([]*GrpcRequest, 0, len(w.reqs[reqType])) - result := &GrpcResponse{ - Data: &api.Destination{ - Prefix: dst[0].GetNlri().String(), - Paths: paths, - }, - } - for _, req := range w.reqs[reqType] { - select { - case <-req.EndCh: - continue - default: - } - remains = append(remains, req) - if req.RouteFamily != bgp.RouteFamily(0) && req.RouteFamily != dst[0].GetRouteFamily() { - continue - } - if req.Name != "" && req.Name != paths[0].NeighborIp { - continue - } - req.ResponseCh <- result - } - w.reqs[reqType] = remains - } - } - sendPaths := func(reqType watcherEventType, paths []*table.Path) { - dsts := make([][]*table.Path, 0, len(paths)) - for _, path := range paths { - if path == nil { - continue - } - dsts = append(dsts, []*table.Path{path}) - } - sendMultiPaths(reqType, dsts) - } - switch msg := ev.(type) { - case *watcherEventBestPathMsg: - if table.UseMultiplePaths.Enabled { - sendMultiPaths(WATCHER_EVENT_BESTPATH_CHANGE, msg.multiPathList) - } else { - sendPaths(WATCHER_EVENT_BESTPATH_CHANGE, msg.pathList) - } - case *watcherEventUpdateMsg: - if msg.postPolicy { - sendPaths(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, msg.pathList) - } else { - sendPaths(WATCHER_EVENT_UPDATE_MSG, msg.pathList) - } - case *watcherEventStateChangedMsg: - peer := &api.Peer{ - Conf: &api.PeerConf{ - PeerAs: msg.peerAS, - LocalAs: msg.localAS, - NeighborAddress: msg.peerAddress.String(), - Id: msg.peerID.String(), - }, - Info: &api.PeerState{ - PeerAs: msg.peerAS, - LocalAs: msg.localAS, - NeighborAddress: msg.peerAddress.String(), - BgpState: msg.state.String(), - AdminState: msg.adminState.String(), - }, - Transport: &api.Transport{ - LocalAddress: msg.localAddress.String(), - LocalPort: uint32(msg.localPort), - RemotePort: uint32(msg.peerPort), - }, - } - reqType := WATCHER_EVENT_STATE_CHANGE - remains := make([]*GrpcRequest, 0, len(w.reqs[reqType])) - result := &GrpcResponse{ - Data: peer, - } - for _, req := range w.reqs[reqType] { - select { - case <-req.EndCh: - continue - default: - } - remains = append(remains, req) - if req.Name != "" && req.Name != peer.Conf.NeighborAddress { - continue - } - req.ResponseCh <- result - } - w.reqs[reqType] = remains - } - } - } -} - -func (w *grpcWatcher) addRequest(req *GrpcRequest) error { - w.ctlCh <- req - return nil -} - -func newGrpcWatcher() (*grpcWatcher, error) { - w := &grpcWatcher{ - ch: make(chan watcherEvent), - ctlCh: make(chan *GrpcRequest), - reqs: make(map[watcherEventType][]*GrpcRequest), - } - w.t.Go(w.loop) - return w, nil -} diff --git a/server/server.go b/server/server.go index 54702226..e6d78112 100644 --- a/server/server.go +++ b/server/server.go @@ -105,6 +105,7 @@ type BgpServer struct { roaManager *roaManager shutdown bool watchers *watcherManager + watcherMap map[watchType][]*Watcher } func NewBgpServer() *BgpServer { @@ -116,6 +117,7 @@ func NewBgpServer() *BgpServer { roaManager: roaManager, watchers: newWatcherManager(), mgmtCh: make(chan func(), 1), + watcherMap: make(map[watchType][]*Watcher), } } @@ -133,9 +135,6 @@ func (server *BgpServer) Listeners(addr string) []*net.TCPListener { } func (server *BgpServer) Serve() { - w, _ := newGrpcWatcher() - server.watchers.addWatcher(WATCHER_GRPC_MONITOR, w) - server.listeners = make([]*TCPListener, 0, 2) server.fsmincomingCh = channels.NewInfiniteChannel() server.fsmStateCh = make(chan *FsmMsg, 4096) @@ -367,6 +366,9 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil if !peer.isRouteServerClient() { server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multipath}) + for _, w := range server.watcherMap[WATCH_TYPE_BESTPATH] { + w.notify(&watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multipath}) + } } for _, targetPeer := range server.neighborMap { @@ -383,27 +385,30 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { newState := peer.fsm.state if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { + _, rport := peer.fsm.RemoteHostPort() + laddr, lport := peer.fsm.LocalHostPort() + sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) + recvOpen := peer.fsm.recvOpen + ev := &watcherEventStateChangedMsg{ + peerAS: peer.fsm.peerInfo.AS, + localAS: peer.fsm.peerInfo.LocalAS, + peerAddress: peer.fsm.peerInfo.Address, + localAddress: net.ParseIP(laddr), + peerPort: rport, + localPort: lport, + peerID: peer.fsm.peerInfo.ID, + sentOpen: sentOpen, + recvOpen: recvOpen, + state: newState, + adminState: peer.fsm.adminState, + timestamp: time.Now(), + } if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) { - _, rport := peer.fsm.RemoteHostPort() - laddr, lport := peer.fsm.LocalHostPort() - sentOpen := buildopen(peer.fsm.gConf, peer.fsm.pConf) - recvOpen := peer.fsm.recvOpen - ev := &watcherEventStateChangedMsg{ - peerAS: peer.fsm.peerInfo.AS, - localAS: peer.fsm.peerInfo.LocalAS, - peerAddress: peer.fsm.peerInfo.Address, - localAddress: net.ParseIP(laddr), - peerPort: rport, - localPort: lport, - peerID: peer.fsm.peerInfo.ID, - sentOpen: sentOpen, - recvOpen: recvOpen, - state: newState, - adminState: peer.fsm.adminState, - timestamp: time.Now(), - } server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev) } + for _, w := range server.watcherMap[WATCH_TYPE_PEER_STATE] { + w.notify(ev) + } } } @@ -515,6 +520,10 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) []* return alteredPathList } server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi}) + for _, w := range server.watcherMap[WATCH_TYPE_BESTPATH] { + w.notify(&watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi}) + } + } for _, targetPeer := range server.neighborMap { @@ -657,7 +666,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { sendFsmOutgoingMsg(peer, nil, notification, true) return } - if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) { + if m.Header.Type == bgp.BGP_MSG_UPDATE && (server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) || len(server.watcherMap[WATCH_TYPE_PRE_UPDATE]) > 0) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() ev := &watcherEventUpdateMsg{ @@ -674,12 +683,15 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { pathList: pathList, } server.watchers.notify(WATCHER_EVENT_UPDATE_MSG, ev) + for _, w := range server.watcherMap[WATCH_TYPE_PRE_UPDATE] { + w.notify(ev) + } } if len(pathList) > 0 { var altered []*table.Path altered = server.propagateUpdate(peer, pathList) - if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) { + if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) || len(server.watcherMap[WATCH_TYPE_POST_UPDATE]) > 0 { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() ev := &watcherEventUpdateMsg{ @@ -697,6 +709,9 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { payload, _ := u.Serialize() ev.payload = payload server.watchers.notify(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev) + for _, w := range server.watcherMap[WATCH_TYPE_POST_UPDATE] { + w.notify(ev) + } } } } @@ -1780,16 +1795,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { } grpcReq.ResponseCh <- result close(grpcReq.ResponseCh) - case REQ_MONITOR_RIB, REQ_MONITOR_NEIGHBOR_PEER_STATE: - if grpcReq.Name != "" { - if _, err = server.checkNeighborRequest(grpcReq); err != nil { - break - } - } - w, y := server.watchers.watcher(WATCHER_GRPC_MONITOR) - if y { - go w.(*grpcWatcher).addRequest(grpcReq) - } case REQ_ENABLE_MRT: server.handleEnableMrtRequest(grpcReq) case REQ_DISABLE_MRT: @@ -2787,3 +2792,141 @@ func (server *BgpServer) handleModRpki(grpcReq *GrpcRequest) { done(grpcReq, &api.SoftResetRpkiResponse{}, server.roaManager.SoftReset(arg.Address)) } } + +type watchType string + +const ( + WATCH_TYPE_BESTPATH watchType = "bestpath" + WATCH_TYPE_PRE_UPDATE watchType = "preupdate" + WATCH_TYPE_POST_UPDATE watchType = "postupdate" + WATCH_TYPE_PEER_STATE watchType = "peerstate" +) + +type watchOptions struct { + bestpath bool + preUpdate bool + postUpdate bool + peerState bool +} + +type WatchOption func(*watchOptions) + +func WatchBestPath() WatchOption { + return func(o *watchOptions) { + o.bestpath = true + } +} + +func WatchUpdate() WatchOption { + return func(o *watchOptions) { + o.preUpdate = true + } +} + +func WatchPostUpdate() WatchOption { + return func(o *watchOptions) { + o.postUpdate = true + } +} + +func WatchPeerState() WatchOption { + return func(o *watchOptions) { + o.peerState = true + } +} + +type Watcher struct { + opts watchOptions + realCh chan watcherEvent + ch *channels.InfiniteChannel + s *BgpServer +} + +func (w *Watcher) Event() <-chan watcherEvent { + return w.realCh +} + +func (w *Watcher) notify(v watcherEvent) { + w.ch.In() <- v +} + +func (w *Watcher) loop() { + for { + select { + case ev, ok := <-w.ch.Out(): + if !ok { + close(w.realCh) + return + } + w.realCh <- ev.(watcherEvent) + } + } +} + +func (w *Watcher) Stop() { + ch := make(chan struct{}) + defer func() { <-ch }() + + w.s.mgmtCh <- func() { + defer close(ch) + + for k, l := range w.s.watcherMap { + for i, v := range l { + if w == v { + w.s.watcherMap[k] = append(l[:i], l[i+1:]...) + break + } + } + } + + w.ch.Close() + // make sure the loop function finishes + func() { + for { + select { + case <-w.realCh: + default: + return + } + } + }() + } +} + +func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + w = &Watcher{ + s: s, + realCh: make(chan watcherEvent, 8), + ch: channels.NewInfiniteChannel(), + } + + for _, opt := range opts { + opt(&w.opts) + } + + register := func(t watchType, w *Watcher) { + s.watcherMap[t] = append(s.watcherMap[t], w) + } + + if w.opts.bestpath { + register(WATCH_TYPE_BESTPATH, w) + } + if w.opts.preUpdate { + register(WATCH_TYPE_PRE_UPDATE, w) + } + if w.opts.postUpdate { + register(WATCH_TYPE_POST_UPDATE, w) + } + if w.opts.peerState { + register(WATCH_TYPE_PEER_STATE, w) + } + go w.loop() + } + return w +} |