summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--WORKSPACE6
-rw-r--r--pkg/eventchannel/BUILD13
-rw-r--r--pkg/eventchannel/event.go64
-rw-r--r--pkg/eventchannel/event_test.go146
-rw-r--r--pkg/eventchannel/rate.go54
-rw-r--r--pkg/sentry/kernel/kernel.go36
-rw-r--r--pkg/sentry/syscalls/linux/linux64.go3
7 files changed, 294 insertions, 28 deletions
diff --git a/WORKSPACE b/WORKSPACE
index b57320e7a..11cc7ddf9 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -145,6 +145,12 @@ go_repository(
)
go_repository(
+ name = "org_golang_x_time",
+ commit = "9d24e82272b4f38b78bc8cff74fa936d31ccd8ef",
+ importpath = "golang.org/x/time",
+)
+
+go_repository(
name = "org_golang_x_tools",
commit = "aa82965741a9fecd12b026fbb3d3c6ed3231b8f8",
importpath = "golang.org/x/tools",
diff --git a/pkg/eventchannel/BUILD b/pkg/eventchannel/BUILD
index 4c336ea84..9961baaa9 100644
--- a/pkg/eventchannel/BUILD
+++ b/pkg/eventchannel/BUILD
@@ -1,4 +1,4 @@
-load("//tools/go_stateify:defs.bzl", "go_library")
+load("//tools/go_stateify:defs.bzl", "go_library", "go_test")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
package(licenses = ["notice"])
@@ -7,6 +7,7 @@ go_library(
name = "eventchannel",
srcs = [
"event.go",
+ "rate.go",
],
importpath = "gvisor.dev/gvisor/pkg/eventchannel",
visibility = ["//:sandbox"],
@@ -16,6 +17,7 @@ go_library(
"//pkg/unet",
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_golang_protobuf//ptypes:go_default_library_gen",
+ "@org_golang_x_time//rate:go_default_library",
],
)
@@ -30,3 +32,12 @@ go_proto_library(
proto = ":eventchannel_proto",
visibility = ["//:sandbox"],
)
+
+go_test(
+ name = "eventchannel_test",
+ srcs = ["event_test.go"],
+ embed = [":eventchannel"],
+ deps = [
+ "@com_github_golang_protobuf//proto:go_default_library",
+ ],
+)
diff --git a/pkg/eventchannel/event.go b/pkg/eventchannel/event.go
index f6d26532b..d37ad0428 100644
--- a/pkg/eventchannel/event.go
+++ b/pkg/eventchannel/event.go
@@ -43,18 +43,36 @@ type Emitter interface {
Close() error
}
-var (
- mu sync.Mutex
- emitters = make(map[Emitter]struct{})
-)
+// DefaultEmitter is the default emitter. Calls to Emit and AddEmitter are sent
+// to this Emitter.
+var DefaultEmitter = &multiEmitter{}
-// Emit emits a message using all added emitters.
+// Emit is a helper method that calls DefaultEmitter.Emit.
func Emit(msg proto.Message) error {
- mu.Lock()
- defer mu.Unlock()
+ _, err := DefaultEmitter.Emit(msg)
+ return err
+}
+
+// AddEmitter is a helper method that calls DefaultEmitter.AddEmitter.
+func AddEmitter(e Emitter) {
+ DefaultEmitter.AddEmitter(e)
+}
+
+// multiEmitter is an Emitter that forwards messages to multiple Emitters.
+type multiEmitter struct {
+ // mu protects emitters.
+ mu sync.Mutex
+ // emitters is initialized lazily in AddEmitter.
+ emitters map[Emitter]struct{}
+}
+
+// Emit emits a message using all added emitters.
+func (me *multiEmitter) Emit(msg proto.Message) (bool, error) {
+ me.mu.Lock()
+ defer me.mu.Unlock()
var err error
- for e := range emitters {
+ for e := range me.emitters {
hangup, eerr := e.Emit(msg)
if eerr != nil {
if err == nil {
@@ -68,18 +86,36 @@ func Emit(msg proto.Message) error {
}
if hangup {
log.Infof("Hangup on eventchannel emitter %v.", e)
- delete(emitters, e)
+ delete(me.emitters, e)
}
}
- return err
+ return false, err
}
// AddEmitter adds a new emitter.
-func AddEmitter(e Emitter) {
- mu.Lock()
- defer mu.Unlock()
- emitters[e] = struct{}{}
+func (me *multiEmitter) AddEmitter(e Emitter) {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ if me.emitters == nil {
+ me.emitters = make(map[Emitter]struct{})
+ }
+ me.emitters[e] = struct{}{}
+}
+
+// Close closes all emitters. If any Close call errors, it returns the first
+// one encountered.
+func (me *multiEmitter) Close() error {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ var err error
+ for e := range me.emitters {
+ if eerr := e.Close(); err == nil && eerr != nil {
+ err = eerr
+ }
+ delete(me.emitters, e)
+ }
+ return err
}
func marshal(msg proto.Message) ([]byte, error) {
diff --git a/pkg/eventchannel/event_test.go b/pkg/eventchannel/event_test.go
new file mode 100644
index 000000000..3649097d6
--- /dev/null
+++ b/pkg/eventchannel/event_test.go
@@ -0,0 +1,146 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package eventchannel
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+)
+
+// testEmitter is an emitter that can be used in tests. It records all events
+// emitted, and whether it has been closed.
+type testEmitter struct {
+ // mu protects all fields below.
+ mu sync.Mutex
+
+ // events contains all emitted events.
+ events []proto.Message
+
+ // closed records whether Close() was called.
+ closed bool
+}
+
+// Emit implements Emitter.Emit.
+func (te *testEmitter) Emit(msg proto.Message) (bool, error) {
+ te.mu.Lock()
+ defer te.mu.Unlock()
+ te.events = append(te.events, msg)
+ return false, nil
+}
+
+// Close implements Emitter.Close.
+func (te *testEmitter) Close() error {
+ te.mu.Lock()
+ defer te.mu.Unlock()
+ if te.closed {
+ return fmt.Errorf("closed called twice")
+ }
+ te.closed = true
+ return nil
+}
+
+// testMessage implements proto.Message for testing.
+type testMessage struct {
+ proto.Message
+
+ // name is the name of the message, used by tests to compare messages.
+ name string
+}
+
+func TestMultiEmitter(t *testing.T) {
+ // Create three testEmitters, tied together in a multiEmitter.
+ me := &multiEmitter{}
+ var emitters []*testEmitter
+ for i := 0; i < 3; i++ {
+ te := &testEmitter{}
+ emitters = append(emitters, te)
+ me.AddEmitter(te)
+ }
+
+ // Emit three messages to multiEmitter.
+ names := []string{"foo", "bar", "baz"}
+ for _, name := range names {
+ m := testMessage{name: name}
+ if _, err := me.Emit(m); err != nil {
+ t.Fatal("me.Emit(%v) failed: %v", m, err)
+ }
+ }
+
+ // All three emitters should have all three events.
+ for _, te := range emitters {
+ if got, want := len(te.events), len(names); got != want {
+ t.Fatalf("emitter got %d events, want %d", got, want)
+ }
+ for i, name := range names {
+ if got := te.events[i].(testMessage).name; got != name {
+ t.Errorf("emitter got message with name %q, want %q", got, name)
+ }
+ }
+ }
+
+ // Close multiEmitter.
+ if err := me.Close(); err != nil {
+ t.Fatal("me.Close() failed: %v", err)
+ }
+
+ // All testEmitters should be closed.
+ for _, te := range emitters {
+ if !te.closed {
+ t.Errorf("te.closed got false, want true")
+ }
+ }
+}
+
+func TestRateLimitedEmitter(t *testing.T) {
+ // Create a RateLimittedEmitter that wraps a testEmitter.
+ te := &testEmitter{}
+ max := float64(5) // events per second
+ burst := 10 // events
+ rle := RateLimitedEmitterFrom(te, max, burst)
+
+ // Send 50 messages in one shot.
+ for i := 0; i < 50; i++ {
+ if _, err := rle.Emit(testMessage{}); err != nil {
+ t.Fatalf("rle.Emit failed: %v", err)
+ }
+ }
+
+ // We should have received only 10 messages.
+ if got, want := len(te.events), 10; got != want {
+ t.Errorf("got %d events, want %d", got, want)
+ }
+
+ // Sleep for a second and then send another 50.
+ time.Sleep(1 * time.Second)
+ for i := 0; i < 50; i++ {
+ if _, err := rle.Emit(testMessage{}); err != nil {
+ t.Fatalf("rle.Emit failed: %v", err)
+ }
+ }
+
+ // We should have at least 5 more message, plus maybe a few more if the
+ // test ran slowly.
+ got, wantAtLeast, wantAtMost := len(te.events), 15, 20
+ if got < wantAtLeast {
+ t.Errorf("got %d events, want at least %d", got, wantAtLeast)
+ }
+ if got > wantAtMost {
+ t.Errorf("got %d events, want at most %d", got, wantAtMost)
+ }
+}
diff --git a/pkg/eventchannel/rate.go b/pkg/eventchannel/rate.go
new file mode 100644
index 000000000..179226c92
--- /dev/null
+++ b/pkg/eventchannel/rate.go
@@ -0,0 +1,54 @@
+// Copyright 2019 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package eventchannel
+
+import (
+ "github.com/golang/protobuf/proto"
+ "golang.org/x/time/rate"
+)
+
+// rateLimitedEmitter wraps an emitter and limits events to the given limits.
+// Events that would exceed the limit are discarded.
+type rateLimitedEmitter struct {
+ inner Emitter
+ limiter *rate.Limiter
+}
+
+// RateLimitedEmitterFrom creates a new event channel emitter that wraps the
+// existing emitter and enforces rate limits. The limits are imposed via a
+// token bucket, with `maxRate` events per second, with burst size of `burst`
+// events. See the golang.org/x/time/rate package and
+// https://en.wikipedia.org/wiki/Token_bucket for more information about token
+// buckets generally.
+func RateLimitedEmitterFrom(inner Emitter, maxRate float64, burst int) Emitter {
+ return &rateLimitedEmitter{
+ inner: inner,
+ limiter: rate.NewLimiter(rate.Limit(maxRate), burst),
+ }
+}
+
+// Emit implements EventEmitter.Emit.
+func (rle *rateLimitedEmitter) Emit(msg proto.Message) (bool, error) {
+ if !rle.limiter.Allow() {
+ // Drop event.
+ return false, nil
+ }
+ return rle.inner.Emit(msg)
+}
+
+// Close implements EventEmitter.Close.
+func (rle *rateLimitedEmitter) Close() error {
+ return rle.inner.Close()
+}
diff --git a/pkg/sentry/kernel/kernel.go b/pkg/sentry/kernel/kernel.go
index 4c2d48e65..cf8bf3ecd 100644
--- a/pkg/sentry/kernel/kernel.go
+++ b/pkg/sentry/kernel/kernel.go
@@ -197,6 +197,11 @@ type Kernel struct {
// caches. Not all caches use it, only the caches that use host resources use
// the limiter. It may be nil if disabled.
DirentCacheLimiter *fs.DirentCacheLimiter
+
+ // unimplementedSyscallEmitter is used to emit unimplemented syscall
+ // events. This is initialized lazily on the first unimplemented
+ // syscall.
+ unimplementedSyscallEmitter eventchannel.Emitter `state:"nosave"`
}
// InitKernelArgs holds arguments to Init.
@@ -290,7 +295,6 @@ func (k *Kernel) Init(args InitKernelArgs) error {
k.monotonicClock = &timekeeperClock{tk: args.Timekeeper, c: sentrytime.Monotonic}
k.futexes = futex.NewManager()
k.netlinkPorts = port.New()
-
return nil
}
@@ -1168,16 +1172,6 @@ func (k *Kernel) SupervisorContext() context.Context {
}
}
-// EmitUnimplementedEvent emits an UnimplementedSyscall event via the event
-// channel.
-func (k *Kernel) EmitUnimplementedEvent(ctx context.Context) {
- t := TaskFromContext(ctx)
- eventchannel.Emit(&uspb.UnimplementedSyscall{
- Tid: int32(t.ThreadID()),
- Registers: t.Arch().StateData().Proto(),
- })
-}
-
// SocketEntry represents a socket recorded in Kernel.sockets. It implements
// refs.WeakRefUser for sockets stored in the socket table.
//
@@ -1272,3 +1266,23 @@ func (ctx supervisorContext) Value(key interface{}) interface{} {
return nil
}
}
+
+// Rate limits for the number of unimplemented syscall evants.
+const (
+ unimplementedSyscallsMaxRate = 100 // events per second
+ unimplementedSyscallBurst = 1000 // events
+)
+
+// EmitUnimplementedEvent emits an UnimplementedSyscall event via the event
+// channel.
+func (k *Kernel) EmitUnimplementedEvent(ctx context.Context) {
+ if k.unimplementedSyscallEmitter == nil {
+ k.unimplementedSyscallEmitter = eventchannel.RateLimitedEmitterFrom(eventchannel.DefaultEmitter, unimplementedSyscallsMaxRate, unimplementedSyscallBurst)
+ }
+
+ t := TaskFromContext(ctx)
+ k.unimplementedSyscallEmitter.Emit(&uspb.UnimplementedSyscall{
+ Tid: int32(t.ThreadID()),
+ Registers: t.Arch().StateData().Proto(),
+ })
+}
diff --git a/pkg/sentry/syscalls/linux/linux64.go b/pkg/sentry/syscalls/linux/linux64.go
index 51db2d8f7..ed996ba51 100644
--- a/pkg/sentry/syscalls/linux/linux64.go
+++ b/pkg/sentry/syscalls/linux/linux64.go
@@ -30,8 +30,7 @@ import (
const _AUDIT_ARCH_X86_64 = 0xc000003e
// AMD64 is a table of Linux amd64 syscall API with the corresponding syscall
-// numbers from Linux 4.4. The entries commented out are those syscalls we
-// don't currently support.
+// numbers from Linux 4.4.
var AMD64 = &kernel.SyscallTable{
OS: abi.Linux,
Arch: arch.AMD64,