diff options
author | Naoto Hanaue <hanaue.naoto@po.ntts.co.jp> | 2016-02-03 21:05:19 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-02-19 21:14:59 -0800 |
commit | 9f3840ccb8232c60abf3d6eab48d9b294eb7d8a0 (patch) | |
tree | c0b264d9a434fd57b5ec175e0ee26f100b227aee /openswitch/openswitch.go | |
parent | bbd5a2cfae44425f026b8a4567367450cb96d8f4 (diff) |
ops: add feature that insert gobgp route to ops
Diffstat (limited to 'openswitch/openswitch.go')
-rw-r--r-- | openswitch/openswitch.go | 331 |
1 files changed, 268 insertions, 63 deletions
diff --git a/openswitch/openswitch.go b/openswitch/openswitch.go index 4ed77f47..99da17f2 100644 --- a/openswitch/openswitch.go +++ b/openswitch/openswitch.go @@ -19,12 +19,23 @@ import ( "fmt" log "github.com/Sirupsen/logrus" api "github.com/osrg/gobgp/api" + "github.com/osrg/gobgp/gobgp/cmd" "github.com/osrg/gobgp/packet" "github.com/osrg/gobgp/server" ovsdb "github.com/osrg/libovsdb" "github.com/satori/go.uuid" "net" "reflect" + "time" +) + +const ( + TARGET_TABLE = "OpenSwitch" +) + +const ( + NEXTHOP_UUID = "nexthop" + ROUTE_UUID = "route" ) type Notifier struct { @@ -49,13 +60,6 @@ func NewNotifier(ch chan *ovsdb.TableUpdates) *Notifier { } } -type OpsManager struct { - client *ovsdb.OvsdbClient - grpcCh chan *server.GrpcRequest - updateCh chan *ovsdb.TableUpdates - cache map[string]map[string]ovsdb.Row -} - func (m *OpsManager) populateCache(updates ovsdb.TableUpdates) { for table, tableUpdate := range updates.Updates { if _, ok := m.cache[table]; !ok { @@ -84,11 +88,29 @@ func extractUUID(v interface{}) uuid.UUID { return uuid.FromStringOrNil(vv[1].(string)) } +func (m *OpsManager) getRootUUID() (uuid.UUID, error) { + for k, _ := range m.cache[TARGET_TABLE] { + return uuid.FromStringOrNil(k), nil + } + return uuid.Nil, fmt.Errorf("OpenSwitch table not found") +} + +func (m *OpsManager) getVrfUUID() (uuid.UUID, error) { + vrfs, ok := m.cache["VRF"] + if !ok { + return uuid.Nil, fmt.Errorf("VRF table not found") + } + for k, _ := range vrfs { + return uuid.FromStringOrNil(k), nil + } + return uuid.Nil, fmt.Errorf("vrf table not found") +} + func (m *OpsManager) getBGPRouterUUID() (uint32, uuid.UUID, error) { var asn uint32 vrfs, ok := m.cache["VRF"] if !ok { - return asn, uuid.Nil, fmt.Errorf("no vrf table") + return asn, uuid.Nil, fmt.Errorf("VRF table not found") } for _, v := range vrfs { if v.Fields["name"] == "vrf_default" { @@ -245,112 +267,295 @@ func (m *OpsManager) Serve() error { } func (m *OpsManager) OpsServe() error { - initial, err := m.client.MonitorAll("OpenSwitch", "") + initial, err := m.ops.MonitorAll(TARGET_TABLE, "") if err != nil { return err } go func() { - m.updateCh <- initial + m.opsChs.opsUpdateCh <- initial }() - reqs := make([]*server.GrpcRequest, 0) - ress := make([]*server.GrpcRequest, 0) for { - var req, res *server.GrpcRequest - var reqCh chan *server.GrpcRequest - var resCh chan *server.GrpcResponse - if len(reqs) > 0 { - req = reqs[0] - reqCh = m.grpcCh - } - if len(ress) > 0 { - res = ress[0] - resCh = res.ResponseCh - } select { - case updates := <-m.updateCh: + case updates := <-m.opsChs.opsUpdateCh: m.populateCache(*updates) t, ok := updates.Updates["VRF"] if ok { req := m.handleVrfUpdate(t) if req != nil { - reqs = append(reqs, req) + m.grpcQueue = append(m.grpcQueue, req) } } t, ok = updates.Updates["BGP_Router"] if ok { routerReqs := m.handleBgpRouterUpdate(t) if len(routerReqs) > 0 { - reqs = append(reqs, routerReqs...) + m.grpcQueue = append(m.grpcQueue, routerReqs...) } + m.grpcChs.monitorAlready <- 1 } t, ok = updates.Updates["BGP_Neighbor"] if ok { neighborReqs := m.handleNeighborUpdate(t) if len(neighborReqs) > 0 { - reqs = append(reqs, neighborReqs...) + m.grpcQueue = append(m.grpcQueue, neighborReqs...) } } - case reqCh <- req: - ress = append(ress, req) - reqs = reqs[1:] - case r := <-resCh: - if err := r.Err(); err != nil { - log.Errorf("operation failed. reqtype: %d, err: %s", res.RequestType, err) + case r := <-m.opsChs.opsCh: + if err := m.Transact(r.operations); err != nil { + log.Errorf("%v", err) } - ress = ress[1:] } } return nil } func (m *OpsManager) GobgpServe() error { + go m.GobgpMonitor() + for { + var grpcReq *server.GrpcRequest + var grpcRes chan *server.GrpcResponse + if len(m.grpcQueue) < 1{ + time.Sleep(time.Duration(time.Second *1)) + continue + } + grpcReq = m.grpcQueue[0] + grpcRes = grpcReq.ResponseCh + + m.grpcChs.grpcCh <- grpcReq + m.grpcQueue = m.grpcQueue[1:] + r := <-grpcRes + + + if err := r.Err(); err != nil { + log.Errorf("operation failed. err: %s", err) + } + } + return nil +} + +func (m *OpsManager) GobgpMonitor() { + <-m.grpcChs.monitorAlready + time.Sleep(time.Duration(time.Second *2)) + reqCh := m.grpcChs.grpcUpdateCh family := bgp.RF_IPv4_UC arg := &api.Arguments{ Resource: api.Resource_GLOBAL, Family: uint32(family), } - - stream, err := client.MonitorBestChanged(context.Background(), arg) - if err != nil { - fmt.Println(err) - os.Exit(1) - } for { - d, err := stream.Recv() - if err == io.EOF { - break - } else if err != nil { - fmt.Println(err) - os.Exit(1) + req := server.NewGrpcRequest(server.REQ_MONITOR_GLOBAL_BEST_CHANGED, "", bgp.RouteFamily(0), arg) + reqCh <- req + res := <-req.ResponseCh + if err := res.Err(); err != nil { + log.Errorf("operation failed. reqtype: %d, err: %s", req.RequestType, err) + } + d := res.Data.(*api.Destination) + p, err := cmd.ApiStruct2Path(d.Paths[0]) + if err != nil { + log.Error("faild parse") } - p, err := ApiStruct2Path(d.Paths[0]) + o, err := m.TransactPreparation(p) if err != nil { - fmt.Println(err) - os.Exit(1) + log.Errorf("%v", err) } + m.opsChs.opsCh <- o + } +} - if globalOpts.Json { - j, _ := json.Marshal(p) - fmt.Println(string(j)) - } else { - ShowRoute(p, false, false, false, true, false) +func (m *OpsManager) Transact(operations []ovsdb.Operation) error { + ops := m.ops + reply, err := ops.Transact(TARGET_TABLE, operations...) + if err != nil { + return err + } + if len(reply) < len(operations) { + return fmt.Errorf("number of replies should be atleast equal to number of Operations") + } + ok := true + for i, o := range reply { + if o.Error != "" && i < len(operations) { + log.Errorf("transaction failed due to an error :", o.Error, " details:", o.Details, " in ", operations[i]) + ok = false + } else if o.Error != "" { + log.Errorf("transaction failed due to an error :", o.Error) + ok = false } } + if ok { + log.Debugf("bgp route addition successful") + } else { + return fmt.Errorf("bgp route addition failed") + } return nil } -func NewOpsManager(ch chan *server.GrpcRequest) (*OpsManager, error) { - cli, err := ovsdb.ConnectUnix("") +func (m *OpsManager) TransactPreparation(p []*cmd.Path) (*OpsOperation, error) { + v, err := m.getVrfUUID() + if err != nil { + return nil, err + } + opsRoute, err := createOpsRoute(p) if err != nil { return nil, err } - updateCh := make(chan *ovsdb.TableUpdates) - n := NewNotifier(updateCh) - cli.Register(n) + insNextHopOp := insertNextHop(opsRoute) + insRouteOp, err := insertRoute(v, opsRoute) + if err != nil { + return nil, err + } + o := []ovsdb.Operation{insNextHopOp, insRouteOp} + oOperation := &OpsOperation{ + operations: o, + } + return oOperation, nil +} + +func insertNextHop(opsRoute map[string]interface{}) ovsdb.Operation { + nexthop := make(map[string]interface{}) + nexthop["ip_address"] = opsRoute["bgp_nexthops"] + nexthop["type"] = opsRoute["sub_address_family"] + insNextHopOp := ovsdb.Operation{ + Op: "insert", + Table: "BGP_Nexthop", + Row: nexthop, + UUIDName: NEXTHOP_UUID, + } + return insNextHopOp +} + +func insertRoute(vrfId uuid.UUID, opsRoute map[string]interface{}) (ovsdb.Operation, error) { + v := []ovsdb.UUID{ovsdb.UUID{vrfId.String()}} + vrfSet, _ := ovsdb.NewOvsSet(v) + opsRoute["vrf"] = vrfSet + + nexthop := []ovsdb.UUID{ovsdb.UUID{NEXTHOP_UUID}} + nexthopSet, _ := ovsdb.NewOvsSet(nexthop) + opsRoute["bgp_nexthops"] = nexthopSet + + attrMap, err := ovsdb.NewOvsMap(opsRoute["path_attributes"]) + if err != nil { + return ovsdb.Operation{}, err + } + + opsRoute["path_attributes"] = attrMap + + insRouteOp := ovsdb.Operation{ + Op: "insert", + Table: "BGP_Route", + Row: opsRoute, + UUIDName: ROUTE_UUID, + } + return insRouteOp, nil +} + +func createOpsRoute(pl []*cmd.Path) (map[string]interface{}, error) { + + route := make(map[string]interface{}) + route["metric"] = 0 + route["peer"] = "Remote announcement" + for _, p := range pl { + var nexthop string + pathAttr := map[string]string{"BGP_iBGP": "false", "BGP_flags": "16", "BGP_internal": "false", "BGP_loc_pref": "0"} + for _, a := range p.PathAttrs { + switch a.GetType() { + case bgp.BGP_ATTR_TYPE_NEXT_HOP: + nexthop = a.(*bgp.PathAttributeNextHop).Value.String() + case bgp.BGP_ATTR_TYPE_MP_REACH_NLRI: + n := a.(*bgp.PathAttributeMpReachNLRI).Nexthop + if n != nil { + nexthop = n.String() + } else { + nexthop = "" + } + case bgp.BGP_ATTR_TYPE_AS_PATH: + pathAttr["BGP_AS_path"] = a.(*bgp.PathAttributeAsPath).String() + case bgp.BGP_ATTR_TYPE_ORIGIN: + origin := "-" + switch a.(*bgp.PathAttributeOrigin).Value[0] { + case bgp.BGP_ORIGIN_ATTR_TYPE_IGP: + origin = "i" + case bgp.BGP_ORIGIN_ATTR_TYPE_EGP: + origin = "e" + case bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE: + origin = "?" + } + pathAttr["BGP_origin"] = origin + case bgp.BGP_ATTR_TYPE_LOCAL_PREF: + pathAttr["BGP_loc_pref"] = fmt.Sprintf("%i", a.(*bgp.PathAttributeLocalPref).Value) + case bgp.BGP_ATTR_TYPE_MULTI_EXIT_DISC: + route["metric"] = a.(*bgp.PathAttributeMultiExitDisc).Value + default: + continue + } + } + afi := "ipv4" + if p.Nlri.AFI() != bgp.AFI_IP { + afi = "ipv6" + } + safi := "unicast" + + route["prefix"] = p.Nlri.String() + route["address_family"] = afi + route["sub_address_family"] = safi + route["bgp_nexthops"] = nexthop + route["path_attributes"] = pathAttr + break + } + + return route, nil +} + +type OpsOperation struct { + operations []ovsdb.Operation +} + +type GrpcChs struct { + grpcCh chan *server.GrpcRequest + grpcUpdateCh chan *server.GrpcRequest + monitorAlready chan int +} + +type OpsChs struct { + opsCh chan *OpsOperation + opsUpdateCh chan *ovsdb.TableUpdates +} + +type OpsManager struct { + ops *ovsdb.OvsdbClient + grpcChs *GrpcChs + opsChs *OpsChs + grpcQueue []*server.GrpcRequest +// opsQueue []*OpsOperation + bgpReady bool + cache map[string]map[string]ovsdb.Row +} + +func NewOpsManager(grpcCh chan *server.GrpcRequest) (*OpsManager, error) { + ops, err := ovsdb.ConnectUnix("") + if err != nil { + return nil, err + } +// oQueue := make([]*OpsOperation, 0) + gQueue := make([]*server.GrpcRequest, 0) + grpcChs := &GrpcChs{ + grpcCh: grpcCh, + grpcUpdateCh: grpcCh, + monitorAlready: make(chan int), + } + opsUpdateCh := make(chan *ovsdb.TableUpdates) + n := NewNotifier(opsUpdateCh) + ops.Register(n) + opsChs := &OpsChs{ + opsCh: make(chan *OpsOperation, 1024), + opsUpdateCh: opsUpdateCh, + } return &OpsManager{ - client: cli, - grpcCh: ch, - updateCh: updateCh, - cache: make(map[string]map[string]ovsdb.Row), + ops: ops, + grpcChs: grpcChs, + opsChs: opsChs, + grpcQueue: gQueue, +// opsQueue: oQueue, + bgpReady: false, + cache: make(map[string]map[string]ovsdb.Row), }, nil } |