Merge pull request #27405 from tonistiigi/fix-fifo2

Fix issues with fifos blocking on open
This commit is contained in:
Sebastiaan van Stijn 2016-10-21 09:44:33 -07:00 committed by GitHub
commit 32b541e443
14 changed files with 520 additions and 88 deletions

View File

@ -144,6 +144,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
# containerd
clone git github.com/docker/containerd 52ef1ceb4b660c42cf4ea9013180a5663968d4c7
clone git github.com/tonistiigi/fifo 8c56881ce5e63e19e2dfc495c8af0fb90916467d
# cluster
clone git github.com/docker/swarmkit 3b221eb0391d34ae0b9dac65df02b5b64de6dff2

View File

@ -13,6 +13,7 @@ import (
"github.com/Sirupsen/logrus"
containerd "github.com/docker/containerd/api/grpc/types"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/mount"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
@ -100,12 +101,26 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
return -1, err
}
var stdinOnce sync.Once
stdin := iopipe.Stdin
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
var err error
stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
err = stdin.Close()
if err2 := p.sendCloseStdin(); err == nil {
err = err2
}
})
return err
})
container.processes[processFriendlyName] = p
clnt.unlock(containerID)
if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
clnt.lock(containerID)
p.closeFifos(iopipe)
return -1, err
}
clnt.lock(containerID)
@ -420,8 +435,18 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev
if err != nil {
return err
}
var stdinOnce sync.Once
stdin := iopipe.Stdin
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
var err error
stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
err = stdin.Close()
})
return err
})
if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil {
container.closeFifos(iopipe)
return err
}
@ -434,6 +459,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev
}})
if err != nil {
container.closeFifos(iopipe)
return err
}

