diff options
Diffstat (limited to 'pkg/tcpip/transport/tcp/connect.go')
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 117 |
1 files changed, 93 insertions, 24 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index a114c06c1..4206db8b6 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -299,6 +299,15 @@ func (h *handshake) synRcvdState(s *segment) *tcpip.Error { return nil } + // RFC 793, Section 3.9, page 69, states that in the SYN-RCVD state, a + // sequence number outside of the window causes an ACK with the proper seq + // number and "After sending the acknowledgment, drop the unacceptable + // segment and return." + if !s.sequenceNumber.InWindow(h.ackNum, h.rcvWnd) { + h.ep.sendRaw(buffer.VectorisedView{}, header.TCPFlagAck, h.iss+1, h.ackNum, h.rcvWnd) + return nil + } + if s.flagIsSet(header.TCPFlagSyn) && s.sequenceNumber != h.ackNum-1 { // We received two SYN segments with different sequence // numbers, so we reset this and restart the whole @@ -631,13 +640,14 @@ func (e *endpoint) sendTCP(r *stack.Route, id stack.TransportEndpointID, data bu return nil } -func buildTCPHdr(r *stack.Route, id stack.TransportEndpointID, d *stack.PacketDescriptor, data buffer.VectorisedView, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size, opts []byte, gso *stack.GSO) { +func buildTCPHdr(r *stack.Route, id stack.TransportEndpointID, pkt *tcpip.PacketBuffer, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size, opts []byte, gso *stack.GSO) { optLen := len(opts) - hdr := &d.Hdr - packetSize := d.Size - off := d.Off + hdr := &pkt.Header + packetSize := pkt.DataSize + off := pkt.DataOffset // Initialize the header. tcp := header.TCP(hdr.Prepend(header.TCPMinimumSize + optLen)) + pkt.TransportHeader = buffer.View(tcp) tcp.Encode(&header.TCPFields{ SrcPort: id.LocalPort, DstPort: id.RemotePort, @@ -659,7 +669,7 @@ func buildTCPHdr(r *stack.Route, id stack.TransportEndpointID, d *stack.PacketDe // header and data and get the right sum of the TCP packet. tcp.SetChecksum(xsum) } else if r.Capabilities()&stack.CapabilityTXChecksumOffload == 0 { - xsum = header.ChecksumVVWithOffset(data, xsum, off, packetSize) + xsum = header.ChecksumVVWithOffset(pkt.Data, xsum, off, packetSize) tcp.SetChecksum(^tcp.CalculateChecksum(xsum)) } @@ -674,7 +684,13 @@ func sendTCPBatch(r *stack.Route, id stack.TransportEndpointID, data buffer.Vect mss := int(gso.MSS) n := (data.Size() + mss - 1) / mss - hdrs := stack.NewPacketDescriptors(n, header.TCPMinimumSize+int(r.MaxHeaderLength())+optLen) + // Allocate one big slice for all the headers. + hdrSize := header.TCPMinimumSize + int(r.MaxHeaderLength()) + optLen + buf := make([]byte, n*hdrSize) + pkts := make([]tcpip.PacketBuffer, n) + for i := range pkts { + pkts[i].Header = buffer.NewEmptyPrependableFromView(buf[i*hdrSize:][:hdrSize]) + } size := data.Size() off := 0 @@ -684,16 +700,17 @@ func sendTCPBatch(r *stack.Route, id stack.TransportEndpointID, data buffer.Vect packetSize = size } size -= packetSize - hdrs[i].Off = off - hdrs[i].Size = packetSize - buildTCPHdr(r, id, &hdrs[i], data, flags, seq, ack, rcvWnd, opts, gso) + pkts[i].DataOffset = off + pkts[i].DataSize = packetSize + pkts[i].Data = data + buildTCPHdr(r, id, &pkts[i], flags, seq, ack, rcvWnd, opts, gso) off += packetSize seq = seq.Add(seqnum.Size(packetSize)) } if ttl == 0 { ttl = r.DefaultTTL() } - sent, err := r.WritePackets(gso, hdrs, data, stack.NetworkHeaderParams{Protocol: ProtocolNumber, TTL: ttl, TOS: tos}) + sent, err := r.WritePackets(gso, pkts, stack.NetworkHeaderParams{Protocol: ProtocolNumber, TTL: ttl, TOS: tos}) if err != nil { r.Stats().TCP.SegmentSendErrors.IncrementBy(uint64(n - sent)) } @@ -713,17 +730,18 @@ func sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.Vectorise return sendTCPBatch(r, id, data, ttl, tos, flags, seq, ack, rcvWnd, opts, gso) } - d := &stack.PacketDescriptor{ - Hdr: buffer.NewPrependable(header.TCPMinimumSize + int(r.MaxHeaderLength()) + optLen), - Off: 0, - Size: data.Size(), + pkt := tcpip.PacketBuffer{ + Header: buffer.NewPrependable(header.TCPMinimumSize + int(r.MaxHeaderLength()) + optLen), + DataOffset: 0, + DataSize: data.Size(), + Data: data, } - buildTCPHdr(r, id, d, data, flags, seq, ack, rcvWnd, opts, gso) + buildTCPHdr(r, id, &pkt, flags, seq, ack, rcvWnd, opts, gso) if ttl == 0 { ttl = r.DefaultTTL() } - if err := r.WritePacket(gso, d.Hdr, data, stack.NetworkHeaderParams{Protocol: ProtocolNumber, TTL: ttl, TOS: tos}); err != nil { + if err := r.WritePacket(gso, stack.NetworkHeaderParams{Protocol: ProtocolNumber, TTL: ttl, TOS: tos}, pkt); err != nil { r.Stats().TCP.SegmentSendErrors.Increment() return err } @@ -862,6 +880,33 @@ func (e *endpoint) completeWorkerLocked() { } } +// transitionToStateCloseLocked ensures that the endpoint is +// cleaned up from the transport demuxer, "before" moving to +// StateClose. This will ensure that no packet will be +// delivered to this endpoint from the demuxer when the endpoint +// is transitioned to StateClose. +func (e *endpoint) transitionToStateCloseLocked() { + if e.state == StateClose { + return + } + e.cleanupLocked() + e.state = StateClose +} + +// tryDeliverSegmentFromClosedEndpoint attempts to deliver the parsed +// segment to any other endpoint other than the current one. This is called +// only when the endpoint is in StateClose and we want to deliver the segment +// to any other listening endpoint. We reply with RST if we cannot find one. +func (e *endpoint) tryDeliverSegmentFromClosedEndpoint(s *segment) { + ep := e.stack.FindTransportEndpoint(e.NetProto, e.TransProto, e.ID, &s.route) + if ep == nil { + replyWithReset(s) + s.decRef() + return + } + ep.(*endpoint).enqueueSegment(s) +} + func (e *endpoint) handleReset(s *segment) (ok bool, err *tcpip.Error) { if e.rcv.acceptable(s.sequenceNumber, 0) { // RFC 793, page 37 states that "in all states @@ -891,12 +936,8 @@ func (e *endpoint) handleReset(s *segment) (ok bool, err *tcpip.Error) { // general "connection reset" signal. Enter the CLOSED state, // delete the TCB, and return. case StateCloseWait: - e.state = StateClose + e.transitionToStateCloseLocked() e.HardError = tcpip.ErrAborted - // We need to set this explicitly here because otherwise - // the port registrations will not be released till the - // endpoint is actively closed by the application. - e.workerCleanup = true e.mu.Unlock() return false, nil default: @@ -912,6 +953,20 @@ func (e *endpoint) handleReset(s *segment) (ok bool, err *tcpip.Error) { func (e *endpoint) handleSegments() *tcpip.Error { checkRequeue := true for i := 0; i < maxSegmentsPerWake; i++ { + e.mu.RLock() + state := e.state + e.mu.RUnlock() + if state == StateClose { + // When we get into StateClose while processing from the queue, + // return immediately and let the protocolMainloop handle it. + // + // We can reach StateClose only while processing a previous segment + // or a notification from the protocolMainLoop (caller goroutine). + // This means that with this return, the segment dequeue below can + // never occur on a closed endpoint. + return nil + } + s := e.segmentQueue.dequeue() if s == nil { checkRequeue = false @@ -1157,7 +1212,7 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { // to the TCP_FIN_WAIT2 timeout was hit. Just // mark the socket as closed. e.mu.Lock() - e.state = StateClose + e.transitionToStateCloseLocked() e.mu.Unlock() return nil }, @@ -1229,7 +1284,9 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { return err } } - if e.state != StateError { + if e.state != StateClose && e.state != StateError { + // Only block the worker if the endpoint + // is not in closed state or error state. close(e.drainDone) <-e.undrain } @@ -1316,12 +1373,24 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { if e.state != StateError { e.stack.Stats().TCP.EstablishedResets.Increment() e.stack.Stats().TCP.CurrentEstablished.Decrement() - e.state = StateClose + e.transitionToStateCloseLocked() } // Lock released below. epilogue() + // epilogue removes the endpoint from the transport-demuxer and + // unlocks e.mu. Now that no new segments can get enqueued to this + // endpoint, try to re-match the segment to a different endpoint + // as the current endpoint is closed. + for { + s := e.segmentQueue.dequeue() + if s == nil { + break + } + e.tryDeliverSegmentFromClosedEndpoint(s) + } + // A new SYN was received during TIME_WAIT and we need to abort // the timewait and redirect the segment to the listener queue if reuseTW != nil { |