diff options
author | Naoto Hanaue <hanaue.naoto@po.ntts.co.jp> | 2016-02-10 09:20:04 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-02-19 21:14:59 -0800 |
commit | 1e749caf0c926b61777ec8648c31e81725d2baf3 (patch) | |
tree | 73c9653ba99a434be696ca42955caffa5f8fa005 | |
parent | e3061382d751ded78e5bec5078958eeaad0ce797 (diff) |
ops: add feature that insert ops route to gobgp
-rw-r--r-- | openswitch/openswitch.go | 528 |
1 files changed, 322 insertions, 206 deletions
diff --git a/openswitch/openswitch.go b/openswitch/openswitch.go index db37e255..b618f48c 100644 --- a/openswitch/openswitch.go +++ b/openswitch/openswitch.go @@ -26,6 +26,7 @@ import ( "github.com/satori/go.uuid" "net" "reflect" + "strconv" "time" ) @@ -134,6 +135,90 @@ func (m *OpsManager) getBGPRouterUUID() (uint32, uuid.UUID, error) { return asn, uuid.Nil, fmt.Errorf("not found") } +func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) { + var nlri bgp.AddrPrefixInterface + path := &api.Path{ + Pattrs: make([][]byte, 0), + } + + prefix := src.New.Fields["prefix"].(string) + safi := src.New.Fields["sub_address_family"].(string) + afi := src.New.Fields["address_family"].(string) + m := src.New.Fields["metric"].(float64) + nh := src.New.Fields["bgp_nexthops"].(ovsdb.OvsSet).GoSet + attrs := src.New.Fields["path_attributes"].(ovsdb.OvsMap).GoMap + + nexthop := "0.0.0.0" + if afi == "ipv6" { + nexthop = "::" + } + if len(nh) == 0 { + log.Debug("nexthop addres does not exist") + } else if len(nh) == 1 { + if net.ParseIP(nh[0].(string)) == nil { + return nil, fmt.Errorf("invalid nexthop address") + } + } else { + return nil, fmt.Errorf("route has multiple nexthop address") + } + + med, _ := bgp.NewPathAttributeMultiExitDisc(uint32(m)).Serialize() + path.Pattrs = append(path.Pattrs, med) + + lpref, err := strconv.Atoi(attrs["BGP_loc_pref"].(string)) + if err != nil { + return nil, err + } + localPref, _ := bgp.NewPathAttributeLocalPref(uint32(lpref)).Serialize() + path.Pattrs = append(path.Pattrs, localPref) + + var origin_t int + switch attrs["BGP_origin"].(string) { + case "i": + origin_t = bgp.BGP_ORIGIN_ATTR_TYPE_IGP + case "e": + origin_t = bgp.BGP_ORIGIN_ATTR_TYPE_EGP + case "?": + origin_t = bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE + default: + return nil, fmt.Errorf("invalid origin") + } + origin, _ := bgp.NewPathAttributeOrigin(uint8(origin_t)).Serialize() + path.Pattrs = append(path.Pattrs, origin) + + switch afi { + case "ipv4", "ipv6": + ip, net, err := net.ParseCIDR(prefix) + if err != nil { + return nil, err + } + ones, _ := net.Mask.Size() + if afi == "ipv4" { + if ip.To4() == nil { + return nil, fmt.Errorf("invalid ipv4 prefix") + } + nlri = bgp.NewIPAddrPrefix(uint8(ones), ip.String()) + } else { + if ip.To16() == nil { + return nil, fmt.Errorf("invalid ipv6 prefix") + } + nlri = bgp.NewIPv6AddrPrefix(uint8(ones), ip.String()) + } + default: + return nil, fmt.Errorf("unsupported address family: %s", afi) + } + + if afi == "ipv4" && safi == "unicast" { + path.Nlri, _ = nlri.Serialize() + n, _ := bgp.NewPathAttributeNextHop(nexthop).Serialize() + path.Pattrs = append(path.Pattrs, n) + } else { + mpreach, _ := bgp.NewPathAttributeMpReachNLRI(nexthop, []bgp.AddrPrefixInterface{nlri}).Serialize() + path.Pattrs = append(path.Pattrs, mpreach) + } + return path, nil +} + func (m *OpsManager) getBGPNeighborUUIDs(id uuid.UUID) ([]net.IP, []uuid.UUID, error) { global, ok := m.cache["BGP_Router"] if !ok { @@ -260,161 +345,89 @@ func (m *OpsManager) handleNeighborUpdate(update ovsdb.TableUpdate) []*server.Gr return reqs } -func (m *OpsManager) Serve() error { - go m.OpsServe() - go m.GobgpServe() - return nil -} - -func (m *OpsManager) OpsServe() error { - initial, err := m.ops.MonitorAll(TARGET_TABLE, "") - if err != nil { - return err - } - go func() { - m.opsChs.opsUpdateCh <- initial - }() - for { - select { - case updates := <-m.opsChs.opsUpdateCh: - m.populateCache(*updates) - t, ok := updates.Updates["VRF"] - if ok { - req := m.handleVrfUpdate(t) - if req != nil { - m.grpcQueue = append(m.grpcQueue, req) - } - } - t, ok = updates.Updates["BGP_Router"] - if ok { - routerReqs := m.handleBgpRouterUpdate(t) - if len(routerReqs) > 0 { - m.grpcQueue = append(m.grpcQueue, routerReqs...) - } - m.grpcChs.monitorAlready <- 1 - } - t, ok = updates.Updates["BGP_Neighbor"] - if ok { - neighborReqs := m.handleNeighborUpdate(t) - if len(neighborReqs) > 0 { - m.grpcQueue = append(m.grpcQueue, neighborReqs...) - } - } - case r := <-m.opsChs.opsCh: - if err := m.Transact(r.operations); err != nil { - log.Errorf("%v", err) - } - } - } - return nil -} - -func (m *OpsManager) GobgpServe() error { - go m.GobgpMonitor() - for { - var grpcReq *server.GrpcRequest - var grpcRes chan *server.GrpcResponse - if len(m.grpcQueue) < 1{ - time.Sleep(time.Duration(time.Second *1)) +func (m *OpsManager) handleRouteUpdate(update ovsdb.TableUpdate) []*server.GrpcRequest { + id, _ := m.getVrfUUID() + reqs := []*server.GrpcRequest{} + for _, v := range update.Rows { + vrf := v.New.Fields["vrf"] + if vrf == nil { continue } - grpcReq = m.grpcQueue[0] - grpcRes = grpcReq.ResponseCh - - m.grpcChs.grpcCh <- grpcReq - m.grpcQueue = m.grpcQueue[1:] - r := <-grpcRes - - - if err := r.Err(); err != nil { - log.Errorf("operation failed. err: %s", err) + idx := vrf.(ovsdb.UUID).GoUuid + if uuid.Equal(id, uuid.FromStringOrNil(idx)) { + path, err := parseRouteToGobgp(v) + if err != nil { + log.Error("faild to parse path") + return nil + } + reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_PATH, "", bgp.RouteFamily(0), &api.ModPathArguments{ + Operation: api.Operation_ADD, + Resource: api.Resource_GLOBAL, + Name: "", + Path: path, + })) } } - return nil + return reqs } -func (m *OpsManager) GobgpMonitor() { - <-m.grpcChs.monitorAlready - time.Sleep(time.Duration(time.Second *2)) - reqCh := m.grpcChs.grpcUpdateCh - family := bgp.RF_IPv4_UC - arg := &api.Arguments{ - Resource: api.Resource_GLOBAL, - Family: uint32(family), - } - for { - req := server.NewGrpcRequest(server.REQ_MONITOR_GLOBAL_BEST_CHANGED, "", bgp.RouteFamily(0), arg) - reqCh <- req - res := <-req.ResponseCh - if err := res.Err(); err != nil { - log.Errorf("operation failed. reqtype: %d, err: %s", req.RequestType, err) - } - d := res.Data.(*api.Destination) - p, err := cmd.ApiStruct2Path(d.Paths[0]) - if err != nil { - log.Error("faild parse") - } - o, err := m.TransactPreparation(p) - if err != nil { - log.Errorf("%v", err) +func parseRouteToOps(pl []*cmd.Path) (map[string]interface{}, bool, error) { + route := map[string]interface{}{"metric": 0, "peer": "Remote announcement"} + IsWithdraw := false + // route := make(map[string]interface{}) + // route["metric"] = 0 + // route["peer"] = "Remote announcement" + for _, p := range pl { + var nexthop string + pathAttr := map[string]string{"BGP_iBGP": "false", "BGP_flags": "16", "BGP_internal": "false", "BGP_loc_pref": "0"} + for _, a := range p.PathAttrs { + switch a.GetType() { + case bgp.BGP_ATTR_TYPE_NEXT_HOP: + nexthop = a.(*bgp.PathAttributeNextHop).Value.String() + case bgp.BGP_ATTR_TYPE_MP_REACH_NLRI: + n := a.(*bgp.PathAttributeMpReachNLRI).Nexthop + if n != nil { + nexthop = n.String() + } else { + nexthop = "" + } + case bgp.BGP_ATTR_TYPE_AS_PATH: + pathAttr["BGP_AS_path"] = a.(*bgp.PathAttributeAsPath).String() + case bgp.BGP_ATTR_TYPE_ORIGIN: + origin := "-" + switch a.(*bgp.PathAttributeOrigin).Value[0] { + case bgp.BGP_ORIGIN_ATTR_TYPE_IGP: + origin = "i" + case bgp.BGP_ORIGIN_ATTR_TYPE_EGP: + origin = "e" + case bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE: + origin = "?" + } + pathAttr["BGP_origin"] = origin + case bgp.BGP_ATTR_TYPE_LOCAL_PREF: + pathAttr["BGP_loc_pref"] = fmt.Sprintf("%i", a.(*bgp.PathAttributeLocalPref).Value) + case bgp.BGP_ATTR_TYPE_MULTI_EXIT_DISC: + route["metric"] = a.(*bgp.PathAttributeMultiExitDisc).Value + default: + continue + } } - m.opsChs.opsCh <- o - } -} - -func (m *OpsManager) Transact(operations []ovsdb.Operation) error { - ops := m.ops - reply, err := ops.Transact(TARGET_TABLE, operations...) - if err != nil { - return err - } - if len(reply) < len(operations) { - return fmt.Errorf("number of replies should be atleast equal to number of Operations") - } - ok := true - for i, o := range reply { - if o.Error != "" && i < len(operations) { - log.Errorf("transaction failed due to an error :", o.Error, " details:", o.Details, " in ", operations[i]) - ok = false - } else if o.Error != "" { - log.Errorf("transaction failed due to an error :", o.Error) - ok = false + IsWithdraw = p.IsWithdraw + afi := "ipv4" + if p.Nlri.AFI() != bgp.AFI_IP { + afi = "ipv6" } - } - if ok { - log.Debugf("bgp route addition successful") - } else { - return fmt.Errorf("bgp route addition failed") - } - return nil -} + safi := "unicast" -func (m *OpsManager) TransactPreparation(p []*cmd.Path) (*OpsOperation, error) { - v, err := m.getVrfUUID() - if err != nil { - return nil, err - } - opsRoute, isWithdraw, err := createOpsRoute(p) - if err != nil { - return nil, err + route["prefix"] = p.Nlri.String() + route["address_family"] = afi + route["sub_address_family"] = safi + route["bgp_nexthops"] = nexthop + route["path_attributes"] = pathAttr + break } - var o []ovsdb.Operation - if !isWithdraw { - insNextHopOp := insertNextHop(opsRoute) - insRouteOp, err := insertRoute(v, opsRoute) - if err != nil { - return nil, err - } - o = []ovsdb.Operation{insNextHopOp, insRouteOp} - } else { - delRouteOp := deleteRoute(opsRoute) - o = []ovsdb.Operation{delRouteOp} - } - oOperation := &OpsOperation{ - operations: o, - } - return oOperation, nil + return route, IsWithdraw, nil } func insertNextHop(opsRoute map[string]interface{}) ovsdb.Operation { @@ -455,7 +468,7 @@ func insertRoute(vrfId uuid.UUID, opsRoute map[string]interface{}) (ovsdb.Operat return insRouteOp, nil } -func deleteRoute(opsRoute map[string]interface{}) (ovsdb.Operation) { +func deleteRoute(opsRoute map[string]interface{}) ovsdb.Operation { condition := ovsdb.NewCondition("prefix", "==", opsRoute["prefix"]) deleteOp := ovsdb.Operation{ Op: "delete", @@ -465,63 +478,166 @@ func deleteRoute(opsRoute map[string]interface{}) (ovsdb.Operation) { return deleteOp } -func createOpsRoute(pl []*cmd.Path) (map[string]interface{}, bool, error) { - route := map[string]interface{}{"metric": 0, "peer": "Remote announcement"} - IsWithdraw := false -// route := make(map[string]interface{}) -// route["metric"] = 0 -// route["peer"] = "Remote announcement" - for _, p := range pl { - var nexthop string - pathAttr := map[string]string{"BGP_iBGP": "false", "BGP_flags": "16", "BGP_internal": "false", "BGP_loc_pref": "0"} - for _, a := range p.PathAttrs { - switch a.GetType() { - case bgp.BGP_ATTR_TYPE_NEXT_HOP: - nexthop = a.(*bgp.PathAttributeNextHop).Value.String() - case bgp.BGP_ATTR_TYPE_MP_REACH_NLRI: - n := a.(*bgp.PathAttributeMpReachNLRI).Nexthop - if n != nil { - nexthop = n.String() - } else { - nexthop = "" +func (m *OpsManager) TransactPreparation(p []*cmd.Path) (*OpsOperation, error) { + v, err := m.getVrfUUID() + if err != nil { + return nil, err + } + opsRoute, isWithdraw, err := parseRouteToOps(p) + if err != nil { + return nil, err + } + + var o []ovsdb.Operation + if !isWithdraw { + insNextHopOp := insertNextHop(opsRoute) + insRouteOp, err := insertRoute(v, opsRoute) + if err != nil { + return nil, err + } + o = []ovsdb.Operation{insNextHopOp, insRouteOp} + } else { + delRouteOp := deleteRoute(opsRoute) + o = []ovsdb.Operation{delRouteOp} + } + oOperation := &OpsOperation{ + operations: o, + } + return oOperation, nil +} + +func (m *OpsManager) Transact(operations []ovsdb.Operation) error { + ops := m.ops + reply, err := ops.Transact(TARGET_TABLE, operations...) + if err != nil { + return err + } + if len(reply) < len(operations) { + return fmt.Errorf("number of replies should be atleast equal to number of Operations") + } + ok := true + for i, o := range reply { + if o.Error != "" && i < len(operations) { + log.Errorf("transaction failed due to an error :", o.Error, " details:", o.Details, " in ", operations[i]) + ok = false + } else if o.Error != "" { + log.Errorf("transaction failed due to an error :", o.Error) + ok = false + } + } + if ok { + log.Debugf("bgp route update successful") + } else { + return fmt.Errorf("bgp route update failed") + } + return nil +} + +func (m *OpsManager) GobgpMonitor() { + <-m.grpcChs.monitorAlready + time.Sleep(time.Duration(time.Second * 2)) + reqCh := m.grpcChs.grpcUpdateCh + family := bgp.RF_IPv4_UC + arg := &api.Arguments{ + Resource: api.Resource_GLOBAL, + Family: uint32(family), + } + for { + req := server.NewGrpcRequest(server.REQ_MONITOR_GLOBAL_BEST_CHANGED, "", bgp.RouteFamily(0), arg) + reqCh <- req + res := <-req.ResponseCh + if err := res.Err(); err != nil { + log.Errorf("operation failed. reqtype: %d, err: %s", req.RequestType, err) + } + d := res.Data.(*api.Destination) + p, err := cmd.ApiStruct2Path(d.Paths[0]) + if err != nil { + log.Error("faild parse") + } + o, err := m.TransactPreparation(p) + if err != nil { + log.Errorf("%v", err) + } + m.opsChs.opsCh <- o + } +} + +func (m *OpsManager) OpsServe() error { + initial, err := m.ops.MonitorAll(TARGET_TABLE, "") + if err != nil { + return err + } + go func() { + m.opsChs.opsUpdateCh <- initial + }() + for { + select { + case updates := <-m.opsChs.opsUpdateCh: + m.populateCache(*updates) + t, ok := updates.Updates["VRF"] + if ok { + req := m.handleVrfUpdate(t) + if req != nil { + m.grpcQueue = append(m.grpcQueue, req) } - case bgp.BGP_ATTR_TYPE_AS_PATH: - pathAttr["BGP_AS_path"] = a.(*bgp.PathAttributeAsPath).String() - case bgp.BGP_ATTR_TYPE_ORIGIN: - origin := "-" - switch a.(*bgp.PathAttributeOrigin).Value[0] { - case bgp.BGP_ORIGIN_ATTR_TYPE_IGP: - origin = "i" - case bgp.BGP_ORIGIN_ATTR_TYPE_EGP: - origin = "e" - case bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE: - origin = "?" + } + t, ok = updates.Updates["BGP_Router"] + if ok { + routerReqs := m.handleBgpRouterUpdate(t) + if len(routerReqs) > 0 { + m.grpcQueue = append(m.grpcQueue, routerReqs...) } - pathAttr["BGP_origin"] = origin - case bgp.BGP_ATTR_TYPE_LOCAL_PREF: - pathAttr["BGP_loc_pref"] = fmt.Sprintf("%i", a.(*bgp.PathAttributeLocalPref).Value) - case bgp.BGP_ATTR_TYPE_MULTI_EXIT_DISC: - route["metric"] = a.(*bgp.PathAttributeMultiExitDisc).Value - default: - continue + m.grpcChs.monitorAlready <- 1 + } + t, ok = updates.Updates["BGP_Neighbor"] + if ok { + neighborReqs := m.handleNeighborUpdate(t) + if len(neighborReqs) > 0 { + m.grpcQueue = append(m.grpcQueue, neighborReqs...) + } + } + t, ok = updates.Updates["BGP_Route"] + if ok { + routeReqs := m.handleRouteUpdate(t) + if len(routeReqs) > 0 { + m.grpcQueue = append(m.grpcQueue, routeReqs...) + } + } + case r := <-m.opsChs.opsCh: + if err := m.Transact(r.operations); err != nil { } } - IsWithdraw = p.IsWithdraw - afi := "ipv4" - if p.Nlri.AFI() != bgp.AFI_IP { - afi = "ipv6" + } + return nil +} + +func (m *OpsManager) GobgpServe() error { + go m.GobgpMonitor() + for { + var grpcReq *server.GrpcRequest + var grpcRes chan *server.GrpcResponse + if len(m.grpcQueue) < 1 { + time.Sleep(time.Duration(time.Millisecond * 10)) + continue } - safi := "unicast" + grpcReq = m.grpcQueue[0] + grpcRes = grpcReq.ResponseCh - route["prefix"] = p.Nlri.String() - route["address_family"] = afi - route["sub_address_family"] = safi - route["bgp_nexthops"] = nexthop - route["path_attributes"] = pathAttr - break + m.grpcChs.grpcCh <- grpcReq + m.grpcQueue = m.grpcQueue[1:] + r := <-grpcRes + + if err := r.Err(); err != nil { + log.Errorf("operation failed. err: %s", err) + } } + return nil +} - return route, IsWithdraw, nil +func (m *OpsManager) Serve() error { + go m.OpsServe() + go m.GobgpServe() + return nil } type OpsOperation struct { @@ -544,9 +660,9 @@ type OpsManager struct { grpcChs *GrpcChs opsChs *OpsChs grpcQueue []*server.GrpcRequest -// opsQueue []*OpsOperation - bgpReady bool - cache map[string]map[string]ovsdb.Row + // opsQueue []*OpsOperation + bgpReady bool + cache map[string]map[string]ovsdb.Row } func NewOpsManager(grpcCh chan *server.GrpcRequest) (*OpsManager, error) { @@ -554,11 +670,11 @@ func NewOpsManager(grpcCh chan *server.GrpcRequest) (*OpsManager, error) { if err != nil { return nil, err } -// oQueue := make([]*OpsOperation, 0) + // oQueue := make([]*OpsOperation, 0) gQueue := make([]*server.GrpcRequest, 0) grpcChs := &GrpcChs{ grpcCh: grpcCh, - grpcUpdateCh: grpcCh, + grpcUpdateCh: grpcCh, monitorAlready: make(chan int), } opsUpdateCh := make(chan *ovsdb.TableUpdates) @@ -569,12 +685,12 @@ func NewOpsManager(grpcCh chan *server.GrpcRequest) (*OpsManager, error) { opsUpdateCh: opsUpdateCh, } return &OpsManager{ - ops: ops, - grpcChs: grpcChs, - opsChs: opsChs, - grpcQueue: gQueue, -// opsQueue: oQueue, - bgpReady: false, - cache: make(map[string]map[string]ovsdb.Row), + ops: ops, + grpcChs: grpcChs, + opsChs: opsChs, + grpcQueue: gQueue, + // opsQueue: oQueue, + bgpReady: false, + cache: make(map[string]map[string]ovsdb.Row), }, nil } |