diff options
author | Rahat Mahmood <rahat@google.com> | 2019-08-29 14:29:43 -0700 |
---|---|---|
committer | gVisor bot <gvisor-bot@google.com> | 2019-08-29 14:30:41 -0700 |
commit | 863e11ac4d6a49787cd5e5f6fe1cd771d0ceb100 (patch) | |
tree | 93c781bcce55dec62f4acd0725ff4d0192ca8054 /pkg | |
parent | 0789b9cc08249f8d0d6efcb25029efd271e47a9d (diff) |
Implement /proc/net/udp.
PiperOrigin-RevId: 266229756
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/sentry/fs/proc/BUILD | 1 | ||||
-rw-r--r-- | pkg/sentry/fs/proc/net.go | 201 | ||||
-rw-r--r-- | pkg/sentry/socket/epsocket/epsocket.go | 25 | ||||
-rw-r--r-- | pkg/tcpip/transport/udp/endpoint.go | 61 | ||||
-rw-r--r-- | pkg/tcpip/transport/udp/endpoint_state.go | 4 | ||||
-rw-r--r-- | pkg/tcpip/transport/udp/forwarder.go | 2 |
6 files changed, 239 insertions, 55 deletions
diff --git a/pkg/sentry/fs/proc/BUILD b/pkg/sentry/fs/proc/BUILD index 70ed854a8..c7599d1f6 100644 --- a/pkg/sentry/fs/proc/BUILD +++ b/pkg/sentry/fs/proc/BUILD @@ -31,7 +31,6 @@ go_library( visibility = ["//pkg/sentry:internal"], deps = [ "//pkg/abi/linux", - "//pkg/binary", "//pkg/log", "//pkg/sentry/context", "//pkg/sentry/fs", diff --git a/pkg/sentry/fs/proc/net.go b/pkg/sentry/fs/proc/net.go index 9adb23608..5e28982c5 100644 --- a/pkg/sentry/fs/proc/net.go +++ b/pkg/sentry/fs/proc/net.go @@ -17,10 +17,10 @@ package proc import ( "bytes" "fmt" + "io" "time" "gvisor.dev/gvisor/pkg/abi/linux" - "gvisor.dev/gvisor/pkg/binary" "gvisor.dev/gvisor/pkg/log" "gvisor.dev/gvisor/pkg/sentry/context" "gvisor.dev/gvisor/pkg/sentry/fs" @@ -28,9 +28,11 @@ import ( "gvisor.dev/gvisor/pkg/sentry/fs/ramfs" "gvisor.dev/gvisor/pkg/sentry/inet" "gvisor.dev/gvisor/pkg/sentry/kernel" + "gvisor.dev/gvisor/pkg/sentry/kernel/auth" "gvisor.dev/gvisor/pkg/sentry/socket" "gvisor.dev/gvisor/pkg/sentry/socket/unix" "gvisor.dev/gvisor/pkg/sentry/socket/unix/transport" + "gvisor.dev/gvisor/pkg/sentry/usermem" ) // newNet creates a new proc net entry. @@ -57,9 +59,8 @@ func (p *proc) newNetDir(ctx context.Context, k *kernel.Kernel, msrc *fs.MountSo "ptype": newStaticProcInode(ctx, msrc, []byte("Type Device Function")), "route": newStaticProcInode(ctx, msrc, []byte("Iface Destination Gateway Flags RefCnt Use Metric Mask MTU Window IRTT")), "tcp": seqfile.NewSeqFileInode(ctx, &netTCP{k: k}, msrc), - "udp": newStaticProcInode(ctx, msrc, []byte(" sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode ref pointer drops")), - - "unix": seqfile.NewSeqFileInode(ctx, &netUnix{k: k}, msrc), + "udp": seqfile.NewSeqFileInode(ctx, &netUDP{k: k}, msrc), + "unix": seqfile.NewSeqFileInode(ctx, &netUnix{k: k}, msrc), } if s.SupportsIPv6() { @@ -216,7 +217,7 @@ func (n *netUnix) ReadSeqFileData(ctx context.Context, h seqfile.SeqHandle) ([]s for _, se := range n.k.ListSockets() { s := se.Sock.Get() if s == nil { - log.Debugf("Couldn't resolve weakref %v in socket table, racing with destruction?", se.Sock) + log.Debugf("Couldn't resolve weakref with ID %v in socket table, racing with destruction?", se.ID) continue } sfile := s.(*fs.File) @@ -297,6 +298,42 @@ func (n *netUnix) ReadSeqFileData(ctx context.Context, h seqfile.SeqHandle) ([]s return data, 0 } +func networkToHost16(n uint16) uint16 { + // n is in network byte order, so is big-endian. The most-significant byte + // should be stored in the lower address. + // + // We manually inline binary.BigEndian.Uint16() because Go does not support + // non-primitive consts, so binary.BigEndian is a (mutable) var, so calls to + // binary.BigEndian.Uint16() require a read of binary.BigEndian and an + // interface method call, defeating inlining. + buf := [2]byte{byte(n >> 8 & 0xff), byte(n & 0xff)} + return usermem.ByteOrder.Uint16(buf[:]) +} + +func writeInetAddr(w io.Writer, a linux.SockAddrInet) { + // linux.SockAddrInet.Port is stored in the network byte order and is + // printed like a number in host byte order. Note that all numbers in host + // byte order are printed with the most-significant byte first when + // formatted with %X. See get_tcp4_sock() and udp4_format_sock() in Linux. + port := networkToHost16(a.Port) + + // linux.SockAddrInet.Addr is stored as a byte slice in big-endian order + // (i.e. most-significant byte in index 0). Linux represents this as a + // __be32 which is a typedef for an unsigned int, and is printed with + // %X. This means that for a little-endian machine, Linux prints the + // least-significant byte of the address first. To emulate this, we first + // invert the byte order for the address using usermem.ByteOrder.Uint32, + // which makes it have the equivalent encoding to a __be32 on a little + // endian machine. Note that this operation is a no-op on a big endian + // machine. Then similar to Linux, we format it with %X, which will print + // the most-significant byte of the __be32 address first, which is now + // actually the least-significant byte of the original address in + // linux.SockAddrInet.Addr on little endian machines, due to the conversion. + addr := usermem.ByteOrder.Uint32(a.Addr[:]) + + fmt.Fprintf(w, "%08X:%04X ", addr, port) +} + // netTCP implements seqfile.SeqSource for /proc/net/tcp. // // +stateify savable @@ -311,6 +348,9 @@ func (*netTCP) NeedsUpdate(generation int64) bool { // ReadSeqFileData implements seqfile.SeqSource.ReadSeqFileData. func (n *netTCP) ReadSeqFileData(ctx context.Context, h seqfile.SeqHandle) ([]seqfile.SeqData, int64) { + // t may be nil here if our caller is not part of a task goroutine. This can + // happen for example if we're here for "sentryctl cat". When t is nil, + // degrade gracefully and retrieve what we can. t := kernel.TaskFromContext(ctx) if h != nil { @@ -321,7 +361,7 @@ func (n *netTCP) ReadSeqFileData(ctx context.Context, h seqfile.SeqHandle) ([]se for _, se := range n.k.ListSockets() { s := se.Sock.Get() if s == nil { - log.Debugf("Couldn't resolve weakref %+v in socket table, racing with destruction?", se.Sock) + log.Debugf("Couldn't resolve weakref with ID %v in socket table, racing with destruction?", se.ID) continue } sfile := s.(*fs.File) @@ -343,27 +383,23 @@ func (n *netTCP) ReadSeqFileData(ctx context.Context, h seqfile.SeqHandle) ([]se // Field: sl; entry number. fmt.Fprintf(&buf, "%4d: ", se.ID) - portBuf := make([]byte, 2) - // Field: local_adddress. var localAddr linux.SockAddrInet - if local, _, err := sops.GetSockName(t); err == nil { - localAddr = *local.(*linux.SockAddrInet) + if t != nil { + if local, _, err := sops.GetSockName(t); err == nil { + localAddr = *local.(*linux.SockAddrInet) + } } - binary.LittleEndian.PutUint16(portBuf, localAddr.Port) - fmt.Fprintf(&buf, "%08X:%04X ", - binary.LittleEndian.Uint32(localAddr.Addr[:]), - portBuf) + writeInetAddr(&buf, localAddr) // Field: rem_address. var remoteAddr linux.SockAddrInet - if remote, _, err := sops.GetPeerName(t); err == nil { - remoteAddr = *remote.(*linux.SockAddrInet) + if t != nil { + if remote, _, err := sops.GetPeerName(t); err == nil { + remoteAddr = *remote.(*linux.SockAddrInet) + } } - binary.LittleEndian.PutUint16(portBuf, remoteAddr.Port) - fmt.Fprintf(&buf, "%08X:%04X ", - binary.LittleEndian.Uint32(remoteAddr.Addr[:]), - portBuf) + writeInetAddr(&buf, remoteAddr) // Field: state; socket state. fmt.Fprintf(&buf, "%02X ", sops.State()) @@ -386,7 +422,8 @@ func (n *netTCP) ReadSeqFileData(ctx context.Context, h seqfile.SeqHandle) ([]se log.Warningf("Failed to retrieve unstable attr for socket file: %v", err) fmt.Fprintf(&buf, "%5d ", 0) } else { - fmt.Fprintf(&buf, "%5d ", uint32(uattr.Owner.UID.In(t.UserNamespace()).OrOverflow())) + creds := auth.CredentialsFromContext(ctx) + fmt.Fprintf(&buf, "%5d ", uint32(uattr.Owner.UID.In(creds.UserNamespace).OrOverflow())) } // Field: timeout; number of unanswered 0-window probes. @@ -438,3 +475,125 @@ func (n *netTCP) ReadSeqFileData(ctx context.Context, h seqfile.SeqHandle) ([]se } return data, 0 } + +// netUDP implements seqfile.SeqSource for /proc/net/udp. +// +// +stateify savable +type netUDP struct { + k *kernel.Kernel +} + +// NeedsUpdate implements seqfile.SeqSource.NeedsUpdate. +func (*netUDP) NeedsUpdate(generation int64) bool { + return true +} + +// ReadSeqFileData implements seqfile.SeqSource.ReadSeqFileData. +func (n *netUDP) ReadSeqFileData(ctx context.Context, h seqfile.SeqHandle) ([]seqfile.SeqData, int64) { + // t may be nil here if our caller is not part of a task goroutine. This can + // happen for example if we're here for "sentryctl cat". When t is nil, + // degrade gracefully and retrieve what we can. + t := kernel.TaskFromContext(ctx) + + if h != nil { + return nil, 0 + } + + var buf bytes.Buffer + for _, se := range n.k.ListSockets() { + s := se.Sock.Get() + if s == nil { + log.Debugf("Couldn't resolve weakref with ID %v in socket table, racing with destruction?", se.ID) + continue + } + sfile := s.(*fs.File) + sops, ok := sfile.FileOperations.(socket.Socket) + if !ok { + panic(fmt.Sprintf("Found non-socket file in socket table: %+v", sfile)) + } + if family, stype, _ := sops.Type(); family != linux.AF_INET || stype != linux.SOCK_DGRAM { + s.DecRef() + // Not udp4 socket. + continue + } + + // For Linux's implementation, see net/ipv4/udp.c:udp4_format_sock(). + + // Field: sl; entry number. + fmt.Fprintf(&buf, "%5d: ", se.ID) + + // Field: local_adddress. + var localAddr linux.SockAddrInet + if t != nil { + if local, _, err := sops.GetSockName(t); err == nil { + localAddr = *local.(*linux.SockAddrInet) + } + } + writeInetAddr(&buf, localAddr) + + // Field: rem_address. + var remoteAddr linux.SockAddrInet + if t != nil { + if remote, _, err := sops.GetPeerName(t); err == nil { + remoteAddr = *remote.(*linux.SockAddrInet) + } + } + writeInetAddr(&buf, remoteAddr) + + // Field: state; socket state. + fmt.Fprintf(&buf, "%02X ", sops.State()) + + // Field: tx_queue, rx_queue; number of packets in the transmit and + // receive queue. Unimplemented. + fmt.Fprintf(&buf, "%08X:%08X ", 0, 0) + + // Field: tr, tm->when. Always 0 for UDP. + fmt.Fprintf(&buf, "%02X:%08X ", 0, 0) + + // Field: retrnsmt. Always 0 for UDP. + fmt.Fprintf(&buf, "%08X ", 0) + + // Field: uid. + uattr, err := sfile.Dirent.Inode.UnstableAttr(ctx) + if err != nil { + log.Warningf("Failed to retrieve unstable attr for socket file: %v", err) + fmt.Fprintf(&buf, "%5d ", 0) + } else { + creds := auth.CredentialsFromContext(ctx) + fmt.Fprintf(&buf, "%5d ", uint32(uattr.Owner.UID.In(creds.UserNamespace).OrOverflow())) + } + + // Field: timeout. Always 0 for UDP. + fmt.Fprintf(&buf, "%8d ", 0) + + // Field: inode. + fmt.Fprintf(&buf, "%8d ", sfile.InodeID()) + + // Field: ref; reference count on the socket inode. Don't count the ref + // we obtain while deferencing the weakref to this socket. + fmt.Fprintf(&buf, "%d ", sfile.ReadRefs()-1) + + // Field: Socket struct address. Redacted due to the same reason as + // the 'Num' field in /proc/net/unix, see netUnix.ReadSeqFileData. + fmt.Fprintf(&buf, "%#016p ", (*socket.Socket)(nil)) + + // Field: drops; number of dropped packets. Unimplemented. + fmt.Fprintf(&buf, "%d", 0) + + fmt.Fprintf(&buf, "\n") + + s.DecRef() + } + + data := []seqfile.SeqData{ + { + Buf: []byte(" sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode ref pointer drops \n"), + Handle: n, + }, + { + Buf: buf.Bytes(), + Handle: n, + }, + } + return data, 0 +} diff --git a/pkg/sentry/socket/epsocket/epsocket.go b/pkg/sentry/socket/epsocket/epsocket.go index 635042263..def29646e 100644 --- a/pkg/sentry/socket/epsocket/epsocket.go +++ b/pkg/sentry/socket/epsocket/epsocket.go @@ -27,12 +27,14 @@ package epsocket import ( "bytes" "math" + "reflect" "sync" "syscall" "time" "gvisor.dev/gvisor/pkg/abi/linux" "gvisor.dev/gvisor/pkg/binary" + "gvisor.dev/gvisor/pkg/log" "gvisor.dev/gvisor/pkg/metric" "gvisor.dev/gvisor/pkg/sentry/arch" "gvisor.dev/gvisor/pkg/sentry/context" @@ -52,6 +54,7 @@ import ( "gvisor.dev/gvisor/pkg/tcpip/buffer" "gvisor.dev/gvisor/pkg/tcpip/stack" "gvisor.dev/gvisor/pkg/tcpip/transport/tcp" + "gvisor.dev/gvisor/pkg/tcpip/transport/udp" "gvisor.dev/gvisor/pkg/waiter" ) @@ -2421,7 +2424,8 @@ func (s *SocketOperations) State() uint32 { return 0 } - if !s.isPacketBased() { + switch { + case s.skType == linux.SOCK_STREAM && s.protocol == 0 || s.protocol == syscall.IPPROTO_TCP: // TCP socket. switch tcp.EndpointState(s.Endpoint.State()) { case tcp.StateEstablished: @@ -2450,9 +2454,26 @@ func (s *SocketOperations) State() uint32 { // Internal or unknown state. return 0 } + case s.skType == linux.SOCK_DGRAM && s.protocol == 0 || s.protocol == syscall.IPPROTO_UDP: + // UDP socket. + switch udp.EndpointState(s.Endpoint.State()) { + case udp.StateInitial, udp.StateBound, udp.StateClosed: + return linux.TCP_CLOSE + case udp.StateConnected: + return linux.TCP_ESTABLISHED + default: + return 0 + } + case s.skType == linux.SOCK_DGRAM && s.protocol == syscall.IPPROTO_ICMP || s.protocol == syscall.IPPROTO_ICMPV6: + // TODO(b/112063468): Export states for ICMP sockets. + case s.skType == linux.SOCK_RAW: + // TODO(b/112063468): Export states for raw sockets. + default: + // Unknown transport protocol, how did we make this socket? + log.Warningf("Unknown transport protocol for an existing socket: family=%v, type=%v, protocol=%v, internal type %v", s.family, s.skType, s.protocol, reflect.TypeOf(s.Endpoint).Elem()) + return 0 } - // TODO(b/112063468): Export states for UDP, ICMP, and raw sockets. return 0 } diff --git a/pkg/tcpip/transport/udp/endpoint.go b/pkg/tcpip/transport/udp/endpoint.go index ac5905772..66455ef46 100644 --- a/pkg/tcpip/transport/udp/endpoint.go +++ b/pkg/tcpip/transport/udp/endpoint.go @@ -37,13 +37,17 @@ type udpPacket struct { views [8]buffer.View `state:"nosave"` } -type endpointState int +// EndpointState represents the state of a UDP endpoint. +type EndpointState uint32 +// Endpoint states. Note that are represented in a netstack-specific manner and +// may not be meaningful externally. Specifically, they need to be translated to +// Linux's representation for these states if presented to userspace. const ( - stateInitial endpointState = iota - stateBound - stateConnected - stateClosed + StateInitial EndpointState = iota + StateBound + StateConnected + StateClosed ) // endpoint represents a UDP endpoint. This struct serves as the interface @@ -74,7 +78,7 @@ type endpoint struct { mu sync.RWMutex `state:"nosave"` sndBufSize int id stack.TransportEndpointID - state endpointState + state EndpointState bindNICID tcpip.NICID regNICID tcpip.NICID route stack.Route `state:"manual"` @@ -140,7 +144,7 @@ func (e *endpoint) Close() { e.shutdownFlags = tcpip.ShutdownRead | tcpip.ShutdownWrite switch e.state { - case stateBound, stateConnected: + case StateBound, StateConnected: e.stack.UnregisterTransportEndpoint(e.regNICID, e.effectiveNetProtos, ProtocolNumber, e.id, e) e.stack.ReleasePort(e.effectiveNetProtos, ProtocolNumber, e.id.LocalAddress, e.id.LocalPort) } @@ -163,7 +167,7 @@ func (e *endpoint) Close() { e.route.Release() // Update the state. - e.state = stateClosed + e.state = StateClosed e.mu.Unlock() @@ -211,11 +215,11 @@ func (e *endpoint) Read(addr *tcpip.FullAddress) (buffer.View, tcpip.ControlMess // Returns true for retry if preparation should be retried. func (e *endpoint) prepareForWrite(to *tcpip.FullAddress) (retry bool, err *tcpip.Error) { switch e.state { - case stateInitial: - case stateConnected: + case StateInitial: + case StateConnected: return false, nil - case stateBound: + case StateBound: if to == nil { return false, tcpip.ErrDestinationRequired } @@ -232,7 +236,7 @@ func (e *endpoint) prepareForWrite(to *tcpip.FullAddress) (retry bool, err *tcpi // The state changed when we released the shared locked and re-acquired // it in exclusive mode. Try again. - if e.state != stateInitial { + if e.state != StateInitial { return true, nil } @@ -322,7 +326,7 @@ func (e *endpoint) Write(p tcpip.Payload, opts tcpip.WriteOptions) (int64, <-cha defer e.mu.Unlock() // Recheck state after lock was re-acquired. - if e.state != stateConnected { + if e.state != StateConnected { return 0, nil, tcpip.ErrInvalidEndpointState } } @@ -400,7 +404,7 @@ func (e *endpoint) SetSockOpt(opt interface{}) *tcpip.Error { defer e.mu.Unlock() // We only allow this to be set when we're in the initial state. - if e.state != stateInitial { + if e.state != StateInitial { return tcpip.ErrInvalidEndpointState } @@ -726,7 +730,7 @@ func (e *endpoint) Disconnect() *tcpip.Error { e.mu.Lock() defer e.mu.Unlock() - if e.state != stateConnected { + if e.state != StateConnected { return nil } id := stack.TransportEndpointID{} @@ -741,9 +745,9 @@ func (e *endpoint) Disconnect() *tcpip.Error { if err != nil { return err } - e.state = stateBound + e.state = StateBound } else { - e.state = stateInitial + e.state = StateInitial } e.stack.UnregisterTransportEndpoint(e.regNICID, e.effectiveNetProtos, ProtocolNumber, e.id, e) @@ -772,8 +776,8 @@ func (e *endpoint) Connect(addr tcpip.FullAddress) *tcpip.Error { nicid := addr.NIC var localPort uint16 switch e.state { - case stateInitial: - case stateBound, stateConnected: + case StateInitial: + case StateBound, StateConnected: localPort = e.id.LocalPort if e.bindNICID == 0 { break @@ -801,7 +805,7 @@ func (e *endpoint) Connect(addr tcpip.FullAddress) *tcpip.Error { RemoteAddress: r.RemoteAddress, } - if e.state == stateInitial { + if e.state == StateInitial { id.LocalAddress = r.LocalAddress } @@ -832,7 +836,7 @@ func (e *endpoint) Connect(addr tcpip.FullAddress) *tcpip.Error { e.regNICID = nicid e.effectiveNetProtos = netProtos - e.state = stateConnected + e.state = StateConnected e.rcvMu.Lock() e.rcvReady = true @@ -854,7 +858,7 @@ func (e *endpoint) Shutdown(flags tcpip.ShutdownFlags) *tcpip.Error { // A socket in the bound state can still receive multicast messages, // so we need to notify waiters on shutdown. - if e.state != stateBound && e.state != stateConnected { + if e.state != StateBound && e.state != StateConnected { return tcpip.ErrNotConnected } @@ -903,7 +907,7 @@ func (e *endpoint) registerWithStack(nicid tcpip.NICID, netProtos []tcpip.Networ func (e *endpoint) bindLocked(addr tcpip.FullAddress) *tcpip.Error { // Don't allow binding once endpoint is not in the initial state // anymore. - if e.state != stateInitial { + if e.state != StateInitial { return tcpip.ErrInvalidEndpointState } @@ -946,7 +950,7 @@ func (e *endpoint) bindLocked(addr tcpip.FullAddress) *tcpip.Error { e.effectiveNetProtos = netProtos // Mark endpoint as bound. - e.state = stateBound + e.state = StateBound e.rcvMu.Lock() e.rcvReady = true @@ -989,7 +993,7 @@ func (e *endpoint) GetRemoteAddress() (tcpip.FullAddress, *tcpip.Error) { e.mu.RLock() defer e.mu.RUnlock() - if e.state != stateConnected { + if e.state != StateConnected { return tcpip.FullAddress{}, tcpip.ErrNotConnected } @@ -1069,10 +1073,11 @@ func (e *endpoint) HandlePacket(r *stack.Route, id stack.TransportEndpointID, vv func (e *endpoint) HandleControlPacket(id stack.TransportEndpointID, typ stack.ControlType, extra uint32, vv buffer.VectorisedView) { } -// State implements socket.Socket.State. +// State implements tcpip.Endpoint.State. func (e *endpoint) State() uint32 { - // TODO(b/112063468): Translate internal state to values returned by Linux. - return 0 + e.mu.Lock() + defer e.mu.Unlock() + return uint32(e.state) } func isBroadcastOrMulticast(a tcpip.Address) bool { diff --git a/pkg/tcpip/transport/udp/endpoint_state.go b/pkg/tcpip/transport/udp/endpoint_state.go index 5cbb56120..be46e6d4e 100644 --- a/pkg/tcpip/transport/udp/endpoint_state.go +++ b/pkg/tcpip/transport/udp/endpoint_state.go @@ -77,7 +77,7 @@ func (e *endpoint) Resume(s *stack.Stack) { } } - if e.state != stateBound && e.state != stateConnected { + if e.state != StateBound && e.state != StateConnected { return } @@ -92,7 +92,7 @@ func (e *endpoint) Resume(s *stack.Stack) { } var err *tcpip.Error - if e.state == stateConnected { + if e.state == StateConnected { e.route, err = e.stack.FindRoute(e.regNICID, e.id.LocalAddress, e.id.RemoteAddress, netProto, e.multicastLoop) if err != nil { panic(err) diff --git a/pkg/tcpip/transport/udp/forwarder.go b/pkg/tcpip/transport/udp/forwarder.go index a874fc9fd..a9edc2c8d 100644 --- a/pkg/tcpip/transport/udp/forwarder.go +++ b/pkg/tcpip/transport/udp/forwarder.go @@ -84,7 +84,7 @@ func (r *ForwarderRequest) CreateEndpoint(queue *waiter.Queue) (tcpip.Endpoint, ep.dstPort = r.id.RemotePort ep.regNICID = r.route.NICID() - ep.state = stateConnected + ep.state = StateConnected ep.rcvMu.Lock() ep.rcvReady = true |