summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link
diff options
context:
space:
mode:
authorBhasker Hariharan <bhaskerh@google.com>2020-04-30 16:39:18 -0700
committergVisor bot <gvisor-bot@google.com>2020-04-30 16:41:00 -0700
commitae15d90436ec5ecd8795bed2a357b1990123e8fd (patch)
treee0daf9de0392e62c395694e0203445334cac3ba3 /pkg/tcpip/link
parent01beec3bb457a2a3a7313c7fe6dc795817f47746 (diff)
FIFO QDisc implementation
Updates #231 PiperOrigin-RevId: 309323808
Diffstat (limited to 'pkg/tcpip/link')
-rw-r--r--pkg/tcpip/link/fdbased/BUILD1
-rw-r--r--pkg/tcpip/link/fdbased/endpoint.go104
-rw-r--r--pkg/tcpip/link/fdbased/endpoint_unsafe.go10
-rw-r--r--pkg/tcpip/link/qdisc/fifo/BUILD19
-rw-r--r--pkg/tcpip/link/qdisc/fifo/endpoint.go209
-rw-r--r--pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go84
6 files changed, 384 insertions, 43 deletions
diff --git a/pkg/tcpip/link/fdbased/BUILD b/pkg/tcpip/link/fdbased/BUILD
index abe725548..aa6db9aea 100644
--- a/pkg/tcpip/link/fdbased/BUILD
+++ b/pkg/tcpip/link/fdbased/BUILD
@@ -14,6 +14,7 @@ go_library(
],
visibility = ["//visibility:public"],
deps = [
+ "//pkg/binary",
"//pkg/sync",
"//pkg/tcpip",
"//pkg/tcpip/buffer",
diff --git a/pkg/tcpip/link/fdbased/endpoint.go b/pkg/tcpip/link/fdbased/endpoint.go
index b857ce9d0..53a9712c6 100644
--- a/pkg/tcpip/link/fdbased/endpoint.go
+++ b/pkg/tcpip/link/fdbased/endpoint.go
@@ -44,6 +44,7 @@ import (
"syscall"
"golang.org/x/sys/unix"
+ "gvisor.dev/gvisor/pkg/binary"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
@@ -428,7 +429,7 @@ func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.Ne
}
}
- vnetHdrBuf := vnetHdrToByteSlice(&vnetHdr)
+ vnetHdrBuf := binary.Marshal(make([]byte, 0, virtioNetHdrSize), binary.LittleEndian, vnetHdr)
return rawfile.NonBlockingWrite3(fd, vnetHdrBuf, pkt.Header.View(), pkt.Data.ToView())
}
@@ -439,19 +440,10 @@ func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.Ne
return rawfile.NonBlockingWrite3(fd, pkt.Header.View(), pkt.Data.ToView(), nil)
}
-// WritePackets writes outbound packets to the file descriptor. If it is not
-// currently writable, the packet is dropped.
-//
-// NOTE: This API uses sendmmsg to batch packets. As a result the underlying FD
-// picked to write the packet out has to be the same for all packets in the
-// list. In other words all packets in the batch should belong to the same
-// flow.
-func (e *endpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.PacketBufferList, protocol tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
- n := pkts.Len()
-
- mmsgHdrs := make([]rawfile.MMsgHdr, n)
- i := 0
- for pkt := pkts.Front(); pkt != nil; pkt = pkt.Next() {
+func (e *endpoint) sendBatch(batchFD int, batch []*stack.PacketBuffer) (int, *tcpip.Error) {
+ // Send a batch of packets through batchFD.
+ mmsgHdrs := make([]rawfile.MMsgHdr, 0, len(batch))
+ for _, pkt := range batch {
var ethHdrBuf []byte
iovLen := 0
if e.hdrSize > 0 {
@@ -459,13 +451,13 @@ func (e *endpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.Packe
ethHdrBuf = make([]byte, header.EthernetMinimumSize)
eth := header.Ethernet(ethHdrBuf)
ethHdr := &header.EthernetFields{
- DstAddr: r.RemoteLinkAddress,
- Type: protocol,
+ DstAddr: pkt.EgressRoute.RemoteLinkAddress,
+ Type: pkt.NetworkProtocolNumber,
}
// Preserve the src address if it's set in the route.
- if r.LocalLinkAddress != "" {
- ethHdr.SrcAddr = r.LocalLinkAddress
+ if pkt.EgressRoute.LocalLinkAddress != "" {
+ ethHdr.SrcAddr = pkt.EgressRoute.LocalLinkAddress
} else {
ethHdr.SrcAddr = e.addr
}
@@ -473,34 +465,34 @@ func (e *endpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.Packe
iovLen++
}
- var vnetHdrBuf []byte
vnetHdr := virtioNetHdr{}
+ var vnetHdrBuf []byte
if e.Capabilities()&stack.CapabilityHardwareGSO != 0 {
- if gso != nil {
+ if pkt.GSOOptions != nil {
vnetHdr.hdrLen = uint16(pkt.Header.UsedLength())
- if gso.NeedsCsum {
+ if pkt.GSOOptions.NeedsCsum {
vnetHdr.flags = _VIRTIO_NET_HDR_F_NEEDS_CSUM
- vnetHdr.csumStart = header.EthernetMinimumSize + gso.L3HdrLen
- vnetHdr.csumOffset = gso.CsumOffset
+ vnetHdr.csumStart = header.EthernetMinimumSize + pkt.GSOOptions.L3HdrLen
+ vnetHdr.csumOffset = pkt.GSOOptions.CsumOffset
}
- if gso.Type != stack.GSONone && uint16(pkt.Data.Size()) > gso.MSS {
- switch gso.Type {
+ if pkt.GSOOptions.Type != stack.GSONone && uint16(pkt.Data.Size()) > pkt.GSOOptions.MSS {
+ switch pkt.GSOOptions.Type {
case stack.GSOTCPv4:
vnetHdr.gsoType = _VIRTIO_NET_HDR_GSO_TCPV4
case stack.GSOTCPv6:
vnetHdr.gsoType = _VIRTIO_NET_HDR_GSO_TCPV6
default:
- panic(fmt.Sprintf("Unknown gso type: %v", gso.Type))
+ panic(fmt.Sprintf("Unknown gso type: %v", pkt.GSOOptions.Type))
}
- vnetHdr.gsoSize = gso.MSS
+ vnetHdr.gsoSize = pkt.GSOOptions.MSS
}
}
- vnetHdrBuf = vnetHdrToByteSlice(&vnetHdr)
+ vnetHdrBuf = binary.Marshal(make([]byte, 0, virtioNetHdrSize), binary.LittleEndian, vnetHdr)
iovLen++
}
iovecs := make([]syscall.Iovec, iovLen+1+len(pkt.Data.Views()))
- mmsgHdr := &mmsgHdrs[i]
+ var mmsgHdr rawfile.MMsgHdr
mmsgHdr.Msg.Iov = &iovecs[0]
iovecIdx := 0
if vnetHdrBuf != nil {
@@ -535,22 +527,68 @@ func (e *endpoint) WritePackets(r *stack.Route, gso *stack.GSO, pkts stack.Packe
pktSize += vec.Len
}
mmsgHdr.Msg.Iovlen = uint64(iovecIdx)
- i++
+ mmsgHdrs = append(mmsgHdrs, mmsgHdr)
}
packets := 0
- for packets < n {
- fd := e.fds[pkts.Front().Hash%uint32(len(e.fds))]
- sent, err := rawfile.NonBlockingSendMMsg(fd, mmsgHdrs)
+ for len(mmsgHdrs) > 0 {
+ sent, err := rawfile.NonBlockingSendMMsg(batchFD, mmsgHdrs)
if err != nil {
return packets, err
}
packets += sent
mmsgHdrs = mmsgHdrs[sent:]
}
+
return packets, nil
}
+// WritePackets writes outbound packets to the underlying file descriptors. If
+// one is not currently writable, the packet is dropped.
+//
+// Being a batch API, each packet in pkts should have the following
+// fields populated:
+// - pkt.EgressRoute
+// - pkt.GSOOptions
+// - pkt.NetworkProtocolNumber
+func (e *endpoint) WritePackets(_ *stack.Route, _ *stack.GSO, pkts stack.PacketBufferList, _ tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ // Preallocate to avoid repeated reallocation as we append to batch.
+ // batchSz is 47 because when SWGSO is in use then a single 65KB TCP
+ // segment can get split into 46 segments of 1420 bytes and a single 216
+ // byte segment.
+ const batchSz = 47
+ batch := make([]*stack.PacketBuffer, 0, batchSz)
+ batchFD := -1
+ sentPackets := 0
+ for pkt := pkts.Front(); pkt != nil; pkt = pkt.Next() {
+ if len(batch) == 0 {
+ batchFD = e.fds[pkt.Hash%uint32(len(e.fds))]
+ }
+ pktFD := e.fds[pkt.Hash%uint32(len(e.fds))]
+ if sendNow := pktFD != batchFD; !sendNow {
+ batch = append(batch, pkt)
+ continue
+ }
+ n, err := e.sendBatch(batchFD, batch)
+ sentPackets += n
+ if err != nil {
+ return sentPackets, err
+ }
+ batch = batch[:0]
+ batch = append(batch, pkt)
+ batchFD = pktFD
+ }
+
+ if len(batch) != 0 {
+ n, err := e.sendBatch(batchFD, batch)
+ sentPackets += n
+ if err != nil {
+ return sentPackets, err
+ }
+ }
+ return sentPackets, nil
+}
+
// viewsEqual tests whether v1 and v2 refer to the same backing bytes.
func viewsEqual(vs1, vs2 []buffer.View) bool {
return len(vs1) == len(vs2) && (len(vs1) == 0 || &vs1[0] == &vs2[0])
diff --git a/pkg/tcpip/link/fdbased/endpoint_unsafe.go b/pkg/tcpip/link/fdbased/endpoint_unsafe.go
index d81858353..df14eaad1 100644
--- a/pkg/tcpip/link/fdbased/endpoint_unsafe.go
+++ b/pkg/tcpip/link/fdbased/endpoint_unsafe.go
@@ -17,17 +17,7 @@
package fdbased
import (
- "reflect"
"unsafe"
)
const virtioNetHdrSize = int(unsafe.Sizeof(virtioNetHdr{}))
-
-func vnetHdrToByteSlice(hdr *virtioNetHdr) (slice []byte) {
- *(*reflect.SliceHeader)(unsafe.Pointer(&slice)) = reflect.SliceHeader{
- Data: uintptr((unsafe.Pointer(hdr))),
- Len: virtioNetHdrSize,
- Cap: virtioNetHdrSize,
- }
- return
-}
diff --git a/pkg/tcpip/link/qdisc/fifo/BUILD b/pkg/tcpip/link/qdisc/fifo/BUILD
new file mode 100644
index 000000000..054c213bc
--- /dev/null
+++ b/pkg/tcpip/link/qdisc/fifo/BUILD
@@ -0,0 +1,19 @@
+load("//tools:defs.bzl", "go_library")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "fifo",
+ srcs = [
+ "endpoint.go",
+ "packet_buffer_queue.go",
+ ],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/sleep",
+ "//pkg/sync",
+ "//pkg/tcpip",
+ "//pkg/tcpip/buffer",
+ "//pkg/tcpip/stack",
+ ],
+)
diff --git a/pkg/tcpip/link/qdisc/fifo/endpoint.go b/pkg/tcpip/link/qdisc/fifo/endpoint.go
new file mode 100644
index 000000000..be9fec3b3
--- /dev/null
+++ b/pkg/tcpip/link/qdisc/fifo/endpoint.go
@@ -0,0 +1,209 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fifo provides the implementation of data-link layer endpoints that
+// wrap another endpoint and queues all outbound packets and asynchronously
+// dispatches them to the lower endpoint.
+package fifo
+
+import (
+ "gvisor.dev/gvisor/pkg/sleep"
+ "gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/buffer"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// endpoint represents a LinkEndpoint which implements a FIFO queue for all
+// outgoing packets. endpoint can have 1 or more underlying queueDispatchers.
+// All outgoing packets are consistenly hashed to a single underlying queue
+// using the PacketBuffer.Hash if set, otherwise all packets are queued to the
+// first queue to avoid reordering in case of missing hash.
+type endpoint struct {
+ dispatcher stack.NetworkDispatcher
+ lower stack.LinkEndpoint
+ wg sync.WaitGroup
+ dispatchers []*queueDispatcher
+}
+
+// queueDispatcher is responsible for dispatching all outbound packets in its
+// queue. It will also smartly batch packets when possible and write them
+// through the lower LinkEndpoint.
+type queueDispatcher struct {
+ lower stack.LinkEndpoint
+ q *packetBufferQueue
+ newPacketWaker sleep.Waker
+ closeWaker sleep.Waker
+}
+
+// New creates a new fifo link endpoint with the n queues with maximum
+// capacity of queueLen.
+func New(lower stack.LinkEndpoint, n int, queueLen int) stack.LinkEndpoint {
+ e := &endpoint{
+ lower: lower,
+ }
+ // Create the required dispatchers
+ for i := 0; i < n; i++ {
+ qd := &queueDispatcher{
+ q: &packetBufferQueue{limit: queueLen},
+ lower: lower,
+ }
+ e.dispatchers = append(e.dispatchers, qd)
+ e.wg.Add(1)
+ go func() {
+ defer e.wg.Done()
+ qd.dispatchLoop()
+ }()
+ }
+ return e
+}
+
+func (q *queueDispatcher) dispatchLoop() {
+ const newPacketWakerID = 1
+ const closeWakerID = 2
+ s := sleep.Sleeper{}
+ s.AddWaker(&q.newPacketWaker, newPacketWakerID)
+ s.AddWaker(&q.closeWaker, closeWakerID)
+ defer s.Done()
+
+ const batchSize = 32
+ var batch stack.PacketBufferList
+ for {
+ id, ok := s.Fetch(true)
+ if ok && id == closeWakerID {
+ return
+ }
+ for pkt := q.q.dequeue(); pkt != nil; pkt = q.q.dequeue() {
+ batch.PushBack(pkt)
+ if batch.Len() < batchSize && !q.q.empty() {
+ continue
+ }
+ // We pass a protocol of zero here because each packet carries its
+ // NetworkProtocol.
+ q.lower.WritePackets(nil /* route */, nil /* gso */, batch, 0 /* protocol */)
+ for pkt := batch.Front(); pkt != nil; pkt = pkt.Next() {
+ pkt.EgressRoute.Release()
+ batch.Remove(pkt)
+ }
+ batch.Reset()
+ }
+ }
+}
+
+// DeliverNetworkPacket implements stack.NetworkDispatcher.DeliverNetworkPacket.
+func (e *endpoint) DeliverNetworkPacket(linkEP stack.LinkEndpoint, remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, pkt stack.PacketBuffer) {
+ e.dispatcher.DeliverNetworkPacket(e, remote, local, protocol, pkt)
+}
+
+// Attach implements stack.LinkEndpoint.Attach.
+func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ e.dispatcher = dispatcher
+ e.lower.Attach(e)
+}
+
+// IsAttached implements stack.LinkEndpoint.IsAttached.
+func (e *endpoint) IsAttached() bool {
+ return e.dispatcher != nil
+}
+
+// MTU implements stack.LinkEndpoint.MTU.
+func (e *endpoint) MTU() uint32 {
+ return e.lower.MTU()
+}
+
+// Capabilities implements stack.LinkEndpoint.Capabilities.
+func (e *endpoint) Capabilities() stack.LinkEndpointCapabilities {
+ return e.lower.Capabilities()
+}
+
+// MaxHeaderLength implements stack.LinkEndpoint.MaxHeaderLength.
+func (e *endpoint) MaxHeaderLength() uint16 {
+ return e.lower.MaxHeaderLength()
+}
+
+// LinkAddress implements stack.LinkEndpoint.LinkAddress.
+func (e *endpoint) LinkAddress() tcpip.LinkAddress {
+ return e.lower.LinkAddress()
+}
+
+// GSOMaxSize returns the maximum GSO packet size.
+func (e *endpoint) GSOMaxSize() uint32 {
+ if gso, ok := e.lower.(stack.GSOEndpoint); ok {
+ return gso.GSOMaxSize()
+ }
+ return 0
+}
+
+// WritePacket implements stack.LinkEndpoint.WritePacket.
+func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, protocol tcpip.NetworkProtocolNumber, pkt stack.PacketBuffer) *tcpip.Error {
+ // WritePacket caller's do not set the following fields in PacketBuffer
+ // so we populate them here.
+ newRoute := r.Clone()
+ pkt.EgressRoute = &newRoute
+ pkt.GSOOptions = gso
+ pkt.NetworkProtocolNumber = protocol
+ d := e.dispatchers[int(pkt.Hash)%len(e.dispatchers)]
+ if !d.q.enqueue(&pkt) {
+ return tcpip.ErrNoBufferSpace
+ }
+ d.newPacketWaker.Assert()
+ return nil
+}
+
+// WritePackets implements stack.LinkEndpoint.WritePackets.
+//
+// Being a batch API each packet in pkts should have the following fields
+// populated:
+// - pkt.EgressRoute
+// - pkt.GSOOptions
+// - pkt.NetworkProtocolNumber
+func (e *endpoint) WritePackets(_ *stack.Route, _ *stack.GSO, pkts stack.PacketBufferList, _ tcpip.NetworkProtocolNumber) (int, *tcpip.Error) {
+ enqueued := 0
+ for pkt := pkts.Front(); pkt != nil; {
+ d := e.dispatchers[int(pkt.Hash)%len(e.dispatchers)]
+ nxt := pkt.Next()
+ // Since qdisc can hold onto a packet for long we should Clone
+ // the route here to ensure it doesn't get released while the
+ // packet is still in our queue.
+ newRoute := pkt.EgressRoute.Clone()
+ pkt.EgressRoute = &newRoute
+ if !d.q.enqueue(pkt) {
+ if enqueued > 0 {
+ d.newPacketWaker.Assert()
+ }
+ return enqueued, tcpip.ErrNoBufferSpace
+ }
+ pkt = nxt
+ enqueued++
+ d.newPacketWaker.Assert()
+ }
+ return enqueued, nil
+}
+
+// WriteRawPacket implements stack.LinkEndpoint.WriteRawPacket.
+func (e *endpoint) WriteRawPacket(vv buffer.VectorisedView) *tcpip.Error {
+ return e.lower.WriteRawPacket(vv)
+}
+
+// Wait implements stack.LinkEndpoint.Wait.
+func (e *endpoint) Wait() {
+ e.lower.Wait()
+
+ // The linkEP is gone. Teardown the outbound dispatcher goroutines.
+ for i := range e.dispatchers {
+ e.dispatchers[i].closeWaker.Assert()
+ }
+
+ e.wg.Wait()
+}
diff --git a/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go b/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go
new file mode 100644
index 000000000..eb5abb906
--- /dev/null
+++ b/pkg/tcpip/link/qdisc/fifo/packet_buffer_queue.go
@@ -0,0 +1,84 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fifo
+
+import (
+ "gvisor.dev/gvisor/pkg/sync"
+ "gvisor.dev/gvisor/pkg/tcpip/stack"
+)
+
+// packetBufferQueue is a bounded, thread-safe queue of PacketBuffers.
+//
+type packetBufferQueue struct {
+ mu sync.Mutex
+ list stack.PacketBufferList
+ limit int
+ used int
+}
+
+// emptyLocked determines if the queue is empty.
+// Preconditions: q.mu must be held.
+func (q *packetBufferQueue) emptyLocked() bool {
+ return q.used == 0
+}
+
+// empty determines if the queue is empty.
+func (q *packetBufferQueue) empty() bool {
+ q.mu.Lock()
+ r := q.emptyLocked()
+ q.mu.Unlock()
+
+ return r
+}
+
+// setLimit updates the limit. No PacketBuffers are immediately dropped in case
+// the queue becomes full due to the new limit.
+func (q *packetBufferQueue) setLimit(limit int) {
+ q.mu.Lock()
+ q.limit = limit
+ q.mu.Unlock()
+}
+
+// enqueue adds the given packet to the queue.
+//
+// Returns true when the PacketBuffer is successfully added to the queue, in
+// which case ownership of the reference is transferred to the queue. And
+// returns false if the queue is full, in which case ownership is retained by
+// the caller.
+func (q *packetBufferQueue) enqueue(s *stack.PacketBuffer) bool {
+ q.mu.Lock()
+ r := q.used < q.limit
+ if r {
+ q.list.PushBack(s)
+ q.used++
+ }
+ q.mu.Unlock()
+
+ return r
+}
+
+// dequeue removes and returns the next PacketBuffer from queue, if one exists.
+// Ownership is transferred to the caller.
+func (q *packetBufferQueue) dequeue() *stack.PacketBuffer {
+ q.mu.Lock()
+ s := q.list.Front()
+ if s != nil {
+ q.list.Remove(s)
+ q.used--
+ }
+ q.mu.Unlock()
+
+ return s
+}