summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link/sharedmem/pipe/rx.go
blob: 8d641c76f44791450225584e0ca6711f6f72d771 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipe

// Rx is the receive side of the shared memory ring buffer.
//
// Buffers are obtained with Pull and their slots are handed back to the
// transmitter with Flush.
type Rx struct {
	// p is the underlying pipe; it holds the buffer that Bytes returns.
	p pipe

	// tail is the position of the first slot that has not yet been released
	// back to the transmitter. Flush marks the span between tail and head as
	// free and then moves tail up to head.
	tail uint64
	// head is the position of the next slot that Pull will inspect.
	head uint64
}

// Init sets up the receive end of the pipe to operate on b. Afterwards head
// and tail are equal, so nothing is pending a Flush, and the next slot to be
// inspected is the very first one.
func (r *Rx) Init(b []byte) {
	r.p.init(b)

	// NOTE(review): the starting position is presumably a revolution count
	// with a zero offset that the transmitter agrees on — confirm against
	// the pipe definition.
	r.head = 0xfffffffe * jump
	r.tail = r.head
}

// Pull returns the next buffer available in the pipe, or nil when there is
// nothing to read at the moment.
//
// The returned slice remains valid only until the next call to Flush(); it
// must not be accessed after that.
func (r *Rx) Pull() []byte {
	for {
		if r.tail+jump == r.head {
			// Everything in the pipe has already been pulled.
			return nil
		}

		hdr := r.p.readAtomic(r.head)
		if hdr&slotFree != 0 {
			// The slot at head is still marked free, so there is
			// nothing to pull yet.
			return nil
		}

		size := hdr & slotSizeMask
		next := r.head + payloadToSlotSize(size)
		wrapAt := (r.head & revolutionMask) | uint64(len(r.p.buffer))
		overshoot := int64(next - wrapAt)

		if overshoot < 0 {
			// Ordinary slot: take the slice before head is advanced.
			buf := r.p.data(r.head, size)
			r.head = next
			return buf
		}

		// This is a wrapping slot. It carries no data, so it is skipped
		// and the pull is retried from the first slot. A header that
		// would move head too far, or to a non-zero offset, is rejected.
		if overshoot > int64(jump) || next&offsetMask != 0 {
			return nil
		}

		if r.tail == r.head {
			// Nothing has been pulled since the last Flush() call, so
			// release the wrapping slot right away; the sender can then
			// use this space if it needs to.
			r.p.writeAtomic(r.head, slotFree|slotToPayloadSize(next-r.head))
			r.tail = next
		}

		r.head = next
	}
}

// Flush tells the transmitter that every buffer pulled since the previous
// Flush() has been consumed, so the transmitter is free to use their slots
// for further transmission. It does nothing when no buffer is pending.
func (r *Rx) Flush() {
	if pending := r.head - r.tail; pending != 0 {
		// Release the whole pulled span as a single free slot starting
		// at tail, then catch tail up with head.
		r.p.writeAtomic(r.tail, slotFree|slotToPayloadSize(pending))
		r.tail = r.head
	}
}

// Bytes returns the byte slice on which the pipe operates. The pipe's buffer
// is returned directly rather than as a copy.
func (r *Rx) Bytes() []byte {
	return r.p.buffer
}