// Copyright 2018 The gVisor Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package tcp import ( "encoding/binary" "gvisor.dev/gvisor/pkg/rand" "gvisor.dev/gvisor/pkg/sleep" "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/tcpip/hash/jenkins" "gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/stack" ) // epQueue is a queue of endpoints. type epQueue struct { mu sync.Mutex list endpointList } // enqueue adds e to the queue if the endpoint is not already on the queue. func (q *epQueue) enqueue(e *endpoint) { q.mu.Lock() if e.pendingProcessing { q.mu.Unlock() return } q.list.PushBack(e) e.pendingProcessing = true q.mu.Unlock() } // dequeue removes and returns the first element from the queue if available, // returns nil otherwise. func (q *epQueue) dequeue() *endpoint { q.mu.Lock() if e := q.list.Front(); e != nil { q.list.Remove(e) e.pendingProcessing = false q.mu.Unlock() return e } q.mu.Unlock() return nil } // empty returns true if the queue is empty, false otherwise. func (q *epQueue) empty() bool { q.mu.Lock() v := q.list.Empty() q.mu.Unlock() return v } // processor is responsible for processing packets queued to a tcp endpoint. type processor struct { epQ epQueue sleeper sleep.Sleeper newEndpointWaker sleep.Waker closeWaker sleep.Waker } func (p *processor) close() { p.closeWaker.Assert() } func (p *processor) queueEndpoint(ep *endpoint) { // Queue an endpoint for processing by the processor goroutine. p.epQ.enqueue(ep) p.newEndpointWaker.Assert() } const ( newEndpointWaker = 1 closeWaker = 2 ) func (p *processor) start(wg *sync.WaitGroup) { defer wg.Done() defer p.sleeper.Done() for { if id, _ := p.sleeper.Fetch(true); id == closeWaker { break } for { ep := p.epQ.dequeue() if ep == nil { break } if ep.segmentQueue.empty() { continue } // If socket has transitioned out of connected state then just let the // worker handle the packet. // // NOTE: We read this outside of e.mu lock which means that by the time // we get to handleSegments the endpoint may not be in ESTABLISHED. But // this should be fine as all normal shutdown states are handled by // handleSegments and if the endpoint moves to a CLOSED/ERROR state // then handleSegments is a noop. if ep.EndpointState() == StateEstablished && ep.mu.TryLock() { // If the endpoint is in a connected state then we do direct delivery // to ensure low latency and avoid scheduler interactions. switch err := ep.handleSegments(true /* fastPath */); { case err != nil: // Send any active resets if required. ep.resetConnectionLocked(err) fallthrough case ep.EndpointState() == StateClose: ep.notifyProtocolGoroutine(notifyTickleWorker) case !ep.segmentQueue.empty(): p.epQ.enqueue(ep) } ep.mu.Unlock() } else { ep.newSegmentWaker.Assert() } } } } // dispatcher manages a pool of TCP endpoint processors which are responsible // for the processing of inbound segments. This fixed pool of processor // goroutines do full tcp processing. The processor is selected based on the // hash of the endpoint id to ensure that delivery for the same endpoint happens // in-order. type dispatcher struct { processors []processor seed uint32 wg sync.WaitGroup } func (d *dispatcher) init(nProcessors int) { d.close() d.wait() d.processors = make([]processor, nProcessors) d.seed = generateRandUint32() for i := range d.processors { p := &d.processors[i] p.sleeper.AddWaker(&p.newEndpointWaker, newEndpointWaker) p.sleeper.AddWaker(&p.closeWaker, closeWaker) d.wg.Add(1) // NB: sleeper-waker registration must happen synchronously to avoid races // with `close`. It's possible to pull all this logic into `start`, but // that results in a heap-allocated function literal. go p.start(&d.wg) } } func (d *dispatcher) close() { for i := range d.processors { d.processors[i].close() } } func (d *dispatcher) wait() { d.wg.Wait() } func (d *dispatcher) queuePacket(stackEP stack.TransportEndpoint, id stack.TransportEndpointID, pkt *stack.PacketBuffer) { ep := stackEP.(*endpoint) s := newIncomingSegment(id, pkt) if !s.parse(pkt.RXTransportChecksumValidated) { ep.stack.Stats().MalformedRcvdPackets.Increment() ep.stack.Stats().TCP.InvalidSegmentsReceived.Increment() ep.stats.ReceiveErrors.MalformedPacketsReceived.Increment() s.decRef() return } if !s.csumValid { ep.stack.Stats().MalformedRcvdPackets.Increment() ep.stack.Stats().TCP.ChecksumErrors.Increment() ep.stats.ReceiveErrors.ChecksumErrors.Increment() s.decRef() return } ep.stack.Stats().TCP.ValidSegmentsReceived.Increment() ep.stats.SegmentsReceived.Increment() if (s.flags & header.TCPFlagRst) != 0 { ep.stack.Stats().TCP.ResetsReceived.Increment() } if !ep.enqueueSegment(s) { s.decRef() return } // For sockets not in established state let the worker goroutine // handle the packets. if ep.EndpointState() != StateEstablished { ep.newSegmentWaker.Assert() return } d.selectProcessor(id).queueEndpoint(ep) } func generateRandUint32() uint32 { b := make([]byte, 4) if _, err := rand.Read(b); err != nil { panic(err) } return binary.LittleEndian.Uint32(b) } func (d *dispatcher) selectProcessor(id stack.TransportEndpointID) *processor { var payload [4]byte binary.LittleEndian.PutUint16(payload[0:], id.LocalPort) binary.LittleEndian.PutUint16(payload[2:], id.RemotePort) h := jenkins.Sum32(d.seed) h.Write(payload[:]) h.Write([]byte(id.LocalAddress)) h.Write([]byte(id.RemoteAddress)) return &d.processors[h.Sum32()%uint32(len(d.processors))] }