summaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/grpc_server.go15
-rw-r--r--server/monitor.go121
-rw-r--r--server/server.go16
-rw-r--r--server/watcher.go3
4 files changed, 154 insertions, 1 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go
index 9afb9f8a..a15143ee 100644
--- a/server/grpc_server.go
+++ b/server/grpc_server.go
@@ -45,6 +45,7 @@ const (
REQ_MOD_NEIGHBOR
REQ_GLOBAL_RIB
REQ_MONITOR_GLOBAL_BEST_CHANGED
+ REQ_MONITOR_INCOMING
REQ_MONITOR_NEIGHBOR_PEER_STATE
REQ_MONITOR_ROA_VALIDATION_RESULT
REQ_MRT_GLOBAL_RIB
@@ -166,6 +167,20 @@ func (s *Server) MonitorBestChanged(arg *api.Arguments, stream api.GobgpApi_Moni
})
}
+func (s *Server) MonitorRib(arg *api.Table, stream api.GobgpApi_MonitorRibServer) error {
+ switch arg.Type {
+ case api.Resource_ADJ_IN:
+ default:
+ return fmt.Errorf("unsupported resource type: %v", arg.Type)
+ }
+
+ req := NewGrpcRequest(REQ_MONITOR_INCOMING, arg.Name, bgp.RouteFamily(arg.Family), arg)
+ s.bgpServerCh <- req
+ return handleMultipleResponses(req, func(res *GrpcResponse) error {
+ return stream.Send(res.Data.(*api.Destination))
+ })
+}
+
func (s *Server) MonitorPeerState(arg *api.Arguments, stream api.GobgpApi_MonitorPeerStateServer) error {
var rf bgp.RouteFamily
req := NewGrpcRequest(REQ_MONITOR_NEIGHBOR_PEER_STATE, arg.Name, rf, nil)
diff --git a/server/monitor.go b/server/monitor.go
new file mode 100644
index 00000000..4849f1eb
--- /dev/null
+++ b/server/monitor.go
@@ -0,0 +1,121 @@
+// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ api "github.com/osrg/gobgp/api"
+ "github.com/osrg/gobgp/packet"
+ "github.com/osrg/gobgp/table"
+ "gopkg.in/tomb.v2"
+)
+
+type grpcIncomingWatcher struct {
+ t tomb.Tomb
+ ch chan watcherEvent
+ ctlCh chan *GrpcRequest
+ reqs []*GrpcRequest
+}
+
+func (w *grpcIncomingWatcher) notify(t watcherEventType) chan watcherEvent {
+ if t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG {
+ return w.ch
+ }
+ return nil
+}
+
+func (w *grpcIncomingWatcher) stop() {
+ w.t.Kill(nil)
+}
+
+func (w *grpcIncomingWatcher) watchingEventTypes() []watcherEventType {
+ pre := false
+ post := false
+ for _, req := range w.reqs {
+ if req.Data.(*api.Table).PostPolicy {
+ post = true
+ } else {
+ pre = true
+ }
+ }
+ types := make([]watcherEventType, 0, 2)
+ if pre {
+ types = append(types, WATCHER_EVENT_UPDATE_MSG)
+ }
+ if post {
+ types = append(types, WATCHER_EVENT_POST_POLICY_UPDATE_MSG)
+ }
+ return types
+}
+
+func (w *grpcIncomingWatcher) loop() error {
+ for {
+ select {
+ case <-w.t.Dying():
+ for _, req := range w.reqs {
+ close(req.ResponseCh)
+ }
+ return nil
+ case req := <-w.ctlCh:
+ w.reqs = append(w.reqs, req)
+ case ev := <-w.ch:
+ msg := ev.(*watcherEventUpdateMsg)
+ for _, path := range msg.pathList {
+ remains := make([]*GrpcRequest, 0, len(w.reqs))
+ result := &GrpcResponse{
+ Data: &api.Destination{
+ Prefix: path.GetNlri().String(),
+ Paths: []*api.Path{path.ToApiStruct(table.GLOBAL_RIB_NAME)},
+ },
+ }
+ for _, req := range w.reqs {
+ select {
+ case <-req.EndCh:
+ continue
+ default:
+ }
+ remains = append(remains, req)
+ if req.RouteFamily != bgp.RouteFamily(0) && req.RouteFamily != path.GetRouteFamily() {
+ continue
+ }
+ if req.Name != "" && req.Name != path.GetSource().Address.String() {
+ continue
+ }
+ req.ResponseCh <- result
+ }
+ w.reqs = remains
+ }
+ }
+ }
+}
+
+func (w *grpcIncomingWatcher) restart(string) error {
+ return nil
+}
+
+func (w *grpcIncomingWatcher) addRequest(req *GrpcRequest) error {
+ w.ctlCh <- req
+ return nil
+}
+
+func newGrpcIncomingWatcher() (*grpcIncomingWatcher, error) {
+ w := &grpcIncomingWatcher{
+ ch: make(chan watcherEvent),
+ ctlCh: make(chan *GrpcRequest),
+ reqs: make([]*GrpcRequest, 0, 16),
+ }
+ w.t.Go(w.loop)
+ return w, nil
+}
diff --git a/server/server.go b/server/server.go
index 9d59684f..c8aba0d9 100644
--- a/server/server.go
+++ b/server/server.go
@@ -230,6 +230,9 @@ func (server *BgpServer) Serve() {
}
}
+ w, _ := newGrpcIncomingWatcher()
+ server.watchers[WATCHER_GRPC_INCOMING] = w
+
if g.Zebra.Enabled == true {
if g.Zebra.Url == "" {
g.Zebra.Url = "unix:/var/run/quagga/zserv.api"
@@ -989,6 +992,8 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
case *bgp.MessageError:
msgs = append(msgs, newSenderMsg(peer, []*bgp.BGPMessage{bgp.NewBGPNotificationMessage(m.TypeCode, m.SubTypeCode, m.Data)}))
case *bgp.BGPMessage:
+ pathList, msgList := peer.handleBGPmessage(e)
+
if m.Header.Type == bgp.BGP_MSG_UPDATE && server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
@@ -1003,11 +1008,11 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
timestamp: e.timestamp,
payload: e.payload,
postPolicy: false,
+ pathList: pathList,
}
server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev)
}
- pathList, msgList := peer.handleBGPmessage(e)
if len(msgList) > 0 {
msgs = append(msgs, newSenderMsg(peer, msgList))
}
@@ -1027,6 +1032,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
fourBytesAs: y,
timestamp: e.timestamp,
postPolicy: true,
+ pathList: altered,
}
for _, u := range table.CreateUpdateMsgFromPaths(altered) {
payload, _ := u.Serialize()
@@ -2074,6 +2080,14 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
close(grpcReq.ResponseCh)
case REQ_MONITOR_GLOBAL_BEST_CHANGED, REQ_MONITOR_NEIGHBOR_PEER_STATE, REQ_MONITOR_ROA_VALIDATION_RESULT:
server.broadcastReqs = append(server.broadcastReqs, grpcReq)
+ case REQ_MONITOR_INCOMING:
+ if grpcReq.Name != "" {
+ if _, err = server.checkNeighborRequest(grpcReq); err != nil {
+ break
+ }
+ }
+ w := server.watchers[WATCHER_GRPC_INCOMING]
+ go w.(*grpcIncomingWatcher).addRequest(grpcReq)
case REQ_MRT_GLOBAL_RIB, REQ_MRT_LOCAL_RIB:
server.handleMrt(grpcReq)
case REQ_MOD_MRT:
diff --git a/server/watcher.go b/server/watcher.go
index 6cf2abc8..591d5d89 100644
--- a/server/watcher.go
+++ b/server/watcher.go
@@ -19,6 +19,7 @@ import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/osrg/gobgp/packet"
+ "github.com/osrg/gobgp/table"
"gopkg.in/tomb.v2"
"net"
"os"
@@ -42,6 +43,7 @@ const (
WATCHER_BMP
WATCHER_ZEBRA
WATCHER_GRPC_BESTPATH
+ WATCHER_GRPC_INCOMING
)
type watcherEventType uint8
@@ -68,6 +70,7 @@ type watcherEventUpdateMsg struct {
timestamp time.Time
payload []byte
postPolicy bool
+ pathList []*table.Path
}
type watcherEventStateChangedMsg struct {