diff options
Diffstat (limited to 'pkg/tcpip/link/sharedmem')
-rw-r--r-- | pkg/tcpip/link/sharedmem/queue/queue_test.go | 20 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/queue/rx.go | 17 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/queue/tx.go | 16 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/server_rx.go | 19 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/server_tx.go | 16 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/sharedmem_server.go | 17 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/tx.go | 16 |
7 files changed, 100 insertions, 21 deletions
diff --git a/pkg/tcpip/link/sharedmem/queue/queue_test.go b/pkg/tcpip/link/sharedmem/queue/queue_test.go index 9a0aad5d7..b8a7f3d86 100644 --- a/pkg/tcpip/link/sharedmem/queue/queue_test.go +++ b/pkg/tcpip/link/sharedmem/queue/queue_test.go @@ -35,7 +35,8 @@ func TestBasicTxQueue(t *testing.T) { txp.Init(pb2) var q Tx - q.Init(pb1, pb2) + var state uint32 + q.Init(pb1, pb2, &state) // Enqueue two buffers. b := []TxBuffer{ @@ -203,7 +204,8 @@ func TestBadTxCompletion(t *testing.T) { txp.Init(pb2) var q Tx - q.Init(pb1, pb2) + var state uint32 + q.Init(pb1, pb2, &state) // Post a completion that is too short, and check that it is ignored. if d := txp.Push(7); d == nil { @@ -318,7 +320,8 @@ func TestFillTxPipe(t *testing.T) { txp.Init(pb2) var q Tx - q.Init(pb1, pb2) + var state uint32 + q.Init(pb1, pb2, &state) // Transmit twice, which should fill the tx pipe. b := []TxBuffer{ @@ -386,7 +389,8 @@ func TestLotsOfTransmissions(t *testing.T) { txp.Init(pb2) var q Tx - q.Init(pb1, pb2) + var state uint32 + q.Init(pb1, pb2, &state) // Prepare packet with two buffers. b := []TxBuffer{ @@ -496,8 +500,8 @@ func TestRxEnableNotification(t *testing.T) { q.Init(pb1, pb2, &state) q.EnableNotification() - if state != eventFDEnabled { - t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDEnabled) + if state != EventFDEnabled { + t.Fatalf("Bad value in shared state: got %v, want %v", state, EventFDEnabled) } } @@ -511,7 +515,7 @@ func TestRxDisableNotification(t *testing.T) { q.Init(pb1, pb2, &state) q.DisableNotification() - if state != eventFDDisabled { - t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDDisabled) + if state != EventFDDisabled { + t.Fatalf("Bad value in shared state: got %v, want %v", state, EventFDDisabled) } } diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go index a78826ebc..89bdf5ef6 100644 --- a/pkg/tcpip/link/sharedmem/queue/rx.go +++ b/pkg/tcpip/link/sharedmem/queue/rx.go @@ -49,9 +49,16 @@ const ( sizeOfConsumedBuffer = 28 // The following are the allowed states of the shared data area. - eventFDUninitialized = 0 - eventFDDisabled = 1 - eventFDEnabled = 2 + // EventFDUinitialized is the value stored at the start of the shared data + // region when it hasn't been initialized. + EventFDUninitialized = 0 + // EventFDDisabled is the value stored at the start of the shared data region + // when notifications using eventFD has been disabled. + EventFDDisabled = 1 + // EventFDEnabled is the value stored at the start of the shared data region + // when eventFD should be notified as the peer might be blocked waiting on + // notifications. + EventFDEnabled = 2 ) // RxBuffer is the descriptor of a receive buffer. @@ -84,13 +91,13 @@ func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) { // EnableNotification updates the shared state such that the peer will notify // the eventfd when there are packets to be dequeued. func (r *Rx) EnableNotification() { - atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled) + atomic.StoreUint32(r.sharedEventFDState, EventFDEnabled) } // DisableNotification updates the shared state such that the peer will not // notify the eventfd. func (r *Rx) DisableNotification() { - atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled) + atomic.StoreUint32(r.sharedEventFDState, EventFDDisabled) } // PostedBuffersLimit returns the maximum number of buffers that can be posted diff --git a/pkg/tcpip/link/sharedmem/queue/tx.go b/pkg/tcpip/link/sharedmem/queue/tx.go index beffe807b..09907c761 100644 --- a/pkg/tcpip/link/sharedmem/queue/tx.go +++ b/pkg/tcpip/link/sharedmem/queue/tx.go @@ -16,6 +16,7 @@ package queue import ( "encoding/binary" + "sync/atomic" "gvisor.dev/gvisor/pkg/log" "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe" @@ -49,14 +50,23 @@ type TxBuffer struct { // // This struct is thread-compatible. type Tx struct { - tx pipe.Tx - rx pipe.Rx + tx pipe.Tx + rx pipe.Rx + sharedEventFDState *uint32 } // Init initializes the transmit queue with the given pipes. -func (t *Tx) Init(tx, rx []byte) { +func (t *Tx) Init(tx, rx []byte, sharedEventFDState *uint32) { t.tx.Init(tx) t.rx.Init(rx) + t.sharedEventFDState = sharedEventFDState +} + +// NotificationsEnabled returns true if eventFD should be used to notify the +// peer of events (eg. packet transmit etc). +func (t *Tx) NotificationsEnabled() bool { + // Notifications are considered enabled unless explicitly disabled. + return atomic.LoadUint32(t.sharedEventFDState) != EventFDDisabled } // Enqueue queues the given linked list of buffers for transmission as one diff --git a/pkg/tcpip/link/sharedmem/server_rx.go b/pkg/tcpip/link/sharedmem/server_rx.go index 6ea21ffd1..40068334b 100644 --- a/pkg/tcpip/link/sharedmem/server_rx.go +++ b/pkg/tcpip/link/sharedmem/server_rx.go @@ -18,6 +18,8 @@ package sharedmem import ( + "sync/atomic" + "golang.org/x/sys/unix" "gvisor.dev/gvisor/pkg/cleanup" "gvisor.dev/gvisor/pkg/eventfd" @@ -42,6 +44,10 @@ type serverRx struct { // sharedData the memory region to use to enable/disable notifications. sharedData []byte + + // sharedEventFDState is the memory region in sharedData used to enable + // disable notifications on eventFD. + sharedEventFDState *uint32 } // init initializes all state needed by the serverTx queue based on the @@ -89,6 +95,7 @@ func (s *serverRx) init(c *QueueConfig) error { s.data = data s.eventFD = efd s.sharedData = sharedData + s.sharedEventFDState = sharedDataPointer(sharedData) cu.Release() return nil @@ -102,6 +109,18 @@ func (s *serverRx) cleanup() { s.eventFD.Close() } +// EnableNotification updates the shared state such that the peer will notify +// the eventfd when there are packets to be dequeued. +func (s *serverRx) EnableNotification() { + atomic.StoreUint32(s.sharedEventFDState, queue.EventFDEnabled) +} + +// DisableNotification updates the shared state such that the peer will not +// notify the eventfd. +func (s *serverRx) DisableNotification() { + atomic.StoreUint32(s.sharedEventFDState, queue.EventFDDisabled) +} + // completionNotificationSize is size in bytes of a completion notification sent // on the completion queue after a transmitted packet has been handled. const completionNotificationSize = 8 diff --git a/pkg/tcpip/link/sharedmem/server_tx.go b/pkg/tcpip/link/sharedmem/server_tx.go index 13a82903f..79a9e382b 100644 --- a/pkg/tcpip/link/sharedmem/server_tx.go +++ b/pkg/tcpip/link/sharedmem/server_tx.go @@ -18,6 +18,8 @@ package sharedmem import ( + "sync/atomic" + "golang.org/x/sys/unix" "gvisor.dev/gvisor/pkg/cleanup" "gvisor.dev/gvisor/pkg/eventfd" @@ -45,6 +47,10 @@ type serverTx struct { // sharedData the memory region to use to enable/disable notifications. sharedData []byte + + // sharedEventFDState is the memory region in sharedData used to enable/disable + // notifications on eventFD. + sharedEventFDState *uint32 } // init initializes all tstate needed by the serverTx queue based on the @@ -94,6 +100,7 @@ func (s *serverTx) init(c *QueueConfig) error { s.data = data s.eventFD = efd s.sharedData = sharedData + s.sharedEventFDState = sharedDataPointer(sharedData) return nil } @@ -170,6 +177,13 @@ func (s *serverTx) transmit(views []buffer.View) bool { return true } +func (s *serverTx) notificationsEnabled() bool { + // notifications are considered to be enabled unless explicitly disabled. + return atomic.LoadUint32(s.sharedEventFDState) != queue.EventFDDisabled +} + func (s *serverTx) notify() { - s.eventFD.Notify() + if s.notificationsEnabled() { + s.eventFD.Notify() + } } diff --git a/pkg/tcpip/link/sharedmem/sharedmem_server.go b/pkg/tcpip/link/sharedmem/sharedmem_server.go index 43c5b8c63..c39eca33f 100644 --- a/pkg/tcpip/link/sharedmem/sharedmem_server.go +++ b/pkg/tcpip/link/sharedmem/sharedmem_server.go @@ -287,8 +287,21 @@ func (e *serverEndpoint) dispatchLoop(d stack.NetworkDispatcher) { for atomic.LoadUint32(&e.stopRequested) == 0 { b := e.rx.receive() if b == nil { - e.rx.waitForPackets() - continue + e.rx.EnableNotification() + // Now pull again to make sure we didn't receive any packets + // while notifications were not enabled. + for { + b = e.rx.receive() + if b != nil { + // Disable notifications as we only need to be notified when we are going + // to block on eventFD. This should prevent the peer from needlessly + // writing to eventFD when this end is already awake and processing + // packets. + e.rx.DisableNotification() + break + } + e.rx.waitForPackets() + } } pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ Data: buffer.View(b).ToVectorisedView(), diff --git a/pkg/tcpip/link/sharedmem/tx.go b/pkg/tcpip/link/sharedmem/tx.go index d6c61afee..a74fc012b 100644 --- a/pkg/tcpip/link/sharedmem/tx.go +++ b/pkg/tcpip/link/sharedmem/tx.go @@ -34,6 +34,7 @@ type tx struct { ids idManager bufs bufferManager eventFD eventfd.Eventfd + sharedData []byte sharedDataFD int } @@ -62,13 +63,22 @@ func (t *tx) init(mtu uint32, c *QueueConfig) error { return err } + sharedData, err := getBuffer(c.SharedDataFD) + if err != nil { + unix.Munmap(txPipe) + unix.Munmap(rxPipe) + unix.Munmap(data) + } + // Initialize state based on buffers. - t.q.Init(txPipe, rxPipe) + t.q.Init(txPipe, rxPipe, sharedDataPointer(sharedData)) t.ids.init() t.bufs.init(0, len(data), int(mtu)) t.data = data t.eventFD = c.EventFD t.sharedDataFD = c.SharedDataFD + t.sharedData = sharedData + return nil } @@ -149,7 +159,9 @@ func (t *tx) transmit(bufs ...buffer.View) bool { // notify writes to the tx.eventFD to indicate to the peer that there is data to // be read. func (t *tx) notify() { - t.eventFD.Notify() + if t.q.NotificationsEnabled() { + t.eventFD.Notify() + } } // idDescriptor is used by idManager to either point to a tx buffer (in case |