// Copyright 2018 The containerd Authors. // Copyright 2018 The gVisor Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package proc import ( "context" "fmt" "io" "os" "sync" "sync/atomic" "syscall" "github.com/containerd/containerd/log" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" ) // TODO(random-liu): This file can be a util. var bufPool = sync.Pool{ New: func() interface{} { buffer := make([]byte, 32<<10) return &buffer }, } func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error { var sameFile *countingWriteCloser for _, i := range []struct { name string dest func(wc io.WriteCloser, rc io.Closer) }{ { name: stdout, dest: func(wc io.WriteCloser, rc io.Closer) { wg.Add(1) cwg.Add(1) go func() { cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) if _, err := io.CopyBuffer(wc, rio.Stdout(), *p); err != nil { log.G(ctx).Warn("error copying stdout") } wg.Done() wc.Close() if rc != nil { rc.Close() } }() }, }, { name: stderr, dest: func(wc io.WriteCloser, rc io.Closer) { wg.Add(1) cwg.Add(1) go func() { cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) if _, err := io.CopyBuffer(wc, rio.Stderr(), *p); err != nil { log.G(ctx).Warn("error copying stderr") } wg.Done() wc.Close() if rc != nil { rc.Close() } }() }, }, } { ok, err := isFifo(i.name) if err != nil { return err } var ( fw io.WriteCloser fr io.Closer ) if ok { if fw, err = fifo.OpenFifo(ctx, i.name, syscall.O_WRONLY, 0); err != nil { return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", i.name, err) } if fr, err = fifo.OpenFifo(ctx, i.name, syscall.O_RDONLY, 0); err != nil { return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", i.name, err) } } else { if sameFile != nil { sameFile.count++ i.dest(sameFile, nil) continue } if fw, err = os.OpenFile(i.name, syscall.O_WRONLY|syscall.O_APPEND, 0); err != nil { return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", i.name, err) } if stdout == stderr { sameFile = &countingWriteCloser{ WriteCloser: fw, count: 1, } } } i.dest(fw, fr) } if stdin == "" { return nil } f, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0) if err != nil { return fmt.Errorf("gvisor-containerd-shim: opening %s failed: %s", stdin, err) } cwg.Add(1) go func() { cwg.Done() p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(rio.Stdin(), f, *p) rio.Stdin().Close() f.Close() }() return nil } // countingWriteCloser masks io.Closer() until close has been invoked a certain number of times. type countingWriteCloser struct { io.WriteCloser count int64 } func (c *countingWriteCloser) Close() error { if atomic.AddInt64(&c.count, -1) > 0 { return nil } return c.WriteCloser.Close() } // isFifo checks if a file is a fifo. // // If the file does not exist then it returns false. func isFifo(path string) (bool, error) { stat, err := os.Stat(path) if err != nil { if os.IsNotExist(err) { return false, nil } return false, err } if stat.Mode()&os.ModeNamedPipe == os.ModeNamedPipe { return true, nil } return false, nil }