path: root/pkg/tcpip/link/sharedmem/queue
diff options
Diffstat (limited to 'pkg/tcpip/link/sharedmem/queue')
4 files changed, 887 insertions, 0 deletions
diff --git a/pkg/tcpip/link/sharedmem/queue/BUILD b/pkg/tcpip/link/sharedmem/queue/BUILD
new file mode 100644
index 000000000..56ea4641d
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/queue/BUILD
@@ -0,0 +1,28 @@
+package(licenses = ["notice"]) # BSD
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+ name = "queue",
+ srcs = [
+ "rx.go",
+ "tx.go",
+ ],
+ importpath = "",
+ visibility = ["//:sandbox"],
+ deps = [
+ "//pkg/log",
+ "//pkg/tcpip/link/sharedmem/pipe",
+ ],
+ name = "queue_test",
+ srcs = [
+ "queue_test.go",
+ ],
+ embed = [":queue"],
+ deps = [
+ "//pkg/tcpip/link/sharedmem/pipe",
+ ],
diff --git a/pkg/tcpip/link/sharedmem/queue/queue_test.go b/pkg/tcpip/link/sharedmem/queue/queue_test.go
new file mode 100644
index 000000000..b022c389c
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/queue/queue_test.go
@@ -0,0 +1,507 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+package queue
+import (
+ "encoding/binary"
+ "reflect"
+ "testing"
+ ""
+func TestBasicTxQueue(t *testing.T) {
+ // Tests that a basic transmit on a queue works, and that completion
+ // gets properly reported as well.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+ var txp pipe.Tx
+ txp.Init(pb2)
+ var q Tx
+ q.Init(pb1, pb2)
+ // Enqueue two buffers.
+ b := []TxBuffer{
+ {nil, 100, 60},
+ {nil, 200, 40},
+ }
+ b[0].Next = &b[1]
+ const usedID = 1002
+ const usedTotalSize = 100
+ if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) {
+ t.Fatalf("Enqueue failed on empty queue")
+ }
+ // Check the contents of the pipe.
+ d := rxp.Pull()
+ if d == nil {
+ t.Fatalf("Tx pipe is empty after Enqueue")
+ }
+ want := []byte{
+ 234, 3, 0, 0, 0, 0, 0, 0, // id
+ 100, 0, 0, 0, // total size
+ 0, 0, 0, 0, // reserved
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 60, 0, 0, 0, // size 1
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 40, 0, 0, 0, // size 2
+ }
+ if !reflect.DeepEqual(want, d) {
+ t.Fatalf("Bad posted packet: got %v, want %v", d, want)
+ }
+ rxp.Flush()
+ // Check that there are no completions yet.
+ if _, ok := q.CompletedPacket(); ok {
+ t.Fatalf("Packet reported as completed too soon")
+ }
+ // Post a completion.
+ d = txp.Push(8)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ binary.LittleEndian.PutUint64(d, usedID)
+ txp.Flush()
+ // Check that completion is properly reported.
+ id, ok := q.CompletedPacket()
+ if !ok {
+ t.Fatalf("Completion not reported")
+ }
+ if id != usedID {
+ t.Fatalf("Bad completion id: got %v, want %v", id, usedID)
+ }
+func TestBasicRxQueue(t *testing.T) {
+ // Tests that a basic receive on a queue works.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+ var txp pipe.Tx
+ txp.Init(pb2)
+ var q Rx
+ q.Init(pb1, pb2, nil)
+ // Post two buffers.
+ b := []RxBuffer{
+ {100, 60, 1077, 0},
+ {200, 40, 2123, 0},
+ }
+ if !q.PostBuffers(b) {
+ t.Fatalf("PostBuffers failed on empty queue")
+ }
+ // Check the contents of the pipe.
+ want := [][]byte{
+ {
+ 100, 0, 0, 0, 0, 0, 0, 0, // Offset1
+ 60, 0, 0, 0, // Size1
+ 0, 0, 0, 0, // Remaining in group 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // User data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+ },
+ {
+ 200, 0, 0, 0, 0, 0, 0, 0, // Offset2
+ 40, 0, 0, 0, // Size2
+ 0, 0, 0, 0, // Remaining in group 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // User data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ },
+ }
+ for i := range b {
+ d := rxp.Pull()
+ if d == nil {
+ t.Fatalf("Tx pipe is empty after PostBuffers")
+ }
+ if !reflect.DeepEqual(want[i], d) {
+ t.Fatalf("Bad posted packet: got %v, want %v", d, want[i])
+ }
+ rxp.Flush()
+ }
+ // Check that there are no completions.
+ if _, n := q.Dequeue(nil); n != 0 {
+ t.Fatalf("Packet reported as received too soon")
+ }
+ // Post a completion.
+ d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ copy(d, []byte{
+ 100, 0, 0, 0, // packet size
+ 0, 0, 0, 0, // reserved
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 60, 0, 0, 0, // size 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 40, 0, 0, 0, // size 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ })
+ txp.Flush()
+ // Check that completion is properly reported.
+ bufs, n := q.Dequeue(nil)
+ if n != 100 {
+ t.Fatalf("Bad packet size: got %v, want %v", n, 100)
+ }
+ if !reflect.DeepEqual(bufs, b) {
+ t.Fatalf("Bad returned buffers: got %v, want %v", bufs, b)
+ }
+func TestBadTxCompletion(t *testing.T) {
+ // Check that tx completions with bad sizes are properly ignored.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+ var txp pipe.Tx
+ txp.Init(pb2)
+ var q Tx
+ q.Init(pb1, pb2)
+ // Post a completion that is too short, and check that it is ignored.
+ if d := txp.Push(7); d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ txp.Flush()
+ if _, ok := q.CompletedPacket(); ok {
+ t.Fatalf("Bad completion not ignored")
+ }
+ // Post a completion that is too long, and check that it is ignored.
+ if d := txp.Push(10); d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ txp.Flush()
+ if _, ok := q.CompletedPacket(); ok {
+ t.Fatalf("Bad completion not ignored")
+ }
+func TestBadRxCompletion(t *testing.T) {
+ // Check that bad rx completions are properly ignored.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+ var txp pipe.Tx
+ txp.Init(pb2)
+ var q Rx
+ q.Init(pb1, pb2, nil)
+ // Post a completion that is too short, and check that it is ignored.
+ if d := txp.Push(7); d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ txp.Flush()
+ if b, _ := q.Dequeue(nil); b != nil {
+ t.Fatalf("Bad completion not ignored")
+ }
+ // Post a completion whose buffer sizes add up to less than the total
+ // size.
+ d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ copy(d, []byte{
+ 100, 0, 0, 0, // packet size
+ 0, 0, 0, 0, // reserved
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 10, 0, 0, 0, // size 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 10, 0, 0, 0, // size 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ })
+ txp.Flush()
+ if b, _ := q.Dequeue(nil); b != nil {
+ t.Fatalf("Bad completion not ignored")
+ }
+ // Post a completion whose buffer sizes will cause a 32-bit overflow,
+ // but adds up to the right number.
+ d = txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ copy(d, []byte{
+ 100, 0, 0, 0, // packet size
+ 0, 0, 0, 0, // reserved
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 255, 255, 255, 255, // size 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 101, 0, 0, 0, // size 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ })
+ txp.Flush()
+ if b, _ := q.Dequeue(nil); b != nil {
+ t.Fatalf("Bad completion not ignored")
+ }
+func TestFillTxPipe(t *testing.T) {
+ // Check that transmitting a new buffer when the buffer pipe is full
+ // fails gracefully.
+ pb1 := make([]byte, 104)
+ pb2 := make([]byte, 104)
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+ var txp pipe.Tx
+ txp.Init(pb2)
+ var q Tx
+ q.Init(pb1, pb2)
+ // Transmit twice, which should fill the tx pipe.
+ b := []TxBuffer{
+ {nil, 100, 60},
+ {nil, 200, 40},
+ }
+ b[0].Next = &b[1]
+ const usedID = 1002
+ const usedTotalSize = 100
+ for i := uint64(0); i < 2; i++ {
+ if !q.Enqueue(usedID+i, usedTotalSize, 2, &b[0]) {
+ t.Fatalf("Failed to transmit buffer")
+ }
+ }
+ // Transmit another packet now that the tx pipe is full.
+ if q.Enqueue(usedID+2, usedTotalSize, 2, &b[0]) {
+ t.Fatalf("Enqueue succeeded when tx pipe is full")
+ }
+func TestFillRxPipe(t *testing.T) {
+ // Check that posting a new buffer when the buffer pipe is full fails
+ // gracefully.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+ var txp pipe.Tx
+ txp.Init(pb2)
+ var q Rx
+ q.Init(pb1, pb2, nil)
+ // Post a buffer twice, it should fill the tx pipe.
+ b := []RxBuffer{
+ {100, 60, 1077, 0},
+ }
+ for i := 0; i < 2; i++ {
+ if !q.PostBuffers(b) {
+ t.Fatalf("PostBuffers failed on non-full queue")
+ }
+ }
+ // Post another buffer now that the tx pipe is full.
+ if q.PostBuffers(b) {
+ t.Fatalf("PostBuffers succeeded on full queue")
+ }
+func TestLotsOfTransmissions(t *testing.T) {
+ // Make sure pipes are being properly flushed when transmitting packets.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+ var txp pipe.Tx
+ txp.Init(pb2)
+ var q Tx
+ q.Init(pb1, pb2)
+ // Prepare packet with two buffers.
+ b := []TxBuffer{
+ {nil, 100, 60},
+ {nil, 200, 40},
+ }
+ b[0].Next = &b[1]
+ const usedID = 1002
+ const usedTotalSize = 100
+ // Post 100000 packets and completions.
+ for i := 100000; i > 0; i-- {
+ if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) {
+ t.Fatalf("Enqueue failed on non-full queue")
+ }
+ if d := rxp.Pull(); d == nil {
+ t.Fatalf("Tx pipe is empty after Enqueue")
+ }
+ rxp.Flush()
+ d := txp.Push(8)
+ if d == nil {
+ t.Fatalf("Unable to write to rx pipe")
+ }
+ binary.LittleEndian.PutUint64(d, usedID)
+ txp.Flush()
+ if _, ok := q.CompletedPacket(); !ok {
+ t.Fatalf("Completion not returned")
+ }
+ }
+func TestLotsOfReceptions(t *testing.T) {
+ // Make sure pipes are being properly flushed when receiving packets.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+ var txp pipe.Tx
+ txp.Init(pb2)
+ var q Rx
+ q.Init(pb1, pb2, nil)
+ // Prepare for posting two buffers.
+ b := []RxBuffer{
+ {100, 60, 1077, 0},
+ {200, 40, 2123, 0},
+ }
+ // Post 100000 buffers and completions.
+ for i := 100000; i > 0; i-- {
+ if !q.PostBuffers(b) {
+ t.Fatalf("PostBuffers failed on non-full queue")
+ }
+ if d := rxp.Pull(); d == nil {
+ t.Fatalf("Tx pipe is empty after PostBuffers")
+ }
+ rxp.Flush()
+ if d := rxp.Pull(); d == nil {
+ t.Fatalf("Tx pipe is empty after PostBuffers")
+ }
+ rxp.Flush()
+ d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ copy(d, []byte{
+ 100, 0, 0, 0, // packet size
+ 0, 0, 0, 0, // reserved
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 60, 0, 0, 0, // size 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 40, 0, 0, 0, // size 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ })
+ txp.Flush()
+ if _, n := q.Dequeue(nil); n == 0 {
+ t.Fatalf("Dequeue failed when there is a completion")
+ }
+ }
+func TestRxEnableNotification(t *testing.T) {
+ // Check that enabling nofifications results in properly updated state.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+ var state uint32
+ var q Rx
+ q.Init(pb1, pb2, &state)
+ q.EnableNotification()
+ if state != eventFDEnabled {
+ t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDEnabled)
+ }
+func TestRxDisableNotification(t *testing.T) {
+ // Check that disabling nofifications results in properly updated state.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+ var state uint32
+ var q Rx
+ q.Init(pb1, pb2, &state)
+ q.DisableNotification()
+ if state != eventFDDisabled {
+ t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDDisabled)
+ }
diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go
new file mode 100644
index 000000000..91bb57190
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/queue/rx.go
@@ -0,0 +1,211 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+// Package queue provides the implementation of transmit and receive queues
+// based on shared memory ring buffers.
+package queue
+import (
+ "encoding/binary"
+ "sync/atomic"
+ ""
+ ""
+const (
+ // Offsets within a posted buffer.
+ postedOffset = 0
+ postedSize = 8
+ postedRemainingInGroup = 12
+ postedUserData = 16
+ postedID = 24
+ sizeOfPostedBuffer = 32
+ // Offsets within a received packet header.
+ consumedPacketSize = 0
+ consumedPacketReserved = 4
+ sizeOfConsumedPacketHeader = 8
+ // Offsets within a consumed buffer.
+ consumedOffset = 0
+ consumedSize = 8
+ consumedUserData = 12
+ consumedID = 20
+ sizeOfConsumedBuffer = 28
+ // The following are the allowed states of the shared data area.
+ eventFDUninitialized = 0
+ eventFDDisabled = 1
+ eventFDEnabled = 2
+// RxBuffer is the descriptor of a receive buffer.
+type RxBuffer struct {
+ Offset uint64
+ Size uint32
+ ID uint64
+ UserData uint64
+// Rx is a receive queue. It is implemented with one tx and one rx pipe: the tx
+// pipe is used to "post" buffers, while the rx pipe is used to receive packets
+// whose contents have been written to previously posted buffers.
+// This struct is thread-compatible.
+type Rx struct {
+ tx pipe.Tx
+ rx pipe.Rx
+ sharedEventFDState *uint32
+// Init initializes the receive queue with the given pipes, and shared state
+// pointer -- the latter is used to enable/disable eventfd notifications.
+func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) {
+ r.sharedEventFDState = sharedEventFDState
+ r.tx.Init(tx)
+ r.rx.Init(rx)
+// EnableNotification updates the shared state such that the peer will notify
+// the eventfd when there are packets to be dequeued.
+func (r *Rx) EnableNotification() {
+ atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled)
+// DisableNotification updates the shared state such that the peer will not
+// notify the eventfd.
+func (r *Rx) DisableNotification() {
+ atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled)
+// PostedBuffersLimit returns the maximum number of buffers that can be posted
+// before the tx queue fills up.
+func (r *Rx) PostedBuffersLimit() uint64 {
+ return r.tx.Capacity(sizeOfPostedBuffer)
+// PostBuffers makes the given buffers available for receiving data from the
+// peer. Once they are posted, the peer is free to write to them and will
+// eventually post them back for consumption.
+func (r *Rx) PostBuffers(buffers []RxBuffer) bool {
+ for i := range buffers {
+ b := r.tx.Push(sizeOfPostedBuffer)
+ if b == nil {
+ r.tx.Abort()
+ return false
+ }
+ pb := &buffers[i]
+ binary.LittleEndian.PutUint64(b[postedOffset:], pb.Offset)
+ binary.LittleEndian.PutUint32(b[postedSize:], pb.Size)
+ binary.LittleEndian.PutUint32(b[postedRemainingInGroup:], 0)
+ binary.LittleEndian.PutUint64(b[postedUserData:], pb.UserData)
+ binary.LittleEndian.PutUint64(b[postedID:], pb.ID)
+ }
+ r.tx.Flush()
+ return true
+// Dequeue receives buffers that have been previously posted by PostBuffers()
+// and that have been filled by the peer and posted back.
+// This is similar to append() in that new buffers are appended to "bufs", with
+// reallocation only if "bufs" doesn't have enough capacity.
+func (r *Rx) Dequeue(bufs []RxBuffer) ([]RxBuffer, uint32) {
+ for {
+ outBufs := bufs
+ // Pull the next descriptor from the rx pipe.
+ b := r.rx.Pull()
+ if b == nil {
+ return bufs, 0
+ }
+ if len(b) < sizeOfConsumedPacketHeader {
+ log.Warningf("Ignoring packet header: size (%v) is less than header size (%v)", len(b), sizeOfConsumedPacketHeader)
+ r.rx.Flush()
+ continue
+ }
+ totalDataSize := binary.LittleEndian.Uint32(b[consumedPacketSize:])
+ // Calculate the number of buffer descriptors and copy them
+ // over to the output.
+ count := (len(b) - sizeOfConsumedPacketHeader) / sizeOfConsumedBuffer
+ offset := sizeOfConsumedPacketHeader
+ buffersSize := uint32(0)
+ for i := count; i > 0; i-- {
+ s := binary.LittleEndian.Uint32(b[offset+consumedSize:])
+ buffersSize += s
+ if buffersSize < s {
+ // The buffer size overflows an unsigned 32-bit
+ // integer, so break out and force it to be
+ // ignored.
+ totalDataSize = 1
+ buffersSize = 0
+ break
+ }
+ outBufs = append(outBufs, RxBuffer{
+ Offset: binary.LittleEndian.Uint64(b[offset+consumedOffset:]),
+ Size: s,
+ ID: binary.LittleEndian.Uint64(b[offset+consumedID:]),
+ })
+ offset += sizeOfConsumedBuffer
+ }
+ r.rx.Flush()
+ if buffersSize < totalDataSize {
+ // The descriptor is corrupted, ignore it.
+ log.Warningf("Ignoring packet: actual data size (%v) less than expected size (%v)", buffersSize, totalDataSize)
+ continue
+ }
+ return outBufs, totalDataSize
+ }
+// Bytes returns the byte slices on which the queue operates.
+func (r *Rx) Bytes() (tx, rx []byte) {
+ return r.tx.Bytes(), r.rx.Bytes()
+// DecodeRxBufferHeader decodes the header of a buffer posted on an rx queue.
+func DecodeRxBufferHeader(b []byte) RxBuffer {
+ return RxBuffer{
+ Offset: binary.LittleEndian.Uint64(b[postedOffset:]),
+ Size: binary.LittleEndian.Uint32(b[postedSize:]),
+ ID: binary.LittleEndian.Uint64(b[postedID:]),
+ UserData: binary.LittleEndian.Uint64(b[postedUserData:]),
+ }
+// RxCompletionSize returns the number of bytes needed to encode an rx
+// completion containing "count" buffers.
+func RxCompletionSize(count int) uint64 {
+ return sizeOfConsumedPacketHeader + uint64(count)*sizeOfConsumedBuffer
+// EncodeRxCompletion encodes an rx completion header.
+func EncodeRxCompletion(b []byte, size, reserved uint32) {
+ binary.LittleEndian.PutUint32(b[consumedPacketSize:], size)
+ binary.LittleEndian.PutUint32(b[consumedPacketReserved:], reserved)
+// EncodeRxCompletionBuffer encodes the i-th rx completion buffer header.
+func EncodeRxCompletionBuffer(b []byte, i int, rxb RxBuffer) {
+ b = b[RxCompletionSize(i):]
+ binary.LittleEndian.PutUint64(b[consumedOffset:], rxb.Offset)
+ binary.LittleEndian.PutUint32(b[consumedSize:], rxb.Size)
+ binary.LittleEndian.PutUint64(b[consumedUserData:], rxb.UserData)
+ binary.LittleEndian.PutUint64(b[consumedID:], rxb.ID)
diff --git a/pkg/tcpip/link/sharedmem/queue/tx.go b/pkg/tcpip/link/sharedmem/queue/tx.go
new file mode 100644
index 000000000..b04fb163b
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/queue/tx.go
@@ -0,0 +1,141 @@
+// Copyright 2016 The Netstack Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+package queue
+import (
+ "encoding/binary"
+ ""
+ ""
+const (
+ // Offsets within a packet header.
+ packetID = 0
+ packetSize = 8
+ packetReserved = 12
+ sizeOfPacketHeader = 16
+ // Offsets with a buffer descriptor
+ bufferOffset = 0
+ bufferSize = 8
+ sizeOfBufferDescriptor = 12
+// TxBuffer is the descriptor of a transmit buffer.
+type TxBuffer struct {
+ Next *TxBuffer
+ Offset uint64
+ Size uint32
+// Tx is a transmit queue. It is implemented with one tx and one rx pipe: the
+// tx pipe is used to request the transmission of packets, while the rx pipe
+// is used to receive which transmissions have completed.
+// This struct is thread-compatible.
+type Tx struct {
+ tx pipe.Tx
+ rx pipe.Rx
+// Init initializes the transmit queue with the given pipes.
+func (t *Tx) Init(tx, rx []byte) {
+ t.tx.Init(tx)
+ t.rx.Init(rx)
+// Enqueue queues the given linked list of buffers for transmission as one
+// packet. While it is queued, the caller must not modify them.
+func (t *Tx) Enqueue(id uint64, totalDataLen, bufferCount uint32, buffer *TxBuffer) bool {
+ // Reserve room in the tx pipe.
+ totalLen := sizeOfPacketHeader + uint64(bufferCount)*sizeOfBufferDescriptor
+ b := t.tx.Push(totalLen)
+ if b == nil {
+ return false
+ }
+ // Initialize the packet and buffer descriptors.
+ binary.LittleEndian.PutUint64(b[packetID:], id)
+ binary.LittleEndian.PutUint32(b[packetSize:], totalDataLen)
+ binary.LittleEndian.PutUint32(b[packetReserved:], 0)
+ offset := sizeOfPacketHeader
+ for i := bufferCount; i != 0; i-- {
+ binary.LittleEndian.PutUint64(b[offset+bufferOffset:], buffer.Offset)
+ binary.LittleEndian.PutUint32(b[offset+bufferSize:], buffer.Size)
+ offset += sizeOfBufferDescriptor
+ buffer = buffer.Next
+ }
+ t.tx.Flush()
+ return true
+// CompletedPacket returns the id of the last completed transmission. The
+// returned id, if any, refers to a value passed on a previous call to
+// Enqueue().
+func (t *Tx) CompletedPacket() (id uint64, ok bool) {
+ for {
+ b := t.rx.Pull()
+ if b == nil {
+ return 0, false
+ }
+ if len(b) != 8 {
+ t.rx.Flush()
+ log.Warningf("Ignoring completed packet: size (%v) is less than expected (%v)", len(b), 8)
+ continue
+ }
+ v := binary.LittleEndian.Uint64(b)
+ t.rx.Flush()
+ return v, true
+ }
+// Bytes returns the byte slices on which the queue operates.
+func (t *Tx) Bytes() (tx, rx []byte) {
+ return t.tx.Bytes(), t.rx.Bytes()
+// TxPacketInfo holds information about a packet sent on a tx queue.
+type TxPacketInfo struct {
+ ID uint64
+ Size uint32
+ Reserved uint32
+ BufferCount int
+// DecodeTxPacketHeader decodes the header of a packet sent over a tx queue.
+func DecodeTxPacketHeader(b []byte) TxPacketInfo {
+ return TxPacketInfo{
+ ID: binary.LittleEndian.Uint64(b[packetID:]),
+ Size: binary.LittleEndian.Uint32(b[packetSize:]),
+ Reserved: binary.LittleEndian.Uint32(b[packetReserved:]),
+ BufferCount: (len(b) - sizeOfPacketHeader) / sizeOfBufferDescriptor,
+ }
+// DecodeTxBufferHeader decodes the header of the i-th buffer of a packet sent
+// over a tx queue.
+func DecodeTxBufferHeader(b []byte, i int) TxBuffer {
+ b = b[sizeOfPacketHeader+i*sizeOfBufferDescriptor:]
+ return TxBuffer{
+ Offset: binary.LittleEndian.Uint64(b[bufferOffset:]),
+ Size: binary.LittleEndian.Uint32(b[bufferSize:]),
+ }
+// EncodeTxCompletion encodes a tx completion header.
+func EncodeTxCompletion(b []byte, id uint64) {
+ binary.LittleEndian.PutUint64(b, id)