From 463f4217d109ded8af758fe51a5daf8670da9794 Mon Sep 17 00:00:00 2001 From: Adin Scannell Date: Fri, 28 Feb 2020 12:28:10 -0800 Subject: Make pipe buffer implementation standard. A follow-up change will convert the networking code to use this standard pipe implementation. PiperOrigin-RevId: 297903206 --- pkg/buffer/buffer.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 pkg/buffer/buffer.go (limited to 'pkg/buffer/buffer.go') diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go new file mode 100644 index 000000000..d5f64609b --- /dev/null +++ b/pkg/buffer/buffer.go @@ -0,0 +1,67 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package buffer provides the implementation of a buffer view. +package buffer + +import ( + "sync" +) + +const bufferSize = 8144 // See below. + +// Buffer encapsulates a queueable byte buffer. +// +// Note that the total size is slightly less than two pages. This is done +// intentionally to ensure that the buffer object aligns with runtime +// internals. We have no hard size or alignment requirements. This two page +// size will effectively minimize internal fragmentation, but still have a +// large enough chunk to limit excessive segmentation. +// +// +stateify savable +type Buffer struct { + data [bufferSize]byte + read int + write int + bufferEntry +} + +// Reset resets internal data. +// +// This must be called before use. +func (b *Buffer) Reset() { + b.read = 0 + b.write = 0 +} + +// Empty indicates the buffer is empty. +// +// This indicates there is no data left to read. +func (b *Buffer) Empty() bool { + return b.read == b.write +} + +// Full indicates the buffer is full. +// +// This indicates there is no capacity left to write. +func (b *Buffer) Full() bool { + return b.write == len(b.data) +} + +// bufferPool is a pool for buffers. +var bufferPool = sync.Pool{ + New: func() interface{} { + return new(Buffer) + }, +} -- cgit v1.2.3 From 61051f226889f51fb97bd44131899a3c502b4c42 Mon Sep 17 00:00:00 2001 From: Adin Scannell Date: Wed, 11 Mar 2020 19:50:59 -0700 Subject: Clean-up buffer implementation. This also adds substantial test cases. The Read/Write interfaces are dropped as they are not necessary. PiperOrigin-RevId: 300461547 --- pkg/buffer/BUILD | 10 +- pkg/buffer/buffer.go | 55 +++-- pkg/buffer/safemem.go | 30 ++- pkg/buffer/safemem_test.go | 170 +++++++++++++++ pkg/buffer/view.go | 214 ++++++++++--------- pkg/buffer/view_test.go | 510 +++++++++++++++++++++++++++++++++------------ 6 files changed, 715 insertions(+), 274 deletions(-) create mode 100644 pkg/buffer/safemem_test.go (limited to 'pkg/buffer/buffer.go') diff --git a/pkg/buffer/BUILD b/pkg/buffer/BUILD index a77a3beea..dcd086298 100644 --- a/pkg/buffer/BUILD +++ b/pkg/buffer/BUILD @@ -10,8 +10,8 @@ go_template_instance( prefix = "buffer", template = "//pkg/ilist:generic_list", types = { - "Element": "*Buffer", - "Linker": "*Buffer", + "Element": "*buffer", + "Linker": "*buffer", }, ) @@ -34,6 +34,10 @@ go_library( go_test( name = "buffer_test", size = "small", - srcs = ["view_test.go"], + srcs = [ + "safemem_test.go", + "view_test.go", + ], library = ":buffer", + deps = ["//pkg/safemem"], ) diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go index d5f64609b..c6d089fd9 100644 --- a/pkg/buffer/buffer.go +++ b/pkg/buffer/buffer.go @@ -13,6 +13,10 @@ // limitations under the License. // Package buffer provides the implementation of a buffer view. +// +// A view is an flexible buffer, backed by a pool, supporting the safecopy +// operations natively as well as the ability to grow via either prepend or +// append, as well as shrink. package buffer import ( @@ -21,7 +25,7 @@ import ( const bufferSize = 8144 // See below. -// Buffer encapsulates a queueable byte buffer. +// buffer encapsulates a queueable byte buffer. // // Note that the total size is slightly less than two pages. This is done // intentionally to ensure that the buffer object aligns with runtime @@ -30,38 +34,61 @@ const bufferSize = 8144 // See below. // large enough chunk to limit excessive segmentation. // // +stateify savable -type Buffer struct { +type buffer struct { data [bufferSize]byte read int write int bufferEntry } -// Reset resets internal data. +// reset resets internal data. // -// This must be called before use. -func (b *Buffer) Reset() { +// This must be called before returning the buffer to the pool. +func (b *buffer) Reset() { b.read = 0 b.write = 0 } -// Empty indicates the buffer is empty. -// -// This indicates there is no data left to read. -func (b *Buffer) Empty() bool { - return b.read == b.write -} - // Full indicates the buffer is full. // // This indicates there is no capacity left to write. -func (b *Buffer) Full() bool { +func (b *buffer) Full() bool { return b.write == len(b.data) } +// ReadSize returns the number of bytes available for reading. +func (b *buffer) ReadSize() int { + return b.write - b.read +} + +// ReadMove advances the read index by the given amount. +func (b *buffer) ReadMove(n int) { + b.read += n +} + +// ReadSlice returns the read slice for this buffer. +func (b *buffer) ReadSlice() []byte { + return b.data[b.read:b.write] +} + +// WriteSize returns the number of bytes available for writing. +func (b *buffer) WriteSize() int { + return len(b.data) - b.write +} + +// WriteMove advances the write index by the given amount. +func (b *buffer) WriteMove(n int) { + b.write += n +} + +// WriteSlice returns the write slice for this buffer. +func (b *buffer) WriteSlice() []byte { + return b.data[b.write:] +} + // bufferPool is a pool for buffers. var bufferPool = sync.Pool{ New: func() interface{} { - return new(Buffer) + return new(buffer) }, } diff --git a/pkg/buffer/safemem.go b/pkg/buffer/safemem.go index 071aaa488..0e5b86344 100644 --- a/pkg/buffer/safemem.go +++ b/pkg/buffer/safemem.go @@ -15,19 +15,17 @@ package buffer import ( - "io" - "gvisor.dev/gvisor/pkg/safemem" ) // WriteBlock returns this buffer as a write Block. -func (b *Buffer) WriteBlock() safemem.Block { - return safemem.BlockFromSafeSlice(b.data[b.write:]) +func (b *buffer) WriteBlock() safemem.Block { + return safemem.BlockFromSafeSlice(b.WriteSlice()) } // ReadBlock returns this buffer as a read Block. -func (b *Buffer) ReadBlock() safemem.Block { - return safemem.BlockFromSafeSlice(b.data[b.read:b.write]) +func (b *buffer) ReadBlock() safemem.Block { + return safemem.BlockFromSafeSlice(b.ReadSlice()) } // WriteFromBlocks implements safemem.Writer.WriteFromBlocks. @@ -47,21 +45,21 @@ func (v *View) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) { // Need at least one buffer. firstBuf := v.data.Back() if firstBuf == nil { - firstBuf = bufferPool.Get().(*Buffer) + firstBuf = bufferPool.Get().(*buffer) v.data.PushBack(firstBuf) } // Does the last block have sufficient capacity alone? - if l := len(firstBuf.data) - firstBuf.write; l >= need { + if l := firstBuf.WriteSize(); l >= need { dst = safemem.BlockSeqOf(firstBuf.WriteBlock()) } else { // Append blocks until sufficient. need -= l blocks = append(blocks, firstBuf.WriteBlock()) for need > 0 { - emptyBuf := bufferPool.Get().(*Buffer) + emptyBuf := bufferPool.Get().(*buffer) v.data.PushBack(emptyBuf) - need -= len(emptyBuf.data) // Full block. + need -= emptyBuf.WriteSize() blocks = append(blocks, emptyBuf.WriteBlock()) } dst = safemem.BlockSeqFromSlice(blocks) @@ -73,11 +71,11 @@ func (v *View) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) { // Update all indices. for left := int(n); left > 0; firstBuf = firstBuf.Next() { - if l := len(firstBuf.data) - firstBuf.write; left >= l { - firstBuf.write += l // Whole block. + if l := firstBuf.WriteSize(); left >= l { + firstBuf.WriteMove(l) // Whole block. left -= l } else { - firstBuf.write += left // Partial block. + firstBuf.WriteMove(left) // Partial block. left = 0 } } @@ -103,18 +101,18 @@ func (v *View) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) { firstBuf := v.data.Front() if firstBuf == nil { - return 0, io.EOF + return 0, nil // No EOF. } // Is all the data in a single block? - if l := firstBuf.write - firstBuf.read; l >= need { + if l := firstBuf.ReadSize(); l >= need { src = safemem.BlockSeqOf(firstBuf.ReadBlock()) } else { // Build a list of all the buffers. need -= l blocks = append(blocks, firstBuf.ReadBlock()) for buf := firstBuf.Next(); buf != nil && need > 0; buf = buf.Next() { - need -= buf.write - buf.read + need -= buf.ReadSize() blocks = append(blocks, buf.ReadBlock()) } src = safemem.BlockSeqFromSlice(blocks) diff --git a/pkg/buffer/safemem_test.go b/pkg/buffer/safemem_test.go new file mode 100644 index 000000000..47f357e0c --- /dev/null +++ b/pkg/buffer/safemem_test.go @@ -0,0 +1,170 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buffer + +import ( + "bytes" + "strings" + "testing" + + "gvisor.dev/gvisor/pkg/safemem" +) + +func TestSafemem(t *testing.T) { + testCases := []struct { + name string + input string + output string + readLen int + op func(*View) + }{ + // Basic coverage. + { + name: "short", + input: "010", + output: "010", + }, + { + name: "long", + input: "0" + strings.Repeat("1", bufferSize) + "0", + output: "0" + strings.Repeat("1", bufferSize) + "0", + }, + { + name: "short-read", + input: "0", + readLen: 100, // > size. + output: "0", + }, + { + name: "zero-read", + input: "0", + output: "", + }, + { + name: "read-empty", + input: "", + readLen: 1, // > size. + output: "", + }, + + // Ensure offsets work. + { + name: "offsets-short", + input: "012", + output: "2", + op: func(v *View) { + v.TrimFront(2) + }, + }, + { + name: "offsets-long0", + input: "0" + strings.Repeat("1", bufferSize) + "0", + output: strings.Repeat("1", bufferSize) + "0", + op: func(v *View) { + v.TrimFront(1) + }, + }, + { + name: "offsets-long1", + input: "0" + strings.Repeat("1", bufferSize) + "0", + output: strings.Repeat("1", bufferSize-1) + "0", + op: func(v *View) { + v.TrimFront(2) + }, + }, + { + name: "offsets-long2", + input: "0" + strings.Repeat("1", bufferSize) + "0", + output: "10", + op: func(v *View) { + v.TrimFront(bufferSize) + }, + }, + + // Ensure truncation works. + { + name: "truncate-short", + input: "012", + output: "01", + op: func(v *View) { + v.Truncate(2) + }, + }, + { + name: "truncate-long0", + input: "0" + strings.Repeat("1", bufferSize) + "0", + output: "0" + strings.Repeat("1", bufferSize), + op: func(v *View) { + v.Truncate(bufferSize + 1) + }, + }, + { + name: "truncate-long1", + input: "0" + strings.Repeat("1", bufferSize) + "0", + output: "0" + strings.Repeat("1", bufferSize-1), + op: func(v *View) { + v.Truncate(bufferSize) + }, + }, + { + name: "truncate-long2", + input: "0" + strings.Repeat("1", bufferSize) + "0", + output: "01", + op: func(v *View) { + v.Truncate(2) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Construct the new view. + var view View + bs := safemem.BlockSeqOf(safemem.BlockFromSafeSlice([]byte(tc.input))) + n, err := view.WriteFromBlocks(bs) + if err != nil { + t.Errorf("expected err nil, got %v", err) + } + if n != uint64(len(tc.input)) { + t.Errorf("expected %d bytes, got %d", len(tc.input), n) + } + + // Run the operation. + if tc.op != nil { + tc.op(&view) + } + + // Read and validate. + readLen := tc.readLen + if readLen == 0 { + readLen = len(tc.output) // Default. + } + out := make([]byte, readLen) + bs = safemem.BlockSeqOf(safemem.BlockFromSafeSlice(out)) + n, err = view.ReadToBlocks(bs) + if err != nil { + t.Errorf("expected nil, got %v", err) + } + if n != uint64(len(tc.output)) { + t.Errorf("expected %d bytes, got %d", len(tc.output), n) + } + + // Ensure the contents are correct. + if !bytes.Equal(out[:n], []byte(tc.output[:n])) { + t.Errorf("contents are wrong: expected %q, got %q", tc.output, string(out)) + } + }) + } +} diff --git a/pkg/buffer/view.go b/pkg/buffer/view.go index 00fc11e9c..e6901eadb 100644 --- a/pkg/buffer/view.go +++ b/pkg/buffer/view.go @@ -38,14 +38,6 @@ func (v *View) TrimFront(count int64) { } } -// Read implements io.Reader.Read. -// -// Note that reading does not advance the read index. This must be done -// manually using TrimFront or other methods. -func (v *View) Read(p []byte) (int, error) { - return v.ReadAt(p, 0) -} - // ReadAt implements io.ReaderAt.ReadAt. func (v *View) ReadAt(p []byte, offset int64) (int, error) { var ( @@ -54,54 +46,46 @@ func (v *View) ReadAt(p []byte, offset int64) (int, error) { ) for buf := v.data.Front(); buf != nil && done < int64(len(p)); buf = buf.Next() { needToSkip := int(offset - skipped) - if l := buf.write - buf.read; l <= needToSkip { - skipped += int64(l) + if sz := buf.ReadSize(); sz <= needToSkip { + skipped += int64(sz) continue } // Actually read data. - n := copy(p[done:], buf.data[buf.read+needToSkip:buf.write]) + n := copy(p[done:], buf.ReadSlice()[needToSkip:]) skipped += int64(needToSkip) done += int64(n) } - if int(done) < len(p) { + if int(done) < len(p) || offset+done == v.size { return int(done), io.EOF } return int(done), nil } -// Write implements io.Writer.Write. -func (v *View) Write(p []byte) (int, error) { - v.Append(p) // Does not fail. - return len(p), nil -} - // advanceRead advances the view's read index. // // Precondition: there must be sufficient bytes in the buffer. func (v *View) advanceRead(count int64) { for buf := v.data.Front(); buf != nil && count > 0; { - l := int64(buf.write - buf.read) - if l > count { + sz := int64(buf.ReadSize()) + if sz > count { // There is still data for reading. - buf.read += int(count) + buf.ReadMove(int(count)) v.size -= count count = 0 break } - // Read from this buffer. - buf.read += int(l) - count -= l - v.size -= l - - // When all data has been read from a buffer, we push - // it into the empty buffer pool for reuse. + // Consume the whole buffer. oldBuf := buf buf = buf.Next() // Iterate. v.data.Remove(oldBuf) oldBuf.Reset() bufferPool.Put(oldBuf) + + // Update counts. + count -= sz + v.size -= sz } if count > 0 { panic(fmt.Sprintf("advanceRead still has %d bytes remaining", count)) @@ -109,37 +93,39 @@ func (v *View) advanceRead(count int64) { } // Truncate truncates the view to the given bytes. +// +// This will not grow the view, only shrink it. If a length is passed that is +// greater than the current size of the view, then nothing will happen. +// +// Precondition: length must be >= 0. func (v *View) Truncate(length int64) { - if length < 0 || length >= v.size { + if length < 0 { + panic("negative length provided") + } + if length >= v.size { return // Nothing to do. } for buf := v.data.Back(); buf != nil && v.size > length; buf = v.data.Back() { - l := int64(buf.write - buf.read) // Local bytes. - switch { - case v.size-l >= length: - // Drop the buffer completely; see above. - v.data.Remove(buf) - v.size -= l - buf.Reset() - bufferPool.Put(buf) - - case v.size > length && v.size-l < length: - // Just truncate the buffer locally. - delta := (length - (v.size - l)) - buf.write = buf.read + int(delta) + sz := int64(buf.ReadSize()) + if after := v.size - sz; after < length { + // Truncate the buffer locally. + left := (length - after) + buf.write = buf.read + int(left) v.size = length - - default: - // Should never happen. - panic("invalid buffer during truncation") + break } + + // Drop the buffer completely; see above. + v.data.Remove(buf) + buf.Reset() + bufferPool.Put(buf) + v.size -= sz } - v.size = length // Save the new size. } -// Grow grows the given view to the number of bytes. If zero -// is true, all these bytes will be zero. If zero is false, -// then this is the caller's responsibility. +// Grow grows the given view to the number of bytes, which will be appended. If +// zero is true, all these bytes will be zero. If zero is false, then this is +// the caller's responsibility. // // Precondition: length must be >= 0. func (v *View) Grow(length int64, zero bool) { @@ -149,29 +135,29 @@ func (v *View) Grow(length int64, zero bool) { for v.size < length { buf := v.data.Back() - // Is there at least one buffer? + // Is there some space in the last buffer? if buf == nil || buf.Full() { - buf = bufferPool.Get().(*Buffer) + buf = bufferPool.Get().(*buffer) v.data.PushBack(buf) } // Write up to length bytes. - l := len(buf.data) - buf.write - if int64(l) > length-v.size { - l = int(length - v.size) + sz := buf.WriteSize() + if int64(sz) > length-v.size { + sz = int(length - v.size) } // Zero the written section; note that this pattern is // specifically recognized and optimized by the compiler. if zero { - for i := buf.write; i < buf.write+l; i++ { + for i := buf.write; i < buf.write+sz; i++ { buf.data[i] = 0 } } // Advance the index. - buf.write += l - v.size += int64(l) + buf.WriteMove(sz) + v.size += int64(sz) } } @@ -181,31 +167,40 @@ func (v *View) Prepend(data []byte) { if buf := v.data.Front(); buf != nil && buf.read > 0 { // Fill up before the first write. avail := buf.read - copy(buf.data[0:], data[len(data)-avail:]) - data = data[:len(data)-avail] - v.size += int64(avail) + bStart := 0 + dStart := len(data) - avail + if avail > len(data) { + bStart = avail - len(data) + dStart = 0 + } + n := copy(buf.data[bStart:], data[dStart:]) + data = data[:dStart] + v.size += int64(n) + buf.read -= n } for len(data) > 0 { // Do we need an empty buffer? - buf := bufferPool.Get().(*Buffer) + buf := bufferPool.Get().(*buffer) v.data.PushFront(buf) // The buffer is empty; copy last chunk. - start := len(data) - len(buf.data) - if start < 0 { - start = 0 // Everything. + avail := len(buf.data) + bStart := 0 + dStart := len(data) - avail + if avail > len(data) { + bStart = avail - len(data) + dStart = 0 } // We have to put the data at the end of the current // buffer in order to ensure that the next prepend will // correctly fill up the beginning of this buffer. - bStart := len(buf.data) - len(data[start:]) - n := copy(buf.data[bStart:], data[start:]) - buf.read = bStart - buf.write = len(buf.data) - data = data[:start] + n := copy(buf.data[bStart:], data[dStart:]) + data = data[:dStart] v.size += int64(n) + buf.read = len(buf.data) - n + buf.write = len(buf.data) } } @@ -214,16 +209,16 @@ func (v *View) Append(data []byte) { for done := 0; done < len(data); { buf := v.data.Back() - // Find the first empty buffer. + // Ensure there's a buffer with space. if buf == nil || buf.Full() { - buf = bufferPool.Get().(*Buffer) + buf = bufferPool.Get().(*buffer) v.data.PushBack(buf) } // Copy in to the given buffer. - n := copy(buf.data[buf.write:], data[done:]) + n := copy(buf.WriteSlice(), data[done:]) done += n - buf.write += n + buf.WriteMove(n) v.size += int64(n) } } @@ -232,52 +227,52 @@ func (v *View) Append(data []byte) { // // This method should not be used in any performance-sensitive paths. It may // allocate a fresh byte slice sufficiently large to contain all the data in -// the buffer. +// the buffer. This is principally for debugging. // // N.B. Tee data still belongs to this view, as if there is a single buffer // present, then it will be returned directly. This should be used for // temporary use only, and a reference to the given slice should not be held. func (v *View) Flatten() []byte { - if buf := v.data.Front(); buf.Next() == nil { - return buf.data[buf.read:buf.write] // Only one buffer. + if buf := v.data.Front(); buf == nil { + return nil // No data at all. + } else if buf.Next() == nil { + return buf.ReadSlice() // Only one buffer. } data := make([]byte, 0, v.size) // Need to flatten. for buf := v.data.Front(); buf != nil; buf = buf.Next() { // Copy to the allocated slice. - data = append(data, buf.data[buf.read:buf.write]...) + data = append(data, buf.ReadSlice()...) } return data } // Size indicates the total amount of data available in this view. -func (v *View) Size() (sz int64) { - sz = v.size // Pre-calculated. - return sz +func (v *View) Size() int64 { + return v.size } // Copy makes a strict copy of this view. func (v *View) Copy() (other View) { for buf := v.data.Front(); buf != nil; buf = buf.Next() { - other.Append(buf.data[buf.read:buf.write]) + other.Append(buf.ReadSlice()) } - return other + return } // Apply applies the given function across all valid data. func (v *View) Apply(fn func([]byte)) { for buf := v.data.Front(); buf != nil; buf = buf.Next() { - if l := int64(buf.write - buf.read); l > 0 { - fn(buf.data[buf.read:buf.write]) - } + fn(buf.ReadSlice()) } } // Merge merges the provided View with this one. // -// The other view will be empty after this operation. +// The other view will be appended to v, and other will be empty after this +// operation completes. func (v *View) Merge(other *View) { // Copy over all buffers. - for buf := other.data.Front(); buf != nil && !buf.Empty(); buf = other.data.Front() { + for buf := other.data.Front(); buf != nil; buf = other.data.Front() { other.data.Remove(buf) v.data.PushBack(buf) } @@ -288,6 +283,9 @@ func (v *View) Merge(other *View) { } // WriteFromReader writes to the buffer from an io.Reader. +// +// A minimum read size equal to unsafe.Sizeof(unintptr) is enforced, +// provided that count is greater than or equal to unsafe.Sizeof(uintptr). func (v *View) WriteFromReader(r io.Reader, count int64) (int64, error) { var ( done int64 @@ -297,17 +295,17 @@ func (v *View) WriteFromReader(r io.Reader, count int64) (int64, error) { for done < count { buf := v.data.Back() - // Find the first empty buffer. + // Ensure we have an empty buffer. if buf == nil || buf.Full() { - buf = bufferPool.Get().(*Buffer) + buf = bufferPool.Get().(*buffer) v.data.PushBack(buf) } // Is this less than the minimum batch? - if len(buf.data[buf.write:]) < minBatch && (count-done) >= int64(minBatch) { + if buf.WriteSize() < minBatch && (count-done) >= int64(minBatch) { tmp := make([]byte, minBatch) n, err = r.Read(tmp) - v.Write(tmp[:n]) + v.Append(tmp[:n]) done += int64(n) if err != nil { break @@ -316,14 +314,14 @@ func (v *View) WriteFromReader(r io.Reader, count int64) (int64, error) { } // Limit the read, if necessary. - end := len(buf.data) - if int64(end-buf.write) > (count - done) { - end = buf.write + int(count-done) + sz := buf.WriteSize() + if left := count - done; int64(sz) > left { + sz = int(left) } // Pass the relevant portion of the buffer. - n, err = r.Read(buf.data[buf.write:end]) - buf.write += n + n, err = r.Read(buf.WriteSlice()[:sz]) + buf.WriteMove(n) done += int64(n) v.size += int64(n) if err == io.EOF { @@ -340,6 +338,9 @@ func (v *View) WriteFromReader(r io.Reader, count int64) (int64, error) { // // N.B. This does not consume the bytes read. TrimFront should // be called appropriately after this call in order to do so. +// +// A minimum write size equal to unsafe.Sizeof(unintptr) is enforced, +// provided that count is greater than or equal to unsafe.Sizeof(uintptr). func (v *View) ReadToWriter(w io.Writer, count int64) (int64, error) { var ( done int64 @@ -348,15 +349,22 @@ func (v *View) ReadToWriter(w io.Writer, count int64) (int64, error) { ) offset := 0 // Spill-over for batching. for buf := v.data.Front(); buf != nil && done < count; buf = buf.Next() { - l := buf.write - buf.read - offset + // Has this been consumed? Skip it. + sz := buf.ReadSize() + if sz <= offset { + offset -= sz + continue + } + sz -= offset // Is this less than the minimum batch? - if l < minBatch && (count-done) >= int64(minBatch) && (v.size-done) >= int64(minBatch) { + left := count - done + if sz < minBatch && left >= int64(minBatch) && (v.size-done) >= int64(minBatch) { tmp := make([]byte, minBatch) n, err = v.ReadAt(tmp, done) w.Write(tmp[:n]) done += int64(n) - offset = n - l // Reset below. + offset = n - sz // Reset below. if err != nil { break } @@ -364,12 +372,12 @@ func (v *View) ReadToWriter(w io.Writer, count int64) (int64, error) { } // Limit the write if necessary. - if int64(l) >= (count - done) { - l = int(count - done) + if int64(sz) >= left { + sz = int(left) } // Perform the actual write. - n, err = w.Write(buf.data[buf.read+offset : buf.read+offset+l]) + n, err = w.Write(buf.ReadSlice()[offset : offset+sz]) done += int64(n) if err != nil { break diff --git a/pkg/buffer/view_test.go b/pkg/buffer/view_test.go index 37e652f16..3db1bc6ee 100644 --- a/pkg/buffer/view_test.go +++ b/pkg/buffer/view_test.go @@ -16,218 +16,452 @@ package buffer import ( "bytes" + "io" "strings" "testing" ) +func fillAppend(v *View, data []byte) { + v.Append(data) +} + +func fillAppendEnd(v *View, data []byte) { + v.Grow(bufferSize-1, false) + v.Append(data) + v.TrimFront(bufferSize - 1) +} + +func fillWriteFromReader(v *View, data []byte) { + b := bytes.NewBuffer(data) + v.WriteFromReader(b, int64(len(data))) +} + +func fillWriteFromReaderEnd(v *View, data []byte) { + v.Grow(bufferSize-1, false) + b := bytes.NewBuffer(data) + v.WriteFromReader(b, int64(len(data))) + v.TrimFront(bufferSize - 1) +} + +var fillFuncs = map[string]func(*View, []byte){ + "append": fillAppend, + "appendEnd": fillAppendEnd, + "writeFromReader": fillWriteFromReader, + "writeFromReaderEnd": fillWriteFromReaderEnd, +} + +func testReadAt(t *testing.T, v *View, offset int64, n int, wantStr string, wantErr error) { + t.Helper() + d := make([]byte, n) + n, err := v.ReadAt(d, offset) + if n != len(wantStr) { + t.Errorf("got %d, want %d", n, len(wantStr)) + } + if err != wantErr { + t.Errorf("got err %v, want %v", err, wantErr) + } + if !bytes.Equal(d[:n], []byte(wantStr)) { + t.Errorf("got %q, want %q", string(d[:n]), wantStr) + } +} + func TestView(t *testing.T) { testCases := []struct { name string input string output string - ops []func(*View) + op func(*testing.T, *View) }{ - // Prepend. + // Preconditions. + { + name: "truncate-check", + input: "hello", + output: "hello", // Not touched. + op: func(t *testing.T, v *View) { + defer func() { + if r := recover(); r == nil { + t.Errorf("Truncate(-1) did not panic") + } + }() + v.Truncate(-1) + }, + }, + { + name: "grow-check", + input: "hello", + output: "hello", // Not touched. + op: func(t *testing.T, v *View) { + defer func() { + if r := recover(); r == nil { + t.Errorf("Grow(-1) did not panic") + } + }() + v.Grow(-1, false) + }, + }, { - name: "prepend", - input: "world", - ops: []func(*View){ - func(v *View) { - v.Prepend([]byte("hello ")) - }, + name: "advance-check", + input: "hello", + output: "", // Consumed. + op: func(t *testing.T, v *View) { + defer func() { + if r := recover(); r == nil { + t.Errorf("advanceRead(Size()+1) did not panic") + } + }() + v.advanceRead(v.Size() + 1) }, + }, + + // Prepend. + { + name: "prepend", + input: "world", output: "hello world", + op: func(t *testing.T, v *View) { + v.Prepend([]byte("hello ")) + }, }, { - name: "prepend fill", - input: strings.Repeat("1", bufferSize-1), - ops: []func(*View){ - func(v *View) { - v.Prepend([]byte("0")) - }, + name: "prepend-backfill-full", + input: "hello world", + output: "jello world", + op: func(t *testing.T, v *View) { + v.TrimFront(1) + v.Prepend([]byte("j")) }, - output: "0" + strings.Repeat("1", bufferSize-1), }, { - name: "prepend overflow", - input: strings.Repeat("1", bufferSize), - ops: []func(*View){ - func(v *View) { - v.Prepend([]byte("0")) - }, + name: "prepend-backfill-under", + input: "hello world", + output: "hola world", + op: func(t *testing.T, v *View) { + v.TrimFront(5) + v.Prepend([]byte("hola")) }, - output: "0" + strings.Repeat("1", bufferSize), }, { - name: "prepend multiple buffers", - input: strings.Repeat("1", bufferSize-1), - ops: []func(*View){ - func(v *View) { - v.Prepend([]byte(strings.Repeat("0", bufferSize*3))) - }, + name: "prepend-backfill-over", + input: "hello world", + output: "smello world", + op: func(t *testing.T, v *View) { + v.TrimFront(1) + v.Prepend([]byte("sm")) }, + }, + { + name: "prepend-fill", + input: strings.Repeat("1", bufferSize-1), + output: "0" + strings.Repeat("1", bufferSize-1), + op: func(t *testing.T, v *View) { + v.Prepend([]byte("0")) + }, + }, + { + name: "prepend-overflow", + input: strings.Repeat("1", bufferSize), + output: "0" + strings.Repeat("1", bufferSize), + op: func(t *testing.T, v *View) { + v.Prepend([]byte("0")) + }, + }, + { + name: "prepend-multiple-buffers", + input: strings.Repeat("1", bufferSize-1), output: strings.Repeat("0", bufferSize*3) + strings.Repeat("1", bufferSize-1), + op: func(t *testing.T, v *View) { + v.Prepend([]byte(strings.Repeat("0", bufferSize*3))) + }, }, - // Append. + // Append and write. { - name: "append", - input: "hello", - ops: []func(*View){ - func(v *View) { - v.Append([]byte(" world")) - }, - }, + name: "append", + input: "hello", output: "hello world", + op: func(t *testing.T, v *View) { + v.Append([]byte(" world")) + }, }, { - name: "append fill", - input: strings.Repeat("1", bufferSize-1), - ops: []func(*View){ - func(v *View) { - v.Append([]byte("0")) - }, - }, + name: "append-fill", + input: strings.Repeat("1", bufferSize-1), output: strings.Repeat("1", bufferSize-1) + "0", + op: func(t *testing.T, v *View) { + v.Append([]byte("0")) + }, }, { - name: "append overflow", - input: strings.Repeat("1", bufferSize), - ops: []func(*View){ - func(v *View) { - v.Append([]byte("0")) - }, - }, + name: "append-overflow", + input: strings.Repeat("1", bufferSize), output: strings.Repeat("1", bufferSize) + "0", + op: func(t *testing.T, v *View) { + v.Append([]byte("0")) + }, }, { - name: "append multiple buffers", - input: strings.Repeat("1", bufferSize-1), - ops: []func(*View){ - func(v *View) { - v.Append([]byte(strings.Repeat("0", bufferSize*3))) - }, - }, + name: "append-multiple-buffers", + input: strings.Repeat("1", bufferSize-1), output: strings.Repeat("1", bufferSize-1) + strings.Repeat("0", bufferSize*3), + op: func(t *testing.T, v *View) { + v.Append([]byte(strings.Repeat("0", bufferSize*3))) + }, }, // Truncate. { - name: "truncate", - input: "hello world", - ops: []func(*View){ - func(v *View) { - v.Truncate(5) - }, - }, + name: "truncate", + input: "hello world", output: "hello", + op: func(t *testing.T, v *View) { + v.Truncate(5) + }, }, { - name: "truncate multiple buffers", - input: strings.Repeat("1", bufferSize*2), - ops: []func(*View){ - func(v *View) { - v.Truncate(bufferSize*2 - 1) - }, + name: "truncate-noop", + input: "hello world", + output: "hello world", + op: func(t *testing.T, v *View) { + v.Truncate(v.Size() + 1) }, - output: strings.Repeat("1", bufferSize*2-1), }, { - name: "truncate multiple buffers to one buffer", - input: strings.Repeat("1", bufferSize*2), - ops: []func(*View){ - func(v *View) { - v.Truncate(5) - }, + name: "truncate-multiple-buffers", + input: strings.Repeat("1", bufferSize*2), + output: strings.Repeat("1", bufferSize*2-1), + op: func(t *testing.T, v *View) { + v.Truncate(bufferSize*2 - 1) }, + }, + { + name: "truncate-multiple-buffers-to-one", + input: strings.Repeat("1", bufferSize*2), output: "11111", + op: func(t *testing.T, v *View) { + v.Truncate(5) + }, }, // TrimFront. { - name: "trim", - input: "hello world", - ops: []func(*View){ - func(v *View) { - v.TrimFront(6) - }, - }, + name: "trim", + input: "hello world", output: "world", + op: func(t *testing.T, v *View) { + v.TrimFront(6) + }, }, { - name: "trim multiple buffers", - input: strings.Repeat("1", bufferSize*2), - ops: []func(*View){ - func(v *View) { - v.TrimFront(1) - }, + name: "trim-too-large", + input: "hello world", + output: "", + op: func(t *testing.T, v *View) { + v.TrimFront(v.Size() + 1) }, - output: strings.Repeat("1", bufferSize*2-1), }, { - name: "trim multiple buffers to one buffer", - input: strings.Repeat("1", bufferSize*2), - ops: []func(*View){ - func(v *View) { - v.TrimFront(bufferSize*2 - 1) - }, + name: "trim-multiple-buffers", + input: strings.Repeat("1", bufferSize*2), + output: strings.Repeat("1", bufferSize*2-1), + op: func(t *testing.T, v *View) { + v.TrimFront(1) }, + }, + { + name: "trim-multiple-buffers-to-one-buffer", + input: strings.Repeat("1", bufferSize*2), output: "1", + op: func(t *testing.T, v *View) { + v.TrimFront(bufferSize*2 - 1) + }, }, // Grow. { - name: "grow", - input: "hello world", - ops: []func(*View){ - func(v *View) { - v.Grow(1, true) - }, - }, + name: "grow", + input: "hello world", output: "hello world", + op: func(t *testing.T, v *View) { + v.Grow(1, true) + }, }, { - name: "grow from zero", - ops: []func(*View){ - func(v *View) { - v.Grow(1024, true) - }, - }, + name: "grow-from-zero", output: strings.Repeat("\x00", 1024), + op: func(t *testing.T, v *View) { + v.Grow(1024, true) + }, }, { - name: "grow from non-zero", - input: strings.Repeat("1", bufferSize), - ops: []func(*View){ - func(v *View) { - v.Grow(bufferSize*2, true) - }, - }, + name: "grow-from-non-zero", + input: strings.Repeat("1", bufferSize), output: strings.Repeat("1", bufferSize) + strings.Repeat("\x00", bufferSize), + op: func(t *testing.T, v *View) { + v.Grow(bufferSize*2, true) + }, + }, + + // Copy. + { + name: "copy", + input: "hello", + output: "hello", + op: func(t *testing.T, v *View) { + other := v.Copy() + bs := other.Flatten() + want := []byte("hello") + if !bytes.Equal(bs, want) { + t.Errorf("expected %v, got %v", want, bs) + } + }, + }, + { + name: "copy-large", + input: strings.Repeat("1", bufferSize+1), + output: strings.Repeat("1", bufferSize+1), + op: func(t *testing.T, v *View) { + other := v.Copy() + bs := other.Flatten() + want := []byte(strings.Repeat("1", bufferSize+1)) + if !bytes.Equal(bs, want) { + t.Errorf("expected %v, got %v", want, bs) + } + }, + }, + + // Merge. + { + name: "merge", + input: "hello", + output: "hello world", + op: func(t *testing.T, v *View) { + var other View + other.Append([]byte(" world")) + v.Merge(&other) + if sz := other.Size(); sz != 0 { + t.Errorf("expected 0, got %d", sz) + } + }, + }, + { + name: "merge-large", + input: strings.Repeat("1", bufferSize+1), + output: strings.Repeat("1", bufferSize+1) + strings.Repeat("0", bufferSize+1), + op: func(t *testing.T, v *View) { + var other View + other.Append([]byte(strings.Repeat("0", bufferSize+1))) + v.Merge(&other) + if sz := other.Size(); sz != 0 { + t.Errorf("expected 0, got %d", sz) + } + }, + }, + + // ReadAt. + { + name: "readat", + input: "hello", + output: "hello", + op: func(t *testing.T, v *View) { testReadAt(t, v, 0, 6, "hello", io.EOF) }, + }, + { + name: "readat-long", + input: "hello", + output: "hello", + op: func(t *testing.T, v *View) { testReadAt(t, v, 0, 8, "hello", io.EOF) }, + }, + { + name: "readat-short", + input: "hello", + output: "hello", + op: func(t *testing.T, v *View) { testReadAt(t, v, 0, 3, "hel", nil) }, + }, + { + name: "readat-offset", + input: "hello", + output: "hello", + op: func(t *testing.T, v *View) { testReadAt(t, v, 2, 3, "llo", io.EOF) }, + }, + { + name: "readat-long-offset", + input: "hello", + output: "hello", + op: func(t *testing.T, v *View) { testReadAt(t, v, 2, 8, "llo", io.EOF) }, + }, + { + name: "readat-short-offset", + input: "hello", + output: "hello", + op: func(t *testing.T, v *View) { testReadAt(t, v, 2, 2, "ll", nil) }, + }, + { + name: "readat-skip-all", + input: "hello", + output: "hello", + op: func(t *testing.T, v *View) { testReadAt(t, v, bufferSize+1, 1, "", io.EOF) }, + }, + { + name: "readat-second-buffer", + input: strings.Repeat("0", bufferSize+1) + "12", + output: strings.Repeat("0", bufferSize+1) + "12", + op: func(t *testing.T, v *View) { testReadAt(t, v, bufferSize+1, 1, "1", nil) }, + }, + { + name: "readat-second-buffer-end", + input: strings.Repeat("0", bufferSize+1) + "12", + output: strings.Repeat("0", bufferSize+1) + "12", + op: func(t *testing.T, v *View) { testReadAt(t, v, bufferSize+1, 2, "12", io.EOF) }, }, } for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // Construct the new view. - var view View - view.Append([]byte(tc.input)) - - // Run all operations. - for _, op := range tc.ops { - op(&view) - } - - // Flatten and validate. - out := view.Flatten() - if !bytes.Equal([]byte(tc.output), out) { - t.Errorf("expected %q, got %q", tc.output, string(out)) - } - - // Ensure the size is correct. - if len(out) != int(view.Size()) { - t.Errorf("size is wrong: expected %d, got %d", len(out), view.Size()) - } - }) + for fillName, fn := range fillFuncs { + t.Run(fillName+"/"+tc.name, func(t *testing.T) { + // Construct & fill the view. + var view View + fn(&view, []byte(tc.input)) + + // Run the operation. + if tc.op != nil { + tc.op(t, &view) + } + + // Flatten and validate. + out := view.Flatten() + if !bytes.Equal([]byte(tc.output), out) { + t.Errorf("expected %q, got %q", tc.output, string(out)) + } + + // Ensure the size is correct. + if len(out) != int(view.Size()) { + t.Errorf("size is wrong: expected %d, got %d", len(out), view.Size()) + } + + // Calculate contents via apply. + var appliedOut []byte + view.Apply(func(b []byte) { + appliedOut = append(appliedOut, b...) + }) + if len(appliedOut) != len(out) { + t.Errorf("expected %d, got %d", len(out), len(appliedOut)) + } + if !bytes.Equal(appliedOut, out) { + t.Errorf("expected %v, got %v", out, appliedOut) + } + + // Calculate contents via ReadToWriter. + var b bytes.Buffer + n, err := view.ReadToWriter(&b, int64(len(out))) + if n != int64(len(out)) { + t.Errorf("expected %d, got %d", len(out), n) + } + if err != nil { + t.Errorf("expected nil, got %v", err) + } + if !bytes.Equal(b.Bytes(), out) { + t.Errorf("expected %v, got %v", out, b.Bytes()) + } + }) + } } } -- cgit v1.2.3