summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>2017-01-17 06:47:18 +0000
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2016-11-06 02:36:22 +0900
commit8798014bd0dc80cfe972c81a510ab0c8c22bed9a (patch)
treea3e8c2697d0aade988389e000bfeb53d2bb1b35a
parent10b91fbb3fbb3fa990eb1f80234f44a21ec89f1b (diff)
server: refactor mgmt operation
Signed-off-by: ISHIDA Wataru <ishida.wataru@lab.ntt.co.jp>
-rw-r--r--server/server.go1084
1 files changed, 353 insertions, 731 deletions
diff --git a/server/server.go b/server/server.go
index 0cea3b4b..a3b25bb5 100644
--- a/server/server.go
+++ b/server/server.go
@@ -93,7 +93,7 @@ type BgpServer struct {
fsmStateCh chan *FsmMsg
acceptCh chan *net.TCPConn
- mgmtCh chan func()
+ mgmtCh chan *mgmtOp
policy *table.RoutingPolicy
listeners []*TCPListener
neighborMap map[string]*Peer
@@ -112,7 +112,7 @@ func NewBgpServer() *BgpServer {
neighborMap: make(map[string]*Peer),
policy: table.NewRoutingPolicy(),
roaManager: roaManager,
- mgmtCh: make(chan func(), 1),
+ mgmtCh: make(chan *mgmtOp, 1),
watcherMap: make(map[WatchEventType][]*Watcher),
}
s.bmpManager = newBmpClientManager(s)
@@ -140,6 +140,33 @@ func (s *BgpServer) active() error {
return nil
}
+type mgmtOp struct {
+ f func() error
+ errCh chan error
+ checkActive bool // check BGP global setting is configured before calling f()
+}
+
+func (server *BgpServer) handleMGMTOp(op *mgmtOp) {
+ if op.checkActive {
+ if err := server.active(); err != nil {
+ op.errCh <- err
+ return
+ }
+ }
+ op.errCh <- op.f()
+}
+
+func (s *BgpServer) mgmtOperation(f func() error, checkActive bool) (err error) {
+ ch := make(chan error)
+ defer func() { err = <-ch }()
+ s.mgmtCh <- &mgmtOp{
+ f: f,
+ errCh: ch,
+ checkActive: checkActive,
+ }
+ return
+}
+
func (server *BgpServer) Serve() {
server.listeners = make([]*TCPListener, 0, 2)
server.fsmincomingCh = channels.NewInfiniteChannel()
@@ -217,8 +244,8 @@ func (server *BgpServer) Serve() {
}
select {
- case f := <-server.mgmtCh:
- f()
+ case op := <-server.mgmtCh:
+ server.handleMGMTOp(op)
case conn := <-server.acceptCh:
passConn(conn)
default:
@@ -235,8 +262,8 @@ func (server *BgpServer) Serve() {
CONT:
select {
- case f := <-server.mgmtCh:
- f()
+ case op := <-server.mgmtCh:
+ server.handleMGMTOp(op)
case rmsg := <-server.roaManager.ReceiveROA():
server.roaManager.HandleROAEvent(rmsg)
case conn := <-server.acceptCh:
@@ -693,15 +720,12 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
select {
case <-timer.C:
- ch := make(chan struct{})
- defer func() { <-ch }()
- server.mgmtCh <- func() {
+ server.mgmtOperation(func() error {
log.WithFields(log.Fields{
"Topic": "Peer",
"Key": peer.ID(),
"Family": family,
}).Debugf("LLGR restart timer (%d sec) for %s expired", t, family)
- defer close(ch)
peer.DropAll([]bgp.RouteFamily{family})
server.dropPeerAllRoutes(peer, []bgp.RouteFamily{family})
@@ -709,7 +733,8 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
if peer.llgrRestartTimerExpired(family) {
peer.stopPeerRestarting()
}
- }
+ return nil
+ }, false)
case <-endCh:
log.WithFields(log.Fields{
"Topic": "Peer",
@@ -739,13 +764,10 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
peer.fsm.peerInfo.LocalAddress = net.ParseIP(laddr)
deferralExpiredFunc := func(family bgp.RouteFamily) func() {
return func() {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- server.mgmtCh <- func() {
- defer close(ch)
+ server.mgmtOperation(func() error {
server.softResetOut(peer.fsm.pConf.Config.NeighborAddress, family, true)
- }
+ return nil
+ }, false)
}
}
if !peer.fsm.pConf.GracefulRestart.State.LocalRestarting {
@@ -943,89 +965,53 @@ func (server *BgpServer) handleFSMMessage(peer *Peer, e *FsmMsg) {
return
}
-func (s *BgpServer) StartCollector(c *config.CollectorConfig) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- _, err = NewCollector(s, c.Url, c.DbName, c.TableDumpInterval)
- }
- return err
+func (s *BgpServer) StartCollector(c *config.CollectorConfig) error {
+ return s.mgmtOperation(func() error {
+ _, err := NewCollector(s, c.Url, c.DbName, c.TableDumpInterval)
+ return err
+ }, false)
}
-func (s *BgpServer) StartZebraClient(c *config.ZebraConfig) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+func (s *BgpServer) StartZebraClient(c *config.ZebraConfig) error {
+ return s.mgmtOperation(func() error {
if s.zclient != nil {
- err = fmt.Errorf("already connected to Zebra")
- } else {
- protos := make([]string, 0, len(c.RedistributeRouteTypeList))
- for _, p := range c.RedistributeRouteTypeList {
- protos = append(protos, string(p))
- }
- s.zclient, err = newZebraClient(s, c.Url, protos, c.Version)
+ return fmt.Errorf("already connected to Zebra")
}
- }
- return err
-}
-
-func (s *BgpServer) AddBmp(c *config.BmpServerConfig) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
+ protos := make([]string, 0, len(c.RedistributeRouteTypeList))
+ for _, p := range c.RedistributeRouteTypeList {
+ protos = append(protos, string(p))
}
+ var err error
+ s.zclient, err = newZebraClient(s, c.Url, protos, c.Version)
+ return err
+ }, false)
+}
- err = s.bmpManager.addServer(c)
- }
- return err
+func (s *BgpServer) AddBmp(c *config.BmpServerConfig) error {
+ return s.mgmtOperation(func() error {
+ return s.bmpManager.addServer(c)
+ }, true)
}
func (s *BgpServer) DeleteBmp(c *config.BmpServerConfig) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
-
- err = s.bmpManager.deleteServer(c)
- }
- return err
+ return s.mgmtOperation(func() error {
+ return s.bmpManager.deleteServer(c)
+ }, true)
}
func (s *BgpServer) Shutdown() {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+ s.mgmtOperation(func() error {
s.shutdown = true
for _, p := range s.neighborMap {
p.fsm.adminStateCh <- ADMIN_STATE_DOWN
}
// TODO: call fsmincomingCh.Close()
- }
+ return nil
+ }, false)
}
-func (s *BgpServer) UpdatePolicy(policy config.RoutingPolicy) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+func (s *BgpServer) UpdatePolicy(policy config.RoutingPolicy) error {
+ return s.mgmtOperation(func() error {
ap := make(map[string]config.ApplyPolicy, len(s.neighborMap)+1)
ap[table.GLOBAL_RIB_NAME] = s.bgpConfig.Global.ApplyPolicy
for _, peer := range s.neighborMap {
@@ -1035,9 +1021,8 @@ func (s *BgpServer) UpdatePolicy(policy config.RoutingPolicy) (err error) {
}).Info("call set policy")
ap[peer.ID()] = peer.fsm.pConf.ApplyPolicy
}
- err = s.policy.Reset(&policy, ap)
- }
- return err
+ return s.policy.Reset(&policy, ap)
+ }, false)
}
// EVPN MAC MOBILITY HANDLING
@@ -1148,37 +1133,22 @@ func (server *BgpServer) fixupApiPath(vrfId string, pathList []*table.Path) erro
}
func (s *BgpServer) AddPath(vrfId string, pathList []*table.Path) (uuidBytes []byte, err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- if err = s.active(); err != nil {
- return
+ s.mgmtOperation(func() error {
+ if err := s.fixupApiPath(vrfId, pathList); err != nil {
+ return err
}
-
- if err = s.fixupApiPath(vrfId, pathList); err == nil {
- if len(pathList) == 1 {
- uuidBytes = uuid.NewV4().Bytes()
- pathList[0].SetUUID(uuidBytes)
- }
- s.propagateUpdate(nil, pathList)
+ if len(pathList) == 1 {
+ uuidBytes = uuid.NewV4().Bytes()
+ pathList[0].SetUUID(uuidBytes)
}
- }
- return uuidBytes, err
+ s.propagateUpdate(nil, pathList)
+ return nil
+ }, true)
+ return
}
-func (s *BgpServer) DeletePath(uuid []byte, f bgp.RouteFamily, vrfId string, pathList []*table.Path) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
-
+func (s *BgpServer) DeletePath(uuid []byte, f bgp.RouteFamily, vrfId string, pathList []*table.Path) error {
+ return s.mgmtOperation(func() error {
deletePathList := make([]*table.Path, 0)
if len(uuid) > 0 {
path := func() *table.Path {
@@ -1192,7 +1162,7 @@ func (s *BgpServer) DeletePath(uuid []byte, f bgp.RouteFamily, vrfId string, pat
if path != nil {
deletePathList = append(deletePathList, path.Clone(true))
} else {
- err = fmt.Errorf("Can't find a specified path")
+ return fmt.Errorf("Can't find a specified path")
}
} else if len(pathList) == 0 {
// delete all paths
@@ -1204,39 +1174,28 @@ func (s *BgpServer) DeletePath(uuid []byte, f bgp.RouteFamily, vrfId string, pat
deletePathList = append(deletePathList, path.Clone(true))
}
} else {
- if err = s.fixupApiPath(vrfId, pathList); err != nil {
- return
+ if err := s.fixupApiPath(vrfId, pathList); err != nil {
+ return err
}
deletePathList = pathList
}
s.propagateUpdate(nil, deletePathList)
- }
- return err
+ return nil
+ }, true)
}
-func (s *BgpServer) Start(c *config.Global) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- if err = s.active(); err == nil {
- err = fmt.Errorf("gobgp is already started")
- return
- }
-
- if err = config.SetDefaultGlobalConfigValues(c); err != nil {
- return
+func (s *BgpServer) Start(c *config.Global) error {
+ return s.mgmtOperation(func() error {
+ if err := config.SetDefaultGlobalConfigValues(c); err != nil {
+ return err
}
if c.Config.Port > 0 {
acceptCh := make(chan *net.TCPConn, 4096)
for _, addr := range c.Config.LocalAddressList {
- var l *TCPListener
- l, err = NewTCPListener(addr, uint32(c.Config.Port), acceptCh)
+ l, err := NewTCPListener(addr, uint32(c.Config.Port), acceptCh)
if err != nil {
- return
+ return err
}
s.listeners = append(s.listeners, l)
}
@@ -1246,8 +1205,8 @@ func (s *BgpServer) Start(c *config.Global) (err error) {
rfs, _ := config.AfiSafis(c.AfiSafis).ToRfList()
s.globalRib = table.NewTableManager(rfs)
- if err = s.policy.Reset(&config.RoutingPolicy{}, map[string]config.ApplyPolicy{}); err != nil {
- return
+ if err := s.policy.Reset(&config.RoutingPolicy{}, map[string]config.ApplyPolicy{}); err != nil {
+ return err
}
s.bgpConfig.Global = *c
// update route selection options
@@ -1255,99 +1214,68 @@ func (s *BgpServer) Start(c *config.Global) (err error) {
table.UseMultiplePaths = c.UseMultiplePaths.Config
s.roaManager.SetAS(s.bgpConfig.Global.Config.As)
- }
- return nil
+ return nil
+ }, false)
}
func (s *BgpServer) GetVrf() (l []*table.Vrf) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err := s.active(); err != nil {
- log.Errorf("get vrf failed: %v", err)
- return
- }
+ s.mgmtOperation(func() error {
l = make([]*table.Vrf, 0, len(s.globalRib.Vrfs))
for _, vrf := range s.globalRib.Vrfs {
l = append(l, vrf.Clone())
}
- }
+ return nil
+ }, true)
return l
}
-func (s *BgpServer) AddVrf(name string, id uint32, rd bgp.RouteDistinguisherInterface, im, ex []bgp.ExtendedCommunityInterface) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- if err = s.active(); err != nil {
- return
- }
-
+func (s *BgpServer) AddVrf(name string, id uint32, rd bgp.RouteDistinguisherInterface, im, ex []bgp.ExtendedCommunityInterface) error {
+ return s.mgmtOperation(func() error {
pi := &table.PeerInfo{
AS: s.bgpConfig.Global.Config.As,
LocalID: net.ParseIP(s.bgpConfig.Global.Config.RouterId).To4(),
}
if pathList, e := s.globalRib.AddVrf(name, id, rd, im, ex, pi); e != nil {
- err = e
+ return e
} else if len(pathList) > 0 {
s.propagateUpdate(nil, pathList)
}
- }
- return err
+ return nil
+ }, true)
}
-func (s *BgpServer) DeleteVrf(name string) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
-
+func (s *BgpServer) DeleteVrf(name string) error {
+ return s.mgmtOperation(func() error {
for _, n := range s.neighborMap {
if n.fsm.pConf.Config.Vrf == name {
- err = fmt.Errorf("failed to delete VRF %s: neighbor %s is in use", name, n.ID())
- return
+ return fmt.Errorf("failed to delete VRF %s: neighbor %s is in use", name, n.ID())
}
}
-
pathList, err := s.globalRib.DeleteVrf(name)
- if err == nil && len(pathList) > 0 {
+ if err != nil {
+ return err
+ }
+ if len(pathList) > 0 {
s.propagateUpdate(nil, pathList)
}
- }
- return err
+ return nil
+ }, true)
}
-func (s *BgpServer) Stop() (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
-
+func (s *BgpServer) Stop() error {
+ return s.mgmtOperation(func() error {
for k, _ := range s.neighborMap {
- if err = s.deleteNeighbor(&config.Neighbor{Config: config.NeighborConfig{
+ if err := s.deleteNeighbor(&config.Neighbor{Config: config.NeighborConfig{
NeighborAddress: k}}, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED); err != nil {
- return
+ return err
}
}
for _, l := range s.listeners {
l.Close()
}
s.bgpConfig.Global = config.Global{}
- }
- return nil
+ return nil
+ }, true)
}
func (s *BgpServer) softResetIn(addr string, family bgp.RouteFamily) error {
@@ -1438,117 +1366,71 @@ func (s *BgpServer) softResetOut(addr string, family bgp.RouteFamily, deferral b
return nil
}
-func (s *BgpServer) SoftResetIn(addr string, family bgp.RouteFamily) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- log.WithFields(log.Fields{
- "Topic": "Operation",
- "Key": addr,
- }).Info("Neighbor soft reset in")
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
- err = s.softResetIn(addr, family)
- }
- return err
+func (s *BgpServer) SoftResetIn(addr string, family bgp.RouteFamily) error {
+ return s.mgmtOperation(func() error {
+ log.WithFields(log.Fields{
+ "Topic": "Operation",
+ "Key": addr,
+ }).Info("Neighbor soft reset in")
+ return s.softResetIn(addr, family)
+ }, true)
}
-func (s *BgpServer) SoftResetOut(addr string, family bgp.RouteFamily) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- log.WithFields(log.Fields{
- "Topic": "Operation",
- "Key": addr,
- }).Info("Neighbor soft reset out")
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
- err = s.softResetOut(addr, family, false)
- }
- return err
+func (s *BgpServer) SoftResetOut(addr string, family bgp.RouteFamily) error {
+ return s.mgmtOperation(func() error {
+ log.WithFields(log.Fields{
+ "Topic": "Operation",
+ "Key": addr,
+ }).Info("Neighbor soft reset out")
+ return s.softResetOut(addr, family, false)
+ }, true)
}
-func (s *BgpServer) SoftReset(addr string, family bgp.RouteFamily) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- log.WithFields(log.Fields{
- "Topic": "Operation",
- "Key": addr,
- }).Info("Neighbor soft reset")
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
- err = s.softResetIn(addr, family)
+func (s *BgpServer) SoftReset(addr string, family bgp.RouteFamily) error {
+ return s.mgmtOperation(func() error {
+ log.WithFields(log.Fields{
+ "Topic": "Operation",
+ "Key": addr,
+ }).Info("Neighbor soft reset")
+ err := s.softResetIn(addr, family)
if err != nil {
- return
+ return err
}
- err = s.softResetOut(addr, family, false)
- }
- return err
+ return s.softResetOut(addr, family, false)
+ }, true)
}
func (s *BgpServer) GetRib(addr string, family bgp.RouteFamily, prefixes []*table.LookupPrefix) (rib *table.Table, err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
-
+ s.mgmtOperation(func() error {
m := s.globalRib
id := table.GLOBAL_RIB_NAME
if len(addr) > 0 {
peer, ok := s.neighborMap[addr]
if !ok {
- err = fmt.Errorf("Neighbor that has %v doesn't exist.", addr)
- return
+ return fmt.Errorf("Neighbor that has %v doesn't exist.", addr)
}
if !peer.isRouteServerClient() {
- err = fmt.Errorf("Neighbor %v doesn't have local rib", addr)
- return
+ return fmt.Errorf("Neighbor %v doesn't have local rib", addr)
}
id = peer.ID()
}
af := bgp.RouteFamily(family)
tbl, ok := m.Tables[af]
if !ok {
- err = fmt.Errorf("address family: %s not supported", af)
- return
+ return fmt.Errorf("address family: %s not supported", af)
}
rib, err = tbl.Select(table.TableSelectOption{ID: id, LookupPrefixes: prefixes})
- }
+ return nil
+ }, true)
return
}
func (s *BgpServer) GetVrfRib(name string, family bgp.RouteFamily, prefixes []*table.LookupPrefix) (rib *table.Table, err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
-
+ s.mgmtOperation(func() error {
m := s.globalRib
vrfs := m.Vrfs
if _, ok := vrfs[name]; !ok {
- err = fmt.Errorf("vrf %s not found", name)
- return
+ return fmt.Errorf("vrf %s not found", name)
}
var af bgp.RouteFamily
switch family {
@@ -1561,28 +1443,19 @@ func (s *BgpServer) GetVrfRib(name string, family bgp.RouteFamily, prefixes []*t
}
tbl, ok := m.Tables[af]
if !ok {
- err = fmt.Errorf("address family: %s not supported", af)
- return
+ return fmt.Errorf("address family: %s not supported", af)
}
rib, err = tbl.Select(table.TableSelectOption{VRF: vrfs[name], LookupPrefixes: prefixes})
- }
+ return err
+ }, true)
return
}
func (s *BgpServer) GetAdjRib(addr string, family bgp.RouteFamily, in bool, prefixes []*table.LookupPrefix) (rib *table.Table, err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
-
+ s.mgmtOperation(func() error {
peer, ok := s.neighborMap[addr]
if !ok {
- err = fmt.Errorf("Neighbor that has %v doesn't exist.", addr)
- return
+ return fmt.Errorf("Neighbor that has %v doesn't exist.", addr)
}
id := peer.TableID()
@@ -1595,53 +1468,36 @@ func (s *BgpServer) GetAdjRib(addr string, family bgp.RouteFamily, in bool, pref
adjRib.Update(accepted)
}
rib, err = adjRib.Select(family, false, table.TableSelectOption{ID: id, LookupPrefixes: prefixes})
- }
+ return err
+ }, true)
return
}
func (s *BgpServer) GetRibInfo(addr string, family bgp.RouteFamily) (info *table.TableInfo, err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
-
+ s.mgmtOperation(func() error {
m := s.globalRib
id := table.GLOBAL_RIB_NAME
if len(addr) > 0 {
peer, ok := s.neighborMap[addr]
if !ok {
- err = fmt.Errorf("Neighbor that has %v doesn't exist.", addr)
- return
+ return fmt.Errorf("Neighbor that has %v doesn't exist.", addr)
}
if !peer.isRouteServerClient() {
- err = fmt.Errorf("Neighbor %v doesn't have local rib", addr)
- return
+ return fmt.Errorf("Neighbor %v doesn't have local rib", addr)
}
id = peer.ID()
}
info, err = m.TableInfo(id, family)
- }
+ return err
+ }, true)
return
}
func (s *BgpServer) GetAdjRibInfo(addr string, family bgp.RouteFamily, in bool) (info *table.TableInfo, err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
-
+ s.mgmtOperation(func() error {
peer, ok := s.neighborMap[addr]
if !ok {
- err = fmt.Errorf("Neighbor that has %v doesn't exist.", addr)
- return
+ return fmt.Errorf("Neighbor that has %v doesn't exist.", addr)
}
var adjRib *table.AdjRib
@@ -1653,35 +1509,28 @@ func (s *BgpServer) GetAdjRibInfo(addr string, family bgp.RouteFamily, in bool)
adjRib.Update(accepted)
}
info, err = adjRib.TableInfo(family)
- }
+ return err
+ }, true)
return
}
func (s *BgpServer) GetServer() (c *config.Global) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+ s.mgmtOperation(func() error {
g := s.bgpConfig.Global
c = &g
- }
+ return nil
+ }, false)
return c
}
func (s *BgpServer) GetNeighbor(getAdvertised bool) (l []*config.Neighbor) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+ s.mgmtOperation(func() error {
l = make([]*config.Neighbor, 0, len(s.neighborMap))
for _, peer := range s.neighborMap {
l = append(l, peer.ToConfig(getAdvertised))
}
- }
+ return nil
+ }, false)
return l
}
@@ -1755,19 +1604,10 @@ func (server *BgpServer) addNeighbor(c *config.Neighbor) error {
return nil
}
-func (s *BgpServer) AddNeighbor(c *config.Neighbor) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- if err = s.active(); err != nil {
- return
- }
- err = s.addNeighbor(c)
- }
- return err
+func (s *BgpServer) AddNeighbor(c *config.Neighbor) error {
+ return s.mgmtOperation(func() error {
+ return s.addNeighbor(c)
+ }, true)
}
func (server *BgpServer) deleteNeighbor(c *config.Neighbor, code, subcode uint8) error {
@@ -1833,36 +1673,18 @@ func (server *BgpServer) deleteNeighbor(c *config.Neighbor, code, subcode uint8)
return nil
}
-func (s *BgpServer) DeleteNeighbor(c *config.Neighbor) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
-
- err = s.deleteNeighbor(c, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED)
- }
- return err
+func (s *BgpServer) DeleteNeighbor(c *config.Neighbor) error {
+ return s.mgmtOperation(func() error {
+ return s.deleteNeighbor(c, bgp.BGP_ERROR_CEASE, bgp.BGP_ERROR_SUB_PEER_DECONFIGURED)
+ }, true)
}
func (s *BgpServer) UpdateNeighbor(c *config.Neighbor) (policyUpdated bool, err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
-
+ s.mgmtOperation(func() error {
addr := c.Config.NeighborAddress
peer, ok := s.neighborMap[addr]
if !ok {
- err = fmt.Errorf("Neighbor that has %v doesn't exist.", addr)
- return
+ return fmt.Errorf("Neighbor that has %v doesn't exist.", addr)
}
if !peer.fsm.pConf.ApplyPolicy.Equal(&c.ApplyPolicy) {
@@ -1897,7 +1719,7 @@ func (s *BgpServer) UpdateNeighbor(c *config.Neighbor) (policyUpdated bool, err
"Topic": "Peer",
"Key": addr,
}).Error(err)
- return
+ return err
}
err = s.addNeighbor(c)
if err != nil {
@@ -1906,7 +1728,7 @@ func (s *BgpServer) UpdateNeighbor(c *config.Neighbor) (policyUpdated bool, err
"Key": addr,
}).Error(err)
}
- return
+ return err
}
if !original.Timers.Config.Equal(&c.Timers.Config) {
@@ -1926,7 +1748,8 @@ func (s *BgpServer) UpdateNeighbor(c *config.Neighbor) (policyUpdated bool, err
// rollback to original state
peer.fsm.pConf = original
}
- }
+ return err
+ }, true)
return policyUpdated, err
}
@@ -1960,247 +1783,146 @@ func (s *BgpServer) resetNeighbor(op, addr string, subcode uint8) error {
return err
}
-func (s *BgpServer) ShutdownNeighbor(addr string) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
-
- err = s.resetNeighbor("Neighbor shutdown", addr, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN)
- }
- return err
+func (s *BgpServer) ShutdownNeighbor(addr string) error {
+ return s.mgmtOperation(func() error {
+ return s.resetNeighbor("Neighbor shutdown", addr, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_SHUTDOWN)
+ }, true)
}
-func (s *BgpServer) ResetNeighbor(addr string) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
+func (s *BgpServer) ResetNeighbor(addr string) error {
+ return s.mgmtOperation(func() error {
+ err := s.resetNeighbor("Neighbor reset", addr, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET)
+ if err != nil {
+ return err
}
-
- err = s.resetNeighbor("Neighbor reset", addr, bgp.BGP_ERROR_SUB_ADMINISTRATIVE_RESET)
- if err == nil {
- peers, _ := s.addrToPeers(addr)
- for _, peer := range peers {
- peer.fsm.idleHoldTime = peer.fsm.pConf.Timers.Config.IdleHoldTimeAfterReset
- }
-
+ peers, _ := s.addrToPeers(addr)
+ for _, peer := range peers {
+ peer.fsm.idleHoldTime = peer.fsm.pConf.Timers.Config.IdleHoldTimeAfterReset
}
- }
- return err
+ return nil
+ }, true)
}
func (s *BgpServer) setAdminState(addr string, enable bool) error {
peers, err := s.addrToPeers(addr)
- if err == nil {
- for _, peer := range peers {
- f := func(state AdminState, message string) {
- select {
- case peer.fsm.adminStateCh <- state:
- log.WithFields(log.Fields{
- "Topic": "Peer",
- "Key": peer.fsm.pConf.Config.NeighborAddress,
- }).Debug(message)
- default:
- log.Warning("previous request is still remaining. : ", peer.fsm.pConf.Config.NeighborAddress)
- }
- }
- if enable {
- f(ADMIN_STATE_UP, "ADMIN_STATE_UP requested")
- } else {
- f(ADMIN_STATE_DOWN, "ADMIN_STATE_DOWN requested")
+ if err != nil {
+ return err
+ }
+ for _, peer := range peers {
+ f := func(state AdminState, message string) {
+ select {
+ case peer.fsm.adminStateCh <- state:
+ log.WithFields(log.Fields{
+ "Topic": "Peer",
+ "Key": peer.fsm.pConf.Config.NeighborAddress,
+ }).Debug(message)
+ default:
+ log.Warning("previous request is still remaining. : ", peer.fsm.pConf.Config.NeighborAddress)
}
}
- }
- return err
-}
-
-func (s *BgpServer) EnableNeighbor(addr string) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
+ if enable {
+ f(ADMIN_STATE_UP, "ADMIN_STATE_UP requested")
+ } else {
+ f(ADMIN_STATE_DOWN, "ADMIN_STATE_DOWN requested")
}
-
- err = s.setAdminState(addr, true)
}
- return err
+ return nil
}
-func (s *BgpServer) DisableNeighbor(addr string) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
- if err = s.active(); err != nil {
- return
- }
+func (s *BgpServer) EnableNeighbor(addr string) error {
+ return s.mgmtOperation(func() error {
+ return s.setAdminState(addr, true)
+ }, true)
+}
- err = s.setAdminState(addr, false)
- }
- return err
+func (s *BgpServer) DisableNeighbor(addr string) error {
+ return s.mgmtOperation(func() error {
+ return s.setAdminState(addr, false)
+ }, true)
}
func (s *BgpServer) GetDefinedSet(typ table.DefinedType) (sets *config.DefinedSets, err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+ s.mgmtOperation(func() error {
sets, err = s.policy.GetDefinedSet(typ)
- }
+ return nil
+ }, false)
return sets, err
}
-func (s *BgpServer) AddDefinedSet(a table.DefinedSet) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.policy.AddDefinedSet(a)
- }
- return err
+func (s *BgpServer) AddDefinedSet(a table.DefinedSet) error {
+ return s.mgmtOperation(func() error {
+ return s.policy.AddDefinedSet(a)
+ }, false)
}
-func (s *BgpServer) DeleteDefinedSet(a table.DefinedSet, all bool) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.policy.DeleteDefinedSet(a, all)
- }
- return err
+func (s *BgpServer) DeleteDefinedSet(a table.DefinedSet, all bool) error {
+ return s.mgmtOperation(func() error {
+ return s.policy.DeleteDefinedSet(a, all)
+ }, false)
}
-func (s *BgpServer) ReplaceDefinedSet(a table.DefinedSet) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.policy.ReplaceDefinedSet(a)
- }
- return err
+func (s *BgpServer) ReplaceDefinedSet(a table.DefinedSet) error {
+ return s.mgmtOperation(func() error {
+ return s.policy.ReplaceDefinedSet(a)
+ }, false)
}
func (s *BgpServer) GetStatement() (l []*config.Statement) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+ s.mgmtOperation(func() error {
l = s.policy.GetStatement()
- }
+ return nil
+ }, false)
return l
}
-func (s *BgpServer) AddStatement(st *table.Statement) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.policy.AddStatement(st)
- }
- return err
+func (s *BgpServer) AddStatement(st *table.Statement) error {
+ return s.mgmtOperation(func() error {
+ return s.policy.AddStatement(st)
+ }, false)
}
-func (s *BgpServer) DeleteStatement(st *table.Statement, all bool) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.policy.DeleteStatement(st, all)
- }
- return err
+func (s *BgpServer) DeleteStatement(st *table.Statement, all bool) error {
+ return s.mgmtOperation(func() error {
+ return s.policy.DeleteStatement(st, all)
+ }, false)
}
-func (s *BgpServer) ReplaceStatement(st *table.Statement) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.policy.ReplaceStatement(st)
- }
- return err
+func (s *BgpServer) ReplaceStatement(st *table.Statement) error {
+ return s.mgmtOperation(func() error {
+ return s.policy.ReplaceStatement(st)
+ }, false)
}
func (s *BgpServer) GetPolicy() (l []*config.PolicyDefinition) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+ s.mgmtOperation(func() error {
l = s.policy.GetAllPolicy()
- }
+ return nil
+ }, false)
return l
}
-func (s *BgpServer) AddPolicy(x *table.Policy, refer bool) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.policy.AddPolicy(x, refer)
- }
- return err
+func (s *BgpServer) AddPolicy(x *table.Policy, refer bool) error {
+ return s.mgmtOperation(func() error {
+ return s.policy.AddPolicy(x, refer)
+ }, false)
}
-func (s *BgpServer) DeletePolicy(x *table.Policy, all, preserve bool) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+func (s *BgpServer) DeletePolicy(x *table.Policy, all, preserve bool) error {
+ return s.mgmtOperation(func() error {
l := make([]string, 0, len(s.neighborMap)+1)
for _, peer := range s.neighborMap {
l = append(l, peer.ID())
}
l = append(l, table.GLOBAL_RIB_NAME)
- err = s.policy.DeletePolicy(x, all, preserve, l)
-
- }
- return err
+ return s.policy.DeletePolicy(x, all, preserve, l)
+ }, false)
}
-func (s *BgpServer) ReplacePolicy(x *table.Policy, refer, preserve bool) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.policy.ReplacePolicy(x, refer, preserve)
- }
- return err
+func (s *BgpServer) ReplacePolicy(x *table.Policy, refer, preserve bool) error {
+ return s.mgmtOperation(func() error {
+ return s.policy.ReplacePolicy(x, refer, preserve)
+ }, false)
}
func (server *BgpServer) toPolicyInfo(name string, dir table.PolicyDirection) (string, error) {
@@ -2223,105 +1945,63 @@ func (server *BgpServer) toPolicyInfo(name string, dir table.PolicyDirection) (s
}
func (s *BgpServer) GetPolicyAssignment(name string, dir table.PolicyDirection) (rt table.RouteType, l []*config.PolicyDefinition, err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+ s.mgmtOperation(func() error {
var id string
id, err = s.toPolicyInfo(name, dir)
if err != nil {
rt = table.ROUTE_TYPE_NONE
- } else {
- rt, l, err = s.policy.GetPolicyAssignment(id, dir)
+ return err
}
- }
+ rt, l, err = s.policy.GetPolicyAssignment(id, dir)
+ return nil
+ }, false)
return rt, l, err
}
-func (s *BgpServer) AddPolicyAssignment(name string, dir table.PolicyDirection, policies []*config.PolicyDefinition, def table.RouteType) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- var id string
- id, err = s.toPolicyInfo(name, dir)
+func (s *BgpServer) AddPolicyAssignment(name string, dir table.PolicyDirection, policies []*config.PolicyDefinition, def table.RouteType) error {
+ return s.mgmtOperation(func() error {
+ id, err := s.toPolicyInfo(name, dir)
if err != nil {
- return
+ return err
}
- err = s.policy.AddPolicyAssignment(id, dir, policies, def)
- }
- return err
+ return s.policy.AddPolicyAssignment(id, dir, policies, def)
+ }, false)
}
-func (s *BgpServer) DeletePolicyAssignment(name string, dir table.PolicyDirection, policies []*config.PolicyDefinition, all bool) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- var id string
- id, err = s.toPolicyInfo(name, dir)
+func (s *BgpServer) DeletePolicyAssignment(name string, dir table.PolicyDirection, policies []*config.PolicyDefinition, all bool) error {
+ return s.mgmtOperation(func() error {
+ id, err := s.toPolicyInfo(name, dir)
if err != nil {
- return
+ return err
}
- err = s.policy.DeletePolicyAssignment(id, dir, policies, all)
- }
- return err
+ return s.policy.DeletePolicyAssignment(id, dir, policies, all)
+ }, false)
}
-func (s *BgpServer) ReplacePolicyAssignment(name string, dir table.PolicyDirection, policies []*config.PolicyDefinition, def table.RouteType) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- var id string
- id, err = s.toPolicyInfo(name, dir)
+func (s *BgpServer) ReplacePolicyAssignment(name string, dir table.PolicyDirection, policies []*config.PolicyDefinition, def table.RouteType) error {
+ return s.mgmtOperation(func() error {
+ id, err := s.toPolicyInfo(name, dir)
if err != nil {
- return
+ return err
}
- err = s.policy.ReplacePolicyAssignment(id, dir, policies, def)
- }
- return err
+ return s.policy.ReplacePolicyAssignment(id, dir, policies, def)
+ }, false)
}
-func (s *BgpServer) EnableMrt(c *config.MrtConfig) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.mrtManager.enable(c)
- }
- return err
+func (s *BgpServer) EnableMrt(c *config.MrtConfig) error {
+ return s.mgmtOperation(func() error {
+ return s.mrtManager.enable(c)
+ }, false)
}
-func (s *BgpServer) DisableMrt(c *config.MrtConfig) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.mrtManager.disable(c)
- }
- return err
+func (s *BgpServer) DisableMrt(c *config.MrtConfig) error {
+ return s.mgmtOperation(func() error {
+ return s.mrtManager.disable(c)
+ }, false)
}
-func (s *BgpServer) ValidateRib(prefix string) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+func (s *BgpServer) ValidateRib(prefix string) error {
+ return s.mgmtOperation(func() error {
for _, rf := range s.globalRib.GetRFlist() {
if t, ok := s.globalRib.Tables[rf]; ok {
dsts := t.GetDestinations()
@@ -2336,104 +2016,60 @@ func (s *BgpServer) ValidateRib(prefix string) (err error) {
}
}
}
- }
- return err
+ return nil
+ }, true)
}
func (s *BgpServer) GetRpki() (l []*config.RpkiServer, err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+ s.mgmtOperation(func() error {
l = s.roaManager.GetServers()
- }
+ return nil
+ }, false)
return l, err
}
func (s *BgpServer) GetRoa(family bgp.RouteFamily) (l []*table.ROA, err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+ s.mgmtOperation(func() error {
l, err = s.roaManager.GetRoa(family)
- }
+ return nil
+ }, false)
return l, err
}
-func (s *BgpServer) AddRpki(c *config.RpkiServerConfig) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.roaManager.AddServer(net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))), c.RecordLifetime)
- }
- return err
+func (s *BgpServer) AddRpki(c *config.RpkiServerConfig) error {
+ return s.mgmtOperation(func() error {
+ return s.roaManager.AddServer(net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))), c.RecordLifetime)
+ }, false)
}
-func (s *BgpServer) DeleteRpki(c *config.RpkiServerConfig) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.roaManager.DeleteServer(c.Address)
- }
- return err
+func (s *BgpServer) DeleteRpki(c *config.RpkiServerConfig) error {
+ return s.mgmtOperation(func() error {
+ return s.roaManager.DeleteServer(c.Address)
+ }, false)
}
-func (s *BgpServer) EnableRpki(c *config.RpkiServerConfig) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.roaManager.Enable(c.Address)
- }
- return err
+func (s *BgpServer) EnableRpki(c *config.RpkiServerConfig) error {
+ return s.mgmtOperation(func() error {
+ return s.roaManager.Enable(c.Address)
+ }, false)
}
-func (s *BgpServer) DisableRpki(c *config.RpkiServerConfig) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.roaManager.Disable(c.Address)
- }
- return err
+func (s *BgpServer) DisableRpki(c *config.RpkiServerConfig) error {
+ return s.mgmtOperation(func() error {
+ return s.roaManager.Disable(c.Address)
+ }, false)
}
-func (s *BgpServer) ResetRpki(c *config.RpkiServerConfig) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.roaManager.Reset(c.Address)
- }
- return err
+func (s *BgpServer) ResetRpki(c *config.RpkiServerConfig) error {
+ return s.mgmtOperation(func() error {
+ return s.roaManager.Reset(c.Address)
+ }, false)
}
-func (s *BgpServer) SoftResetRpki(c *config.RpkiServerConfig) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
- err = s.roaManager.SoftReset(c.Address)
- }
- return err
+func (s *BgpServer) SoftResetRpki(c *config.RpkiServerConfig) error {
+ return s.mgmtOperation(func() error {
+ return s.roaManager.SoftReset(c.Address)
+ }, false)
}
type WatchEventType string
@@ -2556,13 +2192,8 @@ func (w *Watcher) Event() <-chan WatchEvent {
return w.realCh
}
-func (w *Watcher) Generate(t WatchEventType) (err error) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- w.s.mgmtCh <- func() {
- defer close(ch)
-
+func (w *Watcher) Generate(t WatchEventType) error {
+ return w.s.mgmtOperation(func() error {
switch t {
case WATCH_EVENT_TYPE_PRE_UPDATE:
pathList := make([]*table.Path, 0)
@@ -2575,12 +2206,10 @@ func (w *Watcher) Generate(t WatchEventType) (err error) {
if len(w.opts.tableName) > 0 {
peer, ok := w.s.neighborMap[w.opts.tableName]
if !ok {
- err = fmt.Errorf("Neighbor that has %v doesn't exist.", w.opts.tableName)
- break
+ return fmt.Errorf("Neighbor that has %v doesn't exist.", w.opts.tableName)
}
if !peer.isRouteServerClient() {
- err = fmt.Errorf("Neighbor %v doesn't have local rib", w.opts.tableName)
- return
+ return fmt.Errorf("Neighbor %v doesn't have local rib", w.opts.tableName)
}
id = peer.ID()
}
@@ -2602,11 +2231,10 @@ func (w *Watcher) Generate(t WatchEventType) (err error) {
}
w.notify(&WatchEventTable{PathList: pathList, Neighbor: l})
default:
- err = fmt.Errorf("unsupported type %v", t)
- return
+ return fmt.Errorf("unsupported type %v", t)
}
- }
- return err
+ return nil
+ }, false)
}
func (w *Watcher) notify(v WatchEvent) {
@@ -2627,10 +2255,7 @@ func (w *Watcher) loop() {
}
func (w *Watcher) Stop() {
- ch := make(chan struct{})
- defer func() { <-ch }()
- w.s.mgmtCh <- func() {
- defer close(ch)
+ w.s.mgmtOperation(func() error {
for k, l := range w.s.watcherMap {
for i, v := range l {
if w == v {
@@ -2645,7 +2270,8 @@ func (w *Watcher) Stop() {
// writing to realCh. make sure it finishes.
for range w.realCh {
}
- }
+ return nil
+ }, false)
}
func (s *BgpServer) isWatched(typ WatchEventType) bool {
@@ -2659,12 +2285,7 @@ func (s *BgpServer) notifyWatcher(typ WatchEventType, ev WatchEvent) {
}
func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
- ch := make(chan struct{})
- defer func() { <-ch }()
-
- s.mgmtCh <- func() {
- defer close(ch)
-
+ s.mgmtOperation(func() error {
w = &Watcher{
s: s,
realCh: make(chan WatchEvent, 8),
@@ -2742,6 +2363,7 @@ func (s *BgpServer) Watch(opts ...WatchOption) (w *Watcher) {
}
go w.loop()
- }
+ return nil
+ }, false)
return w
}