diff options
Diffstat (limited to 'pkg/tcpip/link/sharedmem/queue/rx.go')
-rw-r--r-- | pkg/tcpip/link/sharedmem/queue/rx.go | 221 |
1 files changed, 0 insertions, 221 deletions
diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go deleted file mode 100644 index 696e6c9e5..000000000 --- a/pkg/tcpip/link/sharedmem/queue/rx.go +++ /dev/null @@ -1,221 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package queue provides the implementation of transmit and receive queues -// based on shared memory ring buffers. -package queue - -import ( - "encoding/binary" - "sync/atomic" - - "gvisor.dev/gvisor/pkg/log" - "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe" -) - -const ( - // Offsets within a posted buffer. - postedOffset = 0 - postedSize = 8 - postedRemainingInGroup = 12 - postedUserData = 16 - postedID = 24 - - sizeOfPostedBuffer = 32 - - // Offsets within a received packet header. - consumedPacketSize = 0 - consumedPacketReserved = 4 - - sizeOfConsumedPacketHeader = 8 - - // Offsets within a consumed buffer. - consumedOffset = 0 - consumedSize = 8 - consumedUserData = 12 - consumedID = 20 - - sizeOfConsumedBuffer = 28 - - // The following are the allowed states of the shared data area. - eventFDUninitialized = 0 - eventFDDisabled = 1 - eventFDEnabled = 2 -) - -// RxBuffer is the descriptor of a receive buffer. -type RxBuffer struct { - Offset uint64 - Size uint32 - ID uint64 - UserData uint64 -} - -// Rx is a receive queue. It is implemented with one tx and one rx pipe: the tx -// pipe is used to "post" buffers, while the rx pipe is used to receive packets -// whose contents have been written to previously posted buffers. -// -// This struct is thread-compatible. -type Rx struct { - tx pipe.Tx - rx pipe.Rx - sharedEventFDState *uint32 -} - -// Init initializes the receive queue with the given pipes, and shared state -// pointer -- the latter is used to enable/disable eventfd notifications. -func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) { - r.sharedEventFDState = sharedEventFDState - r.tx.Init(tx) - r.rx.Init(rx) -} - -// EnableNotification updates the shared state such that the peer will notify -// the eventfd when there are packets to be dequeued. -func (r *Rx) EnableNotification() { - atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled) -} - -// DisableNotification updates the shared state such that the peer will not -// notify the eventfd. -func (r *Rx) DisableNotification() { - atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled) -} - -// PostedBuffersLimit returns the maximum number of buffers that can be posted -// before the tx queue fills up. -func (r *Rx) PostedBuffersLimit() uint64 { - return r.tx.Capacity(sizeOfPostedBuffer) -} - -// PostBuffers makes the given buffers available for receiving data from the -// peer. Once they are posted, the peer is free to write to them and will -// eventually post them back for consumption. -func (r *Rx) PostBuffers(buffers []RxBuffer) bool { - for i := range buffers { - b := r.tx.Push(sizeOfPostedBuffer) - if b == nil { - r.tx.Abort() - return false - } - - pb := &buffers[i] - binary.LittleEndian.PutUint64(b[postedOffset:], pb.Offset) - binary.LittleEndian.PutUint32(b[postedSize:], pb.Size) - binary.LittleEndian.PutUint32(b[postedRemainingInGroup:], 0) - binary.LittleEndian.PutUint64(b[postedUserData:], pb.UserData) - binary.LittleEndian.PutUint64(b[postedID:], pb.ID) - } - - r.tx.Flush() - - return true -} - -// Dequeue receives buffers that have been previously posted by PostBuffers() -// and that have been filled by the peer and posted back. -// -// This is similar to append() in that new buffers are appended to "bufs", with -// reallocation only if "bufs" doesn't have enough capacity. -func (r *Rx) Dequeue(bufs []RxBuffer) ([]RxBuffer, uint32) { - for { - outBufs := bufs - - // Pull the next descriptor from the rx pipe. - b := r.rx.Pull() - if b == nil { - return bufs, 0 - } - - if len(b) < sizeOfConsumedPacketHeader { - log.Warningf("Ignoring packet header: size (%v) is less than header size (%v)", len(b), sizeOfConsumedPacketHeader) - r.rx.Flush() - continue - } - - totalDataSize := binary.LittleEndian.Uint32(b[consumedPacketSize:]) - - // Calculate the number of buffer descriptors and copy them - // over to the output. - count := (len(b) - sizeOfConsumedPacketHeader) / sizeOfConsumedBuffer - offset := sizeOfConsumedPacketHeader - buffersSize := uint32(0) - for i := count; i > 0; i-- { - s := binary.LittleEndian.Uint32(b[offset+consumedSize:]) - buffersSize += s - if buffersSize < s { - // The buffer size overflows an unsigned 32-bit - // integer, so break out and force it to be - // ignored. - totalDataSize = 1 - buffersSize = 0 - break - } - - outBufs = append(outBufs, RxBuffer{ - Offset: binary.LittleEndian.Uint64(b[offset+consumedOffset:]), - Size: s, - ID: binary.LittleEndian.Uint64(b[offset+consumedID:]), - }) - - offset += sizeOfConsumedBuffer - } - - r.rx.Flush() - - if buffersSize < totalDataSize { - // The descriptor is corrupted, ignore it. - log.Warningf("Ignoring packet: actual data size (%v) less than expected size (%v)", buffersSize, totalDataSize) - continue - } - - return outBufs, totalDataSize - } -} - -// Bytes returns the byte slices on which the queue operates. -func (r *Rx) Bytes() (tx, rx []byte) { - return r.tx.Bytes(), r.rx.Bytes() -} - -// DecodeRxBufferHeader decodes the header of a buffer posted on an rx queue. -func DecodeRxBufferHeader(b []byte) RxBuffer { - return RxBuffer{ - Offset: binary.LittleEndian.Uint64(b[postedOffset:]), - Size: binary.LittleEndian.Uint32(b[postedSize:]), - ID: binary.LittleEndian.Uint64(b[postedID:]), - UserData: binary.LittleEndian.Uint64(b[postedUserData:]), - } -} - -// RxCompletionSize returns the number of bytes needed to encode an rx -// completion containing "count" buffers. -func RxCompletionSize(count int) uint64 { - return sizeOfConsumedPacketHeader + uint64(count)*sizeOfConsumedBuffer -} - -// EncodeRxCompletion encodes an rx completion header. -func EncodeRxCompletion(b []byte, size, reserved uint32) { - binary.LittleEndian.PutUint32(b[consumedPacketSize:], size) - binary.LittleEndian.PutUint32(b[consumedPacketReserved:], reserved) -} - -// EncodeRxCompletionBuffer encodes the i-th rx completion buffer header. -func EncodeRxCompletionBuffer(b []byte, i int, rxb RxBuffer) { - b = b[RxCompletionSize(i):] - binary.LittleEndian.PutUint64(b[consumedOffset:], rxb.Offset) - binary.LittleEndian.PutUint32(b[consumedSize:], rxb.Size) - binary.LittleEndian.PutUint64(b[consumedUserData:], rxb.UserData) - binary.LittleEndian.PutUint64(b[consumedID:], rxb.ID) -} |