summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/link')
-rw-r--r--pkg/tcpip/link/channel/channel.go12
-rw-r--r--pkg/tcpip/link/fdbased/mmap.go1
-rw-r--r--pkg/tcpip/link/fdbased/packet_dispatchers.go2
-rw-r--r--pkg/tcpip/link/loopback/loopback.go1
-rw-r--r--pkg/tcpip/link/pipe/pipe.go6
-rw-r--r--pkg/tcpip/link/qdisc/fifo/endpoint.go4
-rw-r--r--pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go8
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem.go1
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem_server.go4
-rw-r--r--pkg/tcpip/link/sniffer/sniffer.go1
-rw-r--r--pkg/tcpip/link/tun/device.go2
11 files changed, 33 insertions, 9 deletions
diff --git a/pkg/tcpip/link/channel/channel.go b/pkg/tcpip/link/channel/channel.go
index 658557d62..270fa8c79 100644
--- a/pkg/tcpip/link/channel/channel.go
+++ b/pkg/tcpip/link/channel/channel.go
@@ -81,6 +81,18 @@ func (q *queue) ReadContext(ctx context.Context) (PacketInfo, bool) {
}
func (q *queue) Write(p PacketInfo) bool {
+ // q holds the PacketBuffer.
+
+ // Ideally, Write() should take a reference here, since it is adding
+ // the underlying PacketBuffer to the channel. However, in practice,
+ // calls to Read() are not necessarily symetric with calls
+ // to Write() (e.g writing to this endpoint and then exiting). This
+ // causes tests and analyzers to detect erroneous "leaks" for expected
+ // behavior. To prevent this, we allow the refcount to go to zero, and
+ // make a call to PreserveObject(), which prevents the PacketBuffer
+ // pooling implementation from reclaiming this instance, even when
+ // the refcount goes to zero.
+ p.Pkt.PreserveObject()
wrote := false
select {
case q.c <- p:
diff --git a/pkg/tcpip/link/fdbased/mmap.go b/pkg/tcpip/link/fdbased/mmap.go
index 3f516cab5..47047578d 100644
--- a/pkg/tcpip/link/fdbased/mmap.go
+++ b/pkg/tcpip/link/fdbased/mmap.go
@@ -194,6 +194,7 @@ func (d *packetMMapDispatcher) dispatch() (bool, tcpip.Error) {
pbuf := stack.NewPacketBuffer(stack.PacketBufferOptions{
Data: buffer.View(pkt).ToVectorisedView(),
})
+ defer pbuf.DecRef()
if d.e.hdrSize > 0 {
if _, ok := pbuf.LinkHeader().Consume(d.e.hdrSize); !ok {
panic(fmt.Sprintf("LinkHeader().Consume(%d) must succeed", d.e.hdrSize))
diff --git a/pkg/tcpip/link/fdbased/packet_dispatchers.go b/pkg/tcpip/link/fdbased/packet_dispatchers.go
index fab34c5fa..c22bba3b5 100644
--- a/pkg/tcpip/link/fdbased/packet_dispatchers.go
+++ b/pkg/tcpip/link/fdbased/packet_dispatchers.go
@@ -181,6 +181,7 @@ func (d *readVDispatcher) dispatch() (bool, tcpip.Error) {
pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{
Data: d.buf.pullViews(n),
})
+ defer pkt.DecRef()
var (
p tcpip.NetworkProtocolNumber
@@ -289,6 +290,7 @@ func (d *recvMMsgDispatcher) dispatch() (bool, tcpip.Error) {
pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{
Data: d.bufs[k].pullViews(n),
})
+ defer pkt.DecRef()
// Mark that this iovec has been processed.
d.msgHdrs[k].Msg.Iovlen = 0
diff --git a/pkg/tcpip/link/loopback/loopback.go b/pkg/tcpip/link/loopback/loopback.go
index ca1f9c08d..49b0a29a9 100644
--- a/pkg/tcpip/link/loopback/loopback.go
+++ b/pkg/tcpip/link/loopback/loopback.go
@@ -104,6 +104,7 @@ func (e *endpoint) WriteRawPacket(pkt *stack.PacketBuffer) tcpip.Error {
newPkt := stack.NewPacketBuffer(stack.PacketBufferOptions{
Data: data,
})
+ defer newPkt.DecRef()
e.dispatcher.DeliverNetworkPacket("" /* remote */, "" /* local */, pkt.NetworkProtocolNumber, newPkt)
return nil
diff --git a/pkg/tcpip/link/pipe/pipe.go b/pkg/tcpip/link/pipe/pipe.go
index c67ca98ea..c2a888054 100644
--- a/pkg/tcpip/link/pipe/pipe.go
+++ b/pkg/tcpip/link/pipe/pipe.go
@@ -59,9 +59,11 @@ func (e *Endpoint) deliverPackets(r stack.RouteInfo, proto tcpip.NetworkProtocol
// avoid a deadlock when a packet triggers a response which leads the stack to
// try and take a lock it already holds.
for pkt := pkts.Front(); pkt != nil; pkt = pkt.Next() {
- e.linked.dispatcher.DeliverNetworkPacket(r.LocalLinkAddress /* remote */, r.RemoteLinkAddress /* local */, proto, stack.NewPacketBuffer(stack.PacketBufferOptions{
+ newPkt := stack.NewPacketBuffer(stack.PacketBufferOptions{
Data: buffer.NewVectorisedView(pkt.Size(), pkt.Views()),
- }))
+ })
+ e.linked.dispatcher.DeliverNetworkPacket(r.LocalLinkAddress /* remote */, r.RemoteLinkAddress /* local */, proto, newPkt)
+ newPkt.DecRef()
}
}
diff --git a/pkg/tcpip/link/qdisc/fifo/endpoint.go b/pkg/tcpip/link/qdisc/fifo/endpoint.go
index c15cbf81b..a68b274b2 100644
--- a/pkg/tcpip/link/qdisc/fifo/endpoint.go
+++ b/pkg/tcpip/link/qdisc/fifo/endpoint.go
@@ -94,9 +94,7 @@ func (q *queueDispatcher) dispatchLoop() {
// We pass a protocol of zero here because each packet carries its
// NetworkProtocol.
q.lower.WritePackets(stack.RouteInfo{}, batch, 0 /* protocol */)
- for pkt := batch.Front(); pkt != nil; pkt = pkt.Next() {
- batch.Remove(pkt)
- }
+ batch.DecRef()
batch.Reset()
}
}
diff --git a/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go b/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go
index eb5abb906..09e5b8314 100644
--- a/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go
+++ b/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go
@@ -54,13 +54,13 @@ func (q *packetBufferQueue) setLimit(limit int) {
// enqueue adds the given packet to the queue.
//
// Returns true when the PacketBuffer is successfully added to the queue, in
-// which case ownership of the reference is transferred to the queue. And
-// returns false if the queue is full, in which case ownership is retained by
-// the caller.
+// which case the queue acquires a reference to the PacketBuffer, and
+// returns false if the queue is full.
func (q *packetBufferQueue) enqueue(s *stack.PacketBuffer) bool {
q.mu.Lock()
r := q.used < q.limit
if r {
+ s.IncRef()
q.list.PushBack(s)
q.used++
}
@@ -70,7 +70,7 @@ func (q *packetBufferQueue) enqueue(s *stack.PacketBuffer) bool {
}
// dequeue removes and returns the next PacketBuffer from queue, if one exists.
-// Ownership is transferred to the caller.
+// Caller is responsible for calling DecRef on the PacketBuffer.
func (q *packetBufferQueue) dequeue() *stack.PacketBuffer {
q.mu.Lock()
s := q.list.Front()
diff --git a/pkg/tcpip/link/sharedmem/sharedmem.go b/pkg/tcpip/link/sharedmem/sharedmem.go
index b75522a51..8797d1bb9 100644
--- a/pkg/tcpip/link/sharedmem/sharedmem.go
+++ b/pkg/tcpip/link/sharedmem/sharedmem.go
@@ -413,6 +413,7 @@ func (e *endpoint) dispatchLoop(d stack.NetworkDispatcher) {
pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{
Data: buffer.View(b).ToVectorisedView(),
})
+ defer pkt.DecRef()
var src, dst tcpip.LinkAddress
var proto tcpip.NetworkProtocolNumber
diff --git a/pkg/tcpip/link/sharedmem/sharedmem_server.go b/pkg/tcpip/link/sharedmem/sharedmem_server.go
index c39eca33f..00c8a6a3b 100644
--- a/pkg/tcpip/link/sharedmem/sharedmem_server.go
+++ b/pkg/tcpip/link/sharedmem/sharedmem_server.go
@@ -311,6 +311,7 @@ func (e *serverEndpoint) dispatchLoop(d stack.NetworkDispatcher) {
if e.addr != "" {
hdr, ok := pkt.LinkHeader().Consume(header.EthernetMinimumSize)
if !ok {
+ pkt.DecRef()
continue
}
eth := header.Ethernet(hdr)
@@ -323,6 +324,7 @@ func (e *serverEndpoint) dispatchLoop(d stack.NetworkDispatcher) {
// IP version information is at the first octet, so pulling up 1 byte.
h, ok := pkt.Data().PullUp(1)
if !ok {
+ pkt.DecRef()
continue
}
switch header.IPVersion(h) {
@@ -331,11 +333,13 @@ func (e *serverEndpoint) dispatchLoop(d stack.NetworkDispatcher) {
case header.IPv6Version:
proto = header.IPv6ProtocolNumber
default:
+ pkt.DecRef()
continue
}
}
// Send packet up the stack.
d.DeliverNetworkPacket(src, dst, proto, pkt)
+ pkt.DecRef()
}
e.mu.Lock()
diff --git a/pkg/tcpip/link/sniffer/sniffer.go b/pkg/tcpip/link/sniffer/sniffer.go
index 2afa95af0..965cc994f 100644
--- a/pkg/tcpip/link/sniffer/sniffer.go
+++ b/pkg/tcpip/link/sniffer/sniffer.go
@@ -209,6 +209,7 @@ func logPacket(prefix string, dir direction, protocol tcpip.NetworkProtocolNumbe
vv := buffer.NewVectorisedView(pkt.Size(), pkt.Views())
vv.TrimFront(len(pkt.LinkHeader().View()))
pkt = stack.NewPacketBuffer(stack.PacketBufferOptions{Data: vv})
+ defer pkt.DecRef()
switch protocol {
case header.IPv4ProtocolNumber:
if ok := parse.IPv4(pkt); !ok {
diff --git a/pkg/tcpip/link/tun/device.go b/pkg/tcpip/link/tun/device.go
index fa2131c28..5230ac281 100644
--- a/pkg/tcpip/link/tun/device.go
+++ b/pkg/tcpip/link/tun/device.go
@@ -76,6 +76,7 @@ func (d *Device) Release(ctx context.Context) {
// Decrease refcount if there is an endpoint associated with this file.
if d.endpoint != nil {
+ d.endpoint.Drain()
d.endpoint.RemoveNotify(d.notifyHandle)
d.endpoint.DecRef(ctx)
d.endpoint = nil
@@ -231,6 +232,7 @@ func (d *Device) Write(data []byte) (int64, error) {
ReserveHeaderBytes: len(ethHdr),
Data: buffer.View(data).ToVectorisedView(),
})
+ defer pkt.DecRef()
copy(pkt.LinkHeader().Push(len(ethHdr)), ethHdr)
endpoint.InjectLinkAddr(protocol, remote, pkt)
return dataLen, nil