summaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/server.go294
1 files changed, 151 insertions, 143 deletions
diff --git a/server/server.go b/server/server.go
index a03fd92e..c300e823 100644
--- a/server/server.go
+++ b/server/server.go
@@ -89,13 +89,14 @@ func (ws Watchers) watching(typ watcherEventType) bool {
type BgpServer struct {
bgpConfig config.Bgp
- globalTypeCh chan config.Global
addedPeerCh chan config.Neighbor
deletedPeerCh chan config.Neighbor
updatedPeerCh chan config.Neighbor
fsmincomingCh *channels.InfiniteChannel
fsmStateCh chan *FsmMsg
rpkiConfigCh chan []config.RpkiServer
+ acceptCh chan *net.TCPConn
+ zapiMsgCh chan *zebra.Message
GrpcReqCh chan *GrpcRequest
policyUpdateCh chan config.RoutingPolicy
@@ -113,7 +114,6 @@ type BgpServer struct {
func NewBgpServer() *BgpServer {
b := BgpServer{}
- b.globalTypeCh = make(chan config.Global, 1)
b.addedPeerCh = make(chan config.Neighbor)
b.deletedPeerCh = make(chan config.Neighbor)
b.updatedPeerCh = make(chan config.Neighbor)
@@ -184,64 +184,11 @@ func (server *BgpServer) Listeners(addr string) []*net.TCPListener {
}
func (server *BgpServer) Serve() {
- var g config.Global
- for {
- select {
- case g = <-server.globalTypeCh:
- server.bgpConfig.Global = g
- server.globalTypeCh = nil
- default:
- }
-
- if server.globalTypeCh == nil {
- break
- }
-
- select {
- case grpcReq := <-server.GrpcReqCh:
- server.handleGrpc(grpcReq)
- case g = <-server.globalTypeCh:
- server.bgpConfig.Global = g
- server.globalTypeCh = nil
- }
- }
-
- server.roaManager, _ = newROAManager(g.Config.As, nil)
-
- if g.Mrt.FileName != "" {
- w, err := newMrtWatcher(g.Mrt.FileName)
- if err != nil {
- log.Warn(err)
- } else {
- server.watchers[WATCHER_MRT] = w
- }
- }
-
- if len(g.BmpServers) > 0 {
- w, err := newBmpWatcher(server.GrpcReqCh)
- if err != nil {
- log.Warn(err)
- } else {
- for _, server := range g.BmpServers {
- if err := w.addServer(server.Config); err != nil {
- log.Warn(err)
- }
- }
- server.watchers[WATCHER_BMP] = w
- }
- }
+ server.roaManager, _ = newROAManager(0, nil)
w, _ := newGrpcIncomingWatcher()
server.watchers[WATCHER_GRPC_INCOMING] = w
- if g.Zebra.Enabled {
- cli, err := NewZclient(g.Zebra.Url, g.Zebra.RedistributeRouteTypeList)
- if err != nil {
- log.Error(err)
- }
- server.zclient = cli
- }
-
senderCh := make(chan *SenderMsg, 1<<16)
go func(ch chan *SenderMsg) {
for {
@@ -275,34 +222,11 @@ func (server *BgpServer) Serve() {
}
}(broadcastCh)
- rfs, _ := config.AfiSafis(g.AfiSafis).ToRfList()
- server.globalRib = table.NewTableManager(rfs, g.MplsLabelRange.MinLabel, g.MplsLabelRange.MaxLabel)
server.listeners = make([]*net.TCPListener, 0, 2)
- acceptCh := make(chan *net.TCPConn, 4096)
- if g.ListenConfig.Port > 0 {
- list := []string{"0.0.0.0", "::"}
- if len(g.ListenConfig.LocalAddressList) > 0 {
- list = g.ListenConfig.LocalAddressList
- }
- for _, addr := range list {
- l, err := listenAndAccept(addr, uint32(g.ListenConfig.Port), acceptCh)
- if err != nil {
- log.Fatal(err)
- os.Exit(1)
- }
- server.listeners = append(server.listeners, l)
- }
- }
-
server.fsmincomingCh = channels.NewInfiniteChannel()
server.fsmStateCh = make(chan *FsmMsg, 4096)
var senderMsgs []*SenderMsg
- var zapiMsgCh chan *zebra.Message
- if server.zclient != nil {
- zapiMsgCh = server.zclient.Receive()
- }
-
handleFsmMsg := func(e *FsmMsg) {
peer, found := server.neighborMap[e.MsgSrc]
if !found {
@@ -373,7 +297,7 @@ func (server *BgpServer) Serve() {
if len(m) > 0 {
senderMsgs = append(senderMsgs, m...)
}
- case conn := <-acceptCh:
+ case conn := <-server.acceptCh:
passConn(conn)
default:
}
@@ -393,12 +317,12 @@ func (server *BgpServer) Serve() {
server.roaManager, _ = newROAManager(server.bgpConfig.Global.Config.As, c)
case rmsg := <-server.roaManager.recieveROA():
server.roaManager.handleROAEvent(rmsg)
- case zmsg := <-zapiMsgCh:
+ case zmsg := <-server.zapiMsgCh:
m := handleZapiMsg(zmsg, server)
if len(m) > 0 {
senderMsgs = append(senderMsgs, m...)
}
- case conn := <-acceptCh:
+ case conn := <-server.acceptCh:
passConn(conn)
case config := <-server.addedPeerCh:
addr := config.Config.NeighborAddress
@@ -407,12 +331,12 @@ func (server *BgpServer) Serve() {
log.Warn("Can't overwrite the exising peer ", addr)
continue
}
- if g.ListenConfig.Port > 0 {
+ if server.bgpConfig.Global.ListenConfig.Port > 0 {
for _, l := range server.Listeners(addr) {
SetTcpMD5SigSockopts(l, addr, config.Config.AuthPassword)
}
}
- peer := NewPeer(g, config, server.globalRib, server.policy)
+ peer := NewPeer(server.bgpConfig.Global, config, server.globalRib, server.policy)
server.setPolicyByConfig(peer.ID(), config.ApplyPolicy)
if peer.isRouteServerClient() {
pathList := make([]*table.Path, 0)
@@ -1177,10 +1101,53 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) []*SenderMsg {
return msgs
}
-func (server *BgpServer) SetGlobalType(g config.Global) {
- if server.globalTypeCh != nil {
- server.globalTypeCh <- g
+func (server *BgpServer) SetGlobalType(g config.Global) error {
+ {
+ ch := make(chan *GrpcResponse)
+ server.GrpcReqCh <- &GrpcRequest{
+ RequestType: REQ_MOD_GLOBAL_CONFIG,
+ Data: &g,
+ ResponseCh: ch,
+ }
+ if err := (<-ch).Err(); err != nil {
+ return err
+ }
+ }
+ if g.Mrt.FileName != "" {
+ ch := make(chan *GrpcResponse)
+ server.GrpcReqCh <- &GrpcRequest{
+ RequestType: REQ_MOD_MRT,
+ Data: &api.ModMrtArguments{
+ Operation: api.Operation_ADD,
+ Filename: g.Mrt.FileName,
+ },
+ ResponseCh: ch,
+ }
+ if err := (<-ch).Err(); err != nil {
+ return err
+ }
+ }
+ for _, s := range g.BmpServers {
+ ch := make(chan *GrpcResponse)
+ server.GrpcReqCh <- &GrpcRequest{
+ RequestType: REQ_MOD_BMP,
+ Data: &s.Config,
+ ResponseCh: ch,
+ }
+ if err := (<-ch).Err(); err != nil {
+ return err
+ }
+ }
+
+ if g.Zebra.Enabled {
+ cli, err := NewZclient(g.Zebra.Url, g.Zebra.RedistributeRouteTypeList)
+ if err != nil {
+ return err
+ }
+ server.zclient = cli
+ server.zapiMsgCh = server.zclient.Receive()
}
+ return nil
}
func (server *BgpServer) SetRpkiConfig(c []config.RpkiServer) {
@@ -1663,56 +1630,87 @@ END:
}
func (server *BgpServer) handleModConfig(grpcReq *GrpcRequest) error {
- arg := grpcReq.Data.(*api.ModGlobalConfigArguments)
- if arg.Operation != api.Operation_ADD {
- return fmt.Errorf("invalid operation %s", arg.Operation)
- }
- if server.globalTypeCh == nil {
- return fmt.Errorf("gobgp is already started")
- }
- g := arg.Global
- id := net.ParseIP(g.RouterId)
- if id == nil {
- return fmt.Errorf("invalid router-id format: %s", g.RouterId)
- }
- families := make([]config.AfiSafi, 0, len(g.Families))
- for _, f := range g.Families {
- families = append(families, config.AfiSafi{
- Config: config.AfiSafiConfig{
- AfiSafiName: config.AfiSafiType(bgp.RouteFamily(f).String()),
- Enabled: true,
+ var c *config.Global
+ switch arg := grpcReq.Data.(type) {
+ case *api.ModGlobalConfigArguments:
+ if arg.Operation != api.Operation_ADD {
+ return fmt.Errorf("invalid operation %s", arg.Operation)
+ }
+ g := arg.Global
+ id := net.ParseIP(g.RouterId)
+ if id == nil {
+ return fmt.Errorf("invalid router-id format: %s", g.RouterId)
+ }
+ families := make([]config.AfiSafi, 0, len(g.Families))
+ for _, f := range g.Families {
+ name := config.AfiSafiType(bgp.RouteFamily(f).String())
+ families = append(families, config.AfiSafi{
+ AfiSafiName: name,
+ Config: config.AfiSafiConfig{
+ AfiSafiName: name,
+ Enabled: true,
+ },
+ State: config.AfiSafiState{
+ AfiSafiName: name,
+ },
+ })
+ }
+ b := &config.Bgp{
+ Global: config.Global{
+ Config: config.GlobalConfig{
+ As: g.As,
+ RouterId: g.RouterId,
+ },
+ ListenConfig: config.ListenConfig{
+ Port: g.ListenPort,
+ LocalAddressList: g.ListenAddresses,
+ },
+ MplsLabelRange: config.MplsLabelRange{
+ MinLabel: g.MplsLabelMin,
+ MaxLabel: g.MplsLabelMax,
+ },
+ AfiSafis: families,
+ Collector: config.Collector{
+ Enabled: g.Collector,
+ },
},
- })
+ }
+ if err := config.SetDefaultConfigValues(nil, b); err != nil {
+ return err
+ }
+ c = &b.Global
+ case *config.Global:
+ c = arg
}
- c := config.Bgp{
- Global: config.Global{
- Config: config.GlobalConfig{
- As: g.As,
- RouterId: g.RouterId,
- },
- ListenConfig: config.ListenConfig{
- Port: g.ListenPort,
- LocalAddressList: g.ListenAddresses,
- },
- MplsLabelRange: config.MplsLabelRange{
- MinLabel: g.MplsLabelMin,
- MaxLabel: g.MplsLabelMax,
- },
- AfiSafis: families,
- Collector: config.Collector{
- Enabled: g.Collector,
- },
- },
+
+ if server.bgpConfig.Global.Config.As != 0 {
+ return fmt.Errorf("gobgp is already started")
}
- err := config.SetDefaultConfigValues(nil, &c)
- if err != nil {
- return err
+
+ if c.ListenConfig.Port > 0 {
+ acceptCh := make(chan *net.TCPConn, 4096)
+ list := []string{"0.0.0.0", "::"}
+ if len(c.ListenConfig.LocalAddressList) > 0 {
+ list = c.ListenConfig.LocalAddressList
+ }
+ for _, addr := range list {
+ l, err := listenAndAccept(addr, uint32(c.ListenConfig.Port), acceptCh)
+ if err != nil {
+ return err
+ }
+ server.listeners = append(server.listeners, l)
+ }
+ server.acceptCh = acceptCh
}
+
+ rfs, _ := config.AfiSafis(c.AfiSafis).ToRfList()
+ server.globalRib = table.NewTableManager(rfs, c.MplsLabelRange.MinLabel, c.MplsLabelRange.MaxLabel)
+
p := config.RoutingPolicy{}
if err := server.SetRoutingPolicy(p); err != nil {
- log.Fatal(err)
+ return err
}
- server.globalTypeCh <- c.Global
+ server.bgpConfig.Global = *c
return nil
}
@@ -1766,7 +1764,7 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg {
return results
}
- if server.globalTypeCh != nil && grpcReq.RequestType != REQ_MOD_GLOBAL_CONFIG {
+ if server.bgpConfig.Global.Config.As == 0 && grpcReq.RequestType != REQ_MOD_GLOBAL_CONFIG {
grpcReq.ResponseCh <- &GrpcResponse{
ResponseErr: fmt.Errorf("bgpd main loop is not started yet"),
}
@@ -2824,31 +2822,41 @@ func (server *BgpServer) handleModMrt(grpcReq *GrpcRequest) {
}
func (server *BgpServer) handleModBmp(grpcReq *GrpcRequest) {
- arg := grpcReq.Data.(*api.ModBmpArguments)
+ var op api.Operation
+ var c *config.BmpServerConfig
+ switch arg := grpcReq.Data.(type) {
+ case *api.ModBmpArguments:
+ c = &config.BmpServerConfig{
+ Address: arg.Address,
+ Port: arg.Port,
+ RouteMonitoringPolicy: config.BmpRouteMonitoringPolicyType(arg.Type),
+ }
+ op = arg.Operation
+ case *config.BmpServerConfig:
+ c = arg
+ op = api.Operation_ADD
+ }
+
w, y := server.watchers[WATCHER_BMP]
if !y {
- if arg.Operation == api.Operation_ADD {
+ if op == api.Operation_ADD {
w, _ = newBmpWatcher(server.GrpcReqCh)
server.watchers[WATCHER_BMP] = w
- } else if arg.Operation == api.Operation_DEL {
+ } else if op == api.Operation_DEL {
grpcDone(grpcReq, fmt.Errorf("not enabled yet"))
return
}
}
- c := config.BmpServerConfig{
- Address: arg.Address,
- Port: arg.Port,
- RouteMonitoringPolicy: config.BmpRouteMonitoringPolicyType(arg.Type),
- }
- switch arg.Operation {
+
+ switch op {
case api.Operation_ADD:
- err := w.(*bmpWatcher).addServer(c)
+ err := w.(*bmpWatcher).addServer(*c)
grpcDone(grpcReq, err)
case api.Operation_DEL:
- err := w.(*bmpWatcher).deleteServer(c)
+ err := w.(*bmpWatcher).deleteServer(*c)
grpcDone(grpcReq, err)
default:
- grpcDone(grpcReq, fmt.Errorf("unsupported operation: %s", arg.Operation))
+ grpcDone(grpcReq, fmt.Errorf("unsupported operation: %s", op))
}
}