diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/v2/epoll.go | 129 | ||||
-rw-r--r-- | pkg/v2/service.go | 22 |
2 files changed, 151 insertions, 0 deletions
diff --git a/pkg/v2/epoll.go b/pkg/v2/epoll.go new file mode 100644 index 000000000..76c7b54d6 --- /dev/null +++ b/pkg/v2/epoll.go @@ -0,0 +1,129 @@ +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package v2 + +import ( + "context" + "sync" + + "github.com/containerd/cgroups" + eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/runtime" + "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" +) + +func newOOMEpoller(publisher events.Publisher) (*epoller, error) { + fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC) + if err != nil { + return nil, err + } + return &epoller{ + fd: fd, + publisher: publisher, + set: make(map[uintptr]*item), + }, nil +} + +type epoller struct { + mu sync.Mutex + + fd int + publisher events.Publisher + set map[uintptr]*item +} + +type item struct { + id string + cg cgroups.Cgroup +} + +func (e *epoller) Close() error { + return unix.Close(e.fd) +} + +func (e *epoller) run(ctx context.Context) { + var events [128]unix.EpollEvent + for { + select { + case <-ctx.Done(): + e.Close() + return + default: + n, err := unix.EpollWait(e.fd, events[:], -1) + if err != nil { + if err == unix.EINTR { + continue + } + logrus.WithError(err).Error("cgroups: epoll wait") + } + for i := 0; i < n; i++ { + e.process(ctx, uintptr(events[i].Fd)) + } + } + } +} + +func (e *epoller) add(id string, cg cgroups.Cgroup) error { + e.mu.Lock() + defer e.mu.Unlock() + fd, err := cg.OOMEventFD() + if err != nil { + return err + } + e.set[fd] = &item{ + id: id, + cg: cg, + } + event := unix.EpollEvent{ + Fd: int32(fd), + Events: unix.EPOLLHUP | unix.EPOLLIN | unix.EPOLLERR, + } + return unix.EpollCtl(e.fd, unix.EPOLL_CTL_ADD, int(fd), &event) +} + +func (e *epoller) process(ctx context.Context, fd uintptr) { + flush(fd) + e.mu.Lock() + i, ok := e.set[fd] + if !ok { + e.mu.Unlock() + return + } + e.mu.Unlock() + if i.cg.State() == cgroups.Deleted { + e.mu.Lock() + delete(e.set, fd) + e.mu.Unlock() + unix.Close(int(fd)) + return + } + if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventstypes.TaskOOM{ + ContainerID: i.id, + }); err != nil { + logrus.WithError(err).Error("publish OOM event") + } +} + +func flush(fd uintptr) error { + var buf [8]byte + _, err := unix.Read(int(fd), buf[:]) + return err +} diff --git a/pkg/v2/service.go b/pkg/v2/service.go index 3f0a23625..afbe3ee7f 100644 --- a/pkg/v2/service.go +++ b/pkg/v2/service.go @@ -75,13 +75,19 @@ const configFile = "config.toml" // New returns a new shim service that can be used via GRPC func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { + ep, err := newOOMEpoller(publisher) + if err != nil { + return nil, err + } ctx, cancel := context.WithCancel(ctx) + go ep.run(ctx) s := &service{ id: id, context: ctx, processes: make(map[string]rproc.Process), events: make(chan interface{}, 128), ec: proc.ExitCh, + oomPoller: ep, cancel: cancel, } go s.processExits() @@ -104,6 +110,7 @@ type service struct { events chan interface{} platform rproc.Platform ec chan proc.Exit + oomPoller *epoller id string bundle string @@ -343,6 +350,19 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * // save the main task id and bundle to the shim for additional requests s.id = r.ID s.bundle = r.Bundle + + // Set up OOM notification on the sandbox's cgroup. This is done on sandbox + // create since the sandbox process will be created here. + pid := process.Pid() + if pid > 0 { + cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid)) + if err != nil { + return nil, errors.Wrapf(err, "loading cgroup for %d", pid) + } + if err := s.oomPoller.add(s.id, cg); err != nil { + return nil, errors.Wrapf(err, "add cg to OOM monitor") + } + } s.task = process return &taskAPI.CreateTaskResponse{ Pid: uint32(process.Pid()), @@ -359,6 +379,8 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. if err := p.Start(ctx); err != nil { return nil, err } + // TODO: Set the cgroup and oom notifications on restore. + // https://github.com/google/gvisor-containerd-shim/issues/58 return &taskAPI.StartResponse{ Pid: uint32(p.Pid()), }, nil |