package utils import ( "io" "net" "os" "syscall" "github.com/docker/docker/pkg/progress" "github.com/docker/docker/pkg/streamformatter" "github.com/sirupsen/logrus" ) // WriteDistributionProgress is a helper for writing progress from chan to JSON // stream with an optional cancel function. func WriteDistributionProgress(cancelFunc func(), outStream io.Writer, progressChan <-chan progress.Progress) { progressOutput := streamformatter.NewJSONProgressOutput(outStream, false) operationCancelled := false for prog := range progressChan { if err := progressOutput.WriteProgress(prog); err != nil && !operationCancelled { // don't log broken pipe errors as this is the normal case when a client aborts if isBrokenPipe(err) { logrus.Info("Pull session cancelled") } else { logrus.Errorf("error writing progress to client: %v", err) } cancelFunc() operationCancelled = true // Don't return, because we need to continue draining // progressChan until it's closed to avoid a deadlock. } } } func isBrokenPipe(e error) bool { if netErr, ok := e.(*net.OpError); ok { e = netErr.Err if sysErr, ok := netErr.Err.(*os.SyscallError); ok { e = sysErr.Err } } return e == syscall.EPIPE }