diff options
Diffstat (limited to 'pkg/tcpip/link/sharedmem/pipe')
-rw-r--r-- | pkg/tcpip/link/sharedmem/pipe/BUILD | 23 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/pipe/pipe.go | 68 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/pipe/pipe_test.go | 507 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go | 25 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/pipe/rx.go | 83 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/pipe/tx.go | 151 |
6 files changed, 857 insertions, 0 deletions
diff --git a/pkg/tcpip/link/sharedmem/pipe/BUILD b/pkg/tcpip/link/sharedmem/pipe/BUILD new file mode 100644 index 000000000..e8d795500 --- /dev/null +++ b/pkg/tcpip/link/sharedmem/pipe/BUILD @@ -0,0 +1,23 @@ +package(licenses = ["notice"]) # BSD + +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "pipe", + srcs = [ + "pipe.go", + "pipe_unsafe.go", + "rx.go", + "tx.go", + ], + importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe", + visibility = ["//:sandbox"], +) + +go_test( + name = "pipe_test", + srcs = [ + "pipe_test.go", + ], + embed = [":pipe"], +) diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe.go b/pkg/tcpip/link/sharedmem/pipe/pipe.go new file mode 100644 index 000000000..1173a60da --- /dev/null +++ b/pkg/tcpip/link/sharedmem/pipe/pipe.go @@ -0,0 +1,68 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package pipe implements a shared memory ring buffer on which a single reader +// and a single writer can operate (read/write) concurrently. The ring buffer +// allows for data of different sizes to be written, and preserves the boundary +// of the written data. +// +// Example usage is as follows: +// +// wb := t.Push(20) +// // Write data to wb. +// t.Flush() +// +// rb := r.Pull() +// // Do something with data in rb. +// t.Flush() +package pipe + +import ( + "math" +) + +const ( + jump uint64 = math.MaxUint32 + 1 + offsetMask uint64 = math.MaxUint32 + revolutionMask uint64 = ^offsetMask + + sizeOfSlotHeader = 8 // sizeof(uint64) + slotFree uint64 = 1 << 63 + slotSizeMask uint64 = math.MaxUint32 +) + +// payloadToSlotSize calculates the total size of a slot based on its payload +// size. The total size is the header size, plus the payload size, plus padding +// if necessary to make the total size a multiple of sizeOfSlotHeader. +func payloadToSlotSize(payloadSize uint64) uint64 { + s := sizeOfSlotHeader + payloadSize + return (s + sizeOfSlotHeader - 1) &^ (sizeOfSlotHeader - 1) +} + +// slotToPayloadSize calculates the payload size of a slot based on the total +// size of the slot. This is only meant to be used when creating slots that +// don't carry information (e.g., free slots or wrap slots). +func slotToPayloadSize(offset uint64) uint64 { + return offset - sizeOfSlotHeader +} + +// pipe is a basic data structure used by both (transmit & receive) ends of a +// pipe. Indices into this pipe are split into two fields: offset, which counts +// the number of bytes from the beginning of the buffer, and revolution, which +// counts the number of times the index has wrapped around. +type pipe struct { + buffer []byte +} + +// init initializes the pipe buffer such that its size is a multiple of the size +// of the slot header. +func (p *pipe) init(b []byte) { + p.buffer = b[:len(b)&^(sizeOfSlotHeader-1)] +} + +// data returns a section of the buffer starting at the given index (which may +// include revolution information) and with the given size. +func (p *pipe) data(idx uint64, size uint64) []byte { + return p.buffer[(idx&offsetMask)+sizeOfSlotHeader:][:size] +} diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_test.go b/pkg/tcpip/link/sharedmem/pipe/pipe_test.go new file mode 100644 index 000000000..441ff5b25 --- /dev/null +++ b/pkg/tcpip/link/sharedmem/pipe/pipe_test.go @@ -0,0 +1,507 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pipe + +import ( + "math/rand" + "reflect" + "runtime" + "sync" + "testing" +) + +func TestSimpleReadWrite(t *testing.T) { + // Check that a simple write can be properly read from the rx side. + tr := rand.New(rand.NewSource(99)) + rr := rand.New(rand.NewSource(99)) + + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + wb := tx.Push(10) + if wb == nil { + t.Fatalf("Push failed on empty pipe") + } + for i := range wb { + wb[i] = byte(tr.Intn(256)) + } + tx.Flush() + + var rx Rx + rx.Init(b) + rb := rx.Pull() + if len(rb) != 10 { + t.Fatalf("Bad buffer size returned: got %v, want %v", len(rb), 10) + } + + for i := range rb { + if v := byte(rr.Intn(256)); v != rb[i] { + t.Fatalf("Bad read buffer at index %v: got %v, want %v", i, rb[i], v) + } + } + rx.Flush() +} + +func TestEmptyRead(t *testing.T) { + // Check that pulling from an empty pipe fails. + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + var rx Rx + rx.Init(b) + if rb := rx.Pull(); rb != nil { + t.Fatalf("Pull succeeded on empty pipe") + } +} + +func TestTooLargeWrite(t *testing.T) { + // Check that writes that are too large are properly rejected. + b := make([]byte, 96) + var tx Tx + tx.Init(b) + + if wb := tx.Push(96); wb != nil { + t.Fatalf("Write of 96 bytes succeeded on 96-byte pipe") + } + + if wb := tx.Push(88); wb != nil { + t.Fatalf("Write of 88 bytes succeeded on 96-byte pipe") + } + + if wb := tx.Push(80); wb == nil { + t.Fatalf("Write of 80 bytes failed on 96-byte pipe") + } +} + +func TestFullWrite(t *testing.T) { + // Check that writes fail when the pipe is full. + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + if wb := tx.Push(80); wb == nil { + t.Fatalf("Write of 80 bytes failed on 96-byte pipe") + } + + if wb := tx.Push(1); wb != nil { + t.Fatalf("Write succeeded on full pipe") + } +} + +func TestFullAndFlushedWrite(t *testing.T) { + // Check that writes fail when the pipe is full and has already been + // flushed. + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + if wb := tx.Push(80); wb == nil { + t.Fatalf("Write of 80 bytes failed on 96-byte pipe") + } + + tx.Flush() + + if wb := tx.Push(1); wb != nil { + t.Fatalf("Write succeeded on full pipe") + } +} + +func TestTxFlushTwice(t *testing.T) { + // Checks that a second consecutive tx flush is a no-op. + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + if wb := tx.Push(50); wb == nil { + t.Fatalf("Push failed on empty pipe") + } + tx.Flush() + + // Make copy of original tx queue, flush it, then check that it didn't + // change. + orig := tx + tx.Flush() + + if !reflect.DeepEqual(orig, tx) { + t.Fatalf("Flush mutated tx pipe: got %v, want %v", tx, orig) + } +} + +func TestRxFlushTwice(t *testing.T) { + // Checks that a second consecutive rx flush is a no-op. + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + if wb := tx.Push(50); wb == nil { + t.Fatalf("Push failed on empty pipe") + } + tx.Flush() + + var rx Rx + rx.Init(b) + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull failed on non-empty pipe") + } + rx.Flush() + + // Make copy of original rx queue, flush it, then check that it didn't + // change. + orig := rx + rx.Flush() + + if !reflect.DeepEqual(orig, rx) { + t.Fatalf("Flush mutated rx pipe: got %v, want %v", rx, orig) + } +} + +func TestWrapInMiddleOfTransaction(t *testing.T) { + // Check that writes are not flushed when we need to wrap the buffer + // around. + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + if wb := tx.Push(50); wb == nil { + t.Fatalf("Push failed on empty pipe") + } + tx.Flush() + + var rx Rx + rx.Init(b) + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull failed on non-empty pipe") + } + rx.Flush() + + // At this point the ring buffer is empty, but the write is at offset + // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). + if wb := tx.Push(10); wb == nil { + t.Fatalf("Push failed on empty pipe") + } + + if wb := tx.Push(50); wb == nil { + t.Fatalf("Push failed on non-full pipe") + } + + // We haven't flushed yet, so pull must return nil. + if rb := rx.Pull(); rb != nil { + t.Fatalf("Pull succeeded on non-flushed pipe") + } + + tx.Flush() + + // The two buffers must be available now. + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull failed on non-empty pipe") + } + + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull failed on non-empty pipe") + } +} + +func TestWriteAbort(t *testing.T) { + // Check that a read fails on a pipe that has had data pushed to it but + // has aborted the push. + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + if wb := tx.Push(10); wb == nil { + t.Fatalf("Write failed on empty pipe") + } + + var rx Rx + rx.Init(b) + if rb := rx.Pull(); rb != nil { + t.Fatalf("Pull succeeded on empty pipe") + } + + tx.Abort() + if rb := rx.Pull(); rb != nil { + t.Fatalf("Pull succeeded on empty pipe") + } +} + +func TestWrappedWriteAbort(t *testing.T) { + // Check that writes are properly aborted even if the writes wrap + // around. + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + if wb := tx.Push(50); wb == nil { + t.Fatalf("Push failed on empty pipe") + } + tx.Flush() + + var rx Rx + rx.Init(b) + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull failed on non-empty pipe") + } + rx.Flush() + + // At this point the ring buffer is empty, but the write is at offset + // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). + if wb := tx.Push(10); wb == nil { + t.Fatalf("Push failed on empty pipe") + } + + if wb := tx.Push(50); wb == nil { + t.Fatalf("Push failed on non-full pipe") + } + + // We haven't flushed yet, so pull must return nil. + if rb := rx.Pull(); rb != nil { + t.Fatalf("Pull succeeded on non-flushed pipe") + } + + tx.Abort() + + // The pushes were aborted, so no data should be readable. + if rb := rx.Pull(); rb != nil { + t.Fatalf("Pull succeeded on non-flushed pipe") + } + + // Try the same transactions again, but flush this time. + if wb := tx.Push(10); wb == nil { + t.Fatalf("Push failed on empty pipe") + } + + if wb := tx.Push(50); wb == nil { + t.Fatalf("Push failed on non-full pipe") + } + + tx.Flush() + + // The two buffers must be available now. + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull failed on non-empty pipe") + } + + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull failed on non-empty pipe") + } +} + +func TestEmptyReadOnNonFlushedWrite(t *testing.T) { + // Check that a read fails on a pipe that has had data pushed to it + // but not yet flushed. + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + if wb := tx.Push(10); wb == nil { + t.Fatalf("Write failed on empty pipe") + } + + var rx Rx + rx.Init(b) + if rb := rx.Pull(); rb != nil { + t.Fatalf("Pull succeeded on empty pipe") + } + + tx.Flush() + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull on failed on non-empty pipe") + } +} + +func TestPullAfterPullingEntirePipe(t *testing.T) { + // Check that Pull fails when the pipe is full, but all of it has + // already been pulled but not yet flushed. + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + if wb := tx.Push(50); wb == nil { + t.Fatalf("Push failed on empty pipe") + } + tx.Flush() + + var rx Rx + rx.Init(b) + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull failed on non-empty pipe") + } + rx.Flush() + + // At this point the ring buffer is empty, but the write is at offset + // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 3 + // buffers that will fill the pipe. + if wb := tx.Push(10); wb == nil { + t.Fatalf("Push failed on empty pipe") + } + + if wb := tx.Push(20); wb == nil { + t.Fatalf("Push failed on non-full pipe") + } + + if wb := tx.Push(24); wb == nil { + t.Fatalf("Push failed on non-full pipe") + } + + tx.Flush() + + // The three buffers must be available now. + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull failed on non-empty pipe") + } + + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull failed on non-empty pipe") + } + + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull failed on non-empty pipe") + } + + // Fourth pull must fail. + if rb := rx.Pull(); rb != nil { + t.Fatalf("Pull succeeded on empty pipe") + } +} + +func TestNoRoomToWrapOnPush(t *testing.T) { + // Check that Push fails when it tries to allocate room to add a wrap + // message. + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + if wb := tx.Push(50); wb == nil { + t.Fatalf("Push failed on empty pipe") + } + tx.Flush() + + var rx Rx + rx.Init(b) + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull failed on non-empty pipe") + } + rx.Flush() + + // At this point the ring buffer is empty, but the write is at offset + // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 20, + // which won't fit (64+20+8+padding = 96, which wouldn't leave room for + // the padding), so it wraps around. + if wb := tx.Push(20); wb == nil { + t.Fatalf("Push failed on empty pipe") + } + + tx.Flush() + + // Buffer offset is at 28. Try to write 70, which would require a wrap + // slot which cannot be created now. + if wb := tx.Push(70); wb != nil { + t.Fatalf("Push succeeded on pipe with no room for wrap message") + } +} + +func TestRxImplicitFlushOfWrapMessage(t *testing.T) { + // Check if the first read is that of a wrapping message, that it gets + // immediately flushed. + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + if wb := tx.Push(50); wb == nil { + t.Fatalf("Push failed on empty pipe") + } + tx.Flush() + + // This will cause a wrapping message to written. + if wb := tx.Push(60); wb != nil { + t.Fatalf("Push succeeded when there is no room in pipe") + } + + var rx Rx + rx.Init(b) + + // Read the first message. + if rb := rx.Pull(); rb == nil { + t.Fatalf("Pull failed on non-empty pipe") + } + rx.Flush() + + // This should fail because of the wrapping message is taking up space. + if wb := tx.Push(60); wb != nil { + t.Fatalf("Push succeeded when there is no room in pipe") + } + + // Try to read the next one. This should consume the wrapping message. + rx.Pull() + + // This must now succeed. + if wb := tx.Push(60); wb == nil { + t.Fatalf("Push failed on empty pipe") + } +} + +func TestConcurrentReaderWriter(t *testing.T) { + // Push a million buffers of random sizes and random contents. Check + // that buffers read match what was written. + tr := rand.New(rand.NewSource(99)) + rr := rand.New(rand.NewSource(99)) + + b := make([]byte, 100) + var tx Tx + tx.Init(b) + + var rx Rx + rx.Init(b) + + const count = 1000000 + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + runtime.Gosched() + for i := 0; i < count; i++ { + n := 1 + tr.Intn(80) + wb := tx.Push(uint64(n)) + for wb == nil { + wb = tx.Push(uint64(n)) + } + + for j := range wb { + wb[j] = byte(tr.Intn(256)) + } + + tx.Flush() + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + runtime.Gosched() + for i := 0; i < count; i++ { + n := 1 + rr.Intn(80) + rb := rx.Pull() + for rb == nil { + rb = rx.Pull() + } + + if n != len(rb) { + t.Fatalf("Bad %v-th buffer length: got %v, want %v", i, len(rb), n) + } + + for j := range rb { + if v := byte(rr.Intn(256)); v != rb[j] { + t.Fatalf("Bad %v-th read buffer at index %v: got %v, want %v", i, j, rb[j], v) + } + } + + rx.Flush() + } + }() + + wg.Wait() +} diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go b/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go new file mode 100644 index 000000000..d536abedf --- /dev/null +++ b/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go @@ -0,0 +1,25 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pipe + +import ( + "sync/atomic" + "unsafe" +) + +func (p *pipe) write(idx uint64, v uint64) { + ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0])) + *ptr = v +} + +func (p *pipe) writeAtomic(idx uint64, v uint64) { + ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0])) + atomic.StoreUint64(ptr, v) +} + +func (p *pipe) readAtomic(idx uint64) uint64 { + ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0])) + return atomic.LoadUint64(ptr) +} diff --git a/pkg/tcpip/link/sharedmem/pipe/rx.go b/pkg/tcpip/link/sharedmem/pipe/rx.go new file mode 100644 index 000000000..261e21f9e --- /dev/null +++ b/pkg/tcpip/link/sharedmem/pipe/rx.go @@ -0,0 +1,83 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pipe + +// Rx is the receive side of the shared memory ring buffer. +type Rx struct { + p pipe + + tail uint64 + head uint64 +} + +// Init initializes the receive end of the pipe. In the initial state, the next +// slot to be inspected is the very first one. +func (r *Rx) Init(b []byte) { + r.p.init(b) + r.tail = 0xfffffffe * jump + r.head = r.tail +} + +// Pull reads the next buffer from the pipe, returning nil if there isn't one +// currently available. +// +// The returned slice is available until Flush() is next called. After that, it +// must not be touched. +func (r *Rx) Pull() []byte { + if r.head == r.tail+jump { + // We've already pulled the whole pipe. + return nil + } + + header := r.p.readAtomic(r.head) + if header&slotFree != 0 { + // The next slot is free, we can't pull it yet. + return nil + } + + payloadSize := header & slotSizeMask + newHead := r.head + payloadToSlotSize(payloadSize) + headWrap := (r.head & revolutionMask) | uint64(len(r.p.buffer)) + + // Check if this is a wrapping slot. If that's the case, it carries no + // data, so we just skip it and try again from the first slot. + if int64(newHead-headWrap) >= 0 { + if int64(newHead-headWrap) > int64(jump) || newHead&offsetMask != 0 { + return nil + } + + if r.tail == r.head { + // If this is the first pull since the last Flush() + // call, we flush the state so that the sender can use + // this space if it needs to. + r.p.writeAtomic(r.head, slotFree|slotToPayloadSize(newHead-r.head)) + r.tail = newHead + } + + r.head = newHead + return r.Pull() + } + + // Grab the buffer before updating r.head. + b := r.p.data(r.head, payloadSize) + r.head = newHead + return b +} + +// Flush tells the transmitter that all buffers pulled since the last Flush() +// have been used, so the transmitter is free to used their slots for further +// transmission. +func (r *Rx) Flush() { + if r.head == r.tail { + return + } + r.p.writeAtomic(r.tail, slotFree|slotToPayloadSize(r.head-r.tail)) + r.tail = r.head +} + +// Bytes returns the byte slice on which the pipe operates. +func (r *Rx) Bytes() []byte { + return r.p.buffer +} diff --git a/pkg/tcpip/link/sharedmem/pipe/tx.go b/pkg/tcpip/link/sharedmem/pipe/tx.go new file mode 100644 index 000000000..374f515ab --- /dev/null +++ b/pkg/tcpip/link/sharedmem/pipe/tx.go @@ -0,0 +1,151 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pipe + +// Tx is the transmit side of the shared memory ring buffer. +type Tx struct { + p pipe + maxPayloadSize uint64 + + head uint64 + tail uint64 + next uint64 + + tailHeader uint64 +} + +// Init initializes the transmit end of the pipe. In the initial state, the next +// slot to be written is the very first one, and the transmitter has the whole +// ring buffer available to it. +func (t *Tx) Init(b []byte) { + t.p.init(b) + // maxPayloadSize excludes the header of the payload, and the header + // of the wrapping message. + t.maxPayloadSize = uint64(len(t.p.buffer)) - 2*sizeOfSlotHeader + t.tail = 0xfffffffe * jump + t.next = t.tail + t.head = t.tail + jump + t.p.write(t.tail, slotFree) +} + +// Capacity determines how many records of the given size can be written to the +// pipe before it fills up. +func (t *Tx) Capacity(recordSize uint64) uint64 { + available := uint64(len(t.p.buffer)) - sizeOfSlotHeader + entryLen := payloadToSlotSize(recordSize) + return available / entryLen +} + +// Push reserves "payloadSize" bytes for transmission in the pipe. The caller +// populates the returned slice with the data to be transferred and enventually +// calls Flush() to make the data visible to the reader, or Abort() to make the +// pipe forget all Push() calls since the last Flush(). +// +// The returned slice is available until Flush() or Abort() is next called. +// After that, it must not be touched. +func (t *Tx) Push(payloadSize uint64) []byte { + // Fail request if we know we will never have enough room. + if payloadSize > t.maxPayloadSize { + return nil + } + + totalLen := payloadToSlotSize(payloadSize) + newNext := t.next + totalLen + nextWrap := (t.next & revolutionMask) | uint64(len(t.p.buffer)) + if int64(newNext-nextWrap) >= 0 { + // The new buffer would overflow the pipe, so we push a wrapping + // slot, then try to add the actual slot to the front of the + // pipe. + newNext = (newNext & revolutionMask) + jump + wrappingPayloadSize := slotToPayloadSize(newNext - t.next) + if !t.reclaim(newNext) { + return nil + } + + oldNext := t.next + t.next = newNext + if oldNext != t.tail { + t.p.write(oldNext, wrappingPayloadSize) + } else { + t.tailHeader = wrappingPayloadSize + t.Flush() + } + + newNext += totalLen + } + + // Check that we have enough room for the buffer. + if !t.reclaim(newNext) { + return nil + } + + if t.next != t.tail { + t.p.write(t.next, payloadSize) + } else { + t.tailHeader = payloadSize + } + + // Grab the buffer before updating t.next. + b := t.p.data(t.next, payloadSize) + t.next = newNext + + return b +} + +// reclaim attempts to advance the head until at least newNext. If the head is +// already at or beyond newNext, nothing happens and true is returned; otherwise +// it tries to reclaim slots that have already been consumed by the receive end +// of the pipe (they will be marked as free) and returns a boolean indicating +// whether it was successful in reclaiming enough slots. +func (t *Tx) reclaim(newNext uint64) bool { + for int64(newNext-t.head) > 0 { + // Can't reclaim if slot is not free. + header := t.p.readAtomic(t.head) + if header&slotFree == 0 { + return false + } + + payloadSize := header & slotSizeMask + newHead := t.head + payloadToSlotSize(payloadSize) + + // Check newHead is within bounds and valid. + if int64(newHead-t.tail) > int64(jump) || newHead&offsetMask >= uint64(len(t.p.buffer)) { + return false + } + + t.head = newHead + } + + return true +} + +// Abort causes all Push() calls since the last Flush() to be forgotten and +// therefore they will not be made visible to the receiver. +func (t *Tx) Abort() { + t.next = t.tail +} + +// Flush causes all buffers pushed since the last Flush() [or Abort(), whichever +// is the most recent] to be made visible to the receiver. +func (t *Tx) Flush() { + if t.next == t.tail { + // Nothing to do if there are no pushed buffers. + return + } + + if t.next != t.head { + // The receiver will spin in t.next, so we must make sure that + // the slotFree bit is set. + t.p.write(t.next, slotFree) + } + + t.p.writeAtomic(t.tail, t.tailHeader) + t.tail = t.next +} + +// Bytes returns the byte slice on which the pipe operates. +func (t *Tx) Bytes() []byte { + return t.p.buffer +} |