diff options
author | Googler <noreply@google.com> | 2018-04-27 10:37:02 -0700 |
---|---|---|
committer | Adin Scannell <ascannell@google.com> | 2018-04-28 01:44:26 -0400 |
commit | d02b74a5dcfed4bfc8f2f8e545bca4d2afabb296 (patch) | |
tree | 54f95eef73aee6bacbfc736fffc631be2605ed53 /pkg/tcpip/link/sharedmem/queue | |
parent | f70210e742919f40aa2f0934a22f1c9ba6dada62 (diff) |
Check in gVisor.
PiperOrigin-RevId: 194583126
Change-Id: Ica1d8821a90f74e7e745962d71801c598c652463
Diffstat (limited to 'pkg/tcpip/link/sharedmem/queue')
-rw-r--r-- | pkg/tcpip/link/sharedmem/queue/BUILD | 28 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/queue/queue_test.go | 507 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/queue/rx.go | 211 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/queue/tx.go | 141 |
4 files changed, 887 insertions, 0 deletions
diff --git a/pkg/tcpip/link/sharedmem/queue/BUILD b/pkg/tcpip/link/sharedmem/queue/BUILD new file mode 100644 index 000000000..56ea4641d --- /dev/null +++ b/pkg/tcpip/link/sharedmem/queue/BUILD @@ -0,0 +1,28 @@ +package(licenses = ["notice"]) # BSD + +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "queue", + srcs = [ + "rx.go", + "tx.go", + ], + importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue", + visibility = ["//:sandbox"], + deps = [ + "//pkg/log", + "//pkg/tcpip/link/sharedmem/pipe", + ], +) + +go_test( + name = "queue_test", + srcs = [ + "queue_test.go", + ], + embed = [":queue"], + deps = [ + "//pkg/tcpip/link/sharedmem/pipe", + ], +) diff --git a/pkg/tcpip/link/sharedmem/queue/queue_test.go b/pkg/tcpip/link/sharedmem/queue/queue_test.go new file mode 100644 index 000000000..b022c389c --- /dev/null +++ b/pkg/tcpip/link/sharedmem/queue/queue_test.go @@ -0,0 +1,507 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "encoding/binary" + "reflect" + "testing" + + "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe" +) + +func TestBasicTxQueue(t *testing.T) { + // Tests that a basic transmit on a queue works, and that completion + // gets properly reported as well. + pb1 := make([]byte, 100) + pb2 := make([]byte, 100) + + var rxp pipe.Rx + rxp.Init(pb1) + + var txp pipe.Tx + txp.Init(pb2) + + var q Tx + q.Init(pb1, pb2) + + // Enqueue two buffers. + b := []TxBuffer{ + {nil, 100, 60}, + {nil, 200, 40}, + } + + b[0].Next = &b[1] + + const usedID = 1002 + const usedTotalSize = 100 + if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) { + t.Fatalf("Enqueue failed on empty queue") + } + + // Check the contents of the pipe. + d := rxp.Pull() + if d == nil { + t.Fatalf("Tx pipe is empty after Enqueue") + } + + want := []byte{ + 234, 3, 0, 0, 0, 0, 0, 0, // id + 100, 0, 0, 0, // total size + 0, 0, 0, 0, // reserved + 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 + 60, 0, 0, 0, // size 1 + 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 + 40, 0, 0, 0, // size 2 + } + + if !reflect.DeepEqual(want, d) { + t.Fatalf("Bad posted packet: got %v, want %v", d, want) + } + + rxp.Flush() + + // Check that there are no completions yet. + if _, ok := q.CompletedPacket(); ok { + t.Fatalf("Packet reported as completed too soon") + } + + // Post a completion. + d = txp.Push(8) + if d == nil { + t.Fatalf("Unable to push to rx pipe") + } + binary.LittleEndian.PutUint64(d, usedID) + txp.Flush() + + // Check that completion is properly reported. + id, ok := q.CompletedPacket() + if !ok { + t.Fatalf("Completion not reported") + } + + if id != usedID { + t.Fatalf("Bad completion id: got %v, want %v", id, usedID) + } +} + +func TestBasicRxQueue(t *testing.T) { + // Tests that a basic receive on a queue works. + pb1 := make([]byte, 100) + pb2 := make([]byte, 100) + + var rxp pipe.Rx + rxp.Init(pb1) + + var txp pipe.Tx + txp.Init(pb2) + + var q Rx + q.Init(pb1, pb2, nil) + + // Post two buffers. + b := []RxBuffer{ + {100, 60, 1077, 0}, + {200, 40, 2123, 0}, + } + + if !q.PostBuffers(b) { + t.Fatalf("PostBuffers failed on empty queue") + } + + // Check the contents of the pipe. + want := [][]byte{ + { + 100, 0, 0, 0, 0, 0, 0, 0, // Offset1 + 60, 0, 0, 0, // Size1 + 0, 0, 0, 0, // Remaining in group 1 + 0, 0, 0, 0, 0, 0, 0, 0, // User data 1 + 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 + }, + { + 200, 0, 0, 0, 0, 0, 0, 0, // Offset2 + 40, 0, 0, 0, // Size2 + 0, 0, 0, 0, // Remaining in group 2 + 0, 0, 0, 0, 0, 0, 0, 0, // User data 2 + 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 + }, + } + + for i := range b { + d := rxp.Pull() + if d == nil { + t.Fatalf("Tx pipe is empty after PostBuffers") + } + + if !reflect.DeepEqual(want[i], d) { + t.Fatalf("Bad posted packet: got %v, want %v", d, want[i]) + } + + rxp.Flush() + } + + // Check that there are no completions. + if _, n := q.Dequeue(nil); n != 0 { + t.Fatalf("Packet reported as received too soon") + } + + // Post a completion. + d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer) + if d == nil { + t.Fatalf("Unable to push to rx pipe") + } + + copy(d, []byte{ + 100, 0, 0, 0, // packet size + 0, 0, 0, 0, // reserved + + 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 + 60, 0, 0, 0, // size 1 + 0, 0, 0, 0, 0, 0, 0, 0, // user data 1 + 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 + + 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 + 40, 0, 0, 0, // size 2 + 0, 0, 0, 0, 0, 0, 0, 0, // user data 2 + 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 + }) + + txp.Flush() + + // Check that completion is properly reported. + bufs, n := q.Dequeue(nil) + if n != 100 { + t.Fatalf("Bad packet size: got %v, want %v", n, 100) + } + + if !reflect.DeepEqual(bufs, b) { + t.Fatalf("Bad returned buffers: got %v, want %v", bufs, b) + } +} + +func TestBadTxCompletion(t *testing.T) { + // Check that tx completions with bad sizes are properly ignored. + pb1 := make([]byte, 100) + pb2 := make([]byte, 100) + + var rxp pipe.Rx + rxp.Init(pb1) + + var txp pipe.Tx + txp.Init(pb2) + + var q Tx + q.Init(pb1, pb2) + + // Post a completion that is too short, and check that it is ignored. + if d := txp.Push(7); d == nil { + t.Fatalf("Unable to push to rx pipe") + } + txp.Flush() + + if _, ok := q.CompletedPacket(); ok { + t.Fatalf("Bad completion not ignored") + } + + // Post a completion that is too long, and check that it is ignored. + if d := txp.Push(10); d == nil { + t.Fatalf("Unable to push to rx pipe") + } + txp.Flush() + + if _, ok := q.CompletedPacket(); ok { + t.Fatalf("Bad completion not ignored") + } +} + +func TestBadRxCompletion(t *testing.T) { + // Check that bad rx completions are properly ignored. + pb1 := make([]byte, 100) + pb2 := make([]byte, 100) + + var rxp pipe.Rx + rxp.Init(pb1) + + var txp pipe.Tx + txp.Init(pb2) + + var q Rx + q.Init(pb1, pb2, nil) + + // Post a completion that is too short, and check that it is ignored. + if d := txp.Push(7); d == nil { + t.Fatalf("Unable to push to rx pipe") + } + txp.Flush() + + if b, _ := q.Dequeue(nil); b != nil { + t.Fatalf("Bad completion not ignored") + } + + // Post a completion whose buffer sizes add up to less than the total + // size. + d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer) + if d == nil { + t.Fatalf("Unable to push to rx pipe") + } + + copy(d, []byte{ + 100, 0, 0, 0, // packet size + 0, 0, 0, 0, // reserved + + 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 + 10, 0, 0, 0, // size 1 + 0, 0, 0, 0, 0, 0, 0, 0, // user data 1 + 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 + + 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 + 10, 0, 0, 0, // size 2 + 0, 0, 0, 0, 0, 0, 0, 0, // user data 2 + 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 + }) + + txp.Flush() + if b, _ := q.Dequeue(nil); b != nil { + t.Fatalf("Bad completion not ignored") + } + + // Post a completion whose buffer sizes will cause a 32-bit overflow, + // but adds up to the right number. + d = txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer) + if d == nil { + t.Fatalf("Unable to push to rx pipe") + } + + copy(d, []byte{ + 100, 0, 0, 0, // packet size + 0, 0, 0, 0, // reserved + + 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 + 255, 255, 255, 255, // size 1 + 0, 0, 0, 0, 0, 0, 0, 0, // user data 1 + 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 + + 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 + 101, 0, 0, 0, // size 2 + 0, 0, 0, 0, 0, 0, 0, 0, // user data 2 + 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 + }) + + txp.Flush() + if b, _ := q.Dequeue(nil); b != nil { + t.Fatalf("Bad completion not ignored") + } +} + +func TestFillTxPipe(t *testing.T) { + // Check that transmitting a new buffer when the buffer pipe is full + // fails gracefully. + pb1 := make([]byte, 104) + pb2 := make([]byte, 104) + + var rxp pipe.Rx + rxp.Init(pb1) + + var txp pipe.Tx + txp.Init(pb2) + + var q Tx + q.Init(pb1, pb2) + + // Transmit twice, which should fill the tx pipe. + b := []TxBuffer{ + {nil, 100, 60}, + {nil, 200, 40}, + } + + b[0].Next = &b[1] + + const usedID = 1002 + const usedTotalSize = 100 + for i := uint64(0); i < 2; i++ { + if !q.Enqueue(usedID+i, usedTotalSize, 2, &b[0]) { + t.Fatalf("Failed to transmit buffer") + } + } + + // Transmit another packet now that the tx pipe is full. + if q.Enqueue(usedID+2, usedTotalSize, 2, &b[0]) { + t.Fatalf("Enqueue succeeded when tx pipe is full") + } +} + +func TestFillRxPipe(t *testing.T) { + // Check that posting a new buffer when the buffer pipe is full fails + // gracefully. + pb1 := make([]byte, 100) + pb2 := make([]byte, 100) + + var rxp pipe.Rx + rxp.Init(pb1) + + var txp pipe.Tx + txp.Init(pb2) + + var q Rx + q.Init(pb1, pb2, nil) + + // Post a buffer twice, it should fill the tx pipe. + b := []RxBuffer{ + {100, 60, 1077, 0}, + } + + for i := 0; i < 2; i++ { + if !q.PostBuffers(b) { + t.Fatalf("PostBuffers failed on non-full queue") + } + } + + // Post another buffer now that the tx pipe is full. + if q.PostBuffers(b) { + t.Fatalf("PostBuffers succeeded on full queue") + } +} + +func TestLotsOfTransmissions(t *testing.T) { + // Make sure pipes are being properly flushed when transmitting packets. + pb1 := make([]byte, 100) + pb2 := make([]byte, 100) + + var rxp pipe.Rx + rxp.Init(pb1) + + var txp pipe.Tx + txp.Init(pb2) + + var q Tx + q.Init(pb1, pb2) + + // Prepare packet with two buffers. + b := []TxBuffer{ + {nil, 100, 60}, + {nil, 200, 40}, + } + + b[0].Next = &b[1] + + const usedID = 1002 + const usedTotalSize = 100 + + // Post 100000 packets and completions. + for i := 100000; i > 0; i-- { + if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) { + t.Fatalf("Enqueue failed on non-full queue") + } + + if d := rxp.Pull(); d == nil { + t.Fatalf("Tx pipe is empty after Enqueue") + } + rxp.Flush() + + d := txp.Push(8) + if d == nil { + t.Fatalf("Unable to write to rx pipe") + } + binary.LittleEndian.PutUint64(d, usedID) + txp.Flush() + if _, ok := q.CompletedPacket(); !ok { + t.Fatalf("Completion not returned") + } + } +} + +func TestLotsOfReceptions(t *testing.T) { + // Make sure pipes are being properly flushed when receiving packets. + pb1 := make([]byte, 100) + pb2 := make([]byte, 100) + + var rxp pipe.Rx + rxp.Init(pb1) + + var txp pipe.Tx + txp.Init(pb2) + + var q Rx + q.Init(pb1, pb2, nil) + + // Prepare for posting two buffers. + b := []RxBuffer{ + {100, 60, 1077, 0}, + {200, 40, 2123, 0}, + } + + // Post 100000 buffers and completions. + for i := 100000; i > 0; i-- { + if !q.PostBuffers(b) { + t.Fatalf("PostBuffers failed on non-full queue") + } + + if d := rxp.Pull(); d == nil { + t.Fatalf("Tx pipe is empty after PostBuffers") + } + rxp.Flush() + + if d := rxp.Pull(); d == nil { + t.Fatalf("Tx pipe is empty after PostBuffers") + } + rxp.Flush() + + d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer) + if d == nil { + t.Fatalf("Unable to push to rx pipe") + } + + copy(d, []byte{ + 100, 0, 0, 0, // packet size + 0, 0, 0, 0, // reserved + + 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 + 60, 0, 0, 0, // size 1 + 0, 0, 0, 0, 0, 0, 0, 0, // user data 1 + 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 + + 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 + 40, 0, 0, 0, // size 2 + 0, 0, 0, 0, 0, 0, 0, 0, // user data 2 + 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 + }) + + txp.Flush() + + if _, n := q.Dequeue(nil); n == 0 { + t.Fatalf("Dequeue failed when there is a completion") + } + } +} + +func TestRxEnableNotification(t *testing.T) { + // Check that enabling nofifications results in properly updated state. + pb1 := make([]byte, 100) + pb2 := make([]byte, 100) + + var state uint32 + var q Rx + q.Init(pb1, pb2, &state) + + q.EnableNotification() + if state != eventFDEnabled { + t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDEnabled) + } +} + +func TestRxDisableNotification(t *testing.T) { + // Check that disabling nofifications results in properly updated state. + pb1 := make([]byte, 100) + pb2 := make([]byte, 100) + + var state uint32 + var q Rx + q.Init(pb1, pb2, &state) + + q.DisableNotification() + if state != eventFDDisabled { + t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDDisabled) + } +} diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go new file mode 100644 index 000000000..91bb57190 --- /dev/null +++ b/pkg/tcpip/link/sharedmem/queue/rx.go @@ -0,0 +1,211 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package queue provides the implementation of transmit and receive queues +// based on shared memory ring buffers. +package queue + +import ( + "encoding/binary" + "sync/atomic" + + "gvisor.googlesource.com/gvisor/pkg/log" + "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe" +) + +const ( + // Offsets within a posted buffer. + postedOffset = 0 + postedSize = 8 + postedRemainingInGroup = 12 + postedUserData = 16 + postedID = 24 + + sizeOfPostedBuffer = 32 + + // Offsets within a received packet header. + consumedPacketSize = 0 + consumedPacketReserved = 4 + + sizeOfConsumedPacketHeader = 8 + + // Offsets within a consumed buffer. + consumedOffset = 0 + consumedSize = 8 + consumedUserData = 12 + consumedID = 20 + + sizeOfConsumedBuffer = 28 + + // The following are the allowed states of the shared data area. + eventFDUninitialized = 0 + eventFDDisabled = 1 + eventFDEnabled = 2 +) + +// RxBuffer is the descriptor of a receive buffer. +type RxBuffer struct { + Offset uint64 + Size uint32 + ID uint64 + UserData uint64 +} + +// Rx is a receive queue. It is implemented with one tx and one rx pipe: the tx +// pipe is used to "post" buffers, while the rx pipe is used to receive packets +// whose contents have been written to previously posted buffers. +// +// This struct is thread-compatible. +type Rx struct { + tx pipe.Tx + rx pipe.Rx + sharedEventFDState *uint32 +} + +// Init initializes the receive queue with the given pipes, and shared state +// pointer -- the latter is used to enable/disable eventfd notifications. +func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) { + r.sharedEventFDState = sharedEventFDState + r.tx.Init(tx) + r.rx.Init(rx) +} + +// EnableNotification updates the shared state such that the peer will notify +// the eventfd when there are packets to be dequeued. +func (r *Rx) EnableNotification() { + atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled) +} + +// DisableNotification updates the shared state such that the peer will not +// notify the eventfd. +func (r *Rx) DisableNotification() { + atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled) +} + +// PostedBuffersLimit returns the maximum number of buffers that can be posted +// before the tx queue fills up. +func (r *Rx) PostedBuffersLimit() uint64 { + return r.tx.Capacity(sizeOfPostedBuffer) +} + +// PostBuffers makes the given buffers available for receiving data from the +// peer. Once they are posted, the peer is free to write to them and will +// eventually post them back for consumption. +func (r *Rx) PostBuffers(buffers []RxBuffer) bool { + for i := range buffers { + b := r.tx.Push(sizeOfPostedBuffer) + if b == nil { + r.tx.Abort() + return false + } + + pb := &buffers[i] + binary.LittleEndian.PutUint64(b[postedOffset:], pb.Offset) + binary.LittleEndian.PutUint32(b[postedSize:], pb.Size) + binary.LittleEndian.PutUint32(b[postedRemainingInGroup:], 0) + binary.LittleEndian.PutUint64(b[postedUserData:], pb.UserData) + binary.LittleEndian.PutUint64(b[postedID:], pb.ID) + } + + r.tx.Flush() + + return true +} + +// Dequeue receives buffers that have been previously posted by PostBuffers() +// and that have been filled by the peer and posted back. +// +// This is similar to append() in that new buffers are appended to "bufs", with +// reallocation only if "bufs" doesn't have enough capacity. +func (r *Rx) Dequeue(bufs []RxBuffer) ([]RxBuffer, uint32) { + for { + outBufs := bufs + + // Pull the next descriptor from the rx pipe. + b := r.rx.Pull() + if b == nil { + return bufs, 0 + } + + if len(b) < sizeOfConsumedPacketHeader { + log.Warningf("Ignoring packet header: size (%v) is less than header size (%v)", len(b), sizeOfConsumedPacketHeader) + r.rx.Flush() + continue + } + + totalDataSize := binary.LittleEndian.Uint32(b[consumedPacketSize:]) + + // Calculate the number of buffer descriptors and copy them + // over to the output. + count := (len(b) - sizeOfConsumedPacketHeader) / sizeOfConsumedBuffer + offset := sizeOfConsumedPacketHeader + buffersSize := uint32(0) + for i := count; i > 0; i-- { + s := binary.LittleEndian.Uint32(b[offset+consumedSize:]) + buffersSize += s + if buffersSize < s { + // The buffer size overflows an unsigned 32-bit + // integer, so break out and force it to be + // ignored. + totalDataSize = 1 + buffersSize = 0 + break + } + + outBufs = append(outBufs, RxBuffer{ + Offset: binary.LittleEndian.Uint64(b[offset+consumedOffset:]), + Size: s, + ID: binary.LittleEndian.Uint64(b[offset+consumedID:]), + }) + + offset += sizeOfConsumedBuffer + } + + r.rx.Flush() + + if buffersSize < totalDataSize { + // The descriptor is corrupted, ignore it. + log.Warningf("Ignoring packet: actual data size (%v) less than expected size (%v)", buffersSize, totalDataSize) + continue + } + + return outBufs, totalDataSize + } +} + +// Bytes returns the byte slices on which the queue operates. +func (r *Rx) Bytes() (tx, rx []byte) { + return r.tx.Bytes(), r.rx.Bytes() +} + +// DecodeRxBufferHeader decodes the header of a buffer posted on an rx queue. +func DecodeRxBufferHeader(b []byte) RxBuffer { + return RxBuffer{ + Offset: binary.LittleEndian.Uint64(b[postedOffset:]), + Size: binary.LittleEndian.Uint32(b[postedSize:]), + ID: binary.LittleEndian.Uint64(b[postedID:]), + UserData: binary.LittleEndian.Uint64(b[postedUserData:]), + } +} + +// RxCompletionSize returns the number of bytes needed to encode an rx +// completion containing "count" buffers. +func RxCompletionSize(count int) uint64 { + return sizeOfConsumedPacketHeader + uint64(count)*sizeOfConsumedBuffer +} + +// EncodeRxCompletion encodes an rx completion header. +func EncodeRxCompletion(b []byte, size, reserved uint32) { + binary.LittleEndian.PutUint32(b[consumedPacketSize:], size) + binary.LittleEndian.PutUint32(b[consumedPacketReserved:], reserved) +} + +// EncodeRxCompletionBuffer encodes the i-th rx completion buffer header. +func EncodeRxCompletionBuffer(b []byte, i int, rxb RxBuffer) { + b = b[RxCompletionSize(i):] + binary.LittleEndian.PutUint64(b[consumedOffset:], rxb.Offset) + binary.LittleEndian.PutUint32(b[consumedSize:], rxb.Size) + binary.LittleEndian.PutUint64(b[consumedUserData:], rxb.UserData) + binary.LittleEndian.PutUint64(b[consumedID:], rxb.ID) +} diff --git a/pkg/tcpip/link/sharedmem/queue/tx.go b/pkg/tcpip/link/sharedmem/queue/tx.go new file mode 100644 index 000000000..b04fb163b --- /dev/null +++ b/pkg/tcpip/link/sharedmem/queue/tx.go @@ -0,0 +1,141 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "encoding/binary" + + "gvisor.googlesource.com/gvisor/pkg/log" + "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe" +) + +const ( + // Offsets within a packet header. + packetID = 0 + packetSize = 8 + packetReserved = 12 + + sizeOfPacketHeader = 16 + + // Offsets with a buffer descriptor + bufferOffset = 0 + bufferSize = 8 + + sizeOfBufferDescriptor = 12 +) + +// TxBuffer is the descriptor of a transmit buffer. +type TxBuffer struct { + Next *TxBuffer + Offset uint64 + Size uint32 +} + +// Tx is a transmit queue. It is implemented with one tx and one rx pipe: the +// tx pipe is used to request the transmission of packets, while the rx pipe +// is used to receive which transmissions have completed. +// +// This struct is thread-compatible. +type Tx struct { + tx pipe.Tx + rx pipe.Rx +} + +// Init initializes the transmit queue with the given pipes. +func (t *Tx) Init(tx, rx []byte) { + t.tx.Init(tx) + t.rx.Init(rx) +} + +// Enqueue queues the given linked list of buffers for transmission as one +// packet. While it is queued, the caller must not modify them. +func (t *Tx) Enqueue(id uint64, totalDataLen, bufferCount uint32, buffer *TxBuffer) bool { + // Reserve room in the tx pipe. + totalLen := sizeOfPacketHeader + uint64(bufferCount)*sizeOfBufferDescriptor + + b := t.tx.Push(totalLen) + if b == nil { + return false + } + + // Initialize the packet and buffer descriptors. + binary.LittleEndian.PutUint64(b[packetID:], id) + binary.LittleEndian.PutUint32(b[packetSize:], totalDataLen) + binary.LittleEndian.PutUint32(b[packetReserved:], 0) + + offset := sizeOfPacketHeader + for i := bufferCount; i != 0; i-- { + binary.LittleEndian.PutUint64(b[offset+bufferOffset:], buffer.Offset) + binary.LittleEndian.PutUint32(b[offset+bufferSize:], buffer.Size) + offset += sizeOfBufferDescriptor + buffer = buffer.Next + } + + t.tx.Flush() + + return true +} + +// CompletedPacket returns the id of the last completed transmission. The +// returned id, if any, refers to a value passed on a previous call to +// Enqueue(). +func (t *Tx) CompletedPacket() (id uint64, ok bool) { + for { + b := t.rx.Pull() + if b == nil { + return 0, false + } + + if len(b) != 8 { + t.rx.Flush() + log.Warningf("Ignoring completed packet: size (%v) is less than expected (%v)", len(b), 8) + continue + } + + v := binary.LittleEndian.Uint64(b) + + t.rx.Flush() + + return v, true + } +} + +// Bytes returns the byte slices on which the queue operates. +func (t *Tx) Bytes() (tx, rx []byte) { + return t.tx.Bytes(), t.rx.Bytes() +} + +// TxPacketInfo holds information about a packet sent on a tx queue. +type TxPacketInfo struct { + ID uint64 + Size uint32 + Reserved uint32 + BufferCount int +} + +// DecodeTxPacketHeader decodes the header of a packet sent over a tx queue. +func DecodeTxPacketHeader(b []byte) TxPacketInfo { + return TxPacketInfo{ + ID: binary.LittleEndian.Uint64(b[packetID:]), + Size: binary.LittleEndian.Uint32(b[packetSize:]), + Reserved: binary.LittleEndian.Uint32(b[packetReserved:]), + BufferCount: (len(b) - sizeOfPacketHeader) / sizeOfBufferDescriptor, + } +} + +// DecodeTxBufferHeader decodes the header of the i-th buffer of a packet sent +// over a tx queue. +func DecodeTxBufferHeader(b []byte, i int) TxBuffer { + b = b[sizeOfPacketHeader+i*sizeOfBufferDescriptor:] + return TxBuffer{ + Offset: binary.LittleEndian.Uint64(b[bufferOffset:]), + Size: binary.LittleEndian.Uint32(b[bufferSize:]), + } +} + +// EncodeTxCompletion encodes a tx completion header. +func EncodeTxCompletion(b []byte, id uint64) { + binary.LittleEndian.PutUint64(b, id) +} |