diff options
Diffstat (limited to 'pkg/buffer/view.go')
-rw-r--r-- | pkg/buffer/view.go | 214 |
1 files changed, 111 insertions, 103 deletions
diff --git a/pkg/buffer/view.go b/pkg/buffer/view.go index 00fc11e9c..e6901eadb 100644 --- a/pkg/buffer/view.go +++ b/pkg/buffer/view.go @@ -38,14 +38,6 @@ func (v *View) TrimFront(count int64) { } } -// Read implements io.Reader.Read. -// -// Note that reading does not advance the read index. This must be done -// manually using TrimFront or other methods. -func (v *View) Read(p []byte) (int, error) { - return v.ReadAt(p, 0) -} - // ReadAt implements io.ReaderAt.ReadAt. func (v *View) ReadAt(p []byte, offset int64) (int, error) { var ( @@ -54,54 +46,46 @@ func (v *View) ReadAt(p []byte, offset int64) (int, error) { ) for buf := v.data.Front(); buf != nil && done < int64(len(p)); buf = buf.Next() { needToSkip := int(offset - skipped) - if l := buf.write - buf.read; l <= needToSkip { - skipped += int64(l) + if sz := buf.ReadSize(); sz <= needToSkip { + skipped += int64(sz) continue } // Actually read data. - n := copy(p[done:], buf.data[buf.read+needToSkip:buf.write]) + n := copy(p[done:], buf.ReadSlice()[needToSkip:]) skipped += int64(needToSkip) done += int64(n) } - if int(done) < len(p) { + if int(done) < len(p) || offset+done == v.size { return int(done), io.EOF } return int(done), nil } -// Write implements io.Writer.Write. -func (v *View) Write(p []byte) (int, error) { - v.Append(p) // Does not fail. - return len(p), nil -} - // advanceRead advances the view's read index. // // Precondition: there must be sufficient bytes in the buffer. func (v *View) advanceRead(count int64) { for buf := v.data.Front(); buf != nil && count > 0; { - l := int64(buf.write - buf.read) - if l > count { + sz := int64(buf.ReadSize()) + if sz > count { // There is still data for reading. - buf.read += int(count) + buf.ReadMove(int(count)) v.size -= count count = 0 break } - // Read from this buffer. - buf.read += int(l) - count -= l - v.size -= l - - // When all data has been read from a buffer, we push - // it into the empty buffer pool for reuse. + // Consume the whole buffer. oldBuf := buf buf = buf.Next() // Iterate. v.data.Remove(oldBuf) oldBuf.Reset() bufferPool.Put(oldBuf) + + // Update counts. + count -= sz + v.size -= sz } if count > 0 { panic(fmt.Sprintf("advanceRead still has %d bytes remaining", count)) @@ -109,37 +93,39 @@ func (v *View) advanceRead(count int64) { } // Truncate truncates the view to the given bytes. +// +// This will not grow the view, only shrink it. If a length is passed that is +// greater than the current size of the view, then nothing will happen. +// +// Precondition: length must be >= 0. func (v *View) Truncate(length int64) { - if length < 0 || length >= v.size { + if length < 0 { + panic("negative length provided") + } + if length >= v.size { return // Nothing to do. } for buf := v.data.Back(); buf != nil && v.size > length; buf = v.data.Back() { - l := int64(buf.write - buf.read) // Local bytes. - switch { - case v.size-l >= length: - // Drop the buffer completely; see above. - v.data.Remove(buf) - v.size -= l - buf.Reset() - bufferPool.Put(buf) - - case v.size > length && v.size-l < length: - // Just truncate the buffer locally. - delta := (length - (v.size - l)) - buf.write = buf.read + int(delta) + sz := int64(buf.ReadSize()) + if after := v.size - sz; after < length { + // Truncate the buffer locally. + left := (length - after) + buf.write = buf.read + int(left) v.size = length - - default: - // Should never happen. - panic("invalid buffer during truncation") + break } + + // Drop the buffer completely; see above. + v.data.Remove(buf) + buf.Reset() + bufferPool.Put(buf) + v.size -= sz } - v.size = length // Save the new size. } -// Grow grows the given view to the number of bytes. If zero -// is true, all these bytes will be zero. If zero is false, -// then this is the caller's responsibility. +// Grow grows the given view to the number of bytes, which will be appended. If +// zero is true, all these bytes will be zero. If zero is false, then this is +// the caller's responsibility. // // Precondition: length must be >= 0. func (v *View) Grow(length int64, zero bool) { @@ -149,29 +135,29 @@ func (v *View) Grow(length int64, zero bool) { for v.size < length { buf := v.data.Back() - // Is there at least one buffer? + // Is there some space in the last buffer? if buf == nil || buf.Full() { - buf = bufferPool.Get().(*Buffer) + buf = bufferPool.Get().(*buffer) v.data.PushBack(buf) } // Write up to length bytes. - l := len(buf.data) - buf.write - if int64(l) > length-v.size { - l = int(length - v.size) + sz := buf.WriteSize() + if int64(sz) > length-v.size { + sz = int(length - v.size) } // Zero the written section; note that this pattern is // specifically recognized and optimized by the compiler. if zero { - for i := buf.write; i < buf.write+l; i++ { + for i := buf.write; i < buf.write+sz; i++ { buf.data[i] = 0 } } // Advance the index. - buf.write += l - v.size += int64(l) + buf.WriteMove(sz) + v.size += int64(sz) } } @@ -181,31 +167,40 @@ func (v *View) Prepend(data []byte) { if buf := v.data.Front(); buf != nil && buf.read > 0 { // Fill up before the first write. avail := buf.read - copy(buf.data[0:], data[len(data)-avail:]) - data = data[:len(data)-avail] - v.size += int64(avail) + bStart := 0 + dStart := len(data) - avail + if avail > len(data) { + bStart = avail - len(data) + dStart = 0 + } + n := copy(buf.data[bStart:], data[dStart:]) + data = data[:dStart] + v.size += int64(n) + buf.read -= n } for len(data) > 0 { // Do we need an empty buffer? - buf := bufferPool.Get().(*Buffer) + buf := bufferPool.Get().(*buffer) v.data.PushFront(buf) // The buffer is empty; copy last chunk. - start := len(data) - len(buf.data) - if start < 0 { - start = 0 // Everything. + avail := len(buf.data) + bStart := 0 + dStart := len(data) - avail + if avail > len(data) { + bStart = avail - len(data) + dStart = 0 } // We have to put the data at the end of the current // buffer in order to ensure that the next prepend will // correctly fill up the beginning of this buffer. - bStart := len(buf.data) - len(data[start:]) - n := copy(buf.data[bStart:], data[start:]) - buf.read = bStart - buf.write = len(buf.data) - data = data[:start] + n := copy(buf.data[bStart:], data[dStart:]) + data = data[:dStart] v.size += int64(n) + buf.read = len(buf.data) - n + buf.write = len(buf.data) } } @@ -214,16 +209,16 @@ func (v *View) Append(data []byte) { for done := 0; done < len(data); { buf := v.data.Back() - // Find the first empty buffer. + // Ensure there's a buffer with space. if buf == nil || buf.Full() { - buf = bufferPool.Get().(*Buffer) + buf = bufferPool.Get().(*buffer) v.data.PushBack(buf) } // Copy in to the given buffer. - n := copy(buf.data[buf.write:], data[done:]) + n := copy(buf.WriteSlice(), data[done:]) done += n - buf.write += n + buf.WriteMove(n) v.size += int64(n) } } @@ -232,52 +227,52 @@ func (v *View) Append(data []byte) { // // This method should not be used in any performance-sensitive paths. It may // allocate a fresh byte slice sufficiently large to contain all the data in -// the buffer. +// the buffer. This is principally for debugging. // // N.B. Tee data still belongs to this view, as if there is a single buffer // present, then it will be returned directly. This should be used for // temporary use only, and a reference to the given slice should not be held. func (v *View) Flatten() []byte { - if buf := v.data.Front(); buf.Next() == nil { - return buf.data[buf.read:buf.write] // Only one buffer. + if buf := v.data.Front(); buf == nil { + return nil // No data at all. + } else if buf.Next() == nil { + return buf.ReadSlice() // Only one buffer. } data := make([]byte, 0, v.size) // Need to flatten. for buf := v.data.Front(); buf != nil; buf = buf.Next() { // Copy to the allocated slice. - data = append(data, buf.data[buf.read:buf.write]...) + data = append(data, buf.ReadSlice()...) } return data } // Size indicates the total amount of data available in this view. -func (v *View) Size() (sz int64) { - sz = v.size // Pre-calculated. - return sz +func (v *View) Size() int64 { + return v.size } // Copy makes a strict copy of this view. func (v *View) Copy() (other View) { for buf := v.data.Front(); buf != nil; buf = buf.Next() { - other.Append(buf.data[buf.read:buf.write]) + other.Append(buf.ReadSlice()) } - return other + return } // Apply applies the given function across all valid data. func (v *View) Apply(fn func([]byte)) { for buf := v.data.Front(); buf != nil; buf = buf.Next() { - if l := int64(buf.write - buf.read); l > 0 { - fn(buf.data[buf.read:buf.write]) - } + fn(buf.ReadSlice()) } } // Merge merges the provided View with this one. // -// The other view will be empty after this operation. +// The other view will be appended to v, and other will be empty after this +// operation completes. func (v *View) Merge(other *View) { // Copy over all buffers. - for buf := other.data.Front(); buf != nil && !buf.Empty(); buf = other.data.Front() { + for buf := other.data.Front(); buf != nil; buf = other.data.Front() { other.data.Remove(buf) v.data.PushBack(buf) } @@ -288,6 +283,9 @@ func (v *View) Merge(other *View) { } // WriteFromReader writes to the buffer from an io.Reader. +// +// A minimum read size equal to unsafe.Sizeof(unintptr) is enforced, +// provided that count is greater than or equal to unsafe.Sizeof(uintptr). func (v *View) WriteFromReader(r io.Reader, count int64) (int64, error) { var ( done int64 @@ -297,17 +295,17 @@ func (v *View) WriteFromReader(r io.Reader, count int64) (int64, error) { for done < count { buf := v.data.Back() - // Find the first empty buffer. + // Ensure we have an empty buffer. if buf == nil || buf.Full() { - buf = bufferPool.Get().(*Buffer) + buf = bufferPool.Get().(*buffer) v.data.PushBack(buf) } // Is this less than the minimum batch? - if len(buf.data[buf.write:]) < minBatch && (count-done) >= int64(minBatch) { + if buf.WriteSize() < minBatch && (count-done) >= int64(minBatch) { tmp := make([]byte, minBatch) n, err = r.Read(tmp) - v.Write(tmp[:n]) + v.Append(tmp[:n]) done += int64(n) if err != nil { break @@ -316,14 +314,14 @@ func (v *View) WriteFromReader(r io.Reader, count int64) (int64, error) { } // Limit the read, if necessary. - end := len(buf.data) - if int64(end-buf.write) > (count - done) { - end = buf.write + int(count-done) + sz := buf.WriteSize() + if left := count - done; int64(sz) > left { + sz = int(left) } // Pass the relevant portion of the buffer. - n, err = r.Read(buf.data[buf.write:end]) - buf.write += n + n, err = r.Read(buf.WriteSlice()[:sz]) + buf.WriteMove(n) done += int64(n) v.size += int64(n) if err == io.EOF { @@ -340,6 +338,9 @@ func (v *View) WriteFromReader(r io.Reader, count int64) (int64, error) { // // N.B. This does not consume the bytes read. TrimFront should // be called appropriately after this call in order to do so. +// +// A minimum write size equal to unsafe.Sizeof(unintptr) is enforced, +// provided that count is greater than or equal to unsafe.Sizeof(uintptr). func (v *View) ReadToWriter(w io.Writer, count int64) (int64, error) { var ( done int64 @@ -348,15 +349,22 @@ func (v *View) ReadToWriter(w io.Writer, count int64) (int64, error) { ) offset := 0 // Spill-over for batching. for buf := v.data.Front(); buf != nil && done < count; buf = buf.Next() { - l := buf.write - buf.read - offset + // Has this been consumed? Skip it. + sz := buf.ReadSize() + if sz <= offset { + offset -= sz + continue + } + sz -= offset // Is this less than the minimum batch? - if l < minBatch && (count-done) >= int64(minBatch) && (v.size-done) >= int64(minBatch) { + left := count - done + if sz < minBatch && left >= int64(minBatch) && (v.size-done) >= int64(minBatch) { tmp := make([]byte, minBatch) n, err = v.ReadAt(tmp, done) w.Write(tmp[:n]) done += int64(n) - offset = n - l // Reset below. + offset = n - sz // Reset below. if err != nil { break } @@ -364,12 +372,12 @@ func (v *View) ReadToWriter(w io.Writer, count int64) (int64, error) { } // Limit the write if necessary. - if int64(l) >= (count - done) { - l = int(count - done) + if int64(sz) >= left { + sz = int(left) } // Perform the actual write. - n, err = w.Write(buf.data[buf.read+offset : buf.read+offset+l]) + n, err = w.Write(buf.ReadSlice()[offset : offset+sz]) done += int64(n) if err != nil { break |