summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link/sharedmem/queue/rx.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/link/sharedmem/queue/rx.go')
-rw-r--r--pkg/tcpip/link/sharedmem/queue/rx.go221
1 files changed, 0 insertions, 221 deletions
diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go
deleted file mode 100644
index 696e6c9e5..000000000
--- a/pkg/tcpip/link/sharedmem/queue/rx.go
+++ /dev/null
@@ -1,221 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package queue provides the implementation of transmit and receive queues
-// based on shared memory ring buffers.
-package queue
-
-import (
- "encoding/binary"
- "sync/atomic"
-
- "gvisor.dev/gvisor/pkg/log"
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
-)
-
-const (
- // Offsets within a posted buffer.
- postedOffset = 0
- postedSize = 8
- postedRemainingInGroup = 12
- postedUserData = 16
- postedID = 24
-
- sizeOfPostedBuffer = 32
-
- // Offsets within a received packet header.
- consumedPacketSize = 0
- consumedPacketReserved = 4
-
- sizeOfConsumedPacketHeader = 8
-
- // Offsets within a consumed buffer.
- consumedOffset = 0
- consumedSize = 8
- consumedUserData = 12
- consumedID = 20
-
- sizeOfConsumedBuffer = 28
-
- // The following are the allowed states of the shared data area.
- eventFDUninitialized = 0
- eventFDDisabled = 1
- eventFDEnabled = 2
-)
-
-// RxBuffer is the descriptor of a receive buffer.
-type RxBuffer struct {
- Offset uint64
- Size uint32
- ID uint64
- UserData uint64
-}
-
-// Rx is a receive queue. It is implemented with one tx and one rx pipe: the tx
-// pipe is used to "post" buffers, while the rx pipe is used to receive packets
-// whose contents have been written to previously posted buffers.
-//
-// This struct is thread-compatible.
-type Rx struct {
- tx pipe.Tx
- rx pipe.Rx
- sharedEventFDState *uint32
-}
-
-// Init initializes the receive queue with the given pipes, and shared state
-// pointer -- the latter is used to enable/disable eventfd notifications.
-func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) {
- r.sharedEventFDState = sharedEventFDState
- r.tx.Init(tx)
- r.rx.Init(rx)
-}
-
-// EnableNotification updates the shared state such that the peer will notify
-// the eventfd when there are packets to be dequeued.
-func (r *Rx) EnableNotification() {
- atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled)
-}
-
-// DisableNotification updates the shared state such that the peer will not
-// notify the eventfd.
-func (r *Rx) DisableNotification() {
- atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled)
-}
-
-// PostedBuffersLimit returns the maximum number of buffers that can be posted
-// before the tx queue fills up.
-func (r *Rx) PostedBuffersLimit() uint64 {
- return r.tx.Capacity(sizeOfPostedBuffer)
-}
-
-// PostBuffers makes the given buffers available for receiving data from the
-// peer. Once they are posted, the peer is free to write to them and will
-// eventually post them back for consumption.
-func (r *Rx) PostBuffers(buffers []RxBuffer) bool {
- for i := range buffers {
- b := r.tx.Push(sizeOfPostedBuffer)
- if b == nil {
- r.tx.Abort()
- return false
- }
-
- pb := &buffers[i]
- binary.LittleEndian.PutUint64(b[postedOffset:], pb.Offset)
- binary.LittleEndian.PutUint32(b[postedSize:], pb.Size)
- binary.LittleEndian.PutUint32(b[postedRemainingInGroup:], 0)
- binary.LittleEndian.PutUint64(b[postedUserData:], pb.UserData)
- binary.LittleEndian.PutUint64(b[postedID:], pb.ID)
- }
-
- r.tx.Flush()
-
- return true
-}
-
-// Dequeue receives buffers that have been previously posted by PostBuffers()
-// and that have been filled by the peer and posted back.
-//
-// This is similar to append() in that new buffers are appended to "bufs", with
-// reallocation only if "bufs" doesn't have enough capacity.
-func (r *Rx) Dequeue(bufs []RxBuffer) ([]RxBuffer, uint32) {
- for {
- outBufs := bufs
-
- // Pull the next descriptor from the rx pipe.
- b := r.rx.Pull()
- if b == nil {
- return bufs, 0
- }
-
- if len(b) < sizeOfConsumedPacketHeader {
- log.Warningf("Ignoring packet header: size (%v) is less than header size (%v)", len(b), sizeOfConsumedPacketHeader)
- r.rx.Flush()
- continue
- }
-
- totalDataSize := binary.LittleEndian.Uint32(b[consumedPacketSize:])
-
- // Calculate the number of buffer descriptors and copy them
- // over to the output.
- count := (len(b) - sizeOfConsumedPacketHeader) / sizeOfConsumedBuffer
- offset := sizeOfConsumedPacketHeader
- buffersSize := uint32(0)
- for i := count; i > 0; i-- {
- s := binary.LittleEndian.Uint32(b[offset+consumedSize:])
- buffersSize += s
- if buffersSize < s {
- // The buffer size overflows an unsigned 32-bit
- // integer, so break out and force it to be
- // ignored.
- totalDataSize = 1
- buffersSize = 0
- break
- }
-
- outBufs = append(outBufs, RxBuffer{
- Offset: binary.LittleEndian.Uint64(b[offset+consumedOffset:]),
- Size: s,
- ID: binary.LittleEndian.Uint64(b[offset+consumedID:]),
- })
-
- offset += sizeOfConsumedBuffer
- }
-
- r.rx.Flush()
-
- if buffersSize < totalDataSize {
- // The descriptor is corrupted, ignore it.
- log.Warningf("Ignoring packet: actual data size (%v) less than expected size (%v)", buffersSize, totalDataSize)
- continue
- }
-
- return outBufs, totalDataSize
- }
-}
-
-// Bytes returns the byte slices on which the queue operates.
-func (r *Rx) Bytes() (tx, rx []byte) {
- return r.tx.Bytes(), r.rx.Bytes()
-}
-
-// DecodeRxBufferHeader decodes the header of a buffer posted on an rx queue.
-func DecodeRxBufferHeader(b []byte) RxBuffer {
- return RxBuffer{
- Offset: binary.LittleEndian.Uint64(b[postedOffset:]),
- Size: binary.LittleEndian.Uint32(b[postedSize:]),
- ID: binary.LittleEndian.Uint64(b[postedID:]),
- UserData: binary.LittleEndian.Uint64(b[postedUserData:]),
- }
-}
-
-// RxCompletionSize returns the number of bytes needed to encode an rx
-// completion containing "count" buffers.
-func RxCompletionSize(count int) uint64 {
- return sizeOfConsumedPacketHeader + uint64(count)*sizeOfConsumedBuffer
-}
-
-// EncodeRxCompletion encodes an rx completion header.
-func EncodeRxCompletion(b []byte, size, reserved uint32) {
- binary.LittleEndian.PutUint32(b[consumedPacketSize:], size)
- binary.LittleEndian.PutUint32(b[consumedPacketReserved:], reserved)
-}
-
-// EncodeRxCompletionBuffer encodes the i-th rx completion buffer header.
-func EncodeRxCompletionBuffer(b []byte, i int, rxb RxBuffer) {
- b = b[RxCompletionSize(i):]
- binary.LittleEndian.PutUint64(b[consumedOffset:], rxb.Offset)
- binary.LittleEndian.PutUint32(b[consumedSize:], rxb.Size)
- binary.LittleEndian.PutUint64(b[consumedUserData:], rxb.UserData)
- binary.LittleEndian.PutUint64(b[consumedID:], rxb.ID)
-}