From 364ac92baf83f2352f78b718090472639bd92a76 Mon Sep 17 00:00:00 2001 From: Adin Scannell Date: Tue, 23 Jun 2020 23:32:23 -0700 Subject: Support for saving pointers to fields in the state package. Previously, it was not possible to encode/decode an object graph which contained a pointer to a field within another type. This was because the encoder was previously unable to disambiguate a pointer to an object and a pointer within the object. This CL remedies this by constructing an address map tracking the full memory range object occupy. The encoded Refvalue message has been extended to allow references to children objects within another object. Because the encoding process may learn about object structure over time, we cannot encode any objects under the entire graph has been generated. This CL also updates the state package to use standard interfaces intead of reflection-based dispatch in order to improve performance overall. This includes a custom wire protocol to significantly reduce the number of allocations and take advantage of structure packing. As part of these changes, there are a small number of minor changes in other places of the code base: * The lists used during encoding are changed to use intrusive lists with the objectEncodeState directly, which required that the ilist Len() method is updated to work properly with the ElementMapper mechanism. * A bug is fixed in the list code wherein Remove() called on an element that is already removed can corrupt the list (removing the element if there's only a single element). Now the behavior is correct. * Standard error wrapping is introduced. * Compressio was updated to implement the new wire.Reader and wire.Writer inteface methods directly. The lack of a ReadByte and WriteByte caused issues not due to interface dispatch, but because underlying slices for a Read or Write call through an interface would always escape to the heap! * Statify has been updated to support the new APIs. See README.md for a description of how the new mechanism works. PiperOrigin-RevId: 318010298 --- pkg/compressio/compressio.go | 54 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 12 deletions(-) (limited to 'pkg/compressio') diff --git a/pkg/compressio/compressio.go b/pkg/compressio/compressio.go index 5f52cbe74..b094c5662 100644 --- a/pkg/compressio/compressio.go +++ b/pkg/compressio/compressio.go @@ -346,20 +346,22 @@ func (p *pool) schedule(c *chunk, callback func(*chunk) error) error { } } -// reader chunks reads and decompresses. -type reader struct { +// Reader is a compressed reader. +type Reader struct { pool // in is the source. in io.Reader } +var _ io.Reader = (*Reader)(nil) + // NewReader returns a new compressed reader. If key is non-nil, the data stream // is assumed to contain expected hash values, which will be compared against // hash values computed from the compressed bytes. See package comments for // details. -func NewReader(in io.Reader, key []byte) (io.Reader, error) { - r := &reader{ +func NewReader(in io.Reader, key []byte) (*Reader, error) { + r := &Reader{ in: in, } @@ -394,8 +396,19 @@ var errNewBuffer = errors.New("buffer ready") // ErrHashMismatch is returned if the hash does not match. var ErrHashMismatch = errors.New("hash mismatch") +// ReadByte implements wire.Reader.ReadByte. +func (r *Reader) ReadByte() (byte, error) { + var p [1]byte + n, err := r.Read(p[:]) + if n != 1 { + return p[0], err + } + // Suppress EOF. + return p[0], nil +} + // Read implements io.Reader.Read. -func (r *reader) Read(p []byte) (int, error) { +func (r *Reader) Read(p []byte) (int, error) { r.mu.Lock() defer r.mu.Unlock() @@ -551,8 +564,8 @@ func (r *reader) Read(p []byte) (int, error) { return done, nil } -// writer chunks and schedules writes. -type writer struct { +// Writer is a compressed writer. +type Writer struct { pool // out is the underlying writer. @@ -562,6 +575,8 @@ type writer struct { closed bool } +var _ io.Writer = (*Writer)(nil) + // NewWriter returns a new compressed writer. If key is non-nil, hash values are // generated and written out for compressed bytes. See package comments for // details. @@ -569,8 +584,8 @@ type writer struct { // The recommended chunkSize is on the order of 1M. Extra memory may be // buffered (in the form of read-ahead, or buffered writes), and is limited to // O(chunkSize * [1+GOMAXPROCS]). -func NewWriter(out io.Writer, key []byte, chunkSize uint32, level int) (io.WriteCloser, error) { - w := &writer{ +func NewWriter(out io.Writer, key []byte, chunkSize uint32, level int) (*Writer, error) { + w := &Writer{ pool: pool{ chunkSize: chunkSize, buf: bufPool.Get().(*bytes.Buffer), @@ -597,7 +612,7 @@ func NewWriter(out io.Writer, key []byte, chunkSize uint32, level int) (io.Write } // flush writes a single buffer. -func (w *writer) flush(c *chunk) error { +func (w *Writer) flush(c *chunk) error { // Prefix each chunk with a length; this allows the reader to safely // limit reads while buffering. l := uint32(c.compressed.Len()) @@ -624,8 +639,23 @@ func (w *writer) flush(c *chunk) error { return nil } +// WriteByte implements wire.Writer.WriteByte. +// +// Note that this implementation is necessary on the object itself, as an +// interface-based dispatch cannot tell whether the array backing the slice +// escapes, therefore the all bytes written will generate an escape. +func (w *Writer) WriteByte(b byte) error { + var p [1]byte + p[0] = b + n, err := w.Write(p[:]) + if n != 1 { + return err + } + return nil +} + // Write implements io.Writer.Write. -func (w *writer) Write(p []byte) (int, error) { +func (w *Writer) Write(p []byte) (int, error) { w.mu.Lock() defer w.mu.Unlock() @@ -710,7 +740,7 @@ func (w *writer) Write(p []byte) (int, error) { } // Close implements io.Closer.Close. -func (w *writer) Close() error { +func (w *Writer) Close() error { w.mu.Lock() defer w.mu.Unlock() -- cgit v1.2.3