summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-07-26 11:24:52 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-07-26 11:24:52 +0900
commit9b1dac34bb22f159babf352bf43d7e3ffdf157e5 (patch)
tree50119400a00105e234bd3916e92ebc645c103a16
parentebac86e07ac40d19037ca100c42bac7ba94aae12 (diff)
openswitch: use proper gRPC API
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--gobgpd/main.go2
-rw-r--r--openswitch/openswitch.go177
2 files changed, 52 insertions, 127 deletions
diff --git a/gobgpd/main.go b/gobgpd/main.go
index f886ea95..c8f32926 100644
--- a/gobgpd/main.go
+++ b/gobgpd/main.go
@@ -186,7 +186,7 @@ func main() {
}()
if opts.Ops {
- m, err := ops.NewOpsManager(grpcServer, bgpServer.GrpcReqCh)
+ m, err := ops.NewOpsManager(opts.GrpcHosts)
if err != nil {
log.Errorf("Failed to start ops config manager: %s", err)
os.Exit(1)
diff --git a/openswitch/openswitch.go b/openswitch/openswitch.go
index f7206b91..3d8ca9ff 100644
--- a/openswitch/openswitch.go
+++ b/openswitch/openswitch.go
@@ -21,10 +21,10 @@ import (
api "github.com/osrg/gobgp/api"
"github.com/osrg/gobgp/gobgp/cmd"
"github.com/osrg/gobgp/packet/bgp"
- "github.com/osrg/gobgp/server"
"github.com/satori/go.uuid"
ovsdb "github.com/socketplane/libovsdb"
"golang.org/x/net/context"
+ "google.golang.org/grpc"
"net"
"reflect"
"strconv"
@@ -268,29 +268,27 @@ func (m *OpsManager) getBGPNeighborUUIDs(id uuid.UUID) ([]net.IP, []uuid.UUID, e
return nil, nil, fmt.Errorf("neighbor not found")
}
-func (m *OpsManager) handleVrfUpdate(update ovsdb.TableUpdate) *server.GrpcRequest {
+func (m *OpsManager) handleVrfUpdate(cli api.GobgpApiClient, update ovsdb.TableUpdate) {
for _, v := range update.Rows {
if len(v.Old.Fields) == 0 {
log.WithFields(log.Fields{
"Topic": "openswitch",
}).Debug("new vrf")
} else if _, ok := v.Old.Fields["bgp_routers"]; ok {
- _, _, err := m.getBGPRouterUUID()
- if err != nil {
- return server.NewGrpcRequest(server.REQ_STOP_SERVER, "", bgp.RouteFamily(0), &api.StopServerRequest{})
+ if _, _, err := m.getBGPRouterUUID(); err != nil {
+ cli.StopServer(context.Background(), &api.StopServerRequest{})
+ return
}
}
}
- return nil
}
-func (m *OpsManager) handleBgpRouterUpdate(update ovsdb.TableUpdate) []*server.GrpcRequest {
+func (m *OpsManager) handleBgpRouterUpdate(cli api.GobgpApiClient, update ovsdb.TableUpdate) {
asn, id, err := m.getBGPRouterUUID()
if err != nil {
log.Debugf("%s", err)
- return nil
+ return
}
- reqs := []*server.GrpcRequest{}
for k, v := range update.Rows {
if uuid.Equal(id, uuid.FromStringOrNil(k)) {
initial := false
@@ -306,21 +304,21 @@ func (m *OpsManager) handleBgpRouterUpdate(update ovsdb.TableUpdate) []*server.G
log.WithFields(log.Fields{
"Topic": "openswitch",
}).Debug("router-id is not configured yet")
- return nil
+ return
}
- reqs = append(reqs, server.NewGrpcRequest(server.REQ_START_SERVER, "", bgp.RouteFamily(0), &api.StartServerRequest{
+ cli.StartServer(context.Background(), &api.StartServerRequest{
Global: &api.Global{
As: asn,
RouterId: r,
},
- }))
+ })
}
if o, ok := v.Old.Fields["bgp_neighbors"]; ok {
oldNeighMap := o.(ovsdb.OvsMap).GoMap
newNeighMap := v.New.Fields["bgp_neighbors"].(ovsdb.OvsMap).GoMap
for k, _ := range oldNeighMap {
if _, ok := newNeighMap[k]; !ok {
- m.grpcServer.DeleteNeighbor(context.Background(), &api.DeleteNeighborRequest{
+ cli.DeleteNeighbor(context.Background(), &api.DeleteNeighborRequest{
Peer: &api.Peer{
Conf: &api.PeerConf{
NeighborAddress: k.(string),
@@ -332,16 +330,14 @@ func (m *OpsManager) handleBgpRouterUpdate(update ovsdb.TableUpdate) []*server.G
}
}
}
- return reqs
}
-func (m *OpsManager) handleNeighborUpdate(update ovsdb.TableUpdate) []*server.GrpcRequest {
+func (m *OpsManager) handleNeighborUpdate(cli api.GobgpApiClient, update ovsdb.TableUpdate) {
_, id, _ := m.getBGPRouterUUID()
addrs, ids, err := m.getBGPNeighborUUIDs(id)
if err != nil {
- return nil
+ return
}
- reqs := make([]*server.GrpcRequest, 0, len(addrs))
for k, v := range update.Rows {
for idx, id := range ids {
if uuid.Equal(id, uuid.FromStringOrNil(k)) {
@@ -352,7 +348,7 @@ func (m *OpsManager) handleNeighborUpdate(update ovsdb.TableUpdate) []*server.Gr
}).Debug("remote-as is not configured yet")
continue
}
- m.grpcServer.AddNeighbor(context.Background(), &api.AddNeighborRequest{
+ cli.AddNeighbor(context.Background(), &api.AddNeighborRequest{
Peer: &api.Peer{
Conf: &api.PeerConf{
NeighborAddress: addrs[idx].String(),
@@ -363,12 +359,10 @@ func (m *OpsManager) handleNeighborUpdate(update ovsdb.TableUpdate) []*server.Gr
}
}
}
- return reqs
}
-func (m *OpsManager) handleRouteUpdate(update ovsdb.TableUpdate) []*server.GrpcRequest {
+func (m *OpsManager) handleRouteUpdate(cli api.GobgpApiClient, update ovsdb.TableUpdate) {
id, _ := m.getVrfUUID()
- reqs := []*server.GrpcRequest{}
for _, v := range update.Rows {
vrf := v.New.Fields["vrf"]
if vrf == nil {
@@ -383,25 +377,24 @@ func (m *OpsManager) handleRouteUpdate(update ovsdb.TableUpdate) []*server.GrpcR
"Path": path,
"Err": err,
}).Debug("failed to parse path")
- return nil
+ return
}
if isWithdraw {
- reqs = append(reqs, server.NewGrpcRequest(server.REQ_DELETE_PATH, "", bgp.RouteFamily(0), &api.AddPathRequest{
+ cli.DeletePath(context.Background(), &api.DeletePathRequest{
Resource: api.Resource_GLOBAL,
Path: path,
- }))
+ })
} else {
if isFromGobgp {
- return nil
+ return
}
- reqs = append(reqs, server.NewGrpcRequest(server.REQ_ADD_PATH, "", bgp.RouteFamily(0), &api.AddPathRequest{
+ cli.AddPath(context.Background(), &api.AddPathRequest{
Resource: api.Resource_GLOBAL,
Path: path,
- }))
+ })
}
}
}
- return reqs
}
func parseRouteToOps(pl []*cmd.Path) (map[string]interface{}, bool, error) {
@@ -564,30 +557,20 @@ func (m *OpsManager) Transact(operations []ovsdb.Operation) error {
return nil
}
-func (m *OpsManager) GobgpMonitor(ready *bool) {
+func (m *OpsManager) GobgpMonitor(target string) {
time.Sleep(time.Duration(time.Second * 2))
- reqCh := m.grpcCh
- family := bgp.RF_IPv4_UC
- arg := &api.Table{
- Type: api.Resource_GLOBAL,
- Family: uint32(family),
+
+ conn, err := grpc.Dial(target, grpc.WithTimeout(time.Second), grpc.WithBlock(), grpc.WithInsecure())
+ if err != nil {
+ log.Fatal(err)
}
+ cli := api.NewGobgpApiClient(conn)
+ stream, err := cli.MonitorRib(context.Background(), &api.Table{
+ Type: api.Resource_GLOBAL,
+ Family: uint32(bgp.RF_IPv4_UC),
+ })
for {
- if !*ready {
- return
- }
- req := server.NewGrpcRequest(server.REQ_MONITOR_RIB, "", bgp.RouteFamily(arg.Family), arg)
- reqCh <- req
- res := <-req.ResponseCh
- if err := res.Err(); err != nil {
- log.WithFields(log.Fields{
- "Topic": "openswitch",
- "Type": "Monitor",
- "RequestType": req.RequestType,
- "Err": err,
- }).Error("grpc operation failed")
- }
- d := res.Data.(*api.Destination)
+ d, err := stream.Recv()
bPath := d.Paths[0]
if bPath.IsFromExternal && !bPath.IsWithdraw {
continue
@@ -612,45 +595,7 @@ func (m *OpsManager) GobgpMonitor(ready *bool) {
}
}
-func (m *OpsManager) GobgpServe() error {
- monitorReady := false
- for {
- var grpcReq *server.GrpcRequest
- var grpcRes chan *server.GrpcResponse
- if len(m.grpcQueue) < 1 {
- time.Sleep(time.Duration(time.Millisecond * 10))
- continue
- }
- grpcReq = m.grpcQueue[0]
- grpcRes = grpcReq.ResponseCh
-
- m.grpcCh <- grpcReq
- m.grpcQueue = m.grpcQueue[1:]
- r := <-grpcRes
-
- if r.Err() != nil {
- log.WithFields(log.Fields{
- "Topic": "openswitch",
- "Type": "ModRequest",
- "Err": r.Err(),
- }).Error("grpc operation failed")
- } else {
- if monitorReady {
- if grpcReq.RequestType == server.REQ_STOP_SERVER {
- monitorReady = false
- }
- } else {
- if grpcReq.RequestType == server.REQ_START_SERVER {
- monitorReady = true
- go m.GobgpMonitor(&monitorReady)
- }
- }
- }
- }
- return nil
-}
-
-func (m *OpsManager) OpsServe() error {
+func (m *OpsManager) OpsServe(target string) error {
initial, err := m.ops.MonitorAll(TARGET_TABLE, "")
if err != nil {
return err
@@ -658,37 +603,26 @@ func (m *OpsManager) OpsServe() error {
go func() {
m.opsUpdateCh <- initial
}()
+ conn, err := grpc.Dial(target, grpc.WithTimeout(time.Second), grpc.WithBlock(), grpc.WithInsecure())
+ if err != nil {
+ log.Fatal(err)
+ }
+ cli := api.NewGobgpApiClient(conn)
for {
select {
case updates := <-m.opsUpdateCh:
m.populateCache(*updates)
- t, ok := updates.Updates["VRF"]
- if ok {
- req := m.handleVrfUpdate(t)
- if req != nil {
- m.grpcQueue = append(m.grpcQueue, req)
- }
+ if t, ok := updates.Updates["VRF"]; ok {
+ m.handleVrfUpdate(cli, t)
}
- t, ok = updates.Updates["BGP_Router"]
- if ok {
- routerReqs := m.handleBgpRouterUpdate(t)
- if len(routerReqs) > 0 {
- m.grpcQueue = append(m.grpcQueue, routerReqs...)
- }
+ if t, ok := updates.Updates["BGP_Router"]; ok {
+ m.handleBgpRouterUpdate(cli, t)
}
- t, ok = updates.Updates["BGP_Neighbor"]
- if ok {
- neighborReqs := m.handleNeighborUpdate(t)
- if len(neighborReqs) > 0 {
- m.grpcQueue = append(m.grpcQueue, neighborReqs...)
- }
+ if t, ok := updates.Updates["BGP_Neighbor"]; ok {
+ m.handleNeighborUpdate(cli, t)
}
- t, ok = updates.Updates["BGP_Route"]
- if ok {
- routeReqs := m.handleRouteUpdate(t)
- if len(routeReqs) > 0 {
- m.grpcQueue = append(m.grpcQueue, routeReqs...)
- }
+ if t, ok := updates.Updates["BGP_Route"]; ok {
+ m.handleRouteUpdate(cli, t)
}
case r := <-m.opsCh:
if err := m.Transact(r.operations); err != nil {
@@ -699,8 +633,8 @@ func (m *OpsManager) OpsServe() error {
}
func (m *OpsManager) Serve() error {
- go m.OpsServe()
- go m.GobgpServe()
+ go m.OpsServe(m.target)
+ go m.GobgpMonitor(m.target)
return nil
}
@@ -708,44 +642,35 @@ type OpsOperation struct {
operations []ovsdb.Operation
}
-type GrpcChs struct {
- grpcCh chan *server.GrpcRequest
-}
-
type OpsChs struct {
opsCh chan *OpsOperation
opsUpdateCh chan *ovsdb.TableUpdates
}
type OpsManager struct {
- grpcServer *server.Server
ops *ovsdb.OvsdbClient
- grpcCh chan *server.GrpcRequest
opsCh chan *OpsOperation
opsUpdateCh chan *ovsdb.TableUpdates
- grpcQueue []*server.GrpcRequest
bgpReady bool
cache map[string]map[string]ovsdb.Row
+ target string
}
-func NewOpsManager(grpcServer *server.Server, grpcCh chan *server.GrpcRequest) (*OpsManager, error) {
+func NewOpsManager(target string) (*OpsManager, error) {
ops, err := ovsdb.Connect("", 0)
if err != nil {
return nil, err
}
- gQueue := make([]*server.GrpcRequest, 0)
opsUpdateCh := make(chan *ovsdb.TableUpdates)
n := NewNotifier(opsUpdateCh)
ops.Register(n)
return &OpsManager{
- grpcServer: grpcServer,
ops: ops,
- grpcCh: grpcCh,
opsCh: make(chan *OpsOperation, 1024),
opsUpdateCh: opsUpdateCh,
- grpcQueue: gQueue,
bgpReady: false,
cache: make(map[string]map[string]ovsdb.Row),
+ target: target,
}, nil
}