summaryrefslogtreecommitdiffhomepage
path: root/pkg/shim/v2/service.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/shim/v2/service.go')
-rw-r--r--pkg/shim/v2/service.go102
1 files changed, 51 insertions, 51 deletions
diff --git a/pkg/shim/v2/service.go b/pkg/shim/v2/service.go
index c67b1beba..1534152fc 100644
--- a/pkg/shim/v2/service.go
+++ b/pkg/shim/v2/service.go
@@ -27,34 +27,34 @@ import (
"github.com/BurntSushi/toml"
"github.com/containerd/cgroups"
- metrics "github.com/containerd/cgroups/stats/v1"
"github.com/containerd/console"
- eventstypes "github.com/containerd/containerd/api/events"
+ "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
- "github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
+ "github.com/containerd/containerd/pkg/process"
+ "github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes"
- rproc "github.com/containerd/containerd/runtime/proc"
"github.com/containerd/containerd/runtime/v2/shim"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
- runtimeoptions "github.com/containerd/cri/pkg/api/runtimeoptions/v1"
+ "github.com/containerd/containerd/sys/reaper"
"github.com/containerd/typeurl"
- ptypes "github.com/gogo/protobuf/types"
+ "github.com/gogo/protobuf/types"
"golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/shim/runsc"
"gvisor.dev/gvisor/pkg/shim/v1/proc"
"gvisor.dev/gvisor/pkg/shim/v1/utils"
"gvisor.dev/gvisor/pkg/shim/v2/options"
+ "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions"
"gvisor.dev/gvisor/runsc/specutils"
)
var (
- empty = &ptypes.Empty{}
+ empty = &types.Empty{}
bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
@@ -70,24 +70,23 @@ var _ = (taskAPI.TaskService)(&service{})
const configFile = "config.toml"
// New returns a new shim service that can be used via GRPC.
-func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
+func New(ctx context.Context, id string, publisher shim.Publisher, cancel func()) (shim.Shim, error) {
ep, err := newOOMEpoller(publisher)
if err != nil {
return nil, err
}
- ctx, cancel := context.WithCancel(ctx)
go ep.run(ctx)
s := &service{
id: id,
context: ctx,
- processes: make(map[string]rproc.Process),
+ processes: make(map[string]process.Process),
events: make(chan interface{}, 128),
ec: proc.ExitCh,
oomPoller: ep,
cancel: cancel,
}
go s.processExits()
- runsc.Monitor = shim.Default
+ runsc.Monitor = reaper.Default
if err := s.initPlatform(); err != nil {
cancel()
return nil, fmt.Errorf("failed to initialized platform behavior: %w", err)
@@ -101,10 +100,10 @@ type service struct {
mu sync.Mutex
context context.Context
- task rproc.Process
- processes map[string]rproc.Process
+ task process.Process
+ processes map[string]process.Process
events chan interface{}
- platform rproc.Platform
+ platform stdio.Platform
opts options.Options
ec chan proc.Exit
oomPoller *epoller
@@ -141,7 +140,7 @@ func newCommand(ctx context.Context, containerdBinary, containerdAddress string)
return cmd, nil
}
-func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
+func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) {
cmd, err := newCommand(ctx, containerdBinary, containerdAddress)
if err != nil {
return "", err
@@ -270,7 +269,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
break
}
if o.TypeUrl != options.OptionType {
- return nil, fmt.Errorf("unsupported runtimeoptions %q", o.TypeUrl)
+ return nil, fmt.Errorf("unsupported option type %q", o.TypeUrl)
}
path = o.ConfigPath
default:
@@ -415,7 +414,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
}
// Exec spawns an additional process inside the container.
-func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
+func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*types.Empty, error) {
s.mu.Lock()
p := s.processes[r.ExecID]
s.mu.Unlock()
@@ -444,7 +443,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
}
// ResizePty resizes the terminal of a process.
-func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
+func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*types.Empty, error) {
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -494,17 +493,17 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
}
// Pause the container.
-func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
+func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// Resume the container.
-func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
+func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// Kill a process with the provided signal.
-func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
+func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*types.Empty, error) {
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -550,7 +549,7 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
}
// CloseIO closes the I/O context of a process.
-func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
+func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*types.Empty, error) {
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -564,7 +563,7 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
}
// Checkpoint checkpoints the container.
-func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
+func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
@@ -580,7 +579,7 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
}, nil
}
-func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
+func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*types.Empty, error) {
s.cancel()
os.Exit(0)
return empty, nil
@@ -608,52 +607,52 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
// gvisor currently (as of 2020-03-03) only returns the total memory
// usage and current PID value[0]. However, we copy the common fields here
// so that future updates will propagate correct information. We're
- // using the metrics.Metrics structure so we're returning the same type
+ // using the cgroups.Metrics structure so we're returning the same type
// as runc.
//
// [0]: https://github.com/google/gvisor/blob/277a0d5a1fbe8272d4729c01ee4c6e374d047ebc/runsc/boot/events.go#L61-L81
- data, err := typeurl.MarshalAny(&metrics.Metrics{
- CPU: &metrics.CPUStat{
- Usage: &metrics.CPUUsage{
+ data, err := typeurl.MarshalAny(&cgroups.Metrics{
+ CPU: &cgroups.CPUStat{
+ Usage: &cgroups.CPUUsage{
Total: stats.Cpu.Usage.Total,
Kernel: stats.Cpu.Usage.Kernel,
User: stats.Cpu.Usage.User,
PerCPU: stats.Cpu.Usage.Percpu,
},
- Throttling: &metrics.Throttle{
+ Throttling: &cgroups.Throttle{
Periods: stats.Cpu.Throttling.Periods,
ThrottledPeriods: stats.Cpu.Throttling.ThrottledPeriods,
ThrottledTime: stats.Cpu.Throttling.ThrottledTime,
},
},
- Memory: &metrics.MemoryStat{
+ Memory: &cgroups.MemoryStat{
Cache: stats.Memory.Cache,
- Usage: &metrics.MemoryEntry{
+ Usage: &cgroups.MemoryEntry{
Limit: stats.Memory.Usage.Limit,
Usage: stats.Memory.Usage.Usage,
Max: stats.Memory.Usage.Max,
Failcnt: stats.Memory.Usage.Failcnt,
},
- Swap: &metrics.MemoryEntry{
+ Swap: &cgroups.MemoryEntry{
Limit: stats.Memory.Swap.Limit,
Usage: stats.Memory.Swap.Usage,
Max: stats.Memory.Swap.Max,
Failcnt: stats.Memory.Swap.Failcnt,
},
- Kernel: &metrics.MemoryEntry{
+ Kernel: &cgroups.MemoryEntry{
Limit: stats.Memory.Kernel.Limit,
Usage: stats.Memory.Kernel.Usage,
Max: stats.Memory.Kernel.Max,
Failcnt: stats.Memory.Kernel.Failcnt,
},
- KernelTCP: &metrics.MemoryEntry{
+ KernelTCP: &cgroups.MemoryEntry{
Limit: stats.Memory.KernelTCP.Limit,
Usage: stats.Memory.KernelTCP.Usage,
Max: stats.Memory.KernelTCP.Max,
Failcnt: stats.Memory.KernelTCP.Failcnt,
},
},
- Pids: &metrics.PidsStat{
+ Pids: &cgroups.PidsStat{
Current: stats.Pids.Current,
Limit: stats.Pids.Limit,
},
@@ -667,7 +666,7 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
}
// Update updates a running container.
-func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
+func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
@@ -707,7 +706,7 @@ func (s *service) checkProcesses(e proc.Exit) {
}
}
p.SetExited(e.Status)
- s.events <- &eventstypes.TaskExit{
+ s.events <- &events.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(p.Pid()),
@@ -719,7 +718,7 @@ func (s *service) checkProcesses(e proc.Exit) {
}
}
-func (s *service) allProcesses() (o []rproc.Process) {
+func (s *service) allProcesses() (o []process.Process) {
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.processes {
@@ -749,18 +748,19 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
return pids, nil
}
-func (s *service) forward(publisher events.Publisher) {
+func (s *service) forward(publisher shim.Publisher) {
for e := range s.events {
ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
err := publisher.Publish(ctx, getTopic(e), e)
cancel()
if err != nil {
- fmt.Errorf("post event: %w", err)
+ // Should not happen.
+ panic(fmt.Errorf("post event: %w", err))
}
}
}
-func (s *service) getProcess(execID string) (rproc.Process, error) {
+func (s *service) getProcess(execID string) (process.Process, error) {
s.mu.Lock()
defer s.mu.Unlock()
if execID == "" {
@@ -775,19 +775,19 @@ func (s *service) getProcess(execID string) (rproc.Process, error) {
func getTopic(e interface{}) string {
switch e.(type) {
- case *eventstypes.TaskCreate:
+ case *events.TaskCreate:
return runtime.TaskCreateEventTopic
- case *eventstypes.TaskStart:
+ case *events.TaskStart:
return runtime.TaskStartEventTopic
- case *eventstypes.TaskOOM:
+ case *events.TaskOOM:
return runtime.TaskOOMEventTopic
- case *eventstypes.TaskExit:
+ case *events.TaskExit:
return runtime.TaskExitEventTopic
- case *eventstypes.TaskDelete:
+ case *events.TaskDelete:
return runtime.TaskDeleteEventTopic
- case *eventstypes.TaskExecAdded:
+ case *events.TaskExecAdded:
return runtime.TaskExecAddedEventTopic
- case *eventstypes.TaskExecStarted:
+ case *events.TaskExecStarted:
return runtime.TaskExecStartedEventTopic
default:
log.L.Printf("no topic for type %#v", e)
@@ -795,7 +795,7 @@ func getTopic(e interface{}) string {
return runtime.TaskUnknownTopic
}
-func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) {
+func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) {
spec, err := utils.ReadSpec(r.Bundle)
if err != nil {
return nil, fmt.Errorf("read oci spec: %w", err)
@@ -805,7 +805,7 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro
}
runsc.FormatLogPath(r.ID, options.RunscConfig)
runtime := proc.NewRunsc(options.Root, path, namespace, options.BinaryName, options.RunscConfig)
- p := proc.New(r.ID, runtime, rproc.Stdio{
+ p := proc.New(r.ID, runtime, stdio.Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
@@ -819,6 +819,6 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro
p.IoGID = int(options.IoGid)
p.Sandbox = specutils.SpecContainerType(spec) == specutils.ContainerTypeSandbox
p.UserLog = utils.UserLogPath(spec)
- p.Monitor = shim.Default
+ p.Monitor = reaper.Default
return p, nil
}