summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorBhasker Hariharan <bhaskerh@google.com>2020-04-30 16:39:18 -0700
committergVisor bot <gvisor-bot@google.com>2020-04-30 16:41:00 -0700
commitae15d90436ec5ecd8795bed2a357b1990123e8fd (patch)
treee0daf9de0392e62c395694e0203445334cac3ba3
parent01beec3bb457a2a3a7313c7fe6dc795817f47746 (diff)
FIFO QDisc implementation
Updates #231 PiperOrigin-RevId: 309323808
-rw-r--r--benchmarks/tcp/BUILD1
-rw-r--r--benchmarks/tcp/tcp_proxy.go3
-rw-r--r--pkg/tcpip/link/fdbased/BUILD1
-rw-r--r--pkg/tcpip/link/fdbased/endpoint.go104
-rw-r--r--pkg/tcpip/link/fdbased/endpoint_unsafe.go10
-rw-r--r--pkg/tcpip/link/qdisc/fifo/BUILD19
-rw-r--r--pkg/tcpip/link/qdisc/fifo/endpoint.go209
-rw-r--r--pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go84
-rw-r--r--pkg/tcpip/network/arp/arp.go7
-rw-r--r--pkg/tcpip/network/ipv4/ipv4.go5
-rw-r--r--pkg/tcpip/network/ipv6/ipv6.go5
-rw-r--r--pkg/tcpip/stack/forwarder_test.go4
-rw-r--r--pkg/tcpip/stack/packet_buffer.go6
-rw-r--r--pkg/tcpip/stack/registration.go4
-rw-r--r--pkg/tcpip/stack/route.go6
-rw-r--r--pkg/tcpip/stack/stack_test.go4
-rw-r--r--pkg/tcpip/transport/tcp/connect.go3
-rw-r--r--runsc/boot/BUILD1
-rw-r--r--runsc/boot/config.go5
-rw-r--r--runsc/boot/network.go50
-rw-r--r--runsc/main.go8
-rw-r--r--runsc/sandbox/network.go5
22 files changed, 497 insertions, 47 deletions
diff --git a/benchmarks/tcp/BUILD b/benchmarks/tcp/BUILD
index d5e401acc..6dde7d9e6 100644
--- a/benchmarks/tcp/BUILD
+++ b/benchmarks/tcp/BUILD
@@ -10,6 +10,7 @@ go_binary(
"//pkg/tcpip",
"//pkg/tcpip/adapters/gonet",
"//pkg/tcpip/link/fdbased",
+ "//pkg/tcpip/link/qdisc/fifo",
"//pkg/tcpip/network/arp",
"//pkg/tcpip/network/ipv4",
"//pkg/tcpip/stack",
diff --git a/benchmarks/tcp/tcp_proxy.go b/benchmarks/tcp/tcp_proxy.go
index 73b7c4f5b..dc1593b34 100644
--- a/benchmarks/tcp/tcp_proxy.go
+++ b/benchmarks/tcp/tcp_proxy.go
@@ -36,6 +36,7 @@ import (
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
"gvisor.dev/gvisor/pkg/tcpip/link/fdbased"
+ "gvisor.dev/gvisor/pkg/tcpip/link/qdisc/fifo"
"gvisor.dev/gvisor/pkg/tcpip/network/arp"
"gvisor.dev/gvisor/pkg/tcpip/network/ipv4"
"gvisor.dev/gvisor/pkg/tcpip/stack"
@@ -203,7 +204,7 @@ func newNetstackImpl(mode string) (impl, error) {
if err != nil {
return nil, fmt.Errorf("failed to create FD endpoint: %v", err)
}
- if err := s.CreateNIC(nicID, ep); err != nil {
+ if err := s.CreateNIC(nicID, fifo.New(ep, runtime.GOMAXPROCS(0), 1000)); err != nil {
return nil, fmt.Errorf("error creating NIC %q: %v", *iface, err)
}
if err := s.AddAddress(nicID, arp.ProtocolNumber, arp.ProtocolAddress); err != nil {
diff --git a/pkg/tcpip/link/fdbased/BUILD b/pkg/tcpip/link/fdbased/BUILD
index abe725548..aa6db9aea 100644
--- a/pkg/tcpip/link/fdbased/BUILD
+++ b/pkg/tcpip/link/fdbased/BUILD
@@ -14,6 +14,7 @@ go_library(
],
visibility = ["//visibility:public"],
deps = [
+ "//pkg/binary",
"//pkg/sync",
"//pkg/tcpip",
"//pkg/tcpip/buffer",
diff --git a/pkg/tcpip/link/fdbased/endpoint.go b/pkg/tcpip/link/fdbased/endpoint.go
index b857ce9d0..53a9712c6 100644
--- a/pkg/tcpip/link/fdbased/endpoint.go
+++ b/pkg/tcpip/link/fdbased/endpoint.go
@@ -44,6 +44,7 @@ import (
"syscall"
"golang.org/x/sys/unix"
+ "gvisor.dev/gvisor/pkg/binary"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
@@ -428,7 +429,7 @@ func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.Ne
}
}
- vnetHdrBuf := vnetHdrToByteSlice(&vnetHdr)
+ vnetHdrBuf := binary.Marshal(make([]byte, 0, virtioNetHdrSize), binary.LittleEndian, vnetHdr)
return rawfile.NonBlockingWrite3(fd, vnetHdrBuf, pkt.Header.View(), pkt.Data.ToView())
}
@@ -439,19 +440,10 @@ func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.Ne
return rawfile.NonBlockingWrite3(fd, pkt.Header.View(), pkt.Data.ToView(), nil)
}
-// WritePackets writes outbound packets to the file descriptor. If it is not
-// currently writable, the packet is dropped.
-//
-// NOTE: This API uses sendmmsg to batch packets. As a result the underlying FD
-// picked to write the packet out has to be the same for all packets in the
-// list. In other words all packets in the batch should belong to the same
-// flow.
-func (e *endpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.PacketBufferList, protocol tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
- n := pkts.Len()
-
- mmsgHdrs := make([]rawfile.MMsgHdr, n)
- i := 0
- for pkt := pkts.Front(); pkt != nil; pkt = pkt.Next() {
+func (e *endpoint) sendBatch(batchFD int, batch []*stack.PacketBuffer) (int, *tcpip.Error) {
+ // Send a batch of packets through batchFD.
+ mmsgHdrs := make([]rawfile.MMsgHdr, 0, len(batch))
+ for _, pkt := range batch {
var ethHdrBuf []byte
iovLen := 0
if e.hdrSize > 0 {
@@ -459,13 +451,13 @@ func (e *endpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.Packe
ethHdrBuf = make([]byte, header.EthernetMinimumSize)
eth := header.Ethernet(ethHdrBuf)
ethHdr := &header.EthernetFields{
- DstAddr: r.RemoteLinkAddress,
- Type: protocol,
+ DstAddr: pkt.EgressRoute.RemoteLinkAddress,
+ Type: pkt.NetworkProtocolNumber,
}
// Preserve the src address if it's set in the route.
- if r.LocalLinkAddress != "" {
- ethHdr.SrcAddr = r.LocalLinkAddress
+ if pkt.EgressRoute.LocalLinkAddress != "" {
+ ethHdr.SrcAddr = pkt.EgressRoute.LocalLinkAddress
} else {
ethHdr.SrcAddr = e.addr
}
@@ -473,34 +465,34 @@ func (e *endpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.Packe
iovLen++
}
- var vnetHdrBuf []byte
vnetHdr := virtioNetHdr{}
+ var vnetHdrBuf []byte
if e.Capabilities()&stack.CapabilityHardwareGSO != 0 {
- if gso != nil {
+ if pkt.GSOOptions != nil {
vnetHdr.hdrLen = uint16(pkt.Header.UsedLength())
- if gso.NeedsCsum {
+ if pkt.GSOOptions.NeedsCsum {
vnetHdr.flags = _VIRTIO_NET_HDR_F_NEEDS_CSUM
- vnetHdr.csumStart = header.EthernetMinimumSize + gso.L3HdrLen
- vnetHdr.csumOffset = gso.CsumOffset
+ vnetHdr.csumStart = header.EthernetMinimumSize + pkt.GSOOptions.L3HdrLen
+ vnetHdr.csumOffset = pkt.GSOOptions.CsumOffset
}
- if gso.Type != stack.GSONone && uint16(pkt.Data.Size()) > gso.MSS {
- switch gso.Type {
+ if pkt.GSOOptions.Type != stack.GSONone && uint16(pkt.Data.Size()) > pkt.GSOOptions.MSS {
+ switch pkt.GSOOptions.Type {
case stack.GSOTCPv4:
vnetHdr.gsoType = _VIRTIO_NET_HDR_GSO_TCPV4
case stack.GSOTCPv6:
vnetHdr.gsoType = _VIRTIO_NET_HDR_GSO_TCPV6
default:
- panic(fmt.Sprintf("Unknown gso type: %v", gso.Type))
+ panic(fmt.Sprintf("Unknown gso type: %v", pkt.GSOOptions.Type))
}
- vnetHdr.gsoSize = gso.MSS
+ vnetHdr.gsoSize = pkt.GSOOptions.MSS
}
}
- vnetHdrBuf = vnetHdrToByteSlice(&vnetHdr)
+ vnetHdrBuf = binary.Marshal(make([]byte, 0, virtioNetHdrSize), binary.LittleEndian, vnetHdr)
iovLen++
}
iovecs := make([]syscall.Iovec, iovLen+1+len(pkt.Data.Views()))
- mmsgHdr := &mmsgHdrs[i]
+ var mmsgHdr rawfile.MMsgHdr
mmsgHdr.Msg.Iov = &iovecs[0]
iovecIdx := 0
if vnetHdrBuf != nil {
@@ -535,22 +527,68 @@ func (e *endpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.Packe
pktSize += vec.Len
}
mmsgHdr.Msg.Iovlen = uint64(iovecIdx)
- i++
+ mmsgHdrs = append(mmsgHdrs, mmsgHdr)
}
packets := 0
- for packets < n {
- fd := e.fds[pkts.Front().Hash%uint32(len(e.fds))]
- sent, err := rawfile.NonBlockingSendMMsg(fd, mmsgHdrs)
+ for len(mmsgHdrs) > 0 {
+ sent, err := rawfile.NonBlockingSendMMsg(batchFD, mmsgHdrs)
if err != nil {
return packets, err
}
packets += sent
mmsgHdrs = mmsgHdrs[sent:]
}
+
return packets, nil
}
+// WritePackets writes outbound packets to the underlying file descriptors. If
+// one is not currently writable, the packet is dropped.
+//
+// Being a batch API, each packet in pkts should have the following
+// fields populated:
+// - pkt.EgressRoute
+// - pkt.GSOOptions
+// - pkt.NetworkProtocolNumber
+func (e *endpoint) WritePackets(_ *stack.Route, _ *stack.GSO, pkts stack.PacketBufferList, _ tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ // Preallocate to avoid repeated reallocation as we append to batch.
+ // batchSz is 47 because when SWGSO is in use then a single 65KB TCP
+ // segment can get split into 46 segments of 1420 bytes and a single 216
+ // byte segment.
+ const batchSz = 47
+ batch := make([]*stack.PacketBuffer, 0, batchSz)
+ batchFD := -1
+ sentPackets := 0
+ for pkt := pkts.Front(); pkt != nil; pkt = pkt.Next() {
+ if len(batch) == 0 {
+ batchFD = e.fds[pkt.Hash%uint32(len(e.fds))]
+ }
+ pktFD := e.fds[pkt.Hash%uint32(len(e.fds))]
+ if sendNow := pktFD != batchFD; !sendNow {
+ batch = append(batch, pkt)
+ continue
+ }
+ n, err := e.sendBatch(batchFD, batch)
+ sentPackets += n
+ if err != nil {
+ return sentPackets, err
+ }
+ batch = batch[:0]
+ batch = append(batch, pkt)
+ batchFD = pktFD
+ }
+
+ if len(batch) != 0 {
+ n, err := e.sendBatch(batchFD, batch)
+ sentPackets += n
+ if err != nil {
+ return sentPackets, err
+ }
+ }
+ return sentPackets, nil
+}
+
// viewsEqual tests whether v1 and v2 refer to the same backing bytes.
func viewsEqual(vs1, vs2 []buffer.View) bool {
return len(vs1) == len(vs2) && (len(vs1) == 0 || &vs1[0] == &vs2[0])
diff --git a/pkg/tcpip/link/fdbased/endpoint_unsafe.go b/pkg/tcpip/link/fdbased/endpoint_unsafe.go
index d81858353..df14eaad1 100644
--- a/pkg/tcpip/link/fdbased/endpoint_unsafe.go
+++ b/pkg/tcpip/link/fdbased/endpoint_unsafe.go
@@ -17,17 +17,7 @@
package fdbased
import (
- "reflect"
"unsafe"
)
const virtioNetHdrSize = int(unsafe.Sizeof(virtioNetHdr{}))
-
-func vnetHdrToByteSlice(hdr *virtioNetHdr) (slice []byte) {
- *(*reflect.SliceHeader)(unsafe.Pointer(&slice)) = reflect.SliceHeader{
- Data: uintptr((unsafe.Pointer(hdr))),
- Len: virtioNetHdrSize,
- Cap: virtioNetHdrSize,
- }
- return
-}
diff --git a/pkg/tcpip/link/qdisc/fifo/BUILD b/pkg/tcpip/link/qdisc/fifo/BUILD
new file mode 100644
index 000000000..054c213bc
--- /dev/null
+++ b/pkg/tcpip/link/qdisc/fifo/BUILD
@@ -0,0 +1,19 @@
+load("//tools:defs.bzl", "go_library")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "fifo",
+ srcs = [
+ "endpoint.go",
+ "packet_buffer_queue.go",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/sleep",
+ "//pkg/sync",
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/stack",
+ ],
+)
diff --git a/pkg/tcpip/link/qdisc/fifo/endpoint.go b/pkg/tcpip/link/qdisc/fifo/endpoint.go
new file mode 100644
index 000000000..be9fec3b3
--- /dev/null
+++ b/pkg/tcpip/link/qdisc/fifo/endpoint.go
@@ -0,0 +1,209 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fifo provides the implementation of data-link layer endpoints that
+// wrap another endpoint and queues all outbound packets and asynchronously
+// dispatches them to the lower endpoint.
+package fifo
+
+import (
+ "gvisor.dev/gvisor/pkg/sleep"
+ "gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// endpoint represents a LinkEndpoint which implements a FIFO queue for all
+// outgoing packets. endpoint can have 1 or more underlying queueDispatchers.
+// All outgoing packets are consistenly hashed to a single underlying queue
+// using the PacketBuffer.Hash if set, otherwise all packets are queued to the
+// first queue to avoid reordering in case of missing hash.
+type endpoint struct {
+ dispatcher stack.NetworkDispatcher
+ lower stack.LinkEndpoint
+ wg sync.WaitGroup
+ dispatchers []*queueDispatcher
+}
+
+// queueDispatcher is responsible for dispatching all outbound packets in its
+// queue. It will also smartly batch packets when possible and write them
+// through the lower LinkEndpoint.
+type queueDispatcher struct {
+ lower stack.LinkEndpoint
+ q *packetBufferQueue
+ newPacketWaker sleep.Waker
+ closeWaker sleep.Waker
+}
+
+// New creates a new fifo link endpoint with the n queues with maximum
+// capacity of queueLen.
+func New(lower stack.LinkEndpoint, n int, queueLen int) stack.LinkEndpoint {
+ e := &endpoint{
+ lower: lower,
+ }
+ // Create the required dispatchers
+ for i := 0; i < n; i++ {
+ qd := &queueDispatcher{
+ q: &packetBufferQueue{limit: queueLen},
+ lower: lower,
+ }
+ e.dispatchers = append(e.dispatchers, qd)
+ e.wg.Add(1)
+ go func() {
+ defer e.wg.Done()
+ qd.dispatchLoop()
+ }()
+ }
+ return e
+}
+
+func (q *queueDispatcher) dispatchLoop() {
+ const newPacketWakerID = 1
+ const closeWakerID = 2
+ s := sleep.Sleeper{}
+ s.AddWaker(&q.newPacketWaker, newPacketWakerID)
+ s.AddWaker(&q.closeWaker, closeWakerID)
+ defer s.Done()
+
+ const batchSize = 32
+ var batch stack.PacketBufferList
+ for {
+ id, ok := s.Fetch(true)
+ if ok && id == closeWakerID {
+ return
+ }
+ for pkt := q.q.dequeue(); pkt != nil; pkt = q.q.dequeue() {
+ batch.PushBack(pkt)
+ if batch.Len() < batchSize && !q.q.empty() {
+ continue
+ }
+ // We pass a protocol of zero here because each packet carries its
+ // NetworkProtocol.
+ q.lower.WritePackets(nil /* route */, nil /* gso */, batch, 0 /* protocol */)
+ for pkt := batch.Front(); pkt != nil; pkt = pkt.Next() {
+ pkt.EgressRoute.Release()
+ batch.Remove(pkt)
+ }
+ batch.Reset()
+ }
+ }
+}
+
+// DeliverNetworkPacket implements stack.NetworkDispatcher.DeliverNetworkPacket.
+func (e *endpoint) DeliverNetworkPacket(linkEP stack.LinkEndpoint, remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, pkt stack.PacketBuffer) {
+ e.dispatcher.DeliverNetworkPacket(e, remote, local, protocol, pkt)
+}
+
+// Attach implements stack.LinkEndpoint.Attach.
+func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ e.dispatcher = dispatcher
+ e.lower.Attach(e)
+}
+
+// IsAttached implements stack.LinkEndpoint.IsAttached.
+func (e *endpoint) IsAttached() bool {
+ return e.dispatcher != nil
+}
+
+// MTU implements stack.LinkEndpoint.MTU.
+func (e *endpoint) MTU() uint32 {
+ return e.lower.MTU()
+}
+
+// Capabilities implements stack.LinkEndpoint.Capabilities.
+func (e *endpoint) Capabilities() stack.LinkEndpointCapabilities {
+ return e.lower.Capabilities()
+}
+
+// MaxHeaderLength implements stack.LinkEndpoint.MaxHeaderLength.
+func (e *endpoint) MaxHeaderLength() uint16 {
+ return e.lower.MaxHeaderLength()
+}
+
+// LinkAddress implements stack.LinkEndpoint.LinkAddress.
+func (e *endpoint) LinkAddress() tcpip.LinkAddress {
+ return e.lower.LinkAddress()
+}
+
+// GSOMaxSize returns the maximum GSO packet size.
+func (e *endpoint) GSOMaxSize() uint32 {
+ if gso, ok := e.lower.(stack.GSOEndpoint); ok {
+ return gso.GSOMaxSize()
+ }
+ return 0
+}
+
+// WritePacket implements stack.LinkEndpoint.WritePacket.
+func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt stack.PacketBuffer) *tcpip.Error {
+ // WritePacket caller's do not set the following fields in PacketBuffer
+ // so we populate them here.
+ newRoute := r.Clone()
+ pkt.EgressRoute = &newRoute
+ pkt.GSOOptions = gso
+ pkt.NetworkProtocolNumber = protocol
+ d := e.dispatchers[int(pkt.Hash)%len(e.dispatchers)]
+ if !d.q.enqueue(&pkt) {
+ return tcpip.ErrNoBufferSpace
+ }
+ d.newPacketWaker.Assert()
+ return nil
+}
+
+// WritePackets implements stack.LinkEndpoint.WritePackets.
+//
+// Being a batch API each packet in pkts should have the following fields
+// populated:
+// - pkt.EgressRoute
+// - pkt.GSOOptions
+// - pkt.NetworkProtocolNumber
+func (e *endpoint) WritePackets(_ *stack.Route, _ *stack.GSO, pkts stack.PacketBufferList, _ tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ enqueued := 0
+ for pkt := pkts.Front(); pkt != nil; {
+ d := e.dispatchers[int(pkt.Hash)%len(e.dispatchers)]
+ nxt := pkt.Next()
+ // Since qdisc can hold onto a packet for long we should Clone
+ // the route here to ensure it doesn't get released while the
+ // packet is still in our queue.
+ newRoute := pkt.EgressRoute.Clone()
+ pkt.EgressRoute = &newRoute
+ if !d.q.enqueue(pkt) {
+ if enqueued > 0 {
+ d.newPacketWaker.Assert()
+ }
+ return enqueued, tcpip.ErrNoBufferSpace
+ }
+ pkt = nxt
+ enqueued++
+ d.newPacketWaker.Assert()
+ }
+ return enqueued, nil
+}
+
+// WriteRawPacket implements stack.LinkEndpoint.WriteRawPacket.
+func (e *endpoint) WriteRawPacket(vv buffer.VectorisedView) *tcpip.Error {
+ return e.lower.WriteRawPacket(vv)
+}
+
+// Wait implements stack.LinkEndpoint.Wait.
+func (e *endpoint) Wait() {
+ e.lower.Wait()
+
+ // The linkEP is gone. Teardown the outbound dispatcher goroutines.
+ for i := range e.dispatchers {
+ e.dispatchers[i].closeWaker.Assert()
+ }
+
+ e.wg.Wait()
+}
diff --git a/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go b/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go
new file mode 100644
index 000000000..eb5abb906
--- /dev/null
+++ b/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go
@@ -0,0 +1,84 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fifo
+
+import (
+ "gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// packetBufferQueue is a bounded, thread-safe queue of PacketBuffers.
+//
+type packetBufferQueue struct {
+ mu sync.Mutex
+ list stack.PacketBufferList
+ limit int
+ used int
+}
+
+// emptyLocked determines if the queue is empty.
+// Preconditions: q.mu must be held.
+func (q *packetBufferQueue) emptyLocked() bool {
+ return q.used == 0
+}
+
+// empty determines if the queue is empty.
+func (q *packetBufferQueue) empty() bool {
+ q.mu.Lock()
+ r := q.emptyLocked()
+ q.mu.Unlock()
+
+ return r
+}
+
+// setLimit updates the limit. No PacketBuffers are immediately dropped in case
+// the queue becomes full due to the new limit.
+func (q *packetBufferQueue) setLimit(limit int) {
+ q.mu.Lock()
+ q.limit = limit
+ q.mu.Unlock()
+}
+
+// enqueue adds the given packet to the queue.
+//
+// Returns true when the PacketBuffer is successfully added to the queue, in
+// which case ownership of the reference is transferred to the queue. And
+// returns false if the queue is full, in which case ownership is retained by
+// the caller.
+func (q *packetBufferQueue) enqueue(s *stack.PacketBuffer) bool {
+ q.mu.Lock()
+ r := q.used < q.limit
+ if r {
+ q.list.PushBack(s)
+ q.used++
+ }
+ q.mu.Unlock()
+
+ return r
+}
+
+// dequeue removes and returns the next PacketBuffer from queue, if one exists.
+// Ownership is transferred to the caller.
+func (q *packetBufferQueue) dequeue() *stack.PacketBuffer {
+ q.mu.Lock()
+ s := q.list.Front()
+ if s != nil {
+ q.list.Remove(s)
+ q.used--
+ }
+ q.mu.Unlock()
+
+ return s
+}
diff --git a/pkg/tcpip/network/arp/arp.go b/pkg/tcpip/network/arp/arp.go
index 7acbfa0a8..9f47b4ff2 100644
--- a/pkg/tcpip/network/arp/arp.go
+++ b/pkg/tcpip/network/arp/arp.go
@@ -42,6 +42,7 @@ const (
// endpoint implements stack.NetworkEndpoint.
type endpoint struct {
+ protocol *protocol
nicID tcpip.NICID
linkEP stack.LinkEndpoint
linkAddrCache stack.LinkAddressCache
@@ -83,6 +84,11 @@ func (e *endpoint) WritePacket(*stack.Route, *stack.GSO, stack.NetworkHeaderPara
return tcpip.ErrNotSupported
}
+// NetworkProtocolNumber implements stack.NetworkEndpoint.NetworkProtocolNumber.
+func (e *endpoint) NetworkProtocolNumber() tcpip.NetworkProtocolNumber {
+ return e.protocol.Number()
+}
+
// WritePackets implements stack.NetworkEndpoint.WritePackets.
func (e *endpoint) WritePackets(*stack.Route, *stack.GSO, stack.PacketBufferList, stack.NetworkHeaderParams) (int, *tcpip.Error) {
return 0, tcpip.ErrNotSupported
@@ -142,6 +148,7 @@ func (p *protocol) NewEndpoint(nicID tcpip.NICID, addrWithPrefix tcpip.AddressWi
return nil, tcpip.ErrBadLocalAddress
}
return &endpoint{
+ protocol: p,
nicID: nicID,
linkEP: sender,
linkAddrCache: linkAddrCache,
diff --git a/pkg/tcpip/network/ipv4/ipv4.go b/pkg/tcpip/network/ipv4/ipv4.go
index 104aafbed..a9dec0c0e 100644
--- a/pkg/tcpip/network/ipv4/ipv4.go
+++ b/pkg/tcpip/network/ipv4/ipv4.go
@@ -118,6 +118,11 @@ func (e *endpoint) GSOMaxSize() uint32 {
return 0
}
+// NetworkProtocolNumber implements stack.NetworkEndpoint.NetworkProtocolNumber.
+func (e *endpoint) NetworkProtocolNumber() tcpip.NetworkProtocolNumber {
+ return e.protocol.Number()
+}
+
// writePacketFragments calls e.linkEP.WritePacket with each packet fragment to
// write. It assumes that the IP header is entirely in pkt.Header but does not
// assume that only the IP header is in pkt.Header. It assumes that the input
diff --git a/pkg/tcpip/network/ipv6/ipv6.go b/pkg/tcpip/network/ipv6/ipv6.go
index 331b0817b..82928fb66 100644
--- a/pkg/tcpip/network/ipv6/ipv6.go
+++ b/pkg/tcpip/network/ipv6/ipv6.go
@@ -416,6 +416,11 @@ func (e *endpoint) HandlePacket(r *stack.Route, pkt stack.PacketBuffer) {
// Close cleans up resources associated with the endpoint.
func (*endpoint) Close() {}
+// NetworkProtocolNumber implements stack.NetworkEndpoint.NetworkProtocolNumber.
+func (e *endpoint) NetworkProtocolNumber() tcpip.NetworkProtocolNumber {
+ return e.protocol.Number()
+}
+
type protocol struct {
// defaultTTL is the current default TTL for the protocol. Only the
// uint8 portion of it is meaningful and it must be accessed
diff --git a/pkg/tcpip/stack/forwarder_test.go b/pkg/tcpip/stack/forwarder_test.go
index e9c652042..9bc97b84e 100644
--- a/pkg/tcpip/stack/forwarder_test.go
+++ b/pkg/tcpip/stack/forwarder_test.go
@@ -89,6 +89,10 @@ func (f *fwdTestNetworkEndpoint) Capabilities() LinkEndpointCapabilities {
return f.ep.Capabilities()
}
+func (f *fwdTestNetworkEndpoint) NetworkProtocolNumber() tcpip.NetworkProtocolNumber {
+ return f.proto.Number()
+}
+
func (f *fwdTestNetworkEndpoint) WritePacket(r *Route, gso *GSO, params NetworkHeaderParams, pkt PacketBuffer) *tcpip.Error {
// Add the protocol's header to the packet and send it to the link
// endpoint.
diff --git a/pkg/tcpip/stack/packet_buffer.go b/pkg/tcpip/stack/packet_buffer.go
index dc125f25e..06d312207 100644
--- a/pkg/tcpip/stack/packet_buffer.go
+++ b/pkg/tcpip/stack/packet_buffer.go
@@ -60,6 +60,12 @@ type PacketBuffer struct {
// Owner is implemented by task to get the uid and gid.
// Only set for locally generated packets.
Owner tcpip.PacketOwner
+
+ // The following fields are only set by the qdisc layer when the packet
+ // is added to a queue.
+ EgressRoute *Route
+ GSOOptions *GSO
+ NetworkProtocolNumber tcpip.NetworkProtocolNumber
}
// Clone makes a copy of pk. It clones the Data field, which creates a new
diff --git a/pkg/tcpip/stack/registration.go b/pkg/tcpip/stack/registration.go
index 23ca9ee03..b331427c6 100644
--- a/pkg/tcpip/stack/registration.go
+++ b/pkg/tcpip/stack/registration.go
@@ -269,6 +269,10 @@ type NetworkEndpoint interface {
// Close is called when the endpoint is reomved from a stack.
Close()
+
+ // NetworkProtocolNumber returns the tcpip.NetworkProtocolNumber for
+ // this endpoint.
+ NetworkProtocolNumber() tcpip.NetworkProtocolNumber
}
// NetworkProtocol is the interface that needs to be implemented by network
diff --git a/pkg/tcpip/stack/route.go b/pkg/tcpip/stack/route.go
index a0e5e0300..53148dc03 100644
--- a/pkg/tcpip/stack/route.go
+++ b/pkg/tcpip/stack/route.go
@@ -217,6 +217,12 @@ func (r *Route) MTU() uint32 {
return r.ref.ep.MTU()
}
+// NetworkProtocolNumber returns the NetworkProtocolNumber of the underlying
+// network endpoint.
+func (r *Route) NetworkProtocolNumber() tcpip.NetworkProtocolNumber {
+ return r.ref.ep.NetworkProtocolNumber()
+}
+
// Release frees all resources associated with the route.
func (r *Route) Release() {
if r.ref != nil {
diff --git a/pkg/tcpip/stack/stack_test.go b/pkg/tcpip/stack/stack_test.go
index 4a686c891..3f4e08434 100644
--- a/pkg/tcpip/stack/stack_test.go
+++ b/pkg/tcpip/stack/stack_test.go
@@ -126,6 +126,10 @@ func (f *fakeNetworkEndpoint) Capabilities() stack.LinkEndpointCapabilities {
return f.ep.Capabilities()
}
+func (f *fakeNetworkEndpoint) NetworkProtocolNumber() tcpip.NetworkProtocolNumber {
+ return f.proto.Number()
+}
+
func (f *fakeNetworkEndpoint) WritePacket(r *stack.Route, gso *stack.GSO, params stack.NetworkHeaderParams, pkt stack.PacketBuffer) *tcpip.Error {
// Increment the sent packet count in the protocol descriptor.
f.proto.sendPacketCount[int(r.RemoteAddress[0])%len(f.proto.sendPacketCount)]++
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index 76e27bf26..a7e088d4e 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -801,6 +801,9 @@ func sendTCPBatch(r *stack.Route, tf tcpFields, data buffer.VectorisedView, gso
pkt.Header = buffer.NewPrependable(hdrSize)
pkt.Hash = tf.txHash
pkt.Owner = owner
+ pkt.EgressRoute = r
+ pkt.GSOOptions = gso
+ pkt.NetworkProtocolNumber = r.NetworkProtocolNumber()
data.ReadToVV(&pkt.Data, packetSize)
buildTCPHdr(r, tf, &pkt, gso)
tf.seq = tf.seq.Add(seqnum.Size(packetSize))
diff --git a/runsc/boot/BUILD b/runsc/boot/BUILD
index ed3c8f546..abcaf4206 100644
--- a/runsc/boot/BUILD
+++ b/runsc/boot/BUILD
@@ -88,6 +88,7 @@ go_library(
"//pkg/tcpip",
"//pkg/tcpip/link/fdbased",
"//pkg/tcpip/link/loopback",
+ "//pkg/tcpip/link/qdisc/fifo",
"//pkg/tcpip/link/sniffer",
"//pkg/tcpip/network/arp",
"//pkg/tcpip/network/ipv4",
diff --git a/runsc/boot/config.go b/runsc/boot/config.go
index 715a19112..6d6a705f8 100644
--- a/runsc/boot/config.go
+++ b/runsc/boot/config.go
@@ -187,6 +187,10 @@ type Config struct {
// SoftwareGSO indicates that software segmentation offload is enabled.
SoftwareGSO bool
+ // QDisc indicates the type of queuening discipline to use by default
+ // for non-loopback interfaces.
+ QDisc QueueingDiscipline
+
// LogPackets indicates that all network packets should be logged.
LogPackets bool
@@ -294,6 +298,7 @@ func (c *Config) ToFlags() []string {
"--gso=" + strconv.FormatBool(c.HardwareGSO),
"--software-gso=" + strconv.FormatBool(c.SoftwareGSO),
"--overlayfs-stale-read=" + strconv.FormatBool(c.OverlayfsStaleRead),
+ "--qdisc=" + c.QDisc.String(),
}
if c.CPUNumFromQuota {
f = append(f, "--cpu-num-from-quota")
diff --git a/runsc/boot/network.go b/runsc/boot/network.go
index bee6ee336..0af30456e 100644
--- a/runsc/boot/network.go
+++ b/runsc/boot/network.go
@@ -17,6 +17,7 @@ package boot
import (
"fmt"
"net"
+ "runtime"
"strings"
"syscall"
@@ -24,6 +25,7 @@ import (
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/link/fdbased"
"gvisor.dev/gvisor/pkg/tcpip/link/loopback"
+ "gvisor.dev/gvisor/pkg/tcpip/link/qdisc/fifo"
"gvisor.dev/gvisor/pkg/tcpip/link/sniffer"
"gvisor.dev/gvisor/pkg/tcpip/network/arp"
"gvisor.dev/gvisor/pkg/tcpip/network/ipv4"
@@ -75,6 +77,44 @@ type DefaultRoute struct {
Name string
}
+// QueueingDiscipline is used to specify the kind of Queueing Discipline to
+// apply for a give FDBasedLink.
+type QueueingDiscipline int
+
+const (
+ // QDiscNone disables any queueing for the underlying FD.
+ QDiscNone QueueingDiscipline = iota
+
+ // QDiscFIFO applies a simple fifo based queue to the underlying
+ // FD.
+ QDiscFIFO
+)
+
+// MakeQueueingDiscipline if possible the equivalent QueuingDiscipline for s
+// else returns an error.
+func MakeQueueingDiscipline(s string) (QueueingDiscipline, error) {
+ switch s {
+ case "none":
+ return QDiscNone, nil
+ case "fifo":
+ return QDiscFIFO, nil
+ default:
+ return 0, fmt.Errorf("unsupported qdisc specified: %q", s)
+ }
+}
+
+// String implements fmt.Stringer.
+func (q QueueingDiscipline) String() string {
+ switch q {
+ case QDiscNone:
+ return "none"
+ case QDiscFIFO:
+ return "fifo"
+ default:
+ panic(fmt.Sprintf("Invalid queueing discipline: %d", q))
+ }
+}
+
// FDBasedLink configures an fd-based link.
type FDBasedLink struct {
Name string
@@ -84,6 +124,7 @@ type FDBasedLink struct {
GSOMaxSize uint32
SoftwareGSOEnabled bool
LinkAddress net.HardwareAddr
+ QDisc QueueingDiscipline
// NumChannels controls how many underlying FD's are to be used to
// create this endpoint.
@@ -185,6 +226,8 @@ func (n *Network) CreateLinksAndRoutes(args *CreateLinksAndRoutesArgs, _ *struct
}
mac := tcpip.LinkAddress(link.LinkAddress)
+ log.Infof("gso max size is: %d", link.GSOMaxSize)
+
linkEP, err := fdbased.New(&fdbased.Options{
FDs: FDs,
MTU: uint32(link.MTU),
@@ -199,6 +242,13 @@ func (n *Network) CreateLinksAndRoutes(args *CreateLinksAndRoutesArgs, _ *struct
return err
}
+ switch link.QDisc {
+ case QDiscNone:
+ case QDiscFIFO:
+ log.Infof("Enabling FIFO QDisc on %q", link.Name)
+ linkEP = fifo.New(linkEP, runtime.GOMAXPROCS(0), 1000)
+ }
+
log.Infof("Enabling interface %q with id %d on addresses %+v (%v) w/ %d channels", link.Name, nicID, link.Addresses, mac, link.NumChannels)
if err := n.createNICWithAddrs(nicID, link.Name, linkEP, link.Addresses); err != nil {
return err
diff --git a/runsc/main.go b/runsc/main.go
index 8e594c58e..0216e9481 100644
--- a/runsc/main.go
+++ b/runsc/main.go
@@ -72,6 +72,7 @@ var (
network = flag.String("network", "sandbox", "specifies which network to use: sandbox (default), host, none. Using network inside the sandbox is more secure because it's isolated from the host network.")
hardwareGSO = flag.Bool("gso", true, "enable hardware segmentation offload if it is supported by a network device.")
softwareGSO = flag.Bool("software-gso", true, "enable software segmentation offload when hardware ofload can't be enabled.")
+ qDisc = flag.String("qdisc", "none", "specifies which queueing discipline to apply by default to the non loopback nics used by the sandbox.")
fileAccess = flag.String("file-access", "exclusive", "specifies which filesystem to use for the root mount: exclusive (default), shared. Volume mounts are always shared.")
fsGoferHostUDS = flag.Bool("fsgofer-host-uds", false, "allow the gofer to mount Unix Domain Sockets.")
overlay = flag.Bool("overlay", false, "wrap filesystem mounts with writable overlay. All modifications are stored in memory inside the sandbox.")
@@ -198,6 +199,11 @@ func main() {
cmd.Fatalf("%v", err)
}
+ queueingDiscipline, err := boot.MakeQueueingDiscipline(*qDisc)
+ if err != nil {
+ cmd.Fatalf("%s", err)
+ }
+
// Sets the reference leak check mode. Also set it in config below to
// propagate it to child processes.
refs.SetLeakMode(refsLeakMode)
@@ -232,7 +238,7 @@ func main() {
OverlayfsStaleRead: *overlayfsStaleRead,
CPUNumFromQuota: *cpuNumFromQuota,
VFS2: *vfs2Enabled,
-
+ QDisc: queueingDiscipline,
TestOnlyAllowRunAsCurrentUserWithoutChroot: *testOnlyAllowRunAsCurrentUserWithoutChroot,
TestOnlyTestNameEnv: *testOnlyTestNameEnv,
}
diff --git a/runsc/sandbox/network.go b/runsc/sandbox/network.go
index bc093fba5..209bfdb20 100644
--- a/runsc/sandbox/network.go
+++ b/runsc/sandbox/network.go
@@ -62,7 +62,7 @@ func setupNetwork(conn *urpc.Client, pid int, spec *specs.Spec, conf *boot.Confi
// Build the path to the net namespace of the sandbox process.
// This is what we will copy.
nsPath := filepath.Join("/proc", strconv.Itoa(pid), "ns/net")
- if err := createInterfacesAndRoutesFromNS(conn, nsPath, conf.HardwareGSO, conf.SoftwareGSO, conf.NumNetworkChannels); err != nil {
+ if err := createInterfacesAndRoutesFromNS(conn, nsPath, conf.HardwareGSO, conf.SoftwareGSO, conf.NumNetworkChannels, conf.QDisc); err != nil {
return fmt.Errorf("creating interfaces from net namespace %q: %v", nsPath, err)
}
case boot.NetworkHost:
@@ -115,7 +115,7 @@ func isRootNS() (bool, error) {
// createInterfacesAndRoutesFromNS scrapes the interface and routes from the
// net namespace with the given path, creates them in the sandbox, and removes
// them from the host.
-func createInterfacesAndRoutesFromNS(conn *urpc.Client, nsPath string, hardwareGSO bool, softwareGSO bool, numNetworkChannels int) error {
+func createInterfacesAndRoutesFromNS(conn *urpc.Client, nsPath string, hardwareGSO bool, softwareGSO bool, numNetworkChannels int, qDisc boot.QueueingDiscipline) error {
// Join the network namespace that we will be copying.
restore, err := joinNetNS(nsPath)
if err != nil {
@@ -201,6 +201,7 @@ func createInterfacesAndRoutesFromNS(conn *urpc.Client, nsPath string, hardwareG
MTU: iface.MTU,
Routes: routes,
NumChannels: numNetworkChannels,
+ QDisc: qDisc,
}
// Get the link for the interface.