summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport/tcp/endpoint.go
diff options
context:
space:
mode:
authorZhaozhong Ni <nzz@google.com>2018-07-10 09:22:37 -0700
committerShentubot <shentubot@google.com>2018-07-10 09:23:35 -0700
commitb1683df90bf81974e9e309ed66edaff30537c1be (patch)
tree728061e78466951d1f069e5a73358f84aa16d6c0 /pkg/tcpip/transport/tcp/endpoint.go
parentafd655a5d8b9d9bc747ee99b1ec2475cc526c996 (diff)
netstack: tcp socket connected state S/R support.
PiperOrigin-RevId: 203958972 Change-Id: Ia6fe16547539296d48e2c6731edacdd96bd6e93c
Diffstat (limited to 'pkg/tcpip/transport/tcp/endpoint.go')
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go72
1 files changed, 57 insertions, 15 deletions
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index cb105b863..8b9a81f6a 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -80,7 +80,7 @@ type endpoint struct {
// change throughout the lifetime of the endpoint.
stack *stack.Stack `state:"manual"`
netProto tcpip.NetworkProtocolNumber
- waiterQueue *waiter.Queue
+ waiterQueue *waiter.Queue `state:"wait"`
// lastError represents the last error that the endpoint reported;
// access to it is protected by the following mutex.
@@ -95,8 +95,8 @@ type endpoint struct {
// to indicate to users that no more data is coming.
//
// rcvListMu can be taken after the endpoint mu below.
- rcvListMu sync.Mutex `state:"nosave"`
- rcvList segmentList
+ rcvListMu sync.Mutex `state:"nosave"`
+ rcvList segmentList `state:"wait"`
rcvClosed bool
rcvBufSize int
rcvBufUsed int
@@ -104,8 +104,8 @@ type endpoint struct {
// The following fields are protected by the mutex.
mu sync.RWMutex `state:"nosave"`
id stack.TransportEndpointID
- state endpointState
- isPortReserved bool `state:"manual"`
+ state endpointState `state:".(endpointState)"`
+ isPortReserved bool `state:"manual"`
isRegistered bool
boundNICID tcpip.NICID `state:"manual"`
route stack.Route `state:"manual"`
@@ -131,7 +131,7 @@ type endpoint struct {
// workerCleanup specifies if the worker goroutine must perform cleanup
// before exitting. This can only be set to true when workerRunning is
// also true, and they're both protected by the mutex.
- workerCleanup bool `state:"zerovalue"`
+ workerCleanup bool
// sendTSOk is used to indicate when the TS Option has been negotiated.
// When sendTSOk is true every non-RST segment should carry a TS as per
@@ -166,7 +166,7 @@ type endpoint struct {
// segmentQueue is used to hand received segments to the protocol
// goroutine. Segments are queued as long as the queue is not full,
// and dropped when it is.
- segmentQueue segmentQueue `state:"zerovalue"`
+ segmentQueue segmentQueue `state:"wait"`
// The following fields are used to manage the send buffer. When
// segments are ready to be sent, they are added to sndQueue and the
@@ -179,7 +179,7 @@ type endpoint struct {
sndBufUsed int
sndClosed bool
sndBufInQueue seqnum.Size
- sndQueue segmentList
+ sndQueue segmentList `state:"wait"`
sndWaker sleep.Waker `state:"manual"`
sndCloseWaker sleep.Waker `state:"manual"`
@@ -201,17 +201,21 @@ type endpoint struct {
// notifyFlags is a bitmask of flags used to indicate to the protocol
// goroutine what it was notified; this is only accessed atomically.
- notifyFlags uint32 `state:"zerovalue"`
+ notifyFlags uint32 `state:"nosave"`
// acceptedChan is used by a listening endpoint protocol goroutine to
// send newly accepted connections to the endpoint so that they can be
// read by Accept() calls.
- acceptedChan chan *endpoint `state:".(endpointChan)"`
+ acceptedChan chan *endpoint `state:"manual"`
+
+ // acceptedEndpoints is only used to save / restore the channel buffer.
+ // FIXME
+ acceptedEndpoints []*endpoint
// The following are only used from the protocol goroutine, and
// therefore don't need locks to protect them.
- rcv *receiver
- snd *sender
+ rcv *receiver `state:"wait"`
+ snd *sender `state:"wait"`
// The goroutine drain completion notification channel.
drainDone chan struct{} `state:"nosave"`
@@ -224,6 +228,7 @@ type endpoint struct {
probe stack.TCPProbeFunc `state:"nosave"`
// The following are only used to assist the restore run to re-connect.
+ bindAddress tcpip.Address
connectingAddress tcpip.Address
}
@@ -357,6 +362,7 @@ func (e *endpoint) Close() {
// Either perform the local cleanup or kick the worker to make sure it
// knows it needs to cleanup.
+ tcpip.AddDanglingEndpoint(e)
if !e.workerRunning {
e.cleanupLocked()
} else {
@@ -376,9 +382,12 @@ func (e *endpoint) cleanupLocked() {
if e.acceptedChan != nil {
close(e.acceptedChan)
for n := range e.acceptedChan {
+ n.mu.Lock()
n.resetConnectionLocked(tcpip.ErrConnectionAborted)
+ n.mu.Unlock()
n.Close()
}
+ e.acceptedChan = nil
}
e.workerCleanup = false
@@ -387,6 +396,7 @@ func (e *endpoint) cleanupLocked() {
}
e.route.Release()
+ tcpip.DeleteDanglingEndpoint(e)
}
// Read reads data from the endpoint.
@@ -801,6 +811,16 @@ func (e *endpoint) checkV4Mapped(addr *tcpip.FullAddress) (tcpip.NetworkProtocol
// Connect connects the endpoint to its peer.
func (e *endpoint) Connect(addr tcpip.FullAddress) *tcpip.Error {
+ return e.connect(addr, true, true)
+}
+
+// connect connects the endpoint to its peer. In the normal non-S/R case, the
+// new connection is expected to run the main goroutine and perform handshake.
+// In restore of previously connected endpoints, both ends will be passively
+// created (so no new handshaking is done); for stack-accepted connections not
+// yet accepted by the app, they are restored without running the main goroutine
+// here.
+func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) *tcpip.Error {
e.mu.Lock()
defer e.mu.Unlock()
@@ -912,9 +932,27 @@ func (e *endpoint) Connect(addr tcpip.FullAddress) *tcpip.Error {
e.boundNICID = nicid
e.effectiveNetProtos = netProtos
e.connectingAddress = connectingAddr
- e.workerRunning = true
- go e.protocolMainLoop(false) // S/R-SAFE: will be drained before save.
+ // Connect in the restore phase does not perform handshake. Restore its
+ // connection setting here.
+ if !handshake {
+ e.segmentQueue.mu.Lock()
+ for _, l := range []segmentList{e.segmentQueue.list, e.sndQueue, e.snd.writeList} {
+ for s := l.Front(); s != nil; s = s.Next() {
+ s.id = e.id
+ s.route = r.Clone()
+ e.sndWaker.Assert()
+ }
+ }
+ e.segmentQueue.mu.Unlock()
+ e.snd.updateMaxPayloadSize(int(e.route.MTU()), 0)
+ e.state = stateConnected
+ }
+
+ if run {
+ e.workerRunning = true
+ go e.protocolMainLoop(handshake) // S/R-SAFE: will be drained before save.
+ }
return tcpip.ErrConnectStarted
}
@@ -999,6 +1037,9 @@ func (e *endpoint) Listen(backlog int) *tcpip.Error {
if len(e.acceptedChan) > backlog {
return tcpip.ErrInvalidEndpointState
}
+ if cap(e.acceptedChan) == backlog {
+ return nil
+ }
origChan := e.acceptedChan
e.acceptedChan = make(chan *endpoint, backlog)
close(origChan)
@@ -1036,7 +1077,7 @@ func (e *endpoint) Listen(backlog int) *tcpip.Error {
func (e *endpoint) startAcceptedLoop(waiterQueue *waiter.Queue) {
e.waiterQueue = waiterQueue
e.workerRunning = true
- go e.protocolMainLoop(true) // S/R-FIXME
+ go e.protocolMainLoop(false) // S/R-SAFE: drained on save.
}
// Accept returns a new endpoint if a peer has established a connection
@@ -1077,6 +1118,7 @@ func (e *endpoint) Bind(addr tcpip.FullAddress, commit func() *tcpip.Error) (ret
return tcpip.ErrAlreadyBound
}
+ e.bindAddress = addr.Addr
netProto, err := e.checkV4Mapped(&addr)
if err != nil {
return err