summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip')
-rw-r--r--pkg/tcpip/stack/stack_global_state.go2
-rw-r--r--pkg/tcpip/tcpip.go36
-rw-r--r--pkg/tcpip/transport/tcp/BUILD7
-rw-r--r--pkg/tcpip/transport/tcp/accept.go11
-rw-r--r--pkg/tcpip/transport/tcp/connect.go43
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go72
-rw-r--r--pkg/tcpip/transport/tcp/endpoint_state.go185
-rw-r--r--pkg/tcpip/transport/tcp/segment.go8
-rw-r--r--pkg/tcpip/transport/tcp/segment_queue.go4
-rw-r--r--pkg/tcpip/transport/tcp/segment_state.go41
-rw-r--r--pkg/tcpip/transport/tcp/snd.go4
-rw-r--r--pkg/tcpip/transport/tcp/snd_state.go39
12 files changed, 90 insertions, 362 deletions
diff --git a/pkg/tcpip/stack/stack_global_state.go b/pkg/tcpip/stack/stack_global_state.go
index 260d7d05c..030ae98d1 100644
--- a/pkg/tcpip/stack/stack_global_state.go
+++ b/pkg/tcpip/stack/stack_global_state.go
@@ -5,5 +5,5 @@
package stack
// StackFromEnv is the global stack created in restore run.
-// FIXME
+// FIXME: remove this variable once tcpip S/R is fully supported.
var StackFromEnv *Stack
diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go
index 17fa0efb7..cf25a086d 100644
--- a/pkg/tcpip/tcpip.go
+++ b/pkg/tcpip/tcpip.go
@@ -23,7 +23,6 @@ import (
"fmt"
"strconv"
"strings"
- "sync"
"time"
"gvisor.googlesource.com/gvisor/pkg/tcpip/buffer"
@@ -553,38 +552,3 @@ type ProtocolAddress struct {
// Address is a network address.
Address Address
}
-
-// danglingEndpointsMu protects access to danglingEndpoints.
-var danglingEndpointsMu sync.Mutex
-
-// danglingEndpoints tracks all dangling endpoints no longer owned by the app.
-var danglingEndpoints = make(map[Endpoint]struct{})
-
-// GetDanglingEndpoints returns all dangling endpoints.
-func GetDanglingEndpoints() []Endpoint {
- es := make([]Endpoint, 0, len(danglingEndpoints))
- danglingEndpointsMu.Lock()
- for e, _ := range danglingEndpoints {
- es = append(es, e)
- }
- danglingEndpointsMu.Unlock()
- return es
-}
-
-// AddDanglingEndpoint adds a dangling endpoint.
-func AddDanglingEndpoint(e Endpoint) {
- danglingEndpointsMu.Lock()
- danglingEndpoints[e] = struct{}{}
- danglingEndpointsMu.Unlock()
-}
-
-// DeleteDanglingEndpoint removes a dangling endpoint.
-func DeleteDanglingEndpoint(e Endpoint) {
- danglingEndpointsMu.Lock()
- delete(danglingEndpoints, e)
- danglingEndpointsMu.Unlock()
-}
-
-// AsyncLoading is the global barrier for asynchronous endpoint loading
-// activities.
-var AsyncLoading sync.WaitGroup
diff --git a/pkg/tcpip/transport/tcp/BUILD b/pkg/tcpip/transport/tcp/BUILD
index d129aa285..f38f58e87 100644
--- a/pkg/tcpip/transport/tcp/BUILD
+++ b/pkg/tcpip/transport/tcp/BUILD
@@ -10,16 +10,11 @@ go_stateify(
"endpoint.go",
"endpoint_state.go",
"rcv.go",
- "segment.go",
"segment_heap.go",
- "segment_queue.go",
- "segment_state.go",
"snd.go",
- "snd_state.go",
"tcp_segment_list.go",
],
out = "tcp_state.go",
- imports = ["gvisor.googlesource.com/gvisor/pkg/tcpip/buffer"],
package = "tcp",
)
@@ -48,9 +43,7 @@ go_library(
"segment.go",
"segment_heap.go",
"segment_queue.go",
- "segment_state.go",
"snd.go",
- "snd_state.go",
"tcp_segment_list.go",
"tcp_state.go",
"timer.go",
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go
index 410dfdad4..85adeef0e 100644
--- a/pkg/tcpip/transport/tcp/accept.go
+++ b/pkg/tcpip/transport/tcp/accept.go
@@ -68,8 +68,7 @@ func encodeMSS(mss uint16) uint32 {
// to go above a threshold.
var synRcvdCount struct {
sync.Mutex
- value uint64
- pending sync.WaitGroup
+ value uint64
}
// listenContext is used by a listening endpoint to store state used while
@@ -103,7 +102,6 @@ func incSynRcvdCount() bool {
return false
}
- synRcvdCount.pending.Add(1)
synRcvdCount.value++
return true
@@ -117,7 +115,6 @@ func decSynRcvdCount() {
defer synRcvdCount.Unlock()
synRcvdCount.value--
- synRcvdCount.pending.Done()
}
// newListenContext creates a new listen context.
@@ -295,7 +292,7 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) {
opts := parseSynSegmentOptions(s)
if incSynRcvdCount() {
s.incRef()
- go e.handleSynSegment(ctx, s, &opts) // S/R-SAFE: synRcvdCount is the barrier.
+ go e.handleSynSegment(ctx, s, &opts) // S/R-FIXME
} else {
cookie := ctx.createCookie(s.id, s.sequenceNumber, encodeMSS(opts.MSS))
// Send SYN with window scaling because we currently
@@ -384,12 +381,10 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) *tcpip.Error {
return nil
}
if n&notifyDrain != 0 {
- for !e.segmentQueue.empty() {
- s := e.segmentQueue.dequeue()
+ for s := e.segmentQueue.dequeue(); s != nil; s = e.segmentQueue.dequeue() {
e.handleListenSegment(ctx, s)
s.decRef()
}
- synRcvdCount.pending.Wait()
close(e.drainDone)
<-e.undrain
}
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index d9f87c793..9aaabe0b1 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -443,8 +443,7 @@ func (h *handshake) execute() *tcpip.Error {
return tcpip.ErrAborted
}
if n&notifyDrain != 0 {
- for !h.ep.segmentQueue.empty() {
- s := h.ep.segmentQueue.dequeue()
+ for s := h.ep.segmentQueue.dequeue(); s != nil; s = h.ep.segmentQueue.dequeue() {
err := h.handleSegment(s)
s.decRef()
if err != nil {
@@ -814,13 +813,15 @@ func (e *endpoint) handleSegments() *tcpip.Error {
// protocolMainLoop is the main loop of the TCP protocol. It runs in its own
// goroutine and is responsible for sending segments and handling received
// segments.
-func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
+func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error {
var closeTimer *time.Timer
var closeWaker sleep.Waker
defer func() {
// e.mu is expected to be hold upon entering this section.
+ e.completeWorkerLocked()
+
if e.snd != nil {
e.snd.resendTimer.cleanup()
}
@@ -829,8 +830,6 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
closeTimer.Stop()
}
- e.completeWorkerLocked()
-
if e.drainDone != nil {
close(e.drainDone)
}
@@ -841,7 +840,7 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
}()
- if handshake {
+ if !passive {
// This is an active connection, so we must initiate the 3-way
// handshake, and then inform potential waiters about its
// completion.
@@ -946,17 +945,6 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
closeWaker.Assert()
})
}
-
- if n&notifyDrain != 0 {
- for !e.segmentQueue.empty() {
- if err := e.handleSegments(); err != nil {
- return err
- }
- }
- close(e.drainDone)
- <-e.undrain
- }
-
return nil
},
},
@@ -968,27 +956,6 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error {
s.AddWaker(funcs[i].w, i)
}
- // The following assertions and notifications are needed for restored
- // endpoints. Fresh newly created endpoints have empty states and should
- // not invoke any.
- e.segmentQueue.mu.Lock()
- if !e.segmentQueue.list.Empty() {
- e.newSegmentWaker.Assert()
- }
- e.segmentQueue.mu.Unlock()
-
- e.rcvListMu.Lock()
- if !e.rcvList.Empty() {
- e.waiterQueue.Notify(waiter.EventIn)
- }
- e.rcvListMu.Unlock()
-
- e.mu.RLock()
- if e.workerCleanup {
- e.notifyProtocolGoroutine(notifyClose)
- }
- e.mu.RUnlock()
-
// Main loop. Handle segments until both send and receive ends of the
// connection have completed.
for !e.rcv.closed || !e.snd.closed || e.snd.sndUna != e.snd.sndNxtList {
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index 706977618..b21c2b4ab 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -69,7 +69,7 @@ type endpoint struct {
// change throughout the lifetime of the endpoint.
stack *stack.Stack `state:"manual"`
netProto tcpip.NetworkProtocolNumber
- waiterQueue *waiter.Queue `state:"wait"`
+ waiterQueue *waiter.Queue
// lastError represents the last error that the endpoint reported;
// access to it is protected by the following mutex.
@@ -82,8 +82,8 @@ type endpoint struct {
//
// Once the peer has closed its send side, rcvClosed is set to true
// to indicate to users that no more data is coming.
- rcvListMu sync.Mutex `state:"nosave"`
- rcvList segmentList `state:"wait"`
+ rcvListMu sync.Mutex `state:"nosave"`
+ rcvList segmentList
rcvClosed bool
rcvBufSize int
rcvBufUsed int
@@ -91,8 +91,8 @@ type endpoint struct {
// The following fields are protected by the mutex.
mu sync.RWMutex `state:"nosave"`
id stack.TransportEndpointID
- state endpointState `state:".(endpointState)"`
- isPortReserved bool `state:"manual"`
+ state endpointState
+ isPortReserved bool `state:"manual"`
isRegistered bool
boundNICID tcpip.NICID `state:"manual"`
route stack.Route `state:"manual"`
@@ -118,7 +118,7 @@ type endpoint struct {
// workerCleanup specifies if the worker goroutine must perform cleanup
// before exitting. This can only be set to true when workerRunning is
// also true, and they're both protected by the mutex.
- workerCleanup bool
+ workerCleanup bool `state:"zerovalue"`
// sendTSOk is used to indicate when the TS Option has been negotiated.
// When sendTSOk is true every non-RST segment should carry a TS as per
@@ -153,7 +153,7 @@ type endpoint struct {
// segmentQueue is used to hand received segments to the protocol
// goroutine. Segments are queued as long as the queue is not full,
// and dropped when it is.
- segmentQueue segmentQueue `state:"wait"`
+ segmentQueue segmentQueue `state:"zerovalue"`
// The following fields are used to manage the send buffer. When
// segments are ready to be sent, they are added to sndQueue and the
@@ -166,7 +166,7 @@ type endpoint struct {
sndBufUsed int
sndClosed bool
sndBufInQueue seqnum.Size
- sndQueue segmentList `state:"wait"`
+ sndQueue segmentList
sndWaker sleep.Waker `state:"manual"`
sndCloseWaker sleep.Waker `state:"manual"`
@@ -188,21 +188,17 @@ type endpoint struct {
// notifyFlags is a bitmask of flags used to indicate to the protocol
// goroutine what it was notified; this is only accessed atomically.
- notifyFlags uint32 `state:"nosave"`
+ notifyFlags uint32 `state:"zerovalue"`
// acceptedChan is used by a listening endpoint protocol goroutine to
// send newly accepted connections to the endpoint so that they can be
// read by Accept() calls.
- acceptedChan chan *endpoint `state:"manual"`
-
- // acceptedEndpoints is only used to save / restore the channel buffer.
- // FIXME
- acceptedEndpoints []*endpoint
+ acceptedChan chan *endpoint `state:".(endpointChan)"`
// The following are only used from the protocol goroutine, and
// therefore don't need locks to protect them.
- rcv *receiver `state:"wait"`
- snd *sender `state:"wait"`
+ rcv *receiver
+ snd *sender
// The goroutine drain completion notification channel.
drainDone chan struct{} `state:"nosave"`
@@ -215,7 +211,6 @@ type endpoint struct {
probe stack.TCPProbeFunc `state:"nosave"`
// The following are only used to assist the restore run to re-connect.
- bindAddress tcpip.Address
connectingAddress tcpip.Address
}
@@ -349,7 +344,6 @@ func (e *endpoint) Close() {
// Either perform the local cleanup or kick the worker to make sure it
// knows it needs to cleanup.
- tcpip.AddDanglingEndpoint(e)
if !e.workerRunning {
e.cleanupLocked()
} else {
@@ -369,12 +363,9 @@ func (e *endpoint) cleanupLocked() {
if e.acceptedChan != nil {
close(e.acceptedChan)
for n := range e.acceptedChan {
- n.mu.Lock()
n.resetConnectionLocked(tcpip.ErrConnectionAborted)
- n.mu.Unlock()
n.Close()
}
- e.acceptedChan = nil
}
e.workerCleanup = false
@@ -383,7 +374,6 @@ func (e *endpoint) cleanupLocked() {
}
e.route.Release()
- tcpip.DeleteDanglingEndpoint(e)
}
// Read reads data from the endpoint.
@@ -796,16 +786,6 @@ func (e *endpoint) checkV4Mapped(addr *tcpip.FullAddress) (tcpip.NetworkProtocol
// Connect connects the endpoint to its peer.
func (e *endpoint) Connect(addr tcpip.FullAddress) *tcpip.Error {
- return e.connect(addr, true, true)
-}
-
-// connect connects the endpoint to its peer. In the normal non-S/R case, the
-// new connection is expected to run the main goroutine and perform handshake.
-// In restore of previously connected endpoints, both ends will be passively
-// created (so no new handshaking is done); for stack-accepted connections not
-// yet accepted by the app, they are restored without running the main goroutine
-// here.
-func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) *tcpip.Error {
e.mu.Lock()
defer e.mu.Unlock()
@@ -917,27 +897,9 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) *tc
e.boundNICID = nicid
e.effectiveNetProtos = netProtos
e.connectingAddress = connectingAddr
+ e.workerRunning = true
- // Connect in the restore phase does not perform handshake. Restore its
- // connection setting here.
- if !handshake {
- e.segmentQueue.mu.Lock()
- for _, l := range []segmentList{e.segmentQueue.list, e.sndQueue, e.snd.writeList} {
- for s := l.Front(); s != nil; s = s.Next() {
- s.id = e.id
- s.route = r.Clone()
- e.sndWaker.Assert()
- }
- }
- e.segmentQueue.mu.Unlock()
- e.snd.updateMaxPayloadSize(int(e.route.MTU()), 0)
- e.state = stateConnected
- }
-
- if run {
- e.workerRunning = true
- go e.protocolMainLoop(handshake) // S/R-SAFE: will be drained before save.
- }
+ go e.protocolMainLoop(false) // S/R-SAFE: will be drained before save.
return tcpip.ErrConnectStarted
}
@@ -1009,9 +971,6 @@ func (e *endpoint) Listen(backlog int) *tcpip.Error {
if len(e.acceptedChan) > backlog {
return tcpip.ErrInvalidEndpointState
}
- if cap(e.acceptedChan) == backlog {
- return nil
- }
origChan := e.acceptedChan
e.acceptedChan = make(chan *endpoint, backlog)
close(origChan)
@@ -1049,7 +1008,7 @@ func (e *endpoint) Listen(backlog int) *tcpip.Error {
func (e *endpoint) startAcceptedLoop(waiterQueue *waiter.Queue) {
e.waiterQueue = waiterQueue
e.workerRunning = true
- go e.protocolMainLoop(false) // S/R-SAFE: drained on save.
+ go e.protocolMainLoop(true) // S/R-FIXME
}
// Accept returns a new endpoint if a peer has established a connection
@@ -1090,7 +1049,6 @@ func (e *endpoint) Bind(addr tcpip.FullAddress, commit func() *tcpip.Error) (ret
return tcpip.ErrAlreadyBound
}
- e.bindAddress = addr.Addr
netProto, err := e.checkV4Mapped(&addr)
if err != nil {
return err
diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go
index 38c97c796..b1e249bff 100644
--- a/pkg/tcpip/transport/tcp/endpoint_state.go
+++ b/pkg/tcpip/transport/tcp/endpoint_state.go
@@ -9,7 +9,6 @@ import (
"sync"
"gvisor.googlesource.com/gvisor/pkg/tcpip"
- "gvisor.googlesource.com/gvisor/pkg/tcpip/header"
"gvisor.googlesource.com/gvisor/pkg/tcpip/stack"
)
@@ -23,7 +22,7 @@ func (e *endpoint) drainSegmentLocked() {
e.undrain = make(chan struct{})
e.mu.Unlock()
- e.notifyProtocolGoroutine(notifyDrain)
+ e.notificationWaker.Assert()
<-e.drainDone
e.mu.Lock()
@@ -39,98 +38,37 @@ func (e *endpoint) beforeSave() {
switch e.state {
case stateInitial, stateBound:
- case stateListen, stateConnecting, stateConnected:
- if e.state == stateConnected && !e.workerRunning {
- // The endpoint must be in acceptedChan.
- break
+ case stateListen:
+ if !e.segmentQueue.empty() {
+ e.drainSegmentLocked()
}
+ case stateConnecting:
e.drainSegmentLocked()
- if e.state != stateClosed && e.state != stateError {
- if !e.workerRunning {
- panic("endpoint has no worker running in listen, connecting, or connected state")
- }
+ if e.state != stateConnected {
break
}
fallthrough
+ case stateConnected:
+ // FIXME
+ panic(tcpip.ErrSaveRejection{fmt.Errorf("endpoint cannot be saved in connected state: local %v:%v, remote %v:%v", e.id.LocalAddress, e.id.LocalPort, e.id.RemoteAddress, e.id.RemotePort)})
case stateClosed, stateError:
if e.workerRunning {
- panic("endpoint still has worker running in closed or error state")
+ panic(fmt.Sprintf("endpoint still has worker running in closed or error state"))
}
default:
panic(fmt.Sprintf("endpoint in unknown state %v", e.state))
}
-
- if e.waiterQueue != nil && !e.waiterQueue.IsEmpty() {
- panic("endpoint still has waiters upon save")
- }
-
- if !((e.state == stateBound || e.state == stateListen) == e.isPortReserved) {
- panic("endpoint port must and must only be reserved in bound or listen state")
- }
-
- if e.acceptedChan != nil {
- close(e.acceptedChan)
- e.acceptedEndpoints = make([]*endpoint, len(e.acceptedChan), cap(e.acceptedChan))
- i := 0
- for ep := range e.acceptedChan {
- e.acceptedEndpoints[i] = ep
- i++
- }
- if i != len(e.acceptedEndpoints) {
- panic("endpoint acceptedChan buffer got consumed by background context")
- }
- }
-}
-
-// saveState is invoked by stateify.
-func (e *endpoint) saveState() endpointState {
- return e.state
-}
-
-// Endpoint loading must be done in the following ordering by their state, to
-// avoid dangling connecting w/o listening peer, and to avoid conflicts in port
-// reservation.
-var connectedLoading sync.WaitGroup
-var listenLoading sync.WaitGroup
-var connectingLoading sync.WaitGroup
-
-// Bound endpoint loading happens last.
-
-// loadState is invoked by stateify.
-func (e *endpoint) loadState(state endpointState) {
- // This is to ensure that the loading wait groups include all applicable
- // endpoints before any asynchronous calls to the Wait() methods.
- switch state {
- case stateConnected:
- connectedLoading.Add(1)
- case stateListen:
- listenLoading.Add(1)
- case stateConnecting:
- connectingLoading.Add(1)
- }
- e.state = state
}
// afterLoad is invoked by stateify.
func (e *endpoint) afterLoad() {
- // We load acceptedChan buffer indirectly here. Note that closed
- // endpoints might not need to allocate the channel.
- // FIXME
- if cap(e.acceptedEndpoints) > 0 {
- e.acceptedChan = make(chan *endpoint, cap(e.acceptedEndpoints))
- for _, ep := range e.acceptedEndpoints {
- e.acceptedChan <- ep
- }
- e.acceptedEndpoints = nil
- }
-
e.stack = stack.StackFromEnv
e.segmentQueue.setLimit(2 * e.rcvBufSize)
e.workMu.Init()
state := e.state
switch state {
- case stateInitial, stateBound, stateListen, stateConnecting, stateConnected:
+ case stateInitial, stateBound, stateListen, stateConnecting:
var ss SendBufferSizeOption
if err := e.stack.TransportProtocolOption(ProtocolNumber, &ss); err == nil {
if e.sndBufSize < ss.Min || e.sndBufSize > ss.Max {
@@ -142,72 +80,65 @@ func (e *endpoint) afterLoad() {
}
}
- bind := func() {
+ switch state {
+ case stateBound, stateListen, stateConnecting:
e.state = stateInitial
- if len(e.bindAddress) == 0 {
- e.bindAddress = e.id.LocalAddress
- }
- if err := e.Bind(tcpip.FullAddress{Addr: e.bindAddress, Port: e.id.LocalPort}, nil); err != nil {
+ if err := e.Bind(tcpip.FullAddress{Addr: e.id.LocalAddress, Port: e.id.LocalPort}, nil); err != nil {
panic("endpoint binding failed: " + err.String())
}
}
switch state {
- case stateConnected:
- bind()
- if len(e.connectingAddress) == 0 {
- // This endpoint is accepted by netstack but not yet by
- // the app. If the endpoint is IPv6 but the remote
- // address is IPv4, we need to connect as IPv6 so that
- // dual-stack mode can be properly activated.
- if e.netProto == header.IPv6ProtocolNumber && len(e.id.RemoteAddress) != header.IPv6AddressSize {
- e.connectingAddress = "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff" + e.id.RemoteAddress
- } else {
- e.connectingAddress = e.id.RemoteAddress
- }
+ case stateListen:
+ backlog := cap(e.acceptedChan)
+ e.acceptedChan = nil
+ if err := e.Listen(backlog); err != nil {
+ panic("endpoint listening failed: " + err.String())
}
- if err := e.connect(tcpip.FullAddress{NIC: e.boundNICID, Addr: e.connectingAddress, Port: e.id.RemotePort}, false, e.workerRunning); err != tcpip.ErrConnectStarted {
+ }
+
+ switch state {
+ case stateConnecting:
+ if err := e.Connect(tcpip.FullAddress{NIC: e.boundNICID, Addr: e.connectingAddress, Port: e.id.RemotePort}); err != tcpip.ErrConnectStarted {
panic("endpoint connecting failed: " + err.String())
}
- connectedLoading.Done()
- case stateListen:
- tcpip.AsyncLoading.Add(1)
- go func() {
- connectedLoading.Wait()
- bind()
- backlog := cap(e.acceptedChan)
- if err := e.Listen(backlog); err != nil {
- panic("endpoint listening failed: " + err.String())
- }
- listenLoading.Done()
- tcpip.AsyncLoading.Done()
- }()
- case stateConnecting:
- tcpip.AsyncLoading.Add(1)
- go func() {
- connectedLoading.Wait()
- listenLoading.Wait()
- bind()
- if err := e.Connect(tcpip.FullAddress{NIC: e.boundNICID, Addr: e.connectingAddress, Port: e.id.RemotePort}); err != tcpip.ErrConnectStarted {
- panic("endpoint connecting failed: " + err.String())
- }
- connectingLoading.Done()
- tcpip.AsyncLoading.Done()
- }()
- case stateBound:
- tcpip.AsyncLoading.Add(1)
- go func() {
- connectedLoading.Wait()
- listenLoading.Wait()
- connectingLoading.Wait()
- bind()
- tcpip.AsyncLoading.Done()
- }()
- case stateClosed, stateError:
- tcpip.DeleteDanglingEndpoint(e)
}
}
+// saveAcceptedChan is invoked by stateify.
+func (e *endpoint) saveAcceptedChan() endpointChan {
+ if e.acceptedChan == nil {
+ return endpointChan{}
+ }
+ close(e.acceptedChan)
+ buffer := make([]*endpoint, 0, len(e.acceptedChan))
+ for ep := range e.acceptedChan {
+ buffer = append(buffer, ep)
+ }
+ if len(buffer) != cap(buffer) {
+ panic("endpoint.acceptedChan buffer got consumed by background context")
+ }
+ c := cap(e.acceptedChan)
+ e.acceptedChan = nil
+ return endpointChan{buffer: buffer, cap: c}
+}
+
+// loadAcceptedChan is invoked by stateify.
+func (e *endpoint) loadAcceptedChan(c endpointChan) {
+ if c.cap == 0 {
+ return
+ }
+ e.acceptedChan = make(chan *endpoint, c.cap)
+ for _, ep := range c.buffer {
+ e.acceptedChan <- ep
+ }
+}
+
+type endpointChan struct {
+ buffer []*endpoint
+ cap int
+}
+
// saveLastError is invoked by stateify.
func (e *endpoint) saveLastError() string {
if e.lastError == nil {
diff --git a/pkg/tcpip/transport/tcp/segment.go b/pkg/tcpip/transport/tcp/segment.go
index c5bff5f4f..07e4bfd73 100644
--- a/pkg/tcpip/transport/tcp/segment.go
+++ b/pkg/tcpip/transport/tcp/segment.go
@@ -29,9 +29,9 @@ const (
type segment struct {
segmentEntry
refCnt int32
- id stack.TransportEndpointID `state:"manual"`
- route stack.Route `state:"manual"`
- data buffer.VectorisedView `state:".(buffer.VectorisedView)"`
+ id stack.TransportEndpointID
+ route stack.Route `state:"manual"`
+ data buffer.VectorisedView
// views is used as buffer for data when its length is large
// enough to store a VectorisedView.
views [8]buffer.View
@@ -45,7 +45,7 @@ type segment struct {
// parsedOptions stores the parsed values from the options in the segment.
parsedOptions header.TCPOptions
- options []byte `state:".([]byte)"`
+ options []byte
}
func newSegment(r *stack.Route, id stack.TransportEndpointID, vv *buffer.VectorisedView) *segment {
diff --git a/pkg/tcpip/transport/tcp/segment_queue.go b/pkg/tcpip/transport/tcp/segment_queue.go
index a5e7b2ebf..c4a7f7d5b 100644
--- a/pkg/tcpip/transport/tcp/segment_queue.go
+++ b/pkg/tcpip/transport/tcp/segment_queue.go
@@ -12,8 +12,8 @@ import (
// segmentQueue is a bounded, thread-safe queue of TCP segments.
type segmentQueue struct {
- mu sync.Mutex `state:"nosave"`
- list segmentList `state:"wait"`
+ mu sync.Mutex
+ list segmentList
limit int
used int
}
diff --git a/pkg/tcpip/transport/tcp/segment_state.go b/pkg/tcpip/transport/tcp/segment_state.go
deleted file mode 100644
index e5243200b..000000000
--- a/pkg/tcpip/transport/tcp/segment_state.go
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright 2018 The Netstack Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package tcp
-
-import (
- "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer"
-)
-
-// saveData is invoked by stateify.
-func (s *segment) saveData() buffer.VectorisedView {
- // We cannot save s.data directly as s.data.views may alias to s.views,
- // which is not allowed by state framework (in-struct pointer).
- return s.data.Clone(nil)
-}
-
-// loadData is invoked by stateify.
-func (s *segment) loadData(data buffer.VectorisedView) {
- // NOTE: We cannot do the s.data = data.Clone(s.views[:]) optimization
- // here because data.views is not guaranteed to be loaded by now. Plus,
- // data.views will be allocated anyway so there really is little point
- // of utilizing s.views for data.views.
- s.data = data
-}
-
-// saveOptions is invoked by stateify.
-func (s *segment) saveOptions() []byte {
- // We cannot save s.options directly as it may point to s.data's trimmed
- // tail, which is not allowed by state framework (in-struct pointer).
- b := make([]byte, 0, cap(s.options))
- return append(b, s.options...)
-}
-
-// loadOptions is invoked by stateify.
-func (s *segment) loadOptions(options []byte) {
- // NOTE: We cannot point s.options back into s.data's trimmed tail. But
- // it is OK as they do not need to aliased. Plus, options is already
- // allocated so there is no cost here.
- s.options = options
-}
diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go
index a98aca293..95bea4d88 100644
--- a/pkg/tcpip/transport/tcp/snd.go
+++ b/pkg/tcpip/transport/tcp/snd.go
@@ -28,7 +28,7 @@ type sender struct {
ep *endpoint
// lastSendTime is the timestamp when the last packet was sent.
- lastSendTime time.Time `state:".(unixTime)"`
+ lastSendTime time.Time
// dupAckCount is the number of duplicated acks received. It is used for
// fast retransmit.
@@ -71,7 +71,7 @@ type sender struct {
rttMeasureSeqNum seqnum.Value
// rttMeasureTime is the time when the rttMeasureSeqNum was sent.
- rttMeasureTime time.Time `state:".(unixTime)"`
+ rttMeasureTime time.Time
closed bool
writeNext *segment
diff --git a/pkg/tcpip/transport/tcp/snd_state.go b/pkg/tcpip/transport/tcp/snd_state.go
deleted file mode 100644
index d68773a7c..000000000
--- a/pkg/tcpip/transport/tcp/snd_state.go
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright 2018 The Netstack Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package tcp
-
-import (
- "time"
-)
-
-type unixTime struct {
- second int64
- nano int64
-}
-
-// saveLastSendTime is invoked by stateify.
-func (s *sender) saveLastSendTime() unixTime {
- return unixTime{s.lastSendTime.Unix(), s.lastSendTime.UnixNano()}
-}
-
-// loadLastSendTime is invoked by stateify.
-func (s *sender) loadLastSendTime(unix unixTime) {
- s.lastSendTime = time.Unix(unix.second, unix.nano)
-}
-
-// saveRttMeasureTime is invoked by stateify.
-func (s *sender) saveRttMeasureTime() unixTime {
- return unixTime{s.rttMeasureTime.Unix(), s.rttMeasureTime.UnixNano()}
-}
-
-// loadRttMeasureTime is invoked by stateify.
-func (s *sender) loadRttMeasureTime(unix unixTime) {
- s.rttMeasureTime = time.Unix(unix.second, unix.nano)
-}
-
-// afterLoad is invoked by stateify.
-func (s *sender) afterLoad() {
- s.resendTimer.init(&s.resendWaker)
-}