diff options
author | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2014-12-16 09:15:13 +0900 |
---|---|---|
committer | FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp> | 2014-12-16 09:15:13 +0900 |
commit | 26ed030ebea36f9798009ccea45df585660b5f25 (patch) | |
tree | 97d3ce671991840a336cf39a44390cbad8aa4c83 | |
parent | 0d4f59afad75e44d8dfbfedddb55e433d7fb5431 (diff) |
add initial bgp server code
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
-rw-r--r-- | bgpd.go | 87 | ||||
-rw-r--r-- | server/fsm.go | 325 | ||||
-rw-r--r-- | server/peer.go | 85 | ||||
-rw-r--r-- | server/server.go | 116 |
4 files changed, 613 insertions, 0 deletions
diff --git a/bgpd.go b/bgpd.go new file mode 100644 index 00000000..7360f99b --- /dev/null +++ b/bgpd.go @@ -0,0 +1,87 @@ +// Copyright (C) 2014 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "./server" + "fmt" + "github.com/jessevdk/go-flags" + "github.com/osrg/gobgp/config" + "github.com/osrg/gobgp/packet" + "os" + "os/signal" + "runtime" + "syscall" +) + +func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGHUP) + + var opts struct { + ConfigFile string `short:"f" long:"config-file" description:"specifying a config file"` + } + + _, err := flags.Parse(&opts) + if err != nil { + os.Exit(1) + } + + if opts.ConfigFile == "" { + opts.ConfigFile = "gobgpd.conf" + } + + configCh := make(chan config.BgpType) + reloadCh := make(chan bool) + go config.ReadConfigfileServe(opts.ConfigFile, configCh, reloadCh) + reloadCh <- true + + bgpServer := server.NewBgpServer(bgp.BGP_PORT) + go bgpServer.Serve() + + var bgpConfig *config.BgpType = nil + for { + select { + case newConfig := <-configCh: + var added []config.NeighborType + var deleted []config.NeighborType + + if bgpConfig == nil { + bgpServer.SetGlobalType(newConfig.Global) + bgpConfig = &newConfig + added = newConfig.NeighborList + deleted = []config.NeighborType{} + } else { + _, added, deleted = config.UpdateConfig(bgpConfig, &newConfig) + } + + for _, p := range added { + bgpServer.PeerAdd(p) + } + for _, p := range deleted { + bgpServer.PeerDelete(p) + } + case sig := <-sigCh: + switch sig { + case syscall.SIGHUP: + fmt.Println("relaod the config file") + reloadCh <- true + } + } + } +} diff --git a/server/fsm.go b/server/fsm.go new file mode 100644 index 00000000..2de18cd9 --- /dev/null +++ b/server/fsm.go @@ -0,0 +1,325 @@ +// Copyright (C) 2014 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "github.com/osrg/gobgp/config" + "github.com/osrg/gobgp/packet" + "gopkg.in/tomb.v2" + "net" + "time" +) + +type FSM struct { + globalConfig *config.GlobalType + peerConfig *config.NeighborType + keepaliveTicker *time.Ticker + state int + incoming chan *bgp.BGPMessage + outgoing chan *bgp.BGPMessage + //activeConn *net.TCPConn + passiveConn *net.TCPConn + passiveConnCh chan *net.TCPConn + stateCh chan int +} + +func NewFSM(gConfig *config.GlobalType, pConfig *config.NeighborType, connCh chan *net.TCPConn, incoming chan *bgp.BGPMessage, outgoing chan *bgp.BGPMessage) *FSM { + return &FSM{ + globalConfig: gConfig, + peerConfig: pConfig, + incoming: incoming, + outgoing: outgoing, + state: bgp.BGP_FSM_IDLE, + passiveConnCh: connCh, + stateCh: make(chan int), + } +} + +func (fsm *FSM) StateChanged() chan int { + return fsm.stateCh +} + +func (fsm *FSM) StateSet(nextState int) { + fmt.Println("state changed", nextState, fsm.state) + fsm.state = nextState +} + +type FSMHandler struct { + t tomb.Tomb + fsm *FSM + conn *net.TCPConn + ch chan *bgp.BGPMessage + ioError bool +} + +func NewFSMHandler(fsm *FSM) *FSMHandler { + f := &FSMHandler{ + fsm: fsm, + ioError: false, + } + f.t.Go(f.loop) + return f +} + +func (h *FSMHandler) Wait() error { + return h.t.Wait() +} + +func (h *FSMHandler) Stop() error { + h.t.Kill(nil) + return h.t.Wait() +} + +func (h *FSMHandler) idle() error { + fsm := h.fsm + // TODO: support idle hold timer + + if fsm.keepaliveTicker != nil { + fsm.keepaliveTicker.Stop() + fsm.keepaliveTicker = nil + } + fsm.stateCh <- bgp.BGP_FSM_ACTIVE + return nil +} + +func (h *FSMHandler) active() error { + fsm := h.fsm + select { + case <-h.t.Dying(): + return nil + case conn := <-fsm.passiveConnCh: + fsm.passiveConn = conn + } + // we don't implement delayed open timer so move to opensent right + // away. + fsm.stateCh <- bgp.BGP_FSM_OPENSENT + return nil +} + +func buildopen(global *config.GlobalType, neighborT *config.NeighborType) *bgp.BGPMessage { + p1 := bgp.NewOptionParameterCapability( + []bgp.ParameterCapabilityInterface{bgp.NewCapRouteRefresh()}) + p2 := bgp.NewOptionParameterCapability( + []bgp.ParameterCapabilityInterface{bgp.NewCapMultiProtocol(bgp.AFI_IP, bgp.SAFI_UNICAST)}) + p3 := bgp.NewOptionParameterCapability( + []bgp.ParameterCapabilityInterface{bgp.NewCapFourOctetASNumber(global.As)}) + holdTime := uint16(neighborT.Timers.HoldTime) + as := global.As + if as > (1<<16)-1 { + as = bgp.AS_TRANS + } + return bgp.NewBGPOpenMessage(uint16(as), holdTime, global.RouterId.String(), + []bgp.OptionParameterInterface{p1, p2, p3}) +} + +func readAll(conn *net.TCPConn, length int) ([]byte, error) { + buf := make([]byte, length) + for cur := 0; cur < length; { + if num, err := conn.Read(buf); err != nil { + return nil, err + } else { + cur += num + } + } + return buf, nil +} + +func (h *FSMHandler) recvMessage() error { + headerBuf, err := readAll(h.conn, bgp.BGP_HEADER_LENGTH) + if err != nil { + h.ioError = true + close(h.ch) + return nil + } + + hd := &bgp.BGPHeader{} + err = hd.DecodeFromBytes(headerBuf) + if err != nil { + h.ioError = true + close(h.ch) + return nil + } + + bodyBuf, err := readAll(h.conn, int(hd.Len)-bgp.BGP_HEADER_LENGTH) + if err != nil { + h.ioError = true + close(h.ch) + return nil + } + + m, err := bgp.ParseBGPBody(hd, bodyBuf) + if err != nil { + h.ioError = true + close(h.ch) + return nil + } + h.ch <- m + return nil +} + +func (h *FSMHandler) opensent() error { + fsm := h.fsm + m := buildopen(fsm.globalConfig, fsm.peerConfig) + b, _ := m.Serialize() + fsm.passiveConn.Write(b) + + h.ch = make(chan *bgp.BGPMessage) + h.conn = fsm.passiveConn + + h.t.Go(h.recvMessage) + + nextState := bgp.BGP_FSM_IDLE + select { + case <-h.t.Dying(): + fsm.passiveConn.Close() + return nil + case m, ok := <-h.ch: + if ok { + if m.Header.Type == bgp.BGP_MSG_OPEN { + msg := bgp.NewBGPKeepAliveMessage() + b, _ := msg.Serialize() + fsm.passiveConn.Write(b) + nextState = bgp.BGP_FSM_OPENCONFIRM + } else { + // send error + } + } else { + // io error + } + } + fsm.stateCh <- nextState + return nil +} + +func (h *FSMHandler) openconfirm() error { + fsm := h.fsm + sec := time.Second * time.Duration(fsm.peerConfig.Timers.KeepaliveInterval) + fsm.keepaliveTicker = time.NewTicker(sec) + + h.ch = make(chan *bgp.BGPMessage) + h.conn = fsm.passiveConn + + h.t.Go(h.recvMessage) + + for { + select { + case <-h.t.Dying(): + fsm.passiveConn.Close() + return nil + case <-fsm.keepaliveTicker.C: + m := bgp.NewBGPKeepAliveMessage() + b, _ := m.Serialize() + // TODO: check error + fsm.passiveConn.Write(b) + case m, ok := <-h.ch: + nextState := bgp.BGP_FSM_IDLE + if ok { + if m.Header.Type == bgp.BGP_MSG_KEEPALIVE { + nextState = bgp.BGP_FSM_ESTABLISHED + } else { + // send error + } + } else { + // io error + } + fsm.stateCh <- nextState + return nil + } + } + // panic + return nil +} + +func (h *FSMHandler) sendMessageloop() error { + conn := h.conn + fsm := h.fsm + for { + select { + case <-h.t.Dying(): + return nil + case m := <-fsm.outgoing: + b, _ := m.Serialize() + _, err := conn.Write(b) + if err != nil { + return nil + } + case <-fsm.keepaliveTicker.C: + m := bgp.NewBGPKeepAliveMessage() + b, _ := m.Serialize() + _, err := conn.Write(b) + if err != nil { + return nil + } + } + } +} + +func (h *FSMHandler) recvMessageloop() error { + for { + h.recvMessage() + if h.ioError == true { + return nil + } + } +} + +func (h *FSMHandler) established() error { + fsm := h.fsm + h.conn = fsm.passiveConn + h.t.Go(h.sendMessageloop) + // TODO: use incoming directly + h.ch = make(chan *bgp.BGPMessage, 4096) + h.t.Go(h.recvMessageloop) + + for { + select { + case m, ok := <-h.ch: + if ok { + fsm.incoming <- m + } else { + h.conn.Close() + h.t.Kill(nil) + fsm.stateCh <- bgp.BGP_FSM_IDLE + return nil + } + case <-h.t.Dying(): + h.conn.Close() + return nil + } + } + return nil +} + +func (h *FSMHandler) loop() error { + fsm := h.fsm + switch fsm.state { + case bgp.BGP_FSM_IDLE: + return h.idle() + // case bgp.BGP_FSM_CONNECT: + // return h.connect() + case bgp.BGP_FSM_ACTIVE: + return h.active() + case bgp.BGP_FSM_OPENSENT: + return h.opensent() + case bgp.BGP_FSM_OPENCONFIRM: + return h.openconfirm() + case bgp.BGP_FSM_ESTABLISHED: + return h.established() + } + // panic + return nil +} diff --git a/server/peer.go b/server/peer.go new file mode 100644 index 00000000..750d2230 --- /dev/null +++ b/server/peer.go @@ -0,0 +1,85 @@ +// Copyright (C) 2014 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "encoding/json" + "fmt" + "github.com/osrg/gobgp/config" + "github.com/osrg/gobgp/packet" + "gopkg.in/tomb.v2" + "net" +) + +type Peer struct { + t tomb.Tomb + globalConfig config.GlobalType + peerConfig config.NeighborType + acceptedConnCh chan *net.TCPConn + incoming chan *bgp.BGPMessage + outgoing chan *bgp.BGPMessage + fsm *FSM +} + +func NewPeer(g config.GlobalType, peer config.NeighborType) *Peer { + p := &Peer{ + globalConfig: g, + peerConfig: peer, + acceptedConnCh: make(chan *net.TCPConn), + incoming: make(chan *bgp.BGPMessage, 4096), + outgoing: make(chan *bgp.BGPMessage, 4096), + } + p.fsm = NewFSM(&g, &peer, p.acceptedConnCh, p.incoming, p.outgoing) + p.t.Go(p.loop) + return p +} + +// this goroutine handles routing table operations +func (peer *Peer) loop() error { + for { + h := NewFSMHandler(peer.fsm) + sameState := true + for sameState { + select { + case nextState := <-peer.fsm.StateChanged(): + // waits for all goroutines created for the current state + h.Wait() + peer.fsm.StateSet(nextState) + sameState = false + case <-peer.t.Dying(): + close(peer.acceptedConnCh) + h.Stop() + close(peer.incoming) + close(peer.outgoing) + return nil + case m := <-peer.incoming: + if m != nil { + j, _ := json.Marshal(m) + fmt.Println(string(j)) + } + } + } + } +} + +func (peer *Peer) Stop() error { + peer.t.Kill(nil) + return peer.t.Wait() +} + +func (peer *Peer) PassConn(conn *net.TCPConn) { + peer.acceptedConnCh <- conn +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 00000000..a082af4d --- /dev/null +++ b/server/server.go @@ -0,0 +1,116 @@ +// Copyright (C) 2014 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "github.com/osrg/gobgp/config" + "github.com/osrg/gobgp/io" + "net" + "os" + "strconv" + "strings" +) + +type BgpServer struct { + bgpConfig config.BgpType + globalTypeCh chan config.GlobalType + addedPeerCh chan config.NeighborType + deletedPeerCh chan config.NeighborType + listenPort int +} + +func NewBgpServer(port int) *BgpServer { + b := BgpServer{} + b.globalTypeCh = make(chan config.GlobalType) + b.addedPeerCh = make(chan config.NeighborType) + b.deletedPeerCh = make(chan config.NeighborType) + b.listenPort = port + return &b +} + +func (server *BgpServer) Serve() { + server.bgpConfig.Global = <-server.globalTypeCh + + service := ":" + strconv.Itoa(server.listenPort) + addr, _ := net.ResolveTCPAddr("tcp", service) + + l, err := net.ListenTCP("tcp4", addr) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + acceptCh := make(chan *net.TCPConn) + go func() { + for { + conn, err := l.Accept() + if err != nil { + fmt.Println(err) + continue + } + acceptCh <- conn.(*net.TCPConn) + } + }() + + peerMap := make(map[string]*Peer) + for { + f, _ := l.File() + select { + case conn := <-acceptCh: + fmt.Println(conn) + remoteAddr := strings.Split(conn.RemoteAddr().String(), ":")[0] + peer, found := peerMap[remoteAddr] + if found { + fmt.Println("found neighbor", remoteAddr) + peer.PassConn(conn) + } else { + fmt.Println("can't found neighbor", remoteAddr) + conn.Close() + } + case peer := <-server.addedPeerCh: + fmt.Println(peer) + addr := peer.NeighborAddress.String() + io.SetTcpMD5SigSockopts(int(f.Fd()), addr, peer.AuthPassword) + p := NewPeer(server.bgpConfig.Global, peer) + peerMap[peer.NeighborAddress.String()] = p + case peer := <-server.deletedPeerCh: + fmt.Println(peer) + addr := peer.NeighborAddress.String() + io.SetTcpMD5SigSockopts(int(f.Fd()), addr, "") + p, found := peerMap[addr] + if found { + fmt.Println("found neighbor", addr) + p.Stop() + delete(peerMap, addr) + } else { + fmt.Println("can't found neighbor", addr) + } + } + } +} + +func (server *BgpServer) SetGlobalType(g config.GlobalType) { + server.globalTypeCh <- g +} + +func (server *BgpServer) PeerAdd(peer config.NeighborType) { + server.addedPeerCh <- peer +} + +func (server *BgpServer) PeerDelete(peer config.NeighborType) { + server.deletedPeerCh <- peer +} |