summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link/sharedmem/queue
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/link/sharedmem/queue')
-rw-r--r--pkg/tcpip/link/sharedmem/queue/BUILD29
-rw-r--r--pkg/tcpip/link/sharedmem/queue/queue_test.go517
-rw-r--r--pkg/tcpip/link/sharedmem/queue/rx.go221
-rw-r--r--pkg/tcpip/link/sharedmem/queue/tx.go151
4 files changed, 0 insertions, 918 deletions
diff --git a/pkg/tcpip/link/sharedmem/queue/BUILD b/pkg/tcpip/link/sharedmem/queue/BUILD
deleted file mode 100644
index 8c9234d54..000000000
--- a/pkg/tcpip/link/sharedmem/queue/BUILD
+++ /dev/null
@@ -1,29 +0,0 @@
-load("//tools/go_stateify:defs.bzl", "go_library")
-load("@io_bazel_rules_go//go:def.bzl", "go_test")
-
-package(licenses = ["notice"])
-
-go_library(
- name = "queue",
- srcs = [
- "rx.go",
- "tx.go",
- ],
- importpath = "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue",
- visibility = ["//visibility:public"],
- deps = [
- "//pkg/log",
- "//pkg/tcpip/link/sharedmem/pipe",
- ],
-)
-
-go_test(
- name = "queue_test",
- srcs = [
- "queue_test.go",
- ],
- embed = [":queue"],
- deps = [
- "//pkg/tcpip/link/sharedmem/pipe",
- ],
-)
diff --git a/pkg/tcpip/link/sharedmem/queue/queue_test.go b/pkg/tcpip/link/sharedmem/queue/queue_test.go
deleted file mode 100644
index 9a0aad5d7..000000000
--- a/pkg/tcpip/link/sharedmem/queue/queue_test.go
+++ /dev/null
@@ -1,517 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package queue
-
-import (
- "encoding/binary"
- "reflect"
- "testing"
-
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
-)
-
-func TestBasicTxQueue(t *testing.T) {
- // Tests that a basic transmit on a queue works, and that completion
- // gets properly reported as well.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Tx
- q.Init(pb1, pb2)
-
- // Enqueue two buffers.
- b := []TxBuffer{
- {nil, 100, 60},
- {nil, 200, 40},
- }
-
- b[0].Next = &b[1]
-
- const usedID = 1002
- const usedTotalSize = 100
- if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) {
- t.Fatalf("Enqueue failed on empty queue")
- }
-
- // Check the contents of the pipe.
- d := rxp.Pull()
- if d == nil {
- t.Fatalf("Tx pipe is empty after Enqueue")
- }
-
- want := []byte{
- 234, 3, 0, 0, 0, 0, 0, 0, // id
- 100, 0, 0, 0, // total size
- 0, 0, 0, 0, // reserved
- 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
- 60, 0, 0, 0, // size 1
- 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
- 40, 0, 0, 0, // size 2
- }
-
- if !reflect.DeepEqual(want, d) {
- t.Fatalf("Bad posted packet: got %v, want %v", d, want)
- }
-
- rxp.Flush()
-
- // Check that there are no completions yet.
- if _, ok := q.CompletedPacket(); ok {
- t.Fatalf("Packet reported as completed too soon")
- }
-
- // Post a completion.
- d = txp.Push(8)
- if d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
- binary.LittleEndian.PutUint64(d, usedID)
- txp.Flush()
-
- // Check that completion is properly reported.
- id, ok := q.CompletedPacket()
- if !ok {
- t.Fatalf("Completion not reported")
- }
-
- if id != usedID {
- t.Fatalf("Bad completion id: got %v, want %v", id, usedID)
- }
-}
-
-func TestBasicRxQueue(t *testing.T) {
- // Tests that a basic receive on a queue works.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Rx
- q.Init(pb1, pb2, nil)
-
- // Post two buffers.
- b := []RxBuffer{
- {100, 60, 1077, 0},
- {200, 40, 2123, 0},
- }
-
- if !q.PostBuffers(b) {
- t.Fatalf("PostBuffers failed on empty queue")
- }
-
- // Check the contents of the pipe.
- want := [][]byte{
- {
- 100, 0, 0, 0, 0, 0, 0, 0, // Offset1
- 60, 0, 0, 0, // Size1
- 0, 0, 0, 0, // Remaining in group 1
- 0, 0, 0, 0, 0, 0, 0, 0, // User data 1
- 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
- },
- {
- 200, 0, 0, 0, 0, 0, 0, 0, // Offset2
- 40, 0, 0, 0, // Size2
- 0, 0, 0, 0, // Remaining in group 2
- 0, 0, 0, 0, 0, 0, 0, 0, // User data 2
- 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
- },
- }
-
- for i := range b {
- d := rxp.Pull()
- if d == nil {
- t.Fatalf("Tx pipe is empty after PostBuffers")
- }
-
- if !reflect.DeepEqual(want[i], d) {
- t.Fatalf("Bad posted packet: got %v, want %v", d, want[i])
- }
-
- rxp.Flush()
- }
-
- // Check that there are no completions.
- if _, n := q.Dequeue(nil); n != 0 {
- t.Fatalf("Packet reported as received too soon")
- }
-
- // Post a completion.
- d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
- if d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
-
- copy(d, []byte{
- 100, 0, 0, 0, // packet size
- 0, 0, 0, 0, // reserved
-
- 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
- 60, 0, 0, 0, // size 1
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
- 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
-
- 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
- 40, 0, 0, 0, // size 2
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
- 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
- })
-
- txp.Flush()
-
- // Check that completion is properly reported.
- bufs, n := q.Dequeue(nil)
- if n != 100 {
- t.Fatalf("Bad packet size: got %v, want %v", n, 100)
- }
-
- if !reflect.DeepEqual(bufs, b) {
- t.Fatalf("Bad returned buffers: got %v, want %v", bufs, b)
- }
-}
-
-func TestBadTxCompletion(t *testing.T) {
- // Check that tx completions with bad sizes are properly ignored.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Tx
- q.Init(pb1, pb2)
-
- // Post a completion that is too short, and check that it is ignored.
- if d := txp.Push(7); d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
- txp.Flush()
-
- if _, ok := q.CompletedPacket(); ok {
- t.Fatalf("Bad completion not ignored")
- }
-
- // Post a completion that is too long, and check that it is ignored.
- if d := txp.Push(10); d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
- txp.Flush()
-
- if _, ok := q.CompletedPacket(); ok {
- t.Fatalf("Bad completion not ignored")
- }
-}
-
-func TestBadRxCompletion(t *testing.T) {
- // Check that bad rx completions are properly ignored.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Rx
- q.Init(pb1, pb2, nil)
-
- // Post a completion that is too short, and check that it is ignored.
- if d := txp.Push(7); d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
- txp.Flush()
-
- if b, _ := q.Dequeue(nil); b != nil {
- t.Fatalf("Bad completion not ignored")
- }
-
- // Post a completion whose buffer sizes add up to less than the total
- // size.
- d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
- if d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
-
- copy(d, []byte{
- 100, 0, 0, 0, // packet size
- 0, 0, 0, 0, // reserved
-
- 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
- 10, 0, 0, 0, // size 1
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
- 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
-
- 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
- 10, 0, 0, 0, // size 2
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
- 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
- })
-
- txp.Flush()
- if b, _ := q.Dequeue(nil); b != nil {
- t.Fatalf("Bad completion not ignored")
- }
-
- // Post a completion whose buffer sizes will cause a 32-bit overflow,
- // but adds up to the right number.
- d = txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
- if d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
-
- copy(d, []byte{
- 100, 0, 0, 0, // packet size
- 0, 0, 0, 0, // reserved
-
- 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
- 255, 255, 255, 255, // size 1
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
- 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
-
- 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
- 101, 0, 0, 0, // size 2
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
- 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
- })
-
- txp.Flush()
- if b, _ := q.Dequeue(nil); b != nil {
- t.Fatalf("Bad completion not ignored")
- }
-}
-
-func TestFillTxPipe(t *testing.T) {
- // Check that transmitting a new buffer when the buffer pipe is full
- // fails gracefully.
- pb1 := make([]byte, 104)
- pb2 := make([]byte, 104)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Tx
- q.Init(pb1, pb2)
-
- // Transmit twice, which should fill the tx pipe.
- b := []TxBuffer{
- {nil, 100, 60},
- {nil, 200, 40},
- }
-
- b[0].Next = &b[1]
-
- const usedID = 1002
- const usedTotalSize = 100
- for i := uint64(0); i < 2; i++ {
- if !q.Enqueue(usedID+i, usedTotalSize, 2, &b[0]) {
- t.Fatalf("Failed to transmit buffer")
- }
- }
-
- // Transmit another packet now that the tx pipe is full.
- if q.Enqueue(usedID+2, usedTotalSize, 2, &b[0]) {
- t.Fatalf("Enqueue succeeded when tx pipe is full")
- }
-}
-
-func TestFillRxPipe(t *testing.T) {
- // Check that posting a new buffer when the buffer pipe is full fails
- // gracefully.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Rx
- q.Init(pb1, pb2, nil)
-
- // Post a buffer twice, it should fill the tx pipe.
- b := []RxBuffer{
- {100, 60, 1077, 0},
- }
-
- for i := 0; i < 2; i++ {
- if !q.PostBuffers(b) {
- t.Fatalf("PostBuffers failed on non-full queue")
- }
- }
-
- // Post another buffer now that the tx pipe is full.
- if q.PostBuffers(b) {
- t.Fatalf("PostBuffers succeeded on full queue")
- }
-}
-
-func TestLotsOfTransmissions(t *testing.T) {
- // Make sure pipes are being properly flushed when transmitting packets.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Tx
- q.Init(pb1, pb2)
-
- // Prepare packet with two buffers.
- b := []TxBuffer{
- {nil, 100, 60},
- {nil, 200, 40},
- }
-
- b[0].Next = &b[1]
-
- const usedID = 1002
- const usedTotalSize = 100
-
- // Post 100000 packets and completions.
- for i := 100000; i > 0; i-- {
- if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) {
- t.Fatalf("Enqueue failed on non-full queue")
- }
-
- if d := rxp.Pull(); d == nil {
- t.Fatalf("Tx pipe is empty after Enqueue")
- }
- rxp.Flush()
-
- d := txp.Push(8)
- if d == nil {
- t.Fatalf("Unable to write to rx pipe")
- }
- binary.LittleEndian.PutUint64(d, usedID)
- txp.Flush()
- if _, ok := q.CompletedPacket(); !ok {
- t.Fatalf("Completion not returned")
- }
- }
-}
-
-func TestLotsOfReceptions(t *testing.T) {
- // Make sure pipes are being properly flushed when receiving packets.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var rxp pipe.Rx
- rxp.Init(pb1)
-
- var txp pipe.Tx
- txp.Init(pb2)
-
- var q Rx
- q.Init(pb1, pb2, nil)
-
- // Prepare for posting two buffers.
- b := []RxBuffer{
- {100, 60, 1077, 0},
- {200, 40, 2123, 0},
- }
-
- // Post 100000 buffers and completions.
- for i := 100000; i > 0; i-- {
- if !q.PostBuffers(b) {
- t.Fatalf("PostBuffers failed on non-full queue")
- }
-
- if d := rxp.Pull(); d == nil {
- t.Fatalf("Tx pipe is empty after PostBuffers")
- }
- rxp.Flush()
-
- if d := rxp.Pull(); d == nil {
- t.Fatalf("Tx pipe is empty after PostBuffers")
- }
- rxp.Flush()
-
- d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
- if d == nil {
- t.Fatalf("Unable to push to rx pipe")
- }
-
- copy(d, []byte{
- 100, 0, 0, 0, // packet size
- 0, 0, 0, 0, // reserved
-
- 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
- 60, 0, 0, 0, // size 1
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
- 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
-
- 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
- 40, 0, 0, 0, // size 2
- 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
- 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
- })
-
- txp.Flush()
-
- if _, n := q.Dequeue(nil); n == 0 {
- t.Fatalf("Dequeue failed when there is a completion")
- }
- }
-}
-
-func TestRxEnableNotification(t *testing.T) {
- // Check that enabling nofifications results in properly updated state.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var state uint32
- var q Rx
- q.Init(pb1, pb2, &state)
-
- q.EnableNotification()
- if state != eventFDEnabled {
- t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDEnabled)
- }
-}
-
-func TestRxDisableNotification(t *testing.T) {
- // Check that disabling nofifications results in properly updated state.
- pb1 := make([]byte, 100)
- pb2 := make([]byte, 100)
-
- var state uint32
- var q Rx
- q.Init(pb1, pb2, &state)
-
- q.DisableNotification()
- if state != eventFDDisabled {
- t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDDisabled)
- }
-}
diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go
deleted file mode 100644
index 696e6c9e5..000000000
--- a/pkg/tcpip/link/sharedmem/queue/rx.go
+++ /dev/null
@@ -1,221 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package queue provides the implementation of transmit and receive queues
-// based on shared memory ring buffers.
-package queue
-
-import (
- "encoding/binary"
- "sync/atomic"
-
- "gvisor.dev/gvisor/pkg/log"
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
-)
-
-const (
- // Offsets within a posted buffer.
- postedOffset = 0
- postedSize = 8
- postedRemainingInGroup = 12
- postedUserData = 16
- postedID = 24
-
- sizeOfPostedBuffer = 32
-
- // Offsets within a received packet header.
- consumedPacketSize = 0
- consumedPacketReserved = 4
-
- sizeOfConsumedPacketHeader = 8
-
- // Offsets within a consumed buffer.
- consumedOffset = 0
- consumedSize = 8
- consumedUserData = 12
- consumedID = 20
-
- sizeOfConsumedBuffer = 28
-
- // The following are the allowed states of the shared data area.
- eventFDUninitialized = 0
- eventFDDisabled = 1
- eventFDEnabled = 2
-)
-
-// RxBuffer is the descriptor of a receive buffer.
-type RxBuffer struct {
- Offset uint64
- Size uint32
- ID uint64
- UserData uint64
-}
-
-// Rx is a receive queue. It is implemented with one tx and one rx pipe: the tx
-// pipe is used to "post" buffers, while the rx pipe is used to receive packets
-// whose contents have been written to previously posted buffers.
-//
-// This struct is thread-compatible.
-type Rx struct {
- tx pipe.Tx
- rx pipe.Rx
- sharedEventFDState *uint32
-}
-
-// Init initializes the receive queue with the given pipes, and shared state
-// pointer -- the latter is used to enable/disable eventfd notifications.
-func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) {
- r.sharedEventFDState = sharedEventFDState
- r.tx.Init(tx)
- r.rx.Init(rx)
-}
-
-// EnableNotification updates the shared state such that the peer will notify
-// the eventfd when there are packets to be dequeued.
-func (r *Rx) EnableNotification() {
- atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled)
-}
-
-// DisableNotification updates the shared state such that the peer will not
-// notify the eventfd.
-func (r *Rx) DisableNotification() {
- atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled)
-}
-
-// PostedBuffersLimit returns the maximum number of buffers that can be posted
-// before the tx queue fills up.
-func (r *Rx) PostedBuffersLimit() uint64 {
- return r.tx.Capacity(sizeOfPostedBuffer)
-}
-
-// PostBuffers makes the given buffers available for receiving data from the
-// peer. Once they are posted, the peer is free to write to them and will
-// eventually post them back for consumption.
-func (r *Rx) PostBuffers(buffers []RxBuffer) bool {
- for i := range buffers {
- b := r.tx.Push(sizeOfPostedBuffer)
- if b == nil {
- r.tx.Abort()
- return false
- }
-
- pb := &buffers[i]
- binary.LittleEndian.PutUint64(b[postedOffset:], pb.Offset)
- binary.LittleEndian.PutUint32(b[postedSize:], pb.Size)
- binary.LittleEndian.PutUint32(b[postedRemainingInGroup:], 0)
- binary.LittleEndian.PutUint64(b[postedUserData:], pb.UserData)
- binary.LittleEndian.PutUint64(b[postedID:], pb.ID)
- }
-
- r.tx.Flush()
-
- return true
-}
-
-// Dequeue receives buffers that have been previously posted by PostBuffers()
-// and that have been filled by the peer and posted back.
-//
-// This is similar to append() in that new buffers are appended to "bufs", with
-// reallocation only if "bufs" doesn't have enough capacity.
-func (r *Rx) Dequeue(bufs []RxBuffer) ([]RxBuffer, uint32) {
- for {
- outBufs := bufs
-
- // Pull the next descriptor from the rx pipe.
- b := r.rx.Pull()
- if b == nil {
- return bufs, 0
- }
-
- if len(b) < sizeOfConsumedPacketHeader {
- log.Warningf("Ignoring packet header: size (%v) is less than header size (%v)", len(b), sizeOfConsumedPacketHeader)
- r.rx.Flush()
- continue
- }
-
- totalDataSize := binary.LittleEndian.Uint32(b[consumedPacketSize:])
-
- // Calculate the number of buffer descriptors and copy them
- // over to the output.
- count := (len(b) - sizeOfConsumedPacketHeader) / sizeOfConsumedBuffer
- offset := sizeOfConsumedPacketHeader
- buffersSize := uint32(0)
- for i := count; i > 0; i-- {
- s := binary.LittleEndian.Uint32(b[offset+consumedSize:])
- buffersSize += s
- if buffersSize < s {
- // The buffer size overflows an unsigned 32-bit
- // integer, so break out and force it to be
- // ignored.
- totalDataSize = 1
- buffersSize = 0
- break
- }
-
- outBufs = append(outBufs, RxBuffer{
- Offset: binary.LittleEndian.Uint64(b[offset+consumedOffset:]),
- Size: s,
- ID: binary.LittleEndian.Uint64(b[offset+consumedID:]),
- })
-
- offset += sizeOfConsumedBuffer
- }
-
- r.rx.Flush()
-
- if buffersSize < totalDataSize {
- // The descriptor is corrupted, ignore it.
- log.Warningf("Ignoring packet: actual data size (%v) less than expected size (%v)", buffersSize, totalDataSize)
- continue
- }
-
- return outBufs, totalDataSize
- }
-}
-
-// Bytes returns the byte slices on which the queue operates.
-func (r *Rx) Bytes() (tx, rx []byte) {
- return r.tx.Bytes(), r.rx.Bytes()
-}
-
-// DecodeRxBufferHeader decodes the header of a buffer posted on an rx queue.
-func DecodeRxBufferHeader(b []byte) RxBuffer {
- return RxBuffer{
- Offset: binary.LittleEndian.Uint64(b[postedOffset:]),
- Size: binary.LittleEndian.Uint32(b[postedSize:]),
- ID: binary.LittleEndian.Uint64(b[postedID:]),
- UserData: binary.LittleEndian.Uint64(b[postedUserData:]),
- }
-}
-
-// RxCompletionSize returns the number of bytes needed to encode an rx
-// completion containing "count" buffers.
-func RxCompletionSize(count int) uint64 {
- return sizeOfConsumedPacketHeader + uint64(count)*sizeOfConsumedBuffer
-}
-
-// EncodeRxCompletion encodes an rx completion header.
-func EncodeRxCompletion(b []byte, size, reserved uint32) {
- binary.LittleEndian.PutUint32(b[consumedPacketSize:], size)
- binary.LittleEndian.PutUint32(b[consumedPacketReserved:], reserved)
-}
-
-// EncodeRxCompletionBuffer encodes the i-th rx completion buffer header.
-func EncodeRxCompletionBuffer(b []byte, i int, rxb RxBuffer) {
- b = b[RxCompletionSize(i):]
- binary.LittleEndian.PutUint64(b[consumedOffset:], rxb.Offset)
- binary.LittleEndian.PutUint32(b[consumedSize:], rxb.Size)
- binary.LittleEndian.PutUint64(b[consumedUserData:], rxb.UserData)
- binary.LittleEndian.PutUint64(b[consumedID:], rxb.ID)
-}
diff --git a/pkg/tcpip/link/sharedmem/queue/tx.go b/pkg/tcpip/link/sharedmem/queue/tx.go
deleted file mode 100644
index beffe807b..000000000
--- a/pkg/tcpip/link/sharedmem/queue/tx.go
+++ /dev/null
@@ -1,151 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package queue
-
-import (
- "encoding/binary"
-
- "gvisor.dev/gvisor/pkg/log"
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
-)
-
-const (
- // Offsets within a packet header.
- packetID = 0
- packetSize = 8
- packetReserved = 12
-
- sizeOfPacketHeader = 16
-
- // Offsets with a buffer descriptor
- bufferOffset = 0
- bufferSize = 8
-
- sizeOfBufferDescriptor = 12
-)
-
-// TxBuffer is the descriptor of a transmit buffer.
-type TxBuffer struct {
- Next *TxBuffer
- Offset uint64
- Size uint32
-}
-
-// Tx is a transmit queue. It is implemented with one tx and one rx pipe: the
-// tx pipe is used to request the transmission of packets, while the rx pipe
-// is used to receive which transmissions have completed.
-//
-// This struct is thread-compatible.
-type Tx struct {
- tx pipe.Tx
- rx pipe.Rx
-}
-
-// Init initializes the transmit queue with the given pipes.
-func (t *Tx) Init(tx, rx []byte) {
- t.tx.Init(tx)
- t.rx.Init(rx)
-}
-
-// Enqueue queues the given linked list of buffers for transmission as one
-// packet. While it is queued, the caller must not modify them.
-func (t *Tx) Enqueue(id uint64, totalDataLen, bufferCount uint32, buffer *TxBuffer) bool {
- // Reserve room in the tx pipe.
- totalLen := sizeOfPacketHeader + uint64(bufferCount)*sizeOfBufferDescriptor
-
- b := t.tx.Push(totalLen)
- if b == nil {
- return false
- }
-
- // Initialize the packet and buffer descriptors.
- binary.LittleEndian.PutUint64(b[packetID:], id)
- binary.LittleEndian.PutUint32(b[packetSize:], totalDataLen)
- binary.LittleEndian.PutUint32(b[packetReserved:], 0)
-
- offset := sizeOfPacketHeader
- for i := bufferCount; i != 0; i-- {
- binary.LittleEndian.PutUint64(b[offset+bufferOffset:], buffer.Offset)
- binary.LittleEndian.PutUint32(b[offset+bufferSize:], buffer.Size)
- offset += sizeOfBufferDescriptor
- buffer = buffer.Next
- }
-
- t.tx.Flush()
-
- return true
-}
-
-// CompletedPacket returns the id of the last completed transmission. The
-// returned id, if any, refers to a value passed on a previous call to
-// Enqueue().
-func (t *Tx) CompletedPacket() (id uint64, ok bool) {
- for {
- b := t.rx.Pull()
- if b == nil {
- return 0, false
- }
-
- if len(b) != 8 {
- t.rx.Flush()
- log.Warningf("Ignoring completed packet: size (%v) is less than expected (%v)", len(b), 8)
- continue
- }
-
- v := binary.LittleEndian.Uint64(b)
-
- t.rx.Flush()
-
- return v, true
- }
-}
-
-// Bytes returns the byte slices on which the queue operates.
-func (t *Tx) Bytes() (tx, rx []byte) {
- return t.tx.Bytes(), t.rx.Bytes()
-}
-
-// TxPacketInfo holds information about a packet sent on a tx queue.
-type TxPacketInfo struct {
- ID uint64
- Size uint32
- Reserved uint32
- BufferCount int
-}
-
-// DecodeTxPacketHeader decodes the header of a packet sent over a tx queue.
-func DecodeTxPacketHeader(b []byte) TxPacketInfo {
- return TxPacketInfo{
- ID: binary.LittleEndian.Uint64(b[packetID:]),
- Size: binary.LittleEndian.Uint32(b[packetSize:]),
- Reserved: binary.LittleEndian.Uint32(b[packetReserved:]),
- BufferCount: (len(b) - sizeOfPacketHeader) / sizeOfBufferDescriptor,
- }
-}
-
-// DecodeTxBufferHeader decodes the header of the i-th buffer of a packet sent
-// over a tx queue.
-func DecodeTxBufferHeader(b []byte, i int) TxBuffer {
- b = b[sizeOfPacketHeader+i*sizeOfBufferDescriptor:]
- return TxBuffer{
- Offset: binary.LittleEndian.Uint64(b[bufferOffset:]),
- Size: binary.LittleEndian.Uint32(b[bufferSize:]),
- }
-}
-
-// EncodeTxCompletion encodes a tx completion header.
-func EncodeTxCompletion(b []byte, id uint64) {
- binary.LittleEndian.PutUint64(b, id)
-}