diff options
Diffstat (limited to 'pkg/tcpip/link/sharedmem/queue')
-rw-r--r-- | pkg/tcpip/link/sharedmem/queue/BUILD | 29 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/queue/queue_test.go | 517 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/queue/rx.go | 221 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/queue/tx.go | 151 |
4 files changed, 0 insertions, 918 deletions
diff --git a/pkg/tcpip/link/sharedmem/queue/BUILD b/pkg/tcpip/link/sharedmem/queue/BUILD deleted file mode 100644 index de1ce043d..000000000 --- a/pkg/tcpip/link/sharedmem/queue/BUILD +++ /dev/null @@ -1,29 +0,0 @@ -load("//tools/go_stateify:defs.bzl", "go_library") -load("@io_bazel_rules_go//go:def.bzl", "go_test") - -package(licenses = ["notice"]) - -go_library( - name = "queue", - srcs = [ - "rx.go", - "tx.go", - ], - importpath = "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue", - visibility = ["//:sandbox"], - deps = [ - "//pkg/log", - "//pkg/tcpip/link/sharedmem/pipe", - ], -) - -go_test( - name = "queue_test", - srcs = [ - "queue_test.go", - ], - embed = [":queue"], - deps = [ - "//pkg/tcpip/link/sharedmem/pipe", - ], -) diff --git a/pkg/tcpip/link/sharedmem/queue/queue_test.go b/pkg/tcpip/link/sharedmem/queue/queue_test.go deleted file mode 100644 index 9a0aad5d7..000000000 --- a/pkg/tcpip/link/sharedmem/queue/queue_test.go +++ /dev/null @@ -1,517 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package queue - -import ( - "encoding/binary" - "reflect" - "testing" - - "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe" -) - -func TestBasicTxQueue(t *testing.T) { - // Tests that a basic transmit on a queue works, and that completion - // gets properly reported as well. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Tx - q.Init(pb1, pb2) - - // Enqueue two buffers. - b := []TxBuffer{ - {nil, 100, 60}, - {nil, 200, 40}, - } - - b[0].Next = &b[1] - - const usedID = 1002 - const usedTotalSize = 100 - if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) { - t.Fatalf("Enqueue failed on empty queue") - } - - // Check the contents of the pipe. - d := rxp.Pull() - if d == nil { - t.Fatalf("Tx pipe is empty after Enqueue") - } - - want := []byte{ - 234, 3, 0, 0, 0, 0, 0, 0, // id - 100, 0, 0, 0, // total size - 0, 0, 0, 0, // reserved - 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 - 60, 0, 0, 0, // size 1 - 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 - 40, 0, 0, 0, // size 2 - } - - if !reflect.DeepEqual(want, d) { - t.Fatalf("Bad posted packet: got %v, want %v", d, want) - } - - rxp.Flush() - - // Check that there are no completions yet. - if _, ok := q.CompletedPacket(); ok { - t.Fatalf("Packet reported as completed too soon") - } - - // Post a completion. - d = txp.Push(8) - if d == nil { - t.Fatalf("Unable to push to rx pipe") - } - binary.LittleEndian.PutUint64(d, usedID) - txp.Flush() - - // Check that completion is properly reported. - id, ok := q.CompletedPacket() - if !ok { - t.Fatalf("Completion not reported") - } - - if id != usedID { - t.Fatalf("Bad completion id: got %v, want %v", id, usedID) - } -} - -func TestBasicRxQueue(t *testing.T) { - // Tests that a basic receive on a queue works. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Rx - q.Init(pb1, pb2, nil) - - // Post two buffers. - b := []RxBuffer{ - {100, 60, 1077, 0}, - {200, 40, 2123, 0}, - } - - if !q.PostBuffers(b) { - t.Fatalf("PostBuffers failed on empty queue") - } - - // Check the contents of the pipe. - want := [][]byte{ - { - 100, 0, 0, 0, 0, 0, 0, 0, // Offset1 - 60, 0, 0, 0, // Size1 - 0, 0, 0, 0, // Remaining in group 1 - 0, 0, 0, 0, 0, 0, 0, 0, // User data 1 - 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 - }, - { - 200, 0, 0, 0, 0, 0, 0, 0, // Offset2 - 40, 0, 0, 0, // Size2 - 0, 0, 0, 0, // Remaining in group 2 - 0, 0, 0, 0, 0, 0, 0, 0, // User data 2 - 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 - }, - } - - for i := range b { - d := rxp.Pull() - if d == nil { - t.Fatalf("Tx pipe is empty after PostBuffers") - } - - if !reflect.DeepEqual(want[i], d) { - t.Fatalf("Bad posted packet: got %v, want %v", d, want[i]) - } - - rxp.Flush() - } - - // Check that there are no completions. - if _, n := q.Dequeue(nil); n != 0 { - t.Fatalf("Packet reported as received too soon") - } - - // Post a completion. - d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer) - if d == nil { - t.Fatalf("Unable to push to rx pipe") - } - - copy(d, []byte{ - 100, 0, 0, 0, // packet size - 0, 0, 0, 0, // reserved - - 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 - 60, 0, 0, 0, // size 1 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 1 - 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 - - 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 - 40, 0, 0, 0, // size 2 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 2 - 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 - }) - - txp.Flush() - - // Check that completion is properly reported. - bufs, n := q.Dequeue(nil) - if n != 100 { - t.Fatalf("Bad packet size: got %v, want %v", n, 100) - } - - if !reflect.DeepEqual(bufs, b) { - t.Fatalf("Bad returned buffers: got %v, want %v", bufs, b) - } -} - -func TestBadTxCompletion(t *testing.T) { - // Check that tx completions with bad sizes are properly ignored. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Tx - q.Init(pb1, pb2) - - // Post a completion that is too short, and check that it is ignored. - if d := txp.Push(7); d == nil { - t.Fatalf("Unable to push to rx pipe") - } - txp.Flush() - - if _, ok := q.CompletedPacket(); ok { - t.Fatalf("Bad completion not ignored") - } - - // Post a completion that is too long, and check that it is ignored. - if d := txp.Push(10); d == nil { - t.Fatalf("Unable to push to rx pipe") - } - txp.Flush() - - if _, ok := q.CompletedPacket(); ok { - t.Fatalf("Bad completion not ignored") - } -} - -func TestBadRxCompletion(t *testing.T) { - // Check that bad rx completions are properly ignored. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Rx - q.Init(pb1, pb2, nil) - - // Post a completion that is too short, and check that it is ignored. - if d := txp.Push(7); d == nil { - t.Fatalf("Unable to push to rx pipe") - } - txp.Flush() - - if b, _ := q.Dequeue(nil); b != nil { - t.Fatalf("Bad completion not ignored") - } - - // Post a completion whose buffer sizes add up to less than the total - // size. - d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer) - if d == nil { - t.Fatalf("Unable to push to rx pipe") - } - - copy(d, []byte{ - 100, 0, 0, 0, // packet size - 0, 0, 0, 0, // reserved - - 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 - 10, 0, 0, 0, // size 1 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 1 - 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 - - 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 - 10, 0, 0, 0, // size 2 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 2 - 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 - }) - - txp.Flush() - if b, _ := q.Dequeue(nil); b != nil { - t.Fatalf("Bad completion not ignored") - } - - // Post a completion whose buffer sizes will cause a 32-bit overflow, - // but adds up to the right number. - d = txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer) - if d == nil { - t.Fatalf("Unable to push to rx pipe") - } - - copy(d, []byte{ - 100, 0, 0, 0, // packet size - 0, 0, 0, 0, // reserved - - 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 - 255, 255, 255, 255, // size 1 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 1 - 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 - - 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 - 101, 0, 0, 0, // size 2 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 2 - 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 - }) - - txp.Flush() - if b, _ := q.Dequeue(nil); b != nil { - t.Fatalf("Bad completion not ignored") - } -} - -func TestFillTxPipe(t *testing.T) { - // Check that transmitting a new buffer when the buffer pipe is full - // fails gracefully. - pb1 := make([]byte, 104) - pb2 := make([]byte, 104) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Tx - q.Init(pb1, pb2) - - // Transmit twice, which should fill the tx pipe. - b := []TxBuffer{ - {nil, 100, 60}, - {nil, 200, 40}, - } - - b[0].Next = &b[1] - - const usedID = 1002 - const usedTotalSize = 100 - for i := uint64(0); i < 2; i++ { - if !q.Enqueue(usedID+i, usedTotalSize, 2, &b[0]) { - t.Fatalf("Failed to transmit buffer") - } - } - - // Transmit another packet now that the tx pipe is full. - if q.Enqueue(usedID+2, usedTotalSize, 2, &b[0]) { - t.Fatalf("Enqueue succeeded when tx pipe is full") - } -} - -func TestFillRxPipe(t *testing.T) { - // Check that posting a new buffer when the buffer pipe is full fails - // gracefully. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Rx - q.Init(pb1, pb2, nil) - - // Post a buffer twice, it should fill the tx pipe. - b := []RxBuffer{ - {100, 60, 1077, 0}, - } - - for i := 0; i < 2; i++ { - if !q.PostBuffers(b) { - t.Fatalf("PostBuffers failed on non-full queue") - } - } - - // Post another buffer now that the tx pipe is full. - if q.PostBuffers(b) { - t.Fatalf("PostBuffers succeeded on full queue") - } -} - -func TestLotsOfTransmissions(t *testing.T) { - // Make sure pipes are being properly flushed when transmitting packets. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Tx - q.Init(pb1, pb2) - - // Prepare packet with two buffers. - b := []TxBuffer{ - {nil, 100, 60}, - {nil, 200, 40}, - } - - b[0].Next = &b[1] - - const usedID = 1002 - const usedTotalSize = 100 - - // Post 100000 packets and completions. - for i := 100000; i > 0; i-- { - if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) { - t.Fatalf("Enqueue failed on non-full queue") - } - - if d := rxp.Pull(); d == nil { - t.Fatalf("Tx pipe is empty after Enqueue") - } - rxp.Flush() - - d := txp.Push(8) - if d == nil { - t.Fatalf("Unable to write to rx pipe") - } - binary.LittleEndian.PutUint64(d, usedID) - txp.Flush() - if _, ok := q.CompletedPacket(); !ok { - t.Fatalf("Completion not returned") - } - } -} - -func TestLotsOfReceptions(t *testing.T) { - // Make sure pipes are being properly flushed when receiving packets. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Rx - q.Init(pb1, pb2, nil) - - // Prepare for posting two buffers. - b := []RxBuffer{ - {100, 60, 1077, 0}, - {200, 40, 2123, 0}, - } - - // Post 100000 buffers and completions. - for i := 100000; i > 0; i-- { - if !q.PostBuffers(b) { - t.Fatalf("PostBuffers failed on non-full queue") - } - - if d := rxp.Pull(); d == nil { - t.Fatalf("Tx pipe is empty after PostBuffers") - } - rxp.Flush() - - if d := rxp.Pull(); d == nil { - t.Fatalf("Tx pipe is empty after PostBuffers") - } - rxp.Flush() - - d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer) - if d == nil { - t.Fatalf("Unable to push to rx pipe") - } - - copy(d, []byte{ - 100, 0, 0, 0, // packet size - 0, 0, 0, 0, // reserved - - 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 - 60, 0, 0, 0, // size 1 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 1 - 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 - - 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 - 40, 0, 0, 0, // size 2 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 2 - 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 - }) - - txp.Flush() - - if _, n := q.Dequeue(nil); n == 0 { - t.Fatalf("Dequeue failed when there is a completion") - } - } -} - -func TestRxEnableNotification(t *testing.T) { - // Check that enabling nofifications results in properly updated state. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var state uint32 - var q Rx - q.Init(pb1, pb2, &state) - - q.EnableNotification() - if state != eventFDEnabled { - t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDEnabled) - } -} - -func TestRxDisableNotification(t *testing.T) { - // Check that disabling nofifications results in properly updated state. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var state uint32 - var q Rx - q.Init(pb1, pb2, &state) - - q.DisableNotification() - if state != eventFDDisabled { - t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDDisabled) - } -} diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go deleted file mode 100644 index 696e6c9e5..000000000 --- a/pkg/tcpip/link/sharedmem/queue/rx.go +++ /dev/null @@ -1,221 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package queue provides the implementation of transmit and receive queues -// based on shared memory ring buffers. -package queue - -import ( - "encoding/binary" - "sync/atomic" - - "gvisor.dev/gvisor/pkg/log" - "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe" -) - -const ( - // Offsets within a posted buffer. - postedOffset = 0 - postedSize = 8 - postedRemainingInGroup = 12 - postedUserData = 16 - postedID = 24 - - sizeOfPostedBuffer = 32 - - // Offsets within a received packet header. - consumedPacketSize = 0 - consumedPacketReserved = 4 - - sizeOfConsumedPacketHeader = 8 - - // Offsets within a consumed buffer. - consumedOffset = 0 - consumedSize = 8 - consumedUserData = 12 - consumedID = 20 - - sizeOfConsumedBuffer = 28 - - // The following are the allowed states of the shared data area. - eventFDUninitialized = 0 - eventFDDisabled = 1 - eventFDEnabled = 2 -) - -// RxBuffer is the descriptor of a receive buffer. -type RxBuffer struct { - Offset uint64 - Size uint32 - ID uint64 - UserData uint64 -} - -// Rx is a receive queue. It is implemented with one tx and one rx pipe: the tx -// pipe is used to "post" buffers, while the rx pipe is used to receive packets -// whose contents have been written to previously posted buffers. -// -// This struct is thread-compatible. -type Rx struct { - tx pipe.Tx - rx pipe.Rx - sharedEventFDState *uint32 -} - -// Init initializes the receive queue with the given pipes, and shared state -// pointer -- the latter is used to enable/disable eventfd notifications. -func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) { - r.sharedEventFDState = sharedEventFDState - r.tx.Init(tx) - r.rx.Init(rx) -} - -// EnableNotification updates the shared state such that the peer will notify -// the eventfd when there are packets to be dequeued. -func (r *Rx) EnableNotification() { - atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled) -} - -// DisableNotification updates the shared state such that the peer will not -// notify the eventfd. -func (r *Rx) DisableNotification() { - atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled) -} - -// PostedBuffersLimit returns the maximum number of buffers that can be posted -// before the tx queue fills up. -func (r *Rx) PostedBuffersLimit() uint64 { - return r.tx.Capacity(sizeOfPostedBuffer) -} - -// PostBuffers makes the given buffers available for receiving data from the -// peer. Once they are posted, the peer is free to write to them and will -// eventually post them back for consumption. -func (r *Rx) PostBuffers(buffers []RxBuffer) bool { - for i := range buffers { - b := r.tx.Push(sizeOfPostedBuffer) - if b == nil { - r.tx.Abort() - return false - } - - pb := &buffers[i] - binary.LittleEndian.PutUint64(b[postedOffset:], pb.Offset) - binary.LittleEndian.PutUint32(b[postedSize:], pb.Size) - binary.LittleEndian.PutUint32(b[postedRemainingInGroup:], 0) - binary.LittleEndian.PutUint64(b[postedUserData:], pb.UserData) - binary.LittleEndian.PutUint64(b[postedID:], pb.ID) - } - - r.tx.Flush() - - return true -} - -// Dequeue receives buffers that have been previously posted by PostBuffers() -// and that have been filled by the peer and posted back. -// -// This is similar to append() in that new buffers are appended to "bufs", with -// reallocation only if "bufs" doesn't have enough capacity. -func (r *Rx) Dequeue(bufs []RxBuffer) ([]RxBuffer, uint32) { - for { - outBufs := bufs - - // Pull the next descriptor from the rx pipe. - b := r.rx.Pull() - if b == nil { - return bufs, 0 - } - - if len(b) < sizeOfConsumedPacketHeader { - log.Warningf("Ignoring packet header: size (%v) is less than header size (%v)", len(b), sizeOfConsumedPacketHeader) - r.rx.Flush() - continue - } - - totalDataSize := binary.LittleEndian.Uint32(b[consumedPacketSize:]) - - // Calculate the number of buffer descriptors and copy them - // over to the output. - count := (len(b) - sizeOfConsumedPacketHeader) / sizeOfConsumedBuffer - offset := sizeOfConsumedPacketHeader - buffersSize := uint32(0) - for i := count; i > 0; i-- { - s := binary.LittleEndian.Uint32(b[offset+consumedSize:]) - buffersSize += s - if buffersSize < s { - // The buffer size overflows an unsigned 32-bit - // integer, so break out and force it to be - // ignored. - totalDataSize = 1 - buffersSize = 0 - break - } - - outBufs = append(outBufs, RxBuffer{ - Offset: binary.LittleEndian.Uint64(b[offset+consumedOffset:]), - Size: s, - ID: binary.LittleEndian.Uint64(b[offset+consumedID:]), - }) - - offset += sizeOfConsumedBuffer - } - - r.rx.Flush() - - if buffersSize < totalDataSize { - // The descriptor is corrupted, ignore it. - log.Warningf("Ignoring packet: actual data size (%v) less than expected size (%v)", buffersSize, totalDataSize) - continue - } - - return outBufs, totalDataSize - } -} - -// Bytes returns the byte slices on which the queue operates. -func (r *Rx) Bytes() (tx, rx []byte) { - return r.tx.Bytes(), r.rx.Bytes() -} - -// DecodeRxBufferHeader decodes the header of a buffer posted on an rx queue. -func DecodeRxBufferHeader(b []byte) RxBuffer { - return RxBuffer{ - Offset: binary.LittleEndian.Uint64(b[postedOffset:]), - Size: binary.LittleEndian.Uint32(b[postedSize:]), - ID: binary.LittleEndian.Uint64(b[postedID:]), - UserData: binary.LittleEndian.Uint64(b[postedUserData:]), - } -} - -// RxCompletionSize returns the number of bytes needed to encode an rx -// completion containing "count" buffers. -func RxCompletionSize(count int) uint64 { - return sizeOfConsumedPacketHeader + uint64(count)*sizeOfConsumedBuffer -} - -// EncodeRxCompletion encodes an rx completion header. -func EncodeRxCompletion(b []byte, size, reserved uint32) { - binary.LittleEndian.PutUint32(b[consumedPacketSize:], size) - binary.LittleEndian.PutUint32(b[consumedPacketReserved:], reserved) -} - -// EncodeRxCompletionBuffer encodes the i-th rx completion buffer header. -func EncodeRxCompletionBuffer(b []byte, i int, rxb RxBuffer) { - b = b[RxCompletionSize(i):] - binary.LittleEndian.PutUint64(b[consumedOffset:], rxb.Offset) - binary.LittleEndian.PutUint32(b[consumedSize:], rxb.Size) - binary.LittleEndian.PutUint64(b[consumedUserData:], rxb.UserData) - binary.LittleEndian.PutUint64(b[consumedID:], rxb.ID) -} diff --git a/pkg/tcpip/link/sharedmem/queue/tx.go b/pkg/tcpip/link/sharedmem/queue/tx.go deleted file mode 100644 index beffe807b..000000000 --- a/pkg/tcpip/link/sharedmem/queue/tx.go +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package queue - -import ( - "encoding/binary" - - "gvisor.dev/gvisor/pkg/log" - "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe" -) - -const ( - // Offsets within a packet header. - packetID = 0 - packetSize = 8 - packetReserved = 12 - - sizeOfPacketHeader = 16 - - // Offsets with a buffer descriptor - bufferOffset = 0 - bufferSize = 8 - - sizeOfBufferDescriptor = 12 -) - -// TxBuffer is the descriptor of a transmit buffer. -type TxBuffer struct { - Next *TxBuffer - Offset uint64 - Size uint32 -} - -// Tx is a transmit queue. It is implemented with one tx and one rx pipe: the -// tx pipe is used to request the transmission of packets, while the rx pipe -// is used to receive which transmissions have completed. -// -// This struct is thread-compatible. -type Tx struct { - tx pipe.Tx - rx pipe.Rx -} - -// Init initializes the transmit queue with the given pipes. -func (t *Tx) Init(tx, rx []byte) { - t.tx.Init(tx) - t.rx.Init(rx) -} - -// Enqueue queues the given linked list of buffers for transmission as one -// packet. While it is queued, the caller must not modify them. -func (t *Tx) Enqueue(id uint64, totalDataLen, bufferCount uint32, buffer *TxBuffer) bool { - // Reserve room in the tx pipe. - totalLen := sizeOfPacketHeader + uint64(bufferCount)*sizeOfBufferDescriptor - - b := t.tx.Push(totalLen) - if b == nil { - return false - } - - // Initialize the packet and buffer descriptors. - binary.LittleEndian.PutUint64(b[packetID:], id) - binary.LittleEndian.PutUint32(b[packetSize:], totalDataLen) - binary.LittleEndian.PutUint32(b[packetReserved:], 0) - - offset := sizeOfPacketHeader - for i := bufferCount; i != 0; i-- { - binary.LittleEndian.PutUint64(b[offset+bufferOffset:], buffer.Offset) - binary.LittleEndian.PutUint32(b[offset+bufferSize:], buffer.Size) - offset += sizeOfBufferDescriptor - buffer = buffer.Next - } - - t.tx.Flush() - - return true -} - -// CompletedPacket returns the id of the last completed transmission. The -// returned id, if any, refers to a value passed on a previous call to -// Enqueue(). -func (t *Tx) CompletedPacket() (id uint64, ok bool) { - for { - b := t.rx.Pull() - if b == nil { - return 0, false - } - - if len(b) != 8 { - t.rx.Flush() - log.Warningf("Ignoring completed packet: size (%v) is less than expected (%v)", len(b), 8) - continue - } - - v := binary.LittleEndian.Uint64(b) - - t.rx.Flush() - - return v, true - } -} - -// Bytes returns the byte slices on which the queue operates. -func (t *Tx) Bytes() (tx, rx []byte) { - return t.tx.Bytes(), t.rx.Bytes() -} - -// TxPacketInfo holds information about a packet sent on a tx queue. -type TxPacketInfo struct { - ID uint64 - Size uint32 - Reserved uint32 - BufferCount int -} - -// DecodeTxPacketHeader decodes the header of a packet sent over a tx queue. -func DecodeTxPacketHeader(b []byte) TxPacketInfo { - return TxPacketInfo{ - ID: binary.LittleEndian.Uint64(b[packetID:]), - Size: binary.LittleEndian.Uint32(b[packetSize:]), - Reserved: binary.LittleEndian.Uint32(b[packetReserved:]), - BufferCount: (len(b) - sizeOfPacketHeader) / sizeOfBufferDescriptor, - } -} - -// DecodeTxBufferHeader decodes the header of the i-th buffer of a packet sent -// over a tx queue. -func DecodeTxBufferHeader(b []byte, i int) TxBuffer { - b = b[sizeOfPacketHeader+i*sizeOfBufferDescriptor:] - return TxBuffer{ - Offset: binary.LittleEndian.Uint64(b[bufferOffset:]), - Size: binary.LittleEndian.Uint32(b[bufferSize:]), - } -} - -// EncodeTxCompletion encodes a tx completion header. -func EncodeTxCompletion(b []byte, id uint64) { - binary.LittleEndian.PutUint64(b, id) -} |