diff options
Diffstat (limited to 'pkg/shim/v2')
-rw-r--r-- | pkg/shim/v2/BUILD | 1 | ||||
-rw-r--r-- | pkg/shim/v2/service.go | 86 |
2 files changed, 62 insertions, 25 deletions
diff --git a/pkg/shim/v2/BUILD b/pkg/shim/v2/BUILD index f37fefddc..b0e8daa51 100644 --- a/pkg/shim/v2/BUILD +++ b/pkg/shim/v2/BUILD @@ -22,6 +22,7 @@ go_library( "//runsc/specutils", "@com_github_burntsushi_toml//:go_default_library", "@com_github_containerd_cgroups//:go_default_library", + "@com_github_containerd_cgroups//stats/v1:go_default_library", "@com_github_containerd_console//:go_default_library", "@com_github_containerd_containerd//api/events:go_default_library", "@com_github_containerd_containerd//api/types/task:go_default_library", diff --git a/pkg/shim/v2/service.go b/pkg/shim/v2/service.go index 2e39d2c4a..6aaf5fab8 100644 --- a/pkg/shim/v2/service.go +++ b/pkg/shim/v2/service.go @@ -28,6 +28,7 @@ import ( "github.com/BurntSushi/toml" "github.com/containerd/cgroups" + cgroupsstats "github.com/containerd/cgroups/stats/v1" "github.com/containerd/console" "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" @@ -67,9 +68,15 @@ var ( var _ = (taskAPI.TaskService)(&service{}) -// configFile is the default config file name. For containerd 1.2, -// we assume that a config.toml should exist in the runtime root. -const configFile = "config.toml" +const ( + // configFile is the default config file name. For containerd 1.2, + // we assume that a config.toml should exist in the runtime root. + configFile = "config.toml" + + // shimAddressPath is the relative path to a file that contains the address + // to the shim UDS. See service.shimAddress. + shimAddressPath = "address" +) // New returns a new shim service that can be used via GRPC. func New(ctx context.Context, id string, publisher shim.Publisher, cancel func()) (shim.Shim, error) { @@ -101,6 +108,11 @@ func New(ctx context.Context, id string, publisher shim.Publisher, cancel func() return nil, fmt.Errorf("failed to initialized platform behavior: %w", err) } go s.forward(ctx, publisher) + + if address, err := shim.ReadAddress(shimAddressPath); err == nil { + s.shimAddress = address + } + return s, nil } @@ -152,6 +164,9 @@ type service struct { // cancel is a function that needs to be called before the shim stops. The // function is provided by the caller to New(). cancel func() + + // shimAddress is the location of the UDS used to communicate to containerd. + shimAddress string } func (s *service) newCommand(ctx context.Context, containerdBinary, containerdAddress string) (*exec.Cmd, error) { @@ -191,38 +206,58 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container if err != nil { return "", err } - address, err := shim.SocketAddress(ctx, id) + address, err := shim.SocketAddress(ctx, containerdAddress, id) if err != nil { return "", err } socket, err := shim.NewSocket(address) if err != nil { - return "", err + // The only time where this would happen is if there is a bug and the socket + // was not cleaned up in the cleanup method of the shim or we are using the + // grouping functionality where the new process should be run with the same + // shim as an existing container. + if !shim.SocketEaddrinuse(err) { + return "", fmt.Errorf("create new shim socket: %w", err) + } + if shim.CanConnect(address) { + if err := shim.WriteAddress(shimAddressPath, address); err != nil { + return "", fmt.Errorf("write existing socket for shim: %w", err) + } + return address, nil + } + if err := shim.RemoveSocket(address); err != nil { + return "", fmt.Errorf("remove pre-existing socket: %w", err) + } + if socket, err = shim.NewSocket(address); err != nil { + return "", fmt.Errorf("try create new shim socket 2x: %w", err) + } } - defer socket.Close() + cu := cleanup.Make(func() { + socket.Close() + _ = shim.RemoveSocket(address) + }) + defer cu.Clean() + f, err := socket.File() if err != nil { return "", err } - defer f.Close() cmd.ExtraFiles = append(cmd.ExtraFiles, f) log.L.Debugf("Executing: %q %s", cmd.Path, cmd.Args) if err := cmd.Start(); err != nil { + f.Close() return "", err } - cu := cleanup.Make(func() { - cmd.Process.Kill() - }) - defer cu.Clean() + cu.Add(func() { cmd.Process.Kill() }) // make sure to wait after start go cmd.Wait() if err := shim.WritePidFile("shim.pid", cmd.Process.Pid); err != nil { return "", err } - if err := shim.WriteAddress("address", address); err != nil { + if err := shim.WriteAddress(shimAddressPath, address); err != nil { return "", err } if err := shim.SetScore(cmd.Process.Pid); err != nil { @@ -675,8 +710,11 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*types.Empty, error) { log.L.Debugf("Shutdown, id: %s", r.ID) s.cancel() + if s.shimAddress != "" { + _ = shim.RemoveSocket(s.shimAddress) + } os.Exit(0) - return empty, nil + panic("Should not get here") } func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { @@ -698,48 +736,48 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI. // as runc. // // [0]: https://github.com/google/gvisor/blob/277a0d5a1fbe8272d4729c01ee4c6e374d047ebc/runsc/boot/events.go#L61-L81 - metrics := &cgroups.Metrics{ - CPU: &cgroups.CPUStat{ - Usage: &cgroups.CPUUsage{ + metrics := &cgroupsstats.Metrics{ + CPU: &cgroupsstats.CPUStat{ + Usage: &cgroupsstats.CPUUsage{ Total: stats.Cpu.Usage.Total, Kernel: stats.Cpu.Usage.Kernel, User: stats.Cpu.Usage.User, PerCPU: stats.Cpu.Usage.Percpu, }, - Throttling: &cgroups.Throttle{ + Throttling: &cgroupsstats.Throttle{ Periods: stats.Cpu.Throttling.Periods, ThrottledPeriods: stats.Cpu.Throttling.ThrottledPeriods, ThrottledTime: stats.Cpu.Throttling.ThrottledTime, }, }, - Memory: &cgroups.MemoryStat{ + Memory: &cgroupsstats.MemoryStat{ Cache: stats.Memory.Cache, - Usage: &cgroups.MemoryEntry{ + Usage: &cgroupsstats.MemoryEntry{ Limit: stats.Memory.Usage.Limit, Usage: stats.Memory.Usage.Usage, Max: stats.Memory.Usage.Max, Failcnt: stats.Memory.Usage.Failcnt, }, - Swap: &cgroups.MemoryEntry{ + Swap: &cgroupsstats.MemoryEntry{ Limit: stats.Memory.Swap.Limit, Usage: stats.Memory.Swap.Usage, Max: stats.Memory.Swap.Max, Failcnt: stats.Memory.Swap.Failcnt, }, - Kernel: &cgroups.MemoryEntry{ + Kernel: &cgroupsstats.MemoryEntry{ Limit: stats.Memory.Kernel.Limit, Usage: stats.Memory.Kernel.Usage, Max: stats.Memory.Kernel.Max, Failcnt: stats.Memory.Kernel.Failcnt, }, - KernelTCP: &cgroups.MemoryEntry{ + KernelTCP: &cgroupsstats.MemoryEntry{ Limit: stats.Memory.KernelTCP.Limit, Usage: stats.Memory.KernelTCP.Usage, Max: stats.Memory.KernelTCP.Max, Failcnt: stats.Memory.KernelTCP.Failcnt, }, }, - Pids: &cgroups.PidsStat{ + Pids: &cgroupsstats.PidsStat{ Current: stats.Pids.Current, Limit: stats.Pids.Limit, }, @@ -843,9 +881,7 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er func (s *service) forward(ctx context.Context, publisher shim.Publisher) { for e := range s.events { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) err := publisher.Publish(ctx, getTopic(e), e) - cancel() if err != nil { // Should not happen. panic(fmt.Errorf("post event: %w", err)) |