summaryrefslogtreecommitdiffhomepage
path: root/pkg/waiter
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/waiter')
-rw-r--r--pkg/waiter/waiter.go11
-rw-r--r--pkg/waiter/waiter_test.go20
2 files changed, 20 insertions, 11 deletions
diff --git a/pkg/waiter/waiter.go b/pkg/waiter/waiter.go
index 08519d986..83d4f893a 100644
--- a/pkg/waiter/waiter.go
+++ b/pkg/waiter/waiter.go
@@ -119,7 +119,10 @@ type EntryCallback interface {
// The callback is supposed to perform minimal work, and cannot call
// any method on the queue itself because it will be locked while the
// callback is running.
- Callback(e *Entry)
+ //
+ // The mask indicates the events that occurred and that the entry is
+ // interested in.
+ Callback(e *Entry, mask EventMask)
}
// Entry represents a waiter that can be add to the a wait queue. It can
@@ -140,7 +143,7 @@ type channelCallback struct {
}
// Callback implements EntryCallback.Callback.
-func (c *channelCallback) Callback(*Entry) {
+func (c *channelCallback) Callback(*Entry, EventMask) {
select {
case c.ch <- struct{}{}:
default:
@@ -193,8 +196,8 @@ func (q *Queue) EventUnregister(e *Entry) {
func (q *Queue) Notify(mask EventMask) {
q.mu.RLock()
for e := q.list.Front(); e != nil; e = e.Next() {
- if mask&e.mask != 0 {
- e.Callback.Callback(e)
+ if m := mask & e.mask; m != 0 {
+ e.Callback.Callback(e, m)
}
}
q.mu.RUnlock()
diff --git a/pkg/waiter/waiter_test.go b/pkg/waiter/waiter_test.go
index c1b94a4f3..6928f28b4 100644
--- a/pkg/waiter/waiter_test.go
+++ b/pkg/waiter/waiter_test.go
@@ -20,12 +20,12 @@ import (
)
type callbackStub struct {
- f func(e *Entry)
+ f func(e *Entry, m EventMask)
}
// Callback implements EntryCallback.Callback.
-func (c *callbackStub) Callback(e *Entry) {
- c.f(e)
+func (c *callbackStub) Callback(e *Entry, m EventMask) {
+ c.f(e, m)
}
func TestEmptyQueue(t *testing.T) {
@@ -36,7 +36,7 @@ func TestEmptyQueue(t *testing.T) {
// Register then unregister a waiter, then notify the queue.
cnt := 0
- e := Entry{Callback: &callbackStub{func(*Entry) { cnt++ }}}
+ e := Entry{Callback: &callbackStub{func(*Entry, EventMask) { cnt++ }}}
q.EventRegister(&e, EventIn)
q.EventUnregister(&e)
q.Notify(EventIn)
@@ -49,7 +49,7 @@ func TestMask(t *testing.T) {
// Register a waiter.
var q Queue
var cnt int
- e := Entry{Callback: &callbackStub{func(*Entry) { cnt++ }}}
+ e := Entry{Callback: &callbackStub{func(*Entry, EventMask) { cnt++ }}}
q.EventRegister(&e, EventIn|EventErr)
// Notify with an overlapping mask.
@@ -101,11 +101,14 @@ func TestConcurrentRegistration(t *testing.T) {
for i := 0; i < concurrency; i++ {
go func() {
var e Entry
- e.Callback = &callbackStub{func(entry *Entry) {
+ e.Callback = &callbackStub{func(entry *Entry, mask EventMask) {
cnt++
if entry != &e {
t.Errorf("entry = %p, want %p", entry, &e)
}
+ if mask != EventIn {
+ t.Errorf("mask = %#x want %#x", mask, EventIn)
+ }
}}
// Wait for notification, then register.
@@ -158,11 +161,14 @@ func TestConcurrentNotification(t *testing.T) {
// Register waiters.
for i := 0; i < waiterCount; i++ {
var e Entry
- e.Callback = &callbackStub{func(entry *Entry) {
+ e.Callback = &callbackStub{func(entry *Entry, mask EventMask) {
atomic.AddInt32(&cnt, 1)
if entry != &e {
t.Errorf("entry = %p, want %p", entry, &e)
}
+ if mask != EventIn {
+ t.Errorf("mask = %#x want %#x", mask, EventIn)
+ }
}}
q.EventRegister(&e, EventIn|EventErr)