View File

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
@ -322,10 +323,10 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
// Convert io.ReadClosers to io.Readers
if stdout != nil {
iopipe.Stdout = openReaderFromPipe(stdout)
iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout})
}
if stderr != nil {
iopipe.Stderr = openReaderFromPipe(stderr)
iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr})
}
proc := &process{

View File

@ -6,12 +6,15 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"syscall"
"time"
"github.com/Sirupsen/logrus"
containerd "github.com/docker/containerd/api/grpc/types"
"github.com/docker/docker/pkg/ioutils"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/tonistiigi/fifo"
"golang.org/x/net/context"
)
@ -90,22 +93,37 @@ func (ctr *container) start(checkpoint string, checkpointDir string) error {
if err != nil {
return nil
}
createChan := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ready := make(chan struct{})
iopipe, err := ctr.openFifos(spec.Process.Terminal)
if err != nil {
return err
}
var stdinOnce sync.Once
// we need to delay stdin closure after container start or else "stdin close"
// event will be rejected by containerd.
// stdin closure happens in AttachStreams
stdin := iopipe.Stdin
iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
go func() {
<-createChan
stdin.Close()
}()
return nil
var err error
stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
err = stdin.Close()
go func() {
select {
case <-ready:
if err := ctr.sendCloseStdin(); err != nil {
logrus.Warnf("failed to close stdin: %+v")
}
case <-ctx.Done():
}
}()
})
return err
})
r := &containerd.CreateContainerRequest{
@ -124,19 +142,17 @@ func (ctr *container) start(checkpoint string, checkpointDir string) error {
ctr.client.appendContainer(ctr)
if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil {
close(createChan)
ctr.closeFifos(iopipe)
return err
}
resp, err := ctr.client.remote.apiClient.CreateContainer(context.Background(), r)
if err != nil {
close(createChan)
ctr.closeFifos(iopipe)
return err
}
ctr.systemPid = systemPid(resp.Container)
close(createChan)
close(ready)
return ctr.client.backend.StateChanged(ctr.containerID, StateInfo{
CommonStateInfo: CommonStateInfo{
@ -207,15 +223,15 @@ func (ctr *container) handleEvent(e *containerd.Event) error {
// discardFifos attempts to fully read the container fifos to unblock processes
// that may be blocked on the writer side.
func (ctr *container) discardFifos() {
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
for _, i := range []int{syscall.Stdout, syscall.Stderr} {
f := ctr.fifo(i)
c := make(chan struct{})
f, err := fifo.OpenFifo(ctx, ctr.fifo(i), syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
if err != nil {
logrus.Warnf("error opening fifo %v for discarding: %+v", f, err)
continue
}
go func() {
r := openReaderFromFifo(f)
close(c) // this channel is used to not close the writer too early, before readonly open has been called.
io.Copy(ioutil.Discard, r)
io.Copy(ioutil.Discard, f)
}()
<-c
closeReaderFifo(f) // avoid blocking permanently on open if there is no writer side
}
}

View File

@ -3,6 +3,7 @@ package libcontainerd
import (
"fmt"
"io"
"io/ioutil"
"strings"
"syscall"
"time"
@ -131,10 +132,10 @@ func (ctr *container) start() error {
// Convert io.ReadClosers to io.Readers
if stdout != nil {
iopipe.Stdout = openReaderFromPipe(stdout)
iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout})
}
if stderr != nil {
iopipe.Stderr = openReaderFromPipe(stderr)
iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr})
}
// Save the PID

View File

@ -1,14 +1,15 @@
package libcontainerd
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"syscall"
"time"
containerd "github.com/docker/containerd/api/grpc/types"
"github.com/docker/docker/pkg/ioutils"
"github.com/tonistiigi/fifo"
"golang.org/x/net/context"
)
@ -26,49 +27,67 @@ type process struct {
dir string
}
func (p *process) openFifos(terminal bool) (*IOPipe, error) {
bundleDir := p.dir
if err := os.MkdirAll(bundleDir, 0700); err != nil {
func (p *process) openFifos(terminal bool) (pipe *IOPipe, err error) {
if err := os.MkdirAll(p.dir, 0700); err != nil {
return nil, err
}
for i := 0; i < 3; i++ {
f := p.fifo(i)
if err := syscall.Mkfifo(f, 0700); err != nil && !os.IsExist(err) {
return nil, fmt.Errorf("mkfifo: %s %v", f, err)
}
}
ctx, _ := context.WithTimeout(context.Background(), 15*time.Second)
io := &IOPipe{}
stdinf, err := os.OpenFile(p.fifo(syscall.Stdin), syscall.O_RDWR, 0)
io.Stdin, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stdin), syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
if err != nil {
return nil, err
}
io.Stdout = openReaderFromFifo(p.fifo(syscall.Stdout))
if !terminal {
io.Stderr = openReaderFromFifo(p.fifo(syscall.Stderr))
} else {
io.Stderr = emptyReader{}
defer func() {
if err != nil {
io.Stdin.Close()
}
}()
io.Stdout, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stdout), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
if err != nil {
return nil, err
}
io.Stdin = ioutils.NewWriteCloserWrapper(stdinf, func() error {
stdinf.Close()
_, err := p.client.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
Id: p.containerID,
Pid: p.friendlyName,
CloseStdin: true,
})
return err
})
defer func() {
if err != nil {
io.Stdout.Close()
}
}()
if !terminal {
io.Stderr, err = fifo.OpenFifo(ctx, p.fifo(syscall.Stderr), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
io.Stderr.Close()
}
}()
} else {
io.Stderr = ioutil.NopCloser(emptyReader{})
}
return io, nil
}
func (p *process) sendCloseStdin() error {
_, err := p.client.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
Id: p.containerID,
Pid: p.friendlyName,
CloseStdin: true,
})
return err
}
func (p *process) closeFifos(io *IOPipe) {
io.Stdin.Close()
closeReaderFifo(p.fifo(syscall.Stdout))
closeReaderFifo(p.fifo(syscall.Stderr))
io.Stdout.Close()
io.Stderr.Close()
}
type emptyReader struct{}
@ -77,34 +96,6 @@ func (r emptyReader) Read(b []byte) (int, error) {
return 0, io.EOF
}
func openReaderFromFifo(fn string) io.Reader {
r, w := io.Pipe()
c := make(chan struct{})
go func() {
close(c)
stdoutf, err := os.OpenFile(fn, syscall.O_RDONLY, 0)
if err != nil {
r.CloseWithError(err)
}
if _, err := io.Copy(w, stdoutf); err != nil {
r.CloseWithError(err)
}
w.Close()
stdoutf.Close()
}()
<-c // wait for the goroutine to get scheduled and syscall to block
return r
}
// closeReaderFifo closes fifo that may be blocked on open by opening the write side.
func closeReaderFifo(fn string) {
f, err := os.OpenFile(fn, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return
}
f.Close()
}
func (p *process) fifo(index int) string {
return filepath.Join(p.dir, p.friendlyName+"-"+fdNames[index])
}

View File

