// Copyright 2018 Google Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package compressio provides parallel compression and decompression, as well // as optional SHA-256 hashing. // // The stream format is defined as follows. // // /------------------------------------------------------\ // | chunk size (4-bytes) | // +------------------------------------------------------+ // | (optional) hash (32-bytes) | // +------------------------------------------------------+ // | compressed data size (4-bytes) | // +------------------------------------------------------+ // | compressed data | // +------------------------------------------------------+ // | (optional) hash (32-bytes) | // +------------------------------------------------------+ // | compressed data size (4-bytes) | // +------------------------------------------------------+ // | ...... | // \------------------------------------------------------/ // // where each subsequent hash is calculated from the following items in order // // compressed data // compressed data size // previous hash // // so the stream integrity cannot be compromised by switching and mixing // compressed chunks. 
package compressio

import (
	"bytes"
	"compress/flate"
	"errors"
	"hash"
	"io"
	"runtime"
	"sync"

	"crypto/hmac"
	"crypto/sha256"

	"gvisor.googlesource.com/gvisor/pkg/binary"
)

// bufPool is a free list of *bytes.Buffer values. New hands out an empty
// buffer when the pool has nothing to reuse.
var bufPool = sync.Pool{
	New: func() interface{} {
		return bytes.NewBuffer(nil)
	},
}

// chunkPool is a free list of *chunk values. New hands out a zero chunk when
// the pool has nothing to reuse.
var chunkPool = sync.Pool{
	New: func() interface{} {
		return new(chunk)
	},
}

// chunk is a unit of work.
type chunk struct {
	// compressed is compressed data.
	//
	// This will always be returned to the bufPool directly when work has
	// finished (in handleResult, which is called from schedule) and
	// therefore must be allocated.
	compressed *bytes.Buffer

	// uncompressed is the uncompressed data.
	//
	// This is not returned to the bufPool automatically, since it may
	// correspond to an inline slice (provided directly to Read or Write).
	uncompressed *bytes.Buffer

	// The current hash object. Only used in compress mode.
	h hash.Hash

	// The hash from previous chunks. Only used in uncompress mode.
	lastSum []byte

	// The expected hash after current chunk. Only used in uncompress mode.
	sum []byte
}

// newChunk allocates a new chunk object (or pulls one from the pool). Buffers
// will be allocated if nil is provided for compressed or uncompressed.
//
// lastSum and sum are stored as given (nil is fine when hashing is not in
// use). The h field is not touched here: it keeps whatever value the pooled
// chunk already carried.
func newChunk(lastSum []byte, sum []byte, compressed *bytes.Buffer, uncompressed *bytes.Buffer) *chunk {
	c := chunkPool.Get().(*chunk)
	c.lastSum = lastSum
	c.sum = sum
	// Caller-supplied buffers are used as-is; otherwise one is borrowed
	// from bufPool.
	if compressed != nil {
		c.compressed = compressed
	} else {
		c.compressed = bufPool.Get().(*bytes.Buffer)
	}
	if uncompressed != nil {
		c.uncompressed = uncompressed
	} else {
		c.uncompressed = bufPool.Get().(*bytes.Buffer)
	}
	return c
}

// result is the result of some work; it includes the original chunk.
type result struct {
	*chunk
	err error
}

