summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/link')
-rw-r--r--pkg/tcpip/link/sharedmem/queue/queue_test.go20
-rw-r--r--pkg/tcpip/link/sharedmem/queue/rx.go17
-rw-r--r--pkg/tcpip/link/sharedmem/queue/tx.go16
-rw-r--r--pkg/tcpip/link/sharedmem/server_rx.go19
-rw-r--r--pkg/tcpip/link/sharedmem/server_tx.go16
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem_server.go17
-rw-r--r--pkg/tcpip/link/sharedmem/tx.go16
7 files changed, 100 insertions, 21 deletions
diff --git a/pkg/tcpip/link/sharedmem/queue/queue_test.go b/pkg/tcpip/link/sharedmem/queue/queue_test.go
index 9a0aad5d7..b8a7f3d86 100644
--- a/pkg/tcpip/link/sharedmem/queue/queue_test.go
+++ b/pkg/tcpip/link/sharedmem/queue/queue_test.go
@@ -35,7 +35,8 @@ func TestBasicTxQueue(t *testing.T) {
txp.Init(pb2)
var q Tx
- q.Init(pb1, pb2)
+ var state uint32
+ q.Init(pb1, pb2, &state)
// Enqueue two buffers.
b := []TxBuffer{
@@ -203,7 +204,8 @@ func TestBadTxCompletion(t *testing.T) {
txp.Init(pb2)
var q Tx
- q.Init(pb1, pb2)
+ var state uint32
+ q.Init(pb1, pb2, &state)
// Post a completion that is too short, and check that it is ignored.
if d := txp.Push(7); d == nil {
@@ -318,7 +320,8 @@ func TestFillTxPipe(t *testing.T) {
txp.Init(pb2)
var q Tx
- q.Init(pb1, pb2)
+ var state uint32
+ q.Init(pb1, pb2, &state)
// Transmit twice, which should fill the tx pipe.
b := []TxBuffer{
@@ -386,7 +389,8 @@ func TestLotsOfTransmissions(t *testing.T) {
txp.Init(pb2)
var q Tx
- q.Init(pb1, pb2)
+ var state uint32
+ q.Init(pb1, pb2, &state)
// Prepare packet with two buffers.
b := []TxBuffer{
@@ -496,8 +500,8 @@ func TestRxEnableNotification(t *testing.T) {
q.Init(pb1, pb2, &state)
q.EnableNotification()
- if state != eventFDEnabled {
- t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDEnabled)
+ if state != EventFDEnabled {
+ t.Fatalf("Bad value in shared state: got %v, want %v", state, EventFDEnabled)
}
}
@@ -511,7 +515,7 @@ func TestRxDisableNotification(t *testing.T) {
q.Init(pb1, pb2, &state)
q.DisableNotification()
- if state != eventFDDisabled {
- t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDDisabled)
+ if state != EventFDDisabled {
+ t.Fatalf("Bad value in shared state: got %v, want %v", state, EventFDDisabled)
}
}
diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go
index a78826ebc..89bdf5ef6 100644
--- a/pkg/tcpip/link/sharedmem/queue/rx.go
+++ b/pkg/tcpip/link/sharedmem/queue/rx.go
@@ -49,9 +49,16 @@ const (
sizeOfConsumedBuffer = 28
// The following are the allowed states of the shared data area.
- eventFDUninitialized = 0
- eventFDDisabled = 1
- eventFDEnabled = 2
+ // EventFDUinitialized is the value stored at the start of the shared data
+ // region when it hasn't been initialized.
+ EventFDUninitialized = 0
+ // EventFDDisabled is the value stored at the start of the shared data region
+ // when notifications using eventFD has been disabled.
+ EventFDDisabled = 1
+ // EventFDEnabled is the value stored at the start of the shared data region
+ // when eventFD should be notified as the peer might be blocked waiting on
+ // notifications.
+ EventFDEnabled = 2
)
// RxBuffer is the descriptor of a receive buffer.
@@ -84,13 +91,13 @@ func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) {
// EnableNotification updates the shared state such that the peer will notify
// the eventfd when there are packets to be dequeued.
func (r *Rx) EnableNotification() {
- atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled)
+ atomic.StoreUint32(r.sharedEventFDState, EventFDEnabled)
}
// DisableNotification updates the shared state such that the peer will not
// notify the eventfd.
func (r *Rx) DisableNotification() {
- atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled)
+ atomic.StoreUint32(r.sharedEventFDState, EventFDDisabled)
}
// PostedBuffersLimit returns the maximum number of buffers that can be posted
diff --git a/pkg/tcpip/link/sharedmem/queue/tx.go b/pkg/tcpip/link/sharedmem/queue/tx.go
index beffe807b..09907c761 100644
--- a/pkg/tcpip/link/sharedmem/queue/tx.go
+++ b/pkg/tcpip/link/sharedmem/queue/tx.go
@@ -16,6 +16,7 @@ package queue
import (
"encoding/binary"
+ "sync/atomic"
"gvisor.dev/gvisor/pkg/log"
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
@@ -49,14 +50,23 @@ type TxBuffer struct {
//
// This struct is thread-compatible.
type Tx struct {
- tx pipe.Tx
- rx pipe.Rx
+ tx pipe.Tx
+ rx pipe.Rx
+ sharedEventFDState *uint32
}
// Init initializes the transmit queue with the given pipes.
-func (t *Tx) Init(tx, rx []byte) {
+func (t *Tx) Init(tx, rx []byte, sharedEventFDState *uint32) {
t.tx.Init(tx)
t.rx.Init(rx)
+ t.sharedEventFDState = sharedEventFDState
+}
+
+// NotificationsEnabled returns true if eventFD should be used to notify the
+// peer of events (eg. packet transmit etc).
+func (t *Tx) NotificationsEnabled() bool {
+ // Notifications are considered enabled unless explicitly disabled.
+ return atomic.LoadUint32(t.sharedEventFDState) != EventFDDisabled
}
// Enqueue queues the given linked list of buffers for transmission as one
diff --git a/pkg/tcpip/link/sharedmem/server_rx.go b/pkg/tcpip/link/sharedmem/server_rx.go
index 6ea21ffd1..40068334b 100644
--- a/pkg/tcpip/link/sharedmem/server_rx.go
+++ b/pkg/tcpip/link/sharedmem/server_rx.go
@@ -18,6 +18,8 @@
package sharedmem
import (
+ "sync/atomic"
+
"golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/cleanup"
"gvisor.dev/gvisor/pkg/eventfd"
@@ -42,6 +44,10 @@ type serverRx struct {
// sharedData the memory region to use to enable/disable notifications.
sharedData []byte
+
+ // sharedEventFDState is the memory region in sharedData used to enable
+ // disable notifications on eventFD.
+ sharedEventFDState *uint32
}
// init initializes all state needed by the serverTx queue based on the
@@ -89,6 +95,7 @@ func (s *serverRx) init(c *QueueConfig) error {
s.data = data
s.eventFD = efd
s.sharedData = sharedData
+ s.sharedEventFDState = sharedDataPointer(sharedData)
cu.Release()
return nil
@@ -102,6 +109,18 @@ func (s *serverRx) cleanup() {
s.eventFD.Close()
}
+// EnableNotification updates the shared state such that the peer will notify
+// the eventfd when there are packets to be dequeued.
+func (s *serverRx) EnableNotification() {
+ atomic.StoreUint32(s.sharedEventFDState, queue.EventFDEnabled)
+}
+
+// DisableNotification updates the shared state such that the peer will not
+// notify the eventfd.
+func (s *serverRx) DisableNotification() {
+ atomic.StoreUint32(s.sharedEventFDState, queue.EventFDDisabled)
+}
+
// completionNotificationSize is size in bytes of a completion notification sent
// on the completion queue after a transmitted packet has been handled.
const completionNotificationSize = 8
diff --git a/pkg/tcpip/link/sharedmem/server_tx.go b/pkg/tcpip/link/sharedmem/server_tx.go
index 13a82903f..79a9e382b 100644
--- a/pkg/tcpip/link/sharedmem/server_tx.go
+++ b/pkg/tcpip/link/sharedmem/server_tx.go
@@ -18,6 +18,8 @@
package sharedmem
import (
+ "sync/atomic"
+
"golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/cleanup"
"gvisor.dev/gvisor/pkg/eventfd"
@@ -45,6 +47,10 @@ type serverTx struct {
// sharedData the memory region to use to enable/disable notifications.
sharedData []byte
+
+ // sharedEventFDState is the memory region in sharedData used to enable/disable
+ // notifications on eventFD.
+ sharedEventFDState *uint32
}
// init initializes all tstate needed by the serverTx queue based on the
@@ -94,6 +100,7 @@ func (s *serverTx) init(c *QueueConfig) error {
s.data = data
s.eventFD = efd
s.sharedData = sharedData
+ s.sharedEventFDState = sharedDataPointer(sharedData)
return nil
}
@@ -170,6 +177,13 @@ func (s *serverTx) transmit(views []buffer.View) bool {
return true
}
+func (s *serverTx) notificationsEnabled() bool {
+ // notifications are considered to be enabled unless explicitly disabled.
+ return atomic.LoadUint32(s.sharedEventFDState) != queue.EventFDDisabled
+}
+
func (s *serverTx) notify() {
- s.eventFD.Notify()
+ if s.notificationsEnabled() {
+ s.eventFD.Notify()
+ }
}
diff --git a/pkg/tcpip/link/sharedmem/sharedmem_server.go b/pkg/tcpip/link/sharedmem/sharedmem_server.go
index 43c5b8c63..c39eca33f 100644
--- a/pkg/tcpip/link/sharedmem/sharedmem_server.go
+++ b/pkg/tcpip/link/sharedmem/sharedmem_server.go
@@ -287,8 +287,21 @@ func (e *serverEndpoint) dispatchLoop(d stack.NetworkDispatcher) {
for atomic.LoadUint32(&e.stopRequested) == 0 {
b := e.rx.receive()
if b == nil {
- e.rx.waitForPackets()
- continue
+ e.rx.EnableNotification()
+ // Now pull again to make sure we didn't receive any packets
+ // while notifications were not enabled.
+ for {
+ b = e.rx.receive()
+ if b != nil {
+ // Disable notifications as we only need to be notified when we are going
+ // to block on eventFD. This should prevent the peer from needlessly
+ // writing to eventFD when this end is already awake and processing
+ // packets.
+ e.rx.DisableNotification()
+ break
+ }
+ e.rx.waitForPackets()
+ }
}
pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{
Data: buffer.View(b).ToVectorisedView(),
diff --git a/pkg/tcpip/link/sharedmem/tx.go b/pkg/tcpip/link/sharedmem/tx.go
index d6c61afee..a74fc012b 100644
--- a/pkg/tcpip/link/sharedmem/tx.go
+++ b/pkg/tcpip/link/sharedmem/tx.go
@@ -34,6 +34,7 @@ type tx struct {
ids idManager
bufs bufferManager
eventFD eventfd.Eventfd
+ sharedData []byte
sharedDataFD int
}
@@ -62,13 +63,22 @@ func (t *tx) init(mtu uint32, c *QueueConfig) error {
return err
}
+ sharedData, err := getBuffer(c.SharedDataFD)
+ if err != nil {
+ unix.Munmap(txPipe)
+ unix.Munmap(rxPipe)
+ unix.Munmap(data)
+ }
+
// Initialize state based on buffers.
- t.q.Init(txPipe, rxPipe)
+ t.q.Init(txPipe, rxPipe, sharedDataPointer(sharedData))
t.ids.init()
t.bufs.init(0, len(data), int(mtu))
t.data = data
t.eventFD = c.EventFD
t.sharedDataFD = c.SharedDataFD
+ t.sharedData = sharedData
+
return nil
}
@@ -149,7 +159,9 @@ func (t *tx) transmit(bufs ...buffer.View) bool {
// notify writes to the tx.eventFD to indicate to the peer that there is data to
// be read.
func (t *tx) notify() {
- t.eventFD.Notify()
+ if t.q.NotificationsEnabled() {
+ t.eventFD.Notify()
+ }
}
// idDescriptor is used by idManager to either point to a tx buffer (in case