summaryrefslogtreecommitdiffhomepage
path: root/vendor/github.com/containerd/ttrpc/channel.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/containerd/ttrpc/channel.go')
-rw-r--r--vendor/github.com/containerd/ttrpc/channel.go154
1 files changed, 0 insertions, 154 deletions
diff --git a/vendor/github.com/containerd/ttrpc/channel.go b/vendor/github.com/containerd/ttrpc/channel.go
deleted file mode 100644
index 22f5496b4..000000000
--- a/vendor/github.com/containerd/ttrpc/channel.go
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- Copyright The containerd Authors.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-
-package ttrpc
-
-import (
- "bufio"
- "context"
- "encoding/binary"
- "io"
- "net"
- "sync"
-
- "github.com/pkg/errors"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-)
-
-const (
- messageHeaderLength = 10
- messageLengthMax = 4 << 20
-)
-
-type messageType uint8
-
-const (
- messageTypeRequest messageType = 0x1
- messageTypeResponse messageType = 0x2
-)
-
-// messageHeader represents the fixed-length message header of 10 bytes sent
-// with every request.
-type messageHeader struct {
- Length uint32 // length excluding this header. b[:4]
- StreamID uint32 // identifies which request stream message is a part of. b[4:8]
- Type messageType // message type b[8]
- Flags uint8 // reserved b[9]
-}
-
-func readMessageHeader(p []byte, r io.Reader) (messageHeader, error) {
- _, err := io.ReadFull(r, p[:messageHeaderLength])
- if err != nil {
- return messageHeader{}, err
- }
-
- return messageHeader{
- Length: binary.BigEndian.Uint32(p[:4]),
- StreamID: binary.BigEndian.Uint32(p[4:8]),
- Type: messageType(p[8]),
- Flags: p[9],
- }, nil
-}
-
-func writeMessageHeader(w io.Writer, p []byte, mh messageHeader) error {
- binary.BigEndian.PutUint32(p[:4], mh.Length)
- binary.BigEndian.PutUint32(p[4:8], mh.StreamID)
- p[8] = byte(mh.Type)
- p[9] = mh.Flags
-
- _, err := w.Write(p[:])
- return err
-}
-
-var buffers sync.Pool
-
-type channel struct {
- conn net.Conn
- bw *bufio.Writer
- br *bufio.Reader
- hrbuf [messageHeaderLength]byte // avoid alloc when reading header
- hwbuf [messageHeaderLength]byte
-}
-
-func newChannel(conn net.Conn) *channel {
- return &channel{
- conn: conn,
- bw: bufio.NewWriter(conn),
- br: bufio.NewReader(conn),
- }
-}
-
-// recv a message from the channel. The returned buffer contains the message.
-//
-// If a valid grpc status is returned, the message header
-// returned will be valid and caller should send that along to
-// the correct consumer. The bytes on the underlying channel
-// will be discarded.
-func (ch *channel) recv(ctx context.Context) (messageHeader, []byte, error) {
- mh, err := readMessageHeader(ch.hrbuf[:], ch.br)
- if err != nil {
- return messageHeader{}, nil, err
- }
-
- if mh.Length > uint32(messageLengthMax) {
- if _, err := ch.br.Discard(int(mh.Length)); err != nil {
- return mh, nil, errors.Wrapf(err, "failed to discard after receiving oversized message")
- }
-
- return mh, nil, status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", mh.Length, messageLengthMax)
- }
-
- p := ch.getmbuf(int(mh.Length))
- if _, err := io.ReadFull(ch.br, p); err != nil {
- return messageHeader{}, nil, errors.Wrapf(err, "failed reading message")
- }
-
- return mh, p, nil
-}
-
-func (ch *channel) send(ctx context.Context, streamID uint32, t messageType, p []byte) error {
- if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t}); err != nil {
- return err
- }
-
- _, err := ch.bw.Write(p)
- if err != nil {
- return err
- }
-
- return ch.bw.Flush()
-}
-
-func (ch *channel) getmbuf(size int) []byte {
- // we can't use the standard New method on pool because we want to allocate
- // based on size.
- b, ok := buffers.Get().(*[]byte)
- if !ok || cap(*b) < size {
- // TODO(stevvooe): It may be better to allocate these in fixed length
- // buckets to reduce fragmentation but its not clear that would help
- // with performance. An ilogb approach or similar would work well.
- bb := make([]byte, size)
- b = &bb
- } else {
- *b = (*b)[:size]
- }
- return *b
-}
-
-func (ch *channel) putmbuf(p []byte) {
- buffers.Put(&p)
-}