diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-26 11:24:52 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-07-26 11:24:52 +0900 |
commit | 9b1dac34bb22f159babf352bf43d7e3ffdf157e5 (patch) | |
tree | 50119400a00105e234bd3916e92ebc645c103a16 | |
parent | ebac86e07ac40d19037ca100c42bac7ba94aae12 (diff) |
openswitch: use proper gRPC API
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | gobgpd/main.go | 2 | ||||
-rw-r--r-- | openswitch/openswitch.go | 177 |
2 files changed, 52 insertions, 127 deletions
diff --git a/gobgpd/main.go b/gobgpd/main.go index f886ea95..c8f32926 100644 --- a/gobgpd/main.go +++ b/gobgpd/main.go @@ -186,7 +186,7 @@ func main() { }() if opts.Ops { - m, err := ops.NewOpsManager(grpcServer, bgpServer.GrpcReqCh) + m, err := ops.NewOpsManager(opts.GrpcHosts) if err != nil { log.Errorf("Failed to start ops config manager: %s", err) os.Exit(1) diff --git a/openswitch/openswitch.go b/openswitch/openswitch.go index f7206b91..3d8ca9ff 100644 --- a/openswitch/openswitch.go +++ b/openswitch/openswitch.go @@ -21,10 +21,10 @@ import ( api "github.com/osrg/gobgp/api" "github.com/osrg/gobgp/gobgp/cmd" "github.com/osrg/gobgp/packet/bgp" - "github.com/osrg/gobgp/server" "github.com/satori/go.uuid" ovsdb "github.com/socketplane/libovsdb" "golang.org/x/net/context" + "google.golang.org/grpc" "net" "reflect" "strconv" @@ -268,29 +268,27 @@ func (m *OpsManager) getBGPNeighborUUIDs(id uuid.UUID) ([]net.IP, []uuid.UUID, e return nil, nil, fmt.Errorf("neighbor not found") } -func (m *OpsManager) handleVrfUpdate(update ovsdb.TableUpdate) *server.GrpcRequest { +func (m *OpsManager) handleVrfUpdate(cli api.GobgpApiClient, update ovsdb.TableUpdate) { for _, v := range update.Rows { if len(v.Old.Fields) == 0 { log.WithFields(log.Fields{ "Topic": "openswitch", }).Debug("new vrf") } else if _, ok := v.Old.Fields["bgp_routers"]; ok { - _, _, err := m.getBGPRouterUUID() - if err != nil { - return server.NewGrpcRequest(server.REQ_STOP_SERVER, "", bgp.RouteFamily(0), &api.StopServerRequest{}) + if _, _, err := m.getBGPRouterUUID(); err != nil { + cli.StopServer(context.Background(), &api.StopServerRequest{}) + return } } } - return nil } -func (m *OpsManager) handleBgpRouterUpdate(update ovsdb.TableUpdate) []*server.GrpcRequest { +func (m *OpsManager) handleBgpRouterUpdate(cli api.GobgpApiClient, update ovsdb.TableUpdate) { asn, id, err := m.getBGPRouterUUID() if err != nil { log.Debugf("%s", err) - return nil + return } - reqs := []*server.GrpcRequest{} for k, v := range update.Rows { if uuid.Equal(id, uuid.FromStringOrNil(k)) { initial := false @@ -306,21 +304,21 @@ func (m *OpsManager) handleBgpRouterUpdate(update ovsdb.TableUpdate) []*server.G log.WithFields(log.Fields{ "Topic": "openswitch", }).Debug("router-id is not configured yet") - return nil + return } - reqs = append(reqs, server.NewGrpcRequest(server.REQ_START_SERVER, "", bgp.RouteFamily(0), &api.StartServerRequest{ + cli.StartServer(context.Background(), &api.StartServerRequest{ Global: &api.Global{ As: asn, RouterId: r, }, - })) + }) } if o, ok := v.Old.Fields["bgp_neighbors"]; ok { oldNeighMap := o.(ovsdb.OvsMap).GoMap newNeighMap := v.New.Fields["bgp_neighbors"].(ovsdb.OvsMap).GoMap for k, _ := range oldNeighMap { if _, ok := newNeighMap[k]; !ok { - m.grpcServer.DeleteNeighbor(context.Background(), &api.DeleteNeighborRequest{ + cli.DeleteNeighbor(context.Background(), &api.DeleteNeighborRequest{ Peer: &api.Peer{ Conf: &api.PeerConf{ NeighborAddress: k.(string), @@ -332,16 +330,14 @@ func (m *OpsManager) handleBgpRouterUpdate(update ovsdb.TableUpdate) []*server.G } } } - return reqs } -func (m *OpsManager) handleNeighborUpdate(update ovsdb.TableUpdate) []*server.GrpcRequest { +func (m *OpsManager) handleNeighborUpdate(cli api.GobgpApiClient, update ovsdb.TableUpdate) { _, id, _ := m.getBGPRouterUUID() addrs, ids, err := m.getBGPNeighborUUIDs(id) if err != nil { - return nil + return } - reqs := make([]*server.GrpcRequest, 0, len(addrs)) for k, v := range update.Rows { for idx, id := range ids { if uuid.Equal(id, uuid.FromStringOrNil(k)) { @@ -352,7 +348,7 @@ func (m *OpsManager) handleNeighborUpdate(update ovsdb.TableUpdate) []*server.Gr }).Debug("remote-as is not configured yet") continue } - m.grpcServer.AddNeighbor(context.Background(), &api.AddNeighborRequest{ + cli.AddNeighbor(context.Background(), &api.AddNeighborRequest{ Peer: &api.Peer{ Conf: &api.PeerConf{ NeighborAddress: addrs[idx].String(), @@ -363,12 +359,10 @@ func (m *OpsManager) handleNeighborUpdate(update ovsdb.TableUpdate) []*server.Gr } } } - return reqs } -func (m *OpsManager) handleRouteUpdate(update ovsdb.TableUpdate) []*server.GrpcRequest { +func (m *OpsManager) handleRouteUpdate(cli api.GobgpApiClient, update ovsdb.TableUpdate) { id, _ := m.getVrfUUID() - reqs := []*server.GrpcRequest{} for _, v := range update.Rows { vrf := v.New.Fields["vrf"] if vrf == nil { @@ -383,25 +377,24 @@ func (m *OpsManager) handleRouteUpdate(update ovsdb.TableUpdate) []*server.GrpcR "Path": path, "Err": err, }).Debug("failed to parse path") - return nil + return } if isWithdraw { - reqs = append(reqs, server.NewGrpcRequest(server.REQ_DELETE_PATH, "", bgp.RouteFamily(0), &api.AddPathRequest{ + cli.DeletePath(context.Background(), &api.DeletePathRequest{ Resource: api.Resource_GLOBAL, Path: path, - })) + }) } else { if isFromGobgp { - return nil + return } - reqs = append(reqs, server.NewGrpcRequest(server.REQ_ADD_PATH, "", bgp.RouteFamily(0), &api.AddPathRequest{ + cli.AddPath(context.Background(), &api.AddPathRequest{ Resource: api.Resource_GLOBAL, Path: path, - })) + }) } } } - return reqs } func parseRouteToOps(pl []*cmd.Path) (map[string]interface{}, bool, error) { @@ -564,30 +557,20 @@ func (m *OpsManager) Transact(operations []ovsdb.Operation) error { return nil } -func (m *OpsManager) GobgpMonitor(ready *bool) { +func (m *OpsManager) GobgpMonitor(target string) { time.Sleep(time.Duration(time.Second * 2)) - reqCh := m.grpcCh - family := bgp.RF_IPv4_UC - arg := &api.Table{ - Type: api.Resource_GLOBAL, - Family: uint32(family), + + conn, err := grpc.Dial(target, grpc.WithTimeout(time.Second), grpc.WithBlock(), grpc.WithInsecure()) + if err != nil { + log.Fatal(err) } + cli := api.NewGobgpApiClient(conn) + stream, err := cli.MonitorRib(context.Background(), &api.Table{ + Type: api.Resource_GLOBAL, + Family: uint32(bgp.RF_IPv4_UC), + }) for { - if !*ready { - return - } - req := server.NewGrpcRequest(server.REQ_MONITOR_RIB, "", bgp.RouteFamily(arg.Family), arg) - reqCh <- req - res := <-req.ResponseCh - if err := res.Err(); err != nil { - log.WithFields(log.Fields{ - "Topic": "openswitch", - "Type": "Monitor", - "RequestType": req.RequestType, - "Err": err, - }).Error("grpc operation failed") - } - d := res.Data.(*api.Destination) + d, err := stream.Recv() bPath := d.Paths[0] if bPath.IsFromExternal && !bPath.IsWithdraw { continue @@ -612,45 +595,7 @@ func (m *OpsManager) GobgpMonitor(ready *bool) { } } -func (m *OpsManager) GobgpServe() error { - monitorReady := false - for { - var grpcReq *server.GrpcRequest - var grpcRes chan *server.GrpcResponse - if len(m.grpcQueue) < 1 { - time.Sleep(time.Duration(time.Millisecond * 10)) - continue - } - grpcReq = m.grpcQueue[0] - grpcRes = grpcReq.ResponseCh - - m.grpcCh <- grpcReq - m.grpcQueue = m.grpcQueue[1:] - r := <-grpcRes - - if r.Err() != nil { - log.WithFields(log.Fields{ - "Topic": "openswitch", - "Type": "ModRequest", - "Err": r.Err(), - }).Error("grpc operation failed") - } else { - if monitorReady { - if grpcReq.RequestType == server.REQ_STOP_SERVER { - monitorReady = false - } - } else { - if grpcReq.RequestType == server.REQ_START_SERVER { - monitorReady = true - go m.GobgpMonitor(&monitorReady) - } - } - } - } - return nil -} - -func (m *OpsManager) OpsServe() error { +func (m *OpsManager) OpsServe(target string) error { initial, err := m.ops.MonitorAll(TARGET_TABLE, "") if err != nil { return err @@ -658,37 +603,26 @@ func (m *OpsManager) OpsServe() error { go func() { m.opsUpdateCh <- initial }() + conn, err := grpc.Dial(target, grpc.WithTimeout(time.Second), grpc.WithBlock(), grpc.WithInsecure()) + if err != nil { + log.Fatal(err) + } + cli := api.NewGobgpApiClient(conn) for { select { case updates := <-m.opsUpdateCh: m.populateCache(*updates) - t, ok := updates.Updates["VRF"] - if ok { - req := m.handleVrfUpdate(t) - if req != nil { - m.grpcQueue = append(m.grpcQueue, req) - } + if t, ok := updates.Updates["VRF"]; ok { + m.handleVrfUpdate(cli, t) } - t, ok = updates.Updates["BGP_Router"] - if ok { - routerReqs := m.handleBgpRouterUpdate(t) - if len(routerReqs) > 0 { - m.grpcQueue = append(m.grpcQueue, routerReqs...) - } + if t, ok := updates.Updates["BGP_Router"]; ok { + m.handleBgpRouterUpdate(cli, t) } - t, ok = updates.Updates["BGP_Neighbor"] - if ok { - neighborReqs := m.handleNeighborUpdate(t) - if len(neighborReqs) > 0 { - m.grpcQueue = append(m.grpcQueue, neighborReqs...) - } + if t, ok := updates.Updates["BGP_Neighbor"]; ok { + m.handleNeighborUpdate(cli, t) } - t, ok = updates.Updates["BGP_Route"] - if ok { - routeReqs := m.handleRouteUpdate(t) - if len(routeReqs) > 0 { - m.grpcQueue = append(m.grpcQueue, routeReqs...) - } + if t, ok := updates.Updates["BGP_Route"]; ok { + m.handleRouteUpdate(cli, t) } case r := <-m.opsCh: if err := m.Transact(r.operations); err != nil { @@ -699,8 +633,8 @@ func (m *OpsManager) OpsServe() error { } func (m *OpsManager) Serve() error { - go m.OpsServe() - go m.GobgpServe() + go m.OpsServe(m.target) + go m.GobgpMonitor(m.target) return nil } @@ -708,44 +642,35 @@ type OpsOperation struct { operations []ovsdb.Operation } -type GrpcChs struct { - grpcCh chan *server.GrpcRequest -} - type OpsChs struct { opsCh chan *OpsOperation opsUpdateCh chan *ovsdb.TableUpdates } type OpsManager struct { - grpcServer *server.Server ops *ovsdb.OvsdbClient - grpcCh chan *server.GrpcRequest opsCh chan *OpsOperation opsUpdateCh chan *ovsdb.TableUpdates - grpcQueue []*server.GrpcRequest bgpReady bool cache map[string]map[string]ovsdb.Row + target string } -func NewOpsManager(grpcServer *server.Server, grpcCh chan *server.GrpcRequest) (*OpsManager, error) { +func NewOpsManager(target string) (*OpsManager, error) { ops, err := ovsdb.Connect("", 0) if err != nil { return nil, err } - gQueue := make([]*server.GrpcRequest, 0) opsUpdateCh := make(chan *ovsdb.TableUpdates) n := NewNotifier(opsUpdateCh) ops.Register(n) return &OpsManager{ - grpcServer: grpcServer, ops: ops, - grpcCh: grpcCh, opsCh: make(chan *OpsOperation, 1024), opsUpdateCh: opsUpdateCh, - grpcQueue: gQueue, bgpReady: false, cache: make(map[string]map[string]ovsdb.Row), + target: target, }, nil } |