diff options
-rw-r--r-- | pkg/tcpip/link/fdbased/mmap_amd64_unsafe.go | 38 |
1 files changed, 30 insertions, 8 deletions
diff --git a/pkg/tcpip/link/fdbased/mmap_amd64_unsafe.go b/pkg/tcpip/link/fdbased/mmap_amd64_unsafe.go index e49cf9f61..e5ac7996d 100644 --- a/pkg/tcpip/link/fdbased/mmap_amd64_unsafe.go +++ b/pkg/tcpip/link/fdbased/mmap_amd64_unsafe.go @@ -19,6 +19,7 @@ package fdbased import ( "encoding/binary" "fmt" + "sync/atomic" "syscall" "unsafe" @@ -42,7 +43,12 @@ const ( // // Memory allocated for the ring buffer: tpBlockSize * tpBlockNR = 2 MiB // -// NOTE: Frames need to be aligned at 16 byte boundaries. +// NOTE: +// Frames need to be aligned at 16 byte boundaries. +// BlockSize needs to be page aligned. +// +// For details see PACKET_MMAP setting constraints in +// https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt const ( tpFrameSize = 65536 + 128 tpBlockSize = tpFrameSize * 32 @@ -81,12 +87,22 @@ const ( tpUSecOffset = 24 ) +// tpStatus returns the frame status field. +// The status is concurrently updated by the kernel as a result we must +// use atomic operations to prevent races. func (t tPacketHdr) tpStatus() uint32 { - return binary.LittleEndian.Uint32(t[tpStatusOffset:]) + hdr := unsafe.Pointer(&t[0]) + statusPtr := unsafe.Pointer(uintptr(hdr) + uintptr(tpStatusOffset)) + return atomic.LoadUint32((*uint32)(statusPtr)) } +// setTPStatus set's the frame status to the provided status. +// The status is concurrently updated by the kernel as a result we must +// use atomic operations to prevent races. func (t tPacketHdr) setTPStatus(status uint32) { - binary.LittleEndian.PutUint32(t[tpStatusOffset:], status) + hdr := unsafe.Pointer(&t[0]) + statusPtr := unsafe.Pointer(uintptr(hdr) + uintptr(tpStatusOffset)) + atomic.StoreUint32((*uint32)(statusPtr), status) } func (t tPacketHdr) tpLen() uint32 { @@ -118,6 +134,10 @@ func (t tPacketHdr) Payload() []byte { } func (e *endpoint) setupPacketRXRing() error { + pageSize := unix.Getpagesize() + if tpBlockSize%pageSize != 0 { + return fmt.Errorf("tpBlockSize: %d is not page aligned, pagesize: %d", tpBlockSize, pageSize) + } tReq := tPacketReq{ tpBlockSize: uint32(tpBlockSize), tpBlockNR: uint32(tpBlockNR), @@ -139,8 +159,8 @@ func (e *endpoint) setupPacketRXRing() error { } func (e *endpoint) readMMappedPacket() ([]byte, *tcpip.Error) { - hdr := (tPacketHdr)(e.ringBuffer[0+e.ringOffset*tpFrameSize:]) - for (hdr.tpStatus() & tpStatusUser) == 0 { + hdr := (tPacketHdr)(e.ringBuffer[e.ringOffset*tpFrameSize:]) + for hdr.tpStatus()&tpStatusUser == 0 { event := rawfile.PollEvent{ FD: int32(e.fd), Events: unix.POLLIN | unix.POLLERR, @@ -153,9 +173,11 @@ func (e *endpoint) readMMappedPacket() ([]byte, *tcpip.Error) { return nil, rawfile.TranslateErrno(errno) } if hdr.tpStatus()&tpStatusCopy != 0 { - continue - } - if hdr.tpStatus()&tpStatusLosing != 0 { + // This frame is truncated so skip it after flipping the + // buffer to the kernel. + hdr.setTPStatus(tpStatusKernel) + e.ringOffset = (e.ringOffset + 1) % tpFrameNR + hdr = (tPacketHdr)(e.ringBuffer[e.ringOffset*tpFrameSize:]) continue } } |