diff options
Diffstat (limited to 'pkg/sentry')
-rw-r--r-- | pkg/sentry/kernel/msgqueue/msgqueue.go | 120 |
1 files changed, 114 insertions, 6 deletions
diff --git a/pkg/sentry/kernel/msgqueue/msgqueue.go b/pkg/sentry/kernel/msgqueue/msgqueue.go index 3ce926950..a25718fbb 100644 --- a/pkg/sentry/kernel/msgqueue/msgqueue.go +++ b/pkg/sentry/kernel/msgqueue/msgqueue.go @@ -119,14 +119,21 @@ type Queue struct { type Message struct { msgEntry - // mType is an integer representing the type of the sent message. - mType int64 + // Type is an integer representing the type of the sent message. + Type int64 - // mText is an untyped block of memory. - mText []byte + // Text is an untyped block of memory. + Text []byte - // mSize is the size of mText. - mSize uint64 + // Size is the size of Text. + Size uint64 +} + +// Blocker is used for blocking Queue.Send, and Queue.Receive calls that serves +// as an abstracted version of kernel.Task. kernel.Task is not directly used to +// prevent circular dependencies. +type Blocker interface { + Block(C <-chan struct{}) error } // FindOrCreate creates a new message queue or returns an existing one. See @@ -186,6 +193,107 @@ func (r *Registry) Remove(id ipc.ID, creds *auth.Credentials) error { return nil } +// FindByID returns the queue with the specified ID and an error if the ID +// doesn't exist. +func (r *Registry) FindByID(id ipc.ID) (*Queue, error) { + r.mu.Lock() + defer r.mu.Unlock() + + mech := r.reg.FindByID(id) + if mech == nil { + return nil, linuxerr.EINVAL + } + return mech.(*Queue), nil +} + +// Send appends a message to the message queue, and returns an error if sending +// fails. See msgsnd(2). +func (q *Queue) Send(ctx context.Context, m Message, b Blocker, wait bool, pid int32) (err error) { + // Try to perform a non-blocking send using queue.append. If EWOULDBLOCK + // is returned, start the blocking procedure. Otherwise, return normally. + creds := auth.CredentialsFromContext(ctx) + if err := q.append(ctx, m, creds, pid); err != linuxerr.EWOULDBLOCK { + return err + } + + if !wait { + return linuxerr.EAGAIN + } + + e, ch := waiter.NewChannelEntry(nil) + q.senders.EventRegister(&e, waiter.EventOut) + + for { + if err = q.append(ctx, m, creds, pid); err != linuxerr.EWOULDBLOCK { + break + } + b.Block(ch) + } + + q.senders.EventUnregister(&e) + return err +} + +// append appends a message to the queue's message list and notifies waiting +// receivers that a message has been inserted. It returns an error if adding +// the message would cause the queue to exceed its maximum capacity, which can +// be used as a signal to block the task. Other errors should be returned as is. +func (q *Queue) append(ctx context.Context, m Message, creds *auth.Credentials, pid int32) error { + if m.Type <= 0 { + return linuxerr.EINVAL + } + + q.mu.Lock() + defer q.mu.Unlock() + + if !q.obj.CheckPermissions(creds, fs.PermMask{Write: true}) { + // The calling process does not have write permission on the message + // queue, and does not have the CAP_IPC_OWNER capability in the user + // namespace that governs its IPC namespace. + return linuxerr.EACCES + } + + // Queue was removed while the process was waiting. + if q.dead { + return linuxerr.EIDRM + } + + // Check if sufficient space is available (the queue isn't full.) From + // the man pages: + // + // "A message queue is considered to be full if either of the following + // conditions is true: + // + // • Adding a new message to the queue would cause the total number + // of bytes in the queue to exceed the queue's maximum size (the + // msg_qbytes field). + // + // • Adding another message to the queue would cause the total + // number of messages in the queue to exceed the queue's maximum + // size (the msg_qbytes field). This check is necessary to + // prevent an unlimited number of zero-length messages being + // placed on the queue. Although such messages contain no data, + // they nevertheless consume (locked) kernel memory." + // + // The msg_qbytes field in our implementation is q.maxBytes. + if m.Size+q.byteCount > q.maxBytes || q.messageCount+1 > q.maxBytes { + return linuxerr.EWOULDBLOCK + } + + // Copy the message into the queue. + q.messages.PushBack(&m) + + q.byteCount += m.Size + q.messageCount++ + q.sendPID = pid + q.sendTime = ktime.NowFromContext(ctx) + + // Notify receivers about the new message. + q.receivers.Notify(waiter.EventIn) + + return nil +} + // Lock implements ipc.Mechanism.Lock. func (q *Queue) Lock() { q.mu.Lock() |