summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorNaoto Hanaue <hanaue.naoto@po.ntts.co.jp>2016-02-10 09:20:04 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-02-19 21:14:59 -0800
commit1e749caf0c926b61777ec8648c31e81725d2baf3 (patch)
tree73c9653ba99a434be696ca42955caffa5f8fa005
parente3061382d751ded78e5bec5078958eeaad0ce797 (diff)
ops: add feature that insert ops route to gobgp
-rw-r--r--openswitch/openswitch.go528
1 files changed, 322 insertions, 206 deletions
diff --git a/openswitch/openswitch.go b/openswitch/openswitch.go
index db37e255..b618f48c 100644
--- a/openswitch/openswitch.go
+++ b/openswitch/openswitch.go
@@ -26,6 +26,7 @@ import (
"github.com/satori/go.uuid"
"net"
"reflect"
+ "strconv"
"time"
)
@@ -134,6 +135,90 @@ func (m *OpsManager) getBGPRouterUUID() (uint32, uuid.UUID, error) {
return asn, uuid.Nil, fmt.Errorf("not found")
}
+func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) {
+ var nlri bgp.AddrPrefixInterface
+ path := &api.Path{
+ Pattrs: make([][]byte, 0),
+ }
+
+ prefix := src.New.Fields["prefix"].(string)
+ safi := src.New.Fields["sub_address_family"].(string)
+ afi := src.New.Fields["address_family"].(string)
+ m := src.New.Fields["metric"].(float64)
+ nh := src.New.Fields["bgp_nexthops"].(ovsdb.OvsSet).GoSet
+ attrs := src.New.Fields["path_attributes"].(ovsdb.OvsMap).GoMap
+
+ nexthop := "0.0.0.0"
+ if afi == "ipv6" {
+ nexthop = "::"
+ }
+ if len(nh) == 0 {
+ log.Debug("nexthop addres does not exist")
+ } else if len(nh) == 1 {
+ if net.ParseIP(nh[0].(string)) == nil {
+ return nil, fmt.Errorf("invalid nexthop address")
+ }
+ } else {
+ return nil, fmt.Errorf("route has multiple nexthop address")
+ }
+
+ med, _ := bgp.NewPathAttributeMultiExitDisc(uint32(m)).Serialize()
+ path.Pattrs = append(path.Pattrs, med)
+
+ lpref, err := strconv.Atoi(attrs["BGP_loc_pref"].(string))
+ if err != nil {
+ return nil, err
+ }
+ localPref, _ := bgp.NewPathAttributeLocalPref(uint32(lpref)).Serialize()
+ path.Pattrs = append(path.Pattrs, localPref)
+
+ var origin_t int
+ switch attrs["BGP_origin"].(string) {
+ case "i":
+ origin_t = bgp.BGP_ORIGIN_ATTR_TYPE_IGP
+ case "e":
+ origin_t = bgp.BGP_ORIGIN_ATTR_TYPE_EGP
+ case "?":
+ origin_t = bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE
+ default:
+ return nil, fmt.Errorf("invalid origin")
+ }
+ origin, _ := bgp.NewPathAttributeOrigin(uint8(origin_t)).Serialize()
+ path.Pattrs = append(path.Pattrs, origin)
+
+ switch afi {
+ case "ipv4", "ipv6":
+ ip, net, err := net.ParseCIDR(prefix)
+ if err != nil {
+ return nil, err
+ }
+ ones, _ := net.Mask.Size()
+ if afi == "ipv4" {
+ if ip.To4() == nil {
+ return nil, fmt.Errorf("invalid ipv4 prefix")
+ }
+ nlri = bgp.NewIPAddrPrefix(uint8(ones), ip.String())
+ } else {
+ if ip.To16() == nil {
+ return nil, fmt.Errorf("invalid ipv6 prefix")
+ }
+ nlri = bgp.NewIPv6AddrPrefix(uint8(ones), ip.String())
+ }
+ default:
+ return nil, fmt.Errorf("unsupported address family: %s", afi)
+ }
+
+ if afi == "ipv4" && safi == "unicast" {
+ path.Nlri, _ = nlri.Serialize()
+ n, _ := bgp.NewPathAttributeNextHop(nexthop).Serialize()
+ path.Pattrs = append(path.Pattrs, n)
+ } else {
+ mpreach, _ := bgp.NewPathAttributeMpReachNLRI(nexthop, []bgp.AddrPrefixInterface{nlri}).Serialize()
+ path.Pattrs = append(path.Pattrs, mpreach)
+ }
+ return path, nil
+}
+
func (m *OpsManager) getBGPNeighborUUIDs(id uuid.UUID) ([]net.IP, []uuid.UUID, error) {
global, ok := m.cache["BGP_Router"]
if !ok {
@@ -260,161 +345,89 @@ func (m *OpsManager) handleNeighborUpdate(update ovsdb.TableUpdate) []*server.Gr
return reqs
}
-func (m *OpsManager) Serve() error {
- go m.OpsServe()
- go m.GobgpServe()
- return nil
-}
-
-func (m *OpsManager) OpsServe() error {
- initial, err := m.ops.MonitorAll(TARGET_TABLE, "")
- if err != nil {
- return err
- }
- go func() {
- m.opsChs.opsUpdateCh <- initial
- }()
- for {
- select {
- case updates := <-m.opsChs.opsUpdateCh:
- m.populateCache(*updates)
- t, ok := updates.Updates["VRF"]
- if ok {
- req := m.handleVrfUpdate(t)
- if req != nil {
- m.grpcQueue = append(m.grpcQueue, req)
- }
- }
- t, ok = updates.Updates["BGP_Router"]
- if ok {
- routerReqs := m.handleBgpRouterUpdate(t)
- if len(routerReqs) > 0 {
- m.grpcQueue = append(m.grpcQueue, routerReqs...)
- }
- m.grpcChs.monitorAlready <- 1
- }
- t, ok = updates.Updates["BGP_Neighbor"]
- if ok {
- neighborReqs := m.handleNeighborUpdate(t)
- if len(neighborReqs) > 0 {
- m.grpcQueue = append(m.grpcQueue, neighborReqs...)
- }
- }
- case r := <-m.opsChs.opsCh:
- if err := m.Transact(r.operations); err != nil {
- log.Errorf("%v", err)
- }
- }
- }
- return nil
-}
-
-func (m *OpsManager) GobgpServe() error {
- go m.GobgpMonitor()
- for {
- var grpcReq *server.GrpcRequest
- var grpcRes chan *server.GrpcResponse
- if len(m.grpcQueue) < 1{
- time.Sleep(time.Duration(time.Second *1))
+func (m *OpsManager) handleRouteUpdate(update ovsdb.TableUpdate) []*server.GrpcRequest {
+ id, _ := m.getVrfUUID()
+ reqs := []*server.GrpcRequest{}
+ for _, v := range update.Rows {
+ vrf := v.New.Fields["vrf"]
+ if vrf == nil {
continue
}
- grpcReq = m.grpcQueue[0]
- grpcRes = grpcReq.ResponseCh
-
- m.grpcChs.grpcCh <- grpcReq
- m.grpcQueue = m.grpcQueue[1:]
- r := <-grpcRes
-
-
- if err := r.Err(); err != nil {
- log.Errorf("operation failed. err: %s", err)
+ idx := vrf.(ovsdb.UUID).GoUuid
+ if uuid.Equal(id, uuid.FromStringOrNil(idx)) {
+ path, err := parseRouteToGobgp(v)
+ if err != nil {
+ log.Error("faild to parse path")
+ return nil
+ }
+ reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_PATH, "", bgp.RouteFamily(0), &api.ModPathArguments{
+ Operation: api.Operation_ADD,
+ Resource: api.Resource_GLOBAL,
+ Name: "",
+ Path: path,
+ }))
}
}
- return nil
+ return reqs
}
-func (m *OpsManager) GobgpMonitor() {
- <-m.grpcChs.monitorAlready
- time.Sleep(time.Duration(time.Second *2))
- reqCh := m.grpcChs.grpcUpdateCh
- family := bgp.RF_IPv4_UC
- arg := &api.Arguments{
- Resource: api.Resource_GLOBAL,
- Family: uint32(family),
- }
- for {
- req := server.NewGrpcRequest(server.REQ_MONITOR_GLOBAL_BEST_CHANGED, "", bgp.RouteFamily(0), arg)
- reqCh <- req
- res := <-req.ResponseCh
- if err := res.Err(); err != nil {
- log.Errorf("operation failed. reqtype: %d, err: %s", req.RequestType, err)
- }
- d := res.Data.(*api.Destination)
- p, err := cmd.ApiStruct2Path(d.Paths[0])
- if err != nil {
- log.Error("faild parse")
- }
- o, err := m.TransactPreparation(p)
- if err != nil {
- log.Errorf("%v", err)
+func parseRouteToOps(pl []*cmd.Path) (map[string]interface{}, bool, error) {
+ route := map[string]interface{}{"metric": 0, "peer": "Remote announcement"}
+ IsWithdraw := false
+ // route := make(map[string]interface{})
+ // route["metric"] = 0
+ // route["peer"] = "Remote announcement"
+ for _, p := range pl {
+ var nexthop string
+ pathAttr := map[string]string{"BGP_iBGP": "false", "BGP_flags": "16", "BGP_internal": "false", "BGP_loc_pref": "0"}
+ for _, a := range p.PathAttrs {
+ switch a.GetType() {
+ case bgp.BGP_ATTR_TYPE_NEXT_HOP:
+ nexthop = a.(*bgp.PathAttributeNextHop).Value.String()
+ case bgp.BGP_ATTR_TYPE_MP_REACH_NLRI:
+ n := a.(*bgp.PathAttributeMpReachNLRI).Nexthop
+ if n != nil {
+ nexthop = n.String()
+ } else {
+ nexthop = ""
+ }
+ case bgp.BGP_ATTR_TYPE_AS_PATH:
+ pathAttr["BGP_AS_path"] = a.(*bgp.PathAttributeAsPath).String()
+ case bgp.BGP_ATTR_TYPE_ORIGIN:
+ origin := "-"
+ switch a.(*bgp.PathAttributeOrigin).Value[0] {
+ case bgp.BGP_ORIGIN_ATTR_TYPE_IGP:
+ origin = "i"
+ case bgp.BGP_ORIGIN_ATTR_TYPE_EGP:
+ origin = "e"
+ case bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE:
+ origin = "?"
+ }
+ pathAttr["BGP_origin"] = origin
+ case bgp.BGP_ATTR_TYPE_LOCAL_PREF:
+ pathAttr["BGP_loc_pref"] = fmt.Sprintf("%i", a.(*bgp.PathAttributeLocalPref).Value)
+ case bgp.BGP_ATTR_TYPE_MULTI_EXIT_DISC:
+ route["metric"] = a.(*bgp.PathAttributeMultiExitDisc).Value
+ default:
+ continue
+ }
}
- m.opsChs.opsCh <- o
- }
-}
-
-func (m *OpsManager) Transact(operations []ovsdb.Operation) error {
- ops := m.ops
- reply, err := ops.Transact(TARGET_TABLE, operations...)
- if err != nil {
- return err
- }
- if len(reply) < len(operations) {
- return fmt.Errorf("number of replies should be atleast equal to number of Operations")
- }
- ok := true
- for i, o := range reply {
- if o.Error != "" && i < len(operations) {
- log.Errorf("transaction failed due to an error :", o.Error, " details:", o.Details, " in ", operations[i])
- ok = false
- } else if o.Error != "" {
- log.Errorf("transaction failed due to an error :", o.Error)
- ok = false
+ IsWithdraw = p.IsWithdraw
+ afi := "ipv4"
+ if p.Nlri.AFI() != bgp.AFI_IP {
+ afi = "ipv6"
}
- }
- if ok {
- log.Debugf("bgp route addition successful")
- } else {
- return fmt.Errorf("bgp route addition failed")
- }
- return nil
-}
+ safi := "unicast"
-func (m *OpsManager) TransactPreparation(p []*cmd.Path) (*OpsOperation, error) {
- v, err := m.getVrfUUID()
- if err != nil {
- return nil, err
- }
- opsRoute, isWithdraw, err := createOpsRoute(p)
- if err != nil {
- return nil, err
+ route["prefix"] = p.Nlri.String()
+ route["address_family"] = afi
+ route["sub_address_family"] = safi
+ route["bgp_nexthops"] = nexthop
+ route["path_attributes"] = pathAttr
+ break
}
- var o []ovsdb.Operation
- if !isWithdraw {
- insNextHopOp := insertNextHop(opsRoute)
- insRouteOp, err := insertRoute(v, opsRoute)
- if err != nil {
- return nil, err
- }
- o = []ovsdb.Operation{insNextHopOp, insRouteOp}
- } else {
- delRouteOp := deleteRoute(opsRoute)
- o = []ovsdb.Operation{delRouteOp}
- }
- oOperation := &OpsOperation{
- operations: o,
- }
- return oOperation, nil
+ return route, IsWithdraw, nil
}
func insertNextHop(opsRoute map[string]interface{}) ovsdb.Operation {
@@ -455,7 +468,7 @@ func insertRoute(vrfId uuid.UUID, opsRoute map[string]interface{}) (ovsdb.Operat
return insRouteOp, nil
}
-func deleteRoute(opsRoute map[string]interface{}) (ovsdb.Operation) {
+func deleteRoute(opsRoute map[string]interface{}) ovsdb.Operation {
condition := ovsdb.NewCondition("prefix", "==", opsRoute["prefix"])
deleteOp := ovsdb.Operation{
Op: "delete",
@@ -465,63 +478,166 @@ func deleteRoute(opsRoute map[string]interface{}) (ovsdb.Operation) {
return deleteOp
}
-func createOpsRoute(pl []*cmd.Path) (map[string]interface{}, bool, error) {
- route := map[string]interface{}{"metric": 0, "peer": "Remote announcement"}
- IsWithdraw := false
-// route := make(map[string]interface{})
-// route["metric"] = 0
-// route["peer"] = "Remote announcement"
- for _, p := range pl {
- var nexthop string
- pathAttr := map[string]string{"BGP_iBGP": "false", "BGP_flags": "16", "BGP_internal": "false", "BGP_loc_pref": "0"}
- for _, a := range p.PathAttrs {
- switch a.GetType() {
- case bgp.BGP_ATTR_TYPE_NEXT_HOP:
- nexthop = a.(*bgp.PathAttributeNextHop).Value.String()
- case bgp.BGP_ATTR_TYPE_MP_REACH_NLRI:
- n := a.(*bgp.PathAttributeMpReachNLRI).Nexthop
- if n != nil {
- nexthop = n.String()
- } else {
- nexthop = ""
+func (m *OpsManager) TransactPreparation(p []*cmd.Path) (*OpsOperation, error) {
+ v, err := m.getVrfUUID()
+ if err != nil {
+ return nil, err
+ }
+ opsRoute, isWithdraw, err := parseRouteToOps(p)
+ if err != nil {
+ return nil, err
+ }
+
+ var o []ovsdb.Operation
+ if !isWithdraw {
+ insNextHopOp := insertNextHop(opsRoute)
+ insRouteOp, err := insertRoute(v, opsRoute)
+ if err != nil {
+ return nil, err
+ }
+ o = []ovsdb.Operation{insNextHopOp, insRouteOp}
+ } else {
+ delRouteOp := deleteRoute(opsRoute)
+ o = []ovsdb.Operation{delRouteOp}
+ }
+ oOperation := &OpsOperation{
+ operations: o,
+ }
+ return oOperation, nil
+}
+
+func (m *OpsManager) Transact(operations []ovsdb.Operation) error {
+ ops := m.ops
+ reply, err := ops.Transact(TARGET_TABLE, operations...)
+ if err != nil {
+ return err
+ }
+ if len(reply) < len(operations) {
+ return fmt.Errorf("number of replies should be atleast equal to number of Operations")
+ }
+ ok := true
+ for i, o := range reply {
+ if o.Error != "" && i < len(operations) {
+ log.Errorf("transaction failed due to an error :", o.Error, " details:", o.Details, " in ", operations[i])
+ ok = false
+ } else if o.Error != "" {
+ log.Errorf("transaction failed due to an error :", o.Error)
+ ok = false
+ }
+ }
+ if ok {
+ log.Debugf("bgp route update successful")
+ } else {
+ return fmt.Errorf("bgp route update failed")
+ }
+ return nil
+}
+
+func (m *OpsManager) GobgpMonitor() {
+ <-m.grpcChs.monitorAlready
+ time.Sleep(time.Duration(time.Second * 2))
+ reqCh := m.grpcChs.grpcUpdateCh
+ family := bgp.RF_IPv4_UC
+ arg := &api.Arguments{
+ Resource: api.Resource_GLOBAL,
+ Family: uint32(family),
+ }
+ for {
+ req := server.NewGrpcRequest(server.REQ_MONITOR_GLOBAL_BEST_CHANGED, "", bgp.RouteFamily(0), arg)
+ reqCh <- req
+ res := <-req.ResponseCh
+ if err := res.Err(); err != nil {
+ log.Errorf("operation failed. reqtype: %d, err: %s", req.RequestType, err)
+ }
+ d := res.Data.(*api.Destination)
+ p, err := cmd.ApiStruct2Path(d.Paths[0])
+ if err != nil {
+ log.Error("faild parse")
+ }
+ o, err := m.TransactPreparation(p)
+ if err != nil {
+ log.Errorf("%v", err)
+ }
+ m.opsChs.opsCh <- o
+ }
+}
+
+func (m *OpsManager) OpsServe() error {
+ initial, err := m.ops.MonitorAll(TARGET_TABLE, "")
+ if err != nil {
+ return err
+ }
+ go func() {
+ m.opsChs.opsUpdateCh <- initial
+ }()
+ for {
+ select {
+ case updates := <-m.opsChs.opsUpdateCh:
+ m.populateCache(*updates)
+ t, ok := updates.Updates["VRF"]
+ if ok {
+ req := m.handleVrfUpdate(t)
+ if req != nil {
+ m.grpcQueue = append(m.grpcQueue, req)
}
- case bgp.BGP_ATTR_TYPE_AS_PATH:
- pathAttr["BGP_AS_path"] = a.(*bgp.PathAttributeAsPath).String()
- case bgp.BGP_ATTR_TYPE_ORIGIN:
- origin := "-"
- switch a.(*bgp.PathAttributeOrigin).Value[0] {
- case bgp.BGP_ORIGIN_ATTR_TYPE_IGP:
- origin = "i"
- case bgp.BGP_ORIGIN_ATTR_TYPE_EGP:
- origin = "e"
- case bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE:
- origin = "?"
+ }
+ t, ok = updates.Updates["BGP_Router"]
+ if ok {
+ routerReqs := m.handleBgpRouterUpdate(t)
+ if len(routerReqs) > 0 {
+ m.grpcQueue = append(m.grpcQueue, routerReqs...)
}
- pathAttr["BGP_origin"] = origin
- case bgp.BGP_ATTR_TYPE_LOCAL_PREF:
- pathAttr["BGP_loc_pref"] = fmt.Sprintf("%i", a.(*bgp.PathAttributeLocalPref).Value)
- case bgp.BGP_ATTR_TYPE_MULTI_EXIT_DISC:
- route["metric"] = a.(*bgp.PathAttributeMultiExitDisc).Value
- default:
- continue
+ m.grpcChs.monitorAlready <- 1
+ }
+ t, ok = updates.Updates["BGP_Neighbor"]
+ if ok {
+ neighborReqs := m.handleNeighborUpdate(t)
+ if len(neighborReqs) > 0 {
+ m.grpcQueue = append(m.grpcQueue, neighborReqs...)
+ }
+ }
+ t, ok = updates.Updates["BGP_Route"]
+ if ok {
+ routeReqs := m.handleRouteUpdate(t)
+ if len(routeReqs) > 0 {
+ m.grpcQueue = append(m.grpcQueue, routeReqs...)
+ }
+ }
+ case r := <-m.opsChs.opsCh:
+ if err := m.Transact(r.operations); err != nil {
}
}
- IsWithdraw = p.IsWithdraw
- afi := "ipv4"
- if p.Nlri.AFI() != bgp.AFI_IP {
- afi = "ipv6"
+ }
+ return nil
+}
+
+func (m *OpsManager) GobgpServe() error {
+ go m.GobgpMonitor()
+ for {
+ var grpcReq *server.GrpcRequest
+ var grpcRes chan *server.GrpcResponse
+ if len(m.grpcQueue) < 1 {
+ time.Sleep(time.Duration(time.Millisecond * 10))
+ continue
}
- safi := "unicast"
+ grpcReq = m.grpcQueue[0]
+ grpcRes = grpcReq.ResponseCh
- route["prefix"] = p.Nlri.String()
- route["address_family"] = afi
- route["sub_address_family"] = safi
- route["bgp_nexthops"] = nexthop
- route["path_attributes"] = pathAttr
- break
+ m.grpcChs.grpcCh <- grpcReq
+ m.grpcQueue = m.grpcQueue[1:]
+ r := <-grpcRes
+
+ if err := r.Err(); err != nil {
+ log.Errorf("operation failed. err: %s", err)
+ }
}
+ return nil
+}
- return route, IsWithdraw, nil
+func (m *OpsManager) Serve() error {
+ go m.OpsServe()
+ go m.GobgpServe()
+ return nil
}
type OpsOperation struct {
@@ -544,9 +660,9 @@ type OpsManager struct {
grpcChs *GrpcChs
opsChs *OpsChs
grpcQueue []*server.GrpcRequest
-// opsQueue []*OpsOperation
- bgpReady bool
- cache map[string]map[string]ovsdb.Row
+ // opsQueue []*OpsOperation
+ bgpReady bool
+ cache map[string]map[string]ovsdb.Row
}
func NewOpsManager(grpcCh chan *server.GrpcRequest) (*OpsManager, error) {
@@ -554,11 +670,11 @@ func NewOpsManager(grpcCh chan *server.GrpcRequest) (*OpsManager, error) {
if err != nil {
return nil, err
}
-// oQueue := make([]*OpsOperation, 0)
+ // oQueue := make([]*OpsOperation, 0)
gQueue := make([]*server.GrpcRequest, 0)
grpcChs := &GrpcChs{
grpcCh: grpcCh,
- grpcUpdateCh: grpcCh,
+ grpcUpdateCh: grpcCh,
monitorAlready: make(chan int),
}
opsUpdateCh := make(chan *ovsdb.TableUpdates)
@@ -569,12 +685,12 @@ func NewOpsManager(grpcCh chan *server.GrpcRequest) (*OpsManager, error) {
opsUpdateCh: opsUpdateCh,
}
return &OpsManager{
- ops: ops,
- grpcChs: grpcChs,
- opsChs: opsChs,
- grpcQueue: gQueue,
-// opsQueue: oQueue,
- bgpReady: false,
- cache: make(map[string]map[string]ovsdb.Row),
+ ops: ops,
+ grpcChs: grpcChs,
+ opsChs: opsChs,
+ grpcQueue: gQueue,
+ // opsQueue: oQueue,
+ bgpReady: false,
+ cache: make(map[string]map[string]ovsdb.Row),
}, nil
}