summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--pkg/tcpip/link/fdbased/endpoint.go16
-rw-r--r--pkg/tcpip/link/fdbased/mmap.go4
-rw-r--r--pkg/tcpip/link/fdbased/packet_dispatchers.go21
-rw-r--r--pkg/tcpip/link/rawfile/rawfile_unsafe.go31
4 files changed, 64 insertions, 8 deletions
diff --git a/pkg/tcpip/link/fdbased/endpoint.go b/pkg/tcpip/link/fdbased/endpoint.go
index 2bb1be5d6..01d1ad3c8 100644
--- a/pkg/tcpip/link/fdbased/endpoint.go
+++ b/pkg/tcpip/link/fdbased/endpoint.go
@@ -57,6 +57,7 @@ import (
// NetworkDispatcher.
type linkDispatcher interface {
dispatch() (bool, tcpip.Error)
+ Cancel()
}
// PacketDispatchMode are the various supported methods of receiving and
@@ -322,9 +323,24 @@ func isSocketFD(fd int) (bool, error) {
return (stat.Mode & unix.S_IFSOCK) == unix.S_IFSOCK, nil
}
+func (e *endpoint) detach() {
+ for _, d := range e.inboundDispatchers {
+ d.Cancel()
+ }
+ e.Wait()
+}
+
// Attach launches the goroutine that reads packets from the file descriptor and
// dispatches them via the provided dispatcher.
func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
+ if (e.dispatcher != nil) {
+ e.detach()
+ }
+
+ if (dispatcher == nil) {
+ return
+ }
+
e.dispatcher = dispatcher
// Link endpoints are not savable. When transportation endpoints are
// saved, they stop sending outgoing packets and all incoming packets
diff --git a/pkg/tcpip/link/fdbased/mmap.go b/pkg/tcpip/link/fdbased/mmap.go
index 5d698a5e9..3c95143c9 100644
--- a/pkg/tcpip/link/fdbased/mmap.go
+++ b/pkg/tcpip/link/fdbased/mmap.go
@@ -160,6 +160,10 @@ func (d *packetMMapDispatcher) readMMappedPacket() ([]byte, tcpip.Error) {
return pkt, nil
}
+func (d *packetMMapDispatcher) Cancel() {
+ // TODO
+}
+
// dispatch reads packets from an mmaped ring buffer and dispatches them to the
// network stack.
func (d *packetMMapDispatcher) dispatch() (bool, tcpip.Error) {
diff --git a/pkg/tcpip/link/fdbased/packet_dispatchers.go b/pkg/tcpip/link/fdbased/packet_dispatchers.go
index a7adf822b..9deee81ac 100644
--- a/pkg/tcpip/link/fdbased/packet_dispatchers.go
+++ b/pkg/tcpip/link/fdbased/packet_dispatchers.go
@@ -17,6 +17,8 @@
package fdbased
import (
+ "os"
+
"golang.org/x/sys/unix"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/buffer"
@@ -124,18 +126,29 @@ type readVDispatcher struct {
// buf is the iovec buffer that contains the packet contents.
buf *iovecBuffer
+
+ cancelRead *os.File
+ cancelWrite *os.File
}
func newReadVDispatcher(fd int, e *endpoint) (linkDispatcher, error) {
- d := &readVDispatcher{fd: fd, e: e}
+ cancelRead, cancelWrite, err := os.Pipe()
+ if err != nil {
+ return nil, err
+ }
+ d := &readVDispatcher{fd: fd, e: e, cancelRead: cancelRead, cancelWrite: cancelWrite}
skipsVnetHdr := d.e.Capabilities()&stack.CapabilityHardwareGSO != 0
d.buf = newIovecBuffer(BufConfig, skipsVnetHdr)
return d, nil
}
+func (d *readVDispatcher) Cancel() {
+ d.cancelWrite.Write([]byte{0})
+}
+
// dispatch reads one packet from the file descriptor and dispatches it.
func (d *readVDispatcher) dispatch() (bool, tcpip.Error) {
- n, err := rawfile.BlockingReadv(d.fd, d.buf.nextIovecs())
+ n, err := rawfile.BlockingReadvWithCancel(d.fd, d.buf.nextIovecs(), d.cancelRead)
if n == 0 || err != nil {
return false, err
}
@@ -219,6 +232,10 @@ func newRecvMMsgDispatcher(fd int, e *endpoint) (linkDispatcher, error) {
return d, nil
}
+func (d *recvMMsgDispatcher) Cancel() {
+ // TODO
+}
+
// recvMMsgDispatch reads more than one packet at a time from the file
// descriptor and dispatches it.
func (d *recvMMsgDispatcher) dispatch() (bool, tcpip.Error) {
diff --git a/pkg/tcpip/link/rawfile/rawfile_unsafe.go b/pkg/tcpip/link/rawfile/rawfile_unsafe.go
index ba92aedbc..591cb4279 100644
--- a/pkg/tcpip/link/rawfile/rawfile_unsafe.go
+++ b/pkg/tcpip/link/rawfile/rawfile_unsafe.go
@@ -19,6 +19,7 @@
package rawfile
import (
+ "os"
"unsafe"
"golang.org/x/sys/unix"
@@ -105,7 +106,7 @@ func BlockingRead(fd int, b []byte) (int, tcpip.Error) {
event := PollEvent{
FD: int32(fd),
- Events: 1, // POLLIN
+ Events: unix.POLLIN,
}
_, e = BlockingPoll(&event, 1, nil)
@@ -119,21 +120,39 @@ func BlockingRead(fd int, b []byte) (int, tcpip.Error) {
// stores the data in a list of iovecs buffers. If no data is available, it will
// block in a poll() syscall until the file descriptor becomes readable.
func BlockingReadv(fd int, iovecs []unix.Iovec) (int, tcpip.Error) {
+ return BlockingReadvWithCancel(fd, iovecs, nil)
+}
+
+func BlockingReadvWithCancel(fd int, iovecs []unix.Iovec, cancel *os.File) (int, tcpip.Error) {
for {
n, _, e := unix.RawSyscall(unix.SYS_READV, uintptr(fd), uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs)))
if e == 0 {
return int(n), nil
}
- event := PollEvent{
- FD: int32(fd),
- Events: 1, // POLLIN
+ var cancelFd uintptr = 0
+ nfds := 1
+
+ if cancel != nil {
+ cancelFd = cancel.Fd()
+ nfds += 1
}
- _, e = BlockingPoll(&event, 1, nil)
+ events := []PollEvent{{
+ FD: int32(fd),
+ Events: unix.POLLIN,
+ },{
+ FD: int32(cancelFd),
+ Events: unix.POLLIN,
+ }}
+
+ _, e = BlockingPoll(&events[0], nfds, nil)
if e != 0 && e != unix.EINTR {
return 0, TranslateErrno(e)
}
+ if nfds > 1 && events[1].Revents == unix.POLLIN {
+ return 0, &tcpip.ErrAborted{}
+ }
}
}
@@ -157,7 +176,7 @@ func BlockingRecvMMsg(fd int, msgHdrs []MMsgHdr) (int, tcpip.Error) {
event := PollEvent{
FD: int32(fd),
- Events: 1, // POLLIN
+ Events: unix.POLLIN,
}
if _, e := BlockingPoll(&event, 1, nil); e != 0 && e != unix.EINTR {