summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorBhasker Hariharan <bhaskerh@google.com>2019-01-29 01:37:54 -0800
committerShentubot <shentubot@google.com>2019-01-29 01:39:01 -0800
commit24cb2c0a7256cdb515c2fc2cfc90d130e2a405ef (patch)
tree602c0214434145c9ee6518fbb6a2c422149e026a
parentb44699c5299bb0fc1b16d25a9ac2250cf0a7446d (diff)
Use recvmmsg() instead of readv() to read packets from NIC.
This should reduce the number of syscalls required to process packets significantly and improve throughputs. PiperOrigin-RevId: 231366886 Change-Id: I8b38077262bf9c53176bc4a94b530188d3d7c0ca
-rw-r--r--pkg/tcpip/link/fdbased/BUILD1
-rw-r--r--pkg/tcpip/link/fdbased/endpoint.go166
-rw-r--r--pkg/tcpip/link/fdbased/endpoint_test.go24
-rw-r--r--pkg/tcpip/link/rawfile/rawfile_unsafe.go31
-rw-r--r--runsc/boot/filter/config.go14
-rw-r--r--runsc/boot/network.go1
6 files changed, 199 insertions, 38 deletions
diff --git a/pkg/tcpip/link/fdbased/BUILD b/pkg/tcpip/link/fdbased/BUILD
index 94391433c..a4aa3feec 100644
--- a/pkg/tcpip/link/fdbased/BUILD
+++ b/pkg/tcpip/link/fdbased/BUILD
@@ -27,6 +27,7 @@ go_test(
"//pkg/tcpip",
"//pkg/tcpip/buffer",
"//pkg/tcpip/header",
+ "//pkg/tcpip/link/rawfile",
"//pkg/tcpip/stack",
],
)
diff --git a/pkg/tcpip/link/fdbased/endpoint.go b/pkg/tcpip/link/fdbased/endpoint.go
index 3a79d13d4..87c8ab1fc 100644
--- a/pkg/tcpip/link/fdbased/endpoint.go
+++ b/pkg/tcpip/link/fdbased/endpoint.go
@@ -24,6 +24,7 @@
package fdbased
import (
+ "fmt"
"syscall"
"gvisor.googlesource.com/gvisor/pkg/tcpip"
@@ -33,9 +34,19 @@ import (
"gvisor.googlesource.com/gvisor/pkg/tcpip/stack"
)
+const (
+ // MaxMsgsPerRecv is the maximum number of packets we want to retrieve
+ // in a single RecvMMsg call.
+ MaxMsgsPerRecv = 8
+)
+
// BufConfig defines the shape of the vectorised view used to read packets from the NIC.
var BufConfig = []int{128, 256, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768}
+// linkDispatcher reads packets from the link FD and dispatches them to the
+// NetworkDispatcher.
+type linkDispatcher func() (bool, *tcpip.Error)
+
type endpoint struct {
// fd is the file descriptor used to send and receive packets.
fd int
@@ -57,14 +68,25 @@ type endpoint struct {
// its end of the communication pipe.
closed func(*tcpip.Error)
- iovecs []syscall.Iovec
- views []buffer.View
- dispatcher stack.NetworkDispatcher
+ views [][]buffer.View
+ iovecs [][]syscall.Iovec
+ msgHdrs []rawfile.MMsgHdr
+ inboundDispatcher linkDispatcher
+ dispatcher stack.NetworkDispatcher
// handleLocal indicates whether packets destined to itself should be
// handled by the netstack internally (true) or be forwarded to the FD
// endpoint (false).
handleLocal bool
+
+ // useRecvMMsg enables use of recvmmsg() syscall instead of readv() to
+ // read inbound packets. This reduces # of syscalls needed to process
+ // packets.
+ //
+ // NOTE: recvmmsg() is only supported for sockets, so if the underlying
+ // FD is not a socket then the code will still fall back to the readv()
+ // path.
+ useRecvMMsg bool
}
// Options specify the details about the fd-based endpoint to be created.
@@ -78,6 +100,7 @@ type Options struct {
SaveRestore bool
DisconnectOk bool
HandleLocal bool
+ UseRecvMMsg bool
}
// New creates a new fd-based endpoint.
@@ -85,7 +108,10 @@ type Options struct {
// Makes fd non-blocking, but does not take ownership of fd, which must remain
// open for the lifetime of the returned endpoint.
func New(opts *Options) tcpip.LinkEndpointID {
- syscall.SetNonblock(opts.FD, true)
+ if err := syscall.SetNonblock(opts.FD, true); err != nil {
+ // TODO : replace panic with an error return.
+ panic(fmt.Sprintf("syscall.SetNonblock(%v) failed: %v", opts.FD, err))
+ }
caps := stack.LinkEndpointCapabilities(0)
if opts.ChecksumOffload {
@@ -113,13 +139,44 @@ func New(opts *Options) tcpip.LinkEndpointID {
closed: opts.ClosedFunc,
addr: opts.Address,
hdrSize: hdrSize,
- views: make([]buffer.View, len(BufConfig)),
- iovecs: make([]syscall.Iovec, len(BufConfig)),
handleLocal: opts.HandleLocal,
+ useRecvMMsg: opts.UseRecvMMsg,
+ }
+ // For non-socket FDs we read one packet a time (e.g. TAP devices)
+ msgsPerRecv := 1
+ e.inboundDispatcher = e.dispatch
+ // If the provided FD is a socket then we optimize packet reads by
+ // using recvmmsg() instead of read() to read packets in a batch.
+ if isSocketFD(opts.FD) && e.useRecvMMsg {
+ e.inboundDispatcher = e.recvMMsgDispatch
+ msgsPerRecv = MaxMsgsPerRecv
+ }
+
+ e.views = make([][]buffer.View, msgsPerRecv)
+ for i, _ := range e.views {
+ e.views[i] = make([]buffer.View, len(BufConfig))
+ }
+ e.iovecs = make([][]syscall.Iovec, msgsPerRecv)
+ for i, _ := range e.iovecs {
+ e.iovecs[i] = make([]syscall.Iovec, len(BufConfig))
+ }
+ e.msgHdrs = make([]rawfile.MMsgHdr, msgsPerRecv)
+ for i, _ := range e.msgHdrs {
+ e.msgHdrs[i].Msg.Iov = &e.iovecs[i][0]
+ e.msgHdrs[i].Msg.Iovlen = uint64(len(BufConfig))
}
return stack.RegisterLinkEndpoint(e)
}
+func isSocketFD(fd int) bool {
+ var stat syscall.Stat_t
+ if err := syscall.Fstat(fd, &stat); err != nil {
+ // TODO : replace panic with an error return.
+ panic(fmt.Sprintf("syscall.Fstat(%v,...) failed: %v", fd, err))
+ }
+ return (stat.Mode & syscall.S_IFSOCK) == syscall.S_IFSOCK
+}
+
// Attach launches the goroutine that reads packets from the file descriptor and
// dispatches them via the provided dispatcher.
func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
@@ -191,12 +248,12 @@ func (e *endpoint) WritePacket(r *stack.Route, hdr buffer.Prependable, payload b
return rawfile.NonBlockingWrite2(e.fd, hdr.View(), payload.ToView())
}
-func (e *endpoint) capViews(n int, buffers []int) int {
+func (e *endpoint) capViews(k, n int, buffers []int) int {
c := 0
for i, s := range buffers {
c += s
if c >= n {
- e.views[i].CapLength(s - (c - n))
+ e.views[k][i].CapLength(s - (c - n))
return i + 1
}
}
@@ -204,24 +261,26 @@ func (e *endpoint) capViews(n int, buffers []int) int {
}
func (e *endpoint) allocateViews(bufConfig []int) {
- for i, v := range e.views {
- if v != nil {
- break
- }
- b := buffer.NewView(bufConfig[i])
- e.views[i] = b
- e.iovecs[i] = syscall.Iovec{
- Base: &b[0],
- Len: uint64(len(b)),
+ for k := 0; k < len(e.views); k++ {
+ for i := 0; i < len(bufConfig); i++ {
+ if e.views[k][i] != nil {
+ break
+ }
+ b := buffer.NewView(bufConfig[i])
+ e.views[k][i] = b
+ e.iovecs[k][i] = syscall.Iovec{
+ Base: &b[0],
+ Len: uint64(len(b)),
+ }
}
}
}
// dispatch reads one packet from the file descriptor and dispatches it.
-func (e *endpoint) dispatch(largeV buffer.View) (bool, *tcpip.Error) {
+func (e *endpoint) dispatch() (bool, *tcpip.Error) {
e.allocateViews(BufConfig)
- n, err := rawfile.BlockingReadv(e.fd, e.iovecs)
+ n, err := rawfile.BlockingReadv(e.fd, e.iovecs[0])
if err != nil {
return false, err
}
@@ -235,14 +294,14 @@ func (e *endpoint) dispatch(largeV buffer.View) (bool, *tcpip.Error) {
remote, local tcpip.LinkAddress
)
if e.hdrSize > 0 {
- eth := header.Ethernet(e.views[0])
+ eth := header.Ethernet(e.views[0][0])
p = eth.Type()
remote = eth.SourceAddress()
local = eth.DestinationAddress()
} else {
// We don't get any indication of what the packet is, so try to guess
// if it's an IPv4 or IPv6 packet.
- switch header.IPVersion(e.views[0]) {
+ switch header.IPVersion(e.views[0][0]) {
case header.IPv4Version:
p = header.IPv4ProtocolNumber
case header.IPv6Version:
@@ -252,15 +311,71 @@ func (e *endpoint) dispatch(largeV buffer.View) (bool, *tcpip.Error) {
}
}
- used := e.capViews(n, BufConfig)
- vv := buffer.NewVectorisedView(n, e.views[:used])
+ used := e.capViews(0, n, BufConfig)
+ vv := buffer.NewVectorisedView(n, e.views[0][:used])
vv.TrimFront(e.hdrSize)
e.dispatcher.DeliverNetworkPacket(e, remote, local, p, vv)
// Prepare e.views for another packet: release used views.
for i := 0; i < used; i++ {
- e.views[i] = nil
+ e.views[0][i] = nil
+ }
+
+ return true, nil
+}
+
+// recvMMsgDispatch reads more than one packet at a time from the file
+// descriptor and dispatches it.
+func (e *endpoint) recvMMsgDispatch() (bool, *tcpip.Error) {
+ e.allocateViews(BufConfig)
+
+ nMsgs, err := rawfile.BlockingRecvMMsg(e.fd, e.msgHdrs)
+ if err != nil {
+ return false, err
+ }
+ // Process each of received packets.
+ for k := 0; k < nMsgs; k++ {
+ n := e.msgHdrs[k].Len
+ if n <= uint32(e.hdrSize) {
+ return false, nil
+ }
+
+ var (
+ p tcpip.NetworkProtocolNumber
+ remote, local tcpip.LinkAddress
+ )
+ if e.hdrSize > 0 {
+ eth := header.Ethernet(e.views[k][0])
+ p = eth.Type()
+ remote = eth.SourceAddress()
+ local = eth.DestinationAddress()
+ } else {
+ // We don't get any indication of what the packet is, so try to guess
+ // if it's an IPv4 or IPv6 packet.
+ switch header.IPVersion(e.views[k][0]) {
+ case header.IPv4Version:
+ p = header.IPv4ProtocolNumber
+ case header.IPv6Version:
+ p = header.IPv6ProtocolNumber
+ default:
+ return true, nil
+ }
+ }
+
+ used := e.capViews(k, int(n), BufConfig)
+ vv := buffer.NewVectorisedView(int(n), e.views[k][:used])
+ vv.TrimFront(e.hdrSize)
+ e.dispatcher.DeliverNetworkPacket(e, remote, local, p, vv)
+
+ // Prepare e.views for another packet: release used views.
+ for i := 0; i < used; i++ {
+ e.views[k][i] = nil
+ }
+ }
+
+ for k := 0; k < nMsgs; k++ {
+ e.msgHdrs[k].Len = 0
}
return true, nil
@@ -269,9 +384,8 @@ func (e *endpoint) dispatch(largeV buffer.View) (bool, *tcpip.Error) {
// dispatchLoop reads packets from the file descriptor in a loop and dispatches
// them to the network stack.
func (e *endpoint) dispatchLoop() *tcpip.Error {
- v := buffer.NewView(header.MaxIPPacketSize)
for {
- cont, err := e.dispatch(v)
+ cont, err := e.inboundDispatcher()
if err != nil || !cont {
if e.closed != nil {
e.closed(err)
diff --git a/pkg/tcpip/link/fdbased/endpoint_test.go b/pkg/tcpip/link/fdbased/endpoint_test.go
index 226639443..14abacdf2 100644
--- a/pkg/tcpip/link/fdbased/endpoint_test.go
+++ b/pkg/tcpip/link/fdbased/endpoint_test.go
@@ -28,6 +28,7 @@ import (
"gvisor.googlesource.com/gvisor/pkg/tcpip"
"gvisor.googlesource.com/gvisor/pkg/tcpip/buffer"
"gvisor.googlesource.com/gvisor/pkg/tcpip/header"
+ "gvisor.googlesource.com/gvisor/pkg/tcpip/link/rawfile"
"gvisor.googlesource.com/gvisor/pkg/tcpip/stack"
)
@@ -309,9 +310,22 @@ func TestBufConfigFirst(t *testing.T) {
func build(bufConfig []int) *endpoint {
e := &endpoint{
- views: make([]buffer.View, len(bufConfig)),
- iovecs: make([]syscall.Iovec, len(bufConfig)),
+ views: make([][]buffer.View, MaxMsgsPerRecv),
+ iovecs: make([][]syscall.Iovec, MaxMsgsPerRecv),
+ msgHdrs: make([]rawfile.MMsgHdr, MaxMsgsPerRecv),
}
+
+ for i, _ := range e.views {
+ e.views[i] = make([]buffer.View, len(bufConfig))
+ }
+ for i := range e.iovecs {
+ e.iovecs[i] = make([]syscall.Iovec, len(bufConfig))
+ }
+ for k, msgHdr := range e.msgHdrs {
+ msgHdr.Msg.Iov = &e.iovecs[k][0]
+ msgHdr.Msg.Iovlen = uint64(len(bufConfig))
+ }
+
e.allocateViews(bufConfig)
return e
}
@@ -356,12 +370,12 @@ var capLengthTestCases = []struct {
func TestCapLength(t *testing.T) {
for _, c := range capLengthTestCases {
e := build(c.config)
- used := e.capViews(c.n, c.config)
+ used := e.capViews(0, c.n, c.config)
if used != c.wantUsed {
t.Errorf("Test \"%s\" failed when calling capViews(%d, %v). Got %d. Want %d", c.comment, c.n, c.config, used, c.wantUsed)
}
- lengths := make([]int, len(e.views))
- for i, v := range e.views {
+ lengths := make([]int, len(e.views[0]))
+ for i, v := range e.views[0] {
lengths[i] = len(v)
}
if !reflect.DeepEqual(lengths, c.wantLengths) {
diff --git a/pkg/tcpip/link/rawfile/rawfile_unsafe.go b/pkg/tcpip/link/rawfile/rawfile_unsafe.go
index be4a4fa9c..5deea093a 100644
--- a/pkg/tcpip/link/rawfile/rawfile_unsafe.go
+++ b/pkg/tcpip/link/rawfile/rawfile_unsafe.go
@@ -124,7 +124,7 @@ func BlockingRead(fd int, b []byte) (int, *tcpip.Error) {
// BlockingReadv reads from a file descriptor that is set up as non-blocking and
// stores the data in a list of iovecs buffers. If no data is available, it will
-// block in a poll() syscall until the file descirptor becomes readable.
+// block in a poll() syscall until the file descriptor becomes readable.
func BlockingReadv(fd int, iovecs []syscall.Iovec) (int, *tcpip.Error) {
for {
n, _, e := syscall.RawSyscall(syscall.SYS_READV, uintptr(fd), uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs)))
@@ -143,3 +143,32 @@ func BlockingReadv(fd int, iovecs []syscall.Iovec) (int, *tcpip.Error) {
}
}
}
+
+// MMsgHdr represents the mmsg_hdr structure required by recvmmsg() on linux.
+type MMsgHdr struct {
+ Msg syscall.Msghdr
+ Len uint32
+ _ [4]byte
+}
+
+// BlockingRecvMMsg reads from a file descriptor that is set up as non-blocking
+// and stores the received messages in a slice of MMsgHdr structures. If no data
+// is available, it will block in a poll() syscall until the file descriptor
+// becomes readable.
+func BlockingRecvMMsg(fd int, msgHdrs []MMsgHdr) (int, *tcpip.Error) {
+ for {
+ n, _, e := syscall.RawSyscall6(syscall.SYS_RECVMMSG, uintptr(fd), uintptr(unsafe.Pointer(&msgHdrs[0])), uintptr(len(msgHdrs)), syscall.MSG_DONTWAIT, 0, 0)
+ if e == 0 {
+ return int(n), nil
+ }
+
+ event := pollEvent{
+ fd: int32(fd),
+ events: 1, // POLLIN
+ }
+
+ if _, e := blockingPoll(&event, 1, -1); e != 0 && e != syscall.EINTR {
+ return 0, TranslateErrno(e)
+ }
+ }
+}
diff --git a/runsc/boot/filter/config.go b/runsc/boot/filter/config.go
index 83c1fbcce..bde749861 100644
--- a/runsc/boot/filter/config.go
+++ b/runsc/boot/filter/config.go
@@ -211,23 +211,25 @@ var allowedSyscalls = seccomp.SyscallRules{
syscall.SYS_PREAD64: {},
syscall.SYS_PWRITE64: {},
syscall.SYS_READ: {},
- syscall.SYS_READV: []seccomp.Rule{
+ syscall.SYS_RECVMSG: []seccomp.Rule{
{
seccomp.AllowAny{},
seccomp.AllowAny{},
- seccomp.AllowValue(len(fdbased.BufConfig)),
+ seccomp.AllowValue(syscall.MSG_DONTWAIT | syscall.MSG_TRUNC),
},
- },
- syscall.SYS_RECVMSG: []seccomp.Rule{
{
seccomp.AllowAny{},
seccomp.AllowAny{},
- seccomp.AllowValue(syscall.MSG_DONTWAIT | syscall.MSG_TRUNC),
+ seccomp.AllowValue(syscall.MSG_DONTWAIT | syscall.MSG_TRUNC | syscall.MSG_PEEK),
},
+ },
+ syscall.SYS_RECVMMSG: []seccomp.Rule{
{
seccomp.AllowAny{},
seccomp.AllowAny{},
- seccomp.AllowValue(syscall.MSG_DONTWAIT | syscall.MSG_TRUNC | syscall.MSG_PEEK),
+ seccomp.AllowValue(fdbased.MaxMsgsPerRecv),
+ seccomp.AllowValue(syscall.MSG_DONTWAIT),
+ seccomp.AllowValue(0),
},
},
syscall.SYS_RESTART_SYSCALL: {},
diff --git a/runsc/boot/network.go b/runsc/boot/network.go
index 89f186139..83d56f93a 100644
--- a/runsc/boot/network.go
+++ b/runsc/boot/network.go
@@ -140,6 +140,7 @@ func (n *Network) CreateLinksAndRoutes(args *CreateLinksAndRoutesArgs, _ *struct
EthernetHeader: true,
HandleLocal: true,
Address: mac,
+ UseRecvMMsg: true,
})
log.Infof("Enabling interface %q with id %d on addresses %+v (%v)", link.Name, nicID, link.Addresses, mac)