From 3bb8fec7e41fcc2138ddb4cba3f46100814fc523 Mon Sep 17 00:00:00 2001 From: Jordan Whited Date: Thu, 2 Mar 2023 14:48:02 -0800 Subject: conn, device, tun: implement vectorized I/O plumbing Accept packet vectors for reading and writing in the tun.Device and conn.Bind interfaces, so that the internal plumbing between these interfaces now passes a vector of packets. Vectors move untouched between these interfaces, i.e. if 128 packets are received from conn.Bind.Read(), 128 packets are passed to tun.Device.Write(). There is no internal buffering. Currently, existing implementations are only adjusted to have vectors of length one. Subsequent patches will improve that. Also, as a related fixup, use the unix and windows packages rather than the syscall package when possible. Co-authored-by: James Tucker Signed-off-by: James Tucker Signed-off-by: Jordan Whited Signed-off-by: Jason A. Donenfeld --- tun/errors.go | 60 ++++++++++++++++++++++++++++++++++++++++++++++ tun/netstack/tun.go | 45 ++++++++++++++++++++-------------- tun/tun.go | 40 ++++++++++++++++++++++++------- tun/tun_darwin.go | 65 +++++++++++++++++++++++++++----------------------- tun/tun_freebsd.go | 53 +++++++++++++++++++++------------------- tun/tun_linux.go | 39 +++++++++++++++++------------- tun/tun_openbsd.go | 58 ++++++++++++++++++++++---------------------- tun/tun_windows.go | 52 ++++++++++++++++++++++------------------ tun/tuntest/tuntest.go | 29 +++++++++++++--------- 9 files changed, 283 insertions(+), 158 deletions(-) create mode 100644 tun/errors.go (limited to 'tun') diff --git a/tun/errors.go b/tun/errors.go new file mode 100644 index 0000000..e70b13c --- /dev/null +++ b/tun/errors.go @@ -0,0 +1,60 @@ +package tun + +import ( + "errors" + "fmt" +) + +var ( + // ErrTooManySegments is returned by Device.Read() when segmentation + // overflows the length of supplied buffers. This error should not cause + // reads to cease. + ErrTooManySegments = errors.New("too many segments") +) + +type errorBatch []error + +// ErrorBatch takes a possibly nil or empty list of errors, and if the list is +// non-nil returns an error type that wraps all of the errors. Expected usage is +// to append to an []errors and coerce the set to an error using this method. +func ErrorBatch(errs []error) error { + if len(errs) == 0 { + return nil + } + return errorBatch(errs) +} + +func (e errorBatch) Error() string { + if len(e) == 0 { + return "" + } + if len(e) == 1 { + return e[0].Error() + } + return fmt.Sprintf("batch operation: %v (and %d more errors)", e[0], len(e)-1) +} + +func (e errorBatch) Is(target error) bool { + for _, err := range e { + if errors.Is(err, target) { + return true + } + } + return false +} + +func (e errorBatch) As(target interface{}) bool { + for _, err := range e { + if errors.As(err, target) { + return true + } + } + return false +} + +func (e errorBatch) Unwrap() error { + if len(e) == 0 { + return nil + } + return e[0] +} diff --git a/tun/netstack/tun.go b/tun/netstack/tun.go index 37c879d..a0b212a 100644 --- a/tun/netstack/tun.go +++ b/tun/netstack/tun.go @@ -19,6 +19,7 @@ import ( "regexp" "strconv" "strings" + "syscall" "time" "golang.zx2c4.com/wireguard/tun" @@ -113,29 +114,37 @@ func (tun *netTun) Events() <-chan tun.Event { return tun.events } -func (tun *netTun) Read(buf []byte, offset int) (int, error) { +func (tun *netTun) Read(buf [][]byte, sizes []int, offset int) (int, error) { view, ok := <-tun.incomingPacket if !ok { return 0, os.ErrClosed } - return view.Read(buf[offset:]) + n, err := view.Read(buf[0][offset:]) + if err != nil { + return 0, err + } + sizes[0] = n + return 1, nil } -func (tun *netTun) Write(buf []byte, offset int) (int, error) { - packet := buf[offset:] - if len(packet) == 0 { - return 0, nil - } +func (tun *netTun) Write(buf [][]byte, offset int) (int, error) { + for _, buf := range buf { + packet := buf[offset:] + if len(packet) == 0 { + continue + } - pkb := stack.NewPacketBuffer(stack.PacketBufferOptions{Payload: bufferv2.MakeWithData(packet)}) - switch packet[0] >> 4 { - case 4: - tun.ep.InjectInbound(header.IPv4ProtocolNumber, pkb) - case 6: - tun.ep.InjectInbound(header.IPv6ProtocolNumber, pkb) + pkb := stack.NewPacketBuffer(stack.PacketBufferOptions{Payload: bufferv2.MakeWithData(packet)}) + switch packet[0] >> 4 { + case 4: + tun.ep.InjectInbound(header.IPv4ProtocolNumber, pkb) + case 6: + tun.ep.InjectInbound(header.IPv6ProtocolNumber, pkb) + default: + return 0, syscall.EAFNOSUPPORT + } } - return len(buf), nil } @@ -151,10 +160,6 @@ func (tun *netTun) WriteNotify() { tun.incomingPacket <- view } -func (tun *netTun) Flush() error { - return nil -} - func (tun *netTun) Close() error { tun.stack.RemoveNIC(1) @@ -175,6 +180,10 @@ func (tun *netTun) MTU() (int, error) { return tun.mtu, nil } +func (tun *netTun) BatchSize() int { + return 1 +} + func convertToFullAddr(endpoint netip.AddrPort) (tcpip.FullAddress, tcpip.NetworkProtocolNumber) { var protoNumber tcpip.NetworkProtocolNumber if endpoint.Addr().Is4() { diff --git a/tun/tun.go b/tun/tun.go index 01051b9..e203ba8 100644 --- a/tun/tun.go +++ b/tun/tun.go @@ -18,12 +18,36 @@ const ( ) type Device interface { - File() *os.File // returns the file descriptor of the device - Read([]byte, int) (int, error) // read a packet from the device (without any additional headers) - Write([]byte, int) (int, error) // writes a packet to the device (without any additional headers) - Flush() error // flush all previous writes to the device - MTU() (int, error) // returns the MTU of the device - Name() (string, error) // fetches and returns the current name - Events() <-chan Event // returns a constant channel of events related to the device - Close() error // stops the device and closes the event channel + // File returns the file descriptor of the device. + File() *os.File + + // Read one or more packets from the Device (without any additional headers). + // On a successful read it returns the number of packets read, and sets + // packet lengths within the sizes slice. len(sizes) must be >= len(buffs). + // A nonzero offset can be used to instruct the Device on where to begin + // reading into each element of the buffs slice. + Read(buffs [][]byte, sizes []int, offset int) (n int, err error) + + // Write one or more packets to the device (without any additional headers). + // On a successful write it returns the number of packets written. A nonzero + // offset can be used to instruct the Device on where to begin writing from + // each packet contained within the buffs slice. + Write(buffs [][]byte, offset int) (int, error) + + // MTU returns the MTU of the Device. + MTU() (int, error) + + // Name returns the current name of the Device. + Name() (string, error) + + // Events returns a channel of type Event, which is fed Device events. + Events() <-chan Event + + // Close stops the Device and closes the Event channel. + Close() error + + // BatchSize returns the preferred/max number of packets that can be read or + // written in a single read/write call. BatchSize must not change over the + // lifetime of a Device. + BatchSize() int } diff --git a/tun/tun_darwin.go b/tun/tun_darwin.go index 7411a69..b927e6f 100644 --- a/tun/tun_darwin.go +++ b/tun/tun_darwin.go @@ -8,6 +8,7 @@ package tun import ( "errors" "fmt" + "io" "net" "os" "sync" @@ -15,7 +16,6 @@ import ( "time" "unsafe" - "golang.org/x/net/ipv6" "golang.org/x/sys/unix" ) @@ -33,7 +33,7 @@ type NativeTun struct { func retryInterfaceByIndex(index int) (iface *net.Interface, err error) { for i := 0; i < 20; i++ { iface, err = net.InterfaceByIndex(index) - if err != nil && errors.Is(err, syscall.ENOMEM) { + if err != nil && errors.Is(err, unix.ENOMEM) { time.Sleep(time.Duration(i) * time.Second / 3) continue } @@ -55,7 +55,7 @@ func (tun *NativeTun) routineRouteListener(tunIfindex int) { retry: n, err := unix.Read(tun.routeSocket, data) if err != nil { - if errno, ok := err.(syscall.Errno); ok && errno == syscall.EINTR { + if errno, ok := err.(unix.Errno); ok && errno == unix.EINTR { goto retry } tun.errors <- err @@ -217,45 +217,46 @@ func (tun *NativeTun) Events() <-chan Event { return tun.events } -func (tun *NativeTun) Read(buff []byte, offset int) (int, error) { +func (tun *NativeTun) Read(buffs [][]byte, sizes []int, offset int) (int, error) { + // TODO: the BSDs look very similar in Read() and Write(). They should be + // collapsed, with platform-specific files containing the varying parts of + // their implementations. select { case err := <-tun.errors: return 0, err default: - buff := buff[offset-4:] + buff := buffs[0][offset-4:] n, err := tun.tunFile.Read(buff[:]) if n < 4 { return 0, err } - return n - 4, err + sizes[0] = n - 4 + return 1, err } } -func (tun *NativeTun) Write(buff []byte, offset int) (int, error) { - // reserve space for header - - buff = buff[offset-4:] - - // add packet information header - - buff[0] = 0x00 - buff[1] = 0x00 - buff[2] = 0x00 - - if buff[4]>>4 == ipv6.Version { - buff[3] = unix.AF_INET6 - } else { - buff[3] = unix.AF_INET +func (tun *NativeTun) Write(buffs [][]byte, offset int) (int, error) { + if offset < 4 { + return 0, io.ErrShortBuffer } - - // write - - return tun.tunFile.Write(buff) -} - -func (tun *NativeTun) Flush() error { - // TODO: can flushing be implemented by buffering and using sendmmsg? - return nil + for i, buf := range buffs { + buf = buf[offset-4:] + buf[0] = 0x00 + buf[1] = 0x00 + buf[2] = 0x00 + switch buf[4] >> 4 { + case 4: + buf[3] = unix.AF_INET + case 6: + buf[3] = unix.AF_INET6 + default: + return i, unix.EAFNOSUPPORT + } + if _, err := tun.tunFile.Write(buf); err != nil { + return i, err + } + } + return len(buffs), nil } func (tun *NativeTun) Close() error { @@ -318,6 +319,10 @@ func (tun *NativeTun) MTU() (int, error) { return int(ifr.MTU), nil } +func (tun *NativeTun) BatchSize() int { + return 1 +} + func socketCloexec(family, sotype, proto int) (fd int, err error) { // See go/src/net/sys_cloexec.go for background. syscall.ForkLock.RLock() diff --git a/tun/tun_freebsd.go b/tun/tun_freebsd.go index 42431aa..0783f74 100644 --- a/tun/tun_freebsd.go +++ b/tun/tun_freebsd.go @@ -333,45 +333,46 @@ func (tun *NativeTun) Events() <-chan Event { return tun.events } -func (tun *NativeTun) Read(buff []byte, offset int) (int, error) { +func (tun *NativeTun) Read(buffs [][]byte, sizes []int, offset int) (int, error) { select { case err := <-tun.errors: return 0, err default: - buff := buff[offset-4:] + buff := buffs[0][offset-4:] n, err := tun.tunFile.Read(buff[:]) if n < 4 { return 0, err } - return n - 4, err + sizes[0] = n - 4 + return 1, err } } -func (tun *NativeTun) Write(buf []byte, offset int) (int, error) { +func (tun *NativeTun) Write(buffs [][]byte, offset int) (int, error) { if offset < 4 { return 0, io.ErrShortBuffer } - buf = buf[offset-4:] - if len(buf) < 5 { - return 0, io.ErrShortBuffer - } - buf[0] = 0x00 - buf[1] = 0x00 - buf[2] = 0x00 - switch buf[4] >> 4 { - case 4: - buf[3] = unix.AF_INET - case 6: - buf[3] = unix.AF_INET6 - default: - return 0, unix.EAFNOSUPPORT + for i, buf := range buffs { + buf = buf[offset-4:] + if len(buf) < 5 { + return i, io.ErrShortBuffer + } + buf[0] = 0x00 + buf[1] = 0x00 + buf[2] = 0x00 + switch buf[4] >> 4 { + case 4: + buf[3] = unix.AF_INET + case 6: + buf[3] = unix.AF_INET6 + default: + return i, unix.EAFNOSUPPORT + } + if _, err := tun.tunFile.Write(buf); err != nil { + return i, err + } } - return tun.tunFile.Write(buf) -} - -func (tun *NativeTun) Flush() error { - // TODO: can flushing be implemented by buffering and using sendmmsg? - return nil + return len(buffs), nil } func (tun *NativeTun) Close() error { @@ -428,3 +429,7 @@ func (tun *NativeTun) MTU() (int, error) { } return int(*(*int32)(unsafe.Pointer(&ifr.MTU))), nil } + +func (tun *NativeTun) BatchSize() int { + return 1 +} diff --git a/tun/tun_linux.go b/tun/tun_linux.go index 25dbc07..21984ca 100644 --- a/tun/tun_linux.go +++ b/tun/tun_linux.go @@ -323,12 +323,13 @@ func (tun *NativeTun) nameSlow() (string, error) { return unix.ByteSliceToString(ifr[:]), nil } -func (tun *NativeTun) Write(buf []byte, offset int) (int, error) { +func (tun *NativeTun) Write(buffs [][]byte, offset int) (n int, err error) { + var buf []byte if tun.nopi { - buf = buf[offset:] + buf = buffs[0][offset:] } else { // reserve space for header - buf = buf[offset-4:] + buf = buffs[0][offset-4:] // add packet information header buf[0] = 0x00 @@ -342,34 +343,36 @@ func (tun *NativeTun) Write(buf []byte, offset int) (int, error) { } } - n, err := tun.tunFile.Write(buf) + _, err = tun.tunFile.Write(buf) if errors.Is(err, syscall.EBADFD) { err = os.ErrClosed + } else if err == nil { + n = 1 } return n, err } -func (tun *NativeTun) Flush() error { - // TODO: can flushing be implemented by buffering and using sendmmsg? - return nil -} - -func (tun *NativeTun) Read(buf []byte, offset int) (n int, err error) { +func (tun *NativeTun) Read(buffs [][]byte, sizes []int, offset int) (n int, err error) { select { case err = <-tun.errors: default: if tun.nopi { - n, err = tun.tunFile.Read(buf[offset:]) + sizes[0], err = tun.tunFile.Read(buffs[0][offset:]) + if err == nil { + n = 1 + } } else { - buff := buf[offset-4:] - n, err = tun.tunFile.Read(buff[:]) + buff := buffs[0][offset-4:] + sizes[0], err = tun.tunFile.Read(buff[:]) if errors.Is(err, syscall.EBADFD) { err = os.ErrClosed + } else if err == nil { + n = 1 } - if n < 4 { - n = 0 + if sizes[0] < 4 { + sizes[0] = 0 } else { - n -= 4 + sizes[0] -= 4 } } } @@ -399,6 +402,10 @@ func (tun *NativeTun) Close() error { return err2 } +func (tun *NativeTun) BatchSize() int { + return 1 +} + func CreateTUN(name string, mtu int) (Device, error) { nfd, err := unix.Open(cloneDevicePath, unix.O_RDWR|unix.O_CLOEXEC, 0) if err != nil { diff --git a/tun/tun_openbsd.go b/tun/tun_openbsd.go index e7fd79c..210830c 100644 --- a/tun/tun_openbsd.go +++ b/tun/tun_openbsd.go @@ -8,13 +8,13 @@ package tun import ( "errors" "fmt" + "io" "net" "os" "sync" "syscall" "unsafe" - "golang.org/x/net/ipv6" "golang.org/x/sys/unix" ) @@ -204,45 +204,43 @@ func (tun *NativeTun) Events() <-chan Event { return tun.events } -func (tun *NativeTun) Read(buff []byte, offset int) (int, error) { +func (tun *NativeTun) Read(buffs [][]byte, sizes []int, offset int) (int, error) { select { case err := <-tun.errors: return 0, err default: - buff := buff[offset-4:] + buff := buffs[0][offset-4:] n, err := tun.tunFile.Read(buff[:]) if n < 4 { return 0, err } - return n - 4, err + sizes[0] = n - 4 + return 1, err } } -func (tun *NativeTun) Write(buff []byte, offset int) (int, error) { - // reserve space for header - - buff = buff[offset-4:] - - // add packet information header - - buff[0] = 0x00 - buff[1] = 0x00 - buff[2] = 0x00 - - if buff[4]>>4 == ipv6.Version { - buff[3] = unix.AF_INET6 - } else { - buff[3] = unix.AF_INET +func (tun *NativeTun) Write(buffs [][]byte, offset int) (int, error) { + if offset < 4 { + return 0, io.ErrShortBuffer } - - // write - - return tun.tunFile.Write(buff) -} - -func (tun *NativeTun) Flush() error { - // TODO: can flushing be implemented by buffering and using sendmmsg? - return nil + for i, buf := range buffs { + buf = buf[offset-4:] + buf[0] = 0x00 + buf[1] = 0x00 + buf[2] = 0x00 + switch buf[4] >> 4 { + case 4: + buf[3] = unix.AF_INET + case 6: + buf[3] = unix.AF_INET6 + default: + return i, unix.EAFNOSUPPORT + } + if _, err := tun.tunFile.Write(buf); err != nil { + return i, err + } + } + return len(buffs), nil } func (tun *NativeTun) Close() error { @@ -329,3 +327,7 @@ func (tun *NativeTun) MTU() (int, error) { return int(*(*int32)(unsafe.Pointer(&ifr.MTU))), nil } + +func (tun *NativeTun) BatchSize() int { + return 1 +} diff --git a/tun/tun_windows.go b/tun/tun_windows.go index d5abb14..320dd59 100644 --- a/tun/tun_windows.go +++ b/tun/tun_windows.go @@ -15,7 +15,6 @@ import ( _ "unsafe" "golang.org/x/sys/windows" - "golang.zx2c4.com/wintun" ) @@ -44,6 +43,7 @@ type NativeTun struct { closeOnce sync.Once close atomic.Bool forcedMTU int + outSizes []int } var ( @@ -134,9 +134,14 @@ func (tun *NativeTun) ForceMTU(mtu int) { } } +func (tun *NativeTun) BatchSize() int { + // TODO: implement batching with wintun + return 1 +} + // Note: Read() and Write() assume the caller comes only from a single thread; there's no locking. -func (tun *NativeTun) Read(buff []byte, offset int) (int, error) { +func (tun *NativeTun) Read(buffs [][]byte, sizes []int, offset int) (int, error) { tun.running.Add(1) defer tun.running.Done() retry: @@ -153,10 +158,11 @@ retry: switch err { case nil: packetSize := len(packet) - copy(buff[offset:], packet) + copy(buffs[0][offset:], packet) + sizes[0] = packetSize tun.session.ReleaseReceivePacket(packet) tun.rate.update(uint64(packetSize)) - return packetSize, nil + return 1, nil case windows.ERROR_NO_MORE_ITEMS: if !shouldSpin || uint64(nanotime()-start) >= spinloopDuration { windows.WaitForSingleObject(tun.readWait, windows.INFINITE) @@ -173,33 +179,33 @@ retry: } } -func (tun *NativeTun) Flush() error { - return nil -} - -func (tun *NativeTun) Write(buff []byte, offset int) (int, error) { +func (tun *NativeTun) Write(buffs [][]byte, offset int) (int, error) { tun.running.Add(1) defer tun.running.Done() if tun.close.Load() { return 0, os.ErrClosed } - packetSize := len(buff) - offset - tun.rate.update(uint64(packetSize)) + for i, buff := range buffs { + packetSize := len(buff) - offset + tun.rate.update(uint64(packetSize)) - packet, err := tun.session.AllocateSendPacket(packetSize) - if err == nil { - copy(packet, buff[offset:]) - tun.session.SendPacket(packet) - return packetSize, nil - } - switch err { - case windows.ERROR_HANDLE_EOF: - return 0, os.ErrClosed - case windows.ERROR_BUFFER_OVERFLOW: - return 0, nil // Dropping when ring is full. + packet, err := tun.session.AllocateSendPacket(packetSize) + switch err { + case nil: + // TODO: Explore options to eliminate this copy. + copy(packet, buff[offset:]) + tun.session.SendPacket(packet) + continue + case windows.ERROR_HANDLE_EOF: + return i, os.ErrClosed + case windows.ERROR_BUFFER_OVERFLOW: + continue // Dropping when ring is full. + default: + return i, fmt.Errorf("Write failed: %w", err) + } } - return 0, fmt.Errorf("Write failed: %w", err) + return len(buffs), nil } // LUID returns Windows interface instance ID. diff --git a/tun/tuntest/tuntest.go b/tun/tuntest/tuntest.go index b143c76..d07e860 100644 --- a/tun/tuntest/tuntest.go +++ b/tun/tuntest/tuntest.go @@ -110,35 +110,42 @@ type chTun struct { func (t *chTun) File() *os.File { return nil } -func (t *chTun) Read(data []byte, offset int) (int, error) { +func (t *chTun) Read(packets [][]byte, sizes []int, offset int) (int, error) { select { case <-t.c.closed: return 0, os.ErrClosed case msg := <-t.c.Outbound: - return copy(data[offset:], msg), nil + n := copy(packets[0][offset:], msg) + sizes[0] = n + return 1, nil } } // Write is called by the wireguard device to deliver a packet for routing. -func (t *chTun) Write(data []byte, offset int) (int, error) { +func (t *chTun) Write(packets [][]byte, offset int) (int, error) { if offset == -1 { close(t.c.closed) close(t.c.events) return 0, io.EOF } - msg := make([]byte, len(data)-offset) - copy(msg, data[offset:]) - select { - case <-t.c.closed: - return 0, os.ErrClosed - case t.c.Inbound <- msg: - return len(data) - offset, nil + for i, data := range packets { + msg := make([]byte, len(data)-offset) + copy(msg, data[offset:]) + select { + case <-t.c.closed: + return i, os.ErrClosed + case t.c.Inbound <- msg: + } } + return len(packets), nil +} + +func (t *chTun) BatchSize() int { + return 1 } const DefaultMTU = 1420 -func (t *chTun) Flush() error { return nil } func (t *chTun) MTU() (int, error) { return DefaultMTU, nil } func (t *chTun) Name() (string, error) { return "loopbackTun1", nil } func (t *chTun) Events() <-chan tun.Event { return t.c.events } -- cgit v1.2.3