summaryrefslogtreecommitdiffhomepage
path: root/pkg/compressio/compressio.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/compressio/compressio.go')
-rw-r--r--pkg/compressio/compressio.go559
1 files changed, 559 insertions, 0 deletions
diff --git a/pkg/compressio/compressio.go b/pkg/compressio/compressio.go
new file mode 100644
index 000000000..e0d36aee9
--- /dev/null
+++ b/pkg/compressio/compressio.go
@@ -0,0 +1,559 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package compressio provides parallel compression and decompression.
+package compressio
+
+import (
+ "bytes"
+ "compress/flate"
+ "errors"
+ "io"
+ "runtime"
+ "sync"
+
+ "gvisor.googlesource.com/gvisor/pkg/binary"
+)
+
+var bufPool = sync.Pool{
+ New: func() interface{} {
+ return bytes.NewBuffer(nil)
+ },
+}
+
+var chunkPool = sync.Pool{
+ New: func() interface{} {
+ return new(chunk)
+ },
+}
+
+// chunk is a unit of work.
+type chunk struct {
+ // compressed is compressed data.
+ //
+ // This will always be returned to the bufPool directly when work has
+ // finished (in schedule) and therefore must be allocated.
+ compressed *bytes.Buffer
+
+ // uncompressed is the uncompressed data.
+ //
+ // This is not returned to the bufPool automatically, since it may
+ // correspond to a inline slice (provided directly to Read or Write).
+ uncompressed *bytes.Buffer
+}
+
+// newChunk allocates a new chunk object (or pulls one from the pool). Buffers
+// will be allocated if nil is provided for compressed or uncompressed.
+func newChunk(compressed *bytes.Buffer, uncompressed *bytes.Buffer) *chunk {
+ c := chunkPool.Get().(*chunk)
+ if compressed != nil {
+ c.compressed = compressed
+ } else {
+ c.compressed = bufPool.Get().(*bytes.Buffer)
+ }
+ if uncompressed != nil {
+ c.uncompressed = uncompressed
+ } else {
+ c.uncompressed = bufPool.Get().(*bytes.Buffer)
+ }
+ return c
+}
+
+// result is the result of some work; it includes the original chunk.
+type result struct {
+ *chunk
+ err error
+}
+
+// worker is a compression/decompression worker.
+//
+// The associated worker goroutine reads in uncompressed buffers from input and
+// writes compressed buffers to its output. Alternatively, the worker reads
+// compressed buffers from input and writes uncompressed buffers to its output.
+//
+// The goroutine will exit when input is closed, and the goroutine will close
+// output.
+type worker struct {
+ input chan *chunk
+ output chan result
+}
+
+// work is the main work routine; see worker.
+func (w *worker) work(compress bool, level int) {
+ defer close(w.output)
+
+ for c := range w.input {
+ if compress {
+ // Encode this slice.
+ fw, err := flate.NewWriter(c.compressed, level)
+ if err != nil {
+ w.output <- result{c, err}
+ continue
+ }
+
+ // Encode the input.
+ if _, err := io.Copy(fw, c.uncompressed); err != nil {
+ w.output <- result{c, err}
+ continue
+ }
+ if err := fw.Close(); err != nil {
+ w.output <- result{c, err}
+ continue
+ }
+ } else {
+ // Decode this slice.
+ fr := flate.NewReader(c.compressed)
+
+ // Decode the input.
+ if _, err := io.Copy(c.uncompressed, fr); err != nil {
+ w.output <- result{c, err}
+ continue
+ }
+ }
+
+ // Send the output.
+ w.output <- result{c, nil}
+ }
+}
+
+// pool is common functionality for reader/writers.
+type pool struct {
+ // workers are the compression/decompression workers.
+ workers []worker
+
+ // chunkSize is the chunk size. This is the first four bytes in the
+ // stream and is shared across both the reader and writer.
+ chunkSize uint32
+
+ // mu protects below; it is generally the responsibility of users to
+ // acquire this mutex before calling any methods on the pool.
+ mu sync.Mutex
+
+ // nextInput is the next worker for input (scheduling).
+ nextInput int
+
+ // nextOutput is the next worker for output (result).
+ nextOutput int
+
+ // buf is the current active buffer; the exact semantics of this buffer
+ // depending on whether this is a reader or a writer.
+ buf *bytes.Buffer
+}
+
+// init initializes the worker pool.
+//
+// This should only be called once.
+func (p *pool) init(compress bool, level int) {
+ p.workers = make([]worker, 1+runtime.GOMAXPROCS(0))
+ for i := 0; i < len(p.workers); i++ {
+ p.workers[i] = worker{
+ input: make(chan *chunk, 1),
+ output: make(chan result, 1),
+ }
+ go p.workers[i].work(compress, level) // S/R-SAFE: In save path only.
+ }
+ runtime.SetFinalizer(p, (*pool).stop)
+}
+
+// stop stops all workers.
+func (p *pool) stop() {
+ for i := 0; i < len(p.workers); i++ {
+ close(p.workers[i].input)
+ }
+ p.workers = nil
+}
+
+// handleResult calls the callback.
+func handleResult(r result, callback func(*chunk) error) error {
+ defer func() {
+ r.chunk.compressed.Reset()
+ bufPool.Put(r.chunk.compressed)
+ chunkPool.Put(r.chunk)
+ }()
+ if r.err != nil {
+ return r.err
+ }
+ if err := callback(r.chunk); err != nil {
+ return err
+ }
+ return nil
+}
+
+// schedule schedules the given buffers.
+//
+// If c is non-nil, then it will return as soon as the chunk is scheduled. If c
+// is nil, then it will return only when no more work is left to do.
+//
+// If no callback function is provided, then the output channel will be
+// ignored. You must be sure that the input is schedulable in this case.
+func (p *pool) schedule(c *chunk, callback func(*chunk) error) error {
+ for {
+ var (
+ inputChan chan *chunk
+ outputChan chan result
+ )
+ if c != nil {
+ inputChan = p.workers[(p.nextInput+1)%len(p.workers)].input
+ }
+ if callback != nil && p.nextOutput != p.nextInput {
+ outputChan = p.workers[(p.nextOutput+1)%len(p.workers)].output
+ }
+ if inputChan == nil && outputChan == nil {
+ return nil
+ }
+
+ select {
+ case inputChan <- c:
+ p.nextInput++
+ return nil
+ case r := <-outputChan:
+ p.nextOutput++
+ if err := handleResult(r, callback); err != nil {
+ return err
+ }
+ }
+ }
+}
+
+// reader chunks reads and decompresses.
+type reader struct {
+ pool
+
+ // in is the source.
+ in io.Reader
+}
+
+// NewReader returns a new compressed reader.
+func NewReader(in io.Reader) (io.Reader, error) {
+ r := &reader{
+ in: in,
+ }
+ r.init(false, 0)
+ var err error
+ if r.chunkSize, err = binary.ReadUint32(r.in, binary.BigEndian); err != nil {
+ return nil, err
+ }
+ return r, nil
+}
+
+// errNewBuffer is returned when a new buffer is completed.
+var errNewBuffer = errors.New("buffer ready")
+
+// Read implements io.Reader.Read.
+func (r *reader) Read(p []byte) (int, error) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ // Total bytes completed; this is declared up front because it must be
+ // adjustable by the callback below.
+ done := 0
+
+ // Total bytes pending in the asynchronous workers for buffers. This is
+ // used to process the proper regions of the input as inline buffers.
+ var (
+ pendingPre = r.nextInput - r.nextOutput
+ pendingInline = 0
+ )
+
+ // Define our callback for completed work.
+ callback := func(c *chunk) error {
+ // Check for an inline buffer.
+ if pendingPre == 0 && pendingInline > 0 {
+ pendingInline--
+ done += c.uncompressed.Len()
+ return nil
+ }
+
+ // Copy the resulting buffer to our intermediate one, and
+ // return errNewBuffer to ensure that we aren't called a second
+ // time. This error code is handled specially below.
+ //
+ // c.buf will be freed and return to the pool when it is done.
+ if pendingPre > 0 {
+ pendingPre--
+ }
+ r.buf = c.uncompressed
+ return errNewBuffer
+ }
+
+ for done < len(p) {
+ // Do we have buffered data available?
+ if r.buf != nil {
+ n, err := r.buf.Read(p[done:])
+ done += n
+ if err == io.EOF {
+ // This is the uncompressed buffer, it can be
+ // returned to the pool at this point.
+ r.buf.Reset()
+ bufPool.Put(r.buf)
+ r.buf = nil
+ } else if err != nil {
+ // Should never happen.
+ defer r.stop()
+ return done, err
+ }
+ continue
+ }
+
+ // Read the length of the next chunk and reset the
+ // reader. The length is used to limit the reader.
+ //
+ // See writer.flush.
+ l, err := binary.ReadUint32(r.in, binary.BigEndian)
+ if err != nil {
+ // This is generally okay as long as there
+ // are still buffers outstanding. We actually
+ // just wait for completion of those buffers here
+ // and continue our loop.
+ if err := r.schedule(nil, callback); err == nil {
+ // We've actually finished all buffers; this is
+ // the normal EOF exit path.
+ defer r.stop()
+ return done, io.EOF
+ } else if err == errNewBuffer {
+ // A new buffer is now available.
+ continue
+ } else {
+ // Some other error occurred; we cannot
+ // process any further.
+ defer r.stop()
+ return done, err
+ }
+ }
+
+ // Read this chunk and schedule decompression.
+ compressed := bufPool.Get().(*bytes.Buffer)
+ if _, err := io.Copy(compressed, &io.LimitedReader{
+ R: r.in,
+ N: int64(l),
+ }); err != nil {
+ // Some other error occurred; see above.
+ return done, err
+ }
+
+ // Are we doing inline decoding?
+ //
+ // Note that we need to check the length here against
+ // bytes.MinRead, since the bytes library will choose to grow
+ // the slice if the available capacity is not at least
+ // bytes.MinRead. This limits inline decoding to chunkSizes
+ // that are at least bytes.MinRead (which is not unreasonable).
+ var c *chunk
+ start := done + ((pendingPre + pendingInline) * int(r.chunkSize))
+ if len(p) >= start+int(r.chunkSize) && len(p) >= start+bytes.MinRead {
+ c = newChunk(compressed, bytes.NewBuffer(p[start:start]))
+ pendingInline++
+ } else {
+ c = newChunk(compressed, nil)
+ }
+ if err := r.schedule(c, callback); err == errNewBuffer {
+ // A new buffer was completed while we were reading.
+ // That's great, but we need to force schedule the
+ // current buffer so that it does not get lost.
+ //
+ // It is safe to pass nil as an output function here,
+ // because we know that we just freed up a slot above.
+ r.schedule(c, nil)
+ } else if err != nil {
+ // Some other error occurred; see above.
+ defer r.stop()
+ return done, err
+ }
+ }
+
+ // Make sure that everything has been decoded successfully, otherwise
+ // parts of p may not actually have completed.
+ for pendingInline > 0 {
+ if err := r.schedule(nil, func(c *chunk) error {
+ if err := callback(c); err != nil {
+ return err
+ }
+ // The nil case means that an inline buffer has
+ // completed. The callback will have already removed
+ // the inline buffer from the map, so we just return an
+ // error to check the top of the loop again.
+ return errNewBuffer
+ }); err != errNewBuffer {
+ // Some other error occurred; see above.
+ return done, err
+ }
+ }
+
+ // Need to return done here, since it may have been adjusted by the
+ // callback to compensation for partial reads on some inline buffer.
+ return done, nil
+}
+
+// writer chunks and schedules writes.
+type writer struct {
+ pool
+
+ // out is the underlying writer.
+ out io.Writer
+
+ // closed indicates whether the file has been closed.
+ closed bool
+}
+
+// NewWriter returns a new compressed writer.
+//
+// The recommended chunkSize is on the order of 1M. Extra memory may be
+// buffered (in the form of read-ahead, or buffered writes), and is limited to
+// O(chunkSize * [1+GOMAXPROCS]).
+func NewWriter(out io.Writer, chunkSize uint32, level int) (io.WriteCloser, error) {
+ w := &writer{
+ pool: pool{
+ chunkSize: chunkSize,
+ buf: bufPool.Get().(*bytes.Buffer),
+ },
+ out: out,
+ }
+ w.init(true, level)
+ if err := binary.WriteUint32(w.out, binary.BigEndian, chunkSize); err != nil {
+ return nil, err
+ }
+ return w, nil
+}
+
+// flush writes a single buffer.
+func (w *writer) flush(c *chunk) error {
+ // Prefix each chunk with a length; this allows the reader to safely
+ // limit reads while buffering.
+ l := uint32(c.compressed.Len())
+ if err := binary.WriteUint32(w.out, binary.BigEndian, l); err != nil {
+ return err
+ }
+
+ // Write out to the stream.
+ _, err := io.Copy(w.out, c.compressed)
+ return err
+}
+
+// Write implements io.Writer.Write.
+func (w *writer) Write(p []byte) (int, error) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ // Did we close already?
+ if w.closed {
+ return 0, io.ErrUnexpectedEOF
+ }
+
+ // See above; we need to track in the same way.
+ var (
+ pendingPre = w.nextInput - w.nextOutput
+ pendingInline = 0
+ )
+ callback := func(c *chunk) error {
+ if pendingPre == 0 && pendingInline > 0 {
+ pendingInline--
+ return w.flush(c)
+ }
+ if pendingPre > 0 {
+ pendingPre--
+ }
+ err := w.flush(c)
+ c.uncompressed.Reset()
+ bufPool.Put(c.uncompressed)
+ return err
+ }
+
+ for done := 0; done < len(p); {
+ // Construct an inline buffer if we're doing an inline
+ // encoding; see above regarding the bytes.MinRead constraint.
+ if w.buf.Len() == 0 && len(p) >= done+int(w.chunkSize) && len(p) >= done+bytes.MinRead {
+ bufPool.Put(w.buf) // Return to the pool; never scheduled.
+ w.buf = bytes.NewBuffer(p[done : done+int(w.chunkSize)])
+ done += int(w.chunkSize)
+ pendingInline++
+ }
+
+ // Do we need to flush w.buf? Note that this case should be hit
+ // immediately following the inline case above.
+ left := int(w.chunkSize) - w.buf.Len()
+ if left == 0 {
+ if err := w.schedule(newChunk(nil, w.buf), callback); err != nil {
+ return done, err
+ }
+ // Reset the buffer, since this has now been scheduled
+ // for compression. Note that this may be trampled
+ // immediately by the bufPool.Put(w.buf) above if the
+ // next buffer happens to be inline, but that's okay.
+ w.buf = bufPool.Get().(*bytes.Buffer)
+ continue
+ }
+
+ // Read from p into w.buf.
+ toWrite := len(p) - done
+ if toWrite > left {
+ toWrite = left
+ }
+ n, err := w.buf.Write(p[done : done+toWrite])
+ done += n
+ if err != nil {
+ return done, err
+ }
+ }
+
+ // Make sure that everything has been flushed, we can't return until
+ // all the contents from p have been used.
+ for pendingInline > 0 {
+ if err := w.schedule(nil, func(c *chunk) error {
+ if err := callback(c); err != nil {
+ return err
+ }
+ // The flush was successful, return errNewBuffer here
+ // to break from the loop and check the condition
+ // again.
+ return errNewBuffer
+ }); err != errNewBuffer {
+ return len(p), err
+ }
+ }
+
+ return len(p), nil
+}
+
+// Close implements io.Closer.Close.
+func (w *writer) Close() error {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ // Did we already close? After the call to Close, we always mark as
+ // closed, regardless of whether the flush is successful.
+ if w.closed {
+ return io.ErrUnexpectedEOF
+ }
+ w.closed = true
+ defer w.stop()
+
+ // Schedule any remaining partial buffer; we pass w.flush directly here
+ // because the final buffer is guaranteed to not be an inline buffer.
+ if w.buf.Len() > 0 {
+ if err := w.schedule(newChunk(nil, w.buf), w.flush); err != nil {
+ return err
+ }
+ }
+
+ // Flush all scheduled buffers; see above.
+ if err := w.schedule(nil, w.flush); err != nil {
+ return err
+ }
+
+ // Close the underlying writer (if necessary).
+ if closer, ok := w.out.(io.Closer); ok {
+ return closer.Close()
+ }
+ return nil
+}