diff options
Diffstat (limited to 'pkg/shim/v2/service.go')
-rw-r--r-- | pkg/shim/v2/service.go | 102 |
1 files changed, 51 insertions, 51 deletions
diff --git a/pkg/shim/v2/service.go b/pkg/shim/v2/service.go index c67b1beba..1534152fc 100644 --- a/pkg/shim/v2/service.go +++ b/pkg/shim/v2/service.go @@ -27,34 +27,34 @@ import ( "github.com/BurntSushi/toml" "github.com/containerd/cgroups" - metrics "github.com/containerd/cgroups/stats/v1" "github.com/containerd/console" - eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime/linux/runctypes" - rproc "github.com/containerd/containerd/runtime/proc" "github.com/containerd/containerd/runtime/v2/shim" taskAPI "github.com/containerd/containerd/runtime/v2/task" - runtimeoptions "github.com/containerd/cri/pkg/api/runtimeoptions/v1" + "github.com/containerd/containerd/sys/reaper" "github.com/containerd/typeurl" - ptypes "github.com/gogo/protobuf/types" + "github.com/gogo/protobuf/types" "golang.org/x/sys/unix" "gvisor.dev/gvisor/pkg/shim/runsc" "gvisor.dev/gvisor/pkg/shim/v1/proc" "gvisor.dev/gvisor/pkg/shim/v1/utils" "gvisor.dev/gvisor/pkg/shim/v2/options" + "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions" "gvisor.dev/gvisor/runsc/specutils" ) var ( - empty = &ptypes.Empty{} + empty = &types.Empty{} bufPool = sync.Pool{ New: func() interface{} { buffer := make([]byte, 32<<10) @@ -70,24 +70,23 @@ var _ = (taskAPI.TaskService)(&service{}) const configFile = "config.toml" // New returns a new shim service that can be used via GRPC. -func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { +func New(ctx context.Context, id string, publisher shim.Publisher, cancel func()) (shim.Shim, error) { ep, err := newOOMEpoller(publisher) if err != nil { return nil, err } - ctx, cancel := context.WithCancel(ctx) go ep.run(ctx) s := &service{ id: id, context: ctx, - processes: make(map[string]rproc.Process), + processes: make(map[string]process.Process), events: make(chan interface{}, 128), ec: proc.ExitCh, oomPoller: ep, cancel: cancel, } go s.processExits() - runsc.Monitor = shim.Default + runsc.Monitor = reaper.Default if err := s.initPlatform(); err != nil { cancel() return nil, fmt.Errorf("failed to initialized platform behavior: %w", err) @@ -101,10 +100,10 @@ type service struct { mu sync.Mutex context context.Context - task rproc.Process - processes map[string]rproc.Process + task process.Process + processes map[string]process.Process events chan interface{} - platform rproc.Platform + platform stdio.Platform opts options.Options ec chan proc.Exit oomPoller *epoller @@ -141,7 +140,7 @@ func newCommand(ctx context.Context, containerdBinary, containerdAddress string) return cmd, nil } -func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) { +func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) { cmd, err := newCommand(ctx, containerdBinary, containerdAddress) if err != nil { return "", err @@ -270,7 +269,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * break } if o.TypeUrl != options.OptionType { - return nil, fmt.Errorf("unsupported runtimeoptions %q", o.TypeUrl) + return nil, fmt.Errorf("unsupported option type %q", o.TypeUrl) } path = o.ConfigPath default: @@ -415,7 +414,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP } // Exec spawns an additional process inside the container. -func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { +func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*types.Empty, error) { s.mu.Lock() p := s.processes[r.ExecID] s.mu.Unlock() @@ -444,7 +443,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty } // ResizePty resizes the terminal of a process. -func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { +func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*types.Empty, error) { p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -494,17 +493,17 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI. } // Pause the container. -func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { +func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Resume the container. -func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { +func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Kill a process with the provided signal. -func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { +func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*types.Empty, error) { p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -550,7 +549,7 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi } // CloseIO closes the I/O context of a process. -func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { +func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*types.Empty, error) { p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -564,7 +563,7 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp } // Checkpoint checkpoints the container. -func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { +func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } @@ -580,7 +579,7 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task }, nil } -func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { +func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*types.Empty, error) { s.cancel() os.Exit(0) return empty, nil @@ -608,52 +607,52 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI. // gvisor currently (as of 2020-03-03) only returns the total memory // usage and current PID value[0]. However, we copy the common fields here // so that future updates will propagate correct information. We're - // using the metrics.Metrics structure so we're returning the same type + // using the cgroups.Metrics structure so we're returning the same type // as runc. // // [0]: https://github.com/google/gvisor/blob/277a0d5a1fbe8272d4729c01ee4c6e374d047ebc/runsc/boot/events.go#L61-L81 - data, err := typeurl.MarshalAny(&metrics.Metrics{ - CPU: &metrics.CPUStat{ - Usage: &metrics.CPUUsage{ + data, err := typeurl.MarshalAny(&cgroups.Metrics{ + CPU: &cgroups.CPUStat{ + Usage: &cgroups.CPUUsage{ Total: stats.Cpu.Usage.Total, Kernel: stats.Cpu.Usage.Kernel, User: stats.Cpu.Usage.User, PerCPU: stats.Cpu.Usage.Percpu, }, - Throttling: &metrics.Throttle{ + Throttling: &cgroups.Throttle{ Periods: stats.Cpu.Throttling.Periods, ThrottledPeriods: stats.Cpu.Throttling.ThrottledPeriods, ThrottledTime: stats.Cpu.Throttling.ThrottledTime, }, }, - Memory: &metrics.MemoryStat{ + Memory: &cgroups.MemoryStat{ Cache: stats.Memory.Cache, - Usage: &metrics.MemoryEntry{ + Usage: &cgroups.MemoryEntry{ Limit: stats.Memory.Usage.Limit, Usage: stats.Memory.Usage.Usage, Max: stats.Memory.Usage.Max, Failcnt: stats.Memory.Usage.Failcnt, }, - Swap: &metrics.MemoryEntry{ + Swap: &cgroups.MemoryEntry{ Limit: stats.Memory.Swap.Limit, Usage: stats.Memory.Swap.Usage, Max: stats.Memory.Swap.Max, Failcnt: stats.Memory.Swap.Failcnt, }, - Kernel: &metrics.MemoryEntry{ + Kernel: &cgroups.MemoryEntry{ Limit: stats.Memory.Kernel.Limit, Usage: stats.Memory.Kernel.Usage, Max: stats.Memory.Kernel.Max, Failcnt: stats.Memory.Kernel.Failcnt, }, - KernelTCP: &metrics.MemoryEntry{ + KernelTCP: &cgroups.MemoryEntry{ Limit: stats.Memory.KernelTCP.Limit, Usage: stats.Memory.KernelTCP.Usage, Max: stats.Memory.KernelTCP.Max, Failcnt: stats.Memory.KernelTCP.Failcnt, }, }, - Pids: &metrics.PidsStat{ + Pids: &cgroups.PidsStat{ Current: stats.Pids.Current, Limit: stats.Pids.Limit, }, @@ -667,7 +666,7 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI. } // Update updates a running container. -func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { +func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } @@ -707,7 +706,7 @@ func (s *service) checkProcesses(e proc.Exit) { } } p.SetExited(e.Status) - s.events <- &eventstypes.TaskExit{ + s.events <- &events.TaskExit{ ContainerID: s.id, ID: p.ID(), Pid: uint32(p.Pid()), @@ -719,7 +718,7 @@ func (s *service) checkProcesses(e proc.Exit) { } } -func (s *service) allProcesses() (o []rproc.Process) { +func (s *service) allProcesses() (o []process.Process) { s.mu.Lock() defer s.mu.Unlock() for _, p := range s.processes { @@ -749,18 +748,19 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er return pids, nil } -func (s *service) forward(publisher events.Publisher) { +func (s *service) forward(publisher shim.Publisher) { for e := range s.events { ctx, cancel := context.WithTimeout(s.context, 5*time.Second) err := publisher.Publish(ctx, getTopic(e), e) cancel() if err != nil { - fmt.Errorf("post event: %w", err) + // Should not happen. + panic(fmt.Errorf("post event: %w", err)) } } } -func (s *service) getProcess(execID string) (rproc.Process, error) { +func (s *service) getProcess(execID string) (process.Process, error) { s.mu.Lock() defer s.mu.Unlock() if execID == "" { @@ -775,19 +775,19 @@ func (s *service) getProcess(execID string) (rproc.Process, error) { func getTopic(e interface{}) string { switch e.(type) { - case *eventstypes.TaskCreate: + case *events.TaskCreate: return runtime.TaskCreateEventTopic - case *eventstypes.TaskStart: + case *events.TaskStart: return runtime.TaskStartEventTopic - case *eventstypes.TaskOOM: + case *events.TaskOOM: return runtime.TaskOOMEventTopic - case *eventstypes.TaskExit: + case *events.TaskExit: return runtime.TaskExitEventTopic - case *eventstypes.TaskDelete: + case *events.TaskDelete: return runtime.TaskDeleteEventTopic - case *eventstypes.TaskExecAdded: + case *events.TaskExecAdded: return runtime.TaskExecAddedEventTopic - case *eventstypes.TaskExecStarted: + case *events.TaskExecStarted: return runtime.TaskExecStartedEventTopic default: log.L.Printf("no topic for type %#v", e) @@ -795,7 +795,7 @@ func getTopic(e interface{}) string { return runtime.TaskUnknownTopic } -func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) { +func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) { spec, err := utils.ReadSpec(r.Bundle) if err != nil { return nil, fmt.Errorf("read oci spec: %w", err) @@ -805,7 +805,7 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro } runsc.FormatLogPath(r.ID, options.RunscConfig) runtime := proc.NewRunsc(options.Root, path, namespace, options.BinaryName, options.RunscConfig) - p := proc.New(r.ID, runtime, rproc.Stdio{ + p := proc.New(r.ID, runtime, stdio.Stdio{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, @@ -819,6 +819,6 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro p.IoGID = int(options.IoGid) p.Sandbox = specutils.SpecContainerType(spec) == specutils.ContainerTypeSandbox p.UserLog = utils.UserLogPath(spec) - p.Monitor = shim.Default + p.Monitor = reaper.Default return p, nil } |