summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link/sharedmem/pipe
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/link/sharedmem/pipe')
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/BUILD24
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/pipe.go78
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/pipe_test.go517
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go35
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/rx.go93
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/tx.go161
6 files changed, 0 insertions, 908 deletions
diff --git a/pkg/tcpip/link/sharedmem/pipe/BUILD b/pkg/tcpip/link/sharedmem/pipe/BUILD
deleted file mode 100644
index 6b5bc542c..000000000
--- a/pkg/tcpip/link/sharedmem/pipe/BUILD
+++ /dev/null
@@ -1,24 +0,0 @@
-load("//tools/go_stateify:defs.bzl", "go_library")
-load("@io_bazel_rules_go//go:def.bzl", "go_test")
-
-package(licenses = ["notice"])
-
-go_library(
- name = "pipe",
- srcs = [
- "pipe.go",
- "pipe_unsafe.go",
- "rx.go",
- "tx.go",
- ],
- importpath = "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe",
- visibility = ["//visibility:public"],
-)
-
-go_test(
- name = "pipe_test",
- srcs = [
- "pipe_test.go",
- ],
- embed = [":pipe"],
-)
diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe.go b/pkg/tcpip/link/sharedmem/pipe/pipe.go
deleted file mode 100644
index 74c9f0311..000000000
--- a/pkg/tcpip/link/sharedmem/pipe/pipe.go
+++ /dev/null
@@ -1,78 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package pipe implements a shared memory ring buffer on which a single reader
-// and a single writer can operate (read/write) concurrently. The ring buffer
-// allows for data of different sizes to be written, and preserves the boundary
-// of the written data.
-//
-// Example usage is as follows:
-//
-// wb := t.Push(20)
-// // Write data to wb.
-// t.Flush()
-//
-// rb := r.Pull()
-// // Do something with data in rb.
-// t.Flush()
-package pipe
-
-import (
- "math"
-)
-
-const (
- jump uint64 = math.MaxUint32 + 1
- offsetMask uint64 = math.MaxUint32
- revolutionMask uint64 = ^offsetMask
-
- sizeOfSlotHeader = 8 // sizeof(uint64)
- slotFree uint64 = 1 << 63
- slotSizeMask uint64 = math.MaxUint32
-)
-
-// payloadToSlotSize calculates the total size of a slot based on its payload
-// size. The total size is the header size, plus the payload size, plus padding
-// if necessary to make the total size a multiple of sizeOfSlotHeader.
-func payloadToSlotSize(payloadSize uint64) uint64 {
- s := sizeOfSlotHeader + payloadSize
- return (s + sizeOfSlotHeader - 1) &^ (sizeOfSlotHeader - 1)
-}
-
-// slotToPayloadSize calculates the payload size of a slot based on the total
-// size of the slot. This is only meant to be used when creating slots that
-// don't carry information (e.g., free slots or wrap slots).
-func slotToPayloadSize(offset uint64) uint64 {
- return offset - sizeOfSlotHeader
-}
-
-// pipe is a basic data structure used by both (transmit & receive) ends of a
-// pipe. Indices into this pipe are split into two fields: offset, which counts
-// the number of bytes from the beginning of the buffer, and revolution, which
-// counts the number of times the index has wrapped around.
-type pipe struct {
- buffer []byte
-}
-
-// init initializes the pipe buffer such that its size is a multiple of the size
-// of the slot header.
-func (p *pipe) init(b []byte) {
- p.buffer = b[:len(b)&^(sizeOfSlotHeader-1)]
-}
-
-// data returns a section of the buffer starting at the given index (which may
-// include revolution information) and with the given size.
-func (p *pipe) data(idx uint64, size uint64) []byte {
- return p.buffer[(idx&offsetMask)+sizeOfSlotHeader:][:size]
-}
diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_test.go b/pkg/tcpip/link/sharedmem/pipe/pipe_test.go
deleted file mode 100644
index 59ef69a8b..000000000
--- a/pkg/tcpip/link/sharedmem/pipe/pipe_test.go
+++ /dev/null
@@ -1,517 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pipe
-
-import (
- "math/rand"
- "reflect"
- "runtime"
- "sync"
- "testing"
-)
-
-func TestSimpleReadWrite(t *testing.T) {
- // Check that a simple write can be properly read from the rx side.
- tr := rand.New(rand.NewSource(99))
- rr := rand.New(rand.NewSource(99))
-
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- wb := tx.Push(10)
- if wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- for i := range wb {
- wb[i] = byte(tr.Intn(256))
- }
- tx.Flush()
-
- var rx Rx
- rx.Init(b)
- rb := rx.Pull()
- if len(rb) != 10 {
- t.Fatalf("Bad buffer size returned: got %v, want %v", len(rb), 10)
- }
-
- for i := range rb {
- if v := byte(rr.Intn(256)); v != rb[i] {
- t.Fatalf("Bad read buffer at index %v: got %v, want %v", i, rb[i], v)
- }
- }
- rx.Flush()
-}
-
-func TestEmptyRead(t *testing.T) {
- // Check that pulling from an empty pipe fails.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on empty pipe")
- }
-}
-
-func TestTooLargeWrite(t *testing.T) {
- // Check that writes that are too large are properly rejected.
- b := make([]byte, 96)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(96); wb != nil {
- t.Fatalf("Write of 96 bytes succeeded on 96-byte pipe")
- }
-
- if wb := tx.Push(88); wb != nil {
- t.Fatalf("Write of 88 bytes succeeded on 96-byte pipe")
- }
-
- if wb := tx.Push(80); wb == nil {
- t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
- }
-}
-
-func TestFullWrite(t *testing.T) {
- // Check that writes fail when the pipe is full.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(80); wb == nil {
- t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
- }
-
- if wb := tx.Push(1); wb != nil {
- t.Fatalf("Write succeeded on full pipe")
- }
-}
-
-func TestFullAndFlushedWrite(t *testing.T) {
- // Check that writes fail when the pipe is full and has already been
- // flushed.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(80); wb == nil {
- t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
- }
-
- tx.Flush()
-
- if wb := tx.Push(1); wb != nil {
- t.Fatalf("Write succeeded on full pipe")
- }
-}
-
-func TestTxFlushTwice(t *testing.T) {
- // Checks that a second consecutive tx flush is a no-op.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- // Make copy of original tx queue, flush it, then check that it didn't
- // change.
- orig := tx
- tx.Flush()
-
- if !reflect.DeepEqual(orig, tx) {
- t.Fatalf("Flush mutated tx pipe: got %v, want %v", tx, orig)
- }
-}
-
-func TestRxFlushTwice(t *testing.T) {
- // Checks that a second consecutive rx flush is a no-op.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
- rx.Flush()
-
- // Make copy of original rx queue, flush it, then check that it didn't
- // change.
- orig := rx
- rx.Flush()
-
- if !reflect.DeepEqual(orig, rx) {
- t.Fatalf("Flush mutated rx pipe: got %v, want %v", rx, orig)
- }
-}
-
-func TestWrapInMiddleOfTransaction(t *testing.T) {
- // Check that writes are not flushed when we need to wrap the buffer
- // around.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
- rx.Flush()
-
- // At this point the ring buffer is empty, but the write is at offset
- // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment).
- if wb := tx.Push(10); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on non-full pipe")
- }
-
- // We haven't flushed yet, so pull must return nil.
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on non-flushed pipe")
- }
-
- tx.Flush()
-
- // The two buffers must be available now.
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-}
-
-func TestWriteAbort(t *testing.T) {
- // Check that a read fails on a pipe that has had data pushed to it but
- // has aborted the push.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(10); wb == nil {
- t.Fatalf("Write failed on empty pipe")
- }
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on empty pipe")
- }
-
- tx.Abort()
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on empty pipe")
- }
-}
-
-func TestWrappedWriteAbort(t *testing.T) {
- // Check that writes are properly aborted even if the writes wrap
- // around.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
- rx.Flush()
-
- // At this point the ring buffer is empty, but the write is at offset
- // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment).
- if wb := tx.Push(10); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on non-full pipe")
- }
-
- // We haven't flushed yet, so pull must return nil.
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on non-flushed pipe")
- }
-
- tx.Abort()
-
- // The pushes were aborted, so no data should be readable.
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on non-flushed pipe")
- }
-
- // Try the same transactions again, but flush this time.
- if wb := tx.Push(10); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on non-full pipe")
- }
-
- tx.Flush()
-
- // The two buffers must be available now.
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-}
-
-func TestEmptyReadOnNonFlushedWrite(t *testing.T) {
- // Check that a read fails on a pipe that has had data pushed to it
- // but not yet flushed.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(10); wb == nil {
- t.Fatalf("Write failed on empty pipe")
- }
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on empty pipe")
- }
-
- tx.Flush()
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull on failed on non-empty pipe")
- }
-}
-
-func TestPullAfterPullingEntirePipe(t *testing.T) {
- // Check that Pull fails when the pipe is full, but all of it has
- // already been pulled but not yet flushed.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
- rx.Flush()
-
- // At this point the ring buffer is empty, but the write is at offset
- // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 3
- // buffers that will fill the pipe.
- if wb := tx.Push(10); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
-
- if wb := tx.Push(20); wb == nil {
- t.Fatalf("Push failed on non-full pipe")
- }
-
- if wb := tx.Push(24); wb == nil {
- t.Fatalf("Push failed on non-full pipe")
- }
-
- tx.Flush()
-
- // The three buffers must be available now.
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
-
- // Fourth pull must fail.
- if rb := rx.Pull(); rb != nil {
- t.Fatalf("Pull succeeded on empty pipe")
- }
-}
-
-func TestNoRoomToWrapOnPush(t *testing.T) {
- // Check that Push fails when it tries to allocate room to add a wrap
- // message.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- var rx Rx
- rx.Init(b)
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
- rx.Flush()
-
- // At this point the ring buffer is empty, but the write is at offset
- // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 20,
- // which won't fit (64+20+8+padding = 96, which wouldn't leave room for
- // the padding), so it wraps around.
- if wb := tx.Push(20); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
-
- tx.Flush()
-
- // Buffer offset is at 28. Try to write 70, which would require a wrap
- // slot which cannot be created now.
- if wb := tx.Push(70); wb != nil {
- t.Fatalf("Push succeeded on pipe with no room for wrap message")
- }
-}
-
-func TestRxImplicitFlushOfWrapMessage(t *testing.T) {
- // Check if the first read is that of a wrapping message, that it gets
- // immediately flushed.
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- if wb := tx.Push(50); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
- tx.Flush()
-
- // This will cause a wrapping message to written.
- if wb := tx.Push(60); wb != nil {
- t.Fatalf("Push succeeded when there is no room in pipe")
- }
-
- var rx Rx
- rx.Init(b)
-
- // Read the first message.
- if rb := rx.Pull(); rb == nil {
- t.Fatalf("Pull failed on non-empty pipe")
- }
- rx.Flush()
-
- // This should fail because of the wrapping message is taking up space.
- if wb := tx.Push(60); wb != nil {
- t.Fatalf("Push succeeded when there is no room in pipe")
- }
-
- // Try to read the next one. This should consume the wrapping message.
- rx.Pull()
-
- // This must now succeed.
- if wb := tx.Push(60); wb == nil {
- t.Fatalf("Push failed on empty pipe")
- }
-}
-
-func TestConcurrentReaderWriter(t *testing.T) {
- // Push a million buffers of random sizes and random contents. Check
- // that buffers read match what was written.
- tr := rand.New(rand.NewSource(99))
- rr := rand.New(rand.NewSource(99))
-
- b := make([]byte, 100)
- var tx Tx
- tx.Init(b)
-
- var rx Rx
- rx.Init(b)
-
- const count = 1000000
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- runtime.Gosched()
- for i := 0; i < count; i++ {
- n := 1 + tr.Intn(80)
- wb := tx.Push(uint64(n))
- for wb == nil {
- wb = tx.Push(uint64(n))
- }
-
- for j := range wb {
- wb[j] = byte(tr.Intn(256))
- }
-
- tx.Flush()
- }
- }()
-
- wg.Add(1)
- go func() {
- defer wg.Done()
- runtime.Gosched()
- for i := 0; i < count; i++ {
- n := 1 + rr.Intn(80)
- rb := rx.Pull()
- for rb == nil {
- rb = rx.Pull()
- }
-
- if n != len(rb) {
- t.Fatalf("Bad %v-th buffer length: got %v, want %v", i, len(rb), n)
- }
-
- for j := range rb {
- if v := byte(rr.Intn(256)); v != rb[j] {
- t.Fatalf("Bad %v-th read buffer at index %v: got %v, want %v", i, j, rb[j], v)
- }
- }
-
- rx.Flush()
- }
- }()
-
- wg.Wait()
-}
diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go b/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go
deleted file mode 100644
index 62d17029e..000000000
--- a/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pipe
-
-import (
- "sync/atomic"
- "unsafe"
-)
-
-func (p *pipe) write(idx uint64, v uint64) {
- ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0]))
- *ptr = v
-}
-
-func (p *pipe) writeAtomic(idx uint64, v uint64) {
- ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0]))
- atomic.StoreUint64(ptr, v)
-}
-
-func (p *pipe) readAtomic(idx uint64) uint64 {
- ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0]))
- return atomic.LoadUint64(ptr)
-}
diff --git a/pkg/tcpip/link/sharedmem/pipe/rx.go b/pkg/tcpip/link/sharedmem/pipe/rx.go
deleted file mode 100644
index f22e533ac..000000000
--- a/pkg/tcpip/link/sharedmem/pipe/rx.go
+++ /dev/null
@@ -1,93 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pipe
-
-// Rx is the receive side of the shared memory ring buffer.
-type Rx struct {
- p pipe
-
- tail uint64
- head uint64
-}
-
-// Init initializes the receive end of the pipe. In the initial state, the next
-// slot to be inspected is the very first one.
-func (r *Rx) Init(b []byte) {
- r.p.init(b)
- r.tail = 0xfffffffe * jump
- r.head = r.tail
-}
-
-// Pull reads the next buffer from the pipe, returning nil if there isn't one
-// currently available.
-//
-// The returned slice is available until Flush() is next called. After that, it
-// must not be touched.
-func (r *Rx) Pull() []byte {
- if r.head == r.tail+jump {
- // We've already pulled the whole pipe.
- return nil
- }
-
- header := r.p.readAtomic(r.head)
- if header&slotFree != 0 {
- // The next slot is free, we can't pull it yet.
- return nil
- }
-
- payloadSize := header & slotSizeMask
- newHead := r.head + payloadToSlotSize(payloadSize)
- headWrap := (r.head & revolutionMask) | uint64(len(r.p.buffer))
-
- // Check if this is a wrapping slot. If that's the case, it carries no
- // data, so we just skip it and try again from the first slot.
- if int64(newHead-headWrap) >= 0 {
- if int64(newHead-headWrap) > int64(jump) || newHead&offsetMask != 0 {
- return nil
- }
-
- if r.tail == r.head {
- // If this is the first pull since the last Flush()
- // call, we flush the state so that the sender can use
- // this space if it needs to.
- r.p.writeAtomic(r.head, slotFree|slotToPayloadSize(newHead-r.head))
- r.tail = newHead
- }
-
- r.head = newHead
- return r.Pull()
- }
-
- // Grab the buffer before updating r.head.
- b := r.p.data(r.head, payloadSize)
- r.head = newHead
- return b
-}
-
-// Flush tells the transmitter that all buffers pulled since the last Flush()
-// have been used, so the transmitter is free to used their slots for further
-// transmission.
-func (r *Rx) Flush() {
- if r.head == r.tail {
- return
- }
- r.p.writeAtomic(r.tail, slotFree|slotToPayloadSize(r.head-r.tail))
- r.tail = r.head
-}
-
-// Bytes returns the byte slice on which the pipe operates.
-func (r *Rx) Bytes() []byte {
- return r.p.buffer
-}
diff --git a/pkg/tcpip/link/sharedmem/pipe/tx.go b/pkg/tcpip/link/sharedmem/pipe/tx.go
deleted file mode 100644
index 9841eb231..000000000
--- a/pkg/tcpip/link/sharedmem/pipe/tx.go
+++ /dev/null
@@ -1,161 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pipe
-
-// Tx is the transmit side of the shared memory ring buffer.
-type Tx struct {
- p pipe
- maxPayloadSize uint64
-
- head uint64
- tail uint64
- next uint64
-
- tailHeader uint64
-}
-
-// Init initializes the transmit end of the pipe. In the initial state, the next
-// slot to be written is the very first one, and the transmitter has the whole
-// ring buffer available to it.
-func (t *Tx) Init(b []byte) {
- t.p.init(b)
- // maxPayloadSize excludes the header of the payload, and the header
- // of the wrapping message.
- t.maxPayloadSize = uint64(len(t.p.buffer)) - 2*sizeOfSlotHeader
- t.tail = 0xfffffffe * jump
- t.next = t.tail
- t.head = t.tail + jump
- t.p.write(t.tail, slotFree)
-}
-
-// Capacity determines how many records of the given size can be written to the
-// pipe before it fills up.
-func (t *Tx) Capacity(recordSize uint64) uint64 {
- available := uint64(len(t.p.buffer)) - sizeOfSlotHeader
- entryLen := payloadToSlotSize(recordSize)
- return available / entryLen
-}
-
-// Push reserves "payloadSize" bytes for transmission in the pipe. The caller
-// populates the returned slice with the data to be transferred and enventually
-// calls Flush() to make the data visible to the reader, or Abort() to make the
-// pipe forget all Push() calls since the last Flush().
-//
-// The returned slice is available until Flush() or Abort() is next called.
-// After that, it must not be touched.
-func (t *Tx) Push(payloadSize uint64) []byte {
- // Fail request if we know we will never have enough room.
- if payloadSize > t.maxPayloadSize {
- return nil
- }
-
- totalLen := payloadToSlotSize(payloadSize)
- newNext := t.next + totalLen
- nextWrap := (t.next & revolutionMask) | uint64(len(t.p.buffer))
- if int64(newNext-nextWrap) >= 0 {
- // The new buffer would overflow the pipe, so we push a wrapping
- // slot, then try to add the actual slot to the front of the
- // pipe.
- newNext = (newNext & revolutionMask) + jump
- wrappingPayloadSize := slotToPayloadSize(newNext - t.next)
- if !t.reclaim(newNext) {
- return nil
- }
-
- oldNext := t.next
- t.next = newNext
- if oldNext != t.tail {
- t.p.write(oldNext, wrappingPayloadSize)
- } else {
- t.tailHeader = wrappingPayloadSize
- t.Flush()
- }
-
- newNext += totalLen
- }
-
- // Check that we have enough room for the buffer.
- if !t.reclaim(newNext) {
- return nil
- }
-
- if t.next != t.tail {
- t.p.write(t.next, payloadSize)
- } else {
- t.tailHeader = payloadSize
- }
-
- // Grab the buffer before updating t.next.
- b := t.p.data(t.next, payloadSize)
- t.next = newNext
-
- return b
-}
-
-// reclaim attempts to advance the head until at least newNext. If the head is
-// already at or beyond newNext, nothing happens and true is returned; otherwise
-// it tries to reclaim slots that have already been consumed by the receive end
-// of the pipe (they will be marked as free) and returns a boolean indicating
-// whether it was successful in reclaiming enough slots.
-func (t *Tx) reclaim(newNext uint64) bool {
- for int64(newNext-t.head) > 0 {
- // Can't reclaim if slot is not free.
- header := t.p.readAtomic(t.head)
- if header&slotFree == 0 {
- return false
- }
-
- payloadSize := header & slotSizeMask
- newHead := t.head + payloadToSlotSize(payloadSize)
-
- // Check newHead is within bounds and valid.
- if int64(newHead-t.tail) > int64(jump) || newHead&offsetMask >= uint64(len(t.p.buffer)) {
- return false
- }
-
- t.head = newHead
- }
-
- return true
-}
-
-// Abort causes all Push() calls since the last Flush() to be forgotten and
-// therefore they will not be made visible to the receiver.
-func (t *Tx) Abort() {
- t.next = t.tail
-}
-
-// Flush causes all buffers pushed since the last Flush() [or Abort(), whichever
-// is the most recent] to be made visible to the receiver.
-func (t *Tx) Flush() {
- if t.next == t.tail {
- // Nothing to do if there are no pushed buffers.
- return
- }
-
- if t.next != t.head {
- // The receiver will spin in t.next, so we must make sure that
- // the slotFree bit is set.
- t.p.write(t.next, slotFree)
- }
-
- t.p.writeAtomic(t.tail, t.tailHeader)
- t.tail = t.next
-}
-
-// Bytes returns the byte slice on which the pipe operates.
-func (t *Tx) Bytes() []byte {
- return t.p.buffer
-}