diff options
-rw-r--r-- | server/dumper.go | 71 | ||||
-rw-r--r-- | server/server.go | 27 | ||||
-rw-r--r-- | server/watcher.go | 145 |
3 files changed, 165 insertions, 78 deletions
diff --git a/server/dumper.go b/server/dumper.go deleted file mode 100644 index 46281ca6..00000000 --- a/server/dumper.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - log "github.com/Sirupsen/logrus" - "github.com/osrg/gobgp/packet" - "os" - "time" -) - -type dumper struct { - ch chan *broadcastBGPMsg -} - -func (d *dumper) sendCh() chan *broadcastBGPMsg { - return d.ch -} - -func newDumper(filename string) (*dumper, error) { - f, err := os.Create(filename) - if err != nil { - return nil, err - } - - ch := make(chan *broadcastBGPMsg, 16) - - go func() { - for { - m := <-ch - subtype := bgp.MESSAGE_AS4 - mp := bgp.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, m.message) - if m.fourBytesAs == false { - subtype = bgp.MESSAGE - } - bm, err := bgp.NewMRTMessage(uint32(time.Now().Unix()), bgp.BGP4MP, subtype, mp) - if err != nil { - log.WithFields(log.Fields{ - "Topic": "mrt", - "Data": m, - }).Warn(err) - continue - } - buf, err := bm.Serialize() - if err != nil { - log.WithFields(log.Fields{ - "Topic": "mrt", - "Data": m, - }).Warn(err) - } else { - f.Write(buf) - } - } - }() - return &dumper{ - ch: ch, - }, nil -} diff --git a/server/server.go b/server/server.go index 1e54da39..8a5cabdf 100644 --- a/server/server.go +++ b/server/server.go @@ -87,7 +87,6 @@ type BgpServer struct { fsmincomingCh chan *FsmMsg rpkiConfigCh chan config.RpkiServers bmpConfigCh chan config.BmpServers - dumper *dumper GrpcReqCh chan *GrpcRequest listenPort int @@ -103,6 +102,7 @@ type BgpServer struct { bmpClient *bmpClient bmpConnCh chan *bmpConn shutdown bool + watchers map[watcherType]watcher } func NewBgpServer(port int) *BgpServer { @@ -118,6 +118,7 @@ func NewBgpServer(port int) *BgpServer { b.policyUpdateCh = make(chan config.RoutingPolicy) b.neighborMap = make(map[string]*Peer) b.listenPort = port + b.watchers = make(map[watcherType]watcher) return &b } @@ -163,11 +164,11 @@ func (server *BgpServer) Serve() { server.roaClient, _ = newROAClient(g.GlobalConfig.As, config.RpkiServers{}) if g.Mrt.FileName != "" { - d, err := newDumper(g.Mrt.FileName) + w, err := newMrtWatcher(g.Mrt.FileName) if err != nil { log.Warn(err) } else { - server.dumper = d + server.watchers[WATCHER_MRT] = w } } @@ -799,20 +800,32 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg, incoming chan * } } if m.Header.Type == bgp.BGP_MSG_UPDATE { - if server.dumper != nil { + listener := make(map[watcher]chan watcherEvent) + for _, watcher := range server.watchers { + if ch := watcher.notify(WATCHER_EVENT_UPDATE_MSG); ch != nil { + listener[watcher] = ch + } + } + if len(listener) > 0 { _, y := peer.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER] l, _ := peer.fsm.LocalHostPort() - bm := &broadcastBGPMsg{ + ev := &watcherEventUpdateMsg{ message: m, peerAS: peer.fsm.peerInfo.AS, localAS: peer.fsm.peerInfo.LocalAS, peerAddress: peer.fsm.peerInfo.Address, localAddress: net.ParseIP(l), fourBytesAs: y, - ch: server.dumper.sendCh(), } - server.broadcastMsgs = append(server.broadcastMsgs, bm) + for _, ch := range listener { + bm := &broadcastWatcherMsg{ + ch: ch, + event: ev, + } + server.broadcastMsgs = append(server.broadcastMsgs, bm) + } } + if ch := server.bmpClient.send(); ch != nil { bm := &broadcastBMPMsg{ ch: ch, diff --git a/server/watcher.go b/server/watcher.go new file mode 100644 index 00000000..a4c3a192 --- /dev/null +++ b/server/watcher.go @@ -0,0 +1,145 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + log "github.com/Sirupsen/logrus" + "github.com/osrg/gobgp/packet" + "gopkg.in/tomb.v2" + "net" + "os" + "time" +) + +type broadcastWatcherMsg struct { + ch chan watcherEvent + event watcherEvent +} + +func (m *broadcastWatcherMsg) send() { + m.ch <- m.event +} + +type watcherType uint8 + +const ( + _ watcherType = iota + WATCHER_MRT // UPDATE MSG + WATCHER_BMP + WATCHER_ZEBRA + WATCHER_GRPC_BESTPATH +) + +type watcherEventType uint8 + +const ( + _ watcherEventType = iota + WATCHER_EVENT_UPDATE_MSG + WATCHER_EVENT_STATE_CHANGE + WATCHER_EVENT_BESTPATH_CHANGE +) + +type watcherEvent interface { +} + +type watcherEventUpdateMsg struct { + message *bgp.BGPMessage + peerAS uint32 + localAS uint32 + peerAddress net.IP + localAddress net.IP + fourBytesAs bool +} + +type watcher interface { + notify(watcherEventType) chan watcherEvent + stop() +} + +type mrtWatcher struct { + t tomb.Tomb + filename string + file *os.File + ch chan watcherEvent +} + +func (w *mrtWatcher) notify(t watcherEventType) chan watcherEvent { + if t == WATCHER_EVENT_UPDATE_MSG { + return w.ch + } + return nil +} + +func (w *mrtWatcher) stop() { + w.t.Kill(nil) +} + +func (w *mrtWatcher) loop() error { + for { + write := func(ev watcherEvent) { + m := ev.(*watcherEventUpdateMsg) + subtype := bgp.MESSAGE_AS4 + mp := bgp.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, m.message) + if m.fourBytesAs == false { + subtype = bgp.MESSAGE + } + bm, err := bgp.NewMRTMessage(uint32(time.Now().Unix()), bgp.BGP4MP, subtype, mp) + if err != nil { + log.WithFields(log.Fields{ + "Topic": "mrt", + "Data": m, + }).Warn(err) + return + } + buf, err := bm.Serialize() + if err == nil { + _, err = w.file.Write(buf) + } + + if err != nil { + log.WithFields(log.Fields{ + "Topic": "mrt", + "Data": m, + }).Warn(err) + } + } + + select { + case <-w.t.Dying(): + for len(w.ch) > 0 { + m := <-w.ch + write(m) + } + return nil + case m := <-w.ch: + write(m) + } + } +} + +func newMrtWatcher(filename string) (*mrtWatcher, error) { + file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) + if err != nil { + log.Fatal(err) + } + w := mrtWatcher{ + filename: filename, + file: file, + ch: make(chan watcherEvent), + } + w.t.Go(w.loop) + return &w, nil +} |