summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link/sharedmem/rx.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/link/sharedmem/rx.go')
-rw-r--r--pkg/tcpip/link/sharedmem/rx.go147
1 files changed, 147 insertions, 0 deletions
diff --git a/pkg/tcpip/link/sharedmem/rx.go b/pkg/tcpip/link/sharedmem/rx.go
new file mode 100644
index 000000000..951ed966b
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/rx.go
@@ -0,0 +1,147 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sharedmem
+
+import (
+ "sync/atomic"
+ "syscall"
+
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/link/rawfile"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue"
+)
+
+// rx holds all state associated with an rx queue.
+type rx struct {
+ data []byte
+ sharedData []byte
+ q queue.Rx
+ eventFD int
+}
+
+// init initializes all state needed by the rx queue based on the information
+// provided.
+//
+// The caller always retains ownership of all file descriptors passed in. The
+// queue implementation will duplicate any that it may need in the future.
+func (r *rx) init(mtu uint32, c *QueueConfig) error {
+ // Map in all buffers.
+ txPipe, err := getBuffer(c.TxPipeFD)
+ if err != nil {
+ return err
+ }
+
+ rxPipe, err := getBuffer(c.RxPipeFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ return err
+ }
+
+ data, err := getBuffer(c.DataFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ return err
+ }
+
+ sharedData, err := getBuffer(c.SharedDataFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ syscall.Munmap(data)
+ return err
+ }
+
+ // Duplicate the eventFD so that caller can close it but we can still
+ // use it.
+ efd, err := syscall.Dup(c.EventFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ syscall.Munmap(data)
+ syscall.Munmap(sharedData)
+ return err
+ }
+
+ // Set the eventfd as non-blocking.
+ if err := syscall.SetNonblock(efd, true); err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ syscall.Munmap(data)
+ syscall.Munmap(sharedData)
+ syscall.Close(efd)
+ return err
+ }
+
+ // Initialize state based on buffers.
+ r.q.Init(txPipe, rxPipe, sharedDataPointer(sharedData))
+ r.data = data
+ r.eventFD = efd
+ r.sharedData = sharedData
+
+ return nil
+}
+
+// cleanup releases all resources allocated during init(). It must only be
+// called if init() has previously succeeded.
+func (r *rx) cleanup() {
+ a, b := r.q.Bytes()
+ syscall.Munmap(a)
+ syscall.Munmap(b)
+
+ syscall.Munmap(r.data)
+ syscall.Munmap(r.sharedData)
+ syscall.Close(r.eventFD)
+}
+
+// postAndReceive posts the provided buffers (if any), and then tries to read
+// from the receive queue.
+//
+// Capacity permitting, it reuses the posted buffer slice to store the buffers
+// that were read as well.
+//
+// This function will block if there aren't any available packets.
+func (r *rx) postAndReceive(b []queue.RxBuffer, stopRequested *uint32) ([]queue.RxBuffer, uint32) {
+ // Post the buffers first. If we cannot post, sleep until we can. We
+ // never post more than will fit concurrently, so it's safe to wait
+ // until enough room is available.
+ if len(b) != 0 && !r.q.PostBuffers(b) {
+ r.q.EnableNotification()
+ for !r.q.PostBuffers(b) {
+ var tmp [8]byte
+ rawfile.BlockingRead(r.eventFD, tmp[:])
+ if atomic.LoadUint32(stopRequested) != 0 {
+ r.q.DisableNotification()
+ return nil, 0
+ }
+ }
+ r.q.DisableNotification()
+ }
+
+ // Read the next set of descriptors.
+ b, n := r.q.Dequeue(b[:0])
+ if len(b) != 0 {
+ return b, n
+ }
+
+ // Data isn't immediately available. Enable eventfd notifications.
+ r.q.EnableNotification()
+ for {
+ b, n = r.q.Dequeue(b)
+ if len(b) != 0 {
+ break
+ }
+
+ // Wait for notification.
+ var tmp [8]byte
+ rawfile.BlockingRead(r.eventFD, tmp[:])
+ if atomic.LoadUint32(stopRequested) != 0 {
+ r.q.DisableNotification()
+ return nil, 0
+ }
+ }
+ r.q.DisableNotification()
+
+ return b, n
+}