diff options
Diffstat (limited to 'pkg/tcpip/link')
35 files changed, 16 insertions, 4696 deletions
diff --git a/pkg/tcpip/link/channel/BUILD b/pkg/tcpip/link/channel/BUILD deleted file mode 100644 index ae285e495..000000000 --- a/pkg/tcpip/link/channel/BUILD +++ /dev/null @@ -1,15 +0,0 @@ -load("//tools/go_stateify:defs.bzl", "go_library") - -package(licenses = ["notice"]) - -go_library( - name = "channel", - srcs = ["channel.go"], - importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/channel", - visibility = ["//:sandbox"], - deps = [ - "//pkg/tcpip", - "//pkg/tcpip/buffer", - "//pkg/tcpip/stack", - ], -) diff --git a/pkg/tcpip/link/channel/channel.go b/pkg/tcpip/link/channel/channel.go deleted file mode 100644 index ee9dd8700..000000000 --- a/pkg/tcpip/link/channel/channel.go +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package channel provides the implemention of channel-based data-link layer -// endpoints. Such endpoints allow injection of inbound packets and store -// outbound packets in a channel. -package channel - -import ( - "gvisor.googlesource.com/gvisor/pkg/tcpip" - "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" - "gvisor.googlesource.com/gvisor/pkg/tcpip/stack" -) - -// PacketInfo holds all the information about an outbound packet. -type PacketInfo struct { - Header buffer.View - Payload buffer.View - Proto tcpip.NetworkProtocolNumber - GSO *stack.GSO -} - -// Endpoint is link layer endpoint that stores outbound packets in a channel -// and allows injection of inbound packets. -type Endpoint struct { - dispatcher stack.NetworkDispatcher - mtu uint32 - linkAddr tcpip.LinkAddress - GSO bool - - // C is where outbound packets are queued. - C chan PacketInfo -} - -// New creates a new channel endpoint. -func New(size int, mtu uint32, linkAddr tcpip.LinkAddress) (tcpip.LinkEndpointID, *Endpoint) { - e := &Endpoint{ - C: make(chan PacketInfo, size), - mtu: mtu, - linkAddr: linkAddr, - } - - return stack.RegisterLinkEndpoint(e), e -} - -// Drain removes all outbound packets from the channel and counts them. -func (e *Endpoint) Drain() int { - c := 0 - for { - select { - case <-e.C: - c++ - default: - return c - } - } -} - -// Inject injects an inbound packet. -func (e *Endpoint) Inject(protocol tcpip.NetworkProtocolNumber, vv buffer.VectorisedView) { - e.InjectLinkAddr(protocol, "", vv) -} - -// InjectLinkAddr injects an inbound packet with a remote link address. -func (e *Endpoint) InjectLinkAddr(protocol tcpip.NetworkProtocolNumber, remote tcpip.LinkAddress, vv buffer.VectorisedView) { - e.dispatcher.DeliverNetworkPacket(e, remote, "" /* local */, protocol, vv.Clone(nil)) -} - -// Attach saves the stack network-layer dispatcher for use later when packets -// are injected. -func (e *Endpoint) Attach(dispatcher stack.NetworkDispatcher) { - e.dispatcher = dispatcher -} - -// IsAttached implements stack.LinkEndpoint.IsAttached. -func (e *Endpoint) IsAttached() bool { - return e.dispatcher != nil -} - -// MTU implements stack.LinkEndpoint.MTU. It returns the value initialized -// during construction. -func (e *Endpoint) MTU() uint32 { - return e.mtu -} - -// Capabilities implements stack.LinkEndpoint.Capabilities. -func (e *Endpoint) Capabilities() stack.LinkEndpointCapabilities { - caps := stack.LinkEndpointCapabilities(0) - if e.GSO { - caps |= stack.CapabilityGSO - } - return caps -} - -// GSOMaxSize returns the maximum GSO packet size. -func (*Endpoint) GSOMaxSize() uint32 { - return 1 << 15 -} - -// MaxHeaderLength returns the maximum size of the link layer header. Given it -// doesn't have a header, it just returns 0. -func (*Endpoint) MaxHeaderLength() uint16 { - return 0 -} - -// LinkAddress returns the link address of this endpoint. -func (e *Endpoint) LinkAddress() tcpip.LinkAddress { - return e.linkAddr -} - -// WritePacket stores outbound packets into the channel. -func (e *Endpoint) WritePacket(_ *stack.Route, gso *stack.GSO, hdr buffer.Prependable, payload buffer.VectorisedView, protocol tcpip.NetworkProtocolNumber) *tcpip.Error { - p := PacketInfo{ - Header: hdr.View(), - Proto: protocol, - Payload: payload.ToView(), - GSO: gso, - } - - select { - case e.C <- p: - default: - } - - return nil -} diff --git a/pkg/tcpip/link/fdbased/BUILD b/pkg/tcpip/link/fdbased/BUILD deleted file mode 100644 index cef98c353..000000000 --- a/pkg/tcpip/link/fdbased/BUILD +++ /dev/null @@ -1,41 +0,0 @@ -load("//tools/go_stateify:defs.bzl", "go_library", "go_test") - -package(licenses = ["notice"]) - -go_library( - name = "fdbased", - srcs = [ - "endpoint.go", - "endpoint_unsafe.go", - "mmap.go", - "mmap_amd64.go", - "mmap_amd64_unsafe.go", - "packet_dispatchers.go", - ], - importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/fdbased", - visibility = [ - "//visibility:public", - ], - deps = [ - "//pkg/tcpip", - "//pkg/tcpip/buffer", - "//pkg/tcpip/header", - "//pkg/tcpip/link/rawfile", - "//pkg/tcpip/stack", - "@org_golang_x_sys//unix:go_default_library", - ], -) - -go_test( - name = "fdbased_test", - size = "small", - srcs = ["endpoint_test.go"], - embed = [":fdbased"], - deps = [ - "//pkg/tcpip", - "//pkg/tcpip/buffer", - "//pkg/tcpip/header", - "//pkg/tcpip/link/rawfile", - "//pkg/tcpip/stack", - ], -) diff --git a/pkg/tcpip/link/fdbased/endpoint_test.go b/pkg/tcpip/link/fdbased/endpoint_test.go deleted file mode 100644 index fd1722074..000000000 --- a/pkg/tcpip/link/fdbased/endpoint_test.go +++ /dev/null @@ -1,455 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// +build linux - -package fdbased - -import ( - "bytes" - "fmt" - "math/rand" - "reflect" - "syscall" - "testing" - "time" - "unsafe" - - "gvisor.googlesource.com/gvisor/pkg/tcpip" - "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" - "gvisor.googlesource.com/gvisor/pkg/tcpip/header" - "gvisor.googlesource.com/gvisor/pkg/tcpip/link/rawfile" - "gvisor.googlesource.com/gvisor/pkg/tcpip/stack" -) - -const ( - mtu = 1500 - laddr = tcpip.LinkAddress("\x11\x22\x33\x44\x55\x66") - raddr = tcpip.LinkAddress("\x77\x88\x99\xaa\xbb\xcc") - proto = 10 - csumOffset = 48 - gsoMSS = 500 -) - -type packetInfo struct { - raddr tcpip.LinkAddress - proto tcpip.NetworkProtocolNumber - contents buffer.View -} - -type context struct { - t *testing.T - fds [2]int - ep stack.LinkEndpoint - ch chan packetInfo - done chan struct{} -} - -func newContext(t *testing.T, opt *Options) *context { - fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0) - if err != nil { - t.Fatalf("Socketpair failed: %v", err) - } - - done := make(chan struct{}, 1) - opt.ClosedFunc = func(*tcpip.Error) { - done <- struct{}{} - } - - opt.FD = fds[1] - epID, err := New(opt) - if err != nil { - t.Fatalf("Failed to create FD endpoint: %v", err) - } - ep := stack.FindLinkEndpoint(epID).(*endpoint) - - c := &context{ - t: t, - fds: fds, - ep: ep, - ch: make(chan packetInfo, 100), - done: done, - } - - ep.Attach(c) - - return c -} - -func (c *context) cleanup() { - syscall.Close(c.fds[0]) - <-c.done - syscall.Close(c.fds[1]) -} - -func (c *context) DeliverNetworkPacket(linkEP stack.LinkEndpoint, remote tcpip.LinkAddress, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, vv buffer.VectorisedView) { - c.ch <- packetInfo{remote, protocol, vv.ToView()} -} - -func TestNoEthernetProperties(t *testing.T) { - c := newContext(t, &Options{MTU: mtu}) - defer c.cleanup() - - if want, v := uint16(0), c.ep.MaxHeaderLength(); want != v { - t.Fatalf("MaxHeaderLength() = %v, want %v", v, want) - } - - if want, v := uint32(mtu), c.ep.MTU(); want != v { - t.Fatalf("MTU() = %v, want %v", v, want) - } -} - -func TestEthernetProperties(t *testing.T) { - c := newContext(t, &Options{EthernetHeader: true, MTU: mtu}) - defer c.cleanup() - - if want, v := uint16(header.EthernetMinimumSize), c.ep.MaxHeaderLength(); want != v { - t.Fatalf("MaxHeaderLength() = %v, want %v", v, want) - } - - if want, v := uint32(mtu), c.ep.MTU(); want != v { - t.Fatalf("MTU() = %v, want %v", v, want) - } -} - -func TestAddress(t *testing.T) { - addrs := []tcpip.LinkAddress{"", "abc", "def"} - for _, a := range addrs { - t.Run(fmt.Sprintf("Address: %q", a), func(t *testing.T) { - c := newContext(t, &Options{Address: a, MTU: mtu}) - defer c.cleanup() - - if want, v := a, c.ep.LinkAddress(); want != v { - t.Fatalf("LinkAddress() = %v, want %v", v, want) - } - }) - } -} - -func testWritePacket(t *testing.T, plen int, eth bool, gsoMaxSize uint32) { - c := newContext(t, &Options{Address: laddr, MTU: mtu, EthernetHeader: eth, GSOMaxSize: gsoMaxSize}) - defer c.cleanup() - - r := &stack.Route{ - RemoteLinkAddress: raddr, - } - - // Build header. - hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength()) + 100) - b := hdr.Prepend(100) - for i := range b { - b[i] = uint8(rand.Intn(256)) - } - - // Build payload and write. - payload := make(buffer.View, plen) - for i := range payload { - payload[i] = uint8(rand.Intn(256)) - } - want := append(hdr.View(), payload...) - var gso *stack.GSO - if gsoMaxSize != 0 { - gso = &stack.GSO{ - Type: stack.GSOTCPv6, - NeedsCsum: true, - CsumOffset: csumOffset, - MSS: gsoMSS, - MaxSize: gsoMaxSize, - L3HdrLen: header.IPv4MaximumHeaderSize, - } - } - if err := c.ep.WritePacket(r, gso, hdr, payload.ToVectorisedView(), proto); err != nil { - t.Fatalf("WritePacket failed: %v", err) - } - - // Read from fd, then compare with what we wrote. - b = make([]byte, mtu) - n, err := syscall.Read(c.fds[0], b) - if err != nil { - t.Fatalf("Read failed: %v", err) - } - b = b[:n] - if gsoMaxSize != 0 { - vnetHdr := *(*virtioNetHdr)(unsafe.Pointer(&b[0])) - if vnetHdr.flags&_VIRTIO_NET_HDR_F_NEEDS_CSUM == 0 { - t.Fatalf("virtioNetHdr.flags %v doesn't contain %v", vnetHdr.flags, _VIRTIO_NET_HDR_F_NEEDS_CSUM) - } - csumStart := header.EthernetMinimumSize + gso.L3HdrLen - if vnetHdr.csumStart != csumStart { - t.Fatalf("vnetHdr.csumStart = %v, want %v", vnetHdr.csumStart, csumStart) - } - if vnetHdr.csumOffset != csumOffset { - t.Fatalf("vnetHdr.csumOffset = %v, want %v", vnetHdr.csumOffset, csumOffset) - } - gsoType := uint8(0) - if int(gso.MSS) < plen { - gsoType = _VIRTIO_NET_HDR_GSO_TCPV6 - } - if vnetHdr.gsoType != gsoType { - t.Fatalf("vnetHdr.gsoType = %v, want %v", vnetHdr.gsoType, gsoType) - } - b = b[virtioNetHdrSize:] - } - if eth { - h := header.Ethernet(b) - b = b[header.EthernetMinimumSize:] - - if a := h.SourceAddress(); a != laddr { - t.Fatalf("SourceAddress() = %v, want %v", a, laddr) - } - - if a := h.DestinationAddress(); a != raddr { - t.Fatalf("DestinationAddress() = %v, want %v", a, raddr) - } - - if et := h.Type(); et != proto { - t.Fatalf("Type() = %v, want %v", et, proto) - } - } - if len(b) != len(want) { - t.Fatalf("Read returned %v bytes, want %v", len(b), len(want)) - } - if !bytes.Equal(b, want) { - t.Fatalf("Read returned %x, want %x", b, want) - } -} - -func TestWritePacket(t *testing.T) { - lengths := []int{0, 100, 1000} - eths := []bool{true, false} - gsos := []uint32{0, 32768} - - for _, eth := range eths { - for _, plen := range lengths { - for _, gso := range gsos { - t.Run( - fmt.Sprintf("Eth=%v,PayloadLen=%v,GSOMaxSize=%v", eth, plen, gso), - func(t *testing.T) { - testWritePacket(t, plen, eth, gso) - }, - ) - } - } - } -} - -func TestPreserveSrcAddress(t *testing.T) { - baddr := tcpip.LinkAddress("\xcc\xbb\xaa\x77\x88\x99") - - c := newContext(t, &Options{Address: laddr, MTU: mtu, EthernetHeader: true}) - defer c.cleanup() - - // Set LocalLinkAddress in route to the value of the bridged address. - r := &stack.Route{ - RemoteLinkAddress: raddr, - LocalLinkAddress: baddr, - } - - // WritePacket panics given a prependable with anything less than - // the minimum size of the ethernet header. - hdr := buffer.NewPrependable(header.EthernetMinimumSize) - if err := c.ep.WritePacket(r, nil /* gso */, hdr, buffer.VectorisedView{}, proto); err != nil { - t.Fatalf("WritePacket failed: %v", err) - } - - // Read from the FD, then compare with what we wrote. - b := make([]byte, mtu) - n, err := syscall.Read(c.fds[0], b) - if err != nil { - t.Fatalf("Read failed: %v", err) - } - b = b[:n] - h := header.Ethernet(b) - - if a := h.SourceAddress(); a != baddr { - t.Fatalf("SourceAddress() = %v, want %v", a, baddr) - } -} - -func TestDeliverPacket(t *testing.T) { - lengths := []int{100, 1000} - eths := []bool{true, false} - - for _, eth := range eths { - for _, plen := range lengths { - t.Run(fmt.Sprintf("Eth=%v,PayloadLen=%v", eth, plen), func(t *testing.T) { - c := newContext(t, &Options{Address: laddr, MTU: mtu, EthernetHeader: eth}) - defer c.cleanup() - - // Build packet. - b := make([]byte, plen) - all := b - for i := range b { - b[i] = uint8(rand.Intn(256)) - } - - if !eth { - // So that it looks like an IPv4 packet. - b[0] = 0x40 - } else { - hdr := make(header.Ethernet, header.EthernetMinimumSize) - hdr.Encode(&header.EthernetFields{ - SrcAddr: raddr, - DstAddr: laddr, - Type: proto, - }) - all = append(hdr, b...) - } - - // Write packet via the file descriptor. - if _, err := syscall.Write(c.fds[0], all); err != nil { - t.Fatalf("Write failed: %v", err) - } - - // Receive packet through the endpoint. - select { - case pi := <-c.ch: - want := packetInfo{ - raddr: raddr, - proto: proto, - contents: b, - } - if !eth { - want.proto = header.IPv4ProtocolNumber - want.raddr = "" - } - if !reflect.DeepEqual(want, pi) { - t.Fatalf("Unexpected received packet: %+v, want %+v", pi, want) - } - case <-time.After(10 * time.Second): - t.Fatalf("Timed out waiting for packet") - } - }) - } - } -} - -func TestBufConfigMaxLength(t *testing.T) { - got := 0 - for _, i := range BufConfig { - got += i - } - want := header.MaxIPPacketSize // maximum TCP packet size - if got < want { - t.Errorf("total buffer size is invalid: got %d, want >= %d", got, want) - } -} - -func TestBufConfigFirst(t *testing.T) { - // The stack assumes that the TCP/IP header is enterily contained in the first view. - // Therefore, the first view needs to be large enough to contain the maximum TCP/IP - // header, which is 120 bytes (60 bytes for IP + 60 bytes for TCP). - want := 120 - got := BufConfig[0] - if got < want { - t.Errorf("first view has an invalid size: got %d, want >= %d", got, want) - } -} - -var capLengthTestCases = []struct { - comment string - config []int - n int - wantUsed int - wantLengths []int -}{ - { - comment: "Single slice", - config: []int{2}, - n: 1, - wantUsed: 1, - wantLengths: []int{1}, - }, - { - comment: "Multiple slices", - config: []int{1, 2}, - n: 2, - wantUsed: 2, - wantLengths: []int{1, 1}, - }, - { - comment: "Entire buffer", - config: []int{1, 2}, - n: 3, - wantUsed: 2, - wantLengths: []int{1, 2}, - }, - { - comment: "Entire buffer but not on the last slice", - config: []int{1, 2, 3}, - n: 3, - wantUsed: 2, - wantLengths: []int{1, 2, 3}, - }, -} - -func TestReadVDispatcherCapLength(t *testing.T) { - for _, c := range capLengthTestCases { - // fd does not matter for this test. - d := readVDispatcher{fd: -1, e: &endpoint{}} - d.views = make([]buffer.View, len(c.config)) - d.iovecs = make([]syscall.Iovec, len(c.config)) - d.allocateViews(c.config) - - used := d.capViews(c.n, c.config) - if used != c.wantUsed { - t.Errorf("Test %q failed when calling capViews(%d, %v). Got %d. Want %d", c.comment, c.n, c.config, used, c.wantUsed) - } - lengths := make([]int, len(d.views)) - for i, v := range d.views { - lengths[i] = len(v) - } - if !reflect.DeepEqual(lengths, c.wantLengths) { - t.Errorf("Test %q failed when calling capViews(%d, %v). Got %v. Want %v", c.comment, c.n, c.config, lengths, c.wantLengths) - } - } -} - -func TestRecvMMsgDispatcherCapLength(t *testing.T) { - for _, c := range capLengthTestCases { - d := recvMMsgDispatcher{ - fd: -1, // fd does not matter for this test. - e: &endpoint{}, - views: make([][]buffer.View, 1), - iovecs: make([][]syscall.Iovec, 1), - msgHdrs: make([]rawfile.MMsgHdr, 1), - } - - for i, _ := range d.views { - d.views[i] = make([]buffer.View, len(c.config)) - } - for i := range d.iovecs { - d.iovecs[i] = make([]syscall.Iovec, len(c.config)) - } - for k, msgHdr := range d.msgHdrs { - msgHdr.Msg.Iov = &d.iovecs[k][0] - msgHdr.Msg.Iovlen = uint64(len(c.config)) - } - - d.allocateViews(c.config) - - used := d.capViews(0, c.n, c.config) - if used != c.wantUsed { - t.Errorf("Test %q failed when calling capViews(%d, %v). Got %d. Want %d", c.comment, c.n, c.config, used, c.wantUsed) - } - lengths := make([]int, len(d.views[0])) - for i, v := range d.views[0] { - lengths[i] = len(v) - } - if !reflect.DeepEqual(lengths, c.wantLengths) { - t.Errorf("Test %q failed when calling capViews(%d, %v). Got %v. Want %v", c.comment, c.n, c.config, lengths, c.wantLengths) - } - - } -} diff --git a/pkg/tcpip/link/fdbased/fdbased_state_autogen.go b/pkg/tcpip/link/fdbased/fdbased_state_autogen.go new file mode 100755 index 000000000..0555db528 --- /dev/null +++ b/pkg/tcpip/link/fdbased/fdbased_state_autogen.go @@ -0,0 +1,4 @@ +// automatically generated by stateify. + +package fdbased + diff --git a/pkg/tcpip/link/loopback/BUILD b/pkg/tcpip/link/loopback/BUILD deleted file mode 100644 index 710a05ede..000000000 --- a/pkg/tcpip/link/loopback/BUILD +++ /dev/null @@ -1,15 +0,0 @@ -load("//tools/go_stateify:defs.bzl", "go_library") - -package(licenses = ["notice"]) - -go_library( - name = "loopback", - srcs = ["loopback.go"], - importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/loopback", - visibility = ["//:sandbox"], - deps = [ - "//pkg/tcpip", - "//pkg/tcpip/buffer", - "//pkg/tcpip/stack", - ], -) diff --git a/pkg/tcpip/link/loopback/loopback_state_autogen.go b/pkg/tcpip/link/loopback/loopback_state_autogen.go new file mode 100755 index 000000000..87ec8cfc7 --- /dev/null +++ b/pkg/tcpip/link/loopback/loopback_state_autogen.go @@ -0,0 +1,4 @@ +// automatically generated by stateify. + +package loopback + diff --git a/pkg/tcpip/link/muxed/BUILD b/pkg/tcpip/link/muxed/BUILD deleted file mode 100644 index 84cfae784..000000000 --- a/pkg/tcpip/link/muxed/BUILD +++ /dev/null @@ -1,33 +0,0 @@ -load("//tools/go_stateify:defs.bzl", "go_library", "go_test") - -package( - licenses = ["notice"], # Apache 2.0 -) - -go_library( - name = "muxed", - srcs = ["injectable.go"], - importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/muxed", - visibility = [ - "//visibility:public", - ], - deps = [ - "//pkg/tcpip", - "//pkg/tcpip/buffer", - "//pkg/tcpip/stack", - ], -) - -go_test( - name = "muxed_test", - size = "small", - srcs = ["injectable_test.go"], - embed = [":muxed"], - deps = [ - "//pkg/tcpip", - "//pkg/tcpip/buffer", - "//pkg/tcpip/link/fdbased", - "//pkg/tcpip/network/ipv4", - "//pkg/tcpip/stack", - ], -) diff --git a/pkg/tcpip/link/muxed/injectable.go b/pkg/tcpip/link/muxed/injectable.go deleted file mode 100644 index be07b7c29..000000000 --- a/pkg/tcpip/link/muxed/injectable.go +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright 2019 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package muxed provides a muxed link endpoints. -package muxed - -import ( - "gvisor.googlesource.com/gvisor/pkg/tcpip" - "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" - "gvisor.googlesource.com/gvisor/pkg/tcpip/stack" -) - -// InjectableEndpoint is an injectable multi endpoint. The endpoint has -// trivial routing rules that determine which InjectableEndpoint a given packet -// will be written to. Note that HandleLocal works differently for this -// endpoint (see WritePacket). -type InjectableEndpoint struct { - routes map[tcpip.Address]stack.InjectableLinkEndpoint - dispatcher stack.NetworkDispatcher -} - -// MTU implements stack.LinkEndpoint. -func (m *InjectableEndpoint) MTU() uint32 { - minMTU := ^uint32(0) - for _, endpoint := range m.routes { - if endpointMTU := endpoint.MTU(); endpointMTU < minMTU { - minMTU = endpointMTU - } - } - return minMTU -} - -// Capabilities implements stack.LinkEndpoint. -func (m *InjectableEndpoint) Capabilities() stack.LinkEndpointCapabilities { - minCapabilities := stack.LinkEndpointCapabilities(^uint(0)) - for _, endpoint := range m.routes { - minCapabilities &= endpoint.Capabilities() - } - return minCapabilities -} - -// MaxHeaderLength implements stack.LinkEndpoint. -func (m *InjectableEndpoint) MaxHeaderLength() uint16 { - minHeaderLen := ^uint16(0) - for _, endpoint := range m.routes { - if headerLen := endpoint.MaxHeaderLength(); headerLen < minHeaderLen { - minHeaderLen = headerLen - } - } - return minHeaderLen -} - -// LinkAddress implements stack.LinkEndpoint. -func (m *InjectableEndpoint) LinkAddress() tcpip.LinkAddress { - return "" -} - -// Attach implements stack.LinkEndpoint. -func (m *InjectableEndpoint) Attach(dispatcher stack.NetworkDispatcher) { - for _, endpoint := range m.routes { - endpoint.Attach(dispatcher) - } - m.dispatcher = dispatcher -} - -// IsAttached implements stack.LinkEndpoint. -func (m *InjectableEndpoint) IsAttached() bool { - return m.dispatcher != nil -} - -// Inject implements stack.InjectableLinkEndpoint. -func (m *InjectableEndpoint) Inject(protocol tcpip.NetworkProtocolNumber, vv buffer.VectorisedView) { - m.dispatcher.DeliverNetworkPacket(m, "" /* remote */, "" /* local */, protocol, vv) -} - -// WritePacket writes outbound packets to the appropriate LinkInjectableEndpoint -// based on the RemoteAddress. HandleLocal only works if r.RemoteAddress has a -// route registered in this endpoint. -func (m *InjectableEndpoint) WritePacket(r *stack.Route, _ *stack.GSO, hdr buffer.Prependable, payload buffer.VectorisedView, protocol tcpip.NetworkProtocolNumber) *tcpip.Error { - if endpoint, ok := m.routes[r.RemoteAddress]; ok { - return endpoint.WritePacket(r, nil /* gso */, hdr, payload, protocol) - } - return tcpip.ErrNoRoute -} - -// WriteRawPacket writes outbound packets to the appropriate -// LinkInjectableEndpoint based on the dest address. -func (m *InjectableEndpoint) WriteRawPacket(dest tcpip.Address, packet []byte) *tcpip.Error { - endpoint, ok := m.routes[dest] - if !ok { - return tcpip.ErrNoRoute - } - return endpoint.WriteRawPacket(dest, packet) -} - -// NewInjectableEndpoint creates a new multi-endpoint injectable endpoint. -func NewInjectableEndpoint(routes map[tcpip.Address]stack.InjectableLinkEndpoint) (tcpip.LinkEndpointID, *InjectableEndpoint) { - e := &InjectableEndpoint{ - routes: routes, - } - return stack.RegisterLinkEndpoint(e), e -} diff --git a/pkg/tcpip/link/muxed/injectable_test.go b/pkg/tcpip/link/muxed/injectable_test.go deleted file mode 100644 index 5d40dfacc..000000000 --- a/pkg/tcpip/link/muxed/injectable_test.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2019 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package muxed - -import ( - "bytes" - "net" - "os" - "syscall" - "testing" - - "gvisor.googlesource.com/gvisor/pkg/tcpip" - "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" - "gvisor.googlesource.com/gvisor/pkg/tcpip/link/fdbased" - "gvisor.googlesource.com/gvisor/pkg/tcpip/network/ipv4" - "gvisor.googlesource.com/gvisor/pkg/tcpip/stack" -) - -func TestInjectableEndpointRawDispatch(t *testing.T) { - endpoint, sock, dstIP := makeTestInjectableEndpoint(t) - - endpoint.WriteRawPacket(dstIP, []byte{0xFA}) - - buf := make([]byte, ipv4.MaxTotalSize) - bytesRead, err := sock.Read(buf) - if err != nil { - t.Fatalf("Unable to read from socketpair: %v", err) - } - if got, want := buf[:bytesRead], []byte{0xFA}; !bytes.Equal(got, want) { - t.Fatalf("Read %v from the socketpair, wanted %v", got, want) - } -} - -func TestInjectableEndpointDispatch(t *testing.T) { - endpoint, sock, dstIP := makeTestInjectableEndpoint(t) - - hdr := buffer.NewPrependable(1) - hdr.Prepend(1)[0] = 0xFA - packetRoute := stack.Route{RemoteAddress: dstIP} - - endpoint.WritePacket(&packetRoute, nil /* gso */, hdr, - buffer.NewViewFromBytes([]byte{0xFB}).ToVectorisedView(), ipv4.ProtocolNumber) - - buf := make([]byte, 6500) - bytesRead, err := sock.Read(buf) - if err != nil { - t.Fatalf("Unable to read from socketpair: %v", err) - } - if got, want := buf[:bytesRead], []byte{0xFA, 0xFB}; !bytes.Equal(got, want) { - t.Fatalf("Read %v from the socketpair, wanted %v", got, want) - } -} - -func TestInjectableEndpointDispatchHdrOnly(t *testing.T) { - endpoint, sock, dstIP := makeTestInjectableEndpoint(t) - hdr := buffer.NewPrependable(1) - hdr.Prepend(1)[0] = 0xFA - packetRoute := stack.Route{RemoteAddress: dstIP} - endpoint.WritePacket(&packetRoute, nil /* gso */, hdr, - buffer.NewView(0).ToVectorisedView(), ipv4.ProtocolNumber) - buf := make([]byte, 6500) - bytesRead, err := sock.Read(buf) - if err != nil { - t.Fatalf("Unable to read from socketpair: %v", err) - } - if got, want := buf[:bytesRead], []byte{0xFA}; !bytes.Equal(got, want) { - t.Fatalf("Read %v from the socketpair, wanted %v", got, want) - } -} - -func makeTestInjectableEndpoint(t *testing.T) (*InjectableEndpoint, *os.File, tcpip.Address) { - dstIP := tcpip.Address(net.ParseIP("1.2.3.4").To4()) - pair, err := syscall.Socketpair(syscall.AF_UNIX, - syscall.SOCK_SEQPACKET|syscall.SOCK_CLOEXEC|syscall.SOCK_NONBLOCK, 0) - if err != nil { - t.Fatal("Failed to create socket pair:", err) - } - _, underlyingEndpoint := fdbased.NewInjectable(pair[1], 6500, stack.CapabilityNone) - routes := map[tcpip.Address]stack.InjectableLinkEndpoint{dstIP: underlyingEndpoint} - _, endpoint := NewInjectableEndpoint(routes) - return endpoint, os.NewFile(uintptr(pair[0]), "test route end"), dstIP -} diff --git a/pkg/tcpip/link/rawfile/BUILD b/pkg/tcpip/link/rawfile/BUILD deleted file mode 100644 index f01bb2c07..000000000 --- a/pkg/tcpip/link/rawfile/BUILD +++ /dev/null @@ -1,19 +0,0 @@ -load("//tools/go_stateify:defs.bzl", "go_library") - -package(licenses = ["notice"]) - -go_library( - name = "rawfile", - srcs = [ - "blockingpoll_amd64.s", - "blockingpoll_amd64_unsafe.go", - "blockingpoll_unsafe.go", - "errors.go", - "rawfile_unsafe.go", - ], - importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/rawfile", - visibility = [ - "//visibility:public", - ], - deps = ["//pkg/tcpip"], -) diff --git a/pkg/tcpip/link/rawfile/rawfile_state_autogen.go b/pkg/tcpip/link/rawfile/rawfile_state_autogen.go new file mode 100755 index 000000000..662c04444 --- /dev/null +++ b/pkg/tcpip/link/rawfile/rawfile_state_autogen.go @@ -0,0 +1,4 @@ +// automatically generated by stateify. + +package rawfile + diff --git a/pkg/tcpip/link/sharedmem/BUILD b/pkg/tcpip/link/sharedmem/BUILD deleted file mode 100644 index dc8f1543e..000000000 --- a/pkg/tcpip/link/sharedmem/BUILD +++ /dev/null @@ -1,42 +0,0 @@ -load("//tools/go_stateify:defs.bzl", "go_library", "go_test") - -package(licenses = ["notice"]) - -go_library( - name = "sharedmem", - srcs = [ - "rx.go", - "sharedmem.go", - "sharedmem_unsafe.go", - "tx.go", - ], - importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem", - visibility = [ - "//:sandbox", - ], - deps = [ - "//pkg/log", - "//pkg/tcpip", - "//pkg/tcpip/buffer", - "//pkg/tcpip/header", - "//pkg/tcpip/link/rawfile", - "//pkg/tcpip/link/sharedmem/queue", - "//pkg/tcpip/stack", - ], -) - -go_test( - name = "sharedmem_test", - srcs = [ - "sharedmem_test.go", - ], - embed = [":sharedmem"], - deps = [ - "//pkg/tcpip", - "//pkg/tcpip/buffer", - "//pkg/tcpip/header", - "//pkg/tcpip/link/sharedmem/pipe", - "//pkg/tcpip/link/sharedmem/queue", - "//pkg/tcpip/stack", - ], -) diff --git a/pkg/tcpip/link/sharedmem/pipe/BUILD b/pkg/tcpip/link/sharedmem/pipe/BUILD deleted file mode 100644 index 85deafa38..000000000 --- a/pkg/tcpip/link/sharedmem/pipe/BUILD +++ /dev/null @@ -1,23 +0,0 @@ -load("//tools/go_stateify:defs.bzl", "go_library", "go_test") - -package(licenses = ["notice"]) - -go_library( - name = "pipe", - srcs = [ - "pipe.go", - "pipe_unsafe.go", - "rx.go", - "tx.go", - ], - importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe", - visibility = ["//:sandbox"], -) - -go_test( - name = "pipe_test", - srcs = [ - "pipe_test.go", - ], - embed = [":pipe"], -) diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe.go b/pkg/tcpip/link/sharedmem/pipe/pipe.go deleted file mode 100644 index 74c9f0311..000000000 --- a/pkg/tcpip/link/sharedmem/pipe/pipe.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package pipe implements a shared memory ring buffer on which a single reader -// and a single writer can operate (read/write) concurrently. The ring buffer -// allows for data of different sizes to be written, and preserves the boundary -// of the written data. -// -// Example usage is as follows: -// -// wb := t.Push(20) -// // Write data to wb. -// t.Flush() -// -// rb := r.Pull() -// // Do something with data in rb. -// t.Flush() -package pipe - -import ( - "math" -) - -const ( - jump uint64 = math.MaxUint32 + 1 - offsetMask uint64 = math.MaxUint32 - revolutionMask uint64 = ^offsetMask - - sizeOfSlotHeader = 8 // sizeof(uint64) - slotFree uint64 = 1 << 63 - slotSizeMask uint64 = math.MaxUint32 -) - -// payloadToSlotSize calculates the total size of a slot based on its payload -// size. The total size is the header size, plus the payload size, plus padding -// if necessary to make the total size a multiple of sizeOfSlotHeader. -func payloadToSlotSize(payloadSize uint64) uint64 { - s := sizeOfSlotHeader + payloadSize - return (s + sizeOfSlotHeader - 1) &^ (sizeOfSlotHeader - 1) -} - -// slotToPayloadSize calculates the payload size of a slot based on the total -// size of the slot. This is only meant to be used when creating slots that -// don't carry information (e.g., free slots or wrap slots). -func slotToPayloadSize(offset uint64) uint64 { - return offset - sizeOfSlotHeader -} - -// pipe is a basic data structure used by both (transmit & receive) ends of a -// pipe. Indices into this pipe are split into two fields: offset, which counts -// the number of bytes from the beginning of the buffer, and revolution, which -// counts the number of times the index has wrapped around. -type pipe struct { - buffer []byte -} - -// init initializes the pipe buffer such that its size is a multiple of the size -// of the slot header. -func (p *pipe) init(b []byte) { - p.buffer = b[:len(b)&^(sizeOfSlotHeader-1)] -} - -// data returns a section of the buffer starting at the given index (which may -// include revolution information) and with the given size. -func (p *pipe) data(idx uint64, size uint64) []byte { - return p.buffer[(idx&offsetMask)+sizeOfSlotHeader:][:size] -} diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_test.go b/pkg/tcpip/link/sharedmem/pipe/pipe_test.go deleted file mode 100644 index 59ef69a8b..000000000 --- a/pkg/tcpip/link/sharedmem/pipe/pipe_test.go +++ /dev/null @@ -1,517 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipe - -import ( - "math/rand" - "reflect" - "runtime" - "sync" - "testing" -) - -func TestSimpleReadWrite(t *testing.T) { - // Check that a simple write can be properly read from the rx side. - tr := rand.New(rand.NewSource(99)) - rr := rand.New(rand.NewSource(99)) - - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - wb := tx.Push(10) - if wb == nil { - t.Fatalf("Push failed on empty pipe") - } - for i := range wb { - wb[i] = byte(tr.Intn(256)) - } - tx.Flush() - - var rx Rx - rx.Init(b) - rb := rx.Pull() - if len(rb) != 10 { - t.Fatalf("Bad buffer size returned: got %v, want %v", len(rb), 10) - } - - for i := range rb { - if v := byte(rr.Intn(256)); v != rb[i] { - t.Fatalf("Bad read buffer at index %v: got %v, want %v", i, rb[i], v) - } - } - rx.Flush() -} - -func TestEmptyRead(t *testing.T) { - // Check that pulling from an empty pipe fails. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on empty pipe") - } -} - -func TestTooLargeWrite(t *testing.T) { - // Check that writes that are too large are properly rejected. - b := make([]byte, 96) - var tx Tx - tx.Init(b) - - if wb := tx.Push(96); wb != nil { - t.Fatalf("Write of 96 bytes succeeded on 96-byte pipe") - } - - if wb := tx.Push(88); wb != nil { - t.Fatalf("Write of 88 bytes succeeded on 96-byte pipe") - } - - if wb := tx.Push(80); wb == nil { - t.Fatalf("Write of 80 bytes failed on 96-byte pipe") - } -} - -func TestFullWrite(t *testing.T) { - // Check that writes fail when the pipe is full. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(80); wb == nil { - t.Fatalf("Write of 80 bytes failed on 96-byte pipe") - } - - if wb := tx.Push(1); wb != nil { - t.Fatalf("Write succeeded on full pipe") - } -} - -func TestFullAndFlushedWrite(t *testing.T) { - // Check that writes fail when the pipe is full and has already been - // flushed. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(80); wb == nil { - t.Fatalf("Write of 80 bytes failed on 96-byte pipe") - } - - tx.Flush() - - if wb := tx.Push(1); wb != nil { - t.Fatalf("Write succeeded on full pipe") - } -} - -func TestTxFlushTwice(t *testing.T) { - // Checks that a second consecutive tx flush is a no-op. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - // Make copy of original tx queue, flush it, then check that it didn't - // change. - orig := tx - tx.Flush() - - if !reflect.DeepEqual(orig, tx) { - t.Fatalf("Flush mutated tx pipe: got %v, want %v", tx, orig) - } -} - -func TestRxFlushTwice(t *testing.T) { - // Checks that a second consecutive rx flush is a no-op. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - rx.Flush() - - // Make copy of original rx queue, flush it, then check that it didn't - // change. - orig := rx - rx.Flush() - - if !reflect.DeepEqual(orig, rx) { - t.Fatalf("Flush mutated rx pipe: got %v, want %v", rx, orig) - } -} - -func TestWrapInMiddleOfTransaction(t *testing.T) { - // Check that writes are not flushed when we need to wrap the buffer - // around. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - rx.Flush() - - // At this point the ring buffer is empty, but the write is at offset - // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). - if wb := tx.Push(10); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on non-full pipe") - } - - // We haven't flushed yet, so pull must return nil. - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on non-flushed pipe") - } - - tx.Flush() - - // The two buffers must be available now. - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } -} - -func TestWriteAbort(t *testing.T) { - // Check that a read fails on a pipe that has had data pushed to it but - // has aborted the push. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(10); wb == nil { - t.Fatalf("Write failed on empty pipe") - } - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on empty pipe") - } - - tx.Abort() - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on empty pipe") - } -} - -func TestWrappedWriteAbort(t *testing.T) { - // Check that writes are properly aborted even if the writes wrap - // around. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - rx.Flush() - - // At this point the ring buffer is empty, but the write is at offset - // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). - if wb := tx.Push(10); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on non-full pipe") - } - - // We haven't flushed yet, so pull must return nil. - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on non-flushed pipe") - } - - tx.Abort() - - // The pushes were aborted, so no data should be readable. - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on non-flushed pipe") - } - - // Try the same transactions again, but flush this time. - if wb := tx.Push(10); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on non-full pipe") - } - - tx.Flush() - - // The two buffers must be available now. - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } -} - -func TestEmptyReadOnNonFlushedWrite(t *testing.T) { - // Check that a read fails on a pipe that has had data pushed to it - // but not yet flushed. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(10); wb == nil { - t.Fatalf("Write failed on empty pipe") - } - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on empty pipe") - } - - tx.Flush() - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull on failed on non-empty pipe") - } -} - -func TestPullAfterPullingEntirePipe(t *testing.T) { - // Check that Pull fails when the pipe is full, but all of it has - // already been pulled but not yet flushed. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - rx.Flush() - - // At this point the ring buffer is empty, but the write is at offset - // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 3 - // buffers that will fill the pipe. - if wb := tx.Push(10); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - - if wb := tx.Push(20); wb == nil { - t.Fatalf("Push failed on non-full pipe") - } - - if wb := tx.Push(24); wb == nil { - t.Fatalf("Push failed on non-full pipe") - } - - tx.Flush() - - // The three buffers must be available now. - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - - // Fourth pull must fail. - if rb := rx.Pull(); rb != nil { - t.Fatalf("Pull succeeded on empty pipe") - } -} - -func TestNoRoomToWrapOnPush(t *testing.T) { - // Check that Push fails when it tries to allocate room to add a wrap - // message. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - var rx Rx - rx.Init(b) - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - rx.Flush() - - // At this point the ring buffer is empty, but the write is at offset - // 64 (50 + sizeOfSlotHeader + padding-for-8-byte-alignment). Write 20, - // which won't fit (64+20+8+padding = 96, which wouldn't leave room for - // the padding), so it wraps around. - if wb := tx.Push(20); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - - tx.Flush() - - // Buffer offset is at 28. Try to write 70, which would require a wrap - // slot which cannot be created now. - if wb := tx.Push(70); wb != nil { - t.Fatalf("Push succeeded on pipe with no room for wrap message") - } -} - -func TestRxImplicitFlushOfWrapMessage(t *testing.T) { - // Check if the first read is that of a wrapping message, that it gets - // immediately flushed. - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - if wb := tx.Push(50); wb == nil { - t.Fatalf("Push failed on empty pipe") - } - tx.Flush() - - // This will cause a wrapping message to written. - if wb := tx.Push(60); wb != nil { - t.Fatalf("Push succeeded when there is no room in pipe") - } - - var rx Rx - rx.Init(b) - - // Read the first message. - if rb := rx.Pull(); rb == nil { - t.Fatalf("Pull failed on non-empty pipe") - } - rx.Flush() - - // This should fail because of the wrapping message is taking up space. - if wb := tx.Push(60); wb != nil { - t.Fatalf("Push succeeded when there is no room in pipe") - } - - // Try to read the next one. This should consume the wrapping message. - rx.Pull() - - // This must now succeed. - if wb := tx.Push(60); wb == nil { - t.Fatalf("Push failed on empty pipe") - } -} - -func TestConcurrentReaderWriter(t *testing.T) { - // Push a million buffers of random sizes and random contents. Check - // that buffers read match what was written. - tr := rand.New(rand.NewSource(99)) - rr := rand.New(rand.NewSource(99)) - - b := make([]byte, 100) - var tx Tx - tx.Init(b) - - var rx Rx - rx.Init(b) - - const count = 1000000 - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - runtime.Gosched() - for i := 0; i < count; i++ { - n := 1 + tr.Intn(80) - wb := tx.Push(uint64(n)) - for wb == nil { - wb = tx.Push(uint64(n)) - } - - for j := range wb { - wb[j] = byte(tr.Intn(256)) - } - - tx.Flush() - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - runtime.Gosched() - for i := 0; i < count; i++ { - n := 1 + rr.Intn(80) - rb := rx.Pull() - for rb == nil { - rb = rx.Pull() - } - - if n != len(rb) { - t.Fatalf("Bad %v-th buffer length: got %v, want %v", i, len(rb), n) - } - - for j := range rb { - if v := byte(rr.Intn(256)); v != rb[j] { - t.Fatalf("Bad %v-th read buffer at index %v: got %v, want %v", i, j, rb[j], v) - } - } - - rx.Flush() - } - }() - - wg.Wait() -} diff --git a/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go b/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go deleted file mode 100644 index 62d17029e..000000000 --- a/pkg/tcpip/link/sharedmem/pipe/pipe_unsafe.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipe - -import ( - "sync/atomic" - "unsafe" -) - -func (p *pipe) write(idx uint64, v uint64) { - ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0])) - *ptr = v -} - -func (p *pipe) writeAtomic(idx uint64, v uint64) { - ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0])) - atomic.StoreUint64(ptr, v) -} - -func (p *pipe) readAtomic(idx uint64) uint64 { - ptr := (*uint64)(unsafe.Pointer(&p.buffer[idx&offsetMask:][:8][0])) - return atomic.LoadUint64(ptr) -} diff --git a/pkg/tcpip/link/sharedmem/pipe/rx.go b/pkg/tcpip/link/sharedmem/pipe/rx.go deleted file mode 100644 index f22e533ac..000000000 --- a/pkg/tcpip/link/sharedmem/pipe/rx.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipe - -// Rx is the receive side of the shared memory ring buffer. -type Rx struct { - p pipe - - tail uint64 - head uint64 -} - -// Init initializes the receive end of the pipe. In the initial state, the next -// slot to be inspected is the very first one. -func (r *Rx) Init(b []byte) { - r.p.init(b) - r.tail = 0xfffffffe * jump - r.head = r.tail -} - -// Pull reads the next buffer from the pipe, returning nil if there isn't one -// currently available. -// -// The returned slice is available until Flush() is next called. After that, it -// must not be touched. -func (r *Rx) Pull() []byte { - if r.head == r.tail+jump { - // We've already pulled the whole pipe. - return nil - } - - header := r.p.readAtomic(r.head) - if header&slotFree != 0 { - // The next slot is free, we can't pull it yet. - return nil - } - - payloadSize := header & slotSizeMask - newHead := r.head + payloadToSlotSize(payloadSize) - headWrap := (r.head & revolutionMask) | uint64(len(r.p.buffer)) - - // Check if this is a wrapping slot. If that's the case, it carries no - // data, so we just skip it and try again from the first slot. - if int64(newHead-headWrap) >= 0 { - if int64(newHead-headWrap) > int64(jump) || newHead&offsetMask != 0 { - return nil - } - - if r.tail == r.head { - // If this is the first pull since the last Flush() - // call, we flush the state so that the sender can use - // this space if it needs to. - r.p.writeAtomic(r.head, slotFree|slotToPayloadSize(newHead-r.head)) - r.tail = newHead - } - - r.head = newHead - return r.Pull() - } - - // Grab the buffer before updating r.head. - b := r.p.data(r.head, payloadSize) - r.head = newHead - return b -} - -// Flush tells the transmitter that all buffers pulled since the last Flush() -// have been used, so the transmitter is free to used their slots for further -// transmission. -func (r *Rx) Flush() { - if r.head == r.tail { - return - } - r.p.writeAtomic(r.tail, slotFree|slotToPayloadSize(r.head-r.tail)) - r.tail = r.head -} - -// Bytes returns the byte slice on which the pipe operates. -func (r *Rx) Bytes() []byte { - return r.p.buffer -} diff --git a/pkg/tcpip/link/sharedmem/pipe/tx.go b/pkg/tcpip/link/sharedmem/pipe/tx.go deleted file mode 100644 index 9841eb231..000000000 --- a/pkg/tcpip/link/sharedmem/pipe/tx.go +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipe - -// Tx is the transmit side of the shared memory ring buffer. -type Tx struct { - p pipe - maxPayloadSize uint64 - - head uint64 - tail uint64 - next uint64 - - tailHeader uint64 -} - -// Init initializes the transmit end of the pipe. In the initial state, the next -// slot to be written is the very first one, and the transmitter has the whole -// ring buffer available to it. -func (t *Tx) Init(b []byte) { - t.p.init(b) - // maxPayloadSize excludes the header of the payload, and the header - // of the wrapping message. - t.maxPayloadSize = uint64(len(t.p.buffer)) - 2*sizeOfSlotHeader - t.tail = 0xfffffffe * jump - t.next = t.tail - t.head = t.tail + jump - t.p.write(t.tail, slotFree) -} - -// Capacity determines how many records of the given size can be written to the -// pipe before it fills up. -func (t *Tx) Capacity(recordSize uint64) uint64 { - available := uint64(len(t.p.buffer)) - sizeOfSlotHeader - entryLen := payloadToSlotSize(recordSize) - return available / entryLen -} - -// Push reserves "payloadSize" bytes for transmission in the pipe. The caller -// populates the returned slice with the data to be transferred and enventually -// calls Flush() to make the data visible to the reader, or Abort() to make the -// pipe forget all Push() calls since the last Flush(). -// -// The returned slice is available until Flush() or Abort() is next called. -// After that, it must not be touched. -func (t *Tx) Push(payloadSize uint64) []byte { - // Fail request if we know we will never have enough room. - if payloadSize > t.maxPayloadSize { - return nil - } - - totalLen := payloadToSlotSize(payloadSize) - newNext := t.next + totalLen - nextWrap := (t.next & revolutionMask) | uint64(len(t.p.buffer)) - if int64(newNext-nextWrap) >= 0 { - // The new buffer would overflow the pipe, so we push a wrapping - // slot, then try to add the actual slot to the front of the - // pipe. - newNext = (newNext & revolutionMask) + jump - wrappingPayloadSize := slotToPayloadSize(newNext - t.next) - if !t.reclaim(newNext) { - return nil - } - - oldNext := t.next - t.next = newNext - if oldNext != t.tail { - t.p.write(oldNext, wrappingPayloadSize) - } else { - t.tailHeader = wrappingPayloadSize - t.Flush() - } - - newNext += totalLen - } - - // Check that we have enough room for the buffer. - if !t.reclaim(newNext) { - return nil - } - - if t.next != t.tail { - t.p.write(t.next, payloadSize) - } else { - t.tailHeader = payloadSize - } - - // Grab the buffer before updating t.next. - b := t.p.data(t.next, payloadSize) - t.next = newNext - - return b -} - -// reclaim attempts to advance the head until at least newNext. If the head is -// already at or beyond newNext, nothing happens and true is returned; otherwise -// it tries to reclaim slots that have already been consumed by the receive end -// of the pipe (they will be marked as free) and returns a boolean indicating -// whether it was successful in reclaiming enough slots. -func (t *Tx) reclaim(newNext uint64) bool { - for int64(newNext-t.head) > 0 { - // Can't reclaim if slot is not free. - header := t.p.readAtomic(t.head) - if header&slotFree == 0 { - return false - } - - payloadSize := header & slotSizeMask - newHead := t.head + payloadToSlotSize(payloadSize) - - // Check newHead is within bounds and valid. - if int64(newHead-t.tail) > int64(jump) || newHead&offsetMask >= uint64(len(t.p.buffer)) { - return false - } - - t.head = newHead - } - - return true -} - -// Abort causes all Push() calls since the last Flush() to be forgotten and -// therefore they will not be made visible to the receiver. -func (t *Tx) Abort() { - t.next = t.tail -} - -// Flush causes all buffers pushed since the last Flush() [or Abort(), whichever -// is the most recent] to be made visible to the receiver. -func (t *Tx) Flush() { - if t.next == t.tail { - // Nothing to do if there are no pushed buffers. - return - } - - if t.next != t.head { - // The receiver will spin in t.next, so we must make sure that - // the slotFree bit is set. - t.p.write(t.next, slotFree) - } - - t.p.writeAtomic(t.tail, t.tailHeader) - t.tail = t.next -} - -// Bytes returns the byte slice on which the pipe operates. -func (t *Tx) Bytes() []byte { - return t.p.buffer -} diff --git a/pkg/tcpip/link/sharedmem/queue/BUILD b/pkg/tcpip/link/sharedmem/queue/BUILD deleted file mode 100644 index d7dc631eb..000000000 --- a/pkg/tcpip/link/sharedmem/queue/BUILD +++ /dev/null @@ -1,28 +0,0 @@ -load("//tools/go_stateify:defs.bzl", "go_library", "go_test") - -package(licenses = ["notice"]) - -go_library( - name = "queue", - srcs = [ - "rx.go", - "tx.go", - ], - importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue", - visibility = ["//:sandbox"], - deps = [ - "//pkg/log", - "//pkg/tcpip/link/sharedmem/pipe", - ], -) - -go_test( - name = "queue_test", - srcs = [ - "queue_test.go", - ], - embed = [":queue"], - deps = [ - "//pkg/tcpip/link/sharedmem/pipe", - ], -) diff --git a/pkg/tcpip/link/sharedmem/queue/queue_test.go b/pkg/tcpip/link/sharedmem/queue/queue_test.go deleted file mode 100644 index d3f8f4b8b..000000000 --- a/pkg/tcpip/link/sharedmem/queue/queue_test.go +++ /dev/null @@ -1,517 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package queue - -import ( - "encoding/binary" - "reflect" - "testing" - - "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe" -) - -func TestBasicTxQueue(t *testing.T) { - // Tests that a basic transmit on a queue works, and that completion - // gets properly reported as well. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Tx - q.Init(pb1, pb2) - - // Enqueue two buffers. - b := []TxBuffer{ - {nil, 100, 60}, - {nil, 200, 40}, - } - - b[0].Next = &b[1] - - const usedID = 1002 - const usedTotalSize = 100 - if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) { - t.Fatalf("Enqueue failed on empty queue") - } - - // Check the contents of the pipe. - d := rxp.Pull() - if d == nil { - t.Fatalf("Tx pipe is empty after Enqueue") - } - - want := []byte{ - 234, 3, 0, 0, 0, 0, 0, 0, // id - 100, 0, 0, 0, // total size - 0, 0, 0, 0, // reserved - 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 - 60, 0, 0, 0, // size 1 - 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 - 40, 0, 0, 0, // size 2 - } - - if !reflect.DeepEqual(want, d) { - t.Fatalf("Bad posted packet: got %v, want %v", d, want) - } - - rxp.Flush() - - // Check that there are no completions yet. - if _, ok := q.CompletedPacket(); ok { - t.Fatalf("Packet reported as completed too soon") - } - - // Post a completion. - d = txp.Push(8) - if d == nil { - t.Fatalf("Unable to push to rx pipe") - } - binary.LittleEndian.PutUint64(d, usedID) - txp.Flush() - - // Check that completion is properly reported. - id, ok := q.CompletedPacket() - if !ok { - t.Fatalf("Completion not reported") - } - - if id != usedID { - t.Fatalf("Bad completion id: got %v, want %v", id, usedID) - } -} - -func TestBasicRxQueue(t *testing.T) { - // Tests that a basic receive on a queue works. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Rx - q.Init(pb1, pb2, nil) - - // Post two buffers. - b := []RxBuffer{ - {100, 60, 1077, 0}, - {200, 40, 2123, 0}, - } - - if !q.PostBuffers(b) { - t.Fatalf("PostBuffers failed on empty queue") - } - - // Check the contents of the pipe. - want := [][]byte{ - { - 100, 0, 0, 0, 0, 0, 0, 0, // Offset1 - 60, 0, 0, 0, // Size1 - 0, 0, 0, 0, // Remaining in group 1 - 0, 0, 0, 0, 0, 0, 0, 0, // User data 1 - 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 - }, - { - 200, 0, 0, 0, 0, 0, 0, 0, // Offset2 - 40, 0, 0, 0, // Size2 - 0, 0, 0, 0, // Remaining in group 2 - 0, 0, 0, 0, 0, 0, 0, 0, // User data 2 - 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 - }, - } - - for i := range b { - d := rxp.Pull() - if d == nil { - t.Fatalf("Tx pipe is empty after PostBuffers") - } - - if !reflect.DeepEqual(want[i], d) { - t.Fatalf("Bad posted packet: got %v, want %v", d, want[i]) - } - - rxp.Flush() - } - - // Check that there are no completions. - if _, n := q.Dequeue(nil); n != 0 { - t.Fatalf("Packet reported as received too soon") - } - - // Post a completion. - d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer) - if d == nil { - t.Fatalf("Unable to push to rx pipe") - } - - copy(d, []byte{ - 100, 0, 0, 0, // packet size - 0, 0, 0, 0, // reserved - - 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 - 60, 0, 0, 0, // size 1 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 1 - 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 - - 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 - 40, 0, 0, 0, // size 2 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 2 - 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 - }) - - txp.Flush() - - // Check that completion is properly reported. - bufs, n := q.Dequeue(nil) - if n != 100 { - t.Fatalf("Bad packet size: got %v, want %v", n, 100) - } - - if !reflect.DeepEqual(bufs, b) { - t.Fatalf("Bad returned buffers: got %v, want %v", bufs, b) - } -} - -func TestBadTxCompletion(t *testing.T) { - // Check that tx completions with bad sizes are properly ignored. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Tx - q.Init(pb1, pb2) - - // Post a completion that is too short, and check that it is ignored. - if d := txp.Push(7); d == nil { - t.Fatalf("Unable to push to rx pipe") - } - txp.Flush() - - if _, ok := q.CompletedPacket(); ok { - t.Fatalf("Bad completion not ignored") - } - - // Post a completion that is too long, and check that it is ignored. - if d := txp.Push(10); d == nil { - t.Fatalf("Unable to push to rx pipe") - } - txp.Flush() - - if _, ok := q.CompletedPacket(); ok { - t.Fatalf("Bad completion not ignored") - } -} - -func TestBadRxCompletion(t *testing.T) { - // Check that bad rx completions are properly ignored. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Rx - q.Init(pb1, pb2, nil) - - // Post a completion that is too short, and check that it is ignored. - if d := txp.Push(7); d == nil { - t.Fatalf("Unable to push to rx pipe") - } - txp.Flush() - - if b, _ := q.Dequeue(nil); b != nil { - t.Fatalf("Bad completion not ignored") - } - - // Post a completion whose buffer sizes add up to less than the total - // size. - d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer) - if d == nil { - t.Fatalf("Unable to push to rx pipe") - } - - copy(d, []byte{ - 100, 0, 0, 0, // packet size - 0, 0, 0, 0, // reserved - - 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 - 10, 0, 0, 0, // size 1 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 1 - 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 - - 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 - 10, 0, 0, 0, // size 2 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 2 - 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 - }) - - txp.Flush() - if b, _ := q.Dequeue(nil); b != nil { - t.Fatalf("Bad completion not ignored") - } - - // Post a completion whose buffer sizes will cause a 32-bit overflow, - // but adds up to the right number. - d = txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer) - if d == nil { - t.Fatalf("Unable to push to rx pipe") - } - - copy(d, []byte{ - 100, 0, 0, 0, // packet size - 0, 0, 0, 0, // reserved - - 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 - 255, 255, 255, 255, // size 1 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 1 - 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 - - 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 - 101, 0, 0, 0, // size 2 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 2 - 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 - }) - - txp.Flush() - if b, _ := q.Dequeue(nil); b != nil { - t.Fatalf("Bad completion not ignored") - } -} - -func TestFillTxPipe(t *testing.T) { - // Check that transmitting a new buffer when the buffer pipe is full - // fails gracefully. - pb1 := make([]byte, 104) - pb2 := make([]byte, 104) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Tx - q.Init(pb1, pb2) - - // Transmit twice, which should fill the tx pipe. - b := []TxBuffer{ - {nil, 100, 60}, - {nil, 200, 40}, - } - - b[0].Next = &b[1] - - const usedID = 1002 - const usedTotalSize = 100 - for i := uint64(0); i < 2; i++ { - if !q.Enqueue(usedID+i, usedTotalSize, 2, &b[0]) { - t.Fatalf("Failed to transmit buffer") - } - } - - // Transmit another packet now that the tx pipe is full. - if q.Enqueue(usedID+2, usedTotalSize, 2, &b[0]) { - t.Fatalf("Enqueue succeeded when tx pipe is full") - } -} - -func TestFillRxPipe(t *testing.T) { - // Check that posting a new buffer when the buffer pipe is full fails - // gracefully. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Rx - q.Init(pb1, pb2, nil) - - // Post a buffer twice, it should fill the tx pipe. - b := []RxBuffer{ - {100, 60, 1077, 0}, - } - - for i := 0; i < 2; i++ { - if !q.PostBuffers(b) { - t.Fatalf("PostBuffers failed on non-full queue") - } - } - - // Post another buffer now that the tx pipe is full. - if q.PostBuffers(b) { - t.Fatalf("PostBuffers succeeded on full queue") - } -} - -func TestLotsOfTransmissions(t *testing.T) { - // Make sure pipes are being properly flushed when transmitting packets. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Tx - q.Init(pb1, pb2) - - // Prepare packet with two buffers. - b := []TxBuffer{ - {nil, 100, 60}, - {nil, 200, 40}, - } - - b[0].Next = &b[1] - - const usedID = 1002 - const usedTotalSize = 100 - - // Post 100000 packets and completions. - for i := 100000; i > 0; i-- { - if !q.Enqueue(usedID, usedTotalSize, 2, &b[0]) { - t.Fatalf("Enqueue failed on non-full queue") - } - - if d := rxp.Pull(); d == nil { - t.Fatalf("Tx pipe is empty after Enqueue") - } - rxp.Flush() - - d := txp.Push(8) - if d == nil { - t.Fatalf("Unable to write to rx pipe") - } - binary.LittleEndian.PutUint64(d, usedID) - txp.Flush() - if _, ok := q.CompletedPacket(); !ok { - t.Fatalf("Completion not returned") - } - } -} - -func TestLotsOfReceptions(t *testing.T) { - // Make sure pipes are being properly flushed when receiving packets. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var rxp pipe.Rx - rxp.Init(pb1) - - var txp pipe.Tx - txp.Init(pb2) - - var q Rx - q.Init(pb1, pb2, nil) - - // Prepare for posting two buffers. - b := []RxBuffer{ - {100, 60, 1077, 0}, - {200, 40, 2123, 0}, - } - - // Post 100000 buffers and completions. - for i := 100000; i > 0; i-- { - if !q.PostBuffers(b) { - t.Fatalf("PostBuffers failed on non-full queue") - } - - if d := rxp.Pull(); d == nil { - t.Fatalf("Tx pipe is empty after PostBuffers") - } - rxp.Flush() - - if d := rxp.Pull(); d == nil { - t.Fatalf("Tx pipe is empty after PostBuffers") - } - rxp.Flush() - - d := txp.Push(sizeOfConsumedPacketHeader + 2*sizeOfConsumedBuffer) - if d == nil { - t.Fatalf("Unable to push to rx pipe") - } - - copy(d, []byte{ - 100, 0, 0, 0, // packet size - 0, 0, 0, 0, // reserved - - 100, 0, 0, 0, 0, 0, 0, 0, // offset 1 - 60, 0, 0, 0, // size 1 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 1 - 53, 4, 0, 0, 0, 0, 0, 0, // ID 1 - - 200, 0, 0, 0, 0, 0, 0, 0, // offset 2 - 40, 0, 0, 0, // size 2 - 0, 0, 0, 0, 0, 0, 0, 0, // user data 2 - 75, 8, 0, 0, 0, 0, 0, 0, // ID 2 - }) - - txp.Flush() - - if _, n := q.Dequeue(nil); n == 0 { - t.Fatalf("Dequeue failed when there is a completion") - } - } -} - -func TestRxEnableNotification(t *testing.T) { - // Check that enabling nofifications results in properly updated state. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var state uint32 - var q Rx - q.Init(pb1, pb2, &state) - - q.EnableNotification() - if state != eventFDEnabled { - t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDEnabled) - } -} - -func TestRxDisableNotification(t *testing.T) { - // Check that disabling nofifications results in properly updated state. - pb1 := make([]byte, 100) - pb2 := make([]byte, 100) - - var state uint32 - var q Rx - q.Init(pb1, pb2, &state) - - q.DisableNotification() - if state != eventFDDisabled { - t.Fatalf("Bad value in shared state: got %v, want %v", state, eventFDDisabled) - } -} diff --git a/pkg/tcpip/link/sharedmem/queue/rx.go b/pkg/tcpip/link/sharedmem/queue/rx.go deleted file mode 100644 index d9aecf2d9..000000000 --- a/pkg/tcpip/link/sharedmem/queue/rx.go +++ /dev/null @@ -1,221 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package queue provides the implementation of transmit and receive queues -// based on shared memory ring buffers. -package queue - -import ( - "encoding/binary" - "sync/atomic" - - "gvisor.googlesource.com/gvisor/pkg/log" - "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe" -) - -const ( - // Offsets within a posted buffer. - postedOffset = 0 - postedSize = 8 - postedRemainingInGroup = 12 - postedUserData = 16 - postedID = 24 - - sizeOfPostedBuffer = 32 - - // Offsets within a received packet header. - consumedPacketSize = 0 - consumedPacketReserved = 4 - - sizeOfConsumedPacketHeader = 8 - - // Offsets within a consumed buffer. - consumedOffset = 0 - consumedSize = 8 - consumedUserData = 12 - consumedID = 20 - - sizeOfConsumedBuffer = 28 - - // The following are the allowed states of the shared data area. - eventFDUninitialized = 0 - eventFDDisabled = 1 - eventFDEnabled = 2 -) - -// RxBuffer is the descriptor of a receive buffer. -type RxBuffer struct { - Offset uint64 - Size uint32 - ID uint64 - UserData uint64 -} - -// Rx is a receive queue. It is implemented with one tx and one rx pipe: the tx -// pipe is used to "post" buffers, while the rx pipe is used to receive packets -// whose contents have been written to previously posted buffers. -// -// This struct is thread-compatible. -type Rx struct { - tx pipe.Tx - rx pipe.Rx - sharedEventFDState *uint32 -} - -// Init initializes the receive queue with the given pipes, and shared state -// pointer -- the latter is used to enable/disable eventfd notifications. -func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) { - r.sharedEventFDState = sharedEventFDState - r.tx.Init(tx) - r.rx.Init(rx) -} - -// EnableNotification updates the shared state such that the peer will notify -// the eventfd when there are packets to be dequeued. -func (r *Rx) EnableNotification() { - atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled) -} - -// DisableNotification updates the shared state such that the peer will not -// notify the eventfd. -func (r *Rx) DisableNotification() { - atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled) -} - -// PostedBuffersLimit returns the maximum number of buffers that can be posted -// before the tx queue fills up. -func (r *Rx) PostedBuffersLimit() uint64 { - return r.tx.Capacity(sizeOfPostedBuffer) -} - -// PostBuffers makes the given buffers available for receiving data from the -// peer. Once they are posted, the peer is free to write to them and will -// eventually post them back for consumption. -func (r *Rx) PostBuffers(buffers []RxBuffer) bool { - for i := range buffers { - b := r.tx.Push(sizeOfPostedBuffer) - if b == nil { - r.tx.Abort() - return false - } - - pb := &buffers[i] - binary.LittleEndian.PutUint64(b[postedOffset:], pb.Offset) - binary.LittleEndian.PutUint32(b[postedSize:], pb.Size) - binary.LittleEndian.PutUint32(b[postedRemainingInGroup:], 0) - binary.LittleEndian.PutUint64(b[postedUserData:], pb.UserData) - binary.LittleEndian.PutUint64(b[postedID:], pb.ID) - } - - r.tx.Flush() - - return true -} - -// Dequeue receives buffers that have been previously posted by PostBuffers() -// and that have been filled by the peer and posted back. -// -// This is similar to append() in that new buffers are appended to "bufs", with -// reallocation only if "bufs" doesn't have enough capacity. -func (r *Rx) Dequeue(bufs []RxBuffer) ([]RxBuffer, uint32) { - for { - outBufs := bufs - - // Pull the next descriptor from the rx pipe. - b := r.rx.Pull() - if b == nil { - return bufs, 0 - } - - if len(b) < sizeOfConsumedPacketHeader { - log.Warningf("Ignoring packet header: size (%v) is less than header size (%v)", len(b), sizeOfConsumedPacketHeader) - r.rx.Flush() - continue - } - - totalDataSize := binary.LittleEndian.Uint32(b[consumedPacketSize:]) - - // Calculate the number of buffer descriptors and copy them - // over to the output. - count := (len(b) - sizeOfConsumedPacketHeader) / sizeOfConsumedBuffer - offset := sizeOfConsumedPacketHeader - buffersSize := uint32(0) - for i := count; i > 0; i-- { - s := binary.LittleEndian.Uint32(b[offset+consumedSize:]) - buffersSize += s - if buffersSize < s { - // The buffer size overflows an unsigned 32-bit - // integer, so break out and force it to be - // ignored. - totalDataSize = 1 - buffersSize = 0 - break - } - - outBufs = append(outBufs, RxBuffer{ - Offset: binary.LittleEndian.Uint64(b[offset+consumedOffset:]), - Size: s, - ID: binary.LittleEndian.Uint64(b[offset+consumedID:]), - }) - - offset += sizeOfConsumedBuffer - } - - r.rx.Flush() - - if buffersSize < totalDataSize { - // The descriptor is corrupted, ignore it. - log.Warningf("Ignoring packet: actual data size (%v) less than expected size (%v)", buffersSize, totalDataSize) - continue - } - - return outBufs, totalDataSize - } -} - -// Bytes returns the byte slices on which the queue operates. -func (r *Rx) Bytes() (tx, rx []byte) { - return r.tx.Bytes(), r.rx.Bytes() -} - -// DecodeRxBufferHeader decodes the header of a buffer posted on an rx queue. -func DecodeRxBufferHeader(b []byte) RxBuffer { - return RxBuffer{ - Offset: binary.LittleEndian.Uint64(b[postedOffset:]), - Size: binary.LittleEndian.Uint32(b[postedSize:]), - ID: binary.LittleEndian.Uint64(b[postedID:]), - UserData: binary.LittleEndian.Uint64(b[postedUserData:]), - } -} - -// RxCompletionSize returns the number of bytes needed to encode an rx -// completion containing "count" buffers. -func RxCompletionSize(count int) uint64 { - return sizeOfConsumedPacketHeader + uint64(count)*sizeOfConsumedBuffer -} - -// EncodeRxCompletion encodes an rx completion header. -func EncodeRxCompletion(b []byte, size, reserved uint32) { - binary.LittleEndian.PutUint32(b[consumedPacketSize:], size) - binary.LittleEndian.PutUint32(b[consumedPacketReserved:], reserved) -} - -// EncodeRxCompletionBuffer encodes the i-th rx completion buffer header. -func EncodeRxCompletionBuffer(b []byte, i int, rxb RxBuffer) { - b = b[RxCompletionSize(i):] - binary.LittleEndian.PutUint64(b[consumedOffset:], rxb.Offset) - binary.LittleEndian.PutUint32(b[consumedSize:], rxb.Size) - binary.LittleEndian.PutUint64(b[consumedUserData:], rxb.UserData) - binary.LittleEndian.PutUint64(b[consumedID:], rxb.ID) -} diff --git a/pkg/tcpip/link/sharedmem/queue/tx.go b/pkg/tcpip/link/sharedmem/queue/tx.go deleted file mode 100644 index a24dccd11..000000000 --- a/pkg/tcpip/link/sharedmem/queue/tx.go +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package queue - -import ( - "encoding/binary" - - "gvisor.googlesource.com/gvisor/pkg/log" - "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe" -) - -const ( - // Offsets within a packet header. - packetID = 0 - packetSize = 8 - packetReserved = 12 - - sizeOfPacketHeader = 16 - - // Offsets with a buffer descriptor - bufferOffset = 0 - bufferSize = 8 - - sizeOfBufferDescriptor = 12 -) - -// TxBuffer is the descriptor of a transmit buffer. -type TxBuffer struct { - Next *TxBuffer - Offset uint64 - Size uint32 -} - -// Tx is a transmit queue. It is implemented with one tx and one rx pipe: the -// tx pipe is used to request the transmission of packets, while the rx pipe -// is used to receive which transmissions have completed. -// -// This struct is thread-compatible. -type Tx struct { - tx pipe.Tx - rx pipe.Rx -} - -// Init initializes the transmit queue with the given pipes. -func (t *Tx) Init(tx, rx []byte) { - t.tx.Init(tx) - t.rx.Init(rx) -} - -// Enqueue queues the given linked list of buffers for transmission as one -// packet. While it is queued, the caller must not modify them. -func (t *Tx) Enqueue(id uint64, totalDataLen, bufferCount uint32, buffer *TxBuffer) bool { - // Reserve room in the tx pipe. - totalLen := sizeOfPacketHeader + uint64(bufferCount)*sizeOfBufferDescriptor - - b := t.tx.Push(totalLen) - if b == nil { - return false - } - - // Initialize the packet and buffer descriptors. - binary.LittleEndian.PutUint64(b[packetID:], id) - binary.LittleEndian.PutUint32(b[packetSize:], totalDataLen) - binary.LittleEndian.PutUint32(b[packetReserved:], 0) - - offset := sizeOfPacketHeader - for i := bufferCount; i != 0; i-- { - binary.LittleEndian.PutUint64(b[offset+bufferOffset:], buffer.Offset) - binary.LittleEndian.PutUint32(b[offset+bufferSize:], buffer.Size) - offset += sizeOfBufferDescriptor - buffer = buffer.Next - } - - t.tx.Flush() - - return true -} - -// CompletedPacket returns the id of the last completed transmission. The -// returned id, if any, refers to a value passed on a previous call to -// Enqueue(). -func (t *Tx) CompletedPacket() (id uint64, ok bool) { - for { - b := t.rx.Pull() - if b == nil { - return 0, false - } - - if len(b) != 8 { - t.rx.Flush() - log.Warningf("Ignoring completed packet: size (%v) is less than expected (%v)", len(b), 8) - continue - } - - v := binary.LittleEndian.Uint64(b) - - t.rx.Flush() - - return v, true - } -} - -// Bytes returns the byte slices on which the queue operates. -func (t *Tx) Bytes() (tx, rx []byte) { - return t.tx.Bytes(), t.rx.Bytes() -} - -// TxPacketInfo holds information about a packet sent on a tx queue. -type TxPacketInfo struct { - ID uint64 - Size uint32 - Reserved uint32 - BufferCount int -} - -// DecodeTxPacketHeader decodes the header of a packet sent over a tx queue. -func DecodeTxPacketHeader(b []byte) TxPacketInfo { - return TxPacketInfo{ - ID: binary.LittleEndian.Uint64(b[packetID:]), - Size: binary.LittleEndian.Uint32(b[packetSize:]), - Reserved: binary.LittleEndian.Uint32(b[packetReserved:]), - BufferCount: (len(b) - sizeOfPacketHeader) / sizeOfBufferDescriptor, - } -} - -// DecodeTxBufferHeader decodes the header of the i-th buffer of a packet sent -// over a tx queue. -func DecodeTxBufferHeader(b []byte, i int) TxBuffer { - b = b[sizeOfPacketHeader+i*sizeOfBufferDescriptor:] - return TxBuffer{ - Offset: binary.LittleEndian.Uint64(b[bufferOffset:]), - Size: binary.LittleEndian.Uint32(b[bufferSize:]), - } -} - -// EncodeTxCompletion encodes a tx completion header. -func EncodeTxCompletion(b []byte, id uint64) { - binary.LittleEndian.PutUint64(b, id) -} diff --git a/pkg/tcpip/link/sharedmem/rx.go b/pkg/tcpip/link/sharedmem/rx.go deleted file mode 100644 index 215cb607f..000000000 --- a/pkg/tcpip/link/sharedmem/rx.go +++ /dev/null @@ -1,159 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// +build linux - -package sharedmem - -import ( - "sync/atomic" - "syscall" - - "gvisor.googlesource.com/gvisor/pkg/tcpip/link/rawfile" - "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue" -) - -// rx holds all state associated with an rx queue. -type rx struct { - data []byte - sharedData []byte - q queue.Rx - eventFD int -} - -// init initializes all state needed by the rx queue based on the information -// provided. -// -// The caller always retains ownership of all file descriptors passed in. The -// queue implementation will duplicate any that it may need in the future. -func (r *rx) init(mtu uint32, c *QueueConfig) error { - // Map in all buffers. - txPipe, err := getBuffer(c.TxPipeFD) - if err != nil { - return err - } - - rxPipe, err := getBuffer(c.RxPipeFD) - if err != nil { - syscall.Munmap(txPipe) - return err - } - - data, err := getBuffer(c.DataFD) - if err != nil { - syscall.Munmap(txPipe) - syscall.Munmap(rxPipe) - return err - } - - sharedData, err := getBuffer(c.SharedDataFD) - if err != nil { - syscall.Munmap(txPipe) - syscall.Munmap(rxPipe) - syscall.Munmap(data) - return err - } - - // Duplicate the eventFD so that caller can close it but we can still - // use it. - efd, err := syscall.Dup(c.EventFD) - if err != nil { - syscall.Munmap(txPipe) - syscall.Munmap(rxPipe) - syscall.Munmap(data) - syscall.Munmap(sharedData) - return err - } - - // Set the eventfd as non-blocking. - if err := syscall.SetNonblock(efd, true); err != nil { - syscall.Munmap(txPipe) - syscall.Munmap(rxPipe) - syscall.Munmap(data) - syscall.Munmap(sharedData) - syscall.Close(efd) - return err - } - - // Initialize state based on buffers. - r.q.Init(txPipe, rxPipe, sharedDataPointer(sharedData)) - r.data = data - r.eventFD = efd - r.sharedData = sharedData - - return nil -} - -// cleanup releases all resources allocated during init(). It must only be -// called if init() has previously succeeded. -func (r *rx) cleanup() { - a, b := r.q.Bytes() - syscall.Munmap(a) - syscall.Munmap(b) - - syscall.Munmap(r.data) - syscall.Munmap(r.sharedData) - syscall.Close(r.eventFD) -} - -// postAndReceive posts the provided buffers (if any), and then tries to read -// from the receive queue. -// -// Capacity permitting, it reuses the posted buffer slice to store the buffers -// that were read as well. -// -// This function will block if there aren't any available packets. -func (r *rx) postAndReceive(b []queue.RxBuffer, stopRequested *uint32) ([]queue.RxBuffer, uint32) { - // Post the buffers first. If we cannot post, sleep until we can. We - // never post more than will fit concurrently, so it's safe to wait - // until enough room is available. - if len(b) != 0 && !r.q.PostBuffers(b) { - r.q.EnableNotification() - for !r.q.PostBuffers(b) { - var tmp [8]byte - rawfile.BlockingRead(r.eventFD, tmp[:]) - if atomic.LoadUint32(stopRequested) != 0 { - r.q.DisableNotification() - return nil, 0 - } - } - r.q.DisableNotification() - } - - // Read the next set of descriptors. - b, n := r.q.Dequeue(b[:0]) - if len(b) != 0 { - return b, n - } - - // Data isn't immediately available. Enable eventfd notifications. - r.q.EnableNotification() - for { - b, n = r.q.Dequeue(b) - if len(b) != 0 { - break - } - - // Wait for notification. - var tmp [8]byte - rawfile.BlockingRead(r.eventFD, tmp[:]) - if atomic.LoadUint32(stopRequested) != 0 { - r.q.DisableNotification() - return nil, 0 - } - } - r.q.DisableNotification() - - return b, n -} diff --git a/pkg/tcpip/link/sharedmem/sharedmem.go b/pkg/tcpip/link/sharedmem/sharedmem.go deleted file mode 100644 index e34b780f8..000000000 --- a/pkg/tcpip/link/sharedmem/sharedmem.go +++ /dev/null @@ -1,264 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// +build linux - -// Package sharedmem provides the implemention of data-link layer endpoints -// backed by shared memory. -// -// Shared memory endpoints can be used in the networking stack by calling New() -// to create a new endpoint, and then passing it as an argument to -// Stack.CreateNIC(). -package sharedmem - -import ( - "sync" - "sync/atomic" - "syscall" - - "gvisor.googlesource.com/gvisor/pkg/log" - "gvisor.googlesource.com/gvisor/pkg/tcpip" - "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" - "gvisor.googlesource.com/gvisor/pkg/tcpip/header" - "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue" - "gvisor.googlesource.com/gvisor/pkg/tcpip/stack" -) - -// QueueConfig holds all the file descriptors needed to describe a tx or rx -// queue over shared memory. It is used when creating new shared memory -// endpoints to describe tx and rx queues. -type QueueConfig struct { - // DataFD is a file descriptor for the file that contains the data to - // be transmitted via this queue. Descriptors contain offsets within - // this file. - DataFD int - - // EventFD is a file descriptor for the event that is signaled when - // data is becomes available in this queue. - EventFD int - - // TxPipeFD is a file descriptor for the tx pipe associated with the - // queue. - TxPipeFD int - - // RxPipeFD is a file descriptor for the rx pipe associated with the - // queue. - RxPipeFD int - - // SharedDataFD is a file descriptor for the file that contains shared - // state between the two ends of the queue. This data specifies, for - // example, whether EventFD signaling is enabled or disabled. - SharedDataFD int -} - -type endpoint struct { - // mtu (maximum transmission unit) is the maximum size of a packet. - mtu uint32 - - // bufferSize is the size of each individual buffer. - bufferSize uint32 - - // addr is the local address of this endpoint. - addr tcpip.LinkAddress - - // rx is the receive queue. - rx rx - - // stopRequested is to be accessed atomically only, and determines if - // the worker goroutines should stop. - stopRequested uint32 - - // Wait group used to indicate that all workers have stopped. - completed sync.WaitGroup - - // mu protects the following fields. - mu sync.Mutex - - // tx is the transmit queue. - tx tx - - // workerStarted specifies whether the worker goroutine was started. - workerStarted bool -} - -// New creates a new shared-memory-based endpoint. Buffers will be broken up -// into buffers of "bufferSize" bytes. -func New(mtu, bufferSize uint32, addr tcpip.LinkAddress, tx, rx QueueConfig) (tcpip.LinkEndpointID, error) { - e := &endpoint{ - mtu: mtu, - bufferSize: bufferSize, - addr: addr, - } - - if err := e.tx.init(bufferSize, &tx); err != nil { - return 0, err - } - - if err := e.rx.init(bufferSize, &rx); err != nil { - e.tx.cleanup() - return 0, err - } - - return stack.RegisterLinkEndpoint(e), nil -} - -// Close frees all resources associated with the endpoint. -func (e *endpoint) Close() { - // Tell dispatch goroutine to stop, then write to the eventfd so that - // it wakes up in case it's sleeping. - atomic.StoreUint32(&e.stopRequested, 1) - syscall.Write(e.rx.eventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) - - // Cleanup the queues inline if the worker hasn't started yet; we also - // know it won't start from now on because stopRequested is set to 1. - e.mu.Lock() - workerPresent := e.workerStarted - e.mu.Unlock() - - if !workerPresent { - e.tx.cleanup() - e.rx.cleanup() - } -} - -// Wait waits until all workers have stopped after a Close() call. -func (e *endpoint) Wait() { - e.completed.Wait() -} - -// Attach implements stack.LinkEndpoint.Attach. It launches the goroutine that -// reads packets from the rx queue. -func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) { - e.mu.Lock() - if !e.workerStarted && atomic.LoadUint32(&e.stopRequested) == 0 { - e.workerStarted = true - e.completed.Add(1) - // Link endpoints are not savable. When transportation endpoints - // are saved, they stop sending outgoing packets and all - // incoming packets are rejected. - go e.dispatchLoop(dispatcher) // S/R-SAFE: see above. - } - e.mu.Unlock() -} - -// IsAttached implements stack.LinkEndpoint.IsAttached. -func (e *endpoint) IsAttached() bool { - e.mu.Lock() - defer e.mu.Unlock() - return e.workerStarted -} - -// MTU implements stack.LinkEndpoint.MTU. It returns the value initialized -// during construction. -func (e *endpoint) MTU() uint32 { - return e.mtu - header.EthernetMinimumSize -} - -// Capabilities implements stack.LinkEndpoint.Capabilities. -func (*endpoint) Capabilities() stack.LinkEndpointCapabilities { - return 0 -} - -// MaxHeaderLength implements stack.LinkEndpoint.MaxHeaderLength. It returns the -// ethernet frame header size. -func (*endpoint) MaxHeaderLength() uint16 { - return header.EthernetMinimumSize -} - -// LinkAddress implements stack.LinkEndpoint.LinkAddress. It returns the local -// link address. -func (e *endpoint) LinkAddress() tcpip.LinkAddress { - return e.addr -} - -// WritePacket writes outbound packets to the file descriptor. If it is not -// currently writable, the packet is dropped. -func (e *endpoint) WritePacket(r *stack.Route, _ *stack.GSO, hdr buffer.Prependable, payload buffer.VectorisedView, protocol tcpip.NetworkProtocolNumber) *tcpip.Error { - // Add the ethernet header here. - eth := header.Ethernet(hdr.Prepend(header.EthernetMinimumSize)) - ethHdr := &header.EthernetFields{ - DstAddr: r.RemoteLinkAddress, - Type: protocol, - } - if r.LocalLinkAddress != "" { - ethHdr.SrcAddr = r.LocalLinkAddress - } else { - ethHdr.SrcAddr = e.addr - } - eth.Encode(ethHdr) - - v := payload.ToView() - // Transmit the packet. - e.mu.Lock() - ok := e.tx.transmit(hdr.View(), v) - e.mu.Unlock() - - if !ok { - return tcpip.ErrWouldBlock - } - - return nil -} - -// dispatchLoop reads packets from the rx queue in a loop and dispatches them -// to the network stack. -func (e *endpoint) dispatchLoop(d stack.NetworkDispatcher) { - // Post initial set of buffers. - limit := e.rx.q.PostedBuffersLimit() - if l := uint64(len(e.rx.data)) / uint64(e.bufferSize); limit > l { - limit = l - } - for i := uint64(0); i < limit; i++ { - b := queue.RxBuffer{ - Offset: i * uint64(e.bufferSize), - Size: e.bufferSize, - ID: i, - } - if !e.rx.q.PostBuffers([]queue.RxBuffer{b}) { - log.Warningf("Unable to post %v-th buffer", i) - } - } - - // Read in a loop until a stop is requested. - var rxb []queue.RxBuffer - for atomic.LoadUint32(&e.stopRequested) == 0 { - var n uint32 - rxb, n = e.rx.postAndReceive(rxb, &e.stopRequested) - - // Copy data from the shared area to its own buffer, then - // prepare to repost the buffer. - b := make([]byte, n) - offset := uint32(0) - for i := range rxb { - copy(b[offset:], e.rx.data[rxb[i].Offset:][:rxb[i].Size]) - offset += rxb[i].Size - - rxb[i].Size = e.bufferSize - } - - if n < header.EthernetMinimumSize { - continue - } - - // Send packet up the stack. - eth := header.Ethernet(b) - d.DeliverNetworkPacket(e, eth.SourceAddress(), eth.DestinationAddress(), eth.Type(), buffer.View(b[header.EthernetMinimumSize:]).ToVectorisedView()) - } - - // Clean state. - e.tx.cleanup() - e.rx.cleanup() - - e.completed.Done() -} diff --git a/pkg/tcpip/link/sharedmem/sharedmem_test.go b/pkg/tcpip/link/sharedmem/sharedmem_test.go deleted file mode 100644 index 65b9d7085..000000000 --- a/pkg/tcpip/link/sharedmem/sharedmem_test.go +++ /dev/null @@ -1,776 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// +build linux - -package sharedmem - -import ( - "bytes" - "io/ioutil" - "math/rand" - "os" - "strings" - "sync" - "syscall" - "testing" - "time" - - "gvisor.googlesource.com/gvisor/pkg/tcpip" - "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" - "gvisor.googlesource.com/gvisor/pkg/tcpip/header" - "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe" - "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue" - "gvisor.googlesource.com/gvisor/pkg/tcpip/stack" -) - -const ( - localLinkAddr = "\xde\xad\xbe\xef\x56\x78" - remoteLinkAddr = "\xde\xad\xbe\xef\x12\x34" - - queueDataSize = 1024 * 1024 - queuePipeSize = 4096 -) - -type queueBuffers struct { - data []byte - rx pipe.Tx - tx pipe.Rx -} - -func initQueue(t *testing.T, q *queueBuffers, c *QueueConfig) { - // Prepare tx pipe. - b, err := getBuffer(c.TxPipeFD) - if err != nil { - t.Fatalf("getBuffer failed: %v", err) - } - q.tx.Init(b) - - // Prepare rx pipe. - b, err = getBuffer(c.RxPipeFD) - if err != nil { - t.Fatalf("getBuffer failed: %v", err) - } - q.rx.Init(b) - - // Get data slice. - q.data, err = getBuffer(c.DataFD) - if err != nil { - t.Fatalf("getBuffer failed: %v", err) - } -} - -func (q *queueBuffers) cleanup() { - syscall.Munmap(q.tx.Bytes()) - syscall.Munmap(q.rx.Bytes()) - syscall.Munmap(q.data) -} - -type packetInfo struct { - addr tcpip.LinkAddress - proto tcpip.NetworkProtocolNumber - vv buffer.VectorisedView -} - -type testContext struct { - t *testing.T - ep *endpoint - txCfg QueueConfig - rxCfg QueueConfig - txq queueBuffers - rxq queueBuffers - - packetCh chan struct{} - mu sync.Mutex - packets []packetInfo -} - -func newTestContext(t *testing.T, mtu, bufferSize uint32, addr tcpip.LinkAddress) *testContext { - var err error - c := &testContext{ - t: t, - packetCh: make(chan struct{}, 1000000), - } - c.txCfg = createQueueFDs(t, queueSizes{ - dataSize: queueDataSize, - txPipeSize: queuePipeSize, - rxPipeSize: queuePipeSize, - sharedDataSize: 4096, - }) - - c.rxCfg = createQueueFDs(t, queueSizes{ - dataSize: queueDataSize, - txPipeSize: queuePipeSize, - rxPipeSize: queuePipeSize, - sharedDataSize: 4096, - }) - - initQueue(t, &c.txq, &c.txCfg) - initQueue(t, &c.rxq, &c.rxCfg) - - id, err := New(mtu, bufferSize, addr, c.txCfg, c.rxCfg) - if err != nil { - t.Fatalf("New failed: %v", err) - } - - c.ep = stack.FindLinkEndpoint(id).(*endpoint) - c.ep.Attach(c) - - return c -} - -func (c *testContext) DeliverNetworkPacket(_ stack.LinkEndpoint, remoteLinkAddr, localLinkAddr tcpip.LinkAddress, proto tcpip.NetworkProtocolNumber, vv buffer.VectorisedView) { - c.mu.Lock() - c.packets = append(c.packets, packetInfo{ - addr: remoteLinkAddr, - proto: proto, - vv: vv.Clone(nil), - }) - c.mu.Unlock() - - c.packetCh <- struct{}{} -} - -func (c *testContext) cleanup() { - c.ep.Close() - closeFDs(&c.txCfg) - closeFDs(&c.rxCfg) - c.txq.cleanup() - c.rxq.cleanup() -} - -func (c *testContext) waitForPackets(n int, to <-chan time.Time, errorStr string) { - for i := 0; i < n; i++ { - select { - case <-c.packetCh: - case <-to: - c.t.Fatalf(errorStr) - } - } -} - -func (c *testContext) pushRxCompletion(size uint32, bs []queue.RxBuffer) { - b := c.rxq.rx.Push(queue.RxCompletionSize(len(bs))) - queue.EncodeRxCompletion(b, size, 0) - for i := range bs { - queue.EncodeRxCompletionBuffer(b, i, queue.RxBuffer{ - Offset: bs[i].Offset, - Size: bs[i].Size, - ID: bs[i].ID, - }) - } -} - -func randomFill(b []byte) { - for i := range b { - b[i] = byte(rand.Intn(256)) - } -} - -func shuffle(b []int) { - for i := len(b) - 1; i >= 0; i-- { - j := rand.Intn(i + 1) - b[i], b[j] = b[j], b[i] - } -} - -func createFile(t *testing.T, size int64, initQueue bool) int { - tmpDir := os.Getenv("TEST_TMPDIR") - if tmpDir == "" { - tmpDir = os.Getenv("TMPDIR") - } - f, err := ioutil.TempFile(tmpDir, "sharedmem_test") - if err != nil { - t.Fatalf("TempFile failed: %v", err) - } - defer f.Close() - syscall.Unlink(f.Name()) - - if initQueue { - // Write the "slot-free" flag in the initial queue. - _, err := f.WriteAt([]byte{0, 0, 0, 0, 0, 0, 0, 0x80}, 0) - if err != nil { - t.Fatalf("WriteAt failed: %v", err) - } - } - - fd, err := syscall.Dup(int(f.Fd())) - if err != nil { - t.Fatalf("Dup failed: %v", err) - } - - if err := syscall.Ftruncate(fd, size); err != nil { - syscall.Close(fd) - t.Fatalf("Ftruncate failed: %v", err) - } - - return fd -} - -func closeFDs(c *QueueConfig) { - syscall.Close(c.DataFD) - syscall.Close(c.EventFD) - syscall.Close(c.TxPipeFD) - syscall.Close(c.RxPipeFD) - syscall.Close(c.SharedDataFD) -} - -type queueSizes struct { - dataSize int64 - txPipeSize int64 - rxPipeSize int64 - sharedDataSize int64 -} - -func createQueueFDs(t *testing.T, s queueSizes) QueueConfig { - fd, _, err := syscall.RawSyscall(syscall.SYS_EVENTFD2, 0, 0, 0) - if err != 0 { - t.Fatalf("eventfd failed: %v", error(err)) - } - - return QueueConfig{ - EventFD: int(fd), - DataFD: createFile(t, s.dataSize, false), - TxPipeFD: createFile(t, s.txPipeSize, true), - RxPipeFD: createFile(t, s.rxPipeSize, true), - SharedDataFD: createFile(t, s.sharedDataSize, false), - } -} - -// TestSimpleSend sends 1000 packets with random header and payload sizes, -// then checks that the right payload is received on the shared memory queues. -func TestSimpleSend(t *testing.T) { - c := newTestContext(t, 20000, 1500, localLinkAddr) - defer c.cleanup() - - // Prepare route. - r := stack.Route{ - RemoteLinkAddress: remoteLinkAddr, - } - - for iters := 1000; iters > 0; iters-- { - func() { - // Prepare and send packet. - n := rand.Intn(10000) - hdr := buffer.NewPrependable(n + int(c.ep.MaxHeaderLength())) - hdrBuf := hdr.Prepend(n) - randomFill(hdrBuf) - - n = rand.Intn(10000) - buf := buffer.NewView(n) - randomFill(buf) - - proto := tcpip.NetworkProtocolNumber(rand.Intn(0x10000)) - if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), proto); err != nil { - t.Fatalf("WritePacket failed: %v", err) - } - - // Receive packet. - desc := c.txq.tx.Pull() - pi := queue.DecodeTxPacketHeader(desc) - if pi.Reserved != 0 { - t.Fatalf("Reserved value is non-zero: 0x%x", pi.Reserved) - } - contents := make([]byte, 0, pi.Size) - for i := 0; i < pi.BufferCount; i++ { - bi := queue.DecodeTxBufferHeader(desc, i) - contents = append(contents, c.txq.data[bi.Offset:][:bi.Size]...) - } - c.txq.tx.Flush() - - defer func() { - // Tell the endpoint about the completion of the write. - b := c.txq.rx.Push(8) - queue.EncodeTxCompletion(b, pi.ID) - c.txq.rx.Flush() - }() - - // Check the ethernet header. - ethTemplate := make(header.Ethernet, header.EthernetMinimumSize) - ethTemplate.Encode(&header.EthernetFields{ - SrcAddr: localLinkAddr, - DstAddr: remoteLinkAddr, - Type: proto, - }) - if got := contents[:header.EthernetMinimumSize]; !bytes.Equal(got, []byte(ethTemplate)) { - t.Fatalf("Bad ethernet header in packet: got %x, want %x", got, ethTemplate) - } - - // Compare contents skipping the ethernet header added by the - // endpoint. - merged := append(hdrBuf, buf...) - if uint32(len(contents)) < pi.Size { - t.Fatalf("Sum of buffers is less than packet size: %v < %v", len(contents), pi.Size) - } - contents = contents[:pi.Size][header.EthernetMinimumSize:] - - if !bytes.Equal(contents, merged) { - t.Fatalf("Buffers are different: got %x (%v bytes), want %x (%v bytes)", contents, len(contents), merged, len(merged)) - } - }() - } -} - -// TestPreserveSrcAddressInSend calls WritePacket once with LocalLinkAddress -// set in Route (using much of the same code as TestSimpleSend), then checks -// that the encoded ethernet header received includes the correct SrcAddr. -func TestPreserveSrcAddressInSend(t *testing.T) { - c := newTestContext(t, 20000, 1500, localLinkAddr) - defer c.cleanup() - - newLocalLinkAddress := tcpip.LinkAddress(strings.Repeat("0xFE", 6)) - // Set both remote and local link address in route. - r := stack.Route{ - RemoteLinkAddress: remoteLinkAddr, - LocalLinkAddress: newLocalLinkAddress, - } - - // WritePacket panics given a prependable with anything less than - // the minimum size of the ethernet header. - hdr := buffer.NewPrependable(header.EthernetMinimumSize) - - proto := tcpip.NetworkProtocolNumber(rand.Intn(0x10000)) - if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buffer.VectorisedView{}, proto); err != nil { - t.Fatalf("WritePacket failed: %v", err) - } - - // Receive packet. - desc := c.txq.tx.Pull() - pi := queue.DecodeTxPacketHeader(desc) - if pi.Reserved != 0 { - t.Fatalf("Reserved value is non-zero: 0x%x", pi.Reserved) - } - contents := make([]byte, 0, pi.Size) - for i := 0; i < pi.BufferCount; i++ { - bi := queue.DecodeTxBufferHeader(desc, i) - contents = append(contents, c.txq.data[bi.Offset:][:bi.Size]...) - } - c.txq.tx.Flush() - - defer func() { - // Tell the endpoint about the completion of the write. - b := c.txq.rx.Push(8) - queue.EncodeTxCompletion(b, pi.ID) - c.txq.rx.Flush() - }() - - // Check that the ethernet header contains the expected SrcAddr. - ethTemplate := make(header.Ethernet, header.EthernetMinimumSize) - ethTemplate.Encode(&header.EthernetFields{ - SrcAddr: newLocalLinkAddress, - DstAddr: remoteLinkAddr, - Type: proto, - }) - if got := contents[:header.EthernetMinimumSize]; !bytes.Equal(got, []byte(ethTemplate)) { - t.Fatalf("Bad ethernet header in packet: got %x, want %x", got, ethTemplate) - } -} - -// TestFillTxQueue sends packets until the queue is full. -func TestFillTxQueue(t *testing.T) { - c := newTestContext(t, 20000, 1500, localLinkAddr) - defer c.cleanup() - - // Prepare to send a packet. - r := stack.Route{ - RemoteLinkAddress: remoteLinkAddr, - } - - buf := buffer.NewView(100) - - // Each packet is uses no more than 40 bytes, so write that many packets - // until the tx queue if full. - ids := make(map[uint64]struct{}) - for i := queuePipeSize / 40; i > 0; i-- { - hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) - - if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil { - t.Fatalf("WritePacket failed unexpectedly: %v", err) - } - - // Check that they have different IDs. - desc := c.txq.tx.Pull() - pi := queue.DecodeTxPacketHeader(desc) - if _, ok := ids[pi.ID]; ok { - t.Fatalf("ID (%v) reused", pi.ID) - } - ids[pi.ID] = struct{}{} - } - - // Next attempt to write must fail. - hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) - if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != want { - t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want) - } -} - -// TestFillTxQueueAfterBadCompletion sends a bad completion, then sends packets -// until the queue is full. -func TestFillTxQueueAfterBadCompletion(t *testing.T) { - c := newTestContext(t, 20000, 1500, localLinkAddr) - defer c.cleanup() - - // Send a bad completion. - queue.EncodeTxCompletion(c.txq.rx.Push(8), 1) - c.txq.rx.Flush() - - // Prepare to send a packet. - r := stack.Route{ - RemoteLinkAddress: remoteLinkAddr, - } - - buf := buffer.NewView(100) - - // Send two packets so that the id slice has at least two slots. - for i := 2; i > 0; i-- { - hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) - if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil { - t.Fatalf("WritePacket failed unexpectedly: %v", err) - } - } - - // Complete the two writes twice. - for i := 2; i > 0; i-- { - pi := queue.DecodeTxPacketHeader(c.txq.tx.Pull()) - - queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID) - queue.EncodeTxCompletion(c.txq.rx.Push(8), pi.ID) - c.txq.rx.Flush() - } - c.txq.tx.Flush() - - // Each packet is uses no more than 40 bytes, so write that many packets - // until the tx queue if full. - ids := make(map[uint64]struct{}) - for i := queuePipeSize / 40; i > 0; i-- { - hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) - if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil { - t.Fatalf("WritePacket failed unexpectedly: %v", err) - } - - // Check that they have different IDs. - desc := c.txq.tx.Pull() - pi := queue.DecodeTxPacketHeader(desc) - if _, ok := ids[pi.ID]; ok { - t.Fatalf("ID (%v) reused", pi.ID) - } - ids[pi.ID] = struct{}{} - } - - // Next attempt to write must fail. - hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) - if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != want { - t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want) - } -} - -// TestFillTxMemory sends packets until the we run out of shared memory. -func TestFillTxMemory(t *testing.T) { - const bufferSize = 1500 - c := newTestContext(t, 20000, bufferSize, localLinkAddr) - defer c.cleanup() - - // Prepare to send a packet. - r := stack.Route{ - RemoteLinkAddress: remoteLinkAddr, - } - - buf := buffer.NewView(100) - - // Each packet is uses up one buffer, so write as many as possible until - // we fill the memory. - ids := make(map[uint64]struct{}) - for i := queueDataSize / bufferSize; i > 0; i-- { - hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) - if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil { - t.Fatalf("WritePacket failed unexpectedly: %v", err) - } - - // Check that they have different IDs. - desc := c.txq.tx.Pull() - pi := queue.DecodeTxPacketHeader(desc) - if _, ok := ids[pi.ID]; ok { - t.Fatalf("ID (%v) reused", pi.ID) - } - ids[pi.ID] = struct{}{} - c.txq.tx.Flush() - } - - // Next attempt to write must fail. - hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) - err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber) - if want := tcpip.ErrWouldBlock; err != want { - t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want) - } -} - -// TestFillTxMemoryWithMultiBuffer sends packets until the we run out of -// shared memory for a 2-buffer packet, but still with room for a 1-buffer -// packet. -func TestFillTxMemoryWithMultiBuffer(t *testing.T) { - const bufferSize = 1500 - c := newTestContext(t, 20000, bufferSize, localLinkAddr) - defer c.cleanup() - - // Prepare to send a packet. - r := stack.Route{ - RemoteLinkAddress: remoteLinkAddr, - } - - buf := buffer.NewView(100) - - // Each packet is uses up one buffer, so write as many as possible - // until there is only one buffer left. - for i := queueDataSize/bufferSize - 1; i > 0; i-- { - hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) - if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil { - t.Fatalf("WritePacket failed unexpectedly: %v", err) - } - - // Pull the posted buffer. - c.txq.tx.Pull() - c.txq.tx.Flush() - } - - // Attempt to write a two-buffer packet. It must fail. - { - hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) - uu := buffer.NewView(bufferSize).ToVectorisedView() - if want, err := tcpip.ErrWouldBlock, c.ep.WritePacket(&r, nil /* gso */, hdr, uu, header.IPv4ProtocolNumber); err != want { - t.Fatalf("WritePacket return unexpected result: got %v, want %v", err, want) - } - } - - // Attempt to write the one-buffer packet again. It must succeed. - { - hdr := buffer.NewPrependable(int(c.ep.MaxHeaderLength())) - if err := c.ep.WritePacket(&r, nil /* gso */, hdr, buf.ToVectorisedView(), header.IPv4ProtocolNumber); err != nil { - t.Fatalf("WritePacket failed unexpectedly: %v", err) - } - } -} - -func pollPull(t *testing.T, p *pipe.Rx, to <-chan time.Time, errStr string) []byte { - t.Helper() - - for { - b := p.Pull() - if b != nil { - return b - } - - select { - case <-time.After(10 * time.Millisecond): - case <-to: - t.Fatal(errStr) - } - } -} - -// TestSimpleReceive completes 1000 different receives with random payload and -// random number of buffers. It checks that the contents match the expected -// values. -func TestSimpleReceive(t *testing.T) { - const bufferSize = 1500 - c := newTestContext(t, 20000, bufferSize, localLinkAddr) - defer c.cleanup() - - // Check that buffers have been posted. - limit := c.ep.rx.q.PostedBuffersLimit() - for i := uint64(0); i < limit; i++ { - timeout := time.After(2 * time.Second) - bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers to be posted")) - - if want := i * bufferSize; want != bi.Offset { - t.Fatalf("Bad posted offset: got %v, want %v", bi.Offset, want) - } - - if want := i; want != bi.ID { - t.Fatalf("Bad posted ID: got %v, want %v", bi.ID, want) - } - - if bufferSize != bi.Size { - t.Fatalf("Bad posted bufferSize: got %v, want %v", bi.Size, bufferSize) - } - } - c.rxq.tx.Flush() - - // Create a slice with the indices 0..limit-1. - idx := make([]int, limit) - for i := range idx { - idx[i] = i - } - - // Complete random packets 1000 times. - for iters := 1000; iters > 0; iters-- { - timeout := time.After(2 * time.Second) - // Prepare a random packet. - shuffle(idx) - n := 1 + rand.Intn(10) - bufs := make([]queue.RxBuffer, n) - contents := make([]byte, bufferSize*n-rand.Intn(500)) - randomFill(contents) - for i := range bufs { - j := idx[i] - bufs[i].Size = bufferSize - bufs[i].Offset = uint64(bufferSize * j) - bufs[i].ID = uint64(j) - - copy(c.rxq.data[bufs[i].Offset:][:bufferSize], contents[i*bufferSize:]) - } - - // Push completion. - c.pushRxCompletion(uint32(len(contents)), bufs) - c.rxq.rx.Flush() - syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) - - // Wait for packet to be received, then check it. - c.waitForPackets(1, time.After(time.Second), "Error waiting for packet") - c.mu.Lock() - rcvd := []byte(c.packets[0].vv.First()) - c.packets = c.packets[:0] - c.mu.Unlock() - - if contents := contents[header.EthernetMinimumSize:]; !bytes.Equal(contents, rcvd) { - t.Fatalf("Unexpected buffer contents: got %x, want %x", rcvd, contents) - } - - // Check that buffers have been reposted. - for i := range bufs { - bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffers to be reposted")) - if bi != bufs[i] { - t.Fatalf("Unexpected buffer reposted: got %x, want %x", bi, bufs[i]) - } - } - c.rxq.tx.Flush() - } -} - -// TestRxBuffersReposted tests that rx buffers get reposted after they have been -// completed. -func TestRxBuffersReposted(t *testing.T) { - const bufferSize = 1500 - c := newTestContext(t, 20000, bufferSize, localLinkAddr) - defer c.cleanup() - - // Receive all posted buffers. - limit := c.ep.rx.q.PostedBuffersLimit() - buffers := make([]queue.RxBuffer, 0, limit) - for i := limit; i > 0; i-- { - timeout := time.After(2 * time.Second) - buffers = append(buffers, queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for all buffers"))) - } - c.rxq.tx.Flush() - - // Check that all buffers are reposted when individually completed. - for i := range buffers { - timeout := time.After(2 * time.Second) - // Complete the buffer. - c.pushRxCompletion(buffers[i].Size, buffers[i:][:1]) - c.rxq.rx.Flush() - syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) - - // Wait for it to be reposted. - bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted")) - if bi != buffers[i] { - t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[i]) - } - } - c.rxq.tx.Flush() - - // Check that all buffers are reposted when completed in pairs. - for i := 0; i < len(buffers)/2; i++ { - timeout := time.After(2 * time.Second) - // Complete with two buffers. - c.pushRxCompletion(2*bufferSize, buffers[2*i:][:2]) - c.rxq.rx.Flush() - syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) - - // Wait for them to be reposted. - for j := 0; j < 2; j++ { - bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, timeout, "Timeout waiting for buffer to be reposted")) - if bi != buffers[2*i+j] { - t.Fatalf("Different buffer posted: got %v, want %v", bi, buffers[2*i+j]) - } - } - } - c.rxq.tx.Flush() -} - -// TestReceivePostingIsFull checks that the endpoint will properly handle the -// case when a received buffer cannot be immediately reposted because it hasn't -// been pulled from the tx pipe yet. -func TestReceivePostingIsFull(t *testing.T) { - const bufferSize = 1500 - c := newTestContext(t, 20000, bufferSize, localLinkAddr) - defer c.cleanup() - - // Complete first posted buffer before flushing it from the tx pipe. - first := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for first buffer to be posted")) - c.pushRxCompletion(first.Size, []queue.RxBuffer{first}) - c.rxq.rx.Flush() - syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) - - // Check that packet is received. - c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet") - - // Complete another buffer. - second := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for second buffer to be posted")) - c.pushRxCompletion(second.Size, []queue.RxBuffer{second}) - c.rxq.rx.Flush() - syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) - - // Check that no packet is received yet, as the worker is blocked trying - // to repost. - select { - case <-time.After(500 * time.Millisecond): - case <-c.packetCh: - t.Fatalf("Unexpected packet received") - } - - // Flush tx queue, which will allow the first buffer to be reposted, - // and the second completion to be pulled. - c.rxq.tx.Flush() - syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) - - // Check that second packet completes. - c.waitForPackets(1, time.After(time.Second), "Timeout waiting for second completed packet") -} - -// TestCloseWhileWaitingToPost closes the endpoint while it is waiting to -// repost a buffer. Make sure it backs out. -func TestCloseWhileWaitingToPost(t *testing.T) { - const bufferSize = 1500 - c := newTestContext(t, 20000, bufferSize, localLinkAddr) - cleaned := false - defer func() { - if !cleaned { - c.cleanup() - } - }() - - // Complete first posted buffer before flushing it from the tx pipe. - bi := queue.DecodeRxBufferHeader(pollPull(t, &c.rxq.tx, time.After(time.Second), "Timeout waiting for initial buffer to be posted")) - c.pushRxCompletion(bi.Size, []queue.RxBuffer{bi}) - c.rxq.rx.Flush() - syscall.Write(c.rxCfg.EventFD, []byte{1, 0, 0, 0, 0, 0, 0, 0}) - - // Wait for packet to be indicated. - c.waitForPackets(1, time.After(time.Second), "Timeout waiting for completed packet") - - // Cleanup and wait for worker to complete. - c.cleanup() - cleaned = true - c.ep.Wait() -} diff --git a/pkg/tcpip/link/sharedmem/sharedmem_unsafe.go b/pkg/tcpip/link/sharedmem/sharedmem_unsafe.go deleted file mode 100644 index f7e816a41..000000000 --- a/pkg/tcpip/link/sharedmem/sharedmem_unsafe.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sharedmem - -import ( - "unsafe" -) - -// sharedDataPointer converts the shared data slice into a pointer so that it -// can be used in atomic operations. -func sharedDataPointer(sharedData []byte) *uint32 { - return (*uint32)(unsafe.Pointer(&sharedData[0:4][0])) -} diff --git a/pkg/tcpip/link/sharedmem/tx.go b/pkg/tcpip/link/sharedmem/tx.go deleted file mode 100644 index ac3577aa6..000000000 --- a/pkg/tcpip/link/sharedmem/tx.go +++ /dev/null @@ -1,272 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sharedmem - -import ( - "math" - "syscall" - - "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/queue" -) - -const ( - nilID = math.MaxUint64 -) - -// tx holds all state associated with a tx queue. -type tx struct { - data []byte - q queue.Tx - ids idManager - bufs bufferManager -} - -// init initializes all state needed by the tx queue based on the information -// provided. -// -// The caller always retains ownership of all file descriptors passed in. The -// queue implementation will duplicate any that it may need in the future. -func (t *tx) init(mtu uint32, c *QueueConfig) error { - // Map in all buffers. - txPipe, err := getBuffer(c.TxPipeFD) - if err != nil { - return err - } - - rxPipe, err := getBuffer(c.RxPipeFD) - if err != nil { - syscall.Munmap(txPipe) - return err - } - - data, err := getBuffer(c.DataFD) - if err != nil { - syscall.Munmap(txPipe) - syscall.Munmap(rxPipe) - return err - } - - // Initialize state based on buffers. - t.q.Init(txPipe, rxPipe) - t.ids.init() - t.bufs.init(0, len(data), int(mtu)) - t.data = data - - return nil -} - -// cleanup releases all resources allocated during init(). It must only be -// called if init() has previously succeeded. -func (t *tx) cleanup() { - a, b := t.q.Bytes() - syscall.Munmap(a) - syscall.Munmap(b) - syscall.Munmap(t.data) -} - -// transmit sends a packet made up of up to two buffers. Returns a boolean that -// specifies whether the packet was successfully transmitted. -func (t *tx) transmit(a, b []byte) bool { - // Pull completions from the tx queue and add their buffers back to the - // pool so that we can reuse them. - for { - id, ok := t.q.CompletedPacket() - if !ok { - break - } - - if buf := t.ids.remove(id); buf != nil { - t.bufs.free(buf) - } - } - - bSize := t.bufs.entrySize - total := uint32(len(a) + len(b)) - bufCount := (total + bSize - 1) / bSize - - // Allocate enough buffers to hold all the data. - var buf *queue.TxBuffer - for i := bufCount; i != 0; i-- { - b := t.bufs.alloc() - if b == nil { - // Failed to get all buffers. Return to the pool - // whatever we had managed to get. - if buf != nil { - t.bufs.free(buf) - } - return false - } - b.Next = buf - buf = b - } - - // Copy data into allocated buffers. - nBuf := buf - var dBuf []byte - for _, data := range [][]byte{a, b} { - for len(data) > 0 { - if len(dBuf) == 0 { - dBuf = t.data[nBuf.Offset:][:nBuf.Size] - nBuf = nBuf.Next - } - n := copy(dBuf, data) - data = data[n:] - dBuf = dBuf[n:] - } - } - - // Get an id for this packet and send it out. - id := t.ids.add(buf) - if !t.q.Enqueue(id, total, bufCount, buf) { - t.ids.remove(id) - t.bufs.free(buf) - return false - } - - return true -} - -// getBuffer returns a memory region mapped to the full contents of the given -// file descriptor. -func getBuffer(fd int) ([]byte, error) { - var s syscall.Stat_t - if err := syscall.Fstat(fd, &s); err != nil { - return nil, err - } - - // Check that size doesn't overflow an int. - if s.Size > int64(^uint(0)>>1) { - return nil, syscall.EDOM - } - - return syscall.Mmap(fd, 0, int(s.Size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED|syscall.MAP_FILE) -} - -// idDescriptor is used by idManager to either point to a tx buffer (in case -// the ID is assigned) or to the next free element (if the id is not assigned). -type idDescriptor struct { - buf *queue.TxBuffer - nextFree uint64 -} - -// idManager is a manager of tx buffer identifiers. It assigns unique IDs to -// tx buffers that are added to it; the IDs can only be reused after they have -// been removed. -// -// The ID assignments are stored so that the tx buffers can be retrieved from -// the IDs previously assigned to them. -type idManager struct { - // ids is a slice containing all tx buffers. The ID is the index into - // this slice. - ids []idDescriptor - - // freeList a list of free IDs. - freeList uint64 -} - -// init initializes the id manager. -func (m *idManager) init() { - m.freeList = nilID -} - -// add assigns an ID to the given tx buffer. -func (m *idManager) add(b *queue.TxBuffer) uint64 { - if i := m.freeList; i != nilID { - // There is an id available in the free list, just use it. - m.ids[i].buf = b - m.freeList = m.ids[i].nextFree - return i - } - - // We need to expand the id descriptor. - m.ids = append(m.ids, idDescriptor{buf: b}) - return uint64(len(m.ids) - 1) -} - -// remove retrieves the tx buffer associated with the given ID, and removes the -// ID from the assigned table so that it can be reused in the future. -func (m *idManager) remove(i uint64) *queue.TxBuffer { - if i >= uint64(len(m.ids)) { - return nil - } - - desc := &m.ids[i] - b := desc.buf - if b == nil { - // The provided id is not currently assigned. - return nil - } - - desc.buf = nil - desc.nextFree = m.freeList - m.freeList = i - - return b -} - -// bufferManager manages a buffer region broken up into smaller, equally sized -// buffers. Smaller buffers can be allocated and freed. -type bufferManager struct { - freeList *queue.TxBuffer - curOffset uint64 - limit uint64 - entrySize uint32 -} - -// init initializes the buffer manager. -func (b *bufferManager) init(initialOffset, size, entrySize int) { - b.freeList = nil - b.curOffset = uint64(initialOffset) - b.limit = uint64(initialOffset + size/entrySize*entrySize) - b.entrySize = uint32(entrySize) -} - -// alloc allocates a buffer from the manager, if one is available. -func (b *bufferManager) alloc() *queue.TxBuffer { - if b.freeList != nil { - // There is a descriptor ready for reuse in the free list. - d := b.freeList - b.freeList = d.Next - d.Next = nil - return d - } - - if b.curOffset < b.limit { - // There is room available in the never-used range, so create - // a new descriptor for it. - d := &queue.TxBuffer{ - Offset: b.curOffset, - Size: b.entrySize, - } - b.curOffset += uint64(b.entrySize) - return d - } - - return nil -} - -// free returns all buffers in the list to the buffer manager so that they can -// be reused. -func (b *bufferManager) free(d *queue.TxBuffer) { - // Find the last buffer in the list. - last := d - for last.Next != nil { - last = last.Next - } - - // Push list onto free list. - last.Next = b.freeList - b.freeList = d -} diff --git a/pkg/tcpip/link/sniffer/BUILD b/pkg/tcpip/link/sniffer/BUILD deleted file mode 100644 index 7d0d1781e..000000000 --- a/pkg/tcpip/link/sniffer/BUILD +++ /dev/null @@ -1,22 +0,0 @@ -load("//tools/go_stateify:defs.bzl", "go_library") - -package(licenses = ["notice"]) - -go_library( - name = "sniffer", - srcs = [ - "pcap.go", - "sniffer.go", - ], - importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sniffer", - visibility = [ - "//visibility:public", - ], - deps = [ - "//pkg/log", - "//pkg/tcpip", - "//pkg/tcpip/buffer", - "//pkg/tcpip/header", - "//pkg/tcpip/stack", - ], -) diff --git a/pkg/tcpip/link/sniffer/sniffer_state_autogen.go b/pkg/tcpip/link/sniffer/sniffer_state_autogen.go new file mode 100755 index 000000000..cfd84a739 --- /dev/null +++ b/pkg/tcpip/link/sniffer/sniffer_state_autogen.go @@ -0,0 +1,4 @@ +// automatically generated by stateify. + +package sniffer + diff --git a/pkg/tcpip/link/tun/BUILD b/pkg/tcpip/link/tun/BUILD deleted file mode 100644 index e54852d3f..000000000 --- a/pkg/tcpip/link/tun/BUILD +++ /dev/null @@ -1,12 +0,0 @@ -load("//tools/go_stateify:defs.bzl", "go_library") - -package(licenses = ["notice"]) - -go_library( - name = "tun", - srcs = ["tun_unsafe.go"], - importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/tun", - visibility = [ - "//visibility:public", - ], -) diff --git a/pkg/tcpip/link/tun/tun_unsafe.go b/pkg/tcpip/link/tun/tun_unsafe.go deleted file mode 100644 index 09ca9b527..000000000 --- a/pkg/tcpip/link/tun/tun_unsafe.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// +build linux - -// Package tun contains methods to open TAP and TUN devices. -package tun - -import ( - "syscall" - "unsafe" -) - -// Open opens the specified TUN device, sets it to non-blocking mode, and -// returns its file descriptor. -func Open(name string) (int, error) { - return open(name, syscall.IFF_TUN|syscall.IFF_NO_PI) -} - -// OpenTAP opens the specified TAP device, sets it to non-blocking mode, and -// returns its file descriptor. -func OpenTAP(name string) (int, error) { - return open(name, syscall.IFF_TAP|syscall.IFF_NO_PI) -} - -func open(name string, flags uint16) (int, error) { - fd, err := syscall.Open("/dev/net/tun", syscall.O_RDWR, 0) - if err != nil { - return -1, err - } - - var ifr struct { - name [16]byte - flags uint16 - _ [22]byte - } - - copy(ifr.name[:], name) - ifr.flags = flags - _, _, errno := syscall.Syscall(syscall.SYS_IOCTL, uintptr(fd), syscall.TUNSETIFF, uintptr(unsafe.Pointer(&ifr))) - if errno != 0 { - syscall.Close(fd) - return -1, errno - } - - if err = syscall.SetNonblock(fd, true); err != nil { - syscall.Close(fd) - return -1, err - } - - return fd, nil -} diff --git a/pkg/tcpip/link/waitable/BUILD b/pkg/tcpip/link/waitable/BUILD deleted file mode 100644 index 89a9eee23..000000000 --- a/pkg/tcpip/link/waitable/BUILD +++ /dev/null @@ -1,33 +0,0 @@ -load("//tools/go_stateify:defs.bzl", "go_library", "go_test") - -package(licenses = ["notice"]) - -go_library( - name = "waitable", - srcs = [ - "waitable.go", - ], - importpath = "gvisor.googlesource.com/gvisor/pkg/tcpip/link/waitable", - visibility = [ - "//visibility:public", - ], - deps = [ - "//pkg/gate", - "//pkg/tcpip", - "//pkg/tcpip/buffer", - "//pkg/tcpip/stack", - ], -) - -go_test( - name = "waitable_test", - srcs = [ - "waitable_test.go", - ], - embed = [":waitable"], - deps = [ - "//pkg/tcpip", - "//pkg/tcpip/buffer", - "//pkg/tcpip/stack", - ], -) diff --git a/pkg/tcpip/link/waitable/waitable.go b/pkg/tcpip/link/waitable/waitable.go deleted file mode 100644 index 21690a226..000000000 --- a/pkg/tcpip/link/waitable/waitable.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package waitable provides the implementation of data-link layer endpoints -// that wrap other endpoints, and can wait for inflight calls to WritePacket or -// DeliverNetworkPacket to finish (and new ones to be prevented). -// -// Waitable endpoints can be used in the networking stack by calling New(eID) to -// create a new endpoint, where eID is the ID of the endpoint being wrapped, -// and then passing it as an argument to Stack.CreateNIC(). -package waitable - -import ( - "gvisor.googlesource.com/gvisor/pkg/gate" - "gvisor.googlesource.com/gvisor/pkg/tcpip" - "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" - "gvisor.googlesource.com/gvisor/pkg/tcpip/stack" -) - -// Endpoint is a waitable link-layer endpoint. -type Endpoint struct { - dispatchGate gate.Gate - dispatcher stack.NetworkDispatcher - - writeGate gate.Gate - lower stack.LinkEndpoint -} - -// New creates a new waitable link-layer endpoint. It wraps around another -// endpoint and allows the caller to block new write/dispatch calls and wait for -// the inflight ones to finish before returning. -func New(lower tcpip.LinkEndpointID) (tcpip.LinkEndpointID, *Endpoint) { - e := &Endpoint{ - lower: stack.FindLinkEndpoint(lower), - } - return stack.RegisterLinkEndpoint(e), e -} - -// DeliverNetworkPacket implements stack.NetworkDispatcher.DeliverNetworkPacket. -// It is called by the link-layer endpoint being wrapped when a packet arrives, -// and only forwards to the actual dispatcher if Wait or WaitDispatch haven't -// been called. -func (e *Endpoint) DeliverNetworkPacket(linkEP stack.LinkEndpoint, remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, vv buffer.VectorisedView) { - if !e.dispatchGate.Enter() { - return - } - - e.dispatcher.DeliverNetworkPacket(e, remote, local, protocol, vv) - e.dispatchGate.Leave() -} - -// Attach implements stack.LinkEndpoint.Attach. It saves the dispatcher and -// registers with the lower endpoint as its dispatcher so that "e" is called -// for inbound packets. -func (e *Endpoint) Attach(dispatcher stack.NetworkDispatcher) { - e.dispatcher = dispatcher - e.lower.Attach(e) -} - -// IsAttached implements stack.LinkEndpoint.IsAttached. -func (e *Endpoint) IsAttached() bool { - return e.dispatcher != nil -} - -// MTU implements stack.LinkEndpoint.MTU. It just forwards the request to the -// lower endpoint. -func (e *Endpoint) MTU() uint32 { - return e.lower.MTU() -} - -// Capabilities implements stack.LinkEndpoint.Capabilities. It just forwards the -// request to the lower endpoint. -func (e *Endpoint) Capabilities() stack.LinkEndpointCapabilities { - return e.lower.Capabilities() -} - -// MaxHeaderLength implements stack.LinkEndpoint.MaxHeaderLength. It just -// forwards the request to the lower endpoint. -func (e *Endpoint) MaxHeaderLength() uint16 { - return e.lower.MaxHeaderLength() -} - -// LinkAddress implements stack.LinkEndpoint.LinkAddress. It just forwards the -// request to the lower endpoint. -func (e *Endpoint) LinkAddress() tcpip.LinkAddress { - return e.lower.LinkAddress() -} - -// WritePacket implements stack.LinkEndpoint.WritePacket. It is called by -// higher-level protocols to write packets. It only forwards packets to the -// lower endpoint if Wait or WaitWrite haven't been called. -func (e *Endpoint) WritePacket(r *stack.Route, gso *stack.GSO, hdr buffer.Prependable, payload buffer.VectorisedView, protocol tcpip.NetworkProtocolNumber) *tcpip.Error { - if !e.writeGate.Enter() { - return nil - } - - err := e.lower.WritePacket(r, gso, hdr, payload, protocol) - e.writeGate.Leave() - return err -} - -// WaitWrite prevents new calls to WritePacket from reaching the lower endpoint, -// and waits for inflight ones to finish before returning. -func (e *Endpoint) WaitWrite() { - e.writeGate.Close() -} - -// WaitDispatch prevents new calls to DeliverNetworkPacket from reaching the -// actual dispatcher, and waits for inflight ones to finish before returning. -func (e *Endpoint) WaitDispatch() { - e.dispatchGate.Close() -} diff --git a/pkg/tcpip/link/waitable/waitable_test.go b/pkg/tcpip/link/waitable/waitable_test.go deleted file mode 100644 index 62054fb7f..000000000 --- a/pkg/tcpip/link/waitable/waitable_test.go +++ /dev/null @@ -1,159 +0,0 @@ -// Copyright 2018 The gVisor Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package waitable - -import ( - "testing" - - "gvisor.googlesource.com/gvisor/pkg/tcpip" - "gvisor.googlesource.com/gvisor/pkg/tcpip/buffer" - "gvisor.googlesource.com/gvisor/pkg/tcpip/stack" -) - -type countedEndpoint struct { - dispatchCount int - writeCount int - attachCount int - - mtu uint32 - capabilities stack.LinkEndpointCapabilities - hdrLen uint16 - linkAddr tcpip.LinkAddress - - dispatcher stack.NetworkDispatcher -} - -func (e *countedEndpoint) DeliverNetworkPacket(linkEP stack.LinkEndpoint, remote, local tcpip.LinkAddress, protocol tcpip.NetworkProtocolNumber, vv buffer.VectorisedView) { - e.dispatchCount++ -} - -func (e *countedEndpoint) Attach(dispatcher stack.NetworkDispatcher) { - e.attachCount++ - e.dispatcher = dispatcher -} - -// IsAttached implements stack.LinkEndpoint.IsAttached. -func (e *countedEndpoint) IsAttached() bool { - return e.dispatcher != nil -} - -func (e *countedEndpoint) MTU() uint32 { - return e.mtu -} - -func (e *countedEndpoint) Capabilities() stack.LinkEndpointCapabilities { - return e.capabilities -} - -func (e *countedEndpoint) MaxHeaderLength() uint16 { - return e.hdrLen -} - -func (e *countedEndpoint) LinkAddress() tcpip.LinkAddress { - return e.linkAddr -} - -func (e *countedEndpoint) WritePacket(r *stack.Route, _ *stack.GSO, hdr buffer.Prependable, payload buffer.VectorisedView, protocol tcpip.NetworkProtocolNumber) *tcpip.Error { - e.writeCount++ - return nil -} - -func TestWaitWrite(t *testing.T) { - ep := &countedEndpoint{} - _, wep := New(stack.RegisterLinkEndpoint(ep)) - - // Write and check that it goes through. - wep.WritePacket(nil, nil /* gso */, buffer.Prependable{}, buffer.VectorisedView{}, 0) - if want := 1; ep.writeCount != want { - t.Fatalf("Unexpected writeCount: got=%v, want=%v", ep.writeCount, want) - } - - // Wait on dispatches, then try to write. It must go through. - wep.WaitDispatch() - wep.WritePacket(nil, nil /* gso */, buffer.Prependable{}, buffer.VectorisedView{}, 0) - if want := 2; ep.writeCount != want { - t.Fatalf("Unexpected writeCount: got=%v, want=%v", ep.writeCount, want) - } - - // Wait on writes, then try to write. It must not go through. - wep.WaitWrite() - wep.WritePacket(nil, nil /* gso */, buffer.Prependable{}, buffer.VectorisedView{}, 0) - if want := 2; ep.writeCount != want { - t.Fatalf("Unexpected writeCount: got=%v, want=%v", ep.writeCount, want) - } -} - -func TestWaitDispatch(t *testing.T) { - ep := &countedEndpoint{} - _, wep := New(stack.RegisterLinkEndpoint(ep)) - - // Check that attach happens. - wep.Attach(ep) - if want := 1; ep.attachCount != want { - t.Fatalf("Unexpected attachCount: got=%v, want=%v", ep.attachCount, want) - } - - // Dispatch and check that it goes through. - ep.dispatcher.DeliverNetworkPacket(ep, "", "", 0, buffer.VectorisedView{}) - if want := 1; ep.dispatchCount != want { - t.Fatalf("Unexpected dispatchCount: got=%v, want=%v", ep.dispatchCount, want) - } - - // Wait on writes, then try to dispatch. It must go through. - wep.WaitWrite() - ep.dispatcher.DeliverNetworkPacket(ep, "", "", 0, buffer.VectorisedView{}) - if want := 2; ep.dispatchCount != want { - t.Fatalf("Unexpected dispatchCount: got=%v, want=%v", ep.dispatchCount, want) - } - - // Wait on dispatches, then try to dispatch. It must not go through. - wep.WaitDispatch() - ep.dispatcher.DeliverNetworkPacket(ep, "", "", 0, buffer.VectorisedView{}) - if want := 2; ep.dispatchCount != want { - t.Fatalf("Unexpected dispatchCount: got=%v, want=%v", ep.dispatchCount, want) - } -} - -func TestOtherMethods(t *testing.T) { - const ( - mtu = 0xdead - capabilities = 0xbeef - hdrLen = 0x1234 - linkAddr = "test address" - ) - ep := &countedEndpoint{ - mtu: mtu, - capabilities: capabilities, - hdrLen: hdrLen, - linkAddr: linkAddr, - } - _, wep := New(stack.RegisterLinkEndpoint(ep)) - - if v := wep.MTU(); v != mtu { - t.Fatalf("Unexpected mtu: got=%v, want=%v", v, mtu) - } - - if v := wep.Capabilities(); v != capabilities { - t.Fatalf("Unexpected capabilities: got=%v, want=%v", v, capabilities) - } - - if v := wep.MaxHeaderLength(); v != hdrLen { - t.Fatalf("Unexpected MaxHeaderLength: got=%v, want=%v", v, hdrLen) - } - - if v := wep.LinkAddress(); v != linkAddr { - t.Fatalf("Unexpected LinkAddress: got=%q, want=%q", v, linkAddr) - } -} |