pkg/package: remove promise package

The promise package represents a simple enough concurrency pattern that
replicating it in place is sufficient. To end the propagation of this
package, it has been removed and the uses have been inlined.

While this code could likely be refactored to be simpler without the
package, the changes have been minimized to reduce the possibility of
defects. Someone else may want to do further refactoring to remove
closures and reduce the number of goroutines in use.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2017-09-21 17:56:45 -07:00
parent 777d4a1bf4
commit 0cd4ab3f9a
No known key found for this signature in database
GPG Key ID: 67B3DED84EDC823F
5 changed files with 103 additions and 124 deletions

View File

@ -7,7 +7,6 @@ import (
"golang.org/x/net/context"
"github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/promise"
"github.com/docker/docker/pkg/term"
"github.com/sirupsen/logrus"
)
@ -58,7 +57,7 @@ func (c *Config) AttachStreams(cfg *AttachConfig) {
}
// CopyStreams starts goroutines to copy data in and out to/from the container
func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) chan error {
func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) <-chan error {
var (
wg sync.WaitGroup
errors = make(chan error, 3)
@ -137,35 +136,42 @@ func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) chan error
go attachStream("stdout", cfg.Stdout, cfg.CStdout)
go attachStream("stderr", cfg.Stderr, cfg.CStderr)
return promise.Go(func() error {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
errs := make(chan error, 1)
go func() {
defer close(errs)
errs <- func() error {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-ctx.Done():
// close all pipes
if cfg.CStdin != nil {
cfg.CStdin.Close()
}
if cfg.CStdout != nil {
cfg.CStdout.Close()
}
if cfg.CStderr != nil {
cfg.CStderr.Close()
}
<-done
}
close(errors)
for err := range errors {
if err != nil {
return err
}
}
return nil
}()
select {
case <-done:
case <-ctx.Done():
// close all pipes
if cfg.CStdin != nil {
cfg.CStdin.Close()
}
if cfg.CStdout != nil {
cfg.CStdout.Close()
}
if cfg.CStderr != nil {
cfg.CStderr.Close()
}
<-done
}
close(errors)
for err := range errors {
if err != nil {
return err
}
}
return nil
})
}()
return errs
}
func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64, err error) {

View File

@ -20,7 +20,6 @@ import (
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/promise"
"github.com/docker/docker/pkg/system"
"github.com/sirupsen/logrus"
)
@ -1095,36 +1094,42 @@ func (archiver *Archiver) CopyFileWithTar(src, dst string) (err error) {
}
r, w := io.Pipe()
errC := promise.Go(func() error {
defer w.Close()
errC := make(chan error, 1)
srcF, err := os.Open(src)
if err != nil {
return err
}
defer srcF.Close()
go func() {
defer close(errC)
hdr, err := tar.FileInfoHeader(srcSt, "")
if err != nil {
return err
}
hdr.Name = filepath.Base(dst)
hdr.Mode = int64(chmodTarEntry(os.FileMode(hdr.Mode)))
errC <- func() error {
defer w.Close()
if err := remapIDs(archiver.IDMappingsVar, hdr); err != nil {
return err
}
srcF, err := os.Open(src)
if err != nil {
return err
}
defer srcF.Close()
tw := tar.NewWriter(w)
defer tw.Close()
if err := tw.WriteHeader(hdr); err != nil {
return err
}
if _, err := io.Copy(tw, srcF); err != nil {
return err
}
return nil
})
hdr, err := tar.FileInfoHeader(srcSt, "")
if err != nil {
return err
}
hdr.Name = filepath.Base(dst)
hdr.Mode = int64(chmodTarEntry(os.FileMode(hdr.Mode)))
if err := remapIDs(archiver.IDMappingsVar, hdr); err != nil {
return err
}
tw := tar.NewWriter(w)
defer tw.Close()
if err := tw.WriteHeader(hdr); err != nil {
return err
}
if _, err := io.Copy(tw, srcF); err != nil {
return err
}
return nil
}()
}()
defer func() {
if er := <-errC; err == nil && er != nil {
err = er

View File

@ -9,7 +9,6 @@ import (
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/promise"
"github.com/docker/docker/pkg/system"
"github.com/sirupsen/logrus"
)
@ -122,40 +121,45 @@ func (archiver *Archiver) CopyFileWithTar(src, dst string) (err error) {
}
r, w := io.Pipe()
errC := promise.Go(func() error {
defer w.Close()
errC := make(chan error, 1)
srcF, err := srcDriver.Open(src)
if err != nil {
return err
}
defer srcF.Close()
go func() {
defer close(errC)
errC <- func() error {
defer w.Close()
hdr, err := tar.FileInfoHeader(srcSt, "")
if err != nil {
return err
}
hdr.Name = dstDriver.Base(dst)
if dstDriver.OS() == "windows" {
hdr.Mode = int64(chmodTarEntry(os.FileMode(hdr.Mode)))
} else {
hdr.Mode = int64(os.FileMode(hdr.Mode))
}
srcF, err := srcDriver.Open(src)
if err != nil {
return err
}
defer srcF.Close()
if err := remapIDs(archiver.IDMappingsVar, hdr); err != nil {
return err
}
hdr, err := tar.FileInfoHeader(srcSt, "")
if err != nil {
return err
}
hdr.Name = dstDriver.Base(dst)
if dstDriver.OS() == "windows" {
hdr.Mode = int64(chmodTarEntry(os.FileMode(hdr.Mode)))
} else {
hdr.Mode = int64(os.FileMode(hdr.Mode))
}
tw := tar.NewWriter(w)
defer tw.Close()
if err := tw.WriteHeader(hdr); err != nil {
return err
}
if _, err := io.Copy(tw, srcF); err != nil {
return err
}
return nil
})
if err := remapIDs(archiver.IDMappingsVar, hdr); err != nil {
return err
}
tw := tar.NewWriter(w)
defer tw.Close()
if err := tw.WriteHeader(hdr); err != nil {
return err
}
if _, err := io.Copy(tw, srcF); err != nil {
return err
}
return nil
}()
}()
defer func() {
if er := <-errC; err == nil && er != nil {
err = er

View File

@ -1,11 +0,0 @@
package promise
// Go is a basic promise implementation: it wraps calls a function in a goroutine,
// and returns a channel which will later return the function's return value.
func Go(f func() error) chan error {
ch := make(chan error, 1)
go func() {
ch <- f()
}()
return ch
}

View File

@ -1,25 +0,0 @@
package promise
import (
"errors"
"testing"
"github.com/stretchr/testify/require"
)
func TestGo(t *testing.T) {
errCh := Go(functionWithError)
er := <-errCh
require.EqualValues(t, "Error Occurred", er.Error())
noErrCh := Go(functionWithNoError)
er = <-noErrCh
require.Nil(t, er)
}
func functionWithError() (err error) {
return errors.New("Error Occurred")
}
func functionWithNoError() (err error) {
return nil
}