From b1de1da318631c6d29f6c04dea370f712078f443 Mon Sep 17 00:00:00 2001 From: Ting-Yu Wang Date: Thu, 7 Jan 2021 14:14:58 -0800 Subject: netstack: Refactor tcpip.Endpoint.Read Read now takes a destination io.Writer, count, options. Keeping the method name Read, in contrast to the Write method. This enables: * direct transfer of views under VV * zero copy It also eliminates the need for sentry to keep a slice of view because userspace had requested a read that is smaller than the view returned, removing the complexity there. Read/Peek/ReadPacket are now consolidated together and some duplicate code is removed. PiperOrigin-RevId: 350636322 --- pkg/sentry/socket/netstack/BUILD | 1 - pkg/sentry/socket/netstack/netstack.go | 265 ++++++++++----------------------- 2 files changed, 77 insertions(+), 189 deletions(-) (limited to 'pkg/sentry') diff --git a/pkg/sentry/socket/netstack/BUILD b/pkg/sentry/socket/netstack/BUILD index fae3b6783..b2206900b 100644 --- a/pkg/sentry/socket/netstack/BUILD +++ b/pkg/sentry/socket/netstack/BUILD @@ -25,7 +25,6 @@ go_library( "//pkg/marshal", "//pkg/marshal/primitive", "//pkg/metric", - "//pkg/safemem", "//pkg/sentry/arch", "//pkg/sentry/device", "//pkg/sentry/fs", diff --git a/pkg/sentry/socket/netstack/netstack.go b/pkg/sentry/socket/netstack/netstack.go index fe11fca9c..dcf898c0a 100644 --- a/pkg/sentry/socket/netstack/netstack.go +++ b/pkg/sentry/socket/netstack/netstack.go @@ -28,9 +28,9 @@ import ( "bytes" "fmt" "io" + "io/ioutil" "math" "reflect" - "sync/atomic" "syscall" "time" @@ -43,7 +43,6 @@ import ( "gvisor.dev/gvisor/pkg/marshal" "gvisor.dev/gvisor/pkg/marshal/primitive" "gvisor.dev/gvisor/pkg/metric" - "gvisor.dev/gvisor/pkg/safemem" "gvisor.dev/gvisor/pkg/sentry/arch" "gvisor.dev/gvisor/pkg/sentry/fs" "gvisor.dev/gvisor/pkg/sentry/fs/fsutil" @@ -308,16 +307,8 @@ type socketOpsCommon struct { skType linux.SockType protocol int - // readViewHasData is 1 iff readView has data to be read, 0 otherwise. - // Must be accessed using atomic operations. It must only be written - // with readMu held but can be read without holding readMu. The latter - // is required to avoid deadlocks in epoll Readiness checks. - readViewHasData uint32 - // readMu protects access to the below fields. readMu sync.Mutex `state:"nosave"` - // readView contains the remaining payload from the last packet. - readView buffer.View // readCM holds control message information for the last packet read // from Endpoint. readCM socket.IPControlMessages @@ -336,8 +327,8 @@ type socketOpsCommon struct { // valid when timestampValid is true. It is protected by readMu. timestampNS int64 - // sockOptInq corresponds to TCP_INQ. It is implemented at this level - // because it takes into account data from readView. + // TODO(b/153685824): Move this to SocketOptions. + // sockOptInq corresponds to TCP_INQ. sockOptInq bool } @@ -377,41 +368,23 @@ func (s *socketOpsCommon) isPacketBased() bool { return s.skType == linux.SOCK_DGRAM || s.skType == linux.SOCK_SEQPACKET || s.skType == linux.SOCK_RDM || s.skType == linux.SOCK_RAW } -// fetchReadView updates the readView field of the socket if it's currently -// empty. It assumes that the socket is locked. -// // Precondition: s.readMu must be held. -func (s *socketOpsCommon) fetchReadView() *syserr.Error { - if len(s.readView) > 0 { - return nil - } - s.readView = nil - s.sender = tcpip.FullAddress{} - s.linkPacketInfo = tcpip.LinkPacketInfo{} +func (s *socketOpsCommon) readLocked(dst io.Writer, count int, peek bool) (numRead, numTotal int, serr *syserr.Error) { + res, err := s.Endpoint.Read(dst, count, tcpip.ReadOptions{ + Peek: peek, + NeedRemoteAddr: true, + NeedLinkPacketInfo: true, + }) - var v buffer.View - var cms tcpip.ControlMessages - var err *tcpip.Error + // Assign these anyways. + s.readCM = socket.NewIPControlMessages(s.family, res.ControlMessages) + s.sender = res.RemoteAddr + s.linkPacketInfo = res.LinkPacketInfo - switch e := s.Endpoint.(type) { - // The ordering of these interfaces matters. The most specific - // interfaces must be specified before the more generic Endpoint - // interface. - case tcpip.PacketEndpoint: - v, cms, err = e.ReadPacket(&s.sender, &s.linkPacketInfo) - case tcpip.Endpoint: - v, cms, err = e.Read(&s.sender) - } if err != nil { - atomic.StoreUint32(&s.readViewHasData, 0) - return syserr.TranslateNetstackError(err) + return 0, 0, syserr.TranslateNetstackError(err) } - - s.readView = v - s.readCM = socket.NewIPControlMessages(s.family, cms) - atomic.StoreUint32(&s.readViewHasData, 1) - - return nil + return res.Count, res.Total, nil } // Release implements fs.FileOperations.Release. @@ -460,38 +433,14 @@ func (s *SocketOperations) Read(ctx context.Context, _ *fs.File, dst usermem.IOS // WriteTo implements fs.FileOperations.WriteTo. func (s *SocketOperations) WriteTo(ctx context.Context, _ *fs.File, dst io.Writer, count int64, dup bool) (int64, error) { s.readMu.Lock() + defer s.readMu.Unlock() - // Copy as much data as possible. - done := int64(0) - for count > 0 { - // This may return a blocking error. - if err := s.fetchReadView(); err != nil { - s.readMu.Unlock() - return done, err.ToError() - } - - // Write to the underlying file. - n, err := dst.Write(s.readView) - done += int64(n) - count -= int64(n) - if dup { - // That's all we support for dup. This is generally - // supported by any Linux system calls, but the - // expectation is that now a caller will call read to - // actually remove these bytes from the socket. - break - } - - // Drop that part of the view. - s.readView.TrimFront(n) - if err != nil { - s.readMu.Unlock() - return done, err - } + // This may return a blocking error. + n, _, err := s.readLocked(dst, int(count), dup /* peek */) + if err != nil { + return 0, err.ToError() } - - s.readMu.Unlock() - return done, nil + return int64(n), nil } // ioSequencePayload implements tcpip.Payload. @@ -627,17 +576,7 @@ func (s *SocketOperations) ReadFrom(ctx context.Context, _ *fs.File, r io.Reader // Readiness returns a mask of ready events for socket s. func (s *socketOpsCommon) Readiness(mask waiter.EventMask) waiter.EventMask { - r := s.Endpoint.Readiness(mask) - - // Check our cached value iff the caller asked for readability and the - // endpoint itself is currently not readable. - if (mask & ^r & waiter.EventIn) != 0 { - if atomic.LoadUint32(&s.readViewHasData) == 1 { - r |= waiter.EventIn - } - } - - return r + return s.Endpoint.Readiness(mask) } func (s *socketOpsCommon) checkFamily(family uint16, exact bool) *syserr.Error { @@ -2618,66 +2557,20 @@ func (s *socketOpsCommon) GetPeerName(t *kernel.Task) (linux.SockAddr, uint32, * return a, l, nil } -// coalescingRead is the fast path for non-blocking, non-peek, stream-based -// case. It coalesces as many packets as possible before returning to the -// caller. +// streamRead is the fast path for non-blocking, non-peek, stream-based socket. // // Precondition: s.readMu must be locked. -func (s *socketOpsCommon) coalescingRead(ctx context.Context, dst usermem.IOSequence, discard bool) (int, *syserr.Error) { - var err *syserr.Error - var copied int - - // Copy as many views as possible into the user-provided buffer. - for { - // Always do at least one fetchReadView, even if the number of bytes to - // read is 0. - err = s.fetchReadView() - if err != nil || len(s.readView) == 0 { - break - } - if dst.NumBytes() == 0 { - break - } - - var n int - var e error - if discard { - n = len(s.readView) - if int64(n) > dst.NumBytes() { - n = int(dst.NumBytes()) - } - } else { - n, e = dst.CopyOut(ctx, s.readView) - // Set the control message, even if 0 bytes were read. - if e == nil { - s.updateTimestamp() - } - } - copied += n - s.readView.TrimFront(n) - - dst = dst.DropFirst(n) - if e != nil { - err = syserr.FromError(e) - break - } - // If we are done reading requested data then stop. - if dst.NumBytes() == 0 { - break - } - } - - if len(s.readView) == 0 { - atomic.StoreUint32(&s.readViewHasData, 0) +func (s *socketOpsCommon) streamRead(ctx context.Context, dst io.Writer, count int) (int, *syserr.Error) { + // Always do at least one read, even if the number of bytes to read is 0. + var n int + n, _, err := s.readLocked(dst, count, false /* peek */) + if err != nil { + return 0, err } - - // If we managed to copy something, we must deliver it. - if copied > 0 { - s.Endpoint.ModerateRecvBuf(copied) - return copied, nil + if n > 0 { + s.Endpoint.ModerateRecvBuf(n) } - - return 0, err + return n, nil } func (s *socketOpsCommon) fillCmsgInq(cmsg *socket.ControlMessages) { @@ -2689,7 +2582,7 @@ func (s *socketOpsCommon) fillCmsgInq(cmsg *socket.ControlMessages) { return } cmsg.IP.HasInq = true - cmsg.IP.Inq = int32(len(s.readView) + rcvBufUsed) + cmsg.IP.Inq = int32(rcvBufUsed) } func toLinuxPacketType(pktType tcpip.PacketType) uint8 { @@ -2726,7 +2619,21 @@ func (s *socketOpsCommon) nonBlockingRead(ctx context.Context, dst usermem.IOSeq // bytes of data to be discarded, rather than passed back in a // caller-supplied buffer. s.readMu.Lock() - n, err := s.coalescingRead(ctx, dst, trunc) + + var w io.Writer + if trunc { + w = ioutil.Discard + } else { + w = dst.Writer(ctx) + } + + n, err := s.streamRead(ctx, w, int(dst.NumBytes())) + + if err == nil && !trunc { + // Set the control message, even if 0 bytes were read. + s.updateTimestamp() + } + cmsg := s.controlMessages() s.fillCmsgInq(&cmsg) s.readMu.Unlock() @@ -2736,18 +2643,32 @@ func (s *socketOpsCommon) nonBlockingRead(ctx context.Context, dst usermem.IOSeq s.readMu.Lock() defer s.readMu.Unlock() - if err := s.fetchReadView(); err != nil { + // MSG_TRUNC with MSG_PEEK on a TCP socket returns the + // amount that could be read, and does not write to buffer. + isTCPPeekTrunc := !isPacket && peek && trunc + + var w io.Writer + if isTCPPeekTrunc { + w = ioutil.Discard + } else { + w = dst.Writer(ctx) + } + + var numRead, numTotal int + var err *syserr.Error + numRead, numTotal, err = s.readLocked(w, int(dst.NumBytes()), peek) + if err != nil { return 0, 0, nil, 0, socket.ControlMessages{}, err } - if !isPacket && peek && trunc { - // MSG_TRUNC with MSG_PEEK on a TCP socket returns the - // amount that could be read. + if isTCPPeekTrunc { + // TCP endpoint does not return the total bytes in buffer as numTotal. + // We need to query it from socket option. rql, err := s.Endpoint.GetSockOptInt(tcpip.ReceiveQueueSizeOption) if err != nil { return 0, 0, nil, 0, socket.ControlMessages{}, syserr.TranslateNetstackError(err) } - available := len(s.readView) + int(rql) + available := int(rql) bufLen := int(dst.NumBytes()) if available < bufLen { return available, 0, nil, 0, socket.ControlMessages{}, nil @@ -2755,11 +2676,9 @@ func (s *socketOpsCommon) nonBlockingRead(ctx context.Context, dst usermem.IOSeq return bufLen, 0, nil, 0, socket.ControlMessages{}, nil } - n, err := dst.CopyOut(ctx, s.readView) // Set the control message, even if 0 bytes were read. - if err == nil { - s.updateTimestamp() - } + s.updateTimestamp() + var addr linux.SockAddr var addrLen uint32 if isPacket && senderRequested { @@ -2772,58 +2691,33 @@ func (s *socketOpsCommon) nonBlockingRead(ctx context.Context, dst usermem.IOSeq } if peek { - if l := len(s.readView); trunc && l > n { + if trunc && numTotal > numRead { // isPacket must be true. - return l, linux.MSG_TRUNC, addr, addrLen, s.controlMessages(), syserr.FromError(err) + return numTotal, linux.MSG_TRUNC, addr, addrLen, s.controlMessages(), nil } - - if isPacket || err != nil { - return n, 0, addr, addrLen, s.controlMessages(), syserr.FromError(err) - } - - // We need to peek beyond the first message. - dst = dst.DropFirst(n) - num, err := dst.CopyOutFrom(ctx, safemem.FromVecReaderFunc{func(dsts [][]byte) (int64, error) { - n, err := s.Endpoint.Peek(dsts) - // TODO(b/78348848): Handle peek timestamp. - if err != nil { - return int64(n), syserr.TranslateNetstackError(err).ToError() - } - return int64(n), nil - }}) - n += int(num) - if err == syserror.ErrWouldBlock && n > 0 { - // We got some data, so no need to return an error. - err = nil - } - return n, 0, nil, 0, s.controlMessages(), syserr.FromError(err) + return numRead, 0, nil, 0, s.controlMessages(), nil } var msgLen int if isPacket { - msgLen = len(s.readView) - s.readView = nil + msgLen = numTotal } else { - msgLen = int(n) - s.readView.TrimFront(int(n)) - } - - if len(s.readView) == 0 { - atomic.StoreUint32(&s.readViewHasData, 0) + msgLen = numRead } var flags int - if msgLen > int(n) { + if msgLen > numRead { flags |= linux.MSG_TRUNC } + n := numRead if trunc { n = msgLen } cmsg := s.controlMessages() s.fillCmsgInq(&cmsg) - return n, flags, addr, addrLen, cmsg, syserr.FromError(err) + return n, flags, addr, addrLen, cmsg, nil } func (s *socketOpsCommon) controlMessages() socket.ControlMessages { @@ -3090,11 +2984,6 @@ func (s *socketOpsCommon) ioctl(ctx context.Context, io usermem.IO, args arch.Sy return 0, syserr.TranslateNetstackError(terr).ToError() } - // Add bytes removed from the endpoint but not yet sent to the caller. - s.readMu.Lock() - v += len(s.readView) - s.readMu.Unlock() - if v > math.MaxInt32 { v = math.MaxInt32 } -- cgit v1.2.3