summaryrefslogtreecommitdiffhomepage
path: root/pkg
diff options
context:
space:
mode:
authorTamir Duberstein <tamird@google.com>2020-03-11 21:12:41 -0700
committergVisor bot <gvisor-bot@google.com>2020-03-11 21:13:46 -0700
commit035f7434e978f3f246ae05e9c748e8ca7d8d7fd1 (patch)
tree6b54dd0ea4c150946962f5cb590824f8664a5f0e /pkg
parentac05043525a058e22a5db422e5f8d344df21fa55 (diff)
Use a heap in transport demuxer
...instead of sorting at various times. Plug a memory leak by setting removed elements to nil. PiperOrigin-RevId: 300471087
Diffstat (limited to 'pkg')
-rw-r--r--pkg/tcpip/stack/transport_demuxer.go155
-rw-r--r--pkg/tcpip/stack/transport_demuxer_test.go14
2 files changed, 89 insertions, 80 deletions
diff --git a/pkg/tcpip/stack/transport_demuxer.go b/pkg/tcpip/stack/transport_demuxer.go
index 778c0a4d6..ff1845bfb 100644
--- a/pkg/tcpip/stack/transport_demuxer.go
+++ b/pkg/tcpip/stack/transport_demuxer.go
@@ -15,9 +15,9 @@
package stack
import (
+ "container/heap"
"fmt"
"math/rand"
- "sort"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip"
@@ -141,16 +141,17 @@ func (epsByNic *endpointsByNic) registerEndpoint(d *transportDemuxer, netProto t
epsByNic.mu.Lock()
defer epsByNic.mu.Unlock()
- if multiPortEp, ok := epsByNic.endpoints[bindToDevice]; ok {
- // There was already a bind.
- return multiPortEp.singleRegisterEndpoint(t, reusePort)
+ multiPortEp, ok := epsByNic.endpoints[bindToDevice]
+ if !ok {
+ multiPortEp = &multiPortEndpoint{
+ demux: d,
+ netProto: netProto,
+ transProto: transProto,
+ reuse: reusePort,
+ }
+ epsByNic.endpoints[bindToDevice] = multiPortEp
}
- // This is a new binding.
- multiPortEp := &multiPortEndpoint{demux: d, netProto: netProto, transProto: transProto}
- multiPortEp.endpointsMap = make(map[TransportEndpoint]int)
- multiPortEp.reuse = reusePort
- epsByNic.endpoints[bindToDevice] = multiPortEp
return multiPortEp.singleRegisterEndpoint(t, reusePort)
}
@@ -222,6 +223,35 @@ func (d *transportDemuxer) registerEndpoint(netProtos []tcpip.NetworkProtocolNum
return nil
}
+type transportEndpointHeap []TransportEndpoint
+
+var _ heap.Interface = (*transportEndpointHeap)(nil)
+
+func (h *transportEndpointHeap) Len() int {
+ return len(*h)
+}
+
+func (h *transportEndpointHeap) Less(i, j int) bool {
+ return (*h)[i].UniqueID() < (*h)[j].UniqueID()
+}
+
+func (h *transportEndpointHeap) Swap(i, j int) {
+ (*h)[i], (*h)[j] = (*h)[j], (*h)[i]
+}
+
+func (h *transportEndpointHeap) Push(x interface{}) {
+ *h = append(*h, x.(TransportEndpoint))
+}
+
+func (h *transportEndpointHeap) Pop() interface{} {
+ old := *h
+ n := len(old)
+ x := old[n-1]
+ old[n-1] = nil
+ *h = old[:n-1]
+ return x
+}
+
// multiPortEndpoint is a container for TransportEndpoints which are bound to
// the same pair of address and port. endpointsArr always has at least one
// element.
@@ -237,15 +267,14 @@ type multiPortEndpoint struct {
netProto tcpip.NetworkProtocolNumber
transProto tcpip.TransportProtocolNumber
- endpointsArr []TransportEndpoint
- endpointsMap map[TransportEndpoint]int
+ endpoints transportEndpointHeap
// reuse indicates if more than one endpoint is allowed.
reuse bool
}
func (ep *multiPortEndpoint) transportEndpoints() []TransportEndpoint {
ep.mu.RLock()
- eps := append([]TransportEndpoint(nil), ep.endpointsArr...)
+ eps := append([]TransportEndpoint(nil), ep.endpoints...)
ep.mu.RUnlock()
return eps
}
@@ -262,8 +291,8 @@ func reciprocalScale(val, n uint32) uint32 {
// ports then uses it to select a socket. In this case, all packets from one
// address will be sent to same endpoint.
func selectEndpoint(id TransportEndpointID, mpep *multiPortEndpoint, seed uint32) TransportEndpoint {
- if len(mpep.endpointsArr) == 1 {
- return mpep.endpointsArr[0]
+ if len(mpep.endpoints) == 1 {
+ return mpep.endpoints[0]
}
payload := []byte{
@@ -279,29 +308,26 @@ func selectEndpoint(id TransportEndpointID, mpep *multiPortEndpoint, seed uint32
h.Write([]byte(id.RemoteAddress))
hash := h.Sum32()
- idx := reciprocalScale(hash, uint32(len(mpep.endpointsArr)))
- return mpep.endpointsArr[idx]
+ idx := reciprocalScale(hash, uint32(len(mpep.endpoints)))
+ return mpep.endpoints[idx]
}
func (ep *multiPortEndpoint) handlePacketAll(r *Route, id TransportEndpointID, pkt tcpip.PacketBuffer) {
ep.mu.RLock()
queuedProtocol, mustQueue := ep.demux.queuedProtocols[protocolIDs{ep.netProto, ep.transProto}]
- for i, endpoint := range ep.endpointsArr {
- // HandlePacket takes ownership of pkt, so each endpoint needs
- // its own copy except for the final one.
- if i == len(ep.endpointsArr)-1 {
- if mustQueue {
- queuedProtocol.QueuePacket(r, endpoint, id, pkt)
- break
- }
- endpoint.HandlePacket(r, id, pkt)
- break
- }
+ // HandlePacket takes ownership of pkt, so each endpoint needs
+ // its own copy except for the final one.
+ for _, endpoint := range ep.endpoints[:len(ep.endpoints)-1] {
if mustQueue {
queuedProtocol.QueuePacket(r, endpoint, id, pkt.Clone())
- continue
+ } else {
+ endpoint.HandlePacket(r, id, pkt.Clone())
}
- endpoint.HandlePacket(r, id, pkt.Clone())
+ }
+ if endpoint := ep.endpoints[len(ep.endpoints)-1]; mustQueue {
+ queuedProtocol.QueuePacket(r, endpoint, id, pkt)
+ } else {
+ endpoint.HandlePacket(r, id, pkt)
}
ep.mu.RUnlock() // Don't use defer for performance reasons.
}
@@ -312,26 +338,15 @@ func (ep *multiPortEndpoint) singleRegisterEndpoint(t TransportEndpoint, reusePo
ep.mu.Lock()
defer ep.mu.Unlock()
- if len(ep.endpointsArr) > 0 {
+ if len(ep.endpoints) != 0 {
// If it was previously bound, we need to check if we can bind again.
if !ep.reuse || !reusePort {
return tcpip.ErrPortInUse
}
}
- // A new endpoint is added into endpointsArr and its index there is saved in
- // endpointsMap. This will allow us to remove endpoint from the array fast.
- ep.endpointsMap[t] = len(ep.endpointsArr)
- ep.endpointsArr = append(ep.endpointsArr, t)
+ heap.Push(&ep.endpoints, t)
- // ep.endpointsArr is sorted by endpoint unique IDs, so that endpoints
- // can be restored in the same order.
- sort.Slice(ep.endpointsArr, func(i, j int) bool {
- return ep.endpointsArr[i].UniqueID() < ep.endpointsArr[j].UniqueID()
- })
- for i, e := range ep.endpointsArr {
- ep.endpointsMap[e] = i
- }
return nil
}
@@ -340,21 +355,13 @@ func (ep *multiPortEndpoint) unregisterEndpoint(t TransportEndpoint) bool {
ep.mu.Lock()
defer ep.mu.Unlock()
- idx, ok := ep.endpointsMap[t]
- if !ok {
- return false
- }
- delete(ep.endpointsMap, t)
- l := len(ep.endpointsArr)
- if l > 1 {
- // The last endpoint in endpointsArr is moved instead of the deleted one.
- lastEp := ep.endpointsArr[l-1]
- ep.endpointsArr[idx] = lastEp
- ep.endpointsMap[lastEp] = idx
- ep.endpointsArr = ep.endpointsArr[0 : l-1]
- return false
+ for i, endpoint := range ep.endpoints {
+ if endpoint == t {
+ heap.Remove(&ep.endpoints, i)
+ break
+ }
}
- return true
+ return len(ep.endpoints) == 0
}
func (d *transportDemuxer) singleRegisterEndpoint(netProto tcpip.NetworkProtocolNumber, protocol tcpip.TransportProtocolNumber, id TransportEndpointID, ep TransportEndpoint, reusePort bool, bindToDevice tcpip.NICID) *tcpip.Error {
@@ -371,17 +378,14 @@ func (d *transportDemuxer) singleRegisterEndpoint(netProto tcpip.NetworkProtocol
eps.mu.Lock()
defer eps.mu.Unlock()
- if epsByNic, ok := eps.endpoints[id]; ok {
- // There was already a binding.
- return epsByNic.registerEndpoint(d, netProto, protocol, ep, reusePort, bindToDevice)
- }
-
- // This is a new binding.
- epsByNic := &endpointsByNic{
- endpoints: make(map[tcpip.NICID]*multiPortEndpoint),
- seed: rand.Uint32(),
+ epsByNic, ok := eps.endpoints[id]
+ if !ok {
+ epsByNic = &endpointsByNic{
+ endpoints: make(map[tcpip.NICID]*multiPortEndpoint),
+ seed: rand.Uint32(),
+ }
+ eps.endpoints[id] = epsByNic
}
- eps.endpoints[id] = epsByNic
return epsByNic.registerEndpoint(d, netProto, protocol, ep, reusePort, bindToDevice)
}
@@ -396,14 +400,6 @@ func (d *transportDemuxer) unregisterEndpoint(netProtos []tcpip.NetworkProtocolN
}
}
-var loopbackSubnet = func() tcpip.Subnet {
- sn, err := tcpip.NewSubnet("\x7f\x00\x00\x00", "\xff\x00\x00\x00")
- if err != nil {
- panic(err)
- }
- return sn
-}()
-
// deliverPacket attempts to find one or more matching transport endpoints, and
// then, if matches are found, delivers the packet to them. Returns true if
// the packet no longer needs to be handled.
@@ -601,8 +597,8 @@ func (d *transportDemuxer) registerRawEndpoint(netProto tcpip.NetworkProtocolNum
}
eps.mu.Lock()
- defer eps.mu.Unlock()
eps.rawEndpoints = append(eps.rawEndpoints, ep)
+ eps.mu.Unlock()
return nil
}
@@ -616,13 +612,16 @@ func (d *transportDemuxer) unregisterRawEndpoint(netProto tcpip.NetworkProtocolN
}
eps.mu.Lock()
- defer eps.mu.Unlock()
for i, rawEP := range eps.rawEndpoints {
if rawEP == ep {
- eps.rawEndpoints = append(eps.rawEndpoints[:i], eps.rawEndpoints[i+1:]...)
- return
+ lastIdx := len(eps.rawEndpoints) - 1
+ eps.rawEndpoints[i] = eps.rawEndpoints[lastIdx]
+ eps.rawEndpoints[lastIdx] = nil
+ eps.rawEndpoints = eps.rawEndpoints[:lastIdx]
+ break
}
}
+ eps.mu.Unlock()
}
func isMulticastOrBroadcast(addr tcpip.Address) bool {
diff --git a/pkg/tcpip/stack/transport_demuxer_test.go b/pkg/tcpip/stack/transport_demuxer_test.go
index 5e9237de9..0e3e239c5 100644
--- a/pkg/tcpip/stack/transport_demuxer_test.go
+++ b/pkg/tcpip/stack/transport_demuxer_test.go
@@ -167,8 +167,18 @@ func TestTransportDemuxerRegister(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
s := stack.New(stack.Options{
NetworkProtocols: []stack.NetworkProtocol{ipv4.NewProtocol()},
- TransportProtocols: []stack.TransportProtocol{udp.NewProtocol()}})
- if got, want := s.RegisterTransportEndpoint(0, []tcpip.NetworkProtocolNumber{test.proto}, udp.ProtocolNumber, stack.TransportEndpointID{}, nil, false, 0), test.want; got != want {
+ TransportProtocols: []stack.TransportProtocol{udp.NewProtocol()},
+ })
+ var wq waiter.Queue
+ ep, err := s.NewEndpoint(udp.ProtocolNumber, ipv4.ProtocolNumber, &wq)
+ if err != nil {
+ t.Fatal(err)
+ }
+ tEP, ok := ep.(stack.TransportEndpoint)
+ if !ok {
+ t.Fatalf("%T does not implement stack.TransportEndpoint", ep)
+ }
+ if got, want := s.RegisterTransportEndpoint(0, []tcpip.NetworkProtocolNumber{test.proto}, udp.ProtocolNumber, stack.TransportEndpointID{}, tEP, false, 0), test.want; got != want {
t.Fatalf("s.RegisterTransportEndpoint(...) = %v, want %v", got, want)
}
})