summaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-11-11 13:56:15 -0800
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-11-12 15:01:12 +0900
commit180d68f88f457096ec92bcb6eabd915a64bb8858 (patch)
tree86116964678ddd40637a29206ab0c1254284e21c /server
parentd5d99dad68ce2b935e1171e64aca8d4340ffae10 (diff)
server: introduce watcher framework
rewrite mrt to use it. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r--server/dumper.go71
-rw-r--r--server/server.go27
-rw-r--r--server/watcher.go145
3 files changed, 165 insertions, 78 deletions
diff --git a/server/dumper.go b/server/dumper.go
deleted file mode 100644
index 46281ca6..00000000
--- a/server/dumper.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-// implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package server
-
-import (
- log "github.com/Sirupsen/logrus"
- "github.com/osrg/gobgp/packet"
- "os"
- "time"
-)
-
-type dumper struct {
- ch chan *broadcastBGPMsg
-}
-
-func (d *dumper) sendCh() chan *broadcastBGPMsg {
- return d.ch
-}
-
-func newDumper(filename string) (*dumper, error) {
- f, err := os.Create(filename)
- if err != nil {
- return nil, err
- }
-
- ch := make(chan *broadcastBGPMsg, 16)
-
- go func() {
- for {
- m := <-ch
- subtype := bgp.MESSAGE_AS4
- mp := bgp.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, m.message)
- if m.fourBytesAs == false {
- subtype = bgp.MESSAGE
- }
- bm, err := bgp.NewMRTMessage(uint32(time.Now().Unix()), bgp.BGP4MP, subtype, mp)
- if err != nil {
- log.WithFields(log.Fields{
- "Topic": "mrt",
- "Data": m,
- }).Warn(err)
- continue
- }
- buf, err := bm.Serialize()
- if err != nil {
- log.WithFields(log.Fields{
- "Topic": "mrt",
- "Data": m,
- }).Warn(err)
- } else {
- f.Write(buf)
- }
- }
- }()
- return &dumper{
- ch: ch,
- }, nil
-}
diff --git a/server/server.go b/server/server.go
index 1e54da39..8a5cabdf 100644
--- a/server/server.go
+++ b/server/server.go
@@ -87,7 +87,6 @@ type BgpServer struct {
fsmincomingCh chan *FsmMsg
rpkiConfigCh chan config.RpkiServers
bmpConfigCh chan config.BmpServers
- dumper *dumper
GrpcReqCh chan *GrpcRequest
listenPort int
@@ -103,6 +102,7 @@ type BgpServer struct {
bmpClient *bmpClient
bmpConnCh chan *bmpConn
shutdown bool
+ watchers map[watcherType]watcher
}
func NewBgpServer(port int) *BgpServer {
@@ -118,6 +118,7 @@ func NewBgpServer(port int) *BgpServer {
b.policyUpdateCh = make(chan config.RoutingPolicy)
b.neighborMap = make(map[string]*Peer)
b.listenPort = port
+ b.watchers = make(map[watcherType]watcher)
return &b
}
@@ -163,11 +164,11 @@ func (server *BgpServer) Serve() {
server.roaClient, _ = newROAClient(g.GlobalConfig.As, config.RpkiServers{})
if g.Mrt.FileName != "" {
- d, err := newDumper(g.Mrt.FileName)
+ w, err := newMrtWatcher(g.Mrt.FileName)
if err != nil {
log.Warn(err)
} else {
- server.dumper = d
+ server.watchers[WATCHER_MRT] = w
}
}
@@ -799,20 +800,32 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg, incoming chan *
}
}
if m.Header.Type == bgp.BGP_MSG_UPDATE {
- if server.dumper != nil {
+ listener := make(map[watcher]chan watcherEvent)
+ for _, watcher := range server.watchers {
+ if ch := watcher.notify(WATCHER_EVENT_UPDATE_MSG); ch != nil {
+ listener[watcher] = ch
+ }
+ }
+ if len(listener) > 0 {
_, y := peer.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
- bm := &broadcastBGPMsg{
+ ev := &watcherEventUpdateMsg{
message: m,
peerAS: peer.fsm.peerInfo.AS,
localAS: peer.fsm.peerInfo.LocalAS,
peerAddress: peer.fsm.peerInfo.Address,
localAddress: net.ParseIP(l),
fourBytesAs: y,
- ch: server.dumper.sendCh(),
}
- server.broadcastMsgs = append(server.broadcastMsgs, bm)
+ for _, ch := range listener {
+ bm := &broadcastWatcherMsg{
+ ch: ch,
+ event: ev,
+ }
+ server.broadcastMsgs = append(server.broadcastMsgs, bm)
+ }
}
+
if ch := server.bmpClient.send(); ch != nil {
bm := &broadcastBMPMsg{
ch: ch,
diff --git a/server/watcher.go b/server/watcher.go
new file mode 100644
index 00000000..a4c3a192
--- /dev/null
+++ b/server/watcher.go
@@ -0,0 +1,145 @@
+// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ log "github.com/Sirupsen/logrus"
+ "github.com/osrg/gobgp/packet"
+ "gopkg.in/tomb.v2"
+ "net"
+ "os"
+ "time"
+)
+
+type broadcastWatcherMsg struct {
+ ch chan watcherEvent
+ event watcherEvent
+}
+
+func (m *broadcastWatcherMsg) send() {
+ m.ch <- m.event
+}
+
+type watcherType uint8
+
+const (
+ _ watcherType = iota
+ WATCHER_MRT // UPDATE MSG
+ WATCHER_BMP
+ WATCHER_ZEBRA
+ WATCHER_GRPC_BESTPATH
+)
+
+type watcherEventType uint8
+
+const (
+ _ watcherEventType = iota
+ WATCHER_EVENT_UPDATE_MSG
+ WATCHER_EVENT_STATE_CHANGE
+ WATCHER_EVENT_BESTPATH_CHANGE
+)
+
+type watcherEvent interface {
+}
+
+type watcherEventUpdateMsg struct {
+ message *bgp.BGPMessage
+ peerAS uint32
+ localAS uint32
+ peerAddress net.IP
+ localAddress net.IP
+ fourBytesAs bool
+}
+
+type watcher interface {
+ notify(watcherEventType) chan watcherEvent
+ stop()
+}
+
+type mrtWatcher struct {
+ t tomb.Tomb
+ filename string
+ file *os.File
+ ch chan watcherEvent
+}
+
+func (w *mrtWatcher) notify(t watcherEventType) chan watcherEvent {
+ if t == WATCHER_EVENT_UPDATE_MSG {
+ return w.ch
+ }
+ return nil
+}
+
+func (w *mrtWatcher) stop() {
+ w.t.Kill(nil)
+}
+
+func (w *mrtWatcher) loop() error {
+ for {
+ write := func(ev watcherEvent) {
+ m := ev.(*watcherEventUpdateMsg)
+ subtype := bgp.MESSAGE_AS4
+ mp := bgp.NewBGP4MPMessage(m.peerAS, m.localAS, 0, m.peerAddress.String(), m.localAddress.String(), m.fourBytesAs, m.message)
+ if m.fourBytesAs == false {
+ subtype = bgp.MESSAGE
+ }
+ bm, err := bgp.NewMRTMessage(uint32(time.Now().Unix()), bgp.BGP4MP, subtype, mp)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Data": m,
+ }).Warn(err)
+ return
+ }
+ buf, err := bm.Serialize()
+ if err == nil {
+ _, err = w.file.Write(buf)
+ }
+
+ if err != nil {
+ log.WithFields(log.Fields{
+ "Topic": "mrt",
+ "Data": m,
+ }).Warn(err)
+ }
+ }
+
+ select {
+ case <-w.t.Dying():
+ for len(w.ch) > 0 {
+ m := <-w.ch
+ write(m)
+ }
+ return nil
+ case m := <-w.ch:
+ write(m)
+ }
+ }
+}
+
+func newMrtWatcher(filename string) (*mrtWatcher, error) {
+ file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
+ if err != nil {
+ log.Fatal(err)
+ }
+ w := mrtWatcher{
+ filename: filename,
+ file: file,
+ ch: make(chan watcherEvent),
+ }
+ w.t.Go(w.loop)
+ return &w, nil
+}