diff options
Diffstat (limited to 'pkg/eventchannel')
-rw-r--r-- | pkg/eventchannel/event.go | 165 | ||||
-rwxr-xr-x | pkg/eventchannel/eventchannel_go_proto/event.pb.go | 85 | ||||
-rwxr-xr-x | pkg/eventchannel/eventchannel_state_autogen.go | 4 |
3 files changed, 254 insertions, 0 deletions
diff --git a/pkg/eventchannel/event.go b/pkg/eventchannel/event.go new file mode 100644 index 000000000..4c8ae573b --- /dev/null +++ b/pkg/eventchannel/event.go @@ -0,0 +1,165 @@ +// Copyright 2018 The gVisor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package eventchannel contains functionality for sending any protobuf message +// on a socketpair. +// +// The wire format is a uvarint length followed by a binary protobuf.Any +// message. +package eventchannel + +import ( + "encoding/binary" + "fmt" + "sync" + "syscall" + + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes" + pb "gvisor.googlesource.com/gvisor/pkg/eventchannel/eventchannel_go_proto" + "gvisor.googlesource.com/gvisor/pkg/log" + "gvisor.googlesource.com/gvisor/pkg/unet" +) + +// Emitter emits a proto message. +type Emitter interface { + // Emit writes a single eventchannel message to an emitter. Emit should + // return hangup = true to indicate an emitter has "hung up" and no further + // messages should be directed to it. + Emit(msg proto.Message) (hangup bool, err error) + + // Close closes this emitter. Emit cannot be used after Close is called. + Close() error +} + +var ( + mu sync.Mutex + emitters = make(map[Emitter]struct{}) +) + +// Emit emits a message using all added emitters. +func Emit(msg proto.Message) error { + mu.Lock() + defer mu.Unlock() + + var err error + for e := range emitters { + hangup, eerr := e.Emit(msg) + if eerr != nil { + if err == nil { + err = fmt.Errorf("error emitting %v: on %v: %v", msg, e, eerr) + } else { + err = fmt.Errorf("%v; on %v: %v", err, e, eerr) + } + + // Log as well, since most callers ignore the error. + log.Warningf("Error emitting %v on %v: %v", msg, e, eerr) + } + if hangup { + log.Infof("Hangup on eventchannel emitter %v.", e) + delete(emitters, e) + } + } + + return err +} + +// AddEmitter adds a new emitter. +func AddEmitter(e Emitter) { + mu.Lock() + defer mu.Unlock() + emitters[e] = struct{}{} +} + +func marshal(msg proto.Message) ([]byte, error) { + anypb, err := ptypes.MarshalAny(msg) + if err != nil { + return nil, err + } + + // Wire format is uvarint message length followed by binary proto. + bufMsg, err := proto.Marshal(anypb) + if err != nil { + return nil, err + } + p := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(p, uint64(len(bufMsg))) + return append(p[:n], bufMsg...), nil +} + +// socketEmitter emits proto messages on a socket. +type socketEmitter struct { + socket *unet.Socket +} + +// SocketEmitter creates a new event channel based on the given fd. +// +// SocketEmitter takes ownership of fd. +func SocketEmitter(fd int) (Emitter, error) { + s, err := unet.NewSocket(fd) + if err != nil { + return nil, err + } + + return &socketEmitter{ + socket: s, + }, nil +} + +// Emit implements Emitter.Emit. +func (s *socketEmitter) Emit(msg proto.Message) (bool, error) { + p, err := marshal(msg) + if err != nil { + return false, err + } + for done := 0; done < len(p); { + n, err := s.socket.Write(p[done:]) + if err != nil { + return (err == syscall.EPIPE), err + } + done += n + } + return false, nil +} + +// Close implements Emitter.Emit. +func (s *socketEmitter) Close() error { + return s.socket.Close() +} + +// debugEmitter wraps an emitter to emit stringified event messages. This is +// useful for debugging -- when the messages are intended for humans. +type debugEmitter struct { + inner Emitter +} + +// DebugEmitterFrom creates a new event channel emitter by wraping an existing +// raw emitter. +func DebugEmitterFrom(inner Emitter) Emitter { + return &debugEmitter{ + inner: inner, + } +} + +func (d *debugEmitter) Emit(msg proto.Message) (bool, error) { + ev := &pb.DebugEvent{ + Name: proto.MessageName(msg), + Text: proto.MarshalTextString(msg), + } + return d.inner.Emit(ev) +} + +func (d *debugEmitter) Close() error { + return d.inner.Close() +} diff --git a/pkg/eventchannel/eventchannel_go_proto/event.pb.go b/pkg/eventchannel/eventchannel_go_proto/event.pb.go new file mode 100755 index 000000000..bb71ed3e6 --- /dev/null +++ b/pkg/eventchannel/eventchannel_go_proto/event.pb.go @@ -0,0 +1,85 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: pkg/eventchannel/event.proto + +package gvisor + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type DebugEvent struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Text string `protobuf:"bytes,2,opt,name=text,proto3" json:"text,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DebugEvent) Reset() { *m = DebugEvent{} } +func (m *DebugEvent) String() string { return proto.CompactTextString(m) } +func (*DebugEvent) ProtoMessage() {} +func (*DebugEvent) Descriptor() ([]byte, []int) { + return fileDescriptor_fcfbd51abd9de962, []int{0} +} + +func (m *DebugEvent) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DebugEvent.Unmarshal(m, b) +} +func (m *DebugEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DebugEvent.Marshal(b, m, deterministic) +} +func (m *DebugEvent) XXX_Merge(src proto.Message) { + xxx_messageInfo_DebugEvent.Merge(m, src) +} +func (m *DebugEvent) XXX_Size() int { + return xxx_messageInfo_DebugEvent.Size(m) +} +func (m *DebugEvent) XXX_DiscardUnknown() { + xxx_messageInfo_DebugEvent.DiscardUnknown(m) +} + +var xxx_messageInfo_DebugEvent proto.InternalMessageInfo + +func (m *DebugEvent) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *DebugEvent) GetText() string { + if m != nil { + return m.Text + } + return "" +} + +func init() { + proto.RegisterType((*DebugEvent)(nil), "gvisor.DebugEvent") +} + +func init() { proto.RegisterFile("pkg/eventchannel/event.proto", fileDescriptor_fcfbd51abd9de962) } + +var fileDescriptor_fcfbd51abd9de962 = []byte{ + // 103 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x29, 0xc8, 0x4e, 0xd7, + 0x4f, 0x2d, 0x4b, 0xcd, 0x2b, 0x49, 0xce, 0x48, 0xcc, 0xcb, 0x4b, 0xcd, 0x81, 0x70, 0xf4, 0x0a, + 0x8a, 0xf2, 0x4b, 0xf2, 0x85, 0xd8, 0xd2, 0xcb, 0x32, 0x8b, 0xf3, 0x8b, 0x94, 0x4c, 0xb8, 0xb8, + 0x5c, 0x52, 0x93, 0x4a, 0xd3, 0x5d, 0x41, 0x72, 0x42, 0x42, 0x5c, 0x2c, 0x79, 0x89, 0xb9, 0xa9, + 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x60, 0x36, 0x48, 0xac, 0x24, 0xb5, 0xa2, 0x44, 0x82, + 0x09, 0x22, 0x06, 0x62, 0x27, 0xb1, 0x81, 0x0d, 0x31, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x17, + 0xee, 0x7f, 0xef, 0x64, 0x00, 0x00, 0x00, +} diff --git a/pkg/eventchannel/eventchannel_state_autogen.go b/pkg/eventchannel/eventchannel_state_autogen.go new file mode 100755 index 000000000..cfd3a5e43 --- /dev/null +++ b/pkg/eventchannel/eventchannel_state_autogen.go @@ -0,0 +1,4 @@ +// automatically generated by stateify. + +package eventchannel + |