summaryrefslogtreecommitdiffhomepage
path: root/pkg/sentry/kernel
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/sentry/kernel')
-rw-r--r--pkg/sentry/kernel/mq/BUILD1
-rw-r--r--pkg/sentry/kernel/mq/mq.go84
2 files changed, 84 insertions, 1 deletions
diff --git a/pkg/sentry/kernel/mq/BUILD b/pkg/sentry/kernel/mq/BUILD
index ec9cd18a9..b4e17b582 100644
--- a/pkg/sentry/kernel/mq/BUILD
+++ b/pkg/sentry/kernel/mq/BUILD
@@ -24,6 +24,7 @@ go_library(
visibility = ["//pkg/sentry:internal"],
deps = [
"//pkg/abi/linux",
+ "//pkg/context",
"//pkg/sentry/fs",
"//pkg/sync",
"//pkg/waiter",
diff --git a/pkg/sentry/kernel/mq/mq.go b/pkg/sentry/kernel/mq/mq.go
index df9bdc267..29a46e8a9 100644
--- a/pkg/sentry/kernel/mq/mq.go
+++ b/pkg/sentry/kernel/mq/mq.go
@@ -16,7 +16,11 @@
package mq
import (
+ "bytes"
+ "fmt"
+
"gvisor.dev/gvisor/pkg/abi/linux"
+ "gvisor.dev/gvisor/pkg/context"
"gvisor.dev/gvisor/pkg/sentry/fs"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/waiter"
@@ -52,7 +56,7 @@ type Queue struct {
// subscriber represents a task registered to receive async notification
// from this queue.
- subscriber Subscriber
+ subscriber *Subscriber
// nonBlock is true if this queue is non-blocking.
nonBlock bool
@@ -93,4 +97,82 @@ type Message struct {
// +stateify savable
type Subscriber struct {
// TODO: Add fields when mq_notify(2) is implemented.
+
+ // pid is the PID of the registered task.
+ pid int32
+}
+
+// Generate implements vfs.DynamicBytesSource.Generate. Queue is used as a
+// dynamic bytes source for mqfs's queueInode.
+func (q *Queue) Generate(ctx context.Context, buf *bytes.Buffer) error {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ var (
+ pid int32
+ method int
+ sigNumber int
+ )
+ if q.subscriber != nil {
+ pid = q.subscriber.pid
+ // TODO: add method and sigNumber when mq_notify(2) is implemented.
+ }
+
+ buf.WriteString(
+ fmt.Sprintf("QSIZE:%-10d NOTIFY:%-5d SIGNO:%-5d NOTIFY_PID:%-6d\n",
+ q.byteCount, method, sigNumber, pid),
+ )
+ return nil
+}
+
+// Flush checks if the calling process has attached a notification request to
+// this queue, if yes, then the request is removed, and another process can
+// attach a request.
+func (q *Queue) Flush(ctx context.Context) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ pid, ok := context.ThreadGroupIDFromContext(ctx)
+ if ok {
+ if q.subscriber != nil && pid == q.subscriber.pid {
+ q.subscriber = nil
+ }
+ }
+}
+
+// Readiness implements Waitable.Readiness.
+func (q *Queue) Readiness(mask waiter.EventMask) waiter.EventMask {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ events := waiter.EventMask(0)
+ if q.messageCount > 0 {
+ events |= waiter.ReadableEvents
+ }
+ if q.messageCount < q.maxMessageCount {
+ events |= waiter.WritableEvents
+ }
+ return events & mask
+}
+
+// EventRegister implements Waitable.EventRegister.
+func (q *Queue) EventRegister(e *waiter.Entry, mask waiter.EventMask) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ if mask&waiter.WritableEvents != 0 {
+ q.senders.EventRegister(e, waiter.EventOut)
+ }
+ if mask&waiter.ReadableEvents != 0 {
+ q.receivers.EventRegister(e, waiter.EventIn)
+ }
+}
+
+// EventUnregister implements Waitable.EventUnregister.
+func (q *Queue) EventUnregister(e *waiter.Entry) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+
+ q.senders.EventUnregister(e)
+ q.receivers.EventUnregister(e)
}