diff options
Diffstat (limited to 'pkg/shim')
27 files changed, 334 insertions, 203 deletions
diff --git a/pkg/shim/runsc/BUILD b/pkg/shim/runsc/BUILD index 4d388ca05..f08599ebd 100644 --- a/pkg/shim/runsc/BUILD +++ b/pkg/shim/runsc/BUILD @@ -11,6 +11,6 @@ go_library( visibility = ["//:sandbox"], deps = [ "@com_github_containerd_go_runc//:go_default_library", - "@com_github_opencontainers_runtime-spec//specs-go:go_default_library", + "@com_github_opencontainers_runtime_spec//specs-go:go_default_library", ], ) diff --git a/pkg/shim/runsc/runsc.go b/pkg/shim/runsc/runsc.go index 50807d0c5..c5cf68efa 100644 --- a/pkg/shim/runsc/runsc.go +++ b/pkg/shim/runsc/runsc.go @@ -1,5 +1,5 @@ // Copyright 2018 The containerd Authors. -// Copyright 2018 The gVisor authors. +// Copyright 2018 The gVisor Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/shim/runsc/utils.go b/pkg/shim/runsc/utils.go index 2732d5e98..c514b3bc7 100644 --- a/pkg/shim/runsc/utils.go +++ b/pkg/shim/runsc/utils.go @@ -1,5 +1,5 @@ // Copyright 2018 The containerd Authors. -// Copyright 2018 The gVisor authors. +// Copyright 2018 The gVisor Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/shim/v1/proc/BUILD b/pkg/shim/v1/proc/BUILD index a59bf198d..4377306af 100644 --- a/pkg/shim/v1/proc/BUILD +++ b/pkg/shim/v1/proc/BUILD @@ -25,11 +25,12 @@ go_library( "@com_github_containerd_containerd//errdefs:go_default_library", "@com_github_containerd_containerd//log:go_default_library", "@com_github_containerd_containerd//mount:go_default_library", - "@com_github_containerd_containerd//runtime/proc:go_default_library", + "@com_github_containerd_containerd//pkg/process:go_default_library", + "@com_github_containerd_containerd//pkg/stdio:go_default_library", "@com_github_containerd_fifo//:go_default_library", "@com_github_containerd_go_runc//:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", - "@com_github_opencontainers_runtime-spec//specs-go:go_default_library", + "@com_github_opencontainers_runtime_spec//specs-go:go_default_library", "@org_golang_x_sys//unix:go_default_library", ], ) diff --git a/pkg/shim/v1/proc/deleted_state.go b/pkg/shim/v1/proc/deleted_state.go index 52aad40ac..d9b970c4d 100644 --- a/pkg/shim/v1/proc/deleted_state.go +++ b/pkg/shim/v1/proc/deleted_state.go @@ -21,29 +21,29 @@ import ( "github.com/containerd/console" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/pkg/process" ) type deletedState struct{} func (*deletedState) Resize(ws console.WinSize) error { - return fmt.Errorf("cannot resize a deleted process") + return fmt.Errorf("cannot resize a deleted process.ss") } func (*deletedState) Start(ctx context.Context) error { - return fmt.Errorf("cannot start a deleted process") + return fmt.Errorf("cannot start a deleted process.ss") } func (*deletedState) Delete(ctx context.Context) error { - return fmt.Errorf("cannot delete a deleted process: %w", errdefs.ErrNotFound) + return fmt.Errorf("cannot delete a deleted process.ss: %w", errdefs.ErrNotFound) } func (*deletedState) Kill(ctx context.Context, sig uint32, all bool) error { - return fmt.Errorf("cannot kill a deleted process: %w", errdefs.ErrNotFound) + return fmt.Errorf("cannot kill a deleted process.ss: %w", errdefs.ErrNotFound) } func (*deletedState) SetExited(status int) {} -func (*deletedState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (*deletedState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { return nil, fmt.Errorf("cannot exec in a deleted state") } diff --git a/pkg/shim/v1/proc/exec.go b/pkg/shim/v1/proc/exec.go index 4ef9cf6cf..1d1d90488 100644 --- a/pkg/shim/v1/proc/exec.go +++ b/pkg/shim/v1/proc/exec.go @@ -27,7 +27,7 @@ import ( "github.com/containerd/console" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" specs "github.com/opencontainers/runtime-spec/specs-go" @@ -51,7 +51,7 @@ type execProcess struct { internalPid int closers []io.Closer stdin io.Closer - stdio proc.Stdio + stdio stdio.Stdio path string spec specs.Process @@ -164,7 +164,7 @@ func (e *execProcess) Stdin() io.Closer { return e.stdin } -func (e *execProcess) Stdio() proc.Stdio { +func (e *execProcess) Stdio() stdio.Stdio { return e.stdio } @@ -223,7 +223,6 @@ func (e *execProcess) start(ctx context.Context) (err error) { e.closers = append(e.closers, sc) e.stdin = sc } - var copyWaitGroup sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() if socket != nil { @@ -231,15 +230,14 @@ func (e *execProcess) start(ctx context.Context) (err error) { if err != nil { return fmt.Errorf("failed to retrieve console master: %w", err) } - if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, ©WaitGroup); err != nil { + if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil { return fmt.Errorf("failed to start console copy: %w", err) } } else if !e.stdio.IsNull() { - if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, ©WaitGroup); err != nil { + if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil { return fmt.Errorf("failed to start io pipe copy: %w", err) } } - copyWaitGroup.Wait() pid, err := runc.ReadPidFile(opts.PidFile) if err != nil { return fmt.Errorf("failed to retrieve OCI runtime exec pid: %w", err) diff --git a/pkg/shim/v1/proc/init.go b/pkg/shim/v1/proc/init.go index b429cb94f..dab3123d6 100644 --- a/pkg/shim/v1/proc/init.go +++ b/pkg/shim/v1/proc/init.go @@ -30,7 +30,8 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" - "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" specs "github.com/opencontainers/runtime-spec/specs-go" @@ -59,7 +60,7 @@ type Init struct { id string Bundle string console console.Console - Platform proc.Platform + Platform stdio.Platform io runc.IO runtime *runsc.Runsc status int @@ -67,7 +68,7 @@ type Init struct { pid int closers []io.Closer stdin io.Closer - stdio proc.Stdio + stdio stdio.Stdio Rootfs string IoUID int IoGID int @@ -92,7 +93,7 @@ func NewRunsc(root, path, namespace, runtime string, config map[string]string) * } // New returns a new init process. -func New(id string, runtime *runsc.Runsc, stdio proc.Stdio) *Init { +func New(id string, runtime *runsc.Runsc, stdio stdio.Stdio) *Init { p := &Init{ id: id, runtime: runtime, @@ -144,7 +145,6 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) (err error) { p.stdin = sc p.closers = append(p.closers, sc) } - var copyWaitGroup sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() if socket != nil { @@ -152,18 +152,16 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) (err error) { if err != nil { return fmt.Errorf("failed to retrieve console master: %w", err) } - console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, ©WaitGroup) + console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg) if err != nil { return fmt.Errorf("failed to start console copy: %w", err) } p.console = console } else if !hasNoIO(r) { - if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg, ©WaitGroup); err != nil { + if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg); err != nil { return fmt.Errorf("failed to start io pipe copy: %w", err) } } - - copyWaitGroup.Wait() pid, err := runc.ReadPidFile(pidFile) if err != nil { return fmt.Errorf("failed to retrieve OCI runtime container pid: %w", err) @@ -391,7 +389,7 @@ func (p *Init) Runtime() *runsc.Runsc { } // Exec returns a new child process. -func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { p.mu.Lock() defer p.mu.Unlock() @@ -399,7 +397,7 @@ func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce } // exec returns a new exec'd process. -func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { // process exec request var spec specs.Process if err := json.Unmarshal(r.Spec.Value, &spec); err != nil { @@ -412,7 +410,7 @@ func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce path: path, parent: p, spec: spec, - stdio: proc.Stdio{ + stdio: stdio.Stdio{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, @@ -425,7 +423,7 @@ func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce } // Stdio returns the stdio of the process. -func (p *Init) Stdio() proc.Stdio { +func (p *Init) Stdio() stdio.Stdio { return p.stdio } @@ -453,7 +451,7 @@ func (p *Init) convertStatus(status string) string { return status } -func withConditionalIO(c proc.Stdio) runc.IOOpt { +func withConditionalIO(c stdio.Stdio) runc.IOOpt { return func(o *runc.IOOption) { o.OpenStdin = c.Stdin != "" o.OpenStdout = c.Stdout != "" diff --git a/pkg/shim/v1/proc/init_state.go b/pkg/shim/v1/proc/init_state.go index 509f27762..9233ecc85 100644 --- a/pkg/shim/v1/proc/init_state.go +++ b/pkg/shim/v1/proc/init_state.go @@ -21,14 +21,14 @@ import ( "github.com/containerd/console" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/pkg/process" ) type initState interface { Resize(console.WinSize) error Start(context.Context) error Delete(context.Context) error - Exec(context.Context, string, *ExecConfig) (proc.Process, error) + Exec(context.Context, string, *ExecConfig) (process.Process, error) Kill(context.Context, uint32, bool) error SetExited(int) } @@ -94,7 +94,7 @@ func (s *createdState) SetExited(status int) { } } -func (s *createdState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (s *createdState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { return s.p.exec(ctx, path, r) } @@ -117,11 +117,11 @@ func (s *runningState) Resize(ws console.WinSize) error { } func (s *runningState) Start(ctx context.Context) error { - return fmt.Errorf("cannot start a running process") + return fmt.Errorf("cannot start a running process.ss") } func (s *runningState) Delete(ctx context.Context) error { - return fmt.Errorf("cannot delete a running process") + return fmt.Errorf("cannot delete a running process.ss") } func (s *runningState) Kill(ctx context.Context, sig uint32, all bool) error { @@ -136,7 +136,7 @@ func (s *runningState) SetExited(status int) { } } -func (s *runningState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (s *runningState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { return s.p.exec(ctx, path, r) } @@ -159,7 +159,7 @@ func (s *stoppedState) Resize(ws console.WinSize) error { } func (s *stoppedState) Start(ctx context.Context) error { - return fmt.Errorf("cannot start a stopped process") + return fmt.Errorf("cannot start a stopped process.ss") } func (s *stoppedState) Delete(ctx context.Context) error { @@ -170,13 +170,13 @@ func (s *stoppedState) Delete(ctx context.Context) error { } func (s *stoppedState) Kill(ctx context.Context, sig uint32, all bool) error { - return errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", s.p.id) + return errdefs.ToGRPCf(errdefs.ErrNotFound, "process.ss %s not found", s.p.id) } func (s *stoppedState) SetExited(status int) { // no op } -func (s *stoppedState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (s *stoppedState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { return nil, fmt.Errorf("cannot exec in a stopped state") } diff --git a/pkg/shim/v1/proc/io.go b/pkg/shim/v1/proc/io.go index 5313c7a50..34d825fb7 100644 --- a/pkg/shim/v1/proc/io.go +++ b/pkg/shim/v1/proc/io.go @@ -38,7 +38,7 @@ var bufPool = sync.Pool{ }, } -func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error { +func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg *sync.WaitGroup) error { var sameFile *countingWriteCloser for _, i := range []struct { name string @@ -48,9 +48,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w name: stdout, dest: func(wc io.WriteCloser, rc io.Closer) { wg.Add(1) - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) if _, err := io.CopyBuffer(wc, rio.Stdout(), *p); err != nil { @@ -67,9 +65,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w name: stderr, dest: func(wc io.WriteCloser, rc io.Closer) { wg.Add(1) - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) if _, err := io.CopyBuffer(wc, rio.Stderr(), *p); err != nil { @@ -124,9 +120,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w if err != nil { return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", stdin, err) } - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) diff --git a/pkg/shim/v1/proc/types.go b/pkg/shim/v1/proc/types.go index 5c215de5f..2b0df4663 100644 --- a/pkg/shim/v1/proc/types.go +++ b/pkg/shim/v1/proc/types.go @@ -18,9 +18,8 @@ package proc import ( "time" - google_protobuf "github.com/gogo/protobuf/types" - runc "github.com/containerd/go-runc" + "github.com/gogo/protobuf/types" ) // Mount holds filesystem mount configuration. @@ -41,7 +40,7 @@ type CreateConfig struct { Stdin string Stdout string Stderr string - Options *google_protobuf.Any + Options *types.Any } // ExecConfig holds exec creation configuration. @@ -51,7 +50,7 @@ type ExecConfig struct { Stdin string Stdout string Stderr string - Spec *google_protobuf.Any + Spec *types.Any } // Exit is the type of exit events. diff --git a/pkg/shim/v1/shim/BUILD b/pkg/shim/v1/shim/BUILD index 02cffbb01..05c595bc9 100644 --- a/pkg/shim/v1/shim/BUILD +++ b/pkg/shim/v1/shim/BUILD @@ -5,6 +5,7 @@ package(licenses = ["notice"]) go_library( name = "shim", srcs = [ + "api.go", "platform.go", "service.go", ], @@ -24,11 +25,12 @@ go_library( "@com_github_containerd_containerd//log:go_default_library", "@com_github_containerd_containerd//mount:go_default_library", "@com_github_containerd_containerd//namespaces:go_default_library", + "@com_github_containerd_containerd//pkg/process:go_default_library", + "@com_github_containerd_containerd//pkg/stdio:go_default_library", "@com_github_containerd_containerd//runtime:go_default_library", "@com_github_containerd_containerd//runtime/linux/runctypes:go_default_library", - "@com_github_containerd_containerd//runtime/proc:go_default_library", - "@com_github_containerd_containerd//runtime/v1/shim:go_default_library", "@com_github_containerd_containerd//runtime/v1/shim/v1:go_default_library", + "@com_github_containerd_containerd//sys/reaper:go_default_library", "@com_github_containerd_fifo//:go_default_library", "@com_github_containerd_typeurl//:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", diff --git a/pkg/shim/v1/shim/api.go b/pkg/shim/v1/shim/api.go new file mode 100644 index 000000000..5dd8ff172 --- /dev/null +++ b/pkg/shim/v1/shim/api.go @@ -0,0 +1,28 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shim + +import ( + "github.com/containerd/containerd/api/events" +) + +type TaskCreate = events.TaskCreate +type TaskStart = events.TaskStart +type TaskOOM = events.TaskOOM +type TaskExit = events.TaskExit +type TaskDelete = events.TaskDelete +type TaskExecAdded = events.TaskExecAdded +type TaskExecStarted = events.TaskExecStarted diff --git a/pkg/shim/v1/shim/platform.go b/pkg/shim/v1/shim/platform.go index f6795563c..f590f80ef 100644 --- a/pkg/shim/v1/shim/platform.go +++ b/pkg/shim/v1/shim/platform.go @@ -30,7 +30,7 @@ type linuxPlatform struct { epoller *console.Epoller } -func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { if p.epoller == nil { return nil, fmt.Errorf("uninitialized epoller") } @@ -45,9 +45,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console if err != nil { return nil, err } - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(epollConsole, in, *p) @@ -63,9 +61,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console return nil, err } wg.Add(1) - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(outw, epollConsole, *p) diff --git a/pkg/shim/v1/shim/service.go b/pkg/shim/v1/shim/service.go index 7b0280ed2..84a810cb2 100644 --- a/pkg/shim/v1/shim/service.go +++ b/pkg/shim/v1/shim/service.go @@ -23,20 +23,20 @@ import ( "sync" "github.com/containerd/console" - eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime/linux/runctypes" - rproc "github.com/containerd/containerd/runtime/proc" - "github.com/containerd/containerd/runtime/v1/shim" - shimapi "github.com/containerd/containerd/runtime/v1/shim/v1" + shim "github.com/containerd/containerd/runtime/v1/shim/v1" + "github.com/containerd/containerd/sys/reaper" "github.com/containerd/typeurl" - ptypes "github.com/gogo/protobuf/types" + "github.com/gogo/protobuf/types" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -46,7 +46,7 @@ import ( ) var ( - empty = &ptypes.Empty{} + empty = &types.Empty{} bufPool = sync.Pool{ New: func() interface{} { buffer := make([]byte, 32<<10) @@ -73,7 +73,7 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) { s := &Service{ config: config, context: ctx, - processes: make(map[string]rproc.Process), + processes: make(map[string]process.Process), events: make(chan interface{}, 128), ec: proc.ExitCh, } @@ -91,9 +91,9 @@ type Service struct { config Config context context.Context - processes map[string]rproc.Process + processes map[string]process.Process events chan interface{} - platform rproc.Platform + platform stdio.Platform ec chan proc.Exit // Filled by Create() @@ -102,7 +102,7 @@ type Service struct { } // Create creates a new initial process and container with the underlying OCI runtime. -func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *shimapi.CreateTaskResponse, err error) { +func (s *Service) Create(ctx context.Context, r *shim.CreateTaskRequest) (_ *shim.CreateTaskResponse, err error) { s.mu.Lock() defer s.mu.Unlock() @@ -168,13 +168,13 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ * s.bundle = r.Bundle pid := process.Pid() s.processes[r.ID] = process - return &shimapi.CreateTaskResponse{ + return &shim.CreateTaskResponse{ Pid: uint32(pid), }, nil } // Start starts a process. -func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) { +func (s *Service) Start(ctx context.Context, r *shim.StartRequest) (*shim.StartResponse, error) { p, err := s.getExecProcess(r.ID) if err != nil { return nil, err @@ -182,14 +182,14 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi. if err := p.Start(ctx); err != nil { return nil, err } - return &shimapi.StartResponse{ + return &shim.StartResponse{ ID: p.ID(), Pid: uint32(p.Pid()), }, nil } // Delete deletes the initial process and container. -func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteResponse, error) { +func (s *Service) Delete(ctx context.Context, r *types.Empty) (*shim.DeleteResponse, error) { p, err := s.getInitProcess() if err != nil { return nil, err @@ -201,7 +201,7 @@ func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteR delete(s.processes, s.id) s.mu.Unlock() s.platform.Close() - return &shimapi.DeleteResponse{ + return &shim.DeleteResponse{ ExitStatus: uint32(p.ExitStatus()), ExitedAt: p.ExitedAt(), Pid: uint32(p.Pid()), @@ -209,7 +209,7 @@ func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteR } // DeleteProcess deletes an exec'd process. -func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) { +func (s *Service) DeleteProcess(ctx context.Context, r *shim.DeleteProcessRequest) (*shim.DeleteResponse, error) { if r.ID == s.id { return nil, status.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess") } @@ -223,7 +223,7 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq s.mu.Lock() delete(s.processes, r.ID) s.mu.Unlock() - return &shimapi.DeleteResponse{ + return &shim.DeleteResponse{ ExitStatus: uint32(p.ExitStatus()), ExitedAt: p.ExitedAt(), Pid: uint32(p.Pid()), @@ -231,7 +231,7 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq } // Exec spawns an additional process inside the container. -func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*ptypes.Empty, error) { +func (s *Service) Exec(ctx context.Context, r *shim.ExecProcessRequest) (*types.Empty, error) { s.mu.Lock() if p := s.processes[r.ID]; p != nil { @@ -263,7 +263,7 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*pty } // ResizePty resises the terminal of a process. -func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*ptypes.Empty, error) { +func (s *Service) ResizePty(ctx context.Context, r *shim.ResizePtyRequest) (*types.Empty, error) { if r.ID == "" { return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "id not provided") } @@ -282,7 +282,7 @@ func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (* } // State returns runtime state information for a process. -func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) { +func (s *Service) State(ctx context.Context, r *shim.StateRequest) (*shim.StateResponse, error) { p, err := s.getExecProcess(r.ID) if err != nil { return nil, err @@ -301,7 +301,7 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi. status = task.StatusStopped } sio := p.Stdio() - return &shimapi.StateResponse{ + return &shim.StateResponse{ ID: p.ID(), Bundle: s.bundle, Pid: uint32(p.Pid()), @@ -316,17 +316,17 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi. } // Pause pauses the container. -func (s *Service) Pause(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) { +func (s *Service) Pause(ctx context.Context, r *types.Empty) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Resume resumes the container. -func (s *Service) Resume(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) { +func (s *Service) Resume(ctx context.Context, r *types.Empty) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Kill kills a process with the provided signal. -func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Empty, error) { +func (s *Service) Kill(ctx context.Context, r *shim.KillRequest) (*types.Empty, error) { if r.ID == "" { p, err := s.getInitProcess() if err != nil { @@ -349,7 +349,7 @@ func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Emp } // ListPids returns all pids inside the container. -func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*shimapi.ListPidsResponse, error) { +func (s *Service) ListPids(ctx context.Context, r *shim.ListPidsRequest) (*shim.ListPidsResponse, error) { pids, err := s.getContainerPids(ctx, r.ID) if err != nil { return nil, errdefs.ToGRPC(err) @@ -374,13 +374,13 @@ func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*sh } processes = append(processes, &pInfo) } - return &shimapi.ListPidsResponse{ + return &shim.ListPidsResponse{ Processes: processes, }, nil } // CloseIO closes the I/O context of a process. -func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptypes.Empty, error) { +func (s *Service) CloseIO(ctx context.Context, r *shim.CloseIORequest) (*types.Empty, error) { p, err := s.getExecProcess(r.ID) if err != nil { return nil, err @@ -394,31 +394,31 @@ func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptyp } // Checkpoint checkpoints the container. -func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) (*ptypes.Empty, error) { +func (s *Service) Checkpoint(ctx context.Context, r *shim.CheckpointTaskRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // ShimInfo returns shim information such as the shim's pid. -func (s *Service) ShimInfo(ctx context.Context, r *ptypes.Empty) (*shimapi.ShimInfoResponse, error) { - return &shimapi.ShimInfoResponse{ +func (s *Service) ShimInfo(ctx context.Context, r *types.Empty) (*shim.ShimInfoResponse, error) { + return &shim.ShimInfoResponse{ ShimPid: uint32(os.Getpid()), }, nil } // Update updates a running container. -func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*ptypes.Empty, error) { +func (s *Service) Update(ctx context.Context, r *shim.UpdateTaskRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Wait waits for a process to exit. -func (s *Service) Wait(ctx context.Context, r *shimapi.WaitRequest) (*shimapi.WaitResponse, error) { +func (s *Service) Wait(ctx context.Context, r *shim.WaitRequest) (*shim.WaitResponse, error) { p, err := s.getExecProcess(r.ID) if err != nil { return nil, err } p.Wait() - return &shimapi.WaitResponse{ + return &shim.WaitResponse{ ExitStatus: uint32(p.ExitStatus()), ExitedAt: p.ExitedAt(), }, nil @@ -430,11 +430,11 @@ func (s *Service) processExits() { } } -func (s *Service) allProcesses() []rproc.Process { +func (s *Service) allProcesses() []process.Process { s.mu.Lock() defer s.mu.Unlock() - res := make([]rproc.Process, 0, len(s.processes)) + res := make([]process.Process, 0, len(s.processes)) for _, p := range s.processes { res = append(res, p) } @@ -452,7 +452,7 @@ func (s *Service) checkProcesses(e proc.Exit) { } } p.SetExited(e.Status) - s.events <- &eventstypes.TaskExit{ + s.events <- &TaskExit{ ContainerID: s.id, ID: p.ID(), Pid: uint32(p.Pid()), @@ -490,7 +490,7 @@ func (s *Service) forward(publisher events.Publisher) { } // getInitProcess returns the init process. -func (s *Service) getInitProcess() (rproc.Process, error) { +func (s *Service) getInitProcess() (process.Process, error) { s.mu.Lock() defer s.mu.Unlock() p := s.processes[s.id] @@ -501,7 +501,7 @@ func (s *Service) getInitProcess() (rproc.Process, error) { } // getExecProcess returns the given exec process. -func (s *Service) getExecProcess(id string) (rproc.Process, error) { +func (s *Service) getExecProcess(id string) (process.Process, error) { s.mu.Lock() defer s.mu.Unlock() p := s.processes[id] @@ -513,19 +513,19 @@ func (s *Service) getExecProcess(id string) (rproc.Process, error) { func getTopic(ctx context.Context, e interface{}) string { switch e.(type) { - case *eventstypes.TaskCreate: + case *TaskCreate: return runtime.TaskCreateEventTopic - case *eventstypes.TaskStart: + case *TaskStart: return runtime.TaskStartEventTopic - case *eventstypes.TaskOOM: + case *TaskOOM: return runtime.TaskOOMEventTopic - case *eventstypes.TaskExit: + case *TaskExit: return runtime.TaskExitEventTopic - case *eventstypes.TaskDelete: + case *TaskDelete: return runtime.TaskDeleteEventTopic - case *eventstypes.TaskExecAdded: + case *TaskExecAdded: return runtime.TaskExecAddedEventTopic - case *eventstypes.TaskExecStarted: + case *TaskExecStarted: return runtime.TaskExecStartedEventTopic default: log.L.Printf("no topic for type %#v", e) @@ -533,7 +533,7 @@ func getTopic(ctx context.Context, e interface{}) string { return runtime.TaskUnknownTopic } -func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, config map[string]string, platform rproc.Platform, r *proc.CreateConfig) (*proc.Init, error) { +func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, config map[string]string, platform stdio.Platform, r *proc.CreateConfig) (*proc.Init, error) { var options runctypes.CreateOptions if r.Options != nil { v, err := typeurl.UnmarshalAny(r.Options) @@ -554,7 +554,7 @@ func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, runsc.FormatLogPath(r.ID, config) rootfs := filepath.Join(path, "rootfs") runtime := proc.NewRunsc(runtimeRoot, path, namespace, r.Runtime, config) - p := proc.New(r.ID, runtime, rproc.Stdio{ + p := proc.New(r.ID, runtime, stdio.Stdio{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, @@ -568,6 +568,6 @@ func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, p.IoGID = int(options.IoGid) p.Sandbox = utils.IsSandbox(spec) p.UserLog = utils.UserLogPath(spec) - p.Monitor = shim.Default + p.Monitor = reaper.Default return p, nil } diff --git a/pkg/shim/v1/utils/BUILD b/pkg/shim/v1/utils/BUILD index 9045781e1..54a0aabb7 100644 --- a/pkg/shim/v1/utils/BUILD +++ b/pkg/shim/v1/utils/BUILD @@ -5,6 +5,7 @@ package(licenses = ["notice"]) go_library( name = "utils", srcs = [ + "annotations.go", "utils.go", "volumes.go", ], @@ -13,8 +14,7 @@ go_library( "//shim:__subpackages__", ], deps = [ - "@com_github_containerd_cri//pkg/annotations:go_default_library", - "@com_github_opencontainers_runtime-spec//specs-go:go_default_library", + "@com_github_opencontainers_runtime_spec//specs-go:go_default_library", ], ) @@ -23,4 +23,5 @@ go_test( size = "small", srcs = ["volumes_test.go"], library = ":utils", + deps = ["@com_github_opencontainers_runtime_spec//specs-go:go_default_library"], ) diff --git a/pkg/shim/v1/utils/annotations.go b/pkg/shim/v1/utils/annotations.go new file mode 100644 index 000000000..1e9d3f365 --- /dev/null +++ b/pkg/shim/v1/utils/annotations.go @@ -0,0 +1,25 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +// Annotations from the CRI annotations package. +// +// These are vendor due to import conflicts. +const ( + sandboxLogDirAnnotation = "io.kubernetes.cri.sandbox-log-directory" + containerTypeAnnotation = "io.kubernetes.cri.container-type" + containerTypeSandbox = "sandbox" + containerTypeContainer = "container" +) diff --git a/pkg/shim/v1/utils/utils.go b/pkg/shim/v1/utils/utils.go index 7a400af1c..07e346654 100644 --- a/pkg/shim/v1/utils/utils.go +++ b/pkg/shim/v1/utils/utils.go @@ -20,7 +20,6 @@ import ( "os" "path/filepath" - "github.com/containerd/cri/pkg/annotations" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -43,13 +42,13 @@ func ReadSpec(bundle string) (*specs.Spec, error) { // IsSandbox checks whether a container is a sandbox container. func IsSandbox(spec *specs.Spec) bool { - t, ok := spec.Annotations[annotations.ContainerType] - return !ok || t == annotations.ContainerTypeSandbox + t, ok := spec.Annotations[containerTypeAnnotation] + return !ok || t == containerTypeSandbox } // UserLogPath gets user log path from OCI annotation. func UserLogPath(spec *specs.Spec) string { - sandboxLogDir := spec.Annotations[annotations.SandboxLogDir] + sandboxLogDir := spec.Annotations[sandboxLogDirAnnotation] if sandboxLogDir == "" { return "" } diff --git a/pkg/shim/v1/utils/volumes.go b/pkg/shim/v1/utils/volumes.go index e4e9bf9b1..52a428179 100644 --- a/pkg/shim/v1/utils/volumes.go +++ b/pkg/shim/v1/utils/volumes.go @@ -21,7 +21,6 @@ import ( "path/filepath" "strings" - "github.com/containerd/cri/pkg/annotations" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -44,7 +43,7 @@ func volumeFieldName(k string) string { // podUID gets pod UID from the pod log path. func podUID(s *specs.Spec) (string, error) { - sandboxLogDir := s.Annotations[annotations.SandboxLogDir] + sandboxLogDir := s.Annotations[sandboxLogDirAnnotation] if sandboxLogDir == "" { return "", fmt.Errorf("no sandbox log path annotation") } @@ -101,7 +100,6 @@ func UpdateVolumeAnnotations(bundle string, s *specs.Spec) error { if err != nil { // Skip if we can't get pod UID, because this doesn't work // for containerd 1.1. - fmt.Errorf("Can't get pod uid: %w", err) return nil } } diff --git a/pkg/shim/v1/utils/volumes_test.go b/pkg/shim/v1/utils/volumes_test.go index 4b2639545..3e02c6151 100644 --- a/pkg/shim/v1/utils/volumes_test.go +++ b/pkg/shim/v1/utils/volumes_test.go @@ -23,7 +23,6 @@ import ( "reflect" "testing" - "github.com/containerd/cri/pkg/annotations" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -58,8 +57,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) { desc: "volume annotations for sandbox", spec: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -67,8 +66,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, expected: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -81,8 +80,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) { desc: "volume annotations for sandbox with legacy log path", spec: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLegacyLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLegacyLogDirPath, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -90,8 +89,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, expected: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLegacyLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLegacyLogDirPath, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -118,7 +117,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, }, Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeContainer, + containerTypeAnnotation: containerTypeContainer, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -140,7 +139,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, }, Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeContainer, + containerTypeAnnotation: containerTypeContainer, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -160,7 +159,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, }, Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeContainer, + containerTypeAnnotation: containerTypeContainer, "dev.gvisor.spec.mount." + testVolumeName + ".share": "container", "dev.gvisor.spec.mount." + testVolumeName + ".type": "bind", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -176,7 +175,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, }, Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeContainer, + containerTypeAnnotation: containerTypeContainer, "dev.gvisor.spec.mount." + testVolumeName + ".share": "container", "dev.gvisor.spec.mount." + testVolumeName + ".type": "bind", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -188,7 +187,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { desc: "should not return error without pod log directory", spec: &specs.Spec{ Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeSandbox, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -196,7 +195,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, expected: &specs.Spec{ Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeSandbox, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -207,8 +206,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) { desc: "should return error if volume path does not exist", spec: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount.notexist.share": "pod", "dev.gvisor.spec.mount.notexist.type": "tmpfs", "dev.gvisor.spec.mount.notexist.options": "ro", @@ -220,14 +219,14 @@ func TestUpdateVolumeAnnotations(t *testing.T) { desc: "no volume annotations for sandbox", spec: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, }, }, expected: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, }, }, }, @@ -249,7 +248,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, }, Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeContainer, + containerTypeAnnotation: containerTypeContainer, }, }, expected: &specs.Spec{ @@ -268,7 +267,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, }, Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeContainer, + containerTypeAnnotation: containerTypeContainer, }, }, }, diff --git a/pkg/shim/v2/BUILD b/pkg/shim/v2/BUILD index 450f62979..7e0a114a0 100644 --- a/pkg/shim/v2/BUILD +++ b/pkg/shim/v2/BUILD @@ -5,6 +5,7 @@ package(licenses = ["notice"]) go_library( name = "v2", srcs = [ + "api.go", "epoll.go", "service.go", "service_linux.go", @@ -15,10 +16,10 @@ go_library( "//pkg/shim/v1/proc", "//pkg/shim/v1/utils", "//pkg/shim/v2/options", + "//pkg/shim/v2/runtimeoptions", "//runsc/specutils", "@com_github_burntsushi_toml//:go_default_library", "@com_github_containerd_cgroups//:go_default_library", - "@com_github_containerd_cgroups//stats/v1:go_default_library", "@com_github_containerd_console//:go_default_library", "@com_github_containerd_containerd//api/events:go_default_library", "@com_github_containerd_containerd//api/types/task:go_default_library", @@ -27,12 +28,13 @@ go_library( "@com_github_containerd_containerd//log:go_default_library", "@com_github_containerd_containerd//mount:go_default_library", "@com_github_containerd_containerd//namespaces:go_default_library", + "@com_github_containerd_containerd//pkg/process:go_default_library", + "@com_github_containerd_containerd//pkg/stdio:go_default_library", "@com_github_containerd_containerd//runtime:go_default_library", "@com_github_containerd_containerd//runtime/linux/runctypes:go_default_library", - "@com_github_containerd_containerd//runtime/proc:go_default_library", "@com_github_containerd_containerd//runtime/v2/shim:go_default_library", "@com_github_containerd_containerd//runtime/v2/task:go_default_library", - "@com_github_containerd_cri//pkg/api/runtimeoptions/v1:go_default_library", + "@com_github_containerd_containerd//sys/reaper:go_default_library", "@com_github_containerd_fifo//:go_default_library", "@com_github_containerd_typeurl//:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", diff --git a/pkg/shim/v2/api.go b/pkg/shim/v2/api.go new file mode 100644 index 000000000..dbe5c59f6 --- /dev/null +++ b/pkg/shim/v2/api.go @@ -0,0 +1,22 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "github.com/containerd/containerd/api/events" +) + +type TaskOOM = events.TaskOOM diff --git a/pkg/shim/v2/epoll.go b/pkg/shim/v2/epoll.go index 45cc38c2a..41232cca8 100644 --- a/pkg/shim/v2/epoll.go +++ b/pkg/shim/v2/epoll.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/containerd/cgroups" - eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/events" "github.com/containerd/containerd/runtime" "golang.org/x/sys/unix" @@ -68,10 +67,11 @@ func (e *epoller) run(ctx context.Context) { default: n, err := unix.EpollWait(e.fd, events[:], -1) if err != nil { - if err == unix.EINTR { + if err == unix.EINTR || err == unix.EAGAIN { continue } - fmt.Errorf("cgroups: epoll wait: %w", err) + // Should not happen. + panic(fmt.Errorf("cgroups: epoll wait: %w", err)) } for i := 0; i < n; i++ { e.process(ctx, uintptr(events[i].Fd)) @@ -114,10 +114,11 @@ func (e *epoller) process(ctx context.Context, fd uintptr) { unix.Close(int(fd)) return } - if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventstypes.TaskOOM{ + if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &TaskOOM{ ContainerID: i.id, }); err != nil { - fmt.Errorf("publish OOM event: %w", err) + // Should not happen. + panic(fmt.Errorf("publish OOM event: %w", err)) } } diff --git a/pkg/shim/v2/runtimeoptions/BUILD b/pkg/shim/v2/runtimeoptions/BUILD new file mode 100644 index 000000000..01716034c --- /dev/null +++ b/pkg/shim/v2/runtimeoptions/BUILD @@ -0,0 +1,20 @@ +load("//tools:defs.bzl", "go_library", "proto_library") + +package(licenses = ["notice"]) + +proto_library( + name = "api", + srcs = [ + "runtimeoptions.proto", + ], +) + +go_library( + name = "runtimeoptions", + srcs = ["runtimeoptions.go"], + visibility = ["//pkg/shim/v2:__pkg__"], + deps = [ + "//pkg/shim/v2/runtimeoptions:api_go_proto", + "@com_github_gogo_protobuf//proto:go_default_library", + ], +) diff --git a/pkg/shim/v2/runtimeoptions/runtimeoptions.go b/pkg/shim/v2/runtimeoptions/runtimeoptions.go new file mode 100644 index 000000000..1c1a0c5d1 --- /dev/null +++ b/pkg/shim/v2/runtimeoptions/runtimeoptions.go @@ -0,0 +1,27 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runtimeoptions + +import ( + proto "github.com/gogo/protobuf/proto" + pb "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions/api_go_proto" +) + +type Options = pb.Options + +func init() { + proto.RegisterType((*Options)(nil), "cri.runtimeoptions.v1.Options") +} diff --git a/pkg/shim/v2/runtimeoptions/runtimeoptions.proto b/pkg/shim/v2/runtimeoptions/runtimeoptions.proto new file mode 100644 index 000000000..edb19020a --- /dev/null +++ b/pkg/shim/v2/runtimeoptions/runtimeoptions.proto @@ -0,0 +1,25 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package runtimeoptions; + +// This is a version of the runtimeoptions CRI API that is vendored. +// +// Imported the full CRI package is a nightmare. +message Options { + string type_url = 1; + string config_path = 2; +} diff --git a/pkg/shim/v2/service.go b/pkg/shim/v2/service.go index c67b1beba..1534152fc 100644 --- a/pkg/shim/v2/service.go +++ b/pkg/shim/v2/service.go @@ -27,34 +27,34 @@ import ( "github.com/BurntSushi/toml" "github.com/containerd/cgroups" - metrics "github.com/containerd/cgroups/stats/v1" "github.com/containerd/console" - eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime/linux/runctypes" - rproc "github.com/containerd/containerd/runtime/proc" "github.com/containerd/containerd/runtime/v2/shim" taskAPI "github.com/containerd/containerd/runtime/v2/task" - runtimeoptions "github.com/containerd/cri/pkg/api/runtimeoptions/v1" + "github.com/containerd/containerd/sys/reaper" "github.com/containerd/typeurl" - ptypes "github.com/gogo/protobuf/types" + "github.com/gogo/protobuf/types" "golang.org/x/sys/unix" "gvisor.dev/gvisor/pkg/shim/runsc" "gvisor.dev/gvisor/pkg/shim/v1/proc" "gvisor.dev/gvisor/pkg/shim/v1/utils" "gvisor.dev/gvisor/pkg/shim/v2/options" + "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions" "gvisor.dev/gvisor/runsc/specutils" ) var ( - empty = &ptypes.Empty{} + empty = &types.Empty{} bufPool = sync.Pool{ New: func() interface{} { buffer := make([]byte, 32<<10) @@ -70,24 +70,23 @@ var _ = (taskAPI.TaskService)(&service{}) const configFile = "config.toml" // New returns a new shim service that can be used via GRPC. -func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { +func New(ctx context.Context, id string, publisher shim.Publisher, cancel func()) (shim.Shim, error) { ep, err := newOOMEpoller(publisher) if err != nil { return nil, err } - ctx, cancel := context.WithCancel(ctx) go ep.run(ctx) s := &service{ id: id, context: ctx, - processes: make(map[string]rproc.Process), + processes: make(map[string]process.Process), events: make(chan interface{}, 128), ec: proc.ExitCh, oomPoller: ep, cancel: cancel, } go s.processExits() - runsc.Monitor = shim.Default + runsc.Monitor = reaper.Default if err := s.initPlatform(); err != nil { cancel() return nil, fmt.Errorf("failed to initialized platform behavior: %w", err) @@ -101,10 +100,10 @@ type service struct { mu sync.Mutex context context.Context - task rproc.Process - processes map[string]rproc.Process + task process.Process + processes map[string]process.Process events chan interface{} - platform rproc.Platform + platform stdio.Platform opts options.Options ec chan proc.Exit oomPoller *epoller @@ -141,7 +140,7 @@ func newCommand(ctx context.Context, containerdBinary, containerdAddress string) return cmd, nil } -func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) { +func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) { cmd, err := newCommand(ctx, containerdBinary, containerdAddress) if err != nil { return "", err @@ -270,7 +269,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * break } if o.TypeUrl != options.OptionType { - return nil, fmt.Errorf("unsupported runtimeoptions %q", o.TypeUrl) + return nil, fmt.Errorf("unsupported option type %q", o.TypeUrl) } path = o.ConfigPath default: @@ -415,7 +414,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP } // Exec spawns an additional process inside the container. -func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { +func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*types.Empty, error) { s.mu.Lock() p := s.processes[r.ExecID] s.mu.Unlock() @@ -444,7 +443,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty } // ResizePty resizes the terminal of a process. -func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { +func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*types.Empty, error) { p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -494,17 +493,17 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI. } // Pause the container. -func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { +func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Resume the container. -func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { +func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Kill a process with the provided signal. -func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { +func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*types.Empty, error) { p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -550,7 +549,7 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi } // CloseIO closes the I/O context of a process. -func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { +func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*types.Empty, error) { p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -564,7 +563,7 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp } // Checkpoint checkpoints the container. -func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { +func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } @@ -580,7 +579,7 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task }, nil } -func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { +func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*types.Empty, error) { s.cancel() os.Exit(0) return empty, nil @@ -608,52 +607,52 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI. // gvisor currently (as of 2020-03-03) only returns the total memory // usage and current PID value[0]. However, we copy the common fields here // so that future updates will propagate correct information. We're - // using the metrics.Metrics structure so we're returning the same type + // using the cgroups.Metrics structure so we're returning the same type // as runc. // // [0]: https://github.com/google/gvisor/blob/277a0d5a1fbe8272d4729c01ee4c6e374d047ebc/runsc/boot/events.go#L61-L81 - data, err := typeurl.MarshalAny(&metrics.Metrics{ - CPU: &metrics.CPUStat{ - Usage: &metrics.CPUUsage{ + data, err := typeurl.MarshalAny(&cgroups.Metrics{ + CPU: &cgroups.CPUStat{ + Usage: &cgroups.CPUUsage{ Total: stats.Cpu.Usage.Total, Kernel: stats.Cpu.Usage.Kernel, User: stats.Cpu.Usage.User, PerCPU: stats.Cpu.Usage.Percpu, }, - Throttling: &metrics.Throttle{ + Throttling: &cgroups.Throttle{ Periods: stats.Cpu.Throttling.Periods, ThrottledPeriods: stats.Cpu.Throttling.ThrottledPeriods, ThrottledTime: stats.Cpu.Throttling.ThrottledTime, }, }, - Memory: &metrics.MemoryStat{ + Memory: &cgroups.MemoryStat{ Cache: stats.Memory.Cache, - Usage: &metrics.MemoryEntry{ + Usage: &cgroups.MemoryEntry{ Limit: stats.Memory.Usage.Limit, Usage: stats.Memory.Usage.Usage, Max: stats.Memory.Usage.Max, Failcnt: stats.Memory.Usage.Failcnt, }, - Swap: &metrics.MemoryEntry{ + Swap: &cgroups.MemoryEntry{ Limit: stats.Memory.Swap.Limit, Usage: stats.Memory.Swap.Usage, Max: stats.Memory.Swap.Max, Failcnt: stats.Memory.Swap.Failcnt, }, - Kernel: &metrics.MemoryEntry{ + Kernel: &cgroups.MemoryEntry{ Limit: stats.Memory.Kernel.Limit, Usage: stats.Memory.Kernel.Usage, Max: stats.Memory.Kernel.Max, Failcnt: stats.Memory.Kernel.Failcnt, }, - KernelTCP: &metrics.MemoryEntry{ + KernelTCP: &cgroups.MemoryEntry{ Limit: stats.Memory.KernelTCP.Limit, Usage: stats.Memory.KernelTCP.Usage, Max: stats.Memory.KernelTCP.Max, Failcnt: stats.Memory.KernelTCP.Failcnt, }, }, - Pids: &metrics.PidsStat{ + Pids: &cgroups.PidsStat{ Current: stats.Pids.Current, Limit: stats.Pids.Limit, }, @@ -667,7 +666,7 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI. } // Update updates a running container. -func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { +func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } @@ -707,7 +706,7 @@ func (s *service) checkProcesses(e proc.Exit) { } } p.SetExited(e.Status) - s.events <- &eventstypes.TaskExit{ + s.events <- &events.TaskExit{ ContainerID: s.id, ID: p.ID(), Pid: uint32(p.Pid()), @@ -719,7 +718,7 @@ func (s *service) checkProcesses(e proc.Exit) { } } -func (s *service) allProcesses() (o []rproc.Process) { +func (s *service) allProcesses() (o []process.Process) { s.mu.Lock() defer s.mu.Unlock() for _, p := range s.processes { @@ -749,18 +748,19 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er return pids, nil } -func (s *service) forward(publisher events.Publisher) { +func (s *service) forward(publisher shim.Publisher) { for e := range s.events { ctx, cancel := context.WithTimeout(s.context, 5*time.Second) err := publisher.Publish(ctx, getTopic(e), e) cancel() if err != nil { - fmt.Errorf("post event: %w", err) + // Should not happen. + panic(fmt.Errorf("post event: %w", err)) } } } -func (s *service) getProcess(execID string) (rproc.Process, error) { +func (s *service) getProcess(execID string) (process.Process, error) { s.mu.Lock() defer s.mu.Unlock() if execID == "" { @@ -775,19 +775,19 @@ func (s *service) getProcess(execID string) (rproc.Process, error) { func getTopic(e interface{}) string { switch e.(type) { - case *eventstypes.TaskCreate: + case *events.TaskCreate: return runtime.TaskCreateEventTopic - case *eventstypes.TaskStart: + case *events.TaskStart: return runtime.TaskStartEventTopic - case *eventstypes.TaskOOM: + case *events.TaskOOM: return runtime.TaskOOMEventTopic - case *eventstypes.TaskExit: + case *events.TaskExit: return runtime.TaskExitEventTopic - case *eventstypes.TaskDelete: + case *events.TaskDelete: return runtime.TaskDeleteEventTopic - case *eventstypes.TaskExecAdded: + case *events.TaskExecAdded: return runtime.TaskExecAddedEventTopic - case *eventstypes.TaskExecStarted: + case *events.TaskExecStarted: return runtime.TaskExecStartedEventTopic default: log.L.Printf("no topic for type %#v", e) @@ -795,7 +795,7 @@ func getTopic(e interface{}) string { return runtime.TaskUnknownTopic } -func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) { +func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) { spec, err := utils.ReadSpec(r.Bundle) if err != nil { return nil, fmt.Errorf("read oci spec: %w", err) @@ -805,7 +805,7 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro } runsc.FormatLogPath(r.ID, options.RunscConfig) runtime := proc.NewRunsc(options.Root, path, namespace, options.BinaryName, options.RunscConfig) - p := proc.New(r.ID, runtime, rproc.Stdio{ + p := proc.New(r.ID, runtime, stdio.Stdio{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, @@ -819,6 +819,6 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro p.IoGID = int(options.IoGid) p.Sandbox = specutils.SpecContainerType(spec) == specutils.ContainerTypeSandbox p.UserLog = utils.UserLogPath(spec) - p.Monitor = shim.Default + p.Monitor = reaper.Default return p, nil } diff --git a/pkg/shim/v2/service_linux.go b/pkg/shim/v2/service_linux.go index 257c58812..1800ab90b 100644 --- a/pkg/shim/v2/service_linux.go +++ b/pkg/shim/v2/service_linux.go @@ -32,7 +32,7 @@ type linuxPlatform struct { epoller *console.Epoller } -func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { if p.epoller == nil { return nil, fmt.Errorf("uninitialized epoller") } @@ -47,9 +47,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console if err != nil { return nil, err } - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(epollConsole, in, *p) @@ -65,9 +63,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console return nil, err } wg.Add(1) - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(outw, epollConsole, *p) |