summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--pkg/sentry/kernel/msgqueue/msgqueue.go53
1 files changed, 33 insertions, 20 deletions
diff --git a/pkg/sentry/kernel/msgqueue/msgqueue.go b/pkg/sentry/kernel/msgqueue/msgqueue.go
index c111297d7..fab396d7c 100644
--- a/pkg/sentry/kernel/msgqueue/msgqueue.go
+++ b/pkg/sentry/kernel/msgqueue/msgqueue.go
@@ -208,11 +208,13 @@ func (r *Registry) FindByID(id ipc.ID) (*Queue, error) {
// Send appends a message to the message queue, and returns an error if sending
// fails. See msgsnd(2).
-func (q *Queue) Send(ctx context.Context, m Message, b Blocker, wait bool, pid int32) (err error) {
+func (q *Queue) Send(ctx context.Context, m Message, b Blocker, wait bool, pid int32) error {
// Try to perform a non-blocking send using queue.append. If EWOULDBLOCK
// is returned, start the blocking procedure. Otherwise, return normally.
creds := auth.CredentialsFromContext(ctx)
- if err := q.append(ctx, m, creds, pid); err != linuxerr.EWOULDBLOCK {
+
+ // Fast path: first attempt a non-blocking push.
+ if err := q.push(ctx, m, creds, pid); err != linuxerr.EWOULDBLOCK {
return err
}
@@ -220,25 +222,30 @@ func (q *Queue) Send(ctx context.Context, m Message, b Blocker, wait bool, pid i
return linuxerr.EAGAIN
}
+ // Slow path: at this point, the queue was found to be full, and we were
+ // asked to block.
+
e, ch := waiter.NewChannelEntry(nil)
q.senders.EventRegister(&e, waiter.EventOut)
+ defer q.senders.EventUnregister(&e)
+ // Note: we need to check again before blocking the first time since space
+ // may have become available.
for {
- if err = q.append(ctx, m, creds, pid); err != linuxerr.EWOULDBLOCK {
- break
+ if err := q.push(ctx, m, creds, pid); err != linuxerr.EWOULDBLOCK {
+ return err
+ }
+ if err := b.Block(ch); err != nil {
+ return err
}
- b.Block(ch)
}
-
- q.senders.EventUnregister(&e)
- return err
}
-// append appends a message to the queue's message list and notifies waiting
+// push appends a message to the queue's message list and notifies waiting
// receivers that a message has been inserted. It returns an error if adding
// the message would cause the queue to exceed its maximum capacity, which can
// be used as a signal to block the task. Other errors should be returned as is.
-func (q *Queue) append(ctx context.Context, m Message, creds *auth.Credentials, pid int32) error {
+func (q *Queue) push(ctx context.Context, m Message, creds *auth.Credentials, pid int32) error {
if m.Type <= 0 {
return linuxerr.EINVAL
}
@@ -295,15 +302,14 @@ func (q *Queue) append(ctx context.Context, m Message, creds *auth.Credentials,
}
// Receive removes a message from the queue and returns it. See msgrcv(2).
-func (q *Queue) Receive(ctx context.Context, b Blocker, mType int64, maxSize int64, wait, truncate, except bool, pid int32) (msg *Message, err error) {
+func (q *Queue) Receive(ctx context.Context, b Blocker, mType int64, maxSize int64, wait, truncate, except bool, pid int32) (*Message, error) {
if maxSize < 0 || maxSize > maxMessageBytes {
return nil, linuxerr.EINVAL
}
max := uint64(maxSize)
-
- // Try to perform a non-blocking receive using queue.pop. If EWOULDBLOCK
- // is returned, start the blocking procedure. Otherwise, return normally.
creds := auth.CredentialsFromContext(ctx)
+
+ // Fast path: first attempt a non-blocking pop.
if msg, err := q.pop(ctx, creds, mType, max, truncate, except, pid); err != linuxerr.EWOULDBLOCK {
return msg, err
}
@@ -312,24 +318,30 @@ func (q *Queue) Receive(ctx context.Context, b Blocker, mType int64, maxSize int
return nil, linuxerr.ENOMSG
}
+ // Slow path: at this point, the queue was found to be empty, and we were
+ // asked to block.
+
e, ch := waiter.NewChannelEntry(nil)
q.receivers.EventRegister(&e, waiter.EventIn)
+ defer q.receivers.EventUnregister(&e)
+ // Note: we need to check again before blocking the first time since a
+ // message may have become available.
for {
- if msg, err = q.pop(ctx, creds, mType, max, truncate, except, pid); err != linuxerr.EWOULDBLOCK {
- break
+ if msg, err := q.pop(ctx, creds, mType, max, truncate, except, pid); err != linuxerr.EWOULDBLOCK {
+ return msg, err
+ }
+ if err := b.Block(ch); err != nil {
+ return nil, err
}
- b.Block(ch)
}
- q.receivers.EventUnregister(&e)
- return msg, err
}
// pop pops the first message from the queue that matches the given type. It
// returns an error for all the cases specified in msgrcv(2). If the queue is
// empty or no message of the specified type is available, a EWOULDBLOCK error
// is returned, which can then be used as a signal to block the process or fail.
-func (q *Queue) pop(ctx context.Context, creds *auth.Credentials, mType int64, maxSize uint64, truncate, except bool, pid int32) (msg *Message, _ error) {
+func (q *Queue) pop(ctx context.Context, creds *auth.Credentials, mType int64, maxSize uint64, truncate, except bool, pid int32) (*Message, error) {
q.mu.Lock()
defer q.mu.Unlock()
@@ -350,6 +362,7 @@ func (q *Queue) pop(ctx context.Context, creds *auth.Credentials, mType int64, m
}
// Get a message from the queue.
+ var msg *Message
switch {
case mType == 0:
msg = q.messages.Front()