diff options
Diffstat (limited to 'pkg/tcpip/link/sharedmem/pipe/tx.go')
-rw-r--r-- | pkg/tcpip/link/sharedmem/pipe/tx.go | 161 |
1 files changed, 0 insertions, 161 deletions
diff --git a/pkg/tcpip/link/sharedmem/pipe/tx.go b/pkg/tcpip/link/sharedmem/pipe/tx.go deleted file mode 100644 index 9841eb231..000000000 --- a/pkg/tcpip/link/sharedmem/pipe/tx.go +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipe - -// Tx is the transmit side of the shared memory ring buffer. -type Tx struct { - p pipe - maxPayloadSize uint64 - - head uint64 - tail uint64 - next uint64 - - tailHeader uint64 -} - -// Init initializes the transmit end of the pipe. In the initial state, the next -// slot to be written is the very first one, and the transmitter has the whole -// ring buffer available to it. -func (t *Tx) Init(b []byte) { - t.p.init(b) - // maxPayloadSize excludes the header of the payload, and the header - // of the wrapping message. - t.maxPayloadSize = uint64(len(t.p.buffer)) - 2*sizeOfSlotHeader - t.tail = 0xfffffffe * jump - t.next = t.tail - t.head = t.tail + jump - t.p.write(t.tail, slotFree) -} - -// Capacity determines how many records of the given size can be written to the -// pipe before it fills up. -func (t *Tx) Capacity(recordSize uint64) uint64 { - available := uint64(len(t.p.buffer)) - sizeOfSlotHeader - entryLen := payloadToSlotSize(recordSize) - return available / entryLen -} - -// Push reserves "payloadSize" bytes for transmission in the pipe. The caller -// populates the returned slice with the data to be transferred and enventually -// calls Flush() to make the data visible to the reader, or Abort() to make the -// pipe forget all Push() calls since the last Flush(). -// -// The returned slice is available until Flush() or Abort() is next called. -// After that, it must not be touched. -func (t *Tx) Push(payloadSize uint64) []byte { - // Fail request if we know we will never have enough room. - if payloadSize > t.maxPayloadSize { - return nil - } - - totalLen := payloadToSlotSize(payloadSize) - newNext := t.next + totalLen - nextWrap := (t.next & revolutionMask) | uint64(len(t.p.buffer)) - if int64(newNext-nextWrap) >= 0 { - // The new buffer would overflow the pipe, so we push a wrapping - // slot, then try to add the actual slot to the front of the - // pipe. - newNext = (newNext & revolutionMask) + jump - wrappingPayloadSize := slotToPayloadSize(newNext - t.next) - if !t.reclaim(newNext) { - return nil - } - - oldNext := t.next - t.next = newNext - if oldNext != t.tail { - t.p.write(oldNext, wrappingPayloadSize) - } else { - t.tailHeader = wrappingPayloadSize - t.Flush() - } - - newNext += totalLen - } - - // Check that we have enough room for the buffer. - if !t.reclaim(newNext) { - return nil - } - - if t.next != t.tail { - t.p.write(t.next, payloadSize) - } else { - t.tailHeader = payloadSize - } - - // Grab the buffer before updating t.next. - b := t.p.data(t.next, payloadSize) - t.next = newNext - - return b -} - -// reclaim attempts to advance the head until at least newNext. If the head is -// already at or beyond newNext, nothing happens and true is returned; otherwise -// it tries to reclaim slots that have already been consumed by the receive end -// of the pipe (they will be marked as free) and returns a boolean indicating -// whether it was successful in reclaiming enough slots. -func (t *Tx) reclaim(newNext uint64) bool { - for int64(newNext-t.head) > 0 { - // Can't reclaim if slot is not free. - header := t.p.readAtomic(t.head) - if header&slotFree == 0 { - return false - } - - payloadSize := header & slotSizeMask - newHead := t.head + payloadToSlotSize(payloadSize) - - // Check newHead is within bounds and valid. - if int64(newHead-t.tail) > int64(jump) || newHead&offsetMask >= uint64(len(t.p.buffer)) { - return false - } - - t.head = newHead - } - - return true -} - -// Abort causes all Push() calls since the last Flush() to be forgotten and -// therefore they will not be made visible to the receiver. -func (t *Tx) Abort() { - t.next = t.tail -} - -// Flush causes all buffers pushed since the last Flush() [or Abort(), whichever -// is the most recent] to be made visible to the receiver. -func (t *Tx) Flush() { - if t.next == t.tail { - // Nothing to do if there are no pushed buffers. - return - } - - if t.next != t.head { - // The receiver will spin in t.next, so we must make sure that - // the slotFree bit is set. - t.p.write(t.next, slotFree) - } - - t.p.writeAtomic(t.tail, t.tailHeader) - t.tail = t.next -} - -// Bytes returns the byte slice on which the pipe operates. -func (t *Tx) Bytes() []byte { - return t.p.buffer -} |