From 035f7434e978f3f246ae05e9c748e8ca7d8d7fd1 Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Wed, 11 Mar 2020 21:12:41 -0700 Subject: Use a heap in transport demuxer ...instead of sorting at various times. Plug a memory leak by setting removed elements to nil. PiperOrigin-RevId: 300471087 --- pkg/tcpip/stack/transport_demuxer.go | 155 +++++++++++++++--------------- pkg/tcpip/stack/transport_demuxer_test.go | 14 ++- 2 files changed, 89 insertions(+), 80 deletions(-) (limited to 'pkg/tcpip') diff --git a/pkg/tcpip/stack/transport_demuxer.go b/pkg/tcpip/stack/transport_demuxer.go index 778c0a4d6..ff1845bfb 100644 --- a/pkg/tcpip/stack/transport_demuxer.go +++ b/pkg/tcpip/stack/transport_demuxer.go @@ -15,9 +15,9 @@ package stack import ( + "container/heap" "fmt" "math/rand" - "sort" "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/tcpip" @@ -141,16 +141,17 @@ func (epsByNic *endpointsByNic) registerEndpoint(d *transportDemuxer, netProto t epsByNic.mu.Lock() defer epsByNic.mu.Unlock() - if multiPortEp, ok := epsByNic.endpoints[bindToDevice]; ok { - // There was already a bind. - return multiPortEp.singleRegisterEndpoint(t, reusePort) + multiPortEp, ok := epsByNic.endpoints[bindToDevice] + if !ok { + multiPortEp = &multiPortEndpoint{ + demux: d, + netProto: netProto, + transProto: transProto, + reuse: reusePort, + } + epsByNic.endpoints[bindToDevice] = multiPortEp } - // This is a new binding. - multiPortEp := &multiPortEndpoint{demux: d, netProto: netProto, transProto: transProto} - multiPortEp.endpointsMap = make(map[TransportEndpoint]int) - multiPortEp.reuse = reusePort - epsByNic.endpoints[bindToDevice] = multiPortEp return multiPortEp.singleRegisterEndpoint(t, reusePort) } @@ -222,6 +223,35 @@ func (d *transportDemuxer) registerEndpoint(netProtos []tcpip.NetworkProtocolNum return nil } +type transportEndpointHeap []TransportEndpoint + +var _ heap.Interface = (*transportEndpointHeap)(nil) + +func (h *transportEndpointHeap) Len() int { + return len(*h) +} + +func (h *transportEndpointHeap) Less(i, j int) bool { + return (*h)[i].UniqueID() < (*h)[j].UniqueID() +} + +func (h *transportEndpointHeap) Swap(i, j int) { + (*h)[i], (*h)[j] = (*h)[j], (*h)[i] +} + +func (h *transportEndpointHeap) Push(x interface{}) { + *h = append(*h, x.(TransportEndpoint)) +} + +func (h *transportEndpointHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + old[n-1] = nil + *h = old[:n-1] + return x +} + // multiPortEndpoint is a container for TransportEndpoints which are bound to // the same pair of address and port. endpointsArr always has at least one // element. @@ -237,15 +267,14 @@ type multiPortEndpoint struct { netProto tcpip.NetworkProtocolNumber transProto tcpip.TransportProtocolNumber - endpointsArr []TransportEndpoint - endpointsMap map[TransportEndpoint]int + endpoints transportEndpointHeap // reuse indicates if more than one endpoint is allowed. reuse bool } func (ep *multiPortEndpoint) transportEndpoints() []TransportEndpoint { ep.mu.RLock() - eps := append([]TransportEndpoint(nil), ep.endpointsArr...) + eps := append([]TransportEndpoint(nil), ep.endpoints...) ep.mu.RUnlock() return eps } @@ -262,8 +291,8 @@ func reciprocalScale(val, n uint32) uint32 { // ports then uses it to select a socket. In this case, all packets from one // address will be sent to same endpoint. func selectEndpoint(id TransportEndpointID, mpep *multiPortEndpoint, seed uint32) TransportEndpoint { - if len(mpep.endpointsArr) == 1 { - return mpep.endpointsArr[0] + if len(mpep.endpoints) == 1 { + return mpep.endpoints[0] } payload := []byte{ @@ -279,29 +308,26 @@ func selectEndpoint(id TransportEndpointID, mpep *multiPortEndpoint, seed uint32 h.Write([]byte(id.RemoteAddress)) hash := h.Sum32() - idx := reciprocalScale(hash, uint32(len(mpep.endpointsArr))) - return mpep.endpointsArr[idx] + idx := reciprocalScale(hash, uint32(len(mpep.endpoints))) + return mpep.endpoints[idx] } func (ep *multiPortEndpoint) handlePacketAll(r *Route, id TransportEndpointID, pkt tcpip.PacketBuffer) { ep.mu.RLock() queuedProtocol, mustQueue := ep.demux.queuedProtocols[protocolIDs{ep.netProto, ep.transProto}] - for i, endpoint := range ep.endpointsArr { - // HandlePacket takes ownership of pkt, so each endpoint needs - // its own copy except for the final one. - if i == len(ep.endpointsArr)-1 { - if mustQueue { - queuedProtocol.QueuePacket(r, endpoint, id, pkt) - break - } - endpoint.HandlePacket(r, id, pkt) - break - } + // HandlePacket takes ownership of pkt, so each endpoint needs + // its own copy except for the final one. + for _, endpoint := range ep.endpoints[:len(ep.endpoints)-1] { if mustQueue { queuedProtocol.QueuePacket(r, endpoint, id, pkt.Clone()) - continue + } else { + endpoint.HandlePacket(r, id, pkt.Clone()) } - endpoint.HandlePacket(r, id, pkt.Clone()) + } + if endpoint := ep.endpoints[len(ep.endpoints)-1]; mustQueue { + queuedProtocol.QueuePacket(r, endpoint, id, pkt) + } else { + endpoint.HandlePacket(r, id, pkt) } ep.mu.RUnlock() // Don't use defer for performance reasons. } @@ -312,26 +338,15 @@ func (ep *multiPortEndpoint) singleRegisterEndpoint(t TransportEndpoint, reusePo ep.mu.Lock() defer ep.mu.Unlock() - if len(ep.endpointsArr) > 0 { + if len(ep.endpoints) != 0 { // If it was previously bound, we need to check if we can bind again. if !ep.reuse || !reusePort { return tcpip.ErrPortInUse } } - // A new endpoint is added into endpointsArr and its index there is saved in - // endpointsMap. This will allow us to remove endpoint from the array fast. - ep.endpointsMap[t] = len(ep.endpointsArr) - ep.endpointsArr = append(ep.endpointsArr, t) + heap.Push(&ep.endpoints, t) - // ep.endpointsArr is sorted by endpoint unique IDs, so that endpoints - // can be restored in the same order. - sort.Slice(ep.endpointsArr, func(i, j int) bool { - return ep.endpointsArr[i].UniqueID() < ep.endpointsArr[j].UniqueID() - }) - for i, e := range ep.endpointsArr { - ep.endpointsMap[e] = i - } return nil } @@ -340,21 +355,13 @@ func (ep *multiPortEndpoint) unregisterEndpoint(t TransportEndpoint) bool { ep.mu.Lock() defer ep.mu.Unlock() - idx, ok := ep.endpointsMap[t] - if !ok { - return false - } - delete(ep.endpointsMap, t) - l := len(ep.endpointsArr) - if l > 1 { - // The last endpoint in endpointsArr is moved instead of the deleted one. - lastEp := ep.endpointsArr[l-1] - ep.endpointsArr[idx] = lastEp - ep.endpointsMap[lastEp] = idx - ep.endpointsArr = ep.endpointsArr[0 : l-1] - return false + for i, endpoint := range ep.endpoints { + if endpoint == t { + heap.Remove(&ep.endpoints, i) + break + } } - return true + return len(ep.endpoints) == 0 } func (d *transportDemuxer) singleRegisterEndpoint(netProto tcpip.NetworkProtocolNumber, protocol tcpip.TransportProtocolNumber, id TransportEndpointID, ep TransportEndpoint, reusePort bool, bindToDevice tcpip.NICID) *tcpip.Error { @@ -371,17 +378,14 @@ func (d *transportDemuxer) singleRegisterEndpoint(netProto tcpip.NetworkProtocol eps.mu.Lock() defer eps.mu.Unlock() - if epsByNic, ok := eps.endpoints[id]; ok { - // There was already a binding. - return epsByNic.registerEndpoint(d, netProto, protocol, ep, reusePort, bindToDevice) - } - - // This is a new binding. - epsByNic := &endpointsByNic{ - endpoints: make(map[tcpip.NICID]*multiPortEndpoint), - seed: rand.Uint32(), + epsByNic, ok := eps.endpoints[id] + if !ok { + epsByNic = &endpointsByNic{ + endpoints: make(map[tcpip.NICID]*multiPortEndpoint), + seed: rand.Uint32(), + } + eps.endpoints[id] = epsByNic } - eps.endpoints[id] = epsByNic return epsByNic.registerEndpoint(d, netProto, protocol, ep, reusePort, bindToDevice) } @@ -396,14 +400,6 @@ func (d *transportDemuxer) unregisterEndpoint(netProtos []tcpip.NetworkProtocolN } } -var loopbackSubnet = func() tcpip.Subnet { - sn, err := tcpip.NewSubnet("\x7f\x00\x00\x00", "\xff\x00\x00\x00") - if err != nil { - panic(err) - } - return sn -}() - // deliverPacket attempts to find one or more matching transport endpoints, and // then, if matches are found, delivers the packet to them. Returns true if // the packet no longer needs to be handled. @@ -601,8 +597,8 @@ func (d *transportDemuxer) registerRawEndpoint(netProto tcpip.NetworkProtocolNum } eps.mu.Lock() - defer eps.mu.Unlock() eps.rawEndpoints = append(eps.rawEndpoints, ep) + eps.mu.Unlock() return nil } @@ -616,13 +612,16 @@ func (d *transportDemuxer) unregisterRawEndpoint(netProto tcpip.NetworkProtocolN } eps.mu.Lock() - defer eps.mu.Unlock() for i, rawEP := range eps.rawEndpoints { if rawEP == ep { - eps.rawEndpoints = append(eps.rawEndpoints[:i], eps.rawEndpoints[i+1:]...) - return + lastIdx := len(eps.rawEndpoints) - 1 + eps.rawEndpoints[i] = eps.rawEndpoints[lastIdx] + eps.rawEndpoints[lastIdx] = nil + eps.rawEndpoints = eps.rawEndpoints[:lastIdx] + break } } + eps.mu.Unlock() } func isMulticastOrBroadcast(addr tcpip.Address) bool { diff --git a/pkg/tcpip/stack/transport_demuxer_test.go b/pkg/tcpip/stack/transport_demuxer_test.go index 5e9237de9..0e3e239c5 100644 --- a/pkg/tcpip/stack/transport_demuxer_test.go +++ b/pkg/tcpip/stack/transport_demuxer_test.go @@ -167,8 +167,18 @@ func TestTransportDemuxerRegister(t *testing.T) { t.Run(test.name, func(t *testing.T) { s := stack.New(stack.Options{ NetworkProtocols: []stack.NetworkProtocol{ipv4.NewProtocol()}, - TransportProtocols: []stack.TransportProtocol{udp.NewProtocol()}}) - if got, want := s.RegisterTransportEndpoint(0, []tcpip.NetworkProtocolNumber{test.proto}, udp.ProtocolNumber, stack.TransportEndpointID{}, nil, false, 0), test.want; got != want { + TransportProtocols: []stack.TransportProtocol{udp.NewProtocol()}, + }) + var wq waiter.Queue + ep, err := s.NewEndpoint(udp.ProtocolNumber, ipv4.ProtocolNumber, &wq) + if err != nil { + t.Fatal(err) + } + tEP, ok := ep.(stack.TransportEndpoint) + if !ok { + t.Fatalf("%T does not implement stack.TransportEndpoint", ep) + } + if got, want := s.RegisterTransportEndpoint(0, []tcpip.NetworkProtocolNumber{test.proto}, udp.ProtocolNumber, stack.TransportEndpointID{}, tEP, false, 0), test.want; got != want { t.Fatalf("s.RegisterTransportEndpoint(...) = %v, want %v", got, want) } }) -- cgit v1.2.3