summaryrefslogtreecommitdiffhomepage
path: root/pkg
diff options
context:
space:
mode:
authorKevin Krakauer <krakauer@google.com>2021-10-07 17:37:50 -0700
committergVisor bot <gvisor-bot@google.com>2021-10-07 17:41:20 -0700
commite44b100654ca639d11221e547384f699e461296d (patch)
tree950811fef6620dea99871f63e82a08cfa06849bf /pkg
parent487651ac46f302592ccffc9e5a4336a331010e42 (diff)
add convenient wrapper for eventfd
The same create/write/read pattern is copied around several places. It's easier to understand in a package with names and comments, and we can reuse the smart blocking code in package rawfile. PiperOrigin-RevId: 401647108
Diffstat (limited to 'pkg')
-rw-r--r--pkg/eventfd/BUILD22
-rw-r--r--pkg/eventfd/eventfd.go115
-rw-r--r--pkg/eventfd/eventfd_test.go75
-rw-r--r--pkg/sentry/hostmm/BUILD3
-rw-r--r--pkg/sentry/hostmm/hostmm.go42
-rw-r--r--pkg/tcpip/link/rawfile/rawfile_unsafe.go16
-rw-r--r--pkg/tcpip/link/sharedmem/BUILD1
-rw-r--r--pkg/tcpip/link/sharedmem/queuepair.go17
-rw-r--r--pkg/tcpip/link/sharedmem/rx.go26
-rw-r--r--pkg/tcpip/link/sharedmem/server_rx.go18
-rw-r--r--pkg/tcpip/link/sharedmem/server_tx.go16
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem.go10
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem_server.go3
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem_test.go14
-rw-r--r--pkg/tcpip/link/sharedmem/tx.go5
-rw-r--r--pkg/unet/BUILD1
-rw-r--r--pkg/unet/unet.go20
-rw-r--r--pkg/unet/unet_unsafe.go2
18 files changed, 283 insertions, 123 deletions
diff --git a/pkg/eventfd/BUILD b/pkg/eventfd/BUILD
new file mode 100644
index 000000000..02407cb99
--- /dev/null
+++ b/pkg/eventfd/BUILD
@@ -0,0 +1,22 @@
+load("//tools:defs.bzl", "go_library", "go_test")
+
+package(licenses = ["notice"])
+
+go_library(
+ name = "eventfd",
+ srcs = [
+ "eventfd.go",
+ ],
+ visibility = ["//:sandbox"],
+ deps = [
+ "//pkg/hostarch",
+ "//pkg/tcpip/link/rawfile",
+ "@org_golang_x_sys//unix:go_default_library",
+ ],
+)
+
+go_test(
+ name = "eventfd_test",
+ srcs = ["eventfd_test.go"],
+ library = ":eventfd",
+)
diff --git a/pkg/eventfd/eventfd.go b/pkg/eventfd/eventfd.go
new file mode 100644
index 000000000..acdac01b8
--- /dev/null
+++ b/pkg/eventfd/eventfd.go
@@ -0,0 +1,115 @@
+// Copyright 2021 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package eventfd wraps Linux's eventfd(2) syscall.
+package eventfd
+
+import (
+ "fmt"
+ "io"
+
+ "golang.org/x/sys/unix"
+ "gvisor.dev/gvisor/pkg/hostarch"
+ "gvisor.dev/gvisor/pkg/tcpip/link/rawfile"
+)
+
+const sizeofUint64 = 8
+
+// Eventfd represents a Linux eventfd object.
+type Eventfd struct {
+ fd int
+}
+
+// Create returns an initialized eventfd.
+func Create() (Eventfd, error) {
+ fd, _, err := unix.RawSyscall(unix.SYS_EVENTFD2, 0, 0, 0)
+ if err != 0 {
+ return Eventfd{}, fmt.Errorf("failed to create eventfd: %v", error(err))
+ }
+ if err := unix.SetNonblock(int(fd), true); err != nil {
+ unix.Close(int(fd))
+ return Eventfd{}, err
+ }
+ return Eventfd{int(fd)}, nil
+}
+
+// Wrap returns an initialized Eventfd using the provided fd.
+func Wrap(fd int) Eventfd {
+ return Eventfd{fd}
+}
+
+// Close closes the eventfd, after which it should not be used.
+func (ev Eventfd) Close() error {
+ return unix.Close(ev.fd)
+}
+
+// Dup copies the eventfd, calling dup(2) on the underlying file descriptor.
+func (ev Eventfd) Dup() (Eventfd, error) {
+ other, err := unix.Dup(ev.fd)
+ if err != nil {
+ return Eventfd{}, fmt.Errorf("failed to dup: %v", other)
+ }
+ return Eventfd{other}, nil
+}
+
+// Notify alerts other users of the eventfd. Users can receive alerts by
+// calling Wait or Read.
+func (ev Eventfd) Notify() error {
+ return ev.Write(1)
+}
+
+// Write writes a specific value to the eventfd.
+func (ev Eventfd) Write(val uint64) error {
+ var buf [sizeofUint64]byte
+ hostarch.ByteOrder.PutUint64(buf[:], val)
+ for {
+ n, err := unix.Write(ev.fd, buf[:])
+ if err == unix.EINTR {
+ continue
+ }
+ if n != sizeofUint64 {
+ panic(fmt.Sprintf("short write to eventfd: got %d bytes, wanted %d", n, sizeofUint64))
+ }
+ return err
+ }
+}
+
+// Wait blocks until eventfd is non-zero (i.e. someone calls Notify or Write).
+func (ev Eventfd) Wait() error {
+ _, err := ev.Read()
+ return err
+}
+
+// Read blocks until eventfd is non-zero (i.e. someone calls Notify or Write)
+// and returns the value read.
+func (ev Eventfd) Read() (uint64, error) {
+ var tmp [sizeofUint64]byte
+ n, err := rawfile.BlockingReadUntranslated(ev.fd, tmp[:])
+ if err != 0 {
+ return 0, err
+ }
+ if n == 0 {
+ return 0, io.EOF
+ }
+ if n != sizeofUint64 {
+ panic(fmt.Sprintf("short read from eventfd: got %d bytes, wanted %d", n, sizeofUint64))
+ }
+ return hostarch.ByteOrder.Uint64(tmp[:]), nil
+}
+
+// FD returns the underlying file descriptor. Use with care, as this breaks the
+// Eventfd abstraction.
+func (ev Eventfd) FD() int {
+ return ev.fd
+}
diff --git a/pkg/eventfd/eventfd_test.go b/pkg/eventfd/eventfd_test.go
new file mode 100644
index 000000000..96998d530
--- /dev/null
+++ b/pkg/eventfd/eventfd_test.go
@@ -0,0 +1,75 @@
+// Copyright 2021 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package eventfd
+
+import (
+ "testing"
+ "time"
+)
+
+func TestReadWrite(t *testing.T) {
+ efd, err := Create()
+ if err != nil {
+ t.Fatalf("failed to Create(): %v", err)
+ }
+ defer efd.Close()
+
+ // Make sure we can read actual values
+ const want = 343
+ if err := efd.Write(want); err != nil {
+ t.Fatalf("failed to write value: %d", want)
+ }
+
+ got, err := efd.Read()
+ if err != nil {
+ t.Fatalf("failed to read value: %v", err)
+ }
+ if got != want {
+ t.Fatalf("Read(): got %d, but wanted %d", got, want)
+ }
+}
+
+func TestWait(t *testing.T) {
+ efd, err := Create()
+ if err != nil {
+ t.Fatalf("failed to Create(): %v", err)
+ }
+ defer efd.Close()
+
+ // There's no way to test with certainty that Wait() blocks indefinitely, but
+ // as a best-effort we can wait a bit on it.
+ errCh := make(chan error)
+ go func() {
+ errCh <- efd.Wait()
+ }()
+ select {
+ case err := <-errCh:
+ t.Fatalf("Wait() returned without a call to Notify(): %v", err)
+ case <-time.After(500 * time.Millisecond):
+ }
+
+ // Notify and check that Wait() returned.
+ if err := efd.Notify(); err != nil {
+ t.Fatalf("Notify() failed: %v", err)
+ }
+ select {
+ case err := <-errCh:
+ if err != nil {
+ t.Fatalf("Read() failed: %v", err)
+ }
+ case <-time.After(5 * time.Second):
+ t.Fatalf("Read() did not return after Notify()")
+ }
+}
diff --git a/pkg/sentry/hostmm/BUILD b/pkg/sentry/hostmm/BUILD
index 66fa1ad40..03c8e2f38 100644
--- a/pkg/sentry/hostmm/BUILD
+++ b/pkg/sentry/hostmm/BUILD
@@ -12,8 +12,7 @@ go_library(
visibility = ["//pkg/sentry:internal"],
deps = [
"//pkg/abi/linux",
- "//pkg/fd",
- "//pkg/hostarch",
+ "//pkg/eventfd",
"//pkg/log",
"@org_golang_x_sys//unix:go_default_library",
],
diff --git a/pkg/sentry/hostmm/hostmm.go b/pkg/sentry/hostmm/hostmm.go
index 285ea9050..5df06a60f 100644
--- a/pkg/sentry/hostmm/hostmm.go
+++ b/pkg/sentry/hostmm/hostmm.go
@@ -21,9 +21,7 @@ import (
"os"
"path"
- "golang.org/x/sys/unix"
- "gvisor.dev/gvisor/pkg/fd"
- "gvisor.dev/gvisor/pkg/hostarch"
+ "gvisor.dev/gvisor/pkg/eventfd"
"gvisor.dev/gvisor/pkg/log"
)
@@ -54,7 +52,7 @@ func NotifyCurrentMemcgPressureCallback(f func(), level string) (func(), error)
}
defer eventControlFile.Close()
- eventFD, err := newEventFD()
+ eventFD, err := eventfd.Create()
if err != nil {
return nil, err
}
@@ -75,20 +73,11 @@ func NotifyCurrentMemcgPressureCallback(f func(), level string) (func(), error)
const stopVal = 1 << 63
stopCh := make(chan struct{})
go func() { // S/R-SAFE: f provides synchronization if necessary
- rw := fd.NewReadWriter(eventFD.FD())
- var buf [sizeofUint64]byte
for {
- n, err := rw.Read(buf[:])
+ val, err := eventFD.Read()
if err != nil {
- if err == unix.EINTR {
- continue
- }
panic(fmt.Sprintf("failed to read from memory pressure level eventfd: %v", err))
}
- if n != sizeofUint64 {
- panic(fmt.Sprintf("short read from memory pressure level eventfd: got %d bytes, wanted %d", n, sizeofUint64))
- }
- val := hostarch.ByteOrder.Uint64(buf[:])
if val >= stopVal {
// Assume this was due to the notifier's "destructor" (the
// function returned by NotifyCurrentMemcgPressureCallback
@@ -101,30 +90,7 @@ func NotifyCurrentMemcgPressureCallback(f func(), level string) (func(), error)
}
}()
return func() {
- rw := fd.NewReadWriter(eventFD.FD())
- var buf [sizeofUint64]byte
- hostarch.ByteOrder.PutUint64(buf[:], stopVal)
- for {
- n, err := rw.Write(buf[:])
- if err != nil {
- if err == unix.EINTR {
- continue
- }
- panic(fmt.Sprintf("failed to write to memory pressure level eventfd: %v", err))
- }
- if n != sizeofUint64 {
- panic(fmt.Sprintf("short write to memory pressure level eventfd: got %d bytes, wanted %d", n, sizeofUint64))
- }
- break
- }
+ eventFD.Write(stopVal)
<-stopCh
}, nil
}
-
-func newEventFD() (*fd.FD, error) {
- f, _, e := unix.Syscall(unix.SYS_EVENTFD2, 0, 0, 0)
- if e != 0 {
- return nil, fmt.Errorf("failed to create eventfd: %v", e)
- }
- return fd.New(int(f)), nil
-}
diff --git a/pkg/tcpip/link/rawfile/rawfile_unsafe.go b/pkg/tcpip/link/rawfile/rawfile_unsafe.go
index 87a0b9a62..e53789d92 100644
--- a/pkg/tcpip/link/rawfile/rawfile_unsafe.go
+++ b/pkg/tcpip/link/rawfile/rawfile_unsafe.go
@@ -152,10 +152,22 @@ type PollEvent struct {
// no data is available, it will block in a poll() syscall until the file
// descriptor becomes readable.
func BlockingRead(fd int, b []byte) (int, tcpip.Error) {
+ n, err := BlockingReadUntranslated(fd, b)
+ if err != 0 {
+ return n, TranslateErrno(err)
+ }
+ return n, nil
+}
+
+// BlockingReadUntranslated reads from a file descriptor that is set up as
+// non-blocking. If no data is available, it will block in a poll() syscall
+// until the file descriptor becomes readable. It returns the raw unix.Errno
+// value returned by the underlying syscalls.
+func BlockingReadUntranslated(fd int, b []byte) (int, unix.Errno) {
for {
n, _, e := unix.RawSyscall(unix.SYS_READ, uintptr(fd), uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)))
if e == 0 {
- return int(n), nil
+ return int(n), 0
}
event := PollEvent{
@@ -165,7 +177,7 @@ func BlockingRead(fd int, b []byte) (int, tcpip.Error) {
_, e = BlockingPoll(&event, 1, nil)
if e != 0 && e != unix.EINTR {
- return 0, TranslateErrno(e)
+ return 0, e
}
}
}
diff --git a/pkg/tcpip/link/sharedmem/BUILD b/pkg/tcpip/link/sharedmem/BUILD
index 6c35aeecf..f8076d83c 100644
--- a/pkg/tcpip/link/sharedmem/BUILD
+++ b/pkg/tcpip/link/sharedmem/BUILD
@@ -17,6 +17,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/cleanup",
+ "//pkg/eventfd",
"//pkg/log",
"//pkg/sync",
"//pkg/tcpip",
diff --git a/pkg/tcpip/link/sharedmem/queuepair.go b/pkg/tcpip/link/sharedmem/queuepair.go
index 5fa6d91f0..b12647fdd 100644
--- a/pkg/tcpip/link/sharedmem/queuepair.go
+++ b/pkg/tcpip/link/sharedmem/queuepair.go
@@ -22,6 +22,7 @@ import (
"io/ioutil"
"golang.org/x/sys/unix"
+ "gvisor.dev/gvisor/pkg/eventfd"
)
const (
@@ -116,25 +117,25 @@ type queueSizes struct {
func createQueueFDs(s queueSizes) (QueueConfig, error) {
success := false
- var fd uintptr
+ var eventFD eventfd.Eventfd
var dataFD, txPipeFD, rxPipeFD, sharedDataFD int
defer func() {
if success {
return
}
closeFDs(QueueConfig{
- EventFD: int(fd),
+ EventFD: eventFD,
DataFD: dataFD,
TxPipeFD: txPipeFD,
RxPipeFD: rxPipeFD,
SharedDataFD: sharedDataFD,
})
}()
- eventFD, _, errno := unix.RawSyscall(unix.SYS_EVENTFD2, 0, 0, 0)
- if errno != 0 {
- return QueueConfig{}, fmt.Errorf("eventfd failed: %v", error(errno))
+ eventFD, err := eventfd.Create()
+ if err != nil {
+ return QueueConfig{}, fmt.Errorf("eventfd failed: %v", err)
}
- dataFD, err := createFile(s.dataSize, false)
+ dataFD, err = createFile(s.dataSize, false)
if err != nil {
return QueueConfig{}, fmt.Errorf("failed to create dataFD: %s", err)
}
@@ -152,7 +153,7 @@ func createQueueFDs(s queueSizes) (QueueConfig, error) {
}
success = true
return QueueConfig{
- EventFD: int(eventFD),
+ EventFD: eventFD,
DataFD: dataFD,
TxPipeFD: txPipeFD,
RxPipeFD: rxPipeFD,
@@ -191,7 +192,7 @@ func createFile(size int64, initQueue bool) (fd int, err error) {
func closeFDs(c QueueConfig) {
unix.Close(c.DataFD)
- unix.Close(c.EventFD)
+ c.EventFD.Close()
unix.Close(c.TxPipeFD)
unix.Close(c.RxPipeFD)
unix.Close(c.SharedDataFD)
diff --git a/pkg/tcpip/link/sharedmem/rx.go b/pkg/tcpip/link/sharedmem/rx.go
index 399317335..87747dcc7 100644
--- a/pkg/tcpip/link/sharedmem/rx.go
+++ b/pkg/tcpip/link/sharedmem/rx.go
@@ -21,7 +21,7 @@ import (
"sync/atomic"
"golang.org/x/sys/unix"
- "gvisor.dev/gvisor/pkg/tcpip/link/rawfile"
+ "gvisor.dev/gvisor/pkg/eventfd"
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
)
@@ -30,7 +30,7 @@ type rx struct {
data []byte
sharedData []byte
q queue.Rx
- eventFD int
+ eventFD eventfd.Eventfd
}
// init initializes all state needed by the rx queue based on the information
@@ -68,7 +68,7 @@ func (r *rx) init(mtu uint32, c *QueueConfig) error {
// Duplicate the eventFD so that caller can close it but we can still
// use it.
- efd, err := unix.Dup(c.EventFD)
+ efd, err := c.EventFD.Dup()
if err != nil {
unix.Munmap(txPipe)
unix.Munmap(rxPipe)
@@ -77,16 +77,6 @@ func (r *rx) init(mtu uint32, c *QueueConfig) error {
return err
}
- // Set the eventfd as non-blocking.
- if err := unix.SetNonblock(efd, true); err != nil {
- unix.Munmap(txPipe)
- unix.Munmap(rxPipe)
- unix.Munmap(data)
- unix.Munmap(sharedData)
- unix.Close(efd)
- return err
- }
-
// Initialize state based on buffers.
r.q.Init(txPipe, rxPipe, sharedDataPointer(sharedData))
r.data = data
@@ -105,13 +95,13 @@ func (r *rx) cleanup() {
unix.Munmap(r.data)
unix.Munmap(r.sharedData)
- unix.Close(r.eventFD)
+ r.eventFD.Close()
}
// notify writes to the tx.eventFD to indicate to the peer that there is data to
// be read.
func (r *rx) notify() {
- unix.Write(r.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+ r.eventFD.Notify()
}
// postAndReceive posts the provided buffers (if any), and then tries to read
@@ -128,8 +118,7 @@ func (r *rx) postAndReceive(b []queue.RxBuffer, stopRequested *uint32) ([]queue.
if len(b) != 0 && !r.q.PostBuffers(b) {
r.q.EnableNotification()
for !r.q.PostBuffers(b) {
- var tmp [8]byte
- rawfile.BlockingRead(r.eventFD, tmp[:])
+ r.eventFD.Wait()
if atomic.LoadUint32(stopRequested) != 0 {
r.q.DisableNotification()
return nil, 0
@@ -153,8 +142,7 @@ func (r *rx) postAndReceive(b []queue.RxBuffer, stopRequested *uint32) ([]queue.
}
// Wait for notification.
- var tmp [8]byte
- rawfile.BlockingRead(r.eventFD, tmp[:])
+ r.eventFD.Wait()
if atomic.LoadUint32(stopRequested) != 0 {
r.q.DisableNotification()
return nil, 0
diff --git a/pkg/tcpip/link/sharedmem/server_rx.go b/pkg/tcpip/link/sharedmem/server_rx.go
index 2ad8bf650..6ea21ffd1 100644
--- a/pkg/tcpip/link/sharedmem/server_rx.go
+++ b/pkg/tcpip/link/sharedmem/server_rx.go
@@ -20,7 +20,7 @@ package sharedmem
import (
"golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/cleanup"
- "gvisor.dev/gvisor/pkg/tcpip/link/rawfile"
+ "gvisor.dev/gvisor/pkg/eventfd"
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
)
@@ -38,7 +38,7 @@ type serverRx struct {
data []byte
// eventFD is used to notify the peer when transmission is completed.
- eventFD int
+ eventFD eventfd.Eventfd
// sharedData the memory region to use to enable/disable notifications.
sharedData []byte
@@ -78,16 +78,11 @@ func (s *serverRx) init(c *QueueConfig) error {
// Duplicate the eventFD so that caller can close it but we can still
// use it.
- efd, err := unix.Dup(c.EventFD)
+ efd, err := c.EventFD.Dup()
if err != nil {
return err
}
- cu.Add(func() { unix.Close(efd) })
-
- // Set the eventfd as non-blocking.
- if err := unix.SetNonblock(efd, true); err != nil {
- return err
- }
+ cu.Add(func() { efd.Close() })
s.packetPipe.Init(packetPipeMem)
s.completionPipe.Init(completionPipeMem)
@@ -104,7 +99,7 @@ func (s *serverRx) cleanup() {
unix.Munmap(s.completionPipe.Bytes())
unix.Munmap(s.data)
unix.Munmap(s.sharedData)
- unix.Close(s.eventFD)
+ s.eventFD.Close()
}
// completionNotificationSize is size in bytes of a completion notification sent
@@ -143,6 +138,5 @@ func (s *serverRx) receive() []byte {
}
func (s *serverRx) waitForPackets() {
- var tmp [8]byte
- rawfile.BlockingRead(s.eventFD, tmp[:])
+ s.eventFD.Wait()
}
diff --git a/pkg/tcpip/link/sharedmem/server_tx.go b/pkg/tcpip/link/sharedmem/server_tx.go
index 9370b2a46..13a82903f 100644
--- a/pkg/tcpip/link/sharedmem/server_tx.go
+++ b/pkg/tcpip/link/sharedmem/server_tx.go
@@ -20,6 +20,7 @@ package sharedmem
import (
"golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/cleanup"
+ "gvisor.dev/gvisor/pkg/eventfd"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
@@ -40,7 +41,7 @@ type serverTx struct {
data []byte
// eventFD is used to notify the peer when fill requests are fulfilled.
- eventFD int
+ eventFD eventfd.Eventfd
// sharedData the memory region to use to enable/disable notifications.
sharedData []byte
@@ -80,16 +81,11 @@ func (s *serverTx) init(c *QueueConfig) error {
// Duplicate the eventFD so that caller can close it but we can still
// use it.
- efd, err := unix.Dup(c.EventFD)
+ efd, err := c.EventFD.Dup()
if err != nil {
return err
}
- cu.Add(func() { unix.Close(efd) })
-
- // Set the eventfd as non-blocking.
- if err := unix.SetNonblock(efd, true); err != nil {
- return err
- }
+ cu.Add(func() { efd.Close() })
cu.Release()
@@ -107,7 +103,7 @@ func (s *serverTx) cleanup() {
unix.Munmap(s.completionPipe.Bytes())
unix.Munmap(s.data)
unix.Munmap(s.sharedData)
- unix.Close(s.eventFD)
+ s.eventFD.Close()
}
// fillPacket copies the data in the provided views into buffers pulled from the
@@ -175,5 +171,5 @@ func (s *serverTx) transmit(views []buffer.View) bool {
}
func (s *serverTx) notify() {
- unix.Write(s.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+ s.eventFD.Notify()
}
diff --git a/pkg/tcpip/link/sharedmem/sharedmem.go b/pkg/tcpip/link/sharedmem/sharedmem.go
index e2a8c4863..bcb37a465 100644
--- a/pkg/tcpip/link/sharedmem/sharedmem.go
+++ b/pkg/tcpip/link/sharedmem/sharedmem.go
@@ -27,7 +27,7 @@ import (
"fmt"
"sync/atomic"
- "golang.org/x/sys/unix"
+ "gvisor.dev/gvisor/pkg/eventfd"
"gvisor.dev/gvisor/pkg/log"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip"
@@ -49,7 +49,7 @@ type QueueConfig struct {
// EventFD is a file descriptor for the event that is signaled when
// data is becomes available in this queue.
- EventFD int
+ EventFD eventfd.Eventfd
// TxPipeFD is a file descriptor for the tx pipe associated with the
// queue.
@@ -70,7 +70,7 @@ type QueueConfig struct {
// of FDs matches when reconstructing the config when serialized or sent
// as part of control messages.
func (q *QueueConfig) FDs() []int {
- return []int{q.DataFD, q.EventFD, q.TxPipeFD, q.RxPipeFD, q.SharedDataFD}
+ return []int{q.DataFD, q.EventFD.FD(), q.TxPipeFD, q.RxPipeFD, q.SharedDataFD}
}
// QueueConfigFromFDs constructs a QueueConfig out of a slice of ints where each
@@ -84,7 +84,7 @@ func QueueConfigFromFDs(fds []int) (QueueConfig, error) {
}
return QueueConfig{
DataFD: fds[0],
- EventFD: fds[1],
+ EventFD: eventfd.Wrap(fds[1]),
TxPipeFD: fds[2],
RxPipeFD: fds[3],
SharedDataFD: fds[4],
@@ -223,7 +223,7 @@ func (e *endpoint) Close() {
// Tell dispatch goroutine to stop, then write to the eventfd so that
// it wakes up in case it's sleeping.
atomic.StoreUint32(&e.stopRequested, 1)
- unix.Write(e.rx.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+ e.rx.eventFD.Notify()
// Cleanup the queues inline if the worker hasn't started yet; we also
// know it won't start from now on because stopRequested is set to 1.
diff --git a/pkg/tcpip/link/sharedmem/sharedmem_server.go b/pkg/tcpip/link/sharedmem/sharedmem_server.go
index 16feb64b2..ccc84989d 100644
--- a/pkg/tcpip/link/sharedmem/sharedmem_server.go
+++ b/pkg/tcpip/link/sharedmem/sharedmem_server.go
@@ -20,7 +20,6 @@ package sharedmem
import (
"sync/atomic"
- "golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
@@ -122,7 +121,7 @@ func (e *serverEndpoint) Close() {
// Tell dispatch goroutine to stop, then write to the eventfd so that it wakes
// up in case it's sleeping.
atomic.StoreUint32(&e.stopRequested, 1)
- unix.Write(e.rx.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+ e.rx.eventFD.Notify()
// Cleanup the queues inline if the worker hasn't started yet; we also know it
// won't start from now on because stopRequested is set to 1.
diff --git a/pkg/tcpip/link/sharedmem/sharedmem_test.go b/pkg/tcpip/link/sharedmem/sharedmem_test.go
index bb094da63..66ffc33b8 100644
--- a/pkg/tcpip/link/sharedmem/sharedmem_test.go
+++ b/pkg/tcpip/link/sharedmem/sharedmem_test.go
@@ -619,7 +619,7 @@ func TestSimpleReceive(t *testing.T) {
// Push completion.
c.pushRxCompletion(uint32(len(contents)), bufs)
c.rxq.rx.Flush()
- unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+ c.rxCfg.EventFD.Notify()
// Wait for packet to be received, then check it.
c.waitForPackets(1, time.After(5*time.Second), "Timeout waiting for packet")
@@ -665,7 +665,7 @@ func TestRxBuffersReposted(t *testing.T) {
// Complete the buffer.
c.pushRxCompletion(buffers[i].Size, buffers[i:][:1])
c.rxq.rx.Flush()
- unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+ c.rxCfg.EventFD.Notify()
// Wait for it to be reposted.
bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted"))
@@ -681,7 +681,7 @@ func TestRxBuffersReposted(t *testing.T) {
// Complete with two buffers.
c.pushRxCompletion(2*bufferSize, buffers[2*i:][:2])
c.rxq.rx.Flush()
- unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+ c.rxCfg.EventFD.Notify()
// Wait for them to be reposted.
for j := 0; j < 2; j++ {
@@ -706,7 +706,7 @@ func TestReceivePostingIsFull(t *testing.T) {
first := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for first buffer to be posted"))
c.pushRxCompletion(first.Size, []queue.RxBuffer{first})
c.rxq.rx.Flush()
- unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+ c.rxCfg.EventFD.Notify()
// Check that packet is received.
c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
@@ -715,7 +715,7 @@ func TestReceivePostingIsFull(t *testing.T) {
second := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for second buffer to be posted"))
c.pushRxCompletion(second.Size, []queue.RxBuffer{second})
c.rxq.rx.Flush()
- unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+ c.rxCfg.EventFD.Notify()
// Check that no packet is received yet, as the worker is blocked trying
// to repost.
@@ -728,7 +728,7 @@ func TestReceivePostingIsFull(t *testing.T) {
// Flush tx queue, which will allow the first buffer to be reposted,
// and the second completion to be pulled.
c.rxq.tx.Flush()
- unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+ c.rxCfg.EventFD.Notify()
// Check that second packet completes.
c.waitForPackets(1, time.After(time.Second), "Timeout waiting for second completed packet")
@@ -750,7 +750,7 @@ func TestCloseWhileWaitingToPost(t *testing.T) {
bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for initial buffer to be posted"))
c.pushRxCompletion(bi.Size, []queue.RxBuffer{bi})
c.rxq.rx.Flush()
- unix.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+ c.rxCfg.EventFD.Notify()
// Wait for packet to be indicated.
c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
diff --git a/pkg/tcpip/link/sharedmem/tx.go b/pkg/tcpip/link/sharedmem/tx.go
index 5ffcb8ab4..35e5bff12 100644
--- a/pkg/tcpip/link/sharedmem/tx.go
+++ b/pkg/tcpip/link/sharedmem/tx.go
@@ -18,6 +18,7 @@ import (
"math"
"golang.org/x/sys/unix"
+ "gvisor.dev/gvisor/pkg/eventfd"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
"gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
)
@@ -32,7 +33,7 @@ type tx struct {
q queue.Tx
ids idManager
bufs bufferManager
- eventFD int
+ eventFD eventfd.Eventfd
sharedDataFD int
}
@@ -148,7 +149,7 @@ func (t *tx) transmit(bufs ...buffer.View) bool {
// notify writes to the tx.eventFD to indicate to the peer that there is data to
// be read.
func (t *tx) notify() {
- unix.Write(t.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
+ t.eventFD.Notify()
}
// getBuffer returns a memory region mapped to the full contents of the given
diff --git a/pkg/unet/BUILD b/pkg/unet/BUILD
index 234125c38..8902be2d3 100644
--- a/pkg/unet/BUILD
+++ b/pkg/unet/BUILD
@@ -10,6 +10,7 @@ go_library(
],
visibility = ["//visibility:public"],
deps = [
+ "//pkg/eventfd",
"//pkg/sync",
"@org_golang_x_sys//unix:go_default_library",
],
diff --git a/pkg/unet/unet.go b/pkg/unet/unet.go
index 40fa72925..0dc0c37bd 100644
--- a/pkg/unet/unet.go
+++ b/pkg/unet/unet.go
@@ -23,6 +23,7 @@ import (
"sync/atomic"
"golang.org/x/sys/unix"
+ "gvisor.dev/gvisor/pkg/eventfd"
"gvisor.dev/gvisor/pkg/sync"
)
@@ -55,15 +56,6 @@ func socket(packet bool) (int, error) {
return fd, nil
}
-// eventFD returns a new event FD with initial value 0.
-func eventFD() (int, error) {
- f, _, e := unix.Syscall(unix.SYS_EVENTFD2, 0, 0, 0)
- if e != 0 {
- return -1, e
- }
- return int(f), nil
-}
-
// Socket is a connected unix domain socket.
type Socket struct {
// gate protects use of fd.
@@ -78,7 +70,7 @@ type Socket struct {
// efd is an event FD that is signaled when the socket is closing.
//
// efd is immutable and remains valid until Close/Release.
- efd int
+ efd eventfd.Eventfd
// race is an atomic variable used to avoid triggering the race
// detector. See comment in SocketPair below.
@@ -95,7 +87,7 @@ func NewSocket(fd int) (*Socket, error) {
return nil, err
}
- efd, err := eventFD()
+ efd, err := eventfd.Create()
if err != nil {
return nil, err
}
@@ -110,16 +102,14 @@ func NewSocket(fd int) (*Socket, error) {
// closing the event FD.
func (s *Socket) finish() error {
// Signal any blocked or future polls.
- //
- // N.B. eventfd writes must be 8 bytes.
- if _, err := unix.Write(s.efd, []byte{1, 0, 0, 0, 0, 0, 0, 0}); err != nil {
+ if err := s.efd.Notify(); err != nil {
return err
}
// Close the gate, blocking until all FD users leave.
s.gate.Close()
- return unix.Close(s.efd)
+ return s.efd.Close()
}
// Close closes the socket.
diff --git a/pkg/unet/unet_unsafe.go b/pkg/unet/unet_unsafe.go
index f0bf93ddd..ea281fec3 100644
--- a/pkg/unet/unet_unsafe.go
+++ b/pkg/unet/unet_unsafe.go
@@ -43,7 +43,7 @@ func (s *Socket) wait(write bool) error {
},
{
// The eventfd, signaled when we are closing.
- Fd: int32(s.efd),
+ Fd: int32(s.efd.FD()),
Events: unix.POLLIN,
},
}