From 324ad3564ba42a5106be77a06d0cd52290e1cd22 Mon Sep 17 00:00:00 2001 From: Ian Gudger Date: Mon, 15 Oct 2018 20:21:06 -0700 Subject: Refactor host.ConnectedEndpoint * Integrate recvMsg and sendMsg functions into Recv and Send respectively as they are no longer shared. * Clean up partial read/write error handling code. * Re-order code to make sense given that there is no longer a host.endpoint type. PiperOrigin-RevId: 217255072 Change-Id: Ib43fe9286452f813b8309d969be11f5fa40694cd --- pkg/sentry/fs/host/socket.go | 281 ++++++++++++++++++++------------------- pkg/tcpip/transport/unix/unix.go | 3 + 2 files changed, 144 insertions(+), 140 deletions(-) diff --git a/pkg/sentry/fs/host/socket.go b/pkg/sentry/fs/host/socket.go index 577e9e272..e454b6fe5 100644 --- a/pkg/sentry/fs/host/socket.go +++ b/pkg/sentry/fs/host/socket.go @@ -41,137 +41,6 @@ import ( // N.B. 8MB is the default maximum on Linux (2 * sysctl_wmem_max). const maxSendBufferSize = 8 << 20 -// newSocket allocates a new unix socket with host endpoint. -func newSocket(ctx context.Context, orgfd int, saveable bool) (*fs.File, error) { - ownedfd := orgfd - srfd := -1 - if saveable { - var err error - ownedfd, err = syscall.Dup(orgfd) - if err != nil { - return nil, err - } - srfd = orgfd - } - f := fd.New(ownedfd) - var q waiter.Queue - e, err := NewConnectedEndpoint(f, &q, "" /* path */) - if err != nil { - if saveable { - f.Close() - } else { - f.Release() - } - return nil, syserr.TranslateNetstackError(err).ToError() - } - - e.srfd = srfd - e.Init() - - ep := unix.NewExternal(e.stype, uniqueid.GlobalProviderFromContext(ctx), &q, e, e) - - return unixsocket.New(ctx, ep), nil -} - -// NewSocketWithDirent allocates a new unix socket with host endpoint. -// -// This is currently only used by unsaveable Gofer nodes. -// -// NewSocketWithDirent takes ownership of f on success. -func NewSocketWithDirent(ctx context.Context, d *fs.Dirent, f *fd.FD, flags fs.FileFlags) (*fs.File, error) { - f2 := fd.New(f.FD()) - var q waiter.Queue - e, err := NewConnectedEndpoint(f2, &q, "" /* path */) - if err != nil { - f2.Release() - return nil, syserr.TranslateNetstackError(err).ToError() - } - - // Take ownship of the FD. - f.Release() - - e.Init() - - ep := unix.NewExternal(e.stype, uniqueid.GlobalProviderFromContext(ctx), &q, e, e) - - return unixsocket.NewWithDirent(ctx, d, ep, flags), nil -} - -func sendMsg(fd int, data [][]byte, controlMessages unix.ControlMessages, maxlen int, truncate bool) (uintptr, *tcpip.Error) { - if !controlMessages.Empty() { - return 0, tcpip.ErrInvalidEndpointState - } - n, totalLen, err := fdWriteVec(fd, data, maxlen, truncate) - if n < totalLen && err == nil { - // The host only returns a short write if it would otherwise - // block (and only for stream sockets). - err = syserror.EAGAIN - } - return n, translateError(err) -} - -func recvMsg(fd int, data [][]byte, numRights uintptr, peek bool, addr *tcpip.FullAddress, maxlen int) (uintptr, uintptr, unix.ControlMessages, *tcpip.Error) { - var cm unet.ControlMessage - if numRights > 0 { - cm.EnableFDs(int(numRights)) - } - rl, ml, cl, rerr := fdReadVec(fd, data, []byte(cm), peek, maxlen) - if rl == 0 && rerr != nil { - return 0, 0, unix.ControlMessages{}, translateError(rerr) - } - - // Trim the control data if we received less than the full amount. - if cl < uint64(len(cm)) { - cm = cm[:cl] - } - - // Avoid extra allocations in the case where there isn't any control data. - if len(cm) == 0 { - return rl, ml, unix.ControlMessages{}, translateError(rerr) - } - - fds, err := cm.ExtractFDs() - if err != nil { - return 0, 0, unix.ControlMessages{}, translateError(err) - } - - if len(fds) == 0 { - return rl, ml, unix.ControlMessages{}, translateError(rerr) - } - return rl, ml, control.New(nil, nil, newSCMRights(fds)), translateError(rerr) -} - -// NewConnectedEndpoint creates a new ConnectedEndpoint backed by a host FD -// that will pretend to be bound at a given sentry path. -// -// The caller is responsible for calling Init(). Additionaly, Release needs to -// be called twice because host.ConnectedEndpoint is both a unix.Receiver and -// unix.ConnectedEndpoint. -func NewConnectedEndpoint(file *fd.FD, queue *waiter.Queue, path string) (*ConnectedEndpoint, *tcpip.Error) { - e := ConnectedEndpoint{ - path: path, - queue: queue, - file: file, - srfd: -1, - } - - if err := e.init(); err != nil { - return nil, err - } - - // AtomicRefCounters start off with a single reference. We need two. - e.ref.IncRef() - - return &e, nil -} - -// Init will do initialization required without holding other locks. -func (c *ConnectedEndpoint) Init() { - if err := fdnotifier.AddFD(int32(c.file.FD()), c.queue); err != nil { - panic(err) - } -} - // ConnectedEndpoint is a host FD backed implementation of // unix.ConnectedEndpoint and unix.Receiver. // @@ -249,6 +118,93 @@ func (c *ConnectedEndpoint) init() *tcpip.Error { return nil } +// NewConnectedEndpoint creates a new ConnectedEndpoint backed by a host FD +// that will pretend to be bound at a given sentry path. +// +// The caller is responsible for calling Init(). Additionaly, Release needs to +// be called twice because ConnectedEndpoint is both a unix.Receiver and +// unix.ConnectedEndpoint. +func NewConnectedEndpoint(file *fd.FD, queue *waiter.Queue, path string) (*ConnectedEndpoint, *tcpip.Error) { + e := ConnectedEndpoint{ + path: path, + queue: queue, + file: file, + srfd: -1, + } + + if err := e.init(); err != nil { + return nil, err + } + + // AtomicRefCounters start off with a single reference. We need two. + e.ref.IncRef() + + return &e, nil +} + +// Init will do initialization required without holding other locks. +func (c *ConnectedEndpoint) Init() { + if err := fdnotifier.AddFD(int32(c.file.FD()), c.queue); err != nil { + panic(err) + } +} + +// NewSocketWithDirent allocates a new unix socket with host endpoint. +// +// This is currently only used by unsaveable Gofer nodes. +// +// NewSocketWithDirent takes ownership of f on success. +func NewSocketWithDirent(ctx context.Context, d *fs.Dirent, f *fd.FD, flags fs.FileFlags) (*fs.File, error) { + f2 := fd.New(f.FD()) + var q waiter.Queue + e, err := NewConnectedEndpoint(f2, &q, "" /* path */) + if err != nil { + f2.Release() + return nil, syserr.TranslateNetstackError(err).ToError() + } + + // Take ownship of the FD. + f.Release() + + e.Init() + + ep := unix.NewExternal(e.stype, uniqueid.GlobalProviderFromContext(ctx), &q, e, e) + + return unixsocket.NewWithDirent(ctx, d, ep, flags), nil +} + +// newSocket allocates a new unix socket with host endpoint. +func newSocket(ctx context.Context, orgfd int, saveable bool) (*fs.File, error) { + ownedfd := orgfd + srfd := -1 + if saveable { + var err error + ownedfd, err = syscall.Dup(orgfd) + if err != nil { + return nil, err + } + srfd = orgfd + } + f := fd.New(ownedfd) + var q waiter.Queue + e, err := NewConnectedEndpoint(f, &q, "" /* path */) + if err != nil { + if saveable { + f.Close() + } else { + f.Release() + } + return nil, syserr.TranslateNetstackError(err).ToError() + } + + e.srfd = srfd + e.Init() + + ep := unix.NewExternal(e.stype, uniqueid.GlobalProviderFromContext(ctx), &q, e, e) + + return unixsocket.New(ctx, ep), nil +} + // Send implements unix.ConnectedEndpoint.Send. func (c *ConnectedEndpoint) Send(data [][]byte, controlMessages unix.ControlMessages, from tcpip.FullAddress) (uintptr, bool, *tcpip.Error) { c.mu.RLock() @@ -257,14 +213,30 @@ func (c *ConnectedEndpoint) Send(data [][]byte, controlMessages unix.ControlMess return 0, false, tcpip.ErrClosedForSend } + if !controlMessages.Empty() { + return 0, false, tcpip.ErrInvalidEndpointState + } + // Since stream sockets don't preserve message boundaries, we can write // only as much of the message as fits in the send buffer. truncate := c.stype == unix.SockStream - n, err := sendMsg(c.file.FD(), data, controlMessages, c.sndbuf, truncate) - // There is no need for the callee to call SendNotify because sendMsg uses - // the host's sendmsg(2) and the host kernel's queue. - return n, false, err + n, totalLen, err := fdWriteVec(c.file.FD(), data, c.sndbuf, truncate) + if n < totalLen && err == nil { + // The host only returns a short write if it would otherwise + // block (and only for stream sockets). + err = syserror.EAGAIN + } + if n > 0 && err != syserror.EAGAIN { + // The caller may need to block to send more data, but + // otherwise there isn't anything that can be done about an + // error with a partial write. + err = nil + } + + // There is no need for the callee to call SendNotify because fdWriteVec + // uses the host's sendmsg(2) and the host kernel's queue. + return n, false, translateError(err) } // SendNotify implements unix.ConnectedEndpoint.SendNotify. @@ -318,17 +290,46 @@ func (c *ConnectedEndpoint) Recv(data [][]byte, creds bool, numRights uintptr, p return 0, 0, unix.ControlMessages{}, tcpip.FullAddress{}, false, tcpip.ErrClosedForReceive } + var cm unet.ControlMessage + if numRights > 0 { + cm.EnableFDs(int(numRights)) + } + // N.B. Unix sockets don't have a receive buffer, the send buffer // serves both purposes. - rl, ml, cm, err := recvMsg(c.file.FD(), data, numRights, peek, nil, c.sndbuf) - if rl > 0 && err == tcpip.ErrWouldBlock { - // Message did not fill buffer; that's fine, no need to block. + rl, ml, cl, err := fdReadVec(c.file.FD(), data, []byte(cm), peek, c.sndbuf) + if rl > 0 && err != nil { + // We got some data, so all we need to do on error is return + // the data that we got. Short reads are fine, no need to + // block. err = nil } + if err != nil { + return 0, 0, unix.ControlMessages{}, tcpip.FullAddress{}, false, translateError(err) + } - // There is no need for the callee to call RecvNotify because recvMsg uses + // There is no need for the callee to call RecvNotify because fdReadVec uses // the host's recvmsg(2) and the host kernel's queue. - return rl, ml, cm, tcpip.FullAddress{Addr: tcpip.Address(c.path)}, false, err + + // Trim the control data if we received less than the full amount. + if cl < uint64(len(cm)) { + cm = cm[:cl] + } + + // Avoid extra allocations in the case where there isn't any control data. + if len(cm) == 0 { + return rl, ml, unix.ControlMessages{}, tcpip.FullAddress{Addr: tcpip.Address(c.path)}, false, nil + } + + fds, err := cm.ExtractFDs() + if err != nil { + return 0, 0, unix.ControlMessages{}, tcpip.FullAddress{}, false, translateError(err) + } + + if len(fds) == 0 { + return rl, ml, unix.ControlMessages{}, tcpip.FullAddress{Addr: tcpip.Address(c.path)}, false, nil + } + return rl, ml, control.New(nil, nil, newSCMRights(fds)), tcpip.FullAddress{Addr: tcpip.Address(c.path)}, false, nil } // close releases all resources related to the endpoint. diff --git a/pkg/tcpip/transport/unix/unix.go b/pkg/tcpip/transport/unix/unix.go index 718606cd1..1bca4b0b4 100644 --- a/pkg/tcpip/transport/unix/unix.go +++ b/pkg/tcpip/transport/unix/unix.go @@ -562,6 +562,9 @@ type ConnectedEndpoint interface { // Send sends a single message. This method does not block. // // notify indicates if SendNotify should be called. + // + // tcpip.ErrWouldBlock can be returned along with a partial write if + // the caller should block to send the rest of the data. Send(data [][]byte, controlMessages ControlMessages, from tcpip.FullAddress) (n uintptr, notify bool, err *tcpip.Error) // SendNotify notifies the ConnectedEndpoint of a successful Send. This -- cgit v1.2.3