diff options
author | Ian Gudger <igudger@google.com> | 2020-02-24 10:31:01 -0800 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2020-02-24 10:32:17 -0800 |
commit | c37b196455e8b3816298e3eea98e4ee2dab8d368 (patch) | |
tree | ff6311a7455fa9de89be60b1bc2a7bbc52402465 /pkg/tcpip/transport/tcp/dispatcher.go | |
parent | b8f56c79be40d9c75f4e2f279c9d821d1c1c3569 (diff) |
Add support for tearing down protocol dispatchers and TIME_WAIT endpoints.
Protocol dispatchers were previously leaked. Bypassing TIME_WAIT is required to
test this change.
Also fix a race when a socket in SYN-RCVD is closed. This is also required to
test this change.
PiperOrigin-RevId: 296922548
Diffstat (limited to 'pkg/tcpip/transport/tcp/dispatcher.go')
-rw-r--r-- | pkg/tcpip/transport/tcp/dispatcher.go | 31 |
1 files changed, 30 insertions, 1 deletions
diff --git a/pkg/tcpip/transport/tcp/dispatcher.go b/pkg/tcpip/transport/tcp/dispatcher.go index e18012ac0..d792b07d6 100644 --- a/pkg/tcpip/transport/tcp/dispatcher.go +++ b/pkg/tcpip/transport/tcp/dispatcher.go @@ -68,17 +68,28 @@ func (q *epQueue) empty() bool { type processor struct { epQ epQueue newEndpointWaker sleep.Waker + closeWaker sleep.Waker id int + wg sync.WaitGroup } func newProcessor(id int) *processor { p := &processor{ id: id, } + p.wg.Add(1) go p.handleSegments() return p } +func (p *processor) close() { + p.closeWaker.Assert() +} + +func (p *processor) wait() { + p.wg.Wait() +} + func (p *processor) queueEndpoint(ep *endpoint) { // Queue an endpoint for processing by the processor goroutine. p.epQ.enqueue(ep) @@ -87,11 +98,17 @@ func (p *processor) queueEndpoint(ep *endpoint) { func (p *processor) handleSegments() { const newEndpointWaker = 1 + const closeWaker = 2 s := sleep.Sleeper{} s.AddWaker(&p.newEndpointWaker, newEndpointWaker) + s.AddWaker(&p.closeWaker, closeWaker) defer s.Done() for { - s.Fetch(true) + id, ok := s.Fetch(true) + if ok && id == closeWaker { + p.wg.Done() + return + } for ep := p.epQ.dequeue(); ep != nil; ep = p.epQ.dequeue() { if ep.segmentQueue.empty() { continue @@ -160,6 +177,18 @@ func newDispatcher(nProcessors int) *dispatcher { } } +func (d *dispatcher) close() { + for _, p := range d.processors { + p.close() + } +} + +func (d *dispatcher) wait() { + for _, p := range d.processors { + p.wait() + } +} + func (d *dispatcher) queuePacket(r *stack.Route, stackEP stack.TransportEndpoint, id stack.TransportEndpointID, pkt tcpip.PacketBuffer) { ep := stackEP.(*endpoint) s := newSegment(r, id, pkt) |