summaryrefslogtreecommitdiffhomepage
path: root/pkg/eventchannel
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/eventchannel')
-rw-r--r--pkg/eventchannel/event.go165
-rwxr-xr-xpkg/eventchannel/eventchannel_go_proto/event.pb.go85
-rwxr-xr-xpkg/eventchannel/eventchannel_state_autogen.go4
3 files changed, 254 insertions, 0 deletions
diff --git a/pkg/eventchannel/event.go b/pkg/eventchannel/event.go
new file mode 100644
index 000000000..4c8ae573b
--- /dev/null
+++ b/pkg/eventchannel/event.go
@@ -0,0 +1,165 @@
+// Copyright 2018 The gVisor Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package eventchannel contains functionality for sending any protobuf message
+// on a socketpair.
+//
+// The wire format is a uvarint length followed by a binary protobuf.Any
+// message.
+package eventchannel
+
+import (
+ "encoding/binary"
+ "fmt"
+ "sync"
+ "syscall"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ pb "gvisor.googlesource.com/gvisor/pkg/eventchannel/eventchannel_go_proto"
+ "gvisor.googlesource.com/gvisor/pkg/log"
+ "gvisor.googlesource.com/gvisor/pkg/unet"
+)
+
+// Emitter emits a proto message.
+type Emitter interface {
+ // Emit writes a single eventchannel message to an emitter. Emit should
+ // return hangup = true to indicate an emitter has "hung up" and no further
+ // messages should be directed to it.
+ Emit(msg proto.Message) (hangup bool, err error)
+
+ // Close closes this emitter. Emit cannot be used after Close is called.
+ Close() error
+}
+
+var (
+ mu sync.Mutex
+ emitters = make(map[Emitter]struct{})
+)
+
+// Emit emits a message using all added emitters.
+func Emit(msg proto.Message) error {
+ mu.Lock()
+ defer mu.Unlock()
+
+ var err error
+ for e := range emitters {
+ hangup, eerr := e.Emit(msg)
+ if eerr != nil {
+ if err == nil {
+ err = fmt.Errorf("error emitting %v: on %v: %v", msg, e, eerr)
+ } else {
+ err = fmt.Errorf("%v; on %v: %v", err, e, eerr)
+ }
+
+ // Log as well, since most callers ignore the error.
+ log.Warningf("Error emitting %v on %v: %v", msg, e, eerr)
+ }
+ if hangup {
+ log.Infof("Hangup on eventchannel emitter %v.", e)
+ delete(emitters, e)
+ }
+ }
+
+ return err
+}
+
+// AddEmitter adds a new emitter.
+func AddEmitter(e Emitter) {
+ mu.Lock()
+ defer mu.Unlock()
+ emitters[e] = struct{}{}
+}
+
+func marshal(msg proto.Message) ([]byte, error) {
+ anypb, err := ptypes.MarshalAny(msg)
+ if err != nil {
+ return nil, err
+ }
+
+ // Wire format is uvarint message length followed by binary proto.
+ bufMsg, err := proto.Marshal(anypb)
+ if err != nil {
+ return nil, err
+ }
+ p := make([]byte, binary.MaxVarintLen64)
+ n := binary.PutUvarint(p, uint64(len(bufMsg)))
+ return append(p[:n], bufMsg...), nil
+}
+
+// socketEmitter emits proto messages on a socket.
+type socketEmitter struct {
+ socket *unet.Socket
+}
+
+// SocketEmitter creates a new event channel based on the given fd.
+//
+// SocketEmitter takes ownership of fd.
+func SocketEmitter(fd int) (Emitter, error) {
+ s, err := unet.NewSocket(fd)
+ if err != nil {
+ return nil, err
+ }
+
+ return &socketEmitter{
+ socket: s,
+ }, nil
+}
+
+// Emit implements Emitter.Emit.
+func (s *socketEmitter) Emit(msg proto.Message) (bool, error) {
+ p, err := marshal(msg)
+ if err != nil {
+ return false, err
+ }
+ for done := 0; done < len(p); {
+ n, err := s.socket.Write(p[done:])
+ if err != nil {
+ return (err == syscall.EPIPE), err
+ }
+ done += n
+ }
+ return false, nil
+}
+
+// Close implements Emitter.Emit.
+func (s *socketEmitter) Close() error {
+ return s.socket.Close()
+}
+
+// debugEmitter wraps an emitter to emit stringified event messages. This is
+// useful for debugging -- when the messages are intended for humans.
+type debugEmitter struct {
+ inner Emitter
+}
+
+// DebugEmitterFrom creates a new event channel emitter by wraping an existing
+// raw emitter.
+func DebugEmitterFrom(inner Emitter) Emitter {
+ return &debugEmitter{
+ inner: inner,
+ }
+}
+
+func (d *debugEmitter) Emit(msg proto.Message) (bool, error) {
+ ev := &pb.DebugEvent{
+ Name: proto.MessageName(msg),
+ Text: proto.MarshalTextString(msg),
+ }
+ return d.inner.Emit(ev)
+}
+
+func (d *debugEmitter) Close() error {
+ return d.inner.Close()
+}
diff --git a/pkg/eventchannel/eventchannel_go_proto/event.pb.go b/pkg/eventchannel/eventchannel_go_proto/event.pb.go
new file mode 100755
index 000000000..bb71ed3e6
--- /dev/null
+++ b/pkg/eventchannel/eventchannel_go_proto/event.pb.go
@@ -0,0 +1,85 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: pkg/eventchannel/event.proto
+
+package gvisor
+
+import (
+ fmt "fmt"
+ proto "github.com/golang/protobuf/proto"
+ math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+type DebugEvent struct {
+ Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+ Text string `protobuf:"bytes,2,opt,name=text,proto3" json:"text,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *DebugEvent) Reset() { *m = DebugEvent{} }
+func (m *DebugEvent) String() string { return proto.CompactTextString(m) }
+func (*DebugEvent) ProtoMessage() {}
+func (*DebugEvent) Descriptor() ([]byte, []int) {
+ return fileDescriptor_fcfbd51abd9de962, []int{0}
+}
+
+func (m *DebugEvent) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_DebugEvent.Unmarshal(m, b)
+}
+func (m *DebugEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_DebugEvent.Marshal(b, m, deterministic)
+}
+func (m *DebugEvent) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_DebugEvent.Merge(m, src)
+}
+func (m *DebugEvent) XXX_Size() int {
+ return xxx_messageInfo_DebugEvent.Size(m)
+}
+func (m *DebugEvent) XXX_DiscardUnknown() {
+ xxx_messageInfo_DebugEvent.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_DebugEvent proto.InternalMessageInfo
+
+func (m *DebugEvent) GetName() string {
+ if m != nil {
+ return m.Name
+ }
+ return ""
+}
+
+func (m *DebugEvent) GetText() string {
+ if m != nil {
+ return m.Text
+ }
+ return ""
+}
+
+func init() {
+ proto.RegisterType((*DebugEvent)(nil), "gvisor.DebugEvent")
+}
+
+func init() { proto.RegisterFile("pkg/eventchannel/event.proto", fileDescriptor_fcfbd51abd9de962) }
+
+var fileDescriptor_fcfbd51abd9de962 = []byte{
+ // 103 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x29, 0xc8, 0x4e, 0xd7,
+ 0x4f, 0x2d, 0x4b, 0xcd, 0x2b, 0x49, 0xce, 0x48, 0xcc, 0xcb, 0x4b, 0xcd, 0x81, 0x70, 0xf4, 0x0a,
+ 0x8a, 0xf2, 0x4b, 0xf2, 0x85, 0xd8, 0xd2, 0xcb, 0x32, 0x8b, 0xf3, 0x8b, 0x94, 0x4c, 0xb8, 0xb8,
+ 0x5c, 0x52, 0x93, 0x4a, 0xd3, 0x5d, 0x41, 0x72, 0x42, 0x42, 0x5c, 0x2c, 0x79, 0x89, 0xb9, 0xa9,
+ 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x60, 0x36, 0x48, 0xac, 0x24, 0xb5, 0xa2, 0x44, 0x82,
+ 0x09, 0x22, 0x06, 0x62, 0x27, 0xb1, 0x81, 0x0d, 0x31, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x17,
+ 0xee, 0x7f, 0xef, 0x64, 0x00, 0x00, 0x00,
+}
diff --git a/pkg/eventchannel/eventchannel_state_autogen.go b/pkg/eventchannel/eventchannel_state_autogen.go
new file mode 100755
index 000000000..cfd3a5e43
--- /dev/null
+++ b/pkg/eventchannel/eventchannel_state_autogen.go
@@ -0,0 +1,4 @@
+// automatically generated by stateify.
+
+package eventchannel
+