diff options
Diffstat (limited to 'pkg')
68 files changed, 1002 insertions, 354 deletions
diff --git a/pkg/sentry/fsimpl/ext/BUILD b/pkg/sentry/fsimpl/ext/BUILD index ef24f8159..abc610ef3 100644 --- a/pkg/sentry/fsimpl/ext/BUILD +++ b/pkg/sentry/fsimpl/ext/BUILD @@ -96,7 +96,7 @@ go_test( "//pkg/syserror", "//pkg/test/testutil", "//pkg/usermem", - "@com_github_google_go-cmp//cmp:go_default_library", - "@com_github_google_go-cmp//cmp/cmpopts:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp/cmpopts:go_default_library", ], ) diff --git a/pkg/sentry/fsimpl/gofer/gofer.go b/pkg/sentry/fsimpl/gofer/gofer.go index 2b83094cd..b74d489a0 100644 --- a/pkg/sentry/fsimpl/gofer/gofer.go +++ b/pkg/sentry/fsimpl/gofer/gofer.go @@ -602,8 +602,14 @@ type dentry struct { // returned by the server. dirents is protected by dirMu. dirents []vfs.Dirent - // Cached metadata; protected by metadataMu and accessed using atomic - // memory operations unless otherwise specified. + // Cached metadata; protected by metadataMu. + // To access: + // - In situations where consistency is not required (like stat), these + // can be accessed using atomic operations only (without locking). + // - Lock metadataMu and can access without atomic operations. + // To mutate: + // - Lock metadataMu and use atomic operations to update because we might + // have atomic readers that don't hold the lock. metadataMu sync.Mutex ino inodeNumber // immutable mode uint32 // type is immutable, perms are mutable @@ -616,7 +622,7 @@ type dentry struct { ctime int64 btime int64 // File size, protected by both metadataMu and dataMu (i.e. both must be - // locked to mutate it). + // locked to mutate it; locking either is sufficient to access it). size uint64 // nlink counts the number of hard links to this dentry. It's updated and @@ -904,14 +910,14 @@ func (d *dentry) setStat(ctx context.Context, creds *auth.Credentials, stat *lin // Prepare for truncate. if stat.Mask&linux.STATX_SIZE != 0 { - switch d.mode & linux.S_IFMT { - case linux.S_IFREG: + switch mode.FileType() { + case linux.ModeRegular: if !setLocalMtime { // Truncate updates mtime. setLocalMtime = true stat.Mtime.Nsec = linux.UTIME_NOW } - case linux.S_IFDIR: + case linux.ModeDirectory: return syserror.EISDIR default: return syserror.EINVAL @@ -994,7 +1000,7 @@ func (d *dentry) setStat(ctx context.Context, creds *auth.Credentials, stat *lin func (d *dentry) updateFileSizeLocked(newSize uint64) { d.dataMu.Lock() oldSize := d.size - d.size = newSize + atomic.StoreUint64(&d.size, newSize) // d.dataMu must be unlocked to lock d.mapsMu and invalidate mappings // below. This allows concurrent calls to Read/Translate/etc. These // functions synchronize with truncation by refusing to use cache @@ -1340,8 +1346,8 @@ func (d *dentry) removexattr(ctx context.Context, creds *auth.Credentials, name // Extended attributes in the user.* namespace are only supported for regular // files and directories. func (d *dentry) userXattrSupported() bool { - filetype := linux.S_IFMT & atomic.LoadUint32(&d.mode) - return filetype == linux.S_IFREG || filetype == linux.S_IFDIR + filetype := linux.FileMode(atomic.LoadUint32(&d.mode)).FileType() + return filetype == linux.ModeRegular || filetype == linux.ModeDirectory } // Preconditions: !d.isSynthetic(). d.isRegularFile() || d.isDir(). diff --git a/pkg/sentry/fsimpl/gofer/regular_file.go b/pkg/sentry/fsimpl/gofer/regular_file.go index a2f02d9c7..f10350c97 100644 --- a/pkg/sentry/fsimpl/gofer/regular_file.go +++ b/pkg/sentry/fsimpl/gofer/regular_file.go @@ -89,7 +89,9 @@ func (fd *regularFileFD) Allocate(ctx context.Context, mode, offset, length uint if err != nil { return err } - d.size = size + d.dataMu.Lock() + atomic.StoreUint64(&d.size, size) + d.dataMu.Unlock() if !d.cachedMetadataAuthoritative() { d.touchCMtimeLocked() } diff --git a/pkg/sentry/fsimpl/kernfs/BUILD b/pkg/sentry/fsimpl/kernfs/BUILD index 179df6c1e..3835557fe 100644 --- a/pkg/sentry/fsimpl/kernfs/BUILD +++ b/pkg/sentry/fsimpl/kernfs/BUILD @@ -70,6 +70,6 @@ go_test( "//pkg/sentry/vfs", "//pkg/syserror", "//pkg/usermem", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/sentry/fsimpl/sys/BUILD b/pkg/sentry/fsimpl/sys/BUILD index a741e2bb6..1b548ccd4 100644 --- a/pkg/sentry/fsimpl/sys/BUILD +++ b/pkg/sentry/fsimpl/sys/BUILD @@ -29,6 +29,6 @@ go_test( "//pkg/sentry/kernel", "//pkg/sentry/kernel/auth", "//pkg/sentry/vfs", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/sentry/fsimpl/testutil/BUILD b/pkg/sentry/fsimpl/testutil/BUILD index 0e4053a46..400a97996 100644 --- a/pkg/sentry/fsimpl/testutil/BUILD +++ b/pkg/sentry/fsimpl/testutil/BUILD @@ -32,6 +32,6 @@ go_library( "//pkg/sentry/vfs", "//pkg/sync", "//pkg/usermem", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/sentry/kernel/timekeeper.go b/pkg/sentry/kernel/timekeeper.go index 0adf25691..5f3908d8b 100644 --- a/pkg/sentry/kernel/timekeeper.go +++ b/pkg/sentry/kernel/timekeeper.go @@ -210,9 +210,6 @@ func (t *Timekeeper) startUpdater() { p.realtimeBaseRef = int64(realtimeParams.BaseRef) p.realtimeFrequency = realtimeParams.Frequency } - - log.Debugf("Updating VDSO parameters: %+v", p) - return p }); err != nil { log.Warningf("Unable to update VDSO parameter page: %v", err) diff --git a/pkg/sentry/platform/kvm/machine_arm64_unsafe.go b/pkg/sentry/platform/kvm/machine_arm64_unsafe.go index 8bed34922..3de309c1a 100644 --- a/pkg/sentry/platform/kvm/machine_arm64_unsafe.go +++ b/pkg/sentry/platform/kvm/machine_arm64_unsafe.go @@ -78,19 +78,6 @@ func (c *vCPU) initArchState() error { return err } - // sctlr_el1 - regGet.id = _KVM_ARM64_REGS_SCTLR_EL1 - if err := c.getOneRegister(®Get); err != nil { - return err - } - - dataGet |= (_SCTLR_M | _SCTLR_C | _SCTLR_I) - data = dataGet - reg.id = _KVM_ARM64_REGS_SCTLR_EL1 - if err := c.setOneRegister(®); err != nil { - return err - } - // tcr_el1 data = _TCR_TXSZ_VA48 | _TCR_CACHE_FLAGS | _TCR_SHARED | _TCR_TG_FLAGS | _TCR_ASID16 | _TCR_IPS_40BITS reg.id = _KVM_ARM64_REGS_TCR_EL1 diff --git a/pkg/sentry/platform/ring0/entry_arm64.s b/pkg/sentry/platform/ring0/entry_arm64.s index 2bc5f3ecd..6ed73699b 100644 --- a/pkg/sentry/platform/ring0/entry_arm64.s +++ b/pkg/sentry/platform/ring0/entry_arm64.s @@ -40,6 +40,14 @@ #define FPEN_ENABLE (FPEN_NOTRAP << FPEN_SHIFT) +// sctlr_el1: system control register el1. +#define SCTLR_M 1 << 0 +#define SCTLR_C 1 << 2 +#define SCTLR_I 1 << 12 +#define SCTLR_UCT 1 << 15 + +#define SCTLR_EL1_DEFAULT (SCTLR_M | SCTLR_C | SCTLR_I | SCTLR_UCT) + // Saves a register set. // // This is a macro because it may need to executed in contents where a stack is @@ -496,6 +504,11 @@ TEXT ·kernelExitToEl1(SB),NOSPLIT,$0 // Start is the CPU entrypoint. TEXT ·Start(SB),NOSPLIT,$0 IRQ_DISABLE + + // Init. + MOVD $SCTLR_EL1_DEFAULT, R1 + MSR R1, SCTLR_EL1 + MOVD R8, RSV_REG ORR $0xffff000000000000, RSV_REG, RSV_REG WORD $0xd518d092 //MSR R18, TPIDR_EL1 diff --git a/pkg/sentry/socket/netstack/netstack.go b/pkg/sentry/socket/netstack/netstack.go index 3b248a953..78a842973 100644 --- a/pkg/sentry/socket/netstack/netstack.go +++ b/pkg/sentry/socket/netstack/netstack.go @@ -192,6 +192,7 @@ var Metrics = tcpip.Stats{ PacketsSent: mustCreateMetric("/netstack/udp/packets_sent", "Number of UDP datagrams sent."), PacketSendErrors: mustCreateMetric("/netstack/udp/packet_send_errors", "Number of UDP datagrams failed to be sent."), ChecksumErrors: mustCreateMetric("/netstack/udp/checksum_errors", "Number of UDP datagrams dropped due to bad checksums."), + InvalidSourceAddress: mustCreateMetric("/netstack/udp/invalid_source", "Number of UDP datagrams dropped due to invalid source address."), }, } @@ -1753,6 +1754,11 @@ func setSockOptSocket(t *kernel.Task, s socket.SocketOps, ep commonEndpoint, nam return nil + case linux.SO_DETACH_FILTER: + // optval is ignored. + var v tcpip.SocketDetachFilterOption + return syserr.TranslateNetstackError(ep.SetSockOpt(v)) + default: socket.SetSockOptEmitUnimplementedEvent(t, name) } diff --git a/pkg/sentry/time/parameters.go b/pkg/sentry/time/parameters.go index 65868cb26..cd1b95117 100644 --- a/pkg/sentry/time/parameters.go +++ b/pkg/sentry/time/parameters.go @@ -228,11 +228,15 @@ func errorAdjust(prevParams Parameters, newParams Parameters, now TSCValue) (Par // // The log level is determined by the error severity. func logErrorAdjustment(clock ClockID, errorNS ReferenceNS, orig, adjusted Parameters) { - fn := log.Debugf - if int64(errorNS.Magnitude()) > time.Millisecond.Nanoseconds() { + magNS := int64(errorNS.Magnitude()) + if magNS <= 10*time.Microsecond.Nanoseconds() { + // Don't log small errors. + return + } + fn := log.Infof + if magNS > time.Millisecond.Nanoseconds() { + // Upgrade large errors to warning. fn = log.Warningf - } else if int64(errorNS.Magnitude()) > 10*time.Microsecond.Nanoseconds() { - fn = log.Infof } fn("Clock(%v): error: %v ns, adjusted frequency from %v Hz to %v Hz", clock, errorNS, orig.Frequency, adjusted.Frequency) diff --git a/pkg/sentry/vfs/inotify.go b/pkg/sentry/vfs/inotify.go index c2e21ac5f..167b731ac 100644 --- a/pkg/sentry/vfs/inotify.go +++ b/pkg/sentry/vfs/inotify.go @@ -179,12 +179,12 @@ func (i *Inotify) Readiness(mask waiter.EventMask) waiter.EventMask { return mask & ready } -// PRead implements FileDescriptionImpl. +// PRead implements FileDescriptionImpl.PRead. func (*Inotify) PRead(ctx context.Context, dst usermem.IOSequence, offset int64, opts ReadOptions) (int64, error) { return 0, syserror.ESPIPE } -// PWrite implements FileDescriptionImpl. +// PWrite implements FileDescriptionImpl.PWrite. func (*Inotify) PWrite(ctx context.Context, src usermem.IOSequence, offset int64, opts WriteOptions) (int64, error) { return 0, syserror.ESPIPE } @@ -243,7 +243,7 @@ func (i *Inotify) Read(ctx context.Context, dst usermem.IOSequence, opts ReadOpt return writeLen, nil } -// Ioctl implements fs.FileOperations.Ioctl. +// Ioctl implements FileDescriptionImpl.Ioctl. func (i *Inotify) Ioctl(ctx context.Context, uio usermem.IO, args arch.SyscallArguments) (uintptr, error) { switch args[1].Int() { case linux.FIONREAD: diff --git a/pkg/shim/runsc/BUILD b/pkg/shim/runsc/BUILD index 4d388ca05..f08599ebd 100644 --- a/pkg/shim/runsc/BUILD +++ b/pkg/shim/runsc/BUILD @@ -11,6 +11,6 @@ go_library( visibility = ["//:sandbox"], deps = [ "@com_github_containerd_go_runc//:go_default_library", - "@com_github_opencontainers_runtime-spec//specs-go:go_default_library", + "@com_github_opencontainers_runtime_spec//specs-go:go_default_library", ], ) diff --git a/pkg/shim/runsc/runsc.go b/pkg/shim/runsc/runsc.go index 50807d0c5..c5cf68efa 100644 --- a/pkg/shim/runsc/runsc.go +++ b/pkg/shim/runsc/runsc.go @@ -1,5 +1,5 @@ // Copyright 2018 The containerd Authors. -// Copyright 2018 The gVisor authors. +// Copyright 2018 The gVisor Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/shim/runsc/utils.go b/pkg/shim/runsc/utils.go index 2732d5e98..c514b3bc7 100644 --- a/pkg/shim/runsc/utils.go +++ b/pkg/shim/runsc/utils.go @@ -1,5 +1,5 @@ // Copyright 2018 The containerd Authors. -// Copyright 2018 The gVisor authors. +// Copyright 2018 The gVisor Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/shim/v1/proc/BUILD b/pkg/shim/v1/proc/BUILD index a59bf198d..4377306af 100644 --- a/pkg/shim/v1/proc/BUILD +++ b/pkg/shim/v1/proc/BUILD @@ -25,11 +25,12 @@ go_library( "@com_github_containerd_containerd//errdefs:go_default_library", "@com_github_containerd_containerd//log:go_default_library", "@com_github_containerd_containerd//mount:go_default_library", - "@com_github_containerd_containerd//runtime/proc:go_default_library", + "@com_github_containerd_containerd//pkg/process:go_default_library", + "@com_github_containerd_containerd//pkg/stdio:go_default_library", "@com_github_containerd_fifo//:go_default_library", "@com_github_containerd_go_runc//:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", - "@com_github_opencontainers_runtime-spec//specs-go:go_default_library", + "@com_github_opencontainers_runtime_spec//specs-go:go_default_library", "@org_golang_x_sys//unix:go_default_library", ], ) diff --git a/pkg/shim/v1/proc/deleted_state.go b/pkg/shim/v1/proc/deleted_state.go index 52aad40ac..d9b970c4d 100644 --- a/pkg/shim/v1/proc/deleted_state.go +++ b/pkg/shim/v1/proc/deleted_state.go @@ -21,29 +21,29 @@ import ( "github.com/containerd/console" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/pkg/process" ) type deletedState struct{} func (*deletedState) Resize(ws console.WinSize) error { - return fmt.Errorf("cannot resize a deleted process") + return fmt.Errorf("cannot resize a deleted process.ss") } func (*deletedState) Start(ctx context.Context) error { - return fmt.Errorf("cannot start a deleted process") + return fmt.Errorf("cannot start a deleted process.ss") } func (*deletedState) Delete(ctx context.Context) error { - return fmt.Errorf("cannot delete a deleted process: %w", errdefs.ErrNotFound) + return fmt.Errorf("cannot delete a deleted process.ss: %w", errdefs.ErrNotFound) } func (*deletedState) Kill(ctx context.Context, sig uint32, all bool) error { - return fmt.Errorf("cannot kill a deleted process: %w", errdefs.ErrNotFound) + return fmt.Errorf("cannot kill a deleted process.ss: %w", errdefs.ErrNotFound) } func (*deletedState) SetExited(status int) {} -func (*deletedState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (*deletedState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { return nil, fmt.Errorf("cannot exec in a deleted state") } diff --git a/pkg/shim/v1/proc/exec.go b/pkg/shim/v1/proc/exec.go index 4ef9cf6cf..1d1d90488 100644 --- a/pkg/shim/v1/proc/exec.go +++ b/pkg/shim/v1/proc/exec.go @@ -27,7 +27,7 @@ import ( "github.com/containerd/console" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" specs "github.com/opencontainers/runtime-spec/specs-go" @@ -51,7 +51,7 @@ type execProcess struct { internalPid int closers []io.Closer stdin io.Closer - stdio proc.Stdio + stdio stdio.Stdio path string spec specs.Process @@ -164,7 +164,7 @@ func (e *execProcess) Stdin() io.Closer { return e.stdin } -func (e *execProcess) Stdio() proc.Stdio { +func (e *execProcess) Stdio() stdio.Stdio { return e.stdio } @@ -223,7 +223,6 @@ func (e *execProcess) start(ctx context.Context) (err error) { e.closers = append(e.closers, sc) e.stdin = sc } - var copyWaitGroup sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() if socket != nil { @@ -231,15 +230,14 @@ func (e *execProcess) start(ctx context.Context) (err error) { if err != nil { return fmt.Errorf("failed to retrieve console master: %w", err) } - if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, ©WaitGroup); err != nil { + if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil { return fmt.Errorf("failed to start console copy: %w", err) } } else if !e.stdio.IsNull() { - if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, ©WaitGroup); err != nil { + if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil { return fmt.Errorf("failed to start io pipe copy: %w", err) } } - copyWaitGroup.Wait() pid, err := runc.ReadPidFile(opts.PidFile) if err != nil { return fmt.Errorf("failed to retrieve OCI runtime exec pid: %w", err) diff --git a/pkg/shim/v1/proc/init.go b/pkg/shim/v1/proc/init.go index b429cb94f..dab3123d6 100644 --- a/pkg/shim/v1/proc/init.go +++ b/pkg/shim/v1/proc/init.go @@ -30,7 +30,8 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" - "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" specs "github.com/opencontainers/runtime-spec/specs-go" @@ -59,7 +60,7 @@ type Init struct { id string Bundle string console console.Console - Platform proc.Platform + Platform stdio.Platform io runc.IO runtime *runsc.Runsc status int @@ -67,7 +68,7 @@ type Init struct { pid int closers []io.Closer stdin io.Closer - stdio proc.Stdio + stdio stdio.Stdio Rootfs string IoUID int IoGID int @@ -92,7 +93,7 @@ func NewRunsc(root, path, namespace, runtime string, config map[string]string) * } // New returns a new init process. -func New(id string, runtime *runsc.Runsc, stdio proc.Stdio) *Init { +func New(id string, runtime *runsc.Runsc, stdio stdio.Stdio) *Init { p := &Init{ id: id, runtime: runtime, @@ -144,7 +145,6 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) (err error) { p.stdin = sc p.closers = append(p.closers, sc) } - var copyWaitGroup sync.WaitGroup ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() if socket != nil { @@ -152,18 +152,16 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) (err error) { if err != nil { return fmt.Errorf("failed to retrieve console master: %w", err) } - console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, ©WaitGroup) + console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg) if err != nil { return fmt.Errorf("failed to start console copy: %w", err) } p.console = console } else if !hasNoIO(r) { - if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg, ©WaitGroup); err != nil { + if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg); err != nil { return fmt.Errorf("failed to start io pipe copy: %w", err) } } - - copyWaitGroup.Wait() pid, err := runc.ReadPidFile(pidFile) if err != nil { return fmt.Errorf("failed to retrieve OCI runtime container pid: %w", err) @@ -391,7 +389,7 @@ func (p *Init) Runtime() *runsc.Runsc { } // Exec returns a new child process. -func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { p.mu.Lock() defer p.mu.Unlock() @@ -399,7 +397,7 @@ func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce } // exec returns a new exec'd process. -func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { // process exec request var spec specs.Process if err := json.Unmarshal(r.Spec.Value, &spec); err != nil { @@ -412,7 +410,7 @@ func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce path: path, parent: p, spec: spec, - stdio: proc.Stdio{ + stdio: stdio.Stdio{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, @@ -425,7 +423,7 @@ func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce } // Stdio returns the stdio of the process. -func (p *Init) Stdio() proc.Stdio { +func (p *Init) Stdio() stdio.Stdio { return p.stdio } @@ -453,7 +451,7 @@ func (p *Init) convertStatus(status string) string { return status } -func withConditionalIO(c proc.Stdio) runc.IOOpt { +func withConditionalIO(c stdio.Stdio) runc.IOOpt { return func(o *runc.IOOption) { o.OpenStdin = c.Stdin != "" o.OpenStdout = c.Stdout != "" diff --git a/pkg/shim/v1/proc/init_state.go b/pkg/shim/v1/proc/init_state.go index 509f27762..9233ecc85 100644 --- a/pkg/shim/v1/proc/init_state.go +++ b/pkg/shim/v1/proc/init_state.go @@ -21,14 +21,14 @@ import ( "github.com/containerd/console" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/pkg/process" ) type initState interface { Resize(console.WinSize) error Start(context.Context) error Delete(context.Context) error - Exec(context.Context, string, *ExecConfig) (proc.Process, error) + Exec(context.Context, string, *ExecConfig) (process.Process, error) Kill(context.Context, uint32, bool) error SetExited(int) } @@ -94,7 +94,7 @@ func (s *createdState) SetExited(status int) { } } -func (s *createdState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (s *createdState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { return s.p.exec(ctx, path, r) } @@ -117,11 +117,11 @@ func (s *runningState) Resize(ws console.WinSize) error { } func (s *runningState) Start(ctx context.Context) error { - return fmt.Errorf("cannot start a running process") + return fmt.Errorf("cannot start a running process.ss") } func (s *runningState) Delete(ctx context.Context) error { - return fmt.Errorf("cannot delete a running process") + return fmt.Errorf("cannot delete a running process.ss") } func (s *runningState) Kill(ctx context.Context, sig uint32, all bool) error { @@ -136,7 +136,7 @@ func (s *runningState) SetExited(status int) { } } -func (s *runningState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (s *runningState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { return s.p.exec(ctx, path, r) } @@ -159,7 +159,7 @@ func (s *stoppedState) Resize(ws console.WinSize) error { } func (s *stoppedState) Start(ctx context.Context) error { - return fmt.Errorf("cannot start a stopped process") + return fmt.Errorf("cannot start a stopped process.ss") } func (s *stoppedState) Delete(ctx context.Context) error { @@ -170,13 +170,13 @@ func (s *stoppedState) Delete(ctx context.Context) error { } func (s *stoppedState) Kill(ctx context.Context, sig uint32, all bool) error { - return errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", s.p.id) + return errdefs.ToGRPCf(errdefs.ErrNotFound, "process.ss %s not found", s.p.id) } func (s *stoppedState) SetExited(status int) { // no op } -func (s *stoppedState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (s *stoppedState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { return nil, fmt.Errorf("cannot exec in a stopped state") } diff --git a/pkg/shim/v1/proc/io.go b/pkg/shim/v1/proc/io.go index 5313c7a50..34d825fb7 100644 --- a/pkg/shim/v1/proc/io.go +++ b/pkg/shim/v1/proc/io.go @@ -38,7 +38,7 @@ var bufPool = sync.Pool{ }, } -func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error { +func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg *sync.WaitGroup) error { var sameFile *countingWriteCloser for _, i := range []struct { name string @@ -48,9 +48,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w name: stdout, dest: func(wc io.WriteCloser, rc io.Closer) { wg.Add(1) - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) if _, err := io.CopyBuffer(wc, rio.Stdout(), *p); err != nil { @@ -67,9 +65,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w name: stderr, dest: func(wc io.WriteCloser, rc io.Closer) { wg.Add(1) - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) if _, err := io.CopyBuffer(wc, rio.Stderr(), *p); err != nil { @@ -124,9 +120,7 @@ func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, w if err != nil { return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", stdin, err) } - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) diff --git a/pkg/shim/v1/proc/types.go b/pkg/shim/v1/proc/types.go index 5c215de5f..2b0df4663 100644 --- a/pkg/shim/v1/proc/types.go +++ b/pkg/shim/v1/proc/types.go @@ -18,9 +18,8 @@ package proc import ( "time" - google_protobuf "github.com/gogo/protobuf/types" - runc "github.com/containerd/go-runc" + "github.com/gogo/protobuf/types" ) // Mount holds filesystem mount configuration. @@ -41,7 +40,7 @@ type CreateConfig struct { Stdin string Stdout string Stderr string - Options *google_protobuf.Any + Options *types.Any } // ExecConfig holds exec creation configuration. @@ -51,7 +50,7 @@ type ExecConfig struct { Stdin string Stdout string Stderr string - Spec *google_protobuf.Any + Spec *types.Any } // Exit is the type of exit events. diff --git a/pkg/shim/v1/shim/BUILD b/pkg/shim/v1/shim/BUILD index 02cffbb01..05c595bc9 100644 --- a/pkg/shim/v1/shim/BUILD +++ b/pkg/shim/v1/shim/BUILD @@ -5,6 +5,7 @@ package(licenses = ["notice"]) go_library( name = "shim", srcs = [ + "api.go", "platform.go", "service.go", ], @@ -24,11 +25,12 @@ go_library( "@com_github_containerd_containerd//log:go_default_library", "@com_github_containerd_containerd//mount:go_default_library", "@com_github_containerd_containerd//namespaces:go_default_library", + "@com_github_containerd_containerd//pkg/process:go_default_library", + "@com_github_containerd_containerd//pkg/stdio:go_default_library", "@com_github_containerd_containerd//runtime:go_default_library", "@com_github_containerd_containerd//runtime/linux/runctypes:go_default_library", - "@com_github_containerd_containerd//runtime/proc:go_default_library", - "@com_github_containerd_containerd//runtime/v1/shim:go_default_library", "@com_github_containerd_containerd//runtime/v1/shim/v1:go_default_library", + "@com_github_containerd_containerd//sys/reaper:go_default_library", "@com_github_containerd_fifo//:go_default_library", "@com_github_containerd_typeurl//:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", diff --git a/pkg/shim/v1/shim/api.go b/pkg/shim/v1/shim/api.go new file mode 100644 index 000000000..5dd8ff172 --- /dev/null +++ b/pkg/shim/v1/shim/api.go @@ -0,0 +1,28 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shim + +import ( + "github.com/containerd/containerd/api/events" +) + +type TaskCreate = events.TaskCreate +type TaskStart = events.TaskStart +type TaskOOM = events.TaskOOM +type TaskExit = events.TaskExit +type TaskDelete = events.TaskDelete +type TaskExecAdded = events.TaskExecAdded +type TaskExecStarted = events.TaskExecStarted diff --git a/pkg/shim/v1/shim/platform.go b/pkg/shim/v1/shim/platform.go index f6795563c..f590f80ef 100644 --- a/pkg/shim/v1/shim/platform.go +++ b/pkg/shim/v1/shim/platform.go @@ -30,7 +30,7 @@ type linuxPlatform struct { epoller *console.Epoller } -func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { if p.epoller == nil { return nil, fmt.Errorf("uninitialized epoller") } @@ -45,9 +45,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console if err != nil { return nil, err } - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(epollConsole, in, *p) @@ -63,9 +61,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console return nil, err } wg.Add(1) - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(outw, epollConsole, *p) diff --git a/pkg/shim/v1/shim/service.go b/pkg/shim/v1/shim/service.go index 7b0280ed2..84a810cb2 100644 --- a/pkg/shim/v1/shim/service.go +++ b/pkg/shim/v1/shim/service.go @@ -23,20 +23,20 @@ import ( "sync" "github.com/containerd/console" - eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime/linux/runctypes" - rproc "github.com/containerd/containerd/runtime/proc" - "github.com/containerd/containerd/runtime/v1/shim" - shimapi "github.com/containerd/containerd/runtime/v1/shim/v1" + shim "github.com/containerd/containerd/runtime/v1/shim/v1" + "github.com/containerd/containerd/sys/reaper" "github.com/containerd/typeurl" - ptypes "github.com/gogo/protobuf/types" + "github.com/gogo/protobuf/types" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -46,7 +46,7 @@ import ( ) var ( - empty = &ptypes.Empty{} + empty = &types.Empty{} bufPool = sync.Pool{ New: func() interface{} { buffer := make([]byte, 32<<10) @@ -73,7 +73,7 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) { s := &Service{ config: config, context: ctx, - processes: make(map[string]rproc.Process), + processes: make(map[string]process.Process), events: make(chan interface{}, 128), ec: proc.ExitCh, } @@ -91,9 +91,9 @@ type Service struct { config Config context context.Context - processes map[string]rproc.Process + processes map[string]process.Process events chan interface{} - platform rproc.Platform + platform stdio.Platform ec chan proc.Exit // Filled by Create() @@ -102,7 +102,7 @@ type Service struct { } // Create creates a new initial process and container with the underlying OCI runtime. -func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *shimapi.CreateTaskResponse, err error) { +func (s *Service) Create(ctx context.Context, r *shim.CreateTaskRequest) (_ *shim.CreateTaskResponse, err error) { s.mu.Lock() defer s.mu.Unlock() @@ -168,13 +168,13 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ * s.bundle = r.Bundle pid := process.Pid() s.processes[r.ID] = process - return &shimapi.CreateTaskResponse{ + return &shim.CreateTaskResponse{ Pid: uint32(pid), }, nil } // Start starts a process. -func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) { +func (s *Service) Start(ctx context.Context, r *shim.StartRequest) (*shim.StartResponse, error) { p, err := s.getExecProcess(r.ID) if err != nil { return nil, err @@ -182,14 +182,14 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi. if err := p.Start(ctx); err != nil { return nil, err } - return &shimapi.StartResponse{ + return &shim.StartResponse{ ID: p.ID(), Pid: uint32(p.Pid()), }, nil } // Delete deletes the initial process and container. -func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteResponse, error) { +func (s *Service) Delete(ctx context.Context, r *types.Empty) (*shim.DeleteResponse, error) { p, err := s.getInitProcess() if err != nil { return nil, err @@ -201,7 +201,7 @@ func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteR delete(s.processes, s.id) s.mu.Unlock() s.platform.Close() - return &shimapi.DeleteResponse{ + return &shim.DeleteResponse{ ExitStatus: uint32(p.ExitStatus()), ExitedAt: p.ExitedAt(), Pid: uint32(p.Pid()), @@ -209,7 +209,7 @@ func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteR } // DeleteProcess deletes an exec'd process. -func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) { +func (s *Service) DeleteProcess(ctx context.Context, r *shim.DeleteProcessRequest) (*shim.DeleteResponse, error) { if r.ID == s.id { return nil, status.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess") } @@ -223,7 +223,7 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq s.mu.Lock() delete(s.processes, r.ID) s.mu.Unlock() - return &shimapi.DeleteResponse{ + return &shim.DeleteResponse{ ExitStatus: uint32(p.ExitStatus()), ExitedAt: p.ExitedAt(), Pid: uint32(p.Pid()), @@ -231,7 +231,7 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq } // Exec spawns an additional process inside the container. -func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*ptypes.Empty, error) { +func (s *Service) Exec(ctx context.Context, r *shim.ExecProcessRequest) (*types.Empty, error) { s.mu.Lock() if p := s.processes[r.ID]; p != nil { @@ -263,7 +263,7 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*pty } // ResizePty resises the terminal of a process. -func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*ptypes.Empty, error) { +func (s *Service) ResizePty(ctx context.Context, r *shim.ResizePtyRequest) (*types.Empty, error) { if r.ID == "" { return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "id not provided") } @@ -282,7 +282,7 @@ func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (* } // State returns runtime state information for a process. -func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) { +func (s *Service) State(ctx context.Context, r *shim.StateRequest) (*shim.StateResponse, error) { p, err := s.getExecProcess(r.ID) if err != nil { return nil, err @@ -301,7 +301,7 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi. status = task.StatusStopped } sio := p.Stdio() - return &shimapi.StateResponse{ + return &shim.StateResponse{ ID: p.ID(), Bundle: s.bundle, Pid: uint32(p.Pid()), @@ -316,17 +316,17 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi. } // Pause pauses the container. -func (s *Service) Pause(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) { +func (s *Service) Pause(ctx context.Context, r *types.Empty) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Resume resumes the container. -func (s *Service) Resume(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) { +func (s *Service) Resume(ctx context.Context, r *types.Empty) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Kill kills a process with the provided signal. -func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Empty, error) { +func (s *Service) Kill(ctx context.Context, r *shim.KillRequest) (*types.Empty, error) { if r.ID == "" { p, err := s.getInitProcess() if err != nil { @@ -349,7 +349,7 @@ func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Emp } // ListPids returns all pids inside the container. -func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*shimapi.ListPidsResponse, error) { +func (s *Service) ListPids(ctx context.Context, r *shim.ListPidsRequest) (*shim.ListPidsResponse, error) { pids, err := s.getContainerPids(ctx, r.ID) if err != nil { return nil, errdefs.ToGRPC(err) @@ -374,13 +374,13 @@ func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*sh } processes = append(processes, &pInfo) } - return &shimapi.ListPidsResponse{ + return &shim.ListPidsResponse{ Processes: processes, }, nil } // CloseIO closes the I/O context of a process. -func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptypes.Empty, error) { +func (s *Service) CloseIO(ctx context.Context, r *shim.CloseIORequest) (*types.Empty, error) { p, err := s.getExecProcess(r.ID) if err != nil { return nil, err @@ -394,31 +394,31 @@ func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptyp } // Checkpoint checkpoints the container. -func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) (*ptypes.Empty, error) { +func (s *Service) Checkpoint(ctx context.Context, r *shim.CheckpointTaskRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // ShimInfo returns shim information such as the shim's pid. -func (s *Service) ShimInfo(ctx context.Context, r *ptypes.Empty) (*shimapi.ShimInfoResponse, error) { - return &shimapi.ShimInfoResponse{ +func (s *Service) ShimInfo(ctx context.Context, r *types.Empty) (*shim.ShimInfoResponse, error) { + return &shim.ShimInfoResponse{ ShimPid: uint32(os.Getpid()), }, nil } // Update updates a running container. -func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*ptypes.Empty, error) { +func (s *Service) Update(ctx context.Context, r *shim.UpdateTaskRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Wait waits for a process to exit. -func (s *Service) Wait(ctx context.Context, r *shimapi.WaitRequest) (*shimapi.WaitResponse, error) { +func (s *Service) Wait(ctx context.Context, r *shim.WaitRequest) (*shim.WaitResponse, error) { p, err := s.getExecProcess(r.ID) if err != nil { return nil, err } p.Wait() - return &shimapi.WaitResponse{ + return &shim.WaitResponse{ ExitStatus: uint32(p.ExitStatus()), ExitedAt: p.ExitedAt(), }, nil @@ -430,11 +430,11 @@ func (s *Service) processExits() { } } -func (s *Service) allProcesses() []rproc.Process { +func (s *Service) allProcesses() []process.Process { s.mu.Lock() defer s.mu.Unlock() - res := make([]rproc.Process, 0, len(s.processes)) + res := make([]process.Process, 0, len(s.processes)) for _, p := range s.processes { res = append(res, p) } @@ -452,7 +452,7 @@ func (s *Service) checkProcesses(e proc.Exit) { } } p.SetExited(e.Status) - s.events <- &eventstypes.TaskExit{ + s.events <- &TaskExit{ ContainerID: s.id, ID: p.ID(), Pid: uint32(p.Pid()), @@ -490,7 +490,7 @@ func (s *Service) forward(publisher events.Publisher) { } // getInitProcess returns the init process. -func (s *Service) getInitProcess() (rproc.Process, error) { +func (s *Service) getInitProcess() (process.Process, error) { s.mu.Lock() defer s.mu.Unlock() p := s.processes[s.id] @@ -501,7 +501,7 @@ func (s *Service) getInitProcess() (rproc.Process, error) { } // getExecProcess returns the given exec process. -func (s *Service) getExecProcess(id string) (rproc.Process, error) { +func (s *Service) getExecProcess(id string) (process.Process, error) { s.mu.Lock() defer s.mu.Unlock() p := s.processes[id] @@ -513,19 +513,19 @@ func (s *Service) getExecProcess(id string) (rproc.Process, error) { func getTopic(ctx context.Context, e interface{}) string { switch e.(type) { - case *eventstypes.TaskCreate: + case *TaskCreate: return runtime.TaskCreateEventTopic - case *eventstypes.TaskStart: + case *TaskStart: return runtime.TaskStartEventTopic - case *eventstypes.TaskOOM: + case *TaskOOM: return runtime.TaskOOMEventTopic - case *eventstypes.TaskExit: + case *TaskExit: return runtime.TaskExitEventTopic - case *eventstypes.TaskDelete: + case *TaskDelete: return runtime.TaskDeleteEventTopic - case *eventstypes.TaskExecAdded: + case *TaskExecAdded: return runtime.TaskExecAddedEventTopic - case *eventstypes.TaskExecStarted: + case *TaskExecStarted: return runtime.TaskExecStartedEventTopic default: log.L.Printf("no topic for type %#v", e) @@ -533,7 +533,7 @@ func getTopic(ctx context.Context, e interface{}) string { return runtime.TaskUnknownTopic } -func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, config map[string]string, platform rproc.Platform, r *proc.CreateConfig) (*proc.Init, error) { +func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, config map[string]string, platform stdio.Platform, r *proc.CreateConfig) (*proc.Init, error) { var options runctypes.CreateOptions if r.Options != nil { v, err := typeurl.UnmarshalAny(r.Options) @@ -554,7 +554,7 @@ func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, runsc.FormatLogPath(r.ID, config) rootfs := filepath.Join(path, "rootfs") runtime := proc.NewRunsc(runtimeRoot, path, namespace, r.Runtime, config) - p := proc.New(r.ID, runtime, rproc.Stdio{ + p := proc.New(r.ID, runtime, stdio.Stdio{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, @@ -568,6 +568,6 @@ func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, p.IoGID = int(options.IoGid) p.Sandbox = utils.IsSandbox(spec) p.UserLog = utils.UserLogPath(spec) - p.Monitor = shim.Default + p.Monitor = reaper.Default return p, nil } diff --git a/pkg/shim/v1/utils/BUILD b/pkg/shim/v1/utils/BUILD index 9045781e1..54a0aabb7 100644 --- a/pkg/shim/v1/utils/BUILD +++ b/pkg/shim/v1/utils/BUILD @@ -5,6 +5,7 @@ package(licenses = ["notice"]) go_library( name = "utils", srcs = [ + "annotations.go", "utils.go", "volumes.go", ], @@ -13,8 +14,7 @@ go_library( "//shim:__subpackages__", ], deps = [ - "@com_github_containerd_cri//pkg/annotations:go_default_library", - "@com_github_opencontainers_runtime-spec//specs-go:go_default_library", + "@com_github_opencontainers_runtime_spec//specs-go:go_default_library", ], ) @@ -23,4 +23,5 @@ go_test( size = "small", srcs = ["volumes_test.go"], library = ":utils", + deps = ["@com_github_opencontainers_runtime_spec//specs-go:go_default_library"], ) diff --git a/pkg/shim/v1/utils/annotations.go b/pkg/shim/v1/utils/annotations.go new file mode 100644 index 000000000..1e9d3f365 --- /dev/null +++ b/pkg/shim/v1/utils/annotations.go @@ -0,0 +1,25 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +// Annotations from the CRI annotations package. +// +// These are vendor due to import conflicts. +const ( + sandboxLogDirAnnotation = "io.kubernetes.cri.sandbox-log-directory" + containerTypeAnnotation = "io.kubernetes.cri.container-type" + containerTypeSandbox = "sandbox" + containerTypeContainer = "container" +) diff --git a/pkg/shim/v1/utils/utils.go b/pkg/shim/v1/utils/utils.go index 7a400af1c..07e346654 100644 --- a/pkg/shim/v1/utils/utils.go +++ b/pkg/shim/v1/utils/utils.go @@ -20,7 +20,6 @@ import ( "os" "path/filepath" - "github.com/containerd/cri/pkg/annotations" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -43,13 +42,13 @@ func ReadSpec(bundle string) (*specs.Spec, error) { // IsSandbox checks whether a container is a sandbox container. func IsSandbox(spec *specs.Spec) bool { - t, ok := spec.Annotations[annotations.ContainerType] - return !ok || t == annotations.ContainerTypeSandbox + t, ok := spec.Annotations[containerTypeAnnotation] + return !ok || t == containerTypeSandbox } // UserLogPath gets user log path from OCI annotation. func UserLogPath(spec *specs.Spec) string { - sandboxLogDir := spec.Annotations[annotations.SandboxLogDir] + sandboxLogDir := spec.Annotations[sandboxLogDirAnnotation] if sandboxLogDir == "" { return "" } diff --git a/pkg/shim/v1/utils/volumes.go b/pkg/shim/v1/utils/volumes.go index e4e9bf9b1..52a428179 100644 --- a/pkg/shim/v1/utils/volumes.go +++ b/pkg/shim/v1/utils/volumes.go @@ -21,7 +21,6 @@ import ( "path/filepath" "strings" - "github.com/containerd/cri/pkg/annotations" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -44,7 +43,7 @@ func volumeFieldName(k string) string { // podUID gets pod UID from the pod log path. func podUID(s *specs.Spec) (string, error) { - sandboxLogDir := s.Annotations[annotations.SandboxLogDir] + sandboxLogDir := s.Annotations[sandboxLogDirAnnotation] if sandboxLogDir == "" { return "", fmt.Errorf("no sandbox log path annotation") } @@ -101,7 +100,6 @@ func UpdateVolumeAnnotations(bundle string, s *specs.Spec) error { if err != nil { // Skip if we can't get pod UID, because this doesn't work // for containerd 1.1. - fmt.Errorf("Can't get pod uid: %w", err) return nil } } diff --git a/pkg/shim/v1/utils/volumes_test.go b/pkg/shim/v1/utils/volumes_test.go index 4b2639545..3e02c6151 100644 --- a/pkg/shim/v1/utils/volumes_test.go +++ b/pkg/shim/v1/utils/volumes_test.go @@ -23,7 +23,6 @@ import ( "reflect" "testing" - "github.com/containerd/cri/pkg/annotations" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -58,8 +57,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) { desc: "volume annotations for sandbox", spec: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -67,8 +66,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, expected: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -81,8 +80,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) { desc: "volume annotations for sandbox with legacy log path", spec: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLegacyLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLegacyLogDirPath, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -90,8 +89,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, expected: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLegacyLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLegacyLogDirPath, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -118,7 +117,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, }, Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeContainer, + containerTypeAnnotation: containerTypeContainer, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -140,7 +139,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, }, Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeContainer, + containerTypeAnnotation: containerTypeContainer, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -160,7 +159,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, }, Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeContainer, + containerTypeAnnotation: containerTypeContainer, "dev.gvisor.spec.mount." + testVolumeName + ".share": "container", "dev.gvisor.spec.mount." + testVolumeName + ".type": "bind", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -176,7 +175,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, }, Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeContainer, + containerTypeAnnotation: containerTypeContainer, "dev.gvisor.spec.mount." + testVolumeName + ".share": "container", "dev.gvisor.spec.mount." + testVolumeName + ".type": "bind", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -188,7 +187,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { desc: "should not return error without pod log directory", spec: &specs.Spec{ Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeSandbox, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -196,7 +195,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, expected: &specs.Spec{ Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeSandbox, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", @@ -207,8 +206,8 @@ func TestUpdateVolumeAnnotations(t *testing.T) { desc: "should return error if volume path does not exist", spec: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, "dev.gvisor.spec.mount.notexist.share": "pod", "dev.gvisor.spec.mount.notexist.type": "tmpfs", "dev.gvisor.spec.mount.notexist.options": "ro", @@ -220,14 +219,14 @@ func TestUpdateVolumeAnnotations(t *testing.T) { desc: "no volume annotations for sandbox", spec: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, }, }, expected: &specs.Spec{ Annotations: map[string]string{ - annotations.SandboxLogDir: testLogDirPath, - annotations.ContainerType: annotations.ContainerTypeSandbox, + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, }, }, }, @@ -249,7 +248,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, }, Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeContainer, + containerTypeAnnotation: containerTypeContainer, }, }, expected: &specs.Spec{ @@ -268,7 +267,7 @@ func TestUpdateVolumeAnnotations(t *testing.T) { }, }, Annotations: map[string]string{ - annotations.ContainerType: annotations.ContainerTypeContainer, + containerTypeAnnotation: containerTypeContainer, }, }, }, diff --git a/pkg/shim/v2/BUILD b/pkg/shim/v2/BUILD index 450f62979..7e0a114a0 100644 --- a/pkg/shim/v2/BUILD +++ b/pkg/shim/v2/BUILD @@ -5,6 +5,7 @@ package(licenses = ["notice"]) go_library( name = "v2", srcs = [ + "api.go", "epoll.go", "service.go", "service_linux.go", @@ -15,10 +16,10 @@ go_library( "//pkg/shim/v1/proc", "//pkg/shim/v1/utils", "//pkg/shim/v2/options", + "//pkg/shim/v2/runtimeoptions", "//runsc/specutils", "@com_github_burntsushi_toml//:go_default_library", "@com_github_containerd_cgroups//:go_default_library", - "@com_github_containerd_cgroups//stats/v1:go_default_library", "@com_github_containerd_console//:go_default_library", "@com_github_containerd_containerd//api/events:go_default_library", "@com_github_containerd_containerd//api/types/task:go_default_library", @@ -27,12 +28,13 @@ go_library( "@com_github_containerd_containerd//log:go_default_library", "@com_github_containerd_containerd//mount:go_default_library", "@com_github_containerd_containerd//namespaces:go_default_library", + "@com_github_containerd_containerd//pkg/process:go_default_library", + "@com_github_containerd_containerd//pkg/stdio:go_default_library", "@com_github_containerd_containerd//runtime:go_default_library", "@com_github_containerd_containerd//runtime/linux/runctypes:go_default_library", - "@com_github_containerd_containerd//runtime/proc:go_default_library", "@com_github_containerd_containerd//runtime/v2/shim:go_default_library", "@com_github_containerd_containerd//runtime/v2/task:go_default_library", - "@com_github_containerd_cri//pkg/api/runtimeoptions/v1:go_default_library", + "@com_github_containerd_containerd//sys/reaper:go_default_library", "@com_github_containerd_fifo//:go_default_library", "@com_github_containerd_typeurl//:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", diff --git a/pkg/shim/v2/api.go b/pkg/shim/v2/api.go new file mode 100644 index 000000000..dbe5c59f6 --- /dev/null +++ b/pkg/shim/v2/api.go @@ -0,0 +1,22 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "github.com/containerd/containerd/api/events" +) + +type TaskOOM = events.TaskOOM diff --git a/pkg/shim/v2/epoll.go b/pkg/shim/v2/epoll.go index 45cc38c2a..41232cca8 100644 --- a/pkg/shim/v2/epoll.go +++ b/pkg/shim/v2/epoll.go @@ -23,7 +23,6 @@ import ( "sync" "github.com/containerd/cgroups" - eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/events" "github.com/containerd/containerd/runtime" "golang.org/x/sys/unix" @@ -68,10 +67,11 @@ func (e *epoller) run(ctx context.Context) { default: n, err := unix.EpollWait(e.fd, events[:], -1) if err != nil { - if err == unix.EINTR { + if err == unix.EINTR || err == unix.EAGAIN { continue } - fmt.Errorf("cgroups: epoll wait: %w", err) + // Should not happen. + panic(fmt.Errorf("cgroups: epoll wait: %w", err)) } for i := 0; i < n; i++ { e.process(ctx, uintptr(events[i].Fd)) @@ -114,10 +114,11 @@ func (e *epoller) process(ctx context.Context, fd uintptr) { unix.Close(int(fd)) return } - if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventstypes.TaskOOM{ + if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &TaskOOM{ ContainerID: i.id, }); err != nil { - fmt.Errorf("publish OOM event: %w", err) + // Should not happen. + panic(fmt.Errorf("publish OOM event: %w", err)) } } diff --git a/pkg/shim/v2/runtimeoptions/BUILD b/pkg/shim/v2/runtimeoptions/BUILD new file mode 100644 index 000000000..01716034c --- /dev/null +++ b/pkg/shim/v2/runtimeoptions/BUILD @@ -0,0 +1,20 @@ +load("//tools:defs.bzl", "go_library", "proto_library") + +package(licenses = ["notice"]) + +proto_library( + name = "api", + srcs = [ + "runtimeoptions.proto", + ], +) + +go_library( + name = "runtimeoptions", + srcs = ["runtimeoptions.go"], + visibility = ["//pkg/shim/v2:__pkg__"], + deps = [ + "//pkg/shim/v2/runtimeoptions:api_go_proto", + "@com_github_gogo_protobuf//proto:go_default_library", + ], +) diff --git a/pkg/shim/v2/runtimeoptions/runtimeoptions.go b/pkg/shim/v2/runtimeoptions/runtimeoptions.go new file mode 100644 index 000000000..1c1a0c5d1 --- /dev/null +++ b/pkg/shim/v2/runtimeoptions/runtimeoptions.go @@ -0,0 +1,27 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runtimeoptions + +import ( + proto "github.com/gogo/protobuf/proto" + pb "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions/api_go_proto" +) + +type Options = pb.Options + +func init() { + proto.RegisterType((*Options)(nil), "cri.runtimeoptions.v1.Options") +} diff --git a/pkg/shim/v2/runtimeoptions/runtimeoptions.proto b/pkg/shim/v2/runtimeoptions/runtimeoptions.proto new file mode 100644 index 000000000..edb19020a --- /dev/null +++ b/pkg/shim/v2/runtimeoptions/runtimeoptions.proto @@ -0,0 +1,25 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package runtimeoptions; + +// This is a version of the runtimeoptions CRI API that is vendored. +// +// Imported the full CRI package is a nightmare. +message Options { + string type_url = 1; + string config_path = 2; +} diff --git a/pkg/shim/v2/service.go b/pkg/shim/v2/service.go index c67b1beba..1534152fc 100644 --- a/pkg/shim/v2/service.go +++ b/pkg/shim/v2/service.go @@ -27,34 +27,34 @@ import ( "github.com/BurntSushi/toml" "github.com/containerd/cgroups" - metrics "github.com/containerd/cgroups/stats/v1" "github.com/containerd/console" - eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime/linux/runctypes" - rproc "github.com/containerd/containerd/runtime/proc" "github.com/containerd/containerd/runtime/v2/shim" taskAPI "github.com/containerd/containerd/runtime/v2/task" - runtimeoptions "github.com/containerd/cri/pkg/api/runtimeoptions/v1" + "github.com/containerd/containerd/sys/reaper" "github.com/containerd/typeurl" - ptypes "github.com/gogo/protobuf/types" + "github.com/gogo/protobuf/types" "golang.org/x/sys/unix" "gvisor.dev/gvisor/pkg/shim/runsc" "gvisor.dev/gvisor/pkg/shim/v1/proc" "gvisor.dev/gvisor/pkg/shim/v1/utils" "gvisor.dev/gvisor/pkg/shim/v2/options" + "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions" "gvisor.dev/gvisor/runsc/specutils" ) var ( - empty = &ptypes.Empty{} + empty = &types.Empty{} bufPool = sync.Pool{ New: func() interface{} { buffer := make([]byte, 32<<10) @@ -70,24 +70,23 @@ var _ = (taskAPI.TaskService)(&service{}) const configFile = "config.toml" // New returns a new shim service that can be used via GRPC. -func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { +func New(ctx context.Context, id string, publisher shim.Publisher, cancel func()) (shim.Shim, error) { ep, err := newOOMEpoller(publisher) if err != nil { return nil, err } - ctx, cancel := context.WithCancel(ctx) go ep.run(ctx) s := &service{ id: id, context: ctx, - processes: make(map[string]rproc.Process), + processes: make(map[string]process.Process), events: make(chan interface{}, 128), ec: proc.ExitCh, oomPoller: ep, cancel: cancel, } go s.processExits() - runsc.Monitor = shim.Default + runsc.Monitor = reaper.Default if err := s.initPlatform(); err != nil { cancel() return nil, fmt.Errorf("failed to initialized platform behavior: %w", err) @@ -101,10 +100,10 @@ type service struct { mu sync.Mutex context context.Context - task rproc.Process - processes map[string]rproc.Process + task process.Process + processes map[string]process.Process events chan interface{} - platform rproc.Platform + platform stdio.Platform opts options.Options ec chan proc.Exit oomPoller *epoller @@ -141,7 +140,7 @@ func newCommand(ctx context.Context, containerdBinary, containerdAddress string) return cmd, nil } -func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) { +func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) { cmd, err := newCommand(ctx, containerdBinary, containerdAddress) if err != nil { return "", err @@ -270,7 +269,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * break } if o.TypeUrl != options.OptionType { - return nil, fmt.Errorf("unsupported runtimeoptions %q", o.TypeUrl) + return nil, fmt.Errorf("unsupported option type %q", o.TypeUrl) } path = o.ConfigPath default: @@ -415,7 +414,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP } // Exec spawns an additional process inside the container. -func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { +func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*types.Empty, error) { s.mu.Lock() p := s.processes[r.ExecID] s.mu.Unlock() @@ -444,7 +443,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty } // ResizePty resizes the terminal of a process. -func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { +func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*types.Empty, error) { p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -494,17 +493,17 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI. } // Pause the container. -func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { +func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Resume the container. -func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { +func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } // Kill a process with the provided signal. -func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { +func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*types.Empty, error) { p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -550,7 +549,7 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi } // CloseIO closes the I/O context of a process. -func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { +func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*types.Empty, error) { p, err := s.getProcess(r.ExecID) if err != nil { return nil, err @@ -564,7 +563,7 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp } // Checkpoint checkpoints the container. -func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { +func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } @@ -580,7 +579,7 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task }, nil } -func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { +func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*types.Empty, error) { s.cancel() os.Exit(0) return empty, nil @@ -608,52 +607,52 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI. // gvisor currently (as of 2020-03-03) only returns the total memory // usage and current PID value[0]. However, we copy the common fields here // so that future updates will propagate correct information. We're - // using the metrics.Metrics structure so we're returning the same type + // using the cgroups.Metrics structure so we're returning the same type // as runc. // // [0]: https://github.com/google/gvisor/blob/277a0d5a1fbe8272d4729c01ee4c6e374d047ebc/runsc/boot/events.go#L61-L81 - data, err := typeurl.MarshalAny(&metrics.Metrics{ - CPU: &metrics.CPUStat{ - Usage: &metrics.CPUUsage{ + data, err := typeurl.MarshalAny(&cgroups.Metrics{ + CPU: &cgroups.CPUStat{ + Usage: &cgroups.CPUUsage{ Total: stats.Cpu.Usage.Total, Kernel: stats.Cpu.Usage.Kernel, User: stats.Cpu.Usage.User, PerCPU: stats.Cpu.Usage.Percpu, }, - Throttling: &metrics.Throttle{ + Throttling: &cgroups.Throttle{ Periods: stats.Cpu.Throttling.Periods, ThrottledPeriods: stats.Cpu.Throttling.ThrottledPeriods, ThrottledTime: stats.Cpu.Throttling.ThrottledTime, }, }, - Memory: &metrics.MemoryStat{ + Memory: &cgroups.MemoryStat{ Cache: stats.Memory.Cache, - Usage: &metrics.MemoryEntry{ + Usage: &cgroups.MemoryEntry{ Limit: stats.Memory.Usage.Limit, Usage: stats.Memory.Usage.Usage, Max: stats.Memory.Usage.Max, Failcnt: stats.Memory.Usage.Failcnt, }, - Swap: &metrics.MemoryEntry{ + Swap: &cgroups.MemoryEntry{ Limit: stats.Memory.Swap.Limit, Usage: stats.Memory.Swap.Usage, Max: stats.Memory.Swap.Max, Failcnt: stats.Memory.Swap.Failcnt, }, - Kernel: &metrics.MemoryEntry{ + Kernel: &cgroups.MemoryEntry{ Limit: stats.Memory.Kernel.Limit, Usage: stats.Memory.Kernel.Usage, Max: stats.Memory.Kernel.Max, Failcnt: stats.Memory.Kernel.Failcnt, }, - KernelTCP: &metrics.MemoryEntry{ + KernelTCP: &cgroups.MemoryEntry{ Limit: stats.Memory.KernelTCP.Limit, Usage: stats.Memory.KernelTCP.Usage, Max: stats.Memory.KernelTCP.Max, Failcnt: stats.Memory.KernelTCP.Failcnt, }, }, - Pids: &metrics.PidsStat{ + Pids: &cgroups.PidsStat{ Current: stats.Pids.Current, Limit: stats.Pids.Limit, }, @@ -667,7 +666,7 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI. } // Update updates a running container. -func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { +func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*types.Empty, error) { return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) } @@ -707,7 +706,7 @@ func (s *service) checkProcesses(e proc.Exit) { } } p.SetExited(e.Status) - s.events <- &eventstypes.TaskExit{ + s.events <- &events.TaskExit{ ContainerID: s.id, ID: p.ID(), Pid: uint32(p.Pid()), @@ -719,7 +718,7 @@ func (s *service) checkProcesses(e proc.Exit) { } } -func (s *service) allProcesses() (o []rproc.Process) { +func (s *service) allProcesses() (o []process.Process) { s.mu.Lock() defer s.mu.Unlock() for _, p := range s.processes { @@ -749,18 +748,19 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er return pids, nil } -func (s *service) forward(publisher events.Publisher) { +func (s *service) forward(publisher shim.Publisher) { for e := range s.events { ctx, cancel := context.WithTimeout(s.context, 5*time.Second) err := publisher.Publish(ctx, getTopic(e), e) cancel() if err != nil { - fmt.Errorf("post event: %w", err) + // Should not happen. + panic(fmt.Errorf("post event: %w", err)) } } } -func (s *service) getProcess(execID string) (rproc.Process, error) { +func (s *service) getProcess(execID string) (process.Process, error) { s.mu.Lock() defer s.mu.Unlock() if execID == "" { @@ -775,19 +775,19 @@ func (s *service) getProcess(execID string) (rproc.Process, error) { func getTopic(e interface{}) string { switch e.(type) { - case *eventstypes.TaskCreate: + case *events.TaskCreate: return runtime.TaskCreateEventTopic - case *eventstypes.TaskStart: + case *events.TaskStart: return runtime.TaskStartEventTopic - case *eventstypes.TaskOOM: + case *events.TaskOOM: return runtime.TaskOOMEventTopic - case *eventstypes.TaskExit: + case *events.TaskExit: return runtime.TaskExitEventTopic - case *eventstypes.TaskDelete: + case *events.TaskDelete: return runtime.TaskDeleteEventTopic - case *eventstypes.TaskExecAdded: + case *events.TaskExecAdded: return runtime.TaskExecAddedEventTopic - case *eventstypes.TaskExecStarted: + case *events.TaskExecStarted: return runtime.TaskExecStartedEventTopic default: log.L.Printf("no topic for type %#v", e) @@ -795,7 +795,7 @@ func getTopic(e interface{}) string { return runtime.TaskUnknownTopic } -func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) { +func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) { spec, err := utils.ReadSpec(r.Bundle) if err != nil { return nil, fmt.Errorf("read oci spec: %w", err) @@ -805,7 +805,7 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro } runsc.FormatLogPath(r.ID, options.RunscConfig) runtime := proc.NewRunsc(options.Root, path, namespace, options.BinaryName, options.RunscConfig) - p := proc.New(r.ID, runtime, rproc.Stdio{ + p := proc.New(r.ID, runtime, stdio.Stdio{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, @@ -819,6 +819,6 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro p.IoGID = int(options.IoGid) p.Sandbox = specutils.SpecContainerType(spec) == specutils.ContainerTypeSandbox p.UserLog = utils.UserLogPath(spec) - p.Monitor = shim.Default + p.Monitor = reaper.Default return p, nil } diff --git a/pkg/shim/v2/service_linux.go b/pkg/shim/v2/service_linux.go index 257c58812..1800ab90b 100644 --- a/pkg/shim/v2/service_linux.go +++ b/pkg/shim/v2/service_linux.go @@ -32,7 +32,7 @@ type linuxPlatform struct { epoller *console.Epoller } -func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { if p.epoller == nil { return nil, fmt.Errorf("uninitialized epoller") } @@ -47,9 +47,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console if err != nil { return nil, err } - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(epollConsole, in, *p) @@ -65,9 +63,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console return nil, err } wg.Add(1) - cwg.Add(1) go func() { - cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(outw, epollConsole, *p) diff --git a/pkg/sleep/BUILD b/pkg/sleep/BUILD index e131455f7..ae0fe1522 100644 --- a/pkg/sleep/BUILD +++ b/pkg/sleep/BUILD @@ -12,6 +12,7 @@ go_library( "sleep_unsafe.go", ], visibility = ["//:sandbox"], + deps = ["//pkg/sync"], ) go_test( diff --git a/pkg/sleep/sleep_test.go b/pkg/sleep/sleep_test.go index af47e2ba1..1dd11707d 100644 --- a/pkg/sleep/sleep_test.go +++ b/pkg/sleep/sleep_test.go @@ -379,10 +379,7 @@ func TestRace(t *testing.T) { // TestRaceInOrder tests that multiple wakers can continuously send wake requests to // the sleeper and that the wakers are retrieved in the order asserted. func TestRaceInOrder(t *testing.T) { - const wakers = 100 - const wakeRequests = 10000 - - w := make([]Waker, wakers) + w := make([]Waker, 10000) s := Sleeper{} // Associate each waker and start goroutines that will assert them. @@ -390,19 +387,16 @@ func TestRaceInOrder(t *testing.T) { s.AddWaker(&w[i], i) } go func() { - n := 0 - for n < wakeRequests { - wk := w[n%len(w)] - wk.Assert() - n++ + for i := range w { + w[i].Assert() } }() // Wait for all wake up notifications from all wakers. - for i := 0; i < wakeRequests; i++ { - v, _ := s.Fetch(true) - if got, want := v, i%wakers; got != want { - t.Fatalf("got %d want %d", got, want) + for want := range w { + got, _ := s.Fetch(true) + if got != want { + t.Fatalf("got %d want %d", got, want) } } } diff --git a/pkg/sleep/sleep_unsafe.go b/pkg/sleep/sleep_unsafe.go index f68c12620..118805492 100644 --- a/pkg/sleep/sleep_unsafe.go +++ b/pkg/sleep/sleep_unsafe.go @@ -75,6 +75,8 @@ package sleep import ( "sync/atomic" "unsafe" + + "gvisor.dev/gvisor/pkg/sync" ) const ( @@ -323,7 +325,12 @@ func (s *Sleeper) enqueueAssertedWaker(w *Waker) { // // This struct is thread-safe, that is, its methods can be called concurrently // by multiple goroutines. +// +// Note, it is not safe to copy a Waker as its fields are modified by value +// (the pointer fields are individually modified with atomic operations). type Waker struct { + _ sync.NoCopy + // s is the sleeper that this waker can wake up. Only one sleeper at a // time is allowed. This field can have three classes of values: // nil -- the waker is not asserted: it either is not associated with diff --git a/pkg/sync/BUILD b/pkg/sync/BUILD index d0d77e19c..4d47207f7 100644 --- a/pkg/sync/BUILD +++ b/pkg/sync/BUILD @@ -33,6 +33,7 @@ go_library( "aliases.go", "memmove_unsafe.go", "mutex_unsafe.go", + "nocopy.go", "norace_unsafe.go", "race_unsafe.go", "rwmutex_unsafe.go", diff --git a/pkg/sync/nocopy.go b/pkg/sync/nocopy.go new file mode 100644 index 000000000..722b29501 --- /dev/null +++ b/pkg/sync/nocopy.go @@ -0,0 +1,28 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +// NoCopy may be embedded into structs which must not be copied +// after the first use. +// +// See https://golang.org/issues/8005#issuecomment-190753527 +// for details. +type NoCopy struct{} + +// Lock is a no-op used by -copylocks checker from `go vet`. +func (*NoCopy) Lock() {} + +// Unlock is a no-op used by -copylocks checker from `go vet`. +func (*NoCopy) Unlock() {} diff --git a/pkg/tcpip/header/BUILD b/pkg/tcpip/header/BUILD index 0cde694dc..d87797617 100644 --- a/pkg/tcpip/header/BUILD +++ b/pkg/tcpip/header/BUILD @@ -48,7 +48,7 @@ go_test( "//pkg/rand", "//pkg/tcpip", "//pkg/tcpip/buffer", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) @@ -64,6 +64,6 @@ go_test( deps = [ "//pkg/tcpip", "//pkg/tcpip/buffer", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/tcpip/link/fdbased/endpoint_test.go b/pkg/tcpip/link/fdbased/endpoint_test.go index eaee7e5d7..4bad930c7 100644 --- a/pkg/tcpip/link/fdbased/endpoint_test.go +++ b/pkg/tcpip/link/fdbased/endpoint_test.go @@ -500,3 +500,76 @@ func TestRecvMMsgDispatcherCapLength(t *testing.T) { } } + +// fakeNetworkDispatcher delivers packets to pkts. +type fakeNetworkDispatcher struct { + pkts []*stack.PacketBuffer +} + +func (d *fakeNetworkDispatcher) DeliverNetworkPacket(remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) { + d.pkts = append(d.pkts, pkt) +} + +func TestDispatchPacketFormat(t *testing.T) { + for _, test := range []struct { + name string + newDispatcher func(fd int, e *endpoint) (linkDispatcher, error) + }{ + { + name: "readVDispatcher", + newDispatcher: newReadVDispatcher, + }, + { + name: "recvMMsgDispatcher", + newDispatcher: newRecvMMsgDispatcher, + }, + } { + t.Run(test.name, func(t *testing.T) { + // Create a socket pair to send/recv. + fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_DGRAM, 0) + if err != nil { + t.Fatal(err) + } + defer syscall.Close(fds[0]) + defer syscall.Close(fds[1]) + + data := []byte{ + // Ethernet header. + 1, 2, 3, 4, 5, 60, + 1, 2, 3, 4, 5, 61, + 8, 0, + // Mock network header. + 40, 41, 42, 43, + } + err = syscall.Sendmsg(fds[1], data, nil, nil, 0) + if err != nil { + t.Fatal(err) + } + + // Create and run dispatcher once. + sink := &fakeNetworkDispatcher{} + d, err := test.newDispatcher(fds[0], &endpoint{ + hdrSize: header.EthernetMinimumSize, + dispatcher: sink, + }) + if err != nil { + t.Fatal(err) + } + if ok, err := d.dispatch(); !ok || err != nil { + t.Fatalf("d.dispatch() = %v, %v", ok, err) + } + + // Verify packet. + if got, want := len(sink.pkts), 1; got != want { + t.Fatalf("len(sink.pkts) = %d, want %d", got, want) + } + pkt := sink.pkts[0] + if got, want := len(pkt.LinkHeader), header.EthernetMinimumSize; got != want { + t.Errorf("len(pkt.LinkHeader) = %d, want %d", got, want) + } + if got, want := pkt.Data.Size(), 4; got != want { + t.Errorf("pkt.Data.Size() = %d, want %d", got, want) + } + }) + } +} diff --git a/pkg/tcpip/link/fdbased/packet_dispatchers.go b/pkg/tcpip/link/fdbased/packet_dispatchers.go index f04738cfb..d8f2504b3 100644 --- a/pkg/tcpip/link/fdbased/packet_dispatchers.go +++ b/pkg/tcpip/link/fdbased/packet_dispatchers.go @@ -278,7 +278,7 @@ func (d *recvMMsgDispatcher) dispatch() (bool, *tcpip.Error) { eth header.Ethernet ) if d.e.hdrSize > 0 { - eth = header.Ethernet(d.views[k][0]) + eth = header.Ethernet(d.views[k][0][:header.EthernetMinimumSize]) p = eth.Type() remote = eth.SourceAddress() local = eth.DestinationAddress() diff --git a/pkg/tcpip/network/ipv4/BUILD b/pkg/tcpip/network/ipv4/BUILD index 78420d6e6..d142b4ffa 100644 --- a/pkg/tcpip/network/ipv4/BUILD +++ b/pkg/tcpip/network/ipv4/BUILD @@ -34,6 +34,6 @@ go_test( "//pkg/tcpip/transport/tcp", "//pkg/tcpip/transport/udp", "//pkg/waiter", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/tcpip/network/ipv6/BUILD b/pkg/tcpip/network/ipv6/BUILD index 3f71fc520..feada63dc 100644 --- a/pkg/tcpip/network/ipv6/BUILD +++ b/pkg/tcpip/network/ipv6/BUILD @@ -39,6 +39,6 @@ go_test( "//pkg/tcpip/transport/icmp", "//pkg/tcpip/transport/udp", "//pkg/waiter", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/tcpip/stack/BUILD b/pkg/tcpip/stack/BUILD index e65c731c2..6b9a6b316 100644 --- a/pkg/tcpip/stack/BUILD +++ b/pkg/tcpip/stack/BUILD @@ -27,6 +27,18 @@ go_template_instance( }, ) +go_template_instance( + name = "tuple_list", + out = "tuple_list.go", + package = "stack", + prefix = "tuple", + template = "//pkg/ilist:generic_list", + types = { + "Element": "*tuple", + "Linker": "*tuple", + }, +) + go_library( name = "stack", srcs = [ @@ -35,6 +47,7 @@ go_library( "forwarder.go", "icmp_rate_limit.go", "iptables.go", + "iptables_state.go", "iptables_targets.go", "iptables_types.go", "linkaddrcache.go", @@ -50,6 +63,7 @@ go_library( "stack_global_state.go", "stack_options.go", "transport_demuxer.go", + "tuple_list.go", ], visibility = ["//visibility:public"], deps = [ @@ -95,7 +109,7 @@ go_test( "//pkg/tcpip/transport/icmp", "//pkg/tcpip/transport/udp", "//pkg/waiter", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/tcpip/stack/conntrack.go b/pkg/tcpip/stack/conntrack.go index af9c325ca..d39baf620 100644 --- a/pkg/tcpip/stack/conntrack.go +++ b/pkg/tcpip/stack/conntrack.go @@ -15,9 +15,12 @@ package stack import ( + "encoding/binary" "sync" + "time" "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/hash/jenkins" "gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/transport/tcpconntrack" ) @@ -30,6 +33,10 @@ import ( // // Currently, only TCP tracking is supported. +// Our hash table has 16K buckets. +// TODO(gvisor.dev/issue/170): These should be tunable. +const numBuckets = 1 << 14 + // Direction of the tuple. type direction int @@ -48,7 +55,12 @@ const ( // tuple holds a connection's identifying and manipulating data in one // direction. It is immutable. +// +// +stateify savable type tuple struct { + // tupleEntry is used to build an intrusive list of tuples. + tupleEntry + tupleID // conn is the connection tracking entry this tuple belongs to. @@ -61,6 +73,8 @@ type tuple struct { // tupleID uniquely identifies a connection in one direction. It currently // contains enough information to distinguish between any TCP or UDP // connection, and will need to be extended to support other protocols. +// +// +stateify savable type tupleID struct { srcAddr tcpip.Address srcPort uint16 @@ -83,6 +97,8 @@ func (ti tupleID) reply() tupleID { } // conn is a tracked connection. +// +// +stateify savable type conn struct { // original is the tuple in original direction. It is immutable. original tuple @@ -98,22 +114,67 @@ type conn struct { tcbHook Hook // mu protects tcb. - mu sync.Mutex + mu sync.Mutex `state:"nosave"` // tcb is TCB control block. It is used to keep track of states // of tcp connection and is protected by mu. tcb tcpconntrack.TCB + + // lastUsed is the last time the connection saw a relevant packet, and + // is updated by each packet on the connection. It is protected by mu. + lastUsed time.Time `state:".(unixTime)"` +} + +// timedOut returns whether the connection timed out based on its state. +func (cn *conn) timedOut(now time.Time) bool { + const establishedTimeout = 5 * 24 * time.Hour + const defaultTimeout = 120 * time.Second + cn.mu.Lock() + defer cn.mu.Unlock() + if cn.tcb.State() == tcpconntrack.ResultAlive { + // Use the same default as Linux, which doesn't delete + // established connections for 5(!) days. + return now.Sub(cn.lastUsed) > establishedTimeout + } + // Use the same default as Linux, which lets connections in most states + // other than established remain for <= 120 seconds. + return now.Sub(cn.lastUsed) > defaultTimeout } // ConnTrack tracks all connections created for NAT rules. Most users are // expected to only call handlePacket and createConnFor. +// +// ConnTrack keeps all connections in a slice of buckets, each of which holds a +// linked list of tuples. This gives us some desirable properties: +// - Each bucket has its own lock, lessening lock contention. +// - The slice is large enough that lists stay short (<10 elements on average). +// Thus traversal is fast. +// - During linked list traversal we reap expired connections. This amortizes +// the cost of reaping them and makes reapUnused faster. +// +// Locks are ordered by their location in the buckets slice. That is, a +// goroutine that locks buckets[i] can only lock buckets[j] s.t. i < j. +// +// +stateify savable type ConnTrack struct { - // mu protects conns. - mu sync.RWMutex + // seed is a one-time random value initialized at stack startup + // and is used in the calculation of hash keys for the list of buckets. + // It is immutable. + seed uint32 - // conns maintains a map of tuples needed for connection tracking for - // iptables NAT rules. It is protected by mu. - conns map[tupleID]tuple + // mu protects the buckets slice, but not buckets' contents. Only take + // the write lock if you are modifying the slice or saving for S/R. + mu sync.RWMutex `state:"nosave"` + + // buckets is protected by mu. + buckets []bucket +} + +// +stateify savable +type bucket struct { + // mu protects tuples. + mu sync.Mutex `state:"nosave"` + tuples tupleList } // packetToTupleID converts packet to a tuple ID. It fails when pkt lacks a valid @@ -143,8 +204,9 @@ func packetToTupleID(pkt *PacketBuffer) (tupleID, *tcpip.Error) { // newConn creates new connection. func newConn(orig, reply tupleID, manip manipType, hook Hook) *conn { conn := conn{ - manip: manip, - tcbHook: hook, + manip: manip, + tcbHook: hook, + lastUsed: time.Now(), } conn.original = tuple{conn: &conn, tupleID: orig} conn.reply = tuple{conn: &conn, tupleID: reply, direction: dirReply} @@ -162,14 +224,28 @@ func (ct *ConnTrack) connFor(pkt *PacketBuffer) (*conn, direction) { return nil, dirOriginal } - ct.mu.Lock() - defer ct.mu.Unlock() - - tuple, ok := ct.conns[tid] - if !ok { - return nil, dirOriginal + bucket := ct.bucket(tid) + now := time.Now() + + ct.mu.RLock() + defer ct.mu.RUnlock() + ct.buckets[bucket].mu.Lock() + defer ct.buckets[bucket].mu.Unlock() + + // Iterate over the tuples in a bucket, cleaning up any unused + // connections we find. + for other := ct.buckets[bucket].tuples.Front(); other != nil; other = other.Next() { + // Clean up any timed-out connections we happen to find. + if ct.reapTupleLocked(other, bucket, now) { + // The tuple expired. + continue + } + if tid == other.tupleID { + return other.conn, other.direction + } } - return tuple.conn, tuple.direction + + return nil, dirOriginal } // createConnFor creates a new conn for pkt. @@ -197,13 +273,31 @@ func (ct *ConnTrack) createConnFor(pkt *PacketBuffer, hook Hook, rt RedirectTarg } conn := newConn(tid, replyTID, manip, hook) - // Add the changed tuple to the map. - // TODO(gvisor.dev/issue/170): Need to support collisions using linked - // list. - ct.mu.Lock() - defer ct.mu.Unlock() - ct.conns[tid] = conn.original - ct.conns[replyTID] = conn.reply + // Lock the buckets in the correct order. + tupleBucket := ct.bucket(tid) + replyBucket := ct.bucket(replyTID) + ct.mu.RLock() + defer ct.mu.RUnlock() + if tupleBucket < replyBucket { + ct.buckets[tupleBucket].mu.Lock() + ct.buckets[replyBucket].mu.Lock() + } else if tupleBucket > replyBucket { + ct.buckets[replyBucket].mu.Lock() + ct.buckets[tupleBucket].mu.Lock() + } else { + // Both tuples are in the same bucket. + ct.buckets[tupleBucket].mu.Lock() + } + + // Add the tuple to the map. + ct.buckets[tupleBucket].tuples.PushFront(&conn.original) + ct.buckets[replyBucket].tuples.PushFront(&conn.reply) + + // Unlocking can happen in any order. + ct.buckets[tupleBucket].mu.Unlock() + if tupleBucket != replyBucket { + ct.buckets[replyBucket].mu.Unlock() + } return conn } @@ -297,35 +391,134 @@ func (ct *ConnTrack) handlePacket(pkt *PacketBuffer, hook Hook, gso *GSO, r *Rou // other tcp states. conn.mu.Lock() defer conn.mu.Unlock() - var st tcpconntrack.Result - tcpHeader := header.TCP(pkt.TransportHeader) - if conn.tcb.IsEmpty() { + + // Mark the connection as having been used recently so it isn't reaped. + conn.lastUsed = time.Now() + // Update connection state. + if tcpHeader := header.TCP(pkt.TransportHeader); conn.tcb.IsEmpty() { conn.tcb.Init(tcpHeader) conn.tcbHook = hook + } else if hook == conn.tcbHook { + conn.tcb.UpdateStateOutbound(tcpHeader) } else { - switch hook { - case conn.tcbHook: - st = conn.tcb.UpdateStateOutbound(tcpHeader) - default: - st = conn.tcb.UpdateStateInbound(tcpHeader) - } + conn.tcb.UpdateStateInbound(tcpHeader) } +} + +// bucket gets the conntrack bucket for a tupleID. +func (ct *ConnTrack) bucket(id tupleID) int { + h := jenkins.Sum32(ct.seed) + h.Write([]byte(id.srcAddr)) + h.Write([]byte(id.dstAddr)) + shortBuf := make([]byte, 2) + binary.LittleEndian.PutUint16(shortBuf, id.srcPort) + h.Write([]byte(shortBuf)) + binary.LittleEndian.PutUint16(shortBuf, id.dstPort) + h.Write([]byte(shortBuf)) + binary.LittleEndian.PutUint16(shortBuf, uint16(id.transProto)) + h.Write([]byte(shortBuf)) + binary.LittleEndian.PutUint16(shortBuf, uint16(id.netProto)) + h.Write([]byte(shortBuf)) + ct.mu.RLock() + defer ct.mu.RUnlock() + return int(h.Sum32()) % len(ct.buckets) +} - // Delete conn if tcp connection is closed. - if st == tcpconntrack.ResultClosedByPeer || st == tcpconntrack.ResultClosedBySelf || st == tcpconntrack.ResultReset { - ct.deleteConn(conn) +// reapUnused deletes timed out entries from the conntrack map. The rules for +// reaping are: +// - Most reaping occurs in connFor, which is called on each packet. connFor +// cleans up the bucket the packet's connection maps to. Thus calls to +// reapUnused should be fast. +// - Each call to reapUnused traverses a fraction of the conntrack table. +// Specifically, it traverses len(ct.buckets)/fractionPerReaping. +// - After reaping, reapUnused decides when it should next run based on the +// ratio of expired connections to examined connections. If the ratio is +// greater than maxExpiredPct, it schedules the next run quickly. Otherwise it +// slightly increases the interval between runs. +// - maxFullTraversal caps the time it takes to traverse the entire table. +// +// reapUnused returns the next bucket that should be checked and the time after +// which it should be called again. +func (ct *ConnTrack) reapUnused(start int, prevInterval time.Duration) (int, time.Duration) { + // TODO(gvisor.dev/issue/170): This can be more finely controlled, as + // it is in Linux via sysctl. + const fractionPerReaping = 128 + const maxExpiredPct = 50 + const maxFullTraversal = 60 * time.Second + const minInterval = 10 * time.Millisecond + const maxInterval = maxFullTraversal / fractionPerReaping + + now := time.Now() + checked := 0 + expired := 0 + var idx int + ct.mu.RLock() + defer ct.mu.RUnlock() + for i := 0; i < len(ct.buckets)/fractionPerReaping; i++ { + idx = (i + start) % len(ct.buckets) + ct.buckets[idx].mu.Lock() + for tuple := ct.buckets[idx].tuples.Front(); tuple != nil; tuple = tuple.Next() { + checked++ + if ct.reapTupleLocked(tuple, idx, now) { + expired++ + } + } + ct.buckets[idx].mu.Unlock() + } + // We already checked buckets[idx]. + idx++ + + // If half or more of the connections are expired, the table has gotten + // stale. Reschedule quickly. + expiredPct := 0 + if checked != 0 { + expiredPct = expired * 100 / checked + } + if expiredPct > maxExpiredPct { + return idx, minInterval + } + if interval := prevInterval + minInterval; interval <= maxInterval { + // Increment the interval between runs. + return idx, interval } + // We've hit the maximum interval. + return idx, maxInterval } -// deleteConn deletes the connection. -func (ct *ConnTrack) deleteConn(conn *conn) { - if conn == nil { - return +// reapTupleLocked tries to remove tuple and its reply from the table. It +// returns whether the tuple's connection has timed out. +// +// Preconditions: ct.mu is locked for reading and bucket is locked. +func (ct *ConnTrack) reapTupleLocked(tuple *tuple, bucket int, now time.Time) bool { + if !tuple.conn.timedOut(now) { + return false } - ct.mu.Lock() - defer ct.mu.Unlock() + // To maintain lock order, we can only reap these tuples if the reply + // appears later in the table. + replyBucket := ct.bucket(tuple.reply()) + if bucket > replyBucket { + return true + } + + // Don't re-lock if both tuples are in the same bucket. + differentBuckets := bucket != replyBucket + if differentBuckets { + ct.buckets[replyBucket].mu.Lock() + } + + // We have the buckets locked and can remove both tuples. + if tuple.direction == dirOriginal { + ct.buckets[replyBucket].tuples.Remove(&tuple.conn.reply) + } else { + ct.buckets[replyBucket].tuples.Remove(&tuple.conn.original) + } + ct.buckets[bucket].tuples.Remove(tuple) + + // Don't re-unlock if both tuples are in the same bucket. + if differentBuckets { + ct.buckets[replyBucket].mu.Unlock() + } - delete(ct.conns, conn.original.tupleID) - delete(ct.conns, conn.reply.tupleID) + return true } diff --git a/pkg/tcpip/stack/iptables.go b/pkg/tcpip/stack/iptables.go index 974d77c36..f846ea2e5 100644 --- a/pkg/tcpip/stack/iptables.go +++ b/pkg/tcpip/stack/iptables.go @@ -16,6 +16,7 @@ package stack import ( "fmt" + "time" "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/header" @@ -41,6 +42,9 @@ const ( // underflow. const HookUnset = -1 +// reaperDelay is how long to wait before starting to reap connections. +const reaperDelay = 5 * time.Second + // DefaultTables returns a default set of tables. Each chain is set to accept // all packets. func DefaultTables() *IPTables { @@ -112,8 +116,9 @@ func DefaultTables() *IPTables { Output: []string{TablenameMangle, TablenameNat, TablenameFilter}, }, connections: ConnTrack{ - conns: make(map[tupleID]tuple), + seed: generateRandUint32(), }, + reaperDone: make(chan struct{}, 1), } } @@ -169,6 +174,12 @@ func (it *IPTables) GetTable(name string) (Table, bool) { func (it *IPTables) ReplaceTable(name string, table Table) { it.mu.Lock() defer it.mu.Unlock() + // If iptables is being enabled, initialize the conntrack table and + // reaper. + if !it.modified { + it.connections.buckets = make([]bucket, numBuckets) + it.startReaper(reaperDelay) + } it.modified = true it.tables[name] = table } @@ -249,6 +260,35 @@ func (it *IPTables) Check(hook Hook, pkt *PacketBuffer, gso *GSO, r *Route, addr return true } +// beforeSave is invoked by stateify. +func (it *IPTables) beforeSave() { + // Ensure the reaper exits cleanly. + it.reaperDone <- struct{}{} + // Prevent others from modifying the connection table. + it.connections.mu.Lock() +} + +// afterLoad is invoked by stateify. +func (it *IPTables) afterLoad() { + it.startReaper(reaperDelay) +} + +// startReaper starts a goroutine that wakes up periodically to reap timed out +// connections. +func (it *IPTables) startReaper(interval time.Duration) { + go func() { // S/R-SAFE: reaperDone is signalled when iptables is saved. + bucket := 0 + for { + select { + case <-it.reaperDone: + return + case <-time.After(interval): + bucket, interval = it.connections.reapUnused(bucket, interval) + } + } + }() +} + // CheckPackets runs pkts through the rules for hook and returns a map of packets that // should not go forward. // diff --git a/pkg/tcpip/stack/iptables_state.go b/pkg/tcpip/stack/iptables_state.go new file mode 100644 index 000000000..529e02a07 --- /dev/null +++ b/pkg/tcpip/stack/iptables_state.go @@ -0,0 +1,40 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stack + +import ( + "time" +) + +// +stateify savable +type unixTime struct { + second int64 + nano int64 +} + +// saveLastUsed is invoked by stateify. +func (cn *conn) saveLastUsed() unixTime { + return unixTime{cn.lastUsed.Unix(), cn.lastUsed.UnixNano()} +} + +// loadLastUsed is invoked by stateify. +func (cn *conn) loadLastUsed(unix unixTime) { + cn.lastUsed = time.Unix(unix.second, unix.nano) +} + +// beforeSave is invoked by stateify. +func (ct *ConnTrack) beforeSave() { + ct.mu.Lock() +} diff --git a/pkg/tcpip/stack/iptables_types.go b/pkg/tcpip/stack/iptables_types.go index c528ec381..eb70e3104 100644 --- a/pkg/tcpip/stack/iptables_types.go +++ b/pkg/tcpip/stack/iptables_types.go @@ -78,6 +78,8 @@ const ( ) // IPTables holds all the tables for a netstack. +// +// +stateify savable type IPTables struct { // mu protects tables, priorities, and modified. mu sync.RWMutex @@ -97,10 +99,15 @@ type IPTables struct { modified bool connections ConnTrack + + // reaperDone can be signalled to stop the reaper goroutine. + reaperDone chan struct{} } // A Table defines a set of chains and hooks into the network stack. It is // really just a list of rules. +// +// +stateify savable type Table struct { // Rules holds the rules that make up the table. Rules []Rule @@ -130,6 +137,8 @@ func (table *Table) ValidHooks() uint32 { // contains zero or more matchers, each of which is a specification of which // packets this rule applies to. If there are no matchers in the rule, it // applies to any packet. +// +// +stateify savable type Rule struct { // Filter holds basic IP filtering fields common to every rule. Filter IPHeaderFilter @@ -142,6 +151,8 @@ type Rule struct { } // IPHeaderFilter holds basic IP filtering data common to every rule. +// +// +stateify savable type IPHeaderFilter struct { // Protocol matches the transport protocol. Protocol tcpip.TransportProtocolNumber diff --git a/pkg/tcpip/stack/packet_buffer.go b/pkg/tcpip/stack/packet_buffer.go index 1b5da6017..e3556d5d2 100644 --- a/pkg/tcpip/stack/packet_buffer.go +++ b/pkg/tcpip/stack/packet_buffer.go @@ -14,6 +14,7 @@ package stack import ( + "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/buffer" ) @@ -24,7 +25,7 @@ import ( // multiple endpoints. Clone() should be called in such cases so that // modifications to the Data field do not affect other copies. type PacketBuffer struct { - _ noCopy + _ sync.NoCopy // PacketBufferEntry is used to build an intrusive list of // PacketBuffers. @@ -102,14 +103,3 @@ func (pk *PacketBuffer) Clone() *PacketBuffer { NatDone: pk.NatDone, } } - -// noCopy may be embedded into structs which must not be copied -// after the first use. -// -// See https://golang.org/issues/8005#issuecomment-190753527 -// for details. -type noCopy struct{} - -// Lock is a no-op used by -copylocks checker from `go vet`. -func (*noCopy) Lock() {} -func (*noCopy) Unlock() {} diff --git a/pkg/tcpip/stack/stack.go b/pkg/tcpip/stack/stack.go index cdcfb8321..0aa815447 100644 --- a/pkg/tcpip/stack/stack.go +++ b/pkg/tcpip/stack/stack.go @@ -425,6 +425,7 @@ type Stack struct { handleLocal bool // tables are the iptables packet filtering and manipulation rules. + // TODO(gvisor.dev/issue/170): S/R this field. tables *IPTables // resumableEndpoints is a list of endpoints that need to be resumed if the diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go index 25534a10d..71bcee785 100644 --- a/pkg/tcpip/tcpip.go +++ b/pkg/tcpip/tcpip.go @@ -782,7 +782,7 @@ type CongestionControlOption string // control algorithms. type AvailableCongestionControlOption string -// buffer moderation. +// ModerateReceiveBufferOption is used by buffer moderation. type ModerateReceiveBufferOption bool // TCPLingerTimeoutOption is used by SetSockOpt/GetSockOpt to set/get the @@ -855,7 +855,10 @@ type OutOfBandInlineOption int // a default TTL. type DefaultTTLOption uint8 -// +// SocketDetachFilterOption is used by SetSockOpt to detach a previously attached +// classic BPF filter on a given endpoint. +type SocketDetachFilterOption int + // IPPacketInfo is the message structure for IP_PKTINFO. // // +stateify savable @@ -1244,6 +1247,9 @@ type UDPStats struct { // ChecksumErrors is the number of datagrams dropped due to bad checksums. ChecksumErrors *StatCounter + + // InvalidSourceAddress is the number of invalid sourced datagrams dropped. + InvalidSourceAddress *StatCounter } // Stats holds statistics about the networking stack. diff --git a/pkg/tcpip/timer.go b/pkg/tcpip/timer.go index 59f3b391f..5554c573f 100644 --- a/pkg/tcpip/timer.go +++ b/pkg/tcpip/timer.go @@ -15,8 +15,9 @@ package tcpip import ( - "sync" "time" + + "gvisor.dev/gvisor/pkg/sync" ) // cancellableTimerInstance is a specific instance of CancellableTimer. @@ -92,6 +93,8 @@ func (t *cancellableTimerInstance) stop() { // Note, it is not safe to copy a CancellableTimer as its timer instance creates // a closure over the address of the CancellableTimer. type CancellableTimer struct { + _ sync.NoCopy + // The active instance of a cancellable timer. instance cancellableTimerInstance @@ -157,22 +160,6 @@ func (t *CancellableTimer) Reset(d time.Duration) { } } -// Lock is a no-op used by the copylocks checker from go vet. -// -// See CancellableTimer for details about why it shouldn't be copied. -// -// See https://github.com/golang/go/issues/8005#issuecomment-190753527 for more -// details about the copylocks checker. -func (*CancellableTimer) Lock() {} - -// Unlock is a no-op used by the copylocks checker from go vet. -// -// See CancellableTimer for details about why it shouldn't be copied. -// -// See https://github.com/golang/go/issues/8005#issuecomment-190753527 for more -// details about the copylocks checker. -func (*CancellableTimer) Unlock() {} - // NewCancellableTimer returns an unscheduled CancellableTimer with the given // locker and fn. // diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go index 62d1acad4..678f4e016 100644 --- a/pkg/tcpip/transport/icmp/endpoint.go +++ b/pkg/tcpip/transport/icmp/endpoint.go @@ -344,6 +344,10 @@ func (e *endpoint) Peek([][]byte) (int64, tcpip.ControlMessages, *tcpip.Error) { // SetSockOpt sets a socket option. func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { + switch opt.(type) { + case tcpip.SocketDetachFilterOption: + return nil + } return nil } diff --git a/pkg/tcpip/transport/packet/endpoint.go b/pkg/tcpip/transport/packet/endpoint.go index a8f8454dd..57b7f5c19 100644 --- a/pkg/tcpip/transport/packet/endpoint.go +++ b/pkg/tcpip/transport/packet/endpoint.go @@ -278,7 +278,13 @@ func (ep *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // used with SetSockOpt, and this function always returns // tcpip.ErrNotSupported. func (ep *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { - return tcpip.ErrUnknownProtocolOption + switch opt.(type) { + case tcpip.SocketDetachFilterOption: + return nil + + default: + return tcpip.ErrUnknownProtocolOption + } } // SetSockOptBool implements tcpip.Endpoint.SetSockOptBool. diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go index 5b6e7d102..c2e9fd29f 100644 --- a/pkg/tcpip/transport/raw/endpoint.go +++ b/pkg/tcpip/transport/raw/endpoint.go @@ -506,7 +506,13 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // SetSockOpt implements tcpip.Endpoint.SetSockOpt. func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { - return tcpip.ErrUnknownProtocolOption + switch opt.(type) { + case tcpip.SocketDetachFilterOption: + return nil + + default: + return tcpip.ErrUnknownProtocolOption + } } // SetSockOptBool implements tcpip.Endpoint.SetSockOptBool. diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index caac6ef57..83dc10ed0 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -1792,6 +1792,9 @@ func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { e.deferAccept = time.Duration(v) e.UnlockUser() + case tcpip.SocketDetachFilterOption: + return nil + default: return nil } diff --git a/pkg/tcpip/transport/tcpconntrack/tcp_conntrack.go b/pkg/tcpip/transport/tcpconntrack/tcp_conntrack.go index 12bc1b5b5..558b06df0 100644 --- a/pkg/tcpip/transport/tcpconntrack/tcp_conntrack.go +++ b/pkg/tcpip/transport/tcpconntrack/tcp_conntrack.go @@ -106,6 +106,11 @@ func (t *TCB) UpdateStateOutbound(tcp header.TCP) Result { return st } +// State returns the current state of the TCB. +func (t *TCB) State() Result { + return t.state +} + // IsAlive returns true as long as the connection is established(Alive) // or connecting state. func (t *TCB) IsAlive() bool { diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go index 0584ec8dc..a14643ae8 100644 --- a/pkg/tcpip/transport/udp/endpoint.go +++ b/pkg/tcpip/transport/udp/endpoint.go @@ -816,6 +816,9 @@ func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { e.mu.Lock() e.bindToDevice = id e.mu.Unlock() + + case tcpip.SocketDetachFilterOption: + return nil } return nil } @@ -1377,6 +1380,15 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, pk return } + // Never receive from a multicast address. + if header.IsV4MulticastAddress(id.RemoteAddress) || + header.IsV6MulticastAddress(id.RemoteAddress) { + e.stack.Stats().UDP.InvalidSourceAddress.Increment() + e.stack.Stats().IP.InvalidSourceAddressesReceived.Increment() + e.stats.ReceiveErrors.MalformedPacketsReceived.Increment() + return + } + // Verify checksum unless RX checksum offload is enabled. // On IPv4, UDP checksum is optional, and a zero value means // the transmitter omitted the checksum generation (RFC768). @@ -1395,10 +1407,10 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, pk } } - e.rcvMu.Lock() e.stack.Stats().UDP.PacketsReceived.Increment() e.stats.PacketsReceived.Increment() + e.rcvMu.Lock() // Drop the packet if our buffer is currently full. if !e.rcvReady || e.rcvClosed { e.rcvMu.Unlock() diff --git a/pkg/tcpip/transport/udp/udp_test.go b/pkg/tcpip/transport/udp/udp_test.go index 91ba031fa..90781cf49 100644 --- a/pkg/tcpip/transport/udp/udp_test.go +++ b/pkg/tcpip/transport/udp/udp_test.go @@ -83,16 +83,18 @@ type header4Tuple struct { type testFlow int const ( - unicastV4 testFlow = iota // V4 unicast on a V4 socket - unicastV4in6 // V4-mapped unicast on a V6-dual socket - unicastV6 // V6 unicast on a V6 socket - unicastV6Only // V6 unicast on a V6-only socket - multicastV4 // V4 multicast on a V4 socket - multicastV4in6 // V4-mapped multicast on a V6-dual socket - multicastV6 // V6 multicast on a V6 socket - multicastV6Only // V6 multicast on a V6-only socket - broadcast // V4 broadcast on a V4 socket - broadcastIn6 // V4-mapped broadcast on a V6-dual socket + unicastV4 testFlow = iota // V4 unicast on a V4 socket + unicastV4in6 // V4-mapped unicast on a V6-dual socket + unicastV6 // V6 unicast on a V6 socket + unicastV6Only // V6 unicast on a V6-only socket + multicastV4 // V4 multicast on a V4 socket + multicastV4in6 // V4-mapped multicast on a V6-dual socket + multicastV6 // V6 multicast on a V6 socket + multicastV6Only // V6 multicast on a V6-only socket + broadcast // V4 broadcast on a V4 socket + broadcastIn6 // V4-mapped broadcast on a V6-dual socket + reverseMulticast4 // V4 multicast src. Must fail. + reverseMulticast6 // V6 multicast src. Must fail. ) func (flow testFlow) String() string { @@ -117,6 +119,10 @@ func (flow testFlow) String() string { return "broadcast" case broadcastIn6: return "broadcastIn6" + case reverseMulticast4: + return "reverseMulticast4" + case reverseMulticast6: + return "reverseMulticast6" default: return "unknown" } @@ -168,6 +174,9 @@ func (flow testFlow) header4Tuple(d packetDirection) header4Tuple { h.dstAddr.Addr = multicastV6Addr } } + if flow.isReverseMulticast() { + h.srcAddr.Addr = flow.getMcastAddr() + } return h } @@ -199,9 +208,9 @@ func (flow testFlow) netProto() tcpip.NetworkProtocolNumber { // endpoint for this flow. func (flow testFlow) sockProto() tcpip.NetworkProtocolNumber { switch flow { - case unicastV4in6, unicastV6, unicastV6Only, multicastV4in6, multicastV6, multicastV6Only, broadcastIn6: + case unicastV4in6, unicastV6, unicastV6Only, multicastV4in6, multicastV6, multicastV6Only, broadcastIn6, reverseMulticast6: return ipv6.ProtocolNumber - case unicastV4, multicastV4, broadcast: + case unicastV4, multicastV4, broadcast, reverseMulticast4: return ipv4.ProtocolNumber default: panic(fmt.Sprintf("invalid testFlow given: %d", flow)) @@ -224,7 +233,7 @@ func (flow testFlow) isV6Only() bool { switch flow { case unicastV6Only, multicastV6Only: return true - case unicastV4, unicastV4in6, unicastV6, multicastV4, multicastV4in6, multicastV6, broadcast, broadcastIn6: + case unicastV4, unicastV4in6, unicastV6, multicastV4, multicastV4in6, multicastV6, broadcast, broadcastIn6, reverseMulticast4, reverseMulticast6: return false default: panic(fmt.Sprintf("invalid testFlow given: %d", flow)) @@ -235,7 +244,7 @@ func (flow testFlow) isMulticast() bool { switch flow { case multicastV4, multicastV4in6, multicastV6, multicastV6Only: return true - case unicastV4, unicastV4in6, unicastV6, unicastV6Only, broadcast, broadcastIn6: + case unicastV4, unicastV4in6, unicastV6, unicastV6Only, broadcast, broadcastIn6, reverseMulticast4, reverseMulticast6: return false default: panic(fmt.Sprintf("invalid testFlow given: %d", flow)) @@ -246,7 +255,7 @@ func (flow testFlow) isBroadcast() bool { switch flow { case broadcast, broadcastIn6: return true - case unicastV4, unicastV4in6, unicastV6, unicastV6Only, multicastV4, multicastV4in6, multicastV6, multicastV6Only: + case unicastV4, unicastV4in6, unicastV6, unicastV6Only, multicastV4, multicastV4in6, multicastV6, multicastV6Only, reverseMulticast4, reverseMulticast6: return false default: panic(fmt.Sprintf("invalid testFlow given: %d", flow)) @@ -257,13 +266,22 @@ func (flow testFlow) isMapped() bool { switch flow { case unicastV4in6, multicastV4in6, broadcastIn6: return true - case unicastV4, unicastV6, unicastV6Only, multicastV4, multicastV6, multicastV6Only, broadcast: + case unicastV4, unicastV6, unicastV6Only, multicastV4, multicastV6, multicastV6Only, broadcast, reverseMulticast4, reverseMulticast6: return false default: panic(fmt.Sprintf("invalid testFlow given: %d", flow)) } } +func (flow testFlow) isReverseMulticast() bool { + switch flow { + case reverseMulticast4, reverseMulticast6: + return true + default: + return false + } +} + type testContext struct { t *testing.T linkEP *channel.Endpoint @@ -872,6 +890,60 @@ func TestV4ReadOnBoundToBroadcast(t *testing.T) { } } +// TestReadFromMulticast checks that an endpoint will NOT receive a packet +// that was sent with multicast SOURCE address. +func TestReadFromMulticast(t *testing.T) { + for _, flow := range []testFlow{reverseMulticast4, reverseMulticast6} { + t.Run(fmt.Sprintf("flow:%s", flow), func(t *testing.T) { + c := newDualTestContext(t, defaultMTU) + defer c.cleanup() + + c.createEndpointForFlow(flow) + + if err := c.ep.Bind(tcpip.FullAddress{Port: stackPort}); err != nil { + t.Fatalf("Bind failed: %s", err) + } + testFailingRead(c, flow, false /* expectReadError */) + }) + } +} + +// TestReadFromMulticaststats checks that a discarded packet +// that that was sent with multicast SOURCE address increments +// the correct counters and that a regular packet does not. +func TestReadFromMulticastStats(t *testing.T) { + t.Helper() + for _, flow := range []testFlow{reverseMulticast4, reverseMulticast6, unicastV4} { + t.Run(fmt.Sprintf("flow:%s", flow), func(t *testing.T) { + c := newDualTestContext(t, defaultMTU) + defer c.cleanup() + + c.createEndpointForFlow(flow) + + if err := c.ep.Bind(tcpip.FullAddress{Port: stackPort}); err != nil { + t.Fatalf("Bind failed: %s", err) + } + + payload := newPayload() + c.injectPacket(flow, payload) + + var want uint64 = 0 + if flow.isReverseMulticast() { + want = 1 + } + if got := c.s.Stats().IP.InvalidSourceAddressesReceived.Value(); got != want { + t.Errorf("got stats.IP.InvalidSourceAddressesReceived.Value() = %d, want = %d", got, want) + } + if got := c.s.Stats().UDP.InvalidSourceAddress.Value(); got != want { + t.Errorf("got stats.UDP.InvalidSourceAddress.Value() = %d, want = %d", got, want) + } + if got := c.ep.Stats().(*tcpip.TransportEndpointStats).ReceiveErrors.MalformedPacketsReceived.Value(); got != want { + t.Errorf("got EP Stats.ReceiveErrors.MalformedPacketsReceived stats = %d, want = %d", got, want) + } + }) + } +} + // TestV4ReadBroadcastOnBoundToWildcard checks that an endpoint can bind to ANY // and receive broadcast and unicast data. func TestV4ReadBroadcastOnBoundToWildcard(t *testing.T) { diff --git a/pkg/test/criutil/criutil.go b/pkg/test/criutil/criutil.go index 4c63d669a..66f10c016 100644 --- a/pkg/test/criutil/criutil.go +++ b/pkg/test/criutil/criutil.go @@ -103,6 +103,9 @@ func (cc *Crictl) Create(podID, contSpecFile, sbSpecFile string) (string, error) // need to parse the version and add an appropriate --no-pull argument // since the image has already been loaded locally. out, err := cc.run("-v") + if err != nil { + return "", err + } r := regexp.MustCompile("crictl version ([0-9]+)\\.([0-9]+)\\.([0-9+])") vs := r.FindStringSubmatch(out) if len(vs) != 4 { diff --git a/pkg/test/dockerutil/dockerutil.go b/pkg/test/dockerutil/dockerutil.go index f95ae3cd1..df09babf3 100644 --- a/pkg/test/dockerutil/dockerutil.go +++ b/pkg/test/dockerutil/dockerutil.go @@ -119,3 +119,8 @@ func Save(logger testutil.Logger, image string, w io.Writer) error { cmd.Stdout = w // Send directly to the writer. return cmd.Run() } + +// Runtime returns the value of the flag runtime. +func Runtime() string { + return *runtime +} diff --git a/pkg/test/testutil/BUILD b/pkg/test/testutil/BUILD index 03b1b4677..2d8f56bc0 100644 --- a/pkg/test/testutil/BUILD +++ b/pkg/test/testutil/BUILD @@ -15,6 +15,6 @@ go_library( "//runsc/boot", "//runsc/specutils", "@com_github_cenkalti_backoff//:go_default_library", - "@com_github_opencontainers_runtime-spec//specs-go:go_default_library", + "@com_github_opencontainers_runtime_spec//specs-go:go_default_library", ], ) |