summaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>2016-05-25 04:39:07 +0000
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-06-06 12:43:20 +0900
commitb63e1c1fc3c40b58ba798bbae4e122f0eedaf55d (patch)
tree48356ad79cdc1d2a9deede9650a37556d4a86adf /server
parentaca6fd6ad4409b4cb63682bff3c79fca8ca2800d (diff)
server: refactor monitor/watcher infra
have watcherManager to manage all watchers also merge grpc neighbor state monitoring handling to grpcWatcher Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r--server/monitor.go71
-rw-r--r--server/mrt.go196
-rw-r--r--server/server.go162
-rw-r--r--server/watcher.go232
4 files changed, 357 insertions, 304 deletions
diff --git a/server/monitor.go b/server/monitor.go
index c7236fed..e8e7df4d 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -30,7 +30,7 @@ type grpcWatcher struct {
}
func (w *grpcWatcher) notify(t watcherEventType) chan watcherEvent {
- if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG {
+ if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE {
return w.ch
}
return nil
@@ -41,8 +41,8 @@ func (w *grpcWatcher) stop() {
}
func (w *grpcWatcher) watchingEventTypes() []watcherEventType {
- types := make([]watcherEventType, 0, 3)
- for _, t := range []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE} {
+ types := make([]watcherEventType, 0, 4)
+ for _, t := range []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE, WATCHER_EVENT_STATE_CHANGE} {
if len(w.reqs[t]) > 0 {
types = append(types, t)
}
@@ -61,19 +61,22 @@ func (w *grpcWatcher) loop() error {
}
return nil
case req := <-w.ctlCh:
- tbl := req.Data.(*api.Table)
var reqType watcherEventType
- switch tbl.Type {
- case api.Resource_GLOBAL:
- reqType = WATCHER_EVENT_BESTPATH_CHANGE
- case api.Resource_ADJ_IN:
- if tbl.PostPolicy {
- reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG
- } else {
- reqType = WATCHER_EVENT_UPDATE_MSG
+ switch req.RequestType {
+ case REQ_MONITOR_RIB:
+ tbl := req.Data.(*api.Table)
+ switch tbl.Type {
+ case api.Resource_GLOBAL:
+ reqType = WATCHER_EVENT_BESTPATH_CHANGE
+ case api.Resource_ADJ_IN:
+ if tbl.PostPolicy {
+ reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG
+ } else {
+ reqType = WATCHER_EVENT_UPDATE_MSG
+ }
}
- default:
- continue
+ case REQ_MONITOR_NEIGHBOR_PEER_STATE:
+ reqType = WATCHER_EVENT_STATE_CHANGE
}
reqs := w.reqs[reqType]
if reqs == nil {
@@ -121,8 +124,46 @@ func (w *grpcWatcher) loop() error {
} else {
sendPaths(WATCHER_EVENT_UPDATE_MSG, msg.pathList)
}
+ case *watcherEventStateChangedMsg:
+ peer := &api.Peer{
+ Conf: &api.PeerConf{
+ PeerAs: msg.peerAS,
+ LocalAs: msg.localAS,
+ NeighborAddress: msg.peerAddress.String(),
+ Id: msg.peerID.String(),
+ },
+ Info: &api.PeerState{
+ PeerAs: msg.peerAS,
+ LocalAs: msg.localAS,
+ NeighborAddress: msg.peerAddress.String(),
+ BgpState: msg.state.String(),
+ AdminState: msg.adminState.String(),
+ },
+ Transport: &api.Transport{
+ LocalAddress: msg.localAddress.String(),
+ LocalPort: uint32(msg.localPort),
+ RemotePort: uint32(msg.peerPort),
+ },
+ }
+ reqType := WATCHER_EVENT_STATE_CHANGE
+ remains := make([]*GrpcRequest, 0, len(w.reqs[reqType]))
+ result := &GrpcResponse{
+ Data: peer,
+ }
+ for _, req := range w.reqs[reqType] {
+ select {
+ case <-req.EndCh:
+ continue
+ default:
+ }
+ remains = append(remains, req)
+ if req.Name != "" && req.Name != peer.Conf.NeighborAddress {
+ continue
+ }
+ req.ResponseCh <- result
+ }
+ w.reqs[reqType] = remains
}
-
}
}
}
diff --git a/server/mrt.go b/server/mrt.go
new file mode 100644
index 00000000..ce4ac90b
--- /dev/null
+++ b/server/mrt.go
@@ -0,0 +1,196 @@
+// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "bytes"
+ "os"
+ "time"
+
+ log "github.com/Sirupsen/logrus"
+ "github.com/osrg/gobgp/packet/mrt"
+ "gopkg.in/tomb.v2"
+)
+
+type mrtWatcher struct {
+ t tomb.Tomb
+ filename string
+ file *os.File
+ ch chan watcherEvent
+ interval uint64
+}
+
+func (w *mrtWatcher) notify(t watcherEventType) chan watcherEvent {
+ if t == WATCHER_EVENT_UPDATE_MSG {
+ return w.ch
+ }
+ return nil
+}
+
+func (w *mrtWatcher) stop() {
+ w.t.Kill(nil)
+}
+
+func (w *mrtWatcher) restart(filename string) error {
+ return nil
+}
+
+func (w *mrtWatcher) loop() error {
+ c := func() *time.Ticker {
+ if w.interval == 0 {
+ return &time.Ticker{}
+ }
+ return time.NewTicker(time.Second * time.Duration(w.interval))
+ }()
+
+ defer func() {
+ if w.file != nil {
+ w.file.Close()
+ }
+ if w.interval != 0 {
+ c.Stop()
+ }
+ }()
+
+ for {
+ serialize := func(ev watcherEvent) ([]byte, error) {
+ m := ev.(*watcherEventUpdateMsg)
+ subtype := mrt.MESSAGE_AS4
+ mp := mrt.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, nil)
+ mp.BGPMessagePayload = m.payload
+ if m.fourBytesAs == false {
+ subtype = mrt.MESSAGE
+ }
+ bm, err := mrt.NewMRTMessage(uint32(m.timestamp.Unix()), mrt.BGP4MP, subtype, mp)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Data": m,
+ }).Warn(err)
+ return nil, err
+ }
+ return bm.Serialize()
+ }
+
+ drain := func(ev watcherEvent) {
+ events := make([]watcherEvent, 0, 1+len(w.ch))
+ if ev != nil {
+ events = append(events, ev)
+ }
+
+ for len(w.ch) > 0 {
+ e := <-w.ch
+ events = append(events, e)
+ }
+
+ w := func(buf []byte) {
+ if _, err := w.file.Write(buf); err == nil {
+ w.file.Sync()
+ } else {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Error": err,
+ }).Warn(err)
+ }
+ }
+
+ var b bytes.Buffer
+ for _, e := range events {
+ buf, err := serialize(e)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Data": e,
+ }).Warn(err)
+ continue
+ }
+ b.Write(buf)
+ if b.Len() > 1*1000*1000 {
+ w(b.Bytes())
+ b.Reset()
+ }
+ }
+ if b.Len() > 0 {
+ w(b.Bytes())
+ }
+ }
+ select {
+ case <-w.t.Dying():
+ drain(nil)
+ return nil
+ case e := <-w.ch:
+ drain(e)
+ case <-c.C:
+ w.file.Close()
+ file, err := mrtFileOpen(w.filename, w.interval)
+ if err == nil {
+ w.file = file
+ } else {
+ log.Info("can't rotate mrt file", err)
+ }
+ }
+ }
+}
+
+func (w *mrtWatcher) watchingEventTypes() []watcherEventType {
+ return []watcherEventType{WATCHER_EVENT_UPDATE_MSG}
+}
+
+func mrtFileOpen(filename string, interval uint64) (*os.File, error) {
+ realname := filename
+ if interval != 0 {
+ realname = time.Now().Format(filename)
+ }
+
+ i := len(realname)
+ for i > 0 && os.IsPathSeparator(realname[i-1]) {
+ // skip trailing path separators
+ i--
+ }
+ j := i
+
+ for j > 0 && !os.IsPathSeparator(realname[j-1]) {
+ j--
+ }
+
+ if j > 0 {
+ if err := os.MkdirAll(realname[0:j-1], 0755); err != nil {
+ log.Warn(err)
+ return nil, err
+ }
+ }
+
+ file, err := os.OpenFile(realname, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
+ if err != nil {
+ log.Warn(err)
+ }
+ return file, err
+}
+
+func newMrtWatcher(dumpType int32, filename string, interval uint64) (*mrtWatcher, error) {
+ file, err := mrtFileOpen(filename, interval)
+ if err != nil {
+ return nil, err
+ }
+ w := mrtWatcher{
+ filename: filename,
+ file: file,
+ ch: make(chan watcherEvent, 1<<16),
+ interval: interval,
+ }
+ w.t.Go(w.loop)
+ return &w, nil
+}
diff --git a/server/server.go b/server/server.go
index 7bd29048..4d5ebf43 100644
--- a/server/server.go
+++ b/server/server.go
@@ -42,50 +42,6 @@ type SenderMsg struct {
msg *FsmOutgoingMsg
}
-type broadcastMsg interface {
- send()
-}
-
-type broadcastGrpcMsg struct {
- req *GrpcRequest
- result *GrpcResponse
- done bool
-}
-
-func (m *broadcastGrpcMsg) send() {
- m.req.ResponseCh <- m.result
- if m.done == true {
- close(m.req.ResponseCh)
- }
-}
-
-type broadcastBGPMsg struct {
- message *bgp.BGPMessage
- peerAS uint32
- localAS uint32
- peerAddress net.IP
- localAddress net.IP
- fourBytesAs bool
- ch chan *broadcastBGPMsg
-}
-
-func (m *broadcastBGPMsg) send() {
- m.ch <- m
-}
-
-type Watchers map[watcherType]watcher
-
-func (ws Watchers) watching(typ watcherEventType) bool {
- for _, w := range ws {
- for _, ev := range w.watchingEventTypes() {
- if ev == typ {
- return true
- }
- }
- }
- return false
-}
-
type TCPListener struct {
l *net.TCPListener
ch chan struct{}
@@ -146,16 +102,14 @@ type BgpServer struct {
acceptCh chan *net.TCPConn
collector *Collector
- GrpcReqCh chan *GrpcRequest
- policy *table.RoutingPolicy
- broadcastReqs []*GrpcRequest
- broadcastMsgs []broadcastMsg
- listeners []*TCPListener
- neighborMap map[string]*Peer
- globalRib *table.TableManager
- roaManager *roaManager
- shutdown bool
- watchers Watchers
+ GrpcReqCh chan *GrpcRequest
+ policy *table.RoutingPolicy
+ listeners []*TCPListener
+ neighborMap map[string]*Peer
+ globalRib *table.TableManager
+ roaManager *roaManager
+ shutdown bool
+ watchers *watcherManager
}
func NewBgpServer() *BgpServer {
@@ -163,24 +117,12 @@ func NewBgpServer() *BgpServer {
return &BgpServer{
GrpcReqCh: make(chan *GrpcRequest, 1),
neighborMap: make(map[string]*Peer),
- watchers: Watchers(make(map[watcherType]watcher)),
policy: table.NewRoutingPolicy(),
roaManager: roaManager,
+ watchers: newWatcherManager(),
}
}
-func (server *BgpServer) notify2watchers(typ watcherEventType, ev watcherEvent) error {
- for _, watcher := range server.watchers {
- if ch := watcher.notify(typ); ch != nil {
- server.broadcastMsgs = append(server.broadcastMsgs, &broadcastWatcherMsg{
- ch: ch,
- event: ev,
- })
- }
- }
- return nil
-}
-
func (server *BgpServer) Listeners(addr string) []*net.TCPListener {
list := make([]*net.TCPListener, 0, len(server.listeners))
rhs := net.ParseIP(addr).To4() != nil
@@ -196,7 +138,7 @@ func (server *BgpServer) Listeners(addr string) []*net.TCPListener {
func (server *BgpServer) Serve() {
w, _ := newGrpcWatcher()
- server.watchers[WATCHER_GRPC_MONITOR] = w
+ server.watchers.addWatcher(WATCHER_GRPC_MONITOR, w)
senderCh := make(chan *SenderMsg, 1<<16)
go func(ch chan *SenderMsg) {
@@ -213,14 +155,6 @@ func (server *BgpServer) Serve() {
}(senderCh)
- broadcastCh := make(chan broadcastMsg, 8)
- go func(ch chan broadcastMsg) {
- for {
- m := <-ch
- m.send()
- }
- }(broadcastCh)
-
server.listeners = make([]*TCPListener, 0, 2)
server.fsmincomingCh = channels.NewInfiniteChannel()
server.fsmStateCh = make(chan *FsmMsg, 4096)
@@ -249,12 +183,6 @@ func (server *BgpServer) Serve() {
sCh = senderCh
firstMsg = senderMsgs[0]
}
- var firstBroadcastMsg broadcastMsg
- var bCh chan broadcastMsg
- if len(server.broadcastMsgs) > 0 {
- bCh = broadcastCh
- firstBroadcastMsg = server.broadcastMsgs[0]
- }
passConn := func(conn *net.TCPConn) {
host, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
@@ -336,8 +264,6 @@ func (server *BgpServer) Serve() {
handleFsmMsg(e)
case sCh <- firstMsg:
senderMsgs = senderMsgs[1:]
- case bCh <- firstBroadcastMsg:
- server.broadcastMsgs = server.broadcastMsgs[1:]
case grpcReq := <-server.GrpcReqCh:
m := server.handleGrpc(grpcReq)
if len(m) > 0 {
@@ -478,7 +404,7 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
best, _ := server.globalRib.DeletePathsByPeer(ids, peer.fsm.peerInfo, rf)
if !peer.isRouteServerClient() {
- server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]})
+ server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]})
}
for _, targetPeer := range server.neighborMap {
@@ -494,30 +420,6 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil
}
func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
- result := &GrpcResponse{
- Data: peer.ToApiStruct(),
- }
- remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs))
- for _, req := range server.broadcastReqs {
- select {
- case <-req.EndCh:
- continue
- default:
- }
- ignore := req.RequestType != REQ_MONITOR_NEIGHBOR_PEER_STATE
- ignore = ignore || (req.Name != "" && req.Name != peer.fsm.pConf.Config.NeighborAddress)
- if ignore {
- remainReqs = append(remainReqs, req)
- continue
- }
- m := &broadcastGrpcMsg{
- req: req,
- result: result,
- }
- server.broadcastMsgs = append(server.broadcastMsgs, m)
- remainReqs = append(remainReqs, req)
- }
- server.broadcastReqs = remainReqs
newState := peer.fsm.state
if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED {
if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) {
@@ -536,9 +438,10 @@ func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) {
sentOpen: sentOpen,
recvOpen: recvOpen,
state: newState,
+ adminState: peer.fsm.adminState,
timestamp: time.Now(),
}
- server.notify2watchers(WATCHER_EVENT_STATE_CHANGE, ev)
+ server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev)
}
}
}
@@ -650,7 +553,7 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([]
if len(best[table.GLOBAL_RIB_NAME]) == 0 {
return nil, alteredPathList
}
- server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]})
+ server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]})
}
for _, targetPeer := range server.neighborMap {
@@ -807,7 +710,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
postPolicy: false,
pathList: pathList,
}
- server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev)
+ server.watchers.notify(WATCHER_EVENT_UPDATE_MSG, ev)
}
if len(pathList) > 0 {
@@ -830,7 +733,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
for _, u := range table.CreateUpdateMsgFromPaths(altered) {
payload, _ := u.Serialize()
ev.payload = payload
- server.notify2watchers(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev)
+ server.watchers.notify(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev)
}
}
}
@@ -2253,16 +2156,16 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
Data: data,
}
close(grpcReq.ResponseCh)
- case REQ_MONITOR_NEIGHBOR_PEER_STATE:
- server.broadcastReqs = append(server.broadcastReqs, grpcReq)
- case REQ_MONITOR_RIB:
+ case REQ_MONITOR_RIB, REQ_MONITOR_NEIGHBOR_PEER_STATE:
if grpcReq.Name != "" {
if _, err = server.checkNeighborRequest(grpcReq); err != nil {
break
}
}
- w := server.watchers[WATCHER_GRPC_MONITOR]
- go w.(*grpcWatcher).addRequest(grpcReq)
+ w, y := server.watchers.watcher(WATCHER_GRPC_MONITOR)
+ if y {
+ go w.(*grpcWatcher).addRequest(grpcReq)
+ }
case REQ_ENABLE_MRT:
server.handleEnableMrtRequest(grpcReq)
case REQ_DISABLE_MRT:
@@ -2308,7 +2211,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
}
z, err := newZebraWatcher(server.GrpcReqCh, c.Url, protos)
if err == nil {
- server.watchers[WATCHER_ZEBRA] = z
+ server.watchers.addWatcher(WATCHER_ZEBRA, z)
}
grpcReq.ResponseCh <- &GrpcResponse{
ResponseErr: err,
@@ -2318,8 +2221,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
c := grpcReq.Data.(*config.CollectorConfig)
collector, err := NewCollector(server.GrpcReqCh, c.Url, c.DbName, c.TableDumpInterval)
if err == nil {
- server.collector = collector
- server.watchers[WATCHER_COLLECTOR] = collector
+ server.watchers.addWatcher(WATCHER_COLLECTOR, collector)
}
grpcReq.ResponseCh <- &GrpcResponse{
ResponseErr: err,
@@ -2333,7 +2235,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
grpcReq.ResponseCh <- &GrpcResponse{}
close(grpcReq.ResponseCh)
- server.notify2watchers(WATCHER_EVENT_ADJ_IN, &watcherEventAdjInMsg{pathList: pathList})
+ server.watchers.notify(WATCHER_EVENT_ADJ_IN, &watcherEventAdjInMsg{pathList: pathList})
default:
err = fmt.Errorf("Unknown request type: %v", grpcReq.RequestType)
goto ERROR
@@ -3121,7 +3023,7 @@ func grpcDone(grpcReq *GrpcRequest, e error) {
func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) {
arg := grpcReq.Data.(*api.EnableMrtRequest)
- if _, y := server.watchers[WATCHER_MRT]; y {
+ if _, y := server.watchers.watcher(WATCHER_MRT); y {
grpcDone(grpcReq, fmt.Errorf("already enabled"))
return
}
@@ -3131,7 +3033,7 @@ func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) {
}
w, err := newMrtWatcher(arg.DumpType, arg.Filename, arg.Interval)
if err == nil {
- server.watchers[WATCHER_MRT] = w
+ server.watchers.addWatcher(WATCHER_MRT, w)
}
grpcReq.ResponseCh <- &GrpcResponse{
ResponseErr: err,
@@ -3141,14 +3043,12 @@ func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) {
}
func (server *BgpServer) handleDisableMrtRequest(grpcReq *GrpcRequest) {
- w, y := server.watchers[WATCHER_MRT]
+ _, y := server.watchers.watcher(WATCHER_MRT)
if !y {
grpcDone(grpcReq, fmt.Errorf("not enabled yet"))
return
}
-
- delete(server.watchers, WATCHER_MRT)
- w.stop()
+ server.watchers.delWatcher(WATCHER_MRT)
grpcReq.ResponseCh <- &GrpcResponse{
Data: &api.DisableMrtResponse{},
}
@@ -3168,10 +3068,10 @@ func (server *BgpServer) handleAddBmp(grpcReq *GrpcRequest) {
c = arg
}
- w, y := server.watchers[WATCHER_BMP]
+ w, y := server.watchers.watcher(WATCHER_BMP)
if !y {
w, _ = newBmpWatcher(server.GrpcReqCh)
- server.watchers[WATCHER_BMP] = w
+ server.watchers.addWatcher(WATCHER_BMP, w)
}
err := w.(*bmpWatcher).addServer(*c)
@@ -3194,7 +3094,7 @@ func (server *BgpServer) handleDeleteBmp(grpcReq *GrpcRequest) {
c = arg
}
- if w, y := server.watchers[WATCHER_BMP]; y {
+ if w, y := server.watchers.watcher(WATCHER_BMP); y {
err := w.(*bmpWatcher).deleteServer(*c)
grpcReq.ResponseCh <- &GrpcResponse{
ResponseErr: err,
diff --git a/server/watcher.go b/server/watcher.go
index 8c92fab3..4b4b700c 100644
--- a/server/watcher.go
+++ b/server/watcher.go
@@ -16,26 +16,18 @@
package server
import (
- "bytes"
+ "fmt"
+ "net"
+ "sync"
+ "time"
+
log "github.com/Sirupsen/logrus"
+ "github.com/eapache/channels"
"github.com/osrg/gobgp/packet/bgp"
- "github.com/osrg/gobgp/packet/mrt"
"github.com/osrg/gobgp/table"
"gopkg.in/tomb.v2"
- "net"
- "os"
- "time"
)
-type broadcastWatcherMsg struct {
- ch chan watcherEvent
- event watcherEvent
-}
-
-func (m *broadcastWatcherMsg) send() {
- m.ch <- m.event
-}
-
type watcherType uint8
const (
@@ -86,6 +78,7 @@ type watcherEventStateChangedMsg struct {
sentOpen *bgp.BGPMessage
recvOpen *bgp.BGPMessage
state bgp.FSMState
+ adminState AdminState
timestamp time.Time
}
@@ -104,172 +97,95 @@ type watcher interface {
watchingEventTypes() []watcherEventType
}
-type mrtWatcher struct {
- t tomb.Tomb
- filename string
- file *os.File
- ch chan watcherEvent
- interval uint64
+type watcherMsg struct {
+ typ watcherEventType
+ ev watcherEvent
}
-func (w *mrtWatcher) notify(t watcherEventType) chan watcherEvent {
- if t == WATCHER_EVENT_UPDATE_MSG {
- return w.ch
- }
- return nil
+type watcherManager struct {
+ t tomb.Tomb
+ mu sync.RWMutex
+ m map[watcherType]watcher
+ ch *channels.InfiniteChannel
}
-func (w *mrtWatcher) stop() {
- w.t.Kill(nil)
+func (m *watcherManager) watching(typ watcherEventType) bool {
+ for _, w := range m.m {
+ for _, ev := range w.watchingEventTypes() {
+ if ev == typ {
+ return true
+ }
+ }
+ }
+ return false
}
-func (w *mrtWatcher) restart(filename string) error {
- return nil
+// this will be called from server's main goroutine.
+// shouldn't block.
+func (m *watcherManager) notify(typ watcherEventType, ev watcherEvent) {
+ m.ch.In() <- &watcherMsg{typ, ev}
}
-func (w *mrtWatcher) loop() error {
- c := func() *time.Ticker {
- if w.interval == 0 {
- return &time.Ticker{}
- }
- return time.NewTicker(time.Second * time.Duration(w.interval))
- }()
-
- defer func() {
- if w.file != nil {
- w.file.Close()
- }
- if w.interval != 0 {
- c.Stop()
- }
- }()
-
+func (m *watcherManager) loop() error {
for {
- serialize := func(ev watcherEvent) ([]byte, error) {
- m := ev.(*watcherEventUpdateMsg)
- subtype := mrt.MESSAGE_AS4
- mp := mrt.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, nil)
- mp.BGPMessagePayload = m.payload
- if m.fourBytesAs == false {
- subtype = mrt.MESSAGE
- }
- bm, err := mrt.NewMRTMessage(uint32(m.timestamp.Unix()), mrt.BGP4MP, subtype, mp)
- if err != nil {
- log.WithFields(log.Fields{
- "Topic": "mrt",
- "Data": m,
- }).Warn(err)
- return nil, err
- }
- return bm.Serialize()
- }
-
- drain := func(ev watcherEvent) {
- events := make([]watcherEvent, 0, 1+len(w.ch))
- if ev != nil {
- events = append(events, ev)
- }
-
- for len(w.ch) > 0 {
- e := <-w.ch
- events = append(events, e)
- }
-
- w := func(buf []byte) {
- if _, err := w.file.Write(buf); err == nil {
- w.file.Sync()
- } else {
- log.WithFields(log.Fields{
- "Topic": "mrt",
- "Error": err,
- }).Warn(err)
- }
+ select {
+ case i, ok := <-m.ch.Out():
+ if !ok {
+ continue
}
-
- var b bytes.Buffer
- for _, e := range events {
- buf, err := serialize(e)
- if err != nil {
- log.WithFields(log.Fields{
- "Topic": "mrt",
- "Data": e,
- }).Warn(err)
- continue
- }
- b.Write(buf)
- if b.Len() > 1*1000*1000 {
- w(b.Bytes())
- b.Reset()
+ msg := i.(*watcherMsg)
+ m.mu.RLock()
+ for _, w := range m.m {
+ if ch := w.notify(msg.typ); ch != nil {
+ t := time.NewTimer(time.Second)
+ select {
+ case ch <- msg.ev:
+ case <-t.C:
+ log.WithFields(log.Fields{
+ "Topic": "Watcher",
+ }).Warnf("notification to %s timeout expired")
+ }
}
}
- if b.Len() > 0 {
- w(b.Bytes())
- }
- }
- select {
- case <-w.t.Dying():
- drain(nil)
- return nil
- case e := <-w.ch:
- drain(e)
- case <-c.C:
- w.file.Close()
- file, err := mrtFileOpen(w.filename, w.interval)
- if err == nil {
- w.file = file
- } else {
- log.Info("can't rotate mrt file", err)
- }
+ m.mu.RUnlock()
}
}
}
-func (w *mrtWatcher) watchingEventTypes() []watcherEventType {
- return []watcherEventType{WATCHER_EVENT_UPDATE_MSG}
+func (m *watcherManager) watcher(typ watcherType) (watcher, bool) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ w, y := m.m[typ]
+ return w, y
}
-func mrtFileOpen(filename string, interval uint64) (*os.File, error) {
- realname := filename
- if interval != 0 {
- realname = time.Now().Format(filename)
- }
-
- i := len(realname)
- for i > 0 && os.IsPathSeparator(realname[i-1]) {
- // skip trailing path separators
- i--
- }
- j := i
-
- for j > 0 && !os.IsPathSeparator(realname[j-1]) {
- j--
- }
-
- if j > 0 {
- if err := os.MkdirAll(realname[0:j-1], 0755); err != nil {
- log.Warn(err)
- return nil, err
- }
+func (m *watcherManager) addWatcher(typ watcherType, w watcher) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if _, y := m.m[typ]; y {
+ return fmt.Errorf("already exists %s watcher", typ)
}
+ m.m[typ] = w
+ return nil
+}
- file, err := os.OpenFile(realname, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
- if err != nil {
- log.Warn(err)
+func (m *watcherManager) delWatcher(typ watcherType) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if _, y := m.m[typ]; !y {
+ return fmt.Errorf("not found %s watcher", typ)
}
- return file, err
+ w := m.m[typ]
+ w.stop()
+ delete(m.m, typ)
+ return nil
}
-func newMrtWatcher(dumpType int32, filename string, interval uint64) (*mrtWatcher, error) {
- file, err := mrtFileOpen(filename, interval)
- if err != nil {
- return nil, err
- }
- w := mrtWatcher{
- filename: filename,
- file: file,
- ch: make(chan watcherEvent, 1<<16),
- interval: interval,
+func newWatcherManager() *watcherManager {
+ m := &watcherManager{
+ m: make(map[watcherType]watcher),
+ ch: channels.NewInfiniteChannel(),
}
- w.t.Go(w.loop)
- return &w, nil
+ m.t.Go(m.loop)
+ return m
}