@ -2,6 +2,7 @@ package libcontainerd
import (
"io"
"sync"
"github.com/Microsoft/hcsshim"
"github.com/docker/docker/pkg/ioutils"
@ -18,16 +19,17 @@ type process struct {
hcsProcess hcsshim.Process
}
func openReaderFromPipe(p io.ReadCloser) io.Reader {
r, w := io.Pipe()
go func() {
if _, err := io.Copy(w, p); err != nil {
r.CloseWithError(err)
}
w.Close()
p.Close()
}()
return r
type autoClosingReader struct {
io.ReadCloser
sync.Once
}
func (r *autoClosingReader) Read(b []byte) (n int, err error) {
n, err = r.ReadCloser.Read(b)
if err == io.EOF {
r.Once.Do(func() { r.ReadCloser.Close() })
}
return
}
func createStdInCloser(pipe io.WriteCloser, process hcsshim.Process) io.WriteCloser {

View File

@ -61,7 +61,7 @@ type CreateOption interface {
// IOPipe contains the stdio streams.
type IOPipe struct {
Stdin io.WriteCloser
Stdout io.Reader
Stderr io.Reader
Stdout io.ReadCloser
Stderr io.ReadCloser
Terminal bool // Whether stderr is connected on Windows
}

View File

@ -0,0 +1,21 @@
MIT
Copyright (C) 2016 Tõnis Tiigi <tonistiigi@gmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -0,0 +1,13 @@
.PHONY: fmt vet test deps
test: deps
go test -v ./...
deps:
go get -d -t ./...
fmt:
gofmt -s -l .
vet:
go vet ./...

View File

@ -0,0 +1,215 @@
package fifo
import (
"context"
"io"
"os"
"runtime"
"sync"
"syscall"
"github.com/pkg/errors"
)
type fifo struct {
flag int
opened chan struct{}
closed chan struct{}
closing chan struct{}
err error
file *os.File
closingOnce sync.Once // close has been called
closedOnce sync.Once // fifo is closed
handle *handle
}
var leakCheckWg *sync.WaitGroup
// OpenFifo opens a fifo. Returns io.ReadWriteCloser.
// Context can be used to cancel this function until open(2) has not returned.
// Accepted flags:
// - syscall.O_CREAT - create new fifo if one doesn't exist
// - syscall.O_RDONLY - open fifo only from reader side
// - syscall.O_WRONLY - open fifo only from writer side
// - syscall.O_RDWR - open fifo from both sides, never block on syscall level
// - syscall.O_NONBLOCK - return io.ReadWriteCloser even if other side of the
// fifo isn't open. read/write will be connected after the actual fifo is
// open or after fifo is closed.
func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) {
if _, err := os.Stat(fn); err != nil {
if os.IsNotExist(err) && flag&syscall.O_CREAT != 0 {
if err := syscall.Mkfifo(fn, uint32(perm&os.ModePerm)); err != nil && !os.IsExist(err) {
return nil, errors.Wrapf(err, "error creating fifo %v", fn)
}
} else {
return nil, err
}
}
block := flag&syscall.O_NONBLOCK == 0 || flag&syscall.O_RDWR != 0
flag &= ^syscall.O_CREAT
flag &= ^syscall.O_NONBLOCK
h, err := getHandle(fn)
if err != nil {
return nil, err
}
f := &fifo{
handle: h,
flag: flag,
opened: make(chan struct{}),
closed: make(chan struct{}),
closing: make(chan struct{}),
}
wg := leakCheckWg
if wg != nil {
wg.Add(2)
}
go func() {
if wg != nil {
defer wg.Done()
}
select {
case <-ctx.Done():
f.Close()
case <-f.opened:
case <-f.closed:
}
}()
go func() {
if wg != nil {
defer wg.Done()
}
var file *os.File
fn, err := h.Path()
if err == nil {
file, err = os.OpenFile(fn, flag, 0)
}
select {
case <-f.closing:
if err == nil {
select {
case <-ctx.Done():
err = ctx.Err()
default:
err = errors.Errorf("fifo %v was closed before opening", fn)
}
if file != nil {
file.Close()
}
}
default:
}
if err != nil {
f.closedOnce.Do(func() {
f.err = err
close(f.closed)
})
return
}
f.file = file
close(f.opened)
}()
if block {
select {
case <-f.opened:
case <-f.closed:
return nil, f.err
}
}
return f, nil
}
// Read from a fifo to a byte array.
func (f *fifo) Read(b []byte) (int, error) {
if f.flag&syscall.O_WRONLY > 0 {
return 0, errors.New("reading from write-only fifo")
}
select {
case <-f.opened:
return f.file.Read(b)
default:
}
select {
case <-f.opened:
return f.file.Read(b)
case <-f.closed:
return 0, errors.New("reading from a closed fifo")
}
}
// Write from byte array to a fifo.
func (f *fifo) Write(b []byte) (int, error) {
if f.flag&(syscall.O_WRONLY|syscall.O_RDWR) == 0 {
return 0, errors.New("writing to read-only fifo")
}
select {
case <-f.opened:
return f.file.Write(b)
default:
}
select {
case <-f.opened:
return f.file.Write(b)
case <-f.closed:
return 0, errors.New("writing to a closed fifo")
}
}
// Close the fifo. Next reads/writes will error. This method can also be used
// before open(2) has returned and fifo was never opened.
func (f *fifo) Close() error {
for {
select {
case <-f.closed:
f.handle.Close()
return f.err
default:
select {
case <-f.opened:
f.closedOnce.Do(func() {
f.err = f.file.Close()
close(f.closed)
})
default:
if f.flag&syscall.O_RDWR != 0 {
runtime.Gosched()
break
}
f.closingOnce.Do(func() {
close(f.closing)
})
reverseMode := syscall.O_WRONLY
if f.flag&syscall.O_WRONLY > 0 {
reverseMode = syscall.O_RDONLY
}
fn, err := f.handle.Path()
// if Close() is called concurrently(shouldn't) it may cause error
// because handle is closed
select {
case <-f.closed:
default:
if err != nil {
// Path has become invalid. We will leak a goroutine.
// This case should not happen in linux.
f.closedOnce.Do(func() {
f.err = err
close(f.closed)
})
<-f.closed
break
}
f, err := os.OpenFile(fn, reverseMode|syscall.O_NONBLOCK, 0)
if err == nil {
f.Close()
}
runtime.Gosched()
}
}
}
}
}

View File

@ -0,0 +1,70 @@
// +build linux
package fifo
import (
"fmt"
"os"
"sync"
"syscall"
"github.com/pkg/errors"
)
const O_PATH = 010000000
type handle struct {
f *os.File
dev uint64
ino uint64
closeOnce sync.Once
}
func getHandle(fn string) (*handle, error) {
f, err := os.OpenFile(fn, O_PATH, 0)
if err != nil {
return nil, errors.Wrapf(err, "failed to open %v with O_PATH", fn)
}
var stat syscall.Stat_t
if err := syscall.Fstat(int(f.Fd()), &stat); err != nil {
f.Close()
return nil, errors.Wrapf(err, "failed to stat handle %v", f.Fd())
}
h := &handle{
f: f,
dev: stat.Dev,
ino: stat.Ino,
}
// check /proc just in case
if _, err := os.Stat(h.procPath()); err != nil {
f.Close()
return nil, errors.Wrapf(err, "couldn't stat %v", h.procPath())
}
return h, nil
}
func (h *handle) procPath() string {
return fmt.Sprintf("/proc/self/fd/%d", h.f.Fd())
}
func (h *handle) Path() (string, error) {
var stat syscall.Stat_t
if err := syscall.Stat(h.procPath(), &stat); err != nil {
return "", errors.Wrapf(err, "path %v could not be statted", h.procPath())
}
if stat.Dev != h.dev || stat.Ino != h.ino {
return "", errors.Errorf("failed to verify handle %v/%v %v/%v", stat.Dev, h.dev, stat.Ino, h.ino)
}
return h.procPath(), nil
}
func (h *handle) Close() error {
h.closeOnce.Do(func() {
h.f.Close()
})
return nil
}

View File

@ -0,0 +1,45 @@
// +build !linux
package fifo
import (
"syscall"
"github.com/pkg/errors"
)
type handle struct {
fn string
dev uint64
ino uint64
}
func getHandle(fn string) (*handle, error) {
var stat syscall.Stat_t
if err := syscall.Stat(fn, &stat); err != nil {
return nil, errors.Wrapf(err, "failed to stat %v", fn)
}
h := &handle{
fn: fn,
dev: uint64(stat.Dev),
ino: stat.Ino,
}
return h, nil
}
func (h *handle) Path() (string, error) {
var stat syscall.Stat_t
if err := syscall.Stat(h.fn, &stat); err != nil {
return "", errors.Wrapf(err, "path %v could not be statted", h.fn)
}
if uint64(stat.Dev) != h.dev || stat.Ino != h.ino {
return "", errors.Errorf("failed to verify handle %v/%v %v/%v", stat.Dev, h.dev, stat.Ino, h.ino)
}
return h.fn, nil
}
func (h *handle) Close() error {
return nil
}

View File

@ -0,0 +1,30 @@
### fifo
Go package for handling fifos in a sane way.
```
// OpenFifo opens a fifo. Returns io.ReadWriteCloser.
// Context can be used to cancel this function until open(2) has not returned.
// Accepted flags:
// - syscall.O_CREAT - create new fifo if one doesn't exist
// - syscall.O_RDONLY - open fifo only from reader side
// - syscall.O_WRONLY - open fifo only from writer side
// - syscall.O_RDWR - open fifo from both sides, never block on syscall level
// - syscall.O_NONBLOCK - return io.ReadWriteCloser even if other side of the
// fifo isn't open. read/write will be connected after the actual fifo is
// open or after fifo is closed.
func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error)
// Read from a fifo to a byte array.
func (f *fifo) Read(b []byte) (int, error)
// Write from byte array to a fifo.
func (f *fifo) Write(b []byte) (int, error)
// Close the fifo. Next reads/writes will error. This method can also be used
// before open(2) has returned and fifo was never opened.
func (f *fifo) Close() error
```