diff options
-rw-r--r-- | gobgpd/main.go | 4 | ||||
-rw-r--r-- | server/server.go | 294 |
2 files changed, 154 insertions, 144 deletions
diff --git a/gobgpd/main.go b/gobgpd/main.go index f60f250b..1f64c76b 100644 --- a/gobgpd/main.go +++ b/gobgpd/main.go @@ -201,7 +201,9 @@ func main() { var added, deleted, updated []config.Neighbor if bgpConfig == nil { - bgpServer.SetGlobalType(newConfig.Bgp.Global) + if err := bgpServer.SetGlobalType(newConfig.Bgp.Global); err != nil { + log.Fatalf("failed to set global config: %s", err) + } bgpConfig = &newConfig.Bgp bgpServer.SetRpkiConfig(newConfig.Bgp.RpkiServers) added = newConfig.Bgp.Neighbors diff --git a/server/server.go b/server/server.go index a03fd92e..c300e823 100644 --- a/server/server.go +++ b/server/server.go @@ -89,13 +89,14 @@ func (ws Watchers) watching(typ watcherEventType) bool { type BgpServer struct { bgpConfig config.Bgp - globalTypeCh chan config.Global addedPeerCh chan config.Neighbor deletedPeerCh chan config.Neighbor updatedPeerCh chan config.Neighbor fsmincomingCh *channels.InfiniteChannel fsmStateCh chan *FsmMsg rpkiConfigCh chan []config.RpkiServer + acceptCh chan *net.TCPConn + zapiMsgCh chan *zebra.Message GrpcReqCh chan *GrpcRequest policyUpdateCh chan config.RoutingPolicy @@ -113,7 +114,6 @@ type BgpServer struct { func NewBgpServer() *BgpServer { b := BgpServer{} - b.globalTypeCh = make(chan config.Global, 1) b.addedPeerCh = make(chan config.Neighbor) b.deletedPeerCh = make(chan config.Neighbor) b.updatedPeerCh = make(chan config.Neighbor) @@ -184,64 +184,11 @@ func (server *BgpServer) Listeners(addr string) []*net.TCPListener { } func (server *BgpServer) Serve() { - var g config.Global - for { - select { - case g = <-server.globalTypeCh: - server.bgpConfig.Global = g - server.globalTypeCh = nil - default: - } - - if server.globalTypeCh == nil { - break - } - - select { - case grpcReq := <-server.GrpcReqCh: - server.handleGrpc(grpcReq) - case g = <-server.globalTypeCh: - server.bgpConfig.Global = g - server.globalTypeCh = nil - } - } - - server.roaManager, _ = newROAManager(g.Config.As, nil) - - if g.Mrt.FileName != "" { - w, err := newMrtWatcher(g.Mrt.FileName) - if err != nil { - log.Warn(err) - } else { - server.watchers[WATCHER_MRT] = w - } - } - - if len(g.BmpServers) > 0 { - w, err := newBmpWatcher(server.GrpcReqCh) - if err != nil { - log.Warn(err) - } else { - for _, server := range g.BmpServers { - if err := w.addServer(server.Config); err != nil { - log.Warn(err) - } - } - server.watchers[WATCHER_BMP] = w - } - } + server.roaManager, _ = newROAManager(0, nil) w, _ := newGrpcIncomingWatcher() server.watchers[WATCHER_GRPC_INCOMING] = w - if g.Zebra.Enabled { - cli, err := NewZclient(g.Zebra.Url, g.Zebra.RedistributeRouteTypeList) - if err != nil { - log.Error(err) - } - server.zclient = cli - } - senderCh := make(chan *SenderMsg, 1<<16) go func(ch chan *SenderMsg) { for { @@ -275,34 +222,11 @@ func (server *BgpServer) Serve() { } }(broadcastCh) - rfs, _ := config.AfiSafis(g.AfiSafis).ToRfList() - server.globalRib = table.NewTableManager(rfs, g.MplsLabelRange.MinLabel, g.MplsLabelRange.MaxLabel) server.listeners = make([]*net.TCPListener, 0, 2) - acceptCh := make(chan *net.TCPConn, 4096) - if g.ListenConfig.Port > 0 { - list := []string{"0.0.0.0", "::"} - if len(g.ListenConfig.LocalAddressList) > 0 { - list = g.ListenConfig.LocalAddressList - } - for _, addr := range list { - l, err := listenAndAccept(addr, uint32(g.ListenConfig.Port), acceptCh) - if err != nil { - log.Fatal(err) - os.Exit(1) - } - server.listeners = append(server.listeners, l) - } - } - server.fsmincomingCh = channels.NewInfiniteChannel() server.fsmStateCh = make(chan *FsmMsg, 4096) var senderMsgs []*SenderMsg - var zapiMsgCh chan *zebra.Message - if server.zclient != nil { - zapiMsgCh = server.zclient.Receive() - } - handleFsmMsg := func(e *FsmMsg) { peer, found := server.neighborMap[e.MsgSrc] if !found { @@ -373,7 +297,7 @@ func (server *BgpServer) Serve() { if len(m) > 0 { senderMsgs = append(senderMsgs, m...) } - case conn := <-acceptCh: + case conn := <-server.acceptCh: passConn(conn) default: } @@ -393,12 +317,12 @@ func (server *BgpServer) Serve() { server.roaManager, _ = newROAManager(server.bgpConfig.Global.Config.As, c) case rmsg := <-server.roaManager.recieveROA(): server.roaManager.handleROAEvent(rmsg) - case zmsg := <-zapiMsgCh: + case zmsg := <-server.zapiMsgCh: m := handleZapiMsg(zmsg, server) if len(m) > 0 { senderMsgs = append(senderMsgs, m...) } - case conn := <-acceptCh: + case conn := <-server.acceptCh: passConn(conn) case config := <-server.addedPeerCh: addr := config.Config.NeighborAddress @@ -407,12 +331,12 @@ func (server *BgpServer) Serve() { log.Warn("Can't overwrite the exising peer ", addr) continue } - if g.ListenConfig.Port > 0 { + if server.bgpConfig.Global.ListenConfig.Port > 0 { for _, l := range server.Listeners(addr) { SetTcpMD5SigSockopts(l, addr, config.Config.AuthPassword) } } - peer := NewPeer(g, config, server.globalRib, server.policy) + peer := NewPeer(server.bgpConfig.Global, config, server.globalRib, server.policy) server.setPolicyByConfig(peer.ID(), config.ApplyPolicy) if peer.isRouteServerClient() { pathList := make([]*table.Path, 0) @@ -1177,10 +1101,53 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg { return msgs } -func (server *BgpServer) SetGlobalType(g config.Global) { - if server.globalTypeCh != nil { - server.globalTypeCh <- g +func (server *BgpServer) SetGlobalType(g config.Global) error { + { + ch := make(chan *GrpcResponse) + server.GrpcReqCh <- &GrpcRequest{ + RequestType: REQ_MOD_GLOBAL_CONFIG, + Data: &g, + ResponseCh: ch, + } + if err := (<-ch).Err(); err != nil { + return err + } + } + if g.Mrt.FileName != "" { + ch := make(chan *GrpcResponse) + server.GrpcReqCh <- &GrpcRequest{ + RequestType: REQ_MOD_MRT, + Data: &api.ModMrtArguments{ + Operation: api.Operation_ADD, + Filename: g.Mrt.FileName, + }, + ResponseCh: ch, + } + if err := (<-ch).Err(); err != nil { + return err + } + } + for _, s := range g.BmpServers { + ch := make(chan *GrpcResponse) + server.GrpcReqCh <- &GrpcRequest{ + RequestType: REQ_MOD_BMP, + Data: &s.Config, + ResponseCh: ch, + } + if err := (<-ch).Err(); err != nil { + return err + } + } + + if g.Zebra.Enabled { + cli, err := NewZclient(g.Zebra.Url, g.Zebra.RedistributeRouteTypeList) + if err != nil { + return err + } + server.zclient = cli + server.zapiMsgCh = server.zclient.Receive() } + return nil } func (server *BgpServer) SetRpkiConfig(c []config.RpkiServer) { @@ -1663,56 +1630,87 @@ END: } func (server *BgpServer) handleModConfig(grpcReq *GrpcRequest) error { - arg := grpcReq.Data.(*api.ModGlobalConfigArguments) - if arg.Operation != api.Operation_ADD { - return fmt.Errorf("invalid operation %s", arg.Operation) - } - if server.globalTypeCh == nil { - return fmt.Errorf("gobgp is already started") - } - g := arg.Global - id := net.ParseIP(g.RouterId) - if id == nil { - return fmt.Errorf("invalid router-id format: %s", g.RouterId) - } - families := make([]config.AfiSafi, 0, len(g.Families)) - for _, f := range g.Families { - families = append(families, config.AfiSafi{ - Config: config.AfiSafiConfig{ - AfiSafiName: config.AfiSafiType(bgp.RouteFamily(f).String()), - Enabled: true, + var c *config.Global + switch arg := grpcReq.Data.(type) { + case *api.ModGlobalConfigArguments: + if arg.Operation != api.Operation_ADD { + return fmt.Errorf("invalid operation %s", arg.Operation) + } + g := arg.Global + id := net.ParseIP(g.RouterId) + if id == nil { + return fmt.Errorf("invalid router-id format: %s", g.RouterId) + } + families := make([]config.AfiSafi, 0, len(g.Families)) + for _, f := range g.Families { + name := config.AfiSafiType(bgp.RouteFamily(f).String()) + families = append(families, config.AfiSafi{ + AfiSafiName: name, + Config: config.AfiSafiConfig{ + AfiSafiName: name, + Enabled: true, + }, + State: config.AfiSafiState{ + AfiSafiName: name, + }, + }) + } + b := &config.Bgp{ + Global: config.Global{ + Config: config.GlobalConfig{ + As: g.As, + RouterId: g.RouterId, + }, + ListenConfig: config.ListenConfig{ + Port: g.ListenPort, + LocalAddressList: g.ListenAddresses, + }, + MplsLabelRange: config.MplsLabelRange{ + MinLabel: g.MplsLabelMin, + MaxLabel: g.MplsLabelMax, + }, + AfiSafis: families, + Collector: config.Collector{ + Enabled: g.Collector, + }, }, - }) + } + if err := config.SetDefaultConfigValues(nil, b); err != nil { + return err + } + c = &b.Global + case *config.Global: + c = arg } - c := config.Bgp{ - Global: config.Global{ - Config: config.GlobalConfig{ - As: g.As, - RouterId: g.RouterId, - }, - ListenConfig: config.ListenConfig{ - Port: g.ListenPort, - LocalAddressList: g.ListenAddresses, - }, - MplsLabelRange: config.MplsLabelRange{ - MinLabel: g.MplsLabelMin, - MaxLabel: g.MplsLabelMax, - }, - AfiSafis: families, - Collector: config.Collector{ - Enabled: g.Collector, - }, - }, + + if server.bgpConfig.Global.Config.As != 0 { + return fmt.Errorf("gobgp is already started") } - err := config.SetDefaultConfigValues(nil, &c) - if err != nil { - return err + + if c.ListenConfig.Port > 0 { + acceptCh := make(chan *net.TCPConn, 4096) + list := []string{"0.0.0.0", "::"} + if len(c.ListenConfig.LocalAddressList) > 0 { + list = c.ListenConfig.LocalAddressList + } + for _, addr := range list { + l, err := listenAndAccept(addr, uint32(c.ListenConfig.Port), acceptCh) + if err != nil { + return err + } + server.listeners = append(server.listeners, l) + } + server.acceptCh = acceptCh } + + rfs, _ := config.AfiSafis(c.AfiSafis).ToRfList() + server.globalRib = table.NewTableManager(rfs, c.MplsLabelRange.MinLabel, c.MplsLabelRange.MaxLabel) + p := config.RoutingPolicy{} if err := server.SetRoutingPolicy(p); err != nil { - log.Fatal(err) + return err } - server.globalTypeCh <- c.Global + server.bgpConfig.Global = *c return nil } @@ -1766,7 +1764,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { return results } - if server.globalTypeCh != nil && grpcReq.RequestType != REQ_MOD_GLOBAL_CONFIG { + if server.bgpConfig.Global.Config.As == 0 && grpcReq.RequestType != REQ_MOD_GLOBAL_CONFIG { grpcReq.ResponseCh <- &GrpcResponse{ ResponseErr: fmt.Errorf("bgpd main loop is not started yet"), } @@ -2824,31 +2822,41 @@ func (server *BgpServer) handleModMrt(grpcReq *GrpcRequest) { } func (server *BgpServer) handleModBmp(grpcReq *GrpcRequest) { - arg := grpcReq.Data.(*api.ModBmpArguments) + var op api.Operation + var c *config.BmpServerConfig + switch arg := grpcReq.Data.(type) { + case *api.ModBmpArguments: + c = &config.BmpServerConfig{ + Address: arg.Address, + Port: arg.Port, + RouteMonitoringPolicy: config.BmpRouteMonitoringPolicyType(arg.Type), + } + op = arg.Operation + case *config.BmpServerConfig: + c = arg + op = api.Operation_ADD + } + w, y := server.watchers[WATCHER_BMP] if !y { - if arg.Operation == api.Operation_ADD { + if op == api.Operation_ADD { w, _ = newBmpWatcher(server.GrpcReqCh) server.watchers[WATCHER_BMP] = w - } else if arg.Operation == api.Operation_DEL { + } else if op == api.Operation_DEL { grpcDone(grpcReq, fmt.Errorf("not enabled yet")) return } } - c := config.BmpServerConfig{ - Address: arg.Address, - Port: arg.Port, - RouteMonitoringPolicy: config.BmpRouteMonitoringPolicyType(arg.Type), - } - switch arg.Operation { + + switch op { case api.Operation_ADD: - err := w.(*bmpWatcher).addServer(c) + err := w.(*bmpWatcher).addServer(*c) grpcDone(grpcReq, err) case api.Operation_DEL: - err := w.(*bmpWatcher).deleteServer(c) + err := w.(*bmpWatcher).deleteServer(*c) grpcDone(grpcReq, err) default: - grpcDone(grpcReq, fmt.Errorf("unsupported operation: %s", arg.Operation)) + grpcDone(grpcReq, fmt.Errorf("unsupported operation: %s", op)) } } |