summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/link')
-rw-r--r--pkg/tcpip/link/channel/BUILD15
-rw-r--r--pkg/tcpip/link/channel/channel.go298
-rw-r--r--pkg/tcpip/link/fdbased/BUILD40
-rw-r--r--pkg/tcpip/link/fdbased/endpoint.go657
-rw-r--r--pkg/tcpip/link/fdbased/endpoint_test.go502
-rw-r--r--pkg/tcpip/link/fdbased/endpoint_unsafe.go23
-rw-r--r--pkg/tcpip/link/fdbased/mmap.go199
-rw-r--r--pkg/tcpip/link/fdbased/mmap_stub.go23
-rw-r--r--pkg/tcpip/link/fdbased/mmap_unsafe.go84
-rw-r--r--pkg/tcpip/link/fdbased/packet_dispatchers.go317
-rw-r--r--pkg/tcpip/link/loopback/BUILD15
-rw-r--r--pkg/tcpip/link/loopback/loopback.go115
-rw-r--r--pkg/tcpip/link/muxed/BUILD28
-rw-r--r--pkg/tcpip/link/muxed/injectable.go137
-rw-r--r--pkg/tcpip/link/muxed/injectable_test.go98
-rw-r--r--pkg/tcpip/link/nested/BUILD31
-rw-r--r--pkg/tcpip/link/nested/nested.go131
-rw-r--r--pkg/tcpip/link/nested/nested_test.go105
-rw-r--r--pkg/tcpip/link/qdisc/fifo/BUILD19
-rw-r--r--pkg/tcpip/link/qdisc/fifo/endpoint.go209
-rw-r--r--pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go84
-rw-r--r--pkg/tcpip/link/rawfile/BUILD20
-rw-r--r--pkg/tcpip/link/rawfile/blockingpoll_amd64.s41
-rw-r--r--pkg/tcpip/link/rawfile/blockingpoll_arm64.s42
-rw-r--r--pkg/tcpip/link/rawfile/blockingpoll_noyield_unsafe.go31
-rw-r--r--pkg/tcpip/link/rawfile/blockingpoll_yield_unsafe.go66
-rw-r--r--pkg/tcpip/link/rawfile/errors.go70
-rw-r--r--pkg/tcpip/link/rawfile/rawfile_unsafe.go192
-rw-r--r--pkg/tcpip/link/sharedmem/BUILD41
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/BUILD23
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/pipe.go78
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/pipe_test.go518
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go35
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/rx.go93
-rw-r--r--pkg/tcpip/link/sharedmem/pipe/tx.go161
-rw-r--r--pkg/tcpip/link/sharedmem/queue/BUILD27
-rw-r--r--pkg/tcpip/link/sharedmem/queue/queue_test.go517
-rw-r--r--pkg/tcpip/link/sharedmem/queue/rx.go221
-rw-r--r--pkg/tcpip/link/sharedmem/queue/tx.go151
-rw-r--r--pkg/tcpip/link/sharedmem/rx.go159
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem.go289
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem_test.go812
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem_unsafe.go25
-rw-r--r--pkg/tcpip/link/sharedmem/tx.go272
-rw-r--r--pkg/tcpip/link/sniffer/BUILD20
-rw-r--r--pkg/tcpip/link/sniffer/pcap.go66
-rw-r--r--pkg/tcpip/link/sniffer/sniffer.go394
-rw-r--r--pkg/tcpip/link/tun/BUILD25
-rw-r--r--pkg/tcpip/link/tun/device.go358
-rw-r--r--pkg/tcpip/link/tun/protocol.go56
-rw-r--r--pkg/tcpip/link/tun/tun_unsafe.go63
-rw-r--r--pkg/tcpip/link/waitable/BUILD30
-rw-r--r--pkg/tcpip/link/waitable/waitable.go149
-rw-r--r--pkg/tcpip/link/waitable/waitable_test.go173
54 files changed, 8348 insertions, 0 deletions
diff --git a/pkg/tcpip/link/channel/BUILD b/pkg/tcpip/link/channel/BUILD
new file mode 100644
index 000000000..b8b93e78e
--- /dev/null
+++ b/pkg/tcpip/link/channel/BUILD
@@ -0,0 +1,15 @@
+load("//tools:defs.bzl", "go_library")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "channel",
+ srcs = ["channel.go"],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/sync",
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/stack",
+ ],
+)
diff --git a/pkg/tcpip/link/channel/channel.go b/pkg/tcpip/link/channel/channel.go
new file mode 100644
index 000000000..20b183da0
--- /dev/null
+++ b/pkg/tcpip/link/channel/channel.go
@@ -0,0 +1,298 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package channel provides the implemention of channel-based data-link layer
+// endpoints. Such endpoints allow injection of inbound packets and store
+// outbound packets in a channel.
+package channel
+
+import (
+ "context"
+
+ "gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// PacketInfo holds all the information about an outbound packet.
+type PacketInfo struct {
+ Pkt *stack.PacketBuffer
+ Proto tcpip.NetworkProtocolNumber
+ GSO *stack.GSO
+ Route stack.Route
+}
+
+// Notification is the interface for receiving notification from the packet
+// queue.
+type Notification interface {
+ // WriteNotify will be called when a write happens to the queue.
+ WriteNotify()
+}
+
+// NotificationHandle is an opaque handle to the registered notification target.
+// It can be used to unregister the notification when no longer interested.
+//
+// +stateify savable
+type NotificationHandle struct {
+ n Notification
+}
+
+type queue struct {
+ // c is the outbound packet channel.
+ c chan PacketInfo
+ // mu protects fields below.
+ mu sync.RWMutex
+ notify []*NotificationHandle
+}
+
+func (q *queue) Close() {
+ close(q.c)
+}
+
+func (q *queue) Read() (PacketInfo, bool) {
+ select {
+ case p := <-q.c:
+ return p, true
+ default:
+ return PacketInfo{}, false
+ }
+}
+
+func (q *queue) ReadContext(ctx context.Context) (PacketInfo, bool) {
+ select {
+ case pkt := <-q.c:
+ return pkt, true
+ case <-ctx.Done():
+ return PacketInfo{}, false
+ }
+}
+
+func (q *queue) Write(p PacketInfo) bool {
+ wrote := false
+ select {
+ case q.c <- p:
+ wrote = true
+ default:
+ }
+ q.mu.Lock()
+ notify := q.notify
+ q.mu.Unlock()
+
+ if wrote {
+ // Send notification outside of lock.
+ for _, h := range notify {
+ h.n.WriteNotify()
+ }
+ }
+ return wrote
+}
+
+func (q *queue) Num() int {
+ return len(q.c)
+}
+
+func (q *queue) AddNotify(notify Notification) *NotificationHandle {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ h := &NotificationHandle{n: notify}
+ q.notify = append(q.notify, h)
+ return h
+}
+
+func (q *queue) RemoveNotify(handle *NotificationHandle) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ // Make a copy, since we reads the array outside of lock when notifying.
+ notify := make([]*NotificationHandle, 0, len(q.notify))
+ for _, h := range q.notify {
+ if h != handle {
+ notify = append(notify, h)
+ }
+ }
+ q.notify = notify
+}
+
+// Endpoint is link layer endpoint that stores outbound packets in a channel
+// and allows injection of inbound packets.
+type Endpoint struct {
+ dispatcher stack.NetworkDispatcher
+ mtu uint32
+ linkAddr tcpip.LinkAddress
+ LinkEPCapabilities stack.LinkEndpointCapabilities
+
+ // Outbound packet queue.
+ q *queue
+}
+
+// New creates a new channel endpoint.
+func New(size int, mtu uint32, linkAddr tcpip.LinkAddress) *Endpoint {
+ return &Endpoint{
+ q: &queue{
+ c: make(chan PacketInfo, size),
+ },
+ mtu: mtu,
+ linkAddr: linkAddr,
+ }
+}
+
+// Close closes e. Further packet injections will panic. Reads continue to
+// succeed until all packets are read.
+func (e *Endpoint) Close() {
+ e.q.Close()
+}
+
+// Read does non-blocking read one packet from the outbound packet queue.
+func (e *Endpoint) Read() (PacketInfo, bool) {
+ return e.q.Read()
+}
+
+// ReadContext does blocking read for one packet from the outbound packet queue.
+// It can be cancelled by ctx, and in this case, it returns false.
+func (e *Endpoint) ReadContext(ctx context.Context) (PacketInfo, bool) {
+ return e.q.ReadContext(ctx)
+}
+
+// Drain removes all outbound packets from the channel and counts them.
+func (e *Endpoint) Drain() int {
+ c := 0
+ for {
+ if _, ok := e.Read(); !ok {
+ return c
+ }
+ c++
+ }
+}
+
+// NumQueued returns the number of packet queued for outbound.
+func (e *Endpoint) NumQueued() int {
+ return e.q.Num()
+}
+
+// InjectInbound injects an inbound packet.
+func (e *Endpoint) InjectInbound(protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
+ e.InjectLinkAddr(protocol, "", pkt)
+}
+
+// InjectLinkAddr injects an inbound packet with a remote link address.
+func (e *Endpoint) InjectLinkAddr(protocol tcpip.NetworkProtocolNumber, remote tcpip.LinkAddress, pkt *stack.PacketBuffer) {
+ e.dispatcher.DeliverNetworkPacket(remote, "" /* local */, protocol, pkt)
+}
+
+// Attach saves the stack network-layer dispatcher for use later when packets
+// are injected.
+func (e *Endpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ e.dispatcher = dispatcher
+}
+
+// IsAttached implements stack.LinkEndpoint.IsAttached.
+func (e *Endpoint) IsAttached() bool {
+ return e.dispatcher != nil
+}
+
+// MTU implements stack.LinkEndpoint.MTU. It returns the value initialized
+// during construction.
+func (e *Endpoint) MTU() uint32 {
+ return e.mtu
+}
+
+// Capabilities implements stack.LinkEndpoint.Capabilities.
+func (e *Endpoint) Capabilities() stack.LinkEndpointCapabilities {
+ return e.LinkEPCapabilities
+}
+
+// GSOMaxSize returns the maximum GSO packet size.
+func (*Endpoint) GSOMaxSize() uint32 {
+ return 1 << 15
+}
+
+// MaxHeaderLength returns the maximum size of the link layer header. Given it
+// doesn't have a header, it just returns 0.
+func (*Endpoint) MaxHeaderLength() uint16 {
+ return 0
+}
+
+// LinkAddress returns the link address of this endpoint.
+func (e *Endpoint) LinkAddress() tcpip.LinkAddress {
+ return e.linkAddr
+}
+
+// WritePacket stores outbound packets into the channel.
+func (e *Endpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) *tcpip.Error {
+ // Clone r then release its resource so we only get the relevant fields from
+ // stack.Route without holding a reference to a NIC's endpoint.
+ route := r.Clone()
+ route.Release()
+ p := PacketInfo{
+ Pkt: pkt,
+ Proto: protocol,
+ GSO: gso,
+ Route: route,
+ }
+
+ e.q.Write(p)
+
+ return nil
+}
+
+// WritePackets stores outbound packets into the channel.
+func (e *Endpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.PacketBufferList, protocol tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ // Clone r then release its resource so we only get the relevant fields from
+ // stack.Route without holding a reference to a NIC's endpoint.
+ route := r.Clone()
+ route.Release()
+ n := 0
+ for pkt := pkts.Front(); pkt != nil; pkt = pkt.Next() {
+ p := PacketInfo{
+ Pkt: pkt,
+ Proto: protocol,
+ GSO: gso,
+ Route: route,
+ }
+
+ if !e.q.Write(p) {
+ break
+ }
+ n++
+ }
+
+ return n, nil
+}
+
+// WriteRawPacket implements stack.LinkEndpoint.WriteRawPacket.
+func (e *Endpoint) WriteRawPacket(vv buffer.VectorisedView) *tcpip.Error {
+ p := PacketInfo{
+ Pkt: &stack.PacketBuffer{Data: vv},
+ Proto: 0,
+ GSO: nil,
+ }
+
+ e.q.Write(p)
+
+ return nil
+}
+
+// Wait implements stack.LinkEndpoint.Wait.
+func (*Endpoint) Wait() {}
+
+// AddNotify adds a notification target for receiving event about outgoing
+// packets.
+func (e *Endpoint) AddNotify(notify Notification) *NotificationHandle {
+ return e.q.AddNotify(notify)
+}
+
+// RemoveNotify removes handle from the list of notification targets.
+func (e *Endpoint) RemoveNotify(handle *NotificationHandle) {
+ e.q.RemoveNotify(handle)
+}
diff --git a/pkg/tcpip/link/fdbased/BUILD b/pkg/tcpip/link/fdbased/BUILD
new file mode 100644
index 000000000..aa6db9aea
--- /dev/null
+++ b/pkg/tcpip/link/fdbased/BUILD
@@ -0,0 +1,40 @@
+load("//tools:defs.bzl", "go_library", "go_test")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "fdbased",
+ srcs = [
+ "endpoint.go",
+ "endpoint_unsafe.go",
+ "mmap.go",
+ "mmap_stub.go",
+ "mmap_unsafe.go",
+ "packet_dispatchers.go",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/binary",
+ "//pkg/sync",
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/header",
+ "//pkg/tcpip/link/rawfile",
+ "//pkg/tcpip/stack",
+ "@org_golang_x_sys//unix:go_default_library",
+ ],
+)
+
+go_test(
+ name = "fdbased_test",
+ size = "small",
+ srcs = ["endpoint_test.go"],
+ library = ":fdbased",
+ deps = [
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/header",
+ "//pkg/tcpip/link/rawfile",
+ "//pkg/tcpip/stack",
+ ],
+)
diff --git a/pkg/tcpip/link/fdbased/endpoint.go b/pkg/tcpip/link/fdbased/endpoint.go
new file mode 100644
index 000000000..f34082e1a
--- /dev/null
+++ b/pkg/tcpip/link/fdbased/endpoint.go
@@ -0,0 +1,657 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux
+
+// Package fdbased provides the implemention of data-link layer endpoints
+// backed by boundary-preserving file descriptors (e.g., TUN devices,
+// seqpacket/datagram sockets).
+//
+// FD based endpoints can be used in the networking stack by calling New() to
+// create a new endpoint, and then passing it as an argument to
+// Stack.CreateNIC().
+//
+// FD based endpoints can use more than one file descriptor to read incoming
+// packets. If there are more than one FDs specified and the underlying FD is an
+// AF_PACKET then the endpoint will enable FANOUT mode on the socket so that the
+// host kernel will consistently hash the packets to the sockets. This ensures
+// that packets for the same TCP streams are not reordered.
+//
+// Similarly if more than one FD's are specified where the underlying FD is not
+// AF_PACKET then it's the caller's responsibility to ensure that all inbound
+// packets on the descriptors are consistently 5 tuple hashed to one of the
+// descriptors to prevent TCP reordering.
+//
+// Since netstack today does not compute 5 tuple hashes for outgoing packets we
+// only use the first FD to write outbound packets. Once 5 tuple hashes for
+// all outbound packets are available we will make use of all underlying FD's to
+// write outbound packets.
+package fdbased
+
+import (
+ "fmt"
+ "syscall"
+
+ "golang.org/x/sys/unix"
+ "gvisor.dev/gvisor/pkg/binary"
+ "gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/header"
+ "gvisor.dev/gvisor/pkg/tcpip/link/rawfile"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// linkDispatcher reads packets from the link FD and dispatches them to the
+// NetworkDispatcher.
+type linkDispatcher interface {
+ dispatch() (bool, *tcpip.Error)
+}
+
+// PacketDispatchMode are the various supported methods of receiving and
+// dispatching packets from the underlying FD.
+type PacketDispatchMode int
+
+const (
+ // Readv is the default dispatch mode and is the least performant of the
+ // dispatch options but the one that is supported by all underlying FD
+ // types.
+ Readv PacketDispatchMode = iota
+ // RecvMMsg enables use of recvmmsg() syscall instead of readv() to
+ // read inbound packets. This reduces # of syscalls needed to process
+ // packets.
+ //
+ // NOTE: recvmmsg() is only supported for sockets, so if the underlying
+ // FD is not a socket then the code will still fall back to the readv()
+ // path.
+ RecvMMsg
+ // PacketMMap enables use of PACKET_RX_RING to receive packets from the
+ // NIC. PacketMMap requires that the underlying FD be an AF_PACKET. The
+ // primary use-case for this is runsc which uses an AF_PACKET FD to
+ // receive packets from the veth device.
+ PacketMMap
+)
+
+func (p PacketDispatchMode) String() string {
+ switch p {
+ case Readv:
+ return "Readv"
+ case RecvMMsg:
+ return "RecvMMsg"
+ case PacketMMap:
+ return "PacketMMap"
+ default:
+ return fmt.Sprintf("unknown packet dispatch mode '%d'", p)
+ }
+}
+
+type endpoint struct {
+ // fds is the set of file descriptors each identifying one inbound/outbound
+ // channel. The endpoint will dispatch from all inbound channels as well as
+ // hash outbound packets to specific channels based on the packet hash.
+ fds []int
+
+ // mtu (maximum transmission unit) is the maximum size of a packet.
+ mtu uint32
+
+ // hdrSize specifies the link-layer header size. If set to 0, no header
+ // is added/removed; otherwise an ethernet header is used.
+ hdrSize int
+
+ // addr is the address of the endpoint.
+ addr tcpip.LinkAddress
+
+ // caps holds the endpoint capabilities.
+ caps stack.LinkEndpointCapabilities
+
+ // closed is a function to be called when the FD's peer (if any) closes
+ // its end of the communication pipe.
+ closed func(*tcpip.Error)
+
+ inboundDispatchers []linkDispatcher
+ dispatcher stack.NetworkDispatcher
+
+ // packetDispatchMode controls the packet dispatcher used by this
+ // endpoint.
+ packetDispatchMode PacketDispatchMode
+
+ // gsoMaxSize is the maximum GSO packet size. It is zero if GSO is
+ // disabled.
+ gsoMaxSize uint32
+
+ // wg keeps track of running goroutines.
+ wg sync.WaitGroup
+}
+
+// Options specify the details about the fd-based endpoint to be created.
+type Options struct {
+ // FDs is a set of FDs used to read/write packets.
+ FDs []int
+
+ // MTU is the mtu to use for this endpoint.
+ MTU uint32
+
+ // EthernetHeader if true, indicates that the endpoint should read/write
+ // ethernet frames instead of IP packets.
+ EthernetHeader bool
+
+ // ClosedFunc is a function to be called when an endpoint's peer (if
+ // any) closes its end of the communication pipe.
+ ClosedFunc func(*tcpip.Error)
+
+ // Address is the link address for this endpoint. Only used if
+ // EthernetHeader is true.
+ Address tcpip.LinkAddress
+
+ // SaveRestore if true, indicates that this NIC capability set should
+ // include CapabilitySaveRestore
+ SaveRestore bool
+
+ // DisconnectOk if true, indicates that this NIC capability set should
+ // include CapabilityDisconnectOk.
+ DisconnectOk bool
+
+ // GSOMaxSize is the maximum GSO packet size. It is zero if GSO is
+ // disabled.
+ GSOMaxSize uint32
+
+ // SoftwareGSOEnabled indicates whether software GSO is enabled or not.
+ SoftwareGSOEnabled bool
+
+ // PacketDispatchMode specifies the type of inbound dispatcher to be
+ // used for this endpoint.
+ PacketDispatchMode PacketDispatchMode
+
+ // TXChecksumOffload if true, indicates that this endpoints capability
+ // set should include CapabilityTXChecksumOffload.
+ TXChecksumOffload bool
+
+ // RXChecksumOffload if true, indicates that this endpoints capability
+ // set should include CapabilityRXChecksumOffload.
+ RXChecksumOffload bool
+}
+
+// fanoutID is used for AF_PACKET based endpoints to enable PACKET_FANOUT
+// support in the host kernel. This allows us to use multiple FD's to receive
+// from the same underlying NIC. The fanoutID needs to be the same for a given
+// set of FD's that point to the same NIC. Trying to set the PACKET_FANOUT
+// option for an FD with a fanoutID already in use by another FD for a different
+// NIC will return an EINVAL.
+var fanoutID = 1
+
+// New creates a new fd-based endpoint.
+//
+// Makes fd non-blocking, but does not take ownership of fd, which must remain
+// open for the lifetime of the returned endpoint (until after the endpoint has
+// stopped being using and Wait returns).
+func New(opts *Options) (stack.LinkEndpoint, error) {
+ caps := stack.LinkEndpointCapabilities(0)
+ if opts.RXChecksumOffload {
+ caps |= stack.CapabilityRXChecksumOffload
+ }
+
+ if opts.TXChecksumOffload {
+ caps |= stack.CapabilityTXChecksumOffload
+ }
+
+ hdrSize := 0
+ if opts.EthernetHeader {
+ hdrSize = header.EthernetMinimumSize
+ caps |= stack.CapabilityResolutionRequired
+ }
+
+ if opts.SaveRestore {
+ caps |= stack.CapabilitySaveRestore
+ }
+
+ if opts.DisconnectOk {
+ caps |= stack.CapabilityDisconnectOk
+ }
+
+ if len(opts.FDs) == 0 {
+ return nil, fmt.Errorf("opts.FD is empty, at least one FD must be specified")
+ }
+
+ e := &endpoint{
+ fds: opts.FDs,
+ mtu: opts.MTU,
+ caps: caps,
+ closed: opts.ClosedFunc,
+ addr: opts.Address,
+ hdrSize: hdrSize,
+ packetDispatchMode: opts.PacketDispatchMode,
+ }
+
+ // Create per channel dispatchers.
+ for i := 0; i < len(e.fds); i++ {
+ fd := e.fds[i]
+ if err := syscall.SetNonblock(fd, true); err != nil {
+ return nil, fmt.Errorf("syscall.SetNonblock(%v) failed: %v", fd, err)
+ }
+
+ isSocket, err := isSocketFD(fd)
+ if err != nil {
+ return nil, err
+ }
+ if isSocket {
+ if opts.GSOMaxSize != 0 {
+ if opts.SoftwareGSOEnabled {
+ e.caps |= stack.CapabilitySoftwareGSO
+ } else {
+ e.caps |= stack.CapabilityHardwareGSO
+ }
+ e.gsoMaxSize = opts.GSOMaxSize
+ }
+ }
+ inboundDispatcher, err := createInboundDispatcher(e, fd, isSocket)
+ if err != nil {
+ return nil, fmt.Errorf("createInboundDispatcher(...) = %v", err)
+ }
+ e.inboundDispatchers = append(e.inboundDispatchers, inboundDispatcher)
+ }
+
+ // Increment fanoutID to ensure that we don't re-use the same fanoutID for
+ // the next endpoint.
+ fanoutID++
+
+ return e, nil
+}
+
+func createInboundDispatcher(e *endpoint, fd int, isSocket bool) (linkDispatcher, error) {
+ // By default use the readv() dispatcher as it works with all kinds of
+ // FDs (tap/tun/unix domain sockets and af_packet).
+ inboundDispatcher, err := newReadVDispatcher(fd, e)
+ if err != nil {
+ return nil, fmt.Errorf("newReadVDispatcher(%d, %+v) = %v", fd, e, err)
+ }
+
+ if isSocket {
+ sa, err := unix.Getsockname(fd)
+ if err != nil {
+ return nil, fmt.Errorf("unix.Getsockname(%d) = %v", fd, err)
+ }
+ switch sa.(type) {
+ case *unix.SockaddrLinklayer:
+ // enable PACKET_FANOUT mode is the underlying socket is
+ // of type AF_PACKET.
+ const fanoutType = 0x8000 // PACKET_FANOUT_HASH | PACKET_FANOUT_FLAG_DEFRAG
+ fanoutArg := fanoutID | fanoutType<<16
+ if err := syscall.SetsockoptInt(fd, syscall.SOL_PACKET, unix.PACKET_FANOUT, fanoutArg); err != nil {
+ return nil, fmt.Errorf("failed to enable PACKET_FANOUT option: %v", err)
+ }
+ }
+
+ switch e.packetDispatchMode {
+ case PacketMMap:
+ inboundDispatcher, err = newPacketMMapDispatcher(fd, e)
+ if err != nil {
+ return nil, fmt.Errorf("newPacketMMapDispatcher(%d, %+v) = %v", fd, e, err)
+ }
+ case RecvMMsg:
+ // If the provided FD is a socket then we optimize
+ // packet reads by using recvmmsg() instead of read() to
+ // read packets in a batch.
+ inboundDispatcher, err = newRecvMMsgDispatcher(fd, e)
+ if err != nil {
+ return nil, fmt.Errorf("newRecvMMsgDispatcher(%d, %+v) = %v", fd, e, err)
+ }
+ }
+ }
+ return inboundDispatcher, nil
+}
+
+func isSocketFD(fd int) (bool, error) {
+ var stat syscall.Stat_t
+ if err := syscall.Fstat(fd, &stat); err != nil {
+ return false, fmt.Errorf("syscall.Fstat(%v,...) failed: %v", fd, err)
+ }
+ return (stat.Mode & syscall.S_IFSOCK) == syscall.S_IFSOCK, nil
+}
+
+// Attach launches the goroutine that reads packets from the file descriptor and
+// dispatches them via the provided dispatcher.
+func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ e.dispatcher = dispatcher
+ // Link endpoints are not savable. When transportation endpoints are
+ // saved, they stop sending outgoing packets and all incoming packets
+ // are rejected.
+ for i := range e.inboundDispatchers {
+ e.wg.Add(1)
+ go func(i int) { // S/R-SAFE: See above.
+ e.dispatchLoop(e.inboundDispatchers[i])
+ e.wg.Done()
+ }(i)
+ }
+}
+
+// IsAttached implements stack.LinkEndpoint.IsAttached.
+func (e *endpoint) IsAttached() bool {
+ return e.dispatcher != nil
+}
+
+// MTU implements stack.LinkEndpoint.MTU. It returns the value initialized
+// during construction.
+func (e *endpoint) MTU() uint32 {
+ return e.mtu
+}
+
+// Capabilities implements stack.LinkEndpoint.Capabilities.
+func (e *endpoint) Capabilities() stack.LinkEndpointCapabilities {
+ return e.caps
+}
+
+// MaxHeaderLength returns the maximum size of the link-layer header.
+func (e *endpoint) MaxHeaderLength() uint16 {
+ return uint16(e.hdrSize)
+}
+
+// LinkAddress returns the link address of this endpoint.
+func (e *endpoint) LinkAddress() tcpip.LinkAddress {
+ return e.addr
+}
+
+// Wait implements stack.LinkEndpoint.Wait. It waits for the endpoint to stop
+// reading from its FD.
+func (e *endpoint) Wait() {
+ e.wg.Wait()
+}
+
+// virtioNetHdr is declared in linux/virtio_net.h.
+type virtioNetHdr struct {
+ flags uint8
+ gsoType uint8
+ hdrLen uint16
+ gsoSize uint16
+ csumStart uint16
+ csumOffset uint16
+}
+
+// These constants are declared in linux/virtio_net.h.
+const (
+ _VIRTIO_NET_HDR_F_NEEDS_CSUM = 1
+
+ _VIRTIO_NET_HDR_GSO_TCPV4 = 1
+ _VIRTIO_NET_HDR_GSO_TCPV6 = 4
+)
+
+// WritePacket writes outbound packets to the file descriptor. If it is not
+// currently writable, the packet is dropped.
+func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) *tcpip.Error {
+ if e.hdrSize > 0 {
+ // Add ethernet header if needed.
+ eth := header.Ethernet(pkt.Header.Prepend(header.EthernetMinimumSize))
+ pkt.LinkHeader = buffer.View(eth)
+ ethHdr := &header.EthernetFields{
+ DstAddr: r.RemoteLinkAddress,
+ Type: protocol,
+ }
+
+ // Preserve the src address if it's set in the route.
+ if r.LocalLinkAddress != "" {
+ ethHdr.SrcAddr = r.LocalLinkAddress
+ } else {
+ ethHdr.SrcAddr = e.addr
+ }
+ eth.Encode(ethHdr)
+ }
+
+ fd := e.fds[pkt.Hash%uint32(len(e.fds))]
+ if e.Capabilities()&stack.CapabilityHardwareGSO != 0 {
+ vnetHdr := virtioNetHdr{}
+ if gso != nil {
+ vnetHdr.hdrLen = uint16(pkt.Header.UsedLength())
+ if gso.NeedsCsum {
+ vnetHdr.flags = _VIRTIO_NET_HDR_F_NEEDS_CSUM
+ vnetHdr.csumStart = header.EthernetMinimumSize + gso.L3HdrLen
+ vnetHdr.csumOffset = gso.CsumOffset
+ }
+ if gso.Type != stack.GSONone && uint16(pkt.Data.Size()) > gso.MSS {
+ switch gso.Type {
+ case stack.GSOTCPv4:
+ vnetHdr.gsoType = _VIRTIO_NET_HDR_GSO_TCPV4
+ case stack.GSOTCPv6:
+ vnetHdr.gsoType = _VIRTIO_NET_HDR_GSO_TCPV6
+ default:
+ panic(fmt.Sprintf("Unknown gso type: %v", gso.Type))
+ }
+ vnetHdr.gsoSize = gso.MSS
+ }
+ }
+
+ vnetHdrBuf := binary.Marshal(make([]byte, 0, virtioNetHdrSize), binary.LittleEndian, vnetHdr)
+ return rawfile.NonBlockingWrite3(fd, vnetHdrBuf, pkt.Header.View(), pkt.Data.ToView())
+ }
+
+ if pkt.Data.Size() == 0 {
+ return rawfile.NonBlockingWrite(fd, pkt.Header.View())
+ }
+ if pkt.Header.UsedLength() == 0 {
+ return rawfile.NonBlockingWrite(fd, pkt.Data.ToView())
+ }
+
+ return rawfile.NonBlockingWrite3(fd, pkt.Header.View(), pkt.Data.ToView(), nil)
+}
+
+func (e *endpoint) sendBatch(batchFD int, batch []*stack.PacketBuffer) (int, *tcpip.Error) {
+ // Send a batch of packets through batchFD.
+ mmsgHdrs := make([]rawfile.MMsgHdr, 0, len(batch))
+ for _, pkt := range batch {
+ var ethHdrBuf []byte
+ iovLen := 0
+ if e.hdrSize > 0 {
+ // Add ethernet header if needed.
+ ethHdrBuf = make([]byte, header.EthernetMinimumSize)
+ eth := header.Ethernet(ethHdrBuf)
+ ethHdr := &header.EthernetFields{
+ DstAddr: pkt.EgressRoute.RemoteLinkAddress,
+ Type: pkt.NetworkProtocolNumber,
+ }
+
+ // Preserve the src address if it's set in the route.
+ if pkt.EgressRoute.LocalLinkAddress != "" {
+ ethHdr.SrcAddr = pkt.EgressRoute.LocalLinkAddress
+ } else {
+ ethHdr.SrcAddr = e.addr
+ }
+ eth.Encode(ethHdr)
+ iovLen++
+ }
+
+ vnetHdr := virtioNetHdr{}
+ var vnetHdrBuf []byte
+ if e.Capabilities()&stack.CapabilityHardwareGSO != 0 {
+ if pkt.GSOOptions != nil {
+ vnetHdr.hdrLen = uint16(pkt.Header.UsedLength())
+ if pkt.GSOOptions.NeedsCsum {
+ vnetHdr.flags = _VIRTIO_NET_HDR_F_NEEDS_CSUM
+ vnetHdr.csumStart = header.EthernetMinimumSize + pkt.GSOOptions.L3HdrLen
+ vnetHdr.csumOffset = pkt.GSOOptions.CsumOffset
+ }
+ if pkt.GSOOptions.Type != stack.GSONone && uint16(pkt.Data.Size()) > pkt.GSOOptions.MSS {
+ switch pkt.GSOOptions.Type {
+ case stack.GSOTCPv4:
+ vnetHdr.gsoType = _VIRTIO_NET_HDR_GSO_TCPV4
+ case stack.GSOTCPv6:
+ vnetHdr.gsoType = _VIRTIO_NET_HDR_GSO_TCPV6
+ default:
+ panic(fmt.Sprintf("Unknown gso type: %v", pkt.GSOOptions.Type))
+ }
+ vnetHdr.gsoSize = pkt.GSOOptions.MSS
+ }
+ }
+ vnetHdrBuf = binary.Marshal(make([]byte, 0, virtioNetHdrSize), binary.LittleEndian, vnetHdr)
+ iovLen++
+ }
+
+ iovecs := make([]syscall.Iovec, iovLen+1+len(pkt.Data.Views()))
+ var mmsgHdr rawfile.MMsgHdr
+ mmsgHdr.Msg.Iov = &iovecs[0]
+ iovecIdx := 0
+ if vnetHdrBuf != nil {
+ v := &iovecs[iovecIdx]
+ v.Base = &vnetHdrBuf[0]
+ v.Len = uint64(len(vnetHdrBuf))
+ iovecIdx++
+ }
+ if ethHdrBuf != nil {
+ v := &iovecs[iovecIdx]
+ v.Base = &ethHdrBuf[0]
+ v.Len = uint64(len(ethHdrBuf))
+ iovecIdx++
+ }
+ pktSize := uint64(0)
+ // Encode L3 Header
+ v := &iovecs[iovecIdx]
+ hdr := &pkt.Header
+ hdrView := hdr.View()
+ v.Base = &hdrView[0]
+ v.Len = uint64(len(hdrView))
+ pktSize += v.Len
+ iovecIdx++
+
+ // Now encode the Transport Payload.
+ pktViews := pkt.Data.Views()
+ for i := range pktViews {
+ vec := &iovecs[iovecIdx]
+ iovecIdx++
+ vec.Base = &pktViews[i][0]
+ vec.Len = uint64(len(pktViews[i]))
+ pktSize += vec.Len
+ }
+ mmsgHdr.Msg.Iovlen = uint64(iovecIdx)
+ mmsgHdrs = append(mmsgHdrs, mmsgHdr)
+ }
+
+ packets := 0
+ for len(mmsgHdrs) > 0 {
+ sent, err := rawfile.NonBlockingSendMMsg(batchFD, mmsgHdrs)
+ if err != nil {
+ return packets, err
+ }
+ packets += sent
+ mmsgHdrs = mmsgHdrs[sent:]
+ }
+
+ return packets, nil
+}
+
+// WritePackets writes outbound packets to the underlying file descriptors. If
+// one is not currently writable, the packet is dropped.
+//
+// Being a batch API, each packet in pkts should have the following
+// fields populated:
+// - pkt.EgressRoute
+// - pkt.GSOOptions
+// - pkt.NetworkProtocolNumber
+func (e *endpoint) WritePackets(_ *stack.Route, _ *stack.GSO, pkts stack.PacketBufferList, _ tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ // Preallocate to avoid repeated reallocation as we append to batch.
+ // batchSz is 47 because when SWGSO is in use then a single 65KB TCP
+ // segment can get split into 46 segments of 1420 bytes and a single 216
+ // byte segment.
+ const batchSz = 47
+ batch := make([]*stack.PacketBuffer, 0, batchSz)
+ batchFD := -1
+ sentPackets := 0
+ for pkt := pkts.Front(); pkt != nil; pkt = pkt.Next() {
+ if len(batch) == 0 {
+ batchFD = e.fds[pkt.Hash%uint32(len(e.fds))]
+ }
+ pktFD := e.fds[pkt.Hash%uint32(len(e.fds))]
+ if sendNow := pktFD != batchFD; !sendNow {
+ batch = append(batch, pkt)
+ continue
+ }
+ n, err := e.sendBatch(batchFD, batch)
+ sentPackets += n
+ if err != nil {
+ return sentPackets, err
+ }
+ batch = batch[:0]
+ batch = append(batch, pkt)
+ batchFD = pktFD
+ }
+
+ if len(batch) != 0 {
+ n, err := e.sendBatch(batchFD, batch)
+ sentPackets += n
+ if err != nil {
+ return sentPackets, err
+ }
+ }
+ return sentPackets, nil
+}
+
+// viewsEqual tests whether v1 and v2 refer to the same backing bytes.
+func viewsEqual(vs1, vs2 []buffer.View) bool {
+ return len(vs1) == len(vs2) && (len(vs1) == 0 || &vs1[0] == &vs2[0])
+}
+
+// WriteRawPacket implements stack.LinkEndpoint.WriteRawPacket.
+func (e *endpoint) WriteRawPacket(vv buffer.VectorisedView) *tcpip.Error {
+ return rawfile.NonBlockingWrite(e.fds[0], vv.ToView())
+}
+
+// InjectOutobund implements stack.InjectableEndpoint.InjectOutbound.
+func (e *endpoint) InjectOutbound(dest tcpip.Address, packet []byte) *tcpip.Error {
+ return rawfile.NonBlockingWrite(e.fds[0], packet)
+}
+
+// dispatchLoop reads packets from the file descriptor in a loop and dispatches
+// them to the network stack.
+func (e *endpoint) dispatchLoop(inboundDispatcher linkDispatcher) *tcpip.Error {
+ for {
+ cont, err := inboundDispatcher.dispatch()
+ if err != nil || !cont {
+ if e.closed != nil {
+ e.closed(err)
+ }
+ return err
+ }
+ }
+}
+
+// GSOMaxSize returns the maximum GSO packet size.
+func (e *endpoint) GSOMaxSize() uint32 {
+ return e.gsoMaxSize
+}
+
+// InjectableEndpoint is an injectable fd-based endpoint. The endpoint writes
+// to the FD, but does not read from it. All reads come from injected packets.
+type InjectableEndpoint struct {
+ endpoint
+
+ dispatcher stack.NetworkDispatcher
+}
+
+// Attach saves the stack network-layer dispatcher for use later when packets
+// are injected.
+func (e *InjectableEndpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ e.dispatcher = dispatcher
+}
+
+// InjectInbound injects an inbound packet.
+func (e *InjectableEndpoint) InjectInbound(protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
+ e.dispatcher.DeliverNetworkPacket("" /* remote */, "" /* local */, protocol, pkt)
+}
+
+// NewInjectable creates a new fd-based InjectableEndpoint.
+func NewInjectable(fd int, mtu uint32, capabilities stack.LinkEndpointCapabilities) *InjectableEndpoint {
+ syscall.SetNonblock(fd, true)
+
+ return &InjectableEndpoint{endpoint: endpoint{
+ fds: []int{fd},
+ mtu: mtu,
+ caps: capabilities,
+ }}
+}
diff --git a/pkg/tcpip/link/fdbased/endpoint_test.go b/pkg/tcpip/link/fdbased/endpoint_test.go
new file mode 100644
index 000000000..eaee7e5d7
--- /dev/null
+++ b/pkg/tcpip/link/fdbased/endpoint_test.go
@@ -0,0 +1,502 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux
+
+package fdbased
+
+import (
+ "bytes"
+ "fmt"
+ "math/rand"
+ "reflect"
+ "syscall"
+ "testing"
+ "time"
+ "unsafe"
+
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/header"
+ "gvisor.dev/gvisor/pkg/tcpip/link/rawfile"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+const (
+ mtu = 1500
+ laddr = tcpip.LinkAddress("\x11\x22\x33\x44\x55\x66")
+ raddr = tcpip.LinkAddress("\x77\x88\x99\xaa\xbb\xcc")
+ proto = 10
+ csumOffset = 48
+ gsoMSS = 500
+)
+
+type packetInfo struct {
+ raddr tcpip.LinkAddress
+ proto tcpip.NetworkProtocolNumber
+ contents *stack.PacketBuffer
+}
+
+type context struct {
+ t *testing.T
+ readFDs []int
+ writeFDs []int
+ ep stack.LinkEndpoint
+ ch chan packetInfo
+ done chan struct{}
+}
+
+func newContext(t *testing.T, opt *Options) *context {
+ firstFDPair, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
+ if err != nil {
+ t.Fatalf("Socketpair failed: %v", err)
+ }
+ secondFDPair, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
+ if err != nil {
+ t.Fatalf("Socketpair failed: %v", err)
+ }
+
+ done := make(chan struct{}, 2)
+ opt.ClosedFunc = func(*tcpip.Error) {
+ done <- struct{}{}
+ }
+
+ opt.FDs = []int{firstFDPair[1], secondFDPair[1]}
+ ep, err := New(opt)
+ if err != nil {
+ t.Fatalf("Failed to create FD endpoint: %v", err)
+ }
+
+ c := &context{
+ t: t,
+ readFDs: []int{firstFDPair[0], secondFDPair[0]},
+ writeFDs: opt.FDs,
+ ep: ep,
+ ch: make(chan packetInfo, 100),
+ done: done,
+ }
+
+ ep.Attach(c)
+
+ return c
+}
+
+func (c *context) cleanup() {
+ for _, fd := range c.readFDs {
+ syscall.Close(fd)
+ }
+ <-c.done
+ <-c.done
+ for _, fd := range c.writeFDs {
+ syscall.Close(fd)
+ }
+}
+
+func (c *context) DeliverNetworkPacket(remote tcpip.LinkAddress, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
+ c.ch <- packetInfo{remote, protocol, pkt}
+}
+
+func TestNoEthernetProperties(t *testing.T) {
+ c := newContext(t, &Options{MTU: mtu})
+ defer c.cleanup()
+
+ if want, v := uint16(0), c.ep.MaxHeaderLength(); want != v {
+ t.Fatalf("MaxHeaderLength() = %v, want %v", v, want)
+ }
+
+ if want, v := uint32(mtu), c.ep.MTU(); want != v {
+ t.Fatalf("MTU() = %v, want %v", v, want)
+ }
+}
+
+func TestEthernetProperties(t *testing.T) {
+ c := newContext(t, &Options{EthernetHeader: true, MTU: mtu})
+ defer c.cleanup()
+
+ if want, v := uint16(header.EthernetMinimumSize), c.ep.MaxHeaderLength(); want != v {
+ t.Fatalf("MaxHeaderLength() = %v, want %v", v, want)
+ }
+
+ if want, v := uint32(mtu), c.ep.MTU(); want != v {
+ t.Fatalf("MTU() = %v, want %v", v, want)
+ }
+}
+
+func TestAddress(t *testing.T) {
+ addrs := []tcpip.LinkAddress{"", "abc", "def"}
+ for _, a := range addrs {
+ t.Run(fmt.Sprintf("Address: %q", a), func(t *testing.T) {
+ c := newContext(t, &Options{Address: a, MTU: mtu})
+ defer c.cleanup()
+
+ if want, v := a, c.ep.LinkAddress(); want != v {
+ t.Fatalf("LinkAddress() = %v, want %v", v, want)
+ }
+ })
+ }
+}
+
+func testWritePacket(t *testing.T, plen int, eth bool, gsoMaxSize uint32, hash uint32) {
+ c := newContext(t, &Options{Address: laddr, MTU: mtu, EthernetHeader: eth, GSOMaxSize: gsoMaxSize})
+ defer c.cleanup()
+
+ r := &stack.Route{
+ RemoteLinkAddress: raddr,
+ }
+
+ // Build header.
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()) + 100)
+ b := hdr.Prepend(100)
+ for i := range b {
+ b[i] = uint8(rand.Intn(256))
+ }
+
+ // Build payload and write.
+ payload := make(buffer.View, plen)
+ for i := range payload {
+ payload[i] = uint8(rand.Intn(256))
+ }
+ want := append(hdr.View(), payload...)
+ var gso *stack.GSO
+ if gsoMaxSize != 0 {
+ gso = &stack.GSO{
+ Type: stack.GSOTCPv6,
+ NeedsCsum: true,
+ CsumOffset: csumOffset,
+ MSS: gsoMSS,
+ MaxSize: gsoMaxSize,
+ L3HdrLen: header.IPv4MaximumHeaderSize,
+ }
+ }
+ if err := c.ep.WritePacket(r, gso, proto, &stack.PacketBuffer{
+ Header: hdr,
+ Data: payload.ToVectorisedView(),
+ Hash: hash,
+ }); err != nil {
+ t.Fatalf("WritePacket failed: %v", err)
+ }
+
+ // Read from the corresponding FD, then compare with what we wrote.
+ b = make([]byte, mtu)
+ fd := c.readFDs[hash%uint32(len(c.readFDs))]
+ n, err := syscall.Read(fd, b)
+ if err != nil {
+ t.Fatalf("Read failed: %v", err)
+ }
+ b = b[:n]
+ if gsoMaxSize != 0 {
+ vnetHdr := *(*virtioNetHdr)(unsafe.Pointer(&b[0]))
+ if vnetHdr.flags&_VIRTIO_NET_HDR_F_NEEDS_CSUM == 0 {
+ t.Fatalf("virtioNetHdr.flags %v doesn't contain %v", vnetHdr.flags, _VIRTIO_NET_HDR_F_NEEDS_CSUM)
+ }
+ csumStart := header.EthernetMinimumSize + gso.L3HdrLen
+ if vnetHdr.csumStart != csumStart {
+ t.Fatalf("vnetHdr.csumStart = %v, want %v", vnetHdr.csumStart, csumStart)
+ }
+ if vnetHdr.csumOffset != csumOffset {
+ t.Fatalf("vnetHdr.csumOffset = %v, want %v", vnetHdr.csumOffset, csumOffset)
+ }
+ gsoType := uint8(0)
+ if int(gso.MSS) < plen {
+ gsoType = _VIRTIO_NET_HDR_GSO_TCPV6
+ }
+ if vnetHdr.gsoType != gsoType {
+ t.Fatalf("vnetHdr.gsoType = %v, want %v", vnetHdr.gsoType, gsoType)
+ }
+ b = b[virtioNetHdrSize:]
+ }
+ if eth {
+ h := header.Ethernet(b)
+ b = b[header.EthernetMinimumSize:]
+
+ if a := h.SourceAddress(); a != laddr {
+ t.Fatalf("SourceAddress() = %v, want %v", a, laddr)
+ }
+
+ if a := h.DestinationAddress(); a != raddr {
+ t.Fatalf("DestinationAddress() = %v, want %v", a, raddr)
+ }
+
+ if et := h.Type(); et != proto {
+ t.Fatalf("Type() = %v, want %v", et, proto)
+ }
+ }
+ if len(b) != len(want) {
+ t.Fatalf("Read returned %v bytes, want %v", len(b), len(want))
+ }
+ if !bytes.Equal(b, want) {
+ t.Fatalf("Read returned %x, want %x", b, want)
+ }
+}
+
+func TestWritePacket(t *testing.T) {
+ lengths := []int{0, 100, 1000}
+ eths := []bool{true, false}
+ gsos := []uint32{0, 32768}
+
+ for _, eth := range eths {
+ for _, plen := range lengths {
+ for _, gso := range gsos {
+ t.Run(
+ fmt.Sprintf("Eth=%v,PayloadLen=%v,GSOMaxSize=%v", eth, plen, gso),
+ func(t *testing.T) {
+ testWritePacket(t, plen, eth, gso, 0)
+ },
+ )
+ }
+ }
+ }
+}
+
+func TestHashedWritePacket(t *testing.T) {
+ lengths := []int{0, 100, 1000}
+ eths := []bool{true, false}
+ gsos := []uint32{0, 32768}
+ hashes := []uint32{0, 1}
+ for _, eth := range eths {
+ for _, plen := range lengths {
+ for _, gso := range gsos {
+ for _, hash := range hashes {
+ t.Run(
+ fmt.Sprintf("Eth=%v,PayloadLen=%v,GSOMaxSize=%v,Hash=%d", eth, plen, gso, hash),
+ func(t *testing.T) {
+ testWritePacket(t, plen, eth, gso, hash)
+ },
+ )
+ }
+ }
+ }
+ }
+}
+
+func TestPreserveSrcAddress(t *testing.T) {
+ baddr := tcpip.LinkAddress("\xcc\xbb\xaa\x77\x88\x99")
+
+ c := newContext(t, &Options{Address: laddr, MTU: mtu, EthernetHeader: true})
+ defer c.cleanup()
+
+ // Set LocalLinkAddress in route to the value of the bridged address.
+ r := &stack.Route{
+ RemoteLinkAddress: raddr,
+ LocalLinkAddress: baddr,
+ }
+
+ // WritePacket panics given a prependable with anything less than
+ // the minimum size of the ethernet header.
+ hdr := buffer.NewPrependable(header.EthernetMinimumSize)
+ if err := c.ep.WritePacket(r, nil /* gso */, proto, &stack.PacketBuffer{
+ Header: hdr,
+ Data: buffer.VectorisedView{},
+ }); err != nil {
+ t.Fatalf("WritePacket failed: %v", err)
+ }
+
+ // Read from the FD, then compare with what we wrote.
+ b := make([]byte, mtu)
+ n, err := syscall.Read(c.readFDs[0], b)
+ if err != nil {
+ t.Fatalf("Read failed: %v", err)
+ }
+ b = b[:n]
+ h := header.Ethernet(b)
+
+ if a := h.SourceAddress(); a != baddr {
+ t.Fatalf("SourceAddress() = %v, want %v", a, baddr)
+ }
+}
+
+func TestDeliverPacket(t *testing.T) {
+ lengths := []int{100, 1000}
+ eths := []bool{true, false}
+
+ for _, eth := range eths {
+ for _, plen := range lengths {
+ t.Run(fmt.Sprintf("Eth=%v,PayloadLen=%v", eth, plen), func(t *testing.T) {
+ c := newContext(t, &Options{Address: laddr, MTU: mtu, EthernetHeader: eth})
+ defer c.cleanup()
+
+ // Build packet.
+ b := make([]byte, plen)
+ all := b
+ for i := range b {
+ b[i] = uint8(rand.Intn(256))
+ }
+
+ var hdr header.Ethernet
+ if !eth {
+ // So that it looks like an IPv4 packet.
+ b[0] = 0x40
+ } else {
+ hdr = make(header.Ethernet, header.EthernetMinimumSize)
+ hdr.Encode(&header.EthernetFields{
+ SrcAddr: raddr,
+ DstAddr: laddr,
+ Type: proto,
+ })
+ all = append(hdr, b...)
+ }
+
+ // Write packet via the file descriptor.
+ if _, err := syscall.Write(c.readFDs[0], all); err != nil {
+ t.Fatalf("Write failed: %v", err)
+ }
+
+ // Receive packet through the endpoint.
+ select {
+ case pi := <-c.ch:
+ want := packetInfo{
+ raddr: raddr,
+ proto: proto,
+ contents: &stack.PacketBuffer{
+ Data: buffer.View(b).ToVectorisedView(),
+ LinkHeader: buffer.View(hdr),
+ },
+ }
+ if !eth {
+ want.proto = header.IPv4ProtocolNumber
+ want.raddr = ""
+ }
+ // want.contents.Data will be a single
+ // view, so make pi do the same for the
+ // DeepEqual check.
+ pi.contents.Data = pi.contents.Data.ToView().ToVectorisedView()
+ if !reflect.DeepEqual(want, pi) {
+ t.Fatalf("Unexpected received packet: %+v, want %+v", pi, want)
+ }
+ case <-time.After(10 * time.Second):
+ t.Fatalf("Timed out waiting for packet")
+ }
+ })
+ }
+ }
+}
+
+func TestBufConfigMaxLength(t *testing.T) {
+ got := 0
+ for _, i := range BufConfig {
+ got += i
+ }
+ want := header.MaxIPPacketSize // maximum TCP packet size
+ if got < want {
+ t.Errorf("total buffer size is invalid: got %d, want >= %d", got, want)
+ }
+}
+
+func TestBufConfigFirst(t *testing.T) {
+ // The stack assumes that the TCP/IP header is enterily contained in the first view.
+ // Therefore, the first view needs to be large enough to contain the maximum TCP/IP
+ // header, which is 120 bytes (60 bytes for IP + 60 bytes for TCP).
+ want := 120
+ got := BufConfig[0]
+ if got < want {
+ t.Errorf("first view has an invalid size: got %d, want >= %d", got, want)
+ }
+}
+
+var capLengthTestCases = []struct {
+ comment string
+ config []int
+ n int
+ wantUsed int
+ wantLengths []int
+}{
+ {
+ comment: "Single slice",
+ config: []int{2},
+ n: 1,
+ wantUsed: 1,
+ wantLengths: []int{1},
+ },
+ {
+ comment: "Multiple slices",
+ config: []int{1, 2},
+ n: 2,
+ wantUsed: 2,
+ wantLengths: []int{1, 1},
+ },
+ {
+ comment: "Entire buffer",
+ config: []int{1, 2},
+ n: 3,
+ wantUsed: 2,
+ wantLengths: []int{1, 2},
+ },
+ {
+ comment: "Entire buffer but not on the last slice",
+ config: []int{1, 2, 3},
+ n: 3,
+ wantUsed: 2,
+ wantLengths: []int{1, 2, 3},
+ },
+}
+
+func TestReadVDispatcherCapLength(t *testing.T) {
+ for _, c := range capLengthTestCases {
+ // fd does not matter for this test.
+ d := readVDispatcher{fd: -1, e: &endpoint{}}
+ d.views = make([]buffer.View, len(c.config))
+ d.iovecs = make([]syscall.Iovec, len(c.config))
+ d.allocateViews(c.config)
+
+ used := d.capViews(c.n, c.config)
+ if used != c.wantUsed {
+ t.Errorf("Test %q failed when calling capViews(%d, %v). Got %d. Want %d", c.comment, c.n, c.config, used, c.wantUsed)
+ }
+ lengths := make([]int, len(d.views))
+ for i, v := range d.views {
+ lengths[i] = len(v)
+ }
+ if !reflect.DeepEqual(lengths, c.wantLengths) {
+ t.Errorf("Test %q failed when calling capViews(%d, %v). Got %v. Want %v", c.comment, c.n, c.config, lengths, c.wantLengths)
+ }
+ }
+}
+
+func TestRecvMMsgDispatcherCapLength(t *testing.T) {
+ for _, c := range capLengthTestCases {
+ d := recvMMsgDispatcher{
+ fd: -1, // fd does not matter for this test.
+ e: &endpoint{},
+ views: make([][]buffer.View, 1),
+ iovecs: make([][]syscall.Iovec, 1),
+ msgHdrs: make([]rawfile.MMsgHdr, 1),
+ }
+
+ for i, _ := range d.views {
+ d.views[i] = make([]buffer.View, len(c.config))
+ }
+ for i := range d.iovecs {
+ d.iovecs[i] = make([]syscall.Iovec, len(c.config))
+ }
+ for k, msgHdr := range d.msgHdrs {
+ msgHdr.Msg.Iov = &d.iovecs[k][0]
+ msgHdr.Msg.Iovlen = uint64(len(c.config))
+ }
+
+ d.allocateViews(c.config)
+
+ used := d.capViews(0, c.n, c.config)
+ if used != c.wantUsed {
+ t.Errorf("Test %q failed when calling capViews(%d, %v). Got %d. Want %d", c.comment, c.n, c.config, used, c.wantUsed)
+ }
+ lengths := make([]int, len(d.views[0]))
+ for i, v := range d.views[0] {
+ lengths[i] = len(v)
+ }
+ if !reflect.DeepEqual(lengths, c.wantLengths) {
+ t.Errorf("Test %q failed when calling capViews(%d, %v). Got %v. Want %v", c.comment, c.n, c.config, lengths, c.wantLengths)
+ }
+
+ }
+}
diff --git a/pkg/tcpip/link/fdbased/endpoint_unsafe.go b/pkg/tcpip/link/fdbased/endpoint_unsafe.go
new file mode 100644
index 000000000..df14eaad1
--- /dev/null
+++ b/pkg/tcpip/link/fdbased/endpoint_unsafe.go
@@ -0,0 +1,23 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux
+
+package fdbased
+
+import (
+ "unsafe"
+)
+
+const virtioNetHdrSize = int(unsafe.Sizeof(virtioNetHdr{}))
diff --git a/pkg/tcpip/link/fdbased/mmap.go b/pkg/tcpip/link/fdbased/mmap.go
new file mode 100644
index 000000000..2dfd29aa9
--- /dev/null
+++ b/pkg/tcpip/link/fdbased/mmap.go
@@ -0,0 +1,199 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux,amd64 linux,arm64
+
+package fdbased
+
+import (
+ "encoding/binary"
+ "syscall"
+
+ "golang.org/x/sys/unix"
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/header"
+ "gvisor.dev/gvisor/pkg/tcpip/link/rawfile"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+const (
+ tPacketAlignment = uintptr(16)
+ tpStatusKernel = 0
+ tpStatusUser = 1
+ tpStatusCopy = 2
+ tpStatusLosing = 4
+)
+
+// We overallocate the frame size to accommodate space for the
+// TPacketHdr+RawSockAddrLinkLayer+MAC header and any padding.
+//
+// Memory allocated for the ring buffer: tpBlockSize * tpBlockNR = 2 MiB
+//
+// NOTE:
+// Frames need to be aligned at 16 byte boundaries.
+// BlockSize needs to be page aligned.
+//
+// For details see PACKET_MMAP setting constraints in
+// https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt
+const (
+ tpFrameSize = 65536 + 128
+ tpBlockSize = tpFrameSize * 32
+ tpBlockNR = 1
+ tpFrameNR = (tpBlockSize * tpBlockNR) / tpFrameSize
+)
+
+// tPacketAlign aligns the pointer v at a tPacketAlignment boundary. Direct
+// translation of the TPACKET_ALIGN macro in <linux/if_packet.h>.
+func tPacketAlign(v uintptr) uintptr {
+ return (v + tPacketAlignment - 1) & uintptr(^(tPacketAlignment - 1))
+}
+
+// tPacketReq is the tpacket_req structure as described in
+// https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt
+type tPacketReq struct {
+ tpBlockSize uint32
+ tpBlockNR uint32
+ tpFrameSize uint32
+ tpFrameNR uint32
+}
+
+// tPacketHdr is tpacket_hdr structure as described in <linux/if_packet.h>
+type tPacketHdr []byte
+
+const (
+ tpStatusOffset = 0
+ tpLenOffset = 8
+ tpSnapLenOffset = 12
+ tpMacOffset = 16
+ tpNetOffset = 18
+ tpSecOffset = 20
+ tpUSecOffset = 24
+)
+
+func (t tPacketHdr) tpLen() uint32 {
+ return binary.LittleEndian.Uint32(t[tpLenOffset:])
+}
+
+func (t tPacketHdr) tpSnapLen() uint32 {
+ return binary.LittleEndian.Uint32(t[tpSnapLenOffset:])
+}
+
+func (t tPacketHdr) tpMac() uint16 {
+ return binary.LittleEndian.Uint16(t[tpMacOffset:])
+}
+
+func (t tPacketHdr) tpNet() uint16 {
+ return binary.LittleEndian.Uint16(t[tpNetOffset:])
+}
+
+func (t tPacketHdr) tpSec() uint32 {
+ return binary.LittleEndian.Uint32(t[tpSecOffset:])
+}
+
+func (t tPacketHdr) tpUSec() uint32 {
+ return binary.LittleEndian.Uint32(t[tpUSecOffset:])
+}
+
+func (t tPacketHdr) Payload() []byte {
+ return t[uint32(t.tpMac()) : uint32(t.tpMac())+t.tpSnapLen()]
+}
+
+// packetMMapDispatcher uses PACKET_RX_RING's to read/dispatch inbound packets.
+// See: mmap_amd64_unsafe.go for implementation details.
+type packetMMapDispatcher struct {
+ // fd is the file descriptor used to send and receive packets.
+ fd int
+
+ // e is the endpoint this dispatcher is attached to.
+ e *endpoint
+
+ // ringBuffer is only used when PacketMMap dispatcher is used and points
+ // to the start of the mmapped PACKET_RX_RING buffer.
+ ringBuffer []byte
+
+ // ringOffset is the current offset into the ring buffer where the next
+ // inbound packet will be placed by the kernel.
+ ringOffset int
+}
+
+func (d *packetMMapDispatcher) readMMappedPacket() ([]byte, *tcpip.Error) {
+ hdr := tPacketHdr(d.ringBuffer[d.ringOffset*tpFrameSize:])
+ for hdr.tpStatus()&tpStatusUser == 0 {
+ event := rawfile.PollEvent{
+ FD: int32(d.fd),
+ Events: unix.POLLIN | unix.POLLERR,
+ }
+ if _, errno := rawfile.BlockingPoll(&event, 1, nil); errno != 0 {
+ if errno == syscall.EINTR {
+ continue
+ }
+ return nil, rawfile.TranslateErrno(errno)
+ }
+ if hdr.tpStatus()&tpStatusCopy != 0 {
+ // This frame is truncated so skip it after flipping the
+ // buffer to the kernel.
+ hdr.setTPStatus(tpStatusKernel)
+ d.ringOffset = (d.ringOffset + 1) % tpFrameNR
+ hdr = (tPacketHdr)(d.ringBuffer[d.ringOffset*tpFrameSize:])
+ continue
+ }
+ }
+
+ // Copy out the packet from the mmapped frame to a locally owned buffer.
+ pkt := make([]byte, hdr.tpSnapLen())
+ copy(pkt, hdr.Payload())
+ // Release packet to kernel.
+ hdr.setTPStatus(tpStatusKernel)
+ d.ringOffset = (d.ringOffset + 1) % tpFrameNR
+ return pkt, nil
+}
+
+// dispatch reads packets from an mmaped ring buffer and dispatches them to the
+// network stack.
+func (d *packetMMapDispatcher) dispatch() (bool, *tcpip.Error) {
+ pkt, err := d.readMMappedPacket()
+ if err != nil {
+ return false, err
+ }
+ var (
+ p tcpip.NetworkProtocolNumber
+ remote, local tcpip.LinkAddress
+ eth header.Ethernet
+ )
+ if d.e.hdrSize > 0 {
+ eth = header.Ethernet(pkt)
+ p = eth.Type()
+ remote = eth.SourceAddress()
+ local = eth.DestinationAddress()
+ } else {
+ // We don't get any indication of what the packet is, so try to guess
+ // if it's an IPv4 or IPv6 packet.
+ switch header.IPVersion(pkt) {
+ case header.IPv4Version:
+ p = header.IPv4ProtocolNumber
+ case header.IPv6Version:
+ p = header.IPv6ProtocolNumber
+ default:
+ return true, nil
+ }
+ }
+
+ pkt = pkt[d.e.hdrSize:]
+ d.e.dispatcher.DeliverNetworkPacket(remote, local, p, &stack.PacketBuffer{
+ Data: buffer.View(pkt).ToVectorisedView(),
+ LinkHeader: buffer.View(eth),
+ })
+ return true, nil
+}
diff --git a/pkg/tcpip/link/fdbased/mmap_stub.go b/pkg/tcpip/link/fdbased/mmap_stub.go
new file mode 100644
index 000000000..67be52d67
--- /dev/null
+++ b/pkg/tcpip/link/fdbased/mmap_stub.go
@@ -0,0 +1,23 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !linux !amd64,!arm64
+
+package fdbased
+
+// Stubbed out version for non-linux/non-amd64/non-arm64 platforms.
+
+func newPacketMMapDispatcher(fd int, e *endpoint) (linkDispatcher, error) {
+ return nil, nil
+}
diff --git a/pkg/tcpip/link/fdbased/mmap_unsafe.go b/pkg/tcpip/link/fdbased/mmap_unsafe.go
new file mode 100644
index 000000000..3894185ae
--- /dev/null
+++ b/pkg/tcpip/link/fdbased/mmap_unsafe.go
@@ -0,0 +1,84 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux,amd64 linux,arm64
+
+package fdbased
+
+import (
+ "fmt"
+ "sync/atomic"
+ "syscall"
+ "unsafe"
+
+ "golang.org/x/sys/unix"
+)
+
+// tPacketHdrlen is the TPACKET_HDRLEN variable defined in <linux/if_packet.h>.
+var tPacketHdrlen = tPacketAlign(unsafe.Sizeof(tPacketHdr{}) + unsafe.Sizeof(syscall.RawSockaddrLinklayer{}))
+
+// tpStatus returns the frame status field.
+// The status is concurrently updated by the kernel as a result we must
+// use atomic operations to prevent races.
+func (t tPacketHdr) tpStatus() uint32 {
+ hdr := unsafe.Pointer(&t[0])
+ statusPtr := unsafe.Pointer(uintptr(hdr) + uintptr(tpStatusOffset))
+ return atomic.LoadUint32((*uint32)(statusPtr))
+}
+
+// setTPStatus set's the frame status to the provided status.
+// The status is concurrently updated by the kernel as a result we must
+// use atomic operations to prevent races.
+func (t tPacketHdr) setTPStatus(status uint32) {
+ hdr := unsafe.Pointer(&t[0])
+ statusPtr := unsafe.Pointer(uintptr(hdr) + uintptr(tpStatusOffset))
+ atomic.StoreUint32((*uint32)(statusPtr), status)
+}
+
+func newPacketMMapDispatcher(fd int, e *endpoint) (linkDispatcher, error) {
+ d := &packetMMapDispatcher{
+ fd: fd,
+ e: e,
+ }
+ pageSize := unix.Getpagesize()
+ if tpBlockSize%pageSize != 0 {
+ return nil, fmt.Errorf("tpBlockSize: %d is not page aligned, pagesize: %d", tpBlockSize, pageSize)
+ }
+ tReq := tPacketReq{
+ tpBlockSize: uint32(tpBlockSize),
+ tpBlockNR: uint32(tpBlockNR),
+ tpFrameSize: uint32(tpFrameSize),
+ tpFrameNR: uint32(tpFrameNR),
+ }
+ // Setup PACKET_RX_RING.
+ if err := setsockopt(d.fd, syscall.SOL_PACKET, syscall.PACKET_RX_RING, unsafe.Pointer(&tReq), unsafe.Sizeof(tReq)); err != nil {
+ return nil, fmt.Errorf("failed to enable PACKET_RX_RING: %v", err)
+ }
+ // Let's mmap the blocks.
+ sz := tpBlockSize * tpBlockNR
+ buf, err := syscall.Mmap(d.fd, 0, sz, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
+ if err != nil {
+ return nil, fmt.Errorf("syscall.Mmap(...,0, %v, ...) failed = %v", sz, err)
+ }
+ d.ringBuffer = buf
+ return d, nil
+}
+
+func setsockopt(fd, level, name int, val unsafe.Pointer, vallen uintptr) error {
+ if _, _, errno := syscall.Syscall6(syscall.SYS_SETSOCKOPT, uintptr(fd), uintptr(level), uintptr(name), uintptr(val), vallen, 0); errno != 0 {
+ return error(errno)
+ }
+
+ return nil
+}
diff --git a/pkg/tcpip/link/fdbased/packet_dispatchers.go b/pkg/tcpip/link/fdbased/packet_dispatchers.go
new file mode 100644
index 000000000..f04738cfb
--- /dev/null
+++ b/pkg/tcpip/link/fdbased/packet_dispatchers.go
@@ -0,0 +1,317 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux
+
+package fdbased
+
+import (
+ "syscall"
+
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/header"
+ "gvisor.dev/gvisor/pkg/tcpip/link/rawfile"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// BufConfig defines the shape of the vectorised view used to read packets from the NIC.
+var BufConfig = []int{128, 256, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768}
+
+// readVDispatcher uses readv() system call to read inbound packets and
+// dispatches them.
+type readVDispatcher struct {
+ // fd is the file descriptor used to send and receive packets.
+ fd int
+
+ // e is the endpoint this dispatcher is attached to.
+ e *endpoint
+
+ // views are the actual buffers that hold the packet contents.
+ views []buffer.View
+
+ // iovecs are initialized with base pointers/len of the corresponding
+ // entries in the views defined above, except when GSO is enabled then
+ // the first iovec points to a buffer for the vnet header which is
+ // stripped before the views are passed up the stack for further
+ // processing.
+ iovecs []syscall.Iovec
+}
+
+func newReadVDispatcher(fd int, e *endpoint) (linkDispatcher, error) {
+ d := &readVDispatcher{fd: fd, e: e}
+ d.views = make([]buffer.View, len(BufConfig))
+ iovLen := len(BufConfig)
+ if d.e.Capabilities()&stack.CapabilityHardwareGSO != 0 {
+ iovLen++
+ }
+ d.iovecs = make([]syscall.Iovec, iovLen)
+ return d, nil
+}
+
+func (d *readVDispatcher) allocateViews(bufConfig []int) {
+ var vnetHdr [virtioNetHdrSize]byte
+ vnetHdrOff := 0
+ if d.e.Capabilities()&stack.CapabilityHardwareGSO != 0 {
+ // The kernel adds virtioNetHdr before each packet, but
+ // we don't use it, so so we allocate a buffer for it,
+ // add it in iovecs but don't add it in a view.
+ d.iovecs[0] = syscall.Iovec{
+ Base: &vnetHdr[0],
+ Len: uint64(virtioNetHdrSize),
+ }
+ vnetHdrOff++
+ }
+ for i := 0; i < len(bufConfig); i++ {
+ if d.views[i] != nil {
+ break
+ }
+ b := buffer.NewView(bufConfig[i])
+ d.views[i] = b
+ d.iovecs[i+vnetHdrOff] = syscall.Iovec{
+ Base: &b[0],
+ Len: uint64(len(b)),
+ }
+ }
+}
+
+func (d *readVDispatcher) capViews(n int, buffers []int) int {
+ c := 0
+ for i, s := range buffers {
+ c += s
+ if c >= n {
+ d.views[i].CapLength(s - (c - n))
+ return i + 1
+ }
+ }
+ return len(buffers)
+}
+
+// dispatch reads one packet from the file descriptor and dispatches it.
+func (d *readVDispatcher) dispatch() (bool, *tcpip.Error) {
+ d.allocateViews(BufConfig)
+
+ n, err := rawfile.BlockingReadv(d.fd, d.iovecs)
+ if err != nil {
+ return false, err
+ }
+ if d.e.Capabilities()&stack.CapabilityHardwareGSO != 0 {
+ // Skip virtioNetHdr which is added before each packet, it
+ // isn't used and it isn't in a view.
+ n -= virtioNetHdrSize
+ }
+ if n <= d.e.hdrSize {
+ return false, nil
+ }
+
+ var (
+ p tcpip.NetworkProtocolNumber
+ remote, local tcpip.LinkAddress
+ eth header.Ethernet
+ )
+ if d.e.hdrSize > 0 {
+ eth = header.Ethernet(d.views[0][:header.EthernetMinimumSize])
+ p = eth.Type()
+ remote = eth.SourceAddress()
+ local = eth.DestinationAddress()
+ } else {
+ // We don't get any indication of what the packet is, so try to guess
+ // if it's an IPv4 or IPv6 packet.
+ switch header.IPVersion(d.views[0]) {
+ case header.IPv4Version:
+ p = header.IPv4ProtocolNumber
+ case header.IPv6Version:
+ p = header.IPv6ProtocolNumber
+ default:
+ return true, nil
+ }
+ }
+
+ used := d.capViews(n, BufConfig)
+ pkt := &stack.PacketBuffer{
+ Data: buffer.NewVectorisedView(n, append([]buffer.View(nil), d.views[:used]...)),
+ LinkHeader: buffer.View(eth),
+ }
+ pkt.Data.TrimFront(d.e.hdrSize)
+
+ d.e.dispatcher.DeliverNetworkPacket(remote, local, p, pkt)
+
+ // Prepare e.views for another packet: release used views.
+ for i := 0; i < used; i++ {
+ d.views[i] = nil
+ }
+
+ return true, nil
+}
+
+// recvMMsgDispatcher uses the recvmmsg system call to read inbound packets and
+// dispatches them.
+type recvMMsgDispatcher struct {
+ // fd is the file descriptor used to send and receive packets.
+ fd int
+
+ // e is the endpoint this dispatcher is attached to.
+ e *endpoint
+
+ // views is an array of array of buffers that contain packet contents.
+ views [][]buffer.View
+
+ // iovecs is an array of array of iovec records where each iovec base
+ // pointer and length are initialzed to the corresponding view above,
+ // except when GSO is enabled then the first iovec in each array of
+ // iovecs points to a buffer for the vnet header which is stripped
+ // before the views are passed up the stack for further processing.
+ iovecs [][]syscall.Iovec
+
+ // msgHdrs is an array of MMsgHdr objects where each MMsghdr is used to
+ // reference an array of iovecs in the iovecs field defined above. This
+ // array is passed as the parameter to recvmmsg call to retrieve
+ // potentially more than 1 packet per syscall.
+ msgHdrs []rawfile.MMsgHdr
+}
+
+const (
+ // MaxMsgsPerRecv is the maximum number of packets we want to retrieve
+ // in a single RecvMMsg call.
+ MaxMsgsPerRecv = 8
+)
+
+func newRecvMMsgDispatcher(fd int, e *endpoint) (linkDispatcher, error) {
+ d := &recvMMsgDispatcher{
+ fd: fd,
+ e: e,
+ }
+ d.views = make([][]buffer.View, MaxMsgsPerRecv)
+ for i := range d.views {
+ d.views[i] = make([]buffer.View, len(BufConfig))
+ }
+ d.iovecs = make([][]syscall.Iovec, MaxMsgsPerRecv)
+ iovLen := len(BufConfig)
+ if d.e.Capabilities()&stack.CapabilityHardwareGSO != 0 {
+ // virtioNetHdr is prepended before each packet.
+ iovLen++
+ }
+ for i := range d.iovecs {
+ d.iovecs[i] = make([]syscall.Iovec, iovLen)
+ }
+ d.msgHdrs = make([]rawfile.MMsgHdr, MaxMsgsPerRecv)
+ for i := range d.msgHdrs {
+ d.msgHdrs[i].Msg.Iov = &d.iovecs[i][0]
+ d.msgHdrs[i].Msg.Iovlen = uint64(iovLen)
+ }
+ return d, nil
+}
+
+func (d *recvMMsgDispatcher) capViews(k, n int, buffers []int) int {
+ c := 0
+ for i, s := range buffers {
+ c += s
+ if c >= n {
+ d.views[k][i].CapLength(s - (c - n))
+ return i + 1
+ }
+ }
+ return len(buffers)
+}
+
+func (d *recvMMsgDispatcher) allocateViews(bufConfig []int) {
+ for k := 0; k < len(d.views); k++ {
+ var vnetHdr [virtioNetHdrSize]byte
+ vnetHdrOff := 0
+ if d.e.Capabilities()&stack.CapabilityHardwareGSO != 0 {
+ // The kernel adds virtioNetHdr before each packet, but
+ // we don't use it, so so we allocate a buffer for it,
+ // add it in iovecs but don't add it in a view.
+ d.iovecs[k][0] = syscall.Iovec{
+ Base: &vnetHdr[0],
+ Len: uint64(virtioNetHdrSize),
+ }
+ vnetHdrOff++
+ }
+ for i := 0; i < len(bufConfig); i++ {
+ if d.views[k][i] != nil {
+ break
+ }
+ b := buffer.NewView(bufConfig[i])
+ d.views[k][i] = b
+ d.iovecs[k][i+vnetHdrOff] = syscall.Iovec{
+ Base: &b[0],
+ Len: uint64(len(b)),
+ }
+ }
+ }
+}
+
+// recvMMsgDispatch reads more than one packet at a time from the file
+// descriptor and dispatches it.
+func (d *recvMMsgDispatcher) dispatch() (bool, *tcpip.Error) {
+ d.allocateViews(BufConfig)
+
+ nMsgs, err := rawfile.BlockingRecvMMsg(d.fd, d.msgHdrs)
+ if err != nil {
+ return false, err
+ }
+ // Process each of received packets.
+ for k := 0; k < nMsgs; k++ {
+ n := int(d.msgHdrs[k].Len)
+ if d.e.Capabilities()&stack.CapabilityHardwareGSO != 0 {
+ n -= virtioNetHdrSize
+ }
+ if n <= d.e.hdrSize {
+ return false, nil
+ }
+
+ var (
+ p tcpip.NetworkProtocolNumber
+ remote, local tcpip.LinkAddress
+ eth header.Ethernet
+ )
+ if d.e.hdrSize > 0 {
+ eth = header.Ethernet(d.views[k][0])
+ p = eth.Type()
+ remote = eth.SourceAddress()
+ local = eth.DestinationAddress()
+ } else {
+ // We don't get any indication of what the packet is, so try to guess
+ // if it's an IPv4 or IPv6 packet.
+ switch header.IPVersion(d.views[k][0]) {
+ case header.IPv4Version:
+ p = header.IPv4ProtocolNumber
+ case header.IPv6Version:
+ p = header.IPv6ProtocolNumber
+ default:
+ return true, nil
+ }
+ }
+
+ used := d.capViews(k, int(n), BufConfig)
+ pkt := &stack.PacketBuffer{
+ Data: buffer.NewVectorisedView(int(n), append([]buffer.View(nil), d.views[k][:used]...)),
+ LinkHeader: buffer.View(eth),
+ }
+ pkt.Data.TrimFront(d.e.hdrSize)
+ d.e.dispatcher.DeliverNetworkPacket(remote, local, p, pkt)
+
+ // Prepare e.views for another packet: release used views.
+ for i := 0; i < used; i++ {
+ d.views[k][i] = nil
+ }
+ }
+
+ for k := 0; k < nMsgs; k++ {
+ d.msgHdrs[k].Len = 0
+ }
+
+ return true, nil
+}
diff --git a/pkg/tcpip/link/loopback/BUILD b/pkg/tcpip/link/loopback/BUILD
new file mode 100644
index 000000000..6bf3805b7
--- /dev/null
+++ b/pkg/tcpip/link/loopback/BUILD
@@ -0,0 +1,15 @@
+load("//tools:defs.bzl", "go_library")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "loopback",
+ srcs = ["loopback.go"],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/header",
+ "//pkg/tcpip/stack",
+ ],
+)
diff --git a/pkg/tcpip/link/loopback/loopback.go b/pkg/tcpip/link/loopback/loopback.go
new file mode 100644
index 000000000..568c6874f
--- /dev/null
+++ b/pkg/tcpip/link/loopback/loopback.go
@@ -0,0 +1,115 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package loopback provides the implemention of loopback data-link layer
+// endpoints. Such endpoints just turn outbound packets into inbound ones.
+//
+// Loopback endpoints can be used in the networking stack by calling New() to
+// create a new endpoint, and then passing it as an argument to
+// Stack.CreateNIC().
+package loopback
+
+import (
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/header"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+type endpoint struct {
+ dispatcher stack.NetworkDispatcher
+}
+
+// New creates a new loopback endpoint. This link-layer endpoint just turns
+// outbound packets into inbound packets.
+func New() stack.LinkEndpoint {
+ return &endpoint{}
+}
+
+// Attach implements stack.LinkEndpoint.Attach. It just saves the stack network-
+// layer dispatcher for later use when packets need to be dispatched.
+func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ e.dispatcher = dispatcher
+}
+
+// IsAttached implements stack.LinkEndpoint.IsAttached.
+func (e *endpoint) IsAttached() bool {
+ return e.dispatcher != nil
+}
+
+// MTU implements stack.LinkEndpoint.MTU. It returns a constant that matches the
+// linux loopback interface.
+func (*endpoint) MTU() uint32 {
+ return 65536
+}
+
+// Capabilities implements stack.LinkEndpoint.Capabilities. Loopback advertises
+// itself as supporting checksum offload, but in reality it's just omitted.
+func (*endpoint) Capabilities() stack.LinkEndpointCapabilities {
+ return stack.CapabilityRXChecksumOffload | stack.CapabilityTXChecksumOffload | stack.CapabilitySaveRestore | stack.CapabilityLoopback
+}
+
+// MaxHeaderLength implements stack.LinkEndpoint.MaxHeaderLength. Given that the
+// loopback interface doesn't have a header, it just returns 0.
+func (*endpoint) MaxHeaderLength() uint16 {
+ return 0
+}
+
+// LinkAddress returns the link address of this endpoint.
+func (*endpoint) LinkAddress() tcpip.LinkAddress {
+ return ""
+}
+
+// Wait implements stack.LinkEndpoint.Wait.
+func (*endpoint) Wait() {}
+
+// WritePacket implements stack.LinkEndpoint.WritePacket. It delivers outbound
+// packets to the network-layer dispatcher.
+func (e *endpoint) WritePacket(_ *stack.Route, _ *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) *tcpip.Error {
+ views := make([]buffer.View, 1, 1+len(pkt.Data.Views()))
+ views[0] = pkt.Header.View()
+ views = append(views, pkt.Data.Views()...)
+
+ // Because we're immediately turning around and writing the packet back
+ // to the rx path, we intentionally don't preserve the remote and local
+ // link addresses from the stack.Route we're passed.
+ e.dispatcher.DeliverNetworkPacket("" /* remote */, "" /* local */, protocol, &stack.PacketBuffer{
+ Data: buffer.NewVectorisedView(len(views[0])+pkt.Data.Size(), views),
+ })
+
+ return nil
+}
+
+// WritePackets implements stack.LinkEndpoint.WritePackets.
+func (e *endpoint) WritePackets(*stack.Route, *stack.GSO, stack.PacketBufferList, tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ panic("not implemented")
+}
+
+// WriteRawPacket implements stack.LinkEndpoint.WriteRawPacket.
+func (e *endpoint) WriteRawPacket(vv buffer.VectorisedView) *tcpip.Error {
+ // There should be an ethernet header at the beginning of vv.
+ hdr, ok := vv.PullUp(header.EthernetMinimumSize)
+ if !ok {
+ // Reject the packet if it's shorter than an ethernet header.
+ return tcpip.ErrBadAddress
+ }
+ linkHeader := header.Ethernet(hdr)
+ vv.TrimFront(len(linkHeader))
+ e.dispatcher.DeliverNetworkPacket("" /* remote */, "" /* local */, linkHeader.Type(), &stack.PacketBuffer{
+ Data: vv,
+ LinkHeader: buffer.View(linkHeader),
+ })
+
+ return nil
+}
diff --git a/pkg/tcpip/link/muxed/BUILD b/pkg/tcpip/link/muxed/BUILD
new file mode 100644
index 000000000..82b441b79
--- /dev/null
+++ b/pkg/tcpip/link/muxed/BUILD
@@ -0,0 +1,28 @@
+load("//tools:defs.bzl", "go_library", "go_test")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "muxed",
+ srcs = ["injectable.go"],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/stack",
+ ],
+)
+
+go_test(
+ name = "muxed_test",
+ size = "small",
+ srcs = ["injectable_test.go"],
+ library = ":muxed",
+ deps = [
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/link/fdbased",
+ "//pkg/tcpip/network/ipv4",
+ "//pkg/tcpip/stack",
+ ],
+)
diff --git a/pkg/tcpip/link/muxed/injectable.go b/pkg/tcpip/link/muxed/injectable.go
new file mode 100644
index 000000000..c69d6b7e9
--- /dev/null
+++ b/pkg/tcpip/link/muxed/injectable.go
@@ -0,0 +1,137 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package muxed provides a muxed link endpoints.
+package muxed
+
+import (
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// InjectableEndpoint is an injectable multi endpoint. The endpoint has
+// trivial routing rules that determine which InjectableEndpoint a given packet
+// will be written to. Note that HandleLocal works differently for this
+// endpoint (see WritePacket).
+type InjectableEndpoint struct {
+ routes map[tcpip.Address]stack.InjectableLinkEndpoint
+ dispatcher stack.NetworkDispatcher
+}
+
+// MTU implements stack.LinkEndpoint.
+func (m *InjectableEndpoint) MTU() uint32 {
+ minMTU := ^uint32(0)
+ for _, endpoint := range m.routes {
+ if endpointMTU := endpoint.MTU(); endpointMTU < minMTU {
+ minMTU = endpointMTU
+ }
+ }
+ return minMTU
+}
+
+// Capabilities implements stack.LinkEndpoint.
+func (m *InjectableEndpoint) Capabilities() stack.LinkEndpointCapabilities {
+ minCapabilities := stack.LinkEndpointCapabilities(^uint(0))
+ for _, endpoint := range m.routes {
+ minCapabilities &= endpoint.Capabilities()
+ }
+ return minCapabilities
+}
+
+// MaxHeaderLength implements stack.LinkEndpoint.
+func (m *InjectableEndpoint) MaxHeaderLength() uint16 {
+ minHeaderLen := ^uint16(0)
+ for _, endpoint := range m.routes {
+ if headerLen := endpoint.MaxHeaderLength(); headerLen < minHeaderLen {
+ minHeaderLen = headerLen
+ }
+ }
+ return minHeaderLen
+}
+
+// LinkAddress implements stack.LinkEndpoint.
+func (m *InjectableEndpoint) LinkAddress() tcpip.LinkAddress {
+ return ""
+}
+
+// Attach implements stack.LinkEndpoint.
+func (m *InjectableEndpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ for _, endpoint := range m.routes {
+ endpoint.Attach(dispatcher)
+ }
+ m.dispatcher = dispatcher
+}
+
+// IsAttached implements stack.LinkEndpoint.
+func (m *InjectableEndpoint) IsAttached() bool {
+ return m.dispatcher != nil
+}
+
+// InjectInbound implements stack.InjectableLinkEndpoint.
+func (m *InjectableEndpoint) InjectInbound(protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
+ m.dispatcher.DeliverNetworkPacket("" /* remote */, "" /* local */, protocol, pkt)
+}
+
+// WritePackets writes outbound packets to the appropriate
+// LinkInjectableEndpoint based on the RemoteAddress. HandleLocal only works if
+// r.RemoteAddress has a route registered in this endpoint.
+func (m *InjectableEndpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.PacketBufferList, protocol tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ endpoint, ok := m.routes[r.RemoteAddress]
+ if !ok {
+ return 0, tcpip.ErrNoRoute
+ }
+ return endpoint.WritePackets(r, gso, pkts, protocol)
+}
+
+// WritePacket writes outbound packets to the appropriate LinkInjectableEndpoint
+// based on the RemoteAddress. HandleLocal only works if r.RemoteAddress has a
+// route registered in this endpoint.
+func (m *InjectableEndpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) *tcpip.Error {
+ if endpoint, ok := m.routes[r.RemoteAddress]; ok {
+ return endpoint.WritePacket(r, gso, protocol, pkt)
+ }
+ return tcpip.ErrNoRoute
+}
+
+// WriteRawPacket implements stack.LinkEndpoint.WriteRawPacket.
+func (m *InjectableEndpoint) WriteRawPacket(buffer.VectorisedView) *tcpip.Error {
+ // WriteRawPacket doesn't get a route or network address, so there's
+ // nowhere to write this.
+ return tcpip.ErrNoRoute
+}
+
+// InjectOutbound writes outbound packets to the appropriate
+// LinkInjectableEndpoint based on the dest address.
+func (m *InjectableEndpoint) InjectOutbound(dest tcpip.Address, packet []byte) *tcpip.Error {
+ endpoint, ok := m.routes[dest]
+ if !ok {
+ return tcpip.ErrNoRoute
+ }
+ return endpoint.InjectOutbound(dest, packet)
+}
+
+// Wait implements stack.LinkEndpoint.Wait.
+func (m *InjectableEndpoint) Wait() {
+ for _, ep := range m.routes {
+ ep.Wait()
+ }
+}
+
+// NewInjectableEndpoint creates a new multi-endpoint injectable endpoint.
+func NewInjectableEndpoint(routes map[tcpip.Address]stack.InjectableLinkEndpoint) *InjectableEndpoint {
+ return &InjectableEndpoint{
+ routes: routes,
+ }
+}
diff --git a/pkg/tcpip/link/muxed/injectable_test.go b/pkg/tcpip/link/muxed/injectable_test.go
new file mode 100644
index 000000000..0744f66d6
--- /dev/null
+++ b/pkg/tcpip/link/muxed/injectable_test.go
@@ -0,0 +1,98 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package muxed
+
+import (
+ "bytes"
+ "net"
+ "os"
+ "syscall"
+ "testing"
+
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/link/fdbased"
+ "gvisor.dev/gvisor/pkg/tcpip/network/ipv4"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+func TestInjectableEndpointRawDispatch(t *testing.T) {
+ endpoint, sock, dstIP := makeTestInjectableEndpoint(t)
+
+ endpoint.InjectOutbound(dstIP, []byte{0xFA})
+
+ buf := make([]byte, ipv4.MaxTotalSize)
+ bytesRead, err := sock.Read(buf)
+ if err != nil {
+ t.Fatalf("Unable to read from socketpair: %v", err)
+ }
+ if got, want := buf[:bytesRead], []byte{0xFA}; !bytes.Equal(got, want) {
+ t.Fatalf("Read %v from the socketpair, wanted %v", got, want)
+ }
+}
+
+func TestInjectableEndpointDispatch(t *testing.T) {
+ endpoint, sock, dstIP := makeTestInjectableEndpoint(t)
+
+ hdr := buffer.NewPrependable(1)
+ hdr.Prepend(1)[0] = 0xFA
+ packetRoute := stack.Route{RemoteAddress: dstIP}
+
+ endpoint.WritePacket(&packetRoute, nil /* gso */, ipv4.ProtocolNumber, &stack.PacketBuffer{
+ Header: hdr,
+ Data: buffer.NewViewFromBytes([]byte{0xFB}).ToVectorisedView(),
+ })
+
+ buf := make([]byte, 6500)
+ bytesRead, err := sock.Read(buf)
+ if err != nil {
+ t.Fatalf("Unable to read from socketpair: %v", err)
+ }
+ if got, want := buf[:bytesRead], []byte{0xFA, 0xFB}; !bytes.Equal(got, want) {
+ t.Fatalf("Read %v from the socketpair, wanted %v", got, want)
+ }
+}
+
+func TestInjectableEndpointDispatchHdrOnly(t *testing.T) {
+ endpoint, sock, dstIP := makeTestInjectableEndpoint(t)
+ hdr := buffer.NewPrependable(1)
+ hdr.Prepend(1)[0] = 0xFA
+ packetRoute := stack.Route{RemoteAddress: dstIP}
+ endpoint.WritePacket(&packetRoute, nil /* gso */, ipv4.ProtocolNumber, &stack.PacketBuffer{
+ Header: hdr,
+ Data: buffer.NewView(0).ToVectorisedView(),
+ })
+ buf := make([]byte, 6500)
+ bytesRead, err := sock.Read(buf)
+ if err != nil {
+ t.Fatalf("Unable to read from socketpair: %v", err)
+ }
+ if got, want := buf[:bytesRead], []byte{0xFA}; !bytes.Equal(got, want) {
+ t.Fatalf("Read %v from the socketpair, wanted %v", got, want)
+ }
+}
+
+func makeTestInjectableEndpoint(t *testing.T) (*InjectableEndpoint, *os.File, tcpip.Address) {
+ dstIP := tcpip.Address(net.ParseIP("1.2.3.4").To4())
+ pair, err := syscall.Socketpair(syscall.AF_UNIX,
+ syscall.SOCK_SEQPACKET|syscall.SOCK_CLOEXEC|syscall.SOCK_NONBLOCK, 0)
+ if err != nil {
+ t.Fatal("Failed to create socket pair:", err)
+ }
+ underlyingEndpoint := fdbased.NewInjectable(pair[1], 6500, stack.CapabilityNone)
+ routes := map[tcpip.Address]stack.InjectableLinkEndpoint{dstIP: underlyingEndpoint}
+ endpoint := NewInjectableEndpoint(routes)
+ return endpoint, os.NewFile(uintptr(pair[0]), "test route end"), dstIP
+}
diff --git a/pkg/tcpip/link/nested/BUILD b/pkg/tcpip/link/nested/BUILD
new file mode 100644
index 000000000..bdd5276ad
--- /dev/null
+++ b/pkg/tcpip/link/nested/BUILD
@@ -0,0 +1,31 @@
+load("//tools:defs.bzl", "go_library", "go_test")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "nested",
+ srcs = [
+ "nested.go",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/sync",
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/stack",
+ ],
+)
+
+go_test(
+ name = "nested_test",
+ size = "small",
+ srcs = [
+ "nested_test.go",
+ ],
+ deps = [
+ "//pkg/tcpip",
+ "//pkg/tcpip/header",
+ "//pkg/tcpip/link/nested",
+ "//pkg/tcpip/stack",
+ ],
+)
diff --git a/pkg/tcpip/link/nested/nested.go b/pkg/tcpip/link/nested/nested.go
new file mode 100644
index 000000000..2998f9c4f
--- /dev/null
+++ b/pkg/tcpip/link/nested/nested.go
@@ -0,0 +1,131 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package nested provides helpers to implement the pattern of nested
+// stack.LinkEndpoints.
+package nested
+
+import (
+ "gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// Endpoint is a wrapper around stack.LinkEndpoint and stack.NetworkDispatcher
+// that can be used to implement nesting safely by providing lifecycle
+// concurrency guards.
+//
+// See the tests in this package for example usage.
+type Endpoint struct {
+ child stack.LinkEndpoint
+ embedder stack.NetworkDispatcher
+
+ // mu protects dispatcher.
+ mu sync.RWMutex
+ dispatcher stack.NetworkDispatcher
+}
+
+var _ stack.GSOEndpoint = (*Endpoint)(nil)
+var _ stack.LinkEndpoint = (*Endpoint)(nil)
+var _ stack.NetworkDispatcher = (*Endpoint)(nil)
+
+// Init initializes a nested.Endpoint that uses embedder as the dispatcher for
+// child on Attach.
+//
+// See the tests in this package for example usage.
+func (e *Endpoint) Init(child stack.LinkEndpoint, embedder stack.NetworkDispatcher) {
+ e.child = child
+ e.embedder = embedder
+}
+
+// DeliverNetworkPacket implements stack.NetworkDispatcher.
+func (e *Endpoint) DeliverNetworkPacket(remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
+ e.mu.RLock()
+ d := e.dispatcher
+ e.mu.RUnlock()
+ if d != nil {
+ d.DeliverNetworkPacket(remote, local, protocol, pkt)
+ }
+}
+
+// Attach implements stack.LinkEndpoint.
+func (e *Endpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ e.mu.Lock()
+ e.dispatcher = dispatcher
+ e.mu.Unlock()
+ // If we're attaching to a valid dispatcher, pass embedder as the dispatcher
+ // to our child, otherwise detach the child by giving it a nil dispatcher.
+ var pass stack.NetworkDispatcher
+ if dispatcher != nil {
+ pass = e.embedder
+ }
+ e.child.Attach(pass)
+}
+
+// IsAttached implements stack.LinkEndpoint.
+func (e *Endpoint) IsAttached() bool {
+ e.mu.RLock()
+ isAttached := e.dispatcher != nil
+ e.mu.RUnlock()
+ return isAttached
+}
+
+// MTU implements stack.LinkEndpoint.
+func (e *Endpoint) MTU() uint32 {
+ return e.child.MTU()
+}
+
+// Capabilities implements stack.LinkEndpoint.
+func (e *Endpoint) Capabilities() stack.LinkEndpointCapabilities {
+ return e.child.Capabilities()
+}
+
+// MaxHeaderLength implements stack.LinkEndpoint.
+func (e *Endpoint) MaxHeaderLength() uint16 {
+ return e.child.MaxHeaderLength()
+}
+
+// LinkAddress implements stack.LinkEndpoint.
+func (e *Endpoint) LinkAddress() tcpip.LinkAddress {
+ return e.child.LinkAddress()
+}
+
+// WritePacket implements stack.LinkEndpoint.
+func (e *Endpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) *tcpip.Error {
+ return e.child.WritePacket(r, gso, protocol, pkt)
+}
+
+// WritePackets implements stack.LinkEndpoint.
+func (e *Endpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.PacketBufferList, protocol tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ return e.child.WritePackets(r, gso, pkts, protocol)
+}
+
+// WriteRawPacket implements stack.LinkEndpoint.
+func (e *Endpoint) WriteRawPacket(vv buffer.VectorisedView) *tcpip.Error {
+ return e.child.WriteRawPacket(vv)
+}
+
+// Wait implements stack.LinkEndpoint.
+func (e *Endpoint) Wait() {
+ e.child.Wait()
+}
+
+// GSOMaxSize implements stack.GSOEndpoint.
+func (e *Endpoint) GSOMaxSize() uint32 {
+ if e, ok := e.child.(stack.GSOEndpoint); ok {
+ return e.GSOMaxSize()
+ }
+ return 0
+}
diff --git a/pkg/tcpip/link/nested/nested_test.go b/pkg/tcpip/link/nested/nested_test.go
new file mode 100644
index 000000000..c1a219f02
--- /dev/null
+++ b/pkg/tcpip/link/nested/nested_test.go
@@ -0,0 +1,105 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package nested_test
+
+import (
+ "testing"
+
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/header"
+ "gvisor.dev/gvisor/pkg/tcpip/link/nested"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+type parentEndpoint struct {
+ nested.Endpoint
+}
+
+var _ stack.LinkEndpoint = (*parentEndpoint)(nil)
+var _ stack.NetworkDispatcher = (*parentEndpoint)(nil)
+
+type childEndpoint struct {
+ stack.LinkEndpoint
+ dispatcher stack.NetworkDispatcher
+}
+
+var _ stack.LinkEndpoint = (*childEndpoint)(nil)
+
+func (c *childEndpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ c.dispatcher = dispatcher
+}
+
+func (c *childEndpoint) IsAttached() bool {
+ return c.dispatcher != nil
+}
+
+type counterDispatcher struct {
+ count int
+}
+
+var _ stack.NetworkDispatcher = (*counterDispatcher)(nil)
+
+func (d *counterDispatcher) DeliverNetworkPacket(tcpip.LinkAddress, tcpip.LinkAddress, tcpip.NetworkProtocolNumber, *stack.PacketBuffer) {
+ d.count++
+}
+
+func TestNestedLinkEndpoint(t *testing.T) {
+ const emptyAddress = tcpip.LinkAddress("")
+
+ var (
+ childEP childEndpoint
+ nestedEP parentEndpoint
+ disp counterDispatcher
+ )
+ nestedEP.Endpoint.Init(&childEP, &nestedEP)
+
+ if childEP.IsAttached() {
+ t.Error("On init, childEP.IsAttached() = true, want = false")
+ }
+ if nestedEP.IsAttached() {
+ t.Error("On init, nestedEP.IsAttached() = true, want = false")
+ }
+
+ nestedEP.Attach(&disp)
+ if disp.count != 0 {
+ t.Fatalf("After attach, got disp.count = %d, want = 0", disp.count)
+ }
+ if !childEP.IsAttached() {
+ t.Error("After attach, childEP.IsAttached() = false, want = true")
+ }
+ if !nestedEP.IsAttached() {
+ t.Error("After attach, nestedEP.IsAttached() = false, want = true")
+ }
+
+ nestedEP.DeliverNetworkPacket(emptyAddress, emptyAddress, header.IPv4ProtocolNumber, &stack.PacketBuffer{})
+ if disp.count != 1 {
+ t.Errorf("After first packet with dispatcher attached, got disp.count = %d, want = 1", disp.count)
+ }
+
+ nestedEP.Attach(nil)
+ if childEP.IsAttached() {
+ t.Error("After detach, childEP.IsAttached() = true, want = false")
+ }
+ if nestedEP.IsAttached() {
+ t.Error("After detach, nestedEP.IsAttached() = true, want = false")
+ }
+
+ disp.count = 0
+ nestedEP.DeliverNetworkPacket(emptyAddress, emptyAddress, header.IPv4ProtocolNumber, &stack.PacketBuffer{})
+ if disp.count != 0 {
+ t.Errorf("After second packet with dispatcher detached, got disp.count = %d, want = 0", disp.count)
+ }
+
+}
diff --git a/pkg/tcpip/link/qdisc/fifo/BUILD b/pkg/tcpip/link/qdisc/fifo/BUILD
new file mode 100644
index 000000000..054c213bc
--- /dev/null
+++ b/pkg/tcpip/link/qdisc/fifo/BUILD
@@ -0,0 +1,19 @@
+load("//tools:defs.bzl", "go_library")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "fifo",
+ srcs = [
+ "endpoint.go",
+ "packet_buffer_queue.go",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/sleep",
+ "//pkg/sync",
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/stack",
+ ],
+)
diff --git a/pkg/tcpip/link/qdisc/fifo/endpoint.go b/pkg/tcpip/link/qdisc/fifo/endpoint.go
new file mode 100644
index 000000000..b5dfb7850
--- /dev/null
+++ b/pkg/tcpip/link/qdisc/fifo/endpoint.go
@@ -0,0 +1,209 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fifo provides the implementation of data-link layer endpoints that
+// wrap another endpoint and queues all outbound packets and asynchronously
+// dispatches them to the lower endpoint.
+package fifo
+
+import (
+ "gvisor.dev/gvisor/pkg/sleep"
+ "gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// endpoint represents a LinkEndpoint which implements a FIFO queue for all
+// outgoing packets. endpoint can have 1 or more underlying queueDispatchers.
+// All outgoing packets are consistenly hashed to a single underlying queue
+// using the PacketBuffer.Hash if set, otherwise all packets are queued to the
+// first queue to avoid reordering in case of missing hash.
+type endpoint struct {
+ dispatcher stack.NetworkDispatcher
+ lower stack.LinkEndpoint
+ wg sync.WaitGroup
+ dispatchers []*queueDispatcher
+}
+
+// queueDispatcher is responsible for dispatching all outbound packets in its
+// queue. It will also smartly batch packets when possible and write them
+// through the lower LinkEndpoint.
+type queueDispatcher struct {
+ lower stack.LinkEndpoint
+ q *packetBufferQueue
+ newPacketWaker sleep.Waker
+ closeWaker sleep.Waker
+}
+
+// New creates a new fifo link endpoint with the n queues with maximum
+// capacity of queueLen.
+func New(lower stack.LinkEndpoint, n int, queueLen int) stack.LinkEndpoint {
+ e := &endpoint{
+ lower: lower,
+ }
+ // Create the required dispatchers
+ for i := 0; i < n; i++ {
+ qd := &queueDispatcher{
+ q: &packetBufferQueue{limit: queueLen},
+ lower: lower,
+ }
+ e.dispatchers = append(e.dispatchers, qd)
+ e.wg.Add(1)
+ go func() {
+ defer e.wg.Done()
+ qd.dispatchLoop()
+ }()
+ }
+ return e
+}
+
+func (q *queueDispatcher) dispatchLoop() {
+ const newPacketWakerID = 1
+ const closeWakerID = 2
+ s := sleep.Sleeper{}
+ s.AddWaker(&q.newPacketWaker, newPacketWakerID)
+ s.AddWaker(&q.closeWaker, closeWakerID)
+ defer s.Done()
+
+ const batchSize = 32
+ var batch stack.PacketBufferList
+ for {
+ id, ok := s.Fetch(true)
+ if ok && id == closeWakerID {
+ return
+ }
+ for pkt := q.q.dequeue(); pkt != nil; pkt = q.q.dequeue() {
+ batch.PushBack(pkt)
+ if batch.Len() < batchSize && !q.q.empty() {
+ continue
+ }
+ // We pass a protocol of zero here because each packet carries its
+ // NetworkProtocol.
+ q.lower.WritePackets(nil /* route */, nil /* gso */, batch, 0 /* protocol */)
+ for pkt := batch.Front(); pkt != nil; pkt = pkt.Next() {
+ pkt.EgressRoute.Release()
+ batch.Remove(pkt)
+ }
+ batch.Reset()
+ }
+ }
+}
+
+// DeliverNetworkPacket implements stack.NetworkDispatcher.DeliverNetworkPacket.
+func (e *endpoint) DeliverNetworkPacket(remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
+ e.dispatcher.DeliverNetworkPacket(remote, local, protocol, pkt)
+}
+
+// Attach implements stack.LinkEndpoint.Attach.
+func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ e.dispatcher = dispatcher
+ e.lower.Attach(e)
+}
+
+// IsAttached implements stack.LinkEndpoint.IsAttached.
+func (e *endpoint) IsAttached() bool {
+ return e.dispatcher != nil
+}
+
+// MTU implements stack.LinkEndpoint.MTU.
+func (e *endpoint) MTU() uint32 {
+ return e.lower.MTU()
+}
+
+// Capabilities implements stack.LinkEndpoint.Capabilities.
+func (e *endpoint) Capabilities() stack.LinkEndpointCapabilities {
+ return e.lower.Capabilities()
+}
+
+// MaxHeaderLength implements stack.LinkEndpoint.MaxHeaderLength.
+func (e *endpoint) MaxHeaderLength() uint16 {
+ return e.lower.MaxHeaderLength()
+}
+
+// LinkAddress implements stack.LinkEndpoint.LinkAddress.
+func (e *endpoint) LinkAddress() tcpip.LinkAddress {
+ return e.lower.LinkAddress()
+}
+
+// GSOMaxSize returns the maximum GSO packet size.
+func (e *endpoint) GSOMaxSize() uint32 {
+ if gso, ok := e.lower.(stack.GSOEndpoint); ok {
+ return gso.GSOMaxSize()
+ }
+ return 0
+}
+
+// WritePacket implements stack.LinkEndpoint.WritePacket.
+func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) *tcpip.Error {
+ // WritePacket caller's do not set the following fields in PacketBuffer
+ // so we populate them here.
+ newRoute := r.Clone()
+ pkt.EgressRoute = &newRoute
+ pkt.GSOOptions = gso
+ pkt.NetworkProtocolNumber = protocol
+ d := e.dispatchers[int(pkt.Hash)%len(e.dispatchers)]
+ if !d.q.enqueue(pkt) {
+ return tcpip.ErrNoBufferSpace
+ }
+ d.newPacketWaker.Assert()
+ return nil
+}
+
+// WritePackets implements stack.LinkEndpoint.WritePackets.
+//
+// Being a batch API, each packet in pkts should have the following fields
+// populated:
+// - pkt.EgressRoute
+// - pkt.GSOOptions
+// - pkt.NetworkProtocolNumber
+func (e *endpoint) WritePackets(_ *stack.Route, _ *stack.GSO, pkts stack.PacketBufferList, _ tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ enqueued := 0
+ for pkt := pkts.Front(); pkt != nil; {
+ d := e.dispatchers[int(pkt.Hash)%len(e.dispatchers)]
+ nxt := pkt.Next()
+ // Since qdisc can hold onto a packet for long we should Clone
+ // the route here to ensure it doesn't get released while the
+ // packet is still in our queue.
+ newRoute := pkt.EgressRoute.Clone()
+ pkt.EgressRoute = &newRoute
+ if !d.q.enqueue(pkt) {
+ if enqueued > 0 {
+ d.newPacketWaker.Assert()
+ }
+ return enqueued, tcpip.ErrNoBufferSpace
+ }
+ pkt = nxt
+ enqueued++
+ d.newPacketWaker.Assert()
+ }
+ return enqueued, nil
+}
+
+// WriteRawPacket implements stack.LinkEndpoint.WriteRawPacket.
+func (e *endpoint) WriteRawPacket(vv buffer.VectorisedView) *tcpip.Error {
+ return e.lower.WriteRawPacket(vv)
+}
+
+// Wait implements stack.LinkEndpoint.Wait.
+func (e *endpoint) Wait() {
+ e.lower.Wait()
+
+ // The linkEP is gone. Teardown the outbound dispatcher goroutines.
+ for i := range e.dispatchers {
+ e.dispatchers[i].closeWaker.Assert()
+ }
+
+ e.wg.Wait()
+}
diff --git a/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go b/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go
new file mode 100644
index 000000000..eb5abb906
--- /dev/null
+++ b/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go
@@ -0,0 +1,84 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fifo
+
+import (
+ "gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// packetBufferQueue is a bounded, thread-safe queue of PacketBuffers.
+//
+type packetBufferQueue struct {
+ mu sync.Mutex
+ list stack.PacketBufferList
+ limit int
+ used int
+}
+
+// emptyLocked determines if the queue is empty.
+// Preconditions: q.mu must be held.
+func (q *packetBufferQueue) emptyLocked() bool {
+ return q.used == 0
+}
+
+// empty determines if the queue is empty.
+func (q *packetBufferQueue) empty() bool {
+ q.mu.Lock()
+ r := q.emptyLocked()
+ q.mu.Unlock()
+
+ return r
+}
+
+// setLimit updates the limit. No PacketBuffers are immediately dropped in case
+// the queue becomes full due to the new limit.
+func (q *packetBufferQueue) setLimit(limit int) {
+ q.mu.Lock()
+ q.limit = limit
+ q.mu.Unlock()
+}
+
+// enqueue adds the given packet to the queue.
+//
+// Returns true when the PacketBuffer is successfully added to the queue, in
+// which case ownership of the reference is transferred to the queue. And
+// returns false if the queue is full, in which case ownership is retained by
+// the caller.
+func (q *packetBufferQueue) enqueue(s *stack.PacketBuffer) bool {
+ q.mu.Lock()
+ r := q.used < q.limit
+ if r {
+ q.list.PushBack(s)
+ q.used++
+ }
+ q.mu.Unlock()
+
+ return r
+}
+
+// dequeue removes and returns the next PacketBuffer from queue, if one exists.
+// Ownership is transferred to the caller.
+func (q *packetBufferQueue) dequeue() *stack.PacketBuffer {
+ q.mu.Lock()
+ s := q.list.Front()
+ if s != nil {
+ q.list.Remove(s)
+ q.used--
+ }
+ q.mu.Unlock()
+
+ return s
+}
diff --git a/pkg/tcpip/link/rawfile/BUILD b/pkg/tcpip/link/rawfile/BUILD
new file mode 100644
index 000000000..14b527bc2
--- /dev/null
+++ b/pkg/tcpip/link/rawfile/BUILD
@@ -0,0 +1,20 @@
+load("//tools:defs.bzl", "go_library")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "rawfile",
+ srcs = [
+ "blockingpoll_amd64.s",
+ "blockingpoll_arm64.s",
+ "blockingpoll_noyield_unsafe.go",
+ "blockingpoll_yield_unsafe.go",
+ "errors.go",
+ "rawfile_unsafe.go",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/tcpip",
+ "@org_golang_x_sys//unix:go_default_library",
+ ],
+)
diff --git a/pkg/tcpip/link/rawfile/blockingpoll_amd64.s b/pkg/tcpip/link/rawfile/blockingpoll_amd64.s
new file mode 100644
index 000000000..298bad55d
--- /dev/null
+++ b/pkg/tcpip/link/rawfile/blockingpoll_amd64.s
@@ -0,0 +1,41 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "textflag.h"
+
+// BlockingPoll makes the ppoll() syscall while calling the version of
+// entersyscall that relinquishes the P so that other Gs can run. This is meant
+// to be called in cases when the syscall is expected to block.
+//
+// func BlockingPoll(fds *PollEvent, nfds int, timeout *syscall.Timespec) (n int, err syscall.Errno)
+TEXT ·BlockingPoll(SB),NOSPLIT,$0-40
+ CALL ·callEntersyscallblock(SB)
+ MOVQ fds+0(FP), DI
+ MOVQ nfds+8(FP), SI
+ MOVQ timeout+16(FP), DX
+ MOVQ $0x0, R10 // sigmask parameter which isn't used here
+ MOVQ $0x10f, AX // SYS_PPOLL
+ SYSCALL
+ CMPQ AX, $0xfffffffffffff001
+ JLS ok
+ MOVQ $-1, n+24(FP)
+ NEGQ AX
+ MOVQ AX, err+32(FP)
+ CALL ·callExitsyscall(SB)
+ RET
+ok:
+ MOVQ AX, n+24(FP)
+ MOVQ $0, err+32(FP)
+ CALL ·callExitsyscall(SB)
+ RET
diff --git a/pkg/tcpip/link/rawfile/blockingpoll_arm64.s b/pkg/tcpip/link/rawfile/blockingpoll_arm64.s
new file mode 100644
index 000000000..b62888b93
--- /dev/null
+++ b/pkg/tcpip/link/rawfile/blockingpoll_arm64.s
@@ -0,0 +1,42 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "textflag.h"
+
+// BlockingPoll makes the ppoll() syscall while calling the version of
+// entersyscall that relinquishes the P so that other Gs can run. This is meant
+// to be called in cases when the syscall is expected to block.
+//
+// func BlockingPoll(fds *PollEvent, nfds int, timeout *syscall.Timespec) (n int, err syscall.Errno)
+TEXT ·BlockingPoll(SB),NOSPLIT,$0-40
+ BL ·callEntersyscallblock(SB)
+ MOVD fds+0(FP), R0
+ MOVD nfds+8(FP), R1
+ MOVD timeout+16(FP), R2
+ MOVD $0x0, R3 // sigmask parameter which isn't used here
+ MOVD $0x49, R8 // SYS_PPOLL
+ SVC
+ CMP $0xfffffffffffff001, R0
+ BLS ok
+ MOVD $-1, R1
+ MOVD R1, n+24(FP)
+ NEG R0, R0
+ MOVD R0, err+32(FP)
+ BL ·callExitsyscall(SB)
+ RET
+ok:
+ MOVD R0, n+24(FP)
+ MOVD $0, err+32(FP)
+ BL ·callExitsyscall(SB)
+ RET
diff --git a/pkg/tcpip/link/rawfile/blockingpoll_noyield_unsafe.go b/pkg/tcpip/link/rawfile/blockingpoll_noyield_unsafe.go
new file mode 100644
index 000000000..621ab8d29
--- /dev/null
+++ b/pkg/tcpip/link/rawfile/blockingpoll_noyield_unsafe.go
@@ -0,0 +1,31 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux,!amd64,!arm64
+
+package rawfile
+
+import (
+ "syscall"
+ "unsafe"
+)
+
+// BlockingPoll is just a stub function that forwards to the ppoll() system call
+// on non-amd64 and non-arm64 platforms.
+func BlockingPoll(fds *PollEvent, nfds int, timeout *syscall.Timespec) (int, syscall.Errno) {
+ n, _, e := syscall.Syscall6(syscall.SYS_PPOLL, uintptr(unsafe.Pointer(fds)),
+ uintptr(nfds), uintptr(unsafe.Pointer(timeout)), 0, 0, 0)
+
+ return int(n), e
+}
diff --git a/pkg/tcpip/link/rawfile/blockingpoll_yield_unsafe.go b/pkg/tcpip/link/rawfile/blockingpoll_yield_unsafe.go
new file mode 100644
index 000000000..99313ee25
--- /dev/null
+++ b/pkg/tcpip/link/rawfile/blockingpoll_yield_unsafe.go
@@ -0,0 +1,66 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux,amd64 linux,arm64
+// +build go1.12
+// +build !go1.16
+
+// Check go:linkname function signatures when updating Go version.
+
+package rawfile
+
+import (
+ "syscall"
+ _ "unsafe" // for go:linkname
+)
+
+// BlockingPoll on amd64/arm64 makes the ppoll() syscall while calling the
+// version of entersyscall that relinquishes the P so that other Gs can
+// run. This is meant to be called in cases when the syscall is expected to
+// block. On non amd64/arm64 platforms it just forwards to the ppoll() system
+// call.
+//
+//go:noescape
+func BlockingPoll(fds *PollEvent, nfds int, timeout *syscall.Timespec) (int, syscall.Errno)
+
+// Use go:linkname to call into the runtime. As of Go 1.12 this has to
+// be done from Go code so that we make an ABIInternal call to an
+// ABIInternal function; see https://golang.org/issue/27539.
+
+// We need to call both entersyscallblock and exitsyscall this way so
+// that the runtime's check on the stack pointer lines up.
+
+// Note that calling an unexported function in the runtime package is
+// unsafe and this hack is likely to break in future Go releases.
+
+//go:linkname entersyscallblock runtime.entersyscallblock
+func entersyscallblock()
+
+//go:linkname exitsyscall runtime.exitsyscall
+func exitsyscall()
+
+// These forwarding functions must be nosplit because 1) we must
+// disallow preemption between entersyscallblock and exitsyscall, and
+// 2) we have an untyped assembly frame on the stack which can not be
+// grown or moved.
+
+//go:nosplit
+func callEntersyscallblock() {
+ entersyscallblock()
+}
+
+//go:nosplit
+func callExitsyscall() {
+ exitsyscall()
+}
diff --git a/pkg/tcpip/link/rawfile/errors.go b/pkg/tcpip/link/rawfile/errors.go
new file mode 100644
index 000000000..a0a873c84
--- /dev/null
+++ b/pkg/tcpip/link/rawfile/errors.go
@@ -0,0 +1,70 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux
+
+package rawfile
+
+import (
+ "fmt"
+ "syscall"
+
+ "gvisor.dev/gvisor/pkg/tcpip"
+)
+
+const maxErrno = 134
+
+var translations [maxErrno]*tcpip.Error
+
+// TranslateErrno translate an errno from the syscall package into a
+// *tcpip.Error.
+//
+// Valid, but unrecognized errnos will be translated to
+// tcpip.ErrInvalidEndpointState (EINVAL). Panics on invalid errnos.
+func TranslateErrno(e syscall.Errno) *tcpip.Error {
+ if err := translations[e]; err != nil {
+ return err
+ }
+ return tcpip.ErrInvalidEndpointState
+}
+
+func addTranslation(host syscall.Errno, trans *tcpip.Error) {
+ if translations[host] != nil {
+ panic(fmt.Sprintf("duplicate translation for host errno %q (%d)", host.Error(), host))
+ }
+ translations[host] = trans
+}
+
+func init() {
+ addTranslation(syscall.EEXIST, tcpip.ErrDuplicateAddress)
+ addTranslation(syscall.ENETUNREACH, tcpip.ErrNoRoute)
+ addTranslation(syscall.EINVAL, tcpip.ErrInvalidEndpointState)
+ addTranslation(syscall.EALREADY, tcpip.ErrAlreadyConnecting)
+ addTranslation(syscall.EISCONN, tcpip.ErrAlreadyConnected)
+ addTranslation(syscall.EADDRINUSE, tcpip.ErrPortInUse)
+ addTranslation(syscall.EADDRNOTAVAIL, tcpip.ErrBadLocalAddress)
+ addTranslation(syscall.EPIPE, tcpip.ErrClosedForSend)
+ addTranslation(syscall.EWOULDBLOCK, tcpip.ErrWouldBlock)
+ addTranslation(syscall.ECONNREFUSED, tcpip.ErrConnectionRefused)
+ addTranslation(syscall.ETIMEDOUT, tcpip.ErrTimeout)
+ addTranslation(syscall.EINPROGRESS, tcpip.ErrConnectStarted)
+ addTranslation(syscall.EDESTADDRREQ, tcpip.ErrDestinationRequired)
+ addTranslation(syscall.ENOTSUP, tcpip.ErrNotSupported)
+ addTranslation(syscall.ENOTTY, tcpip.ErrQueueSizeNotSupported)
+ addTranslation(syscall.ENOTCONN, tcpip.ErrNotConnected)
+ addTranslation(syscall.ECONNRESET, tcpip.ErrConnectionReset)
+ addTranslation(syscall.ECONNABORTED, tcpip.ErrConnectionAborted)
+ addTranslation(syscall.EMSGSIZE, tcpip.ErrMessageTooLong)
+ addTranslation(syscall.ENOBUFS, tcpip.ErrNoBufferSpace)
+}
diff --git a/pkg/tcpip/link/rawfile/rawfile_unsafe.go b/pkg/tcpip/link/rawfile/rawfile_unsafe.go
new file mode 100644
index 000000000..69de6eb3e
--- /dev/null
+++ b/pkg/tcpip/link/rawfile/rawfile_unsafe.go
@@ -0,0 +1,192 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux
+
+// Package rawfile contains utilities for using the netstack with raw host
+// files on Linux hosts.
+package rawfile
+
+import (
+ "syscall"
+ "unsafe"
+
+ "golang.org/x/sys/unix"
+ "gvisor.dev/gvisor/pkg/tcpip"
+)
+
+// GetMTU determines the MTU of a network interface device.
+func GetMTU(name string) (uint32, error) {
+ fd, err := syscall.Socket(syscall.AF_UNIX, syscall.SOCK_DGRAM, 0)
+ if err != nil {
+ return 0, err
+ }
+
+ defer syscall.Close(fd)
+
+ var ifreq struct {
+ name [16]byte
+ mtu int32
+ _ [20]byte
+ }
+
+ copy(ifreq.name[:], name)
+ _, _, errno := syscall.Syscall(syscall.SYS_IOCTL, uintptr(fd), syscall.SIOCGIFMTU, uintptr(unsafe.Pointer(&ifreq)))
+ if errno != 0 {
+ return 0, errno
+ }
+
+ return uint32(ifreq.mtu), nil
+}
+
+// NonBlockingWrite writes the given buffer to a file descriptor. It fails if
+// partial data is written.
+func NonBlockingWrite(fd int, buf []byte) *tcpip.Error {
+ var ptr unsafe.Pointer
+ if len(buf) > 0 {
+ ptr = unsafe.Pointer(&buf[0])
+ }
+
+ _, _, e := syscall.RawSyscall(syscall.SYS_WRITE, uintptr(fd), uintptr(ptr), uintptr(len(buf)))
+ if e != 0 {
+ return TranslateErrno(e)
+ }
+
+ return nil
+}
+
+// NonBlockingWrite3 writes up to three byte slices to a file descriptor in a
+// single syscall. It fails if partial data is written.
+func NonBlockingWrite3(fd int, b1, b2, b3 []byte) *tcpip.Error {
+ // If there is no second and third buffer, issue a regular write.
+ if len(b2) == 0 && len(b3) == 0 {
+ return NonBlockingWrite(fd, b1)
+ }
+
+ // Build the iovec that represents them and issue a writev syscall.
+ iovec := [3]syscall.Iovec{
+ {
+ Base: &b1[0],
+ Len: uint64(len(b1)),
+ },
+ {
+ Base: &b2[0],
+ Len: uint64(len(b2)),
+ },
+ }
+ iovecLen := uintptr(2)
+
+ if len(b3) > 0 {
+ iovecLen++
+ iovec[2].Base = &b3[0]
+ iovec[2].Len = uint64(len(b3))
+ }
+
+ _, _, e := syscall.RawSyscall(syscall.SYS_WRITEV, uintptr(fd), uintptr(unsafe.Pointer(&iovec[0])), iovecLen)
+ if e != 0 {
+ return TranslateErrno(e)
+ }
+
+ return nil
+}
+
+// NonBlockingSendMMsg sends multiple messages on a socket.
+func NonBlockingSendMMsg(fd int, msgHdrs []MMsgHdr) (int, *tcpip.Error) {
+ n, _, e := syscall.RawSyscall6(unix.SYS_SENDMMSG, uintptr(fd), uintptr(unsafe.Pointer(&msgHdrs[0])), uintptr(len(msgHdrs)), syscall.MSG_DONTWAIT, 0, 0)
+ if e != 0 {
+ return 0, TranslateErrno(e)
+ }
+
+ return int(n), nil
+}
+
+// PollEvent represents the pollfd structure passed to a poll() system call.
+type PollEvent struct {
+ FD int32
+ Events int16
+ Revents int16
+}
+
+// BlockingRead reads from a file descriptor that is set up as non-blocking. If
+// no data is available, it will block in a poll() syscall until the file
+// descriptor becomes readable.
+func BlockingRead(fd int, b []byte) (int, *tcpip.Error) {
+ for {
+ n, _, e := syscall.RawSyscall(syscall.SYS_READ, uintptr(fd), uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)))
+ if e == 0 {
+ return int(n), nil
+ }
+
+ event := PollEvent{
+ FD: int32(fd),
+ Events: 1, // POLLIN
+ }
+
+ _, e = BlockingPoll(&event, 1, nil)
+ if e != 0 && e != syscall.EINTR {
+ return 0, TranslateErrno(e)
+ }
+ }
+}
+
+// BlockingReadv reads from a file descriptor that is set up as non-blocking and
+// stores the data in a list of iovecs buffers. If no data is available, it will
+// block in a poll() syscall until the file descriptor becomes readable.
+func BlockingReadv(fd int, iovecs []syscall.Iovec) (int, *tcpip.Error) {
+ for {
+ n, _, e := syscall.RawSyscall(syscall.SYS_READV, uintptr(fd), uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs)))
+ if e == 0 {
+ return int(n), nil
+ }
+
+ event := PollEvent{
+ FD: int32(fd),
+ Events: 1, // POLLIN
+ }
+
+ _, e = BlockingPoll(&event, 1, nil)
+ if e != 0 && e != syscall.EINTR {
+ return 0, TranslateErrno(e)
+ }
+ }
+}
+
+// MMsgHdr represents the mmsg_hdr structure required by recvmmsg() on linux.
+type MMsgHdr struct {
+ Msg syscall.Msghdr
+ Len uint32
+ _ [4]byte
+}
+
+// BlockingRecvMMsg reads from a file descriptor that is set up as non-blocking
+// and stores the received messages in a slice of MMsgHdr structures. If no data
+// is available, it will block in a poll() syscall until the file descriptor
+// becomes readable.
+func BlockingRecvMMsg(fd int, msgHdrs []MMsgHdr) (int, *tcpip.Error) {
+ for {
+ n, _, e := syscall.RawSyscall6(syscall.SYS_RECVMMSG, uintptr(fd), uintptr(unsafe.Pointer(&msgHdrs[0])), uintptr(len(msgHdrs)), syscall.MSG_DONTWAIT, 0, 0)
+ if e == 0 {
+ return int(n), nil
+ }
+
+ event := PollEvent{
+ FD: int32(fd),
+ Events: 1, // POLLIN
+ }
+
+ if _, e := BlockingPoll(&event, 1, nil); e != 0 && e != syscall.EINTR {
+ return 0, TranslateErrno(e)
+ }
+ }
+}
diff --git a/pkg/tcpip/link/sharedmem/BUILD b/pkg/tcpip/link/sharedmem/BUILD
new file mode 100644
index 000000000..13243ebbb
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/BUILD
@@ -0,0 +1,41 @@
+load("//tools:defs.bzl", "go_library", "go_test")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "sharedmem",
+ srcs = [
+ "rx.go",
+ "sharedmem.go",
+ "sharedmem_unsafe.go",
+ "tx.go",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/log",
+ "//pkg/sync",
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/header",
+ "//pkg/tcpip/link/rawfile",
+ "//pkg/tcpip/link/sharedmem/queue",
+ "//pkg/tcpip/stack",
+ ],
+)
+
+go_test(
+ name = "sharedmem_test",
+ srcs = [
+ "sharedmem_test.go",
+ ],
+ library = ":sharedmem",
+ deps = [
+ "//pkg/sync",
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/header",
+ "//pkg/tcpip/link/sharedmem/pipe",
+ "//pkg/tcpip/link/sharedmem/queue",
+ "//pkg/tcpip/stack",
+ ],
+)
diff --git a/pkg/tcpip/link/sharedmem/pipe/BUILD b/pkg/tcpip/link/sharedmem/pipe/BUILD
new file mode 100644
index 000000000..87020ec08
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/pipe/BUILD
@@ -0,0 +1,23 @@
+load("//tools:defs.bzl", "go_library", "go_test")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "pipe",
+ srcs = [
+ "pipe.go",
+ "pipe_unsafe.go",
+ "rx.go",
+ "tx.go",
+ ],
+ visibility = ["//visibility:public"],
+)
+
+go_test(
+ name = "pipe_test",
+ srcs = [
+ "pipe_test.go",
+ ],
+ library = ":pipe",
+ deps = ["//pkg/sync"],
+)
diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe.go b/pkg/tcpip/link/sharedmem/pipe/pipe.go
new file mode 100644
index 000000000..74c9f0311
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/pipe/pipe.go
@@ -0,0 +1,78 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package pipe implements a shared memory ring buffer on which a single reader
+// and a single writer can operate (read/write) concurrently. The ring buffer
+// allows for data of different sizes to be written, and preserves the boundary
+// of the written data.
+//
+// Example usage is as follows:
+//
+// wb := t.Push(20)
+// // Write data to wb.
+// t.Flush()
+//
+// rb := r.Pull()
+// // Do something with data in rb.
+// t.Flush()
+package pipe
+
+import (
+ "math"
+)
+
+const (
+ jump uint64 = math.MaxUint32 + 1
+ offsetMask uint64 = math.MaxUint32
+ revolutionMask uint64 = ^offsetMask
+
+ sizeOfSlotHeader = 8 // sizeof(uint64)
+ slotFree uint64 = 1 << 63
+ slotSizeMask uint64 = math.MaxUint32
+)
+
+// payloadToSlotSize calculates the total size of a slot based on its payload
+// size. The total size is the header size, plus the payload size, plus padding
+// if necessary to make the total size a multiple of sizeOfSlotHeader.
+func payloadToSlotSize(payloadSize uint64) uint64 {
+ s := sizeOfSlotHeader + payloadSize
+ return (s + sizeOfSlotHeader - 1) &^ (sizeOfSlotHeader - 1)
+}
+
+// slotToPayloadSize calculates the payload size of a slot based on the total
+// size of the slot. This is only meant to be used when creating slots that
+// don't carry information (e.g., free slots or wrap slots).
+func slotToPayloadSize(offset uint64) uint64 {
+ return offset - sizeOfSlotHeader
+}
+
+// pipe is a basic data structure used by both (transmit & receive) ends of a
+// pipe. Indices into this pipe are split into two fields: offset, which counts
+// the number of bytes from the beginning of the buffer, and revolution, which
+// counts the number of times the index has wrapped around.
+type pipe struct {
+ buffer []byte
+}
+
+// init initializes the pipe buffer such that its size is a multiple of the size
+// of the slot header.
+func (p *pipe) init(b []byte) {
+ p.buffer = b[:len(b)&^(sizeOfSlotHeader-1)]
+}
+
+// data returns a section of the buffer starting at the given index (which may
+// include revolution information) and with the given size.
+func (p *pipe) data(idx uint64, size uint64) []byte {
+ return p.buffer[(idx&offsetMask)+sizeOfSlotHeader:][:size]
+}
diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_test.go b/pkg/tcpip/link/sharedmem/pipe/pipe_test.go
new file mode 100644
index 000000000..dc239a0d0
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/pipe/pipe_test.go
@@ -0,0 +1,518 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "math/rand"
+ "reflect"
+ "runtime"
+ "testing"
+
+ "gvisor.dev/gvisor/pkg/sync"
+)
+
+func TestSimpleReadWrite(t *testing.T) {
+ // Check that a simple write can be properly read from the rx side.
+ tr := rand.New(rand.NewSource(99))
+ rr := rand.New(rand.NewSource(99))
+
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ wb := tx.Push(10)
+ if wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ for i := range wb {
+ wb[i] = byte(tr.Intn(256))
+ }
+ tx.Flush()
+
+ var rx Rx
+ rx.Init(b)
+ rb := rx.Pull()
+ if len(rb) != 10 {
+ t.Fatalf("Bad buffer size returned: got %v, want %v", len(rb), 10)
+ }
+
+ for i := range rb {
+ if v := byte(rr.Intn(256)); v != rb[i] {
+ t.Fatalf("Bad read buffer at index %v: got %v, want %v", i, rb[i], v)
+ }
+ }
+ rx.Flush()
+}
+
+func TestEmptyRead(t *testing.T) {
+ // Check that pulling from an empty pipe fails.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on empty pipe")
+ }
+}
+
+func TestTooLargeWrite(t *testing.T) {
+ // Check that writes that are too large are properly rejected.
+ b := make([]byte, 96)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(96); wb != nil {
+ t.Fatalf("Write of 96 bytes succeeded on 96-byte pipe")
+ }
+
+ if wb := tx.Push(88); wb != nil {
+ t.Fatalf("Write of 88 bytes succeeded on 96-byte pipe")
+ }
+
+ if wb := tx.Push(80); wb == nil {
+ t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
+ }
+}
+
+func TestFullWrite(t *testing.T) {
+ // Check that writes fail when the pipe is full.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(80); wb == nil {
+ t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
+ }
+
+ if wb := tx.Push(1); wb != nil {
+ t.Fatalf("Write succeeded on full pipe")
+ }
+}
+
+func TestFullAndFlushedWrite(t *testing.T) {
+ // Check that writes fail when the pipe is full and has already been
+ // flushed.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(80); wb == nil {
+ t.Fatalf("Write of 80 bytes failed on 96-byte pipe")
+ }
+
+ tx.Flush()
+
+ if wb := tx.Push(1); wb != nil {
+ t.Fatalf("Write succeeded on full pipe")
+ }
+}
+
+func TestTxFlushTwice(t *testing.T) {
+ // Checks that a second consecutive tx flush is a no-op.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ // Make copy of original tx queue, flush it, then check that it didn't
+ // change.
+ orig := tx
+ tx.Flush()
+
+ if !reflect.DeepEqual(orig, tx) {
+ t.Fatalf("Flush mutated tx pipe: got %v, want %v", tx, orig)
+ }
+}
+
+func TestRxFlushTwice(t *testing.T) {
+ // Checks that a second consecutive rx flush is a no-op.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+ rx.Flush()
+
+ // Make copy of original rx queue, flush it, then check that it didn't
+ // change.
+ orig := rx
+ rx.Flush()
+
+ if !reflect.DeepEqual(orig, rx) {
+ t.Fatalf("Flush mutated rx pipe: got %v, want %v", rx, orig)
+ }
+}
+
+func TestWrapInMiddleOfTransaction(t *testing.T) {
+ // Check that writes are not flushed when we need to wrap the buffer
+ // around.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+ rx.Flush()
+
+ // At this point the ring buffer is empty, but the write is at offset
+ // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment).
+ if wb := tx.Push(10); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on non-full pipe")
+ }
+
+ // We haven't flushed yet, so pull must return nil.
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on non-flushed pipe")
+ }
+
+ tx.Flush()
+
+ // The two buffers must be available now.
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+}
+
+func TestWriteAbort(t *testing.T) {
+ // Check that a read fails on a pipe that has had data pushed to it but
+ // has aborted the push.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(10); wb == nil {
+ t.Fatalf("Write failed on empty pipe")
+ }
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on empty pipe")
+ }
+
+ tx.Abort()
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on empty pipe")
+ }
+}
+
+func TestWrappedWriteAbort(t *testing.T) {
+ // Check that writes are properly aborted even if the writes wrap
+ // around.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+ rx.Flush()
+
+ // At this point the ring buffer is empty, but the write is at offset
+ // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment).
+ if wb := tx.Push(10); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on non-full pipe")
+ }
+
+ // We haven't flushed yet, so pull must return nil.
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on non-flushed pipe")
+ }
+
+ tx.Abort()
+
+ // The pushes were aborted, so no data should be readable.
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on non-flushed pipe")
+ }
+
+ // Try the same transactions again, but flush this time.
+ if wb := tx.Push(10); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on non-full pipe")
+ }
+
+ tx.Flush()
+
+ // The two buffers must be available now.
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+}
+
+func TestEmptyReadOnNonFlushedWrite(t *testing.T) {
+ // Check that a read fails on a pipe that has had data pushed to it
+ // but not yet flushed.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(10); wb == nil {
+ t.Fatalf("Write failed on empty pipe")
+ }
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on empty pipe")
+ }
+
+ tx.Flush()
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull on failed on non-empty pipe")
+ }
+}
+
+func TestPullAfterPullingEntirePipe(t *testing.T) {
+ // Check that Pull fails when the pipe is full, but all of it has
+ // already been pulled but not yet flushed.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+ rx.Flush()
+
+ // At this point the ring buffer is empty, but the write is at offset
+ // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 3
+ // buffers that will fill the pipe.
+ if wb := tx.Push(10); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+
+ if wb := tx.Push(20); wb == nil {
+ t.Fatalf("Push failed on non-full pipe")
+ }
+
+ if wb := tx.Push(24); wb == nil {
+ t.Fatalf("Push failed on non-full pipe")
+ }
+
+ tx.Flush()
+
+ // The three buffers must be available now.
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+
+ // Fourth pull must fail.
+ if rb := rx.Pull(); rb != nil {
+ t.Fatalf("Pull succeeded on empty pipe")
+ }
+}
+
+func TestNoRoomToWrapOnPush(t *testing.T) {
+ // Check that Push fails when it tries to allocate room to add a wrap
+ // message.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ var rx Rx
+ rx.Init(b)
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+ rx.Flush()
+
+ // At this point the ring buffer is empty, but the write is at offset
+ // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 20,
+ // which won't fit (64+20+8+padding = 96, which wouldn't leave room for
+ // the padding), so it wraps around.
+ if wb := tx.Push(20); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+
+ tx.Flush()
+
+ // Buffer offset is at 28. Try to write 70, which would require a wrap
+ // slot which cannot be created now.
+ if wb := tx.Push(70); wb != nil {
+ t.Fatalf("Push succeeded on pipe with no room for wrap message")
+ }
+}
+
+func TestRxImplicitFlushOfWrapMessage(t *testing.T) {
+ // Check if the first read is that of a wrapping message, that it gets
+ // immediately flushed.
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ if wb := tx.Push(50); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+ tx.Flush()
+
+ // This will cause a wrapping message to written.
+ if wb := tx.Push(60); wb != nil {
+ t.Fatalf("Push succeeded when there is no room in pipe")
+ }
+
+ var rx Rx
+ rx.Init(b)
+
+ // Read the first message.
+ if rb := rx.Pull(); rb == nil {
+ t.Fatalf("Pull failed on non-empty pipe")
+ }
+ rx.Flush()
+
+ // This should fail because of the wrapping message is taking up space.
+ if wb := tx.Push(60); wb != nil {
+ t.Fatalf("Push succeeded when there is no room in pipe")
+ }
+
+ // Try to read the next one. This should consume the wrapping message.
+ rx.Pull()
+
+ // This must now succeed.
+ if wb := tx.Push(60); wb == nil {
+ t.Fatalf("Push failed on empty pipe")
+ }
+}
+
+func TestConcurrentReaderWriter(t *testing.T) {
+ // Push a million buffers of random sizes and random contents. Check
+ // that buffers read match what was written.
+ tr := rand.New(rand.NewSource(99))
+ rr := rand.New(rand.NewSource(99))
+
+ b := make([]byte, 100)
+ var tx Tx
+ tx.Init(b)
+
+ var rx Rx
+ rx.Init(b)
+
+ const count = 1000000
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ runtime.Gosched()
+ for i := 0; i < count; i++ {
+ n := 1 + tr.Intn(80)
+ wb := tx.Push(uint64(n))
+ for wb == nil {
+ wb = tx.Push(uint64(n))
+ }
+
+ for j := range wb {
+ wb[j] = byte(tr.Intn(256))
+ }
+
+ tx.Flush()
+ }
+ }()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ runtime.Gosched()
+ for i := 0; i < count; i++ {
+ n := 1 + rr.Intn(80)
+ rb := rx.Pull()
+ for rb == nil {
+ rb = rx.Pull()
+ }
+
+ if n != len(rb) {
+ t.Fatalf("Bad %v-th buffer length: got %v, want %v", i, len(rb), n)
+ }
+
+ for j := range rb {
+ if v := byte(rr.Intn(256)); v != rb[j] {
+ t.Fatalf("Bad %v-th read buffer at index %v: got %v, want %v", i, j, rb[j], v)
+ }
+ }
+
+ rx.Flush()
+ }
+ }()
+
+ wg.Wait()
+}
diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go b/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go
new file mode 100644
index 000000000..62d17029e
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go
@@ -0,0 +1,35 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+import (
+ "sync/atomic"
+ "unsafe"
+)
+
+func (p *pipe) write(idx uint64, v uint64) {
+ ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0]))
+ *ptr = v
+}
+
+func (p *pipe) writeAtomic(idx uint64, v uint64) {
+ ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0]))
+ atomic.StoreUint64(ptr, v)
+}
+
+func (p *pipe) readAtomic(idx uint64) uint64 {
+ ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0]))
+ return atomic.LoadUint64(ptr)
+}
diff --git a/pkg/tcpip/link/sharedmem/pipe/rx.go b/pkg/tcpip/link/sharedmem/pipe/rx.go
new file mode 100644
index 000000000..f22e533ac
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/pipe/rx.go
@@ -0,0 +1,93 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+// Rx is the receive side of the shared memory ring buffer.
+type Rx struct {
+ p pipe
+
+ tail uint64
+ head uint64
+}
+
+// Init initializes the receive end of the pipe. In the initial state, the next
+// slot to be inspected is the very first one.
+func (r *Rx) Init(b []byte) {
+ r.p.init(b)
+ r.tail = 0xfffffffe * jump
+ r.head = r.tail
+}
+
+// Pull reads the next buffer from the pipe, returning nil if there isn't one
+// currently available.
+//
+// The returned slice is available until Flush() is next called. After that, it
+// must not be touched.
+func (r *Rx) Pull() []byte {
+ if r.head == r.tail+jump {
+ // We've already pulled the whole pipe.
+ return nil
+ }
+
+ header := r.p.readAtomic(r.head)
+ if header&slotFree != 0 {
+ // The next slot is free, we can't pull it yet.
+ return nil
+ }
+
+ payloadSize := header & slotSizeMask
+ newHead := r.head + payloadToSlotSize(payloadSize)
+ headWrap := (r.head & revolutionMask) | uint64(len(r.p.buffer))
+
+ // Check if this is a wrapping slot. If that's the case, it carries no
+ // data, so we just skip it and try again from the first slot.
+ if int64(newHead-headWrap) >= 0 {
+ if int64(newHead-headWrap) > int64(jump) || newHead&offsetMask != 0 {
+ return nil
+ }
+
+ if r.tail == r.head {
+ // If this is the first pull since the last Flush()
+ // call, we flush the state so that the sender can use
+ // this space if it needs to.
+ r.p.writeAtomic(r.head, slotFree|slotToPayloadSize(newHead-r.head))
+ r.tail = newHead
+ }
+
+ r.head = newHead
+ return r.Pull()
+ }
+
+ // Grab the buffer before updating r.head.
+ b := r.p.data(r.head, payloadSize)
+ r.head = newHead
+ return b
+}
+
+// Flush tells the transmitter that all buffers pulled since the last Flush()
+// have been used, so the transmitter is free to used their slots for further
+// transmission.
+func (r *Rx) Flush() {
+ if r.head == r.tail {
+ return
+ }
+ r.p.writeAtomic(r.tail, slotFree|slotToPayloadSize(r.head-r.tail))
+ r.tail = r.head
+}
+
+// Bytes returns the byte slice on which the pipe operates.
+func (r *Rx) Bytes() []byte {
+ return r.p.buffer
+}
diff --git a/pkg/tcpip/link/sharedmem/pipe/tx.go b/pkg/tcpip/link/sharedmem/pipe/tx.go
new file mode 100644
index 000000000..9841eb231
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/pipe/tx.go
@@ -0,0 +1,161 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipe
+
+// Tx is the transmit side of the shared memory ring buffer.
+type Tx struct {
+ p pipe
+ maxPayloadSize uint64
+
+ head uint64
+ tail uint64
+ next uint64
+
+ tailHeader uint64
+}
+
+// Init initializes the transmit end of the pipe. In the initial state, the next
+// slot to be written is the very first one, and the transmitter has the whole
+// ring buffer available to it.
+func (t *Tx) Init(b []byte) {
+ t.p.init(b)
+ // maxPayloadSize excludes the header of the payload, and the header
+ // of the wrapping message.
+ t.maxPayloadSize = uint64(len(t.p.buffer)) - 2*sizeOfSlotHeader
+ t.tail = 0xfffffffe * jump
+ t.next = t.tail
+ t.head = t.tail + jump
+ t.p.write(t.tail, slotFree)
+}
+
+// Capacity determines how many records of the given size can be written to the
+// pipe before it fills up.
+func (t *Tx) Capacity(recordSize uint64) uint64 {
+ available := uint64(len(t.p.buffer)) - sizeOfSlotHeader
+ entryLen := payloadToSlotSize(recordSize)
+ return available / entryLen
+}
+
+// Push reserves "payloadSize" bytes for transmission in the pipe. The caller
+// populates the returned slice with the data to be transferred and enventually
+// calls Flush() to make the data visible to the reader, or Abort() to make the
+// pipe forget all Push() calls since the last Flush().
+//
+// The returned slice is available until Flush() or Abort() is next called.
+// After that, it must not be touched.
+func (t *Tx) Push(payloadSize uint64) []byte {
+ // Fail request if we know we will never have enough room.
+ if payloadSize > t.maxPayloadSize {
+ return nil
+ }
+
+ totalLen := payloadToSlotSize(payloadSize)
+ newNext := t.next + totalLen
+ nextWrap := (t.next & revolutionMask) | uint64(len(t.p.buffer))
+ if int64(newNext-nextWrap) >= 0 {
+ // The new buffer would overflow the pipe, so we push a wrapping
+ // slot, then try to add the actual slot to the front of the
+ // pipe.
+ newNext = (newNext & revolutionMask) + jump
+ wrappingPayloadSize := slotToPayloadSize(newNext - t.next)
+ if !t.reclaim(newNext) {
+ return nil
+ }
+
+ oldNext := t.next
+ t.next = newNext
+ if oldNext != t.tail {
+ t.p.write(oldNext, wrappingPayloadSize)
+ } else {
+ t.tailHeader = wrappingPayloadSize
+ t.Flush()
+ }
+
+ newNext += totalLen
+ }
+
+ // Check that we have enough room for the buffer.
+ if !t.reclaim(newNext) {
+ return nil
+ }
+
+ if t.next != t.tail {
+ t.p.write(t.next, payloadSize)
+ } else {
+ t.tailHeader = payloadSize
+ }
+
+ // Grab the buffer before updating t.next.
+ b := t.p.data(t.next, payloadSize)
+ t.next = newNext
+
+ return b
+}
+
+// reclaim attempts to advance the head until at least newNext. If the head is
+// already at or beyond newNext, nothing happens and true is returned; otherwise
+// it tries to reclaim slots that have already been consumed by the receive end
+// of the pipe (they will be marked as free) and returns a boolean indicating
+// whether it was successful in reclaiming enough slots.
+func (t *Tx) reclaim(newNext uint64) bool {
+ for int64(newNext-t.head) > 0 {
+ // Can't reclaim if slot is not free.
+ header := t.p.readAtomic(t.head)
+ if header&slotFree == 0 {
+ return false
+ }
+
+ payloadSize := header & slotSizeMask
+ newHead := t.head + payloadToSlotSize(payloadSize)
+
+ // Check newHead is within bounds and valid.
+ if int64(newHead-t.tail) > int64(jump) || newHead&offsetMask >= uint64(len(t.p.buffer)) {
+ return false
+ }
+
+ t.head = newHead
+ }
+
+ return true
+}
+
+// Abort causes all Push() calls since the last Flush() to be forgotten and
+// therefore they will not be made visible to the receiver.
+func (t *Tx) Abort() {
+ t.next = t.tail
+}
+
+// Flush causes all buffers pushed since the last Flush() [or Abort(), whichever
+// is the most recent] to be made visible to the receiver.
+func (t *Tx) Flush() {
+ if t.next == t.tail {
+ // Nothing to do if there are no pushed buffers.
+ return
+ }
+
+ if t.next != t.head {
+ // The receiver will spin in t.next, so we must make sure that
+ // the slotFree bit is set.
+ t.p.write(t.next, slotFree)
+ }
+
+ t.p.writeAtomic(t.tail, t.tailHeader)
+ t.tail = t.next
+}
+
+// Bytes returns the byte slice on which the pipe operates.
+func (t *Tx) Bytes() []byte {
+ return t.p.buffer
+}
diff --git a/pkg/tcpip/link/sharedmem/queue/BUILD b/pkg/tcpip/link/sharedmem/queue/BUILD
new file mode 100644
index 000000000..3ba06af73
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/queue/BUILD
@@ -0,0 +1,27 @@
+load("//tools:defs.bzl", "go_library", "go_test")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "queue",
+ srcs = [
+ "rx.go",
+ "tx.go",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/log",
+ "//pkg/tcpip/link/sharedmem/pipe",
+ ],
+)
+
+go_test(
+ name = "queue_test",
+ srcs = [
+ "queue_test.go",
+ ],
+ library = ":queue",
+ deps = [
+ "//pkg/tcpip/link/sharedmem/pipe",
+ ],
+)
diff --git a/pkg/tcpip/link/sharedmem/queue/queue_test.go b/pkg/tcpip/link/sharedmem/queue/queue_test.go
new file mode 100644
index 000000000..9a0aad5d7
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/queue/queue_test.go
@@ -0,0 +1,517 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package queue
+
+import (
+ "encoding/binary"
+ "reflect"
+ "testing"
+
+ "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
+)
+
+func TestBasicTxQueue(t *testing.T) {
+ // Tests that a basic transmit on a queue works, and that completion
+ // gets properly reported as well.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Tx
+ q.Init(pb1, pb2)
+
+ // Enqueue two buffers.
+ b := []TxBuffer{
+ {nil, 100, 60},
+ {nil, 200, 40},
+ }
+
+ b[0].Next = &b[1]
+
+ const usedID = 1002
+ const usedTotalSize = 100
+ if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) {
+ t.Fatalf("Enqueue failed on empty queue")
+ }
+
+ // Check the contents of the pipe.
+ d := rxp.Pull()
+ if d == nil {
+ t.Fatalf("Tx pipe is empty after Enqueue")
+ }
+
+ want := []byte{
+ 234, 3, 0, 0, 0, 0, 0, 0, // id
+ 100, 0, 0, 0, // total size
+ 0, 0, 0, 0, // reserved
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 60, 0, 0, 0, // size 1
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 40, 0, 0, 0, // size 2
+ }
+
+ if !reflect.DeepEqual(want, d) {
+ t.Fatalf("Bad posted packet: got %v, want %v", d, want)
+ }
+
+ rxp.Flush()
+
+ // Check that there are no completions yet.
+ if _, ok := q.CompletedPacket(); ok {
+ t.Fatalf("Packet reported as completed too soon")
+ }
+
+ // Post a completion.
+ d = txp.Push(8)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ binary.LittleEndian.PutUint64(d, usedID)
+ txp.Flush()
+
+ // Check that completion is properly reported.
+ id, ok := q.CompletedPacket()
+ if !ok {
+ t.Fatalf("Completion not reported")
+ }
+
+ if id != usedID {
+ t.Fatalf("Bad completion id: got %v, want %v", id, usedID)
+ }
+}
+
+func TestBasicRxQueue(t *testing.T) {
+ // Tests that a basic receive on a queue works.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Rx
+ q.Init(pb1, pb2, nil)
+
+ // Post two buffers.
+ b := []RxBuffer{
+ {100, 60, 1077, 0},
+ {200, 40, 2123, 0},
+ }
+
+ if !q.PostBuffers(b) {
+ t.Fatalf("PostBuffers failed on empty queue")
+ }
+
+ // Check the contents of the pipe.
+ want := [][]byte{
+ {
+ 100, 0, 0, 0, 0, 0, 0, 0, // Offset1
+ 60, 0, 0, 0, // Size1
+ 0, 0, 0, 0, // Remaining in group 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // User data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+ },
+ {
+ 200, 0, 0, 0, 0, 0, 0, 0, // Offset2
+ 40, 0, 0, 0, // Size2
+ 0, 0, 0, 0, // Remaining in group 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // User data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ },
+ }
+
+ for i := range b {
+ d := rxp.Pull()
+ if d == nil {
+ t.Fatalf("Tx pipe is empty after PostBuffers")
+ }
+
+ if !reflect.DeepEqual(want[i], d) {
+ t.Fatalf("Bad posted packet: got %v, want %v", d, want[i])
+ }
+
+ rxp.Flush()
+ }
+
+ // Check that there are no completions.
+ if _, n := q.Dequeue(nil); n != 0 {
+ t.Fatalf("Packet reported as received too soon")
+ }
+
+ // Post a completion.
+ d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+
+ copy(d, []byte{
+ 100, 0, 0, 0, // packet size
+ 0, 0, 0, 0, // reserved
+
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 60, 0, 0, 0, // size 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 40, 0, 0, 0, // size 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ })
+
+ txp.Flush()
+
+ // Check that completion is properly reported.
+ bufs, n := q.Dequeue(nil)
+ if n != 100 {
+ t.Fatalf("Bad packet size: got %v, want %v", n, 100)
+ }
+
+ if !reflect.DeepEqual(bufs, b) {
+ t.Fatalf("Bad returned buffers: got %v, want %v", bufs, b)
+ }
+}
+
+func TestBadTxCompletion(t *testing.T) {
+ // Check that tx completions with bad sizes are properly ignored.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Tx
+ q.Init(pb1, pb2)
+
+ // Post a completion that is too short, and check that it is ignored.
+ if d := txp.Push(7); d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ txp.Flush()
+
+ if _, ok := q.CompletedPacket(); ok {
+ t.Fatalf("Bad completion not ignored")
+ }
+
+ // Post a completion that is too long, and check that it is ignored.
+ if d := txp.Push(10); d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ txp.Flush()
+
+ if _, ok := q.CompletedPacket(); ok {
+ t.Fatalf("Bad completion not ignored")
+ }
+}
+
+func TestBadRxCompletion(t *testing.T) {
+ // Check that bad rx completions are properly ignored.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Rx
+ q.Init(pb1, pb2, nil)
+
+ // Post a completion that is too short, and check that it is ignored.
+ if d := txp.Push(7); d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+ txp.Flush()
+
+ if b, _ := q.Dequeue(nil); b != nil {
+ t.Fatalf("Bad completion not ignored")
+ }
+
+ // Post a completion whose buffer sizes add up to less than the total
+ // size.
+ d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+
+ copy(d, []byte{
+ 100, 0, 0, 0, // packet size
+ 0, 0, 0, 0, // reserved
+
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 10, 0, 0, 0, // size 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 10, 0, 0, 0, // size 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ })
+
+ txp.Flush()
+ if b, _ := q.Dequeue(nil); b != nil {
+ t.Fatalf("Bad completion not ignored")
+ }
+
+ // Post a completion whose buffer sizes will cause a 32-bit overflow,
+ // but adds up to the right number.
+ d = txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+
+ copy(d, []byte{
+ 100, 0, 0, 0, // packet size
+ 0, 0, 0, 0, // reserved
+
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 255, 255, 255, 255, // size 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 101, 0, 0, 0, // size 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ })
+
+ txp.Flush()
+ if b, _ := q.Dequeue(nil); b != nil {
+ t.Fatalf("Bad completion not ignored")
+ }
+}
+
+func TestFillTxPipe(t *testing.T) {
+ // Check that transmitting a new buffer when the buffer pipe is full
+ // fails gracefully.
+ pb1 := make([]byte, 104)
+ pb2 := make([]byte, 104)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Tx
+ q.Init(pb1, pb2)
+
+ // Transmit twice, which should fill the tx pipe.
+ b := []TxBuffer{
+ {nil, 100, 60},
+ {nil, 200, 40},
+ }
+
+ b[0].Next = &b[1]
+
+ const usedID = 1002
+ const usedTotalSize = 100
+ for i := uint64(0); i < 2; i++ {
+ if !q.Enqueue(usedID+i, usedTotalSize, 2, &b[0]) {
+ t.Fatalf("Failed to transmit buffer")
+ }
+ }
+
+ // Transmit another packet now that the tx pipe is full.
+ if q.Enqueue(usedID+2, usedTotalSize, 2, &b[0]) {
+ t.Fatalf("Enqueue succeeded when tx pipe is full")
+ }
+}
+
+func TestFillRxPipe(t *testing.T) {
+ // Check that posting a new buffer when the buffer pipe is full fails
+ // gracefully.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Rx
+ q.Init(pb1, pb2, nil)
+
+ // Post a buffer twice, it should fill the tx pipe.
+ b := []RxBuffer{
+ {100, 60, 1077, 0},
+ }
+
+ for i := 0; i < 2; i++ {
+ if !q.PostBuffers(b) {
+ t.Fatalf("PostBuffers failed on non-full queue")
+ }
+ }
+
+ // Post another buffer now that the tx pipe is full.
+ if q.PostBuffers(b) {
+ t.Fatalf("PostBuffers succeeded on full queue")
+ }
+}
+
+func TestLotsOfTransmissions(t *testing.T) {
+ // Make sure pipes are being properly flushed when transmitting packets.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Tx
+ q.Init(pb1, pb2)
+
+ // Prepare packet with two buffers.
+ b := []TxBuffer{
+ {nil, 100, 60},
+ {nil, 200, 40},
+ }
+
+ b[0].Next = &b[1]
+
+ const usedID = 1002
+ const usedTotalSize = 100
+
+ // Post 100000 packets and completions.
+ for i := 100000; i > 0; i-- {
+ if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) {
+ t.Fatalf("Enqueue failed on non-full queue")
+ }
+
+ if d := rxp.Pull(); d == nil {
+ t.Fatalf("Tx pipe is empty after Enqueue")
+ }
+ rxp.Flush()
+
+ d := txp.Push(8)
+ if d == nil {
+ t.Fatalf("Unable to write to rx pipe")
+ }
+ binary.LittleEndian.PutUint64(d, usedID)
+ txp.Flush()
+ if _, ok := q.CompletedPacket(); !ok {
+ t.Fatalf("Completion not returned")
+ }
+ }
+}
+
+func TestLotsOfReceptions(t *testing.T) {
+ // Make sure pipes are being properly flushed when receiving packets.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var rxp pipe.Rx
+ rxp.Init(pb1)
+
+ var txp pipe.Tx
+ txp.Init(pb2)
+
+ var q Rx
+ q.Init(pb1, pb2, nil)
+
+ // Prepare for posting two buffers.
+ b := []RxBuffer{
+ {100, 60, 1077, 0},
+ {200, 40, 2123, 0},
+ }
+
+ // Post 100000 buffers and completions.
+ for i := 100000; i > 0; i-- {
+ if !q.PostBuffers(b) {
+ t.Fatalf("PostBuffers failed on non-full queue")
+ }
+
+ if d := rxp.Pull(); d == nil {
+ t.Fatalf("Tx pipe is empty after PostBuffers")
+ }
+ rxp.Flush()
+
+ if d := rxp.Pull(); d == nil {
+ t.Fatalf("Tx pipe is empty after PostBuffers")
+ }
+ rxp.Flush()
+
+ d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer)
+ if d == nil {
+ t.Fatalf("Unable to push to rx pipe")
+ }
+
+ copy(d, []byte{
+ 100, 0, 0, 0, // packet size
+ 0, 0, 0, 0, // reserved
+
+ 100, 0, 0, 0, 0, 0, 0, 0, // offset 1
+ 60, 0, 0, 0, // size 1
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 1
+ 53, 4, 0, 0, 0, 0, 0, 0, // ID 1
+
+ 200, 0, 0, 0, 0, 0, 0, 0, // offset 2
+ 40, 0, 0, 0, // size 2
+ 0, 0, 0, 0, 0, 0, 0, 0, // user data 2
+ 75, 8, 0, 0, 0, 0, 0, 0, // ID 2
+ })
+
+ txp.Flush()
+
+ if _, n := q.Dequeue(nil); n == 0 {
+ t.Fatalf("Dequeue failed when there is a completion")
+ }
+ }
+}
+
+func TestRxEnableNotification(t *testing.T) {
+ // Check that enabling nofifications results in properly updated state.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var state uint32
+ var q Rx
+ q.Init(pb1, pb2, &state)
+
+ q.EnableNotification()
+ if state != eventFDEnabled {
+ t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDEnabled)
+ }
+}
+
+func TestRxDisableNotification(t *testing.T) {
+ // Check that disabling nofifications results in properly updated state.
+ pb1 := make([]byte, 100)
+ pb2 := make([]byte, 100)
+
+ var state uint32
+ var q Rx
+ q.Init(pb1, pb2, &state)
+
+ q.DisableNotification()
+ if state != eventFDDisabled {
+ t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDDisabled)
+ }
+}
diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go
new file mode 100644
index 000000000..696e6c9e5
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/queue/rx.go
@@ -0,0 +1,221 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package queue provides the implementation of transmit and receive queues
+// based on shared memory ring buffers.
+package queue
+
+import (
+ "encoding/binary"
+ "sync/atomic"
+
+ "gvisor.dev/gvisor/pkg/log"
+ "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
+)
+
+const (
+ // Offsets within a posted buffer.
+ postedOffset = 0
+ postedSize = 8
+ postedRemainingInGroup = 12
+ postedUserData = 16
+ postedID = 24
+
+ sizeOfPostedBuffer = 32
+
+ // Offsets within a received packet header.
+ consumedPacketSize = 0
+ consumedPacketReserved = 4
+
+ sizeOfConsumedPacketHeader = 8
+
+ // Offsets within a consumed buffer.
+ consumedOffset = 0
+ consumedSize = 8
+ consumedUserData = 12
+ consumedID = 20
+
+ sizeOfConsumedBuffer = 28
+
+ // The following are the allowed states of the shared data area.
+ eventFDUninitialized = 0
+ eventFDDisabled = 1
+ eventFDEnabled = 2
+)
+
+// RxBuffer is the descriptor of a receive buffer.
+type RxBuffer struct {
+ Offset uint64
+ Size uint32
+ ID uint64
+ UserData uint64
+}
+
+// Rx is a receive queue. It is implemented with one tx and one rx pipe: the tx
+// pipe is used to "post" buffers, while the rx pipe is used to receive packets
+// whose contents have been written to previously posted buffers.
+//
+// This struct is thread-compatible.
+type Rx struct {
+ tx pipe.Tx
+ rx pipe.Rx
+ sharedEventFDState *uint32
+}
+
+// Init initializes the receive queue with the given pipes, and shared state
+// pointer -- the latter is used to enable/disable eventfd notifications.
+func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) {
+ r.sharedEventFDState = sharedEventFDState
+ r.tx.Init(tx)
+ r.rx.Init(rx)
+}
+
+// EnableNotification updates the shared state such that the peer will notify
+// the eventfd when there are packets to be dequeued.
+func (r *Rx) EnableNotification() {
+ atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled)
+}
+
+// DisableNotification updates the shared state such that the peer will not
+// notify the eventfd.
+func (r *Rx) DisableNotification() {
+ atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled)
+}
+
+// PostedBuffersLimit returns the maximum number of buffers that can be posted
+// before the tx queue fills up.
+func (r *Rx) PostedBuffersLimit() uint64 {
+ return r.tx.Capacity(sizeOfPostedBuffer)
+}
+
+// PostBuffers makes the given buffers available for receiving data from the
+// peer. Once they are posted, the peer is free to write to them and will
+// eventually post them back for consumption.
+func (r *Rx) PostBuffers(buffers []RxBuffer) bool {
+ for i := range buffers {
+ b := r.tx.Push(sizeOfPostedBuffer)
+ if b == nil {
+ r.tx.Abort()
+ return false
+ }
+
+ pb := &buffers[i]
+ binary.LittleEndian.PutUint64(b[postedOffset:], pb.Offset)
+ binary.LittleEndian.PutUint32(b[postedSize:], pb.Size)
+ binary.LittleEndian.PutUint32(b[postedRemainingInGroup:], 0)
+ binary.LittleEndian.PutUint64(b[postedUserData:], pb.UserData)
+ binary.LittleEndian.PutUint64(b[postedID:], pb.ID)
+ }
+
+ r.tx.Flush()
+
+ return true
+}
+
+// Dequeue receives buffers that have been previously posted by PostBuffers()
+// and that have been filled by the peer and posted back.
+//
+// This is similar to append() in that new buffers are appended to "bufs", with
+// reallocation only if "bufs" doesn't have enough capacity.
+func (r *Rx) Dequeue(bufs []RxBuffer) ([]RxBuffer, uint32) {
+ for {
+ outBufs := bufs
+
+ // Pull the next descriptor from the rx pipe.
+ b := r.rx.Pull()
+ if b == nil {
+ return bufs, 0
+ }
+
+ if len(b) < sizeOfConsumedPacketHeader {
+ log.Warningf("Ignoring packet header: size (%v) is less than header size (%v)", len(b), sizeOfConsumedPacketHeader)
+ r.rx.Flush()
+ continue
+ }
+
+ totalDataSize := binary.LittleEndian.Uint32(b[consumedPacketSize:])
+
+ // Calculate the number of buffer descriptors and copy them
+ // over to the output.
+ count := (len(b) - sizeOfConsumedPacketHeader) / sizeOfConsumedBuffer
+ offset := sizeOfConsumedPacketHeader
+ buffersSize := uint32(0)
+ for i := count; i > 0; i-- {
+ s := binary.LittleEndian.Uint32(b[offset+consumedSize:])
+ buffersSize += s
+ if buffersSize < s {
+ // The buffer size overflows an unsigned 32-bit
+ // integer, so break out and force it to be
+ // ignored.
+ totalDataSize = 1
+ buffersSize = 0
+ break
+ }
+
+ outBufs = append(outBufs, RxBuffer{
+ Offset: binary.LittleEndian.Uint64(b[offset+consumedOffset:]),
+ Size: s,
+ ID: binary.LittleEndian.Uint64(b[offset+consumedID:]),
+ })
+
+ offset += sizeOfConsumedBuffer
+ }
+
+ r.rx.Flush()
+
+ if buffersSize < totalDataSize {
+ // The descriptor is corrupted, ignore it.
+ log.Warningf("Ignoring packet: actual data size (%v) less than expected size (%v)", buffersSize, totalDataSize)
+ continue
+ }
+
+ return outBufs, totalDataSize
+ }
+}
+
+// Bytes returns the byte slices on which the queue operates.
+func (r *Rx) Bytes() (tx, rx []byte) {
+ return r.tx.Bytes(), r.rx.Bytes()
+}
+
+// DecodeRxBufferHeader decodes the header of a buffer posted on an rx queue.
+func DecodeRxBufferHeader(b []byte) RxBuffer {
+ return RxBuffer{
+ Offset: binary.LittleEndian.Uint64(b[postedOffset:]),
+ Size: binary.LittleEndian.Uint32(b[postedSize:]),
+ ID: binary.LittleEndian.Uint64(b[postedID:]),
+ UserData: binary.LittleEndian.Uint64(b[postedUserData:]),
+ }
+}
+
+// RxCompletionSize returns the number of bytes needed to encode an rx
+// completion containing "count" buffers.
+func RxCompletionSize(count int) uint64 {
+ return sizeOfConsumedPacketHeader + uint64(count)*sizeOfConsumedBuffer
+}
+
+// EncodeRxCompletion encodes an rx completion header.
+func EncodeRxCompletion(b []byte, size, reserved uint32) {
+ binary.LittleEndian.PutUint32(b[consumedPacketSize:], size)
+ binary.LittleEndian.PutUint32(b[consumedPacketReserved:], reserved)
+}
+
+// EncodeRxCompletionBuffer encodes the i-th rx completion buffer header.
+func EncodeRxCompletionBuffer(b []byte, i int, rxb RxBuffer) {
+ b = b[RxCompletionSize(i):]
+ binary.LittleEndian.PutUint64(b[consumedOffset:], rxb.Offset)
+ binary.LittleEndian.PutUint32(b[consumedSize:], rxb.Size)
+ binary.LittleEndian.PutUint64(b[consumedUserData:], rxb.UserData)
+ binary.LittleEndian.PutUint64(b[consumedID:], rxb.ID)
+}
diff --git a/pkg/tcpip/link/sharedmem/queue/tx.go b/pkg/tcpip/link/sharedmem/queue/tx.go
new file mode 100644
index 000000000..beffe807b
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/queue/tx.go
@@ -0,0 +1,151 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package queue
+
+import (
+ "encoding/binary"
+
+ "gvisor.dev/gvisor/pkg/log"
+ "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
+)
+
+const (
+ // Offsets within a packet header.
+ packetID = 0
+ packetSize = 8
+ packetReserved = 12
+
+ sizeOfPacketHeader = 16
+
+ // Offsets with a buffer descriptor
+ bufferOffset = 0
+ bufferSize = 8
+
+ sizeOfBufferDescriptor = 12
+)
+
+// TxBuffer is the descriptor of a transmit buffer.
+type TxBuffer struct {
+ Next *TxBuffer
+ Offset uint64
+ Size uint32
+}
+
+// Tx is a transmit queue. It is implemented with one tx and one rx pipe: the
+// tx pipe is used to request the transmission of packets, while the rx pipe
+// is used to receive which transmissions have completed.
+//
+// This struct is thread-compatible.
+type Tx struct {
+ tx pipe.Tx
+ rx pipe.Rx
+}
+
+// Init initializes the transmit queue with the given pipes.
+func (t *Tx) Init(tx, rx []byte) {
+ t.tx.Init(tx)
+ t.rx.Init(rx)
+}
+
+// Enqueue queues the given linked list of buffers for transmission as one
+// packet. While it is queued, the caller must not modify them.
+func (t *Tx) Enqueue(id uint64, totalDataLen, bufferCount uint32, buffer *TxBuffer) bool {
+ // Reserve room in the tx pipe.
+ totalLen := sizeOfPacketHeader + uint64(bufferCount)*sizeOfBufferDescriptor
+
+ b := t.tx.Push(totalLen)
+ if b == nil {
+ return false
+ }
+
+ // Initialize the packet and buffer descriptors.
+ binary.LittleEndian.PutUint64(b[packetID:], id)
+ binary.LittleEndian.PutUint32(b[packetSize:], totalDataLen)
+ binary.LittleEndian.PutUint32(b[packetReserved:], 0)
+
+ offset := sizeOfPacketHeader
+ for i := bufferCount; i != 0; i-- {
+ binary.LittleEndian.PutUint64(b[offset+bufferOffset:], buffer.Offset)
+ binary.LittleEndian.PutUint32(b[offset+bufferSize:], buffer.Size)
+ offset += sizeOfBufferDescriptor
+ buffer = buffer.Next
+ }
+
+ t.tx.Flush()
+
+ return true
+}
+
+// CompletedPacket returns the id of the last completed transmission. The
+// returned id, if any, refers to a value passed on a previous call to
+// Enqueue().
+func (t *Tx) CompletedPacket() (id uint64, ok bool) {
+ for {
+ b := t.rx.Pull()
+ if b == nil {
+ return 0, false
+ }
+
+ if len(b) != 8 {
+ t.rx.Flush()
+ log.Warningf("Ignoring completed packet: size (%v) is less than expected (%v)", len(b), 8)
+ continue
+ }
+
+ v := binary.LittleEndian.Uint64(b)
+
+ t.rx.Flush()
+
+ return v, true
+ }
+}
+
+// Bytes returns the byte slices on which the queue operates.
+func (t *Tx) Bytes() (tx, rx []byte) {
+ return t.tx.Bytes(), t.rx.Bytes()
+}
+
+// TxPacketInfo holds information about a packet sent on a tx queue.
+type TxPacketInfo struct {
+ ID uint64
+ Size uint32
+ Reserved uint32
+ BufferCount int
+}
+
+// DecodeTxPacketHeader decodes the header of a packet sent over a tx queue.
+func DecodeTxPacketHeader(b []byte) TxPacketInfo {
+ return TxPacketInfo{
+ ID: binary.LittleEndian.Uint64(b[packetID:]),
+ Size: binary.LittleEndian.Uint32(b[packetSize:]),
+ Reserved: binary.LittleEndian.Uint32(b[packetReserved:]),
+ BufferCount: (len(b) - sizeOfPacketHeader) / sizeOfBufferDescriptor,
+ }
+}
+
+// DecodeTxBufferHeader decodes the header of the i-th buffer of a packet sent
+// over a tx queue.
+func DecodeTxBufferHeader(b []byte, i int) TxBuffer {
+ b = b[sizeOfPacketHeader+i*sizeOfBufferDescriptor:]
+ return TxBuffer{
+ Offset: binary.LittleEndian.Uint64(b[bufferOffset:]),
+ Size: binary.LittleEndian.Uint32(b[bufferSize:]),
+ }
+}
+
+// EncodeTxCompletion encodes a tx completion header.
+func EncodeTxCompletion(b []byte, id uint64) {
+ binary.LittleEndian.PutUint64(b, id)
+}
diff --git a/pkg/tcpip/link/sharedmem/rx.go b/pkg/tcpip/link/sharedmem/rx.go
new file mode 100644
index 000000000..eec11e4cb
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/rx.go
@@ -0,0 +1,159 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux
+
+package sharedmem
+
+import (
+ "sync/atomic"
+ "syscall"
+
+ "gvisor.dev/gvisor/pkg/tcpip/link/rawfile"
+ "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
+)
+
+// rx holds all state associated with an rx queue.
+type rx struct {
+ data []byte
+ sharedData []byte
+ q queue.Rx
+ eventFD int
+}
+
+// init initializes all state needed by the rx queue based on the information
+// provided.
+//
+// The caller always retains ownership of all file descriptors passed in. The
+// queue implementation will duplicate any that it may need in the future.
+func (r *rx) init(mtu uint32, c *QueueConfig) error {
+ // Map in all buffers.
+ txPipe, err := getBuffer(c.TxPipeFD)
+ if err != nil {
+ return err
+ }
+
+ rxPipe, err := getBuffer(c.RxPipeFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ return err
+ }
+
+ data, err := getBuffer(c.DataFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ return err
+ }
+
+ sharedData, err := getBuffer(c.SharedDataFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ syscall.Munmap(data)
+ return err
+ }
+
+ // Duplicate the eventFD so that caller can close it but we can still
+ // use it.
+ efd, err := syscall.Dup(c.EventFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ syscall.Munmap(data)
+ syscall.Munmap(sharedData)
+ return err
+ }
+
+ // Set the eventfd as non-blocking.
+ if err := syscall.SetNonblock(efd, true); err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ syscall.Munmap(data)
+ syscall.Munmap(sharedData)
+ syscall.Close(efd)
+ return err
+ }
+
+ // Initialize state based on buffers.
+ r.q.Init(txPipe, rxPipe, sharedDataPointer(sharedData))
+ r.data = data
+ r.eventFD = efd
+ r.sharedData = sharedData
+
+ return nil
+}
+
+// cleanup releases all resources allocated during init(). It must only be
+// called if init() has previously succeeded.
+func (r *rx) cleanup() {
+ a, b := r.q.Bytes()
+ syscall.Munmap(a)
+ syscall.Munmap(b)
+
+ syscall.Munmap(r.data)
+ syscall.Munmap(r.sharedData)
+ syscall.Close(r.eventFD)
+}
+
+// postAndReceive posts the provided buffers (if any), and then tries to read
+// from the receive queue.
+//
+// Capacity permitting, it reuses the posted buffer slice to store the buffers
+// that were read as well.
+//
+// This function will block if there aren't any available packets.
+func (r *rx) postAndReceive(b []queue.RxBuffer, stopRequested *uint32) ([]queue.RxBuffer, uint32) {
+ // Post the buffers first. If we cannot post, sleep until we can. We
+ // never post more than will fit concurrently, so it's safe to wait
+ // until enough room is available.
+ if len(b) != 0 && !r.q.PostBuffers(b) {
+ r.q.EnableNotification()
+ for !r.q.PostBuffers(b) {
+ var tmp [8]byte
+ rawfile.BlockingRead(r.eventFD, tmp[:])
+ if atomic.LoadUint32(stopRequested) != 0 {
+ r.q.DisableNotification()
+ return nil, 0
+ }
+ }
+ r.q.DisableNotification()
+ }
+
+ // Read the next set of descriptors.
+ b, n := r.q.Dequeue(b[:0])
+ if len(b) != 0 {
+ return b, n
+ }
+
+ // Data isn't immediately available. Enable eventfd notifications.
+ r.q.EnableNotification()
+ for {
+ b, n = r.q.Dequeue(b)
+ if len(b) != 0 {
+ break
+ }
+
+ // Wait for notification.
+ var tmp [8]byte
+ rawfile.BlockingRead(r.eventFD, tmp[:])
+ if atomic.LoadUint32(stopRequested) != 0 {
+ r.q.DisableNotification()
+ return nil, 0
+ }
+ }
+ r.q.DisableNotification()
+
+ return b, n
+}
diff --git a/pkg/tcpip/link/sharedmem/sharedmem.go b/pkg/tcpip/link/sharedmem/sharedmem.go
new file mode 100644
index 000000000..0374a2441
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/sharedmem.go
@@ -0,0 +1,289 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux
+
+// Package sharedmem provides the implemention of data-link layer endpoints
+// backed by shared memory.
+//
+// Shared memory endpoints can be used in the networking stack by calling New()
+// to create a new endpoint, and then passing it as an argument to
+// Stack.CreateNIC().
+package sharedmem
+
+import (
+ "sync/atomic"
+ "syscall"
+
+ "gvisor.dev/gvisor/pkg/log"
+ "gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/header"
+ "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// QueueConfig holds all the file descriptors needed to describe a tx or rx
+// queue over shared memory. It is used when creating new shared memory
+// endpoints to describe tx and rx queues.
+type QueueConfig struct {
+ // DataFD is a file descriptor for the file that contains the data to
+ // be transmitted via this queue. Descriptors contain offsets within
+ // this file.
+ DataFD int
+
+ // EventFD is a file descriptor for the event that is signaled when
+ // data is becomes available in this queue.
+ EventFD int
+
+ // TxPipeFD is a file descriptor for the tx pipe associated with the
+ // queue.
+ TxPipeFD int
+
+ // RxPipeFD is a file descriptor for the rx pipe associated with the
+ // queue.
+ RxPipeFD int
+
+ // SharedDataFD is a file descriptor for the file that contains shared
+ // state between the two ends of the queue. This data specifies, for
+ // example, whether EventFD signaling is enabled or disabled.
+ SharedDataFD int
+}
+
+type endpoint struct {
+ // mtu (maximum transmission unit) is the maximum size of a packet.
+ mtu uint32
+
+ // bufferSize is the size of each individual buffer.
+ bufferSize uint32
+
+ // addr is the local address of this endpoint.
+ addr tcpip.LinkAddress
+
+ // rx is the receive queue.
+ rx rx
+
+ // stopRequested is to be accessed atomically only, and determines if
+ // the worker goroutines should stop.
+ stopRequested uint32
+
+ // Wait group used to indicate that all workers have stopped.
+ completed sync.WaitGroup
+
+ // mu protects the following fields.
+ mu sync.Mutex
+
+ // tx is the transmit queue.
+ tx tx
+
+ // workerStarted specifies whether the worker goroutine was started.
+ workerStarted bool
+}
+
+// New creates a new shared-memory-based endpoint. Buffers will be broken up
+// into buffers of "bufferSize" bytes.
+func New(mtu, bufferSize uint32, addr tcpip.LinkAddress, tx, rx QueueConfig) (stack.LinkEndpoint, error) {
+ e := &endpoint{
+ mtu: mtu,
+ bufferSize: bufferSize,
+ addr: addr,
+ }
+
+ if err := e.tx.init(bufferSize, &tx); err != nil {
+ return nil, err
+ }
+
+ if err := e.rx.init(bufferSize, &rx); err != nil {
+ e.tx.cleanup()
+ return nil, err
+ }
+
+ return e, nil
+}
+
+// Close frees all resources associated with the endpoint.
+func (e *endpoint) Close() {
+ // Tell dispatch goroutine to stop, then write to the eventfd so that
+ // it wakes up in case it's sleeping.
+ atomic.StoreUint32(&e.stopRequested, 1)
+ syscall.Write(e.rx.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Cleanup the queues inline if the worker hasn't started yet; we also
+ // know it won't start from now on because stopRequested is set to 1.
+ e.mu.Lock()
+ workerPresent := e.workerStarted
+ e.mu.Unlock()
+
+ if !workerPresent {
+ e.tx.cleanup()
+ e.rx.cleanup()
+ }
+}
+
+// Wait implements stack.LinkEndpoint.Wait. It waits until all workers have
+// stopped after a Close() call.
+func (e *endpoint) Wait() {
+ e.completed.Wait()
+}
+
+// Attach implements stack.LinkEndpoint.Attach. It launches the goroutine that
+// reads packets from the rx queue.
+func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ e.mu.Lock()
+ if !e.workerStarted && atomic.LoadUint32(&e.stopRequested) == 0 {
+ e.workerStarted = true
+ e.completed.Add(1)
+ // Link endpoints are not savable. When transportation endpoints
+ // are saved, they stop sending outgoing packets and all
+ // incoming packets are rejected.
+ go e.dispatchLoop(dispatcher) // S/R-SAFE: see above.
+ }
+ e.mu.Unlock()
+}
+
+// IsAttached implements stack.LinkEndpoint.IsAttached.
+func (e *endpoint) IsAttached() bool {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ return e.workerStarted
+}
+
+// MTU implements stack.LinkEndpoint.MTU. It returns the value initialized
+// during construction.
+func (e *endpoint) MTU() uint32 {
+ return e.mtu - header.EthernetMinimumSize
+}
+
+// Capabilities implements stack.LinkEndpoint.Capabilities.
+func (*endpoint) Capabilities() stack.LinkEndpointCapabilities {
+ return 0
+}
+
+// MaxHeaderLength implements stack.LinkEndpoint.MaxHeaderLength. It returns the
+// ethernet frame header size.
+func (*endpoint) MaxHeaderLength() uint16 {
+ return header.EthernetMinimumSize
+}
+
+// LinkAddress implements stack.LinkEndpoint.LinkAddress. It returns the local
+// link address.
+func (e *endpoint) LinkAddress() tcpip.LinkAddress {
+ return e.addr
+}
+
+// WritePacket writes outbound packets to the file descriptor. If it is not
+// currently writable, the packet is dropped.
+func (e *endpoint) WritePacket(r *stack.Route, _ *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) *tcpip.Error {
+ // Add the ethernet header here.
+ eth := header.Ethernet(pkt.Header.Prepend(header.EthernetMinimumSize))
+ pkt.LinkHeader = buffer.View(eth)
+ ethHdr := &header.EthernetFields{
+ DstAddr: r.RemoteLinkAddress,
+ Type: protocol,
+ }
+ if r.LocalLinkAddress != "" {
+ ethHdr.SrcAddr = r.LocalLinkAddress
+ } else {
+ ethHdr.SrcAddr = e.addr
+ }
+ eth.Encode(ethHdr)
+
+ v := pkt.Data.ToView()
+ // Transmit the packet.
+ e.mu.Lock()
+ ok := e.tx.transmit(pkt.Header.View(), v)
+ e.mu.Unlock()
+
+ if !ok {
+ return tcpip.ErrWouldBlock
+ }
+
+ return nil
+}
+
+// WritePackets implements stack.LinkEndpoint.WritePackets.
+func (e *endpoint) WritePackets(r *stack.Route, _ *stack.GSO, pkts stack.PacketBufferList, protocol tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ panic("not implemented")
+}
+
+// WriteRawPacket implements stack.LinkEndpoint.WriteRawPacket.
+func (e *endpoint) WriteRawPacket(vv buffer.VectorisedView) *tcpip.Error {
+ v := vv.ToView()
+ // Transmit the packet.
+ e.mu.Lock()
+ ok := e.tx.transmit(v, buffer.View{})
+ e.mu.Unlock()
+
+ if !ok {
+ return tcpip.ErrWouldBlock
+ }
+
+ return nil
+}
+
+// dispatchLoop reads packets from the rx queue in a loop and dispatches them
+// to the network stack.
+func (e *endpoint) dispatchLoop(d stack.NetworkDispatcher) {
+ // Post initial set of buffers.
+ limit := e.rx.q.PostedBuffersLimit()
+ if l := uint64(len(e.rx.data)) / uint64(e.bufferSize); limit > l {
+ limit = l
+ }
+ for i := uint64(0); i < limit; i++ {
+ b := queue.RxBuffer{
+ Offset: i * uint64(e.bufferSize),
+ Size: e.bufferSize,
+ ID: i,
+ }
+ if !e.rx.q.PostBuffers([]queue.RxBuffer{b}) {
+ log.Warningf("Unable to post %v-th buffer", i)
+ }
+ }
+
+ // Read in a loop until a stop is requested.
+ var rxb []queue.RxBuffer
+ for atomic.LoadUint32(&e.stopRequested) == 0 {
+ var n uint32
+ rxb, n = e.rx.postAndReceive(rxb, &e.stopRequested)
+
+ // Copy data from the shared area to its own buffer, then
+ // prepare to repost the buffer.
+ b := make([]byte, n)
+ offset := uint32(0)
+ for i := range rxb {
+ copy(b[offset:], e.rx.data[rxb[i].Offset:][:rxb[i].Size])
+ offset += rxb[i].Size
+
+ rxb[i].Size = e.bufferSize
+ }
+
+ if n < header.EthernetMinimumSize {
+ continue
+ }
+
+ // Send packet up the stack.
+ eth := header.Ethernet(b[:header.EthernetMinimumSize])
+ d.DeliverNetworkPacket(eth.SourceAddress(), eth.DestinationAddress(), eth.Type(), &stack.PacketBuffer{
+ Data: buffer.View(b[header.EthernetMinimumSize:]).ToVectorisedView(),
+ LinkHeader: buffer.View(eth),
+ })
+ }
+
+ // Clean state.
+ e.tx.cleanup()
+ e.rx.cleanup()
+
+ e.completed.Done()
+}
diff --git a/pkg/tcpip/link/sharedmem/sharedmem_test.go b/pkg/tcpip/link/sharedmem/sharedmem_test.go
new file mode 100644
index 000000000..28a2e88ba
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/sharedmem_test.go
@@ -0,0 +1,812 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux
+
+package sharedmem
+
+import (
+ "bytes"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "strings"
+ "syscall"
+ "testing"
+ "time"
+
+ "gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/header"
+ "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
+ "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+const (
+ localLinkAddr = "\xde\xad\xbe\xef\x56\x78"
+ remoteLinkAddr = "\xde\xad\xbe\xef\x12\x34"
+
+ queueDataSize = 1024 * 1024
+ queuePipeSize = 4096
+)
+
+type queueBuffers struct {
+ data []byte
+ rx pipe.Tx
+ tx pipe.Rx
+}
+
+func initQueue(t *testing.T, q *queueBuffers, c *QueueConfig) {
+ // Prepare tx pipe.
+ b, err := getBuffer(c.TxPipeFD)
+ if err != nil {
+ t.Fatalf("getBuffer failed: %v", err)
+ }
+ q.tx.Init(b)
+
+ // Prepare rx pipe.
+ b, err = getBuffer(c.RxPipeFD)
+ if err != nil {
+ t.Fatalf("getBuffer failed: %v", err)
+ }
+ q.rx.Init(b)
+
+ // Get data slice.
+ q.data, err = getBuffer(c.DataFD)
+ if err != nil {
+ t.Fatalf("getBuffer failed: %v", err)
+ }
+}
+
+func (q *queueBuffers) cleanup() {
+ syscall.Munmap(q.tx.Bytes())
+ syscall.Munmap(q.rx.Bytes())
+ syscall.Munmap(q.data)
+}
+
+type packetInfo struct {
+ addr tcpip.LinkAddress
+ proto tcpip.NetworkProtocolNumber
+ vv buffer.VectorisedView
+ linkHeader buffer.View
+}
+
+type testContext struct {
+ t *testing.T
+ ep *endpoint
+ txCfg QueueConfig
+ rxCfg QueueConfig
+ txq queueBuffers
+ rxq queueBuffers
+
+ packetCh chan struct{}
+ mu sync.Mutex
+ packets []packetInfo
+}
+
+func newTestContext(t *testing.T, mtu, bufferSize uint32, addr tcpip.LinkAddress) *testContext {
+ var err error
+ c := &testContext{
+ t: t,
+ packetCh: make(chan struct{}, 1000000),
+ }
+ c.txCfg = createQueueFDs(t, queueSizes{
+ dataSize: queueDataSize,
+ txPipeSize: queuePipeSize,
+ rxPipeSize: queuePipeSize,
+ sharedDataSize: 4096,
+ })
+
+ c.rxCfg = createQueueFDs(t, queueSizes{
+ dataSize: queueDataSize,
+ txPipeSize: queuePipeSize,
+ rxPipeSize: queuePipeSize,
+ sharedDataSize: 4096,
+ })
+
+ initQueue(t, &c.txq, &c.txCfg)
+ initQueue(t, &c.rxq, &c.rxCfg)
+
+ ep, err := New(mtu, bufferSize, addr, c.txCfg, c.rxCfg)
+ if err != nil {
+ t.Fatalf("New failed: %v", err)
+ }
+
+ c.ep = ep.(*endpoint)
+ c.ep.Attach(c)
+
+ return c
+}
+
+func (c *testContext) DeliverNetworkPacket(remoteLinkAddr, localLinkAddr tcpip.LinkAddress, proto tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
+ c.mu.Lock()
+ c.packets = append(c.packets, packetInfo{
+ addr: remoteLinkAddr,
+ proto: proto,
+ vv: pkt.Data.Clone(nil),
+ })
+ c.mu.Unlock()
+
+ c.packetCh <- struct{}{}
+}
+
+func (c *testContext) cleanup() {
+ c.ep.Close()
+ closeFDs(&c.txCfg)
+ closeFDs(&c.rxCfg)
+ c.txq.cleanup()
+ c.rxq.cleanup()
+}
+
+func (c *testContext) waitForPackets(n int, to <-chan time.Time, errorStr string) {
+ for i := 0; i < n; i++ {
+ select {
+ case <-c.packetCh:
+ case <-to:
+ c.t.Fatalf(errorStr)
+ }
+ }
+}
+
+func (c *testContext) pushRxCompletion(size uint32, bs []queue.RxBuffer) {
+ b := c.rxq.rx.Push(queue.RxCompletionSize(len(bs)))
+ queue.EncodeRxCompletion(b, size, 0)
+ for i := range bs {
+ queue.EncodeRxCompletionBuffer(b, i, queue.RxBuffer{
+ Offset: bs[i].Offset,
+ Size: bs[i].Size,
+ ID: bs[i].ID,
+ })
+ }
+}
+
+func randomFill(b []byte) {
+ for i := range b {
+ b[i] = byte(rand.Intn(256))
+ }
+}
+
+func shuffle(b []int) {
+ for i := len(b) - 1; i >= 0; i-- {
+ j := rand.Intn(i + 1)
+ b[i], b[j] = b[j], b[i]
+ }
+}
+
+func createFile(t *testing.T, size int64, initQueue bool) int {
+ tmpDir := os.Getenv("TEST_TMPDIR")
+ if tmpDir == "" {
+ tmpDir = os.Getenv("TMPDIR")
+ }
+ f, err := ioutil.TempFile(tmpDir, "sharedmem_test")
+ if err != nil {
+ t.Fatalf("TempFile failed: %v", err)
+ }
+ defer f.Close()
+ syscall.Unlink(f.Name())
+
+ if initQueue {
+ // Write the "slot-free" flag in the initial queue.
+ _, err := f.WriteAt([]byte{0, 0, 0, 0, 0, 0, 0, 0x80}, 0)
+ if err != nil {
+ t.Fatalf("WriteAt failed: %v", err)
+ }
+ }
+
+ fd, err := syscall.Dup(int(f.Fd()))
+ if err != nil {
+ t.Fatalf("Dup failed: %v", err)
+ }
+
+ if err := syscall.Ftruncate(fd, size); err != nil {
+ syscall.Close(fd)
+ t.Fatalf("Ftruncate failed: %v", err)
+ }
+
+ return fd
+}
+
+func closeFDs(c *QueueConfig) {
+ syscall.Close(c.DataFD)
+ syscall.Close(c.EventFD)
+ syscall.Close(c.TxPipeFD)
+ syscall.Close(c.RxPipeFD)
+ syscall.Close(c.SharedDataFD)
+}
+
+type queueSizes struct {
+ dataSize int64
+ txPipeSize int64
+ rxPipeSize int64
+ sharedDataSize int64
+}
+
+func createQueueFDs(t *testing.T, s queueSizes) QueueConfig {
+ fd, _, err := syscall.RawSyscall(syscall.SYS_EVENTFD2, 0, 0, 0)
+ if err != 0 {
+ t.Fatalf("eventfd failed: %v", error(err))
+ }
+
+ return QueueConfig{
+ EventFD: int(fd),
+ DataFD: createFile(t, s.dataSize, false),
+ TxPipeFD: createFile(t, s.txPipeSize, true),
+ RxPipeFD: createFile(t, s.rxPipeSize, true),
+ SharedDataFD: createFile(t, s.sharedDataSize, false),
+ }
+}
+
+// TestSimpleSend sends 1000 packets with random header and payload sizes,
+// then checks that the right payload is received on the shared memory queues.
+func TestSimpleSend(t *testing.T) {
+ c := newTestContext(t, 20000, 1500, localLinkAddr)
+ defer c.cleanup()
+
+ // Prepare route.
+ r := stack.Route{
+ RemoteLinkAddress: remoteLinkAddr,
+ }
+
+ for iters := 1000; iters > 0; iters-- {
+ func() {
+ // Prepare and send packet.
+ n := rand.Intn(10000)
+ hdr := buffer.NewPrependable(n + int(c.ep.MaxHeaderLength()))
+ hdrBuf := hdr.Prepend(n)
+ randomFill(hdrBuf)
+
+ n = rand.Intn(10000)
+ buf := buffer.NewView(n)
+ randomFill(buf)
+
+ proto := tcpip.NetworkProtocolNumber(rand.Intn(0x10000))
+ if err := c.ep.WritePacket(&r, nil /* gso */, proto, &stack.PacketBuffer{
+ Header: hdr,
+ Data: buf.ToVectorisedView(),
+ }); err != nil {
+ t.Fatalf("WritePacket failed: %v", err)
+ }
+
+ // Receive packet.
+ desc := c.txq.tx.Pull()
+ pi := queue.DecodeTxPacketHeader(desc)
+ if pi.Reserved != 0 {
+ t.Fatalf("Reserved value is non-zero: 0x%x", pi.Reserved)
+ }
+ contents := make([]byte, 0, pi.Size)
+ for i := 0; i < pi.BufferCount; i++ {
+ bi := queue.DecodeTxBufferHeader(desc, i)
+ contents = append(contents, c.txq.data[bi.Offset:][:bi.Size]...)
+ }
+ c.txq.tx.Flush()
+
+ defer func() {
+ // Tell the endpoint about the completion of the write.
+ b := c.txq.rx.Push(8)
+ queue.EncodeTxCompletion(b, pi.ID)
+ c.txq.rx.Flush()
+ }()
+
+ // Check the ethernet header.
+ ethTemplate := make(header.Ethernet, header.EthernetMinimumSize)
+ ethTemplate.Encode(&header.EthernetFields{
+ SrcAddr: localLinkAddr,
+ DstAddr: remoteLinkAddr,
+ Type: proto,
+ })
+ if got := contents[:header.EthernetMinimumSize]; !bytes.Equal(got, []byte(ethTemplate)) {
+ t.Fatalf("Bad ethernet header in packet: got %x, want %x", got, ethTemplate)
+ }
+
+ // Compare contents skipping the ethernet header added by the
+ // endpoint.
+ merged := append(hdrBuf, buf...)
+ if uint32(len(contents)) < pi.Size {
+ t.Fatalf("Sum of buffers is less than packet size: %v < %v", len(contents), pi.Size)
+ }
+ contents = contents[:pi.Size][header.EthernetMinimumSize:]
+
+ if !bytes.Equal(contents, merged) {
+ t.Fatalf("Buffers are different: got %x (%v bytes), want %x (%v bytes)", contents, len(contents), merged, len(merged))
+ }
+ }()
+ }
+}
+
+// TestPreserveSrcAddressInSend calls WritePacket once with LocalLinkAddress
+// set in Route (using much of the same code as TestSimpleSend), then checks
+// that the encoded ethernet header received includes the correct SrcAddr.
+func TestPreserveSrcAddressInSend(t *testing.T) {
+ c := newTestContext(t, 20000, 1500, localLinkAddr)
+ defer c.cleanup()
+
+ newLocalLinkAddress := tcpip.LinkAddress(strings.Repeat("0xFE", 6))
+ // Set both remote and local link address in route.
+ r := stack.Route{
+ RemoteLinkAddress: remoteLinkAddr,
+ LocalLinkAddress: newLocalLinkAddress,
+ }
+
+ // WritePacket panics given a prependable with anything less than
+ // the minimum size of the ethernet header.
+ hdr := buffer.NewPrependable(header.EthernetMinimumSize)
+
+ proto := tcpip.NetworkProtocolNumber(rand.Intn(0x10000))
+ if err := c.ep.WritePacket(&r, nil /* gso */, proto, &stack.PacketBuffer{
+ Header: hdr,
+ }); err != nil {
+ t.Fatalf("WritePacket failed: %v", err)
+ }
+
+ // Receive packet.
+ desc := c.txq.tx.Pull()
+ pi := queue.DecodeTxPacketHeader(desc)
+ if pi.Reserved != 0 {
+ t.Fatalf("Reserved value is non-zero: 0x%x", pi.Reserved)
+ }
+ contents := make([]byte, 0, pi.Size)
+ for i := 0; i < pi.BufferCount; i++ {
+ bi := queue.DecodeTxBufferHeader(desc, i)
+ contents = append(contents, c.txq.data[bi.Offset:][:bi.Size]...)
+ }
+ c.txq.tx.Flush()
+
+ defer func() {
+ // Tell the endpoint about the completion of the write.
+ b := c.txq.rx.Push(8)
+ queue.EncodeTxCompletion(b, pi.ID)
+ c.txq.rx.Flush()
+ }()
+
+ // Check that the ethernet header contains the expected SrcAddr.
+ ethTemplate := make(header.Ethernet, header.EthernetMinimumSize)
+ ethTemplate.Encode(&header.EthernetFields{
+ SrcAddr: newLocalLinkAddress,
+ DstAddr: remoteLinkAddr,
+ Type: proto,
+ })
+ if got := contents[:header.EthernetMinimumSize]; !bytes.Equal(got, []byte(ethTemplate)) {
+ t.Fatalf("Bad ethernet header in packet: got %x, want %x", got, ethTemplate)
+ }
+}
+
+// TestFillTxQueue sends packets until the queue is full.
+func TestFillTxQueue(t *testing.T) {
+ c := newTestContext(t, 20000, 1500, localLinkAddr)
+ defer c.cleanup()
+
+ // Prepare to send a packet.
+ r := stack.Route{
+ RemoteLinkAddress: remoteLinkAddr,
+ }
+
+ buf := buffer.NewView(100)
+
+ // Each packet is uses no more than 40 bytes, so write that many packets
+ // until the tx queue if full.
+ ids := make(map[uint64]struct{})
+ for i := queuePipeSize / 40; i > 0; i-- {
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+
+ if err := c.ep.WritePacket(&r, nil /* gso */, header.IPv4ProtocolNumber, &stack.PacketBuffer{
+ Header: hdr,
+ Data: buf.ToVectorisedView(),
+ }); err != nil {
+ t.Fatalf("WritePacket failed unexpectedly: %v", err)
+ }
+
+ // Check that they have different IDs.
+ desc := c.txq.tx.Pull()
+ pi := queue.DecodeTxPacketHeader(desc)
+ if _, ok := ids[pi.ID]; ok {
+ t.Fatalf("ID (%v) reused", pi.ID)
+ }
+ ids[pi.ID] = struct{}{}
+ }
+
+ // Next attempt to write must fail.
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&r, nil /* gso */, header.IPv4ProtocolNumber, &stack.PacketBuffer{
+ Header: hdr,
+ Data: buf.ToVectorisedView(),
+ }); err != want {
+ t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
+ }
+}
+
+// TestFillTxQueueAfterBadCompletion sends a bad completion, then sends packets
+// until the queue is full.
+func TestFillTxQueueAfterBadCompletion(t *testing.T) {
+ c := newTestContext(t, 20000, 1500, localLinkAddr)
+ defer c.cleanup()
+
+ // Send a bad completion.
+ queue.EncodeTxCompletion(c.txq.rx.Push(8), 1)
+ c.txq.rx.Flush()
+
+ // Prepare to send a packet.
+ r := stack.Route{
+ RemoteLinkAddress: remoteLinkAddr,
+ }
+
+ buf := buffer.NewView(100)
+
+ // Send two packets so that the id slice has at least two slots.
+ for i := 2; i > 0; i-- {
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ if err := c.ep.WritePacket(&r, nil /* gso */, header.IPv4ProtocolNumber, &stack.PacketBuffer{
+ Header: hdr,
+ Data: buf.ToVectorisedView(),
+ }); err != nil {
+ t.Fatalf("WritePacket failed unexpectedly: %v", err)
+ }
+ }
+
+ // Complete the two writes twice.
+ for i := 2; i > 0; i-- {
+ pi := queue.DecodeTxPacketHeader(c.txq.tx.Pull())
+
+ queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID)
+ queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID)
+ c.txq.rx.Flush()
+ }
+ c.txq.tx.Flush()
+
+ // Each packet is uses no more than 40 bytes, so write that many packets
+ // until the tx queue if full.
+ ids := make(map[uint64]struct{})
+ for i := queuePipeSize / 40; i > 0; i-- {
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ if err := c.ep.WritePacket(&r, nil /* gso */, header.IPv4ProtocolNumber, &stack.PacketBuffer{
+ Header: hdr,
+ Data: buf.ToVectorisedView(),
+ }); err != nil {
+ t.Fatalf("WritePacket failed unexpectedly: %v", err)
+ }
+
+ // Check that they have different IDs.
+ desc := c.txq.tx.Pull()
+ pi := queue.DecodeTxPacketHeader(desc)
+ if _, ok := ids[pi.ID]; ok {
+ t.Fatalf("ID (%v) reused", pi.ID)
+ }
+ ids[pi.ID] = struct{}{}
+ }
+
+ // Next attempt to write must fail.
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&r, nil /* gso */, header.IPv4ProtocolNumber, &stack.PacketBuffer{
+ Header: hdr,
+ Data: buf.ToVectorisedView(),
+ }); err != want {
+ t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
+ }
+}
+
+// TestFillTxMemory sends packets until the we run out of shared memory.
+func TestFillTxMemory(t *testing.T) {
+ const bufferSize = 1500
+ c := newTestContext(t, 20000, bufferSize, localLinkAddr)
+ defer c.cleanup()
+
+ // Prepare to send a packet.
+ r := stack.Route{
+ RemoteLinkAddress: remoteLinkAddr,
+ }
+
+ buf := buffer.NewView(100)
+
+ // Each packet is uses up one buffer, so write as many as possible until
+ // we fill the memory.
+ ids := make(map[uint64]struct{})
+ for i := queueDataSize / bufferSize; i > 0; i-- {
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ if err := c.ep.WritePacket(&r, nil /* gso */, header.IPv4ProtocolNumber, &stack.PacketBuffer{
+ Header: hdr,
+ Data: buf.ToVectorisedView(),
+ }); err != nil {
+ t.Fatalf("WritePacket failed unexpectedly: %v", err)
+ }
+
+ // Check that they have different IDs.
+ desc := c.txq.tx.Pull()
+ pi := queue.DecodeTxPacketHeader(desc)
+ if _, ok := ids[pi.ID]; ok {
+ t.Fatalf("ID (%v) reused", pi.ID)
+ }
+ ids[pi.ID] = struct{}{}
+ c.txq.tx.Flush()
+ }
+
+ // Next attempt to write must fail.
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ err := c.ep.WritePacket(&r, nil /* gso */, header.IPv4ProtocolNumber, &stack.PacketBuffer{
+ Header: hdr,
+ Data: buf.ToVectorisedView(),
+ })
+ if want := tcpip.ErrWouldBlock; err != want {
+ t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
+ }
+}
+
+// TestFillTxMemoryWithMultiBuffer sends packets until the we run out of
+// shared memory for a 2-buffer packet, but still with room for a 1-buffer
+// packet.
+func TestFillTxMemoryWithMultiBuffer(t *testing.T) {
+ const bufferSize = 1500
+ c := newTestContext(t, 20000, bufferSize, localLinkAddr)
+ defer c.cleanup()
+
+ // Prepare to send a packet.
+ r := stack.Route{
+ RemoteLinkAddress: remoteLinkAddr,
+ }
+
+ buf := buffer.NewView(100)
+
+ // Each packet is uses up one buffer, so write as many as possible
+ // until there is only one buffer left.
+ for i := queueDataSize/bufferSize - 1; i > 0; i-- {
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ if err := c.ep.WritePacket(&r, nil /* gso */, header.IPv4ProtocolNumber, &stack.PacketBuffer{
+ Header: hdr,
+ Data: buf.ToVectorisedView(),
+ }); err != nil {
+ t.Fatalf("WritePacket failed unexpectedly: %v", err)
+ }
+
+ // Pull the posted buffer.
+ c.txq.tx.Pull()
+ c.txq.tx.Flush()
+ }
+
+ // Attempt to write a two-buffer packet. It must fail.
+ {
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ uu := buffer.NewView(bufferSize).ToVectorisedView()
+ if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&r, nil /* gso */, header.IPv4ProtocolNumber, &stack.PacketBuffer{
+ Header: hdr,
+ Data: uu,
+ }); err != want {
+ t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
+ }
+ }
+
+ // Attempt to write the one-buffer packet again. It must succeed.
+ {
+ hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
+ if err := c.ep.WritePacket(&r, nil /* gso */, header.IPv4ProtocolNumber, &stack.PacketBuffer{
+ Header: hdr,
+ Data: buf.ToVectorisedView(),
+ }); err != nil {
+ t.Fatalf("WritePacket failed unexpectedly: %v", err)
+ }
+ }
+}
+
+func pollPull(t *testing.T, p *pipe.Rx, to <-chan time.Time, errStr string) []byte {
+ t.Helper()
+
+ for {
+ b := p.Pull()
+ if b != nil {
+ return b
+ }
+
+ select {
+ case <-time.After(10 * time.Millisecond):
+ case <-to:
+ t.Fatal(errStr)
+ }
+ }
+}
+
+// TestSimpleReceive completes 1000 different receives with random payload and
+// random number of buffers. It checks that the contents match the expected
+// values.
+func TestSimpleReceive(t *testing.T) {
+ const bufferSize = 1500
+ c := newTestContext(t, 20000, bufferSize, localLinkAddr)
+ defer c.cleanup()
+
+ // Check that buffers have been posted.
+ limit := c.ep.rx.q.PostedBuffersLimit()
+ for i := uint64(0); i < limit; i++ {
+ timeout := time.After(2 * time.Second)
+ bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers to be posted"))
+
+ if want := i * bufferSize; want != bi.Offset {
+ t.Fatalf("Bad posted offset: got %v, want %v", bi.Offset, want)
+ }
+
+ if want := i; want != bi.ID {
+ t.Fatalf("Bad posted ID: got %v, want %v", bi.ID, want)
+ }
+
+ if bufferSize != bi.Size {
+ t.Fatalf("Bad posted bufferSize: got %v, want %v", bi.Size, bufferSize)
+ }
+ }
+ c.rxq.tx.Flush()
+
+ // Create a slice with the indices 0..limit-1.
+ idx := make([]int, limit)
+ for i := range idx {
+ idx[i] = i
+ }
+
+ // Complete random packets 1000 times.
+ for iters := 1000; iters > 0; iters-- {
+ timeout := time.After(2 * time.Second)
+ // Prepare a random packet.
+ shuffle(idx)
+ n := 1 + rand.Intn(10)
+ bufs := make([]queue.RxBuffer, n)
+ contents := make([]byte, bufferSize*n-rand.Intn(500))
+ randomFill(contents)
+ for i := range bufs {
+ j := idx[i]
+ bufs[i].Size = bufferSize
+ bufs[i].Offset = uint64(bufferSize * j)
+ bufs[i].ID = uint64(j)
+
+ copy(c.rxq.data[bufs[i].Offset:][:bufferSize], contents[i*bufferSize:])
+ }
+
+ // Push completion.
+ c.pushRxCompletion(uint32(len(contents)), bufs)
+ c.rxq.rx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Wait for packet to be received, then check it.
+ c.waitForPackets(1, time.After(5*time.Second), "Timeout waiting for packet")
+ c.mu.Lock()
+ rcvd := []byte(c.packets[0].vv.ToView())
+ c.packets = c.packets[:0]
+ c.mu.Unlock()
+
+ if contents := contents[header.EthernetMinimumSize:]; !bytes.Equal(contents, rcvd) {
+ t.Fatalf("Unexpected buffer contents: got %x, want %x", rcvd, contents)
+ }
+
+ // Check that buffers have been reposted.
+ for i := range bufs {
+ bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffers to be reposted"))
+ if bi != bufs[i] {
+ t.Fatalf("Unexpected buffer reposted: got %x, want %x", bi, bufs[i])
+ }
+ }
+ c.rxq.tx.Flush()
+ }
+}
+
+// TestRxBuffersReposted tests that rx buffers get reposted after they have been
+// completed.
+func TestRxBuffersReposted(t *testing.T) {
+ const bufferSize = 1500
+ c := newTestContext(t, 20000, bufferSize, localLinkAddr)
+ defer c.cleanup()
+
+ // Receive all posted buffers.
+ limit := c.ep.rx.q.PostedBuffersLimit()
+ buffers := make([]queue.RxBuffer, 0, limit)
+ for i := limit; i > 0; i-- {
+ timeout := time.After(2 * time.Second)
+ buffers = append(buffers, queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers")))
+ }
+ c.rxq.tx.Flush()
+
+ // Check that all buffers are reposted when individually completed.
+ for i := range buffers {
+ timeout := time.After(2 * time.Second)
+ // Complete the buffer.
+ c.pushRxCompletion(buffers[i].Size, buffers[i:][:1])
+ c.rxq.rx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Wait for it to be reposted.
+ bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted"))
+ if bi != buffers[i] {
+ t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[i])
+ }
+ }
+ c.rxq.tx.Flush()
+
+ // Check that all buffers are reposted when completed in pairs.
+ for i := 0; i < len(buffers)/2; i++ {
+ timeout := time.After(2 * time.Second)
+ // Complete with two buffers.
+ c.pushRxCompletion(2*bufferSize, buffers[2*i:][:2])
+ c.rxq.rx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Wait for them to be reposted.
+ for j := 0; j < 2; j++ {
+ bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted"))
+ if bi != buffers[2*i+j] {
+ t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[2*i+j])
+ }
+ }
+ }
+ c.rxq.tx.Flush()
+}
+
+// TestReceivePostingIsFull checks that the endpoint will properly handle the
+// case when a received buffer cannot be immediately reposted because it hasn't
+// been pulled from the tx pipe yet.
+func TestReceivePostingIsFull(t *testing.T) {
+ const bufferSize = 1500
+ c := newTestContext(t, 20000, bufferSize, localLinkAddr)
+ defer c.cleanup()
+
+ // Complete first posted buffer before flushing it from the tx pipe.
+ first := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for first buffer to be posted"))
+ c.pushRxCompletion(first.Size, []queue.RxBuffer{first})
+ c.rxq.rx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Check that packet is received.
+ c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
+
+ // Complete another buffer.
+ second := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for second buffer to be posted"))
+ c.pushRxCompletion(second.Size, []queue.RxBuffer{second})
+ c.rxq.rx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Check that no packet is received yet, as the worker is blocked trying
+ // to repost.
+ select {
+ case <-time.After(500 * time.Millisecond):
+ case <-c.packetCh:
+ t.Fatalf("Unexpected packet received")
+ }
+
+ // Flush tx queue, which will allow the first buffer to be reposted,
+ // and the second completion to be pulled.
+ c.rxq.tx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Check that second packet completes.
+ c.waitForPackets(1, time.After(time.Second), "Timeout waiting for second completed packet")
+}
+
+// TestCloseWhileWaitingToPost closes the endpoint while it is waiting to
+// repost a buffer. Make sure it backs out.
+func TestCloseWhileWaitingToPost(t *testing.T) {
+ const bufferSize = 1500
+ c := newTestContext(t, 20000, bufferSize, localLinkAddr)
+ cleaned := false
+ defer func() {
+ if !cleaned {
+ c.cleanup()
+ }
+ }()
+
+ // Complete first posted buffer before flushing it from the tx pipe.
+ bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for initial buffer to be posted"))
+ c.pushRxCompletion(bi.Size, []queue.RxBuffer{bi})
+ c.rxq.rx.Flush()
+ syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+
+ // Wait for packet to be indicated.
+ c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
+
+ // Cleanup and wait for worker to complete.
+ c.cleanup()
+ cleaned = true
+ c.ep.Wait()
+}
diff --git a/pkg/tcpip/link/sharedmem/sharedmem_unsafe.go b/pkg/tcpip/link/sharedmem/sharedmem_unsafe.go
new file mode 100644
index 000000000..f7e816a41
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/sharedmem_unsafe.go
@@ -0,0 +1,25 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sharedmem
+
+import (
+ "unsafe"
+)
+
+// sharedDataPointer converts the shared data slice into a pointer so that it
+// can be used in atomic operations.
+func sharedDataPointer(sharedData []byte) *uint32 {
+ return (*uint32)(unsafe.Pointer(&sharedData[0:4][0]))
+}
diff --git a/pkg/tcpip/link/sharedmem/tx.go b/pkg/tcpip/link/sharedmem/tx.go
new file mode 100644
index 000000000..6b8d7859d
--- /dev/null
+++ b/pkg/tcpip/link/sharedmem/tx.go
@@ -0,0 +1,272 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sharedmem
+
+import (
+ "math"
+ "syscall"
+
+ "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
+)
+
+const (
+ nilID = math.MaxUint64
+)
+
+// tx holds all state associated with a tx queue.
+type tx struct {
+ data []byte
+ q queue.Tx
+ ids idManager
+ bufs bufferManager
+}
+
+// init initializes all state needed by the tx queue based on the information
+// provided.
+//
+// The caller always retains ownership of all file descriptors passed in. The
+// queue implementation will duplicate any that it may need in the future.
+func (t *tx) init(mtu uint32, c *QueueConfig) error {
+ // Map in all buffers.
+ txPipe, err := getBuffer(c.TxPipeFD)
+ if err != nil {
+ return err
+ }
+
+ rxPipe, err := getBuffer(c.RxPipeFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ return err
+ }
+
+ data, err := getBuffer(c.DataFD)
+ if err != nil {
+ syscall.Munmap(txPipe)
+ syscall.Munmap(rxPipe)
+ return err
+ }
+
+ // Initialize state based on buffers.
+ t.q.Init(txPipe, rxPipe)
+ t.ids.init()
+ t.bufs.init(0, len(data), int(mtu))
+ t.data = data
+
+ return nil
+}
+
+// cleanup releases all resources allocated during init(). It must only be
+// called if init() has previously succeeded.
+func (t *tx) cleanup() {
+ a, b := t.q.Bytes()
+ syscall.Munmap(a)
+ syscall.Munmap(b)
+ syscall.Munmap(t.data)
+}
+
+// transmit sends a packet made up of up to two buffers. Returns a boolean that
+// specifies whether the packet was successfully transmitted.
+func (t *tx) transmit(a, b []byte) bool {
+ // Pull completions from the tx queue and add their buffers back to the
+ // pool so that we can reuse them.
+ for {
+ id, ok := t.q.CompletedPacket()
+ if !ok {
+ break
+ }
+
+ if buf := t.ids.remove(id); buf != nil {
+ t.bufs.free(buf)
+ }
+ }
+
+ bSize := t.bufs.entrySize
+ total := uint32(len(a) + len(b))
+ bufCount := (total + bSize - 1) / bSize
+
+ // Allocate enough buffers to hold all the data.
+ var buf *queue.TxBuffer
+ for i := bufCount; i != 0; i-- {
+ b := t.bufs.alloc()
+ if b == nil {
+ // Failed to get all buffers. Return to the pool
+ // whatever we had managed to get.
+ if buf != nil {
+ t.bufs.free(buf)
+ }
+ return false
+ }
+ b.Next = buf
+ buf = b
+ }
+
+ // Copy data into allocated buffers.
+ nBuf := buf
+ var dBuf []byte
+ for _, data := range [][]byte{a, b} {
+ for len(data) > 0 {
+ if len(dBuf) == 0 {
+ dBuf = t.data[nBuf.Offset:][:nBuf.Size]
+ nBuf = nBuf.Next
+ }
+ n := copy(dBuf, data)
+ data = data[n:]
+ dBuf = dBuf[n:]
+ }
+ }
+
+ // Get an id for this packet and send it out.
+ id := t.ids.add(buf)
+ if !t.q.Enqueue(id, total, bufCount, buf) {
+ t.ids.remove(id)
+ t.bufs.free(buf)
+ return false
+ }
+
+ return true
+}
+
+// getBuffer returns a memory region mapped to the full contents of the given
+// file descriptor.
+func getBuffer(fd int) ([]byte, error) {
+ var s syscall.Stat_t
+ if err := syscall.Fstat(fd, &s); err != nil {
+ return nil, err
+ }
+
+ // Check that size doesn't overflow an int.
+ if s.Size > int64(^uint(0)>>1) {
+ return nil, syscall.EDOM
+ }
+
+ return syscall.Mmap(fd, 0, int(s.Size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED|syscall.MAP_FILE)
+}
+
+// idDescriptor is used by idManager to either point to a tx buffer (in case
+// the ID is assigned) or to the next free element (if the id is not assigned).
+type idDescriptor struct {
+ buf *queue.TxBuffer
+ nextFree uint64
+}
+
+// idManager is a manager of tx buffer identifiers. It assigns unique IDs to
+// tx buffers that are added to it; the IDs can only be reused after they have
+// been removed.
+//
+// The ID assignments are stored so that the tx buffers can be retrieved from
+// the IDs previously assigned to them.
+type idManager struct {
+ // ids is a slice containing all tx buffers. The ID is the index into
+ // this slice.
+ ids []idDescriptor
+
+ // freeList a list of free IDs.
+ freeList uint64
+}
+
+// init initializes the id manager.
+func (m *idManager) init() {
+ m.freeList = nilID
+}
+
+// add assigns an ID to the given tx buffer.
+func (m *idManager) add(b *queue.TxBuffer) uint64 {
+ if i := m.freeList; i != nilID {
+ // There is an id available in the free list, just use it.
+ m.ids[i].buf = b
+ m.freeList = m.ids[i].nextFree
+ return i
+ }
+
+ // We need to expand the id descriptor.
+ m.ids = append(m.ids, idDescriptor{buf: b})
+ return uint64(len(m.ids) - 1)
+}
+
+// remove retrieves the tx buffer associated with the given ID, and removes the
+// ID from the assigned table so that it can be reused in the future.
+func (m *idManager) remove(i uint64) *queue.TxBuffer {
+ if i >= uint64(len(m.ids)) {
+ return nil
+ }
+
+ desc := &m.ids[i]
+ b := desc.buf
+ if b == nil {
+ // The provided id is not currently assigned.
+ return nil
+ }
+
+ desc.buf = nil
+ desc.nextFree = m.freeList
+ m.freeList = i
+
+ return b
+}
+
+// bufferManager manages a buffer region broken up into smaller, equally sized
+// buffers. Smaller buffers can be allocated and freed.
+type bufferManager struct {
+ freeList *queue.TxBuffer
+ curOffset uint64
+ limit uint64
+ entrySize uint32
+}
+
+// init initializes the buffer manager.
+func (b *bufferManager) init(initialOffset, size, entrySize int) {
+ b.freeList = nil
+ b.curOffset = uint64(initialOffset)
+ b.limit = uint64(initialOffset + size/entrySize*entrySize)
+ b.entrySize = uint32(entrySize)
+}
+
+// alloc allocates a buffer from the manager, if one is available.
+func (b *bufferManager) alloc() *queue.TxBuffer {
+ if b.freeList != nil {
+ // There is a descriptor ready for reuse in the free list.
+ d := b.freeList
+ b.freeList = d.Next
+ d.Next = nil
+ return d
+ }
+
+ if b.curOffset < b.limit {
+ // There is room available in the never-used range, so create
+ // a new descriptor for it.
+ d := &queue.TxBuffer{
+ Offset: b.curOffset,
+ Size: b.entrySize,
+ }
+ b.curOffset += uint64(b.entrySize)
+ return d
+ }
+
+ return nil
+}
+
+// free returns all buffers in the list to the buffer manager so that they can
+// be reused.
+func (b *bufferManager) free(d *queue.TxBuffer) {
+ // Find the last buffer in the list.
+ last := d
+ for last.Next != nil {
+ last = last.Next
+ }
+
+ // Push list onto free list.
+ last.Next = b.freeList
+ b.freeList = d
+}
diff --git a/pkg/tcpip/link/sniffer/BUILD b/pkg/tcpip/link/sniffer/BUILD
new file mode 100644
index 000000000..7cbc305e7
--- /dev/null
+++ b/pkg/tcpip/link/sniffer/BUILD
@@ -0,0 +1,20 @@
+load("//tools:defs.bzl", "go_library")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "sniffer",
+ srcs = [
+ "pcap.go",
+ "sniffer.go",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/log",
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/header",
+ "//pkg/tcpip/link/nested",
+ "//pkg/tcpip/stack",
+ ],
+)
diff --git a/pkg/tcpip/link/sniffer/pcap.go b/pkg/tcpip/link/sniffer/pcap.go
new file mode 100644
index 000000000..c16c19647
--- /dev/null
+++ b/pkg/tcpip/link/sniffer/pcap.go
@@ -0,0 +1,66 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sniffer
+
+import "time"
+
+type pcapHeader struct {
+ // MagicNumber is the file magic number.
+ MagicNumber uint32
+
+ // VersionMajor is the major version number.
+ VersionMajor uint16
+
+ // VersionMinor is the minor version number.
+ VersionMinor uint16
+
+ // Thiszone is the GMT to local correction.
+ Thiszone int32
+
+ // Sigfigs is the accuracy of timestamps.
+ Sigfigs uint32
+
+ // Snaplen is the max length of captured packets, in octets.
+ Snaplen uint32
+
+ // Network is the data link type.
+ Network uint32
+}
+
+const pcapPacketHeaderLen = 16
+
+type pcapPacketHeader struct {
+ // Seconds is the timestamp seconds.
+ Seconds uint32
+
+ // Microseconds is the timestamp microseconds.
+ Microseconds uint32
+
+ // IncludedLength is the number of octets of packet saved in file.
+ IncludedLength uint32
+
+ // OriginalLength is the actual length of packet.
+ OriginalLength uint32
+}
+
+func newPCAPPacketHeader(incLen, orgLen uint32) pcapPacketHeader {
+ now := time.Now()
+ return pcapPacketHeader{
+ Seconds: uint32(now.Unix()),
+ Microseconds: uint32(now.Nanosecond() / 1000),
+ IncludedLength: incLen,
+ OriginalLength: orgLen,
+ }
+}
diff --git a/pkg/tcpip/link/sniffer/sniffer.go b/pkg/tcpip/link/sniffer/sniffer.go
new file mode 100644
index 000000000..d9cd4e83a
--- /dev/null
+++ b/pkg/tcpip/link/sniffer/sniffer.go
@@ -0,0 +1,394 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package sniffer provides the implementation of data-link layer endpoints that
+// wrap another endpoint and logs inbound and outbound packets.
+//
+// Sniffer endpoints can be used in the networking stack by calling New(eID) to
+// create a new endpoint, where eID is the ID of the endpoint being wrapped,
+// and then passing it as an argument to Stack.CreateNIC().
+package sniffer
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+ "sync/atomic"
+ "time"
+
+ "gvisor.dev/gvisor/pkg/log"
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/header"
+ "gvisor.dev/gvisor/pkg/tcpip/link/nested"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// LogPackets is a flag used to enable or disable packet logging via the log
+// package. Valid values are 0 or 1.
+//
+// LogPackets must be accessed atomically.
+var LogPackets uint32 = 1
+
+// LogPacketsToPCAP is a flag used to enable or disable logging packets to a
+// pcap writer. Valid values are 0 or 1. A writer must have been specified when the
+// sniffer was created for this flag to have effect.
+//
+// LogPacketsToPCAP must be accessed atomically.
+var LogPacketsToPCAP uint32 = 1
+
+type endpoint struct {
+ nested.Endpoint
+ writer io.Writer
+ maxPCAPLen uint32
+}
+
+var _ stack.GSOEndpoint = (*endpoint)(nil)
+var _ stack.LinkEndpoint = (*endpoint)(nil)
+var _ stack.NetworkDispatcher = (*endpoint)(nil)
+
+// New creates a new sniffer link-layer endpoint. It wraps around another
+// endpoint and logs packets and they traverse the endpoint.
+func New(lower stack.LinkEndpoint) stack.LinkEndpoint {
+ sniffer := &endpoint{}
+ sniffer.Endpoint.Init(lower, sniffer)
+ return sniffer
+}
+
+func zoneOffset() (int32, error) {
+ loc, err := time.LoadLocation("Local")
+ if err != nil {
+ return 0, err
+ }
+ date := time.Date(0, 0, 0, 0, 0, 0, 0, loc)
+ _, offset := date.Zone()
+ return int32(offset), nil
+}
+
+func writePCAPHeader(w io.Writer, maxLen uint32) error {
+ offset, err := zoneOffset()
+ if err != nil {
+ return err
+ }
+ return binary.Write(w, binary.BigEndian, pcapHeader{
+ // From https://wiki.wireshark.org/Development/LibpcapFileFormat
+ MagicNumber: 0xa1b2c3d4,
+
+ VersionMajor: 2,
+ VersionMinor: 4,
+ Thiszone: offset,
+ Sigfigs: 0,
+ Snaplen: maxLen,
+ Network: 101, // LINKTYPE_RAW
+ })
+}
+
+// NewWithWriter creates a new sniffer link-layer endpoint. It wraps around
+// another endpoint and logs packets as they traverse the endpoint.
+//
+// Packets are logged to writer in the pcap format. A sniffer created with this
+// function will not emit packets using the standard log package.
+//
+// snapLen is the maximum amount of a packet to be saved. Packets with a length
+// less than or equal to snapLen will be saved in their entirety. Longer
+// packets will be truncated to snapLen.
+func NewWithWriter(lower stack.LinkEndpoint, writer io.Writer, snapLen uint32) (stack.LinkEndpoint, error) {
+ if err := writePCAPHeader(writer, snapLen); err != nil {
+ return nil, err
+ }
+ sniffer := &endpoint{
+ writer: writer,
+ maxPCAPLen: snapLen,
+ }
+ sniffer.Endpoint.Init(lower, sniffer)
+ return sniffer, nil
+}
+
+// DeliverNetworkPacket implements the stack.NetworkDispatcher interface. It is
+// called by the link-layer endpoint being wrapped when a packet arrives, and
+// logs the packet before forwarding to the actual dispatcher.
+func (e *endpoint) DeliverNetworkPacket(remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
+ e.dumpPacket("recv", nil, protocol, pkt)
+ e.Endpoint.DeliverNetworkPacket(remote, local, protocol, pkt)
+}
+
+func (e *endpoint) dumpPacket(prefix string, gso *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
+ writer := e.writer
+ if writer == nil && atomic.LoadUint32(&LogPackets) == 1 {
+ logPacket(prefix, protocol, pkt, gso)
+ }
+ if writer != nil && atomic.LoadUint32(&LogPacketsToPCAP) == 1 {
+ totalLength := pkt.Header.UsedLength() + pkt.Data.Size()
+ length := totalLength
+ if max := int(e.maxPCAPLen); length > max {
+ length = max
+ }
+ if err := binary.Write(writer, binary.BigEndian, newPCAPPacketHeader(uint32(length), uint32(totalLength))); err != nil {
+ panic(err)
+ }
+ write := func(b []byte) {
+ if len(b) > length {
+ b = b[:length]
+ }
+ for len(b) != 0 {
+ n, err := writer.Write(b)
+ if err != nil {
+ panic(err)
+ }
+ b = b[n:]
+ length -= n
+ }
+ }
+ write(pkt.Header.View())
+ for _, view := range pkt.Data.Views() {
+ if length == 0 {
+ break
+ }
+ write(view)
+ }
+ }
+}
+
+// WritePacket implements the stack.LinkEndpoint interface. It is called by
+// higher-level protocols to write packets; it just logs the packet and
+// forwards the request to the lower endpoint.
+func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) *tcpip.Error {
+ e.dumpPacket("send", gso, protocol, pkt)
+ return e.Endpoint.WritePacket(r, gso, protocol, pkt)
+}
+
+// WritePackets implements the stack.LinkEndpoint interface. It is called by
+// higher-level protocols to write packets; it just logs the packet and
+// forwards the request to the lower endpoint.
+func (e *endpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.PacketBufferList, protocol tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ for pkt := pkts.Front(); pkt != nil; pkt = pkt.Next() {
+ e.dumpPacket("send", gso, protocol, pkt)
+ }
+ return e.Endpoint.WritePackets(r, gso, pkts, protocol)
+}
+
+// WriteRawPacket implements stack.LinkEndpoint.WriteRawPacket.
+func (e *endpoint) WriteRawPacket(vv buffer.VectorisedView) *tcpip.Error {
+ e.dumpPacket("send", nil, 0, &stack.PacketBuffer{
+ Data: vv,
+ })
+ return e.Endpoint.WriteRawPacket(vv)
+}
+
+func logPacket(prefix string, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer, gso *stack.GSO) {
+ // Figure out the network layer info.
+ var transProto uint8
+ src := tcpip.Address("unknown")
+ dst := tcpip.Address("unknown")
+ id := 0
+ size := uint16(0)
+ var fragmentOffset uint16
+ var moreFragments bool
+
+ // Create a clone of pkt, including any headers if present. Avoid allocating
+ // backing memory for the clone.
+ views := [8]buffer.View{}
+ vv := buffer.NewVectorisedView(0, views[:0])
+ vv.AppendView(pkt.Header.View())
+ vv.Append(pkt.Data)
+
+ switch protocol {
+ case header.IPv4ProtocolNumber:
+ hdr, ok := vv.PullUp(header.IPv4MinimumSize)
+ if !ok {
+ return
+ }
+ ipv4 := header.IPv4(hdr)
+ fragmentOffset = ipv4.FragmentOffset()
+ moreFragments = ipv4.Flags()&header.IPv4FlagMoreFragments == header.IPv4FlagMoreFragments
+ src = ipv4.SourceAddress()
+ dst = ipv4.DestinationAddress()
+ transProto = ipv4.Protocol()
+ size = ipv4.TotalLength() - uint16(ipv4.HeaderLength())
+ vv.TrimFront(int(ipv4.HeaderLength()))
+ id = int(ipv4.ID())
+
+ case header.IPv6ProtocolNumber:
+ hdr, ok := vv.PullUp(header.IPv6MinimumSize)
+ if !ok {
+ return
+ }
+ ipv6 := header.IPv6(hdr)
+ src = ipv6.SourceAddress()
+ dst = ipv6.DestinationAddress()
+ transProto = ipv6.NextHeader()
+ size = ipv6.PayloadLength()
+ vv.TrimFront(header.IPv6MinimumSize)
+
+ case header.ARPProtocolNumber:
+ hdr, ok := vv.PullUp(header.ARPSize)
+ if !ok {
+ return
+ }
+ vv.TrimFront(header.ARPSize)
+ arp := header.ARP(hdr)
+ log.Infof(
+ "%s arp %s (%s) -> %s (%s) valid:%t",
+ prefix,
+ tcpip.Address(arp.ProtocolAddressSender()), tcpip.LinkAddress(arp.HardwareAddressSender()),
+ tcpip.Address(arp.ProtocolAddressTarget()), tcpip.LinkAddress(arp.HardwareAddressTarget()),
+ arp.IsValid(),
+ )
+ return
+ default:
+ log.Infof("%s unknown network protocol", prefix)
+ return
+ }
+
+ // Figure out the transport layer info.
+ transName := "unknown"
+ srcPort := uint16(0)
+ dstPort := uint16(0)
+ details := ""
+ switch tcpip.TransportProtocolNumber(transProto) {
+ case header.ICMPv4ProtocolNumber:
+ transName = "icmp"
+ hdr, ok := vv.PullUp(header.ICMPv4MinimumSize)
+ if !ok {
+ break
+ }
+ icmp := header.ICMPv4(hdr)
+ icmpType := "unknown"
+ if fragmentOffset == 0 {
+ switch icmp.Type() {
+ case header.ICMPv4EchoReply:
+ icmpType = "echo reply"
+ case header.ICMPv4DstUnreachable:
+ icmpType = "destination unreachable"
+ case header.ICMPv4SrcQuench:
+ icmpType = "source quench"
+ case header.ICMPv4Redirect:
+ icmpType = "redirect"
+ case header.ICMPv4Echo:
+ icmpType = "echo"
+ case header.ICMPv4TimeExceeded:
+ icmpType = "time exceeded"
+ case header.ICMPv4ParamProblem:
+ icmpType = "param problem"
+ case header.ICMPv4Timestamp:
+ icmpType = "timestamp"
+ case header.ICMPv4TimestampReply:
+ icmpType = "timestamp reply"
+ case header.ICMPv4InfoRequest:
+ icmpType = "info request"
+ case header.ICMPv4InfoReply:
+ icmpType = "info reply"
+ }
+ }
+ log.Infof("%s %s %s -> %s %s len:%d id:%04x code:%d", prefix, transName, src, dst, icmpType, size, id, icmp.Code())
+ return
+
+ case header.ICMPv6ProtocolNumber:
+ transName = "icmp"
+ hdr, ok := vv.PullUp(header.ICMPv6MinimumSize)
+ if !ok {
+ break
+ }
+ icmp := header.ICMPv6(hdr)
+ icmpType := "unknown"
+ switch icmp.Type() {
+ case header.ICMPv6DstUnreachable:
+ icmpType = "destination unreachable"
+ case header.ICMPv6PacketTooBig:
+ icmpType = "packet too big"
+ case header.ICMPv6TimeExceeded:
+ icmpType = "time exceeded"
+ case header.ICMPv6ParamProblem:
+ icmpType = "param problem"
+ case header.ICMPv6EchoRequest:
+ icmpType = "echo request"
+ case header.ICMPv6EchoReply:
+ icmpType = "echo reply"
+ case header.ICMPv6RouterSolicit:
+ icmpType = "router solicit"
+ case header.ICMPv6RouterAdvert:
+ icmpType = "router advert"
+ case header.ICMPv6NeighborSolicit:
+ icmpType = "neighbor solicit"
+ case header.ICMPv6NeighborAdvert:
+ icmpType = "neighbor advert"
+ case header.ICMPv6RedirectMsg:
+ icmpType = "redirect message"
+ }
+ log.Infof("%s %s %s -> %s %s len:%d id:%04x code:%d", prefix, transName, src, dst, icmpType, size, id, icmp.Code())
+ return
+
+ case header.UDPProtocolNumber:
+ transName = "udp"
+ hdr, ok := vv.PullUp(header.UDPMinimumSize)
+ if !ok {
+ break
+ }
+ udp := header.UDP(hdr)
+ if fragmentOffset == 0 {
+ srcPort = udp.SourcePort()
+ dstPort = udp.DestinationPort()
+ details = fmt.Sprintf("xsum: 0x%x", udp.Checksum())
+ size -= header.UDPMinimumSize
+ }
+
+ case header.TCPProtocolNumber:
+ transName = "tcp"
+ hdr, ok := vv.PullUp(header.TCPMinimumSize)
+ if !ok {
+ break
+ }
+ tcp := header.TCP(hdr)
+ if fragmentOffset == 0 {
+ offset := int(tcp.DataOffset())
+ if offset < header.TCPMinimumSize {
+ details += fmt.Sprintf("invalid packet: tcp data offset too small %d", offset)
+ break
+ }
+ if offset > vv.Size() && !moreFragments {
+ details += fmt.Sprintf("invalid packet: tcp data offset %d larger than packet buffer length %d", offset, vv.Size())
+ break
+ }
+
+ srcPort = tcp.SourcePort()
+ dstPort = tcp.DestinationPort()
+ size -= uint16(offset)
+
+ // Initialize the TCP flags.
+ flags := tcp.Flags()
+ flagsStr := []byte("FSRPAU")
+ for i := range flagsStr {
+ if flags&(1<<uint(i)) == 0 {
+ flagsStr[i] = ' '
+ }
+ }
+ details = fmt.Sprintf("flags:0x%02x (%s) seqnum: %d ack: %d win: %d xsum:0x%x", flags, string(flagsStr), tcp.SequenceNumber(), tcp.AckNumber(), tcp.WindowSize(), tcp.Checksum())
+ if flags&header.TCPFlagSyn != 0 {
+ details += fmt.Sprintf(" options: %+v", header.ParseSynOptions(tcp.Options(), flags&header.TCPFlagAck != 0))
+ } else {
+ details += fmt.Sprintf(" options: %+v", tcp.ParsedOptions())
+ }
+ }
+
+ default:
+ log.Infof("%s %s -> %s unknown transport protocol: %d", prefix, src, dst, transProto)
+ return
+ }
+
+ if gso != nil {
+ details += fmt.Sprintf(" gso: %+v", gso)
+ }
+
+ log.Infof("%s %s %s:%d -> %s:%d len:%d id:%04x %s", prefix, transName, src, srcPort, dst, dstPort, size, id, details)
+}
diff --git a/pkg/tcpip/link/tun/BUILD b/pkg/tcpip/link/tun/BUILD
new file mode 100644
index 000000000..e0db6cf54
--- /dev/null
+++ b/pkg/tcpip/link/tun/BUILD
@@ -0,0 +1,25 @@
+load("//tools:defs.bzl", "go_library")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "tun",
+ srcs = [
+ "device.go",
+ "protocol.go",
+ "tun_unsafe.go",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/abi/linux",
+ "//pkg/refs",
+ "//pkg/sync",
+ "//pkg/syserror",
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/header",
+ "//pkg/tcpip/link/channel",
+ "//pkg/tcpip/stack",
+ "//pkg/waiter",
+ ],
+)
diff --git a/pkg/tcpip/link/tun/device.go b/pkg/tcpip/link/tun/device.go
new file mode 100644
index 000000000..6bc9033d0
--- /dev/null
+++ b/pkg/tcpip/link/tun/device.go
@@ -0,0 +1,358 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tun
+
+import (
+ "fmt"
+
+ "gvisor.dev/gvisor/pkg/abi/linux"
+ "gvisor.dev/gvisor/pkg/refs"
+ "gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/syserror"
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/header"
+ "gvisor.dev/gvisor/pkg/tcpip/link/channel"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+ "gvisor.dev/gvisor/pkg/waiter"
+)
+
+const (
+ // drivers/net/tun.c:tun_net_init()
+ defaultDevMtu = 1500
+
+ // Queue length for outbound packet, arriving at fd side for read. Overflow
+ // causes packet drops. gVisor implementation-specific.
+ defaultDevOutQueueLen = 1024
+)
+
+var zeroMAC [6]byte
+
+// Device is an opened /dev/net/tun device.
+//
+// +stateify savable
+type Device struct {
+ waiter.Queue
+
+ mu sync.RWMutex `state:"nosave"`
+ endpoint *tunEndpoint
+ notifyHandle *channel.NotificationHandle
+ flags uint16
+}
+
+// beforeSave is invoked by stateify.
+func (d *Device) beforeSave() {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ // TODO(b/110961832): Restore the device to stack. At this moment, the stack
+ // is not savable.
+ if d.endpoint != nil {
+ panic("/dev/net/tun does not support save/restore when a device is associated with it.")
+ }
+}
+
+// Release implements fs.FileOperations.Release.
+func (d *Device) Release() {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
+ // Decrease refcount if there is an endpoint associated with this file.
+ if d.endpoint != nil {
+ d.endpoint.RemoveNotify(d.notifyHandle)
+ d.endpoint.DecRef()
+ d.endpoint = nil
+ }
+}
+
+// SetIff services TUNSETIFF ioctl(2) request.
+func (d *Device) SetIff(s *stack.Stack, name string, flags uint16) error {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
+ if d.endpoint != nil {
+ return syserror.EINVAL
+ }
+
+ // Input validations.
+ isTun := flags&linux.IFF_TUN != 0
+ isTap := flags&linux.IFF_TAP != 0
+ supportedFlags := uint16(linux.IFF_TUN | linux.IFF_TAP | linux.IFF_NO_PI)
+ if isTap && isTun || !isTap && !isTun || flags&^supportedFlags != 0 {
+ return syserror.EINVAL
+ }
+
+ prefix := "tun"
+ if isTap {
+ prefix = "tap"
+ }
+
+ linkCaps := stack.CapabilityNone
+ if isTap {
+ linkCaps |= stack.CapabilityResolutionRequired
+ }
+
+ endpoint, err := attachOrCreateNIC(s, name, prefix, linkCaps)
+ if err != nil {
+ return syserror.EINVAL
+ }
+
+ d.endpoint = endpoint
+ d.notifyHandle = d.endpoint.AddNotify(d)
+ d.flags = flags
+ return nil
+}
+
+func attachOrCreateNIC(s *stack.Stack, name, prefix string, linkCaps stack.LinkEndpointCapabilities) (*tunEndpoint, error) {
+ for {
+ // 1. Try to attach to an existing NIC.
+ if name != "" {
+ if nic, found := s.GetNICByName(name); found {
+ endpoint, ok := nic.LinkEndpoint().(*tunEndpoint)
+ if !ok {
+ // Not a NIC created by tun device.
+ return nil, syserror.EOPNOTSUPP
+ }
+ if !endpoint.TryIncRef() {
+ // Race detected: NIC got deleted in between.
+ continue
+ }
+ return endpoint, nil
+ }
+ }
+
+ // 2. Creating a new NIC.
+ id := tcpip.NICID(s.UniqueID())
+ endpoint := &tunEndpoint{
+ Endpoint: channel.New(defaultDevOutQueueLen, defaultDevMtu, ""),
+ stack: s,
+ nicID: id,
+ name: name,
+ }
+ endpoint.Endpoint.LinkEPCapabilities = linkCaps
+ if endpoint.name == "" {
+ endpoint.name = fmt.Sprintf("%s%d", prefix, id)
+ }
+ err := s.CreateNICWithOptions(endpoint.nicID, endpoint, stack.NICOptions{
+ Name: endpoint.name,
+ })
+ switch err {
+ case nil:
+ return endpoint, nil
+ case tcpip.ErrDuplicateNICID:
+ // Race detected: A NIC has been created in between.
+ continue
+ default:
+ return nil, syserror.EINVAL
+ }
+ }
+}
+
+// Write inject one inbound packet to the network interface.
+func (d *Device) Write(data []byte) (int64, error) {
+ d.mu.RLock()
+ endpoint := d.endpoint
+ d.mu.RUnlock()
+ if endpoint == nil {
+ return 0, syserror.EBADFD
+ }
+ if !endpoint.IsAttached() {
+ return 0, syserror.EIO
+ }
+
+ dataLen := int64(len(data))
+
+ // Packet information.
+ var pktInfoHdr PacketInfoHeader
+ if !d.hasFlags(linux.IFF_NO_PI) {
+ if len(data) < PacketInfoHeaderSize {
+ // Ignore bad packet.
+ return dataLen, nil
+ }
+ pktInfoHdr = PacketInfoHeader(data[:PacketInfoHeaderSize])
+ data = data[PacketInfoHeaderSize:]
+ }
+
+ // Ethernet header (TAP only).
+ var ethHdr header.Ethernet
+ if d.hasFlags(linux.IFF_TAP) {
+ if len(data) < header.EthernetMinimumSize {
+ // Ignore bad packet.
+ return dataLen, nil
+ }
+ ethHdr = header.Ethernet(data[:header.EthernetMinimumSize])
+ data = data[header.EthernetMinimumSize:]
+ }
+
+ // Try to determine network protocol number, default zero.
+ var protocol tcpip.NetworkProtocolNumber
+ switch {
+ case pktInfoHdr != nil:
+ protocol = pktInfoHdr.Protocol()
+ case ethHdr != nil:
+ protocol = ethHdr.Type()
+ }
+
+ // Try to determine remote link address, default zero.
+ var remote tcpip.LinkAddress
+ switch {
+ case ethHdr != nil:
+ remote = ethHdr.SourceAddress()
+ default:
+ remote = tcpip.LinkAddress(zeroMAC[:])
+ }
+
+ pkt := &stack.PacketBuffer{
+ Data: buffer.View(data).ToVectorisedView(),
+ }
+ if ethHdr != nil {
+ pkt.LinkHeader = buffer.View(ethHdr)
+ }
+ endpoint.InjectLinkAddr(protocol, remote, pkt)
+ return dataLen, nil
+}
+
+// Read reads one outgoing packet from the network interface.
+func (d *Device) Read() ([]byte, error) {
+ d.mu.RLock()
+ endpoint := d.endpoint
+ d.mu.RUnlock()
+ if endpoint == nil {
+ return nil, syserror.EBADFD
+ }
+
+ for {
+ info, ok := endpoint.Read()
+ if !ok {
+ return nil, syserror.ErrWouldBlock
+ }
+
+ v, ok := d.encodePkt(&info)
+ if !ok {
+ // Ignore unsupported packet.
+ continue
+ }
+ return v, nil
+ }
+}
+
+// encodePkt encodes packet for fd side.
+func (d *Device) encodePkt(info *channel.PacketInfo) (buffer.View, bool) {
+ var vv buffer.VectorisedView
+
+ // Packet information.
+ if !d.hasFlags(linux.IFF_NO_PI) {
+ hdr := make(PacketInfoHeader, PacketInfoHeaderSize)
+ hdr.Encode(&PacketInfoFields{
+ Protocol: info.Proto,
+ })
+ vv.AppendView(buffer.View(hdr))
+ }
+
+ // If the packet does not already have link layer header, and the route
+ // does not exist, we can't compute it. This is possibly a raw packet, tun
+ // device doesn't support this at the moment.
+ if info.Pkt.LinkHeader == nil && info.Route.RemoteLinkAddress == "" {
+ return nil, false
+ }
+
+ // Ethernet header (TAP only).
+ if d.hasFlags(linux.IFF_TAP) {
+ // Add ethernet header if not provided.
+ if info.Pkt.LinkHeader == nil {
+ hdr := &header.EthernetFields{
+ SrcAddr: info.Route.LocalLinkAddress,
+ DstAddr: info.Route.RemoteLinkAddress,
+ Type: info.Proto,
+ }
+ if hdr.SrcAddr == "" {
+ hdr.SrcAddr = d.endpoint.LinkAddress()
+ }
+
+ eth := make(header.Ethernet, header.EthernetMinimumSize)
+ eth.Encode(hdr)
+ vv.AppendView(buffer.View(eth))
+ } else {
+ vv.AppendView(info.Pkt.LinkHeader)
+ }
+ }
+
+ // Append upper headers.
+ vv.AppendView(buffer.View(info.Pkt.Header.View()[len(info.Pkt.LinkHeader):]))
+ // Append data payload.
+ vv.Append(info.Pkt.Data)
+
+ return vv.ToView(), true
+}
+
+// Name returns the name of the attached network interface. Empty string if
+// unattached.
+func (d *Device) Name() string {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.endpoint != nil {
+ return d.endpoint.name
+ }
+ return ""
+}
+
+// Flags returns the flags set for d. Zero value if unset.
+func (d *Device) Flags() uint16 {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ return d.flags
+}
+
+func (d *Device) hasFlags(flags uint16) bool {
+ return d.flags&flags == flags
+}
+
+// Readiness implements watier.Waitable.Readiness.
+func (d *Device) Readiness(mask waiter.EventMask) waiter.EventMask {
+ if mask&waiter.EventIn != 0 {
+ d.mu.RLock()
+ endpoint := d.endpoint
+ d.mu.RUnlock()
+ if endpoint != nil && endpoint.NumQueued() == 0 {
+ mask &= ^waiter.EventIn
+ }
+ }
+ return mask & (waiter.EventIn | waiter.EventOut)
+}
+
+// WriteNotify implements channel.Notification.WriteNotify.
+func (d *Device) WriteNotify() {
+ d.Notify(waiter.EventIn)
+}
+
+// tunEndpoint is the link endpoint for the NIC created by the tun device.
+//
+// It is ref-counted as multiple opening files can attach to the same NIC.
+// The last owner is responsible for deleting the NIC.
+type tunEndpoint struct {
+ *channel.Endpoint
+
+ refs.AtomicRefCount
+
+ stack *stack.Stack
+ nicID tcpip.NICID
+ name string
+}
+
+// DecRef decrements refcount of e, removes NIC if refcount goes to 0.
+func (e *tunEndpoint) DecRef() {
+ e.DecRefWithDestructor(func() {
+ e.stack.RemoveNIC(e.nicID)
+ })
+}
diff --git a/pkg/tcpip/link/tun/protocol.go b/pkg/tcpip/link/tun/protocol.go
new file mode 100644
index 000000000..89d9d91a9
--- /dev/null
+++ b/pkg/tcpip/link/tun/protocol.go
@@ -0,0 +1,56 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tun
+
+import (
+ "encoding/binary"
+
+ "gvisor.dev/gvisor/pkg/tcpip"
+)
+
+const (
+ // PacketInfoHeaderSize is the size of the packet information header.
+ PacketInfoHeaderSize = 4
+
+ offsetFlags = 0
+ offsetProtocol = 2
+)
+
+// PacketInfoFields contains fields sent through the wire if IFF_NO_PI flag is
+// not set.
+type PacketInfoFields struct {
+ Flags uint16
+ Protocol tcpip.NetworkProtocolNumber
+}
+
+// PacketInfoHeader is the wire representation of the packet information sent if
+// IFF_NO_PI flag is not set.
+type PacketInfoHeader []byte
+
+// Encode encodes f into h.
+func (h PacketInfoHeader) Encode(f *PacketInfoFields) {
+ binary.BigEndian.PutUint16(h[offsetFlags:][:2], f.Flags)
+ binary.BigEndian.PutUint16(h[offsetProtocol:][:2], uint16(f.Protocol))
+}
+
+// Flags returns the flag field in h.
+func (h PacketInfoHeader) Flags() uint16 {
+ return binary.BigEndian.Uint16(h[offsetFlags:])
+}
+
+// Protocol returns the protocol field in h.
+func (h PacketInfoHeader) Protocol() tcpip.NetworkProtocolNumber {
+ return tcpip.NetworkProtocolNumber(binary.BigEndian.Uint16(h[offsetProtocol:]))
+}
diff --git a/pkg/tcpip/link/tun/tun_unsafe.go b/pkg/tcpip/link/tun/tun_unsafe.go
new file mode 100644
index 000000000..09ca9b527
--- /dev/null
+++ b/pkg/tcpip/link/tun/tun_unsafe.go
@@ -0,0 +1,63 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build linux
+
+// Package tun contains methods to open TAP and TUN devices.
+package tun
+
+import (
+ "syscall"
+ "unsafe"
+)
+
+// Open opens the specified TUN device, sets it to non-blocking mode, and
+// returns its file descriptor.
+func Open(name string) (int, error) {
+ return open(name, syscall.IFF_TUN|syscall.IFF_NO_PI)
+}
+
+// OpenTAP opens the specified TAP device, sets it to non-blocking mode, and
+// returns its file descriptor.
+func OpenTAP(name string) (int, error) {
+ return open(name, syscall.IFF_TAP|syscall.IFF_NO_PI)
+}
+
+func open(name string, flags uint16) (int, error) {
+ fd, err := syscall.Open("/dev/net/tun", syscall.O_RDWR, 0)
+ if err != nil {
+ return -1, err
+ }
+
+ var ifr struct {
+ name [16]byte
+ flags uint16
+ _ [22]byte
+ }
+
+ copy(ifr.name[:], name)
+ ifr.flags = flags
+ _, _, errno := syscall.Syscall(syscall.SYS_IOCTL, uintptr(fd), syscall.TUNSETIFF, uintptr(unsafe.Pointer(&ifr)))
+ if errno != 0 {
+ syscall.Close(fd)
+ return -1, errno
+ }
+
+ if err = syscall.SetNonblock(fd, true); err != nil {
+ syscall.Close(fd)
+ return -1, err
+ }
+
+ return fd, nil
+}
diff --git a/pkg/tcpip/link/waitable/BUILD b/pkg/tcpip/link/waitable/BUILD
new file mode 100644
index 000000000..0956d2c65
--- /dev/null
+++ b/pkg/tcpip/link/waitable/BUILD
@@ -0,0 +1,30 @@
+load("//tools:defs.bzl", "go_library", "go_test")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "waitable",
+ srcs = [
+ "waitable.go",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/gate",
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/stack",
+ ],
+)
+
+go_test(
+ name = "waitable_test",
+ srcs = [
+ "waitable_test.go",
+ ],
+ library = ":waitable",
+ deps = [
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/stack",
+ ],
+)
diff --git a/pkg/tcpip/link/waitable/waitable.go b/pkg/tcpip/link/waitable/waitable.go
new file mode 100644
index 000000000..949b3f2b2
--- /dev/null
+++ b/pkg/tcpip/link/waitable/waitable.go
@@ -0,0 +1,149 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package waitable provides the implementation of data-link layer endpoints
+// that wrap other endpoints, and can wait for inflight calls to WritePacket or
+// DeliverNetworkPacket to finish (and new ones to be prevented).
+//
+// Waitable endpoints can be used in the networking stack by calling New(eID) to
+// create a new endpoint, where eID is the ID of the endpoint being wrapped,
+// and then passing it as an argument to Stack.CreateNIC().
+package waitable
+
+import (
+ "gvisor.dev/gvisor/pkg/gate"
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// Endpoint is a waitable link-layer endpoint.
+type Endpoint struct {
+ dispatchGate gate.Gate
+ dispatcher stack.NetworkDispatcher
+
+ writeGate gate.Gate
+ lower stack.LinkEndpoint
+}
+
+// New creates a new waitable link-layer endpoint. It wraps around another
+// endpoint and allows the caller to block new write/dispatch calls and wait for
+// the inflight ones to finish before returning.
+func New(lower stack.LinkEndpoint) *Endpoint {
+ return &Endpoint{
+ lower: lower,
+ }
+}
+
+// DeliverNetworkPacket implements stack.NetworkDispatcher.DeliverNetworkPacket.
+// It is called by the link-layer endpoint being wrapped when a packet arrives,
+// and only forwards to the actual dispatcher if Wait or WaitDispatch haven't
+// been called.
+func (e *Endpoint) DeliverNetworkPacket(remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
+ if !e.dispatchGate.Enter() {
+ return
+ }
+
+ e.dispatcher.DeliverNetworkPacket(remote, local, protocol, pkt)
+ e.dispatchGate.Leave()
+}
+
+// Attach implements stack.LinkEndpoint.Attach. It saves the dispatcher and
+// registers with the lower endpoint as its dispatcher so that "e" is called
+// for inbound packets.
+func (e *Endpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ e.dispatcher = dispatcher
+ e.lower.Attach(e)
+}
+
+// IsAttached implements stack.LinkEndpoint.IsAttached.
+func (e *Endpoint) IsAttached() bool {
+ return e.dispatcher != nil
+}
+
+// MTU implements stack.LinkEndpoint.MTU. It just forwards the request to the
+// lower endpoint.
+func (e *Endpoint) MTU() uint32 {
+ return e.lower.MTU()
+}
+
+// Capabilities implements stack.LinkEndpoint.Capabilities. It just forwards the
+// request to the lower endpoint.
+func (e *Endpoint) Capabilities() stack.LinkEndpointCapabilities {
+ return e.lower.Capabilities()
+}
+
+// MaxHeaderLength implements stack.LinkEndpoint.MaxHeaderLength. It just
+// forwards the request to the lower endpoint.
+func (e *Endpoint) MaxHeaderLength() uint16 {
+ return e.lower.MaxHeaderLength()
+}
+
+// LinkAddress implements stack.LinkEndpoint.LinkAddress. It just forwards the
+// request to the lower endpoint.
+func (e *Endpoint) LinkAddress() tcpip.LinkAddress {
+ return e.lower.LinkAddress()
+}
+
+// WritePacket implements stack.LinkEndpoint.WritePacket. It is called by
+// higher-level protocols to write packets. It only forwards packets to the
+// lower endpoint if Wait or WaitWrite haven't been called.
+func (e *Endpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) *tcpip.Error {
+ if !e.writeGate.Enter() {
+ return nil
+ }
+
+ err := e.lower.WritePacket(r, gso, protocol, pkt)
+ e.writeGate.Leave()
+ return err
+}
+
+// WritePackets implements stack.LinkEndpoint.WritePackets. It is called by
+// higher-level protocols to write packets. It only forwards packets to the
+// lower endpoint if Wait or WaitWrite haven't been called.
+func (e *Endpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.PacketBufferList, protocol tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ if !e.writeGate.Enter() {
+ return pkts.Len(), nil
+ }
+
+ n, err := e.lower.WritePackets(r, gso, pkts, protocol)
+ e.writeGate.Leave()
+ return n, err
+}
+
+// WriteRawPacket implements stack.LinkEndpoint.WriteRawPacket.
+func (e *Endpoint) WriteRawPacket(vv buffer.VectorisedView) *tcpip.Error {
+ if !e.writeGate.Enter() {
+ return nil
+ }
+
+ err := e.lower.WriteRawPacket(vv)
+ e.writeGate.Leave()
+ return err
+}
+
+// WaitWrite prevents new calls to WritePacket from reaching the lower endpoint,
+// and waits for inflight ones to finish before returning.
+func (e *Endpoint) WaitWrite() {
+ e.writeGate.Close()
+}
+
+// WaitDispatch prevents new calls to DeliverNetworkPacket from reaching the
+// actual dispatcher, and waits for inflight ones to finish before returning.
+func (e *Endpoint) WaitDispatch() {
+ e.dispatchGate.Close()
+}
+
+// Wait implements stack.LinkEndpoint.Wait.
+func (e *Endpoint) Wait() {}
diff --git a/pkg/tcpip/link/waitable/waitable_test.go b/pkg/tcpip/link/waitable/waitable_test.go
new file mode 100644
index 000000000..63bf40562
--- /dev/null
+++ b/pkg/tcpip/link/waitable/waitable_test.go
@@ -0,0 +1,173 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package waitable
+
+import (
+ "testing"
+
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+type countedEndpoint struct {
+ dispatchCount int
+ writeCount int
+ attachCount int
+
+ mtu uint32
+ capabilities stack.LinkEndpointCapabilities
+ hdrLen uint16
+ linkAddr tcpip.LinkAddress
+
+ dispatcher stack.NetworkDispatcher
+}
+
+func (e *countedEndpoint) DeliverNetworkPacket(remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
+ e.dispatchCount++
+}
+
+func (e *countedEndpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ e.attachCount++
+ e.dispatcher = dispatcher
+}
+
+// IsAttached implements stack.LinkEndpoint.IsAttached.
+func (e *countedEndpoint) IsAttached() bool {
+ return e.dispatcher != nil
+}
+
+func (e *countedEndpoint) MTU() uint32 {
+ return e.mtu
+}
+
+func (e *countedEndpoint) Capabilities() stack.LinkEndpointCapabilities {
+ return e.capabilities
+}
+
+func (e *countedEndpoint) MaxHeaderLength() uint16 {
+ return e.hdrLen
+}
+
+func (e *countedEndpoint) LinkAddress() tcpip.LinkAddress {
+ return e.linkAddr
+}
+
+func (e *countedEndpoint) WritePacket(r *stack.Route, _ *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) *tcpip.Error {
+ e.writeCount++
+ return nil
+}
+
+// WritePackets implements stack.LinkEndpoint.WritePackets.
+func (e *countedEndpoint) WritePackets(r *stack.Route, _ *stack.GSO, pkts stack.PacketBufferList, protocol tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ e.writeCount += pkts.Len()
+ return pkts.Len(), nil
+}
+
+func (e *countedEndpoint) WriteRawPacket(buffer.VectorisedView) *tcpip.Error {
+ e.writeCount++
+ return nil
+}
+
+// Wait implements stack.LinkEndpoint.Wait.
+func (*countedEndpoint) Wait() {}
+
+func TestWaitWrite(t *testing.T) {
+ ep := &countedEndpoint{}
+ wep := New(ep)
+
+ // Write and check that it goes through.
+ wep.WritePacket(nil, nil /* gso */, 0, &stack.PacketBuffer{})
+ if want := 1; ep.writeCount != want {
+ t.Fatalf("Unexpected writeCount: got=%v, want=%v", ep.writeCount, want)
+ }
+
+ // Wait on dispatches, then try to write. It must go through.
+ wep.WaitDispatch()
+ wep.WritePacket(nil, nil /* gso */, 0, &stack.PacketBuffer{})
+ if want := 2; ep.writeCount != want {
+ t.Fatalf("Unexpected writeCount: got=%v, want=%v", ep.writeCount, want)
+ }
+
+ // Wait on writes, then try to write. It must not go through.
+ wep.WaitWrite()
+ wep.WritePacket(nil, nil /* gso */, 0, &stack.PacketBuffer{})
+ if want := 2; ep.writeCount != want {
+ t.Fatalf("Unexpected writeCount: got=%v, want=%v", ep.writeCount, want)
+ }
+}
+
+func TestWaitDispatch(t *testing.T) {
+ ep := &countedEndpoint{}
+ wep := New(ep)
+
+ // Check that attach happens.
+ wep.Attach(ep)
+ if want := 1; ep.attachCount != want {
+ t.Fatalf("Unexpected attachCount: got=%v, want=%v", ep.attachCount, want)
+ }
+
+ // Dispatch and check that it goes through.
+ ep.dispatcher.DeliverNetworkPacket("", "", 0, &stack.PacketBuffer{})
+ if want := 1; ep.dispatchCount != want {
+ t.Fatalf("Unexpected dispatchCount: got=%v, want=%v", ep.dispatchCount, want)
+ }
+
+ // Wait on writes, then try to dispatch. It must go through.
+ wep.WaitWrite()
+ ep.dispatcher.DeliverNetworkPacket("", "", 0, &stack.PacketBuffer{})
+ if want := 2; ep.dispatchCount != want {
+ t.Fatalf("Unexpected dispatchCount: got=%v, want=%v", ep.dispatchCount, want)
+ }
+
+ // Wait on dispatches, then try to dispatch. It must not go through.
+ wep.WaitDispatch()
+ ep.dispatcher.DeliverNetworkPacket("", "", 0, &stack.PacketBuffer{})
+ if want := 2; ep.dispatchCount != want {
+ t.Fatalf("Unexpected dispatchCount: got=%v, want=%v", ep.dispatchCount, want)
+ }
+}
+
+func TestOtherMethods(t *testing.T) {
+ const (
+ mtu = 0xdead
+ capabilities = 0xbeef
+ hdrLen = 0x1234
+ linkAddr = "test address"
+ )
+ ep := &countedEndpoint{
+ mtu: mtu,
+ capabilities: capabilities,
+ hdrLen: hdrLen,
+ linkAddr: linkAddr,
+ }
+ wep := New(ep)
+
+ if v := wep.MTU(); v != mtu {
+ t.Fatalf("Unexpected mtu: got=%v, want=%v", v, mtu)
+ }
+
+ if v := wep.Capabilities(); v != capabilities {
+ t.Fatalf("Unexpected capabilities: got=%v, want=%v", v, capabilities)
+ }
+
+ if v := wep.MaxHeaderLength(); v != hdrLen {
+ t.Fatalf("Unexpected MaxHeaderLength: got=%v, want=%v", v, hdrLen)
+ }
+
+ if v := wep.LinkAddress(); v != linkAddr {
+ t.Fatalf("Unexpected LinkAddress: got=%q, want=%q", v, linkAddr)
+ }
+}