summaryrefslogtreecommitdiffhomepage
path: root/src/peer.go
diff options
context:
space:
mode:
authorMathias Hall-Andersen <mathias@hall-andersen.dk>2018-01-26 22:52:32 +0100
committerMathias Hall-Andersen <mathias@hall-andersen.dk>2018-01-26 22:52:32 +0100
commitf73d2fb2d96bc3fbc8bc4cce452e3c19689de01e (patch)
tree52e392cf6313e7e9e5e87539fcb5e88817e47f37 /src/peer.go
parent068d932f2c54f3b1cf0873d74113205dbeb1816f (diff)
Added initial version of peer teardown
There is a double lock issue with device.Close which has yet to be resolved.
Diffstat (limited to 'src/peer.go')
-rw-r--r--src/peer.go68
1 files changed, 51 insertions, 17 deletions
diff --git a/src/peer.go b/src/peer.go
index 3d82989..5ad4511 100644
--- a/src/peer.go
+++ b/src/peer.go
@@ -4,6 +4,7 @@ import (
"encoding/base64"
"errors"
"fmt"
+ "github.com/sasha-s/go-deadlock"
"sync"
"time"
)
@@ -14,7 +15,8 @@ const (
type Peer struct {
id uint
- mutex sync.RWMutex
+ isRunning AtomicBool
+ mutex deadlock.RWMutex
persistentKeepaliveInterval uint64
keyPairs KeyPairs
handshake Handshake
@@ -26,7 +28,7 @@ type Peer struct {
lastHandshakeNano int64 // nano seconds since epoch
}
time struct {
- mutex sync.RWMutex
+ mutex deadlock.RWMutex
lastSend time.Time // last send message
lastHandshake time.Time // last completed handshake
nextKeepalive time.Time
@@ -58,7 +60,7 @@ type Peer struct {
inbound chan *QueueInboundElement // sequential ordering of work
}
routines struct {
- mutex sync.Mutex // held when stopping / starting routines
+ mutex deadlock.Mutex // held when stopping / starting routines
starting sync.WaitGroup // routines pending start
stopping sync.WaitGroup // routines pending stop
stop Signal // size 0, stop all goroutines in peer
@@ -67,6 +69,14 @@ type Peer struct {
}
func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
+
+ if device.isClosed.Get() {
+ return nil, errors.New("Device closed")
+ }
+
+ device.mutex.Lock()
+ defer device.mutex.Unlock()
+
// create peer
peer := new(Peer)
@@ -75,17 +85,17 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
peer.mac.Init(pk)
peer.device = device
+ peer.isRunning.Set(false)
+ peer.timer.zeroAllKeys = NewTimer()
peer.timer.keepalivePersistent = NewTimer()
peer.timer.keepalivePassive = NewTimer()
- peer.timer.zeroAllKeys = NewTimer()
peer.timer.handshakeNew = NewTimer()
peer.timer.handshakeDeadline = NewTimer()
peer.timer.handshakeTimeout = NewTimer()
// assign id for debugging
- device.mutex.Lock()
peer.id = device.idCounter
device.idCounter += 1
@@ -102,7 +112,6 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
return nil, errors.New("Adding existing peer")
}
device.peers[pk] = peer
- device.mutex.Unlock()
// precompute DH
@@ -117,23 +126,20 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
peer.endpoint = nil
- // prepare queuing
-
- peer.queue.nonce = make(chan *QueueOutboundElement, QueueOutboundSize)
- peer.queue.outbound = make(chan *QueueOutboundElement, QueueOutboundSize)
- peer.queue.inbound = make(chan *QueueInboundElement, QueueInboundSize)
-
// prepare signaling & routines
- peer.signal.newKeyPair = NewSignal()
- peer.signal.handshakeBegin = NewSignal()
- peer.signal.handshakeCompleted = NewSignal()
- peer.signal.flushNonceQueue = NewSignal()
-
peer.routines.mutex.Lock()
peer.routines.stop = NewSignal()
peer.routines.mutex.Unlock()
+ // start peer
+
+ peer.device.state.mutex.Lock()
+ if peer.device.isUp.Get() {
+ peer.Start()
+ }
+ peer.device.state.mutex.Unlock()
+
return peer, nil
}
@@ -148,6 +154,10 @@ func (peer *Peer) SendBuffer(buffer []byte) error {
return errors.New("No known endpoint for peer")
}
+ if peer.device.net.bind == nil {
+ return errors.New("No bind")
+ }
+
return peer.device.net.bind.Send(buffer, peer.endpoint)
}
@@ -174,12 +184,26 @@ func (peer *Peer) Start() {
peer.routines.mutex.Lock()
defer peer.routines.mutex.Lock()
+ peer.device.log.Debug.Println("Starting:", peer.String())
+
// stop & wait for ungoing routines (if any)
+ peer.isRunning.Set(false)
peer.routines.stop.Broadcast()
peer.routines.starting.Wait()
peer.routines.stopping.Wait()
+ // prepare queues
+
+ peer.signal.newKeyPair = NewSignal()
+ peer.signal.handshakeBegin = NewSignal()
+ peer.signal.handshakeCompleted = NewSignal()
+ peer.signal.flushNonceQueue = NewSignal()
+
+ peer.queue.nonce = make(chan *QueueOutboundElement, QueueOutboundSize)
+ peer.queue.outbound = make(chan *QueueOutboundElement, QueueOutboundSize)
+ peer.queue.inbound = make(chan *QueueInboundElement, QueueInboundSize)
+
// reset signal and start (new) routines
peer.routines.stop = NewSignal()
@@ -192,6 +216,7 @@ func (peer *Peer) Start() {
go peer.RoutineSequentialReceiver()
peer.routines.starting.Wait()
+ peer.isRunning.Set(true)
}
func (peer *Peer) Stop() {
@@ -199,13 +224,22 @@ func (peer *Peer) Stop() {
peer.routines.mutex.Lock()
defer peer.routines.mutex.Lock()
+ peer.device.log.Debug.Println("Stopping:", peer.String())
+
// stop & wait for ungoing routines (if any)
peer.routines.stop.Broadcast()
peer.routines.starting.Wait()
peer.routines.stopping.Wait()
+ // close queues
+
+ close(peer.queue.nonce)
+ close(peer.queue.outbound)
+ close(peer.queue.inbound)
+
// reset signal (to handle repeated stopping)
peer.routines.stop = NewSignal()
+ peer.isRunning.Set(false)
}