1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipe
// Rx is the receive side of the shared memory ring buffer.
type Rx struct {
p pipe
tail uint64
head uint64
}
// Init initializes the receive end of the pipe. In the initial state, the next
// slot to be inspected is the very first one.
func (r *Rx) Init(b []byte) {
r.p.init(b)
r.tail = 0xfffffffe * jump
r.head = r.tail
}
// Pull reads the next buffer from the pipe, returning nil if there isn't one
// currently available.
//
// The returned slice is available until Flush() is next called. After that, it
// must not be touched.
func (r *Rx) Pull() []byte {
if r.head == r.tail+jump {
// We've already pulled the whole pipe.
return nil
}
header := r.p.readAtomic(r.head)
if header&slotFree != 0 {
// The next slot is free, we can't pull it yet.
return nil
}
payloadSize := header & slotSizeMask
newHead := r.head + payloadToSlotSize(payloadSize)
headWrap := (r.head & revolutionMask) | uint64(len(r.p.buffer))
// Check if this is a wrapping slot. If that's the case, it carries no
// data, so we just skip it and try again from the first slot.
if int64(newHead-headWrap) >= 0 {
if int64(newHead-headWrap) > int64(jump) || newHead&offsetMask != 0 {
return nil
}
if r.tail == r.head {
// If this is the first pull since the last Flush()
// call, we flush the state so that the sender can use
// this space if it needs to.
r.p.writeAtomic(r.head, slotFree|slotToPayloadSize(newHead-r.head))
r.tail = newHead
}
r.head = newHead
return r.Pull()
}
// Grab the buffer before updating r.head.
b := r.p.data(r.head, payloadSize)
r.head = newHead
return b
}
// Flush tells the transmitter that all buffers pulled since the last Flush()
// have been used, so the transmitter is free to used their slots for further
// transmission.
func (r *Rx) Flush() {
if r.head == r.tail {
return
}
r.p.writeAtomic(r.tail, slotFree|slotToPayloadSize(r.head-r.tail))
r.tail = r.head
}
// Bytes returns the byte slice on which the pipe operates.
func (r *Rx) Bytes() []byte {
return r.p.buffer
}
|