package fifo

import (
	"io"
	"os"
	"runtime"
	"sync"
	"syscall"

	"github.com/pkg/errors"
	"golang.org/x/net/context"
	"golang.org/x/sys/unix"
)

// fifo is the io.ReadWriteCloser returned by OpenFifo. The actual open(2) is
// performed by a background goroutine; the channels below publish its outcome.
type fifo struct {
	flag        int           // open flags with O_CREAT and O_NONBLOCK cleared
	opened      chan struct{} // closed once open(2) succeeded and file is set
	closed      chan struct{} // closed once the fifo is closed or opening failed; err is set first
	closing     chan struct{} // closed by Close to signal that close was requested before open finished
	err         error         // error recorded right before closed is closed
	file        *os.File      // opened fifo; valid only after opened is closed
	closingOnce sync.Once     // close has been called
	closedOnce  sync.Once     // fifo is closed
	handle      *handle       // obtained from getHandle; used to resolve the fifo path
}

// leakCheckWg, when non-nil, tracks the two background goroutines started by
// every OpenFifo call so that a caller can wait for them to finish.
var leakCheckWg *sync.WaitGroup

// OpenFifo opens a fifo. Returns io.ReadWriteCloser.
// Context can be used to cancel this function until open(2) has returned;
// cancelling it afterwards does not close the returned fifo.
// Accepted flags:
// - syscall.O_CREAT - create new fifo if one doesn't exist
// - syscall.O_RDONLY - open fifo only from reader side
// - syscall.O_WRONLY - open fifo only from writer side
// - syscall.O_RDWR - open fifo from both sides, never block on syscall level
// - syscall.O_NONBLOCK - return io.ReadWriteCloser even if other side of the
//   fifo isn't open. read/write will be connected after the actual fifo is
//   open or after fifo is closed.
func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) {
	if _, err := os.Stat(fn); err != nil {
		if os.IsNotExist(err) && flag&syscall.O_CREAT != 0 {
			// Tolerate "already exists": someone else may have created the
			// fifo between the Stat above and this call.
			if err := unix.Mkfifo(fn, uint32(perm&os.ModePerm)); err != nil && !os.IsExist(err) {
				return nil, errors.Wrapf(err, "error creating fifo %v", fn)
			}
		} else {
			return nil, err
		}
	}

	// Wait for the open to finish unless the caller asked for O_NONBLOCK.
	// O_RDWR always waits because that open does not block at syscall level.
	block := flag&syscall.O_NONBLOCK == 0 || flag&syscall.O_RDWR != 0

	// These two flags are handled here and must not reach os.OpenFile.
	flag &= ^syscall.O_CREAT
	flag &= ^syscall.O_NONBLOCK

	h, err := getHandle(fn)
	if err != nil {
		return nil, err
	}

	f := &fifo{
		handle:  h,
		flag:    flag,
		opened:  make(chan struct{}),
		closed:  make(chan struct{}),
		closing: make(chan struct{}),
	}

	wg := leakCheckWg
	if wg != nil {
		wg.Add(2)
	}

	// Watcher: turns a context cancellation into a Close while the open is
	// still pending, and exits as soon as the fifo is opened or closed.
	go func() {
		if wg != nil {
			defer wg.Done()
		}
		select {
		case <-ctx.Done():
			// select chooses randomly among ready cases, so the fifo may
			// already be open when this branch runs. Cancellation only
			// applies until open(2) has returned; leave an opened fifo alone.
			select {
			case <-f.opened:
			default:
				// The result of Close is recorded in f.err.
				f.Close()
			}
		case <-f.opened:
		case <-f.closed:
		}
	}()

	// Opener: performs the (possibly blocking) open(2) and publishes the
	// result through f.opened or f.closed.
	go func() {
		if wg != nil {
			defer wg.Done()
		}
		var file *os.File
		fn, err := h.Path()
		if err == nil {
			file, err = os.OpenFile(fn, flag, 0)
		}
		select {
		case <-f.closing:
			// Close was requested while the open was in flight: report a
			// failure and discard the file if the open happened to succeed.
			if err == nil {
				select {
				case <-ctx.Done():
					err = ctx.Err()
				default:
					err = errors.Errorf("fifo %v was closed before opening", h.Name())
				}
				if file != nil {
					file.Close()
				}
			}
		default:
		}
		if err != nil {
			f.closedOnce.Do(func() {
				f.err = err
				close(f.closed)
			})
			return
		}
		// file must be assigned before opened is closed; readers of f.file
		// synchronize on that channel.
		f.file = file
		close(f.opened)
	}()

	if block {
		select {
		case <-f.opened:
		case <-f.closed:
			return nil, f.err
		}
	}
	return f, nil
}

