diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/control/server/server.go | 5 | ||||
-rw-r--r-- | pkg/sentry/control/pprof.go | 287 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/ext/inode.go | 4 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/fuse/fusefs.go | 3 | ||||
-rw-r--r-- | pkg/sentry/fsimpl/pipefs/pipefs.go | 4 | ||||
-rw-r--r-- | pkg/sentry/kernel/pipe/vfs.go | 28 | ||||
-rw-r--r-- | pkg/sentry/socket/control/BUILD | 16 | ||||
-rw-r--r-- | pkg/sentry/socket/control/control.go | 10 | ||||
-rw-r--r-- | pkg/sentry/socket/control/control_test.go | 59 | ||||
-rw-r--r-- | pkg/sentry/syscalls/linux/sys_socket.go | 2 | ||||
-rw-r--r-- | pkg/sentry/syscalls/linux/vfs2/pipe.go | 5 | ||||
-rw-r--r-- | pkg/sentry/syscalls/linux/vfs2/socket.go | 2 | ||||
-rw-r--r-- | pkg/state/tests/register_test.go | 11 | ||||
-rw-r--r-- | pkg/state/tests/struct_test.go | 11 | ||||
-rw-r--r-- | pkg/state/types.go | 71 | ||||
-rw-r--r-- | pkg/test/dockerutil/container.go | 106 | ||||
-rw-r--r-- | pkg/test/dockerutil/dockerutil.go | 10 | ||||
-rw-r--r-- | pkg/test/dockerutil/profile.go | 161 | ||||
-rw-r--r-- | pkg/test/dockerutil/profile_test.go | 59 | ||||
-rw-r--r-- | pkg/urpc/urpc.go | 27 |
20 files changed, 534 insertions, 347 deletions
diff --git a/pkg/control/server/server.go b/pkg/control/server/server.go index 41abe1f2d..629dae8f4 100644 --- a/pkg/control/server/server.go +++ b/pkg/control/server/server.go @@ -67,9 +67,10 @@ func (s *Server) Wait() { // and the server should not be used afterwards. func (s *Server) Stop() { s.socket.Close() - s.wg.Wait() + s.Wait() - // This will cause existing clients to be terminated safely. + // This will cause existing clients to be terminated safely. If the + // registered handlers have a Stop callback, it will be called. s.server.Stop() } diff --git a/pkg/sentry/control/pprof.go b/pkg/sentry/control/pprof.go index 91b8fb44f..2f3664c57 100644 --- a/pkg/sentry/control/pprof.go +++ b/pkg/sentry/control/pprof.go @@ -15,10 +15,10 @@ package control import ( - "errors" "runtime" "runtime/pprof" "runtime/trace" + "time" "gvisor.dev/gvisor/pkg/fd" "gvisor.dev/gvisor/pkg/sentry/kernel" @@ -26,184 +26,263 @@ import ( "gvisor.dev/gvisor/pkg/urpc" ) -var errNoOutput = errors.New("no output writer provided") +// Profile includes profile-related RPC stubs. It provides a way to +// control the built-in runtime profiling facilities. +// +// The profile object must be instantied via NewProfile. +type Profile struct { + // kernel is the kernel under profile. It's immutable. + kernel *kernel.Kernel -// ProfileOpts contains options for the StartCPUProfile/Goroutine RPC call. -type ProfileOpts struct { - // File is the filesystem path for the profile. - File string `json:"path"` + // cpuMu protects CPU profiling. + cpuMu sync.Mutex - // FilePayload is the destination for the profiling output. - urpc.FilePayload + // blockMu protects block profiling. + blockMu sync.Mutex + + // mutexMu protects mutex profiling. + mutexMu sync.Mutex + + // traceMu protects trace profiling. + traceMu sync.Mutex + + // done is closed when profiling is done. + done chan struct{} } -// Profile includes profile-related RPC stubs. It provides a way to -// control the built-in pprof facility in sentry via sentryctl. -// -// The following options to sentryctl are added: -// -// - collect CPU profile on-demand. -// sentryctl -pid <pid> pprof-cpu-start -// sentryctl -pid <pid> pprof-cpu-stop -// -// - dump out the stack trace of current go routines. -// sentryctl -pid <pid> pprof-goroutine -type Profile struct { - // Kernel is the kernel under profile. It's immutable. - Kernel *kernel.Kernel +// NewProfile returns a new Profile object. +func NewProfile(k *kernel.Kernel) *Profile { + return &Profile{ + kernel: k, + done: make(chan struct{}), + } +} - // mu protects the fields below. - mu sync.Mutex +// Stop implements urpc.Stopper.Stop. +func (p *Profile) Stop() { + close(p.done) +} - // cpuFile is the current CPU profile output file. - cpuFile *fd.FD +// CPUProfileOpts contains options specifically for CPU profiles. +type CPUProfileOpts struct { + // FilePayload is the destination for the profiling output. + urpc.FilePayload - // traceFile is the current execution trace output file. - traceFile *fd.FD + // Duration is the duration of the profile. + Duration time.Duration `json:"duration"` } -// StartCPUProfile is an RPC stub which starts recording the CPU profile in a -// file. -func (p *Profile) StartCPUProfile(o *ProfileOpts, _ *struct{}) error { +// CPU is an RPC stub which collects a CPU profile. +func (p *Profile) CPU(o *CPUProfileOpts, _ *struct{}) error { if len(o.FilePayload.Files) < 1 { - return errNoOutput + return nil // Allowed. } - output, err := fd.NewFromFile(o.FilePayload.Files[0]) - if err != nil { - return err - } + output := o.FilePayload.Files[0] + defer output.Close() - p.mu.Lock() - defer p.mu.Unlock() + p.cpuMu.Lock() + defer p.cpuMu.Unlock() // Returns an error if profiling is already started. if err := pprof.StartCPUProfile(output); err != nil { - output.Close() return err } + defer pprof.StopCPUProfile() + + // Collect the profile. + select { + case <-time.After(o.Duration): + case <-p.done: + } - p.cpuFile = output return nil } -// StopCPUProfile is an RPC stub which stops the CPU profiling and flush out the -// profile data. It takes no argument. -func (p *Profile) StopCPUProfile(_, _ *struct{}) error { - p.mu.Lock() - defer p.mu.Unlock() - - if p.cpuFile == nil { - return errors.New("CPU profiling not started") - } +// HeapProfileOpts contains options specifically for heap profiles. +type HeapProfileOpts struct { + // FilePayload is the destination for the profiling output. + urpc.FilePayload - pprof.StopCPUProfile() - p.cpuFile.Close() - p.cpuFile = nil - return nil + // Delay is the sleep time, similar to Duration. This may + // not affect the data collected however, as the heap will + // continue only the memory associated with the last alloc. + Delay time.Duration `json:"delay"` } -// HeapProfile generates a heap profile for the sentry. -func (p *Profile) HeapProfile(o *ProfileOpts, _ *struct{}) error { +// Heap generates a heap profile. +func (p *Profile) Heap(o *HeapProfileOpts, _ *struct{}) error { if len(o.FilePayload.Files) < 1 { - return errNoOutput + return nil // Allowed. } + output := o.FilePayload.Files[0] defer output.Close() - runtime.GC() // Get up-to-date statistics. - if err := pprof.WriteHeapProfile(output); err != nil { - return err + + // Wait for the given delay. + select { + case <-time.After(o.Delay): + case <-p.done: } - return nil + + // Get up-to-date statistics. + runtime.GC() + + // Write the given profile. + return pprof.WriteHeapProfile(output) +} + +// GoroutineProfileOpts contains options specifically for goroutine profiles. +type GoroutineProfileOpts struct { + // FilePayload is the destination for the profiling output. + urpc.FilePayload } -// GoroutineProfile is an RPC stub which dumps out the stack trace for all -// running goroutines. -func (p *Profile) GoroutineProfile(o *ProfileOpts, _ *struct{}) error { +// Goroutine dumps out the stack trace for all running goroutines. +func (p *Profile) Goroutine(o *GoroutineProfileOpts, _ *struct{}) error { if len(o.FilePayload.Files) < 1 { - return errNoOutput + return nil // Allowed. } + output := o.FilePayload.Files[0] defer output.Close() - if err := pprof.Lookup("goroutine").WriteTo(output, 2); err != nil { - return err - } - return nil + + return pprof.Lookup("goroutine").WriteTo(output, 2) +} + +// BlockProfileOpts contains options specifically for block profiles. +type BlockProfileOpts struct { + // FilePayload is the destination for the profiling output. + urpc.FilePayload + + // Duration is the duration of the profile. + Duration time.Duration `json:"duration"` + + // Rate is the block profile rate. + Rate int `json:"rate"` } -// BlockProfile is an RPC stub which dumps out the stack trace that led to -// blocking on synchronization primitives. -func (p *Profile) BlockProfile(o *ProfileOpts, _ *struct{}) error { +// Block dumps a blocking profile. +func (p *Profile) Block(o *BlockProfileOpts, _ *struct{}) error { if len(o.FilePayload.Files) < 1 { - return errNoOutput + return nil // Allowed. } + output := o.FilePayload.Files[0] defer output.Close() - if err := pprof.Lookup("block").WriteTo(output, 0); err != nil { - return err + + p.blockMu.Lock() + defer p.blockMu.Unlock() + + // Always set the rate. We then wait to collect a profile at this rate, + // and disable when we're done. Note that the default here is 10%, which + // will record a stacktrace 10% of the time when blocking occurs. Since + // these events should not be super frequent, we expect this to achieve + // a reasonable balance between collecting the data we need and imposing + // a high performance cost (e.g. skewing even the CPU profile). + rate := 10 + if o.Rate != 0 { + rate = o.Rate } - return nil + runtime.SetBlockProfileRate(rate) + defer runtime.SetBlockProfileRate(0) + + // Collect the profile. + select { + case <-time.After(o.Duration): + case <-p.done: + } + + return pprof.Lookup("block").WriteTo(output, 0) +} + +// MutexProfileOpts contains options specifically for mutex profiles. +type MutexProfileOpts struct { + // FilePayload is the destination for the profiling output. + urpc.FilePayload + + // Duration is the duration of the profile. + Duration time.Duration `json:"duration"` + + // Fraction is the mutex profile fraction. + Fraction int `json:"fraction"` } -// MutexProfile is an RPC stub which dumps out the stack trace of holders of -// contended mutexes. -func (p *Profile) MutexProfile(o *ProfileOpts, _ *struct{}) error { +// Mutex dumps a mutex profile. +func (p *Profile) Mutex(o *MutexProfileOpts, _ *struct{}) error { if len(o.FilePayload.Files) < 1 { - return errNoOutput + return nil // Allowed. } + output := o.FilePayload.Files[0] defer output.Close() - if err := pprof.Lookup("mutex").WriteTo(output, 0); err != nil { - return err + + p.mutexMu.Lock() + defer p.mutexMu.Unlock() + + // Always set the fraction. Like the block rate above, we use + // a default rate of 10% for the same reasons. + fraction := 10 + if o.Fraction != 0 { + fraction = o.Fraction } - return nil + runtime.SetMutexProfileFraction(fraction) + defer runtime.SetMutexProfileFraction(0) + + // Collect the profile. + select { + case <-time.After(o.Duration): + case <-p.done: + } + + return pprof.Lookup("mutex").WriteTo(output, 0) } -// StartTrace is an RPC stub which starts collection of an execution trace. -func (p *Profile) StartTrace(o *ProfileOpts, _ *struct{}) error { +// TraceProfileOpts contains options specifically for traces. +type TraceProfileOpts struct { + // FilePayload is the destination for the profiling output. + urpc.FilePayload + + // Duration is the duration of the profile. + Duration time.Duration `json:"duration"` +} + +// Trace is an RPC stub which starts collection of an execution trace. +func (p *Profile) Trace(o *TraceProfileOpts, _ *struct{}) error { if len(o.FilePayload.Files) < 1 { - return errNoOutput + return nil // Allowed. } output, err := fd.NewFromFile(o.FilePayload.Files[0]) if err != nil { return err } + defer output.Close() - p.mu.Lock() - defer p.mu.Unlock() + p.traceMu.Lock() + defer p.traceMu.Unlock() // Returns an error if profiling is already started. if err := trace.Start(output); err != nil { output.Close() return err } + defer trace.Stop() // Ensure all trace contexts are registered. - p.Kernel.RebuildTraceContexts() - - p.traceFile = output - return nil -} - -// StopTrace is an RPC stub which stops collection of an ongoing execution -// trace and flushes the trace data. It takes no argument. -func (p *Profile) StopTrace(_, _ *struct{}) error { - p.mu.Lock() - defer p.mu.Unlock() + p.kernel.RebuildTraceContexts() - if p.traceFile == nil { - return errors.New("execution tracing not started") + // Wait for the trace. + select { + case <-time.After(o.Duration): + case <-p.done: } // Similarly to the case above, if tasks have not ended traces, we will // lose information. Thus we need to rebuild the tasks in order to have // complete information. This will not lose information if multiple // traces are overlapping. - p.Kernel.RebuildTraceContexts() + p.kernel.RebuildTraceContexts() - trace.Stop() - p.traceFile.Close() - p.traceFile = nil return nil } diff --git a/pkg/sentry/fsimpl/ext/inode.go b/pkg/sentry/fsimpl/ext/inode.go index 9009ba3c7..4a555bf72 100644 --- a/pkg/sentry/fsimpl/ext/inode.go +++ b/pkg/sentry/fsimpl/ext/inode.go @@ -200,7 +200,9 @@ func (in *inode) open(rp *vfs.ResolvingPath, vfsd *vfs.Dentry, opts *vfs.OpenOpt } var fd symlinkFD fd.LockFD.Init(&in.locks) - fd.vfsfd.Init(&fd, opts.Flags, mnt, vfsd, &vfs.FileDescriptionOptions{}) + if err := fd.vfsfd.Init(&fd, opts.Flags, mnt, vfsd, &vfs.FileDescriptionOptions{}); err != nil { + return nil, err + } return &fd.vfsfd, nil default: panic(fmt.Sprintf("unknown inode type: %T", in.impl)) diff --git a/pkg/sentry/fsimpl/fuse/fusefs.go b/pkg/sentry/fsimpl/fuse/fusefs.go index 3af807a21..204d8d143 100644 --- a/pkg/sentry/fsimpl/fuse/fusefs.go +++ b/pkg/sentry/fsimpl/fuse/fusefs.go @@ -129,6 +129,9 @@ func (fsType FilesystemType) GetFilesystem(ctx context.Context, vfsObj *vfs.Virt return nil, nil, syserror.EINVAL } fuseFDGeneric := kernelTask.GetFileVFS2(int32(deviceDescriptor)) + if fuseFDGeneric == nil { + return nil, nil, syserror.EINVAL + } defer fuseFDGeneric.DecRef(ctx) fuseFD, ok := fuseFDGeneric.Impl().(*DeviceFD) if !ok { diff --git a/pkg/sentry/fsimpl/pipefs/pipefs.go b/pkg/sentry/fsimpl/pipefs/pipefs.go index 0ecb592cf..429733c10 100644 --- a/pkg/sentry/fsimpl/pipefs/pipefs.go +++ b/pkg/sentry/fsimpl/pipefs/pipefs.go @@ -164,11 +164,11 @@ func (i *inode) StatFS(ctx context.Context, fs *vfs.Filesystem) (linux.Statfs, e // and write ends of a newly-created pipe, as for pipe(2) and pipe2(2). // // Preconditions: mnt.Filesystem() must have been returned by NewFilesystem(). -func NewConnectedPipeFDs(ctx context.Context, mnt *vfs.Mount, flags uint32) (*vfs.FileDescription, *vfs.FileDescription) { +func NewConnectedPipeFDs(ctx context.Context, mnt *vfs.Mount, flags uint32) (*vfs.FileDescription, *vfs.FileDescription, error) { fs := mnt.Filesystem().Impl().(*filesystem) inode := newInode(ctx, fs) var d kernfs.Dentry d.Init(&fs.Filesystem, inode) defer d.DecRef(ctx) - return inode.pipe.ReaderWriterPair(mnt, d.VFSDentry(), flags) + return inode.pipe.ReaderWriterPair(ctx, mnt, d.VFSDentry(), flags) } diff --git a/pkg/sentry/kernel/pipe/vfs.go b/pkg/sentry/kernel/pipe/vfs.go index 7b23cbe86..2d47d2e82 100644 --- a/pkg/sentry/kernel/pipe/vfs.go +++ b/pkg/sentry/kernel/pipe/vfs.go @@ -63,10 +63,19 @@ func NewVFSPipe(isNamed bool, sizeBytes int64) *VFSPipe { // ReaderWriterPair returns read-only and write-only FDs for vp. // // Preconditions: statusFlags should not contain an open access mode. -func (vp *VFSPipe) ReaderWriterPair(mnt *vfs.Mount, vfsd *vfs.Dentry, statusFlags uint32) (*vfs.FileDescription, *vfs.FileDescription) { +func (vp *VFSPipe) ReaderWriterPair(ctx context.Context, mnt *vfs.Mount, vfsd *vfs.Dentry, statusFlags uint32) (*vfs.FileDescription, *vfs.FileDescription, error) { // Connected pipes share the same locks. locks := &vfs.FileLocks{} - return vp.newFD(mnt, vfsd, linux.O_RDONLY|statusFlags, locks), vp.newFD(mnt, vfsd, linux.O_WRONLY|statusFlags, locks) + r, err := vp.newFD(mnt, vfsd, linux.O_RDONLY|statusFlags, locks) + if err != nil { + return nil, nil, err + } + w, err := vp.newFD(mnt, vfsd, linux.O_WRONLY|statusFlags, locks) + if err != nil { + r.DecRef(ctx) + return nil, nil, err + } + return r, w, nil } // Allocate implements vfs.FileDescriptionImpl.Allocate. @@ -85,7 +94,10 @@ func (vp *VFSPipe) Open(ctx context.Context, mnt *vfs.Mount, vfsd *vfs.Dentry, s return nil, syserror.EINVAL } - fd := vp.newFD(mnt, vfsd, statusFlags, locks) + fd, err := vp.newFD(mnt, vfsd, statusFlags, locks) + if err != nil { + return nil, err + } // Named pipes have special blocking semantics during open: // @@ -137,16 +149,18 @@ func (vp *VFSPipe) Open(ctx context.Context, mnt *vfs.Mount, vfsd *vfs.Dentry, s } // Preconditions: vp.mu must be held. -func (vp *VFSPipe) newFD(mnt *vfs.Mount, vfsd *vfs.Dentry, statusFlags uint32, locks *vfs.FileLocks) *vfs.FileDescription { +func (vp *VFSPipe) newFD(mnt *vfs.Mount, vfsd *vfs.Dentry, statusFlags uint32, locks *vfs.FileLocks) (*vfs.FileDescription, error) { fd := &VFSPipeFD{ pipe: &vp.pipe, } fd.LockFD.Init(locks) - fd.vfsfd.Init(fd, statusFlags, mnt, vfsd, &vfs.FileDescriptionOptions{ + if err := fd.vfsfd.Init(fd, statusFlags, mnt, vfsd, &vfs.FileDescriptionOptions{ DenyPRead: true, DenyPWrite: true, UseDentryMetadata: true, - }) + }); err != nil { + return nil, err + } switch { case fd.vfsfd.IsReadable() && fd.vfsfd.IsWritable(): @@ -160,7 +174,7 @@ func (vp *VFSPipe) newFD(mnt *vfs.Mount, vfsd *vfs.Dentry, statusFlags uint32, l panic("invalid pipe flags: must be readable, writable, or both") } - return &fd.vfsfd + return &fd.vfsfd, nil } // VFSPipeFD implements vfs.FileDescriptionImpl for pipes. It also implements diff --git a/pkg/sentry/socket/control/BUILD b/pkg/sentry/socket/control/BUILD index fb7c5dc61..ebcc891b3 100644 --- a/pkg/sentry/socket/control/BUILD +++ b/pkg/sentry/socket/control/BUILD @@ -1,4 +1,4 @@ -load("//tools:defs.bzl", "go_library") +load("//tools:defs.bzl", "go_library", "go_test") package(licenses = ["notice"]) @@ -26,3 +26,17 @@ go_library( "//pkg/usermem", ], ) + +go_test( + name = "control_test", + size = "small", + srcs = ["control_test.go"], + library = ":control", + deps = [ + "//pkg/abi/linux", + "//pkg/binary", + "//pkg/sentry/socket", + "//pkg/usermem", + "@com_github_google_go_cmp//cmp:go_default_library", + ], +) diff --git a/pkg/sentry/socket/control/control.go b/pkg/sentry/socket/control/control.go index ff6b71802..65b556489 100644 --- a/pkg/sentry/socket/control/control.go +++ b/pkg/sentry/socket/control/control.go @@ -463,7 +463,7 @@ func CmsgsSpace(t *kernel.Task, cmsgs socket.ControlMessages) int { } // Parse parses a raw socket control message into portable objects. -func Parse(t *kernel.Task, socketOrEndpoint interface{}, buf []byte) (socket.ControlMessages, error) { +func Parse(t *kernel.Task, socketOrEndpoint interface{}, buf []byte, width uint) (socket.ControlMessages, error) { var ( cmsgs socket.ControlMessages fds linux.ControlMessageRights @@ -487,10 +487,6 @@ func Parse(t *kernel.Task, socketOrEndpoint interface{}, buf []byte) (socket.Con i += linux.SizeOfControlMessageHeader length := int(h.Length) - linux.SizeOfControlMessageHeader - // The use of t.Arch().Width() is analogous to Linux's use of - // sizeof(long) in CMSG_ALIGN. - width := t.Arch().Width() - switch h.Level { case linux.SOL_SOCKET: switch h.Type { @@ -526,8 +522,10 @@ func Parse(t *kernel.Task, socketOrEndpoint interface{}, buf []byte) (socket.Con if length < linux.SizeOfTimeval { return socket.ControlMessages{}, syserror.EINVAL } + var ts linux.Timeval + binary.Unmarshal(buf[i:i+linux.SizeOfTimeval], usermem.ByteOrder, &ts) + cmsgs.IP.Timestamp = ts.ToNsecCapped() cmsgs.IP.HasTimestamp = true - binary.Unmarshal(buf[i:i+linux.SizeOfTimeval], usermem.ByteOrder, &cmsgs.IP.Timestamp) i += binary.AlignUp(length, width) default: diff --git a/pkg/sentry/socket/control/control_test.go b/pkg/sentry/socket/control/control_test.go new file mode 100644 index 000000000..d40a4cc85 --- /dev/null +++ b/pkg/sentry/socket/control/control_test.go @@ -0,0 +1,59 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package control provides internal representations of socket control +// messages. +package control + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "gvisor.dev/gvisor/pkg/abi/linux" + "gvisor.dev/gvisor/pkg/binary" + "gvisor.dev/gvisor/pkg/sentry/socket" + "gvisor.dev/gvisor/pkg/usermem" +) + +func TestParse(t *testing.T) { + // Craft the control message to parse. + length := linux.SizeOfControlMessageHeader + linux.SizeOfTimeval + hdr := linux.ControlMessageHeader{ + Length: uint64(length), + Level: linux.SOL_SOCKET, + Type: linux.SO_TIMESTAMP, + } + buf := make([]byte, 0, length) + buf = binary.Marshal(buf, usermem.ByteOrder, &hdr) + ts := linux.Timeval{ + Sec: 2401, + Usec: 343, + } + buf = binary.Marshal(buf, usermem.ByteOrder, &ts) + + cmsg, err := Parse(nil, nil, buf, 8 /* width */) + if err != nil { + t.Fatalf("Parse(_, _, %+v, _): %v", cmsg, err) + } + + want := socket.ControlMessages{ + IP: socket.IPControlMessages{ + HasTimestamp: true, + Timestamp: ts.ToNsecCapped(), + }, + } + if diff := cmp.Diff(want, cmsg); diff != "" { + t.Errorf("unexpected message parsed, (-want, +got):\n%s", diff) + } +} diff --git a/pkg/sentry/syscalls/linux/sys_socket.go b/pkg/sentry/syscalls/linux/sys_socket.go index 4adfa6637..fe45225c1 100644 --- a/pkg/sentry/syscalls/linux/sys_socket.go +++ b/pkg/sentry/syscalls/linux/sys_socket.go @@ -1030,7 +1030,7 @@ func sendSingleMsg(t *kernel.Task, s socket.Socket, file *fs.File, msgPtr userme return 0, err } - controlMessages, err := control.Parse(t, s, controlData) + controlMessages, err := control.Parse(t, s, controlData, t.Arch().Width()) if err != nil { return 0, err } diff --git a/pkg/sentry/syscalls/linux/vfs2/pipe.go b/pkg/sentry/syscalls/linux/vfs2/pipe.go index ee38fdca0..6986e39fe 100644 --- a/pkg/sentry/syscalls/linux/vfs2/pipe.go +++ b/pkg/sentry/syscalls/linux/vfs2/pipe.go @@ -42,7 +42,10 @@ func pipe2(t *kernel.Task, addr usermem.Addr, flags int32) error { if flags&^(linux.O_NONBLOCK|linux.O_CLOEXEC) != 0 { return syserror.EINVAL } - r, w := pipefs.NewConnectedPipeFDs(t, t.Kernel().PipeMount(), uint32(flags&linux.O_NONBLOCK)) + r, w, err := pipefs.NewConnectedPipeFDs(t, t.Kernel().PipeMount(), uint32(flags&linux.O_NONBLOCK)) + if err != nil { + return err + } defer r.DecRef(t) defer w.DecRef(t) diff --git a/pkg/sentry/syscalls/linux/vfs2/socket.go b/pkg/sentry/syscalls/linux/vfs2/socket.go index 987012acc..f5795b4a8 100644 --- a/pkg/sentry/syscalls/linux/vfs2/socket.go +++ b/pkg/sentry/syscalls/linux/vfs2/socket.go @@ -1033,7 +1033,7 @@ func sendSingleMsg(t *kernel.Task, s socket.SocketVFS2, file *vfs.FileDescriptio return 0, err } - controlMessages, err := control.Parse(t, s, controlData) + controlMessages, err := control.Parse(t, s, controlData, t.Arch().Width()) if err != nil { return 0, err } diff --git a/pkg/state/tests/register_test.go b/pkg/state/tests/register_test.go index c829753cc..75bdbfc6e 100644 --- a/pkg/state/tests/register_test.go +++ b/pkg/state/tests/register_test.go @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +// +build race + package tests import ( @@ -165,3 +167,12 @@ func TestRegisterBad(t *testing.T) { } } + +func TestRegisterTypeOnlyStruct(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("Register did not panic") + } + }() + state.Register((*typeOnlyEmptyStruct)(nil)) +} diff --git a/pkg/state/tests/struct_test.go b/pkg/state/tests/struct_test.go index c91c2c032..9826f1ee9 100644 --- a/pkg/state/tests/struct_test.go +++ b/pkg/state/tests/struct_test.go @@ -17,8 +17,6 @@ package tests import ( "math/rand" "testing" - - "gvisor.dev/gvisor/pkg/state" ) func TestEmptyStruct(t *testing.T) { @@ -58,15 +56,6 @@ func TestEmptyStruct(t *testing.T) { }) } -func TestRegisterTypeOnlyStruct(t *testing.T) { - defer func() { - if r := recover(); r == nil { - t.Errorf("Register did not panic") - } - }() - state.Register((*typeOnlyEmptyStruct)(nil)) -} - func TestEmbeddedPointers(t *testing.T) { // Give each int64 a random value to prevent Go from using // runtime.staticuint64s, which confounds tests for struct duplication. diff --git a/pkg/state/types.go b/pkg/state/types.go index 84aed8732..420675880 100644 --- a/pkg/state/types.go +++ b/pkg/state/types.go @@ -329,47 +329,48 @@ var reverseTypeDatabase = map[reflect.Type]string{} // This must be called on init and only done once. func Register(t Type) { name := t.StateTypeName() - fields := t.StateFields() - assertValidType(name, fields) - // Register must always be called on pointers. typ := reflect.TypeOf(t) - if typ.Kind() != reflect.Ptr { - Failf("Register must be called on pointers") + if raceEnabled { + assertValidType(name, t.StateFields()) + // Register must always be called on pointers. + if typ.Kind() != reflect.Ptr { + Failf("Register must be called on pointers") + } } typ = typ.Elem() - if typ.Kind() == reflect.Struct { - // All registered structs must implement SaverLoader. We allow - // the registration is non-struct types with just the Type - // interface, but we need to call StateSave/StateLoad methods - // on aggregate types. - if _, ok := t.(SaverLoader); !ok { - Failf("struct %T does not implement SaverLoader", t) + if raceEnabled { + if typ.Kind() == reflect.Struct { + // All registered structs must implement SaverLoader. We allow + // the registration is non-struct types with just the Type + // interface, but we need to call StateSave/StateLoad methods + // on aggregate types. + if _, ok := t.(SaverLoader); !ok { + Failf("struct %T does not implement SaverLoader", t) + } + } else { + // Non-structs must not have any fields. We don't support + // calling StateSave/StateLoad methods on any non-struct types. + // If custom behavior is required, these types should be + // wrapped in a structure of some kind. + if fields := t.StateFields(); len(fields) != 0 { + Failf("non-struct %T has non-zero fields %v", t, fields) + } + // We don't allow non-structs to implement StateSave/StateLoad + // methods, because they won't be called and it's confusing. + if _, ok := t.(SaverLoader); ok { + Failf("non-struct %T implements SaverLoader", t) + } } - } else { - // Non-structs must not have any fields. We don't support - // calling StateSave/StateLoad methods on any non-struct types. - // If custom behavior is required, these types should be - // wrapped in a structure of some kind. - if len(fields) != 0 { - Failf("non-struct %T has non-zero fields %v", t, fields) + if _, ok := primitiveTypeDatabase[name]; ok { + Failf("conflicting primitiveTypeDatabase entry for %T: used by primitive", t) } - // We don't allow non-structs to implement StateSave/StateLoad - // methods, because they won't be called and it's confusing. - if _, ok := t.(SaverLoader); ok { - Failf("non-struct %T implements SaverLoader", t) + if _, ok := globalTypeDatabase[name]; ok { + Failf("conflicting globalTypeDatabase entries for %T: name conflict", t) + } + if name == interfaceType { + Failf("conflicting name for %T: matches interfaceType", t) } - } - if _, ok := primitiveTypeDatabase[name]; ok { - Failf("conflicting primitiveTypeDatabase entry for %T: used by primitive", t) - } - if _, ok := globalTypeDatabase[name]; ok { - Failf("conflicting globalTypeDatabase entries for %T: name conflict", t) - } - if name == interfaceType { - Failf("conflicting name for %T: matches interfaceType", t) - } - globalTypeDatabase[name] = typ - if raceEnabled { reverseTypeDatabase[typ] = name } + globalTypeDatabase[name] = typ } diff --git a/pkg/test/dockerutil/container.go b/pkg/test/dockerutil/container.go index 2bf0a22ff..7bacb70d3 100644 --- a/pkg/test/dockerutil/container.go +++ b/pkg/test/dockerutil/container.go @@ -55,11 +55,8 @@ type Container struct { copyErr error cleanups []func() - // Profiles are profiles added to this container. They contain methods - // that are run after Creation, Start, and Cleanup of this Container, along - // a handle to restart the profile. Generally, tests/benchmarks using - // profiles need to run as root. - profiles []Profile + // profile is the profiling hook associated with this container. + profile *profile } // RunOpts are options for running a container. @@ -105,22 +102,7 @@ type RunOpts struct { Links []string } -// MakeContainer sets up the struct for a Docker container. -// -// Names of containers will be unique. -// Containers will check flags for profiling requests. -func MakeContainer(ctx context.Context, logger testutil.Logger) *Container { - c := MakeNativeContainer(ctx, logger) - c.runtime = *runtime - if p := MakePprofFromFlags(c); p != nil { - c.AddProfile(p) - } - return c -} - -// MakeNativeContainer sets up the struct for a DockerContainer using runc. Native -// containers aren't profiled. -func MakeNativeContainer(ctx context.Context, logger testutil.Logger) *Container { +func makeContainer(ctx context.Context, logger testutil.Logger, runtime string) *Container { // Slashes are not allowed in container names. name := testutil.RandomID(logger.Name()) name = strings.ReplaceAll(name, "/", "-") @@ -132,29 +114,32 @@ func MakeNativeContainer(ctx context.Context, logger testutil.Logger) *Container return &Container{ logger: logger, Name: name, - runtime: "", + runtime: runtime, client: client, } } -// AddProfile adds a profile to this container. -func (c *Container) AddProfile(p Profile) { - c.profiles = append(c.profiles, p) +// MakeContainer constructs a suitable Container object. +// +// The runtime used is determined by the runtime flag. +// +// Containers will check flags for profiling requests. +func MakeContainer(ctx context.Context, logger testutil.Logger) *Container { + return makeContainer(ctx, logger, *runtime) } -// RestartProfiles calls Restart on all profiles for this container. -func (c *Container) RestartProfiles() error { - for _, profile := range c.profiles { - if err := profile.Restart(c); err != nil { - return err - } - } - return nil +// MakeNativeContainer constructs a suitable Container object. +// +// The runtime used will be the system default. +// +// Native containers aren't profiled. +func MakeNativeContainer(ctx context.Context, logger testutil.Logger) *Container { + return makeContainer(ctx, logger, "" /*runtime*/) } // Spawn is analogous to 'docker run -d'. func (c *Container) Spawn(ctx context.Context, r RunOpts, args ...string) error { - if err := c.create(ctx, c.config(r, args), c.hostConfig(r), nil); err != nil { + if err := c.create(ctx, r.Image, c.config(r, args), c.hostConfig(r), nil); err != nil { return err } return c.Start(ctx) @@ -167,7 +152,7 @@ func (c *Container) SpawnProcess(ctx context.Context, r RunOpts, args ...string) config.Tty = true config.OpenStdin = true - if err := c.CreateFrom(ctx, config, hostconf, netconf); err != nil { + if err := c.CreateFrom(ctx, r.Image, config, hostconf, netconf); err != nil { return Process{}, err } @@ -194,7 +179,7 @@ func (c *Container) SpawnProcess(ctx context.Context, r RunOpts, args ...string) // Run is analogous to 'docker run'. func (c *Container) Run(ctx context.Context, r RunOpts, args ...string) (string, error) { - if err := c.create(ctx, c.config(r, args), c.hostConfig(r), nil); err != nil { + if err := c.create(ctx, r.Image, c.config(r, args), c.hostConfig(r), nil); err != nil { return "", err } @@ -221,26 +206,26 @@ func (c *Container) MakeLink(target string) string { } // CreateFrom creates a container from the given configs. -func (c *Container) CreateFrom(ctx context.Context, conf *container.Config, hostconf *container.HostConfig, netconf *network.NetworkingConfig) error { - return c.create(ctx, conf, hostconf, netconf) +func (c *Container) CreateFrom(ctx context.Context, profileImage string, conf *container.Config, hostconf *container.HostConfig, netconf *network.NetworkingConfig) error { + return c.create(ctx, profileImage, conf, hostconf, netconf) } // Create is analogous to 'docker create'. func (c *Container) Create(ctx context.Context, r RunOpts, args ...string) error { - return c.create(ctx, c.config(r, args), c.hostConfig(r), nil) + return c.create(ctx, r.Image, c.config(r, args), c.hostConfig(r), nil) } -func (c *Container) create(ctx context.Context, conf *container.Config, hostconf *container.HostConfig, netconf *network.NetworkingConfig) error { +func (c *Container) create(ctx context.Context, profileImage string, conf *container.Config, hostconf *container.HostConfig, netconf *network.NetworkingConfig) error { + if c.runtime != "" { + // Use the image name as provided here; which normally represents the + // unmodified "basic/alpine" image name. This should be easy to grok. + c.profileInit(profileImage) + } cont, err := c.client.ContainerCreate(ctx, conf, hostconf, nil, c.Name) if err != nil { return err } c.id = cont.ID - for _, profile := range c.profiles { - if err := profile.OnCreate(c); err != nil { - return fmt.Errorf("OnCreate method failed with: %v", err) - } - } return nil } @@ -286,11 +271,13 @@ func (c *Container) Start(ctx context.Context) error { if err := c.client.ContainerStart(ctx, c.id, types.ContainerStartOptions{}); err != nil { return fmt.Errorf("ContainerStart failed: %v", err) } - for _, profile := range c.profiles { - if err := profile.OnStart(c); err != nil { - return fmt.Errorf("OnStart method failed: %v", err) + + if c.profile != nil { + if err := c.profile.Start(c); err != nil { + c.logger.Logf("profile.Start failed: %v", err) } } + return nil } @@ -442,6 +429,7 @@ func (c *Container) Status(ctx context.Context) (types.ContainerState, error) { // Wait waits for the container to exit. func (c *Container) Wait(ctx context.Context) error { + defer c.stopProfiling() statusChan, errChan := c.client.ContainerWait(ctx, c.id, container.WaitConditionNotRunning) select { case err := <-errChan: @@ -499,8 +487,20 @@ func (c *Container) WaitForOutputSubmatch(ctx context.Context, pattern string, t } } +// stopProfiling stops profiling. +func (c *Container) stopProfiling() { + if c.profile != nil { + if err := c.profile.Stop(c); err != nil { + // This most likely means that the runtime for the container + // was too short to connect and actually get a profile. + c.logger.Logf("warning: profile.Stop failed: %v", err) + } + } +} + // Kill kills the container. func (c *Container) Kill(ctx context.Context) error { + defer c.stopProfiling() return c.client.ContainerKill(ctx, c.id, "") } @@ -517,14 +517,6 @@ func (c *Container) Remove(ctx context.Context) error { // CleanUp kills and deletes the container (best effort). func (c *Container) CleanUp(ctx context.Context) { - // Execute profile cleanups before the container goes down. - for _, profile := range c.profiles { - profile.OnCleanUp(c) - } - - // Forget profiles. - c.profiles = nil - // Execute all cleanups. We execute cleanups here to close any // open connections to the container before closing. Open connections // can cause Kill and Remove to hang. @@ -538,10 +530,12 @@ func (c *Container) CleanUp(ctx context.Context) { // Just log; can't do anything here. c.logger.Logf("error killing container %q: %v", c.Name, err) } + // Remove the image. if err := c.Remove(ctx); err != nil { c.logger.Logf("error removing container %q: %v", c.Name, err) } + // Forget all mounts. c.mounts = nil } diff --git a/pkg/test/dockerutil/dockerutil.go b/pkg/test/dockerutil/dockerutil.go index 7027df1a5..a40005799 100644 --- a/pkg/test/dockerutil/dockerutil.go +++ b/pkg/test/dockerutil/dockerutil.go @@ -49,15 +49,11 @@ var ( // pprofBaseDir allows the user to change the directory to which profiles are // written. By default, profiles will appear under: // /tmp/profile/RUNTIME/CONTAINER_NAME/*.pprof. - pprofBaseDir = flag.String("pprof-dir", "/tmp/profile", "base directory in: BASEDIR/RUNTIME/CONTINER_NAME/FILENAME (e.g. /tmp/profile/runtime/mycontainer/cpu.pprof)") - - // duration is the max duration `runsc debug` will run and capture profiles. - // If the container's clean up method is called prior to duration, the - // profiling process will be killed. - duration = flag.Duration("pprof-duration", 10*time.Second, "duration to run the profile in seconds") + pprofBaseDir = flag.String("pprof-dir", "/tmp/profile", "base directory in: BASEDIR/RUNTIME/CONTINER_NAME/FILENAME (e.g. /tmp/profile/runtime/mycontainer/cpu.pprof)") + pprofDuration = flag.Duration("pprof-duration", time.Hour, "profiling duration (automatically stopped at container exit)") // The below flags enable each type of profile. Multiple profiles can be - // enabled for each run. + // enabled for each run. The profile will be collected from the start. pprofBlock = flag.Bool("pprof-block", false, "enables block profiling with runsc debug") pprofCPU = flag.Bool("pprof-cpu", false, "enables CPU profiling with runsc debug") pprofHeap = flag.Bool("pprof-heap", false, "enables heap profiling with runsc debug") diff --git a/pkg/test/dockerutil/profile.go b/pkg/test/dockerutil/profile.go index 55f9496cd..5cad3e959 100644 --- a/pkg/test/dockerutil/profile.go +++ b/pkg/test/dockerutil/profile.go @@ -17,72 +17,64 @@ package dockerutil import ( "context" "fmt" - "io" "os" "os/exec" "path/filepath" + "syscall" "time" ) -// Profile represents profile-like operations on a container, -// such as running perf or pprof. It is meant to be added to containers -// such that the container type calls the Profile during its lifecycle. -type Profile interface { - // OnCreate is called just after the container is created when the container - // has a valid ID (e.g. c.ID()). - OnCreate(c *Container) error - - // OnStart is called just after the container is started when the container - // has a valid Pid (e.g. c.SandboxPid()). - OnStart(c *Container) error - - // Restart restarts the Profile on request. - Restart(c *Container) error - - // OnCleanUp is called during the container's cleanup method. - // Cleanups should just log errors if they have them. - OnCleanUp(c *Container) error -} - -// Pprof is for running profiles with 'runsc debug'. Pprof workloads -// should be run as root and ONLY against runsc sandboxes. The runtime -// should have --profile set as an option in /etc/docker/daemon.json in -// order for profiling to work with Pprof. -type Pprof struct { - BasePath string // path to put profiles - BlockProfile bool - CPUProfile bool - HeapProfile bool - MutexProfile bool - Duration time.Duration // duration to run profiler e.g. '10s' or '1m'. - shouldRun bool - cmd *exec.Cmd - stdout io.ReadCloser - stderr io.ReadCloser +// profile represents profile-like operations on a container. +// +// It is meant to be added to containers such that the container type calls +// the profile during its lifecycle. Standard implementations are below. + +// profile is for running profiles with 'runsc debug'. +type profile struct { + BasePath string + Types []string + Duration time.Duration + cmd *exec.Cmd } -// MakePprofFromFlags makes a Pprof profile from flags. -func MakePprofFromFlags(c *Container) *Pprof { - if !(*pprofBlock || *pprofCPU || *pprofHeap || *pprofMutex) { - return nil +// profileInit initializes a profile object, if required. +// +// N.B. The profiling filename initialized here will use the *image* +// name, and not the unique container name. This is intentional. Most +// of the time, profiling will be used for benchmarks. Benchmarks will +// be run iteratively until a sufficiently large N is reached. It is +// useful in this context to overwrite previous runs, and generate a +// single profile result for the final test. +func (c *Container) profileInit(image string) { + if !*pprofBlock && !*pprofCPU && !*pprofMutex && !*pprofHeap { + return // Nothing to do. + } + c.profile = &profile{ + BasePath: filepath.Join(*pprofBaseDir, c.runtime, c.logger.Name(), image), + Duration: *pprofDuration, + } + if *pprofCPU { + c.profile.Types = append(c.profile.Types, "cpu") } - return &Pprof{ - BasePath: filepath.Join(*pprofBaseDir, c.runtime, c.Name), - BlockProfile: *pprofBlock, - CPUProfile: *pprofCPU, - HeapProfile: *pprofHeap, - MutexProfile: *pprofMutex, - Duration: *duration, + if *pprofHeap { + c.profile.Types = append(c.profile.Types, "heap") + } + if *pprofMutex { + c.profile.Types = append(c.profile.Types, "mutex") + } + if *pprofBlock { + c.profile.Types = append(c.profile.Types, "block") } } -// OnCreate implements Profile.OnCreate. -func (p *Pprof) OnCreate(c *Container) error { - return os.MkdirAll(p.BasePath, 0755) -} +// createProcess creates the collection process. +func (p *profile) createProcess(c *Container) error { + // Ensure our directory exists. + if err := os.MkdirAll(p.BasePath, 0755); err != nil { + return err + } -// OnStart implements Profile.OnStart. -func (p *Pprof) OnStart(c *Container) error { + // Find the runtime to invoke. path, err := RuntimePath() if err != nil { return fmt.Errorf("failed to get runtime path: %v", err) @@ -90,58 +82,63 @@ func (p *Pprof) OnStart(c *Container) error { // The root directory of this container's runtime. root := fmt.Sprintf("--root=/var/run/docker/runtime-%s/moby", c.runtime) - // Format is `runsc --root=rootdir debug --profile-*=file --duration=* containerID`. + + // Format is `runsc --root=rootdir debug --profile-*=file --duration=24h containerID`. args := []string{root, "debug"} - args = append(args, p.makeProfileArgs(c)...) + for _, profileArg := range p.Types { + outputPath := filepath.Join(p.BasePath, fmt.Sprintf("%s.pprof", profileArg)) + args = append(args, fmt.Sprintf("--profile-%s=%s", profileArg, outputPath)) + } + args = append(args, fmt.Sprintf("--duration=%s", p.Duration)) // Or until container exits. + args = append(args, fmt.Sprintf("--delay=%s", p.Duration)) // Ditto. args = append(args, c.ID()) // Best effort wait until container is running. for now := time.Now(); time.Since(now) < 5*time.Second; { if status, err := c.Status(context.Background()); err != nil { return fmt.Errorf("failed to get status with: %v", err) - } else if status.Running { break } - time.Sleep(500 * time.Millisecond) + time.Sleep(100 * time.Millisecond) } p.cmd = exec.Command(path, args...) + p.cmd.Stderr = os.Stderr // Pass through errors. if err := p.cmd.Start(); err != nil { - return fmt.Errorf("process failed: %v", err) + return fmt.Errorf("start process failed: %v", err) } + return nil } -// Restart implements Profile.Restart. -func (p *Pprof) Restart(c *Container) error { - p.OnCleanUp(c) - return p.OnStart(c) +// killProcess kills the process, if running. +func (p *profile) killProcess() error { + if p.cmd != nil && p.cmd.Process != nil { + return p.cmd.Process.Signal(syscall.SIGTERM) + } + return nil } -// OnCleanUp implements Profile.OnCleanup -func (p *Pprof) OnCleanUp(c *Container) error { +// waitProcess waits for the process, if running. +func (p *profile) waitProcess() error { defer func() { p.cmd = nil }() - if p.cmd != nil && p.cmd.Process != nil && p.cmd.ProcessState != nil && !p.cmd.ProcessState.Exited() { - return p.cmd.Process.Kill() + if p.cmd != nil { + return p.cmd.Wait() } return nil } -// makeProfileArgs turns Pprof fields into runsc debug flags. -func (p *Pprof) makeProfileArgs(c *Container) []string { - var ret []string - if p.BlockProfile { - ret = append(ret, fmt.Sprintf("--profile-block=%s", filepath.Join(p.BasePath, "block.pprof"))) - } - if p.CPUProfile { - ret = append(ret, fmt.Sprintf("--profile-cpu=%s", filepath.Join(p.BasePath, "cpu.pprof"))) - } - if p.HeapProfile { - ret = append(ret, fmt.Sprintf("--profile-heap=%s", filepath.Join(p.BasePath, "heap.pprof"))) - } - if p.MutexProfile { - ret = append(ret, fmt.Sprintf("--profile-mutex=%s", filepath.Join(p.BasePath, "mutex.pprof"))) +// Start is called when profiling is started. +func (p *profile) Start(c *Container) error { + return p.createProcess(c) +} + +// Stop is called when profiling is started. +func (p *profile) Stop(c *Container) error { + killErr := p.killProcess() + waitErr := p.waitProcess() + if waitErr != nil && killErr != nil { + return killErr } - ret = append(ret, fmt.Sprintf("--duration=%s", p.Duration)) - return ret + return waitErr // Ignore okay wait, err kill. } diff --git a/pkg/test/dockerutil/profile_test.go b/pkg/test/dockerutil/profile_test.go index 8c4ffe483..4fe9ce15c 100644 --- a/pkg/test/dockerutil/profile_test.go +++ b/pkg/test/dockerutil/profile_test.go @@ -17,6 +17,7 @@ package dockerutil import ( "context" "fmt" + "io/ioutil" "os" "path/filepath" "testing" @@ -25,52 +26,60 @@ import ( type testCase struct { name string - pprof Pprof + profile profile expectedFiles []string } -func TestPprof(t *testing.T) { +func TestProfile(t *testing.T) { // Basepath and expected file names for each type of profile. - basePath := "/tmp/test/profile" + tmpDir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unable to create temporary directory: %v", err) + } + defer os.RemoveAll(tmpDir) + + // All expected names. + basePath := tmpDir block := "block.pprof" cpu := "cpu.pprof" - goprofle := "go.pprof" heap := "heap.pprof" mutex := "mutex.pprof" testCases := []testCase{ { - name: "Cpu", - pprof: Pprof{ - BasePath: basePath, - CPUProfile: true, - Duration: 2 * time.Second, + name: "One", + profile: profile{ + BasePath: basePath, + Types: []string{"cpu"}, + Duration: 2 * time.Second, }, expectedFiles: []string{cpu}, }, { name: "All", - pprof: Pprof{ - BasePath: basePath, - BlockProfile: true, - CPUProfile: true, - HeapProfile: true, - MutexProfile: true, - Duration: 2 * time.Second, + profile: profile{ + BasePath: basePath, + Types: []string{"block", "cpu", "heap", "mutex"}, + Duration: 2 * time.Second, }, - expectedFiles: []string{block, cpu, goprofle, heap, mutex}, + expectedFiles: []string{block, cpu, heap, mutex}, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ctx := context.Background() c := MakeContainer(ctx, t) + // Set basepath to include the container name so there are no conflicts. - tc.pprof.BasePath = filepath.Join(tc.pprof.BasePath, c.Name) - c.AddProfile(&tc.pprof) + localProfile := tc.profile // Copy it. + localProfile.BasePath = filepath.Join(localProfile.BasePath, tc.name) + + // Set directly on the container, to avoid flags. + c.profile = &localProfile func() { defer c.CleanUp(ctx) + // Start a container. if err := c.Spawn(ctx, RunOpts{ Image: "basic/alpine", @@ -83,24 +92,24 @@ func TestPprof(t *testing.T) { } // End early if the expected files exist and have data. - for start := time.Now(); time.Since(start) < tc.pprof.Duration; time.Sleep(500 * time.Millisecond) { - if err := checkFiles(tc); err == nil { + for start := time.Now(); time.Since(start) < localProfile.Duration; time.Sleep(100 * time.Millisecond) { + if err := checkFiles(localProfile.BasePath, tc.expectedFiles); err == nil { break } } }() // Check all expected files exist and have data. - if err := checkFiles(tc); err != nil { + if err := checkFiles(localProfile.BasePath, tc.expectedFiles); err != nil { t.Fatalf(err.Error()) } }) } } -func checkFiles(tc testCase) error { - for _, file := range tc.expectedFiles { - stat, err := os.Stat(filepath.Join(tc.pprof.BasePath, file)) +func checkFiles(basePath string, expectedFiles []string) error { + for _, file := range expectedFiles { + stat, err := os.Stat(filepath.Join(basePath, file)) if err != nil { return fmt.Errorf("stat failed with: %v", err) } else if stat.Size() < 1 { diff --git a/pkg/urpc/urpc.go b/pkg/urpc/urpc.go index 13b2ea314..0e9a829f6 100644 --- a/pkg/urpc/urpc.go +++ b/pkg/urpc/urpc.go @@ -170,6 +170,9 @@ type Server struct { // methods is the set of server methods. methods map[string]registeredMethod + // stoppers are all registered stoppers. + stoppers []Stopper + // clients is a map of clients. clients map[*unet.Socket]clientState @@ -195,6 +198,12 @@ func NewServerWithCallback(afterRPCCallback func()) *Server { } } +// Stopper is an optional interface, that when implemented, allows an object +// to have a callback executed when the server is shutting down. +type Stopper interface { + Stop() +} + // Register registers the given object as an RPC receiver. // // This functions is the same way as the built-in RPC package, but it does not @@ -206,6 +215,7 @@ func (s *Server) Register(obj interface{}) { defer s.mu.Unlock() typ := reflect.TypeOf(obj) + stopper, hasStop := obj.(Stopper) // If we got a pointer, deref it to the underlying object. We need this to // obtain the name of the underlying type. @@ -221,6 +231,10 @@ func (s *Server) Register(obj interface{}) { // Can't be anonymous. panic("type not named.") } + if hasStop && method.Name == "Stop" { + s.stoppers = append(s.stoppers, stopper) + continue // Legal stop method. + } prettyName := typDeref.Name() + "." + method.Name if _, ok := s.methods[prettyName]; ok { @@ -283,12 +297,10 @@ func (s *Server) handleOne(client *unet.Socket) error { // Client is dead. return err } + if s.afterRPCCallback != nil { + defer s.afterRPCCallback() + } - defer func() { - if s.afterRPCCallback != nil { - s.afterRPCCallback() - } - }() // Explicitly close all these files after the call. // // This is also explicitly a reference to the files after the call, @@ -450,6 +462,11 @@ func (s *Server) Stop() { // Wait for all outstanding requests. defer s.wg.Wait() + // Call any Stop callbacks. + for _, stopper := range s.stoppers { + stopper.Stop() + } + // Close all known clients. s.mu.Lock() defer s.mu.Unlock() |