diff options
author | Googler <noreply@google.com> | 2018-04-27 10:37:02 -0700 |
---|---|---|
committer | Adin Scannell <ascannell@google.com> | 2018-04-28 01:44:26 -0400 |
commit | d02b74a5dcfed4bfc8f2f8e545bca4d2afabb296 (patch) | |
tree | 54f95eef73aee6bacbfc736fffc631be2605ed53 /pkg/tcpip/link/sharedmem/sharedmem_test.go | |
parent | f70210e742919f40aa2f0934a22f1c9ba6dada62 (diff) |
Check in gVisor.
PiperOrigin-RevId: 194583126
Change-Id: Ica1d8821a90f74e7e745962d71801c598c652463
Diffstat (limited to 'pkg/tcpip/link/sharedmem/sharedmem_test.go')
-rw-r--r-- | pkg/tcpip/link/sharedmem/sharedmem_test.go | 703 |
1 files changed, 703 insertions, 0 deletions
diff --git a/pkg/tcpip/link/sharedmem/sharedmem_test.go b/pkg/tcpip/link/sharedmem/sharedmem_test.go new file mode 100644 index 000000000..f71e4751f --- /dev/null +++ b/pkg/tcpip/link/sharedmem/sharedmem_test.go @@ -0,0 +1,703 @@ +// Copyright 2016 The Netstack Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sharedmem + +import ( + "io/ioutil" + "math/rand" + "os" + "reflect" + "sync" + "syscall" + "testing" + "time" + + "gvisor.googlesource.com/gvisor/pkg/tcpip" + "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" + "gvisor.googlesource.com/gvisor/pkg/tcpip/header" + "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe" + "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue" + "gvisor.googlesource.com/gvisor/pkg/tcpip/stack" +) + +const ( + localLinkAddr = "\xde\xad\xbe\xef\x56\x78" + remoteLinkAddr = "\xde\xad\xbe\xef\x12\x34" + + queueDataSize = 1024 * 1024 + queuePipeSize = 4096 +) + +type queueBuffers struct { + data []byte + rx pipe.Tx + tx pipe.Rx +} + +func initQueue(t *testing.T, q *queueBuffers, c *QueueConfig) { + // Prepare tx pipe. + b, err := getBuffer(c.TxPipeFD) + if err != nil { + t.Fatalf("getBuffer failed: %v", err) + } + q.tx.Init(b) + + // Prepare rx pipe. + b, err = getBuffer(c.RxPipeFD) + if err != nil { + t.Fatalf("getBuffer failed: %v", err) + } + q.rx.Init(b) + + // Get data slice. + q.data, err = getBuffer(c.DataFD) + if err != nil { + t.Fatalf("getBuffer failed: %v", err) + } +} + +func (q *queueBuffers) cleanup() { + syscall.Munmap(q.tx.Bytes()) + syscall.Munmap(q.rx.Bytes()) + syscall.Munmap(q.data) +} + +type packetInfo struct { + addr tcpip.LinkAddress + proto tcpip.NetworkProtocolNumber + vv buffer.VectorisedView +} + +type testContext struct { + t *testing.T + ep *endpoint + txCfg QueueConfig + rxCfg QueueConfig + txq queueBuffers + rxq queueBuffers + + packetCh chan struct{} + mu sync.Mutex + packets []packetInfo +} + +func newTestContext(t *testing.T, mtu, bufferSize uint32, addr tcpip.LinkAddress) *testContext { + var err error + c := &testContext{ + t: t, + packetCh: make(chan struct{}, 1000000), + } + c.txCfg = createQueueFDs(t, queueSizes{ + dataSize: queueDataSize, + txPipeSize: queuePipeSize, + rxPipeSize: queuePipeSize, + sharedDataSize: 4096, + }) + + c.rxCfg = createQueueFDs(t, queueSizes{ + dataSize: queueDataSize, + txPipeSize: queuePipeSize, + rxPipeSize: queuePipeSize, + sharedDataSize: 4096, + }) + + initQueue(t, &c.txq, &c.txCfg) + initQueue(t, &c.rxq, &c.rxCfg) + + id, err := New(mtu, bufferSize, addr, c.txCfg, c.rxCfg) + if err != nil { + t.Fatalf("New failed: %v", err) + } + + c.ep = stack.FindLinkEndpoint(id).(*endpoint) + c.ep.Attach(c) + + return c +} + +func (c *testContext) DeliverNetworkPacket(_ stack.LinkEndpoint, remoteAddr tcpip.LinkAddress, proto tcpip.NetworkProtocolNumber, vv *buffer.VectorisedView) { + c.mu.Lock() + c.packets = append(c.packets, packetInfo{ + addr: remoteAddr, + proto: proto, + vv: vv.Clone(nil), + }) + c.mu.Unlock() + + c.packetCh <- struct{}{} +} + +func (c *testContext) cleanup() { + c.ep.Close() + closeFDs(&c.txCfg) + closeFDs(&c.rxCfg) + c.txq.cleanup() + c.rxq.cleanup() +} + +func (c *testContext) waitForPackets(n int, to <-chan time.Time, errorStr string) { + for i := 0; i < n; i++ { + select { + case <-c.packetCh: + case <-to: + c.t.Fatalf(errorStr) + } + } +} + +func (c *testContext) pushRxCompletion(size uint32, bs []queue.RxBuffer) { + b := c.rxq.rx.Push(queue.RxCompletionSize(len(bs))) + queue.EncodeRxCompletion(b, size, 0) + for i := range bs { + queue.EncodeRxCompletionBuffer(b, i, queue.RxBuffer{ + Offset: bs[i].Offset, + Size: bs[i].Size, + ID: bs[i].ID, + }) + } +} + +func randomFill(b []byte) { + for i := range b { + b[i] = byte(rand.Intn(256)) + } +} + +func shuffle(b []int) { + for i := len(b) - 1; i >= 0; i-- { + j := rand.Intn(i + 1) + b[i], b[j] = b[j], b[i] + } +} + +func createFile(t *testing.T, size int64, initQueue bool) int { + tmpDir := os.Getenv("TEST_TMPDIR") + if tmpDir == "" { + tmpDir = os.Getenv("TMPDIR") + } + f, err := ioutil.TempFile(tmpDir, "sharedmem_test") + if err != nil { + t.Fatalf("TempFile failed: %v", err) + } + defer f.Close() + syscall.Unlink(f.Name()) + + if initQueue { + // Write the "slot-free" flag in the initial queue. + _, err := f.WriteAt([]byte{0, 0, 0, 0, 0, 0, 0, 0x80}, 0) + if err != nil { + t.Fatalf("WriteAt failed: %v", err) + } + } + + fd, err := syscall.Dup(int(f.Fd())) + if err != nil { + t.Fatalf("Dup failed: %v", err) + } + + if err := syscall.Ftruncate(fd, size); err != nil { + syscall.Close(fd) + t.Fatalf("Ftruncate failed: %v", err) + } + + return fd +} + +func closeFDs(c *QueueConfig) { + syscall.Close(c.DataFD) + syscall.Close(c.EventFD) + syscall.Close(c.TxPipeFD) + syscall.Close(c.RxPipeFD) + syscall.Close(c.SharedDataFD) +} + +type queueSizes struct { + dataSize int64 + txPipeSize int64 + rxPipeSize int64 + sharedDataSize int64 +} + +func createQueueFDs(t *testing.T, s queueSizes) QueueConfig { + fd, _, err := syscall.RawSyscall(syscall.SYS_EVENTFD2, 0, 0, 0) + if err != 0 { + t.Fatalf("eventfd failed: %v", error(err)) + } + + return QueueConfig{ + EventFD: int(fd), + DataFD: createFile(t, s.dataSize, false), + TxPipeFD: createFile(t, s.txPipeSize, true), + RxPipeFD: createFile(t, s.rxPipeSize, true), + SharedDataFD: createFile(t, s.sharedDataSize, false), + } +} + +// TestSimpleSend sends 1000 packets with random header and payload sizes, +// then checks that the right payload is received on the shared memory queues. +func TestSimpleSend(t *testing.T) { + c := newTestContext(t, 20000, 1500, localLinkAddr) + defer c.cleanup() + + // Prepare route. + r := stack.Route{ + RemoteLinkAddress: remoteLinkAddr, + } + + for iters := 1000; iters > 0; iters-- { + // Prepare and send packet. + n := rand.Intn(10000) + hdr := buffer.NewPrependable(n + int(c.ep.MaxHeaderLength())) + hdrBuf := hdr.Prepend(n) + randomFill(hdrBuf) + + n = rand.Intn(10000) + buf := buffer.NewView(n) + randomFill(buf) + + proto := tcpip.NetworkProtocolNumber(rand.Intn(0x10000)) + err := c.ep.WritePacket(&r, &hdr, buf, proto) + if err != nil { + t.Fatalf("WritePacket failed: %v", err) + } + + // Receive packet. + desc := c.txq.tx.Pull() + pi := queue.DecodeTxPacketHeader(desc) + contents := make([]byte, 0, pi.Size) + for i := 0; i < pi.BufferCount; i++ { + bi := queue.DecodeTxBufferHeader(desc, i) + contents = append(contents, c.txq.data[bi.Offset:][:bi.Size]...) + } + c.txq.tx.Flush() + + if pi.Reserved != 0 { + t.Fatalf("Reserved value is non-zero: 0x%x", pi.Reserved) + } + + // Check the thernet header. + ethTemplate := make(header.Ethernet, header.EthernetMinimumSize) + ethTemplate.Encode(&header.EthernetFields{ + SrcAddr: localLinkAddr, + DstAddr: remoteLinkAddr, + Type: proto, + }) + if got := contents[:header.EthernetMinimumSize]; !reflect.DeepEqual(got, []byte(ethTemplate)) { + t.Fatalf("Bad ethernet header in packet: got %x, want %x", got, ethTemplate) + } + + // Compare contents skipping the ethernet header added by the + // endpoint. + merged := append(hdrBuf, buf...) + if uint32(len(contents)) < pi.Size { + t.Fatalf("Sum of buffers is less than packet size: %v < %v", len(contents), pi.Size) + } + contents = contents[:pi.Size][header.EthernetMinimumSize:] + + if !reflect.DeepEqual(contents, merged) { + t.Fatalf("Buffers are different: got %x (%v bytes), want %x (%v bytes)", contents, len(contents), merged, len(merged)) + } + + // Tell the endpoint about the completion of the write. + b := c.txq.rx.Push(8) + queue.EncodeTxCompletion(b, pi.ID) + c.txq.rx.Flush() + } +} + +// TestFillTxQueue sends packets until the queue is full. +func TestFillTxQueue(t *testing.T) { + c := newTestContext(t, 20000, 1500, localLinkAddr) + defer c.cleanup() + + // Prepare to send a packet. + r := stack.Route{ + RemoteLinkAddress: remoteLinkAddr, + } + + buf := buffer.NewView(100) + + // Each packet is uses no more than 40 bytes, so write that many packets + // until the tx queue if full. + ids := make(map[uint64]struct{}) + for i := queuePipeSize / 40; i > 0; i-- { + hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) + if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil { + t.Fatalf("WritePacket failed unexpectedly: %v", err) + } + + // Check that they have different IDs. + desc := c.txq.tx.Pull() + pi := queue.DecodeTxPacketHeader(desc) + if _, ok := ids[pi.ID]; ok { + t.Fatalf("ID (%v) reused", pi.ID) + } + ids[pi.ID] = struct{}{} + } + + // Next attempt to write must fail. + hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) + err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber) + if want := tcpip.ErrWouldBlock; err != want { + t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want) + } +} + +// TestFillTxQueueAfterBadCompletion sends a bad completion, then sends packets +// until the queue is full. +func TestFillTxQueueAfterBadCompletion(t *testing.T) { + c := newTestContext(t, 20000, 1500, localLinkAddr) + defer c.cleanup() + + // Send a bad completion. + queue.EncodeTxCompletion(c.txq.rx.Push(8), 1) + c.txq.rx.Flush() + + // Prepare to send a packet. + r := stack.Route{ + RemoteLinkAddress: remoteLinkAddr, + } + + buf := buffer.NewView(100) + + // Send two packets so that the id slice has at least two slots. + for i := 2; i > 0; i-- { + hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) + if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil { + t.Fatalf("WritePacket failed unexpectedly: %v", err) + } + } + + // Complete the two writes twice. + for i := 2; i > 0; i-- { + pi := queue.DecodeTxPacketHeader(c.txq.tx.Pull()) + + queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID) + queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID) + c.txq.rx.Flush() + } + c.txq.tx.Flush() + + // Each packet is uses no more than 40 bytes, so write that many packets + // until the tx queue if full. + ids := make(map[uint64]struct{}) + for i := queuePipeSize / 40; i > 0; i-- { + hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) + if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil { + t.Fatalf("WritePacket failed unexpectedly: %v", err) + } + + // Check that they have different IDs. + desc := c.txq.tx.Pull() + pi := queue.DecodeTxPacketHeader(desc) + if _, ok := ids[pi.ID]; ok { + t.Fatalf("ID (%v) reused", pi.ID) + } + ids[pi.ID] = struct{}{} + } + + // Next attempt to write must fail. + hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) + err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber) + if want := tcpip.ErrWouldBlock; err != want { + t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want) + } +} + +// TestFillTxMemory sends packets until the we run out of shared memory. +func TestFillTxMemory(t *testing.T) { + const bufferSize = 1500 + c := newTestContext(t, 20000, bufferSize, localLinkAddr) + defer c.cleanup() + + // Prepare to send a packet. + r := stack.Route{ + RemoteLinkAddress: remoteLinkAddr, + } + + buf := buffer.NewView(100) + + // Each packet is uses up one buffer, so write as many as possible until + // we fill the memory. + ids := make(map[uint64]struct{}) + for i := queueDataSize / bufferSize; i > 0; i-- { + hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) + if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil { + t.Fatalf("WritePacket failed unexpectedly: %v", err) + } + + // Check that they have different IDs. + desc := c.txq.tx.Pull() + pi := queue.DecodeTxPacketHeader(desc) + if _, ok := ids[pi.ID]; ok { + t.Fatalf("ID (%v) reused", pi.ID) + } + ids[pi.ID] = struct{}{} + c.txq.tx.Flush() + } + + // Next attempt to write must fail. + hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) + err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber) + if want := tcpip.ErrWouldBlock; err != want { + t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want) + } +} + +// TestFillTxMemoryWithMultiBuffer sends packets until the we run out of +// shared memory for a 2-buffer packet, but still with room for a 1-buffer +// packet. +func TestFillTxMemoryWithMultiBuffer(t *testing.T) { + const bufferSize = 1500 + c := newTestContext(t, 20000, bufferSize, localLinkAddr) + defer c.cleanup() + + // Prepare to send a packet. + r := stack.Route{ + RemoteLinkAddress: remoteLinkAddr, + } + + buf := buffer.NewView(100) + + // Each packet is uses up one buffer, so write as many as possible + // until there is only one buffer left. + for i := queueDataSize/bufferSize - 1; i > 0; i-- { + hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) + if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil { + t.Fatalf("WritePacket failed unexpectedly: %v", err) + } + + // Pull the posted buffer. + c.txq.tx.Pull() + c.txq.tx.Flush() + } + + // Attempt to write a two-buffer packet. It must fail. + hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) + err := c.ep.WritePacket(&r, &hdr, buffer.NewView(bufferSize), header.IPv4ProtocolNumber) + if want := tcpip.ErrWouldBlock; err != want { + t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want) + } + + // Attempt to write a one-buffer packet. It must succeed. + hdr = buffer.NewPrependable(int(c.ep.MaxHeaderLength())) + if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil { + t.Fatalf("WritePacket failed unexpectedly: %v", err) + } +} + +func pollPull(t *testing.T, p *pipe.Rx, to <-chan time.Time, errStr string) []byte { + for { + b := p.Pull() + if b != nil { + return b + } + + select { + case <-time.After(10 * time.Millisecond): + case <-to: + t.Fatalf(errStr) + } + } +} + +// TestSimpleReceive completes 1000 different receives with random payload and +// random number of buffers. It checks that the contents match the expected +// values. +func TestSimpleReceive(t *testing.T) { + const bufferSize = 1500 + c := newTestContext(t, 20000, bufferSize, localLinkAddr) + defer c.cleanup() + + // Check that buffers have been posted. + limit := c.ep.rx.q.PostedBuffersLimit() + timeout := time.After(2 * time.Second) + for i := uint64(0); i < limit; i++ { + bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers to be posted")) + + if want := i * bufferSize; want != bi.Offset { + t.Fatalf("Bad posted offset: got %v, want %v", bi.Offset, want) + } + + if want := i; want != bi.ID { + t.Fatalf("Bad posted ID: got %v, want %v", bi.ID, want) + } + + if bufferSize != bi.Size { + t.Fatalf("Bad posted bufferSize: got %v, want %v", bi.Size, bufferSize) + } + } + c.rxq.tx.Flush() + + // Create a slice with the indices 0..limit-1. + idx := make([]int, limit) + for i := range idx { + idx[i] = i + } + + // Complete random packets 1000 times. + for iters := 1000; iters > 0; iters-- { + // Prepare a random packet. + shuffle(idx) + n := 1 + rand.Intn(10) + bufs := make([]queue.RxBuffer, n) + contents := make([]byte, bufferSize*n-rand.Intn(500)) + randomFill(contents) + for i := range bufs { + j := idx[i] + bufs[i].Size = bufferSize + bufs[i].Offset = uint64(bufferSize * j) + bufs[i].ID = uint64(j) + + copy(c.rxq.data[bufs[i].Offset:][:bufferSize], contents[i*bufferSize:]) + } + + // Push completion. + c.pushRxCompletion(uint32(len(contents)), bufs) + c.rxq.rx.Flush() + syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + + // Wait for packet to be received, then check it. + c.waitForPackets(1, time.After(time.Second), "Error waiting for packet") + c.mu.Lock() + rcvd := []byte(c.packets[0].vv.First()) + c.packets = c.packets[:0] + c.mu.Unlock() + + contents = contents[header.EthernetMinimumSize:] + if !reflect.DeepEqual(contents, rcvd) { + t.Fatalf("Unexpected buffer contents: got %x, want %x", rcvd, contents) + } + + // Check that buffers have been reposted. + for i := range bufs { + bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffers to be reposted")) + if !reflect.DeepEqual(bi, bufs[i]) { + t.Fatalf("Unexpected buffer reposted: got %x, want %x", bi, bufs[i]) + } + } + c.rxq.tx.Flush() + } +} + +// TestRxBuffersReposted tests that rx buffers get reposted after they have been +// completed. +func TestRxBuffersReposted(t *testing.T) { + const bufferSize = 1500 + c := newTestContext(t, 20000, bufferSize, localLinkAddr) + defer c.cleanup() + + // Receive all posted buffers. + limit := c.ep.rx.q.PostedBuffersLimit() + buffers := make([]queue.RxBuffer, 0, limit) + timeout := time.After(2 * time.Second) + for i := limit; i > 0; i-- { + buffers = append(buffers, queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers"))) + } + c.rxq.tx.Flush() + + // Check that all buffers are reposted when individually completed. + timeout = time.After(2 * time.Second) + for i := range buffers { + // Complete the buffer. + c.pushRxCompletion(buffers[i].Size, buffers[i:][:1]) + c.rxq.rx.Flush() + syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + + // Wait for it to be reposted. + bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted")) + if !reflect.DeepEqual(bi, buffers[i]) { + t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[i]) + } + } + c.rxq.tx.Flush() + + // Check that all buffers are reposted when completed in pairs. + timeout = time.After(2 * time.Second) + for i := 0; i < len(buffers)/2; i++ { + // Complete with two buffers. + c.pushRxCompletion(2*bufferSize, buffers[2*i:][:2]) + c.rxq.rx.Flush() + syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + + // Wait for them to be reposted. + bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted")) + if !reflect.DeepEqual(bi, buffers[2*i]) { + t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[2*i]) + } + bi = queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted")) + if !reflect.DeepEqual(bi, buffers[2*i+1]) { + t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[2*i+1]) + } + } + c.rxq.tx.Flush() +} + +// TestReceivePostingIsFull checks that the endpoint will properly handle the +// case when a received buffer cannot be immediately reposted because it hasn't +// been pulled from the tx pipe yet. +func TestReceivePostingIsFull(t *testing.T) { + const bufferSize = 1500 + c := newTestContext(t, 20000, bufferSize, localLinkAddr) + defer c.cleanup() + + // Complete first posted buffer before flushing it from the tx pipe. + first := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for first buffer to be posted")) + c.pushRxCompletion(first.Size, []queue.RxBuffer{first}) + c.rxq.rx.Flush() + syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + + // Check that packet is received. + c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet") + + // Complete another buffer. + second := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for second buffer to be posted")) + c.pushRxCompletion(second.Size, []queue.RxBuffer{second}) + c.rxq.rx.Flush() + syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + + // Check that no packet is received yet, as the worker is blocked trying + // to repost. + select { + case <-time.After(500 * time.Millisecond): + case <-c.packetCh: + t.Fatalf("Unexpected packet received") + } + + // Flush tx queue, which will allow the first buffer to be reposted, + // and the second completion to be pulled. + c.rxq.tx.Flush() + syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + + // Check that second packet completes. + c.waitForPackets(1, time.After(time.Second), "Timeout waiting for second completed packet") +} + +// TestCloseWhileWaitingToPost closes the endpoint while it is waiting to +// repost a buffer. Make sure it backs out. +func TestCloseWhileWaitingToPost(t *testing.T) { + const bufferSize = 1500 + c := newTestContext(t, 20000, bufferSize, localLinkAddr) + cleaned := false + defer func() { + if !cleaned { + c.cleanup() + } + }() + + // Complete first posted buffer before flushing it from the tx pipe. + bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for initial buffer to be posted")) + c.pushRxCompletion(bi.Size, []queue.RxBuffer{bi}) + c.rxq.rx.Flush() + syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) + + // Wait for packet to be indicated. + c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet") + + // Cleanup and wait for worker to complete. + c.cleanup() + cleaned = true + c.ep.Wait() +} |