diff options
Diffstat (limited to 'pkg/tcpip/link')
-rw-r--r-- | pkg/tcpip/link/channel/channel.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/link/fdbased/endpoint.go | 32 | ||||
-rw-r--r-- | pkg/tcpip/link/loopback/loopback.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/link/muxed/injectable.go | 7 | ||||
-rw-r--r-- | pkg/tcpip/link/sharedmem/sharedmem.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/link/sniffer/sniffer.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/link/waitable/waitable.go | 3 | ||||
-rw-r--r-- | pkg/tcpip/link/waitable/waitable_test.go | 3 |
8 files changed, 54 insertions, 3 deletions
diff --git a/pkg/tcpip/link/channel/channel.go b/pkg/tcpip/link/channel/channel.go index eec430d0a..18adb2085 100644 --- a/pkg/tcpip/link/channel/channel.go +++ b/pkg/tcpip/link/channel/channel.go @@ -133,3 +133,6 @@ func (e *Endpoint) WritePacket(_ *stack.Route, gso *stack.GSO, hdr buffer.Prepen return nil } + +// Wait implements stack.LinkEndpoint.Wait. +func (*Endpoint) Wait() {} diff --git a/pkg/tcpip/link/fdbased/endpoint.go b/pkg/tcpip/link/fdbased/endpoint.go index adcf21371..7636418b1 100644 --- a/pkg/tcpip/link/fdbased/endpoint.go +++ b/pkg/tcpip/link/fdbased/endpoint.go @@ -41,6 +41,7 @@ package fdbased import ( "fmt" + "sync" "syscall" "golang.org/x/sys/unix" @@ -81,6 +82,19 @@ const ( PacketMMap ) +func (p PacketDispatchMode) String() string { + switch p { + case Readv: + return "Readv" + case RecvMMsg: + return "RecvMMsg" + case PacketMMap: + return "PacketMMap" + default: + return fmt.Sprintf("unknown packet dispatch mode %v", p) + } +} + type endpoint struct { // fds is the set of file descriptors each identifying one inbound/outbound // channel. The endpoint will dispatch from all inbound channels as well as @@ -114,6 +128,9 @@ type endpoint struct { // gsoMaxSize is the maximum GSO packet size. It is zero if GSO is // disabled. gsoMaxSize uint32 + + // wg keeps track of running goroutines. + wg sync.WaitGroup } // Options specify the details about the fd-based endpoint to be created. @@ -164,7 +181,8 @@ type Options struct { // New creates a new fd-based endpoint. // // Makes fd non-blocking, but does not take ownership of fd, which must remain -// open for the lifetime of the returned endpoint. +// open for the lifetime of the returned endpoint (until after the endpoint has +// stopped being using and Wait returns). func New(opts *Options) (stack.LinkEndpoint, error) { caps := stack.LinkEndpointCapabilities(0) if opts.RXChecksumOffload { @@ -290,7 +308,11 @@ func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) { // saved, they stop sending outgoing packets and all incoming packets // are rejected. for i := range e.inboundDispatchers { - go e.dispatchLoop(e.inboundDispatchers[i]) // S/R-SAFE: See above. + e.wg.Add(1) + go func(i int) { // S/R-SAFE: See above. + e.dispatchLoop(e.inboundDispatchers[i]) + e.wg.Done() + }(i) } } @@ -320,6 +342,12 @@ func (e *endpoint) LinkAddress() tcpip.LinkAddress { return e.addr } +// Wait implements stack.LinkEndpoint.Wait. It waits for the endpoint to stop +// reading from its FD. +func (e *endpoint) Wait() { + e.wg.Wait() +} + // virtioNetHdr is declared in linux/virtio_net.h. type virtioNetHdr struct { flags uint8 diff --git a/pkg/tcpip/link/loopback/loopback.go b/pkg/tcpip/link/loopback/loopback.go index e121ea1a5..b36629d2c 100644 --- a/pkg/tcpip/link/loopback/loopback.go +++ b/pkg/tcpip/link/loopback/loopback.go @@ -85,3 +85,6 @@ func (e *endpoint) WritePacket(_ *stack.Route, _ *stack.GSO, hdr buffer.Prependa return nil } + +// Wait implements stack.LinkEndpoint.Wait. +func (*endpoint) Wait() {} diff --git a/pkg/tcpip/link/muxed/injectable.go b/pkg/tcpip/link/muxed/injectable.go index 3ed7b98d1..7c946101d 100644 --- a/pkg/tcpip/link/muxed/injectable.go +++ b/pkg/tcpip/link/muxed/injectable.go @@ -104,6 +104,13 @@ func (m *InjectableEndpoint) WriteRawPacket(dest tcpip.Address, packet []byte) * return endpoint.WriteRawPacket(dest, packet) } +// Wait implements stack.LinkEndpoint.Wait. +func (m *InjectableEndpoint) Wait() { + for _, ep := range m.routes { + ep.Wait() + } +} + // NewInjectableEndpoint creates a new multi-endpoint injectable endpoint. func NewInjectableEndpoint(routes map[tcpip.Address]stack.InjectableLinkEndpoint) *InjectableEndpoint { return &InjectableEndpoint{ diff --git a/pkg/tcpip/link/sharedmem/sharedmem.go b/pkg/tcpip/link/sharedmem/sharedmem.go index ba387af73..9e71d4edf 100644 --- a/pkg/tcpip/link/sharedmem/sharedmem.go +++ b/pkg/tcpip/link/sharedmem/sharedmem.go @@ -132,7 +132,8 @@ func (e *endpoint) Close() { } } -// Wait waits until all workers have stopped after a Close() call. +// Wait implements stack.LinkEndpoint.Wait. It waits until all workers have +// stopped after a Close() call. func (e *endpoint) Wait() { e.completed.Wait() } diff --git a/pkg/tcpip/link/sniffer/sniffer.go b/pkg/tcpip/link/sniffer/sniffer.go index e7b6d7912..e401dce44 100644 --- a/pkg/tcpip/link/sniffer/sniffer.go +++ b/pkg/tcpip/link/sniffer/sniffer.go @@ -240,6 +240,9 @@ func (e *endpoint) WritePacket(r *stack.Route, gso *stack.GSO, hdr buffer.Prepen return e.lower.WritePacket(r, gso, hdr, payload, protocol) } +// Wait implements stack.LinkEndpoint.Wait. +func (*endpoint) Wait() {} + func logPacket(prefix string, protocol tcpip.NetworkProtocolNumber, b buffer.View, gso *stack.GSO) { // Figure out the network layer info. var transProto uint8 diff --git a/pkg/tcpip/link/waitable/waitable.go b/pkg/tcpip/link/waitable/waitable.go index 408cc62f7..5a1791cb5 100644 --- a/pkg/tcpip/link/waitable/waitable.go +++ b/pkg/tcpip/link/waitable/waitable.go @@ -120,3 +120,6 @@ func (e *Endpoint) WaitWrite() { func (e *Endpoint) WaitDispatch() { e.dispatchGate.Close() } + +// Wait implements stack.LinkEndpoint.Wait. +func (e *Endpoint) Wait() {} diff --git a/pkg/tcpip/link/waitable/waitable_test.go b/pkg/tcpip/link/waitable/waitable_test.go index 1031438b1..ae23c96b7 100644 --- a/pkg/tcpip/link/waitable/waitable_test.go +++ b/pkg/tcpip/link/waitable/waitable_test.go @@ -70,6 +70,9 @@ func (e *countedEndpoint) WritePacket(r *stack.Route, _ *stack.GSO, hdr buffer.P return nil } +// Wait implements stack.LinkEndpoint.Wait. +func (*countedEndpoint) Wait() {} + func TestWaitWrite(t *testing.T) { ep := &countedEndpoint{} wep := New(ep) |