diff options
Diffstat (limited to 'pkg/tcpip/transport/tcp/connect.go')
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 114 |
1 files changed, 62 insertions, 52 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 8f0f0c3e9..7bc6b08f0 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -156,7 +156,7 @@ func (h *handshake) resetState() { h.flags = header.TCPFlagSyn h.ackNum = 0 h.mss = 0 - h.iss = generateSecureISN(h.ep.ID, h.ep.stack.Seed()) + h.iss = generateSecureISN(h.ep.TransportEndpointInfo.ID, h.ep.stack.Seed()) } // generateSecureISN generates a secure Initial Sequence number based on the @@ -302,7 +302,7 @@ func (h *handshake) synSentState(s *segment) tcpip.Error { ttl = h.ep.route.DefaultTTL() } h.ep.sendSynTCP(h.ep.route, tcpFields{ - id: h.ep.ID, + id: h.ep.TransportEndpointInfo.ID, ttl: ttl, tos: h.ep.sendTOS, flags: h.flags, @@ -358,14 +358,14 @@ func (h *handshake) synRcvdState(s *segment) tcpip.Error { h.resetState() synOpts := header.TCPSynOptions{ WS: h.rcvWndScale, - TS: h.ep.sendTSOk, + TS: h.ep.SendTSOk, TSVal: h.ep.timestamp(), TSEcr: h.ep.recentTimestamp(), - SACKPermitted: h.ep.sackPermitted, + SACKPermitted: h.ep.SACKPermitted, MSS: h.ep.amss, } h.ep.sendSynTCP(h.ep.route, tcpFields{ - id: h.ep.ID, + id: h.ep.TransportEndpointInfo.ID, ttl: h.ep.ttl, tos: h.ep.sendTOS, flags: h.flags, @@ -390,7 +390,7 @@ func (h *handshake) synRcvdState(s *segment) tcpip.Error { // If the timestamp option is negotiated and the segment does // not carry a timestamp option then the segment must be dropped // as per https://tools.ietf.org/html/rfc7323#section-3.2. - if h.ep.sendTSOk && !s.parsedOptions.TS { + if h.ep.SendTSOk && !s.parsedOptions.TS { h.ep.stack.Stats().DroppedPackets.Increment() return nil } @@ -405,7 +405,7 @@ func (h *handshake) synRcvdState(s *segment) tcpip.Error { } // Update timestamp if required. See RFC7323, section-4.3. - if h.ep.sendTSOk && s.parsedOptions.TS { + if h.ep.SendTSOk && s.parsedOptions.TS { h.ep.updateRecentTimestamp(s.parsedOptions.TSVal, h.ackNum, s.sequenceNumber) } h.state = handshakeCompleted @@ -495,8 +495,8 @@ func (h *handshake) start() { // start() is also called in a listen context so we want to make sure we only // send the TS/SACK option when we received the TS/SACK in the initial SYN. if h.state == handshakeSynRcvd { - synOpts.TS = h.ep.sendTSOk - synOpts.SACKPermitted = h.ep.sackPermitted && bool(sackEnabled) + synOpts.TS = h.ep.SendTSOk + synOpts.SACKPermitted = h.ep.SACKPermitted && bool(sackEnabled) if h.sndWndScale < 0 { // Disable window scaling if the peer did not send us // the window scaling option. @@ -506,7 +506,7 @@ func (h *handshake) start() { h.sendSYNOpts = synOpts h.ep.sendSynTCP(h.ep.route, tcpFields{ - id: h.ep.ID, + id: h.ep.TransportEndpointInfo.ID, ttl: h.ep.ttl, tos: h.ep.sendTOS, flags: h.flags, @@ -554,7 +554,7 @@ func (h *handshake) complete() tcpip.Error { // retransmitted on their own). if h.active || !h.acked || h.deferAccept != 0 && time.Since(h.startTime) > h.deferAccept { h.ep.sendSynTCP(h.ep.route, tcpFields{ - id: h.ep.ID, + id: h.ep.TransportEndpointInfo.ID, ttl: h.ep.ttl, tos: h.ep.sendTOS, flags: h.flags, @@ -855,7 +855,7 @@ func (e *endpoint) makeOptions(sackBlocks []header.SACKBlock) []byte { // N.B. the ordering here matches the ordering used by Linux internally // and described in the raw makeOptions function. We don't include // unnecessary cases here (post connection.) - if e.sendTSOk { + if e.SendTSOk { // Embed the timestamp if timestamp has been enabled. // // We only use the lower 32 bits of the unix time in @@ -872,7 +872,7 @@ func (e *endpoint) makeOptions(sackBlocks []header.SACKBlock) []byte { offset += header.EncodeNOP(options[offset:]) offset += header.EncodeTSOption(e.timestamp(), e.recentTimestamp(), options[offset:]) } - if e.sackPermitted && len(sackBlocks) > 0 { + if e.SACKPermitted && len(sackBlocks) > 0 { offset += header.EncodeNOP(options[offset:]) offset += header.EncodeNOP(options[offset:]) offset += header.EncodeSACKBlocks(sackBlocks, options[offset:]) @@ -894,7 +894,7 @@ func (e *endpoint) sendRaw(data buffer.VectorisedView, flags header.TCPFlags, se } options := e.makeOptions(sackBlocks) err := e.sendTCP(e.route, tcpFields{ - id: e.ID, + id: e.TransportEndpointInfo.ID, ttl: e.ttl, tos: e.sendTOS, flags: flags, @@ -908,9 +908,9 @@ func (e *endpoint) sendRaw(data buffer.VectorisedView, flags header.TCPFlags, se } func (e *endpoint) handleWrite() { - e.sndBufMu.Lock() + e.sndQueueInfo.sndQueueMu.Lock() next := e.drainSendQueueLocked() - e.sndBufMu.Unlock() + e.sndQueueInfo.sndQueueMu.Unlock() e.sendData(next) } @@ -919,10 +919,10 @@ func (e *endpoint) handleWrite() { // // Precondition: e.sndBufMu must be locked. func (e *endpoint) drainSendQueueLocked() *segment { - first := e.sndQueue.Front() + first := e.sndQueueInfo.sndQueue.Front() if first != nil { - e.snd.writeList.PushBackList(&e.sndQueue) - e.sndBufInQueue = 0 + e.snd.writeList.PushBackList(&e.sndQueueInfo.sndQueue) + e.sndQueueInfo.SndBufInQueue = 0 } return first } @@ -946,7 +946,7 @@ func (e *endpoint) handleClose() { e.handleWrite() // Mark send side as closed. - e.snd.closed = true + e.snd.Closed = true } // resetConnectionLocked puts the endpoint in an error state with the given @@ -968,12 +968,12 @@ func (e *endpoint) resetConnectionLocked(err tcpip.Error) { // // See: https://www.snellman.net/blog/archive/2016-02-01-tcp-rst/ for more // information. - sndWndEnd := e.snd.sndUna.Add(e.snd.sndWnd) + sndWndEnd := e.snd.SndUna.Add(e.snd.SndWnd) resetSeqNum := sndWndEnd - if !sndWndEnd.LessThan(e.snd.sndNxt) || e.snd.sndNxt.Size(sndWndEnd) < (1<<e.snd.sndWndScale) { - resetSeqNum = e.snd.sndNxt + if !sndWndEnd.LessThan(e.snd.SndNxt) || e.snd.SndNxt.Size(sndWndEnd) < (1<<e.snd.SndWndScale) { + resetSeqNum = e.snd.SndNxt } - e.sendRaw(buffer.VectorisedView{}, header.TCPFlagAck|header.TCPFlagRst, resetSeqNum, e.rcv.rcvNxt, 0) + e.sendRaw(buffer.VectorisedView{}, header.TCPFlagAck|header.TCPFlagRst, resetSeqNum, e.rcv.RcvNxt, 0) } } @@ -999,13 +999,13 @@ func (e *endpoint) transitionToStateEstablishedLocked(h *handshake) { // (indicated by a negative send window scale). e.snd = newSender(e, h.iss, h.ackNum-1, h.sndWnd, h.mss, h.sndWndScale) - e.rcvListMu.Lock() + e.rcvQueueInfo.rcvQueueMu.Lock() e.rcv = newReceiver(e, h.ackNum-1, h.rcvWnd, h.effectiveRcvWndScale()) // Bootstrap the auto tuning algorithm. Starting at zero will // result in a really large receive window after the first auto // tuning adjustment. - e.rcvAutoParams.prevCopied = int(h.rcvWnd) - e.rcvListMu.Unlock() + e.rcvQueueInfo.RcvAutoParams.PrevCopiedBytes = int(h.rcvWnd) + e.rcvQueueInfo.rcvQueueMu.Unlock() e.setEndpointState(StateEstablished) } @@ -1036,10 +1036,15 @@ func (e *endpoint) transitionToStateCloseLocked() { // only when the endpoint is in StateClose and we want to deliver the segment // to any other listening endpoint. We reply with RST if we cannot find one. func (e *endpoint) tryDeliverSegmentFromClosedEndpoint(s *segment) { - ep := e.stack.FindTransportEndpoint(e.NetProto, e.TransProto, e.ID, s.nicID) + ep := e.stack.FindTransportEndpoint(e.NetProto, e.TransProto, e.TransportEndpointInfo.ID, s.nicID) if ep == nil && e.NetProto == header.IPv6ProtocolNumber && e.TransportEndpointInfo.ID.LocalAddress.To4() != "" { // Dual-stack socket, try IPv4. - ep = e.stack.FindTransportEndpoint(header.IPv4ProtocolNumber, e.TransProto, e.ID, s.nicID) + ep = e.stack.FindTransportEndpoint( + header.IPv4ProtocolNumber, + e.TransProto, + e.TransportEndpointInfo.ID, + s.nicID, + ) } if ep == nil { replyWithReset(e.stack, s, stack.DefaultTOS, 0 /* ttl */) @@ -1118,7 +1123,9 @@ func (e *endpoint) handleReset(s *segment) (ok bool, err tcpip.Error) { } // handleSegments processes all inbound segments. -func (e *endpoint) handleSegments(fastPath bool) tcpip.Error { +// +// Precondition: e.mu must be held. +func (e *endpoint) handleSegmentsLocked(fastPath bool) tcpip.Error { checkRequeue := true for i := 0; i < maxSegmentsPerWake; i++ { if e.EndpointState().closed() { @@ -1130,7 +1137,7 @@ func (e *endpoint) handleSegments(fastPath bool) tcpip.Error { break } - cont, err := e.handleSegment(s) + cont, err := e.handleSegmentLocked(s) s.decRef() if err != nil { return err @@ -1148,7 +1155,7 @@ func (e *endpoint) handleSegments(fastPath bool) tcpip.Error { } // Send an ACK for all processed packets if needed. - if e.rcv.rcvNxt != e.snd.maxSentAck { + if e.rcv.RcvNxt != e.snd.MaxSentAck { e.snd.sendAck() } @@ -1157,18 +1164,21 @@ func (e *endpoint) handleSegments(fastPath bool) tcpip.Error { return nil } -func (e *endpoint) probeSegment() { - if e.probe != nil { - e.probe(e.completeState()) +// Precondition: e.mu must be held. +func (e *endpoint) probeSegmentLocked() { + if fn := e.probe; fn != nil { + fn(e.completeStateLocked()) } } // handleSegment handles a given segment and notifies the worker goroutine if // if the connection should be terminated. -func (e *endpoint) handleSegment(s *segment) (cont bool, err tcpip.Error) { +// +// Precondition: e.mu must be held. +func (e *endpoint) handleSegmentLocked(s *segment) (cont bool, err tcpip.Error) { // Invoke the tcp probe if installed. The tcp probe function will update // the TCPEndpointState after the segment is processed. - defer e.probeSegment() + defer e.probeSegmentLocked() if s.flagIsSet(header.TCPFlagRst) { if ok, err := e.handleReset(s); !ok { @@ -1201,7 +1211,7 @@ func (e *endpoint) handleSegment(s *segment) (cont bool, err tcpip.Error) { } else if s.flagIsSet(header.TCPFlagAck) { // Patch the window size in the segment according to the // send window scale. - s.window <<= e.snd.sndWndScale + s.window <<= e.snd.SndWndScale // RFC 793, page 41 states that "once in the ESTABLISHED // state all segments must carry current acknowledgment @@ -1265,7 +1275,7 @@ func (e *endpoint) keepaliveTimerExpired() tcpip.Error { // seg.seq = snd.nxt-1. e.keepalive.unacked++ e.keepalive.Unlock() - e.snd.sendSegmentFromView(buffer.VectorisedView{}, header.TCPFlagAck, e.snd.sndNxt-1) + e.snd.sendSegmentFromView(buffer.VectorisedView{}, header.TCPFlagAck, e.snd.SndNxt-1) e.resetKeepaliveTimer(false) return nil } @@ -1279,7 +1289,7 @@ func (e *endpoint) resetKeepaliveTimer(receivedData bool) { } // Start the keepalive timer IFF it's enabled and there is no pending // data to send. - if !e.SocketOptions().GetKeepAlive() || e.snd == nil || e.snd.sndUna != e.snd.sndNxt { + if !e.SocketOptions().GetKeepAlive() || e.snd == nil || e.snd.SndUna != e.snd.SndNxt { e.keepalive.timer.disable() e.keepalive.Unlock() return @@ -1372,14 +1382,14 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ f func() tcpip.Error }{ { - w: &e.sndWaker, + w: &e.sndQueueInfo.sndWaker, f: func() tcpip.Error { e.handleWrite() return nil }, }, { - w: &e.sndCloseWaker, + w: &e.sndQueueInfo.sndCloseWaker, f: func() tcpip.Error { e.handleClose() return nil @@ -1413,7 +1423,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ { w: &e.newSegmentWaker, f: func() tcpip.Error { - return e.handleSegments(false /* fastPath */) + return e.handleSegmentsLocked(false /* fastPath */) }, }, { @@ -1429,11 +1439,11 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ } if n¬ifyMTUChanged != 0 { - e.sndBufMu.Lock() - count := e.packetTooBigCount - e.packetTooBigCount = 0 - mtu := e.sndMTU - e.sndBufMu.Unlock() + e.sndQueueInfo.sndQueueMu.Lock() + count := e.sndQueueInfo.PacketTooBigCount + e.sndQueueInfo.PacketTooBigCount = 0 + mtu := e.sndQueueInfo.SndMTU + e.sndQueueInfo.sndQueueMu.Unlock() e.snd.updateMaxPayloadSize(mtu, count) } @@ -1463,7 +1473,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ if n¬ifyDrain != 0 { for !e.segmentQueue.empty() { - if err := e.handleSegments(false /* fastPath */); err != nil { + if err := e.handleSegmentsLocked(false /* fastPath */); err != nil { return err } } @@ -1514,11 +1524,11 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ e.newSegmentWaker.Assert() } - e.rcvListMu.Lock() - if !e.rcvList.Empty() { + e.rcvQueueInfo.rcvQueueMu.Lock() + if !e.rcvQueueInfo.rcvQueue.Empty() { e.waiterQueue.Notify(waiter.ReadableEvents) } - e.rcvListMu.Unlock() + e.rcvQueueInfo.rcvQueueMu.Unlock() if e.workerCleanup { e.notifyProtocolGoroutine(notifyClose) |