diff options
Diffstat (limited to 'pkg/tcpip/link')
-rw-r--r-- | pkg/tcpip/link/fdbased/endpoint.go | 168 | ||||
-rw-r--r-- | pkg/tcpip/link/fdbased/endpoint_test.go | 2 | ||||
-rw-r--r-- | pkg/tcpip/link/sniffer/sniffer.go | 10 |
3 files changed, 130 insertions, 50 deletions
diff --git a/pkg/tcpip/link/fdbased/endpoint.go b/pkg/tcpip/link/fdbased/endpoint.go index 1f889c2a0..b88e2e7bf 100644 --- a/pkg/tcpip/link/fdbased/endpoint.go +++ b/pkg/tcpip/link/fdbased/endpoint.go @@ -21,12 +21,29 @@ // FD based endpoints can be used in the networking stack by calling New() to // create a new endpoint, and then passing it as an argument to // Stack.CreateNIC(). +// +// FD based endpoints can use more than one file descriptor to read incoming +// packets. If there are more than one FDs specified and the underlying FD is an +// AF_PACKET then the endpoint will enable FANOUT mode on the socket so that the +// host kernel will consistently hash the packets to the sockets. This ensures +// that packets for the same TCP streams are not reordered. +// +// Similarly if more than one FD's are specified where the underlying FD is not +// AF_PACKET then it's the caller's responsibility to ensure that all inbound +// packets on the descriptors are consistently 5 tuple hashed to one of the +// descriptors to prevent TCP reordering. +// +// Since netstack today does not compute 5 tuple hashes for outgoing packets we +// only use the first FD to write outbound packets. Once 5 tuple hashes for +// all outbound packets are available we will make use of all underlying FD's to +// write outbound packets. package fdbased import ( "fmt" "syscall" + "golang.org/x/sys/unix" "gvisor.googlesource.com/gvisor/pkg/tcpip" "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" "gvisor.googlesource.com/gvisor/pkg/tcpip/header" @@ -65,8 +82,10 @@ const ( ) type endpoint struct { - // fd is the file descriptor used to send and receive packets. - fd int + // fds is the set of file descriptors each identifying one inbound/outbound + // channel. The endpoint will dispatch from all inbound channels as well as + // hash outbound packets to specific channels based on the packet hash. + fds []int // mtu (maximum transmission unit) is the maximum size of a packet. mtu uint32 @@ -85,8 +104,8 @@ type endpoint struct { // its end of the communication pipe. closed func(*tcpip.Error) - inboundDispatcher linkDispatcher - dispatcher stack.NetworkDispatcher + inboundDispatchers []linkDispatcher + dispatcher stack.NetworkDispatcher // packetDispatchMode controls the packet dispatcher used by this // endpoint. @@ -99,17 +118,47 @@ type endpoint struct { // Options specify the details about the fd-based endpoint to be created. type Options struct { - FD int - MTU uint32 - EthernetHeader bool - ClosedFunc func(*tcpip.Error) - Address tcpip.LinkAddress - SaveRestore bool - DisconnectOk bool - GSOMaxSize uint32 + // FDs is a set of FDs used to read/write packets. + FDs []int + + // MTU is the mtu to use for this endpoint. + MTU uint32 + + // EthernetHeader if true, indicates that the endpoint should read/write + // ethernet frames instead of IP packets. + EthernetHeader bool + + // ClosedFunc is a function to be called when an endpoint's peer (if + // any) closes its end of the communication pipe. + ClosedFunc func(*tcpip.Error) + + // Address is the link address for this endpoint. Only used if + // EthernetHeader is true. + Address tcpip.LinkAddress + + // SaveRestore if true, indicates that this NIC capability set should + // include CapabilitySaveRestore + SaveRestore bool + + // DisconnectOk if true, indicates that this NIC capability set should + // include CapabilityDisconnectOk. + DisconnectOk bool + + // GSOMaxSize is the maximum GSO packet size. It is zero if GSO is + // disabled. + GSOMaxSize uint32 + + // PacketDispatchMode specifies the type of inbound dispatcher to be + // used for this endpoint. PacketDispatchMode PacketDispatchMode - TXChecksumOffload bool - RXChecksumOffload bool + + // TXChecksumOffload if true, indicates that this endpoints capability + // set should include CapabilityTXChecksumOffload. + TXChecksumOffload bool + + // RXChecksumOffload if true, indicates that this endpoints capability + // set should include CapabilityRXChecksumOffload. + RXChecksumOffload bool } // New creates a new fd-based endpoint. @@ -117,10 +166,6 @@ type Options struct { // Makes fd non-blocking, but does not take ownership of fd, which must remain // open for the lifetime of the returned endpoint. func New(opts *Options) (tcpip.LinkEndpointID, error) { - if err := syscall.SetNonblock(opts.FD, true); err != nil { - return 0, fmt.Errorf("syscall.SetNonblock(%v) failed: %v", opts.FD, err) - } - caps := stack.LinkEndpointCapabilities(0) if opts.RXChecksumOffload { caps |= stack.CapabilityRXChecksumOffload @@ -144,8 +189,12 @@ func New(opts *Options) (tcpip.LinkEndpointID, error) { caps |= stack.CapabilityDisconnectOk } + if len(opts.FDs) == 0 { + return 0, fmt.Errorf("opts.FD is empty, at least one FD must be specified") + } + e := &endpoint{ - fd: opts.FD, + fds: opts.FDs, mtu: opts.MTU, caps: caps, closed: opts.ClosedFunc, @@ -154,46 +203,71 @@ func New(opts *Options) (tcpip.LinkEndpointID, error) { packetDispatchMode: opts.PacketDispatchMode, } - isSocket, err := isSocketFD(e.fd) - if err != nil { - return 0, err - } - if isSocket { - if opts.GSOMaxSize != 0 { - e.caps |= stack.CapabilityGSO - e.gsoMaxSize = opts.GSOMaxSize + // Create per channel dispatchers. + for i := 0; i < len(e.fds); i++ { + fd := e.fds[i] + if err := syscall.SetNonblock(fd, true); err != nil { + return 0, fmt.Errorf("syscall.SetNonblock(%v) failed: %v", fd, err) } - } - e.inboundDispatcher, err = createInboundDispatcher(e, isSocket) - if err != nil { - return 0, fmt.Errorf("createInboundDispatcher(...) = %v", err) + + isSocket, err := isSocketFD(fd) + if err != nil { + return 0, err + } + if isSocket { + if opts.GSOMaxSize != 0 { + e.caps |= stack.CapabilityGSO + e.gsoMaxSize = opts.GSOMaxSize + } + } + inboundDispatcher, err := createInboundDispatcher(e, fd, isSocket) + if err != nil { + return 0, fmt.Errorf("createInboundDispatcher(...) = %v", err) + } + e.inboundDispatchers = append(e.inboundDispatchers, inboundDispatcher) } return stack.RegisterLinkEndpoint(e), nil } -func createInboundDispatcher(e *endpoint, isSocket bool) (linkDispatcher, error) { +func createInboundDispatcher(e *endpoint, fd int, isSocket bool) (linkDispatcher, error) { // By default use the readv() dispatcher as it works with all kinds of // FDs (tap/tun/unix domain sockets and af_packet). - inboundDispatcher, err := newReadVDispatcher(e.fd, e) + inboundDispatcher, err := newReadVDispatcher(fd, e) if err != nil { - return nil, fmt.Errorf("newReadVDispatcher(%d, %+v) = %v", e.fd, e, err) + return nil, fmt.Errorf("newReadVDispatcher(%d, %+v) = %v", fd, e, err) } if isSocket { + sa, err := unix.Getsockname(fd) + if err != nil { + return nil, fmt.Errorf("unix.Getsockname(%d) = %v", fd, err) + } + switch sa.(type) { + case *unix.SockaddrLinklayer: + // enable PACKET_FANOUT mode is the underlying socket is + // of type AF_PACKET. + const fanoutID = 1 + const fanoutType = 0x8000 // PACKET_FANOUT_HASH | PACKET_FANOUT_FLAG_DEFRAG + fanoutArg := fanoutID | fanoutType<<16 + if err := syscall.SetsockoptInt(fd, syscall.SOL_PACKET, unix.PACKET_FANOUT, fanoutArg); err != nil { + return nil, fmt.Errorf("failed to enable PACKET_FANOUT option: %v", err) + } + } + switch e.packetDispatchMode { case PacketMMap: - inboundDispatcher, err = newPacketMMapDispatcher(e.fd, e) + inboundDispatcher, err = newPacketMMapDispatcher(fd, e) if err != nil { - return nil, fmt.Errorf("newPacketMMapDispatcher(%d, %+v) = %v", e.fd, e, err) + return nil, fmt.Errorf("newPacketMMapDispatcher(%d, %+v) = %v", fd, e, err) } case RecvMMsg: // If the provided FD is a socket then we optimize // packet reads by using recvmmsg() instead of read() to // read packets in a batch. - inboundDispatcher, err = newRecvMMsgDispatcher(e.fd, e) + inboundDispatcher, err = newRecvMMsgDispatcher(fd, e) if err != nil { - return nil, fmt.Errorf("newRecvMMsgDispatcher(%d, %+v) = %v", e.fd, e, err) + return nil, fmt.Errorf("newRecvMMsgDispatcher(%d, %+v) = %v", fd, e, err) } } } @@ -215,7 +289,9 @@ func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) { // Link endpoints are not savable. When transportation endpoints are // saved, they stop sending outgoing packets and all incoming packets // are rejected. - go e.dispatchLoop() // S/R-SAFE: See above. + for i := range e.inboundDispatchers { + go e.dispatchLoop(e.inboundDispatchers[i]) // S/R-SAFE: See above. + } } // IsAttached implements stack.LinkEndpoint.IsAttached. @@ -305,26 +381,26 @@ func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, hdr buffer.Prepen } } - return rawfile.NonBlockingWrite3(e.fd, vnetHdrBuf, hdr.View(), payload.ToView()) + return rawfile.NonBlockingWrite3(e.fds[0], vnetHdrBuf, hdr.View(), payload.ToView()) } if payload.Size() == 0 { - return rawfile.NonBlockingWrite(e.fd, hdr.View()) + return rawfile.NonBlockingWrite(e.fds[0], hdr.View()) } - return rawfile.NonBlockingWrite3(e.fd, hdr.View(), payload.ToView(), nil) + return rawfile.NonBlockingWrite3(e.fds[0], hdr.View(), payload.ToView(), nil) } // WriteRawPacket writes a raw packet directly to the file descriptor. func (e *endpoint) WriteRawPacket(dest tcpip.Address, packet []byte) *tcpip.Error { - return rawfile.NonBlockingWrite(e.fd, packet) + return rawfile.NonBlockingWrite(e.fds[0], packet) } // dispatchLoop reads packets from the file descriptor in a loop and dispatches // them to the network stack. -func (e *endpoint) dispatchLoop() *tcpip.Error { +func (e *endpoint) dispatchLoop(inboundDispatcher linkDispatcher) *tcpip.Error { for { - cont, err := e.inboundDispatcher.dispatch() + cont, err := inboundDispatcher.dispatch() if err != nil || !cont { if e.closed != nil { e.closed(err) @@ -363,7 +439,7 @@ func NewInjectable(fd int, mtu uint32, capabilities stack.LinkEndpointCapabiliti syscall.SetNonblock(fd, true) e := &InjectableEndpoint{endpoint: endpoint{ - fd: fd, + fds: []int{fd}, mtu: mtu, caps: capabilities, }} diff --git a/pkg/tcpip/link/fdbased/endpoint_test.go b/pkg/tcpip/link/fdbased/endpoint_test.go index fd1722074..ba3e09192 100644 --- a/pkg/tcpip/link/fdbased/endpoint_test.go +++ b/pkg/tcpip/link/fdbased/endpoint_test.go @@ -67,7 +67,7 @@ func newContext(t *testing.T, opt *Options) *context { done <- struct{}{} } - opt.FD = fds[1] + opt.FDs = []int{fds[1]} epID, err := New(opt) if err != nil { t.Fatalf("Failed to create FD endpoint: %v", err) diff --git a/pkg/tcpip/link/sniffer/sniffer.go b/pkg/tcpip/link/sniffer/sniffer.go index fccabd554..98581e50e 100644 --- a/pkg/tcpip/link/sniffer/sniffer.go +++ b/pkg/tcpip/link/sniffer/sniffer.go @@ -118,7 +118,7 @@ func NewWithFile(lower tcpip.LinkEndpointID, file *os.File, snapLen uint32) (tcp // logs the packet before forwarding to the actual dispatcher. func (e *endpoint) DeliverNetworkPacket(linkEP stack.LinkEndpoint, remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, vv buffer.VectorisedView) { if atomic.LoadUint32(&LogPackets) == 1 && e.file == nil { - logPacket("recv", protocol, vv.First()) + logPacket("recv", protocol, vv.First(), nil) } if e.file != nil && atomic.LoadUint32(&LogPacketsToFile) == 1 { vs := vv.Views() @@ -198,7 +198,7 @@ func (e *endpoint) GSOMaxSize() uint32 { // the request to the lower endpoint. func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, hdr buffer.Prependable, payload buffer.VectorisedView, protocol tcpip.NetworkProtocolNumber) *tcpip.Error { if atomic.LoadUint32(&LogPackets) == 1 && e.file == nil { - logPacket("send", protocol, hdr.View()) + logPacket("send", protocol, hdr.View(), gso) } if e.file != nil && atomic.LoadUint32(&LogPacketsToFile) == 1 { hdrBuf := hdr.View() @@ -240,7 +240,7 @@ func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, hdr buffer.Prepen return e.lower.WritePacket(r, gso, hdr, payload, protocol) } -func logPacket(prefix string, protocol tcpip.NetworkProtocolNumber, b buffer.View) { +func logPacket(prefix string, protocol tcpip.NetworkProtocolNumber, b buffer.View, gso *stack.GSO) { // Figure out the network layer info. var transProto uint8 src := tcpip.Address("unknown") @@ -404,5 +404,9 @@ func logPacket(prefix string, protocol tcpip.NetworkProtocolNumber, b buffer.Vie return } + if gso != nil { + details += fmt.Sprintf(" gso: %+v", gso) + } + log.Infof("%s %s %v:%v -> %v:%v len:%d id:%04x %s", prefix, transName, src, srcPort, dst, dstPort, size, id, details) } |