summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport/tcp/connect.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/transport/tcp/connect.go')
-rw-r--r--pkg/tcpip/transport/tcp/connect.go114
1 files changed, 62 insertions, 52 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index 8f0f0c3e9..7bc6b08f0 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -156,7 +156,7 @@ func (h *handshake) resetState() {
h.flags = header.TCPFlagSyn
h.ackNum = 0
h.mss = 0
- h.iss = generateSecureISN(h.ep.ID, h.ep.stack.Seed())
+ h.iss = generateSecureISN(h.ep.TransportEndpointInfo.ID, h.ep.stack.Seed())
}
// generateSecureISN generates a secure Initial Sequence number based on the
@@ -302,7 +302,7 @@ func (h *handshake) synSentState(s *segment) tcpip.Error {
ttl = h.ep.route.DefaultTTL()
}
h.ep.sendSynTCP(h.ep.route, tcpFields{
- id: h.ep.ID,
+ id: h.ep.TransportEndpointInfo.ID,
ttl: ttl,
tos: h.ep.sendTOS,
flags: h.flags,
@@ -358,14 +358,14 @@ func (h *handshake) synRcvdState(s *segment) tcpip.Error {
h.resetState()
synOpts := header.TCPSynOptions{
WS: h.rcvWndScale,
- TS: h.ep.sendTSOk,
+ TS: h.ep.SendTSOk,
TSVal: h.ep.timestamp(),
TSEcr: h.ep.recentTimestamp(),
- SACKPermitted: h.ep.sackPermitted,
+ SACKPermitted: h.ep.SACKPermitted,
MSS: h.ep.amss,
}
h.ep.sendSynTCP(h.ep.route, tcpFields{
- id: h.ep.ID,
+ id: h.ep.TransportEndpointInfo.ID,
ttl: h.ep.ttl,
tos: h.ep.sendTOS,
flags: h.flags,
@@ -390,7 +390,7 @@ func (h *handshake) synRcvdState(s *segment) tcpip.Error {
// If the timestamp option is negotiated and the segment does
// not carry a timestamp option then the segment must be dropped
// as per https://tools.ietf.org/html/rfc7323#section-3.2.
- if h.ep.sendTSOk && !s.parsedOptions.TS {
+ if h.ep.SendTSOk && !s.parsedOptions.TS {
h.ep.stack.Stats().DroppedPackets.Increment()
return nil
}
@@ -405,7 +405,7 @@ func (h *handshake) synRcvdState(s *segment) tcpip.Error {
}
// Update timestamp if required. See RFC7323, section-4.3.
- if h.ep.sendTSOk && s.parsedOptions.TS {
+ if h.ep.SendTSOk && s.parsedOptions.TS {
h.ep.updateRecentTimestamp(s.parsedOptions.TSVal, h.ackNum, s.sequenceNumber)
}
h.state = handshakeCompleted
@@ -495,8 +495,8 @@ func (h *handshake) start() {
// start() is also called in a listen context so we want to make sure we only
// send the TS/SACK option when we received the TS/SACK in the initial SYN.
if h.state == handshakeSynRcvd {
- synOpts.TS = h.ep.sendTSOk
- synOpts.SACKPermitted = h.ep.sackPermitted && bool(sackEnabled)
+ synOpts.TS = h.ep.SendTSOk
+ synOpts.SACKPermitted = h.ep.SACKPermitted && bool(sackEnabled)
if h.sndWndScale < 0 {
// Disable window scaling if the peer did not send us
// the window scaling option.
@@ -506,7 +506,7 @@ func (h *handshake) start() {
h.sendSYNOpts = synOpts
h.ep.sendSynTCP(h.ep.route, tcpFields{
- id: h.ep.ID,
+ id: h.ep.TransportEndpointInfo.ID,
ttl: h.ep.ttl,
tos: h.ep.sendTOS,
flags: h.flags,
@@ -554,7 +554,7 @@ func (h *handshake) complete() tcpip.Error {
// retransmitted on their own).
if h.active || !h.acked || h.deferAccept != 0 && time.Since(h.startTime) > h.deferAccept {
h.ep.sendSynTCP(h.ep.route, tcpFields{
- id: h.ep.ID,
+ id: h.ep.TransportEndpointInfo.ID,
ttl: h.ep.ttl,
tos: h.ep.sendTOS,
flags: h.flags,
@@ -855,7 +855,7 @@ func (e *endpoint) makeOptions(sackBlocks []header.SACKBlock) []byte {
// N.B. the ordering here matches the ordering used by Linux internally
// and described in the raw makeOptions function. We don't include
// unnecessary cases here (post connection.)
- if e.sendTSOk {
+ if e.SendTSOk {
// Embed the timestamp if timestamp has been enabled.
//
// We only use the lower 32 bits of the unix time in
@@ -872,7 +872,7 @@ func (e *endpoint) makeOptions(sackBlocks []header.SACKBlock) []byte {
offset += header.EncodeNOP(options[offset:])
offset += header.EncodeTSOption(e.timestamp(), e.recentTimestamp(), options[offset:])
}
- if e.sackPermitted && len(sackBlocks) > 0 {
+ if e.SACKPermitted && len(sackBlocks) > 0 {
offset += header.EncodeNOP(options[offset:])
offset += header.EncodeNOP(options[offset:])
offset += header.EncodeSACKBlocks(sackBlocks, options[offset:])
@@ -894,7 +894,7 @@ func (e *endpoint) sendRaw(data buffer.VectorisedView, flags header.TCPFlags, se
}
options := e.makeOptions(sackBlocks)
err := e.sendTCP(e.route, tcpFields{
- id: e.ID,
+ id: e.TransportEndpointInfo.ID,
ttl: e.ttl,
tos: e.sendTOS,
flags: flags,
@@ -908,9 +908,9 @@ func (e *endpoint) sendRaw(data buffer.VectorisedView, flags header.TCPFlags, se
}
func (e *endpoint) handleWrite() {
- e.sndBufMu.Lock()
+ e.sndQueueInfo.sndQueueMu.Lock()
next := e.drainSendQueueLocked()
- e.sndBufMu.Unlock()
+ e.sndQueueInfo.sndQueueMu.Unlock()
e.sendData(next)
}
@@ -919,10 +919,10 @@ func (e *endpoint) handleWrite() {
//
// Precondition: e.sndBufMu must be locked.
func (e *endpoint) drainSendQueueLocked() *segment {
- first := e.sndQueue.Front()
+ first := e.sndQueueInfo.sndQueue.Front()
if first != nil {
- e.snd.writeList.PushBackList(&e.sndQueue)
- e.sndBufInQueue = 0
+ e.snd.writeList.PushBackList(&e.sndQueueInfo.sndQueue)
+ e.sndQueueInfo.SndBufInQueue = 0
}
return first
}
@@ -946,7 +946,7 @@ func (e *endpoint) handleClose() {
e.handleWrite()
// Mark send side as closed.
- e.snd.closed = true
+ e.snd.Closed = true
}
// resetConnectionLocked puts the endpoint in an error state with the given
@@ -968,12 +968,12 @@ func (e *endpoint) resetConnectionLocked(err tcpip.Error) {
//
// See: https://www.snellman.net/blog/archive/2016-02-01-tcp-rst/ for more
// information.
- sndWndEnd := e.snd.sndUna.Add(e.snd.sndWnd)
+ sndWndEnd := e.snd.SndUna.Add(e.snd.SndWnd)
resetSeqNum := sndWndEnd
- if !sndWndEnd.LessThan(e.snd.sndNxt) || e.snd.sndNxt.Size(sndWndEnd) < (1<<e.snd.sndWndScale) {
- resetSeqNum = e.snd.sndNxt
+ if !sndWndEnd.LessThan(e.snd.SndNxt) || e.snd.SndNxt.Size(sndWndEnd) < (1<<e.snd.SndWndScale) {
+ resetSeqNum = e.snd.SndNxt
}
- e.sendRaw(buffer.VectorisedView{}, header.TCPFlagAck|header.TCPFlagRst, resetSeqNum, e.rcv.rcvNxt, 0)
+ e.sendRaw(buffer.VectorisedView{}, header.TCPFlagAck|header.TCPFlagRst, resetSeqNum, e.rcv.RcvNxt, 0)
}
}
@@ -999,13 +999,13 @@ func (e *endpoint) transitionToStateEstablishedLocked(h *handshake) {
// (indicated by a negative send window scale).
e.snd = newSender(e, h.iss, h.ackNum-1, h.sndWnd, h.mss, h.sndWndScale)
- e.rcvListMu.Lock()
+ e.rcvQueueInfo.rcvQueueMu.Lock()
e.rcv = newReceiver(e, h.ackNum-1, h.rcvWnd, h.effectiveRcvWndScale())
// Bootstrap the auto tuning algorithm. Starting at zero will
// result in a really large receive window after the first auto
// tuning adjustment.
- e.rcvAutoParams.prevCopied = int(h.rcvWnd)
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.RcvAutoParams.PrevCopiedBytes = int(h.rcvWnd)
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
e.setEndpointState(StateEstablished)
}
@@ -1036,10 +1036,15 @@ func (e *endpoint) transitionToStateCloseLocked() {
// only when the endpoint is in StateClose and we want to deliver the segment
// to any other listening endpoint. We reply with RST if we cannot find one.
func (e *endpoint) tryDeliverSegmentFromClosedEndpoint(s *segment) {
- ep := e.stack.FindTransportEndpoint(e.NetProto, e.TransProto, e.ID, s.nicID)
+ ep := e.stack.FindTransportEndpoint(e.NetProto, e.TransProto, e.TransportEndpointInfo.ID, s.nicID)
if ep == nil && e.NetProto == header.IPv6ProtocolNumber && e.TransportEndpointInfo.ID.LocalAddress.To4() != "" {
// Dual-stack socket, try IPv4.
- ep = e.stack.FindTransportEndpoint(header.IPv4ProtocolNumber, e.TransProto, e.ID, s.nicID)
+ ep = e.stack.FindTransportEndpoint(
+ header.IPv4ProtocolNumber,
+ e.TransProto,
+ e.TransportEndpointInfo.ID,
+ s.nicID,
+ )
}
if ep == nil {
replyWithReset(e.stack, s, stack.DefaultTOS, 0 /* ttl */)
@@ -1118,7 +1123,9 @@ func (e *endpoint) handleReset(s *segment) (ok bool, err tcpip.Error) {
}
// handleSegments processes all inbound segments.
-func (e *endpoint) handleSegments(fastPath bool) tcpip.Error {
+//
+// Precondition: e.mu must be held.
+func (e *endpoint) handleSegmentsLocked(fastPath bool) tcpip.Error {
checkRequeue := true
for i := 0; i < maxSegmentsPerWake; i++ {
if e.EndpointState().closed() {
@@ -1130,7 +1137,7 @@ func (e *endpoint) handleSegments(fastPath bool) tcpip.Error {
break
}
- cont, err := e.handleSegment(s)
+ cont, err := e.handleSegmentLocked(s)
s.decRef()
if err != nil {
return err
@@ -1148,7 +1155,7 @@ func (e *endpoint) handleSegments(fastPath bool) tcpip.Error {
}
// Send an ACK for all processed packets if needed.
- if e.rcv.rcvNxt != e.snd.maxSentAck {
+ if e.rcv.RcvNxt != e.snd.MaxSentAck {
e.snd.sendAck()
}
@@ -1157,18 +1164,21 @@ func (e *endpoint) handleSegments(fastPath bool) tcpip.Error {
return nil
}
-func (e *endpoint) probeSegment() {
- if e.probe != nil {
- e.probe(e.completeState())
+// Precondition: e.mu must be held.
+func (e *endpoint) probeSegmentLocked() {
+ if fn := e.probe; fn != nil {
+ fn(e.completeStateLocked())
}
}
// handleSegment handles a given segment and notifies the worker goroutine if
// if the connection should be terminated.
-func (e *endpoint) handleSegment(s *segment) (cont bool, err tcpip.Error) {
+//
+// Precondition: e.mu must be held.
+func (e *endpoint) handleSegmentLocked(s *segment) (cont bool, err tcpip.Error) {
// Invoke the tcp probe if installed. The tcp probe function will update
// the TCPEndpointState after the segment is processed.
- defer e.probeSegment()
+ defer e.probeSegmentLocked()
if s.flagIsSet(header.TCPFlagRst) {
if ok, err := e.handleReset(s); !ok {
@@ -1201,7 +1211,7 @@ func (e *endpoint) handleSegment(s *segment) (cont bool, err tcpip.Error) {
} else if s.flagIsSet(header.TCPFlagAck) {
// Patch the window size in the segment according to the
// send window scale.
- s.window <<= e.snd.sndWndScale
+ s.window <<= e.snd.SndWndScale
// RFC 793, page 41 states that "once in the ESTABLISHED
// state all segments must carry current acknowledgment
@@ -1265,7 +1275,7 @@ func (e *endpoint) keepaliveTimerExpired() tcpip.Error {
// seg.seq = snd.nxt-1.
e.keepalive.unacked++
e.keepalive.Unlock()
- e.snd.sendSegmentFromView(buffer.VectorisedView{}, header.TCPFlagAck, e.snd.sndNxt-1)
+ e.snd.sendSegmentFromView(buffer.VectorisedView{}, header.TCPFlagAck, e.snd.SndNxt-1)
e.resetKeepaliveTimer(false)
return nil
}
@@ -1279,7 +1289,7 @@ func (e *endpoint) resetKeepaliveTimer(receivedData bool) {
}
// Start the keepalive timer IFF it's enabled and there is no pending
// data to send.
- if !e.SocketOptions().GetKeepAlive() || e.snd == nil || e.snd.sndUna != e.snd.sndNxt {
+ if !e.SocketOptions().GetKeepAlive() || e.snd == nil || e.snd.SndUna != e.snd.SndNxt {
e.keepalive.timer.disable()
e.keepalive.Unlock()
return
@@ -1372,14 +1382,14 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
f func() tcpip.Error
}{
{
- w: &e.sndWaker,
+ w: &e.sndQueueInfo.sndWaker,
f: func() tcpip.Error {
e.handleWrite()
return nil
},
},
{
- w: &e.sndCloseWaker,
+ w: &e.sndQueueInfo.sndCloseWaker,
f: func() tcpip.Error {
e.handleClose()
return nil
@@ -1413,7 +1423,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
{
w: &e.newSegmentWaker,
f: func() tcpip.Error {
- return e.handleSegments(false /* fastPath */)
+ return e.handleSegmentsLocked(false /* fastPath */)
},
},
{
@@ -1429,11 +1439,11 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
}
if n&notifyMTUChanged != 0 {
- e.sndBufMu.Lock()
- count := e.packetTooBigCount
- e.packetTooBigCount = 0
- mtu := e.sndMTU
- e.sndBufMu.Unlock()
+ e.sndQueueInfo.sndQueueMu.Lock()
+ count := e.sndQueueInfo.PacketTooBigCount
+ e.sndQueueInfo.PacketTooBigCount = 0
+ mtu := e.sndQueueInfo.SndMTU
+ e.sndQueueInfo.sndQueueMu.Unlock()
e.snd.updateMaxPayloadSize(mtu, count)
}
@@ -1463,7 +1473,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
if n&notifyDrain != 0 {
for !e.segmentQueue.empty() {
- if err := e.handleSegments(false /* fastPath */); err != nil {
+ if err := e.handleSegmentsLocked(false /* fastPath */); err != nil {
return err
}
}
@@ -1514,11 +1524,11 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
e.newSegmentWaker.Assert()
}
- e.rcvListMu.Lock()
- if !e.rcvList.Empty() {
+ e.rcvQueueInfo.rcvQueueMu.Lock()
+ if !e.rcvQueueInfo.rcvQueue.Empty() {
e.waiterQueue.Notify(waiter.ReadableEvents)
}
- e.rcvListMu.Unlock()
+ e.rcvQueueInfo.rcvQueueMu.Unlock()
if e.workerCleanup {
e.notifyProtocolGoroutine(notifyClose)