summaryrefslogtreecommitdiffhomepage
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/sentry/fsimpl/ext/BUILD4
-rw-r--r--pkg/sentry/fsimpl/gofer/gofer.go24
-rw-r--r--pkg/sentry/fsimpl/gofer/regular_file.go4
-rw-r--r--pkg/sentry/fsimpl/kernfs/BUILD2
-rw-r--r--pkg/sentry/fsimpl/sys/BUILD2
-rw-r--r--pkg/sentry/fsimpl/testutil/BUILD2
-rw-r--r--pkg/sentry/kernel/timekeeper.go3
-rw-r--r--pkg/sentry/platform/kvm/machine_arm64_unsafe.go13
-rw-r--r--pkg/sentry/platform/ring0/entry_arm64.s13
-rw-r--r--pkg/sentry/socket/netstack/netstack.go6
-rw-r--r--pkg/sentry/time/parameters.go12
-rw-r--r--pkg/sentry/vfs/inotify.go6
-rw-r--r--pkg/shim/runsc/BUILD2
-rw-r--r--pkg/shim/runsc/runsc.go2
-rw-r--r--pkg/shim/runsc/utils.go2
-rw-r--r--pkg/shim/v1/proc/BUILD5
-rw-r--r--pkg/shim/v1/proc/deleted_state.go12
-rw-r--r--pkg/shim/v1/proc/exec.go12
-rw-r--r--pkg/shim/v1/proc/init.go26
-rw-r--r--pkg/shim/v1/proc/init_state.go18
-rw-r--r--pkg/shim/v1/proc/io.go8
-rw-r--r--pkg/shim/v1/proc/types.go7
-rw-r--r--pkg/shim/v1/shim/BUILD6
-rw-r--r--pkg/shim/v1/shim/api.go28
-rw-r--r--pkg/shim/v1/shim/platform.go6
-rw-r--r--pkg/shim/v1/shim/service.go96
-rw-r--r--pkg/shim/v1/utils/BUILD5
-rw-r--r--pkg/shim/v1/utils/annotations.go25
-rw-r--r--pkg/shim/v1/utils/utils.go7
-rw-r--r--pkg/shim/v1/utils/volumes.go4
-rw-r--r--pkg/shim/v1/utils/volumes_test.go45
-rw-r--r--pkg/shim/v2/BUILD8
-rw-r--r--pkg/shim/v2/api.go22
-rw-r--r--pkg/shim/v2/epoll.go11
-rw-r--r--pkg/shim/v2/runtimeoptions/BUILD20
-rw-r--r--pkg/shim/v2/runtimeoptions/runtimeoptions.go27
-rw-r--r--pkg/shim/v2/runtimeoptions/runtimeoptions.proto25
-rw-r--r--pkg/shim/v2/service.go102
-rw-r--r--pkg/shim/v2/service_linux.go6
-rw-r--r--pkg/sleep/BUILD1
-rw-r--r--pkg/sleep/sleep_test.go20
-rw-r--r--pkg/sleep/sleep_unsafe.go7
-rw-r--r--pkg/sync/BUILD1
-rw-r--r--pkg/sync/nocopy.go28
-rw-r--r--pkg/tcpip/header/BUILD4
-rw-r--r--pkg/tcpip/link/fdbased/endpoint_test.go73
-rw-r--r--pkg/tcpip/link/fdbased/packet_dispatchers.go2
-rw-r--r--pkg/tcpip/network/ipv4/BUILD2
-rw-r--r--pkg/tcpip/network/ipv6/BUILD2
-rw-r--r--pkg/tcpip/stack/BUILD16
-rw-r--r--pkg/tcpip/stack/conntrack.go277
-rw-r--r--pkg/tcpip/stack/iptables.go42
-rw-r--r--pkg/tcpip/stack/iptables_state.go40
-rw-r--r--pkg/tcpip/stack/iptables_types.go11
-rw-r--r--pkg/tcpip/stack/packet_buffer.go14
-rw-r--r--pkg/tcpip/stack/stack.go1
-rw-r--r--pkg/tcpip/tcpip.go10
-rw-r--r--pkg/tcpip/timer.go21
-rw-r--r--pkg/tcpip/transport/icmp/endpoint.go4
-rw-r--r--pkg/tcpip/transport/packet/endpoint.go8
-rw-r--r--pkg/tcpip/transport/raw/endpoint.go8
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go3
-rw-r--r--pkg/tcpip/transport/tcpconntrack/tcp_conntrack.go5
-rw-r--r--pkg/tcpip/transport/udp/endpoint.go14
-rw-r--r--pkg/tcpip/transport/udp/udp_test.go104
-rw-r--r--pkg/test/criutil/criutil.go3
-rw-r--r--pkg/test/dockerutil/dockerutil.go5
-rw-r--r--pkg/test/testutil/BUILD2
68 files changed, 1002 insertions, 354 deletions
diff --git a/pkg/sentry/fsimpl/ext/BUILD b/pkg/sentry/fsimpl/ext/BUILD
index ef24f8159..abc610ef3 100644
--- a/pkg/sentry/fsimpl/ext/BUILD
+++ b/pkg/sentry/fsimpl/ext/BUILD
@@ -96,7 +96,7 @@ go_test(
"//pkg/syserror",
"//pkg/test/testutil",
"//pkg/usermem",
- "@com_github_google_go-cmp//cmp:go_default_library",
- "@com_github_google_go-cmp//cmp/cmpopts:go_default_library",
+ "@com_github_google_go_cmp//cmp:go_default_library",
+ "@com_github_google_go_cmp//cmp/cmpopts:go_default_library",
],
)
diff --git a/pkg/sentry/fsimpl/gofer/gofer.go b/pkg/sentry/fsimpl/gofer/gofer.go
index 2b83094cd..b74d489a0 100644
--- a/pkg/sentry/fsimpl/gofer/gofer.go
+++ b/pkg/sentry/fsimpl/gofer/gofer.go
@@ -602,8 +602,14 @@ type dentry struct {
// returned by the server. dirents is protected by dirMu.
dirents []vfs.Dirent
- // Cached metadata; protected by metadataMu and accessed using atomic
- // memory operations unless otherwise specified.
+ // Cached metadata; protected by metadataMu.
+ // To access:
+ // - In situations where consistency is not required (like stat), these
+ // can be accessed using atomic operations only (without locking).
+ // - Lock metadataMu and can access without atomic operations.
+ // To mutate:
+ // - Lock metadataMu and use atomic operations to update because we might
+ // have atomic readers that don't hold the lock.
metadataMu sync.Mutex
ino inodeNumber // immutable
mode uint32 // type is immutable, perms are mutable
@@ -616,7 +622,7 @@ type dentry struct {
ctime int64
btime int64
// File size, protected by both metadataMu and dataMu (i.e. both must be
- // locked to mutate it).
+ // locked to mutate it; locking either is sufficient to access it).
size uint64
// nlink counts the number of hard links to this dentry. It's updated and
@@ -904,14 +910,14 @@ func (d *dentry) setStat(ctx context.Context, creds *auth.Credentials, stat *lin
// Prepare for truncate.
if stat.Mask&linux.STATX_SIZE != 0 {
- switch d.mode & linux.S_IFMT {
- case linux.S_IFREG:
+ switch mode.FileType() {
+ case linux.ModeRegular:
if !setLocalMtime {
// Truncate updates mtime.
setLocalMtime = true
stat.Mtime.Nsec = linux.UTIME_NOW
}
- case linux.S_IFDIR:
+ case linux.ModeDirectory:
return syserror.EISDIR
default:
return syserror.EINVAL
@@ -994,7 +1000,7 @@ func (d *dentry) setStat(ctx context.Context, creds *auth.Credentials, stat *lin
func (d *dentry) updateFileSizeLocked(newSize uint64) {
d.dataMu.Lock()
oldSize := d.size
- d.size = newSize
+ atomic.StoreUint64(&d.size, newSize)
// d.dataMu must be unlocked to lock d.mapsMu and invalidate mappings
// below. This allows concurrent calls to Read/Translate/etc. These
// functions synchronize with truncation by refusing to use cache
@@ -1340,8 +1346,8 @@ func (d *dentry) removexattr(ctx context.Context, creds *auth.Credentials, name
// Extended attributes in the user.* namespace are only supported for regular
// files and directories.
func (d *dentry) userXattrSupported() bool {
- filetype := linux.S_IFMT & atomic.LoadUint32(&d.mode)
- return filetype == linux.S_IFREG || filetype == linux.S_IFDIR
+ filetype := linux.FileMode(atomic.LoadUint32(&d.mode)).FileType()
+ return filetype == linux.ModeRegular || filetype == linux.ModeDirectory
}
// Preconditions: !d.isSynthetic(). d.isRegularFile() || d.isDir().
diff --git a/pkg/sentry/fsimpl/gofer/regular_file.go b/pkg/sentry/fsimpl/gofer/regular_file.go
index a2f02d9c7..f10350c97 100644
--- a/pkg/sentry/fsimpl/gofer/regular_file.go
+++ b/pkg/sentry/fsimpl/gofer/regular_file.go
@@ -89,7 +89,9 @@ func (fd *regularFileFD) Allocate(ctx context.Context, mode, offset, length uint
if err != nil {
return err
}
- d.size = size
+ d.dataMu.Lock()
+ atomic.StoreUint64(&d.size, size)
+ d.dataMu.Unlock()
if !d.cachedMetadataAuthoritative() {
d.touchCMtimeLocked()
}
diff --git a/pkg/sentry/fsimpl/kernfs/BUILD b/pkg/sentry/fsimpl/kernfs/BUILD
index 179df6c1e..3835557fe 100644
--- a/pkg/sentry/fsimpl/kernfs/BUILD
+++ b/pkg/sentry/fsimpl/kernfs/BUILD
@@ -70,6 +70,6 @@ go_test(
"//pkg/sentry/vfs",
"//pkg/syserror",
"//pkg/usermem",
- "@com_github_google_go-cmp//cmp:go_default_library",
+ "@com_github_google_go_cmp//cmp:go_default_library",
],
)
diff --git a/pkg/sentry/fsimpl/sys/BUILD b/pkg/sentry/fsimpl/sys/BUILD
index a741e2bb6..1b548ccd4 100644
--- a/pkg/sentry/fsimpl/sys/BUILD
+++ b/pkg/sentry/fsimpl/sys/BUILD
@@ -29,6 +29,6 @@ go_test(
"//pkg/sentry/kernel",
"//pkg/sentry/kernel/auth",
"//pkg/sentry/vfs",
- "@com_github_google_go-cmp//cmp:go_default_library",
+ "@com_github_google_go_cmp//cmp:go_default_library",
],
)
diff --git a/pkg/sentry/fsimpl/testutil/BUILD b/pkg/sentry/fsimpl/testutil/BUILD
index 0e4053a46..400a97996 100644
--- a/pkg/sentry/fsimpl/testutil/BUILD
+++ b/pkg/sentry/fsimpl/testutil/BUILD
@@ -32,6 +32,6 @@ go_library(
"//pkg/sentry/vfs",
"//pkg/sync",
"//pkg/usermem",
- "@com_github_google_go-cmp//cmp:go_default_library",
+ "@com_github_google_go_cmp//cmp:go_default_library",
],
)
diff --git a/pkg/sentry/kernel/timekeeper.go b/pkg/sentry/kernel/timekeeper.go
index 0adf25691..5f3908d8b 100644
--- a/pkg/sentry/kernel/timekeeper.go
+++ b/pkg/sentry/kernel/timekeeper.go
@@ -210,9 +210,6 @@ func (t *Timekeeper) startUpdater() {
p.realtimeBaseRef = int64(realtimeParams.BaseRef)
p.realtimeFrequency = realtimeParams.Frequency
}
-
- log.Debugf("Updating VDSO parameters: %+v", p)
-
return p
}); err != nil {
log.Warningf("Unable to update VDSO parameter page: %v", err)
diff --git a/pkg/sentry/platform/kvm/machine_arm64_unsafe.go b/pkg/sentry/platform/kvm/machine_arm64_unsafe.go
index 8bed34922..3de309c1a 100644
--- a/pkg/sentry/platform/kvm/machine_arm64_unsafe.go
+++ b/pkg/sentry/platform/kvm/machine_arm64_unsafe.go
@@ -78,19 +78,6 @@ func (c *vCPU) initArchState() error {
return err
}
- // sctlr_el1
- regGet.id = _KVM_ARM64_REGS_SCTLR_EL1
- if err := c.getOneRegister(&regGet); err != nil {
- return err
- }
-
- dataGet |= (_SCTLR_M | _SCTLR_C | _SCTLR_I)
- data = dataGet
- reg.id = _KVM_ARM64_REGS_SCTLR_EL1
- if err := c.setOneRegister(&reg); err != nil {
- return err
- }
-
// tcr_el1
data = _TCR_TXSZ_VA48 | _TCR_CACHE_FLAGS | _TCR_SHARED | _TCR_TG_FLAGS | _TCR_ASID16 | _TCR_IPS_40BITS
reg.id = _KVM_ARM64_REGS_TCR_EL1
diff --git a/pkg/sentry/platform/ring0/entry_arm64.s b/pkg/sentry/platform/ring0/entry_arm64.s
index 2bc5f3ecd..6ed73699b 100644
--- a/pkg/sentry/platform/ring0/entry_arm64.s
+++ b/pkg/sentry/platform/ring0/entry_arm64.s
@@ -40,6 +40,14 @@
#define FPEN_ENABLE (FPEN_NOTRAP << FPEN_SHIFT)
+// sctlr_el1: system control register el1.
+#define SCTLR_M 1 << 0
+#define SCTLR_C 1 << 2
+#define SCTLR_I 1 << 12
+#define SCTLR_UCT 1 << 15
+
+#define SCTLR_EL1_DEFAULT (SCTLR_M | SCTLR_C | SCTLR_I | SCTLR_UCT)
+
// Saves a register set.
//
// This is a macro because it may need to executed in contents where a stack is
@@ -496,6 +504,11 @@ TEXT ·kernelExitToEl1(SB),NOSPLIT,$0
// Start is the CPU entrypoint.
TEXT ·Start(SB),NOSPLIT,$0
IRQ_DISABLE
+
+ // Init.
+ MOVD $SCTLR_EL1_DEFAULT, R1
+ MSR R1, SCTLR_EL1
+
MOVD R8, RSV_REG
ORR $0xffff000000000000, RSV_REG, RSV_REG
WORD $0xd518d092 //MSR R18, TPIDR_EL1
diff --git a/pkg/sentry/socket/netstack/netstack.go b/pkg/sentry/socket/netstack/netstack.go
index 3b248a953..78a842973 100644
--- a/pkg/sentry/socket/netstack/netstack.go
+++ b/pkg/sentry/socket/netstack/netstack.go
@@ -192,6 +192,7 @@ var Metrics = tcpip.Stats{
PacketsSent: mustCreateMetric("/netstack/udp/packets_sent", "Number of UDP datagrams sent."),
PacketSendErrors: mustCreateMetric("/netstack/udp/packet_send_errors", "Number of UDP datagrams failed to be sent."),
ChecksumErrors: mustCreateMetric("/netstack/udp/checksum_errors", "Number of UDP datagrams dropped due to bad checksums."),
+ InvalidSourceAddress: mustCreateMetric("/netstack/udp/invalid_source", "Number of UDP datagrams dropped due to invalid source address."),
},
}
@@ -1753,6 +1754,11 @@ func setSockOptSocket(t *kernel.Task, s socket.SocketOps, ep commonEndpoint, nam
return nil
+ case linux.SO_DETACH_FILTER:
+ // optval is ignored.
+ var v tcpip.SocketDetachFilterOption
+ return syserr.TranslateNetstackError(ep.SetSockOpt(v))
+
default:
socket.SetSockOptEmitUnimplementedEvent(t, name)
}
diff --git a/pkg/sentry/time/parameters.go b/pkg/sentry/time/parameters.go
index 65868cb26..cd1b95117 100644
--- a/pkg/sentry/time/parameters.go
+++ b/pkg/sentry/time/parameters.go
@@ -228,11 +228,15 @@ func errorAdjust(prevParams Parameters, newParams Parameters, now TSCValue) (Par
//
// The log level is determined by the error severity.
func logErrorAdjustment(clock ClockID, errorNS ReferenceNS, orig, adjusted Parameters) {
- fn := log.Debugf
- if int64(errorNS.Magnitude()) > time.Millisecond.Nanoseconds() {
+ magNS := int64(errorNS.Magnitude())
+ if magNS <= 10*time.Microsecond.Nanoseconds() {
+ // Don't log small errors.
+ return
+ }
+ fn := log.Infof
+ if magNS > time.Millisecond.Nanoseconds() {
+ // Upgrade large errors to warning.
fn = log.Warningf
- } else if int64(errorNS.Magnitude()) > 10*time.Microsecond.Nanoseconds() {
- fn = log.Infof
}
fn("Clock(%v): error: %v ns, adjusted frequency from %v Hz to %v Hz", clock, errorNS, orig.Frequency, adjusted.Frequency)
diff --git a/pkg/sentry/vfs/inotify.go b/pkg/sentry/vfs/inotify.go
index c2e21ac5f..167b731ac 100644
--- a/pkg/sentry/vfs/inotify.go
+++ b/pkg/sentry/vfs/inotify.go
@@ -179,12 +179,12 @@ func (i *Inotify) Readiness(mask waiter.EventMask) waiter.EventMask {
return mask & ready
}
-// PRead implements FileDescriptionImpl.
+// PRead implements FileDescriptionImpl.PRead.
func (*Inotify) PRead(ctx context.Context, dst usermem.IOSequence, offset int64, opts ReadOptions) (int64, error) {
return 0, syserror.ESPIPE
}
-// PWrite implements FileDescriptionImpl.
+// PWrite implements FileDescriptionImpl.PWrite.
func (*Inotify) PWrite(ctx context.Context, src usermem.IOSequence, offset int64, opts WriteOptions) (int64, error) {
return 0, syserror.ESPIPE
}
@@ -243,7 +243,7 @@ func (i *Inotify) Read(ctx context.Context, dst usermem.IOSequence, opts ReadOpt
return writeLen, nil
}
-// Ioctl implements fs.FileOperations.Ioctl.
+// Ioctl implements FileDescriptionImpl.Ioctl.
func (i *Inotify) Ioctl(ctx context.Context, uio usermem.IO, args arch.SyscallArguments) (uintptr, error) {
switch args[1].Int() {
case linux.FIONREAD:
diff --git a/pkg/shim/runsc/BUILD b/pkg/shim/runsc/BUILD
index 4d388ca05..f08599ebd 100644
--- a/pkg/shim/runsc/BUILD
+++ b/pkg/shim/runsc/BUILD
@@ -11,6 +11,6 @@ go_library(
visibility = ["//:sandbox"],
deps = [
"@com_github_containerd_go_runc//:go_default_library",
- "@com_github_opencontainers_runtime-spec//specs-go:go_default_library",
+ "@com_github_opencontainers_runtime_spec//specs-go:go_default_library",
],
)
diff --git a/pkg/shim/runsc/runsc.go b/pkg/shim/runsc/runsc.go
index 50807d0c5..c5cf68efa 100644
--- a/pkg/shim/runsc/runsc.go
+++ b/pkg/shim/runsc/runsc.go
@@ -1,5 +1,5 @@
// Copyright 2018 The containerd Authors.
-// Copyright 2018 The gVisor authors.
+// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
diff --git a/pkg/shim/runsc/utils.go b/pkg/shim/runsc/utils.go
index 2732d5e98..c514b3bc7 100644
--- a/pkg/shim/runsc/utils.go
+++ b/pkg/shim/runsc/utils.go
@@ -1,5 +1,5 @@
// Copyright 2018 The containerd Authors.
-// Copyright 2018 The gVisor authors.
+// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
diff --git a/pkg/shim/v1/proc/BUILD b/pkg/shim/v1/proc/BUILD
index a59bf198d..4377306af 100644
--- a/pkg/shim/v1/proc/BUILD
+++ b/pkg/shim/v1/proc/BUILD
@@ -25,11 +25,12 @@ go_library(
"@com_github_containerd_containerd//errdefs:go_default_library",
"@com_github_containerd_containerd//log:go_default_library",
"@com_github_containerd_containerd//mount:go_default_library",
- "@com_github_containerd_containerd//runtime/proc:go_default_library",
+ "@com_github_containerd_containerd//pkg/process:go_default_library",
+ "@com_github_containerd_containerd//pkg/stdio:go_default_library",
"@com_github_containerd_fifo//:go_default_library",
"@com_github_containerd_go_runc//:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
- "@com_github_opencontainers_runtime-spec//specs-go:go_default_library",
+ "@com_github_opencontainers_runtime_spec//specs-go:go_default_library",
"@org_golang_x_sys//unix:go_default_library",
],
)
diff --git a/pkg/shim/v1/proc/deleted_state.go b/pkg/shim/v1/proc/deleted_state.go
index 52aad40ac..d9b970c4d 100644
--- a/pkg/shim/v1/proc/deleted_state.go
+++ b/pkg/shim/v1/proc/deleted_state.go
@@ -21,29 +21,29 @@ import (
"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
- "github.com/containerd/containerd/runtime/proc"
+ "github.com/containerd/containerd/pkg/process"
)
type deletedState struct{}
func (*deletedState) Resize(ws console.WinSize) error {
- return fmt.Errorf("cannot resize a deleted process")
+ return fmt.Errorf("cannot resize a deleted process.ss")
}
func (*deletedState) Start(ctx context.Context) error {
- return fmt.Errorf("cannot start a deleted process")
+ return fmt.Errorf("cannot start a deleted process.ss")
}
func (*deletedState) Delete(ctx context.Context) error {
- return fmt.Errorf("cannot delete a deleted process: %w", errdefs.ErrNotFound)
+ return fmt.Errorf("cannot delete a deleted process.ss: %w", errdefs.ErrNotFound)
}
func (*deletedState) Kill(ctx context.Context, sig uint32, all bool) error {
- return fmt.Errorf("cannot kill a deleted process: %w", errdefs.ErrNotFound)
+ return fmt.Errorf("cannot kill a deleted process.ss: %w", errdefs.ErrNotFound)
}
func (*deletedState) SetExited(status int) {}
-func (*deletedState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) {
+func (*deletedState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) {
return nil, fmt.Errorf("cannot exec in a deleted state")
}
diff --git a/pkg/shim/v1/proc/exec.go b/pkg/shim/v1/proc/exec.go
index 4ef9cf6cf..1d1d90488 100644
--- a/pkg/shim/v1/proc/exec.go
+++ b/pkg/shim/v1/proc/exec.go
@@ -27,7 +27,7 @@ import (
"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
- "github.com/containerd/containerd/runtime/proc"
+ "github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
specs "github.com/opencontainers/runtime-spec/specs-go"
@@ -51,7 +51,7 @@ type execProcess struct {
internalPid int
closers []io.Closer
stdin io.Closer
- stdio proc.Stdio
+ stdio stdio.Stdio
path string
spec specs.Process
@@ -164,7 +164,7 @@ func (e *execProcess) Stdin() io.Closer {
return e.stdin
}
-func (e *execProcess) Stdio() proc.Stdio {
+func (e *execProcess) Stdio() stdio.Stdio {
return e.stdio
}
@@ -223,7 +223,6 @@ func (e *execProcess) start(ctx context.Context) (err error) {
e.closers = append(e.closers, sc)
e.stdin = sc
}
- var copyWaitGroup sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if socket != nil {
@@ -231,15 +230,14 @@ func (e *execProcess) start(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf("failed to retrieve console master: %w", err)
}
- if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil {
+ if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil {
return fmt.Errorf("failed to start console copy: %w", err)
}
} else if !e.stdio.IsNull() {
- if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil {
+ if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil {
return fmt.Errorf("failed to start io pipe copy: %w", err)
}
}
- copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(opts.PidFile)
if err != nil {
return fmt.Errorf("failed to retrieve OCI runtime exec pid: %w", err)
diff --git a/pkg/shim/v1/proc/init.go b/pkg/shim/v1/proc/init.go
index b429cb94f..dab3123d6 100644
--- a/pkg/shim/v1/proc/init.go
+++ b/pkg/shim/v1/proc/init.go
@@ -30,7 +30,8 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
- "github.com/containerd/containerd/runtime/proc"
+ "github.com/containerd/containerd/pkg/process"
+ "github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
specs "github.com/opencontainers/runtime-spec/specs-go"
@@ -59,7 +60,7 @@ type Init struct {
id string
Bundle string
console console.Console
- Platform proc.Platform
+ Platform stdio.Platform
io runc.IO
runtime *runsc.Runsc
status int
@@ -67,7 +68,7 @@ type Init struct {
pid int
closers []io.Closer
stdin io.Closer
- stdio proc.Stdio
+ stdio stdio.Stdio
Rootfs string
IoUID int
IoGID int
@@ -92,7 +93,7 @@ func NewRunsc(root, path, namespace, runtime string, config map[string]string) *
}
// New returns a new init process.
-func New(id string, runtime *runsc.Runsc, stdio proc.Stdio) *Init {
+func New(id string, runtime *runsc.Runsc, stdio stdio.Stdio) *Init {
p := &Init{
id: id,
runtime: runtime,
@@ -144,7 +145,6 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) (err error) {
p.stdin = sc
p.closers = append(p.closers, sc)
}
- var copyWaitGroup sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if socket != nil {
@@ -152,18 +152,16 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) (err error) {
if err != nil {
return fmt.Errorf("failed to retrieve console master: %w", err)
}
- console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup)
+ console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg)
if err != nil {
return fmt.Errorf("failed to start console copy: %w", err)
}
p.console = console
} else if !hasNoIO(r) {
- if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup); err != nil {
+ if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg); err != nil {
return fmt.Errorf("failed to start io pipe copy: %w", err)
}
}
-
- copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(pidFile)
if err != nil {
return fmt.Errorf("failed to retrieve OCI runtime container pid: %w", err)
@@ -391,7 +389,7 @@ func (p *Init) Runtime() *runsc.Runsc {
}
// Exec returns a new child process.
-func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) {
+func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) {
p.mu.Lock()
defer p.mu.Unlock()
@@ -399,7 +397,7 @@ func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce
}
// exec returns a new exec'd process.
-func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) {
+func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) {
// process exec request
var spec specs.Process
if err := json.Unmarshal(r.Spec.Value, &spec); err != nil {
@@ -412,7 +410,7 @@ func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce
path: path,
parent: p,
spec: spec,
- stdio: proc.Stdio{
+ stdio: stdio.Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
@@ -425,7 +423,7 @@ func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce
}
// Stdio returns the stdio of the process.
-func (p *Init) Stdio() proc.Stdio {
+func (p *Init) Stdio() stdio.Stdio {
return p.stdio
}
@@ -453,7 +451,7 @@ func (p *Init) convertStatus(status string) string {
return status
}
-func withConditionalIO(c proc.Stdio) runc.IOOpt {
+func withConditionalIO(c stdio.Stdio) runc.IOOpt {
return func(o *runc.IOOption) {
o.OpenStdin = c.Stdin != ""
o.OpenStdout = c.Stdout != ""
diff --git a/pkg/shim/v1/proc/init_state.go b/pkg/shim/v1/proc/init_state.go
index 509f27762..9233ecc85 100644
--- a/pkg/shim/v1/proc/init_state.go
+++ b/pkg/shim/v1/proc/init_state.go
@@ -21,14 +21,14 @@ import (
"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
- "github.com/containerd/containerd/runtime/proc"
+ "github.com/containerd/containerd/pkg/process"
)
type initState interface {
Resize(console.WinSize) error
Start(context.Context) error
Delete(context.Context) error
- Exec(context.Context, string, *ExecConfig) (proc.Process, error)
+ Exec(context.Context, string, *ExecConfig) (process.Process, error)
Kill(context.Context, uint32, bool) error
SetExited(int)
}
@@ -94,7 +94,7 @@ func (s *createdState) SetExited(status int) {
}
}
-func (s *createdState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) {
+func (s *createdState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) {
return s.p.exec(ctx, path, r)
}
@@ -117,11 +117,11 @@ func (s *runningState) Resize(ws console.WinSize) error {
}
func (s *runningState) Start(ctx context.Context) error {
- return fmt.Errorf("cannot start a running process")
+ return fmt.Errorf("cannot start a running process.ss")
}
func (s *runningState) Delete(ctx context.Context) error {
- return fmt.Errorf("cannot delete a running process")
+ return fmt.Errorf("cannot delete a running process.ss")
}
func (s *runningState) Kill(ctx context.Context, sig uint32, all bool) error {
@@ -136,7 +136,7 @@ func (s *runningState) SetExited(status int) {
}
}
-func (s *runningState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) {
+func (s *runningState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) {
return s.p.exec(ctx, path, r)
}
@@ -159,7 +159,7 @@ func (s *stoppedState) Resize(ws console.WinSize) error {
}
func (s *stoppedState) Start(ctx context.Context) error {
- return fmt.Errorf("cannot start a stopped process")
+ return fmt.Errorf("cannot start a stopped process.ss")
}
func (s *stoppedState) Delete(ctx context.Context) error {
@@ -170,13 +170,13 @@ func (s *stoppedState) Delete(ctx context.Context) error {
}
func (s *stoppedState) Kill(ctx context.Context, sig uint32, all bool) error {
- return errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", s.p.id)
+ return errdefs.ToGRPCf(errdefs.ErrNotFound, "process.ss %s not found", s.p.id)
}
func (s *stoppedState) SetExited(status int) {
// no op
}
-func (s *stoppedState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) {
+func (s *stoppedState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) {
return nil, fmt.Errorf("cannot exec in a stopped state")
}
diff --git a/pkg/shim/v1/proc/io.go b/pkg/shim/v1/proc/io.go
index 5313c7a50..34d825fb7 100644
--- a/pkg/shim/v1/proc/io.go
+++ b/pkg/shim/v1/proc/io.go
@@ -38,7 +38,7 @@ var bufPool = sync.Pool{
},
}
-func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
+func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg *sync.WaitGroup) error {
var sameFile *countingWriteCloser
for _, i := range []struct {
name string
@@ -48,9 +48,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
name: stdout,
dest: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1)
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.CopyBuffer(wc, rio.Stdout(), *p); err != nil {
@@ -67,9 +65,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
name: stderr,
dest: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1)
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
if _, err := io.CopyBuffer(wc, rio.Stderr(), *p); err != nil {
@@ -124,9 +120,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w
if err != nil {
return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", stdin, err)
}
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
diff --git a/pkg/shim/v1/proc/types.go b/pkg/shim/v1/proc/types.go
index 5c215de5f..2b0df4663 100644
--- a/pkg/shim/v1/proc/types.go
+++ b/pkg/shim/v1/proc/types.go
@@ -18,9 +18,8 @@ package proc
import (
"time"
- google_protobuf "github.com/gogo/protobuf/types"
-
runc "github.com/containerd/go-runc"
+ "github.com/gogo/protobuf/types"
)
// Mount holds filesystem mount configuration.
@@ -41,7 +40,7 @@ type CreateConfig struct {
Stdin string
Stdout string
Stderr string
- Options *google_protobuf.Any
+ Options *types.Any
}
// ExecConfig holds exec creation configuration.
@@ -51,7 +50,7 @@ type ExecConfig struct {
Stdin string
Stdout string
Stderr string
- Spec *google_protobuf.Any
+ Spec *types.Any
}
// Exit is the type of exit events.
diff --git a/pkg/shim/v1/shim/BUILD b/pkg/shim/v1/shim/BUILD
index 02cffbb01..05c595bc9 100644
--- a/pkg/shim/v1/shim/BUILD
+++ b/pkg/shim/v1/shim/BUILD
@@ -5,6 +5,7 @@ package(licenses = ["notice"])
go_library(
name = "shim",
srcs = [
+ "api.go",
"platform.go",
"service.go",
],
@@ -24,11 +25,12 @@ go_library(
"@com_github_containerd_containerd//log:go_default_library",
"@com_github_containerd_containerd//mount:go_default_library",
"@com_github_containerd_containerd//namespaces:go_default_library",
+ "@com_github_containerd_containerd//pkg/process:go_default_library",
+ "@com_github_containerd_containerd//pkg/stdio:go_default_library",
"@com_github_containerd_containerd//runtime:go_default_library",
"@com_github_containerd_containerd//runtime/linux/runctypes:go_default_library",
- "@com_github_containerd_containerd//runtime/proc:go_default_library",
- "@com_github_containerd_containerd//runtime/v1/shim:go_default_library",
"@com_github_containerd_containerd//runtime/v1/shim/v1:go_default_library",
+ "@com_github_containerd_containerd//sys/reaper:go_default_library",
"@com_github_containerd_fifo//:go_default_library",
"@com_github_containerd_typeurl//:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
diff --git a/pkg/shim/v1/shim/api.go b/pkg/shim/v1/shim/api.go
new file mode 100644
index 000000000..5dd8ff172
--- /dev/null
+++ b/pkg/shim/v1/shim/api.go
@@ -0,0 +1,28 @@
+// Copyright 2018 The containerd Authors.
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package shim
+
+import (
+ "github.com/containerd/containerd/api/events"
+)
+
+type TaskCreate = events.TaskCreate
+type TaskStart = events.TaskStart
+type TaskOOM = events.TaskOOM
+type TaskExit = events.TaskExit
+type TaskDelete = events.TaskDelete
+type TaskExecAdded = events.TaskExecAdded
+type TaskExecStarted = events.TaskExecStarted
diff --git a/pkg/shim/v1/shim/platform.go b/pkg/shim/v1/shim/platform.go
index f6795563c..f590f80ef 100644
--- a/pkg/shim/v1/shim/platform.go
+++ b/pkg/shim/v1/shim/platform.go
@@ -30,7 +30,7 @@ type linuxPlatform struct {
epoller *console.Epoller
}
-func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
+func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
if p.epoller == nil {
return nil, fmt.Errorf("uninitialized epoller")
}
@@ -45,9 +45,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
if err != nil {
return nil, err
}
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(epollConsole, in, *p)
@@ -63,9 +61,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
return nil, err
}
wg.Add(1)
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(outw, epollConsole, *p)
diff --git a/pkg/shim/v1/shim/service.go b/pkg/shim/v1/shim/service.go
index 7b0280ed2..84a810cb2 100644
--- a/pkg/shim/v1/shim/service.go
+++ b/pkg/shim/v1/shim/service.go
@@ -23,20 +23,20 @@ import (
"sync"
"github.com/containerd/console"
- eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
+ "github.com/containerd/containerd/pkg/process"
+ "github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes"
- rproc "github.com/containerd/containerd/runtime/proc"
- "github.com/containerd/containerd/runtime/v1/shim"
- shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
+ shim "github.com/containerd/containerd/runtime/v1/shim/v1"
+ "github.com/containerd/containerd/sys/reaper"
"github.com/containerd/typeurl"
- ptypes "github.com/gogo/protobuf/types"
+ "github.com/gogo/protobuf/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -46,7 +46,7 @@ import (
)
var (
- empty = &ptypes.Empty{}
+ empty = &types.Empty{}
bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
@@ -73,7 +73,7 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) {
s := &Service{
config: config,
context: ctx,
- processes: make(map[string]rproc.Process),
+ processes: make(map[string]process.Process),
events: make(chan interface{}, 128),
ec: proc.ExitCh,
}
@@ -91,9 +91,9 @@ type Service struct {
config Config
context context.Context
- processes map[string]rproc.Process
+ processes map[string]process.Process
events chan interface{}
- platform rproc.Platform
+ platform stdio.Platform
ec chan proc.Exit
// Filled by Create()
@@ -102,7 +102,7 @@ type Service struct {
}
// Create creates a new initial process and container with the underlying OCI runtime.
-func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *shimapi.CreateTaskResponse, err error) {
+func (s *Service) Create(ctx context.Context, r *shim.CreateTaskRequest) (_ *shim.CreateTaskResponse, err error) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -168,13 +168,13 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *
s.bundle = r.Bundle
pid := process.Pid()
s.processes[r.ID] = process
- return &shimapi.CreateTaskResponse{
+ return &shim.CreateTaskResponse{
Pid: uint32(pid),
}, nil
}
// Start starts a process.
-func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) {
+func (s *Service) Start(ctx context.Context, r *shim.StartRequest) (*shim.StartResponse, error) {
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
@@ -182,14 +182,14 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.
if err := p.Start(ctx); err != nil {
return nil, err
}
- return &shimapi.StartResponse{
+ return &shim.StartResponse{
ID: p.ID(),
Pid: uint32(p.Pid()),
}, nil
}
// Delete deletes the initial process and container.
-func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteResponse, error) {
+func (s *Service) Delete(ctx context.Context, r *types.Empty) (*shim.DeleteResponse, error) {
p, err := s.getInitProcess()
if err != nil {
return nil, err
@@ -201,7 +201,7 @@ func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteR
delete(s.processes, s.id)
s.mu.Unlock()
s.platform.Close()
- return &shimapi.DeleteResponse{
+ return &shim.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
Pid: uint32(p.Pid()),
@@ -209,7 +209,7 @@ func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteR
}
// DeleteProcess deletes an exec'd process.
-func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) {
+func (s *Service) DeleteProcess(ctx context.Context, r *shim.DeleteProcessRequest) (*shim.DeleteResponse, error) {
if r.ID == s.id {
return nil, status.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess")
}
@@ -223,7 +223,7 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq
s.mu.Lock()
delete(s.processes, r.ID)
s.mu.Unlock()
- return &shimapi.DeleteResponse{
+ return &shim.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
Pid: uint32(p.Pid()),
@@ -231,7 +231,7 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq
}
// Exec spawns an additional process inside the container.
-func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*ptypes.Empty, error) {
+func (s *Service) Exec(ctx context.Context, r *shim.ExecProcessRequest) (*types.Empty, error) {
s.mu.Lock()
if p := s.processes[r.ID]; p != nil {
@@ -263,7 +263,7 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*pty
}
// ResizePty resises the terminal of a process.
-func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*ptypes.Empty, error) {
+func (s *Service) ResizePty(ctx context.Context, r *shim.ResizePtyRequest) (*types.Empty, error) {
if r.ID == "" {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "id not provided")
}
@@ -282,7 +282,7 @@ func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*
}
// State returns runtime state information for a process.
-func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) {
+func (s *Service) State(ctx context.Context, r *shim.StateRequest) (*shim.StateResponse, error) {
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
@@ -301,7 +301,7 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.
status = task.StatusStopped
}
sio := p.Stdio()
- return &shimapi.StateResponse{
+ return &shim.StateResponse{
ID: p.ID(),
Bundle: s.bundle,
Pid: uint32(p.Pid()),
@@ -316,17 +316,17 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.
}
// Pause pauses the container.
-func (s *Service) Pause(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
+func (s *Service) Pause(ctx context.Context, r *types.Empty) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// Resume resumes the container.
-func (s *Service) Resume(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
+func (s *Service) Resume(ctx context.Context, r *types.Empty) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// Kill kills a process with the provided signal.
-func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Empty, error) {
+func (s *Service) Kill(ctx context.Context, r *shim.KillRequest) (*types.Empty, error) {
if r.ID == "" {
p, err := s.getInitProcess()
if err != nil {
@@ -349,7 +349,7 @@ func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Emp
}
// ListPids returns all pids inside the container.
-func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*shimapi.ListPidsResponse, error) {
+func (s *Service) ListPids(ctx context.Context, r *shim.ListPidsRequest) (*shim.ListPidsResponse, error) {
pids, err := s.getContainerPids(ctx, r.ID)
if err != nil {
return nil, errdefs.ToGRPC(err)
@@ -374,13 +374,13 @@ func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*sh
}
processes = append(processes, &pInfo)
}
- return &shimapi.ListPidsResponse{
+ return &shim.ListPidsResponse{
Processes: processes,
}, nil
}
// CloseIO closes the I/O context of a process.
-func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptypes.Empty, error) {
+func (s *Service) CloseIO(ctx context.Context, r *shim.CloseIORequest) (*types.Empty, error) {
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
@@ -394,31 +394,31 @@ func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptyp
}
// Checkpoint checkpoints the container.
-func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) (*ptypes.Empty, error) {
+func (s *Service) Checkpoint(ctx context.Context, r *shim.CheckpointTaskRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// ShimInfo returns shim information such as the shim's pid.
-func (s *Service) ShimInfo(ctx context.Context, r *ptypes.Empty) (*shimapi.ShimInfoResponse, error) {
- return &shimapi.ShimInfoResponse{
+func (s *Service) ShimInfo(ctx context.Context, r *types.Empty) (*shim.ShimInfoResponse, error) {
+ return &shim.ShimInfoResponse{
ShimPid: uint32(os.Getpid()),
}, nil
}
// Update updates a running container.
-func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*ptypes.Empty, error) {
+func (s *Service) Update(ctx context.Context, r *shim.UpdateTaskRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// Wait waits for a process to exit.
-func (s *Service) Wait(ctx context.Context, r *shimapi.WaitRequest) (*shimapi.WaitResponse, error) {
+func (s *Service) Wait(ctx context.Context, r *shim.WaitRequest) (*shim.WaitResponse, error) {
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
}
p.Wait()
- return &shimapi.WaitResponse{
+ return &shim.WaitResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
}, nil
@@ -430,11 +430,11 @@ func (s *Service) processExits() {
}
}
-func (s *Service) allProcesses() []rproc.Process {
+func (s *Service) allProcesses() []process.Process {
s.mu.Lock()
defer s.mu.Unlock()
- res := make([]rproc.Process, 0, len(s.processes))
+ res := make([]process.Process, 0, len(s.processes))
for _, p := range s.processes {
res = append(res, p)
}
@@ -452,7 +452,7 @@ func (s *Service) checkProcesses(e proc.Exit) {
}
}
p.SetExited(e.Status)
- s.events <- &eventstypes.TaskExit{
+ s.events <- &TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(p.Pid()),
@@ -490,7 +490,7 @@ func (s *Service) forward(publisher events.Publisher) {
}
// getInitProcess returns the init process.
-func (s *Service) getInitProcess() (rproc.Process, error) {
+func (s *Service) getInitProcess() (process.Process, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
@@ -501,7 +501,7 @@ func (s *Service) getInitProcess() (rproc.Process, error) {
}
// getExecProcess returns the given exec process.
-func (s *Service) getExecProcess(id string) (rproc.Process, error) {
+func (s *Service) getExecProcess(id string) (process.Process, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[id]
@@ -513,19 +513,19 @@ func (s *Service) getExecProcess(id string) (rproc.Process, error) {
func getTopic(ctx context.Context, e interface{}) string {
switch e.(type) {
- case *eventstypes.TaskCreate:
+ case *TaskCreate:
return runtime.TaskCreateEventTopic
- case *eventstypes.TaskStart:
+ case *TaskStart:
return runtime.TaskStartEventTopic
- case *eventstypes.TaskOOM:
+ case *TaskOOM:
return runtime.TaskOOMEventTopic
- case *eventstypes.TaskExit:
+ case *TaskExit:
return runtime.TaskExitEventTopic
- case *eventstypes.TaskDelete:
+ case *TaskDelete:
return runtime.TaskDeleteEventTopic
- case *eventstypes.TaskExecAdded:
+ case *TaskExecAdded:
return runtime.TaskExecAddedEventTopic
- case *eventstypes.TaskExecStarted:
+ case *TaskExecStarted:
return runtime.TaskExecStartedEventTopic
default:
log.L.Printf("no topic for type %#v", e)
@@ -533,7 +533,7 @@ func getTopic(ctx context.Context, e interface{}) string {
return runtime.TaskUnknownTopic
}
-func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, config map[string]string, platform rproc.Platform, r *proc.CreateConfig) (*proc.Init, error) {
+func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, config map[string]string, platform stdio.Platform, r *proc.CreateConfig) (*proc.Init, error) {
var options runctypes.CreateOptions
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
@@ -554,7 +554,7 @@ func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string,
runsc.FormatLogPath(r.ID, config)
rootfs := filepath.Join(path, "rootfs")
runtime := proc.NewRunsc(runtimeRoot, path, namespace, r.Runtime, config)
- p := proc.New(r.ID, runtime, rproc.Stdio{
+ p := proc.New(r.ID, runtime, stdio.Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
@@ -568,6 +568,6 @@ func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string,
p.IoGID = int(options.IoGid)
p.Sandbox = utils.IsSandbox(spec)
p.UserLog = utils.UserLogPath(spec)
- p.Monitor = shim.Default
+ p.Monitor = reaper.Default
return p, nil
}
diff --git a/pkg/shim/v1/utils/BUILD b/pkg/shim/v1/utils/BUILD
index 9045781e1..54a0aabb7 100644
--- a/pkg/shim/v1/utils/BUILD
+++ b/pkg/shim/v1/utils/BUILD
@@ -5,6 +5,7 @@ package(licenses = ["notice"])
go_library(
name = "utils",
srcs = [
+ "annotations.go",
"utils.go",
"volumes.go",
],
@@ -13,8 +14,7 @@ go_library(
"//shim:__subpackages__",
],
deps = [
- "@com_github_containerd_cri//pkg/annotations:go_default_library",
- "@com_github_opencontainers_runtime-spec//specs-go:go_default_library",
+ "@com_github_opencontainers_runtime_spec//specs-go:go_default_library",
],
)
@@ -23,4 +23,5 @@ go_test(
size = "small",
srcs = ["volumes_test.go"],
library = ":utils",
+ deps = ["@com_github_opencontainers_runtime_spec//specs-go:go_default_library"],
)
diff --git a/pkg/shim/v1/utils/annotations.go b/pkg/shim/v1/utils/annotations.go
new file mode 100644
index 000000000..1e9d3f365
--- /dev/null
+++ b/pkg/shim/v1/utils/annotations.go
@@ -0,0 +1,25 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package utils
+
+// Annotations from the CRI annotations package.
+//
+// These are vendor due to import conflicts.
+const (
+ sandboxLogDirAnnotation = "io.kubernetes.cri.sandbox-log-directory"
+ containerTypeAnnotation = "io.kubernetes.cri.container-type"
+ containerTypeSandbox = "sandbox"
+ containerTypeContainer = "container"
+)
diff --git a/pkg/shim/v1/utils/utils.go b/pkg/shim/v1/utils/utils.go
index 7a400af1c..07e346654 100644
--- a/pkg/shim/v1/utils/utils.go
+++ b/pkg/shim/v1/utils/utils.go
@@ -20,7 +20,6 @@ import (
"os"
"path/filepath"
- "github.com/containerd/cri/pkg/annotations"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
@@ -43,13 +42,13 @@ func ReadSpec(bundle string) (*specs.Spec, error) {
// IsSandbox checks whether a container is a sandbox container.
func IsSandbox(spec *specs.Spec) bool {
- t, ok := spec.Annotations[annotations.ContainerType]
- return !ok || t == annotations.ContainerTypeSandbox
+ t, ok := spec.Annotations[containerTypeAnnotation]
+ return !ok || t == containerTypeSandbox
}
// UserLogPath gets user log path from OCI annotation.
func UserLogPath(spec *specs.Spec) string {
- sandboxLogDir := spec.Annotations[annotations.SandboxLogDir]
+ sandboxLogDir := spec.Annotations[sandboxLogDirAnnotation]
if sandboxLogDir == "" {
return ""
}
diff --git a/pkg/shim/v1/utils/volumes.go b/pkg/shim/v1/utils/volumes.go
index e4e9bf9b1..52a428179 100644
--- a/pkg/shim/v1/utils/volumes.go
+++ b/pkg/shim/v1/utils/volumes.go
@@ -21,7 +21,6 @@ import (
"path/filepath"
"strings"
- "github.com/containerd/cri/pkg/annotations"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
@@ -44,7 +43,7 @@ func volumeFieldName(k string) string {
// podUID gets pod UID from the pod log path.
func podUID(s *specs.Spec) (string, error) {
- sandboxLogDir := s.Annotations[annotations.SandboxLogDir]
+ sandboxLogDir := s.Annotations[sandboxLogDirAnnotation]
if sandboxLogDir == "" {
return "", fmt.Errorf("no sandbox log path annotation")
}
@@ -101,7 +100,6 @@ func UpdateVolumeAnnotations(bundle string, s *specs.Spec) error {
if err != nil {
// Skip if we can't get pod UID, because this doesn't work
// for containerd 1.1.
- fmt.Errorf("Can't get pod uid: %w", err)
return nil
}
}
diff --git a/pkg/shim/v1/utils/volumes_test.go b/pkg/shim/v1/utils/volumes_test.go
index 4b2639545..3e02c6151 100644
--- a/pkg/shim/v1/utils/volumes_test.go
+++ b/pkg/shim/v1/utils/volumes_test.go
@@ -23,7 +23,6 @@ import (
"reflect"
"testing"
- "github.com/containerd/cri/pkg/annotations"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
@@ -58,8 +57,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
desc: "volume annotations for sandbox",
spec: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -67,8 +66,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
expected: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -81,8 +80,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
desc: "volume annotations for sandbox with legacy log path",
spec: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLegacyLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLegacyLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -90,8 +89,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
expected: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLegacyLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLegacyLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -118,7 +117,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
},
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeContainer,
+ containerTypeAnnotation: containerTypeContainer,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -140,7 +139,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
},
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeContainer,
+ containerTypeAnnotation: containerTypeContainer,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -160,7 +159,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
},
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeContainer,
+ containerTypeAnnotation: containerTypeContainer,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "container",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "bind",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -176,7 +175,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
},
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeContainer,
+ containerTypeAnnotation: containerTypeContainer,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "container",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "bind",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -188,7 +187,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
desc: "should not return error without pod log directory",
spec: &specs.Spec{
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -196,7 +195,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
expected: &specs.Spec{
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount." + testVolumeName + ".share": "pod",
"dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs",
"dev.gvisor.spec.mount." + testVolumeName + ".options": "ro",
@@ -207,8 +206,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
desc: "should return error if volume path does not exist",
spec: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
"dev.gvisor.spec.mount.notexist.share": "pod",
"dev.gvisor.spec.mount.notexist.type": "tmpfs",
"dev.gvisor.spec.mount.notexist.options": "ro",
@@ -220,14 +219,14 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
desc: "no volume annotations for sandbox",
spec: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
},
},
expected: &specs.Spec{
Annotations: map[string]string{
- annotations.SandboxLogDir: testLogDirPath,
- annotations.ContainerType: annotations.ContainerTypeSandbox,
+ sandboxLogDirAnnotation: testLogDirPath,
+ containerTypeAnnotation: containerTypeSandbox,
},
},
},
@@ -249,7 +248,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
},
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeContainer,
+ containerTypeAnnotation: containerTypeContainer,
},
},
expected: &specs.Spec{
@@ -268,7 +267,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) {
},
},
Annotations: map[string]string{
- annotations.ContainerType: annotations.ContainerTypeContainer,
+ containerTypeAnnotation: containerTypeContainer,
},
},
},
diff --git a/pkg/shim/v2/BUILD b/pkg/shim/v2/BUILD
index 450f62979..7e0a114a0 100644
--- a/pkg/shim/v2/BUILD
+++ b/pkg/shim/v2/BUILD
@@ -5,6 +5,7 @@ package(licenses = ["notice"])
go_library(
name = "v2",
srcs = [
+ "api.go",
"epoll.go",
"service.go",
"service_linux.go",
@@ -15,10 +16,10 @@ go_library(
"//pkg/shim/v1/proc",
"//pkg/shim/v1/utils",
"//pkg/shim/v2/options",
+ "//pkg/shim/v2/runtimeoptions",
"//runsc/specutils",
"@com_github_burntsushi_toml//:go_default_library",
"@com_github_containerd_cgroups//:go_default_library",
- "@com_github_containerd_cgroups//stats/v1:go_default_library",
"@com_github_containerd_console//:go_default_library",
"@com_github_containerd_containerd//api/events:go_default_library",
"@com_github_containerd_containerd//api/types/task:go_default_library",
@@ -27,12 +28,13 @@ go_library(
"@com_github_containerd_containerd//log:go_default_library",
"@com_github_containerd_containerd//mount:go_default_library",
"@com_github_containerd_containerd//namespaces:go_default_library",
+ "@com_github_containerd_containerd//pkg/process:go_default_library",
+ "@com_github_containerd_containerd//pkg/stdio:go_default_library",
"@com_github_containerd_containerd//runtime:go_default_library",
"@com_github_containerd_containerd//runtime/linux/runctypes:go_default_library",
- "@com_github_containerd_containerd//runtime/proc:go_default_library",
"@com_github_containerd_containerd//runtime/v2/shim:go_default_library",
"@com_github_containerd_containerd//runtime/v2/task:go_default_library",
- "@com_github_containerd_cri//pkg/api/runtimeoptions/v1:go_default_library",
+ "@com_github_containerd_containerd//sys/reaper:go_default_library",
"@com_github_containerd_fifo//:go_default_library",
"@com_github_containerd_typeurl//:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
diff --git a/pkg/shim/v2/api.go b/pkg/shim/v2/api.go
new file mode 100644
index 000000000..dbe5c59f6
--- /dev/null
+++ b/pkg/shim/v2/api.go
@@ -0,0 +1,22 @@
+// Copyright 2018 The containerd Authors.
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v2
+
+import (
+ "github.com/containerd/containerd/api/events"
+)
+
+type TaskOOM = events.TaskOOM
diff --git a/pkg/shim/v2/epoll.go b/pkg/shim/v2/epoll.go
index 45cc38c2a..41232cca8 100644
--- a/pkg/shim/v2/epoll.go
+++ b/pkg/shim/v2/epoll.go
@@ -23,7 +23,6 @@ import (
"sync"
"github.com/containerd/cgroups"
- eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/runtime"
"golang.org/x/sys/unix"
@@ -68,10 +67,11 @@ func (e *epoller) run(ctx context.Context) {
default:
n, err := unix.EpollWait(e.fd, events[:], -1)
if err != nil {
- if err == unix.EINTR {
+ if err == unix.EINTR || err == unix.EAGAIN {
continue
}
- fmt.Errorf("cgroups: epoll wait: %w", err)
+ // Should not happen.
+ panic(fmt.Errorf("cgroups: epoll wait: %w", err))
}
for i := 0; i < n; i++ {
e.process(ctx, uintptr(events[i].Fd))
@@ -114,10 +114,11 @@ func (e *epoller) process(ctx context.Context, fd uintptr) {
unix.Close(int(fd))
return
}
- if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventstypes.TaskOOM{
+ if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &TaskOOM{
ContainerID: i.id,
}); err != nil {
- fmt.Errorf("publish OOM event: %w", err)
+ // Should not happen.
+ panic(fmt.Errorf("publish OOM event: %w", err))
}
}
diff --git a/pkg/shim/v2/runtimeoptions/BUILD b/pkg/shim/v2/runtimeoptions/BUILD
new file mode 100644
index 000000000..01716034c
--- /dev/null
+++ b/pkg/shim/v2/runtimeoptions/BUILD
@@ -0,0 +1,20 @@
+load("//tools:defs.bzl", "go_library", "proto_library")
+
+package(licenses = ["notice"])
+
+proto_library(
+ name = "api",
+ srcs = [
+ "runtimeoptions.proto",
+ ],
+)
+
+go_library(
+ name = "runtimeoptions",
+ srcs = ["runtimeoptions.go"],
+ visibility = ["//pkg/shim/v2:__pkg__"],
+ deps = [
+ "//pkg/shim/v2/runtimeoptions:api_go_proto",
+ "@com_github_gogo_protobuf//proto:go_default_library",
+ ],
+)
diff --git a/pkg/shim/v2/runtimeoptions/runtimeoptions.go b/pkg/shim/v2/runtimeoptions/runtimeoptions.go
new file mode 100644
index 000000000..1c1a0c5d1
--- /dev/null
+++ b/pkg/shim/v2/runtimeoptions/runtimeoptions.go
@@ -0,0 +1,27 @@
+// Copyright 2018 The containerd Authors.
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package runtimeoptions
+
+import (
+ proto "github.com/gogo/protobuf/proto"
+ pb "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions/api_go_proto"
+)
+
+type Options = pb.Options
+
+func init() {
+ proto.RegisterType((*Options)(nil), "cri.runtimeoptions.v1.Options")
+}
diff --git a/pkg/shim/v2/runtimeoptions/runtimeoptions.proto b/pkg/shim/v2/runtimeoptions/runtimeoptions.proto
new file mode 100644
index 000000000..edb19020a
--- /dev/null
+++ b/pkg/shim/v2/runtimeoptions/runtimeoptions.proto
@@ -0,0 +1,25 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package runtimeoptions;
+
+// This is a version of the runtimeoptions CRI API that is vendored.
+//
+// Imported the full CRI package is a nightmare.
+message Options {
+ string type_url = 1;
+ string config_path = 2;
+}
diff --git a/pkg/shim/v2/service.go b/pkg/shim/v2/service.go
index c67b1beba..1534152fc 100644
--- a/pkg/shim/v2/service.go
+++ b/pkg/shim/v2/service.go
@@ -27,34 +27,34 @@ import (
"github.com/BurntSushi/toml"
"github.com/containerd/cgroups"
- metrics "github.com/containerd/cgroups/stats/v1"
"github.com/containerd/console"
- eventstypes "github.com/containerd/containerd/api/events"
+ "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
- "github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
+ "github.com/containerd/containerd/pkg/process"
+ "github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes"
- rproc "github.com/containerd/containerd/runtime/proc"
"github.com/containerd/containerd/runtime/v2/shim"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
- runtimeoptions "github.com/containerd/cri/pkg/api/runtimeoptions/v1"
+ "github.com/containerd/containerd/sys/reaper"
"github.com/containerd/typeurl"
- ptypes "github.com/gogo/protobuf/types"
+ "github.com/gogo/protobuf/types"
"golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/shim/runsc"
"gvisor.dev/gvisor/pkg/shim/v1/proc"
"gvisor.dev/gvisor/pkg/shim/v1/utils"
"gvisor.dev/gvisor/pkg/shim/v2/options"
+ "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions"
"gvisor.dev/gvisor/runsc/specutils"
)
var (
- empty = &ptypes.Empty{}
+ empty = &types.Empty{}
bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
@@ -70,24 +70,23 @@ var _ = (taskAPI.TaskService)(&service{})
const configFile = "config.toml"
// New returns a new shim service that can be used via GRPC.
-func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
+func New(ctx context.Context, id string, publisher shim.Publisher, cancel func()) (shim.Shim, error) {
ep, err := newOOMEpoller(publisher)
if err != nil {
return nil, err
}
- ctx, cancel := context.WithCancel(ctx)
go ep.run(ctx)
s := &service{
id: id,
context: ctx,
- processes: make(map[string]rproc.Process),
+ processes: make(map[string]process.Process),
events: make(chan interface{}, 128),
ec: proc.ExitCh,
oomPoller: ep,
cancel: cancel,
}
go s.processExits()
- runsc.Monitor = shim.Default
+ runsc.Monitor = reaper.Default
if err := s.initPlatform(); err != nil {
cancel()
return nil, fmt.Errorf("failed to initialized platform behavior: %w", err)
@@ -101,10 +100,10 @@ type service struct {
mu sync.Mutex
context context.Context
- task rproc.Process
- processes map[string]rproc.Process
+ task process.Process
+ processes map[string]process.Process
events chan interface{}
- platform rproc.Platform
+ platform stdio.Platform
opts options.Options
ec chan proc.Exit
oomPoller *epoller
@@ -141,7 +140,7 @@ func newCommand(ctx context.Context, containerdBinary, containerdAddress string)
return cmd, nil
}
-func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
+func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) {
cmd, err := newCommand(ctx, containerdBinary, containerdAddress)
if err != nil {
return "", err
@@ -270,7 +269,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
break
}
if o.TypeUrl != options.OptionType {
- return nil, fmt.Errorf("unsupported runtimeoptions %q", o.TypeUrl)
+ return nil, fmt.Errorf("unsupported option type %q", o.TypeUrl)
}
path = o.ConfigPath
default:
@@ -415,7 +414,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
}
// Exec spawns an additional process inside the container.
-func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
+func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*types.Empty, error) {
s.mu.Lock()
p := s.processes[r.ExecID]
s.mu.Unlock()
@@ -444,7 +443,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
}
// ResizePty resizes the terminal of a process.
-func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
+func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*types.Empty, error) {
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -494,17 +493,17 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
}
// Pause the container.
-func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
+func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// Resume the container.
-func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
+func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
// Kill a process with the provided signal.
-func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
+func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*types.Empty, error) {
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -550,7 +549,7 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
}
// CloseIO closes the I/O context of a process.
-func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
+func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*types.Empty, error) {
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -564,7 +563,7 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
}
// Checkpoint checkpoints the container.
-func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
+func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
@@ -580,7 +579,7 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
}, nil
}
-func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
+func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*types.Empty, error) {
s.cancel()
os.Exit(0)
return empty, nil
@@ -608,52 +607,52 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
// gvisor currently (as of 2020-03-03) only returns the total memory
// usage and current PID value[0]. However, we copy the common fields here
// so that future updates will propagate correct information. We're
- // using the metrics.Metrics structure so we're returning the same type
+ // using the cgroups.Metrics structure so we're returning the same type
// as runc.
//
// [0]: https://github.com/google/gvisor/blob/277a0d5a1fbe8272d4729c01ee4c6e374d047ebc/runsc/boot/events.go#L61-L81
- data, err := typeurl.MarshalAny(&metrics.Metrics{
- CPU: &metrics.CPUStat{
- Usage: &metrics.CPUUsage{
+ data, err := typeurl.MarshalAny(&cgroups.Metrics{
+ CPU: &cgroups.CPUStat{
+ Usage: &cgroups.CPUUsage{
Total: stats.Cpu.Usage.Total,
Kernel: stats.Cpu.Usage.Kernel,
User: stats.Cpu.Usage.User,
PerCPU: stats.Cpu.Usage.Percpu,
},
- Throttling: &metrics.Throttle{
+ Throttling: &cgroups.Throttle{
Periods: stats.Cpu.Throttling.Periods,
ThrottledPeriods: stats.Cpu.Throttling.ThrottledPeriods,
ThrottledTime: stats.Cpu.Throttling.ThrottledTime,
},
},
- Memory: &metrics.MemoryStat{
+ Memory: &cgroups.MemoryStat{
Cache: stats.Memory.Cache,
- Usage: &metrics.MemoryEntry{
+ Usage: &cgroups.MemoryEntry{
Limit: stats.Memory.Usage.Limit,
Usage: stats.Memory.Usage.Usage,
Max: stats.Memory.Usage.Max,
Failcnt: stats.Memory.Usage.Failcnt,
},
- Swap: &metrics.MemoryEntry{
+ Swap: &cgroups.MemoryEntry{
Limit: stats.Memory.Swap.Limit,
Usage: stats.Memory.Swap.Usage,
Max: stats.Memory.Swap.Max,
Failcnt: stats.Memory.Swap.Failcnt,
},
- Kernel: &metrics.MemoryEntry{
+ Kernel: &cgroups.MemoryEntry{
Limit: stats.Memory.Kernel.Limit,
Usage: stats.Memory.Kernel.Usage,
Max: stats.Memory.Kernel.Max,
Failcnt: stats.Memory.Kernel.Failcnt,
},
- KernelTCP: &metrics.MemoryEntry{
+ KernelTCP: &cgroups.MemoryEntry{
Limit: stats.Memory.KernelTCP.Limit,
Usage: stats.Memory.KernelTCP.Usage,
Max: stats.Memory.KernelTCP.Max,
Failcnt: stats.Memory.KernelTCP.Failcnt,
},
},
- Pids: &metrics.PidsStat{
+ Pids: &cgroups.PidsStat{
Current: stats.Pids.Current,
Limit: stats.Pids.Limit,
},
@@ -667,7 +666,7 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
}
// Update updates a running container.
-func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
+func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*types.Empty, error) {
return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented)
}
@@ -707,7 +706,7 @@ func (s *service) checkProcesses(e proc.Exit) {
}
}
p.SetExited(e.Status)
- s.events <- &eventstypes.TaskExit{
+ s.events <- &events.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(p.Pid()),
@@ -719,7 +718,7 @@ func (s *service) checkProcesses(e proc.Exit) {
}
}
-func (s *service) allProcesses() (o []rproc.Process) {
+func (s *service) allProcesses() (o []process.Process) {
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.processes {
@@ -749,18 +748,19 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
return pids, nil
}
-func (s *service) forward(publisher events.Publisher) {
+func (s *service) forward(publisher shim.Publisher) {
for e := range s.events {
ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
err := publisher.Publish(ctx, getTopic(e), e)
cancel()
if err != nil {
- fmt.Errorf("post event: %w", err)
+ // Should not happen.
+ panic(fmt.Errorf("post event: %w", err))
}
}
}
-func (s *service) getProcess(execID string) (rproc.Process, error) {
+func (s *service) getProcess(execID string) (process.Process, error) {
s.mu.Lock()
defer s.mu.Unlock()
if execID == "" {
@@ -775,19 +775,19 @@ func (s *service) getProcess(execID string) (rproc.Process, error) {
func getTopic(e interface{}) string {
switch e.(type) {
- case *eventstypes.TaskCreate:
+ case *events.TaskCreate:
return runtime.TaskCreateEventTopic
- case *eventstypes.TaskStart:
+ case *events.TaskStart:
return runtime.TaskStartEventTopic
- case *eventstypes.TaskOOM:
+ case *events.TaskOOM:
return runtime.TaskOOMEventTopic
- case *eventstypes.TaskExit:
+ case *events.TaskExit:
return runtime.TaskExitEventTopic
- case *eventstypes.TaskDelete:
+ case *events.TaskDelete:
return runtime.TaskDeleteEventTopic
- case *eventstypes.TaskExecAdded:
+ case *events.TaskExecAdded:
return runtime.TaskExecAddedEventTopic
- case *eventstypes.TaskExecStarted:
+ case *events.TaskExecStarted:
return runtime.TaskExecStartedEventTopic
default:
log.L.Printf("no topic for type %#v", e)
@@ -795,7 +795,7 @@ func getTopic(e interface{}) string {
return runtime.TaskUnknownTopic
}
-func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) {
+func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) {
spec, err := utils.ReadSpec(r.Bundle)
if err != nil {
return nil, fmt.Errorf("read oci spec: %w", err)
@@ -805,7 +805,7 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro
}
runsc.FormatLogPath(r.ID, options.RunscConfig)
runtime := proc.NewRunsc(options.Root, path, namespace, options.BinaryName, options.RunscConfig)
- p := proc.New(r.ID, runtime, rproc.Stdio{
+ p := proc.New(r.ID, runtime, stdio.Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
@@ -819,6 +819,6 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro
p.IoGID = int(options.IoGid)
p.Sandbox = specutils.SpecContainerType(spec) == specutils.ContainerTypeSandbox
p.UserLog = utils.UserLogPath(spec)
- p.Monitor = shim.Default
+ p.Monitor = reaper.Default
return p, nil
}
diff --git a/pkg/shim/v2/service_linux.go b/pkg/shim/v2/service_linux.go
index 257c58812..1800ab90b 100644
--- a/pkg/shim/v2/service_linux.go
+++ b/pkg/shim/v2/service_linux.go
@@ -32,7 +32,7 @@ type linuxPlatform struct {
epoller *console.Epoller
}
-func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
+func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
if p.epoller == nil {
return nil, fmt.Errorf("uninitialized epoller")
}
@@ -47,9 +47,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
if err != nil {
return nil, err
}
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(epollConsole, in, *p)
@@ -65,9 +63,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
return nil, err
}
wg.Add(1)
- cwg.Add(1)
go func() {
- cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(outw, epollConsole, *p)
diff --git a/pkg/sleep/BUILD b/pkg/sleep/BUILD
index e131455f7..ae0fe1522 100644
--- a/pkg/sleep/BUILD
+++ b/pkg/sleep/BUILD
@@ -12,6 +12,7 @@ go_library(
"sleep_unsafe.go",
],
visibility = ["//:sandbox"],
+ deps = ["//pkg/sync"],
)
go_test(
diff --git a/pkg/sleep/sleep_test.go b/pkg/sleep/sleep_test.go
index af47e2ba1..1dd11707d 100644
--- a/pkg/sleep/sleep_test.go
+++ b/pkg/sleep/sleep_test.go
@@ -379,10 +379,7 @@ func TestRace(t *testing.T) {
// TestRaceInOrder tests that multiple wakers can continuously send wake requests to
// the sleeper and that the wakers are retrieved in the order asserted.
func TestRaceInOrder(t *testing.T) {
- const wakers = 100
- const wakeRequests = 10000
-
- w := make([]Waker, wakers)
+ w := make([]Waker, 10000)
s := Sleeper{}
// Associate each waker and start goroutines that will assert them.
@@ -390,19 +387,16 @@ func TestRaceInOrder(t *testing.T) {
s.AddWaker(&w[i], i)
}
go func() {
- n := 0
- for n < wakeRequests {
- wk := w[n%len(w)]
- wk.Assert()
- n++
+ for i := range w {
+ w[i].Assert()
}
}()
// Wait for all wake up notifications from all wakers.
- for i := 0; i < wakeRequests; i++ {
- v, _ := s.Fetch(true)
- if got, want := v, i%wakers; got != want {
- t.Fatalf("got %d want %d", got, want)
+ for want := range w {
+ got, _ := s.Fetch(true)
+ if got != want {
+ t.Fatalf("got %d want %d", got, want)
}
}
}
diff --git a/pkg/sleep/sleep_unsafe.go b/pkg/sleep/sleep_unsafe.go
index f68c12620..118805492 100644
--- a/pkg/sleep/sleep_unsafe.go
+++ b/pkg/sleep/sleep_unsafe.go
@@ -75,6 +75,8 @@ package sleep
import (
"sync/atomic"
"unsafe"
+
+ "gvisor.dev/gvisor/pkg/sync"
)
const (
@@ -323,7 +325,12 @@ func (s *Sleeper) enqueueAssertedWaker(w *Waker) {
//
// This struct is thread-safe, that is, its methods can be called concurrently
// by multiple goroutines.
+//
+// Note, it is not safe to copy a Waker as its fields are modified by value
+// (the pointer fields are individually modified with atomic operations).
type Waker struct {
+ _ sync.NoCopy
+
// s is the sleeper that this waker can wake up. Only one sleeper at a
// time is allowed. This field can have three classes of values:
// nil -- the waker is not asserted: it either is not associated with
diff --git a/pkg/sync/BUILD b/pkg/sync/BUILD
index d0d77e19c..4d47207f7 100644
--- a/pkg/sync/BUILD
+++ b/pkg/sync/BUILD
@@ -33,6 +33,7 @@ go_library(
"aliases.go",
"memmove_unsafe.go",
"mutex_unsafe.go",
+ "nocopy.go",
"norace_unsafe.go",
"race_unsafe.go",
"rwmutex_unsafe.go",
diff --git a/pkg/sync/nocopy.go b/pkg/sync/nocopy.go
new file mode 100644
index 000000000..722b29501
--- /dev/null
+++ b/pkg/sync/nocopy.go
@@ -0,0 +1,28 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sync
+
+// NoCopy may be embedded into structs which must not be copied
+// after the first use.
+//
+// See https://golang.org/issues/8005#issuecomment-190753527
+// for details.
+type NoCopy struct{}
+
+// Lock is a no-op used by -copylocks checker from `go vet`.
+func (*NoCopy) Lock() {}
+
+// Unlock is a no-op used by -copylocks checker from `go vet`.
+func (*NoCopy) Unlock() {}
diff --git a/pkg/tcpip/header/BUILD b/pkg/tcpip/header/BUILD
index 0cde694dc..d87797617 100644
--- a/pkg/tcpip/header/BUILD
+++ b/pkg/tcpip/header/BUILD
@@ -48,7 +48,7 @@ go_test(
"//pkg/rand",
"//pkg/tcpip",
"//pkg/tcpip/buffer",
- "@com_github_google_go-cmp//cmp:go_default_library",
+ "@com_github_google_go_cmp//cmp:go_default_library",
],
)
@@ -64,6 +64,6 @@ go_test(
deps = [
"//pkg/tcpip",
"//pkg/tcpip/buffer",
- "@com_github_google_go-cmp//cmp:go_default_library",
+ "@com_github_google_go_cmp//cmp:go_default_library",
],
)
diff --git a/pkg/tcpip/link/fdbased/endpoint_test.go b/pkg/tcpip/link/fdbased/endpoint_test.go
index eaee7e5d7..4bad930c7 100644
--- a/pkg/tcpip/link/fdbased/endpoint_test.go
+++ b/pkg/tcpip/link/fdbased/endpoint_test.go
@@ -500,3 +500,76 @@ func TestRecvMMsgDispatcherCapLength(t *testing.T) {
}
}
+
+// fakeNetworkDispatcher delivers packets to pkts.
+type fakeNetworkDispatcher struct {
+ pkts []*stack.PacketBuffer
+}
+
+func (d *fakeNetworkDispatcher) DeliverNetworkPacket(remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
+ d.pkts = append(d.pkts, pkt)
+}
+
+func TestDispatchPacketFormat(t *testing.T) {
+ for _, test := range []struct {
+ name string
+ newDispatcher func(fd int, e *endpoint) (linkDispatcher, error)
+ }{
+ {
+ name: "readVDispatcher",
+ newDispatcher: newReadVDispatcher,
+ },
+ {
+ name: "recvMMsgDispatcher",
+ newDispatcher: newRecvMMsgDispatcher,
+ },
+ } {
+ t.Run(test.name, func(t *testing.T) {
+ // Create a socket pair to send/recv.
+ fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_DGRAM, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer syscall.Close(fds[0])
+ defer syscall.Close(fds[1])
+
+ data := []byte{
+ // Ethernet header.
+ 1, 2, 3, 4, 5, 60,
+ 1, 2, 3, 4, 5, 61,
+ 8, 0,
+ // Mock network header.
+ 40, 41, 42, 43,
+ }
+ err = syscall.Sendmsg(fds[1], data, nil, nil, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Create and run dispatcher once.
+ sink := &fakeNetworkDispatcher{}
+ d, err := test.newDispatcher(fds[0], &endpoint{
+ hdrSize: header.EthernetMinimumSize,
+ dispatcher: sink,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ if ok, err := d.dispatch(); !ok || err != nil {
+ t.Fatalf("d.dispatch() = %v, %v", ok, err)
+ }
+
+ // Verify packet.
+ if got, want := len(sink.pkts), 1; got != want {
+ t.Fatalf("len(sink.pkts) = %d, want %d", got, want)
+ }
+ pkt := sink.pkts[0]
+ if got, want := len(pkt.LinkHeader), header.EthernetMinimumSize; got != want {
+ t.Errorf("len(pkt.LinkHeader) = %d, want %d", got, want)
+ }
+ if got, want := pkt.Data.Size(), 4; got != want {
+ t.Errorf("pkt.Data.Size() = %d, want %d", got, want)
+ }
+ })
+ }
+}
diff --git a/pkg/tcpip/link/fdbased/packet_dispatchers.go b/pkg/tcpip/link/fdbased/packet_dispatchers.go
index f04738cfb..d8f2504b3 100644
--- a/pkg/tcpip/link/fdbased/packet_dispatchers.go
+++ b/pkg/tcpip/link/fdbased/packet_dispatchers.go
@@ -278,7 +278,7 @@ func (d *recvMMsgDispatcher) dispatch() (bool, *tcpip.Error) {
eth header.Ethernet
)
if d.e.hdrSize > 0 {
- eth = header.Ethernet(d.views[k][0])
+ eth = header.Ethernet(d.views[k][0][:header.EthernetMinimumSize])
p = eth.Type()
remote = eth.SourceAddress()
local = eth.DestinationAddress()
diff --git a/pkg/tcpip/network/ipv4/BUILD b/pkg/tcpip/network/ipv4/BUILD
index 78420d6e6..d142b4ffa 100644
--- a/pkg/tcpip/network/ipv4/BUILD
+++ b/pkg/tcpip/network/ipv4/BUILD
@@ -34,6 +34,6 @@ go_test(
"//pkg/tcpip/transport/tcp",
"//pkg/tcpip/transport/udp",
"//pkg/waiter",
- "@com_github_google_go-cmp//cmp:go_default_library",
+ "@com_github_google_go_cmp//cmp:go_default_library",
],
)
diff --git a/pkg/tcpip/network/ipv6/BUILD b/pkg/tcpip/network/ipv6/BUILD
index 3f71fc520..feada63dc 100644
--- a/pkg/tcpip/network/ipv6/BUILD
+++ b/pkg/tcpip/network/ipv6/BUILD
@@ -39,6 +39,6 @@ go_test(
"//pkg/tcpip/transport/icmp",
"//pkg/tcpip/transport/udp",
"//pkg/waiter",
- "@com_github_google_go-cmp//cmp:go_default_library",
+ "@com_github_google_go_cmp//cmp:go_default_library",
],
)
diff --git a/pkg/tcpip/stack/BUILD b/pkg/tcpip/stack/BUILD
index e65c731c2..6b9a6b316 100644
--- a/pkg/tcpip/stack/BUILD
+++ b/pkg/tcpip/stack/BUILD
@@ -27,6 +27,18 @@ go_template_instance(
},
)
+go_template_instance(
+ name = "tuple_list",
+ out = "tuple_list.go",
+ package = "stack",
+ prefix = "tuple",
+ template = "//pkg/ilist:generic_list",
+ types = {
+ "Element": "*tuple",
+ "Linker": "*tuple",
+ },
+)
+
go_library(
name = "stack",
srcs = [
@@ -35,6 +47,7 @@ go_library(
"forwarder.go",
"icmp_rate_limit.go",
"iptables.go",
+ "iptables_state.go",
"iptables_targets.go",
"iptables_types.go",
"linkaddrcache.go",
@@ -50,6 +63,7 @@ go_library(
"stack_global_state.go",
"stack_options.go",
"transport_demuxer.go",
+ "tuple_list.go",
],
visibility = ["//visibility:public"],
deps = [
@@ -95,7 +109,7 @@ go_test(
"//pkg/tcpip/transport/icmp",
"//pkg/tcpip/transport/udp",
"//pkg/waiter",
- "@com_github_google_go-cmp//cmp:go_default_library",
+ "@com_github_google_go_cmp//cmp:go_default_library",
],
)
diff --git a/pkg/tcpip/stack/conntrack.go b/pkg/tcpip/stack/conntrack.go
index af9c325ca..d39baf620 100644
--- a/pkg/tcpip/stack/conntrack.go
+++ b/pkg/tcpip/stack/conntrack.go
@@ -15,9 +15,12 @@
package stack
import (
+ "encoding/binary"
"sync"
+ "time"
"gvisor.dev/gvisor/pkg/tcpip"
+ "gvisor.dev/gvisor/pkg/tcpip/hash/jenkins"
"gvisor.dev/gvisor/pkg/tcpip/header"
"gvisor.dev/gvisor/pkg/tcpip/transport/tcpconntrack"
)
@@ -30,6 +33,10 @@ import (
//
// Currently, only TCP tracking is supported.
+// Our hash table has 16K buckets.
+// TODO(gvisor.dev/issue/170): These should be tunable.
+const numBuckets = 1 << 14
+
// Direction of the tuple.
type direction int
@@ -48,7 +55,12 @@ const (
// tuple holds a connection's identifying and manipulating data in one
// direction. It is immutable.
+//
+// +stateify savable
type tuple struct {
+ // tupleEntry is used to build an intrusive list of tuples.
+ tupleEntry
+
tupleID
// conn is the connection tracking entry this tuple belongs to.
@@ -61,6 +73,8 @@ type tuple struct {
// tupleID uniquely identifies a connection in one direction. It currently
// contains enough information to distinguish between any TCP or UDP
// connection, and will need to be extended to support other protocols.
+//
+// +stateify savable
type tupleID struct {
srcAddr tcpip.Address
srcPort uint16
@@ -83,6 +97,8 @@ func (ti tupleID) reply() tupleID {
}
// conn is a tracked connection.
+//
+// +stateify savable
type conn struct {
// original is the tuple in original direction. It is immutable.
original tuple
@@ -98,22 +114,67 @@ type conn struct {
tcbHook Hook
// mu protects tcb.
- mu sync.Mutex
+ mu sync.Mutex `state:"nosave"`
// tcb is TCB control block. It is used to keep track of states
// of tcp connection and is protected by mu.
tcb tcpconntrack.TCB
+
+ // lastUsed is the last time the connection saw a relevant packet, and
+ // is updated by each packet on the connection. It is protected by mu.
+ lastUsed time.Time `state:".(unixTime)"`
+}
+
+// timedOut returns whether the connection timed out based on its state.
+func (cn *conn) timedOut(now time.Time) bool {
+ const establishedTimeout = 5 * 24 * time.Hour
+ const defaultTimeout = 120 * time.Second
+ cn.mu.Lock()
+ defer cn.mu.Unlock()
+ if cn.tcb.State() == tcpconntrack.ResultAlive {
+ // Use the same default as Linux, which doesn't delete
+ // established connections for 5(!) days.
+ return now.Sub(cn.lastUsed) > establishedTimeout
+ }
+ // Use the same default as Linux, which lets connections in most states
+ // other than established remain for <= 120 seconds.
+ return now.Sub(cn.lastUsed) > defaultTimeout
}
// ConnTrack tracks all connections created for NAT rules. Most users are
// expected to only call handlePacket and createConnFor.
+//
+// ConnTrack keeps all connections in a slice of buckets, each of which holds a
+// linked list of tuples. This gives us some desirable properties:
+// - Each bucket has its own lock, lessening lock contention.
+// - The slice is large enough that lists stay short (<10 elements on average).
+// Thus traversal is fast.
+// - During linked list traversal we reap expired connections. This amortizes
+// the cost of reaping them and makes reapUnused faster.
+//
+// Locks are ordered by their location in the buckets slice. That is, a
+// goroutine that locks buckets[i] can only lock buckets[j] s.t. i < j.
+//
+// +stateify savable
type ConnTrack struct {
- // mu protects conns.
- mu sync.RWMutex
+ // seed is a one-time random value initialized at stack startup
+ // and is used in the calculation of hash keys for the list of buckets.
+ // It is immutable.
+ seed uint32
- // conns maintains a map of tuples needed for connection tracking for
- // iptables NAT rules. It is protected by mu.
- conns map[tupleID]tuple
+ // mu protects the buckets slice, but not buckets' contents. Only take
+ // the write lock if you are modifying the slice or saving for S/R.
+ mu sync.RWMutex `state:"nosave"`
+
+ // buckets is protected by mu.
+ buckets []bucket
+}
+
+// +stateify savable
+type bucket struct {
+ // mu protects tuples.
+ mu sync.Mutex `state:"nosave"`
+ tuples tupleList
}
// packetToTupleID converts packet to a tuple ID. It fails when pkt lacks a valid
@@ -143,8 +204,9 @@ func packetToTupleID(pkt *PacketBuffer) (tupleID, *tcpip.Error) {
// newConn creates new connection.
func newConn(orig, reply tupleID, manip manipType, hook Hook) *conn {
conn := conn{
- manip: manip,
- tcbHook: hook,
+ manip: manip,
+ tcbHook: hook,
+ lastUsed: time.Now(),
}
conn.original = tuple{conn: &conn, tupleID: orig}
conn.reply = tuple{conn: &conn, tupleID: reply, direction: dirReply}
@@ -162,14 +224,28 @@ func (ct *ConnTrack) connFor(pkt *PacketBuffer) (*conn, direction) {
return nil, dirOriginal
}
- ct.mu.Lock()
- defer ct.mu.Unlock()
-
- tuple, ok := ct.conns[tid]
- if !ok {
- return nil, dirOriginal
+ bucket := ct.bucket(tid)
+ now := time.Now()
+
+ ct.mu.RLock()
+ defer ct.mu.RUnlock()
+ ct.buckets[bucket].mu.Lock()
+ defer ct.buckets[bucket].mu.Unlock()
+
+ // Iterate over the tuples in a bucket, cleaning up any unused
+ // connections we find.
+ for other := ct.buckets[bucket].tuples.Front(); other != nil; other = other.Next() {
+ // Clean up any timed-out connections we happen to find.
+ if ct.reapTupleLocked(other, bucket, now) {
+ // The tuple expired.
+ continue
+ }
+ if tid == other.tupleID {
+ return other.conn, other.direction
+ }
}
- return tuple.conn, tuple.direction
+
+ return nil, dirOriginal
}
// createConnFor creates a new conn for pkt.
@@ -197,13 +273,31 @@ func (ct *ConnTrack) createConnFor(pkt *PacketBuffer, hook Hook, rt RedirectTarg
}
conn := newConn(tid, replyTID, manip, hook)
- // Add the changed tuple to the map.
- // TODO(gvisor.dev/issue/170): Need to support collisions using linked
- // list.
- ct.mu.Lock()
- defer ct.mu.Unlock()
- ct.conns[tid] = conn.original
- ct.conns[replyTID] = conn.reply
+ // Lock the buckets in the correct order.
+ tupleBucket := ct.bucket(tid)
+ replyBucket := ct.bucket(replyTID)
+ ct.mu.RLock()
+ defer ct.mu.RUnlock()
+ if tupleBucket < replyBucket {
+ ct.buckets[tupleBucket].mu.Lock()
+ ct.buckets[replyBucket].mu.Lock()
+ } else if tupleBucket > replyBucket {
+ ct.buckets[replyBucket].mu.Lock()
+ ct.buckets[tupleBucket].mu.Lock()
+ } else {
+ // Both tuples are in the same bucket.
+ ct.buckets[tupleBucket].mu.Lock()
+ }
+
+ // Add the tuple to the map.
+ ct.buckets[tupleBucket].tuples.PushFront(&conn.original)
+ ct.buckets[replyBucket].tuples.PushFront(&conn.reply)
+
+ // Unlocking can happen in any order.
+ ct.buckets[tupleBucket].mu.Unlock()
+ if tupleBucket != replyBucket {
+ ct.buckets[replyBucket].mu.Unlock()
+ }
return conn
}
@@ -297,35 +391,134 @@ func (ct *ConnTrack) handlePacket(pkt *PacketBuffer, hook Hook, gso *GSO, r *Rou
// other tcp states.
conn.mu.Lock()
defer conn.mu.Unlock()
- var st tcpconntrack.Result
- tcpHeader := header.TCP(pkt.TransportHeader)
- if conn.tcb.IsEmpty() {
+
+ // Mark the connection as having been used recently so it isn't reaped.
+ conn.lastUsed = time.Now()
+ // Update connection state.
+ if tcpHeader := header.TCP(pkt.TransportHeader); conn.tcb.IsEmpty() {
conn.tcb.Init(tcpHeader)
conn.tcbHook = hook
+ } else if hook == conn.tcbHook {
+ conn.tcb.UpdateStateOutbound(tcpHeader)
} else {
- switch hook {
- case conn.tcbHook:
- st = conn.tcb.UpdateStateOutbound(tcpHeader)
- default:
- st = conn.tcb.UpdateStateInbound(tcpHeader)
- }
+ conn.tcb.UpdateStateInbound(tcpHeader)
}
+}
+
+// bucket gets the conntrack bucket for a tupleID.
+func (ct *ConnTrack) bucket(id tupleID) int {
+ h := jenkins.Sum32(ct.seed)
+ h.Write([]byte(id.srcAddr))
+ h.Write([]byte(id.dstAddr))
+ shortBuf := make([]byte, 2)
+ binary.LittleEndian.PutUint16(shortBuf, id.srcPort)
+ h.Write([]byte(shortBuf))
+ binary.LittleEndian.PutUint16(shortBuf, id.dstPort)
+ h.Write([]byte(shortBuf))
+ binary.LittleEndian.PutUint16(shortBuf, uint16(id.transProto))
+ h.Write([]byte(shortBuf))
+ binary.LittleEndian.PutUint16(shortBuf, uint16(id.netProto))
+ h.Write([]byte(shortBuf))
+ ct.mu.RLock()
+ defer ct.mu.RUnlock()
+ return int(h.Sum32()) % len(ct.buckets)
+}
- // Delete conn if tcp connection is closed.
- if st == tcpconntrack.ResultClosedByPeer || st == tcpconntrack.ResultClosedBySelf || st == tcpconntrack.ResultReset {
- ct.deleteConn(conn)
+// reapUnused deletes timed out entries from the conntrack map. The rules for
+// reaping are:
+// - Most reaping occurs in connFor, which is called on each packet. connFor
+// cleans up the bucket the packet's connection maps to. Thus calls to
+// reapUnused should be fast.
+// - Each call to reapUnused traverses a fraction of the conntrack table.
+// Specifically, it traverses len(ct.buckets)/fractionPerReaping.
+// - After reaping, reapUnused decides when it should next run based on the
+// ratio of expired connections to examined connections. If the ratio is
+// greater than maxExpiredPct, it schedules the next run quickly. Otherwise it
+// slightly increases the interval between runs.
+// - maxFullTraversal caps the time it takes to traverse the entire table.
+//
+// reapUnused returns the next bucket that should be checked and the time after
+// which it should be called again.
+func (ct *ConnTrack) reapUnused(start int, prevInterval time.Duration) (int, time.Duration) {
+ // TODO(gvisor.dev/issue/170): This can be more finely controlled, as
+ // it is in Linux via sysctl.
+ const fractionPerReaping = 128
+ const maxExpiredPct = 50
+ const maxFullTraversal = 60 * time.Second
+ const minInterval = 10 * time.Millisecond
+ const maxInterval = maxFullTraversal / fractionPerReaping
+
+ now := time.Now()
+ checked := 0
+ expired := 0
+ var idx int
+ ct.mu.RLock()
+ defer ct.mu.RUnlock()
+ for i := 0; i < len(ct.buckets)/fractionPerReaping; i++ {
+ idx = (i + start) % len(ct.buckets)
+ ct.buckets[idx].mu.Lock()
+ for tuple := ct.buckets[idx].tuples.Front(); tuple != nil; tuple = tuple.Next() {
+ checked++
+ if ct.reapTupleLocked(tuple, idx, now) {
+ expired++
+ }
+ }
+ ct.buckets[idx].mu.Unlock()
+ }
+ // We already checked buckets[idx].
+ idx++
+
+ // If half or more of the connections are expired, the table has gotten
+ // stale. Reschedule quickly.
+ expiredPct := 0
+ if checked != 0 {
+ expiredPct = expired * 100 / checked
+ }
+ if expiredPct > maxExpiredPct {
+ return idx, minInterval
+ }
+ if interval := prevInterval + minInterval; interval <= maxInterval {
+ // Increment the interval between runs.
+ return idx, interval
}
+ // We've hit the maximum interval.
+ return idx, maxInterval
}
-// deleteConn deletes the connection.
-func (ct *ConnTrack) deleteConn(conn *conn) {
- if conn == nil {
- return
+// reapTupleLocked tries to remove tuple and its reply from the table. It
+// returns whether the tuple's connection has timed out.
+//
+// Preconditions: ct.mu is locked for reading and bucket is locked.
+func (ct *ConnTrack) reapTupleLocked(tuple *tuple, bucket int, now time.Time) bool {
+ if !tuple.conn.timedOut(now) {
+ return false
}
- ct.mu.Lock()
- defer ct.mu.Unlock()
+ // To maintain lock order, we can only reap these tuples if the reply
+ // appears later in the table.
+ replyBucket := ct.bucket(tuple.reply())
+ if bucket > replyBucket {
+ return true
+ }
+
+ // Don't re-lock if both tuples are in the same bucket.
+ differentBuckets := bucket != replyBucket
+ if differentBuckets {
+ ct.buckets[replyBucket].mu.Lock()
+ }
+
+ // We have the buckets locked and can remove both tuples.
+ if tuple.direction == dirOriginal {
+ ct.buckets[replyBucket].tuples.Remove(&tuple.conn.reply)
+ } else {
+ ct.buckets[replyBucket].tuples.Remove(&tuple.conn.original)
+ }
+ ct.buckets[bucket].tuples.Remove(tuple)
+
+ // Don't re-unlock if both tuples are in the same bucket.
+ if differentBuckets {
+ ct.buckets[replyBucket].mu.Unlock()
+ }
- delete(ct.conns, conn.original.tupleID)
- delete(ct.conns, conn.reply.tupleID)
+ return true
}
diff --git a/pkg/tcpip/stack/iptables.go b/pkg/tcpip/stack/iptables.go
index 974d77c36..f846ea2e5 100644
--- a/pkg/tcpip/stack/iptables.go
+++ b/pkg/tcpip/stack/iptables.go
@@ -16,6 +16,7 @@ package stack
import (
"fmt"
+ "time"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/header"
@@ -41,6 +42,9 @@ const (
// underflow.
const HookUnset = -1
+// reaperDelay is how long to wait before starting to reap connections.
+const reaperDelay = 5 * time.Second
+
// DefaultTables returns a default set of tables. Each chain is set to accept
// all packets.
func DefaultTables() *IPTables {
@@ -112,8 +116,9 @@ func DefaultTables() *IPTables {
Output: []string{TablenameMangle, TablenameNat, TablenameFilter},
},
connections: ConnTrack{
- conns: make(map[tupleID]tuple),
+ seed: generateRandUint32(),
},
+ reaperDone: make(chan struct{}, 1),
}
}
@@ -169,6 +174,12 @@ func (it *IPTables) GetTable(name string) (Table, bool) {
func (it *IPTables) ReplaceTable(name string, table Table) {
it.mu.Lock()
defer it.mu.Unlock()
+ // If iptables is being enabled, initialize the conntrack table and
+ // reaper.
+ if !it.modified {
+ it.connections.buckets = make([]bucket, numBuckets)
+ it.startReaper(reaperDelay)
+ }
it.modified = true
it.tables[name] = table
}
@@ -249,6 +260,35 @@ func (it *IPTables) Check(hook Hook, pkt *PacketBuffer, gso *GSO, r *Route, addr
return true
}
+// beforeSave is invoked by stateify.
+func (it *IPTables) beforeSave() {
+ // Ensure the reaper exits cleanly.
+ it.reaperDone <- struct{}{}
+ // Prevent others from modifying the connection table.
+ it.connections.mu.Lock()
+}
+
+// afterLoad is invoked by stateify.
+func (it *IPTables) afterLoad() {
+ it.startReaper(reaperDelay)
+}
+
+// startReaper starts a goroutine that wakes up periodically to reap timed out
+// connections.
+func (it *IPTables) startReaper(interval time.Duration) {
+ go func() { // S/R-SAFE: reaperDone is signalled when iptables is saved.
+ bucket := 0
+ for {
+ select {
+ case <-it.reaperDone:
+ return
+ case <-time.After(interval):
+ bucket, interval = it.connections.reapUnused(bucket, interval)
+ }
+ }
+ }()
+}
+
// CheckPackets runs pkts through the rules for hook and returns a map of packets that
// should not go forward.
//
diff --git a/pkg/tcpip/stack/iptables_state.go b/pkg/tcpip/stack/iptables_state.go
new file mode 100644
index 000000000..529e02a07
--- /dev/null
+++ b/pkg/tcpip/stack/iptables_state.go
@@ -0,0 +1,40 @@
+// Copyright 2020 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package stack
+
+import (
+ "time"
+)
+
+// +stateify savable
+type unixTime struct {
+ second int64
+ nano int64
+}
+
+// saveLastUsed is invoked by stateify.
+func (cn *conn) saveLastUsed() unixTime {
+ return unixTime{cn.lastUsed.Unix(), cn.lastUsed.UnixNano()}
+}
+
+// loadLastUsed is invoked by stateify.
+func (cn *conn) loadLastUsed(unix unixTime) {
+ cn.lastUsed = time.Unix(unix.second, unix.nano)
+}
+
+// beforeSave is invoked by stateify.
+func (ct *ConnTrack) beforeSave() {
+ ct.mu.Lock()
+}
diff --git a/pkg/tcpip/stack/iptables_types.go b/pkg/tcpip/stack/iptables_types.go
index c528ec381..eb70e3104 100644
--- a/pkg/tcpip/stack/iptables_types.go
+++ b/pkg/tcpip/stack/iptables_types.go
@@ -78,6 +78,8 @@ const (
)
// IPTables holds all the tables for a netstack.
+//
+// +stateify savable
type IPTables struct {
// mu protects tables, priorities, and modified.
mu sync.RWMutex
@@ -97,10 +99,15 @@ type IPTables struct {
modified bool
connections ConnTrack
+
+ // reaperDone can be signalled to stop the reaper goroutine.
+ reaperDone chan struct{}
}
// A Table defines a set of chains and hooks into the network stack. It is
// really just a list of rules.
+//
+// +stateify savable
type Table struct {
// Rules holds the rules that make up the table.
Rules []Rule
@@ -130,6 +137,8 @@ func (table *Table) ValidHooks() uint32 {
// contains zero or more matchers, each of which is a specification of which
// packets this rule applies to. If there are no matchers in the rule, it
// applies to any packet.
+//
+// +stateify savable
type Rule struct {
// Filter holds basic IP filtering fields common to every rule.
Filter IPHeaderFilter
@@ -142,6 +151,8 @@ type Rule struct {
}
// IPHeaderFilter holds basic IP filtering data common to every rule.
+//
+// +stateify savable
type IPHeaderFilter struct {
// Protocol matches the transport protocol.
Protocol tcpip.TransportProtocolNumber
diff --git a/pkg/tcpip/stack/packet_buffer.go b/pkg/tcpip/stack/packet_buffer.go
index 1b5da6017..e3556d5d2 100644
--- a/pkg/tcpip/stack/packet_buffer.go
+++ b/pkg/tcpip/stack/packet_buffer.go
@@ -14,6 +14,7 @@
package stack
import (
+ "gvisor.dev/gvisor/pkg/sync"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
)
@@ -24,7 +25,7 @@ import (
// multiple endpoints. Clone() should be called in such cases so that
// modifications to the Data field do not affect other copies.
type PacketBuffer struct {
- _ noCopy
+ _ sync.NoCopy
// PacketBufferEntry is used to build an intrusive list of
// PacketBuffers.
@@ -102,14 +103,3 @@ func (pk *PacketBuffer) Clone() *PacketBuffer {
NatDone: pk.NatDone,
}
}
-
-// noCopy may be embedded into structs which must not be copied
-// after the first use.
-//
-// See https://golang.org/issues/8005#issuecomment-190753527
-// for details.
-type noCopy struct{}
-
-// Lock is a no-op used by -copylocks checker from `go vet`.
-func (*noCopy) Lock() {}
-func (*noCopy) Unlock() {}
diff --git a/pkg/tcpip/stack/stack.go b/pkg/tcpip/stack/stack.go
index cdcfb8321..0aa815447 100644
--- a/pkg/tcpip/stack/stack.go
+++ b/pkg/tcpip/stack/stack.go
@@ -425,6 +425,7 @@ type Stack struct {
handleLocal bool
// tables are the iptables packet filtering and manipulation rules.
+ // TODO(gvisor.dev/issue/170): S/R this field.
tables *IPTables
// resumableEndpoints is a list of endpoints that need to be resumed if the
diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go
index 25534a10d..71bcee785 100644
--- a/pkg/tcpip/tcpip.go
+++ b/pkg/tcpip/tcpip.go
@@ -782,7 +782,7 @@ type CongestionControlOption string
// control algorithms.
type AvailableCongestionControlOption string
-// buffer moderation.
+// ModerateReceiveBufferOption is used by buffer moderation.
type ModerateReceiveBufferOption bool
// TCPLingerTimeoutOption is used by SetSockOpt/GetSockOpt to set/get the
@@ -855,7 +855,10 @@ type OutOfBandInlineOption int
// a default TTL.
type DefaultTTLOption uint8
-//
+// SocketDetachFilterOption is used by SetSockOpt to detach a previously attached
+// classic BPF filter on a given endpoint.
+type SocketDetachFilterOption int
+
// IPPacketInfo is the message structure for IP_PKTINFO.
//
// +stateify savable
@@ -1244,6 +1247,9 @@ type UDPStats struct {
// ChecksumErrors is the number of datagrams dropped due to bad checksums.
ChecksumErrors *StatCounter
+
+ // InvalidSourceAddress is the number of invalid sourced datagrams dropped.
+ InvalidSourceAddress *StatCounter
}
// Stats holds statistics about the networking stack.
diff --git a/pkg/tcpip/timer.go b/pkg/tcpip/timer.go
index 59f3b391f..5554c573f 100644
--- a/pkg/tcpip/timer.go
+++ b/pkg/tcpip/timer.go
@@ -15,8 +15,9 @@
package tcpip
import (
- "sync"
"time"
+
+ "gvisor.dev/gvisor/pkg/sync"
)
// cancellableTimerInstance is a specific instance of CancellableTimer.
@@ -92,6 +93,8 @@ func (t *cancellableTimerInstance) stop() {
// Note, it is not safe to copy a CancellableTimer as its timer instance creates
// a closure over the address of the CancellableTimer.
type CancellableTimer struct {
+ _ sync.NoCopy
+
// The active instance of a cancellable timer.
instance cancellableTimerInstance
@@ -157,22 +160,6 @@ func (t *CancellableTimer) Reset(d time.Duration) {
}
}
-// Lock is a no-op used by the copylocks checker from go vet.
-//
-// See CancellableTimer for details about why it shouldn't be copied.
-//
-// See https://github.com/golang/go/issues/8005#issuecomment-190753527 for more
-// details about the copylocks checker.
-func (*CancellableTimer) Lock() {}
-
-// Unlock is a no-op used by the copylocks checker from go vet.
-//
-// See CancellableTimer for details about why it shouldn't be copied.
-//
-// See https://github.com/golang/go/issues/8005#issuecomment-190753527 for more
-// details about the copylocks checker.
-func (*CancellableTimer) Unlock() {}
-
// NewCancellableTimer returns an unscheduled CancellableTimer with the given
// locker and fn.
//
diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go
index 62d1acad4..678f4e016 100644
--- a/pkg/tcpip/transport/icmp/endpoint.go
+++ b/pkg/tcpip/transport/icmp/endpoint.go
@@ -344,6 +344,10 @@ func (e *endpoint) Peek([][]byte) (int64, tcpip.ControlMessages, *tcpip.Error) {
// SetSockOpt sets a socket option.
func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error {
+ switch opt.(type) {
+ case tcpip.SocketDetachFilterOption:
+ return nil
+ }
return nil
}
diff --git a/pkg/tcpip/transport/packet/endpoint.go b/pkg/tcpip/transport/packet/endpoint.go
index a8f8454dd..57b7f5c19 100644
--- a/pkg/tcpip/transport/packet/endpoint.go
+++ b/pkg/tcpip/transport/packet/endpoint.go
@@ -278,7 +278,13 @@ func (ep *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
// used with SetSockOpt, and this function always returns
// tcpip.ErrNotSupported.
func (ep *endpoint) SetSockOpt(opt interface{}) *tcpip.Error {
- return tcpip.ErrUnknownProtocolOption
+ switch opt.(type) {
+ case tcpip.SocketDetachFilterOption:
+ return nil
+
+ default:
+ return tcpip.ErrUnknownProtocolOption
+ }
}
// SetSockOptBool implements tcpip.Endpoint.SetSockOptBool.
diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go
index 5b6e7d102..c2e9fd29f 100644
--- a/pkg/tcpip/transport/raw/endpoint.go
+++ b/pkg/tcpip/transport/raw/endpoint.go
@@ -506,7 +506,13 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask {
// SetSockOpt implements tcpip.Endpoint.SetSockOpt.
func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error {
- return tcpip.ErrUnknownProtocolOption
+ switch opt.(type) {
+ case tcpip.SocketDetachFilterOption:
+ return nil
+
+ default:
+ return tcpip.ErrUnknownProtocolOption
+ }
}
// SetSockOptBool implements tcpip.Endpoint.SetSockOptBool.
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index caac6ef57..83dc10ed0 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -1792,6 +1792,9 @@ func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error {
e.deferAccept = time.Duration(v)
e.UnlockUser()
+ case tcpip.SocketDetachFilterOption:
+ return nil
+
default:
return nil
}
diff --git a/pkg/tcpip/transport/tcpconntrack/tcp_conntrack.go b/pkg/tcpip/transport/tcpconntrack/tcp_conntrack.go
index 12bc1b5b5..558b06df0 100644
--- a/pkg/tcpip/transport/tcpconntrack/tcp_conntrack.go
+++ b/pkg/tcpip/transport/tcpconntrack/tcp_conntrack.go
@@ -106,6 +106,11 @@ func (t *TCB) UpdateStateOutbound(tcp header.TCP) Result {
return st
}
+// State returns the current state of the TCB.
+func (t *TCB) State() Result {
+ return t.state
+}
+
// IsAlive returns true as long as the connection is established(Alive)
// or connecting state.
func (t *TCB) IsAlive() bool {
diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go
index 0584ec8dc..a14643ae8 100644
--- a/pkg/tcpip/transport/udp/endpoint.go
+++ b/pkg/tcpip/transport/udp/endpoint.go
@@ -816,6 +816,9 @@ func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error {
e.mu.Lock()
e.bindToDevice = id
e.mu.Unlock()
+
+ case tcpip.SocketDetachFilterOption:
+ return nil
}
return nil
}
@@ -1377,6 +1380,15 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, pk
return
}
+ // Never receive from a multicast address.
+ if header.IsV4MulticastAddress(id.RemoteAddress) ||
+ header.IsV6MulticastAddress(id.RemoteAddress) {
+ e.stack.Stats().UDP.InvalidSourceAddress.Increment()
+ e.stack.Stats().IP.InvalidSourceAddressesReceived.Increment()
+ e.stats.ReceiveErrors.MalformedPacketsReceived.Increment()
+ return
+ }
+
// Verify checksum unless RX checksum offload is enabled.
// On IPv4, UDP checksum is optional, and a zero value means
// the transmitter omitted the checksum generation (RFC768).
@@ -1395,10 +1407,10 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, pk
}
}
- e.rcvMu.Lock()
e.stack.Stats().UDP.PacketsReceived.Increment()
e.stats.PacketsReceived.Increment()
+ e.rcvMu.Lock()
// Drop the packet if our buffer is currently full.
if !e.rcvReady || e.rcvClosed {
e.rcvMu.Unlock()
diff --git a/pkg/tcpip/transport/udp/udp_test.go b/pkg/tcpip/transport/udp/udp_test.go
index 91ba031fa..90781cf49 100644
--- a/pkg/tcpip/transport/udp/udp_test.go
+++ b/pkg/tcpip/transport/udp/udp_test.go
@@ -83,16 +83,18 @@ type header4Tuple struct {
type testFlow int
const (
- unicastV4 testFlow = iota // V4 unicast on a V4 socket
- unicastV4in6 // V4-mapped unicast on a V6-dual socket
- unicastV6 // V6 unicast on a V6 socket
- unicastV6Only // V6 unicast on a V6-only socket
- multicastV4 // V4 multicast on a V4 socket
- multicastV4in6 // V4-mapped multicast on a V6-dual socket
- multicastV6 // V6 multicast on a V6 socket
- multicastV6Only // V6 multicast on a V6-only socket
- broadcast // V4 broadcast on a V4 socket
- broadcastIn6 // V4-mapped broadcast on a V6-dual socket
+ unicastV4 testFlow = iota // V4 unicast on a V4 socket
+ unicastV4in6 // V4-mapped unicast on a V6-dual socket
+ unicastV6 // V6 unicast on a V6 socket
+ unicastV6Only // V6 unicast on a V6-only socket
+ multicastV4 // V4 multicast on a V4 socket
+ multicastV4in6 // V4-mapped multicast on a V6-dual socket
+ multicastV6 // V6 multicast on a V6 socket
+ multicastV6Only // V6 multicast on a V6-only socket
+ broadcast // V4 broadcast on a V4 socket
+ broadcastIn6 // V4-mapped broadcast on a V6-dual socket
+ reverseMulticast4 // V4 multicast src. Must fail.
+ reverseMulticast6 // V6 multicast src. Must fail.
)
func (flow testFlow) String() string {
@@ -117,6 +119,10 @@ func (flow testFlow) String() string {
return "broadcast"
case broadcastIn6:
return "broadcastIn6"
+ case reverseMulticast4:
+ return "reverseMulticast4"
+ case reverseMulticast6:
+ return "reverseMulticast6"
default:
return "unknown"
}
@@ -168,6 +174,9 @@ func (flow testFlow) header4Tuple(d packetDirection) header4Tuple {
h.dstAddr.Addr = multicastV6Addr
}
}
+ if flow.isReverseMulticast() {
+ h.srcAddr.Addr = flow.getMcastAddr()
+ }
return h
}
@@ -199,9 +208,9 @@ func (flow testFlow) netProto() tcpip.NetworkProtocolNumber {
// endpoint for this flow.
func (flow testFlow) sockProto() tcpip.NetworkProtocolNumber {
switch flow {
- case unicastV4in6, unicastV6, unicastV6Only, multicastV4in6, multicastV6, multicastV6Only, broadcastIn6:
+ case unicastV4in6, unicastV6, unicastV6Only, multicastV4in6, multicastV6, multicastV6Only, broadcastIn6, reverseMulticast6:
return ipv6.ProtocolNumber
- case unicastV4, multicastV4, broadcast:
+ case unicastV4, multicastV4, broadcast, reverseMulticast4:
return ipv4.ProtocolNumber
default:
panic(fmt.Sprintf("invalid testFlow given: %d", flow))
@@ -224,7 +233,7 @@ func (flow testFlow) isV6Only() bool {
switch flow {
case unicastV6Only, multicastV6Only:
return true
- case unicastV4, unicastV4in6, unicastV6, multicastV4, multicastV4in6, multicastV6, broadcast, broadcastIn6:
+ case unicastV4, unicastV4in6, unicastV6, multicastV4, multicastV4in6, multicastV6, broadcast, broadcastIn6, reverseMulticast4, reverseMulticast6:
return false
default:
panic(fmt.Sprintf("invalid testFlow given: %d", flow))
@@ -235,7 +244,7 @@ func (flow testFlow) isMulticast() bool {
switch flow {
case multicastV4, multicastV4in6, multicastV6, multicastV6Only:
return true
- case unicastV4, unicastV4in6, unicastV6, unicastV6Only, broadcast, broadcastIn6:
+ case unicastV4, unicastV4in6, unicastV6, unicastV6Only, broadcast, broadcastIn6, reverseMulticast4, reverseMulticast6:
return false
default:
panic(fmt.Sprintf("invalid testFlow given: %d", flow))
@@ -246,7 +255,7 @@ func (flow testFlow) isBroadcast() bool {
switch flow {
case broadcast, broadcastIn6:
return true
- case unicastV4, unicastV4in6, unicastV6, unicastV6Only, multicastV4, multicastV4in6, multicastV6, multicastV6Only:
+ case unicastV4, unicastV4in6, unicastV6, unicastV6Only, multicastV4, multicastV4in6, multicastV6, multicastV6Only, reverseMulticast4, reverseMulticast6:
return false
default:
panic(fmt.Sprintf("invalid testFlow given: %d", flow))
@@ -257,13 +266,22 @@ func (flow testFlow) isMapped() bool {
switch flow {
case unicastV4in6, multicastV4in6, broadcastIn6:
return true
- case unicastV4, unicastV6, unicastV6Only, multicastV4, multicastV6, multicastV6Only, broadcast:
+ case unicastV4, unicastV6, unicastV6Only, multicastV4, multicastV6, multicastV6Only, broadcast, reverseMulticast4, reverseMulticast6:
return false
default:
panic(fmt.Sprintf("invalid testFlow given: %d", flow))
}
}
+func (flow testFlow) isReverseMulticast() bool {
+ switch flow {
+ case reverseMulticast4, reverseMulticast6:
+ return true
+ default:
+ return false
+ }
+}
+
type testContext struct {
t *testing.T
linkEP *channel.Endpoint
@@ -872,6 +890,60 @@ func TestV4ReadOnBoundToBroadcast(t *testing.T) {
}
}
+// TestReadFromMulticast checks that an endpoint will NOT receive a packet
+// that was sent with multicast SOURCE address.
+func TestReadFromMulticast(t *testing.T) {
+ for _, flow := range []testFlow{reverseMulticast4, reverseMulticast6} {
+ t.Run(fmt.Sprintf("flow:%s", flow), func(t *testing.T) {
+ c := newDualTestContext(t, defaultMTU)
+ defer c.cleanup()
+
+ c.createEndpointForFlow(flow)
+
+ if err := c.ep.Bind(tcpip.FullAddress{Port: stackPort}); err != nil {
+ t.Fatalf("Bind failed: %s", err)
+ }
+ testFailingRead(c, flow, false /* expectReadError */)
+ })
+ }
+}
+
+// TestReadFromMulticaststats checks that a discarded packet
+// that that was sent with multicast SOURCE address increments
+// the correct counters and that a regular packet does not.
+func TestReadFromMulticastStats(t *testing.T) {
+ t.Helper()
+ for _, flow := range []testFlow{reverseMulticast4, reverseMulticast6, unicastV4} {
+ t.Run(fmt.Sprintf("flow:%s", flow), func(t *testing.T) {
+ c := newDualTestContext(t, defaultMTU)
+ defer c.cleanup()
+
+ c.createEndpointForFlow(flow)
+
+ if err := c.ep.Bind(tcpip.FullAddress{Port: stackPort}); err != nil {
+ t.Fatalf("Bind failed: %s", err)
+ }
+
+ payload := newPayload()
+ c.injectPacket(flow, payload)
+
+ var want uint64 = 0
+ if flow.isReverseMulticast() {
+ want = 1
+ }
+ if got := c.s.Stats().IP.InvalidSourceAddressesReceived.Value(); got != want {
+ t.Errorf("got stats.IP.InvalidSourceAddressesReceived.Value() = %d, want = %d", got, want)
+ }
+ if got := c.s.Stats().UDP.InvalidSourceAddress.Value(); got != want {
+ t.Errorf("got stats.UDP.InvalidSourceAddress.Value() = %d, want = %d", got, want)
+ }
+ if got := c.ep.Stats().(*tcpip.TransportEndpointStats).ReceiveErrors.MalformedPacketsReceived.Value(); got != want {
+ t.Errorf("got EP Stats.ReceiveErrors.MalformedPacketsReceived stats = %d, want = %d", got, want)
+ }
+ })
+ }
+}
+
// TestV4ReadBroadcastOnBoundToWildcard checks that an endpoint can bind to ANY
// and receive broadcast and unicast data.
func TestV4ReadBroadcastOnBoundToWildcard(t *testing.T) {
diff --git a/pkg/test/criutil/criutil.go b/pkg/test/criutil/criutil.go
index 4c63d669a..66f10c016 100644
--- a/pkg/test/criutil/criutil.go
+++ b/pkg/test/criutil/criutil.go
@@ -103,6 +103,9 @@ func (cc *Crictl) Create(podID, contSpecFile, sbSpecFile string) (string, error)
// need to parse the version and add an appropriate --no-pull argument
// since the image has already been loaded locally.
out, err := cc.run("-v")
+ if err != nil {
+ return "", err
+ }
r := regexp.MustCompile("crictl version ([0-9]+)\\.([0-9]+)\\.([0-9+])")
vs := r.FindStringSubmatch(out)
if len(vs) != 4 {
diff --git a/pkg/test/dockerutil/dockerutil.go b/pkg/test/dockerutil/dockerutil.go
index f95ae3cd1..df09babf3 100644
--- a/pkg/test/dockerutil/dockerutil.go
+++ b/pkg/test/dockerutil/dockerutil.go
@@ -119,3 +119,8 @@ func Save(logger testutil.Logger, image string, w io.Writer) error {
cmd.Stdout = w // Send directly to the writer.
return cmd.Run()
}
+
+// Runtime returns the value of the flag runtime.
+func Runtime() string {
+ return *runtime
+}
diff --git a/pkg/test/testutil/BUILD b/pkg/test/testutil/BUILD
index 03b1b4677..2d8f56bc0 100644
--- a/pkg/test/testutil/BUILD
+++ b/pkg/test/testutil/BUILD
@@ -15,6 +15,6 @@ go_library(
"//runsc/boot",
"//runsc/specutils",
"@com_github_cenkalti_backoff//:go_default_library",
- "@com_github_opencontainers_runtime-spec//specs-go:go_default_library",
+ "@com_github_opencontainers_runtime_spec//specs-go:go_default_library",
],
)