summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorHiroshi Yokoi <yokoi.hiroshi@po.ntts.co.jp>2015-09-18 19:06:09 +0900
committerFUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>2015-09-27 09:17:35 +0900
commit9fbf43eccb0070051214e51758ad35af8c8f48d9 (patch)
tree50f703fa95da4b776f2020a46aa8c66f0968e3bb
parent841971972b318552e4fa25613c9f7c051bb8d198 (diff)
zebra: prevent goroutine to broadcast from blocking
Signed-off-by: Hiroshi Yokoi <yokoi.hiroshi@po.ntts.co.jp>
-rw-r--r--zebra/zapi.go35
1 files changed, 22 insertions, 13 deletions
diff --git a/zebra/zapi.go b/zebra/zapi.go
index 09b9107a..189b5733 100644
--- a/zebra/zapi.go
+++ b/zebra/zapi.go
@@ -220,6 +220,15 @@ func NewClient(network, address string, typ ROUTE_TYPE) (*Client, error) {
return nil, err
}
outgoing := make(chan *Message)
+ incoming := make(chan *Message, 64)
+
+ c := &Client{
+ outgoing: outgoing,
+ incoming: incoming,
+ redistDefault: typ,
+ conn: conn,
+ }
+
go func() {
for {
m, more := <-outgoing
@@ -232,8 +241,8 @@ func NewClient(network, address string, typ ROUTE_TYPE) (*Client, error) {
_, err = conn.Write(b)
if err != nil {
- log.Errorf("failed to write: ", err)
- return
+ log.Errorf("failed to write: %s", err)
+ close(outgoing)
}
} else {
log.Debug("finish outgoing loop")
@@ -242,26 +251,25 @@ func NewClient(network, address string, typ ROUTE_TYPE) (*Client, error) {
}
}()
- incoming := make(chan *Message, 64)
- go func() error {
+ go func() {
for {
headerBuf, err := readAll(conn, HEADER_SIZE)
if err != nil {
log.Error("failed to read header: ", err)
- return err
+ return
}
log.Debugf("read header from zebra: %v", headerBuf)
hd := &Header{}
err = hd.DecodeFromBytes(headerBuf)
if err != nil {
log.Error("failed to decode header: ", err)
- return err
+ return
}
bodyBuf, err := readAll(conn, int(hd.Len-HEADER_SIZE))
if err != nil {
log.Error("failed to read body: ", err)
- return err
+ return
}
log.Debugf("read body from zebra: %v", bodyBuf)
m, err := ParseMessage(hd, bodyBuf)
@@ -273,12 +281,8 @@ func NewClient(network, address string, typ ROUTE_TYPE) (*Client, error) {
incoming <- m
}
}()
- return &Client{
- outgoing: outgoing,
- incoming: incoming,
- redistDefault: typ,
- conn: conn,
- }, nil
+
+ return c, nil
}
func readAll(conn net.Conn, length int) ([]byte, error) {
@@ -292,6 +296,11 @@ func (c *Client) Receive() chan *Message {
}
func (c *Client) Send(m *Message) {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Debugf("recovered: %s", err)
+ }
+ }()
c.outgoing <- m
}