From b1683df90bf81974e9e309ed66edaff30537c1be Mon Sep 17 00:00:00 2001 From: Zhaozhong Ni Date: Tue, 10 Jul 2018 09:22:37 -0700 Subject: netstack: tcp socket connected state S/R support. PiperOrigin-RevId: 203958972 Change-Id: Ia6fe16547539296d48e2c6731edacdd96bd6e93c --- pkg/tcpip/transport/tcp/endpoint.go | 72 +++++++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 15 deletions(-) (limited to 'pkg/tcpip/transport/tcp/endpoint.go') diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index cb105b863..8b9a81f6a 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -80,7 +80,7 @@ type endpoint struct { // change throughout the lifetime of the endpoint. stack *stack.Stack `state:"manual"` netProto tcpip.NetworkProtocolNumber - waiterQueue *waiter.Queue + waiterQueue *waiter.Queue `state:"wait"` // lastError represents the last error that the endpoint reported; // access to it is protected by the following mutex. @@ -95,8 +95,8 @@ type endpoint struct { // to indicate to users that no more data is coming. // // rcvListMu can be taken after the endpoint mu below. - rcvListMu sync.Mutex `state:"nosave"` - rcvList segmentList + rcvListMu sync.Mutex `state:"nosave"` + rcvList segmentList `state:"wait"` rcvClosed bool rcvBufSize int rcvBufUsed int @@ -104,8 +104,8 @@ type endpoint struct { // The following fields are protected by the mutex. mu sync.RWMutex `state:"nosave"` id stack.TransportEndpointID - state endpointState - isPortReserved bool `state:"manual"` + state endpointState `state:".(endpointState)"` + isPortReserved bool `state:"manual"` isRegistered bool boundNICID tcpip.NICID `state:"manual"` route stack.Route `state:"manual"` @@ -131,7 +131,7 @@ type endpoint struct { // workerCleanup specifies if the worker goroutine must perform cleanup // before exitting. This can only be set to true when workerRunning is // also true, and they're both protected by the mutex. - workerCleanup bool `state:"zerovalue"` + workerCleanup bool // sendTSOk is used to indicate when the TS Option has been negotiated. // When sendTSOk is true every non-RST segment should carry a TS as per @@ -166,7 +166,7 @@ type endpoint struct { // segmentQueue is used to hand received segments to the protocol // goroutine. Segments are queued as long as the queue is not full, // and dropped when it is. - segmentQueue segmentQueue `state:"zerovalue"` + segmentQueue segmentQueue `state:"wait"` // The following fields are used to manage the send buffer. When // segments are ready to be sent, they are added to sndQueue and the @@ -179,7 +179,7 @@ type endpoint struct { sndBufUsed int sndClosed bool sndBufInQueue seqnum.Size - sndQueue segmentList + sndQueue segmentList `state:"wait"` sndWaker sleep.Waker `state:"manual"` sndCloseWaker sleep.Waker `state:"manual"` @@ -201,17 +201,21 @@ type endpoint struct { // notifyFlags is a bitmask of flags used to indicate to the protocol // goroutine what it was notified; this is only accessed atomically. - notifyFlags uint32 `state:"zerovalue"` + notifyFlags uint32 `state:"nosave"` // acceptedChan is used by a listening endpoint protocol goroutine to // send newly accepted connections to the endpoint so that they can be // read by Accept() calls. - acceptedChan chan *endpoint `state:".(endpointChan)"` + acceptedChan chan *endpoint `state:"manual"` + + // acceptedEndpoints is only used to save / restore the channel buffer. + // FIXME + acceptedEndpoints []*endpoint // The following are only used from the protocol goroutine, and // therefore don't need locks to protect them. - rcv *receiver - snd *sender + rcv *receiver `state:"wait"` + snd *sender `state:"wait"` // The goroutine drain completion notification channel. drainDone chan struct{} `state:"nosave"` @@ -224,6 +228,7 @@ type endpoint struct { probe stack.TCPProbeFunc `state:"nosave"` // The following are only used to assist the restore run to re-connect. + bindAddress tcpip.Address connectingAddress tcpip.Address } @@ -357,6 +362,7 @@ func (e *endpoint) Close() { // Either perform the local cleanup or kick the worker to make sure it // knows it needs to cleanup. + tcpip.AddDanglingEndpoint(e) if !e.workerRunning { e.cleanupLocked() } else { @@ -376,9 +382,12 @@ func (e *endpoint) cleanupLocked() { if e.acceptedChan != nil { close(e.acceptedChan) for n := range e.acceptedChan { + n.mu.Lock() n.resetConnectionLocked(tcpip.ErrConnectionAborted) + n.mu.Unlock() n.Close() } + e.acceptedChan = nil } e.workerCleanup = false @@ -387,6 +396,7 @@ func (e *endpoint) cleanupLocked() { } e.route.Release() + tcpip.DeleteDanglingEndpoint(e) } // Read reads data from the endpoint. @@ -801,6 +811,16 @@ func (e *endpoint) checkV4Mapped(addr *tcpip.FullAddress) (tcpip.NetworkProtocol // Connect connects the endpoint to its peer. func (e *endpoint) Connect(addr tcpip.FullAddress) *tcpip.Error { + return e.connect(addr, true, true) +} + +// connect connects the endpoint to its peer. In the normal non-S/R case, the +// new connection is expected to run the main goroutine and perform handshake. +// In restore of previously connected endpoints, both ends will be passively +// created (so no new handshaking is done); for stack-accepted connections not +// yet accepted by the app, they are restored without running the main goroutine +// here. +func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) *tcpip.Error { e.mu.Lock() defer e.mu.Unlock() @@ -912,9 +932,27 @@ func (e *endpoint) Connect(addr tcpip.FullAddress) *tcpip.Error { e.boundNICID = nicid e.effectiveNetProtos = netProtos e.connectingAddress = connectingAddr - e.workerRunning = true - go e.protocolMainLoop(false) // S/R-SAFE: will be drained before save. + // Connect in the restore phase does not perform handshake. Restore its + // connection setting here. + if !handshake { + e.segmentQueue.mu.Lock() + for _, l := range []segmentList{e.segmentQueue.list, e.sndQueue, e.snd.writeList} { + for s := l.Front(); s != nil; s = s.Next() { + s.id = e.id + s.route = r.Clone() + e.sndWaker.Assert() + } + } + e.segmentQueue.mu.Unlock() + e.snd.updateMaxPayloadSize(int(e.route.MTU()), 0) + e.state = stateConnected + } + + if run { + e.workerRunning = true + go e.protocolMainLoop(handshake) // S/R-SAFE: will be drained before save. + } return tcpip.ErrConnectStarted } @@ -999,6 +1037,9 @@ func (e *endpoint) Listen(backlog int) *tcpip.Error { if len(e.acceptedChan) > backlog { return tcpip.ErrInvalidEndpointState } + if cap(e.acceptedChan) == backlog { + return nil + } origChan := e.acceptedChan e.acceptedChan = make(chan *endpoint, backlog) close(origChan) @@ -1036,7 +1077,7 @@ func (e *endpoint) Listen(backlog int) *tcpip.Error { func (e *endpoint) startAcceptedLoop(waiterQueue *waiter.Queue) { e.waiterQueue = waiterQueue e.workerRunning = true - go e.protocolMainLoop(true) // S/R-FIXME + go e.protocolMainLoop(false) // S/R-SAFE: drained on save. } // Accept returns a new endpoint if a peer has established a connection @@ -1077,6 +1118,7 @@ func (e *endpoint) Bind(addr tcpip.FullAddress, commit func() *tcpip.Error) (ret return tcpip.ErrAlreadyBound } + e.bindAddress = addr.Addr netProto, err := e.checkV4Mapped(&addr) if err != nil { return err -- cgit v1.2.3