summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link/sharedmem
diff options
context:
space:
mode:
authorGoogler <noreply@google.com>2018-04-27 10:37:02 -0700
committerAdin Scannell <ascannell@google.com>2018-04-28 01:44:26 -0400
commitd02b74a5dcfed4bfc8f2f8e545bca4d2afabb296 (patch)
tree54f95eef73aee6bacbfc736fffc631be2605ed53 /pkg/tcpip/link/sharedmem
parentf70210e742919f40aa2f0934a22f1c9ba6dada62 (diff)
Check in gVisor.
PiperOrigin-RevId: 194583126 Change-Id: Ica1d8821a90f74e7e745962d71801c598c652463
Diffstat (limited to 'pkg/tcpip/link/sharedmem')
-rw-r--r--pkg/tcpip/link/sharedmem/BUILD42
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/BUILD23
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/pipe.go68
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/pipe_test.go507
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go25
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/rx.go83
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/tx.go151
-rw-r--r--pkg/tcpip/link/sharedmem/queue/BUILD28
-rw-r--r--pkg/tcpip/link/sharedmem/queue/queue_test.go507
-rw-r--r--pkg/tcpip/link/sharedmem/queue/rx.go211
-rw-r--r--pkg/tcpip/link/sharedmem/queue/tx.go141
-rw-r--r--pkg/tcpip/link/sharedmem/rx.go147
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem.go240
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem_test.go703
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem_unsafe.go15
-rw-r--r--pkg/tcpip/link/sharedmem/tx.go262
16 files changed, 3153 insertions, 0 deletions
diff --git a/pkg/tcpip/link/sharedmem/BUILD b/pkg/tcpip/link/sharedmem/BUILD
new file mode 100644
index 000000000..a4a965924
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/BUILD
@@ -0,0 +1,42 @@
+package(licenses = ["notice"]) # BSD
+
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "sharedmem",
+ srcs = [
+ "rx.go",
+ "sharedmem.go",
+ "sharedmem_unsafe.go",
+ "tx.go",
+ ],
+ importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem",
+ visibility = [
+ "//:sandbox",
+ ],
+ deps = [
+ "//pkg/log",
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/header",
+ "//pkg/tcpip/link/rawfile",
+ "//pkg/tcpip/link/sharedmem/queue",
+ "//pkg/tcpip/stack",
+ ],
+)
+
+go_test(
+ name = "sharedmem_test",
+ srcs = [
+ "sharedmem_test.go",
+ ],
+ embed = [":sharedmem"],
+ deps = [
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/header",
+ "//pkg/tcpip/link/sharedmem/pipe",
+ "//pkg/tcpip/link/sharedmem/queue",
+ "//pkg/tcpip/stack",
+ ],
+)
diff --git a/pkg/tcpip/link/sharedmem/pipe/BUILD b/pkg/tcpip/link/sharedmem/pipe/BUILD
new file mode 100644
index 000000000..e8d795500
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/pipe/BUILD
@@ -0,0 +1,23 @@
+package(licenses = ["notice"]) # BSD
+
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "pipe",
+ srcs = [
+ "pipe.go",
+ "pipe_unsafe.go",
+ "rx.go",
+ "tx.go",
+ ],
+ importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe",
+ visibility = ["//:sandbox"],
+)
+
+go_test(
+ name = "pipe_test",
+ srcs = [
+ "pipe_test.go",
+ ],
+ embed = [":pipe"],
+)
diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe.go b/pkg/tcpip/link/sharedmem/pipe/pipe.go
new file mode 100644
index 000000000..1173a60da
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/pipe/pipe.go
@@ -0,0 +1,68 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package pipe implements a shared memory ring buffer on which a single reader
+// and a single writer can operate (read/write) concurrently. The ring buffer
+// allows for data of different sizes to be written, and preserves the boundary
+// of the written data.
+//
+// Example usage is as follows:
+//
+// wb := t.Push(20)
+// // Write data to wb.
+// t.Flush()
+//
+// rb := r.Pull()
+// // Do something with data in rb.
+// t.Flush()
+package pipe
+
+import (
+ "math"
+)
+
+const (
+ jump uint64 = math.MaxUint32 + 1
+ offsetMask uint64 = math.MaxUint32
+ revolutionMask uint64 = ^offsetMask
+
+ sizeOfSlotHeader = 8 // sizeof(uint64)
+ slotFree uint64 = 1 << 63
+ slotSizeMask uint64 = math.MaxUint32
+)
+
+// payloadToSlotSize calculates the total size of a slot based on its payload
+// size. The total size is the header size, plus the payload size, plus padding
+// if necessary to make the total size a multiple of sizeOfSlotHeader.
+func payloadToSlotSize(payloadSize uint64) uint64 {
+ s := sizeOfSlotHeader + payloadSize
+ return (s + sizeOfSlotHeader - 1) &^ (sizeOfSlotHeader - 1)
+}
+
+// slotToPayloadSize calculates the payload size of a slot based on the total
+// size of the slot. This is only meant to be used when creating slots that
+// don't carry information (e.g., free slots or wrap slots).
+func slotToPayloadSize(offset uint64) uint64 {
+ return offset - sizeOfSlotHeader
+}
+
+// pipe is a basic data structure used by both (transmit & receive) ends of a
+// pipe. Indices into this pipe are split into two fields: offset, which counts
+// the number of bytes from the beginning of the buffer, and revolution, which
+// counts the number of times the index has wrapped around.
+type pipe struct {
+ buffer []byte
+}
+
+// init initializes the pipe buffer such that its size is a multiple of the size
+// of the slot header.
+func (p *pipe) init(b []byte) {
+ p.buffer = b[:len(b)&^(sizeOfSlotHeader-1)]
+}
+
+// data returns a section of the buffer starting at the given index (which may
+// include revolution information) and with the given size.
+func (p *pipe) data(idx uint64, size uint64) []byte {
+ return p.buffer[(idx&offsetMask)+sizeOfSlotHeader:][:size]
+}
diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_test.go b/pkg/tcpip/link/sharedmem/pipe/pipe_test.go
new file mode 100644
index 000000000..441ff5b25
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/pipe/pipe_test.go
@@ -0,0 +1,507 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pipe
+
+import (
+ "math/rand"
+ "reflect"
+ "runtime"
+ "sync"
+ "testing"
+)
+
+func TestSimpleReadWrite(t *testing.T) {
+ // Check that a simple write can be properly read from the rx side.
+ tr := rand.New(rand.NewSource(99))
+ rr := rand.New(rand.NewSource(99))
+
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ wb := tx.Push(10)
+ if wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ for i := range wb {
+ wb[i] = byte(tr.Intn(256))
+ }
+ tx.Flush()
+
+ var rx Rx
+ rx.Init(b)
+ rb := rx.Pull()
+ if len(rb) != 10 {
+ t.Fatalf("Bad buffer size returned: got %v, want %v", len(rb), 10)
+ }
+
+ for i := range rb {
+ if v := byte(rr.Intn(256)); v != rb[i] {
+ t.Fatalf("Bad read buffer at index %v: got %v, want %v", i, rb[i], v)
+ }
+ }
+ rx.Flush()
+}
+
+func TestEmptyRead(t *testing.T) {
+ // Check that pulling from an empty pipe fails.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on empty pipe")
+ }
+}
+
+func TestTooLargeWrite(t *testing.T) {
+ // Check that writes that are too large are properly rejected.
+ b := make([]byte, 96)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(96); wb != nil {
+ t.Fatalf("Write of 96 bytes succeeded on 96-byte pipe")
+ }
+
+ if wb := tx.Push(88); wb != nil {
+ t.Fatalf("Write of 88 bytes succeeded on 96-byte pipe")
+ }
+
+ if wb := tx.Push(80); wb == nil {
+ t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
+ }
+}
+
+func TestFullWrite(t *testing.T) {
+ // Check that writes fail when the pipe is full.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(80); wb == nil {
+ t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
+ }
+
+ if wb := tx.Push(1); wb != nil {
+ t.Fatalf("Write succeeded on full pipe")
+ }
+}
+
+func TestFullAndFlushedWrite(t *testing.T) {
+ // Check that writes fail when the pipe is full and has already been
+ // flushed.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(80); wb == nil {
+ t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
+ }
+
+ tx.Flush()
+
+ if wb := tx.Push(1); wb != nil {
+ t.Fatalf("Write succeeded on full pipe")
+ }
+}
+
+func TestTxFlushTwice(t *testing.T) {
+ // Checks that a second consecutive tx flush is a no-op.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ // Make copy of original tx queue, flush it, then check that it didn't
+ // change.
+ orig := tx
+ tx.Flush()
+
+ if !reflect.DeepEqual(orig, tx) {
+ t.Fatalf("Flush mutated tx pipe: got %v, want %v", tx, orig)
+ }
+}
+
+func TestRxFlushTwice(t *testing.T) {
+ // Checks that a second consecutive rx flush is a no-op.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+ rx.Flush()
+
+ // Make copy of original rx queue, flush it, then check that it didn't
+ // change.
+ orig := rx
+ rx.Flush()
+
+ if !reflect.DeepEqual(orig, rx) {
+ t.Fatalf("Flush mutated rx pipe: got %v, want %v", rx, orig)
+ }
+}
+
+func TestWrapInMiddleOfTransaction(t *testing.T) {
+ // Check that writes are not flushed when we need to wrap the buffer
+ // around.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+ rx.Flush()
+
+ // At this point the ring buffer is empty, but the write is at offset
+ // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment).
+ if wb := tx.Push(10); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on non-full pipe")
+ }
+
+ // We haven't flushed yet, so pull must return nil.
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on non-flushed pipe")
+ }
+
+ tx.Flush()
+
+ // The two buffers must be available now.
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+}
+
+func TestWriteAbort(t *testing.T) {
+ // Check that a read fails on a pipe that has had data pushed to it but
+ // has aborted the push.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(10); wb == nil {
+ t.Fatalf("Write failed on empty pipe")
+ }
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on empty pipe")
+ }
+
+ tx.Abort()
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on empty pipe")
+ }
+}
+
+func TestWrappedWriteAbort(t *testing.T) {
+ // Check that writes are properly aborted even if the writes wrap
+ // around.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+ rx.Flush()
+
+ // At this point the ring buffer is empty, but the write is at offset
+ // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment).
+ if wb := tx.Push(10); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on non-full pipe")
+ }
+
+ // We haven't flushed yet, so pull must return nil.
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on non-flushed pipe")
+ }
+
+ tx.Abort()
+
+ // The pushes were aborted, so no data should be readable.
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on non-flushed pipe")
+ }
+
+ // Try the same transactions again, but flush this time.
+ if wb := tx.Push(10); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on non-full pipe")
+ }
+
+ tx.Flush()
+
+ // The two buffers must be available now.
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+}
+
+func TestEmptyReadOnNonFlushedWrite(t *testing.T) {
+ // Check that a read fails on a pipe that has had data pushed to it
+ // but not yet flushed.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(10); wb == nil {
+ t.Fatalf("Write failed on empty pipe")
+ }
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on empty pipe")
+ }
+
+ tx.Flush()
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull on failed on non-empty pipe")
+ }
+}
+
+func TestPullAfterPullingEntirePipe(t *testing.T) {
+ // Check that Pull fails when the pipe is full, but all of it has
+ // already been pulled but not yet flushed.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+ rx.Flush()
+
+ // At this point the ring buffer is empty, but the write is at offset
+ // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 3
+ // buffers that will fill the pipe.
+ if wb := tx.Push(10); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+
+ if wb := tx.Push(20); wb == nil {
+ t.Fatalf("Push failed on non-full pipe")
+ }
+
+ if wb := tx.Push(24); wb == nil {
+ t.Fatalf("Push failed on non-full pipe")
+ }
+
+ tx.Flush()
+
+ // The three buffers must be available now.
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+
+ // Fourth pull must fail.
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on empty pipe")
+ }
+}
+
+func TestNoRoomToWrapOnPush(t *testing.T) {
+ // Check that Push fails when it tries to allocate room to add a wrap
+ // message.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+ rx.Flush()
+
+ // At this point the ring buffer is empty, but the write is at offset
+ // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 20,
+ // which won't fit (64+20+8+padding = 96, which wouldn't leave room for
+ // the padding), so it wraps around.
+ if wb := tx.Push(20); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+
+ tx.Flush()
+
+ // Buffer offset is at 28. Try to write 70, which would require a wrap
+ // slot which cannot be created now.
+ if wb := tx.Push(70); wb != nil {
+ t.Fatalf("Push succeeded on pipe with no room for wrap message")
+ }
+}
+
+func TestRxImplicitFlushOfWrapMessage(t *testing.T) {
+ // Check if the first read is that of a wrapping message, that it gets
+ // immediately flushed.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ // This will cause a wrapping message to written.
+ if wb := tx.Push(60); wb != nil {
+ t.Fatalf("Push succeeded when there is no room in pipe")
+ }
+
+ var rx Rx
+ rx.Init(b)
+
+ // Read the first message.
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+ rx.Flush()
+
+ // This should fail because of the wrapping message is taking up space.
+ if wb := tx.Push(60); wb != nil {
+ t.Fatalf("Push succeeded when there is no room in pipe")
+ }
+
+ // Try to read the next one. This should consume the wrapping message.
+ rx.Pull()
+
+ // This must now succeed.
+ if wb := tx.Push(60); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+}
+
+func TestConcurrentReaderWriter(t *testing.T) {
+ // Push a million buffers of random sizes and random contents. Check
+ // that buffers read match what was written.
+ tr := rand.New(rand.NewSource(99))
+ rr := rand.New(rand.NewSource(99))
+
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ var rx Rx
+ rx.Init(b)
+
+ const count = 1000000
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ runtime.Gosched()
+ for i := 0; i < count; i++ {
+ n := 1 + tr.Intn(80)
+ wb := tx.Push(uint64(n))
+ for wb == nil {
+ wb = tx.Push(uint64(n))
+ }
+
+ for j := range wb {
+ wb[j] = byte(tr.Intn(256))
+ }
+
+ tx.Flush()
+ }
+ }()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ runtime.Gosched()
+ for i := 0; i < count; i++ {
+ n := 1 + rr.Intn(80)
+ rb := rx.Pull()
+ for rb == nil {
+ rb = rx.Pull()
+ }
+
+ if n != len(rb) {
+ t.Fatalf("Bad %v-th buffer length: got %v, want %v", i, len(rb), n)
+ }
+
+ for j := range rb {
+ if v := byte(rr.Intn(256)); v != rb[j] {
+ t.Fatalf("Bad %v-th read buffer at index %v: got %v, want %v", i, j, rb[j], v)
+ }
+ }
+
+ rx.Flush()
+ }
+ }()
+
+ wg.Wait()
+}
diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go b/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go
new file mode 100644
index 000000000..d536abedf
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go
@@ -0,0 +1,25 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pipe
+
+import (
+ "sync/atomic"
+ "unsafe"
+)
+
+func (p *pipe) write(idx uint64, v uint64) {
+ ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0]))
+ *ptr = v
+}
+
+func (p *pipe) writeAtomic(idx uint64, v uint64) {
+ ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0]))
+ atomic.StoreUint64(ptr, v)
+}
+
+func (p *pipe) readAtomic(idx uint64) uint64 {
+ ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0]))
+ return atomic.LoadUint64(ptr)
+}
diff --git a/pkg/tcpip/link/sharedmem/pipe/rx.go b/pkg/tcpip/link/sharedmem/pipe/rx.go
new file mode 100644
index 000000000..261e21f9e
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/pipe/rx.go
@@ -0,0 +1,83 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pipe
+
+// Rx is the receive side of the shared memory ring buffer.
+type Rx struct {
+ p pipe
+
+ tail uint64
+ head uint64
+}
+
+// Init initializes the receive end of the pipe. In the initial state, the next
+// slot to be inspected is the very first one.
+func (r *Rx) Init(b []byte) {
+ r.p.init(b)
+ r.tail = 0xfffffffe * jump
+ r.head = r.tail
+}
+
+// Pull reads the next buffer from the pipe, returning nil if there isn't one
+// currently available.
+//
+// The returned slice is available until Flush() is next called. After that, it
+// must not be touched.
+func (r *Rx) Pull() []byte {
+ if r.head == r.tail+jump {
+ // We've already pulled the whole pipe.
+ return nil
+ }
+
+ header := r.p.readAtomic(r.head)
+ if header&slotFree != 0 {
+ // The next slot is free, we can't pull it yet.
+ return nil
+ }
+
+ payloadSize := header & slotSizeMask
+ newHead := r.head + payloadToSlotSize(payloadSize)
+ headWrap := (r.head & revolutionMask) | uint64(len(r.p.buffer))
+
+ // Check if this is a wrapping slot. If that's the case, it carries no
+ // data, so we just skip it and try again from the first slot.
+ if int64(newHead-headWrap) >= 0 {
+ if int64(newHead-headWrap) > int64(jump) || newHead&offsetMask != 0 {
+ return nil
+ }
+
+ if r.tail == r.head {
+ // If this is the first pull since the last Flush()
+ // call, we flush the state so that the sender can use
+ // this space if it needs to.
+ r.p.writeAtomic(r.head, slotFree|slotToPayloadSize(newHead-r.head))
+ r.tail = newHead
+ }
+
+ r.head = newHead
+ return r.Pull()
+ }
+
+ // Grab the buffer before updating r.head.
+ b := r.p.data(r.head, payloadSize)
+ r.head = newHead
+ return b
+}
+
+// Flush tells the transmitter that all buffers pulled since the last Flush()
+// have been used, so the transmitter is free to used their slots for further
+// transmission.
+func (r *Rx) Flush() {
+ if r.head == r.tail {
+ return
+ }
+ r.p.writeAtomic(r.tail, slotFree|slotToPayloadSize(r.head-r.tail))
+ r.tail = r.head
+}
+
+// Bytes returns the byte slice on which the pipe operates.
+func (r *Rx) Bytes() []byte {
+ return r.p.buffer
+}
diff --git a/pkg/tcpip/link/sharedmem/pipe/tx.go b/pkg/tcpip/link/sharedmem/pipe/tx.go
new file mode 100644
index 000000000..374f515ab
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/pipe/tx.go
@@ -0,0 +1,151 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pipe
+
+// Tx is the transmit side of the shared memory ring buffer.
+type Tx struct {
+ p pipe
+ maxPayloadSize uint64
+
+ head uint64
+ tail uint64
+ next uint64
+
+ tailHeader uint64
+}
+
+// Init initializes the transmit end of the pipe. In the initial state, the next
+// slot to be written is the very first one, and the transmitter has the whole
+// ring buffer available to it.
+func (t *Tx) Init(b []byte) {
+ t.p.init(b)
+ // maxPayloadSize excludes the header of the payload, and the header
+ // of the wrapping message.
+ t.maxPayloadSize = uint64(len(t.p.buffer)) - 2*sizeOfSlotHeader
+ t.tail = 0xfffffffe * jump
+ t.next = t.tail
+ t.head = t.tail + jump
+ t.p.write(t.tail, slotFree)
+}
+
+// Capacity determines how many records of the given size can be written to the
+// pipe before it fills up.
+func (t *Tx) Capacity(recordSize uint64) uint64 {
+ available := uint64(len(t.p.buffer)) - sizeOfSlotHeader
+ entryLen := payloadToSlotSize(recordSize)
+ return available / entryLen
+}
+
+// Push reserves "payloadSize" bytes for transmission in the pipe. The caller
+// populates the returned slice with the data to be transferred and enventually
+// calls Flush() to make the data visible to the reader, or Abort() to make the
+// pipe forget all Push() calls since the last Flush().
+//
+// The returned slice is available until Flush() or Abort() is next called.
+// After that, it must not be touched.
+func (t *Tx) Push(payloadSize uint64) []byte {
+ // Fail request if we know we will never have enough room.
+ if payloadSize > t.maxPayloadSize {
+ return nil
+ }
+
+ totalLen := payloadToSlotSize(payloadSize)
+ newNext := t.next + totalLen
+ nextWrap := (t.next & revolutionMask) | uint64(len(t.p.buffer))
+ if int64(newNext-nextWrap) >= 0 {
+ // The new buffer would overflow the pipe, so we push a wrapping
+ // slot, then try to add the actual slot to the front of the
+ // pipe.
+ newNext = (newNext & revolutionMask) + jump
+ wrappingPayloadSize := slotToPayloadSize(newNext - t.next)
+ if !t.reclaim(newNext) {
+ return nil
+ }
+
+ oldNext := t.next
+ t.next = newNext
+ if oldNext != t.tail {
+ t.p.write(oldNext, wrappingPayloadSize)
+ } else {
+ t.tailHeader = wrappingPayloadSize
+ t.Flush()
+ }
+
+ newNext += totalLen
+ }
+
+ // Check that we have enough room for the buffer.
+ if !t.reclaim(newNext) {
+ return nil
+ }
+
+ if t.next != t.tail {
+ t.p.write(t.next, payloadSize)
+ } else {
+ t.tailHeader = payloadSize
+ }
+
+ // Grab the buffer before updating t.next.
+ b := t.p.data(t.next, payloadSize)
+ t.next = newNext
+
+ return b
+}
+
+// reclaim attempts to advance the head until at least newNext. If the head is
+// already at or beyond newNext, nothing happens and true is returned; otherwise
+// it tries to reclaim slots that have already been consumed by the receive end
+// of the pipe (they will be marked as free) and returns a boolean indicating
+// whether it was successful in reclaiming enough slots.
+func (t *Tx) reclaim(newNext uint64) bool {
+ for int64(newNext-t.head) > 0 {
+ // Can't reclaim if slot is not free.
+ header := t.p.readAtomic(t.head)
+ if header&slotFree == 0 {
+ return false
+ }
+
+ payloadSize := header & slotSizeMask
+ newHead := t.head + payloadToSlotSize(payloadSize)
+
+ // Check newHead is within bounds and valid.
+ if int64(newHead-t.tail) > int64(jump) || newHead&offsetMask >= uint64(len(t.p.buffer)) {
+ return false
+ }
+
+ t.head = newHead
+ }
+
+ return true
+}
+
+// Abort causes all Push() calls since the last Flush() to be forgotten and
+// therefore they will not be made visible to the receiver.
+func (t *Tx) Abort() {
+ t.next = t.tail
+}
+
+// Flush causes all buffers pushed since the last Flush() [or Abort(), whichever
+// is the most recent] to be made visible to the receiver.
+func (t *Tx) Flush() {
+ if t.next == t.tail {
+ // Nothing to do if there are no pushed buffers.
+ return
+ }
+
+ if t.next != t.head {
+ // The receiver will spin in t.next, so we must make sure that
+ // the slotFree bit is set.
+ t.p.write(t.next, slotFree)
+ }
+
+ t.p.writeAtomic(t.tail, t.tailHeader)
+ t.tail = t.next
+}
+
+// Bytes returns the byte slice on which the pipe operates.
+func (t *Tx) Bytes() []byte {
+ return t.p.buffer
+}
diff --git a/pkg/tcpip/link/sharedmem/queue/BUILD b/pkg/tcpip/link/sharedmem/queue/BUILD
new file mode 100644
index 000000000..56ea4641d
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/queue/BUILD
@@ -0,0 +1,28 @@
+package(licenses = ["notice"]) # BSD
+
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "queue",
+ srcs = [
+ "rx.go",
+ "tx.go",
+ ],
+ importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue",
+ visibility = ["//:sandbox"],
+ deps = [
+ "//pkg/log",
+ "//pkg/tcpip/link/sharedmem/pipe",
+ ],
+)
+
+go_test(
+ name = "queue_test",
+ srcs = [
+ "queue_test.go",
+ ],
+ embed = [":queue"],
+ deps = [
+ "//pkg/tcpip/link/sharedmem/pipe",
+ ],
+)
diff --git a/pkg/tcpip/link/sharedmem/queue/queue_test.go b/pkg/tcpip/link/sharedmem/queue/queue_test.go
new file mode 100644
index 000000000..b022c389c
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/queue/queue_test.go
@@ -0,0 +1,507 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "encoding/binary"
+ "reflect"
+ "testing"
+
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe"
+)
+
+func TestBasicTxQueue(t *testing.T) {
+ // Tests that a basic transmit on a queue works, and that completion
+ // gets properly reported as well.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Tx
+ q.Init(pb1, pb2)
+
+ // Enqueue two buffers.
+ b := []TxBuffer{
+ {nil, 100, 60},
+ {nil, 200, 40},
+ }
+
+ b[0].Next = &b[1]
+
+ const usedID = 1002
+ const usedTotalSize = 100
+ if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) {
+ t.Fatalf("Enqueue failed on empty queue")
+ }
+
+ // Check the contents of the pipe.
+ d := rxp.Pull()
+ if d == nil {
+ t.Fatalf("Tx pipe is empty after Enqueue")
+ }
+
+ want := []byte{
+ 234, 3, 0, 0, 0, 0, 0, 0, // id
+ 100, 0, 0, 0, // total size
+ 0, 0, 0, 0, // reserved
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 60, 0, 0, 0, // size 1
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 40, 0, 0, 0, // size 2
+ }
+
+ if !reflect.DeepEqual(want, d) {
+ t.Fatalf("Bad posted packet: got %v, want %v", d, want)
+ }
+
+ rxp.Flush()
+
+ // Check that there are no completions yet.
+ if _, ok := q.CompletedPacket(); ok {
+ t.Fatalf("Packet reported as completed too soon")
+ }
+
+ // Post a completion.
+ d = txp.Push(8)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ binary.LittleEndian.PutUint64(d, usedID)
+ txp.Flush()
+
+ // Check that completion is properly reported.
+ id, ok := q.CompletedPacket()
+ if !ok {
+ t.Fatalf("Completion not reported")
+ }
+
+ if id != usedID {
+ t.Fatalf("Bad completion id: got %v, want %v", id, usedID)
+ }
+}
+
+func TestBasicRxQueue(t *testing.T) {
+ // Tests that a basic receive on a queue works.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Rx
+ q.Init(pb1, pb2, nil)
+
+ // Post two buffers.
+ b := []RxBuffer{
+ {100, 60, 1077, 0},
+ {200, 40, 2123, 0},
+ }
+
+ if !q.PostBuffers(b) {
+ t.Fatalf("PostBuffers failed on empty queue")
+ }
+
+ // Check the contents of the pipe.
+ want := [][]byte{
+ {
+ 100, 0, 0, 0, 0, 0, 0, 0, // Offset1
+ 60, 0, 0, 0, // Size1
+ 0, 0, 0, 0, // Remaining in group 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // User data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+ },
+ {
+ 200, 0, 0, 0, 0, 0, 0, 0, // Offset2
+ 40, 0, 0, 0, // Size2
+ 0, 0, 0, 0, // Remaining in group 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // User data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ },
+ }
+
+ for i := range b {
+ d := rxp.Pull()
+ if d == nil {
+ t.Fatalf("Tx pipe is empty after PostBuffers")
+ }
+
+ if !reflect.DeepEqual(want[i], d) {
+ t.Fatalf("Bad posted packet: got %v, want %v", d, want[i])
+ }
+
+ rxp.Flush()
+ }
+
+ // Check that there are no completions.
+ if _, n := q.Dequeue(nil); n != 0 {
+ t.Fatalf("Packet reported as received too soon")
+ }
+
+ // Post a completion.
+ d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+
+ copy(d, []byte{
+ 100, 0, 0, 0, // packet size
+ 0, 0, 0, 0, // reserved
+
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 60, 0, 0, 0, // size 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 40, 0, 0, 0, // size 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ })
+
+ txp.Flush()
+
+ // Check that completion is properly reported.
+ bufs, n := q.Dequeue(nil)
+ if n != 100 {
+ t.Fatalf("Bad packet size: got %v, want %v", n, 100)
+ }
+
+ if !reflect.DeepEqual(bufs, b) {
+ t.Fatalf("Bad returned buffers: got %v, want %v", bufs, b)
+ }
+}
+
+func TestBadTxCompletion(t *testing.T) {
+ // Check that tx completions with bad sizes are properly ignored.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Tx
+ q.Init(pb1, pb2)
+
+ // Post a completion that is too short, and check that it is ignored.
+ if d := txp.Push(7); d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ txp.Flush()
+
+ if _, ok := q.CompletedPacket(); ok {
+ t.Fatalf("Bad completion not ignored")
+ }
+
+ // Post a completion that is too long, and check that it is ignored.
+ if d := txp.Push(10); d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ txp.Flush()
+
+ if _, ok := q.CompletedPacket(); ok {
+ t.Fatalf("Bad completion not ignored")
+ }
+}
+
+func TestBadRxCompletion(t *testing.T) {
+ // Check that bad rx completions are properly ignored.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Rx
+ q.Init(pb1, pb2, nil)
+
+ // Post a completion that is too short, and check that it is ignored.
+ if d := txp.Push(7); d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ txp.Flush()
+
+ if b, _ := q.Dequeue(nil); b != nil {
+ t.Fatalf("Bad completion not ignored")
+ }
+
+ // Post a completion whose buffer sizes add up to less than the total
+ // size.
+ d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+
+ copy(d, []byte{
+ 100, 0, 0, 0, // packet size
+ 0, 0, 0, 0, // reserved
+
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 10, 0, 0, 0, // size 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 10, 0, 0, 0, // size 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ })
+
+ txp.Flush()
+ if b, _ := q.Dequeue(nil); b != nil {
+ t.Fatalf("Bad completion not ignored")
+ }
+
+ // Post a completion whose buffer sizes will cause a 32-bit overflow,
+ // but adds up to the right number.
+ d = txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+
+ copy(d, []byte{
+ 100, 0, 0, 0, // packet size
+ 0, 0, 0, 0, // reserved
+
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 255, 255, 255, 255, // size 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 101, 0, 0, 0, // size 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ })
+
+ txp.Flush()
+ if b, _ := q.Dequeue(nil); b != nil {
+ t.Fatalf("Bad completion not ignored")
+ }
+}
+
+func TestFillTxPipe(t *testing.T) {
+ // Check that transmitting a new buffer when the buffer pipe is full
+ // fails gracefully.
+ pb1 := make([]byte, 104)
+ pb2 := make([]byte, 104)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Tx
+ q.Init(pb1, pb2)
+
+ // Transmit twice, which should fill the tx pipe.
+ b := []TxBuffer{
+ {nil, 100, 60},
+ {nil, 200, 40},
+ }
+
+ b[0].Next = &b[1]
+
+ const usedID = 1002
+ const usedTotalSize = 100
+ for i := uint64(0); i < 2; i++ {
+ if !q.Enqueue(usedID+i, usedTotalSize, 2, &b[0]) {
+ t.Fatalf("Failed to transmit buffer")
+ }
+ }
+
+ // Transmit another packet now that the tx pipe is full.
+ if q.Enqueue(usedID+2, usedTotalSize, 2, &b[0]) {
+ t.Fatalf("Enqueue succeeded when tx pipe is full")
+ }
+}
+
+func TestFillRxPipe(t *testing.T) {
+ // Check that posting a new buffer when the buffer pipe is full fails
+ // gracefully.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Rx
+ q.Init(pb1, pb2, nil)
+
+ // Post a buffer twice, it should fill the tx pipe.
+ b := []RxBuffer{
+ {100, 60, 1077, 0},
+ }
+
+ for i := 0; i < 2; i++ {
+ if !q.PostBuffers(b) {
+ t.Fatalf("PostBuffers failed on non-full queue")
+ }
+ }
+
+ // Post another buffer now that the tx pipe is full.
+ if q.PostBuffers(b) {
+ t.Fatalf("PostBuffers succeeded on full queue")
+ }
+}
+
+func TestLotsOfTransmissions(t *testing.T) {
+ // Make sure pipes are being properly flushed when transmitting packets.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Tx
+ q.Init(pb1, pb2)
+
+ // Prepare packet with two buffers.
+ b := []TxBuffer{
+ {nil, 100, 60},
+ {nil, 200, 40},
+ }
+
+ b[0].Next = &b[1]
+
+ const usedID = 1002
+ const usedTotalSize = 100
+
+ // Post 100000 packets and completions.
+ for i := 100000; i > 0; i-- {
+ if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) {
+ t.Fatalf("Enqueue failed on non-full queue")
+ }
+
+ if d := rxp.Pull(); d == nil {
+ t.Fatalf("Tx pipe is empty after Enqueue")
+ }
+ rxp.Flush()
+
+ d := txp.Push(8)
+ if d == nil {
+ t.Fatalf("Unable to write to rx pipe")
+ }
+ binary.LittleEndian.PutUint64(d, usedID)
+ txp.Flush()
+ if _, ok := q.CompletedPacket(); !ok {
+ t.Fatalf("Completion not returned")
+ }
+ }
+}
+
+func TestLotsOfReceptions(t *testing.T) {
+ // Make sure pipes are being properly flushed when receiving packets.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Rx
+ q.Init(pb1, pb2, nil)
+
+ // Prepare for posting two buffers.
+ b := []RxBuffer{
+ {100, 60, 1077, 0},
+ {200, 40, 2123, 0},
+ }
+
+ // Post 100000 buffers and completions.
+ for i := 100000; i > 0; i-- {
+ if !q.PostBuffers(b) {
+ t.Fatalf("PostBuffers failed on non-full queue")
+ }
+
+ if d := rxp.Pull(); d == nil {
+ t.Fatalf("Tx pipe is empty after PostBuffers")
+ }
+ rxp.Flush()
+
+ if d := rxp.Pull(); d == nil {
+ t.Fatalf("Tx pipe is empty after PostBuffers")
+ }
+ rxp.Flush()
+
+ d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+
+ copy(d, []byte{
+ 100, 0, 0, 0, // packet size
+ 0, 0, 0, 0, // reserved
+
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 60, 0, 0, 0, // size 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 40, 0, 0, 0, // size 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ })
+
+ txp.Flush()
+
+ if _, n := q.Dequeue(nil); n == 0 {
+ t.Fatalf("Dequeue failed when there is a completion")
+ }
+ }
+}
+
+func TestRxEnableNotification(t *testing.T) {
+ // Check that enabling nofifications results in properly updated state.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var state uint32
+ var q Rx
+ q.Init(pb1, pb2, &state)
+
+ q.EnableNotification()
+ if state != eventFDEnabled {
+ t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDEnabled)
+ }
+}
+
+func TestRxDisableNotification(t *testing.T) {
+ // Check that disabling nofifications results in properly updated state.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var state uint32
+ var q Rx
+ q.Init(pb1, pb2, &state)
+
+ q.DisableNotification()
+ if state != eventFDDisabled {
+ t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDDisabled)
+ }
+}
diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go
new file mode 100644
index 000000000..91bb57190
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/queue/rx.go
@@ -0,0 +1,211 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package queue provides the implementation of transmit and receive queues
+// based on shared memory ring buffers.
+package queue
+
+import (
+ "encoding/binary"
+ "sync/atomic"
+
+ "gvisor.googlesource.com/gvisor/pkg/log"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe"
+)
+
+const (
+ // Offsets within a posted buffer.
+ postedOffset = 0
+ postedSize = 8
+ postedRemainingInGroup = 12
+ postedUserData = 16
+ postedID = 24
+
+ sizeOfPostedBuffer = 32
+
+ // Offsets within a received packet header.
+ consumedPacketSize = 0
+ consumedPacketReserved = 4
+
+ sizeOfConsumedPacketHeader = 8
+
+ // Offsets within a consumed buffer.
+ consumedOffset = 0
+ consumedSize = 8
+ consumedUserData = 12
+ consumedID = 20
+
+ sizeOfConsumedBuffer = 28
+
+ // The following are the allowed states of the shared data area.
+ eventFDUninitialized = 0
+ eventFDDisabled = 1
+ eventFDEnabled = 2
+)
+
+// RxBuffer is the descriptor of a receive buffer.
+type RxBuffer struct {
+ Offset uint64
+ Size uint32
+ ID uint64
+ UserData uint64
+}
+
+// Rx is a receive queue. It is implemented with one tx and one rx pipe: the tx
+// pipe is used to "post" buffers, while the rx pipe is used to receive packets
+// whose contents have been written to previously posted buffers.
+//
+// This struct is thread-compatible.
+type Rx struct {
+ tx pipe.Tx
+ rx pipe.Rx
+ sharedEventFDState *uint32
+}
+
+// Init initializes the receive queue with the given pipes, and shared state
+// pointer -- the latter is used to enable/disable eventfd notifications.
+func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) {
+ r.sharedEventFDState = sharedEventFDState
+ r.tx.Init(tx)
+ r.rx.Init(rx)
+}
+
+// EnableNotification updates the shared state such that the peer will notify
+// the eventfd when there are packets to be dequeued.
+func (r *Rx) EnableNotification() {
+ atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled)
+}
+
+// DisableNotification updates the shared state such that the peer will not
+// notify the eventfd.
+func (r *Rx) DisableNotification() {
+ atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled)
+}
+
+// PostedBuffersLimit returns the maximum number of buffers that can be posted
+// before the tx queue fills up.
+func (r *Rx) PostedBuffersLimit() uint64 {
+ return r.tx.Capacity(sizeOfPostedBuffer)
+}
+
+// PostBuffers makes the given buffers available for receiving data from the
+// peer. Once they are posted, the peer is free to write to them and will
+// eventually post them back for consumption.
+func (r *Rx) PostBuffers(buffers []RxBuffer) bool {
+ for i := range buffers {
+ b := r.tx.Push(sizeOfPostedBuffer)
+ if b == nil {
+ r.tx.Abort()
+ return false
+ }
+
+ pb := &buffers[i]
+ binary.LittleEndian.PutUint64(b[postedOffset:], pb.Offset)
+ binary.LittleEndian.PutUint32(b[postedSize:], pb.Size)
+ binary.LittleEndian.PutUint32(b[postedRemainingInGroup:], 0)
+ binary.LittleEndian.PutUint64(b[postedUserData:], pb.UserData)
+ binary.LittleEndian.PutUint64(b[postedID:], pb.ID)
+ }
+
+ r.tx.Flush()
+
+ return true
+}
+
+// Dequeue receives buffers that have been previously posted by PostBuffers()
+// and that have been filled by the peer and posted back.
+//
+// This is similar to append() in that new buffers are appended to "bufs", with
+// reallocation only if "bufs" doesn't have enough capacity.
+func (r *Rx) Dequeue(bufs []RxBuffer) ([]RxBuffer, uint32) {
+ for {
+ outBufs := bufs
+
+ // Pull the next descriptor from the rx pipe.
+ b := r.rx.Pull()
+ if b == nil {
+ return bufs, 0
+ }
+
+ if len(b) < sizeOfConsumedPacketHeader {
+ log.Warningf("Ignoring packet header: size (%v) is less than header size (%v)", len(b), sizeOfConsumedPacketHeader)
+ r.rx.Flush()
+ continue
+ }
+
+ totalDataSize := binary.LittleEndian.Uint32(b[consumedPacketSize:])
+
+ // Calculate the number of buffer descriptors and copy them
+ // over to the output.
+ count := (len(b) - sizeOfConsumedPacketHeader) / sizeOfConsumedBuffer
+ offset := sizeOfConsumedPacketHeader
+ buffersSize := uint32(0)
+ for i := count; i > 0; i-- {
+ s := binary.LittleEndian.Uint32(b[offset+consumedSize:])
+ buffersSize += s
+ if buffersSize < s {
+ // The buffer size overflows an unsigned 32-bit
+ // integer, so break out and force it to be
+ // ignored.
+ totalDataSize = 1
+ buffersSize = 0
+ break
+ }
+
+ outBufs = append(outBufs, RxBuffer{
+ Offset: binary.LittleEndian.Uint64(b[offset+consumedOffset:]),
+ Size: s,
+ ID: binary.LittleEndian.Uint64(b[offset+consumedID:]),
+ })
+
+ offset += sizeOfConsumedBuffer
+ }
+
+ r.rx.Flush()
+
+ if buffersSize < totalDataSize {
+ // The descriptor is corrupted, ignore it.
+ log.Warningf("Ignoring packet: actual data size (%v) less than expected size (%v)", buffersSize, totalDataSize)
+ continue
+ }
+
+ return outBufs, totalDataSize
+ }
+}
+
+// Bytes returns the byte slices on which the queue operates.
+func (r *Rx) Bytes() (tx, rx []byte) {
+ return r.tx.Bytes(), r.rx.Bytes()
+}
+
+// DecodeRxBufferHeader decodes the header of a buffer posted on an rx queue.
+func DecodeRxBufferHeader(b []byte) RxBuffer {
+ return RxBuffer{
+ Offset: binary.LittleEndian.Uint64(b[postedOffset:]),
+ Size: binary.LittleEndian.Uint32(b[postedSize:]),
+ ID: binary.LittleEndian.Uint64(b[postedID:]),
+ UserData: binary.LittleEndian.Uint64(b[postedUserData:]),
+ }
+}
+
+// RxCompletionSize returns the number of bytes needed to encode an rx
+// completion containing "count" buffers.
+func RxCompletionSize(count int) uint64 {
+ return sizeOfConsumedPacketHeader + uint64(count)*sizeOfConsumedBuffer
+}
+
+// EncodeRxCompletion encodes an rx completion header.
+func EncodeRxCompletion(b []byte, size, reserved uint32) {
+ binary.LittleEndian.PutUint32(b[consumedPacketSize:], size)
+ binary.LittleEndian.PutUint32(b[consumedPacketReserved:], reserved)
+}
+
+// EncodeRxCompletionBuffer encodes the i-th rx completion buffer header.
+func EncodeRxCompletionBuffer(b []byte, i int, rxb RxBuffer) {
+ b = b[RxCompletionSize(i):]
+ binary.LittleEndian.PutUint64(b[consumedOffset:], rxb.Offset)
+ binary.LittleEndian.PutUint32(b[consumedSize:], rxb.Size)
+ binary.LittleEndian.PutUint64(b[consumedUserData:], rxb.UserData)
+ binary.LittleEndian.PutUint64(b[consumedID:], rxb.ID)
+}
diff --git a/pkg/tcpip/link/sharedmem/queue/tx.go b/pkg/tcpip/link/sharedmem/queue/tx.go
new file mode 100644
index 000000000..b04fb163b
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/queue/tx.go
@@ -0,0 +1,141 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "encoding/binary"
+
+ "gvisor.googlesource.com/gvisor/pkg/log"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe"
+)
+
+const (
+ // Offsets within a packet header.
+ packetID = 0
+ packetSize = 8
+ packetReserved = 12
+
+ sizeOfPacketHeader = 16
+
+ // Offsets with a buffer descriptor
+ bufferOffset = 0
+ bufferSize = 8
+
+ sizeOfBufferDescriptor = 12
+)
+
+// TxBuffer is the descriptor of a transmit buffer.
+type TxBuffer struct {
+ Next *TxBuffer
+ Offset uint64
+ Size uint32
+}
+
+// Tx is a transmit queue. It is implemented with one tx and one rx pipe: the
+// tx pipe is used to request the transmission of packets, while the rx pipe
+// is used to receive which transmissions have completed.
+//
+// This struct is thread-compatible.
+type Tx struct {
+ tx pipe.Tx
+ rx pipe.Rx
+}
+
+// Init initializes the transmit queue with the given pipes.
+func (t *Tx) Init(tx, rx []byte) {
+ t.tx.Init(tx)
+ t.rx.Init(rx)
+}
+
+// Enqueue queues the given linked list of buffers for transmission as one
+// packet. While it is queued, the caller must not modify them.
+func (t *Tx) Enqueue(id uint64, totalDataLen, bufferCount uint32, buffer *TxBuffer) bool {
+ // Reserve room in the tx pipe.
+ totalLen := sizeOfPacketHeader + uint64(bufferCount)*sizeOfBufferDescriptor
+
+ b := t.tx.Push(totalLen)
+ if b == nil {
+ return false
+ }
+
+ // Initialize the packet and buffer descriptors.
+ binary.LittleEndian.PutUint64(b[packetID:], id)
+ binary.LittleEndian.PutUint32(b[packetSize:], totalDataLen)
+ binary.LittleEndian.PutUint32(b[packetReserved:], 0)
+
+ offset := sizeOfPacketHeader
+ for i := bufferCount; i != 0; i-- {
+ binary.LittleEndian.PutUint64(b[offset+bufferOffset:], buffer.Offset)
+ binary.LittleEndian.PutUint32(b[offset+bufferSize:], buffer.Size)
+ offset += sizeOfBufferDescriptor
+ buffer = buffer.Next
+ }
+
+ t.tx.Flush()
+
+ return true
+}
+
+// CompletedPacket returns the id of the last completed transmission. The
+// returned id, if any, refers to a value passed on a previous call to
+// Enqueue().
+func (t *Tx) CompletedPacket() (id uint64, ok bool) {
+ for {
+ b := t.rx.Pull()
+ if b == nil {
+ return 0, false
+ }
+
+ if len(b) != 8 {
+ t.rx.Flush()
+ log.Warningf("Ignoring completed packet: size (%v) is less than expected (%v)", len(b), 8)
+ continue
+ }
+
+ v := binary.LittleEndian.Uint64(b)
+
+ t.rx.Flush()
+
+ return v, true
+ }
+}
+
+// Bytes returns the byte slices on which the queue operates.
+func (t *Tx) Bytes() (tx, rx []byte) {
+ return t.tx.Bytes(), t.rx.Bytes()
+}
+
+// TxPacketInfo holds information about a packet sent on a tx queue.
+type TxPacketInfo struct {
+ ID uint64
+ Size uint32
+ Reserved uint32
+ BufferCount int
+}
+
+// DecodeTxPacketHeader decodes the header of a packet sent over a tx queue.
+func DecodeTxPacketHeader(b []byte) TxPacketInfo {
+ return TxPacketInfo{
+ ID: binary.LittleEndian.Uint64(b[packetID:]),
+ Size: binary.LittleEndian.Uint32(b[packetSize:]),
+ Reserved: binary.LittleEndian.Uint32(b[packetReserved:]),
+ BufferCount: (len(b) - sizeOfPacketHeader) / sizeOfBufferDescriptor,
+ }
+}
+
+// DecodeTxBufferHeader decodes the header of the i-th buffer of a packet sent
+// over a tx queue.
+func DecodeTxBufferHeader(b []byte, i int) TxBuffer {
+ b = b[sizeOfPacketHeader+i*sizeOfBufferDescriptor:]
+ return TxBuffer{
+ Offset: binary.LittleEndian.Uint64(b[bufferOffset:]),
+ Size: binary.LittleEndian.Uint32(b[bufferSize:]),
+ }
+}
+
+// EncodeTxCompletion encodes a tx completion header.
+func EncodeTxCompletion(b []byte, id uint64) {
+ binary.LittleEndian.PutUint64(b, id)
+}
diff --git a/pkg/tcpip/link/sharedmem/rx.go b/pkg/tcpip/link/sharedmem/rx.go
new file mode 100644
index 000000000..951ed966b
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/rx.go
@@ -0,0 +1,147 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sharedmem
+
+import (
+ "sync/atomic"
+ "syscall"
+
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/link/rawfile"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue"
+)
+
+// rx holds all state associated with an rx queue.
+type rx struct {
+ data []byte
+ sharedData []byte
+ q queue.Rx
+ eventFD int
+}
+
+// init initializes all state needed by the rx queue based on the information
+// provided.
+//
+// The caller always retains ownership of all file descriptors passed in. The
+// queue implementation will duplicate any that it may need in the future.
+func (r *rx) init(mtu uint32, c *QueueConfig) error {
+ // Map in all buffers.
+ txPipe, err := getBuffer(c.TxPipeFD)
+ if err != nil {
+ return err
+ }
+
+ rxPipe, err := getBuffer(c.RxPipeFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ return err
+ }
+
+ data, err := getBuffer(c.DataFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ return err
+ }
+
+ sharedData, err := getBuffer(c.SharedDataFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ syscall.Munmap(data)
+ return err
+ }
+
+ // Duplicate the eventFD so that caller can close it but we can still
+ // use it.
+ efd, err := syscall.Dup(c.EventFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ syscall.Munmap(data)
+ syscall.Munmap(sharedData)
+ return err
+ }
+
+ // Set the eventfd as non-blocking.
+ if err := syscall.SetNonblock(efd, true); err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ syscall.Munmap(data)
+ syscall.Munmap(sharedData)
+ syscall.Close(efd)
+ return err
+ }
+
+ // Initialize state based on buffers.
+ r.q.Init(txPipe, rxPipe, sharedDataPointer(sharedData))
+ r.data = data
+ r.eventFD = efd
+ r.sharedData = sharedData
+
+ return nil
+}
+
+// cleanup releases all resources allocated during init(). It must only be
+// called if init() has previously succeeded.
+func (r *rx) cleanup() {
+ a, b := r.q.Bytes()
+ syscall.Munmap(a)
+ syscall.Munmap(b)
+
+ syscall.Munmap(r.data)
+ syscall.Munmap(r.sharedData)
+ syscall.Close(r.eventFD)
+}
+
+// postAndReceive posts the provided buffers (if any), and then tries to read
+// from the receive queue.
+//
+// Capacity permitting, it reuses the posted buffer slice to store the buffers
+// that were read as well.
+//
+// This function will block if there aren't any available packets.
+func (r *rx) postAndReceive(b []queue.RxBuffer, stopRequested *uint32) ([]queue.RxBuffer, uint32) {
+ // Post the buffers first. If we cannot post, sleep until we can. We
+ // never post more than will fit concurrently, so it's safe to wait
+ // until enough room is available.
+ if len(b) != 0 && !r.q.PostBuffers(b) {
+ r.q.EnableNotification()
+ for !r.q.PostBuffers(b) {
+ var tmp [8]byte
+ rawfile.BlockingRead(r.eventFD, tmp[:])
+ if atomic.LoadUint32(stopRequested) != 0 {
+ r.q.DisableNotification()
+ return nil, 0
+ }
+ }
+ r.q.DisableNotification()
+ }
+
+ // Read the next set of descriptors.
+ b, n := r.q.Dequeue(b[:0])
+ if len(b) != 0 {
+ return b, n
+ }
+
+ // Data isn't immediately available. Enable eventfd notifications.
+ r.q.EnableNotification()
+ for {
+ b, n = r.q.Dequeue(b)
+ if len(b) != 0 {
+ break
+ }
+
+ // Wait for notification.
+ var tmp [8]byte
+ rawfile.BlockingRead(r.eventFD, tmp[:])
+ if atomic.LoadUint32(stopRequested) != 0 {
+ r.q.DisableNotification()
+ return nil, 0
+ }
+ }
+ r.q.DisableNotification()
+
+ return b, n
+}
diff --git a/pkg/tcpip/link/sharedmem/sharedmem.go b/pkg/tcpip/link/sharedmem/sharedmem.go
new file mode 100644
index 000000000..2c0f1b294
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/sharedmem.go
@@ -0,0 +1,240 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package sharedmem provides the implemention of data-link layer endpoints
+// backed by shared memory.
+//
+// Shared memory endpoints can be used in the networking stack by calling New()
+// to create a new endpoint, and then passing it as an argument to
+// Stack.CreateNIC().
+package sharedmem
+
+import (
+ "sync"
+ "sync/atomic"
+ "syscall"
+
+ "gvisor.googlesource.com/gvisor/pkg/log"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/header"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/stack"
+)
+
+// QueueConfig holds all the file descriptors needed to describe a tx or rx
+// queue over shared memory. It is used when creating new shared memory
+// endpoints to describe tx and rx queues.
+type QueueConfig struct {
+ // DataFD is a file descriptor for the file that contains the data to
+ // be transmitted via this queue. Descriptors contain offsets within
+ // this file.
+ DataFD int
+
+ // EventFD is a file descriptor for the event that is signaled when
+ // data is becomes available in this queue.
+ EventFD int
+
+ // TxPipeFD is a file descriptor for the tx pipe associated with the
+ // queue.
+ TxPipeFD int
+
+ // RxPipeFD is a file descriptor for the rx pipe associated with the
+ // queue.
+ RxPipeFD int
+
+ // SharedDataFD is a file descriptor for the file that contains shared
+ // state between the two ends of the queue. This data specifies, for
+ // example, whether EventFD signaling is enabled or disabled.
+ SharedDataFD int
+}
+
+type endpoint struct {
+ // mtu (maximum transmission unit) is the maximum size of a packet.
+ mtu uint32
+
+ // bufferSize is the size of each individual buffer.
+ bufferSize uint32
+
+ // addr is the local address of this endpoint.
+ addr tcpip.LinkAddress
+
+ // rx is the receive queue.
+ rx rx
+
+ // stopRequested is to be accessed atomically only, and determines if
+ // the worker goroutines should stop.
+ stopRequested uint32
+
+ // Wait group used to indicate that all workers have stopped.
+ completed sync.WaitGroup
+
+ // mu protects the following fields.
+ mu sync.Mutex
+
+ // tx is the transmit queue.
+ tx tx
+
+ // workerStarted specifies whether the worker goroutine was started.
+ workerStarted bool
+}
+
+// New creates a new shared-memory-based endpoint. Buffers will be broken up
+// into buffers of "bufferSize" bytes.
+func New(mtu, bufferSize uint32, addr tcpip.LinkAddress, tx, rx QueueConfig) (tcpip.LinkEndpointID, error) {
+ e := &endpoint{
+ mtu: mtu,
+ bufferSize: bufferSize,
+ addr: addr,
+ }
+
+ if err := e.tx.init(bufferSize, &tx); err != nil {
+ return 0, err
+ }
+
+ if err := e.rx.init(bufferSize, &rx); err != nil {
+ e.tx.cleanup()
+ return 0, err
+ }
+
+ return stack.RegisterLinkEndpoint(e), nil
+}
+
+// Close frees all resources associated with the endpoint.
+func (e *endpoint) Close() {
+ // Tell dispatch goroutine to stop, then write to the eventfd so that
+ // it wakes up in case it's sleeping.
+ atomic.StoreUint32(&e.stopRequested, 1)
+ syscall.Write(e.rx.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Cleanup the queues inline if the worker hasn't started yet; we also
+ // know it won't start from now on because stopRequested is set to 1.
+ e.mu.Lock()
+ workerPresent := e.workerStarted
+ e.mu.Unlock()
+
+ if !workerPresent {
+ e.tx.cleanup()
+ e.rx.cleanup()
+ }
+}
+
+// Wait waits until all workers have stopped after a Close() call.
+func (e *endpoint) Wait() {
+ e.completed.Wait()
+}
+
+// Attach implements stack.LinkEndpoint.Attach. It launches the goroutine that
+// reads packets from the rx queue.
+func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ e.mu.Lock()
+ if !e.workerStarted && atomic.LoadUint32(&e.stopRequested) == 0 {
+ e.workerStarted = true
+ e.completed.Add(1)
+ go e.dispatchLoop(dispatcher) // S/R-FIXME
+ }
+ e.mu.Unlock()
+}
+
+// MTU implements stack.LinkEndpoint.MTU. It returns the value initialized
+// during construction.
+func (e *endpoint) MTU() uint32 {
+ return e.mtu - header.EthernetMinimumSize
+}
+
+// Capabilities implements stack.LinkEndpoint.Capabilities.
+func (*endpoint) Capabilities() stack.LinkEndpointCapabilities {
+ return 0
+}
+
+// MaxHeaderLength implements stack.LinkEndpoint.MaxHeaderLength. It returns the
+// ethernet frame header size.
+func (*endpoint) MaxHeaderLength() uint16 {
+ return header.EthernetMinimumSize
+}
+
+// LinkAddress implements stack.LinkEndpoint.LinkAddress. It returns the local
+// link address.
+func (e *endpoint) LinkAddress() tcpip.LinkAddress {
+ return e.addr
+}
+
+// WritePacket writes outbound packets to the file descriptor. If it is not
+// currently writable, the packet is dropped.
+func (e *endpoint) WritePacket(r *stack.Route, hdr *buffer.Prependable, payload buffer.View, protocol tcpip.NetworkProtocolNumber) *tcpip.Error {
+ // Add the ethernet header here.
+ eth := header.Ethernet(hdr.Prepend(header.EthernetMinimumSize))
+ eth.Encode(&header.EthernetFields{
+ DstAddr: r.RemoteLinkAddress,
+ SrcAddr: e.addr,
+ Type: protocol,
+ })
+
+ // Transmit the packet.
+ e.mu.Lock()
+ ok := e.tx.transmit(hdr.UsedBytes(), payload)
+ e.mu.Unlock()
+
+ if !ok {
+ return tcpip.ErrWouldBlock
+ }
+
+ return nil
+}
+
+// dispatchLoop reads packets from the rx queue in a loop and dispatches them
+// to the network stack.
+func (e *endpoint) dispatchLoop(d stack.NetworkDispatcher) {
+ // Post initial set of buffers.
+ limit := e.rx.q.PostedBuffersLimit()
+ if l := uint64(len(e.rx.data)) / uint64(e.bufferSize); limit > l {
+ limit = l
+ }
+ for i := uint64(0); i < limit; i++ {
+ b := queue.RxBuffer{
+ Offset: i * uint64(e.bufferSize),
+ Size: e.bufferSize,
+ ID: i,
+ }
+ if !e.rx.q.PostBuffers([]queue.RxBuffer{b}) {
+ log.Warningf("Unable to post %v-th buffer", i)
+ }
+ }
+
+ // Read in a loop until a stop is requested.
+ var rxb []queue.RxBuffer
+ views := []buffer.View{nil}
+ vv := buffer.NewVectorisedView(0, views)
+ for atomic.LoadUint32(&e.stopRequested) == 0 {
+ var n uint32
+ rxb, n = e.rx.postAndReceive(rxb, &e.stopRequested)
+
+ // Copy data from the shared area to its own buffer, then
+ // prepare to repost the buffer.
+ b := make([]byte, n)
+ offset := uint32(0)
+ for i := range rxb {
+ copy(b[offset:], e.rx.data[rxb[i].Offset:][:rxb[i].Size])
+ offset += rxb[i].Size
+
+ rxb[i].Size = e.bufferSize
+ }
+
+ if n < header.EthernetMinimumSize {
+ continue
+ }
+
+ // Send packet up the stack.
+ eth := header.Ethernet(b)
+ views[0] = b[header.EthernetMinimumSize:]
+ vv.SetSize(int(n) - header.EthernetMinimumSize)
+ d.DeliverNetworkPacket(e, eth.SourceAddress(), eth.Type(), &vv)
+ }
+
+ // Clean state.
+ e.tx.cleanup()
+ e.rx.cleanup()
+
+ e.completed.Done()
+}
diff --git a/pkg/tcpip/link/sharedmem/sharedmem_test.go b/pkg/tcpip/link/sharedmem/sharedmem_test.go
new file mode 100644
index 000000000..f71e4751f
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/sharedmem_test.go
@@ -0,0 +1,703 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sharedmem
+
+import (
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "reflect"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "gvisor.googlesource.com/gvisor/pkg/tcpip"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/header"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/stack"
+)
+
+const (
+ localLinkAddr = "\xde\xad\xbe\xef\x56\x78"
+ remoteLinkAddr = "\xde\xad\xbe\xef\x12\x34"
+
+ queueDataSize = 1024 * 1024
+ queuePipeSize = 4096
+)
+
+type queueBuffers struct {
+ data []byte
+ rx pipe.Tx
+ tx pipe.Rx
+}
+
+func initQueue(t *testing.T, q *queueBuffers, c *QueueConfig) {
+ // Prepare tx pipe.
+ b, err := getBuffer(c.TxPipeFD)
+ if err != nil {
+ t.Fatalf("getBuffer failed: %v", err)
+ }
+ q.tx.Init(b)
+
+ // Prepare rx pipe.
+ b, err = getBuffer(c.RxPipeFD)
+ if err != nil {
+ t.Fatalf("getBuffer failed: %v", err)
+ }
+ q.rx.Init(b)
+
+ // Get data slice.
+ q.data, err = getBuffer(c.DataFD)
+ if err != nil {
+ t.Fatalf("getBuffer failed: %v", err)
+ }
+}
+
+func (q *queueBuffers) cleanup() {
+ syscall.Munmap(q.tx.Bytes())
+ syscall.Munmap(q.rx.Bytes())
+ syscall.Munmap(q.data)
+}
+
+type packetInfo struct {
+ addr tcpip.LinkAddress
+ proto tcpip.NetworkProtocolNumber
+ vv buffer.VectorisedView
+}
+
+type testContext struct {
+ t *testing.T
+ ep *endpoint
+ txCfg QueueConfig
+ rxCfg QueueConfig
+ txq queueBuffers
+ rxq queueBuffers
+
+ packetCh chan struct{}
+ mu sync.Mutex
+ packets []packetInfo
+}
+
+func newTestContext(t *testing.T, mtu, bufferSize uint32, addr tcpip.LinkAddress) *testContext {
+ var err error
+ c := &testContext{
+ t: t,
+ packetCh: make(chan struct{}, 1000000),
+ }
+ c.txCfg = createQueueFDs(t, queueSizes{
+ dataSize: queueDataSize,
+ txPipeSize: queuePipeSize,
+ rxPipeSize: queuePipeSize,
+ sharedDataSize: 4096,
+ })
+
+ c.rxCfg = createQueueFDs(t, queueSizes{
+ dataSize: queueDataSize,
+ txPipeSize: queuePipeSize,
+ rxPipeSize: queuePipeSize,
+ sharedDataSize: 4096,
+ })
+
+ initQueue(t, &c.txq, &c.txCfg)
+ initQueue(t, &c.rxq, &c.rxCfg)
+
+ id, err := New(mtu, bufferSize, addr, c.txCfg, c.rxCfg)
+ if err != nil {
+ t.Fatalf("New failed: %v", err)
+ }
+
+ c.ep = stack.FindLinkEndpoint(id).(*endpoint)
+ c.ep.Attach(c)
+
+ return c
+}
+
+func (c *testContext) DeliverNetworkPacket(_ stack.LinkEndpoint, remoteAddr tcpip.LinkAddress, proto tcpip.NetworkProtocolNumber, vv *buffer.VectorisedView) {
+ c.mu.Lock()
+ c.packets = append(c.packets, packetInfo{
+ addr: remoteAddr,
+ proto: proto,
+ vv: vv.Clone(nil),
+ })
+ c.mu.Unlock()
+
+ c.packetCh <- struct{}{}
+}
+
+func (c *testContext) cleanup() {
+ c.ep.Close()
+ closeFDs(&c.txCfg)
+ closeFDs(&c.rxCfg)
+ c.txq.cleanup()
+ c.rxq.cleanup()
+}
+
+func (c *testContext) waitForPackets(n int, to <-chan time.Time, errorStr string) {
+ for i := 0; i < n; i++ {
+ select {
+ case <-c.packetCh:
+ case <-to:
+ c.t.Fatalf(errorStr)
+ }
+ }
+}
+
+func (c *testContext) pushRxCompletion(size uint32, bs []queue.RxBuffer) {
+ b := c.rxq.rx.Push(queue.RxCompletionSize(len(bs)))
+ queue.EncodeRxCompletion(b, size, 0)
+ for i := range bs {
+ queue.EncodeRxCompletionBuffer(b, i, queue.RxBuffer{
+ Offset: bs[i].Offset,
+ Size: bs[i].Size,
+ ID: bs[i].ID,
+ })
+ }
+}
+
+func randomFill(b []byte) {
+ for i := range b {
+ b[i] = byte(rand.Intn(256))
+ }
+}
+
+func shuffle(b []int) {
+ for i := len(b) - 1; i >= 0; i-- {
+ j := rand.Intn(i + 1)
+ b[i], b[j] = b[j], b[i]
+ }
+}
+
+func createFile(t *testing.T, size int64, initQueue bool) int {
+ tmpDir := os.Getenv("TEST_TMPDIR")
+ if tmpDir == "" {
+ tmpDir = os.Getenv("TMPDIR")
+ }
+ f, err := ioutil.TempFile(tmpDir, "sharedmem_test")
+ if err != nil {
+ t.Fatalf("TempFile failed: %v", err)
+ }
+ defer f.Close()
+ syscall.Unlink(f.Name())
+
+ if initQueue {
+ // Write the "slot-free" flag in the initial queue.
+ _, err := f.WriteAt([]byte{0, 0, 0, 0, 0, 0, 0, 0x80}, 0)
+ if err != nil {
+ t.Fatalf("WriteAt failed: %v", err)
+ }
+ }
+
+ fd, err := syscall.Dup(int(f.Fd()))
+ if err != nil {
+ t.Fatalf("Dup failed: %v", err)
+ }
+
+ if err := syscall.Ftruncate(fd, size); err != nil {
+ syscall.Close(fd)
+ t.Fatalf("Ftruncate failed: %v", err)
+ }
+
+ return fd
+}
+
+func closeFDs(c *QueueConfig) {
+ syscall.Close(c.DataFD)
+ syscall.Close(c.EventFD)
+ syscall.Close(c.TxPipeFD)
+ syscall.Close(c.RxPipeFD)
+ syscall.Close(c.SharedDataFD)
+}
+
+type queueSizes struct {
+ dataSize int64
+ txPipeSize int64
+ rxPipeSize int64
+ sharedDataSize int64
+}
+
+func createQueueFDs(t *testing.T, s queueSizes) QueueConfig {
+ fd, _, err := syscall.RawSyscall(syscall.SYS_EVENTFD2, 0, 0, 0)
+ if err != 0 {
+ t.Fatalf("eventfd failed: %v", error(err))
+ }
+
+ return QueueConfig{
+ EventFD: int(fd),
+ DataFD: createFile(t, s.dataSize, false),
+ TxPipeFD: createFile(t, s.txPipeSize, true),
+ RxPipeFD: createFile(t, s.rxPipeSize, true),
+ SharedDataFD: createFile(t, s.sharedDataSize, false),
+ }
+}
+
+// TestSimpleSend sends 1000 packets with random header and payload sizes,
+// then checks that the right payload is received on the shared memory queues.
+func TestSimpleSend(t *testing.T) {
+ c := newTestContext(t, 20000, 1500, localLinkAddr)
+ defer c.cleanup()
+
+ // Prepare route.
+ r := stack.Route{
+ RemoteLinkAddress: remoteLinkAddr,
+ }
+
+ for iters := 1000; iters > 0; iters-- {
+ // Prepare and send packet.
+ n := rand.Intn(10000)
+ hdr := buffer.NewPrependable(n + int(c.ep.MaxHeaderLength()))
+ hdrBuf := hdr.Prepend(n)
+ randomFill(hdrBuf)
+
+ n = rand.Intn(10000)
+ buf := buffer.NewView(n)
+ randomFill(buf)
+
+ proto := tcpip.NetworkProtocolNumber(rand.Intn(0x10000))
+ err := c.ep.WritePacket(&r, &hdr, buf, proto)
+ if err != nil {
+ t.Fatalf("WritePacket failed: %v", err)
+ }
+
+ // Receive packet.
+ desc := c.txq.tx.Pull()
+ pi := queue.DecodeTxPacketHeader(desc)
+ contents := make([]byte, 0, pi.Size)
+ for i := 0; i < pi.BufferCount; i++ {
+ bi := queue.DecodeTxBufferHeader(desc, i)
+ contents = append(contents, c.txq.data[bi.Offset:][:bi.Size]...)
+ }
+ c.txq.tx.Flush()
+
+ if pi.Reserved != 0 {
+ t.Fatalf("Reserved value is non-zero: 0x%x", pi.Reserved)
+ }
+
+ // Check the thernet header.
+ ethTemplate := make(header.Ethernet, header.EthernetMinimumSize)
+ ethTemplate.Encode(&header.EthernetFields{
+ SrcAddr: localLinkAddr,
+ DstAddr: remoteLinkAddr,
+ Type: proto,
+ })
+ if got := contents[:header.EthernetMinimumSize]; !reflect.DeepEqual(got, []byte(ethTemplate)) {
+ t.Fatalf("Bad ethernet header in packet: got %x, want %x", got, ethTemplate)
+ }
+
+ // Compare contents skipping the ethernet header added by the
+ // endpoint.
+ merged := append(hdrBuf, buf...)
+ if uint32(len(contents)) < pi.Size {
+ t.Fatalf("Sum of buffers is less than packet size: %v < %v", len(contents), pi.Size)
+ }
+ contents = contents[:pi.Size][header.EthernetMinimumSize:]
+
+ if !reflect.DeepEqual(contents, merged) {
+ t.Fatalf("Buffers are different: got %x (%v bytes), want %x (%v bytes)", contents, len(contents), merged, len(merged))
+ }
+
+ // Tell the endpoint about the completion of the write.
+ b := c.txq.rx.Push(8)
+ queue.EncodeTxCompletion(b, pi.ID)
+ c.txq.rx.Flush()
+ }
+}
+
+// TestFillTxQueue sends packets until the queue is full.
+func TestFillTxQueue(t *testing.T) {
+ c := newTestContext(t, 20000, 1500, localLinkAddr)
+ defer c.cleanup()
+
+ // Prepare to send a packet.
+ r := stack.Route{
+ RemoteLinkAddress: remoteLinkAddr,
+ }
+
+ buf := buffer.NewView(100)
+
+ // Each packet is uses no more than 40 bytes, so write that many packets
+ // until the tx queue if full.
+ ids := make(map[uint64]struct{})
+ for i := queuePipeSize / 40; i > 0; i-- {
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil {
+ t.Fatalf("WritePacket failed unexpectedly: %v", err)
+ }
+
+ // Check that they have different IDs.
+ desc := c.txq.tx.Pull()
+ pi := queue.DecodeTxPacketHeader(desc)
+ if _, ok := ids[pi.ID]; ok {
+ t.Fatalf("ID (%v) reused", pi.ID)
+ }
+ ids[pi.ID] = struct{}{}
+ }
+
+ // Next attempt to write must fail.
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber)
+ if want := tcpip.ErrWouldBlock; err != want {
+ t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
+ }
+}
+
+// TestFillTxQueueAfterBadCompletion sends a bad completion, then sends packets
+// until the queue is full.
+func TestFillTxQueueAfterBadCompletion(t *testing.T) {
+ c := newTestContext(t, 20000, 1500, localLinkAddr)
+ defer c.cleanup()
+
+ // Send a bad completion.
+ queue.EncodeTxCompletion(c.txq.rx.Push(8), 1)
+ c.txq.rx.Flush()
+
+ // Prepare to send a packet.
+ r := stack.Route{
+ RemoteLinkAddress: remoteLinkAddr,
+ }
+
+ buf := buffer.NewView(100)
+
+ // Send two packets so that the id slice has at least two slots.
+ for i := 2; i > 0; i-- {
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil {
+ t.Fatalf("WritePacket failed unexpectedly: %v", err)
+ }
+ }
+
+ // Complete the two writes twice.
+ for i := 2; i > 0; i-- {
+ pi := queue.DecodeTxPacketHeader(c.txq.tx.Pull())
+
+ queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID)
+ queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID)
+ c.txq.rx.Flush()
+ }
+ c.txq.tx.Flush()
+
+ // Each packet is uses no more than 40 bytes, so write that many packets
+ // until the tx queue if full.
+ ids := make(map[uint64]struct{})
+ for i := queuePipeSize / 40; i > 0; i-- {
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil {
+ t.Fatalf("WritePacket failed unexpectedly: %v", err)
+ }
+
+ // Check that they have different IDs.
+ desc := c.txq.tx.Pull()
+ pi := queue.DecodeTxPacketHeader(desc)
+ if _, ok := ids[pi.ID]; ok {
+ t.Fatalf("ID (%v) reused", pi.ID)
+ }
+ ids[pi.ID] = struct{}{}
+ }
+
+ // Next attempt to write must fail.
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber)
+ if want := tcpip.ErrWouldBlock; err != want {
+ t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
+ }
+}
+
+// TestFillTxMemory sends packets until the we run out of shared memory.
+func TestFillTxMemory(t *testing.T) {
+ const bufferSize = 1500
+ c := newTestContext(t, 20000, bufferSize, localLinkAddr)
+ defer c.cleanup()
+
+ // Prepare to send a packet.
+ r := stack.Route{
+ RemoteLinkAddress: remoteLinkAddr,
+ }
+
+ buf := buffer.NewView(100)
+
+ // Each packet is uses up one buffer, so write as many as possible until
+ // we fill the memory.
+ ids := make(map[uint64]struct{})
+ for i := queueDataSize / bufferSize; i > 0; i-- {
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil {
+ t.Fatalf("WritePacket failed unexpectedly: %v", err)
+ }
+
+ // Check that they have different IDs.
+ desc := c.txq.tx.Pull()
+ pi := queue.DecodeTxPacketHeader(desc)
+ if _, ok := ids[pi.ID]; ok {
+ t.Fatalf("ID (%v) reused", pi.ID)
+ }
+ ids[pi.ID] = struct{}{}
+ c.txq.tx.Flush()
+ }
+
+ // Next attempt to write must fail.
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber)
+ if want := tcpip.ErrWouldBlock; err != want {
+ t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
+ }
+}
+
+// TestFillTxMemoryWithMultiBuffer sends packets until the we run out of
+// shared memory for a 2-buffer packet, but still with room for a 1-buffer
+// packet.
+func TestFillTxMemoryWithMultiBuffer(t *testing.T) {
+ const bufferSize = 1500
+ c := newTestContext(t, 20000, bufferSize, localLinkAddr)
+ defer c.cleanup()
+
+ // Prepare to send a packet.
+ r := stack.Route{
+ RemoteLinkAddress: remoteLinkAddr,
+ }
+
+ buf := buffer.NewView(100)
+
+ // Each packet is uses up one buffer, so write as many as possible
+ // until there is only one buffer left.
+ for i := queueDataSize/bufferSize - 1; i > 0; i-- {
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil {
+ t.Fatalf("WritePacket failed unexpectedly: %v", err)
+ }
+
+ // Pull the posted buffer.
+ c.txq.tx.Pull()
+ c.txq.tx.Flush()
+ }
+
+ // Attempt to write a two-buffer packet. It must fail.
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ err := c.ep.WritePacket(&r, &hdr, buffer.NewView(bufferSize), header.IPv4ProtocolNumber)
+ if want := tcpip.ErrWouldBlock; err != want {
+ t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
+ }
+
+ // Attempt to write a one-buffer packet. It must succeed.
+ hdr = buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil {
+ t.Fatalf("WritePacket failed unexpectedly: %v", err)
+ }
+}
+
+func pollPull(t *testing.T, p *pipe.Rx, to <-chan time.Time, errStr string) []byte {
+ for {
+ b := p.Pull()
+ if b != nil {
+ return b
+ }
+
+ select {
+ case <-time.After(10 * time.Millisecond):
+ case <-to:
+ t.Fatalf(errStr)
+ }
+ }
+}
+
+// TestSimpleReceive completes 1000 different receives with random payload and
+// random number of buffers. It checks that the contents match the expected
+// values.
+func TestSimpleReceive(t *testing.T) {
+ const bufferSize = 1500
+ c := newTestContext(t, 20000, bufferSize, localLinkAddr)
+ defer c.cleanup()
+
+ // Check that buffers have been posted.
+ limit := c.ep.rx.q.PostedBuffersLimit()
+ timeout := time.After(2 * time.Second)
+ for i := uint64(0); i < limit; i++ {
+ bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers to be posted"))
+
+ if want := i * bufferSize; want != bi.Offset {
+ t.Fatalf("Bad posted offset: got %v, want %v", bi.Offset, want)
+ }
+
+ if want := i; want != bi.ID {
+ t.Fatalf("Bad posted ID: got %v, want %v", bi.ID, want)
+ }
+
+ if bufferSize != bi.Size {
+ t.Fatalf("Bad posted bufferSize: got %v, want %v", bi.Size, bufferSize)
+ }
+ }
+ c.rxq.tx.Flush()
+
+ // Create a slice with the indices 0..limit-1.
+ idx := make([]int, limit)
+ for i := range idx {
+ idx[i] = i
+ }
+
+ // Complete random packets 1000 times.
+ for iters := 1000; iters > 0; iters-- {
+ // Prepare a random packet.
+ shuffle(idx)
+ n := 1 + rand.Intn(10)
+ bufs := make([]queue.RxBuffer, n)
+ contents := make([]byte, bufferSize*n-rand.Intn(500))
+ randomFill(contents)
+ for i := range bufs {
+ j := idx[i]
+ bufs[i].Size = bufferSize
+ bufs[i].Offset = uint64(bufferSize * j)
+ bufs[i].ID = uint64(j)
+
+ copy(c.rxq.data[bufs[i].Offset:][:bufferSize], contents[i*bufferSize:])
+ }
+
+ // Push completion.
+ c.pushRxCompletion(uint32(len(contents)), bufs)
+ c.rxq.rx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Wait for packet to be received, then check it.
+ c.waitForPackets(1, time.After(time.Second), "Error waiting for packet")
+ c.mu.Lock()
+ rcvd := []byte(c.packets[0].vv.First())
+ c.packets = c.packets[:0]
+ c.mu.Unlock()
+
+ contents = contents[header.EthernetMinimumSize:]
+ if !reflect.DeepEqual(contents, rcvd) {
+ t.Fatalf("Unexpected buffer contents: got %x, want %x", rcvd, contents)
+ }
+
+ // Check that buffers have been reposted.
+ for i := range bufs {
+ bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffers to be reposted"))
+ if !reflect.DeepEqual(bi, bufs[i]) {
+ t.Fatalf("Unexpected buffer reposted: got %x, want %x", bi, bufs[i])
+ }
+ }
+ c.rxq.tx.Flush()
+ }
+}
+
+// TestRxBuffersReposted tests that rx buffers get reposted after they have been
+// completed.
+func TestRxBuffersReposted(t *testing.T) {
+ const bufferSize = 1500
+ c := newTestContext(t, 20000, bufferSize, localLinkAddr)
+ defer c.cleanup()
+
+ // Receive all posted buffers.
+ limit := c.ep.rx.q.PostedBuffersLimit()
+ buffers := make([]queue.RxBuffer, 0, limit)
+ timeout := time.After(2 * time.Second)
+ for i := limit; i > 0; i-- {
+ buffers = append(buffers, queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers")))
+ }
+ c.rxq.tx.Flush()
+
+ // Check that all buffers are reposted when individually completed.
+ timeout = time.After(2 * time.Second)
+ for i := range buffers {
+ // Complete the buffer.
+ c.pushRxCompletion(buffers[i].Size, buffers[i:][:1])
+ c.rxq.rx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Wait for it to be reposted.
+ bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted"))
+ if !reflect.DeepEqual(bi, buffers[i]) {
+ t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[i])
+ }
+ }
+ c.rxq.tx.Flush()
+
+ // Check that all buffers are reposted when completed in pairs.
+ timeout = time.After(2 * time.Second)
+ for i := 0; i < len(buffers)/2; i++ {
+ // Complete with two buffers.
+ c.pushRxCompletion(2*bufferSize, buffers[2*i:][:2])
+ c.rxq.rx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Wait for them to be reposted.
+ bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted"))
+ if !reflect.DeepEqual(bi, buffers[2*i]) {
+ t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[2*i])
+ }
+ bi = queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted"))
+ if !reflect.DeepEqual(bi, buffers[2*i+1]) {
+ t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[2*i+1])
+ }
+ }
+ c.rxq.tx.Flush()
+}
+
+// TestReceivePostingIsFull checks that the endpoint will properly handle the
+// case when a received buffer cannot be immediately reposted because it hasn't
+// been pulled from the tx pipe yet.
+func TestReceivePostingIsFull(t *testing.T) {
+ const bufferSize = 1500
+ c := newTestContext(t, 20000, bufferSize, localLinkAddr)
+ defer c.cleanup()
+
+ // Complete first posted buffer before flushing it from the tx pipe.
+ first := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for first buffer to be posted"))
+ c.pushRxCompletion(first.Size, []queue.RxBuffer{first})
+ c.rxq.rx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Check that packet is received.
+ c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
+
+ // Complete another buffer.
+ second := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for second buffer to be posted"))
+ c.pushRxCompletion(second.Size, []queue.RxBuffer{second})
+ c.rxq.rx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Check that no packet is received yet, as the worker is blocked trying
+ // to repost.
+ select {
+ case <-time.After(500 * time.Millisecond):
+ case <-c.packetCh:
+ t.Fatalf("Unexpected packet received")
+ }
+
+ // Flush tx queue, which will allow the first buffer to be reposted,
+ // and the second completion to be pulled.
+ c.rxq.tx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Check that second packet completes.
+ c.waitForPackets(1, time.After(time.Second), "Timeout waiting for second completed packet")
+}
+
+// TestCloseWhileWaitingToPost closes the endpoint while it is waiting to
+// repost a buffer. Make sure it backs out.
+func TestCloseWhileWaitingToPost(t *testing.T) {
+ const bufferSize = 1500
+ c := newTestContext(t, 20000, bufferSize, localLinkAddr)
+ cleaned := false
+ defer func() {
+ if !cleaned {
+ c.cleanup()
+ }
+ }()
+
+ // Complete first posted buffer before flushing it from the tx pipe.
+ bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for initial buffer to be posted"))
+ c.pushRxCompletion(bi.Size, []queue.RxBuffer{bi})
+ c.rxq.rx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Wait for packet to be indicated.
+ c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
+
+ // Cleanup and wait for worker to complete.
+ c.cleanup()
+ cleaned = true
+ c.ep.Wait()
+}
diff --git a/pkg/tcpip/link/sharedmem/sharedmem_unsafe.go b/pkg/tcpip/link/sharedmem/sharedmem_unsafe.go
new file mode 100644
index 000000000..52f93f480
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/sharedmem_unsafe.go
@@ -0,0 +1,15 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sharedmem
+
+import (
+ "unsafe"
+)
+
+// sharedDataPointer converts the shared data slice into a pointer so that it
+// can be used in atomic operations.
+func sharedDataPointer(sharedData []byte) *uint32 {
+ return (*uint32)(unsafe.Pointer(&sharedData[0:4][0]))
+}
diff --git a/pkg/tcpip/link/sharedmem/tx.go b/pkg/tcpip/link/sharedmem/tx.go
new file mode 100644
index 000000000..bca1d79b4
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/tx.go
@@ -0,0 +1,262 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sharedmem
+
+import (
+ "math"
+ "syscall"
+
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue"
+)
+
+const (
+ nilID = math.MaxUint64
+)
+
+// tx holds all state associated with a tx queue.
+type tx struct {
+ data []byte
+ q queue.Tx
+ ids idManager
+ bufs bufferManager
+}
+
+// init initializes all state needed by the tx queue based on the information
+// provided.
+//
+// The caller always retains ownership of all file descriptors passed in. The
+// queue implementation will duplicate any that it may need in the future.
+func (t *tx) init(mtu uint32, c *QueueConfig) error {
+ // Map in all buffers.
+ txPipe, err := getBuffer(c.TxPipeFD)
+ if err != nil {
+ return err
+ }
+
+ rxPipe, err := getBuffer(c.RxPipeFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ return err
+ }
+
+ data, err := getBuffer(c.DataFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ return err
+ }
+
+ // Initialize state based on buffers.
+ t.q.Init(txPipe, rxPipe)
+ t.ids.init()
+ t.bufs.init(0, len(data), int(mtu))
+ t.data = data
+
+ return nil
+}
+
+// cleanup releases all resources allocated during init(). It must only be
+// called if init() has previously succeeded.
+func (t *tx) cleanup() {
+ a, b := t.q.Bytes()
+ syscall.Munmap(a)
+ syscall.Munmap(b)
+ syscall.Munmap(t.data)
+}
+
+// transmit sends a packet made up of up to two buffers. Returns a boolean that
+// specifies whether the packet was successfully transmitted.
+func (t *tx) transmit(a, b []byte) bool {
+ // Pull completions from the tx queue and add their buffers back to the
+ // pool so that we can reuse them.
+ for {
+ id, ok := t.q.CompletedPacket()
+ if !ok {
+ break
+ }
+
+ if buf := t.ids.remove(id); buf != nil {
+ t.bufs.free(buf)
+ }
+ }
+
+ bSize := t.bufs.entrySize
+ total := uint32(len(a) + len(b))
+ bufCount := (total + bSize - 1) / bSize
+
+ // Allocate enough buffers to hold all the data.
+ var buf *queue.TxBuffer
+ for i := bufCount; i != 0; i-- {
+ b := t.bufs.alloc()
+ if b == nil {
+ // Failed to get all buffers. Return to the pool
+ // whatever we had managed to get.
+ if buf != nil {
+ t.bufs.free(buf)
+ }
+ return false
+ }
+ b.Next = buf
+ buf = b
+ }
+
+ // Copy data into allocated buffers.
+ nBuf := buf
+ var dBuf []byte
+ for _, data := range [][]byte{a, b} {
+ for len(data) > 0 {
+ if len(dBuf) == 0 {
+ dBuf = t.data[nBuf.Offset:][:nBuf.Size]
+ nBuf = nBuf.Next
+ }
+ n := copy(dBuf, data)
+ data = data[n:]
+ dBuf = dBuf[n:]
+ }
+ }
+
+ // Get an id for this packet and send it out.
+ id := t.ids.add(buf)
+ if !t.q.Enqueue(id, total, bufCount, buf) {
+ t.ids.remove(id)
+ t.bufs.free(buf)
+ return false
+ }
+
+ return true
+}
+
+// getBuffer returns a memory region mapped to the full contents of the given
+// file descriptor.
+func getBuffer(fd int) ([]byte, error) {
+ var s syscall.Stat_t
+ if err := syscall.Fstat(fd, &s); err != nil {
+ return nil, err
+ }
+
+ // Check that size doesn't overflow an int.
+ if s.Size > int64(^uint(0)>>1) {
+ return nil, syscall.EDOM
+ }
+
+ return syscall.Mmap(fd, 0, int(s.Size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED|syscall.MAP_FILE)
+}
+
+// idDescriptor is used by idManager to either point to a tx buffer (in case
+// the ID is assigned) or to the next free element (if the id is not assigned).
+type idDescriptor struct {
+ buf *queue.TxBuffer
+ nextFree uint64
+}
+
+// idManager is a manager of tx buffer identifiers. It assigns unique IDs to
+// tx buffers that are added to it; the IDs can only be reused after they have
+// been removed.
+//
+// The ID assignments are stored so that the tx buffers can be retrieved from
+// the IDs previously assigned to them.
+type idManager struct {
+ // ids is a slice containing all tx buffers. The ID is the index into
+ // this slice.
+ ids []idDescriptor
+
+ // freeList a list of free IDs.
+ freeList uint64
+}
+
+// init initializes the id manager.
+func (m *idManager) init() {
+ m.freeList = nilID
+}
+
+// add assigns an ID to the given tx buffer.
+func (m *idManager) add(b *queue.TxBuffer) uint64 {
+ if i := m.freeList; i != nilID {
+ // There is an id available in the free list, just use it.
+ m.ids[i].buf = b
+ m.freeList = m.ids[i].nextFree
+ return i
+ }
+
+ // We need to expand the id descriptor.
+ m.ids = append(m.ids, idDescriptor{buf: b})
+ return uint64(len(m.ids) - 1)
+}
+
+// remove retrieves the tx buffer associated with the given ID, and removes the
+// ID from the assigned table so that it can be reused in the future.
+func (m *idManager) remove(i uint64) *queue.TxBuffer {
+ if i >= uint64(len(m.ids)) {
+ return nil
+ }
+
+ desc := &m.ids[i]
+ b := desc.buf
+ if b == nil {
+ // The provided id is not currently assigned.
+ return nil
+ }
+
+ desc.buf = nil
+ desc.nextFree = m.freeList
+ m.freeList = i
+
+ return b
+}
+
+// bufferManager manages a buffer region broken up into smaller, equally sized
+// buffers. Smaller buffers can be allocated and freed.
+type bufferManager struct {
+ freeList *queue.TxBuffer
+ curOffset uint64
+ limit uint64
+ entrySize uint32
+}
+
+// init initializes the buffer manager.
+func (b *bufferManager) init(initialOffset, size, entrySize int) {
+ b.freeList = nil
+ b.curOffset = uint64(initialOffset)
+ b.limit = uint64(initialOffset + size/entrySize*entrySize)
+ b.entrySize = uint32(entrySize)
+}
+
+// alloc allocates a buffer from the manager, if one is available.
+func (b *bufferManager) alloc() *queue.TxBuffer {
+ if b.freeList != nil {
+ // There is a descriptor ready for reuse in the free list.
+ d := b.freeList
+ b.freeList = d.Next
+ d.Next = nil
+ return d
+ }
+
+ if b.curOffset < b.limit {
+ // There is room available in the never-used range, so create
+ // a new descriptor for it.
+ d := &queue.TxBuffer{
+ Offset: b.curOffset,
+ Size: b.entrySize,
+ }
+ b.curOffset += uint64(b.entrySize)
+ return d
+ }
+
+ return nil
+}
+
+// free returns all buffers in the list to the buffer manager so that they can
+// be reused.
+func (b *bufferManager) free(d *queue.TxBuffer) {
+ // Find the last buffer in the list.
+ last := d
+ for last.Next != nil {
+ last = last.Next
+ }
+
+ // Push list onto free list.
+ last.Next = b.freeList
+ b.freeList = d
+}