summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2014-12-18 17:23:22 -0800
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2014-12-19 10:28:45 +0900
commitb1c8ed363bdab0d3618077e93a79df227559539c (patch)
tree6b56b6cc5083c3556e80806c0153fd83b9b5c24e
parentf7568d45ca887d967d493d9ae221a118a06aab44 (diff)
server: add message delivery between peers
a peer notifies other peers of its event (join,leave,bgp state change, updates, etc) It would have better performance for a peer to send a message to a channel of another peer directly. However, for the code simplicity, the hub-and-spoke model is used for now. Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r--server/peer.go21
-rw-r--r--server/server.go35
2 files changed, 49 insertions, 7 deletions
diff --git a/server/peer.go b/server/peer.go
index cf712b57..52a8205f 100644
--- a/server/peer.go
+++ b/server/peer.go
@@ -31,16 +31,20 @@ type Peer struct {
acceptedConnCh chan *net.TCPConn
incoming chan *bgp.BGPMessage
outgoing chan *bgp.BGPMessage
+ inEventCh chan *message
+ outEventCh chan *message
fsm *FSM
}
-func NewPeer(g config.GlobalType, peer config.NeighborType) *Peer {
+func NewPeer(g config.GlobalType, peer config.NeighborType, outEventCh chan *message) *Peer {
p := &Peer{
globalConfig: g,
peerConfig: peer,
acceptedConnCh: make(chan *net.TCPConn),
incoming: make(chan *bgp.BGPMessage, 4096),
outgoing: make(chan *bgp.BGPMessage, 4096),
+ inEventCh: make(chan *message, 4096),
+ outEventCh: outEventCh,
}
p.fsm = NewFSM(&g, &peer, p.acceptedConnCh, p.incoming, p.outgoing)
p.t.Go(p.loop)
@@ -70,6 +74,8 @@ func (peer *Peer) loop() error {
j, _ := json.Marshal(m)
fmt.Println(string(j))
}
+ case m := <-peer.inEventCh:
+ fmt.Println(m)
}
}
}
@@ -83,3 +89,16 @@ func (peer *Peer) Stop() error {
func (peer *Peer) PassConn(conn *net.TCPConn) {
peer.acceptedConnCh <- conn
}
+
+func (peer *Peer) SendMessage(msg *message) {
+ peer.inEventCh <- msg
+}
+
+func (peer *Peer) sendToHub(destination string, event int, data interface{}) {
+ peer.outEventCh <- &message{
+ src: peer.peerConfig.NeighborAddress.String(),
+ dst: destination,
+ event: event,
+ data: data,
+ }
+}
diff --git a/server/server.go b/server/server.go
index 56fe5960..3f7885a6 100644
--- a/server/server.go
+++ b/server/server.go
@@ -24,12 +24,20 @@ import (
"strings"
)
+type message struct {
+ src string
+ dst string
+ event int
+ data interface{}
+}
+
type BgpServer struct {
bgpConfig config.BgpType
globalTypeCh chan config.GlobalType
addedPeerCh chan config.NeighborType
deletedPeerCh chan config.NeighborType
listenPort int
+ peerMap map[string]*Peer
}
func NewBgpServer(port int) *BgpServer {
@@ -65,14 +73,15 @@ func (server *BgpServer) Serve() {
}
}()
- peerMap := make(map[string]*Peer)
+ server.peerMap = make(map[string]*Peer)
+ broadcastCh := make(chan *message)
for {
f, _ := l.File()
select {
case conn := <-acceptCh:
fmt.Println(conn)
remoteAddr := strings.Split(conn.RemoteAddr().String(), ":")[0]
- peer, found := peerMap[remoteAddr]
+ peer, found := server.peerMap[remoteAddr]
if found {
fmt.Println("found neighbor", remoteAddr)
peer.PassConn(conn)
@@ -84,20 +93,22 @@ func (server *BgpServer) Serve() {
fmt.Println(peer)
addr := peer.NeighborAddress.String()
SetTcpMD5SigSockopts(int(f.Fd()), addr, peer.AuthPassword)
- p := NewPeer(server.bgpConfig.Global, peer)
- peerMap[peer.NeighborAddress.String()] = p
+ p := NewPeer(server.bgpConfig.Global, peer, broadcastCh)
+ server.peerMap[peer.NeighborAddress.String()] = p
case peer := <-server.deletedPeerCh:
fmt.Println(peer)
addr := peer.NeighborAddress.String()
SetTcpMD5SigSockopts(int(f.Fd()), addr, "")
- p, found := peerMap[addr]
+ p, found := server.peerMap[addr]
if found {
fmt.Println("found neighbor", addr)
p.Stop()
- delete(peerMap, addr)
+ delete(server.peerMap, addr)
} else {
fmt.Println("can't found neighbor", addr)
}
+ case msg := <-broadcastCh:
+ server.broadcast(msg)
}
}
}
@@ -113,3 +124,15 @@ func (server *BgpServer) PeerAdd(peer config.NeighborType) {
func (server *BgpServer) PeerDelete(peer config.NeighborType) {
server.deletedPeerCh <- peer
}
+
+func (server *BgpServer) broadcast(msg *message) {
+ for key := range server.peerMap {
+ if key == msg.src {
+ continue
+ }
+ if msg.dst == "" || msg.dst == key {
+ peer := server.peerMap[key]
+ peer.SendMessage(msg)
+ }
+ }
+}