Diffstat (limited to 'pkg/tcpip/transport/tcp/connect.go')
-rw-r--r--  pkg/tcpip/transport/tcp/connect.go  78
1 file changed, 24 insertions(+), 54 deletions(-)
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index be86af502..edb37a549 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -61,6 +61,9 @@ const (
)
// handshake holds the state used during a TCP 3-way handshake.
+//
+// NOTE: handshake.ep.mu is held during handshake processing. It is released if
+// we are going to block and reacquired when we start processing an event.
type handshake struct {
ep *endpoint
state handshakeState
@@ -209,9 +212,7 @@ func (h *handshake) resetToSynRcvd(iss seqnum.Value, irs seqnum.Value, opts *hea
h.mss = opts.MSS
h.sndWndScale = opts.WS
h.deferAccept = deferAccept
- h.ep.mu.Lock()
h.ep.setEndpointState(StateSynRecv)
- h.ep.mu.Unlock()
}
// checkAck checks if the ACK number, if present, of a segment received during
@@ -241,9 +242,7 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error {
// RFC 793, page 67, states that "If the RST bit is set [and] If the ACK
// was acceptable then signal the user "error: connection reset", drop
// the segment, enter CLOSED state, delete TCB, and return."
- h.ep.mu.Lock()
h.ep.workerCleanup = true
- h.ep.mu.Unlock()
// Although the RFC above calls out ECONNRESET, Linux actually returns
// ECONNREFUSED here so we do as well.
return tcpip.ErrConnectionRefused
@@ -281,9 +280,7 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error {
if s.flagIsSet(header.TCPFlagAck) {
h.state = handshakeCompleted
- h.ep.mu.Lock()
h.ep.transitionToStateEstablishedLocked(h)
- h.ep.mu.Unlock()
h.ep.sendRaw(buffer.VectorisedView{}, header.TCPFlagAck, h.iss+1, h.ackNum, h.rcvWnd>>h.effectiveRcvWndScale())
return nil
@@ -293,11 +290,9 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error {
// but resend our own SYN and wait for it to be acknowledged in the
// SYN-RCVD state.
h.state = handshakeSynRcvd
- h.ep.mu.Lock()
ttl := h.ep.ttl
amss := h.ep.amss
h.ep.setEndpointState(StateSynRecv)
- h.ep.mu.Unlock()
synOpts := header.TCPSynOptions{
WS: int(h.effectiveRcvWndScale()),
TS: rcvSynOpts.TS,
@@ -357,10 +352,6 @@ func (h *handshake) synRcvdState(s *segment) *tcpip.Error {
return tcpip.ErrInvalidEndpointState
}
- h.ep.mu.RLock()
- amss := h.ep.amss
- h.ep.mu.RUnlock()
-
h.resetState()
synOpts := header.TCPSynOptions{
WS: h.rcvWndScale,
@@ -368,7 +359,7 @@ func (h *handshake) synRcvdState(s *segment) *tcpip.Error {
TSVal: h.ep.timestamp(),
TSEcr: h.ep.recentTimestamp(),
SACKPermitted: h.ep.sackPermitted,
- MSS: amss,
+ MSS: h.ep.amss,
}
h.ep.sendSynTCP(&s.route, h.ep.ID, h.ep.ttl, h.ep.sendTOS, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts)
return nil
@@ -399,15 +390,14 @@ func (h *handshake) synRcvdState(s *segment) *tcpip.Error {
}
h.state = handshakeCompleted
- h.ep.mu.Lock()
h.ep.transitionToStateEstablishedLocked(h)
+
// If the segment has data then requeue it for the receiver
// to process it again once main loop is started.
if s.data.Size() > 0 {
s.incRef()
h.ep.enqueueSegment(s)
}
- h.ep.mu.Unlock()
return nil
}
@@ -493,7 +483,9 @@ func (h *handshake) resolveRoute() *tcpip.Error {
}
if n&notifyDrain != 0 {
close(h.ep.drainDone)
+ h.ep.mu.Unlock()
<-h.ep.undrain
+ h.ep.mu.Lock()
}
}
@@ -535,7 +527,6 @@ func (h *handshake) execute() *tcpip.Error {
// Send the initial SYN segment and loop until the handshake is
// completed.
- h.ep.mu.Lock()
h.ep.amss = calculateAdvertisedMSS(h.ep.userMSS, h.ep.route)
synOpts := header.TCPSynOptions{
@@ -546,7 +537,6 @@ func (h *handshake) execute() *tcpip.Error {
SACKPermitted: bool(sackEnabled),
MSS: h.ep.amss,
}
- h.ep.mu.Unlock()
// Execute is also called in a listen context so we want to make sure we
// only send the TS/SACK option when we received the TS/SACK in the
@@ -563,7 +553,11 @@ func (h *handshake) execute() *tcpip.Error {
h.ep.sendSynTCP(&h.ep.route, h.ep.ID, h.ep.ttl, h.ep.sendTOS, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts)
for h.state != handshakeCompleted {
- switch index, _ := s.Fetch(true); index {
+ h.ep.mu.Unlock()
+ index, _ := s.Fetch(true)
+ h.ep.mu.Lock()
+ switch index {
+
case wakerForResend:
timeOut *= 2
if timeOut > MaxRTO {
@@ -600,7 +594,9 @@ func (h *handshake) execute() *tcpip.Error {
}
}
close(h.ep.drainDone)
+ h.ep.mu.Unlock()
<-h.ep.undrain
+ h.ep.mu.Lock()
}
case wakerForNewSegment:
@@ -1016,7 +1012,6 @@ func (e *endpoint) handleReset(s *segment) (ok bool, err *tcpip.Error) {
// except SYN-SENT, all reset (RST) segments are
// validated by checking their SEQ-fields." So
// we only process it if it's acceptable.
- e.mu.Lock()
switch e.EndpointState() {
// In case of a RST in CLOSE-WAIT linux moves
// the socket to closed state with an error set
@@ -1040,11 +1035,9 @@ func (e *endpoint) handleReset(s *segment) (ok bool, err *tcpip.Error) {
case StateCloseWait:
e.transitionToStateCloseLocked()
e.HardError = tcpip.ErrAborted
- e.mu.Unlock()
e.notifyProtocolGoroutine(notifyTickleWorker)
return false, nil
default:
- e.mu.Unlock()
// RFC 793, page 37 states that "in all states
// except SYN-SENT, all reset (RST) segments are
// validated by checking their SEQ-fields." So
@@ -1157,9 +1150,7 @@ func (e *endpoint) handleSegment(s *segment) (cont bool, err *tcpip.Error) {
// Now check if the received segment has caused us to transition
// to a CLOSED state, if yes then terminate processing and do
// not invoke the sender.
- e.mu.RLock()
state := e.state
- e.mu.RUnlock()
if state == StateClose {
// When we get into StateClose while processing from the queue,
// return immediately and let the protocolMainloop handle it.
@@ -1182,9 +1173,7 @@ func (e *endpoint) handleSegment(s *segment) (cont bool, err *tcpip.Error) {
// keepalive packets periodically when the connection is idle. If we don't hear
// from the other side after a number of tries, we terminate the connection.
func (e *endpoint) keepaliveTimerExpired() *tcpip.Error {
- e.mu.RLock()
userTimeout := e.userTimeout
- e.mu.RUnlock()
e.keepalive.Lock()
if !e.keepalive.enabled || !e.keepalive.timer.checkExpiration() {
@@ -1248,6 +1237,7 @@ func (e *endpoint) disableKeepaliveTimer() {
// goroutine and is responsible for sending segments and handling received
// segments.
func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{}) *tcpip.Error {
+ e.mu.Lock()
var closeTimer *time.Timer
var closeWaker sleep.Waker
@@ -1269,7 +1259,6 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
}
e.mu.Unlock()
- e.workMu.Unlock()
// When the protocol loop exits we should wake up our waiters.
e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
}
@@ -1280,16 +1269,13 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
// completion.
initialRcvWnd := e.initialReceiveWindow()
h := newHandshake(e, seqnum.Size(initialRcvWnd))
- e.mu.Lock()
h.ep.setEndpointState(StateSynSent)
- e.mu.Unlock()
if err := h.execute(); err != nil {
e.lastErrorMu.Lock()
e.lastError = err
e.lastErrorMu.Unlock()
- e.mu.Lock()
e.setEndpointState(StateError)
e.HardError = err
@@ -1302,9 +1288,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
e.keepalive.timer.init(&e.keepalive.waker)
defer e.keepalive.timer.cleanup()
- e.mu.Lock()
drained := e.drainDone != nil
- e.mu.Unlock()
if drained {
close(e.drainDone)
<-e.undrain
@@ -1330,10 +1314,8 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
// This means the socket is being closed due
// to the TCP-FIN-WAIT2 timeout was hit. Just
// mark the socket as closed.
- e.mu.Lock()
e.transitionToStateCloseLocked()
e.workerCleanup = true
- e.mu.Unlock()
return nil
},
},
@@ -1388,7 +1370,6 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
}
if n&notifyClose != 0 && closeTimer == nil {
- e.mu.Lock()
if e.EndpointState() == StateFinWait2 && e.closed {
// The socket has been closed and we are in FIN_WAIT2
// so start the FIN_WAIT2 timer.
@@ -1397,7 +1378,6 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
})
e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
}
- e.mu.Unlock()
}
if n&notifyKeepaliveChanged != 0 {
@@ -1417,7 +1397,9 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
// Only block the worker if the endpoint
// is not in closed state or error state.
close(e.drainDone)
+ e.mu.Unlock()
<-e.undrain
+ e.mu.Lock()
}
}
@@ -1460,7 +1442,6 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
}
e.rcvListMu.Unlock()
- e.mu.Lock()
if e.workerCleanup {
e.notifyProtocolGoroutine(notifyClose)
}
@@ -1468,7 +1449,6 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
// Main loop. Handle segments until both send and receive ends of the
// connection have completed.
cleanupOnError := func(err *tcpip.Error) {
- e.mu.Lock()
e.workerCleanup = true
if err != nil {
e.resetConnectionLocked(err)
@@ -1480,16 +1460,11 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
loop:
for e.EndpointState() != StateTimeWait && e.EndpointState() != StateClose && e.EndpointState() != StateError {
e.mu.Unlock()
- e.workMu.Unlock()
v, _ := s.Fetch(true)
- e.workMu.Lock()
+ e.mu.Lock()
- // We need to double check here because the notification maybe
+ // We need to double check here because the notification may be
// stale by the time we got around to processing it.
- //
- // NOTE: since we now hold the workMu the processors cannot
- // change the state of the endpoint so it's safe to proceed
- // after this check.
switch e.EndpointState() {
case StateError:
// If the endpoint has already transitioned to an ERROR
@@ -1502,21 +1477,17 @@ loop:
case StateTimeWait:
fallthrough
case StateClose:
- e.mu.Lock()
break loop
default:
if err := funcs[v].f(); err != nil {
cleanupOnError(err)
return nil
}
- e.mu.Lock()
}
}
- state := e.EndpointState()
- e.mu.Unlock()
var reuseTW func()
- if state == StateTimeWait {
+ if e.EndpointState() == StateTimeWait {
// Disable close timer as we now entering real TIME_WAIT.
if closeTimer != nil {
closeTimer.Stop()
@@ -1526,14 +1497,11 @@ loop:
s.Done()
// Wake up any waiters before we enter TIME_WAIT.
e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
- e.mu.Lock()
e.workerCleanup = true
- e.mu.Unlock()
reuseTW = e.doTimeWait()
}
// Mark endpoint as closed.
- e.mu.Lock()
if e.EndpointState() != StateError {
e.transitionToStateCloseLocked()
}
@@ -1649,9 +1617,9 @@ func (e *endpoint) doTimeWait() (twReuse func()) {
defer timeWaitTimer.Stop()
for {
- e.workMu.Unlock()
+ e.mu.Unlock()
v, _ := s.Fetch(true)
- e.workMu.Lock()
+ e.mu.Lock()
switch v {
case newSegment:
extendTimeWait, reuseTW := e.handleTimeWaitSegments()
@@ -1674,7 +1642,9 @@ func (e *endpoint) doTimeWait() (twReuse func()) {
e.handleTimeWaitSegments()
}
close(e.drainDone)
+ e.mu.Unlock()
<-e.undrain
+ e.mu.Lock()
return nil
}
case timeWaitDone:
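
The change above drops the separate workMu and instead keeps e.mu held for the duration of handshake and protocol-loop processing, explicitly releasing it around blocking waits (s.Fetch(true), <-e.undrain) and re-checking endpoint state after reacquiring it, since a notification may be stale by then. Below is a minimal, self-contained Go sketch of that locking pattern; the worker type, its fields, and the event strings are hypothetical stand-ins for illustration, not gvisor's actual API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in for the endpoint protocol goroutine: it
// holds mu while mutating state, but must not hold it across blocking waits.
type worker struct {
	mu     sync.Mutex
	state  string
	events chan string
}

// run mirrors the pattern in protocolMainLoop: release mu before the blocking
// Fetch-style wait, reacquire it afterwards, then re-check state because the
// notification may be stale by the time we process it.
func (w *worker) run() {
	w.mu.Lock()
	defer w.mu.Unlock()

	for w.state != "closed" {
		// Release the lock before blocking so other goroutines that need mu
		// (e.g. user calls into the endpoint) are not starved while we wait.
		w.mu.Unlock()
		ev := <-w.events // blocking wait, analogous to s.Fetch(true)
		w.mu.Lock()

		// Double check: state may have changed while mu was not held.
		if w.state == "closed" {
			break
		}
		fmt.Println("processing event:", ev, "in state", w.state)
		if ev == "close" {
			w.state = "closed"
		}
	}
}

func main() {
	w := &worker{state: "established", events: make(chan string, 1)}
	done := make(chan struct{})
	go func() { w.run(); close(done) }()

	w.events <- "segment"
	time.Sleep(10 * time.Millisecond)
	w.events <- "close"
	<-done
}

Releasing the mutex only around the blocking wait keeps callers that need the lock from stalling behind the protocol loop, while the re-check after reacquisition (and the *Locked naming convention on helpers such as transitionToStateEstablishedLocked) preserves the invariant that state transitions happen with e.mu held.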