diff options
Diffstat (limited to 'pkg/tcpip/link')
-rw-r--r-- | pkg/tcpip/link/channel/channel.go | 12 | ||||
-rw-r--r-- | pkg/tcpip/link/fdbased/mmap.go | 1 | ||||
-rw-r--r-- | pkg/tcpip/link/fdbased/packet_dispatchers.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/link/loopback/loopback.go | 1 | ||||
-rw-r--r-- | pkg/tcpip/link/pipe/pipe.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/link/qdisc/fifo/endpoint.go | 4 | ||||
-rw-r--r-- | pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go | 8 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/sharedmem.go | 1 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/sharedmem_server.go | 4 | ||||
-rw-r--r-- | pkg/tcpip/link/sniffer/sniffer.go | 1 | ||||
-rw-r--r-- | pkg/tcpip/link/tun/device.go | 2 |
11 files changed, 33 insertions, 9 deletions
diff --git a/pkg/tcpip/link/channel/channel.go b/pkg/tcpip/link/channel/channel.go index 658557d62..270fa8c79 100644 --- a/pkg/tcpip/link/channel/channel.go +++ b/pkg/tcpip/link/channel/channel.go @@ -81,6 +81,18 @@ func (q *queue) ReadContext(ctx context.Context) (PacketInfo, bool) { } func (q *queue) Write(p PacketInfo) bool { + // q holds the PacketBuffer. + + // Ideally, Write() should take a reference here, since it is adding + // the underlying PacketBuffer to the channel. However, in practice, + // calls to Read() are not necessarily symetric with calls + // to Write() (e.g writing to this endpoint and then exiting). This + // causes tests and analyzers to detect erroneous "leaks" for expected + // behavior. To prevent this, we allow the refcount to go to zero, and + // make a call to PreserveObject(), which prevents the PacketBuffer + // pooling implementation from reclaiming this instance, even when + // the refcount goes to zero. + p.Pkt.PreserveObject() wrote := false select { case q.c <- p: diff --git a/pkg/tcpip/link/fdbased/mmap.go b/pkg/tcpip/link/fdbased/mmap.go index 3f516cab5..47047578d 100644 --- a/pkg/tcpip/link/fdbased/mmap.go +++ b/pkg/tcpip/link/fdbased/mmap.go @@ -194,6 +194,7 @@ func (d *packetMMapDispatcher) dispatch() (bool, tcpip.Error) { pbuf := stack.NewPacketBuffer(stack.PacketBufferOptions{ Data: buffer.View(pkt).ToVectorisedView(), }) + defer pbuf.DecRef() if d.e.hdrSize > 0 { if _, ok := pbuf.LinkHeader().Consume(d.e.hdrSize); !ok { panic(fmt.Sprintf("LinkHeader().Consume(%d) must succeed", d.e.hdrSize)) diff --git a/pkg/tcpip/link/fdbased/packet_dispatchers.go b/pkg/tcpip/link/fdbased/packet_dispatchers.go index fab34c5fa..c22bba3b5 100644 --- a/pkg/tcpip/link/fdbased/packet_dispatchers.go +++ b/pkg/tcpip/link/fdbased/packet_dispatchers.go @@ -181,6 +181,7 @@ func (d *readVDispatcher) dispatch() (bool, tcpip.Error) { pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ Data: d.buf.pullViews(n), }) + defer pkt.DecRef() var ( p tcpip.NetworkProtocolNumber @@ -289,6 +290,7 @@ func (d *recvMMsgDispatcher) dispatch() (bool, tcpip.Error) { pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ Data: d.bufs[k].pullViews(n), }) + defer pkt.DecRef() // Mark that this iovec has been processed. d.msgHdrs[k].Msg.Iovlen = 0 diff --git a/pkg/tcpip/link/loopback/loopback.go b/pkg/tcpip/link/loopback/loopback.go index ca1f9c08d..49b0a29a9 100644 --- a/pkg/tcpip/link/loopback/loopback.go +++ b/pkg/tcpip/link/loopback/loopback.go @@ -104,6 +104,7 @@ func (e *endpoint) WriteRawPacket(pkt *stack.PacketBuffer) tcpip.Error { newPkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ Data: data, }) + defer newPkt.DecRef() e.dispatcher.DeliverNetworkPacket("" /* remote */, "" /* local */, pkt.NetworkProtocolNumber, newPkt) return nil diff --git a/pkg/tcpip/link/pipe/pipe.go b/pkg/tcpip/link/pipe/pipe.go index c67ca98ea..c2a888054 100644 --- a/pkg/tcpip/link/pipe/pipe.go +++ b/pkg/tcpip/link/pipe/pipe.go @@ -59,9 +59,11 @@ func (e *Endpoint) deliverPackets(r stack.RouteInfo, proto tcpip.NetworkProtocol // avoid a deadlock when a packet triggers a response which leads the stack to // try and take a lock it already holds. for pkt := pkts.Front(); pkt != nil; pkt = pkt.Next() { - e.linked.dispatcher.DeliverNetworkPacket(r.LocalLinkAddress /* remote */, r.RemoteLinkAddress /* local */, proto, stack.NewPacketBuffer(stack.PacketBufferOptions{ + newPkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ Data: buffer.NewVectorisedView(pkt.Size(), pkt.Views()), - })) + }) + e.linked.dispatcher.DeliverNetworkPacket(r.LocalLinkAddress /* remote */, r.RemoteLinkAddress /* local */, proto, newPkt) + newPkt.DecRef() } } diff --git a/pkg/tcpip/link/qdisc/fifo/endpoint.go b/pkg/tcpip/link/qdisc/fifo/endpoint.go index c15cbf81b..a68b274b2 100644 --- a/pkg/tcpip/link/qdisc/fifo/endpoint.go +++ b/pkg/tcpip/link/qdisc/fifo/endpoint.go @@ -94,9 +94,7 @@ func (q *queueDispatcher) dispatchLoop() { // We pass a protocol of zero here because each packet carries its // NetworkProtocol. q.lower.WritePackets(stack.RouteInfo{}, batch, 0 /* protocol */) - for pkt := batch.Front(); pkt != nil; pkt = pkt.Next() { - batch.Remove(pkt) - } + batch.DecRef() batch.Reset() } } diff --git a/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go b/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go index eb5abb906..09e5b8314 100644 --- a/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go +++ b/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go @@ -54,13 +54,13 @@ func (q *packetBufferQueue) setLimit(limit int) { // enqueue adds the given packet to the queue. // // Returns true when the PacketBuffer is successfully added to the queue, in -// which case ownership of the reference is transferred to the queue. And -// returns false if the queue is full, in which case ownership is retained by -// the caller. +// which case the queue acquires a reference to the PacketBuffer, and +// returns false if the queue is full. func (q *packetBufferQueue) enqueue(s *stack.PacketBuffer) bool { q.mu.Lock() r := q.used < q.limit if r { + s.IncRef() q.list.PushBack(s) q.used++ } @@ -70,7 +70,7 @@ func (q *packetBufferQueue) enqueue(s *stack.PacketBuffer) bool { } // dequeue removes and returns the next PacketBuffer from queue, if one exists. -// Ownership is transferred to the caller. +// Caller is responsible for calling DecRef on the PacketBuffer. func (q *packetBufferQueue) dequeue() *stack.PacketBuffer { q.mu.Lock() s := q.list.Front() diff --git a/pkg/tcpip/link/sharedmem/sharedmem.go b/pkg/tcpip/link/sharedmem/sharedmem.go index b75522a51..8797d1bb9 100644 --- a/pkg/tcpip/link/sharedmem/sharedmem.go +++ b/pkg/tcpip/link/sharedmem/sharedmem.go @@ -413,6 +413,7 @@ func (e *endpoint) dispatchLoop(d stack.NetworkDispatcher) { pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ Data: buffer.View(b).ToVectorisedView(), }) + defer pkt.DecRef() var src, dst tcpip.LinkAddress var proto tcpip.NetworkProtocolNumber diff --git a/pkg/tcpip/link/sharedmem/sharedmem_server.go b/pkg/tcpip/link/sharedmem/sharedmem_server.go index c39eca33f..00c8a6a3b 100644 --- a/pkg/tcpip/link/sharedmem/sharedmem_server.go +++ b/pkg/tcpip/link/sharedmem/sharedmem_server.go @@ -311,6 +311,7 @@ func (e *serverEndpoint) dispatchLoop(d stack.NetworkDispatcher) { if e.addr != "" { hdr, ok := pkt.LinkHeader().Consume(header.EthernetMinimumSize) if !ok { + pkt.DecRef() continue } eth := header.Ethernet(hdr) @@ -323,6 +324,7 @@ func (e *serverEndpoint) dispatchLoop(d stack.NetworkDispatcher) { // IP version information is at the first octet, so pulling up 1 byte. h, ok := pkt.Data().PullUp(1) if !ok { + pkt.DecRef() continue } switch header.IPVersion(h) { @@ -331,11 +333,13 @@ func (e *serverEndpoint) dispatchLoop(d stack.NetworkDispatcher) { case header.IPv6Version: proto = header.IPv6ProtocolNumber default: + pkt.DecRef() continue } } // Send packet up the stack. d.DeliverNetworkPacket(src, dst, proto, pkt) + pkt.DecRef() } e.mu.Lock() diff --git a/pkg/tcpip/link/sniffer/sniffer.go b/pkg/tcpip/link/sniffer/sniffer.go index 2afa95af0..965cc994f 100644 --- a/pkg/tcpip/link/sniffer/sniffer.go +++ b/pkg/tcpip/link/sniffer/sniffer.go @@ -209,6 +209,7 @@ func logPacket(prefix string, dir direction, protocol tcpip.NetworkProtocolNumbe vv := buffer.NewVectorisedView(pkt.Size(), pkt.Views()) vv.TrimFront(len(pkt.LinkHeader().View())) pkt = stack.NewPacketBuffer(stack.PacketBufferOptions{Data: vv}) + defer pkt.DecRef() switch protocol { case header.IPv4ProtocolNumber: if ok := parse.IPv4(pkt); !ok { diff --git a/pkg/tcpip/link/tun/device.go b/pkg/tcpip/link/tun/device.go index fa2131c28..5230ac281 100644 --- a/pkg/tcpip/link/tun/device.go +++ b/pkg/tcpip/link/tun/device.go @@ -76,6 +76,7 @@ func (d *Device) Release(ctx context.Context) { // Decrease refcount if there is an endpoint associated with this file. if d.endpoint != nil { + d.endpoint.Drain() d.endpoint.RemoveNotify(d.notifyHandle) d.endpoint.DecRef(ctx) d.endpoint = nil @@ -231,6 +232,7 @@ func (d *Device) Write(data []byte) (int64, error) { ReserveHeaderBytes: len(ethHdr), Data: buffer.View(data).ToVectorisedView(), }) + defer pkt.DecRef() copy(pkt.LinkHeader().Push(len(ethHdr)), ethHdr) endpoint.InjectLinkAddr(protocol, remote, pkt) return dataLen, nil |