diff options
author | Bhasker Hariharan <bhaskerh@google.com> | 2021-03-24 12:08:24 -0700 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2021-03-24 12:11:44 -0700 |
commit | e7ca2a51a89a8ff2c9f5adfdfa5b51be1b3faeb3 (patch) | |
tree | 1abf748d2755526978f560abb67f29b6f83496c7 | |
parent | 72ff6a1cac6ab35132b4f79b1149590e103e5291 (diff) |
Add POLLRDNORM/POLLWRNORM support.
On Linux these are meant to be equivalent to POLLIN/POLLOUT. Rather
than hack these on in sys_poll etc it felt cleaner to just cleanup
the call sites to notify for both events. This is what linux does
as well.
Fixes #5544
PiperOrigin-RevId: 364859977
73 files changed, 352 insertions, 277 deletions
diff --git a/pkg/sentry/fs/host/socket.go b/pkg/sentry/fs/host/socket.go index f2d96d1ec..0b3d0617f 100644 --- a/pkg/sentry/fs/host/socket.go +++ b/pkg/sentry/fs/host/socket.go @@ -248,7 +248,7 @@ func (c *ConnectedEndpoint) Writable() bool { c.mu.RLock() defer c.mu.RUnlock() - return fdnotifier.NonBlockingPoll(int32(c.file.FD()), waiter.EventOut)&waiter.EventOut != 0 + return fdnotifier.NonBlockingPoll(int32(c.file.FD()), waiter.WritableEvents)&waiter.WritableEvents != 0 } // Passcred implements transport.ConnectedEndpoint.Passcred. @@ -345,7 +345,7 @@ func (c *ConnectedEndpoint) Readable() bool { c.mu.RLock() defer c.mu.RUnlock() - return fdnotifier.NonBlockingPoll(int32(c.file.FD()), waiter.EventIn)&waiter.EventIn != 0 + return fdnotifier.NonBlockingPoll(int32(c.file.FD()), waiter.ReadableEvents)&waiter.ReadableEvents != 0 } // SendQueuedSize implements transport.Receiver.SendQueuedSize. diff --git a/pkg/sentry/fs/host/wait_test.go b/pkg/sentry/fs/host/wait_test.go index 5925c85ea..bd6188e03 100644 --- a/pkg/sentry/fs/host/wait_test.go +++ b/pkg/sentry/fs/host/wait_test.go @@ -41,13 +41,13 @@ func TestWait(t *testing.T) { defer file.DecRef(ctx) - r := file.Readiness(waiter.EventIn) + r := file.Readiness(waiter.ReadableEvents) if r != 0 { t.Fatalf("File is ready for read when it shouldn't be.") } e, ch := waiter.NewChannelEntry(nil) - file.EventRegister(&e, waiter.EventIn) + file.EventRegister(&e, waiter.ReadableEvents) defer file.EventUnregister(&e) // Check that there are no notifications yet. diff --git a/pkg/sentry/fs/inotify.go b/pkg/sentry/fs/inotify.go index c5c07d564..fb81d903d 100644 --- a/pkg/sentry/fs/inotify.go +++ b/pkg/sentry/fs/inotify.go @@ -107,7 +107,7 @@ func (i *Inotify) Readiness(mask waiter.EventMask) waiter.EventMask { defer i.evMu.Unlock() if !i.events.Empty() { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } return mask & ready @@ -246,7 +246,7 @@ func (i *Inotify) queueEvent(ev *Event) { // can do. i.evMu.Unlock() - i.Queue.Notify(waiter.EventIn) + i.Queue.Notify(waiter.ReadableEvents) } // newWatchLocked creates and adds a new watch to target. diff --git a/pkg/sentry/fs/timerfd/timerfd.go b/pkg/sentry/fs/timerfd/timerfd.go index f362ca9b6..46511a6ac 100644 --- a/pkg/sentry/fs/timerfd/timerfd.go +++ b/pkg/sentry/fs/timerfd/timerfd.go @@ -101,7 +101,7 @@ func (t *TimerOperations) SetTime(s ktime.Setting) (ktime.Time, ktime.Setting) { func (t *TimerOperations) Readiness(mask waiter.EventMask) waiter.EventMask { var ready waiter.EventMask if atomic.LoadUint64(&t.val) != 0 { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } return ready } @@ -143,7 +143,7 @@ func (t *TimerOperations) Write(context.Context, *fs.File, usermem.IOSequence, i // Notify implements ktime.TimerListener.Notify. func (t *TimerOperations) Notify(exp uint64, setting ktime.Setting) (ktime.Setting, bool) { atomic.AddUint64(&t.val, exp) - t.events.Notify(waiter.EventIn) + t.events.Notify(waiter.ReadableEvents) return ktime.Setting{}, false } diff --git a/pkg/sentry/fs/tty/line_discipline.go b/pkg/sentry/fs/tty/line_discipline.go index b34f4a0eb..3ba02c218 100644 --- a/pkg/sentry/fs/tty/line_discipline.go +++ b/pkg/sentry/fs/tty/line_discipline.go @@ -143,7 +143,7 @@ func (l *lineDiscipline) setTermios(task *kernel.Task, args arch.SyscallArgument l.inQueue.pushWaitBufLocked(l) l.inQueue.readable = true l.inQueue.mu.Unlock() - l.replicaWaiter.Notify(waiter.EventIn) + l.replicaWaiter.Notify(waiter.ReadableEvents) } return 0, err @@ -187,9 +187,9 @@ func (l *lineDiscipline) inputQueueRead(ctx context.Context, dst usermem.IOSeque return 0, err } if n > 0 { - l.masterWaiter.Notify(waiter.EventOut) + l.masterWaiter.Notify(waiter.WritableEvents) if pushed { - l.replicaWaiter.Notify(waiter.EventIn) + l.replicaWaiter.Notify(waiter.ReadableEvents) } return n, nil } @@ -204,7 +204,7 @@ func (l *lineDiscipline) inputQueueWrite(ctx context.Context, src usermem.IOSequ return 0, err } if n > 0 { - l.replicaWaiter.Notify(waiter.EventIn) + l.replicaWaiter.Notify(waiter.ReadableEvents) return n, nil } return 0, syserror.ErrWouldBlock @@ -222,9 +222,9 @@ func (l *lineDiscipline) outputQueueRead(ctx context.Context, dst usermem.IOSequ return 0, err } if n > 0 { - l.replicaWaiter.Notify(waiter.EventOut) + l.replicaWaiter.Notify(waiter.WritableEvents) if pushed { - l.masterWaiter.Notify(waiter.EventIn) + l.masterWaiter.Notify(waiter.ReadableEvents) } return n, nil } @@ -239,7 +239,7 @@ func (l *lineDiscipline) outputQueueWrite(ctx context.Context, src usermem.IOSeq return 0, err } if n > 0 { - l.masterWaiter.Notify(waiter.EventIn) + l.masterWaiter.Notify(waiter.ReadableEvents) return n, nil } return 0, syserror.ErrWouldBlock @@ -399,7 +399,7 @@ func (*inputQueueTransformer) transform(l *lineDiscipline, q *queue, buf []byte) // Anything written to the readBuf will have to be echoed. if l.termios.LEnabled(linux.ECHO) { l.outQueue.writeBytes(cBytes, l) - l.masterWaiter.Notify(waiter.EventIn) + l.masterWaiter.Notify(waiter.ReadableEvents) } // If we finish a line, make it available for reading. diff --git a/pkg/sentry/fs/tty/queue.go b/pkg/sentry/fs/tty/queue.go index 79975d812..11d6c15d0 100644 --- a/pkg/sentry/fs/tty/queue.go +++ b/pkg/sentry/fs/tty/queue.go @@ -71,7 +71,7 @@ func (q *queue) readReadiness(t *linux.KernelTermios) waiter.EventMask { q.mu.Lock() defer q.mu.Unlock() if len(q.readBuf) > 0 && q.readable { - return waiter.EventIn + return waiter.ReadableEvents } return waiter.EventMask(0) } @@ -81,7 +81,7 @@ func (q *queue) writeReadiness(t *linux.KernelTermios) waiter.EventMask { q.mu.Lock() defer q.mu.Unlock() if q.waitBufLen < waitBufMaxBytes { - return waiter.EventOut + return waiter.WritableEvents } return waiter.EventMask(0) } diff --git a/pkg/sentry/fsimpl/devpts/line_discipline.go b/pkg/sentry/fsimpl/devpts/line_discipline.go index ae95fdd08..e94a5bac3 100644 --- a/pkg/sentry/fsimpl/devpts/line_discipline.go +++ b/pkg/sentry/fsimpl/devpts/line_discipline.go @@ -141,7 +141,7 @@ func (l *lineDiscipline) setTermios(task *kernel.Task, args arch.SyscallArgument l.inQueue.pushWaitBufLocked(l) l.inQueue.readable = true l.inQueue.mu.Unlock() - l.replicaWaiter.Notify(waiter.EventIn) + l.replicaWaiter.Notify(waiter.ReadableEvents) } return 0, err @@ -185,9 +185,9 @@ func (l *lineDiscipline) inputQueueRead(ctx context.Context, dst usermem.IOSeque return 0, err } if n > 0 { - l.masterWaiter.Notify(waiter.EventOut) + l.masterWaiter.Notify(waiter.WritableEvents) if pushed { - l.replicaWaiter.Notify(waiter.EventIn) + l.replicaWaiter.Notify(waiter.ReadableEvents) } return n, nil } @@ -202,7 +202,7 @@ func (l *lineDiscipline) inputQueueWrite(ctx context.Context, src usermem.IOSequ return 0, err } if n > 0 { - l.replicaWaiter.Notify(waiter.EventIn) + l.replicaWaiter.Notify(waiter.ReadableEvents) return n, nil } return 0, syserror.ErrWouldBlock @@ -220,9 +220,9 @@ func (l *lineDiscipline) outputQueueRead(ctx context.Context, dst usermem.IOSequ return 0, err } if n > 0 { - l.replicaWaiter.Notify(waiter.EventOut) + l.replicaWaiter.Notify(waiter.WritableEvents) if pushed { - l.masterWaiter.Notify(waiter.EventIn) + l.masterWaiter.Notify(waiter.ReadableEvents) } return n, nil } @@ -237,7 +237,7 @@ func (l *lineDiscipline) outputQueueWrite(ctx context.Context, src usermem.IOSeq return 0, err } if n > 0 { - l.masterWaiter.Notify(waiter.EventIn) + l.masterWaiter.Notify(waiter.ReadableEvents) return n, nil } return 0, syserror.ErrWouldBlock @@ -397,7 +397,7 @@ func (*inputQueueTransformer) transform(l *lineDiscipline, q *queue, buf []byte) // Anything written to the readBuf will have to be echoed. if l.termios.LEnabled(linux.ECHO) { l.outQueue.writeBytes(cBytes, l) - l.masterWaiter.Notify(waiter.EventIn) + l.masterWaiter.Notify(waiter.ReadableEvents) } // If we finish a line, make it available for reading. diff --git a/pkg/sentry/fsimpl/devpts/queue.go b/pkg/sentry/fsimpl/devpts/queue.go index 55bff3e60..47b0f1599 100644 --- a/pkg/sentry/fsimpl/devpts/queue.go +++ b/pkg/sentry/fsimpl/devpts/queue.go @@ -69,7 +69,7 @@ func (q *queue) readReadiness(t *linux.KernelTermios) waiter.EventMask { q.mu.Lock() defer q.mu.Unlock() if len(q.readBuf) > 0 && q.readable { - return waiter.EventIn + return waiter.ReadableEvents } return waiter.EventMask(0) } @@ -79,7 +79,7 @@ func (q *queue) writeReadiness(t *linux.KernelTermios) waiter.EventMask { q.mu.Lock() defer q.mu.Unlock() if q.waitBufLen < waitBufMaxBytes { - return waiter.EventOut + return waiter.WritableEvents } return waiter.EventMask(0) } diff --git a/pkg/sentry/fsimpl/eventfd/eventfd.go b/pkg/sentry/fsimpl/eventfd/eventfd.go index 7f810f720..30bd05357 100644 --- a/pkg/sentry/fsimpl/eventfd/eventfd.go +++ b/pkg/sentry/fsimpl/eventfd/eventfd.go @@ -185,7 +185,7 @@ func (efd *EventFileDescription) read(ctx context.Context, dst usermem.IOSequenc // Notify writers. We do this even if we were already writable because // it is possible that a writer is waiting to write the maximum value // to the event. - efd.queue.Notify(waiter.EventOut) + efd.queue.Notify(waiter.WritableEvents) var buf [8]byte usermem.ByteOrder.PutUint64(buf[:], val) @@ -238,7 +238,7 @@ func (efd *EventFileDescription) Signal(val uint64) error { efd.mu.Unlock() // Always trigger a notification. - efd.queue.Notify(waiter.EventIn) + efd.queue.Notify(waiter.ReadableEvents) return nil } @@ -254,11 +254,11 @@ func (efd *EventFileDescription) Readiness(mask waiter.EventMask) waiter.EventMa ready := waiter.EventMask(0) if efd.val > 0 { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } if efd.val < math.MaxUint64-1 { - ready |= waiter.EventOut + ready |= waiter.WritableEvents } return mask & ready diff --git a/pkg/sentry/fsimpl/eventfd/eventfd_test.go b/pkg/sentry/fsimpl/eventfd/eventfd_test.go index 49916fa81..85718f813 100644 --- a/pkg/sentry/fsimpl/eventfd/eventfd_test.go +++ b/pkg/sentry/fsimpl/eventfd/eventfd_test.go @@ -49,7 +49,7 @@ func TestEventFD(t *testing.T) { // Register a callback for a write event. w, ch := waiter.NewChannelEntry(nil) - eventfd.EventRegister(&w, waiter.EventIn) + eventfd.EventRegister(&w, waiter.ReadableEvents) defer eventfd.EventUnregister(&w) data := []byte("00000124") diff --git a/pkg/sentry/fsimpl/fuse/connection.go b/pkg/sentry/fsimpl/fuse/connection.go index 34d25a61e..077bf9307 100644 --- a/pkg/sentry/fsimpl/fuse/connection.go +++ b/pkg/sentry/fsimpl/fuse/connection.go @@ -316,7 +316,7 @@ func (conn *connection) callFutureLocked(t *kernel.Task, r *Request) (*futureRes conn.fd.completions[r.id] = fut // Signal the readers that there is something to read. - conn.fd.waitQueue.Notify(waiter.EventIn) + conn.fd.waitQueue.Notify(waiter.ReadableEvents) return fut, nil } diff --git a/pkg/sentry/fsimpl/fuse/dev.go b/pkg/sentry/fsimpl/fuse/dev.go index 1eeb95216..5d2bae14e 100644 --- a/pkg/sentry/fsimpl/fuse/dev.go +++ b/pkg/sentry/fsimpl/fuse/dev.go @@ -368,10 +368,10 @@ func (fd *DeviceFD) readinessLocked(mask waiter.EventMask) waiter.EventMask { } // FD is always writable. - ready |= waiter.EventOut + ready |= waiter.WritableEvents if !fd.queue.Empty() { // Have reqs available, FD is readable. - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } return ready & mask diff --git a/pkg/sentry/fsimpl/fuse/dev_test.go b/pkg/sentry/fsimpl/fuse/dev_test.go index bb2d0d31a..04250d796 100644 --- a/pkg/sentry/fsimpl/fuse/dev_test.go +++ b/pkg/sentry/fsimpl/fuse/dev_test.go @@ -180,7 +180,7 @@ func ReadTest(serverTask *kernel.Task, fd *vfs.FileDescription, inIOseq usermem. // Register for notifications. w, ch := waiter.NewChannelEntry(nil) - dev.EventRegister(&w, waiter.EventIn) + dev.EventRegister(&w, waiter.ReadableEvents) for { // Issue the request and break out if it completes with anything other than // "would block". diff --git a/pkg/sentry/fsimpl/fuse/fusefs.go b/pkg/sentry/fsimpl/fuse/fusefs.go index fef857afb..167c899e2 100644 --- a/pkg/sentry/fsimpl/fuse/fusefs.go +++ b/pkg/sentry/fsimpl/fuse/fusefs.go @@ -286,7 +286,7 @@ func (fs *filesystem) Release(ctx context.Context) { fs.umounted = true fs.conn.Abort(ctx) // Notify all the waiters on this fd. - fs.conn.fd.waitQueue.Notify(waiter.EventIn) + fs.conn.fd.waitQueue.Notify(waiter.ReadableEvents) fs.conn.fd.mu.Unlock() diff --git a/pkg/sentry/fsimpl/host/socket.go b/pkg/sentry/fsimpl/host/socket.go index 056f910aa..60e237ac7 100644 --- a/pkg/sentry/fsimpl/host/socket.go +++ b/pkg/sentry/fsimpl/host/socket.go @@ -192,7 +192,7 @@ func (c *ConnectedEndpoint) Writable() bool { c.mu.RLock() defer c.mu.RUnlock() - return fdnotifier.NonBlockingPoll(int32(c.fd), waiter.EventOut)&waiter.EventOut != 0 + return fdnotifier.NonBlockingPoll(int32(c.fd), waiter.WritableEvents)&waiter.WritableEvents != 0 } // Passcred implements transport.ConnectedEndpoint.Passcred. @@ -282,7 +282,7 @@ func (c *ConnectedEndpoint) Readable() bool { c.mu.RLock() defer c.mu.RUnlock() - return fdnotifier.NonBlockingPoll(int32(c.fd), waiter.EventIn)&waiter.EventIn != 0 + return fdnotifier.NonBlockingPoll(int32(c.fd), waiter.ReadableEvents)&waiter.ReadableEvents != 0 } // SendQueuedSize implements transport.Receiver.SendQueuedSize. diff --git a/pkg/sentry/fsimpl/signalfd/signalfd.go b/pkg/sentry/fsimpl/signalfd/signalfd.go index 246bd87bc..a7f5928b7 100644 --- a/pkg/sentry/fsimpl/signalfd/signalfd.go +++ b/pkg/sentry/fsimpl/signalfd/signalfd.go @@ -117,8 +117,8 @@ func (sfd *SignalFileDescription) Read(ctx context.Context, dst usermem.IOSequen func (sfd *SignalFileDescription) Readiness(mask waiter.EventMask) waiter.EventMask { sfd.mu.Lock() defer sfd.mu.Unlock() - if mask&waiter.EventIn != 0 && sfd.target.PendingSignals()&sfd.mask != 0 { - return waiter.EventIn // Pending signals. + if mask&waiter.ReadableEvents != 0 && sfd.target.PendingSignals()&sfd.mask != 0 { + return waiter.ReadableEvents // Pending signals. } return 0 } diff --git a/pkg/sentry/fsimpl/timerfd/timerfd.go b/pkg/sentry/fsimpl/timerfd/timerfd.go index 8853c8ad2..64d33c3a8 100644 --- a/pkg/sentry/fsimpl/timerfd/timerfd.go +++ b/pkg/sentry/fsimpl/timerfd/timerfd.go @@ -105,7 +105,7 @@ func (tfd *TimerFileDescription) SetTime(s ktime.Setting) (ktime.Time, ktime.Set func (tfd *TimerFileDescription) Readiness(mask waiter.EventMask) waiter.EventMask { var ready waiter.EventMask if atomic.LoadUint64(&tfd.val) != 0 { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } return ready } @@ -138,7 +138,7 @@ func (tfd *TimerFileDescription) Release(context.Context) { // Notify implements ktime.TimerListener.Notify. func (tfd *TimerFileDescription) Notify(exp uint64, setting ktime.Setting) (ktime.Setting, bool) { atomic.AddUint64(&tfd.val, exp) - tfd.events.Notify(waiter.EventIn) + tfd.events.Notify(waiter.ReadableEvents) return ktime.Setting{}, false } diff --git a/pkg/sentry/kernel/epoll/epoll.go b/pkg/sentry/kernel/epoll/epoll.go index ba73a7812..6006c46a9 100644 --- a/pkg/sentry/kernel/epoll/epoll.go +++ b/pkg/sentry/kernel/epoll/epoll.go @@ -213,8 +213,8 @@ func (e *EventPoll) eventsAvailable() bool { func (e *EventPoll) Readiness(mask waiter.EventMask) waiter.EventMask { ready := waiter.EventMask(0) - if (mask&waiter.EventIn) != 0 && e.eventsAvailable() { - ready |= waiter.EventIn + if (mask&waiter.ReadableEvents) != 0 && e.eventsAvailable() { + ready |= waiter.ReadableEvents } return ready @@ -290,7 +290,7 @@ func (p *pollEntry) Callback(*waiter.Entry, waiter.EventMask) { p.curList = &e.readyList e.listsMu.Unlock() - e.Notify(waiter.EventIn) + e.Notify(waiter.ReadableEvents) return } diff --git a/pkg/sentry/kernel/epoll/epoll_state.go b/pkg/sentry/kernel/epoll/epoll_state.go index 7c61e0258..e08d6287f 100644 --- a/pkg/sentry/kernel/epoll/epoll_state.go +++ b/pkg/sentry/kernel/epoll/epoll_state.go @@ -45,7 +45,7 @@ func (e *EventPoll) afterLoad() { e.waitingList.Remove(entry) e.readyList.PushBack(entry) entry.curList = &e.readyList - e.Notify(waiter.EventIn) + e.Notify(waiter.ReadableEvents) } } } diff --git a/pkg/sentry/kernel/epoll/epoll_test.go b/pkg/sentry/kernel/epoll/epoll_test.go index 55b505593..8ef6cb3e7 100644 --- a/pkg/sentry/kernel/epoll/epoll_test.go +++ b/pkg/sentry/kernel/epoll/epoll_test.go @@ -29,7 +29,7 @@ func TestFileDestroyed(t *testing.T) { ctx := contexttest.Context(t) efile := NewEventPoll(ctx) e := efile.FileOperations.(*EventPoll) - if err := e.AddEntry(id, 0, waiter.EventIn, [2]int32{}); err != nil { + if err := e.AddEntry(id, 0, waiter.ReadableEvents, [2]int32{}); err != nil { t.Fatalf("addEntry failed: %v", err) } diff --git a/pkg/sentry/kernel/eventfd/eventfd.go b/pkg/sentry/kernel/eventfd/eventfd.go index 64f1cc631..2aca02fd5 100644 --- a/pkg/sentry/kernel/eventfd/eventfd.go +++ b/pkg/sentry/kernel/eventfd/eventfd.go @@ -183,7 +183,7 @@ func (e *EventOperations) read(ctx context.Context, dst usermem.IOSequence) erro // Notify writers. We do this even if we were already writable because // it is possible that a writer is waiting to write the maximum value // to the event. - e.wq.Notify(waiter.EventOut) + e.wq.Notify(waiter.WritableEvents) var buf [8]byte usermem.ByteOrder.PutUint64(buf[:], val) @@ -236,7 +236,7 @@ func (e *EventOperations) Signal(val uint64) error { e.mu.Unlock() // Always trigger a notification. - e.wq.Notify(waiter.EventIn) + e.wq.Notify(waiter.ReadableEvents) return nil } @@ -251,11 +251,11 @@ func (e *EventOperations) Readiness(mask waiter.EventMask) waiter.EventMask { ready := waiter.EventMask(0) if e.val > 0 { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } if e.val < math.MaxUint64-1 { - ready |= waiter.EventOut + ready |= waiter.WritableEvents } e.mu.Unlock() diff --git a/pkg/sentry/kernel/eventfd/eventfd_test.go b/pkg/sentry/kernel/eventfd/eventfd_test.go index 9b4892f74..1b9e60b3a 100644 --- a/pkg/sentry/kernel/eventfd/eventfd_test.go +++ b/pkg/sentry/kernel/eventfd/eventfd_test.go @@ -39,7 +39,7 @@ func TestEventfd(t *testing.T) { // Register a callback for a write event. w, ch := waiter.NewChannelEntry(nil) - event.EventRegister(&w, waiter.EventIn) + event.EventRegister(&w, waiter.ReadableEvents) defer event.EventUnregister(&w) data := []byte("00000124") diff --git a/pkg/sentry/kernel/fasync/fasync.go b/pkg/sentry/kernel/fasync/fasync.go index b66d61c6f..dbbbaeeb0 100644 --- a/pkg/sentry/kernel/fasync/fasync.go +++ b/pkg/sentry/kernel/fasync/fasync.go @@ -162,7 +162,7 @@ func (a *FileAsync) Register(w waiter.Waitable) { a.registered = true a.mu.Unlock() - w.EventRegister(&a.e, waiter.EventIn|waiter.EventOut|waiter.EventErr|waiter.EventHUp) + w.EventRegister(&a.e, waiter.ReadableEvents|waiter.WritableEvents|waiter.EventErr|waiter.EventHUp) } // Unregister stops monitoring a file. diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go index 68a55a186..d004f2357 100644 --- a/pkg/sentry/kernel/pipe/pipe.go +++ b/pkg/sentry/kernel/pipe/pipe.go @@ -183,7 +183,7 @@ func (p *Pipe) Open(ctx context.Context, d *fs.Dirent, flags fs.FileFlags) *fs.F // // peekLocked does not mutate the pipe; if the read consumes bytes from the // pipe, then the caller is responsible for calling p.consumeLocked() and -// p.Notify(waiter.EventOut). (The latter must be called with p.mu unlocked.) +// p.Notify(waiter.WritableEvents). (The latter must be called with p.mu unlocked.) // // Preconditions: // * p.mu must be locked. @@ -237,7 +237,7 @@ func (p *Pipe) consumeLocked(n int64) { // Unlike peekLocked, writeLocked assumes that f returns the number of bytes // written to the pipe, and increases the number of bytes stored in the pipe // accordingly. Callers are still responsible for calling -// p.Notify(waiter.EventIn) with p.mu unlocked. +// p.Notify(waiter.ReadableEvents) with p.mu unlocked. // // Preconditions: // * p.mu must be locked. @@ -357,7 +357,7 @@ func (p *Pipe) HasWriters() bool { func (p *Pipe) rReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) if p.HasReaders() && p.size != 0 { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } if !p.HasWriters() && p.hadWriter { // POLLHUP must be suppressed until the pipe has had at least one writer @@ -383,7 +383,7 @@ func (p *Pipe) rReadiness() waiter.EventMask { func (p *Pipe) wReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) if p.HasWriters() && p.size < p.max { - ready |= waiter.EventOut + ready |= waiter.WritableEvents } if !p.HasReaders() { ready |= waiter.EventErr diff --git a/pkg/sentry/kernel/pipe/pipe_test.go b/pkg/sentry/kernel/pipe/pipe_test.go index 3dd739080..867f4a76b 100644 --- a/pkg/sentry/kernel/pipe/pipe_test.go +++ b/pkg/sentry/kernel/pipe/pipe_test.go @@ -97,7 +97,7 @@ func TestPipeWriteUntilEnd(t *testing.T) { buf := make([]byte, len(msg)+1) dst := usermem.BytesIOSequence(buf) e, ch := waiter.NewChannelEntry(nil) - r.EventRegister(&e, waiter.EventIn) + r.EventRegister(&e, waiter.ReadableEvents) defer r.EventUnregister(&e) for { n, err := r.Readv(ctx, dst) @@ -124,7 +124,7 @@ func TestPipeWriteUntilEnd(t *testing.T) { src := usermem.BytesIOSequence(msg) e, ch := waiter.NewChannelEntry(nil) - w.EventRegister(&e, waiter.EventOut) + w.EventRegister(&e, waiter.WritableEvents) defer w.EventUnregister(&e) for src.NumBytes() != 0 { n, err := w.Writev(ctx, src) diff --git a/pkg/sentry/kernel/pipe/pipe_util.go b/pkg/sentry/kernel/pipe/pipe_util.go index 76ea389ca..2d89b9ccd 100644 --- a/pkg/sentry/kernel/pipe/pipe_util.go +++ b/pkg/sentry/kernel/pipe/pipe_util.go @@ -39,14 +39,14 @@ func (p *Pipe) Release(context.Context) { p.wClose() // Wake up readers and writers. - p.Notify(waiter.EventIn | waiter.EventOut) + p.Notify(waiter.ReadableEvents | waiter.WritableEvents) } // Read reads from the Pipe into dst. func (p *Pipe) Read(ctx context.Context, dst usermem.IOSequence) (int64, error) { n, err := dst.CopyOutFrom(ctx, p) if n > 0 { - p.Notify(waiter.EventOut) + p.Notify(waiter.WritableEvents) } return n, err } @@ -75,7 +75,7 @@ func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool) return safemem.FromIOWriter{w}.WriteFromBlocks(srcs) }, !dup /* removeFromSrc */) if n > 0 && !dup { - p.Notify(waiter.EventOut) + p.Notify(waiter.WritableEvents) } return n, err } @@ -84,7 +84,7 @@ func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool) func (p *Pipe) Write(ctx context.Context, src usermem.IOSequence) (int64, error) { n, err := src.CopyInTo(ctx, p) if n > 0 { - p.Notify(waiter.EventIn) + p.Notify(waiter.ReadableEvents) } return n, err } @@ -109,7 +109,7 @@ func (p *Pipe) ReadFrom(ctx context.Context, r io.Reader, count int64) (int64, e return safemem.FromIOReader{r}.ReadToBlocks(dsts) }) if n > 0 { - p.Notify(waiter.EventIn) + p.Notify(waiter.ReadableEvents) } return n, err } diff --git a/pkg/sentry/kernel/pipe/vfs.go b/pkg/sentry/kernel/pipe/vfs.go index 09c0ccaf2..e524afad5 100644 --- a/pkg/sentry/kernel/pipe/vfs.go +++ b/pkg/sentry/kernel/pipe/vfs.go @@ -194,11 +194,11 @@ func (fd *VFSPipeFD) Release(context.Context) { var event waiter.EventMask if fd.vfsfd.IsReadable() { fd.pipe.rClose() - event |= waiter.EventOut + event |= waiter.WritableEvents } if fd.vfsfd.IsWritable() { fd.pipe.wClose() - event |= waiter.EventIn | waiter.EventHUp + event |= waiter.ReadableEvents | waiter.EventHUp } if event == 0 { panic("invalid pipe flags: must be readable, writable, or both") @@ -293,7 +293,7 @@ func (fd *VFSPipeFD) SpliceToNonPipe(ctx context.Context, out *vfs.FileDescripti fd.pipe.mu.Unlock() if n > 0 { - fd.pipe.Notify(waiter.EventOut) + fd.pipe.Notify(waiter.WritableEvents) } return n, err } @@ -318,14 +318,14 @@ func (fd *VFSPipeFD) SpliceFromNonPipe(ctx context.Context, in *vfs.FileDescript fd.pipe.mu.Unlock() if n > 0 { - fd.pipe.Notify(waiter.EventIn) + fd.pipe.Notify(waiter.ReadableEvents) } return n, err } // CopyIn implements usermem.IO.CopyIn. Note that it is the caller's // responsibility to call fd.pipe.consumeLocked() and -// fd.pipe.Notify(waiter.EventOut) after the read is completed. +// fd.pipe.Notify(waiter.WritableEvents) after the read is completed. // // Preconditions: fd.pipe.mu must be locked. func (fd *VFSPipeFD) CopyIn(ctx context.Context, addr usermem.Addr, dst []byte, opts usermem.IOOpts) (int, error) { @@ -336,8 +336,8 @@ func (fd *VFSPipeFD) CopyIn(ctx context.Context, addr usermem.Addr, dst []byte, } // CopyOut implements usermem.IO.CopyOut. Note that it is the caller's -// responsibility to call fd.pipe.Notify(waiter.EventIn) after the -// write is completed. +// responsibility to call fd.pipe.Notify(waiter.ReadableEvents) after the write +// is completed. // // Preconditions: fd.pipe.mu must be locked. func (fd *VFSPipeFD) CopyOut(ctx context.Context, addr usermem.Addr, src []byte, opts usermem.IOOpts) (int, error) { @@ -359,7 +359,7 @@ func (fd *VFSPipeFD) ZeroOut(ctx context.Context, addr usermem.Addr, toZero int6 // CopyInTo implements usermem.IO.CopyInTo. Note that it is the caller's // responsibility to call fd.pipe.consumeLocked() and -// fd.pipe.Notify(waiter.EventOut) after the read is completed. +// fd.pipe.Notify(waiter.WritableEvents) after the read is completed. // // Preconditions: fd.pipe.mu must be locked. func (fd *VFSPipeFD) CopyInTo(ctx context.Context, ars usermem.AddrRangeSeq, dst safemem.Writer, opts usermem.IOOpts) (int64, error) { @@ -369,8 +369,8 @@ func (fd *VFSPipeFD) CopyInTo(ctx context.Context, ars usermem.AddrRangeSeq, dst } // CopyOutFrom implements usermem.IO.CopyOutFrom. Note that it is the caller's -// responsibility to call fd.pipe.Notify(waiter.EventIn) after the write is -// completed. +// responsibility to call fd.pipe.Notify(waiter.ReadableEvents) after the write +// is completed. // // Preconditions: fd.pipe.mu must be locked. func (fd *VFSPipeFD) CopyOutFrom(ctx context.Context, ars usermem.AddrRangeSeq, src safemem.Reader, opts usermem.IOOpts) (int64, error) { @@ -431,9 +431,9 @@ func spliceOrTee(ctx context.Context, dst, src *VFSPipeFD, count int64, removeFr src.pipe.mu.Unlock() if n > 0 { - dst.pipe.Notify(waiter.EventIn) + dst.pipe.Notify(waiter.ReadableEvents) if removeFromSrc { - src.pipe.Notify(waiter.EventOut) + src.pipe.Notify(waiter.WritableEvents) } } return n, err diff --git a/pkg/sentry/kernel/signalfd/signalfd.go b/pkg/sentry/kernel/signalfd/signalfd.go index 884966120..f58ec4194 100644 --- a/pkg/sentry/kernel/signalfd/signalfd.go +++ b/pkg/sentry/kernel/signalfd/signalfd.go @@ -122,8 +122,8 @@ func (s *SignalOperations) Read(ctx context.Context, _ *fs.File, dst usermem.IOS // Readiness implements waiter.Waitable.Readiness. func (s *SignalOperations) Readiness(mask waiter.EventMask) waiter.EventMask { - if mask&waiter.EventIn != 0 && s.target.PendingSignals()&s.Mask() != 0 { - return waiter.EventIn // Pending signals. + if mask&waiter.ReadableEvents != 0 && s.target.PendingSignals()&s.Mask() != 0 { + return waiter.ReadableEvents // Pending signals. } return 0 } diff --git a/pkg/sentry/socket/hostinet/socket.go b/pkg/sentry/socket/hostinet/socket.go index c711d0684..2d9dbbdba 100644 --- a/pkg/sentry/socket/hostinet/socket.go +++ b/pkg/sentry/socket/hostinet/socket.go @@ -200,9 +200,10 @@ func (s *socketOpsCommon) Connect(t *kernel.Task, sockaddr []byte, blocking bool // (SO_ERROR is zero) or unsuccessfully (SO_ERROR is one of the usual error // codes listed here, explaining the reason for the failure)." - connect(2) e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventOut) + writableMask := waiter.WritableEvents + s.EventRegister(&e, writableMask) defer s.EventUnregister(&e) - if s.Readiness(waiter.EventOut)&waiter.EventOut == 0 { + if s.Readiness(writableMask)&writableMask == 0 { if err := t.Block(ch); err != nil { return syserr.FromError(err) } @@ -244,7 +245,7 @@ func (s *socketOpsCommon) Accept(t *kernel.Task, peerRequested bool, flags int, } else { var e waiter.Entry e, ch = waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) } fd, syscallErr = accept4(s.fd, peerAddrPtr, peerAddrlenPtr, unix.SOCK_NONBLOCK|unix.SOCK_CLOEXEC) @@ -496,7 +497,7 @@ func (s *socketOpsCommon) RecvMsg(t *kernel.Task, dst usermem.IOSequence, flags } else { var e waiter.Entry e, ch = waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) } n, err = copyToDst() @@ -652,7 +653,7 @@ func (s *socketOpsCommon) SendMsg(t *kernel.Task, src usermem.IOSequence, to []b } else { var e waiter.Entry e, ch = waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventOut) + s.EventRegister(&e, waiter.WritableEvents) defer s.EventUnregister(&e) } n, err = src.CopyInTo(t, sendmsgFromBlocks) diff --git a/pkg/sentry/socket/netlink/socket.go b/pkg/sentry/socket/netlink/socket.go index 057f4d294..d5ffc75ce 100644 --- a/pkg/sentry/socket/netlink/socket.go +++ b/pkg/sentry/socket/netlink/socket.go @@ -176,10 +176,10 @@ func (s *socketOpsCommon) Readiness(mask waiter.EventMask) waiter.EventMask { // ep holds messages to be read and thus handles EventIn readiness. ready := s.ep.Readiness(mask) - if mask&waiter.EventOut == waiter.EventOut { + if mask&waiter.WritableEvents != 0 { // sendMsg handles messages synchronously and is thus always // ready for writing. - ready |= waiter.EventOut + ready |= waiter.WritableEvents } return ready @@ -544,7 +544,7 @@ func (s *socketOpsCommon) RecvMsg(t *kernel.Task, dst usermem.IOSequence, flags // We'll have to block. Register for notification and keep trying to // receive all the data. e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) for { diff --git a/pkg/sentry/socket/netstack/netstack.go b/pkg/sentry/socket/netstack/netstack.go index 9efb195f0..64e70ab9d 100644 --- a/pkg/sentry/socket/netstack/netstack.go +++ b/pkg/sentry/socket/netstack/netstack.go @@ -567,7 +567,7 @@ func (s *socketOpsCommon) Connect(t *kernel.Task, sockaddr []byte, blocking bool // Register for notification when the endpoint becomes writable, then // initiate the connection. e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventOut) + s.EventRegister(&e, waiter.WritableEvents) defer s.EventUnregister(&e) switch err := s.Endpoint.Connect(addr); err.(type) { @@ -663,7 +663,7 @@ func (s *socketOpsCommon) Listen(t *kernel.Task, backlog int) *syserr.Error { func (s *socketOpsCommon) blockingAccept(t *kernel.Task, peerAddr *tcpip.FullAddress) (tcpip.Endpoint, *waiter.Queue, *syserr.Error) { // Register for notifications. e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) // Try to accept the connection again; if it fails, then wait until we @@ -2753,7 +2753,7 @@ func (s *socketOpsCommon) RecvMsg(t *kernel.Task, dst usermem.IOSequence, flags // We'll have to block. Register for notifications and keep trying to // send all the data. e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) for { @@ -2840,7 +2840,7 @@ func (s *socketOpsCommon) SendMsg(t *kernel.Task, src usermem.IOSequence, to []b // We'll have to block. Register for notification and keep trying to // send all the data. entry, ch = waiter.NewChannelEntry(nil) - s.EventRegister(&entry, waiter.EventOut) + s.EventRegister(&entry, waiter.WritableEvents) defer s.EventUnregister(&entry) } else { // Don't wait immediately after registration in case more data diff --git a/pkg/sentry/socket/unix/transport/connectioned.go b/pkg/sentry/socket/unix/transport/connectioned.go index b1967fc36..159b8f90f 100644 --- a/pkg/sentry/socket/unix/transport/connectioned.go +++ b/pkg/sentry/socket/unix/transport/connectioned.go @@ -338,8 +338,8 @@ func (e *connectionedEndpoint) BidirectionalConnect(ctx context.Context, ce Conn ce.Unlock() // Notify on both ends. - e.Notify(waiter.EventIn) - ce.WaiterQueue().Notify(waiter.EventOut) + e.Notify(waiter.ReadableEvents) + ce.WaiterQueue().Notify(waiter.WritableEvents) return nil default: @@ -480,15 +480,15 @@ func (e *connectionedEndpoint) Readiness(mask waiter.EventMask) waiter.EventMask ready := waiter.EventMask(0) switch { case e.Connected(): - if mask&waiter.EventIn != 0 && e.receiver.Readable() { - ready |= waiter.EventIn + if mask&waiter.ReadableEvents != 0 && e.receiver.Readable() { + ready |= waiter.ReadableEvents } - if mask&waiter.EventOut != 0 && e.connected.Writable() { - ready |= waiter.EventOut + if mask&waiter.WritableEvents != 0 && e.connected.Writable() { + ready |= waiter.WritableEvents } case e.Listening(): - if mask&waiter.EventIn != 0 && len(e.acceptedChan) > 0 { - ready |= waiter.EventIn + if mask&waiter.ReadableEvents != 0 && len(e.acceptedChan) > 0 { + ready |= waiter.ReadableEvents } } diff --git a/pkg/sentry/socket/unix/transport/connectionless.go b/pkg/sentry/socket/unix/transport/connectionless.go index 0be78480c..d0df28b59 100644 --- a/pkg/sentry/socket/unix/transport/connectionless.go +++ b/pkg/sentry/socket/unix/transport/connectionless.go @@ -191,13 +191,13 @@ func (e *connectionlessEndpoint) Readiness(mask waiter.EventMask) waiter.EventMa defer e.Unlock() ready := waiter.EventMask(0) - if mask&waiter.EventIn != 0 && e.receiver.Readable() { - ready |= waiter.EventIn + if mask&waiter.ReadableEvents != 0 && e.receiver.Readable() { + ready |= waiter.ReadableEvents } if e.Connected() { - if mask&waiter.EventOut != 0 && e.connected.Writable() { - ready |= waiter.EventOut + if mask&waiter.WritableEvents != 0 && e.connected.Writable() { + ready |= waiter.WritableEvents } } diff --git a/pkg/sentry/socket/unix/transport/queue.go b/pkg/sentry/socket/unix/transport/queue.go index 698a9a82c..e4de44498 100644 --- a/pkg/sentry/socket/unix/transport/queue.go +++ b/pkg/sentry/socket/unix/transport/queue.go @@ -44,8 +44,8 @@ type queue struct { // will become unreadable when no more data is pending. // // Both the read and write queues must be notified after closing: -// q.ReaderQueue.Notify(waiter.EventIn) -// q.WriterQueue.Notify(waiter.EventOut) +// q.ReaderQueue.Notify(waiter.ReadableEvents) +// q.WriterQueue.Notify(waiter.WritableEvents) func (q *queue) Close() { q.mu.Lock() q.closed = true @@ -55,8 +55,8 @@ func (q *queue) Close() { // Reset empties the queue and Releases all of the Entries. // // Both the read and write queues must be notified after resetting: -// q.ReaderQueue.Notify(waiter.EventIn) -// q.WriterQueue.Notify(waiter.EventOut) +// q.ReaderQueue.Notify(waiter.ReadableEvents) +// q.WriterQueue.Notify(waiter.WritableEvents) func (q *queue) Reset(ctx context.Context) { q.mu.Lock() for cur := q.dataList.Front(); cur != nil; cur = cur.Next() { @@ -112,7 +112,7 @@ func (q *queue) IsWritable() bool { // err indicates why. // // If notify is true, ReaderQueue.Notify must be called: -// q.ReaderQueue.Notify(waiter.EventIn) +// q.ReaderQueue.Notify(waiter.ReadableEvents) func (q *queue) Enqueue(ctx context.Context, data [][]byte, c ControlMessages, from tcpip.FullAddress, discardEmpty bool, truncate bool) (l int64, notify bool, err *syserr.Error) { q.mu.Lock() @@ -179,7 +179,7 @@ func (q *queue) Enqueue(ctx context.Context, data [][]byte, c ControlMessages, f // Dequeue removes the first entry in the data queue, if one exists. // // If notify is true, WriterQueue.Notify must be called: -// q.WriterQueue.Notify(waiter.EventOut) +// q.WriterQueue.Notify(waiter.WritableEvents) func (q *queue) Dequeue() (e *message, notify bool, err *syserr.Error) { q.mu.Lock() diff --git a/pkg/sentry/socket/unix/transport/unix.go b/pkg/sentry/socket/unix/transport/unix.go index 089a0a647..0c5f5ab42 100644 --- a/pkg/sentry/socket/unix/transport/unix.go +++ b/pkg/sentry/socket/unix/transport/unix.go @@ -376,13 +376,13 @@ func (q *queueReceiver) Recv(ctx context.Context, data [][]byte, creds bool, num // RecvNotify implements Receiver.RecvNotify. func (q *queueReceiver) RecvNotify() { - q.readQueue.WriterQueue.Notify(waiter.EventOut) + q.readQueue.WriterQueue.Notify(waiter.WritableEvents) } // CloseNotify implements Receiver.CloseNotify. func (q *queueReceiver) CloseNotify() { - q.readQueue.ReaderQueue.Notify(waiter.EventIn) - q.readQueue.WriterQueue.Notify(waiter.EventOut) + q.readQueue.ReaderQueue.Notify(waiter.ReadableEvents) + q.readQueue.WriterQueue.Notify(waiter.WritableEvents) } // CloseRecv implements Receiver.CloseRecv. @@ -692,13 +692,13 @@ func (e *connectedEndpoint) Send(ctx context.Context, data [][]byte, c ControlMe // SendNotify implements ConnectedEndpoint.SendNotify. func (e *connectedEndpoint) SendNotify() { - e.writeQueue.ReaderQueue.Notify(waiter.EventIn) + e.writeQueue.ReaderQueue.Notify(waiter.ReadableEvents) } // CloseNotify implements ConnectedEndpoint.CloseNotify. func (e *connectedEndpoint) CloseNotify() { - e.writeQueue.ReaderQueue.Notify(waiter.EventIn) - e.writeQueue.WriterQueue.Notify(waiter.EventOut) + e.writeQueue.ReaderQueue.Notify(waiter.ReadableEvents) + e.writeQueue.WriterQueue.Notify(waiter.WritableEvents) } // CloseSend implements ConnectedEndpoint.CloseSend. diff --git a/pkg/sentry/socket/unix/unix.go b/pkg/sentry/socket/unix/unix.go index 63165e1d4..b22f7973a 100644 --- a/pkg/sentry/socket/unix/unix.go +++ b/pkg/sentry/socket/unix/unix.go @@ -207,7 +207,7 @@ func (s *socketOpsCommon) Listen(t *kernel.Task, backlog int) *syserr.Error { func (s *SocketOperations) blockingAccept(t *kernel.Task, peerAddr *tcpip.FullAddress) (transport.Endpoint, *syserr.Error) { // Register for notifications. e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) // Try to accept the connection; if it fails, then wait until we get a @@ -502,7 +502,7 @@ func (s *socketOpsCommon) SendMsg(t *kernel.Task, src usermem.IOSequence, to []b // We'll have to block. Register for notification and keep trying to // send all the data. e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventOut) + s.EventRegister(&e, waiter.WritableEvents) defer s.EventUnregister(&e) total := n @@ -677,7 +677,7 @@ func (s *socketOpsCommon) RecvMsg(t *kernel.Task, dst usermem.IOSequence, flags // We'll have to block. Register for notification and keep trying to // send all the data. e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) for { diff --git a/pkg/sentry/socket/unix/unix_vfs2.go b/pkg/sentry/socket/unix/unix_vfs2.go index 9c037cbae..7890d1048 100644 --- a/pkg/sentry/socket/unix/unix_vfs2.go +++ b/pkg/sentry/socket/unix/unix_vfs2.go @@ -121,7 +121,7 @@ func (s *SocketVFS2) GetSockOpt(t *kernel.Task, level, name int, outPtr usermem. func (s *SocketVFS2) blockingAccept(t *kernel.Task, peerAddr *tcpip.FullAddress) (transport.Endpoint, *syserr.Error) { // Register for notifications. e, ch := waiter.NewChannelEntry(nil) - s.socketOpsCommon.EventRegister(&e, waiter.EventIn) + s.socketOpsCommon.EventRegister(&e, waiter.ReadableEvents) defer s.socketOpsCommon.EventUnregister(&e) // Try to accept the connection; if it fails, then wait until we get a diff --git a/pkg/sentry/syscalls/epoll.go b/pkg/sentry/syscalls/epoll.go index d23a0068a..e115683f8 100644 --- a/pkg/sentry/syscalls/epoll.go +++ b/pkg/sentry/syscalls/epoll.go @@ -151,7 +151,7 @@ func WaitEpoll(t *kernel.Task, fd int32, max int, timeout int) ([]linux.EpollEve } w, ch := waiter.NewChannelEntry(nil) - e.EventRegister(&w, waiter.EventIn) + e.EventRegister(&w, waiter.ReadableEvents) defer e.EventUnregister(&w) // Try to read the events again until we succeed, timeout or get diff --git a/pkg/sentry/syscalls/linux/sys_read.go b/pkg/sentry/syscalls/linux/sys_read.go index f655d3db1..13e5e3a51 100644 --- a/pkg/sentry/syscalls/linux/sys_read.go +++ b/pkg/sentry/syscalls/linux/sys_read.go @@ -32,7 +32,7 @@ import ( const ( // EventMaskRead contains events that can be triggered on reads. - EventMaskRead = waiter.EventIn | waiter.EventHUp | waiter.EventErr + EventMaskRead = waiter.ReadableEvents | waiter.EventHUp | waiter.EventErr ) // Read implements linux syscall read(2). Note that we try to get a buffer that diff --git a/pkg/sentry/syscalls/linux/vfs2/epoll.go b/pkg/sentry/syscalls/linux/vfs2/epoll.go index d0cbb77eb..b980aa43e 100644 --- a/pkg/sentry/syscalls/linux/vfs2/epoll.go +++ b/pkg/sentry/syscalls/linux/vfs2/epoll.go @@ -169,7 +169,7 @@ func EpollWait(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Sys if ch == nil { var w waiter.Entry w, ch = waiter.NewChannelEntry(nil) - epfile.EventRegister(&w, waiter.EventIn) + epfile.EventRegister(&w, waiter.ReadableEvents) defer epfile.EventUnregister(&w) } else { // Set up the timer if a timeout was specified. diff --git a/pkg/sentry/syscalls/linux/vfs2/read_write.go b/pkg/sentry/syscalls/linux/vfs2/read_write.go index c7417840f..b863d7b84 100644 --- a/pkg/sentry/syscalls/linux/vfs2/read_write.go +++ b/pkg/sentry/syscalls/linux/vfs2/read_write.go @@ -30,8 +30,8 @@ import ( ) const ( - eventMaskRead = waiter.EventIn | waiter.EventHUp | waiter.EventErr - eventMaskWrite = waiter.EventOut | waiter.EventHUp | waiter.EventErr + eventMaskRead = waiter.EventRdNorm | waiter.EventIn | waiter.EventHUp | waiter.EventErr + eventMaskWrite = waiter.EventWrNorm | waiter.EventOut | waiter.EventHUp | waiter.EventErr ) // Read implements Linux syscall read(2). diff --git a/pkg/sentry/vfs/epoll.go b/pkg/sentry/vfs/epoll.go index 072655fe8..ae004b371 100644 --- a/pkg/sentry/vfs/epoll.go +++ b/pkg/sentry/vfs/epoll.go @@ -131,7 +131,7 @@ func (ep *EpollInstance) Release(ctx context.Context) { // Readiness implements waiter.Waitable.Readiness. func (ep *EpollInstance) Readiness(mask waiter.EventMask) waiter.EventMask { - if mask&waiter.EventIn == 0 { + if mask&waiter.ReadableEvents == 0 { return 0 } ep.mu.Lock() @@ -139,7 +139,7 @@ func (ep *EpollInstance) Readiness(mask waiter.EventMask) waiter.EventMask { wmask := waiter.EventMaskFromLinux(epi.mask) if epi.key.file.Readiness(wmask)&wmask != 0 { ep.mu.Unlock() - return waiter.EventIn + return waiter.ReadableEvents } } ep.mu.Unlock() @@ -321,7 +321,7 @@ func (epi *epollInterest) Callback(*waiter.Entry, waiter.EventMask) { } epi.epoll.mu.Unlock() if newReady { - epi.epoll.q.Notify(waiter.EventIn) + epi.epoll.q.Notify(waiter.ReadableEvents) } } diff --git a/pkg/sentry/vfs/file_description_impl_util.go b/pkg/sentry/vfs/file_description_impl_util.go index d2050b3f7..1556b41a3 100644 --- a/pkg/sentry/vfs/file_description_impl_util.go +++ b/pkg/sentry/vfs/file_description_impl_util.go @@ -72,7 +72,7 @@ func (FileDescriptionDefaultImpl) Allocate(ctx context.Context, mode, offset, le // file_operations::poll == NULL in Linux. func (FileDescriptionDefaultImpl) Readiness(mask waiter.EventMask) waiter.EventMask { // include/linux/poll.h:vfs_poll() => DEFAULT_POLLMASK - return waiter.EventIn | waiter.EventOut + return waiter.ReadableEvents | waiter.WritableEvents } // EventRegister implements waiter.Waitable.EventRegister analogously to diff --git a/pkg/sentry/vfs/inotify.go b/pkg/sentry/vfs/inotify.go index a48ac1cd6..32fa01578 100644 --- a/pkg/sentry/vfs/inotify.go +++ b/pkg/sentry/vfs/inotify.go @@ -175,7 +175,7 @@ func (i *Inotify) Readiness(mask waiter.EventMask) waiter.EventMask { defer i.evMu.Unlock() if !i.events.Empty() { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } return mask & ready @@ -286,7 +286,7 @@ func (i *Inotify) queueEvent(ev *Event) { // can do. i.evMu.Unlock() - i.queue.Notify(waiter.EventIn) + i.queue.Notify(waiter.ReadableEvents) } // newWatchLocked creates and adds a new watch to target. diff --git a/pkg/tcpip/adapters/gonet/gonet.go b/pkg/tcpip/adapters/gonet/gonet.go index c188aaa18..010e2e833 100644 --- a/pkg/tcpip/adapters/gonet/gonet.go +++ b/pkg/tcpip/adapters/gonet/gonet.go @@ -251,7 +251,7 @@ func (l *TCPListener) Accept() (net.Conn, error) { if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - l.wq.EventRegister(&waitEntry, waiter.EventIn) + l.wq.EventRegister(&waitEntry, waiter.ReadableEvents) defer l.wq.EventUnregister(&waitEntry) for { @@ -301,7 +301,7 @@ func commonRead(b []byte, ep tcpip.Endpoint, wq *waiter.Queue, deadline <-chan s if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventIn) + wq.EventRegister(&waitEntry, waiter.ReadableEvents) defer wq.EventUnregister(&waitEntry) for { res, err = ep.Read(&w, opts) @@ -382,7 +382,7 @@ func (c *TCPConn) Write(b []byte) (int, error) { if ch == nil { entry, ch = waiter.NewChannelEntry(nil) - c.wq.EventRegister(&entry, waiter.EventOut) + c.wq.EventRegister(&entry, waiter.WritableEvents) defer c.wq.EventUnregister(&entry) } else { // Don't wait immediately after registration in case more data @@ -485,7 +485,7 @@ func DialContextTCP(ctx context.Context, s *stack.Stack, addr tcpip.FullAddress, // // We do this unconditionally as Connect will always return an error. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventOut) + wq.EventRegister(&waitEntry, waiter.WritableEvents) defer wq.EventUnregister(&waitEntry) select { @@ -652,7 +652,7 @@ func (c *UDPConn) WriteTo(b []byte, addr net.Addr) (int, error) { if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - c.wq.EventRegister(&waitEntry, waiter.EventOut) + c.wq.EventRegister(&waitEntry, waiter.WritableEvents) defer c.wq.EventUnregister(&waitEntry) for { select { diff --git a/pkg/tcpip/adapters/gonet/gonet_test.go b/pkg/tcpip/adapters/gonet/gonet_test.go index 2b3ea4bdf..48b24692b 100644 --- a/pkg/tcpip/adapters/gonet/gonet_test.go +++ b/pkg/tcpip/adapters/gonet/gonet_test.go @@ -102,7 +102,7 @@ func connect(s *stack.Stack, addr tcpip.FullAddress) (*testConnection, tcpip.Err } entry, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&entry, waiter.EventOut) + wq.EventRegister(&entry, waiter.WritableEvents) err = ep.Connect(addr) if _, ok := err.(*tcpip.ErrConnectStarted); ok { @@ -114,7 +114,7 @@ func connect(s *stack.Stack, addr tcpip.FullAddress) (*testConnection, tcpip.Err } wq.EventUnregister(&entry) - wq.EventRegister(&entry, waiter.EventIn) + wq.EventRegister(&entry, waiter.ReadableEvents) return &testConnection{wq, &entry, ch, ep}, nil } diff --git a/pkg/tcpip/link/tun/device.go b/pkg/tcpip/link/tun/device.go index 80fb343c5..36af2a029 100644 --- a/pkg/tcpip/link/tun/device.go +++ b/pkg/tcpip/link/tun/device.go @@ -309,20 +309,20 @@ func (d *Device) Flags() Flags { // Readiness implements watier.Waitable.Readiness. func (d *Device) Readiness(mask waiter.EventMask) waiter.EventMask { - if mask&waiter.EventIn != 0 { + if mask&waiter.ReadableEvents != 0 { d.mu.RLock() endpoint := d.endpoint d.mu.RUnlock() if endpoint != nil && endpoint.NumQueued() == 0 { - mask &= ^waiter.EventIn + mask &= ^waiter.ReadableEvents } } - return mask & (waiter.EventIn | waiter.EventOut) + return mask & (waiter.ReadableEvents | waiter.WritableEvents) } // WriteNotify implements channel.Notification.WriteNotify. func (d *Device) WriteNotify() { - d.Notify(waiter.EventIn) + d.Notify(waiter.ReadableEvents) } // tunEndpoint is the link endpoint for the NIC created by the tun device. diff --git a/pkg/tcpip/network/ipv4/ipv4_test.go b/pkg/tcpip/network/ipv4/ipv4_test.go index cfed241bf..eba91c68c 100644 --- a/pkg/tcpip/network/ipv4/ipv4_test.go +++ b/pkg/tcpip/network/ipv4/ipv4_test.go @@ -2514,7 +2514,7 @@ func TestReceiveFragments(t *testing.T) { wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) ep, err := s.NewEndpoint(udp.ProtocolNumber, header.IPv4ProtocolNumber, &wq) diff --git a/pkg/tcpip/network/ipv6/ipv6_test.go b/pkg/tcpip/network/ipv6/ipv6_test.go index 81f5f23c3..c206cebeb 100644 --- a/pkg/tcpip/network/ipv6/ipv6_test.go +++ b/pkg/tcpip/network/ipv6/ipv6_test.go @@ -101,7 +101,7 @@ func testReceiveUDP(t *testing.T, s *stack.Stack, e *channel.Endpoint, src, dst wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) @@ -912,7 +912,7 @@ func TestReceiveIPv6ExtHdrs(t *testing.T) { wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.WritableEvents) defer wq.EventUnregister(&we) defer close(ch) ep, err := s.NewEndpoint(udp.ProtocolNumber, ProtocolNumber, &wq) @@ -1998,7 +1998,7 @@ func TestReceiveIPv6Fragments(t *testing.T) { wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) ep, err := s.NewEndpoint(udp.ProtocolNumber, ProtocolNumber, &wq) diff --git a/pkg/tcpip/sample/tun_tcp_connect/main.go b/pkg/tcpip/sample/tun_tcp_connect/main.go index 856ea998d..b9a24ff56 100644 --- a/pkg/tcpip/sample/tun_tcp_connect/main.go +++ b/pkg/tcpip/sample/tun_tcp_connect/main.go @@ -173,7 +173,7 @@ func main() { // Issue connect request and wait for it to complete. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventOut) + wq.EventRegister(&waitEntry, waiter.WritableEvents) terr := ep.Connect(remote) if _, ok := terr.(*tcpip.ErrConnectStarted); ok { fmt.Println("Connect is pending...") @@ -194,7 +194,7 @@ func main() { // Read data and write to standard output until the peer closes the // connection from its side. - wq.EventRegister(&waitEntry, waiter.EventIn) + wq.EventRegister(&waitEntry, waiter.ReadableEvents) for { _, err := ep.Read(os.Stdout, tcpip.ReadOptions{}) if err != nil { diff --git a/pkg/tcpip/sample/tun_tcp_echo/main.go b/pkg/tcpip/sample/tun_tcp_echo/main.go index 9b23df3a9..ef1bfc186 100644 --- a/pkg/tcpip/sample/tun_tcp_echo/main.go +++ b/pkg/tcpip/sample/tun_tcp_echo/main.go @@ -79,7 +79,7 @@ func echo(wq *waiter.Queue, ep tcpip.Endpoint) { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventIn) + wq.EventRegister(&waitEntry, waiter.ReadableEvents) defer wq.EventUnregister(&waitEntry) w := endpointWriter{ @@ -211,7 +211,7 @@ func main() { // Wait for connections to appear. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventIn) + wq.EventRegister(&waitEntry, waiter.ReadableEvents) defer wq.EventUnregister(&waitEntry) for { diff --git a/pkg/tcpip/stack/ndp_test.go b/pkg/tcpip/stack/ndp_test.go index 0725e028b..14124ae66 100644 --- a/pkg/tcpip/stack/ndp_test.go +++ b/pkg/tcpip/stack/ndp_test.go @@ -2909,7 +2909,7 @@ func addrForNewConnectionTo(t *testing.T, s *stack.Stack, addr tcpip.FullAddress wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) ep, err := s.NewEndpoint(header.UDPProtocolNumber, header.IPv6ProtocolNumber, &wq) @@ -2943,7 +2943,7 @@ func addrForNewConnectionWithAddr(t *testing.T, s *stack.Stack, addr tcpip.FullA wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) ep, err := s.NewEndpoint(header.UDPProtocolNumber, header.IPv6ProtocolNumber, &wq) @@ -3272,7 +3272,7 @@ func TestAutoGenAddrJobDeprecation(t *testing.T) { } wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) ep, err := s.NewEndpoint(header.UDPProtocolNumber, header.IPv6ProtocolNumber, &wq) diff --git a/pkg/tcpip/stack/transport_demuxer_test.go b/pkg/tcpip/stack/transport_demuxer_test.go index c1c6cbccd..4848495c9 100644 --- a/pkg/tcpip/stack/transport_demuxer_test.go +++ b/pkg/tcpip/stack/transport_demuxer_test.go @@ -290,7 +290,7 @@ func TestBindToDeviceDistribution(t *testing.T) { // Try to receive the data. wq := waiter.Queue{} we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) defer close(ch) diff --git a/pkg/tcpip/tests/integration/forward_test.go b/pkg/tcpip/tests/integration/forward_test.go index 38c2f321b..d10ae05c2 100644 --- a/pkg/tcpip/tests/integration/forward_test.go +++ b/pkg/tcpip/tests/integration/forward_test.go @@ -48,7 +48,7 @@ func TestForwarding(t *testing.T) { t.Helper() var wq waiter.Queue we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) ep, err := s.NewEndpoint(transProto, netProto, &wq) if err != nil { t.Fatalf("s.NewEndpoint(%d, %d, _): %s", transProto, netProto, err) @@ -184,7 +184,7 @@ func TestForwarding(t *testing.T) { } we, newCH := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) return newEP, newCH } }, diff --git a/pkg/tcpip/tests/integration/link_resolution_test.go b/pkg/tcpip/tests/integration/link_resolution_test.go index 095623789..d39809e1c 100644 --- a/pkg/tcpip/tests/integration/link_resolution_test.go +++ b/pkg/tcpip/tests/integration/link_resolution_test.go @@ -151,7 +151,7 @@ func TestPing(t *testing.T) { var wq waiter.Queue we, waiterCH := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) ep, err := host1Stack.NewEndpoint(test.transProto, test.netProto, &wq) if err != nil { t.Fatalf("host1Stack.NewEndpoint(%d, %d, _): %s", test.transProto, test.netProto, err) @@ -308,7 +308,7 @@ func TestTCPLinkResolutionFailure(t *testing.T) { var clientWQ waiter.Queue we, ch := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&we, waiter.EventOut|waiter.EventErr) + clientWQ.EventRegister(&we, waiter.WritableEvents|waiter.EventErr) clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, test.netProto, &clientWQ) if err != nil { t.Fatalf("host1Stack.NewEndpoint(%d, %d, _): %s", tcp.ProtocolNumber, test.netProto, err) @@ -641,7 +641,7 @@ func TestWritePacketsLinkResolution(t *testing.T) { var serverWQ waiter.Queue serverWE, serverCH := waiter.NewChannelEntry(nil) - serverWQ.EventRegister(&serverWE, waiter.EventIn) + serverWQ.EventRegister(&serverWE, waiter.ReadableEvents) serverEP, err := host2Stack.NewEndpoint(udp.ProtocolNumber, test.netProto, &serverWQ) if err != nil { t.Fatalf("host2Stack.NewEndpoint(%d, %d, _): %s", udp.ProtocolNumber, test.netProto, err) @@ -821,7 +821,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -845,7 +845,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -869,7 +869,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -893,7 +893,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := host1Stack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -917,7 +917,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := routerStack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -942,7 +942,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := routerStack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -967,7 +967,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := host2Stack.NewEndpoint(tcp.ProtocolNumber, ipv4.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() @@ -992,7 +992,7 @@ func TestTCPConfirmNeighborReachability(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventOut) + clientWQ.EventRegister(&clientWE, waiter.WritableEvents) clientEP, err := host2Stack.NewEndpoint(tcp.ProtocolNumber, ipv6.ProtocolNumber, &clientWQ) if err != nil { listenerEP.Close() diff --git a/pkg/tcpip/tests/integration/loopback_test.go b/pkg/tcpip/tests/integration/loopback_test.go index 6462e9d42..2c538a43e 100644 --- a/pkg/tcpip/tests/integration/loopback_test.go +++ b/pkg/tcpip/tests/integration/loopback_test.go @@ -449,7 +449,7 @@ func TestLoopbackAcceptAllInSubnetTCP(t *testing.T) { var wq waiter.Queue we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) listeningEndpoint, err := s.NewEndpoint(tcp.ProtocolNumber, test.addAddress.Protocol, &wq) if err != nil { diff --git a/pkg/tcpip/tests/integration/multicast_broadcast_test.go b/pkg/tcpip/tests/integration/multicast_broadcast_test.go index 77f4a88ec..c6a9c2393 100644 --- a/pkg/tcpip/tests/integration/multicast_broadcast_test.go +++ b/pkg/tcpip/tests/integration/multicast_broadcast_test.go @@ -498,7 +498,7 @@ func TestReuseAddrAndBroadcast(t *testing.T) { for i := 0; i < 2; i++ { var wq waiter.Queue we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) ep, err := s.NewEndpoint(udp.ProtocolNumber, ipv4.ProtocolNumber, &wq) if err != nil { t.Fatalf("(eps[%d]) NewEndpoint(%d, %d, _): %s", len(eps), udp.ProtocolNumber, ipv4.ProtocolNumber, err) diff --git a/pkg/tcpip/tests/integration/route_test.go b/pkg/tcpip/tests/integration/route_test.go index ed499179f..78244f4eb 100644 --- a/pkg/tcpip/tests/integration/route_test.go +++ b/pkg/tcpip/tests/integration/route_test.go @@ -189,7 +189,7 @@ func TestLocalPing(t *testing.T) { var wq waiter.Queue we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) ep, err := s.NewEndpoint(test.transProto, test.netProto, &wq) if err != nil { t.Fatalf("s.NewEndpoint(%d, %d, _): %s", test.transProto, test.netProto, err) @@ -311,7 +311,7 @@ func TestLocalUDP(t *testing.T) { var serverWQ waiter.Queue serverWE, serverCH := waiter.NewChannelEntry(nil) - serverWQ.EventRegister(&serverWE, waiter.EventIn) + serverWQ.EventRegister(&serverWE, waiter.ReadableEvents) server, err := s.NewEndpoint(udp.ProtocolNumber, test.firstPrimaryAddr.Protocol, &serverWQ) if err != nil { t.Fatalf("s.NewEndpoint(%d, %d): %s", udp.ProtocolNumber, test.firstPrimaryAddr.Protocol, err) @@ -325,7 +325,7 @@ func TestLocalUDP(t *testing.T) { var clientWQ waiter.Queue clientWE, clientCH := waiter.NewChannelEntry(nil) - clientWQ.EventRegister(&clientWE, waiter.EventIn) + clientWQ.EventRegister(&clientWE, waiter.ReadableEvents) client, err := s.NewEndpoint(udp.ProtocolNumber, test.firstPrimaryAddr.Protocol, &clientWQ) if err != nil { t.Fatalf("s.NewEndpoint(%d, %d): %s", udp.ProtocolNumber, test.firstPrimaryAddr.Protocol, err) diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go index 1dce35c63..50991c3c0 100644 --- a/pkg/tcpip/transport/icmp/endpoint.go +++ b/pkg/tcpip/transport/icmp/endpoint.go @@ -149,7 +149,7 @@ func (e *endpoint) Close() { e.mu.Unlock() - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -588,7 +588,7 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) tcpip.Error { e.rcvMu.Unlock() if !wasClosed { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } @@ -725,13 +725,13 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // waiter.EventIn is set, the endpoint is immediately readable. func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine if the endpoint is readable if requested. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.rcvMu.Lock() if !e.rcvList.Empty() || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvMu.Unlock() } @@ -804,7 +804,7 @@ func (e *endpoint) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketB e.stats.PacketsReceived.Increment() // Notify any waiters that there's data to be read now. if wasEmpty { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/packet/endpoint.go b/pkg/tcpip/transport/packet/endpoint.go index 367757d3b..52ed9560c 100644 --- a/pkg/tcpip/transport/packet/endpoint.go +++ b/pkg/tcpip/transport/packet/endpoint.go @@ -152,7 +152,7 @@ func (ep *endpoint) Close() { ep.closed = true ep.bound = false - ep.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + ep.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -287,13 +287,13 @@ func (*endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // Readiness implements tcpip.Endpoint.Readiness. func (ep *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine whether the endpoint is readable. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { ep.rcvMu.Lock() if !ep.rcvList.Empty() || ep.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } ep.rcvMu.Unlock() } @@ -483,7 +483,7 @@ func (ep *endpoint) HandlePacket(nicID tcpip.NICID, localAddr tcpip.LinkAddress, ep.stats.PacketsReceived.Increment() // Notify waiters that there's data to be read. if wasEmpty { - ep.waiterQueue.Notify(waiter.EventIn) + ep.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go index 4b2f08379..e27a249cd 100644 --- a/pkg/tcpip/transport/raw/endpoint.go +++ b/pkg/tcpip/transport/raw/endpoint.go @@ -177,7 +177,7 @@ func (e *endpoint) Close() { e.closed = true - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -486,13 +486,13 @@ func (*endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // Readiness implements tcpip.Endpoint.Readiness. func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine whether the endpoint is readable. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.rcvMu.Lock() if !e.rcvList.Empty() || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvMu.Unlock() } @@ -655,7 +655,7 @@ func (e *endpoint) HandlePacket(pkt *stack.PacketBuffer) { e.stats.PacketsReceived.Increment() // Notify waiters that there's data to be read. if wasEmpty { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index 0a2f3291c..025b134e2 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -410,7 +410,7 @@ func (e *endpoint) deliverAccepted(n *endpoint, withSynCookie bool) { atomic.AddInt32(&e.synRcvdCount, -1) } e.acceptMu.Unlock() - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) return default: e.acceptCond.Wait() @@ -462,7 +462,7 @@ func (e *endpoint) reserveTupleLocked() bool { // can't really have any registered waiters except when stack.Wait() is called // which waits for all registered endpoints to stop and expects an EventHUp. func (e *endpoint) notifyAborted() { - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // handleSynSegment is called in its own goroutine once the listening endpoint @@ -771,7 +771,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) { e.drainClosingSegmentQueue() // Notify waiters that the endpoint is shutdown. - e.waiterQueue.Notify(waiter.EventIn | waiter.EventOut | waiter.EventHUp | waiter.EventErr) + e.waiterQueue.Notify(waiter.ReadableEvents | waiter.WritableEvents | waiter.EventHUp | waiter.EventErr) }() var s sleep.Sleeper diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index b32fe2fb1..a9e978cf6 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -1320,7 +1320,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ e.drainClosingSegmentQueue() // When the protocol loop exits we should wake up our waiters. - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } if handshake { @@ -1495,7 +1495,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ } // Tell waiters that the endpoint is connected and writable. - e.waiterQueue.Notify(waiter.EventOut) + e.waiterQueue.Notify(waiter.WritableEvents) // The following assertions and notifications are needed for restored // endpoints. Fresh newly created endpoints have empty states and should @@ -1506,7 +1506,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ e.rcvListMu.Lock() if !e.rcvList.Empty() { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } e.rcvListMu.Unlock() @@ -1570,7 +1570,7 @@ loop: // wakers. s.Done() // Wake up any waiters before we enter TIME_WAIT. - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) e.workerCleanup = true reuseTW = e.doTimeWait() } diff --git a/pkg/tcpip/transport/tcp/dual_stack_test.go b/pkg/tcpip/transport/tcp/dual_stack_test.go index 2d90246e4..f6a16f96e 100644 --- a/pkg/tcpip/transport/tcp/dual_stack_test.go +++ b/pkg/tcpip/transport/tcp/dual_stack_test.go @@ -45,7 +45,7 @@ func TestV4MappedConnectOnV6Only(t *testing.T) { func testV4Connect(t *testing.T, c *context.Context, checkers ...checker.NetworkChecker) { // Start connection attempt. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventOut) + c.WQ.EventRegister(&we, waiter.WritableEvents) defer c.WQ.EventUnregister(&we) err := c.EP.Connect(tcpip.FullAddress{Addr: context.TestV4MappedAddr, Port: context.TestPort}) @@ -152,7 +152,7 @@ func TestV4ConnectWhenBoundToV4Mapped(t *testing.T) { func testV6Connect(t *testing.T, c *context.Context, checkers ...checker.NetworkChecker) { // Start connection attempt to IPv6 address. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventOut) + c.WQ.EventRegister(&we, waiter.WritableEvents) defer c.WQ.EventUnregister(&we) err := c.EP.Connect(tcpip.FullAddress{Addr: context.TestV6Addr, Port: context.TestPort}) @@ -387,7 +387,7 @@ func testV4Accept(t *testing.T, c *context.Context) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) nep, _, err := c.EP.Accept(nil) @@ -521,7 +521,7 @@ func TestV6AcceptOnV6(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) var addr tcpip.FullAddress _, _, err := c.EP.Accept(&addr) @@ -610,7 +610,7 @@ func testV4ListenClose(t *testing.T, c *context.Context) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) nep, _, err := c.EP.Accept(nil) if _, ok := err.(*tcpip.ErrWouldBlock); ok { diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 0a5e9cbb4..c5daba232 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -960,30 +960,30 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { case StateListen: // Check if there's anything in the accepted channel. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.acceptMu.Lock() if len(e.acceptedChan) > 0 { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.acceptMu.Unlock() } } if e.EndpointState().connected() { // Determine if the endpoint is writable if requested. - if (mask & waiter.EventOut) != 0 { + if (mask & waiter.WritableEvents) != 0 { e.sndBufMu.Lock() sndBufSize := e.getSendBufferSize() if e.sndClosed || e.sndBufUsed < sndBufSize { - result |= waiter.EventOut + result |= waiter.WritableEvents } e.sndBufMu.Unlock() } // Determine if the endpoint is readable if requested. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.rcvListMu.Lock() if e.rcvBufUsed > 0 || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvListMu.Unlock() } @@ -1121,7 +1121,7 @@ func (e *endpoint) closeNoShutdownLocked() { return } - eventMask := waiter.EventIn | waiter.EventOut + eventMask := waiter.ReadableEvents | waiter.WritableEvents // Either perform the local cleanup or kick the worker to make sure it // knows it needs to cleanup. if e.workerRunning { @@ -2133,7 +2133,7 @@ func (e *endpoint) Connect(addr tcpip.FullAddress) tcpip.Error { if err != nil { if !err.IgnoreStats() { // Connect failed. Let's wake up any waiters. - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) e.stack.Stats().TCP.FailedConnectionAttempts.Increment() e.stats.FailedConnectionAttempts.Increment() } @@ -2463,7 +2463,7 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error { e.rcvListMu.Unlock() e.closePendingAcceptableConnectionsLocked() // Notify waiters that the endpoint is shutdown. - e.waiterQueue.Notify(waiter.EventIn | waiter.EventOut | waiter.EventHUp | waiter.EventErr) + e.waiterQueue.Notify(waiter.ReadableEvents | waiter.WritableEvents | waiter.EventHUp | waiter.EventErr) } return nil default: @@ -2811,7 +2811,7 @@ func (e *endpoint) updateSndBufferUsage(v int) { e.sndBufMu.Unlock() if notify { - e.waiterQueue.Notify(waiter.EventOut) + e.waiterQueue.Notify(waiter.WritableEvents) } } @@ -2828,7 +2828,7 @@ func (e *endpoint) readyToRead(s *segment) { e.rcvClosed = true } e.rcvListMu.Unlock() - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } // receiveBufferAvailableLocked calculates how many bytes are still available diff --git a/pkg/tcpip/transport/tcp/tcp_test.go b/pkg/tcpip/transport/tcp/tcp_test.go index 6c86ae1ae..9c23469f2 100644 --- a/pkg/tcpip/transport/tcp/tcp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_test.go @@ -388,7 +388,7 @@ func TestTCPResetSentForACKWhenNotUsingSynCookies(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -809,7 +809,7 @@ func TestSimpleReceive(t *testing.T) { c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -1315,7 +1315,7 @@ func TestListenCloseWhileConnect(t *testing.T) { } waitEntry, notifyCh := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&waitEntry, waiter.EventIn) + c.WQ.EventRegister(&waitEntry, waiter.ReadableEvents) defer c.WQ.EventUnregister(&waitEntry) executeHandshake(t, c, context.TestPort, false /* synCookiesInUse */) @@ -1455,7 +1455,7 @@ func TestConnectBindToDevice(t *testing.T) { } // Start connection attempt. waitEntry, _ := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&waitEntry, waiter.EventOut) + c.WQ.EventRegister(&waitEntry, waiter.WritableEvents) defer c.WQ.EventUnregister(&waitEntry) err := c.EP.Connect(tcpip.FullAddress{Addr: context.TestAddr, Port: context.TestPort}) @@ -1590,7 +1590,7 @@ func TestOutOfOrderReceive(t *testing.T) { c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -1732,7 +1732,7 @@ func TestRstOnCloseWithUnreadData(t *testing.T) { c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -1801,7 +1801,7 @@ func TestRstOnCloseWithUnreadDataFinConvertRst(t *testing.T) { c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -1909,7 +1909,7 @@ func TestFullWindowReceive(t *testing.T) { c.CreateConnected(context.TestInitialSequenceNumber, 30000, rcvBufSz) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -2069,7 +2069,7 @@ func TestNoWindowShrinking(t *testing.T) { }) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -2391,7 +2391,7 @@ func TestScaledWindowAccept(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -2465,7 +2465,7 @@ func TestNonScaledWindowAccept(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -3059,7 +3059,7 @@ func TestPassiveSendMSSLessThanMTU(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -3115,7 +3115,7 @@ func TestSynCookiePassiveSendMSSLessThanMTU(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -3191,7 +3191,7 @@ func TestSynOptionsOnActiveConnect(t *testing.T) { // Start connection attempt. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventOut) + c.WQ.EventRegister(&we, waiter.WritableEvents) defer c.WQ.EventUnregister(&we) { @@ -3304,7 +3304,7 @@ func TestReceiveOnResetConnection(t *testing.T) { // Try to read. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) loop: @@ -4232,7 +4232,7 @@ func TestReadAfterClosedState(t *testing.T) { c.CreateConnected(context.TestInitialSequenceNumber, 30000, -1 /* epRcvBuf */) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) ept := endpointTester{c.EP} @@ -4660,7 +4660,7 @@ func TestSelfConnect(t *testing.T) { // Register for notification, then start connection attempt. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventOut) + wq.EventRegister(&waitEntry, waiter.WritableEvents) defer wq.EventUnregister(&waitEntry) { @@ -4685,7 +4685,7 @@ func TestSelfConnect(t *testing.T) { // Read back what was written. wq.EventUnregister(&waitEntry) - wq.EventRegister(&waitEntry, waiter.EventIn) + wq.EventRegister(&waitEntry, waiter.ReadableEvents) ept := endpointTester{ep} rd := ept.CheckReadFull(t, len(data), notifyCh, 5*time.Second) @@ -5382,7 +5382,7 @@ func TestListenBacklogFull(t *testing.T) { // Try to accept the connections in the backlog. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) for i := 0; i < listenBacklog; i++ { @@ -5730,7 +5730,7 @@ func TestListenSynRcvdQueueFull(t *testing.T) { // Try to accept the connections in the backlog. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) newEP, _, err := c.EP.Accept(nil) @@ -5807,7 +5807,7 @@ func TestListenBacklogFullSynCookieInUse(t *testing.T) { // Verify that there is only one acceptable connection at this point. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) _, _, err = c.EP.Accept(nil) @@ -5969,7 +5969,7 @@ func TestSynRcvdBadSeqNumber(t *testing.T) { if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Try to accept the connections in the backlog. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) // Wait for connection to be established. @@ -6029,7 +6029,7 @@ func TestPassiveConnectionAttemptIncrement(t *testing.T) { executeHandshake(t, c, srcPort+1, false) we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) // Verify that there is only one acceptable connection at this point. @@ -6099,7 +6099,7 @@ func TestPassiveFailedConnectionAttemptIncrement(t *testing.T) { } we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) // Now check that there is one acceptable connections. @@ -6152,7 +6152,7 @@ func TestEndpointBindListenAcceptState(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) aep, _, err := ep.Accept(nil) @@ -6614,7 +6614,7 @@ func TestTCPTimeWaitRSTIgnored(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -6733,7 +6733,7 @@ func TestTCPTimeWaitOutOfOrder(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -6840,7 +6840,7 @@ func TestTCPTimeWaitNewSyn(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -7004,7 +7004,7 @@ func TestTCPTimeWaitDuplicateFINExtendsTimeWait(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) @@ -7154,7 +7154,7 @@ func TestTCPCloseWithData(t *testing.T) { // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) diff --git a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go index cb4f82903..2949588ce 100644 --- a/pkg/tcpip/transport/tcp/tcp_timestamp_test.go +++ b/pkg/tcpip/transport/tcp/tcp_timestamp_test.go @@ -46,7 +46,7 @@ func TestTimeStampEnabledConnect(t *testing.T) { // Register for read and validate that we have data to read. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) // The following tests ensure that TS option once enabled behaves @@ -273,7 +273,7 @@ func TestSegmentNotDroppedWhenTimestampMissing(t *testing.T) { // Register for read. we, ch := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&we, waiter.EventIn) + c.WQ.EventRegister(&we, waiter.ReadableEvents) defer c.WQ.EventUnregister(&we) droppedPacketsStat := c.Stack().Stats().DroppedPackets diff --git a/pkg/tcpip/transport/tcp/testing/context/context.go b/pkg/tcpip/transport/tcp/testing/context/context.go index 2f1c1011d..e73f90bb0 100644 --- a/pkg/tcpip/transport/tcp/testing/context/context.go +++ b/pkg/tcpip/transport/tcp/testing/context/context.go @@ -686,7 +686,7 @@ func (c *Context) Connect(iss seqnum.Value, rcvWnd seqnum.Size, options []byte) // Start connection attempt. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&waitEntry, waiter.EventOut) + c.WQ.EventRegister(&waitEntry, waiter.WritableEvents) defer c.WQ.EventUnregister(&waitEntry) err := c.EP.Connect(tcpip.FullAddress{Addr: TestAddr, Port: TestPort}) @@ -899,7 +899,7 @@ func (c *Context) CreateConnectedWithOptions(wantOptions header.TCPSynOptions) * // Start connection attempt. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - c.WQ.EventRegister(&waitEntry, waiter.EventOut) + c.WQ.EventRegister(&waitEntry, waiter.WritableEvents) defer c.WQ.EventUnregister(&waitEntry) testFullAddr := tcpip.FullAddress{Addr: TestAddr, Port: TestPort} @@ -1051,7 +1051,7 @@ func (c *Context) AcceptWithOptions(wndScale int, synOptions header.TCPSynOption // Try to accept the connection. we, ch := waiter.NewChannelEntry(nil) - wq.EventRegister(&we, waiter.EventIn) + wq.EventRegister(&we, waiter.ReadableEvents) defer wq.EventUnregister(&we) c.EP, _, err = ep.Accept(nil) diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go index 0f59181bb..956da0e0c 100644 --- a/pkg/tcpip/transport/udp/endpoint.go +++ b/pkg/tcpip/transport/udp/endpoint.go @@ -284,7 +284,7 @@ func (e *endpoint) Close() { e.mu.Unlock() - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -1070,7 +1070,7 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) tcpip.Error { e.rcvMu.Unlock() if !wasClosed { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } @@ -1234,13 +1234,13 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // waiter.EventIn is set, the endpoint is immediately readable. func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine if the endpoint is readable if requested. - if (mask & waiter.EventIn) != 0 { + if mask&waiter.ReadableEvents != 0 { e.rcvMu.Lock() if !e.rcvList.Empty() || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvMu.Unlock() } @@ -1349,7 +1349,7 @@ func (e *endpoint) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketB // Notify any waiters that there's data to be read now. if wasEmpty { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/udp/udp_test.go b/pkg/tcpip/transport/udp/udp_test.go index c8126b51b..77ca70a04 100644 --- a/pkg/tcpip/transport/udp/udp_test.go +++ b/pkg/tcpip/transport/udp/udp_test.go @@ -591,7 +591,7 @@ func testReadInternal(c *testContext, flow testFlow, packetShouldBeDropped, expe // Try to receive the data. we, ch := waiter.NewChannelEntry(nil) - c.wq.EventRegister(&we, waiter.EventIn) + c.wq.EventRegister(&we, waiter.ReadableEvents) defer c.wq.EventUnregister(&we) // Take a snapshot of the stats to validate them at the end of the test. diff --git a/pkg/waiter/waiter.go b/pkg/waiter/waiter.go index 83d4f893a..4ea067cb7 100644 --- a/pkg/waiter/waiter.go +++ b/pkg/waiter/waiter.go @@ -67,13 +67,17 @@ type EventMask uint64 // Events that waiters can wait on. The meaning is the same as those in the // poll() syscall. const ( - EventIn EventMask = 0x01 // POLLIN - EventPri EventMask = 0x02 // POLLPRI - EventOut EventMask = 0x04 // POLLOUT - EventErr EventMask = 0x08 // POLLERR - EventHUp EventMask = 0x10 // POLLHUP - - allEvents EventMask = 0x1f + EventIn EventMask = 0x01 // POLLIN + EventPri EventMask = 0x02 // POLLPRI + EventOut EventMask = 0x04 // POLLOUT + EventErr EventMask = 0x08 // POLLERR + EventHUp EventMask = 0x10 // POLLHUP + EventRdNorm EventMask = 0x0040 // POLLRDNORM + EventWrNorm EventMask = 0x0100 // POLLWRNORM + + allEvents EventMask = 0x1f | EventRdNorm | EventWrNorm + ReadableEvents EventMask = EventIn | EventRdNorm + WritableEvents EventMask = EventOut | EventWrNorm ) // EventMaskFromLinux returns an EventMask representing the supported events diff --git a/test/syscalls/linux/poll.cc b/test/syscalls/linux/poll.cc index 7a316427d..6f9a9498c 100644 --- a/test/syscalls/linux/poll.cc +++ b/test/syscalls/linux/poll.cc @@ -64,7 +64,7 @@ TEST_F(PollTest, NegativeTimeout_NoRandomSave) { EXPECT_TRUE(TimerFired()); } -TEST_F(PollTest, NonBlockingEventPOLLIN) { +void NonBlockingReadableTest(int16_t mask) { // Create a pipe. int fds[2]; ASSERT_THAT(pipe(fds), SyscallSucceeds()); @@ -77,14 +77,24 @@ TEST_F(PollTest, NonBlockingEventPOLLIN) { ASSERT_THAT(WriteFd(fd1.get(), s, strlen(s) + 1), SyscallSucceeds()); // Poll on the reader fd with POLLIN event. - struct pollfd poll_fd = {fd0.get(), POLLIN, 0}; + struct pollfd poll_fd = {fd0.get(), mask, 0}; EXPECT_THAT(RetryEINTR(poll)(&poll_fd, 1, 0), SyscallSucceedsWithValue(1)); // Should trigger POLLIN event. - EXPECT_EQ(poll_fd.revents & POLLIN, POLLIN); + EXPECT_EQ(poll_fd.revents & mask, mask); } -TEST_F(PollTest, BlockingEventPOLLIN) { +TEST_F(PollTest, NonBlockingEventPOLLIN) { NonBlockingReadableTest(POLLIN); } + +TEST_F(PollTest, NonBlockingEventPOLLRDNORM) { + NonBlockingReadableTest(POLLRDNORM); +} + +TEST_F(PollTest, NonBlockingEventPOLLRDNORM_POLLIN) { + NonBlockingReadableTest(POLLRDNORM | POLLIN); +} + +void BlockingReadableTest(int16_t mask) { // Create a pipe. int fds[2]; ASSERT_THAT(pipe(fds), SyscallSucceeds()); @@ -94,15 +104,15 @@ TEST_F(PollTest, BlockingEventPOLLIN) { // Start a blocking poll on the read fd. absl::Notification notify; - ScopedThread t([&fd0, ¬ify]() { + ScopedThread t([&fd0, ¬ify, &mask]() { notify.Notify(); - // Poll on the reader fd with POLLIN event. - struct pollfd poll_fd = {fd0.get(), POLLIN, 0}; + // Poll on the reader fd with readable event. + struct pollfd poll_fd = {fd0.get(), mask, 0}; EXPECT_THAT(RetryEINTR(poll)(&poll_fd, 1, -1), SyscallSucceedsWithValue(1)); - // Should trigger POLLIN event. - EXPECT_EQ(poll_fd.revents & POLLIN, POLLIN); + // Should trigger readable event. + EXPECT_EQ(poll_fd.revents & mask, mask); }); notify.WaitForNotification(); @@ -113,6 +123,57 @@ TEST_F(PollTest, BlockingEventPOLLIN) { ASSERT_THAT(WriteFd(fd1.get(), s, strlen(s) + 1), SyscallSucceeds()); } +TEST_F(PollTest, BlockingEventPOLLIN) { BlockingReadableTest(POLLIN); } + +TEST_F(PollTest, BlockingEventPOLLRDNORM) { BlockingReadableTest(POLLRDNORM); } + +TEST_F(PollTest, BlockingEventPOLLRDNORM_POLLIN) { + BlockingReadableTest(POLLRDNORM | POLLIN); +} + +void WritableTest(int16_t mask, int timeout) { + // Create a pipe. + int fds[2]; + ASSERT_THAT(pipe(fds), SyscallSucceeds()); + + FileDescriptor fd0(fds[0]); + FileDescriptor fd1(fds[1]); + + // In a newly created pipe 2nd fd should be writable. + + // Poll on second fd for a writable event. + struct pollfd poll_fd = {fd1.get(), mask, 0}; + EXPECT_THAT(RetryEINTR(poll)(&poll_fd, 1, timeout), + SyscallSucceedsWithValue(1)); + + // Should trigger a writable event. + EXPECT_EQ(poll_fd.revents & mask, mask); +} + +TEST_F(PollTest, NonBlockingEventPOLLOUT) { + WritableTest(POLLOUT, /*timeout=*/0); +} + +TEST_F(PollTest, NonBlockingEventPOLLWRNORM) { + WritableTest(POLLWRNORM, /*timeout=*/0); +} + +TEST_F(PollTest, NonBlockingEventPOLLWRNORM_POLLOUT) { + WritableTest(POLLWRNORM | POLLOUT, /*timeout=*/0); +} + +TEST_F(PollTest, BlockingEventPOLLOUT) { + WritableTest(POLLOUT, /*timeout=*/-1); +} + +TEST_F(PollTest, BlockingEventPOLLWRNORM) { + WritableTest(POLLWRNORM, /*timeout=*/-1); +} + +TEST_F(PollTest, BlockingEventPOLLWRNORM_POLLOUT) { + WritableTest(POLLWRNORM | POLLOUT, /*timeout=*/-1); +} + TEST_F(PollTest, NonBlockingEventPOLLHUP) { // Create a pipe. int fds[2]; diff --git a/test/syscalls/linux/tcp_socket.cc b/test/syscalls/linux/tcp_socket.cc index f56c50e61..7341cf1a6 100644 --- a/test/syscalls/linux/tcp_socket.cc +++ b/test/syscalls/linux/tcp_socket.cc @@ -1204,13 +1204,12 @@ TEST_P(SimpleTcpSocketTest, SelfConnectSend_NoRandomSave) { EXPECT_THAT(shutdown(s.get(), SHUT_WR), SyscallSucceedsWithValue(0)); } -TEST_P(SimpleTcpSocketTest, NonBlockingConnect) { +void NonBlockingConnect(int family, int16_t pollMask) { const FileDescriptor listener = - ASSERT_NO_ERRNO_AND_VALUE(Socket(GetParam(), SOCK_STREAM, IPPROTO_TCP)); + ASSERT_NO_ERRNO_AND_VALUE(Socket(family, SOCK_STREAM, IPPROTO_TCP)); // Initialize address to the loopback one. - sockaddr_storage addr = - ASSERT_NO_ERRNO_AND_VALUE(InetLoopbackAddr(GetParam())); + sockaddr_storage addr = ASSERT_NO_ERRNO_AND_VALUE(InetLoopbackAddr(family)); socklen_t addrlen = sizeof(addr); // Bind to some port then start listening. @@ -1221,7 +1220,7 @@ TEST_P(SimpleTcpSocketTest, NonBlockingConnect) { ASSERT_THAT(listen(listener.get(), SOMAXCONN), SyscallSucceeds()); FileDescriptor s = - ASSERT_NO_ERRNO_AND_VALUE(Socket(GetParam(), SOCK_STREAM, IPPROTO_TCP)); + ASSERT_NO_ERRNO_AND_VALUE(Socket(family, SOCK_STREAM, IPPROTO_TCP)); // Set the FD to O_NONBLOCK. int opts; @@ -1241,9 +1240,7 @@ TEST_P(SimpleTcpSocketTest, NonBlockingConnect) { ASSERT_THAT(t = RetryEINTR(accept)(listener.get(), nullptr, nullptr), SyscallSucceeds()); - // Now polling on the FD with a timeout should return 0 corresponding to no - // FDs ready. - struct pollfd poll_fd = {s.get(), POLLOUT, 0}; + struct pollfd poll_fd = {s.get(), pollMask, 0}; EXPECT_THAT(RetryEINTR(poll)(&poll_fd, 1, 10000), SyscallSucceedsWithValue(1)); @@ -1257,6 +1254,18 @@ TEST_P(SimpleTcpSocketTest, NonBlockingConnect) { EXPECT_THAT(close(t), SyscallSucceeds()); } +TEST_P(SimpleTcpSocketTest, NonBlockingConnect_PollOut) { + NonBlockingConnect(GetParam(), POLLOUT); +} + +TEST_P(SimpleTcpSocketTest, NonBlockingConnect_PollWrNorm) { + NonBlockingConnect(GetParam(), POLLWRNORM); +} + +TEST_P(SimpleTcpSocketTest, NonBlockingConnect_PollWrNorm_PollOut) { + NonBlockingConnect(GetParam(), POLLWRNORM | POLLOUT); +} + TEST_P(SimpleTcpSocketTest, NonBlockingConnectRemoteClose) { const FileDescriptor listener = ASSERT_NO_ERRNO_AND_VALUE(Socket(GetParam(), SOCK_STREAM, IPPROTO_TCP)); |