summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link/sharedmem/pipe/tx.go
blob: 9841eb23116c16150ce840b73b8f89d79326cd2a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipe

// Tx is the transmit side of the shared memory ring buffer.
//
// head, tail and next are positions in the pipe. Data pushed between tail and
// next is pending: it becomes visible to the receiver on Flush() and is
// discarded on Abort().
type Tx struct {
	// p is the underlying pipe; its buffer is what Bytes() returns.
	p pipe
	// maxPayloadSize is the largest payload Push() accepts. Init() sets it
	// to the buffer length minus two slot headers.
	maxPayloadSize uint64

	// head is the position up to which space is available to the
	// transmitter; reclaim() advances it over slots marked free.
	head uint64
	// tail is the position of the first slot that has not been flushed.
	tail uint64
	// next is the position at which the next Push() reserves its slot.
	next uint64

	// tailHeader is the header for the slot at tail. Push() records it here
	// instead of writing it to the pipe, and Flush() stores it atomically.
	tailHeader uint64
}

// Init sets up the transmit end of the pipe on top of b. Afterwards the next
// slot to be written is the first one, nothing is pending, and the whole ring
// buffer is available to the transmitter.
func (t *Tx) Init(b []byte) {
	t.p.init(b)

	// A payload can never use the room taken by its own slot header nor the
	// room reserved for the header of the wrapping message.
	bufLen := uint64(len(t.p.buffer))
	t.maxPayloadSize = bufLen - 2*sizeOfSlotHeader

	// Start at a multiple of jump (presumably offset zero of a high-numbered
	// revolution), with head one full jump ahead of tail.
	var start uint64 = 0xfffffffe * jump
	t.tail = start
	t.next = start
	t.head = start + jump

	// Mark the first slot as free.
	t.p.write(start, slotFree)
}

// Capacity reports how many records of recordSize bytes fit in the pipe before
// it fills up. One slot header's worth of the buffer is not counted as usable.
func (t *Tx) Capacity(recordSize uint64) uint64 {
	// Size of one record once stored in a slot.
	slotSize := payloadToSlotSize(recordSize)
	usable := uint64(len(t.p.buffer)) - sizeOfSlotHeader
	return usable / slotSize
}

// Push reserves "payloadSize" bytes for transmission in the pipe. The caller
// populates the returned slice with the data to be transferred and eventually
// calls Flush() to make the data visible to the reader, or Abort() to make the
// pipe forget all Push() calls since the last Flush().
//
// The returned slice is available until Flush() or Abort() is next called.
// After that, it must not be touched.
//
// Push returns nil when payloadSize exceeds the maximum payload size computed
// by Init(), or when not enough slots could be reclaimed to make room for the
// request. In the latter case, if the request had to wrap around the end of
// the buffer, the wrapping slot may already have been pushed (and, if it was
// the first pending slot, flushed) by the time nil is returned.
func (t *Tx) Push(payloadSize uint64) []byte {
	// Fail request if we know we will never have enough room.
	if payloadSize > t.maxPayloadSize {
		return nil
	}

	// totalLen is the slot size for this payload, newNext is where the slot
	// would end, and nextWrap is the end of the buffer within the revolution
	// that t.next is currently in.
	totalLen := payloadToSlotSize(payloadSize)
	newNext := t.next + totalLen
	nextWrap := (t.next & revolutionMask) | uint64(len(t.p.buffer))
	// The positions are compared through their signed difference
	// (presumably so the test keeps working when the counters wrap around
	// uint64 -- TODO confirm).
	if int64(newNext-nextWrap) >= 0 {
		// The new buffer would overflow the pipe, so we push a wrapping
		// slot, then try to add the actual slot to the front of the
		// pipe.
		//
		// newNext becomes the start of the next revolution; the wrapping
		// slot spans from t.next up to that point.
		newNext = (newNext & revolutionMask) + jump
		wrappingPayloadSize := slotToPayloadSize(newNext - t.next)
		if !t.reclaim(newNext) {
			return nil
		}

		oldNext := t.next
		t.next = newNext
		if oldNext != t.tail {
			// Other slots are already pending, so the wrapping slot's
			// header can be written directly.
			t.p.write(oldNext, wrappingPayloadSize)
		} else {
			// The wrapping slot is the first pending slot: record its
			// header as the tail header and flush it right away, which
			// also moves t.tail to the start of the next revolution.
			t.tailHeader = wrappingPayloadSize
			t.Flush()
		}

		// The actual slot now starts at the front of the pipe.
		newNext += totalLen
	}

	// Check that we have enough room for the buffer.
	if !t.reclaim(newNext) {
		return nil
	}

	if t.next != t.tail {
		t.p.write(t.next, payloadSize)
	} else {
		// The header of the slot at the tail is held back until Flush(),
		// which stores it with an atomic write.
		t.tailHeader = payloadSize
	}

	// Grab the buffer before updating t.next.
	b := t.p.data(t.next, payloadSize)
	t.next = newNext

	return b
}

// reclaim tries to move the head forward until it reaches at least newNext.
// When the head is already at or past newNext it does nothing and reports
// true. Otherwise it walks over slots that the receive end has marked as free,
// and reports whether enough of them could be reclaimed. The head keeps any
// progress made before a failure.
func (t *Tx) reclaim(newNext uint64) bool {
	bufLen := uint64(len(t.p.buffer))

	for int64(newNext-t.head) > 0 {
		hdr := t.p.readAtomic(t.head)
		if hdr&slotFree == 0 {
			// The slot at the head is not free, so it cannot be reclaimed.
			return false
		}

		// The header of a free slot carries its payload size, which gives
		// the position of the slot that follows it.
		candidate := t.head + payloadToSlotSize(hdr&slotSizeMask)

		// Reject a header that would take the head out of bounds.
		switch {
		case int64(candidate-t.tail) > int64(jump):
			// More than one jump ahead of the tail.
			return false
		case candidate&offsetMask >= bufLen:
			// Offset falls outside the buffer.
			return false
		}

		t.head = candidate
	}

	return true
}

// Abort causes all Push() calls since the last Flush() to be forgotten and
// therefore they will not be made visible to the receiver.
//
// Slices returned by those Push() calls must not be used afterwards.
func (t *Tx) Abort() {
	// Rewind the write position to the first unflushed slot; the space is
	// simply handed out again by later Push() calls.
	t.next = t.tail
}

// Flush makes every buffer pushed since the most recent Flush() or Abort()
// call visible to the receiver. It does nothing when no buffers are pending.
func (t *Tx) Flush() {
	end := t.next
	if end == t.tail {
		// No buffers have been pushed, so there is nothing to publish.
		return
	}

	if end != t.head {
		// The receiver will spin on the slot that follows the flushed
		// data, so that slot must have the slotFree bit set.
		t.p.write(end, slotFree)
	}

	// The header of the tail slot was held back by Push(); store it
	// atomically now that the marker above is in place.
	t.p.writeAtomic(t.tail, t.tailHeader)
	t.tail = end
}

// Bytes returns the byte slice on which the pipe operates.
//
// The pipe's own buffer is returned, not a copy, so writes to the returned
// slice modify the pipe contents directly.
func (t *Tx) Bytes() []byte {
	return t.p.buffer
}