From 0e2f1b7abd219f39d67cc2cecd00c441a13eeb29 Mon Sep 17 00:00:00 2001 From: Adin Scannell Date: Mon, 27 Jan 2020 15:17:58 -0800 Subject: Update package locations. Because the abi will depend on the core types for marshalling (usermem, context, safemem, safecopy), these need to be flattened from the sentry directory. These packages contain no sentry-specific details. PiperOrigin-RevId: 291811289 --- pkg/sentry/kernel/pipe/buffer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'pkg/sentry/kernel/pipe/buffer_test.go') diff --git a/pkg/sentry/kernel/pipe/buffer_test.go b/pkg/sentry/kernel/pipe/buffer_test.go index ee1b90115..4d54b8b8f 100644 --- a/pkg/sentry/kernel/pipe/buffer_test.go +++ b/pkg/sentry/kernel/pipe/buffer_test.go @@ -18,7 +18,7 @@ import ( "testing" "unsafe" - "gvisor.dev/gvisor/pkg/sentry/usermem" + "gvisor.dev/gvisor/pkg/usermem" ) func TestBufferSize(t *testing.T) { -- cgit v1.2.3 From 463f4217d109ded8af758fe51a5daf8670da9794 Mon Sep 17 00:00:00 2001 From: Adin Scannell Date: Fri, 28 Feb 2020 12:28:10 -0800 Subject: Make pipe buffer implementation standard. A follow-up change will convert the networking code to use this standard pipe implementation. PiperOrigin-RevId: 297903206 --- pkg/buffer/BUILD | 39 ++++ pkg/buffer/buffer.go | 67 ++++++ pkg/buffer/safemem.go | 131 ++++++++++++ pkg/buffer/view.go | 382 ++++++++++++++++++++++++++++++++++ pkg/buffer/view_test.go | 233 +++++++++++++++++++++ pkg/buffer/view_unsafe.go | 25 +++ pkg/sentry/kernel/pipe/BUILD | 18 +- pkg/sentry/kernel/pipe/buffer.go | 115 ---------- pkg/sentry/kernel/pipe/buffer_test.go | 32 --- pkg/sentry/kernel/pipe/pipe.go | 118 ++--------- pkg/sentry/kernel/pipe/pipe_util.go | 25 +-- 11 files changed, 912 insertions(+), 273 deletions(-) create mode 100644 pkg/buffer/BUILD create mode 100644 pkg/buffer/buffer.go create mode 100644 pkg/buffer/safemem.go create mode 100644 pkg/buffer/view.go create mode 100644 pkg/buffer/view_test.go create mode 100644 pkg/buffer/view_unsafe.go delete mode 100644 pkg/sentry/kernel/pipe/buffer.go delete mode 100644 pkg/sentry/kernel/pipe/buffer_test.go (limited to 'pkg/sentry/kernel/pipe/buffer_test.go') diff --git a/pkg/buffer/BUILD b/pkg/buffer/BUILD new file mode 100644 index 000000000..a77a3beea --- /dev/null +++ b/pkg/buffer/BUILD @@ -0,0 +1,39 @@ +load("//tools:defs.bzl", "go_library", "go_test") +load("//tools/go_generics:defs.bzl", "go_template_instance") + +package(licenses = ["notice"]) + +go_template_instance( + name = "buffer_list", + out = "buffer_list.go", + package = "buffer", + prefix = "buffer", + template = "//pkg/ilist:generic_list", + types = { + "Element": "*Buffer", + "Linker": "*Buffer", + }, +) + +go_library( + name = "buffer", + srcs = [ + "buffer.go", + "buffer_list.go", + "safemem.go", + "view.go", + "view_unsafe.go", + ], + visibility = ["//visibility:public"], + deps = [ + "//pkg/log", + "//pkg/safemem", + ], +) + +go_test( + name = "buffer_test", + size = "small", + srcs = ["view_test.go"], + library = ":buffer", +) diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go new file mode 100644 index 000000000..d5f64609b --- /dev/null +++ b/pkg/buffer/buffer.go @@ -0,0 +1,67 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package buffer provides the implementation of a buffer view. +package buffer + +import ( + "sync" +) + +const bufferSize = 8144 // See below. + +// Buffer encapsulates a queueable byte buffer. +// +// Note that the total size is slightly less than two pages. This is done +// intentionally to ensure that the buffer object aligns with runtime +// internals. We have no hard size or alignment requirements. This two page +// size will effectively minimize internal fragmentation, but still have a +// large enough chunk to limit excessive segmentation. +// +// +stateify savable +type Buffer struct { + data [bufferSize]byte + read int + write int + bufferEntry +} + +// Reset resets internal data. +// +// This must be called before use. +func (b *Buffer) Reset() { + b.read = 0 + b.write = 0 +} + +// Empty indicates the buffer is empty. +// +// This indicates there is no data left to read. +func (b *Buffer) Empty() bool { + return b.read == b.write +} + +// Full indicates the buffer is full. +// +// This indicates there is no capacity left to write. +func (b *Buffer) Full() bool { + return b.write == len(b.data) +} + +// bufferPool is a pool for buffers. +var bufferPool = sync.Pool{ + New: func() interface{} { + return new(Buffer) + }, +} diff --git a/pkg/buffer/safemem.go b/pkg/buffer/safemem.go new file mode 100644 index 000000000..071aaa488 --- /dev/null +++ b/pkg/buffer/safemem.go @@ -0,0 +1,131 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buffer + +import ( + "io" + + "gvisor.dev/gvisor/pkg/safemem" +) + +// WriteBlock returns this buffer as a write Block. +func (b *Buffer) WriteBlock() safemem.Block { + return safemem.BlockFromSafeSlice(b.data[b.write:]) +} + +// ReadBlock returns this buffer as a read Block. +func (b *Buffer) ReadBlock() safemem.Block { + return safemem.BlockFromSafeSlice(b.data[b.read:b.write]) +} + +// WriteFromBlocks implements safemem.Writer.WriteFromBlocks. +// +// This will advance the write index. +func (v *View) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) { + need := int(srcs.NumBytes()) + if need == 0 { + return 0, nil + } + + var ( + dst safemem.BlockSeq + blocks []safemem.Block + ) + + // Need at least one buffer. + firstBuf := v.data.Back() + if firstBuf == nil { + firstBuf = bufferPool.Get().(*Buffer) + v.data.PushBack(firstBuf) + } + + // Does the last block have sufficient capacity alone? + if l := len(firstBuf.data) - firstBuf.write; l >= need { + dst = safemem.BlockSeqOf(firstBuf.WriteBlock()) + } else { + // Append blocks until sufficient. + need -= l + blocks = append(blocks, firstBuf.WriteBlock()) + for need > 0 { + emptyBuf := bufferPool.Get().(*Buffer) + v.data.PushBack(emptyBuf) + need -= len(emptyBuf.data) // Full block. + blocks = append(blocks, emptyBuf.WriteBlock()) + } + dst = safemem.BlockSeqFromSlice(blocks) + } + + // Perform the copy. + n, err := safemem.CopySeq(dst, srcs) + v.size += int64(n) + + // Update all indices. + for left := int(n); left > 0; firstBuf = firstBuf.Next() { + if l := len(firstBuf.data) - firstBuf.write; left >= l { + firstBuf.write += l // Whole block. + left -= l + } else { + firstBuf.write += left // Partial block. + left = 0 + } + } + + return n, err +} + +// ReadToBlocks implements safemem.Reader.ReadToBlocks. +// +// This will not advance the read index; the caller should follow +// this call with a call to TrimFront in order to remove the read +// data from the buffer. This is done to support pipe sematics. +func (v *View) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) { + need := int(dsts.NumBytes()) + if need == 0 { + return 0, nil + } + + var ( + src safemem.BlockSeq + blocks []safemem.Block + ) + + firstBuf := v.data.Front() + if firstBuf == nil { + return 0, io.EOF + } + + // Is all the data in a single block? + if l := firstBuf.write - firstBuf.read; l >= need { + src = safemem.BlockSeqOf(firstBuf.ReadBlock()) + } else { + // Build a list of all the buffers. + need -= l + blocks = append(blocks, firstBuf.ReadBlock()) + for buf := firstBuf.Next(); buf != nil && need > 0; buf = buf.Next() { + need -= buf.write - buf.read + blocks = append(blocks, buf.ReadBlock()) + } + src = safemem.BlockSeqFromSlice(blocks) + } + + // Perform the copy. + n, err := safemem.CopySeq(dsts, src) + + // See above: we would normally advance the read index here, but we + // don't do that in order to support pipe semantics. We rely on a + // separate call to TrimFront() in this case. + + return n, err +} diff --git a/pkg/buffer/view.go b/pkg/buffer/view.go new file mode 100644 index 000000000..00fc11e9c --- /dev/null +++ b/pkg/buffer/view.go @@ -0,0 +1,382 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buffer + +import ( + "fmt" + "io" +) + +// View is a non-linear buffer. +// +// All methods are thread compatible. +// +// +stateify savable +type View struct { + data bufferList + size int64 +} + +// TrimFront removes the first count bytes from the buffer. +func (v *View) TrimFront(count int64) { + if count >= v.size { + v.advanceRead(v.size) + } else { + v.advanceRead(count) + } +} + +// Read implements io.Reader.Read. +// +// Note that reading does not advance the read index. This must be done +// manually using TrimFront or other methods. +func (v *View) Read(p []byte) (int, error) { + return v.ReadAt(p, 0) +} + +// ReadAt implements io.ReaderAt.ReadAt. +func (v *View) ReadAt(p []byte, offset int64) (int, error) { + var ( + skipped int64 + done int64 + ) + for buf := v.data.Front(); buf != nil && done < int64(len(p)); buf = buf.Next() { + needToSkip := int(offset - skipped) + if l := buf.write - buf.read; l <= needToSkip { + skipped += int64(l) + continue + } + + // Actually read data. + n := copy(p[done:], buf.data[buf.read+needToSkip:buf.write]) + skipped += int64(needToSkip) + done += int64(n) + } + if int(done) < len(p) { + return int(done), io.EOF + } + return int(done), nil +} + +// Write implements io.Writer.Write. +func (v *View) Write(p []byte) (int, error) { + v.Append(p) // Does not fail. + return len(p), nil +} + +// advanceRead advances the view's read index. +// +// Precondition: there must be sufficient bytes in the buffer. +func (v *View) advanceRead(count int64) { + for buf := v.data.Front(); buf != nil && count > 0; { + l := int64(buf.write - buf.read) + if l > count { + // There is still data for reading. + buf.read += int(count) + v.size -= count + count = 0 + break + } + + // Read from this buffer. + buf.read += int(l) + count -= l + v.size -= l + + // When all data has been read from a buffer, we push + // it into the empty buffer pool for reuse. + oldBuf := buf + buf = buf.Next() // Iterate. + v.data.Remove(oldBuf) + oldBuf.Reset() + bufferPool.Put(oldBuf) + } + if count > 0 { + panic(fmt.Sprintf("advanceRead still has %d bytes remaining", count)) + } +} + +// Truncate truncates the view to the given bytes. +func (v *View) Truncate(length int64) { + if length < 0 || length >= v.size { + return // Nothing to do. + } + for buf := v.data.Back(); buf != nil && v.size > length; buf = v.data.Back() { + l := int64(buf.write - buf.read) // Local bytes. + switch { + case v.size-l >= length: + // Drop the buffer completely; see above. + v.data.Remove(buf) + v.size -= l + buf.Reset() + bufferPool.Put(buf) + + case v.size > length && v.size-l < length: + // Just truncate the buffer locally. + delta := (length - (v.size - l)) + buf.write = buf.read + int(delta) + v.size = length + + default: + // Should never happen. + panic("invalid buffer during truncation") + } + } + v.size = length // Save the new size. +} + +// Grow grows the given view to the number of bytes. If zero +// is true, all these bytes will be zero. If zero is false, +// then this is the caller's responsibility. +// +// Precondition: length must be >= 0. +func (v *View) Grow(length int64, zero bool) { + if length < 0 { + panic("negative length provided") + } + for v.size < length { + buf := v.data.Back() + + // Is there at least one buffer? + if buf == nil || buf.Full() { + buf = bufferPool.Get().(*Buffer) + v.data.PushBack(buf) + } + + // Write up to length bytes. + l := len(buf.data) - buf.write + if int64(l) > length-v.size { + l = int(length - v.size) + } + + // Zero the written section; note that this pattern is + // specifically recognized and optimized by the compiler. + if zero { + for i := buf.write; i < buf.write+l; i++ { + buf.data[i] = 0 + } + } + + // Advance the index. + buf.write += l + v.size += int64(l) + } +} + +// Prepend prepends the given data. +func (v *View) Prepend(data []byte) { + // Is there any space in the first buffer? + if buf := v.data.Front(); buf != nil && buf.read > 0 { + // Fill up before the first write. + avail := buf.read + copy(buf.data[0:], data[len(data)-avail:]) + data = data[:len(data)-avail] + v.size += int64(avail) + } + + for len(data) > 0 { + // Do we need an empty buffer? + buf := bufferPool.Get().(*Buffer) + v.data.PushFront(buf) + + // The buffer is empty; copy last chunk. + start := len(data) - len(buf.data) + if start < 0 { + start = 0 // Everything. + } + + // We have to put the data at the end of the current + // buffer in order to ensure that the next prepend will + // correctly fill up the beginning of this buffer. + bStart := len(buf.data) - len(data[start:]) + n := copy(buf.data[bStart:], data[start:]) + buf.read = bStart + buf.write = len(buf.data) + data = data[:start] + v.size += int64(n) + } +} + +// Append appends the given data. +func (v *View) Append(data []byte) { + for done := 0; done < len(data); { + buf := v.data.Back() + + // Find the first empty buffer. + if buf == nil || buf.Full() { + buf = bufferPool.Get().(*Buffer) + v.data.PushBack(buf) + } + + // Copy in to the given buffer. + n := copy(buf.data[buf.write:], data[done:]) + done += n + buf.write += n + v.size += int64(n) + } +} + +// Flatten returns a flattened copy of this data. +// +// This method should not be used in any performance-sensitive paths. It may +// allocate a fresh byte slice sufficiently large to contain all the data in +// the buffer. +// +// N.B. Tee data still belongs to this view, as if there is a single buffer +// present, then it will be returned directly. This should be used for +// temporary use only, and a reference to the given slice should not be held. +func (v *View) Flatten() []byte { + if buf := v.data.Front(); buf.Next() == nil { + return buf.data[buf.read:buf.write] // Only one buffer. + } + data := make([]byte, 0, v.size) // Need to flatten. + for buf := v.data.Front(); buf != nil; buf = buf.Next() { + // Copy to the allocated slice. + data = append(data, buf.data[buf.read:buf.write]...) + } + return data +} + +// Size indicates the total amount of data available in this view. +func (v *View) Size() (sz int64) { + sz = v.size // Pre-calculated. + return sz +} + +// Copy makes a strict copy of this view. +func (v *View) Copy() (other View) { + for buf := v.data.Front(); buf != nil; buf = buf.Next() { + other.Append(buf.data[buf.read:buf.write]) + } + return other +} + +// Apply applies the given function across all valid data. +func (v *View) Apply(fn func([]byte)) { + for buf := v.data.Front(); buf != nil; buf = buf.Next() { + if l := int64(buf.write - buf.read); l > 0 { + fn(buf.data[buf.read:buf.write]) + } + } +} + +// Merge merges the provided View with this one. +// +// The other view will be empty after this operation. +func (v *View) Merge(other *View) { + // Copy over all buffers. + for buf := other.data.Front(); buf != nil && !buf.Empty(); buf = other.data.Front() { + other.data.Remove(buf) + v.data.PushBack(buf) + } + + // Adjust sizes. + v.size += other.size + other.size = 0 +} + +// WriteFromReader writes to the buffer from an io.Reader. +func (v *View) WriteFromReader(r io.Reader, count int64) (int64, error) { + var ( + done int64 + n int + err error + ) + for done < count { + buf := v.data.Back() + + // Find the first empty buffer. + if buf == nil || buf.Full() { + buf = bufferPool.Get().(*Buffer) + v.data.PushBack(buf) + } + + // Is this less than the minimum batch? + if len(buf.data[buf.write:]) < minBatch && (count-done) >= int64(minBatch) { + tmp := make([]byte, minBatch) + n, err = r.Read(tmp) + v.Write(tmp[:n]) + done += int64(n) + if err != nil { + break + } + continue + } + + // Limit the read, if necessary. + end := len(buf.data) + if int64(end-buf.write) > (count - done) { + end = buf.write + int(count-done) + } + + // Pass the relevant portion of the buffer. + n, err = r.Read(buf.data[buf.write:end]) + buf.write += n + done += int64(n) + v.size += int64(n) + if err == io.EOF { + err = nil // Short write allowed. + break + } else if err != nil { + break + } + } + return done, err +} + +// ReadToWriter reads from the buffer into an io.Writer. +// +// N.B. This does not consume the bytes read. TrimFront should +// be called appropriately after this call in order to do so. +func (v *View) ReadToWriter(w io.Writer, count int64) (int64, error) { + var ( + done int64 + n int + err error + ) + offset := 0 // Spill-over for batching. + for buf := v.data.Front(); buf != nil && done < count; buf = buf.Next() { + l := buf.write - buf.read - offset + + // Is this less than the minimum batch? + if l < minBatch && (count-done) >= int64(minBatch) && (v.size-done) >= int64(minBatch) { + tmp := make([]byte, minBatch) + n, err = v.ReadAt(tmp, done) + w.Write(tmp[:n]) + done += int64(n) + offset = n - l // Reset below. + if err != nil { + break + } + continue + } + + // Limit the write if necessary. + if int64(l) >= (count - done) { + l = int(count - done) + } + + // Perform the actual write. + n, err = w.Write(buf.data[buf.read+offset : buf.read+offset+l]) + done += int64(n) + if err != nil { + break + } + + // Reset spill-over. + offset = 0 + } + return done, err +} diff --git a/pkg/buffer/view_test.go b/pkg/buffer/view_test.go new file mode 100644 index 000000000..37e652f16 --- /dev/null +++ b/pkg/buffer/view_test.go @@ -0,0 +1,233 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buffer + +import ( + "bytes" + "strings" + "testing" +) + +func TestView(t *testing.T) { + testCases := []struct { + name string + input string + output string + ops []func(*View) + }{ + // Prepend. + { + name: "prepend", + input: "world", + ops: []func(*View){ + func(v *View) { + v.Prepend([]byte("hello ")) + }, + }, + output: "hello world", + }, + { + name: "prepend fill", + input: strings.Repeat("1", bufferSize-1), + ops: []func(*View){ + func(v *View) { + v.Prepend([]byte("0")) + }, + }, + output: "0" + strings.Repeat("1", bufferSize-1), + }, + { + name: "prepend overflow", + input: strings.Repeat("1", bufferSize), + ops: []func(*View){ + func(v *View) { + v.Prepend([]byte("0")) + }, + }, + output: "0" + strings.Repeat("1", bufferSize), + }, + { + name: "prepend multiple buffers", + input: strings.Repeat("1", bufferSize-1), + ops: []func(*View){ + func(v *View) { + v.Prepend([]byte(strings.Repeat("0", bufferSize*3))) + }, + }, + output: strings.Repeat("0", bufferSize*3) + strings.Repeat("1", bufferSize-1), + }, + + // Append. + { + name: "append", + input: "hello", + ops: []func(*View){ + func(v *View) { + v.Append([]byte(" world")) + }, + }, + output: "hello world", + }, + { + name: "append fill", + input: strings.Repeat("1", bufferSize-1), + ops: []func(*View){ + func(v *View) { + v.Append([]byte("0")) + }, + }, + output: strings.Repeat("1", bufferSize-1) + "0", + }, + { + name: "append overflow", + input: strings.Repeat("1", bufferSize), + ops: []func(*View){ + func(v *View) { + v.Append([]byte("0")) + }, + }, + output: strings.Repeat("1", bufferSize) + "0", + }, + { + name: "append multiple buffers", + input: strings.Repeat("1", bufferSize-1), + ops: []func(*View){ + func(v *View) { + v.Append([]byte(strings.Repeat("0", bufferSize*3))) + }, + }, + output: strings.Repeat("1", bufferSize-1) + strings.Repeat("0", bufferSize*3), + }, + + // Truncate. + { + name: "truncate", + input: "hello world", + ops: []func(*View){ + func(v *View) { + v.Truncate(5) + }, + }, + output: "hello", + }, + { + name: "truncate multiple buffers", + input: strings.Repeat("1", bufferSize*2), + ops: []func(*View){ + func(v *View) { + v.Truncate(bufferSize*2 - 1) + }, + }, + output: strings.Repeat("1", bufferSize*2-1), + }, + { + name: "truncate multiple buffers to one buffer", + input: strings.Repeat("1", bufferSize*2), + ops: []func(*View){ + func(v *View) { + v.Truncate(5) + }, + }, + output: "11111", + }, + + // TrimFront. + { + name: "trim", + input: "hello world", + ops: []func(*View){ + func(v *View) { + v.TrimFront(6) + }, + }, + output: "world", + }, + { + name: "trim multiple buffers", + input: strings.Repeat("1", bufferSize*2), + ops: []func(*View){ + func(v *View) { + v.TrimFront(1) + }, + }, + output: strings.Repeat("1", bufferSize*2-1), + }, + { + name: "trim multiple buffers to one buffer", + input: strings.Repeat("1", bufferSize*2), + ops: []func(*View){ + func(v *View) { + v.TrimFront(bufferSize*2 - 1) + }, + }, + output: "1", + }, + + // Grow. + { + name: "grow", + input: "hello world", + ops: []func(*View){ + func(v *View) { + v.Grow(1, true) + }, + }, + output: "hello world", + }, + { + name: "grow from zero", + ops: []func(*View){ + func(v *View) { + v.Grow(1024, true) + }, + }, + output: strings.Repeat("\x00", 1024), + }, + { + name: "grow from non-zero", + input: strings.Repeat("1", bufferSize), + ops: []func(*View){ + func(v *View) { + v.Grow(bufferSize*2, true) + }, + }, + output: strings.Repeat("1", bufferSize) + strings.Repeat("\x00", bufferSize), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Construct the new view. + var view View + view.Append([]byte(tc.input)) + + // Run all operations. + for _, op := range tc.ops { + op(&view) + } + + // Flatten and validate. + out := view.Flatten() + if !bytes.Equal([]byte(tc.output), out) { + t.Errorf("expected %q, got %q", tc.output, string(out)) + } + + // Ensure the size is correct. + if len(out) != int(view.Size()) { + t.Errorf("size is wrong: expected %d, got %d", len(out), view.Size()) + } + }) + } +} diff --git a/pkg/buffer/view_unsafe.go b/pkg/buffer/view_unsafe.go new file mode 100644 index 000000000..d1ef39b26 --- /dev/null +++ b/pkg/buffer/view_unsafe.go @@ -0,0 +1,25 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buffer + +import ( + "unsafe" +) + +// minBatch is the smallest Read or Write operation that the +// WriteFromReader and ReadToWriter functions will use. +// +// This is defined as the size of a native pointer. +const minBatch = int(unsafe.Sizeof(uintptr(0))) diff --git a/pkg/sentry/kernel/pipe/BUILD b/pkg/sentry/kernel/pipe/BUILD index 4c049d5b4..f29dc0472 100644 --- a/pkg/sentry/kernel/pipe/BUILD +++ b/pkg/sentry/kernel/pipe/BUILD @@ -1,25 +1,10 @@ load("//tools:defs.bzl", "go_library", "go_test") -load("//tools/go_generics:defs.bzl", "go_template_instance") package(licenses = ["notice"]) -go_template_instance( - name = "buffer_list", - out = "buffer_list.go", - package = "pipe", - prefix = "buffer", - template = "//pkg/ilist:generic_list", - types = { - "Element": "*buffer", - "Linker": "*buffer", - }, -) - go_library( name = "pipe", srcs = [ - "buffer.go", - "buffer_list.go", "device.go", "node.go", "pipe.go", @@ -33,8 +18,8 @@ go_library( deps = [ "//pkg/abi/linux", "//pkg/amutex", + "//pkg/buffer", "//pkg/context", - "//pkg/safemem", "//pkg/sentry/arch", "//pkg/sentry/device", "//pkg/sentry/fs", @@ -51,7 +36,6 @@ go_test( name = "pipe_test", size = "small", srcs = [ - "buffer_test.go", "node_test.go", "pipe_test.go", ], diff --git a/pkg/sentry/kernel/pipe/buffer.go b/pkg/sentry/kernel/pipe/buffer.go deleted file mode 100644 index fe3be5dbd..000000000 --- a/pkg/sentry/kernel/pipe/buffer.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipe - -import ( - "io" - - "gvisor.dev/gvisor/pkg/safemem" - "gvisor.dev/gvisor/pkg/sync" -) - -// buffer encapsulates a queueable byte buffer. -// -// Note that the total size is slightly less than two pages. This -// is done intentionally to ensure that the buffer object aligns -// with runtime internals. We have no hard size or alignment -// requirements. This two page size will effectively minimize -// internal fragmentation, but still have a large enough chunk -// to limit excessive segmentation. -// -// +stateify savable -type buffer struct { - data [8144]byte - read int - write int - bufferEntry -} - -// Reset resets internal data. -// -// This must be called before use. -func (b *buffer) Reset() { - b.read = 0 - b.write = 0 -} - -// Empty indicates the buffer is empty. -// -// This indicates there is no data left to read. -func (b *buffer) Empty() bool { - return b.read == b.write -} - -// Full indicates the buffer is full. -// -// This indicates there is no capacity left to write. -func (b *buffer) Full() bool { - return b.write == len(b.data) -} - -// WriteFromBlocks implements safemem.Writer.WriteFromBlocks. -func (b *buffer) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) { - dst := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.write:])) - n, err := safemem.CopySeq(dst, srcs) - b.write += int(n) - return n, err -} - -// WriteFromReader writes to the buffer from an io.Reader. -func (b *buffer) WriteFromReader(r io.Reader, count int64) (int64, error) { - dst := b.data[b.write:] - if count < int64(len(dst)) { - dst = b.data[b.write:][:count] - } - n, err := r.Read(dst) - b.write += n - return int64(n), err -} - -// ReadToBlocks implements safemem.Reader.ReadToBlocks. -func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) { - src := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.read:b.write])) - n, err := safemem.CopySeq(dsts, src) - b.read += int(n) - return n, err -} - -// ReadToWriter reads from the buffer into an io.Writer. -func (b *buffer) ReadToWriter(w io.Writer, count int64, dup bool) (int64, error) { - src := b.data[b.read:b.write] - if count < int64(len(src)) { - src = b.data[b.read:][:count] - } - n, err := w.Write(src) - if !dup { - b.read += n - } - return int64(n), err -} - -// bufferPool is a pool for buffers. -var bufferPool = sync.Pool{ - New: func() interface{} { - return new(buffer) - }, -} - -// newBuffer grabs a new buffer from the pool. -func newBuffer() *buffer { - b := bufferPool.Get().(*buffer) - b.Reset() - return b -} diff --git a/pkg/sentry/kernel/pipe/buffer_test.go b/pkg/sentry/kernel/pipe/buffer_test.go deleted file mode 100644 index 4d54b8b8f..000000000 --- a/pkg/sentry/kernel/pipe/buffer_test.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright 2019 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipe - -import ( - "testing" - "unsafe" - - "gvisor.dev/gvisor/pkg/usermem" -) - -func TestBufferSize(t *testing.T) { - bufferSize := unsafe.Sizeof(buffer{}) - if bufferSize < usermem.PageSize { - t.Errorf("buffer is less than a page") - } - if bufferSize > (2 * usermem.PageSize) { - t.Errorf("buffer is greater than two pages") - } -} diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go index 08410283f..725e9db7d 100644 --- a/pkg/sentry/kernel/pipe/pipe.go +++ b/pkg/sentry/kernel/pipe/pipe.go @@ -20,6 +20,7 @@ import ( "sync/atomic" "syscall" + "gvisor.dev/gvisor/pkg/buffer" "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/sentry/fs" "gvisor.dev/gvisor/pkg/sync" @@ -70,10 +71,10 @@ type Pipe struct { // mu protects all pipe internal state below. mu sync.Mutex `state:"nosave"` - // data is the buffer queue of pipe contents. + // view is the underlying set of buffers. // // This is protected by mu. - data bufferList + view buffer.View // max is the maximum size of the pipe in bytes. When this max has been // reached, writers will get EWOULDBLOCK. @@ -81,11 +82,6 @@ type Pipe struct { // This is protected by mu. max int64 - // size is the current size of the pipe in bytes. - // - // This is protected by mu. - size int64 - // hadWriter indicates if this pipe ever had a writer. Note that this // does not necessarily indicate there is *currently* a writer, just // that there has been a writer at some point since the pipe was @@ -196,7 +192,7 @@ type readOps struct { limit func(int64) // read performs the actual read operation. - read func(*buffer) (int64, error) + read func(*buffer.View) (int64, error) } // read reads data from the pipe into dst and returns the number of bytes @@ -213,7 +209,7 @@ func (p *Pipe) read(ctx context.Context, ops readOps) (int64, error) { defer p.mu.Unlock() // Is the pipe empty? - if p.size == 0 { + if p.view.Size() == 0 { if !p.HasWriters() { // There are no writers, return EOF. return 0, nil @@ -222,71 +218,13 @@ func (p *Pipe) read(ctx context.Context, ops readOps) (int64, error) { } // Limit how much we consume. - if ops.left() > p.size { - ops.limit(p.size) + if ops.left() > p.view.Size() { + ops.limit(p.view.Size()) } - done := int64(0) - for ops.left() > 0 { - // Pop the first buffer. - first := p.data.Front() - if first == nil { - break - } - - // Copy user data. - n, err := ops.read(first) - done += int64(n) - p.size -= n - - // Empty buffer? - if first.Empty() { - // Push to the free list. - p.data.Remove(first) - bufferPool.Put(first) - } - - // Handle errors. - if err != nil { - return done, err - } - } - - return done, nil -} - -// dup duplicates all data from this pipe into the given writer. -// -// There is no blocking behavior implemented here. The writer may propagate -// some blocking error. All the writes must be complete writes. -func (p *Pipe) dup(ctx context.Context, ops readOps) (int64, error) { - p.mu.Lock() - defer p.mu.Unlock() - - // Is the pipe empty? - if p.size == 0 { - if !p.HasWriters() { - // See above. - return 0, nil - } - return 0, syserror.ErrWouldBlock - } - - // Limit how much we consume. - if ops.left() > p.size { - ops.limit(p.size) - } - - done := int64(0) - for buf := p.data.Front(); buf != nil; buf = buf.Next() { - n, err := ops.read(buf) - done += n - if err != nil { - return done, err - } - } - - return done, nil + // Copy user data; the read op is responsible for trimming. + done, err := ops.read(&p.view) + return done, err } type writeOps struct { @@ -297,7 +235,7 @@ type writeOps struct { limit func(int64) // write should write to the provided buffer. - write func(*buffer) (int64, error) + write func(*buffer.View) (int64, error) } // write writes data from sv into the pipe and returns the number of bytes @@ -317,33 +255,19 @@ func (p *Pipe) write(ctx context.Context, ops writeOps) (int64, error) { // POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be // atomic, but requires no atomicity for writes larger than this. wanted := ops.left() - if avail := p.max - p.size; wanted > avail { + if avail := p.max - p.view.Size(); wanted > avail { if wanted <= p.atomicIOBytes { return 0, syserror.ErrWouldBlock } ops.limit(avail) } - done := int64(0) - for ops.left() > 0 { - // Need a new buffer? - last := p.data.Back() - if last == nil || last.Full() { - // Add a new buffer to the data list. - last = newBuffer() - p.data.PushBack(last) - } - - // Copy user data. - n, err := ops.write(last) - done += int64(n) - p.size += n - - // Handle errors. - if err != nil { - return done, err - } + // Copy user data. + done, err := ops.write(&p.view) + if err != nil { + return done, err } + if wanted > done { // Partial write due to full pipe. return done, syserror.ErrWouldBlock @@ -396,7 +320,7 @@ func (p *Pipe) HasWriters() bool { // Precondition: mu must be held. func (p *Pipe) rReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) - if p.HasReaders() && p.data.Front() != nil { + if p.HasReaders() && p.view.Size() != 0 { ready |= waiter.EventIn } if !p.HasWriters() && p.hadWriter { @@ -422,7 +346,7 @@ func (p *Pipe) rReadiness() waiter.EventMask { // Precondition: mu must be held. func (p *Pipe) wReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) - if p.HasWriters() && p.size < p.max { + if p.HasWriters() && p.view.Size() < p.max { ready |= waiter.EventOut } if !p.HasReaders() { @@ -451,7 +375,7 @@ func (p *Pipe) rwReadiness() waiter.EventMask { func (p *Pipe) queued() int64 { p.mu.Lock() defer p.mu.Unlock() - return p.size + return p.view.Size() } // FifoSize implements fs.FifoSizer.FifoSize. @@ -474,7 +398,7 @@ func (p *Pipe) SetFifoSize(size int64) (int64, error) { } p.mu.Lock() defer p.mu.Unlock() - if size < p.size { + if size < p.view.Size() { return 0, syserror.EBUSY } p.max = size diff --git a/pkg/sentry/kernel/pipe/pipe_util.go b/pkg/sentry/kernel/pipe/pipe_util.go index 80158239e..5a1d4fd57 100644 --- a/pkg/sentry/kernel/pipe/pipe_util.go +++ b/pkg/sentry/kernel/pipe/pipe_util.go @@ -21,6 +21,7 @@ import ( "gvisor.dev/gvisor/pkg/abi/linux" "gvisor.dev/gvisor/pkg/amutex" + "gvisor.dev/gvisor/pkg/buffer" "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/sentry/arch" "gvisor.dev/gvisor/pkg/sync" @@ -49,9 +50,10 @@ func (p *Pipe) Read(ctx context.Context, dst usermem.IOSequence) (int64, error) limit: func(l int64) { dst = dst.TakeFirst64(l) }, - read: func(buf *buffer) (int64, error) { - n, err := dst.CopyOutFrom(ctx, buf) + read: func(view *buffer.View) (int64, error) { + n, err := dst.CopyOutFrom(ctx, view) dst = dst.DropFirst64(n) + view.TrimFront(n) return n, err }, }) @@ -70,16 +72,15 @@ func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool) limit: func(l int64) { count = l }, - read: func(buf *buffer) (int64, error) { - n, err := buf.ReadToWriter(w, count, dup) + read: func(view *buffer.View) (int64, error) { + n, err := view.ReadToWriter(w, count) + if !dup { + view.TrimFront(n) + } count -= n return n, err }, } - if dup { - // There is no notification for dup operations. - return p.dup(ctx, ops) - } n, err := p.read(ctx, ops) if n > 0 { p.Notify(waiter.EventOut) @@ -96,8 +97,8 @@ func (p *Pipe) Write(ctx context.Context, src usermem.IOSequence) (int64, error) limit: func(l int64) { src = src.TakeFirst64(l) }, - write: func(buf *buffer) (int64, error) { - n, err := src.CopyInTo(ctx, buf) + write: func(view *buffer.View) (int64, error) { + n, err := src.CopyInTo(ctx, view) src = src.DropFirst64(n) return n, err }, @@ -117,8 +118,8 @@ func (p *Pipe) ReadFrom(ctx context.Context, r io.Reader, count int64) (int64, e limit: func(l int64) { count = l }, - write: func(buf *buffer) (int64, error) { - n, err := buf.WriteFromReader(r, count) + write: func(view *buffer.View) (int64, error) { + n, err := view.WriteFromReader(r, count) count -= n return n, err }, -- cgit v1.2.3