summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link/channel
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/link/channel')
-rw-r--r--pkg/tcpip/link/channel/BUILD6
-rw-r--r--pkg/tcpip/link/channel/channel.go234
2 files changed, 178 insertions, 62 deletions
diff --git a/pkg/tcpip/link/channel/BUILD b/pkg/tcpip/link/channel/BUILD
index 97a794986..b8b93e78e 100644
--- a/pkg/tcpip/link/channel/BUILD
+++ b/pkg/tcpip/link/channel/BUILD
@@ -1,13 +1,13 @@
-load("//tools/go_stateify:defs.bzl", "go_library")
+load("//tools:defs.bzl", "go_library")
package(licenses = ["notice"])
go_library(
name = "channel",
srcs = ["channel.go"],
- importpath = "gvisor.dev/gvisor/pkg/tcpip/link/channel",
- visibility = ["//:sandbox"],
+ visibility = ["//visibility:public"],
deps = [
+ "//pkg/sync",
"//pkg/tcpip",
"//pkg/tcpip/buffer",
"//pkg/tcpip/stack",
diff --git a/pkg/tcpip/link/channel/channel.go b/pkg/tcpip/link/channel/channel.go
index 14f197a77..9bf67686d 100644
--- a/pkg/tcpip/link/channel/channel.go
+++ b/pkg/tcpip/link/channel/channel.go
@@ -18,6 +18,9 @@
package channel
import (
+ "context"
+
+ "gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
"gvisor.dev/gvisor/pkg/tcpip/stack"
@@ -25,54 +28,166 @@ import (
// PacketInfo holds all the information about an outbound packet.
type PacketInfo struct {
- Header buffer.View
- Payload buffer.View
- Proto tcpip.NetworkProtocolNumber
- GSO *stack.GSO
+ Pkt *stack.PacketBuffer
+ Proto tcpip.NetworkProtocolNumber
+ GSO *stack.GSO
+ Route stack.Route
+}
+
+// Notification is the interface for receiving notification from the packet
+// queue.
+type Notification interface {
+ // WriteNotify will be called when a write happens to the queue.
+ WriteNotify()
+}
+
+// NotificationHandle is an opaque handle to the registered notification target.
+// It can be used to unregister the notification when no longer interested.
+//
+// +stateify savable
+type NotificationHandle struct {
+ n Notification
+}
+
+type queue struct {
+ // c is the outbound packet channel.
+ c chan PacketInfo
+ // mu protects fields below.
+ mu sync.RWMutex
+ notify []*NotificationHandle
+}
+
+func (q *queue) Close() {
+ close(q.c)
+}
+
+func (q *queue) Read() (PacketInfo, bool) {
+ select {
+ case p := <-q.c:
+ return p, true
+ default:
+ return PacketInfo{}, false
+ }
+}
+
+func (q *queue) ReadContext(ctx context.Context) (PacketInfo, bool) {
+ select {
+ case pkt := <-q.c:
+ return pkt, true
+ case <-ctx.Done():
+ return PacketInfo{}, false
+ }
+}
+
+func (q *queue) Write(p PacketInfo) bool {
+ wrote := false
+ select {
+ case q.c <- p:
+ wrote = true
+ default:
+ }
+ q.mu.Lock()
+ notify := q.notify
+ q.mu.Unlock()
+
+ if wrote {
+ // Send notification outside of lock.
+ for _, h := range notify {
+ h.n.WriteNotify()
+ }
+ }
+ return wrote
+}
+
+func (q *queue) Num() int {
+ return len(q.c)
+}
+
+func (q *queue) AddNotify(notify Notification) *NotificationHandle {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ h := &NotificationHandle{n: notify}
+ q.notify = append(q.notify, h)
+ return h
+}
+
+func (q *queue) RemoveNotify(handle *NotificationHandle) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ // Make a copy, since we reads the array outside of lock when notifying.
+ notify := make([]*NotificationHandle, 0, len(q.notify))
+ for _, h := range q.notify {
+ if h != handle {
+ notify = append(notify, h)
+ }
+ }
+ q.notify = notify
}
// Endpoint is link layer endpoint that stores outbound packets in a channel
// and allows injection of inbound packets.
type Endpoint struct {
- dispatcher stack.NetworkDispatcher
- mtu uint32
- linkAddr tcpip.LinkAddress
- GSO bool
+ dispatcher stack.NetworkDispatcher
+ mtu uint32
+ linkAddr tcpip.LinkAddress
+ LinkEPCapabilities stack.LinkEndpointCapabilities
- // C is where outbound packets are queued.
- C chan PacketInfo
+ // Outbound packet queue.
+ q *queue
}
// New creates a new channel endpoint.
func New(size int, mtu uint32, linkAddr tcpip.LinkAddress) *Endpoint {
return &Endpoint{
- C: make(chan PacketInfo, size),
+ q: &queue{
+ c: make(chan PacketInfo, size),
+ },
mtu: mtu,
linkAddr: linkAddr,
}
}
+// Close closes e. Further packet injections will panic. Reads continue to
+// succeed until all packets are read.
+func (e *Endpoint) Close() {
+ e.q.Close()
+}
+
+// Read does non-blocking read one packet from the outbound packet queue.
+func (e *Endpoint) Read() (PacketInfo, bool) {
+ return e.q.Read()
+}
+
+// ReadContext does blocking read for one packet from the outbound packet queue.
+// It can be cancelled by ctx, and in this case, it returns false.
+func (e *Endpoint) ReadContext(ctx context.Context) (PacketInfo, bool) {
+ return e.q.ReadContext(ctx)
+}
+
// Drain removes all outbound packets from the channel and counts them.
func (e *Endpoint) Drain() int {
c := 0
for {
- select {
- case <-e.C:
- c++
- default:
+ if _, ok := e.Read(); !ok {
return c
}
+ c++
}
}
-// Inject injects an inbound packet.
-func (e *Endpoint) Inject(protocol tcpip.NetworkProtocolNumber, vv buffer.VectorisedView) {
- e.InjectLinkAddr(protocol, "", vv)
+// NumQueued returns the number of packet queued for outbound.
+func (e *Endpoint) NumQueued() int {
+ return e.q.Num()
+}
+
+// InjectInbound injects an inbound packet.
+func (e *Endpoint) InjectInbound(protocol tcpip.NetworkProtocolNumber, pkt stack.PacketBuffer) {
+ e.InjectLinkAddr(protocol, "", pkt)
}
// InjectLinkAddr injects an inbound packet with a remote link address.
-func (e *Endpoint) InjectLinkAddr(protocol tcpip.NetworkProtocolNumber, remote tcpip.LinkAddress, vv buffer.VectorisedView) {
- e.dispatcher.DeliverNetworkPacket(e, remote, "" /* local */, protocol, vv.Clone(nil), nil /* linkHeader */)
+func (e *Endpoint) InjectLinkAddr(protocol tcpip.NetworkProtocolNumber, remote tcpip.LinkAddress, pkt stack.PacketBuffer) {
+ e.dispatcher.DeliverNetworkPacket(e, remote, "" /* local */, protocol, pkt)
}
// Attach saves the stack network-layer dispatcher for use later when packets
@@ -94,11 +209,7 @@ func (e *Endpoint) MTU() uint32 {
// Capabilities implements stack.LinkEndpoint.Capabilities.
func (e *Endpoint) Capabilities() stack.LinkEndpointCapabilities {
- caps := stack.LinkEndpointCapabilities(0)
- if e.GSO {
- caps |= stack.CapabilityHardwareGSO
- }
- return caps
+ return e.LinkEPCapabilities
}
// GSOMaxSize returns the maximum GSO packet size.
@@ -118,65 +229,70 @@ func (e *Endpoint) LinkAddress() tcpip.LinkAddress {
}
// WritePacket stores outbound packets into the channel.
-func (e *Endpoint) WritePacket(_ *stack.Route, gso *stack.GSO, hdr buffer.Prependable, payload buffer.VectorisedView, protocol tcpip.NetworkProtocolNumber) *tcpip.Error {
+func (e *Endpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt stack.PacketBuffer) *tcpip.Error {
+ // Clone r then release its resource so we only get the relevant fields from
+ // stack.Route without holding a reference to a NIC's endpoint.
+ route := r.Clone()
+ route.Release()
p := PacketInfo{
- Header: hdr.View(),
- Proto: protocol,
- Payload: payload.ToView(),
- GSO: gso,
+ Pkt: &pkt,
+ Proto: protocol,
+ GSO: gso,
+ Route: route,
}
- select {
- case e.C <- p:
- default:
- }
+ e.q.Write(p)
return nil
}
// WritePackets stores outbound packets into the channel.
-func (e *Endpoint) WritePackets(_ *stack.Route, gso *stack.GSO, hdrs []stack.PacketDescriptor, payload buffer.VectorisedView, protocol tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
- payloadView := payload.ToView()
+func (e *Endpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.PacketBufferList, protocol tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ // Clone r then release its resource so we only get the relevant fields from
+ // stack.Route without holding a reference to a NIC's endpoint.
+ route := r.Clone()
+ route.Release()
n := 0
-packetLoop:
- for i := range hdrs {
- hdr := &hdrs[i].Hdr
- off := hdrs[i].Off
- size := hdrs[i].Size
+ for pkt := pkts.Front(); pkt != nil; pkt = pkt.Next() {
p := PacketInfo{
- Header: hdr.View(),
- Proto: protocol,
- Payload: buffer.NewViewFromBytes(payloadView[off : off+size]),
- GSO: gso,
+ Pkt: pkt,
+ Proto: protocol,
+ GSO: gso,
+ Route: route,
}
- select {
- case e.C <- p:
- n++
- default:
- break packetLoop
+ if !e.q.Write(p) {
+ break
}
+ n++
}
return n, nil
}
// WriteRawPacket implements stack.LinkEndpoint.WriteRawPacket.
-func (e *Endpoint) WriteRawPacket(packet buffer.VectorisedView) *tcpip.Error {
+func (e *Endpoint) WriteRawPacket(vv buffer.VectorisedView) *tcpip.Error {
p := PacketInfo{
- Header: packet.ToView(),
- Proto: 0,
- Payload: buffer.View{},
- GSO: nil,
+ Pkt: &stack.PacketBuffer{Data: vv},
+ Proto: 0,
+ GSO: nil,
}
- select {
- case e.C <- p:
- default:
- }
+ e.q.Write(p)
return nil
}
// Wait implements stack.LinkEndpoint.Wait.
func (*Endpoint) Wait() {}
+
+// AddNotify adds a notification target for receiving event about outgoing
+// packets.
+func (e *Endpoint) AddNotify(notify Notification) *NotificationHandle {
+ return e.q.AddNotify(notify)
+}
+
+// RemoveNotify removes handle from the list of notification targets.
+func (e *Endpoint) RemoveNotify(handle *NotificationHandle) {
+ e.q.RemoveNotify(handle)
+}