// Read reads from the fifo into b. If the fifo has not been opened yet it
// blocks until the fifo is either opened or closed.
func (f *fifo) Read(b []byte) (int, error) {
	if f.flag&syscall.O_WRONLY > 0 {
		return 0, errors.New("reading from write-only fifo")
	}
	// Fast path: prefer the opened state when both channels are ready.
	select {
	case <-f.opened:
		return f.file.Read(b)
	default:
	}
	select {
	case <-f.opened:
		return f.file.Read(b)
	case <-f.closed:
		return 0, errors.New("reading from a closed fifo")
	}
}

// Write writes b to the fifo. If the fifo has not been opened yet it blocks
// until the fifo is either opened or closed.
func (f *fifo) Write(b []byte) (int, error) {
	if f.flag&(syscall.O_WRONLY|syscall.O_RDWR) == 0 {
		return 0, errors.New("writing to read-only fifo")
	}
	// Fast path: prefer the opened state when both channels are ready.
	select {
	case <-f.opened:
		return f.file.Write(b)
	default:
	}
	select {
	case <-f.opened:
		return f.file.Write(b)
	case <-f.closed:
		return 0, errors.New("writing to a closed fifo")
	}
}

// Close the fifo. Next reads/writes will error. This method can also be used
// before open(2) has returned and fifo was never opened.
//
// The returned error is the result of closing the underlying file, and is
// non-nil only for the call that actually performed that close.
func (f *fifo) Close() (retErr error) {
	for {
		select {
		case <-f.closed:
			f.handle.Close()
			return
		default:
		}

		select {
		case <-f.opened:
			f.closedOnce.Do(func() {
				retErr = f.file.Close()
				f.err = retErr
				close(f.closed)
			})
			// The next iteration observes f.closed and returns.
			continue
		default:
		}

		// The open has not completed yet.
		if f.flag&syscall.O_RDWR != 0 {
			// An O_RDWR open never blocks at syscall level, so just yield
			// until the opener goroutine publishes its result.
			runtime.Gosched()
			continue
		}

		f.closingOnce.Do(func() {
			close(f.closing)
		})

		// Open the opposite end of the fifo without blocking so that the
		// pending open(2) in the opener goroutine gets a peer and returns.
		reverseMode := syscall.O_WRONLY
		if f.flag&syscall.O_WRONLY > 0 {
			reverseMode = syscall.O_RDONLY
		}
		fn, err := f.handle.Path()
		// if Close() is called concurrently(shouldn't) it may cause error
		// because handle is closed
		select {
		case <-f.closed:
		default:
			if err != nil {
				// Path has become invalid. We will leak a goroutine.
				// This case should not happen in linux.
				f.closedOnce.Do(func() {
					f.err = err
					close(f.closed)
				})
				<-f.closed
				continue
			}
			// Best effort: a failed open is simply retried on the next
			// iteration, and the peer descriptor is only needed momentarily.
			peer, err := os.OpenFile(fn, reverseMode|syscall.O_NONBLOCK, 0)
			if err == nil {
				peer.Close()
			}
			runtime.Gosched()
		}
	}
}