diff options
Diffstat (limited to 'pkg/shim/v2/service.go')
-rw-r--r-- | pkg/shim/v2/service.go | 429 |
1 files changed, 279 insertions, 150 deletions
diff --git a/pkg/shim/v2/service.go b/pkg/shim/v2/service.go index 1534152fc..6aaf5fab8 100644 --- a/pkg/shim/v2/service.go +++ b/pkg/shim/v2/service.go @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package v2 implements Containerd Shim v2 interface. package v2 import ( "context" "fmt" - "io/ioutil" + "io" "os" "os/exec" "path/filepath" @@ -27,6 +28,7 @@ import ( "github.com/BurntSushi/toml" "github.com/containerd/cgroups" + cgroupsstats "github.com/containerd/cgroups/stats/v1" "github.com/containerd/console" "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" @@ -43,12 +45,13 @@ import ( "github.com/containerd/containerd/sys/reaper" "github.com/containerd/typeurl" "github.com/gogo/protobuf/types" + "github.com/sirupsen/logrus" "golang.org/x/sys/unix" + "gvisor.dev/gvisor/pkg/cleanup" "gvisor.dev/gvisor/pkg/shim/runsc" "gvisor.dev/gvisor/pkg/shim/v1/proc" "gvisor.dev/gvisor/pkg/shim/v1/utils" - "gvisor.dev/gvisor/pkg/shim/v2/options" "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions" "gvisor.dev/gvisor/runsc/specutils" ) @@ -65,55 +68,108 @@ var ( var _ = (taskAPI.TaskService)(&service{}) -// configFile is the default config file name. For containerd 1.2, -// we assume that a config.toml should exist in the runtime root. -const configFile = "config.toml" +const ( + // configFile is the default config file name. For containerd 1.2, + // we assume that a config.toml should exist in the runtime root. + configFile = "config.toml" + + // shimAddressPath is the relative path to a file that contains the address + // to the shim UDS. See service.shimAddress. + shimAddressPath = "address" +) // New returns a new shim service that can be used via GRPC. func New(ctx context.Context, id string, publisher shim.Publisher, cancel func()) (shim.Shim, error) { + log.L.Debugf("service.New, id: %s", id) + + var opts shim.Opts + if ctxOpts := ctx.Value(shim.OptsKey{}); ctxOpts != nil { + opts = ctxOpts.(shim.Opts) + } + ep, err := newOOMEpoller(publisher) if err != nil { return nil, err } go ep.run(ctx) s := &service{ - id: id, - context: ctx, - processes: make(map[string]process.Process), - events: make(chan interface{}, 128), - ec: proc.ExitCh, - oomPoller: ep, - cancel: cancel, - } - go s.processExits() - runsc.Monitor = reaper.Default + id: id, + processes: make(map[string]process.Process), + events: make(chan interface{}, 128), + ec: proc.ExitCh, + oomPoller: ep, + cancel: cancel, + genericOptions: opts, + } + go s.processExits(ctx) + runsc.Monitor = &runsc.LogMonitor{Next: reaper.Default} if err := s.initPlatform(); err != nil { cancel() return nil, fmt.Errorf("failed to initialized platform behavior: %w", err) } - go s.forward(publisher) + go s.forward(ctx, publisher) + + if address, err := shim.ReadAddress(shimAddressPath); err == nil { + s.shimAddress = address + } + return s, nil } -// service is the shim implementation of a remote shim over GRPC. +// service is the shim implementation of a remote shim over GRPC. It runs in 2 +// different modes: +// 1. Service: process runs for the life time of the container and receives +// calls described in shimapi.TaskService interface. +// 2. Tool: process is short lived and runs only to perform the requested +// operations and then exits. It implements the direct functions in +// shim.Shim interface. +// +// When the service is running, it saves a json file with state information so +// that commands sent to the tool can load the state and perform the operation. type service struct { mu sync.Mutex - context context.Context - task process.Process + // id is the container ID. + id string + + // bundle is a path provided by the caller on container creation. Store + // because it's needed in commands that don't receive bundle in the request. + bundle string + + // task is the main process that is running the container. + task *proc.Init + + // processes maps ExecId to processes running through exec. processes map[string]process.Process - events chan interface{} - platform stdio.Platform - opts options.Options - ec chan proc.Exit + + events chan interface{} + + // platform handles operations related to the console. + platform stdio.Platform + + // genericOptions are options that come from the shim interface and are common + // to all shims. + genericOptions shim.Opts + + // opts are configuration options specific for this shim. + opts options + + // ex gets notified whenever the container init process or an exec'd process + // exits from inside the sandbox. + ec chan proc.Exit + + // oomPoller monitors the sandbox's cgroup for OOM notifications. oomPoller *epoller - id string - bundle string + // cancel is a function that needs to be called before the shim stops. The + // function is provided by the caller to New(). cancel func() + + // shimAddress is the location of the UDS used to communicate to containerd. + shimAddress string } -func newCommand(ctx context.Context, containerdBinary, containerdAddress string) (*exec.Cmd, error) { +func (s *service) newCommand(ctx context.Context, containerdBinary, containerdAddress string) (*exec.Cmd, error) { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err @@ -131,6 +187,9 @@ func newCommand(ctx context.Context, containerdBinary, containerdAddress string) "-address", containerdAddress, "-publish-binary", containerdBinary, } + if s.genericOptions.Debug { + args = append(args, "-debug") + } cmd := exec.Command(self, args...) cmd.Dir = cwd cmd.Env = append(os.Environ(), "GOMAXPROCS=2") @@ -141,50 +200,78 @@ func newCommand(ctx context.Context, containerdBinary, containerdAddress string) } func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) { - cmd, err := newCommand(ctx, containerdBinary, containerdAddress) + log.L.Debugf("StartShim, id: %s, binary: %q, address: %q", id, containerdBinary, containerdAddress) + + cmd, err := s.newCommand(ctx, containerdBinary, containerdAddress) if err != nil { return "", err } - address, err := shim.SocketAddress(ctx, id) + address, err := shim.SocketAddress(ctx, containerdAddress, id) if err != nil { return "", err } socket, err := shim.NewSocket(address) if err != nil { - return "", err + // The only time where this would happen is if there is a bug and the socket + // was not cleaned up in the cleanup method of the shim or we are using the + // grouping functionality where the new process should be run with the same + // shim as an existing container. + if !shim.SocketEaddrinuse(err) { + return "", fmt.Errorf("create new shim socket: %w", err) + } + if shim.CanConnect(address) { + if err := shim.WriteAddress(shimAddressPath, address); err != nil { + return "", fmt.Errorf("write existing socket for shim: %w", err) + } + return address, nil + } + if err := shim.RemoveSocket(address); err != nil { + return "", fmt.Errorf("remove pre-existing socket: %w", err) + } + if socket, err = shim.NewSocket(address); err != nil { + return "", fmt.Errorf("try create new shim socket 2x: %w", err) + } } - defer socket.Close() + cu := cleanup.Make(func() { + socket.Close() + _ = shim.RemoveSocket(address) + }) + defer cu.Clean() + f, err := socket.File() if err != nil { return "", err } - defer f.Close() cmd.ExtraFiles = append(cmd.ExtraFiles, f) + log.L.Debugf("Executing: %q %s", cmd.Path, cmd.Args) if err := cmd.Start(); err != nil { + f.Close() return "", err } - defer func() { - if err != nil { - cmd.Process.Kill() - } - }() + cu.Add(func() { cmd.Process.Kill() }) + // make sure to wait after start go cmd.Wait() if err := shim.WritePidFile("shim.pid", cmd.Process.Pid); err != nil { return "", err } - if err := shim.WriteAddress("address", address); err != nil { + if err := shim.WriteAddress(shimAddressPath, address); err != nil { return "", err } if err := shim.SetScore(cmd.Process.Pid); err != nil { return "", fmt.Errorf("failed to set OOM Score on shim: %w", err) } + cu.Release() return address, nil } +// Cleanup is called from another process (need to reload state) to stop the +// container and undo all operations done in Create(). func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { + log.L.Debugf("Cleanup") + path, err := os.Getwd() if err != nil { return nil, err @@ -193,18 +280,19 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) if err != nil { return nil, err } - runtime, err := s.readRuntime(path) - if err != nil { + var st state + if err := st.load(path); err != nil { return nil, err } - r := proc.NewRunsc(s.opts.Root, path, ns, runtime, nil) + r := proc.NewRunsc(s.opts.Root, path, ns, st.Options.BinaryName, nil) + if err := r.Delete(ctx, s.id, &runsc.DeleteOpts{ Force: true, }); err != nil { - log.L.Printf("failed to remove runc container: %v", err) + log.L.Infof("failed to remove runc container: %v", err) } - if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { - log.L.Printf("failed to cleanup rootfs mount: %v", err) + if err := mount.UnmountAll(st.Rootfs, 0); err != nil { + log.L.Infof("failed to cleanup rootfs mount: %v", err) } return &taskAPI.DeleteResponse{ ExitedAt: time.Now(), @@ -212,31 +300,24 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) }, nil } -func (s *service) readRuntime(path string) (string, error) { - data, err := ioutil.ReadFile(filepath.Join(path, "runtime")) - if err != nil { - return "", err - } - return string(data), nil -} - -func (s *service) writeRuntime(path, runtime string) error { - return ioutil.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600) -} - // Create creates a new initial process and container with the underlying OCI // runtime. -func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { +func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*taskAPI.CreateTaskResponse, error) { + log.L.Debugf("Create, id: %s, bundle: %q", r.ID, r.Bundle) + s.mu.Lock() defer s.mu.Unlock() + // Save the main task id and bundle to the shim for additional requests. + s.id = r.ID + s.bundle = r.Bundle + ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, fmt.Errorf("create namespace: %w", err) } // Read from root for now. - var opts options.Options if r.Options != nil { v, err := typeurl.UnmarshalAny(r.Options) if err != nil { @@ -245,16 +326,16 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * var path string switch o := v.(type) { case *runctypes.CreateOptions: // containerd 1.2.x - opts.IoUid = o.IoUid - opts.IoGid = o.IoGid - opts.ShimCgroup = o.ShimCgroup + s.opts.IoUID = o.IoUid + s.opts.IoGID = o.IoGid + s.opts.ShimCgroup = o.ShimCgroup case *runctypes.RuncOptions: // containerd 1.2.x root := proc.RunscRoot if o.RuntimeRoot != "" { root = o.RuntimeRoot } - opts.BinaryName = o.Runtime + s.opts.BinaryName = o.Runtime path = filepath.Join(root, configFile) if _, err := os.Stat(path); err != nil { @@ -268,7 +349,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * if o.ConfigPath == "" { break } - if o.TypeUrl != options.OptionType { + if o.TypeUrl != optionsType { return nil, fmt.Errorf("unsupported option type %q", o.TypeUrl) } path = o.ConfigPath @@ -276,12 +357,61 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * return nil, fmt.Errorf("unsupported option type %q", r.Options.TypeUrl) } if path != "" { - if _, err = toml.DecodeFile(path, &opts); err != nil { + if _, err = toml.DecodeFile(path, &s.opts); err != nil { return nil, fmt.Errorf("decode config file %q: %w", path, err) } } } + if len(s.opts.LogLevel) != 0 { + lvl, err := logrus.ParseLevel(s.opts.LogLevel) + if err != nil { + return nil, err + } + logrus.SetLevel(lvl) + } + if len(s.opts.LogPath) != 0 { + logPath := runsc.FormatShimLogPath(s.opts.LogPath, s.id) + if err := os.MkdirAll(filepath.Dir(logPath), 0777); err != nil { + return nil, fmt.Errorf("failed to create log dir: %w", err) + } + logFile, err := os.Create(logPath) + if err != nil { + return nil, fmt.Errorf("failed to create log file: %w", err) + } + log.L.Debugf("Starting mirror log at %q", logPath) + std := logrus.StandardLogger() + std.SetOutput(io.MultiWriter(std.Out, logFile)) + + log.L.Debugf("Create shim") + log.L.Debugf("***************************") + log.L.Debugf("Args: %s", os.Args) + log.L.Debugf("PID: %d", os.Getpid()) + log.L.Debugf("ID: %s", s.id) + log.L.Debugf("Options: %+v", s.opts) + log.L.Debugf("Bundle: %s", r.Bundle) + log.L.Debugf("Terminal: %t", r.Terminal) + log.L.Debugf("stdin: %s", r.Stdin) + log.L.Debugf("stdout: %s", r.Stdout) + log.L.Debugf("stderr: %s", r.Stderr) + log.L.Debugf("***************************") + } + + // Save state before any action is taken to ensure Cleanup() will have all + // the information it needs to undo the operations. + st := state{ + Rootfs: filepath.Join(r.Bundle, "rootfs"), + Options: s.opts, + } + if err := st.save(r.Bundle); err != nil { + return nil, err + } + + if err := os.Mkdir(st.Rootfs, 0711); err != nil && !os.IsExist(err) { + return nil, err + } + + // Convert from types.Mount to proc.Mount. var mounts []proc.Mount for _, m := range r.Rootfs { mounts = append(mounts, proc.Mount{ @@ -292,62 +422,41 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * }) } - rootfs := filepath.Join(r.Bundle, "rootfs") - if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) { - return nil, err + // Cleans up all mounts in case of failure. + cu := cleanup.Make(func() { + if err := mount.UnmountAll(st.Rootfs, 0); err != nil { + log.L.Infof("failed to cleanup rootfs mount: %v", err) + } + }) + defer cu.Clean() + for _, rm := range mounts { + m := &mount.Mount{ + Type: rm.Type, + Source: rm.Source, + Options: rm.Options, + } + if err := m.Mount(st.Rootfs); err != nil { + return nil, fmt.Errorf("failed to mount rootfs component %v: %w", m, err) + } } config := &proc.CreateConfig{ ID: r.ID, Bundle: r.Bundle, - Runtime: opts.BinaryName, + Runtime: s.opts.BinaryName, Rootfs: mounts, Terminal: r.Terminal, Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, - Options: r.Options, - } - if err := s.writeRuntime(r.Bundle, opts.BinaryName); err != nil { - return nil, err } - defer func() { - if err != nil { - if err := mount.UnmountAll(rootfs, 0); err != nil { - log.L.Printf("failed to cleanup rootfs mount: %v", err) - } - } - }() - for _, rm := range mounts { - m := &mount.Mount{ - Type: rm.Type, - Source: rm.Source, - Options: rm.Options, - } - if err := m.Mount(rootfs); err != nil { - return nil, fmt.Errorf("failed to mount rootfs component %v: %w", m, err) - } - } - process, err := newInit( - ctx, - r.Bundle, - filepath.Join(r.Bundle, "work"), - ns, - s.platform, - config, - &opts, - rootfs, - ) + process, err := newInit(r.Bundle, filepath.Join(r.Bundle, "work"), ns, s.platform, config, &s.opts, st.Rootfs) if err != nil { return nil, errdefs.ToGRPC(err) } if err := process.Create(ctx, config); err != nil { return nil, errdefs.ToGRPC(err) } - // Save the main task id and bundle to the shim for additional - // requests. - s.id = r.ID - s.bundle = r.Bundle // Set up OOM notification on the sandbox's cgroup. This is done on // sandbox create since the sandbox process will be created here. @@ -361,16 +470,19 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * return nil, fmt.Errorf("add cg to OOM monitor: %w", err) } } + + // Success + cu.Release() s.task = process - s.opts = opts return &taskAPI.CreateTaskResponse{ Pid: uint32(process.Pid()), }, nil - } // Start starts a process. func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { + log.L.Debugf("Start, id: %s, execID: %s", r.ID, r.ExecID) + p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -387,6 +499,8 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. // Delete deletes the initial process and container. func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { + log.L.Debugf("Delete, id: %s, execID: %s", r.ID, r.ExecID) + p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -397,13 +511,11 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP if err := p.Delete(ctx); err != nil { return nil, err } - isTask := r.ExecID == "" - if !isTask { + if len(r.ExecID) != 0 { s.mu.Lock() delete(s.processes, r.ExecID) s.mu.Unlock() - } - if isTask && s.platform != nil { + } else if s.platform != nil { s.platform.Close() } return &taskAPI.DeleteResponse{ @@ -415,17 +527,18 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP // Exec spawns an additional process inside the container. func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*types.Empty, error) { + log.L.Debugf("Exec, id: %s, execID: %s", r.ID, r.ExecID) + s.mu.Lock() p := s.processes[r.ExecID] s.mu.Unlock() if p != nil { return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) } - p = s.task - if p == nil { + if s.task == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - process, err := p.(*proc.Init).Exec(ctx, s.bundle, &proc.ExecConfig{ + process, err := s.task.Exec(ctx, s.bundle, &proc.ExecConfig{ ID: r.ExecID, Terminal: r.Terminal, Stdin: r.Stdin, @@ -444,6 +557,8 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*typ // ResizePty resizes the terminal of a process. func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*types.Empty, error) { + log.L.Debugf("ResizePty, id: %s, execID: %s, dimension: %dx%d", r.ID, r.ExecID, r.Height, r.Width) + p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -460,6 +575,8 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (* // State returns runtime state information for a process. func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { + log.L.Debugf("State, id: %s, execID: %s", r.ID, r.ExecID) + p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -494,16 +611,20 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI. // Pause the container. func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*types.Empty, error) { + log.L.Debugf("Pause, id: %s", r.ID) return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Resume the container. func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*types.Empty, error) { + log.L.Debugf("Resume, id: %s", r.ID) return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Kill a process with the provided signal. func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*types.Empty, error) { + log.L.Debugf("Kill, id: %s, execID: %s, signal: %d, all: %t", r.ID, r.ExecID, r.Signal, r.All) + p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -519,6 +640,8 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*types.Empt // Pids returns all pids inside the container. func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { + log.L.Debugf("Pids, id: %s", r.ID) + pids, err := s.getContainerPids(ctx, r.ID) if err != nil { return nil, errdefs.ToGRPC(err) @@ -550,6 +673,8 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi // CloseIO closes the I/O context of a process. func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*types.Empty, error) { + log.L.Debugf("CloseIO, id: %s, execID: %s, stdin: %t", r.ID, r.ExecID, r.Stdin) + p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -564,11 +689,14 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*type // Checkpoint checkpoints the container. func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*types.Empty, error) { + log.L.Debugf("Checkpoint, id: %s", r.ID) return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Connect returns shim information such as the shim's pid. func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { + log.L.Debugf("Connect, id: %s", r.ID) + var pid int if s.task != nil { pid = s.task.Pid() @@ -580,27 +708,24 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task } func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*types.Empty, error) { + log.L.Debugf("Shutdown, id: %s", r.ID) s.cancel() + if s.shimAddress != "" { + _ = shim.RemoveSocket(s.shimAddress) + } os.Exit(0) - return empty, nil + panic("Should not get here") } func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { - path, err := os.Getwd() - if err != nil { - return nil, err - } - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return nil, err - } - runtime, err := s.readRuntime(path) - if err != nil { - return nil, err + log.L.Debugf("Stats, id: %s", r.ID) + if s.task == nil { + log.L.Debugf("Stats error, id: %s: container not created", r.ID) + return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - rs := proc.NewRunsc(s.opts.Root, path, ns, runtime, nil) - stats, err := rs.Stats(ctx, s.id) + stats, err := s.task.Runtime().Stats(ctx, s.id) if err != nil { + log.L.Debugf("Stats error, id: %s: %v", r.ID, err) return nil, err } @@ -611,55 +736,58 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI. // as runc. // // [0]: https://github.com/google/gvisor/blob/277a0d5a1fbe8272d4729c01ee4c6e374d047ebc/runsc/boot/events.go#L61-L81 - data, err := typeurl.MarshalAny(&cgroups.Metrics{ - CPU: &cgroups.CPUStat{ - Usage: &cgroups.CPUUsage{ + metrics := &cgroupsstats.Metrics{ + CPU: &cgroupsstats.CPUStat{ + Usage: &cgroupsstats.CPUUsage{ Total: stats.Cpu.Usage.Total, Kernel: stats.Cpu.Usage.Kernel, User: stats.Cpu.Usage.User, PerCPU: stats.Cpu.Usage.Percpu, }, - Throttling: &cgroups.Throttle{ + Throttling: &cgroupsstats.Throttle{ Periods: stats.Cpu.Throttling.Periods, ThrottledPeriods: stats.Cpu.Throttling.ThrottledPeriods, ThrottledTime: stats.Cpu.Throttling.ThrottledTime, }, }, - Memory: &cgroups.MemoryStat{ + Memory: &cgroupsstats.MemoryStat{ Cache: stats.Memory.Cache, - Usage: &cgroups.MemoryEntry{ + Usage: &cgroupsstats.MemoryEntry{ Limit: stats.Memory.Usage.Limit, Usage: stats.Memory.Usage.Usage, Max: stats.Memory.Usage.Max, Failcnt: stats.Memory.Usage.Failcnt, }, - Swap: &cgroups.MemoryEntry{ + Swap: &cgroupsstats.MemoryEntry{ Limit: stats.Memory.Swap.Limit, Usage: stats.Memory.Swap.Usage, Max: stats.Memory.Swap.Max, Failcnt: stats.Memory.Swap.Failcnt, }, - Kernel: &cgroups.MemoryEntry{ + Kernel: &cgroupsstats.MemoryEntry{ Limit: stats.Memory.Kernel.Limit, Usage: stats.Memory.Kernel.Usage, Max: stats.Memory.Kernel.Max, Failcnt: stats.Memory.Kernel.Failcnt, }, - KernelTCP: &cgroups.MemoryEntry{ + KernelTCP: &cgroupsstats.MemoryEntry{ Limit: stats.Memory.KernelTCP.Limit, Usage: stats.Memory.KernelTCP.Usage, Max: stats.Memory.KernelTCP.Max, Failcnt: stats.Memory.KernelTCP.Failcnt, }, }, - Pids: &cgroups.PidsStat{ + Pids: &cgroupsstats.PidsStat{ Current: stats.Pids.Current, Limit: stats.Pids.Limit, }, - }) + } + data, err := typeurl.MarshalAny(metrics) if err != nil { + log.L.Debugf("Stats error, id: %s: %v", r.ID, err) return nil, err } + log.L.Debugf("Stats success, id: %s: %+v", r.ID, data) return &taskAPI.StatsResponse{ Stats: data, }, nil @@ -672,6 +800,8 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ty // Wait waits for a process to exit. func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { + log.L.Debugf("Wait, id: %s, execID: %s", r.ID, r.ExecID) + p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -687,21 +817,22 @@ func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.Wa }, nil } -func (s *service) processExits() { +func (s *service) processExits(ctx context.Context) { for e := range s.ec { - s.checkProcesses(e) + s.checkProcesses(ctx, e) } } -func (s *service) checkProcesses(e proc.Exit) { +func (s *service) checkProcesses(ctx context.Context, e proc.Exit) { // TODO(random-liu): Add `shouldKillAll` logic if container pid // namespace is supported. for _, p := range s.allProcesses() { if p.ID() == e.ID { if ip, ok := p.(*proc.Init); ok { // Ensure all children are killed. - if err := ip.KillAll(s.context); err != nil { - log.G(s.context).WithError(err).WithField("id", ip.ID()). + log.L.Debugf("Container init process exited, killing all container processes") + if err := ip.KillAll(ctx); err != nil { + log.G(ctx).WithError(err).WithField("id", ip.ID()). Error("failed to kill init's children") } } @@ -737,7 +868,7 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er if p == nil { return nil, fmt.Errorf("container must be created: %w", errdefs.ErrFailedPrecondition) } - ps, err := p.(*proc.Init).Runtime().Ps(ctx, id) + ps, err := p.Runtime().Ps(ctx, id) if err != nil { return nil, err } @@ -748,11 +879,9 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er return pids, nil } -func (s *service) forward(publisher shim.Publisher) { +func (s *service) forward(ctx context.Context, publisher shim.Publisher) { for e := range s.events { - ctx, cancel := context.WithTimeout(s.context, 5*time.Second) err := publisher.Publish(ctx, getTopic(e), e) - cancel() if err != nil { // Should not happen. panic(fmt.Errorf("post event: %w", err)) @@ -790,12 +919,12 @@ func getTopic(e interface{}) string { case *events.TaskExecStarted: return runtime.TaskExecStartedEventTopic default: - log.L.Printf("no topic for type %#v", e) + log.L.Infof("no topic for type %#v", e) } return runtime.TaskUnknownTopic } -func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) { +func newInit(path, workDir, namespace string, platform stdio.Platform, r *proc.CreateConfig, options *options, rootfs string) (*proc.Init, error) { spec, err := utils.ReadSpec(r.Bundle) if err != nil { return nil, fmt.Errorf("read oci spec: %w", err) @@ -803,7 +932,7 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform stdi if err := utils.UpdateVolumeAnnotations(r.Bundle, spec); err != nil { return nil, fmt.Errorf("update volume annotations: %w", err) } - runsc.FormatLogPath(r.ID, options.RunscConfig) + runsc.FormatRunscLogPath(r.ID, options.RunscConfig) runtime := proc.NewRunsc(options.Root, path, namespace, options.BinaryName, options.RunscConfig) p := proc.New(r.ID, runtime, stdio.Stdio{ Stdin: r.Stdin, @@ -815,8 +944,8 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform stdi p.Platform = platform p.Rootfs = rootfs p.WorkDir = workDir - p.IoUID = int(options.IoUid) - p.IoGID = int(options.IoGid) + p.IoUID = int(options.IoUID) + p.IoGID = int(options.IoGID) p.Sandbox = specutils.SpecContainerType(spec) == specutils.ContainerTypeSandbox p.UserLog = utils.UserLogPath(spec) p.Monitor = reaper.Default |