author     Googler <noreply@google.com>           2018-04-27 10:37:02 -0700
committer  Adin Scannell <ascannell@google.com>   2018-04-28 01:44:26 -0400
commit     d02b74a5dcfed4bfc8f2f8e545bca4d2afabb296 (patch)
tree       54f95eef73aee6bacbfc736fffc631be2605ed53 /pkg/compressio
parent     f70210e742919f40aa2f0934a22f1c9ba6dada62 (diff)
Check in gVisor.
PiperOrigin-RevId: 194583126
Change-Id: Ica1d8821a90f74e7e745962d71801c598c652463
Diffstat (limited to 'pkg/compressio')
-rw-r--r--  pkg/compressio/BUILD                |  18
-rw-r--r--  pkg/compressio/compressio.go        | 559
-rw-r--r--  pkg/compressio/compressio_test.go   | 227
3 files changed, 804 insertions, 0 deletions
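For orientation before the diff itself: the change adds a parallel compression package whose exported API is NewWriter(out, chunkSize, level) and NewReader(in), both defined in compressio.go below. The following is a minimal round-trip sketch, not part of the change; the import path is taken from the BUILD file's importpath, the 1 MiB chunk size follows the NewWriter doc comment's recommendation, and the payload and compression level are illustrative.

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
	"io"
	"log"

	"gvisor.googlesource.com/gvisor/pkg/compressio"
)

func main() {
	// Illustrative payload; any []byte works.
	data := bytes.Repeat([]byte("example payload "), 1<<16)

	// Compress: NewWriter returns an io.WriteCloser that writes the
	// chunked, parallel-compressed stream into `compressed`.
	var compressed bytes.Buffer
	w, err := compressio.NewWriter(&compressed, 1<<20, flate.BestSpeed)
	if err != nil {
		log.Fatalf("NewWriter: %v", err)
	}
	if _, err := w.Write(data); err != nil {
		log.Fatalf("Write: %v", err)
	}
	// Close flushes the final partial chunk.
	if err := w.Close(); err != nil {
		log.Fatalf("Close: %v", err)
	}

	// Decompress: NewReader returns an io.Reader over the same stream.
	r, err := compressio.NewReader(bytes.NewReader(compressed.Bytes()))
	if err != nil {
		log.Fatalf("NewReader: %v", err)
	}
	var decompressed bytes.Buffer
	if _, err := io.Copy(&decompressed, r); err != nil {
		log.Fatalf("decompress: %v", err)
	}

	fmt.Println(bytes.Equal(data, decompressed.Bytes())) // true
}
```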
diff --git a/pkg/compressio/BUILD b/pkg/compressio/BUILD new file mode 100644 index 000000000..721b2d983 --- /dev/null +++ b/pkg/compressio/BUILD @@ -0,0 +1,18 @@ +package(licenses = ["notice"]) # Apache 2.0 + +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "compressio", + srcs = ["compressio.go"], + importpath = "gvisor.googlesource.com/gvisor/pkg/compressio", + visibility = ["//:sandbox"], + deps = ["//pkg/binary"], +) + +go_test( + name = "compressio_test", + size = "medium", + srcs = ["compressio_test.go"], + embed = [":compressio"], +) diff --git a/pkg/compressio/compressio.go b/pkg/compressio/compressio.go new file mode 100644 index 000000000..e0d36aee9 --- /dev/null +++ b/pkg/compressio/compressio.go @@ -0,0 +1,559 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package compressio provides parallel compression and decompression. +package compressio + +import ( + "bytes" + "compress/flate" + "errors" + "io" + "runtime" + "sync" + + "gvisor.googlesource.com/gvisor/pkg/binary" +) + +var bufPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(nil) + }, +} + +var chunkPool = sync.Pool{ + New: func() interface{} { + return new(chunk) + }, +} + +// chunk is a unit of work. +type chunk struct { + // compressed is compressed data. + // + // This will always be returned to the bufPool directly when work has + // finished (in schedule) and therefore must be allocated. + compressed *bytes.Buffer + + // uncompressed is the uncompressed data. + // + // This is not returned to the bufPool automatically, since it may + // correspond to a inline slice (provided directly to Read or Write). + uncompressed *bytes.Buffer +} + +// newChunk allocates a new chunk object (or pulls one from the pool). Buffers +// will be allocated if nil is provided for compressed or uncompressed. +func newChunk(compressed *bytes.Buffer, uncompressed *bytes.Buffer) *chunk { + c := chunkPool.Get().(*chunk) + if compressed != nil { + c.compressed = compressed + } else { + c.compressed = bufPool.Get().(*bytes.Buffer) + } + if uncompressed != nil { + c.uncompressed = uncompressed + } else { + c.uncompressed = bufPool.Get().(*bytes.Buffer) + } + return c +} + +// result is the result of some work; it includes the original chunk. +type result struct { + *chunk + err error +} + +// worker is a compression/decompression worker. +// +// The associated worker goroutine reads in uncompressed buffers from input and +// writes compressed buffers to its output. Alternatively, the worker reads +// compressed buffers from input and writes uncompressed buffers to its output. +// +// The goroutine will exit when input is closed, and the goroutine will close +// output. +type worker struct { + input chan *chunk + output chan result +} + +// work is the main work routine; see worker. +func (w *worker) work(compress bool, level int) { + defer close(w.output) + + for c := range w.input { + if compress { + // Encode this slice. 
+ fw, err := flate.NewWriter(c.compressed, level) + if err != nil { + w.output <- result{c, err} + continue + } + + // Encode the input. + if _, err := io.Copy(fw, c.uncompressed); err != nil { + w.output <- result{c, err} + continue + } + if err := fw.Close(); err != nil { + w.output <- result{c, err} + continue + } + } else { + // Decode this slice. + fr := flate.NewReader(c.compressed) + + // Decode the input. + if _, err := io.Copy(c.uncompressed, fr); err != nil { + w.output <- result{c, err} + continue + } + } + + // Send the output. + w.output <- result{c, nil} + } +} + +// pool is common functionality for reader/writers. +type pool struct { + // workers are the compression/decompression workers. + workers []worker + + // chunkSize is the chunk size. This is the first four bytes in the + // stream and is shared across both the reader and writer. + chunkSize uint32 + + // mu protects below; it is generally the responsibility of users to + // acquire this mutex before calling any methods on the pool. + mu sync.Mutex + + // nextInput is the next worker for input (scheduling). + nextInput int + + // nextOutput is the next worker for output (result). + nextOutput int + + // buf is the current active buffer; the exact semantics of this buffer + // depending on whether this is a reader or a writer. + buf *bytes.Buffer +} + +// init initializes the worker pool. +// +// This should only be called once. +func (p *pool) init(compress bool, level int) { + p.workers = make([]worker, 1+runtime.GOMAXPROCS(0)) + for i := 0; i < len(p.workers); i++ { + p.workers[i] = worker{ + input: make(chan *chunk, 1), + output: make(chan result, 1), + } + go p.workers[i].work(compress, level) // S/R-SAFE: In save path only. + } + runtime.SetFinalizer(p, (*pool).stop) +} + +// stop stops all workers. +func (p *pool) stop() { + for i := 0; i < len(p.workers); i++ { + close(p.workers[i].input) + } + p.workers = nil +} + +// handleResult calls the callback. +func handleResult(r result, callback func(*chunk) error) error { + defer func() { + r.chunk.compressed.Reset() + bufPool.Put(r.chunk.compressed) + chunkPool.Put(r.chunk) + }() + if r.err != nil { + return r.err + } + if err := callback(r.chunk); err != nil { + return err + } + return nil +} + +// schedule schedules the given buffers. +// +// If c is non-nil, then it will return as soon as the chunk is scheduled. If c +// is nil, then it will return only when no more work is left to do. +// +// If no callback function is provided, then the output channel will be +// ignored. You must be sure that the input is schedulable in this case. +func (p *pool) schedule(c *chunk, callback func(*chunk) error) error { + for { + var ( + inputChan chan *chunk + outputChan chan result + ) + if c != nil { + inputChan = p.workers[(p.nextInput+1)%len(p.workers)].input + } + if callback != nil && p.nextOutput != p.nextInput { + outputChan = p.workers[(p.nextOutput+1)%len(p.workers)].output + } + if inputChan == nil && outputChan == nil { + return nil + } + + select { + case inputChan <- c: + p.nextInput++ + return nil + case r := <-outputChan: + p.nextOutput++ + if err := handleResult(r, callback); err != nil { + return err + } + } + } +} + +// reader chunks reads and decompresses. +type reader struct { + pool + + // in is the source. + in io.Reader +} + +// NewReader returns a new compressed reader. 
+func NewReader(in io.Reader) (io.Reader, error) { + r := &reader{ + in: in, + } + r.init(false, 0) + var err error + if r.chunkSize, err = binary.ReadUint32(r.in, binary.BigEndian); err != nil { + return nil, err + } + return r, nil +} + +// errNewBuffer is returned when a new buffer is completed. +var errNewBuffer = errors.New("buffer ready") + +// Read implements io.Reader.Read. +func (r *reader) Read(p []byte) (int, error) { + r.mu.Lock() + defer r.mu.Unlock() + + // Total bytes completed; this is declared up front because it must be + // adjustable by the callback below. + done := 0 + + // Total bytes pending in the asynchronous workers for buffers. This is + // used to process the proper regions of the input as inline buffers. + var ( + pendingPre = r.nextInput - r.nextOutput + pendingInline = 0 + ) + + // Define our callback for completed work. + callback := func(c *chunk) error { + // Check for an inline buffer. + if pendingPre == 0 && pendingInline > 0 { + pendingInline-- + done += c.uncompressed.Len() + return nil + } + + // Copy the resulting buffer to our intermediate one, and + // return errNewBuffer to ensure that we aren't called a second + // time. This error code is handled specially below. + // + // c.buf will be freed and return to the pool when it is done. + if pendingPre > 0 { + pendingPre-- + } + r.buf = c.uncompressed + return errNewBuffer + } + + for done < len(p) { + // Do we have buffered data available? + if r.buf != nil { + n, err := r.buf.Read(p[done:]) + done += n + if err == io.EOF { + // This is the uncompressed buffer, it can be + // returned to the pool at this point. + r.buf.Reset() + bufPool.Put(r.buf) + r.buf = nil + } else if err != nil { + // Should never happen. + defer r.stop() + return done, err + } + continue + } + + // Read the length of the next chunk and reset the + // reader. The length is used to limit the reader. + // + // See writer.flush. + l, err := binary.ReadUint32(r.in, binary.BigEndian) + if err != nil { + // This is generally okay as long as there + // are still buffers outstanding. We actually + // just wait for completion of those buffers here + // and continue our loop. + if err := r.schedule(nil, callback); err == nil { + // We've actually finished all buffers; this is + // the normal EOF exit path. + defer r.stop() + return done, io.EOF + } else if err == errNewBuffer { + // A new buffer is now available. + continue + } else { + // Some other error occurred; we cannot + // process any further. + defer r.stop() + return done, err + } + } + + // Read this chunk and schedule decompression. + compressed := bufPool.Get().(*bytes.Buffer) + if _, err := io.Copy(compressed, &io.LimitedReader{ + R: r.in, + N: int64(l), + }); err != nil { + // Some other error occurred; see above. + return done, err + } + + // Are we doing inline decoding? + // + // Note that we need to check the length here against + // bytes.MinRead, since the bytes library will choose to grow + // the slice if the available capacity is not at least + // bytes.MinRead. This limits inline decoding to chunkSizes + // that are at least bytes.MinRead (which is not unreasonable). + var c *chunk + start := done + ((pendingPre + pendingInline) * int(r.chunkSize)) + if len(p) >= start+int(r.chunkSize) && len(p) >= start+bytes.MinRead { + c = newChunk(compressed, bytes.NewBuffer(p[start:start])) + pendingInline++ + } else { + c = newChunk(compressed, nil) + } + if err := r.schedule(c, callback); err == errNewBuffer { + // A new buffer was completed while we were reading. 
+ // That's great, but we need to force schedule the + // current buffer so that it does not get lost. + // + // It is safe to pass nil as an output function here, + // because we know that we just freed up a slot above. + r.schedule(c, nil) + } else if err != nil { + // Some other error occurred; see above. + defer r.stop() + return done, err + } + } + + // Make sure that everything has been decoded successfully, otherwise + // parts of p may not actually have completed. + for pendingInline > 0 { + if err := r.schedule(nil, func(c *chunk) error { + if err := callback(c); err != nil { + return err + } + // The nil case means that an inline buffer has + // completed. The callback will have already removed + // the inline buffer from the map, so we just return an + // error to check the top of the loop again. + return errNewBuffer + }); err != errNewBuffer { + // Some other error occurred; see above. + return done, err + } + } + + // Need to return done here, since it may have been adjusted by the + // callback to compensation for partial reads on some inline buffer. + return done, nil +} + +// writer chunks and schedules writes. +type writer struct { + pool + + // out is the underlying writer. + out io.Writer + + // closed indicates whether the file has been closed. + closed bool +} + +// NewWriter returns a new compressed writer. +// +// The recommended chunkSize is on the order of 1M. Extra memory may be +// buffered (in the form of read-ahead, or buffered writes), and is limited to +// O(chunkSize * [1+GOMAXPROCS]). +func NewWriter(out io.Writer, chunkSize uint32, level int) (io.WriteCloser, error) { + w := &writer{ + pool: pool{ + chunkSize: chunkSize, + buf: bufPool.Get().(*bytes.Buffer), + }, + out: out, + } + w.init(true, level) + if err := binary.WriteUint32(w.out, binary.BigEndian, chunkSize); err != nil { + return nil, err + } + return w, nil +} + +// flush writes a single buffer. +func (w *writer) flush(c *chunk) error { + // Prefix each chunk with a length; this allows the reader to safely + // limit reads while buffering. + l := uint32(c.compressed.Len()) + if err := binary.WriteUint32(w.out, binary.BigEndian, l); err != nil { + return err + } + + // Write out to the stream. + _, err := io.Copy(w.out, c.compressed) + return err +} + +// Write implements io.Writer.Write. +func (w *writer) Write(p []byte) (int, error) { + w.mu.Lock() + defer w.mu.Unlock() + + // Did we close already? + if w.closed { + return 0, io.ErrUnexpectedEOF + } + + // See above; we need to track in the same way. + var ( + pendingPre = w.nextInput - w.nextOutput + pendingInline = 0 + ) + callback := func(c *chunk) error { + if pendingPre == 0 && pendingInline > 0 { + pendingInline-- + return w.flush(c) + } + if pendingPre > 0 { + pendingPre-- + } + err := w.flush(c) + c.uncompressed.Reset() + bufPool.Put(c.uncompressed) + return err + } + + for done := 0; done < len(p); { + // Construct an inline buffer if we're doing an inline + // encoding; see above regarding the bytes.MinRead constraint. + if w.buf.Len() == 0 && len(p) >= done+int(w.chunkSize) && len(p) >= done+bytes.MinRead { + bufPool.Put(w.buf) // Return to the pool; never scheduled. + w.buf = bytes.NewBuffer(p[done : done+int(w.chunkSize)]) + done += int(w.chunkSize) + pendingInline++ + } + + // Do we need to flush w.buf? Note that this case should be hit + // immediately following the inline case above. 
+ left := int(w.chunkSize) - w.buf.Len() + if left == 0 { + if err := w.schedule(newChunk(nil, w.buf), callback); err != nil { + return done, err + } + // Reset the buffer, since this has now been scheduled + // for compression. Note that this may be trampled + // immediately by the bufPool.Put(w.buf) above if the + // next buffer happens to be inline, but that's okay. + w.buf = bufPool.Get().(*bytes.Buffer) + continue + } + + // Read from p into w.buf. + toWrite := len(p) - done + if toWrite > left { + toWrite = left + } + n, err := w.buf.Write(p[done : done+toWrite]) + done += n + if err != nil { + return done, err + } + } + + // Make sure that everything has been flushed, we can't return until + // all the contents from p have been used. + for pendingInline > 0 { + if err := w.schedule(nil, func(c *chunk) error { + if err := callback(c); err != nil { + return err + } + // The flush was successful, return errNewBuffer here + // to break from the loop and check the condition + // again. + return errNewBuffer + }); err != errNewBuffer { + return len(p), err + } + } + + return len(p), nil +} + +// Close implements io.Closer.Close. +func (w *writer) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + + // Did we already close? After the call to Close, we always mark as + // closed, regardless of whether the flush is successful. + if w.closed { + return io.ErrUnexpectedEOF + } + w.closed = true + defer w.stop() + + // Schedule any remaining partial buffer; we pass w.flush directly here + // because the final buffer is guaranteed to not be an inline buffer. + if w.buf.Len() > 0 { + if err := w.schedule(newChunk(nil, w.buf), w.flush); err != nil { + return err + } + } + + // Flush all scheduled buffers; see above. + if err := w.schedule(nil, w.flush); err != nil { + return err + } + + // Close the underlying writer (if necessary). + if closer, ok := w.out.(io.Closer); ok { + return closer.Close() + } + return nil +} diff --git a/pkg/compressio/compressio_test.go b/pkg/compressio/compressio_test.go new file mode 100644 index 000000000..d7911419d --- /dev/null +++ b/pkg/compressio/compressio_test.go @@ -0,0 +1,227 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compressio + +import ( + "bytes" + "compress/flate" + "encoding/base64" + "fmt" + "io" + "math/rand" + "runtime" + "testing" + "time" +) + +type harness interface { + Errorf(format string, v ...interface{}) + Fatalf(format string, v ...interface{}) + Logf(format string, v ...interface{}) +} + +func initTest(t harness, size int) []byte { + // Set number of processes to number of CPUs. + runtime.GOMAXPROCS(runtime.NumCPU()) + + // Construct synthetic data. We do this by encoding random data with + // base64. This gives a high level of entropy, but still quite a bit of + // structure, to give reasonable compression ratios (~75%). 
+ var buf bytes.Buffer + bufW := base64.NewEncoder(base64.RawStdEncoding, &buf) + bufR := rand.New(rand.NewSource(0)) + if _, err := io.CopyN(bufW, bufR, int64(size)); err != nil { + t.Fatalf("unable to seed random data: %v", err) + } + return buf.Bytes() +} + +type testOpts struct { + Name string + Data []byte + NewWriter func(*bytes.Buffer) (io.Writer, error) + NewReader func(*bytes.Buffer) (io.Reader, error) + PreCompress func() + PostCompress func() + PreDecompress func() + PostDecompress func() + CompressIters int + DecompressIters int +} + +func doTest(t harness, opts testOpts) { + // Compress. + var compressed bytes.Buffer + compressionStartTime := time.Now() + if opts.PreCompress != nil { + opts.PreCompress() + } + if opts.CompressIters <= 0 { + opts.CompressIters = 1 + } + for i := 0; i < opts.CompressIters; i++ { + compressed.Reset() + w, err := opts.NewWriter(&compressed) + if err != nil { + t.Errorf("%s: NewWriter got err %v, expected nil", opts.Name, err) + } + if _, err := io.Copy(w, bytes.NewBuffer(opts.Data)); err != nil { + t.Errorf("%s: compress got err %v, expected nil", opts.Name, err) + return + } + closer, ok := w.(io.Closer) + if ok { + if err := closer.Close(); err != nil { + t.Errorf("%s: got err %v, expected nil", opts.Name, err) + return + } + } + } + if opts.PostCompress != nil { + opts.PostCompress() + } + compressionTime := time.Since(compressionStartTime) + compressionRatio := float32(compressed.Len()) / float32(len(opts.Data)) + + // Decompress. + var decompressed bytes.Buffer + decompressionStartTime := time.Now() + if opts.PreDecompress != nil { + opts.PreDecompress() + } + if opts.DecompressIters <= 0 { + opts.DecompressIters = 1 + } + for i := 0; i < opts.DecompressIters; i++ { + decompressed.Reset() + r, err := opts.NewReader(bytes.NewBuffer(compressed.Bytes())) + if err != nil { + t.Errorf("%s: NewReader got err %v, expected nil", opts.Name, err) + return + } + if _, err := io.Copy(&decompressed, r); err != nil { + t.Errorf("%s: decompress got err %v, expected nil", opts.Name, err) + return + } + } + if opts.PostDecompress != nil { + opts.PostDecompress() + } + decompressionTime := time.Since(decompressionStartTime) + + // Verify. + if decompressed.Len() != len(opts.Data) { + t.Errorf("%s: got %d bytes, expected %d", opts.Name, decompressed.Len(), len(opts.Data)) + } + if !bytes.Equal(opts.Data, decompressed.Bytes()) { + t.Errorf("%s: got mismatch, expected match", opts.Name) + if len(opts.Data) < 32 { // Don't flood the logs. + t.Errorf("got %v, expected %v", decompressed.Bytes(), opts.Data) + } + } + + t.Logf("%s: compression time %v, ratio %2.2f, decompression time %v", + opts.Name, compressionTime, compressionRatio, decompressionTime) +} + +func TestCompress(t *testing.T) { + var ( + data = initTest(t, 10*1024*1024) + data0 = data[:0] + data1 = data[:1] + data2 = data[:11] + data3 = data[:16] + data4 = data[:] + ) + + for _, data := range [][]byte{data0, data1, data2, data3, data4} { + for _, blockSize := range []uint32{1, 4, 1024, 4 * 1024, 16 * 1024} { + // Skip annoying tests; they just take too long. + if blockSize <= 16 && len(data) > 16 { + continue + } + + // Do the compress test. + doTest(t, testOpts{ + Name: fmt.Sprintf("len(data)=%d, blockSize=%d", len(data), blockSize), + Data: data, + NewWriter: func(b *bytes.Buffer) (io.Writer, error) { + return NewWriter(b, blockSize, flate.BestCompression) + }, + NewReader: func(b *bytes.Buffer) (io.Reader, error) { + return NewReader(b) + }, + }) + } + + // Do the vanilla test. 
+ doTest(t, testOpts{ + Name: fmt.Sprintf("len(data)=%d, vanilla flate", len(data)), + Data: data, + NewWriter: func(b *bytes.Buffer) (io.Writer, error) { + return flate.NewWriter(b, flate.BestCompression) + }, + NewReader: func(b *bytes.Buffer) (io.Reader, error) { + return flate.NewReader(b), nil + }, + }) + } +} + +const ( + // benchBlockSize is the blockSize for benchmarks. + benchBlockSize = 32 * 1024 + + // benchDataSize is the amount of data for benchmarks. + benchDataSize = 10 * 1024 * 1024 +) + +func BenchmarkCompress(b *testing.B) { + b.StopTimer() + b.SetBytes(benchDataSize) + data := initTest(b, benchDataSize) + doTest(b, testOpts{ + Name: fmt.Sprintf("len(data)=%d, blockSize=%d", len(data), benchBlockSize), + Data: data, + PreCompress: b.StartTimer, + PostCompress: b.StopTimer, + NewWriter: func(b *bytes.Buffer) (io.Writer, error) { + return NewWriter(b, benchBlockSize, flate.BestCompression) + }, + NewReader: func(b *bytes.Buffer) (io.Reader, error) { + return NewReader(b) + }, + CompressIters: b.N, + }) +} + +func BenchmarkDecompress(b *testing.B) { + b.StopTimer() + b.SetBytes(benchDataSize) + data := initTest(b, benchDataSize) + doTest(b, testOpts{ + Name: fmt.Sprintf("len(data)=%d, blockSize=%d", len(data), benchBlockSize), + Data: data, + PreDecompress: b.StartTimer, + PostDecompress: b.StopTimer, + NewWriter: func(b *bytes.Buffer) (io.Writer, error) { + return NewWriter(b, benchBlockSize, flate.BestCompression) + }, + NewReader: func(b *bytes.Buffer) (io.Reader, error) { + return NewReader(b) + }, + DecompressIters: b.N, + }) +} |
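To make the stream framing concrete: NewWriter starts the stream with a big-endian uint32 chunkSize header, and flush prefixes each compressed chunk with a big-endian uint32 length; NewReader and Read consume the same layout. The sketch below is not part of the change and only illustrates that framing; the standard encoding/binary package stands in for the pkg/binary helpers used in compressio.go, and the chunk size and payload are arbitrary.

```go
package main

import (
	"bytes"
	"compress/flate"
	"encoding/binary"
	"fmt"
	"io"
	"log"

	"gvisor.googlesource.com/gvisor/pkg/compressio"
)

// describeStream walks a compressio stream: a uint32 chunkSize header,
// then repeated (uint32 length, DEFLATE payload) records.
func describeStream(in io.Reader) error {
	var chunkSize uint32
	if err := binary.Read(in, binary.BigEndian, &chunkSize); err != nil {
		return err
	}
	fmt.Printf("chunkSize: %d\n", chunkSize)

	for i := 0; ; i++ {
		var length uint32
		if err := binary.Read(in, binary.BigEndian, &length); err != nil {
			if err == io.EOF {
				return nil // clean end of stream
			}
			return err
		}
		// Copy exactly `length` compressed bytes, then inflate them,
		// mirroring what reader.Read does with a LimitedReader.
		compressed := new(bytes.Buffer)
		if _, err := io.CopyN(compressed, in, int64(length)); err != nil {
			return err
		}
		var uncompressed bytes.Buffer
		if _, err := io.Copy(&uncompressed, flate.NewReader(compressed)); err != nil {
			return err
		}
		fmt.Printf("chunk %d: %d compressed bytes, %d uncompressed\n",
			i, length, uncompressed.Len())
	}
}

func main() {
	// Build a small stream with the package under discussion.
	var stream bytes.Buffer
	w, err := compressio.NewWriter(&stream, 64*1024, flate.DefaultCompression)
	if err != nil {
		log.Fatal(err)
	}
	if _, err := w.Write(bytes.Repeat([]byte("x"), 200*1024)); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
	if err := describeStream(&stream); err != nil {
		log.Fatal(err)
	}
}
```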