summaryrefslogtreecommitdiffhomepage
path: root/pkg
diff options
context:
space:
mode:
authorBhasker Hariharan <bhaskerh@google.com>2021-10-27 17:22:48 -0700
committergVisor bot <gvisor-bot@google.com>2021-10-27 17:25:46 -0700
commit8acc3a9bb2225a5af9e5bf587d2a2baad0e5f841 (patch)
tree24c9464e53c56817bcb0feb4a537d27cbb0d7ee2 /pkg
parent9541a5842bf843414d872a539d32ce6e3202bf04 (diff)
Reduce eventFD notifications on transmit.
When transmitting packets we only need to notify if the peer is not already processing packets. sharedData region is used to enable/disable notifications and the peer will disable notifications when its actively processing packets and enable notifications just before it goes to sleep waiting on packets. This allows more efficient transmit as the sharedmem endpoint does not need to notify on eventFD and incur an expensive host systemcall when the peer is already awake. PiperOrigin-RevId: 406018843
Diffstat (limited to 'pkg')
-rw-r--r--pkg/tcpip/link/sharedmem/queue/queue_test.go20
-rw-r--r--pkg/tcpip/link/sharedmem/queue/rx.go17
-rw-r--r--pkg/tcpip/link/sharedmem/queue/tx.go16
-rw-r--r--pkg/tcpip/link/sharedmem/server_rx.go19
-rw-r--r--pkg/tcpip/link/sharedmem/server_tx.go16
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem_server.go17
-rw-r--r--pkg/tcpip/link/sharedmem/tx.go16
7 files changed, 100 insertions, 21 deletions
diff --git a/pkg/tcpip/link/sharedmem/queue/queue_test.go b/pkg/tcpip/link/sharedmem/queue/queue_test.go
index 9a0aad5d7..b8a7f3d86 100644
--- a/pkg/tcpip/link/sharedmem/queue/queue_test.go
+++ b/pkg/tcpip/link/sharedmem/queue/queue_test.go
@@ -35,7 +35,8 @@ func TestBasicTxQueue(t *testing.T) {
txp.Init(pb2)
var q Tx
- q.Init(pb1, pb2)
+ var state uint32
+ q.Init(pb1, pb2, &state)
// Enqueue two buffers.
b := []TxBuffer{
@@ -203,7 +204,8 @@ func TestBadTxCompletion(t *testing.T) {
txp.Init(pb2)
var q Tx
- q.Init(pb1, pb2)
+ var state uint32
+ q.Init(pb1, pb2, &state)
// Post a completion that is too short, and check that it is ignored.
if d := txp.Push(7); d == nil {
@@ -318,7 +320,8 @@ func TestFillTxPipe(t *testing.T) {
txp.Init(pb2)
var q Tx
- q.Init(pb1, pb2)
+ var state uint32
+ q.Init(pb1, pb2, &state)
// Transmit twice, which should fill the tx pipe.
b := []TxBuffer{
@@ -386,7 +389,8 @@ func TestLotsOfTransmissions(t *testing.T) {
txp.Init(pb2)
var q Tx
- q.Init(pb1, pb2)
+ var state uint32
+ q.Init(pb1, pb2, &state)
// Prepare packet with two buffers.
b := []TxBuffer{
@@ -496,8 +500,8 @@ func TestRxEnableNotification(t *testing.T) {
q.Init(pb1, pb2, &state)
q.EnableNotification()
- if state != eventFDEnabled {
- t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDEnabled)
+ if state != EventFDEnabled {
+ t.Fatalf("Bad value in shared state: got %v, want %v", state, EventFDEnabled)
}
}
@@ -511,7 +515,7 @@ func TestRxDisableNotification(t *testing.T) {
q.Init(pb1, pb2, &state)
q.DisableNotification()
- if state != eventFDDisabled {
- t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDDisabled)
+ if state != EventFDDisabled {
+ t.Fatalf("Bad value in shared state: got %v, want %v", state, EventFDDisabled)
}
}
diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go
index a78826ebc..89bdf5ef6 100644
--- a/pkg/tcpip/link/sharedmem/queue/rx.go
+++ b/pkg/tcpip/link/sharedmem/queue/rx.go
@@ -49,9 +49,16 @@ const (
sizeOfConsumedBuffer = 28
// The following are the allowed states of the shared data area.
- eventFDUninitialized = 0
- eventFDDisabled = 1
- eventFDEnabled = 2
+ // EventFDUinitialized is the value stored at the start of the shared data
+ // region when it hasn't been initialized.
+ EventFDUninitialized = 0
+ // EventFDDisabled is the value stored at the start of the shared data region
+ // when notifications using eventFD has been disabled.
+ EventFDDisabled = 1
+ // EventFDEnabled is the value stored at the start of the shared data region
+ // when eventFD should be notified as the peer might be blocked waiting on
+ // notifications.
+ EventFDEnabled = 2
)
// RxBuffer is the descriptor of a receive buffer.
@@ -84,13 +91,13 @@ func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) {
// EnableNotification updates the shared state such that the peer will notify
// the eventfd when there are packets to be dequeued.
func (r *Rx) EnableNotification() {
- atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled)
+ atomic.StoreUint32(r.sharedEventFDState, EventFDEnabled)
}
// DisableNotification updates the shared state such that the peer will not
// notify the eventfd.
func (r *Rx) DisableNotification() {
- atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled)
+ atomic.StoreUint32(r.sharedEventFDState, EventFDDisabled)
}
// PostedBuffersLimit returns the maximum number of buffers that can be posted
diff --git a/pkg/tcpip/link/sharedmem/queue/tx.go b/pkg/tcpip/link/sharedmem/queue/tx.go
index beffe807b..09907c761 100644
--- a/pkg/tcpip/link/sharedmem/queue/tx.go
+++ b/pkg/tcpip/link/sharedmem/queue/tx.go
@@ -16,6 +16,7 @@ package queue
import (
"encoding/binary"
+ "sync/atomic"
"gvisor.dev/gvisor/pkg/log"
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
@@ -49,14 +50,23 @@ type TxBuffer struct {
//
// This struct is thread-compatible.
type Tx struct {
- tx pipe.Tx
- rx pipe.Rx
+ tx pipe.Tx
+ rx pipe.Rx
+ sharedEventFDState *uint32
}
// Init initializes the transmit queue with the given pipes.
-func (t *Tx) Init(tx, rx []byte) {
+func (t *Tx) Init(tx, rx []byte, sharedEventFDState *uint32) {
t.tx.Init(tx)
t.rx.Init(rx)
+ t.sharedEventFDState = sharedEventFDState
+}
+
+// NotificationsEnabled returns true if eventFD should be used to notify the
+// peer of events (eg. packet transmit etc).
+func (t *Tx) NotificationsEnabled() bool {
+ // Notifications are considered enabled unless explicitly disabled.
+ return atomic.LoadUint32(t.sharedEventFDState) != EventFDDisabled
}
// Enqueue queues the given linked list of buffers for transmission as one
diff --git a/pkg/tcpip/link/sharedmem/server_rx.go b/pkg/tcpip/link/sharedmem/server_rx.go
index 6ea21ffd1..40068334b 100644
--- a/pkg/tcpip/link/sharedmem/server_rx.go
+++ b/pkg/tcpip/link/sharedmem/server_rx.go
@@ -18,6 +18,8 @@
package sharedmem
import (
+ "sync/atomic"
+
"golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/cleanup"
"gvisor.dev/gvisor/pkg/eventfd"
@@ -42,6 +44,10 @@ type serverRx struct {
// sharedData the memory region to use to enable/disable notifications.
sharedData []byte
+
+ // sharedEventFDState is the memory region in sharedData used to enable
+ // disable notifications on eventFD.
+ sharedEventFDState *uint32
}
// init initializes all state needed by the serverTx queue based on the
@@ -89,6 +95,7 @@ func (s *serverRx) init(c *QueueConfig) error {
s.data = data
s.eventFD = efd
s.sharedData = sharedData
+ s.sharedEventFDState = sharedDataPointer(sharedData)
cu.Release()
return nil
@@ -102,6 +109,18 @@ func (s *serverRx) cleanup() {
s.eventFD.Close()
}
+// EnableNotification updates the shared state such that the peer will notify
+// the eventfd when there are packets to be dequeued.
+func (s *serverRx) EnableNotification() {
+ atomic.StoreUint32(s.sharedEventFDState, queue.EventFDEnabled)
+}
+
+// DisableNotification updates the shared state such that the peer will not
+// notify the eventfd.
+func (s *serverRx) DisableNotification() {
+ atomic.StoreUint32(s.sharedEventFDState, queue.EventFDDisabled)
+}
+
// completionNotificationSize is size in bytes of a completion notification sent
// on the completion queue after a transmitted packet has been handled.
const completionNotificationSize = 8
diff --git a/pkg/tcpip/link/sharedmem/server_tx.go b/pkg/tcpip/link/sharedmem/server_tx.go
index 13a82903f..79a9e382b 100644
--- a/pkg/tcpip/link/sharedmem/server_tx.go
+++ b/pkg/tcpip/link/sharedmem/server_tx.go
@@ -18,6 +18,8 @@
package sharedmem
import (
+ "sync/atomic"
+
"golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/cleanup"
"gvisor.dev/gvisor/pkg/eventfd"
@@ -45,6 +47,10 @@ type serverTx struct {
// sharedData the memory region to use to enable/disable notifications.
sharedData []byte
+
+ // sharedEventFDState is the memory region in sharedData used to enable/disable
+ // notifications on eventFD.
+ sharedEventFDState *uint32
}
// init initializes all tstate needed by the serverTx queue based on the
@@ -94,6 +100,7 @@ func (s *serverTx) init(c *QueueConfig) error {
s.data = data
s.eventFD = efd
s.sharedData = sharedData
+ s.sharedEventFDState = sharedDataPointer(sharedData)
return nil
}
@@ -170,6 +177,13 @@ func (s *serverTx) transmit(views []buffer.View) bool {
return true
}
+func (s *serverTx) notificationsEnabled() bool {
+ // notifications are considered to be enabled unless explicitly disabled.
+ return atomic.LoadUint32(s.sharedEventFDState) != queue.EventFDDisabled
+}
+
func (s *serverTx) notify() {
- s.eventFD.Notify()
+ if s.notificationsEnabled() {
+ s.eventFD.Notify()
+ }
}
diff --git a/pkg/tcpip/link/sharedmem/sharedmem_server.go b/pkg/tcpip/link/sharedmem/sharedmem_server.go
index 43c5b8c63..c39eca33f 100644
--- a/pkg/tcpip/link/sharedmem/sharedmem_server.go
+++ b/pkg/tcpip/link/sharedmem/sharedmem_server.go
@@ -287,8 +287,21 @@ func (e *serverEndpoint) dispatchLoop(d stack.NetworkDispatcher) {
for atomic.LoadUint32(&e.stopRequested) == 0 {
b := e.rx.receive()
if b == nil {
- e.rx.waitForPackets()
- continue
+ e.rx.EnableNotification()
+ // Now pull again to make sure we didn't receive any packets
+ // while notifications were not enabled.
+ for {
+ b = e.rx.receive()
+ if b != nil {
+ // Disable notifications as we only need to be notified when we are going
+ // to block on eventFD. This should prevent the peer from needlessly
+ // writing to eventFD when this end is already awake and processing
+ // packets.
+ e.rx.DisableNotification()
+ break
+ }
+ e.rx.waitForPackets()
+ }
}
pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{
Data: buffer.View(b).ToVectorisedView(),
diff --git a/pkg/tcpip/link/sharedmem/tx.go b/pkg/tcpip/link/sharedmem/tx.go
index d6c61afee..a74fc012b 100644
--- a/pkg/tcpip/link/sharedmem/tx.go
+++ b/pkg/tcpip/link/sharedmem/tx.go
@@ -34,6 +34,7 @@ type tx struct {
ids idManager
bufs bufferManager
eventFD eventfd.Eventfd
+ sharedData []byte
sharedDataFD int
}
@@ -62,13 +63,22 @@ func (t *tx) init(mtu uint32, c *QueueConfig) error {
return err
}
+ sharedData, err := getBuffer(c.SharedDataFD)
+ if err != nil {
+ unix.Munmap(txPipe)
+ unix.Munmap(rxPipe)
+ unix.Munmap(data)
+ }
+
// Initialize state based on buffers.
- t.q.Init(txPipe, rxPipe)
+ t.q.Init(txPipe, rxPipe, sharedDataPointer(sharedData))
t.ids.init()
t.bufs.init(0, len(data), int(mtu))
t.data = data
t.eventFD = c.EventFD
t.sharedDataFD = c.SharedDataFD
+ t.sharedData = sharedData
+
return nil
}
@@ -149,7 +159,9 @@ func (t *tx) transmit(bufs ...buffer.View) bool {
// notify writes to the tx.eventFD to indicate to the peer that there is data to
// be read.
func (t *tx) notify() {
- t.eventFD.Notify()
+ if t.q.NotificationsEnabled() {
+ t.eventFD.Notify()
+ }
}
// idDescriptor is used by idManager to either point to a tx buffer (in case