diff options
-rw-r--r-- | server/server.go | 30 | ||||
-rw-r--r-- | server/watcher.go | 4 | ||||
-rw-r--r-- | server/zclient.go | 140 |
3 files changed, 107 insertions, 67 deletions
diff --git a/server/server.go b/server/server.go index 7efc3686..faf4a06c 100644 --- a/server/server.go +++ b/server/server.go @@ -32,7 +32,6 @@ import ( "github.com/osrg/gobgp/packet/bgp" "github.com/osrg/gobgp/packet/bmp" "github.com/osrg/gobgp/table" - "github.com/osrg/gobgp/zebra" "github.com/satori/go.uuid" ) @@ -145,7 +144,6 @@ type BgpServer struct { fsmincomingCh *channels.InfiniteChannel fsmStateCh chan *FsmMsg acceptCh chan *net.TCPConn - zapiMsgCh chan *zebra.Message collector *Collector GrpcReqCh chan *GrpcRequest @@ -155,7 +153,6 @@ type BgpServer struct { listeners []*TCPListener neighborMap map[string]*Peer globalRib *table.TableManager - zclient *zebra.Client roaManager *roaManager shutdown bool watchers Watchers @@ -328,11 +325,6 @@ func (server *BgpServer) Serve() { select { case rmsg := <-server.roaManager.ReceiveROA(): server.roaManager.HandleROAEvent(rmsg) - case zmsg := <-server.zapiMsgCh: - m := handleZapiMsg(zmsg, server) - if len(m) > 0 { - senderMsgs = append(senderMsgs, m...) - } case conn := <-server.acceptCh: passConn(conn) case e, ok := <-server.fsmincomingCh.Out(): @@ -502,22 +494,11 @@ func (server *BgpServer) dropPeerAllRoutes(peer *Peer, families []bgp.RouteFamil } func (server *BgpServer) broadcastBests(bests []*table.Path) { + server.notify2watchers(WATCHER_EVENT_BESTPATH_CHANGE, &watcherEventBestPathMsg{pathList: bests}) for _, path := range bests { if path == nil { continue } - if !path.IsFromExternal() { - z := newBroadcastZapiBestMsg(server.zclient, path) - if z != nil { - server.broadcastMsgs = append(server.broadcastMsgs, z) - log.WithFields(log.Fields{ - "Topic": "Server", - "Client": z.client, - "Message": z.msg, - }).Debug("Default policy applied and rejected.") - } - } - rf := path.GetRouteFamily() result := &GrpcResponse{ @@ -2359,10 +2340,13 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { close(grpcReq.ResponseCh) case REQ_INITIALIZE_ZEBRA: c := grpcReq.Data.(*config.ZebraConfig) - cli, err := NewZclient(c.Url, c.RedistributeRouteTypeList) + protos := make([]string, 0, len(c.RedistributeRouteTypeList)) + for _, p := range c.RedistributeRouteTypeList { + protos = append(protos, string(p)) + } + z, err := newZebraWatcher(server.GrpcReqCh, c.Url, protos) if err == nil { - server.zclient = cli - server.zapiMsgCh = server.zclient.Receive() + server.watchers[WATCHER_ZEBRA] = z } grpcReq.ResponseCh <- &GrpcResponse{ ResponseErr: err, diff --git a/server/watcher.go b/server/watcher.go index 490978f0..8df5edbb 100644 --- a/server/watcher.go +++ b/server/watcher.go @@ -94,6 +94,10 @@ type watcherEventAdjInMsg struct { pathList []*table.Path } +type watcherEventBestPathMsg struct { + pathList []*table.Path +} + type watcher interface { notify(watcherEventType) chan watcherEvent restart(string) error diff --git a/server/zclient.go b/server/zclient.go index 0d2f4300..87e303d4 100644 --- a/server/zclient.go +++ b/server/zclient.go @@ -18,26 +18,20 @@ package server import ( "fmt" log "github.com/Sirupsen/logrus" - "github.com/osrg/gobgp/config" + api "github.com/osrg/gobgp/api" "github.com/osrg/gobgp/packet/bgp" "github.com/osrg/gobgp/table" "github.com/osrg/gobgp/zebra" + "gopkg.in/tomb.v2" "net" "strconv" "strings" - "time" ) -type broadcastZapiMsg struct { - client *zebra.Client - msg *zebra.Message -} - -func (m *broadcastZapiMsg) send() { - m.client.Send(m.msg) -} - func newIPRouteMessage(path *table.Path) *zebra.Message { + if path.IsFromExternal() { + return nil + } l := strings.SplitN(path.GetNlri().String(), "/", 2) var command zebra.API_TYPE var prefix net.IP @@ -88,11 +82,14 @@ func newIPRouteMessage(path *table.Path) *zebra.Message { } } -func createPathFromIPRouteMessage(m *zebra.Message, peerInfo *table.PeerInfo) *table.Path { +func createRequestFromIPRouteMessage(m *zebra.Message) *api.AddPathRequest { header := m.Header body := m.Body.(*zebra.IPRouteBody) - isV4 := header.Command == zebra.IPV4_ROUTE_ADD || header.Command == zebra.IPV4_ROUTE_DELETE + family := bgp.RF_IPv6_UC + if header.Command == zebra.IPV4_ROUTE_ADD || header.Command == zebra.IPV4_ROUTE_DELETE { + family = bgp.RF_IPv4_UC + } var nlri bgp.AddrPrefixInterface pattr := make([]bgp.PathAttributeInterface, 0) @@ -116,58 +113,107 @@ func createPathFromIPRouteMessage(m *zebra.Message, peerInfo *table.PeerInfo) *t "api": header.Command.String(), }).Debugf("create path from ip route message.") - if isV4 { + switch family { + case bgp.RF_IPv4_UC: nlri = bgp.NewIPAddrPrefix(body.PrefixLength, body.Prefix.String()) nexthop := bgp.NewPathAttributeNextHop(body.Nexthops[0].String()) pattr = append(pattr, nexthop) - } else { + case bgp.RF_IPv6_UC: nlri = bgp.NewIPv6AddrPrefix(body.PrefixLength, body.Prefix.String()) mpnlri = bgp.NewPathAttributeMpReachNLRI(body.Nexthops[0].String(), []bgp.AddrPrefixInterface{nlri}) pattr = append(pattr, mpnlri) + default: + log.WithFields(log.Fields{ + "Topic": "Zebra", + }).Errorf("unsupport address family: %s", family) + return nil } med := bgp.NewPathAttributeMultiExitDisc(body.Metric) pattr = append(pattr, med) - p := table.NewPath(peerInfo, nlri, isWithdraw, pattr, time.Now(), false) - p.SetIsFromExternal(true) - return p -} + binPattrs := make([][]byte, 0, len(pattr)) + for _, a := range pattr { + bin, _ := a.Serialize() + binPattrs = append(binPattrs, bin) + } -func newBroadcastZapiBestMsg(cli *zebra.Client, path *table.Path) *broadcastZapiMsg { - if cli == nil { - return nil + binNlri, _ := nlri.Serialize() + + path := &api.Path{ + Nlri: binNlri, + Pattrs: binPattrs, + IsWithdraw: isWithdraw, + Family: uint32(family), + IsFromExternal: true, } - m := newIPRouteMessage(path) - if m == nil { - return nil + return &api.AddPathRequest{ + Resource: api.Resource_GLOBAL, + Path: path, } - return &broadcastZapiMsg{ - client: cli, - msg: m, + +} + +type zebraWatcher struct { + t tomb.Tomb + ch chan watcherEvent + client *zebra.Client + apiCh chan *GrpcRequest +} + +func (w *zebraWatcher) notify(t watcherEventType) chan watcherEvent { + if t == WATCHER_EVENT_BESTPATH_CHANGE { + return w.ch } + return nil } -func handleZapiMsg(msg *zebra.Message, server *BgpServer) []*SenderMsg { +func (w *zebraWatcher) stop() { + w.t.Kill(nil) +} - switch b := msg.Body.(type) { - case *zebra.IPRouteBody: - pi := &table.PeerInfo{ - AS: server.bgpConfig.Global.Config.As, - LocalID: net.ParseIP(server.bgpConfig.Global.Config.RouterId).To4(), - } +func (w *zebraWatcher) restart(filename string) error { + return nil +} + +func (w *zebraWatcher) watchingEventTypes() []watcherEventType { + return []watcherEventType{WATCHER_EVENT_BESTPATH_CHANGE} +} - if b.Prefix != nil && len(b.Nexthops) > 0 && b.Type != zebra.ROUTE_KERNEL { - p := createPathFromIPRouteMessage(msg, pi) - msgs, _ := server.propagateUpdate(nil, []*table.Path{p}) - return msgs +func (w *zebraWatcher) loop() error { + for { + select { + case <-w.t.Dying(): + return w.client.Close() + case msg := <-w.client.Receive(): + switch msg.Body.(type) { + case *zebra.IPRouteBody: + p := createRequestFromIPRouteMessage(msg) + if p != nil { + ch := make(chan *GrpcResponse) + w.apiCh <- &GrpcRequest{ + RequestType: REQ_ADD_PATH, + Data: p, + ResponseCh: ch, + } + if err := (<-ch).Err(); err != nil { + log.Errorf("failed to add path from zebra: %s", p) + } + } + } + case ev := <-w.ch: + msg := ev.(*watcherEventBestPathMsg) + for _, path := range msg.pathList { + if m := newIPRouteMessage(path); m != nil { + w.client.Send(m) + } + } } } - return nil } -func NewZclient(url string, redistRouteTypes []config.InstallProtocolType) (*zebra.Client, error) { +func newZebraWatcher(apiCh chan *GrpcRequest, url string, protos []string) (*zebraWatcher, error) { l := strings.SplitN(url, ":", 2) if len(l) != 2 { return nil, fmt.Errorf("unsupported url: %s", url) @@ -179,12 +225,18 @@ func NewZclient(url string, redistRouteTypes []config.InstallProtocolType) (*zeb cli.SendHello() cli.SendRouterIDAdd() cli.SendInterfaceAdd() - for _, typ := range redistRouteTypes { - t, err := zebra.RouteTypeFromString(string(typ)) + for _, typ := range protos { + t, err := zebra.RouteTypeFromString(typ) if err != nil { return nil, err } cli.SendRedistribute(t) } - return cli, nil + w := &zebraWatcher{ + ch: make(chan watcherEvent), + client: cli, + apiCh: apiCh, + } + w.t.Go(w.loop) + return w, nil } |