diff options
-rw-r--r-- | server/monitor.go | 71 | ||||
-rw-r--r-- | server/mrt.go | 196 | ||||
-rw-r--r-- | server/server.go | 162 | ||||
-rw-r--r-- | server/watcher.go | 232 |
4 files changed, 357 insertions, 304 deletions
diff --git a/server/monitor.go b/server/monitor.go index c7236fed..e8e7df4d 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -30,7 +30,7 @@ type grpcWatcher struct { } func (w *grpcWatcher) notify(t watcherEventType) chan watcherEvent { - if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG { + if t == WATCHER_EVENT_BESTPATH_CHANGE || t == WATCHER_EVENT_UPDATE_MSG || t == WATCHER_EVENT_POST_POLICY_UPDATE_MSG || t == WATCHER_EVENT_STATE_CHANGE { return w.ch } return nil @@ -41,8 +41,8 @@ func (w *grpcWatcher) stop() { } func (w *grpcWatcher) watchingEventTypes() []watcherEventType { - types := make([]watcherEventType, 0, 3) - for _, t := range []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE} { + types := make([]watcherEventType, 0, 4) + for _, t := range []watcherEventType{WATCHER_EVENT_UPDATE_MSG, WATCHER_EVENT_POST_POLICY_UPDATE_MSG, WATCHER_EVENT_BESTPATH_CHANGE, WATCHER_EVENT_STATE_CHANGE} { if len(w.reqs[t]) > 0 { types = append(types, t) } @@ -61,19 +61,22 @@ func (w *grpcWatcher) loop() error { } return nil case req := <-w.ctlCh: - tbl := req.Data.(*api.Table) var reqType watcherEventType - switch tbl.Type { - case api.Resource_GLOBAL: - reqType = WATCHER_EVENT_BESTPATH_CHANGE - case api.Resource_ADJ_IN: - if tbl.PostPolicy { - reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG - } else { - reqType = WATCHER_EVENT_UPDATE_MSG + switch req.RequestType { + case REQ_MONITOR_RIB: + tbl := req.Data.(*api.Table) + switch tbl.Type { + case api.Resource_GLOBAL: + reqType = WATCHER_EVENT_BESTPATH_CHANGE + case api.Resource_ADJ_IN: + if tbl.PostPolicy { + reqType = WATCHER_EVENT_POST_POLICY_UPDATE_MSG + } else { + reqType = WATCHER_EVENT_UPDATE_MSG + } } - default: - continue + case REQ_MONITOR_NEIGHBOR_PEER_STATE: + reqType = WATCHER_EVENT_STATE_CHANGE } reqs := w.reqs[reqType] if reqs == nil { @@ -121,8 +124,46 @@ func (w *grpcWatcher) loop() error { } else { sendPaths(WATCHER_EVENT_UPDATE_MSG, msg.pathList) } + case *watcherEventStateChangedMsg: + peer := &api.Peer{ + Conf: &api.PeerConf{ + PeerAs: msg.peerAS, + LocalAs: msg.localAS, + NeighborAddress: msg.peerAddress.String(), + Id: msg.peerID.String(), + }, + Info: &api.PeerState{ + PeerAs: msg.peerAS, + LocalAs: msg.localAS, + NeighborAddress: msg.peerAddress.String(), + BgpState: msg.state.String(), + AdminState: msg.adminState.String(), + }, + Transport: &api.Transport{ + LocalAddress: msg.localAddress.String(), + LocalPort: uint32(msg.localPort), + RemotePort: uint32(msg.peerPort), + }, + } + reqType := WATCHER_EVENT_STATE_CHANGE + remains := make([]*GrpcRequest, 0, len(w.reqs[reqType])) + result := &GrpcResponse{ + Data: peer, + } + for _, req := range w.reqs[reqType] { + select { + case <-req.EndCh: + continue + default: + } + remains = append(remains, req) + if req.Name != "" && req.Name != peer.Conf.NeighborAddress { + continue + } + req.ResponseCh <- result + } + w.reqs[reqType] = remains } - } } } diff --git a/server/mrt.go b/server/mrt.go new file mode 100644 index 00000000..ce4ac90b --- /dev/null +++ b/server/mrt.go @@ -0,0 +1,196 @@ +// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "bytes" + "os" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/osrg/gobgp/packet/mrt" + "gopkg.in/tomb.v2" +) + +type mrtWatcher struct { + t tomb.Tomb + filename string + file *os.File + ch chan watcherEvent + interval uint64 +} + +func (w *mrtWatcher) notify(t watcherEventType) chan watcherEvent { + if t == WATCHER_EVENT_UPDATE_MSG { + return w.ch + } + return nil +} + +func (w *mrtWatcher) stop() { + w.t.Kill(nil) +} + +func (w *mrtWatcher) restart(filename string) error { + return nil +} + +func (w *mrtWatcher) loop() error { + c := func() *time.Ticker { + if w.interval == 0 { + return &time.Ticker{} + } + return time.NewTicker(time.Second * time.Duration(w.interval)) + }() + + defer func() { + if w.file != nil { + w.file.Close() + } + if w.interval != 0 { + c.Stop() + } + }() + + for { + serialize := func(ev watcherEvent) ([]byte, error) { + m := ev.(*watcherEventUpdateMsg) + subtype := mrt.MESSAGE_AS4 + mp := mrt.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, nil) + mp.BGPMessagePayload = m.payload + if m.fourBytesAs == false { + subtype = mrt.MESSAGE + } + bm, err := mrt.NewMRTMessage(uint32(m.timestamp.Unix()), mrt.BGP4MP, subtype, mp) + if err != nil { + log.WithFields(log.Fields{ + "Topic": "mrt", + "Data": m, + }).Warn(err) + return nil, err + } + return bm.Serialize() + } + + drain := func(ev watcherEvent) { + events := make([]watcherEvent, 0, 1+len(w.ch)) + if ev != nil { + events = append(events, ev) + } + + for len(w.ch) > 0 { + e := <-w.ch + events = append(events, e) + } + + w := func(buf []byte) { + if _, err := w.file.Write(buf); err == nil { + w.file.Sync() + } else { + log.WithFields(log.Fields{ + "Topic": "mrt", + "Error": err, + }).Warn(err) + } + } + + var b bytes.Buffer + for _, e := range events { + buf, err := serialize(e) + if err != nil { + log.WithFields(log.Fields{ + "Topic": "mrt", + "Data": e, + }).Warn(err) + continue + } + b.Write(buf) + if b.Len() > 1*1000*1000 { + w(b.Bytes()) + b.Reset() + } + } + if b.Len() > 0 { + w(b.Bytes()) + } + } + select { + case <-w.t.Dying(): + drain(nil) + return nil + case e := <-w.ch: + drain(e) + case <-c.C: + w.file.Close() + file, err := mrtFileOpen(w.filename, w.interval) + if err == nil { + w.file = file + } else { + log.Info("can't rotate mrt file", err) + } + } + } +} + +func (w *mrtWatcher) watchingEventTypes() []watcherEventType { + return []watcherEventType{WATCHER_EVENT_UPDATE_MSG} +} + +func mrtFileOpen(filename string, interval uint64) (*os.File, error) { + realname := filename + if interval != 0 { + realname = time.Now().Format(filename) + } + + i := len(realname) + for i > 0 && os.IsPathSeparator(realname[i-1]) { + // skip trailing path separators + i-- + } + j := i + + for j > 0 && !os.IsPathSeparator(realname[j-1]) { + j-- + } + + if j > 0 { + if err := os.MkdirAll(realname[0:j-1], 0755); err != nil { + log.Warn(err) + return nil, err + } + } + + file, err := os.OpenFile(realname, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) + if err != nil { + log.Warn(err) + } + return file, err +} + +func newMrtWatcher(dumpType int32, filename string, interval uint64) (*mrtWatcher, error) { + file, err := mrtFileOpen(filename, interval) + if err != nil { + return nil, err + } + w := mrtWatcher{ + filename: filename, + file: file, + ch: make(chan watcherEvent, 1<<16), + interval: interval, + } + w.t.Go(w.loop) + return &w, nil +} diff --git a/server/server.go b/server/server.go index 7bd29048..4d5ebf43 100644 --- a/server/server.go +++ b/server/server.go @@ -42,50 +42,6 @@ type SenderMsg struct { msg *FsmOutgoingMsg } -type broadcastMsg interface { - send() -} - -type broadcastGrpcMsg struct { - req *GrpcRequest - result *GrpcResponse - done bool -} - -func (m *broadcastGrpcMsg) send() { - m.req.ResponseCh <- m.result - if m.done == true { - close(m.req.ResponseCh) - } -} - -type broadcastBGPMsg struct { - message *bgp.BGPMessage - peerAS uint32 - localAS uint32 - peerAddress net.IP - localAddress net.IP - fourBytesAs bool - ch chan *broadcastBGPMsg -} - -func (m *broadcastBGPMsg) send() { - m.ch <- m -} - -type Watchers map[watcherType]watcher - -func (ws Watchers) watching(typ watcherEventType) bool { - for _, w := range ws { - for _, ev := range w.watchingEventTypes() { - if ev == typ { - return true - } - } - } - return false -} - type TCPListener struct { l *net.TCPListener ch chan struct{} @@ -146,16 +102,14 @@ type BgpServer struct { acceptCh chan *net.TCPConn collector *Collector - GrpcReqCh chan *GrpcRequest - policy *table.RoutingPolicy - broadcastReqs []*GrpcRequest - broadcastMsgs []broadcastMsg - listeners []*TCPListener - neighborMap map[string]*Peer - globalRib *table.TableManager - roaManager *roaManager - shutdown bool - watchers Watchers + GrpcReqCh chan *GrpcRequest + policy *table.RoutingPolicy + listeners []*TCPListener + neighborMap map[string]*Peer + globalRib *table.TableManager + roaManager *roaManager + shutdown bool + watchers *watcherManager } func NewBgpServer() *BgpServer { @@ -163,24 +117,12 @@ func NewBgpServer() *BgpServer { return &BgpServer{ GrpcReqCh: make(chan *GrpcRequest, 1), neighborMap: make(map[string]*Peer), - watchers: Watchers(make(map[watcherType]watcher)), policy: table.NewRoutingPolicy(), roaManager: roaManager, + watchers: newWatcherManager(), } } -func (server *BgpServer) notify2watchers(typ watcherEventType, ev watcherEvent) error { - for _, watcher := range server.watchers { - if ch := watcher.notify(typ); ch != nil { - server.broadcastMsgs = append(server.broadcastMsgs, &broadcastWatcherMsg{ - ch: ch, - event: ev, - }) - } - } - return nil -} - func (server *BgpServer) Listeners(addr string) []*net.TCPListener { list := make([]*net.TCPListener, 0, len(server.listeners)) rhs := net.ParseIP(addr).To4() != nil @@ -196,7 +138,7 @@ func (server *BgpServer) Listeners(addr string) []*net.TCPListener { func (server *BgpServer) Serve() { w, _ := newGrpcWatcher() - server.watchers[WATCHER_GRPC_MONITOR] = w + server.watchers.addWatcher(WATCHER_GRPC_MONITOR, w) senderCh := make(chan *SenderMsg, 1<<16) go func(ch chan *SenderMsg) { @@ -213,14 +155,6 @@ func (server *BgpServer) Serve() { }(senderCh) - broadcastCh := make(chan broadcastMsg, 8) - go func(ch chan broadcastMsg) { - for { - m := <-ch - m.send() - } - }(broadcastCh) - server.listeners = make([]*TCPListener, 0, 2) server.fsmincomingCh = channels.NewInfiniteChannel() server.fsmStateCh = make(chan *FsmMsg, 4096) @@ -249,12 +183,6 @@ func (server *BgpServer) Serve() { sCh = senderCh firstMsg = senderMsgs[0] } - var firstBroadcastMsg broadcastMsg - var bCh chan broadcastMsg - if len(server.broadcastMsgs) > 0 { - bCh = broadcastCh - firstBroadcastMsg = server.broadcastMsgs[0] - } passConn := func(conn *net.TCPConn) { host, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) @@ -336,8 +264,6 @@ func (server *BgpServer) Serve() { handleFsmMsg(e) case sCh <- firstMsg: senderMsgs = senderMsgs[1:] - case bCh <- firstBroadcastMsg: - server.broadcastMsgs = server.broadcastMsgs[1:] case grpcReq := <-server.GrpcReqCh: m := server.handleGrpc(grpcReq) if len(m) > 0 { @@ -478,7 +404,7 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil best, _ := server.globalRib.DeletePathsByPeer(ids, peer.fsm.peerInfo, rf) if !peer.isRouteServerClient() { - server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]}) + server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]}) } for _, targetPeer := range server.neighborMap { @@ -494,30 +420,6 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil } func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { - result := &GrpcResponse{ - Data: peer.ToApiStruct(), - } - remainReqs := make([]*GrpcRequest, 0, len(server.broadcastReqs)) - for _, req := range server.broadcastReqs { - select { - case <-req.EndCh: - continue - default: - } - ignore := req.RequestType != REQ_MONITOR_NEIGHBOR_PEER_STATE - ignore = ignore || (req.Name != "" && req.Name != peer.fsm.pConf.Config.NeighborAddress) - if ignore { - remainReqs = append(remainReqs, req) - continue - } - m := &broadcastGrpcMsg{ - req: req, - result: result, - } - server.broadcastMsgs = append(server.broadcastMsgs, m) - remainReqs = append(remainReqs, req) - } - server.broadcastReqs = remainReqs newState := peer.fsm.state if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) { @@ -536,9 +438,10 @@ func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { sentOpen: sentOpen, recvOpen: recvOpen, state: newState, + adminState: peer.fsm.adminState, timestamp: time.Now(), } - server.notify2watchers(WATCHER_EVENT_STATE_CHANGE, ev) + server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev) } } } @@ -650,7 +553,7 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) ([] if len(best[table.GLOBAL_RIB_NAME]) == 0 { return nil, alteredPathList } - server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]}) + server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME]}) } for _, targetPeer := range server.neighborMap { @@ -807,7 +710,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { postPolicy: false, pathList: pathList, } - server.notify2watchers(WATCHER_EVENT_UPDATE_MSG, ev) + server.watchers.notify(WATCHER_EVENT_UPDATE_MSG, ev) } if len(pathList) > 0 { @@ -830,7 +733,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { for _, u := range table.CreateUpdateMsgFromPaths(altered) { payload, _ := u.Serialize() ev.payload = payload - server.notify2watchers(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev) + server.watchers.notify(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev) } } } @@ -2253,16 +2156,16 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { Data: data, } close(grpcReq.ResponseCh) - case REQ_MONITOR_NEIGHBOR_PEER_STATE: - server.broadcastReqs = append(server.broadcastReqs, grpcReq) - case REQ_MONITOR_RIB: + case REQ_MONITOR_RIB, REQ_MONITOR_NEIGHBOR_PEER_STATE: if grpcReq.Name != "" { if _, err = server.checkNeighborRequest(grpcReq); err != nil { break } } - w := server.watchers[WATCHER_GRPC_MONITOR] - go w.(*grpcWatcher).addRequest(grpcReq) + w, y := server.watchers.watcher(WATCHER_GRPC_MONITOR) + if y { + go w.(*grpcWatcher).addRequest(grpcReq) + } case REQ_ENABLE_MRT: server.handleEnableMrtRequest(grpcReq) case REQ_DISABLE_MRT: @@ -2308,7 +2211,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { } z, err := newZebraWatcher(server.GrpcReqCh, c.Url, protos) if err == nil { - server.watchers[WATCHER_ZEBRA] = z + server.watchers.addWatcher(WATCHER_ZEBRA, z) } grpcReq.ResponseCh <- &GrpcResponse{ ResponseErr: err, @@ -2318,8 +2221,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { c := grpcReq.Data.(*config.CollectorConfig) collector, err := NewCollector(server.GrpcReqCh, c.Url, c.DbName, c.TableDumpInterval) if err == nil { - server.collector = collector - server.watchers[WATCHER_COLLECTOR] = collector + server.watchers.addWatcher(WATCHER_COLLECTOR, collector) } grpcReq.ResponseCh <- &GrpcResponse{ ResponseErr: err, @@ -2333,7 +2235,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { grpcReq.ResponseCh <- &GrpcResponse{} close(grpcReq.ResponseCh) - server.notify2watchers(WATCHER_EVENT_ADJ_IN, &watcherEventAdjInMsg{pathList: pathList}) + server.watchers.notify(WATCHER_EVENT_ADJ_IN, &watcherEventAdjInMsg{pathList: pathList}) default: err = fmt.Errorf("Unknown request type: %v", grpcReq.RequestType) goto ERROR @@ -3121,7 +3023,7 @@ func grpcDone(grpcReq *GrpcRequest, e error) { func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) { arg := grpcReq.Data.(*api.EnableMrtRequest) - if _, y := server.watchers[WATCHER_MRT]; y { + if _, y := server.watchers.watcher(WATCHER_MRT); y { grpcDone(grpcReq, fmt.Errorf("already enabled")) return } @@ -3131,7 +3033,7 @@ func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) { } w, err := newMrtWatcher(arg.DumpType, arg.Filename, arg.Interval) if err == nil { - server.watchers[WATCHER_MRT] = w + server.watchers.addWatcher(WATCHER_MRT, w) } grpcReq.ResponseCh <- &GrpcResponse{ ResponseErr: err, @@ -3141,14 +3043,12 @@ func (server *BgpServer) handleEnableMrtRequest(grpcReq *GrpcRequest) { } func (server *BgpServer) handleDisableMrtRequest(grpcReq *GrpcRequest) { - w, y := server.watchers[WATCHER_MRT] + _, y := server.watchers.watcher(WATCHER_MRT) if !y { grpcDone(grpcReq, fmt.Errorf("not enabled yet")) return } - - delete(server.watchers, WATCHER_MRT) - w.stop() + server.watchers.delWatcher(WATCHER_MRT) grpcReq.ResponseCh <- &GrpcResponse{ Data: &api.DisableMrtResponse{}, } @@ -3168,10 +3068,10 @@ func (server *BgpServer) handleAddBmp(grpcReq *GrpcRequest) { c = arg } - w, y := server.watchers[WATCHER_BMP] + w, y := server.watchers.watcher(WATCHER_BMP) if !y { w, _ = newBmpWatcher(server.GrpcReqCh) - server.watchers[WATCHER_BMP] = w + server.watchers.addWatcher(WATCHER_BMP, w) } err := w.(*bmpWatcher).addServer(*c) @@ -3194,7 +3094,7 @@ func (server *BgpServer) handleDeleteBmp(grpcReq *GrpcRequest) { c = arg } - if w, y := server.watchers[WATCHER_BMP]; y { + if w, y := server.watchers.watcher(WATCHER_BMP); y { err := w.(*bmpWatcher).deleteServer(*c) grpcReq.ResponseCh <- &GrpcResponse{ ResponseErr: err, diff --git a/server/watcher.go b/server/watcher.go index 8c92fab3..4b4b700c 100644 --- a/server/watcher.go +++ b/server/watcher.go @@ -16,26 +16,18 @@ package server import ( - "bytes" + "fmt" + "net" + "sync" + "time" + log "github.com/Sirupsen/logrus" + "github.com/eapache/channels" "github.com/osrg/gobgp/packet/bgp" - "github.com/osrg/gobgp/packet/mrt" "github.com/osrg/gobgp/table" "gopkg.in/tomb.v2" - "net" - "os" - "time" ) -type broadcastWatcherMsg struct { - ch chan watcherEvent - event watcherEvent -} - -func (m *broadcastWatcherMsg) send() { - m.ch <- m.event -} - type watcherType uint8 const ( @@ -86,6 +78,7 @@ type watcherEventStateChangedMsg struct { sentOpen *bgp.BGPMessage recvOpen *bgp.BGPMessage state bgp.FSMState + adminState AdminState timestamp time.Time } @@ -104,172 +97,95 @@ type watcher interface { watchingEventTypes() []watcherEventType } -type mrtWatcher struct { - t tomb.Tomb - filename string - file *os.File - ch chan watcherEvent - interval uint64 +type watcherMsg struct { + typ watcherEventType + ev watcherEvent } -func (w *mrtWatcher) notify(t watcherEventType) chan watcherEvent { - if t == WATCHER_EVENT_UPDATE_MSG { - return w.ch - } - return nil +type watcherManager struct { + t tomb.Tomb + mu sync.RWMutex + m map[watcherType]watcher + ch *channels.InfiniteChannel } -func (w *mrtWatcher) stop() { - w.t.Kill(nil) +func (m *watcherManager) watching(typ watcherEventType) bool { + for _, w := range m.m { + for _, ev := range w.watchingEventTypes() { + if ev == typ { + return true + } + } + } + return false } -func (w *mrtWatcher) restart(filename string) error { - return nil +// this will be called from server's main goroutine. +// shouldn't block. +func (m *watcherManager) notify(typ watcherEventType, ev watcherEvent) { + m.ch.In() <- &watcherMsg{typ, ev} } -func (w *mrtWatcher) loop() error { - c := func() *time.Ticker { - if w.interval == 0 { - return &time.Ticker{} - } - return time.NewTicker(time.Second * time.Duration(w.interval)) - }() - - defer func() { - if w.file != nil { - w.file.Close() - } - if w.interval != 0 { - c.Stop() - } - }() - +func (m *watcherManager) loop() error { for { - serialize := func(ev watcherEvent) ([]byte, error) { - m := ev.(*watcherEventUpdateMsg) - subtype := mrt.MESSAGE_AS4 - mp := mrt.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, nil) - mp.BGPMessagePayload = m.payload - if m.fourBytesAs == false { - subtype = mrt.MESSAGE - } - bm, err := mrt.NewMRTMessage(uint32(m.timestamp.Unix()), mrt.BGP4MP, subtype, mp) - if err != nil { - log.WithFields(log.Fields{ - "Topic": "mrt", - "Data": m, - }).Warn(err) - return nil, err - } - return bm.Serialize() - } - - drain := func(ev watcherEvent) { - events := make([]watcherEvent, 0, 1+len(w.ch)) - if ev != nil { - events = append(events, ev) - } - - for len(w.ch) > 0 { - e := <-w.ch - events = append(events, e) - } - - w := func(buf []byte) { - if _, err := w.file.Write(buf); err == nil { - w.file.Sync() - } else { - log.WithFields(log.Fields{ - "Topic": "mrt", - "Error": err, - }).Warn(err) - } + select { + case i, ok := <-m.ch.Out(): + if !ok { + continue } - - var b bytes.Buffer - for _, e := range events { - buf, err := serialize(e) - if err != nil { - log.WithFields(log.Fields{ - "Topic": "mrt", - "Data": e, - }).Warn(err) - continue - } - b.Write(buf) - if b.Len() > 1*1000*1000 { - w(b.Bytes()) - b.Reset() + msg := i.(*watcherMsg) + m.mu.RLock() + for _, w := range m.m { + if ch := w.notify(msg.typ); ch != nil { + t := time.NewTimer(time.Second) + select { + case ch <- msg.ev: + case <-t.C: + log.WithFields(log.Fields{ + "Topic": "Watcher", + }).Warnf("notification to %s timeout expired") + } } } - if b.Len() > 0 { - w(b.Bytes()) - } - } - select { - case <-w.t.Dying(): - drain(nil) - return nil - case e := <-w.ch: - drain(e) - case <-c.C: - w.file.Close() - file, err := mrtFileOpen(w.filename, w.interval) - if err == nil { - w.file = file - } else { - log.Info("can't rotate mrt file", err) - } + m.mu.RUnlock() } } } -func (w *mrtWatcher) watchingEventTypes() []watcherEventType { - return []watcherEventType{WATCHER_EVENT_UPDATE_MSG} +func (m *watcherManager) watcher(typ watcherType) (watcher, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + w, y := m.m[typ] + return w, y } -func mrtFileOpen(filename string, interval uint64) (*os.File, error) { - realname := filename - if interval != 0 { - realname = time.Now().Format(filename) - } - - i := len(realname) - for i > 0 && os.IsPathSeparator(realname[i-1]) { - // skip trailing path separators - i-- - } - j := i - - for j > 0 && !os.IsPathSeparator(realname[j-1]) { - j-- - } - - if j > 0 { - if err := os.MkdirAll(realname[0:j-1], 0755); err != nil { - log.Warn(err) - return nil, err - } +func (m *watcherManager) addWatcher(typ watcherType, w watcher) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, y := m.m[typ]; y { + return fmt.Errorf("already exists %s watcher", typ) } + m.m[typ] = w + return nil +} - file, err := os.OpenFile(realname, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) - if err != nil { - log.Warn(err) +func (m *watcherManager) delWatcher(typ watcherType) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, y := m.m[typ]; !y { + return fmt.Errorf("not found %s watcher", typ) } - return file, err + w := m.m[typ] + w.stop() + delete(m.m, typ) + return nil } -func newMrtWatcher(dumpType int32, filename string, interval uint64) (*mrtWatcher, error) { - file, err := mrtFileOpen(filename, interval) - if err != nil { - return nil, err - } - w := mrtWatcher{ - filename: filename, - file: file, - ch: make(chan watcherEvent, 1<<16), - interval: interval, +func newWatcherManager() *watcherManager { + m := &watcherManager{ + m: make(map[watcherType]watcher), + ch: channels.NewInfiniteChannel(), } - w.t.Go(w.loop) - return &w, nil + m.t.Go(m.loop) + return m } |