summaryrefslogtreecommitdiffhomepage
path: root/pkg/shim
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/shim')
-rw-r--r--pkg/shim/runsc/BUILD2
-rw-r--r--pkg/shim/runsc/runsc.go2
-rw-r--r--pkg/shim/runsc/utils.go2
-rw-r--r--pkg/shim/v1/proc/BUILD5
-rw-r--r--pkg/shim/v1/proc/deleted_state.go12
-rw-r--r--pkg/shim/v1/proc/exec.go12
-rw-r--r--pkg/shim/v1/proc/init.go26
-rw-r--r--pkg/shim/v1/proc/init_state.go18
-rw-r--r--pkg/shim/v1/proc/io.go8
-rw-r--r--pkg/shim/v1/proc/types.go7
-rw-r--r--pkg/shim/v1/shim/BUILD6
-rw-r--r--pkg/shim/v1/shim/api.go28
-rw-r--r--pkg/shim/v1/shim/platform.go6
-rw-r--r--pkg/shim/v1/shim/service.go96
-rw-r--r--pkg/shim/v1/utils/BUILD5
-rw-r--r--pkg/shim/v1/utils/annotations.go25
-rw-r--r--pkg/shim/v1/utils/utils.go7
-rw-r--r--pkg/shim/v1/utils/volumes.go4
-rw-r--r--pkg/shim/v1/utils/volumes_test.go45
-rw-r--r--pkg/shim/v2/BUILD8
-rw-r--r--pkg/shim/v2/api.go22
-rw-r--r--pkg/shim/v2/epoll.go11
-rw-r--r--pkg/shim/v2/runtimeoptions/BUILD20
-rw-r--r--pkg/shim/v2/runtimeoptions/runtimeoptions.go27
-rw-r--r--pkg/shim/v2/runtimeoptions/runtimeoptions.proto25
-rw-r--r--pkg/shim/v2/service.go102
-rw-r--r--pkg/shim/v2/service_linux.go6
27 files changed, 334 insertions, 203 deletions
diff --git a/pkg/shim/runsc/BUILD b/pkg/shim/runsc/BUILD
index 4d388ca05..f08599ebd 100644
--- a/pkg/shim/runsc/BUILD
+++ b/pkg/shim/runsc/BUILD
@@ -11,6 +11,6 @@ go_library(
visibility = ["//:sandbox"],
deps = [
"@com_github_containerd_go_runc//:go_default_library",
- "@com_github_opencontainers_runtime-spec//specs-go:go_default_library",
+ "@com_github_opencontainers_runtime_spec//specs-go:go_default_library",
],
)
diff --git a/pkg/shim/runsc/runsc.go b/pkg/shim/runsc/runsc.go
index 50807d0c5..c5cf68efa 100644
--- a/pkg/shim/runsc/runsc.go
+++ b/pkg/shim/runsc/runsc.go
@@ -1,5 +1,5 @@
// Copyright 2018 The containerd Authors.
-// Copyright 2018 The gVisor authors.
+// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
diff --git a/pkg/shim/runsc/utils.go b/pkg/shim/runsc/utils.go
index 2732d5e98..c514b3bc7 100644
--- a/pkg/shim/runsc/utils.go
+++ b/pkg/shim/runsc/utils.go
@@ -1,5 +1,5 @@
// Copyright 2018 The containerd Authors.
-// Copyright 2018 The gVisor authors.
+// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
diff --git a/pkg/shim/v1/proc/BUILD b/pkg/shim/v1/proc/BUILD
index a59bf198d..4377306af 100644
--- a/pkg/shim/v1/proc/BUILD
+++ b/pkg/shim/v1/proc/BUILD
@@ -25,11 +25,12 @@ go_library(
"@com_github_containerd_containerd//errdefs:go_default_library",
"@com_github_containerd_containerd//log:go_default_library",
"@com_github_containerd_containerd//mount:go_default_library",
- "@com_github_containerd_containerd//runtime/proc:go_default_library",
+ "@com_github_containerd_containerd//pkg/process:go_default_library",
+ "@com_github_containerd_containerd//pkg/stdio:go_default_library",
"@com_github_containerd_fifo//:go_default_library",
"@com_github_containerd_go_runc//:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
- "@com_github_opencontainers_runtime-spec//specs-go:go_default_library",
+ "@com_github_opencontainers_runtime_spec//specs-go:go_default_library",
"@org_golang_x_sys//unix:go_default_library",
],
)
diff --git a/pkg/shim/v1/proc/deleted_state.go b/pkg/shim/v1/proc/deleted_state.go
index 52aad40ac..d9b970c4d 100644
--- a/pkg/shim/v1/proc/deleted_state.go
+++ b/pkg/shim/v1/proc/deleted_state.go
@@ -21,29 +21,29 @@ import (
"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
- "github.com/containerd/containerd/runtime/proc"
+ "github.com/containerd/containerd/pkg/process"
)
type deletedState struct{}
func (*deletedState) Resize(ws console.WinSize) error {
- return fmt.Errorf("cannot resize a deleted process")
+ return fmt.Errorf("cannot resize a deleted process.ss")
}
func (*deletedState) Start(ctx context.Context) error {
- return fmt.Errorf("cannot start a deleted process")
+ return fmt.Errorf("cannot start a deleted process.ss")
}
func (*deletedState) Delete(ctx context.Context) error {
- return fmt.Errorf("cannot delete a deleted process: %w", errdefs.ErrNotFound)
+ return fmt.Errorf("cannot delete a deleted process.ss: %w", errdefs.ErrNotFound)
}
func (*deletedState) Kill(ctx context.Context, sig uint32, all bool) error {
- return fmt.Errorf("cannot kill a deleted process: %w", errdefs.ErrNotFound)
+ return fmt.Errorf("cannot kill a deleted process.ss: %w", errdefs.ErrNotFound)
}
func (*deletedState) SetExited(status int) {}
-func (*deletedState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) {
+func (*deletedState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) {
return nil, fmt.Errorf("cannot exec in a deleted state")
}
diff --git a/pkg/shim/v1/proc/exec.go b/pkg/shim/v1/proc/exec.go
index 4ef9cf6cf..1d1d90488 100644
--- a/pkg/shim/v1/proc/exec.go
+++ b/pkg/shim/v1/proc/exec.go
@@ -27,7 +27,7 @@ import (
"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
- "github.com/containerd/containerd/runtime/proc"
+ "github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
specs "github.com/opencontainers/runtime-spec/specs-go"
@@ -51,7 +51,7 @@ type execProcess struct {
internalPid int
closers []io.Closer
stdin io.Closer
- stdio proc.Stdio
+ stdio stdio.Stdio
path string
spec specs.Process
@@ -164,7 +164,7 @@ func (e *execProcess) Stdin() io.Closer {
return e.stdin
}
-func (e *execProcess) Stdio() proc.Stdio {
+func (e *execProcess) Stdio() stdio.Stdio {
return e.stdio
}
@@ -223,7 +223,6 @@ func (e *execProcess) start(ctx context.Context) (err error) {
e.closers = append(e.closers, sc)
e.stdin = sc
}
- var copyWaitGroup sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if socket != nil {
@@ -231,15 +230,14 @@ func (e *execProcess) start(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf("failed to retrieve console master: %w", err)
}
- if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil {
+ if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil {
return fmt.Errorf("failed to start console copy: %w", err)
}
} else if !e.stdio.IsNull() {
- if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil {
+ if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil {
return fmt.Errorf("failed to start io pipe copy: %w", err)
}
}
- copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(opts.PidFile)
if err != nil {
return fmt.Errorf("failed to retrieve OCI runtime exec pid: %w", err)
diff --git a/pkg/shim/v1/proc/init.go b/pkg/shim/v1/proc/init.go
index b429cb94f..dab3123d6 100644
--- a/pkg/shim/v1/proc/init.go
+++ b/pkg/shim/v1/proc/init.go
@@ -30,7 +30,8 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
- "github.com/containerd/containerd/runtime/proc"
+ "github.com/containerd/containerd/pkg/process"
+ "github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
specs "github.com/opencontainers/runtime-spec/specs-go"
@@ -59,7 +60,7 @@ type Init struct {
id string
Bundle string
console console.Console
- Platform proc.Platform
+ Platform stdio.Platform
io runc.IO
runtime *runsc.Runsc
status int
@@ -67,7 +68,7 @@ type Init struct {
pid int
closers []io.Closer
stdin io.Closer
- stdio proc.Stdio
+ stdio stdio.Stdio
Rootfs string
IoUID int
IoGID int
@@ -92,7 +93,7 @@ func NewRunsc(root, path, namespace, runtime string, config map[string]string) *
}
// New returns a new init process.
-func New(id string, runtime *runsc.Runsc, stdio proc.Stdio) *Init {
+func New(id string, runtime *runsc.Runsc, stdio stdio.Stdio) *Init {
p := &Init{
id: id,
runtime: runtime,
@@ -144,7 +145,6 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) (err error) {
p.stdin = sc
p.closers = append(p.closers, sc)
}
- var copyWaitGroup sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if socket != nil {
@@ -152,18 +152,16 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) (err error) {
if err != nil {
return fmt.Errorf("failed to retrieve console master: %w", err)
}
- console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup)
+ console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg)
if err != nil {
return fmt.Errorf("failed to start console copy: %w", err)
}
p.console = console
} else if !hasNoIO(r) {
- if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup); err != nil {
+ if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg); err != nil {
return fmt.Errorf("failed to start io pipe copy: %w", err)
}
}
-
- copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(pidFile)
if err != nil {
return fmt.Errorf("failed to retrieve OCI runtime container pid: %w", err)
@@ -391,7 +389,7 @@ func (p *Init) Runtime() *runsc.Runsc {
}
// Exec returns a new child process.
-func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) {
+func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) {
p.mu.Lock()
defer p.mu.Unlock()
@@ -399,7 +397,7 @@ func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce
}
// exec returns a new exec'd process.
-func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) {
+func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) {
// process exec request
var spec specs.Process
if err := json.Unmarshal(r.Spec.Value, &spec); err != nil {
@@ -412,7 +410,7 @@ func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce
path: path,
parent: p,
spec: spec,
- stdio: proc.Stdio{
+ stdio: stdio.Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
@@ -425,7 +423,7 @@ func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce
}
// Stdio returns the stdio of the process.
-func (p *Init) Stdio() proc.Stdio {
+func (p *Init) Stdio() stdio.Stdio {
return p.stdio
}
@@ -453,7 +451,7 @@ func (p *Init) convertStatus(status string) string {
return status
}
-func withConditionalIO(c proc.Stdio) runc.IOOpt {
+func withConditionalIO(c stdio.Stdio) runc.IOOpt {
return func(o *runc.IOOption) {
o.OpenStdin = c.Stdin != ""
o.OpenStdout = c.Stdout != ""
diff --git a/pkg/shim/v1/proc/init_state.go b/pkg/shim/v1/proc/init_state.go
index 509f27762..9233ecc85 100644
--- a/pkg/shim/v1/proc/init_state.go
+++ b/pkg/shim/v1/proc/init_state.go
@@ -21,14 +21,14 @@ import (
"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
- "github.com/containerd/containerd/runtime/proc"
+ "github.com/containerd/containerd/pkg/process"
)
type initState interface {
Resize(console.WinSize) error
Start(context.Context) error
Delete(context.Context) error
- Exec(context.Context, string, *ExecConfig) (proc.Process, error)
+ Exec(context.Context, string, *ExecConfig) (process.Process, error)
Kill(context.Context, uint32, bool) error
SetExited(int)
}
@@ -94,7 +94,7 @@ func (s *createdState) SetExited(status int) {
}
}
-func (s *createdState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) {
+func (s *createdState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) {
return s.p.exec(ctx, path, r)
}
@@ -117,11 +117,11 @@ func (s *runningState) Resize(ws console.WinSize) error {
}
func (s *runningState) Start(ctx context.Context) error {
- return fmt.Errorf("cannot start a running process")
+ return fmt.Errorf("cannot start a running process.ss")
}
func (s *runningState) Delete(ctx context.Context) error {
- return fmt.Errorf("cannot delete a running process")
+ return fmt.Errorf("cannot delete a running process.ss")
}
func (s *runningState) Kill(ctx context.Context, sig uint32, all bool) error {
@@ -136,7 +136,7 @@ func (s *runningState) SetExited(status int) {
}
}
-func (s *runningState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) {
+func (s *runningState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) {
return s.p.exec(ctx, path, r)
}
@@ -159,7 +159,7 @@ func (s *stoppedState) Resize(ws console.WinSize) error {
}
func (s *stoppedState) Start(ctx context.Context) error {
- return fmt.Errorf("cannot start a stopped process")
+ return fmt.Errorf("cannot start a stopped process.ss")
}
func (s *stoppedState) Delete(ctx context.Context) error {
@@ -170,13 +170,13 @@ func (s *stoppedState) Delete(ctx context.Context) error {
}
func (s *stoppedState) Kill(ctx context.Context, sig uint32, all bool) error {
- return errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", s.p.id)
+ return errdefs.ToGRPCf(errdefs.ErrNotFound, "process.ss %s not found", s.p.id)
}
func (s *stoppedState) SetExited(status int) {
// no op
}
-func (s *stoppedState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) {
+func (s *stoppedState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) {
return nil, fmt.Errorf("cannot exec in a stopped state")
}
diff --git a/pkg/shim/v1/proc/io.go b/pkg/shim/v1/proc/io.go
index 5313c7a50..34d825fb7 100644
--- a/pkg/shim/v1/proc/io.go
+++ b/pkg/shim/v1/proc/io.go
@@ -38,7 +38,7 @@ var bufPool = sync.Pool{
},
}
-func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
+func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg *sync.WaitGroup) error {
var sameFile *countingWriteCloser
for _, i := range []struct {
name string
@@ -48,9 +48,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
name: stdout,
dest: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1)
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.CopyBuffer(wc, rio.Stdout(), *p); err != nil {
@@ -67,9 +65,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
name: stderr,
dest: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1)
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.CopyBuffer(wc, rio.Stderr(), *p); err != nil {
@@ -124,9 +120,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
if err != nil {
return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", stdin, err)
}
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
diff --git a/pkg/shim/v1/proc/types.go b/pkg/shim/v1/proc/types.go
index 5c215de5f..2b0df4663 100644
--- a/pkg/shim/v1/proc/types.go
+++ b/pkg/shim/v1/proc/types.go
@@ -18,9 +18,8 @@ package proc
import (
"time"
- google_protobuf "github.com/gogo/protobuf/types"
-
runc "github.com/containerd/go-runc"
+ "github.com/gogo/protobuf/types"
)
// Mount holds filesystem mount configuration.
@@ -41,7 +40,7 @@ type CreateConfig struct {
Stdin string
Stdout string
Stderr string
- Options *google_protobuf.Any
+ Options *types.Any
}
// ExecConfig holds exec creation configuration.
@@ -51,7 +50,7 @@ type ExecConfig struct {
Stdin string
Stdout string
Stderr string
- Spec *google_protobuf.Any
+ Spec *types.Any
}
// Exit is the type of exit events.
diff --git a/pkg/shim/v1/shim/BUILD b/pkg/shim/v1/shim/BUILD
index 02cffbb01..05c595bc9 100644
--- a/pkg/shim/v1/shim/BUILD
+++ b/pkg/shim/v1/shim/BUILD
@@ -5,6 +5,7 @@ package(licenses = ["notice"])
go_library(
name = "shim",
srcs = [
+ "api.go",
"platform.go",
"service.go",
],
@@ -24,11 +25,12 @@ go_library(
"@com_github_containerd_containerd//log:go_default_library",
"@com_github_containerd_containerd//mount:go_default_library",
"@com_github_containerd_containerd//namespaces:go_default_library",
+ "@com_github_containerd_containerd//pkg/process:go_default_library",
+ "@com_github_containerd_containerd//pkg/stdio:go_default_library",
"@com_github_containerd_containerd//runtime:go_default_library",
"@com_github_containerd_containerd//runtime/linux/runctypes:go_default_library",
- "@com_github_containerd_containerd//runtime/proc:go_default_library",
- "@com_github_containerd_containerd//runtime/v1/shim:go_default_library",
"@com_github_containerd_containerd//runtime/v1/shim/v1:go_default_library",
+ "@com_github_containerd_containerd//sys/reaper:go_default_library",
"@com_github_containerd_fifo//:go_default_library",
"@com_github_containerd_typeurl//:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
diff --git a/pkg/shim/v1/shim/api.go b/pkg/shim/v1/shim/api.go
new file mode 100644
index 000000000..5dd8ff172
--- /dev/null
+++ b/pkg/shim/v1/shim/api.go
@@ -0,0 +1,28 @@
+// Copyright 2018 The containerd Authors.
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package shim
+
+import (
+ "github.com/containerd/containerd/api/events"
+)
+
+type TaskCreate = events.TaskCreate
+type TaskStart = events.TaskStart
+type TaskOOM = events.TaskOOM
+type TaskExit = events.TaskExit
+type TaskDelete = events.TaskDelete
+type TaskExecAdded = events.TaskExecAdded
+type TaskExecStarted = events.TaskExecStarted
diff --git a/pkg/shim/v1/shim/platform.go b/pkg/shim/v1/shim/platform.go
index f6795563c..f590f80ef 100644
--- a/pkg/shim/v1/shim/platform.go
+++ b/pkg/shim/v1/shim/platform.go
@@ -30,7 +30,7 @@ type linuxPlatform struct {
epoller *console.Epoller
}
-func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
+func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
if p.epoller == nil {
return nil, fmt.Errorf("uninitialized epoller")
}
@@ -45,9 +45,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
if err != nil {
return nil, err
}
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(epollConsole, in, *p)
@@ -63,9 +61,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
return nil, err
}
wg.Add(1)
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(outw, epollConsole, *p)
diff --git a/pkg/shim/v1/shim/service.go b/pkg/shim/v1/shim/service.go
index 7b0280ed2..84a810cb2 100644
--- a/pkg/shim/v1/shim/service.go
+++ b/pkg/shim/v1/shim/service.go
@@ -23,20 +23,20 @@ import (
"sync"
"github.com/containerd/console"
- eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
+ "github.com/containerd/containerd/pkg/process"
+ "github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes"
- rproc "github.com/containerd/containerd/runtime/proc"
- "github.com/containerd/containerd/runtime/v1/shim"
- shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
+ shim "github.com/containerd/containerd/runtime/v1/shim/v1"
+ "github.com/containerd/containerd/sys/reaper"
"github.com/containerd/typeurl"
- ptypes "github.com/gogo/protobuf/types"
+ "github.com/gogo/protobuf/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -46,7 +46,7 @@ import (
)
var (
- empty = &ptypes.Empty{}
+ empty = &types.Empty{}
bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
@@ -73,7 +73,7 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) {
s := &Service{
config: config,
context: ctx,
- processes: make(map[string]rproc.Process),
+ processes: make(map[string]process.Process),
events: make(chan interface{}, 128),
ec: proc.ExitCh,
}
@@ -91,9 +91,9 @@ type Service struct {
config Config
context context.Context
- processes map[string]rproc.Process
+ processes map[string]process.Process
events chan interface{}
- platform rproc.Platform
+ platform stdio.Platform
ec chan proc.Exit
// Filled by Create()
@@ -102,7 +102,7 @@ type Service struct {
}
// Create creates a new initial process and container with the underlying OCI runtime.
-func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *shimapi.CreateTaskResponse, err error) {
+func (s *Service) Create(ctx context.Context, r *shim.CreateTaskRequest) (_ *shim.CreateTaskResponse, err error) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -168,13 +168,13 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *
s.bundle = r.Bundle
pid := process.Pid()
s.processes[r.ID] = process
- return &shimapi.CreateTaskResponse{
+ return &shim.CreateTaskResponse{
Pid: uint32(pid),
}, nil
}
// Start starts a process.
-func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) {
+func (s *Service) Start(ctx context.Context, r *shim.StartRequest) (*shim.StartResponse, error) {
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
@@ -182,14 +182,14 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.
if err := p.Start(ctx); err != nil {
return nil, err
}
- return &shimapi.StartResponse{
+ return &shim.StartResponse{
ID: p.ID(),
Pid: uint32(p.Pid()),
}, nil
}
// Delete deletes the initial process and container.
-func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteResponse, error) {
+func (s *Service) Delete(ctx context.Context, r *types.Empty) (*shim.DeleteResponse, error) {
p, err := s.getInitProcess()
if err != nil {
return nil, err
@@ -201,7 +201,7 @@ func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteR
delete(s.processes, s.id)
s.mu.Unlock()
s.platform.Close()
- return &shimapi.DeleteResponse{
+ return &shim.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
Pid: uint32(p.Pid()),
@@ -209,7 +209,7 @@ func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteR
}
// DeleteProcess deletes an exec'd process.
-func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) {
+func (s *Service) DeleteProcess(ctx context.Context, r *shim.DeleteProcessRequest) (*shim.DeleteResponse, error) {
if r.ID == s.id {
return nil, status.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess")
}
@@ -223,7 +223,7 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq
s.mu.Lock()
delete(s.processes, r.ID)
s.mu.Unlock()
- return &shimapi.DeleteResponse{
+ return &shim.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
Pid: uint32(p.Pid()),
@@ -231,7 +231,7 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq
}
// Exec spawns an additional process inside the container.
-func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*ptypes.Empty, error) {
+func (s *Service) Exec(ctx context.Context, r *shim.ExecProcessRequest) (*types.Empty, error) {
s.mu.Lock()
if p := s.processes[r.ID]; p != nil {
@@ -263,7 +263,7 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*pty
}
// ResizePty resises the terminal of a process.
-func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*ptypes.Empty, error) {
+func (s *Service) ResizePty(ctx context.Context, r *shim.ResizePtyRequest) (*types.Empty, error) {
if r.ID == "" {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "id not provided")
}
@@ -282,7 +282,7 @@ func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*
}
// State returns runtime state information for a process.
-func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) {
+func (s *Service) State(ctx context.Context, r *shim.StateRequest) (*shim.StateResponse, error) {
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
@@ -301,7 +301,7 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.
status = task.StatusStopped
}
sio := p.Stdio()
- return &shimapi.StateResponse{
+ return &shim.StateResponse{
ID: p.ID(),
Bundle: s.bundle,
Pid: uint32(p.Pid()),
@@ -316,17 +316,17 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.
}
// Pause pauses the container.
-func (s *Service) Pause(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
+func (s *Service) Pause(ctx context.Context, r *types.Empty) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// Resume resumes the container.
-func (s *Service) Resume(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
+func (s *Service) Resume(ctx context.Context, r *types.Empty) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// Kill kills a process with the provided signal.
-func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Empty, error) {
+func (s *Service) Kill(ctx context.Context, r *shim.KillRequest) (*types.Empty, error) {
if r.ID == "" {
p, err := s.getInitProcess()
if err != nil {
@@ -349,7 +349,7 @@ func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Emp
}
// ListPids returns all pids inside the container.
-func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*shimapi.ListPidsResponse, error) {
+func (s *Service) ListPids(ctx context.Context, r *shim.ListPidsRequest) (*shim.ListPidsResponse, error) {
pids, err := s.getContainerPids(ctx, r.ID)
if err != nil {
return nil, errdefs.ToGRPC(err)
@@ -374,13 +374,13 @@ func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*sh
}
processes = append(processes, &pInfo)
}
- return &shimapi.ListPidsResponse{
+ return &shim.ListPidsResponse{
Processes: processes,
}, nil
}
// CloseIO closes the I/O context of a process.
-func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptypes.Empty, error) {
+func (s *Service) CloseIO(ctx context.Context, r *shim.CloseIORequest) (*types.Empty, error) {
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
@@ -394,31 +394,31 @@ func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptyp
}
// Checkpoint checkpoints the container.
-func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) (*ptypes.Empty, error) {
+func (s *Service) Checkpoint(ctx context.Context, r *shim.CheckpointTaskRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// ShimInfo returns shim information such as the shim's pid.
-func (s *Service) ShimInfo(ctx context.Context, r *ptypes.Empty) (*shimapi.ShimInfoResponse, error) {
- return &shimapi.ShimInfoResponse{
+func (s *Service) ShimInfo(ctx context.Context, r *types.Empty) (*shim.ShimInfoResponse, error) {
+ return &shim.ShimInfoResponse{
ShimPid: uint32(os.Getpid()),
}, nil
}
// Update updates a running container.
-func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*ptypes.Empty, error) {
+func (s *Service) Update(ctx context.Context, r *shim.UpdateTaskRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// Wait waits for a process to exit.
-func (s *Service) Wait(ctx context.Context, r *shimapi.WaitRequest) (*shimapi.WaitResponse, error) {
+func (s *Service) Wait(ctx context.Context, r *shim.WaitRequest) (*shim.WaitResponse, error) {
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
}
p.Wait()
- return &shimapi.WaitResponse{
+ return &shim.WaitResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
}, nil
@@ -430,11 +430,11 @@ func (s *Service) processExits() {
}
}
-func (s *Service) allProcesses() []rproc.Process {
+func (s *Service) allProcesses() []process.Process {
s.mu.Lock()
defer s.mu.Unlock()
- res := make([]rproc.Process, 0, len(s.processes))
+ res := make([]process.Process, 0, len(s.processes))
for _, p := range s.processes {
res = append(res, p)
}
@@ -452,7 +452,7 @@ func (s *Service) checkProcesses(e proc.Exit) {
}
}
p.SetExited(e.Status)
- s.events <- &eventstypes.TaskExit{
+ s.events <- &TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(p.Pid()),
@@ -490,7 +490,7 @@ func (s *Service) forward(publisher events.Publisher) {
}
// getInitProcess returns the init process.
-func (s *Service) getInitProcess() (rproc.Process, error) {
+func (s *Service) getInitProcess() (process.Process, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
@@ -501,7 +501,7 @@ func (s *Service) getInitProcess() (rproc.Process, error) {
}
// getExecProcess returns the given exec process.
-func (s *Service) getExecProcess(id string) (rproc.Process, error) {
+func (s *Service) getExecProcess(id string) (process.Process, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[id]
@@ -513,19 +513,19 @@ func (s *Service) getExecProcess(id string) (rproc.Process, error) {
func getTopic(ctx context.Context, e interface{}) string {
switch e.(type) {
- case *eventstypes.TaskCreate:
+ case *TaskCreate:
return runtime.TaskCreateEventTopic
- case *eventstypes.TaskStart:
+ case *TaskStart:
return runtime.TaskStartEventTopic
- case *eventstypes.TaskOOM:
+ case *TaskOOM:
return runtime.TaskOOMEventTopic
- case *eventstypes.TaskExit:
+ case *TaskExit:
return runtime.TaskExitEventTopic
- case *eventstypes.TaskDelete:
+ case *TaskDelete:
return runtime.TaskDeleteEventTopic
- case *eventstypes.TaskExecAdded:
+ case *TaskExecAdded:
return runtime.TaskExecAddedEventTopic
- case *eventstypes.TaskExecStarted:
+ case *TaskExecStarted:
return runtime.TaskExecStartedEventTopic
default:
log.L.Printf("no topic for type %#v", e)
@@ -533,7 +533,7 @@ func getTopic(ctx context.Context, e interface{}) string {
return runtime.TaskUnknownTopic
}
-func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, config map[string]string, platform rproc.Platform, r *proc.CreateConfig) (*proc.Init, error) {
+func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, config map[string]string, platform stdio.Platform, r *proc.CreateConfig) (*proc.Init, error) {
var options runctypes.CreateOptions
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
@@ -554,7 +554,7 @@ func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string,
runsc.FormatLogPath(r.ID, config)
rootfs := filepath.Join(path, "rootfs")
runtime := proc.NewRunsc(runtimeRoot, path, namespace, r.Runtime, config)
- p := proc.New(r.ID, runtime, rproc.Stdio{
+ p := proc.New(r.ID, runtime, stdio.Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
@@ -568,6 +568,6 @@ func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string,
p.IoGID = int(options.IoGid)
p.Sandbox = utils.IsSandbox(spec)
p.UserLog = utils.UserLogPath(spec)
- p.Monitor = shim.Default
+ p.Monitor = reaper.Default
return p, nil
}
diff --git a/pkg/shim/v1/utils/BUILD b/pkg/shim/v1/utils/BUILD
index 9045781e1..54a0aabb7 100644
--- a/pkg/shim/v1/utils/BUILD
+++ b/pkg/shim/v1/utils/BUILD
@@ -5,6 +5,7 @@ package(licenses = ["notice"])
go_library(
name = "utils",
srcs = [
+ "annotations.go",
"utils.go",
"volumes.go",
],
@@ -13,8 +14,7 @@ go_library(
"//shim:__subpackages__",
],
deps = [
- "@com_github_containerd_cri//pkg/annotations:go_default_library",
- "@com_github_opencontainers_runtime-spec//specs-go:go_default_library",
+ "@com_github_opencontainers_runtime_spec//specs-go:go_default_library",
],
)
@@ -23,4 +23,5 @@ go_test(
size = "small",
srcs = ["volumes_test.go"],
library = ":utils",
+ deps = ["@com_github_opencontainers_runtime_spec//specs-go:go_default_library"],
)
diff --git a/pkg/shim/v1/utils/annotations.go b/pkg/shim/v1/utils/annotations.go
new file mode 100644
index 000000000..1e9d3f365
--- /dev/null
+++ b/pkg/shim/v1/utils/annotations.go
@@ -0,0 +1,25 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package utils
+
+// Annotations from the CRI annotations package.
+//
+// These are vendor due to import conflicts.
+const (
+ sandboxLogDirAnnotation = "io.kubernetes.cri.sandbox-log-directory"
+ containerTypeAnnotation = "io.kubernetes.cri.container-type"
+ containerTypeSandbox = "sandbox"
+ containerTypeContainer = "container"
+)
diff --git a/pkg/shim/v1/utils/utils.go b/pkg/shim/v1/utils/utils.go
index 7a400af1c..07e346654 100644
--- a/pkg/shim/v1/utils/utils.go
+++ b/pkg/shim/v1/utils/utils.go
@@ -20,7 +20,6 @@ import (
"os"
"path/filepath"
- "github.com/containerd/cri/pkg/annotations"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
@@ -43,13 +42,13 @@ func ReadSpec(bundle string) (*specs.Spec, error) {
// IsSandbox checks whether a container is a sandbox container.
func IsSandbox(spec *specs.Spec) bool {
- t, ok := spec.Annotations[annotations.ContainerType]
- return !ok || t == annotations.ContainerTypeSandbox
+ t, ok := spec.Annotations[containerTypeAnnotation]
+ return !ok || t == containerTypeSandbox
}
// UserLogPath gets user log path from OCI annotation.
func UserLogPath(spec *specs.Spec) string {
- sandboxLogDir := spec.Annotations[annotations.SandboxLogDir]
+ sandboxLogDir := spec.Annotations[sandboxLogDirAnnotation]
if sandboxLogDir == "" {
return ""
}
diff --git a/pkg/shim/v1/utils/volumes.go b/pkg/shim/v1/utils/volumes.go
index e4e9bf9b1..52a428179 100644
--- a/pkg/shim/v1/utils/volumes.go
+++ b/pkg/shim/v1/utils/volumes.go
@@ -21,7 +21,6 @@ import (
"path/filepath"
"strings"
- "github.com/containerd/cri/pkg/annotations"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
@@ -44,7 +43,7 @@ func volumeFieldName(k string) string {
// podUID gets pod UID from the pod log path.
func podUID(s *specs.Spec) (string, error) {
- sandboxLogDir := s.Annotations[annotations.SandboxLogDir]
+ sandboxLogDir := s.Annotations[sandboxLogDirAnnotation]
if sandboxLogDir == "" {
return "", fmt.Errorf("no sandbox log path annotation")
}
@@ -101,7 +100,6 @@ func UpdateVolumeAnnotations(bundle string, s *specs.Spec) error {
if err != nil {
// Skip if we can't get pod UID, because this doesn't work
// for containerd 1.1.
- fmt.Errorf("Can't get pod uid: %w", err)
return nil
}
}
diff --git a/pkg/shim/v1/utils/volumes_test.go b/pkg/shim/v1/utils/volumes_test.go
index 4b2639545..3e02c6151 100644
--- a/pkg/shim/v1/utils/volumes_test.go
+++ b/pkg/shim/v1/utils/volumes_test.go
@@ -23,7 +23,6 @@ import (
"reflect"
"testing"
- "github.com/containerd/cri/pkg/annotations"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
@@ -58,8 +57,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
desc: "volume annotations for sandbox",
spec: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -67,8 +66,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
expected: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -81,8 +80,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
desc: "volume annotations for sandbox with legacy log path",
spec: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLegacyLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLegacyLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -90,8 +89,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
expected: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLegacyLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLegacyLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -118,7 +117,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
},
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeContainer,
+ containerTypeAnnotation: containerTypeContainer,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -140,7 +139,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
},
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeContainer,
+ containerTypeAnnotation: containerTypeContainer,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -160,7 +159,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
},
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeContainer,
+ containerTypeAnnotation: containerTypeContainer,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "container",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "bind",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -176,7 +175,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
},
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeContainer,
+ containerTypeAnnotation: containerTypeContainer,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "container",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "bind",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -188,7 +187,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
desc: "should not return error without pod log directory",
spec: &specs.Spec{
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -196,7 +195,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
expected: &specs.Spec{
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -207,8 +206,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
desc: "should return error if volume path does not exist",
spec: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount.notexist.share": "pod",
"dev.gvisor.spec.mount.notexist.type": "tmpfs",
"dev.gvisor.spec.mount.notexist.options": "ro",
@@ -220,14 +219,14 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
desc: "no volume annotations for sandbox",
spec: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
},
},
expected: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
},
},
},
@@ -249,7 +248,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
},
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeContainer,
+ containerTypeAnnotation: containerTypeContainer,
},
},
expected: &specs.Spec{
@@ -268,7 +267,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
},
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeContainer,
+ containerTypeAnnotation: containerTypeContainer,
},
},
},
diff --git a/pkg/shim/v2/BUILD b/pkg/shim/v2/BUILD
index 450f62979..7e0a114a0 100644
--- a/pkg/shim/v2/BUILD
+++ b/pkg/shim/v2/BUILD
@@ -5,6 +5,7 @@ package(licenses = ["notice"])
go_library(
name = "v2",
srcs = [
+ "api.go",
"epoll.go",
"service.go",
"service_linux.go",
@@ -15,10 +16,10 @@ go_library(
"//pkg/shim/v1/proc",
"//pkg/shim/v1/utils",
"//pkg/shim/v2/options",
+ "//pkg/shim/v2/runtimeoptions",
"//runsc/specutils",
"@com_github_burntsushi_toml//:go_default_library",
"@com_github_containerd_cgroups//:go_default_library",
- "@com_github_containerd_cgroups//stats/v1:go_default_library",
"@com_github_containerd_console//:go_default_library",
"@com_github_containerd_containerd//api/events:go_default_library",
"@com_github_containerd_containerd//api/types/task:go_default_library",
@@ -27,12 +28,13 @@ go_library(
"@com_github_containerd_containerd//log:go_default_library",
"@com_github_containerd_containerd//mount:go_default_library",
"@com_github_containerd_containerd//namespaces:go_default_library",
+ "@com_github_containerd_containerd//pkg/process:go_default_library",
+ "@com_github_containerd_containerd//pkg/stdio:go_default_library",
"@com_github_containerd_containerd//runtime:go_default_library",
"@com_github_containerd_containerd//runtime/linux/runctypes:go_default_library",
- "@com_github_containerd_containerd//runtime/proc:go_default_library",
"@com_github_containerd_containerd//runtime/v2/shim:go_default_library",
"@com_github_containerd_containerd//runtime/v2/task:go_default_library",
- "@com_github_containerd_cri//pkg/api/runtimeoptions/v1:go_default_library",
+ "@com_github_containerd_containerd//sys/reaper:go_default_library",
"@com_github_containerd_fifo//:go_default_library",
"@com_github_containerd_typeurl//:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
diff --git a/pkg/shim/v2/api.go b/pkg/shim/v2/api.go
new file mode 100644
index 000000000..dbe5c59f6
--- /dev/null
+++ b/pkg/shim/v2/api.go
@@ -0,0 +1,22 @@
+// Copyright 2018 The containerd Authors.
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v2
+
+import (
+ "github.com/containerd/containerd/api/events"
+)
+
+type TaskOOM = events.TaskOOM
diff --git a/pkg/shim/v2/epoll.go b/pkg/shim/v2/epoll.go
index 45cc38c2a..41232cca8 100644
--- a/pkg/shim/v2/epoll.go
+++ b/pkg/shim/v2/epoll.go
@@ -23,7 +23,6 @@ import (
"sync"
"github.com/containerd/cgroups"
- eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/runtime"
"golang.org/x/sys/unix"
@@ -68,10 +67,11 @@ func (e *epoller) run(ctx context.Context) {
default:
n, err := unix.EpollWait(e.fd, events[:], -1)
if err != nil {
- if err == unix.EINTR {
+ if err == unix.EINTR || err == unix.EAGAIN {
continue
}
- fmt.Errorf("cgroups: epoll wait: %w", err)
+ // Should not happen.
+ panic(fmt.Errorf("cgroups: epoll wait: %w", err))
}
for i := 0; i < n; i++ {
e.process(ctx, uintptr(events[i].Fd))
@@ -114,10 +114,11 @@ func (e *epoller) process(ctx context.Context, fd uintptr) {
unix.Close(int(fd))
return
}
- if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventstypes.TaskOOM{
+ if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &TaskOOM{
ContainerID: i.id,
}); err != nil {
- fmt.Errorf("publish OOM event: %w", err)
+ // Should not happen.
+ panic(fmt.Errorf("publish OOM event: %w", err))
}
}
diff --git a/pkg/shim/v2/runtimeoptions/BUILD b/pkg/shim/v2/runtimeoptions/BUILD
new file mode 100644
index 000000000..01716034c
--- /dev/null
+++ b/pkg/shim/v2/runtimeoptions/BUILD
@@ -0,0 +1,20 @@
+load("//tools:defs.bzl", "go_library", "proto_library")
+
+package(licenses = ["notice"])
+
+proto_library(
+ name = "api",
+ srcs = [
+ "runtimeoptions.proto",
+ ],
+)
+
+go_library(
+ name = "runtimeoptions",
+ srcs = ["runtimeoptions.go"],
+ visibility = ["//pkg/shim/v2:__pkg__"],
+ deps = [
+ "//pkg/shim/v2/runtimeoptions:api_go_proto",
+ "@com_github_gogo_protobuf//proto:go_default_library",
+ ],
+)
diff --git a/pkg/shim/v2/runtimeoptions/runtimeoptions.go b/pkg/shim/v2/runtimeoptions/runtimeoptions.go
new file mode 100644
index 000000000..1c1a0c5d1
--- /dev/null
+++ b/pkg/shim/v2/runtimeoptions/runtimeoptions.go
@@ -0,0 +1,27 @@
+// Copyright 2018 The containerd Authors.
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package runtimeoptions
+
+import (
+ proto "github.com/gogo/protobuf/proto"
+ pb "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions/api_go_proto"
+)
+
+type Options = pb.Options
+
+func init() {
+ proto.RegisterType((*Options)(nil), "cri.runtimeoptions.v1.Options")
+}
diff --git a/pkg/shim/v2/runtimeoptions/runtimeoptions.proto b/pkg/shim/v2/runtimeoptions/runtimeoptions.proto
new file mode 100644
index 000000000..edb19020a
--- /dev/null
+++ b/pkg/shim/v2/runtimeoptions/runtimeoptions.proto
@@ -0,0 +1,25 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package runtimeoptions;
+
+// This is a version of the runtimeoptions CRI API that is vendored.
+//
+// Imported the full CRI package is a nightmare.
+message Options {
+ string type_url = 1;
+ string config_path = 2;
+}
diff --git a/pkg/shim/v2/service.go b/pkg/shim/v2/service.go
index c67b1beba..1534152fc 100644
--- a/pkg/shim/v2/service.go
+++ b/pkg/shim/v2/service.go
@@ -27,34 +27,34 @@ import (
"github.com/BurntSushi/toml"
"github.com/containerd/cgroups"
- metrics "github.com/containerd/cgroups/stats/v1"
"github.com/containerd/console"
- eventstypes "github.com/containerd/containerd/api/events"
+ "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
- "github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
+ "github.com/containerd/containerd/pkg/process"
+ "github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes"
- rproc "github.com/containerd/containerd/runtime/proc"
"github.com/containerd/containerd/runtime/v2/shim"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
- runtimeoptions "github.com/containerd/cri/pkg/api/runtimeoptions/v1"
+ "github.com/containerd/containerd/sys/reaper"
"github.com/containerd/typeurl"
- ptypes "github.com/gogo/protobuf/types"
+ "github.com/gogo/protobuf/types"
"golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/shim/runsc"
"gvisor.dev/gvisor/pkg/shim/v1/proc"
"gvisor.dev/gvisor/pkg/shim/v1/utils"
"gvisor.dev/gvisor/pkg/shim/v2/options"
+ "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions"
"gvisor.dev/gvisor/runsc/specutils"
)
var (
- empty = &ptypes.Empty{}
+ empty = &types.Empty{}
bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
@@ -70,24 +70,23 @@ var _ = (taskAPI.TaskService)(&service{})
const configFile = "config.toml"
// New returns a new shim service that can be used via GRPC.
-func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
+func New(ctx context.Context, id string, publisher shim.Publisher, cancel func()) (shim.Shim, error) {
ep, err := newOOMEpoller(publisher)
if err != nil {
return nil, err
}
- ctx, cancel := context.WithCancel(ctx)
go ep.run(ctx)
s := &service{
id: id,
context: ctx,
- processes: make(map[string]rproc.Process),
+ processes: make(map[string]process.Process),
events: make(chan interface{}, 128),
ec: proc.ExitCh,
oomPoller: ep,
cancel: cancel,
}
go s.processExits()
- runsc.Monitor = shim.Default
+ runsc.Monitor = reaper.Default
if err := s.initPlatform(); err != nil {
cancel()
return nil, fmt.Errorf("failed to initialized platform behavior: %w", err)
@@ -101,10 +100,10 @@ type service struct {
mu sync.Mutex
context context.Context
- task rproc.Process
- processes map[string]rproc.Process
+ task process.Process
+ processes map[string]process.Process
events chan interface{}
- platform rproc.Platform
+ platform stdio.Platform
opts options.Options
ec chan proc.Exit
oomPoller *epoller
@@ -141,7 +140,7 @@ func newCommand(ctx context.Context, containerdBinary, containerdAddress string)
return cmd, nil
}
-func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
+func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) {
cmd, err := newCommand(ctx, containerdBinary, containerdAddress)
if err != nil {
return "", err
@@ -270,7 +269,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
break
}
if o.TypeUrl != options.OptionType {
- return nil, fmt.Errorf("unsupported runtimeoptions %q", o.TypeUrl)
+ return nil, fmt.Errorf("unsupported option type %q", o.TypeUrl)
}
path = o.ConfigPath
default:
@@ -415,7 +414,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
}
// Exec spawns an additional process inside the container.
-func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
+func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*types.Empty, error) {
s.mu.Lock()
p := s.processes[r.ExecID]
s.mu.Unlock()
@@ -444,7 +443,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
}
// ResizePty resizes the terminal of a process.
-func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
+func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*types.Empty, error) {
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -494,17 +493,17 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
}
// Pause the container.
-func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
+func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// Resume the container.
-func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
+func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// Kill a process with the provided signal.
-func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
+func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*types.Empty, error) {
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -550,7 +549,7 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
}
// CloseIO closes the I/O context of a process.
-func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
+func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*types.Empty, error) {
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -564,7 +563,7 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
}
// Checkpoint checkpoints the container.
-func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
+func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
@@ -580,7 +579,7 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
}, nil
}
-func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
+func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*types.Empty, error) {
s.cancel()
os.Exit(0)
return empty, nil
@@ -608,52 +607,52 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
// gvisor currently (as of 2020-03-03) only returns the total memory
// usage and current PID value[0]. However, we copy the common fields here
// so that future updates will propagate correct information. We're
- // using the metrics.Metrics structure so we're returning the same type
+ // using the cgroups.Metrics structure so we're returning the same type
// as runc.
//
// [0]: https://github.com/google/gvisor/blob/277a0d5a1fbe8272d4729c01ee4c6e374d047ebc/runsc/boot/events.go#L61-L81
- data, err := typeurl.MarshalAny(&metrics.Metrics{
- CPU: &metrics.CPUStat{
- Usage: &metrics.CPUUsage{
+ data, err := typeurl.MarshalAny(&cgroups.Metrics{
+ CPU: &cgroups.CPUStat{
+ Usage: &cgroups.CPUUsage{
Total: stats.Cpu.Usage.Total,
Kernel: stats.Cpu.Usage.Kernel,
User: stats.Cpu.Usage.User,
PerCPU: stats.Cpu.Usage.Percpu,
},
- Throttling: &metrics.Throttle{
+ Throttling: &cgroups.Throttle{
Periods: stats.Cpu.Throttling.Periods,
ThrottledPeriods: stats.Cpu.Throttling.ThrottledPeriods,
ThrottledTime: stats.Cpu.Throttling.ThrottledTime,
},
},
- Memory: &metrics.MemoryStat{
+ Memory: &cgroups.MemoryStat{
Cache: stats.Memory.Cache,
- Usage: &metrics.MemoryEntry{
+ Usage: &cgroups.MemoryEntry{
Limit: stats.Memory.Usage.Limit,
Usage: stats.Memory.Usage.Usage,
Max: stats.Memory.Usage.Max,
Failcnt: stats.Memory.Usage.Failcnt,
},
- Swap: &metrics.MemoryEntry{
+ Swap: &cgroups.MemoryEntry{
Limit: stats.Memory.Swap.Limit,
Usage: stats.Memory.Swap.Usage,
Max: stats.Memory.Swap.Max,
Failcnt: stats.Memory.Swap.Failcnt,
},
- Kernel: &metrics.MemoryEntry{
+ Kernel: &cgroups.MemoryEntry{
Limit: stats.Memory.Kernel.Limit,
Usage: stats.Memory.Kernel.Usage,
Max: stats.Memory.Kernel.Max,
Failcnt: stats.Memory.Kernel.Failcnt,
},
- KernelTCP: &metrics.MemoryEntry{
+ KernelTCP: &cgroups.MemoryEntry{
Limit: stats.Memory.KernelTCP.Limit,
Usage: stats.Memory.KernelTCP.Usage,
Max: stats.Memory.KernelTCP.Max,
Failcnt: stats.Memory.KernelTCP.Failcnt,
},
},
- Pids: &metrics.PidsStat{
+ Pids: &cgroups.PidsStat{
Current: stats.Pids.Current,
Limit: stats.Pids.Limit,
},
@@ -667,7 +666,7 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
}
// Update updates a running container.
-func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
+func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
@@ -707,7 +706,7 @@ func (s *service) checkProcesses(e proc.Exit) {
}
}
p.SetExited(e.Status)
- s.events <- &eventstypes.TaskExit{
+ s.events <- &events.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(p.Pid()),
@@ -719,7 +718,7 @@ func (s *service) checkProcesses(e proc.Exit) {
}
}
-func (s *service) allProcesses() (o []rproc.Process) {
+func (s *service) allProcesses() (o []process.Process) {
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.processes {
@@ -749,18 +748,19 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
return pids, nil
}
-func (s *service) forward(publisher events.Publisher) {
+func (s *service) forward(publisher shim.Publisher) {
for e := range s.events {
ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
err := publisher.Publish(ctx, getTopic(e), e)
cancel()
if err != nil {
- fmt.Errorf("post event: %w", err)
+ // Should not happen.
+ panic(fmt.Errorf("post event: %w", err))
}
}
}
-func (s *service) getProcess(execID string) (rproc.Process, error) {
+func (s *service) getProcess(execID string) (process.Process, error) {
s.mu.Lock()
defer s.mu.Unlock()
if execID == "" {
@@ -775,19 +775,19 @@ func (s *service) getProcess(execID string) (rproc.Process, error) {
func getTopic(e interface{}) string {
switch e.(type) {
- case *eventstypes.TaskCreate:
+ case *events.TaskCreate:
return runtime.TaskCreateEventTopic
- case *eventstypes.TaskStart:
+ case *events.TaskStart:
return runtime.TaskStartEventTopic
- case *eventstypes.TaskOOM:
+ case *events.TaskOOM:
return runtime.TaskOOMEventTopic
- case *eventstypes.TaskExit:
+ case *events.TaskExit:
return runtime.TaskExitEventTopic
- case *eventstypes.TaskDelete:
+ case *events.TaskDelete:
return runtime.TaskDeleteEventTopic
- case *eventstypes.TaskExecAdded:
+ case *events.TaskExecAdded:
return runtime.TaskExecAddedEventTopic
- case *eventstypes.TaskExecStarted:
+ case *events.TaskExecStarted:
return runtime.TaskExecStartedEventTopic
default:
log.L.Printf("no topic for type %#v", e)
@@ -795,7 +795,7 @@ func getTopic(e interface{}) string {
return runtime.TaskUnknownTopic
}
-func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) {
+func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) {
spec, err := utils.ReadSpec(r.Bundle)
if err != nil {
return nil, fmt.Errorf("read oci spec: %w", err)
@@ -805,7 +805,7 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro
}
runsc.FormatLogPath(r.ID, options.RunscConfig)
runtime := proc.NewRunsc(options.Root, path, namespace, options.BinaryName, options.RunscConfig)
- p := proc.New(r.ID, runtime, rproc.Stdio{
+ p := proc.New(r.ID, runtime, stdio.Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
@@ -819,6 +819,6 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro
p.IoGID = int(options.IoGid)
p.Sandbox = specutils.SpecContainerType(spec) == specutils.ContainerTypeSandbox
p.UserLog = utils.UserLogPath(spec)
- p.Monitor = shim.Default
+ p.Monitor = reaper.Default
return p, nil
}
diff --git a/pkg/shim/v2/service_linux.go b/pkg/shim/v2/service_linux.go
index 257c58812..1800ab90b 100644
--- a/pkg/shim/v2/service_linux.go
+++ b/pkg/shim/v2/service_linux.go
@@ -32,7 +32,7 @@ type linuxPlatform struct {
epoller *console.Epoller
}
-func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
+func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
if p.epoller == nil {
return nil, fmt.Errorf("uninitialized epoller")
}
@@ -47,9 +47,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
if err != nil {
return nil, err
}
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(epollConsole, in, *p)
@@ -65,9 +63,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
return nil, err
}
wg.Add(1)
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(outw, epollConsole, *p)