diff options
Diffstat (limited to 'pkg/sentry/kernel/pipe')
-rw-r--r-- | pkg/sentry/kernel/pipe/pipe.go | 198 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/pipe_state_autogen.go | 23 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/pipe_util.go | 99 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/save_restore.go | 26 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/vfs.go | 202 |
5 files changed, 258 insertions, 290 deletions
diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go index b989e14c7..c551acd99 100644 --- a/pkg/sentry/kernel/pipe/pipe.go +++ b/pkg/sentry/kernel/pipe/pipe.go @@ -21,8 +21,8 @@ import ( "sync/atomic" "syscall" - "gvisor.dev/gvisor/pkg/buffer" "gvisor.dev/gvisor/pkg/context" + "gvisor.dev/gvisor/pkg/safemem" "gvisor.dev/gvisor/pkg/sentry/fs" "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/syserror" @@ -75,10 +75,18 @@ type Pipe struct { // mu protects all pipe internal state below. mu sync.Mutex `state:"nosave"` - // view is the underlying set of buffers. + // buf holds the pipe's data. buf is a circular buffer; the first valid + // byte in buf is at offset off, and the pipe contains size valid bytes. + // bufBlocks contains two identical safemem.Blocks representing buf; this + // avoids needing to heap-allocate a new safemem.Block slice when buf is + // resized. bufBlockSeq is a safemem.BlockSeq representing bufBlocks. // - // This is protected by mu. - view buffer.View + // These fields are protected by mu. + buf []byte + bufBlocks [2]safemem.Block `state:"nosave"` + bufBlockSeq safemem.BlockSeq `state:"nosave"` + off int64 + size int64 // max is the maximum size of the pipe in bytes. When this max has been // reached, writers will get EWOULDBLOCK. @@ -99,12 +107,6 @@ type Pipe struct { // // N.B. The size will be bounded. func NewPipe(isNamed bool, sizeBytes int64) *Pipe { - if sizeBytes < MinimumPipeSize { - sizeBytes = MinimumPipeSize - } - if sizeBytes > MaximumPipeSize { - sizeBytes = MaximumPipeSize - } var p Pipe initPipe(&p, isNamed, sizeBytes) return &p @@ -175,75 +177,71 @@ func (p *Pipe) Open(ctx context.Context, d *fs.Dirent, flags fs.FileFlags) *fs.F } } -type readOps struct { - // left returns the bytes remaining. - left func() int64 - - // limit limits subsequence reads. - limit func(int64) - - // read performs the actual read operation. - read func(*buffer.View) (int64, error) -} - -// read reads data from the pipe into dst and returns the number of bytes -// read, or returns ErrWouldBlock if the pipe is empty. +// peekLocked passes the first count bytes in the pipe to f and returns its +// result. If fewer than count bytes are available, the safemem.BlockSeq passed +// to f will be less than count bytes in length. // -// Precondition: this pipe must have readers. -func (p *Pipe) read(ctx context.Context, ops readOps) (int64, error) { - p.mu.Lock() - defer p.mu.Unlock() - return p.readLocked(ctx, ops) -} - -func (p *Pipe) readLocked(ctx context.Context, ops readOps) (int64, error) { +// peekLocked does not mutate the pipe; if the read consumes bytes from the +// pipe, then the caller is responsible for calling p.consumeLocked() and +// p.Notify(waiter.EventOut). (The latter must be called with p.mu unlocked.) +// +// Preconditions: +// * p.mu must be locked. +// * This pipe must have readers. +func (p *Pipe) peekLocked(count int64, f func(safemem.BlockSeq) (uint64, error)) (int64, error) { // Don't block for a zero-length read even if the pipe is empty. - if ops.left() == 0 { + if count == 0 { return 0, nil } - // Is the pipe empty? - if p.view.Size() == 0 { - if !p.HasWriters() { - // There are no writers, return EOF. - return 0, io.EOF + // Limit the amount of data read to the amount of data in the pipe. + if count > p.size { + if p.size == 0 { + if !p.HasWriters() { + return 0, io.EOF + } + return 0, syserror.ErrWouldBlock } - return 0, syserror.ErrWouldBlock + count = p.size } - // Limit how much we consume. - if ops.left() > p.view.Size() { - ops.limit(p.view.Size()) - } + // Prepare the view of the data to be read. + bs := p.bufBlockSeq.DropFirst64(uint64(p.off)).TakeFirst64(uint64(count)) - // Copy user data; the read op is responsible for trimming. - done, err := ops.read(&p.view) - return done, err + // Perform the read. + done, err := f(bs) + return int64(done), err } -type writeOps struct { - // left returns the bytes remaining. - left func() int64 - - // limit should limit subsequent writes. - limit func(int64) - - // write should write to the provided buffer. - write func(*buffer.View) (int64, error) -} - -// write writes data from sv into the pipe and returns the number of bytes -// written. If no bytes are written because the pipe is full (or has less than -// atomicIOBytes free capacity), write returns ErrWouldBlock. +// consumeLocked consumes the first n bytes in the pipe, such that they will no +// longer be visible to future reads. // -// Precondition: this pipe must have writers. -func (p *Pipe) write(ctx context.Context, ops writeOps) (int64, error) { - p.mu.Lock() - defer p.mu.Unlock() - return p.writeLocked(ctx, ops) +// Preconditions: +// * p.mu must be locked. +// * The pipe must contain at least n bytes. +func (p *Pipe) consumeLocked(n int64) { + p.off += n + if max := int64(len(p.buf)); p.off >= max { + p.off -= max + } + p.size -= n } -func (p *Pipe) writeLocked(ctx context.Context, ops writeOps) (int64, error) { +// writeLocked passes a safemem.BlockSeq representing the first count bytes of +// unused space in the pipe to f and returns the result. If fewer than count +// bytes are free, the safemem.BlockSeq passed to f will be less than count +// bytes in length. If the pipe is full or otherwise cannot accomodate a write +// of any number of bytes up to count, writeLocked returns ErrWouldBlock +// without calling f. +// +// Unlike peekLocked, writeLocked assumes that f returns the number of bytes +// written to the pipe, and increases the number of bytes stored in the pipe +// accordingly. Callers are still responsible for calling +// p.Notify(waiter.EventIn) with p.mu unlocked. +// +// Preconditions: +// * p.mu must be locked. +func (p *Pipe) writeLocked(count int64, f func(safemem.BlockSeq) (uint64, error)) (int64, error) { // Can't write to a pipe with no readers. if !p.HasReaders() { return 0, syscall.EPIPE @@ -251,29 +249,59 @@ func (p *Pipe) writeLocked(ctx context.Context, ops writeOps) (int64, error) { // POSIX requires that a write smaller than atomicIOBytes (PIPE_BUF) be // atomic, but requires no atomicity for writes larger than this. - wanted := ops.left() - avail := p.max - p.view.Size() - if wanted > avail { - if wanted <= atomicIOBytes { + avail := p.max - p.size + short := false + if count > avail { + if count <= atomicIOBytes { return 0, syserror.ErrWouldBlock } - ops.limit(avail) + count = avail + short = true } - // Copy user data. - done, err := ops.write(&p.view) - if err != nil { - return done, err + // Ensure that the buffer is big enough. + if newLen, oldCap := p.size+count, int64(len(p.buf)); newLen > oldCap { + // Allocate a new buffer. + newCap := oldCap * 2 + if oldCap == 0 { + newCap = 8 // arbitrary; sending individual integers across pipes is relatively common + } + for newLen > newCap { + newCap *= 2 + } + if newCap > p.max { + newCap = p.max + } + newBuf := make([]byte, newCap) + // Copy the old buffer's contents to the beginning of the new one. + safemem.CopySeq( + safemem.BlockSeqOf(safemem.BlockFromSafeSlice(newBuf)), + p.bufBlockSeq.DropFirst64(uint64(p.off)).TakeFirst64(uint64(p.size))) + // Switch to the new buffer. + p.buf = newBuf + p.bufBlocks[0] = safemem.BlockFromSafeSlice(newBuf) + p.bufBlocks[1] = p.bufBlocks[0] + p.bufBlockSeq = safemem.BlockSeqFromSlice(p.bufBlocks[:]) + p.off = 0 } - if done < avail { - // Non-failure, but short write. - return done, nil + // Prepare the view of the space to be written. + woff := p.off + p.size + if woff >= int64(len(p.buf)) { + woff -= int64(len(p.buf)) } - if done < wanted { - // Partial write due to full pipe. Note that this could also be - // the short write case above, we would expect a second call - // and the write to return zero bytes in this case. + bs := p.bufBlockSeq.DropFirst64(uint64(woff)).TakeFirst64(uint64(count)) + + // Perform the write. + doneU64, err := f(bs) + done := int64(doneU64) + p.size += done + if done < count || err != nil { + return done, err + } + + // If we shortened the write, adjust the returned error appropriately. + if short { return done, syserror.ErrWouldBlock } @@ -324,7 +352,7 @@ func (p *Pipe) HasWriters() bool { // Precondition: mu must be held. func (p *Pipe) rReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) - if p.HasReaders() && p.view.Size() != 0 { + if p.HasReaders() && p.size != 0 { ready |= waiter.EventIn } if !p.HasWriters() && p.hadWriter { @@ -350,7 +378,7 @@ func (p *Pipe) rReadiness() waiter.EventMask { // Precondition: mu must be held. func (p *Pipe) wReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) - if p.HasWriters() && p.view.Size() < p.max { + if p.HasWriters() && p.size < p.max { ready |= waiter.EventOut } if !p.HasReaders() { @@ -383,7 +411,7 @@ func (p *Pipe) queued() int64 { } func (p *Pipe) queuedLocked() int64 { - return p.view.Size() + return p.size } // FifoSize implements fs.FifoSizer.FifoSize. @@ -406,7 +434,7 @@ func (p *Pipe) SetFifoSize(size int64) (int64, error) { } p.mu.Lock() defer p.mu.Unlock() - if size < p.view.Size() { + if size < p.size { return 0, syserror.EBUSY } p.max = size diff --git a/pkg/sentry/kernel/pipe/pipe_state_autogen.go b/pkg/sentry/kernel/pipe/pipe_state_autogen.go index 3413c8bbb..9cee1f13a 100644 --- a/pkg/sentry/kernel/pipe/pipe_state_autogen.go +++ b/pkg/sentry/kernel/pipe/pipe_state_autogen.go @@ -41,7 +41,9 @@ func (p *Pipe) StateFields() []string { "isNamed", "readers", "writers", - "view", + "buf", + "off", + "size", "max", "hadWriter", } @@ -54,20 +56,23 @@ func (p *Pipe) StateSave(stateSinkObject state.Sink) { stateSinkObject.Save(0, &p.isNamed) stateSinkObject.Save(1, &p.readers) stateSinkObject.Save(2, &p.writers) - stateSinkObject.Save(3, &p.view) - stateSinkObject.Save(4, &p.max) - stateSinkObject.Save(5, &p.hadWriter) + stateSinkObject.Save(3, &p.buf) + stateSinkObject.Save(4, &p.off) + stateSinkObject.Save(5, &p.size) + stateSinkObject.Save(6, &p.max) + stateSinkObject.Save(7, &p.hadWriter) } -func (p *Pipe) afterLoad() {} - func (p *Pipe) StateLoad(stateSourceObject state.Source) { stateSourceObject.Load(0, &p.isNamed) stateSourceObject.Load(1, &p.readers) stateSourceObject.Load(2, &p.writers) - stateSourceObject.Load(3, &p.view) - stateSourceObject.Load(4, &p.max) - stateSourceObject.Load(5, &p.hadWriter) + stateSourceObject.Load(3, &p.buf) + stateSourceObject.Load(4, &p.off) + stateSourceObject.Load(5, &p.size) + stateSourceObject.Load(6, &p.max) + stateSourceObject.Load(7, &p.hadWriter) + stateSourceObject.AfterLoad(p.afterLoad) } func (r *Reader) StateTypeName() string { diff --git a/pkg/sentry/kernel/pipe/pipe_util.go b/pkg/sentry/kernel/pipe/pipe_util.go index f665920cb..77246edbe 100644 --- a/pkg/sentry/kernel/pipe/pipe_util.go +++ b/pkg/sentry/kernel/pipe/pipe_util.go @@ -21,9 +21,9 @@ import ( "gvisor.dev/gvisor/pkg/abi/linux" "gvisor.dev/gvisor/pkg/amutex" - "gvisor.dev/gvisor/pkg/buffer" "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/marshal/primitive" + "gvisor.dev/gvisor/pkg/safemem" "gvisor.dev/gvisor/pkg/sentry/arch" "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/usermem" @@ -44,46 +44,37 @@ func (p *Pipe) Release(context.Context) { // Read reads from the Pipe into dst. func (p *Pipe) Read(ctx context.Context, dst usermem.IOSequence) (int64, error) { - n, err := p.read(ctx, readOps{ - left: func() int64 { - return dst.NumBytes() - }, - limit: func(l int64) { - dst = dst.TakeFirst64(l) - }, - read: func(view *buffer.View) (int64, error) { - n, err := dst.CopyOutFrom(ctx, view) - dst = dst.DropFirst64(n) - view.TrimFront(n) - return n, err - }, - }) + n, err := dst.CopyOutFrom(ctx, p) if n > 0 { p.Notify(waiter.EventOut) } return n, err } +// ReadToBlocks implements safemem.Reader.ReadToBlocks for Pipe.Read. +func (p *Pipe) ReadToBlocks(dsts safemem.BlockSeq) (uint64, error) { + n, err := p.read(int64(dsts.NumBytes()), func(srcs safemem.BlockSeq) (uint64, error) { + return safemem.CopySeq(dsts, srcs) + }, true /* removeFromSrc */) + return uint64(n), err +} + +func (p *Pipe) read(count int64, f func(srcs safemem.BlockSeq) (uint64, error), removeFromSrc bool) (int64, error) { + p.mu.Lock() + defer p.mu.Unlock() + n, err := p.peekLocked(count, f) + if n > 0 && removeFromSrc { + p.consumeLocked(n) + } + return n, err +} + // WriteTo writes to w from the Pipe. func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool) (int64, error) { - ops := readOps{ - left: func() int64 { - return count - }, - limit: func(l int64) { - count = l - }, - read: func(view *buffer.View) (int64, error) { - n, err := view.ReadToWriter(w, count) - if !dup { - view.TrimFront(n) - } - count -= n - return n, err - }, - } - n, err := p.read(ctx, ops) - if n > 0 { + n, err := p.read(count, func(srcs safemem.BlockSeq) (uint64, error) { + return safemem.FromIOWriter{w}.WriteFromBlocks(srcs) + }, !dup /* removeFromSrc */) + if n > 0 && !dup { p.Notify(waiter.EventOut) } return n, err @@ -91,39 +82,31 @@ func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool) // Write writes to the Pipe from src. func (p *Pipe) Write(ctx context.Context, src usermem.IOSequence) (int64, error) { - n, err := p.write(ctx, writeOps{ - left: func() int64 { - return src.NumBytes() - }, - limit: func(l int64) { - src = src.TakeFirst64(l) - }, - write: func(view *buffer.View) (int64, error) { - n, err := src.CopyInTo(ctx, view) - src = src.DropFirst64(n) - return n, err - }, - }) + n, err := src.CopyInTo(ctx, p) if n > 0 { p.Notify(waiter.EventIn) } return n, err } +// WriteFromBlocks implements safemem.Writer.WriteFromBlocks for Pipe.Write. +func (p *Pipe) WriteFromBlocks(srcs safemem.BlockSeq) (uint64, error) { + n, err := p.write(int64(srcs.NumBytes()), func(dsts safemem.BlockSeq) (uint64, error) { + return safemem.CopySeq(dsts, srcs) + }) + return uint64(n), err +} + +func (p *Pipe) write(count int64, f func(safemem.BlockSeq) (uint64, error)) (int64, error) { + p.mu.Lock() + defer p.mu.Unlock() + return p.writeLocked(count, f) +} + // ReadFrom reads from r to the Pipe. func (p *Pipe) ReadFrom(ctx context.Context, r io.Reader, count int64) (int64, error) { - n, err := p.write(ctx, writeOps{ - left: func() int64 { - return count - }, - limit: func(l int64) { - count = l - }, - write: func(view *buffer.View) (int64, error) { - n, err := view.WriteFromReader(r, count) - count -= n - return n, err - }, + n, err := p.write(count, func(dsts safemem.BlockSeq) (uint64, error) { + return safemem.FromIOReader{r}.ReadToBlocks(dsts) }) if n > 0 { p.Notify(waiter.EventIn) diff --git a/pkg/sentry/kernel/pipe/save_restore.go b/pkg/sentry/kernel/pipe/save_restore.go new file mode 100644 index 000000000..f135827de --- /dev/null +++ b/pkg/sentry/kernel/pipe/save_restore.go @@ -0,0 +1,26 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipe + +import ( + "gvisor.dev/gvisor/pkg/safemem" +) + +// afterLoad is called by stateify. +func (p *Pipe) afterLoad() { + p.bufBlocks[0] = safemem.BlockFromSafeSlice(p.buf) + p.bufBlocks[1] = p.bufBlocks[0] + p.bufBlockSeq = safemem.BlockSeqFromSlice(p.bufBlocks[:]) +} diff --git a/pkg/sentry/kernel/pipe/vfs.go b/pkg/sentry/kernel/pipe/vfs.go index 2d47d2e82..d5a91730d 100644 --- a/pkg/sentry/kernel/pipe/vfs.go +++ b/pkg/sentry/kernel/pipe/vfs.go @@ -16,7 +16,6 @@ package pipe import ( "gvisor.dev/gvisor/pkg/abi/linux" - "gvisor.dev/gvisor/pkg/buffer" "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/safemem" "gvisor.dev/gvisor/pkg/sentry/arch" @@ -269,12 +268,10 @@ func (fd *VFSPipeFD) SetPipeSize(size int64) (int64, error) { // SpliceToNonPipe performs a splice operation from fd to a non-pipe file. func (fd *VFSPipeFD) SpliceToNonPipe(ctx context.Context, out *vfs.FileDescription, off, count int64) (int64, error) { fd.pipe.mu.Lock() - defer fd.pipe.mu.Unlock() // Cap the sequence at number of bytes actually available. - v := fd.pipe.queuedLocked() - if v < count { - count = v + if count > fd.pipe.size { + count = fd.pipe.size } src := usermem.IOSequence{ IO: fd, @@ -291,154 +288,97 @@ func (fd *VFSPipeFD) SpliceToNonPipe(ctx context.Context, out *vfs.FileDescripti n, err = out.PWrite(ctx, src, off, vfs.WriteOptions{}) } if n > 0 { - fd.pipe.view.TrimFront(n) + fd.pipe.consumeLocked(n) + } + + fd.pipe.mu.Unlock() + + if n > 0 { + fd.pipe.Notify(waiter.EventOut) } return n, err } // SpliceFromNonPipe performs a splice operation from a non-pipe file to fd. func (fd *VFSPipeFD) SpliceFromNonPipe(ctx context.Context, in *vfs.FileDescription, off, count int64) (int64, error) { - fd.pipe.mu.Lock() - defer fd.pipe.mu.Unlock() - dst := usermem.IOSequence{ IO: fd, Addrs: usermem.AddrRangeSeqOf(usermem.AddrRange{0, usermem.Addr(count)}), } + var ( + n int64 + err error + ) + fd.pipe.mu.Lock() if off == -1 { - return in.Read(ctx, dst, vfs.ReadOptions{}) + n, err = in.Read(ctx, dst, vfs.ReadOptions{}) + } else { + n, err = in.PRead(ctx, dst, off, vfs.ReadOptions{}) + } + fd.pipe.mu.Unlock() + + if n > 0 { + fd.pipe.Notify(waiter.EventIn) } - return in.PRead(ctx, dst, off, vfs.ReadOptions{}) + return n, err } // CopyIn implements usermem.IO.CopyIn. Note that it is the caller's -// responsibility to trim fd.pipe.view after the read is completed. +// responsibility to call fd.pipe.consumeLocked() and +// fd.pipe.Notify(waiter.EventOut) after the read is completed. +// +// Preconditions: fd.pipe.mu must be locked. func (fd *VFSPipeFD) CopyIn(ctx context.Context, addr usermem.Addr, dst []byte, opts usermem.IOOpts) (int, error) { - origCount := int64(len(dst)) - n, err := fd.pipe.readLocked(ctx, readOps{ - left: func() int64 { - return int64(len(dst)) - }, - limit: func(l int64) { - dst = dst[:l] - }, - read: func(view *buffer.View) (int64, error) { - n, err := view.ReadAt(dst, 0) - return int64(n), err - }, + n, err := fd.pipe.peekLocked(int64(len(dst)), func(srcs safemem.BlockSeq) (uint64, error) { + return safemem.CopySeq(safemem.BlockSeqOf(safemem.BlockFromSafeSlice(dst)), srcs) }) - if n > 0 { - fd.pipe.Notify(waiter.EventOut) - } - if err == nil && n != origCount { - return int(n), syserror.ErrWouldBlock - } return int(n), err } -// CopyOut implements usermem.IO.CopyOut. +// CopyOut implements usermem.IO.CopyOut. Note that it is the caller's +// responsibility to call fd.pipe.Notify(waiter.EventIn) after the +// write is completed. +// +// Preconditions: fd.pipe.mu must be locked. func (fd *VFSPipeFD) CopyOut(ctx context.Context, addr usermem.Addr, src []byte, opts usermem.IOOpts) (int, error) { - origCount := int64(len(src)) - n, err := fd.pipe.writeLocked(ctx, writeOps{ - left: func() int64 { - return int64(len(src)) - }, - limit: func(l int64) { - src = src[:l] - }, - write: func(view *buffer.View) (int64, error) { - view.Append(src) - return int64(len(src)), nil - }, + n, err := fd.pipe.writeLocked(int64(len(src)), func(dsts safemem.BlockSeq) (uint64, error) { + return safemem.CopySeq(dsts, safemem.BlockSeqOf(safemem.BlockFromSafeSlice(src))) }) - if n > 0 { - fd.pipe.Notify(waiter.EventIn) - } - if err == nil && n != origCount { - return int(n), syserror.ErrWouldBlock - } return int(n), err } // ZeroOut implements usermem.IO.ZeroOut. +// +// Preconditions: fd.pipe.mu must be locked. func (fd *VFSPipeFD) ZeroOut(ctx context.Context, addr usermem.Addr, toZero int64, opts usermem.IOOpts) (int64, error) { - origCount := toZero - n, err := fd.pipe.writeLocked(ctx, writeOps{ - left: func() int64 { - return toZero - }, - limit: func(l int64) { - toZero = l - }, - write: func(view *buffer.View) (int64, error) { - view.Grow(view.Size()+toZero, true /* zero */) - return toZero, nil - }, + n, err := fd.pipe.writeLocked(toZero, func(dsts safemem.BlockSeq) (uint64, error) { + return safemem.ZeroSeq(dsts) }) - if n > 0 { - fd.pipe.Notify(waiter.EventIn) - } - if err == nil && n != origCount { - return n, syserror.ErrWouldBlock - } return n, err } // CopyInTo implements usermem.IO.CopyInTo. Note that it is the caller's -// responsibility to trim fd.pipe.view after the read is completed. +// responsibility to call fd.pipe.consumeLocked() and +// fd.pipe.Notify(waiter.EventOut) after the read is completed. +// +// Preconditions: fd.pipe.mu must be locked. func (fd *VFSPipeFD) CopyInTo(ctx context.Context, ars usermem.AddrRangeSeq, dst safemem.Writer, opts usermem.IOOpts) (int64, error) { - count := ars.NumBytes() - if count == 0 { - return 0, nil - } - origCount := count - n, err := fd.pipe.readLocked(ctx, readOps{ - left: func() int64 { - return count - }, - limit: func(l int64) { - count = l - }, - read: func(view *buffer.View) (int64, error) { - n, err := view.ReadToSafememWriter(dst, uint64(count)) - return int64(n), err - }, + return fd.pipe.peekLocked(ars.NumBytes(), func(srcs safemem.BlockSeq) (uint64, error) { + return dst.WriteFromBlocks(srcs) }) - if n > 0 { - fd.pipe.Notify(waiter.EventOut) - } - if err == nil && n != origCount { - return n, syserror.ErrWouldBlock - } - return n, err } // CopyOutFrom implements usermem.IO.CopyOutFrom. +// +// Preconditions: fd.pipe.mu must be locked. func (fd *VFSPipeFD) CopyOutFrom(ctx context.Context, ars usermem.AddrRangeSeq, src safemem.Reader, opts usermem.IOOpts) (int64, error) { - count := ars.NumBytes() - if count == 0 { - return 0, nil - } - origCount := count - n, err := fd.pipe.writeLocked(ctx, writeOps{ - left: func() int64 { - return count - }, - limit: func(l int64) { - count = l - }, - write: func(view *buffer.View) (int64, error) { - n, err := view.WriteFromSafememReader(src, uint64(count)) - return int64(n), err - }, + n, err := fd.pipe.writeLocked(ars.NumBytes(), func(dsts safemem.BlockSeq) (uint64, error) { + return src.ReadToBlocks(dsts) }) if n > 0 { fd.pipe.Notify(waiter.EventIn) } - if err == nil && n != origCount { - return n, syserror.ErrWouldBlock - } return n, err } @@ -481,37 +421,23 @@ func spliceOrTee(ctx context.Context, dst, src *VFSPipeFD, count int64, removeFr } lockTwoPipes(dst.pipe, src.pipe) - defer dst.pipe.mu.Unlock() - defer src.pipe.mu.Unlock() - - n, err := dst.pipe.writeLocked(ctx, writeOps{ - left: func() int64 { - return count - }, - limit: func(l int64) { - count = l - }, - write: func(dstView *buffer.View) (int64, error) { - return src.pipe.readLocked(ctx, readOps{ - left: func() int64 { - return count - }, - limit: func(l int64) { - count = l - }, - read: func(srcView *buffer.View) (int64, error) { - n, err := srcView.ReadToSafememWriter(dstView, uint64(count)) - if n > 0 && removeFromSrc { - srcView.TrimFront(int64(n)) - } - return int64(n), err - }, - }) - }, + n, err := dst.pipe.writeLocked(count, func(dsts safemem.BlockSeq) (uint64, error) { + n, err := src.pipe.peekLocked(int64(dsts.NumBytes()), func(srcs safemem.BlockSeq) (uint64, error) { + return safemem.CopySeq(dsts, srcs) + }) + if n > 0 && removeFromSrc { + src.pipe.consumeLocked(n) + } + return uint64(n), err }) + dst.pipe.mu.Unlock() + src.pipe.mu.Unlock() + if n > 0 { dst.pipe.Notify(waiter.EventIn) - src.pipe.Notify(waiter.EventOut) + if removeFromSrc { + src.pipe.Notify(waiter.EventOut) + } } return n, err } |