diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-22 17:08:59 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-22 17:28:34 +0900 |
commit | 3343bcec3565da1aa40fa1aa1ecc6feee7e902f6 (patch) | |
tree | e1a3a5f8ece2e5ec57e2fb970463278a87594bef /server | |
parent | 5aec36b646e2a3c01434828c0f0cc6f3e8566578 (diff) |
zebra client uses the new Watch API
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
Diffstat (limited to 'server')
-rw-r--r-- | server/server.go | 47 | ||||
-rw-r--r-- | server/zclient.go | 50 |
2 files changed, 39 insertions, 58 deletions
diff --git a/server/server.go b/server/server.go index e6d78112..4cd71ad6 100644 --- a/server/server.go +++ b/server/server.go @@ -106,6 +106,7 @@ type BgpServer struct { shutdown bool watchers *watcherManager watcherMap map[watchType][]*Watcher + zclient *zebraClient } func NewBgpServer() *BgpServer { @@ -840,20 +841,26 @@ func (server *BgpServer) SetCollector(c config.Collector) error { return nil } -func (server *BgpServer) SetZebraConfig(z config.Zebra) error { - if !z.Config.Enabled { - return nil - } - ch := make(chan *GrpcResponse) - server.GrpcReqCh <- &GrpcRequest{ - RequestType: REQ_INITIALIZE_ZEBRA, - Data: &z.Config, - ResponseCh: ch, - } - if err := (<-ch).Err(); err != nil { - return err +func (s *BgpServer) StartZebraClient(x *config.Zebra) (err error) { + ch := make(chan struct{}) + defer func() { <-ch }() + + s.mgmtCh <- func() { + defer close(ch) + + if s.zclient != nil { + err = fmt.Errorf("already connected to Zebra") + } else { + c := x.Config + + protos := make([]string, 0, len(c.RedistributeRouteTypeList)) + for _, p := range c.RedistributeRouteTypeList { + protos = append(protos, string(p)) + } + s.zclient, err = newZebraClient(s, c.Url, protos) + } } - return nil + return err } func (server *BgpServer) SetRpkiConfig(c []config.RpkiServer) error { @@ -1819,20 +1826,6 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) { if len(pathList) > 0 { server.propagateUpdate(nil, pathList) } - case REQ_INITIALIZE_ZEBRA: - c := grpcReq.Data.(*config.ZebraConfig) - protos := make([]string, 0, len(c.RedistributeRouteTypeList)) - for _, p := range c.RedistributeRouteTypeList { - protos = append(protos, string(p)) - } - z, err := newZebraWatcher(server, c.Url, protos) - if err == nil { - server.watchers.addWatcher(WATCHER_ZEBRA, z) - } - grpcReq.ResponseCh <- &GrpcResponse{ - ResponseErr: err, - } - close(grpcReq.ResponseCh) case REQ_INITIALIZE_COLLECTOR: c := grpcReq.Data.(*config.CollectorConfig) collector, err := NewCollector(server.GrpcReqCh, c.Url, c.DbName, c.TableDumpInterval) diff --git a/server/zclient.go b/server/zclient.go index 9a6885a3..23b74963 100644 --- a/server/zclient.go +++ b/server/zclient.go @@ -21,7 +21,6 @@ import ( "github.com/osrg/gobgp/packet/bgp" "github.com/osrg/gobgp/table" "github.com/osrg/gobgp/zebra" - "gopkg.in/tomb.v2" "net" "strconv" "strings" @@ -149,64 +148,53 @@ func createPathFromIPRouteMessage(m *zebra.Message) *table.Path { return path } -type zebraWatcher struct { - t tomb.Tomb - ch chan watcherEvent +type zebraClient struct { client *zebra.Client server *BgpServer + dead chan struct{} } -func (w *zebraWatcher) notify(t watcherEventType) chan watcherEvent { - if t == WATCHER_EVENT_BESTPATH_CHANGE { - return w.ch - } - return nil -} - -func (w *zebraWatcher) stop() { - w.t.Kill(nil) +func (z *zebraClient) stop() { + close(z.dead) } -func (w *zebraWatcher) watchingEventTypes() []watcherEventType { - return []watcherEventType{WATCHER_EVENT_BESTPATH_CHANGE} -} +func (z *zebraClient) loop() { + w := z.server.Watch(WatchBestPath()) + defer func() { w.Stop() }() -func (w *zebraWatcher) loop() error { for { select { - case <-w.t.Dying(): - return w.client.Close() - case msg := <-w.client.Receive(): + case <-z.dead: + return + case msg := <-z.client.Receive(): switch msg.Body.(type) { case *zebra.IPRouteBody: - p := createPathFromIPRouteMessage(msg) - if p != nil { - if _, err := w.server.AddPath("", []*table.Path{p}); err != nil { + if p := createPathFromIPRouteMessage(msg); p != nil { + if _, err := z.server.AddPath("", []*table.Path{p}); err != nil { log.Errorf("failed to add path from zebra: %s", p) } } } - case ev := <-w.ch: + case ev := <-w.Event(): msg := ev.(*watcherEventBestPathMsg) if table.UseMultiplePaths.Enabled { for _, dst := range msg.multiPathList { if m := newIPRouteMessage(dst); m != nil { - w.client.Send(m) + z.client.Send(m) } } } else { for _, path := range msg.pathList { if m := newIPRouteMessage([]*table.Path{path}); m != nil { - w.client.Send(m) + z.client.Send(m) } } } } } - return nil } -func newZebraWatcher(s *BgpServer, url string, protos []string) (*zebraWatcher, error) { +func newZebraClient(s *BgpServer, url string, protos []string) (*zebraClient, error) { l := strings.SplitN(url, ":", 2) if len(l) != 2 { return nil, fmt.Errorf("unsupported url: %s", url) @@ -225,11 +213,11 @@ func newZebraWatcher(s *BgpServer, url string, protos []string) (*zebraWatcher, } cli.SendRedistribute(t) } - w := &zebraWatcher{ - ch: make(chan watcherEvent), + w := &zebraClient{ + dead: make(chan struct{}), client: cli, server: s, } - w.t.Go(w.loop) + go w.loop() return w, nil } |