diff options
author | Bhasker Hariharan <bhaskerh@google.com> | 2021-03-24 12:08:24 -0700 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2021-03-24 12:11:44 -0700 |
commit | e7ca2a51a89a8ff2c9f5adfdfa5b51be1b3faeb3 (patch) | |
tree | 1abf748d2755526978f560abb67f29b6f83496c7 /pkg/tcpip | |
parent | 72ff6a1cac6ab35132b4f79b1149590e103e5291 (diff) |
Add POLLRDNORM/POLLWRNORM support.
On Linux these are meant to be equivalent to POLLIN/POLLOUT. Rather
than hack these on in sys_poll etc it felt cleaner to just cleanup
the call sites to notify for both events. This is what linux does
as well.
Fixes #5544
PiperOrigin-RevId: 364859977
Diffstat (limited to 'pkg/tcpip')
26 files changed, 122 insertions, 122 deletions
diff --git a/pkg/tcpip/adapters/gonet/gonet.go b/pkg/tcpip/adapters/gonet/gonet.go index c188aaa18..010e2e833 100644 --- a/pkg/tcpip/adapters/gonet/gonet.go +++ b/pkg/tcpip/adapters/gonet/gonet.go @@ -251,7 +251,7 @@ func (l *TCPListener) Accept() (net.Conn, error) { if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - l.wq.EventRegister(&waitEntry, waiter.EventIn) + l.wq.EventRegister(&waitEntry, waiter.ReadableEvents) defer l.wq.EventUnregister(&waitEntry) for { @@ -301,7 +301,7 @@ func commonRead(b []byte, ep tcpip.Endpoint, wq *waiter.Queue, deadline <-chan s if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventIn) + wq.EventRegister(&waitEntry, waiter.ReadableEvents) defer wq.EventUnregister(&waitEntry) for { res, err = ep.Read(&w, opts) @@ -382,7 +382,7 @@ func (c *TCPConn) Write(b []byte) (int, error) { if ch == nil { entry, ch = waiter.NewChannelEntry(nil) - c.wq.EventRegister(&entry, waiter.EventOut) + c.wq.EventRegister(&entry, waiter.WritableEvents) defer c.wq.EventUnregister(&entry) } else { // Don't wait immediately after registration in case more data @@ -485,7 +485,7 @@ func DialContextTCP(ctx context.Context, s *stack.Stack, addr tcpip.FullAddress, // // We do this unconditionally as Connect will always return an error. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventOut) + wq.EventRegister(&waitEntry, waiter.WritableEvents) defer wq.EventUnregister(&waitEntry) select { @@ -652,7 +652,7 @@ func (c *UDPConn) WriteTo(b []byte, addr net.Addr) (int, error) { if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - c.wq.EventRegister(&waitEntry, waiter.EventOut) + c.wq.EventRegister(&waitEntry, waiter.WritableEvents) defer c.wq.EventUnregister(&waitEntry) for { select { diff --git a/pkg/tcpip/adapters/gonet/gonet_test.go b/pkg/tcpip/adapters/gonet/gonet_test.go index 2b3ea4bdf..48b24692b 100644 --- a/pkg/tcpip/adapters/gonet/gonet_test.go +++ b/pkg/tcpip/adapters/gonet/gonet_test.go @@ -102,7 +102,7 @@ func connect(s *stack.Stack, addr tcpip.FullAddress) (*testConnection, tcpip.Err } entry, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&entry, waiter.EventOut) + wq.EventRegister(&entry, waiter.WritableEvents) err = ep.Connect(addr) if _, ok := err.(*tcpip.ErrConnectStarted); ok { @@ -114,7 +114,7 @@ func connect(s *stack.Stack, addr tcpip.FullAddress) (*testConnection, tcpip.Err } wq.EventUnregister(&entry) - wq.EventRegister(&entry, waiter.EventIn) + wq.EventRegister(&entry, waiter.ReadableEvents) return &testConnection{wq, &entry, ch, ep}, nil } diff --git a/pkg/tcpip/link/tun/device.go b/pkg/tcpip/link/tun/device.go index 80fb343c5..36af2a029 100644 --- a/pkg/tcpip/link/tun/device.go +++ b/pkg/tcpip/link/tun/device.go @@ -309,20 +309,20 @@ func (d *Device) Flags() Flags { // Readiness implements watier.Waitable.Readiness. func (d *Device) Readiness(mask waiter.EventMask) waiter.EventMask { - if mask&waiter.EventIn != 0 { + if mask&waiter.ReadableEvents != 0 { d.mu.RLock() endpoint := d.endpoint d.mu.RUnlock() if endpoint != nil && endpoint.NumQueued() == 0 { - mask &= ^waiter.EventIn + mask &= ^waiter.ReadableEvents } } - return mask & (waiter.EventIn | waiter.EventOut) + return mask & (waiter.ReadableEvents | waiter.WritableEvents) } // WriteNotify implements channel.Notification.WriteNotify. func (d *Device) WriteNotify() { - d.Notify(waiter.EventIn) + d.Notify(waiter.ReadableEvents) } // tunEndpoint is the link endpoint for the NIC created by the tun device. diff --git a/pkg/tcpip/network/ipv4/ipv4_test.go b/pkg/tcpip/network/ipv4/ipv4_test.go index cfed241bf..eba91c68c 100644 --- a/pkg/tcpip/network/ipv4/ipv4_test.go +++ b/pkg/tcpip/network/ipv4/ipv4_test.go @@ -2514,7 +2514,7 @@ func TestReceiveFragments(t *testing.T) { wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) ep, err := s.NewEndpoint(udp.ProtocolNumber, header.IPv4ProtocolNumber, &wq) diff --git a/pkg/tcpip/network/ipv6/ipv6_test.go b/pkg/tcpip/network/ipv6/ipv6_test.go index 81f5f23c3..c206cebeb 100644 --- a/pkg/tcpip/network/ipv6/ipv6_test.go +++ b/pkg/tcpip/network/ipv6/ipv6_test.go @@ -101,7 +101,7 @@ func testReceiveUDP(t *testing.T, s *stack.Stack, e *channel.Endpoint, src, dst wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) @@ -912,7 +912,7 @@ func TestReceiveIPv6ExtHdrs(t *testing.T) { wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.WritableEvents) defer wq.EventUnregister(&we) defer close(ch) ep, err := s.NewEndpoint(udp.ProtocolNumber, ProtocolNumber, &wq) @@ -1998,7 +1998,7 @@ func TestReceiveIPv6Fragments(t *testing.T) { wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) ep, err := s.NewEndpoint(udp.ProtocolNumber, ProtocolNumber, &wq) diff --git a/pkg/tcpip/sample/tun_tcp_connect/main.go b/pkg/tcpip/sample/tun_tcp_connect/main.go index 856ea998d..b9a24ff56 100644 --- a/pkg/tcpip/sample/tun_tcp_connect/main.go +++ b/pkg/tcpip/sample/tun_tcp_connect/main.go @@ -173,7 +173,7 @@ func main() { // Issue connect request and wait for it to complete. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventOut) + wq.EventRegister(&waitEntry, waiter.WritableEvents) terr := ep.Connect(remote) if _, ok := terr.(*tcpip.ErrConnectStarted); ok { fmt.Println("Connect is pending...") @@ -194,7 +194,7 @@ func main() { // Read data and write to standard output until the peer closes the // connection from its side. - wq.EventRegister(&waitEntry, waiter.EventIn) + wq.EventRegister(&waitEntry, waiter.ReadableEvents) for { _, err := ep.Read(os.Stdout, tcpip.ReadOptions{}) if err != nil { diff --git a/pkg/tcpip/sample/tun_tcp_echo/main.go b/pkg/tcpip/sample/tun_tcp_echo/main.go index 9b23df3a9..ef1bfc186 100644 --- a/pkg/tcpip/sample/tun_tcp_echo/main.go +++ b/pkg/tcpip/sample/tun_tcp_echo/main.go @@ -79,7 +79,7 @@ func echo(wq *waiter.Queue, ep tcpip.Endpoint) { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventIn) + wq.EventRegister(&waitEntry, waiter.ReadableEvents) defer wq.EventUnregister(&waitEntry) w := endpointWriter{ @@ -211,7 +211,7 @@ func main() { // Wait for connections to appear. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventIn) + wq.EventRegister(&waitEntry, waiter.ReadableEvents) defer wq.EventUnregister(&waitEntry) for { diff --git a/pkg/tcpip/stack/ndp_test.go b/pkg/tcpip/stack/ndp_test.go index 0725e028b..14124ae66 100644 --- a/pkg/tcpip/stack/ndp_test.go +++ b/pkg/tcpip/stack/ndp_test.go @@ -2909,7 +2909,7 @@ func addrForNewConnectionTo(t *testing.T, s *stack.Stack, addr tcpip.FullAddress wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) ep, err := s.NewEndpoint(header.UDPProtocolNumber, header.IPv6ProtocolNumber, &wq) @@ -2943,7 +2943,7 @@ func addrForNewConnectionWithAddr(t *testing.T, s *stack.Stack, addr tcpip.FullA wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) ep, err := s.NewEndpoint(header.UDPProtocolNumber, header.IPv6ProtocolNumber, &wq) @@ -3272,7 +3272,7 @@ func TestAutoGenAddrJobDeprecation(t *testing.T) { } wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) ep, err := s.NewEndpoint(header.UDPProtocolNumber, header.IPv6ProtocolNumber, &wq) diff --git a/pkg/tcpip/stack/transport_demuxer_test.go b/pkg/tcpip/stack/transport_demuxer_test.go index c1c6cbccd..4848495c9 100644 --- a/pkg/tcpip/stack/transport_demuxer_test.go +++ b/pkg/tcpip/stack/transport_demuxer_test.go @@ -290,7 +290,7 @@ func TestBindToDeviceDistribution(t *testing.T) { // Try to receive the data. wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) diff --git a/pkg/tcpip/tests/integration/forward_test.go b/pkg/tcpip/tests/integration/forward_test.go index 38c2f321b..d10ae05c2 100644 --- a/pkg/tcpip/tests/integration/forward_test.go +++ b/pkg/tcpip/tests/integration/forward_test.go @@ -48,7 +48,7 @@ func TestForwarding(t *testing.T) { t.Helper() var wq waiter.Queue we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) ep, err := s.NewEndpoint(transProto, netProto, &wq) if err != nil { t.Fatalf("s.NewEndpoint(%d, %d, _): %s", transProto, netProto, err) @@ -184,7 +184,7 @@ func TestForwarding(t *testing.T) { } we, newCH := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) return newEP, newCH } }, diff --git a/pkg/tcpip/tests/integration/link_resolution_test.go b/pkg/tcpip/tests/integration/link_resolution_test.go index 095623789..d39809e1c 100644 --- a/pkg/tcpip/tests/integration/link_resolution_test.go +++ b/pkg/tcpip/tests/integration/link_resolution_test.go @@ -151,7 +151,7 @@ func TestPing(t *testing.T) { var wq waiter.Queue we, waiterCH := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) ep, err := host1Stack.NewEndpoint(test.transProto, test.netProto, &wq) if err != nil { t.Fatalf("host1Stack.NewEndpoint(%d, %d, _): %s", test.transProto, test.netProto, err) @@ -308,7 +308,7 @@ func TestTCPLinkResolutionFailure(t *testing.T) { var clientWQ waiter.Queue we, ch := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&we, waiter.EventOut|waiter.EventErr) + clientWQ.EventRegister(&we, waiter.WritableEvents|waiter.EventErr) clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, test.netProto, &clientWQ) if err != nil { t.Fatalf("host1Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, test.netProto, err) @@ -641,7 +641,7 @@ func TestWritePacketsLinkResolution(t *testing.T) { var serverWQ waiter.Queue serverWE, serverCH := waiter.NewChannelEntry(nil) - serverWQ.EventRegister(&serverWE, waiter.EventIn) + serverWQ.EventRegister(&serverWE, waiter.ReadableEvents) serverEP, err := host2Stack.NewEndpoint(udp.ProtocolNumber, test.netProto, &serverWQ) if err != nil { t.Fatalf("host2Stack.NewEndpoint(%d, %d, _): %s", udp.ProtocolNumber, test.netProto, err) @@ -821,7 +821,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -845,7 +845,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -869,7 +869,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -893,7 +893,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -917,7 +917,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := routerStack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -942,7 +942,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := routerStack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -967,7 +967,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := host2Stack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -992,7 +992,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := host2Stack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() diff --git a/pkg/tcpip/tests/integration/loopback_test.go b/pkg/tcpip/tests/integration/loopback_test.go index 6462e9d42..2c538a43e 100644 --- a/pkg/tcpip/tests/integration/loopback_test.go +++ b/pkg/tcpip/tests/integration/loopback_test.go @@ -449,7 +449,7 @@ func TestLoopbackAcceptAllInSubnetTCP(t *testing.T) { var wq waiter.Queue we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) listeningEndpoint, err := s.NewEndpoint(tcp.ProtocolNumber, test.addAddress.Protocol, &wq) if err != nil { diff --git a/pkg/tcpip/tests/integration/multicast_broadcast_test.go b/pkg/tcpip/tests/integration/multicast_broadcast_test.go index 77f4a88ec..c6a9c2393 100644 --- a/pkg/tcpip/tests/integration/multicast_broadcast_test.go +++ b/pkg/tcpip/tests/integration/multicast_broadcast_test.go @@ -498,7 +498,7 @@ func TestReuseAddrAndBroadcast(t *testing.T) { for i := 0; i < 2; i++ { var wq waiter.Queue we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) ep, err := s.NewEndpoint(udp.ProtocolNumber, ipv4.ProtocolNumber, &wq) if err != nil { t.Fatalf("(eps[%d]) NewEndpoint(%d, %d, _): %s", len(eps), udp.ProtocolNumber, ipv4.ProtocolNumber, err) diff --git a/pkg/tcpip/tests/integration/route_test.go b/pkg/tcpip/tests/integration/route_test.go index ed499179f..78244f4eb 100644 --- a/pkg/tcpip/tests/integration/route_test.go +++ b/pkg/tcpip/tests/integration/route_test.go @@ -189,7 +189,7 @@ func TestLocalPing(t *testing.T) { var wq waiter.Queue we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) ep, err := s.NewEndpoint(test.transProto, test.netProto, &wq) if err != nil { t.Fatalf("s.NewEndpoint(%d, %d, _): %s", test.transProto, test.netProto, err) @@ -311,7 +311,7 @@ func TestLocalUDP(t *testing.T) { var serverWQ waiter.Queue serverWE, serverCH := waiter.NewChannelEntry(nil) - serverWQ.EventRegister(&serverWE, waiter.EventIn) + serverWQ.EventRegister(&serverWE, waiter.ReadableEvents) server, err := s.NewEndpoint(udp.ProtocolNumber, test.firstPrimaryAddr.Protocol, &serverWQ) if err != nil { t.Fatalf("s.NewEndpoint(%d, %d): %s", udp.ProtocolNumber, test.firstPrimaryAddr.Protocol, err) @@ -325,7 +325,7 @@ func TestLocalUDP(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventIn) + clientWQ.EventRegister(&clientWE, waiter.ReadableEvents) client, err := s.NewEndpoint(udp.ProtocolNumber, test.firstPrimaryAddr.Protocol, &clientWQ) if err != nil { t.Fatalf("s.NewEndpoint(%d, %d): %s", udp.ProtocolNumber, test.firstPrimaryAddr.Protocol, err) diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go index 1dce35c63..50991c3c0 100644 --- a/pkg/tcpip/transport/icmp/endpoint.go +++ b/pkg/tcpip/transport/icmp/endpoint.go @@ -149,7 +149,7 @@ func (e *endpoint) Close() { e.mu.Unlock() - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -588,7 +588,7 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) tcpip.Error { e.rcvMu.Unlock() if !wasClosed { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } @@ -725,13 +725,13 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // waiter.EventIn is set, the endpoint is immediately readable. func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine if the endpoint is readable if requested. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.rcvMu.Lock() if !e.rcvList.Empty() || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvMu.Unlock() } @@ -804,7 +804,7 @@ func (e *endpoint) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketB e.stats.PacketsReceived.Increment() // Notify any waiters that there's data to be read now. if wasEmpty { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/packet/endpoint.go b/pkg/tcpip/transport/packet/endpoint.go index 367757d3b..52ed9560c 100644 --- a/pkg/tcpip/transport/packet/endpoint.go +++ b/pkg/tcpip/transport/packet/endpoint.go @@ -152,7 +152,7 @@ func (ep *endpoint) Close() { ep.closed = true ep.bound = false - ep.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + ep.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -287,13 +287,13 @@ func (*endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // Readiness implements tcpip.Endpoint.Readiness. func (ep *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine whether the endpoint is readable. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { ep.rcvMu.Lock() if !ep.rcvList.Empty() || ep.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } ep.rcvMu.Unlock() } @@ -483,7 +483,7 @@ func (ep *endpoint) HandlePacket(nicID tcpip.NICID, localAddr tcpip.LinkAddress, ep.stats.PacketsReceived.Increment() // Notify waiters that there's data to be read. if wasEmpty { - ep.waiterQueue.Notify(waiter.EventIn) + ep.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go index 4b2f08379..e27a249cd 100644 --- a/pkg/tcpip/transport/raw/endpoint.go +++ b/pkg/tcpip/transport/raw/endpoint.go @@ -177,7 +177,7 @@ func (e *endpoint) Close() { e.closed = true - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -486,13 +486,13 @@ func (*endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // Readiness implements tcpip.Endpoint.Readiness. func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine whether the endpoint is readable. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.rcvMu.Lock() if !e.rcvList.Empty() || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvMu.Unlock() } @@ -655,7 +655,7 @@ func (e *endpoint) HandlePacket(pkt *stack.PacketBuffer) { e.stats.PacketsReceived.Increment() // Notify waiters that there's data to be read. if wasEmpty { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index 0a2f3291c..025b134e2 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -410,7 +410,7 @@ func (e *endpoint) deliverAccepted(n *endpoint, withSynCookie bool) { atomic.AddInt32(&e.synRcvdCount, -1) } e.acceptMu.Unlock() - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) return default: e.acceptCond.Wait() @@ -462,7 +462,7 @@ func (e *endpoint) reserveTupleLocked() bool { // can't really have any registered waiters except when stack.Wait() is called // which waits for all registered endpoints to stop and expects an EventHUp. func (e *endpoint) notifyAborted() { - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // handleSynSegment is called in its own goroutine once the listening endpoint @@ -771,7 +771,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) { e.drainClosingSegmentQueue() // Notify waiters that the endpoint is shutdown. - e.waiterQueue.Notify(waiter.EventIn | waiter.EventOut | waiter.EventHUp | waiter.EventErr) + e.waiterQueue.Notify(waiter.ReadableEvents | waiter.WritableEvents | waiter.EventHUp | waiter.EventErr) }() var s sleep.Sleeper diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index b32fe2fb1..a9e978cf6 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -1320,7 +1320,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ e.drainClosingSegmentQueue() // When the protocol loop exits we should wake up our waiters. - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } if handshake { @@ -1495,7 +1495,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ } // Tell waiters that the endpoint is connected and writable. - e.waiterQueue.Notify(waiter.EventOut) + e.waiterQueue.Notify(waiter.WritableEvents) // The following assertions and notifications are needed for restored // endpoints. Fresh newly created endpoints have empty states and should @@ -1506,7 +1506,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ e.rcvListMu.Lock() if !e.rcvList.Empty() { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } e.rcvListMu.Unlock() @@ -1570,7 +1570,7 @@ loop: // wakers. s.Done() // Wake up any waiters before we enter TIME_WAIT. - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) e.workerCleanup = true reuseTW = e.doTimeWait() } diff --git a/pkg/tcpip/transport/tcp/dual_stack_test.go b/pkg/tcpip/transport/tcp/dual_stack_test.go index 2d90246e4..f6a16f96e 100644 --- a/pkg/tcpip/transport/tcp/dual_stack_test.go +++ b/pkg/tcpip/transport/tcp/dual_stack_test.go @@ -45,7 +45,7 @@ func TestV4MappedConnectOnV6Only(t *testing.T) { func testV4Connect(t *testing.T, c *context.Context, checkers ...checker.NetworkChecker) { // Start connection attempt. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventOut) + c.WQ.EventRegister(&we, waiter.WritableEvents) defer c.WQ.EventUnregister(&we) err := c.EP.Connect(tcpip.FullAddress{Addr: context.TestV4MappedAddr, Port: context.TestPort}) @@ -152,7 +152,7 @@ func TestV4ConnectWhenBoundToV4Mapped(t *testing.T) { func testV6Connect(t *testing.T, c *context.Context, checkers ...checker.NetworkChecker) { // Start connection attempt to IPv6 address. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventOut) + c.WQ.EventRegister(&we, waiter.WritableEvents) defer c.WQ.EventUnregister(&we) err := c.EP.Connect(tcpip.FullAddress{Addr: context.TestV6Addr, Port: context.TestPort}) @@ -387,7 +387,7 @@ func testV4Accept(t *testing.T, c *context.Context) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) nep, _, err := c.EP.Accept(nil) @@ -521,7 +521,7 @@ func TestV6AcceptOnV6(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) var addr tcpip.FullAddress _, _, err := c.EP.Accept(&addr) @@ -610,7 +610,7 @@ func testV4ListenClose(t *testing.T, c *context.Context) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) nep, _, err := c.EP.Accept(nil) if _, ok := err.(*tcpip.ErrWouldBlock); ok { diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 0a5e9cbb4..c5daba232 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -960,30 +960,30 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { case StateListen: // Check if there's anything in the accepted channel. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.acceptMu.Lock() if len(e.acceptedChan) > 0 { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.acceptMu.Unlock() } } if e.EndpointState().connected() { // Determine if the endpoint is writable if requested. - if (mask & waiter.EventOut) != 0 { + if (mask & waiter.WritableEvents) != 0 { e.sndBufMu.Lock() sndBufSize := e.getSendBufferSize() if e.sndClosed || e.sndBufUsed < sndBufSize { - result |= waiter.EventOut + result |= waiter.WritableEvents } e.sndBufMu.Unlock() } // Determine if the endpoint is readable if requested. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.rcvListMu.Lock() if e.rcvBufUsed > 0 || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvListMu.Unlock() } @@ -1121,7 +1121,7 @@ func (e *endpoint) closeNoShutdownLocked() { return } - eventMask := waiter.EventIn | waiter.EventOut + eventMask := waiter.ReadableEvents | waiter.WritableEvents // Either perform the local cleanup or kick the worker to make sure it // knows it needs to cleanup. if e.workerRunning { @@ -2133,7 +2133,7 @@ func (e *endpoint) Connect(addr tcpip.FullAddress) tcpip.Error { if err != nil { if !err.IgnoreStats() { // Connect failed. Let's wake up any waiters. - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) e.stack.Stats().TCP.FailedConnectionAttempts.Increment() e.stats.FailedConnectionAttempts.Increment() } @@ -2463,7 +2463,7 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error { e.rcvListMu.Unlock() e.closePendingAcceptableConnectionsLocked() // Notify waiters that the endpoint is shutdown. - e.waiterQueue.Notify(waiter.EventIn | waiter.EventOut | waiter.EventHUp | waiter.EventErr) + e.waiterQueue.Notify(waiter.ReadableEvents | waiter.WritableEvents | waiter.EventHUp | waiter.EventErr) } return nil default: @@ -2811,7 +2811,7 @@ func (e *endpoint) updateSndBufferUsage(v int) { e.sndBufMu.Unlock() if notify { - e.waiterQueue.Notify(waiter.EventOut) + e.waiterQueue.Notify(waiter.WritableEvents) } } @@ -2828,7 +2828,7 @@ func (e *endpoint) readyToRead(s *segment) { e.rcvClosed = true } e.rcvListMu.Unlock() - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } // receiveBufferAvailableLocked calculates how many bytes are still available diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go index 6c86ae1ae..9c23469f2 100644 --- a/pkg/tcpip/transport/tcp/tcp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_test.go @@ -388,7 +388,7 @@ func TestTCPResetSentForACKWhenNotUsingSynCookies(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -809,7 +809,7 @@ func TestSimpleReceive(t *testing.T) { c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -1315,7 +1315,7 @@ func TestListenCloseWhileConnect(t *testing.T) { } waitEntry, notifyCh := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&waitEntry, waiter.EventIn) + c.WQ.EventRegister(&waitEntry, waiter.ReadableEvents) defer c.WQ.EventUnregister(&waitEntry) executeHandshake(t, c, context.TestPort, false /* synCookiesInUse */) @@ -1455,7 +1455,7 @@ func TestConnectBindToDevice(t *testing.T) { } // Start connection attempt. waitEntry, _ := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&waitEntry, waiter.EventOut) + c.WQ.EventRegister(&waitEntry, waiter.WritableEvents) defer c.WQ.EventUnregister(&waitEntry) err := c.EP.Connect(tcpip.FullAddress{Addr: context.TestAddr, Port: context.TestPort}) @@ -1590,7 +1590,7 @@ func TestOutOfOrderReceive(t *testing.T) { c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -1732,7 +1732,7 @@ func TestRstOnCloseWithUnreadData(t *testing.T) { c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -1801,7 +1801,7 @@ func TestRstOnCloseWithUnreadDataFinConvertRst(t *testing.T) { c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -1909,7 +1909,7 @@ func TestFullWindowReceive(t *testing.T) { c.CreateConnected(context.TestInitialSequenceNumber, 30000, rcvBufSz) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -2069,7 +2069,7 @@ func TestNoWindowShrinking(t *testing.T) { }) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -2391,7 +2391,7 @@ func TestScaledWindowAccept(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -2465,7 +2465,7 @@ func TestNonScaledWindowAccept(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -3059,7 +3059,7 @@ func TestPassiveSendMSSLessThanMTU(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -3115,7 +3115,7 @@ func TestSynCookiePassiveSendMSSLessThanMTU(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -3191,7 +3191,7 @@ func TestSynOptionsOnActiveConnect(t *testing.T) { // Start connection attempt. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventOut) + c.WQ.EventRegister(&we, waiter.WritableEvents) defer c.WQ.EventUnregister(&we) { @@ -3304,7 +3304,7 @@ func TestReceiveOnResetConnection(t *testing.T) { // Try to read. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) loop: @@ -4232,7 +4232,7 @@ func TestReadAfterClosedState(t *testing.T) { c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -4660,7 +4660,7 @@ func TestSelfConnect(t *testing.T) { // Register for notification, then start connection attempt. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventOut) + wq.EventRegister(&waitEntry, waiter.WritableEvents) defer wq.EventUnregister(&waitEntry) { @@ -4685,7 +4685,7 @@ func TestSelfConnect(t *testing.T) { // Read back what was written. wq.EventUnregister(&waitEntry) - wq.EventRegister(&waitEntry, waiter.EventIn) + wq.EventRegister(&waitEntry, waiter.ReadableEvents) ept := endpointTester{ep} rd := ept.CheckReadFull(t, len(data), notifyCh, 5*time.Second) @@ -5382,7 +5382,7 @@ func TestListenBacklogFull(t *testing.T) { // Try to accept the connections in the backlog. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) for i := 0; i < listenBacklog; i++ { @@ -5730,7 +5730,7 @@ func TestListenSynRcvdQueueFull(t *testing.T) { // Try to accept the connections in the backlog. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) newEP, _, err := c.EP.Accept(nil) @@ -5807,7 +5807,7 @@ func TestListenBacklogFullSynCookieInUse(t *testing.T) { // Verify that there is only one acceptable connection at this point. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) _, _, err = c.EP.Accept(nil) @@ -5969,7 +5969,7 @@ func TestSynRcvdBadSeqNumber(t *testing.T) { if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Try to accept the connections in the backlog. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) // Wait for connection to be established. @@ -6029,7 +6029,7 @@ func TestPassiveConnectionAttemptIncrement(t *testing.T) { executeHandshake(t, c, srcPort+1, false) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) // Verify that there is only one acceptable connection at this point. @@ -6099,7 +6099,7 @@ func TestPassiveFailedConnectionAttemptIncrement(t *testing.T) { } we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) // Now check that there is one acceptable connections. @@ -6152,7 +6152,7 @@ func TestEndpointBindListenAcceptState(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) aep, _, err := ep.Accept(nil) @@ -6614,7 +6614,7 @@ func TestTCPTimeWaitRSTIgnored(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -6733,7 +6733,7 @@ func TestTCPTimeWaitOutOfOrder(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -6840,7 +6840,7 @@ func TestTCPTimeWaitNewSyn(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -7004,7 +7004,7 @@ func TestTCPTimeWaitDuplicateFINExtendsTimeWait(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -7154,7 +7154,7 @@ func TestTCPCloseWithData(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) diff --git a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go index cb4f82903..2949588ce 100644 --- a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go @@ -46,7 +46,7 @@ func TestTimeStampEnabledConnect(t *testing.T) { // Register for read and validate that we have data to read. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) // The following tests ensure that TS option once enabled behaves @@ -273,7 +273,7 @@ func TestSegmentNotDroppedWhenTimestampMissing(t *testing.T) { // Register for read. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) droppedPacketsStat := c.Stack().Stats().DroppedPackets diff --git a/pkg/tcpip/transport/tcp/testing/context/context.go b/pkg/tcpip/transport/tcp/testing/context/context.go index 2f1c1011d..e73f90bb0 100644 --- a/pkg/tcpip/transport/tcp/testing/context/context.go +++ b/pkg/tcpip/transport/tcp/testing/context/context.go @@ -686,7 +686,7 @@ func (c *Context) Connect(iss seqnum.Value, rcvWnd seqnum.Size, options []byte) // Start connection attempt. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&waitEntry, waiter.EventOut) + c.WQ.EventRegister(&waitEntry, waiter.WritableEvents) defer c.WQ.EventUnregister(&waitEntry) err := c.EP.Connect(tcpip.FullAddress{Addr: TestAddr, Port: TestPort}) @@ -899,7 +899,7 @@ func (c *Context) CreateConnectedWithOptions(wantOptions header.TCPSynOptions) * // Start connection attempt. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&waitEntry, waiter.EventOut) + c.WQ.EventRegister(&waitEntry, waiter.WritableEvents) defer c.WQ.EventUnregister(&waitEntry) testFullAddr := tcpip.FullAddress{Addr: TestAddr, Port: TestPort} @@ -1051,7 +1051,7 @@ func (c *Context) AcceptWithOptions(wndScale int, synOptions header.TCPSynOption // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go index 0f59181bb..956da0e0c 100644 --- a/pkg/tcpip/transport/udp/endpoint.go +++ b/pkg/tcpip/transport/udp/endpoint.go @@ -284,7 +284,7 @@ func (e *endpoint) Close() { e.mu.Unlock() - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -1070,7 +1070,7 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) tcpip.Error { e.rcvMu.Unlock() if !wasClosed { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } @@ -1234,13 +1234,13 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // waiter.EventIn is set, the endpoint is immediately readable. func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine if the endpoint is readable if requested. - if (mask & waiter.EventIn) != 0 { + if mask&waiter.ReadableEvents != 0 { e.rcvMu.Lock() if !e.rcvList.Empty() || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvMu.Unlock() } @@ -1349,7 +1349,7 @@ func (e *endpoint) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketB // Notify any waiters that there's data to be read now. if wasEmpty { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/udp/udp_test.go b/pkg/tcpip/transport/udp/udp_test.go index c8126b51b..77ca70a04 100644 --- a/pkg/tcpip/transport/udp/udp_test.go +++ b/pkg/tcpip/transport/udp/udp_test.go @@ -591,7 +591,7 @@ func testReadInternal(c *testContext, flow testFlow, packetShouldBeDropped, expe // Try to receive the data. we, ch := waiter.NewChannelEntry(nil) - c.wq.EventRegister(&we, waiter.EventIn) + c.wq.EventRegister(&we, waiter.ReadableEvents) defer c.wq.EventUnregister(&we) // Take a snapshot of the stats to validate them at the end of the test. |