summaryrefslogtreecommitdiffhomepage
path: root/pkg/tcpip/network/fragmentation
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/tcpip/network/fragmentation')
-rw-r--r--pkg/tcpip/network/fragmentation/fragmentation.go28
-rw-r--r--pkg/tcpip/network/fragmentation/fragmentation_test.go104
-rw-r--r--pkg/tcpip/network/fragmentation/reassembler.go22
-rw-r--r--pkg/tcpip/network/fragmentation/reassembler_test.go23
4 files changed, 162 insertions, 15 deletions
diff --git a/pkg/tcpip/network/fragmentation/fragmentation.go b/pkg/tcpip/network/fragmentation/fragmentation.go
index bb31ef61a..936601287 100644
--- a/pkg/tcpip/network/fragmentation/fragmentation.go
+++ b/pkg/tcpip/network/fragmentation/fragmentation.go
@@ -136,8 +136,16 @@ func NewFragmentation(blockSize uint16, highMemoryLimit, lowMemoryLimit int, rea
// proto is the protocol number marked in the fragment being processed. It has
// to be given here outside of the FragmentID struct because IPv6 should not use
// the protocol to identify a fragment.
+//
+// releaseCB is a callback that will run when the fragment reassembly of a
+// packet is complete or cancelled. releaseCB take a a boolean argument which is
+// true iff the reassembly is cancelled due to timeout. releaseCB should be
+// passed only with the first fragment of a packet. If more than one releaseCB
+// are passed for the same packet, only the first releaseCB will be saved for
+// the packet and the succeeding ones will be dropped by running them
+// immediately with a false argument.
func (f *Fragmentation) Process(
- id FragmentID, first, last uint16, more bool, proto uint8, vv buffer.VectorisedView) (
+ id FragmentID, first, last uint16, more bool, proto uint8, vv buffer.VectorisedView, releaseCB func(bool)) (
buffer.VectorisedView, uint8, bool, error) {
if first > last {
return buffer.VectorisedView{}, 0, false, fmt.Errorf("first=%d is greater than last=%d: %w", first, last, ErrInvalidArgs)
@@ -171,6 +179,12 @@ func (f *Fragmentation) Process(
f.releaseReassemblersLocked()
}
}
+ if releaseCB != nil {
+ if !r.setCallback(releaseCB) {
+ // We got a duplicate callback. Release it immediately.
+ releaseCB(false /* timedOut */)
+ }
+ }
f.mu.Unlock()
res, firstFragmentProto, done, consumed, err := r.process(first, last, more, proto, vv)
@@ -178,14 +192,14 @@ func (f *Fragmentation) Process(
// We probably got an invalid sequence of fragments. Just
// discard the reassembler and move on.
f.mu.Lock()
- f.release(r)
+ f.release(r, false /* timedOut */)
f.mu.Unlock()
return buffer.VectorisedView{}, 0, false, fmt.Errorf("fragmentation processing error: %w", err)
}
f.mu.Lock()
f.size += consumed
if done {
- f.release(r)
+ f.release(r, false /* timedOut */)
}
// Evict reassemblers if we are consuming more memory than highLimit until
// we reach lowLimit.
@@ -195,14 +209,14 @@ func (f *Fragmentation) Process(
if tail == nil {
break
}
- f.release(tail)
+ f.release(tail, false /* timedOut */)
}
}
f.mu.Unlock()
return res, firstFragmentProto, done, nil
}
-func (f *Fragmentation) release(r *reassembler) {
+func (f *Fragmentation) release(r *reassembler, timedOut bool) {
// Before releasing a fragment we need to check if r is already marked as done.
// Otherwise, we would delete it twice.
if r.checkDoneOrMark() {
@@ -216,6 +230,8 @@ func (f *Fragmentation) release(r *reassembler) {
log.Printf("memory counter < 0 (%d), this is an accounting bug that requires investigation", f.size)
f.size = 0
}
+
+ r.release(timedOut) // releaseCB may run.
}
// releaseReassemblersLocked releases already-expired reassemblers, then
@@ -238,7 +254,7 @@ func (f *Fragmentation) releaseReassemblersLocked() {
break
}
// If the oldest reassembler has already expired, release it.
- f.release(r)
+ f.release(r, true /* timedOut*/)
}
}
diff --git a/pkg/tcpip/network/fragmentation/fragmentation_test.go b/pkg/tcpip/network/fragmentation/fragmentation_test.go
index a1eb1e243..5dcd10730 100644
--- a/pkg/tcpip/network/fragmentation/fragmentation_test.go
+++ b/pkg/tcpip/network/fragmentation/fragmentation_test.go
@@ -105,7 +105,7 @@ func TestFragmentationProcess(t *testing.T) {
f := NewFragmentation(minBlockSize, 1024, 512, reassembleTimeout, &faketime.NullClock{})
firstFragmentProto := c.in[0].proto
for i, in := range c.in {
- vv, proto, done, err := f.Process(in.id, in.first, in.last, in.more, in.proto, in.vv)
+ vv, proto, done, err := f.Process(in.id, in.first, in.last, in.more, in.proto, in.vv, nil)
if err != nil {
t.Fatalf("f.Process(%+v, %d, %d, %t, %d, %X) failed: %s",
in.id, in.first, in.last, in.more, in.proto, in.vv.ToView(), err)
@@ -240,7 +240,7 @@ func TestReassemblingTimeout(t *testing.T) {
for _, event := range test.events {
clock.Advance(event.clockAdvance)
if frag := event.fragment; frag != nil {
- _, _, done, err := f.Process(FragmentID{}, frag.first, frag.last, frag.more, protocol, vv(len(frag.data), frag.data))
+ _, _, done, err := f.Process(FragmentID{}, frag.first, frag.last, frag.more, protocol, vv(len(frag.data), frag.data), nil)
if err != nil {
t.Fatalf("%s: f.Process failed: %s", event.name, err)
}
@@ -259,15 +259,15 @@ func TestReassemblingTimeout(t *testing.T) {
func TestMemoryLimits(t *testing.T) {
f := NewFragmentation(minBlockSize, 3, 1, reassembleTimeout, &faketime.NullClock{})
// Send first fragment with id = 0.
- f.Process(FragmentID{ID: 0}, 0, 0, true, 0xFF, vv(1, "0"))
+ f.Process(FragmentID{ID: 0}, 0, 0, true, 0xFF, vv(1, "0"), nil)
// Send first fragment with id = 1.
- f.Process(FragmentID{ID: 1}, 0, 0, true, 0xFF, vv(1, "1"))
+ f.Process(FragmentID{ID: 1}, 0, 0, true, 0xFF, vv(1, "1"), nil)
// Send first fragment with id = 2.
- f.Process(FragmentID{ID: 2}, 0, 0, true, 0xFF, vv(1, "2"))
+ f.Process(FragmentID{ID: 2}, 0, 0, true, 0xFF, vv(1, "2"), nil)
// Send first fragment with id = 3. This should caused id = 0 and id = 1 to be
// evicted.
- f.Process(FragmentID{ID: 3}, 0, 0, true, 0xFF, vv(1, "3"))
+ f.Process(FragmentID{ID: 3}, 0, 0, true, 0xFF, vv(1, "3"), nil)
if _, ok := f.reassemblers[FragmentID{ID: 0}]; ok {
t.Errorf("Memory limits are not respected: id=0 has not been evicted.")
@@ -283,9 +283,9 @@ func TestMemoryLimits(t *testing.T) {
func TestMemoryLimitsIgnoresDuplicates(t *testing.T) {
f := NewFragmentation(minBlockSize, 1, 0, reassembleTimeout, &faketime.NullClock{})
// Send first fragment with id = 0.
- f.Process(FragmentID{}, 0, 0, true, 0xFF, vv(1, "0"))
+ f.Process(FragmentID{}, 0, 0, true, 0xFF, vv(1, "0"), nil)
// Send the same packet again.
- f.Process(FragmentID{}, 0, 0, true, 0xFF, vv(1, "0"))
+ f.Process(FragmentID{}, 0, 0, true, 0xFF, vv(1, "0"), nil)
got := f.size
want := 1
@@ -377,7 +377,7 @@ func TestErrors(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
f := NewFragmentation(test.blockSize, HighFragThreshold, LowFragThreshold, reassembleTimeout, &faketime.NullClock{})
- _, _, done, err := f.Process(FragmentID{}, test.first, test.last, test.more, 0, vv(len(test.data), test.data))
+ _, _, done, err := f.Process(FragmentID{}, test.first, test.last, test.more, 0, vv(len(test.data), test.data), nil)
if !errors.Is(err, test.err) {
t.Errorf("got Process(_, %d, %d, %t, _, %q) = (_, _, _, %v), want = (_, _, _, %v)", test.first, test.last, test.more, test.data, err, test.err)
}
@@ -497,3 +497,89 @@ func TestPacketFragmenter(t *testing.T) {
})
}
}
+
+func TestReleaseCallback(t *testing.T) {
+ const (
+ proto = 99
+ )
+
+ var result int
+ var callbackReasonIsTimeout bool
+ cb1 := func(timedOut bool) { result = 1; callbackReasonIsTimeout = timedOut }
+ cb2 := func(timedOut bool) { result = 2; callbackReasonIsTimeout = timedOut }
+
+ tests := []struct {
+ name string
+ callbacks []func(bool)
+ timeout bool
+ wantResult int
+ wantCallbackReasonIsTimeout bool
+ }{
+ {
+ name: "callback runs on release",
+ callbacks: []func(bool){cb1},
+ timeout: false,
+ wantResult: 1,
+ wantCallbackReasonIsTimeout: false,
+ },
+ {
+ name: "first callback is nil",
+ callbacks: []func(bool){nil, cb2},
+ timeout: false,
+ wantResult: 2,
+ wantCallbackReasonIsTimeout: false,
+ },
+ {
+ name: "two callbacks - first one is set",
+ callbacks: []func(bool){cb1, cb2},
+ timeout: false,
+ wantResult: 1,
+ wantCallbackReasonIsTimeout: false,
+ },
+ {
+ name: "callback runs on timeout",
+ callbacks: []func(bool){cb1},
+ timeout: true,
+ wantResult: 1,
+ wantCallbackReasonIsTimeout: true,
+ },
+ {
+ name: "no callbacks",
+ callbacks: []func(bool){nil},
+ timeout: false,
+ wantResult: 0,
+ wantCallbackReasonIsTimeout: false,
+ },
+ }
+
+ id := FragmentID{ID: 0}
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ result = 0
+ callbackReasonIsTimeout = false
+
+ f := NewFragmentation(minBlockSize, HighFragThreshold, LowFragThreshold, reassembleTimeout, &faketime.NullClock{})
+
+ for i, cb := range test.callbacks {
+ _, _, _, err := f.Process(id, uint16(i), uint16(i), true, proto, vv(1, "0"), cb)
+ if err != nil {
+ t.Errorf("f.Process error = %s", err)
+ }
+ }
+
+ r, ok := f.reassemblers[id]
+ if !ok {
+ t.Fatalf("Reassemberr not found")
+ }
+ f.release(r, test.timeout)
+
+ if result != test.wantResult {
+ t.Errorf("got result = %d, want = %d", result, test.wantResult)
+ }
+ if callbackReasonIsTimeout != test.wantCallbackReasonIsTimeout {
+ t.Errorf("got callbackReasonIsTimeout = %t, want = %t", callbackReasonIsTimeout, test.wantCallbackReasonIsTimeout)
+ }
+ })
+ }
+}
diff --git a/pkg/tcpip/network/fragmentation/reassembler.go b/pkg/tcpip/network/fragmentation/reassembler.go
index 9bb051a30..c0cc0bde0 100644
--- a/pkg/tcpip/network/fragmentation/reassembler.go
+++ b/pkg/tcpip/network/fragmentation/reassembler.go
@@ -41,6 +41,7 @@ type reassembler struct {
heap fragHeap
done bool
creationTime int64
+ callback func(bool)
}
func newReassembler(id FragmentID, clock tcpip.Clock) *reassembler {
@@ -123,3 +124,24 @@ func (r *reassembler) checkDoneOrMark() bool {
r.mu.Unlock()
return prev
}
+
+func (r *reassembler) setCallback(c func(bool)) bool {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ if r.callback != nil {
+ return false
+ }
+ r.callback = c
+ return true
+}
+
+func (r *reassembler) release(timedOut bool) {
+ r.mu.Lock()
+ callback := r.callback
+ r.callback = nil
+ r.mu.Unlock()
+
+ if callback != nil {
+ callback(timedOut)
+ }
+}
diff --git a/pkg/tcpip/network/fragmentation/reassembler_test.go b/pkg/tcpip/network/fragmentation/reassembler_test.go
index a0a04a027..fa2a70dc8 100644
--- a/pkg/tcpip/network/fragmentation/reassembler_test.go
+++ b/pkg/tcpip/network/fragmentation/reassembler_test.go
@@ -105,3 +105,26 @@ func TestUpdateHoles(t *testing.T) {
}
}
}
+
+func TestSetCallback(t *testing.T) {
+ result := 0
+ reasonTimeout := false
+
+ cb1 := func(timedOut bool) { result = 1; reasonTimeout = timedOut }
+ cb2 := func(timedOut bool) { result = 2; reasonTimeout = timedOut }
+
+ r := newReassembler(FragmentID{}, &faketime.NullClock{})
+ if !r.setCallback(cb1) {
+ t.Errorf("setCallback failed")
+ }
+ if r.setCallback(cb2) {
+ t.Errorf("setCallback should fail if one is already set")
+ }
+ r.release(true)
+ if result != 1 {
+ t.Errorf("got result = %d, want = 1", result)
+ }
+ if !reasonTimeout {
+ t.Errorf("got reasonTimeout = %t, want = true", reasonTimeout)
+ }
+}