summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link/sharedmem
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/link/sharedmem')
-rw-r--r--pkg/tcpip/link/sharedmem/BUILD43
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/BUILD24
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/pipe.go78
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/pipe_test.go517
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go35
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/rx.go93
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/tx.go161
-rw-r--r--pkg/tcpip/link/sharedmem/queue/BUILD29
-rw-r--r--pkg/tcpip/link/sharedmem/queue/queue_test.go517
-rw-r--r--pkg/tcpip/link/sharedmem/queue/rx.go221
-rw-r--r--pkg/tcpip/link/sharedmem/queue/tx.go151
-rw-r--r--pkg/tcpip/link/sharedmem/rx.go159
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem.go288
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem_test.go777
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem_unsafe.go25
-rw-r--r--pkg/tcpip/link/sharedmem/tx.go272
16 files changed, 0 insertions, 3390 deletions
diff --git a/pkg/tcpip/link/sharedmem/BUILD b/pkg/tcpip/link/sharedmem/BUILD
deleted file mode 100644
index 0a5ea3dc4..000000000
--- a/pkg/tcpip/link/sharedmem/BUILD
+++ /dev/null
@@ -1,43 +0,0 @@
-load("//tools/go_stateify:defs.bzl", "go_library")
-load("@io_bazel_rules_go//go:def.bzl", "go_test")
-
-package(licenses = ["notice"])
-
-go_library(
- name = "sharedmem",
- srcs = [
- "rx.go",
- "sharedmem.go",
- "sharedmem_unsafe.go",
- "tx.go",
- ],
- importpath = "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem",
- visibility = [
- "//:sandbox",
- ],
- deps = [
- "//pkg/log",
- "//pkg/tcpip",
- "//pkg/tcpip/buffer",
- "//pkg/tcpip/header",
- "//pkg/tcpip/link/rawfile",
- "//pkg/tcpip/link/sharedmem/queue",
- "//pkg/tcpip/stack",
- ],
-)
-
-go_test(
- name = "sharedmem_test",
- srcs = [
- "sharedmem_test.go",
- ],
- embed = [":sharedmem"],
- deps = [
- "//pkg/tcpip",
- "//pkg/tcpip/buffer",
- "//pkg/tcpip/header",
- "//pkg/tcpip/link/sharedmem/pipe",
- "//pkg/tcpip/link/sharedmem/queue",
- "//pkg/tcpip/stack",
- ],
-)
diff --git a/pkg/tcpip/link/sharedmem/pipe/BUILD b/pkg/tcpip/link/sharedmem/pipe/BUILD
deleted file mode 100644
index 330ed5e94..000000000
--- a/pkg/tcpip/link/sharedmem/pipe/BUILD
+++ /dev/null
@@ -1,24 +0,0 @@
-load("//tools/go_stateify:defs.bzl", "go_library")
-load("@io_bazel_rules_go//go:def.bzl", "go_test")
-
-package(licenses = ["notice"])
-
-go_library(
- name = "pipe",
- srcs = [
- "pipe.go",
- "pipe_unsafe.go",
- "rx.go",
- "tx.go",
- ],
- importpath = "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe",
- visibility = ["//:sandbox"],
-)
-
-go_test(
- name = "pipe_test",
- srcs = [
- "pipe_test.go",
- ],
- embed = [":pipe"],
-)
diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe.go b/pkg/tcpip/link/sharedmem/pipe/pipe.go
deleted file mode 100644
index 74c9f0311..000000000
--- a/pkg/tcpip/link/sharedmem/pipe/pipe.go
+++ /dev/null
@@ -1,78 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package pipe implements a shared memory ring buffer on which a single reader
-// and a single writer can operate (read/write) concurrently. The ring buffer
-// allows for data of different sizes to be written, and preserves the boundary
-// of the written data.
-//
-// Example usage is as follows:
-//
-// wb := t.Push(20)
-// // Write data to wb.
-// t.Flush()
-//
-// rb := r.Pull()
-// // Do something with data in rb.
-// t.Flush()
-package pipe
-
-import (
- "math"
-)
-
-const (
- jump uint64 = math.MaxUint32 + 1
- offsetMask uint64 = math.MaxUint32
- revolutionMask uint64 = ^offsetMask
-
- sizeOfSlotHeader = 8 // sizeof(uint64)
- slotFree uint64 = 1 << 63
- slotSizeMask uint64 = math.MaxUint32
-)
-
-// payloadToSlotSize calculates the total size of a slot based on its payload
-// size. The total size is the header size, plus the payload size, plus padding
-// if necessary to make the total size a multiple of sizeOfSlotHeader.
-func payloadToSlotSize(payloadSize uint64) uint64 {
- s := sizeOfSlotHeader + payloadSize
- return (s + sizeOfSlotHeader - 1) &^ (sizeOfSlotHeader - 1)
-}
-
-// slotToPayloadSize calculates the payload size of a slot based on the total
-// size of the slot. This is only meant to be used when creating slots that
-// don't carry information (e.g., free slots or wrap slots).
-func slotToPayloadSize(offset uint64) uint64 {
- return offset - sizeOfSlotHeader
-}
-
-// pipe is a basic data structure used by both (transmit & receive) ends of a
-// pipe. Indices into this pipe are split into two fields: offset, which counts
-// the number of bytes from the beginning of the buffer, and revolution, which
-// counts the number of times the index has wrapped around.
-type pipe struct {
- buffer []byte
-}
-
-// init initializes the pipe buffer such that its size is a multiple of the size
-// of the slot header.
-func (p *pipe) init(b []byte) {
- p.buffer = b[:len(b)&^(sizeOfSlotHeader-1)]
-}
-
-// data returns a section of the buffer starting at the given index (which may
-// include revolution information) and with the given size.
-func (p *pipe) data(idx uint64, size uint64) []byte {
- return p.buffer[(idx&offsetMask)+sizeOfSlotHeader:][:size]
-}
diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_test.go b/pkg/tcpip/link/sharedmem/pipe/pipe_test.go
deleted file mode 100644
index 59ef69a8b..000000000
--- a/pkg/tcpip/link/sharedmem/pipe/pipe_test.go
+++ /dev/null
@@ -1,517 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pipe
-
-import (
- "math/rand"
- "reflect"
- "runtime"
- "sync"
- "testing"
-)
-
-func TestSimpleReadWrite(t *testing.T) {
- // Check that a simple write can be properly read from the rx side.
- tr := rand.New(rand.NewSource(99))
- rr := rand.New(rand.NewSource(99))
-
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- wb := tx.Push(10)
- if wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- for i := range wb {
- wb[i] = byte(tr.Intn(256))
- }
- tx.Flush()
-
- var rx Rx
- rx.Init(b)
- rb := rx.Pull()
- if len(rb) != 10 {
- t.Fatalf("Bad buffer size returned: got %v, want %v", len(rb), 10)
- }
-
- for i := range rb {
- if v := byte(rr.Intn(256)); v != rb[i] {
- t.Fatalf("Bad read buffer at index %v: got %v, want %v", i, rb[i], v)
- }
- }
- rx.Flush()
-}
-
-func TestEmptyRead(t *testing.T) {
- // Check that pulling from an empty pipe fails.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on empty pipe")
- }
-}
-
-func TestTooLargeWrite(t *testing.T) {
- // Check that writes that are too large are properly rejected.
- b := make([]byte, 96)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(96); wb != nil {
- t.Fatalf("Write of 96 bytes succeeded on 96-byte pipe")
- }
-
- if wb := tx.Push(88); wb != nil {
- t.Fatalf("Write of 88 bytes succeeded on 96-byte pipe")
- }
-
- if wb := tx.Push(80); wb == nil {
- t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
- }
-}
-
-func TestFullWrite(t *testing.T) {
- // Check that writes fail when the pipe is full.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(80); wb == nil {
- t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
- }
-
- if wb := tx.Push(1); wb != nil {
- t.Fatalf("Write succeeded on full pipe")
- }
-}
-
-func TestFullAndFlushedWrite(t *testing.T) {
- // Check that writes fail when the pipe is full and has already been
- // flushed.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(80); wb == nil {
- t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
- }
-
- tx.Flush()
-
- if wb := tx.Push(1); wb != nil {
- t.Fatalf("Write succeeded on full pipe")
- }
-}
-
-func TestTxFlushTwice(t *testing.T) {
- // Checks that a second consecutive tx flush is a no-op.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- // Make copy of original tx queue, flush it, then check that it didn't
- // change.
- orig := tx
- tx.Flush()
-
- if !reflect.DeepEqual(orig, tx) {
- t.Fatalf("Flush mutated tx pipe: got %v, want %v", tx, orig)
- }
-}
-
-func TestRxFlushTwice(t *testing.T) {
- // Checks that a second consecutive rx flush is a no-op.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
- rx.Flush()
-
- // Make copy of original rx queue, flush it, then check that it didn't
- // change.
- orig := rx
- rx.Flush()
-
- if !reflect.DeepEqual(orig, rx) {
- t.Fatalf("Flush mutated rx pipe: got %v, want %v", rx, orig)
- }
-}
-
-func TestWrapInMiddleOfTransaction(t *testing.T) {
- // Check that writes are not flushed when we need to wrap the buffer
- // around.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
- rx.Flush()
-
- // At this point the ring buffer is empty, but the write is at offset
- // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment).
- if wb := tx.Push(10); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on non-full pipe")
- }
-
- // We haven't flushed yet, so pull must return nil.
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on non-flushed pipe")
- }
-
- tx.Flush()
-
- // The two buffers must be available now.
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-}
-
-func TestWriteAbort(t *testing.T) {
- // Check that a read fails on a pipe that has had data pushed to it but
- // has aborted the push.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(10); wb == nil {
- t.Fatalf("Write failed on empty pipe")
- }
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on empty pipe")
- }
-
- tx.Abort()
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on empty pipe")
- }
-}
-
-func TestWrappedWriteAbort(t *testing.T) {
- // Check that writes are properly aborted even if the writes wrap
- // around.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
- rx.Flush()
-
- // At this point the ring buffer is empty, but the write is at offset
- // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment).
- if wb := tx.Push(10); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on non-full pipe")
- }
-
- // We haven't flushed yet, so pull must return nil.
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on non-flushed pipe")
- }
-
- tx.Abort()
-
- // The pushes were aborted, so no data should be readable.
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on non-flushed pipe")
- }
-
- // Try the same transactions again, but flush this time.
- if wb := tx.Push(10); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on non-full pipe")
- }
-
- tx.Flush()
-
- // The two buffers must be available now.
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-}
-
-func TestEmptyReadOnNonFlushedWrite(t *testing.T) {
- // Check that a read fails on a pipe that has had data pushed to it
- // but not yet flushed.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(10); wb == nil {
- t.Fatalf("Write failed on empty pipe")
- }
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on empty pipe")
- }
-
- tx.Flush()
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull on failed on non-empty pipe")
- }
-}
-
-func TestPullAfterPullingEntirePipe(t *testing.T) {
- // Check that Pull fails when the pipe is full, but all of it has
- // already been pulled but not yet flushed.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
- rx.Flush()
-
- // At this point the ring buffer is empty, but the write is at offset
- // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 3
- // buffers that will fill the pipe.
- if wb := tx.Push(10); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
-
- if wb := tx.Push(20); wb == nil {
- t.Fatalf("Push failed on non-full pipe")
- }
-
- if wb := tx.Push(24); wb == nil {
- t.Fatalf("Push failed on non-full pipe")
- }
-
- tx.Flush()
-
- // The three buffers must be available now.
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-
- // Fourth pull must fail.
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on empty pipe")
- }
-}
-
-func TestNoRoomToWrapOnPush(t *testing.T) {
- // Check that Push fails when it tries to allocate room to add a wrap
- // message.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
- rx.Flush()
-
- // At this point the ring buffer is empty, but the write is at offset
- // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 20,
- // which won't fit (64+20+8+padding = 96, which wouldn't leave room for
- // the padding), so it wraps around.
- if wb := tx.Push(20); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
-
- tx.Flush()
-
- // Buffer offset is at 28. Try to write 70, which would require a wrap
- // slot which cannot be created now.
- if wb := tx.Push(70); wb != nil {
- t.Fatalf("Push succeeded on pipe with no room for wrap message")
- }
-}
-
-func TestRxImplicitFlushOfWrapMessage(t *testing.T) {
- // Check if the first read is that of a wrapping message, that it gets
- // immediately flushed.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- // This will cause a wrapping message to written.
- if wb := tx.Push(60); wb != nil {
- t.Fatalf("Push succeeded when there is no room in pipe")
- }
-
- var rx Rx
- rx.Init(b)
-
- // Read the first message.
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
- rx.Flush()
-
- // This should fail because of the wrapping message is taking up space.
- if wb := tx.Push(60); wb != nil {
- t.Fatalf("Push succeeded when there is no room in pipe")
- }
-
- // Try to read the next one. This should consume the wrapping message.
- rx.Pull()
-
- // This must now succeed.
- if wb := tx.Push(60); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
-}
-
-func TestConcurrentReaderWriter(t *testing.T) {
- // Push a million buffers of random sizes and random contents. Check
- // that buffers read match what was written.
- tr := rand.New(rand.NewSource(99))
- rr := rand.New(rand.NewSource(99))
-
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- var rx Rx
- rx.Init(b)
-
- const count = 1000000
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- runtime.Gosched()
- for i := 0; i < count; i++ {
- n := 1 + tr.Intn(80)
- wb := tx.Push(uint64(n))
- for wb == nil {
- wb = tx.Push(uint64(n))
- }
-
- for j := range wb {
- wb[j] = byte(tr.Intn(256))
- }
-
- tx.Flush()
- }
- }()
-
- wg.Add(1)
- go func() {
- defer wg.Done()
- runtime.Gosched()
- for i := 0; i < count; i++ {
- n := 1 + rr.Intn(80)
- rb := rx.Pull()
- for rb == nil {
- rb = rx.Pull()
- }
-
- if n != len(rb) {
- t.Fatalf("Bad %v-th buffer length: got %v, want %v", i, len(rb), n)
- }
-
- for j := range rb {
- if v := byte(rr.Intn(256)); v != rb[j] {
- t.Fatalf("Bad %v-th read buffer at index %v: got %v, want %v", i, j, rb[j], v)
- }
- }
-
- rx.Flush()
- }
- }()
-
- wg.Wait()
-}
diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go b/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go
deleted file mode 100644
index 62d17029e..000000000
--- a/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pipe
-
-import (
- "sync/atomic"
- "unsafe"
-)
-
-func (p *pipe) write(idx uint64, v uint64) {
- ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0]))
- *ptr = v
-}
-
-func (p *pipe) writeAtomic(idx uint64, v uint64) {
- ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0]))
- atomic.StoreUint64(ptr, v)
-}
-
-func (p *pipe) readAtomic(idx uint64) uint64 {
- ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0]))
- return atomic.LoadUint64(ptr)
-}
diff --git a/pkg/tcpip/link/sharedmem/pipe/rx.go b/pkg/tcpip/link/sharedmem/pipe/rx.go
deleted file mode 100644
index f22e533ac..000000000
--- a/pkg/tcpip/link/sharedmem/pipe/rx.go
+++ /dev/null
@@ -1,93 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pipe
-
-// Rx is the receive side of the shared memory ring buffer.
-type Rx struct {
- p pipe
-
- tail uint64
- head uint64
-}
-
-// Init initializes the receive end of the pipe. In the initial state, the next
-// slot to be inspected is the very first one.
-func (r *Rx) Init(b []byte) {
- r.p.init(b)
- r.tail = 0xfffffffe * jump
- r.head = r.tail
-}
-
-// Pull reads the next buffer from the pipe, returning nil if there isn't one
-// currently available.
-//
-// The returned slice is available until Flush() is next called. After that, it
-// must not be touched.
-func (r *Rx) Pull() []byte {
- if r.head == r.tail+jump {
- // We've already pulled the whole pipe.
- return nil
- }
-
- header := r.p.readAtomic(r.head)
- if header&slotFree != 0 {
- // The next slot is free, we can't pull it yet.
- return nil
- }
-
- payloadSize := header & slotSizeMask
- newHead := r.head + payloadToSlotSize(payloadSize)
- headWrap := (r.head & revolutionMask) | uint64(len(r.p.buffer))
-
- // Check if this is a wrapping slot. If that's the case, it carries no
- // data, so we just skip it and try again from the first slot.
- if int64(newHead-headWrap) >= 0 {
- if int64(newHead-headWrap) > int64(jump) || newHead&offsetMask != 0 {
- return nil
- }
-
- if r.tail == r.head {
- // If this is the first pull since the last Flush()
- // call, we flush the state so that the sender can use
- // this space if it needs to.
- r.p.writeAtomic(r.head, slotFree|slotToPayloadSize(newHead-r.head))
- r.tail = newHead
- }
-
- r.head = newHead
- return r.Pull()
- }
-
- // Grab the buffer before updating r.head.
- b := r.p.data(r.head, payloadSize)
- r.head = newHead
- return b
-}
-
-// Flush tells the transmitter that all buffers pulled since the last Flush()
-// have been used, so the transmitter is free to used their slots for further
-// transmission.
-func (r *Rx) Flush() {
- if r.head == r.tail {
- return
- }
- r.p.writeAtomic(r.tail, slotFree|slotToPayloadSize(r.head-r.tail))
- r.tail = r.head
-}
-
-// Bytes returns the byte slice on which the pipe operates.
-func (r *Rx) Bytes() []byte {
- return r.p.buffer
-}
diff --git a/pkg/tcpip/link/sharedmem/pipe/tx.go b/pkg/tcpip/link/sharedmem/pipe/tx.go
deleted file mode 100644
index 9841eb231..000000000
--- a/pkg/tcpip/link/sharedmem/pipe/tx.go
+++ /dev/null
@@ -1,161 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pipe
-
-// Tx is the transmit side of the shared memory ring buffer.
-type Tx struct {
- p pipe
- maxPayloadSize uint64
-
- head uint64
- tail uint64
- next uint64
-
- tailHeader uint64
-}
-
-// Init initializes the transmit end of the pipe. In the initial state, the next
-// slot to be written is the very first one, and the transmitter has the whole
-// ring buffer available to it.
-func (t *Tx) Init(b []byte) {
- t.p.init(b)
- // maxPayloadSize excludes the header of the payload, and the header
- // of the wrapping message.
- t.maxPayloadSize = uint64(len(t.p.buffer)) - 2*sizeOfSlotHeader
- t.tail = 0xfffffffe * jump
- t.next = t.tail
- t.head = t.tail + jump
- t.p.write(t.tail, slotFree)
-}
-
-// Capacity determines how many records of the given size can be written to the
-// pipe before it fills up.
-func (t *Tx) Capacity(recordSize uint64) uint64 {
- available := uint64(len(t.p.buffer)) - sizeOfSlotHeader
- entryLen := payloadToSlotSize(recordSize)
- return available / entryLen
-}
-
-// Push reserves "payloadSize" bytes for transmission in the pipe. The caller
-// populates the returned slice with the data to be transferred and enventually
-// calls Flush() to make the data visible to the reader, or Abort() to make the
-// pipe forget all Push() calls since the last Flush().
-//
-// The returned slice is available until Flush() or Abort() is next called.
-// After that, it must not be touched.
-func (t *Tx) Push(payloadSize uint64) []byte {
- // Fail request if we know we will never have enough room.
- if payloadSize > t.maxPayloadSize {
- return nil
- }
-
- totalLen := payloadToSlotSize(payloadSize)
- newNext := t.next + totalLen
- nextWrap := (t.next & revolutionMask) | uint64(len(t.p.buffer))
- if int64(newNext-nextWrap) >= 0 {
- // The new buffer would overflow the pipe, so we push a wrapping
- // slot, then try to add the actual slot to the front of the
- // pipe.
- newNext = (newNext & revolutionMask) + jump
- wrappingPayloadSize := slotToPayloadSize(newNext - t.next)
- if !t.reclaim(newNext) {
- return nil
- }
-
- oldNext := t.next
- t.next = newNext
- if oldNext != t.tail {
- t.p.write(oldNext, wrappingPayloadSize)
- } else {
- t.tailHeader = wrappingPayloadSize
- t.Flush()
- }
-
- newNext += totalLen
- }
-
- // Check that we have enough room for the buffer.
- if !t.reclaim(newNext) {
- return nil
- }
-
- if t.next != t.tail {
- t.p.write(t.next, payloadSize)
- } else {
- t.tailHeader = payloadSize
- }
-
- // Grab the buffer before updating t.next.
- b := t.p.data(t.next, payloadSize)
- t.next = newNext
-
- return b
-}
-
-// reclaim attempts to advance the head until at least newNext. If the head is
-// already at or beyond newNext, nothing happens and true is returned; otherwise
-// it tries to reclaim slots that have already been consumed by the receive end
-// of the pipe (they will be marked as free) and returns a boolean indicating
-// whether it was successful in reclaiming enough slots.
-func (t *Tx) reclaim(newNext uint64) bool {
- for int64(newNext-t.head) > 0 {
- // Can't reclaim if slot is not free.
- header := t.p.readAtomic(t.head)
- if header&slotFree == 0 {
- return false
- }
-
- payloadSize := header & slotSizeMask
- newHead := t.head + payloadToSlotSize(payloadSize)
-
- // Check newHead is within bounds and valid.
- if int64(newHead-t.tail) > int64(jump) || newHead&offsetMask >= uint64(len(t.p.buffer)) {
- return false
- }
-
- t.head = newHead
- }
-
- return true
-}
-
-// Abort causes all Push() calls since the last Flush() to be forgotten and
-// therefore they will not be made visible to the receiver.
-func (t *Tx) Abort() {
- t.next = t.tail
-}
-
-// Flush causes all buffers pushed since the last Flush() [or Abort(), whichever
-// is the most recent] to be made visible to the receiver.
-func (t *Tx) Flush() {
- if t.next == t.tail {
- // Nothing to do if there are no pushed buffers.
- return
- }
-
- if t.next != t.head {
- // The receiver will spin in t.next, so we must make sure that
- // the slotFree bit is set.
- t.p.write(t.next, slotFree)
- }
-
- t.p.writeAtomic(t.tail, t.tailHeader)
- t.tail = t.next
-}
-
-// Bytes returns the byte slice on which the pipe operates.
-func (t *Tx) Bytes() []byte {
- return t.p.buffer
-}
diff --git a/pkg/tcpip/link/sharedmem/queue/BUILD b/pkg/tcpip/link/sharedmem/queue/BUILD
deleted file mode 100644
index de1ce043d..000000000
--- a/pkg/tcpip/link/sharedmem/queue/BUILD
+++ /dev/null
@@ -1,29 +0,0 @@
-load("//tools/go_stateify:defs.bzl", "go_library")
-load("@io_bazel_rules_go//go:def.bzl", "go_test")
-
-package(licenses = ["notice"])
-
-go_library(
- name = "queue",
- srcs = [
- "rx.go",
- "tx.go",
- ],
- importpath = "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue",
- visibility = ["//:sandbox"],
- deps = [
- "//pkg/log",
- "//pkg/tcpip/link/sharedmem/pipe",
- ],
-)
-
-go_test(
- name = "queue_test",
- srcs = [
- "queue_test.go",
- ],
- embed = [":queue"],
- deps = [
- "//pkg/tcpip/link/sharedmem/pipe",
- ],
-)
diff --git a/pkg/tcpip/link/sharedmem/queue/queue_test.go b/pkg/tcpip/link/sharedmem/queue/queue_test.go
deleted file mode 100644
index 9a0aad5d7..000000000
--- a/pkg/tcpip/link/sharedmem/queue/queue_test.go
+++ /dev/null
@@ -1,517 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package queue
-
-import (
- "encoding/binary"
- "reflect"
- "testing"
-
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
-)
-
-func TestBasicTxQueue(t *testing.T) {
- // Tests that a basic transmit on a queue works, and that completion
- // gets properly reported as well.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Tx
- q.Init(pb1, pb2)
-
- // Enqueue two buffers.
- b := []TxBuffer{
- {nil, 100, 60},
- {nil, 200, 40},
- }
-
- b[0].Next = &b[1]
-
- const usedID = 1002
- const usedTotalSize = 100
- if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) {
- t.Fatalf("Enqueue failed on empty queue")
- }
-
- // Check the contents of the pipe.
- d := rxp.Pull()
- if d == nil {
- t.Fatalf("Tx pipe is empty after Enqueue")
- }
-
- want := []byte{
- 234, 3, 0, 0, 0, 0, 0, 0, // id
- 100, 0, 0, 0, // total size
- 0, 0, 0, 0, // reserved
- 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
- 60, 0, 0, 0, // size 1
- 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
- 40, 0, 0, 0, // size 2
- }
-
- if !reflect.DeepEqual(want, d) {
- t.Fatalf("Bad posted packet: got %v, want %v", d, want)
- }
-
- rxp.Flush()
-
- // Check that there are no completions yet.
- if _, ok := q.CompletedPacket(); ok {
- t.Fatalf("Packet reported as completed too soon")
- }
-
- // Post a completion.
- d = txp.Push(8)
- if d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
- binary.LittleEndian.PutUint64(d, usedID)
- txp.Flush()
-
- // Check that completion is properly reported.
- id, ok := q.CompletedPacket()
- if !ok {
- t.Fatalf("Completion not reported")
- }
-
- if id != usedID {
- t.Fatalf("Bad completion id: got %v, want %v", id, usedID)
- }
-}
-
-func TestBasicRxQueue(t *testing.T) {
- // Tests that a basic receive on a queue works.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Rx
- q.Init(pb1, pb2, nil)
-
- // Post two buffers.
- b := []RxBuffer{
- {100, 60, 1077, 0},
- {200, 40, 2123, 0},
- }
-
- if !q.PostBuffers(b) {
- t.Fatalf("PostBuffers failed on empty queue")
- }
-
- // Check the contents of the pipe.
- want := [][]byte{
- {
- 100, 0, 0, 0, 0, 0, 0, 0, // Offset1
- 60, 0, 0, 0, // Size1
- 0, 0, 0, 0, // Remaining in group 1
- 0, 0, 0, 0, 0, 0, 0, 0, // User data 1
- 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
- },
- {
- 200, 0, 0, 0, 0, 0, 0, 0, // Offset2
- 40, 0, 0, 0, // Size2
- 0, 0, 0, 0, // Remaining in group 2
- 0, 0, 0, 0, 0, 0, 0, 0, // User data 2
- 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
- },
- }
-
- for i := range b {
- d := rxp.Pull()
- if d == nil {
- t.Fatalf("Tx pipe is empty after PostBuffers")
- }
-
- if !reflect.DeepEqual(want[i], d) {
- t.Fatalf("Bad posted packet: got %v, want %v", d, want[i])
- }
-
- rxp.Flush()
- }
-
- // Check that there are no completions.
- if _, n := q.Dequeue(nil); n != 0 {
- t.Fatalf("Packet reported as received too soon")
- }
-
- // Post a completion.
- d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
- if d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
-
- copy(d, []byte{
- 100, 0, 0, 0, // packet size
- 0, 0, 0, 0, // reserved
-
- 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
- 60, 0, 0, 0, // size 1
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
- 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
-
- 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
- 40, 0, 0, 0, // size 2
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
- 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
- })
-
- txp.Flush()
-
- // Check that completion is properly reported.
- bufs, n := q.Dequeue(nil)
- if n != 100 {
- t.Fatalf("Bad packet size: got %v, want %v", n, 100)
- }
-
- if !reflect.DeepEqual(bufs, b) {
- t.Fatalf("Bad returned buffers: got %v, want %v", bufs, b)
- }
-}
-
-func TestBadTxCompletion(t *testing.T) {
- // Check that tx completions with bad sizes are properly ignored.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Tx
- q.Init(pb1, pb2)
-
- // Post a completion that is too short, and check that it is ignored.
- if d := txp.Push(7); d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
- txp.Flush()
-
- if _, ok := q.CompletedPacket(); ok {
- t.Fatalf("Bad completion not ignored")
- }
-
- // Post a completion that is too long, and check that it is ignored.
- if d := txp.Push(10); d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
- txp.Flush()
-
- if _, ok := q.CompletedPacket(); ok {
- t.Fatalf("Bad completion not ignored")
- }
-}
-
-func TestBadRxCompletion(t *testing.T) {
- // Check that bad rx completions are properly ignored.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Rx
- q.Init(pb1, pb2, nil)
-
- // Post a completion that is too short, and check that it is ignored.
- if d := txp.Push(7); d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
- txp.Flush()
-
- if b, _ := q.Dequeue(nil); b != nil {
- t.Fatalf("Bad completion not ignored")
- }
-
- // Post a completion whose buffer sizes add up to less than the total
- // size.
- d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
- if d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
-
- copy(d, []byte{
- 100, 0, 0, 0, // packet size
- 0, 0, 0, 0, // reserved
-
- 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
- 10, 0, 0, 0, // size 1
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
- 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
-
- 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
- 10, 0, 0, 0, // size 2
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
- 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
- })
-
- txp.Flush()
- if b, _ := q.Dequeue(nil); b != nil {
- t.Fatalf("Bad completion not ignored")
- }
-
- // Post a completion whose buffer sizes will cause a 32-bit overflow,
- // but adds up to the right number.
- d = txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
- if d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
-
- copy(d, []byte{
- 100, 0, 0, 0, // packet size
- 0, 0, 0, 0, // reserved
-
- 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
- 255, 255, 255, 255, // size 1
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
- 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
-
- 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
- 101, 0, 0, 0, // size 2
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
- 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
- })
-
- txp.Flush()
- if b, _ := q.Dequeue(nil); b != nil {
- t.Fatalf("Bad completion not ignored")
- }
-}
-
-func TestFillTxPipe(t *testing.T) {
- // Check that transmitting a new buffer when the buffer pipe is full
- // fails gracefully.
- pb1 := make([]byte, 104)
- pb2 := make([]byte, 104)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Tx
- q.Init(pb1, pb2)
-
- // Transmit twice, which should fill the tx pipe.
- b := []TxBuffer{
- {nil, 100, 60},
- {nil, 200, 40},
- }
-
- b[0].Next = &b[1]
-
- const usedID = 1002
- const usedTotalSize = 100
- for i := uint64(0); i < 2; i++ {
- if !q.Enqueue(usedID+i, usedTotalSize, 2, &b[0]) {
- t.Fatalf("Failed to transmit buffer")
- }
- }
-
- // Transmit another packet now that the tx pipe is full.
- if q.Enqueue(usedID+2, usedTotalSize, 2, &b[0]) {
- t.Fatalf("Enqueue succeeded when tx pipe is full")
- }
-}
-
-func TestFillRxPipe(t *testing.T) {
- // Check that posting a new buffer when the buffer pipe is full fails
- // gracefully.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Rx
- q.Init(pb1, pb2, nil)
-
- // Post a buffer twice, it should fill the tx pipe.
- b := []RxBuffer{
- {100, 60, 1077, 0},
- }
-
- for i := 0; i < 2; i++ {
- if !q.PostBuffers(b) {
- t.Fatalf("PostBuffers failed on non-full queue")
- }
- }
-
- // Post another buffer now that the tx pipe is full.
- if q.PostBuffers(b) {
- t.Fatalf("PostBuffers succeeded on full queue")
- }
-}
-
-func TestLotsOfTransmissions(t *testing.T) {
- // Make sure pipes are being properly flushed when transmitting packets.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Tx
- q.Init(pb1, pb2)
-
- // Prepare packet with two buffers.
- b := []TxBuffer{
- {nil, 100, 60},
- {nil, 200, 40},
- }
-
- b[0].Next = &b[1]
-
- const usedID = 1002
- const usedTotalSize = 100
-
- // Post 100000 packets and completions.
- for i := 100000; i > 0; i-- {
- if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) {
- t.Fatalf("Enqueue failed on non-full queue")
- }
-
- if d := rxp.Pull(); d == nil {
- t.Fatalf("Tx pipe is empty after Enqueue")
- }
- rxp.Flush()
-
- d := txp.Push(8)
- if d == nil {
- t.Fatalf("Unable to write to rx pipe")
- }
- binary.LittleEndian.PutUint64(d, usedID)
- txp.Flush()
- if _, ok := q.CompletedPacket(); !ok {
- t.Fatalf("Completion not returned")
- }
- }
-}
-
-func TestLotsOfReceptions(t *testing.T) {
- // Make sure pipes are being properly flushed when receiving packets.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Rx
- q.Init(pb1, pb2, nil)
-
- // Prepare for posting two buffers.
- b := []RxBuffer{
- {100, 60, 1077, 0},
- {200, 40, 2123, 0},
- }
-
- // Post 100000 buffers and completions.
- for i := 100000; i > 0; i-- {
- if !q.PostBuffers(b) {
- t.Fatalf("PostBuffers failed on non-full queue")
- }
-
- if d := rxp.Pull(); d == nil {
- t.Fatalf("Tx pipe is empty after PostBuffers")
- }
- rxp.Flush()
-
- if d := rxp.Pull(); d == nil {
- t.Fatalf("Tx pipe is empty after PostBuffers")
- }
- rxp.Flush()
-
- d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
- if d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
-
- copy(d, []byte{
- 100, 0, 0, 0, // packet size
- 0, 0, 0, 0, // reserved
-
- 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
- 60, 0, 0, 0, // size 1
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
- 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
-
- 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
- 40, 0, 0, 0, // size 2
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
- 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
- })
-
- txp.Flush()
-
- if _, n := q.Dequeue(nil); n == 0 {
- t.Fatalf("Dequeue failed when there is a completion")
- }
- }
-}
-
-func TestRxEnableNotification(t *testing.T) {
- // Check that enabling nofifications results in properly updated state.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var state uint32
- var q Rx
- q.Init(pb1, pb2, &state)
-
- q.EnableNotification()
- if state != eventFDEnabled {
- t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDEnabled)
- }
-}
-
-func TestRxDisableNotification(t *testing.T) {
- // Check that disabling nofifications results in properly updated state.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var state uint32
- var q Rx
- q.Init(pb1, pb2, &state)
-
- q.DisableNotification()
- if state != eventFDDisabled {
- t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDDisabled)
- }
-}
diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go
deleted file mode 100644
index 696e6c9e5..000000000
--- a/pkg/tcpip/link/sharedmem/queue/rx.go
+++ /dev/null
@@ -1,221 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package queue provides the implementation of transmit and receive queues
-// based on shared memory ring buffers.
-package queue
-
-import (
- "encoding/binary"
- "sync/atomic"
-
- "gvisor.dev/gvisor/pkg/log"
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
-)
-
-const (
- // Offsets within a posted buffer.
- postedOffset = 0
- postedSize = 8
- postedRemainingInGroup = 12
- postedUserData = 16
- postedID = 24
-
- sizeOfPostedBuffer = 32
-
- // Offsets within a received packet header.
- consumedPacketSize = 0
- consumedPacketReserved = 4
-
- sizeOfConsumedPacketHeader = 8
-
- // Offsets within a consumed buffer.
- consumedOffset = 0
- consumedSize = 8
- consumedUserData = 12
- consumedID = 20
-
- sizeOfConsumedBuffer = 28
-
- // The following are the allowed states of the shared data area.
- eventFDUninitialized = 0
- eventFDDisabled = 1
- eventFDEnabled = 2
-)
-
-// RxBuffer is the descriptor of a receive buffer.
-type RxBuffer struct {
- Offset uint64
- Size uint32
- ID uint64
- UserData uint64
-}
-
-// Rx is a receive queue. It is implemented with one tx and one rx pipe: the tx
-// pipe is used to "post" buffers, while the rx pipe is used to receive packets
-// whose contents have been written to previously posted buffers.
-//
-// This struct is thread-compatible.
-type Rx struct {
- tx pipe.Tx
- rx pipe.Rx
- sharedEventFDState *uint32
-}
-
-// Init initializes the receive queue with the given pipes, and shared state
-// pointer -- the latter is used to enable/disable eventfd notifications.
-func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) {
- r.sharedEventFDState = sharedEventFDState
- r.tx.Init(tx)
- r.rx.Init(rx)
-}
-
-// EnableNotification updates the shared state such that the peer will notify
-// the eventfd when there are packets to be dequeued.
-func (r *Rx) EnableNotification() {
- atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled)
-}
-
-// DisableNotification updates the shared state such that the peer will not
-// notify the eventfd.
-func (r *Rx) DisableNotification() {
- atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled)
-}
-
-// PostedBuffersLimit returns the maximum number of buffers that can be posted
-// before the tx queue fills up.
-func (r *Rx) PostedBuffersLimit() uint64 {
- return r.tx.Capacity(sizeOfPostedBuffer)
-}
-
-// PostBuffers makes the given buffers available for receiving data from the
-// peer. Once they are posted, the peer is free to write to them and will
-// eventually post them back for consumption.
-func (r *Rx) PostBuffers(buffers []RxBuffer) bool {
- for i := range buffers {
- b := r.tx.Push(sizeOfPostedBuffer)
- if b == nil {
- r.tx.Abort()
- return false
- }
-
- pb := &buffers[i]
- binary.LittleEndian.PutUint64(b[postedOffset:], pb.Offset)
- binary.LittleEndian.PutUint32(b[postedSize:], pb.Size)
- binary.LittleEndian.PutUint32(b[postedRemainingInGroup:], 0)
- binary.LittleEndian.PutUint64(b[postedUserData:], pb.UserData)
- binary.LittleEndian.PutUint64(b[postedID:], pb.ID)
- }
-
- r.tx.Flush()
-
- return true
-}
-
-// Dequeue receives buffers that have been previously posted by PostBuffers()
-// and that have been filled by the peer and posted back.
-//
-// This is similar to append() in that new buffers are appended to "bufs", with
-// reallocation only if "bufs" doesn't have enough capacity.
-func (r *Rx) Dequeue(bufs []RxBuffer) ([]RxBuffer, uint32) {
- for {
- outBufs := bufs
-
- // Pull the next descriptor from the rx pipe.
- b := r.rx.Pull()
- if b == nil {
- return bufs, 0
- }
-
- if len(b) < sizeOfConsumedPacketHeader {
- log.Warningf("Ignoring packet header: size (%v) is less than header size (%v)", len(b), sizeOfConsumedPacketHeader)
- r.rx.Flush()
- continue
- }
-
- totalDataSize := binary.LittleEndian.Uint32(b[consumedPacketSize:])
-
- // Calculate the number of buffer descriptors and copy them
- // over to the output.
- count := (len(b) - sizeOfConsumedPacketHeader) / sizeOfConsumedBuffer
- offset := sizeOfConsumedPacketHeader
- buffersSize := uint32(0)
- for i := count; i > 0; i-- {
- s := binary.LittleEndian.Uint32(b[offset+consumedSize:])
- buffersSize += s
- if buffersSize < s {
- // The buffer size overflows an unsigned 32-bit
- // integer, so break out and force it to be
- // ignored.
- totalDataSize = 1
- buffersSize = 0
- break
- }
-
- outBufs = append(outBufs, RxBuffer{
- Offset: binary.LittleEndian.Uint64(b[offset+consumedOffset:]),
- Size: s,
- ID: binary.LittleEndian.Uint64(b[offset+consumedID:]),
- })
-
- offset += sizeOfConsumedBuffer
- }
-
- r.rx.Flush()
-
- if buffersSize < totalDataSize {
- // The descriptor is corrupted, ignore it.
- log.Warningf("Ignoring packet: actual data size (%v) less than expected size (%v)", buffersSize, totalDataSize)
- continue
- }
-
- return outBufs, totalDataSize
- }
-}
-
-// Bytes returns the byte slices on which the queue operates.
-func (r *Rx) Bytes() (tx, rx []byte) {
- return r.tx.Bytes(), r.rx.Bytes()
-}
-
-// DecodeRxBufferHeader decodes the header of a buffer posted on an rx queue.
-func DecodeRxBufferHeader(b []byte) RxBuffer {
- return RxBuffer{
- Offset: binary.LittleEndian.Uint64(b[postedOffset:]),
- Size: binary.LittleEndian.Uint32(b[postedSize:]),
- ID: binary.LittleEndian.Uint64(b[postedID:]),
- UserData: binary.LittleEndian.Uint64(b[postedUserData:]),
- }
-}
-
-// RxCompletionSize returns the number of bytes needed to encode an rx
-// completion containing "count" buffers.
-func RxCompletionSize(count int) uint64 {
- return sizeOfConsumedPacketHeader + uint64(count)*sizeOfConsumedBuffer
-}
-
-// EncodeRxCompletion encodes an rx completion header.
-func EncodeRxCompletion(b []byte, size, reserved uint32) {
- binary.LittleEndian.PutUint32(b[consumedPacketSize:], size)
- binary.LittleEndian.PutUint32(b[consumedPacketReserved:], reserved)
-}
-
-// EncodeRxCompletionBuffer encodes the i-th rx completion buffer header.
-func EncodeRxCompletionBuffer(b []byte, i int, rxb RxBuffer) {
- b = b[RxCompletionSize(i):]
- binary.LittleEndian.PutUint64(b[consumedOffset:], rxb.Offset)
- binary.LittleEndian.PutUint32(b[consumedSize:], rxb.Size)
- binary.LittleEndian.PutUint64(b[consumedUserData:], rxb.UserData)
- binary.LittleEndian.PutUint64(b[consumedID:], rxb.ID)
-}
diff --git a/pkg/tcpip/link/sharedmem/queue/tx.go b/pkg/tcpip/link/sharedmem/queue/tx.go
deleted file mode 100644
index beffe807b..000000000
--- a/pkg/tcpip/link/sharedmem/queue/tx.go
+++ /dev/null
@@ -1,151 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package queue
-
-import (
- "encoding/binary"
-
- "gvisor.dev/gvisor/pkg/log"
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
-)
-
-const (
- // Offsets within a packet header.
- packetID = 0
- packetSize = 8
- packetReserved = 12
-
- sizeOfPacketHeader = 16
-
- // Offsets with a buffer descriptor
- bufferOffset = 0
- bufferSize = 8
-
- sizeOfBufferDescriptor = 12
-)
-
-// TxBuffer is the descriptor of a transmit buffer.
-type TxBuffer struct {
- Next *TxBuffer
- Offset uint64
- Size uint32
-}
-
-// Tx is a transmit queue. It is implemented with one tx and one rx pipe: the
-// tx pipe is used to request the transmission of packets, while the rx pipe
-// is used to receive which transmissions have completed.
-//
-// This struct is thread-compatible.
-type Tx struct {
- tx pipe.Tx
- rx pipe.Rx
-}
-
-// Init initializes the transmit queue with the given pipes.
-func (t *Tx) Init(tx, rx []byte) {
- t.tx.Init(tx)
- t.rx.Init(rx)
-}
-
-// Enqueue queues the given linked list of buffers for transmission as one
-// packet. While it is queued, the caller must not modify them.
-func (t *Tx) Enqueue(id uint64, totalDataLen, bufferCount uint32, buffer *TxBuffer) bool {
- // Reserve room in the tx pipe.
- totalLen := sizeOfPacketHeader + uint64(bufferCount)*sizeOfBufferDescriptor
-
- b := t.tx.Push(totalLen)
- if b == nil {
- return false
- }
-
- // Initialize the packet and buffer descriptors.
- binary.LittleEndian.PutUint64(b[packetID:], id)
- binary.LittleEndian.PutUint32(b[packetSize:], totalDataLen)
- binary.LittleEndian.PutUint32(b[packetReserved:], 0)
-
- offset := sizeOfPacketHeader
- for i := bufferCount; i != 0; i-- {
- binary.LittleEndian.PutUint64(b[offset+bufferOffset:], buffer.Offset)
- binary.LittleEndian.PutUint32(b[offset+bufferSize:], buffer.Size)
- offset += sizeOfBufferDescriptor
- buffer = buffer.Next
- }
-
- t.tx.Flush()
-
- return true
-}
-
-// CompletedPacket returns the id of the last completed transmission. The
-// returned id, if any, refers to a value passed on a previous call to
-// Enqueue().
-func (t *Tx) CompletedPacket() (id uint64, ok bool) {
- for {
- b := t.rx.Pull()
- if b == nil {
- return 0, false
- }
-
- if len(b) != 8 {
- t.rx.Flush()
- log.Warningf("Ignoring completed packet: size (%v) is less than expected (%v)", len(b), 8)
- continue
- }
-
- v := binary.LittleEndian.Uint64(b)
-
- t.rx.Flush()
-
- return v, true
- }
-}
-
-// Bytes returns the byte slices on which the queue operates.
-func (t *Tx) Bytes() (tx, rx []byte) {
- return t.tx.Bytes(), t.rx.Bytes()
-}
-
-// TxPacketInfo holds information about a packet sent on a tx queue.
-type TxPacketInfo struct {
- ID uint64
- Size uint32
- Reserved uint32
- BufferCount int
-}
-
-// DecodeTxPacketHeader decodes the header of a packet sent over a tx queue.
-func DecodeTxPacketHeader(b []byte) TxPacketInfo {
- return TxPacketInfo{
- ID: binary.LittleEndian.Uint64(b[packetID:]),
- Size: binary.LittleEndian.Uint32(b[packetSize:]),
- Reserved: binary.LittleEndian.Uint32(b[packetReserved:]),
- BufferCount: (len(b) - sizeOfPacketHeader) / sizeOfBufferDescriptor,
- }
-}
-
-// DecodeTxBufferHeader decodes the header of the i-th buffer of a packet sent
-// over a tx queue.
-func DecodeTxBufferHeader(b []byte, i int) TxBuffer {
- b = b[sizeOfPacketHeader+i*sizeOfBufferDescriptor:]
- return TxBuffer{
- Offset: binary.LittleEndian.Uint64(b[bufferOffset:]),
- Size: binary.LittleEndian.Uint32(b[bufferSize:]),
- }
-}
-
-// EncodeTxCompletion encodes a tx completion header.
-func EncodeTxCompletion(b []byte, id uint64) {
- binary.LittleEndian.PutUint64(b, id)
-}
diff --git a/pkg/tcpip/link/sharedmem/rx.go b/pkg/tcpip/link/sharedmem/rx.go
deleted file mode 100644
index eec11e4cb..000000000
--- a/pkg/tcpip/link/sharedmem/rx.go
+++ /dev/null
@@ -1,159 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// +build linux
-
-package sharedmem
-
-import (
- "sync/atomic"
- "syscall"
-
- "gvisor.dev/gvisor/pkg/tcpip/link/rawfile"
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
-)
-
-// rx holds all state associated with an rx queue.
-type rx struct {
- data []byte
- sharedData []byte
- q queue.Rx
- eventFD int
-}
-
-// init initializes all state needed by the rx queue based on the information
-// provided.
-//
-// The caller always retains ownership of all file descriptors passed in. The
-// queue implementation will duplicate any that it may need in the future.
-func (r *rx) init(mtu uint32, c *QueueConfig) error {
- // Map in all buffers.
- txPipe, err := getBuffer(c.TxPipeFD)
- if err != nil {
- return err
- }
-
- rxPipe, err := getBuffer(c.RxPipeFD)
- if err != nil {
- syscall.Munmap(txPipe)
- return err
- }
-
- data, err := getBuffer(c.DataFD)
- if err != nil {
- syscall.Munmap(txPipe)
- syscall.Munmap(rxPipe)
- return err
- }
-
- sharedData, err := getBuffer(c.SharedDataFD)
- if err != nil {
- syscall.Munmap(txPipe)
- syscall.Munmap(rxPipe)
- syscall.Munmap(data)
- return err
- }
-
- // Duplicate the eventFD so that caller can close it but we can still
- // use it.
- efd, err := syscall.Dup(c.EventFD)
- if err != nil {
- syscall.Munmap(txPipe)
- syscall.Munmap(rxPipe)
- syscall.Munmap(data)
- syscall.Munmap(sharedData)
- return err
- }
-
- // Set the eventfd as non-blocking.
- if err := syscall.SetNonblock(efd, true); err != nil {
- syscall.Munmap(txPipe)
- syscall.Munmap(rxPipe)
- syscall.Munmap(data)
- syscall.Munmap(sharedData)
- syscall.Close(efd)
- return err
- }
-
- // Initialize state based on buffers.
- r.q.Init(txPipe, rxPipe, sharedDataPointer(sharedData))
- r.data = data
- r.eventFD = efd
- r.sharedData = sharedData
-
- return nil
-}
-
-// cleanup releases all resources allocated during init(). It must only be
-// called if init() has previously succeeded.
-func (r *rx) cleanup() {
- a, b := r.q.Bytes()
- syscall.Munmap(a)
- syscall.Munmap(b)
-
- syscall.Munmap(r.data)
- syscall.Munmap(r.sharedData)
- syscall.Close(r.eventFD)
-}
-
-// postAndReceive posts the provided buffers (if any), and then tries to read
-// from the receive queue.
-//
-// Capacity permitting, it reuses the posted buffer slice to store the buffers
-// that were read as well.
-//
-// This function will block if there aren't any available packets.
-func (r *rx) postAndReceive(b []queue.RxBuffer, stopRequested *uint32) ([]queue.RxBuffer, uint32) {
- // Post the buffers first. If we cannot post, sleep until we can. We
- // never post more than will fit concurrently, so it's safe to wait
- // until enough room is available.
- if len(b) != 0 && !r.q.PostBuffers(b) {
- r.q.EnableNotification()
- for !r.q.PostBuffers(b) {
- var tmp [8]byte
- rawfile.BlockingRead(r.eventFD, tmp[:])
- if atomic.LoadUint32(stopRequested) != 0 {
- r.q.DisableNotification()
- return nil, 0
- }
- }
- r.q.DisableNotification()
- }
-
- // Read the next set of descriptors.
- b, n := r.q.Dequeue(b[:0])
- if len(b) != 0 {
- return b, n
- }
-
- // Data isn't immediately available. Enable eventfd notifications.
- r.q.EnableNotification()
- for {
- b, n = r.q.Dequeue(b)
- if len(b) != 0 {
- break
- }
-
- // Wait for notification.
- var tmp [8]byte
- rawfile.BlockingRead(r.eventFD, tmp[:])
- if atomic.LoadUint32(stopRequested) != 0 {
- r.q.DisableNotification()
- return nil, 0
- }
- }
- r.q.DisableNotification()
-
- return b, n
-}
diff --git a/pkg/tcpip/link/sharedmem/sharedmem.go b/pkg/tcpip/link/sharedmem/sharedmem.go
deleted file mode 100644
index 2bace5298..000000000
--- a/pkg/tcpip/link/sharedmem/sharedmem.go
+++ /dev/null
@@ -1,288 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// +build linux
-
-// Package sharedmem provides the implemention of data-link layer endpoints
-// backed by shared memory.
-//
-// Shared memory endpoints can be used in the networking stack by calling New()
-// to create a new endpoint, and then passing it as an argument to
-// Stack.CreateNIC().
-package sharedmem
-
-import (
- "sync"
- "sync/atomic"
- "syscall"
-
- "gvisor.dev/gvisor/pkg/log"
- "gvisor.dev/gvisor/pkg/tcpip"
- "gvisor.dev/gvisor/pkg/tcpip/buffer"
- "gvisor.dev/gvisor/pkg/tcpip/header"
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
- "gvisor.dev/gvisor/pkg/tcpip/stack"
-)
-
-// QueueConfig holds all the file descriptors needed to describe a tx or rx
-// queue over shared memory. It is used when creating new shared memory
-// endpoints to describe tx and rx queues.
-type QueueConfig struct {
- // DataFD is a file descriptor for the file that contains the data to
- // be transmitted via this queue. Descriptors contain offsets within
- // this file.
- DataFD int
-
- // EventFD is a file descriptor for the event that is signaled when
- // data is becomes available in this queue.
- EventFD int
-
- // TxPipeFD is a file descriptor for the tx pipe associated with the
- // queue.
- TxPipeFD int
-
- // RxPipeFD is a file descriptor for the rx pipe associated with the
- // queue.
- RxPipeFD int
-
- // SharedDataFD is a file descriptor for the file that contains shared
- // state between the two ends of the queue. This data specifies, for
- // example, whether EventFD signaling is enabled or disabled.
- SharedDataFD int
-}
-
-type endpoint struct {
- // mtu (maximum transmission unit) is the maximum size of a packet.
- mtu uint32
-
- // bufferSize is the size of each individual buffer.
- bufferSize uint32
-
- // addr is the local address of this endpoint.
- addr tcpip.LinkAddress
-
- // rx is the receive queue.
- rx rx
-
- // stopRequested is to be accessed atomically only, and determines if
- // the worker goroutines should stop.
- stopRequested uint32
-
- // Wait group used to indicate that all workers have stopped.
- completed sync.WaitGroup
-
- // mu protects the following fields.
- mu sync.Mutex
-
- // tx is the transmit queue.
- tx tx
-
- // workerStarted specifies whether the worker goroutine was started.
- workerStarted bool
-}
-
-// New creates a new shared-memory-based endpoint. Buffers will be broken up
-// into buffers of "bufferSize" bytes.
-func New(mtu, bufferSize uint32, addr tcpip.LinkAddress, tx, rx QueueConfig) (stack.LinkEndpoint, error) {
- e := &endpoint{
- mtu: mtu,
- bufferSize: bufferSize,
- addr: addr,
- }
-
- if err := e.tx.init(bufferSize, &tx); err != nil {
- return nil, err
- }
-
- if err := e.rx.init(bufferSize, &rx); err != nil {
- e.tx.cleanup()
- return nil, err
- }
-
- return e, nil
-}
-
-// Close frees all resources associated with the endpoint.
-func (e *endpoint) Close() {
- // Tell dispatch goroutine to stop, then write to the eventfd so that
- // it wakes up in case it's sleeping.
- atomic.StoreUint32(&e.stopRequested, 1)
- syscall.Write(e.rx.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Cleanup the queues inline if the worker hasn't started yet; we also
- // know it won't start from now on because stopRequested is set to 1.
- e.mu.Lock()
- workerPresent := e.workerStarted
- e.mu.Unlock()
-
- if !workerPresent {
- e.tx.cleanup()
- e.rx.cleanup()
- }
-}
-
-// Wait implements stack.LinkEndpoint.Wait. It waits until all workers have
-// stopped after a Close() call.
-func (e *endpoint) Wait() {
- e.completed.Wait()
-}
-
-// Attach implements stack.LinkEndpoint.Attach. It launches the goroutine that
-// reads packets from the rx queue.
-func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
- e.mu.Lock()
- if !e.workerStarted && atomic.LoadUint32(&e.stopRequested) == 0 {
- e.workerStarted = true
- e.completed.Add(1)
- // Link endpoints are not savable. When transportation endpoints
- // are saved, they stop sending outgoing packets and all
- // incoming packets are rejected.
- go e.dispatchLoop(dispatcher) // S/R-SAFE: see above.
- }
- e.mu.Unlock()
-}
-
-// IsAttached implements stack.LinkEndpoint.IsAttached.
-func (e *endpoint) IsAttached() bool {
- e.mu.Lock()
- defer e.mu.Unlock()
- return e.workerStarted
-}
-
-// MTU implements stack.LinkEndpoint.MTU. It returns the value initialized
-// during construction.
-func (e *endpoint) MTU() uint32 {
- return e.mtu - header.EthernetMinimumSize
-}
-
-// Capabilities implements stack.LinkEndpoint.Capabilities.
-func (*endpoint) Capabilities() stack.LinkEndpointCapabilities {
- return 0
-}
-
-// MaxHeaderLength implements stack.LinkEndpoint.MaxHeaderLength. It returns the
-// ethernet frame header size.
-func (*endpoint) MaxHeaderLength() uint16 {
- return header.EthernetMinimumSize
-}
-
-// LinkAddress implements stack.LinkEndpoint.LinkAddress. It returns the local
-// link address.
-func (e *endpoint) LinkAddress() tcpip.LinkAddress {
- return e.addr
-}
-
-// WritePacket writes outbound packets to the file descriptor. If it is not
-// currently writable, the packet is dropped.
-func (e *endpoint) WritePacket(r *stack.Route, _ *stack.GSO, hdr buffer.Prependable, payload buffer.VectorisedView, protocol tcpip.NetworkProtocolNumber) *tcpip.Error {
- // Add the ethernet header here.
- eth := header.Ethernet(hdr.Prepend(header.EthernetMinimumSize))
- ethHdr := &header.EthernetFields{
- DstAddr: r.RemoteLinkAddress,
- Type: protocol,
- }
- if r.LocalLinkAddress != "" {
- ethHdr.SrcAddr = r.LocalLinkAddress
- } else {
- ethHdr.SrcAddr = e.addr
- }
- eth.Encode(ethHdr)
-
- v := payload.ToView()
- // Transmit the packet.
- e.mu.Lock()
- ok := e.tx.transmit(hdr.View(), v)
- e.mu.Unlock()
-
- if !ok {
- return tcpip.ErrWouldBlock
- }
-
- return nil
-}
-
-// WritePackets implements stack.LinkEndpoint.WritePackets.
-func (e *endpoint) WritePackets(r *stack.Route, _ *stack.GSO, hdrs []stack.PacketDescriptor, payload buffer.VectorisedView, protocol tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
- panic("not implemented")
-}
-
-// WriteRawPacket implements stack.LinkEndpoint.WriteRawPacket.
-func (e *endpoint) WriteRawPacket(packet buffer.VectorisedView) *tcpip.Error {
- v := packet.ToView()
- // Transmit the packet.
- e.mu.Lock()
- ok := e.tx.transmit(v, buffer.View{})
- e.mu.Unlock()
-
- if !ok {
- return tcpip.ErrWouldBlock
- }
-
- return nil
-}
-
-// dispatchLoop reads packets from the rx queue in a loop and dispatches them
-// to the network stack.
-func (e *endpoint) dispatchLoop(d stack.NetworkDispatcher) {
- // Post initial set of buffers.
- limit := e.rx.q.PostedBuffersLimit()
- if l := uint64(len(e.rx.data)) / uint64(e.bufferSize); limit > l {
- limit = l
- }
- for i := uint64(0); i < limit; i++ {
- b := queue.RxBuffer{
- Offset: i * uint64(e.bufferSize),
- Size: e.bufferSize,
- ID: i,
- }
- if !e.rx.q.PostBuffers([]queue.RxBuffer{b}) {
- log.Warningf("Unable to post %v-th buffer", i)
- }
- }
-
- // Read in a loop until a stop is requested.
- var rxb []queue.RxBuffer
- for atomic.LoadUint32(&e.stopRequested) == 0 {
- var n uint32
- rxb, n = e.rx.postAndReceive(rxb, &e.stopRequested)
-
- // Copy data from the shared area to its own buffer, then
- // prepare to repost the buffer.
- b := make([]byte, n)
- offset := uint32(0)
- for i := range rxb {
- copy(b[offset:], e.rx.data[rxb[i].Offset:][:rxb[i].Size])
- offset += rxb[i].Size
-
- rxb[i].Size = e.bufferSize
- }
-
- if n < header.EthernetMinimumSize {
- continue
- }
-
- // Send packet up the stack.
- eth := header.Ethernet(b[:header.EthernetMinimumSize])
- d.DeliverNetworkPacket(e, eth.SourceAddress(), eth.DestinationAddress(), eth.Type(), tcpip.PacketBuffer{
- Data: buffer.View(b[header.EthernetMinimumSize:]).ToVectorisedView(),
- LinkHeader: buffer.View(eth),
- })
- }
-
- // Clean state.
- e.tx.cleanup()
- e.rx.cleanup()
-
- e.completed.Done()
-}
diff --git a/pkg/tcpip/link/sharedmem/sharedmem_test.go b/pkg/tcpip/link/sharedmem/sharedmem_test.go
deleted file mode 100644
index 199406886..000000000
--- a/pkg/tcpip/link/sharedmem/sharedmem_test.go
+++ /dev/null
@@ -1,777 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// +build linux
-
-package sharedmem
-
-import (
- "bytes"
- "io/ioutil"
- "math/rand"
- "os"
- "strings"
- "sync"
- "syscall"
- "testing"
- "time"
-
- "gvisor.dev/gvisor/pkg/tcpip"
- "gvisor.dev/gvisor/pkg/tcpip/buffer"
- "gvisor.dev/gvisor/pkg/tcpip/header"
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
- "gvisor.dev/gvisor/pkg/tcpip/stack"
-)
-
-const (
- localLinkAddr = "\xde\xad\xbe\xef\x56\x78"
- remoteLinkAddr = "\xde\xad\xbe\xef\x12\x34"
-
- queueDataSize = 1024 * 1024
- queuePipeSize = 4096
-)
-
-type queueBuffers struct {
- data []byte
- rx pipe.Tx
- tx pipe.Rx
-}
-
-func initQueue(t *testing.T, q *queueBuffers, c *QueueConfig) {
- // Prepare tx pipe.
- b, err := getBuffer(c.TxPipeFD)
- if err != nil {
- t.Fatalf("getBuffer failed: %v", err)
- }
- q.tx.Init(b)
-
- // Prepare rx pipe.
- b, err = getBuffer(c.RxPipeFD)
- if err != nil {
- t.Fatalf("getBuffer failed: %v", err)
- }
- q.rx.Init(b)
-
- // Get data slice.
- q.data, err = getBuffer(c.DataFD)
- if err != nil {
- t.Fatalf("getBuffer failed: %v", err)
- }
-}
-
-func (q *queueBuffers) cleanup() {
- syscall.Munmap(q.tx.Bytes())
- syscall.Munmap(q.rx.Bytes())
- syscall.Munmap(q.data)
-}
-
-type packetInfo struct {
- addr tcpip.LinkAddress
- proto tcpip.NetworkProtocolNumber
- vv buffer.VectorisedView
- linkHeader buffer.View
-}
-
-type testContext struct {
- t *testing.T
- ep *endpoint
- txCfg QueueConfig
- rxCfg QueueConfig
- txq queueBuffers
- rxq queueBuffers
-
- packetCh chan struct{}
- mu sync.Mutex
- packets []packetInfo
-}
-
-func newTestContext(t *testing.T, mtu, bufferSize uint32, addr tcpip.LinkAddress) *testContext {
- var err error
- c := &testContext{
- t: t,
- packetCh: make(chan struct{}, 1000000),
- }
- c.txCfg = createQueueFDs(t, queueSizes{
- dataSize: queueDataSize,
- txPipeSize: queuePipeSize,
- rxPipeSize: queuePipeSize,
- sharedDataSize: 4096,
- })
-
- c.rxCfg = createQueueFDs(t, queueSizes{
- dataSize: queueDataSize,
- txPipeSize: queuePipeSize,
- rxPipeSize: queuePipeSize,
- sharedDataSize: 4096,
- })
-
- initQueue(t, &c.txq, &c.txCfg)
- initQueue(t, &c.rxq, &c.rxCfg)
-
- ep, err := New(mtu, bufferSize, addr, c.txCfg, c.rxCfg)
- if err != nil {
- t.Fatalf("New failed: %v", err)
- }
-
- c.ep = ep.(*endpoint)
- c.ep.Attach(c)
-
- return c
-}
-
-func (c *testContext) DeliverNetworkPacket(_ stack.LinkEndpoint, remoteLinkAddr, localLinkAddr tcpip.LinkAddress, proto tcpip.NetworkProtocolNumber, pkt tcpip.PacketBuffer) {
- c.mu.Lock()
- c.packets = append(c.packets, packetInfo{
- addr: remoteLinkAddr,
- proto: proto,
- vv: pkt.Data.Clone(nil),
- })
- c.mu.Unlock()
-
- c.packetCh <- struct{}{}
-}
-
-func (c *testContext) cleanup() {
- c.ep.Close()
- closeFDs(&c.txCfg)
- closeFDs(&c.rxCfg)
- c.txq.cleanup()
- c.rxq.cleanup()
-}
-
-func (c *testContext) waitForPackets(n int, to <-chan time.Time, errorStr string) {
- for i := 0; i < n; i++ {
- select {
- case <-c.packetCh:
- case <-to:
- c.t.Fatalf(errorStr)
- }
- }
-}
-
-func (c *testContext) pushRxCompletion(size uint32, bs []queue.RxBuffer) {
- b := c.rxq.rx.Push(queue.RxCompletionSize(len(bs)))
- queue.EncodeRxCompletion(b, size, 0)
- for i := range bs {
- queue.EncodeRxCompletionBuffer(b, i, queue.RxBuffer{
- Offset: bs[i].Offset,
- Size: bs[i].Size,
- ID: bs[i].ID,
- })
- }
-}
-
-func randomFill(b []byte) {
- for i := range b {
- b[i] = byte(rand.Intn(256))
- }
-}
-
-func shuffle(b []int) {
- for i := len(b) - 1; i >= 0; i-- {
- j := rand.Intn(i + 1)
- b[i], b[j] = b[j], b[i]
- }
-}
-
-func createFile(t *testing.T, size int64, initQueue bool) int {
- tmpDir := os.Getenv("TEST_TMPDIR")
- if tmpDir == "" {
- tmpDir = os.Getenv("TMPDIR")
- }
- f, err := ioutil.TempFile(tmpDir, "sharedmem_test")
- if err != nil {
- t.Fatalf("TempFile failed: %v", err)
- }
- defer f.Close()
- syscall.Unlink(f.Name())
-
- if initQueue {
- // Write the "slot-free" flag in the initial queue.
- _, err := f.WriteAt([]byte{0, 0, 0, 0, 0, 0, 0, 0x80}, 0)
- if err != nil {
- t.Fatalf("WriteAt failed: %v", err)
- }
- }
-
- fd, err := syscall.Dup(int(f.Fd()))
- if err != nil {
- t.Fatalf("Dup failed: %v", err)
- }
-
- if err := syscall.Ftruncate(fd, size); err != nil {
- syscall.Close(fd)
- t.Fatalf("Ftruncate failed: %v", err)
- }
-
- return fd
-}
-
-func closeFDs(c *QueueConfig) {
- syscall.Close(c.DataFD)
- syscall.Close(c.EventFD)
- syscall.Close(c.TxPipeFD)
- syscall.Close(c.RxPipeFD)
- syscall.Close(c.SharedDataFD)
-}
-
-type queueSizes struct {
- dataSize int64
- txPipeSize int64
- rxPipeSize int64
- sharedDataSize int64
-}
-
-func createQueueFDs(t *testing.T, s queueSizes) QueueConfig {
- fd, _, err := syscall.RawSyscall(syscall.SYS_EVENTFD2, 0, 0, 0)
- if err != 0 {
- t.Fatalf("eventfd failed: %v", error(err))
- }
-
- return QueueConfig{
- EventFD: int(fd),
- DataFD: createFile(t, s.dataSize, false),
- TxPipeFD: createFile(t, s.txPipeSize, true),
- RxPipeFD: createFile(t, s.rxPipeSize, true),
- SharedDataFD: createFile(t, s.sharedDataSize, false),
- }
-}
-
-// TestSimpleSend sends 1000 packets with random header and payload sizes,
-// then checks that the right payload is received on the shared memory queues.
-func TestSimpleSend(t *testing.T) {
- c := newTestContext(t, 20000, 1500, localLinkAddr)
- defer c.cleanup()
-
- // Prepare route.
- r := stack.Route{
- RemoteLinkAddress: remoteLinkAddr,
- }
-
- for iters := 1000; iters > 0; iters-- {
- func() {
- // Prepare and send packet.
- n := rand.Intn(10000)
- hdr := buffer.NewPrependable(n + int(c.ep.MaxHeaderLength()))
- hdrBuf := hdr.Prepend(n)
- randomFill(hdrBuf)
-
- n = rand.Intn(10000)
- buf := buffer.NewView(n)
- randomFill(buf)
-
- proto := tcpip.NetworkProtocolNumber(rand.Intn(0x10000))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), proto); err != nil {
- t.Fatalf("WritePacket failed: %v", err)
- }
-
- // Receive packet.
- desc := c.txq.tx.Pull()
- pi := queue.DecodeTxPacketHeader(desc)
- if pi.Reserved != 0 {
- t.Fatalf("Reserved value is non-zero: 0x%x", pi.Reserved)
- }
- contents := make([]byte, 0, pi.Size)
- for i := 0; i < pi.BufferCount; i++ {
- bi := queue.DecodeTxBufferHeader(desc, i)
- contents = append(contents, c.txq.data[bi.Offset:][:bi.Size]...)
- }
- c.txq.tx.Flush()
-
- defer func() {
- // Tell the endpoint about the completion of the write.
- b := c.txq.rx.Push(8)
- queue.EncodeTxCompletion(b, pi.ID)
- c.txq.rx.Flush()
- }()
-
- // Check the ethernet header.
- ethTemplate := make(header.Ethernet, header.EthernetMinimumSize)
- ethTemplate.Encode(&header.EthernetFields{
- SrcAddr: localLinkAddr,
- DstAddr: remoteLinkAddr,
- Type: proto,
- })
- if got := contents[:header.EthernetMinimumSize]; !bytes.Equal(got, []byte(ethTemplate)) {
- t.Fatalf("Bad ethernet header in packet: got %x, want %x", got, ethTemplate)
- }
-
- // Compare contents skipping the ethernet header added by the
- // endpoint.
- merged := append(hdrBuf, buf...)
- if uint32(len(contents)) < pi.Size {
- t.Fatalf("Sum of buffers is less than packet size: %v < %v", len(contents), pi.Size)
- }
- contents = contents[:pi.Size][header.EthernetMinimumSize:]
-
- if !bytes.Equal(contents, merged) {
- t.Fatalf("Buffers are different: got %x (%v bytes), want %x (%v bytes)", contents, len(contents), merged, len(merged))
- }
- }()
- }
-}
-
-// TestPreserveSrcAddressInSend calls WritePacket once with LocalLinkAddress
-// set in Route (using much of the same code as TestSimpleSend), then checks
-// that the encoded ethernet header received includes the correct SrcAddr.
-func TestPreserveSrcAddressInSend(t *testing.T) {
- c := newTestContext(t, 20000, 1500, localLinkAddr)
- defer c.cleanup()
-
- newLocalLinkAddress := tcpip.LinkAddress(strings.Repeat("0xFE", 6))
- // Set both remote and local link address in route.
- r := stack.Route{
- RemoteLinkAddress: remoteLinkAddr,
- LocalLinkAddress: newLocalLinkAddress,
- }
-
- // WritePacket panics given a prependable with anything less than
- // the minimum size of the ethernet header.
- hdr := buffer.NewPrependable(header.EthernetMinimumSize)
-
- proto := tcpip.NetworkProtocolNumber(rand.Intn(0x10000))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buffer.VectorisedView{}, proto); err != nil {
- t.Fatalf("WritePacket failed: %v", err)
- }
-
- // Receive packet.
- desc := c.txq.tx.Pull()
- pi := queue.DecodeTxPacketHeader(desc)
- if pi.Reserved != 0 {
- t.Fatalf("Reserved value is non-zero: 0x%x", pi.Reserved)
- }
- contents := make([]byte, 0, pi.Size)
- for i := 0; i < pi.BufferCount; i++ {
- bi := queue.DecodeTxBufferHeader(desc, i)
- contents = append(contents, c.txq.data[bi.Offset:][:bi.Size]...)
- }
- c.txq.tx.Flush()
-
- defer func() {
- // Tell the endpoint about the completion of the write.
- b := c.txq.rx.Push(8)
- queue.EncodeTxCompletion(b, pi.ID)
- c.txq.rx.Flush()
- }()
-
- // Check that the ethernet header contains the expected SrcAddr.
- ethTemplate := make(header.Ethernet, header.EthernetMinimumSize)
- ethTemplate.Encode(&header.EthernetFields{
- SrcAddr: newLocalLinkAddress,
- DstAddr: remoteLinkAddr,
- Type: proto,
- })
- if got := contents[:header.EthernetMinimumSize]; !bytes.Equal(got, []byte(ethTemplate)) {
- t.Fatalf("Bad ethernet header in packet: got %x, want %x", got, ethTemplate)
- }
-}
-
-// TestFillTxQueue sends packets until the queue is full.
-func TestFillTxQueue(t *testing.T) {
- c := newTestContext(t, 20000, 1500, localLinkAddr)
- defer c.cleanup()
-
- // Prepare to send a packet.
- r := stack.Route{
- RemoteLinkAddress: remoteLinkAddr,
- }
-
- buf := buffer.NewView(100)
-
- // Each packet is uses no more than 40 bytes, so write that many packets
- // until the tx queue if full.
- ids := make(map[uint64]struct{})
- for i := queuePipeSize / 40; i > 0; i-- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
-
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
- t.Fatalf("WritePacket failed unexpectedly: %v", err)
- }
-
- // Check that they have different IDs.
- desc := c.txq.tx.Pull()
- pi := queue.DecodeTxPacketHeader(desc)
- if _, ok := ids[pi.ID]; ok {
- t.Fatalf("ID (%v) reused", pi.ID)
- }
- ids[pi.ID] = struct{}{}
- }
-
- // Next attempt to write must fail.
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != want {
- t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
- }
-}
-
-// TestFillTxQueueAfterBadCompletion sends a bad completion, then sends packets
-// until the queue is full.
-func TestFillTxQueueAfterBadCompletion(t *testing.T) {
- c := newTestContext(t, 20000, 1500, localLinkAddr)
- defer c.cleanup()
-
- // Send a bad completion.
- queue.EncodeTxCompletion(c.txq.rx.Push(8), 1)
- c.txq.rx.Flush()
-
- // Prepare to send a packet.
- r := stack.Route{
- RemoteLinkAddress: remoteLinkAddr,
- }
-
- buf := buffer.NewView(100)
-
- // Send two packets so that the id slice has at least two slots.
- for i := 2; i > 0; i-- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
- t.Fatalf("WritePacket failed unexpectedly: %v", err)
- }
- }
-
- // Complete the two writes twice.
- for i := 2; i > 0; i-- {
- pi := queue.DecodeTxPacketHeader(c.txq.tx.Pull())
-
- queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID)
- queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID)
- c.txq.rx.Flush()
- }
- c.txq.tx.Flush()
-
- // Each packet is uses no more than 40 bytes, so write that many packets
- // until the tx queue if full.
- ids := make(map[uint64]struct{})
- for i := queuePipeSize / 40; i > 0; i-- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
- t.Fatalf("WritePacket failed unexpectedly: %v", err)
- }
-
- // Check that they have different IDs.
- desc := c.txq.tx.Pull()
- pi := queue.DecodeTxPacketHeader(desc)
- if _, ok := ids[pi.ID]; ok {
- t.Fatalf("ID (%v) reused", pi.ID)
- }
- ids[pi.ID] = struct{}{}
- }
-
- // Next attempt to write must fail.
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != want {
- t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
- }
-}
-
-// TestFillTxMemory sends packets until the we run out of shared memory.
-func TestFillTxMemory(t *testing.T) {
- const bufferSize = 1500
- c := newTestContext(t, 20000, bufferSize, localLinkAddr)
- defer c.cleanup()
-
- // Prepare to send a packet.
- r := stack.Route{
- RemoteLinkAddress: remoteLinkAddr,
- }
-
- buf := buffer.NewView(100)
-
- // Each packet is uses up one buffer, so write as many as possible until
- // we fill the memory.
- ids := make(map[uint64]struct{})
- for i := queueDataSize / bufferSize; i > 0; i-- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
- t.Fatalf("WritePacket failed unexpectedly: %v", err)
- }
-
- // Check that they have different IDs.
- desc := c.txq.tx.Pull()
- pi := queue.DecodeTxPacketHeader(desc)
- if _, ok := ids[pi.ID]; ok {
- t.Fatalf("ID (%v) reused", pi.ID)
- }
- ids[pi.ID] = struct{}{}
- c.txq.tx.Flush()
- }
-
- // Next attempt to write must fail.
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber)
- if want := tcpip.ErrWouldBlock; err != want {
- t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
- }
-}
-
-// TestFillTxMemoryWithMultiBuffer sends packets until the we run out of
-// shared memory for a 2-buffer packet, but still with room for a 1-buffer
-// packet.
-func TestFillTxMemoryWithMultiBuffer(t *testing.T) {
- const bufferSize = 1500
- c := newTestContext(t, 20000, bufferSize, localLinkAddr)
- defer c.cleanup()
-
- // Prepare to send a packet.
- r := stack.Route{
- RemoteLinkAddress: remoteLinkAddr,
- }
-
- buf := buffer.NewView(100)
-
- // Each packet is uses up one buffer, so write as many as possible
- // until there is only one buffer left.
- for i := queueDataSize/bufferSize - 1; i > 0; i-- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
- t.Fatalf("WritePacket failed unexpectedly: %v", err)
- }
-
- // Pull the posted buffer.
- c.txq.tx.Pull()
- c.txq.tx.Flush()
- }
-
- // Attempt to write a two-buffer packet. It must fail.
- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- uu := buffer.NewView(bufferSize).ToVectorisedView()
- if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&r, nil /* gso */, hdr, uu, header.IPv4ProtocolNumber); err != want {
- t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
- }
- }
-
- // Attempt to write the one-buffer packet again. It must succeed.
- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
- t.Fatalf("WritePacket failed unexpectedly: %v", err)
- }
- }
-}
-
-func pollPull(t *testing.T, p *pipe.Rx, to <-chan time.Time, errStr string) []byte {
- t.Helper()
-
- for {
- b := p.Pull()
- if b != nil {
- return b
- }
-
- select {
- case <-time.After(10 * time.Millisecond):
- case <-to:
- t.Fatal(errStr)
- }
- }
-}
-
-// TestSimpleReceive completes 1000 different receives with random payload and
-// random number of buffers. It checks that the contents match the expected
-// values.
-func TestSimpleReceive(t *testing.T) {
- const bufferSize = 1500
- c := newTestContext(t, 20000, bufferSize, localLinkAddr)
- defer c.cleanup()
-
- // Check that buffers have been posted.
- limit := c.ep.rx.q.PostedBuffersLimit()
- for i := uint64(0); i < limit; i++ {
- timeout := time.After(2 * time.Second)
- bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers to be posted"))
-
- if want := i * bufferSize; want != bi.Offset {
- t.Fatalf("Bad posted offset: got %v, want %v", bi.Offset, want)
- }
-
- if want := i; want != bi.ID {
- t.Fatalf("Bad posted ID: got %v, want %v", bi.ID, want)
- }
-
- if bufferSize != bi.Size {
- t.Fatalf("Bad posted bufferSize: got %v, want %v", bi.Size, bufferSize)
- }
- }
- c.rxq.tx.Flush()
-
- // Create a slice with the indices 0..limit-1.
- idx := make([]int, limit)
- for i := range idx {
- idx[i] = i
- }
-
- // Complete random packets 1000 times.
- for iters := 1000; iters > 0; iters-- {
- timeout := time.After(2 * time.Second)
- // Prepare a random packet.
- shuffle(idx)
- n := 1 + rand.Intn(10)
- bufs := make([]queue.RxBuffer, n)
- contents := make([]byte, bufferSize*n-rand.Intn(500))
- randomFill(contents)
- for i := range bufs {
- j := idx[i]
- bufs[i].Size = bufferSize
- bufs[i].Offset = uint64(bufferSize * j)
- bufs[i].ID = uint64(j)
-
- copy(c.rxq.data[bufs[i].Offset:][:bufferSize], contents[i*bufferSize:])
- }
-
- // Push completion.
- c.pushRxCompletion(uint32(len(contents)), bufs)
- c.rxq.rx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Wait for packet to be received, then check it.
- c.waitForPackets(1, time.After(5*time.Second), "Timeout waiting for packet")
- c.mu.Lock()
- rcvd := []byte(c.packets[0].vv.First())
- c.packets = c.packets[:0]
- c.mu.Unlock()
-
- if contents := contents[header.EthernetMinimumSize:]; !bytes.Equal(contents, rcvd) {
- t.Fatalf("Unexpected buffer contents: got %x, want %x", rcvd, contents)
- }
-
- // Check that buffers have been reposted.
- for i := range bufs {
- bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffers to be reposted"))
- if bi != bufs[i] {
- t.Fatalf("Unexpected buffer reposted: got %x, want %x", bi, bufs[i])
- }
- }
- c.rxq.tx.Flush()
- }
-}
-
-// TestRxBuffersReposted tests that rx buffers get reposted after they have been
-// completed.
-func TestRxBuffersReposted(t *testing.T) {
- const bufferSize = 1500
- c := newTestContext(t, 20000, bufferSize, localLinkAddr)
- defer c.cleanup()
-
- // Receive all posted buffers.
- limit := c.ep.rx.q.PostedBuffersLimit()
- buffers := make([]queue.RxBuffer, 0, limit)
- for i := limit; i > 0; i-- {
- timeout := time.After(2 * time.Second)
- buffers = append(buffers, queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers")))
- }
- c.rxq.tx.Flush()
-
- // Check that all buffers are reposted when individually completed.
- for i := range buffers {
- timeout := time.After(2 * time.Second)
- // Complete the buffer.
- c.pushRxCompletion(buffers[i].Size, buffers[i:][:1])
- c.rxq.rx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Wait for it to be reposted.
- bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted"))
- if bi != buffers[i] {
- t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[i])
- }
- }
- c.rxq.tx.Flush()
-
- // Check that all buffers are reposted when completed in pairs.
- for i := 0; i < len(buffers)/2; i++ {
- timeout := time.After(2 * time.Second)
- // Complete with two buffers.
- c.pushRxCompletion(2*bufferSize, buffers[2*i:][:2])
- c.rxq.rx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Wait for them to be reposted.
- for j := 0; j < 2; j++ {
- bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted"))
- if bi != buffers[2*i+j] {
- t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[2*i+j])
- }
- }
- }
- c.rxq.tx.Flush()
-}
-
-// TestReceivePostingIsFull checks that the endpoint will properly handle the
-// case when a received buffer cannot be immediately reposted because it hasn't
-// been pulled from the tx pipe yet.
-func TestReceivePostingIsFull(t *testing.T) {
- const bufferSize = 1500
- c := newTestContext(t, 20000, bufferSize, localLinkAddr)
- defer c.cleanup()
-
- // Complete first posted buffer before flushing it from the tx pipe.
- first := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for first buffer to be posted"))
- c.pushRxCompletion(first.Size, []queue.RxBuffer{first})
- c.rxq.rx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Check that packet is received.
- c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
-
- // Complete another buffer.
- second := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for second buffer to be posted"))
- c.pushRxCompletion(second.Size, []queue.RxBuffer{second})
- c.rxq.rx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Check that no packet is received yet, as the worker is blocked trying
- // to repost.
- select {
- case <-time.After(500 * time.Millisecond):
- case <-c.packetCh:
- t.Fatalf("Unexpected packet received")
- }
-
- // Flush tx queue, which will allow the first buffer to be reposted,
- // and the second completion to be pulled.
- c.rxq.tx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Check that second packet completes.
- c.waitForPackets(1, time.After(time.Second), "Timeout waiting for second completed packet")
-}
-
-// TestCloseWhileWaitingToPost closes the endpoint while it is waiting to
-// repost a buffer. Make sure it backs out.
-func TestCloseWhileWaitingToPost(t *testing.T) {
- const bufferSize = 1500
- c := newTestContext(t, 20000, bufferSize, localLinkAddr)
- cleaned := false
- defer func() {
- if !cleaned {
- c.cleanup()
- }
- }()
-
- // Complete first posted buffer before flushing it from the tx pipe.
- bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for initial buffer to be posted"))
- c.pushRxCompletion(bi.Size, []queue.RxBuffer{bi})
- c.rxq.rx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Wait for packet to be indicated.
- c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
-
- // Cleanup and wait for worker to complete.
- c.cleanup()
- cleaned = true
- c.ep.Wait()
-}
diff --git a/pkg/tcpip/link/sharedmem/sharedmem_unsafe.go b/pkg/tcpip/link/sharedmem/sharedmem_unsafe.go
deleted file mode 100644
index f7e816a41..000000000
--- a/pkg/tcpip/link/sharedmem/sharedmem_unsafe.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package sharedmem
-
-import (
- "unsafe"
-)
-
-// sharedDataPointer converts the shared data slice into a pointer so that it
-// can be used in atomic operations.
-func sharedDataPointer(sharedData []byte) *uint32 {
- return (*uint32)(unsafe.Pointer(&sharedData[0:4][0]))
-}
diff --git a/pkg/tcpip/link/sharedmem/tx.go b/pkg/tcpip/link/sharedmem/tx.go
deleted file mode 100644
index 6b8d7859d..000000000
--- a/pkg/tcpip/link/sharedmem/tx.go
+++ /dev/null
@@ -1,272 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package sharedmem
-
-import (
- "math"
- "syscall"
-
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
-)
-
-const (
- nilID = math.MaxUint64
-)
-
-// tx holds all state associated with a tx queue.
-type tx struct {
- data []byte
- q queue.Tx
- ids idManager
- bufs bufferManager
-}
-
-// init initializes all state needed by the tx queue based on the information
-// provided.
-//
-// The caller always retains ownership of all file descriptors passed in. The
-// queue implementation will duplicate any that it may need in the future.
-func (t *tx) init(mtu uint32, c *QueueConfig) error {
- // Map in all buffers.
- txPipe, err := getBuffer(c.TxPipeFD)
- if err != nil {
- return err
- }
-
- rxPipe, err := getBuffer(c.RxPipeFD)
- if err != nil {
- syscall.Munmap(txPipe)
- return err
- }
-
- data, err := getBuffer(c.DataFD)
- if err != nil {
- syscall.Munmap(txPipe)
- syscall.Munmap(rxPipe)
- return err
- }
-
- // Initialize state based on buffers.
- t.q.Init(txPipe, rxPipe)
- t.ids.init()
- t.bufs.init(0, len(data), int(mtu))
- t.data = data
-
- return nil
-}
-
-// cleanup releases all resources allocated during init(). It must only be
-// called if init() has previously succeeded.
-func (t *tx) cleanup() {
- a, b := t.q.Bytes()
- syscall.Munmap(a)
- syscall.Munmap(b)
- syscall.Munmap(t.data)
-}
-
-// transmit sends a packet made up of up to two buffers. Returns a boolean that
-// specifies whether the packet was successfully transmitted.
-func (t *tx) transmit(a, b []byte) bool {
- // Pull completions from the tx queue and add their buffers back to the
- // pool so that we can reuse them.
- for {
- id, ok := t.q.CompletedPacket()
- if !ok {
- break
- }
-
- if buf := t.ids.remove(id); buf != nil {
- t.bufs.free(buf)
- }
- }
-
- bSize := t.bufs.entrySize
- total := uint32(len(a) + len(b))
- bufCount := (total + bSize - 1) / bSize
-
- // Allocate enough buffers to hold all the data.
- var buf *queue.TxBuffer
- for i := bufCount; i != 0; i-- {
- b := t.bufs.alloc()
- if b == nil {
- // Failed to get all buffers. Return to the pool
- // whatever we had managed to get.
- if buf != nil {
- t.bufs.free(buf)
- }
- return false
- }
- b.Next = buf
- buf = b
- }
-
- // Copy data into allocated buffers.
- nBuf := buf
- var dBuf []byte
- for _, data := range [][]byte{a, b} {
- for len(data) > 0 {
- if len(dBuf) == 0 {
- dBuf = t.data[nBuf.Offset:][:nBuf.Size]
- nBuf = nBuf.Next
- }
- n := copy(dBuf, data)
- data = data[n:]
- dBuf = dBuf[n:]
- }
- }
-
- // Get an id for this packet and send it out.
- id := t.ids.add(buf)
- if !t.q.Enqueue(id, total, bufCount, buf) {
- t.ids.remove(id)
- t.bufs.free(buf)
- return false
- }
-
- return true
-}
-
-// getBuffer returns a memory region mapped to the full contents of the given
-// file descriptor.
-func getBuffer(fd int) ([]byte, error) {
- var s syscall.Stat_t
- if err := syscall.Fstat(fd, &s); err != nil {
- return nil, err
- }
-
- // Check that size doesn't overflow an int.
- if s.Size > int64(^uint(0)>>1) {
- return nil, syscall.EDOM
- }
-
- return syscall.Mmap(fd, 0, int(s.Size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED|syscall.MAP_FILE)
-}
-
-// idDescriptor is used by idManager to either point to a tx buffer (in case
-// the ID is assigned) or to the next free element (if the id is not assigned).
-type idDescriptor struct {
- buf *queue.TxBuffer
- nextFree uint64
-}
-
-// idManager is a manager of tx buffer identifiers. It assigns unique IDs to
-// tx buffers that are added to it; the IDs can only be reused after they have
-// been removed.
-//
-// The ID assignments are stored so that the tx buffers can be retrieved from
-// the IDs previously assigned to them.
-type idManager struct {
- // ids is a slice containing all tx buffers. The ID is the index into
- // this slice.
- ids []idDescriptor
-
- // freeList a list of free IDs.
- freeList uint64
-}
-
-// init initializes the id manager.
-func (m *idManager) init() {
- m.freeList = nilID
-}
-
-// add assigns an ID to the given tx buffer.
-func (m *idManager) add(b *queue.TxBuffer) uint64 {
- if i := m.freeList; i != nilID {
- // There is an id available in the free list, just use it.
- m.ids[i].buf = b
- m.freeList = m.ids[i].nextFree
- return i
- }
-
- // We need to expand the id descriptor.
- m.ids = append(m.ids, idDescriptor{buf: b})
- return uint64(len(m.ids) - 1)
-}
-
-// remove retrieves the tx buffer associated with the given ID, and removes the
-// ID from the assigned table so that it can be reused in the future.
-func (m *idManager) remove(i uint64) *queue.TxBuffer {
- if i >= uint64(len(m.ids)) {
- return nil
- }
-
- desc := &m.ids[i]
- b := desc.buf
- if b == nil {
- // The provided id is not currently assigned.
- return nil
- }
-
- desc.buf = nil
- desc.nextFree = m.freeList
- m.freeList = i
-
- return b
-}
-
-// bufferManager manages a buffer region broken up into smaller, equally sized
-// buffers. Smaller buffers can be allocated and freed.
-type bufferManager struct {
- freeList *queue.TxBuffer
- curOffset uint64
- limit uint64
- entrySize uint32
-}
-
-// init initializes the buffer manager.
-func (b *bufferManager) init(initialOffset, size, entrySize int) {
- b.freeList = nil
- b.curOffset = uint64(initialOffset)
- b.limit = uint64(initialOffset + size/entrySize*entrySize)
- b.entrySize = uint32(entrySize)
-}
-
-// alloc allocates a buffer from the manager, if one is available.
-func (b *bufferManager) alloc() *queue.TxBuffer {
- if b.freeList != nil {
- // There is a descriptor ready for reuse in the free list.
- d := b.freeList
- b.freeList = d.Next
- d.Next = nil
- return d
- }
-
- if b.curOffset < b.limit {
- // There is room available in the never-used range, so create
- // a new descriptor for it.
- d := &queue.TxBuffer{
- Offset: b.curOffset,
- Size: b.entrySize,
- }
- b.curOffset += uint64(b.entrySize)
- return d
- }
-
- return nil
-}
-
-// free returns all buffers in the list to the buffer manager so that they can
-// be reused.
-func (b *bufferManager) free(d *queue.TxBuffer) {
- // Find the last buffer in the list.
- last := d
- for last.Next != nil {
- last = last.Next
- }
-
- // Push list onto free list.
- last.Next = b.freeList
- b.freeList = d
-}