diff options
author | Nicolas Lacasse <nlacasse@google.com> | 2019-07-29 17:11:27 -0700 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2019-07-29 17:12:50 -0700 |
commit | 5fdb945a0dc7a05329f97dc1ca193baf1b3448f3 (patch) | |
tree | d44ade1ae6e44aae25463e69e0cbb5c2c6d820de /pkg/eventchannel/event.go | |
parent | f0507e1db1574ff165000fa5e34b651413f37aae (diff) |
Rate limit the unimplemented syscall event handler.
This introduces two new types of Emitters:
1. MultiEmitter, which will forward events to other registered Emitters, and
2. RateLimitedEmitter, which will forward events to a wrapped Emitter, subject
to given rate limits.
The methods in the eventchannel package itself act like a multiEmitter, but is
not actually an Emitter. Now we have a DefaultEmitter, and the methods in
eventchannel simply forward calls to the DefaultEmitter.
The unimplemented syscall handler now uses a RateLimetedEmitter that wraps the
DefaultEmitter.
PiperOrigin-RevId: 260612770
Diffstat (limited to 'pkg/eventchannel/event.go')
-rw-r--r-- | pkg/eventchannel/event.go | 64 |
1 files changed, 50 insertions, 14 deletions
diff --git a/pkg/eventchannel/event.go b/pkg/eventchannel/event.go index f6d26532b..d37ad0428 100644 --- a/pkg/eventchannel/event.go +++ b/pkg/eventchannel/event.go @@ -43,18 +43,36 @@ type Emitter interface { Close() error } -var ( - mu sync.Mutex - emitters = make(map[Emitter]struct{}) -) +// DefaultEmitter is the default emitter. Calls to Emit and AddEmitter are sent +// to this Emitter. +var DefaultEmitter = &multiEmitter{} -// Emit emits a message using all added emitters. +// Emit is a helper method that calls DefaultEmitter.Emit. func Emit(msg proto.Message) error { - mu.Lock() - defer mu.Unlock() + _, err := DefaultEmitter.Emit(msg) + return err +} + +// AddEmitter is a helper method that calls DefaultEmitter.AddEmitter. +func AddEmitter(e Emitter) { + DefaultEmitter.AddEmitter(e) +} + +// multiEmitter is an Emitter that forwards messages to multiple Emitters. +type multiEmitter struct { + // mu protects emitters. + mu sync.Mutex + // emitters is initialized lazily in AddEmitter. + emitters map[Emitter]struct{} +} + +// Emit emits a message using all added emitters. +func (me *multiEmitter) Emit(msg proto.Message) (bool, error) { + me.mu.Lock() + defer me.mu.Unlock() var err error - for e := range emitters { + for e := range me.emitters { hangup, eerr := e.Emit(msg) if eerr != nil { if err == nil { @@ -68,18 +86,36 @@ func Emit(msg proto.Message) error { } if hangup { log.Infof("Hangup on eventchannel emitter %v.", e) - delete(emitters, e) + delete(me.emitters, e) } } - return err + return false, err } // AddEmitter adds a new emitter. -func AddEmitter(e Emitter) { - mu.Lock() - defer mu.Unlock() - emitters[e] = struct{}{} +func (me *multiEmitter) AddEmitter(e Emitter) { + me.mu.Lock() + defer me.mu.Unlock() + if me.emitters == nil { + me.emitters = make(map[Emitter]struct{}) + } + me.emitters[e] = struct{}{} +} + +// Close closes all emitters. If any Close call errors, it returns the first +// one encountered. +func (me *multiEmitter) Close() error { + me.mu.Lock() + defer me.mu.Unlock() + var err error + for e := range me.emitters { + if eerr := e.Close(); err == nil && eerr != nil { + err = eerr + } + delete(me.emitters, e) + } + return err } func marshal(msg proto.Message) ([]byte, error) { |