diff options
-rw-r--r-- | WORKSPACE | 6 | ||||
-rw-r--r-- | pkg/eventchannel/BUILD | 13 | ||||
-rw-r--r-- | pkg/eventchannel/event.go | 64 | ||||
-rw-r--r-- | pkg/eventchannel/event_test.go | 146 | ||||
-rw-r--r-- | pkg/eventchannel/rate.go | 54 | ||||
-rw-r--r-- | pkg/sentry/kernel/kernel.go | 36 | ||||
-rw-r--r-- | pkg/sentry/syscalls/linux/linux64.go | 3 |
7 files changed, 294 insertions, 28 deletions
@@ -145,6 +145,12 @@ go_repository( ) go_repository( + name = "org_golang_x_time", + commit = "9d24e82272b4f38b78bc8cff74fa936d31ccd8ef", + importpath = "golang.org/x/time", +) + +go_repository( name = "org_golang_x_tools", commit = "aa82965741a9fecd12b026fbb3d3c6ed3231b8f8", importpath = "golang.org/x/tools", diff --git a/pkg/eventchannel/BUILD b/pkg/eventchannel/BUILD index 4c336ea84..9961baaa9 100644 --- a/pkg/eventchannel/BUILD +++ b/pkg/eventchannel/BUILD @@ -1,4 +1,4 @@ -load("//tools/go_stateify:defs.bzl", "go_library") +load("//tools/go_stateify:defs.bzl", "go_library", "go_test") load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") package(licenses = ["notice"]) @@ -7,6 +7,7 @@ go_library( name = "eventchannel", srcs = [ "event.go", + "rate.go", ], importpath = "gvisor.dev/gvisor/pkg/eventchannel", visibility = ["//:sandbox"], @@ -16,6 +17,7 @@ go_library( "//pkg/unet", "@com_github_golang_protobuf//proto:go_default_library", "@com_github_golang_protobuf//ptypes:go_default_library_gen", + "@org_golang_x_time//rate:go_default_library", ], ) @@ -30,3 +32,12 @@ go_proto_library( proto = ":eventchannel_proto", visibility = ["//:sandbox"], ) + +go_test( + name = "eventchannel_test", + srcs = ["event_test.go"], + embed = [":eventchannel"], + deps = [ + "@com_github_golang_protobuf//proto:go_default_library", + ], +) diff --git a/pkg/eventchannel/event.go b/pkg/eventchannel/event.go index f6d26532b..d37ad0428 100644 --- a/pkg/eventchannel/event.go +++ b/pkg/eventchannel/event.go @@ -43,18 +43,36 @@ type Emitter interface { Close() error } -var ( - mu sync.Mutex - emitters = make(map[Emitter]struct{}) -) +// DefaultEmitter is the default emitter. Calls to Emit and AddEmitter are sent +// to this Emitter. +var DefaultEmitter = &multiEmitter{} -// Emit emits a message using all added emitters. +// Emit is a helper method that calls DefaultEmitter.Emit. func Emit(msg proto.Message) error { - mu.Lock() - defer mu.Unlock() + _, err := DefaultEmitter.Emit(msg) + return err +} + +// AddEmitter is a helper method that calls DefaultEmitter.AddEmitter. +func AddEmitter(e Emitter) { + DefaultEmitter.AddEmitter(e) +} + +// multiEmitter is an Emitter that forwards messages to multiple Emitters. +type multiEmitter struct { + // mu protects emitters. + mu sync.Mutex + // emitters is initialized lazily in AddEmitter. + emitters map[Emitter]struct{} +} + +// Emit emits a message using all added emitters. +func (me *multiEmitter) Emit(msg proto.Message) (bool, error) { + me.mu.Lock() + defer me.mu.Unlock() var err error - for e := range emitters { + for e := range me.emitters { hangup, eerr := e.Emit(msg) if eerr != nil { if err == nil { @@ -68,18 +86,36 @@ func Emit(msg proto.Message) error { } if hangup { log.Infof("Hangup on eventchannel emitter %v.", e) - delete(emitters, e) + delete(me.emitters, e) } } - return err + return false, err } // AddEmitter adds a new emitter. -func AddEmitter(e Emitter) { - mu.Lock() - defer mu.Unlock() - emitters[e] = struct{}{} +func (me *multiEmitter) AddEmitter(e Emitter) { + me.mu.Lock() + defer me.mu.Unlock() + if me.emitters == nil { + me.emitters = make(map[Emitter]struct{}) + } + me.emitters[e] = struct{}{} +} + +// Close closes all emitters. If any Close call errors, it returns the first +// one encountered. +func (me *multiEmitter) Close() error { + me.mu.Lock() + defer me.mu.Unlock() + var err error + for e := range me.emitters { + if eerr := e.Close(); err == nil && eerr != nil { + err = eerr + } + delete(me.emitters, e) + } + return err } func marshal(msg proto.Message) ([]byte, error) { diff --git a/pkg/eventchannel/event_test.go b/pkg/eventchannel/event_test.go new file mode 100644 index 000000000..3649097d6 --- /dev/null +++ b/pkg/eventchannel/event_test.go @@ -0,0 +1,146 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventchannel + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/golang/protobuf/proto" +) + +// testEmitter is an emitter that can be used in tests. It records all events +// emitted, and whether it has been closed. +type testEmitter struct { + // mu protects all fields below. + mu sync.Mutex + + // events contains all emitted events. + events []proto.Message + + // closed records whether Close() was called. + closed bool +} + +// Emit implements Emitter.Emit. +func (te *testEmitter) Emit(msg proto.Message) (bool, error) { + te.mu.Lock() + defer te.mu.Unlock() + te.events = append(te.events, msg) + return false, nil +} + +// Close implements Emitter.Close. +func (te *testEmitter) Close() error { + te.mu.Lock() + defer te.mu.Unlock() + if te.closed { + return fmt.Errorf("closed called twice") + } + te.closed = true + return nil +} + +// testMessage implements proto.Message for testing. +type testMessage struct { + proto.Message + + // name is the name of the message, used by tests to compare messages. + name string +} + +func TestMultiEmitter(t *testing.T) { + // Create three testEmitters, tied together in a multiEmitter. + me := &multiEmitter{} + var emitters []*testEmitter + for i := 0; i < 3; i++ { + te := &testEmitter{} + emitters = append(emitters, te) + me.AddEmitter(te) + } + + // Emit three messages to multiEmitter. + names := []string{"foo", "bar", "baz"} + for _, name := range names { + m := testMessage{name: name} + if _, err := me.Emit(m); err != nil { + t.Fatal("me.Emit(%v) failed: %v", m, err) + } + } + + // All three emitters should have all three events. + for _, te := range emitters { + if got, want := len(te.events), len(names); got != want { + t.Fatalf("emitter got %d events, want %d", got, want) + } + for i, name := range names { + if got := te.events[i].(testMessage).name; got != name { + t.Errorf("emitter got message with name %q, want %q", got, name) + } + } + } + + // Close multiEmitter. + if err := me.Close(); err != nil { + t.Fatal("me.Close() failed: %v", err) + } + + // All testEmitters should be closed. + for _, te := range emitters { + if !te.closed { + t.Errorf("te.closed got false, want true") + } + } +} + +func TestRateLimitedEmitter(t *testing.T) { + // Create a RateLimittedEmitter that wraps a testEmitter. + te := &testEmitter{} + max := float64(5) // events per second + burst := 10 // events + rle := RateLimitedEmitterFrom(te, max, burst) + + // Send 50 messages in one shot. + for i := 0; i < 50; i++ { + if _, err := rle.Emit(testMessage{}); err != nil { + t.Fatalf("rle.Emit failed: %v", err) + } + } + + // We should have received only 10 messages. + if got, want := len(te.events), 10; got != want { + t.Errorf("got %d events, want %d", got, want) + } + + // Sleep for a second and then send another 50. + time.Sleep(1 * time.Second) + for i := 0; i < 50; i++ { + if _, err := rle.Emit(testMessage{}); err != nil { + t.Fatalf("rle.Emit failed: %v", err) + } + } + + // We should have at least 5 more message, plus maybe a few more if the + // test ran slowly. + got, wantAtLeast, wantAtMost := len(te.events), 15, 20 + if got < wantAtLeast { + t.Errorf("got %d events, want at least %d", got, wantAtLeast) + } + if got > wantAtMost { + t.Errorf("got %d events, want at most %d", got, wantAtMost) + } +} diff --git a/pkg/eventchannel/rate.go b/pkg/eventchannel/rate.go new file mode 100644 index 000000000..179226c92 --- /dev/null +++ b/pkg/eventchannel/rate.go @@ -0,0 +1,54 @@ +// Copyright 2019 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventchannel + +import ( + "github.com/golang/protobuf/proto" + "golang.org/x/time/rate" +) + +// rateLimitedEmitter wraps an emitter and limits events to the given limits. +// Events that would exceed the limit are discarded. +type rateLimitedEmitter struct { + inner Emitter + limiter *rate.Limiter +} + +// RateLimitedEmitterFrom creates a new event channel emitter that wraps the +// existing emitter and enforces rate limits. The limits are imposed via a +// token bucket, with `maxRate` events per second, with burst size of `burst` +// events. See the golang.org/x/time/rate package and +// https://en.wikipedia.org/wiki/Token_bucket for more information about token +// buckets generally. +func RateLimitedEmitterFrom(inner Emitter, maxRate float64, burst int) Emitter { + return &rateLimitedEmitter{ + inner: inner, + limiter: rate.NewLimiter(rate.Limit(maxRate), burst), + } +} + +// Emit implements EventEmitter.Emit. +func (rle *rateLimitedEmitter) Emit(msg proto.Message) (bool, error) { + if !rle.limiter.Allow() { + // Drop event. + return false, nil + } + return rle.inner.Emit(msg) +} + +// Close implements EventEmitter.Close. +func (rle *rateLimitedEmitter) Close() error { + return rle.inner.Close() +} diff --git a/pkg/sentry/kernel/kernel.go b/pkg/sentry/kernel/kernel.go index 4c2d48e65..cf8bf3ecd 100644 --- a/pkg/sentry/kernel/kernel.go +++ b/pkg/sentry/kernel/kernel.go @@ -197,6 +197,11 @@ type Kernel struct { // caches. Not all caches use it, only the caches that use host resources use // the limiter. It may be nil if disabled. DirentCacheLimiter *fs.DirentCacheLimiter + + // unimplementedSyscallEmitter is used to emit unimplemented syscall + // events. This is initialized lazily on the first unimplemented + // syscall. + unimplementedSyscallEmitter eventchannel.Emitter `state:"nosave"` } // InitKernelArgs holds arguments to Init. @@ -290,7 +295,6 @@ func (k *Kernel) Init(args InitKernelArgs) error { k.monotonicClock = &timekeeperClock{tk: args.Timekeeper, c: sentrytime.Monotonic} k.futexes = futex.NewManager() k.netlinkPorts = port.New() - return nil } @@ -1168,16 +1172,6 @@ func (k *Kernel) SupervisorContext() context.Context { } } -// EmitUnimplementedEvent emits an UnimplementedSyscall event via the event -// channel. -func (k *Kernel) EmitUnimplementedEvent(ctx context.Context) { - t := TaskFromContext(ctx) - eventchannel.Emit(&uspb.UnimplementedSyscall{ - Tid: int32(t.ThreadID()), - Registers: t.Arch().StateData().Proto(), - }) -} - // SocketEntry represents a socket recorded in Kernel.sockets. It implements // refs.WeakRefUser for sockets stored in the socket table. // @@ -1272,3 +1266,23 @@ func (ctx supervisorContext) Value(key interface{}) interface{} { return nil } } + +// Rate limits for the number of unimplemented syscall evants. +const ( + unimplementedSyscallsMaxRate = 100 // events per second + unimplementedSyscallBurst = 1000 // events +) + +// EmitUnimplementedEvent emits an UnimplementedSyscall event via the event +// channel. +func (k *Kernel) EmitUnimplementedEvent(ctx context.Context) { + if k.unimplementedSyscallEmitter == nil { + k.unimplementedSyscallEmitter = eventchannel.RateLimitedEmitterFrom(eventchannel.DefaultEmitter, unimplementedSyscallsMaxRate, unimplementedSyscallBurst) + } + + t := TaskFromContext(ctx) + k.unimplementedSyscallEmitter.Emit(&uspb.UnimplementedSyscall{ + Tid: int32(t.ThreadID()), + Registers: t.Arch().StateData().Proto(), + }) +} diff --git a/pkg/sentry/syscalls/linux/linux64.go b/pkg/sentry/syscalls/linux/linux64.go index 51db2d8f7..ed996ba51 100644 --- a/pkg/sentry/syscalls/linux/linux64.go +++ b/pkg/sentry/syscalls/linux/linux64.go @@ -30,8 +30,7 @@ import ( const _AUDIT_ARCH_X86_64 = 0xc000003e // AMD64 is a table of Linux amd64 syscall API with the corresponding syscall -// numbers from Linux 4.4. The entries commented out are those syscalls we -// don't currently support. +// numbers from Linux 4.4. var AMD64 = &kernel.SyscallTable{ OS: abi.Linux, Arch: arch.AMD64, |