summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip')
-rw-r--r--pkg/tcpip/link/fdbased/endpoint.go168
-rw-r--r--pkg/tcpip/link/sniffer/sniffer.go10
-rw-r--r--pkg/tcpip/network/ipv4/ipv4.go4
-rw-r--r--pkg/tcpip/network/ipv6/ipv6.go4
-rw-r--r--pkg/tcpip/stack/route.go10
-rw-r--r--pkg/tcpip/tcpip.go4
-rw-r--r--pkg/tcpip/transport/icmp/endpoint.go6
-rw-r--r--pkg/tcpip/transport/raw/endpoint.go5
-rw-r--r--pkg/tcpip/transport/tcp/accept.go12
-rw-r--r--pkg/tcpip/transport/tcp/connect.go47
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go174
-rw-r--r--pkg/tcpip/transport/tcp/endpoint_state.go42
-rw-r--r--pkg/tcpip/transport/tcp/rcv.go37
-rw-r--r--pkg/tcpip/transport/tcp/snd.go6
-rwxr-xr-xpkg/tcpip/transport/tcp/tcp_state_autogen.go4
-rw-r--r--pkg/tcpip/transport/udp/endpoint.go6
16 files changed, 393 insertions, 146 deletions
diff --git a/pkg/tcpip/link/fdbased/endpoint.go b/pkg/tcpip/link/fdbased/endpoint.go
index 1f889c2a0..b88e2e7bf 100644
--- a/pkg/tcpip/link/fdbased/endpoint.go
+++ b/pkg/tcpip/link/fdbased/endpoint.go
@@ -21,12 +21,29 @@
// FD based endpoints can be used in the networking stack by calling New() to
// create a new endpoint, and then passing it as an argument to
// Stack.CreateNIC().
+//
+// FD based endpoints can use more than one file descriptor to read incoming
+// packets. If there are more than one FDs specified and the underlying FD is an
+// AF_PACKET then the endpoint will enable FANOUT mode on the socket so that the
+// host kernel will consistently hash the packets to the sockets. This ensures
+// that packets for the same TCP streams are not reordered.
+//
+// Similarly if more than one FD's are specified where the underlying FD is not
+// AF_PACKET then it's the caller's responsibility to ensure that all inbound
+// packets on the descriptors are consistently 5 tuple hashed to one of the
+// descriptors to prevent TCP reordering.
+//
+// Since netstack today does not compute 5 tuple hashes for outgoing packets we
+// only use the first FD to write outbound packets. Once 5 tuple hashes for
+// all outbound packets are available we will make use of all underlying FD's to
+// write outbound packets.
package fdbased
import (
"fmt"
"syscall"
+ "golang.org/x/sys/unix"
"gvisor.googlesource.com/gvisor/pkg/tcpip"
"gvisor.googlesource.com/gvisor/pkg/tcpip/buffer"
"gvisor.googlesource.com/gvisor/pkg/tcpip/header"
@@ -65,8 +82,10 @@ const (
)
type endpoint struct {
- // fd is the file descriptor used to send and receive packets.
- fd int
+ // fds is the set of file descriptors each identifying one inbound/outbound
+ // channel. The endpoint will dispatch from all inbound channels as well as
+ // hash outbound packets to specific channels based on the packet hash.
+ fds []int
// mtu (maximum transmission unit) is the maximum size of a packet.
mtu uint32
@@ -85,8 +104,8 @@ type endpoint struct {
// its end of the communication pipe.
closed func(*tcpip.Error)
- inboundDispatcher linkDispatcher
- dispatcher stack.NetworkDispatcher
+ inboundDispatchers []linkDispatcher
+ dispatcher stack.NetworkDispatcher
// packetDispatchMode controls the packet dispatcher used by this
// endpoint.
@@ -99,17 +118,47 @@ type endpoint struct {
// Options specify the details about the fd-based endpoint to be created.
type Options struct {
- FD int
- MTU uint32
- EthernetHeader bool
- ClosedFunc func(*tcpip.Error)
- Address tcpip.LinkAddress
- SaveRestore bool
- DisconnectOk bool
- GSOMaxSize uint32
+ // FDs is a set of FDs used to read/write packets.
+ FDs []int
+
+ // MTU is the mtu to use for this endpoint.
+ MTU uint32
+
+ // EthernetHeader if true, indicates that the endpoint should read/write
+ // ethernet frames instead of IP packets.
+ EthernetHeader bool
+
+ // ClosedFunc is a function to be called when an endpoint's peer (if
+ // any) closes its end of the communication pipe.
+ ClosedFunc func(*tcpip.Error)
+
+ // Address is the link address for this endpoint. Only used if
+ // EthernetHeader is true.
+ Address tcpip.LinkAddress
+
+ // SaveRestore if true, indicates that this NIC capability set should
+ // include CapabilitySaveRestore
+ SaveRestore bool
+
+ // DisconnectOk if true, indicates that this NIC capability set should
+ // include CapabilityDisconnectOk.
+ DisconnectOk bool
+
+ // GSOMaxSize is the maximum GSO packet size. It is zero if GSO is
+ // disabled.
+ GSOMaxSize uint32
+
+ // PacketDispatchMode specifies the type of inbound dispatcher to be
+ // used for this endpoint.
PacketDispatchMode PacketDispatchMode
- TXChecksumOffload bool
- RXChecksumOffload bool
+
+ // TXChecksumOffload if true, indicates that this endpoints capability
+ // set should include CapabilityTXChecksumOffload.
+ TXChecksumOffload bool
+
+ // RXChecksumOffload if true, indicates that this endpoints capability
+ // set should include CapabilityRXChecksumOffload.
+ RXChecksumOffload bool
}
// New creates a new fd-based endpoint.
@@ -117,10 +166,6 @@ type Options struct {
// Makes fd non-blocking, but does not take ownership of fd, which must remain
// open for the lifetime of the returned endpoint.
func New(opts *Options) (tcpip.LinkEndpointID, error) {
- if err := syscall.SetNonblock(opts.FD, true); err != nil {
- return 0, fmt.Errorf("syscall.SetNonblock(%v) failed: %v", opts.FD, err)
- }
-
caps := stack.LinkEndpointCapabilities(0)
if opts.RXChecksumOffload {
caps |= stack.CapabilityRXChecksumOffload
@@ -144,8 +189,12 @@ func New(opts *Options) (tcpip.LinkEndpointID, error) {
caps |= stack.CapabilityDisconnectOk
}
+ if len(opts.FDs) == 0 {
+ return 0, fmt.Errorf("opts.FD is empty, at least one FD must be specified")
+ }
+
e := &endpoint{
- fd: opts.FD,
+ fds: opts.FDs,
mtu: opts.MTU,
caps: caps,
closed: opts.ClosedFunc,
@@ -154,46 +203,71 @@ func New(opts *Options) (tcpip.LinkEndpointID, error) {
packetDispatchMode: opts.PacketDispatchMode,
}
- isSocket, err := isSocketFD(e.fd)
- if err != nil {
- return 0, err
- }
- if isSocket {
- if opts.GSOMaxSize != 0 {
- e.caps |= stack.CapabilityGSO
- e.gsoMaxSize = opts.GSOMaxSize
+ // Create per channel dispatchers.
+ for i := 0; i < len(e.fds); i++ {
+ fd := e.fds[i]
+ if err := syscall.SetNonblock(fd, true); err != nil {
+ return 0, fmt.Errorf("syscall.SetNonblock(%v) failed: %v", fd, err)
}
- }
- e.inboundDispatcher, err = createInboundDispatcher(e, isSocket)
- if err != nil {
- return 0, fmt.Errorf("createInboundDispatcher(...) = %v", err)
+
+ isSocket, err := isSocketFD(fd)
+ if err != nil {
+ return 0, err
+ }
+ if isSocket {
+ if opts.GSOMaxSize != 0 {
+ e.caps |= stack.CapabilityGSO
+ e.gsoMaxSize = opts.GSOMaxSize
+ }
+ }
+ inboundDispatcher, err := createInboundDispatcher(e, fd, isSocket)
+ if err != nil {
+ return 0, fmt.Errorf("createInboundDispatcher(...) = %v", err)
+ }
+ e.inboundDispatchers = append(e.inboundDispatchers, inboundDispatcher)
}
return stack.RegisterLinkEndpoint(e), nil
}
-func createInboundDispatcher(e *endpoint, isSocket bool) (linkDispatcher, error) {
+func createInboundDispatcher(e *endpoint, fd int, isSocket bool) (linkDispatcher, error) {
// By default use the readv() dispatcher as it works with all kinds of
// FDs (tap/tun/unix domain sockets and af_packet).
- inboundDispatcher, err := newReadVDispatcher(e.fd, e)
+ inboundDispatcher, err := newReadVDispatcher(fd, e)
if err != nil {
- return nil, fmt.Errorf("newReadVDispatcher(%d, %+v) = %v", e.fd, e, err)
+ return nil, fmt.Errorf("newReadVDispatcher(%d, %+v) = %v", fd, e, err)
}
if isSocket {
+ sa, err := unix.Getsockname(fd)
+ if err != nil {
+ return nil, fmt.Errorf("unix.Getsockname(%d) = %v", fd, err)
+ }
+ switch sa.(type) {
+ case *unix.SockaddrLinklayer:
+ // enable PACKET_FANOUT mode is the underlying socket is
+ // of type AF_PACKET.
+ const fanoutID = 1
+ const fanoutType = 0x8000 // PACKET_FANOUT_HASH | PACKET_FANOUT_FLAG_DEFRAG
+ fanoutArg := fanoutID | fanoutType<<16
+ if err := syscall.SetsockoptInt(fd, syscall.SOL_PACKET, unix.PACKET_FANOUT, fanoutArg); err != nil {
+ return nil, fmt.Errorf("failed to enable PACKET_FANOUT option: %v", err)
+ }
+ }
+
switch e.packetDispatchMode {
case PacketMMap:
- inboundDispatcher, err = newPacketMMapDispatcher(e.fd, e)
+ inboundDispatcher, err = newPacketMMapDispatcher(fd, e)
if err != nil {
- return nil, fmt.Errorf("newPacketMMapDispatcher(%d, %+v) = %v", e.fd, e, err)
+ return nil, fmt.Errorf("newPacketMMapDispatcher(%d, %+v) = %v", fd, e, err)
}
case RecvMMsg:
// If the provided FD is a socket then we optimize
// packet reads by using recvmmsg() instead of read() to
// read packets in a batch.
- inboundDispatcher, err = newRecvMMsgDispatcher(e.fd, e)
+ inboundDispatcher, err = newRecvMMsgDispatcher(fd, e)
if err != nil {
- return nil, fmt.Errorf("newRecvMMsgDispatcher(%d, %+v) = %v", e.fd, e, err)
+ return nil, fmt.Errorf("newRecvMMsgDispatcher(%d, %+v) = %v", fd, e, err)
}
}
}
@@ -215,7 +289,9 @@ func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
// Link endpoints are not savable. When transportation endpoints are
// saved, they stop sending outgoing packets and all incoming packets
// are rejected.
- go e.dispatchLoop() // S/R-SAFE: See above.
+ for i := range e.inboundDispatchers {
+ go e.dispatchLoop(e.inboundDispatchers[i]) // S/R-SAFE: See above.
+ }
}
// IsAttached implements stack.LinkEndpoint.IsAttached.
@@ -305,26 +381,26 @@ func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, hdr buffer.Prepen
}
}
- return rawfile.NonBlockingWrite3(e.fd, vnetHdrBuf, hdr.View(), payload.ToView())
+ return rawfile.NonBlockingWrite3(e.fds[0], vnetHdrBuf, hdr.View(), payload.ToView())
}
if payload.Size() == 0 {
- return rawfile.NonBlockingWrite(e.fd, hdr.View())
+ return rawfile.NonBlockingWrite(e.fds[0], hdr.View())
}
- return rawfile.NonBlockingWrite3(e.fd, hdr.View(), payload.ToView(), nil)
+ return rawfile.NonBlockingWrite3(e.fds[0], hdr.View(), payload.ToView(), nil)
}
// WriteRawPacket writes a raw packet directly to the file descriptor.
func (e *endpoint) WriteRawPacket(dest tcpip.Address, packet []byte) *tcpip.Error {
- return rawfile.NonBlockingWrite(e.fd, packet)
+ return rawfile.NonBlockingWrite(e.fds[0], packet)
}
// dispatchLoop reads packets from the file descriptor in a loop and dispatches
// them to the network stack.
-func (e *endpoint) dispatchLoop() *tcpip.Error {
+func (e *endpoint) dispatchLoop(inboundDispatcher linkDispatcher) *tcpip.Error {
for {
- cont, err := e.inboundDispatcher.dispatch()
+ cont, err := inboundDispatcher.dispatch()
if err != nil || !cont {
if e.closed != nil {
e.closed(err)
@@ -363,7 +439,7 @@ func NewInjectable(fd int, mtu uint32, capabilities stack.LinkEndpointCapabiliti
syscall.SetNonblock(fd, true)
e := &InjectableEndpoint{endpoint: endpoint{
- fd: fd,
+ fds: []int{fd},
mtu: mtu,
caps: capabilities,
}}
diff --git a/pkg/tcpip/link/sniffer/sniffer.go b/pkg/tcpip/link/sniffer/sniffer.go
index fccabd554..98581e50e 100644
--- a/pkg/tcpip/link/sniffer/sniffer.go
+++ b/pkg/tcpip/link/sniffer/sniffer.go
@@ -118,7 +118,7 @@ func NewWithFile(lower tcpip.LinkEndpointID, file *os.File, snapLen uint32) (tcp
// logs the packet before forwarding to the actual dispatcher.
func (e *endpoint) DeliverNetworkPacket(linkEP stack.LinkEndpoint, remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, vv buffer.VectorisedView) {
if atomic.LoadUint32(&LogPackets) == 1 && e.file == nil {
- logPacket("recv", protocol, vv.First())
+ logPacket("recv", protocol, vv.First(), nil)
}
if e.file != nil && atomic.LoadUint32(&LogPacketsToFile) == 1 {
vs := vv.Views()
@@ -198,7 +198,7 @@ func (e *endpoint) GSOMaxSize() uint32 {
// the request to the lower endpoint.
func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, hdr buffer.Prependable, payload buffer.VectorisedView, protocol tcpip.NetworkProtocolNumber) *tcpip.Error {
if atomic.LoadUint32(&LogPackets) == 1 && e.file == nil {
- logPacket("send", protocol, hdr.View())
+ logPacket("send", protocol, hdr.View(), gso)
}
if e.file != nil && atomic.LoadUint32(&LogPacketsToFile) == 1 {
hdrBuf := hdr.View()
@@ -240,7 +240,7 @@ func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, hdr buffer.Prepen
return e.lower.WritePacket(r, gso, hdr, payload, protocol)
}
-func logPacket(prefix string, protocol tcpip.NetworkProtocolNumber, b buffer.View) {
+func logPacket(prefix string, protocol tcpip.NetworkProtocolNumber, b buffer.View, gso *stack.GSO) {
// Figure out the network layer info.
var transProto uint8
src := tcpip.Address("unknown")
@@ -404,5 +404,9 @@ func logPacket(prefix string, protocol tcpip.NetworkProtocolNumber, b buffer.Vie
return
}
+ if gso != nil {
+ details += fmt.Sprintf(" gso: %+v", gso)
+ }
+
log.Infof("%s %s %v:%v -> %v:%v len:%d id:%04x %s", prefix, transName, src, srcPort, dst, dstPort, size, id, details)
}
diff --git a/pkg/tcpip/network/ipv4/ipv4.go b/pkg/tcpip/network/ipv4/ipv4.go
index da07a39e5..44b1d5b9b 100644
--- a/pkg/tcpip/network/ipv4/ipv4.go
+++ b/pkg/tcpip/network/ipv4/ipv4.go
@@ -215,7 +215,9 @@ func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, hdr buffer.Prepen
views[0] = hdr.View()
views = append(views, payload.Views()...)
vv := buffer.NewVectorisedView(len(views[0])+payload.Size(), views)
- e.HandlePacket(r, vv)
+ loopedR := r.MakeLoopedRoute()
+ e.HandlePacket(&loopedR, vv)
+ loopedR.Release()
}
if loop&stack.PacketOut == 0 {
return nil
diff --git a/pkg/tcpip/network/ipv6/ipv6.go b/pkg/tcpip/network/ipv6/ipv6.go
index 4b8cd496b..bcae98e1f 100644
--- a/pkg/tcpip/network/ipv6/ipv6.go
+++ b/pkg/tcpip/network/ipv6/ipv6.go
@@ -108,7 +108,9 @@ func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, hdr buffer.Prepen
views[0] = hdr.View()
views = append(views, payload.Views()...)
vv := buffer.NewVectorisedView(len(views[0])+payload.Size(), views)
- e.HandlePacket(r, vv)
+ loopedR := r.MakeLoopedRoute()
+ e.HandlePacket(&loopedR, vv)
+ loopedR.Release()
}
if loop&stack.PacketOut == 0 {
return nil
diff --git a/pkg/tcpip/stack/route.go b/pkg/tcpip/stack/route.go
index 3d4c282a9..55ed02479 100644
--- a/pkg/tcpip/stack/route.go
+++ b/pkg/tcpip/stack/route.go
@@ -187,3 +187,13 @@ func (r *Route) Clone() Route {
r.ref.incRef()
return *r
}
+
+// MakeLoopedRoute duplicates the given route and tweaks it in case of multicast.
+func (r *Route) MakeLoopedRoute() Route {
+ l := r.Clone()
+ if header.IsV4MulticastAddress(r.RemoteAddress) || header.IsV6MulticastAddress(r.RemoteAddress) {
+ l.RemoteAddress, l.LocalAddress = l.LocalAddress, l.RemoteAddress
+ l.RemoteLinkAddress = l.LocalLinkAddress
+ }
+ return l
+}
diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go
index f9886c6e4..85ef014d0 100644
--- a/pkg/tcpip/tcpip.go
+++ b/pkg/tcpip/tcpip.go
@@ -377,6 +377,10 @@ type Endpoint interface {
// GetSockOpt gets a socket option. opt should be a pointer to one of the
// *Option types.
GetSockOpt(opt interface{}) *Error
+
+ // State returns a socket's lifecycle state. The returned value is
+ // protocol-specific and is primarily used for diagnostics.
+ State() uint32
}
// WriteOptions contains options for Endpoint.Write.
diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go
index e2b90ef10..b8005093a 100644
--- a/pkg/tcpip/transport/icmp/endpoint.go
+++ b/pkg/tcpip/transport/icmp/endpoint.go
@@ -708,3 +708,9 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv
// HandleControlPacket implements stack.TransportEndpoint.HandleControlPacket.
func (e *endpoint) HandleControlPacket(id stack.TransportEndpointID, typ stack.ControlType, extra uint32, vv buffer.VectorisedView) {
}
+
+// State implements tcpip.Endpoint.State. The ICMP endpoint currently doesn't
+// expose internal socket state.
+func (e *endpoint) State() uint32 {
+ return 0
+}
diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go
index 1daf5823f..e4ff50c91 100644
--- a/pkg/tcpip/transport/raw/endpoint.go
+++ b/pkg/tcpip/transport/raw/endpoint.go
@@ -519,3 +519,8 @@ func (ep *endpoint) HandlePacket(route *stack.Route, netHeader buffer.View, vv b
ep.waiterQueue.Notify(waiter.EventIn)
}
}
+
+// State implements socket.Socket.State.
+func (ep *endpoint) State() uint32 {
+ return 0
+}
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go
index 31e365ae5..a32e20b06 100644
--- a/pkg/tcpip/transport/tcp/accept.go
+++ b/pkg/tcpip/transport/tcp/accept.go
@@ -226,7 +226,6 @@ func (l *listenContext) createConnectingEndpoint(s *segment, iss seqnum.Value, i
}
n.isRegistered = true
- n.state = stateConnecting
// Create sender and receiver.
//
@@ -258,8 +257,9 @@ func (l *listenContext) createEndpointAndPerformHandshake(s *segment, opts *head
ep.Close()
return nil, err
}
-
- ep.state = stateConnected
+ ep.mu.Lock()
+ ep.state = StateEstablished
+ ep.mu.Unlock()
// Update the receive window scaling. We can't do it before the
// handshake because it's possible that the peer doesn't support window
@@ -276,7 +276,7 @@ func (e *endpoint) deliverAccepted(n *endpoint) {
e.mu.RLock()
state := e.state
e.mu.RUnlock()
- if state == stateListen {
+ if state == StateListen {
e.acceptedChan <- n
e.waiterQueue.Notify(waiter.EventIn)
} else {
@@ -406,7 +406,7 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) {
n.tsOffset = 0
// Switch state to connected.
- n.state = stateConnected
+ n.state = StateEstablished
// Do the delivery in a separate goroutine so
// that we don't block the listen loop in case
@@ -429,7 +429,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) *tcpip.Error {
// handleSynSegment() from attempting to queue new connections
// to the endpoint.
e.mu.Lock()
- e.state = stateClosed
+ e.state = StateClose
// Do cleanup if needed.
e.completeWorkerLocked()
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index 2aed6f286..0ad7bfb38 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -151,6 +151,9 @@ func (h *handshake) resetToSynRcvd(iss seqnum.Value, irs seqnum.Value, opts *hea
h.mss = opts.MSS
h.sndWndScale = opts.WS
h.listenEP = listenEP
+ h.ep.mu.Lock()
+ h.ep.state = StateSynRecv
+ h.ep.mu.Unlock()
}
// checkAck checks if the ACK number, if present, of a segment received during
@@ -219,6 +222,9 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error {
// but resend our own SYN and wait for it to be acknowledged in the
// SYN-RCVD state.
h.state = handshakeSynRcvd
+ h.ep.mu.Lock()
+ h.ep.state = StateSynRecv
+ h.ep.mu.Unlock()
synOpts := header.TCPSynOptions{
WS: h.rcvWndScale,
TS: rcvSynOpts.TS,
@@ -284,14 +290,19 @@ func (h *handshake) synRcvdState(s *segment) *tcpip.Error {
// listenContext is also used by a tcp.Forwarder and in that
// context we do not have a listening endpoint to check the
// backlog. So skip this check if listenEP is nil.
- if h.listenEP != nil && len(h.listenEP.acceptedChan) == cap(h.listenEP.acceptedChan) {
- // If there is no space in the accept queue to accept
- // this endpoint then silently drop this ACK. The peer
- // will anyway resend the ack and we can complete the
- // connection the next time it's retransmitted.
- h.ep.stack.Stats().TCP.ListenOverflowAckDrop.Increment()
- h.ep.stack.Stats().DroppedPackets.Increment()
- return nil
+ if h.listenEP != nil {
+ h.listenEP.mu.Lock()
+ if len(h.listenEP.acceptedChan) == cap(h.listenEP.acceptedChan) {
+ h.listenEP.mu.Unlock()
+ // If there is no space in the accept queue to accept
+ // this endpoint then silently drop this ACK. The peer
+ // will anyway resend the ack and we can complete the
+ // connection the next time it's retransmitted.
+ h.ep.stack.Stats().TCP.ListenOverflowAckDrop.Increment()
+ h.ep.stack.Stats().DroppedPackets.Increment()
+ return nil
+ }
+ h.listenEP.mu.Unlock()
}
// If the timestamp option is negotiated and the segment does
// not carry a timestamp option then the segment must be dropped
@@ -663,7 +674,7 @@ func (e *endpoint) makeOptions(sackBlocks []header.SACKBlock) []byte {
// sendRaw sends a TCP segment to the endpoint's peer.
func (e *endpoint) sendRaw(data buffer.VectorisedView, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size) *tcpip.Error {
var sackBlocks []header.SACKBlock
- if e.state == stateConnected && e.rcv.pendingBufSize > 0 && (flags&header.TCPFlagAck != 0) {
+ if e.state == StateEstablished && e.rcv.pendingBufSize > 0 && (flags&header.TCPFlagAck != 0) {
sackBlocks = e.sack.Blocks[:e.sack.NumBlocks]
}
options := e.makeOptions(sackBlocks)
@@ -714,8 +725,7 @@ func (e *endpoint) handleClose() *tcpip.Error {
// protocol goroutine.
func (e *endpoint) resetConnectionLocked(err *tcpip.Error) {
e.sendRaw(buffer.VectorisedView{}, header.TCPFlagAck|header.TCPFlagRst, e.snd.sndUna, e.rcv.rcvNxt, 0)
-
- e.state = stateError
+ e.state = StateError
e.hardError = err
}
@@ -871,14 +881,19 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
// handshake, and then inform potential waiters about its
// completion.
h := newHandshake(e, seqnum.Size(e.receiveBufferAvailable()))
+ e.mu.Lock()
+ h.ep.state = StateSynSent
+ e.mu.Unlock()
+
if err := h.execute(); err != nil {
e.lastErrorMu.Lock()
e.lastError = err
e.lastErrorMu.Unlock()
e.mu.Lock()
- e.state = stateError
+ e.state = StateError
e.hardError = err
+
// Lock released below.
epilogue()
@@ -900,7 +915,7 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
// Tell waiters that the endpoint is connected and writable.
e.mu.Lock()
- e.state = stateConnected
+ e.state = StateEstablished
drained := e.drainDone != nil
e.mu.Unlock()
if drained {
@@ -1000,7 +1015,7 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
return err
}
}
- if e.state != stateError {
+ if e.state != StateError {
close(e.drainDone)
<-e.undrain
}
@@ -1056,8 +1071,8 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
// Mark endpoint as closed.
e.mu.Lock()
- if e.state != stateError {
- e.state = stateClosed
+ if e.state != StateError {
+ e.state = StateClose
}
// Lock released below.
epilogue()
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index fd697402e..23422ca5e 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -32,18 +32,81 @@ import (
"gvisor.googlesource.com/gvisor/pkg/waiter"
)
-type endpointState int
+// EndpointState represents the state of a TCP endpoint.
+type EndpointState uint32
+// Endpoint states. Note that are represented in a netstack-specific manner and
+// may not be meaningful externally. Specifically, they need to be translated to
+// Linux's representation for these states if presented to userspace.
const (
- stateInitial endpointState = iota
- stateBound
- stateListen
- stateConnecting
- stateConnected
- stateClosed
- stateError
+ // Endpoint states internal to netstack. These map to the TCP state CLOSED.
+ StateInitial EndpointState = iota
+ StateBound
+ StateConnecting // Connect() called, but the initial SYN hasn't been sent.
+ StateError
+
+ // TCP protocol states.
+ StateEstablished
+ StateSynSent
+ StateSynRecv
+ StateFinWait1
+ StateFinWait2
+ StateTimeWait
+ StateClose
+ StateCloseWait
+ StateLastAck
+ StateListen
+ StateClosing
)
+// connected is the set of states where an endpoint is connected to a peer.
+func (s EndpointState) connected() bool {
+ switch s {
+ case StateEstablished, StateFinWait1, StateFinWait2, StateTimeWait, StateCloseWait, StateLastAck, StateClosing:
+ return true
+ default:
+ return false
+ }
+}
+
+// String implements fmt.Stringer.String.
+func (s EndpointState) String() string {
+ switch s {
+ case StateInitial:
+ return "INITIAL"
+ case StateBound:
+ return "BOUND"
+ case StateConnecting:
+ return "CONNECTING"
+ case StateError:
+ return "ERROR"
+ case StateEstablished:
+ return "ESTABLISHED"
+ case StateSynSent:
+ return "SYN-SENT"
+ case StateSynRecv:
+ return "SYN-RCVD"
+ case StateFinWait1:
+ return "FIN-WAIT1"
+ case StateFinWait2:
+ return "FIN-WAIT2"
+ case StateTimeWait:
+ return "TIME-WAIT"
+ case StateClose:
+ return "CLOSED"
+ case StateCloseWait:
+ return "CLOSE-WAIT"
+ case StateLastAck:
+ return "LAST-ACK"
+ case StateListen:
+ return "LISTEN"
+ case StateClosing:
+ return "CLOSING"
+ default:
+ panic("unreachable")
+ }
+}
+
// Reasons for notifying the protocol goroutine.
const (
notifyNonZeroReceiveWindow = 1 << iota
@@ -108,10 +171,14 @@ type endpoint struct {
rcvBufUsed int
// The following fields are protected by the mutex.
- mu sync.RWMutex `state:"nosave"`
- id stack.TransportEndpointID
- state endpointState `state:".(endpointState)"`
- isPortReserved bool `state:"manual"`
+ mu sync.RWMutex `state:"nosave"`
+ id stack.TransportEndpointID
+
+ // state endpointState `state:".(endpointState)"`
+ // pState ProtocolState
+ state EndpointState `state:".(EndpointState)"`
+
+ isPortReserved bool `state:"manual"`
isRegistered bool
boundNICID tcpip.NICID `state:"manual"`
route stack.Route `state:"manual"`
@@ -304,6 +371,7 @@ func newEndpoint(stack *stack.Stack, netProto tcpip.NetworkProtocolNumber, waite
stack: stack,
netProto: netProto,
waiterQueue: waiterQueue,
+ state: StateInitial,
rcvBufSize: DefaultBufferSize,
sndBufSize: DefaultBufferSize,
sndMTU: int(math.MaxInt32),
@@ -351,14 +419,14 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
defer e.mu.RUnlock()
switch e.state {
- case stateInitial, stateBound, stateConnecting:
+ case StateInitial, StateBound, StateConnecting, StateSynSent, StateSynRecv:
// Ready for nothing.
- case stateClosed, stateError:
+ case StateClose, StateError:
// Ready for anything.
result = mask
- case stateListen:
+ case StateListen:
// Check if there's anything in the accepted channel.
if (mask & waiter.EventIn) != 0 {
if len(e.acceptedChan) > 0 {
@@ -366,7 +434,7 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
}
}
- case stateConnected:
+ case StateEstablished, StateFinWait1, StateFinWait2, StateTimeWait, StateCloseWait, StateLastAck, StateClosing:
// Determine if the endpoint is writable if requested.
if (mask & waiter.EventOut) != 0 {
e.sndBufMu.Lock()
@@ -427,7 +495,7 @@ func (e *endpoint) Close() {
// are immediately available for reuse after Close() is called. If also
// registered, we unregister as well otherwise the next user would fail
// in Listen() when trying to register.
- if e.state == stateListen && e.isPortReserved {
+ if e.state == StateListen && e.isPortReserved {
if e.isRegistered {
e.stack.UnregisterTransportEndpoint(e.boundNICID, e.effectiveNetProtos, ProtocolNumber, e.id, e)
e.isRegistered = false
@@ -487,15 +555,15 @@ func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages,
e.mu.RLock()
// The endpoint can be read if it's connected, or if it's already closed
// but has some pending unread data. Also note that a RST being received
- // would cause the state to become stateError so we should allow the
+ // would cause the state to become StateError so we should allow the
// reads to proceed before returning a ECONNRESET.
e.rcvListMu.Lock()
bufUsed := e.rcvBufUsed
- if s := e.state; s != stateConnected && s != stateClosed && bufUsed == 0 {
+ if s := e.state; !s.connected() && s != StateClose && bufUsed == 0 {
e.rcvListMu.Unlock()
he := e.hardError
e.mu.RUnlock()
- if s == stateError {
+ if s == StateError {
return buffer.View{}, tcpip.ControlMessages{}, he
}
return buffer.View{}, tcpip.ControlMessages{}, tcpip.ErrInvalidEndpointState
@@ -511,7 +579,7 @@ func (e *endpoint) Read(*tcpip.FullAddress) (buffer.View, tcpip.ControlMessages,
func (e *endpoint) readLocked() (buffer.View, *tcpip.Error) {
if e.rcvBufUsed == 0 {
- if e.rcvClosed || e.state != stateConnected {
+ if e.rcvClosed || !e.state.connected() {
return buffer.View{}, tcpip.ErrClosedForReceive
}
return buffer.View{}, tcpip.ErrWouldBlock
@@ -547,9 +615,9 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (uintptr, <-c
defer e.mu.RUnlock()
// The endpoint cannot be written to if it's not connected.
- if e.state != stateConnected {
+ if !e.state.connected() {
switch e.state {
- case stateError:
+ case StateError:
return 0, nil, e.hardError
default:
return 0, nil, tcpip.ErrClosedForSend
@@ -612,8 +680,8 @@ func (e *endpoint) Peek(vec [][]byte) (uintptr, tcpip.ControlMessages, *tcpip.Er
// The endpoint can be read if it's connected, or if it's already closed
// but has some pending unread data.
- if s := e.state; s != stateConnected && s != stateClosed {
- if s == stateError {
+ if s := e.state; !s.connected() && s != StateClose {
+ if s == StateError {
return 0, tcpip.ControlMessages{}, e.hardError
}
return 0, tcpip.ControlMessages{}, tcpip.ErrInvalidEndpointState
@@ -623,7 +691,7 @@ func (e *endpoint) Peek(vec [][]byte) (uintptr, tcpip.ControlMessages, *tcpip.Er
defer e.rcvListMu.Unlock()
if e.rcvBufUsed == 0 {
- if e.rcvClosed || e.state != stateConnected {
+ if e.rcvClosed || !e.state.connected() {
return 0, tcpip.ControlMessages{}, tcpip.ErrClosedForReceive
}
return 0, tcpip.ControlMessages{}, tcpip.ErrWouldBlock
@@ -789,7 +857,7 @@ func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error {
defer e.mu.Unlock()
// We only allow this to be set when we're in the initial state.
- if e.state != stateInitial {
+ if e.state != StateInitial {
return tcpip.ErrInvalidEndpointState
}
@@ -841,7 +909,7 @@ func (e *endpoint) readyReceiveSize() (int, *tcpip.Error) {
defer e.mu.RUnlock()
// The endpoint cannot be in listen state.
- if e.state == stateListen {
+ if e.state == StateListen {
return 0, tcpip.ErrInvalidEndpointState
}
@@ -1057,7 +1125,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) (er
nicid := addr.NIC
switch e.state {
- case stateBound:
+ case StateBound:
// If we're already bound to a NIC but the caller is requesting
// that we use a different one now, we cannot proceed.
if e.boundNICID == 0 {
@@ -1070,16 +1138,16 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) (er
nicid = e.boundNICID
- case stateInitial:
- // Nothing to do. We'll eventually fill-in the gaps in the ID
- // (if any) when we find a route.
+ case StateInitial:
+ // Nothing to do. We'll eventually fill-in the gaps in the ID (if any)
+ // when we find a route.
- case stateConnecting:
- // A connection request has already been issued but hasn't
- // completed yet.
+ case StateConnecting, StateSynSent, StateSynRecv:
+ // A connection request has already been issued but hasn't completed
+ // yet.
return tcpip.ErrAlreadyConnecting
- case stateConnected:
+ case StateEstablished:
// The endpoint is already connected. If caller hasn't been notified yet, return success.
if !e.isConnectNotified {
e.isConnectNotified = true
@@ -1088,7 +1156,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) (er
// Otherwise return that it's already connected.
return tcpip.ErrAlreadyConnected
- case stateError:
+ case StateError:
return e.hardError
default:
@@ -1154,7 +1222,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) (er
}
e.isRegistered = true
- e.state = stateConnecting
+ e.state = StateConnecting
e.route = r.Clone()
e.boundNICID = nicid
e.effectiveNetProtos = netProtos
@@ -1175,7 +1243,7 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) (er
}
e.segmentQueue.mu.Unlock()
e.snd.updateMaxPayloadSize(int(e.route.MTU()), 0)
- e.state = stateConnected
+ e.state = StateEstablished
}
if run {
@@ -1199,8 +1267,8 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error {
defer e.mu.Unlock()
e.shutdownFlags |= flags
- switch e.state {
- case stateConnected:
+ switch {
+ case e.state.connected():
// Close for read.
if (e.shutdownFlags & tcpip.ShutdownRead) != 0 {
// Mark read side as closed.
@@ -1241,7 +1309,7 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error {
e.sndCloseWaker.Assert()
}
- case stateListen:
+ case e.state == StateListen:
// Tell protocolListenLoop to stop.
if flags&tcpip.ShutdownRead != 0 {
e.notifyProtocolGoroutine(notifyClose)
@@ -1269,7 +1337,7 @@ func (e *endpoint) Listen(backlog int) (err *tcpip.Error) {
// When the endpoint shuts down, it sets workerCleanup to true, and from
// that point onward, acceptedChan is the responsibility of the cleanup()
// method (and should not be touched anywhere else, including here).
- if e.state == stateListen && !e.workerCleanup {
+ if e.state == StateListen && !e.workerCleanup {
// Adjust the size of the channel iff we can fix existing
// pending connections into the new one.
if len(e.acceptedChan) > backlog {
@@ -1288,7 +1356,7 @@ func (e *endpoint) Listen(backlog int) (err *tcpip.Error) {
}
// Endpoint must be bound before it can transition to listen mode.
- if e.state != stateBound {
+ if e.state != StateBound {
return tcpip.ErrInvalidEndpointState
}
@@ -1298,7 +1366,7 @@ func (e *endpoint) Listen(backlog int) (err *tcpip.Error) {
}
e.isRegistered = true
- e.state = stateListen
+ e.state = StateListen
if e.acceptedChan == nil {
e.acceptedChan = make(chan *endpoint, backlog)
}
@@ -1325,7 +1393,7 @@ func (e *endpoint) Accept() (tcpip.Endpoint, *waiter.Queue, *tcpip.Error) {
defer e.mu.RUnlock()
// Endpoint must be in listen state before it can accept connections.
- if e.state != stateListen {
+ if e.state != StateListen {
return nil, nil, tcpip.ErrInvalidEndpointState
}
@@ -1353,7 +1421,7 @@ func (e *endpoint) Bind(addr tcpip.FullAddress) (err *tcpip.Error) {
// Don't allow binding once endpoint is not in the initial state
// anymore. This is because once the endpoint goes into a connected or
// listen state, it is already bound.
- if e.state != stateInitial {
+ if e.state != StateInitial {
return tcpip.ErrAlreadyBound
}
@@ -1408,7 +1476,7 @@ func (e *endpoint) Bind(addr tcpip.FullAddress) (err *tcpip.Error) {
}
// Mark endpoint as bound.
- e.state = stateBound
+ e.state = StateBound
return nil
}
@@ -1430,7 +1498,7 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, *tcpip.Error) {
e.mu.RLock()
defer e.mu.RUnlock()
- if e.state != stateConnected {
+ if !e.state.connected() {
return tcpip.FullAddress{}, tcpip.ErrNotConnected
}
@@ -1739,3 +1807,11 @@ func (e *endpoint) initGSO() {
gso.MaxSize = e.route.GSOMaxSize()
e.gso = gso
}
+
+// State implements tcpip.Endpoint.State. It exports the endpoint's protocol
+// state for diagnostics.
+func (e *endpoint) State() uint32 {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ return uint32(e.state)
+}
diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go
index e8aed2875..5f30c2374 100644
--- a/pkg/tcpip/transport/tcp/endpoint_state.go
+++ b/pkg/tcpip/transport/tcp/endpoint_state.go
@@ -49,8 +49,8 @@ func (e *endpoint) beforeSave() {
defer e.mu.Unlock()
switch e.state {
- case stateInitial, stateBound:
- case stateConnected:
+ case StateInitial, StateBound:
+ case StateEstablished, StateSynSent, StateSynRecv, StateFinWait1, StateFinWait2, StateTimeWait, StateCloseWait, StateLastAck, StateClosing:
if e.route.Capabilities()&stack.CapabilitySaveRestore == 0 {
if e.route.Capabilities()&stack.CapabilityDisconnectOk == 0 {
panic(tcpip.ErrSaveRejection{fmt.Errorf("endpoint cannot be saved in connected state: local %v:%d, remote %v:%d", e.id.LocalAddress, e.id.LocalPort, e.id.RemoteAddress, e.id.RemotePort)})
@@ -66,17 +66,17 @@ func (e *endpoint) beforeSave() {
break
}
fallthrough
- case stateListen, stateConnecting:
+ case StateListen, StateConnecting:
e.drainSegmentLocked()
- if e.state != stateClosed && e.state != stateError {
+ if e.state != StateClose && e.state != StateError {
if !e.workerRunning {
panic("endpoint has no worker running in listen, connecting, or connected state")
}
break
}
fallthrough
- case stateError, stateClosed:
- for e.state == stateError && e.workerRunning {
+ case StateError, StateClose:
+ for e.state == StateError && e.workerRunning {
e.mu.Unlock()
time.Sleep(100 * time.Millisecond)
e.mu.Lock()
@@ -92,7 +92,7 @@ func (e *endpoint) beforeSave() {
panic("endpoint still has waiters upon save")
}
- if e.state != stateClosed && !((e.state == stateBound || e.state == stateListen) == e.isPortReserved) {
+ if e.state != StateClose && !((e.state == StateBound || e.state == StateListen) == e.isPortReserved) {
panic("endpoints which are not in the closed state must have a reserved port IFF they are in bound or listen state")
}
}
@@ -132,7 +132,7 @@ func (e *endpoint) loadAcceptedChan(acceptedEndpoints []*endpoint) {
}
// saveState is invoked by stateify.
-func (e *endpoint) saveState() endpointState {
+func (e *endpoint) saveState() EndpointState {
return e.state
}
@@ -146,15 +146,15 @@ var connectingLoading sync.WaitGroup
// Bound endpoint loading happens last.
// loadState is invoked by stateify.
-func (e *endpoint) loadState(state endpointState) {
+func (e *endpoint) loadState(state EndpointState) {
// This is to ensure that the loading wait groups include all applicable
// endpoints before any asynchronous calls to the Wait() methods.
switch state {
- case stateConnected:
+ case StateEstablished, StateFinWait1, StateFinWait2, StateTimeWait, StateCloseWait, StateLastAck, StateClosing:
connectedLoading.Add(1)
- case stateListen:
+ case StateListen:
listenLoading.Add(1)
- case stateConnecting:
+ case StateConnecting, StateSynSent, StateSynRecv:
connectingLoading.Add(1)
}
e.state = state
@@ -168,7 +168,7 @@ func (e *endpoint) afterLoad() {
state := e.state
switch state {
- case stateInitial, stateBound, stateListen, stateConnecting, stateConnected:
+ case StateInitial, StateBound, StateListen, StateConnecting, StateEstablished:
var ss SendBufferSizeOption
if err := e.stack.TransportProtocolOption(ProtocolNumber, &ss); err == nil {
if e.sndBufSize < ss.Min || e.sndBufSize > ss.Max {
@@ -181,7 +181,7 @@ func (e *endpoint) afterLoad() {
}
bind := func() {
- e.state = stateInitial
+ e.state = StateInitial
if len(e.bindAddress) == 0 {
e.bindAddress = e.id.LocalAddress
}
@@ -191,7 +191,7 @@ func (e *endpoint) afterLoad() {
}
switch state {
- case stateConnected:
+ case StateEstablished, StateFinWait1, StateFinWait2, StateTimeWait, StateCloseWait, StateLastAck, StateClosing:
bind()
if len(e.connectingAddress) == 0 {
// This endpoint is accepted by netstack but not yet by
@@ -211,7 +211,7 @@ func (e *endpoint) afterLoad() {
panic("endpoint connecting failed: " + err.String())
}
connectedLoading.Done()
- case stateListen:
+ case StateListen:
tcpip.AsyncLoading.Add(1)
go func() {
connectedLoading.Wait()
@@ -223,7 +223,7 @@ func (e *endpoint) afterLoad() {
listenLoading.Done()
tcpip.AsyncLoading.Done()
}()
- case stateConnecting:
+ case StateConnecting, StateSynSent, StateSynRecv:
tcpip.AsyncLoading.Add(1)
go func() {
connectedLoading.Wait()
@@ -235,7 +235,7 @@ func (e *endpoint) afterLoad() {
connectingLoading.Done()
tcpip.AsyncLoading.Done()
}()
- case stateBound:
+ case StateBound:
tcpip.AsyncLoading.Add(1)
go func() {
connectedLoading.Wait()
@@ -244,7 +244,7 @@ func (e *endpoint) afterLoad() {
bind()
tcpip.AsyncLoading.Done()
}()
- case stateClosed:
+ case StateClose:
if e.isPortReserved {
tcpip.AsyncLoading.Add(1)
go func() {
@@ -252,12 +252,12 @@ func (e *endpoint) afterLoad() {
listenLoading.Wait()
connectingLoading.Wait()
bind()
- e.state = stateClosed
+ e.state = StateClose
tcpip.AsyncLoading.Done()
}()
}
fallthrough
- case stateError:
+ case StateError:
tcpip.DeleteDanglingEndpoint(e)
}
}
diff --git a/pkg/tcpip/transport/tcp/rcv.go b/pkg/tcpip/transport/tcp/rcv.go
index b08a0e356..f02fa6105 100644
--- a/pkg/tcpip/transport/tcp/rcv.go
+++ b/pkg/tcpip/transport/tcp/rcv.go
@@ -134,6 +134,7 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum
// sequence numbers that have been consumed.
TrimSACKBlockList(&r.ep.sack, r.rcvNxt)
+ // Handle FIN or FIN-ACK.
if s.flagIsSet(header.TCPFlagFin) {
r.rcvNxt++
@@ -144,6 +145,25 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum
r.closed = true
r.ep.readyToRead(nil)
+ // We just received a FIN, our next state depends on whether we sent a
+ // FIN already or not.
+ r.ep.mu.Lock()
+ switch r.ep.state {
+ case StateEstablished:
+ r.ep.state = StateCloseWait
+ case StateFinWait1:
+ if s.flagIsSet(header.TCPFlagAck) {
+ // FIN-ACK, transition to TIME-WAIT.
+ r.ep.state = StateTimeWait
+ } else {
+ // Simultaneous close, expecting a final ACK.
+ r.ep.state = StateClosing
+ }
+ case StateFinWait2:
+ r.ep.state = StateTimeWait
+ }
+ r.ep.mu.Unlock()
+
// Flush out any pending segments, except the very first one if
// it happens to be the one we're handling now because the
// caller is using it.
@@ -156,6 +176,23 @@ func (r *receiver) consumeSegment(s *segment, segSeq seqnum.Value, segLen seqnum
r.pendingRcvdSegments[i].decRef()
}
r.pendingRcvdSegments = r.pendingRcvdSegments[:first]
+
+ return true
+ }
+
+ // Handle ACK (not FIN-ACK, which we handled above) during one of the
+ // shutdown states.
+ if s.flagIsSet(header.TCPFlagAck) {
+ r.ep.mu.Lock()
+ switch r.ep.state {
+ case StateFinWait1:
+ r.ep.state = StateFinWait2
+ case StateClosing:
+ r.ep.state = StateTimeWait
+ case StateLastAck:
+ r.ep.state = StateClose
+ }
+ r.ep.mu.Unlock()
}
return true
diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go
index afc1d0a55..b236d7af2 100644
--- a/pkg/tcpip/transport/tcp/snd.go
+++ b/pkg/tcpip/transport/tcp/snd.go
@@ -632,6 +632,10 @@ func (s *sender) maybeSendSegment(seg *segment, limit int, end seqnum.Value) (se
}
seg.flags = header.TCPFlagAck | header.TCPFlagFin
segEnd = seg.sequenceNumber.Add(1)
+ // Transition to FIN-WAIT1 state since we're initiating an active close.
+ s.ep.mu.Lock()
+ s.ep.state = StateFinWait1
+ s.ep.mu.Unlock()
} else {
// We're sending a non-FIN segment.
if seg.flags&header.TCPFlagFin != 0 {
@@ -779,7 +783,7 @@ func (s *sender) sendData() {
break
}
dataSent = true
- s.outstanding++
+ s.outstanding += s.pCount(seg)
s.writeNext = seg.Next()
}
}
diff --git a/pkg/tcpip/transport/tcp/tcp_state_autogen.go b/pkg/tcpip/transport/tcp/tcp_state_autogen.go
index 9049a99b2..5d7e11715 100755
--- a/pkg/tcpip/transport/tcp/tcp_state_autogen.go
+++ b/pkg/tcpip/transport/tcp/tcp_state_autogen.go
@@ -24,7 +24,7 @@ func (x *endpoint) save(m state.Map) {
x.beforeSave()
var lastError string = x.saveLastError()
m.SaveValue("lastError", lastError)
- var state endpointState = x.saveState()
+ var state EndpointState = x.saveState()
m.SaveValue("state", state)
var hardError string = x.saveHardError()
m.SaveValue("hardError", hardError)
@@ -116,7 +116,7 @@ func (x *endpoint) load(m state.Map) {
m.Load("connectingAddress", &x.connectingAddress)
m.Load("gso", &x.gso)
m.LoadValue("lastError", new(string), func(y interface{}) { x.loadLastError(y.(string)) })
- m.LoadValue("state", new(endpointState), func(y interface{}) { x.loadState(y.(endpointState)) })
+ m.LoadValue("state", new(EndpointState), func(y interface{}) { x.loadState(y.(EndpointState)) })
m.LoadValue("hardError", new(string), func(y interface{}) { x.loadHardError(y.(string)) })
m.LoadValue("acceptedChan", new([]*endpoint), func(y interface{}) { x.loadAcceptedChan(y.([]*endpoint)) })
m.AfterLoad(x.afterLoad)
diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go
index 3d52a4f31..fa7278286 100644
--- a/pkg/tcpip/transport/udp/endpoint.go
+++ b/pkg/tcpip/transport/udp/endpoint.go
@@ -1000,3 +1000,9 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv
// HandleControlPacket implements stack.TransportEndpoint.HandleControlPacket.
func (e *endpoint) HandleControlPacket(id stack.TransportEndpointID, typ stack.ControlType, extra uint32, vv buffer.VectorisedView) {
}
+
+// State implements socket.Socket.State.
+func (e *endpoint) State() uint32 {
+ // TODO(b/112063468): Translate internal state to values returned by Linux.
+ return 0
+}