diff options
Diffstat (limited to 'src/peer.go')
-rw-r--r-- | src/peer.go | 62 |
1 files changed, 43 insertions, 19 deletions
diff --git a/src/peer.go b/src/peer.go index 7c6ad47..3d82989 100644 --- a/src/peer.go +++ b/src/peer.go @@ -8,6 +8,10 @@ import ( "time" ) +const ( + PeerRoutineNumber = 4 +) + type Peer struct { id uint mutex sync.RWMutex @@ -34,7 +38,6 @@ type Peer struct { flushNonceQueue Signal // size 1, empty queued packets messageSend Signal // size 1, message was send to peer messageReceived Signal // size 1, authenticated message recv - stop Signal // size 0, stop all goroutines in peer } timer struct { // state related to WireGuard timers @@ -54,6 +57,12 @@ type Peer struct { outbound chan *QueueOutboundElement // sequential ordering of work inbound chan *QueueInboundElement // sequential ordering of work } + routines struct { + mutex sync.Mutex // held when stopping / starting routines + starting sync.WaitGroup // routines pending start + stopping sync.WaitGroup // routines pending stop + stop Signal // size 0, stop all goroutines in peer + } mac CookieGenerator } @@ -121,6 +130,10 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) { peer.signal.handshakeCompleted = NewSignal() peer.signal.flushNonceQueue = NewSignal() + peer.routines.mutex.Lock() + peer.routines.stop = NewSignal() + peer.routines.mutex.Unlock() + return peer, nil } @@ -156,32 +169,43 @@ func (peer *Peer) String() string { ) } -/* Starts all routines for a given peer - * - * Requires that the caller holds the exclusive peer lock! - */ -func unsafePeerStart(peer *Peer) { - peer.signal.stop.Broadcast() - peer.signal.stop = NewSignal() +func (peer *Peer) Start() { + + peer.routines.mutex.Lock() + defer peer.routines.mutex.Lock() + + // stop & wait for ungoing routines (if any) + + peer.routines.stop.Broadcast() + peer.routines.starting.Wait() + peer.routines.stopping.Wait() - var wait sync.WaitGroup + // reset signal and start (new) routines - wait.Add(1) + peer.routines.stop = NewSignal() + peer.routines.starting.Add(PeerRoutineNumber) + peer.routines.stopping.Add(PeerRoutineNumber) go peer.RoutineNonce() - go peer.RoutineTimerHandler(&wait) + go peer.RoutineTimerHandler() go peer.RoutineSequentialSender() go peer.RoutineSequentialReceiver() - wait.Wait() -} - -func (peer *Peer) Start() { - peer.mutex.Lock() - unsafePeerStart(peer) - peer.mutex.Unlock() + peer.routines.starting.Wait() } func (peer *Peer) Stop() { - peer.signal.stop.Broadcast() + + peer.routines.mutex.Lock() + defer peer.routines.mutex.Lock() + + // stop & wait for ungoing routines (if any) + + peer.routines.stop.Broadcast() + peer.routines.starting.Wait() + peer.routines.stopping.Wait() + + // reset signal (to handle repeated stopping) + + peer.routines.stop = NewSignal() } |