diff options
Diffstat (limited to 'pkg/tcpip/transport/tcp/endpoint.go')
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 228 |
1 files changed, 126 insertions, 102 deletions
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 6e3c8860e..8f3981075 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -17,6 +17,7 @@ package tcp import ( "encoding/binary" "fmt" + "io" "math" "runtime" "strings" @@ -27,7 +28,6 @@ import ( "gvisor.dev/gvisor/pkg/sleep" "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/tcpip" - "gvisor.dev/gvisor/pkg/tcpip/buffer" "gvisor.dev/gvisor/pkg/tcpip/hash/jenkins" "gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/ports" @@ -393,15 +393,28 @@ type endpoint struct { lastErrorMu sync.Mutex `state:"nosave"` lastError *tcpip.Error `state:".(string)"` - // The following fields are used to manage the receive queue. The - // protocol goroutine adds ready-for-delivery segments to rcvList, - // which are returned by Read() calls to users. + // rcvReadMu synchronizes calls to Read. // - // Once the peer has closed its send side, rcvClosed is set to true - // to indicate to users that no more data is coming. + // mu and rcvListMu are temporarily released during data copying. rcvReadMu + // must be held during each read to ensure atomicity, so that multiple reads + // do not interleave. + // + // rcvReadMu should be held before holding mu. + rcvReadMu sync.Mutex `state:"nosave"` + + // rcvListMu synchronizes access to rcvList. // // rcvListMu can be taken after the endpoint mu below. - rcvListMu sync.Mutex `state:"nosave"` + rcvListMu sync.Mutex `state:"nosave"` + + // rcvList is the queue for ready-for-delivery segments. + // + // rcvReadMu, mu and rcvListMu must be held, in the stated order, to read data + // and removing segments from list. A range of segment can be determined, then + // temporarily release mu and rcvListMu while processing the segment range. + // This allows new segments to be appended to the list while processing. + // + // rcvListMu must be held to append segments to list. rcvList segmentList `state:"wait"` rcvClosed bool // rcvBufSize is the total size of the receive buffer. @@ -1309,8 +1322,69 @@ func (e *endpoint) UpdateLastError(err *tcpip.Error) { e.UnlockUser() } -// Read reads data from the endpoint. -func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) { +// Read implements tcpip.Endpoint.Read. +func (e *endpoint) Read(dst io.Writer, count int, opts tcpip.ReadOptions) (tcpip.ReadResult, *tcpip.Error) { + e.rcvReadMu.Lock() + defer e.rcvReadMu.Unlock() + + // N.B. Here we get a range of segments to be processed. It is safe to not + // hold rcvListMu when processing, since we hold rcvReadMu to ensure only we + // can remove segments from the list through commitRead(). + first, last, serr := e.startRead() + if serr != nil { + if serr == tcpip.ErrClosedForReceive { + e.stats.ReadErrors.ReadClosed.Increment() + } + return tcpip.ReadResult{}, serr + } + + var err error + done := 0 + s := first + for s != nil && done < count { + var n int + n, err = s.data.ReadTo(dst, count-done, opts.Peek) + // Book keeping first then error handling. + + done += n + + if opts.Peek { + // For peek, we use the (first, last) range of segment returned from + // startRead. We don't consume the receive buffer, so commitRead should + // not be called. + // + // N.B. It is important to use `last` to determine the last segment, since + // appending can happen while we process, and will lead to data race. + if s == last { + break + } + s = s.Next() + } else { + // N.B. commitRead() conveniently returns the next segment to read, after + // removing the data/segment that is read. + s = e.commitRead(n) + } + + if err != nil { + break + } + } + + // If something is read, we must report it. Report error when nothing is read. + if done == 0 && err != nil { + return tcpip.ReadResult{}, tcpip.ErrBadBuffer + } + return tcpip.ReadResult{ + Count: done, + Total: done, + }, nil +} + +// startRead checks that endpoint is in a readable state, and return the +// inclusive range of segments that can be read. +// +// Precondition: e.rcvReadMu must be held. +func (e *endpoint) startRead() (first, last *segment, err *tcpip.Error) { e.LockUser() defer e.UnlockUser() @@ -1319,7 +1393,7 @@ func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, // on a receive. It can expect to read any data after the handshake // is complete. RFC793, section 3.9, p58. if e.EndpointState() == StateSynSent { - return buffer.View{}, tcpip.ControlMessages{}, tcpip.ErrWouldBlock + return nil, nil, tcpip.ErrWouldBlock } // The endpoint can be read if it's connected, or if it's already closed @@ -1327,61 +1401,69 @@ func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, // would cause the state to become StateError so we should allow the // reads to proceed before returning a ECONNRESET. e.rcvListMu.Lock() + defer e.rcvListMu.Unlock() + bufUsed := e.rcvBufUsed if s := e.EndpointState(); !s.connected() && s != StateClose && bufUsed == 0 { - e.rcvListMu.Unlock() if s == StateError { if err := e.hardErrorLocked(); err != nil { - return buffer.View{}, tcpip.ControlMessages{}, err + return nil, nil, err } - return buffer.View{}, tcpip.ControlMessages{}, tcpip.ErrClosedForReceive + return nil, nil, tcpip.ErrClosedForReceive } e.stats.ReadErrors.NotConnected.Increment() - return buffer.View{}, tcpip.ControlMessages{}, tcpip.ErrNotConnected + return nil, nil, tcpip.ErrNotConnected } - v, err := e.readLocked() - e.rcvListMu.Unlock() - - if err == tcpip.ErrClosedForReceive { - e.stats.ReadErrors.ReadClosed.Increment() - } - return v, tcpip.ControlMessages{}, err -} - -func (e *endpoint) readLocked() (buffer.View, *tcpip.Error) { if e.rcvBufUsed == 0 { if e.rcvClosed || !e.EndpointState().connected() { - return buffer.View{}, tcpip.ErrClosedForReceive + return nil, nil, tcpip.ErrClosedForReceive } - return buffer.View{}, tcpip.ErrWouldBlock + return nil, nil, tcpip.ErrWouldBlock } - s := e.rcvList.Front() - views := s.data.Views() - v := views[s.viewToDeliver] - s.viewToDeliver++ + return e.rcvList.Front(), e.rcvList.Back(), nil +} + +// commitRead commits a read of done bytes and returns the next non-empty +// segment to read. Data read from the segment must have also been removed from +// the segment in order for this method to work correctly. +// +// It is performance critical to call commitRead frequently when servicing a big +// Read request, so TCP can make progress timely. Right now, it is designed to +// do this per segment read, hence this method conveniently returns the next +// segment to read while holding the lock. +// +// Precondition: e.rcvReadMu must be held. +func (e *endpoint) commitRead(done int) *segment { + e.LockUser() + defer e.UnlockUser() + e.rcvListMu.Lock() + defer e.rcvListMu.Unlock() - var delta int - if s.viewToDeliver >= len(views) { + memDelta := 0 + s := e.rcvList.Front() + for s != nil && s.data.Size() == 0 { e.rcvList.Remove(s) - // We only free up receive buffer space when the segment is released as the - // segment is still holding on to the views even though some views have been - // read out to the user. - delta = s.segMemSize() + // Memory is only considered released when the whole segment has been + // read. + memDelta += s.segMemSize() s.decRef() + s = e.rcvList.Front() } + e.rcvBufUsed -= done - e.rcvBufUsed -= len(v) - // If the window was small before this read and if the read freed up - // enough buffer space, to either fit an aMSS or half a receive buffer - // (whichever smaller), then notify the protocol goroutine to send a - // window update. - if crossed, above := e.windowCrossedACKThresholdLocked(delta); crossed && above { - e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow) + if memDelta > 0 { + // If the window was small before this read and if the read freed up + // enough buffer space, to either fit an aMSS or half a receive buffer + // (whichever smaller), then notify the protocol goroutine to send a + // window update. + if crossed, above := e.windowCrossedACKThresholdLocked(memDelta); crossed && above { + e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow) + } } - return v, nil + return e.rcvList.Front() } // isEndpointWritableLocked checks if a given endpoint is writable @@ -1499,64 +1581,6 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, <-c return queueAndSend() } -// Peek reads data without consuming it from the endpoint. -// -// This method does not block if there is no data pending. -func (e *endpoint) Peek(vec [][]byte) (int64, *tcpip.Error) { - e.LockUser() - defer e.UnlockUser() - - // The endpoint can be read if it's connected, or if it's already closed - // but has some pending unread data. - if s := e.EndpointState(); !s.connected() && s != StateClose { - if s == StateError { - return 0, e.hardErrorLocked() - } - e.stats.ReadErrors.InvalidEndpointState.Increment() - return 0, tcpip.ErrInvalidEndpointState - } - - e.rcvListMu.Lock() - defer e.rcvListMu.Unlock() - - if e.rcvBufUsed == 0 { - if e.rcvClosed || !e.EndpointState().connected() { - e.stats.ReadErrors.ReadClosed.Increment() - return 0, tcpip.ErrClosedForReceive - } - return 0, tcpip.ErrWouldBlock - } - - // Make a copy of vec so we can modify the slide headers. - vec = append([][]byte(nil), vec...) - - var num int64 - for s := e.rcvList.Front(); s != nil; s = s.Next() { - views := s.data.Views() - - for i := s.viewToDeliver; i < len(views); i++ { - v := views[i] - - for len(v) > 0 { - if len(vec) == 0 { - return num, nil - } - if len(vec[0]) == 0 { - vec = vec[1:] - continue - } - - n := copy(vec[0], v) - v = v[n:] - vec[0] = vec[0][n:] - num += int64(n) - } - } - } - - return num, nil -} - // selectWindowLocked returns the new window without checking for shrinking or scaling // applied. // Precondition: e.mu and e.rcvListMu must be held. |