summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--openswitch/openswitch.go189
1 files changed, 107 insertions, 82 deletions
diff --git a/openswitch/openswitch.go b/openswitch/openswitch.go
index b618f48c..293f3d11 100644
--- a/openswitch/openswitch.go
+++ b/openswitch/openswitch.go
@@ -135,18 +135,26 @@ func (m *OpsManager) getBGPRouterUUID() (uint32, uuid.UUID, error) {
return asn, uuid.Nil, fmt.Errorf("not found")
}
-func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) {
+func parseRouteToGobgp(route ovsdb.RowUpdate, nexthops map[string]ovsdb.Row) (*api.Path, bool, error) {
var nlri bgp.AddrPrefixInterface
path := &api.Path{
Pattrs: make([][]byte, 0),
}
-
- prefix := src.New.Fields["prefix"].(string)
- safi := src.New.Fields["sub_address_family"].(string)
- afi := src.New.Fields["address_family"].(string)
- m := src.New.Fields["metric"].(float64)
- nh := src.New.Fields["bgp_nexthops"].(ovsdb.OvsSet).GoSet
- attrs := src.New.Fields["path_attributes"].(ovsdb.OvsMap).GoMap
+ isWithdraw := false
+ prefix := route.New.Fields["prefix"].(string)
+ safi := route.New.Fields["sub_address_family"].(string)
+ afi := route.New.Fields["address_family"].(string)
+ m := route.New.Fields["metric"].(float64)
+ attrs := route.New.Fields["path_attributes"].(ovsdb.OvsMap).GoMap
+ nh := make([]interface{}, 0)
+ nhId, ok := route.New.Fields["bgp_nexthops"].(ovsdb.UUID)
+ if ok {
+ for id, n := range nexthops {
+ if id == nhId.GoUuid {
+ nh = append(nh, n.Fields["ip_address"])
+ }
+ }
+ }
nexthop := "0.0.0.0"
if afi == "ipv6" {
@@ -156,10 +164,12 @@ func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) {
log.Debug("nexthop addres does not exist")
} else if len(nh) == 1 {
if net.ParseIP(nh[0].(string)) == nil {
- return nil, fmt.Errorf("invalid nexthop address")
+ return nil, isWithdraw, fmt.Errorf("invalid nexthop address")
+ } else {
+ nexthop = nh[0].(string)
}
} else {
- return nil, fmt.Errorf("route has multiple nexthop address")
+ return nil, isWithdraw, fmt.Errorf("route has multiple nexthop address")
}
med, _ := bgp.NewPathAttributeMultiExitDisc(uint32(m)).Serialize()
@@ -167,7 +177,7 @@ func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) {
lpref, err := strconv.Atoi(attrs["BGP_loc_pref"].(string))
if err != nil {
- return nil, err
+ return nil, isWithdraw, err
}
localPref, _ := bgp.NewPathAttributeLocalPref(uint32(lpref)).Serialize()
path.Pattrs = append(path.Pattrs, localPref)
@@ -181,7 +191,7 @@ func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) {
case "?":
origin_t = bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE
default:
- return nil, fmt.Errorf("invalid origin")
+ return nil, isWithdraw, fmt.Errorf("invalid origin")
}
origin, _ := bgp.NewPathAttributeOrigin(uint8(origin_t)).Serialize()
path.Pattrs = append(path.Pattrs, origin)
@@ -190,22 +200,22 @@ func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) {
case "ipv4", "ipv6":
ip, net, err := net.ParseCIDR(prefix)
if err != nil {
- return nil, err
+ return nil, isWithdraw, err
}
ones, _ := net.Mask.Size()
if afi == "ipv4" {
if ip.To4() == nil {
- return nil, fmt.Errorf("invalid ipv4 prefix")
+ return nil, isWithdraw, fmt.Errorf("invalid ipv4 prefix")
}
nlri = bgp.NewIPAddrPrefix(uint8(ones), ip.String())
} else {
if ip.To16() == nil {
- return nil, fmt.Errorf("invalid ipv6 prefix")
+ return nil, isWithdraw, fmt.Errorf("invalid ipv6 prefix")
}
nlri = bgp.NewIPv6AddrPrefix(uint8(ones), ip.String())
}
default:
- return nil, fmt.Errorf("unsupported address family: %s", afi)
+ return nil, isWithdraw, fmt.Errorf("unsupported address family: %s", afi)
}
if afi == "ipv4" && safi == "unicast" {
@@ -216,7 +226,11 @@ func parseRouteToGobgp(src ovsdb.RowUpdate) (*api.Path, error) {
mpreach, _ := bgp.NewPathAttributeMpReachNLRI(nexthop, []bgp.AddrPrefixInterface{nlri}).Serialize()
path.Pattrs = append(path.Pattrs, mpreach)
}
- return path, nil
+ if attrs["BGP_flags"].(string) == "512" {
+ isWithdraw = true
+ }
+
+ return path, isWithdraw, nil
}
func (m *OpsManager) getBGPNeighborUUIDs(id uuid.UUID) ([]net.IP, []uuid.UUID, error) {
@@ -255,7 +269,7 @@ func (m *OpsManager) handleVrfUpdate(update ovsdb.TableUpdate) *server.GrpcReque
} else if _, ok := v.Old.Fields["bgp_routers"]; ok {
_, _, err := m.getBGPRouterUUID()
if err != nil {
- return server.NewGrpcRequest(server.REQ_MOD_GLOBAL_CONFIG, "", bgp.RouteFamily(0), &api.ModGlobalConfigArguments{
+ return server.NewGrpcRequest(server.REQ_MOD_GLOBAL_CONFIG, "del", bgp.RouteFamily(0), &api.ModGlobalConfigArguments{
Operation: api.Operation_DEL,
})
}
@@ -286,7 +300,7 @@ func (m *OpsManager) handleBgpRouterUpdate(update ovsdb.TableUpdate) []*server.G
log.Debugf("router-id is not configured yet")
return nil
}
- reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_GLOBAL_CONFIG, "", bgp.RouteFamily(0), &api.ModGlobalConfigArguments{
+ reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_GLOBAL_CONFIG, "add", bgp.RouteFamily(0), &api.ModGlobalConfigArguments{
Operation: api.Operation_ADD,
Global: &api.Global{
As: asn,
@@ -299,7 +313,7 @@ func (m *OpsManager) handleBgpRouterUpdate(update ovsdb.TableUpdate) []*server.G
newNeighMap := v.New.Fields["bgp_neighbors"].(ovsdb.OvsMap).GoMap
for k, _ := range oldNeighMap {
if _, ok := newNeighMap[k]; !ok {
- reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_NEIGHBOR, "", bgp.RouteFamily(0), &api.ModNeighborArguments{
+ reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_NEIGHBOR, "del", bgp.RouteFamily(0), &api.ModNeighborArguments{
Operation: api.Operation_DEL,
Peer: &api.Peer{
Conf: &api.PeerConf{
@@ -330,7 +344,7 @@ func (m *OpsManager) handleNeighborUpdate(update ovsdb.TableUpdate) []*server.Gr
log.Debugf("remote-as is not configured yet")
continue
}
- reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_NEIGHBOR, "", bgp.RouteFamily(0), &api.ModNeighborArguments{
+ reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_NEIGHBOR, "add", bgp.RouteFamily(0), &api.ModNeighborArguments{
Operation: api.Operation_ADD,
Peer: &api.Peer{
Conf: &api.PeerConf{
@@ -355,17 +369,27 @@ func (m *OpsManager) handleRouteUpdate(update ovsdb.TableUpdate) []*server.GrpcR
}
idx := vrf.(ovsdb.UUID).GoUuid
if uuid.Equal(id, uuid.FromStringOrNil(idx)) {
- path, err := parseRouteToGobgp(v)
+ path, isWithdraw, err := parseRouteToGobgp(v, m.cache["BGP_Nexthop"])
if err != nil {
- log.Error("faild to parse path")
+ log.Error("faild to parse path: %v", err)
return nil
}
- reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_PATH, "", bgp.RouteFamily(0), &api.ModPathArguments{
- Operation: api.Operation_ADD,
- Resource: api.Resource_GLOBAL,
- Name: "",
- Path: path,
- }))
+ if isWithdraw {
+ reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_PATH, "del", bgp.RouteFamily(0), &api.ModPathArguments{
+ Operation: api.Operation_DEL,
+ Resource: api.Resource_GLOBAL,
+ Name: "",
+ Path: path,
+ }))
+ log.Debug("advertised route is withdraw")
+ } else {
+ reqs = append(reqs, server.NewGrpcRequest(server.REQ_MOD_PATH, "add", bgp.RouteFamily(0), &api.ModPathArguments{
+ Operation: api.Operation_ADD,
+ Resource: api.Resource_GLOBAL,
+ Name: "",
+ Path: path,
+ }))
+ }
}
}
return reqs
@@ -374,9 +398,6 @@ func (m *OpsManager) handleRouteUpdate(update ovsdb.TableUpdate) []*server.GrpcR
func parseRouteToOps(pl []*cmd.Path) (map[string]interface{}, bool, error) {
route := map[string]interface{}{"metric": 0, "peer": "Remote announcement"}
IsWithdraw := false
- // route := make(map[string]interface{})
- // route["metric"] = 0
- // route["peer"] = "Remote announcement"
for _, p := range pl {
var nexthop string
pathAttr := map[string]string{"BGP_iBGP": "false", "BGP_flags": "16", "BGP_internal": "false", "BGP_loc_pref": "0"}
@@ -405,7 +426,7 @@ func parseRouteToOps(pl []*cmd.Path) (map[string]interface{}, bool, error) {
}
pathAttr["BGP_origin"] = origin
case bgp.BGP_ATTR_TYPE_LOCAL_PREF:
- pathAttr["BGP_loc_pref"] = fmt.Sprintf("%i", a.(*bgp.PathAttributeLocalPref).Value)
+ pathAttr["BGP_loc_pref"] = fmt.Sprintf("%v", a.(*bgp.PathAttributeLocalPref).Value)
case bgp.BGP_ATTR_TYPE_MULTI_EXIT_DISC:
route["metric"] = a.(*bgp.PathAttributeMultiExitDisc).Value
default:
@@ -533,16 +554,18 @@ func (m *OpsManager) Transact(operations []ovsdb.Operation) error {
return nil
}
-func (m *OpsManager) GobgpMonitor() {
- <-m.grpcChs.monitorAlready
+func (m *OpsManager) GobgpMonitor(ready *bool) {
time.Sleep(time.Duration(time.Second * 2))
- reqCh := m.grpcChs.grpcUpdateCh
+ reqCh := m.grpcCh
family := bgp.RF_IPv4_UC
arg := &api.Arguments{
Resource: api.Resource_GLOBAL,
Family: uint32(family),
}
for {
+ if !*ready {
+ return
+ }
req := server.NewGrpcRequest(server.REQ_MONITOR_GLOBAL_BEST_CHANGED, "", bgp.RouteFamily(0), arg)
reqCh <- req
res := <-req.ResponseCh
@@ -558,21 +581,58 @@ func (m *OpsManager) GobgpMonitor() {
if err != nil {
log.Errorf("%v", err)
}
- m.opsChs.opsCh <- o
+ m.opsCh <- o
}
}
+func (m *OpsManager) GobgpServe() error {
+ monitorReady := false
+ for {
+ var grpcReq *server.GrpcRequest
+ var grpcRes chan *server.GrpcResponse
+ if len(m.grpcQueue) < 1 {
+ time.Sleep(time.Duration(time.Millisecond * 10))
+ continue
+ }
+ grpcReq = m.grpcQueue[0]
+ grpcRes = grpcReq.ResponseCh
+
+ m.grpcCh <- grpcReq
+ m.grpcQueue = m.grpcQueue[1:]
+ r := <-grpcRes
+
+ if err := r.Err(); err != nil {
+ log.Errorf("operation failed. err: %s", err)
+ }
+ if err := r.Err(); err != nil {
+ log.Errorf("operation failed. err: %s", err)
+ } else {
+ if monitorReady {
+ if grpcReq.RequestType == server.REQ_MOD_GLOBAL_CONFIG && grpcReq.Name == "del" {
+ monitorReady = false
+ }
+ } else {
+ if grpcReq.RequestType == server.REQ_MOD_GLOBAL_CONFIG && grpcReq.Name == "add" {
+ monitorReady = true
+ go m.GobgpMonitor(&monitorReady)
+ }
+ }
+ }
+ }
+ return nil
+}
+
func (m *OpsManager) OpsServe() error {
initial, err := m.ops.MonitorAll(TARGET_TABLE, "")
if err != nil {
return err
}
go func() {
- m.opsChs.opsUpdateCh <- initial
+ m.opsUpdateCh <- initial
}()
for {
select {
- case updates := <-m.opsChs.opsUpdateCh:
+ case updates := <-m.opsUpdateCh:
m.populateCache(*updates)
t, ok := updates.Updates["VRF"]
if ok {
@@ -587,7 +647,6 @@ func (m *OpsManager) OpsServe() error {
if len(routerReqs) > 0 {
m.grpcQueue = append(m.grpcQueue, routerReqs...)
}
- m.grpcChs.monitorAlready <- 1
}
t, ok = updates.Updates["BGP_Neighbor"]
if ok {
@@ -603,7 +662,7 @@ func (m *OpsManager) OpsServe() error {
m.grpcQueue = append(m.grpcQueue, routeReqs...)
}
}
- case r := <-m.opsChs.opsCh:
+ case r := <-m.opsCh:
if err := m.Transact(r.operations); err != nil {
}
}
@@ -611,29 +670,6 @@ func (m *OpsManager) OpsServe() error {
return nil
}
-func (m *OpsManager) GobgpServe() error {
- go m.GobgpMonitor()
- for {
- var grpcReq *server.GrpcRequest
- var grpcRes chan *server.GrpcResponse
- if len(m.grpcQueue) < 1 {
- time.Sleep(time.Duration(time.Millisecond * 10))
- continue
- }
- grpcReq = m.grpcQueue[0]
- grpcRes = grpcReq.ResponseCh
-
- m.grpcChs.grpcCh <- grpcReq
- m.grpcQueue = m.grpcQueue[1:]
- r := <-grpcRes
-
- if err := r.Err(); err != nil {
- log.Errorf("operation failed. err: %s", err)
- }
- }
- return nil
-}
-
func (m *OpsManager) Serve() error {
go m.OpsServe()
go m.GobgpServe()
@@ -646,8 +682,6 @@ type OpsOperation struct {
type GrpcChs struct {
grpcCh chan *server.GrpcRequest
- grpcUpdateCh chan *server.GrpcRequest
- monitorAlready chan int
}
type OpsChs struct {
@@ -657,10 +691,10 @@ type OpsChs struct {
type OpsManager struct {
ops *ovsdb.OvsdbClient
- grpcChs *GrpcChs
- opsChs *OpsChs
+ grpcCh chan *server.GrpcRequest
+ opsCh chan *OpsOperation
+ opsUpdateCh chan *ovsdb.TableUpdates
grpcQueue []*server.GrpcRequest
- // opsQueue []*OpsOperation
bgpReady bool
cache map[string]map[string]ovsdb.Row
}
@@ -670,26 +704,17 @@ func NewOpsManager(grpcCh chan *server.GrpcRequest) (*OpsManager, error) {
if err != nil {
return nil, err
}
- // oQueue := make([]*OpsOperation, 0)
gQueue := make([]*server.GrpcRequest, 0)
- grpcChs := &GrpcChs{
- grpcCh: grpcCh,
- grpcUpdateCh: grpcCh,
- monitorAlready: make(chan int),
- }
opsUpdateCh := make(chan *ovsdb.TableUpdates)
n := NewNotifier(opsUpdateCh)
ops.Register(n)
- opsChs := &OpsChs{
- opsCh: make(chan *OpsOperation, 1024),
- opsUpdateCh: opsUpdateCh,
- }
+
return &OpsManager{
ops: ops,
- grpcChs: grpcChs,
- opsChs: opsChs,
+ grpcCh: grpcCh,
+ opsCh: make(chan *OpsOperation, 1024),
+ opsUpdateCh: opsUpdateCh,
grpcQueue: gQueue,
- // opsQueue: oQueue,
bgpReady: false,
cache: make(map[string]map[string]ovsdb.Row),
}, nil