summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport/tcp/accept.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/transport/tcp/accept.go')
-rw-r--r--pkg/tcpip/transport/tcp/accept.go161
1 files changed, 118 insertions, 43 deletions
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go
index e506d7133..d4b860975 100644
--- a/pkg/tcpip/transport/tcp/accept.go
+++ b/pkg/tcpip/transport/tcp/accept.go
@@ -19,6 +19,7 @@ import (
"encoding/binary"
"hash"
"io"
+ "log"
"sync"
"time"
@@ -87,9 +88,10 @@ var synRcvdCount struct {
// and must not be accessed or have its methods called concurrently as they
// may mutate the stored objects.
type listenContext struct {
- stack *stack.Stack
- rcvWnd seqnum.Size
- nonce [2][sha1.BlockSize]byte
+ stack *stack.Stack
+ rcvWnd seqnum.Size
+ nonce [2][sha1.BlockSize]byte
+ listenEP *endpoint
hasherMu sync.Mutex
hasher hash.Hash
@@ -107,15 +109,16 @@ func timeStamp() uint32 {
// threshold, and fails otherwise.
func incSynRcvdCount() bool {
synRcvdCount.Lock()
- defer synRcvdCount.Unlock()
if synRcvdCount.value >= SynRcvdCountThreshold {
+ synRcvdCount.Unlock()
return false
}
synRcvdCount.pending.Add(1)
synRcvdCount.value++
+ synRcvdCount.Unlock()
return true
}
@@ -124,20 +127,21 @@ func incSynRcvdCount() bool {
// succeeded.
func decSynRcvdCount() {
synRcvdCount.Lock()
- defer synRcvdCount.Unlock()
synRcvdCount.value--
synRcvdCount.pending.Done()
+ synRcvdCount.Unlock()
}
// newListenContext creates a new listen context.
-func newListenContext(stack *stack.Stack, rcvWnd seqnum.Size, v6only bool, netProto tcpip.NetworkProtocolNumber) *listenContext {
+func newListenContext(stack *stack.Stack, listenEP *endpoint, rcvWnd seqnum.Size, v6only bool, netProto tcpip.NetworkProtocolNumber) *listenContext {
l := &listenContext{
stack: stack,
rcvWnd: rcvWnd,
hasher: sha1.New(),
v6only: v6only,
netProto: netProto,
+ listenEP: listenEP,
}
rand.Read(l.nonce[0][:])
@@ -195,9 +199,9 @@ func (l *listenContext) isCookieValid(id stack.TransportEndpointID, cookie seqnu
return (v - l.cookieHash(id, cookieTS, 1)) & hashMask, true
}
-// createConnectedEndpoint creates a new connected endpoint, with the connection
-// parameters given by the arguments.
-func (l *listenContext) createConnectedEndpoint(s *segment, iss seqnum.Value, irs seqnum.Value, rcvdSynOpts *header.TCPSynOptions) (*endpoint, *tcpip.Error) {
+// createConnectingEndpoint creates a new endpoint in a connecting state, with
+// the connection parameters given by the arguments.
+func (l *listenContext) createConnectingEndpoint(s *segment, iss seqnum.Value, irs seqnum.Value, rcvdSynOpts *header.TCPSynOptions) (*endpoint, *tcpip.Error) {
// Create a new endpoint.
netProto := l.netProto
if netProto == 0 {
@@ -223,7 +227,7 @@ func (l *listenContext) createConnectedEndpoint(s *segment, iss seqnum.Value, ir
}
n.isRegistered = true
- n.state = stateConnected
+ n.state = stateConnecting
// Create sender and receiver.
//
@@ -241,7 +245,7 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head
// Create new endpoint.
irs := s.sequenceNumber
cookie := l.createCookie(s.id, irs, encodeMSS(opts.MSS))
- ep, err := l.createConnectedEndpoint(s, cookie, irs, opts)
+ ep, err := l.createConnectingEndpoint(s, cookie, irs, opts)
if err != nil {
return nil, err
}
@@ -249,12 +253,15 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head
// Perform the 3-way handshake.
h := newHandshake(ep, l.rcvWnd)
- h.resetToSynRcvd(cookie, irs, opts)
+ h.resetToSynRcvd(cookie, irs, opts, l.listenEP)
if err := h.execute(); err != nil {
+ ep.stack.Stats().TCP.FailedConnectionAttempts.Increment()
ep.Close()
return nil, err
}
+ ep.state = stateConnected
+
// Update the receive window scaling. We can't do it before the
// handshake because it's possible that the peer doesn't support window
// scaling.
@@ -268,13 +275,14 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head
// instead.
func (e *endpoint) deliverAccepted(n *endpoint) {
e.mu.RLock()
- if e.state == stateListen {
+ state := e.state
+ e.mu.RUnlock()
+ if state == stateListen {
e.acceptedChan <- n
e.waiterQueue.Notify(waiter.EventIn)
} else {
n.Close()
}
- e.mu.RUnlock()
}
// handleSynSegment is called in its own goroutine once the listening endpoint
@@ -285,16 +293,36 @@ func (e *endpoint) deliverAccepted(n *endpoint) {
// cookies to accept connections.
func (e *endpoint) handleSynSegment(ctx *listenContext, s *segment, opts *header.TCPSynOptions) {
defer decSynRcvdCount()
+ defer e.decSynRcvdCount()
defer s.decRef()
n, err := ctx.createEndpointAndPerformHandshake(s, opts)
if err != nil {
+ e.stack.Stats().TCP.FailedConnectionAttempts.Increment()
return
}
e.deliverAccepted(n)
}
+func (e *endpoint) incSynRcvdCount() bool {
+ e.mu.Lock()
+ log.Printf("l: %d, c: %d, e.synRcvdCount: %d", len(e.acceptedChan), cap(e.acceptedChan), e.synRcvdCount)
+ if l, c := len(e.acceptedChan), cap(e.acceptedChan); l == c && e.synRcvdCount >= c {
+ e.mu.Unlock()
+ return false
+ }
+ e.synRcvdCount++
+ e.mu.Unlock()
+ return true
+}
+
+func (e *endpoint) decSynRcvdCount() {
+ e.mu.Lock()
+ e.synRcvdCount--
+ e.mu.Unlock()
+}
+
// handleListenSegment is called when a listening endpoint receives a segment
// and needs to handle it.
func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) {
@@ -302,9 +330,20 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) {
case header.TCPFlagSyn:
opts := parseSynSegmentOptions(s)
if incSynRcvdCount() {
- s.incRef()
- go e.handleSynSegment(ctx, s, &opts) // S/R-SAFE: synRcvdCount is the barrier.
+ // Drop the SYN if the listen endpoint's accept queue is
+ // overflowing.
+ if e.incSynRcvdCount() {
+ log.Printf("processing syn packet")
+ s.incRef()
+ go e.handleSynSegment(ctx, s, &opts) // S/R-SAFE: synRcvdCount is the barrier.
+ return
+ }
+ log.Printf("dropping syn packet")
+ e.stack.Stats().TCP.ListenOverflowSynDrop.Increment()
+ e.stack.Stats().DroppedPackets.Increment()
+ return
} else {
+ // TODO(bhaskerh): Increment syncookie sent stat.
cookie := ctx.createCookie(s.id, s.sequenceNumber, encodeMSS(opts.MSS))
// Send SYN with window scaling because we currently
// dont't encode this information in the cookie.
@@ -318,36 +357,72 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) {
TSEcr: opts.TSVal,
}
sendSynTCP(&s.route, s.id, header.TCPFlagSyn|header.TCPFlagAck, cookie, s.sequenceNumber+1, ctx.rcvWnd, synOpts)
+ e.stack.Stats().TCP.ListenOverflowSynCookieSent.Increment()
}
case header.TCPFlagAck:
- if data, ok := ctx.isCookieValid(s.id, s.ackNumber-1, s.sequenceNumber-1); ok && int(data) < len(mssTable) {
- // Create newly accepted endpoint and deliver it.
- rcvdSynOptions := &header.TCPSynOptions{
- MSS: mssTable[data],
- // Disable Window scaling as original SYN is
- // lost.
- WS: -1,
- }
- // When syn cookies are in use we enable timestamp only
- // if the ack specifies the timestamp option assuming
- // that the other end did in fact negotiate the
- // timestamp option in the original SYN.
- if s.parsedOptions.TS {
- rcvdSynOptions.TS = true
- rcvdSynOptions.TSVal = s.parsedOptions.TSVal
- rcvdSynOptions.TSEcr = s.parsedOptions.TSEcr
- }
- n, err := ctx.createConnectedEndpoint(s, s.ackNumber-1, s.sequenceNumber-1, rcvdSynOptions)
- if err == nil {
- // clear the tsOffset for the newly created
- // endpoint as the Timestamp was already
- // randomly offset when the original SYN-ACK was
- // sent above.
- n.tsOffset = 0
- e.deliverAccepted(n)
- }
+ if len(e.acceptedChan) == cap(e.acceptedChan) {
+ // Silently drop the ack as the application can't accept
+ // the connection at this point. The ack will be
+ // retransmitted by the sender anyway and we can
+ // complete the connection at the time of retransmit if
+ // the backlog has space.
+ e.stack.Stats().TCP.ListenOverflowAckDrop.Increment()
+ e.stack.Stats().DroppedPackets.Increment()
+ return
+ }
+
+ // Validate the cookie.
+ data, ok := ctx.isCookieValid(s.id, s.ackNumber-1, s.sequenceNumber-1)
+ if !ok || int(data) >= len(mssTable) {
+ e.stack.Stats().TCP.ListenOverflowInvalidSynCookieRcvd.Increment()
+ e.stack.Stats().DroppedPackets.Increment()
+ return
}
+ e.stack.Stats().TCP.ListenOverflowSynCookieRcvd.Increment()
+ // Create newly accepted endpoint and deliver it.
+ rcvdSynOptions := &header.TCPSynOptions{
+ MSS: mssTable[data],
+ // Disable Window scaling as original SYN is
+ // lost.
+ WS: -1,
+ }
+
+ // When syn cookies are in use we enable timestamp only
+ // if the ack specifies the timestamp option assuming
+ // that the other end did in fact negotiate the
+ // timestamp option in the original SYN.
+ if s.parsedOptions.TS {
+ rcvdSynOptions.TS = true
+ rcvdSynOptions.TSVal = s.parsedOptions.TSVal
+ rcvdSynOptions.TSEcr = s.parsedOptions.TSEcr
+ }
+
+ n, err := ctx.createConnectingEndpoint(s, s.ackNumber-1, s.sequenceNumber-1, rcvdSynOptions)
+ if err != nil {
+ e.stack.Stats().TCP.FailedConnectionAttempts.Increment()
+ return
+ }
+
+ // clear the tsOffset for the newly created
+ // endpoint as the Timestamp was already
+ // randomly offset when the original SYN-ACK was
+ // sent above.
+ n.tsOffset = 0
+
+ // Switch state to connected.
+ n.state = stateConnected
+
+ // Do the delivery in a separate goroutine so
+ // that we don't block the listen loop in case
+ // the application is slow to accept or stops
+ // accepting.
+ //
+ // NOTE: This won't result in an unbounded
+ // number of goroutines as we do check before
+ // entering here that there was at least some
+ // space available in the backlog.
+ go e.deliverAccepted(n)
}
}
@@ -377,7 +452,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) *tcpip.Error {
v6only := e.v6only
e.mu.Unlock()
- ctx := newListenContext(e.stack, rcvWnd, v6only, e.netProto)
+ ctx := newListenContext(e.stack, e, rcvWnd, v6only, e.netProto)
s := sleep.Sleeper{}
s.AddWaker(&e.notificationWaker, wakerForNotification)