// worker is a compression/decompression worker.
//
// The associated worker goroutine reads in uncompressed buffers from input and
// writes compressed buffers to its output. Alternatively, the worker reads
// compressed buffers from input and writes uncompressed buffers to its output.
// // The goroutine will exit when input is closed, and the goroutine will close // output. type worker struct { hashPool *hashPool input chan *chunk output chan result } // work is the main work routine; see worker. func (w *worker) work(compress bool, level int) { defer close(w.output) var h hash.Hash for c := range w.input { if h == nil && w.hashPool != nil { h = w.hashPool.getHash() } if compress { mw := io.Writer(c.compressed) if h != nil { mw = io.MultiWriter(mw, h) } // Encode this slice. fw, err := flate.NewWriter(mw, level) if err != nil { w.output <- result{c, err} continue } // Encode the input. if _, err := io.CopyN(fw, c.uncompressed, int64(c.uncompressed.Len())); err != nil { w.output <- result{c, err} continue } if err := fw.Close(); err != nil { w.output <- result{c, err} continue } // Write the hash, if enabled. if h != nil { binary.WriteUint32(h, binary.BigEndian, uint32(c.compressed.Len())) c.h = h h = nil } } else { // Check the hash of the compressed contents. if h != nil { h.Write(c.compressed.Bytes()) binary.WriteUint32(h, binary.BigEndian, uint32(c.compressed.Len())) io.CopyN(h, bytes.NewReader(c.lastSum), int64(len(c.lastSum))) sum := h.Sum(nil) h.Reset() if !hmac.Equal(c.sum, sum) { w.output <- result{c, ErrHashMismatch} continue } } // Decode this slice. fr := flate.NewReader(c.compressed) // Decode the input. if _, err := io.Copy(c.uncompressed, fr); err != nil { w.output <- result{c, err} continue } } // Send the output. w.output <- result{c, nil} } } type hashPool struct { // mu protexts the hash list. mu sync.Mutex // key is the key used to create hash objects. key []byte // hashes is the hash object free list. Note that this cannot be // globally shared across readers or writers, as it is key-specific. hashes []hash.Hash } // getHash gets a hash object for the pool. It should only be called when the // pool key is non-nil. 
func (p *hashPool) getHash() hash.Hash { p.mu.Lock() defer p.mu.Unlock() if len(p.hashes) == 0 { return hmac.New(sha256.New, p.key) } h := p.hashes[len(p.hashes)-1] p.hashes = p.hashes[:len(p.hashes)-1] return h } func (p *hashPool) putHash(h hash.Hash) { h.Reset() p.mu.Lock() defer p.mu.Unlock() p.hashes = append(p.hashes, h) } // pool is common functionality for reader/writers. type pool struct { // workers are the compression/decompression workers. workers []worker // chunkSize is the chunk size. This is the first four bytes in the // stream and is shared across both the reader and writer. chunkSize uint32 // mu protects below; it is generally the responsibility of users to // acquire this mutex before calling any methods on the pool. mu sync.Mutex // nextInput is the next worker for input (scheduling). nextInput int // nextOutput is the next worker for output (result). nextOutput int // buf is the current active buffer; the exact semantics of this buffer // depending on whether this is a reader or a writer. buf *bytes.Buffer // lasSum records the hash of the last chunk processed. lastSum []byte // hashPool is the hash object pool. It cannot be embedded into pool // itself as worker refers to it and that would stop pool from being // GCed. hashPool *hashPool } // init initializes the worker pool. // // This should only be called once. func (p *pool) init(key []byte, workers int, compress bool, level int) { if key != nil { p.hashPool = &hashPool{key: key} } p.workers = make([]worker, workers) for i := 0; i < len(p.workers); i++ { p.workers[i] = worker{ hashPool: p.hashPool, input: make(chan *chunk, 1), output: make(chan result, 1), } go p.workers[i].work(compress, level) // S/R-SAFE: In save path only. } runtime.SetFinalizer(p, (*pool).stop) } // stop stops all workers. func (p *pool) stop() { for i := 0; i < len(p.workers); i++ { close(p.workers[i].input) } p.workers = nil p.hashPool = nil } // handleResult calls the callback. 
func handleResult(r result, callback func(*chunk) error) error { defer func() { r.chunk.compressed.Reset() bufPool.Put(r.chunk.compressed) chunkPool.Put(r.chunk) }() if r.err != nil { return r.err } return callback(r.chunk) } // schedule schedules the given buffers. // // If c is non-nil, then it will return as soon as the chunk is scheduled. If c // is nil, then it will return only when no more work is left to do. // // If no callback function is provided, then the output channel will be // ignored. You must be sure that the input is schedulable in this case. func (p *pool) schedule(c *chunk, callback func(*chunk) error) error { for { var ( inputChan chan *chunk outputChan chan result ) if c != nil { inputChan = p.workers[(p.nextInput+1)%len(p.workers)].input } if callback != nil && p.nextOutput != p.nextInput { outputChan = p.workers[(p.nextOutput+1)%len(p.workers)].output } if inputChan == nil && outputChan == nil { return nil } select { case inputChan <- c: p.nextInput++ return nil case r := <-outputChan: p.nextOutput++ if err := handleResult(r, callback); err != nil { return err } } } } // reader chunks reads and decompresses. type reader struct { pool // in is the source. in io.Reader } // NewReader returns a new compressed reader. If key is non-nil, the data stream // is assumed to contain expected hash values, which will be compared against // hash values computed from the compressed bytes. See package comments for // details. func NewReader(in io.Reader, key []byte) (io.Reader, error) { r := &reader{ in: in, } // Use double buffering for read. 
r.init(key, 2*runtime.GOMAXPROCS(0), false, 0) var err error if r.chunkSize, err = binary.ReadUint32(in, binary.BigEndian); err != nil { return nil, err } if r.hashPool != nil { h := r.hashPool.getHash() binary.WriteUint32(h, binary.BigEndian, r.chunkSize) r.lastSum = h.Sum(nil) r.hashPool.putHash(h) sum := make([]byte, len(r.lastSum)) if _, err := io.ReadFull(r.in, sum); err != nil { return nil, err } if !hmac.Equal(r.lastSum, sum) { return nil, ErrHashMismatch } } return r, nil } // errNewBuffer is returned when a new buffer is completed. var errNewBuffer = errors.New("buffer ready") // ErrHashMismatch is returned if the hash does not match. var ErrHashMismatch = errors.New("hash mismatch") // Read implements io.Reader.Read. func (r *reader) Read(p []byte) (int, error) { r.mu.Lock() defer r.mu.Unlock() // Total bytes completed; this is declared up front because it must be // adjustable by the callback below. done := 0 // Total bytes pending in the asynchronous workers for buffers. This is // used to process the proper regions of the input as inline buffers. var ( pendingPre = r.nextInput - r.nextOutput pendingInline = 0 ) // Define our callback for completed work. callback := func(c *chunk) error { // Check for an inline buffer. if pendingPre == 0 && pendingInline > 0 { pendingInline-- done += c.uncompressed.Len() return nil } // Copy the resulting buffer to our intermediate one, and // return errNewBuffer to ensure that we aren't called a second // time. This error code is handled specially below. // // c.buf will be freed and return to the pool when it is done. if pendingPre > 0 { pendingPre-- } r.buf = c.uncompressed return errNewBuffer } for done < len(p) { // Do we have buffered data available? if r.buf != nil { n, err := r.buf.Read(p[done:]) done += n if err == io.EOF { // This is the uncompressed buffer, it can be // returned to the pool at this point. r.buf.Reset() bufPool.Put(r.buf) r.buf = nil } else if err != nil { // Should never happen. 
defer r.stop() return done, err } continue } // Read the length of the next chunk and reset the // reader. The length is used to limit the reader. // // See writer.flush. l, err := binary.ReadUint32(r.in, binary.BigEndian) if err != nil { // This is generally okay as long as there // are still buffers outstanding. We actually // just wait for completion of those buffers here // and continue our loop. if err := r.schedule(nil, callback); err == nil { // We've actually finished all buffers; this is // the normal EOF exit path. defer r.stop() return done, io.EOF } else if err == errNewBuffer { // A new buffer is now available. continue } else { // Some other error occurred; we cannot // process any further. defer r.stop() return done, err } } // Read this chunk and schedule decompression. compressed := bufPool.Get().(*bytes.Buffer) if _, err := io.CopyN(compressed, r.in, int64(l)); err != nil { // Some other error occurred; see above. if err == io.EOF { err = io.ErrUnexpectedEOF } return done, err } var sum []byte if r.hashPool != nil { sum = make([]byte, len(r.lastSum)) if _, err := io.ReadFull(r.in, sum); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF } return done, err } } // Are we doing inline decoding? // // Note that we need to check the length here against // bytes.MinRead, since the bytes library will choose to grow // the slice if the available capacity is not at least // bytes.MinRead. This limits inline decoding to chunkSizes // that are at least bytes.MinRead (which is not unreasonable). var c *chunk start := done + ((pendingPre + pendingInline) * int(r.chunkSize)) if len(p) >= start+int(r.chunkSize) && len(p) >= start+bytes.MinRead { c = newChunk(r.lastSum, sum, compressed, bytes.NewBuffer(p[start:start])) pendingInline++ } else { c = newChunk(r.lastSum, sum, compressed, nil) } r.lastSum = sum if err := r.schedule(c, callback); err == errNewBuffer { // A new buffer was completed while we were reading. 
// That's great, but we need to force schedule the // current buffer so that it does not get lost. // // It is safe to pass nil as an output function here, // because we know that we just freed up a slot above. r.schedule(c, nil) } else if err != nil { // Some other error occurred; see above. defer r.stop() return done, err } } // Make sure that everything has been decoded successfully, otherwise // parts of p may not actually have completed. for pendingInline > 0 { if err := r.schedule(nil, func(c *chunk) error { if err := callback(c); err != nil { return err } // The nil case means that an inline buffer has // completed. The callback will have already removed // the inline buffer from the map, so we just return an // error to check the top of the loop again. return errNewBuffer }); err != errNewBuffer { // Some other error occurred; see above. return done, err } } // Need to return done here, since it may have been adjusted by the // callback to compensation for partial reads on some inline buffer. return done, nil } // writer chunks and schedules writes. type writer struct { pool // out is the underlying writer. out io.Writer // closed indicates whether the file has been closed. closed bool } // NewWriter returns a new compressed writer. If key is non-nil, hash values are // generated and written out for compressed bytes. See package comments for // details. // // The recommended chunkSize is on the order of 1M. Extra memory may be // buffered (in the form of read-ahead, or buffered writes), and is limited to // O(chunkSize * [1+GOMAXPROCS]). 
func NewWriter(out io.Writer, key []byte, chunkSize uint32, level int) (io.WriteCloser, error) { w := &writer{ pool: pool{ chunkSize: chunkSize, buf: bufPool.Get().(*bytes.Buffer), }, out: out, } w.init(key, 1+runtime.GOMAXPROCS(0), true, level) if err := binary.WriteUint32(w.out, binary.BigEndian, chunkSize); err != nil { return nil, err } if w.hashPool != nil { h := w.hashPool.getHash() binary.WriteUint32(h, binary.BigEndian, chunkSize) w.lastSum = h.Sum(nil) w.hashPool.putHash(h) if _, err := io.CopyN(w.out, bytes.NewReader(w.lastSum), int64(len(w.lastSum))); err != nil { return nil, err } } return w, nil } // flush writes a single buffer. func (w *writer) flush(c *chunk) error { // Prefix each chunk with a length; this allows the reader to safely // limit reads while buffering. l := uint32(c.compressed.Len()) if err := binary.WriteUint32(w.out, binary.BigEndian, l); err != nil { return err } // Write out to the stream. if _, err := io.CopyN(w.out, c.compressed, int64(c.compressed.Len())); err != nil { return err } if w.hashPool != nil { io.CopyN(c.h, bytes.NewReader(w.lastSum), int64(len(w.lastSum))) sum := c.h.Sum(nil) w.hashPool.putHash(c.h) c.h = nil if _, err := io.CopyN(w.out, bytes.NewReader(sum), int64(len(sum))); err != nil { return err } w.lastSum = sum } return nil } // Write implements io.Writer.Write. func (w *writer) Write(p []byte) (int, error) { w.mu.Lock() defer w.mu.Unlock() // Did we close already? if w.closed { return 0, io.ErrUnexpectedEOF } // See above; we need to track in the same way. var ( pendingPre = w.nextInput - w.nextOutput pendingInline = 0 ) callback := func(c *chunk) error { if pendingPre == 0 && pendingInline > 0 { pendingInline-- return w.flush(c) } if pendingPre > 0 { pendingPre-- } err := w.flush(c) c.uncompressed.Reset() bufPool.Put(c.uncompressed) return err } for done := 0; done < len(p); { // Construct an inline buffer if we're doing an inline // encoding; see above regarding the bytes.MinRead constraint. 
if w.buf.Len() == 0 && len(p) >= done+int(w.chunkSize) && len(p) >= done+bytes.MinRead { bufPool.Put(w.buf) // Return to the pool; never scheduled. w.buf = bytes.NewBuffer(p[done : done+int(w.chunkSize)]) done += int(w.chunkSize) pendingInline++ } // Do we need to flush w.buf? Note that this case should be hit // immediately following the inline case above. left := int(w.chunkSize) - w.buf.Len() if left == 0 { if err := w.schedule(newChunk(nil, nil, nil, w.buf), callback); err != nil { return done, err } // Reset the buffer, since this has now been scheduled // for compression. Note that this may be trampled // immediately by the bufPool.Put(w.buf) above if the // next buffer happens to be inline, but that's okay. w.buf = bufPool.Get().(*bytes.Buffer) continue } // Read from p into w.buf. toWrite := len(p) - done if toWrite > left { toWrite = left } n, err := w.buf.Write(p[done : done+toWrite]) done += n if err != nil { return done, err } } // Make sure that everything has been flushed, we can't return until // all the contents from p have been used. for pendingInline > 0 { if err := w.schedule(nil, func(c *chunk) error { if err := callback(c); err != nil { return err } // The flush was successful, return errNewBuffer here // to break from the loop and check the condition // again. return errNewBuffer }); err != errNewBuffer { return len(p), err } } return len(p), nil } // Close implements io.Closer.Close. func (w *writer) Close() error { w.mu.Lock() defer w.mu.Unlock() // Did we already close? After the call to Close, we always mark as // closed, regardless of whether the flush is successful. if w.closed { return io.ErrUnexpectedEOF } w.closed = true defer w.stop() // Schedule any remaining partial buffer; we pass w.flush directly here // because the final buffer is guaranteed to not be an inline buffer. if w.buf.Len() > 0 { if err := w.schedule(newChunk(nil, nil, nil, w.buf), w.flush); err != nil { return err } } // Flush all scheduled buffers; see above. 
if err := w.schedule(nil, w.flush); err != nil { return err } // Close the underlying writer (if necessary). if closer, ok := w.out.(io.Closer); ok { return closer.Close() } return nil }