summaryrefslogtreecommitdiffhomepage
path: root/pkg/waiter/waiter.go
blob: 832b6a5a9120d670abb940572732d3d2e069538c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
// Copyright 2018 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package waiter provides the implementation of a wait queue, where waiters can
// be enqueued to be notified when an event of interest happens.
//
// Becoming readable and/or writable are examples of events. Waiters are
// expected to use a pattern similar to this to make a blocking function out of
// a non-blocking one:
//
//	func (o *object) blockingRead(...) error {
//		err := o.nonBlockingRead(...)
//		if err != ErrAgain {
//			// Completed with no need to wait!
//			return err
//		}
//
//		e := createOrGetWaiterEntry(...)
//		o.EventRegister(&e, waiter.EventIn)
//		defer o.EventUnregister(&e)
//
//		// We need to try to read again after registration because the
//		// object may have become readable between the last attempt to
//		// read and read registration.
//		err = o.nonBlockingRead(...)
//		for err == ErrAgain {
//			wait()
//			err = o.nonBlockingRead(...)
//		}
//
//		return err
//	}
//
// Another goroutine needs to notify waiters when events happen. For example:
//
//	func (o *object) Write(...) ... {
//		// Do write work.
//		[...]
//
//		if oldDataAvailableSize == 0 && dataAvailableSize > 0 {
//			// If no data was available and now some data is
//			// available, the object became readable, so notify
//			// potential waiters about this.
//			o.Notify(waiter.EventIn)
//		}
//	}
package waiter

import (
	"sync"

	"gvisor.googlesource.com/gvisor/pkg/ilist"
)

// EventMask represents io events as used in the poll() syscall.
type EventMask uint16

// Events that waiters can wait on. The meaning is the same as those in the
// poll() syscall.
const (
	EventIn   EventMask = 0x01 // syscall.EPOLLIN
	EventPri  EventMask = 0x02 // syscall.EPOLLPRI
	EventOut  EventMask = 0x04 // syscall.EPOLLOUT
	EventErr  EventMask = 0x08 // syscall.EPOLLERR
	EventHUp  EventMask = 0x10 // syscall.EPOLLHUP
	EventNVal EventMask = 0x20 // Not defined in syscall.
)

// Waitable contains the methods that need to be implemented by waitable
// objects.
type Waitable interface {
	// Readiness returns what the object is currently ready for. If it's
	// not ready for a desired purpose, the caller may use EventRegister and
	// EventUnregister to get notifications once the object becomes ready.
	//
	// Implementations should allow for events like EventHUp and EventErr
	// to be returned regardless of whether they are in the input EventMask.
	Readiness(mask EventMask) EventMask

	// EventRegister registers the given waiter entry to receive
	// notifications when an event occurs that makes the object ready for
	// at least one of the events in mask.
	EventRegister(e *Entry, mask EventMask)

	// EventUnregister unregisters a waiter entry previously registered with
	// EventRegister().
	EventUnregister(e *Entry)
}

// EntryCallback provides a notify callback.
type EntryCallback interface {
	// Callback is the function to be called when the waiter entry is
	// notified. It is responsible for doing whatever is needed to wake up
	// the waiter.
	//
	// The callback is supposed to perform minimal work, and cannot call
	// any method on the queue itself because it will be locked while the
	// callback is running.
	Callback(e *Entry)
}

// Entry represents a waiter that can be add to the a wait queue. It can
// only be in one queue at a time, and is added "intrusively" to the queue with
// no extra memory allocations.
//
// +stateify savable
type Entry struct {
	// Context stores any state the waiter may wish to store in the entry
	// itself, which may be used at wake up time.
	//
	// Note that use of this field is optional and state may alternatively be
	// stored in the callback itself.
	Context interface{}

	Callback EntryCallback

	// The following fields are protected by the queue lock.
	mask EventMask
	ilist.Entry
}

type channelCallback struct{}

// Callback implements EntryCallback.Callback.
func (*channelCallback) Callback(e *Entry) {
	ch := e.Context.(chan struct{})
	select {
	case ch <- struct{}{}:
	default:
	}
}

// NewChannelEntry initializes a new Entry that does a non-blocking write to a
// struct{} channel when the callback is called. It returns the new Entry
// instance and the channel being used.
//
// If a channel isn't specified (i.e., if "c" is nil), then NewChannelEntry
// allocates a new channel.
func NewChannelEntry(c chan struct{}) (Entry, chan struct{}) {
	if c == nil {
		c = make(chan struct{}, 1)
	}

	return Entry{Context: c, Callback: &channelCallback{}}, c
}

// Queue represents the wait queue where waiters can be added and
// notifiers can notify them when events happen.
//
// The zero value for waiter.Queue is an empty queue ready for use.
//
// +stateify savable
type Queue struct {
	list ilist.List   `state:"zerovalue"`
	mu   sync.RWMutex `state:"nosave"`
}

// EventRegister adds a waiter to the wait queue; the waiter will be notified
// when at least one of the events specified in mask happens.
func (q *Queue) EventRegister(e *Entry, mask EventMask) {
	q.mu.Lock()
	e.mask = mask
	q.list.PushBack(e)
	q.mu.Unlock()
}

// EventUnregister removes the given waiter entry from the wait queue.
func (q *Queue) EventUnregister(e *Entry) {
	q.mu.Lock()
	q.list.Remove(e)
	q.mu.Unlock()
}

// Notify notifies all waiters in the queue whose masks have at least one bit
// in common with the notification mask.
func (q *Queue) Notify(mask EventMask) {
	q.mu.RLock()
	for it := q.list.Front(); it != nil; it = it.Next() {
		e := it.(*Entry)
		if mask&e.mask != 0 {
			e.Callback.Callback(e)
		}
	}
	q.mu.RUnlock()
}

// Events returns the set of events being waited on. It is the union of the
// masks of all registered entries.
func (q *Queue) Events() EventMask {
	ret := EventMask(0)

	q.mu.RLock()
	for it := q.list.Front(); it != nil; it = it.Next() {
		e := it.(*Entry)
		ret |= e.mask
	}
	q.mu.RUnlock()

	return ret
}

// IsEmpty returns if the wait queue is empty or not.
func (q *Queue) IsEmpty() bool {
	q.mu.Lock()
	defer q.mu.Unlock()

	return q.list.Front() == nil
}

// AlwaysReady implements the Waitable interface but is always ready. Embedding
// this struct into another struct makes it implement the boilerplate empty
// functions automatically.
type AlwaysReady struct {
}

// Readiness always returns the input mask because this object is always ready.
func (*AlwaysReady) Readiness(mask EventMask) EventMask {
	return mask
}

// EventRegister doesn't do anything because this object doesn't need to issue
// notifications because its readiness never changes.
func (*AlwaysReady) EventRegister(*Entry, EventMask) {
}

// EventUnregister doesn't do anything because this object doesn't need to issue
// notifications because its readiness never changes.
func (*AlwaysReady) EventUnregister(e *Entry) {
}