From 0cd4ab3f9a3f242468484fc62b46e632fdba5e13 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 21 Sep 2017 17:56:45 -0700 Subject: [PATCH] pkg/package: remove promise package The promise package represents a simple enough concurrency pattern that replicating it in place is sufficient. To end the propagation of this package, it has been removed and the uses have been inlined. While this code could likely be refactored to be simpler without the package, the changes have been minimized to reduce the possibility of defects. Someone else may want to do further refactoring to remove closures and reduce the number of goroutines in use. Signed-off-by: Stephen J Day --- container/stream/attach.go | 66 ++++++++++++++++++++----------------- pkg/archive/archive.go | 59 ++++++++++++++++++--------------- pkg/containerfs/archiver.go | 66 ++++++++++++++++++++----------------- pkg/promise/promise.go | 11 ------- pkg/promise/promise_test.go | 25 -------------- 5 files changed, 103 insertions(+), 124 deletions(-) delete mode 100644 pkg/promise/promise.go delete mode 100644 pkg/promise/promise_test.go diff --git a/container/stream/attach.go b/container/stream/attach.go index 24b68863d7..4b4a4541e6 100644 --- a/container/stream/attach.go +++ b/container/stream/attach.go @@ -7,7 +7,6 @@ import ( "golang.org/x/net/context" "github.com/docker/docker/pkg/pools" - "github.com/docker/docker/pkg/promise" "github.com/docker/docker/pkg/term" "github.com/sirupsen/logrus" ) @@ -58,7 +57,7 @@ func (c *Config) AttachStreams(cfg *AttachConfig) { } // CopyStreams starts goroutines to copy data in and out to/from the container -func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) chan error { +func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) <-chan error { var ( wg sync.WaitGroup errors = make(chan error, 3) @@ -137,35 +136,42 @@ func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) chan error go attachStream("stdout", cfg.Stdout, cfg.CStdout) go attachStream("stderr", cfg.Stderr, cfg.CStderr) - return promise.Go(func() error { - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) + errs := make(chan error, 1) + + go func() { + defer close(errs) + errs <- func() error { + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + case <-ctx.Done(): + // close all pipes + if cfg.CStdin != nil { + cfg.CStdin.Close() + } + if cfg.CStdout != nil { + cfg.CStdout.Close() + } + if cfg.CStderr != nil { + cfg.CStderr.Close() + } + <-done + } + close(errors) + for err := range errors { + if err != nil { + return err + } + } + return nil }() - select { - case <-done: - case <-ctx.Done(): - // close all pipes - if cfg.CStdin != nil { - cfg.CStdin.Close() - } - if cfg.CStdout != nil { - cfg.CStdout.Close() - } - if cfg.CStderr != nil { - cfg.CStderr.Close() - } - <-done - } - close(errors) - for err := range errors { - if err != nil { - return err - } - } - return nil - }) + }() + + return errs } func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64, err error) { diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 876e605680..aa55637565 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -20,7 +20,6 @@ import ( "github.com/docker/docker/pkg/idtools" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/pools" - "github.com/docker/docker/pkg/promise" "github.com/docker/docker/pkg/system" "github.com/sirupsen/logrus" ) @@ -1095,36 +1094,42 @@ func (archiver *Archiver) CopyFileWithTar(src, dst string) (err error) { } r, w := io.Pipe() - errC := promise.Go(func() error { - defer w.Close() + errC := make(chan error, 1) - srcF, err := os.Open(src) - if err != nil { - return err - } - defer srcF.Close() + go func() { + defer close(errC) - hdr, err := tar.FileInfoHeader(srcSt, "") - if err != nil { - return err - } - hdr.Name = filepath.Base(dst) - hdr.Mode = int64(chmodTarEntry(os.FileMode(hdr.Mode))) + errC <- func() error { + defer w.Close() - if err := remapIDs(archiver.IDMappingsVar, hdr); err != nil { - return err - } + srcF, err := os.Open(src) + if err != nil { + return err + } + defer srcF.Close() - tw := tar.NewWriter(w) - defer tw.Close() - if err := tw.WriteHeader(hdr); err != nil { - return err - } - if _, err := io.Copy(tw, srcF); err != nil { - return err - } - return nil - }) + hdr, err := tar.FileInfoHeader(srcSt, "") + if err != nil { + return err + } + hdr.Name = filepath.Base(dst) + hdr.Mode = int64(chmodTarEntry(os.FileMode(hdr.Mode))) + + if err := remapIDs(archiver.IDMappingsVar, hdr); err != nil { + return err + } + + tw := tar.NewWriter(w) + defer tw.Close() + if err := tw.WriteHeader(hdr); err != nil { + return err + } + if _, err := io.Copy(tw, srcF); err != nil { + return err + } + return nil + }() + }() defer func() { if er := <-errC; err == nil && er != nil { err = er diff --git a/pkg/containerfs/archiver.go b/pkg/containerfs/archiver.go index 7fffa00036..3eeab49912 100644 --- a/pkg/containerfs/archiver.go +++ b/pkg/containerfs/archiver.go @@ -9,7 +9,6 @@ import ( "github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/idtools" - "github.com/docker/docker/pkg/promise" "github.com/docker/docker/pkg/system" "github.com/sirupsen/logrus" ) @@ -122,40 +121,45 @@ func (archiver *Archiver) CopyFileWithTar(src, dst string) (err error) { } r, w := io.Pipe() - errC := promise.Go(func() error { - defer w.Close() + errC := make(chan error, 1) - srcF, err := srcDriver.Open(src) - if err != nil { - return err - } - defer srcF.Close() + go func() { + defer close(errC) + errC <- func() error { + defer w.Close() - hdr, err := tar.FileInfoHeader(srcSt, "") - if err != nil { - return err - } - hdr.Name = dstDriver.Base(dst) - if dstDriver.OS() == "windows" { - hdr.Mode = int64(chmodTarEntry(os.FileMode(hdr.Mode))) - } else { - hdr.Mode = int64(os.FileMode(hdr.Mode)) - } + srcF, err := srcDriver.Open(src) + if err != nil { + return err + } + defer srcF.Close() - if err := remapIDs(archiver.IDMappingsVar, hdr); err != nil { - return err - } + hdr, err := tar.FileInfoHeader(srcSt, "") + if err != nil { + return err + } + hdr.Name = dstDriver.Base(dst) + if dstDriver.OS() == "windows" { + hdr.Mode = int64(chmodTarEntry(os.FileMode(hdr.Mode))) + } else { + hdr.Mode = int64(os.FileMode(hdr.Mode)) + } - tw := tar.NewWriter(w) - defer tw.Close() - if err := tw.WriteHeader(hdr); err != nil { - return err - } - if _, err := io.Copy(tw, srcF); err != nil { - return err - } - return nil - }) + if err := remapIDs(archiver.IDMappingsVar, hdr); err != nil { + return err + } + + tw := tar.NewWriter(w) + defer tw.Close() + if err := tw.WriteHeader(hdr); err != nil { + return err + } + if _, err := io.Copy(tw, srcF); err != nil { + return err + } + return nil + }() + }() defer func() { if er := <-errC; err == nil && er != nil { err = er diff --git a/pkg/promise/promise.go b/pkg/promise/promise.go deleted file mode 100644 index dd52b9082f..0000000000 --- a/pkg/promise/promise.go +++ /dev/null @@ -1,11 +0,0 @@ -package promise - -// Go is a basic promise implementation: it wraps calls a function in a goroutine, -// and returns a channel which will later return the function's return value. -func Go(f func() error) chan error { - ch := make(chan error, 1) - go func() { - ch <- f() - }() - return ch -} diff --git a/pkg/promise/promise_test.go b/pkg/promise/promise_test.go deleted file mode 100644 index 287213b504..0000000000 --- a/pkg/promise/promise_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package promise - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestGo(t *testing.T) { - errCh := Go(functionWithError) - er := <-errCh - require.EqualValues(t, "Error Occurred", er.Error()) - - noErrCh := Go(functionWithNoError) - er = <-noErrCh - require.Nil(t, er) -} - -func functionWithError() (err error) { - return errors.New("Error Occurred") -} -func functionWithNoError() (err error) { - return nil -}