1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #22445 from yongtang/20936-22443-concurrent-connection

Docker pull/push with max concurrency limits.
This commit is contained in:
Arnaud Porterie 2016-05-12 08:51:28 -07:00
commit e9117578a7
8 changed files with 207 additions and 13 deletions

View file

@ -17,6 +17,17 @@ import (
"github.com/imdario/mergo"
)
const (
// defaultMaxConcurrentDownloads is the default value for
// maximum number of downloads that
// may take place at a time for each pull.
defaultMaxConcurrentDownloads = 3
// defaultMaxConcurrentUploads is the default value for
// maximum number of uploads that
// may take place at a time for each push.
defaultMaxConcurrentUploads = 5
)
const (
defaultNetworkMtu = 1500
disableNetworkBridge = "none"
@ -94,6 +105,14 @@ type CommonConfig struct {
// reachable by other hosts.
ClusterAdvertise string `json:"cluster-advertise,omitempty"`
// MaxConcurrentDownloads is the maximum number of downloads that
// may take place at a time for each pull.
MaxConcurrentDownloads *int `json:"max-concurrent-downloads,omitempty"`
// MaxConcurrentUploads is the maximum number of uploads that
// may take place at a time for each push.
MaxConcurrentUploads *int `json:"max-concurrent-uploads,omitempty"`
Debug bool `json:"debug,omitempty"`
Hosts []string `json:"hosts,omitempty"`
LogLevel string `json:"log-level,omitempty"`
@ -116,6 +135,8 @@ type CommonConfig struct {
// Subsequent calls to `flag.Parse` will populate config with values parsed
// from the command-line.
func (config *Config) InstallCommonFlags(cmd *flag.FlagSet, usageFn func(string) string) {
var maxConcurrentDownloads, maxConcurrentUploads int
config.ServiceOptions.InstallCliFlags(cmd, usageFn)
cmd.Var(opts.NewNamedListOptsRef("storage-opts", &config.GraphOptions, nil), []string{"-storage-opt"}, usageFn("Set storage driver options"))
@ -138,6 +159,11 @@ func (config *Config) InstallCommonFlags(cmd *flag.FlagSet, usageFn func(string)
cmd.StringVar(&config.ClusterStore, []string{"-cluster-store"}, "", usageFn("Set the cluster store"))
cmd.Var(opts.NewNamedMapOpts("cluster-store-opts", config.ClusterOpts, nil), []string{"-cluster-store-opt"}, usageFn("Set cluster store options"))
cmd.StringVar(&config.CorsHeaders, []string{"-api-cors-header"}, "", usageFn("Set CORS headers in the remote API"))
cmd.IntVar(&maxConcurrentDownloads, []string{"-max-concurrent-downloads"}, defaultMaxConcurrentDownloads, usageFn("Set the max concurrent downloads for each pull"))
cmd.IntVar(&maxConcurrentUploads, []string{"-max-concurrent-uploads"}, defaultMaxConcurrentUploads, usageFn("Set the max concurrent uploads for each push"))
config.MaxConcurrentDownloads = &maxConcurrentDownloads
config.MaxConcurrentUploads = &maxConcurrentUploads
}
// IsValueSet returns true if a configuration value
@ -355,7 +381,8 @@ func findConfigurationConflicts(config map[string]interface{}, flags *flag.FlagS
}
// validateConfiguration validates some specific configs.
// such as config.DNS, config.Labels, config.DNSSearch
// such as config.DNS, config.Labels, config.DNSSearch,
// as well as config.MaxConcurrentDownloads, config.MaxConcurrentUploads.
func validateConfiguration(config *Config) error {
// validate DNS
for _, dns := range config.DNS {
@ -378,5 +405,14 @@ func validateConfiguration(config *Config) error {
}
}
// validate MaxConcurrentDownloads
if config.IsValueSet("max-concurrent-downloads") && config.MaxConcurrentDownloads != nil && *config.MaxConcurrentDownloads < 0 {
return fmt.Errorf("invalid max concurrent downloads: %d", *config.MaxConcurrentDownloads)
}
// validate MaxConcurrentUploads
if config.IsValueSet("max-concurrent-uploads") && config.MaxConcurrentUploads != nil && *config.MaxConcurrentUploads < 0 {
return fmt.Errorf("invalid max concurrent uploads: %d", *config.MaxConcurrentUploads)
}
return nil
}

View file

@ -71,15 +71,6 @@ import (
"golang.org/x/net/context"
)
const (
// maxDownloadConcurrency is the maximum number of downloads that
// may take place at a time for each pull.
maxDownloadConcurrency = 3
// maxUploadConcurrency is the maximum number of uploads that
// may take place at a time for each push.
maxUploadConcurrency = 5
)
var (
validContainerNameChars = utils.RestrictedNameChars
validContainerNamePattern = utils.RestrictedNamePattern
@ -719,8 +710,10 @@ func NewDaemon(config *Config, registryService *registry.Service, containerdRemo
return nil, err
}
d.downloadManager = xfer.NewLayerDownloadManager(d.layerStore, maxDownloadConcurrency)
d.uploadManager = xfer.NewLayerUploadManager(maxUploadConcurrency)
logrus.Debugf("Max Concurrent Downloads: %d", *config.MaxConcurrentDownloads)
d.downloadManager = xfer.NewLayerDownloadManager(d.layerStore, *config.MaxConcurrentDownloads)
logrus.Debugf("Max Concurrent Uploads: %d", *config.MaxConcurrentUploads)
d.uploadManager = xfer.NewLayerUploadManager(*config.MaxConcurrentUploads)
ifs, err := image.NewFSStoreBackend(filepath.Join(imageRoot, "imagedb"))
if err != nil {
@ -1510,6 +1503,8 @@ func (daemon *Daemon) initDiscovery(config *Config) error {
// These are the settings that Reload changes:
// - Daemon labels.
// - Daemon debug log level.
// - Daemon max concurrent downloads
// - Daemon max concurrent uploads
// - Cluster discovery (reconfigure and restart).
func (daemon *Daemon) Reload(config *Config) error {
daemon.configStore.reloadLock.Lock()
@ -1520,6 +1515,33 @@ func (daemon *Daemon) Reload(config *Config) error {
if config.IsValueSet("debug") {
daemon.configStore.Debug = config.Debug
}
// If no value is set for max-concurrent-downloads we assume it is the default value
// We always "reset" as the cost is lightweight and easy to maintain.
if config.IsValueSet("max-concurrent-downloads") && config.MaxConcurrentDownloads != nil {
*daemon.configStore.MaxConcurrentDownloads = *config.MaxConcurrentDownloads
} else {
maxConcurrentDownloads := defaultMaxConcurrentDownloads
daemon.configStore.MaxConcurrentDownloads = &maxConcurrentDownloads
}
logrus.Debugf("Reset Max Concurrent Downloads: %d", *daemon.configStore.MaxConcurrentDownloads)
if daemon.downloadManager != nil {
daemon.downloadManager.SetConcurrency(*daemon.configStore.MaxConcurrentDownloads)
}
// If no value is set for max-concurrent-upload we assume it is the default value
// We always "reset" as the cost is lightweight and easy to maintain.
if config.IsValueSet("max-concurrent-uploads") && config.MaxConcurrentUploads != nil {
*daemon.configStore.MaxConcurrentUploads = *config.MaxConcurrentUploads
} else {
maxConcurrentUploads := defaultMaxConcurrentUploads
daemon.configStore.MaxConcurrentUploads = &maxConcurrentUploads
}
logrus.Debugf("Reset Max Concurrent Uploads: %d", *daemon.configStore.MaxConcurrentUploads)
if daemon.uploadManager != nil {
daemon.uploadManager.SetConcurrency(*daemon.configStore.MaxConcurrentUploads)
}
return daemon.reloadClusterDiscovery(config)
}

View file

@ -25,6 +25,11 @@ type LayerDownloadManager struct {
tm TransferManager
}
// SetConcurrency set the max concurrent downloads for each pull
func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
ldm.tm.SetConcurrency(concurrency)
}
// NewLayerDownloadManager returns a new LayerDownloadManager.
func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int) *LayerDownloadManager {
return &LayerDownloadManager{

View file

@ -279,6 +279,8 @@ type TransferManager interface {
// so, it returns progress and error output from that transfer.
// Otherwise, it will call xferFunc to initiate the transfer.
Transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher)
// SetConcurrency set the concurrencyLimit so that it is adjustable daemon reload
SetConcurrency(concurrency int)
}
type transferManager struct {
@ -298,6 +300,13 @@ func NewTransferManager(concurrencyLimit int) TransferManager {
}
}
// SetConcurrency set the concurrencyLimit
func (tm *transferManager) SetConcurrency(concurrency int) {
tm.mu.Lock()
tm.concurrencyLimit = concurrency
tm.mu.Unlock()
}
// Transfer checks if a transfer matching the given key is in progress. If not,
// it starts one by calling xferFunc. The caller supplies a channel which
// receives progress output from the transfer.
@ -337,7 +346,7 @@ func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput
start := make(chan struct{})
inactive := make(chan struct{})
if tm.activeTransfers < tm.concurrencyLimit {
if tm.concurrencyLimit == 0 || tm.activeTransfers < tm.concurrencyLimit {
close(start)
tm.activeTransfers++
} else {

View file

@ -19,6 +19,11 @@ type LayerUploadManager struct {
tm TransferManager
}
// SetConcurrency set the max concurrent uploads for each push
func (lum *LayerUploadManager) SetConcurrency(concurrency int) {
lum.tm.SetConcurrency(concurrency)
}
// NewLayerUploadManager returns a new LayerUploadManager.
func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager {
return &LayerUploadManager{

View file

@ -54,6 +54,8 @@ weight = -1
--log-driver="json-file" Default driver for container logs
--log-opt=[] Log driver specific options
--mtu=0 Set the containers network MTU
--max-concurrent-downloads=3 Set the max concurrent downloads for each pull
--max-concurrent-uploads=5 Set the max concurrent uploads for each push
--disable-legacy-registry Do not contact legacy registries
-p, --pidfile="/var/run/docker.pid" Path to use for daemon PID file
--raw-logs Full timestamps without ANSI coloring
@ -913,6 +915,8 @@ This is a full example of the allowed configuration options in the file:
"cluster-store": "",
"cluster-store-opts": [],
"cluster-advertise": "",
"max-concurrent-downloads": 3,
"max-concurrent-uploads": 5,
"debug": true,
"hosts": [],
"log-level": "",
@ -963,6 +967,8 @@ The list of currently supported options that can be reconfigured is this:
- `cluster-store-opts`: it uses the new options to reload the discovery store.
- `cluster-advertise`: it modifies the address advertised after reloading.
- `labels`: it replaces the daemon labels with a new set of labels.
- `max-concurrent-downloads`: it updates the max concurrent downloads for each pull.
- `max-concurrent-uploads`: it updates the max concurrent uploads for each push.
Updating and reloading the cluster configurations such as `--cluster-store`,
`--cluster-advertise` and `--cluster-store-opts` will take effect only if

View file

@ -2246,3 +2246,106 @@ func (s *DockerDaemonSuite) TestDaemonLogOptions(c *check.C) {
c.Assert(err, check.IsNil, check.Commentf(out))
c.Assert(out, checker.Contains, "{json-file map[]}")
}
// Test case for #20936, #22443
func (s *DockerDaemonSuite) TestDaemonMaxConcurrency(c *check.C) {
c.Assert(s.d.Start("--max-concurrent-uploads=6", "--max-concurrent-downloads=8"), check.IsNil)
expectedMaxConcurrentUploads := `level=debug msg="Max Concurrent Uploads: 6"`
expectedMaxConcurrentDownloads := `level=debug msg="Max Concurrent Downloads: 8"`
content, _ := ioutil.ReadFile(s.d.logFile.Name())
c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads)
c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads)
}
// Test case for #20936, #22443
func (s *DockerDaemonSuite) TestDaemonMaxConcurrencyWithConfigFile(c *check.C) {
testRequires(c, SameHostDaemon, DaemonIsLinux)
// daemon config file
configFilePath := "test.json"
configFile, err := os.Create(configFilePath)
c.Assert(err, checker.IsNil)
defer os.Remove(configFilePath)
daemonConfig := `{ "max-concurrent-downloads" : 8 }`
fmt.Fprintf(configFile, "%s", daemonConfig)
configFile.Close()
c.Assert(s.d.Start(fmt.Sprintf("--config-file=%s", configFilePath)), check.IsNil)
expectedMaxConcurrentUploads := `level=debug msg="Max Concurrent Uploads: 5"`
expectedMaxConcurrentDownloads := `level=debug msg="Max Concurrent Downloads: 8"`
content, _ := ioutil.ReadFile(s.d.logFile.Name())
c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads)
c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads)
configFile, err = os.Create(configFilePath)
c.Assert(err, checker.IsNil)
daemonConfig = `{ "max-concurrent-uploads" : 7, "max-concurrent-downloads" : 9 }`
fmt.Fprintf(configFile, "%s", daemonConfig)
configFile.Close()
syscall.Kill(s.d.cmd.Process.Pid, syscall.SIGHUP)
time.Sleep(3 * time.Second)
expectedMaxConcurrentUploads = `level=debug msg="Reset Max Concurrent Uploads: 7"`
expectedMaxConcurrentDownloads = `level=debug msg="Reset Max Concurrent Downloads: 9"`
content, _ = ioutil.ReadFile(s.d.logFile.Name())
c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads)
c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads)
}
// Test case for #20936, #22443
func (s *DockerDaemonSuite) TestDaemonMaxConcurrencyWithConfigFileReload(c *check.C) {
testRequires(c, SameHostDaemon, DaemonIsLinux)
// daemon config file
configFilePath := "test.json"
configFile, err := os.Create(configFilePath)
c.Assert(err, checker.IsNil)
defer os.Remove(configFilePath)
daemonConfig := `{ "max-concurrent-uploads" : null }`
fmt.Fprintf(configFile, "%s", daemonConfig)
configFile.Close()
c.Assert(s.d.Start(fmt.Sprintf("--config-file=%s", configFilePath)), check.IsNil)
expectedMaxConcurrentUploads := `level=debug msg="Max Concurrent Uploads: 5"`
expectedMaxConcurrentDownloads := `level=debug msg="Max Concurrent Downloads: 3"`
content, _ := ioutil.ReadFile(s.d.logFile.Name())
c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads)
c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads)
configFile, err = os.Create(configFilePath)
c.Assert(err, checker.IsNil)
daemonConfig = `{ "max-concurrent-uploads" : 1, "max-concurrent-downloads" : null }`
fmt.Fprintf(configFile, "%s", daemonConfig)
configFile.Close()
syscall.Kill(s.d.cmd.Process.Pid, syscall.SIGHUP)
time.Sleep(3 * time.Second)
expectedMaxConcurrentUploads = `level=debug msg="Reset Max Concurrent Uploads: 1"`
expectedMaxConcurrentDownloads = `level=debug msg="Reset Max Concurrent Downloads: 3"`
content, _ = ioutil.ReadFile(s.d.logFile.Name())
c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads)
c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads)
configFile, err = os.Create(configFilePath)
c.Assert(err, checker.IsNil)
daemonConfig = `{ "labels":["foo=bar"] }`
fmt.Fprintf(configFile, "%s", daemonConfig)
configFile.Close()
syscall.Kill(s.d.cmd.Process.Pid, syscall.SIGHUP)
time.Sleep(3 * time.Second)
expectedMaxConcurrentUploads = `level=debug msg="Reset Max Concurrent Uploads: 5"`
expectedMaxConcurrentDownloads = `level=debug msg="Reset Max Concurrent Downloads: 3"`
content, _ = ioutil.ReadFile(s.d.logFile.Name())
c.Assert(string(content), checker.Contains, expectedMaxConcurrentUploads)
c.Assert(string(content), checker.Contains, expectedMaxConcurrentDownloads)
}

View file

@ -44,6 +44,8 @@ dockerd - Enable daemon mode
[**--log-driver**[=*json-file*]]
[**--log-opt**[=*map[]*]]
[**--mtu**[=*0*]]
[**--max-concurrent-downloads**[=*3*]]
[**--max-concurrent-uploads**[=*5*]]
[**-p**|**--pidfile**[=*/var/run/docker.pid*]]
[**--raw-logs**]
[**--registry-mirror**[=*[]*]]
@ -197,6 +199,12 @@ unix://[/path/to/socket] to use.
**--mtu**=*0*
Set the containers network mtu. Default is `0`.
**--max-concurrent-downloads**=*3*
Set the max concurrent downloads for each pull. Default is `3`.
**--max-concurrent-uploads**=*5*
Set the max concurrent uploads for each push. Default is `5`.
**-p**, **--pidfile**=""
Path to use for daemon PID file. Default is `/var/run/docker.pid`