summaryrefslogtreecommitdiffhomepage
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/tcpip/transport/tcp/connect.go41
-rw-r--r--pkg/tcpip/transport/tcp/endpoint.go121
2 files changed, 88 insertions, 74 deletions
diff --git a/pkg/tcpip/transport/tcp/connect.go b/pkg/tcpip/transport/tcp/connect.go
index f711cd4df..62954d7e4 100644
--- a/pkg/tcpip/transport/tcp/connect.go
+++ b/pkg/tcpip/transport/tcp/connect.go
@@ -895,43 +895,46 @@ func (e *endpoint) sendRaw(data buffer.VectorisedView, flags byte, seq, ack seqn
return err
}
-func (e *endpoint) handleWrite() *tcpip.Error {
- // Move packets from send queue to send list. The queue is accessible
- // from other goroutines and protected by the send mutex, while the send
- // list is only accessible from the handler goroutine, so it needs no
- // mutexes.
+func (e *endpoint) handleWrite() {
e.sndBufMu.Lock()
+ next := e.drainSendQueueLocked()
+ e.sndBufMu.Unlock()
+
+ e.sendData(next)
+}
+// Move packets from send queue to send list.
+//
+// Precondition: e.sndBufMu must be locked.
+func (e *endpoint) drainSendQueueLocked() *segment {
first := e.sndQueue.Front()
if first != nil {
e.snd.writeList.PushBackList(&e.sndQueue)
e.sndBufInQueue = 0
}
+ return first
+}
- e.sndBufMu.Unlock()
-
+// Precondition: e.mu must be locked.
+func (e *endpoint) sendData(next *segment) {
// Initialize the next segment to write if it's currently nil.
if e.snd.writeNext == nil {
- e.snd.writeNext = first
+ e.snd.writeNext = next
}
// Push out any new packets.
e.snd.sendData()
-
- return nil
}
-func (e *endpoint) handleClose() *tcpip.Error {
+func (e *endpoint) handleClose() {
if !e.EndpointState().connected() {
- return nil
+ return
}
// Drain the send queue.
e.handleWrite()
// Mark send side as closed.
e.snd.closed = true
-
- return nil
}
// resetConnectionLocked puts the endpoint in an error state with the given
@@ -1348,11 +1351,17 @@ func (e *endpoint) protocolMainLoop(handshake bool, wakerInitDone chan<- struct{
}{
{
w: &e.sndWaker,
- f: e.handleWrite,
+ f: func() *tcpip.Error {
+ e.handleWrite()
+ return nil
+ },
},
{
w: &e.sndCloseWaker,
- f: e.handleClose,
+ f: func() *tcpip.Error {
+ e.handleClose()
+ return nil
+ },
},
{
w: &closeWaker,
diff --git a/pkg/tcpip/transport/tcp/endpoint.go b/pkg/tcpip/transport/tcp/endpoint.go
index 8d27d43c2..05c431e83 100644
--- a/pkg/tcpip/transport/tcp/endpoint.go
+++ b/pkg/tcpip/transport/tcp/endpoint.go
@@ -1513,74 +1513,79 @@ func (e *endpoint) Write(p tcpip.Payloader, opts tcpip.WriteOptions) (int64, *tc
// and opts.EndOfRecord are also ignored.
e.LockUser()
- e.sndBufMu.Lock()
-
- avail, err := e.isEndpointWritableLocked()
- if err != nil {
- e.sndBufMu.Unlock()
- e.UnlockUser()
- e.stats.WriteErrors.WriteClosed.Increment()
- return 0, err
- }
-
- // We can release locks while copying data.
- //
- // This is not possible if atomic is set, because we can't allow the
- // available buffer space to be consumed by some other caller while we
- // are copying data in.
- if !opts.Atomic {
- e.sndBufMu.Unlock()
- e.UnlockUser()
- }
-
- // Fetch data.
- if l := p.Len(); l < avail {
- avail = l
- }
- if avail == 0 {
- return 0, nil
- }
- v := make([]byte, avail)
- if _, err := io.ReadFull(p, v); err != nil {
- if opts.Atomic {
- e.sndBufMu.Unlock()
- e.UnlockUser()
- }
- return 0, tcpip.ErrBadBuffer
- }
+ defer e.UnlockUser()
- if !opts.Atomic {
- // Since we released locks in between it's possible that the
- // endpoint transitioned to a CLOSED/ERROR states so make
- // sure endpoint is still writable before trying to write.
- e.LockUser()
+ nextSeg, n, err := func() (*segment, int, *tcpip.Error) {
e.sndBufMu.Lock()
+ defer e.sndBufMu.Unlock()
+
avail, err := e.isEndpointWritableLocked()
if err != nil {
- e.sndBufMu.Unlock()
- e.UnlockUser()
e.stats.WriteErrors.WriteClosed.Increment()
- return 0, err
+ return nil, 0, err
}
- // Discard any excess data copied in due to avail being reduced due
- // to a simultaneous write call to the socket.
- if avail < len(v) {
- v = v[:avail]
+ v, err := func() ([]byte, *tcpip.Error) {
+ // We can release locks while copying data.
+ //
+ // This is not possible if atomic is set, because we can't allow the
+ // available buffer space to be consumed by some other caller while we
+ // are copying data in.
+ if !opts.Atomic {
+ e.sndBufMu.Unlock()
+ defer e.sndBufMu.Lock()
+
+ e.UnlockUser()
+ defer e.LockUser()
+ }
+
+ // Fetch data.
+ if l := p.Len(); l < avail {
+ avail = l
+ }
+ if avail == 0 {
+ return nil, nil
+ }
+ v := make([]byte, avail)
+ if _, err := io.ReadFull(p, v); err != nil {
+ return nil, tcpip.ErrBadBuffer
+ }
+ return v, nil
+ }()
+ if len(v) == 0 || err != nil {
+ return nil, 0, err
+ }
+
+ if !opts.Atomic {
+ // Since we released locks in between it's possible that the
+ // endpoint transitioned to a CLOSED/ERROR states so make
+ // sure endpoint is still writable before trying to write.
+ avail, err := e.isEndpointWritableLocked()
+ if err != nil {
+ e.stats.WriteErrors.WriteClosed.Increment()
+ return nil, 0, err
+ }
+
+ // Discard any excess data copied in due to avail being reduced due
+ // to a simultaneous write call to the socket.
+ if avail < len(v) {
+ v = v[:avail]
+ }
}
- }
- // Add data to the send queue.
- s := newOutgoingSegment(e.ID, v)
- e.sndBufUsed += len(v)
- e.sndBufInQueue += seqnum.Size(len(v))
- e.sndQueue.PushBack(s)
- e.sndBufMu.Unlock()
+ // Add data to the send queue.
+ s := newOutgoingSegment(e.ID, v)
+ e.sndBufUsed += len(v)
+ e.sndBufInQueue += seqnum.Size(len(v))
+ e.sndQueue.PushBack(s)
- // Do the work inline.
- e.handleWrite()
- e.UnlockUser()
- return int64(len(v)), nil
+ return e.drainSendQueueLocked(), len(v), nil
+ }()
+ if err != nil {
+ return 0, err
+ }
+ e.sendData(nextSeg)
+ return int64(n), nil
}
// selectWindowLocked returns the new window without checking for shrinking or scaling