diff options
Diffstat (limited to 'server')
-rw-r--r-- | server/grpc_server.go | 15 | ||||
-rw-r--r-- | server/monitor.go | 121 | ||||
-rw-r--r-- | server/server.go | 16 | ||||
-rw-r--r-- | server/watcher.go | 3 |
4 files changed, 154 insertions, 1 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go index 9afb9f8a..a15143ee 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -45,6 +45,7 @@ const ( REQ_MOD_NEIGHBOR REQ_GLOBAL_RIB REQ_MONITOR_GLOBAL_BEST_CHANGED + REQ_MONITOR_INCOMING REQ_MONITOR_NEIGHBOR_PEER_STATE REQ_MONITOR_ROA_VALIDATION_RESULT REQ_MRT_GLOBAL_RIB @@ -166,6 +167,20 @@ func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.GobgpApi_Moni }) } +func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer) error { + switch arg.Type { + case api.Resource_ADJ_IN: + default: + return fmt.Errorf("unsupported resource type: %v", arg.Type) + } + + req := NewGrpcRequest(REQ_MONITOR_INCOMING, arg.Name, bgp.RouteFamily(arg.Family), arg) + s.bgpServerCh <- req + return handleMultipleResponses(req, func(res *GrpcResponse) error { + return stream.Send(res.Data.(*api.Destination)) + }) +} + func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.GobgpApi_MonitorPeerStateServer) error { var rf bgp.RouteFamily req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.Name, rf, nil) diff --git a/server/monitor.go b/server/monitor.go new file mode 100644 index 00000000..4849f1eb --- /dev/null +++ b/server/monitor.go @@ -0,0 +1,121 @@ +// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + api "github.com/osrg/gobgp/api" + "github.com/osrg/gobgp/packet" + "github.com/osrg/gobgp/table" + "gopkg.in/tomb.v2" +) + +type grpcIncomingWatcher struct { + t tomb.Tomb + ch chan watcherEvent + ctlCh chan *GrpcRequest + reqs []*GrpcRequest +} + +func (w *grpcIncomingWatcher) notify(t watcherEventType) chan watcherEvent { + if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG { + return w.ch + } + return nil +} + +func (w *grpcIncomingWatcher) stop() { + w.t.Kill(nil) +} + +func (w *grpcIncomingWatcher) watchingEventTypes() []watcherEventType { + pre := false + post := false + for _, req := range w.reqs { + if req.Data.(*api.Table).PostPolicy { + post = true + } else { + pre = true + } + } + types := make([]watcherEventType, 0, 2) + if pre { + types = append(types, WATCHER_EVENT_UPDATE_MSG) + } + if post { + types = append(types, WATCHER_EVENT_POST_POLICY_UPDATE_MSG) + } + return types +} + +func (w *grpcIncomingWatcher) loop() error { + for { + select { + case <-w.t.Dying(): + for _, req := range w.reqs { + close(req.ResponseCh) + } + return nil + case req := <-w.ctlCh: + w.reqs = append(w.reqs, req) + case ev := <-w.ch: + msg := ev.(*watcherEventUpdateMsg) + for _, path := range msg.pathList { + remains := make([]*GrpcRequest, 0, len(w.reqs)) + result := &GrpcResponse{ + Data: &api.Destination{ + Prefix: path.GetNlri().String(), + Paths: []*api.Path{path.ToApiStruct(table.GLOBAL_RIB_NAME)}, + }, + } + for _, req := range w.reqs { + select { + case <-req.EndCh: + continue + default: + } + remains = append(remains, req) + if req.RouteFamily != bgp.RouteFamily(0) && req.RouteFamily != path.GetRouteFamily() { + continue + } + if req.Name != "" && req.Name != path.GetSource().Address.String() { + continue + } + req.ResponseCh <- result + } + w.reqs = remains + } + } + } +} + +func (w *grpcIncomingWatcher) restart(string) error { + return nil +} + +func (w *grpcIncomingWatcher) addRequest(req *GrpcRequest) error { + w.ctlCh <- req + return nil +} + +func newGrpcIncomingWatcher() (*grpcIncomingWatcher, error) { + w := &grpcIncomingWatcher{ + ch: make(chan watcherEvent), + ctlCh: make(chan *GrpcRequest), + reqs: make([]*GrpcRequest, 0, 16), + } + w.t.Go(w.loop) + return w, nil +} diff --git a/server/server.go b/server/server.go index 9d59684f..c8aba0d9 100644 --- a/server/server.go +++ b/server/server.go @@ -230,6 +230,9 @@ func (server *BgpServer) Serve() { } } + w, _ := newGrpcIncomingWatcher() + server.watchers[WATCHER_GRPC_INCOMING] = w + if g.Zebra.Enabled == true { if g.Zebra.Url == "" { g.Zebra.Url = "unix:/var/run/quagga/zserv.api" @@ -989,6 +992,8 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { case *bgp.MessageError: msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data)})) case *bgp.BGPMessage: + pathList, msgList := peer.handleBGPmessage(e) + if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() @@ -1003,11 +1008,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { timestamp: e.timestamp, payload: e.payload, postPolicy: false, + pathList: pathList, } server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev) } - pathList, msgList := peer.handleBGPmessage(e) if len(msgList) > 0 { msgs = append(msgs, newSenderMsg(peer, msgList)) } @@ -1027,6 +1032,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { fourBytesAs: y, timestamp: e.timestamp, postPolicy: true, + pathList: altered, } for _, u := range table.CreateUpdateMsgFromPaths(altered) { payload, _ := u.Serialize() @@ -2074,6 +2080,14 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { close(grpcReq.ResponseCh) case REQ_MONITOR_GLOBAL_BEST_CHANGED, REQ_MONITOR_NEIGHBOR_PEER_STATE, REQ_MONITOR_ROA_VALIDATION_RESULT: server.broadcastReqs = append(server.broadcastReqs, grpcReq) + case REQ_MONITOR_INCOMING: + if grpcReq.Name != "" { + if _, err = server.checkNeighborRequest(grpcReq); err != nil { + break + } + } + w := server.watchers[WATCHER_GRPC_INCOMING] + go w.(*grpcIncomingWatcher).addRequest(grpcReq) case REQ_MRT_GLOBAL_RIB, REQ_MRT_LOCAL_RIB: server.handleMrt(grpcReq) case REQ_MOD_MRT: diff --git a/server/watcher.go b/server/watcher.go index 6cf2abc8..591d5d89 100644 --- a/server/watcher.go +++ b/server/watcher.go @@ -19,6 +19,7 @@ import ( "fmt" log "github.com/Sirupsen/logrus" "github.com/osrg/gobgp/packet" + "github.com/osrg/gobgp/table" "gopkg.in/tomb.v2" "net" "os" @@ -42,6 +43,7 @@ const ( WATCHER_BMP WATCHER_ZEBRA WATCHER_GRPC_BESTPATH + WATCHER_GRPC_INCOMING ) type watcherEventType uint8 @@ -68,6 +70,7 @@ type watcherEventUpdateMsg struct { timestamp time.Time payload []byte postPolicy bool + pathList []*table.Path } type watcherEventStateChangedMsg struct { |