summaryrefslogtreecommitdiffhomepage
path: root/vendor/github.com/containerd/ttrpc
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/containerd/ttrpc')
-rw-r--r--vendor/github.com/containerd/ttrpc/README.md10
-rw-r--r--vendor/github.com/containerd/ttrpc/client.go27
-rw-r--r--vendor/github.com/containerd/ttrpc/server.go15
-rw-r--r--vendor/github.com/containerd/ttrpc/services.go2
-rw-r--r--vendor/github.com/containerd/ttrpc/types.go7
5 files changed, 50 insertions, 11 deletions
diff --git a/vendor/github.com/containerd/ttrpc/README.md b/vendor/github.com/containerd/ttrpc/README.md
index d1eed6b12..c345c844e 100644
--- a/vendor/github.com/containerd/ttrpc/README.md
+++ b/vendor/github.com/containerd/ttrpc/README.md
@@ -50,3 +50,13 @@ TODO:
- [ ] Document protocol layout
- [ ] Add testing under concurrent load to ensure
- [ ] Verify connection error handling
+
+# Project details
+
+ttrpc is a containerd sub-project, licensed under the [Apache 2.0 license](./LICENSE).
+As a containerd sub-project, you will find the:
+ * [Project governance](https://github.com/containerd/project/blob/master/GOVERNANCE.md),
+ * [Maintainers](https://github.com/containerd/project/blob/master/MAINTAINERS),
+ * and [Contributing guidelines](https://github.com/containerd/project/blob/master/CONTRIBUTING.md)
+
+information in our [`containerd/project`](https://github.com/containerd/project) repository.
diff --git a/vendor/github.com/containerd/ttrpc/client.go b/vendor/github.com/containerd/ttrpc/client.go
index e40592dd7..35ca91fba 100644
--- a/vendor/github.com/containerd/ttrpc/client.go
+++ b/vendor/github.com/containerd/ttrpc/client.go
@@ -24,6 +24,7 @@ import (
"strings"
"sync"
"syscall"
+ "time"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
@@ -48,7 +49,15 @@ type Client struct {
err error
}
-func NewClient(conn net.Conn) *Client {
+type ClientOpts func(c *Client)
+
+func WithOnClose(onClose func()) ClientOpts {
+ return func(c *Client) {
+ c.closeFunc = onClose
+ }
+}
+
+func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
c := &Client{
codec: codec{},
conn: conn,
@@ -59,6 +68,10 @@ func NewClient(conn net.Conn) *Client {
closeFunc: func() {},
}
+ for _, o := range opts {
+ o(c)
+ }
+
go c.run()
return c
}
@@ -86,6 +99,10 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
cresp = &Response{}
)
+ if dl, ok := ctx.Deadline(); ok {
+ creq.TimeoutNano = dl.Sub(time.Now()).Nanoseconds()
+ }
+
if err := c.dispatch(ctx, creq, cresp); err != nil {
return err
}
@@ -104,6 +121,7 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
errs := make(chan error, 1)
call := &callRequest{
+ ctx: ctx,
req: req,
resp: resp,
errs: errs,
@@ -135,11 +153,6 @@ func (c *Client) Close() error {
return nil
}
-// OnClose allows a close func to be called when the server is closed
-func (c *Client) OnClose(closer func()) {
- c.closeFunc = closer
-}
-
type message struct {
messageHeader
p []byte
@@ -249,7 +262,7 @@ func (c *Client) recv(resp *Response, msg *message) error {
}
if msg.Type != messageTypeResponse {
- return errors.New("unkown message type received")
+ return errors.New("unknown message type received")
}
defer c.channel.putmbuf(msg.p)
diff --git a/vendor/github.com/containerd/ttrpc/server.go b/vendor/github.com/containerd/ttrpc/server.go
index 263cb4583..40804eac0 100644
--- a/vendor/github.com/containerd/ttrpc/server.go
+++ b/vendor/github.com/containerd/ttrpc/server.go
@@ -414,6 +414,9 @@ func (c *serverConn) run(sctx context.Context) {
case request := <-requests:
active++
go func(id uint32) {
+ ctx, cancel := getRequestContext(ctx, request.req)
+ defer cancel()
+
p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload)
resp := &Response{
Status: status.Proto(),
@@ -454,3 +457,15 @@ func (c *serverConn) run(sctx context.Context) {
}
}
}
+
+var noopFunc = func() {}
+
+func getRequestContext(ctx context.Context, req *Request) (retCtx context.Context, cancel func()) {
+ cancel = noopFunc
+ if req.TimeoutNano == 0 {
+ return ctx, cancel
+ }
+
+ ctx, cancel = context.WithTimeout(ctx, time.Duration(req.TimeoutNano))
+ return ctx, cancel
+}
diff --git a/vendor/github.com/containerd/ttrpc/services.go b/vendor/github.com/containerd/ttrpc/services.go
index e90963825..fe1cade5a 100644
--- a/vendor/github.com/containerd/ttrpc/services.go
+++ b/vendor/github.com/containerd/ttrpc/services.go
@@ -76,7 +76,7 @@ func (s *serviceSet) dispatch(ctx context.Context, serviceName, methodName strin
switch v := obj.(type) {
case proto.Message:
if err := proto.Unmarshal(p, v); err != nil {
- return status.Errorf(codes.Internal, "ttrpc: error unmarshaling payload: %v", err.Error())
+ return status.Errorf(codes.Internal, "ttrpc: error unmarshalling payload: %v", err.Error())
}
default:
return status.Errorf(codes.Internal, "ttrpc: error unsupported request type: %T", v)
diff --git a/vendor/github.com/containerd/ttrpc/types.go b/vendor/github.com/containerd/ttrpc/types.go
index 1f7969e5c..a6b3b818e 100644
--- a/vendor/github.com/containerd/ttrpc/types.go
+++ b/vendor/github.com/containerd/ttrpc/types.go
@@ -23,9 +23,10 @@ import (
)
type Request struct {
- Service string `protobuf:"bytes,1,opt,name=service,proto3"`
- Method string `protobuf:"bytes,2,opt,name=method,proto3"`
- Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
+ Service string `protobuf:"bytes,1,opt,name=service,proto3"`
+ Method string `protobuf:"bytes,2,opt,name=method,proto3"`
+ Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
+ TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,proto3"`
}
func (r *Request) Reset() { *r = Request{} }