diff options
-rw-r--r-- | pkg/sentry/kernel/msgqueue/msgqueue.go | 53 |
1 files changed, 33 insertions, 20 deletions
diff --git a/pkg/sentry/kernel/msgqueue/msgqueue.go b/pkg/sentry/kernel/msgqueue/msgqueue.go index c111297d7..fab396d7c 100644 --- a/pkg/sentry/kernel/msgqueue/msgqueue.go +++ b/pkg/sentry/kernel/msgqueue/msgqueue.go @@ -208,11 +208,13 @@ func (r *Registry) FindByID(id ipc.ID) (*Queue, error) { // Send appends a message to the message queue, and returns an error if sending // fails. See msgsnd(2). -func (q *Queue) Send(ctx context.Context, m Message, b Blocker, wait bool, pid int32) (err error) { +func (q *Queue) Send(ctx context.Context, m Message, b Blocker, wait bool, pid int32) error { // Try to perform a non-blocking send using queue.append. If EWOULDBLOCK // is returned, start the blocking procedure. Otherwise, return normally. creds := auth.CredentialsFromContext(ctx) - if err := q.append(ctx, m, creds, pid); err != linuxerr.EWOULDBLOCK { + + // Fast path: first attempt a non-blocking push. + if err := q.push(ctx, m, creds, pid); err != linuxerr.EWOULDBLOCK { return err } @@ -220,25 +222,30 @@ func (q *Queue) Send(ctx context.Context, m Message, b Blocker, wait bool, pid i return linuxerr.EAGAIN } + // Slow path: at this point, the queue was found to be full, and we were + // asked to block. + e, ch := waiter.NewChannelEntry(nil) q.senders.EventRegister(&e, waiter.EventOut) + defer q.senders.EventUnregister(&e) + // Note: we need to check again before blocking the first time since space + // may have become available. for { - if err = q.append(ctx, m, creds, pid); err != linuxerr.EWOULDBLOCK { - break + if err := q.push(ctx, m, creds, pid); err != linuxerr.EWOULDBLOCK { + return err + } + if err := b.Block(ch); err != nil { + return err } - b.Block(ch) } - - q.senders.EventUnregister(&e) - return err } -// append appends a message to the queue's message list and notifies waiting +// push appends a message to the queue's message list and notifies waiting // receivers that a message has been inserted. It returns an error if adding // the message would cause the queue to exceed its maximum capacity, which can // be used as a signal to block the task. Other errors should be returned as is. -func (q *Queue) append(ctx context.Context, m Message, creds *auth.Credentials, pid int32) error { +func (q *Queue) push(ctx context.Context, m Message, creds *auth.Credentials, pid int32) error { if m.Type <= 0 { return linuxerr.EINVAL } @@ -295,15 +302,14 @@ func (q *Queue) append(ctx context.Context, m Message, creds *auth.Credentials, } // Receive removes a message from the queue and returns it. See msgrcv(2). -func (q *Queue) Receive(ctx context.Context, b Blocker, mType int64, maxSize int64, wait, truncate, except bool, pid int32) (msg *Message, err error) { +func (q *Queue) Receive(ctx context.Context, b Blocker, mType int64, maxSize int64, wait, truncate, except bool, pid int32) (*Message, error) { if maxSize < 0 || maxSize > maxMessageBytes { return nil, linuxerr.EINVAL } max := uint64(maxSize) - - // Try to perform a non-blocking receive using queue.pop. If EWOULDBLOCK - // is returned, start the blocking procedure. Otherwise, return normally. creds := auth.CredentialsFromContext(ctx) + + // Fast path: first attempt a non-blocking pop. if msg, err := q.pop(ctx, creds, mType, max, truncate, except, pid); err != linuxerr.EWOULDBLOCK { return msg, err } @@ -312,24 +318,30 @@ func (q *Queue) Receive(ctx context.Context, b Blocker, mType int64, maxSize int return nil, linuxerr.ENOMSG } + // Slow path: at this point, the queue was found to be empty, and we were + // asked to block. + e, ch := waiter.NewChannelEntry(nil) q.receivers.EventRegister(&e, waiter.EventIn) + defer q.receivers.EventUnregister(&e) + // Note: we need to check again before blocking the first time since a + // message may have become available. for { - if msg, err = q.pop(ctx, creds, mType, max, truncate, except, pid); err != linuxerr.EWOULDBLOCK { - break + if msg, err := q.pop(ctx, creds, mType, max, truncate, except, pid); err != linuxerr.EWOULDBLOCK { + return msg, err + } + if err := b.Block(ch); err != nil { + return nil, err } - b.Block(ch) } - q.receivers.EventUnregister(&e) - return msg, err } // pop pops the first message from the queue that matches the given type. It // returns an error for all the cases specified in msgrcv(2). If the queue is // empty or no message of the specified type is available, a EWOULDBLOCK error // is returned, which can then be used as a signal to block the process or fail. -func (q *Queue) pop(ctx context.Context, creds *auth.Credentials, mType int64, maxSize uint64, truncate, except bool, pid int32) (msg *Message, _ error) { +func (q *Queue) pop(ctx context.Context, creds *auth.Credentials, mType int64, maxSize uint64, truncate, except bool, pid int32) (*Message, error) { q.mu.Lock() defer q.mu.Unlock() @@ -350,6 +362,7 @@ func (q *Queue) pop(ctx context.Context, creds *auth.Credentials, mType int64, m } // Get a message from the queue. + var msg *Message switch { case mType == 0: msg = q.messages.Front() |