From c8eeedcc1d6b1ee25532ae630a7efd7aa4656bdc Mon Sep 17 00:00:00 2001 From: Bhasker Hariharan Date: Tue, 24 Mar 2020 15:33:16 -0700 Subject: Add support for setting TCP segment hash. This allows the link layer endpoints to consistenly hash a TCP segment to a single underlying queue in case a link layer endpoint does support multiple underlying queues. Updates #231 PiperOrigin-RevId: 302760664 --- pkg/tcpip/transport/tcp/accept.go | 20 +++++- pkg/tcpip/transport/tcp/connect.go | 138 +++++++++++++++++++++++++----------- pkg/tcpip/transport/tcp/endpoint.go | 5 ++ pkg/tcpip/transport/tcp/protocol.go | 10 ++- 4 files changed, 130 insertions(+), 43 deletions(-) (limited to 'pkg/tcpip/transport') diff --git a/pkg/tcpip/transport/tcp/accept.go b/pkg/tcpip/transport/tcp/accept.go index 3f80995f3..b4c4c8ab1 100644 --- a/pkg/tcpip/transport/tcp/accept.go +++ b/pkg/tcpip/transport/tcp/accept.go @@ -445,7 +445,15 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) { // RFC 793 section 3.4 page 35 (figure 12) outlines that a RST // must be sent in response to a SYN-ACK while in the listen // state to prevent completing a handshake from an old SYN. - e.sendTCP(&s.route, s.id, buffer.VectorisedView{}, e.ttl, e.sendTOS, header.TCPFlagRst, s.ackNumber, 0, 0, nil, nil) + e.sendTCP(&s.route, tcpFields{ + id: s.id, + ttl: e.ttl, + tos: e.sendTOS, + flags: header.TCPFlagRst, + seq: s.ackNumber, + ack: 0, + rcvWnd: 0, + }, buffer.VectorisedView{}, nil) return } @@ -493,7 +501,15 @@ func (e *endpoint) handleListenSegment(ctx *listenContext, s *segment) { TSEcr: opts.TSVal, MSS: mssForRoute(&s.route), } - e.sendSynTCP(&s.route, s.id, e.ttl, e.sendTOS, header.TCPFlagSyn|header.TCPFlagAck, cookie, s.sequenceNumber+1, ctx.rcvWnd, synOpts) + e.sendSynTCP(&s.route, tcpFields{ + id: s.id, + ttl: e.ttl, + tos: e.sendTOS, + flags: header.TCPFlagSyn | header.TCPFlagAck, + seq: cookie, + ack: s.sequenceNumber + 1, + rcvWnd: ctx.rcvWnd, + }, synOpts) e.stack.Stats().TCP.ListenOverflowSynCookieSent.Increment() } diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go index 79552fc61..1d245c2c6 100644 --- a/pkg/tcpip/transport/tcp/connect.go +++ b/pkg/tcpip/transport/tcp/connect.go @@ -308,7 +308,15 @@ func (h *handshake) synSentState(s *segment) *tcpip.Error { if ttl == 0 { ttl = s.route.DefaultTTL() } - h.ep.sendSynTCP(&s.route, h.ep.ID, ttl, h.ep.sendTOS, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) + h.ep.sendSynTCP(&s.route, tcpFields{ + id: h.ep.ID, + ttl: ttl, + tos: h.ep.sendTOS, + flags: h.flags, + seq: h.iss, + ack: h.ackNum, + rcvWnd: h.rcvWnd, + }, synOpts) return nil } @@ -361,7 +369,15 @@ func (h *handshake) synRcvdState(s *segment) *tcpip.Error { SACKPermitted: h.ep.sackPermitted, MSS: h.ep.amss, } - h.ep.sendSynTCP(&s.route, h.ep.ID, h.ep.ttl, h.ep.sendTOS, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) + h.ep.sendSynTCP(&s.route, tcpFields{ + id: h.ep.ID, + ttl: h.ep.ttl, + tos: h.ep.sendTOS, + flags: h.flags, + seq: h.iss, + ack: h.ackNum, + rcvWnd: h.rcvWnd, + }, synOpts) return nil } @@ -550,7 +566,16 @@ func (h *handshake) execute() *tcpip.Error { synOpts.WS = -1 } } - h.ep.sendSynTCP(&h.ep.route, h.ep.ID, h.ep.ttl, h.ep.sendTOS, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) + + h.ep.sendSynTCP(&h.ep.route, tcpFields{ + id: h.ep.ID, + ttl: h.ep.ttl, + tos: h.ep.sendTOS, + flags: h.flags, + seq: h.iss, + ack: h.ackNum, + rcvWnd: h.rcvWnd, + }, synOpts) for h.state != handshakeCompleted { h.ep.mu.Unlock() @@ -573,7 +598,15 @@ func (h *handshake) execute() *tcpip.Error { // the connection with another ACK or data (as ACKs are never // retransmitted on their own). if h.active || !h.acked || h.deferAccept != 0 && time.Since(h.startTime) > h.deferAccept { - h.ep.sendSynTCP(&h.ep.route, h.ep.ID, h.ep.ttl, h.ep.sendTOS, h.flags, h.iss, h.ackNum, h.rcvWnd, synOpts) + h.ep.sendSynTCP(&h.ep.route, tcpFields{ + id: h.ep.ID, + ttl: h.ep.ttl, + tos: h.ep.sendTOS, + flags: h.flags, + seq: h.iss, + ack: h.ackNum, + rcvWnd: h.rcvWnd, + }, synOpts) } case wakerForNotification: @@ -686,18 +719,33 @@ func makeSynOptions(opts header.TCPSynOptions) []byte { return options[:offset] } -func (e *endpoint) sendSynTCP(r *stack.Route, id stack.TransportEndpointID, ttl, tos uint8, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size, opts header.TCPSynOptions) *tcpip.Error { - options := makeSynOptions(opts) +// tcpFields is a struct to carry different parameters required by the +// send*TCP variant functions below. +type tcpFields struct { + id stack.TransportEndpointID + ttl uint8 + tos uint8 + flags byte + seq seqnum.Value + ack seqnum.Value + rcvWnd seqnum.Size + opts []byte + txHash uint32 +} + +func (e *endpoint) sendSynTCP(r *stack.Route, tf tcpFields, opts header.TCPSynOptions) *tcpip.Error { + tf.opts = makeSynOptions(opts) // We ignore SYN send errors and let the callers re-attempt send. - if err := e.sendTCP(r, id, buffer.VectorisedView{}, ttl, tos, flags, seq, ack, rcvWnd, options, nil); err != nil { + if err := e.sendTCP(r, tf, buffer.VectorisedView{}, nil); err != nil { e.stats.SendErrors.SynSendToNetworkFailed.Increment() } - putOptions(options) + putOptions(tf.opts) return nil } -func (e *endpoint) sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.VectorisedView, ttl, tos uint8, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size, opts []byte, gso *stack.GSO) *tcpip.Error { - if err := sendTCP(r, id, data, ttl, tos, flags, seq, ack, rcvWnd, opts, gso); err != nil { +func (e *endpoint) sendTCP(r *stack.Route, tf tcpFields, data buffer.VectorisedView, gso *stack.GSO) *tcpip.Error { + tf.txHash = e.txHash + if err := sendTCP(r, tf, data, gso); err != nil { e.stats.SendErrors.SegmentSendToNetworkFailed.Increment() return err } @@ -705,8 +753,8 @@ func (e *endpoint) sendTCP(r *stack.Route, id stack.TransportEndpointID, data bu return nil } -func buildTCPHdr(r *stack.Route, id stack.TransportEndpointID, pkt *stack.PacketBuffer, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size, opts []byte, gso *stack.GSO) { - optLen := len(opts) +func buildTCPHdr(r *stack.Route, tf tcpFields, pkt *stack.PacketBuffer, gso *stack.GSO) { + optLen := len(tf.opts) hdr := &pkt.Header packetSize := pkt.DataSize off := pkt.DataOffset @@ -714,15 +762,15 @@ func buildTCPHdr(r *stack.Route, id stack.TransportEndpointID, pkt *stack.Packet tcp := header.TCP(hdr.Prepend(header.TCPMinimumSize + optLen)) pkt.TransportHeader = buffer.View(tcp) tcp.Encode(&header.TCPFields{ - SrcPort: id.LocalPort, - DstPort: id.RemotePort, - SeqNum: uint32(seq), - AckNum: uint32(ack), + SrcPort: tf.id.LocalPort, + DstPort: tf.id.RemotePort, + SeqNum: uint32(tf.seq), + AckNum: uint32(tf.ack), DataOffset: uint8(header.TCPMinimumSize + optLen), - Flags: flags, - WindowSize: uint16(rcvWnd), + Flags: tf.flags, + WindowSize: uint16(tf.rcvWnd), }) - copy(tcp[header.TCPMinimumSize:], opts) + copy(tcp[header.TCPMinimumSize:], tf.opts) length := uint16(hdr.UsedLength() + packetSize) xsum := r.PseudoHeaderChecksum(ProtocolNumber, length) @@ -737,13 +785,12 @@ func buildTCPHdr(r *stack.Route, id stack.TransportEndpointID, pkt *stack.Packet xsum = header.ChecksumVVWithOffset(pkt.Data, xsum, off, packetSize) tcp.SetChecksum(^tcp.CalculateChecksum(xsum)) } - } -func sendTCPBatch(r *stack.Route, id stack.TransportEndpointID, data buffer.VectorisedView, ttl, tos uint8, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size, opts []byte, gso *stack.GSO) *tcpip.Error { - optLen := len(opts) - if rcvWnd > 0xffff { - rcvWnd = 0xffff +func sendTCPBatch(r *stack.Route, tf tcpFields, data buffer.VectorisedView, gso *stack.GSO) *tcpip.Error { + optLen := len(tf.opts) + if tf.rcvWnd > 0xffff { + tf.rcvWnd = 0xffff } mss := int(gso.MSS) @@ -768,14 +815,15 @@ func sendTCPBatch(r *stack.Route, id stack.TransportEndpointID, data buffer.Vect pkts[i].DataOffset = off pkts[i].DataSize = packetSize pkts[i].Data = data - buildTCPHdr(r, id, &pkts[i], flags, seq, ack, rcvWnd, opts, gso) + pkts[i].Hash = tf.txHash + buildTCPHdr(r, tf, &pkts[i], gso) off += packetSize - seq = seq.Add(seqnum.Size(packetSize)) + tf.seq = tf.seq.Add(seqnum.Size(packetSize)) } - if ttl == 0 { - ttl = r.DefaultTTL() + if tf.ttl == 0 { + tf.ttl = r.DefaultTTL() } - sent, err := r.WritePackets(gso, pkts, stack.NetworkHeaderParams{Protocol: ProtocolNumber, TTL: ttl, TOS: tos}) + sent, err := r.WritePackets(gso, pkts, stack.NetworkHeaderParams{Protocol: ProtocolNumber, TTL: tf.ttl, TOS: tf.tos}) if err != nil { r.Stats().TCP.SegmentSendErrors.IncrementBy(uint64(n - sent)) } @@ -785,14 +833,14 @@ func sendTCPBatch(r *stack.Route, id stack.TransportEndpointID, data buffer.Vect // sendTCP sends a TCP segment with the provided options via the provided // network endpoint and under the provided identity. -func sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.VectorisedView, ttl, tos uint8, flags byte, seq, ack seqnum.Value, rcvWnd seqnum.Size, opts []byte, gso *stack.GSO) *tcpip.Error { - optLen := len(opts) - if rcvWnd > 0xffff { - rcvWnd = 0xffff +func sendTCP(r *stack.Route, tf tcpFields, data buffer.VectorisedView, gso *stack.GSO) *tcpip.Error { + optLen := len(tf.opts) + if tf.rcvWnd > 0xffff { + tf.rcvWnd = 0xffff } if r.Loop&stack.PacketLoop == 0 && gso != nil && gso.Type == stack.GSOSW && int(gso.MSS) < data.Size() { - return sendTCPBatch(r, id, data, ttl, tos, flags, seq, ack, rcvWnd, opts, gso) + return sendTCPBatch(r, tf, data, gso) } pkt := stack.PacketBuffer{ @@ -800,18 +848,19 @@ func sendTCP(r *stack.Route, id stack.TransportEndpointID, data buffer.Vectorise DataOffset: 0, DataSize: data.Size(), Data: data, + Hash: tf.txHash, } - buildTCPHdr(r, id, &pkt, flags, seq, ack, rcvWnd, opts, gso) + buildTCPHdr(r, tf, &pkt, gso) - if ttl == 0 { - ttl = r.DefaultTTL() + if tf.ttl == 0 { + tf.ttl = r.DefaultTTL() } - if err := r.WritePacket(gso, stack.NetworkHeaderParams{Protocol: ProtocolNumber, TTL: ttl, TOS: tos}, pkt); err != nil { + if err := r.WritePacket(gso, stack.NetworkHeaderParams{Protocol: ProtocolNumber, TTL: tf.ttl, TOS: tf.tos}, pkt); err != nil { r.Stats().TCP.SegmentSendErrors.Increment() return err } r.Stats().TCP.SegmentsSent.Increment() - if (flags & header.TCPFlagRst) != 0 { + if (tf.flags & header.TCPFlagRst) != 0 { r.Stats().TCP.ResetsSent.Increment() } return nil @@ -863,7 +912,16 @@ func (e *endpoint) sendRaw(data buffer.VectorisedView, flags byte, seq, ack seqn sackBlocks = e.sack.Blocks[:e.sack.NumBlocks] } options := e.makeOptions(sackBlocks) - err := e.sendTCP(&e.route, e.ID, data, e.ttl, e.sendTOS, flags, seq, ack, rcvWnd, options, e.gso) + err := e.sendTCP(&e.route, tcpFields{ + id: e.ID, + ttl: e.ttl, + tos: e.sendTOS, + flags: flags, + seq: seq, + ack: ack, + rcvWnd: rcvWnd, + opts: options, + }, data, e.gso) putOptions(options) return err } diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go index 594efaa11..b6e571361 100644 --- a/pkg/tcpip/transport/tcp/endpoint.go +++ b/pkg/tcpip/transport/tcp/endpoint.go @@ -581,6 +581,10 @@ type endpoint struct { // endpoint and at this point the endpoint is only around // to complete the TCP shutdown. closed bool + + // txHash is the transport layer hash to be set on outbound packets + // emitted by this endpoint. + txHash uint32 } // UniqueID implements stack.TransportEndpoint.UniqueID. @@ -771,6 +775,7 @@ func newEndpoint(s *stack.Stack, netProto tcpip.NetworkProtocolNumber, waiterQue count: 9, }, uniqueID: s.UniqueID(), + txHash: s.Rand().Uint32(), } var ss SendBufferSizeOption diff --git a/pkg/tcpip/transport/tcp/protocol.go b/pkg/tcpip/transport/tcp/protocol.go index 57985b85d..1377107ca 100644 --- a/pkg/tcpip/transport/tcp/protocol.go +++ b/pkg/tcpip/transport/tcp/protocol.go @@ -191,7 +191,15 @@ func replyWithReset(s *segment) { flags |= header.TCPFlagAck ack = s.sequenceNumber.Add(s.logicalLen()) } - sendTCP(&s.route, s.id, buffer.VectorisedView{}, s.route.DefaultTTL(), stack.DefaultTOS, flags, seq, ack, 0 /* rcvWnd */, nil /* options */, nil /* gso */) + sendTCP(&s.route, tcpFields{ + id: s.id, + ttl: s.route.DefaultTTL(), + tos: stack.DefaultTOS, + flags: flags, + seq: seq, + ack: ack, + rcvWnd: 0, + }, buffer.VectorisedView{}, nil /* gso */) } // SetOption implements stack.TransportProtocol.SetOption. -- cgit v1.2.3