diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-22 23:09:51 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-22 23:09:51 +0900 |
commit | 21d6d528523c7c1f0d7dfcfeffbef5928dc40894 (patch) | |
tree | c0c7767a2227fbcb0fba4f9a1fa754365b42dbc8 | |
parent | 20696d7e1cca8d49b2b59f73aad4be009d5293b0 (diff) |
delete unused old watcher code
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | server/grpc_server.go | 1 | ||||
-rw-r--r-- | server/server.go | 63 | ||||
-rw-r--r-- | server/watcher.go | 191 |
3 files changed, 43 insertions, 212 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go index 86d1267b..f8c411ad 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -96,7 +96,6 @@ const ( REQ_DEFERRAL_TIMER_EXPIRED REQ_RELOAD_POLICY REQ_INITIALIZE_ZEBRA - REQ_WATCHER_ADJ_RIB_IN // FIXME ) type Server struct { diff --git a/server/server.go b/server/server.go index 7e3943fe..74aa73b8 100644 --- a/server/server.go +++ b/server/server.go @@ -103,7 +103,6 @@ type BgpServer struct { globalRib *table.TableManager roaManager *roaManager shutdown bool - watchers *watcherManager watcherMap map[watchType][]*Watcher zclient *zebraClient bmpManager *bmpClientManager @@ -117,7 +116,6 @@ func NewBgpServer() *BgpServer { neighborMap: make(map[string]*Peer), policy: table.NewRoutingPolicy(), roaManager: roaManager, - watchers: newWatcherManager(), mgmtCh: make(chan func(), 1), watcherMap: make(map[watchType][]*Watcher), } @@ -369,7 +367,6 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil best, _, multipath := server.globalRib.DeletePathsByPeer(ids, peer.fsm.peerInfo, rf) if !peer.isRouteServerClient() { - server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multipath}) for _, w := range server.watcherMap[WATCH_TYPE_BESTPATH] { w.notify(&watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multipath}) } @@ -411,9 +408,6 @@ func (server *BgpServer) broadcastPeerState(peer *Peer, oldState bgp.FSMState) { newState := peer.fsm.state if oldState == bgp.BGP_FSM_ESTABLISHED || newState == bgp.BGP_FSM_ESTABLISHED { ev := createWatcherEventStateChange(peer) - if server.watchers.watching(WATCHER_EVENT_STATE_CHANGE) { - server.watchers.notify(WATCHER_EVENT_STATE_CHANGE, ev) - } for _, w := range server.watcherMap[WATCH_TYPE_PEER_STATE] { w.notify(ev) } @@ -527,7 +521,6 @@ func (server *BgpServer) propagateUpdate(peer *Peer, pathList []*table.Path) []* if len(best[table.GLOBAL_RIB_NAME]) == 0 { return alteredPathList } - server.watchers.notify(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi}) for _, w := range server.watcherMap[WATCH_TYPE_BESTPATH] { w.notify(&watcherEventBestPathMsg{pathList: best[table.GLOBAL_RIB_NAME], multiPathList: multi}) } @@ -674,7 +667,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { sendFsmOutgoingMsg(peer, nil, notification, true) return } - if m.Header.Type == bgp.BGP_MSG_UPDATE && (server.watchers.watching(WATCHER_EVENT_UPDATE_MSG) || len(server.watcherMap[WATCH_TYPE_PRE_UPDATE]) > 0) { + if m.Header.Type == bgp.BGP_MSG_UPDATE && len(server.watcherMap[WATCH_TYPE_PRE_UPDATE]) > 0 { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() ev := &watcherEventUpdateMsg{ @@ -690,7 +683,6 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { postPolicy: false, pathList: pathList, } - server.watchers.notify(WATCHER_EVENT_UPDATE_MSG, ev) for _, w := range server.watcherMap[WATCH_TYPE_PRE_UPDATE] { w.notify(ev) } @@ -699,7 +691,7 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { if len(pathList) > 0 { var altered []*table.Path altered = server.propagateUpdate(peer, pathList) - if server.watchers.watching(WATCHER_EVENT_POST_POLICY_UPDATE_MSG) || len(server.watcherMap[WATCH_TYPE_POST_UPDATE]) > 0 { + if len(server.watcherMap[WATCH_TYPE_POST_UPDATE]) > 0 { _, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() ev := &watcherEventUpdateMsg{ @@ -716,7 +708,6 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { for _, u := range table.CreateUpdateMsgFromPaths(altered) { payload, _ := u.Serialize() ev.payload = payload - server.watchers.notify(WATCHER_EVENT_POST_POLICY_UPDATE_MSG, ev) for _, w := range server.watcherMap[WATCH_TYPE_POST_UPDATE] { w.notify(ev) } @@ -1761,15 +1752,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { if len(pathList) > 0 { server.propagateUpdate(nil, pathList) } - case REQ_WATCHER_ADJ_RIB_IN: - pathList := make([]*table.Path, 0) - for _, peer := range server.neighborMap { - pathList = append(pathList, peer.adjRibIn.PathList(peer.configuredRFlist(), false)...) - } - - grpcReq.ResponseCh <- &GrpcResponse{} - close(grpcReq.ResponseCh) - server.watchers.notify(WATCHER_EVENT_ADJ_IN, &watcherEventAdjInMsg{pathList: pathList}) default: err = fmt.Errorf("Unknown request type: %v", grpcReq.RequestType) goto ERROR @@ -2883,3 +2865,44 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { } return w } + +type watcherEvent interface { +} + +type watcherEventUpdateMsg struct { + message *bgp.BGPMessage + peerAS uint32 + localAS uint32 + peerAddress net.IP + localAddress net.IP + peerID net.IP + fourBytesAs bool + timestamp time.Time + payload []byte + postPolicy bool + pathList []*table.Path +} + +type watcherEventStateChangedMsg struct { + peerAS uint32 + localAS uint32 + peerAddress net.IP + localAddress net.IP + peerPort uint16 + localPort uint16 + peerID net.IP + sentOpen *bgp.BGPMessage + recvOpen *bgp.BGPMessage + state bgp.FSMState + adminState AdminState + timestamp time.Time +} + +type watcherEventAdjInMsg struct { + pathList []*table.Path +} + +type watcherEventBestPathMsg struct { + pathList []*table.Path + multiPathList [][]*table.Path +} diff --git a/server/watcher.go b/server/watcher.go deleted file mode 100644 index 674206d9..00000000 --- a/server/watcher.go +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - "fmt" - "net" - "sync" - "time" - - log "github.com/Sirupsen/logrus" - "github.com/eapache/channels" - "github.com/osrg/gobgp/packet/bgp" - "github.com/osrg/gobgp/table" - "gopkg.in/tomb.v2" -) - -type watcherType uint8 - -const ( - _ watcherType = iota - WATCHER_MRT // UPDATE MSG - WATCHER_BMP - WATCHER_ZEBRA - WATCHER_COLLECTOR - WATCHER_GRPC_MONITOR -) - -type watcherEventType uint8 - -const ( - _ watcherEventType = iota - WATCHER_EVENT_UPDATE_MSG - WATCHER_EVENT_STATE_CHANGE - WATCHER_EVENT_BESTPATH_CHANGE - WATCHER_EVENT_POST_POLICY_UPDATE_MSG - WATCHER_EVENT_ADJ_IN -) - -type watcherEvent interface { -} - -type watcherEventUpdateMsg struct { - message *bgp.BGPMessage - peerAS uint32 - localAS uint32 - peerAddress net.IP - localAddress net.IP - peerID net.IP - fourBytesAs bool - timestamp time.Time - payload []byte - postPolicy bool - pathList []*table.Path -} - -type watcherEventStateChangedMsg struct { - peerAS uint32 - localAS uint32 - peerAddress net.IP - localAddress net.IP - peerPort uint16 - localPort uint16 - peerID net.IP - sentOpen *bgp.BGPMessage - recvOpen *bgp.BGPMessage - state bgp.FSMState - adminState AdminState - timestamp time.Time -} - -type watcherEventAdjInMsg struct { - pathList []*table.Path -} - -type watcherEventBestPathMsg struct { - pathList []*table.Path - multiPathList [][]*table.Path -} - -type watcher interface { - notify(watcherEventType) chan watcherEvent - stop() - watchingEventTypes() []watcherEventType -} - -type watcherMsg struct { - typ watcherEventType - ev watcherEvent -} - -type watcherManager struct { - t tomb.Tomb - mu sync.RWMutex - m map[watcherType]watcher - ch *channels.InfiniteChannel -} - -func (m *watcherManager) watching(typ watcherEventType) bool { - for _, w := range m.m { - for _, ev := range w.watchingEventTypes() { - if ev == typ { - return true - } - } - } - return false -} - -// this will be called from server's main goroutine. -// shouldn't block. -func (m *watcherManager) notify(typ watcherEventType, ev watcherEvent) { - m.ch.In() <- &watcherMsg{typ, ev} -} - -func (m *watcherManager) loop() error { - for { - select { - case i, ok := <-m.ch.Out(): - if !ok { - continue - } - msg := i.(*watcherMsg) - m.mu.RLock() - for _, w := range m.m { - if ch := w.notify(msg.typ); ch != nil { - t := time.NewTimer(time.Second) - select { - case ch <- msg.ev: - case <-t.C: - log.WithFields(log.Fields{ - "Topic": "Watcher", - }).Warnf("notification to %s timeout expired") - } - } - } - m.mu.RUnlock() - } - } -} - -func (m *watcherManager) watcher(typ watcherType) (watcher, bool) { - m.mu.RLock() - defer m.mu.RUnlock() - w, y := m.m[typ] - return w, y -} - -func (m *watcherManager) addWatcher(typ watcherType, w watcher) error { - m.mu.Lock() - defer m.mu.Unlock() - if _, y := m.m[typ]; y { - return fmt.Errorf("already exists %s watcher", typ) - } - m.m[typ] = w - return nil -} - -func (m *watcherManager) delWatcher(typ watcherType) error { - m.mu.Lock() - defer m.mu.Unlock() - if _, y := m.m[typ]; !y { - return fmt.Errorf("not found %s watcher", typ) - } - w := m.m[typ] - w.stop() - delete(m.m, typ) - return nil -} - -func newWatcherManager() *watcherManager { - m := &watcherManager{ - m: make(map[watcherType]watcher), - ch: channels.NewInfiniteChannel(), - } - m.t.Go(m.loop) - return m -} |