summaryrefslogtreecommitdiffhomepage
path: root/pkg/eventchannel/event.go
diff options
context:
space:
mode:
authorNicolas Lacasse <nlacasse@google.com>2019-07-29 17:11:27 -0700
committergVisor bot <gvisor-bot@google.com>2019-07-29 17:12:50 -0700
commit5fdb945a0dc7a05329f97dc1ca193baf1b3448f3 (patch)
treed44ade1ae6e44aae25463e69e0cbb5c2c6d820de /pkg/eventchannel/event.go
parentf0507e1db1574ff165000fa5e34b651413f37aae (diff)
Rate limit the unimplemented syscall event handler.
This introduces two new types of Emitters: 1. MultiEmitter, which will forward events to other registered Emitters, and 2. RateLimitedEmitter, which will forward events to a wrapped Emitter, subject to given rate limits. The methods in the eventchannel package itself act like a multiEmitter, but is not actually an Emitter. Now we have a DefaultEmitter, and the methods in eventchannel simply forward calls to the DefaultEmitter. The unimplemented syscall handler now uses a RateLimetedEmitter that wraps the DefaultEmitter. PiperOrigin-RevId: 260612770
Diffstat (limited to 'pkg/eventchannel/event.go')
-rw-r--r--pkg/eventchannel/event.go64
1 files changed, 50 insertions, 14 deletions
diff --git a/pkg/eventchannel/event.go b/pkg/eventchannel/event.go
index f6d26532b..d37ad0428 100644
--- a/pkg/eventchannel/event.go
+++ b/pkg/eventchannel/event.go
@@ -43,18 +43,36 @@ type Emitter interface {
Close() error
}
-var (
- mu sync.Mutex
- emitters = make(map[Emitter]struct{})
-)
+// DefaultEmitter is the default emitter. Calls to Emit and AddEmitter are sent
+// to this Emitter.
+var DefaultEmitter = &multiEmitter{}
-// Emit emits a message using all added emitters.
+// Emit is a helper method that calls DefaultEmitter.Emit.
func Emit(msg proto.Message) error {
- mu.Lock()
- defer mu.Unlock()
+ _, err := DefaultEmitter.Emit(msg)
+ return err
+}
+
+// AddEmitter is a helper method that calls DefaultEmitter.AddEmitter.
+func AddEmitter(e Emitter) {
+ DefaultEmitter.AddEmitter(e)
+}
+
+// multiEmitter is an Emitter that forwards messages to multiple Emitters.
+type multiEmitter struct {
+ // mu protects emitters.
+ mu sync.Mutex
+ // emitters is initialized lazily in AddEmitter.
+ emitters map[Emitter]struct{}
+}
+
+// Emit emits a message using all added emitters.
+func (me *multiEmitter) Emit(msg proto.Message) (bool, error) {
+ me.mu.Lock()
+ defer me.mu.Unlock()
var err error
- for e := range emitters {
+ for e := range me.emitters {
hangup, eerr := e.Emit(msg)
if eerr != nil {
if err == nil {
@@ -68,18 +86,36 @@ func Emit(msg proto.Message) error {
}
if hangup {
log.Infof("Hangup on eventchannel emitter %v.", e)
- delete(emitters, e)
+ delete(me.emitters, e)
}
}
- return err
+ return false, err
}
// AddEmitter adds a new emitter.
-func AddEmitter(e Emitter) {
- mu.Lock()
- defer mu.Unlock()
- emitters[e] = struct{}{}
+func (me *multiEmitter) AddEmitter(e Emitter) {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ if me.emitters == nil {
+ me.emitters = make(map[Emitter]struct{})
+ }
+ me.emitters[e] = struct{}{}
+}
+
+// Close closes all emitters. If any Close call errors, it returns the first
+// one encountered.
+func (me *multiEmitter) Close() error {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ var err error
+ for e := range me.emitters {
+ if eerr := e.Close(); err == nil && eerr != nil {
+ err = eerr
+ }
+ delete(me.emitters, e)
+ }
+ return err
}
func marshal(msg proto.Message) ([]byte, error) {