summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorNaoto Hanaue <hanaue.naoto@po.ntts.co.jp>2016-02-03 21:05:19 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-02-19 21:14:59 -0800
commit9f3840ccb8232c60abf3d6eab48d9b294eb7d8a0 (patch)
treec0b264d9a434fd57b5ec175e0ee26f100b227aee
parentbbd5a2cfae44425f026b8a4567367450cb96d8f4 (diff)
ops: add feature that insert gobgp route to ops
-rw-r--r--openswitch/openswitch.go331
1 files changed, 268 insertions, 63 deletions
diff --git a/openswitch/openswitch.go b/openswitch/openswitch.go
index 4ed77f47..99da17f2 100644
--- a/openswitch/openswitch.go
+++ b/openswitch/openswitch.go
@@ -19,12 +19,23 @@ import (
"fmt"
log "github.com/Sirupsen/logrus"
api "github.com/osrg/gobgp/api"
+ "github.com/osrg/gobgp/gobgp/cmd"
"github.com/osrg/gobgp/packet"
"github.com/osrg/gobgp/server"
ovsdb "github.com/osrg/libovsdb"
"github.com/satori/go.uuid"
"net"
"reflect"
+ "time"
+)
+
+const (
+ TARGET_TABLE = "OpenSwitch"
+)
+
+const (
+ NEXTHOP_UUID = "nexthop"
+ ROUTE_UUID = "route"
)
type Notifier struct {
@@ -49,13 +60,6 @@ func NewNotifier(ch chan *ovsdb.TableUpdates) *Notifier {
}
}
-type OpsManager struct {
- client *ovsdb.OvsdbClient
- grpcCh chan *server.GrpcRequest
- updateCh chan *ovsdb.TableUpdates
- cache map[string]map[string]ovsdb.Row
-}
-
func (m *OpsManager) populateCache(updates ovsdb.TableUpdates) {
for table, tableUpdate := range updates.Updates {
if _, ok := m.cache[table]; !ok {
@@ -84,11 +88,29 @@ func extractUUID(v interface{}) uuid.UUID {
return uuid.FromStringOrNil(vv[1].(string))
}
+func (m *OpsManager) getRootUUID() (uuid.UUID, error) {
+ for k, _ := range m.cache[TARGET_TABLE] {
+ return uuid.FromStringOrNil(k), nil
+ }
+ return uuid.Nil, fmt.Errorf("OpenSwitch table not found")
+}
+
+func (m *OpsManager) getVrfUUID() (uuid.UUID, error) {
+ vrfs, ok := m.cache["VRF"]
+ if !ok {
+ return uuid.Nil, fmt.Errorf("VRF table not found")
+ }
+ for k, _ := range vrfs {
+ return uuid.FromStringOrNil(k), nil
+ }
+ return uuid.Nil, fmt.Errorf("vrf table not found")
+}
+
func (m *OpsManager) getBGPRouterUUID() (uint32, uuid.UUID, error) {
var asn uint32
vrfs, ok := m.cache["VRF"]
if !ok {
- return asn, uuid.Nil, fmt.Errorf("no vrf table")
+ return asn, uuid.Nil, fmt.Errorf("VRF table not found")
}
for _, v := range vrfs {
if v.Fields["name"] == "vrf_default" {
@@ -245,112 +267,295 @@ func (m *OpsManager) Serve() error {
}
func (m *OpsManager) OpsServe() error {
- initial, err := m.client.MonitorAll("OpenSwitch", "")
+ initial, err := m.ops.MonitorAll(TARGET_TABLE, "")
if err != nil {
return err
}
go func() {
- m.updateCh <- initial
+ m.opsChs.opsUpdateCh <- initial
}()
- reqs := make([]*server.GrpcRequest, 0)
- ress := make([]*server.GrpcRequest, 0)
for {
- var req, res *server.GrpcRequest
- var reqCh chan *server.GrpcRequest
- var resCh chan *server.GrpcResponse
- if len(reqs) > 0 {
- req = reqs[0]
- reqCh = m.grpcCh
- }
- if len(ress) > 0 {
- res = ress[0]
- resCh = res.ResponseCh
- }
select {
- case updates := <-m.updateCh:
+ case updates := <-m.opsChs.opsUpdateCh:
m.populateCache(*updates)
t, ok := updates.Updates["VRF"]
if ok {
req := m.handleVrfUpdate(t)
if req != nil {
- reqs = append(reqs, req)
+ m.grpcQueue = append(m.grpcQueue, req)
}
}
t, ok = updates.Updates["BGP_Router"]
if ok {
routerReqs := m.handleBgpRouterUpdate(t)
if len(routerReqs) > 0 {
- reqs = append(reqs, routerReqs...)
+ m.grpcQueue = append(m.grpcQueue, routerReqs...)
}
+ m.grpcChs.monitorAlready <- 1
}
t, ok = updates.Updates["BGP_Neighbor"]
if ok {
neighborReqs := m.handleNeighborUpdate(t)
if len(neighborReqs) > 0 {
- reqs = append(reqs, neighborReqs...)
+ m.grpcQueue = append(m.grpcQueue, neighborReqs...)
}
}
- case reqCh <- req:
- ress = append(ress, req)
- reqs = reqs[1:]
- case r := <-resCh:
- if err := r.Err(); err != nil {
- log.Errorf("operation failed. reqtype: %d, err: %s", res.RequestType, err)
+ case r := <-m.opsChs.opsCh:
+ if err := m.Transact(r.operations); err != nil {
+ log.Errorf("%v", err)
}
- ress = ress[1:]
}
}
return nil
}
func (m *OpsManager) GobgpServe() error {
+ go m.GobgpMonitor()
+ for {
+ var grpcReq *server.GrpcRequest
+ var grpcRes chan *server.GrpcResponse
+ if len(m.grpcQueue) < 1{
+ time.Sleep(time.Duration(time.Second *1))
+ continue
+ }
+ grpcReq = m.grpcQueue[0]
+ grpcRes = grpcReq.ResponseCh
+
+ m.grpcChs.grpcCh <- grpcReq
+ m.grpcQueue = m.grpcQueue[1:]
+ r := <-grpcRes
+
+
+ if err := r.Err(); err != nil {
+ log.Errorf("operation failed. err: %s", err)
+ }
+ }
+ return nil
+}
+
+func (m *OpsManager) GobgpMonitor() {
+ <-m.grpcChs.monitorAlready
+ time.Sleep(time.Duration(time.Second *2))
+ reqCh := m.grpcChs.grpcUpdateCh
family := bgp.RF_IPv4_UC
arg := &api.Arguments{
Resource: api.Resource_GLOBAL,
Family: uint32(family),
}
-
- stream, err := client.MonitorBestChanged(context.Background(), arg)
- if err != nil {
- fmt.Println(err)
- os.Exit(1)
- }
for {
- d, err := stream.Recv()
- if err == io.EOF {
- break
- } else if err != nil {
- fmt.Println(err)
- os.Exit(1)
+ req := server.NewGrpcRequest(server.REQ_MONITOR_GLOBAL_BEST_CHANGED, "", bgp.RouteFamily(0), arg)
+ reqCh <- req
+ res := <-req.ResponseCh
+ if err := res.Err(); err != nil {
+ log.Errorf("operation failed. reqtype: %d, err: %s", req.RequestType, err)
+ }
+ d := res.Data.(*api.Destination)
+ p, err := cmd.ApiStruct2Path(d.Paths[0])
+ if err != nil {
+ log.Error("faild parse")
}
- p, err := ApiStruct2Path(d.Paths[0])
+ o, err := m.TransactPreparation(p)
if err != nil {
- fmt.Println(err)
- os.Exit(1)
+ log.Errorf("%v", err)
}
+ m.opsChs.opsCh <- o
+ }
+}
- if globalOpts.Json {
- j, _ := json.Marshal(p)
- fmt.Println(string(j))
- } else {
- ShowRoute(p, false, false, false, true, false)
+func (m *OpsManager) Transact(operations []ovsdb.Operation) error {
+ ops := m.ops
+ reply, err := ops.Transact(TARGET_TABLE, operations...)
+ if err != nil {
+ return err
+ }
+ if len(reply) < len(operations) {
+ return fmt.Errorf("number of replies should be atleast equal to number of Operations")
+ }
+ ok := true
+ for i, o := range reply {
+ if o.Error != "" && i < len(operations) {
+ log.Errorf("transaction failed due to an error :", o.Error, " details:", o.Details, " in ", operations[i])
+ ok = false
+ } else if o.Error != "" {
+ log.Errorf("transaction failed due to an error :", o.Error)
+ ok = false
}
}
+ if ok {
+ log.Debugf("bgp route addition successful")
+ } else {
+ return fmt.Errorf("bgp route addition failed")
+ }
return nil
}
-func NewOpsManager(ch chan *server.GrpcRequest) (*OpsManager, error) {
- cli, err := ovsdb.ConnectUnix("")
+func (m *OpsManager) TransactPreparation(p []*cmd.Path) (*OpsOperation, error) {
+ v, err := m.getVrfUUID()
+ if err != nil {
+ return nil, err
+ }
+ opsRoute, err := createOpsRoute(p)
if err != nil {
return nil, err
}
- updateCh := make(chan *ovsdb.TableUpdates)
- n := NewNotifier(updateCh)
- cli.Register(n)
+ insNextHopOp := insertNextHop(opsRoute)
+ insRouteOp, err := insertRoute(v, opsRoute)
+ if err != nil {
+ return nil, err
+ }
+ o := []ovsdb.Operation{insNextHopOp, insRouteOp}
+ oOperation := &OpsOperation{
+ operations: o,
+ }
+ return oOperation, nil
+}
+
+func insertNextHop(opsRoute map[string]interface{}) ovsdb.Operation {
+ nexthop := make(map[string]interface{})
+ nexthop["ip_address"] = opsRoute["bgp_nexthops"]
+ nexthop["type"] = opsRoute["sub_address_family"]
+ insNextHopOp := ovsdb.Operation{
+ Op: "insert",
+ Table: "BGP_Nexthop",
+ Row: nexthop,
+ UUIDName: NEXTHOP_UUID,
+ }
+ return insNextHopOp
+}
+
+func insertRoute(vrfId uuid.UUID, opsRoute map[string]interface{}) (ovsdb.Operation, error) {
+ v := []ovsdb.UUID{ovsdb.UUID{vrfId.String()}}
+ vrfSet, _ := ovsdb.NewOvsSet(v)
+ opsRoute["vrf"] = vrfSet
+
+ nexthop := []ovsdb.UUID{ovsdb.UUID{NEXTHOP_UUID}}
+ nexthopSet, _ := ovsdb.NewOvsSet(nexthop)
+ opsRoute["bgp_nexthops"] = nexthopSet
+
+ attrMap, err := ovsdb.NewOvsMap(opsRoute["path_attributes"])
+ if err != nil {
+ return ovsdb.Operation{}, err
+ }
+
+ opsRoute["path_attributes"] = attrMap
+
+ insRouteOp := ovsdb.Operation{
+ Op: "insert",
+ Table: "BGP_Route",
+ Row: opsRoute,
+ UUIDName: ROUTE_UUID,
+ }
+ return insRouteOp, nil
+}
+
+func createOpsRoute(pl []*cmd.Path) (map[string]interface{}, error) {
+
+ route := make(map[string]interface{})
+ route["metric"] = 0
+ route["peer"] = "Remote announcement"
+ for _, p := range pl {
+ var nexthop string
+ pathAttr := map[string]string{"BGP_iBGP": "false", "BGP_flags": "16", "BGP_internal": "false", "BGP_loc_pref": "0"}
+ for _, a := range p.PathAttrs {
+ switch a.GetType() {
+ case bgp.BGP_ATTR_TYPE_NEXT_HOP:
+ nexthop = a.(*bgp.PathAttributeNextHop).Value.String()
+ case bgp.BGP_ATTR_TYPE_MP_REACH_NLRI:
+ n := a.(*bgp.PathAttributeMpReachNLRI).Nexthop
+ if n != nil {
+ nexthop = n.String()
+ } else {
+ nexthop = ""
+ }
+ case bgp.BGP_ATTR_TYPE_AS_PATH:
+ pathAttr["BGP_AS_path"] = a.(*bgp.PathAttributeAsPath).String()
+ case bgp.BGP_ATTR_TYPE_ORIGIN:
+ origin := "-"
+ switch a.(*bgp.PathAttributeOrigin).Value[0] {
+ case bgp.BGP_ORIGIN_ATTR_TYPE_IGP:
+ origin = "i"
+ case bgp.BGP_ORIGIN_ATTR_TYPE_EGP:
+ origin = "e"
+ case bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE:
+ origin = "?"
+ }
+ pathAttr["BGP_origin"] = origin
+ case bgp.BGP_ATTR_TYPE_LOCAL_PREF:
+ pathAttr["BGP_loc_pref"] = fmt.Sprintf("%i", a.(*bgp.PathAttributeLocalPref).Value)
+ case bgp.BGP_ATTR_TYPE_MULTI_EXIT_DISC:
+ route["metric"] = a.(*bgp.PathAttributeMultiExitDisc).Value
+ default:
+ continue
+ }
+ }
+ afi := "ipv4"
+ if p.Nlri.AFI() != bgp.AFI_IP {
+ afi = "ipv6"
+ }
+ safi := "unicast"
+
+ route["prefix"] = p.Nlri.String()
+ route["address_family"] = afi
+ route["sub_address_family"] = safi
+ route["bgp_nexthops"] = nexthop
+ route["path_attributes"] = pathAttr
+ break
+ }
+
+ return route, nil
+}
+
+type OpsOperation struct {
+ operations []ovsdb.Operation
+}
+
+type GrpcChs struct {
+ grpcCh chan *server.GrpcRequest
+ grpcUpdateCh chan *server.GrpcRequest
+ monitorAlready chan int
+}
+
+type OpsChs struct {
+ opsCh chan *OpsOperation
+ opsUpdateCh chan *ovsdb.TableUpdates
+}
+
+type OpsManager struct {
+ ops *ovsdb.OvsdbClient
+ grpcChs *GrpcChs
+ opsChs *OpsChs
+ grpcQueue []*server.GrpcRequest
+// opsQueue []*OpsOperation
+ bgpReady bool
+ cache map[string]map[string]ovsdb.Row
+}
+
+func NewOpsManager(grpcCh chan *server.GrpcRequest) (*OpsManager, error) {
+ ops, err := ovsdb.ConnectUnix("")
+ if err != nil {
+ return nil, err
+ }
+// oQueue := make([]*OpsOperation, 0)
+ gQueue := make([]*server.GrpcRequest, 0)
+ grpcChs := &GrpcChs{
+ grpcCh: grpcCh,
+ grpcUpdateCh: grpcCh,
+ monitorAlready: make(chan int),
+ }
+ opsUpdateCh := make(chan *ovsdb.TableUpdates)
+ n := NewNotifier(opsUpdateCh)
+ ops.Register(n)
+ opsChs := &OpsChs{
+ opsCh: make(chan *OpsOperation, 1024),
+ opsUpdateCh: opsUpdateCh,
+ }
return &OpsManager{
- client: cli,
- grpcCh: ch,
- updateCh: updateCh,
- cache: make(map[string]map[string]ovsdb.Row),
+ ops: ops,
+ grpcChs: grpcChs,
+ opsChs: opsChs,
+ grpcQueue: gQueue,
+// opsQueue: oQueue,
+ bgpReady: false,
+ cache: make(map[string]map[string]ovsdb.Row),
}, nil
}