diff options
Diffstat (limited to 'pkg')
79 files changed, 5482 insertions, 196 deletions
diff --git a/pkg/sentry/fsimpl/ext/BUILD b/pkg/sentry/fsimpl/ext/BUILD index ef24f8159..abc610ef3 100644 --- a/pkg/sentry/fsimpl/ext/BUILD +++ b/pkg/sentry/fsimpl/ext/BUILD @@ -96,7 +96,7 @@ go_test( "//pkg/syserror", "//pkg/test/testutil", "//pkg/usermem", - "@com_github_google_go-cmp//cmp:go_default_library", - "@com_github_google_go-cmp//cmp/cmpopts:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp/cmpopts:go_default_library", ], ) diff --git a/pkg/sentry/fsimpl/gofer/gofer.go b/pkg/sentry/fsimpl/gofer/gofer.go index 2b83094cd..b74d489a0 100644 --- a/pkg/sentry/fsimpl/gofer/gofer.go +++ b/pkg/sentry/fsimpl/gofer/gofer.go @@ -602,8 +602,14 @@ type dentry struct { // returned by the server. dirents is protected by dirMu. dirents []vfs.Dirent - // Cached metadata; protected by metadataMu and accessed using atomic - // memory operations unless otherwise specified. + // Cached metadata; protected by metadataMu. + // To access: + // - In situations where consistency is not required (like stat), these + // can be accessed using atomic operations only (without locking). + // - Lock metadataMu and can access without atomic operations. + // To mutate: + // - Lock metadataMu and use atomic operations to update because we might + // have atomic readers that don't hold the lock. metadataMu sync.Mutex ino inodeNumber // immutable mode uint32 // type is immutable, perms are mutable @@ -616,7 +622,7 @@ type dentry struct { ctime int64 btime int64 // File size, protected by both metadataMu and dataMu (i.e. both must be - // locked to mutate it). + // locked to mutate it; locking either is sufficient to access it). size uint64 // nlink counts the number of hard links to this dentry. It's updated and @@ -904,14 +910,14 @@ func (d *dentry) setStat(ctx context.Context, creds *auth.Credentials, stat *lin // Prepare for truncate. if stat.Mask&linux.STATX_SIZE != 0 { - switch d.mode & linux.S_IFMT { - case linux.S_IFREG: + switch mode.FileType() { + case linux.ModeRegular: if !setLocalMtime { // Truncate updates mtime. setLocalMtime = true stat.Mtime.Nsec = linux.UTIME_NOW } - case linux.S_IFDIR: + case linux.ModeDirectory: return syserror.EISDIR default: return syserror.EINVAL @@ -994,7 +1000,7 @@ func (d *dentry) setStat(ctx context.Context, creds *auth.Credentials, stat *lin func (d *dentry) updateFileSizeLocked(newSize uint64) { d.dataMu.Lock() oldSize := d.size - d.size = newSize + atomic.StoreUint64(&d.size, newSize) // d.dataMu must be unlocked to lock d.mapsMu and invalidate mappings // below. This allows concurrent calls to Read/Translate/etc. These // functions synchronize with truncation by refusing to use cache @@ -1340,8 +1346,8 @@ func (d *dentry) removexattr(ctx context.Context, creds *auth.Credentials, name // Extended attributes in the user.* namespace are only supported for regular // files and directories. func (d *dentry) userXattrSupported() bool { - filetype := linux.S_IFMT & atomic.LoadUint32(&d.mode) - return filetype == linux.S_IFREG || filetype == linux.S_IFDIR + filetype := linux.FileMode(atomic.LoadUint32(&d.mode)).FileType() + return filetype == linux.ModeRegular || filetype == linux.ModeDirectory } // Preconditions: !d.isSynthetic(). d.isRegularFile() || d.isDir(). diff --git a/pkg/sentry/fsimpl/gofer/regular_file.go b/pkg/sentry/fsimpl/gofer/regular_file.go index a2f02d9c7..f10350c97 100644 --- a/pkg/sentry/fsimpl/gofer/regular_file.go +++ b/pkg/sentry/fsimpl/gofer/regular_file.go @@ -89,7 +89,9 @@ func (fd *regularFileFD) Allocate(ctx context.Context, mode, offset, length uint if err != nil { return err } - d.size = size + d.dataMu.Lock() + atomic.StoreUint64(&d.size, size) + d.dataMu.Unlock() if !d.cachedMetadataAuthoritative() { d.touchCMtimeLocked() } diff --git a/pkg/sentry/fsimpl/gofer/special_file.go b/pkg/sentry/fsimpl/gofer/special_file.go index c1e6b13e5..a7b53b2d2 100644 --- a/pkg/sentry/fsimpl/gofer/special_file.go +++ b/pkg/sentry/fsimpl/gofer/special_file.go @@ -28,9 +28,9 @@ import ( ) // specialFileFD implements vfs.FileDescriptionImpl for pipes, sockets, device -// special files, and (when filesystemOptions.specialRegularFiles is in effect) -// regular files. specialFileFD differs from regularFileFD by using per-FD -// handles instead of shared per-dentry handles, and never buffering I/O. +// special files, and (when filesystemOptions.regularFilesUseSpecialFileFD is +// in effect) regular files. specialFileFD differs from regularFileFD by using +// per-FD handles instead of shared per-dentry handles, and never buffering I/O. type specialFileFD struct { fileDescription diff --git a/pkg/sentry/fsimpl/kernfs/BUILD b/pkg/sentry/fsimpl/kernfs/BUILD index 179df6c1e..3835557fe 100644 --- a/pkg/sentry/fsimpl/kernfs/BUILD +++ b/pkg/sentry/fsimpl/kernfs/BUILD @@ -70,6 +70,6 @@ go_test( "//pkg/sentry/vfs", "//pkg/syserror", "//pkg/usermem", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/sentry/fsimpl/sys/BUILD b/pkg/sentry/fsimpl/sys/BUILD index a741e2bb6..1b548ccd4 100644 --- a/pkg/sentry/fsimpl/sys/BUILD +++ b/pkg/sentry/fsimpl/sys/BUILD @@ -29,6 +29,6 @@ go_test( "//pkg/sentry/kernel", "//pkg/sentry/kernel/auth", "//pkg/sentry/vfs", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/sentry/fsimpl/testutil/BUILD b/pkg/sentry/fsimpl/testutil/BUILD index 0e4053a46..400a97996 100644 --- a/pkg/sentry/fsimpl/testutil/BUILD +++ b/pkg/sentry/fsimpl/testutil/BUILD @@ -32,6 +32,6 @@ go_library( "//pkg/sentry/vfs", "//pkg/sync", "//pkg/usermem", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/sentry/kernel/timekeeper.go b/pkg/sentry/kernel/timekeeper.go index 0adf25691..5f3908d8b 100644 --- a/pkg/sentry/kernel/timekeeper.go +++ b/pkg/sentry/kernel/timekeeper.go @@ -210,9 +210,6 @@ func (t *Timekeeper) startUpdater() { p.realtimeBaseRef = int64(realtimeParams.BaseRef) p.realtimeFrequency = realtimeParams.Frequency } - - log.Debugf("Updating VDSO parameters: %+v", p) - return p }); err != nil { log.Warningf("Unable to update VDSO parameter page: %v", err) diff --git a/pkg/sentry/platform/kvm/BUILD b/pkg/sentry/platform/kvm/BUILD index 4792454c4..10a10bfe2 100644 --- a/pkg/sentry/platform/kvm/BUILD +++ b/pkg/sentry/platform/kvm/BUILD @@ -60,6 +60,7 @@ go_library( go_test( name = "kvm_test", srcs = [ + "kvm_amd64_test.go", "kvm_test.go", "virtual_map_test.go", ], diff --git a/pkg/sentry/platform/kvm/kvm_amd64_test.go b/pkg/sentry/platform/kvm/kvm_amd64_test.go new file mode 100644 index 000000000..c0b4fd374 --- /dev/null +++ b/pkg/sentry/platform/kvm/kvm_amd64_test.go @@ -0,0 +1,51 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build amd64 + +package kvm + +import ( + "testing" + + "gvisor.dev/gvisor/pkg/sentry/arch" + "gvisor.dev/gvisor/pkg/sentry/platform" + "gvisor.dev/gvisor/pkg/sentry/platform/kvm/testutil" + "gvisor.dev/gvisor/pkg/sentry/platform/ring0" + "gvisor.dev/gvisor/pkg/sentry/platform/ring0/pagetables" +) + +func TestSegments(t *testing.T) { + applicationTest(t, true, testutil.TwiddleSegments, func(c *vCPU, regs *arch.Registers, pt *pagetables.PageTables) bool { + testutil.SetTestSegments(regs) + for { + var si arch.SignalInfo + if _, err := c.SwitchToUser(ring0.SwitchOpts{ + Registers: regs, + FloatingPointState: dummyFPState, + PageTables: pt, + FullRestore: true, + }, &si); err == platform.ErrContextInterrupt { + continue // Retry. + } else if err != nil { + t.Errorf("application segment check with full restore got unexpected error: %v", err) + } + if err := testutil.CheckTestSegments(regs); err != nil { + t.Errorf("application segment check with full restore failed: %v", err) + } + break // Done. + } + return false + }) +} diff --git a/pkg/sentry/platform/kvm/kvm_test.go b/pkg/sentry/platform/kvm/kvm_test.go index 6c8f4fa28..45b3180f1 100644 --- a/pkg/sentry/platform/kvm/kvm_test.go +++ b/pkg/sentry/platform/kvm/kvm_test.go @@ -262,30 +262,6 @@ func TestRegistersFault(t *testing.T) { }) } -func TestSegments(t *testing.T) { - applicationTest(t, true, testutil.TwiddleSegments, func(c *vCPU, regs *arch.Registers, pt *pagetables.PageTables) bool { - testutil.SetTestSegments(regs) - for { - var si arch.SignalInfo - if _, err := c.SwitchToUser(ring0.SwitchOpts{ - Registers: regs, - FloatingPointState: dummyFPState, - PageTables: pt, - FullRestore: true, - }, &si); err == platform.ErrContextInterrupt { - continue // Retry. - } else if err != nil { - t.Errorf("application segment check with full restore got unexpected error: %v", err) - } - if err := testutil.CheckTestSegments(regs); err != nil { - t.Errorf("application segment check with full restore failed: %v", err) - } - break // Done. - } - return false - }) -} - func TestBounce(t *testing.T) { applicationTest(t, true, testutil.SpinLoop, func(c *vCPU, regs *arch.Registers, pt *pagetables.PageTables) bool { go func() { diff --git a/pkg/sentry/platform/kvm/machine_arm64_unsafe.go b/pkg/sentry/platform/kvm/machine_arm64_unsafe.go index 8bed34922..3de309c1a 100644 --- a/pkg/sentry/platform/kvm/machine_arm64_unsafe.go +++ b/pkg/sentry/platform/kvm/machine_arm64_unsafe.go @@ -78,19 +78,6 @@ func (c *vCPU) initArchState() error { return err } - // sctlr_el1 - regGet.id = _KVM_ARM64_REGS_SCTLR_EL1 - if err := c.getOneRegister(®Get); err != nil { - return err - } - - dataGet |= (_SCTLR_M | _SCTLR_C | _SCTLR_I) - data = dataGet - reg.id = _KVM_ARM64_REGS_SCTLR_EL1 - if err := c.setOneRegister(®); err != nil { - return err - } - // tcr_el1 data = _TCR_TXSZ_VA48 | _TCR_CACHE_FLAGS | _TCR_SHARED | _TCR_TG_FLAGS | _TCR_ASID16 | _TCR_IPS_40BITS reg.id = _KVM_ARM64_REGS_TCR_EL1 diff --git a/pkg/sentry/platform/kvm/testutil/testutil_arm64.s b/pkg/sentry/platform/kvm/testutil/testutil_arm64.s index 0bebee852..07658144e 100644 --- a/pkg/sentry/platform/kvm/testutil/testutil_arm64.s +++ b/pkg/sentry/platform/kvm/testutil/testutil_arm64.s @@ -104,3 +104,9 @@ TEXT ·TwiddleRegsSyscall(SB),NOSPLIT,$0 TWIDDLE_REGS() SVC RET // never reached + +TEXT ·TwiddleRegsFault(SB),NOSPLIT,$0 + TWIDDLE_REGS() + // Branch to Register branches unconditionally to an address in <Rn>. + JMP (R4) // <=> br x4, must fault + RET // never reached diff --git a/pkg/sentry/platform/ring0/entry_arm64.s b/pkg/sentry/platform/ring0/entry_arm64.s index 2bc5f3ecd..6ed73699b 100644 --- a/pkg/sentry/platform/ring0/entry_arm64.s +++ b/pkg/sentry/platform/ring0/entry_arm64.s @@ -40,6 +40,14 @@ #define FPEN_ENABLE (FPEN_NOTRAP << FPEN_SHIFT) +// sctlr_el1: system control register el1. +#define SCTLR_M 1 << 0 +#define SCTLR_C 1 << 2 +#define SCTLR_I 1 << 12 +#define SCTLR_UCT 1 << 15 + +#define SCTLR_EL1_DEFAULT (SCTLR_M | SCTLR_C | SCTLR_I | SCTLR_UCT) + // Saves a register set. // // This is a macro because it may need to executed in contents where a stack is @@ -496,6 +504,11 @@ TEXT ·kernelExitToEl1(SB),NOSPLIT,$0 // Start is the CPU entrypoint. TEXT ·Start(SB),NOSPLIT,$0 IRQ_DISABLE + + // Init. + MOVD $SCTLR_EL1_DEFAULT, R1 + MSR R1, SCTLR_EL1 + MOVD R8, RSV_REG ORR $0xffff000000000000, RSV_REG, RSV_REG WORD $0xd518d092 //MSR R18, TPIDR_EL1 diff --git a/pkg/sentry/socket/hostinet/socket_vfs2.go b/pkg/sentry/socket/hostinet/socket_vfs2.go index 8f192c62f..8a1d52ebf 100644 --- a/pkg/sentry/socket/hostinet/socket_vfs2.go +++ b/pkg/sentry/socket/hostinet/socket_vfs2.go @@ -71,6 +71,7 @@ func newVFS2Socket(t *kernel.Task, family int, stype linux.SockType, protocol in DenyPWrite: true, UseDentryMetadata: true, }); err != nil { + fdnotifier.RemoveFD(int32(s.fd)) return nil, syserr.FromError(err) } return vfsfd, nil diff --git a/pkg/sentry/socket/netstack/netstack.go b/pkg/sentry/socket/netstack/netstack.go index e7d2c83d7..78a842973 100644 --- a/pkg/sentry/socket/netstack/netstack.go +++ b/pkg/sentry/socket/netstack/netstack.go @@ -192,6 +192,7 @@ var Metrics = tcpip.Stats{ PacketsSent: mustCreateMetric("/netstack/udp/packets_sent", "Number of UDP datagrams sent."), PacketSendErrors: mustCreateMetric("/netstack/udp/packet_send_errors", "Number of UDP datagrams failed to be sent."), ChecksumErrors: mustCreateMetric("/netstack/udp/checksum_errors", "Number of UDP datagrams dropped due to bad checksums."), + InvalidSourceAddress: mustCreateMetric("/netstack/udp/invalid_source", "Number of UDP datagrams dropped due to invalid source address."), }, } @@ -1753,6 +1754,11 @@ func setSockOptSocket(t *kernel.Task, s socket.SocketOps, ep commonEndpoint, nam return nil + case linux.SO_DETACH_FILTER: + // optval is ignored. + var v tcpip.SocketDetachFilterOption + return syserr.TranslateNetstackError(ep.SetSockOpt(v)) + default: socket.SetSockOptEmitUnimplementedEvent(t, name) } @@ -2112,13 +2118,22 @@ func setSockOptIP(t *kernel.Task, ep commonEndpoint, name int, optVal []byte) *s } return syserr.TranslateNetstackError(ep.SetSockOptBool(tcpip.ReceiveIPPacketInfoOption, v != 0)) + case linux.IP_HDRINCL: + if len(optVal) == 0 { + return nil + } + v, err := parseIntOrChar(optVal) + if err != nil { + return err + } + return syserr.TranslateNetstackError(ep.SetSockOptBool(tcpip.IPHdrIncludedOption, v != 0)) + case linux.IP_ADD_SOURCE_MEMBERSHIP, linux.IP_BIND_ADDRESS_NO_PORT, linux.IP_BLOCK_SOURCE, linux.IP_CHECKSUM, linux.IP_DROP_SOURCE_MEMBERSHIP, linux.IP_FREEBIND, - linux.IP_HDRINCL, linux.IP_IPSEC_POLICY, linux.IP_MINTTL, linux.IP_MSFILTER, diff --git a/pkg/sentry/time/parameters.go b/pkg/sentry/time/parameters.go index 65868cb26..cd1b95117 100644 --- a/pkg/sentry/time/parameters.go +++ b/pkg/sentry/time/parameters.go @@ -228,11 +228,15 @@ func errorAdjust(prevParams Parameters, newParams Parameters, now TSCValue) (Par // // The log level is determined by the error severity. func logErrorAdjustment(clock ClockID, errorNS ReferenceNS, orig, adjusted Parameters) { - fn := log.Debugf - if int64(errorNS.Magnitude()) > time.Millisecond.Nanoseconds() { + magNS := int64(errorNS.Magnitude()) + if magNS <= 10*time.Microsecond.Nanoseconds() { + // Don't log small errors. + return + } + fn := log.Infof + if magNS > time.Millisecond.Nanoseconds() { + // Upgrade large errors to warning. fn = log.Warningf - } else if int64(errorNS.Magnitude()) > 10*time.Microsecond.Nanoseconds() { - fn = log.Infof } fn("Clock(%v): error: %v ns, adjusted frequency from %v Hz to %v Hz", clock, errorNS, orig.Frequency, adjusted.Frequency) diff --git a/pkg/sentry/vfs/inotify.go b/pkg/sentry/vfs/inotify.go index c2e21ac5f..167b731ac 100644 --- a/pkg/sentry/vfs/inotify.go +++ b/pkg/sentry/vfs/inotify.go @@ -179,12 +179,12 @@ func (i *Inotify) Readiness(mask waiter.EventMask) waiter.EventMask { return mask & ready } -// PRead implements FileDescriptionImpl. +// PRead implements FileDescriptionImpl.PRead. func (*Inotify) PRead(ctx context.Context, dst usermem.IOSequence, offset int64, opts ReadOptions) (int64, error) { return 0, syserror.ESPIPE } -// PWrite implements FileDescriptionImpl. +// PWrite implements FileDescriptionImpl.PWrite. func (*Inotify) PWrite(ctx context.Context, src usermem.IOSequence, offset int64, opts WriteOptions) (int64, error) { return 0, syserror.ESPIPE } @@ -243,7 +243,7 @@ func (i *Inotify) Read(ctx context.Context, dst usermem.IOSequence, opts ReadOpt return writeLen, nil } -// Ioctl implements fs.FileOperations.Ioctl. +// Ioctl implements FileDescriptionImpl.Ioctl. func (i *Inotify) Ioctl(ctx context.Context, uio usermem.IO, args arch.SyscallArguments) (uintptr, error) { switch args[1].Int() { case linux.FIONREAD: diff --git a/pkg/shim/runsc/BUILD b/pkg/shim/runsc/BUILD new file mode 100644 index 000000000..f08599ebd --- /dev/null +++ b/pkg/shim/runsc/BUILD @@ -0,0 +1,16 @@ +load("//tools:defs.bzl", "go_library") + +package(licenses = ["notice"]) + +go_library( + name = "runsc", + srcs = [ + "runsc.go", + "utils.go", + ], + visibility = ["//:sandbox"], + deps = [ + "@com_github_containerd_go_runc//:go_default_library", + "@com_github_opencontainers_runtime_spec//specs-go:go_default_library", + ], +) diff --git a/pkg/shim/runsc/runsc.go b/pkg/shim/runsc/runsc.go new file mode 100644 index 000000000..c5cf68efa --- /dev/null +++ b/pkg/shim/runsc/runsc.go @@ -0,0 +1,514 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runsc + +import ( + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "strconv" + "syscall" + "time" + + runc "github.com/containerd/go-runc" + specs "github.com/opencontainers/runtime-spec/specs-go" +) + +var Monitor runc.ProcessMonitor = runc.Monitor + +// DefaultCommand is the default command for Runsc. +const DefaultCommand = "runsc" + +// Runsc is the client to the runsc cli. +type Runsc struct { + Command string + PdeathSignal syscall.Signal + Setpgid bool + Root string + Log string + LogFormat runc.Format + Config map[string]string +} + +// List returns all containers created inside the provided runsc root directory. +func (r *Runsc) List(context context.Context) ([]*runc.Container, error) { + data, err := cmdOutput(r.command(context, "list", "--format=json"), false) + if err != nil { + return nil, err + } + var out []*runc.Container + if err := json.Unmarshal(data, &out); err != nil { + return nil, err + } + return out, nil +} + +// State returns the state for the container provided by id. +func (r *Runsc) State(context context.Context, id string) (*runc.Container, error) { + data, err := cmdOutput(r.command(context, "state", id), true) + if err != nil { + return nil, fmt.Errorf("%s: %s", err, data) + } + var c runc.Container + if err := json.Unmarshal(data, &c); err != nil { + return nil, err + } + return &c, nil +} + +type CreateOpts struct { + runc.IO + ConsoleSocket runc.ConsoleSocket + + // PidFile is a path to where a pid file should be created. + PidFile string + + // UserLog is a path to where runsc user log should be generated. + UserLog string +} + +func (o *CreateOpts) args() (out []string, err error) { + if o.PidFile != "" { + abs, err := filepath.Abs(o.PidFile) + if err != nil { + return nil, err + } + out = append(out, "--pid-file", abs) + } + if o.ConsoleSocket != nil { + out = append(out, "--console-socket", o.ConsoleSocket.Path()) + } + if o.UserLog != "" { + out = append(out, "--user-log", o.UserLog) + } + return out, nil +} + +// Create creates a new container and returns its pid if it was created successfully. +func (r *Runsc) Create(context context.Context, id, bundle string, opts *CreateOpts) error { + args := []string{"create", "--bundle", bundle} + if opts != nil { + oargs, err := opts.args() + if err != nil { + return err + } + args = append(args, oargs...) + } + cmd := r.command(context, append(args, id)...) + if opts != nil && opts.IO != nil { + opts.Set(cmd) + } + + if cmd.Stdout == nil && cmd.Stderr == nil { + data, err := cmdOutput(cmd, true) + if err != nil { + return fmt.Errorf("%s: %s", err, data) + } + return nil + } + ec, err := Monitor.Start(cmd) + if err != nil { + return err + } + if opts != nil && opts.IO != nil { + if c, ok := opts.IO.(runc.StartCloser); ok { + if err := c.CloseAfterStart(); err != nil { + return err + } + } + } + status, err := Monitor.Wait(cmd, ec) + if err == nil && status != 0 { + err = fmt.Errorf("%s did not terminate sucessfully", cmd.Args[0]) + } + + return err +} + +// Start will start an already created container. +func (r *Runsc) Start(context context.Context, id string, cio runc.IO) error { + cmd := r.command(context, "start", id) + if cio != nil { + cio.Set(cmd) + } + + if cmd.Stdout == nil && cmd.Stderr == nil { + data, err := cmdOutput(cmd, true) + if err != nil { + return fmt.Errorf("%s: %s", err, data) + } + return nil + } + + ec, err := Monitor.Start(cmd) + if err != nil { + return err + } + if cio != nil { + if c, ok := cio.(runc.StartCloser); ok { + if err := c.CloseAfterStart(); err != nil { + return err + } + } + } + status, err := Monitor.Wait(cmd, ec) + if err == nil && status != 0 { + err = fmt.Errorf("%s did not terminate sucessfully", cmd.Args[0]) + } + + return err +} + +type waitResult struct { + ID string `json:"id"` + ExitStatus int `json:"exitStatus"` +} + +// Wait will wait for a running container, and return its exit status. +// +// TODO(random-liu): Add exec process support. +func (r *Runsc) Wait(context context.Context, id string) (int, error) { + data, err := cmdOutput(r.command(context, "wait", id), true) + if err != nil { + return 0, fmt.Errorf("%s: %s", err, data) + } + var res waitResult + if err := json.Unmarshal(data, &res); err != nil { + return 0, err + } + return res.ExitStatus, nil +} + +type ExecOpts struct { + runc.IO + PidFile string + InternalPidFile string + ConsoleSocket runc.ConsoleSocket + Detach bool +} + +func (o *ExecOpts) args() (out []string, err error) { + if o.ConsoleSocket != nil { + out = append(out, "--console-socket", o.ConsoleSocket.Path()) + } + if o.Detach { + out = append(out, "--detach") + } + if o.PidFile != "" { + abs, err := filepath.Abs(o.PidFile) + if err != nil { + return nil, err + } + out = append(out, "--pid-file", abs) + } + if o.InternalPidFile != "" { + abs, err := filepath.Abs(o.InternalPidFile) + if err != nil { + return nil, err + } + out = append(out, "--internal-pid-file", abs) + } + return out, nil +} + +// Exec executes an additional process inside the container based on a full OCI +// Process specification. +func (r *Runsc) Exec(context context.Context, id string, spec specs.Process, opts *ExecOpts) error { + f, err := ioutil.TempFile(os.Getenv("XDG_RUNTIME_DIR"), "runsc-process") + if err != nil { + return err + } + defer os.Remove(f.Name()) + err = json.NewEncoder(f).Encode(spec) + f.Close() + if err != nil { + return err + } + args := []string{"exec", "--process", f.Name()} + if opts != nil { + oargs, err := opts.args() + if err != nil { + return err + } + args = append(args, oargs...) + } + cmd := r.command(context, append(args, id)...) + if opts != nil && opts.IO != nil { + opts.Set(cmd) + } + if cmd.Stdout == nil && cmd.Stderr == nil { + data, err := cmdOutput(cmd, true) + if err != nil { + return fmt.Errorf("%s: %s", err, data) + } + return nil + } + ec, err := Monitor.Start(cmd) + if err != nil { + return err + } + if opts != nil && opts.IO != nil { + if c, ok := opts.IO.(runc.StartCloser); ok { + if err := c.CloseAfterStart(); err != nil { + return err + } + } + } + status, err := Monitor.Wait(cmd, ec) + if err == nil && status != 0 { + err = fmt.Errorf("%s did not terminate sucessfully", cmd.Args[0]) + } + return err +} + +// Run runs the create, start, delete lifecycle of the container and returns +// its exit status after it has exited. +func (r *Runsc) Run(context context.Context, id, bundle string, opts *CreateOpts) (int, error) { + args := []string{"run", "--bundle", bundle} + if opts != nil { + oargs, err := opts.args() + if err != nil { + return -1, err + } + args = append(args, oargs...) + } + cmd := r.command(context, append(args, id)...) + if opts != nil && opts.IO != nil { + opts.Set(cmd) + } + ec, err := Monitor.Start(cmd) + if err != nil { + return -1, err + } + return Monitor.Wait(cmd, ec) +} + +type DeleteOpts struct { + Force bool +} + +func (o *DeleteOpts) args() (out []string) { + if o.Force { + out = append(out, "--force") + } + return out +} + +// Delete deletes the container. +func (r *Runsc) Delete(context context.Context, id string, opts *DeleteOpts) error { + args := []string{"delete"} + if opts != nil { + args = append(args, opts.args()...) + } + return r.runOrError(r.command(context, append(args, id)...)) +} + +// KillOpts specifies options for killing a container and its processes. +type KillOpts struct { + All bool + Pid int +} + +func (o *KillOpts) args() (out []string) { + if o.All { + out = append(out, "--all") + } + if o.Pid != 0 { + out = append(out, "--pid", strconv.Itoa(o.Pid)) + } + return out +} + +// Kill sends the specified signal to the container. +func (r *Runsc) Kill(context context.Context, id string, sig int, opts *KillOpts) error { + args := []string{ + "kill", + } + if opts != nil { + args = append(args, opts.args()...) + } + return r.runOrError(r.command(context, append(args, id, strconv.Itoa(sig))...)) +} + +// Stats return the stats for a container like cpu, memory, and I/O. +func (r *Runsc) Stats(context context.Context, id string) (*runc.Stats, error) { + cmd := r.command(context, "events", "--stats", id) + rd, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + ec, err := Monitor.Start(cmd) + if err != nil { + return nil, err + } + defer func() { + rd.Close() + Monitor.Wait(cmd, ec) + }() + var e runc.Event + if err := json.NewDecoder(rd).Decode(&e); err != nil { + return nil, err + } + return e.Stats, nil +} + +// Events returns an event stream from runsc for a container with stats and OOM notifications. +func (r *Runsc) Events(context context.Context, id string, interval time.Duration) (chan *runc.Event, error) { + cmd := r.command(context, "events", fmt.Sprintf("--interval=%ds", int(interval.Seconds())), id) + rd, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + ec, err := Monitor.Start(cmd) + if err != nil { + rd.Close() + return nil, err + } + var ( + dec = json.NewDecoder(rd) + c = make(chan *runc.Event, 128) + ) + go func() { + defer func() { + close(c) + rd.Close() + Monitor.Wait(cmd, ec) + }() + for { + var e runc.Event + if err := dec.Decode(&e); err != nil { + if err == io.EOF { + return + } + e = runc.Event{ + Type: "error", + Err: err, + } + } + c <- &e + } + }() + return c, nil +} + +// Ps lists all the processes inside the container returning their pids. +func (r *Runsc) Ps(context context.Context, id string) ([]int, error) { + data, err := cmdOutput(r.command(context, "ps", "--format", "json", id), true) + if err != nil { + return nil, fmt.Errorf("%s: %s", err, data) + } + var pids []int + if err := json.Unmarshal(data, &pids); err != nil { + return nil, err + } + return pids, nil +} + +// Top lists all the processes inside the container returning the full ps data. +func (r *Runsc) Top(context context.Context, id string) (*runc.TopResults, error) { + data, err := cmdOutput(r.command(context, "ps", "--format", "table", id), true) + if err != nil { + return nil, fmt.Errorf("%s: %s", err, data) + } + + topResults, err := runc.ParsePSOutput(data) + if err != nil { + return nil, fmt.Errorf("%s: ", err) + } + return topResults, nil +} + +func (r *Runsc) args() []string { + var args []string + if r.Root != "" { + args = append(args, fmt.Sprintf("--root=%s", r.Root)) + } + if r.Log != "" { + args = append(args, fmt.Sprintf("--log=%s", r.Log)) + } + if r.LogFormat != "" { + args = append(args, fmt.Sprintf("--log-format=%s", r.LogFormat)) + } + for k, v := range r.Config { + args = append(args, fmt.Sprintf("--%s=%s", k, v)) + } + return args +} + +// runOrError will run the provided command. +// +// If an error is encountered and neither Stdout or Stderr was set the error +// will be returned in the format of <error>: <stderr>. +func (r *Runsc) runOrError(cmd *exec.Cmd) error { + if cmd.Stdout != nil || cmd.Stderr != nil { + ec, err := Monitor.Start(cmd) + if err != nil { + return err + } + status, err := Monitor.Wait(cmd, ec) + if err == nil && status != 0 { + err = fmt.Errorf("%s did not terminate sucessfully", cmd.Args[0]) + } + return err + } + data, err := cmdOutput(cmd, true) + if err != nil { + return fmt.Errorf("%s: %s", err, data) + } + return nil +} + +func (r *Runsc) command(context context.Context, args ...string) *exec.Cmd { + command := r.Command + if command == "" { + command = DefaultCommand + } + cmd := exec.CommandContext(context, command, append(r.args(), args...)...) + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: r.Setpgid, + } + if r.PdeathSignal != 0 { + cmd.SysProcAttr.Pdeathsig = r.PdeathSignal + } + + return cmd +} + +func cmdOutput(cmd *exec.Cmd, combined bool) ([]byte, error) { + b := getBuf() + defer putBuf(b) + + cmd.Stdout = b + if combined { + cmd.Stderr = b + } + ec, err := Monitor.Start(cmd) + if err != nil { + return nil, err + } + + status, err := Monitor.Wait(cmd, ec) + if err == nil && status != 0 { + err = fmt.Errorf("%s did not terminate sucessfully", cmd.Args[0]) + } + + return b.Bytes(), err +} diff --git a/pkg/shim/runsc/utils.go b/pkg/shim/runsc/utils.go new file mode 100644 index 000000000..c514b3bc7 --- /dev/null +++ b/pkg/shim/runsc/utils.go @@ -0,0 +1,44 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runsc + +import ( + "bytes" + "strings" + "sync" +) + +var bytesBufferPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(nil) + }, +} + +func getBuf() *bytes.Buffer { + return bytesBufferPool.Get().(*bytes.Buffer) +} + +func putBuf(b *bytes.Buffer) { + b.Reset() + bytesBufferPool.Put(b) +} + +// FormatLogPath parses runsc config, and fill in %ID% in the log path. +func FormatLogPath(id string, config map[string]string) { + if path, ok := config["debug-log"]; ok { + config["debug-log"] = strings.Replace(path, "%ID%", id, -1) + } +} diff --git a/pkg/shim/v1/proc/BUILD b/pkg/shim/v1/proc/BUILD new file mode 100644 index 000000000..4377306af --- /dev/null +++ b/pkg/shim/v1/proc/BUILD @@ -0,0 +1,36 @@ +load("//tools:defs.bzl", "go_library") + +package(licenses = ["notice"]) + +go_library( + name = "proc", + srcs = [ + "deleted_state.go", + "exec.go", + "exec_state.go", + "init.go", + "init_state.go", + "io.go", + "process.go", + "types.go", + "utils.go", + ], + visibility = [ + "//pkg/shim:__subpackages__", + "//shim:__subpackages__", + ], + deps = [ + "//pkg/shim/runsc", + "@com_github_containerd_console//:go_default_library", + "@com_github_containerd_containerd//errdefs:go_default_library", + "@com_github_containerd_containerd//log:go_default_library", + "@com_github_containerd_containerd//mount:go_default_library", + "@com_github_containerd_containerd//pkg/process:go_default_library", + "@com_github_containerd_containerd//pkg/stdio:go_default_library", + "@com_github_containerd_fifo//:go_default_library", + "@com_github_containerd_go_runc//:go_default_library", + "@com_github_gogo_protobuf//types:go_default_library", + "@com_github_opencontainers_runtime_spec//specs-go:go_default_library", + "@org_golang_x_sys//unix:go_default_library", + ], +) diff --git a/pkg/shim/v1/proc/deleted_state.go b/pkg/shim/v1/proc/deleted_state.go new file mode 100644 index 000000000..d9b970c4d --- /dev/null +++ b/pkg/shim/v1/proc/deleted_state.go @@ -0,0 +1,49 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proc + +import ( + "context" + "fmt" + + "github.com/containerd/console" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/pkg/process" +) + +type deletedState struct{} + +func (*deletedState) Resize(ws console.WinSize) error { + return fmt.Errorf("cannot resize a deleted process.ss") +} + +func (*deletedState) Start(ctx context.Context) error { + return fmt.Errorf("cannot start a deleted process.ss") +} + +func (*deletedState) Delete(ctx context.Context) error { + return fmt.Errorf("cannot delete a deleted process.ss: %w", errdefs.ErrNotFound) +} + +func (*deletedState) Kill(ctx context.Context, sig uint32, all bool) error { + return fmt.Errorf("cannot kill a deleted process.ss: %w", errdefs.ErrNotFound) +} + +func (*deletedState) SetExited(status int) {} + +func (*deletedState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { + return nil, fmt.Errorf("cannot exec in a deleted state") +} diff --git a/pkg/shim/v1/proc/exec.go b/pkg/shim/v1/proc/exec.go new file mode 100644 index 000000000..1d1d90488 --- /dev/null +++ b/pkg/shim/v1/proc/exec.go @@ -0,0 +1,281 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proc + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "sync" + "syscall" + "time" + + "github.com/containerd/console" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/pkg/stdio" + "github.com/containerd/fifo" + runc "github.com/containerd/go-runc" + specs "github.com/opencontainers/runtime-spec/specs-go" + "golang.org/x/sys/unix" + + "gvisor.dev/gvisor/pkg/shim/runsc" +) + +type execProcess struct { + wg sync.WaitGroup + + execState execState + + mu sync.Mutex + id string + console console.Console + io runc.IO + status int + exited time.Time + pid int + internalPid int + closers []io.Closer + stdin io.Closer + stdio stdio.Stdio + path string + spec specs.Process + + parent *Init + waitBlock chan struct{} +} + +func (e *execProcess) Wait() { + <-e.waitBlock +} + +func (e *execProcess) ID() string { + return e.id +} + +func (e *execProcess) Pid() int { + e.mu.Lock() + defer e.mu.Unlock() + return e.pid +} + +func (e *execProcess) ExitStatus() int { + e.mu.Lock() + defer e.mu.Unlock() + return e.status +} + +func (e *execProcess) ExitedAt() time.Time { + e.mu.Lock() + defer e.mu.Unlock() + return e.exited +} + +func (e *execProcess) SetExited(status int) { + e.mu.Lock() + defer e.mu.Unlock() + + e.execState.SetExited(status) +} + +func (e *execProcess) setExited(status int) { + e.status = status + e.exited = time.Now() + e.parent.Platform.ShutdownConsole(context.Background(), e.console) + close(e.waitBlock) +} + +func (e *execProcess) Delete(ctx context.Context) error { + e.mu.Lock() + defer e.mu.Unlock() + + return e.execState.Delete(ctx) +} + +func (e *execProcess) delete(ctx context.Context) error { + e.wg.Wait() + if e.io != nil { + for _, c := range e.closers { + c.Close() + } + e.io.Close() + } + pidfile := filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id)) + // silently ignore error + os.Remove(pidfile) + internalPidfile := filepath.Join(e.path, fmt.Sprintf("%s-internal.pid", e.id)) + // silently ignore error + os.Remove(internalPidfile) + return nil +} + +func (e *execProcess) Resize(ws console.WinSize) error { + e.mu.Lock() + defer e.mu.Unlock() + + return e.execState.Resize(ws) +} + +func (e *execProcess) resize(ws console.WinSize) error { + if e.console == nil { + return nil + } + return e.console.Resize(ws) +} + +func (e *execProcess) Kill(ctx context.Context, sig uint32, _ bool) error { + e.mu.Lock() + defer e.mu.Unlock() + + return e.execState.Kill(ctx, sig, false) +} + +func (e *execProcess) kill(ctx context.Context, sig uint32, _ bool) error { + internalPid := e.internalPid + if internalPid != 0 { + if err := e.parent.runtime.Kill(ctx, e.parent.id, int(sig), &runsc.KillOpts{ + Pid: internalPid, + }); err != nil { + // If this returns error, consider the process has + // already stopped. + // + // TODO: Fix after signal handling is fixed. + return fmt.Errorf("%s: %w", err.Error(), errdefs.ErrNotFound) + } + } + return nil +} + +func (e *execProcess) Stdin() io.Closer { + return e.stdin +} + +func (e *execProcess) Stdio() stdio.Stdio { + return e.stdio +} + +func (e *execProcess) Start(ctx context.Context) error { + e.mu.Lock() + defer e.mu.Unlock() + + return e.execState.Start(ctx) +} + +func (e *execProcess) start(ctx context.Context) (err error) { + var ( + socket *runc.Socket + pidfile = filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id)) + internalPidfile = filepath.Join(e.path, fmt.Sprintf("%s-internal.pid", e.id)) + ) + if e.stdio.Terminal { + if socket, err = runc.NewTempConsoleSocket(); err != nil { + return fmt.Errorf("failed to create runc console socket: %w", err) + } + defer socket.Close() + } else if e.stdio.IsNull() { + if e.io, err = runc.NewNullIO(); err != nil { + return fmt.Errorf("creating new NULL IO: %w", err) + } + } else { + if e.io, err = runc.NewPipeIO(e.parent.IoUID, e.parent.IoGID, withConditionalIO(e.stdio)); err != nil { + return fmt.Errorf("failed to create runc io pipes: %w", err) + } + } + opts := &runsc.ExecOpts{ + PidFile: pidfile, + InternalPidFile: internalPidfile, + IO: e.io, + Detach: true, + } + if socket != nil { + opts.ConsoleSocket = socket + } + eventCh := e.parent.Monitor.Subscribe() + defer func() { + // Unsubscribe if an error is returned. + if err != nil { + e.parent.Monitor.Unsubscribe(eventCh) + } + }() + if err := e.parent.runtime.Exec(ctx, e.parent.id, e.spec, opts); err != nil { + close(e.waitBlock) + return e.parent.runtimeError(err, "OCI runtime exec failed") + } + if e.stdio.Stdin != "" { + sc, err := fifo.OpenFifo(context.Background(), e.stdio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) + if err != nil { + return fmt.Errorf("failed to open stdin fifo %s: %w", e.stdio.Stdin, err) + } + e.closers = append(e.closers, sc) + e.stdin = sc + } + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if socket != nil { + console, err := socket.ReceiveMaster() + if err != nil { + return fmt.Errorf("failed to retrieve console master: %w", err) + } + if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil { + return fmt.Errorf("failed to start console copy: %w", err) + } + } else if !e.stdio.IsNull() { + if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil { + return fmt.Errorf("failed to start io pipe copy: %w", err) + } + } + pid, err := runc.ReadPidFile(opts.PidFile) + if err != nil { + return fmt.Errorf("failed to retrieve OCI runtime exec pid: %w", err) + } + e.pid = pid + internalPid, err := runc.ReadPidFile(opts.InternalPidFile) + if err != nil { + return fmt.Errorf("failed to retrieve OCI runtime exec internal pid: %w", err) + } + e.internalPid = internalPid + go func() { + defer e.parent.Monitor.Unsubscribe(eventCh) + for event := range eventCh { + if event.Pid == e.pid { + ExitCh <- Exit{ + Timestamp: event.Timestamp, + ID: e.id, + Status: event.Status, + } + break + } + } + }() + return nil +} + +func (e *execProcess) Status(ctx context.Context) (string, error) { + e.mu.Lock() + defer e.mu.Unlock() + // if we don't have a pid then the exec process has just been created + if e.pid == 0 { + return "created", nil + } + // if we have a pid and it can be signaled, the process is running + // TODO(random-liu): Use `runsc kill --pid`. + if err := unix.Kill(e.pid, 0); err == nil { + return "running", nil + } + // else if we have a pid but it can nolonger be signaled, it has stopped + return "stopped", nil +} diff --git a/pkg/shim/v1/proc/exec_state.go b/pkg/shim/v1/proc/exec_state.go new file mode 100644 index 000000000..4dcda8b44 --- /dev/null +++ b/pkg/shim/v1/proc/exec_state.go @@ -0,0 +1,154 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proc + +import ( + "context" + "fmt" + + "github.com/containerd/console" +) + +type execState interface { + Resize(console.WinSize) error + Start(context.Context) error + Delete(context.Context) error + Kill(context.Context, uint32, bool) error + SetExited(int) +} + +type execCreatedState struct { + p *execProcess +} + +func (s *execCreatedState) transition(name string) error { + switch name { + case "running": + s.p.execState = &execRunningState{p: s.p} + case "stopped": + s.p.execState = &execStoppedState{p: s.p} + case "deleted": + s.p.execState = &deletedState{} + default: + return fmt.Errorf("invalid state transition %q to %q", stateName(s), name) + } + return nil +} + +func (s *execCreatedState) Resize(ws console.WinSize) error { + return s.p.resize(ws) +} + +func (s *execCreatedState) Start(ctx context.Context) error { + if err := s.p.start(ctx); err != nil { + return err + } + return s.transition("running") +} + +func (s *execCreatedState) Delete(ctx context.Context) error { + if err := s.p.delete(ctx); err != nil { + return err + } + return s.transition("deleted") +} + +func (s *execCreatedState) Kill(ctx context.Context, sig uint32, all bool) error { + return s.p.kill(ctx, sig, all) +} + +func (s *execCreatedState) SetExited(status int) { + s.p.setExited(status) + + if err := s.transition("stopped"); err != nil { + panic(err) + } +} + +type execRunningState struct { + p *execProcess +} + +func (s *execRunningState) transition(name string) error { + switch name { + case "stopped": + s.p.execState = &execStoppedState{p: s.p} + default: + return fmt.Errorf("invalid state transition %q to %q", stateName(s), name) + } + return nil +} + +func (s *execRunningState) Resize(ws console.WinSize) error { + return s.p.resize(ws) +} + +func (s *execRunningState) Start(ctx context.Context) error { + return fmt.Errorf("cannot start a running process") +} + +func (s *execRunningState) Delete(ctx context.Context) error { + return fmt.Errorf("cannot delete a running process") +} + +func (s *execRunningState) Kill(ctx context.Context, sig uint32, all bool) error { + return s.p.kill(ctx, sig, all) +} + +func (s *execRunningState) SetExited(status int) { + s.p.setExited(status) + + if err := s.transition("stopped"); err != nil { + panic(err) + } +} + +type execStoppedState struct { + p *execProcess +} + +func (s *execStoppedState) transition(name string) error { + switch name { + case "deleted": + s.p.execState = &deletedState{} + default: + return fmt.Errorf("invalid state transition %q to %q", stateName(s), name) + } + return nil +} + +func (s *execStoppedState) Resize(ws console.WinSize) error { + return fmt.Errorf("cannot resize a stopped container") +} + +func (s *execStoppedState) Start(ctx context.Context) error { + return fmt.Errorf("cannot start a stopped process") +} + +func (s *execStoppedState) Delete(ctx context.Context) error { + if err := s.p.delete(ctx); err != nil { + return err + } + return s.transition("deleted") +} + +func (s *execStoppedState) Kill(ctx context.Context, sig uint32, all bool) error { + return s.p.kill(ctx, sig, all) +} + +func (s *execStoppedState) SetExited(status int) { + // no op +} diff --git a/pkg/shim/v1/proc/init.go b/pkg/shim/v1/proc/init.go new file mode 100644 index 000000000..dab3123d6 --- /dev/null +++ b/pkg/shim/v1/proc/init.go @@ -0,0 +1,460 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proc + +import ( + "context" + "encoding/json" + "fmt" + "io" + "path/filepath" + "strings" + "sync" + "syscall" + "time" + + "github.com/containerd/console" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" + "github.com/containerd/fifo" + runc "github.com/containerd/go-runc" + specs "github.com/opencontainers/runtime-spec/specs-go" + + "gvisor.dev/gvisor/pkg/shim/runsc" +) + +// InitPidFile name of the file that contains the init pid. +const InitPidFile = "init.pid" + +// Init represents an initial process for a container. +type Init struct { + wg sync.WaitGroup + initState initState + + // mu is used to ensure that `Start()` and `Exited()` calls return in + // the right order when invoked in separate go routines. This is the + // case within the shim implementation as it makes use of the reaper + // interface. + mu sync.Mutex + + waitBlock chan struct{} + + WorkDir string + + id string + Bundle string + console console.Console + Platform stdio.Platform + io runc.IO + runtime *runsc.Runsc + status int + exited time.Time + pid int + closers []io.Closer + stdin io.Closer + stdio stdio.Stdio + Rootfs string + IoUID int + IoGID int + Sandbox bool + UserLog string + Monitor ProcessMonitor +} + +// NewRunsc returns a new runsc instance for a process. +func NewRunsc(root, path, namespace, runtime string, config map[string]string) *runsc.Runsc { + if root == "" { + root = RunscRoot + } + return &runsc.Runsc{ + Command: runtime, + PdeathSignal: syscall.SIGKILL, + Log: filepath.Join(path, "log.json"), + LogFormat: runc.JSON, + Root: filepath.Join(root, namespace), + Config: config, + } +} + +// New returns a new init process. +func New(id string, runtime *runsc.Runsc, stdio stdio.Stdio) *Init { + p := &Init{ + id: id, + runtime: runtime, + stdio: stdio, + status: 0, + waitBlock: make(chan struct{}), + } + p.initState = &createdState{p: p} + return p +} + +// Create the process with the provided config. +func (p *Init) Create(ctx context.Context, r *CreateConfig) (err error) { + var socket *runc.Socket + if r.Terminal { + if socket, err = runc.NewTempConsoleSocket(); err != nil { + return fmt.Errorf("failed to create OCI runtime console socket: %w", err) + } + defer socket.Close() + } else if hasNoIO(r) { + if p.io, err = runc.NewNullIO(); err != nil { + return fmt.Errorf("creating new NULL IO: %w", err) + } + } else { + if p.io, err = runc.NewPipeIO(p.IoUID, p.IoGID, withConditionalIO(p.stdio)); err != nil { + return fmt.Errorf("failed to create OCI runtime io pipes: %w", err) + } + } + pidFile := filepath.Join(p.Bundle, InitPidFile) + opts := &runsc.CreateOpts{ + PidFile: pidFile, + } + if socket != nil { + opts.ConsoleSocket = socket + } + if p.Sandbox { + opts.IO = p.io + // UserLog is only useful for sandbox. + opts.UserLog = p.UserLog + } + if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil { + return p.runtimeError(err, "OCI runtime create failed") + } + if r.Stdin != "" { + sc, err := fifo.OpenFifo(context.Background(), r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) + if err != nil { + return fmt.Errorf("failed to open stdin fifo %s: %w", r.Stdin, err) + } + p.stdin = sc + p.closers = append(p.closers, sc) + } + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if socket != nil { + console, err := socket.ReceiveMaster() + if err != nil { + return fmt.Errorf("failed to retrieve console master: %w", err) + } + console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg) + if err != nil { + return fmt.Errorf("failed to start console copy: %w", err) + } + p.console = console + } else if !hasNoIO(r) { + if err := copyPipes(ctx, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg); err != nil { + return fmt.Errorf("failed to start io pipe copy: %w", err) + } + } + pid, err := runc.ReadPidFile(pidFile) + if err != nil { + return fmt.Errorf("failed to retrieve OCI runtime container pid: %w", err) + } + p.pid = pid + return nil +} + +// Wait waits for the process to exit. +func (p *Init) Wait() { + <-p.waitBlock +} + +// ID returns the ID of the process. +func (p *Init) ID() string { + return p.id +} + +// Pid returns the PID of the process. +func (p *Init) Pid() int { + return p.pid +} + +// ExitStatus returns the exit status of the process. +func (p *Init) ExitStatus() int { + p.mu.Lock() + defer p.mu.Unlock() + return p.status +} + +// ExitedAt returns the time when the process exited. +func (p *Init) ExitedAt() time.Time { + p.mu.Lock() + defer p.mu.Unlock() + return p.exited +} + +// Status returns the status of the process. +func (p *Init) Status(ctx context.Context) (string, error) { + p.mu.Lock() + defer p.mu.Unlock() + c, err := p.runtime.State(ctx, p.id) + if err != nil { + if strings.Contains(err.Error(), "does not exist") { + return "stopped", nil + } + return "", p.runtimeError(err, "OCI runtime state failed") + } + return p.convertStatus(c.Status), nil +} + +// Start starts the init process. +func (p *Init) Start(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + return p.initState.Start(ctx) +} + +func (p *Init) start(ctx context.Context) error { + var cio runc.IO + if !p.Sandbox { + cio = p.io + } + if err := p.runtime.Start(ctx, p.id, cio); err != nil { + return p.runtimeError(err, "OCI runtime start failed") + } + go func() { + status, err := p.runtime.Wait(context.Background(), p.id) + if err != nil { + log.G(ctx).WithError(err).Errorf("Failed to wait for container %q", p.id) + // TODO(random-liu): Handle runsc kill error. + if err := p.killAll(ctx); err != nil { + log.G(ctx).WithError(err).Errorf("Failed to kill container %q", p.id) + } + status = internalErrorCode + } + ExitCh <- Exit{ + Timestamp: time.Now(), + ID: p.id, + Status: status, + } + }() + return nil +} + +// SetExited set the exit stauts of the init process. +func (p *Init) SetExited(status int) { + p.mu.Lock() + defer p.mu.Unlock() + + p.initState.SetExited(status) +} + +func (p *Init) setExited(status int) { + p.exited = time.Now() + p.status = status + p.Platform.ShutdownConsole(context.Background(), p.console) + close(p.waitBlock) +} + +// Delete deletes the init process. +func (p *Init) Delete(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + return p.initState.Delete(ctx) +} + +func (p *Init) delete(ctx context.Context) error { + p.killAll(ctx) + p.wg.Wait() + err := p.runtime.Delete(ctx, p.id, nil) + // ignore errors if a runtime has already deleted the process + // but we still hold metadata and pipes + // + // this is common during a checkpoint, runc will delete the container state + // after a checkpoint and the container will no longer exist within runc + if err != nil { + if strings.Contains(err.Error(), "does not exist") { + err = nil + } else { + err = p.runtimeError(err, "failed to delete task") + } + } + if p.io != nil { + for _, c := range p.closers { + c.Close() + } + p.io.Close() + } + if err2 := mount.UnmountAll(p.Rootfs, 0); err2 != nil { + log.G(ctx).WithError(err2).Warn("failed to cleanup rootfs mount") + if err == nil { + err = fmt.Errorf("failed rootfs umount: %w", err2) + } + } + return err +} + +// Resize resizes the init processes console. +func (p *Init) Resize(ws console.WinSize) error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.console == nil { + return nil + } + return p.console.Resize(ws) +} + +func (p *Init) resize(ws console.WinSize) error { + if p.console == nil { + return nil + } + return p.console.Resize(ws) +} + +// Kill kills the init process. +func (p *Init) Kill(ctx context.Context, signal uint32, all bool) error { + p.mu.Lock() + defer p.mu.Unlock() + + return p.initState.Kill(ctx, signal, all) +} + +func (p *Init) kill(context context.Context, signal uint32, all bool) error { + var ( + killErr error + backoff = 100 * time.Millisecond + ) + timeout := 1 * time.Second + for start := time.Now(); time.Now().Sub(start) < timeout; { + c, err := p.runtime.State(context, p.id) + if err != nil { + if strings.Contains(err.Error(), "does not exist") { + return fmt.Errorf("no such process: %w", errdefs.ErrNotFound) + } + return p.runtimeError(err, "OCI runtime state failed") + } + // For runsc, signal only works when container is running state. + // If the container is not in running state, directly return + // "no such process" + if p.convertStatus(c.Status) == "stopped" { + return fmt.Errorf("no such process: %w", errdefs.ErrNotFound) + } + killErr = p.runtime.Kill(context, p.id, int(signal), &runsc.KillOpts{ + All: all, + }) + if killErr == nil { + return nil + } + time.Sleep(backoff) + backoff *= 2 + } + return p.runtimeError(killErr, "kill timeout") +} + +// KillAll kills all processes belonging to the init process. +func (p *Init) KillAll(context context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + return p.killAll(context) +} + +func (p *Init) killAll(context context.Context) error { + p.runtime.Kill(context, p.id, int(syscall.SIGKILL), &runsc.KillOpts{ + All: true, + }) + // Ignore error handling for `runsc kill --all` for now. + // * If it doesn't return error, it is good; + // * If it returns error, consider the container has already stopped. + // TODO: Fix `runsc kill --all` error handling. + return nil +} + +// Stdin returns the stdin of the process. +func (p *Init) Stdin() io.Closer { + return p.stdin +} + +// Runtime returns the OCI runtime configured for the init process. +func (p *Init) Runtime() *runsc.Runsc { + return p.runtime +} + +// Exec returns a new child process. +func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { + p.mu.Lock() + defer p.mu.Unlock() + + return p.initState.Exec(ctx, path, r) +} + +// exec returns a new exec'd process. +func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { + // process exec request + var spec specs.Process + if err := json.Unmarshal(r.Spec.Value, &spec); err != nil { + return nil, err + } + spec.Terminal = r.Terminal + + e := &execProcess{ + id: r.ID, + path: path, + parent: p, + spec: spec, + stdio: stdio.Stdio{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }, + waitBlock: make(chan struct{}), + } + e.execState = &execCreatedState{p: e} + return e, nil +} + +// Stdio returns the stdio of the process. +func (p *Init) Stdio() stdio.Stdio { + return p.stdio +} + +func (p *Init) runtimeError(rErr error, msg string) error { + if rErr == nil { + return nil + } + + rMsg, err := getLastRuntimeError(p.runtime) + switch { + case err != nil: + return fmt.Errorf("%s: %w (unable to retrieve OCI runtime error: %v)", msg, rErr, err) + case rMsg == "": + return fmt.Errorf("%s: %w", msg, rErr) + default: + return fmt.Errorf("%s: %s", msg, rMsg) + } +} + +func (p *Init) convertStatus(status string) string { + if status == "created" && !p.Sandbox && p.status == internalErrorCode { + // Treat start failure state for non-root container as stopped. + return "stopped" + } + return status +} + +func withConditionalIO(c stdio.Stdio) runc.IOOpt { + return func(o *runc.IOOption) { + o.OpenStdin = c.Stdin != "" + o.OpenStdout = c.Stdout != "" + o.OpenStderr = c.Stderr != "" + } +} diff --git a/pkg/shim/v1/proc/init_state.go b/pkg/shim/v1/proc/init_state.go new file mode 100644 index 000000000..9233ecc85 --- /dev/null +++ b/pkg/shim/v1/proc/init_state.go @@ -0,0 +1,182 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proc + +import ( + "context" + "fmt" + + "github.com/containerd/console" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/pkg/process" +) + +type initState interface { + Resize(console.WinSize) error + Start(context.Context) error + Delete(context.Context) error + Exec(context.Context, string, *ExecConfig) (process.Process, error) + Kill(context.Context, uint32, bool) error + SetExited(int) +} + +type createdState struct { + p *Init +} + +func (s *createdState) transition(name string) error { + switch name { + case "running": + s.p.initState = &runningState{p: s.p} + case "stopped": + s.p.initState = &stoppedState{p: s.p} + case "deleted": + s.p.initState = &deletedState{} + default: + return fmt.Errorf("invalid state transition %q to %q", stateName(s), name) + } + return nil +} + +func (s *createdState) Resize(ws console.WinSize) error { + return s.p.resize(ws) +} + +func (s *createdState) Start(ctx context.Context) error { + if err := s.p.start(ctx); err != nil { + // Containerd doesn't allow deleting container in created state. + // However, for gvisor, a non-root container in created state can + // only go to running state. If the container can't be started, + // it can only stay in created state, and never be deleted. + // To work around that, we treat non-root container in start failure + // state as stopped. + if !s.p.Sandbox { + s.p.io.Close() + s.p.setExited(internalErrorCode) + if err := s.transition("stopped"); err != nil { + panic(err) + } + } + return err + } + return s.transition("running") +} + +func (s *createdState) Delete(ctx context.Context) error { + if err := s.p.delete(ctx); err != nil { + return err + } + return s.transition("deleted") +} + +func (s *createdState) Kill(ctx context.Context, sig uint32, all bool) error { + return s.p.kill(ctx, sig, all) +} + +func (s *createdState) SetExited(status int) { + s.p.setExited(status) + + if err := s.transition("stopped"); err != nil { + panic(err) + } +} + +func (s *createdState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { + return s.p.exec(ctx, path, r) +} + +type runningState struct { + p *Init +} + +func (s *runningState) transition(name string) error { + switch name { + case "stopped": + s.p.initState = &stoppedState{p: s.p} + default: + return fmt.Errorf("invalid state transition %q to %q", stateName(s), name) + } + return nil +} + +func (s *runningState) Resize(ws console.WinSize) error { + return s.p.resize(ws) +} + +func (s *runningState) Start(ctx context.Context) error { + return fmt.Errorf("cannot start a running process.ss") +} + +func (s *runningState) Delete(ctx context.Context) error { + return fmt.Errorf("cannot delete a running process.ss") +} + +func (s *runningState) Kill(ctx context.Context, sig uint32, all bool) error { + return s.p.kill(ctx, sig, all) +} + +func (s *runningState) SetExited(status int) { + s.p.setExited(status) + + if err := s.transition("stopped"); err != nil { + panic(err) + } +} + +func (s *runningState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { + return s.p.exec(ctx, path, r) +} + +type stoppedState struct { + p *Init +} + +func (s *stoppedState) transition(name string) error { + switch name { + case "deleted": + s.p.initState = &deletedState{} + default: + return fmt.Errorf("invalid state transition %q to %q", stateName(s), name) + } + return nil +} + +func (s *stoppedState) Resize(ws console.WinSize) error { + return fmt.Errorf("cannot resize a stopped container") +} + +func (s *stoppedState) Start(ctx context.Context) error { + return fmt.Errorf("cannot start a stopped process.ss") +} + +func (s *stoppedState) Delete(ctx context.Context) error { + if err := s.p.delete(ctx); err != nil { + return err + } + return s.transition("deleted") +} + +func (s *stoppedState) Kill(ctx context.Context, sig uint32, all bool) error { + return errdefs.ToGRPCf(errdefs.ErrNotFound, "process.ss %s not found", s.p.id) +} + +func (s *stoppedState) SetExited(status int) { + // no op +} + +func (s *stoppedState) Exec(ctx context.Context, path string, r *ExecConfig) (process.Process, error) { + return nil, fmt.Errorf("cannot exec in a stopped state") +} diff --git a/pkg/shim/v1/proc/io.go b/pkg/shim/v1/proc/io.go new file mode 100644 index 000000000..34d825fb7 --- /dev/null +++ b/pkg/shim/v1/proc/io.go @@ -0,0 +1,162 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proc + +import ( + "context" + "fmt" + "io" + "os" + "sync" + "sync/atomic" + "syscall" + + "github.com/containerd/containerd/log" + "github.com/containerd/fifo" + runc "github.com/containerd/go-runc" +) + +// TODO(random-liu): This file can be a util. + +var bufPool = sync.Pool{ + New: func() interface{} { + buffer := make([]byte, 32<<10) + return &buffer + }, +} + +func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg *sync.WaitGroup) error { + var sameFile *countingWriteCloser + for _, i := range []struct { + name string + dest func(wc io.WriteCloser, rc io.Closer) + }{ + { + name: stdout, + dest: func(wc io.WriteCloser, rc io.Closer) { + wg.Add(1) + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + if _, err := io.CopyBuffer(wc, rio.Stdout(), *p); err != nil { + log.G(ctx).Warn("error copying stdout") + } + wg.Done() + wc.Close() + if rc != nil { + rc.Close() + } + }() + }, + }, { + name: stderr, + dest: func(wc io.WriteCloser, rc io.Closer) { + wg.Add(1) + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + if _, err := io.CopyBuffer(wc, rio.Stderr(), *p); err != nil { + log.G(ctx).Warn("error copying stderr") + } + wg.Done() + wc.Close() + if rc != nil { + rc.Close() + } + }() + }, + }, + } { + ok, err := isFifo(i.name) + if err != nil { + return err + } + var ( + fw io.WriteCloser + fr io.Closer + ) + if ok { + if fw, err = fifo.OpenFifo(ctx, i.name, syscall.O_WRONLY, 0); err != nil { + return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", i.name, err) + } + if fr, err = fifo.OpenFifo(ctx, i.name, syscall.O_RDONLY, 0); err != nil { + return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", i.name, err) + } + } else { + if sameFile != nil { + sameFile.count++ + i.dest(sameFile, nil) + continue + } + if fw, err = os.OpenFile(i.name, syscall.O_WRONLY|syscall.O_APPEND, 0); err != nil { + return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", i.name, err) + } + if stdout == stderr { + sameFile = &countingWriteCloser{ + WriteCloser: fw, + count: 1, + } + } + } + i.dest(fw, fr) + } + if stdin == "" { + return nil + } + f, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) + if err != nil { + return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", stdin, err) + } + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + + io.CopyBuffer(rio.Stdin(), f, *p) + rio.Stdin().Close() + f.Close() + }() + return nil +} + +// countingWriteCloser masks io.Closer() until close has been invoked a certain number of times. +type countingWriteCloser struct { + io.WriteCloser + count int64 +} + +func (c *countingWriteCloser) Close() error { + if atomic.AddInt64(&c.count, -1) > 0 { + return nil + } + return c.WriteCloser.Close() +} + +// isFifo checks if a file is a fifo. +// +// If the file does not exist then it returns false. +func isFifo(path string) (bool, error) { + stat, err := os.Stat(path) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + if stat.Mode()&os.ModeNamedPipe == os.ModeNamedPipe { + return true, nil + } + return false, nil +} diff --git a/pkg/shim/v1/proc/process.go b/pkg/shim/v1/proc/process.go new file mode 100644 index 000000000..d462c3eef --- /dev/null +++ b/pkg/shim/v1/proc/process.go @@ -0,0 +1,37 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proc + +import ( + "fmt" +) + +// RunscRoot is the path to the root runsc state directory. +const RunscRoot = "/run/containerd/runsc" + +func stateName(v interface{}) string { + switch v.(type) { + case *runningState, *execRunningState: + return "running" + case *createdState, *execCreatedState: + return "created" + case *deletedState: + return "deleted" + case *stoppedState: + return "stopped" + } + panic(fmt.Errorf("invalid state %v", v)) +} diff --git a/pkg/shim/v1/proc/types.go b/pkg/shim/v1/proc/types.go new file mode 100644 index 000000000..2b0df4663 --- /dev/null +++ b/pkg/shim/v1/proc/types.go @@ -0,0 +1,69 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proc + +import ( + "time" + + runc "github.com/containerd/go-runc" + "github.com/gogo/protobuf/types" +) + +// Mount holds filesystem mount configuration. +type Mount struct { + Type string + Source string + Target string + Options []string +} + +// CreateConfig hold task creation configuration. +type CreateConfig struct { + ID string + Bundle string + Runtime string + Rootfs []Mount + Terminal bool + Stdin string + Stdout string + Stderr string + Options *types.Any +} + +// ExecConfig holds exec creation configuration. +type ExecConfig struct { + ID string + Terminal bool + Stdin string + Stdout string + Stderr string + Spec *types.Any +} + +// Exit is the type of exit events. +type Exit struct { + Timestamp time.Time + ID string + Status int +} + +// ProcessMonitor monitors process exit changes. +type ProcessMonitor interface { + // Subscribe to process exit changes + Subscribe() chan runc.Exit + // Unsubscribe to process exit changes + Unsubscribe(c chan runc.Exit) +} diff --git a/pkg/shim/v1/proc/utils.go b/pkg/shim/v1/proc/utils.go new file mode 100644 index 000000000..716de2f59 --- /dev/null +++ b/pkg/shim/v1/proc/utils.go @@ -0,0 +1,90 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proc + +import ( + "encoding/json" + "io" + "os" + "strings" + "time" + + "gvisor.dev/gvisor/pkg/shim/runsc" +) + +const ( + internalErrorCode = 128 + bufferSize = 32 +) + +// ExitCh is the exit events channel for containers and exec processes +// inside the sandbox. +var ExitCh = make(chan Exit, bufferSize) + +// TODO(mlaventure): move to runc package? +func getLastRuntimeError(r *runsc.Runsc) (string, error) { + if r.Log == "" { + return "", nil + } + + f, err := os.OpenFile(r.Log, os.O_RDONLY, 0400) + if err != nil { + return "", err + } + + var ( + errMsg string + log struct { + Level string + Msg string + Time time.Time + } + ) + + dec := json.NewDecoder(f) + for err = nil; err == nil; { + if err = dec.Decode(&log); err != nil && err != io.EOF { + return "", err + } + if log.Level == "error" { + errMsg = strings.TrimSpace(log.Msg) + } + } + + return errMsg, nil +} + +func copyFile(to, from string) error { + ff, err := os.Open(from) + if err != nil { + return err + } + defer ff.Close() + tt, err := os.Create(to) + if err != nil { + return err + } + defer tt.Close() + + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + _, err = io.CopyBuffer(tt, ff, *p) + return err +} + +func hasNoIO(r *CreateConfig) bool { + return r.Stdin == "" && r.Stdout == "" && r.Stderr == "" +} diff --git a/pkg/shim/v1/shim/BUILD b/pkg/shim/v1/shim/BUILD new file mode 100644 index 000000000..05c595bc9 --- /dev/null +++ b/pkg/shim/v1/shim/BUILD @@ -0,0 +1,40 @@ +load("//tools:defs.bzl", "go_library") + +package(licenses = ["notice"]) + +go_library( + name = "shim", + srcs = [ + "api.go", + "platform.go", + "service.go", + ], + visibility = [ + "//pkg/shim:__subpackages__", + "//shim:__subpackages__", + ], + deps = [ + "//pkg/shim/runsc", + "//pkg/shim/v1/proc", + "//pkg/shim/v1/utils", + "@com_github_containerd_console//:go_default_library", + "@com_github_containerd_containerd//api/events:go_default_library", + "@com_github_containerd_containerd//api/types/task:go_default_library", + "@com_github_containerd_containerd//errdefs:go_default_library", + "@com_github_containerd_containerd//events:go_default_library", + "@com_github_containerd_containerd//log:go_default_library", + "@com_github_containerd_containerd//mount:go_default_library", + "@com_github_containerd_containerd//namespaces:go_default_library", + "@com_github_containerd_containerd//pkg/process:go_default_library", + "@com_github_containerd_containerd//pkg/stdio:go_default_library", + "@com_github_containerd_containerd//runtime:go_default_library", + "@com_github_containerd_containerd//runtime/linux/runctypes:go_default_library", + "@com_github_containerd_containerd//runtime/v1/shim/v1:go_default_library", + "@com_github_containerd_containerd//sys/reaper:go_default_library", + "@com_github_containerd_fifo//:go_default_library", + "@com_github_containerd_typeurl//:go_default_library", + "@com_github_gogo_protobuf//types:go_default_library", + "@org_golang_google_grpc//codes:go_default_library", + "@org_golang_google_grpc//status:go_default_library", + ], +) diff --git a/pkg/shim/v1/shim/api.go b/pkg/shim/v1/shim/api.go new file mode 100644 index 000000000..5dd8ff172 --- /dev/null +++ b/pkg/shim/v1/shim/api.go @@ -0,0 +1,28 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shim + +import ( + "github.com/containerd/containerd/api/events" +) + +type TaskCreate = events.TaskCreate +type TaskStart = events.TaskStart +type TaskOOM = events.TaskOOM +type TaskExit = events.TaskExit +type TaskDelete = events.TaskDelete +type TaskExecAdded = events.TaskExecAdded +type TaskExecStarted = events.TaskExecStarted diff --git a/pkg/shim/v1/shim/platform.go b/pkg/shim/v1/shim/platform.go new file mode 100644 index 000000000..f590f80ef --- /dev/null +++ b/pkg/shim/v1/shim/platform.go @@ -0,0 +1,106 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shim + +import ( + "context" + "fmt" + "io" + "sync" + "syscall" + + "github.com/containerd/console" + "github.com/containerd/fifo" +) + +type linuxPlatform struct { + epoller *console.Epoller +} + +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { + if p.epoller == nil { + return nil, fmt.Errorf("uninitialized epoller") + } + + epollConsole, err := p.epoller.Add(console) + if err != nil { + return nil, err + } + + if stdin != "" { + in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0) + if err != nil { + return nil, err + } + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + io.CopyBuffer(epollConsole, in, *p) + }() + } + + outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0) + if err != nil { + return nil, err + } + outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0) + if err != nil { + return nil, err + } + wg.Add(1) + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + io.CopyBuffer(outw, epollConsole, *p) + epollConsole.Close() + outr.Close() + outw.Close() + wg.Done() + }() + return epollConsole, nil +} + +func (p *linuxPlatform) ShutdownConsole(ctx context.Context, cons console.Console) error { + if p.epoller == nil { + return fmt.Errorf("uninitialized epoller") + } + epollConsole, ok := cons.(*console.EpollConsole) + if !ok { + return fmt.Errorf("expected EpollConsole, got %#v", cons) + } + return epollConsole.Shutdown(p.epoller.CloseConsole) +} + +func (p *linuxPlatform) Close() error { + return p.epoller.Close() +} + +// initialize a single epoll fd to manage our consoles. `initPlatform` should +// only be called once. +func (s *Service) initPlatform() error { + if s.platform != nil { + return nil + } + epoller, err := console.NewEpoller() + if err != nil { + return fmt.Errorf("failed to initialize epoller: %w", err) + } + s.platform = &linuxPlatform{ + epoller: epoller, + } + go epoller.Wait() + return nil +} diff --git a/pkg/shim/v1/shim/service.go b/pkg/shim/v1/shim/service.go new file mode 100644 index 000000000..84a810cb2 --- /dev/null +++ b/pkg/shim/v1/shim/service.go @@ -0,0 +1,573 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shim + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/containerd/console" + "github.com/containerd/containerd/api/types/task" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" + "github.com/containerd/containerd/runtime" + "github.com/containerd/containerd/runtime/linux/runctypes" + shim "github.com/containerd/containerd/runtime/v1/shim/v1" + "github.com/containerd/containerd/sys/reaper" + "github.com/containerd/typeurl" + "github.com/gogo/protobuf/types" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "gvisor.dev/gvisor/pkg/shim/runsc" + "gvisor.dev/gvisor/pkg/shim/v1/proc" + "gvisor.dev/gvisor/pkg/shim/v1/utils" +) + +var ( + empty = &types.Empty{} + bufPool = sync.Pool{ + New: func() interface{} { + buffer := make([]byte, 32<<10) + return &buffer + }, + } +) + +// Config contains shim specific configuration. +type Config struct { + Path string + Namespace string + WorkDir string + RuntimeRoot string + RunscConfig map[string]string +} + +// NewService returns a new shim service that can be used via GRPC. +func NewService(config Config, publisher events.Publisher) (*Service, error) { + if config.Namespace == "" { + return nil, fmt.Errorf("shim namespace cannot be empty") + } + ctx := namespaces.WithNamespace(context.Background(), config.Namespace) + s := &Service{ + config: config, + context: ctx, + processes: make(map[string]process.Process), + events: make(chan interface{}, 128), + ec: proc.ExitCh, + } + go s.processExits() + if err := s.initPlatform(); err != nil { + return nil, fmt.Errorf("failed to initialized platform behavior: %w", err) + } + go s.forward(publisher) + return s, nil +} + +// Service is the shim implementation of a remote shim over GRPC. +type Service struct { + mu sync.Mutex + + config Config + context context.Context + processes map[string]process.Process + events chan interface{} + platform stdio.Platform + ec chan proc.Exit + + // Filled by Create() + id string + bundle string +} + +// Create creates a new initial process and container with the underlying OCI runtime. +func (s *Service) Create(ctx context.Context, r *shim.CreateTaskRequest) (_ *shim.CreateTaskResponse, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + var mounts []proc.Mount + for _, m := range r.Rootfs { + mounts = append(mounts, proc.Mount{ + Type: m.Type, + Source: m.Source, + Target: m.Target, + Options: m.Options, + }) + } + + rootfs := filepath.Join(r.Bundle, "rootfs") + if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) { + return nil, err + } + + config := &proc.CreateConfig{ + ID: r.ID, + Bundle: r.Bundle, + Runtime: r.Runtime, + Rootfs: mounts, + Terminal: r.Terminal, + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Options: r.Options, + } + defer func() { + if err != nil { + if err2 := mount.UnmountAll(rootfs, 0); err2 != nil { + log.G(ctx).WithError(err2).Warn("Failed to cleanup rootfs mount") + } + } + }() + for _, rm := range mounts { + m := &mount.Mount{ + Type: rm.Type, + Source: rm.Source, + Options: rm.Options, + } + if err := m.Mount(rootfs); err != nil { + return nil, fmt.Errorf("failed to mount rootfs component %v: %w", m, err) + } + } + process, err := newInit( + ctx, + s.config.Path, + s.config.WorkDir, + s.config.RuntimeRoot, + s.config.Namespace, + s.config.RunscConfig, + s.platform, + config, + ) + if err := process.Create(ctx, config); err != nil { + return nil, errdefs.ToGRPC(err) + } + // Save the main task id and bundle to the shim for additional + // requests. + s.id = r.ID + s.bundle = r.Bundle + pid := process.Pid() + s.processes[r.ID] = process + return &shim.CreateTaskResponse{ + Pid: uint32(pid), + }, nil +} + +// Start starts a process. +func (s *Service) Start(ctx context.Context, r *shim.StartRequest) (*shim.StartResponse, error) { + p, err := s.getExecProcess(r.ID) + if err != nil { + return nil, err + } + if err := p.Start(ctx); err != nil { + return nil, err + } + return &shim.StartResponse{ + ID: p.ID(), + Pid: uint32(p.Pid()), + }, nil +} + +// Delete deletes the initial process and container. +func (s *Service) Delete(ctx context.Context, r *types.Empty) (*shim.DeleteResponse, error) { + p, err := s.getInitProcess() + if err != nil { + return nil, err + } + if err := p.Delete(ctx); err != nil { + return nil, err + } + s.mu.Lock() + delete(s.processes, s.id) + s.mu.Unlock() + s.platform.Close() + return &shim.DeleteResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + Pid: uint32(p.Pid()), + }, nil +} + +// DeleteProcess deletes an exec'd process. +func (s *Service) DeleteProcess(ctx context.Context, r *shim.DeleteProcessRequest) (*shim.DeleteResponse, error) { + if r.ID == s.id { + return nil, status.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess") + } + p, err := s.getExecProcess(r.ID) + if err != nil { + return nil, err + } + if err := p.Delete(ctx); err != nil { + return nil, err + } + s.mu.Lock() + delete(s.processes, r.ID) + s.mu.Unlock() + return &shim.DeleteResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + Pid: uint32(p.Pid()), + }, nil +} + +// Exec spawns an additional process inside the container. +func (s *Service) Exec(ctx context.Context, r *shim.ExecProcessRequest) (*types.Empty, error) { + s.mu.Lock() + + if p := s.processes[r.ID]; p != nil { + s.mu.Unlock() + return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ID) + } + + p := s.processes[s.id] + s.mu.Unlock() + if p == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + } + + process, err := p.(*proc.Init).Exec(ctx, s.config.Path, &proc.ExecConfig{ + ID: r.ID, + Terminal: r.Terminal, + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Spec: r.Spec, + }) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + s.mu.Lock() + s.processes[r.ID] = process + s.mu.Unlock() + return empty, nil +} + +// ResizePty resises the terminal of a process. +func (s *Service) ResizePty(ctx context.Context, r *shim.ResizePtyRequest) (*types.Empty, error) { + if r.ID == "" { + return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "id not provided") + } + ws := console.WinSize{ + Width: uint16(r.Width), + Height: uint16(r.Height), + } + p, err := s.getExecProcess(r.ID) + if err != nil { + return nil, err + } + if err := p.Resize(ws); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// State returns runtime state information for a process. +func (s *Service) State(ctx context.Context, r *shim.StateRequest) (*shim.StateResponse, error) { + p, err := s.getExecProcess(r.ID) + if err != nil { + return nil, err + } + st, err := p.Status(ctx) + if err != nil { + return nil, err + } + status := task.StatusUnknown + switch st { + case "created": + status = task.StatusCreated + case "running": + status = task.StatusRunning + case "stopped": + status = task.StatusStopped + } + sio := p.Stdio() + return &shim.StateResponse{ + ID: p.ID(), + Bundle: s.bundle, + Pid: uint32(p.Pid()), + Status: status, + Stdin: sio.Stdin, + Stdout: sio.Stdout, + Stderr: sio.Stderr, + Terminal: sio.Terminal, + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }, nil +} + +// Pause pauses the container. +func (s *Service) Pause(ctx context.Context, r *types.Empty) (*types.Empty, error) { + return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) +} + +// Resume resumes the container. +func (s *Service) Resume(ctx context.Context, r *types.Empty) (*types.Empty, error) { + return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) +} + +// Kill kills a process with the provided signal. +func (s *Service) Kill(ctx context.Context, r *shim.KillRequest) (*types.Empty, error) { + if r.ID == "" { + p, err := s.getInitProcess() + if err != nil { + return nil, err + } + if err := p.Kill(ctx, r.Signal, r.All); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil + } + + p, err := s.getExecProcess(r.ID) + if err != nil { + return nil, err + } + if err := p.Kill(ctx, r.Signal, r.All); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// ListPids returns all pids inside the container. +func (s *Service) ListPids(ctx context.Context, r *shim.ListPidsRequest) (*shim.ListPidsResponse, error) { + pids, err := s.getContainerPids(ctx, r.ID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + var processes []*task.ProcessInfo + for _, pid := range pids { + pInfo := task.ProcessInfo{ + Pid: pid, + } + for _, p := range s.processes { + if p.Pid() == int(pid) { + d := &runctypes.ProcessDetails{ + ExecID: p.ID(), + } + a, err := typeurl.MarshalAny(d) + if err != nil { + return nil, fmt.Errorf("failed to marshal process %d info: %w", pid, err) + } + pInfo.Info = a + break + } + } + processes = append(processes, &pInfo) + } + return &shim.ListPidsResponse{ + Processes: processes, + }, nil +} + +// CloseIO closes the I/O context of a process. +func (s *Service) CloseIO(ctx context.Context, r *shim.CloseIORequest) (*types.Empty, error) { + p, err := s.getExecProcess(r.ID) + if err != nil { + return nil, err + } + if stdin := p.Stdin(); stdin != nil { + if err := stdin.Close(); err != nil { + return nil, fmt.Errorf("close stdin: %w", err) + } + } + return empty, nil +} + +// Checkpoint checkpoints the container. +func (s *Service) Checkpoint(ctx context.Context, r *shim.CheckpointTaskRequest) (*types.Empty, error) { + return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) +} + +// ShimInfo returns shim information such as the shim's pid. +func (s *Service) ShimInfo(ctx context.Context, r *types.Empty) (*shim.ShimInfoResponse, error) { + return &shim.ShimInfoResponse{ + ShimPid: uint32(os.Getpid()), + }, nil +} + +// Update updates a running container. +func (s *Service) Update(ctx context.Context, r *shim.UpdateTaskRequest) (*types.Empty, error) { + return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) +} + +// Wait waits for a process to exit. +func (s *Service) Wait(ctx context.Context, r *shim.WaitRequest) (*shim.WaitResponse, error) { + p, err := s.getExecProcess(r.ID) + if err != nil { + return nil, err + } + p.Wait() + + return &shim.WaitResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }, nil +} + +func (s *Service) processExits() { + for e := range s.ec { + s.checkProcesses(e) + } +} + +func (s *Service) allProcesses() []process.Process { + s.mu.Lock() + defer s.mu.Unlock() + + res := make([]process.Process, 0, len(s.processes)) + for _, p := range s.processes { + res = append(res, p) + } + return res +} + +func (s *Service) checkProcesses(e proc.Exit) { + for _, p := range s.allProcesses() { + if p.ID() == e.ID { + if ip, ok := p.(*proc.Init); ok { + // Ensure all children are killed. + if err := ip.KillAll(s.context); err != nil { + log.G(s.context).WithError(err).WithField("id", ip.ID()). + Error("failed to kill init's children") + } + } + p.SetExited(e.Status) + s.events <- &TaskExit{ + ContainerID: s.id, + ID: p.ID(), + Pid: uint32(p.Pid()), + ExitStatus: uint32(e.Status), + ExitedAt: p.ExitedAt(), + } + return + } + } +} + +func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { + p, err := s.getInitProcess() + if err != nil { + return nil, err + } + + ps, err := p.(*proc.Init).Runtime().Ps(ctx, id) + if err != nil { + return nil, err + } + pids := make([]uint32, 0, len(ps)) + for _, pid := range ps { + pids = append(pids, uint32(pid)) + } + return pids, nil +} + +func (s *Service) forward(publisher events.Publisher) { + for e := range s.events { + if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil { + log.G(s.context).WithError(err).Error("post event") + } + } +} + +// getInitProcess returns the init process. +func (s *Service) getInitProcess() (process.Process, error) { + s.mu.Lock() + defer s.mu.Unlock() + p := s.processes[s.id] + if p == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + } + return p, nil +} + +// getExecProcess returns the given exec process. +func (s *Service) getExecProcess(id string) (process.Process, error) { + s.mu.Lock() + defer s.mu.Unlock() + p := s.processes[id] + if p == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s does not exist", id) + } + return p, nil +} + +func getTopic(ctx context.Context, e interface{}) string { + switch e.(type) { + case *TaskCreate: + return runtime.TaskCreateEventTopic + case *TaskStart: + return runtime.TaskStartEventTopic + case *TaskOOM: + return runtime.TaskOOMEventTopic + case *TaskExit: + return runtime.TaskExitEventTopic + case *TaskDelete: + return runtime.TaskDeleteEventTopic + case *TaskExecAdded: + return runtime.TaskExecAddedEventTopic + case *TaskExecStarted: + return runtime.TaskExecStartedEventTopic + default: + log.L.Printf("no topic for type %#v", e) + } + return runtime.TaskUnknownTopic +} + +func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, config map[string]string, platform stdio.Platform, r *proc.CreateConfig) (*proc.Init, error) { + var options runctypes.CreateOptions + if r.Options != nil { + v, err := typeurl.UnmarshalAny(r.Options) + if err != nil { + return nil, err + } + options = *v.(*runctypes.CreateOptions) + } + + spec, err := utils.ReadSpec(r.Bundle) + if err != nil { + return nil, fmt.Errorf("read oci spec: %w", err) + } + if err := utils.UpdateVolumeAnnotations(r.Bundle, spec); err != nil { + return nil, fmt.Errorf("update volume annotations: %w", err) + } + + runsc.FormatLogPath(r.ID, config) + rootfs := filepath.Join(path, "rootfs") + runtime := proc.NewRunsc(runtimeRoot, path, namespace, r.Runtime, config) + p := proc.New(r.ID, runtime, stdio.Stdio{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }) + p.Bundle = r.Bundle + p.Platform = platform + p.Rootfs = rootfs + p.WorkDir = workDir + p.IoUID = int(options.IoUid) + p.IoGID = int(options.IoGid) + p.Sandbox = utils.IsSandbox(spec) + p.UserLog = utils.UserLogPath(spec) + p.Monitor = reaper.Default + return p, nil +} diff --git a/pkg/shim/v1/utils/BUILD b/pkg/shim/v1/utils/BUILD new file mode 100644 index 000000000..54a0aabb7 --- /dev/null +++ b/pkg/shim/v1/utils/BUILD @@ -0,0 +1,27 @@ +load("//tools:defs.bzl", "go_library", "go_test") + +package(licenses = ["notice"]) + +go_library( + name = "utils", + srcs = [ + "annotations.go", + "utils.go", + "volumes.go", + ], + visibility = [ + "//pkg/shim:__subpackages__", + "//shim:__subpackages__", + ], + deps = [ + "@com_github_opencontainers_runtime_spec//specs-go:go_default_library", + ], +) + +go_test( + name = "utils_test", + size = "small", + srcs = ["volumes_test.go"], + library = ":utils", + deps = ["@com_github_opencontainers_runtime_spec//specs-go:go_default_library"], +) diff --git a/pkg/shim/v1/utils/annotations.go b/pkg/shim/v1/utils/annotations.go new file mode 100644 index 000000000..1e9d3f365 --- /dev/null +++ b/pkg/shim/v1/utils/annotations.go @@ -0,0 +1,25 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +// Annotations from the CRI annotations package. +// +// These are vendor due to import conflicts. +const ( + sandboxLogDirAnnotation = "io.kubernetes.cri.sandbox-log-directory" + containerTypeAnnotation = "io.kubernetes.cri.container-type" + containerTypeSandbox = "sandbox" + containerTypeContainer = "container" +) diff --git a/pkg/shim/v1/utils/utils.go b/pkg/shim/v1/utils/utils.go new file mode 100644 index 000000000..07e346654 --- /dev/null +++ b/pkg/shim/v1/utils/utils.go @@ -0,0 +1,56 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + + specs "github.com/opencontainers/runtime-spec/specs-go" +) + +// ReadSpec reads OCI spec from the bundle directory. +func ReadSpec(bundle string) (*specs.Spec, error) { + f, err := os.Open(filepath.Join(bundle, "config.json")) + if err != nil { + return nil, err + } + b, err := ioutil.ReadAll(f) + if err != nil { + return nil, err + } + var spec specs.Spec + if err := json.Unmarshal(b, &spec); err != nil { + return nil, err + } + return &spec, nil +} + +// IsSandbox checks whether a container is a sandbox container. +func IsSandbox(spec *specs.Spec) bool { + t, ok := spec.Annotations[containerTypeAnnotation] + return !ok || t == containerTypeSandbox +} + +// UserLogPath gets user log path from OCI annotation. +func UserLogPath(spec *specs.Spec) string { + sandboxLogDir := spec.Annotations[sandboxLogDirAnnotation] + if sandboxLogDir == "" { + return "" + } + return filepath.Join(sandboxLogDir, "gvisor.log") +} diff --git a/pkg/shim/v1/utils/volumes.go b/pkg/shim/v1/utils/volumes.go new file mode 100644 index 000000000..52a428179 --- /dev/null +++ b/pkg/shim/v1/utils/volumes.go @@ -0,0 +1,155 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "path/filepath" + "strings" + + specs "github.com/opencontainers/runtime-spec/specs-go" +) + +const volumeKeyPrefix = "dev.gvisor.spec.mount." + +var kubeletPodsDir = "/var/lib/kubelet/pods" + +// volumeName gets volume name from volume annotation key, example: +// dev.gvisor.spec.mount.NAME.share +func volumeName(k string) string { + return strings.SplitN(strings.TrimPrefix(k, volumeKeyPrefix), ".", 2)[0] +} + +// volumeFieldName gets volume field name from volume annotation key, example: +// `type` is the field of dev.gvisor.spec.mount.NAME.type +func volumeFieldName(k string) string { + parts := strings.Split(strings.TrimPrefix(k, volumeKeyPrefix), ".") + return parts[len(parts)-1] +} + +// podUID gets pod UID from the pod log path. +func podUID(s *specs.Spec) (string, error) { + sandboxLogDir := s.Annotations[sandboxLogDirAnnotation] + if sandboxLogDir == "" { + return "", fmt.Errorf("no sandbox log path annotation") + } + fields := strings.Split(filepath.Base(sandboxLogDir), "_") + switch len(fields) { + case 1: // This is the old CRI logging path. + return fields[0], nil + case 3: // This is the new CRI logging path. + return fields[2], nil + } + return "", fmt.Errorf("unexpected sandbox log path %q", sandboxLogDir) +} + +// isVolumeKey checks whether an annotation key is for volume. +func isVolumeKey(k string) bool { + return strings.HasPrefix(k, volumeKeyPrefix) +} + +// volumeSourceKey constructs the annotation key for volume source. +func volumeSourceKey(volume string) string { + return volumeKeyPrefix + volume + ".source" +} + +// volumePath searches the volume path in the kubelet pod directory. +func volumePath(volume, uid string) (string, error) { + // TODO: Support subpath when gvisor supports pod volume bind mount. + volumeSearchPath := fmt.Sprintf("%s/%s/volumes/*/%s", kubeletPodsDir, uid, volume) + dirs, err := filepath.Glob(volumeSearchPath) + if err != nil { + return "", err + } + if len(dirs) != 1 { + return "", fmt.Errorf("unexpected matched volume list %v", dirs) + } + return dirs[0], nil +} + +// isVolumePath checks whether a string is the volume path. +func isVolumePath(volume, path string) (bool, error) { + // TODO: Support subpath when gvisor supports pod volume bind mount. + volumeSearchPath := fmt.Sprintf("%s/*/volumes/*/%s", kubeletPodsDir, volume) + return filepath.Match(volumeSearchPath, path) +} + +// UpdateVolumeAnnotations add necessary OCI annotations for gvisor +// volume optimization. +func UpdateVolumeAnnotations(bundle string, s *specs.Spec) error { + var ( + uid string + err error + ) + if IsSandbox(s) { + uid, err = podUID(s) + if err != nil { + // Skip if we can't get pod UID, because this doesn't work + // for containerd 1.1. + return nil + } + } + var updated bool + for k, v := range s.Annotations { + if !isVolumeKey(k) { + continue + } + if volumeFieldName(k) != "type" { + continue + } + volume := volumeName(k) + if uid != "" { + // This is a sandbox. + path, err := volumePath(volume, uid) + if err != nil { + return fmt.Errorf("get volume path for %q: %w", volume, err) + } + s.Annotations[volumeSourceKey(volume)] = path + updated = true + } else { + // This is a container. + for i := range s.Mounts { + // An error is returned for sandbox if source + // annotation is not successfully applied, so + // it is guaranteed that the source annotation + // for sandbox has already been successfully + // applied at this point. + // + // The volume name is unique inside a pod, so + // matching without podUID is fine here. + // + // TODO: Pass podUID down to shim for containers to do + // more accurate matching. + if yes, _ := isVolumePath(volume, s.Mounts[i].Source); yes { + // gVisor requires the container mount type to match + // sandbox mount type. + s.Mounts[i].Type = v + updated = true + } + } + } + } + if !updated { + return nil + } + // Update bundle. + b, err := json.Marshal(s) + if err != nil { + return err + } + return ioutil.WriteFile(filepath.Join(bundle, "config.json"), b, 0666) +} diff --git a/pkg/shim/v1/utils/volumes_test.go b/pkg/shim/v1/utils/volumes_test.go new file mode 100644 index 000000000..3e02c6151 --- /dev/null +++ b/pkg/shim/v1/utils/volumes_test.go @@ -0,0 +1,308 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "reflect" + "testing" + + specs "github.com/opencontainers/runtime-spec/specs-go" +) + +func TestUpdateVolumeAnnotations(t *testing.T) { + dir, err := ioutil.TempDir("", "test-update-volume-annotations") + if err != nil { + t.Fatalf("create tempdir: %v", err) + } + defer os.RemoveAll(dir) + kubeletPodsDir = dir + + const ( + testPodUID = "testuid" + testVolumeName = "testvolume" + testLogDirPath = "/var/log/pods/testns_testname_" + testPodUID + testLegacyLogDirPath = "/var/log/pods/" + testPodUID + ) + testVolumePath := fmt.Sprintf("%s/%s/volumes/kubernetes.io~empty-dir/%s", dir, testPodUID, testVolumeName) + + if err := os.MkdirAll(testVolumePath, 0755); err != nil { + t.Fatalf("Create test volume: %v", err) + } + + for _, test := range []struct { + desc string + spec *specs.Spec + expected *specs.Spec + expectErr bool + expectUpdate bool + }{ + { + desc: "volume annotations for sandbox", + spec: &specs.Spec{ + Annotations: map[string]string{ + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, + "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", + "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", + "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", + }, + }, + expected: &specs.Spec{ + Annotations: map[string]string{ + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, + "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", + "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", + "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", + "dev.gvisor.spec.mount." + testVolumeName + ".source": testVolumePath, + }, + }, + expectUpdate: true, + }, + { + desc: "volume annotations for sandbox with legacy log path", + spec: &specs.Spec{ + Annotations: map[string]string{ + sandboxLogDirAnnotation: testLegacyLogDirPath, + containerTypeAnnotation: containerTypeSandbox, + "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", + "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", + "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", + }, + }, + expected: &specs.Spec{ + Annotations: map[string]string{ + sandboxLogDirAnnotation: testLegacyLogDirPath, + containerTypeAnnotation: containerTypeSandbox, + "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", + "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", + "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", + "dev.gvisor.spec.mount." + testVolumeName + ".source": testVolumePath, + }, + }, + expectUpdate: true, + }, + { + desc: "tmpfs: volume annotations for container", + spec: &specs.Spec{ + Mounts: []specs.Mount{ + { + Destination: "/test", + Type: "bind", + Source: testVolumePath, + Options: []string{"ro"}, + }, + { + Destination: "/random", + Type: "bind", + Source: "/random", + Options: []string{"ro"}, + }, + }, + Annotations: map[string]string{ + containerTypeAnnotation: containerTypeContainer, + "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", + "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", + "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", + }, + }, + expected: &specs.Spec{ + Mounts: []specs.Mount{ + { + Destination: "/test", + Type: "tmpfs", + Source: testVolumePath, + Options: []string{"ro"}, + }, + { + Destination: "/random", + Type: "bind", + Source: "/random", + Options: []string{"ro"}, + }, + }, + Annotations: map[string]string{ + containerTypeAnnotation: containerTypeContainer, + "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", + "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", + "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", + }, + }, + expectUpdate: true, + }, + { + desc: "bind: volume annotations for container", + spec: &specs.Spec{ + Mounts: []specs.Mount{ + { + Destination: "/test", + Type: "bind", + Source: testVolumePath, + Options: []string{"ro"}, + }, + }, + Annotations: map[string]string{ + containerTypeAnnotation: containerTypeContainer, + "dev.gvisor.spec.mount." + testVolumeName + ".share": "container", + "dev.gvisor.spec.mount." + testVolumeName + ".type": "bind", + "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", + }, + }, + expected: &specs.Spec{ + Mounts: []specs.Mount{ + { + Destination: "/test", + Type: "bind", + Source: testVolumePath, + Options: []string{"ro"}, + }, + }, + Annotations: map[string]string{ + containerTypeAnnotation: containerTypeContainer, + "dev.gvisor.spec.mount." + testVolumeName + ".share": "container", + "dev.gvisor.spec.mount." + testVolumeName + ".type": "bind", + "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", + }, + }, + expectUpdate: true, + }, + { + desc: "should not return error without pod log directory", + spec: &specs.Spec{ + Annotations: map[string]string{ + containerTypeAnnotation: containerTypeSandbox, + "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", + "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", + "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", + }, + }, + expected: &specs.Spec{ + Annotations: map[string]string{ + containerTypeAnnotation: containerTypeSandbox, + "dev.gvisor.spec.mount." + testVolumeName + ".share": "pod", + "dev.gvisor.spec.mount." + testVolumeName + ".type": "tmpfs", + "dev.gvisor.spec.mount." + testVolumeName + ".options": "ro", + }, + }, + }, + { + desc: "should return error if volume path does not exist", + spec: &specs.Spec{ + Annotations: map[string]string{ + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, + "dev.gvisor.spec.mount.notexist.share": "pod", + "dev.gvisor.spec.mount.notexist.type": "tmpfs", + "dev.gvisor.spec.mount.notexist.options": "ro", + }, + }, + expectErr: true, + }, + { + desc: "no volume annotations for sandbox", + spec: &specs.Spec{ + Annotations: map[string]string{ + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, + }, + }, + expected: &specs.Spec{ + Annotations: map[string]string{ + sandboxLogDirAnnotation: testLogDirPath, + containerTypeAnnotation: containerTypeSandbox, + }, + }, + }, + { + desc: "no volume annotations for container", + spec: &specs.Spec{ + Mounts: []specs.Mount{ + { + Destination: "/test", + Type: "bind", + Source: "/test", + Options: []string{"ro"}, + }, + { + Destination: "/random", + Type: "bind", + Source: "/random", + Options: []string{"ro"}, + }, + }, + Annotations: map[string]string{ + containerTypeAnnotation: containerTypeContainer, + }, + }, + expected: &specs.Spec{ + Mounts: []specs.Mount{ + { + Destination: "/test", + Type: "bind", + Source: "/test", + Options: []string{"ro"}, + }, + { + Destination: "/random", + Type: "bind", + Source: "/random", + Options: []string{"ro"}, + }, + }, + Annotations: map[string]string{ + containerTypeAnnotation: containerTypeContainer, + }, + }, + }, + } { + t.Run(test.desc, func(t *testing.T) { + bundle, err := ioutil.TempDir(dir, "test-bundle") + if err != nil { + t.Fatalf("Create test bundle: %v", err) + } + err = UpdateVolumeAnnotations(bundle, test.spec) + if test.expectErr { + if err == nil { + t.Fatal("Expected error, but got nil") + } + return + } + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(test.expected, test.spec) { + t.Fatalf("Expected %+v, got %+v", test.expected, test.spec) + } + if test.expectUpdate { + b, err := ioutil.ReadFile(filepath.Join(bundle, "config.json")) + if err != nil { + t.Fatalf("Read spec from bundle: %v", err) + } + var spec specs.Spec + if err := json.Unmarshal(b, &spec); err != nil { + t.Fatalf("Unmarshal spec: %v", err) + } + if !reflect.DeepEqual(test.expected, &spec) { + t.Fatalf("Expected %+v, got %+v", test.expected, &spec) + } + } + }) + } +} diff --git a/pkg/shim/v2/BUILD b/pkg/shim/v2/BUILD new file mode 100644 index 000000000..7e0a114a0 --- /dev/null +++ b/pkg/shim/v2/BUILD @@ -0,0 +1,43 @@ +load("//tools:defs.bzl", "go_library") + +package(licenses = ["notice"]) + +go_library( + name = "v2", + srcs = [ + "api.go", + "epoll.go", + "service.go", + "service_linux.go", + ], + visibility = ["//shim:__subpackages__"], + deps = [ + "//pkg/shim/runsc", + "//pkg/shim/v1/proc", + "//pkg/shim/v1/utils", + "//pkg/shim/v2/options", + "//pkg/shim/v2/runtimeoptions", + "//runsc/specutils", + "@com_github_burntsushi_toml//:go_default_library", + "@com_github_containerd_cgroups//:go_default_library", + "@com_github_containerd_console//:go_default_library", + "@com_github_containerd_containerd//api/events:go_default_library", + "@com_github_containerd_containerd//api/types/task:go_default_library", + "@com_github_containerd_containerd//errdefs:go_default_library", + "@com_github_containerd_containerd//events:go_default_library", + "@com_github_containerd_containerd//log:go_default_library", + "@com_github_containerd_containerd//mount:go_default_library", + "@com_github_containerd_containerd//namespaces:go_default_library", + "@com_github_containerd_containerd//pkg/process:go_default_library", + "@com_github_containerd_containerd//pkg/stdio:go_default_library", + "@com_github_containerd_containerd//runtime:go_default_library", + "@com_github_containerd_containerd//runtime/linux/runctypes:go_default_library", + "@com_github_containerd_containerd//runtime/v2/shim:go_default_library", + "@com_github_containerd_containerd//runtime/v2/task:go_default_library", + "@com_github_containerd_containerd//sys/reaper:go_default_library", + "@com_github_containerd_fifo//:go_default_library", + "@com_github_containerd_typeurl//:go_default_library", + "@com_github_gogo_protobuf//types:go_default_library", + "@org_golang_x_sys//unix:go_default_library", + ], +) diff --git a/pkg/shim/v2/api.go b/pkg/shim/v2/api.go new file mode 100644 index 000000000..dbe5c59f6 --- /dev/null +++ b/pkg/shim/v2/api.go @@ -0,0 +1,22 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "github.com/containerd/containerd/api/events" +) + +type TaskOOM = events.TaskOOM diff --git a/pkg/shim/v2/epoll.go b/pkg/shim/v2/epoll.go new file mode 100644 index 000000000..41232cca8 --- /dev/null +++ b/pkg/shim/v2/epoll.go @@ -0,0 +1,129 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +package v2 + +import ( + "context" + "fmt" + "sync" + + "github.com/containerd/cgroups" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/runtime" + "golang.org/x/sys/unix" +) + +func newOOMEpoller(publisher events.Publisher) (*epoller, error) { + fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC) + if err != nil { + return nil, err + } + return &epoller{ + fd: fd, + publisher: publisher, + set: make(map[uintptr]*item), + }, nil +} + +type epoller struct { + mu sync.Mutex + + fd int + publisher events.Publisher + set map[uintptr]*item +} + +type item struct { + id string + cg cgroups.Cgroup +} + +func (e *epoller) Close() error { + return unix.Close(e.fd) +} + +func (e *epoller) run(ctx context.Context) { + var events [128]unix.EpollEvent + for { + select { + case <-ctx.Done(): + e.Close() + return + default: + n, err := unix.EpollWait(e.fd, events[:], -1) + if err != nil { + if err == unix.EINTR || err == unix.EAGAIN { + continue + } + // Should not happen. + panic(fmt.Errorf("cgroups: epoll wait: %w", err)) + } + for i := 0; i < n; i++ { + e.process(ctx, uintptr(events[i].Fd)) + } + } + } +} + +func (e *epoller) add(id string, cg cgroups.Cgroup) error { + e.mu.Lock() + defer e.mu.Unlock() + fd, err := cg.OOMEventFD() + if err != nil { + return err + } + e.set[fd] = &item{ + id: id, + cg: cg, + } + event := unix.EpollEvent{ + Fd: int32(fd), + Events: unix.EPOLLHUP | unix.EPOLLIN | unix.EPOLLERR, + } + return unix.EpollCtl(e.fd, unix.EPOLL_CTL_ADD, int(fd), &event) +} + +func (e *epoller) process(ctx context.Context, fd uintptr) { + flush(fd) + e.mu.Lock() + i, ok := e.set[fd] + if !ok { + e.mu.Unlock() + return + } + e.mu.Unlock() + if i.cg.State() == cgroups.Deleted { + e.mu.Lock() + delete(e.set, fd) + e.mu.Unlock() + unix.Close(int(fd)) + return + } + if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &TaskOOM{ + ContainerID: i.id, + }); err != nil { + // Should not happen. + panic(fmt.Errorf("publish OOM event: %w", err)) + } +} + +func flush(fd uintptr) error { + var buf [8]byte + _, err := unix.Read(int(fd), buf[:]) + return err +} diff --git a/pkg/shim/v2/options/BUILD b/pkg/shim/v2/options/BUILD new file mode 100644 index 000000000..ca212e874 --- /dev/null +++ b/pkg/shim/v2/options/BUILD @@ -0,0 +1,11 @@ +load("//tools:defs.bzl", "go_library") + +package(licenses = ["notice"]) + +go_library( + name = "options", + srcs = [ + "options.go", + ], + visibility = ["//:sandbox"], +) diff --git a/pkg/shim/v2/options/options.go b/pkg/shim/v2/options/options.go new file mode 100644 index 000000000..de09f2f79 --- /dev/null +++ b/pkg/shim/v2/options/options.go @@ -0,0 +1,33 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package options + +const OptionType = "io.containerd.runsc.v1.options" + +// Options is runtime options for io.containerd.runsc.v1. +type Options struct { + // ShimCgroup is the cgroup the shim should be in. + ShimCgroup string `toml:"shim_cgroup"` + // IoUid is the I/O's pipes uid. + IoUid uint32 `toml:"io_uid"` + // IoUid is the I/O's pipes gid. + IoGid uint32 `toml:"io_gid"` + // BinaryName is the binary name of the runsc binary. + BinaryName string `toml:"binary_name"` + // Root is the runsc root directory. + Root string `toml:"root"` + // RunscConfig is a key/value map of all runsc flags. + RunscConfig map[string]string `toml:"runsc_config"` +} diff --git a/pkg/shim/v2/runtimeoptions/BUILD b/pkg/shim/v2/runtimeoptions/BUILD new file mode 100644 index 000000000..01716034c --- /dev/null +++ b/pkg/shim/v2/runtimeoptions/BUILD @@ -0,0 +1,20 @@ +load("//tools:defs.bzl", "go_library", "proto_library") + +package(licenses = ["notice"]) + +proto_library( + name = "api", + srcs = [ + "runtimeoptions.proto", + ], +) + +go_library( + name = "runtimeoptions", + srcs = ["runtimeoptions.go"], + visibility = ["//pkg/shim/v2:__pkg__"], + deps = [ + "//pkg/shim/v2/runtimeoptions:api_go_proto", + "@com_github_gogo_protobuf//proto:go_default_library", + ], +) diff --git a/pkg/shim/v2/runtimeoptions/runtimeoptions.go b/pkg/shim/v2/runtimeoptions/runtimeoptions.go new file mode 100644 index 000000000..1c1a0c5d1 --- /dev/null +++ b/pkg/shim/v2/runtimeoptions/runtimeoptions.go @@ -0,0 +1,27 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runtimeoptions + +import ( + proto "github.com/gogo/protobuf/proto" + pb "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions/api_go_proto" +) + +type Options = pb.Options + +func init() { + proto.RegisterType((*Options)(nil), "cri.runtimeoptions.v1.Options") +} diff --git a/pkg/shim/v2/runtimeoptions/runtimeoptions.proto b/pkg/shim/v2/runtimeoptions/runtimeoptions.proto new file mode 100644 index 000000000..edb19020a --- /dev/null +++ b/pkg/shim/v2/runtimeoptions/runtimeoptions.proto @@ -0,0 +1,25 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package runtimeoptions; + +// This is a version of the runtimeoptions CRI API that is vendored. +// +// Imported the full CRI package is a nightmare. +message Options { + string type_url = 1; + string config_path = 2; +} diff --git a/pkg/shim/v2/service.go b/pkg/shim/v2/service.go new file mode 100644 index 000000000..1534152fc --- /dev/null +++ b/pkg/shim/v2/service.go @@ -0,0 +1,824 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "sync" + "syscall" + "time" + + "github.com/BurntSushi/toml" + "github.com/containerd/cgroups" + "github.com/containerd/console" + "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/api/types/task" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" + "github.com/containerd/containerd/runtime" + "github.com/containerd/containerd/runtime/linux/runctypes" + "github.com/containerd/containerd/runtime/v2/shim" + taskAPI "github.com/containerd/containerd/runtime/v2/task" + "github.com/containerd/containerd/sys/reaper" + "github.com/containerd/typeurl" + "github.com/gogo/protobuf/types" + "golang.org/x/sys/unix" + + "gvisor.dev/gvisor/pkg/shim/runsc" + "gvisor.dev/gvisor/pkg/shim/v1/proc" + "gvisor.dev/gvisor/pkg/shim/v1/utils" + "gvisor.dev/gvisor/pkg/shim/v2/options" + "gvisor.dev/gvisor/pkg/shim/v2/runtimeoptions" + "gvisor.dev/gvisor/runsc/specutils" +) + +var ( + empty = &types.Empty{} + bufPool = sync.Pool{ + New: func() interface{} { + buffer := make([]byte, 32<<10) + return &buffer + }, + } +) + +var _ = (taskAPI.TaskService)(&service{}) + +// configFile is the default config file name. For containerd 1.2, +// we assume that a config.toml should exist in the runtime root. +const configFile = "config.toml" + +// New returns a new shim service that can be used via GRPC. +func New(ctx context.Context, id string, publisher shim.Publisher, cancel func()) (shim.Shim, error) { + ep, err := newOOMEpoller(publisher) + if err != nil { + return nil, err + } + go ep.run(ctx) + s := &service{ + id: id, + context: ctx, + processes: make(map[string]process.Process), + events: make(chan interface{}, 128), + ec: proc.ExitCh, + oomPoller: ep, + cancel: cancel, + } + go s.processExits() + runsc.Monitor = reaper.Default + if err := s.initPlatform(); err != nil { + cancel() + return nil, fmt.Errorf("failed to initialized platform behavior: %w", err) + } + go s.forward(publisher) + return s, nil +} + +// service is the shim implementation of a remote shim over GRPC. +type service struct { + mu sync.Mutex + + context context.Context + task process.Process + processes map[string]process.Process + events chan interface{} + platform stdio.Platform + opts options.Options + ec chan proc.Exit + oomPoller *epoller + + id string + bundle string + cancel func() +} + +func newCommand(ctx context.Context, containerdBinary, containerdAddress string) (*exec.Cmd, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + self, err := os.Executable() + if err != nil { + return nil, err + } + cwd, err := os.Getwd() + if err != nil { + return nil, err + } + args := []string{ + "-namespace", ns, + "-address", containerdAddress, + "-publish-binary", containerdBinary, + } + cmd := exec.Command(self, args...) + cmd.Dir = cwd + cmd.Env = append(os.Environ(), "GOMAXPROCS=2") + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + return cmd, nil +} + +func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) { + cmd, err := newCommand(ctx, containerdBinary, containerdAddress) + if err != nil { + return "", err + } + address, err := shim.SocketAddress(ctx, id) + if err != nil { + return "", err + } + socket, err := shim.NewSocket(address) + if err != nil { + return "", err + } + defer socket.Close() + f, err := socket.File() + if err != nil { + return "", err + } + defer f.Close() + + cmd.ExtraFiles = append(cmd.ExtraFiles, f) + + if err := cmd.Start(); err != nil { + return "", err + } + defer func() { + if err != nil { + cmd.Process.Kill() + } + }() + // make sure to wait after start + go cmd.Wait() + if err := shim.WritePidFile("shim.pid", cmd.Process.Pid); err != nil { + return "", err + } + if err := shim.WriteAddress("address", address); err != nil { + return "", err + } + if err := shim.SetScore(cmd.Process.Pid); err != nil { + return "", fmt.Errorf("failed to set OOM Score on shim: %w", err) + } + return address, nil +} + +func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { + path, err := os.Getwd() + if err != nil { + return nil, err + } + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + runtime, err := s.readRuntime(path) + if err != nil { + return nil, err + } + r := proc.NewRunsc(s.opts.Root, path, ns, runtime, nil) + if err := r.Delete(ctx, s.id, &runsc.DeleteOpts{ + Force: true, + }); err != nil { + log.L.Printf("failed to remove runc container: %v", err) + } + if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { + log.L.Printf("failed to cleanup rootfs mount: %v", err) + } + return &taskAPI.DeleteResponse{ + ExitedAt: time.Now(), + ExitStatus: 128 + uint32(unix.SIGKILL), + }, nil +} + +func (s *service) readRuntime(path string) (string, error) { + data, err := ioutil.ReadFile(filepath.Join(path, "runtime")) + if err != nil { + return "", err + } + return string(data), nil +} + +func (s *service) writeRuntime(path, runtime string) error { + return ioutil.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600) +} + +// Create creates a new initial process and container with the underlying OCI +// runtime. +func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, fmt.Errorf("create namespace: %w", err) + } + + // Read from root for now. + var opts options.Options + if r.Options != nil { + v, err := typeurl.UnmarshalAny(r.Options) + if err != nil { + return nil, err + } + var path string + switch o := v.(type) { + case *runctypes.CreateOptions: // containerd 1.2.x + opts.IoUid = o.IoUid + opts.IoGid = o.IoGid + opts.ShimCgroup = o.ShimCgroup + case *runctypes.RuncOptions: // containerd 1.2.x + root := proc.RunscRoot + if o.RuntimeRoot != "" { + root = o.RuntimeRoot + } + + opts.BinaryName = o.Runtime + + path = filepath.Join(root, configFile) + if _, err := os.Stat(path); err != nil { + if !os.IsNotExist(err) { + return nil, fmt.Errorf("stat config file %q: %w", path, err) + } + // A config file in runtime root is not required. + path = "" + } + case *runtimeoptions.Options: // containerd 1.3.x+ + if o.ConfigPath == "" { + break + } + if o.TypeUrl != options.OptionType { + return nil, fmt.Errorf("unsupported option type %q", o.TypeUrl) + } + path = o.ConfigPath + default: + return nil, fmt.Errorf("unsupported option type %q", r.Options.TypeUrl) + } + if path != "" { + if _, err = toml.DecodeFile(path, &opts); err != nil { + return nil, fmt.Errorf("decode config file %q: %w", path, err) + } + } + } + + var mounts []proc.Mount + for _, m := range r.Rootfs { + mounts = append(mounts, proc.Mount{ + Type: m.Type, + Source: m.Source, + Target: m.Target, + Options: m.Options, + }) + } + + rootfs := filepath.Join(r.Bundle, "rootfs") + if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) { + return nil, err + } + + config := &proc.CreateConfig{ + ID: r.ID, + Bundle: r.Bundle, + Runtime: opts.BinaryName, + Rootfs: mounts, + Terminal: r.Terminal, + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Options: r.Options, + } + if err := s.writeRuntime(r.Bundle, opts.BinaryName); err != nil { + return nil, err + } + defer func() { + if err != nil { + if err := mount.UnmountAll(rootfs, 0); err != nil { + log.L.Printf("failed to cleanup rootfs mount: %v", err) + } + } + }() + for _, rm := range mounts { + m := &mount.Mount{ + Type: rm.Type, + Source: rm.Source, + Options: rm.Options, + } + if err := m.Mount(rootfs); err != nil { + return nil, fmt.Errorf("failed to mount rootfs component %v: %w", m, err) + } + } + process, err := newInit( + ctx, + r.Bundle, + filepath.Join(r.Bundle, "work"), + ns, + s.platform, + config, + &opts, + rootfs, + ) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + if err := process.Create(ctx, config); err != nil { + return nil, errdefs.ToGRPC(err) + } + // Save the main task id and bundle to the shim for additional + // requests. + s.id = r.ID + s.bundle = r.Bundle + + // Set up OOM notification on the sandbox's cgroup. This is done on + // sandbox create since the sandbox process will be created here. + pid := process.Pid() + if pid > 0 { + cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid)) + if err != nil { + return nil, fmt.Errorf("loading cgroup for %d: %w", pid, err) + } + if err := s.oomPoller.add(s.id, cg); err != nil { + return nil, fmt.Errorf("add cg to OOM monitor: %w", err) + } + } + s.task = process + s.opts = opts + return &taskAPI.CreateTaskResponse{ + Pid: uint32(process.Pid()), + }, nil + +} + +// Start starts a process. +func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { + p, err := s.getProcess(r.ExecID) + if err != nil { + return nil, err + } + if err := p.Start(ctx); err != nil { + return nil, err + } + // TODO: Set the cgroup and oom notifications on restore. + // https://github.com/google/gvisor-containerd-shim/issues/58 + return &taskAPI.StartResponse{ + Pid: uint32(p.Pid()), + }, nil +} + +// Delete deletes the initial process and container. +func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { + p, err := s.getProcess(r.ExecID) + if err != nil { + return nil, err + } + if p == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + } + if err := p.Delete(ctx); err != nil { + return nil, err + } + isTask := r.ExecID == "" + if !isTask { + s.mu.Lock() + delete(s.processes, r.ExecID) + s.mu.Unlock() + } + if isTask && s.platform != nil { + s.platform.Close() + } + return &taskAPI.DeleteResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + Pid: uint32(p.Pid()), + }, nil +} + +// Exec spawns an additional process inside the container. +func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*types.Empty, error) { + s.mu.Lock() + p := s.processes[r.ExecID] + s.mu.Unlock() + if p != nil { + return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) + } + p = s.task + if p == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + } + process, err := p.(*proc.Init).Exec(ctx, s.bundle, &proc.ExecConfig{ + ID: r.ExecID, + Terminal: r.Terminal, + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Spec: r.Spec, + }) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + s.mu.Lock() + s.processes[r.ExecID] = process + s.mu.Unlock() + return empty, nil +} + +// ResizePty resizes the terminal of a process. +func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*types.Empty, error) { + p, err := s.getProcess(r.ExecID) + if err != nil { + return nil, err + } + ws := console.WinSize{ + Width: uint16(r.Width), + Height: uint16(r.Height), + } + if err := p.Resize(ws); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// State returns runtime state information for a process. +func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { + p, err := s.getProcess(r.ExecID) + if err != nil { + return nil, err + } + st, err := p.Status(ctx) + if err != nil { + return nil, err + } + status := task.StatusUnknown + switch st { + case "created": + status = task.StatusCreated + case "running": + status = task.StatusRunning + case "stopped": + status = task.StatusStopped + } + sio := p.Stdio() + return &taskAPI.StateResponse{ + ID: p.ID(), + Bundle: s.bundle, + Pid: uint32(p.Pid()), + Status: status, + Stdin: sio.Stdin, + Stdout: sio.Stdout, + Stderr: sio.Stderr, + Terminal: sio.Terminal, + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }, nil +} + +// Pause the container. +func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*types.Empty, error) { + return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) +} + +// Resume the container. +func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*types.Empty, error) { + return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) +} + +// Kill a process with the provided signal. +func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*types.Empty, error) { + p, err := s.getProcess(r.ExecID) + if err != nil { + return nil, err + } + if p == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + } + if err := p.Kill(ctx, r.Signal, r.All); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Pids returns all pids inside the container. +func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { + pids, err := s.getContainerPids(ctx, r.ID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + var processes []*task.ProcessInfo + for _, pid := range pids { + pInfo := task.ProcessInfo{ + Pid: pid, + } + for _, p := range s.processes { + if p.Pid() == int(pid) { + d := &runctypes.ProcessDetails{ + ExecID: p.ID(), + } + a, err := typeurl.MarshalAny(d) + if err != nil { + return nil, fmt.Errorf("failed to marshal process %d info: %w", pid, err) + } + pInfo.Info = a + break + } + } + processes = append(processes, &pInfo) + } + return &taskAPI.PidsResponse{ + Processes: processes, + }, nil +} + +// CloseIO closes the I/O context of a process. +func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*types.Empty, error) { + p, err := s.getProcess(r.ExecID) + if err != nil { + return nil, err + } + if stdin := p.Stdin(); stdin != nil { + if err := stdin.Close(); err != nil { + return nil, fmt.Errorf("close stdin: %w", err) + } + } + return empty, nil +} + +// Checkpoint checkpoints the container. +func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*types.Empty, error) { + return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) +} + +// Connect returns shim information such as the shim's pid. +func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { + var pid int + if s.task != nil { + pid = s.task.Pid() + } + return &taskAPI.ConnectResponse{ + ShimPid: uint32(os.Getpid()), + TaskPid: uint32(pid), + }, nil +} + +func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*types.Empty, error) { + s.cancel() + os.Exit(0) + return empty, nil +} + +func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { + path, err := os.Getwd() + if err != nil { + return nil, err + } + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + runtime, err := s.readRuntime(path) + if err != nil { + return nil, err + } + rs := proc.NewRunsc(s.opts.Root, path, ns, runtime, nil) + stats, err := rs.Stats(ctx, s.id) + if err != nil { + return nil, err + } + + // gvisor currently (as of 2020-03-03) only returns the total memory + // usage and current PID value[0]. However, we copy the common fields here + // so that future updates will propagate correct information. We're + // using the cgroups.Metrics structure so we're returning the same type + // as runc. + // + // [0]: https://github.com/google/gvisor/blob/277a0d5a1fbe8272d4729c01ee4c6e374d047ebc/runsc/boot/events.go#L61-L81 + data, err := typeurl.MarshalAny(&cgroups.Metrics{ + CPU: &cgroups.CPUStat{ + Usage: &cgroups.CPUUsage{ + Total: stats.Cpu.Usage.Total, + Kernel: stats.Cpu.Usage.Kernel, + User: stats.Cpu.Usage.User, + PerCPU: stats.Cpu.Usage.Percpu, + }, + Throttling: &cgroups.Throttle{ + Periods: stats.Cpu.Throttling.Periods, + ThrottledPeriods: stats.Cpu.Throttling.ThrottledPeriods, + ThrottledTime: stats.Cpu.Throttling.ThrottledTime, + }, + }, + Memory: &cgroups.MemoryStat{ + Cache: stats.Memory.Cache, + Usage: &cgroups.MemoryEntry{ + Limit: stats.Memory.Usage.Limit, + Usage: stats.Memory.Usage.Usage, + Max: stats.Memory.Usage.Max, + Failcnt: stats.Memory.Usage.Failcnt, + }, + Swap: &cgroups.MemoryEntry{ + Limit: stats.Memory.Swap.Limit, + Usage: stats.Memory.Swap.Usage, + Max: stats.Memory.Swap.Max, + Failcnt: stats.Memory.Swap.Failcnt, + }, + Kernel: &cgroups.MemoryEntry{ + Limit: stats.Memory.Kernel.Limit, + Usage: stats.Memory.Kernel.Usage, + Max: stats.Memory.Kernel.Max, + Failcnt: stats.Memory.Kernel.Failcnt, + }, + KernelTCP: &cgroups.MemoryEntry{ + Limit: stats.Memory.KernelTCP.Limit, + Usage: stats.Memory.KernelTCP.Usage, + Max: stats.Memory.KernelTCP.Max, + Failcnt: stats.Memory.KernelTCP.Failcnt, + }, + }, + Pids: &cgroups.PidsStat{ + Current: stats.Pids.Current, + Limit: stats.Pids.Limit, + }, + }) + if err != nil { + return nil, err + } + return &taskAPI.StatsResponse{ + Stats: data, + }, nil +} + +// Update updates a running container. +func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*types.Empty, error) { + return empty, errdefs.ToGRPC(errdefs.ErrNotImplemented) +} + +// Wait waits for a process to exit. +func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { + p, err := s.getProcess(r.ExecID) + if err != nil { + return nil, err + } + if p == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + } + p.Wait() + + return &taskAPI.WaitResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }, nil +} + +func (s *service) processExits() { + for e := range s.ec { + s.checkProcesses(e) + } +} + +func (s *service) checkProcesses(e proc.Exit) { + // TODO(random-liu): Add `shouldKillAll` logic if container pid + // namespace is supported. + for _, p := range s.allProcesses() { + if p.ID() == e.ID { + if ip, ok := p.(*proc.Init); ok { + // Ensure all children are killed. + if err := ip.KillAll(s.context); err != nil { + log.G(s.context).WithError(err).WithField("id", ip.ID()). + Error("failed to kill init's children") + } + } + p.SetExited(e.Status) + s.events <- &events.TaskExit{ + ContainerID: s.id, + ID: p.ID(), + Pid: uint32(p.Pid()), + ExitStatus: uint32(e.Status), + ExitedAt: p.ExitedAt(), + } + return + } + } +} + +func (s *service) allProcesses() (o []process.Process) { + s.mu.Lock() + defer s.mu.Unlock() + for _, p := range s.processes { + o = append(o, p) + } + if s.task != nil { + o = append(o, s.task) + } + return o +} + +func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { + s.mu.Lock() + p := s.task + s.mu.Unlock() + if p == nil { + return nil, fmt.Errorf("container must be created: %w", errdefs.ErrFailedPrecondition) + } + ps, err := p.(*proc.Init).Runtime().Ps(ctx, id) + if err != nil { + return nil, err + } + pids := make([]uint32, 0, len(ps)) + for _, pid := range ps { + pids = append(pids, uint32(pid)) + } + return pids, nil +} + +func (s *service) forward(publisher shim.Publisher) { + for e := range s.events { + ctx, cancel := context.WithTimeout(s.context, 5*time.Second) + err := publisher.Publish(ctx, getTopic(e), e) + cancel() + if err != nil { + // Should not happen. + panic(fmt.Errorf("post event: %w", err)) + } + } +} + +func (s *service) getProcess(execID string) (process.Process, error) { + s.mu.Lock() + defer s.mu.Unlock() + if execID == "" { + return s.task, nil + } + p := s.processes[execID] + if p == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", execID) + } + return p, nil +} + +func getTopic(e interface{}) string { + switch e.(type) { + case *events.TaskCreate: + return runtime.TaskCreateEventTopic + case *events.TaskStart: + return runtime.TaskStartEventTopic + case *events.TaskOOM: + return runtime.TaskOOMEventTopic + case *events.TaskExit: + return runtime.TaskExitEventTopic + case *events.TaskDelete: + return runtime.TaskDeleteEventTopic + case *events.TaskExecAdded: + return runtime.TaskExecAddedEventTopic + case *events.TaskExecStarted: + return runtime.TaskExecStartedEventTopic + default: + log.L.Printf("no topic for type %#v", e) + } + return runtime.TaskUnknownTopic +} + +func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform, r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) { + spec, err := utils.ReadSpec(r.Bundle) + if err != nil { + return nil, fmt.Errorf("read oci spec: %w", err) + } + if err := utils.UpdateVolumeAnnotations(r.Bundle, spec); err != nil { + return nil, fmt.Errorf("update volume annotations: %w", err) + } + runsc.FormatLogPath(r.ID, options.RunscConfig) + runtime := proc.NewRunsc(options.Root, path, namespace, options.BinaryName, options.RunscConfig) + p := proc.New(r.ID, runtime, stdio.Stdio{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }) + p.Bundle = r.Bundle + p.Platform = platform + p.Rootfs = rootfs + p.WorkDir = workDir + p.IoUID = int(options.IoUid) + p.IoGID = int(options.IoGid) + p.Sandbox = specutils.SpecContainerType(spec) == specutils.ContainerTypeSandbox + p.UserLog = utils.UserLogPath(spec) + p.Monitor = reaper.Default + return p, nil +} diff --git a/pkg/shim/v2/service_linux.go b/pkg/shim/v2/service_linux.go new file mode 100644 index 000000000..1800ab90b --- /dev/null +++ b/pkg/shim/v2/service_linux.go @@ -0,0 +1,108 @@ +// Copyright 2018 The containerd Authors. +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build linux + +package v2 + +import ( + "context" + "fmt" + "io" + "sync" + "syscall" + + "github.com/containerd/console" + "github.com/containerd/fifo" +) + +type linuxPlatform struct { + epoller *console.Epoller +} + +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) { + if p.epoller == nil { + return nil, fmt.Errorf("uninitialized epoller") + } + + epollConsole, err := p.epoller.Add(console) + if err != nil { + return nil, err + } + + if stdin != "" { + in, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) + if err != nil { + return nil, err + } + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + io.CopyBuffer(epollConsole, in, *p) + }() + } + + outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0) + if err != nil { + return nil, err + } + outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0) + if err != nil { + return nil, err + } + wg.Add(1) + go func() { + p := bufPool.Get().(*[]byte) + defer bufPool.Put(p) + io.CopyBuffer(outw, epollConsole, *p) + epollConsole.Close() + outr.Close() + outw.Close() + wg.Done() + }() + return epollConsole, nil +} + +func (p *linuxPlatform) ShutdownConsole(ctx context.Context, cons console.Console) error { + if p.epoller == nil { + return fmt.Errorf("uninitialized epoller") + } + epollConsole, ok := cons.(*console.EpollConsole) + if !ok { + return fmt.Errorf("expected EpollConsole, got %#v", cons) + } + return epollConsole.Shutdown(p.epoller.CloseConsole) +} + +func (p *linuxPlatform) Close() error { + return p.epoller.Close() +} + +// initialize a single epoll fd to manage our consoles. `initPlatform` should +// only be called once. +func (s *service) initPlatform() error { + if s.platform != nil { + return nil + } + epoller, err := console.NewEpoller() + if err != nil { + return fmt.Errorf("failed to initialize epoller: %w", err) + } + s.platform = &linuxPlatform{ + epoller: epoller, + } + go epoller.Wait() + return nil +} diff --git a/pkg/sleep/BUILD b/pkg/sleep/BUILD index e131455f7..ae0fe1522 100644 --- a/pkg/sleep/BUILD +++ b/pkg/sleep/BUILD @@ -12,6 +12,7 @@ go_library( "sleep_unsafe.go", ], visibility = ["//:sandbox"], + deps = ["//pkg/sync"], ) go_test( diff --git a/pkg/sleep/sleep_test.go b/pkg/sleep/sleep_test.go index af47e2ba1..1dd11707d 100644 --- a/pkg/sleep/sleep_test.go +++ b/pkg/sleep/sleep_test.go @@ -379,10 +379,7 @@ func TestRace(t *testing.T) { // TestRaceInOrder tests that multiple wakers can continuously send wake requests to // the sleeper and that the wakers are retrieved in the order asserted. func TestRaceInOrder(t *testing.T) { - const wakers = 100 - const wakeRequests = 10000 - - w := make([]Waker, wakers) + w := make([]Waker, 10000) s := Sleeper{} // Associate each waker and start goroutines that will assert them. @@ -390,19 +387,16 @@ func TestRaceInOrder(t *testing.T) { s.AddWaker(&w[i], i) } go func() { - n := 0 - for n < wakeRequests { - wk := w[n%len(w)] - wk.Assert() - n++ + for i := range w { + w[i].Assert() } }() // Wait for all wake up notifications from all wakers. - for i := 0; i < wakeRequests; i++ { - v, _ := s.Fetch(true) - if got, want := v, i%wakers; got != want { - t.Fatalf("got %d want %d", got, want) + for want := range w { + got, _ := s.Fetch(true) + if got != want { + t.Fatalf("got %d want %d", got, want) } } } diff --git a/pkg/sleep/sleep_unsafe.go b/pkg/sleep/sleep_unsafe.go index f68c12620..118805492 100644 --- a/pkg/sleep/sleep_unsafe.go +++ b/pkg/sleep/sleep_unsafe.go @@ -75,6 +75,8 @@ package sleep import ( "sync/atomic" "unsafe" + + "gvisor.dev/gvisor/pkg/sync" ) const ( @@ -323,7 +325,12 @@ func (s *Sleeper) enqueueAssertedWaker(w *Waker) { // // This struct is thread-safe, that is, its methods can be called concurrently // by multiple goroutines. +// +// Note, it is not safe to copy a Waker as its fields are modified by value +// (the pointer fields are individually modified with atomic operations). type Waker struct { + _ sync.NoCopy + // s is the sleeper that this waker can wake up. Only one sleeper at a // time is allowed. This field can have three classes of values: // nil -- the waker is not asserted: it either is not associated with diff --git a/pkg/sync/BUILD b/pkg/sync/BUILD index d0d77e19c..4d47207f7 100644 --- a/pkg/sync/BUILD +++ b/pkg/sync/BUILD @@ -33,6 +33,7 @@ go_library( "aliases.go", "memmove_unsafe.go", "mutex_unsafe.go", + "nocopy.go", "norace_unsafe.go", "race_unsafe.go", "rwmutex_unsafe.go", diff --git a/pkg/sync/nocopy.go b/pkg/sync/nocopy.go new file mode 100644 index 000000000..722b29501 --- /dev/null +++ b/pkg/sync/nocopy.go @@ -0,0 +1,28 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +// NoCopy may be embedded into structs which must not be copied +// after the first use. +// +// See https://golang.org/issues/8005#issuecomment-190753527 +// for details. +type NoCopy struct{} + +// Lock is a no-op used by -copylocks checker from `go vet`. +func (*NoCopy) Lock() {} + +// Unlock is a no-op used by -copylocks checker from `go vet`. +func (*NoCopy) Unlock() {} diff --git a/pkg/tcpip/header/BUILD b/pkg/tcpip/header/BUILD index 0cde694dc..d87797617 100644 --- a/pkg/tcpip/header/BUILD +++ b/pkg/tcpip/header/BUILD @@ -48,7 +48,7 @@ go_test( "//pkg/rand", "//pkg/tcpip", "//pkg/tcpip/buffer", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) @@ -64,6 +64,6 @@ go_test( deps = [ "//pkg/tcpip", "//pkg/tcpip/buffer", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/tcpip/link/fdbased/endpoint_test.go b/pkg/tcpip/link/fdbased/endpoint_test.go index eaee7e5d7..4bad930c7 100644 --- a/pkg/tcpip/link/fdbased/endpoint_test.go +++ b/pkg/tcpip/link/fdbased/endpoint_test.go @@ -500,3 +500,76 @@ func TestRecvMMsgDispatcherCapLength(t *testing.T) { } } + +// fakeNetworkDispatcher delivers packets to pkts. +type fakeNetworkDispatcher struct { + pkts []*stack.PacketBuffer +} + +func (d *fakeNetworkDispatcher) DeliverNetworkPacket(remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) { + d.pkts = append(d.pkts, pkt) +} + +func TestDispatchPacketFormat(t *testing.T) { + for _, test := range []struct { + name string + newDispatcher func(fd int, e *endpoint) (linkDispatcher, error) + }{ + { + name: "readVDispatcher", + newDispatcher: newReadVDispatcher, + }, + { + name: "recvMMsgDispatcher", + newDispatcher: newRecvMMsgDispatcher, + }, + } { + t.Run(test.name, func(t *testing.T) { + // Create a socket pair to send/recv. + fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_DGRAM, 0) + if err != nil { + t.Fatal(err) + } + defer syscall.Close(fds[0]) + defer syscall.Close(fds[1]) + + data := []byte{ + // Ethernet header. + 1, 2, 3, 4, 5, 60, + 1, 2, 3, 4, 5, 61, + 8, 0, + // Mock network header. + 40, 41, 42, 43, + } + err = syscall.Sendmsg(fds[1], data, nil, nil, 0) + if err != nil { + t.Fatal(err) + } + + // Create and run dispatcher once. + sink := &fakeNetworkDispatcher{} + d, err := test.newDispatcher(fds[0], &endpoint{ + hdrSize: header.EthernetMinimumSize, + dispatcher: sink, + }) + if err != nil { + t.Fatal(err) + } + if ok, err := d.dispatch(); !ok || err != nil { + t.Fatalf("d.dispatch() = %v, %v", ok, err) + } + + // Verify packet. + if got, want := len(sink.pkts), 1; got != want { + t.Fatalf("len(sink.pkts) = %d, want %d", got, want) + } + pkt := sink.pkts[0] + if got, want := len(pkt.LinkHeader), header.EthernetMinimumSize; got != want { + t.Errorf("len(pkt.LinkHeader) = %d, want %d", got, want) + } + if got, want := pkt.Data.Size(), 4; got != want { + t.Errorf("pkt.Data.Size() = %d, want %d", got, want) + } + }) + } +} diff --git a/pkg/tcpip/link/fdbased/packet_dispatchers.go b/pkg/tcpip/link/fdbased/packet_dispatchers.go index f04738cfb..d8f2504b3 100644 --- a/pkg/tcpip/link/fdbased/packet_dispatchers.go +++ b/pkg/tcpip/link/fdbased/packet_dispatchers.go @@ -278,7 +278,7 @@ func (d *recvMMsgDispatcher) dispatch() (bool, *tcpip.Error) { eth header.Ethernet ) if d.e.hdrSize > 0 { - eth = header.Ethernet(d.views[k][0]) + eth = header.Ethernet(d.views[k][0][:header.EthernetMinimumSize]) p = eth.Type() remote = eth.SourceAddress() local = eth.DestinationAddress() diff --git a/pkg/tcpip/network/ipv4/BUILD b/pkg/tcpip/network/ipv4/BUILD index 78420d6e6..d142b4ffa 100644 --- a/pkg/tcpip/network/ipv4/BUILD +++ b/pkg/tcpip/network/ipv4/BUILD @@ -34,6 +34,6 @@ go_test( "//pkg/tcpip/transport/tcp", "//pkg/tcpip/transport/udp", "//pkg/waiter", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/tcpip/network/ipv6/BUILD b/pkg/tcpip/network/ipv6/BUILD index 3f71fc520..feada63dc 100644 --- a/pkg/tcpip/network/ipv6/BUILD +++ b/pkg/tcpip/network/ipv6/BUILD @@ -39,6 +39,6 @@ go_test( "//pkg/tcpip/transport/icmp", "//pkg/tcpip/transport/udp", "//pkg/waiter", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/tcpip/stack/BUILD b/pkg/tcpip/stack/BUILD index e65c731c2..6b9a6b316 100644 --- a/pkg/tcpip/stack/BUILD +++ b/pkg/tcpip/stack/BUILD @@ -27,6 +27,18 @@ go_template_instance( }, ) +go_template_instance( + name = "tuple_list", + out = "tuple_list.go", + package = "stack", + prefix = "tuple", + template = "//pkg/ilist:generic_list", + types = { + "Element": "*tuple", + "Linker": "*tuple", + }, +) + go_library( name = "stack", srcs = [ @@ -35,6 +47,7 @@ go_library( "forwarder.go", "icmp_rate_limit.go", "iptables.go", + "iptables_state.go", "iptables_targets.go", "iptables_types.go", "linkaddrcache.go", @@ -50,6 +63,7 @@ go_library( "stack_global_state.go", "stack_options.go", "transport_demuxer.go", + "tuple_list.go", ], visibility = ["//visibility:public"], deps = [ @@ -95,7 +109,7 @@ go_test( "//pkg/tcpip/transport/icmp", "//pkg/tcpip/transport/udp", "//pkg/waiter", - "@com_github_google_go-cmp//cmp:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", ], ) diff --git a/pkg/tcpip/stack/conntrack.go b/pkg/tcpip/stack/conntrack.go index af9c325ca..d39baf620 100644 --- a/pkg/tcpip/stack/conntrack.go +++ b/pkg/tcpip/stack/conntrack.go @@ -15,9 +15,12 @@ package stack import ( + "encoding/binary" "sync" + "time" "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/hash/jenkins" "gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/transport/tcpconntrack" ) @@ -30,6 +33,10 @@ import ( // // Currently, only TCP tracking is supported. +// Our hash table has 16K buckets. +// TODO(gvisor.dev/issue/170): These should be tunable. +const numBuckets = 1 << 14 + // Direction of the tuple. type direction int @@ -48,7 +55,12 @@ const ( // tuple holds a connection's identifying and manipulating data in one // direction. It is immutable. +// +// +stateify savable type tuple struct { + // tupleEntry is used to build an intrusive list of tuples. + tupleEntry + tupleID // conn is the connection tracking entry this tuple belongs to. @@ -61,6 +73,8 @@ type tuple struct { // tupleID uniquely identifies a connection in one direction. It currently // contains enough information to distinguish between any TCP or UDP // connection, and will need to be extended to support other protocols. +// +// +stateify savable type tupleID struct { srcAddr tcpip.Address srcPort uint16 @@ -83,6 +97,8 @@ func (ti tupleID) reply() tupleID { } // conn is a tracked connection. +// +// +stateify savable type conn struct { // original is the tuple in original direction. It is immutable. original tuple @@ -98,22 +114,67 @@ type conn struct { tcbHook Hook // mu protects tcb. - mu sync.Mutex + mu sync.Mutex `state:"nosave"` // tcb is TCB control block. It is used to keep track of states // of tcp connection and is protected by mu. tcb tcpconntrack.TCB + + // lastUsed is the last time the connection saw a relevant packet, and + // is updated by each packet on the connection. It is protected by mu. + lastUsed time.Time `state:".(unixTime)"` +} + +// timedOut returns whether the connection timed out based on its state. +func (cn *conn) timedOut(now time.Time) bool { + const establishedTimeout = 5 * 24 * time.Hour + const defaultTimeout = 120 * time.Second + cn.mu.Lock() + defer cn.mu.Unlock() + if cn.tcb.State() == tcpconntrack.ResultAlive { + // Use the same default as Linux, which doesn't delete + // established connections for 5(!) days. + return now.Sub(cn.lastUsed) > establishedTimeout + } + // Use the same default as Linux, which lets connections in most states + // other than established remain for <= 120 seconds. + return now.Sub(cn.lastUsed) > defaultTimeout } // ConnTrack tracks all connections created for NAT rules. Most users are // expected to only call handlePacket and createConnFor. +// +// ConnTrack keeps all connections in a slice of buckets, each of which holds a +// linked list of tuples. This gives us some desirable properties: +// - Each bucket has its own lock, lessening lock contention. +// - The slice is large enough that lists stay short (<10 elements on average). +// Thus traversal is fast. +// - During linked list traversal we reap expired connections. This amortizes +// the cost of reaping them and makes reapUnused faster. +// +// Locks are ordered by their location in the buckets slice. That is, a +// goroutine that locks buckets[i] can only lock buckets[j] s.t. i < j. +// +// +stateify savable type ConnTrack struct { - // mu protects conns. - mu sync.RWMutex + // seed is a one-time random value initialized at stack startup + // and is used in the calculation of hash keys for the list of buckets. + // It is immutable. + seed uint32 - // conns maintains a map of tuples needed for connection tracking for - // iptables NAT rules. It is protected by mu. - conns map[tupleID]tuple + // mu protects the buckets slice, but not buckets' contents. Only take + // the write lock if you are modifying the slice or saving for S/R. + mu sync.RWMutex `state:"nosave"` + + // buckets is protected by mu. + buckets []bucket +} + +// +stateify savable +type bucket struct { + // mu protects tuples. + mu sync.Mutex `state:"nosave"` + tuples tupleList } // packetToTupleID converts packet to a tuple ID. It fails when pkt lacks a valid @@ -143,8 +204,9 @@ func packetToTupleID(pkt *PacketBuffer) (tupleID, *tcpip.Error) { // newConn creates new connection. func newConn(orig, reply tupleID, manip manipType, hook Hook) *conn { conn := conn{ - manip: manip, - tcbHook: hook, + manip: manip, + tcbHook: hook, + lastUsed: time.Now(), } conn.original = tuple{conn: &conn, tupleID: orig} conn.reply = tuple{conn: &conn, tupleID: reply, direction: dirReply} @@ -162,14 +224,28 @@ func (ct *ConnTrack) connFor(pkt *PacketBuffer) (*conn, direction) { return nil, dirOriginal } - ct.mu.Lock() - defer ct.mu.Unlock() - - tuple, ok := ct.conns[tid] - if !ok { - return nil, dirOriginal + bucket := ct.bucket(tid) + now := time.Now() + + ct.mu.RLock() + defer ct.mu.RUnlock() + ct.buckets[bucket].mu.Lock() + defer ct.buckets[bucket].mu.Unlock() + + // Iterate over the tuples in a bucket, cleaning up any unused + // connections we find. + for other := ct.buckets[bucket].tuples.Front(); other != nil; other = other.Next() { + // Clean up any timed-out connections we happen to find. + if ct.reapTupleLocked(other, bucket, now) { + // The tuple expired. + continue + } + if tid == other.tupleID { + return other.conn, other.direction + } } - return tuple.conn, tuple.direction + + return nil, dirOriginal } // createConnFor creates a new conn for pkt. @@ -197,13 +273,31 @@ func (ct *ConnTrack) createConnFor(pkt *PacketBuffer, hook Hook, rt RedirectTarg } conn := newConn(tid, replyTID, manip, hook) - // Add the changed tuple to the map. - // TODO(gvisor.dev/issue/170): Need to support collisions using linked - // list. - ct.mu.Lock() - defer ct.mu.Unlock() - ct.conns[tid] = conn.original - ct.conns[replyTID] = conn.reply + // Lock the buckets in the correct order. + tupleBucket := ct.bucket(tid) + replyBucket := ct.bucket(replyTID) + ct.mu.RLock() + defer ct.mu.RUnlock() + if tupleBucket < replyBucket { + ct.buckets[tupleBucket].mu.Lock() + ct.buckets[replyBucket].mu.Lock() + } else if tupleBucket > replyBucket { + ct.buckets[replyBucket].mu.Lock() + ct.buckets[tupleBucket].mu.Lock() + } else { + // Both tuples are in the same bucket. + ct.buckets[tupleBucket].mu.Lock() + } + + // Add the tuple to the map. + ct.buckets[tupleBucket].tuples.PushFront(&conn.original) + ct.buckets[replyBucket].tuples.PushFront(&conn.reply) + + // Unlocking can happen in any order. + ct.buckets[tupleBucket].mu.Unlock() + if tupleBucket != replyBucket { + ct.buckets[replyBucket].mu.Unlock() + } return conn } @@ -297,35 +391,134 @@ func (ct *ConnTrack) handlePacket(pkt *PacketBuffer, hook Hook, gso *GSO, r *Rou // other tcp states. conn.mu.Lock() defer conn.mu.Unlock() - var st tcpconntrack.Result - tcpHeader := header.TCP(pkt.TransportHeader) - if conn.tcb.IsEmpty() { + + // Mark the connection as having been used recently so it isn't reaped. + conn.lastUsed = time.Now() + // Update connection state. + if tcpHeader := header.TCP(pkt.TransportHeader); conn.tcb.IsEmpty() { conn.tcb.Init(tcpHeader) conn.tcbHook = hook + } else if hook == conn.tcbHook { + conn.tcb.UpdateStateOutbound(tcpHeader) } else { - switch hook { - case conn.tcbHook: - st = conn.tcb.UpdateStateOutbound(tcpHeader) - default: - st = conn.tcb.UpdateStateInbound(tcpHeader) - } + conn.tcb.UpdateStateInbound(tcpHeader) } +} + +// bucket gets the conntrack bucket for a tupleID. +func (ct *ConnTrack) bucket(id tupleID) int { + h := jenkins.Sum32(ct.seed) + h.Write([]byte(id.srcAddr)) + h.Write([]byte(id.dstAddr)) + shortBuf := make([]byte, 2) + binary.LittleEndian.PutUint16(shortBuf, id.srcPort) + h.Write([]byte(shortBuf)) + binary.LittleEndian.PutUint16(shortBuf, id.dstPort) + h.Write([]byte(shortBuf)) + binary.LittleEndian.PutUint16(shortBuf, uint16(id.transProto)) + h.Write([]byte(shortBuf)) + binary.LittleEndian.PutUint16(shortBuf, uint16(id.netProto)) + h.Write([]byte(shortBuf)) + ct.mu.RLock() + defer ct.mu.RUnlock() + return int(h.Sum32()) % len(ct.buckets) +} - // Delete conn if tcp connection is closed. - if st == tcpconntrack.ResultClosedByPeer || st == tcpconntrack.ResultClosedBySelf || st == tcpconntrack.ResultReset { - ct.deleteConn(conn) +// reapUnused deletes timed out entries from the conntrack map. The rules for +// reaping are: +// - Most reaping occurs in connFor, which is called on each packet. connFor +// cleans up the bucket the packet's connection maps to. Thus calls to +// reapUnused should be fast. +// - Each call to reapUnused traverses a fraction of the conntrack table. +// Specifically, it traverses len(ct.buckets)/fractionPerReaping. +// - After reaping, reapUnused decides when it should next run based on the +// ratio of expired connections to examined connections. If the ratio is +// greater than maxExpiredPct, it schedules the next run quickly. Otherwise it +// slightly increases the interval between runs. +// - maxFullTraversal caps the time it takes to traverse the entire table. +// +// reapUnused returns the next bucket that should be checked and the time after +// which it should be called again. +func (ct *ConnTrack) reapUnused(start int, prevInterval time.Duration) (int, time.Duration) { + // TODO(gvisor.dev/issue/170): This can be more finely controlled, as + // it is in Linux via sysctl. + const fractionPerReaping = 128 + const maxExpiredPct = 50 + const maxFullTraversal = 60 * time.Second + const minInterval = 10 * time.Millisecond + const maxInterval = maxFullTraversal / fractionPerReaping + + now := time.Now() + checked := 0 + expired := 0 + var idx int + ct.mu.RLock() + defer ct.mu.RUnlock() + for i := 0; i < len(ct.buckets)/fractionPerReaping; i++ { + idx = (i + start) % len(ct.buckets) + ct.buckets[idx].mu.Lock() + for tuple := ct.buckets[idx].tuples.Front(); tuple != nil; tuple = tuple.Next() { + checked++ + if ct.reapTupleLocked(tuple, idx, now) { + expired++ + } + } + ct.buckets[idx].mu.Unlock() + } + // We already checked buckets[idx]. + idx++ + + // If half or more of the connections are expired, the table has gotten + // stale. Reschedule quickly. + expiredPct := 0 + if checked != 0 { + expiredPct = expired * 100 / checked + } + if expiredPct > maxExpiredPct { + return idx, minInterval + } + if interval := prevInterval + minInterval; interval <= maxInterval { + // Increment the interval between runs. + return idx, interval } + // We've hit the maximum interval. + return idx, maxInterval } -// deleteConn deletes the connection. -func (ct *ConnTrack) deleteConn(conn *conn) { - if conn == nil { - return +// reapTupleLocked tries to remove tuple and its reply from the table. It +// returns whether the tuple's connection has timed out. +// +// Preconditions: ct.mu is locked for reading and bucket is locked. +func (ct *ConnTrack) reapTupleLocked(tuple *tuple, bucket int, now time.Time) bool { + if !tuple.conn.timedOut(now) { + return false } - ct.mu.Lock() - defer ct.mu.Unlock() + // To maintain lock order, we can only reap these tuples if the reply + // appears later in the table. + replyBucket := ct.bucket(tuple.reply()) + if bucket > replyBucket { + return true + } + + // Don't re-lock if both tuples are in the same bucket. + differentBuckets := bucket != replyBucket + if differentBuckets { + ct.buckets[replyBucket].mu.Lock() + } + + // We have the buckets locked and can remove both tuples. + if tuple.direction == dirOriginal { + ct.buckets[replyBucket].tuples.Remove(&tuple.conn.reply) + } else { + ct.buckets[replyBucket].tuples.Remove(&tuple.conn.original) + } + ct.buckets[bucket].tuples.Remove(tuple) + + // Don't re-unlock if both tuples are in the same bucket. + if differentBuckets { + ct.buckets[replyBucket].mu.Unlock() + } - delete(ct.conns, conn.original.tupleID) - delete(ct.conns, conn.reply.tupleID) + return true } diff --git a/pkg/tcpip/stack/iptables.go b/pkg/tcpip/stack/iptables.go index 974d77c36..f846ea2e5 100644 --- a/pkg/tcpip/stack/iptables.go +++ b/pkg/tcpip/stack/iptables.go @@ -16,6 +16,7 @@ package stack import ( "fmt" + "time" "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/header" @@ -41,6 +42,9 @@ const ( // underflow. const HookUnset = -1 +// reaperDelay is how long to wait before starting to reap connections. +const reaperDelay = 5 * time.Second + // DefaultTables returns a default set of tables. Each chain is set to accept // all packets. func DefaultTables() *IPTables { @@ -112,8 +116,9 @@ func DefaultTables() *IPTables { Output: []string{TablenameMangle, TablenameNat, TablenameFilter}, }, connections: ConnTrack{ - conns: make(map[tupleID]tuple), + seed: generateRandUint32(), }, + reaperDone: make(chan struct{}, 1), } } @@ -169,6 +174,12 @@ func (it *IPTables) GetTable(name string) (Table, bool) { func (it *IPTables) ReplaceTable(name string, table Table) { it.mu.Lock() defer it.mu.Unlock() + // If iptables is being enabled, initialize the conntrack table and + // reaper. + if !it.modified { + it.connections.buckets = make([]bucket, numBuckets) + it.startReaper(reaperDelay) + } it.modified = true it.tables[name] = table } @@ -249,6 +260,35 @@ func (it *IPTables) Check(hook Hook, pkt *PacketBuffer, gso *GSO, r *Route, addr return true } +// beforeSave is invoked by stateify. +func (it *IPTables) beforeSave() { + // Ensure the reaper exits cleanly. + it.reaperDone <- struct{}{} + // Prevent others from modifying the connection table. + it.connections.mu.Lock() +} + +// afterLoad is invoked by stateify. +func (it *IPTables) afterLoad() { + it.startReaper(reaperDelay) +} + +// startReaper starts a goroutine that wakes up periodically to reap timed out +// connections. +func (it *IPTables) startReaper(interval time.Duration) { + go func() { // S/R-SAFE: reaperDone is signalled when iptables is saved. + bucket := 0 + for { + select { + case <-it.reaperDone: + return + case <-time.After(interval): + bucket, interval = it.connections.reapUnused(bucket, interval) + } + } + }() +} + // CheckPackets runs pkts through the rules for hook and returns a map of packets that // should not go forward. // diff --git a/pkg/tcpip/stack/iptables_state.go b/pkg/tcpip/stack/iptables_state.go new file mode 100644 index 000000000..529e02a07 --- /dev/null +++ b/pkg/tcpip/stack/iptables_state.go @@ -0,0 +1,40 @@ +// Copyright 2020 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stack + +import ( + "time" +) + +// +stateify savable +type unixTime struct { + second int64 + nano int64 +} + +// saveLastUsed is invoked by stateify. +func (cn *conn) saveLastUsed() unixTime { + return unixTime{cn.lastUsed.Unix(), cn.lastUsed.UnixNano()} +} + +// loadLastUsed is invoked by stateify. +func (cn *conn) loadLastUsed(unix unixTime) { + cn.lastUsed = time.Unix(unix.second, unix.nano) +} + +// beforeSave is invoked by stateify. +func (ct *ConnTrack) beforeSave() { + ct.mu.Lock() +} diff --git a/pkg/tcpip/stack/iptables_types.go b/pkg/tcpip/stack/iptables_types.go index c528ec381..eb70e3104 100644 --- a/pkg/tcpip/stack/iptables_types.go +++ b/pkg/tcpip/stack/iptables_types.go @@ -78,6 +78,8 @@ const ( ) // IPTables holds all the tables for a netstack. +// +// +stateify savable type IPTables struct { // mu protects tables, priorities, and modified. mu sync.RWMutex @@ -97,10 +99,15 @@ type IPTables struct { modified bool connections ConnTrack + + // reaperDone can be signalled to stop the reaper goroutine. + reaperDone chan struct{} } // A Table defines a set of chains and hooks into the network stack. It is // really just a list of rules. +// +// +stateify savable type Table struct { // Rules holds the rules that make up the table. Rules []Rule @@ -130,6 +137,8 @@ func (table *Table) ValidHooks() uint32 { // contains zero or more matchers, each of which is a specification of which // packets this rule applies to. If there are no matchers in the rule, it // applies to any packet. +// +// +stateify savable type Rule struct { // Filter holds basic IP filtering fields common to every rule. Filter IPHeaderFilter @@ -142,6 +151,8 @@ type Rule struct { } // IPHeaderFilter holds basic IP filtering data common to every rule. +// +// +stateify savable type IPHeaderFilter struct { // Protocol matches the transport protocol. Protocol tcpip.TransportProtocolNumber diff --git a/pkg/tcpip/stack/packet_buffer.go b/pkg/tcpip/stack/packet_buffer.go index 1b5da6017..e3556d5d2 100644 --- a/pkg/tcpip/stack/packet_buffer.go +++ b/pkg/tcpip/stack/packet_buffer.go @@ -14,6 +14,7 @@ package stack import ( + "gvisor.dev/gvisor/pkg/sync" "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/buffer" ) @@ -24,7 +25,7 @@ import ( // multiple endpoints. Clone() should be called in such cases so that // modifications to the Data field do not affect other copies. type PacketBuffer struct { - _ noCopy + _ sync.NoCopy // PacketBufferEntry is used to build an intrusive list of // PacketBuffers. @@ -102,14 +103,3 @@ func (pk *PacketBuffer) Clone() *PacketBuffer { NatDone: pk.NatDone, } } - -// noCopy may be embedded into structs which must not be copied -// after the first use. -// -// See https://golang.org/issues/8005#issuecomment-190753527 -// for details. -type noCopy struct{} - -// Lock is a no-op used by -copylocks checker from `go vet`. -func (*noCopy) Lock() {} -func (*noCopy) Unlock() {} diff --git a/pkg/tcpip/stack/stack.go b/pkg/tcpip/stack/stack.go index cdcfb8321..0aa815447 100644 --- a/pkg/tcpip/stack/stack.go +++ b/pkg/tcpip/stack/stack.go @@ -425,6 +425,7 @@ type Stack struct { handleLocal bool // tables are the iptables packet filtering and manipulation rules. + // TODO(gvisor.dev/issue/170): S/R this field. tables *IPTables // resumableEndpoints is a list of endpoints that need to be resumed if the diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go index 2f9872dc6..71bcee785 100644 --- a/pkg/tcpip/tcpip.go +++ b/pkg/tcpip/tcpip.go @@ -648,6 +648,11 @@ const ( // whether an IPv6 socket is to be restricted to sending and receiving // IPv6 packets only. V6OnlyOption + + // IPHdrIncludedOption is used by SetSockOpt to indicate for a raw + // endpoint that all packets being written have an IP header and the + // endpoint should not attach an IP header. + IPHdrIncludedOption ) // SockOptInt represents socket options which values have the int type. @@ -777,7 +782,7 @@ type CongestionControlOption string // control algorithms. type AvailableCongestionControlOption string -// buffer moderation. +// ModerateReceiveBufferOption is used by buffer moderation. type ModerateReceiveBufferOption bool // TCPLingerTimeoutOption is used by SetSockOpt/GetSockOpt to set/get the @@ -850,7 +855,10 @@ type OutOfBandInlineOption int // a default TTL. type DefaultTTLOption uint8 -// +// SocketDetachFilterOption is used by SetSockOpt to detach a previously attached +// classic BPF filter on a given endpoint. +type SocketDetachFilterOption int + // IPPacketInfo is the message structure for IP_PKTINFO. // // +stateify savable @@ -1239,6 +1247,9 @@ type UDPStats struct { // ChecksumErrors is the number of datagrams dropped due to bad checksums. ChecksumErrors *StatCounter + + // InvalidSourceAddress is the number of invalid sourced datagrams dropped. + InvalidSourceAddress *StatCounter } // Stats holds statistics about the networking stack. diff --git a/pkg/tcpip/timer.go b/pkg/tcpip/timer.go index 59f3b391f..5554c573f 100644 --- a/pkg/tcpip/timer.go +++ b/pkg/tcpip/timer.go @@ -15,8 +15,9 @@ package tcpip import ( - "sync" "time" + + "gvisor.dev/gvisor/pkg/sync" ) // cancellableTimerInstance is a specific instance of CancellableTimer. @@ -92,6 +93,8 @@ func (t *cancellableTimerInstance) stop() { // Note, it is not safe to copy a CancellableTimer as its timer instance creates // a closure over the address of the CancellableTimer. type CancellableTimer struct { + _ sync.NoCopy + // The active instance of a cancellable timer. instance cancellableTimerInstance @@ -157,22 +160,6 @@ func (t *CancellableTimer) Reset(d time.Duration) { } } -// Lock is a no-op used by the copylocks checker from go vet. -// -// See CancellableTimer for details about why it shouldn't be copied. -// -// See https://github.com/golang/go/issues/8005#issuecomment-190753527 for more -// details about the copylocks checker. -func (*CancellableTimer) Lock() {} - -// Unlock is a no-op used by the copylocks checker from go vet. -// -// See CancellableTimer for details about why it shouldn't be copied. -// -// See https://github.com/golang/go/issues/8005#issuecomment-190753527 for more -// details about the copylocks checker. -func (*CancellableTimer) Unlock() {} - // NewCancellableTimer returns an unscheduled CancellableTimer with the given // locker and fn. // diff --git a/pkg/tcpip/transport/icmp/endpoint.go b/pkg/tcpip/transport/icmp/endpoint.go index 62d1acad4..678f4e016 100644 --- a/pkg/tcpip/transport/icmp/endpoint.go +++ b/pkg/tcpip/transport/icmp/endpoint.go @@ -344,6 +344,10 @@ func (e *endpoint) Peek([][]byte) (int64, tcpip.ControlMessages, *tcpip.Error) { // SetSockOpt sets a socket option. func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { + switch opt.(type) { + case tcpip.SocketDetachFilterOption: + return nil + } return nil } diff --git a/pkg/tcpip/transport/packet/endpoint.go b/pkg/tcpip/transport/packet/endpoint.go index a8f8454dd..57b7f5c19 100644 --- a/pkg/tcpip/transport/packet/endpoint.go +++ b/pkg/tcpip/transport/packet/endpoint.go @@ -278,7 +278,13 @@ func (ep *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // used with SetSockOpt, and this function always returns // tcpip.ErrNotSupported. func (ep *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { - return tcpip.ErrUnknownProtocolOption + switch opt.(type) { + case tcpip.SocketDetachFilterOption: + return nil + + default: + return tcpip.ErrUnknownProtocolOption + } } // SetSockOptBool implements tcpip.Endpoint.SetSockOptBool. diff --git a/pkg/tcpip/transport/raw/endpoint.go b/pkg/tcpip/transport/raw/endpoint.go index 766c7648e..c2e9fd29f 100644 --- a/pkg/tcpip/transport/raw/endpoint.go +++ b/pkg/tcpip/transport/raw/endpoint.go @@ -63,6 +63,7 @@ type endpoint struct { stack *stack.Stack `state:"manual"` waiterQueue *waiter.Queue associated bool + hdrIncluded bool // The following fields are used to manage the receive queue and are // protected by rcvMu. @@ -108,6 +109,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, transProt rcvBufSizeMax: 32 * 1024, sndBufSizeMax: 32 * 1024, associated: associated, + hdrIncluded: !associated, } // Override with stack defaults. @@ -182,10 +184,6 @@ func (e *endpoint) SetOwner(owner tcpip.PacketOwner) { // Read implements tcpip.Endpoint.Read. func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMessages, *tcpip.Error) { - if !e.associated { - return buffer.View{}, tcpip.ControlMessages{}, tcpip.ErrInvalidOptionValue - } - e.rcvMu.Lock() // If there's no data to read, return that read would block or that the @@ -263,7 +261,7 @@ func (e *endpoint) write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, <-c // If this is an unassociated socket and callee provided a nonzero // destination address, route using that address. - if !e.associated { + if e.hdrIncluded { ip := header.IPv4(payloadBytes) if !ip.IsValid(len(payloadBytes)) { e.mu.RUnlock() @@ -353,7 +351,7 @@ func (e *endpoint) finishWrite(payloadBytes []byte, route *stack.Route) (int64, } } - if !e.associated { + if e.hdrIncluded { if err := route.WriteHeaderIncludedPacket(&stack.PacketBuffer{ Data: buffer.View(payloadBytes).ToVectorisedView(), }); err != nil { @@ -508,11 +506,24 @@ func (e *endpoint) Readiness(mask waiter.EventMask) waiter.EventMask { // SetSockOpt implements tcpip.Endpoint.SetSockOpt. func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { - return tcpip.ErrUnknownProtocolOption + switch opt.(type) { + case tcpip.SocketDetachFilterOption: + return nil + + default: + return tcpip.ErrUnknownProtocolOption + } } // SetSockOptBool implements tcpip.Endpoint.SetSockOptBool. func (e *endpoint) SetSockOptBool(opt tcpip.SockOptBool, v bool) *tcpip.Error { + switch opt { + case tcpip.IPHdrIncludedOption: + e.mu.Lock() + e.hdrIncluded = v + e.mu.Unlock() + return nil + } return tcpip.ErrUnknownProtocolOption } @@ -577,6 +588,12 @@ func (e *endpoint) GetSockOptBool(opt tcpip.SockOptBool) (bool, *tcpip.Error) { case tcpip.KeepaliveEnabledOption: return false, nil + case tcpip.IPHdrIncludedOption: + e.mu.Lock() + v := e.hdrIncluded + e.mu.Unlock() + return v, nil + default: return false, tcpip.ErrUnknownProtocolOption } @@ -616,8 +633,15 @@ func (e *endpoint) GetSockOptInt(opt tcpip.SockOptInt) (int, *tcpip.Error) { func (e *endpoint) HandlePacket(route *stack.Route, pkt *stack.PacketBuffer) { e.rcvMu.Lock() - // Drop the packet if our buffer is currently full. - if e.rcvClosed { + // Drop the packet if our buffer is currently full or if this is an unassociated + // endpoint (i.e endpoint created w/ IPPROTO_RAW). Such endpoints are send only + // See: https://man7.org/linux/man-pages/man7/raw.7.html + // + // An IPPROTO_RAW socket is send only. If you really want to receive + // all IP packets, use a packet(7) socket with the ETH_P_IP protocol. + // Note that packet sockets don't reassemble IP fragments, unlike raw + // sockets. + if e.rcvClosed || !e.associated { e.rcvMu.Unlock() e.stack.Stats().DroppedPackets.Increment() e.stats.ReceiveErrors.ClosedReceiver.Increment() diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index caac6ef57..83dc10ed0 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -1792,6 +1792,9 @@ func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { e.deferAccept = time.Duration(v) e.UnlockUser() + case tcpip.SocketDetachFilterOption: + return nil + default: return nil } diff --git a/pkg/tcpip/transport/tcpconntrack/tcp_conntrack.go b/pkg/tcpip/transport/tcpconntrack/tcp_conntrack.go index 12bc1b5b5..558b06df0 100644 --- a/pkg/tcpip/transport/tcpconntrack/tcp_conntrack.go +++ b/pkg/tcpip/transport/tcpconntrack/tcp_conntrack.go @@ -106,6 +106,11 @@ func (t *TCB) UpdateStateOutbound(tcp header.TCP) Result { return st } +// State returns the current state of the TCB. +func (t *TCB) State() Result { + return t.state +} + // IsAlive returns true as long as the connection is established(Alive) // or connecting state. func (t *TCB) IsAlive() bool { diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go index 0584ec8dc..a14643ae8 100644 --- a/pkg/tcpip/transport/udp/endpoint.go +++ b/pkg/tcpip/transport/udp/endpoint.go @@ -816,6 +816,9 @@ func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { e.mu.Lock() e.bindToDevice = id e.mu.Unlock() + + case tcpip.SocketDetachFilterOption: + return nil } return nil } @@ -1377,6 +1380,15 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, pk return } + // Never receive from a multicast address. + if header.IsV4MulticastAddress(id.RemoteAddress) || + header.IsV6MulticastAddress(id.RemoteAddress) { + e.stack.Stats().UDP.InvalidSourceAddress.Increment() + e.stack.Stats().IP.InvalidSourceAddressesReceived.Increment() + e.stats.ReceiveErrors.MalformedPacketsReceived.Increment() + return + } + // Verify checksum unless RX checksum offload is enabled. // On IPv4, UDP checksum is optional, and a zero value means // the transmitter omitted the checksum generation (RFC768). @@ -1395,10 +1407,10 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, pk } } - e.rcvMu.Lock() e.stack.Stats().UDP.PacketsReceived.Increment() e.stats.PacketsReceived.Increment() + e.rcvMu.Lock() // Drop the packet if our buffer is currently full. if !e.rcvReady || e.rcvClosed { e.rcvMu.Unlock() diff --git a/pkg/tcpip/transport/udp/udp_test.go b/pkg/tcpip/transport/udp/udp_test.go index 91ba031fa..90781cf49 100644 --- a/pkg/tcpip/transport/udp/udp_test.go +++ b/pkg/tcpip/transport/udp/udp_test.go @@ -83,16 +83,18 @@ type header4Tuple struct { type testFlow int const ( - unicastV4 testFlow = iota // V4 unicast on a V4 socket - unicastV4in6 // V4-mapped unicast on a V6-dual socket - unicastV6 // V6 unicast on a V6 socket - unicastV6Only // V6 unicast on a V6-only socket - multicastV4 // V4 multicast on a V4 socket - multicastV4in6 // V4-mapped multicast on a V6-dual socket - multicastV6 // V6 multicast on a V6 socket - multicastV6Only // V6 multicast on a V6-only socket - broadcast // V4 broadcast on a V4 socket - broadcastIn6 // V4-mapped broadcast on a V6-dual socket + unicastV4 testFlow = iota // V4 unicast on a V4 socket + unicastV4in6 // V4-mapped unicast on a V6-dual socket + unicastV6 // V6 unicast on a V6 socket + unicastV6Only // V6 unicast on a V6-only socket + multicastV4 // V4 multicast on a V4 socket + multicastV4in6 // V4-mapped multicast on a V6-dual socket + multicastV6 // V6 multicast on a V6 socket + multicastV6Only // V6 multicast on a V6-only socket + broadcast // V4 broadcast on a V4 socket + broadcastIn6 // V4-mapped broadcast on a V6-dual socket + reverseMulticast4 // V4 multicast src. Must fail. + reverseMulticast6 // V6 multicast src. Must fail. ) func (flow testFlow) String() string { @@ -117,6 +119,10 @@ func (flow testFlow) String() string { return "broadcast" case broadcastIn6: return "broadcastIn6" + case reverseMulticast4: + return "reverseMulticast4" + case reverseMulticast6: + return "reverseMulticast6" default: return "unknown" } @@ -168,6 +174,9 @@ func (flow testFlow) header4Tuple(d packetDirection) header4Tuple { h.dstAddr.Addr = multicastV6Addr } } + if flow.isReverseMulticast() { + h.srcAddr.Addr = flow.getMcastAddr() + } return h } @@ -199,9 +208,9 @@ func (flow testFlow) netProto() tcpip.NetworkProtocolNumber { // endpoint for this flow. func (flow testFlow) sockProto() tcpip.NetworkProtocolNumber { switch flow { - case unicastV4in6, unicastV6, unicastV6Only, multicastV4in6, multicastV6, multicastV6Only, broadcastIn6: + case unicastV4in6, unicastV6, unicastV6Only, multicastV4in6, multicastV6, multicastV6Only, broadcastIn6, reverseMulticast6: return ipv6.ProtocolNumber - case unicastV4, multicastV4, broadcast: + case unicastV4, multicastV4, broadcast, reverseMulticast4: return ipv4.ProtocolNumber default: panic(fmt.Sprintf("invalid testFlow given: %d", flow)) @@ -224,7 +233,7 @@ func (flow testFlow) isV6Only() bool { switch flow { case unicastV6Only, multicastV6Only: return true - case unicastV4, unicastV4in6, unicastV6, multicastV4, multicastV4in6, multicastV6, broadcast, broadcastIn6: + case unicastV4, unicastV4in6, unicastV6, multicastV4, multicastV4in6, multicastV6, broadcast, broadcastIn6, reverseMulticast4, reverseMulticast6: return false default: panic(fmt.Sprintf("invalid testFlow given: %d", flow)) @@ -235,7 +244,7 @@ func (flow testFlow) isMulticast() bool { switch flow { case multicastV4, multicastV4in6, multicastV6, multicastV6Only: return true - case unicastV4, unicastV4in6, unicastV6, unicastV6Only, broadcast, broadcastIn6: + case unicastV4, unicastV4in6, unicastV6, unicastV6Only, broadcast, broadcastIn6, reverseMulticast4, reverseMulticast6: return false default: panic(fmt.Sprintf("invalid testFlow given: %d", flow)) @@ -246,7 +255,7 @@ func (flow testFlow) isBroadcast() bool { switch flow { case broadcast, broadcastIn6: return true - case unicastV4, unicastV4in6, unicastV6, unicastV6Only, multicastV4, multicastV4in6, multicastV6, multicastV6Only: + case unicastV4, unicastV4in6, unicastV6, unicastV6Only, multicastV4, multicastV4in6, multicastV6, multicastV6Only, reverseMulticast4, reverseMulticast6: return false default: panic(fmt.Sprintf("invalid testFlow given: %d", flow)) @@ -257,13 +266,22 @@ func (flow testFlow) isMapped() bool { switch flow { case unicastV4in6, multicastV4in6, broadcastIn6: return true - case unicastV4, unicastV6, unicastV6Only, multicastV4, multicastV6, multicastV6Only, broadcast: + case unicastV4, unicastV6, unicastV6Only, multicastV4, multicastV6, multicastV6Only, broadcast, reverseMulticast4, reverseMulticast6: return false default: panic(fmt.Sprintf("invalid testFlow given: %d", flow)) } } +func (flow testFlow) isReverseMulticast() bool { + switch flow { + case reverseMulticast4, reverseMulticast6: + return true + default: + return false + } +} + type testContext struct { t *testing.T linkEP *channel.Endpoint @@ -872,6 +890,60 @@ func TestV4ReadOnBoundToBroadcast(t *testing.T) { } } +// TestReadFromMulticast checks that an endpoint will NOT receive a packet +// that was sent with multicast SOURCE address. +func TestReadFromMulticast(t *testing.T) { + for _, flow := range []testFlow{reverseMulticast4, reverseMulticast6} { + t.Run(fmt.Sprintf("flow:%s", flow), func(t *testing.T) { + c := newDualTestContext(t, defaultMTU) + defer c.cleanup() + + c.createEndpointForFlow(flow) + + if err := c.ep.Bind(tcpip.FullAddress{Port: stackPort}); err != nil { + t.Fatalf("Bind failed: %s", err) + } + testFailingRead(c, flow, false /* expectReadError */) + }) + } +} + +// TestReadFromMulticaststats checks that a discarded packet +// that that was sent with multicast SOURCE address increments +// the correct counters and that a regular packet does not. +func TestReadFromMulticastStats(t *testing.T) { + t.Helper() + for _, flow := range []testFlow{reverseMulticast4, reverseMulticast6, unicastV4} { + t.Run(fmt.Sprintf("flow:%s", flow), func(t *testing.T) { + c := newDualTestContext(t, defaultMTU) + defer c.cleanup() + + c.createEndpointForFlow(flow) + + if err := c.ep.Bind(tcpip.FullAddress{Port: stackPort}); err != nil { + t.Fatalf("Bind failed: %s", err) + } + + payload := newPayload() + c.injectPacket(flow, payload) + + var want uint64 = 0 + if flow.isReverseMulticast() { + want = 1 + } + if got := c.s.Stats().IP.InvalidSourceAddressesReceived.Value(); got != want { + t.Errorf("got stats.IP.InvalidSourceAddressesReceived.Value() = %d, want = %d", got, want) + } + if got := c.s.Stats().UDP.InvalidSourceAddress.Value(); got != want { + t.Errorf("got stats.UDP.InvalidSourceAddress.Value() = %d, want = %d", got, want) + } + if got := c.ep.Stats().(*tcpip.TransportEndpointStats).ReceiveErrors.MalformedPacketsReceived.Value(); got != want { + t.Errorf("got EP Stats.ReceiveErrors.MalformedPacketsReceived stats = %d, want = %d", got, want) + } + }) + } +} + // TestV4ReadBroadcastOnBoundToWildcard checks that an endpoint can bind to ANY // and receive broadcast and unicast data. func TestV4ReadBroadcastOnBoundToWildcard(t *testing.T) { diff --git a/pkg/test/criutil/criutil.go b/pkg/test/criutil/criutil.go index 8fed29ff5..66f10c016 100644 --- a/pkg/test/criutil/criutil.go +++ b/pkg/test/criutil/criutil.go @@ -22,6 +22,9 @@ import ( "fmt" "os" "os/exec" + "path" + "regexp" + "strconv" "strings" "time" @@ -33,28 +36,44 @@ import ( type Crictl struct { logger testutil.Logger endpoint string + runpArgs []string cleanup []func() } // resolvePath attempts to find binary paths. It may set the path to invalid, // which will cause the execution to fail with a sensible error. func resolvePath(executable string) string { + runtime, err := dockerutil.RuntimePath() + if err == nil { + // Check first the directory of the runtime itself. + if dir := path.Dir(runtime); dir != "" && dir != "." { + guess := path.Join(dir, executable) + if fi, err := os.Stat(guess); err == nil && (fi.Mode()&0111) != 0 { + return guess + } + } + } + + // Try to find via the path. guess, err := exec.LookPath(executable) - if err != nil { - guess = fmt.Sprintf("/usr/local/bin/%s", executable) + if err == nil { + return guess } - return guess + + // Return a default path. + return fmt.Sprintf("/usr/local/bin/%s", executable) } // NewCrictl returns a Crictl configured with a timeout and an endpoint over // which it will talk to containerd. -func NewCrictl(logger testutil.Logger, endpoint string) *Crictl { +func NewCrictl(logger testutil.Logger, endpoint string, runpArgs []string) *Crictl { // Attempt to find the executable, but don't bother propagating the // error at this point. The first command executed will return with a // binary not found error. return &Crictl{ logger: logger, endpoint: endpoint, + runpArgs: runpArgs, } } @@ -67,8 +86,8 @@ func (cc *Crictl) CleanUp() { } // RunPod creates a sandbox. It corresponds to `crictl runp`. -func (cc *Crictl) RunPod(sbSpecFile string) (string, error) { - podID, err := cc.run("runp", sbSpecFile) +func (cc *Crictl) RunPod(runtime, sbSpecFile string) (string, error) { + podID, err := cc.run("runp", "--runtime", runtime, sbSpecFile) if err != nil { return "", fmt.Errorf("runp failed: %v", err) } @@ -79,10 +98,42 @@ func (cc *Crictl) RunPod(sbSpecFile string) (string, error) { // Create creates a container within a sandbox. It corresponds to `crictl // create`. func (cc *Crictl) Create(podID, contSpecFile, sbSpecFile string) (string, error) { - podID, err := cc.run("create", podID, contSpecFile, sbSpecFile) + // In version 1.16.0, crictl annoying starting attempting to pull the + // container, even if it was already available locally. We therefore + // need to parse the version and add an appropriate --no-pull argument + // since the image has already been loaded locally. + out, err := cc.run("-v") + if err != nil { + return "", err + } + r := regexp.MustCompile("crictl version ([0-9]+)\\.([0-9]+)\\.([0-9+])") + vs := r.FindStringSubmatch(out) + if len(vs) != 4 { + return "", fmt.Errorf("crictl -v had unexpected output: %s", out) + } + major, err := strconv.ParseUint(vs[1], 10, 64) if err != nil { + return "", fmt.Errorf("crictl had invalid version: %v (%s)", err, out) + } + minor, err := strconv.ParseUint(vs[2], 10, 64) + if err != nil { + return "", fmt.Errorf("crictl had invalid version: %v (%s)", err, out) + } + + args := []string{"create"} + if (major == 1 && minor >= 16) || major > 1 { + args = append(args, "--no-pull") + } + args = append(args, podID) + args = append(args, contSpecFile) + args = append(args, sbSpecFile) + + podID, err = cc.run(args...) + if err != nil { + time.Sleep(10 * time.Minute) // XXX return "", fmt.Errorf("create failed: %v", err) } + // Strip the trailing newline from crictl output. return strings.TrimSpace(podID), nil } @@ -260,7 +311,7 @@ func (cc *Crictl) StopContainer(contID string) error { // StartPodAndContainer starts a sandbox and container in that sandbox. It // returns the pod ID and container ID. -func (cc *Crictl) StartPodAndContainer(image, sbSpec, contSpec string) (string, string, error) { +func (cc *Crictl) StartPodAndContainer(runtime, image, sbSpec, contSpec string) (string, string, error) { if err := cc.Import(image); err != nil { return "", "", err } @@ -277,7 +328,7 @@ func (cc *Crictl) StartPodAndContainer(image, sbSpec, contSpec string) (string, } cc.cleanup = append(cc.cleanup, cleanup) - podID, err := cc.RunPod(sbSpecFile) + podID, err := cc.RunPod(runtime, sbSpecFile) if err != nil { return "", "", err } diff --git a/pkg/test/dockerutil/dockerutil.go b/pkg/test/dockerutil/dockerutil.go index f95ae3cd1..df09babf3 100644 --- a/pkg/test/dockerutil/dockerutil.go +++ b/pkg/test/dockerutil/dockerutil.go @@ -119,3 +119,8 @@ func Save(logger testutil.Logger, image string, w io.Writer) error { cmd.Stdout = w // Send directly to the writer. return cmd.Run() } + +// Runtime returns the value of the flag runtime. +func Runtime() string { + return *runtime +} diff --git a/pkg/test/testutil/BUILD b/pkg/test/testutil/BUILD index 03b1b4677..2d8f56bc0 100644 --- a/pkg/test/testutil/BUILD +++ b/pkg/test/testutil/BUILD @@ -15,6 +15,6 @@ go_library( "//runsc/boot", "//runsc/specutils", "@com_github_cenkalti_backoff//:go_default_library", - "@com_github_opencontainers_runtime-spec//specs-go:go_default_library", + "@com_github_opencontainers_runtime_spec//specs-go:go_default_library", ], ) |