summaryrefslogtreecommitdiffhomepage
path: root/pkg/buffer
diff options
context:
space:
mode:
authorTing-Yu Wang <anivia@google.com>2021-05-13 13:54:04 -0700
committergVisor bot <gvisor-bot@google.com>2021-05-13 13:56:16 -0700
commit84f04cc858644e9748a82f33b834a84c8b0fc934 (patch)
tree011d6915a666ea978a7b5efb7397757cef3370e0 /pkg/buffer
parentbaa0888f114c586ea490d49a23c3d828fd739b85 (diff)
Migrate PacketBuffer to use pkg/buffer
Benchmark iperf3: Before After native->runsc 5.14 5.01 (Gbps) runsc->native 4.15 4.07 (Gbps) It did introduce overhead, mainly at the bridge between pkg/buffer and VectorisedView, the ExtractVV method. Once endpoints start migrating away from VV, this overhead will be gone. Updates #2404 PiperOrigin-RevId: 373651666
Diffstat (limited to 'pkg/buffer')
-rw-r--r--pkg/buffer/BUILD1
-rw-r--r--pkg/buffer/buffer.go28
-rw-r--r--pkg/buffer/buffer_test.go111
-rw-r--r--pkg/buffer/pool.go9
-rw-r--r--pkg/buffer/view.go179
-rw-r--r--pkg/buffer/view_test.go356
6 files changed, 681 insertions, 3 deletions
diff --git a/pkg/buffer/BUILD b/pkg/buffer/BUILD
index 1186f788e..2a2e3d1aa 100644
--- a/pkg/buffer/BUILD
+++ b/pkg/buffer/BUILD
@@ -38,6 +38,7 @@ go_test(
name = "buffer_test",
size = "small",
srcs = [
+ "buffer_test.go",
"pool_test.go",
"safemem_test.go",
"view_test.go",
diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go
index 311808ae9..5b77a6a3f 100644
--- a/pkg/buffer/buffer.go
+++ b/pkg/buffer/buffer.go
@@ -33,12 +33,40 @@ func (b *buffer) init(size int) {
b.data = make([]byte, size)
}
+// initWithData initializes b with data, taking ownership.
+func (b *buffer) initWithData(data []byte) {
+ b.data = data
+ b.read = 0
+ b.write = len(data)
+}
+
// Reset resets read and write locations, effectively emptying the buffer.
func (b *buffer) Reset() {
b.read = 0
b.write = 0
}
+// Remove removes r from the unread portion. It returns false if r does not
+// fully reside in b.
+func (b *buffer) Remove(r Range) bool {
+ sz := b.ReadSize()
+ switch {
+ case r.Len() != r.Intersect(Range{end: sz}).Len():
+ return false
+ case r.Len() == 0:
+ // Noop
+ case r.begin == 0:
+ b.read += r.end
+ case r.end == sz:
+ b.write -= r.Len()
+ default:
+ // Remove from the middle of b.data.
+ copy(b.data[b.read+r.begin:], b.data[b.read+r.end:b.write])
+ b.write -= r.Len()
+ }
+ return true
+}
+
// Full indicates the buffer is full.
//
// This indicates there is no capacity left to write.
diff --git a/pkg/buffer/buffer_test.go b/pkg/buffer/buffer_test.go
new file mode 100644
index 000000000..32db841e4
--- /dev/null
+++ b/pkg/buffer/buffer_test.go
@@ -0,0 +1,111 @@
+// Copyright 2021 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buffer
+
+import (
+ "bytes"
+ "testing"
+)
+
+func TestBufferRemove(t *testing.T) {
+ sample := []byte("01234567")
+
+ // Success cases
+ for _, tc := range []struct {
+ desc string
+ data []byte
+ rng Range
+ want []byte
+ }{
+ {
+ desc: "empty slice",
+ },
+ {
+ desc: "empty range",
+ data: sample,
+ want: sample,
+ },
+ {
+ desc: "empty range with positive begin",
+ data: sample,
+ rng: Range{begin: 1, end: 1},
+ want: sample,
+ },
+ {
+ desc: "range at beginning",
+ data: sample,
+ rng: Range{begin: 0, end: 1},
+ want: sample[1:],
+ },
+ {
+ desc: "range in middle",
+ data: sample,
+ rng: Range{begin: 2, end: 4},
+ want: []byte("014567"),
+ },
+ {
+ desc: "range at end",
+ data: sample,
+ rng: Range{begin: 7, end: 8},
+ want: sample[:7],
+ },
+ {
+ desc: "range all",
+ data: sample,
+ rng: Range{begin: 0, end: 8},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ var buf buffer
+ buf.initWithData(tc.data)
+ if ok := buf.Remove(tc.rng); !ok {
+ t.Errorf("buf.Remove(%#v) = false, want true", tc.rng)
+ } else if got := buf.ReadSlice(); !bytes.Equal(got, tc.want) {
+ t.Errorf("buf.ReadSlice() = %q, want %q", got, tc.want)
+ }
+ })
+ }
+
+ // Failure cases
+ for _, tc := range []struct {
+ desc string
+ data []byte
+ rng Range
+ }{
+ {
+ desc: "begin out-of-range",
+ data: sample,
+ rng: Range{begin: -1, end: 4},
+ },
+ {
+ desc: "end out-of-range",
+ data: sample,
+ rng: Range{begin: 4, end: 9},
+ },
+ {
+ desc: "both out-of-range",
+ data: sample,
+ rng: Range{begin: -100, end: 100},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ var buf buffer
+ buf.initWithData(tc.data)
+ if ok := buf.Remove(tc.rng); ok {
+ t.Errorf("buf.Remove(%#v) = true, want false", tc.rng)
+ }
+ })
+ }
+}
diff --git a/pkg/buffer/pool.go b/pkg/buffer/pool.go
index 7ad6132ab..2ec41dd4f 100644
--- a/pkg/buffer/pool.go
+++ b/pkg/buffer/pool.go
@@ -42,6 +42,13 @@ type pool struct {
// get gets a new buffer from p.
func (p *pool) get() *buffer {
+ buf := p.getNoInit()
+ buf.init(p.bufferSize)
+ return buf
+}
+
+// get gets a new buffer from p without initializing it.
+func (p *pool) getNoInit() *buffer {
if p.avail == nil {
p.avail = p.embeddedStorage[:]
}
@@ -52,7 +59,6 @@ func (p *pool) get() *buffer {
p.bufferSize = defaultBufferSize
}
buf := &p.avail[0]
- buf.init(p.bufferSize)
p.avail = p.avail[1:]
return buf
}
@@ -62,6 +68,7 @@ func (p *pool) put(buf *buffer) {
// Remove reference to the underlying storage, allowing it to be garbage
// collected.
buf.data = nil
+ buf.Reset()
}
// setBufferSize sets the size of underlying storage buffer for future
diff --git a/pkg/buffer/view.go b/pkg/buffer/view.go
index 00652d675..7bcfcd543 100644
--- a/pkg/buffer/view.go
+++ b/pkg/buffer/view.go
@@ -19,6 +19,9 @@ import (
"io"
)
+// Buffer is an alias to View.
+type Buffer = View
+
// View is a non-linear buffer.
//
// All methods are thread compatible.
@@ -39,6 +42,51 @@ func (v *View) TrimFront(count int64) {
}
}
+// Remove deletes data at specified location in v. It returns false if specified
+// range does not fully reside in v.
+func (v *View) Remove(offset, length int) bool {
+ if offset < 0 || length < 0 {
+ return false
+ }
+ tgt := Range{begin: offset, end: offset + length}
+ if tgt.Len() != tgt.Intersect(Range{end: int(v.size)}).Len() {
+ return false
+ }
+
+ // Scan through each buffer and remove intersections.
+ var curr Range
+ for buf := v.data.Front(); buf != nil; {
+ origLen := buf.ReadSize()
+ curr.end = curr.begin + origLen
+
+ if x := curr.Intersect(tgt); x.Len() > 0 {
+ if !buf.Remove(x.Offset(-curr.begin)) {
+ panic("buf.Remove() failed")
+ }
+ if buf.ReadSize() == 0 {
+ // buf fully removed, removing it from the list.
+ oldBuf := buf
+ buf = buf.Next()
+ v.data.Remove(oldBuf)
+ v.pool.put(oldBuf)
+ } else {
+ // Only partial data intersects, moving on to next one.
+ buf = buf.Next()
+ }
+ v.size -= int64(x.Len())
+ } else {
+ // This buffer is not in range, moving on to next one.
+ buf = buf.Next()
+ }
+
+ curr.begin += origLen
+ if curr.begin >= tgt.end {
+ break
+ }
+ }
+ return true
+}
+
// ReadAt implements io.ReaderAt.ReadAt.
func (v *View) ReadAt(p []byte, offset int64) (int, error) {
var (
@@ -81,7 +129,6 @@ func (v *View) advanceRead(count int64) {
oldBuf := buf
buf = buf.Next() // Iterate.
v.data.Remove(oldBuf)
- oldBuf.Reset()
v.pool.put(oldBuf)
// Update counts.
@@ -118,7 +165,6 @@ func (v *View) Truncate(length int64) {
// Drop the buffer completely; see above.
v.data.Remove(buf)
- buf.Reset()
v.pool.put(buf)
v.size -= sz
}
@@ -224,6 +270,78 @@ func (v *View) Append(data []byte) {
}
}
+// AppendOwned takes ownership of data and appends it to v.
+func (v *View) AppendOwned(data []byte) {
+ if len(data) > 0 {
+ buf := v.pool.getNoInit()
+ buf.initWithData(data)
+ v.data.PushBack(buf)
+ v.size += int64(len(data))
+ }
+}
+
+// PullUp makes the specified range contiguous and returns the backing memory.
+func (v *View) PullUp(offset, length int) ([]byte, bool) {
+ if length == 0 {
+ return nil, true
+ }
+ tgt := Range{begin: offset, end: offset + length}
+ if tgt.Intersect(Range{end: int(v.size)}).Len() != length {
+ return nil, false
+ }
+
+ curr := Range{}
+ buf := v.data.Front()
+ for ; buf != nil; buf = buf.Next() {
+ origLen := buf.ReadSize()
+ curr.end = curr.begin + origLen
+
+ if x := curr.Intersect(tgt); x.Len() == tgt.Len() {
+ // buf covers the whole requested target range.
+ sub := x.Offset(-curr.begin)
+ return buf.ReadSlice()[sub.begin:sub.end], true
+ } else if x.Len() > 0 {
+ // buf is pointing at the starting buffer we want to merge.
+ break
+ }
+
+ curr.begin += origLen
+ }
+
+ // Calculate the total merged length.
+ totLen := 0
+ for n := buf; n != nil; n = n.Next() {
+ totLen += n.ReadSize()
+ if curr.begin+totLen >= tgt.end {
+ break
+ }
+ }
+
+ // Merge the buffers.
+ data := make([]byte, totLen)
+ off := 0
+ for n := buf; n != nil && off < totLen; {
+ copy(data[off:], n.ReadSlice())
+ off += n.ReadSize()
+
+ // Remove buffers except for the first one, which will be reused.
+ if n == buf {
+ n = n.Next()
+ } else {
+ old := n
+ n = n.Next()
+ v.data.Remove(old)
+ v.pool.put(old)
+ }
+ }
+
+ // Update the first buffer with merged data.
+ buf.initWithData(data)
+
+ r := tgt.Offset(-curr.begin)
+ return buf.data[r.begin:r.end], true
+}
+
// Flatten returns a flattened copy of this data.
//
// This method should not be used in any performance-sensitive paths. It may
@@ -267,6 +385,27 @@ func (v *View) Apply(fn func([]byte)) {
}
}
+// SubApply applies fn to a given range of data in v. Any part of the range
+// outside of v is ignored.
+func (v *View) SubApply(offset, length int, fn func([]byte)) {
+ for buf := v.data.Front(); length > 0 && buf != nil; buf = buf.Next() {
+ d := buf.ReadSlice()
+ if offset >= len(d) {
+ offset -= len(d)
+ continue
+ }
+ if offset > 0 {
+ d = d[offset:]
+ offset = 0
+ }
+ if length < len(d) {
+ d = d[:length]
+ }
+ fn(d)
+ length -= len(d)
+ }
+}
+
// Merge merges the provided View with this one.
//
// The other view will be appended to v, and other will be empty after this
@@ -389,3 +528,39 @@ func (v *View) ReadToWriter(w io.Writer, count int64) (int64, error) {
}
return done, err
}
+
+// A Range specifies a range of buffer.
+type Range struct {
+ begin int
+ end int
+}
+
+// Intersect returns the intersection of x and y.
+func (x Range) Intersect(y Range) Range {
+ if x.begin < y.begin {
+ x.begin = y.begin
+ }
+ if x.end > y.end {
+ x.end = y.end
+ }
+ if x.begin >= x.end {
+ return Range{}
+ }
+ return x
+}
+
+// Offset returns x offset by off.
+func (x Range) Offset(off int) Range {
+ x.begin += off
+ x.end += off
+ return x
+}
+
+// Len returns the length of x.
+func (x Range) Len() int {
+ l := x.end - x.begin
+ if l < 0 {
+ l = 0
+ }
+ return l
+}
diff --git a/pkg/buffer/view_test.go b/pkg/buffer/view_test.go
index 839af0223..796efa240 100644
--- a/pkg/buffer/view_test.go
+++ b/pkg/buffer/view_test.go
@@ -17,7 +17,9 @@ package buffer
import (
"bytes"
"context"
+ "fmt"
"io"
+ "reflect"
"strings"
"testing"
@@ -237,6 +239,18 @@ func TestView(t *testing.T) {
},
},
+ // AppendOwned.
+ {
+ name: "append-owned",
+ input: "hello",
+ output: "hello world",
+ op: func(t *testing.T, v *View) {
+ b := []byte("Xworld")
+ v.AppendOwned(b)
+ b[0] = ' '
+ },
+ },
+
// Truncate.
{
name: "truncate",
@@ -495,6 +509,267 @@ func TestView(t *testing.T) {
}
}
+func TestViewPullUp(t *testing.T) {
+ for _, tc := range []struct {
+ desc string
+ inputs []string
+ offset int
+ length int
+ output string
+ failed bool
+ // lengths is the lengths of each buffer node after the pull up.
+ lengths []int
+ }{
+ {
+ desc: "whole empty view",
+ },
+ {
+ desc: "zero pull",
+ inputs: []string{"hello", " world"},
+ lengths: []int{5, 6},
+ },
+ {
+ desc: "whole view",
+ inputs: []string{"hello", " world"},
+ offset: 0,
+ length: 11,
+ output: "hello world",
+ lengths: []int{11},
+ },
+ {
+ desc: "middle to end aligned",
+ inputs: []string{"0123", "45678", "9abcd"},
+ offset: 4,
+ length: 10,
+ output: "456789abcd",
+ lengths: []int{4, 10},
+ },
+ {
+ desc: "middle to end unaligned",
+ inputs: []string{"0123", "45678", "9abcd"},
+ offset: 6,
+ length: 8,
+ output: "6789abcd",
+ lengths: []int{4, 10},
+ },
+ {
+ desc: "middle aligned",
+ inputs: []string{"0123", "45678", "9abcd", "efgh"},
+ offset: 6,
+ length: 5,
+ output: "6789a",
+ lengths: []int{4, 10, 4},
+ },
+
+ // Failed cases.
+ {
+ desc: "empty view - length too long",
+ offset: 0,
+ length: 1,
+ failed: true,
+ },
+ {
+ desc: "empty view - offset too large",
+ offset: 1,
+ length: 1,
+ failed: true,
+ },
+ {
+ desc: "length too long",
+ inputs: []string{"0123", "45678", "9abcd"},
+ offset: 4,
+ length: 100,
+ failed: true,
+ lengths: []int{4, 5, 5},
+ },
+ {
+ desc: "offset too large",
+ inputs: []string{"0123", "45678", "9abcd"},
+ offset: 100,
+ length: 1,
+ failed: true,
+ lengths: []int{4, 5, 5},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ var v View
+ for _, s := range tc.inputs {
+ v.AppendOwned([]byte(s))
+ }
+
+ got, gotOk := v.PullUp(tc.offset, tc.length)
+ want, wantOk := []byte(tc.output), !tc.failed
+ if gotOk != wantOk || !bytes.Equal(got, want) {
+ t.Errorf("v.PullUp(%d, %d) = %q, %t; %q, %t", tc.offset, tc.length, got, gotOk, want, wantOk)
+ }
+
+ var gotLengths []int
+ for buf := v.data.Front(); buf != nil; buf = buf.Next() {
+ gotLengths = append(gotLengths, buf.ReadSize())
+ }
+ if !reflect.DeepEqual(gotLengths, tc.lengths) {
+ t.Errorf("lengths = %v; want %v", gotLengths, tc.lengths)
+ }
+ })
+ }
+}
+
+func TestViewRemove(t *testing.T) {
+ // Success cases
+ for _, tc := range []struct {
+ desc string
+ // before is the contents for each buffer node initially.
+ before []string
+ // after is the contents for each buffer node after removal.
+ after []string
+ offset int
+ length int
+ }{
+ {
+ desc: "empty view",
+ },
+ {
+ desc: "nothing removed",
+ before: []string{"hello", " world"},
+ after: []string{"hello", " world"},
+ },
+ {
+ desc: "whole view",
+ before: []string{"hello", " world"},
+ offset: 0,
+ length: 11,
+ },
+ {
+ desc: "beginning to middle aligned",
+ before: []string{"0123", "45678", "9abcd"},
+ after: []string{"9abcd"},
+ offset: 0,
+ length: 9,
+ },
+ {
+ desc: "beginning to middle unaligned",
+ before: []string{"0123", "45678", "9abcd"},
+ after: []string{"678", "9abcd"},
+ offset: 0,
+ length: 6,
+ },
+ {
+ desc: "middle to end aligned",
+ before: []string{"0123", "45678", "9abcd"},
+ after: []string{"0123"},
+ offset: 4,
+ length: 10,
+ },
+ {
+ desc: "middle to end unaligned",
+ before: []string{"0123", "45678", "9abcd"},
+ after: []string{"0123", "45"},
+ offset: 6,
+ length: 8,
+ },
+ {
+ desc: "middle aligned",
+ before: []string{"0123", "45678", "9abcd"},
+ after: []string{"0123", "9abcd"},
+ offset: 4,
+ length: 5,
+ },
+ {
+ desc: "middle unaligned",
+ before: []string{"0123", "45678", "9abcd"},
+ after: []string{"0123", "4578", "9abcd"},
+ offset: 6,
+ length: 1,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ var v View
+ for _, s := range tc.before {
+ v.AppendOwned([]byte(s))
+ }
+
+ if ok := v.Remove(tc.offset, tc.length); !ok {
+ t.Errorf("v.Remove(%d, %d) = false, want true", tc.offset, tc.length)
+ }
+
+ var got []string
+ for buf := v.data.Front(); buf != nil; buf = buf.Next() {
+ got = append(got, string(buf.ReadSlice()))
+ }
+ if !reflect.DeepEqual(got, tc.after) {
+ t.Errorf("after = %v; want %v", got, tc.after)
+ }
+ })
+ }
+
+ // Failure cases
+ for _, tc := range []struct {
+ desc string
+ // before is the contents for each buffer node initially.
+ before []string
+ offset int
+ length int
+ }{
+ {
+ desc: "offset out-of-range",
+ before: []string{"hello", " world"},
+ offset: -1,
+ length: 3,
+ },
+ {
+ desc: "length too long",
+ before: []string{"hello", " world"},
+ offset: 0,
+ length: 12,
+ },
+ {
+ desc: "length too long with positive offset",
+ before: []string{"hello", " world"},
+ offset: 3,
+ length: 9,
+ },
+ {
+ desc: "length negative",
+ before: []string{"hello", " world"},
+ offset: 0,
+ length: -1,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ var v View
+ for _, s := range tc.before {
+ v.AppendOwned([]byte(s))
+ }
+ if ok := v.Remove(tc.offset, tc.length); ok {
+ t.Errorf("v.Remove(%d, %d) = true, want false", tc.offset, tc.length)
+ }
+ })
+ }
+}
+
+func TestViewSubApply(t *testing.T) {
+ var v View
+ v.AppendOwned([]byte("0123"))
+ v.AppendOwned([]byte("45678"))
+ v.AppendOwned([]byte("9abcd"))
+
+ data := []byte("0123456789abcd")
+
+ for i := 0; i <= len(data); i++ {
+ for j := i; j <= len(data); j++ {
+ t.Run(fmt.Sprintf("SubApply(%d,%d)", i, j), func(t *testing.T) {
+ var got []byte
+ v.SubApply(i, j-i, func(b []byte) {
+ got = append(got, b...)
+ })
+ if want := data[i:j]; !bytes.Equal(got, want) {
+ t.Errorf("got = %q; want %q", got, want)
+ }
+ })
+ }
+ }
+}
+
func doSaveAndLoad(t *testing.T, toSave, toLoad *View) {
t.Helper()
var buf bytes.Buffer
@@ -542,3 +817,84 @@ func TestSaveRestoreView(t *testing.T) {
t.Errorf("v.Flatten() = %x, want %x", got, data)
}
}
+
+func TestRangeIntersect(t *testing.T) {
+ for _, tc := range []struct {
+ desc string
+ x, y, want Range
+ }{
+ {
+ desc: "empty intersects empty",
+ },
+ {
+ desc: "empty intersection",
+ x: Range{end: 10},
+ y: Range{begin: 10, end: 20},
+ },
+ {
+ desc: "some intersection",
+ x: Range{begin: 5, end: 20},
+ y: Range{end: 10},
+ want: Range{begin: 5, end: 10},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ if got := tc.x.Intersect(tc.y); got != tc.want {
+ t.Errorf("(%#v).Intersect(%#v) = %#v; want %#v", tc.x, tc.y, got, tc.want)
+ }
+ if got := tc.y.Intersect(tc.x); got != tc.want {
+ t.Errorf("(%#v).Intersect(%#v) = %#v; want %#v", tc.y, tc.x, got, tc.want)
+ }
+ })
+ }
+}
+
+func TestRangeOffset(t *testing.T) {
+ for _, tc := range []struct {
+ input Range
+ offset int
+ output Range
+ }{
+ {
+ input: Range{},
+ offset: 0,
+ output: Range{},
+ },
+ {
+ input: Range{},
+ offset: -1,
+ output: Range{begin: -1, end: -1},
+ },
+ {
+ input: Range{begin: 10, end: 20},
+ offset: -1,
+ output: Range{begin: 9, end: 19},
+ },
+ {
+ input: Range{begin: 10, end: 20},
+ offset: 2,
+ output: Range{begin: 12, end: 22},
+ },
+ } {
+ if got := tc.input.Offset(tc.offset); got != tc.output {
+ t.Errorf("(%#v).Offset(%d) = %#v, want %#v", tc.input, tc.offset, got, tc.output)
+ }
+ }
+}
+
+func TestRangeLen(t *testing.T) {
+ for _, tc := range []struct {
+ r Range
+ want int
+ }{
+ {r: Range{}, want: 0},
+ {r: Range{begin: 1, end: 1}, want: 0},
+ {r: Range{begin: -1, end: -1}, want: 0},
+ {r: Range{end: 10}, want: 10},
+ {r: Range{begin: 5, end: 10}, want: 5},
+ } {
+ if got := tc.r.Len(); got != tc.want {
+ t.Errorf("(%#v).Len() = %d, want %d", tc.r, got, tc.want)
+ }
+ }
+}