From 0061d0e4e5d74efce8af8d706437cba3d040cd5f Mon Sep 17 00:00:00 2001 From: "Zyad A. Ali" Date: Tue, 20 Jul 2021 20:31:46 +0200 Subject: Implement queueInode and queueFD in mqfs. Implement inode and file description representing a POSIX message queue, and other utilities needed to implement file operations. Updates #136 --- pkg/sentry/kernel/mq/mq.go | 84 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 83 insertions(+), 1 deletion(-) (limited to 'pkg/sentry/kernel/mq/mq.go') diff --git a/pkg/sentry/kernel/mq/mq.go b/pkg/sentry/kernel/mq/mq.go index df9bdc267..29a46e8a9 100644 --- a/pkg/sentry/kernel/mq/mq.go +++ b/pkg/sentry/kernel/mq/mq.go @@ -16,7 +16,11 @@ package mq import ( + "bytes" + "fmt" + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/context" "gvisor.dev/gvisor/pkg/sentry/fs" "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/waiter" @@ -52,7 +56,7 @@ type Queue struct { // subscriber represents a task registered to receive async notification // from this queue. - subscriber Subscriber + subscriber *Subscriber // nonBlock is true if this queue is non-blocking. nonBlock bool @@ -93,4 +97,82 @@ type Message struct { // +stateify savable type Subscriber struct { // TODO: Add fields when mq_notify(2) is implemented. + + // pid is the PID of the registered task. + pid int32 +} + +// Generate implements vfs.DynamicBytesSource.Generate. Queue is used as a +// dynamic bytes source for mqfs's queueInode. +func (q *Queue) Generate(ctx context.Context, buf *bytes.Buffer) error { + q.mu.Lock() + defer q.mu.Unlock() + + var ( + pid int32 + method int + sigNumber int + ) + if q.subscriber != nil { + pid = q.subscriber.pid + // TODO: add method and sigNumber when mq_notify(2) is implemented. + } + + buf.WriteString( + fmt.Sprintf("QSIZE:%-10d NOTIFY:%-5d SIGNO:%-5d NOTIFY_PID:%-6d\n", + q.byteCount, method, sigNumber, pid), + ) + return nil +} + +// Flush checks if the calling process has attached a notification request to +// this queue, if yes, then the request is removed, and another process can +// attach a request. +func (q *Queue) Flush(ctx context.Context) { + q.mu.Lock() + defer q.mu.Unlock() + + pid, ok := context.ThreadGroupIDFromContext(ctx) + if ok { + if q.subscriber != nil && pid == q.subscriber.pid { + q.subscriber = nil + } + } +} + +// Readiness implements Waitable.Readiness. +func (q *Queue) Readiness(mask waiter.EventMask) waiter.EventMask { + q.mu.Lock() + defer q.mu.Unlock() + + events := waiter.EventMask(0) + if q.messageCount > 0 { + events |= waiter.ReadableEvents + } + if q.messageCount < q.maxMessageCount { + events |= waiter.WritableEvents + } + return events & mask +} + +// EventRegister implements Waitable.EventRegister. +func (q *Queue) EventRegister(e *waiter.Entry, mask waiter.EventMask) { + q.mu.Lock() + defer q.mu.Unlock() + + if mask&waiter.WritableEvents != 0 { + q.senders.EventRegister(e, waiter.EventOut) + } + if mask&waiter.ReadableEvents != 0 { + q.receivers.EventRegister(e, waiter.EventIn) + } +} + +// EventUnregister implements Waitable.EventUnregister. +func (q *Queue) EventUnregister(e *waiter.Entry) { + q.mu.Lock() + defer q.mu.Unlock() + + q.senders.EventUnregister(e) + q.receivers.EventUnregister(e) } -- cgit v1.2.3