diff options
Diffstat (limited to 'pkg/shim/v2')
-rw-r--r-- | pkg/shim/v2/BUILD | 8 | ||||
-rw-r--r-- | pkg/shim/v2/api.go | 22 | ||||
-rw-r--r-- | pkg/shim/v2/epoll.go | 11 | ||||
-rw-r--r-- | pkg/shim/v2/runtimeoptions/BUILD | 20 | ||||
-rw-r--r-- | pkg/shim/v2/runtimeoptions/runtimeoptions.go | 27 | ||||
-rw-r--r-- | pkg/shim/v2/runtimeoptions/runtimeoptions.proto | 25 | ||||
-rw-r--r-- | pkg/shim/v2/service.go | 102 | ||||
-rw-r--r-- | pkg/shim/v2/service_linux.go | 6 |
8 files changed, 157 insertions, 64 deletions
diff --git a/pkg/shim/v2/BUILD b/pkg/shim/v2/BUILD index 450f62979..7e0a114a0 100644 --- a/pkg/shim/v2/BUILD +++ b/pkg/shim/v2/BUILD @@ -5,6 +5,7 @@ package(licenses = ["notice"]) go_library( name = "v2", srcs = [ + "api.go", "epoll.go", "service.go", "service_linux.go", @@ -15,10 +16,10 @@ go_library( "//pkg/shim/v1/proc", "//pkg/shim/v1/utils", "//pkg/shim/v2/options", + "//pkg/shim/v2/runtimeoptions", "//runsc/specutils", "@com_github_burntsushi_toml//:go_default_library", "@com_github_containerd_cgroups//:go_default_library", - "@com_github_containerd_cgroups//stats/v1:go_default_library", "@com_github_containerd_console//:go_default_library", "@com_github_containerd_containerd//api/events:go_default_library", "@com_github_containerd_containerd//api/types/task:go_default_library", @@ -27,12 +28,13 @@ go_library( "@com_github_containerd_containerd//log:go_default_library", "@com_github_containerd_containerd//mount:go_default_library", "@com_github_containerd_containerd//namespaces:go_default_library", + "@com_github_containerd_containerd//pkg/process:go_default_library", + "@com_github_containerd_containerd//pkg/stdio:go_default_library", "@com_github_containerd_containerd//runtime:go_default_library", "@com_github_containerd_containerd//runtime/linux/runctypes:go_default_library", - "@com_github_containerd_containerd//runtime/proc:go_default_library", "@com_github_containerd_containerd//runtime/v2/shim:go_default_library", "@com_github_containerd_containerd//runtime/v2/task:go_default_library", - "@com_github_containerd_cri//pkg/api/runtimeoptions/v1:go_default_library", + "@com_github_containerd_containerd//sys/reaper:go_default_library", "@com_github_containerd_fifo//:go_default_library", "@com_github_containerd_typeurl//:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", diff --git a/pkg/shim/v2/api.go b/pkg/shim/v2/api.go new file mode 100644 index 000000000..dbe5c59f6 --- /dev/null +++ b/pkg/shim/v2/api.go @@ -0,0 +1,22 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "github.com/containerd/containerd/api/events" +) + +type TaskOOM = events.TaskOOM diff --git a/pkg/shim/v2/epoll.go b/pkg/shim/v2/epoll.go index 45cc38c2a..41232cca8 100644 --- a/pkg/shim/v2/epoll.go +++ b/pkg/shim/v2/epoll.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/containerd/cgroups" - eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/events" "github.com/containerd/containerd/runtime" "golang.org/x/sys/unix" @@ -68,10 +67,11 @@ func (e *epoller) run(ctx context.Context) { default: n, err := unix.EpollWait(e.fd, events[:], -1) if err != nil { - if err == unix.EINTR { + if err == unix.EINTR || err == unix.EAGAIN { continue } - fmt.Errorf("cgroups: epoll wait: %w", err) + // Should not happen. + panic(fmt.Errorf("cgroups: epoll wait: %w", err)) } for i := 0; i < n; i++ { e.process(ctx, uintptr(events[i].Fd)) @@ -114,10 +114,11 @@ func (e *epoller) process(ctx context.Context, fd uintptr) { unix.Close(int(fd)) return } - if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventstypes.TaskOOM{ + if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &TaskOOM{ ContainerID: i.id, }); err != nil { - fmt.Errorf("publish OOM event: %w", err) + // Should not happen. + panic(fmt.Errorf("publish OOM event: %w", err)) } } diff --git a/pkg/shim/v2/runtimeoptions/BUILD b/pkg/shim/v2/runtimeoptions/BUILD new file mode 100644 index 000000000..01716034c --- /dev/null +++ b/pkg/shim/v2/runtimeoptions/BUILD @@ -0,0 +1,20 @@ +load("//tools:defs.bzl", "go_library", "proto_library") + +package(licenses = ["notice"]) + +proto_library( + name = "api", + srcs = [ + "runtimeoptions.proto", + ], +) + +go_library( + name = "runtimeoptions", + srcs = ["runtimeoptions.go"], + visibility = ["//pkg/shim/v2:__pkg__"], + deps = [ + "//pkg/shim/v2/runtimeoptions:api_go_proto", + "@com_github_gogo_protobuf//proto:go_default_library", + ], +) diff --git a/pkg/shim/v2/runtimeoptions/runtimeoptions.go b/pkg/shim/v2/runtimeoptions/runtimeoptions.go new file mode 100644 index 000000000..1c1a0c5d1 --- /dev/null +++ b/pkg/shim/v2/runtimeoptions/runtimeoptions.go @@ -0,0 +1,27 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runtimeoptions + +import ( + proto "github.com/gogo/protobuf/proto" + pb "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions/api_go_proto" +) + +type Options = pb.Options + +func init() { + proto.RegisterType((*Options)(nil), "cri.runtimeoptions.v1.Options") +} diff --git a/pkg/shim/v2/runtimeoptions/runtimeoptions.proto b/pkg/shim/v2/runtimeoptions/runtimeoptions.proto new file mode 100644 index 000000000..edb19020a --- /dev/null +++ b/pkg/shim/v2/runtimeoptions/runtimeoptions.proto @@ -0,0 +1,25 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package runtimeoptions; + +// This is a version of the runtimeoptions CRI API that is vendored. +// +// Imported the full CRI package is a nightmare. +message Options { + string type_url = 1; + string config_path = 2; +} diff --git a/pkg/shim/v2/service.go b/pkg/shim/v2/service.go index c67b1beba..1534152fc 100644 --- a/pkg/shim/v2/service.go +++ b/pkg/shim/v2/service.go @@ -27,34 +27,34 @@ import ( "github.com/BurntSushi/toml" "github.com/containerd/cgroups" - metrics "github.com/containerd/cgroups/stats/v1" "github.com/containerd/console" - eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime/linux/runctypes" - rproc "github.com/containerd/containerd/runtime/proc" "github.com/containerd/containerd/runtime/v2/shim" taskAPI "github.com/containerd/containerd/runtime/v2/task" - runtimeoptions "github.com/containerd/cri/pkg/api/runtimeoptions/v1" + "github.com/containerd/containerd/sys/reaper" "github.com/containerd/typeurl" - ptypes "github.com/gogo/protobuf/types" + "github.com/gogo/protobuf/types" "golang.org/x/sys/unix" "gvisor.dev/gvisor/pkg/shim/runsc" "gvisor.dev/gvisor/pkg/shim/v1/proc" "gvisor.dev/gvisor/pkg/shim/v1/utils" "gvisor.dev/gvisor/pkg/shim/v2/options" + "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions" "gvisor.dev/gvisor/runsc/specutils" ) var ( - empty = &ptypes.Empty{} + empty = &types.Empty{} bufPool = sync.Pool{ New: func() interface{} { buffer := make([]byte, 32<<10) @@ -70,24 +70,23 @@ var _ = (taskAPI.TaskService)(&service{}) const configFile = "config.toml" // New returns a new shim service that can be used via GRPC. -func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { +func New(ctx context.Context, id string, publisher shim.Publisher, cancel func()) (shim.Shim, error) { ep, err := newOOMEpoller(publisher) if err != nil { return nil, err } - ctx, cancel := context.WithCancel(ctx) go ep.run(ctx) s := &service{ id: id, context: ctx, - processes: make(map[string]rproc.Process), + processes: make(map[string]process.Process), events: make(chan interface{}, 128), ec: proc.ExitCh, oomPoller: ep, cancel: cancel, } go s.processExits() - runsc.Monitor = shim.Default + runsc.Monitor = reaper.Default if err := s.initPlatform(); err != nil { cancel() return nil, fmt.Errorf("failed to initialized platform behavior: %w", err) @@ -101,10 +100,10 @@ type service struct { mu sync.Mutex context context.Context - task rproc.Process - processes map[string]rproc.Process + task process.Process + processes map[string]process.Process events chan interface{} - platform rproc.Platform + platform stdio.Platform opts options.Options ec chan proc.Exit oomPoller *epoller @@ -141,7 +140,7 @@ func newCommand(ctx context.Context, containerdBinary, containerdAddress string) return cmd, nil } -func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) { +func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) { cmd, err := newCommand(ctx, containerdBinary, containerdAddress) if err != nil { return "", err @@ -270,7 +269,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * break } if o.TypeUrl != options.OptionType { - return nil, fmt.Errorf("unsupported runtimeoptions %q", o.TypeUrl) + return nil, fmt.Errorf("unsupported option type %q", o.TypeUrl) } path = o.ConfigPath default: @@ -415,7 +414,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP } // Exec spawns an additional process inside the container. -func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { +func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*types.Empty, error) { s.mu.Lock() p := s.processes[r.ExecID] s.mu.Unlock() @@ -444,7 +443,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty } // ResizePty resizes the terminal of a process. -func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { +func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*types.Empty, error) { p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -494,17 +493,17 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI. } // Pause the container. -func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { +func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Resume the container. -func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { +func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Kill a process with the provided signal. -func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { +func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*types.Empty, error) { p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -550,7 +549,7 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi } // CloseIO closes the I/O context of a process. -func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { +func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*types.Empty, error) { p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -564,7 +563,7 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp } // Checkpoint checkpoints the container. -func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { +func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } @@ -580,7 +579,7 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task }, nil } -func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { +func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*types.Empty, error) { s.cancel() os.Exit(0) return empty, nil @@ -608,52 +607,52 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI. // gvisor currently (as of 2020-03-03) only returns the total memory // usage and current PID value[0]. However, we copy the common fields here // so that future updates will propagate correct information. We're - // using the metrics.Metrics structure so we're returning the same type + // using the cgroups.Metrics structure so we're returning the same type // as runc. // // [0]: https://github.com/google/gvisor/blob/277a0d5a1fbe8272d4729c01ee4c6e374d047ebc/runsc/boot/events.go#L61-L81 - data, err := typeurl.MarshalAny(&metrics.Metrics{ - CPU: &metrics.CPUStat{ - Usage: &metrics.CPUUsage{ + data, err := typeurl.MarshalAny(&cgroups.Metrics{ + CPU: &cgroups.CPUStat{ + Usage: &cgroups.CPUUsage{ Total: stats.Cpu.Usage.Total, Kernel: stats.Cpu.Usage.Kernel, User: stats.Cpu.Usage.User, PerCPU: stats.Cpu.Usage.Percpu, }, - Throttling: &metrics.Throttle{ + Throttling: &cgroups.Throttle{ Periods: stats.Cpu.Throttling.Periods, ThrottledPeriods: stats.Cpu.Throttling.ThrottledPeriods, ThrottledTime: stats.Cpu.Throttling.ThrottledTime, }, }, - Memory: &metrics.MemoryStat{ + Memory: &cgroups.MemoryStat{ Cache: stats.Memory.Cache, - Usage: &metrics.MemoryEntry{ + Usage: &cgroups.MemoryEntry{ Limit: stats.Memory.Usage.Limit, Usage: stats.Memory.Usage.Usage, Max: stats.Memory.Usage.Max, Failcnt: stats.Memory.Usage.Failcnt, }, - Swap: &metrics.MemoryEntry{ + Swap: &cgroups.MemoryEntry{ Limit: stats.Memory.Swap.Limit, Usage: stats.Memory.Swap.Usage, Max: stats.Memory.Swap.Max, Failcnt: stats.Memory.Swap.Failcnt, }, - Kernel: &metrics.MemoryEntry{ + Kernel: &cgroups.MemoryEntry{ Limit: stats.Memory.Kernel.Limit, Usage: stats.Memory.Kernel.Usage, Max: stats.Memory.Kernel.Max, Failcnt: stats.Memory.Kernel.Failcnt, }, - KernelTCP: &metrics.MemoryEntry{ + KernelTCP: &cgroups.MemoryEntry{ Limit: stats.Memory.KernelTCP.Limit, Usage: stats.Memory.KernelTCP.Usage, Max: stats.Memory.KernelTCP.Max, Failcnt: stats.Memory.KernelTCP.Failcnt, }, }, - Pids: &metrics.PidsStat{ + Pids: &cgroups.PidsStat{ Current: stats.Pids.Current, Limit: stats.Pids.Limit, }, @@ -667,7 +666,7 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI. } // Update updates a running container. -func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { +func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } @@ -707,7 +706,7 @@ func (s *service) checkProcesses(e proc.Exit) { } } p.SetExited(e.Status) - s.events <- &eventstypes.TaskExit{ + s.events <- &events.TaskExit{ ContainerID: s.id, ID: p.ID(), Pid: uint32(p.Pid()), @@ -719,7 +718,7 @@ func (s *service) checkProcesses(e proc.Exit) { } } -func (s *service) allProcesses() (o []rproc.Process) { +func (s *service) allProcesses() (o []process.Process) { s.mu.Lock() defer s.mu.Unlock() for _, p := range s.processes { @@ -749,18 +748,19 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er return pids, nil } -func (s *service) forward(publisher events.Publisher) { +func (s *service) forward(publisher shim.Publisher) { for e := range s.events { ctx, cancel := context.WithTimeout(s.context, 5*time.Second) err := publisher.Publish(ctx, getTopic(e), e) cancel() if err != nil { - fmt.Errorf("post event: %w", err) + // Should not happen. + panic(fmt.Errorf("post event: %w", err)) } } } -func (s *service) getProcess(execID string) (rproc.Process, error) { +func (s *service) getProcess(execID string) (process.Process, error) { s.mu.Lock() defer s.mu.Unlock() if execID == "" { @@ -775,19 +775,19 @@ func (s *service) getProcess(execID string) (rproc.Process, error) { func getTopic(e interface{}) string { switch e.(type) { - case *eventstypes.TaskCreate: + case *events.TaskCreate: return runtime.TaskCreateEventTopic - case *eventstypes.TaskStart: + case *events.TaskStart: return runtime.TaskStartEventTopic - case *eventstypes.TaskOOM: + case *events.TaskOOM: return runtime.TaskOOMEventTopic - case *eventstypes.TaskExit: + case *events.TaskExit: return runtime.TaskExitEventTopic - case *eventstypes.TaskDelete: + case *events.TaskDelete: return runtime.TaskDeleteEventTopic - case *eventstypes.TaskExecAdded: + case *events.TaskExecAdded: return runtime.TaskExecAddedEventTopic - case *eventstypes.TaskExecStarted: + case *events.TaskExecStarted: return runtime.TaskExecStartedEventTopic default: log.L.Printf("no topic for type %#v", e) @@ -795,7 +795,7 @@ func getTopic(e interface{}) string { return runtime.TaskUnknownTopic } -func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) { +func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) { spec, err := utils.ReadSpec(r.Bundle) if err != nil { return nil, fmt.Errorf("read oci spec: %w", err) @@ -805,7 +805,7 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro } runsc.FormatLogPath(r.ID, options.RunscConfig) runtime := proc.NewRunsc(options.Root, path, namespace, options.BinaryName, options.RunscConfig) - p := proc.New(r.ID, runtime, rproc.Stdio{ + p := proc.New(r.ID, runtime, stdio.Stdio{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, @@ -819,6 +819,6 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro p.IoGID = int(options.IoGid) p.Sandbox = specutils.SpecContainerType(spec) == specutils.ContainerTypeSandbox p.UserLog = utils.UserLogPath(spec) - p.Monitor = shim.Default + p.Monitor = reaper.Default return p, nil } diff --git a/pkg/shim/v2/service_linux.go b/pkg/shim/v2/service_linux.go index 257c58812..1800ab90b 100644 --- a/pkg/shim/v2/service_linux.go +++ b/pkg/shim/v2/service_linux.go @@ -32,7 +32,7 @@ type linuxPlatform struct { epoller *console.Epoller } -func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { if p.epoller == nil { return nil, fmt.Errorf("uninitialized epoller") } @@ -47,9 +47,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console if err != nil { return nil, err } - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(epollConsole, in, *p) @@ -65,9 +63,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console return nil, err } wg.Add(1) - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(outw, epollConsole, *p) |