summaryrefslogtreecommitdiffhomepage
path: root/pkg/sentry/kernel/msgqueue/msgqueue.go
diff options
context:
space:
mode:
authorZyad A. Ali <zyad.ali.me@gmail.com>2021-06-14 17:18:34 +0200
committerZyad A. Ali <zyad.ali.me@gmail.com>2021-08-03 18:13:24 +0200
commit527c369299c0b51631852838a6e2c1d357b609f7 (patch)
tree5b2a03af187fc8019e33e1426b555b0fa35dca5b /pkg/sentry/kernel/msgqueue/msgqueue.go
parent61bb9b254e3e03af9d4d1ee1e2fabd7d2ce13f5e (diff)
Implement Queue.Send.
Send implements the functionality of msgsnd(2). Updates #135
Diffstat (limited to 'pkg/sentry/kernel/msgqueue/msgqueue.go')
-rw-r--r--pkg/sentry/kernel/msgqueue/msgqueue.go120
1 files changed, 114 insertions, 6 deletions
diff --git a/pkg/sentry/kernel/msgqueue/msgqueue.go b/pkg/sentry/kernel/msgqueue/msgqueue.go
index 3ce926950..a25718fbb 100644
--- a/pkg/sentry/kernel/msgqueue/msgqueue.go
+++ b/pkg/sentry/kernel/msgqueue/msgqueue.go
@@ -119,14 +119,21 @@ type Queue struct {
type Message struct {
msgEntry
- // mType is an integer representing the type of the sent message.
- mType int64
+ // Type is an integer representing the type of the sent message.
+ Type int64
- // mText is an untyped block of memory.
- mText []byte
+ // Text is an untyped block of memory.
+ Text []byte
- // mSize is the size of mText.
- mSize uint64
+ // Size is the size of Text.
+ Size uint64
+}
+
+// Blocker is used for blocking Queue.Send, and Queue.Receive calls that serves
+// as an abstracted version of kernel.Task. kernel.Task is not directly used to
+// prevent circular dependencies.
+type Blocker interface {
+ Block(C <-chan struct{}) error
}
// FindOrCreate creates a new message queue or returns an existing one. See
@@ -186,6 +193,107 @@ func (r *Registry) Remove(id ipc.ID, creds *auth.Credentials) error {
return nil
}
+// FindByID returns the queue with the specified ID and an error if the ID
+// doesn't exist.
+func (r *Registry) FindByID(id ipc.ID) (*Queue, error) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ mech := r.reg.FindByID(id)
+ if mech == nil {
+ return nil, linuxerr.EINVAL
+ }
+ return mech.(*Queue), nil
+}
+
+// Send appends a message to the message queue, and returns an error if sending
+// fails. See msgsnd(2).
+func (q *Queue) Send(ctx context.Context, m Message, b Blocker, wait bool, pid int32) (err error) {
+ // Try to perform a non-blocking send using queue.append. If EWOULDBLOCK
+ // is returned, start the blocking procedure. Otherwise, return normally.
+ creds := auth.CredentialsFromContext(ctx)
+ if err := q.append(ctx, m, creds, pid); err != linuxerr.EWOULDBLOCK {
+ return err
+ }
+
+ if !wait {
+ return linuxerr.EAGAIN
+ }
+
+ e, ch := waiter.NewChannelEntry(nil)
+ q.senders.EventRegister(&e, waiter.EventOut)
+
+ for {
+ if err = q.append(ctx, m, creds, pid); err != linuxerr.EWOULDBLOCK {
+ break
+ }
+ b.Block(ch)
+ }
+
+ q.senders.EventUnregister(&e)
+ return err
+}
+
+// append appends a message to the queue's message list and notifies waiting
+// receivers that a message has been inserted. It returns an error if adding
+// the message would cause the queue to exceed its maximum capacity, which can
+// be used as a signal to block the task. Other errors should be returned as is.
+func (q *Queue) append(ctx context.Context, m Message, creds *auth.Credentials, pid int32) error {
+ if m.Type <= 0 {
+ return linuxerr.EINVAL
+ }
+
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ if !q.obj.CheckPermissions(creds, fs.PermMask{Write: true}) {
+ // The calling process does not have write permission on the message
+ // queue, and does not have the CAP_IPC_OWNER capability in the user
+ // namespace that governs its IPC namespace.
+ return linuxerr.EACCES
+ }
+
+ // Queue was removed while the process was waiting.
+ if q.dead {
+ return linuxerr.EIDRM
+ }
+
+ // Check if sufficient space is available (the queue isn't full.) From
+ // the man pages:
+ //
+ // "A message queue is considered to be full if either of the following
+ // conditions is true:
+ //
+ // • Adding a new message to the queue would cause the total number
+ // of bytes in the queue to exceed the queue's maximum size (the
+ // msg_qbytes field).
+ //
+ // • Adding another message to the queue would cause the total
+ // number of messages in the queue to exceed the queue's maximum
+ // size (the msg_qbytes field). This check is necessary to
+ // prevent an unlimited number of zero-length messages being
+ // placed on the queue. Although such messages contain no data,
+ // they nevertheless consume (locked) kernel memory."
+ //
+ // The msg_qbytes field in our implementation is q.maxBytes.
+ if m.Size+q.byteCount > q.maxBytes || q.messageCount+1 > q.maxBytes {
+ return linuxerr.EWOULDBLOCK
+ }
+
+ // Copy the message into the queue.
+ q.messages.PushBack(&m)
+
+ q.byteCount += m.Size
+ q.messageCount++
+ q.sendPID = pid
+ q.sendTime = ktime.NowFromContext(ctx)
+
+ // Notify receivers about the new message.
+ q.receivers.Notify(waiter.EventIn)
+
+ return nil
+}
+
// Lock implements ipc.Mechanism.Lock.
func (q *Queue) Lock() {
q.mu.Lock()