summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rwxr-xr-xpkg/tcpip/link/channel/channel.go34
-rw-r--r--pkg/tcpip/tcpip.go4
-rw-r--r--pkg/tcpip/transport/tcp/protocol.go17
-rw-r--r--pkg/tcpip/transport/tcp/snd.go15
-rwxr-xr-xpkg/tcpip/transport/tcp/tcp_state_autogen.go2
5 files changed, 42 insertions, 30 deletions
diff --git a/pkg/tcpip/link/channel/channel.go b/pkg/tcpip/link/channel/channel.go
index b4a0ae53d..9bf67686d 100755
--- a/pkg/tcpip/link/channel/channel.go
+++ b/pkg/tcpip/link/channel/channel.go
@@ -50,13 +50,11 @@ type NotificationHandle struct {
}
type queue struct {
+ // c is the outbound packet channel.
+ c chan PacketInfo
// mu protects fields below.
- mu sync.RWMutex
- // c is the outbound packet channel. Sending to c should hold mu.
- c chan PacketInfo
- numWrite int
- numRead int
- notify []*NotificationHandle
+ mu sync.RWMutex
+ notify []*NotificationHandle
}
func (q *queue) Close() {
@@ -64,11 +62,8 @@ func (q *queue) Close() {
}
func (q *queue) Read() (PacketInfo, bool) {
- q.mu.Lock()
- defer q.mu.Unlock()
select {
case p := <-q.c:
- q.numRead++
return p, true
default:
return PacketInfo{}, false
@@ -76,15 +71,8 @@ func (q *queue) Read() (PacketInfo, bool) {
}
func (q *queue) ReadContext(ctx context.Context) (PacketInfo, bool) {
- // We have to receive from channel without holding the lock, since it can
- // block indefinitely. This will cause a window that numWrite - numRead
- // produces a larger number, but won't go to negative. numWrite >= numRead
- // still holds.
select {
case pkt := <-q.c:
- q.mu.Lock()
- defer q.mu.Unlock()
- q.numRead++
return pkt, true
case <-ctx.Done():
return PacketInfo{}, false
@@ -93,16 +81,12 @@ func (q *queue) ReadContext(ctx context.Context) (PacketInfo, bool) {
func (q *queue) Write(p PacketInfo) bool {
wrote := false
-
- // It's important to make sure nobody can see numWrite until we increment it,
- // so numWrite >= numRead holds.
- q.mu.Lock()
select {
case q.c <- p:
wrote = true
- q.numWrite++
default:
}
+ q.mu.Lock()
notify := q.notify
q.mu.Unlock()
@@ -116,13 +100,7 @@ func (q *queue) Write(p PacketInfo) bool {
}
func (q *queue) Num() int {
- q.mu.RLock()
- defer q.mu.RUnlock()
- n := q.numWrite - q.numRead
- if n < 0 {
- panic("numWrite < numRead")
- }
- return n
+ return len(q.c)
}
func (q *queue) AddNotify(notify Notification) *NotificationHandle {
diff --git a/pkg/tcpip/tcpip.go b/pkg/tcpip/tcpip.go
index aec7126ff..109121dbc 100644
--- a/pkg/tcpip/tcpip.go
+++ b/pkg/tcpip/tcpip.go
@@ -681,6 +681,10 @@ type TCPTimeWaitTimeoutOption time.Duration
// for a handshake till the specified timeout until a segment with data arrives.
type TCPDeferAcceptOption time.Duration
+// TCPMinRTOOption is use by SetSockOpt/GetSockOpt to allow overriding
+// default MinRTO used by the Stack.
+type TCPMinRTOOption time.Duration
+
// MulticastInterfaceOption is used by SetSockOpt/GetSockOpt to specify a
// default interface for multicast.
type MulticastInterfaceOption struct {
diff --git a/pkg/tcpip/transport/tcp/protocol.go b/pkg/tcpip/transport/tcp/protocol.go
index dce9a1652..91f25c132 100644
--- a/pkg/tcpip/transport/tcp/protocol.go
+++ b/pkg/tcpip/transport/tcp/protocol.go
@@ -105,6 +105,7 @@ type protocol struct {
moderateReceiveBuffer bool
tcpLingerTimeout time.Duration
tcpTimeWaitTimeout time.Duration
+ minRTO time.Duration
dispatcher *dispatcher
}
@@ -272,6 +273,15 @@ func (p *protocol) SetOption(option interface{}) *tcpip.Error {
p.mu.Unlock()
return nil
+ case tcpip.TCPMinRTOOption:
+ if v < 0 {
+ v = tcpip.TCPMinRTOOption(MinRTO)
+ }
+ p.mu.Lock()
+ p.minRTO = time.Duration(v)
+ p.mu.Unlock()
+ return nil
+
default:
return tcpip.ErrUnknownProtocolOption
}
@@ -334,6 +344,12 @@ func (p *protocol) Option(option interface{}) *tcpip.Error {
p.mu.RUnlock()
return nil
+ case *tcpip.TCPMinRTOOption:
+ p.mu.RLock()
+ *v = tcpip.TCPMinRTOOption(p.minRTO)
+ p.mu.RUnlock()
+ return nil
+
default:
return tcpip.ErrUnknownProtocolOption
}
@@ -359,5 +375,6 @@ func NewProtocol() stack.TransportProtocol {
tcpLingerTimeout: DefaultTCPLingerTimeout,
tcpTimeWaitTimeout: DefaultTCPTimeWaitTimeout,
dispatcher: newDispatcher(runtime.GOMAXPROCS(0)),
+ minRTO: MinRTO,
}
}
diff --git a/pkg/tcpip/transport/tcp/snd.go b/pkg/tcpip/transport/tcp/snd.go
index 6b7bac37d..d8cfe3115 100644
--- a/pkg/tcpip/transport/tcp/snd.go
+++ b/pkg/tcpip/transport/tcp/snd.go
@@ -15,6 +15,7 @@
package tcp
import (
+ "fmt"
"math"
"sync/atomic"
"time"
@@ -149,6 +150,9 @@ type sender struct {
rtt rtt
rto time.Duration
+ // minRTO is the minimum permitted value for sender.rto.
+ minRTO time.Duration
+
// maxPayloadSize is the maximum size of the payload of a given segment.
// It is initialized on demand.
maxPayloadSize int
@@ -260,6 +264,13 @@ func newSender(ep *endpoint, iss, irs seqnum.Value, sndWnd seqnum.Size, mss uint
// etc.
s.ep.scoreboard = NewSACKScoreboard(uint16(s.maxPayloadSize), iss)
+ // Get Stack wide minRTO.
+ var v tcpip.TCPMinRTOOption
+ if err := ep.stack.TransportProtocolOption(ProtocolNumber, &v); err != nil {
+ panic(fmt.Sprintf("unable to get minRTO from stack: %s", err))
+ }
+ s.minRTO = time.Duration(v)
+
return s
}
@@ -394,8 +405,8 @@ func (s *sender) updateRTO(rtt time.Duration) {
s.rto = s.rtt.srtt + 4*s.rtt.rttvar
s.rtt.Unlock()
- if s.rto < MinRTO {
- s.rto = MinRTO
+ if s.rto < s.minRTO {
+ s.rto = s.minRTO
}
}
diff --git a/pkg/tcpip/transport/tcp/tcp_state_autogen.go b/pkg/tcpip/transport/tcp/tcp_state_autogen.go
index b3cb5f986..5ac73e3b7 100755
--- a/pkg/tcpip/transport/tcp/tcp_state_autogen.go
+++ b/pkg/tcpip/transport/tcp/tcp_state_autogen.go
@@ -374,6 +374,7 @@ func (x *sender) save(m state.Map) {
m.Save("writeList", &x.writeList)
m.Save("rtt", &x.rtt)
m.Save("rto", &x.rto)
+ m.Save("minRTO", &x.minRTO)
m.Save("maxPayloadSize", &x.maxPayloadSize)
m.Save("gso", &x.gso)
m.Save("sndWndScale", &x.sndWndScale)
@@ -399,6 +400,7 @@ func (x *sender) load(m state.Map) {
m.Load("writeList", &x.writeList)
m.Load("rtt", &x.rtt)
m.Load("rto", &x.rto)
+ m.Load("minRTO", &x.minRTO)
m.Load("maxPayloadSize", &x.maxPayloadSize)
m.Load("gso", &x.gso)
m.Load("sndWndScale", &x.sndWndScale)