summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--pkg/v2/epoll.go129
-rw-r--r--pkg/v2/service.go22
2 files changed, 151 insertions, 0 deletions
diff --git a/pkg/v2/epoll.go b/pkg/v2/epoll.go
new file mode 100644
index 000000000..76c7b54d6
--- /dev/null
+++ b/pkg/v2/epoll.go
@@ -0,0 +1,129 @@
+// +build linux
+
+/*
+ Copyright The containerd Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package v2
+
+import (
+ "context"
+ "sync"
+
+ "github.com/containerd/cgroups"
+ eventstypes "github.com/containerd/containerd/api/events"
+ "github.com/containerd/containerd/events"
+ "github.com/containerd/containerd/runtime"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/sys/unix"
+)
+
+func newOOMEpoller(publisher events.Publisher) (*epoller, error) {
+ fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
+ if err != nil {
+ return nil, err
+ }
+ return &epoller{
+ fd: fd,
+ publisher: publisher,
+ set: make(map[uintptr]*item),
+ }, nil
+}
+
+type epoller struct {
+ mu sync.Mutex
+
+ fd int
+ publisher events.Publisher
+ set map[uintptr]*item
+}
+
+type item struct {
+ id string
+ cg cgroups.Cgroup
+}
+
+func (e *epoller) Close() error {
+ return unix.Close(e.fd)
+}
+
+func (e *epoller) run(ctx context.Context) {
+ var events [128]unix.EpollEvent
+ for {
+ select {
+ case <-ctx.Done():
+ e.Close()
+ return
+ default:
+ n, err := unix.EpollWait(e.fd, events[:], -1)
+ if err != nil {
+ if err == unix.EINTR {
+ continue
+ }
+ logrus.WithError(err).Error("cgroups: epoll wait")
+ }
+ for i := 0; i < n; i++ {
+ e.process(ctx, uintptr(events[i].Fd))
+ }
+ }
+ }
+}
+
+func (e *epoller) add(id string, cg cgroups.Cgroup) error {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ fd, err := cg.OOMEventFD()
+ if err != nil {
+ return err
+ }
+ e.set[fd] = &item{
+ id: id,
+ cg: cg,
+ }
+ event := unix.EpollEvent{
+ Fd: int32(fd),
+ Events: unix.EPOLLHUP | unix.EPOLLIN | unix.EPOLLERR,
+ }
+ return unix.EpollCtl(e.fd, unix.EPOLL_CTL_ADD, int(fd), &event)
+}
+
+func (e *epoller) process(ctx context.Context, fd uintptr) {
+ flush(fd)
+ e.mu.Lock()
+ i, ok := e.set[fd]
+ if !ok {
+ e.mu.Unlock()
+ return
+ }
+ e.mu.Unlock()
+ if i.cg.State() == cgroups.Deleted {
+ e.mu.Lock()
+ delete(e.set, fd)
+ e.mu.Unlock()
+ unix.Close(int(fd))
+ return
+ }
+ if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventstypes.TaskOOM{
+ ContainerID: i.id,
+ }); err != nil {
+ logrus.WithError(err).Error("publish OOM event")
+ }
+}
+
+func flush(fd uintptr) error {
+ var buf [8]byte
+ _, err := unix.Read(int(fd), buf[:])
+ return err
+}
diff --git a/pkg/v2/service.go b/pkg/v2/service.go
index 3f0a23625..afbe3ee7f 100644
--- a/pkg/v2/service.go
+++ b/pkg/v2/service.go
@@ -75,13 +75,19 @@ const configFile = "config.toml"
// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
+ ep, err := newOOMEpoller(publisher)
+ if err != nil {
+ return nil, err
+ }
ctx, cancel := context.WithCancel(ctx)
+ go ep.run(ctx)
s := &service{
id: id,
context: ctx,
processes: make(map[string]rproc.Process),
events: make(chan interface{}, 128),
ec: proc.ExitCh,
+ oomPoller: ep,
cancel: cancel,
}
go s.processExits()
@@ -104,6 +110,7 @@ type service struct {
events chan interface{}
platform rproc.Platform
ec chan proc.Exit
+ oomPoller *epoller
id string
bundle string
@@ -343,6 +350,19 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
// save the main task id and bundle to the shim for additional requests
s.id = r.ID
s.bundle = r.Bundle
+
+ // Set up OOM notification on the sandbox's cgroup. This is done on sandbox
+ // create since the sandbox process will be created here.
+ pid := process.Pid()
+ if pid > 0 {
+ cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
+ if err != nil {
+ return nil, errors.Wrapf(err, "loading cgroup for %d", pid)
+ }
+ if err := s.oomPoller.add(s.id, cg); err != nil {
+ return nil, errors.Wrapf(err, "add cg to OOM monitor")
+ }
+ }
s.task = process
return &taskAPI.CreateTaskResponse{
Pid: uint32(process.Pid()),
@@ -359,6 +379,8 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
if err := p.Start(ctx); err != nil {
return nil, err
}
+ // TODO: Set the cgroup and oom notifications on restore.
+ // https://github.com/google/gvisor-containerd-shim/issues/58
return &taskAPI.StartResponse{
Pid: uint32(p.Pid()),
}, nil