From 7368e41c07c21a1e2c6a49abecd1c2fc76404e49 Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Thu, 5 May 2016 21:45:55 -0700 Subject: [PATCH] Docker pull/push with max concurrency limits. This fix addresses the issues raised in #20936 and #22443, where `docker pull` or `docker push` fails when concurrent connections fail. Currently, the maximum number of concurrent connections is controlled by `maxDownloadConcurrency` and `maxUploadConcurrency`, which are hardcoded to 3 and 5 respectively. Therefore, in situations where network connections don't support multiple downloads/uploads, `docker push` or `docker pull` may fail. This fix makes `maxDownloadConcurrency` and `maxUploadConcurrency` adjustable by passing `--max-concurrent-uploads` and `--max-concurrent-downloads` to the `docker daemon` command. The documentation related to the docker daemon has been updated. Additional test cases have been added to cover the changes in this fix. This fix fixes #20936. This fix fixes #22443. Signed-off-by: Yong Tang --- daemon/config.go | 38 +++++++- daemon/daemon.go | 44 ++++++--- distribution/xfer/download.go | 5 ++ distribution/xfer/transfer.go | 11 ++- distribution/xfer/upload.go | 5 ++ docs/reference/commandline/dockerd.md | 6 ++ integration-cli/docker_cli_daemon_test.go | 103 ++++++++++++++++++++++ man/dockerd.8.md | 8 ++ 8 files changed, 207 insertions(+), 13 deletions(-) diff --git a/daemon/config.go b/daemon/config.go index dd11ab7792..2d29dcc6d2 100644 --- a/daemon/config.go +++ b/daemon/config.go @@ -17,6 +17,17 @@ import ( "github.com/imdario/mergo" ) +const ( + // defaultMaxConcurrentDownloads is the default value for + // the maximum number of downloads that + // may take place at a time for each pull. + defaultMaxConcurrentDownloads = 3 + // defaultMaxConcurrentUploads is the default value for + // the maximum number of uploads that + // may take place at a time for each push. + defaultMaxConcurrentUploads = 5 +) + const ( defaultNetworkMtu = 1500 disableNetworkBridge = "none" @@ -94,6 +105,14 @@ type CommonConfig struct { // reachable by other hosts. ClusterAdvertise string `json:"cluster-advertise,omitempty"` + // MaxConcurrentDownloads is the maximum number of downloads that + // may take place at a time for each pull. + MaxConcurrentDownloads *int `json:"max-concurrent-downloads,omitempty"` + + // MaxConcurrentUploads is the maximum number of uploads that + // may take place at a time for each push. + MaxConcurrentUploads *int `json:"max-concurrent-uploads,omitempty"` + Debug bool `json:"debug,omitempty"` Hosts []string `json:"hosts,omitempty"` LogLevel string `json:"log-level,omitempty"` @@ -116,6 +135,8 @@ type CommonConfig struct { // Subsequent calls to `flag.Parse` will populate config with values parsed // from the command-line.
func (config *Config) InstallCommonFlags(cmd *flag.FlagSet, usageFn func(string) string) { + var maxConcurrentDownloads, maxConcurrentUploads int + config.ServiceOptions.InstallCliFlags(cmd, usageFn) cmd.Var(opts.NewNamedListOptsRef("storage-opts", &config.GraphOptions, nil), []string{"-storage-opt"}, usageFn("Set storage driver options")) @@ -138,6 +159,11 @@ func (config *Config) InstallCommonFlags(cmd *flag.FlagSet, usageFn func(string) cmd.StringVar(&config.ClusterStore, []string{"-cluster-store"}, "", usageFn("Set the cluster store")) cmd.Var(opts.NewNamedMapOpts("cluster-store-opts", config.ClusterOpts, nil), []string{"-cluster-store-opt"}, usageFn("Set cluster store options")) cmd.StringVar(&config.CorsHeaders, []string{"-api-cors-header"}, "", usageFn("Set CORS headers in the remote API")) + cmd.IntVar(&maxConcurrentDownloads, []string{"-max-concurrent-downloads"}, defaultMaxConcurrentDownloads, usageFn("Set the max concurrent downloads for each pull")) + cmd.IntVar(&maxConcurrentUploads, []string{"-max-concurrent-uploads"}, defaultMaxConcurrentUploads, usageFn("Set the max concurrent uploads for each push")) + + config.MaxConcurrentDownloads = &maxConcurrentDownloads + config.MaxConcurrentUploads = &maxConcurrentUploads } // IsValueSet returns true if a configuration value @@ -355,7 +381,8 @@ func findConfigurationConflicts(config map[string]interface{}, flags *flag.FlagS } // validateConfiguration validates some specific configs. -// such as config.DNS, config.Labels, config.DNSSearch +// such as config.DNS, config.Labels, config.DNSSearch, +// as well as config.MaxConcurrentDownloads, config.MaxConcurrentUploads. func validateConfiguration(config *Config) error { // validate DNS for _, dns := range config.DNS { @@ -378,5 +405,14 @@ func validateConfiguration(config *Config) error { } } + // validate MaxConcurrentDownloads + if config.IsValueSet("max-concurrent-downloads") && config.MaxConcurrentDownloads != nil && *config.MaxConcurrentDownloads < 0 { + return fmt.Errorf("invalid max concurrent downloads: %d", *config.MaxConcurrentDownloads) + } + + // validate MaxConcurrentUploads + if config.IsValueSet("max-concurrent-uploads") && config.MaxConcurrentUploads != nil && *config.MaxConcurrentUploads < 0 { + return fmt.Errorf("invalid max concurrent uploads: %d", *config.MaxConcurrentUploads) + } return nil } diff --git a/daemon/daemon.go b/daemon/daemon.go index 1c7ad12700..6aa1556e54 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -71,15 +71,6 @@ import ( "golang.org/x/net/context" ) -const ( - // maxDownloadConcurrency is the maximum number of downloads that - // may take place at a time for each pull. - maxDownloadConcurrency = 3 - // maxUploadConcurrency is the maximum number of uploads that - // may take place at a time for each push. 
- maxUploadConcurrency = 5 -) - var ( validContainerNameChars = utils.RestrictedNameChars validContainerNamePattern = utils.RestrictedNamePattern @@ -719,8 +710,10 @@ func NewDaemon(config *Config, registryService *registry.Service, containerdRemo return nil, err } - d.downloadManager = xfer.NewLayerDownloadManager(d.layerStore, maxDownloadConcurrency) - d.uploadManager = xfer.NewLayerUploadManager(maxUploadConcurrency) + logrus.Debugf("Max Concurrent Downloads: %d", *config.MaxConcurrentDownloads) + d.downloadManager = xfer.NewLayerDownloadManager(d.layerStore, *config.MaxConcurrentDownloads) + logrus.Debugf("Max Concurrent Uploads: %d", *config.MaxConcurrentUploads) + d.uploadManager = xfer.NewLayerUploadManager(*config.MaxConcurrentUploads) ifs, err := image.NewFSStoreBackend(filepath.Join(imageRoot, "imagedb")) if err != nil { @@ -1510,6 +1503,8 @@ func (daemon *Daemon) initDiscovery(config *Config) error { // These are the settings that Reload changes: // - Daemon labels. // - Daemon debug log level. +// - Daemon max concurrent downloads. +// - Daemon max concurrent uploads. // - Cluster discovery (reconfigure and restart). func (daemon *Daemon) Reload(config *Config) error { daemon.configStore.reloadLock.Lock() @@ -1520,6 +1515,33 @@ func (daemon *Daemon) Reload(config *Config) error { if config.IsValueSet("debug") { daemon.configStore.Debug = config.Debug } + + // If no value is set for max-concurrent-downloads, we assume it is the default value. + // We always "reset" as the cost is lightweight and easy to maintain. + if config.IsValueSet("max-concurrent-downloads") && config.MaxConcurrentDownloads != nil { + *daemon.configStore.MaxConcurrentDownloads = *config.MaxConcurrentDownloads + } else { + maxConcurrentDownloads := defaultMaxConcurrentDownloads + daemon.configStore.MaxConcurrentDownloads = &maxConcurrentDownloads + } + logrus.Debugf("Reset Max Concurrent Downloads: %d", *daemon.configStore.MaxConcurrentDownloads) + if daemon.downloadManager != nil { + daemon.downloadManager.SetConcurrency(*daemon.configStore.MaxConcurrentDownloads) + } + + // If no value is set for max-concurrent-uploads, we assume it is the default value. + // We always "reset" as the cost is lightweight and easy to maintain. + if config.IsValueSet("max-concurrent-uploads") && config.MaxConcurrentUploads != nil { + *daemon.configStore.MaxConcurrentUploads = *config.MaxConcurrentUploads + } else { + maxConcurrentUploads := defaultMaxConcurrentUploads + daemon.configStore.MaxConcurrentUploads = &maxConcurrentUploads + } + logrus.Debugf("Reset Max Concurrent Uploads: %d", *daemon.configStore.MaxConcurrentUploads) + if daemon.uploadManager != nil { + daemon.uploadManager.SetConcurrency(*daemon.configStore.MaxConcurrentUploads) + } + return daemon.reloadClusterDiscovery(config) } diff --git a/distribution/xfer/download.go b/distribution/xfer/download.go index 739c427c66..67ffc1a988 100644 --- a/distribution/xfer/download.go +++ b/distribution/xfer/download.go @@ -25,6 +25,11 @@ type LayerDownloadManager struct { tm TransferManager } +// SetConcurrency sets the max concurrent downloads for each pull. +func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) { + ldm.tm.SetConcurrency(concurrency) +} + // NewLayerDownloadManager returns a new LayerDownloadManager.
func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int) *LayerDownloadManager { return &LayerDownloadManager{ diff --git a/distribution/xfer/transfer.go b/distribution/xfer/transfer.go index dd83f8b8db..14f15660ac 100644 --- a/distribution/xfer/transfer.go +++ b/distribution/xfer/transfer.go @@ -279,6 +279,8 @@ type TransferManager interface { // so, it returns progress and error output from that transfer. // Otherwise, it will call xferFunc to initiate the transfer. Transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher) + // SetConcurrency sets the concurrencyLimit so that it is adjustable on daemon reload. + SetConcurrency(concurrency int) } type transferManager struct { @@ -298,6 +300,13 @@ func NewTransferManager(concurrencyLimit int) TransferManager { } } +// SetConcurrency sets the concurrencyLimit. +func (tm *transferManager) SetConcurrency(concurrency int) { + tm.mu.Lock() + tm.concurrencyLimit = concurrency + tm.mu.Unlock() +} + // Transfer checks if a transfer matching the given key is in progress. If not, // it starts one by calling xferFunc. The caller supplies a channel which // receives progress output from the transfer. @@ -337,7 +346,7 @@ func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput start := make(chan struct{}) inactive := make(chan struct{}) - if tm.activeTransfers < tm.concurrencyLimit { + if tm.concurrencyLimit == 0 || tm.activeTransfers < tm.concurrencyLimit { close(start) tm.activeTransfers++ } else { diff --git a/distribution/xfer/upload.go b/distribution/xfer/upload.go index 563824c11d..ad3398369c 100644 --- a/distribution/xfer/upload.go +++ b/distribution/xfer/upload.go @@ -19,6 +19,11 @@ type LayerUploadManager struct { tm TransferManager } +// SetConcurrency sets the max concurrent uploads for each push. +func (lum *LayerUploadManager) SetConcurrency(concurrency int) { + lum.tm.SetConcurrency(concurrency) +} + // NewLayerUploadManager returns a new LayerUploadManager. func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager { return &LayerUploadManager{ diff --git a/docs/reference/commandline/dockerd.md b/docs/reference/commandline/dockerd.md index 2ecadbed5c..c3caa10fcd 100644 --- a/docs/reference/commandline/dockerd.md +++ b/docs/reference/commandline/dockerd.md @@ -54,6 +54,8 @@ weight = -1 --log-driver="json-file" Default driver for container logs --log-opt=[] Log driver specific options --mtu=0 Set the containers network MTU + --max-concurrent-downloads=3 Set the max concurrent downloads for each pull + --max-concurrent-uploads=5 Set the max concurrent uploads for each push --disable-legacy-registry Do not contact legacy registries -p, --pidfile="/var/run/docker.pid" Path to use for daemon PID file --raw-logs Full timestamps without ANSI coloring @@ -913,6 +915,8 @@ This is a full example of the allowed configuration options in the file: "cluster-store": "", "cluster-store-opts": [], "cluster-advertise": "", + "max-concurrent-downloads": 3, + "max-concurrent-uploads": 5, "debug": true, "hosts": [], "log-level": "", @@ -963,6 +967,8 @@ The list of currently supported options that can be reconfigured is this: - `cluster-store-opts`: it uses the new options to reload the discovery store. - `cluster-advertise`: it modifies the address advertised after reloading. - `labels`: it replaces the daemon labels with a new set of labels. +- `max-concurrent-downloads`: it updates the max concurrent downloads for each pull.
+- `max-concurrent-uploads`: it updates the max concurrent uploads for each push. Updating and reloading the cluster configurations such as `--cluster-store`, `--cluster-advertise` and `--cluster-store-opts` will take effect only if diff --git a/integration-cli/docker_cli_daemon_test.go b/integration-cli/docker_cli_daemon_test.go index 14a2b8eae8..c3fa3edd01 100644 --- a/integration-cli/docker_cli_daemon_test.go +++ b/integration-cli/docker_cli_daemon_test.go @@ -2246,3 +2246,106 @@ func (s *DockerDaemonSuite) TestDaemonLogOptions(c *check.C) { c.Assert(err, check.IsNil, check.Commentf(out)) c.Assert(out, checker.Contains, "{json-file map[]}") } + +// Test case for #20936, #22443 +func (s *DockerDaemonSuite) TestDaemonMaxConcurrency(c *check.C) { + c.Assert(s.d.Start("--max-concurrent-uploads=6", "--max-concurrent-downloads=8"), check.IsNil) + + expectedMaxConcurrentUploads := `level=debug msg="Max Concurrent Uploads: 6"` + expectedMaxConcurrentDownloads := `level=debug msg="Max Concurrent Downloads: 8"` + content, _ := ioutil.ReadFile(s.d.logFile.Name()) + c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads) + c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads) +} + +// Test case for #20936, #22443 +func (s *DockerDaemonSuite) TestDaemonMaxConcurrencyWithConfigFile(c *check.C) { + testRequires(c, SameHostDaemon, DaemonIsLinux) + + // daemon config file + configFilePath := "test.json" + configFile, err := os.Create(configFilePath) + c.Assert(err, checker.IsNil) + defer os.Remove(configFilePath) + + daemonConfig := `{ "max-concurrent-downloads" : 8 }` + fmt.Fprintf(configFile, "%s", daemonConfig) + configFile.Close() + c.Assert(s.d.Start(fmt.Sprintf("--config-file=%s", configFilePath)), check.IsNil) + + expectedMaxConcurrentUploads := `level=debug msg="Max Concurrent Uploads: 5"` + expectedMaxConcurrentDownloads := `level=debug msg="Max Concurrent Downloads: 8"` + content, _ := ioutil.ReadFile(s.d.logFile.Name()) + c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads) + c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads) + + configFile, err = os.Create(configFilePath) + c.Assert(err, checker.IsNil) + daemonConfig = `{ "max-concurrent-uploads" : 7, "max-concurrent-downloads" : 9 }` + fmt.Fprintf(configFile, "%s", daemonConfig) + configFile.Close() + + syscall.Kill(s.d.cmd.Process.Pid, syscall.SIGHUP) + + time.Sleep(3 * time.Second) + + expectedMaxConcurrentUploads = `level=debug msg="Reset Max Concurrent Uploads: 7"` + expectedMaxConcurrentDownloads = `level=debug msg="Reset Max Concurrent Downloads: 9"` + content, _ = ioutil.ReadFile(s.d.logFile.Name()) + c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads) + c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads) +} + +// Test case for #20936, #22443 +func (s *DockerDaemonSuite) TestDaemonMaxConcurrencyWithConfigFileReload(c *check.C) { + testRequires(c, SameHostDaemon, DaemonIsLinux) + + // daemon config file + configFilePath := "test.json" + configFile, err := os.Create(configFilePath) + c.Assert(err, checker.IsNil) + defer os.Remove(configFilePath) + + daemonConfig := `{ "max-concurrent-uploads" : null }` + fmt.Fprintf(configFile, "%s", daemonConfig) + configFile.Close() + c.Assert(s.d.Start(fmt.Sprintf("--config-file=%s", configFilePath)), check.IsNil) + + expectedMaxConcurrentUploads := `level=debug msg="Max Concurrent Uploads: 5"` + expectedMaxConcurrentDownloads := `level=debug msg="Max Concurrent 
Downloads: 3"` + content, _ := ioutil.ReadFile(s.d.logFile.Name()) + c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads) + c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads) + + configFile, err = os.Create(configFilePath) + c.Assert(err, checker.IsNil) + daemonConfig = `{ "max-concurrent-uploads" : 1, "max-concurrent-downloads" : null }` + fmt.Fprintf(configFile, "%s", daemonConfig) + configFile.Close() + + syscall.Kill(s.d.cmd.Process.Pid, syscall.SIGHUP) + + time.Sleep(3 * time.Second) + + expectedMaxConcurrentUploads = `level=debug msg="Reset Max Concurrent Uploads: 1"` + expectedMaxConcurrentDownloads = `level=debug msg="Reset Max Concurrent Downloads: 3"` + content, _ = ioutil.ReadFile(s.d.logFile.Name()) + c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads) + c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads) + + configFile, err = os.Create(configFilePath) + c.Assert(err, checker.IsNil) + daemonConfig = `{ "labels":["foo=bar"] }` + fmt.Fprintf(configFile, "%s", daemonConfig) + configFile.Close() + + syscall.Kill(s.d.cmd.Process.Pid, syscall.SIGHUP) + + time.Sleep(3 * time.Second) + + expectedMaxConcurrentUploads = `level=debug msg="Reset Max Concurrent Uploads: 5"` + expectedMaxConcurrentDownloads = `level=debug msg="Reset Max Concurrent Downloads: 3"` + content, _ = ioutil.ReadFile(s.d.logFile.Name()) + c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads) + c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads) +} diff --git a/man/dockerd.8.md b/man/dockerd.8.md index 2c381fb0f7..122609ee19 100644 --- a/man/dockerd.8.md +++ b/man/dockerd.8.md @@ -44,6 +44,8 @@ dockerd - Enable daemon mode [**--log-driver**[=*json-file*]] [**--log-opt**[=*map[]*]] [**--mtu**[=*0*]] +[**--max-concurrent-downloads**[=*3*]] +[**--max-concurrent-uploads**[=*5*]] [**-p**|**--pidfile**[=*/var/run/docker.pid*]] [**--raw-logs**] [**--registry-mirror**[=*[]*]] @@ -197,6 +199,12 @@ unix://[/path/to/socket] to use. **--mtu**=*0* Set the containers network mtu. Default is `0`. +**--max-concurrent-downloads**=*3* + Set the max concurrent downloads for each pull. Default is `3`. + +**--max-concurrent-uploads**=*5* + Set the max concurrent uploads for each push. Default is `5`. + **-p**, **--pidfile**="" Path to use for daemon PID file. Default is `/var/run/docker.pid`
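Reviewer note: the gating behavior this patch relies on can be seen in isolation with a minimal, self-contained Go sketch. This is not the actual transferManager code; the names limiter, SetLimit, TryStart, and Done are invented for illustration. It mirrors the two ideas the patch introduces: the concurrency limit is mutated at runtime under a mutex (as SetConcurrency does on daemon reload), and a limit of 0 disables the cap entirely, matching the `tm.concurrencyLimit == 0 ||` check added to Transfer and the validation that rejects only negative values.

package main

import (
	"fmt"
	"sync"
)

// limiter is a stand-in for the concurrency bookkeeping in transferManager.
type limiter struct {
	mu     sync.Mutex
	limit  int // 0 means no cap, mirroring the concurrencyLimit == 0 check
	active int
}

// SetLimit changes the limit at runtime, as SetConcurrency does on daemon reload.
func (l *limiter) SetLimit(n int) {
	l.mu.Lock()
	l.limit = n
	l.mu.Unlock()
}

// TryStart reports whether another transfer may start right away and,
// if so, counts it as active.
func (l *limiter) TryStart() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.limit == 0 || l.active < l.limit {
		l.active++
		return true
	}
	return false
}

// Done marks one active transfer as finished, freeing a slot.
func (l *limiter) Done() {
	l.mu.Lock()
	l.active--
	l.mu.Unlock()
}

func main() {
	l := &limiter{limit: 3} // e.g. defaultMaxConcurrentDownloads
	for i := 1; i <= 5; i++ {
		fmt.Printf("transfer %d may start immediately: %v\n", i, l.TryStart())
	}
	l.SetLimit(0) // reloaded with no cap: every transfer starts immediately
	fmt.Println("after SetLimit(0):", l.TryStart())
}

On a real daemon the same adjustment is driven by `dockerd --max-concurrent-downloads=N --max-concurrent-uploads=N`, or by the `max-concurrent-downloads`/`max-concurrent-uploads` keys in the configuration file followed by SIGHUP to trigger Reload, as the integration tests above exercise.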