From 84f04cc858644e9748a82f33b834a84c8b0fc934 Mon Sep 17 00:00:00 2001 From: Ting-Yu Wang Date: Thu, 13 May 2021 13:54:04 -0700 Subject: Migrate PacketBuffer to use pkg/buffer Benchmark iperf3: Before After native->runsc 5.14 5.01 (Gbps) runsc->native 4.15 4.07 (Gbps) It did introduce overhead, mainly at the bridge between pkg/buffer and VectorisedView, the ExtractVV method. Once endpoints start migrating away from VV, this overhead will be gone. Updates #2404 PiperOrigin-RevId: 373651666 --- pkg/buffer/BUILD | 1 + pkg/buffer/buffer.go | 28 ++++ pkg/buffer/buffer_test.go | 111 +++++++++++++++ pkg/buffer/pool.go | 9 +- pkg/buffer/view.go | 179 ++++++++++++++++++++++- pkg/buffer/view_test.go | 356 ++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 681 insertions(+), 3 deletions(-) create mode 100644 pkg/buffer/buffer_test.go (limited to 'pkg/buffer') diff --git a/pkg/buffer/BUILD b/pkg/buffer/BUILD index 1186f788e..2a2e3d1aa 100644 --- a/pkg/buffer/BUILD +++ b/pkg/buffer/BUILD @@ -38,6 +38,7 @@ go_test( name = "buffer_test", size = "small", srcs = [ + "buffer_test.go", "pool_test.go", "safemem_test.go", "view_test.go", diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go index 311808ae9..5b77a6a3f 100644 --- a/pkg/buffer/buffer.go +++ b/pkg/buffer/buffer.go @@ -33,12 +33,40 @@ func (b *buffer) init(size int) { b.data = make([]byte, size) } +// initWithData initializes b with data, taking ownership. +func (b *buffer) initWithData(data []byte) { + b.data = data + b.read = 0 + b.write = len(data) +} + // Reset resets read and write locations, effectively emptying the buffer. func (b *buffer) Reset() { b.read = 0 b.write = 0 } +// Remove removes r from the unread portion. It returns false if r does not +// fully reside in b. +func (b *buffer) Remove(r Range) bool { + sz := b.ReadSize() + switch { + case r.Len() != r.Intersect(Range{end: sz}).Len(): + return false + case r.Len() == 0: + // Noop + case r.begin == 0: + b.read += r.end + case r.end == sz: + b.write -= r.Len() + default: + // Remove from the middle of b.data. + copy(b.data[b.read+r.begin:], b.data[b.read+r.end:b.write]) + b.write -= r.Len() + } + return true +} + // Full indicates the buffer is full. // // This indicates there is no capacity left to write. diff --git a/pkg/buffer/buffer_test.go b/pkg/buffer/buffer_test.go new file mode 100644 index 000000000..32db841e4 --- /dev/null +++ b/pkg/buffer/buffer_test.go @@ -0,0 +1,111 @@ +// Copyright 2021 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buffer + +import ( + "bytes" + "testing" +) + +func TestBufferRemove(t *testing.T) { + sample := []byte("01234567") + + // Success cases + for _, tc := range []struct { + desc string + data []byte + rng Range + want []byte + }{ + { + desc: "empty slice", + }, + { + desc: "empty range", + data: sample, + want: sample, + }, + { + desc: "empty range with positive begin", + data: sample, + rng: Range{begin: 1, end: 1}, + want: sample, + }, + { + desc: "range at beginning", + data: sample, + rng: Range{begin: 0, end: 1}, + want: sample[1:], + }, + { + desc: "range in middle", + data: sample, + rng: Range{begin: 2, end: 4}, + want: []byte("014567"), + }, + { + desc: "range at end", + data: sample, + rng: Range{begin: 7, end: 8}, + want: sample[:7], + }, + { + desc: "range all", + data: sample, + rng: Range{begin: 0, end: 8}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var buf buffer + buf.initWithData(tc.data) + if ok := buf.Remove(tc.rng); !ok { + t.Errorf("buf.Remove(%#v) = false, want true", tc.rng) + } else if got := buf.ReadSlice(); !bytes.Equal(got, tc.want) { + t.Errorf("buf.ReadSlice() = %q, want %q", got, tc.want) + } + }) + } + + // Failure cases + for _, tc := range []struct { + desc string + data []byte + rng Range + }{ + { + desc: "begin out-of-range", + data: sample, + rng: Range{begin: -1, end: 4}, + }, + { + desc: "end out-of-range", + data: sample, + rng: Range{begin: 4, end: 9}, + }, + { + desc: "both out-of-range", + data: sample, + rng: Range{begin: -100, end: 100}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var buf buffer + buf.initWithData(tc.data) + if ok := buf.Remove(tc.rng); ok { + t.Errorf("buf.Remove(%#v) = true, want false", tc.rng) + } + }) + } +} diff --git a/pkg/buffer/pool.go b/pkg/buffer/pool.go index 7ad6132ab..2ec41dd4f 100644 --- a/pkg/buffer/pool.go +++ b/pkg/buffer/pool.go @@ -42,6 +42,13 @@ type pool struct { // get gets a new buffer from p. func (p *pool) get() *buffer { + buf := p.getNoInit() + buf.init(p.bufferSize) + return buf +} + +// get gets a new buffer from p without initializing it. +func (p *pool) getNoInit() *buffer { if p.avail == nil { p.avail = p.embeddedStorage[:] } @@ -52,7 +59,6 @@ func (p *pool) get() *buffer { p.bufferSize = defaultBufferSize } buf := &p.avail[0] - buf.init(p.bufferSize) p.avail = p.avail[1:] return buf } @@ -62,6 +68,7 @@ func (p *pool) put(buf *buffer) { // Remove reference to the underlying storage, allowing it to be garbage // collected. buf.data = nil + buf.Reset() } // setBufferSize sets the size of underlying storage buffer for future diff --git a/pkg/buffer/view.go b/pkg/buffer/view.go index 00652d675..7bcfcd543 100644 --- a/pkg/buffer/view.go +++ b/pkg/buffer/view.go @@ -19,6 +19,9 @@ import ( "io" ) +// Buffer is an alias to View. +type Buffer = View + // View is a non-linear buffer. // // All methods are thread compatible. @@ -39,6 +42,51 @@ func (v *View) TrimFront(count int64) { } } +// Remove deletes data at specified location in v. It returns false if specified +// range does not fully reside in v. +func (v *View) Remove(offset, length int) bool { + if offset < 0 || length < 0 { + return false + } + tgt := Range{begin: offset, end: offset + length} + if tgt.Len() != tgt.Intersect(Range{end: int(v.size)}).Len() { + return false + } + + // Scan through each buffer and remove intersections. + var curr Range + for buf := v.data.Front(); buf != nil; { + origLen := buf.ReadSize() + curr.end = curr.begin + origLen + + if x := curr.Intersect(tgt); x.Len() > 0 { + if !buf.Remove(x.Offset(-curr.begin)) { + panic("buf.Remove() failed") + } + if buf.ReadSize() == 0 { + // buf fully removed, removing it from the list. + oldBuf := buf + buf = buf.Next() + v.data.Remove(oldBuf) + v.pool.put(oldBuf) + } else { + // Only partial data intersects, moving on to next one. + buf = buf.Next() + } + v.size -= int64(x.Len()) + } else { + // This buffer is not in range, moving on to next one. + buf = buf.Next() + } + + curr.begin += origLen + if curr.begin >= tgt.end { + break + } + } + return true +} + // ReadAt implements io.ReaderAt.ReadAt. func (v *View) ReadAt(p []byte, offset int64) (int, error) { var ( @@ -81,7 +129,6 @@ func (v *View) advanceRead(count int64) { oldBuf := buf buf = buf.Next() // Iterate. v.data.Remove(oldBuf) - oldBuf.Reset() v.pool.put(oldBuf) // Update counts. @@ -118,7 +165,6 @@ func (v *View) Truncate(length int64) { // Drop the buffer completely; see above. v.data.Remove(buf) - buf.Reset() v.pool.put(buf) v.size -= sz } @@ -224,6 +270,78 @@ func (v *View) Append(data []byte) { } } +// AppendOwned takes ownership of data and appends it to v. +func (v *View) AppendOwned(data []byte) { + if len(data) > 0 { + buf := v.pool.getNoInit() + buf.initWithData(data) + v.data.PushBack(buf) + v.size += int64(len(data)) + } +} + +// PullUp makes the specified range contiguous and returns the backing memory. +func (v *View) PullUp(offset, length int) ([]byte, bool) { + if length == 0 { + return nil, true + } + tgt := Range{begin: offset, end: offset + length} + if tgt.Intersect(Range{end: int(v.size)}).Len() != length { + return nil, false + } + + curr := Range{} + buf := v.data.Front() + for ; buf != nil; buf = buf.Next() { + origLen := buf.ReadSize() + curr.end = curr.begin + origLen + + if x := curr.Intersect(tgt); x.Len() == tgt.Len() { + // buf covers the whole requested target range. + sub := x.Offset(-curr.begin) + return buf.ReadSlice()[sub.begin:sub.end], true + } else if x.Len() > 0 { + // buf is pointing at the starting buffer we want to merge. + break + } + + curr.begin += origLen + } + + // Calculate the total merged length. + totLen := 0 + for n := buf; n != nil; n = n.Next() { + totLen += n.ReadSize() + if curr.begin+totLen >= tgt.end { + break + } + } + + // Merge the buffers. + data := make([]byte, totLen) + off := 0 + for n := buf; n != nil && off < totLen; { + copy(data[off:], n.ReadSlice()) + off += n.ReadSize() + + // Remove buffers except for the first one, which will be reused. + if n == buf { + n = n.Next() + } else { + old := n + n = n.Next() + v.data.Remove(old) + v.pool.put(old) + } + } + + // Update the first buffer with merged data. + buf.initWithData(data) + + r := tgt.Offset(-curr.begin) + return buf.data[r.begin:r.end], true +} + // Flatten returns a flattened copy of this data. // // This method should not be used in any performance-sensitive paths. It may @@ -267,6 +385,27 @@ func (v *View) Apply(fn func([]byte)) { } } +// SubApply applies fn to a given range of data in v. Any part of the range +// outside of v is ignored. +func (v *View) SubApply(offset, length int, fn func([]byte)) { + for buf := v.data.Front(); length > 0 && buf != nil; buf = buf.Next() { + d := buf.ReadSlice() + if offset >= len(d) { + offset -= len(d) + continue + } + if offset > 0 { + d = d[offset:] + offset = 0 + } + if length < len(d) { + d = d[:length] + } + fn(d) + length -= len(d) + } +} + // Merge merges the provided View with this one. // // The other view will be appended to v, and other will be empty after this @@ -389,3 +528,39 @@ func (v *View) ReadToWriter(w io.Writer, count int64) (int64, error) { } return done, err } + +// A Range specifies a range of buffer. +type Range struct { + begin int + end int +} + +// Intersect returns the intersection of x and y. +func (x Range) Intersect(y Range) Range { + if x.begin < y.begin { + x.begin = y.begin + } + if x.end > y.end { + x.end = y.end + } + if x.begin >= x.end { + return Range{} + } + return x +} + +// Offset returns x offset by off. +func (x Range) Offset(off int) Range { + x.begin += off + x.end += off + return x +} + +// Len returns the length of x. +func (x Range) Len() int { + l := x.end - x.begin + if l < 0 { + l = 0 + } + return l +} diff --git a/pkg/buffer/view_test.go b/pkg/buffer/view_test.go index 839af0223..796efa240 100644 --- a/pkg/buffer/view_test.go +++ b/pkg/buffer/view_test.go @@ -17,7 +17,9 @@ package buffer import ( "bytes" "context" + "fmt" "io" + "reflect" "strings" "testing" @@ -237,6 +239,18 @@ func TestView(t *testing.T) { }, }, + // AppendOwned. + { + name: "append-owned", + input: "hello", + output: "hello world", + op: func(t *testing.T, v *View) { + b := []byte("Xworld") + v.AppendOwned(b) + b[0] = ' ' + }, + }, + // Truncate. { name: "truncate", @@ -495,6 +509,267 @@ func TestView(t *testing.T) { } } +func TestViewPullUp(t *testing.T) { + for _, tc := range []struct { + desc string + inputs []string + offset int + length int + output string + failed bool + // lengths is the lengths of each buffer node after the pull up. + lengths []int + }{ + { + desc: "whole empty view", + }, + { + desc: "zero pull", + inputs: []string{"hello", " world"}, + lengths: []int{5, 6}, + }, + { + desc: "whole view", + inputs: []string{"hello", " world"}, + offset: 0, + length: 11, + output: "hello world", + lengths: []int{11}, + }, + { + desc: "middle to end aligned", + inputs: []string{"0123", "45678", "9abcd"}, + offset: 4, + length: 10, + output: "456789abcd", + lengths: []int{4, 10}, + }, + { + desc: "middle to end unaligned", + inputs: []string{"0123", "45678", "9abcd"}, + offset: 6, + length: 8, + output: "6789abcd", + lengths: []int{4, 10}, + }, + { + desc: "middle aligned", + inputs: []string{"0123", "45678", "9abcd", "efgh"}, + offset: 6, + length: 5, + output: "6789a", + lengths: []int{4, 10, 4}, + }, + + // Failed cases. + { + desc: "empty view - length too long", + offset: 0, + length: 1, + failed: true, + }, + { + desc: "empty view - offset too large", + offset: 1, + length: 1, + failed: true, + }, + { + desc: "length too long", + inputs: []string{"0123", "45678", "9abcd"}, + offset: 4, + length: 100, + failed: true, + lengths: []int{4, 5, 5}, + }, + { + desc: "offset too large", + inputs: []string{"0123", "45678", "9abcd"}, + offset: 100, + length: 1, + failed: true, + lengths: []int{4, 5, 5}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var v View + for _, s := range tc.inputs { + v.AppendOwned([]byte(s)) + } + + got, gotOk := v.PullUp(tc.offset, tc.length) + want, wantOk := []byte(tc.output), !tc.failed + if gotOk != wantOk || !bytes.Equal(got, want) { + t.Errorf("v.PullUp(%d, %d) = %q, %t; %q, %t", tc.offset, tc.length, got, gotOk, want, wantOk) + } + + var gotLengths []int + for buf := v.data.Front(); buf != nil; buf = buf.Next() { + gotLengths = append(gotLengths, buf.ReadSize()) + } + if !reflect.DeepEqual(gotLengths, tc.lengths) { + t.Errorf("lengths = %v; want %v", gotLengths, tc.lengths) + } + }) + } +} + +func TestViewRemove(t *testing.T) { + // Success cases + for _, tc := range []struct { + desc string + // before is the contents for each buffer node initially. + before []string + // after is the contents for each buffer node after removal. + after []string + offset int + length int + }{ + { + desc: "empty view", + }, + { + desc: "nothing removed", + before: []string{"hello", " world"}, + after: []string{"hello", " world"}, + }, + { + desc: "whole view", + before: []string{"hello", " world"}, + offset: 0, + length: 11, + }, + { + desc: "beginning to middle aligned", + before: []string{"0123", "45678", "9abcd"}, + after: []string{"9abcd"}, + offset: 0, + length: 9, + }, + { + desc: "beginning to middle unaligned", + before: []string{"0123", "45678", "9abcd"}, + after: []string{"678", "9abcd"}, + offset: 0, + length: 6, + }, + { + desc: "middle to end aligned", + before: []string{"0123", "45678", "9abcd"}, + after: []string{"0123"}, + offset: 4, + length: 10, + }, + { + desc: "middle to end unaligned", + before: []string{"0123", "45678", "9abcd"}, + after: []string{"0123", "45"}, + offset: 6, + length: 8, + }, + { + desc: "middle aligned", + before: []string{"0123", "45678", "9abcd"}, + after: []string{"0123", "9abcd"}, + offset: 4, + length: 5, + }, + { + desc: "middle unaligned", + before: []string{"0123", "45678", "9abcd"}, + after: []string{"0123", "4578", "9abcd"}, + offset: 6, + length: 1, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var v View + for _, s := range tc.before { + v.AppendOwned([]byte(s)) + } + + if ok := v.Remove(tc.offset, tc.length); !ok { + t.Errorf("v.Remove(%d, %d) = false, want true", tc.offset, tc.length) + } + + var got []string + for buf := v.data.Front(); buf != nil; buf = buf.Next() { + got = append(got, string(buf.ReadSlice())) + } + if !reflect.DeepEqual(got, tc.after) { + t.Errorf("after = %v; want %v", got, tc.after) + } + }) + } + + // Failure cases + for _, tc := range []struct { + desc string + // before is the contents for each buffer node initially. + before []string + offset int + length int + }{ + { + desc: "offset out-of-range", + before: []string{"hello", " world"}, + offset: -1, + length: 3, + }, + { + desc: "length too long", + before: []string{"hello", " world"}, + offset: 0, + length: 12, + }, + { + desc: "length too long with positive offset", + before: []string{"hello", " world"}, + offset: 3, + length: 9, + }, + { + desc: "length negative", + before: []string{"hello", " world"}, + offset: 0, + length: -1, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var v View + for _, s := range tc.before { + v.AppendOwned([]byte(s)) + } + if ok := v.Remove(tc.offset, tc.length); ok { + t.Errorf("v.Remove(%d, %d) = true, want false", tc.offset, tc.length) + } + }) + } +} + +func TestViewSubApply(t *testing.T) { + var v View + v.AppendOwned([]byte("0123")) + v.AppendOwned([]byte("45678")) + v.AppendOwned([]byte("9abcd")) + + data := []byte("0123456789abcd") + + for i := 0; i <= len(data); i++ { + for j := i; j <= len(data); j++ { + t.Run(fmt.Sprintf("SubApply(%d,%d)", i, j), func(t *testing.T) { + var got []byte + v.SubApply(i, j-i, func(b []byte) { + got = append(got, b...) + }) + if want := data[i:j]; !bytes.Equal(got, want) { + t.Errorf("got = %q; want %q", got, want) + } + }) + } + } +} + func doSaveAndLoad(t *testing.T, toSave, toLoad *View) { t.Helper() var buf bytes.Buffer @@ -542,3 +817,84 @@ func TestSaveRestoreView(t *testing.T) { t.Errorf("v.Flatten() = %x, want %x", got, data) } } + +func TestRangeIntersect(t *testing.T) { + for _, tc := range []struct { + desc string + x, y, want Range + }{ + { + desc: "empty intersects empty", + }, + { + desc: "empty intersection", + x: Range{end: 10}, + y: Range{begin: 10, end: 20}, + }, + { + desc: "some intersection", + x: Range{begin: 5, end: 20}, + y: Range{end: 10}, + want: Range{begin: 5, end: 10}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if got := tc.x.Intersect(tc.y); got != tc.want { + t.Errorf("(%#v).Intersect(%#v) = %#v; want %#v", tc.x, tc.y, got, tc.want) + } + if got := tc.y.Intersect(tc.x); got != tc.want { + t.Errorf("(%#v).Intersect(%#v) = %#v; want %#v", tc.y, tc.x, got, tc.want) + } + }) + } +} + +func TestRangeOffset(t *testing.T) { + for _, tc := range []struct { + input Range + offset int + output Range + }{ + { + input: Range{}, + offset: 0, + output: Range{}, + }, + { + input: Range{}, + offset: -1, + output: Range{begin: -1, end: -1}, + }, + { + input: Range{begin: 10, end: 20}, + offset: -1, + output: Range{begin: 9, end: 19}, + }, + { + input: Range{begin: 10, end: 20}, + offset: 2, + output: Range{begin: 12, end: 22}, + }, + } { + if got := tc.input.Offset(tc.offset); got != tc.output { + t.Errorf("(%#v).Offset(%d) = %#v, want %#v", tc.input, tc.offset, got, tc.output) + } + } +} + +func TestRangeLen(t *testing.T) { + for _, tc := range []struct { + r Range + want int + }{ + {r: Range{}, want: 0}, + {r: Range{begin: 1, end: 1}, want: 0}, + {r: Range{begin: -1, end: -1}, want: 0}, + {r: Range{end: 10}, want: 10}, + {r: Range{begin: 5, end: 10}, want: 5}, + } { + if got := tc.r.Len(); got != tc.want { + t.Errorf("(%#v).Len() = %d, want %d", tc.r, got, tc.want) + } + } +} -- cgit v1.2.3