diff options
Diffstat (limited to 'pkg/tcpip/link/sharedmem/pipe')
-rw-r--r-- | pkg/tcpip/link/sharedmem/pipe/BUILD | 24 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/pipe/pipe.go | 78 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/pipe/pipe_test.go | 517 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go | 35 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/pipe/rx.go | 93 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/pipe/tx.go | 161 |
6 files changed, 0 insertions, 908 deletions
diff --git a/pkg/tcpip/link/sharedmem/pipe/BUILD b/pkg/tcpip/link/sharedmem/pipe/BUILD deleted file mode 100644 index 6b5bc542c..000000000 --- a/pkg/tcpip/link/sharedmem/pipe/BUILD +++ /dev/null @@ -1,24 +0,0 @@ -load("//tools/go_stateify:defs.bzl", "go_library") -load("@io_bazel_rules_go//go:def.bzl", "go_test") - -package(licenses = ["notice"]) - -go_library( - name = "pipe", - srcs = [ - "pipe.go", - "pipe_unsafe.go", - "rx.go", - "tx.go", - ], - importpath = "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe", - visibility = ["//visibility:public"], -) - -go_test( - name = "pipe_test", - srcs = [ - "pipe_test.go", - ], - embed = [":pipe"], -) diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe.go b/pkg/tcpip/link/sharedmem/pipe/pipe.go deleted file mode 100644 index 74c9f0311..000000000 --- a/pkg/tcpip/link/sharedmem/pipe/pipe.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package pipe implements a shared memory ring buffer on which a single reader -// and a single writer can operate (read/write) concurrently. The ring buffer -// allows for data of different sizes to be written, and preserves the boundary -// of the written data. -// -// Example usage is as follows: -// -// wb := t.Push(20) -// // Write data to wb. -// t.Flush() -// -// rb := r.Pull() -// // Do something with data in rb. -// t.Flush() -package pipe - -import ( - "math" -) - -const ( - jump uint64 = math.MaxUint32 + 1 - offsetMask uint64 = math.MaxUint32 - revolutionMask uint64 = ^offsetMask - - sizeOfSlotHeader = 8 // sizeof(uint64) - slotFree uint64 = 1 << 63 - slotSizeMask uint64 = math.MaxUint32 -) - -// payloadToSlotSize calculates the total size of a slot based on its payload -// size. The total size is the header size, plus the payload size, plus padding -// if necessary to make the total size a multiple of sizeOfSlotHeader. -func payloadToSlotSize(payloadSize uint64) uint64 { - s := sizeOfSlotHeader + payloadSize - return (s + sizeOfSlotHeader - 1) &^ (sizeOfSlotHeader - 1) -} - -// slotToPayloadSize calculates the payload size of a slot based on the total -// size of the slot. This is only meant to be used when creating slots that -// don't carry information (e.g., free slots or wrap slots). -func slotToPayloadSize(offset uint64) uint64 { - return offset - sizeOfSlotHeader -} - -// pipe is a basic data structure used by both (transmit & receive) ends of a -// pipe. Indices into this pipe are split into two fields: offset, which counts -// the number of bytes from the beginning of the buffer, and revolution, which -// counts the number of times the index has wrapped around. -type pipe struct { - buffer []byte -} - -// init initializes the pipe buffer such that its size is a multiple of the size -// of the slot header. -func (p *pipe) init(b []byte) { - p.buffer = b[:len(b)&^(sizeOfSlotHeader-1)] -} - -// data returns a section of the buffer starting at the given index (which may -// include revolution information) and with the given size. -func (p *pipe) data(idx uint64, size uint64) []byte { - return p.buffer[(idx&offsetMask)+sizeOfSlotHeader:][:size] -} diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_test.go b/pkg/tcpip/link/sharedmem/pipe/pipe_test.go deleted file mode 100644 index 59ef69a8b..000000000 --- a/pkg/tcpip/link/sharedmem/pipe/pipe_test.go +++ /dev/null @@ -1,517 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipe - -import ( - "math/rand" - "reflect" - "runtime" - "sync" - "testing" -) - -func TestSimpleReadWrite(t *testing.T) { - // Check that a simple write can be properly read from the rx side. - tr := rand.New(rand.NewSource(99)) - rr := rand.New(rand.NewSource(99)) - - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - wb := tx.Push(10) - if wb == nil { - t.Fatalf("Push failed on empty pipe") - } - for i := range wb { - wb[i] = byte(tr.Intn(256)) - } - tx.Flush() - - var rx Rx - rx.Init(b) - rb := rx.Pull() - if len(rb) != 10 { - t.Fatalf("Bad buffer size returned: got %v, want %v", len(rb), 10) - } - - for i := range rb { - if v := byte(rr.Intn(256)); v != rb[i] { - t.Fatalf("Bad read buffer at index %v: got %v, want %v", i, rb[i], v) - } - } - rx.Flush() -} - -func TestEmptyRead(t *testing.T) { - // Check that pulling from an empty pipe fails. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on empty pipe") - } -} - -func TestTooLargeWrite(t *testing.T) { - // Check that writes that are too large are properly rejected. - b := make([]byte, 96) - var tx Tx - tx.Init(b) - - if wb := tx.Push(96); wb != nil { - t.Fatalf("Write of 96 bytes succeeded on 96-byte pipe") - } - - if wb := tx.Push(88); wb != nil { - t.Fatalf("Write of 88 bytes succeeded on 96-byte pipe") - } - - if wb := tx.Push(80); wb == nil { - t.Fatalf("Write of 80 bytes failed on 96-byte pipe") - } -} - -func TestFullWrite(t *testing.T) { - // Check that writes fail when the pipe is full. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(80); wb == nil { - t.Fatalf("Write of 80 bytes failed on 96-byte pipe") - } - - if wb := tx.Push(1); wb != nil { - t.Fatalf("Write succeeded on full pipe") - } -} - -func TestFullAndFlushedWrite(t *testing.T) { - // Check that writes fail when the pipe is full and has already been - // flushed. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(80); wb == nil { - t.Fatalf("Write of 80 bytes failed on 96-byte pipe") - } - - tx.Flush() - - if wb := tx.Push(1); wb != nil { - t.Fatalf("Write succeeded on full pipe") - } -} - -func TestTxFlushTwice(t *testing.T) { - // Checks that a second consecutive tx flush is a no-op. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - // Make copy of original tx queue, flush it, then check that it didn't - // change. - orig := tx - tx.Flush() - - if !reflect.DeepEqual(orig, tx) { - t.Fatalf("Flush mutated tx pipe: got %v, want %v", tx, orig) - } -} - -func TestRxFlushTwice(t *testing.T) { - // Checks that a second consecutive rx flush is a no-op. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - rx.Flush() - - // Make copy of original rx queue, flush it, then check that it didn't - // change. - orig := rx - rx.Flush() - - if !reflect.DeepEqual(orig, rx) { - t.Fatalf("Flush mutated rx pipe: got %v, want %v", rx, orig) - } -} - -func TestWrapInMiddleOfTransaction(t *testing.T) { - // Check that writes are not flushed when we need to wrap the buffer - // around. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - rx.Flush() - - // At this point the ring buffer is empty, but the write is at offset - // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). - if wb := tx.Push(10); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on non-full pipe") - } - - // We haven't flushed yet, so pull must return nil. - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on non-flushed pipe") - } - - tx.Flush() - - // The two buffers must be available now. - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } -} - -func TestWriteAbort(t *testing.T) { - // Check that a read fails on a pipe that has had data pushed to it but - // has aborted the push. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(10); wb == nil { - t.Fatalf("Write failed on empty pipe") - } - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on empty pipe") - } - - tx.Abort() - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on empty pipe") - } -} - -func TestWrappedWriteAbort(t *testing.T) { - // Check that writes are properly aborted even if the writes wrap - // around. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - rx.Flush() - - // At this point the ring buffer is empty, but the write is at offset - // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). - if wb := tx.Push(10); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on non-full pipe") - } - - // We haven't flushed yet, so pull must return nil. - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on non-flushed pipe") - } - - tx.Abort() - - // The pushes were aborted, so no data should be readable. - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on non-flushed pipe") - } - - // Try the same transactions again, but flush this time. - if wb := tx.Push(10); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on non-full pipe") - } - - tx.Flush() - - // The two buffers must be available now. - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } -} - -func TestEmptyReadOnNonFlushedWrite(t *testing.T) { - // Check that a read fails on a pipe that has had data pushed to it - // but not yet flushed. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(10); wb == nil { - t.Fatalf("Write failed on empty pipe") - } - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on empty pipe") - } - - tx.Flush() - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull on failed on non-empty pipe") - } -} - -func TestPullAfterPullingEntirePipe(t *testing.T) { - // Check that Pull fails when the pipe is full, but all of it has - // already been pulled but not yet flushed. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - rx.Flush() - - // At this point the ring buffer is empty, but the write is at offset - // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 3 - // buffers that will fill the pipe. - if wb := tx.Push(10); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - - if wb := tx.Push(20); wb == nil { - t.Fatalf("Push failed on non-full pipe") - } - - if wb := tx.Push(24); wb == nil { - t.Fatalf("Push failed on non-full pipe") - } - - tx.Flush() - - // The three buffers must be available now. - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - - // Fourth pull must fail. - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on empty pipe") - } -} - -func TestNoRoomToWrapOnPush(t *testing.T) { - // Check that Push fails when it tries to allocate room to add a wrap - // message. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - rx.Flush() - - // At this point the ring buffer is empty, but the write is at offset - // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 20, - // which won't fit (64+20+8+padding = 96, which wouldn't leave room for - // the padding), so it wraps around. - if wb := tx.Push(20); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - - tx.Flush() - - // Buffer offset is at 28. Try to write 70, which would require a wrap - // slot which cannot be created now. - if wb := tx.Push(70); wb != nil { - t.Fatalf("Push succeeded on pipe with no room for wrap message") - } -} - -func TestRxImplicitFlushOfWrapMessage(t *testing.T) { - // Check if the first read is that of a wrapping message, that it gets - // immediately flushed. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - // This will cause a wrapping message to written. - if wb := tx.Push(60); wb != nil { - t.Fatalf("Push succeeded when there is no room in pipe") - } - - var rx Rx - rx.Init(b) - - // Read the first message. - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - rx.Flush() - - // This should fail because of the wrapping message is taking up space. - if wb := tx.Push(60); wb != nil { - t.Fatalf("Push succeeded when there is no room in pipe") - } - - // Try to read the next one. This should consume the wrapping message. - rx.Pull() - - // This must now succeed. - if wb := tx.Push(60); wb == nil { - t.Fatalf("Push failed on empty pipe") - } -} - -func TestConcurrentReaderWriter(t *testing.T) { - // Push a million buffers of random sizes and random contents. Check - // that buffers read match what was written. - tr := rand.New(rand.NewSource(99)) - rr := rand.New(rand.NewSource(99)) - - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - var rx Rx - rx.Init(b) - - const count = 1000000 - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - runtime.Gosched() - for i := 0; i < count; i++ { - n := 1 + tr.Intn(80) - wb := tx.Push(uint64(n)) - for wb == nil { - wb = tx.Push(uint64(n)) - } - - for j := range wb { - wb[j] = byte(tr.Intn(256)) - } - - tx.Flush() - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - runtime.Gosched() - for i := 0; i < count; i++ { - n := 1 + rr.Intn(80) - rb := rx.Pull() - for rb == nil { - rb = rx.Pull() - } - - if n != len(rb) { - t.Fatalf("Bad %v-th buffer length: got %v, want %v", i, len(rb), n) - } - - for j := range rb { - if v := byte(rr.Intn(256)); v != rb[j] { - t.Fatalf("Bad %v-th read buffer at index %v: got %v, want %v", i, j, rb[j], v) - } - } - - rx.Flush() - } - }() - - wg.Wait() -} diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go b/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go deleted file mode 100644 index 62d17029e..000000000 --- a/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipe - -import ( - "sync/atomic" - "unsafe" -) - -func (p *pipe) write(idx uint64, v uint64) { - ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0])) - *ptr = v -} - -func (p *pipe) writeAtomic(idx uint64, v uint64) { - ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0])) - atomic.StoreUint64(ptr, v) -} - -func (p *pipe) readAtomic(idx uint64) uint64 { - ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0])) - return atomic.LoadUint64(ptr) -} diff --git a/pkg/tcpip/link/sharedmem/pipe/rx.go b/pkg/tcpip/link/sharedmem/pipe/rx.go deleted file mode 100644 index f22e533ac..000000000 --- a/pkg/tcpip/link/sharedmem/pipe/rx.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipe - -// Rx is the receive side of the shared memory ring buffer. -type Rx struct { - p pipe - - tail uint64 - head uint64 -} - -// Init initializes the receive end of the pipe. In the initial state, the next -// slot to be inspected is the very first one. -func (r *Rx) Init(b []byte) { - r.p.init(b) - r.tail = 0xfffffffe * jump - r.head = r.tail -} - -// Pull reads the next buffer from the pipe, returning nil if there isn't one -// currently available. -// -// The returned slice is available until Flush() is next called. After that, it -// must not be touched. -func (r *Rx) Pull() []byte { - if r.head == r.tail+jump { - // We've already pulled the whole pipe. - return nil - } - - header := r.p.readAtomic(r.head) - if header&slotFree != 0 { - // The next slot is free, we can't pull it yet. - return nil - } - - payloadSize := header & slotSizeMask - newHead := r.head + payloadToSlotSize(payloadSize) - headWrap := (r.head & revolutionMask) | uint64(len(r.p.buffer)) - - // Check if this is a wrapping slot. If that's the case, it carries no - // data, so we just skip it and try again from the first slot. - if int64(newHead-headWrap) >= 0 { - if int64(newHead-headWrap) > int64(jump) || newHead&offsetMask != 0 { - return nil - } - - if r.tail == r.head { - // If this is the first pull since the last Flush() - // call, we flush the state so that the sender can use - // this space if it needs to. - r.p.writeAtomic(r.head, slotFree|slotToPayloadSize(newHead-r.head)) - r.tail = newHead - } - - r.head = newHead - return r.Pull() - } - - // Grab the buffer before updating r.head. - b := r.p.data(r.head, payloadSize) - r.head = newHead - return b -} - -// Flush tells the transmitter that all buffers pulled since the last Flush() -// have been used, so the transmitter is free to used their slots for further -// transmission. -func (r *Rx) Flush() { - if r.head == r.tail { - return - } - r.p.writeAtomic(r.tail, slotFree|slotToPayloadSize(r.head-r.tail)) - r.tail = r.head -} - -// Bytes returns the byte slice on which the pipe operates. -func (r *Rx) Bytes() []byte { - return r.p.buffer -} diff --git a/pkg/tcpip/link/sharedmem/pipe/tx.go b/pkg/tcpip/link/sharedmem/pipe/tx.go deleted file mode 100644 index 9841eb231..000000000 --- a/pkg/tcpip/link/sharedmem/pipe/tx.go +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipe - -// Tx is the transmit side of the shared memory ring buffer. -type Tx struct { - p pipe - maxPayloadSize uint64 - - head uint64 - tail uint64 - next uint64 - - tailHeader uint64 -} - -// Init initializes the transmit end of the pipe. In the initial state, the next -// slot to be written is the very first one, and the transmitter has the whole -// ring buffer available to it. -func (t *Tx) Init(b []byte) { - t.p.init(b) - // maxPayloadSize excludes the header of the payload, and the header - // of the wrapping message. - t.maxPayloadSize = uint64(len(t.p.buffer)) - 2*sizeOfSlotHeader - t.tail = 0xfffffffe * jump - t.next = t.tail - t.head = t.tail + jump - t.p.write(t.tail, slotFree) -} - -// Capacity determines how many records of the given size can be written to the -// pipe before it fills up. -func (t *Tx) Capacity(recordSize uint64) uint64 { - available := uint64(len(t.p.buffer)) - sizeOfSlotHeader - entryLen := payloadToSlotSize(recordSize) - return available / entryLen -} - -// Push reserves "payloadSize" bytes for transmission in the pipe. The caller -// populates the returned slice with the data to be transferred and enventually -// calls Flush() to make the data visible to the reader, or Abort() to make the -// pipe forget all Push() calls since the last Flush(). -// -// The returned slice is available until Flush() or Abort() is next called. -// After that, it must not be touched. -func (t *Tx) Push(payloadSize uint64) []byte { - // Fail request if we know we will never have enough room. - if payloadSize > t.maxPayloadSize { - return nil - } - - totalLen := payloadToSlotSize(payloadSize) - newNext := t.next + totalLen - nextWrap := (t.next & revolutionMask) | uint64(len(t.p.buffer)) - if int64(newNext-nextWrap) >= 0 { - // The new buffer would overflow the pipe, so we push a wrapping - // slot, then try to add the actual slot to the front of the - // pipe. - newNext = (newNext & revolutionMask) + jump - wrappingPayloadSize := slotToPayloadSize(newNext - t.next) - if !t.reclaim(newNext) { - return nil - } - - oldNext := t.next - t.next = newNext - if oldNext != t.tail { - t.p.write(oldNext, wrappingPayloadSize) - } else { - t.tailHeader = wrappingPayloadSize - t.Flush() - } - - newNext += totalLen - } - - // Check that we have enough room for the buffer. - if !t.reclaim(newNext) { - return nil - } - - if t.next != t.tail { - t.p.write(t.next, payloadSize) - } else { - t.tailHeader = payloadSize - } - - // Grab the buffer before updating t.next. - b := t.p.data(t.next, payloadSize) - t.next = newNext - - return b -} - -// reclaim attempts to advance the head until at least newNext. If the head is -// already at or beyond newNext, nothing happens and true is returned; otherwise -// it tries to reclaim slots that have already been consumed by the receive end -// of the pipe (they will be marked as free) and returns a boolean indicating -// whether it was successful in reclaiming enough slots. -func (t *Tx) reclaim(newNext uint64) bool { - for int64(newNext-t.head) > 0 { - // Can't reclaim if slot is not free. - header := t.p.readAtomic(t.head) - if header&slotFree == 0 { - return false - } - - payloadSize := header & slotSizeMask - newHead := t.head + payloadToSlotSize(payloadSize) - - // Check newHead is within bounds and valid. - if int64(newHead-t.tail) > int64(jump) || newHead&offsetMask >= uint64(len(t.p.buffer)) { - return false - } - - t.head = newHead - } - - return true -} - -// Abort causes all Push() calls since the last Flush() to be forgotten and -// therefore they will not be made visible to the receiver. -func (t *Tx) Abort() { - t.next = t.tail -} - -// Flush causes all buffers pushed since the last Flush() [or Abort(), whichever -// is the most recent] to be made visible to the receiver. -func (t *Tx) Flush() { - if t.next == t.tail { - // Nothing to do if there are no pushed buffers. - return - } - - if t.next != t.head { - // The receiver will spin in t.next, so we must make sure that - // the slotFree bit is set. - t.p.write(t.next, slotFree) - } - - t.p.writeAtomic(t.tail, t.tailHeader) - t.tail = t.next -} - -// Bytes returns the byte slice on which the pipe operates. -func (t *Tx) Bytes() []byte { - return t.p.buffer -} |