diff options
author | ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp> | 2017-01-17 06:47:18 +0000 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2016-11-06 02:36:22 +0900 |
commit | 8798014bd0dc80cfe972c81a510ab0c8c22bed9a (patch) | |
tree | a3e8c2697d0aade988389e000bfeb53d2bb1b35a /server/server.go | |
parent | 10b91fbb3fbb3fa990eb1f80234f44a21ec89f1b (diff) |
server: refactor mgmt operation
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
Diffstat (limited to 'server/server.go')
-rw-r--r-- | server/server.go | 1084 |
1 files changed, 353 insertions, 731 deletions
diff --git a/server/server.go b/server/server.go index 0cea3b4b..a3b25bb5 100644 --- a/server/server.go +++ b/server/server.go @@ -93,7 +93,7 @@ type BgpServer struct { fsmStateCh chan *FsmMsg acceptCh chan *net.TCPConn - mgmtCh chan func() + mgmtCh chan *mgmtOp policy *table.RoutingPolicy listeners []*TCPListener neighborMap map[string]*Peer @@ -112,7 +112,7 @@ func NewBgpServer() *BgpServer { neighborMap: make(map[string]*Peer), policy: table.NewRoutingPolicy(), roaManager: roaManager, - mgmtCh: make(chan func(), 1), + mgmtCh: make(chan *mgmtOp, 1), watcherMap: make(map[WatchEventType][]*Watcher), } s.bmpManager = newBmpClientManager(s) @@ -140,6 +140,33 @@ func (s *BgpServer) active() error { return nil } +type mgmtOp struct { + f func() error + errCh chan error + checkActive bool // check BGP global setting is configured before calling f() +} + +func (server *BgpServer) handleMGMTOp(op *mgmtOp) { + if op.checkActive { + if err := server.active(); err != nil { + op.errCh <- err + return + } + } + op.errCh <- op.f() +} + +func (s *BgpServer) mgmtOperation(f func() error, checkActive bool) (err error) { + ch := make(chan error) + defer func() { err = <-ch }() + s.mgmtCh <- &mgmtOp{ + f: f, + errCh: ch, + checkActive: checkActive, + } + return +} + func (server *BgpServer) Serve() { server.listeners = make([]*TCPListener, 0, 2) server.fsmincomingCh = channels.NewInfiniteChannel() @@ -217,8 +244,8 @@ func (server *BgpServer) Serve() { } select { - case f := <-server.mgmtCh: - f() + case op := <-server.mgmtCh: + server.handleMGMTOp(op) case conn := <-server.acceptCh: passConn(conn) default: @@ -235,8 +262,8 @@ func (server *BgpServer) Serve() { CONT: select { - case f := <-server.mgmtCh: - f() + case op := <-server.mgmtCh: + server.handleMGMTOp(op) case rmsg := <-server.roaManager.ReceiveROA(): server.roaManager.HandleROAEvent(rmsg) case conn := <-server.acceptCh: @@ -693,15 +720,12 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { select { case <-timer.C: - ch := make(chan struct{}) - defer func() { <-ch }() - server.mgmtCh <- func() { + server.mgmtOperation(func() error { log.WithFields(log.Fields{ "Topic": "Peer", "Key": peer.ID(), "Family": family, }).Debugf("LLGR restart timer (%d sec) for %s expired", t, family) - defer close(ch) peer.DropAll([]bgp.RouteFamily{family}) server.dropPeerAllRoutes(peer, []bgp.RouteFamily{family}) @@ -709,7 +733,8 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { if peer.llgrRestartTimerExpired(family) { peer.stopPeerRestarting() } - } + return nil + }, false) case <-endCh: log.WithFields(log.Fields{ "Topic": "Peer", @@ -739,13 +764,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { peer.fsm.peerInfo.LocalAddress = net.ParseIP(laddr) deferralExpiredFunc := func(family bgp.RouteFamily) func() { return func() { - ch := make(chan struct{}) - defer func() { <-ch }() - - server.mgmtCh <- func() { - defer close(ch) + server.mgmtOperation(func() error { server.softResetOut(peer.fsm.pConf.Config.NeighborAddress, family, true) - } + return nil + }, false) } } if !peer.fsm.pConf.GracefulRestart.State.LocalRestarting { @@ -943,89 +965,53 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) { return } -func (s *BgpServer) StartCollector(c *config.CollectorConfig) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - _, err = NewCollector(s, c.Url, c.DbName, c.TableDumpInterval) - } - return err +func (s *BgpServer) StartCollector(c *config.CollectorConfig) error { + return s.mgmtOperation(func() error { + _, err := NewCollector(s, c.Url, c.DbName, c.TableDumpInterval) + return err + }, false) } -func (s *BgpServer) StartZebraClient(c *config.ZebraConfig) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - +func (s *BgpServer) StartZebraClient(c *config.ZebraConfig) error { + return s.mgmtOperation(func() error { if s.zclient != nil { - err = fmt.Errorf("already connected to Zebra") - } else { - protos := make([]string, 0, len(c.RedistributeRouteTypeList)) - for _, p := range c.RedistributeRouteTypeList { - protos = append(protos, string(p)) - } - s.zclient, err = newZebraClient(s, c.Url, protos, c.Version) + return fmt.Errorf("already connected to Zebra") } - } - return err -} - -func (s *BgpServer) AddBmp(c *config.BmpServerConfig) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return + protos := make([]string, 0, len(c.RedistributeRouteTypeList)) + for _, p := range c.RedistributeRouteTypeList { + protos = append(protos, string(p)) } + var err error + s.zclient, err = newZebraClient(s, c.Url, protos, c.Version) + return err + }, false) +} - err = s.bmpManager.addServer(c) - } - return err +func (s *BgpServer) AddBmp(c *config.BmpServerConfig) error { + return s.mgmtOperation(func() error { + return s.bmpManager.addServer(c) + }, true) } func (s *BgpServer) DeleteBmp(c *config.BmpServerConfig) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - - err = s.bmpManager.deleteServer(c) - } - return err + return s.mgmtOperation(func() error { + return s.bmpManager.deleteServer(c) + }, true) } func (s *BgpServer) Shutdown() { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - + s.mgmtOperation(func() error { s.shutdown = true for _, p := range s.neighborMap { p.fsm.adminStateCh <- ADMIN_STATE_DOWN } // TODO: call fsmincomingCh.Close() - } + return nil + }, false) } -func (s *BgpServer) UpdatePolicy(policy config.RoutingPolicy) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - +func (s *BgpServer) UpdatePolicy(policy config.RoutingPolicy) error { + return s.mgmtOperation(func() error { ap := make(map[string]config.ApplyPolicy, len(s.neighborMap)+1) ap[table.GLOBAL_RIB_NAME] = s.bgpConfig.Global.ApplyPolicy for _, peer := range s.neighborMap { @@ -1035,9 +1021,8 @@ func (s *BgpServer) UpdatePolicy(policy config.RoutingPolicy) (err error) { }).Info("call set policy") ap[peer.ID()] = peer.fsm.pConf.ApplyPolicy } - err = s.policy.Reset(&policy, ap) - } - return err + return s.policy.Reset(&policy, ap) + }, false) } // EVPN MAC MOBILITY HANDLING @@ -1148,37 +1133,22 @@ func (server *BgpServer) fixupApiPath(vrfId string, pathList []*table.Path) erro } func (s *BgpServer) AddPath(vrfId string, pathList []*table.Path) (uuidBytes []byte, err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - if err = s.active(); err != nil { - return + s.mgmtOperation(func() error { + if err := s.fixupApiPath(vrfId, pathList); err != nil { + return err } - - if err = s.fixupApiPath(vrfId, pathList); err == nil { - if len(pathList) == 1 { - uuidBytes = uuid.NewV4().Bytes() - pathList[0].SetUUID(uuidBytes) - } - s.propagateUpdate(nil, pathList) + if len(pathList) == 1 { + uuidBytes = uuid.NewV4().Bytes() + pathList[0].SetUUID(uuidBytes) } - } - return uuidBytes, err + s.propagateUpdate(nil, pathList) + return nil + }, true) + return } -func (s *BgpServer) DeletePath(uuid []byte, f bgp.RouteFamily, vrfId string, pathList []*table.Path) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - +func (s *BgpServer) DeletePath(uuid []byte, f bgp.RouteFamily, vrfId string, pathList []*table.Path) error { + return s.mgmtOperation(func() error { deletePathList := make([]*table.Path, 0) if len(uuid) > 0 { path := func() *table.Path { @@ -1192,7 +1162,7 @@ func (s *BgpServer) DeletePath(uuid []byte, f bgp.RouteFamily, vrfId string, pat if path != nil { deletePathList = append(deletePathList, path.Clone(true)) } else { - err = fmt.Errorf("Can't find a specified path") + return fmt.Errorf("Can't find a specified path") } } else if len(pathList) == 0 { // delete all paths @@ -1204,39 +1174,28 @@ func (s *BgpServer) DeletePath(uuid []byte, f bgp.RouteFamily, vrfId string, pat deletePathList = append(deletePathList, path.Clone(true)) } } else { - if err = s.fixupApiPath(vrfId, pathList); err != nil { - return + if err := s.fixupApiPath(vrfId, pathList); err != nil { + return err } deletePathList = pathList } s.propagateUpdate(nil, deletePathList) - } - return err + return nil + }, true) } -func (s *BgpServer) Start(c *config.Global) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - if err = s.active(); err == nil { - err = fmt.Errorf("gobgp is already started") - return - } - - if err = config.SetDefaultGlobalConfigValues(c); err != nil { - return +func (s *BgpServer) Start(c *config.Global) error { + return s.mgmtOperation(func() error { + if err := config.SetDefaultGlobalConfigValues(c); err != nil { + return err } if c.Config.Port > 0 { acceptCh := make(chan *net.TCPConn, 4096) for _, addr := range c.Config.LocalAddressList { - var l *TCPListener - l, err = NewTCPListener(addr, uint32(c.Config.Port), acceptCh) + l, err := NewTCPListener(addr, uint32(c.Config.Port), acceptCh) if err != nil { - return + return err } s.listeners = append(s.listeners, l) } @@ -1246,8 +1205,8 @@ func (s *BgpServer) Start(c *config.Global) (err error) { rfs, _ := config.AfiSafis(c.AfiSafis).ToRfList() s.globalRib = table.NewTableManager(rfs) - if err = s.policy.Reset(&config.RoutingPolicy{}, map[string]config.ApplyPolicy{}); err != nil { - return + if err := s.policy.Reset(&config.RoutingPolicy{}, map[string]config.ApplyPolicy{}); err != nil { + return err } s.bgpConfig.Global = *c // update route selection options @@ -1255,99 +1214,68 @@ func (s *BgpServer) Start(c *config.Global) (err error) { table.UseMultiplePaths = c.UseMultiplePaths.Config s.roaManager.SetAS(s.bgpConfig.Global.Config.As) - } - return nil + return nil + }, false) } func (s *BgpServer) GetVrf() (l []*table.Vrf) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err := s.active(); err != nil { - log.Errorf("get vrf failed: %v", err) - return - } + s.mgmtOperation(func() error { l = make([]*table.Vrf, 0, len(s.globalRib.Vrfs)) for _, vrf := range s.globalRib.Vrfs { l = append(l, vrf.Clone()) } - } + return nil + }, true) return l } -func (s *BgpServer) AddVrf(name string, id uint32, rd bgp.RouteDistinguisherInterface, im, ex []bgp.ExtendedCommunityInterface) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - if err = s.active(); err != nil { - return - } - +func (s *BgpServer) AddVrf(name string, id uint32, rd bgp.RouteDistinguisherInterface, im, ex []bgp.ExtendedCommunityInterface) error { + return s.mgmtOperation(func() error { pi := &table.PeerInfo{ AS: s.bgpConfig.Global.Config.As, LocalID: net.ParseIP(s.bgpConfig.Global.Config.RouterId).To4(), } if pathList, e := s.globalRib.AddVrf(name, id, rd, im, ex, pi); e != nil { - err = e + return e } else if len(pathList) > 0 { s.propagateUpdate(nil, pathList) } - } - return err + return nil + }, true) } -func (s *BgpServer) DeleteVrf(name string) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - +func (s *BgpServer) DeleteVrf(name string) error { + return s.mgmtOperation(func() error { for _, n := range s.neighborMap { if n.fsm.pConf.Config.Vrf == name { - err = fmt.Errorf("failed to delete VRF %s: neighbor %s is in use", name, n.ID()) - return + return fmt.Errorf("failed to delete VRF %s: neighbor %s is in use", name, n.ID()) } } - pathList, err := s.globalRib.DeleteVrf(name) - if err == nil && len(pathList) > 0 { + if err != nil { + return err + } + if len(pathList) > 0 { s.propagateUpdate(nil, pathList) } - } - return err + return nil + }, true) } -func (s *BgpServer) Stop() (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - +func (s *BgpServer) Stop() error { + return s.mgmtOperation(func() error { for k, _ := range s.neighborMap { - if err = s.deleteNeighbor(&config.Neighbor{Config: config.NeighborConfig{ + if err := s.deleteNeighbor(&config.Neighbor{Config: config.NeighborConfig{ NeighborAddress: k}}, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED); err != nil { - return + return err } } for _, l := range s.listeners { l.Close() } s.bgpConfig.Global = config.Global{} - } - return nil + return nil + }, true) } func (s *BgpServer) softResetIn(addr string, family bgp.RouteFamily) error { @@ -1438,117 +1366,71 @@ func (s *BgpServer) softResetOut(addr string, family bgp.RouteFamily, deferral b return nil } -func (s *BgpServer) SoftResetIn(addr string, family bgp.RouteFamily) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - log.WithFields(log.Fields{ - "Topic": "Operation", - "Key": addr, - }).Info("Neighbor soft reset in") - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - err = s.softResetIn(addr, family) - } - return err +func (s *BgpServer) SoftResetIn(addr string, family bgp.RouteFamily) error { + return s.mgmtOperation(func() error { + log.WithFields(log.Fields{ + "Topic": "Operation", + "Key": addr, + }).Info("Neighbor soft reset in") + return s.softResetIn(addr, family) + }, true) } -func (s *BgpServer) SoftResetOut(addr string, family bgp.RouteFamily) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - log.WithFields(log.Fields{ - "Topic": "Operation", - "Key": addr, - }).Info("Neighbor soft reset out") - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - err = s.softResetOut(addr, family, false) - } - return err +func (s *BgpServer) SoftResetOut(addr string, family bgp.RouteFamily) error { + return s.mgmtOperation(func() error { + log.WithFields(log.Fields{ + "Topic": "Operation", + "Key": addr, + }).Info("Neighbor soft reset out") + return s.softResetOut(addr, family, false) + }, true) } -func (s *BgpServer) SoftReset(addr string, family bgp.RouteFamily) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - log.WithFields(log.Fields{ - "Topic": "Operation", - "Key": addr, - }).Info("Neighbor soft reset") - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - err = s.softResetIn(addr, family) +func (s *BgpServer) SoftReset(addr string, family bgp.RouteFamily) error { + return s.mgmtOperation(func() error { + log.WithFields(log.Fields{ + "Topic": "Operation", + "Key": addr, + }).Info("Neighbor soft reset") + err := s.softResetIn(addr, family) if err != nil { - return + return err } - err = s.softResetOut(addr, family, false) - } - return err + return s.softResetOut(addr, family, false) + }, true) } func (s *BgpServer) GetRib(addr string, family bgp.RouteFamily, prefixes []*table.LookupPrefix) (rib *table.Table, err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - + s.mgmtOperation(func() error { m := s.globalRib id := table.GLOBAL_RIB_NAME if len(addr) > 0 { peer, ok := s.neighborMap[addr] if !ok { - err = fmt.Errorf("Neighbor that has %v doesn't exist.", addr) - return + return fmt.Errorf("Neighbor that has %v doesn't exist.", addr) } if !peer.isRouteServerClient() { - err = fmt.Errorf("Neighbor %v doesn't have local rib", addr) - return + return fmt.Errorf("Neighbor %v doesn't have local rib", addr) } id = peer.ID() } af := bgp.RouteFamily(family) tbl, ok := m.Tables[af] if !ok { - err = fmt.Errorf("address family: %s not supported", af) - return + return fmt.Errorf("address family: %s not supported", af) } rib, err = tbl.Select(table.TableSelectOption{ID: id, LookupPrefixes: prefixes}) - } + return nil + }, true) return } func (s *BgpServer) GetVrfRib(name string, family bgp.RouteFamily, prefixes []*table.LookupPrefix) (rib *table.Table, err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - + s.mgmtOperation(func() error { m := s.globalRib vrfs := m.Vrfs if _, ok := vrfs[name]; !ok { - err = fmt.Errorf("vrf %s not found", name) - return + return fmt.Errorf("vrf %s not found", name) } var af bgp.RouteFamily switch family { @@ -1561,28 +1443,19 @@ func (s *BgpServer) GetVrfRib(name string, family bgp.RouteFamily, prefixes []*t } tbl, ok := m.Tables[af] if !ok { - err = fmt.Errorf("address family: %s not supported", af) - return + return fmt.Errorf("address family: %s not supported", af) } rib, err = tbl.Select(table.TableSelectOption{VRF: vrfs[name], LookupPrefixes: prefixes}) - } + return err + }, true) return } func (s *BgpServer) GetAdjRib(addr string, family bgp.RouteFamily, in bool, prefixes []*table.LookupPrefix) (rib *table.Table, err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - + s.mgmtOperation(func() error { peer, ok := s.neighborMap[addr] if !ok { - err = fmt.Errorf("Neighbor that has %v doesn't exist.", addr) - return + return fmt.Errorf("Neighbor that has %v doesn't exist.", addr) } id := peer.TableID() @@ -1595,53 +1468,36 @@ func (s *BgpServer) GetAdjRib(addr string, family bgp.RouteFamily, in bool, pref adjRib.Update(accepted) } rib, err = adjRib.Select(family, false, table.TableSelectOption{ID: id, LookupPrefixes: prefixes}) - } + return err + }, true) return } func (s *BgpServer) GetRibInfo(addr string, family bgp.RouteFamily) (info *table.TableInfo, err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - + s.mgmtOperation(func() error { m := s.globalRib id := table.GLOBAL_RIB_NAME if len(addr) > 0 { peer, ok := s.neighborMap[addr] if !ok { - err = fmt.Errorf("Neighbor that has %v doesn't exist.", addr) - return + return fmt.Errorf("Neighbor that has %v doesn't exist.", addr) } if !peer.isRouteServerClient() { - err = fmt.Errorf("Neighbor %v doesn't have local rib", addr) - return + return fmt.Errorf("Neighbor %v doesn't have local rib", addr) } id = peer.ID() } info, err = m.TableInfo(id, family) - } + return err + }, true) return } func (s *BgpServer) GetAdjRibInfo(addr string, family bgp.RouteFamily, in bool) (info *table.TableInfo, err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - + s.mgmtOperation(func() error { peer, ok := s.neighborMap[addr] if !ok { - err = fmt.Errorf("Neighbor that has %v doesn't exist.", addr) - return + return fmt.Errorf("Neighbor that has %v doesn't exist.", addr) } var adjRib *table.AdjRib @@ -1653,35 +1509,28 @@ func (s *BgpServer) GetAdjRibInfo(addr string, family bgp.RouteFamily, in bool) adjRib.Update(accepted) } info, err = adjRib.TableInfo(family) - } + return err + }, true) return } func (s *BgpServer) GetServer() (c *config.Global) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - + s.mgmtOperation(func() error { g := s.bgpConfig.Global c = &g - } + return nil + }, false) return c } func (s *BgpServer) GetNeighbor(getAdvertised bool) (l []*config.Neighbor) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - + s.mgmtOperation(func() error { l = make([]*config.Neighbor, 0, len(s.neighborMap)) for _, peer := range s.neighborMap { l = append(l, peer.ToConfig(getAdvertised)) } - } + return nil + }, false) return l } @@ -1755,19 +1604,10 @@ func (server *BgpServer) addNeighbor(c *config.Neighbor) error { return nil } -func (s *BgpServer) AddNeighbor(c *config.Neighbor) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - if err = s.active(); err != nil { - return - } - err = s.addNeighbor(c) - } - return err +func (s *BgpServer) AddNeighbor(c *config.Neighbor) error { + return s.mgmtOperation(func() error { + return s.addNeighbor(c) + }, true) } func (server *BgpServer) deleteNeighbor(c *config.Neighbor, code, subcode uint8) error { @@ -1833,36 +1673,18 @@ func (server *BgpServer) deleteNeighbor(c *config.Neighbor, code, subcode uint8) return nil } -func (s *BgpServer) DeleteNeighbor(c *config.Neighbor) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - - err = s.deleteNeighbor(c, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED) - } - return err +func (s *BgpServer) DeleteNeighbor(c *config.Neighbor) error { + return s.mgmtOperation(func() error { + return s.deleteNeighbor(c, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED) + }, true) } func (s *BgpServer) UpdateNeighbor(c *config.Neighbor) (policyUpdated bool, err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - + s.mgmtOperation(func() error { addr := c.Config.NeighborAddress peer, ok := s.neighborMap[addr] if !ok { - err = fmt.Errorf("Neighbor that has %v doesn't exist.", addr) - return + return fmt.Errorf("Neighbor that has %v doesn't exist.", addr) } if !peer.fsm.pConf.ApplyPolicy.Equal(&c.ApplyPolicy) { @@ -1897,7 +1719,7 @@ func (s *BgpServer) UpdateNeighbor(c *config.Neighbor) (policyUpdated bool, err "Topic": "Peer", "Key": addr, }).Error(err) - return + return err } err = s.addNeighbor(c) if err != nil { @@ -1906,7 +1728,7 @@ func (s *BgpServer) UpdateNeighbor(c *config.Neighbor) (policyUpdated bool, err "Key": addr, }).Error(err) } - return + return err } if !original.Timers.Config.Equal(&c.Timers.Config) { @@ -1926,7 +1748,8 @@ func (s *BgpServer) UpdateNeighbor(c *config.Neighbor) (policyUpdated bool, err // rollback to original state peer.fsm.pConf = original } - } + return err + }, true) return policyUpdated, err } @@ -1960,247 +1783,146 @@ func (s *BgpServer) resetNeighbor(op, addr string, subcode uint8) error { return err } -func (s *BgpServer) ShutdownNeighbor(addr string) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } - - err = s.resetNeighbor("Neighbor shutdown", addr, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN) - } - return err +func (s *BgpServer) ShutdownNeighbor(addr string) error { + return s.mgmtOperation(func() error { + return s.resetNeighbor("Neighbor shutdown", addr, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN) + }, true) } -func (s *BgpServer) ResetNeighbor(addr string) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return +func (s *BgpServer) ResetNeighbor(addr string) error { + return s.mgmtOperation(func() error { + err := s.resetNeighbor("Neighbor reset", addr, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) + if err != nil { + return err } - - err = s.resetNeighbor("Neighbor reset", addr, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET) - if err == nil { - peers, _ := s.addrToPeers(addr) - for _, peer := range peers { - peer.fsm.idleHoldTime = peer.fsm.pConf.Timers.Config.IdleHoldTimeAfterReset - } - + peers, _ := s.addrToPeers(addr) + for _, peer := range peers { + peer.fsm.idleHoldTime = peer.fsm.pConf.Timers.Config.IdleHoldTimeAfterReset } - } - return err + return nil + }, true) } func (s *BgpServer) setAdminState(addr string, enable bool) error { peers, err := s.addrToPeers(addr) - if err == nil { - for _, peer := range peers { - f := func(state AdminState, message string) { - select { - case peer.fsm.adminStateCh <- state: - log.WithFields(log.Fields{ - "Topic": "Peer", - "Key": peer.fsm.pConf.Config.NeighborAddress, - }).Debug(message) - default: - log.Warning("previous request is still remaining. : ", peer.fsm.pConf.Config.NeighborAddress) - } - } - if enable { - f(ADMIN_STATE_UP, "ADMIN_STATE_UP requested") - } else { - f(ADMIN_STATE_DOWN, "ADMIN_STATE_DOWN requested") + if err != nil { + return err + } + for _, peer := range peers { + f := func(state AdminState, message string) { + select { + case peer.fsm.adminStateCh <- state: + log.WithFields(log.Fields{ + "Topic": "Peer", + "Key": peer.fsm.pConf.Config.NeighborAddress, + }).Debug(message) + default: + log.Warning("previous request is still remaining. : ", peer.fsm.pConf.Config.NeighborAddress) } } - } - return err -} - -func (s *BgpServer) EnableNeighbor(addr string) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return + if enable { + f(ADMIN_STATE_UP, "ADMIN_STATE_UP requested") + } else { + f(ADMIN_STATE_DOWN, "ADMIN_STATE_DOWN requested") } - - err = s.setAdminState(addr, true) } - return err + return nil } -func (s *BgpServer) DisableNeighbor(addr string) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - if err = s.active(); err != nil { - return - } +func (s *BgpServer) EnableNeighbor(addr string) error { + return s.mgmtOperation(func() error { + return s.setAdminState(addr, true) + }, true) +} - err = s.setAdminState(addr, false) - } - return err +func (s *BgpServer) DisableNeighbor(addr string) error { + return s.mgmtOperation(func() error { + return s.setAdminState(addr, false) + }, true) } func (s *BgpServer) GetDefinedSet(typ table.DefinedType) (sets *config.DefinedSets, err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - + s.mgmtOperation(func() error { sets, err = s.policy.GetDefinedSet(typ) - } + return nil + }, false) return sets, err } -func (s *BgpServer) AddDefinedSet(a table.DefinedSet) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.policy.AddDefinedSet(a) - } - return err +func (s *BgpServer) AddDefinedSet(a table.DefinedSet) error { + return s.mgmtOperation(func() error { + return s.policy.AddDefinedSet(a) + }, false) } -func (s *BgpServer) DeleteDefinedSet(a table.DefinedSet, all bool) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.policy.DeleteDefinedSet(a, all) - } - return err +func (s *BgpServer) DeleteDefinedSet(a table.DefinedSet, all bool) error { + return s.mgmtOperation(func() error { + return s.policy.DeleteDefinedSet(a, all) + }, false) } -func (s *BgpServer) ReplaceDefinedSet(a table.DefinedSet) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.policy.ReplaceDefinedSet(a) - } - return err +func (s *BgpServer) ReplaceDefinedSet(a table.DefinedSet) error { + return s.mgmtOperation(func() error { + return s.policy.ReplaceDefinedSet(a) + }, false) } func (s *BgpServer) GetStatement() (l []*config.Statement) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - + s.mgmtOperation(func() error { l = s.policy.GetStatement() - } + return nil + }, false) return l } -func (s *BgpServer) AddStatement(st *table.Statement) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.policy.AddStatement(st) - } - return err +func (s *BgpServer) AddStatement(st *table.Statement) error { + return s.mgmtOperation(func() error { + return s.policy.AddStatement(st) + }, false) } -func (s *BgpServer) DeleteStatement(st *table.Statement, all bool) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.policy.DeleteStatement(st, all) - } - return err +func (s *BgpServer) DeleteStatement(st *table.Statement, all bool) error { + return s.mgmtOperation(func() error { + return s.policy.DeleteStatement(st, all) + }, false) } -func (s *BgpServer) ReplaceStatement(st *table.Statement) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.policy.ReplaceStatement(st) - } - return err +func (s *BgpServer) ReplaceStatement(st *table.Statement) error { + return s.mgmtOperation(func() error { + return s.policy.ReplaceStatement(st) + }, false) } func (s *BgpServer) GetPolicy() (l []*config.PolicyDefinition) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - + s.mgmtOperation(func() error { l = s.policy.GetAllPolicy() - } + return nil + }, false) return l } -func (s *BgpServer) AddPolicy(x *table.Policy, refer bool) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.policy.AddPolicy(x, refer) - } - return err +func (s *BgpServer) AddPolicy(x *table.Policy, refer bool) error { + return s.mgmtOperation(func() error { + return s.policy.AddPolicy(x, refer) + }, false) } -func (s *BgpServer) DeletePolicy(x *table.Policy, all, preserve bool) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - +func (s *BgpServer) DeletePolicy(x *table.Policy, all, preserve bool) error { + return s.mgmtOperation(func() error { l := make([]string, 0, len(s.neighborMap)+1) for _, peer := range s.neighborMap { l = append(l, peer.ID()) } l = append(l, table.GLOBAL_RIB_NAME) - err = s.policy.DeletePolicy(x, all, preserve, l) - - } - return err + return s.policy.DeletePolicy(x, all, preserve, l) + }, false) } -func (s *BgpServer) ReplacePolicy(x *table.Policy, refer, preserve bool) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.policy.ReplacePolicy(x, refer, preserve) - } - return err +func (s *BgpServer) ReplacePolicy(x *table.Policy, refer, preserve bool) error { + return s.mgmtOperation(func() error { + return s.policy.ReplacePolicy(x, refer, preserve) + }, false) } func (server *BgpServer) toPolicyInfo(name string, dir table.PolicyDirection) (string, error) { @@ -2223,105 +1945,63 @@ func (server *BgpServer) toPolicyInfo(name string, dir table.PolicyDirection) (s } func (s *BgpServer) GetPolicyAssignment(name string, dir table.PolicyDirection) (rt table.RouteType, l []*config.PolicyDefinition, err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - + s.mgmtOperation(func() error { var id string id, err = s.toPolicyInfo(name, dir) if err != nil { rt = table.ROUTE_TYPE_NONE - } else { - rt, l, err = s.policy.GetPolicyAssignment(id, dir) + return err } - } + rt, l, err = s.policy.GetPolicyAssignment(id, dir) + return nil + }, false) return rt, l, err } -func (s *BgpServer) AddPolicyAssignment(name string, dir table.PolicyDirection, policies []*config.PolicyDefinition, def table.RouteType) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - var id string - id, err = s.toPolicyInfo(name, dir) +func (s *BgpServer) AddPolicyAssignment(name string, dir table.PolicyDirection, policies []*config.PolicyDefinition, def table.RouteType) error { + return s.mgmtOperation(func() error { + id, err := s.toPolicyInfo(name, dir) if err != nil { - return + return err } - err = s.policy.AddPolicyAssignment(id, dir, policies, def) - } - return err + return s.policy.AddPolicyAssignment(id, dir, policies, def) + }, false) } -func (s *BgpServer) DeletePolicyAssignment(name string, dir table.PolicyDirection, policies []*config.PolicyDefinition, all bool) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - var id string - id, err = s.toPolicyInfo(name, dir) +func (s *BgpServer) DeletePolicyAssignment(name string, dir table.PolicyDirection, policies []*config.PolicyDefinition, all bool) error { + return s.mgmtOperation(func() error { + id, err := s.toPolicyInfo(name, dir) if err != nil { - return + return err } - err = s.policy.DeletePolicyAssignment(id, dir, policies, all) - } - return err + return s.policy.DeletePolicyAssignment(id, dir, policies, all) + }, false) } -func (s *BgpServer) ReplacePolicyAssignment(name string, dir table.PolicyDirection, policies []*config.PolicyDefinition, def table.RouteType) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - var id string - id, err = s.toPolicyInfo(name, dir) +func (s *BgpServer) ReplacePolicyAssignment(name string, dir table.PolicyDirection, policies []*config.PolicyDefinition, def table.RouteType) error { + return s.mgmtOperation(func() error { + id, err := s.toPolicyInfo(name, dir) if err != nil { - return + return err } - err = s.policy.ReplacePolicyAssignment(id, dir, policies, def) - } - return err + return s.policy.ReplacePolicyAssignment(id, dir, policies, def) + }, false) } -func (s *BgpServer) EnableMrt(c *config.MrtConfig) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.mrtManager.enable(c) - } - return err +func (s *BgpServer) EnableMrt(c *config.MrtConfig) error { + return s.mgmtOperation(func() error { + return s.mrtManager.enable(c) + }, false) } -func (s *BgpServer) DisableMrt(c *config.MrtConfig) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.mrtManager.disable(c) - } - return err +func (s *BgpServer) DisableMrt(c *config.MrtConfig) error { + return s.mgmtOperation(func() error { + return s.mrtManager.disable(c) + }, false) } -func (s *BgpServer) ValidateRib(prefix string) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - +func (s *BgpServer) ValidateRib(prefix string) error { + return s.mgmtOperation(func() error { for _, rf := range s.globalRib.GetRFlist() { if t, ok := s.globalRib.Tables[rf]; ok { dsts := t.GetDestinations() @@ -2336,104 +2016,60 @@ func (s *BgpServer) ValidateRib(prefix string) (err error) { } } } - } - return err + return nil + }, true) } func (s *BgpServer) GetRpki() (l []*config.RpkiServer, err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - + s.mgmtOperation(func() error { l = s.roaManager.GetServers() - } + return nil + }, false) return l, err } func (s *BgpServer) GetRoa(family bgp.RouteFamily) (l []*table.ROA, err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - + s.mgmtOperation(func() error { l, err = s.roaManager.GetRoa(family) - } + return nil + }, false) return l, err } -func (s *BgpServer) AddRpki(c *config.RpkiServerConfig) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.roaManager.AddServer(net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))), c.RecordLifetime) - } - return err +func (s *BgpServer) AddRpki(c *config.RpkiServerConfig) error { + return s.mgmtOperation(func() error { + return s.roaManager.AddServer(net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))), c.RecordLifetime) + }, false) } -func (s *BgpServer) DeleteRpki(c *config.RpkiServerConfig) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.roaManager.DeleteServer(c.Address) - } - return err +func (s *BgpServer) DeleteRpki(c *config.RpkiServerConfig) error { + return s.mgmtOperation(func() error { + return s.roaManager.DeleteServer(c.Address) + }, false) } -func (s *BgpServer) EnableRpki(c *config.RpkiServerConfig) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.roaManager.Enable(c.Address) - } - return err +func (s *BgpServer) EnableRpki(c *config.RpkiServerConfig) error { + return s.mgmtOperation(func() error { + return s.roaManager.Enable(c.Address) + }, false) } -func (s *BgpServer) DisableRpki(c *config.RpkiServerConfig) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.roaManager.Disable(c.Address) - } - return err +func (s *BgpServer) DisableRpki(c *config.RpkiServerConfig) error { + return s.mgmtOperation(func() error { + return s.roaManager.Disable(c.Address) + }, false) } -func (s *BgpServer) ResetRpki(c *config.RpkiServerConfig) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.roaManager.Reset(c.Address) - } - return err +func (s *BgpServer) ResetRpki(c *config.RpkiServerConfig) error { + return s.mgmtOperation(func() error { + return s.roaManager.Reset(c.Address) + }, false) } -func (s *BgpServer) SoftResetRpki(c *config.RpkiServerConfig) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - - err = s.roaManager.SoftReset(c.Address) - } - return err +func (s *BgpServer) SoftResetRpki(c *config.RpkiServerConfig) error { + return s.mgmtOperation(func() error { + return s.roaManager.SoftReset(c.Address) + }, false) } type WatchEventType string @@ -2556,13 +2192,8 @@ func (w *Watcher) Event() <-chan WatchEvent { return w.realCh } -func (w *Watcher) Generate(t WatchEventType) (err error) { - ch := make(chan struct{}) - defer func() { <-ch }() - - w.s.mgmtCh <- func() { - defer close(ch) - +func (w *Watcher) Generate(t WatchEventType) error { + return w.s.mgmtOperation(func() error { switch t { case WATCH_EVENT_TYPE_PRE_UPDATE: pathList := make([]*table.Path, 0) @@ -2575,12 +2206,10 @@ func (w *Watcher) Generate(t WatchEventType) (err error) { if len(w.opts.tableName) > 0 { peer, ok := w.s.neighborMap[w.opts.tableName] if !ok { - err = fmt.Errorf("Neighbor that has %v doesn't exist.", w.opts.tableName) - break + return fmt.Errorf("Neighbor that has %v doesn't exist.", w.opts.tableName) } if !peer.isRouteServerClient() { - err = fmt.Errorf("Neighbor %v doesn't have local rib", w.opts.tableName) - return + return fmt.Errorf("Neighbor %v doesn't have local rib", w.opts.tableName) } id = peer.ID() } @@ -2602,11 +2231,10 @@ func (w *Watcher) Generate(t WatchEventType) (err error) { } w.notify(&WatchEventTable{PathList: pathList, Neighbor: l}) default: - err = fmt.Errorf("unsupported type %v", t) - return + return fmt.Errorf("unsupported type %v", t) } - } - return err + return nil + }, false) } func (w *Watcher) notify(v WatchEvent) { @@ -2627,10 +2255,7 @@ func (w *Watcher) loop() { } func (w *Watcher) Stop() { - ch := make(chan struct{}) - defer func() { <-ch }() - w.s.mgmtCh <- func() { - defer close(ch) + w.s.mgmtOperation(func() error { for k, l := range w.s.watcherMap { for i, v := range l { if w == v { @@ -2645,7 +2270,8 @@ func (w *Watcher) Stop() { // writing to realCh. make sure it finishes. for range w.realCh { } - } + return nil + }, false) } func (s *BgpServer) isWatched(typ WatchEventType) bool { @@ -2659,12 +2285,7 @@ func (s *BgpServer) notifyWatcher(typ WatchEventType, ev WatchEvent) { } func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { - ch := make(chan struct{}) - defer func() { <-ch }() - - s.mgmtCh <- func() { - defer close(ch) - + s.mgmtOperation(func() error { w = &Watcher{ s: s, realCh: make(chan WatchEvent, 8), @@ -2742,6 +2363,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) { } go w.loop() - } + return nil + }, false) return w } |