diff options
-rw-r--r-- | server/grpc_server.go | 2 | ||||
-rw-r--r-- | server/server.go | 216 |
2 files changed, 97 insertions, 121 deletions
diff --git a/server/grpc_server.go b/server/grpc_server.go index 38b52f07..73d0ba01 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -43,6 +43,8 @@ const ( REQ_NEIGHBOR_ENABLE REQ_NEIGHBOR_DISABLE REQ_MOD_NEIGHBOR + REQ_ADD_NEIGHBOR + REQ_DEL_NEIGHBOR REQ_GLOBAL_RIB REQ_MONITOR_GLOBAL_BEST_CHANGED REQ_MONITOR_INCOMING diff --git a/server/server.go b/server/server.go index 64ec3f53..7f092230 100644 --- a/server/server.go +++ b/server/server.go @@ -143,8 +143,6 @@ func NewTCPListener(address string, port uint32, ch chan *net.TCPConn) (*TCPList type BgpServer struct { bgpConfig config.Bgp - addedPeerCh chan config.Neighbor - deletedPeerCh chan config.Neighbor updatedPeerCh chan config.Neighbor fsmincomingCh *channels.InfiniteChannel fsmStateCh chan *FsmMsg @@ -167,8 +165,6 @@ type BgpServer struct { func NewBgpServer() *BgpServer { b := BgpServer{} - b.addedPeerCh = make(chan config.Neighbor) - b.deletedPeerCh = make(chan config.Neighbor) b.updatedPeerCh = make(chan config.Neighbor) b.GrpcReqCh = make(chan *GrpcRequest, 1) b.policyUpdateCh = make(chan config.RoutingPolicy) @@ -341,64 +337,6 @@ func (server *BgpServer) Serve() { } case conn := <-server.acceptCh: passConn(conn) - case config := <-server.addedPeerCh: - addr := config.Config.NeighborAddress - _, found := server.neighborMap[addr] - if found { - log.Warn("Can't overwrite the exising peer ", addr) - continue - } - if server.bgpConfig.Global.ListenConfig.Port > 0 { - for _, l := range server.Listeners(addr) { - SetTcpMD5SigSockopts(l, addr, config.Config.AuthPassword) - } - } - peer := NewPeer(server.bgpConfig.Global, config, server.globalRib, server.policy) - server.setPolicyByConfig(peer.ID(), config.ApplyPolicy) - if peer.isRouteServerClient() { - pathList := make([]*table.Path, 0) - rfList := peer.configuredRFlist() - for _, p := range server.neighborMap { - if !p.isRouteServerClient() { - continue - } - pathList = append(pathList, p.getAccepted(rfList)...) - } - moded := server.RSimportPaths(peer, pathList) - if len(moded) > 0 { - server.globalRib.ProcessPaths(nil, moded) - } - } - server.neighborMap[addr] = peer - peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) - server.broadcastPeerState(peer, bgp.BGP_FSM_IDLE) - case config := <-server.deletedPeerCh: - addr := config.Config.NeighborAddress - for _, l := range server.Listeners(addr) { - SetTcpMD5SigSockopts(l, addr, "") - } - peer, found := server.neighborMap[addr] - if found { - log.Info("Delete a peer configuration for ", addr) - go func(addr string) { - t := time.AfterFunc(time.Minute*5, func() { log.Fatal("failed to free the fsm.h.t for ", addr) }) - peer.fsm.h.t.Kill(nil) - peer.fsm.h.t.Wait() - t.Stop() - t = time.AfterFunc(time.Minute*5, func() { log.Fatal("failed to free the fsm.h for ", addr) }) - peer.fsm.t.Kill(nil) - peer.fsm.t.Wait() - t.Stop() - }(addr) - - m := server.dropPeerAllRoutes(peer, peer.configuredRFlist()) - if len(m) > 0 { - senderMsgs = append(senderMsgs, m...) - } - delete(server.neighborMap, addr) - } else { - log.Info("Can't delete a peer configuration for ", addr) - } case config := <-server.updatedPeerCh: addr := config.Config.NeighborAddress peer := server.neighborMap[addr] @@ -1124,12 +1062,24 @@ func (server *BgpServer) SetMrtConfig(c []config.Mrt) error { return nil } -func (server *BgpServer) PeerAdd(peer config.Neighbor) { - server.addedPeerCh <- peer +func (server *BgpServer) PeerAdd(peer config.Neighbor) error { + ch := make(chan *GrpcResponse) + server.GrpcReqCh <- &GrpcRequest{ + RequestType: REQ_ADD_NEIGHBOR, + Data: &peer, + ResponseCh: ch, + } + return (<-ch).Err() } -func (server *BgpServer) PeerDelete(peer config.Neighbor) { - server.deletedPeerCh <- peer +func (server *BgpServer) PeerDelete(peer config.Neighbor) error { + ch := make(chan *GrpcResponse) + server.GrpcReqCh <- &GrpcRequest{ + RequestType: REQ_DEL_NEIGHBOR, + Data: peer, + ResponseCh: ch, + } + return (<-ch).Err() } func (server *BgpServer) PeerUpdate(peer config.Neighbor) { @@ -2189,6 +2139,21 @@ func (server *BgpServer) handleGrpc(grpcReq *GrpcRequest) []*SenderMsg { msgs = append(msgs, m...) } close(grpcReq.ResponseCh) + case REQ_ADD_NEIGHBOR: + _, err := server.handleAddNeighbor(grpcReq.Data.(*config.Neighbor)) + grpcReq.ResponseCh <- &GrpcResponse{ + ResponseErr: err, + } + close(grpcReq.ResponseCh) + case REQ_DEL_NEIGHBOR: + m, err := server.handleDelNeighbor(grpcReq.Data.(*config.Neighbor)) + grpcReq.ResponseCh <- &GrpcResponse{ + ResponseErr: err, + } + if len(m) > 0 { + msgs = append(msgs, m...) + } + close(grpcReq.ResponseCh) case REQ_DEFINED_SET: if err := server.handleGrpcGetDefinedSet(grpcReq); err != nil { grpcReq.ResponseCh <- &GrpcResponse{ @@ -2305,26 +2270,70 @@ func (server *BgpServer) handleGrpcGetDefinedSet(grpcReq *GrpcRequest) error { } return nil } -func (server *BgpServer) handleGrpcModNeighbor(grpcReq *GrpcRequest) (sMsgs []*SenderMsg, err error) { - arg := grpcReq.Data.(*api.ModNeighborArguments) - addr := arg.Peer.Conf.NeighborAddress - n, ok := server.neighborMap[addr] - if arg.Operation != api.Operation_ADD && !ok { - return nil, fmt.Errorf("not found neighbor %s", addr) + +func (server *BgpServer) handleAddNeighbor(c *config.Neighbor) ([]*SenderMsg, error) { + addr := c.Config.NeighborAddress + if _, y := server.neighborMap[addr]; y { + return nil, fmt.Errorf("Can't overwrite the exising peer: %s", addr) } - switch arg.Operation { - case api.Operation_ADD: - if ok { - return nil, fmt.Errorf("Can't overwrite the exising peer %s", addr) - } else { - log.Infof("Peer %s is added", addr) + if server.bgpConfig.Global.ListenConfig.Port > 0 { + for _, l := range server.Listeners(addr) { + SetTcpMD5SigSockopts(l, addr, c.Config.AuthPassword) } - if server.bgpConfig.Global.ListenConfig.Port > 0 { - for _, l := range server.Listeners(addr) { - SetTcpMD5SigSockopts(l, addr, arg.Peer.Conf.AuthPassword) + } + + peer := NewPeer(server.bgpConfig.Global, *c, server.globalRib, server.policy) + server.setPolicyByConfig(peer.ID(), c.ApplyPolicy) + if peer.isRouteServerClient() { + pathList := make([]*table.Path, 0) + rfList := peer.configuredRFlist() + for _, p := range server.neighborMap { + if !p.isRouteServerClient() { + continue } + pathList = append(pathList, p.getAccepted(rfList)...) } + moded := server.RSimportPaths(peer, pathList) + if len(moded) > 0 { + server.globalRib.ProcessPaths(nil, moded) + } + } + server.neighborMap[addr] = peer + peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) + server.broadcastPeerState(peer, bgp.BGP_FSM_IDLE) + return nil, nil +} + +func (server *BgpServer) handleDelNeighbor(c *config.Neighbor) ([]*SenderMsg, error) { + addr := c.NeighborAddress + n, y := server.neighborMap[addr] + if !y { + return nil, fmt.Errorf("Can't delete a peer configuration for %s", addr) + } + for _, l := range server.Listeners(addr) { + SetTcpMD5SigSockopts(l, addr, "") + } + log.Info("Delete a peer configuration for ", addr) + go func(addr string) { + t := time.AfterFunc(time.Minute*5, func() { log.Fatal("failed to free the fsm.h.t for ", addr) }) + n.fsm.h.t.Kill(nil) + n.fsm.h.t.Wait() + t.Stop() + t = time.AfterFunc(time.Minute*5, func() { log.Fatal("failed to free the fsm.h for ", addr) }) + n.fsm.t.Kill(nil) + n.fsm.t.Wait() + t.Stop() + }(addr) + delete(server.neighborMap, addr) + m := server.dropPeerAllRoutes(n, n.configuredRFlist()) + return m, nil +} + +func (server *BgpServer) handleGrpcModNeighbor(grpcReq *GrpcRequest) ([]*SenderMsg, error) { + arg := grpcReq.Data.(*api.ModNeighborArguments) + switch arg.Operation { + case api.Operation_ADD: apitoConfig := func(a *api.Peer) (config.Neighbor, error) { var pconf config.Neighbor if a.Conf != nil { @@ -2416,51 +2425,16 @@ func (server *BgpServer) handleGrpcModNeighbor(grpcReq *GrpcRequest) (sMsgs []*S } return pconf, nil } - configneigh, err := apitoConfig(arg.Peer) + c, err := apitoConfig(arg.Peer) if err != nil { return nil, err } - peer := NewPeer(server.bgpConfig.Global, configneigh, server.globalRib, server.policy) - server.setPolicyByConfig(peer.ID(), configneigh.ApplyPolicy) - if peer.isRouteServerClient() { - pathList := make([]*table.Path, 0) - rfList := peer.configuredRFlist() - for _, p := range server.neighborMap { - if !p.isRouteServerClient() { - continue - } - pathList = append(pathList, p.getAccepted(rfList)...) - } - moded := server.RSimportPaths(peer, pathList) - if len(moded) > 0 { - server.globalRib.ProcessPaths(nil, moded) - } - } - server.neighborMap[addr] = peer - peer.startFSMHandler(server.fsmincomingCh, server.fsmStateCh) - server.broadcastPeerState(peer, bgp.BGP_FSM_IDLE) + return server.handleAddNeighbor(&c) case api.Operation_DEL: - for _, l := range server.Listeners(addr) { - SetTcpMD5SigSockopts(l, addr, "") - } - log.Info("Delete a peer configuration for ", addr) - go func(addr string) { - t := time.AfterFunc(time.Minute*5, func() { log.Fatal("failed to free the fsm.h.t for ", addr) }) - n.fsm.h.t.Kill(nil) - n.fsm.h.t.Wait() - t.Stop() - t = time.AfterFunc(time.Minute*5, func() { log.Fatal("failed to free the fsm.h for ", addr) }) - n.fsm.t.Kill(nil) - n.fsm.t.Wait() - t.Stop() - }(addr) - m := server.dropPeerAllRoutes(n, n.configuredRFlist()) - if len(m) > 0 { - sMsgs = append(sMsgs, m...) - } - delete(server.neighborMap, addr) + return server.handleDelNeighbor(&config.Neighbor{NeighborAddress: arg.Peer.Conf.NeighborAddress}) + default: + return nil, fmt.Errorf("unsupported operation %s", arg.Operation) } - return sMsgs, err } func (server *BgpServer) handleGrpcModDefinedSet(grpcReq *GrpcRequest) error { |