diff options
-rw-r--r-- | server/peer.go | 110 | ||||
-rw-r--r-- | server/server.go | 116 | ||||
-rw-r--r-- | table/destination.go | 1 |
3 files changed, 155 insertions, 72 deletions
diff --git a/server/peer.go b/server/peer.go index 6f18f4ec..b65edc11 100644 --- a/server/peer.go +++ b/server/peer.go @@ -27,6 +27,19 @@ import ( "time" ) +type peerMsgType int + +const ( + _ peerMsgType = iota + PEER_MSG_PATH + PEER_MSG_PEER_DOWN +) + +type peerMsg struct { + msgType peerMsgType + msgData interface{} +} + type Peer struct { t tomb.Tomb globalConfig config.GlobalType @@ -34,8 +47,8 @@ type Peer struct { acceptedConnCh chan *net.TCPConn incoming chan *fsmMsg outgoing chan *bgp.BGPMessage - inEventCh chan *message - outEventCh chan *message + serverMsgCh chan *serverMsg + peerMsgCh chan *peerMsg fsm *FSM adjRib *table.AdjRib // peer and rib are always not one-to-one so should not be @@ -45,19 +58,24 @@ type Peer struct { rf bgp.RouteFamily capMap map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface peerInfo *table.PeerInfo + siblings map[string]*serverMsgDataPeer } -func NewPeer(g config.GlobalType, peer config.NeighborType, outEventCh chan *message) *Peer { +func NewPeer(g config.GlobalType, peer config.NeighborType, serverMsgCh chan *serverMsg, peerMsgCh chan *peerMsg, peerList []*serverMsgDataPeer) *Peer { p := &Peer{ globalConfig: g, peerConfig: peer, acceptedConnCh: make(chan *net.TCPConn), incoming: make(chan *fsmMsg, 4096), outgoing: make(chan *bgp.BGPMessage, 4096), - inEventCh: make(chan *message, 4096), - outEventCh: outEventCh, + serverMsgCh: serverMsgCh, + peerMsgCh: peerMsgCh, capMap: make(map[bgp.BGPCapabilityCode]bgp.ParameterCapabilityInterface), } + p.siblings = make(map[string]*serverMsgDataPeer) + for _, s := range peerList { + p.siblings[s.address.String()] = s + } p.fsm = NewFSM(&g, &peer, p.acceptedConnCh, p.incoming, p.outgoing) peer.BgpNeighborCommonState.State = uint32(bgp.BGP_FSM_IDLE) if peer.NeighborAddress.To4() != nil { @@ -70,6 +88,7 @@ func NewPeer(g config.GlobalType, peer config.NeighborType, outEventCh chan *mes VersionNum: 1, LocalID: g.RouterId, RF: p.rf, + Address: peer.NeighborAddress, } p.adjRib = table.NewAdjRib() p.rib = table.NewTableManager() @@ -108,7 +127,14 @@ func (peer *Peer) handleBGPmessage(m *bgp.BGPMessage) { return } peer.adjRib.UpdateIn(pathList) - peer.sendToHub("", PEER_MSG_PATH, pathList) + pm := &peerMsg{ + msgType: PEER_MSG_PATH, + msgData: pathList, + } + for _, s := range peer.siblings { + // TODO: check rf + s.peerMsgCh <- pm + } } } @@ -140,7 +166,7 @@ func (peer *Peer) handleREST(restReq *api.RestRequest) { close(restReq.ResponseCh) } -func (peer *Peer) handlePeermessage(m *message) { +func (peer *Peer) handlePeerMsg(m *peerMsg) { sendpath := func(pList []table.Path, wList []table.Path) { pathList := append([]table.Path(nil), pList...) pathList = append(pathList, wList...) @@ -154,15 +180,46 @@ func (peer *Peer) handlePeermessage(m *message) { peer.sendMessages(table.CreateUpdateMsgFromPaths(pathList)) } - switch m.event { + switch m.msgType { case PEER_MSG_PATH: - pList, wList, _ := peer.rib.ProcessPaths(m.data.([]table.Path)) + pList, wList, _ := peer.rib.ProcessPaths(m.msgData.([]table.Path)) sendpath(pList, wList) - case PEER_MSG_DOWN: - pList, wList, _ := peer.rib.DeletePathsforPeer(m.data.(*table.PeerInfo)) + case PEER_MSG_PEER_DOWN: + pList, wList, _ := peer.rib.DeletePathsforPeer(m.msgData.(*table.PeerInfo)) sendpath(pList, wList) - case PEER_MSG_REST: - peer.handleREST(m.data.(*api.RestRequest)) + } +} + +func (peer *Peer) handleServerMsg(m *serverMsg) { + switch m.msgType { + case SRV_MSG_PEER_ADDED: + d := m.msgData.(*serverMsgDataPeer) + peer.siblings[d.address.String()] = d + pathList := peer.adjRib.GetInPathList(d.rf) + if len(pathList) == 0 { + return + } + pm := &peerMsg{ + msgType: PEER_MSG_PATH, + msgData: pathList, + } + for _, s := range peer.siblings { + // TODO: check rf + s.peerMsgCh <- pm + } + case SRV_MSG_PEER_DELETED: + d := m.msgData.(*table.PeerInfo) + _, found := peer.siblings[d.Address.String()] + if found { + delete(peer.siblings, d.Address.String()) + // TODO: do the same that PEER_MSG_PEER_DOWN handler + } else { + log.Warning("can not find peer: ", d.Address.String()) + } + case SRV_MSG_API: + peer.handleREST(m.msgData.(*api.RestRequest)) + default: + log.Fatal("unknown server msg type ", m.msgType) } } @@ -200,13 +257,21 @@ func (peer *Peer) loop() error { } if oldState == bgp.BGP_FSM_ESTABLISHED { peer.fsm.peerConfig.BgpNeighborCommonState.Uptime = time.Time{} - peer.sendToHub("", PEER_MSG_DOWN, peer.peerInfo) + pm := &peerMsg{ + msgType: PEER_MSG_PEER_DOWN, + msgData: peer.peerInfo, + } + for _, s := range peer.siblings { + s.peerMsgCh <- pm + } } case FSM_MSG_BGP_MESSAGE: peer.handleBGPmessage(e.MsgData.(*bgp.BGPMessage)) } - case m := <-peer.inEventCh: - peer.handlePeermessage(m) + case m := <-peer.serverMsgCh: + peer.handleServerMsg(m) + case m := <-peer.peerMsgCh: + peer.handlePeerMsg(m) } } } @@ -221,19 +286,6 @@ func (peer *Peer) PassConn(conn *net.TCPConn) { peer.acceptedConnCh <- conn } -func (peer *Peer) SendMessage(msg *message) { - peer.inEventCh <- msg -} - -func (peer *Peer) sendToHub(destination string, event int, data interface{}) { - peer.outEventCh <- &message{ - src: peer.peerConfig.NeighborAddress.String(), - dst: destination, - event: event, - data: data, - } -} - func (peer *Peer) MarshalJSON() ([]byte, error) { f := peer.fsm diff --git a/server/server.go b/server/server.go index 99ea974c..3275e5dc 100644 --- a/server/server.go +++ b/server/server.go @@ -21,25 +21,38 @@ import ( log "github.com/Sirupsen/logrus" "github.com/osrg/gobgp/api" "github.com/osrg/gobgp/config" + "github.com/osrg/gobgp/packet" "net" "os" "strconv" "strings" ) +type serverMsgType int + const ( - _ = iota - PEER_MSG_NEW - PEER_MSG_PATH - PEER_MSG_DOWN - PEER_MSG_REST //hacky, fix later + _ serverMsgType = iota + SRV_MSG_PEER_ADDED + SRV_MSG_PEER_DELETED + SRV_MSG_API ) -type message struct { - src string - dst string - event int - data interface{} +type serverMsg struct { + msgType serverMsgType + msgData interface{} +} + +type serverMsgDataPeer struct { + peerMsgCh chan *peerMsg + address net.IP + rf bgp.RouteFamily +} + +type peerMapInfo struct { + peer *Peer + serverMsgCh chan *serverMsg + peerMsgCh chan *peerMsg + peerMsgData *serverMsgDataPeer } type BgpServer struct { @@ -49,7 +62,7 @@ type BgpServer struct { deletedPeerCh chan config.NeighborType RestReqCh chan *api.RestRequest listenPort int - peerMap map[string]*Peer + peerMap map[string]peerMapInfo } func NewBgpServer(port int) *BgpServer { @@ -111,8 +124,7 @@ func (server *BgpServer) Serve() { return f } - server.peerMap = make(map[string]*Peer) - broadcastCh := make(chan *message) + server.peerMap = make(map[string]peerMapInfo) for { select { case conn := <-acceptCh: @@ -123,10 +135,10 @@ func (server *BgpServer) Serve() { idx := strings.LastIndex(addrPort, ":") return addrPort[1 : idx-1] }(conn.RemoteAddr().String()) - peer, found := server.peerMap[remoteAddr] + info, found := server.peerMap[remoteAddr] if found { log.Info("accepted a new passive connection from ", remoteAddr) - peer.PassConn(conn) + info.peer.PassConn(conn) } else { log.Info("can't find configuration for a new passive connection from ", remoteAddr) conn.Close() @@ -135,29 +147,59 @@ func (server *BgpServer) Serve() { addr := peer.NeighborAddress.String() f := listenFile(peer.NeighborAddress) SetTcpMD5SigSockopts(int(f.Fd()), addr, peer.AuthPassword) - p := NewPeer(server.bgpConfig.Global, peer, broadcastCh) - server.peerMap[peer.NeighborAddress.String()] = p + sch := make(chan *serverMsg, 8) + pch := make(chan *peerMsg, 4096) + l := make([]*serverMsgDataPeer, len(server.peerMap)) + i := 0 + for _, v := range server.peerMap { + l[i] = v.peerMsgData + i++ + } + p := NewPeer(server.bgpConfig.Global, peer, sch, pch, l) + d := &serverMsgDataPeer{ + address: peer.NeighborAddress, + peerMsgCh: pch, + rf: p.peerInfo.RF, + } + msg := &serverMsg{ + msgType: SRV_MSG_PEER_ADDED, + msgData: d, + } + sendServerMsgToAll(server.peerMap, msg) + server.peerMap[peer.NeighborAddress.String()] = peerMapInfo{ + peer: p, + serverMsgCh: sch, + peerMsgData: d, + } case peer := <-server.deletedPeerCh: addr := peer.NeighborAddress.String() f := listenFile(peer.NeighborAddress) SetTcpMD5SigSockopts(int(f.Fd()), addr, "") - p, found := server.peerMap[addr] + info, found := server.peerMap[addr] if found { log.Info("Delete a peer configuration for ", addr) - p.Stop() + info.peer.Stop() delete(server.peerMap, addr) + msg := &serverMsg{ + msgType: SRV_MSG_PEER_DELETED, + msgData: info.peer.peerInfo, + } + sendServerMsgToAll(server.peerMap, msg) } else { log.Info("Can't delete a peer configuration for ", addr) } case restReq := <-server.RestReqCh: server.handleRest(restReq) - - case msg := <-broadcastCh: - server.broadcast(msg) } } } +func sendServerMsgToAll(peerMap map[string]peerMapInfo, msg *serverMsg) { + for _, info := range peerMap { + info.serverMsgCh <- msg + } +} + func (server *BgpServer) SetGlobalType(g config.GlobalType) { server.globalTypeCh <- g } @@ -170,25 +212,13 @@ func (server *BgpServer) PeerDelete(peer config.NeighborType) { server.deletedPeerCh <- peer } -func (server *BgpServer) broadcast(msg *message) { - for key := range server.peerMap { - if key == msg.src { - continue - } - if msg.dst == "" || msg.dst == key { - peer := server.peerMap[key] - peer.SendMessage(msg) - } - } -} - func (server *BgpServer) handleRest(restReq *api.RestRequest) { switch restReq.RequestType { case api.REQ_NEIGHBORS: result := &api.RestResponse{} peerList := make([]*Peer, 0) - for _, peer := range server.peerMap { - peerList = append(peerList, peer) + for _, info := range server.peerMap { + peerList = append(peerList, info.peer) } j, _ := json.Marshal(peerList) result.Data = j @@ -199,9 +229,9 @@ func (server *BgpServer) handleRest(restReq *api.RestRequest) { remoteAddr := restReq.RemoteAddr result := &api.RestResponse{} - peer, found := server.peerMap[remoteAddr] + info, found := server.peerMap[remoteAddr] if found { - j, _ := json.Marshal(peer) + j, _ := json.Marshal(info.peer) result.Data = j } else { result.ResponseErr = fmt.Errorf("Neighbor that has %v does not exist.", remoteAddr) @@ -211,13 +241,13 @@ func (server *BgpServer) handleRest(restReq *api.RestRequest) { case api.REQ_LOCAL_RIB: remoteAddr := restReq.RemoteAddr result := &api.RestResponse{} - peer, found := server.peerMap[remoteAddr] + info, found := server.peerMap[remoteAddr] if found { - msg := message{ - event: PEER_MSG_REST, - data: restReq, + msg := &serverMsg{ + msgType: SRV_MSG_API, + msgData: restReq, } - peer.SendMessage(&msg) + info.peer.serverMsgCh <- msg } else { result.ResponseErr = fmt.Errorf("Neighbor that has %v does not exist.", remoteAddr) restReq.ResponseCh <- result diff --git a/table/destination.go b/table/destination.go index 06bab95c..73cae66c 100644 --- a/table/destination.go +++ b/table/destination.go @@ -46,6 +46,7 @@ type PeerInfo struct { VersionNum int LocalID net.IP RF bgp.RouteFamily + Address net.IP } type Destination interface { |