diff options
author | Naoto Hanaue <hanaue.naoto@po.ntts.co.jp> | 2016-02-12 20:47:11 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-02-19 21:14:59 -0800 |
commit | d400ce9b12659dd73372584badf0de7af73b2747 (patch) | |
tree | 6193e52d4ce8d329daf6c72b33ab08bae26e8071 | |
parent | 1e749caf0c926b61777ec8648c31e81725d2baf3 (diff) |
ops: add feature that delete gobgp route
-rw-r--r-- | openswitch/openswitch.go | 189 |
1 files changed, 107 insertions, 82 deletions
diff --git a/openswitch/openswitch.go b/openswitch/openswitch.go index b618f48c..293f3d11 100644 --- a/openswitch/openswitch.go +++ b/openswitch/openswitch.go @@ -135,18 +135,26 @@ func (m *OpsManager) getBGPRouterUUID() (uint32, uuid.UUID, error) { return asn, uuid.Nil, fmt.Errorf("not found") } -func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) { +func parseRouteToGobgp(route ovsdb.RowUpdate, nexthops map[string]ovsdb.Row) (*api.Path, bool, error) { var nlri bgp.AddrPrefixInterface path := &api.Path{ Pattrs: make([][]byte, 0), } - - prefix := src.New.Fields["prefix"].(string) - safi := src.New.Fields["sub_address_family"].(string) - afi := src.New.Fields["address_family"].(string) - m := src.New.Fields["metric"].(float64) - nh := src.New.Fields["bgp_nexthops"].(ovsdb.OvsSet).GoSet - attrs := src.New.Fields["path_attributes"].(ovsdb.OvsMap).GoMap + isWithdraw := false + prefix := route.New.Fields["prefix"].(string) + safi := route.New.Fields["sub_address_family"].(string) + afi := route.New.Fields["address_family"].(string) + m := route.New.Fields["metric"].(float64) + attrs := route.New.Fields["path_attributes"].(ovsdb.OvsMap).GoMap + nh := make([]interface{}, 0) + nhId, ok := route.New.Fields["bgp_nexthops"].(ovsdb.UUID) + if ok { + for id, n := range nexthops { + if id == nhId.GoUuid { + nh = append(nh, n.Fields["ip_address"]) + } + } + } nexthop := "0.0.0.0" if afi == "ipv6" { @@ -156,10 +164,12 @@ func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) { log.Debug("nexthop addres does not exist") } else if len(nh) == 1 { if net.ParseIP(nh[0].(string)) == nil { - return nil, fmt.Errorf("invalid nexthop address") + return nil, isWithdraw, fmt.Errorf("invalid nexthop address") + } else { + nexthop = nh[0].(string) } } else { - return nil, fmt.Errorf("route has multiple nexthop address") + return nil, isWithdraw, fmt.Errorf("route has multiple nexthop address") } med, _ := bgp.NewPathAttributeMultiExitDisc(uint32(m)).Serialize() @@ -167,7 +177,7 @@ func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) { lpref, err := strconv.Atoi(attrs["BGP_loc_pref"].(string)) if err != nil { - return nil, err + return nil, isWithdraw, err } localPref, _ := bgp.NewPathAttributeLocalPref(uint32(lpref)).Serialize() path.Pattrs = append(path.Pattrs, localPref) @@ -181,7 +191,7 @@ func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) { case "?": origin_t = bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE default: - return nil, fmt.Errorf("invalid origin") + return nil, isWithdraw, fmt.Errorf("invalid origin") } origin, _ := bgp.NewPathAttributeOrigin(uint8(origin_t)).Serialize() path.Pattrs = append(path.Pattrs, origin) @@ -190,22 +200,22 @@ func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) { case "ipv4", "ipv6": ip, net, err := net.ParseCIDR(prefix) if err != nil { - return nil, err + return nil, isWithdraw, err } ones, _ := net.Mask.Size() if afi == "ipv4" { if ip.To4() == nil { - return nil, fmt.Errorf("invalid ipv4 prefix") + return nil, isWithdraw, fmt.Errorf("invalid ipv4 prefix") } nlri = bgp.NewIPAddrPrefix(uint8(ones), ip.String()) } else { if ip.To16() == nil { - return nil, fmt.Errorf("invalid ipv6 prefix") + return nil, isWithdraw, fmt.Errorf("invalid ipv6 prefix") } nlri = bgp.NewIPv6AddrPrefix(uint8(ones), ip.String()) } default: - return nil, fmt.Errorf("unsupported address family: %s", afi) + return nil, isWithdraw, fmt.Errorf("unsupported address family: %s", afi) } if afi == "ipv4" && safi == "unicast" { @@ -216,7 +226,11 @@ func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) { mpreach, _ := bgp.NewPathAttributeMpReachNLRI(nexthop, []bgp.AddrPrefixInterface{nlri}).Serialize() path.Pattrs = append(path.Pattrs, mpreach) } - return path, nil + if attrs["BGP_flags"].(string) == "512" { + isWithdraw = true + } + + return path, isWithdraw, nil } func (m *OpsManager) getBGPNeighborUUIDs(id uuid.UUID) ([]net.IP, []uuid.UUID, error) { @@ -255,7 +269,7 @@ func (m *OpsManager) handleVrfUpdate(update ovsdb.TableUpdate) *server.GrpcReque } else if _, ok := v.Old.Fields["bgp_routers"]; ok { _, _, err := m.getBGPRouterUUID() if err != nil { - return server.NewGrpcRequest(server.REQ_MOD_GLOBAL_CONFIG, "", bgp.RouteFamily(0), &api.ModGlobalConfigArguments{ + return server.NewGrpcRequest(server.REQ_MOD_GLOBAL_CONFIG, "del", bgp.RouteFamily(0), &api.ModGlobalConfigArguments{ Operation: api.Operation_DEL, }) } @@ -286,7 +300,7 @@ func (m *OpsManager) handleBgpRouterUpdate(update ovsdb.TableUpdate) []*server.G log.Debugf("router-id is not configured yet") return nil } - reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_GLOBAL_CONFIG, "", bgp.RouteFamily(0), &api.ModGlobalConfigArguments{ + reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_GLOBAL_CONFIG, "add", bgp.RouteFamily(0), &api.ModGlobalConfigArguments{ Operation: api.Operation_ADD, Global: &api.Global{ As: asn, @@ -299,7 +313,7 @@ func (m *OpsManager) handleBgpRouterUpdate(update ovsdb.TableUpdate) []*server.G newNeighMap := v.New.Fields["bgp_neighbors"].(ovsdb.OvsMap).GoMap for k, _ := range oldNeighMap { if _, ok := newNeighMap[k]; !ok { - reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_NEIGHBOR, "", bgp.RouteFamily(0), &api.ModNeighborArguments{ + reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_NEIGHBOR, "del", bgp.RouteFamily(0), &api.ModNeighborArguments{ Operation: api.Operation_DEL, Peer: &api.Peer{ Conf: &api.PeerConf{ @@ -330,7 +344,7 @@ func (m *OpsManager) handleNeighborUpdate(update ovsdb.TableUpdate) []*server.Gr log.Debugf("remote-as is not configured yet") continue } - reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_NEIGHBOR, "", bgp.RouteFamily(0), &api.ModNeighborArguments{ + reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_NEIGHBOR, "add", bgp.RouteFamily(0), &api.ModNeighborArguments{ Operation: api.Operation_ADD, Peer: &api.Peer{ Conf: &api.PeerConf{ @@ -355,17 +369,27 @@ func (m *OpsManager) handleRouteUpdate(update ovsdb.TableUpdate) []*server.GrpcR } idx := vrf.(ovsdb.UUID).GoUuid if uuid.Equal(id, uuid.FromStringOrNil(idx)) { - path, err := parseRouteToGobgp(v) + path, isWithdraw, err := parseRouteToGobgp(v, m.cache["BGP_Nexthop"]) if err != nil { - log.Error("faild to parse path") + log.Error("faild to parse path: %v", err) return nil } - reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_PATH, "", bgp.RouteFamily(0), &api.ModPathArguments{ - Operation: api.Operation_ADD, - Resource: api.Resource_GLOBAL, - Name: "", - Path: path, - })) + if isWithdraw { + reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_PATH, "del", bgp.RouteFamily(0), &api.ModPathArguments{ + Operation: api.Operation_DEL, + Resource: api.Resource_GLOBAL, + Name: "", + Path: path, + })) + log.Debug("advertised route is withdraw") + } else { + reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_PATH, "add", bgp.RouteFamily(0), &api.ModPathArguments{ + Operation: api.Operation_ADD, + Resource: api.Resource_GLOBAL, + Name: "", + Path: path, + })) + } } } return reqs @@ -374,9 +398,6 @@ func (m *OpsManager) handleRouteUpdate(update ovsdb.TableUpdate) []*server.GrpcR func parseRouteToOps(pl []*cmd.Path) (map[string]interface{}, bool, error) { route := map[string]interface{}{"metric": 0, "peer": "Remote announcement"} IsWithdraw := false - // route := make(map[string]interface{}) - // route["metric"] = 0 - // route["peer"] = "Remote announcement" for _, p := range pl { var nexthop string pathAttr := map[string]string{"BGP_iBGP": "false", "BGP_flags": "16", "BGP_internal": "false", "BGP_loc_pref": "0"} @@ -405,7 +426,7 @@ func parseRouteToOps(pl []*cmd.Path) (map[string]interface{}, bool, error) { } pathAttr["BGP_origin"] = origin case bgp.BGP_ATTR_TYPE_LOCAL_PREF: - pathAttr["BGP_loc_pref"] = fmt.Sprintf("%i", a.(*bgp.PathAttributeLocalPref).Value) + pathAttr["BGP_loc_pref"] = fmt.Sprintf("%v", a.(*bgp.PathAttributeLocalPref).Value) case bgp.BGP_ATTR_TYPE_MULTI_EXIT_DISC: route["metric"] = a.(*bgp.PathAttributeMultiExitDisc).Value default: @@ -533,16 +554,18 @@ func (m *OpsManager) Transact(operations []ovsdb.Operation) error { return nil } -func (m *OpsManager) GobgpMonitor() { - <-m.grpcChs.monitorAlready +func (m *OpsManager) GobgpMonitor(ready *bool) { time.Sleep(time.Duration(time.Second * 2)) - reqCh := m.grpcChs.grpcUpdateCh + reqCh := m.grpcCh family := bgp.RF_IPv4_UC arg := &api.Arguments{ Resource: api.Resource_GLOBAL, Family: uint32(family), } for { + if !*ready { + return + } req := server.NewGrpcRequest(server.REQ_MONITOR_GLOBAL_BEST_CHANGED, "", bgp.RouteFamily(0), arg) reqCh <- req res := <-req.ResponseCh @@ -558,21 +581,58 @@ func (m *OpsManager) GobgpMonitor() { if err != nil { log.Errorf("%v", err) } - m.opsChs.opsCh <- o + m.opsCh <- o } } +func (m *OpsManager) GobgpServe() error { + monitorReady := false + for { + var grpcReq *server.GrpcRequest + var grpcRes chan *server.GrpcResponse + if len(m.grpcQueue) < 1 { + time.Sleep(time.Duration(time.Millisecond * 10)) + continue + } + grpcReq = m.grpcQueue[0] + grpcRes = grpcReq.ResponseCh + + m.grpcCh <- grpcReq + m.grpcQueue = m.grpcQueue[1:] + r := <-grpcRes + + if err := r.Err(); err != nil { + log.Errorf("operation failed. err: %s", err) + } + if err := r.Err(); err != nil { + log.Errorf("operation failed. err: %s", err) + } else { + if monitorReady { + if grpcReq.RequestType == server.REQ_MOD_GLOBAL_CONFIG && grpcReq.Name == "del" { + monitorReady = false + } + } else { + if grpcReq.RequestType == server.REQ_MOD_GLOBAL_CONFIG && grpcReq.Name == "add" { + monitorReady = true + go m.GobgpMonitor(&monitorReady) + } + } + } + } + return nil +} + func (m *OpsManager) OpsServe() error { initial, err := m.ops.MonitorAll(TARGET_TABLE, "") if err != nil { return err } go func() { - m.opsChs.opsUpdateCh <- initial + m.opsUpdateCh <- initial }() for { select { - case updates := <-m.opsChs.opsUpdateCh: + case updates := <-m.opsUpdateCh: m.populateCache(*updates) t, ok := updates.Updates["VRF"] if ok { @@ -587,7 +647,6 @@ func (m *OpsManager) OpsServe() error { if len(routerReqs) > 0 { m.grpcQueue = append(m.grpcQueue, routerReqs...) } - m.grpcChs.monitorAlready <- 1 } t, ok = updates.Updates["BGP_Neighbor"] if ok { @@ -603,7 +662,7 @@ func (m *OpsManager) OpsServe() error { m.grpcQueue = append(m.grpcQueue, routeReqs...) } } - case r := <-m.opsChs.opsCh: + case r := <-m.opsCh: if err := m.Transact(r.operations); err != nil { } } @@ -611,29 +670,6 @@ func (m *OpsManager) OpsServe() error { return nil } -func (m *OpsManager) GobgpServe() error { - go m.GobgpMonitor() - for { - var grpcReq *server.GrpcRequest - var grpcRes chan *server.GrpcResponse - if len(m.grpcQueue) < 1 { - time.Sleep(time.Duration(time.Millisecond * 10)) - continue - } - grpcReq = m.grpcQueue[0] - grpcRes = grpcReq.ResponseCh - - m.grpcChs.grpcCh <- grpcReq - m.grpcQueue = m.grpcQueue[1:] - r := <-grpcRes - - if err := r.Err(); err != nil { - log.Errorf("operation failed. err: %s", err) - } - } - return nil -} - func (m *OpsManager) Serve() error { go m.OpsServe() go m.GobgpServe() @@ -646,8 +682,6 @@ type OpsOperation struct { type GrpcChs struct { grpcCh chan *server.GrpcRequest - grpcUpdateCh chan *server.GrpcRequest - monitorAlready chan int } type OpsChs struct { @@ -657,10 +691,10 @@ type OpsChs struct { type OpsManager struct { ops *ovsdb.OvsdbClient - grpcChs *GrpcChs - opsChs *OpsChs + grpcCh chan *server.GrpcRequest + opsCh chan *OpsOperation + opsUpdateCh chan *ovsdb.TableUpdates grpcQueue []*server.GrpcRequest - // opsQueue []*OpsOperation bgpReady bool cache map[string]map[string]ovsdb.Row } @@ -670,26 +704,17 @@ func NewOpsManager(grpcCh chan *server.GrpcRequest) (*OpsManager, error) { if err != nil { return nil, err } - // oQueue := make([]*OpsOperation, 0) gQueue := make([]*server.GrpcRequest, 0) - grpcChs := &GrpcChs{ - grpcCh: grpcCh, - grpcUpdateCh: grpcCh, - monitorAlready: make(chan int), - } opsUpdateCh := make(chan *ovsdb.TableUpdates) n := NewNotifier(opsUpdateCh) ops.Register(n) - opsChs := &OpsChs{ - opsCh: make(chan *OpsOperation, 1024), - opsUpdateCh: opsUpdateCh, - } + return &OpsManager{ ops: ops, - grpcChs: grpcChs, - opsChs: opsChs, + grpcCh: grpcCh, + opsCh: make(chan *OpsOperation, 1024), + opsUpdateCh: opsUpdateCh, grpcQueue: gQueue, - // opsQueue: oQueue, bgpReady: false, cache: make(map[string]map[string]ovsdb.Row), }, nil |