diff options
Diffstat (limited to 'pkg/tcpip')
-rw-r--r-- | pkg/tcpip/adapters/gonet/gonet.go | 10 | ||||
-rw-r--r-- | pkg/tcpip/link/fdbased/endpoint.go | 39 | ||||
-rw-r--r-- | pkg/tcpip/link/tun/device.go | 8 | ||||
-rw-r--r-- | pkg/tcpip/transport/icmp/endpoint.go | 12 | ||||
-rw-r--r-- | pkg/tcpip/transport/packet/endpoint.go | 10 | ||||
-rw-r--r-- | pkg/tcpip/transport/raw/endpoint.go | 10 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/accept.go | 6 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/connect.go | 8 | ||||
-rw-r--r-- | pkg/tcpip/transport/tcp/endpoint.go | 22 | ||||
-rw-r--r-- | pkg/tcpip/transport/udp/endpoint.go | 12 |
10 files changed, 80 insertions, 57 deletions
diff --git a/pkg/tcpip/adapters/gonet/gonet.go b/pkg/tcpip/adapters/gonet/gonet.go index c188aaa18..010e2e833 100644 --- a/pkg/tcpip/adapters/gonet/gonet.go +++ b/pkg/tcpip/adapters/gonet/gonet.go @@ -251,7 +251,7 @@ func (l *TCPListener) Accept() (net.Conn, error) { if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - l.wq.EventRegister(&waitEntry, waiter.EventIn) + l.wq.EventRegister(&waitEntry, waiter.ReadableEvents) defer l.wq.EventUnregister(&waitEntry) for { @@ -301,7 +301,7 @@ func commonRead(b []byte, ep tcpip.Endpoint, wq *waiter.Queue, deadline <-chan s if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventIn) + wq.EventRegister(&waitEntry, waiter.ReadableEvents) defer wq.EventUnregister(&waitEntry) for { res, err = ep.Read(&w, opts) @@ -382,7 +382,7 @@ func (c *TCPConn) Write(b []byte) (int, error) { if ch == nil { entry, ch = waiter.NewChannelEntry(nil) - c.wq.EventRegister(&entry, waiter.EventOut) + c.wq.EventRegister(&entry, waiter.WritableEvents) defer c.wq.EventUnregister(&entry) } else { // Don't wait immediately after registration in case more data @@ -485,7 +485,7 @@ func DialContextTCP(ctx context.Context, s *stack.Stack, addr tcpip.FullAddress, // // We do this unconditionally as Connect will always return an error. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventOut) + wq.EventRegister(&waitEntry, waiter.WritableEvents) defer wq.EventUnregister(&waitEntry) select { @@ -652,7 +652,7 @@ func (c *UDPConn) WriteTo(b []byte, addr net.Addr) (int, error) { if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - c.wq.EventRegister(&waitEntry, waiter.EventOut) + c.wq.EventRegister(&waitEntry, waiter.WritableEvents) defer c.wq.EventUnregister(&waitEntry) for { select { diff --git a/pkg/tcpip/link/fdbased/endpoint.go b/pkg/tcpip/link/fdbased/endpoint.go index 2bb1be5d6..6be945116 100644 --- a/pkg/tcpip/link/fdbased/endpoint.go +++ b/pkg/tcpip/link/fdbased/endpoint.go @@ -41,6 +41,8 @@ package fdbased import ( "fmt" + "math" + "sync/atomic" "golang.org/x/sys/unix" "gvisor.dev/gvisor/pkg/binary" @@ -188,7 +190,9 @@ type Options struct { // set of FD's that point to the same NIC. Trying to set the PACKET_FANOUT // option for an FD with a fanoutID already in use by another FD for a different // NIC will return an EINVAL. -var fanoutID = 1 +// +// Must be accessed using atomic operations. +var fanoutID int32 = 0 // New creates a new fd-based endpoint. // @@ -233,6 +237,10 @@ func New(opts *Options) (stack.LinkEndpoint, error) { packetDispatchMode: opts.PacketDispatchMode, } + // Increment fanoutID to ensure that we don't re-use the same fanoutID for + // the next endpoint. + fid := atomic.AddInt32(&fanoutID, 1) + // Create per channel dispatchers. for i := 0; i < len(e.fds); i++ { fd := e.fds[i] @@ -254,21 +262,17 @@ func New(opts *Options) (stack.LinkEndpoint, error) { e.gsoMaxSize = opts.GSOMaxSize } } - inboundDispatcher, err := createInboundDispatcher(e, fd, isSocket) + inboundDispatcher, err := createInboundDispatcher(e, fd, isSocket, fid) if err != nil { return nil, fmt.Errorf("createInboundDispatcher(...) = %v", err) } e.inboundDispatchers = append(e.inboundDispatchers, inboundDispatcher) } - // Increment fanoutID to ensure that we don't re-use the same fanoutID for - // the next endpoint. - fanoutID++ - return e, nil } -func createInboundDispatcher(e *endpoint, fd int, isSocket bool) (linkDispatcher, error) { +func createInboundDispatcher(e *endpoint, fd int, isSocket bool, fID int32) (linkDispatcher, error) { // By default use the readv() dispatcher as it works with all kinds of // FDs (tap/tun/unix domain sockets and af_packet). inboundDispatcher, err := newReadVDispatcher(fd, e) @@ -283,13 +287,32 @@ func createInboundDispatcher(e *endpoint, fd int, isSocket bool) (linkDispatcher } switch sa.(type) { case *unix.SockaddrLinklayer: + // See: PACKET_FANOUT_MAX in net/packet/internal.h + const packetFanoutMax = 1 << 16 + if fID > packetFanoutMax { + return nil, fmt.Errorf("host fanoutID limit exceeded, fanoutID must be <= %d", math.MaxUint16) + } // Enable PACKET_FANOUT mode if the underlying socket is of type // AF_PACKET. We do not enable PACKET_FANOUT_FLAG_DEFRAG as that will // prevent gvisor from receiving fragmented packets and the host does the // reassembly on our behalf before delivering the fragments. This makes it // hard to test fragmentation reassembly code in Netstack. + // + // See: include/uapi/linux/if_packet.h (struct fanout_args). + // + // NOTE: We are using SetSockOptInt here even though the underlying + // option is actually a struct. The code follows the example in the + // kernel documentation as described at the link below: + // + // See: https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt + // + // This works out because the actual implementation for the option zero + // initializes the structure and will initialize the max_members field + // to a proper value if zero. + // + // See: https://github.com/torvalds/linux/blob/7acac4b3196caee5e21fb5ea53f8bc124e6a16fc/net/packet/af_packet.c#L3881 const fanoutType = unix.PACKET_FANOUT_HASH - fanoutArg := fanoutID | fanoutType<<16 + fanoutArg := int(fID) | fanoutType<<16 if err := unix.SetsockoptInt(fd, unix.SOL_PACKET, unix.PACKET_FANOUT, fanoutArg); err != nil { return nil, fmt.Errorf("failed to enable PACKET_FANOUT option: %v", err) } diff --git a/pkg/tcpip/link/tun/device.go b/pkg/tcpip/link/tun/device.go index 80fb343c5..36af2a029 100644 --- a/pkg/tcpip/link/tun/device.go +++ b/pkg/tcpip/link/tun/device.go @@ -309,20 +309,20 @@ func (d *Device) Flags() Flags { // Readiness implements watier.Waitable.Readiness. func (d *Device) Readiness(mask waiter.EventMask) waiter.EventMask { - if mask&waiter.EventIn != 0 { + if mask&waiter.ReadableEvents != 0 { d.mu.RLock() endpoint := d.endpoint d.mu.RUnlock() if endpoint != nil && endpoint.NumQueued() == 0 { - mask &= ^waiter.EventIn + mask &= ^waiter.ReadableEvents } } - return mask & (waiter.EventIn | waiter.EventOut) + return mask & (waiter.ReadableEvents | waiter.WritableEvents) } // WriteNotify implements channel.Notification.WriteNotify. func (d *Device) WriteNotify() { - d.Notify(waiter.EventIn) + d.Notify(waiter.ReadableEvents) } // tunEndpoint is the link endpoint for the NIC created by the tun device. diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go index 1dce35c63..50991c3c0 100644 --- a/pkg/tcpip/transport/icmp/endpoint.go +++ b/pkg/tcpip/transport/icmp/endpoint.go @@ -149,7 +149,7 @@ func (e *endpoint) Close() { e.mu.Unlock() - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -588,7 +588,7 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) tcpip.Error { e.rcvMu.Unlock() if !wasClosed { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } @@ -725,13 +725,13 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // waiter.EventIn is set, the endpoint is immediately readable. func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine if the endpoint is readable if requested. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.rcvMu.Lock() if !e.rcvList.Empty() || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvMu.Unlock() } @@ -804,7 +804,7 @@ func (e *endpoint) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketB e.stats.PacketsReceived.Increment() // Notify any waiters that there's data to be read now. if wasEmpty { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/packet/endpoint.go b/pkg/tcpip/transport/packet/endpoint.go index 367757d3b..52ed9560c 100644 --- a/pkg/tcpip/transport/packet/endpoint.go +++ b/pkg/tcpip/transport/packet/endpoint.go @@ -152,7 +152,7 @@ func (ep *endpoint) Close() { ep.closed = true ep.bound = false - ep.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + ep.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -287,13 +287,13 @@ func (*endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // Readiness implements tcpip.Endpoint.Readiness. func (ep *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine whether the endpoint is readable. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { ep.rcvMu.Lock() if !ep.rcvList.Empty() || ep.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } ep.rcvMu.Unlock() } @@ -483,7 +483,7 @@ func (ep *endpoint) HandlePacket(nicID tcpip.NICID, localAddr tcpip.LinkAddress, ep.stats.PacketsReceived.Increment() // Notify waiters that there's data to be read. if wasEmpty { - ep.waiterQueue.Notify(waiter.EventIn) + ep.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go index 4b2f08379..e27a249cd 100644 --- a/pkg/tcpip/transport/raw/endpoint.go +++ b/pkg/tcpip/transport/raw/endpoint.go @@ -177,7 +177,7 @@ func (e *endpoint) Close() { e.closed = true - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -486,13 +486,13 @@ func (*endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // Readiness implements tcpip.Endpoint.Readiness. func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine whether the endpoint is readable. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.rcvMu.Lock() if !e.rcvList.Empty() || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvMu.Unlock() } @@ -655,7 +655,7 @@ func (e *endpoint) HandlePacket(pkt *stack.PacketBuffer) { e.stats.PacketsReceived.Increment() // Notify waiters that there's data to be read. if wasEmpty { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index 0a2f3291c..025b134e2 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -410,7 +410,7 @@ func (e *endpoint) deliverAccepted(n *endpoint, withSynCookie bool) { atomic.AddInt32(&e.synRcvdCount, -1) } e.acceptMu.Unlock() - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) return default: e.acceptCond.Wait() @@ -462,7 +462,7 @@ func (e *endpoint) reserveTupleLocked() bool { // can't really have any registered waiters except when stack.Wait() is called // which waits for all registered endpoints to stop and expects an EventHUp. func (e *endpoint) notifyAborted() { - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // handleSynSegment is called in its own goroutine once the listening endpoint @@ -771,7 +771,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) { e.drainClosingSegmentQueue() // Notify waiters that the endpoint is shutdown. - e.waiterQueue.Notify(waiter.EventIn | waiter.EventOut | waiter.EventHUp | waiter.EventErr) + e.waiterQueue.Notify(waiter.ReadableEvents | waiter.WritableEvents | waiter.EventHUp | waiter.EventErr) }() var s sleep.Sleeper diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index b32fe2fb1..a9e978cf6 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -1320,7 +1320,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ e.drainClosingSegmentQueue() // When the protocol loop exits we should wake up our waiters. - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } if handshake { @@ -1495,7 +1495,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ } // Tell waiters that the endpoint is connected and writable. - e.waiterQueue.Notify(waiter.EventOut) + e.waiterQueue.Notify(waiter.WritableEvents) // The following assertions and notifications are needed for restored // endpoints. Fresh newly created endpoints have empty states and should @@ -1506,7 +1506,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ e.rcvListMu.Lock() if !e.rcvList.Empty() { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } e.rcvListMu.Unlock() @@ -1570,7 +1570,7 @@ loop: // wakers. s.Done() // Wake up any waiters before we enter TIME_WAIT. - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) e.workerCleanup = true reuseTW = e.doTimeWait() } diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 0a5e9cbb4..c5daba232 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -960,30 +960,30 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { case StateListen: // Check if there's anything in the accepted channel. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.acceptMu.Lock() if len(e.acceptedChan) > 0 { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.acceptMu.Unlock() } } if e.EndpointState().connected() { // Determine if the endpoint is writable if requested. - if (mask & waiter.EventOut) != 0 { + if (mask & waiter.WritableEvents) != 0 { e.sndBufMu.Lock() sndBufSize := e.getSendBufferSize() if e.sndClosed || e.sndBufUsed < sndBufSize { - result |= waiter.EventOut + result |= waiter.WritableEvents } e.sndBufMu.Unlock() } // Determine if the endpoint is readable if requested. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.rcvListMu.Lock() if e.rcvBufUsed > 0 || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvListMu.Unlock() } @@ -1121,7 +1121,7 @@ func (e *endpoint) closeNoShutdownLocked() { return } - eventMask := waiter.EventIn | waiter.EventOut + eventMask := waiter.ReadableEvents | waiter.WritableEvents // Either perform the local cleanup or kick the worker to make sure it // knows it needs to cleanup. if e.workerRunning { @@ -2133,7 +2133,7 @@ func (e *endpoint) Connect(addr tcpip.FullAddress) tcpip.Error { if err != nil { if !err.IgnoreStats() { // Connect failed. Let's wake up any waiters. - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) e.stack.Stats().TCP.FailedConnectionAttempts.Increment() e.stats.FailedConnectionAttempts.Increment() } @@ -2463,7 +2463,7 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error { e.rcvListMu.Unlock() e.closePendingAcceptableConnectionsLocked() // Notify waiters that the endpoint is shutdown. - e.waiterQueue.Notify(waiter.EventIn | waiter.EventOut | waiter.EventHUp | waiter.EventErr) + e.waiterQueue.Notify(waiter.ReadableEvents | waiter.WritableEvents | waiter.EventHUp | waiter.EventErr) } return nil default: @@ -2811,7 +2811,7 @@ func (e *endpoint) updateSndBufferUsage(v int) { e.sndBufMu.Unlock() if notify { - e.waiterQueue.Notify(waiter.EventOut) + e.waiterQueue.Notify(waiter.WritableEvents) } } @@ -2828,7 +2828,7 @@ func (e *endpoint) readyToRead(s *segment) { e.rcvClosed = true } e.rcvListMu.Unlock() - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } // receiveBufferAvailableLocked calculates how many bytes are still available diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go index 0f59181bb..956da0e0c 100644 --- a/pkg/tcpip/transport/udp/endpoint.go +++ b/pkg/tcpip/transport/udp/endpoint.go @@ -284,7 +284,7 @@ func (e *endpoint) Close() { e.mu.Unlock() - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -1070,7 +1070,7 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) tcpip.Error { e.rcvMu.Unlock() if !wasClosed { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } @@ -1234,13 +1234,13 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // waiter.EventIn is set, the endpoint is immediately readable. func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine if the endpoint is readable if requested. - if (mask & waiter.EventIn) != 0 { + if mask&waiter.ReadableEvents != 0 { e.rcvMu.Lock() if !e.rcvList.Empty() || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvMu.Unlock() } @@ -1349,7 +1349,7 @@ func (e *endpoint) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketB // Notify any waiters that there's data to be read now. if wasEmpty { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } |