diff options
author | Brian Geffon <bgeffon@google.com> | 2018-06-26 10:32:22 -0700 |
---|---|---|
committer | Shentubot <shentubot@google.com> | 2018-06-26 10:33:24 -0700 |
commit | 51c1e510ab79607d80d6b81c2ae8ab308c323a58 (patch) | |
tree | 966e93f1f461b5b42bef3569631d9d9ad558e72f /pkg/tcpip/transport/tcp/endpoint.go | |
parent | 0ac11de8d37a1c08fe7068b67671235ee1c32cb8 (diff) |
Automated rollback of changelist 201596247
PiperOrigin-RevId: 202151720
Change-Id: I0491172c436bbb32b977f557953ba0bc41cfe299
Diffstat (limited to 'pkg/tcpip/transport/tcp/endpoint.go')
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 72 |
1 files changed, 15 insertions, 57 deletions
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 706977618..b21c2b4ab 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -69,7 +69,7 @@ type endpoint struct { // change throughout the lifetime of the endpoint. stack *stack.Stack `state:"manual"` netProto tcpip.NetworkProtocolNumber - waiterQueue *waiter.Queue `state:"wait"` + waiterQueue *waiter.Queue // lastError represents the last error that the endpoint reported; // access to it is protected by the following mutex. @@ -82,8 +82,8 @@ type endpoint struct { // // Once the peer has closed its send side, rcvClosed is set to true // to indicate to users that no more data is coming. - rcvListMu sync.Mutex `state:"nosave"` - rcvList segmentList `state:"wait"` + rcvListMu sync.Mutex `state:"nosave"` + rcvList segmentList rcvClosed bool rcvBufSize int rcvBufUsed int @@ -91,8 +91,8 @@ type endpoint struct { // The following fields are protected by the mutex. mu sync.RWMutex `state:"nosave"` id stack.TransportEndpointID - state endpointState `state:".(endpointState)"` - isPortReserved bool `state:"manual"` + state endpointState + isPortReserved bool `state:"manual"` isRegistered bool boundNICID tcpip.NICID `state:"manual"` route stack.Route `state:"manual"` @@ -118,7 +118,7 @@ type endpoint struct { // workerCleanup specifies if the worker goroutine must perform cleanup // before exitting. This can only be set to true when workerRunning is // also true, and they're both protected by the mutex. - workerCleanup bool + workerCleanup bool `state:"zerovalue"` // sendTSOk is used to indicate when the TS Option has been negotiated. // When sendTSOk is true every non-RST segment should carry a TS as per @@ -153,7 +153,7 @@ type endpoint struct { // segmentQueue is used to hand received segments to the protocol // goroutine. Segments are queued as long as the queue is not full, // and dropped when it is. - segmentQueue segmentQueue `state:"wait"` + segmentQueue segmentQueue `state:"zerovalue"` // The following fields are used to manage the send buffer. When // segments are ready to be sent, they are added to sndQueue and the @@ -166,7 +166,7 @@ type endpoint struct { sndBufUsed int sndClosed bool sndBufInQueue seqnum.Size - sndQueue segmentList `state:"wait"` + sndQueue segmentList sndWaker sleep.Waker `state:"manual"` sndCloseWaker sleep.Waker `state:"manual"` @@ -188,21 +188,17 @@ type endpoint struct { // notifyFlags is a bitmask of flags used to indicate to the protocol // goroutine what it was notified; this is only accessed atomically. - notifyFlags uint32 `state:"nosave"` + notifyFlags uint32 `state:"zerovalue"` // acceptedChan is used by a listening endpoint protocol goroutine to // send newly accepted connections to the endpoint so that they can be // read by Accept() calls. - acceptedChan chan *endpoint `state:"manual"` - - // acceptedEndpoints is only used to save / restore the channel buffer. - // FIXME - acceptedEndpoints []*endpoint + acceptedChan chan *endpoint `state:".(endpointChan)"` // The following are only used from the protocol goroutine, and // therefore don't need locks to protect them. - rcv *receiver `state:"wait"` - snd *sender `state:"wait"` + rcv *receiver + snd *sender // The goroutine drain completion notification channel. drainDone chan struct{} `state:"nosave"` @@ -215,7 +211,6 @@ type endpoint struct { probe stack.TCPProbeFunc `state:"nosave"` // The following are only used to assist the restore run to re-connect. - bindAddress tcpip.Address connectingAddress tcpip.Address } @@ -349,7 +344,6 @@ func (e *endpoint) Close() { // Either perform the local cleanup or kick the worker to make sure it // knows it needs to cleanup. - tcpip.AddDanglingEndpoint(e) if !e.workerRunning { e.cleanupLocked() } else { @@ -369,12 +363,9 @@ func (e *endpoint) cleanupLocked() { if e.acceptedChan != nil { close(e.acceptedChan) for n := range e.acceptedChan { - n.mu.Lock() n.resetConnectionLocked(tcpip.ErrConnectionAborted) - n.mu.Unlock() n.Close() } - e.acceptedChan = nil } e.workerCleanup = false @@ -383,7 +374,6 @@ func (e *endpoint) cleanupLocked() { } e.route.Release() - tcpip.DeleteDanglingEndpoint(e) } // Read reads data from the endpoint. @@ -796,16 +786,6 @@ func (e *endpoint) checkV4Mapped(addr *tcpip.FullAddress) (tcpip.NetworkProtocol // Connect connects the endpoint to its peer. func (e *endpoint) Connect(addr tcpip.FullAddress) *tcpip.Error { - return e.connect(addr, true, true) -} - -// connect connects the endpoint to its peer. In the normal non-S/R case, the -// new connection is expected to run the main goroutine and perform handshake. -// In restore of previously connected endpoints, both ends will be passively -// created (so no new handshaking is done); for stack-accepted connections not -// yet accepted by the app, they are restored without running the main goroutine -// here. -func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) *tcpip.Error { e.mu.Lock() defer e.mu.Unlock() @@ -917,27 +897,9 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) *tc e.boundNICID = nicid e.effectiveNetProtos = netProtos e.connectingAddress = connectingAddr + e.workerRunning = true - // Connect in the restore phase does not perform handshake. Restore its - // connection setting here. - if !handshake { - e.segmentQueue.mu.Lock() - for _, l := range []segmentList{e.segmentQueue.list, e.sndQueue, e.snd.writeList} { - for s := l.Front(); s != nil; s = s.Next() { - s.id = e.id - s.route = r.Clone() - e.sndWaker.Assert() - } - } - e.segmentQueue.mu.Unlock() - e.snd.updateMaxPayloadSize(int(e.route.MTU()), 0) - e.state = stateConnected - } - - if run { - e.workerRunning = true - go e.protocolMainLoop(handshake) // S/R-SAFE: will be drained before save. - } + go e.protocolMainLoop(false) // S/R-SAFE: will be drained before save. return tcpip.ErrConnectStarted } @@ -1009,9 +971,6 @@ func (e *endpoint) Listen(backlog int) *tcpip.Error { if len(e.acceptedChan) > backlog { return tcpip.ErrInvalidEndpointState } - if cap(e.acceptedChan) == backlog { - return nil - } origChan := e.acceptedChan e.acceptedChan = make(chan *endpoint, backlog) close(origChan) @@ -1049,7 +1008,7 @@ func (e *endpoint) Listen(backlog int) *tcpip.Error { func (e *endpoint) startAcceptedLoop(waiterQueue *waiter.Queue) { e.waiterQueue = waiterQueue e.workerRunning = true - go e.protocolMainLoop(false) // S/R-SAFE: drained on save. + go e.protocolMainLoop(true) // S/R-FIXME } // Accept returns a new endpoint if a peer has established a connection @@ -1090,7 +1049,6 @@ func (e *endpoint) Bind(addr tcpip.FullAddress, commit func() *tcpip.Error) (ret return tcpip.ErrAlreadyBound } - e.bindAddress = addr.Addr netProto, err := e.checkV4Mapped(&addr) if err != nil { return err |