diff options
author | gVisor bot <gvisor-bot@google.com> | 2019-12-11 22:25:10 +0000 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2019-12-11 22:25:10 +0000 |
commit | 97aa9227305245fb9908e51e8b29cc677dafebd6 (patch) | |
tree | 6066e900938958fe049cc7157c99041c4e12ef50 /pkg/tcpip/link/sharedmem/queue | |
parent | c216ff474c8b6ba6dfd5da23ed6b30bb80ea7068 (diff) | |
parent | 0d027262e09184f61ea0707935534fc2fc4af7e7 (diff) |
Merge release-20191129.0-48-g0d02726 (automated)
Diffstat (limited to 'pkg/tcpip/link/sharedmem/queue')
-rwxr-xr-x | pkg/tcpip/link/sharedmem/queue/queue_state_autogen.go | 4 | ||||
-rwxr-xr-x | pkg/tcpip/link/sharedmem/queue/rx.go | 221 | ||||
-rwxr-xr-x | pkg/tcpip/link/sharedmem/queue/tx.go | 151 |
3 files changed, 376 insertions, 0 deletions
diff --git a/pkg/tcpip/link/sharedmem/queue/queue_state_autogen.go b/pkg/tcpip/link/sharedmem/queue/queue_state_autogen.go new file mode 100755 index 000000000..eec17d734 --- /dev/null +++ b/pkg/tcpip/link/sharedmem/queue/queue_state_autogen.go @@ -0,0 +1,4 @@ +// automatically generated by stateify. + +package queue + diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go new file mode 100755 index 000000000..696e6c9e5 --- /dev/null +++ b/pkg/tcpip/link/sharedmem/queue/rx.go @@ -0,0 +1,221 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package queue provides the implementation of transmit and receive queues +// based on shared memory ring buffers. +package queue + +import ( + "encoding/binary" + "sync/atomic" + + "gvisor.dev/gvisor/pkg/log" + "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe" +) + +const ( + // Offsets within a posted buffer. + postedOffset = 0 + postedSize = 8 + postedRemainingInGroup = 12 + postedUserData = 16 + postedID = 24 + + sizeOfPostedBuffer = 32 + + // Offsets within a received packet header. + consumedPacketSize = 0 + consumedPacketReserved = 4 + + sizeOfConsumedPacketHeader = 8 + + // Offsets within a consumed buffer. + consumedOffset = 0 + consumedSize = 8 + consumedUserData = 12 + consumedID = 20 + + sizeOfConsumedBuffer = 28 + + // The following are the allowed states of the shared data area. + eventFDUninitialized = 0 + eventFDDisabled = 1 + eventFDEnabled = 2 +) + +// RxBuffer is the descriptor of a receive buffer. +type RxBuffer struct { + Offset uint64 + Size uint32 + ID uint64 + UserData uint64 +} + +// Rx is a receive queue. It is implemented with one tx and one rx pipe: the tx +// pipe is used to "post" buffers, while the rx pipe is used to receive packets +// whose contents have been written to previously posted buffers. +// +// This struct is thread-compatible. +type Rx struct { + tx pipe.Tx + rx pipe.Rx + sharedEventFDState *uint32 +} + +// Init initializes the receive queue with the given pipes, and shared state +// pointer -- the latter is used to enable/disable eventfd notifications. +func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) { + r.sharedEventFDState = sharedEventFDState + r.tx.Init(tx) + r.rx.Init(rx) +} + +// EnableNotification updates the shared state such that the peer will notify +// the eventfd when there are packets to be dequeued. +func (r *Rx) EnableNotification() { + atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled) +} + +// DisableNotification updates the shared state such that the peer will not +// notify the eventfd. +func (r *Rx) DisableNotification() { + atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled) +} + +// PostedBuffersLimit returns the maximum number of buffers that can be posted +// before the tx queue fills up. +func (r *Rx) PostedBuffersLimit() uint64 { + return r.tx.Capacity(sizeOfPostedBuffer) +} + +// PostBuffers makes the given buffers available for receiving data from the +// peer. Once they are posted, the peer is free to write to them and will +// eventually post them back for consumption. +func (r *Rx) PostBuffers(buffers []RxBuffer) bool { + for i := range buffers { + b := r.tx.Push(sizeOfPostedBuffer) + if b == nil { + r.tx.Abort() + return false + } + + pb := &buffers[i] + binary.LittleEndian.PutUint64(b[postedOffset:], pb.Offset) + binary.LittleEndian.PutUint32(b[postedSize:], pb.Size) + binary.LittleEndian.PutUint32(b[postedRemainingInGroup:], 0) + binary.LittleEndian.PutUint64(b[postedUserData:], pb.UserData) + binary.LittleEndian.PutUint64(b[postedID:], pb.ID) + } + + r.tx.Flush() + + return true +} + +// Dequeue receives buffers that have been previously posted by PostBuffers() +// and that have been filled by the peer and posted back. +// +// This is similar to append() in that new buffers are appended to "bufs", with +// reallocation only if "bufs" doesn't have enough capacity. +func (r *Rx) Dequeue(bufs []RxBuffer) ([]RxBuffer, uint32) { + for { + outBufs := bufs + + // Pull the next descriptor from the rx pipe. + b := r.rx.Pull() + if b == nil { + return bufs, 0 + } + + if len(b) < sizeOfConsumedPacketHeader { + log.Warningf("Ignoring packet header: size (%v) is less than header size (%v)", len(b), sizeOfConsumedPacketHeader) + r.rx.Flush() + continue + } + + totalDataSize := binary.LittleEndian.Uint32(b[consumedPacketSize:]) + + // Calculate the number of buffer descriptors and copy them + // over to the output. + count := (len(b) - sizeOfConsumedPacketHeader) / sizeOfConsumedBuffer + offset := sizeOfConsumedPacketHeader + buffersSize := uint32(0) + for i := count; i > 0; i-- { + s := binary.LittleEndian.Uint32(b[offset+consumedSize:]) + buffersSize += s + if buffersSize < s { + // The buffer size overflows an unsigned 32-bit + // integer, so break out and force it to be + // ignored. + totalDataSize = 1 + buffersSize = 0 + break + } + + outBufs = append(outBufs, RxBuffer{ + Offset: binary.LittleEndian.Uint64(b[offset+consumedOffset:]), + Size: s, + ID: binary.LittleEndian.Uint64(b[offset+consumedID:]), + }) + + offset += sizeOfConsumedBuffer + } + + r.rx.Flush() + + if buffersSize < totalDataSize { + // The descriptor is corrupted, ignore it. + log.Warningf("Ignoring packet: actual data size (%v) less than expected size (%v)", buffersSize, totalDataSize) + continue + } + + return outBufs, totalDataSize + } +} + +// Bytes returns the byte slices on which the queue operates. +func (r *Rx) Bytes() (tx, rx []byte) { + return r.tx.Bytes(), r.rx.Bytes() +} + +// DecodeRxBufferHeader decodes the header of a buffer posted on an rx queue. +func DecodeRxBufferHeader(b []byte) RxBuffer { + return RxBuffer{ + Offset: binary.LittleEndian.Uint64(b[postedOffset:]), + Size: binary.LittleEndian.Uint32(b[postedSize:]), + ID: binary.LittleEndian.Uint64(b[postedID:]), + UserData: binary.LittleEndian.Uint64(b[postedUserData:]), + } +} + +// RxCompletionSize returns the number of bytes needed to encode an rx +// completion containing "count" buffers. +func RxCompletionSize(count int) uint64 { + return sizeOfConsumedPacketHeader + uint64(count)*sizeOfConsumedBuffer +} + +// EncodeRxCompletion encodes an rx completion header. +func EncodeRxCompletion(b []byte, size, reserved uint32) { + binary.LittleEndian.PutUint32(b[consumedPacketSize:], size) + binary.LittleEndian.PutUint32(b[consumedPacketReserved:], reserved) +} + +// EncodeRxCompletionBuffer encodes the i-th rx completion buffer header. +func EncodeRxCompletionBuffer(b []byte, i int, rxb RxBuffer) { + b = b[RxCompletionSize(i):] + binary.LittleEndian.PutUint64(b[consumedOffset:], rxb.Offset) + binary.LittleEndian.PutUint32(b[consumedSize:], rxb.Size) + binary.LittleEndian.PutUint64(b[consumedUserData:], rxb.UserData) + binary.LittleEndian.PutUint64(b[consumedID:], rxb.ID) +} diff --git a/pkg/tcpip/link/sharedmem/queue/tx.go b/pkg/tcpip/link/sharedmem/queue/tx.go new file mode 100755 index 000000000..beffe807b --- /dev/null +++ b/pkg/tcpip/link/sharedmem/queue/tx.go @@ -0,0 +1,151 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "encoding/binary" + + "gvisor.dev/gvisor/pkg/log" + "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe" +) + +const ( + // Offsets within a packet header. + packetID = 0 + packetSize = 8 + packetReserved = 12 + + sizeOfPacketHeader = 16 + + // Offsets with a buffer descriptor + bufferOffset = 0 + bufferSize = 8 + + sizeOfBufferDescriptor = 12 +) + +// TxBuffer is the descriptor of a transmit buffer. +type TxBuffer struct { + Next *TxBuffer + Offset uint64 + Size uint32 +} + +// Tx is a transmit queue. It is implemented with one tx and one rx pipe: the +// tx pipe is used to request the transmission of packets, while the rx pipe +// is used to receive which transmissions have completed. +// +// This struct is thread-compatible. +type Tx struct { + tx pipe.Tx + rx pipe.Rx +} + +// Init initializes the transmit queue with the given pipes. +func (t *Tx) Init(tx, rx []byte) { + t.tx.Init(tx) + t.rx.Init(rx) +} + +// Enqueue queues the given linked list of buffers for transmission as one +// packet. While it is queued, the caller must not modify them. +func (t *Tx) Enqueue(id uint64, totalDataLen, bufferCount uint32, buffer *TxBuffer) bool { + // Reserve room in the tx pipe. + totalLen := sizeOfPacketHeader + uint64(bufferCount)*sizeOfBufferDescriptor + + b := t.tx.Push(totalLen) + if b == nil { + return false + } + + // Initialize the packet and buffer descriptors. + binary.LittleEndian.PutUint64(b[packetID:], id) + binary.LittleEndian.PutUint32(b[packetSize:], totalDataLen) + binary.LittleEndian.PutUint32(b[packetReserved:], 0) + + offset := sizeOfPacketHeader + for i := bufferCount; i != 0; i-- { + binary.LittleEndian.PutUint64(b[offset+bufferOffset:], buffer.Offset) + binary.LittleEndian.PutUint32(b[offset+bufferSize:], buffer.Size) + offset += sizeOfBufferDescriptor + buffer = buffer.Next + } + + t.tx.Flush() + + return true +} + +// CompletedPacket returns the id of the last completed transmission. The +// returned id, if any, refers to a value passed on a previous call to +// Enqueue(). +func (t *Tx) CompletedPacket() (id uint64, ok bool) { + for { + b := t.rx.Pull() + if b == nil { + return 0, false + } + + if len(b) != 8 { + t.rx.Flush() + log.Warningf("Ignoring completed packet: size (%v) is less than expected (%v)", len(b), 8) + continue + } + + v := binary.LittleEndian.Uint64(b) + + t.rx.Flush() + + return v, true + } +} + +// Bytes returns the byte slices on which the queue operates. +func (t *Tx) Bytes() (tx, rx []byte) { + return t.tx.Bytes(), t.rx.Bytes() +} + +// TxPacketInfo holds information about a packet sent on a tx queue. +type TxPacketInfo struct { + ID uint64 + Size uint32 + Reserved uint32 + BufferCount int +} + +// DecodeTxPacketHeader decodes the header of a packet sent over a tx queue. +func DecodeTxPacketHeader(b []byte) TxPacketInfo { + return TxPacketInfo{ + ID: binary.LittleEndian.Uint64(b[packetID:]), + Size: binary.LittleEndian.Uint32(b[packetSize:]), + Reserved: binary.LittleEndian.Uint32(b[packetReserved:]), + BufferCount: (len(b) - sizeOfPacketHeader) / sizeOfBufferDescriptor, + } +} + +// DecodeTxBufferHeader decodes the header of the i-th buffer of a packet sent +// over a tx queue. +func DecodeTxBufferHeader(b []byte, i int) TxBuffer { + b = b[sizeOfPacketHeader+i*sizeOfBufferDescriptor:] + return TxBuffer{ + Offset: binary.LittleEndian.Uint64(b[bufferOffset:]), + Size: binary.LittleEndian.Uint32(b[bufferSize:]), + } +} + +// EncodeTxCompletion encodes a tx completion header. +func EncodeTxCompletion(b []byte, id uint64) { + binary.LittleEndian.PutUint64(b, id) +} |