diff options
Diffstat (limited to 'vendor/github.com/containerd/ttrpc/channel.go')
-rw-r--r-- | vendor/github.com/containerd/ttrpc/channel.go | 154 |
1 files changed, 0 insertions, 154 deletions
diff --git a/vendor/github.com/containerd/ttrpc/channel.go b/vendor/github.com/containerd/ttrpc/channel.go deleted file mode 100644 index 22f5496b4..000000000 --- a/vendor/github.com/containerd/ttrpc/channel.go +++ /dev/null @@ -1,154 +0,0 @@ -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package ttrpc - -import ( - "bufio" - "context" - "encoding/binary" - "io" - "net" - "sync" - - "github.com/pkg/errors" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -const ( - messageHeaderLength = 10 - messageLengthMax = 4 << 20 -) - -type messageType uint8 - -const ( - messageTypeRequest messageType = 0x1 - messageTypeResponse messageType = 0x2 -) - -// messageHeader represents the fixed-length message header of 10 bytes sent -// with every request. -type messageHeader struct { - Length uint32 // length excluding this header. b[:4] - StreamID uint32 // identifies which request stream message is a part of. b[4:8] - Type messageType // message type b[8] - Flags uint8 // reserved b[9] -} - -func readMessageHeader(p []byte, r io.Reader) (messageHeader, error) { - _, err := io.ReadFull(r, p[:messageHeaderLength]) - if err != nil { - return messageHeader{}, err - } - - return messageHeader{ - Length: binary.BigEndian.Uint32(p[:4]), - StreamID: binary.BigEndian.Uint32(p[4:8]), - Type: messageType(p[8]), - Flags: p[9], - }, nil -} - -func writeMessageHeader(w io.Writer, p []byte, mh messageHeader) error { - binary.BigEndian.PutUint32(p[:4], mh.Length) - binary.BigEndian.PutUint32(p[4:8], mh.StreamID) - p[8] = byte(mh.Type) - p[9] = mh.Flags - - _, err := w.Write(p[:]) - return err -} - -var buffers sync.Pool - -type channel struct { - conn net.Conn - bw *bufio.Writer - br *bufio.Reader - hrbuf [messageHeaderLength]byte // avoid alloc when reading header - hwbuf [messageHeaderLength]byte -} - -func newChannel(conn net.Conn) *channel { - return &channel{ - conn: conn, - bw: bufio.NewWriter(conn), - br: bufio.NewReader(conn), - } -} - -// recv a message from the channel. The returned buffer contains the message. -// -// If a valid grpc status is returned, the message header -// returned will be valid and caller should send that along to -// the correct consumer. The bytes on the underlying channel -// will be discarded. -func (ch *channel) recv(ctx context.Context) (messageHeader, []byte, error) { - mh, err := readMessageHeader(ch.hrbuf[:], ch.br) - if err != nil { - return messageHeader{}, nil, err - } - - if mh.Length > uint32(messageLengthMax) { - if _, err := ch.br.Discard(int(mh.Length)); err != nil { - return mh, nil, errors.Wrapf(err, "failed to discard after receiving oversized message") - } - - return mh, nil, status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", mh.Length, messageLengthMax) - } - - p := ch.getmbuf(int(mh.Length)) - if _, err := io.ReadFull(ch.br, p); err != nil { - return messageHeader{}, nil, errors.Wrapf(err, "failed reading message") - } - - return mh, p, nil -} - -func (ch *channel) send(ctx context.Context, streamID uint32, t messageType, p []byte) error { - if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t}); err != nil { - return err - } - - _, err := ch.bw.Write(p) - if err != nil { - return err - } - - return ch.bw.Flush() -} - -func (ch *channel) getmbuf(size int) []byte { - // we can't use the standard New method on pool because we want to allocate - // based on size. - b, ok := buffers.Get().(*[]byte) - if !ok || cap(*b) < size { - // TODO(stevvooe): It may be better to allocate these in fixed length - // buckets to reduce fragmentation but its not clear that would help - // with performance. An ilogb approach or similar would work well. - bb := make([]byte, size) - b = &bb - } else { - *b = (*b)[:size] - } - return *b -} - -func (ch *channel) putmbuf(p []byte) { - buffers.Put(&p) -} |