// Copyright 2018 Google Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package sharedmem import ( "io/ioutil" "math/rand" "os" "reflect" "sync" "syscall" "testing" "time" "gvisor.googlesource.com/gvisor/pkg/tcpip" "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" "gvisor.googlesource.com/gvisor/pkg/tcpip/header" "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe" "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue" "gvisor.googlesource.com/gvisor/pkg/tcpip/stack" ) const ( localLinkAddr = "\xde\xad\xbe\xef\x56\x78" remoteLinkAddr = "\xde\xad\xbe\xef\x12\x34" queueDataSize = 1024 * 1024 queuePipeSize = 4096 ) type queueBuffers struct { data []byte rx pipe.Tx tx pipe.Rx } func initQueue(t *testing.T, q *queueBuffers, c *QueueConfig) { // Prepare tx pipe. b, err := getBuffer(c.TxPipeFD) if err != nil { t.Fatalf("getBuffer failed: %v", err) } q.tx.Init(b) // Prepare rx pipe. b, err = getBuffer(c.RxPipeFD) if err != nil { t.Fatalf("getBuffer failed: %v", err) } q.rx.Init(b) // Get data slice. q.data, err = getBuffer(c.DataFD) if err != nil { t.Fatalf("getBuffer failed: %v", err) } } func (q *queueBuffers) cleanup() { syscall.Munmap(q.tx.Bytes()) syscall.Munmap(q.rx.Bytes()) syscall.Munmap(q.data) } type packetInfo struct { addr tcpip.LinkAddress proto tcpip.NetworkProtocolNumber vv buffer.VectorisedView } type testContext struct { t *testing.T ep *endpoint txCfg QueueConfig rxCfg QueueConfig txq queueBuffers rxq queueBuffers packetCh chan struct{} mu sync.Mutex packets []packetInfo } func newTestContext(t *testing.T, mtu, bufferSize uint32, addr tcpip.LinkAddress) *testContext { var err error c := &testContext{ t: t, packetCh: make(chan struct{}, 1000000), } c.txCfg = createQueueFDs(t, queueSizes{ dataSize: queueDataSize, txPipeSize: queuePipeSize, rxPipeSize: queuePipeSize, sharedDataSize: 4096, }) c.rxCfg = createQueueFDs(t, queueSizes{ dataSize: queueDataSize, txPipeSize: queuePipeSize, rxPipeSize: queuePipeSize, sharedDataSize: 4096, }) initQueue(t, &c.txq, &c.txCfg) initQueue(t, &c.rxq, &c.rxCfg) id, err := New(mtu, bufferSize, addr, c.txCfg, c.rxCfg) if err != nil { t.Fatalf("New failed: %v", err) } c.ep = stack.FindLinkEndpoint(id).(*endpoint) c.ep.Attach(c) return c } func (c *testContext) DeliverNetworkPacket(_ stack.LinkEndpoint, remoteAddr tcpip.LinkAddress, proto tcpip.NetworkProtocolNumber, vv *buffer.VectorisedView) { c.mu.Lock() c.packets = append(c.packets, packetInfo{ addr: remoteAddr, proto: proto, vv: vv.Clone(nil), }) c.mu.Unlock() c.packetCh <- struct{}{} } func (c *testContext) cleanup() { c.ep.Close() closeFDs(&c.txCfg) closeFDs(&c.rxCfg) c.txq.cleanup() c.rxq.cleanup() } func (c *testContext) waitForPackets(n int, to <-chan time.Time, errorStr string) { for i := 0; i < n; i++ { select { case <-c.packetCh: case <-to: c.t.Fatalf(errorStr) } } } func (c *testContext) pushRxCompletion(size uint32, bs []queue.RxBuffer) { b := c.rxq.rx.Push(queue.RxCompletionSize(len(bs))) queue.EncodeRxCompletion(b, size, 0) for i := range bs { queue.EncodeRxCompletionBuffer(b, i, queue.RxBuffer{ Offset: bs[i].Offset, Size: bs[i].Size, ID: bs[i].ID, }) } } func randomFill(b []byte) { for i := range b { b[i] = byte(rand.Intn(256)) } } func shuffle(b []int) { for i := len(b) - 1; i >= 0; i-- { j := rand.Intn(i + 1) b[i], b[j] = b[j], b[i] } } func createFile(t *testing.T, size int64, initQueue bool) int { tmpDir := os.Getenv("TEST_TMPDIR") if tmpDir == "" { tmpDir = os.Getenv("TMPDIR") } f, err := ioutil.TempFile(tmpDir, "sharedmem_test") if err != nil { t.Fatalf("TempFile failed: %v", err) } defer f.Close() syscall.Unlink(f.Name()) if initQueue { // Write the "slot-free" flag in the initial queue. _, err := f.WriteAt([]byte{0, 0, 0, 0, 0, 0, 0, 0x80}, 0) if err != nil { t.Fatalf("WriteAt failed: %v", err) } } fd, err := syscall.Dup(int(f.Fd())) if err != nil { t.Fatalf("Dup failed: %v", err) } if err := syscall.Ftruncate(fd, size); err != nil { syscall.Close(fd) t.Fatalf("Ftruncate failed: %v", err) } return fd } func closeFDs(c *QueueConfig) { syscall.Close(c.DataFD) syscall.Close(c.EventFD) syscall.Close(c.TxPipeFD) syscall.Close(c.RxPipeFD) syscall.Close(c.SharedDataFD) } type queueSizes struct { dataSize int64 txPipeSize int64 rxPipeSize int64 sharedDataSize int64 } func createQueueFDs(t *testing.T, s queueSizes) QueueConfig { fd, _, err := syscall.RawSyscall(syscall.SYS_EVENTFD2, 0, 0, 0) if err != 0 { t.Fatalf("eventfd failed: %v", error(err)) } return QueueConfig{ EventFD: int(fd), DataFD: createFile(t, s.dataSize, false), TxPipeFD: createFile(t, s.txPipeSize, true), RxPipeFD: createFile(t, s.rxPipeSize, true), SharedDataFD: createFile(t, s.sharedDataSize, false), } } // TestSimpleSend sends 1000 packets with random header and payload sizes, // then checks that the right payload is received on the shared memory queues. func TestSimpleSend(t *testing.T) { c := newTestContext(t, 20000, 1500, localLinkAddr) defer c.cleanup() // Prepare route. r := stack.Route{ RemoteLinkAddress: remoteLinkAddr, } for iters := 1000; iters > 0; iters-- { // Prepare and send packet. n := rand.Intn(10000) hdr := buffer.NewPrependable(n + int(c.ep.MaxHeaderLength())) hdrBuf := hdr.Prepend(n) randomFill(hdrBuf) n = rand.Intn(10000) buf := buffer.NewView(n) randomFill(buf) proto := tcpip.NetworkProtocolNumber(rand.Intn(0x10000)) err := c.ep.WritePacket(&r, &hdr, buf, proto) if err != nil { t.Fatalf("WritePacket failed: %v", err) } // Receive packet. desc := c.txq.tx.Pull() pi := queue.DecodeTxPacketHeader(desc) contents := make([]byte, 0, pi.Size) for i := 0; i < pi.BufferCount; i++ { bi := queue.DecodeTxBufferHeader(desc, i) contents = append(contents, c.txq.data[bi.Offset:][:bi.Size]...) } c.txq.tx.Flush() if pi.Reserved != 0 { t.Fatalf("Reserved value is non-zero: 0x%x", pi.Reserved) } // Check the thernet header. ethTemplate := make(header.Ethernet, header.EthernetMinimumSize) ethTemplate.Encode(&header.EthernetFields{ SrcAddr: localLinkAddr, DstAddr: remoteLinkAddr, Type: proto, }) if got := contents[:header.EthernetMinimumSize]; !reflect.DeepEqual(got, []byte(ethTemplate)) { t.Fatalf("Bad ethernet header in packet: got %x, want %x", got, ethTemplate) } // Compare contents skipping the ethernet header added by the // endpoint. merged := append(hdrBuf, buf...) if uint32(len(contents)) < pi.Size { t.Fatalf("Sum of buffers is less than packet size: %v < %v", len(contents), pi.Size) } contents = contents[:pi.Size][header.EthernetMinimumSize:] if !reflect.DeepEqual(contents, merged) { t.Fatalf("Buffers are different: got %x (%v bytes), want %x (%v bytes)", contents, len(contents), merged, len(merged)) } // Tell the endpoint about the completion of the write. b := c.txq.rx.Push(8) queue.EncodeTxCompletion(b, pi.ID) c.txq.rx.Flush() } } // TestFillTxQueue sends packets until the queue is full. func TestFillTxQueue(t *testing.T) { c := newTestContext(t, 20000, 1500, localLinkAddr) defer c.cleanup() // Prepare to send a packet. r := stack.Route{ RemoteLinkAddress: remoteLinkAddr, } buf := buffer.NewView(100) // Each packet is uses no more than 40 bytes, so write that many packets // until the tx queue if full. ids := make(map[uint64]struct{}) for i := queuePipeSize / 40; i > 0; i-- { hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil { t.Fatalf("WritePacket failed unexpectedly: %v", err) } // Check that they have different IDs. desc := c.txq.tx.Pull() pi := queue.DecodeTxPacketHeader(desc) if _, ok := ids[pi.ID]; ok { t.Fatalf("ID (%v) reused", pi.ID) } ids[pi.ID] = struct{}{} } // Next attempt to write must fail. hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber) if want := tcpip.ErrWouldBlock; err != want { t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want) } } // TestFillTxQueueAfterBadCompletion sends a bad completion, then sends packets // until the queue is full. func TestFillTxQueueAfterBadCompletion(t *testing.T) { c := newTestContext(t, 20000, 1500, localLinkAddr) defer c.cleanup() // Send a bad completion. queue.EncodeTxCompletion(c.txq.rx.Push(8), 1) c.txq.rx.Flush() // Prepare to send a packet. r := stack.Route{ RemoteLinkAddress: remoteLinkAddr, } buf := buffer.NewView(100) // Send two packets so that the id slice has at least two slots. for i := 2; i > 0; i-- { hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil { t.Fatalf("WritePacket failed unexpectedly: %v", err) } } // Complete the two writes twice. for i := 2; i > 0; i-- { pi := queue.DecodeTxPacketHeader(c.txq.tx.Pull()) queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID) queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID) c.txq.rx.Flush() } c.txq.tx.Flush() // Each packet is uses no more than 40 bytes, so write that many packets // until the tx queue if full. ids := make(map[uint64]struct{}) for i := queuePipeSize / 40; i > 0; i-- { hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil { t.Fatalf("WritePacket failed unexpectedly: %v", err) } // Check that they have different IDs. desc := c.txq.tx.Pull() pi := queue.DecodeTxPacketHeader(desc) if _, ok := ids[pi.ID]; ok { t.Fatalf("ID (%v) reused", pi.ID) } ids[pi.ID] = struct{}{} } // Next attempt to write must fail. hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber) if want := tcpip.ErrWouldBlock; err != want { t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want) } } // TestFillTxMemory sends packets until the we run out of shared memory. func TestFillTxMemory(t *testing.T) { const bufferSize = 1500 c := newTestContext(t, 20000, bufferSize, localLinkAddr) defer c.cleanup() // Prepare to send a packet. r := stack.Route{ RemoteLinkAddress: remoteLinkAddr, } buf := buffer.NewView(100) // Each packet is uses up one buffer, so write as many as possible until // we fill the memory. ids := make(map[uint64]struct{}) for i := queueDataSize / bufferSize; i > 0; i-- { hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil { t.Fatalf("WritePacket failed unexpectedly: %v", err) } // Check that they have different IDs. desc := c.txq.tx.Pull() pi := queue.DecodeTxPacketHeader(desc) if _, ok := ids[pi.ID]; ok { t.Fatalf("ID (%v) reused", pi.ID) } ids[pi.ID] = struct{}{} c.txq.tx.Flush() } // Next attempt to write must fail. hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber) if want := tcpip.ErrWouldBlock; err != want { t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want) } } // TestFillTxMemoryWithMultiBuffer sends packets until the we run out of // shared memory for a 2-buffer packet, but still with room for a 1-buffer // packet. func TestFillTxMemoryWithMultiBuffer(t *testing.T) { const bufferSize = 1500 c := newTestContext(t, 20000, bufferSize, localLinkAddr) defer c.cleanup() // Prepare to send a packet. r := stack.Route{ RemoteLinkAddress: remoteLinkAddr, } buf := buffer.NewView(100) // Each packet is uses up one buffer, so write as many as possible // until there is only one buffer left. for i := queueDataSize/bufferSize - 1; i > 0; i-- { hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil { t.Fatalf("WritePacket failed unexpectedly: %v", err) } // Pull the posted buffer. c.txq.tx.Pull() c.txq.tx.Flush() } // Attempt to write a two-buffer packet. It must fail. hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) err := c.ep.WritePacket(&r, &hdr, buffer.NewView(bufferSize), header.IPv4ProtocolNumber) if want := tcpip.ErrWouldBlock; err != want { t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want) } // Attempt to write a one-buffer packet. It must succeed. hdr = buffer.NewPrependable(int(c.ep.MaxHeaderLength())) if err := c.ep.WritePacket(&r, &hdr, buf, header.IPv4ProtocolNumber); err != nil { t.Fatalf("WritePacket failed unexpectedly: %v", err) } } func pollPull(t *testing.T, p *pipe.Rx, to <-chan time.Time, errStr string) []byte { for { b := p.Pull() if b != nil { return b } select { case <-time.After(10 * time.Millisecond): case <-to: t.Fatalf(errStr) } } } // TestSimpleReceive completes 1000 different receives with random payload and // random number of buffers. It checks that the contents match the expected // values. func TestSimpleReceive(t *testing.T) { const bufferSize = 1500 c := newTestContext(t, 20000, bufferSize, localLinkAddr) defer c.cleanup() // Check that buffers have been posted. limit := c.ep.rx.q.PostedBuffersLimit() timeout := time.After(2 * time.Second) for i := uint64(0); i < limit; i++ { bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers to be posted")) if want := i * bufferSize; want != bi.Offset { t.Fatalf("Bad posted offset: got %v, want %v", bi.Offset, want) } if want := i; want != bi.ID { t.Fatalf("Bad posted ID: got %v, want %v", bi.ID, want) } if bufferSize != bi.Size { t.Fatalf("Bad posted bufferSize: got %v, want %v", bi.Size, bufferSize) } } c.rxq.tx.Flush() // Create a slice with the indices 0..limit-1. idx := make([]int, limit) for i := range idx { idx[i] = i } // Complete random packets 1000 times. for iters := 1000; iters > 0; iters-- { // Prepare a random packet. shuffle(idx) n := 1 + rand.Intn(10) bufs := make([]queue.RxBuffer, n) contents := make([]byte, bufferSize*n-rand.Intn(500)) randomFill(contents) for i := range bufs { j := idx[i] bufs[i].Size = bufferSize bufs[i].Offset = uint64(bufferSize * j) bufs[i].ID = uint64(j) copy(c.rxq.data[bufs[i].Offset:][:bufferSize], contents[i*bufferSize:]) } // Push completion. c.pushRxCompletion(uint32(len(contents)), bufs) c.rxq.rx.Flush() syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) // Wait for packet to be received, then check it. c.waitForPackets(1, time.After(time.Second), "Error waiting for packet") c.mu.Lock() rcvd := []byte(c.packets[0].vv.First()) c.packets = c.packets[:0] c.mu.Unlock() contents = contents[header.EthernetMinimumSize:] if !reflect.DeepEqual(contents, rcvd) { t.Fatalf("Unexpected buffer contents: got %x, want %x", rcvd, contents) } // Check that buffers have been reposted. for i := range bufs { bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffers to be reposted")) if !reflect.DeepEqual(bi, bufs[i]) { t.Fatalf("Unexpected buffer reposted: got %x, want %x", bi, bufs[i]) } } c.rxq.tx.Flush() } } // TestRxBuffersReposted tests that rx buffers get reposted after they have been // completed. func TestRxBuffersReposted(t *testing.T) { const bufferSize = 1500 c := newTestContext(t, 20000, bufferSize, localLinkAddr) defer c.cleanup() // Receive all posted buffers. limit := c.ep.rx.q.PostedBuffersLimit() buffers := make([]queue.RxBuffer, 0, limit) timeout := time.After(2 * time.Second) for i := limit; i > 0; i-- { buffers = append(buffers, queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers"))) } c.rxq.tx.Flush() // Check that all buffers are reposted when individually completed. timeout = time.After(2 * time.Second) for i := range buffers { // Complete the buffer. c.pushRxCompletion(buffers[i].Size, buffers[i:][:1]) c.rxq.rx.Flush() syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) // Wait for it to be reposted. bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted")) if !reflect.DeepEqual(bi, buffers[i]) { t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[i]) } } c.rxq.tx.Flush() // Check that all buffers are reposted when completed in pairs. timeout = time.After(2 * time.Second) for i := 0; i < len(buffers)/2; i++ { // Complete with two buffers. c.pushRxCompletion(2*bufferSize, buffers[2*i:][:2]) c.rxq.rx.Flush() syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) // Wait for them to be reposted. bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted")) if !reflect.DeepEqual(bi, buffers[2*i]) { t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[2*i]) } bi = queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted")) if !reflect.DeepEqual(bi, buffers[2*i+1]) { t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[2*i+1]) } } c.rxq.tx.Flush() } // TestReceivePostingIsFull checks that the endpoint will properly handle the // case when a received buffer cannot be immediately reposted because it hasn't // been pulled from the tx pipe yet. func TestReceivePostingIsFull(t *testing.T) { const bufferSize = 1500 c := newTestContext(t, 20000, bufferSize, localLinkAddr) defer c.cleanup() // Complete first posted buffer before flushing it from the tx pipe. first := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for first buffer to be posted")) c.pushRxCompletion(first.Size, []queue.RxBuffer{first}) c.rxq.rx.Flush() syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) // Check that packet is received. c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet") // Complete another buffer. second := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for second buffer to be posted")) c.pushRxCompletion(second.Size, []queue.RxBuffer{second}) c.rxq.rx.Flush() syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) // Check that no packet is received yet, as the worker is blocked trying // to repost. select { case <-time.After(500 * time.Millisecond): case <-c.packetCh: t.Fatalf("Unexpected packet received") } // Flush tx queue, which will allow the first buffer to be reposted, // and the second completion to be pulled. c.rxq.tx.Flush() syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) // Check that second packet completes. c.waitForPackets(1, time.After(time.Second), "Timeout waiting for second completed packet") } // TestCloseWhileWaitingToPost closes the endpoint while it is waiting to // repost a buffer. Make sure it backs out. func TestCloseWhileWaitingToPost(t *testing.T) { const bufferSize = 1500 c := newTestContext(t, 20000, bufferSize, localLinkAddr) cleaned := false defer func() { if !cleaned { c.cleanup() } }() // Complete first posted buffer before flushing it from the tx pipe. bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for initial buffer to be posted")) c.pushRxCompletion(bi.Size, []queue.RxBuffer{bi}) c.rxq.rx.Flush() syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) // Wait for packet to be indicated. c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet") // Cleanup and wait for worker to complete. c.cleanup() cleaned = true c.ep.Wait() }