summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/transport
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/transport')
-rw-r--r--pkg/tcpip/transport/icmp/endpoint.go12
-rw-r--r--pkg/tcpip/transport/packet/endpoint.go10
-rw-r--r--pkg/tcpip/transport/raw/endpoint.go10
-rw-r--r--pkg/tcpip/transport/tcp/accept.go6
-rw-r--r--pkg/tcpip/transport/tcp/connect.go8
-rw-r--r--pkg/tcpip/transport/tcp/dual_stack_test.go10
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go22
-rw-r--r--pkg/tcpip/transport/tcp/tcp_test.go60
-rw-r--r--pkg/tcpip/transport/tcp/tcp_timestamp_test.go4
-rw-r--r--pkg/tcpip/transport/tcp/testing/context/context.go6
-rw-r--r--pkg/tcpip/transport/udp/endpoint.go12
-rw-r--r--pkg/tcpip/transport/udp/udp_test.go2
12 files changed, 81 insertions, 81 deletions
diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go
index 1dce35c63..50991c3c0 100644
--- a/pkg/tcpip/transport/icmp/endpoint.go
+++ b/pkg/tcpip/transport/icmp/endpoint.go
@@ -149,7 +149,7 @@ func (e *endpoint) Close() {
e.mu.Unlock()
- e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
+ e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents)
}
// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf.
@@ -588,7 +588,7 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) tcpip.Error {
e.rcvMu.Unlock()
if !wasClosed {
- e.waiterQueue.Notify(waiter.EventIn)
+ e.waiterQueue.Notify(waiter.ReadableEvents)
}
}
@@ -725,13 +725,13 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) {
// waiter.EventIn is set, the endpoint is immediately readable.
func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
// The endpoint is always writable.
- result := waiter.EventOut & mask
+ result := waiter.WritableEvents & mask
// Determine if the endpoint is readable if requested.
- if (mask & waiter.EventIn) != 0 {
+ if (mask & waiter.ReadableEvents) != 0 {
e.rcvMu.Lock()
if !e.rcvList.Empty() || e.rcvClosed {
- result |= waiter.EventIn
+ result |= waiter.ReadableEvents
}
e.rcvMu.Unlock()
}
@@ -804,7 +804,7 @@ func (e *endpoint) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketB
e.stats.PacketsReceived.Increment()
// Notify any waiters that there's data to be read now.
if wasEmpty {
- e.waiterQueue.Notify(waiter.EventIn)
+ e.waiterQueue.Notify(waiter.ReadableEvents)
}
}
diff --git a/pkg/tcpip/transport/packet/endpoint.go b/pkg/tcpip/transport/packet/endpoint.go
index 367757d3b..52ed9560c 100644
--- a/pkg/tcpip/transport/packet/endpoint.go
+++ b/pkg/tcpip/transport/packet/endpoint.go
@@ -152,7 +152,7 @@ func (ep *endpoint) Close() {
ep.closed = true
ep.bound = false
- ep.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
+ ep.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents)
}
// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf.
@@ -287,13 +287,13 @@ func (*endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) {
// Readiness implements tcpip.Endpoint.Readiness.
func (ep *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
// The endpoint is always writable.
- result := waiter.EventOut & mask
+ result := waiter.WritableEvents & mask
// Determine whether the endpoint is readable.
- if (mask & waiter.EventIn) != 0 {
+ if (mask & waiter.ReadableEvents) != 0 {
ep.rcvMu.Lock()
if !ep.rcvList.Empty() || ep.rcvClosed {
- result |= waiter.EventIn
+ result |= waiter.ReadableEvents
}
ep.rcvMu.Unlock()
}
@@ -483,7 +483,7 @@ func (ep *endpoint) HandlePacket(nicID tcpip.NICID, localAddr tcpip.LinkAddress,
ep.stats.PacketsReceived.Increment()
// Notify waiters that there's data to be read.
if wasEmpty {
- ep.waiterQueue.Notify(waiter.EventIn)
+ ep.waiterQueue.Notify(waiter.ReadableEvents)
}
}
diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go
index 4b2f08379..e27a249cd 100644
--- a/pkg/tcpip/transport/raw/endpoint.go
+++ b/pkg/tcpip/transport/raw/endpoint.go
@@ -177,7 +177,7 @@ func (e *endpoint) Close() {
e.closed = true
- e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
+ e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents)
}
// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf.
@@ -486,13 +486,13 @@ func (*endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) {
// Readiness implements tcpip.Endpoint.Readiness.
func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
// The endpoint is always writable.
- result := waiter.EventOut & mask
+ result := waiter.WritableEvents & mask
// Determine whether the endpoint is readable.
- if (mask & waiter.EventIn) != 0 {
+ if (mask & waiter.ReadableEvents) != 0 {
e.rcvMu.Lock()
if !e.rcvList.Empty() || e.rcvClosed {
- result |= waiter.EventIn
+ result |= waiter.ReadableEvents
}
e.rcvMu.Unlock()
}
@@ -655,7 +655,7 @@ func (e *endpoint) HandlePacket(pkt *stack.PacketBuffer) {
e.stats.PacketsReceived.Increment()
// Notify waiters that there's data to be read.
if wasEmpty {
- e.waiterQueue.Notify(waiter.EventIn)
+ e.waiterQueue.Notify(waiter.ReadableEvents)
}
}
diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go
index 0a2f3291c..025b134e2 100644
--- a/pkg/tcpip/transport/tcp/accept.go
+++ b/pkg/tcpip/transport/tcp/accept.go
@@ -410,7 +410,7 @@ func (e *endpoint) deliverAccepted(n *endpoint, withSynCookie bool) {
atomic.AddInt32(&e.synRcvdCount, -1)
}
e.acceptMu.Unlock()
- e.waiterQueue.Notify(waiter.EventIn)
+ e.waiterQueue.Notify(waiter.ReadableEvents)
return
default:
e.acceptCond.Wait()
@@ -462,7 +462,7 @@ func (e *endpoint) reserveTupleLocked() bool {
// can't really have any registered waiters except when stack.Wait() is called
// which waits for all registered endpoints to stop and expects an EventHUp.
func (e *endpoint) notifyAborted() {
- e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
+ e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents)
}
// handleSynSegment is called in its own goroutine once the listening endpoint
@@ -771,7 +771,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) {
e.drainClosingSegmentQueue()
// Notify waiters that the endpoint is shutdown.
- e.waiterQueue.Notify(waiter.EventIn | waiter.EventOut | waiter.EventHUp | waiter.EventErr)
+ e.waiterQueue.Notify(waiter.ReadableEvents | waiter.WritableEvents | waiter.EventHUp | waiter.EventErr)
}()
var s sleep.Sleeper
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index b32fe2fb1..a9e978cf6 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -1320,7 +1320,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
e.drainClosingSegmentQueue()
// When the protocol loop exits we should wake up our waiters.
- e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
+ e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents)
}
if handshake {
@@ -1495,7 +1495,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
}
// Tell waiters that the endpoint is connected and writable.
- e.waiterQueue.Notify(waiter.EventOut)
+ e.waiterQueue.Notify(waiter.WritableEvents)
// The following assertions and notifications are needed for restored
// endpoints. Fresh newly created endpoints have empty states and should
@@ -1506,7 +1506,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
e.rcvListMu.Lock()
if !e.rcvList.Empty() {
- e.waiterQueue.Notify(waiter.EventIn)
+ e.waiterQueue.Notify(waiter.ReadableEvents)
}
e.rcvListMu.Unlock()
@@ -1570,7 +1570,7 @@ loop:
// wakers.
s.Done()
// Wake up any waiters before we enter TIME_WAIT.
- e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
+ e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents)
e.workerCleanup = true
reuseTW = e.doTimeWait()
}
diff --git a/pkg/tcpip/transport/tcp/dual_stack_test.go b/pkg/tcpip/transport/tcp/dual_stack_test.go
index 2d90246e4..f6a16f96e 100644
--- a/pkg/tcpip/transport/tcp/dual_stack_test.go
+++ b/pkg/tcpip/transport/tcp/dual_stack_test.go
@@ -45,7 +45,7 @@ func TestV4MappedConnectOnV6Only(t *testing.T) {
func testV4Connect(t *testing.T, c *context.Context, checkers ...checker.NetworkChecker) {
// Start connection attempt.
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventOut)
+ c.WQ.EventRegister(&we, waiter.WritableEvents)
defer c.WQ.EventUnregister(&we)
err := c.EP.Connect(tcpip.FullAddress{Addr: context.TestV4MappedAddr, Port: context.TestPort})
@@ -152,7 +152,7 @@ func TestV4ConnectWhenBoundToV4Mapped(t *testing.T) {
func testV6Connect(t *testing.T, c *context.Context, checkers ...checker.NetworkChecker) {
// Start connection attempt to IPv6 address.
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventOut)
+ c.WQ.EventRegister(&we, waiter.WritableEvents)
defer c.WQ.EventUnregister(&we)
err := c.EP.Connect(tcpip.FullAddress{Addr: context.TestV6Addr, Port: context.TestPort})
@@ -387,7 +387,7 @@ func testV4Accept(t *testing.T, c *context.Context) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
nep, _, err := c.EP.Accept(nil)
@@ -521,7 +521,7 @@ func TestV6AcceptOnV6(t *testing.T) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
var addr tcpip.FullAddress
_, _, err := c.EP.Accept(&addr)
@@ -610,7 +610,7 @@ func testV4ListenClose(t *testing.T, c *context.Context) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
nep, _, err := c.EP.Accept(nil)
if _, ok := err.(*tcpip.ErrWouldBlock); ok {
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index 0a5e9cbb4..c5daba232 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -960,30 +960,30 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
case StateListen:
// Check if there's anything in the accepted channel.
- if (mask & waiter.EventIn) != 0 {
+ if (mask & waiter.ReadableEvents) != 0 {
e.acceptMu.Lock()
if len(e.acceptedChan) > 0 {
- result |= waiter.EventIn
+ result |= waiter.ReadableEvents
}
e.acceptMu.Unlock()
}
}
if e.EndpointState().connected() {
// Determine if the endpoint is writable if requested.
- if (mask & waiter.EventOut) != 0 {
+ if (mask & waiter.WritableEvents) != 0 {
e.sndBufMu.Lock()
sndBufSize := e.getSendBufferSize()
if e.sndClosed || e.sndBufUsed < sndBufSize {
- result |= waiter.EventOut
+ result |= waiter.WritableEvents
}
e.sndBufMu.Unlock()
}
// Determine if the endpoint is readable if requested.
- if (mask & waiter.EventIn) != 0 {
+ if (mask & waiter.ReadableEvents) != 0 {
e.rcvListMu.Lock()
if e.rcvBufUsed > 0 || e.rcvClosed {
- result |= waiter.EventIn
+ result |= waiter.ReadableEvents
}
e.rcvListMu.Unlock()
}
@@ -1121,7 +1121,7 @@ func (e *endpoint) closeNoShutdownLocked() {
return
}
- eventMask := waiter.EventIn | waiter.EventOut
+ eventMask := waiter.ReadableEvents | waiter.WritableEvents
// Either perform the local cleanup or kick the worker to make sure it
// knows it needs to cleanup.
if e.workerRunning {
@@ -2133,7 +2133,7 @@ func (e *endpoint) Connect(addr tcpip.FullAddress) tcpip.Error {
if err != nil {
if !err.IgnoreStats() {
// Connect failed. Let's wake up any waiters.
- e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
+ e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents)
e.stack.Stats().TCP.FailedConnectionAttempts.Increment()
e.stats.FailedConnectionAttempts.Increment()
}
@@ -2463,7 +2463,7 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error {
e.rcvListMu.Unlock()
e.closePendingAcceptableConnectionsLocked()
// Notify waiters that the endpoint is shutdown.
- e.waiterQueue.Notify(waiter.EventIn | waiter.EventOut | waiter.EventHUp | waiter.EventErr)
+ e.waiterQueue.Notify(waiter.ReadableEvents | waiter.WritableEvents | waiter.EventHUp | waiter.EventErr)
}
return nil
default:
@@ -2811,7 +2811,7 @@ func (e *endpoint) updateSndBufferUsage(v int) {
e.sndBufMu.Unlock()
if notify {
- e.waiterQueue.Notify(waiter.EventOut)
+ e.waiterQueue.Notify(waiter.WritableEvents)
}
}
@@ -2828,7 +2828,7 @@ func (e *endpoint) readyToRead(s *segment) {
e.rcvClosed = true
}
e.rcvListMu.Unlock()
- e.waiterQueue.Notify(waiter.EventIn)
+ e.waiterQueue.Notify(waiter.ReadableEvents)
}
// receiveBufferAvailableLocked calculates how many bytes are still available
diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go
index 6c86ae1ae..9c23469f2 100644
--- a/pkg/tcpip/transport/tcp/tcp_test.go
+++ b/pkg/tcpip/transport/tcp/tcp_test.go
@@ -388,7 +388,7 @@ func TestTCPResetSentForACKWhenNotUsingSynCookies(t *testing.T) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- wq.EventRegister(&we, waiter.EventIn)
+ wq.EventRegister(&we, waiter.ReadableEvents)
defer wq.EventUnregister(&we)
c.EP, _, err = ep.Accept(nil)
@@ -809,7 +809,7 @@ func TestSimpleReceive(t *testing.T) {
c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */)
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
ept := endpointTester{c.EP}
@@ -1315,7 +1315,7 @@ func TestListenCloseWhileConnect(t *testing.T) {
}
waitEntry, notifyCh := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&waitEntry, waiter.EventIn)
+ c.WQ.EventRegister(&waitEntry, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&waitEntry)
executeHandshake(t, c, context.TestPort, false /* synCookiesInUse */)
@@ -1455,7 +1455,7 @@ func TestConnectBindToDevice(t *testing.T) {
}
// Start connection attempt.
waitEntry, _ := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&waitEntry, waiter.EventOut)
+ c.WQ.EventRegister(&waitEntry, waiter.WritableEvents)
defer c.WQ.EventUnregister(&waitEntry)
err := c.EP.Connect(tcpip.FullAddress{Addr: context.TestAddr, Port: context.TestPort})
@@ -1590,7 +1590,7 @@ func TestOutOfOrderReceive(t *testing.T) {
c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */)
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
ept := endpointTester{c.EP}
@@ -1732,7 +1732,7 @@ func TestRstOnCloseWithUnreadData(t *testing.T) {
c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */)
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
ept := endpointTester{c.EP}
@@ -1801,7 +1801,7 @@ func TestRstOnCloseWithUnreadDataFinConvertRst(t *testing.T) {
c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */)
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
ept := endpointTester{c.EP}
@@ -1909,7 +1909,7 @@ func TestFullWindowReceive(t *testing.T) {
c.CreateConnected(context.TestInitialSequenceNumber, 30000, rcvBufSz)
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
ept := endpointTester{c.EP}
@@ -2069,7 +2069,7 @@ func TestNoWindowShrinking(t *testing.T) {
})
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
ept := endpointTester{c.EP}
@@ -2391,7 +2391,7 @@ func TestScaledWindowAccept(t *testing.T) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- wq.EventRegister(&we, waiter.EventIn)
+ wq.EventRegister(&we, waiter.ReadableEvents)
defer wq.EventUnregister(&we)
c.EP, _, err = ep.Accept(nil)
@@ -2465,7 +2465,7 @@ func TestNonScaledWindowAccept(t *testing.T) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- wq.EventRegister(&we, waiter.EventIn)
+ wq.EventRegister(&we, waiter.ReadableEvents)
defer wq.EventUnregister(&we)
c.EP, _, err = ep.Accept(nil)
@@ -3059,7 +3059,7 @@ func TestPassiveSendMSSLessThanMTU(t *testing.T) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- wq.EventRegister(&we, waiter.EventIn)
+ wq.EventRegister(&we, waiter.ReadableEvents)
defer wq.EventUnregister(&we)
c.EP, _, err = ep.Accept(nil)
@@ -3115,7 +3115,7 @@ func TestSynCookiePassiveSendMSSLessThanMTU(t *testing.T) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- wq.EventRegister(&we, waiter.EventIn)
+ wq.EventRegister(&we, waiter.ReadableEvents)
defer wq.EventUnregister(&we)
c.EP, _, err = ep.Accept(nil)
@@ -3191,7 +3191,7 @@ func TestSynOptionsOnActiveConnect(t *testing.T) {
// Start connection attempt.
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventOut)
+ c.WQ.EventRegister(&we, waiter.WritableEvents)
defer c.WQ.EventUnregister(&we)
{
@@ -3304,7 +3304,7 @@ func TestReceiveOnResetConnection(t *testing.T) {
// Try to read.
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
loop:
@@ -4232,7 +4232,7 @@ func TestReadAfterClosedState(t *testing.T) {
c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */)
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
ept := endpointTester{c.EP}
@@ -4660,7 +4660,7 @@ func TestSelfConnect(t *testing.T) {
// Register for notification, then start connection attempt.
waitEntry, notifyCh := waiter.NewChannelEntry(nil)
- wq.EventRegister(&waitEntry, waiter.EventOut)
+ wq.EventRegister(&waitEntry, waiter.WritableEvents)
defer wq.EventUnregister(&waitEntry)
{
@@ -4685,7 +4685,7 @@ func TestSelfConnect(t *testing.T) {
// Read back what was written.
wq.EventUnregister(&waitEntry)
- wq.EventRegister(&waitEntry, waiter.EventIn)
+ wq.EventRegister(&waitEntry, waiter.ReadableEvents)
ept := endpointTester{ep}
rd := ept.CheckReadFull(t, len(data), notifyCh, 5*time.Second)
@@ -5382,7 +5382,7 @@ func TestListenBacklogFull(t *testing.T) {
// Try to accept the connections in the backlog.
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
for i := 0; i < listenBacklog; i++ {
@@ -5730,7 +5730,7 @@ func TestListenSynRcvdQueueFull(t *testing.T) {
// Try to accept the connections in the backlog.
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
newEP, _, err := c.EP.Accept(nil)
@@ -5807,7 +5807,7 @@ func TestListenBacklogFullSynCookieInUse(t *testing.T) {
// Verify that there is only one acceptable connection at this point.
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
_, _, err = c.EP.Accept(nil)
@@ -5969,7 +5969,7 @@ func TestSynRcvdBadSeqNumber(t *testing.T) {
if _, ok := err.(*tcpip.ErrWouldBlock); ok {
// Try to accept the connections in the backlog.
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
// Wait for connection to be established.
@@ -6029,7 +6029,7 @@ func TestPassiveConnectionAttemptIncrement(t *testing.T) {
executeHandshake(t, c, srcPort+1, false)
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
// Verify that there is only one acceptable connection at this point.
@@ -6099,7 +6099,7 @@ func TestPassiveFailedConnectionAttemptIncrement(t *testing.T) {
}
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
// Now check that there is one acceptable connections.
@@ -6152,7 +6152,7 @@ func TestEndpointBindListenAcceptState(t *testing.T) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- wq.EventRegister(&we, waiter.EventIn)
+ wq.EventRegister(&we, waiter.ReadableEvents)
defer wq.EventUnregister(&we)
aep, _, err := ep.Accept(nil)
@@ -6614,7 +6614,7 @@ func TestTCPTimeWaitRSTIgnored(t *testing.T) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- wq.EventRegister(&we, waiter.EventIn)
+ wq.EventRegister(&we, waiter.ReadableEvents)
defer wq.EventUnregister(&we)
c.EP, _, err = ep.Accept(nil)
@@ -6733,7 +6733,7 @@ func TestTCPTimeWaitOutOfOrder(t *testing.T) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- wq.EventRegister(&we, waiter.EventIn)
+ wq.EventRegister(&we, waiter.ReadableEvents)
defer wq.EventUnregister(&we)
c.EP, _, err = ep.Accept(nil)
@@ -6840,7 +6840,7 @@ func TestTCPTimeWaitNewSyn(t *testing.T) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- wq.EventRegister(&we, waiter.EventIn)
+ wq.EventRegister(&we, waiter.ReadableEvents)
defer wq.EventUnregister(&we)
c.EP, _, err = ep.Accept(nil)
@@ -7004,7 +7004,7 @@ func TestTCPTimeWaitDuplicateFINExtendsTimeWait(t *testing.T) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- wq.EventRegister(&we, waiter.EventIn)
+ wq.EventRegister(&we, waiter.ReadableEvents)
defer wq.EventUnregister(&we)
c.EP, _, err = ep.Accept(nil)
@@ -7154,7 +7154,7 @@ func TestTCPCloseWithData(t *testing.T) {
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- wq.EventRegister(&we, waiter.EventIn)
+ wq.EventRegister(&we, waiter.ReadableEvents)
defer wq.EventUnregister(&we)
c.EP, _, err = ep.Accept(nil)
diff --git a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go
index cb4f82903..2949588ce 100644
--- a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go
+++ b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go
@@ -46,7 +46,7 @@ func TestTimeStampEnabledConnect(t *testing.T) {
// Register for read and validate that we have data to read.
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
// The following tests ensure that TS option once enabled behaves
@@ -273,7 +273,7 @@ func TestSegmentNotDroppedWhenTimestampMissing(t *testing.T) {
// Register for read.
we, ch := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&we, waiter.EventIn)
+ c.WQ.EventRegister(&we, waiter.ReadableEvents)
defer c.WQ.EventUnregister(&we)
droppedPacketsStat := c.Stack().Stats().DroppedPackets
diff --git a/pkg/tcpip/transport/tcp/testing/context/context.go b/pkg/tcpip/transport/tcp/testing/context/context.go
index 2f1c1011d..e73f90bb0 100644
--- a/pkg/tcpip/transport/tcp/testing/context/context.go
+++ b/pkg/tcpip/transport/tcp/testing/context/context.go
@@ -686,7 +686,7 @@ func (c *Context) Connect(iss seqnum.Value, rcvWnd seqnum.Size, options []byte)
// Start connection attempt.
waitEntry, notifyCh := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&waitEntry, waiter.EventOut)
+ c.WQ.EventRegister(&waitEntry, waiter.WritableEvents)
defer c.WQ.EventUnregister(&waitEntry)
err := c.EP.Connect(tcpip.FullAddress{Addr: TestAddr, Port: TestPort})
@@ -899,7 +899,7 @@ func (c *Context) CreateConnectedWithOptions(wantOptions header.TCPSynOptions) *
// Start connection attempt.
waitEntry, notifyCh := waiter.NewChannelEntry(nil)
- c.WQ.EventRegister(&waitEntry, waiter.EventOut)
+ c.WQ.EventRegister(&waitEntry, waiter.WritableEvents)
defer c.WQ.EventUnregister(&waitEntry)
testFullAddr := tcpip.FullAddress{Addr: TestAddr, Port: TestPort}
@@ -1051,7 +1051,7 @@ func (c *Context) AcceptWithOptions(wndScale int, synOptions header.TCPSynOption
// Try to accept the connection.
we, ch := waiter.NewChannelEntry(nil)
- wq.EventRegister(&we, waiter.EventIn)
+ wq.EventRegister(&we, waiter.ReadableEvents)
defer wq.EventUnregister(&we)
c.EP, _, err = ep.Accept(nil)
diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go
index 0f59181bb..956da0e0c 100644
--- a/pkg/tcpip/transport/udp/endpoint.go
+++ b/pkg/tcpip/transport/udp/endpoint.go
@@ -284,7 +284,7 @@ func (e *endpoint) Close() {
e.mu.Unlock()
- e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut)
+ e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents)
}
// ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf.
@@ -1070,7 +1070,7 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) tcpip.Error {
e.rcvMu.Unlock()
if !wasClosed {
- e.waiterQueue.Notify(waiter.EventIn)
+ e.waiterQueue.Notify(waiter.ReadableEvents)
}
}
@@ -1234,13 +1234,13 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) {
// waiter.EventIn is set, the endpoint is immediately readable.
func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
// The endpoint is always writable.
- result := waiter.EventOut & mask
+ result := waiter.WritableEvents & mask
// Determine if the endpoint is readable if requested.
- if (mask & waiter.EventIn) != 0 {
+ if mask&waiter.ReadableEvents != 0 {
e.rcvMu.Lock()
if !e.rcvList.Empty() || e.rcvClosed {
- result |= waiter.EventIn
+ result |= waiter.ReadableEvents
}
e.rcvMu.Unlock()
}
@@ -1349,7 +1349,7 @@ func (e *endpoint) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketB
// Notify any waiters that there's data to be read now.
if wasEmpty {
- e.waiterQueue.Notify(waiter.EventIn)
+ e.waiterQueue.Notify(waiter.ReadableEvents)
}
}
diff --git a/pkg/tcpip/transport/udp/udp_test.go b/pkg/tcpip/transport/udp/udp_test.go
index c8126b51b..77ca70a04 100644
--- a/pkg/tcpip/transport/udp/udp_test.go
+++ b/pkg/tcpip/transport/udp/udp_test.go
@@ -591,7 +591,7 @@ func testReadInternal(c *testContext, flow testFlow, packetShouldBeDropped, expe
// Try to receive the data.
we, ch := waiter.NewChannelEntry(nil)
- c.wq.EventRegister(&we, waiter.EventIn)
+ c.wq.EventRegister(&we, waiter.ReadableEvents)
defer c.wq.EventUnregister(&we)
// Take a snapshot of the stats to validate them at the end of the test.