49 files changed, 215 insertions(+), 187 deletions(-)
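The change below is mechanical: every use of the single-bit waiter.EventIn and waiter.EventOut masks becomes waiter.ReadableEvents or waiter.WritableEvents, the combined masks added to pkg/waiter/waiter.go at the end of this diff (POLLIN|POLLRDNORM and POLLOUT|POLLWRNORM respectively). A minimal, self-contained sketch of the Readiness pattern most of these hunks touch; the EventMask values mirror pkg/waiter, while the pipeBuf type is hypothetical:

package main

import "fmt"

// EventMask mirrors the poll-style event bits in pkg/waiter.
type EventMask uint64

const (
	EventIn     EventMask = 0x01   // POLLIN
	EventOut    EventMask = 0x04   // POLLOUT
	EventRdNorm EventMask = 0x0040 // POLLRDNORM
	EventWrNorm EventMask = 0x0100 // POLLWRNORM

	// The combined masks introduced by this change.
	ReadableEvents = EventIn | EventRdNorm
	WritableEvents = EventOut | EventWrNorm
)

// pipeBuf is a hypothetical bounded buffer, standing in for the many
// endpoint types whose Readiness methods this diff rewrites.
type pipeBuf struct {
	used, max int
}

// Readiness reports which of the requested events are ready. Returning
// the combined masks means a poller that asked only for POLLRDNORM
// still intersects a readable result, which a bare EventIn would miss.
func (p *pipeBuf) Readiness(mask EventMask) EventMask {
	var ready EventMask
	if p.used > 0 {
		ready |= ReadableEvents
	}
	if p.used < p.max {
		ready |= WritableEvents
	}
	return mask & ready
}

func main() {
	p := &pipeBuf{used: 1, max: 4}
	fmt.Printf("ready: %#x\n", p.Readiness(ReadableEvents|WritableEvents)) // 0x145
}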
diff --git a/pkg/sentry/fs/host/socket.go b/pkg/sentry/fs/host/socket.go index f2d96d1ec..0b3d0617f 100644 --- a/pkg/sentry/fs/host/socket.go +++ b/pkg/sentry/fs/host/socket.go @@ -248,7 +248,7 @@ func (c *ConnectedEndpoint) Writable() bool { c.mu.RLock() defer c.mu.RUnlock() - return fdnotifier.NonBlockingPoll(int32(c.file.FD()), waiter.EventOut)&waiter.EventOut != 0 + return fdnotifier.NonBlockingPoll(int32(c.file.FD()), waiter.WritableEvents)&waiter.WritableEvents != 0 } // Passcred implements transport.ConnectedEndpoint.Passcred. @@ -345,7 +345,7 @@ func (c *ConnectedEndpoint) Readable() bool { c.mu.RLock() defer c.mu.RUnlock() - return fdnotifier.NonBlockingPoll(int32(c.file.FD()), waiter.EventIn)&waiter.EventIn != 0 + return fdnotifier.NonBlockingPoll(int32(c.file.FD()), waiter.ReadableEvents)&waiter.ReadableEvents != 0 } // SendQueuedSize implements transport.Receiver.SendQueuedSize. diff --git a/pkg/sentry/fs/inotify.go b/pkg/sentry/fs/inotify.go index c5c07d564..fb81d903d 100644 --- a/pkg/sentry/fs/inotify.go +++ b/pkg/sentry/fs/inotify.go @@ -107,7 +107,7 @@ func (i *Inotify) Readiness(mask waiter.EventMask) waiter.EventMask { defer i.evMu.Unlock() if !i.events.Empty() { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } return mask & ready @@ -246,7 +246,7 @@ func (i *Inotify) queueEvent(ev *Event) { // can do. i.evMu.Unlock() - i.Queue.Notify(waiter.EventIn) + i.Queue.Notify(waiter.ReadableEvents) } // newWatchLocked creates and adds a new watch to target. diff --git a/pkg/sentry/fs/timerfd/timerfd.go b/pkg/sentry/fs/timerfd/timerfd.go index f362ca9b6..46511a6ac 100644 --- a/pkg/sentry/fs/timerfd/timerfd.go +++ b/pkg/sentry/fs/timerfd/timerfd.go @@ -101,7 +101,7 @@ func (t *TimerOperations) SetTime(s ktime.Setting) (ktime.Time, ktime.Setting) { func (t *TimerOperations) Readiness(mask waiter.EventMask) waiter.EventMask { var ready waiter.EventMask if atomic.LoadUint64(&t.val) != 0 { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } return ready } @@ -143,7 +143,7 @@ func (t *TimerOperations) Write(context.Context, *fs.File, usermem.IOSequence, i // Notify implements ktime.TimerListener.Notify. 
func (t *TimerOperations) Notify(exp uint64, setting ktime.Setting) (ktime.Setting, bool) { atomic.AddUint64(&t.val, exp) - t.events.Notify(waiter.EventIn) + t.events.Notify(waiter.ReadableEvents) return ktime.Setting{}, false } diff --git a/pkg/sentry/fs/tty/line_discipline.go b/pkg/sentry/fs/tty/line_discipline.go index b34f4a0eb..3ba02c218 100644 --- a/pkg/sentry/fs/tty/line_discipline.go +++ b/pkg/sentry/fs/tty/line_discipline.go @@ -143,7 +143,7 @@ func (l *lineDiscipline) setTermios(task *kernel.Task, args arch.SyscallArgument l.inQueue.pushWaitBufLocked(l) l.inQueue.readable = true l.inQueue.mu.Unlock() - l.replicaWaiter.Notify(waiter.EventIn) + l.replicaWaiter.Notify(waiter.ReadableEvents) } return 0, err @@ -187,9 +187,9 @@ func (l *lineDiscipline) inputQueueRead(ctx context.Context, dst usermem.IOSeque return 0, err } if n > 0 { - l.masterWaiter.Notify(waiter.EventOut) + l.masterWaiter.Notify(waiter.WritableEvents) if pushed { - l.replicaWaiter.Notify(waiter.EventIn) + l.replicaWaiter.Notify(waiter.ReadableEvents) } return n, nil } @@ -204,7 +204,7 @@ func (l *lineDiscipline) inputQueueWrite(ctx context.Context, src usermem.IOSequ return 0, err } if n > 0 { - l.replicaWaiter.Notify(waiter.EventIn) + l.replicaWaiter.Notify(waiter.ReadableEvents) return n, nil } return 0, syserror.ErrWouldBlock @@ -222,9 +222,9 @@ func (l *lineDiscipline) outputQueueRead(ctx context.Context, dst usermem.IOSequ return 0, err } if n > 0 { - l.replicaWaiter.Notify(waiter.EventOut) + l.replicaWaiter.Notify(waiter.WritableEvents) if pushed { - l.masterWaiter.Notify(waiter.EventIn) + l.masterWaiter.Notify(waiter.ReadableEvents) } return n, nil } @@ -239,7 +239,7 @@ func (l *lineDiscipline) outputQueueWrite(ctx context.Context, src usermem.IOSeq return 0, err } if n > 0 { - l.masterWaiter.Notify(waiter.EventIn) + l.masterWaiter.Notify(waiter.ReadableEvents) return n, nil } return 0, syserror.ErrWouldBlock @@ -399,7 +399,7 @@ func (*inputQueueTransformer) transform(l *lineDiscipline, q *queue, buf []byte) // Anything written to the readBuf will have to be echoed. if l.termios.LEnabled(linux.ECHO) { l.outQueue.writeBytes(cBytes, l) - l.masterWaiter.Notify(waiter.EventIn) + l.masterWaiter.Notify(waiter.ReadableEvents) } // If we finish a line, make it available for reading. 
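The line-discipline hunks above follow one pairing: each successful queue operation notifies the opposite side, so draining the input queue wakes the master writer with WritableEvents, and any bytes pushed from the wait buffer that complete a line wake the replica reader with ReadableEvents. A toy sketch of that pairing; waitQueue, inputQueueRead, and the mask values are illustrative stand-ins, not the real gVisor API:

package main

import "fmt"

type eventMask uint64

const (
	readableEvents eventMask = 0x41  // POLLIN | POLLRDNORM
	writableEvents eventMask = 0x104 // POLLOUT | POLLWRNORM
)

// waitQueue stands in for waiter.Queue; Notify would wake any waiters
// registered for an overlapping mask.
type waitQueue struct{ name string }

func (q *waitQueue) Notify(ev eventMask) { fmt.Printf("%s notified: %#x\n", q.name, ev) }

// inputQueueRead sketches the cross-notification in
// lineDiscipline.inputQueueRead above: a successful read frees space on
// the master (writer) side, and bytes pushed from the wait buffer may
// make the replica (reader) side readable.
func inputQueueRead(master, replica *waitQueue, n int, pushed bool) {
	if n > 0 {
		master.Notify(writableEvents) // the writer can queue more input now
		if pushed {
			replica.Notify(readableEvents) // a completed line became available
		}
	}
}

func main() {
	inputQueueRead(&waitQueue{"masterWaiter"}, &waitQueue{"replicaWaiter"}, 8, true)
}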
diff --git a/pkg/sentry/fs/tty/queue.go b/pkg/sentry/fs/tty/queue.go index 79975d812..11d6c15d0 100644 --- a/pkg/sentry/fs/tty/queue.go +++ b/pkg/sentry/fs/tty/queue.go @@ -71,7 +71,7 @@ func (q *queue) readReadiness(t *linux.KernelTermios) waiter.EventMask { q.mu.Lock() defer q.mu.Unlock() if len(q.readBuf) > 0 && q.readable { - return waiter.EventIn + return waiter.ReadableEvents } return waiter.EventMask(0) } @@ -81,7 +81,7 @@ func (q *queue) writeReadiness(t *linux.KernelTermios) waiter.EventMask { q.mu.Lock() defer q.mu.Unlock() if q.waitBufLen < waitBufMaxBytes { - return waiter.EventOut + return waiter.WritableEvents } return waiter.EventMask(0) } diff --git a/pkg/sentry/fsimpl/devpts/line_discipline.go b/pkg/sentry/fsimpl/devpts/line_discipline.go index ae95fdd08..e94a5bac3 100644 --- a/pkg/sentry/fsimpl/devpts/line_discipline.go +++ b/pkg/sentry/fsimpl/devpts/line_discipline.go @@ -141,7 +141,7 @@ func (l *lineDiscipline) setTermios(task *kernel.Task, args arch.SyscallArgument l.inQueue.pushWaitBufLocked(l) l.inQueue.readable = true l.inQueue.mu.Unlock() - l.replicaWaiter.Notify(waiter.EventIn) + l.replicaWaiter.Notify(waiter.ReadableEvents) } return 0, err @@ -185,9 +185,9 @@ func (l *lineDiscipline) inputQueueRead(ctx context.Context, dst usermem.IOSeque return 0, err } if n > 0 { - l.masterWaiter.Notify(waiter.EventOut) + l.masterWaiter.Notify(waiter.WritableEvents) if pushed { - l.replicaWaiter.Notify(waiter.EventIn) + l.replicaWaiter.Notify(waiter.ReadableEvents) } return n, nil } @@ -202,7 +202,7 @@ func (l *lineDiscipline) inputQueueWrite(ctx context.Context, src usermem.IOSequ return 0, err } if n > 0 { - l.replicaWaiter.Notify(waiter.EventIn) + l.replicaWaiter.Notify(waiter.ReadableEvents) return n, nil } return 0, syserror.ErrWouldBlock @@ -220,9 +220,9 @@ func (l *lineDiscipline) outputQueueRead(ctx context.Context, dst usermem.IOSequ return 0, err } if n > 0 { - l.replicaWaiter.Notify(waiter.EventOut) + l.replicaWaiter.Notify(waiter.WritableEvents) if pushed { - l.masterWaiter.Notify(waiter.EventIn) + l.masterWaiter.Notify(waiter.ReadableEvents) } return n, nil } @@ -237,7 +237,7 @@ func (l *lineDiscipline) outputQueueWrite(ctx context.Context, src usermem.IOSeq return 0, err } if n > 0 { - l.masterWaiter.Notify(waiter.EventIn) + l.masterWaiter.Notify(waiter.ReadableEvents) return n, nil } return 0, syserror.ErrWouldBlock @@ -397,7 +397,7 @@ func (*inputQueueTransformer) transform(l *lineDiscipline, q *queue, buf []byte) // Anything written to the readBuf will have to be echoed. if l.termios.LEnabled(linux.ECHO) { l.outQueue.writeBytes(cBytes, l) - l.masterWaiter.Notify(waiter.EventIn) + l.masterWaiter.Notify(waiter.ReadableEvents) } // If we finish a line, make it available for reading. 
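The socket and epoll hunks later in this diff all share one blocking idiom: create a channel-backed waiter entry, register it for ReadableEvents or WritableEvents, re-check readiness, and block on the channel until a Notify call with an overlapping mask wakes it. A self-contained analogue of that idiom; the queue and entry types are simplified stand-ins for waiter.Queue and waiter.NewChannelEntry, not the real implementation:

package main

import (
	"fmt"
	"sync"
)

type eventMask uint64

const readableEvents eventMask = 0x41 // POLLIN | POLLRDNORM

// entry pairs a wanted mask with a wake-up channel, loosely mirroring
// what waiter.NewChannelEntry returns.
type entry struct {
	mask eventMask
	ch   chan struct{}
}

type queue struct {
	mu      sync.Mutex
	entries map[*entry]struct{}
}

func (q *queue) register(e *entry)   { q.mu.Lock(); q.entries[e] = struct{}{}; q.mu.Unlock() }
func (q *queue) unregister(e *entry) { q.mu.Lock(); delete(q.entries, e); q.mu.Unlock() }

// notify wakes every registered entry whose mask overlaps events.
func (q *queue) notify(events eventMask) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for e := range q.entries {
		if e.mask&events != 0 {
			select {
			case e.ch <- struct{}{}:
			default: // a wake-up is already pending; the channel is 1-buffered
			}
		}
	}
}

func main() {
	q := &queue{entries: make(map[*entry]struct{})}
	e := &entry{mask: readableEvents, ch: make(chan struct{}, 1)}
	q.register(e)
	defer q.unregister(e)

	go q.notify(readableEvents) // e.g. data arrived on the endpoint

	<-e.ch // the t.Block(ch) step in the hunks below
	fmt.Println("woken for readable events")
}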
diff --git a/pkg/sentry/fsimpl/devpts/queue.go b/pkg/sentry/fsimpl/devpts/queue.go index 55bff3e60..47b0f1599 100644 --- a/pkg/sentry/fsimpl/devpts/queue.go +++ b/pkg/sentry/fsimpl/devpts/queue.go @@ -69,7 +69,7 @@ func (q *queue) readReadiness(t *linux.KernelTermios) waiter.EventMask { q.mu.Lock() defer q.mu.Unlock() if len(q.readBuf) > 0 && q.readable { - return waiter.EventIn + return waiter.ReadableEvents } return waiter.EventMask(0) } @@ -79,7 +79,7 @@ func (q *queue) writeReadiness(t *linux.KernelTermios) waiter.EventMask { q.mu.Lock() defer q.mu.Unlock() if q.waitBufLen < waitBufMaxBytes { - return waiter.EventOut + return waiter.WritableEvents } return waiter.EventMask(0) } diff --git a/pkg/sentry/fsimpl/eventfd/eventfd.go b/pkg/sentry/fsimpl/eventfd/eventfd.go index 7f810f720..30bd05357 100644 --- a/pkg/sentry/fsimpl/eventfd/eventfd.go +++ b/pkg/sentry/fsimpl/eventfd/eventfd.go @@ -185,7 +185,7 @@ func (efd *EventFileDescription) read(ctx context.Context, dst usermem.IOSequenc // Notify writers. We do this even if we were already writable because // it is possible that a writer is waiting to write the maximum value // to the event. - efd.queue.Notify(waiter.EventOut) + efd.queue.Notify(waiter.WritableEvents) var buf [8]byte usermem.ByteOrder.PutUint64(buf[:], val) @@ -238,7 +238,7 @@ func (efd *EventFileDescription) Signal(val uint64) error { efd.mu.Unlock() // Always trigger a notification. - efd.queue.Notify(waiter.EventIn) + efd.queue.Notify(waiter.ReadableEvents) return nil } @@ -254,11 +254,11 @@ func (efd *EventFileDescription) Readiness(mask waiter.EventMask) waiter.EventMa ready := waiter.EventMask(0) if efd.val > 0 { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } if efd.val < math.MaxUint64-1 { - ready |= waiter.EventOut + ready |= waiter.WritableEvents } return mask & ready diff --git a/pkg/sentry/fsimpl/fuse/connection.go b/pkg/sentry/fsimpl/fuse/connection.go index 34d25a61e..077bf9307 100644 --- a/pkg/sentry/fsimpl/fuse/connection.go +++ b/pkg/sentry/fsimpl/fuse/connection.go @@ -316,7 +316,7 @@ func (conn *connection) callFutureLocked(t *kernel.Task, r *Request) (*futureRes conn.fd.completions[r.id] = fut // Signal the readers that there is something to read. - conn.fd.waitQueue.Notify(waiter.EventIn) + conn.fd.waitQueue.Notify(waiter.ReadableEvents) return fut, nil } diff --git a/pkg/sentry/fsimpl/fuse/dev.go b/pkg/sentry/fsimpl/fuse/dev.go index 1eeb95216..5d2bae14e 100644 --- a/pkg/sentry/fsimpl/fuse/dev.go +++ b/pkg/sentry/fsimpl/fuse/dev.go @@ -368,10 +368,10 @@ func (fd *DeviceFD) readinessLocked(mask waiter.EventMask) waiter.EventMask { } // FD is always writable. - ready |= waiter.EventOut + ready |= waiter.WritableEvents if !fd.queue.Empty() { // Have reqs available, FD is readable. - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } return ready & mask diff --git a/pkg/sentry/fsimpl/fuse/fusefs.go b/pkg/sentry/fsimpl/fuse/fusefs.go index fef857afb..167c899e2 100644 --- a/pkg/sentry/fsimpl/fuse/fusefs.go +++ b/pkg/sentry/fsimpl/fuse/fusefs.go @@ -286,7 +286,7 @@ func (fs *filesystem) Release(ctx context.Context) { fs.umounted = true fs.conn.Abort(ctx) // Notify all the waiters on this fd. 
- fs.conn.fd.waitQueue.Notify(waiter.EventIn) + fs.conn.fd.waitQueue.Notify(waiter.ReadableEvents) fs.conn.fd.mu.Unlock() diff --git a/pkg/sentry/fsimpl/host/socket.go b/pkg/sentry/fsimpl/host/socket.go index 056f910aa..60e237ac7 100644 --- a/pkg/sentry/fsimpl/host/socket.go +++ b/pkg/sentry/fsimpl/host/socket.go @@ -192,7 +192,7 @@ func (c *ConnectedEndpoint) Writable() bool { c.mu.RLock() defer c.mu.RUnlock() - return fdnotifier.NonBlockingPoll(int32(c.fd), waiter.EventOut)&waiter.EventOut != 0 + return fdnotifier.NonBlockingPoll(int32(c.fd), waiter.WritableEvents)&waiter.WritableEvents != 0 } // Passcred implements transport.ConnectedEndpoint.Passcred. @@ -282,7 +282,7 @@ func (c *ConnectedEndpoint) Readable() bool { c.mu.RLock() defer c.mu.RUnlock() - return fdnotifier.NonBlockingPoll(int32(c.fd), waiter.EventIn)&waiter.EventIn != 0 + return fdnotifier.NonBlockingPoll(int32(c.fd), waiter.ReadableEvents)&waiter.ReadableEvents != 0 } // SendQueuedSize implements transport.Receiver.SendQueuedSize. diff --git a/pkg/sentry/fsimpl/signalfd/signalfd.go b/pkg/sentry/fsimpl/signalfd/signalfd.go index 246bd87bc..a7f5928b7 100644 --- a/pkg/sentry/fsimpl/signalfd/signalfd.go +++ b/pkg/sentry/fsimpl/signalfd/signalfd.go @@ -117,8 +117,8 @@ func (sfd *SignalFileDescription) Read(ctx context.Context, dst usermem.IOSequen func (sfd *SignalFileDescription) Readiness(mask waiter.EventMask) waiter.EventMask { sfd.mu.Lock() defer sfd.mu.Unlock() - if mask&waiter.EventIn != 0 && sfd.target.PendingSignals()&sfd.mask != 0 { - return waiter.EventIn // Pending signals. + if mask&waiter.ReadableEvents != 0 && sfd.target.PendingSignals()&sfd.mask != 0 { + return waiter.ReadableEvents // Pending signals. } return 0 } diff --git a/pkg/sentry/fsimpl/timerfd/timerfd.go b/pkg/sentry/fsimpl/timerfd/timerfd.go index 8853c8ad2..64d33c3a8 100644 --- a/pkg/sentry/fsimpl/timerfd/timerfd.go +++ b/pkg/sentry/fsimpl/timerfd/timerfd.go @@ -105,7 +105,7 @@ func (tfd *TimerFileDescription) SetTime(s ktime.Setting) (ktime.Time, ktime.Set func (tfd *TimerFileDescription) Readiness(mask waiter.EventMask) waiter.EventMask { var ready waiter.EventMask if atomic.LoadUint64(&tfd.val) != 0 { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } return ready } @@ -138,7 +138,7 @@ func (tfd *TimerFileDescription) Release(context.Context) { // Notify implements ktime.TimerListener.Notify. 
func (tfd *TimerFileDescription) Notify(exp uint64, setting ktime.Setting) (ktime.Setting, bool) { atomic.AddUint64(&tfd.val, exp) - tfd.events.Notify(waiter.EventIn) + tfd.events.Notify(waiter.ReadableEvents) return ktime.Setting{}, false } diff --git a/pkg/sentry/kernel/epoll/epoll.go b/pkg/sentry/kernel/epoll/epoll.go index ba73a7812..6006c46a9 100644 --- a/pkg/sentry/kernel/epoll/epoll.go +++ b/pkg/sentry/kernel/epoll/epoll.go @@ -213,8 +213,8 @@ func (e *EventPoll) eventsAvailable() bool { func (e *EventPoll) Readiness(mask waiter.EventMask) waiter.EventMask { ready := waiter.EventMask(0) - if (mask&waiter.EventIn) != 0 && e.eventsAvailable() { - ready |= waiter.EventIn + if (mask&waiter.ReadableEvents) != 0 && e.eventsAvailable() { + ready |= waiter.ReadableEvents } return ready @@ -290,7 +290,7 @@ func (p *pollEntry) Callback(*waiter.Entry, waiter.EventMask) { p.curList = &e.readyList e.listsMu.Unlock() - e.Notify(waiter.EventIn) + e.Notify(waiter.ReadableEvents) return } diff --git a/pkg/sentry/kernel/epoll/epoll_state.go b/pkg/sentry/kernel/epoll/epoll_state.go index 7c61e0258..e08d6287f 100644 --- a/pkg/sentry/kernel/epoll/epoll_state.go +++ b/pkg/sentry/kernel/epoll/epoll_state.go @@ -45,7 +45,7 @@ func (e *EventPoll) afterLoad() { e.waitingList.Remove(entry) e.readyList.PushBack(entry) entry.curList = &e.readyList - e.Notify(waiter.EventIn) + e.Notify(waiter.ReadableEvents) } } } diff --git a/pkg/sentry/kernel/eventfd/eventfd.go b/pkg/sentry/kernel/eventfd/eventfd.go index 64f1cc631..2aca02fd5 100644 --- a/pkg/sentry/kernel/eventfd/eventfd.go +++ b/pkg/sentry/kernel/eventfd/eventfd.go @@ -183,7 +183,7 @@ func (e *EventOperations) read(ctx context.Context, dst usermem.IOSequence) erro // Notify writers. We do this even if we were already writable because // it is possible that a writer is waiting to write the maximum value // to the event. - e.wq.Notify(waiter.EventOut) + e.wq.Notify(waiter.WritableEvents) var buf [8]byte usermem.ByteOrder.PutUint64(buf[:], val) @@ -236,7 +236,7 @@ func (e *EventOperations) Signal(val uint64) error { e.mu.Unlock() // Always trigger a notification. - e.wq.Notify(waiter.EventIn) + e.wq.Notify(waiter.ReadableEvents) return nil } @@ -251,11 +251,11 @@ func (e *EventOperations) Readiness(mask waiter.EventMask) waiter.EventMask { ready := waiter.EventMask(0) if e.val > 0 { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } if e.val < math.MaxUint64-1 { - ready |= waiter.EventOut + ready |= waiter.WritableEvents } e.mu.Unlock() diff --git a/pkg/sentry/kernel/fasync/fasync.go b/pkg/sentry/kernel/fasync/fasync.go index b66d61c6f..dbbbaeeb0 100644 --- a/pkg/sentry/kernel/fasync/fasync.go +++ b/pkg/sentry/kernel/fasync/fasync.go @@ -162,7 +162,7 @@ func (a *FileAsync) Register(w waiter.Waitable) { a.registered = true a.mu.Unlock() - w.EventRegister(&a.e, waiter.EventIn|waiter.EventOut|waiter.EventErr|waiter.EventHUp) + w.EventRegister(&a.e, waiter.ReadableEvents|waiter.WritableEvents|waiter.EventErr|waiter.EventHUp) } // Unregister stops monitoring a file. diff --git a/pkg/sentry/kernel/pipe/pipe.go b/pkg/sentry/kernel/pipe/pipe.go index 68a55a186..d004f2357 100644 --- a/pkg/sentry/kernel/pipe/pipe.go +++ b/pkg/sentry/kernel/pipe/pipe.go @@ -183,7 +183,7 @@ func (p *Pipe) Open(ctx context.Context, d *fs.Dirent, flags fs.FileFlags) *fs.F // // peekLocked does not mutate the pipe; if the read consumes bytes from the // pipe, then the caller is responsible for calling p.consumeLocked() and -// p.Notify(waiter.EventOut). 
(The latter must be called with p.mu unlocked.) +// p.Notify(waiter.WritableEvents). (The latter must be called with p.mu unlocked.) // // Preconditions: // * p.mu must be locked. @@ -237,7 +237,7 @@ func (p *Pipe) consumeLocked(n int64) { // Unlike peekLocked, writeLocked assumes that f returns the number of bytes // written to the pipe, and increases the number of bytes stored in the pipe // accordingly. Callers are still responsible for calling -// p.Notify(waiter.EventIn) with p.mu unlocked. +// p.Notify(waiter.ReadableEvents) with p.mu unlocked. // // Preconditions: // * p.mu must be locked. @@ -357,7 +357,7 @@ func (p *Pipe) HasWriters() bool { func (p *Pipe) rReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) if p.HasReaders() && p.size != 0 { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } if !p.HasWriters() && p.hadWriter { // POLLHUP must be suppressed until the pipe has had at least one writer @@ -383,7 +383,7 @@ func (p *Pipe) rReadiness() waiter.EventMask { func (p *Pipe) wReadinessLocked() waiter.EventMask { ready := waiter.EventMask(0) if p.HasWriters() && p.size < p.max { - ready |= waiter.EventOut + ready |= waiter.WritableEvents } if !p.HasReaders() { ready |= waiter.EventErr diff --git a/pkg/sentry/kernel/pipe/pipe_util.go b/pkg/sentry/kernel/pipe/pipe_util.go index 76ea389ca..2d89b9ccd 100644 --- a/pkg/sentry/kernel/pipe/pipe_util.go +++ b/pkg/sentry/kernel/pipe/pipe_util.go @@ -39,14 +39,14 @@ func (p *Pipe) Release(context.Context) { p.wClose() // Wake up readers and writers. - p.Notify(waiter.EventIn | waiter.EventOut) + p.Notify(waiter.ReadableEvents | waiter.WritableEvents) } // Read reads from the Pipe into dst. func (p *Pipe) Read(ctx context.Context, dst usermem.IOSequence) (int64, error) { n, err := dst.CopyOutFrom(ctx, p) if n > 0 { - p.Notify(waiter.EventOut) + p.Notify(waiter.WritableEvents) } return n, err } @@ -75,7 +75,7 @@ func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool) return safemem.FromIOWriter{w}.WriteFromBlocks(srcs) }, !dup /* removeFromSrc */) if n > 0 && !dup { - p.Notify(waiter.EventOut) + p.Notify(waiter.WritableEvents) } return n, err } @@ -84,7 +84,7 @@ func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool) func (p *Pipe) Write(ctx context.Context, src usermem.IOSequence) (int64, error) { n, err := src.CopyInTo(ctx, p) if n > 0 { - p.Notify(waiter.EventIn) + p.Notify(waiter.ReadableEvents) } return n, err } @@ -109,7 +109,7 @@ func (p *Pipe) ReadFrom(ctx context.Context, r io.Reader, count int64) (int64, e return safemem.FromIOReader{r}.ReadToBlocks(dsts) }) if n > 0 { - p.Notify(waiter.EventIn) + p.Notify(waiter.ReadableEvents) } return n, err } diff --git a/pkg/sentry/kernel/pipe/vfs.go b/pkg/sentry/kernel/pipe/vfs.go index 09c0ccaf2..e524afad5 100644 --- a/pkg/sentry/kernel/pipe/vfs.go +++ b/pkg/sentry/kernel/pipe/vfs.go @@ -194,11 +194,11 @@ func (fd *VFSPipeFD) Release(context.Context) { var event waiter.EventMask if fd.vfsfd.IsReadable() { fd.pipe.rClose() - event |= waiter.EventOut + event |= waiter.WritableEvents } if fd.vfsfd.IsWritable() { fd.pipe.wClose() - event |= waiter.EventIn | waiter.EventHUp + event |= waiter.ReadableEvents | waiter.EventHUp } if event == 0 { panic("invalid pipe flags: must be readable, writable, or both") @@ -293,7 +293,7 @@ func (fd *VFSPipeFD) SpliceToNonPipe(ctx context.Context, out *vfs.FileDescripti fd.pipe.mu.Unlock() if n > 0 { - fd.pipe.Notify(waiter.EventOut) + fd.pipe.Notify(waiter.WritableEvents) } return n, err 
} @@ -318,14 +318,14 @@ func (fd *VFSPipeFD) SpliceFromNonPipe(ctx context.Context, in *vfs.FileDescript fd.pipe.mu.Unlock() if n > 0 { - fd.pipe.Notify(waiter.EventIn) + fd.pipe.Notify(waiter.ReadableEvents) } return n, err } // CopyIn implements usermem.IO.CopyIn. Note that it is the caller's // responsibility to call fd.pipe.consumeLocked() and -// fd.pipe.Notify(waiter.EventOut) after the read is completed. +// fd.pipe.Notify(waiter.WritableEvents) after the read is completed. // // Preconditions: fd.pipe.mu must be locked. func (fd *VFSPipeFD) CopyIn(ctx context.Context, addr usermem.Addr, dst []byte, opts usermem.IOOpts) (int, error) { @@ -336,8 +336,8 @@ func (fd *VFSPipeFD) CopyIn(ctx context.Context, addr usermem.Addr, dst []byte, } // CopyOut implements usermem.IO.CopyOut. Note that it is the caller's -// responsibility to call fd.pipe.Notify(waiter.EventIn) after the -// write is completed. +// responsibility to call fd.pipe.Notify(waiter.ReadableEvents) after the write +// is completed. // // Preconditions: fd.pipe.mu must be locked. func (fd *VFSPipeFD) CopyOut(ctx context.Context, addr usermem.Addr, src []byte, opts usermem.IOOpts) (int, error) { @@ -359,7 +359,7 @@ func (fd *VFSPipeFD) ZeroOut(ctx context.Context, addr usermem.Addr, toZero int6 // CopyInTo implements usermem.IO.CopyInTo. Note that it is the caller's // responsibility to call fd.pipe.consumeLocked() and -// fd.pipe.Notify(waiter.EventOut) after the read is completed. +// fd.pipe.Notify(waiter.WritableEvents) after the read is completed. // // Preconditions: fd.pipe.mu must be locked. func (fd *VFSPipeFD) CopyInTo(ctx context.Context, ars usermem.AddrRangeSeq, dst safemem.Writer, opts usermem.IOOpts) (int64, error) { @@ -369,8 +369,8 @@ func (fd *VFSPipeFD) CopyInTo(ctx context.Context, ars usermem.AddrRangeSeq, dst } // CopyOutFrom implements usermem.IO.CopyOutFrom. Note that it is the caller's -// responsibility to call fd.pipe.Notify(waiter.EventIn) after the write is -// completed. +// responsibility to call fd.pipe.Notify(waiter.ReadableEvents) after the write +// is completed. // // Preconditions: fd.pipe.mu must be locked. func (fd *VFSPipeFD) CopyOutFrom(ctx context.Context, ars usermem.AddrRangeSeq, src safemem.Reader, opts usermem.IOOpts) (int64, error) { @@ -431,9 +431,9 @@ func spliceOrTee(ctx context.Context, dst, src *VFSPipeFD, count int64, removeFr src.pipe.mu.Unlock() if n > 0 { - dst.pipe.Notify(waiter.EventIn) + dst.pipe.Notify(waiter.ReadableEvents) if removeFromSrc { - src.pipe.Notify(waiter.EventOut) + src.pipe.Notify(waiter.WritableEvents) } } return n, err diff --git a/pkg/sentry/kernel/signalfd/signalfd.go b/pkg/sentry/kernel/signalfd/signalfd.go index 884966120..f58ec4194 100644 --- a/pkg/sentry/kernel/signalfd/signalfd.go +++ b/pkg/sentry/kernel/signalfd/signalfd.go @@ -122,8 +122,8 @@ func (s *SignalOperations) Read(ctx context.Context, _ *fs.File, dst usermem.IOS // Readiness implements waiter.Waitable.Readiness. func (s *SignalOperations) Readiness(mask waiter.EventMask) waiter.EventMask { - if mask&waiter.EventIn != 0 && s.target.PendingSignals()&s.Mask() != 0 { - return waiter.EventIn // Pending signals. + if mask&waiter.ReadableEvents != 0 && s.target.PendingSignals()&s.Mask() != 0 { + return waiter.ReadableEvents // Pending signals. 
} return 0 } diff --git a/pkg/sentry/socket/hostinet/socket.go b/pkg/sentry/socket/hostinet/socket.go index c711d0684..2d9dbbdba 100644 --- a/pkg/sentry/socket/hostinet/socket.go +++ b/pkg/sentry/socket/hostinet/socket.go @@ -200,9 +200,10 @@ func (s *socketOpsCommon) Connect(t *kernel.Task, sockaddr []byte, blocking bool // (SO_ERROR is zero) or unsuccessfully (SO_ERROR is one of the usual error // codes listed here, explaining the reason for the failure)." - connect(2) e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventOut) + writableMask := waiter.WritableEvents + s.EventRegister(&e, writableMask) defer s.EventUnregister(&e) - if s.Readiness(waiter.EventOut)&waiter.EventOut == 0 { + if s.Readiness(writableMask)&writableMask == 0 { if err := t.Block(ch); err != nil { return syserr.FromError(err) } @@ -244,7 +245,7 @@ func (s *socketOpsCommon) Accept(t *kernel.Task, peerRequested bool, flags int, } else { var e waiter.Entry e, ch = waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) } fd, syscallErr = accept4(s.fd, peerAddrPtr, peerAddrlenPtr, unix.SOCK_NONBLOCK|unix.SOCK_CLOEXEC) @@ -496,7 +497,7 @@ func (s *socketOpsCommon) RecvMsg(t *kernel.Task, dst usermem.IOSequence, flags } else { var e waiter.Entry e, ch = waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) } n, err = copyToDst() @@ -652,7 +653,7 @@ func (s *socketOpsCommon) SendMsg(t *kernel.Task, src usermem.IOSequence, to []b } else { var e waiter.Entry e, ch = waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventOut) + s.EventRegister(&e, waiter.WritableEvents) defer s.EventUnregister(&e) } n, err = src.CopyInTo(t, sendmsgFromBlocks) diff --git a/pkg/sentry/socket/netlink/socket.go b/pkg/sentry/socket/netlink/socket.go index 057f4d294..d5ffc75ce 100644 --- a/pkg/sentry/socket/netlink/socket.go +++ b/pkg/sentry/socket/netlink/socket.go @@ -176,10 +176,10 @@ func (s *socketOpsCommon) Readiness(mask waiter.EventMask) waiter.EventMask { // ep holds messages to be read and thus handles EventIn readiness. ready := s.ep.Readiness(mask) - if mask&waiter.EventOut == waiter.EventOut { + if mask&waiter.WritableEvents != 0 { // sendMsg handles messages synchronously and is thus always // ready for writing. - ready |= waiter.EventOut + ready |= waiter.WritableEvents } return ready @@ -544,7 +544,7 @@ func (s *socketOpsCommon) RecvMsg(t *kernel.Task, dst usermem.IOSequence, flags // We'll have to block. Register for notification and keep trying to // receive all the data. e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) for { diff --git a/pkg/sentry/socket/netstack/netstack.go b/pkg/sentry/socket/netstack/netstack.go index 9efb195f0..64e70ab9d 100644 --- a/pkg/sentry/socket/netstack/netstack.go +++ b/pkg/sentry/socket/netstack/netstack.go @@ -567,7 +567,7 @@ func (s *socketOpsCommon) Connect(t *kernel.Task, sockaddr []byte, blocking bool // Register for notification when the endpoint becomes writable, then // initiate the connection. 
e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventOut) + s.EventRegister(&e, waiter.WritableEvents) defer s.EventUnregister(&e) switch err := s.Endpoint.Connect(addr); err.(type) { @@ -663,7 +663,7 @@ func (s *socketOpsCommon) Listen(t *kernel.Task, backlog int) *syserr.Error { func (s *socketOpsCommon) blockingAccept(t *kernel.Task, peerAddr *tcpip.FullAddress) (tcpip.Endpoint, *waiter.Queue, *syserr.Error) { // Register for notifications. e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) // Try to accept the connection again; if it fails, then wait until we @@ -2753,7 +2753,7 @@ func (s *socketOpsCommon) RecvMsg(t *kernel.Task, dst usermem.IOSequence, flags // We'll have to block. Register for notifications and keep trying to // send all the data. e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) for { @@ -2840,7 +2840,7 @@ func (s *socketOpsCommon) SendMsg(t *kernel.Task, src usermem.IOSequence, to []b // We'll have to block. Register for notification and keep trying to // send all the data. entry, ch = waiter.NewChannelEntry(nil) - s.EventRegister(&entry, waiter.EventOut) + s.EventRegister(&entry, waiter.WritableEvents) defer s.EventUnregister(&entry) } else { // Don't wait immediately after registration in case more data diff --git a/pkg/sentry/socket/unix/transport/connectioned.go b/pkg/sentry/socket/unix/transport/connectioned.go index b1967fc36..159b8f90f 100644 --- a/pkg/sentry/socket/unix/transport/connectioned.go +++ b/pkg/sentry/socket/unix/transport/connectioned.go @@ -338,8 +338,8 @@ func (e *connectionedEndpoint) BidirectionalConnect(ctx context.Context, ce Conn ce.Unlock() // Notify on both ends. 
- e.Notify(waiter.EventIn) - ce.WaiterQueue().Notify(waiter.EventOut) + e.Notify(waiter.ReadableEvents) + ce.WaiterQueue().Notify(waiter.WritableEvents) return nil default: @@ -480,15 +480,15 @@ func (e *connectionedEndpoint) Readiness(mask waiter.EventMask) waiter.EventMask ready := waiter.EventMask(0) switch { case e.Connected(): - if mask&waiter.EventIn != 0 && e.receiver.Readable() { - ready |= waiter.EventIn + if mask&waiter.ReadableEvents != 0 && e.receiver.Readable() { + ready |= waiter.ReadableEvents } - if mask&waiter.EventOut != 0 && e.connected.Writable() { - ready |= waiter.EventOut + if mask&waiter.WritableEvents != 0 && e.connected.Writable() { + ready |= waiter.WritableEvents } case e.Listening(): - if mask&waiter.EventIn != 0 && len(e.acceptedChan) > 0 { - ready |= waiter.EventIn + if mask&waiter.ReadableEvents != 0 && len(e.acceptedChan) > 0 { + ready |= waiter.ReadableEvents } } diff --git a/pkg/sentry/socket/unix/transport/connectionless.go b/pkg/sentry/socket/unix/transport/connectionless.go index 0be78480c..d0df28b59 100644 --- a/pkg/sentry/socket/unix/transport/connectionless.go +++ b/pkg/sentry/socket/unix/transport/connectionless.go @@ -191,13 +191,13 @@ func (e *connectionlessEndpoint) Readiness(mask waiter.EventMask) waiter.EventMa defer e.Unlock() ready := waiter.EventMask(0) - if mask&waiter.EventIn != 0 && e.receiver.Readable() { - ready |= waiter.EventIn + if mask&waiter.ReadableEvents != 0 && e.receiver.Readable() { + ready |= waiter.ReadableEvents } if e.Connected() { - if mask&waiter.EventOut != 0 && e.connected.Writable() { - ready |= waiter.EventOut + if mask&waiter.WritableEvents != 0 && e.connected.Writable() { + ready |= waiter.WritableEvents } } diff --git a/pkg/sentry/socket/unix/transport/queue.go b/pkg/sentry/socket/unix/transport/queue.go index 698a9a82c..e4de44498 100644 --- a/pkg/sentry/socket/unix/transport/queue.go +++ b/pkg/sentry/socket/unix/transport/queue.go @@ -44,8 +44,8 @@ type queue struct { // will become unreadable when no more data is pending. // // Both the read and write queues must be notified after closing: -// q.ReaderQueue.Notify(waiter.EventIn) -// q.WriterQueue.Notify(waiter.EventOut) +// q.ReaderQueue.Notify(waiter.ReadableEvents) +// q.WriterQueue.Notify(waiter.WritableEvents) func (q *queue) Close() { q.mu.Lock() q.closed = true @@ -55,8 +55,8 @@ func (q *queue) Close() { // Reset empties the queue and Releases all of the Entries. // // Both the read and write queues must be notified after resetting: -// q.ReaderQueue.Notify(waiter.EventIn) -// q.WriterQueue.Notify(waiter.EventOut) +// q.ReaderQueue.Notify(waiter.ReadableEvents) +// q.WriterQueue.Notify(waiter.WritableEvents) func (q *queue) Reset(ctx context.Context) { q.mu.Lock() for cur := q.dataList.Front(); cur != nil; cur = cur.Next() { @@ -112,7 +112,7 @@ func (q *queue) IsWritable() bool { // err indicates why. // // If notify is true, ReaderQueue.Notify must be called: -// q.ReaderQueue.Notify(waiter.EventIn) +// q.ReaderQueue.Notify(waiter.ReadableEvents) func (q *queue) Enqueue(ctx context.Context, data [][]byte, c ControlMessages, from tcpip.FullAddress, discardEmpty bool, truncate bool) (l int64, notify bool, err *syserr.Error) { q.mu.Lock() @@ -179,7 +179,7 @@ func (q *queue) Enqueue(ctx context.Context, data [][]byte, c ControlMessages, f // Dequeue removes the first entry in the data queue, if one exists. 
// // If notify is true, WriterQueue.Notify must be called: -// q.WriterQueue.Notify(waiter.EventOut) +// q.WriterQueue.Notify(waiter.WritableEvents) func (q *queue) Dequeue() (e *message, notify bool, err *syserr.Error) { q.mu.Lock() diff --git a/pkg/sentry/socket/unix/transport/unix.go b/pkg/sentry/socket/unix/transport/unix.go index 089a0a647..0c5f5ab42 100644 --- a/pkg/sentry/socket/unix/transport/unix.go +++ b/pkg/sentry/socket/unix/transport/unix.go @@ -376,13 +376,13 @@ func (q *queueReceiver) Recv(ctx context.Context, data [][]byte, creds bool, num // RecvNotify implements Receiver.RecvNotify. func (q *queueReceiver) RecvNotify() { - q.readQueue.WriterQueue.Notify(waiter.EventOut) + q.readQueue.WriterQueue.Notify(waiter.WritableEvents) } // CloseNotify implements Receiver.CloseNotify. func (q *queueReceiver) CloseNotify() { - q.readQueue.ReaderQueue.Notify(waiter.EventIn) - q.readQueue.WriterQueue.Notify(waiter.EventOut) + q.readQueue.ReaderQueue.Notify(waiter.ReadableEvents) + q.readQueue.WriterQueue.Notify(waiter.WritableEvents) } // CloseRecv implements Receiver.CloseRecv. @@ -692,13 +692,13 @@ func (e *connectedEndpoint) Send(ctx context.Context, data [][]byte, c ControlMe // SendNotify implements ConnectedEndpoint.SendNotify. func (e *connectedEndpoint) SendNotify() { - e.writeQueue.ReaderQueue.Notify(waiter.EventIn) + e.writeQueue.ReaderQueue.Notify(waiter.ReadableEvents) } // CloseNotify implements ConnectedEndpoint.CloseNotify. func (e *connectedEndpoint) CloseNotify() { - e.writeQueue.ReaderQueue.Notify(waiter.EventIn) - e.writeQueue.WriterQueue.Notify(waiter.EventOut) + e.writeQueue.ReaderQueue.Notify(waiter.ReadableEvents) + e.writeQueue.WriterQueue.Notify(waiter.WritableEvents) } // CloseSend implements ConnectedEndpoint.CloseSend. diff --git a/pkg/sentry/socket/unix/unix.go b/pkg/sentry/socket/unix/unix.go index 63165e1d4..b22f7973a 100644 --- a/pkg/sentry/socket/unix/unix.go +++ b/pkg/sentry/socket/unix/unix.go @@ -207,7 +207,7 @@ func (s *socketOpsCommon) Listen(t *kernel.Task, backlog int) *syserr.Error { func (s *SocketOperations) blockingAccept(t *kernel.Task, peerAddr *tcpip.FullAddress) (transport.Endpoint, *syserr.Error) { // Register for notifications. e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) // Try to accept the connection; if it fails, then wait until we get a @@ -502,7 +502,7 @@ func (s *socketOpsCommon) SendMsg(t *kernel.Task, src usermem.IOSequence, to []b // We'll have to block. Register for notification and keep trying to // send all the data. e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventOut) + s.EventRegister(&e, waiter.WritableEvents) defer s.EventUnregister(&e) total := n @@ -677,7 +677,7 @@ func (s *socketOpsCommon) RecvMsg(t *kernel.Task, dst usermem.IOSequence, flags // We'll have to block. Register for notification and keep trying to // send all the data. e, ch := waiter.NewChannelEntry(nil) - s.EventRegister(&e, waiter.EventIn) + s.EventRegister(&e, waiter.ReadableEvents) defer s.EventUnregister(&e) for { diff --git a/pkg/sentry/socket/unix/unix_vfs2.go b/pkg/sentry/socket/unix/unix_vfs2.go index 9c037cbae..7890d1048 100644 --- a/pkg/sentry/socket/unix/unix_vfs2.go +++ b/pkg/sentry/socket/unix/unix_vfs2.go @@ -121,7 +121,7 @@ func (s *SocketVFS2) GetSockOpt(t *kernel.Task, level, name int, outPtr usermem. 
func (s *SocketVFS2) blockingAccept(t *kernel.Task, peerAddr *tcpip.FullAddress) (transport.Endpoint, *syserr.Error) { // Register for notifications. e, ch := waiter.NewChannelEntry(nil) - s.socketOpsCommon.EventRegister(&e, waiter.EventIn) + s.socketOpsCommon.EventRegister(&e, waiter.ReadableEvents) defer s.socketOpsCommon.EventUnregister(&e) // Try to accept the connection; if it fails, then wait until we get a diff --git a/pkg/sentry/syscalls/epoll.go b/pkg/sentry/syscalls/epoll.go index d23a0068a..e115683f8 100644 --- a/pkg/sentry/syscalls/epoll.go +++ b/pkg/sentry/syscalls/epoll.go @@ -151,7 +151,7 @@ func WaitEpoll(t *kernel.Task, fd int32, max int, timeout int) ([]linux.EpollEve } w, ch := waiter.NewChannelEntry(nil) - e.EventRegister(&w, waiter.EventIn) + e.EventRegister(&w, waiter.ReadableEvents) defer e.EventUnregister(&w) // Try to read the events again until we succeed, timeout or get diff --git a/pkg/sentry/syscalls/linux/sys_read.go b/pkg/sentry/syscalls/linux/sys_read.go index f655d3db1..13e5e3a51 100644 --- a/pkg/sentry/syscalls/linux/sys_read.go +++ b/pkg/sentry/syscalls/linux/sys_read.go @@ -32,7 +32,7 @@ import ( const ( // EventMaskRead contains events that can be triggered on reads. - EventMaskRead = waiter.EventIn | waiter.EventHUp | waiter.EventErr + EventMaskRead = waiter.ReadableEvents | waiter.EventHUp | waiter.EventErr ) // Read implements linux syscall read(2). Note that we try to get a buffer that diff --git a/pkg/sentry/syscalls/linux/vfs2/epoll.go b/pkg/sentry/syscalls/linux/vfs2/epoll.go index d0cbb77eb..b980aa43e 100644 --- a/pkg/sentry/syscalls/linux/vfs2/epoll.go +++ b/pkg/sentry/syscalls/linux/vfs2/epoll.go @@ -169,7 +169,7 @@ func EpollWait(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Sys if ch == nil { var w waiter.Entry w, ch = waiter.NewChannelEntry(nil) - epfile.EventRegister(&w, waiter.EventIn) + epfile.EventRegister(&w, waiter.ReadableEvents) defer epfile.EventUnregister(&w) } else { // Set up the timer if a timeout was specified. diff --git a/pkg/sentry/syscalls/linux/vfs2/read_write.go b/pkg/sentry/syscalls/linux/vfs2/read_write.go index c7417840f..b863d7b84 100644 --- a/pkg/sentry/syscalls/linux/vfs2/read_write.go +++ b/pkg/sentry/syscalls/linux/vfs2/read_write.go @@ -30,8 +30,8 @@ import ( ) const ( - eventMaskRead = waiter.EventIn | waiter.EventHUp | waiter.EventErr - eventMaskWrite = waiter.EventOut | waiter.EventHUp | waiter.EventErr + eventMaskRead = waiter.EventRdNorm | waiter.EventIn | waiter.EventHUp | waiter.EventErr + eventMaskWrite = waiter.EventWrNorm | waiter.EventOut | waiter.EventHUp | waiter.EventErr ) // Read implements Linux syscall read(2). diff --git a/pkg/sentry/vfs/epoll.go b/pkg/sentry/vfs/epoll.go index 072655fe8..ae004b371 100644 --- a/pkg/sentry/vfs/epoll.go +++ b/pkg/sentry/vfs/epoll.go @@ -131,7 +131,7 @@ func (ep *EpollInstance) Release(ctx context.Context) { // Readiness implements waiter.Waitable.Readiness. 
func (ep *EpollInstance) Readiness(mask waiter.EventMask) waiter.EventMask { - if mask&waiter.EventIn == 0 { + if mask&waiter.ReadableEvents == 0 { return 0 } ep.mu.Lock() @@ -139,7 +139,7 @@ func (ep *EpollInstance) Readiness(mask waiter.EventMask) waiter.EventMask { wmask := waiter.EventMaskFromLinux(epi.mask) if epi.key.file.Readiness(wmask)&wmask != 0 { ep.mu.Unlock() - return waiter.EventIn + return waiter.ReadableEvents } } ep.mu.Unlock() @@ -321,7 +321,7 @@ func (epi *epollInterest) Callback(*waiter.Entry, waiter.EventMask) { } epi.epoll.mu.Unlock() if newReady { - epi.epoll.q.Notify(waiter.EventIn) + epi.epoll.q.Notify(waiter.ReadableEvents) } } diff --git a/pkg/sentry/vfs/file_description_impl_util.go b/pkg/sentry/vfs/file_description_impl_util.go index d2050b3f7..1556b41a3 100644 --- a/pkg/sentry/vfs/file_description_impl_util.go +++ b/pkg/sentry/vfs/file_description_impl_util.go @@ -72,7 +72,7 @@ func (FileDescriptionDefaultImpl) Allocate(ctx context.Context, mode, offset, le // file_operations::poll == NULL in Linux. func (FileDescriptionDefaultImpl) Readiness(mask waiter.EventMask) waiter.EventMask { // include/linux/poll.h:vfs_poll() => DEFAULT_POLLMASK - return waiter.EventIn | waiter.EventOut + return waiter.ReadableEvents | waiter.WritableEvents } // EventRegister implements waiter.Waitable.EventRegister analogously to diff --git a/pkg/sentry/vfs/inotify.go b/pkg/sentry/vfs/inotify.go index a48ac1cd6..32fa01578 100644 --- a/pkg/sentry/vfs/inotify.go +++ b/pkg/sentry/vfs/inotify.go @@ -175,7 +175,7 @@ func (i *Inotify) Readiness(mask waiter.EventMask) waiter.EventMask { defer i.evMu.Unlock() if !i.events.Empty() { - ready |= waiter.EventIn + ready |= waiter.ReadableEvents } return mask & ready @@ -286,7 +286,7 @@ func (i *Inotify) queueEvent(ev *Event) { // can do. i.evMu.Unlock() - i.queue.Notify(waiter.EventIn) + i.queue.Notify(waiter.ReadableEvents) } // newWatchLocked creates and adds a new watch to target. diff --git a/pkg/tcpip/adapters/gonet/gonet.go b/pkg/tcpip/adapters/gonet/gonet.go index c188aaa18..010e2e833 100644 --- a/pkg/tcpip/adapters/gonet/gonet.go +++ b/pkg/tcpip/adapters/gonet/gonet.go @@ -251,7 +251,7 @@ func (l *TCPListener) Accept() (net.Conn, error) { if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - l.wq.EventRegister(&waitEntry, waiter.EventIn) + l.wq.EventRegister(&waitEntry, waiter.ReadableEvents) defer l.wq.EventUnregister(&waitEntry) for { @@ -301,7 +301,7 @@ func commonRead(b []byte, ep tcpip.Endpoint, wq *waiter.Queue, deadline <-chan s if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventIn) + wq.EventRegister(&waitEntry, waiter.ReadableEvents) defer wq.EventUnregister(&waitEntry) for { res, err = ep.Read(&w, opts) @@ -382,7 +382,7 @@ func (c *TCPConn) Write(b []byte) (int, error) { if ch == nil { entry, ch = waiter.NewChannelEntry(nil) - c.wq.EventRegister(&entry, waiter.EventOut) + c.wq.EventRegister(&entry, waiter.WritableEvents) defer c.wq.EventUnregister(&entry) } else { // Don't wait immediately after registration in case more data @@ -485,7 +485,7 @@ func DialContextTCP(ctx context.Context, s *stack.Stack, addr tcpip.FullAddress, // // We do this unconditionally as Connect will always return an error. 
waitEntry, notifyCh := waiter.NewChannelEntry(nil) - wq.EventRegister(&waitEntry, waiter.EventOut) + wq.EventRegister(&waitEntry, waiter.WritableEvents) defer wq.EventUnregister(&waitEntry) select { @@ -652,7 +652,7 @@ func (c *UDPConn) WriteTo(b []byte, addr net.Addr) (int, error) { if _, ok := err.(*tcpip.ErrWouldBlock); ok { // Create wait queue entry that notifies a channel. waitEntry, notifyCh := waiter.NewChannelEntry(nil) - c.wq.EventRegister(&waitEntry, waiter.EventOut) + c.wq.EventRegister(&waitEntry, waiter.WritableEvents) defer c.wq.EventUnregister(&waitEntry) for { select { diff --git a/pkg/tcpip/link/fdbased/endpoint.go b/pkg/tcpip/link/fdbased/endpoint.go index 2bb1be5d6..6be945116 100644 --- a/pkg/tcpip/link/fdbased/endpoint.go +++ b/pkg/tcpip/link/fdbased/endpoint.go @@ -41,6 +41,8 @@ package fdbased import ( "fmt" + "math" + "sync/atomic" "golang.org/x/sys/unix" "gvisor.dev/gvisor/pkg/binary" @@ -188,7 +190,9 @@ type Options struct { // set of FD's that point to the same NIC. Trying to set the PACKET_FANOUT // option for an FD with a fanoutID already in use by another FD for a different // NIC will return an EINVAL. -var fanoutID = 1 +// +// Must be accessed using atomic operations. +var fanoutID int32 = 0 // New creates a new fd-based endpoint. // @@ -233,6 +237,10 @@ func New(opts *Options) (stack.LinkEndpoint, error) { packetDispatchMode: opts.PacketDispatchMode, } + // Increment fanoutID to ensure that we don't re-use the same fanoutID for + // the next endpoint. + fid := atomic.AddInt32(&fanoutID, 1) + // Create per channel dispatchers. for i := 0; i < len(e.fds); i++ { fd := e.fds[i] @@ -254,21 +262,17 @@ func New(opts *Options) (stack.LinkEndpoint, error) { e.gsoMaxSize = opts.GSOMaxSize } } - inboundDispatcher, err := createInboundDispatcher(e, fd, isSocket) + inboundDispatcher, err := createInboundDispatcher(e, fd, isSocket, fid) if err != nil { return nil, fmt.Errorf("createInboundDispatcher(...) = %v", err) } e.inboundDispatchers = append(e.inboundDispatchers, inboundDispatcher) } - // Increment fanoutID to ensure that we don't re-use the same fanoutID for - // the next endpoint. - fanoutID++ - return e, nil } -func createInboundDispatcher(e *endpoint, fd int, isSocket bool) (linkDispatcher, error) { +func createInboundDispatcher(e *endpoint, fd int, isSocket bool, fID int32) (linkDispatcher, error) { // By default use the readv() dispatcher as it works with all kinds of // FDs (tap/tun/unix domain sockets and af_packet). inboundDispatcher, err := newReadVDispatcher(fd, e) @@ -283,13 +287,32 @@ func createInboundDispatcher(e *endpoint, fd int, isSocket bool) (linkDispatcher } switch sa.(type) { case *unix.SockaddrLinklayer: + // See: PACKET_FANOUT_MAX in net/packet/internal.h + const packetFanoutMax = 1 << 16 + if fID > packetFanoutMax { + return nil, fmt.Errorf("host fanoutID limit exceeded, fanoutID must be <= %d", math.MaxUint16) + } // Enable PACKET_FANOUT mode if the underlying socket is of type // AF_PACKET. We do not enable PACKET_FANOUT_FLAG_DEFRAG as that will // prevent gvisor from receiving fragmented packets and the host does the // reassembly on our behalf before delivering the fragments. This makes it // hard to test fragmentation reassembly code in Netstack. + // + // See: include/uapi/linux/if_packet.h (struct fanout_args). + // + // NOTE: We are using SetSockOptInt here even though the underlying + // option is actually a struct. 
The code follows the example in the + // kernel documentation as described at the link below: + // + // See: https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt + // + // This works out because the actual implementation for the option zero + // initializes the structure and will initialize the max_members field + // to a proper value if zero. + // + // See: https://github.com/torvalds/linux/blob/7acac4b3196caee5e21fb5ea53f8bc124e6a16fc/net/packet/af_packet.c#L3881 const fanoutType = unix.PACKET_FANOUT_HASH - fanoutArg := fanoutID | fanoutType<<16 + fanoutArg := int(fID) | fanoutType<<16 if err := unix.SetsockoptInt(fd, unix.SOL_PACKET, unix.PACKET_FANOUT, fanoutArg); err != nil { return nil, fmt.Errorf("failed to enable PACKET_FANOUT option: %v", err) } diff --git a/pkg/tcpip/link/tun/device.go b/pkg/tcpip/link/tun/device.go index 80fb343c5..36af2a029 100644 --- a/pkg/tcpip/link/tun/device.go +++ b/pkg/tcpip/link/tun/device.go @@ -309,20 +309,20 @@ func (d *Device) Flags() Flags { // Readiness implements watier.Waitable.Readiness. func (d *Device) Readiness(mask waiter.EventMask) waiter.EventMask { - if mask&waiter.EventIn != 0 { + if mask&waiter.ReadableEvents != 0 { d.mu.RLock() endpoint := d.endpoint d.mu.RUnlock() if endpoint != nil && endpoint.NumQueued() == 0 { - mask &= ^waiter.EventIn + mask &= ^waiter.ReadableEvents } } - return mask & (waiter.EventIn | waiter.EventOut) + return mask & (waiter.ReadableEvents | waiter.WritableEvents) } // WriteNotify implements channel.Notification.WriteNotify. func (d *Device) WriteNotify() { - d.Notify(waiter.EventIn) + d.Notify(waiter.ReadableEvents) } // tunEndpoint is the link endpoint for the NIC created by the tun device. diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go index 1dce35c63..50991c3c0 100644 --- a/pkg/tcpip/transport/icmp/endpoint.go +++ b/pkg/tcpip/transport/icmp/endpoint.go @@ -149,7 +149,7 @@ func (e *endpoint) Close() { e.mu.Unlock() - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -588,7 +588,7 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) tcpip.Error { e.rcvMu.Unlock() if !wasClosed { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } @@ -725,13 +725,13 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // waiter.EventIn is set, the endpoint is immediately readable. func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine if the endpoint is readable if requested. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.rcvMu.Lock() if !e.rcvList.Empty() || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvMu.Unlock() } @@ -804,7 +804,7 @@ func (e *endpoint) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketB e.stats.PacketsReceived.Increment() // Notify any waiters that there's data to be read now. 
if wasEmpty { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/packet/endpoint.go b/pkg/tcpip/transport/packet/endpoint.go index 367757d3b..52ed9560c 100644 --- a/pkg/tcpip/transport/packet/endpoint.go +++ b/pkg/tcpip/transport/packet/endpoint.go @@ -152,7 +152,7 @@ func (ep *endpoint) Close() { ep.closed = true ep.bound = false - ep.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + ep.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -287,13 +287,13 @@ func (*endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // Readiness implements tcpip.Endpoint.Readiness. func (ep *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine whether the endpoint is readable. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { ep.rcvMu.Lock() if !ep.rcvList.Empty() || ep.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } ep.rcvMu.Unlock() } @@ -483,7 +483,7 @@ func (ep *endpoint) HandlePacket(nicID tcpip.NICID, localAddr tcpip.LinkAddress, ep.stats.PacketsReceived.Increment() // Notify waiters that there's data to be read. if wasEmpty { - ep.waiterQueue.Notify(waiter.EventIn) + ep.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go index 4b2f08379..e27a249cd 100644 --- a/pkg/tcpip/transport/raw/endpoint.go +++ b/pkg/tcpip/transport/raw/endpoint.go @@ -177,7 +177,7 @@ func (e *endpoint) Close() { e.closed = true - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -486,13 +486,13 @@ func (*endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // Readiness implements tcpip.Endpoint.Readiness. func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. - result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine whether the endpoint is readable. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.rcvMu.Lock() if !e.rcvList.Empty() || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvMu.Unlock() } @@ -655,7 +655,7 @@ func (e *endpoint) HandlePacket(pkt *stack.PacketBuffer) { e.stats.PacketsReceived.Increment() // Notify waiters that there's data to be read. 
if wasEmpty { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index 0a2f3291c..025b134e2 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -410,7 +410,7 @@ func (e *endpoint) deliverAccepted(n *endpoint, withSynCookie bool) { atomic.AddInt32(&e.synRcvdCount, -1) } e.acceptMu.Unlock() - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) return default: e.acceptCond.Wait() @@ -462,7 +462,7 @@ func (e *endpoint) reserveTupleLocked() bool { // can't really have any registered waiters except when stack.Wait() is called // which waits for all registered endpoints to stop and expects an EventHUp. func (e *endpoint) notifyAborted() { - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // handleSynSegment is called in its own goroutine once the listening endpoint @@ -771,7 +771,7 @@ func (e *endpoint) protocolListenLoop(rcvWnd seqnum.Size) { e.drainClosingSegmentQueue() // Notify waiters that the endpoint is shutdown. - e.waiterQueue.Notify(waiter.EventIn | waiter.EventOut | waiter.EventHUp | waiter.EventErr) + e.waiterQueue.Notify(waiter.ReadableEvents | waiter.WritableEvents | waiter.EventHUp | waiter.EventErr) }() var s sleep.Sleeper diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index b32fe2fb1..a9e978cf6 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -1320,7 +1320,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ e.drainClosingSegmentQueue() // When the protocol loop exits we should wake up our waiters. - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } if handshake { @@ -1495,7 +1495,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ } // Tell waiters that the endpoint is connected and writable. - e.waiterQueue.Notify(waiter.EventOut) + e.waiterQueue.Notify(waiter.WritableEvents) // The following assertions and notifications are needed for restored // endpoints. Fresh newly created endpoints have empty states and should @@ -1506,7 +1506,7 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{ e.rcvListMu.Lock() if !e.rcvList.Empty() { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } e.rcvListMu.Unlock() @@ -1570,7 +1570,7 @@ loop: // wakers. s.Done() // Wake up any waiters before we enter TIME_WAIT. - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) e.workerCleanup = true reuseTW = e.doTimeWait() } diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 0a5e9cbb4..c5daba232 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -960,30 +960,30 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { case StateListen: // Check if there's anything in the accepted channel. 
- if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.acceptMu.Lock() if len(e.acceptedChan) > 0 { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.acceptMu.Unlock() } } if e.EndpointState().connected() { // Determine if the endpoint is writable if requested. - if (mask & waiter.EventOut) != 0 { + if (mask & waiter.WritableEvents) != 0 { e.sndBufMu.Lock() sndBufSize := e.getSendBufferSize() if e.sndClosed || e.sndBufUsed < sndBufSize { - result |= waiter.EventOut + result |= waiter.WritableEvents } e.sndBufMu.Unlock() } // Determine if the endpoint is readable if requested. - if (mask & waiter.EventIn) != 0 { + if (mask & waiter.ReadableEvents) != 0 { e.rcvListMu.Lock() if e.rcvBufUsed > 0 || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvListMu.Unlock() } @@ -1121,7 +1121,7 @@ func (e *endpoint) closeNoShutdownLocked() { return } - eventMask := waiter.EventIn | waiter.EventOut + eventMask := waiter.ReadableEvents | waiter.WritableEvents // Either perform the local cleanup or kick the worker to make sure it // knows it needs to cleanup. if e.workerRunning { @@ -2133,7 +2133,7 @@ func (e *endpoint) Connect(addr tcpip.FullAddress) tcpip.Error { if err != nil { if !err.IgnoreStats() { // Connect failed. Let's wake up any waiters. - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) e.stack.Stats().TCP.FailedConnectionAttempts.Increment() e.stats.FailedConnectionAttempts.Increment() } @@ -2463,7 +2463,7 @@ func (e *endpoint) shutdownLocked(flags tcpip.ShutdownFlags) tcpip.Error { e.rcvListMu.Unlock() e.closePendingAcceptableConnectionsLocked() // Notify waiters that the endpoint is shutdown. - e.waiterQueue.Notify(waiter.EventIn | waiter.EventOut | waiter.EventHUp | waiter.EventErr) + e.waiterQueue.Notify(waiter.ReadableEvents | waiter.WritableEvents | waiter.EventHUp | waiter.EventErr) } return nil default: @@ -2811,7 +2811,7 @@ func (e *endpoint) updateSndBufferUsage(v int) { e.sndBufMu.Unlock() if notify { - e.waiterQueue.Notify(waiter.EventOut) + e.waiterQueue.Notify(waiter.WritableEvents) } } @@ -2828,7 +2828,7 @@ func (e *endpoint) readyToRead(s *segment) { e.rcvClosed = true } e.rcvListMu.Unlock() - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } // receiveBufferAvailableLocked calculates how many bytes are still available diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go index 0f59181bb..956da0e0c 100644 --- a/pkg/tcpip/transport/udp/endpoint.go +++ b/pkg/tcpip/transport/udp/endpoint.go @@ -284,7 +284,7 @@ func (e *endpoint) Close() { e.mu.Unlock() - e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.EventIn | waiter.EventOut) + e.waiterQueue.Notify(waiter.EventHUp | waiter.EventErr | waiter.ReadableEvents | waiter.WritableEvents) } // ModerateRecvBuf implements tcpip.Endpoint.ModerateRecvBuf. @@ -1070,7 +1070,7 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) tcpip.Error { e.rcvMu.Unlock() if !wasClosed { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } @@ -1234,13 +1234,13 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, tcpip.Error) { // waiter.EventIn is set, the endpoint is immediately readable. func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // The endpoint is always writable. 
- result := waiter.EventOut & mask + result := waiter.WritableEvents & mask // Determine if the endpoint is readable if requested. - if (mask & waiter.EventIn) != 0 { + if mask&waiter.ReadableEvents != 0 { e.rcvMu.Lock() if !e.rcvList.Empty() || e.rcvClosed { - result |= waiter.EventIn + result |= waiter.ReadableEvents } e.rcvMu.Unlock() } @@ -1349,7 +1349,7 @@ func (e *endpoint) HandlePacket(id stack.TransportEndpointID, pkt *stack.PacketB // Notify any waiters that there's data to be read now. if wasEmpty { - e.waiterQueue.Notify(waiter.EventIn) + e.waiterQueue.Notify(waiter.ReadableEvents) } } diff --git a/pkg/waiter/waiter.go b/pkg/waiter/waiter.go index 83d4f893a..4ea067cb7 100644 --- a/pkg/waiter/waiter.go +++ b/pkg/waiter/waiter.go @@ -67,13 +67,17 @@ type EventMask uint64 // Events that waiters can wait on. The meaning is the same as those in the // poll() syscall. const ( - EventIn EventMask = 0x01 // POLLIN - EventPri EventMask = 0x02 // POLLPRI - EventOut EventMask = 0x04 // POLLOUT - EventErr EventMask = 0x08 // POLLERR - EventHUp EventMask = 0x10 // POLLHUP - - allEvents EventMask = 0x1f + EventIn EventMask = 0x01 // POLLIN + EventPri EventMask = 0x02 // POLLPRI + EventOut EventMask = 0x04 // POLLOUT + EventErr EventMask = 0x08 // POLLERR + EventHUp EventMask = 0x10 // POLLHUP + EventRdNorm EventMask = 0x0040 // POLLRDNORM + EventWrNorm EventMask = 0x0100 // POLLWRNORM + + allEvents EventMask = 0x1f | EventRdNorm | EventWrNorm + ReadableEvents EventMask = EventIn | EventRdNorm + WritableEvents EventMask = EventOut | EventWrNorm ) // EventMaskFromLinux returns an EventMask representing the supported events |
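For reference, the arithmetic behind the new pkg/waiter constants: ReadableEvents is 0x41 (POLLIN|POLLRDNORM) and WritableEvents is 0x104 (POLLOUT|POLLWRNORM), so readiness results now match pollers that ask for the POLLRDNORM/POLLWRNORM bits, which Linux reports alongside POLLIN/POLLOUT, while legacy single-bit checks keep working. A small standalone check of both properties:

package main

import "fmt"

// The poll bits from pkg/waiter/waiter.go above.
const (
	eventIn     = 0x01   // POLLIN
	eventOut    = 0x04   // POLLOUT
	eventRdNorm = 0x0040 // POLLRDNORM
	eventWrNorm = 0x0100 // POLLWRNORM

	readableEvents = eventIn | eventRdNorm  // 0x41
	writableEvents = eventOut | eventWrNorm // 0x104
)

func main() {
	readiness := readableEvents // e.g. what a rewritten Readiness now returns
	fmt.Println("POLLRDNORM poller matches:", readiness&eventRdNorm != 0)   // true
	fmt.Println("legacy EventIn check matches:", readiness&eventIn != 0)    // true
	fmt.Println("not spuriously writable:", readiness&writableEvents == 0)  // true
}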