diff options
Diffstat (limited to 'pkg/tcpip/link/sharedmem/server_tx.go')
-rw-r--r-- | pkg/tcpip/link/sharedmem/server_tx.go | 179 |
1 files changed, 179 insertions, 0 deletions
diff --git a/pkg/tcpip/link/sharedmem/server_tx.go b/pkg/tcpip/link/sharedmem/server_tx.go new file mode 100644 index 000000000..9370b2a46 --- /dev/null +++ b/pkg/tcpip/link/sharedmem/server_tx.go @@ -0,0 +1,179 @@ +// Copyright 2021 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux +// +build linux + +package sharedmem + +import ( + "golang.org/x/sys/unix" + "gvisor.dev/gvisor/pkg/cleanup" + "gvisor.dev/gvisor/pkg/tcpip/buffer" + "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/pipe" + "gvisor.dev/gvisor/pkg/tcpip/link/sharedmem/queue" +) + +// serverTx represents the server end of the sharedmem queue and is used to send +// packets to the peer in the buffers posted by the peer in the fillPipe. +type serverTx struct { + // fillPipe represents the receive end of the pipe that carries the RxBuffers + // posted by the peer. + fillPipe pipe.Rx + + // completionPipe represents the transmit end of the pipe that carries the + // descriptors for filled RxBuffers. + completionPipe pipe.Tx + + // data represents the buffer area where the packet payload is held. + data []byte + + // eventFD is used to notify the peer when fill requests are fulfilled. + eventFD int + + // sharedData the memory region to use to enable/disable notifications. + sharedData []byte +} + +// init initializes all tstate needed by the serverTx queue based on the +// information provided. +// +// The caller always retains ownership of all file descriptors passed in. The +// queue implementation will duplicate any that it may need in the future. +func (s *serverTx) init(c *QueueConfig) error { + // Map in all buffers. + fillPipeMem, err := getBuffer(c.TxPipeFD) + if err != nil { + return err + } + cu := cleanup.Make(func() { unix.Munmap(fillPipeMem) }) + defer cu.Clean() + + completionPipeMem, err := getBuffer(c.RxPipeFD) + if err != nil { + return err + } + cu.Add(func() { unix.Munmap(completionPipeMem) }) + + data, err := getBuffer(c.DataFD) + if err != nil { + return err + } + cu.Add(func() { unix.Munmap(data) }) + + sharedData, err := getBuffer(c.SharedDataFD) + if err != nil { + return err + } + cu.Add(func() { unix.Munmap(sharedData) }) + + // Duplicate the eventFD so that caller can close it but we can still + // use it. + efd, err := unix.Dup(c.EventFD) + if err != nil { + return err + } + cu.Add(func() { unix.Close(efd) }) + + // Set the eventfd as non-blocking. + if err := unix.SetNonblock(efd, true); err != nil { + return err + } + + cu.Release() + + s.fillPipe.Init(fillPipeMem) + s.completionPipe.Init(completionPipeMem) + s.data = data + s.eventFD = efd + s.sharedData = sharedData + + return nil +} + +func (s *serverTx) cleanup() { + unix.Munmap(s.fillPipe.Bytes()) + unix.Munmap(s.completionPipe.Bytes()) + unix.Munmap(s.data) + unix.Munmap(s.sharedData) + unix.Close(s.eventFD) +} + +// fillPacket copies the data in the provided views into buffers pulled from the +// fillPipe and returns a slice of RxBuffers that contain the copied data as +// well as the total number of bytes copied. +// +// To avoid allocations the filledBuffers are appended to the buffers slice +// which will be grown as required. +func (s *serverTx) fillPacket(views []buffer.View, buffers []queue.RxBuffer) (filledBuffers []queue.RxBuffer, totalCopied uint32) { + filledBuffers = buffers[:0] + // fillBuffer copies as much of the views as possible into the provided buffer + // and returns any left over views (if any). + fillBuffer := func(buffer *queue.RxBuffer, views []buffer.View) (left []buffer.View) { + if len(views) == 0 { + return nil + } + availBytes := buffer.Size + copied := uint64(0) + for availBytes > 0 && len(views) > 0 { + n := copy(s.data[buffer.Offset+copied:][:uint64(buffer.Size)-copied], views[0]) + views[0].TrimFront(n) + if !views[0].IsEmpty() { + break + } + views = views[1:] + copied += uint64(n) + availBytes -= uint32(n) + } + buffer.Size = uint32(copied) + return views + } + + for len(views) > 0 { + var b []byte + // Spin till we get a free buffer reposted by the peer. + for { + if b = s.fillPipe.Pull(); b != nil { + break + } + } + rxBuffer := queue.DecodeRxBufferHeader(b) + // Copy the packet into the posted buffer. + views = fillBuffer(&rxBuffer, views) + totalCopied += rxBuffer.Size + filledBuffers = append(filledBuffers, rxBuffer) + } + + return filledBuffers, totalCopied +} + +func (s *serverTx) transmit(views []buffer.View) bool { + buffers := make([]queue.RxBuffer, 8) + buffers, totalCopied := s.fillPacket(views, buffers) + b := s.completionPipe.Push(queue.RxCompletionSize(len(buffers))) + if b == nil { + return false + } + queue.EncodeRxCompletion(b, totalCopied, 0 /* reserved */) + for i := 0; i < len(buffers); i++ { + queue.EncodeRxCompletionBuffer(b, i, buffers[i]) + } + s.completionPipe.Flush() + s.fillPipe.Flush() + return true +} + +func (s *serverTx) notify() { + unix.Write(s.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) +} |