diff options
Diffstat (limited to 'pkg/tcpip/transport/tcp')
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 228 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/segment.go | 16 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/segment_state.go | 13 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/tcp_state_autogen.go | 58 |
4 files changed, 168 insertions, 147 deletions
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 6e3c8860e..8f3981075 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -17,6 +17,7 @@ package tcp import ( "encoding/binary" "fmt" + "io" "math" "runtime" "strings" @@ -27,7 +28,6 @@ import ( "gvisor.dev/gvisor/pkg/sleep" "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/tcpip" - "gvisor.dev/gvisor/pkg/tcpip/buffer" "gvisor.dev/gvisor/pkg/tcpip/hash/jenkins" "gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/ports" @@ -393,15 +393,28 @@ type endpoint struct { lastErrorMu sync.Mutex `state:"nosave"` lastError *tcpip.Error `state:".(string)"` - // The following fields are used to manage the receive queue. The - // protocol goroutine adds ready-for-delivery segments to rcvList, - // which are returned by Read() calls to users. + // rcvReadMu synchronizes calls to Read. // - // Once the peer has closed its send side, rcvClosed is set to true - // to indicate to users that no more data is coming. + // mu and rcvListMu are temporarily released during data copying. rcvReadMu + // must be held during each read to ensure atomicity, so that multiple reads + // do not interleave. + // + // rcvReadMu should be held before holding mu. + rcvReadMu sync.Mutex `state:"nosave"` + + // rcvListMu synchronizes access to rcvList. // // rcvListMu can be taken after the endpoint mu below. - rcvListMu sync.Mutex `state:"nosave"` + rcvListMu sync.Mutex `state:"nosave"` + + // rcvList is the queue for ready-for-delivery segments. + // + // rcvReadMu, mu and rcvListMu must be held, in the stated order, to read data + // and removing segments from list. A range of segment can be determined, then + // temporarily release mu and rcvListMu while processing the segment range. + // This allows new segments to be appended to the list while processing. + // + // rcvListMu must be held to append segments to list. rcvList segmentList `state:"wait"` rcvClosed bool // rcvBufSize is the total size of the receive buffer. @@ -1309,8 +1322,69 @@ func (e *endpoint) UpdateLastError(err *tcpip.Error) { e.UnlockUser() } -// Read reads data from the endpoint. -func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) { +// Read implements tcpip.Endpoint.Read. +func (e *endpoint) Read(dst io.Writer, count int, opts tcpip.ReadOptions) (tcpip.ReadResult, *tcpip.Error) { + e.rcvReadMu.Lock() + defer e.rcvReadMu.Unlock() + + // N.B. Here we get a range of segments to be processed. It is safe to not + // hold rcvListMu when processing, since we hold rcvReadMu to ensure only we + // can remove segments from the list through commitRead(). + first, last, serr := e.startRead() + if serr != nil { + if serr == tcpip.ErrClosedForReceive { + e.stats.ReadErrors.ReadClosed.Increment() + } + return tcpip.ReadResult{}, serr + } + + var err error + done := 0 + s := first + for s != nil && done < count { + var n int + n, err = s.data.ReadTo(dst, count-done, opts.Peek) + // Book keeping first then error handling. + + done += n + + if opts.Peek { + // For peek, we use the (first, last) range of segment returned from + // startRead. We don't consume the receive buffer, so commitRead should + // not be called. + // + // N.B. It is important to use `last` to determine the last segment, since + // appending can happen while we process, and will lead to data race. + if s == last { + break + } + s = s.Next() + } else { + // N.B. commitRead() conveniently returns the next segment to read, after + // removing the data/segment that is read. + s = e.commitRead(n) + } + + if err != nil { + break + } + } + + // If something is read, we must report it. Report error when nothing is read. + if done == 0 && err != nil { + return tcpip.ReadResult{}, tcpip.ErrBadBuffer + } + return tcpip.ReadResult{ + Count: done, + Total: done, + }, nil +} + +// startRead checks that endpoint is in a readable state, and return the +// inclusive range of segments that can be read. +// +// Precondition: e.rcvReadMu must be held. +func (e *endpoint) startRead() (first, last *segment, err *tcpip.Error) { e.LockUser() defer e.UnlockUser() @@ -1319,7 +1393,7 @@ func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, // on a receive. It can expect to read any data after the handshake // is complete. RFC793, section 3.9, p58. if e.EndpointState() == StateSynSent { - return buffer.View{}, tcpip.ControlMessages{}, tcpip.ErrWouldBlock + return nil, nil, tcpip.ErrWouldBlock } // The endpoint can be read if it's connected, or if it's already closed @@ -1327,61 +1401,69 @@ func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, // would cause the state to become StateError so we should allow the // reads to proceed before returning a ECONNRESET. e.rcvListMu.Lock() + defer e.rcvListMu.Unlock() + bufUsed := e.rcvBufUsed if s := e.EndpointState(); !s.connected() && s != StateClose && bufUsed == 0 { - e.rcvListMu.Unlock() if s == StateError { if err := e.hardErrorLocked(); err != nil { - return buffer.View{}, tcpip.ControlMessages{}, err + return nil, nil, err } - return buffer.View{}, tcpip.ControlMessages{}, tcpip.ErrClosedForReceive + return nil, nil, tcpip.ErrClosedForReceive } e.stats.ReadErrors.NotConnected.Increment() - return buffer.View{}, tcpip.ControlMessages{}, tcpip.ErrNotConnected + return nil, nil, tcpip.ErrNotConnected } - v, err := e.readLocked() - e.rcvListMu.Unlock() - - if err == tcpip.ErrClosedForReceive { - e.stats.ReadErrors.ReadClosed.Increment() - } - return v, tcpip.ControlMessages{}, err -} - -func (e *endpoint) readLocked() (buffer.View, *tcpip.Error) { if e.rcvBufUsed == 0 { if e.rcvClosed || !e.EndpointState().connected() { - return buffer.View{}, tcpip.ErrClosedForReceive + return nil, nil, tcpip.ErrClosedForReceive } - return buffer.View{}, tcpip.ErrWouldBlock + return nil, nil, tcpip.ErrWouldBlock } - s := e.rcvList.Front() - views := s.data.Views() - v := views[s.viewToDeliver] - s.viewToDeliver++ + return e.rcvList.Front(), e.rcvList.Back(), nil +} + +// commitRead commits a read of done bytes and returns the next non-empty +// segment to read. Data read from the segment must have also been removed from +// the segment in order for this method to work correctly. +// +// It is performance critical to call commitRead frequently when servicing a big +// Read request, so TCP can make progress timely. Right now, it is designed to +// do this per segment read, hence this method conveniently returns the next +// segment to read while holding the lock. +// +// Precondition: e.rcvReadMu must be held. +func (e *endpoint) commitRead(done int) *segment { + e.LockUser() + defer e.UnlockUser() + e.rcvListMu.Lock() + defer e.rcvListMu.Unlock() - var delta int - if s.viewToDeliver >= len(views) { + memDelta := 0 + s := e.rcvList.Front() + for s != nil && s.data.Size() == 0 { e.rcvList.Remove(s) - // We only free up receive buffer space when the segment is released as the - // segment is still holding on to the views even though some views have been - // read out to the user. - delta = s.segMemSize() + // Memory is only considered released when the whole segment has been + // read. + memDelta += s.segMemSize() s.decRef() + s = e.rcvList.Front() } + e.rcvBufUsed -= done - e.rcvBufUsed -= len(v) - // If the window was small before this read and if the read freed up - // enough buffer space, to either fit an aMSS or half a receive buffer - // (whichever smaller), then notify the protocol goroutine to send a - // window update. - if crossed, above := e.windowCrossedACKThresholdLocked(delta); crossed && above { - e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow) + if memDelta > 0 { + // If the window was small before this read and if the read freed up + // enough buffer space, to either fit an aMSS or half a receive buffer + // (whichever smaller), then notify the protocol goroutine to send a + // window update. + if crossed, above := e.windowCrossedACKThresholdLocked(memDelta); crossed && above { + e.notifyProtocolGoroutine(notifyNonZeroReceiveWindow) + } } - return v, nil + return e.rcvList.Front() } // isEndpointWritableLocked checks if a given endpoint is writable @@ -1499,64 +1581,6 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, <-c return queueAndSend() } -// Peek reads data without consuming it from the endpoint. -// -// This method does not block if there is no data pending. -func (e *endpoint) Peek(vec [][]byte) (int64, *tcpip.Error) { - e.LockUser() - defer e.UnlockUser() - - // The endpoint can be read if it's connected, or if it's already closed - // but has some pending unread data. - if s := e.EndpointState(); !s.connected() && s != StateClose { - if s == StateError { - return 0, e.hardErrorLocked() - } - e.stats.ReadErrors.InvalidEndpointState.Increment() - return 0, tcpip.ErrInvalidEndpointState - } - - e.rcvListMu.Lock() - defer e.rcvListMu.Unlock() - - if e.rcvBufUsed == 0 { - if e.rcvClosed || !e.EndpointState().connected() { - e.stats.ReadErrors.ReadClosed.Increment() - return 0, tcpip.ErrClosedForReceive - } - return 0, tcpip.ErrWouldBlock - } - - // Make a copy of vec so we can modify the slide headers. - vec = append([][]byte(nil), vec...) - - var num int64 - for s := e.rcvList.Front(); s != nil; s = s.Next() { - views := s.data.Views() - - for i := s.viewToDeliver; i < len(views); i++ { - v := views[i] - - for len(v) > 0 { - if len(vec) == 0 { - return num, nil - } - if len(vec[0]) == 0 { - vec = vec[1:] - continue - } - - n := copy(vec[0], v) - v = v[n:] - vec[0] = vec[0][n:] - num += int64(n) - } - } - } - - return num, nil -} - // selectWindowLocked returns the new window without checking for shrinking or scaling // applied. // Precondition: e.mu and e.rcvListMu must be held. diff --git a/pkg/tcpip/transport/tcp/segment.go b/pkg/tcpip/transport/tcp/segment.go index 5ef73ec74..c5a6d2fba 100644 --- a/pkg/tcpip/transport/tcp/segment.go +++ b/pkg/tcpip/transport/tcp/segment.go @@ -37,7 +37,7 @@ const ( // segment represents a TCP segment. It holds the payload and parsed TCP segment // information, and can be added to intrusive lists. -// segment is mostly immutable, the only field allowed to change is viewToDeliver. +// segment is mostly immutable, the only field allowed to change is data. // // +stateify savable type segment struct { @@ -60,10 +60,7 @@ type segment struct { hdr header.TCP // views is used as buffer for data when its length is large // enough to store a VectorisedView. - views [8]buffer.View `state:"nosave"` - // viewToDeliver keeps track of the next View that should be - // delivered by the Read endpoint. - viewToDeliver int + views [8]buffer.View `state:"nosave"` sequenceNumber seqnum.Value ackNumber seqnum.Value flags uint8 @@ -84,6 +81,9 @@ type segment struct { // acked indicates if the segment has already been SACKed. acked bool + + // dataMemSize is the memory used by data initially. + dataMemSize int } func newIncomingSegment(id stack.TransportEndpointID, pkt *stack.PacketBuffer) *segment { @@ -100,6 +100,7 @@ func newIncomingSegment(id stack.TransportEndpointID, pkt *stack.PacketBuffer) * s.data = pkt.Data.Clone(s.views[:]) s.hdr = header.TCP(pkt.TransportHeader().View()) s.rcvdTime = time.Now() + s.dataMemSize = s.data.Size() return s } @@ -113,6 +114,7 @@ func newOutgoingSegment(id stack.TransportEndpointID, v buffer.View) *segment { s.views[0] = v s.data = buffer.NewVectorisedView(len(v), s.views[:1]) } + s.dataMemSize = s.data.Size() return s } @@ -127,12 +129,12 @@ func (s *segment) clone() *segment { netProto: s.netProto, nicID: s.nicID, remoteLinkAddr: s.remoteLinkAddr, - viewToDeliver: s.viewToDeliver, rcvdTime: s.rcvdTime, xmitTime: s.xmitTime, xmitCount: s.xmitCount, ep: s.ep, qFlags: s.qFlags, + dataMemSize: s.dataMemSize, } t.data = s.data.Clone(t.views[:]) return t @@ -204,7 +206,7 @@ func (s *segment) payloadSize() int { // segMemSize is the amount of memory used to hold the segment data and // the associated metadata. func (s *segment) segMemSize() int { - return SegSize + s.data.Size() + return SegSize + s.dataMemSize } // parse populates the sequence & ack numbers, flags, and window fields of the diff --git a/pkg/tcpip/transport/tcp/segment_state.go b/pkg/tcpip/transport/tcp/segment_state.go index 7dc2741a6..7422d8c02 100644 --- a/pkg/tcpip/transport/tcp/segment_state.go +++ b/pkg/tcpip/transport/tcp/segment_state.go @@ -24,16 +24,11 @@ import ( func (s *segment) saveData() buffer.VectorisedView { // We cannot save s.data directly as s.data.views may alias to s.views, // which is not allowed by state framework (in-struct pointer). - v := make([]buffer.View, len(s.data.Views())) - // For views already delivered, we cannot save them directly as they may - // have already been sliced and saved elsewhere (e.g., readViews). - for i := 0; i < s.viewToDeliver; i++ { - v[i] = append([]byte(nil), s.data.Views()[i]...) + vs := make([]buffer.View, len(s.data.Views())) + for i, v := range s.data.Views() { + vs[i] = v } - for i := s.viewToDeliver; i < len(v); i++ { - v[i] = s.data.Views()[i] - } - return buffer.NewVectorisedView(s.data.Size(), v) + return buffer.NewVectorisedView(s.data.Size(), vs) } // loadData is invoked by stateify. diff --git a/pkg/tcpip/transport/tcp/tcp_state_autogen.go b/pkg/tcpip/transport/tcp/tcp_state_autogen.go index 5922083a9..aab92b94f 100644 --- a/pkg/tcpip/transport/tcp/tcp_state_autogen.go +++ b/pkg/tcpip/transport/tcp/tcp_state_autogen.go @@ -595,7 +595,6 @@ func (s *segment) StateFields() []string { "remoteLinkAddr", "data", "hdr", - "viewToDeliver", "sequenceNumber", "ackNumber", "flags", @@ -609,6 +608,7 @@ func (s *segment) StateFields() []string { "xmitTime", "xmitCount", "acked", + "dataMemSize", } } @@ -619,11 +619,11 @@ func (s *segment) StateSave(stateSinkObject state.Sink) { var dataValue buffer.VectorisedView = s.saveData() stateSinkObject.SaveValue(9, dataValue) var optionsValue []byte = s.saveOptions() - stateSinkObject.SaveValue(19, optionsValue) + stateSinkObject.SaveValue(18, optionsValue) var rcvdTimeValue unixTime = s.saveRcvdTime() - stateSinkObject.SaveValue(21, rcvdTimeValue) + stateSinkObject.SaveValue(20, rcvdTimeValue) var xmitTimeValue unixTime = s.saveXmitTime() - stateSinkObject.SaveValue(22, xmitTimeValue) + stateSinkObject.SaveValue(21, xmitTimeValue) stateSinkObject.Save(0, &s.segmentEntry) stateSinkObject.Save(1, &s.refCnt) stateSinkObject.Save(2, &s.ep) @@ -634,17 +634,17 @@ func (s *segment) StateSave(stateSinkObject state.Sink) { stateSinkObject.Save(7, &s.nicID) stateSinkObject.Save(8, &s.remoteLinkAddr) stateSinkObject.Save(10, &s.hdr) - stateSinkObject.Save(11, &s.viewToDeliver) - stateSinkObject.Save(12, &s.sequenceNumber) - stateSinkObject.Save(13, &s.ackNumber) - stateSinkObject.Save(14, &s.flags) - stateSinkObject.Save(15, &s.window) - stateSinkObject.Save(16, &s.csum) - stateSinkObject.Save(17, &s.csumValid) - stateSinkObject.Save(18, &s.parsedOptions) - stateSinkObject.Save(20, &s.hasNewSACKInfo) - stateSinkObject.Save(23, &s.xmitCount) - stateSinkObject.Save(24, &s.acked) + stateSinkObject.Save(11, &s.sequenceNumber) + stateSinkObject.Save(12, &s.ackNumber) + stateSinkObject.Save(13, &s.flags) + stateSinkObject.Save(14, &s.window) + stateSinkObject.Save(15, &s.csum) + stateSinkObject.Save(16, &s.csumValid) + stateSinkObject.Save(17, &s.parsedOptions) + stateSinkObject.Save(19, &s.hasNewSACKInfo) + stateSinkObject.Save(22, &s.xmitCount) + stateSinkObject.Save(23, &s.acked) + stateSinkObject.Save(24, &s.dataMemSize) } func (s *segment) afterLoad() {} @@ -660,21 +660,21 @@ func (s *segment) StateLoad(stateSourceObject state.Source) { stateSourceObject.Load(7, &s.nicID) stateSourceObject.Load(8, &s.remoteLinkAddr) stateSourceObject.Load(10, &s.hdr) - stateSourceObject.Load(11, &s.viewToDeliver) - stateSourceObject.Load(12, &s.sequenceNumber) - stateSourceObject.Load(13, &s.ackNumber) - stateSourceObject.Load(14, &s.flags) - stateSourceObject.Load(15, &s.window) - stateSourceObject.Load(16, &s.csum) - stateSourceObject.Load(17, &s.csumValid) - stateSourceObject.Load(18, &s.parsedOptions) - stateSourceObject.Load(20, &s.hasNewSACKInfo) - stateSourceObject.Load(23, &s.xmitCount) - stateSourceObject.Load(24, &s.acked) + stateSourceObject.Load(11, &s.sequenceNumber) + stateSourceObject.Load(12, &s.ackNumber) + stateSourceObject.Load(13, &s.flags) + stateSourceObject.Load(14, &s.window) + stateSourceObject.Load(15, &s.csum) + stateSourceObject.Load(16, &s.csumValid) + stateSourceObject.Load(17, &s.parsedOptions) + stateSourceObject.Load(19, &s.hasNewSACKInfo) + stateSourceObject.Load(22, &s.xmitCount) + stateSourceObject.Load(23, &s.acked) + stateSourceObject.Load(24, &s.dataMemSize) stateSourceObject.LoadValue(9, new(buffer.VectorisedView), func(y interface{}) { s.loadData(y.(buffer.VectorisedView)) }) - stateSourceObject.LoadValue(19, new([]byte), func(y interface{}) { s.loadOptions(y.([]byte)) }) - stateSourceObject.LoadValue(21, new(unixTime), func(y interface{}) { s.loadRcvdTime(y.(unixTime)) }) - stateSourceObject.LoadValue(22, new(unixTime), func(y interface{}) { s.loadXmitTime(y.(unixTime)) }) + stateSourceObject.LoadValue(18, new([]byte), func(y interface{}) { s.loadOptions(y.([]byte)) }) + stateSourceObject.LoadValue(20, new(unixTime), func(y interface{}) { s.loadRcvdTime(y.(unixTime)) }) + stateSourceObject.LoadValue(21, new(unixTime), func(y interface{}) { s.loadXmitTime(y.(unixTime)) }) } func (q *segmentQueue) StateTypeName() string { |