diff options
Diffstat (limited to 'vendor/github.com/containerd/ttrpc')
-rw-r--r-- | vendor/github.com/containerd/ttrpc/README.md | 10 | ||||
-rw-r--r-- | vendor/github.com/containerd/ttrpc/client.go | 27 | ||||
-rw-r--r-- | vendor/github.com/containerd/ttrpc/server.go | 15 | ||||
-rw-r--r-- | vendor/github.com/containerd/ttrpc/services.go | 2 | ||||
-rw-r--r-- | vendor/github.com/containerd/ttrpc/types.go | 7 |
5 files changed, 50 insertions, 11 deletions
diff --git a/vendor/github.com/containerd/ttrpc/README.md b/vendor/github.com/containerd/ttrpc/README.md index d1eed6b12..c345c844e 100644 --- a/vendor/github.com/containerd/ttrpc/README.md +++ b/vendor/github.com/containerd/ttrpc/README.md @@ -50,3 +50,13 @@ TODO: - [ ] Document protocol layout - [ ] Add testing under concurrent load to ensure - [ ] Verify connection error handling + +# Project details + +ttrpc is a containerd sub-project, licensed under the [Apache 2.0 license](./LICENSE). +As a containerd sub-project, you will find the: + * [Project governance](https://github.com/containerd/project/blob/master/GOVERNANCE.md), + * [Maintainers](https://github.com/containerd/project/blob/master/MAINTAINERS), + * and [Contributing guidelines](https://github.com/containerd/project/blob/master/CONTRIBUTING.md) + +information in our [`containerd/project`](https://github.com/containerd/project) repository. diff --git a/vendor/github.com/containerd/ttrpc/client.go b/vendor/github.com/containerd/ttrpc/client.go index e40592dd7..35ca91fba 100644 --- a/vendor/github.com/containerd/ttrpc/client.go +++ b/vendor/github.com/containerd/ttrpc/client.go @@ -24,6 +24,7 @@ import ( "strings" "sync" "syscall" + "time" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" @@ -48,7 +49,15 @@ type Client struct { err error } -func NewClient(conn net.Conn) *Client { +type ClientOpts func(c *Client) + +func WithOnClose(onClose func()) ClientOpts { + return func(c *Client) { + c.closeFunc = onClose + } +} + +func NewClient(conn net.Conn, opts ...ClientOpts) *Client { c := &Client{ codec: codec{}, conn: conn, @@ -59,6 +68,10 @@ func NewClient(conn net.Conn) *Client { closeFunc: func() {}, } + for _, o := range opts { + o(c) + } + go c.run() return c } @@ -86,6 +99,10 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int cresp = &Response{} ) + if dl, ok := ctx.Deadline(); ok { + creq.TimeoutNano = dl.Sub(time.Now()).Nanoseconds() + } + if err := c.dispatch(ctx, creq, cresp); err != nil { return err } @@ -104,6 +121,7 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error { errs := make(chan error, 1) call := &callRequest{ + ctx: ctx, req: req, resp: resp, errs: errs, @@ -135,11 +153,6 @@ func (c *Client) Close() error { return nil } -// OnClose allows a close func to be called when the server is closed -func (c *Client) OnClose(closer func()) { - c.closeFunc = closer -} - type message struct { messageHeader p []byte @@ -249,7 +262,7 @@ func (c *Client) recv(resp *Response, msg *message) error { } if msg.Type != messageTypeResponse { - return errors.New("unkown message type received") + return errors.New("unknown message type received") } defer c.channel.putmbuf(msg.p) diff --git a/vendor/github.com/containerd/ttrpc/server.go b/vendor/github.com/containerd/ttrpc/server.go index 263cb4583..40804eac0 100644 --- a/vendor/github.com/containerd/ttrpc/server.go +++ b/vendor/github.com/containerd/ttrpc/server.go @@ -414,6 +414,9 @@ func (c *serverConn) run(sctx context.Context) { case request := <-requests: active++ go func(id uint32) { + ctx, cancel := getRequestContext(ctx, request.req) + defer cancel() + p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload) resp := &Response{ Status: status.Proto(), @@ -454,3 +457,15 @@ func (c *serverConn) run(sctx context.Context) { } } } + +var noopFunc = func() {} + +func getRequestContext(ctx context.Context, req *Request) (retCtx context.Context, cancel func()) { + cancel = noopFunc + if req.TimeoutNano == 0 { + return ctx, cancel + } + + ctx, cancel = context.WithTimeout(ctx, time.Duration(req.TimeoutNano)) + return ctx, cancel +} diff --git a/vendor/github.com/containerd/ttrpc/services.go b/vendor/github.com/containerd/ttrpc/services.go index e90963825..fe1cade5a 100644 --- a/vendor/github.com/containerd/ttrpc/services.go +++ b/vendor/github.com/containerd/ttrpc/services.go @@ -76,7 +76,7 @@ func (s *serviceSet) dispatch(ctx context.Context, serviceName, methodName strin switch v := obj.(type) { case proto.Message: if err := proto.Unmarshal(p, v); err != nil { - return status.Errorf(codes.Internal, "ttrpc: error unmarshaling payload: %v", err.Error()) + return status.Errorf(codes.Internal, "ttrpc: error unmarshalling payload: %v", err.Error()) } default: return status.Errorf(codes.Internal, "ttrpc: error unsupported request type: %T", v) diff --git a/vendor/github.com/containerd/ttrpc/types.go b/vendor/github.com/containerd/ttrpc/types.go index 1f7969e5c..a6b3b818e 100644 --- a/vendor/github.com/containerd/ttrpc/types.go +++ b/vendor/github.com/containerd/ttrpc/types.go @@ -23,9 +23,10 @@ import ( ) type Request struct { - Service string `protobuf:"bytes,1,opt,name=service,proto3"` - Method string `protobuf:"bytes,2,opt,name=method,proto3"` - Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"` + Service string `protobuf:"bytes,1,opt,name=service,proto3"` + Method string `protobuf:"bytes,2,opt,name=method,proto3"` + Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"` + TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,proto3"` } func (r *Request) Reset() { *r = Request{} } |