diff options
-rw-r--r-- | pkg/sentry/kernel/BUILD | 5 | ||||
-rw-r--r-- | pkg/sentry/kernel/kernel.go | 6 | ||||
-rw-r--r-- | pkg/sentry/kernel/kernel_state.go | 31 | ||||
-rw-r--r-- | pkg/tcpip/stack/stack_global_state.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/tcpip.go | 36 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/BUILD | 7 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/accept.go | 11 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 43 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 72 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint_state.go | 185 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/segment.go | 8 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/segment_queue.go | 4 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/segment_state.go | 41 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/snd.go | 4 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/snd_state.go | 39 |
15 files changed, 91 insertions, 403 deletions
diff --git a/pkg/sentry/kernel/BUILD b/pkg/sentry/kernel/BUILD index 07568b47c..b2a55ddff 100644 --- a/pkg/sentry/kernel/BUILD +++ b/pkg/sentry/kernel/BUILD @@ -12,7 +12,6 @@ go_stateify( "fs_context.go", "ipc_namespace.go", "kernel.go", - "kernel_state.go", "pending_signals.go", "pending_signals_state.go", "process_group_list.go", @@ -46,11 +45,10 @@ go_stateify( "vdso.go", "version.go", ], - out = "kernel_autogen_state.go", + out = "kernel_state.go", imports = [ "gvisor.googlesource.com/gvisor/pkg/sentry/arch", "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/kdefs", - "gvisor.googlesource.com/gvisor/pkg/tcpip", ], package = "kernel", ) @@ -119,7 +117,6 @@ go_library( "fs_context.go", "ipc_namespace.go", "kernel.go", - "kernel_autogen_state.go", "kernel_state.go", "pending_signals.go", "pending_signals_list.go", diff --git a/pkg/sentry/kernel/kernel.go b/pkg/sentry/kernel/kernel.go index 64439cd9d..5662b8f08 100644 --- a/pkg/sentry/kernel/kernel.go +++ b/pkg/sentry/kernel/kernel.go @@ -57,7 +57,6 @@ import ( sentrytime "gvisor.googlesource.com/gvisor/pkg/sentry/time" "gvisor.googlesource.com/gvisor/pkg/sentry/uniqueid" "gvisor.googlesource.com/gvisor/pkg/state" - "gvisor.googlesource.com/gvisor/pkg/tcpip" ) // Kernel represents an emulated Linux kernel. It must be initialized by calling @@ -159,9 +158,6 @@ type Kernel struct { // exitErr is the error causing the sandbox to exit, if any. It is // protected by extMu. exitErr error - - // danglingEndpoints is used to save / restore tcpip.DanglingEndpoints. - danglingEndpoints struct{} `state:".([]tcpip.Endpoint)"` } // InitKernelArgs holds arguments to Init. 
@@ -426,8 +422,6 @@ func (k *Kernel) LoadFrom(r io.Reader, p platform.Platform, net inet.Stack) erro return err } - tcpip.AsyncLoading.Wait() - log.Infof("Overall load took [%s]", time.Since(loadStart)) // Applications may size per-cpu structures based on k.applicationCores, so diff --git a/pkg/sentry/kernel/kernel_state.go b/pkg/sentry/kernel/kernel_state.go deleted file mode 100644 index bb2d5102d..000000000 --- a/pkg/sentry/kernel/kernel_state.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2018 Google Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kernel - -import ( - "gvisor.googlesource.com/gvisor/pkg/tcpip" -) - -// saveDanglingEndpoints is invoked by stateify. -func (k *Kernel) saveDanglingEndpoints() []tcpip.Endpoint { - return tcpip.GetDanglingEndpoints() -} - -// loadDanglingEndpoints is invoked by stateify. -func (k *Kernel) loadDanglingEndpoints(es []tcpip.Endpoint) { - for _, e := range es { - tcpip.AddDanglingEndpoint(e) - } -} diff --git a/pkg/tcpip/stack/stack_global_state.go b/pkg/tcpip/stack/stack_global_state.go index 260d7d05c..030ae98d1 100644 --- a/pkg/tcpip/stack/stack_global_state.go +++ b/pkg/tcpip/stack/stack_global_state.go @@ -5,5 +5,5 @@ package stack // StackFromEnv is the global stack created in restore run. -// FIXME +// FIXME: remove this variable once tcpip S/R is fully supported. 
var StackFromEnv *Stack diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go index 17fa0efb7..cf25a086d 100644 --- a/pkg/tcpip/tcpip.go +++ b/pkg/tcpip/tcpip.go @@ -23,7 +23,6 @@ import ( "fmt" "strconv" "strings" - "sync" "time" "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" @@ -553,38 +552,3 @@ type ProtocolAddress struct { // Address is a network address. Address Address } - -// danglingEndpointsMu protects access to danglingEndpoints. -var danglingEndpointsMu sync.Mutex - -// danglingEndpoints tracks all dangling endpoints no longer owned by the app. -var danglingEndpoints = make(map[Endpoint]struct{}) - -// GetDanglingEndpoints returns all dangling endpoints. -func GetDanglingEndpoints() []Endpoint { - es := make([]Endpoint, 0, len(danglingEndpoints)) - danglingEndpointsMu.Lock() - for e, _ := range danglingEndpoints { - es = append(es, e) - } - danglingEndpointsMu.Unlock() - return es -} - -// AddDanglingEndpoint adds a dangling endpoint. -func AddDanglingEndpoint(e Endpoint) { - danglingEndpointsMu.Lock() - danglingEndpoints[e] = struct{}{} - danglingEndpointsMu.Unlock() -} - -// DeleteDanglingEndpoint removes a dangling endpoint. -func DeleteDanglingEndpoint(e Endpoint) { - danglingEndpointsMu.Lock() - delete(danglingEndpoints, e) - danglingEndpointsMu.Unlock() -} - -// AsyncLoading is the global barrier for asynchronous endpoint loading -// activities. 
-var AsyncLoading sync.WaitGroup diff --git a/pkg/tcpip/transport/tcp/BUILD b/pkg/tcpip/transport/tcp/BUILD index d129aa285..f38f58e87 100644 --- a/pkg/tcpip/transport/tcp/BUILD +++ b/pkg/tcpip/transport/tcp/BUILD @@ -10,16 +10,11 @@ go_stateify( "endpoint.go", "endpoint_state.go", "rcv.go", - "segment.go", "segment_heap.go", - "segment_queue.go", - "segment_state.go", "snd.go", - "snd_state.go", "tcp_segment_list.go", ], out = "tcp_state.go", - imports = ["gvisor.googlesource.com/gvisor/pkg/tcpip/buffer"], package = "tcp", ) @@ -48,9 +43,7 @@ go_library( "segment.go", "segment_heap.go", "segment_queue.go", - "segment_state.go", "snd.go", - "snd_state.go", "tcp_segment_list.go", "tcp_state.go", "timer.go", diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index 410dfdad4..85adeef0e 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -68,8 +68,7 @@ func encodeMSS(mss uint16) uint32 { // to go above a threshold. var synRcvdCount struct { sync.Mutex - value uint64 - pending sync.WaitGroup + value uint64 } // listenContext is used by a listening endpoint to store state used while @@ -103,7 +102,6 @@ func incSynRcvdCount() bool { return false } - synRcvdCount.pending.Add(1) synRcvdCount.value++ return true @@ -117,7 +115,6 @@ func decSynRcvdCount() { defer synRcvdCount.Unlock() synRcvdCount.value-- - synRcvdCount.pending.Done() } // newListenContext creates a new listen context. @@ -295,7 +292,7 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) { opts := parseSynSegmentOptions(s) if incSynRcvdCount() { s.incRef() - go e.handleSynSegment(ctx, s, &opts) // S/R-SAFE: synRcvdCount is the barrier. 
+ go e.handleSynSegment(ctx, s, &opts) // S/R-FIXME } else { cookie := ctx.createCookie(s.id, s.sequenceNumber, encodeMSS(opts.MSS)) // Send SYN with window scaling because we currently @@ -384,12 +381,10 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) *tcpip.Error { return nil } if n&notifyDrain != 0 { - for !e.segmentQueue.empty() { - s := e.segmentQueue.dequeue() + for s := e.segmentQueue.dequeue(); s != nil; s = e.segmentQueue.dequeue() { e.handleListenSegment(ctx, s) s.decRef() } - synRcvdCount.pending.Wait() close(e.drainDone) <-e.undrain } diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index d9f87c793..9aaabe0b1 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -443,8 +443,7 @@ func (h *handshake) execute() *tcpip.Error { return tcpip.ErrAborted } if n&notifyDrain != 0 { - for !h.ep.segmentQueue.empty() { - s := h.ep.segmentQueue.dequeue() + for s := h.ep.segmentQueue.dequeue(); s != nil; s = h.ep.segmentQueue.dequeue() { err := h.handleSegment(s) s.decRef() if err != nil { @@ -814,13 +813,15 @@ func (e *endpoint) handleSegments() *tcpip.Error { // protocolMainLoop is the main loop of the TCP protocol. It runs in its own // goroutine and is responsible for sending segments and handling received // segments. -func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { +func (e *endpoint) protocolMainLoop(passive bool) *tcpip.Error { var closeTimer *time.Timer var closeWaker sleep.Waker defer func() { // e.mu is expected to be hold upon entering this section. 
+ e.completeWorkerLocked() + if e.snd != nil { e.snd.resendTimer.cleanup() } @@ -829,8 +830,6 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { closeTimer.Stop() } - e.completeWorkerLocked() - if e.drainDone != nil { close(e.drainDone) } @@ -841,7 +840,7 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) }() - if handshake { + if !passive { // This is an active connection, so we must initiate the 3-way // handshake, and then inform potential waiters about its // completion. @@ -946,17 +945,6 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { closeWaker.Assert() }) } - - if n&notifyDrain != 0 { - for !e.segmentQueue.empty() { - if err := e.handleSegments(); err != nil { - return err - } - } - close(e.drainDone) - <-e.undrain - } - return nil }, }, @@ -968,27 +956,6 @@ func (e *endpoint) protocolMainLoop(handshake bool) *tcpip.Error { s.AddWaker(funcs[i].w, i) } - // The following assertions and notifications are needed for restored - // endpoints. Fresh newly created endpoints have empty states and should - // not invoke any. - e.segmentQueue.mu.Lock() - if !e.segmentQueue.list.Empty() { - e.newSegmentWaker.Assert() - } - e.segmentQueue.mu.Unlock() - - e.rcvListMu.Lock() - if !e.rcvList.Empty() { - e.waiterQueue.Notify(waiter.EventIn) - } - e.rcvListMu.Unlock() - - e.mu.RLock() - if e.workerCleanup { - e.notifyProtocolGoroutine(notifyClose) - } - e.mu.RUnlock() - // Main loop. Handle segments until both send and receive ends of the // connection have completed. 
for !e.rcv.closed || !e.snd.closed || e.snd.sndUna != e.snd.sndNxtList { diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 706977618..b21c2b4ab 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -69,7 +69,7 @@ type endpoint struct { // change throughout the lifetime of the endpoint. stack *stack.Stack `state:"manual"` netProto tcpip.NetworkProtocolNumber - waiterQueue *waiter.Queue `state:"wait"` + waiterQueue *waiter.Queue // lastError represents the last error that the endpoint reported; // access to it is protected by the following mutex. @@ -82,8 +82,8 @@ type endpoint struct { // // Once the peer has closed its send side, rcvClosed is set to true // to indicate to users that no more data is coming. - rcvListMu sync.Mutex `state:"nosave"` - rcvList segmentList `state:"wait"` + rcvListMu sync.Mutex `state:"nosave"` + rcvList segmentList rcvClosed bool rcvBufSize int rcvBufUsed int @@ -91,8 +91,8 @@ type endpoint struct { // The following fields are protected by the mutex. mu sync.RWMutex `state:"nosave"` id stack.TransportEndpointID - state endpointState `state:".(endpointState)"` - isPortReserved bool `state:"manual"` + state endpointState + isPortReserved bool `state:"manual"` isRegistered bool boundNICID tcpip.NICID `state:"manual"` route stack.Route `state:"manual"` @@ -118,7 +118,7 @@ type endpoint struct { // workerCleanup specifies if the worker goroutine must perform cleanup // before exitting. This can only be set to true when workerRunning is // also true, and they're both protected by the mutex. - workerCleanup bool + workerCleanup bool `state:"zerovalue"` // sendTSOk is used to indicate when the TS Option has been negotiated. // When sendTSOk is true every non-RST segment should carry a TS as per @@ -153,7 +153,7 @@ type endpoint struct { // segmentQueue is used to hand received segments to the protocol // goroutine. 
Segments are queued as long as the queue is not full, // and dropped when it is. - segmentQueue segmentQueue `state:"wait"` + segmentQueue segmentQueue `state:"zerovalue"` // The following fields are used to manage the send buffer. When // segments are ready to be sent, they are added to sndQueue and the @@ -166,7 +166,7 @@ type endpoint struct { sndBufUsed int sndClosed bool sndBufInQueue seqnum.Size - sndQueue segmentList `state:"wait"` + sndQueue segmentList sndWaker sleep.Waker `state:"manual"` sndCloseWaker sleep.Waker `state:"manual"` @@ -188,21 +188,17 @@ type endpoint struct { // notifyFlags is a bitmask of flags used to indicate to the protocol // goroutine what it was notified; this is only accessed atomically. - notifyFlags uint32 `state:"nosave"` + notifyFlags uint32 `state:"zerovalue"` // acceptedChan is used by a listening endpoint protocol goroutine to // send newly accepted connections to the endpoint so that they can be // read by Accept() calls. - acceptedChan chan *endpoint `state:"manual"` - - // acceptedEndpoints is only used to save / restore the channel buffer. - // FIXME - acceptedEndpoints []*endpoint + acceptedChan chan *endpoint `state:".(endpointChan)"` // The following are only used from the protocol goroutine, and // therefore don't need locks to protect them. - rcv *receiver `state:"wait"` - snd *sender `state:"wait"` + rcv *receiver + snd *sender // The goroutine drain completion notification channel. drainDone chan struct{} `state:"nosave"` @@ -215,7 +211,6 @@ type endpoint struct { probe stack.TCPProbeFunc `state:"nosave"` // The following are only used to assist the restore run to re-connect. - bindAddress tcpip.Address connectingAddress tcpip.Address } @@ -349,7 +344,6 @@ func (e *endpoint) Close() { // Either perform the local cleanup or kick the worker to make sure it // knows it needs to cleanup. 
- tcpip.AddDanglingEndpoint(e) if !e.workerRunning { e.cleanupLocked() } else { @@ -369,12 +363,9 @@ func (e *endpoint) cleanupLocked() { if e.acceptedChan != nil { close(e.acceptedChan) for n := range e.acceptedChan { - n.mu.Lock() n.resetConnectionLocked(tcpip.ErrConnectionAborted) - n.mu.Unlock() n.Close() } - e.acceptedChan = nil } e.workerCleanup = false @@ -383,7 +374,6 @@ func (e *endpoint) cleanupLocked() { } e.route.Release() - tcpip.DeleteDanglingEndpoint(e) } // Read reads data from the endpoint. @@ -796,16 +786,6 @@ func (e *endpoint) checkV4Mapped(addr *tcpip.FullAddress) (tcpip.NetworkProtocol // Connect connects the endpoint to its peer. func (e *endpoint) Connect(addr tcpip.FullAddress) *tcpip.Error { - return e.connect(addr, true, true) -} - -// connect connects the endpoint to its peer. In the normal non-S/R case, the -// new connection is expected to run the main goroutine and perform handshake. -// In restore of previously connected endpoints, both ends will be passively -// created (so no new handshaking is done); for stack-accepted connections not -// yet accepted by the app, they are restored without running the main goroutine -// here. -func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) *tcpip.Error { e.mu.Lock() defer e.mu.Unlock() @@ -917,27 +897,9 @@ func (e *endpoint) connect(addr tcpip.FullAddress, handshake bool, run bool) *tc e.boundNICID = nicid e.effectiveNetProtos = netProtos e.connectingAddress = connectingAddr + e.workerRunning = true - // Connect in the restore phase does not perform handshake. Restore its - // connection setting here. 
- if !handshake { - e.segmentQueue.mu.Lock() - for _, l := range []segmentList{e.segmentQueue.list, e.sndQueue, e.snd.writeList} { - for s := l.Front(); s != nil; s = s.Next() { - s.id = e.id - s.route = r.Clone() - e.sndWaker.Assert() - } - } - e.segmentQueue.mu.Unlock() - e.snd.updateMaxPayloadSize(int(e.route.MTU()), 0) - e.state = stateConnected - } - - if run { - e.workerRunning = true - go e.protocolMainLoop(handshake) // S/R-SAFE: will be drained before save. - } + go e.protocolMainLoop(false) // S/R-SAFE: will be drained before save. return tcpip.ErrConnectStarted } @@ -1009,9 +971,6 @@ func (e *endpoint) Listen(backlog int) *tcpip.Error { if len(e.acceptedChan) > backlog { return tcpip.ErrInvalidEndpointState } - if cap(e.acceptedChan) == backlog { - return nil - } origChan := e.acceptedChan e.acceptedChan = make(chan *endpoint, backlog) close(origChan) @@ -1049,7 +1008,7 @@ func (e *endpoint) Listen(backlog int) *tcpip.Error { func (e *endpoint) startAcceptedLoop(waiterQueue *waiter.Queue) { e.waiterQueue = waiterQueue e.workerRunning = true - go e.protocolMainLoop(false) // S/R-SAFE: drained on save. 
+ go e.protocolMainLoop(true) // S/R-FIXME } // Accept returns a new endpoint if a peer has established a connection @@ -1090,7 +1049,6 @@ func (e *endpoint) Bind(addr tcpip.FullAddress, commit func() *tcpip.Error) (ret return tcpip.ErrAlreadyBound } - e.bindAddress = addr.Addr netProto, err := e.checkV4Mapped(&addr) if err != nil { return err diff --git a/pkg/tcpip/transport/tcp/endpoint_state.go b/pkg/tcpip/transport/tcp/endpoint_state.go index 38c97c796..b1e249bff 100644 --- a/pkg/tcpip/transport/tcp/endpoint_state.go +++ b/pkg/tcpip/transport/tcp/endpoint_state.go @@ -9,7 +9,6 @@ import ( "sync" "gvisor.googlesource.com/gvisor/pkg/tcpip" - "gvisor.googlesource.com/gvisor/pkg/tcpip/header" "gvisor.googlesource.com/gvisor/pkg/tcpip/stack" ) @@ -23,7 +22,7 @@ func (e *endpoint) drainSegmentLocked() { e.undrain = make(chan struct{}) e.mu.Unlock() - e.notifyProtocolGoroutine(notifyDrain) + e.notificationWaker.Assert() <-e.drainDone e.mu.Lock() @@ -39,98 +38,37 @@ func (e *endpoint) beforeSave() { switch e.state { case stateInitial, stateBound: - case stateListen, stateConnecting, stateConnected: - if e.state == stateConnected && !e.workerRunning { - // The endpoint must be in acceptedChan. 
- break + case stateListen: + if !e.segmentQueue.empty() { + e.drainSegmentLocked() } + case stateConnecting: e.drainSegmentLocked() - if e.state != stateClosed && e.state != stateError { - if !e.workerRunning { - panic("endpoint has no worker running in listen, connecting, or connected state") - } + if e.state != stateConnected { break } fallthrough + case stateConnected: + // FIXME + panic(tcpip.ErrSaveRejection{fmt.Errorf("endpoint cannot be saved in connected state: local %v:%v, remote %v:%v", e.id.LocalAddress, e.id.LocalPort, e.id.RemoteAddress, e.id.RemotePort)}) case stateClosed, stateError: if e.workerRunning { - panic("endpoint still has worker running in closed or error state") + panic(fmt.Sprintf("endpoint still has worker running in closed or error state")) } default: panic(fmt.Sprintf("endpoint in unknown state %v", e.state)) } - - if e.waiterQueue != nil && !e.waiterQueue.IsEmpty() { - panic("endpoint still has waiters upon save") - } - - if !((e.state == stateBound || e.state == stateListen) == e.isPortReserved) { - panic("endpoint port must and must only be reserved in bound or listen state") - } - - if e.acceptedChan != nil { - close(e.acceptedChan) - e.acceptedEndpoints = make([]*endpoint, len(e.acceptedChan), cap(e.acceptedChan)) - i := 0 - for ep := range e.acceptedChan { - e.acceptedEndpoints[i] = ep - i++ - } - if i != len(e.acceptedEndpoints) { - panic("endpoint acceptedChan buffer got consumed by background context") - } - } -} - -// saveState is invoked by stateify. -func (e *endpoint) saveState() endpointState { - return e.state -} - -// Endpoint loading must be done in the following ordering by their state, to -// avoid dangling connecting w/o listening peer, and to avoid conflicts in port -// reservation. -var connectedLoading sync.WaitGroup -var listenLoading sync.WaitGroup -var connectingLoading sync.WaitGroup - -// Bound endpoint loading happens last. - -// loadState is invoked by stateify. 
-func (e *endpoint) loadState(state endpointState) { - // This is to ensure that the loading wait groups include all applicable - // endpoints before any asynchronous calls to the Wait() methods. - switch state { - case stateConnected: - connectedLoading.Add(1) - case stateListen: - listenLoading.Add(1) - case stateConnecting: - connectingLoading.Add(1) - } - e.state = state } // afterLoad is invoked by stateify. func (e *endpoint) afterLoad() { - // We load acceptedChan buffer indirectly here. Note that closed - // endpoints might not need to allocate the channel. - // FIXME - if cap(e.acceptedEndpoints) > 0 { - e.acceptedChan = make(chan *endpoint, cap(e.acceptedEndpoints)) - for _, ep := range e.acceptedEndpoints { - e.acceptedChan <- ep - } - e.acceptedEndpoints = nil - } - e.stack = stack.StackFromEnv e.segmentQueue.setLimit(2 * e.rcvBufSize) e.workMu.Init() state := e.state switch state { - case stateInitial, stateBound, stateListen, stateConnecting, stateConnected: + case stateInitial, stateBound, stateListen, stateConnecting: var ss SendBufferSizeOption if err := e.stack.TransportProtocolOption(ProtocolNumber, &ss); err == nil { if e.sndBufSize < ss.Min || e.sndBufSize > ss.Max { @@ -142,72 +80,65 @@ func (e *endpoint) afterLoad() { } } - bind := func() { + switch state { + case stateBound, stateListen, stateConnecting: e.state = stateInitial - if len(e.bindAddress) == 0 { - e.bindAddress = e.id.LocalAddress - } - if err := e.Bind(tcpip.FullAddress{Addr: e.bindAddress, Port: e.id.LocalPort}, nil); err != nil { + if err := e.Bind(tcpip.FullAddress{Addr: e.id.LocalAddress, Port: e.id.LocalPort}, nil); err != nil { panic("endpoint binding failed: " + err.String()) } } switch state { - case stateConnected: - bind() - if len(e.connectingAddress) == 0 { - // This endpoint is accepted by netstack but not yet by - // the app. 
If the endpoint is IPv6 but the remote - // address is IPv4, we need to connect as IPv6 so that - // dual-stack mode can be properly activated. - if e.netProto == header.IPv6ProtocolNumber && len(e.id.RemoteAddress) != header.IPv6AddressSize { - e.connectingAddress = "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff" + e.id.RemoteAddress - } else { - e.connectingAddress = e.id.RemoteAddress - } + case stateListen: + backlog := cap(e.acceptedChan) + e.acceptedChan = nil + if err := e.Listen(backlog); err != nil { + panic("endpoint listening failed: " + err.String()) } - if err := e.connect(tcpip.FullAddress{NIC: e.boundNICID, Addr: e.connectingAddress, Port: e.id.RemotePort}, false, e.workerRunning); err != tcpip.ErrConnectStarted { + } + + switch state { + case stateConnecting: + if err := e.Connect(tcpip.FullAddress{NIC: e.boundNICID, Addr: e.connectingAddress, Port: e.id.RemotePort}); err != tcpip.ErrConnectStarted { panic("endpoint connecting failed: " + err.String()) } - connectedLoading.Done() - case stateListen: - tcpip.AsyncLoading.Add(1) - go func() { - connectedLoading.Wait() - bind() - backlog := cap(e.acceptedChan) - if err := e.Listen(backlog); err != nil { - panic("endpoint listening failed: " + err.String()) - } - listenLoading.Done() - tcpip.AsyncLoading.Done() - }() - case stateConnecting: - tcpip.AsyncLoading.Add(1) - go func() { - connectedLoading.Wait() - listenLoading.Wait() - bind() - if err := e.Connect(tcpip.FullAddress{NIC: e.boundNICID, Addr: e.connectingAddress, Port: e.id.RemotePort}); err != tcpip.ErrConnectStarted { - panic("endpoint connecting failed: " + err.String()) - } - connectingLoading.Done() - tcpip.AsyncLoading.Done() - }() - case stateBound: - tcpip.AsyncLoading.Add(1) - go func() { - connectedLoading.Wait() - listenLoading.Wait() - connectingLoading.Wait() - bind() - tcpip.AsyncLoading.Done() - }() - case stateClosed, stateError: - tcpip.DeleteDanglingEndpoint(e) } } +// saveAcceptedChan is invoked by stateify. 
+func (e *endpoint) saveAcceptedChan() endpointChan { + if e.acceptedChan == nil { + return endpointChan{} + } + close(e.acceptedChan) + buffer := make([]*endpoint, 0, len(e.acceptedChan)) + for ep := range e.acceptedChan { + buffer = append(buffer, ep) + } + if len(buffer) != cap(buffer) { + panic("endpoint.acceptedChan buffer got consumed by background context") + } + c := cap(e.acceptedChan) + e.acceptedChan = nil + return endpointChan{buffer: buffer, cap: c} +} + +// loadAcceptedChan is invoked by stateify. +func (e *endpoint) loadAcceptedChan(c endpointChan) { + if c.cap == 0 { + return + } + e.acceptedChan = make(chan *endpoint, c.cap) + for _, ep := range c.buffer { + e.acceptedChan <- ep + } +} + +type endpointChan struct { + buffer []*endpoint + cap int +} + // saveLastError is invoked by stateify. func (e *endpoint) saveLastError() string { if e.lastError == nil { diff --git a/pkg/tcpip/transport/tcp/segment.go b/pkg/tcpip/transport/tcp/segment.go index c5bff5f4f..07e4bfd73 100644 --- a/pkg/tcpip/transport/tcp/segment.go +++ b/pkg/tcpip/transport/tcp/segment.go @@ -29,9 +29,9 @@ const ( type segment struct { segmentEntry refCnt int32 - id stack.TransportEndpointID `state:"manual"` - route stack.Route `state:"manual"` - data buffer.VectorisedView `state:".(buffer.VectorisedView)"` + id stack.TransportEndpointID + route stack.Route `state:"manual"` + data buffer.VectorisedView // views is used as buffer for data when its length is large // enough to store a VectorisedView. views [8]buffer.View @@ -45,7 +45,7 @@ type segment struct { // parsedOptions stores the parsed values from the options in the segment. 
parsedOptions header.TCPOptions - options []byte `state:".([]byte)"` + options []byte } func newSegment(r *stack.Route, id stack.TransportEndpointID, vv *buffer.VectorisedView) *segment { diff --git a/pkg/tcpip/transport/tcp/segment_queue.go b/pkg/tcpip/transport/tcp/segment_queue.go index a5e7b2ebf..c4a7f7d5b 100644 --- a/pkg/tcpip/transport/tcp/segment_queue.go +++ b/pkg/tcpip/transport/tcp/segment_queue.go @@ -12,8 +12,8 @@ import ( // segmentQueue is a bounded, thread-safe queue of TCP segments. type segmentQueue struct { - mu sync.Mutex `state:"nosave"` - list segmentList `state:"wait"` + mu sync.Mutex + list segmentList limit int used int } diff --git a/pkg/tcpip/transport/tcp/segment_state.go b/pkg/tcpip/transport/tcp/segment_state.go deleted file mode 100644 index e5243200b..000000000 --- a/pkg/tcpip/transport/tcp/segment_state.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2018 The Netstack Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package tcp - -import ( - "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" -) - -// saveData is invoked by stateify. -func (s *segment) saveData() buffer.VectorisedView { - // We cannot save s.data directly as s.data.views may alias to s.views, - // which is not allowed by state framework (in-struct pointer). - return s.data.Clone(nil) -} - -// loadData is invoked by stateify. -func (s *segment) loadData(data buffer.VectorisedView) { - // NOTE: We cannot do the s.data = data.Clone(s.views[:]) optimization - // here because data.views is not guaranteed to be loaded by now. Plus, - // data.views will be allocated anyway so there really is little point - // of utilizing s.views for data.views. - s.data = data -} - -// saveOptions is invoked by stateify. 
-func (s *segment) saveOptions() []byte { - // We cannot save s.options directly as it may point to s.data's trimmed - // tail, which is not allowed by state framework (in-struct pointer). - b := make([]byte, 0, cap(s.options)) - return append(b, s.options...) -} - -// loadOptions is invoked by stateify. -func (s *segment) loadOptions(options []byte) { - // NOTE: We cannot point s.options back into s.data's trimmed tail. But - // it is OK as they do not need to aliased. Plus, options is already - // allocated so there is no cost here. - s.options = options -} diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go index a98aca293..95bea4d88 100644 --- a/pkg/tcpip/transport/tcp/snd.go +++ b/pkg/tcpip/transport/tcp/snd.go @@ -28,7 +28,7 @@ type sender struct { ep *endpoint // lastSendTime is the timestamp when the last packet was sent. - lastSendTime time.Time `state:".(unixTime)"` + lastSendTime time.Time // dupAckCount is the number of duplicated acks received. It is used for // fast retransmit. @@ -71,7 +71,7 @@ type sender struct { rttMeasureSeqNum seqnum.Value // rttMeasureTime is the time when the rttMeasureSeqNum was sent. - rttMeasureTime time.Time `state:".(unixTime)"` + rttMeasureTime time.Time closed bool writeNext *segment diff --git a/pkg/tcpip/transport/tcp/snd_state.go b/pkg/tcpip/transport/tcp/snd_state.go deleted file mode 100644 index d68773a7c..000000000 --- a/pkg/tcpip/transport/tcp/snd_state.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2018 The Netstack Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package tcp - -import ( - "time" -) - -type unixTime struct { - second int64 - nano int64 -} - -// saveLastSendTime is invoked by stateify. -func (s *sender) saveLastSendTime() unixTime { - return unixTime{s.lastSendTime.Unix(), s.lastSendTime.UnixNano()} -} - -// loadLastSendTime is invoked by stateify. 
-func (s *sender) loadLastSendTime(unix unixTime) { - s.lastSendTime = time.Unix(unix.second, unix.nano) -} - -// saveRttMeasureTime is invoked by stateify. -func (s *sender) saveRttMeasureTime() unixTime { - return unixTime{s.rttMeasureTime.Unix(), s.rttMeasureTime.UnixNano()} -} - -// loadRttMeasureTime is invoked by stateify. -func (s *sender) loadRttMeasureTime(unix unixTime) { - s.rttMeasureTime = time.Unix(unix.second, unix.nano) -} - -// afterLoad is invoked by stateify. -func (s *sender) afterLoad() { - s.resendTimer.init(&s.resendWaker) -} |