summaryrefslogtreecommitdiffhomepage
path: root/pkg/shim/v2/service.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/shim/v2/service.go')
-rw-r--r--pkg/shim/v2/service.go65
1 files changed, 50 insertions, 15 deletions
diff --git a/pkg/shim/v2/service.go b/pkg/shim/v2/service.go
index 2e39d2c4a..cba403cae 100644
--- a/pkg/shim/v2/service.go
+++ b/pkg/shim/v2/service.go
@@ -67,9 +67,15 @@ var (
var _ = (taskAPI.TaskService)(&service{})
-// configFile is the default config file name. For containerd 1.2,
-// we assume that a config.toml should exist in the runtime root.
-const configFile = "config.toml"
+const (
+ // configFile is the default config file name. For containerd 1.2,
+ // we assume that a config.toml should exist in the runtime root.
+ configFile = "config.toml"
+
+ // shimAddressPath is the relative path to a file that contains the address
+ // to the shim UDS. See service.shimAddress.
+ shimAddressPath = "address"
+)
// New returns a new shim service that can be used via GRPC.
func New(ctx context.Context, id string, publisher shim.Publisher, cancel func()) (shim.Shim, error) {
@@ -101,6 +107,11 @@ func New(ctx context.Context, id string, publisher shim.Publisher, cancel func()
return nil, fmt.Errorf("failed to initialized platform behavior: %w", err)
}
go s.forward(ctx, publisher)
+
+ if address, err := shim.ReadAddress(shimAddressPath); err == nil {
+ s.shimAddress = address
+ }
+
return s, nil
}
@@ -152,6 +163,9 @@ type service struct {
// cancel is a function that needs to be called before the shim stops. The
// function is provided by the caller to New().
cancel func()
+
+ // shimAddress is the location of the UDS used to communicate to containerd.
+ shimAddress string
}
func (s *service) newCommand(ctx context.Context, containerdBinary, containerdAddress string) (*exec.Cmd, error) {
@@ -191,38 +205,58 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
if err != nil {
return "", err
}
- address, err := shim.SocketAddress(ctx, id)
+ address, err := shim.SocketAddress(ctx, containerdAddress, id)
if err != nil {
return "", err
}
socket, err := shim.NewSocket(address)
if err != nil {
- return "", err
+ // The only time where this would happen is if there is a bug and the socket
+ // was not cleaned up in the cleanup method of the shim or we are using the
+ // grouping functionality where the new process should be run with the same
+ // shim as an existing container.
+ if !shim.SocketEaddrinuse(err) {
+ return "", fmt.Errorf("create new shim socket: %w", err)
+ }
+ if shim.CanConnect(address) {
+ if err := shim.WriteAddress(shimAddressPath, address); err != nil {
+ return "", fmt.Errorf("write existing socket for shim: %w", err)
+ }
+ return address, nil
+ }
+ if err := shim.RemoveSocket(address); err != nil {
+ return "", fmt.Errorf("remove pre-existing socket: %w", err)
+ }
+ if socket, err = shim.NewSocket(address); err != nil {
+ return "", fmt.Errorf("try create new shim socket 2x: %w", err)
+ }
}
- defer socket.Close()
+ cu := cleanup.Make(func() {
+ socket.Close()
+ _ = shim.RemoveSocket(address)
+ })
+ defer cu.Clean()
+
f, err := socket.File()
if err != nil {
return "", err
}
- defer f.Close()
cmd.ExtraFiles = append(cmd.ExtraFiles, f)
log.L.Debugf("Executing: %q %s", cmd.Path, cmd.Args)
if err := cmd.Start(); err != nil {
+ f.Close()
return "", err
}
- cu := cleanup.Make(func() {
- cmd.Process.Kill()
- })
- defer cu.Clean()
+ cu.Add(func() { cmd.Process.Kill() })
// make sure to wait after start
go cmd.Wait()
if err := shim.WritePidFile("shim.pid", cmd.Process.Pid); err != nil {
return "", err
}
- if err := shim.WriteAddress("address", address); err != nil {
+ if err := shim.WriteAddress(shimAddressPath, address); err != nil {
return "", err
}
if err := shim.SetScore(cmd.Process.Pid); err != nil {
@@ -675,8 +709,11 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*types.Empty, error) {
log.L.Debugf("Shutdown, id: %s", r.ID)
s.cancel()
+ if s.shimAddress != "" {
+ _ = shim.RemoveSocket(s.shimAddress)
+ }
os.Exit(0)
- return empty, nil
+ panic("Should not get here")
}
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
@@ -843,9 +880,7 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
for e := range s.events {
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
err := publisher.Publish(ctx, getTopic(e), e)
- cancel()
if err != nil {
// Should not happen.
panic(fmt.Errorf("post event: %w", err))