summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rwxr-xr-xpkg/buffer/buffer.go67
-rwxr-xr-xpkg/buffer/buffer_list.go (renamed from pkg/sentry/kernel/pipe/buffer_list.go)34
-rwxr-xr-xpkg/buffer/buffer_state_autogen.go70
-rwxr-xr-xpkg/buffer/safemem.go131
-rwxr-xr-xpkg/buffer/view.go382
-rwxr-xr-xpkg/buffer/view_unsafe.go25
-rw-r--r--pkg/sentry/kernel/pipe/buffer.go115
-rw-r--r--pkg/sentry/kernel/pipe/pipe.go118
-rwxr-xr-xpkg/sentry/kernel/pipe/pipe_state_autogen.go52
-rwxr-xr-xpkg/sentry/kernel/pipe/pipe_util.go25
10 files changed, 728 insertions, 291 deletions
diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go
new file mode 100755
index 000000000..d5f64609b
--- /dev/null
+++ b/pkg/buffer/buffer.go
@@ -0,0 +1,67 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package buffer provides the implementation of a buffer view.
+package buffer
+
+import (
+ "sync"
+)
+
+const bufferSize = 8144 // See below.
+
+// Buffer encapsulates a queueable byte buffer.
+//
+// Note that the total size is slightly less than two pages. This is done
+// intentionally to ensure that the buffer object aligns with runtime
+// internals. We have no hard size or alignment requirements. This two page
+// size will effectively minimize internal fragmentation, but still have a
+// large enough chunk to limit excessive segmentation.
+//
+// +stateify savable
+type Buffer struct {
+ data [bufferSize]byte
+ read int
+ write int
+ bufferEntry
+}
+
+// Reset resets internal data.
+//
+// This must be called before use.
+func (b *Buffer) Reset() {
+ b.read = 0
+ b.write = 0
+}
+
+// Empty indicates the buffer is empty.
+//
+// This indicates there is no data left to read.
+func (b *Buffer) Empty() bool {
+ return b.read == b.write
+}
+
+// Full indicates the buffer is full.
+//
+// This indicates there is no capacity left to write.
+func (b *Buffer) Full() bool {
+ return b.write == len(b.data)
+}
+
+// bufferPool is a pool for buffers.
+var bufferPool = sync.Pool{
+ New: func() interface{} {
+ return new(Buffer)
+ },
+}
diff --git a/pkg/sentry/kernel/pipe/buffer_list.go b/pkg/buffer/buffer_list.go
index 42ec78788..f3c27c42c 100755
--- a/pkg/sentry/kernel/pipe/buffer_list.go
+++ b/pkg/buffer/buffer_list.go
@@ -1,4 +1,4 @@
-package pipe
+package buffer
// ElementMapper provides an identity mapping by default.
//
@@ -13,7 +13,7 @@ type bufferElementMapper struct{}
// This default implementation should be inlined.
//
//go:nosplit
-func (bufferElementMapper) linkerFor(elem *buffer) *buffer { return elem }
+func (bufferElementMapper) linkerFor(elem *Buffer) *Buffer { return elem }
// List is an intrusive list. Entries can be added to or removed from the list
// in O(1) time and with no additional memory allocations.
@@ -27,8 +27,8 @@ func (bufferElementMapper) linkerFor(elem *buffer) *buffer { return elem }
//
// +stateify savable
type bufferList struct {
- head *buffer
- tail *buffer
+ head *Buffer
+ tail *Buffer
}
// Reset resets list l to the empty state.
@@ -43,17 +43,17 @@ func (l *bufferList) Empty() bool {
}
// Front returns the first element of list l or nil.
-func (l *bufferList) Front() *buffer {
+func (l *bufferList) Front() *Buffer {
return l.head
}
// Back returns the last element of list l or nil.
-func (l *bufferList) Back() *buffer {
+func (l *bufferList) Back() *Buffer {
return l.tail
}
// PushFront inserts the element e at the front of list l.
-func (l *bufferList) PushFront(e *buffer) {
+func (l *bufferList) PushFront(e *Buffer) {
bufferElementMapper{}.linkerFor(e).SetNext(l.head)
bufferElementMapper{}.linkerFor(e).SetPrev(nil)
@@ -67,7 +67,7 @@ func (l *bufferList) PushFront(e *buffer) {
}
// PushBack inserts the element e at the back of list l.
-func (l *bufferList) PushBack(e *buffer) {
+func (l *bufferList) PushBack(e *Buffer) {
bufferElementMapper{}.linkerFor(e).SetNext(nil)
bufferElementMapper{}.linkerFor(e).SetPrev(l.tail)
@@ -97,7 +97,7 @@ func (l *bufferList) PushBackList(m *bufferList) {
}
// InsertAfter inserts e after b.
-func (l *bufferList) InsertAfter(b, e *buffer) {
+func (l *bufferList) InsertAfter(b, e *Buffer) {
a := bufferElementMapper{}.linkerFor(b).Next()
bufferElementMapper{}.linkerFor(e).SetNext(a)
bufferElementMapper{}.linkerFor(e).SetPrev(b)
@@ -111,7 +111,7 @@ func (l *bufferList) InsertAfter(b, e *buffer) {
}
// InsertBefore inserts e before a.
-func (l *bufferList) InsertBefore(a, e *buffer) {
+func (l *bufferList) InsertBefore(a, e *Buffer) {
b := bufferElementMapper{}.linkerFor(a).Prev()
bufferElementMapper{}.linkerFor(e).SetNext(a)
bufferElementMapper{}.linkerFor(e).SetPrev(b)
@@ -125,7 +125,7 @@ func (l *bufferList) InsertBefore(a, e *buffer) {
}
// Remove removes e from l.
-func (l *bufferList) Remove(e *buffer) {
+func (l *bufferList) Remove(e *Buffer) {
prev := bufferElementMapper{}.linkerFor(e).Prev()
next := bufferElementMapper{}.linkerFor(e).Next()
@@ -148,26 +148,26 @@ func (l *bufferList) Remove(e *buffer) {
//
// +stateify savable
type bufferEntry struct {
- next *buffer
- prev *buffer
+ next *Buffer
+ prev *Buffer
}
// Next returns the entry that follows e in the list.
-func (e *bufferEntry) Next() *buffer {
+func (e *bufferEntry) Next() *Buffer {
return e.next
}
// Prev returns the entry that precedes e in the list.
-func (e *bufferEntry) Prev() *buffer {
+func (e *bufferEntry) Prev() *Buffer {
return e.prev
}
// SetNext assigns 'entry' as the entry that follows e in the list.
-func (e *bufferEntry) SetNext(elem *buffer) {
+func (e *bufferEntry) SetNext(elem *Buffer) {
e.next = elem
}
// SetPrev assigns 'entry' as the entry that precedes e in the list.
-func (e *bufferEntry) SetPrev(elem *buffer) {
+func (e *bufferEntry) SetPrev(elem *Buffer) {
e.prev = elem
}
diff --git a/pkg/buffer/buffer_state_autogen.go b/pkg/buffer/buffer_state_autogen.go
new file mode 100755
index 000000000..9565eb6fa
--- /dev/null
+++ b/pkg/buffer/buffer_state_autogen.go
@@ -0,0 +1,70 @@
+// automatically generated by stateify.
+
+package buffer
+
+import (
+ "gvisor.dev/gvisor/pkg/state"
+)
+
+func (x *Buffer) beforeSave() {}
+func (x *Buffer) save(m state.Map) {
+ x.beforeSave()
+ m.Save("data", &x.data)
+ m.Save("read", &x.read)
+ m.Save("write", &x.write)
+ m.Save("bufferEntry", &x.bufferEntry)
+}
+
+func (x *Buffer) afterLoad() {}
+func (x *Buffer) load(m state.Map) {
+ m.Load("data", &x.data)
+ m.Load("read", &x.read)
+ m.Load("write", &x.write)
+ m.Load("bufferEntry", &x.bufferEntry)
+}
+
+func (x *bufferList) beforeSave() {}
+func (x *bufferList) save(m state.Map) {
+ x.beforeSave()
+ m.Save("head", &x.head)
+ m.Save("tail", &x.tail)
+}
+
+func (x *bufferList) afterLoad() {}
+func (x *bufferList) load(m state.Map) {
+ m.Load("head", &x.head)
+ m.Load("tail", &x.tail)
+}
+
+func (x *bufferEntry) beforeSave() {}
+func (x *bufferEntry) save(m state.Map) {
+ x.beforeSave()
+ m.Save("next", &x.next)
+ m.Save("prev", &x.prev)
+}
+
+func (x *bufferEntry) afterLoad() {}
+func (x *bufferEntry) load(m state.Map) {
+ m.Load("next", &x.next)
+ m.Load("prev", &x.prev)
+}
+
+func (x *View) beforeSave() {}
+func (x *View) save(m state.Map) {
+ x.beforeSave()
+ m.Save("data", &x.data)
+ m.Save("size", &x.size)
+}
+
+func (x *View) afterLoad() {}
+func (x *View) load(m state.Map) {
+ m.Load("data", &x.data)
+ m.Load("size", &x.size)
+}
+
+func init() {
+ state.Register("pkg/buffer.Buffer", (*Buffer)(nil), state.Fns{Save: (*Buffer).save, Load: (*Buffer).load})
+ state.Register("pkg/buffer.bufferList", (*bufferList)(nil), state.Fns{Save: (*bufferList).save, Load: (*bufferList).load})
+ state.Register("pkg/buffer.bufferEntry", (*bufferEntry)(nil), state.Fns{Save: (*bufferEntry).save, Load: (*bufferEntry).load})
+ state.Register("pkg/buffer.View", (*View)(nil), state.Fns{Save: (*View).save, Load: (*View).load})
+}
diff --git a/pkg/buffer/safemem.go b/pkg/buffer/safemem.go
new file mode 100755
index 000000000..071aaa488
--- /dev/null
+++ b/pkg/buffer/safemem.go
@@ -0,0 +1,131 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buffer
+
+import (
+ "io"
+
+ "gvisor.dev/gvisor/pkg/safemem"
+)
+
+// WriteBlock returns this buffer as a write Block.
+func (b *Buffer) WriteBlock() safemem.Block {
+ return safemem.BlockFromSafeSlice(b.data[b.write:])
+}
+
+// ReadBlock returns this buffer as a read Block.
+func (b *Buffer) ReadBlock() safemem.Block {
+ return safemem.BlockFromSafeSlice(b.data[b.read:b.write])
+}
+
+// WriteFromBlocks implements safemem.Writer.WriteFromBlocks.
+//
+// This will advance the write index.
+func (v *View) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) {
+ need := int(srcs.NumBytes())
+ if need == 0 {
+ return 0, nil
+ }
+
+ var (
+ dst safemem.BlockSeq
+ blocks []safemem.Block
+ )
+
+ // Need at least one buffer.
+ firstBuf := v.data.Back()
+ if firstBuf == nil {
+ firstBuf = bufferPool.Get().(*Buffer)
+ v.data.PushBack(firstBuf)
+ }
+
+ // Does the last block have sufficient capacity alone?
+ if l := len(firstBuf.data) - firstBuf.write; l >= need {
+ dst = safemem.BlockSeqOf(firstBuf.WriteBlock())
+ } else {
+ // Append blocks until sufficient.
+ need -= l
+ blocks = append(blocks, firstBuf.WriteBlock())
+ for need > 0 {
+ emptyBuf := bufferPool.Get().(*Buffer)
+ v.data.PushBack(emptyBuf)
+ need -= len(emptyBuf.data) // Full block.
+ blocks = append(blocks, emptyBuf.WriteBlock())
+ }
+ dst = safemem.BlockSeqFromSlice(blocks)
+ }
+
+ // Perform the copy.
+ n, err := safemem.CopySeq(dst, srcs)
+ v.size += int64(n)
+
+ // Update all indices.
+ for left := int(n); left > 0; firstBuf = firstBuf.Next() {
+ if l := len(firstBuf.data) - firstBuf.write; left >= l {
+ firstBuf.write += l // Whole block.
+ left -= l
+ } else {
+ firstBuf.write += left // Partial block.
+ left = 0
+ }
+ }
+
+ return n, err
+}
+
+// ReadToBlocks implements safemem.Reader.ReadToBlocks.
+//
+// This will not advance the read index; the caller should follow
+// this call with a call to TrimFront in order to remove the read
+// data from the buffer. This is done to support pipe sematics.
+func (v *View) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) {
+ need := int(dsts.NumBytes())
+ if need == 0 {
+ return 0, nil
+ }
+
+ var (
+ src safemem.BlockSeq
+ blocks []safemem.Block
+ )
+
+ firstBuf := v.data.Front()
+ if firstBuf == nil {
+ return 0, io.EOF
+ }
+
+ // Is all the data in a single block?
+ if l := firstBuf.write - firstBuf.read; l >= need {
+ src = safemem.BlockSeqOf(firstBuf.ReadBlock())
+ } else {
+ // Build a list of all the buffers.
+ need -= l
+ blocks = append(blocks, firstBuf.ReadBlock())
+ for buf := firstBuf.Next(); buf != nil && need > 0; buf = buf.Next() {
+ need -= buf.write - buf.read
+ blocks = append(blocks, buf.ReadBlock())
+ }
+ src = safemem.BlockSeqFromSlice(blocks)
+ }
+
+ // Perform the copy.
+ n, err := safemem.CopySeq(dsts, src)
+
+ // See above: we would normally advance the read index here, but we
+ // don't do that in order to support pipe semantics. We rely on a
+ // separate call to TrimFront() in this case.
+
+ return n, err
+}
diff --git a/pkg/buffer/view.go b/pkg/buffer/view.go
new file mode 100755
index 000000000..00fc11e9c
--- /dev/null
+++ b/pkg/buffer/view.go
@@ -0,0 +1,382 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buffer
+
+import (
+ "fmt"
+ "io"
+)
+
+// View is a non-linear buffer.
+//
+// All methods are thread compatible.
+//
+// +stateify savable
+type View struct {
+ data bufferList
+ size int64
+}
+
+// TrimFront removes the first count bytes from the buffer.
+func (v *View) TrimFront(count int64) {
+ if count >= v.size {
+ v.advanceRead(v.size)
+ } else {
+ v.advanceRead(count)
+ }
+}
+
+// Read implements io.Reader.Read.
+//
+// Note that reading does not advance the read index. This must be done
+// manually using TrimFront or other methods.
+func (v *View) Read(p []byte) (int, error) {
+ return v.ReadAt(p, 0)
+}
+
+// ReadAt implements io.ReaderAt.ReadAt.
+func (v *View) ReadAt(p []byte, offset int64) (int, error) {
+ var (
+ skipped int64
+ done int64
+ )
+ for buf := v.data.Front(); buf != nil && done < int64(len(p)); buf = buf.Next() {
+ needToSkip := int(offset - skipped)
+ if l := buf.write - buf.read; l <= needToSkip {
+ skipped += int64(l)
+ continue
+ }
+
+ // Actually read data.
+ n := copy(p[done:], buf.data[buf.read+needToSkip:buf.write])
+ skipped += int64(needToSkip)
+ done += int64(n)
+ }
+ if int(done) < len(p) {
+ return int(done), io.EOF
+ }
+ return int(done), nil
+}
+
+// Write implements io.Writer.Write.
+func (v *View) Write(p []byte) (int, error) {
+ v.Append(p) // Does not fail.
+ return len(p), nil
+}
+
+// advanceRead advances the view's read index.
+//
+// Precondition: there must be sufficient bytes in the buffer.
+func (v *View) advanceRead(count int64) {
+ for buf := v.data.Front(); buf != nil && count > 0; {
+ l := int64(buf.write - buf.read)
+ if l > count {
+ // There is still data for reading.
+ buf.read += int(count)
+ v.size -= count
+ count = 0
+ break
+ }
+
+ // Read from this buffer.
+ buf.read += int(l)
+ count -= l
+ v.size -= l
+
+ // When all data has been read from a buffer, we push
+ // it into the empty buffer pool for reuse.
+ oldBuf := buf
+ buf = buf.Next() // Iterate.
+ v.data.Remove(oldBuf)
+ oldBuf.Reset()
+ bufferPool.Put(oldBuf)
+ }
+ if count > 0 {
+ panic(fmt.Sprintf("advanceRead still has %d bytes remaining", count))
+ }
+}
+
+// Truncate truncates the view to the given bytes.
+func (v *View) Truncate(length int64) {
+ if length < 0 || length >= v.size {
+ return // Nothing to do.
+ }
+ for buf := v.data.Back(); buf != nil && v.size > length; buf = v.data.Back() {
+ l := int64(buf.write - buf.read) // Local bytes.
+ switch {
+ case v.size-l >= length:
+ // Drop the buffer completely; see above.
+ v.data.Remove(buf)
+ v.size -= l
+ buf.Reset()
+ bufferPool.Put(buf)
+
+ case v.size > length && v.size-l < length:
+ // Just truncate the buffer locally.
+ delta := (length - (v.size - l))
+ buf.write = buf.read + int(delta)
+ v.size = length
+
+ default:
+ // Should never happen.
+ panic("invalid buffer during truncation")
+ }
+ }
+ v.size = length // Save the new size.
+}
+
+// Grow grows the given view to the number of bytes. If zero
+// is true, all these bytes will be zero. If zero is false,
+// then this is the caller's responsibility.
+//
+// Precondition: length must be >= 0.
+func (v *View) Grow(length int64, zero bool) {
+ if length < 0 {
+ panic("negative length provided")
+ }
+ for v.size < length {
+ buf := v.data.Back()
+
+ // Is there at least one buffer?
+ if buf == nil || buf.Full() {
+ buf = bufferPool.Get().(*Buffer)
+ v.data.PushBack(buf)
+ }
+
+ // Write up to length bytes.
+ l := len(buf.data) - buf.write
+ if int64(l) > length-v.size {
+ l = int(length - v.size)
+ }
+
+ // Zero the written section; note that this pattern is
+ // specifically recognized and optimized by the compiler.
+ if zero {
+ for i := buf.write; i < buf.write+l; i++ {
+ buf.data[i] = 0
+ }
+ }
+
+ // Advance the index.
+ buf.write += l
+ v.size += int64(l)
+ }
+}
+
+// Prepend prepends the given data.
+func (v *View) Prepend(data []byte) {
+ // Is there any space in the first buffer?
+ if buf := v.data.Front(); buf != nil && buf.read > 0 {
+ // Fill up before the first write.
+ avail := buf.read
+ copy(buf.data[0:], data[len(data)-avail:])
+ data = data[:len(data)-avail]
+ v.size += int64(avail)
+ }
+
+ for len(data) > 0 {
+ // Do we need an empty buffer?
+ buf := bufferPool.Get().(*Buffer)
+ v.data.PushFront(buf)
+
+ // The buffer is empty; copy last chunk.
+ start := len(data) - len(buf.data)
+ if start < 0 {
+ start = 0 // Everything.
+ }
+
+ // We have to put the data at the end of the current
+ // buffer in order to ensure that the next prepend will
+ // correctly fill up the beginning of this buffer.
+ bStart := len(buf.data) - len(data[start:])
+ n := copy(buf.data[bStart:], data[start:])
+ buf.read = bStart
+ buf.write = len(buf.data)
+ data = data[:start]
+ v.size += int64(n)
+ }
+}
+
+// Append appends the given data.
+func (v *View) Append(data []byte) {
+ for done := 0; done < len(data); {
+ buf := v.data.Back()
+
+ // Find the first empty buffer.
+ if buf == nil || buf.Full() {
+ buf = bufferPool.Get().(*Buffer)
+ v.data.PushBack(buf)
+ }
+
+ // Copy in to the given buffer.
+ n := copy(buf.data[buf.write:], data[done:])
+ done += n
+ buf.write += n
+ v.size += int64(n)
+ }
+}
+
+// Flatten returns a flattened copy of this data.
+//
+// This method should not be used in any performance-sensitive paths. It may
+// allocate a fresh byte slice sufficiently large to contain all the data in
+// the buffer.
+//
+// N.B. Tee data still belongs to this view, as if there is a single buffer
+// present, then it will be returned directly. This should be used for
+// temporary use only, and a reference to the given slice should not be held.
+func (v *View) Flatten() []byte {
+ if buf := v.data.Front(); buf.Next() == nil {
+ return buf.data[buf.read:buf.write] // Only one buffer.
+ }
+ data := make([]byte, 0, v.size) // Need to flatten.
+ for buf := v.data.Front(); buf != nil; buf = buf.Next() {
+ // Copy to the allocated slice.
+ data = append(data, buf.data[buf.read:buf.write]...)
+ }
+ return data
+}
+
+// Size indicates the total amount of data available in this view.
+func (v *View) Size() (sz int64) {
+ sz = v.size // Pre-calculated.
+ return sz
+}
+
+// Copy makes a strict copy of this view.
+func (v *View) Copy() (other View) {
+ for buf := v.data.Front(); buf != nil; buf = buf.Next() {
+ other.Append(buf.data[buf.read:buf.write])
+ }
+ return other
+}
+
+// Apply applies the given function across all valid data.
+func (v *View) Apply(fn func([]byte)) {
+ for buf := v.data.Front(); buf != nil; buf = buf.Next() {
+ if l := int64(buf.write - buf.read); l > 0 {
+ fn(buf.data[buf.read:buf.write])
+ }
+ }
+}
+
+// Merge merges the provided View with this one.
+//
+// The other view will be empty after this operation.
+func (v *View) Merge(other *View) {
+ // Copy over all buffers.
+ for buf := other.data.Front(); buf != nil && !buf.Empty(); buf = other.data.Front() {
+ other.data.Remove(buf)
+ v.data.PushBack(buf)
+ }
+
+ // Adjust sizes.
+ v.size += other.size
+ other.size = 0
+}
+
+// WriteFromReader writes to the buffer from an io.Reader.
+func (v *View) WriteFromReader(r io.Reader, count int64) (int64, error) {
+ var (
+ done int64
+ n int
+ err error
+ )
+ for done < count {
+ buf := v.data.Back()
+
+ // Find the first empty buffer.
+ if buf == nil || buf.Full() {
+ buf = bufferPool.Get().(*Buffer)
+ v.data.PushBack(buf)
+ }
+
+ // Is this less than the minimum batch?
+ if len(buf.data[buf.write:]) < minBatch && (count-done) >= int64(minBatch) {
+ tmp := make([]byte, minBatch)
+ n, err = r.Read(tmp)
+ v.Write(tmp[:n])
+ done += int64(n)
+ if err != nil {
+ break
+ }
+ continue
+ }
+
+ // Limit the read, if necessary.
+ end := len(buf.data)
+ if int64(end-buf.write) > (count - done) {
+ end = buf.write + int(count-done)
+ }
+
+ // Pass the relevant portion of the buffer.
+ n, err = r.Read(buf.data[buf.write:end])
+ buf.write += n
+ done += int64(n)
+ v.size += int64(n)
+ if err == io.EOF {
+ err = nil // Short write allowed.
+ break
+ } else if err != nil {
+ break
+ }
+ }
+ return done, err
+}
+
+// ReadToWriter reads from the buffer into an io.Writer.
+//
+// N.B. This does not consume the bytes read. TrimFront should
+// be called appropriately after this call in order to do so.
+func (v *View) ReadToWriter(w io.Writer, count int64) (int64, error) {
+ var (
+ done int64
+ n int
+ err error
+ )
+ offset := 0 // Spill-over for batching.
+ for buf := v.data.Front(); buf != nil && done < count; buf = buf.Next() {
+ l := buf.write - buf.read - offset
+
+ // Is this less than the minimum batch?
+ if l < minBatch && (count-done) >= int64(minBatch) && (v.size-done) >= int64(minBatch) {
+ tmp := make([]byte, minBatch)
+ n, err = v.ReadAt(tmp, done)
+ w.Write(tmp[:n])
+ done += int64(n)
+ offset = n - l // Reset below.
+ if err != nil {
+ break
+ }
+ continue
+ }
+
+ // Limit the write if necessary.
+ if int64(l) >= (count - done) {
+ l = int(count - done)
+ }
+
+ // Perform the actual write.
+ n, err = w.Write(buf.data[buf.read+offset : buf.read+offset+l])
+ done += int64(n)
+ if err != nil {
+ break
+ }
+
+ // Reset spill-over.
+ offset = 0
+ }
+ return done, err
+}
diff --git a/pkg/buffer/view_unsafe.go b/pkg/buffer/view_unsafe.go
new file mode 100755
index 000000000..d1ef39b26
--- /dev/null
+++ b/pkg/buffer/view_unsafe.go
@@ -0,0 +1,25 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buffer
+
+import (
+ "unsafe"
+)
+
+// minBatch is the smallest Read or Write operation that the
+// WriteFromReader and ReadToWriter functions will use.
+//
+// This is defined as the size of a native pointer.
+const minBatch = int(unsafe.Sizeof(uintptr(0)))
diff --git a/pkg/sentry/kernel/pipe/buffer.go b/pkg/sentry/kernel/pipe/buffer.go
deleted file mode 100644
index fe3be5dbd..000000000
--- a/pkg/sentry/kernel/pipe/buffer.go
+++ /dev/null
@@ -1,115 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pipe
-
-import (
- "io"
-
- "gvisor.dev/gvisor/pkg/safemem"
- "gvisor.dev/gvisor/pkg/sync"
-)
-
-// buffer encapsulates a queueable byte buffer.
-//
-// Note that the total size is slightly less than two pages. This
-// is done intentionally to ensure that the buffer object aligns
-// with runtime internals. We have no hard size or alignment
-// requirements. This two page size will effectively minimize
-// internal fragmentation, but still have a large enough chunk
-// to limit excessive segmentation.
-//
-// +stateify savable
-type buffer struct {
- data [8144]byte
- read int
- write int
- bufferEntry
-}
-
-// Reset resets internal data.
-//
-// This must be called before use.
-func (b *buffer) Reset() {
- b.read = 0
- b.write = 0
-}
-
-// Empty indicates the buffer is empty.
-//
-// This indicates there is no data left to read.
-func (b *buffer) Empty() bool {
- return b.read == b.write
-}
-
-// Full indicates the buffer is full.
-//
-// This indicates there is no capacity left to write.
-func (b *buffer) Full() bool {
- return b.write == len(b.data)
-}
-
-// WriteFromBlocks implements safemem.Writer.WriteFromBlocks.
-func (b *buffer) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) {
- dst := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.write:]))
- n, err := safemem.CopySeq(dst, srcs)
- b.write += int(n)
- return n, err
-}
-
-// WriteFromReader writes to the buffer from an io.Reader.
-func (b *buffer) WriteFromReader(r io.Reader, count int64) (int64, error) {
- dst := b.data[b.write:]
- if count < int64(len(dst)) {
- dst = b.data[b.write:][:count]
- }
- n, err := r.Read(dst)
- b.write += n
- return int64(n), err
-}
-
-// ReadToBlocks implements safemem.Reader.ReadToBlocks.
-func (b *buffer) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) {
- src := safemem.BlockSeqOf(safemem.BlockFromSafeSlice(b.data[b.read:b.write]))
- n, err := safemem.CopySeq(dsts, src)
- b.read += int(n)
- return n, err
-}
-
-// ReadToWriter reads from the buffer into an io.Writer.
-func (b *buffer) ReadToWriter(w io.Writer, count int64, dup bool) (int64, error) {
- src := b.data[b.read:b.write]
- if count < int64(len(src)) {
- src = b.data[b.read:][:count]
- }
- n, err := w.Write(src)
- if !dup {
- b.read += n
- }
- return int64(n), err
-}
-
-// bufferPool is a pool for buffers.
-var bufferPool = sync.Pool{
- New: func() interface{} {
- return new(buffer)
- },
-}
-
-// newBuffer grabs a new buffer from the pool.
-func newBuffer() *buffer {
- b := bufferPool.Get().(*buffer)
- b.Reset()
- return b
-}
diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go
index 08410283f..725e9db7d 100644
--- a/pkg/sentry/kernel/pipe/pipe.go
+++ b/pkg/sentry/kernel/pipe/pipe.go
@@ -20,6 +20,7 @@ import (
"sync/atomic"
"syscall"
+ "gvisor.dev/gvisor/pkg/buffer"
"gvisor.dev/gvisor/pkg/context"
"gvisor.dev/gvisor/pkg/sentry/fs"
"gvisor.dev/gvisor/pkg/sync"
@@ -70,10 +71,10 @@ type Pipe struct {
// mu protects all pipe internal state below.
mu sync.Mutex `state:"nosave"`
- // data is the buffer queue of pipe contents.
+ // view is the underlying set of buffers.
//
// This is protected by mu.
- data bufferList
+ view buffer.View
// max is the maximum size of the pipe in bytes. When this max has been
// reached, writers will get EWOULDBLOCK.
@@ -81,11 +82,6 @@ type Pipe struct {
// This is protected by mu.
max int64
- // size is the current size of the pipe in bytes.
- //
- // This is protected by mu.
- size int64
-
// hadWriter indicates if this pipe ever had a writer. Note that this
// does not necessarily indicate there is *currently* a writer, just
// that there has been a writer at some point since the pipe was
@@ -196,7 +192,7 @@ type readOps struct {
limit func(int64)
// read performs the actual read operation.
- read func(*buffer) (int64, error)
+ read func(*buffer.View) (int64, error)
}
// read reads data from the pipe into dst and returns the number of bytes
@@ -213,7 +209,7 @@ func (p *Pipe) read(ctx context.Context, ops readOps) (int64, error) {
defer p.mu.Unlock()
// Is the pipe empty?
- if p.size == 0 {
+ if p.view.Size() == 0 {
if !p.HasWriters() {
// There are no writers, return EOF.
return 0, nil
@@ -222,71 +218,13 @@ func (p *Pipe) read(ctx context.Context, ops readOps) (int64, error) {
}
// Limit how much we consume.
- if ops.left() > p.size {
- ops.limit(p.size)
+ if ops.left() > p.view.Size() {
+ ops.limit(p.view.Size())
}
- done := int64(0)
- for ops.left() > 0 {
- // Pop the first buffer.
- first := p.data.Front()
- if first == nil {
- break
- }
-
- // Copy user data.
- n, err := ops.read(first)
- done += int64(n)
- p.size -= n
-
- // Empty buffer?
- if first.Empty() {
- // Push to the free list.
- p.data.Remove(first)
- bufferPool.Put(first)
- }
-
- // Handle errors.
- if err != nil {
- return done, err
- }
- }
-
- return done, nil
-}
-
-// dup duplicates all data from this pipe into the given writer.
-//
-// There is no blocking behavior implemented here. The writer may propagate
-// some blocking error. All the writes must be complete writes.
-func (p *Pipe) dup(ctx context.Context, ops readOps) (int64, error) {
- p.mu.Lock()
- defer p.mu.Unlock()
-
- // Is the pipe empty?
- if p.size == 0 {
- if !p.HasWriters() {
- // See above.
- return 0, nil
- }
- return 0, syserror.ErrWouldBlock
- }
-
- // Limit how much we consume.
- if ops.left() > p.size {
- ops.limit(p.size)
- }
-
- done := int64(0)
- for buf := p.data.Front(); buf != nil; buf = buf.Next() {
- n, err := ops.read(buf)
- done += n
- if err != nil {
- return done, err
- }
- }
-
- return done, nil
+ // Copy user data; the read op is responsible for trimming.
+ done, err := ops.read(&p.view)
+ return done, err
}
type writeOps struct {
@@ -297,7 +235,7 @@ type writeOps struct {
limit func(int64)
// write should write to the provided buffer.
- write func(*buffer) (int64, error)
+ write func(*buffer.View) (int64, error)
}
// write writes data from sv into the pipe and returns the number of bytes
@@ -317,33 +255,19 @@ func (p *Pipe) write(ctx context.Context, ops writeOps) (int64, error) {
// POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be
// atomic, but requires no atomicity for writes larger than this.
wanted := ops.left()
- if avail := p.max - p.size; wanted > avail {
+ if avail := p.max - p.view.Size(); wanted > avail {
if wanted <= p.atomicIOBytes {
return 0, syserror.ErrWouldBlock
}
ops.limit(avail)
}
- done := int64(0)
- for ops.left() > 0 {
- // Need a new buffer?
- last := p.data.Back()
- if last == nil || last.Full() {
- // Add a new buffer to the data list.
- last = newBuffer()
- p.data.PushBack(last)
- }
-
- // Copy user data.
- n, err := ops.write(last)
- done += int64(n)
- p.size += n
-
- // Handle errors.
- if err != nil {
- return done, err
- }
+ // Copy user data.
+ done, err := ops.write(&p.view)
+ if err != nil {
+ return done, err
}
+
if wanted > done {
// Partial write due to full pipe.
return done, syserror.ErrWouldBlock
@@ -396,7 +320,7 @@ func (p *Pipe) HasWriters() bool {
// Precondition: mu must be held.
func (p *Pipe) rReadinessLocked() waiter.EventMask {
ready := waiter.EventMask(0)
- if p.HasReaders() && p.data.Front() != nil {
+ if p.HasReaders() && p.view.Size() != 0 {
ready |= waiter.EventIn
}
if !p.HasWriters() && p.hadWriter {
@@ -422,7 +346,7 @@ func (p *Pipe) rReadiness() waiter.EventMask {
// Precondition: mu must be held.
func (p *Pipe) wReadinessLocked() waiter.EventMask {
ready := waiter.EventMask(0)
- if p.HasWriters() && p.size < p.max {
+ if p.HasWriters() && p.view.Size() < p.max {
ready |= waiter.EventOut
}
if !p.HasReaders() {
@@ -451,7 +375,7 @@ func (p *Pipe) rwReadiness() waiter.EventMask {
func (p *Pipe) queued() int64 {
p.mu.Lock()
defer p.mu.Unlock()
- return p.size
+ return p.view.Size()
}
// FifoSize implements fs.FifoSizer.FifoSize.
@@ -474,7 +398,7 @@ func (p *Pipe) SetFifoSize(size int64) (int64, error) {
}
p.mu.Lock()
defer p.mu.Unlock()
- if size < p.size {
+ if size < p.view.Size() {
return 0, syserror.EBUSY
}
p.max = size
diff --git a/pkg/sentry/kernel/pipe/pipe_state_autogen.go b/pkg/sentry/kernel/pipe/pipe_state_autogen.go
index f7f9f32dc..b49ab46f9 100755
--- a/pkg/sentry/kernel/pipe/pipe_state_autogen.go
+++ b/pkg/sentry/kernel/pipe/pipe_state_autogen.go
@@ -6,49 +6,6 @@ import (
"gvisor.dev/gvisor/pkg/state"
)
-func (x *buffer) beforeSave() {}
-func (x *buffer) save(m state.Map) {
- x.beforeSave()
- m.Save("data", &x.data)
- m.Save("read", &x.read)
- m.Save("write", &x.write)
- m.Save("bufferEntry", &x.bufferEntry)
-}
-
-func (x *buffer) afterLoad() {}
-func (x *buffer) load(m state.Map) {
- m.Load("data", &x.data)
- m.Load("read", &x.read)
- m.Load("write", &x.write)
- m.Load("bufferEntry", &x.bufferEntry)
-}
-
-func (x *bufferList) beforeSave() {}
-func (x *bufferList) save(m state.Map) {
- x.beforeSave()
- m.Save("head", &x.head)
- m.Save("tail", &x.tail)
-}
-
-func (x *bufferList) afterLoad() {}
-func (x *bufferList) load(m state.Map) {
- m.Load("head", &x.head)
- m.Load("tail", &x.tail)
-}
-
-func (x *bufferEntry) beforeSave() {}
-func (x *bufferEntry) save(m state.Map) {
- x.beforeSave()
- m.Save("next", &x.next)
- m.Save("prev", &x.prev)
-}
-
-func (x *bufferEntry) afterLoad() {}
-func (x *bufferEntry) load(m state.Map) {
- m.Load("next", &x.next)
- m.Load("prev", &x.prev)
-}
-
func (x *inodeOperations) beforeSave() {}
func (x *inodeOperations) save(m state.Map) {
x.beforeSave()
@@ -69,9 +26,8 @@ func (x *Pipe) save(m state.Map) {
m.Save("atomicIOBytes", &x.atomicIOBytes)
m.Save("readers", &x.readers)
m.Save("writers", &x.writers)
- m.Save("data", &x.data)
+ m.Save("view", &x.view)
m.Save("max", &x.max)
- m.Save("size", &x.size)
m.Save("hadWriter", &x.hadWriter)
}
@@ -81,9 +37,8 @@ func (x *Pipe) load(m state.Map) {
m.Load("atomicIOBytes", &x.atomicIOBytes)
m.Load("readers", &x.readers)
m.Load("writers", &x.writers)
- m.Load("data", &x.data)
+ m.Load("view", &x.view)
m.Load("max", &x.max)
- m.Load("size", &x.size)
m.Load("hadWriter", &x.hadWriter)
}
@@ -121,9 +76,6 @@ func (x *Writer) load(m state.Map) {
}
func init() {
- state.Register("pkg/sentry/kernel/pipe.buffer", (*buffer)(nil), state.Fns{Save: (*buffer).save, Load: (*buffer).load})
- state.Register("pkg/sentry/kernel/pipe.bufferList", (*bufferList)(nil), state.Fns{Save: (*bufferList).save, Load: (*bufferList).load})
- state.Register("pkg/sentry/kernel/pipe.bufferEntry", (*bufferEntry)(nil), state.Fns{Save: (*bufferEntry).save, Load: (*bufferEntry).load})
state.Register("pkg/sentry/kernel/pipe.inodeOperations", (*inodeOperations)(nil), state.Fns{Save: (*inodeOperations).save, Load: (*inodeOperations).load})
state.Register("pkg/sentry/kernel/pipe.Pipe", (*Pipe)(nil), state.Fns{Save: (*Pipe).save, Load: (*Pipe).load})
state.Register("pkg/sentry/kernel/pipe.Reader", (*Reader)(nil), state.Fns{Save: (*Reader).save, Load: (*Reader).load})
diff --git a/pkg/sentry/kernel/pipe/pipe_util.go b/pkg/sentry/kernel/pipe/pipe_util.go
index 80158239e..5a1d4fd57 100755
--- a/pkg/sentry/kernel/pipe/pipe_util.go
+++ b/pkg/sentry/kernel/pipe/pipe_util.go
@@ -21,6 +21,7 @@ import (
"gvisor.dev/gvisor/pkg/abi/linux"
"gvisor.dev/gvisor/pkg/amutex"
+ "gvisor.dev/gvisor/pkg/buffer"
"gvisor.dev/gvisor/pkg/context"
"gvisor.dev/gvisor/pkg/sentry/arch"
"gvisor.dev/gvisor/pkg/sync"
@@ -49,9 +50,10 @@ func (p *Pipe) Read(ctx context.Context, dst usermem.IOSequence) (int64, error)
limit: func(l int64) {
dst = dst.TakeFirst64(l)
},
- read: func(buf *buffer) (int64, error) {
- n, err := dst.CopyOutFrom(ctx, buf)
+ read: func(view *buffer.View) (int64, error) {
+ n, err := dst.CopyOutFrom(ctx, view)
dst = dst.DropFirst64(n)
+ view.TrimFront(n)
return n, err
},
})
@@ -70,16 +72,15 @@ func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool)
limit: func(l int64) {
count = l
},
- read: func(buf *buffer) (int64, error) {
- n, err := buf.ReadToWriter(w, count, dup)
+ read: func(view *buffer.View) (int64, error) {
+ n, err := view.ReadToWriter(w, count)
+ if !dup {
+ view.TrimFront(n)
+ }
count -= n
return n, err
},
}
- if dup {
- // There is no notification for dup operations.
- return p.dup(ctx, ops)
- }
n, err := p.read(ctx, ops)
if n > 0 {
p.Notify(waiter.EventOut)
@@ -96,8 +97,8 @@ func (p *Pipe) Write(ctx context.Context, src usermem.IOSequence) (int64, error)
limit: func(l int64) {
src = src.TakeFirst64(l)
},
- write: func(buf *buffer) (int64, error) {
- n, err := src.CopyInTo(ctx, buf)
+ write: func(view *buffer.View) (int64, error) {
+ n, err := src.CopyInTo(ctx, view)
src = src.DropFirst64(n)
return n, err
},
@@ -117,8 +118,8 @@ func (p *Pipe) ReadFrom(ctx context.Context, r io.Reader, count int64) (int64, e
limit: func(l int64) {
count = l
},
- write: func(buf *buffer) (int64, error) {
- n, err := buf.WriteFromReader(r, count)
+ write: func(view *buffer.View) (int64, error) {
+ n, err := view.WriteFromReader(r, count)
count -= n
return n, err
},