summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/link/sharedmem/sharedmem_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/link/sharedmem/sharedmem_test.go')
-rw-r--r--pkg/tcpip/link/sharedmem/sharedmem_test.go776
1 files changed, 0 insertions, 776 deletions
diff --git a/pkg/tcpip/link/sharedmem/sharedmem_test.go b/pkg/tcpip/link/sharedmem/sharedmem_test.go
deleted file mode 100644
index 98036f367..000000000
--- a/pkg/tcpip/link/sharedmem/sharedmem_test.go
+++ /dev/null
@@ -1,776 +0,0 @@
-// Copyright 2018 The gVisor Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// +build linux
-
-package sharedmem
-
-import (
- "bytes"
- "io/ioutil"
- "math/rand"
- "os"
- "strings"
- "sync"
- "syscall"
- "testing"
- "time"
-
- "gvisor.dev/gvisor/pkg/tcpip"
- "gvisor.dev/gvisor/pkg/tcpip/buffer"
- "gvisor.dev/gvisor/pkg/tcpip/header"
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe"
- "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue"
- "gvisor.dev/gvisor/pkg/tcpip/stack"
-)
-
-const (
- localLinkAddr = "\xde\xad\xbe\xef\x56\x78"
- remoteLinkAddr = "\xde\xad\xbe\xef\x12\x34"
-
- queueDataSize = 1024 * 1024
- queuePipeSize = 4096
-)
-
-type queueBuffers struct {
- data []byte
- rx pipe.Tx
- tx pipe.Rx
-}
-
-func initQueue(t *testing.T, q *queueBuffers, c *QueueConfig) {
- // Prepare tx pipe.
- b, err := getBuffer(c.TxPipeFD)
- if err != nil {
- t.Fatalf("getBuffer failed: %v", err)
- }
- q.tx.Init(b)
-
- // Prepare rx pipe.
- b, err = getBuffer(c.RxPipeFD)
- if err != nil {
- t.Fatalf("getBuffer failed: %v", err)
- }
- q.rx.Init(b)
-
- // Get data slice.
- q.data, err = getBuffer(c.DataFD)
- if err != nil {
- t.Fatalf("getBuffer failed: %v", err)
- }
-}
-
-func (q *queueBuffers) cleanup() {
- syscall.Munmap(q.tx.Bytes())
- syscall.Munmap(q.rx.Bytes())
- syscall.Munmap(q.data)
-}
-
-type packetInfo struct {
- addr tcpip.LinkAddress
- proto tcpip.NetworkProtocolNumber
- vv buffer.VectorisedView
-}
-
-type testContext struct {
- t *testing.T
- ep *endpoint
- txCfg QueueConfig
- rxCfg QueueConfig
- txq queueBuffers
- rxq queueBuffers
-
- packetCh chan struct{}
- mu sync.Mutex
- packets []packetInfo
-}
-
-func newTestContext(t *testing.T, mtu, bufferSize uint32, addr tcpip.LinkAddress) *testContext {
- var err error
- c := &testContext{
- t: t,
- packetCh: make(chan struct{}, 1000000),
- }
- c.txCfg = createQueueFDs(t, queueSizes{
- dataSize: queueDataSize,
- txPipeSize: queuePipeSize,
- rxPipeSize: queuePipeSize,
- sharedDataSize: 4096,
- })
-
- c.rxCfg = createQueueFDs(t, queueSizes{
- dataSize: queueDataSize,
- txPipeSize: queuePipeSize,
- rxPipeSize: queuePipeSize,
- sharedDataSize: 4096,
- })
-
- initQueue(t, &c.txq, &c.txCfg)
- initQueue(t, &c.rxq, &c.rxCfg)
-
- id, err := New(mtu, bufferSize, addr, c.txCfg, c.rxCfg)
- if err != nil {
- t.Fatalf("New failed: %v", err)
- }
-
- c.ep = stack.FindLinkEndpoint(id).(*endpoint)
- c.ep.Attach(c)
-
- return c
-}
-
-func (c *testContext) DeliverNetworkPacket(_ stack.LinkEndpoint, remoteLinkAddr, localLinkAddr tcpip.LinkAddress, proto tcpip.NetworkProtocolNumber, vv buffer.VectorisedView) {
- c.mu.Lock()
- c.packets = append(c.packets, packetInfo{
- addr: remoteLinkAddr,
- proto: proto,
- vv: vv.Clone(nil),
- })
- c.mu.Unlock()
-
- c.packetCh <- struct{}{}
-}
-
-func (c *testContext) cleanup() {
- c.ep.Close()
- closeFDs(&c.txCfg)
- closeFDs(&c.rxCfg)
- c.txq.cleanup()
- c.rxq.cleanup()
-}
-
-func (c *testContext) waitForPackets(n int, to <-chan time.Time, errorStr string) {
- for i := 0; i < n; i++ {
- select {
- case <-c.packetCh:
- case <-to:
- c.t.Fatalf(errorStr)
- }
- }
-}
-
-func (c *testContext) pushRxCompletion(size uint32, bs []queue.RxBuffer) {
- b := c.rxq.rx.Push(queue.RxCompletionSize(len(bs)))
- queue.EncodeRxCompletion(b, size, 0)
- for i := range bs {
- queue.EncodeRxCompletionBuffer(b, i, queue.RxBuffer{
- Offset: bs[i].Offset,
- Size: bs[i].Size,
- ID: bs[i].ID,
- })
- }
-}
-
-func randomFill(b []byte) {
- for i := range b {
- b[i] = byte(rand.Intn(256))
- }
-}
-
-func shuffle(b []int) {
- for i := len(b) - 1; i >= 0; i-- {
- j := rand.Intn(i + 1)
- b[i], b[j] = b[j], b[i]
- }
-}
-
-func createFile(t *testing.T, size int64, initQueue bool) int {
- tmpDir := os.Getenv("TEST_TMPDIR")
- if tmpDir == "" {
- tmpDir = os.Getenv("TMPDIR")
- }
- f, err := ioutil.TempFile(tmpDir, "sharedmem_test")
- if err != nil {
- t.Fatalf("TempFile failed: %v", err)
- }
- defer f.Close()
- syscall.Unlink(f.Name())
-
- if initQueue {
- // Write the "slot-free" flag in the initial queue.
- _, err := f.WriteAt([]byte{0, 0, 0, 0, 0, 0, 0, 0x80}, 0)
- if err != nil {
- t.Fatalf("WriteAt failed: %v", err)
- }
- }
-
- fd, err := syscall.Dup(int(f.Fd()))
- if err != nil {
- t.Fatalf("Dup failed: %v", err)
- }
-
- if err := syscall.Ftruncate(fd, size); err != nil {
- syscall.Close(fd)
- t.Fatalf("Ftruncate failed: %v", err)
- }
-
- return fd
-}
-
-func closeFDs(c *QueueConfig) {
- syscall.Close(c.DataFD)
- syscall.Close(c.EventFD)
- syscall.Close(c.TxPipeFD)
- syscall.Close(c.RxPipeFD)
- syscall.Close(c.SharedDataFD)
-}
-
-type queueSizes struct {
- dataSize int64
- txPipeSize int64
- rxPipeSize int64
- sharedDataSize int64
-}
-
-func createQueueFDs(t *testing.T, s queueSizes) QueueConfig {
- fd, _, err := syscall.RawSyscall(syscall.SYS_EVENTFD2, 0, 0, 0)
- if err != 0 {
- t.Fatalf("eventfd failed: %v", error(err))
- }
-
- return QueueConfig{
- EventFD: int(fd),
- DataFD: createFile(t, s.dataSize, false),
- TxPipeFD: createFile(t, s.txPipeSize, true),
- RxPipeFD: createFile(t, s.rxPipeSize, true),
- SharedDataFD: createFile(t, s.sharedDataSize, false),
- }
-}
-
-// TestSimpleSend sends 1000 packets with random header and payload sizes,
-// then checks that the right payload is received on the shared memory queues.
-func TestSimpleSend(t *testing.T) {
- c := newTestContext(t, 20000, 1500, localLinkAddr)
- defer c.cleanup()
-
- // Prepare route.
- r := stack.Route{
- RemoteLinkAddress: remoteLinkAddr,
- }
-
- for iters := 1000; iters > 0; iters-- {
- func() {
- // Prepare and send packet.
- n := rand.Intn(10000)
- hdr := buffer.NewPrependable(n + int(c.ep.MaxHeaderLength()))
- hdrBuf := hdr.Prepend(n)
- randomFill(hdrBuf)
-
- n = rand.Intn(10000)
- buf := buffer.NewView(n)
- randomFill(buf)
-
- proto := tcpip.NetworkProtocolNumber(rand.Intn(0x10000))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), proto); err != nil {
- t.Fatalf("WritePacket failed: %v", err)
- }
-
- // Receive packet.
- desc := c.txq.tx.Pull()
- pi := queue.DecodeTxPacketHeader(desc)
- if pi.Reserved != 0 {
- t.Fatalf("Reserved value is non-zero: 0x%x", pi.Reserved)
- }
- contents := make([]byte, 0, pi.Size)
- for i := 0; i < pi.BufferCount; i++ {
- bi := queue.DecodeTxBufferHeader(desc, i)
- contents = append(contents, c.txq.data[bi.Offset:][:bi.Size]...)
- }
- c.txq.tx.Flush()
-
- defer func() {
- // Tell the endpoint about the completion of the write.
- b := c.txq.rx.Push(8)
- queue.EncodeTxCompletion(b, pi.ID)
- c.txq.rx.Flush()
- }()
-
- // Check the ethernet header.
- ethTemplate := make(header.Ethernet, header.EthernetMinimumSize)
- ethTemplate.Encode(&header.EthernetFields{
- SrcAddr: localLinkAddr,
- DstAddr: remoteLinkAddr,
- Type: proto,
- })
- if got := contents[:header.EthernetMinimumSize]; !bytes.Equal(got, []byte(ethTemplate)) {
- t.Fatalf("Bad ethernet header in packet: got %x, want %x", got, ethTemplate)
- }
-
- // Compare contents skipping the ethernet header added by the
- // endpoint.
- merged := append(hdrBuf, buf...)
- if uint32(len(contents)) < pi.Size {
- t.Fatalf("Sum of buffers is less than packet size: %v < %v", len(contents), pi.Size)
- }
- contents = contents[:pi.Size][header.EthernetMinimumSize:]
-
- if !bytes.Equal(contents, merged) {
- t.Fatalf("Buffers are different: got %x (%v bytes), want %x (%v bytes)", contents, len(contents), merged, len(merged))
- }
- }()
- }
-}
-
-// TestPreserveSrcAddressInSend calls WritePacket once with LocalLinkAddress
-// set in Route (using much of the same code as TestSimpleSend), then checks
-// that the encoded ethernet header received includes the correct SrcAddr.
-func TestPreserveSrcAddressInSend(t *testing.T) {
- c := newTestContext(t, 20000, 1500, localLinkAddr)
- defer c.cleanup()
-
- newLocalLinkAddress := tcpip.LinkAddress(strings.Repeat("0xFE", 6))
- // Set both remote and local link address in route.
- r := stack.Route{
- RemoteLinkAddress: remoteLinkAddr,
- LocalLinkAddress: newLocalLinkAddress,
- }
-
- // WritePacket panics given a prependable with anything less than
- // the minimum size of the ethernet header.
- hdr := buffer.NewPrependable(header.EthernetMinimumSize)
-
- proto := tcpip.NetworkProtocolNumber(rand.Intn(0x10000))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buffer.VectorisedView{}, proto); err != nil {
- t.Fatalf("WritePacket failed: %v", err)
- }
-
- // Receive packet.
- desc := c.txq.tx.Pull()
- pi := queue.DecodeTxPacketHeader(desc)
- if pi.Reserved != 0 {
- t.Fatalf("Reserved value is non-zero: 0x%x", pi.Reserved)
- }
- contents := make([]byte, 0, pi.Size)
- for i := 0; i < pi.BufferCount; i++ {
- bi := queue.DecodeTxBufferHeader(desc, i)
- contents = append(contents, c.txq.data[bi.Offset:][:bi.Size]...)
- }
- c.txq.tx.Flush()
-
- defer func() {
- // Tell the endpoint about the completion of the write.
- b := c.txq.rx.Push(8)
- queue.EncodeTxCompletion(b, pi.ID)
- c.txq.rx.Flush()
- }()
-
- // Check that the ethernet header contains the expected SrcAddr.
- ethTemplate := make(header.Ethernet, header.EthernetMinimumSize)
- ethTemplate.Encode(&header.EthernetFields{
- SrcAddr: newLocalLinkAddress,
- DstAddr: remoteLinkAddr,
- Type: proto,
- })
- if got := contents[:header.EthernetMinimumSize]; !bytes.Equal(got, []byte(ethTemplate)) {
- t.Fatalf("Bad ethernet header in packet: got %x, want %x", got, ethTemplate)
- }
-}
-
-// TestFillTxQueue sends packets until the queue is full.
-func TestFillTxQueue(t *testing.T) {
- c := newTestContext(t, 20000, 1500, localLinkAddr)
- defer c.cleanup()
-
- // Prepare to send a packet.
- r := stack.Route{
- RemoteLinkAddress: remoteLinkAddr,
- }
-
- buf := buffer.NewView(100)
-
- // Each packet is uses no more than 40 bytes, so write that many packets
- // until the tx queue if full.
- ids := make(map[uint64]struct{})
- for i := queuePipeSize / 40; i > 0; i-- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
-
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
- t.Fatalf("WritePacket failed unexpectedly: %v", err)
- }
-
- // Check that they have different IDs.
- desc := c.txq.tx.Pull()
- pi := queue.DecodeTxPacketHeader(desc)
- if _, ok := ids[pi.ID]; ok {
- t.Fatalf("ID (%v) reused", pi.ID)
- }
- ids[pi.ID] = struct{}{}
- }
-
- // Next attempt to write must fail.
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != want {
- t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
- }
-}
-
-// TestFillTxQueueAfterBadCompletion sends a bad completion, then sends packets
-// until the queue is full.
-func TestFillTxQueueAfterBadCompletion(t *testing.T) {
- c := newTestContext(t, 20000, 1500, localLinkAddr)
- defer c.cleanup()
-
- // Send a bad completion.
- queue.EncodeTxCompletion(c.txq.rx.Push(8), 1)
- c.txq.rx.Flush()
-
- // Prepare to send a packet.
- r := stack.Route{
- RemoteLinkAddress: remoteLinkAddr,
- }
-
- buf := buffer.NewView(100)
-
- // Send two packets so that the id slice has at least two slots.
- for i := 2; i > 0; i-- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
- t.Fatalf("WritePacket failed unexpectedly: %v", err)
- }
- }
-
- // Complete the two writes twice.
- for i := 2; i > 0; i-- {
- pi := queue.DecodeTxPacketHeader(c.txq.tx.Pull())
-
- queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID)
- queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID)
- c.txq.rx.Flush()
- }
- c.txq.tx.Flush()
-
- // Each packet is uses no more than 40 bytes, so write that many packets
- // until the tx queue if full.
- ids := make(map[uint64]struct{})
- for i := queuePipeSize / 40; i > 0; i-- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
- t.Fatalf("WritePacket failed unexpectedly: %v", err)
- }
-
- // Check that they have different IDs.
- desc := c.txq.tx.Pull()
- pi := queue.DecodeTxPacketHeader(desc)
- if _, ok := ids[pi.ID]; ok {
- t.Fatalf("ID (%v) reused", pi.ID)
- }
- ids[pi.ID] = struct{}{}
- }
-
- // Next attempt to write must fail.
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != want {
- t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
- }
-}
-
-// TestFillTxMemory sends packets until the we run out of shared memory.
-func TestFillTxMemory(t *testing.T) {
- const bufferSize = 1500
- c := newTestContext(t, 20000, bufferSize, localLinkAddr)
- defer c.cleanup()
-
- // Prepare to send a packet.
- r := stack.Route{
- RemoteLinkAddress: remoteLinkAddr,
- }
-
- buf := buffer.NewView(100)
-
- // Each packet is uses up one buffer, so write as many as possible until
- // we fill the memory.
- ids := make(map[uint64]struct{})
- for i := queueDataSize / bufferSize; i > 0; i-- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
- t.Fatalf("WritePacket failed unexpectedly: %v", err)
- }
-
- // Check that they have different IDs.
- desc := c.txq.tx.Pull()
- pi := queue.DecodeTxPacketHeader(desc)
- if _, ok := ids[pi.ID]; ok {
- t.Fatalf("ID (%v) reused", pi.ID)
- }
- ids[pi.ID] = struct{}{}
- c.txq.tx.Flush()
- }
-
- // Next attempt to write must fail.
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber)
- if want := tcpip.ErrWouldBlock; err != want {
- t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
- }
-}
-
-// TestFillTxMemoryWithMultiBuffer sends packets until the we run out of
-// shared memory for a 2-buffer packet, but still with room for a 1-buffer
-// packet.
-func TestFillTxMemoryWithMultiBuffer(t *testing.T) {
- const bufferSize = 1500
- c := newTestContext(t, 20000, bufferSize, localLinkAddr)
- defer c.cleanup()
-
- // Prepare to send a packet.
- r := stack.Route{
- RemoteLinkAddress: remoteLinkAddr,
- }
-
- buf := buffer.NewView(100)
-
- // Each packet is uses up one buffer, so write as many as possible
- // until there is only one buffer left.
- for i := queueDataSize/bufferSize - 1; i > 0; i-- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
- t.Fatalf("WritePacket failed unexpectedly: %v", err)
- }
-
- // Pull the posted buffer.
- c.txq.tx.Pull()
- c.txq.tx.Flush()
- }
-
- // Attempt to write a two-buffer packet. It must fail.
- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- uu := buffer.NewView(bufferSize).ToVectorisedView()
- if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&r, nil /* gso */, hdr, uu, header.IPv4ProtocolNumber); err != want {
- t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want)
- }
- }
-
- // Attempt to write the one-buffer packet again. It must succeed.
- {
- hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()))
- if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil {
- t.Fatalf("WritePacket failed unexpectedly: %v", err)
- }
- }
-}
-
-func pollPull(t *testing.T, p *pipe.Rx, to <-chan time.Time, errStr string) []byte {
- t.Helper()
-
- for {
- b := p.Pull()
- if b != nil {
- return b
- }
-
- select {
- case <-time.After(10 * time.Millisecond):
- case <-to:
- t.Fatal(errStr)
- }
- }
-}
-
-// TestSimpleReceive completes 1000 different receives with random payload and
-// random number of buffers. It checks that the contents match the expected
-// values.
-func TestSimpleReceive(t *testing.T) {
- const bufferSize = 1500
- c := newTestContext(t, 20000, bufferSize, localLinkAddr)
- defer c.cleanup()
-
- // Check that buffers have been posted.
- limit := c.ep.rx.q.PostedBuffersLimit()
- for i := uint64(0); i < limit; i++ {
- timeout := time.After(2 * time.Second)
- bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers to be posted"))
-
- if want := i * bufferSize; want != bi.Offset {
- t.Fatalf("Bad posted offset: got %v, want %v", bi.Offset, want)
- }
-
- if want := i; want != bi.ID {
- t.Fatalf("Bad posted ID: got %v, want %v", bi.ID, want)
- }
-
- if bufferSize != bi.Size {
- t.Fatalf("Bad posted bufferSize: got %v, want %v", bi.Size, bufferSize)
- }
- }
- c.rxq.tx.Flush()
-
- // Create a slice with the indices 0..limit-1.
- idx := make([]int, limit)
- for i := range idx {
- idx[i] = i
- }
-
- // Complete random packets 1000 times.
- for iters := 1000; iters > 0; iters-- {
- timeout := time.After(2 * time.Second)
- // Prepare a random packet.
- shuffle(idx)
- n := 1 + rand.Intn(10)
- bufs := make([]queue.RxBuffer, n)
- contents := make([]byte, bufferSize*n-rand.Intn(500))
- randomFill(contents)
- for i := range bufs {
- j := idx[i]
- bufs[i].Size = bufferSize
- bufs[i].Offset = uint64(bufferSize * j)
- bufs[i].ID = uint64(j)
-
- copy(c.rxq.data[bufs[i].Offset:][:bufferSize], contents[i*bufferSize:])
- }
-
- // Push completion.
- c.pushRxCompletion(uint32(len(contents)), bufs)
- c.rxq.rx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Wait for packet to be received, then check it.
- c.waitForPackets(1, time.After(5*time.Second), "Timeout waiting for packet")
- c.mu.Lock()
- rcvd := []byte(c.packets[0].vv.First())
- c.packets = c.packets[:0]
- c.mu.Unlock()
-
- if contents := contents[header.EthernetMinimumSize:]; !bytes.Equal(contents, rcvd) {
- t.Fatalf("Unexpected buffer contents: got %x, want %x", rcvd, contents)
- }
-
- // Check that buffers have been reposted.
- for i := range bufs {
- bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffers to be reposted"))
- if bi != bufs[i] {
- t.Fatalf("Unexpected buffer reposted: got %x, want %x", bi, bufs[i])
- }
- }
- c.rxq.tx.Flush()
- }
-}
-
-// TestRxBuffersReposted tests that rx buffers get reposted after they have been
-// completed.
-func TestRxBuffersReposted(t *testing.T) {
- const bufferSize = 1500
- c := newTestContext(t, 20000, bufferSize, localLinkAddr)
- defer c.cleanup()
-
- // Receive all posted buffers.
- limit := c.ep.rx.q.PostedBuffersLimit()
- buffers := make([]queue.RxBuffer, 0, limit)
- for i := limit; i > 0; i-- {
- timeout := time.After(2 * time.Second)
- buffers = append(buffers, queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers")))
- }
- c.rxq.tx.Flush()
-
- // Check that all buffers are reposted when individually completed.
- for i := range buffers {
- timeout := time.After(2 * time.Second)
- // Complete the buffer.
- c.pushRxCompletion(buffers[i].Size, buffers[i:][:1])
- c.rxq.rx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Wait for it to be reposted.
- bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted"))
- if bi != buffers[i] {
- t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[i])
- }
- }
- c.rxq.tx.Flush()
-
- // Check that all buffers are reposted when completed in pairs.
- for i := 0; i < len(buffers)/2; i++ {
- timeout := time.After(2 * time.Second)
- // Complete with two buffers.
- c.pushRxCompletion(2*bufferSize, buffers[2*i:][:2])
- c.rxq.rx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Wait for them to be reposted.
- for j := 0; j < 2; j++ {
- bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted"))
- if bi != buffers[2*i+j] {
- t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[2*i+j])
- }
- }
- }
- c.rxq.tx.Flush()
-}
-
-// TestReceivePostingIsFull checks that the endpoint will properly handle the
-// case when a received buffer cannot be immediately reposted because it hasn't
-// been pulled from the tx pipe yet.
-func TestReceivePostingIsFull(t *testing.T) {
- const bufferSize = 1500
- c := newTestContext(t, 20000, bufferSize, localLinkAddr)
- defer c.cleanup()
-
- // Complete first posted buffer before flushing it from the tx pipe.
- first := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for first buffer to be posted"))
- c.pushRxCompletion(first.Size, []queue.RxBuffer{first})
- c.rxq.rx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Check that packet is received.
- c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
-
- // Complete another buffer.
- second := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for second buffer to be posted"))
- c.pushRxCompletion(second.Size, []queue.RxBuffer{second})
- c.rxq.rx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Check that no packet is received yet, as the worker is blocked trying
- // to repost.
- select {
- case <-time.After(500 * time.Millisecond):
- case <-c.packetCh:
- t.Fatalf("Unexpected packet received")
- }
-
- // Flush tx queue, which will allow the first buffer to be reposted,
- // and the second completion to be pulled.
- c.rxq.tx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Check that second packet completes.
- c.waitForPackets(1, time.After(time.Second), "Timeout waiting for second completed packet")
-}
-
-// TestCloseWhileWaitingToPost closes the endpoint while it is waiting to
-// repost a buffer. Make sure it backs out.
-func TestCloseWhileWaitingToPost(t *testing.T) {
- const bufferSize = 1500
- c := newTestContext(t, 20000, bufferSize, localLinkAddr)
- cleaned := false
- defer func() {
- if !cleaned {
- c.cleanup()
- }
- }()
-
- // Complete first posted buffer before flushing it from the tx pipe.
- bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for initial buffer to be posted"))
- c.pushRxCompletion(bi.Size, []queue.RxBuffer{bi})
- c.rxq.rx.Flush()
- syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0})
-
- // Wait for packet to be indicated.
- c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet")
-
- // Cleanup and wait for worker to complete.
- c.cleanup()
- cleaned = true
- c.ep.Wait()
-}