diff options
author | Brad Fitzpatrick <bradfitz@tailscale.com> | 2022-08-30 07:43:11 -0700 |
---|---|---|
committer | Jason A. Donenfeld <Jason@zx2c4.com> | 2022-09-04 12:57:30 +0200 |
commit | b51010ba13f0a3e59808fbdb1566cd2c6b834b95 (patch) | |
tree | 37a6753e5604c13f7141bd5613b0ab80e7b794f7 /device | |
parent | d1d08426b27b57990b1ee6782794f56d2c002aa3 (diff) |
all: use Go 1.19 and its atomic types
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
Diffstat (limited to 'device')
-rw-r--r-- | device/alignment_test.go | 65 | ||||
-rw-r--r-- | device/device.go | 32 | ||||
-rw-r--r-- | device/device_test.go | 6 | ||||
-rw-r--r-- | device/keypair.go | 13 | ||||
-rw-r--r-- | device/misc.go | 41 | ||||
-rw-r--r-- | device/noise-protocol.go | 16 | ||||
-rw-r--r-- | device/noise_test.go | 2 | ||||
-rw-r--r-- | device/peer.go | 53 | ||||
-rw-r--r-- | device/pools.go | 8 | ||||
-rw-r--r-- | device/pools_test.go | 23 | ||||
-rw-r--r-- | device/receive.go | 15 | ||||
-rw-r--r-- | device/send.go | 21 | ||||
-rw-r--r-- | device/timers.go | 31 | ||||
-rw-r--r-- | device/tun.go | 3 | ||||
-rw-r--r-- | device/uapi.go | 11 |
15 files changed, 106 insertions, 234 deletions
diff --git a/device/alignment_test.go b/device/alignment_test.go deleted file mode 100644 index a918112..0000000 --- a/device/alignment_test.go +++ /dev/null @@ -1,65 +0,0 @@ -/* SPDX-License-Identifier: MIT - * - * Copyright (C) 2017-2021 WireGuard LLC. All Rights Reserved. - */ - -package device - -import ( - "reflect" - "testing" - "unsafe" -) - -func checkAlignment(t *testing.T, name string, offset uintptr) { - t.Helper() - if offset%8 != 0 { - t.Errorf("offset of %q within struct is %d bytes, which does not align to 64-bit word boundaries (missing %d bytes). Atomic operations will crash on 32-bit systems.", name, offset, 8-(offset%8)) - } -} - -// TestPeerAlignment checks that atomically-accessed fields are -// aligned to 64-bit boundaries, as required by the atomic package. -// -// Unfortunately, violating this rule on 32-bit platforms results in a -// hard segfault at runtime. -func TestPeerAlignment(t *testing.T) { - var p Peer - - typ := reflect.TypeOf(&p).Elem() - t.Logf("Peer type size: %d, with fields:", typ.Size()) - for i := 0; i < typ.NumField(); i++ { - field := typ.Field(i) - t.Logf("\t%30s\toffset=%3v\t(type size=%3d, align=%d)", - field.Name, - field.Offset, - field.Type.Size(), - field.Type.Align(), - ) - } - - checkAlignment(t, "Peer.stats", unsafe.Offsetof(p.stats)) - checkAlignment(t, "Peer.isRunning", unsafe.Offsetof(p.isRunning)) -} - -// TestDeviceAlignment checks that atomically-accessed fields are -// aligned to 64-bit boundaries, as required by the atomic package. -// -// Unfortunately, violating this rule on 32-bit platforms results in a -// hard segfault at runtime. -func TestDeviceAlignment(t *testing.T) { - var d Device - - typ := reflect.TypeOf(&d).Elem() - t.Logf("Device type size: %d, with fields:", typ.Size()) - for i := 0; i < typ.NumField(); i++ { - field := typ.Field(i) - t.Logf("\t%30s\toffset=%3v\t(type size=%3d, align=%d)", - field.Name, - field.Offset, - field.Type.Size(), - field.Type.Align(), - ) - } - checkAlignment(t, "Device.rate.underLoadUntil", unsafe.Offsetof(d.rate)+unsafe.Offsetof(d.rate.underLoadUntil)) -} diff --git a/device/device.go b/device/device.go index 3625608..f96e277 100644 --- a/device/device.go +++ b/device/device.go @@ -30,7 +30,7 @@ type Device struct { // will become the actual state; Up can fail. // The device can also change state multiple times between time of check and time of use. // Unsynchronized uses of state must therefore be advisory/best-effort only. - state uint32 // actually a deviceState, but typed uint32 for convenience + state atomic.Uint32 // actually a deviceState, but typed uint32 for convenience // stopping blocks until all inputs to Device have been closed. stopping sync.WaitGroup // mu protects state changes. @@ -58,9 +58,8 @@ type Device struct { keyMap map[NoisePublicKey]*Peer } - // Keep this 8-byte aligned rate struct { - underLoadUntil int64 + underLoadUntil atomic.Int64 limiter ratelimiter.Ratelimiter } @@ -82,7 +81,7 @@ type Device struct { tun struct { device tun.Device - mtu int32 + mtu atomic.Int32 } ipcMutex sync.RWMutex @@ -94,10 +93,9 @@ type Device struct { // There are three states: down, up, closed. // Transitions: // -// down -----+ -// ↑↓ ↓ -// up -> closed -// +// down -----+ +// ↑↓ ↓ +// up -> closed type deviceState uint32 //go:generate go run golang.org/x/tools/cmd/stringer -type deviceState -trimprefix=deviceState @@ -110,7 +108,7 @@ const ( // deviceState returns device.state.state as a deviceState // See those docs for how to interpret this value. func (device *Device) deviceState() deviceState { - return deviceState(atomic.LoadUint32(&device.state.state)) + return deviceState(device.state.state.Load()) } // isClosed reports whether the device is closed (or is closing). @@ -149,14 +147,14 @@ func (device *Device) changeState(want deviceState) (err error) { case old: return nil case deviceStateUp: - atomic.StoreUint32(&device.state.state, uint32(deviceStateUp)) + device.state.state.Store(uint32(deviceStateUp)) err = device.upLocked() if err == nil { break } fallthrough // up failed; bring the device all the way back down case deviceStateDown: - atomic.StoreUint32(&device.state.state, uint32(deviceStateDown)) + device.state.state.Store(uint32(deviceStateDown)) errDown := device.downLocked() if err == nil { err = errDown @@ -182,7 +180,7 @@ func (device *Device) upLocked() error { device.peers.RLock() for _, peer := range device.peers.keyMap { peer.Start() - if atomic.LoadUint32(&peer.persistentKeepaliveInterval) > 0 { + if peer.persistentKeepaliveInterval.Load() > 0 { peer.SendKeepalive() } } @@ -219,11 +217,11 @@ func (device *Device) IsUnderLoad() bool { now := time.Now() underLoad := len(device.queue.handshake.c) >= QueueHandshakeSize/8 if underLoad { - atomic.StoreInt64(&device.rate.underLoadUntil, now.Add(UnderLoadAfterTime).UnixNano()) + device.rate.underLoadUntil.Store(now.Add(UnderLoadAfterTime).UnixNano()) return true } // check if recently under load - return atomic.LoadInt64(&device.rate.underLoadUntil) > now.UnixNano() + return device.rate.underLoadUntil.Load() > now.UnixNano() } func (device *Device) SetPrivateKey(sk NoisePrivateKey) error { @@ -283,7 +281,7 @@ func (device *Device) SetPrivateKey(sk NoisePrivateKey) error { func NewDevice(tunDevice tun.Device, bind conn.Bind, logger *Logger) *Device { device := new(Device) - device.state.state = uint32(deviceStateDown) + device.state.state.Store(uint32(deviceStateDown)) device.closed = make(chan struct{}) device.log = logger device.net.bind = bind @@ -293,7 +291,7 @@ func NewDevice(tunDevice tun.Device, bind conn.Bind, logger *Logger) *Device { device.log.Errorf("Trouble determining MTU, assuming default: %v", err) mtu = DefaultMTU } - device.tun.mtu = int32(mtu) + device.tun.mtu.Store(int32(mtu)) device.peers.keyMap = make(map[NoisePublicKey]*Peer) device.rate.limiter.Init() device.indexTable.Init() @@ -359,7 +357,7 @@ func (device *Device) Close() { if device.isClosed() { return } - atomic.StoreUint32(&device.state.state, uint32(deviceStateClosed)) + device.state.state.Store(uint32(deviceStateClosed)) device.log.Verbosef("Device closing") device.tun.device.Close() diff --git a/device/device_test.go b/device/device_test.go index ab7236e..8cffe08 100644 --- a/device/device_test.go +++ b/device/device_test.go @@ -333,7 +333,7 @@ func BenchmarkThroughput(b *testing.B) { // Measure how long it takes to receive b.N packets, // starting when we receive the first packet. - var recv uint64 + var recv atomic.Uint64 var elapsed time.Duration var wg sync.WaitGroup wg.Add(1) @@ -342,7 +342,7 @@ func BenchmarkThroughput(b *testing.B) { var start time.Time for { <-pair[0].tun.Inbound - new := atomic.AddUint64(&recv, 1) + new := recv.Add(1) if new == 1 { start = time.Now() } @@ -358,7 +358,7 @@ func BenchmarkThroughput(b *testing.B) { ping := tuntest.Ping(pair[0].ip, pair[1].ip) pingc := pair[1].tun.Outbound var sent uint64 - for atomic.LoadUint64(&recv) != uint64(b.N) { + for recv.Load() != uint64(b.N) { sent++ pingc <- ping } diff --git a/device/keypair.go b/device/keypair.go index 788c947..206d7a9 100644 --- a/device/keypair.go +++ b/device/keypair.go @@ -10,7 +10,6 @@ import ( "sync" "sync/atomic" "time" - "unsafe" "golang.zx2c4.com/wireguard/replay" ) @@ -23,7 +22,7 @@ import ( */ type Keypair struct { - sendNonce uint64 // accessed atomically + sendNonce atomic.Uint64 send cipher.AEAD receive cipher.AEAD replayFilter replay.Filter @@ -37,15 +36,7 @@ type Keypairs struct { sync.RWMutex current *Keypair previous *Keypair - next *Keypair -} - -func (kp *Keypairs) storeNext(next *Keypair) { - atomic.StorePointer((*unsafe.Pointer)((unsafe.Pointer)(&kp.next)), (unsafe.Pointer)(next)) -} - -func (kp *Keypairs) loadNext() *Keypair { - return (*Keypair)(atomic.LoadPointer((*unsafe.Pointer)((unsafe.Pointer)(&kp.next)))) + next atomic.Pointer[Keypair] } func (kp *Keypairs) Current() *Keypair { diff --git a/device/misc.go b/device/misc.go deleted file mode 100644 index 4126704..0000000 --- a/device/misc.go +++ /dev/null @@ -1,41 +0,0 @@ -/* SPDX-License-Identifier: MIT - * - * Copyright (C) 2017-2021 WireGuard LLC. All Rights Reserved. - */ - -package device - -import ( - "sync/atomic" -) - -/* Atomic Boolean */ - -const ( - AtomicFalse = int32(iota) - AtomicTrue -) - -type AtomicBool struct { - int32 -} - -func (a *AtomicBool) Get() bool { - return atomic.LoadInt32(&a.int32) == AtomicTrue -} - -func (a *AtomicBool) Swap(val bool) bool { - flag := AtomicFalse - if val { - flag = AtomicTrue - } - return atomic.SwapInt32(&a.int32, flag) == AtomicTrue -} - -func (a *AtomicBool) Set(val bool) { - flag := AtomicFalse - if val { - flag = AtomicTrue - } - atomic.StoreInt32(&a.int32, flag) -} diff --git a/device/noise-protocol.go b/device/noise-protocol.go index ffa0452..410926e 100644 --- a/device/noise-protocol.go +++ b/device/noise-protocol.go @@ -282,7 +282,7 @@ func (device *Device) ConsumeMessageInitiation(msg *MessageInitiation) *Peer { // lookup peer peer := device.LookupPeer(peerPK) - if peer == nil || !peer.isRunning.Get() { + if peer == nil || !peer.isRunning.Load() { return nil } @@ -581,12 +581,12 @@ func (peer *Peer) BeginSymmetricSession() error { defer keypairs.Unlock() previous := keypairs.previous - next := keypairs.loadNext() + next := keypairs.next.Load() current := keypairs.current if isInitiator { if next != nil { - keypairs.storeNext(nil) + keypairs.next.Store(nil) keypairs.previous = next device.DeleteKeypair(current) } else { @@ -595,7 +595,7 @@ func (peer *Peer) BeginSymmetricSession() error { device.DeleteKeypair(previous) keypairs.current = keypair } else { - keypairs.storeNext(keypair) + keypairs.next.Store(keypair) device.DeleteKeypair(next) keypairs.previous = nil device.DeleteKeypair(previous) @@ -607,18 +607,18 @@ func (peer *Peer) BeginSymmetricSession() error { func (peer *Peer) ReceivedWithKeypair(receivedKeypair *Keypair) bool { keypairs := &peer.keypairs - if keypairs.loadNext() != receivedKeypair { + if keypairs.next.Load() != receivedKeypair { return false } keypairs.Lock() defer keypairs.Unlock() - if keypairs.loadNext() != receivedKeypair { + if keypairs.next.Load() != receivedKeypair { return false } old := keypairs.previous keypairs.previous = keypairs.current peer.device.DeleteKeypair(old) - keypairs.current = keypairs.loadNext() - keypairs.storeNext(nil) + keypairs.current = keypairs.next.Load() + keypairs.next.Store(nil) return true } diff --git a/device/noise_test.go b/device/noise_test.go index e2f23c6..7c84efc 100644 --- a/device/noise_test.go +++ b/device/noise_test.go @@ -148,7 +148,7 @@ func TestNoiseHandshake(t *testing.T) { t.Fatal("failed to derive keypair for peer 2", err) } - key1 := peer1.keypairs.loadNext() + key1 := peer1.keypairs.next.Load() key2 := peer2.keypairs.current // encrypting / decryption test diff --git a/device/peer.go b/device/peer.go index 5bd52df..79feae7 100644 --- a/device/peer.go +++ b/device/peer.go @@ -16,24 +16,16 @@ import ( ) type Peer struct { - isRunning AtomicBool - sync.RWMutex // Mostly protects endpoint, but is generally taken whenever we modify peer - keypairs Keypairs - handshake Handshake - device *Device - endpoint conn.Endpoint - stopping sync.WaitGroup // routines pending stop - - // These fields are accessed with atomic operations, which must be - // 64-bit aligned even on 32-bit platforms. Go guarantees that an - // allocated struct will be 64-bit aligned. So we place - // atomically-accessed fields up front, so that they can share in - // this alignment before smaller fields throw it off. - stats struct { - txBytes uint64 // bytes send to peer (endpoint) - rxBytes uint64 // bytes received from peer - lastHandshakeNano int64 // nano seconds since epoch - } + isRunning atomic.Bool + sync.RWMutex // Mostly protects endpoint, but is generally taken whenever we modify peer + keypairs Keypairs + handshake Handshake + device *Device + endpoint conn.Endpoint + stopping sync.WaitGroup // routines pending stop + txBytes atomic.Uint64 // bytes send to peer (endpoint) + rxBytes atomic.Uint64 // bytes received from peer + lastHandshakeNano atomic.Int64 // nano seconds since epoch disableRoaming bool @@ -43,9 +35,9 @@ type Peer struct { newHandshake *Timer zeroKeyMaterial *Timer persistentKeepalive *Timer - handshakeAttempts uint32 - needAnotherKeepalive AtomicBool - sentLastMinuteHandshake AtomicBool + handshakeAttempts atomic.Uint32 + needAnotherKeepalive atomic.Bool + sentLastMinuteHandshake atomic.Bool } state struct { @@ -60,7 +52,7 @@ type Peer struct { cookieGenerator CookieGenerator trieEntries list.List - persistentKeepaliveInterval uint32 // accessed atomically + persistentKeepaliveInterval atomic.Uint32 } func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) { @@ -133,7 +125,7 @@ func (peer *Peer) SendBuffer(buffer []byte) error { err := peer.device.net.bind.Send(buffer, peer.endpoint) if err == nil { - atomic.AddUint64(&peer.stats.txBytes, uint64(len(buffer))) + peer.txBytes.Add(uint64(len(buffer))) } return err } @@ -174,7 +166,7 @@ func (peer *Peer) Start() { peer.state.Lock() defer peer.state.Unlock() - if peer.isRunning.Get() { + if peer.isRunning.Load() { return } @@ -198,7 +190,7 @@ func (peer *Peer) Start() { go peer.RoutineSequentialSender() go peer.RoutineSequentialReceiver() - peer.isRunning.Set(true) + peer.isRunning.Store(true) } func (peer *Peer) ZeroAndFlushAll() { @@ -210,10 +202,10 @@ func (peer *Peer) ZeroAndFlushAll() { keypairs.Lock() device.DeleteKeypair(keypairs.previous) device.DeleteKeypair(keypairs.current) - device.DeleteKeypair(keypairs.loadNext()) + device.DeleteKeypair(keypairs.next.Load()) keypairs.previous = nil keypairs.current = nil - keypairs.storeNext(nil) + keypairs.next.Store(nil) keypairs.Unlock() // clear handshake state @@ -238,11 +230,10 @@ func (peer *Peer) ExpireCurrentKeypairs() { keypairs := &peer.keypairs keypairs.Lock() if keypairs.current != nil { - atomic.StoreUint64(&keypairs.current.sendNonce, RejectAfterMessages) + keypairs.current.sendNonce.Store(RejectAfterMessages) } - if keypairs.next != nil { - next := keypairs.loadNext() - atomic.StoreUint64(&next.sendNonce, RejectAfterMessages) + if next := keypairs.next.Load(); next != nil { + next.sendNonce.Store(RejectAfterMessages) } keypairs.Unlock() } diff --git a/device/pools.go b/device/pools.go index f40477b..9da0f79 100644 --- a/device/pools.go +++ b/device/pools.go @@ -14,7 +14,7 @@ type WaitPool struct { pool sync.Pool cond sync.Cond lock sync.Mutex - count uint32 + count atomic.Uint32 max uint32 } @@ -27,10 +27,10 @@ func NewWaitPool(max uint32, new func() any) *WaitPool { func (p *WaitPool) Get() any { if p.max != 0 { p.lock.Lock() - for atomic.LoadUint32(&p.count) >= p.max { + for p.count.Load() >= p.max { p.cond.Wait() } - atomic.AddUint32(&p.count, 1) + p.count.Add(1) p.lock.Unlock() } return p.pool.Get() @@ -41,7 +41,7 @@ func (p *WaitPool) Put(x any) { if p.max == 0 { return } - atomic.AddUint32(&p.count, ^uint32(0)) + p.count.Add(^uint32(0)) p.cond.Signal() } diff --git a/device/pools_test.go b/device/pools_test.go index 17e2298..48a98b0 100644 --- a/device/pools_test.go +++ b/device/pools_test.go @@ -17,29 +17,31 @@ import ( func TestWaitPool(t *testing.T) { t.Skip("Currently disabled") var wg sync.WaitGroup - trials := int32(100000) + var trials atomic.Int32 + startTrials := int32(100000) if raceEnabled { // This test can be very slow with -race. - trials /= 10 + startTrials /= 10 } + trials.Store(startTrials) workers := runtime.NumCPU() + 2 if workers-4 <= 0 { t.Skip("Not enough cores") } p := NewWaitPool(uint32(workers-4), func() any { return make([]byte, 16) }) wg.Add(workers) - max := uint32(0) + var max atomic.Uint32 updateMax := func() { - count := atomic.LoadUint32(&p.count) + count := p.count.Load() if count > p.max { t.Errorf("count (%d) > max (%d)", count, p.max) } for { - old := atomic.LoadUint32(&max) + old := max.Load() if count <= old { break } - if atomic.CompareAndSwapUint32(&max, old, count) { + if max.CompareAndSwap(old, count) { break } } @@ -47,7 +49,7 @@ func TestWaitPool(t *testing.T) { for i := 0; i < workers; i++ { go func() { defer wg.Done() - for atomic.AddInt32(&trials, -1) > 0 { + for trials.Add(-1) > 0 { updateMax() x := p.Get() updateMax() @@ -59,14 +61,15 @@ func TestWaitPool(t *testing.T) { }() } wg.Wait() - if max != p.max { + if max.Load() != p.max { t.Errorf("Actual maximum count (%d) != ideal maximum count (%d)", max, p.max) } } func BenchmarkWaitPool(b *testing.B) { var wg sync.WaitGroup - trials := int32(b.N) + var trials atomic.Int32 + trials.Store(int32(b.N)) workers := runtime.NumCPU() + 2 if workers-4 <= 0 { b.Skip("Not enough cores") @@ -77,7 +80,7 @@ func BenchmarkWaitPool(b *testing.B) { for i := 0; i < workers; i++ { go func() { defer wg.Done() - for atomic.AddInt32(&trials, -1) > 0 { + for trials.Add(-1) > 0 { x := p.Get() time.Sleep(time.Duration(rand.Intn(100)) * time.Microsecond) p.Put(x) diff --git a/device/receive.go b/device/receive.go index cc34498..4dbf1e8 100644 --- a/device/receive.go +++ b/device/receive.go @@ -11,7 +11,6 @@ import ( "errors" "net" "sync" - "sync/atomic" "time" "golang.org/x/crypto/chacha20poly1305" @@ -52,12 +51,12 @@ func (elem *QueueInboundElement) clearPointers() { * NOTE: Not thread safe, but called by sequential receiver! */ func (peer *Peer) keepKeyFreshReceiving() { - if peer.timers.sentLastMinuteHandshake.Get() { + if peer.timers.sentLastMinuteHandshake.Load() { return } keypair := peer.keypairs.Current() if keypair != nil && keypair.isInitiator && time.Since(keypair.created) > (RejectAfterTime-KeepaliveTimeout-RekeyTimeout) { - peer.timers.sentLastMinuteHandshake.Set(true) + peer.timers.sentLastMinuteHandshake.Store(true) peer.SendHandshakeInitiation(false) } } @@ -163,7 +162,7 @@ func (device *Device) RoutineReceiveIncoming(recv conn.ReceiveFunc) { elem.Lock() // add to decryption queues - if peer.isRunning.Get() { + if peer.isRunning.Load() { peer.queue.inbound.c <- elem device.queue.decryption.c <- elem buffer = device.GetMessageBuffer() @@ -268,7 +267,7 @@ func (device *Device) RoutineHandshake(id int) { // consume reply - if peer := entry.peer; peer.isRunning.Get() { + if peer := entry.peer; peer.isRunning.Load() { device.log.Verbosef("Receiving cookie response from %s", elem.endpoint.DstToString()) if !peer.cookieGenerator.ConsumeReply(&reply) { device.log.Verbosef("Could not decrypt invalid cookie response") @@ -341,7 +340,7 @@ func (device *Device) RoutineHandshake(id int) { peer.SetEndpointFromPacket(elem.endpoint) device.log.Verbosef("%v - Received handshake initiation", peer) - atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet))) + peer.rxBytes.Add(uint64(len(elem.packet))) peer.SendHandshakeResponse() @@ -369,7 +368,7 @@ func (device *Device) RoutineHandshake(id int) { peer.SetEndpointFromPacket(elem.endpoint) device.log.Verbosef("%v - Received handshake response", peer) - atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet))) + peer.rxBytes.Add(uint64(len(elem.packet))) // update timers @@ -426,7 +425,7 @@ func (peer *Peer) RoutineSequentialReceiver() { peer.keepKeyFreshReceiving() peer.timersAnyAuthenticatedPacketTraversal() peer.timersAnyAuthenticatedPacketReceived() - atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet)+MinMessageSize)) + peer.rxBytes.Add(uint64(len(elem.packet) + MinMessageSize)) if len(elem.packet) == 0 { device.log.Verbosef("%v - Receiving keepalive packet", peer) diff --git a/device/send.go b/device/send.go index 0a7135f..471c51c 100644 --- a/device/send.go +++ b/device/send.go @@ -12,7 +12,6 @@ import ( "net" "os" "sync" - "sync/atomic" "time" "golang.org/x/crypto/chacha20poly1305" @@ -76,7 +75,7 @@ func (elem *QueueOutboundElement) clearPointers() { /* Queues a keepalive if no packets are queued for peer */ func (peer *Peer) SendKeepalive() { - if len(peer.queue.staged) == 0 && peer.isRunning.Get() { + if len(peer.queue.staged) == 0 && peer.isRunning.Load() { elem := peer.device.NewOutboundElement() select { case peer.queue.staged <- elem: @@ -91,7 +90,7 @@ func (peer *Peer) SendKeepalive() { func (peer *Peer) SendHandshakeInitiation(isRetry bool) error { if !isRetry { - atomic.StoreUint32(&peer.timers.handshakeAttempts, 0) + peer.timers.handshakeAttempts.Store(0) } peer.handshake.mutex.RLock() @@ -193,7 +192,7 @@ func (peer *Peer) keepKeyFreshSending() { if keypair == nil { return } - nonce := atomic.LoadUint64(&keypair.sendNonce) + nonce := keypair.sendNonce.Load() if nonce > RekeyAfterMessages || (keypair.isInitiator && time.Since(keypair.created) > RekeyAfterTime) { peer.SendHandshakeInitiation(false) } @@ -269,7 +268,7 @@ func (device *Device) RoutineReadFromTUN() { if peer == nil { continue } - if peer.isRunning.Get() { + if peer.isRunning.Load() { peer.StagePacket(elem) elem = nil peer.SendStagedPackets() @@ -300,7 +299,7 @@ top: } keypair := peer.keypairs.Current() - if keypair == nil || atomic.LoadUint64(&keypair.sendNonce) >= RejectAfterMessages || time.Since(keypair.created) >= RejectAfterTime { + if keypair == nil || keypair.sendNonce.Load() >= RejectAfterMessages || time.Since(keypair.created) >= RejectAfterTime { peer.SendHandshakeInitiation(false) return } @@ -309,9 +308,9 @@ top: select { case elem := <-peer.queue.staged: elem.peer = peer - elem.nonce = atomic.AddUint64(&keypair.sendNonce, 1) - 1 + elem.nonce = keypair.sendNonce.Add(1) - 1 if elem.nonce >= RejectAfterMessages { - atomic.StoreUint64(&keypair.sendNonce, RejectAfterMessages) + keypair.sendNonce.Store(RejectAfterMessages) peer.StagePacket(elem) // XXX: Out of order, but we can't front-load go chans goto top } @@ -320,7 +319,7 @@ top: elem.Lock() // add to parallel and sequential queue - if peer.isRunning.Get() { + if peer.isRunning.Load() { peer.queue.outbound.c <- elem peer.device.queue.encryption.c <- elem } else { @@ -385,7 +384,7 @@ func (device *Device) RoutineEncryption(id int) { binary.LittleEndian.PutUint64(fieldNonce, elem.nonce) // pad content to multiple of 16 - paddingSize := calculatePaddingSize(len(elem.packet), int(atomic.LoadInt32(&device.tun.mtu))) + paddingSize := calculatePaddingSize(len(elem.packet), int(device.tun.mtu.Load())) elem.packet = append(elem.packet, paddingZeros[:paddingSize]...) // encrypt content and release to consumer @@ -419,7 +418,7 @@ func (peer *Peer) RoutineSequentialSender() { return } elem.Lock() - if !peer.isRunning.Get() { + if !peer.isRunning.Load() { // peer has been stopped; return re-usable elems to the shared pool. // This is an optimization only. It is possible for the peer to be stopped // immediately after this check, in which case, elem will get processed. diff --git a/device/timers.go b/device/timers.go index 4d2d0f8..c8ef887 100644 --- a/device/timers.go +++ b/device/timers.go @@ -9,7 +9,6 @@ package device import ( "sync" - "sync/atomic" "time" _ "unsafe" ) @@ -74,11 +73,11 @@ func (timer *Timer) IsPending() bool { } func (peer *Peer) timersActive() bool { - return peer.isRunning.Get() && peer.device != nil && peer.device.isUp() + return peer.isRunning.Load() && peer.device != nil && peer.device.isUp() } func expiredRetransmitHandshake(peer *Peer) { - if atomic.LoadUint32(&peer.timers.handshakeAttempts) > MaxTimerHandshakes { + if peer.timers.handshakeAttempts.Load() > MaxTimerHandshakes { peer.device.log.Verbosef("%s - Handshake did not complete after %d attempts, giving up", peer, MaxTimerHandshakes+2) if peer.timersActive() { @@ -97,8 +96,8 @@ func expiredRetransmitHandshake(peer *Peer) { peer.timers.zeroKeyMaterial.Mod(RejectAfterTime * 3) } } else { - atomic.AddUint32(&peer.timers.handshakeAttempts, 1) - peer.device.log.Verbosef("%s - Handshake did not complete after %d seconds, retrying (try %d)", peer, int(RekeyTimeout.Seconds()), atomic.LoadUint32(&peer.timers.handshakeAttempts)+1) + peer.timers.handshakeAttempts.Add(1) + peer.device.log.Verbosef("%s - Handshake did not complete after %d seconds, retrying (try %d)", peer, int(RekeyTimeout.Seconds()), peer.timers.handshakeAttempts.Load()+1) /* We clear the endpoint address src address, in case this is the cause of trouble. */ peer.Lock() @@ -113,8 +112,8 @@ func expiredRetransmitHandshake(peer *Peer) { func expiredSendKeepalive(peer *Peer) { peer.SendKeepalive() - if peer.timers.needAnotherKeepalive.Get() { - peer.timers.needAnotherKeepalive.Set(false) + if peer.timers.needAnotherKeepalive.Load() { + peer.timers.needAnotherKeepalive.Store(false) if peer.timersActive() { peer.timers.sendKeepalive.Mod(KeepaliveTimeout) } @@ -138,7 +137,7 @@ func expiredZeroKeyMaterial(peer *Peer) { } func expiredPersistentKeepalive(peer *Peer) { - if atomic.LoadUint32(&peer.persistentKeepaliveInterval) > 0 { + if peer.persistentKeepaliveInterval.Load() > 0 { peer.SendKeepalive() } } @@ -156,7 +155,7 @@ func (peer *Peer) timersDataReceived() { if !peer.timers.sendKeepalive.IsPending() { peer.timers.sendKeepalive.Mod(KeepaliveTimeout) } else { - peer.timers.needAnotherKeepalive.Set(true) + peer.timers.needAnotherKeepalive.Store(true) } } } @@ -187,9 +186,9 @@ func (peer *Peer) timersHandshakeComplete() { if peer.timersActive() { peer.timers.retransmitHandshake.Del() } - atomic.StoreUint32(&peer.timers.handshakeAttempts, 0) - peer.timers.sentLastMinuteHandshake.Set(false) - atomic.StoreInt64(&peer.stats.lastHandshakeNano, time.Now().UnixNano()) + peer.timers.handshakeAttempts.Store(0) + peer.timers.sentLastMinuteHandshake.Store(false) + peer.lastHandshakeNano.Store(time.Now().UnixNano()) } /* Should be called after an ephemeral key is created, which is before sending a handshake response or after receiving a handshake response. */ @@ -201,7 +200,7 @@ func (peer *Peer) timersSessionDerived() { /* Should be called before a packet with authentication -- keepalive, data, or handshake -- is sent, or after one is received. */ func (peer *Peer) timersAnyAuthenticatedPacketTraversal() { - keepalive := atomic.LoadUint32(&peer.persistentKeepaliveInterval) + keepalive := peer.persistentKeepaliveInterval.Load() if keepalive > 0 && peer.timersActive() { peer.timers.persistentKeepalive.Mod(time.Duration(keepalive) * time.Second) } @@ -216,9 +215,9 @@ func (peer *Peer) timersInit() { } func (peer *Peer) timersStart() { - atomic.StoreUint32(&peer.timers.handshakeAttempts, 0) - peer.timers.sentLastMinuteHandshake.Set(false) - peer.timers.needAnotherKeepalive.Set(false) + peer.timers.handshakeAttempts.Store(0) + peer.timers.sentLastMinuteHandshake.Store(false) + peer.timers.needAnotherKeepalive.Store(false) } func (peer *Peer) timersStop() { diff --git a/device/tun.go b/device/tun.go index 4af9548..d94bde1 100644 --- a/device/tun.go +++ b/device/tun.go @@ -7,7 +7,6 @@ package device import ( "fmt" - "sync/atomic" "golang.zx2c4.com/wireguard/tun" ) @@ -33,7 +32,7 @@ func (device *Device) RoutineTUNEventReader() { tooLarge = fmt.Sprintf(" (too large, capped at %v)", MaxContentSize) mtu = MaxContentSize } - old := atomic.SwapInt32(&device.tun.mtu, int32(mtu)) + old := device.tun.mtu.Swap(int32(mtu)) if int(old) != mtu { device.log.Verbosef("MTU updated: %v%s", mtu, tooLarge) } diff --git a/device/uapi.go b/device/uapi.go index 30dd97e..550a032 100644 --- a/device/uapi.go +++ b/device/uapi.go @@ -16,7 +16,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "golang.zx2c4.com/wireguard/ipc" @@ -112,15 +111,15 @@ func (device *Device) IpcGetOperation(w io.Writer) error { sendf("endpoint=%s", peer.endpoint.DstToString()) } - nano := atomic.LoadInt64(&peer.stats.lastHandshakeNano) + nano := peer.lastHandshakeNano.Load() secs := nano / time.Second.Nanoseconds() nano %= time.Second.Nanoseconds() sendf("last_handshake_time_sec=%d", secs) sendf("last_handshake_time_nsec=%d", nano) - sendf("tx_bytes=%d", atomic.LoadUint64(&peer.stats.txBytes)) - sendf("rx_bytes=%d", atomic.LoadUint64(&peer.stats.rxBytes)) - sendf("persistent_keepalive_interval=%d", atomic.LoadUint32(&peer.persistentKeepaliveInterval)) + sendf("tx_bytes=%d", peer.txBytes.Load()) + sendf("rx_bytes=%d", peer.rxBytes.Load()) + sendf("persistent_keepalive_interval=%d", peer.persistentKeepaliveInterval.Load()) device.allowedips.EntriesForPeer(peer, func(prefix netip.Prefix) bool { sendf("allowed_ip=%s", prefix.String()) @@ -358,7 +357,7 @@ func (device *Device) handlePeerLine(peer *ipcSetPeer, key, value string) error return ipcErrorf(ipc.IpcErrorInvalid, "failed to set persistent keepalive interval: %w", err) } - old := atomic.SwapUint32(&peer.persistentKeepaliveInterval, uint32(secs)) + old := peer.persistentKeepaliveInterval.Swap(uint32(secs)) // Send immediate keepalive if we're turning it on and before it wasn't on. peer.pkaOn = old == 0 && secs != 0 |