diff options
author | Kevin Krakauer <krakauer@google.com> | 2021-10-07 17:37:50 -0700 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2021-10-07 17:41:20 -0700 |
commit | e44b100654ca639d11221e547384f699e461296d (patch) | |
tree | 950811fef6620dea99871f63e82a08cfa06849bf /pkg | |
parent | 487651ac46f302592ccffc9e5a4336a331010e42 (diff) |
add convenient wrapper for eventfd
The same create/write/read pattern is copied around several places. It's easier
to understand in a package with names and comments, and we can reuse the smart
blocking code in package rawfile.
PiperOrigin-RevId: 401647108
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/eventfd/BUILD | 22 | ||||
-rw-r--r-- | pkg/eventfd/eventfd.go | 115 | ||||
-rw-r--r-- | pkg/eventfd/eventfd_test.go | 75 | ||||
-rw-r--r-- | pkg/sentry/hostmm/BUILD | 3 | ||||
-rw-r--r-- | pkg/sentry/hostmm/hostmm.go | 42 | ||||
-rw-r--r-- | pkg/tcpip/link/rawfile/rawfile_unsafe.go | 16 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/BUILD | 1 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/queuepair.go | 17 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/rx.go | 26 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/server_rx.go | 18 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/server_tx.go | 16 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/sharedmem.go | 10 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/sharedmem_server.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/sharedmem_test.go | 14 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/tx.go | 5 | ||||
-rw-r--r-- | pkg/unet/BUILD | 1 | ||||
-rw-r--r-- | pkg/unet/unet.go | 20 | ||||
-rw-r--r-- | pkg/unet/unet_unsafe.go | 2 |
18 files changed, 283 insertions, 123 deletions
diff --git a/pkg/eventfd/BUILD b/pkg/eventfd/BUILD new file mode 100644 index 000000000..02407cb99 --- /dev/null +++ b/pkg/eventfd/BUILD @@ -0,0 +1,22 @@ +load("//tools:defs.bzl", "go_library", "go_test") + +package(licenses = ["notice"]) + +go_library( + name = "eventfd", + srcs = [ + "eventfd.go", + ], + visibility = ["//:sandbox"], + deps = [ + "//pkg/hostarch", + "//pkg/tcpip/link/rawfile", + "@org_golang_x_sys//unix:go_default_library", + ], +) + +go_test( + name = "eventfd_test", + srcs = ["eventfd_test.go"], + library = ":eventfd", +) diff --git a/pkg/eventfd/eventfd.go b/pkg/eventfd/eventfd.go new file mode 100644 index 000000000..acdac01b8 --- /dev/null +++ b/pkg/eventfd/eventfd.go @@ -0,0 +1,115 @@ +// Copyright 2021 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package eventfd wraps Linux's eventfd(2) syscall. +package eventfd + +import ( + "fmt" + "io" + + "golang.org/x/sys/unix" + "gvisor.dev/gvisor/pkg/hostarch" + "gvisor.dev/gvisor/pkg/tcpip/link/rawfile" +) + +const sizeofUint64 = 8 + +// Eventfd represents a Linux eventfd object. +type Eventfd struct { + fd int +} + +// Create returns an initialized eventfd. +func Create() (Eventfd, error) { + fd, _, err := unix.RawSyscall(unix.SYS_EVENTFD2, 0, 0, 0) + if err != 0 { + return Eventfd{}, fmt.Errorf("failed to create eventfd: %v", error(err)) + } + if err := unix.SetNonblock(int(fd), true); err != nil { + unix.Close(int(fd)) + return Eventfd{}, err + } + return Eventfd{int(fd)}, nil +} + +// Wrap returns an initialized Eventfd using the provided fd. +func Wrap(fd int) Eventfd { + return Eventfd{fd} +} + +// Close closes the eventfd, after which it should not be used. +func (ev Eventfd) Close() error { + return unix.Close(ev.fd) +} + +// Dup copies the eventfd, calling dup(2) on the underlying file descriptor. +func (ev Eventfd) Dup() (Eventfd, error) { + other, err := unix.Dup(ev.fd) + if err != nil { + return Eventfd{}, fmt.Errorf("failed to dup: %v", other) + } + return Eventfd{other}, nil +} + +// Notify alerts other users of the eventfd. Users can receive alerts by +// calling Wait or Read. +func (ev Eventfd) Notify() error { + return ev.Write(1) +} + +// Write writes a specific value to the eventfd. +func (ev Eventfd) Write(val uint64) error { + var buf [sizeofUint64]byte + hostarch.ByteOrder.PutUint64(buf[:], val) + for { + n, err := unix.Write(ev.fd, buf[:]) + if err == unix.EINTR { + continue + } + if n != sizeofUint64 { + panic(fmt.Sprintf("short write to eventfd: got %d bytes, wanted %d", n, sizeofUint64)) + } + return err + } +} + +// Wait blocks until eventfd is non-zero (i.e. someone calls Notify or Write). +func (ev Eventfd) Wait() error { + _, err := ev.Read() + return err +} + +// Read blocks until eventfd is non-zero (i.e. someone calls Notify or Write) +// and returns the value read. +func (ev Eventfd) Read() (uint64, error) { + var tmp [sizeofUint64]byte + n, err := rawfile.BlockingReadUntranslated(ev.fd, tmp[:]) + if err != 0 { + return 0, err + } + if n == 0 { + return 0, io.EOF + } + if n != sizeofUint64 { + panic(fmt.Sprintf("short read from eventfd: got %d bytes, wanted %d", n, sizeofUint64)) + } + return hostarch.ByteOrder.Uint64(tmp[:]), nil +} + +// FD returns the underlying file descriptor. Use with care, as this breaks the +// Eventfd abstraction. +func (ev Eventfd) FD() int { + return ev.fd +} diff --git a/pkg/eventfd/eventfd_test.go b/pkg/eventfd/eventfd_test.go new file mode 100644 index 000000000..96998d530 --- /dev/null +++ b/pkg/eventfd/eventfd_test.go @@ -0,0 +1,75 @@ +// Copyright 2021 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventfd + +import ( + "testing" + "time" +) + +func TestReadWrite(t *testing.T) { + efd, err := Create() + if err != nil { + t.Fatalf("failed to Create(): %v", err) + } + defer efd.Close() + + // Make sure we can read actual values + const want = 343 + if err := efd.Write(want); err != nil { + t.Fatalf("failed to write value: %d", want) + } + + got, err := efd.Read() + if err != nil { + t.Fatalf("failed to read value: %v", err) + } + if got != want { + t.Fatalf("Read(): got %d, but wanted %d", got, want) + } +} + +func TestWait(t *testing.T) { + efd, err := Create() + if err != nil { + t.Fatalf("failed to Create(): %v", err) + } + defer efd.Close() + + // There's no way to test with certainty that Wait() blocks indefinitely, but + // as a best-effort we can wait a bit on it. + errCh := make(chan error) + go func() { + errCh <- efd.Wait() + }() + select { + case err := <-errCh: + t.Fatalf("Wait() returned without a call to Notify(): %v", err) + case <-time.After(500 * time.Millisecond): + } + + // Notify and check that Wait() returned. + if err := efd.Notify(); err != nil { + t.Fatalf("Notify() failed: %v", err) + } + select { + case err := <-errCh: + if err != nil { + t.Fatalf("Read() failed: %v", err) + } + case <-time.After(5 * time.Second): + t.Fatalf("Read() did not return after Notify()") + } +} diff --git a/pkg/sentry/hostmm/BUILD b/pkg/sentry/hostmm/BUILD index 66fa1ad40..03c8e2f38 100644 --- a/pkg/sentry/hostmm/BUILD +++ b/pkg/sentry/hostmm/BUILD @@ -12,8 +12,7 @@ go_library( visibility = ["//pkg/sentry:internal"], deps = [ "//pkg/abi/linux", - "//pkg/fd", - "//pkg/hostarch", + "//pkg/eventfd", "//pkg/log", "@org_golang_x_sys//unix:go_default_library", ], diff --git a/pkg/sentry/hostmm/hostmm.go b/pkg/sentry/hostmm/hostmm.go index 285ea9050..5df06a60f 100644 --- a/pkg/sentry/hostmm/hostmm.go +++ b/pkg/sentry/hostmm/hostmm.go @@ -21,9 +21,7 @@ import ( "os" "path" - "golang.org/x/sys/unix" - "gvisor.dev/gvisor/pkg/fd" - "gvisor.dev/gvisor/pkg/hostarch" + "gvisor.dev/gvisor/pkg/eventfd" "gvisor.dev/gvisor/pkg/log" ) @@ -54,7 +52,7 @@ func NotifyCurrentMemcgPressureCallback(f func(), level string) (func(), error) } defer eventControlFile.Close() - eventFD, err := newEventFD() + eventFD, err := eventfd.Create() if err != nil { return nil, err } @@ -75,20 +73,11 @@ func NotifyCurrentMemcgPressureCallback(f func(), level string) (func(), error) const stopVal = 1 << 63 stopCh := make(chan struct{}) go func() { // S/R-SAFE: f provides synchronization if necessary - rw := fd.NewReadWriter(eventFD.FD()) - var buf [sizeofUint64]byte for { - n, err := rw.Read(buf[:]) + val, err := eventFD.Read() if err != nil { - if err == unix.EINTR { - continue - } panic(fmt.Sprintf("failed to read from memory pressure level eventfd: %v", err)) } - if n != sizeofUint64 { - panic(fmt.Sprintf("short read from memory pressure level eventfd: got %d bytes, wanted %d", n, sizeofUint64)) - } - val := hostarch.ByteOrder.Uint64(buf[:]) if val >= stopVal { // Assume this was due to the notifier's "destructor" (the // function returned by NotifyCurrentMemcgPressureCallback @@ -101,30 +90,7 @@ func NotifyCurrentMemcgPressureCallback(f func(), level string) (func(), error) } }() return func() { - rw := fd.NewReadWriter(eventFD.FD()) - var buf [sizeofUint64]byte - hostarch.ByteOrder.PutUint64(buf[:], stopVal) - for { - n, err := rw.Write(buf[:]) - if err != nil { - if err == unix.EINTR { - continue - } - panic(fmt.Sprintf("failed to write to memory pressure level eventfd: %v", err)) - } - if n != sizeofUint64 { - panic(fmt.Sprintf("short write to memory pressure level eventfd: got %d bytes, wanted %d", n, sizeofUint64)) - } - break - } + eventFD.Write(stopVal) <-stopCh }, nil } - -func newEventFD() (*fd.FD, error) { - f, _, e := unix.Syscall(unix.SYS_EVENTFD2, 0, 0, 0) - if e != 0 { - return nil, fmt.Errorf("failed to create eventfd: %v", e) - } - return fd.New(int(f)), nil -} diff --git a/pkg/tcpip/link/rawfile/rawfile_unsafe.go b/pkg/tcpip/link/rawfile/rawfile_unsafe.go index 87a0b9a62..e53789d92 100644 --- a/pkg/tcpip/link/rawfile/rawfile_unsafe.go +++ b/pkg/tcpip/link/rawfile/rawfile_unsafe.go @@ -152,10 +152,22 @@ type PollEvent struct { // no data is available, it will block in a poll() syscall until the file // descriptor becomes readable. func BlockingRead(fd int, b []byte) (int, tcpip.Error) { + n, err := BlockingReadUntranslated(fd, b) + if err != 0 { + return n, TranslateErrno(err) + } + return n, nil +} + +// BlockingReadUntranslated reads from a file descriptor that is set up as +// non-blocking. If no data is available, it will block in a poll() syscall +// until the file descriptor becomes readable. It returns the raw unix.Errno +// value returned by the underlying syscalls. +func BlockingReadUntranslated(fd int, b []byte) (int, unix.Errno) { for { n, _, e := unix.RawSyscall(unix.SYS_READ, uintptr(fd), uintptr(unsafe.Pointer(&b[0])), uintptr(len(b))) if e == 0 { - return int(n), nil + return int(n), 0 } event := PollEvent{ @@ -165,7 +177,7 @@ func BlockingRead(fd int, b []byte) (int, tcpip.Error) { _, e = BlockingPoll(&event, 1, nil) if e != 0 && e != unix.EINTR { - return 0, TranslateErrno(e) + return 0, e } } } diff --git a/pkg/tcpip/link/sharedmem/BUILD b/pkg/tcpip/link/sharedmem/BUILD index 6c35aeecf..f8076d83c 100644 --- a/pkg/tcpip/link/sharedmem/BUILD +++ b/pkg/tcpip/link/sharedmem/BUILD @@ -17,6 +17,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/cleanup", + "//pkg/eventfd", "//pkg/log", "//pkg/sync", "//pkg/tcpip", diff --git a/pkg/tcpip/link/sharedmem/queuepair.go b/pkg/tcpip/link/sharedmem/queuepair.go index 5fa6d91f0..b12647fdd 100644 --- a/pkg/tcpip/link/sharedmem/queuepair.go +++ b/pkg/tcpip/link/sharedmem/queuepair.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "golang.org/x/sys/unix" + "gvisor.dev/gvisor/pkg/eventfd" ) const ( @@ -116,25 +117,25 @@ type queueSizes struct { func createQueueFDs(s queueSizes) (QueueConfig, error) { success := false - var fd uintptr + var eventFD eventfd.Eventfd var dataFD, txPipeFD, rxPipeFD, sharedDataFD int defer func() { if success { return } closeFDs(QueueConfig{ - EventFD: int(fd), + EventFD: eventFD, DataFD: dataFD, TxPipeFD: txPipeFD, RxPipeFD: rxPipeFD, SharedDataFD: sharedDataFD, }) }() - eventFD, _, errno := unix.RawSyscall(unix.SYS_EVENTFD2, 0, 0, 0) - if errno != 0 { - return QueueConfig{}, fmt.Errorf("eventfd failed: %v", error(errno)) + eventFD, err := eventfd.Create() + if err != nil { + return QueueConfig{}, fmt.Errorf("eventfd failed: %v", err) } - dataFD, err := createFile(s.dataSize, false) + dataFD, err = createFile(s.dataSize, false) if err != nil { return QueueConfig{}, fmt.Errorf("failed to create dataFD: %s", err) } @@ -152,7 +153,7 @@ func createQueueFDs(s queueSizes) (QueueConfig, error) { } success = true return QueueConfig{ - EventFD: int(eventFD), + EventFD: eventFD, DataFD: dataFD, TxPipeFD: txPipeFD, RxPipeFD: rxPipeFD, @@ -191,7 +192,7 @@ func createFile(size int64, initQueue bool) (fd int, err error) { func closeFDs(c QueueConfig) { unix.Close(c.DataFD) - unix.Close(c.EventFD) + c.EventFD.Close() unix.Close(c.TxPipeFD) unix.Close(c.RxPipeFD) unix.Close(c.SharedDataFD) diff --git a/pkg/tcpip/link/sharedmem/rx.go b/pkg/tcpip/link/sharedmem/rx.go index 399317335..87747dcc7 100644 --- a/pkg/tcpip/link/sharedmem/rx.go +++ b/pkg/tcpip/link/sharedmem/rx.go @@ -21,7 +21,7 @@ import ( "sync/atomic" "golang.org/x/sys/unix" - "gvisor.dev/gvisor/pkg/tcpip/link/rawfile" + "gvisor.dev/gvisor/pkg/eventfd" "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue" ) @@ -30,7 +30,7 @@ type rx struct { data []byte sharedData []byte q queue.Rx - eventFD int + eventFD eventfd.Eventfd } // init initializes all state needed by the rx queue based on the information @@ -68,7 +68,7 @@ func (r *rx) init(mtu uint32, c *QueueConfig) error { // Duplicate the eventFD so that caller can close it but we can still // use it. - efd, err := unix.Dup(c.EventFD) + efd, err := c.EventFD.Dup() if err != nil { unix.Munmap(txPipe) unix.Munmap(rxPipe) @@ -77,16 +77,6 @@ func (r *rx) init(mtu uint32, c *QueueConfig) error { return err } - // Set the eventfd as non-blocking. - if err := unix.SetNonblock(efd, true); err != nil { - unix.Munmap(txPipe) - unix.Munmap(rxPipe) - unix.Munmap(data) - unix.Munmap(sharedData) - unix.Close(efd) - return err - } - // Initialize state based on buffers. r.q.Init(txPipe, rxPipe, sharedDataPointer(sharedData)) r.data = data @@ -105,13 +95,13 @@ func (r *rx) cleanup() { unix.Munmap(r.data) unix.Munmap(r.sharedData) - unix.Close(r.eventFD) + r.eventFD.Close() } // notify writes to the tx.eventFD to indicate to the peer that there is data to // be read. func (r *rx) notify() { - unix.Write(r.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + r.eventFD.Notify() } // postAndReceive posts the provided buffers (if any), and then tries to read @@ -128,8 +118,7 @@ func (r *rx) postAndReceive(b []queue.RxBuffer, stopRequested *uint32) ([]queue. if len(b) != 0 && !r.q.PostBuffers(b) { r.q.EnableNotification() for !r.q.PostBuffers(b) { - var tmp [8]byte - rawfile.BlockingRead(r.eventFD, tmp[:]) + r.eventFD.Wait() if atomic.LoadUint32(stopRequested) != 0 { r.q.DisableNotification() return nil, 0 @@ -153,8 +142,7 @@ func (r *rx) postAndReceive(b []queue.RxBuffer, stopRequested *uint32) ([]queue. } // Wait for notification. - var tmp [8]byte - rawfile.BlockingRead(r.eventFD, tmp[:]) + r.eventFD.Wait() if atomic.LoadUint32(stopRequested) != 0 { r.q.DisableNotification() return nil, 0 diff --git a/pkg/tcpip/link/sharedmem/server_rx.go b/pkg/tcpip/link/sharedmem/server_rx.go index 2ad8bf650..6ea21ffd1 100644 --- a/pkg/tcpip/link/sharedmem/server_rx.go +++ b/pkg/tcpip/link/sharedmem/server_rx.go @@ -20,7 +20,7 @@ package sharedmem import ( "golang.org/x/sys/unix" "gvisor.dev/gvisor/pkg/cleanup" - "gvisor.dev/gvisor/pkg/tcpip/link/rawfile" + "gvisor.dev/gvisor/pkg/eventfd" "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe" "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue" ) @@ -38,7 +38,7 @@ type serverRx struct { data []byte // eventFD is used to notify the peer when transmission is completed. - eventFD int + eventFD eventfd.Eventfd // sharedData the memory region to use to enable/disable notifications. sharedData []byte @@ -78,16 +78,11 @@ func (s *serverRx) init(c *QueueConfig) error { // Duplicate the eventFD so that caller can close it but we can still // use it. - efd, err := unix.Dup(c.EventFD) + efd, err := c.EventFD.Dup() if err != nil { return err } - cu.Add(func() { unix.Close(efd) }) - - // Set the eventfd as non-blocking. - if err := unix.SetNonblock(efd, true); err != nil { - return err - } + cu.Add(func() { efd.Close() }) s.packetPipe.Init(packetPipeMem) s.completionPipe.Init(completionPipeMem) @@ -104,7 +99,7 @@ func (s *serverRx) cleanup() { unix.Munmap(s.completionPipe.Bytes()) unix.Munmap(s.data) unix.Munmap(s.sharedData) - unix.Close(s.eventFD) + s.eventFD.Close() } // completionNotificationSize is size in bytes of a completion notification sent @@ -143,6 +138,5 @@ func (s *serverRx) receive() []byte { } func (s *serverRx) waitForPackets() { - var tmp [8]byte - rawfile.BlockingRead(s.eventFD, tmp[:]) + s.eventFD.Wait() } diff --git a/pkg/tcpip/link/sharedmem/server_tx.go b/pkg/tcpip/link/sharedmem/server_tx.go index 9370b2a46..13a82903f 100644 --- a/pkg/tcpip/link/sharedmem/server_tx.go +++ b/pkg/tcpip/link/sharedmem/server_tx.go @@ -20,6 +20,7 @@ package sharedmem import ( "golang.org/x/sys/unix" "gvisor.dev/gvisor/pkg/cleanup" + "gvisor.dev/gvisor/pkg/eventfd" "gvisor.dev/gvisor/pkg/tcpip/buffer" "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe" "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue" @@ -40,7 +41,7 @@ type serverTx struct { data []byte // eventFD is used to notify the peer when fill requests are fulfilled. - eventFD int + eventFD eventfd.Eventfd // sharedData the memory region to use to enable/disable notifications. sharedData []byte @@ -80,16 +81,11 @@ func (s *serverTx) init(c *QueueConfig) error { // Duplicate the eventFD so that caller can close it but we can still // use it. - efd, err := unix.Dup(c.EventFD) + efd, err := c.EventFD.Dup() if err != nil { return err } - cu.Add(func() { unix.Close(efd) }) - - // Set the eventfd as non-blocking. - if err := unix.SetNonblock(efd, true); err != nil { - return err - } + cu.Add(func() { efd.Close() }) cu.Release() @@ -107,7 +103,7 @@ func (s *serverTx) cleanup() { unix.Munmap(s.completionPipe.Bytes()) unix.Munmap(s.data) unix.Munmap(s.sharedData) - unix.Close(s.eventFD) + s.eventFD.Close() } // fillPacket copies the data in the provided views into buffers pulled from the @@ -175,5 +171,5 @@ func (s *serverTx) transmit(views []buffer.View) bool { } func (s *serverTx) notify() { - unix.Write(s.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + s.eventFD.Notify() } diff --git a/pkg/tcpip/link/sharedmem/sharedmem.go b/pkg/tcpip/link/sharedmem/sharedmem.go index e2a8c4863..bcb37a465 100644 --- a/pkg/tcpip/link/sharedmem/sharedmem.go +++ b/pkg/tcpip/link/sharedmem/sharedmem.go @@ -27,7 +27,7 @@ import ( "fmt" "sync/atomic" - "golang.org/x/sys/unix" + "gvisor.dev/gvisor/pkg/eventfd" "gvisor.dev/gvisor/pkg/log" "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/tcpip" @@ -49,7 +49,7 @@ type QueueConfig struct { // EventFD is a file descriptor for the event that is signaled when // data is becomes available in this queue. - EventFD int + EventFD eventfd.Eventfd // TxPipeFD is a file descriptor for the tx pipe associated with the // queue. @@ -70,7 +70,7 @@ type QueueConfig struct { // of FDs matches when reconstructing the config when serialized or sent // as part of control messages. func (q *QueueConfig) FDs() []int { - return []int{q.DataFD, q.EventFD, q.TxPipeFD, q.RxPipeFD, q.SharedDataFD} + return []int{q.DataFD, q.EventFD.FD(), q.TxPipeFD, q.RxPipeFD, q.SharedDataFD} } // QueueConfigFromFDs constructs a QueueConfig out of a slice of ints where each @@ -84,7 +84,7 @@ func QueueConfigFromFDs(fds []int) (QueueConfig, error) { } return QueueConfig{ DataFD: fds[0], - EventFD: fds[1], + EventFD: eventfd.Wrap(fds[1]), TxPipeFD: fds[2], RxPipeFD: fds[3], SharedDataFD: fds[4], @@ -223,7 +223,7 @@ func (e *endpoint) Close() { // Tell dispatch goroutine to stop, then write to the eventfd so that // it wakes up in case it's sleeping. atomic.StoreUint32(&e.stopRequested, 1) - unix.Write(e.rx.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + e.rx.eventFD.Notify() // Cleanup the queues inline if the worker hasn't started yet; we also // know it won't start from now on because stopRequested is set to 1. diff --git a/pkg/tcpip/link/sharedmem/sharedmem_server.go b/pkg/tcpip/link/sharedmem/sharedmem_server.go index 16feb64b2..ccc84989d 100644 --- a/pkg/tcpip/link/sharedmem/sharedmem_server.go +++ b/pkg/tcpip/link/sharedmem/sharedmem_server.go @@ -20,7 +20,6 @@ package sharedmem import ( "sync/atomic" - "golang.org/x/sys/unix" "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/buffer" @@ -122,7 +121,7 @@ func (e *serverEndpoint) Close() { // Tell dispatch goroutine to stop, then write to the eventfd so that it wakes // up in case it's sleeping. atomic.StoreUint32(&e.stopRequested, 1) - unix.Write(e.rx.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + e.rx.eventFD.Notify() // Cleanup the queues inline if the worker hasn't started yet; we also know it // won't start from now on because stopRequested is set to 1. diff --git a/pkg/tcpip/link/sharedmem/sharedmem_test.go b/pkg/tcpip/link/sharedmem/sharedmem_test.go index bb094da63..66ffc33b8 100644 --- a/pkg/tcpip/link/sharedmem/sharedmem_test.go +++ b/pkg/tcpip/link/sharedmem/sharedmem_test.go @@ -619,7 +619,7 @@ func TestSimpleReceive(t *testing.T) { // Push completion. c.pushRxCompletion(uint32(len(contents)), bufs) c.rxq.rx.Flush() - unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + c.rxCfg.EventFD.Notify() // Wait for packet to be received, then check it. c.waitForPackets(1, time.After(5*time.Second), "Timeout waiting for packet") @@ -665,7 +665,7 @@ func TestRxBuffersReposted(t *testing.T) { // Complete the buffer. c.pushRxCompletion(buffers[i].Size, buffers[i:][:1]) c.rxq.rx.Flush() - unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + c.rxCfg.EventFD.Notify() // Wait for it to be reposted. bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted")) @@ -681,7 +681,7 @@ func TestRxBuffersReposted(t *testing.T) { // Complete with two buffers. c.pushRxCompletion(2*bufferSize, buffers[2*i:][:2]) c.rxq.rx.Flush() - unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + c.rxCfg.EventFD.Notify() // Wait for them to be reposted. for j := 0; j < 2; j++ { @@ -706,7 +706,7 @@ func TestReceivePostingIsFull(t *testing.T) { first := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for first buffer to be posted")) c.pushRxCompletion(first.Size, []queue.RxBuffer{first}) c.rxq.rx.Flush() - unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + c.rxCfg.EventFD.Notify() // Check that packet is received. c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet") @@ -715,7 +715,7 @@ func TestReceivePostingIsFull(t *testing.T) { second := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for second buffer to be posted")) c.pushRxCompletion(second.Size, []queue.RxBuffer{second}) c.rxq.rx.Flush() - unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + c.rxCfg.EventFD.Notify() // Check that no packet is received yet, as the worker is blocked trying // to repost. @@ -728,7 +728,7 @@ func TestReceivePostingIsFull(t *testing.T) { // Flush tx queue, which will allow the first buffer to be reposted, // and the second completion to be pulled. c.rxq.tx.Flush() - unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + c.rxCfg.EventFD.Notify() // Check that second packet completes. c.waitForPackets(1, time.After(time.Second), "Timeout waiting for second completed packet") @@ -750,7 +750,7 @@ func TestCloseWhileWaitingToPost(t *testing.T) { bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for initial buffer to be posted")) c.pushRxCompletion(bi.Size, []queue.RxBuffer{bi}) c.rxq.rx.Flush() - unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + c.rxCfg.EventFD.Notify() // Wait for packet to be indicated. c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet") diff --git a/pkg/tcpip/link/sharedmem/tx.go b/pkg/tcpip/link/sharedmem/tx.go index 5ffcb8ab4..35e5bff12 100644 --- a/pkg/tcpip/link/sharedmem/tx.go +++ b/pkg/tcpip/link/sharedmem/tx.go @@ -18,6 +18,7 @@ import ( "math" "golang.org/x/sys/unix" + "gvisor.dev/gvisor/pkg/eventfd" "gvisor.dev/gvisor/pkg/tcpip/buffer" "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue" ) @@ -32,7 +33,7 @@ type tx struct { q queue.Tx ids idManager bufs bufferManager - eventFD int + eventFD eventfd.Eventfd sharedDataFD int } @@ -148,7 +149,7 @@ func (t *tx) transmit(bufs ...buffer.View) bool { // notify writes to the tx.eventFD to indicate to the peer that there is data to // be read. func (t *tx) notify() { - unix.Write(t.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + t.eventFD.Notify() } // getBuffer returns a memory region mapped to the full contents of the given diff --git a/pkg/unet/BUILD b/pkg/unet/BUILD index 234125c38..8902be2d3 100644 --- a/pkg/unet/BUILD +++ b/pkg/unet/BUILD @@ -10,6 +10,7 @@ go_library( ], visibility = ["//visibility:public"], deps = [ + "//pkg/eventfd", "//pkg/sync", "@org_golang_x_sys//unix:go_default_library", ], diff --git a/pkg/unet/unet.go b/pkg/unet/unet.go index 40fa72925..0dc0c37bd 100644 --- a/pkg/unet/unet.go +++ b/pkg/unet/unet.go @@ -23,6 +23,7 @@ import ( "sync/atomic" "golang.org/x/sys/unix" + "gvisor.dev/gvisor/pkg/eventfd" "gvisor.dev/gvisor/pkg/sync" ) @@ -55,15 +56,6 @@ func socket(packet bool) (int, error) { return fd, nil } -// eventFD returns a new event FD with initial value 0. -func eventFD() (int, error) { - f, _, e := unix.Syscall(unix.SYS_EVENTFD2, 0, 0, 0) - if e != 0 { - return -1, e - } - return int(f), nil -} - // Socket is a connected unix domain socket. type Socket struct { // gate protects use of fd. @@ -78,7 +70,7 @@ type Socket struct { // efd is an event FD that is signaled when the socket is closing. // // efd is immutable and remains valid until Close/Release. - efd int + efd eventfd.Eventfd // race is an atomic variable used to avoid triggering the race // detector. See comment in SocketPair below. @@ -95,7 +87,7 @@ func NewSocket(fd int) (*Socket, error) { return nil, err } - efd, err := eventFD() + efd, err := eventfd.Create() if err != nil { return nil, err } @@ -110,16 +102,14 @@ func NewSocket(fd int) (*Socket, error) { // closing the event FD. func (s *Socket) finish() error { // Signal any blocked or future polls. - // - // N.B. eventfd writes must be 8 bytes. - if _, err := unix.Write(s.efd, []byte{1, 0, 0, 0, 0, 0, 0, 0}); err != nil { + if err := s.efd.Notify(); err != nil { return err } // Close the gate, blocking until all FD users leave. s.gate.Close() - return unix.Close(s.efd) + return s.efd.Close() } // Close closes the socket. diff --git a/pkg/unet/unet_unsafe.go b/pkg/unet/unet_unsafe.go index f0bf93ddd..ea281fec3 100644 --- a/pkg/unet/unet_unsafe.go +++ b/pkg/unet/unet_unsafe.go @@ -43,7 +43,7 @@ func (s *Socket) wait(write bool) error { }, { // The eventfd, signaled when we are closing. - Fd: int32(s.efd), + Fd: int32(s.efd.FD()), Events: unix.POLLIN, }, } |